Edit on GitHub

communex.misc

  1import re
  2from typing import Any, TypeVar, cast
  3
  4from communex._common import transform_stake_dmap
  5from communex.balance import to_nano
  6from communex.client import CommuneClient
  7from communex.key import check_ss58_address
  8from communex.types import (
  9    BurnConfiguration, ModuleInfoWithOptionalBalance,
 10    NetworkParams, Ss58Address, SubnetParamsMaps,
 11    SubnetParamsWithEmission, GovernanceConfiguration
 12    )
 13
# Matches a string made of "Qm" followed by exactly 44 characters from the
# base58 alphabet (digits and letters without 0, O, I and l).
# NOTE(review): this looks like the CIDv0 form of an IPFS hash — confirm.
IPFS_REGEX = re.compile(r"^Qm[1-9A-HJ-NP-Za-km-z]{44}$")

# Generic value type; lets `to_snake_case` keep the value type of its input.
T = TypeVar("T")
 17
 18
 19def get_map_modules(
 20    client: CommuneClient,
 21    netuid: int = 0,
 22    include_balances: bool = False,
 23) -> dict[str, ModuleInfoWithOptionalBalance]:
 24    """
 25    Gets all modules info on the network
 26    """
 27
 28    request_dict: dict[Any, Any] = {
 29        "SubspaceModule": [
 30            ("StakeFrom", []),
 31            ("Keys", [netuid]),
 32            ("Name", [netuid]),
 33            ("Address", [netuid]),
 34            ("RegistrationBlock", [netuid]),
 35            ("DelegationFee", []),
 36            ("Emission", []),
 37            ("Incentive", []),
 38            ("Dividends", []),
 39            ("LastUpdate", []),
 40            ("Metadata", [netuid]),
 41        ],
 42    }
 43    if include_balances:
 44        request_dict["System"] = [("Account", [])]
 45
 46    bulk_query = client.query_batch_map(request_dict)
 47    (
 48        ss58_to_stakefrom,
 49        uid_to_key,
 50        uid_to_name,
 51        uid_to_address,
 52        uid_to_regblock,
 53        ss58_to_delegationfee,
 54        uid_to_emission,
 55        uid_to_incentive,
 56        uid_to_dividend,
 57        uid_to_lastupdate,
 58        ss58_to_balances,
 59        ss58_to_metadata,
 60    ) = (
 61        bulk_query.get("StakeFrom", {}),
 62        bulk_query.get("Keys", {}),
 63        bulk_query["Name"],
 64        bulk_query["Address"],
 65        bulk_query["RegistrationBlock"],
 66        bulk_query["DelegationFee"],
 67        bulk_query["Emission"],
 68        bulk_query["Incentive"],
 69        bulk_query["Dividends"],
 70        bulk_query["LastUpdate"],
 71        bulk_query.get("Account", {}),
 72        bulk_query.get("Metadata", {}),
 73    )
 74    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 75    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
 76    for uid, key in uid_to_key.items():
 77        key = check_ss58_address(key)
 78        name = uid_to_name[uid]
 79        address = uid_to_address[uid]
 80        emission = uid_to_emission[netuid][uid]
 81        incentive = uid_to_incentive[netuid][uid]
 82        dividend = uid_to_dividend[netuid][uid]
 83        regblock = uid_to_regblock[uid]
 84        stake_from = ss58_to_stakefrom.get(key, [])
 85        last_update = uid_to_lastupdate[netuid][uid]
 86        delegation_fee = ss58_to_delegationfee.get(
 87            key, 5
 88        )  # 5% default delegation fee
 89        metadata = ss58_to_metadata.get(key, None)
 90
 91        balance = None
 92        if include_balances and ss58_to_balances is not None:  # type: ignore
 93            balance_dict = ss58_to_balances.get(key, None)
 94            if balance_dict is not None:
 95                assert isinstance(balance_dict["data"], dict)
 96                balance = balance_dict["data"]["free"]
 97            else:
 98                balance = 0
 99        stake = sum(stake for _, stake in stake_from)
100
101        module: ModuleInfoWithOptionalBalance = {
102            "uid": uid,
103            "key": key,
104            "name": name,
105            "address": address,
106            "emission": emission,
107            "incentive": incentive,
108            "dividends": dividend,
109            "stake_from": stake_from,
110            "regblock": regblock,
111            "last_update": last_update,
112            "balance": balance,
113            "stake": stake,
114            "delegation_fee": delegation_fee,
115            "metadata": metadata,
116        }
117
118        result_modules[key] = module
119    return result_modules
120
121
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of `d` whose camelCase keys are rewritten as snake_case.

    An underscore is inserted before every uppercase letter except one at the
    very start of the key, then the key is lowercased. Values are kept as-is.
    """
    converted: dict[str, T] = {}
    for original_key, value in d.items():
        with_underscores = re.sub(r'(?<!^)(?=[A-Z])', '_', original_key)
        converted[with_underscores.lower()] = value
    return converted
130
131
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Collects the parameters of every subnet on the network.

    All per-subnet storages are fetched in one batched map query (passing
    `block_hash` through to the client) and regrouped into one record per
    netuid found in `SubnetNames`.

    Storages read with a fallback contribute a default when a subnet has no
    entry: None for `bonds_ma`, `module_burn_config` and `subnet_metadata`,
    30 for `maximum_set_weight_calls_per_epoch`, `to_nano(50_000)` for
    `min_validator_stake` and 50 for `max_allowed_validators`. Every other
    storage must hold an entry for each subnet, otherwise KeyError is raised.
    """
    storages = client.query_batch_map(
        {
            "SubspaceModule": [
                ("ImmunityPeriod", []),
                ("MinAllowedWeights", []),
                ("MaxAllowedWeights", []),
                ("Tempo", []),
                ("MaxAllowedUids", []),
                ("Founder", []),
                ("FounderShare", []),
                ("IncentiveRatio", []),
                ("TrustRatio", []),
                ("SubnetNames", []),
                ("MaxWeightAge", []),
                ("BondsMovingAverage", []),
                ("MaximumSetWeightCallsPerEpoch", []),
                ("MinValidatorStake", []),
                ("MaxAllowedValidators", []),
                ("ModuleBurnConfig", []),
                ("SubnetMetadata", []),
            ],
            "GovernanceModule": [
                ("SubnetGovernanceConfig", []),
            ],
            "SubnetEmissionModule": [
                ("SubnetEmission", []),
            ],

        },
        block_hash,
    )

    # Storages indexed directly are mandatory; the ones read through `.get`
    # may be absent from the query result altogether.
    by_netuid: SubnetParamsMaps = {
        "netuid_to_emission": storages["SubnetEmission"],
        "netuid_to_tempo": storages["Tempo"],
        "netuid_to_min_allowed_weights": storages["MinAllowedWeights"],
        "netuid_to_max_allowed_weights": storages["MaxAllowedWeights"],
        "netuid_to_max_allowed_uids": storages["MaxAllowedUids"],
        "netuid_to_founder": storages["Founder"],
        "netuid_to_founder_share": storages["FounderShare"],
        "netuid_to_incentive_ratio": storages["IncentiveRatio"],
        "netuid_to_trust_ratio": storages["TrustRatio"],
        "netuid_to_name": storages["SubnetNames"],
        "netuid_to_max_weight_age": storages["MaxWeightAge"],
        "netuid_to_bonds_ma": storages.get("BondsMovingAverage", {}),
        "netuid_to_maximum_set_weight_calls_per_epoch": storages.get("MaximumSetWeightCallsPerEpoch", {}),
        "netuid_to_governance_configuration": storages["SubnetGovernanceConfig"],
        "netuid_to_immunity_period": storages["ImmunityPeriod"],
        "netuid_to_min_validator_stake": storages.get("MinValidatorStake", {}),
        "netuid_to_max_allowed_validators": storages.get("MaxAllowedValidators", {}),
        "netuid_to_module_burn_config": storages.get("ModuleBurnConfig", {}),
        "netuid_to_subnet_metadata": storages.get("SubnetMetadata", {}),
    }

    def describe(netuid: int, name: Any) -> SubnetParamsWithEmission:
        # Assemble the record of a single subnet from the per-storage maps.
        return {
            "name": name,
            "founder": by_netuid["netuid_to_founder"][netuid],
            "founder_share": by_netuid["netuid_to_founder_share"][netuid],
            "incentive_ratio": by_netuid["netuid_to_incentive_ratio"][netuid],
            "max_allowed_uids": by_netuid["netuid_to_max_allowed_uids"][netuid],
            "max_allowed_weights": by_netuid["netuid_to_max_allowed_weights"][netuid],
            "min_allowed_weights": by_netuid["netuid_to_min_allowed_weights"][netuid],
            "tempo": by_netuid["netuid_to_tempo"][netuid],
            "trust_ratio": by_netuid["netuid_to_trust_ratio"][netuid],
            "emission": by_netuid["netuid_to_emission"][netuid],
            "max_weight_age": by_netuid["netuid_to_max_weight_age"][netuid],
            "bonds_ma": by_netuid["netuid_to_bonds_ma"].get(netuid, None),
            "maximum_set_weight_calls_per_epoch": by_netuid["netuid_to_maximum_set_weight_calls_per_epoch"].get(netuid, 30),
            "governance_config": by_netuid["netuid_to_governance_configuration"][netuid],
            "immunity_period": by_netuid["netuid_to_immunity_period"][netuid],
            "min_validator_stake": by_netuid["netuid_to_min_validator_stake"].get(netuid, to_nano(50_000)),
            "max_allowed_validators": by_netuid["netuid_to_max_allowed_validators"].get(netuid, 50),
            "module_burn_config": cast(BurnConfiguration, by_netuid["netuid_to_module_burn_config"].get(netuid, None)),
            "subnet_metadata": by_netuid["netuid_to_subnet_metadata"].get(netuid, None),
        }

    return {
        netuid: describe(netuid, name)
        for netuid, name in by_netuid["netuid_to_name"].items()
    }
219
220
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Reads the network-wide parameters in a single batched query.

    Numeric storages are coerced with `int`, the curator is wrapped in
    `Ss58Address`, and the global governance configuration is copied field
    by field (its `vote_mode` is passed through unchanged).
    """

    raw = c_client.query_batch(
        {
            "SubspaceModule": [
                ("MaxNameLength", []),
                ("MinNameLength", []),
                ("MaxAllowedSubnets", []),
                ("MaxAllowedModules", []),
                ("MaxRegistrationsPerBlock", []),
                ("MaxAllowedWeightsGlobal", []),
                ("FloorDelegationFee", []),
                ("FloorFounderShare", []),
                ("MinWeightStake", []),
                ("Kappa", []),
                ("Rho", []),
                ("SubnetImmunityPeriod", []),
            ],
            "GovernanceModule": [
                ("GlobalGovernanceConfig", []),
                ("GeneralSubnetApplicationCost", []),
                ("Curator", []),
            ]
        }
    )
    governance = cast(GovernanceConfiguration, raw["GlobalGovernanceConfig"])

    def as_int(storage_name: str) -> int:
        # Look up one storage value and coerce it to int.
        return int(raw[storage_name])

    params: NetworkParams = {
        "max_allowed_subnets": as_int("MaxAllowedSubnets"),
        "max_allowed_modules": as_int("MaxAllowedModules"),
        "max_registrations_per_block": as_int("MaxRegistrationsPerBlock"),
        "max_name_length": as_int("MaxNameLength"),
        "min_weight_stake": as_int("MinWeightStake"),
        "floor_delegation_fee": as_int("FloorDelegationFee"),
        "max_allowed_weights": as_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(raw["Curator"]),
        "min_name_length": as_int("MinNameLength"),
        "floor_founder_share": as_int("FloorFounderShare"),
        "general_subnet_application_cost": as_int("GeneralSubnetApplicationCost"),
        "kappa": as_int("Kappa"),
        "rho": as_int("Rho"),
        "subnet_immunity_period": as_int("SubnetImmunityPeriod"),
        "governance_config": {
            "proposal_cost": int(governance["proposal_cost"]),
            "proposal_expiration": int(governance["proposal_expiration"]),
            "vote_mode": governance["vote_mode"],
            "proposal_reward_treasury_allocation": int(governance["proposal_reward_treasury_allocation"]),
            "max_proposal_reward_treasury_allocation": int(governance["max_proposal_reward_treasury_allocation"]),
            "proposal_reward_interval": int(governance["proposal_reward_interval"])
        }
    }
    return params
278
279
def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount recorded for its address.

    Addresses that do not appear in `balance` are reported as 0. The result
    follows the iteration order of `local_key_info`.
    """
    amounts_by_name: dict[str, int] = {}
    for name, address in local_key_info.items():
        amounts_by_name[name] = balance.get(address, 0)
    return amounts_by_name
289
290
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Looks up the free balance of every local key.

    Queries the whole `System.Account` map, keeps the `data.free` value of
    each account that has one, and maps it onto the local key names (0 for
    keys without a usable account entry).
    """
    accounts = c_client.query_batch_map(
        {
            "System": [("Account", [])],
        }
    )["Account"]

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        # Skip accounts that lack the data/free fields.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
311
312
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Totals, per local key, the amounts listed for its address in the map
    returned by `query_map_staketo`.

    Each map value is iterated as pairs whose second element is summed;
    local keys whose address is absent from the map get 0.
    """
    totals: dict[str, int] = {}
    for address, entries in c_client.query_map_staketo().items():
        totals[address] = sum(amount for _, amount in entries)

    return concat_to_local_keys(totals, local_keys)
326
327
def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Totals, per local key, the amounts listed for its address in the map
    returned by `query_map_stakefrom`.

    Each map value is iterated as pairs whose second element is summed;
    local keys whose address is absent from the map get 0.
    """
    stakefrom_map = c_client.query_map_stakefrom()

    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value) for key, value in stakefrom_map.items()
    }

    # `concat_to_local_keys` already returns a fresh dict, so it is returned
    # directly instead of being copied entry by entry a second time.
    return concat_to_local_keys(format_stake, local_keys)
341
342
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns the free balances and the summed `StakeTo` amounts of the local
    keys, fetched in one batched query.

    Both dicts are keyed by local key name and ordered by amount, largest
    first. Keys with no matching entry are reported with 0.
    """
    raw = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [
                ("StakeTo", []),
            ],
        }
    )
    accounts = raw["Account"]
    stake_to = transform_stake_dmap(raw["StakeTo"])

    def by_amount_desc(amounts: dict[str, int]) -> dict[str, int]:
        # Rebuild the dict with its entries sorted by value, largest first.
        return dict(sorted(amounts.items(), key=lambda pair: pair[1], reverse=True))

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        # Skip accounts that lack the data/free fields.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]
    name_to_balance = concat_to_local_keys(free_by_address, local_keys)

    stake_by_address: dict[str, int] = {}
    for address, entries in stake_to.items():
        stake_by_address[address] = sum(amount for _, amount in entries)
    name_to_stake = concat_to_local_keys(stake_by_address, local_keys)

    return by_amount_desc(name_to_balance), by_amount_desc(name_to_stake)
381
382
# Manual smoke run: executing this module as a script builds a client from
# `get_node_url(use_testnet=True)` and queries the global parameters once;
# the returned value is discarded.
if __name__ == "__main__":
    # Imported here so the helper is only loaded when run as a script.
    from communex._common import get_node_url
    client = CommuneClient(get_node_url(use_testnet=True))
    get_global_params(client)
# Matches "Qm" followed by exactly 44 characters from the base58 alphabet.
# NOTE(review): this looks like the CIDv0 form of an IPFS hash — confirm.
IPFS_REGEX = re.compile('^Qm[1-9A-HJ-NP-Za-km-z]{44}$')
def get_map_modules( client: communex.client.CommuneClient, netuid: int = 0, include_balances: bool = False) -> dict[str, communex.types.ModuleInfoWithOptionalBalance]:
 20def get_map_modules(
 21    client: CommuneClient,
 22    netuid: int = 0,
 23    include_balances: bool = False,
 24) -> dict[str, ModuleInfoWithOptionalBalance]:
 25    """
 26    Gets all modules info on the network
 27    """
 28
 29    request_dict: dict[Any, Any] = {
 30        "SubspaceModule": [
 31            ("StakeFrom", []),
 32            ("Keys", [netuid]),
 33            ("Name", [netuid]),
 34            ("Address", [netuid]),
 35            ("RegistrationBlock", [netuid]),
 36            ("DelegationFee", []),
 37            ("Emission", []),
 38            ("Incentive", []),
 39            ("Dividends", []),
 40            ("LastUpdate", []),
 41            ("Metadata", [netuid]),
 42        ],
 43    }
 44    if include_balances:
 45        request_dict["System"] = [("Account", [])]
 46
 47    bulk_query = client.query_batch_map(request_dict)
 48    (
 49        ss58_to_stakefrom,
 50        uid_to_key,
 51        uid_to_name,
 52        uid_to_address,
 53        uid_to_regblock,
 54        ss58_to_delegationfee,
 55        uid_to_emission,
 56        uid_to_incentive,
 57        uid_to_dividend,
 58        uid_to_lastupdate,
 59        ss58_to_balances,
 60        ss58_to_metadata,
 61    ) = (
 62        bulk_query.get("StakeFrom", {}),
 63        bulk_query.get("Keys", {}),
 64        bulk_query["Name"],
 65        bulk_query["Address"],
 66        bulk_query["RegistrationBlock"],
 67        bulk_query["DelegationFee"],
 68        bulk_query["Emission"],
 69        bulk_query["Incentive"],
 70        bulk_query["Dividends"],
 71        bulk_query["LastUpdate"],
 72        bulk_query.get("Account", {}),
 73        bulk_query.get("Metadata", {}),
 74    )
 75    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 76    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
 77    for uid, key in uid_to_key.items():
 78        key = check_ss58_address(key)
 79        name = uid_to_name[uid]
 80        address = uid_to_address[uid]
 81        emission = uid_to_emission[netuid][uid]
 82        incentive = uid_to_incentive[netuid][uid]
 83        dividend = uid_to_dividend[netuid][uid]
 84        regblock = uid_to_regblock[uid]
 85        stake_from = ss58_to_stakefrom.get(key, [])
 86        last_update = uid_to_lastupdate[netuid][uid]
 87        delegation_fee = ss58_to_delegationfee.get(
 88            key, 5
 89        )  # 5% default delegation fee
 90        metadata = ss58_to_metadata.get(key, None)
 91
 92        balance = None
 93        if include_balances and ss58_to_balances is not None:  # type: ignore
 94            balance_dict = ss58_to_balances.get(key, None)
 95            if balance_dict is not None:
 96                assert isinstance(balance_dict["data"], dict)
 97                balance = balance_dict["data"]["free"]
 98            else:
 99                balance = 0
100        stake = sum(stake for _, stake in stake_from)
101
102        module: ModuleInfoWithOptionalBalance = {
103            "uid": uid,
104            "key": key,
105            "name": name,
106            "address": address,
107            "emission": emission,
108            "incentive": incentive,
109            "dividends": dividend,
110            "stake_from": stake_from,
111            "regblock": regblock,
112            "last_update": last_update,
113            "balance": balance,
114            "stake": stake,
115            "delegation_fee": delegation_fee,
116            "metadata": metadata,
117        }
118
119        result_modules[key] = module
120    return result_modules

Gets all modules info on the network

def to_snake_case(d: dict[str, ~T]) -> dict[str, ~T]:
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of `d` whose camelCase keys are rewritten as snake_case.

    An underscore is inserted before every uppercase letter except one at the
    very start of the key, then the key is lowercased. Values are kept as-is.
    """
    converted: dict[str, T] = {}
    for original_key, value in d.items():
        with_underscores = re.sub(r'(?<!^)(?=[A-Z])', '_', original_key)
        converted[with_underscores.lower()] = value
    return converted

Converts a dictionary with camelCase keys to snake_case keys

def get_map_subnets_params( client: communex.client.CommuneClient, block_hash: str | None = None) -> dict[int, communex.types.SubnetParamsWithEmission]:
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Collects the parameters of every subnet on the network.

    All per-subnet storages are fetched in one batched map query (passing
    `block_hash` through to the client) and regrouped into one record per
    netuid found in `SubnetNames`.

    Storages read with a fallback contribute a default when a subnet has no
    entry: None for `bonds_ma`, `module_burn_config` and `subnet_metadata`,
    30 for `maximum_set_weight_calls_per_epoch`, `to_nano(50_000)` for
    `min_validator_stake` and 50 for `max_allowed_validators`. Every other
    storage must hold an entry for each subnet, otherwise KeyError is raised.
    """
    storages = client.query_batch_map(
        {
            "SubspaceModule": [
                ("ImmunityPeriod", []),
                ("MinAllowedWeights", []),
                ("MaxAllowedWeights", []),
                ("Tempo", []),
                ("MaxAllowedUids", []),
                ("Founder", []),
                ("FounderShare", []),
                ("IncentiveRatio", []),
                ("TrustRatio", []),
                ("SubnetNames", []),
                ("MaxWeightAge", []),
                ("BondsMovingAverage", []),
                ("MaximumSetWeightCallsPerEpoch", []),
                ("MinValidatorStake", []),
                ("MaxAllowedValidators", []),
                ("ModuleBurnConfig", []),
                ("SubnetMetadata", []),
            ],
            "GovernanceModule": [
                ("SubnetGovernanceConfig", []),
            ],
            "SubnetEmissionModule": [
                ("SubnetEmission", []),
            ],

        },
        block_hash,
    )

    # Storages indexed directly are mandatory; the ones read through `.get`
    # may be absent from the query result altogether.
    by_netuid: SubnetParamsMaps = {
        "netuid_to_emission": storages["SubnetEmission"],
        "netuid_to_tempo": storages["Tempo"],
        "netuid_to_min_allowed_weights": storages["MinAllowedWeights"],
        "netuid_to_max_allowed_weights": storages["MaxAllowedWeights"],
        "netuid_to_max_allowed_uids": storages["MaxAllowedUids"],
        "netuid_to_founder": storages["Founder"],
        "netuid_to_founder_share": storages["FounderShare"],
        "netuid_to_incentive_ratio": storages["IncentiveRatio"],
        "netuid_to_trust_ratio": storages["TrustRatio"],
        "netuid_to_name": storages["SubnetNames"],
        "netuid_to_max_weight_age": storages["MaxWeightAge"],
        "netuid_to_bonds_ma": storages.get("BondsMovingAverage", {}),
        "netuid_to_maximum_set_weight_calls_per_epoch": storages.get("MaximumSetWeightCallsPerEpoch", {}),
        "netuid_to_governance_configuration": storages["SubnetGovernanceConfig"],
        "netuid_to_immunity_period": storages["ImmunityPeriod"],
        "netuid_to_min_validator_stake": storages.get("MinValidatorStake", {}),
        "netuid_to_max_allowed_validators": storages.get("MaxAllowedValidators", {}),
        "netuid_to_module_burn_config": storages.get("ModuleBurnConfig", {}),
        "netuid_to_subnet_metadata": storages.get("SubnetMetadata", {}),
    }

    def describe(netuid: int, name: Any) -> SubnetParamsWithEmission:
        # Assemble the record of a single subnet from the per-storage maps.
        return {
            "name": name,
            "founder": by_netuid["netuid_to_founder"][netuid],
            "founder_share": by_netuid["netuid_to_founder_share"][netuid],
            "incentive_ratio": by_netuid["netuid_to_incentive_ratio"][netuid],
            "max_allowed_uids": by_netuid["netuid_to_max_allowed_uids"][netuid],
            "max_allowed_weights": by_netuid["netuid_to_max_allowed_weights"][netuid],
            "min_allowed_weights": by_netuid["netuid_to_min_allowed_weights"][netuid],
            "tempo": by_netuid["netuid_to_tempo"][netuid],
            "trust_ratio": by_netuid["netuid_to_trust_ratio"][netuid],
            "emission": by_netuid["netuid_to_emission"][netuid],
            "max_weight_age": by_netuid["netuid_to_max_weight_age"][netuid],
            "bonds_ma": by_netuid["netuid_to_bonds_ma"].get(netuid, None),
            "maximum_set_weight_calls_per_epoch": by_netuid["netuid_to_maximum_set_weight_calls_per_epoch"].get(netuid, 30),
            "governance_config": by_netuid["netuid_to_governance_configuration"][netuid],
            "immunity_period": by_netuid["netuid_to_immunity_period"][netuid],
            "min_validator_stake": by_netuid["netuid_to_min_validator_stake"].get(netuid, to_nano(50_000)),
            "max_allowed_validators": by_netuid["netuid_to_max_allowed_validators"].get(netuid, 50),
            "module_burn_config": cast(BurnConfiguration, by_netuid["netuid_to_module_burn_config"].get(netuid, None)),
            "subnet_metadata": by_netuid["netuid_to_subnet_metadata"].get(netuid, None),
        }

    return {
        netuid: describe(netuid, name)
        for netuid, name in by_netuid["netuid_to_name"].items()
    }

Gets all subnets info on the network

def get_global_params(c_client: communex.client.CommuneClient) -> communex.types.NetworkParams:
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Reads the network-wide parameters in a single batched query.

    Numeric storages are coerced with `int`, the curator is wrapped in
    `Ss58Address`, and the global governance configuration is copied field
    by field (its `vote_mode` is passed through unchanged).
    """

    raw = c_client.query_batch(
        {
            "SubspaceModule": [
                ("MaxNameLength", []),
                ("MinNameLength", []),
                ("MaxAllowedSubnets", []),
                ("MaxAllowedModules", []),
                ("MaxRegistrationsPerBlock", []),
                ("MaxAllowedWeightsGlobal", []),
                ("FloorDelegationFee", []),
                ("FloorFounderShare", []),
                ("MinWeightStake", []),
                ("Kappa", []),
                ("Rho", []),
                ("SubnetImmunityPeriod", []),
            ],
            "GovernanceModule": [
                ("GlobalGovernanceConfig", []),
                ("GeneralSubnetApplicationCost", []),
                ("Curator", []),
            ]
        }
    )
    governance = cast(GovernanceConfiguration, raw["GlobalGovernanceConfig"])

    def as_int(storage_name: str) -> int:
        # Look up one storage value and coerce it to int.
        return int(raw[storage_name])

    params: NetworkParams = {
        "max_allowed_subnets": as_int("MaxAllowedSubnets"),
        "max_allowed_modules": as_int("MaxAllowedModules"),
        "max_registrations_per_block": as_int("MaxRegistrationsPerBlock"),
        "max_name_length": as_int("MaxNameLength"),
        "min_weight_stake": as_int("MinWeightStake"),
        "floor_delegation_fee": as_int("FloorDelegationFee"),
        "max_allowed_weights": as_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(raw["Curator"]),
        "min_name_length": as_int("MinNameLength"),
        "floor_founder_share": as_int("FloorFounderShare"),
        "general_subnet_application_cost": as_int("GeneralSubnetApplicationCost"),
        "kappa": as_int("Kappa"),
        "rho": as_int("Rho"),
        "subnet_immunity_period": as_int("SubnetImmunityPeriod"),
        "governance_config": {
            "proposal_cost": int(governance["proposal_cost"]),
            "proposal_expiration": int(governance["proposal_expiration"]),
            "vote_mode": governance["vote_mode"],
            "proposal_reward_treasury_allocation": int(governance["proposal_reward_treasury_allocation"]),
            "max_proposal_reward_treasury_allocation": int(governance["max_proposal_reward_treasury_allocation"]),
            "proposal_reward_interval": int(governance["proposal_reward_interval"])
        }
    }
    return params

Returns global parameters of the whole commune ecosystem

def concat_to_local_keys( balance: dict[str, int], local_key_info: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount recorded for its address.

    Addresses that do not appear in `balance` are reported as 0. The result
    follows the iteration order of `local_key_info`.
    """
    amounts_by_name: dict[str, int] = {}
    for name, address in local_key_info.items():
        amounts_by_name[name] = balance.get(address, 0)
    return amounts_by_name
def local_keys_to_freebalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Looks up the free balance of every local key.

    Queries the whole `System.Account` map, keeps the `data.free` value of
    each account that has one, and maps it onto the local key names (0 for
    keys without a usable account entry).
    """
    accounts = c_client.query_batch_map(
        {
            "System": [("Account", [])],
        }
    )["Account"]

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        # Skip accounts that lack the data/free fields.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
def local_keys_to_stakedbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Totals, per local key, the amounts listed for its address in the map
    returned by `query_map_staketo`.

    Each map value is iterated as pairs whose second element is summed;
    local keys whose address is absent from the map get 0.
    """
    totals: dict[str, int] = {}
    for address, entries in c_client.query_map_staketo().items():
        totals[address] = sum(amount for _, amount in entries)

    return concat_to_local_keys(totals, local_keys)
def local_keys_to_stakedfrom_balance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Totals, per local key, the amounts listed for its address in the map
    returned by `query_map_stakefrom`.

    Each map value is iterated as pairs whose second element is summed;
    local keys whose address is absent from the map get 0.
    """
    stakefrom_map = c_client.query_map_stakefrom()

    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value) for key, value in stakefrom_map.items()
    }

    # `concat_to_local_keys` already returns a fresh dict, so it is returned
    # directly instead of being copied entry by entry a second time.
    return concat_to_local_keys(format_stake, local_keys)
def local_keys_allbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> tuple[dict[str, int], dict[str, int]]:
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns the free balances and the summed `StakeTo` amounts of the local
    keys, fetched in one batched query.

    Both dicts are keyed by local key name and ordered by amount, largest
    first. Keys with no matching entry are reported with 0.
    """
    raw = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [
                ("StakeTo", []),
            ],
        }
    )
    accounts = raw["Account"]
    stake_to = transform_stake_dmap(raw["StakeTo"])

    def by_amount_desc(amounts: dict[str, int]) -> dict[str, int]:
        # Rebuild the dict with its entries sorted by value, largest first.
        return dict(sorted(amounts.items(), key=lambda pair: pair[1], reverse=True))

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        # Skip accounts that lack the data/free fields.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]
    name_to_balance = concat_to_local_keys(free_by_address, local_keys)

    stake_by_address: dict[str, int] = {}
    for address, entries in stake_to.items():
        stake_by_address[address] = sum(amount for _, amount in entries)
    name_to_stake = concat_to_local_keys(stake_by_address, local_keys)

    return by_amount_desc(name_to_balance), by_amount_desc(name_to_stake)