Edit on GitHub

communex.misc

  1import re
  2from typing import Any, TypeVar, cast
  3
  4from communex._common import transform_stake_dmap
  5from communex.balance import to_nano
  6from communex.cli._common import transform_subnet_params
  7from communex.client import CommuneClient
  8from communex.key import check_ss58_address
  9from communex.types import (
 10    BurnConfiguration,
 11    GovernanceConfiguration,
 12    ModuleInfoWithOptionalBalance,
 13    NetworkParams,
 14    Ss58Address,
 15    SubnetParamsMaps,
 16    SubnetParamsWithEmission,
 17)
 18
# Matches a string made of the literal prefix "Qm" followed by exactly 44
# characters from the class [1-9A-HJ-NP-Za-km-z], anchored with ^ and $.
# NOTE(review): presumably used to validate IPFS hashes — confirm at call sites.
IPFS_REGEX = re.compile(r"^Qm[1-9A-HJ-NP-Za-km-z]{44}$")

# Generic value type; `to_snake_case` uses it to keep the value type unchanged.
T = TypeVar("T")
 22
 23
 24def get_map_modules(
 25    client: CommuneClient,
 26    netuid: int = 0,
 27    include_balances: bool = False,
 28) -> dict[str, ModuleInfoWithOptionalBalance]:
 29    """
 30    Gets all modules info on the network
 31    """
 32
 33    request_dict: dict[Any, Any] = {
 34        "SubspaceModule": [
 35            ("StakeFrom", []),
 36            ("Keys", [netuid]),
 37            ("Name", [netuid]),
 38            ("Address", [netuid]),
 39            ("RegistrationBlock", [netuid]),
 40            ("DelegationFee", []),
 41            ("Emission", []),
 42            ("Incentive", []),
 43            ("Dividends", []),
 44            ("LastUpdate", []),
 45            ("Metadata", [netuid]),
 46            ("StakeTo", []),
 47        ],
 48    }
 49    if include_balances:
 50        request_dict["System"] = [("Account", [])]
 51    bulk_query = client.query_batch_map(request_dict)
 52    (
 53        ss58_to_stakefrom,
 54        uid_to_key,
 55        uid_to_name,
 56        uid_to_address,
 57        uid_to_regblock,
 58        ss58_to_delegationfee,
 59        uid_to_emission,
 60        uid_to_incentive,
 61        uid_to_dividend,
 62        uid_to_lastupdate,
 63        ss58_to_balances,
 64        ss58_to_metadata,
 65    ) = (
 66        bulk_query.get("StakeFrom", {}),
 67        bulk_query.get("Keys", {}),
 68        bulk_query["Name"],
 69        bulk_query["Address"],
 70        bulk_query["RegistrationBlock"],
 71        bulk_query["DelegationFee"],
 72        bulk_query["Emission"],
 73        bulk_query["Incentive"],
 74        bulk_query["Dividends"],
 75        bulk_query["LastUpdate"],
 76        bulk_query.get("Account", {}),
 77        bulk_query.get("Metadata", {}),
 78    )
 79    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 80    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
 81    for uid, key in uid_to_key.items():
 82        key = check_ss58_address(key)
 83        name = uid_to_name[uid]
 84        address = uid_to_address[uid]
 85        emission = uid_to_emission[netuid][uid]
 86        incentive = uid_to_incentive[netuid][uid]
 87        dividend = uid_to_dividend[netuid][uid]
 88        regblock = uid_to_regblock[uid]
 89        stake_from = ss58_to_stakefrom.get(key, [])
 90        last_update = uid_to_lastupdate[netuid][uid]
 91        delegation_fee = ss58_to_delegationfee.get(
 92            key, 5
 93        )  # 5% default delegation fee
 94        metadata = ss58_to_metadata.get(key, None)
 95
 96        balance = None
 97        if include_balances and ss58_to_balances is not None:  # type: ignore
 98            balance_dict = ss58_to_balances.get(key, None)
 99            if balance_dict is not None:
100                assert isinstance(balance_dict["data"], dict)
101                balance = balance_dict["data"]["free"]
102            else:
103                balance = 0
104        stake = sum(stake for _, stake in stake_from)
105
106        module: ModuleInfoWithOptionalBalance = {
107            "uid": uid,
108            "key": key,
109            "name": name,
110            "address": address,
111            "emission": emission,
112            "incentive": incentive,
113            "dividends": dividend,
114            "stake_from": stake_from,
115            "regblock": regblock,
116            "last_update": last_update,
117            "balance": balance,
118            "stake": stake,
119            "delegation_fee": delegation_fee,
120            "metadata": metadata,
121        }
122
123        result_modules[key] = module
124    return result_modules
125
126
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of `d` whose camelCase keys are rewritten in snake_case.

    An underscore is inserted before every uppercase letter that is not the
    first character of the key, then the key is lowercased. Values are kept
    as they are.
    """

    converted: dict[str, T] = {}
    for camel_key, value in d.items():
        # The pattern is zero-width: it matches the position right before an
        # uppercase letter, except at the very start of the string.
        snake_key = re.sub(r"(?<!^)(?=[A-Z])", "_", camel_key).lower()
        converted[snake_key] = value
    return converted
137
138
def get_map_displayable_subnets(
    client: CommuneClient, block_hash: str | None = None
):
    """
    Gets all subnets info on the network, transformed for display.

    Args:
        client: Client used to query the subnet parameters.
        block_hash: Forwarded unchanged to `get_map_subnets_params`. The
            default None matches what that function received before this
            parameter existed.

    Returns:
        The result of `transform_subnet_params` applied to the queried
        subnet parameters.
    """
    subnets = get_map_subnets_params(client, block_hash)
    return transform_subnet_params(subnets)
143
144
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Builds the parameter record of every subnet from one batched map query.

    Args:
        client: Client used to run the batched map query.
        block_hash: Passed to `query_batch_map` as its second argument.
            NOTE(review): presumably selects the block to query, with None
            meaning the latest one — confirm against the client.

    Returns:
        A dictionary keyed by netuid, with one entry per subnet present in
        the `SubnetNames` map.
    """
    subspace_storages = (
        "ImmunityPeriod",
        "MinAllowedWeights",
        "MaxAllowedWeights",
        "Tempo",
        "MaxAllowedUids",
        "Founder",
        "FounderShare",
        "IncentiveRatio",
        "TrustRatio",
        "SubnetNames",
        "MaxWeightAge",
        "BondsMovingAverage",
        "MaximumSetWeightCallsPerEpoch",
        "MinValidatorStake",
        "MaxAllowedValidators",
        "ModuleBurnConfig",
        "SubnetMetadata",
    )
    bulk_query = client.query_batch_map(
        {
            # Every storage is queried without parameters.
            "SubspaceModule": [(storage, []) for storage in subspace_storages],
            "GovernanceModule": [("SubnetGovernanceConfig", [])],
            "SubnetEmissionModule": [("SubnetEmission", [])],
        },
        block_hash,
    )

    # Maps indexed directly are required (KeyError when absent); the ones
    # read with `.get` fall back to an empty dict.
    emission_by_netuid = bulk_query["SubnetEmission"]
    tempo_by_netuid = bulk_query["Tempo"]
    min_weights_by_netuid = bulk_query["MinAllowedWeights"]
    max_weights_by_netuid = bulk_query["MaxAllowedWeights"]
    max_uids_by_netuid = bulk_query["MaxAllowedUids"]
    founder_by_netuid = bulk_query["Founder"]
    founder_share_by_netuid = bulk_query["FounderShare"]
    incentive_ratio_by_netuid = bulk_query["IncentiveRatio"]
    trust_ratio_by_netuid = bulk_query["TrustRatio"]
    name_by_netuid = bulk_query["SubnetNames"]
    max_weight_age_by_netuid = bulk_query["MaxWeightAge"]
    bonds_ma_by_netuid = bulk_query.get("BondsMovingAverage", {})
    set_weight_calls_by_netuid = bulk_query.get(
        "MaximumSetWeightCallsPerEpoch", {}
    )
    governance_by_netuid = bulk_query["SubnetGovernanceConfig"]
    immunity_by_netuid = bulk_query["ImmunityPeriod"]
    min_validator_stake_by_netuid = bulk_query.get("MinValidatorStake", {})
    max_validators_by_netuid = bulk_query.get("MaxAllowedValidators", {})
    burn_config_by_netuid = bulk_query.get("ModuleBurnConfig", {})
    metadata_by_netuid = bulk_query.get("SubnetMetadata", {})

    subnets: dict[int, SubnetParamsWithEmission] = {}
    for netuid, name in name_by_netuid.items():
        subnets[netuid] = {
            "name": name,
            "founder": founder_by_netuid[netuid],
            "founder_share": founder_share_by_netuid[netuid],
            "incentive_ratio": incentive_ratio_by_netuid[netuid],
            "max_allowed_uids": max_uids_by_netuid[netuid],
            "max_allowed_weights": max_weights_by_netuid[netuid],
            "min_allowed_weights": min_weights_by_netuid[netuid],
            "tempo": tempo_by_netuid[netuid],
            "trust_ratio": trust_ratio_by_netuid[netuid],
            "emission": emission_by_netuid[netuid],
            "max_weight_age": max_weight_age_by_netuid[netuid],
            "bonds_ma": bonds_ma_by_netuid.get(netuid, None),
            # Fallbacks used when a subnet has no entry in an optional map.
            "maximum_set_weight_calls_per_epoch": (
                set_weight_calls_by_netuid.get(netuid, 30)
            ),
            "governance_config": governance_by_netuid[netuid],
            "immunity_period": immunity_by_netuid[netuid],
            "min_validator_stake": min_validator_stake_by_netuid.get(
                netuid, to_nano(50_000)
            ),
            "max_allowed_validators": max_validators_by_netuid.get(netuid, 50),
            # The cast only affects static typing; the value is None when the
            # subnet has no burn configuration entry.
            "module_burn_config": cast(
                BurnConfiguration,
                burn_config_by_netuid.get(netuid, None),
            ),
            "subnet_metadata": metadata_by_netuid.get(netuid, None),
        }

    return subnets
257
258
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Fetches the network-wide parameters with a single batched query.

    Args:
        c_client: Client used to run the batched query.

    Returns:
        The global parameters, with numeric values converted through `int`
        and the global governance configuration nested under
        `governance_config`.
    """

    subspace_storages = (
        "MaxNameLength",
        "MinNameLength",
        "MaxAllowedSubnets",
        "MaxAllowedModules",
        "MaxRegistrationsPerBlock",
        "MaxAllowedWeightsGlobal",
        "FloorDelegationFee",
        "FloorFounderShare",
        "MinWeightStake",
        "Kappa",
        "Rho",
        "SubnetImmunityPeriod",
        "SubnetBurn",
    )
    governance_storages = (
        "GlobalGovernanceConfig",
        "GeneralSubnetApplicationCost",
        "Curator",
    )
    query_all = c_client.query_batch(
        {
            # Every storage is queried without parameters.
            "SubspaceModule": [(storage, []) for storage in subspace_storages],
            "GovernanceModule": [
                (storage, []) for storage in governance_storages
            ],
        }
    )

    def as_int(storage: str) -> int:
        # Reads one storage value from the query result as an int.
        return int(query_all[storage])

    governance = cast(
        GovernanceConfiguration, query_all["GlobalGovernanceConfig"]
    )

    return {
        "max_allowed_subnets": as_int("MaxAllowedSubnets"),
        "max_allowed_modules": as_int("MaxAllowedModules"),
        "max_registrations_per_block": as_int("MaxRegistrationsPerBlock"),
        "max_name_length": as_int("MaxNameLength"),
        "min_weight_stake": as_int("MinWeightStake"),
        "floor_delegation_fee": as_int("FloorDelegationFee"),
        "max_allowed_weights": as_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(query_all["Curator"]),
        "min_name_length": as_int("MinNameLength"),
        "floor_founder_share": as_int("FloorFounderShare"),
        "general_subnet_application_cost": as_int(
            "GeneralSubnetApplicationCost"
        ),
        "kappa": as_int("Kappa"),
        "rho": as_int("Rho"),
        "subnet_immunity_period": as_int("SubnetImmunityPeriod"),
        # The "SubnetBurn" storage is exposed as the subnet registration cost.
        "subnet_registration_cost": as_int("SubnetBurn"),
        "governance_config": {
            "proposal_cost": int(governance["proposal_cost"]),
            "proposal_expiration": int(governance["proposal_expiration"]),
            "vote_mode": governance["vote_mode"],
            "proposal_reward_treasury_allocation": int(
                governance["proposal_reward_treasury_allocation"]
            ),
            "max_proposal_reward_treasury_allocation": int(
                governance["max_proposal_reward_treasury_allocation"]
            ),
            "proposal_reward_interval": int(
                governance["proposal_reward_interval"]
            ),
        },
    }
327
328
def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount recorded for its address.

    Args:
        balance: Amounts keyed by address.
        local_key_info: Addresses keyed by local key name.

    Returns:
        A dictionary keyed by local key name, in the iteration order of
        `local_key_info`; addresses missing from `balance` get 0.
    """
    amount_by_name: dict[str, int] = {}
    for key_name, key_address in local_key_info.items():
        amount_by_name[key_name] = balance.get(key_address, 0)
    return amount_by_name
338
339
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Looks up the free balance of each local key.

    Args:
        c_client: Client used to query the `System.Account` map.
        local_keys: Addresses keyed by local key name.

    Returns:
        Free balances keyed by local key name; keys whose address has no
        usable account entry get 0.
    """
    accounts = c_client.query_batch_map({"System": [("Account", [])]})[
        "Account"
    ]

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        # Skip entries that do not carry a "data" -> "free" field.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
362
363
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Sums, for each local key, the amounts listed for it in the stake-to map.

    Args:
        c_client: Client providing `query_map_staketo`.
        local_keys: Addresses keyed by local key name.

    Returns:
        Summed stake keyed by local key name; keys whose address is not in
        the map get 0.
    """
    staked_by_address: dict[str, int] = {}
    for address, stakes in c_client.query_map_staketo().items():
        # Each entry is a collection of pairs; only the second item (the
        # amount) is summed.
        staked_by_address[address] = sum(amount for _, amount in stakes)

    return concat_to_local_keys(staked_by_address, local_keys)
378
379
def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Sums, for each local key, the amounts listed for it in the stake-from map.

    Args:
        c_client: Client providing `query_map_stakefrom`.
        local_keys: Addresses keyed by local key name.

    Returns:
        Summed stake keyed by local key name; keys whose address is not in
        the map get 0.
    """
    stakefrom_map = c_client.query_map_stakefrom()

    # Each entry is a collection of pairs; only the second item (the amount)
    # is summed.
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value)
        for key, value in stakefrom_map.items()
    }

    # `concat_to_local_keys` already returns a fresh dict, so it is returned
    # directly instead of being copied item by item.
    return concat_to_local_keys(format_stake, local_keys)
394
395
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns the free balance and the staked amount of each local key.

    Args:
        c_client: Client used to query `System.Account` and
            `SubspaceModule.StakeTo` in one batch.
        local_keys: Addresses keyed by local key name.

    Returns:
        A pair `(key2balance, key2stake)`, both keyed by local key name and
        both ordered from the largest amount to the smallest.
    """

    def by_amount_desc(amounts: dict[str, int]) -> dict[str, int]:
        # Rebuilds the dict with its items sorted by value, largest first.
        ranked = sorted(
            amounts.items(), key=lambda item: item[1], reverse=True
        )
        return dict(ranked)

    query_all = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [("StakeTo", [])],
        }
    )
    balance_map = query_all["Account"]
    staketo_map = transform_stake_dmap(query_all["StakeTo"])

    free_by_address: dict[str, int] = {}
    for address, account in balance_map.items():
        # Skip entries that do not carry a "data" -> "free" field.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]
    key2balance = concat_to_local_keys(free_by_address, local_keys)

    staked_by_address: dict[str, int] = {}
    for address, stakes in staketo_map.items():
        staked_by_address[address] = sum(amount for _, amount in stakes)
    key2stake = concat_to_local_keys(staked_by_address, local_keys)

    return by_amount_desc(key2balance), by_amount_desc(key2stake)
444
445
if __name__ == "__main__":
    from communex._common import get_node_url

    # Script entry: query the global parameters from the testnet node.
    # The returned value is discarded.
    testnet_url = get_node_url(use_testnet=True)
    client = CommuneClient(testnet_url)
    get_global_params(client)
IPFS_REGEX = re.compile('^Qm[1-9A-HJ-NP-Za-km-z]{44}$')
def get_map_modules( client: communex.client.CommuneClient, netuid: int = 0, include_balances: bool = False) -> dict[str, communex.types.ModuleInfoWithOptionalBalance]:
 25def get_map_modules(
 26    client: CommuneClient,
 27    netuid: int = 0,
 28    include_balances: bool = False,
 29) -> dict[str, ModuleInfoWithOptionalBalance]:
 30    """
 31    Gets all modules info on the network
 32    """
 33
 34    request_dict: dict[Any, Any] = {
 35        "SubspaceModule": [
 36            ("StakeFrom", []),
 37            ("Keys", [netuid]),
 38            ("Name", [netuid]),
 39            ("Address", [netuid]),
 40            ("RegistrationBlock", [netuid]),
 41            ("DelegationFee", []),
 42            ("Emission", []),
 43            ("Incentive", []),
 44            ("Dividends", []),
 45            ("LastUpdate", []),
 46            ("Metadata", [netuid]),
 47            ("StakeTo", []),
 48        ],
 49    }
 50    if include_balances:
 51        request_dict["System"] = [("Account", [])]
 52    bulk_query = client.query_batch_map(request_dict)
 53    (
 54        ss58_to_stakefrom,
 55        uid_to_key,
 56        uid_to_name,
 57        uid_to_address,
 58        uid_to_regblock,
 59        ss58_to_delegationfee,
 60        uid_to_emission,
 61        uid_to_incentive,
 62        uid_to_dividend,
 63        uid_to_lastupdate,
 64        ss58_to_balances,
 65        ss58_to_metadata,
 66    ) = (
 67        bulk_query.get("StakeFrom", {}),
 68        bulk_query.get("Keys", {}),
 69        bulk_query["Name"],
 70        bulk_query["Address"],
 71        bulk_query["RegistrationBlock"],
 72        bulk_query["DelegationFee"],
 73        bulk_query["Emission"],
 74        bulk_query["Incentive"],
 75        bulk_query["Dividends"],
 76        bulk_query["LastUpdate"],
 77        bulk_query.get("Account", {}),
 78        bulk_query.get("Metadata", {}),
 79    )
 80    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 81    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
 82    for uid, key in uid_to_key.items():
 83        key = check_ss58_address(key)
 84        name = uid_to_name[uid]
 85        address = uid_to_address[uid]
 86        emission = uid_to_emission[netuid][uid]
 87        incentive = uid_to_incentive[netuid][uid]
 88        dividend = uid_to_dividend[netuid][uid]
 89        regblock = uid_to_regblock[uid]
 90        stake_from = ss58_to_stakefrom.get(key, [])
 91        last_update = uid_to_lastupdate[netuid][uid]
 92        delegation_fee = ss58_to_delegationfee.get(
 93            key, 5
 94        )  # 5% default delegation fee
 95        metadata = ss58_to_metadata.get(key, None)
 96
 97        balance = None
 98        if include_balances and ss58_to_balances is not None:  # type: ignore
 99            balance_dict = ss58_to_balances.get(key, None)
100            if balance_dict is not None:
101                assert isinstance(balance_dict["data"], dict)
102                balance = balance_dict["data"]["free"]
103            else:
104                balance = 0
105        stake = sum(stake for _, stake in stake_from)
106
107        module: ModuleInfoWithOptionalBalance = {
108            "uid": uid,
109            "key": key,
110            "name": name,
111            "address": address,
112            "emission": emission,
113            "incentive": incentive,
114            "dividends": dividend,
115            "stake_from": stake_from,
116            "regblock": regblock,
117            "last_update": last_update,
118            "balance": balance,
119            "stake": stake,
120            "delegation_fee": delegation_fee,
121            "metadata": metadata,
122        }
123
124        result_modules[key] = module
125    return result_modules

Gets all modules info on the network

def to_snake_case(d: dict[str, ~T]) -> dict[str, ~T]:
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of `d` whose camelCase keys are rewritten in snake_case.

    An underscore is inserted before every uppercase letter that is not the
    first character of the key, then the key is lowercased. Values are kept
    as they are.
    """

    converted: dict[str, T] = {}
    for camel_key, value in d.items():
        # The pattern is zero-width: it matches the position right before an
        # uppercase letter, except at the very start of the string.
        snake_key = re.sub(r"(?<!^)(?=[A-Z])", "_", camel_key).lower()
        converted[snake_key] = value
    return converted

Converts a dictionary with camelCase keys to snake_case keys

def get_map_displayable_subnets(client: communex.client.CommuneClient):
def get_map_displayable_subnets(
    client: CommuneClient, block_hash: str | None = None
):
    """
    Gets all subnets info on the network, transformed for display.

    Args:
        client: Client used to query the subnet parameters.
        block_hash: Forwarded unchanged to `get_map_subnets_params`. The
            default None matches what that function received before this
            parameter existed.

    Returns:
        The result of `transform_subnet_params` applied to the queried
        subnet parameters.
    """
    subnets = get_map_subnets_params(client, block_hash)
    return transform_subnet_params(subnets)
def get_map_subnets_params( client: communex.client.CommuneClient, block_hash: str | None = None) -> dict[int, communex.types.SubnetParamsWithEmission]:
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Builds the parameter record of every subnet from one batched map query.

    Args:
        client: Client used to run the batched map query.
        block_hash: Passed to `query_batch_map` as its second argument.
            NOTE(review): presumably selects the block to query, with None
            meaning the latest one — confirm against the client.

    Returns:
        A dictionary keyed by netuid, with one entry per subnet present in
        the `SubnetNames` map.
    """
    subspace_storages = (
        "ImmunityPeriod",
        "MinAllowedWeights",
        "MaxAllowedWeights",
        "Tempo",
        "MaxAllowedUids",
        "Founder",
        "FounderShare",
        "IncentiveRatio",
        "TrustRatio",
        "SubnetNames",
        "MaxWeightAge",
        "BondsMovingAverage",
        "MaximumSetWeightCallsPerEpoch",
        "MinValidatorStake",
        "MaxAllowedValidators",
        "ModuleBurnConfig",
        "SubnetMetadata",
    )
    bulk_query = client.query_batch_map(
        {
            # Every storage is queried without parameters.
            "SubspaceModule": [(storage, []) for storage in subspace_storages],
            "GovernanceModule": [("SubnetGovernanceConfig", [])],
            "SubnetEmissionModule": [("SubnetEmission", [])],
        },
        block_hash,
    )

    # Maps indexed directly are required (KeyError when absent); the ones
    # read with `.get` fall back to an empty dict.
    emission_by_netuid = bulk_query["SubnetEmission"]
    tempo_by_netuid = bulk_query["Tempo"]
    min_weights_by_netuid = bulk_query["MinAllowedWeights"]
    max_weights_by_netuid = bulk_query["MaxAllowedWeights"]
    max_uids_by_netuid = bulk_query["MaxAllowedUids"]
    founder_by_netuid = bulk_query["Founder"]
    founder_share_by_netuid = bulk_query["FounderShare"]
    incentive_ratio_by_netuid = bulk_query["IncentiveRatio"]
    trust_ratio_by_netuid = bulk_query["TrustRatio"]
    name_by_netuid = bulk_query["SubnetNames"]
    max_weight_age_by_netuid = bulk_query["MaxWeightAge"]
    bonds_ma_by_netuid = bulk_query.get("BondsMovingAverage", {})
    set_weight_calls_by_netuid = bulk_query.get(
        "MaximumSetWeightCallsPerEpoch", {}
    )
    governance_by_netuid = bulk_query["SubnetGovernanceConfig"]
    immunity_by_netuid = bulk_query["ImmunityPeriod"]
    min_validator_stake_by_netuid = bulk_query.get("MinValidatorStake", {})
    max_validators_by_netuid = bulk_query.get("MaxAllowedValidators", {})
    burn_config_by_netuid = bulk_query.get("ModuleBurnConfig", {})
    metadata_by_netuid = bulk_query.get("SubnetMetadata", {})

    subnets: dict[int, SubnetParamsWithEmission] = {}
    for netuid, name in name_by_netuid.items():
        subnets[netuid] = {
            "name": name,
            "founder": founder_by_netuid[netuid],
            "founder_share": founder_share_by_netuid[netuid],
            "incentive_ratio": incentive_ratio_by_netuid[netuid],
            "max_allowed_uids": max_uids_by_netuid[netuid],
            "max_allowed_weights": max_weights_by_netuid[netuid],
            "min_allowed_weights": min_weights_by_netuid[netuid],
            "tempo": tempo_by_netuid[netuid],
            "trust_ratio": trust_ratio_by_netuid[netuid],
            "emission": emission_by_netuid[netuid],
            "max_weight_age": max_weight_age_by_netuid[netuid],
            "bonds_ma": bonds_ma_by_netuid.get(netuid, None),
            # Fallbacks used when a subnet has no entry in an optional map.
            "maximum_set_weight_calls_per_epoch": (
                set_weight_calls_by_netuid.get(netuid, 30)
            ),
            "governance_config": governance_by_netuid[netuid],
            "immunity_period": immunity_by_netuid[netuid],
            "min_validator_stake": min_validator_stake_by_netuid.get(
                netuid, to_nano(50_000)
            ),
            "max_allowed_validators": max_validators_by_netuid.get(netuid, 50),
            # The cast only affects static typing; the value is None when the
            # subnet has no burn configuration entry.
            "module_burn_config": cast(
                BurnConfiguration,
                burn_config_by_netuid.get(netuid, None),
            ),
            "subnet_metadata": metadata_by_netuid.get(netuid, None),
        }

    return subnets

Gets all subnets info on the network

def get_global_params(c_client: communex.client.CommuneClient) -> communex.types.NetworkParams:
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Fetches the network-wide parameters with a single batched query.

    Args:
        c_client: Client used to run the batched query.

    Returns:
        The global parameters, with numeric values converted through `int`
        and the global governance configuration nested under
        `governance_config`.
    """

    subspace_storages = (
        "MaxNameLength",
        "MinNameLength",
        "MaxAllowedSubnets",
        "MaxAllowedModules",
        "MaxRegistrationsPerBlock",
        "MaxAllowedWeightsGlobal",
        "FloorDelegationFee",
        "FloorFounderShare",
        "MinWeightStake",
        "Kappa",
        "Rho",
        "SubnetImmunityPeriod",
        "SubnetBurn",
    )
    governance_storages = (
        "GlobalGovernanceConfig",
        "GeneralSubnetApplicationCost",
        "Curator",
    )
    query_all = c_client.query_batch(
        {
            # Every storage is queried without parameters.
            "SubspaceModule": [(storage, []) for storage in subspace_storages],
            "GovernanceModule": [
                (storage, []) for storage in governance_storages
            ],
        }
    )

    def as_int(storage: str) -> int:
        # Reads one storage value from the query result as an int.
        return int(query_all[storage])

    governance = cast(
        GovernanceConfiguration, query_all["GlobalGovernanceConfig"]
    )

    return {
        "max_allowed_subnets": as_int("MaxAllowedSubnets"),
        "max_allowed_modules": as_int("MaxAllowedModules"),
        "max_registrations_per_block": as_int("MaxRegistrationsPerBlock"),
        "max_name_length": as_int("MaxNameLength"),
        "min_weight_stake": as_int("MinWeightStake"),
        "floor_delegation_fee": as_int("FloorDelegationFee"),
        "max_allowed_weights": as_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(query_all["Curator"]),
        "min_name_length": as_int("MinNameLength"),
        "floor_founder_share": as_int("FloorFounderShare"),
        "general_subnet_application_cost": as_int(
            "GeneralSubnetApplicationCost"
        ),
        "kappa": as_int("Kappa"),
        "rho": as_int("Rho"),
        "subnet_immunity_period": as_int("SubnetImmunityPeriod"),
        # The "SubnetBurn" storage is exposed as the subnet registration cost.
        "subnet_registration_cost": as_int("SubnetBurn"),
        "governance_config": {
            "proposal_cost": int(governance["proposal_cost"]),
            "proposal_expiration": int(governance["proposal_expiration"]),
            "vote_mode": governance["vote_mode"],
            "proposal_reward_treasury_allocation": int(
                governance["proposal_reward_treasury_allocation"]
            ),
            "max_proposal_reward_treasury_allocation": int(
                governance["max_proposal_reward_treasury_allocation"]
            ),
            "proposal_reward_interval": int(
                governance["proposal_reward_interval"]
            ),
        },
    }

Returns global parameters of the whole commune ecosystem

def concat_to_local_keys( balance: dict[str, int], local_key_info: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount recorded for its address.

    Args:
        balance: Amounts keyed by address.
        local_key_info: Addresses keyed by local key name.

    Returns:
        A dictionary keyed by local key name, in the iteration order of
        `local_key_info`; addresses missing from `balance` get 0.
    """
    amount_by_name: dict[str, int] = {}
    for key_name, key_address in local_key_info.items():
        amount_by_name[key_name] = balance.get(key_address, 0)
    return amount_by_name
def local_keys_to_freebalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Looks up the free balance of each local key.

    Args:
        c_client: Client used to query the `System.Account` map.
        local_keys: Addresses keyed by local key name.

    Returns:
        Free balances keyed by local key name; keys whose address has no
        usable account entry get 0.
    """
    accounts = c_client.query_batch_map({"System": [("Account", [])]})[
        "Account"
    ]

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        # Skip entries that do not carry a "data" -> "free" field.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
def local_keys_to_stakedbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Sums, for each local key, the amounts listed for it in the stake-to map.

    Args:
        c_client: Client providing `query_map_staketo`.
        local_keys: Addresses keyed by local key name.

    Returns:
        Summed stake keyed by local key name; keys whose address is not in
        the map get 0.
    """
    staked_by_address: dict[str, int] = {}
    for address, stakes in c_client.query_map_staketo().items():
        # Each entry is a collection of pairs; only the second item (the
        # amount) is summed.
        staked_by_address[address] = sum(amount for _, amount in stakes)

    return concat_to_local_keys(staked_by_address, local_keys)
def local_keys_to_stakedfrom_balance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Sums, for each local key, the amounts listed for it in the stake-from map.

    Args:
        c_client: Client providing `query_map_stakefrom`.
        local_keys: Addresses keyed by local key name.

    Returns:
        Summed stake keyed by local key name; keys whose address is not in
        the map get 0.
    """
    stakefrom_map = c_client.query_map_stakefrom()

    # Each entry is a collection of pairs; only the second item (the amount)
    # is summed.
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value)
        for key, value in stakefrom_map.items()
    }

    # `concat_to_local_keys` already returns a fresh dict, so it is returned
    # directly instead of being copied item by item.
    return concat_to_local_keys(format_stake, local_keys)
def local_keys_allbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> tuple[dict[str, int], dict[str, int]]:
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns the free balance and the staked amount of each local key.

    Args:
        c_client: Client used to query `System.Account` and
            `SubspaceModule.StakeTo` in one batch.
        local_keys: Addresses keyed by local key name.

    Returns:
        A pair `(key2balance, key2stake)`, both keyed by local key name and
        both ordered from the largest amount to the smallest.
    """

    def by_amount_desc(amounts: dict[str, int]) -> dict[str, int]:
        # Rebuilds the dict with its items sorted by value, largest first.
        ranked = sorted(
            amounts.items(), key=lambda item: item[1], reverse=True
        )
        return dict(ranked)

    query_all = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [("StakeTo", [])],
        }
    )
    balance_map = query_all["Account"]
    staketo_map = transform_stake_dmap(query_all["StakeTo"])

    free_by_address: dict[str, int] = {}
    for address, account in balance_map.items():
        # Skip entries that do not carry a "data" -> "free" field.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]
    key2balance = concat_to_local_keys(free_by_address, local_keys)

    staked_by_address: dict[str, int] = {}
    for address, stakes in staketo_map.items():
        staked_by_address[address] = sum(amount for _, amount in stakes)
    key2stake = concat_to_local_keys(staked_by_address, local_keys)

    return by_amount_desc(key2balance), by_amount_desc(key2stake)