Edit on GitHub

communex.misc

  1import re
  2from typing import Any, TypeVar, cast
  3
  4from communex._common import transform_stake_dmap
  5from communex.balance import to_nano
  6from communex.client import CommuneClient
  7from communex.key import check_ss58_address
  8from communex.types import (
  9    BurnConfiguration,
 10    GovernanceConfiguration,
 11    ModuleInfoWithOptionalBalance,
 12    NetworkParams,
 13    Ss58Address,
 14    SubnetParamsMaps,
 15    SubnetParamsWithEmission,
 16)
 17
 18IPFS_REGEX = re.compile(r"^Qm[1-9A-HJ-NP-Za-km-z]{44}$")
 19
 20T = TypeVar("T")
 21
 22
 23def get_map_modules(
 24    client: CommuneClient,
 25    netuid: int = 0,
 26    include_balances: bool = False,
 27) -> dict[str, ModuleInfoWithOptionalBalance]:
 28    """
 29    Gets all modules info on the network
 30    """
 31
 32    request_dict: dict[Any, Any] = {
 33        "SubspaceModule": [
 34            ("StakeFrom", []),
 35            ("Keys", [netuid]),
 36            ("Name", [netuid]),
 37            ("Address", [netuid]),
 38            ("RegistrationBlock", [netuid]),
 39            ("ValidatorFeeConfig", []),
 40            ("Emission", []),
 41            ("Incentive", []),
 42            ("Dividends", []),
 43            ("LastUpdate", []),
 44            ("Metadata", [netuid]),
 45            ("StakeTo", []),
 46        ],
 47    }
 48    if include_balances:
 49        request_dict["System"] = [("Account", [])]
 50    bulk_query = client.query_batch_map(request_dict)
 51    (
 52        ss58_to_stakefrom,
 53        uid_to_key,
 54        uid_to_name,
 55        uid_to_address,
 56        uid_to_regblock,
 57        ss58_to_delegationfee,
 58        uid_to_emission,
 59        uid_to_incentive,
 60        uid_to_dividend,
 61        uid_to_lastupdate,
 62        ss58_to_balances,
 63        ss58_to_metadata,
 64    ) = (
 65        bulk_query.get("StakeFrom", {}),
 66        bulk_query.get("Keys", {}),
 67        bulk_query["Name"],
 68        bulk_query["Address"],
 69        bulk_query["RegistrationBlock"],
 70        bulk_query["ValidatorFeeConfig"],
 71        bulk_query["Emission"],
 72        bulk_query["Incentive"],
 73        bulk_query["Dividends"],
 74        bulk_query["LastUpdate"],
 75        bulk_query.get("Account", {}),
 76        bulk_query.get("Metadata", {}),
 77    )
 78    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 79    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
 80    for uid, key in uid_to_key.items():
 81        key = check_ss58_address(key)
 82        name = uid_to_name[uid]
 83        address = uid_to_address[uid]
 84        emission = uid_to_emission[netuid][uid]
 85        incentive = uid_to_incentive[netuid][uid]
 86        dividend = uid_to_dividend[netuid][uid]
 87        regblock = uid_to_regblock[uid]
 88        stake_from = ss58_to_stakefrom.get(key, [])
 89        last_update = uid_to_lastupdate[netuid][uid]
 90        stake_delegation_fee = ss58_to_delegationfee.get(key, {}).get(
 91            "stake_delegation_fee", 0
 92        )
 93        validator_weight_fee = ss58_to_delegationfee.get(key, {}).get(
 94            "validator_weight_fee", 0
 95        )
 96        metadata = ss58_to_metadata.get(key, None)
 97
 98        balance = None
 99        if include_balances and ss58_to_balances is not None:  # type: ignore
100            balance_dict = ss58_to_balances.get(key, None)
101            if balance_dict is not None:
102                assert isinstance(balance_dict["data"], dict)
103                balance = balance_dict["data"]["free"]
104            else:
105                balance = 0
106        stake = sum(stake for _, stake in stake_from)
107
108        module: ModuleInfoWithOptionalBalance = {
109            "uid": uid,
110            "key": key,
111            "name": name,
112            "address": address,
113            "emission": emission,
114            "incentive": incentive,
115            "dividends": dividend,
116            "stake_from": stake_from,
117            "regblock": regblock,
118            "last_update": last_update,
119            "balance": balance,
120            "stake": stake,
121            "stake_delegation_fee": stake_delegation_fee,
122            "validator_weight_fee": validator_weight_fee,
123            "metadata": metadata,
124        }
125
126        result_modules[key] = module
127    return result_modules
128
129
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of `d` whose camelCase keys are rewritten in snake_case.

    An underscore is inserted before every uppercase letter that is not the
    first character of the key, then the whole key is lowercased. Values
    are carried over untouched.
    """

    # Zero-width match in front of each uppercase letter, except at the
    # very start of the key.
    boundary = re.compile(r"(?<!^)(?=[A-Z])")

    converted: dict[str, T] = {}
    for camel_key, value in d.items():
        converted[boundary.sub("_", camel_key).lower()] = value
    return converted
140
141
def get_map_displayable_subnets(client: CommuneClient):
    """
    Fetches the parameters of every subnet and returns them after passing
    them through the CLI's `transform_subnet_params` helper.
    """
    # Imported at call time rather than at module import time.
    from communex.cli._common import transform_subnet_params

    return transform_subnet_params(get_map_subnets_params(client))
148
149
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Builds the parameter record of every subnet, keyed by netuid.

    All per-subnet storage maps are fetched with one batched query (passing
    `block_hash` through to the client). Subnets are enumerated from the
    "SubnetNames" map; maps read with a fallback supply a default value for
    subnets that have no entry in them.
    """
    storage_requests = {
        "SubspaceModule": [
            ("ImmunityPeriod", []),
            ("MinAllowedWeights", []),
            ("MaxAllowedWeights", []),
            ("Tempo", []),
            ("MaxAllowedUids", []),
            ("Founder", []),
            ("FounderShare", []),
            ("IncentiveRatio", []),
            ("SubnetNames", []),
            ("MaxWeightAge", []),
            ("BondsMovingAverage", []),
            ("MaximumSetWeightCallsPerEpoch", []),
            ("MinValidatorStake", []),
            ("MaxAllowedValidators", []),
            ("ModuleBurnConfig", []),
            ("SubnetMetadata", []),
            ("MaxEncryptionPeriod", []),
            ("CopierMargin", []),
            ("UseWeightsEncryption", []),
        ],
        "GovernanceModule": [
            ("SubnetGovernanceConfig", []),
        ],
        "SubnetEmissionModule": [
            ("SubnetEmission", []),
        ],
    }
    maps = client.query_batch_map(storage_requests, block_hash)

    # Required maps are indexed directly (KeyError when missing from the
    # response); optional ones fall back to an empty map.
    emission_of = maps["SubnetEmission"]
    tempo_of = maps["Tempo"]
    min_weights_of = maps["MinAllowedWeights"]
    max_weights_of = maps["MaxAllowedWeights"]
    max_uids_of = maps["MaxAllowedUids"]
    founder_of = maps["Founder"]
    founder_share_of = maps["FounderShare"]
    incentive_ratio_of = maps["IncentiveRatio"]
    name_of = maps["SubnetNames"]
    max_weight_age_of = maps["MaxWeightAge"]
    bonds_ma_of = maps.get("BondsMovingAverage", {})
    set_weight_calls_of = maps.get("MaximumSetWeightCallsPerEpoch", {})
    governance_of = maps["SubnetGovernanceConfig"]
    immunity_of = maps["ImmunityPeriod"]
    min_validator_stake_of = maps.get("MinValidatorStake", {})
    max_validators_of = maps.get("MaxAllowedValidators", {})
    burn_config_of = maps.get("ModuleBurnConfig", {})
    metadata_of = maps.get("SubnetMetadata", {})
    max_encryption_period_of = maps.get("MaxEncryptionPeriod", {})
    copier_margin_of = maps.get("CopierMargin", {})
    use_encryption_of = maps.get("UseWeightsEncryption", {})

    subnets: dict[int, SubnetParamsWithEmission] = {}
    for netuid, subnet_name in name_of.items():
        params: SubnetParamsWithEmission = {
            "name": subnet_name,
            "founder": founder_of[netuid],
            "founder_share": founder_share_of[netuid],
            "incentive_ratio": incentive_ratio_of[netuid],
            "max_allowed_uids": max_uids_of[netuid],
            "max_allowed_weights": max_weights_of[netuid],
            "min_allowed_weights": min_weights_of[netuid],
            "tempo": tempo_of[netuid],
            "emission": emission_of[netuid],
            "max_weight_age": max_weight_age_of[netuid],
            "bonds_ma": bonds_ma_of.get(netuid, None),
            "maximum_set_weight_calls_per_epoch": set_weight_calls_of.get(
                netuid, 30
            ),
            "governance_config": governance_of[netuid],
            "immunity_period": immunity_of[netuid],
            "min_validator_stake": min_validator_stake_of.get(
                netuid, to_nano(50_000)
            ),
            "max_allowed_validators": max_validators_of.get(netuid, 50),
            # The burn config may be None here; the cast only informs the
            # type checker and performs no runtime conversion.
            "module_burn_config": cast(
                BurnConfiguration, burn_config_of.get(netuid, None)
            ),
            "subnet_metadata": metadata_of.get(netuid, None),
            "max_encryption_period": max_encryption_period_of.get(netuid, 0),
            "copier_margin": copier_margin_of.get(netuid, 0),
            "use_weights_encryption": use_encryption_of.get(netuid, 0),
        }
        subnets[netuid] = params

    return subnets
278
279
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Fetches the network-wide parameters with one batched storage query.

    Scalar values are coerced with `int`, the curator is wrapped in
    `Ss58Address`, and the global governance configuration is copied field
    by field into the nested "governance_config" entry.
    """

    storage_requests = {
        "SubspaceModule": [
            ("MaxNameLength", []),
            ("MinNameLength", []),
            ("MaxAllowedSubnets", []),
            ("MaxAllowedModules", []),
            ("MaxRegistrationsPerBlock", []),
            ("MaxAllowedWeightsGlobal", []),
            ("FloorDelegationFee", []),
            ("FloorFounderShare", []),
            ("MinWeightStake", []),
            ("Kappa", []),
            ("Rho", []),
            ("SubnetImmunityPeriod", []),
            ("SubnetBurn", []),
        ],
        "GovernanceModule": [
            ("GlobalGovernanceConfig", []),
            ("GeneralSubnetApplicationCost", []),
            ("Curator", []),
        ],
    }
    values = c_client.query_batch(storage_requests)
    governance = cast(GovernanceConfiguration, values["GlobalGovernanceConfig"])

    def read_int(storage_name: str) -> int:
        # Look up one storage value in the batch result and coerce it.
        return int(values[storage_name])

    params: NetworkParams = {
        "max_allowed_subnets": read_int("MaxAllowedSubnets"),
        "max_allowed_modules": read_int("MaxAllowedModules"),
        "max_registrations_per_block": read_int("MaxRegistrationsPerBlock"),
        "max_name_length": read_int("MaxNameLength"),
        "min_weight_stake": read_int("MinWeightStake"),
        "floor_delegation_fee": read_int("FloorDelegationFee"),
        "max_allowed_weights": read_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(values["Curator"]),
        "min_name_length": read_int("MinNameLength"),
        "floor_founder_share": read_int("FloorFounderShare"),
        "general_subnet_application_cost": read_int(
            "GeneralSubnetApplicationCost"
        ),
        "kappa": read_int("Kappa"),
        "rho": read_int("Rho"),
        "subnet_immunity_period": read_int("SubnetImmunityPeriod"),
        "subnet_registration_cost": read_int("SubnetBurn"),
        "governance_config": {
            "proposal_cost": int(governance["proposal_cost"]),
            "proposal_expiration": int(governance["proposal_expiration"]),
            "vote_mode": governance["vote_mode"],
            "proposal_reward_treasury_allocation": int(
                governance["proposal_reward_treasury_allocation"]
            ),
            "max_proposal_reward_treasury_allocation": int(
                governance["max_proposal_reward_treasury_allocation"]
            ),
            "proposal_reward_interval": int(
                governance["proposal_reward_interval"]
            ),
        },
    }
    return params
348
349
def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount recorded for its address.

    `balance` is looked up by address; names whose address has no entry
    in it are reported with 0.
    """
    amount_by_name: dict[str, int] = {}
    for key_name, key_address in local_key_info.items():
        amount_by_name[key_name] = balance.get(key_address, 0)
    return amount_by_name
359
360
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns the free balance of every local key, keyed by key name.

    The whole "Account" map is fetched; entries lacking a "data" or
    "free" field are skipped, so the keys they belong to end up with 0.
    """
    account_map = c_client.query_batch_map(
        {
            "System": [("Account", [])],
        }
    )["Account"]

    free_by_address: dict[str, int] = {}
    for address, account in account_map.items():
        if "data" not in account:
            continue
        if "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
383
384
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the summed amounts of its "stake to" map
    entry. Keys without an entry are reported with 0.
    """
    total_by_address: dict[str, int] = {}
    for address, stakes in c_client.query_map_staketo().items():
        # Each entry is iterated as (_, amount) pairs; only amounts count.
        total_by_address[address] = sum(amount for _, amount in stakes)

    return concat_to_local_keys(total_by_address, local_keys)
399
400
def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the summed amounts of its "stake from" map
    entry.

    Args:
        c_client: Client used to fetch the stake-from map.
        local_keys: Mapping of local key name to its address.

    Returns:
        A dict from key name to the total of the amounts found for that
        key's address; keys without an entry are reported with 0.
    """
    stakefrom_map = c_client.query_map_stakefrom()

    # Each entry is iterated as (_, amount) pairs; only the amounts count.
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value)
        for key, value in stakefrom_map.items()
    }

    # concat_to_local_keys already returns a freshly built dict, so no
    # further copy is needed before handing it to the caller.
    return concat_to_local_keys(format_stake, local_keys)
415
416
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns (free balances, staked totals) for the local keys, each keyed
    by key name and ordered from the largest amount to the smallest.

    Both the "Account" and "StakeTo" maps come from one batched query.
    Keys missing from either map are reported with 0.
    """
    maps = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [
                ("StakeTo", []),
            ],
        }
    )
    account_map = maps["Account"]
    stake_to_map = transform_stake_dmap(maps["StakeTo"])

    # Accounts lacking a "data" or "free" field are left out entirely.
    free_by_address: dict[str, int] = {}
    for address, account in account_map.items():
        if "data" in account and "free" in account["data"]:
            free_by_address[address] = account["data"]["free"]
    balance_by_name = concat_to_local_keys(free_by_address, local_keys)

    staked_by_address: dict[str, int] = {}
    for address, stakes in stake_to_map.items():
        staked_by_address[address] = sum(amount for _, amount in stakes)
    stake_by_name = concat_to_local_keys(staked_by_address, local_keys)

    # Largest amounts first; ties keep their original relative order.
    balance_by_name = dict(
        sorted(balance_by_name.items(), key=lambda pair: pair[1], reverse=True)
    )
    stake_by_name = dict(
        sorted(stake_by_name.items(), key=lambda pair: pair[1], reverse=True)
    )

    return balance_by_name, stake_by_name
465
466
if __name__ == "__main__":
    # Manual smoke run: fetch the global parameters from the testnet node.
    from communex._common import get_node_url

    node_url = get_node_url(use_testnet=True)
    client = CommuneClient(node_url)
    get_global_params(client)
IPFS_REGEX = re.compile('^Qm[1-9A-HJ-NP-Za-km-z]{44}$')
def get_map_modules( client: communex.client.CommuneClient, netuid: int = 0, include_balances: bool = False) -> dict[str, communex.types.ModuleInfoWithOptionalBalance]:
 24def get_map_modules(
 25    client: CommuneClient,
 26    netuid: int = 0,
 27    include_balances: bool = False,
 28) -> dict[str, ModuleInfoWithOptionalBalance]:
 29    """
 30    Gets all modules info on the network
 31    """
 32
 33    request_dict: dict[Any, Any] = {
 34        "SubspaceModule": [
 35            ("StakeFrom", []),
 36            ("Keys", [netuid]),
 37            ("Name", [netuid]),
 38            ("Address", [netuid]),
 39            ("RegistrationBlock", [netuid]),
 40            ("ValidatorFeeConfig", []),
 41            ("Emission", []),
 42            ("Incentive", []),
 43            ("Dividends", []),
 44            ("LastUpdate", []),
 45            ("Metadata", [netuid]),
 46            ("StakeTo", []),
 47        ],
 48    }
 49    if include_balances:
 50        request_dict["System"] = [("Account", [])]
 51    bulk_query = client.query_batch_map(request_dict)
 52    (
 53        ss58_to_stakefrom,
 54        uid_to_key,
 55        uid_to_name,
 56        uid_to_address,
 57        uid_to_regblock,
 58        ss58_to_delegationfee,
 59        uid_to_emission,
 60        uid_to_incentive,
 61        uid_to_dividend,
 62        uid_to_lastupdate,
 63        ss58_to_balances,
 64        ss58_to_metadata,
 65    ) = (
 66        bulk_query.get("StakeFrom", {}),
 67        bulk_query.get("Keys", {}),
 68        bulk_query["Name"],
 69        bulk_query["Address"],
 70        bulk_query["RegistrationBlock"],
 71        bulk_query["ValidatorFeeConfig"],
 72        bulk_query["Emission"],
 73        bulk_query["Incentive"],
 74        bulk_query["Dividends"],
 75        bulk_query["LastUpdate"],
 76        bulk_query.get("Account", {}),
 77        bulk_query.get("Metadata", {}),
 78    )
 79    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 80    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
 81    for uid, key in uid_to_key.items():
 82        key = check_ss58_address(key)
 83        name = uid_to_name[uid]
 84        address = uid_to_address[uid]
 85        emission = uid_to_emission[netuid][uid]
 86        incentive = uid_to_incentive[netuid][uid]
 87        dividend = uid_to_dividend[netuid][uid]
 88        regblock = uid_to_regblock[uid]
 89        stake_from = ss58_to_stakefrom.get(key, [])
 90        last_update = uid_to_lastupdate[netuid][uid]
 91        stake_delegation_fee = ss58_to_delegationfee.get(key, {}).get(
 92            "stake_delegation_fee", 0
 93        )
 94        validator_weight_fee = ss58_to_delegationfee.get(key, {}).get(
 95            "validator_weight_fee", 0
 96        )
 97        metadata = ss58_to_metadata.get(key, None)
 98
 99        balance = None
100        if include_balances and ss58_to_balances is not None:  # type: ignore
101            balance_dict = ss58_to_balances.get(key, None)
102            if balance_dict is not None:
103                assert isinstance(balance_dict["data"], dict)
104                balance = balance_dict["data"]["free"]
105            else:
106                balance = 0
107        stake = sum(stake for _, stake in stake_from)
108
109        module: ModuleInfoWithOptionalBalance = {
110            "uid": uid,
111            "key": key,
112            "name": name,
113            "address": address,
114            "emission": emission,
115            "incentive": incentive,
116            "dividends": dividend,
117            "stake_from": stake_from,
118            "regblock": regblock,
119            "last_update": last_update,
120            "balance": balance,
121            "stake": stake,
122            "stake_delegation_fee": stake_delegation_fee,
123            "validator_weight_fee": validator_weight_fee,
124            "metadata": metadata,
125        }
126
127        result_modules[key] = module
128    return result_modules

Gets info for every module registered on the subnet identified by `netuid`.

def to_snake_case(d: dict[str, ~T]) -> dict[str, ~T]:
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of `d` whose camelCase keys are rewritten in snake_case.

    An underscore is inserted before every uppercase letter that is not the
    first character of the key, then the whole key is lowercased. Values
    are carried over untouched.
    """

    # Zero-width match in front of each uppercase letter, except at the
    # very start of the key.
    boundary = re.compile(r"(?<!^)(?=[A-Z])")

    converted: dict[str, T] = {}
    for camel_key, value in d.items():
        converted[boundary.sub("_", camel_key).lower()] = value
    return converted

Converts a dictionary with camelCase keys to snake_case keys

def get_map_displayable_subnets(client: communex.client.CommuneClient):
def get_map_displayable_subnets(client: CommuneClient):
    """
    Fetches the parameters of every subnet and returns them after passing
    them through the CLI's `transform_subnet_params` helper.
    """
    # Imported at call time rather than at module import time.
    from communex.cli._common import transform_subnet_params

    return transform_subnet_params(get_map_subnets_params(client))
def get_map_subnets_params( client: communex.client.CommuneClient, block_hash: str | None = None) -> dict[int, communex.types.SubnetParamsWithEmission]:
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Builds the parameter record of every subnet, keyed by netuid.

    All per-subnet storage maps are fetched with one batched query (passing
    `block_hash` through to the client). Subnets are enumerated from the
    "SubnetNames" map; maps read with a fallback supply a default value for
    subnets that have no entry in them.
    """
    storage_requests = {
        "SubspaceModule": [
            ("ImmunityPeriod", []),
            ("MinAllowedWeights", []),
            ("MaxAllowedWeights", []),
            ("Tempo", []),
            ("MaxAllowedUids", []),
            ("Founder", []),
            ("FounderShare", []),
            ("IncentiveRatio", []),
            ("SubnetNames", []),
            ("MaxWeightAge", []),
            ("BondsMovingAverage", []),
            ("MaximumSetWeightCallsPerEpoch", []),
            ("MinValidatorStake", []),
            ("MaxAllowedValidators", []),
            ("ModuleBurnConfig", []),
            ("SubnetMetadata", []),
            ("MaxEncryptionPeriod", []),
            ("CopierMargin", []),
            ("UseWeightsEncryption", []),
        ],
        "GovernanceModule": [
            ("SubnetGovernanceConfig", []),
        ],
        "SubnetEmissionModule": [
            ("SubnetEmission", []),
        ],
    }
    maps = client.query_batch_map(storage_requests, block_hash)

    # Required maps are indexed directly (KeyError when missing from the
    # response); optional ones fall back to an empty map.
    emission_of = maps["SubnetEmission"]
    tempo_of = maps["Tempo"]
    min_weights_of = maps["MinAllowedWeights"]
    max_weights_of = maps["MaxAllowedWeights"]
    max_uids_of = maps["MaxAllowedUids"]
    founder_of = maps["Founder"]
    founder_share_of = maps["FounderShare"]
    incentive_ratio_of = maps["IncentiveRatio"]
    name_of = maps["SubnetNames"]
    max_weight_age_of = maps["MaxWeightAge"]
    bonds_ma_of = maps.get("BondsMovingAverage", {})
    set_weight_calls_of = maps.get("MaximumSetWeightCallsPerEpoch", {})
    governance_of = maps["SubnetGovernanceConfig"]
    immunity_of = maps["ImmunityPeriod"]
    min_validator_stake_of = maps.get("MinValidatorStake", {})
    max_validators_of = maps.get("MaxAllowedValidators", {})
    burn_config_of = maps.get("ModuleBurnConfig", {})
    metadata_of = maps.get("SubnetMetadata", {})
    max_encryption_period_of = maps.get("MaxEncryptionPeriod", {})
    copier_margin_of = maps.get("CopierMargin", {})
    use_encryption_of = maps.get("UseWeightsEncryption", {})

    subnets: dict[int, SubnetParamsWithEmission] = {}
    for netuid, subnet_name in name_of.items():
        params: SubnetParamsWithEmission = {
            "name": subnet_name,
            "founder": founder_of[netuid],
            "founder_share": founder_share_of[netuid],
            "incentive_ratio": incentive_ratio_of[netuid],
            "max_allowed_uids": max_uids_of[netuid],
            "max_allowed_weights": max_weights_of[netuid],
            "min_allowed_weights": min_weights_of[netuid],
            "tempo": tempo_of[netuid],
            "emission": emission_of[netuid],
            "max_weight_age": max_weight_age_of[netuid],
            "bonds_ma": bonds_ma_of.get(netuid, None),
            "maximum_set_weight_calls_per_epoch": set_weight_calls_of.get(
                netuid, 30
            ),
            "governance_config": governance_of[netuid],
            "immunity_period": immunity_of[netuid],
            "min_validator_stake": min_validator_stake_of.get(
                netuid, to_nano(50_000)
            ),
            "max_allowed_validators": max_validators_of.get(netuid, 50),
            # The burn config may be None here; the cast only informs the
            # type checker and performs no runtime conversion.
            "module_burn_config": cast(
                BurnConfiguration, burn_config_of.get(netuid, None)
            ),
            "subnet_metadata": metadata_of.get(netuid, None),
            "max_encryption_period": max_encryption_period_of.get(netuid, 0),
            "copier_margin": copier_margin_of.get(netuid, 0),
            "use_weights_encryption": use_encryption_of.get(netuid, 0),
        }
        subnets[netuid] = params

    return subnets

Gets the parameters of every subnet on the network.

def get_global_params(c_client: communex.client.CommuneClient) -> communex.types.NetworkParams:
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Fetches the network-wide parameters with one batched storage query.

    Scalar values are coerced with `int`, the curator is wrapped in
    `Ss58Address`, and the global governance configuration is copied field
    by field into the nested "governance_config" entry.
    """

    storage_requests = {
        "SubspaceModule": [
            ("MaxNameLength", []),
            ("MinNameLength", []),
            ("MaxAllowedSubnets", []),
            ("MaxAllowedModules", []),
            ("MaxRegistrationsPerBlock", []),
            ("MaxAllowedWeightsGlobal", []),
            ("FloorDelegationFee", []),
            ("FloorFounderShare", []),
            ("MinWeightStake", []),
            ("Kappa", []),
            ("Rho", []),
            ("SubnetImmunityPeriod", []),
            ("SubnetBurn", []),
        ],
        "GovernanceModule": [
            ("GlobalGovernanceConfig", []),
            ("GeneralSubnetApplicationCost", []),
            ("Curator", []),
        ],
    }
    values = c_client.query_batch(storage_requests)
    governance = cast(GovernanceConfiguration, values["GlobalGovernanceConfig"])

    def read_int(storage_name: str) -> int:
        # Look up one storage value in the batch result and coerce it.
        return int(values[storage_name])

    params: NetworkParams = {
        "max_allowed_subnets": read_int("MaxAllowedSubnets"),
        "max_allowed_modules": read_int("MaxAllowedModules"),
        "max_registrations_per_block": read_int("MaxRegistrationsPerBlock"),
        "max_name_length": read_int("MaxNameLength"),
        "min_weight_stake": read_int("MinWeightStake"),
        "floor_delegation_fee": read_int("FloorDelegationFee"),
        "max_allowed_weights": read_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(values["Curator"]),
        "min_name_length": read_int("MinNameLength"),
        "floor_founder_share": read_int("FloorFounderShare"),
        "general_subnet_application_cost": read_int(
            "GeneralSubnetApplicationCost"
        ),
        "kappa": read_int("Kappa"),
        "rho": read_int("Rho"),
        "subnet_immunity_period": read_int("SubnetImmunityPeriod"),
        "subnet_registration_cost": read_int("SubnetBurn"),
        "governance_config": {
            "proposal_cost": int(governance["proposal_cost"]),
            "proposal_expiration": int(governance["proposal_expiration"]),
            "vote_mode": governance["vote_mode"],
            "proposal_reward_treasury_allocation": int(
                governance["proposal_reward_treasury_allocation"]
            ),
            "max_proposal_reward_treasury_allocation": int(
                governance["max_proposal_reward_treasury_allocation"]
            ),
            "proposal_reward_interval": int(
                governance["proposal_reward_interval"]
            ),
        },
    }
    return params

Returns global parameters of the whole commune ecosystem

def concat_to_local_keys( balance: dict[str, int], local_key_info: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount recorded for its address.

    `balance` is looked up by address; names whose address has no entry
    in it are reported with 0.
    """
    amount_by_name: dict[str, int] = {}
    for key_name, key_address in local_key_info.items():
        amount_by_name[key_name] = balance.get(key_address, 0)
    return amount_by_name
def local_keys_to_freebalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns the free balance of every local key, keyed by key name.

    The whole "Account" map is fetched; entries lacking a "data" or
    "free" field are skipped, so the keys they belong to end up with 0.
    """
    account_map = c_client.query_batch_map(
        {
            "System": [("Account", [])],
        }
    )["Account"]

    free_by_address: dict[str, int] = {}
    for address, account in account_map.items():
        if "data" not in account:
            continue
        if "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
def local_keys_to_stakedbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the summed amounts of its "stake to" map
    entry. Keys without an entry are reported with 0.
    """
    total_by_address: dict[str, int] = {}
    for address, stakes in c_client.query_map_staketo().items():
        # Each entry is iterated as (_, amount) pairs; only amounts count.
        total_by_address[address] = sum(amount for _, amount in stakes)

    return concat_to_local_keys(total_by_address, local_keys)
def local_keys_to_stakedfrom_balance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the summed amounts of its "stake from" map
    entry.

    Args:
        c_client: Client used to fetch the stake-from map.
        local_keys: Mapping of local key name to its address.

    Returns:
        A dict from key name to the total of the amounts found for that
        key's address; keys without an entry are reported with 0.
    """
    stakefrom_map = c_client.query_map_stakefrom()

    # Each entry is iterated as (_, amount) pairs; only the amounts count.
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value)
        for key, value in stakefrom_map.items()
    }

    # concat_to_local_keys already returns a freshly built dict, so no
    # further copy is needed before handing it to the caller.
    return concat_to_local_keys(format_stake, local_keys)
def local_keys_allbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> tuple[dict[str, int], dict[str, int]]:
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns (free balances, staked totals) for the local keys, each keyed
    by key name and ordered from the largest amount to the smallest.

    Both the "Account" and "StakeTo" maps come from one batched query.
    Keys missing from either map are reported with 0.
    """
    maps = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [
                ("StakeTo", []),
            ],
        }
    )
    account_map = maps["Account"]
    stake_to_map = transform_stake_dmap(maps["StakeTo"])

    # Accounts lacking a "data" or "free" field are left out entirely.
    free_by_address: dict[str, int] = {}
    for address, account in account_map.items():
        if "data" in account and "free" in account["data"]:
            free_by_address[address] = account["data"]["free"]
    balance_by_name = concat_to_local_keys(free_by_address, local_keys)

    staked_by_address: dict[str, int] = {}
    for address, stakes in stake_to_map.items():
        staked_by_address[address] = sum(amount for _, amount in stakes)
    stake_by_name = concat_to_local_keys(staked_by_address, local_keys)

    # Largest amounts first; ties keep their original relative order.
    balance_by_name = dict(
        sorted(balance_by_name.items(), key=lambda pair: pair[1], reverse=True)
    )
    stake_by_name = dict(
        sorted(stake_by_name.items(), key=lambda pair: pair[1], reverse=True)
    )

    return balance_by_name, stake_by_name