Edit on GitHub

communex.misc

  1import re
  2from typing import Any, TypeVar
  3
  4from communex._common import transform_stake_dmap
  5from communex.client import CommuneClient
  6from communex.key import check_ss58_address
  7from communex.types import (ModuleInfoWithOptionalBalance, NetworkParams,
  8                            Ss58Address, SubnetParamsMaps,
  9                            SubnetParamsWithEmission)
 10
 11IPFS_REGEX = re.compile(r"^Qm[1-9A-HJ-NP-Za-km-z]{44}$")
 12
 13T = TypeVar("T")
 14
 15
 16def get_map_modules(
 17    client: CommuneClient,
 18    netuid: int = 0,
 19    include_balances: bool = False,
 20) -> dict[str, ModuleInfoWithOptionalBalance]:
 21    """
 22    Gets all modules info on the network
 23    """
 24
 25    request_dict: dict[Any, Any] = {
 26        "SubspaceModule": [
 27            ("StakeFrom", []),
 28            ("Keys", [netuid]),
 29            ("Name", [netuid]),
 30            ("Address", [netuid]),
 31            ("RegistrationBlock", [netuid]),
 32            ("DelegationFee", [netuid]),
 33            ("Emission", []),
 34            ("Incentive", []),
 35            ("Dividends", []),
 36            ("LastUpdate", []),
 37            ("Metadata", [netuid]),
 38        ],
 39    }
 40    if include_balances:
 41        request_dict["System"] = [("Account", [])]
 42
 43    bulk_query = client.query_batch_map(request_dict)
 44    (
 45        ss58_to_stakefrom,
 46        uid_to_key,
 47        uid_to_name,
 48        uid_to_address,
 49        uid_to_regblock,
 50        ss58_to_delegationfee,
 51        uid_to_emission,
 52        uid_to_incentive,
 53        uid_to_dividend,
 54        uid_to_lastupdate,
 55        ss58_to_balances,
 56        uid_to_metadata,
 57    ) = (
 58        bulk_query.get("StakeFrom", {}),
 59        bulk_query.get("Keys", {}),
 60        bulk_query["Name"],
 61        bulk_query["Address"],
 62        bulk_query["RegistrationBlock"],
 63        bulk_query["DelegationFee"],
 64        bulk_query["Emission"],
 65        bulk_query["Incentive"],
 66        bulk_query["Dividends"],
 67        bulk_query["LastUpdate"],
 68        bulk_query.get("Account", {}),
 69        bulk_query.get("Metadata", {}),
 70    )
 71
 72    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 73    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
 74    for uid, key in uid_to_key.items():
 75        key = check_ss58_address(key)
 76        name = uid_to_name[uid]
 77        address = uid_to_address[uid]
 78        emission = uid_to_emission[netuid][uid]
 79        incentive = uid_to_incentive[netuid][uid]
 80        dividend = uid_to_dividend[netuid][uid]
 81        regblock = uid_to_regblock[uid]
 82        stake_from = ss58_to_stakefrom.get(key, [])
 83        last_update = uid_to_lastupdate[netuid][uid]
 84        delegation_fee = ss58_to_delegationfee.get(
 85            key, 20
 86        )  # 20% default delegation fee
 87        metadata = uid_to_metadata.get(uid, None)
 88
 89        balance = None
 90        if include_balances and ss58_to_balances is not None:  # type: ignore
 91            balance_dict = ss58_to_balances.get(key, None)
 92            if balance_dict is not None:
 93                assert isinstance(balance_dict["data"], dict)
 94                balance = balance_dict["data"]["free"]
 95            else:
 96                balance = 0
 97        stake = sum(stake for _, stake in stake_from)
 98
 99        module: ModuleInfoWithOptionalBalance = {
100            "uid": uid,
101            "key": key,
102            "name": name,
103            "address": address,
104            "emission": emission,
105            "incentive": incentive,
106            "dividends": dividend,
107            "stake_from": stake_from,
108            "regblock": regblock,
109            "last_update": last_update,
110            "balance": balance,
111            "stake": stake,
112            "delegation_fee": delegation_fee,
113            "metadata": metadata,
114        }
115
116        result_modules[key] = module
117    return result_modules
118
119
120def to_snake_case(d: dict[str, T]) -> dict[str, T]:
121    """
122    Converts a dictionary with camelCase keys to snake_case keys
123    """
124    def snakerize(camel: str) -> str:
125        return re.sub(r'(?<!^)(?=[A-Z])', '_', camel).lower()
126    snaked: dict[str, T] = {snakerize(k): v for k, v in d.items()}
127    return snaked
128
129
130def get_map_subnets_params(
131    client: CommuneClient, block_hash: str | None = None
132) -> dict[int, SubnetParamsWithEmission]:
133    """
134    Gets all subnets info on the network
135    """
136    bulk_query = client.query_batch_map(
137        {
138            "SubspaceModule": [
139                ("ImmunityPeriod", []),
140                ("MinAllowedWeights", []),
141                ("MaxAllowedWeights", []),
142                ("Tempo", []),
143                ("MaxAllowedUids", []),
144                ("TargetRegistrationsInterval", []),
145                ("TargetRegistrationsPerInterval", []),
146                ("MaxRegistrationsPerInterval", []),
147                ("Founder", []),
148                ("FounderShare", []),
149                ("IncentiveRatio", []),
150                ("TrustRatio", []),
151                ("SubnetNames", []),
152                ("MaxWeightAge", []),
153                ("BondsMovingAverage", []),
154                ("MaximumSetWeightCallsPerEpoch", []),
155                ("AdjustmentAlpha", []),
156                ("MinImmunityStake", []),
157            ],
158            "GovernanceModule": [
159                ("SubnetGovernanceConfig", []),
160            ],
161            "SubnetEmissionModule": [
162                ("SubnetEmission", []),
163            ],
164
165        },
166        block_hash,
167    )
168
169    subnet_maps: SubnetParamsMaps = {
170        "netuid_to_emission": bulk_query["SubnetEmission"],
171        "netuid_to_tempo": bulk_query["Tempo"],
172        "netuid_to_min_allowed_weights": bulk_query["MinAllowedWeights"],
173        "netuid_to_max_allowed_weights": bulk_query["MaxAllowedWeights"],
174        "netuid_to_max_allowed_uids": bulk_query["MaxAllowedUids"],
175        "netuid_to_founder": bulk_query["Founder"],
176        "netuid_to_founder_share": bulk_query["FounderShare"],
177        "netuid_to_incentive_ratio": bulk_query["IncentiveRatio"],
178        "netuid_to_trust_ratio": bulk_query["TrustRatio"],
179        "netuid_to_name": bulk_query["SubnetNames"],
180        "netuid_to_max_weight_age": bulk_query["MaxWeightAge"],
181        "netuid_to_adjustment_alpha": bulk_query["AdjustmentAlpha"],
182        "netuid_to_bonds_ma": bulk_query.get("BondsMovingAverage", {}),
183        "netuid_to_maximum_set_weight_calls_per_epoch": bulk_query.get("MaximumSetWeightCallsPerEpoch", {}),
184        "netuid_to_target_registrations_per_interval": bulk_query.get("TargetRegistrationsPerInterval", {}),
185        "netuid_to_target_registrations_interval": bulk_query.get("TargetRegistrationsInterval", {}),
186        "netuid_to_max_registrations_per_interval": bulk_query.get("MaxRegistrationsPerInterval", {}),
187        "netuid_to_min_immunity_stake": bulk_query.get("MinImmunityStake", {}),
188        "netuid_to_governance_configuration": bulk_query["SubnetGovernanceConfig"],
189        "netuid_to_immunity_period": bulk_query["ImmunityPeriod"],
190    }
191    result_subnets: dict[int, SubnetParamsWithEmission] = {}
192
193    default_target_registrations_interval = 200
194    default_target_registrations_per_interval = int(default_target_registrations_interval / 2)
195    default_max_registrations_per_interval = 42
196
197    for netuid, name in subnet_maps["netuid_to_name"].items():
198
199        subnet: SubnetParamsWithEmission = {
200            "name": name,
201            "founder": subnet_maps["netuid_to_founder"][netuid],
202            "founder_share": subnet_maps["netuid_to_founder_share"][netuid],
203            "incentive_ratio": subnet_maps["netuid_to_incentive_ratio"][netuid],
204            "max_allowed_uids": subnet_maps["netuid_to_max_allowed_uids"][netuid],
205            "max_allowed_weights": subnet_maps["netuid_to_max_allowed_weights"][netuid],
206            "min_allowed_weights": subnet_maps["netuid_to_min_allowed_weights"][netuid],
207            "tempo": subnet_maps["netuid_to_tempo"][netuid],
208            "trust_ratio": subnet_maps["netuid_to_trust_ratio"][netuid],
209            "emission": subnet_maps["netuid_to_emission"][netuid],
210            "max_weight_age": subnet_maps["netuid_to_max_weight_age"][netuid],
211            "adjustment_alpha": subnet_maps["netuid_to_adjustment_alpha"][netuid],
212            "bonds_ma": subnet_maps["netuid_to_bonds_ma"].get(netuid, None),
213            "maximum_set_weight_calls_per_epoch": subnet_maps["netuid_to_maximum_set_weight_calls_per_epoch"].get(netuid, 30),
214            "target_registrations_per_interval": subnet_maps["netuid_to_target_registrations_per_interval"].get(netuid, default_target_registrations_per_interval),
215            "target_registrations_interval": subnet_maps["netuid_to_target_registrations_interval"].get(netuid, default_target_registrations_interval),
216            "max_registrations_per_interval": subnet_maps["netuid_to_max_registrations_per_interval"].get(netuid, default_max_registrations_per_interval),
217            "min_immunity_stake": subnet_maps["netuid_to_min_immunity_stake"].get(netuid, 0),
218            "governance_config": subnet_maps["netuid_to_governance_configuration"][netuid],
219            "immunity_period": subnet_maps["netuid_to_immunity_period"][netuid],
220        }
221
222        result_subnets[netuid] = subnet
223
224    return result_subnets
225
226
227def get_global_params(c_client: CommuneClient) -> NetworkParams:
228    """
229    Returns global parameters of the whole commune ecosystem
230    """
231
232    query_all = c_client.query_batch(
233        {
234            "SubspaceModule": [
235                ("MaxNameLength", []),
236                ("MinNameLength", []),
237                ("MaxAllowedSubnets", []),
238                ("MaxAllowedModules", []),
239                ("MaxRegistrationsPerBlock", []),
240                ("MaxAllowedWeightsGlobal", []),
241                ("FloorDelegationFee", []),
242                ("FloorFounderShare", []),
243                ("MinWeightStake", []),
244                ("BurnConfig", []),
245                ("Kappa", []),
246                ("Rho", []),
247                ("SubnetImmunityPeriod", []),
248            ],
249            "GovernanceModule": [
250                ("GlobalGovernanceConfig", []),
251                ("GeneralSubnetApplicationCost", []),
252                ("Curator", []),
253            ]
254        }
255    )
256    global_params: NetworkParams = {
257        "max_allowed_subnets": int(query_all["MaxAllowedSubnets"]),
258        "max_allowed_modules": int(query_all["MaxAllowedModules"]),
259        "max_registrations_per_block": int(query_all["MaxRegistrationsPerBlock"]),
260        "max_name_length": int(query_all["MaxNameLength"]),
261        "min_burn": int(query_all["BurnConfig"]["min_burn"]),  # type: ignore
262        "max_burn": int(query_all["BurnConfig"]["max_burn"]),  # type: ignore
263        "min_weight_stake": int(query_all["MinWeightStake"]),
264        "floor_delegation_fee": int(query_all["FloorDelegationFee"]),
265        "max_allowed_weights": int(query_all["MaxAllowedWeightsGlobal"]),
266        "curator": Ss58Address(query_all["Curator"]),
267        "min_name_length": int(query_all["MinNameLength"]),
268        "floor_founder_share": int(query_all["FloorFounderShare"]),
269        "general_subnet_application_cost": int(query_all["GeneralSubnetApplicationCost"]),
270        "kappa": int(query_all["Kappa"]),
271        "rho": int(query_all["Rho"]),
272        "subnet_immunity_period": int(query_all["SubnetImmunityPeriod"]),
273        "governance_config": query_all["GlobalGovernanceConfig"]  # type: ignore
274    }
275    return global_params
276
277
278def concat_to_local_keys(
279    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
280) -> dict[str, int]:
281    key2: dict[str, int] = {
282        key_name: balance.get(key_address, 0)
283        for key_name, key_address in local_key_info.items()
284    }
285
286    return key2
287
288
289def local_keys_to_freebalance(
290    c_client: CommuneClient,
291    local_keys: dict[str, Ss58Address],
292) -> dict[str, int]:
293    query_all = c_client.query_batch_map(
294        {
295            "System": [("Account", [])],
296        }
297    )
298    balance_map = query_all["Account"]
299
300    format_balances: dict[str, int] = {
301        key: value["data"]["free"]
302        for key, value in balance_map.items()
303        if "data" in value and "free" in value["data"]
304    }
305
306    key2balance: dict[str, int] = concat_to_local_keys(format_balances, local_keys)
307
308    return key2balance
309
310
311def local_keys_to_stakedbalance(
312    c_client: CommuneClient,
313    local_keys: dict[str, Ss58Address],
314) -> dict[str, int]:
315    staketo_map = c_client.query_map_staketo()
316
317    format_stake: dict[str, int] = {
318        key: sum(stake for _, stake in value) for key, value in staketo_map.items()
319    }
320
321    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)
322
323    return key2stake
324
325
326def local_keys_to_stakedfrom_balance(
327    c_client: CommuneClient,
328    local_keys: dict[str, Ss58Address],
329) -> dict[str, int]:
330    stakefrom_map = c_client.query_map_stakefrom()
331
332    format_stake: dict[str, int] = {
333        key: sum(stake for _, stake in value) for key, value in stakefrom_map.items()
334    }
335
336    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)
337    key2stake = {key: stake for key, stake in key2stake.items()}
338    return key2stake
339
340
341def local_keys_allbalance(
342    c_client: CommuneClient,
343    local_keys: dict[str, Ss58Address],
344) -> tuple[dict[str, int], dict[str, int]]:
345    query_all = c_client.query_batch_map(
346        {
347            "System": [("Account", [])],
348            "SubspaceModule": [
349                ("StakeTo", []),
350            ],
351        }
352    )
353
354    balance_map, staketo_map = query_all["Account"], transform_stake_dmap(query_all["StakeTo"])
355
356    format_balances: dict[str, int] = {
357        key: value["data"]["free"]
358        for key, value in balance_map.items()
359        if "data" in value and "free" in value["data"]
360    }
361    key2balance: dict[str, int] = concat_to_local_keys(format_balances, local_keys)
362    format_stake: dict[str, int] = {
363        key: sum(stake for _, stake in value) for key, value in staketo_map.items()
364    }
365
366    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)
367
368    key2balance = {
369        k: v
370        for k, v in sorted(key2balance.items(), key=lambda item: item[1], reverse=True)
371    }
372
373    key2stake = {
374        k: v
375        for k, v in sorted(key2stake.items(), key=lambda item: item[1], reverse=True)
376    }
377
378    return key2balance, key2stake
379
380
381if __name__ == "__main__":
382    from communex._common import get_node_url
383    client = CommuneClient(get_node_url(use_testnet=True))
384    get_global_params(client)
IPFS_REGEX = re.compile('^Qm[1-9A-HJ-NP-Za-km-z]{44}$')
def get_map_modules( client: communex.client.CommuneClient, netuid: int = 0, include_balances: bool = False) -> dict[str, communex.types.ModuleInfoWithOptionalBalance]:
 17def get_map_modules(
 18    client: CommuneClient,
 19    netuid: int = 0,
 20    include_balances: bool = False,
 21) -> dict[str, ModuleInfoWithOptionalBalance]:
 22    """
 23    Gets all modules info on the network
 24    """
 25
 26    request_dict: dict[Any, Any] = {
 27        "SubspaceModule": [
 28            ("StakeFrom", []),
 29            ("Keys", [netuid]),
 30            ("Name", [netuid]),
 31            ("Address", [netuid]),
 32            ("RegistrationBlock", [netuid]),
 33            ("DelegationFee", [netuid]),
 34            ("Emission", []),
 35            ("Incentive", []),
 36            ("Dividends", []),
 37            ("LastUpdate", []),
 38            ("Metadata", [netuid]),
 39        ],
 40    }
 41    if include_balances:
 42        request_dict["System"] = [("Account", [])]
 43
 44    bulk_query = client.query_batch_map(request_dict)
 45    (
 46        ss58_to_stakefrom,
 47        uid_to_key,
 48        uid_to_name,
 49        uid_to_address,
 50        uid_to_regblock,
 51        ss58_to_delegationfee,
 52        uid_to_emission,
 53        uid_to_incentive,
 54        uid_to_dividend,
 55        uid_to_lastupdate,
 56        ss58_to_balances,
 57        uid_to_metadata,
 58    ) = (
 59        bulk_query.get("StakeFrom", {}),
 60        bulk_query.get("Keys", {}),
 61        bulk_query["Name"],
 62        bulk_query["Address"],
 63        bulk_query["RegistrationBlock"],
 64        bulk_query["DelegationFee"],
 65        bulk_query["Emission"],
 66        bulk_query["Incentive"],
 67        bulk_query["Dividends"],
 68        bulk_query["LastUpdate"],
 69        bulk_query.get("Account", {}),
 70        bulk_query.get("Metadata", {}),
 71    )
 72
 73    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 74    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
 75    for uid, key in uid_to_key.items():
 76        key = check_ss58_address(key)
 77        name = uid_to_name[uid]
 78        address = uid_to_address[uid]
 79        emission = uid_to_emission[netuid][uid]
 80        incentive = uid_to_incentive[netuid][uid]
 81        dividend = uid_to_dividend[netuid][uid]
 82        regblock = uid_to_regblock[uid]
 83        stake_from = ss58_to_stakefrom.get(key, [])
 84        last_update = uid_to_lastupdate[netuid][uid]
 85        delegation_fee = ss58_to_delegationfee.get(
 86            key, 20
 87        )  # 20% default delegation fee
 88        metadata = uid_to_metadata.get(uid, None)
 89
 90        balance = None
 91        if include_balances and ss58_to_balances is not None:  # type: ignore
 92            balance_dict = ss58_to_balances.get(key, None)
 93            if balance_dict is not None:
 94                assert isinstance(balance_dict["data"], dict)
 95                balance = balance_dict["data"]["free"]
 96            else:
 97                balance = 0
 98        stake = sum(stake for _, stake in stake_from)
 99
100        module: ModuleInfoWithOptionalBalance = {
101            "uid": uid,
102            "key": key,
103            "name": name,
104            "address": address,
105            "emission": emission,
106            "incentive": incentive,
107            "dividends": dividend,
108            "stake_from": stake_from,
109            "regblock": regblock,
110            "last_update": last_update,
111            "balance": balance,
112            "stake": stake,
113            "delegation_fee": delegation_fee,
114            "metadata": metadata,
115        }
116
117        result_modules[key] = module
118    return result_modules

Gets all modules info on the network

def to_snake_case(d: dict[str, ~T]) -> dict[str, ~T]:
121def to_snake_case(d: dict[str, T]) -> dict[str, T]:
122    """
123    Converts a dictionary with camelCase keys to snake_case keys
124    """
125    def snakerize(camel: str) -> str:
126        return re.sub(r'(?<!^)(?=[A-Z])', '_', camel).lower()
127    snaked: dict[str, T] = {snakerize(k): v for k, v in d.items()}
128    return snaked

Converts a dictionary with camelCase keys to snake_case keys

def get_map_subnets_params( client: communex.client.CommuneClient, block_hash: str | None = None) -> dict[int, communex.types.SubnetParamsWithEmission]:
131def get_map_subnets_params(
132    client: CommuneClient, block_hash: str | None = None
133) -> dict[int, SubnetParamsWithEmission]:
134    """
135    Gets all subnets info on the network
136    """
137    bulk_query = client.query_batch_map(
138        {
139            "SubspaceModule": [
140                ("ImmunityPeriod", []),
141                ("MinAllowedWeights", []),
142                ("MaxAllowedWeights", []),
143                ("Tempo", []),
144                ("MaxAllowedUids", []),
145                ("TargetRegistrationsInterval", []),
146                ("TargetRegistrationsPerInterval", []),
147                ("MaxRegistrationsPerInterval", []),
148                ("Founder", []),
149                ("FounderShare", []),
150                ("IncentiveRatio", []),
151                ("TrustRatio", []),
152                ("SubnetNames", []),
153                ("MaxWeightAge", []),
154                ("BondsMovingAverage", []),
155                ("MaximumSetWeightCallsPerEpoch", []),
156                ("AdjustmentAlpha", []),
157                ("MinImmunityStake", []),
158            ],
159            "GovernanceModule": [
160                ("SubnetGovernanceConfig", []),
161            ],
162            "SubnetEmissionModule": [
163                ("SubnetEmission", []),
164            ],
165
166        },
167        block_hash,
168    )
169
170    subnet_maps: SubnetParamsMaps = {
171        "netuid_to_emission": bulk_query["SubnetEmission"],
172        "netuid_to_tempo": bulk_query["Tempo"],
173        "netuid_to_min_allowed_weights": bulk_query["MinAllowedWeights"],
174        "netuid_to_max_allowed_weights": bulk_query["MaxAllowedWeights"],
175        "netuid_to_max_allowed_uids": bulk_query["MaxAllowedUids"],
176        "netuid_to_founder": bulk_query["Founder"],
177        "netuid_to_founder_share": bulk_query["FounderShare"],
178        "netuid_to_incentive_ratio": bulk_query["IncentiveRatio"],
179        "netuid_to_trust_ratio": bulk_query["TrustRatio"],
180        "netuid_to_name": bulk_query["SubnetNames"],
181        "netuid_to_max_weight_age": bulk_query["MaxWeightAge"],
182        "netuid_to_adjustment_alpha": bulk_query["AdjustmentAlpha"],
183        "netuid_to_bonds_ma": bulk_query.get("BondsMovingAverage", {}),
184        "netuid_to_maximum_set_weight_calls_per_epoch": bulk_query.get("MaximumSetWeightCallsPerEpoch", {}),
185        "netuid_to_target_registrations_per_interval": bulk_query.get("TargetRegistrationsPerInterval", {}),
186        "netuid_to_target_registrations_interval": bulk_query.get("TargetRegistrationsInterval", {}),
187        "netuid_to_max_registrations_per_interval": bulk_query.get("MaxRegistrationsPerInterval", {}),
188        "netuid_to_min_immunity_stake": bulk_query.get("MinImmunityStake", {}),
189        "netuid_to_governance_configuration": bulk_query["SubnetGovernanceConfig"],
190        "netuid_to_immunity_period": bulk_query["ImmunityPeriod"],
191    }
192    result_subnets: dict[int, SubnetParamsWithEmission] = {}
193
194    default_target_registrations_interval = 200
195    default_target_registrations_per_interval = int(default_target_registrations_interval / 2)
196    default_max_registrations_per_interval = 42
197
198    for netuid, name in subnet_maps["netuid_to_name"].items():
199
200        subnet: SubnetParamsWithEmission = {
201            "name": name,
202            "founder": subnet_maps["netuid_to_founder"][netuid],
203            "founder_share": subnet_maps["netuid_to_founder_share"][netuid],
204            "incentive_ratio": subnet_maps["netuid_to_incentive_ratio"][netuid],
205            "max_allowed_uids": subnet_maps["netuid_to_max_allowed_uids"][netuid],
206            "max_allowed_weights": subnet_maps["netuid_to_max_allowed_weights"][netuid],
207            "min_allowed_weights": subnet_maps["netuid_to_min_allowed_weights"][netuid],
208            "tempo": subnet_maps["netuid_to_tempo"][netuid],
209            "trust_ratio": subnet_maps["netuid_to_trust_ratio"][netuid],
210            "emission": subnet_maps["netuid_to_emission"][netuid],
211            "max_weight_age": subnet_maps["netuid_to_max_weight_age"][netuid],
212            "adjustment_alpha": subnet_maps["netuid_to_adjustment_alpha"][netuid],
213            "bonds_ma": subnet_maps["netuid_to_bonds_ma"].get(netuid, None),
214            "maximum_set_weight_calls_per_epoch": subnet_maps["netuid_to_maximum_set_weight_calls_per_epoch"].get(netuid, 30),
215            "target_registrations_per_interval": subnet_maps["netuid_to_target_registrations_per_interval"].get(netuid, default_target_registrations_per_interval),
216            "target_registrations_interval": subnet_maps["netuid_to_target_registrations_interval"].get(netuid, default_target_registrations_interval),
217            "max_registrations_per_interval": subnet_maps["netuid_to_max_registrations_per_interval"].get(netuid, default_max_registrations_per_interval),
218            "min_immunity_stake": subnet_maps["netuid_to_min_immunity_stake"].get(netuid, 0),
219            "governance_config": subnet_maps["netuid_to_governance_configuration"][netuid],
220            "immunity_period": subnet_maps["netuid_to_immunity_period"][netuid],
221        }
222
223        result_subnets[netuid] = subnet
224
225    return result_subnets

Gets all subnets info on the network

def get_global_params(c_client: communex.client.CommuneClient) -> communex.types.NetworkParams:
228def get_global_params(c_client: CommuneClient) -> NetworkParams:
229    """
230    Returns global parameters of the whole commune ecosystem
231    """
232
233    query_all = c_client.query_batch(
234        {
235            "SubspaceModule": [
236                ("MaxNameLength", []),
237                ("MinNameLength", []),
238                ("MaxAllowedSubnets", []),
239                ("MaxAllowedModules", []),
240                ("MaxRegistrationsPerBlock", []),
241                ("MaxAllowedWeightsGlobal", []),
242                ("FloorDelegationFee", []),
243                ("FloorFounderShare", []),
244                ("MinWeightStake", []),
245                ("BurnConfig", []),
246                ("Kappa", []),
247                ("Rho", []),
248                ("SubnetImmunityPeriod", []),
249            ],
250            "GovernanceModule": [
251                ("GlobalGovernanceConfig", []),
252                ("GeneralSubnetApplicationCost", []),
253                ("Curator", []),
254            ]
255        }
256    )
257    global_params: NetworkParams = {
258        "max_allowed_subnets": int(query_all["MaxAllowedSubnets"]),
259        "max_allowed_modules": int(query_all["MaxAllowedModules"]),
260        "max_registrations_per_block": int(query_all["MaxRegistrationsPerBlock"]),
261        "max_name_length": int(query_all["MaxNameLength"]),
262        "min_burn": int(query_all["BurnConfig"]["min_burn"]),  # type: ignore
263        "max_burn": int(query_all["BurnConfig"]["max_burn"]),  # type: ignore
264        "min_weight_stake": int(query_all["MinWeightStake"]),
265        "floor_delegation_fee": int(query_all["FloorDelegationFee"]),
266        "max_allowed_weights": int(query_all["MaxAllowedWeightsGlobal"]),
267        "curator": Ss58Address(query_all["Curator"]),
268        "min_name_length": int(query_all["MinNameLength"]),
269        "floor_founder_share": int(query_all["FloorFounderShare"]),
270        "general_subnet_application_cost": int(query_all["GeneralSubnetApplicationCost"]),
271        "kappa": int(query_all["Kappa"]),
272        "rho": int(query_all["Rho"]),
273        "subnet_immunity_period": int(query_all["SubnetImmunityPeriod"]),
274        "governance_config": query_all["GlobalGovernanceConfig"]  # type: ignore
275    }
276    return global_params

Returns global parameters of the whole commune ecosystem

def concat_to_local_keys( balance: dict[str, int], local_key_info: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
279def concat_to_local_keys(
280    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
281) -> dict[str, int]:
282    key2: dict[str, int] = {
283        key_name: balance.get(key_address, 0)
284        for key_name, key_address in local_key_info.items()
285    }
286
287    return key2
def local_keys_to_freebalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
290def local_keys_to_freebalance(
291    c_client: CommuneClient,
292    local_keys: dict[str, Ss58Address],
293) -> dict[str, int]:
294    query_all = c_client.query_batch_map(
295        {
296            "System": [("Account", [])],
297        }
298    )
299    balance_map = query_all["Account"]
300
301    format_balances: dict[str, int] = {
302        key: value["data"]["free"]
303        for key, value in balance_map.items()
304        if "data" in value and "free" in value["data"]
305    }
306
307    key2balance: dict[str, int] = concat_to_local_keys(format_balances, local_keys)
308
309    return key2balance
def local_keys_to_stakedbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
312def local_keys_to_stakedbalance(
313    c_client: CommuneClient,
314    local_keys: dict[str, Ss58Address],
315) -> dict[str, int]:
316    staketo_map = c_client.query_map_staketo()
317
318    format_stake: dict[str, int] = {
319        key: sum(stake for _, stake in value) for key, value in staketo_map.items()
320    }
321
322    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)
323
324    return key2stake
def local_keys_to_stakedfrom_balance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
327def local_keys_to_stakedfrom_balance(
328    c_client: CommuneClient,
329    local_keys: dict[str, Ss58Address],
330) -> dict[str, int]:
331    stakefrom_map = c_client.query_map_stakefrom()
332
333    format_stake: dict[str, int] = {
334        key: sum(stake for _, stake in value) for key, value in stakefrom_map.items()
335    }
336
337    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)
338    key2stake = {key: stake for key, stake in key2stake.items()}
339    return key2stake
def local_keys_allbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> tuple[dict[str, int], dict[str, int]]:
342def local_keys_allbalance(
343    c_client: CommuneClient,
344    local_keys: dict[str, Ss58Address],
345) -> tuple[dict[str, int], dict[str, int]]:
346    query_all = c_client.query_batch_map(
347        {
348            "System": [("Account", [])],
349            "SubspaceModule": [
350                ("StakeTo", []),
351            ],
352        }
353    )
354
355    balance_map, staketo_map = query_all["Account"], transform_stake_dmap(query_all["StakeTo"])
356
357    format_balances: dict[str, int] = {
358        key: value["data"]["free"]
359        for key, value in balance_map.items()
360        if "data" in value and "free" in value["data"]
361    }
362    key2balance: dict[str, int] = concat_to_local_keys(format_balances, local_keys)
363    format_stake: dict[str, int] = {
364        key: sum(stake for _, stake in value) for key, value in staketo_map.items()
365    }
366
367    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)
368
369    key2balance = {
370        k: v
371        for k, v in sorted(key2balance.items(), key=lambda item: item[1], reverse=True)
372    }
373
374    key2stake = {
375        k: v
376        for k, v in sorted(key2stake.items(), key=lambda item: item[1], reverse=True)
377    }
378
379    return key2balance, key2stake