Edit on GitHub

communex.misc

  1import re
  2from typing import Any, TypeVar
  3
  4from communex.client import CommuneClient
  5from communex.key import check_ss58_address
  6from communex.types import (
  7    ModuleInfoWithOptionalBalance,
  8    NetworkParams,
  9    Ss58Address,
 10    SubnetParamsWithEmission,
 11    SubnetParamsMaps,
 12)
 13
# Matches "Qm" followed by exactly 44 characters drawn from digits 1-9 and
# ASCII letters excluding "I", "O" and "l" (a base58-style alphabet).
# NOTE(review): presumably the textual form of an IPFS CIDv0 hash — confirm.
IPFS_REGEX = re.compile(r"^Qm[1-9A-HJ-NP-Za-km-z]{44}$")

# Generic value type, used by `to_snake_case` so values keep their type.
T = TypeVar("T")
 17
 18def get_map_modules(
 19    client: CommuneClient,
 20    netuid: int = 0,
 21    include_balances: bool = False,
 22) -> dict[str, ModuleInfoWithOptionalBalance]:
 23    """
 24    Gets all modules info on the network
 25    """
 26
 27    request_dict: dict[Any, Any] = {
 28        "SubspaceModule": [
 29            ("StakeFrom", [netuid]),
 30            ("Keys", [netuid]),
 31            ("Name", [netuid]),
 32            ("Address", [netuid]),
 33            ("RegistrationBlock", [netuid]),
 34            ("DelegationFee", [netuid]),
 35            ("Emission", []),
 36            
 37            ("Incentive", []),
 38            ("Dividends", []),
 39            ("LastUpdate", []),
 40            ("Metadata", [netuid]),
 41        ],
 42    }
 43    if include_balances:
 44        request_dict["System"] = [("Account", [])]
 45
 46    bulk_query = client.query_batch_map(request_dict)
 47   
 48    (
 49        ss58_to_stakefrom,
 50        uid_to_key,
 51        uid_to_name,
 52        uid_to_address,
 53        uid_to_regblock,
 54        ss58_to_delegationfee,
 55        uid_to_emission,
 56        uid_to_incentive,
 57        uid_to_dividend,
 58        uid_to_lastupdate,
 59        ss58_to_balances,
 60        uid_to_metadata,
 61    ) = (
 62        bulk_query.get("StakeFrom", {}),
 63       bulk_query.get("Keys", {}),
 64        bulk_query["Name"],
 65        bulk_query["Address"],
 66        bulk_query["RegistrationBlock"],
 67        bulk_query["DelegationFee"],
 68        bulk_query["Emission"],
 69        bulk_query["Incentive"],
 70        bulk_query["Dividends"],
 71        bulk_query["LastUpdate"],
 72        bulk_query.get("Account", {}),
 73        bulk_query.get("Metadata", {}),
 74    )
 75
 76    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 77
 78    for uid, key in uid_to_key.items():
 79        key = check_ss58_address(key)
 80
 81        name = uid_to_name[uid]
 82        address = uid_to_address[uid]
 83        emission = uid_to_emission[netuid][uid]
 84        incentive = uid_to_incentive[netuid][uid]
 85        dividend = uid_to_dividend[netuid][uid]
 86        regblock = uid_to_regblock[uid]
 87        stake_from = ss58_to_stakefrom.get(key, [])
 88        last_update = uid_to_lastupdate[netuid][uid]
 89        delegation_fee = ss58_to_delegationfee.get(
 90            key, 20
 91        )  # 20% default delegation fee
 92        metadata = uid_to_metadata.get(uid, None)
 93
 94        balance = None
 95        if include_balances and ss58_to_balances is not None:  # type: ignore
 96            balance_dict = ss58_to_balances.get(key, None)
 97            if balance_dict is not None:
 98                assert isinstance(balance_dict["data"], dict)
 99                balance = balance_dict["data"]["free"]
100            else:
101                balance = 0
102        stake = sum(stake for _, stake in stake_from)
103
104        module: ModuleInfoWithOptionalBalance = {
105            "uid": uid,
106            "key": key,
107            "name": name,
108            "address": address,
109            "emission": emission,
110            "incentive": incentive,
111            "dividends": dividend,
112            "stake_from": stake_from,
113            "regblock": regblock,
114            "last_update": last_update,
115            "balance": balance,
116            "stake": stake,
117            "delegation_fee": delegation_fee,
118            "metadata": metadata,
119        }
120
121        result_modules[key] = module
122    return result_modules
123
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of `d` whose camelCase keys are rewritten in snake_case.

    An underscore is inserted before every uppercase letter that is not the
    first character of the key, then the key is lowercased. Values are kept
    untouched.
    """
    word_boundary = re.compile(r'(?<!^)(?=[A-Z])')
    converted: dict[str, T] = {}
    for camel_key, value in d.items():
        converted[word_boundary.sub('_', camel_key).lower()] = value
    return converted
132
133
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Gets all subnets info on the network

    A single batched map query fetches every per-subnet storage, and one
    record is built for each netuid present in `SubnetNames`.

    Args:
        client: Client used to run the batched storage query.
        block_hash: Forwarded as-is to `query_batch_map`.

    Returns:
        Mapping from netuid to that subnet's parameters and emission.

    Raises:
        KeyError: If a required storage is missing from the response, or has
            no entry for a netuid listed in `SubnetNames`.
    """

    bulk_query = client.query_batch_map(
        {
            "SubspaceModule": [
                ("ImmunityPeriod", []),
                ("MinAllowedWeights", []),
                ("MaxAllowedWeights", []),
                ("MinStake", []),
                ("SubnetEmission", []),
                ("Tempo", []),
                ("MaxAllowedUids", []),
                ("TargetRegistrationsInterval", []),
                ("TargetRegistrationsPerInterval", []),
                ("MaxRegistrationsPerInterval", []),
                ("Founder", []),
                ("FounderShare", []),
                ("IncentiveRatio", []),
                ("TrustRatio", []),
                ("SubnetNames", []),
                ("MaxWeightAge", []),
                ("BondsMovingAverage", []),
                ("MaximumSetWeightCallsPerEpoch", []),
                ("AdjustmentAlpha", []),
            ],
            "GovernanceModule": [
                ("SubnetGovernanceConfig", []),
            ],
        },
        block_hash,
    )
    # Subscripted storages are required; the ones read with `.get` may be
    # absent from the response and then fall back to per-subnet defaults.
    subnet_maps: SubnetParamsMaps = {
        "netuid_to_emission": bulk_query["SubnetEmission"],
        "netuid_to_tempo": bulk_query["Tempo"],
        "netuid_to_immunity_period": bulk_query["ImmunityPeriod"],
        "netuid_to_min_allowed_weights": bulk_query["MinAllowedWeights"],
        "netuid_to_max_allowed_weights": bulk_query["MaxAllowedWeights"],
        "netuid_to_max_allowed_uids": bulk_query["MaxAllowedUids"],
        "netuid_to_min_stake": bulk_query["MinStake"],
        "netuid_to_founder": bulk_query["Founder"],
        "netuid_to_founder_share": bulk_query["FounderShare"],
        "netuid_to_incentive_ratio": bulk_query["IncentiveRatio"],
        "netuid_to_trust_ratio": bulk_query["TrustRatio"],
        "netuid_to_name": bulk_query["SubnetNames"],
        "netuid_to_max_weight_age": bulk_query["MaxWeightAge"],
        "netuid_to_vote_mode": bulk_query["SubnetGovernanceConfig"],
        "netuid_to_adjustment_alpha": bulk_query["AdjustmentAlpha"],
        "netuid_to_bonds_ma": bulk_query.get("BondsMovingAverage", {}),
        "netuid_to_maximum_set_weight_calls_per_epoch": bulk_query.get("MaximumSetWeightCallsPerEpoch", {}),
        # Each map must read the storage of the same name: the two were
        # previously crossed, which swapped the reported values.
        "netuid_to_target_registrations_per_interval": bulk_query.get("TargetRegistrationsPerInterval", {}),
        "netuid_to_target_registrations_interval": bulk_query.get("TargetRegistrationsInterval", {}),
        "netuid_to_max_registrations_per_interval": bulk_query.get("MaxRegistrationsPerInterval", {}),
    }
    result_subnets: dict[int, SubnetParamsWithEmission] = {}

    # Fallbacks used when a subnet has no entry in an optional storage.
    default_target_registrations_interval = 200
    default_target_registrations_per_interval = int(default_target_registrations_interval / 2)
    default_max_registrations_per_interval = 42
    for netuid, name in subnet_maps["netuid_to_name"].items():

        subnet: SubnetParamsWithEmission = {
            "name": name,
            "founder": subnet_maps["netuid_to_founder"][netuid],
            "founder_share": subnet_maps["netuid_to_founder_share"][netuid],
            "immunity_period": subnet_maps["netuid_to_immunity_period"][netuid],
            "incentive_ratio": subnet_maps["netuid_to_incentive_ratio"][netuid],
            "max_allowed_uids": subnet_maps["netuid_to_max_allowed_uids"][netuid],
            "max_allowed_weights": subnet_maps["netuid_to_max_allowed_weights"][netuid],
            "min_allowed_weights": subnet_maps["netuid_to_min_allowed_weights"][netuid],
            "min_stake": subnet_maps["netuid_to_min_stake"][netuid],
            "tempo": subnet_maps["netuid_to_tempo"][netuid],
            "trust_ratio": subnet_maps["netuid_to_trust_ratio"][netuid],
            "emission": subnet_maps["netuid_to_emission"][netuid],
            "max_weight_age": subnet_maps["netuid_to_max_weight_age"][netuid],
            # The governance config is a mapping; only its vote mode is kept.
            "vote_mode": subnet_maps["netuid_to_vote_mode"][netuid]["vote_mode"],
            "adjustment_alpha": subnet_maps["netuid_to_adjustment_alpha"][netuid],
            "bonds_ma": subnet_maps["netuid_to_bonds_ma"].get(netuid, None),
            "maximum_set_weight_calls_per_epoch": subnet_maps["netuid_to_maximum_set_weight_calls_per_epoch"].get(netuid, 30),
            "target_registrations_per_interval": subnet_maps["netuid_to_target_registrations_per_interval"].get(netuid, default_target_registrations_per_interval),
            "target_registrations_interval": subnet_maps["netuid_to_target_registrations_interval"].get(netuid, default_target_registrations_interval),
            "max_registrations_per_interval": subnet_maps["netuid_to_max_registrations_per_interval"].get(netuid, default_max_registrations_per_interval),
        }

        result_subnets[netuid] = subnet

    return result_subnets
225
226
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Fetches the network-wide parameters of the commune ecosystem.

    All values come from a single batched query against the
    `SubspaceModule` and `GovernanceModule` storages. Numeric values are
    coerced with `int`, and the curator is wrapped in `Ss58Address`.
    """

    storage_request = {
        "SubspaceModule": [
            ("MaxAllowedSubnets", []),
            ("MaxAllowedModules", []),
            ("MaxRegistrationsPerBlock", []),
            ("MaxNameLength", []),
            ("FloorDelegationFee", []),
            ("MaxAllowedWeightsGlobal", []),
            ("SubnetStakeThreshold", []),
            ("MinWeightStake", []),
            ("MinNameLength", []),
            ("BurnConfig", []),
            ("FloorFounderShare", []),
        ],
        "GovernanceModule": [
            ("GlobalGovernanceConfig", []),
            ("GeneralSubnetApplicationCost", []),
            ("Curator", []),
        ],
    }
    raw = c_client.query_batch(storage_request)
    governance: dict[str, int] = raw["GlobalGovernanceConfig"]  # type: ignore

    def as_int(storage: str) -> int:
        # Reads one scalar storage value from the response as an int.
        return int(raw[storage])  # type: ignore

    network_params: NetworkParams = {
        "max_allowed_subnets": as_int("MaxAllowedSubnets"),
        "max_allowed_modules": as_int("MaxAllowedModules"),
        "max_registrations_per_block": as_int("MaxRegistrationsPerBlock"),
        "max_name_length": as_int("MaxNameLength"),
        # Both burn bounds live inside the single `BurnConfig` mapping.
        "min_burn": int(raw["BurnConfig"]["min_burn"]),  # type: ignore
        "max_burn": int(raw["BurnConfig"]["max_burn"]),  # type: ignore
        "min_weight_stake": as_int("MinWeightStake"),
        "floor_delegation_fee": as_int("FloorDelegationFee"),
        "max_allowed_weights": as_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(raw["Curator"]),
        "proposal_cost": int(governance["proposal_cost"]),
        "proposal_expiration": int(governance["proposal_expiration"]),
        "subnet_stake_threshold": as_int("SubnetStakeThreshold"),
        "min_name_length": as_int("MinNameLength"),
        "floor_founder_share": as_int("FloorFounderShare"),
        "general_subnet_application_cost": as_int("GeneralSubnetApplicationCost"),
    }
    return network_params
276
277
def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount recorded for its address.

    Args:
        balance: Amounts indexed by address.
        local_key_info: Addresses indexed by local key name.

    Returns:
        Amounts indexed by local key name; an address with no entry in
        `balance` yields 0.
    """
    amount_by_name: dict[str, int] = {}
    for key_name, key_address in local_key_info.items():
        amount_by_name[key_name] = balance.get(key_address, 0)

    return amount_by_name
287
288
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Looks up the free balance of every local key.

    The whole `System.Account` map is queried once; account entries that
    lack a "data" mapping with a "free" field are skipped.

    Returns:
        Free balance indexed by local key name; keys without a usable
        account entry get 0.
    """
    accounts = c_client.query_batch_map(
        {
            "System": [("Account", [])],
        }
    )["Account"]

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
309
310
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
    netuid: int = 0,
) -> dict[str, int]:
    """
    Looks up the total amount staked by every local key on one subnet.

    Args:
        c_client: Client used to query the `StakeTo` map.
        local_keys: Addresses indexed by local key name.
        netuid: Subnet whose stakes are summed.

    Returns:
        Total staked amount indexed by local key name; keys with no stake
        entry get 0.
    """
    query_all = c_client.query_batch_map(
        {
            "SubspaceModule": [("StakeTo", [netuid])],
        }
    )

    # The response may carry no "StakeTo" entry; treat that as "no stakes"
    # instead of raising, as `local_keys_allbalance` does.
    staketo_map = query_all.get("StakeTo", {})

    # Each value is a sequence of (target, amount) pairs; only amounts count.
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value) for key, value in staketo_map.items()
    }

    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)

    return key2stake
331
332
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
    netuid: int | None = None,
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Looks up both the free balance and the staked amount of every local key.

    Args:
        c_client: Client used to run the storage queries.
        local_keys: Addresses indexed by local key name.
        netuid: Subnet whose stakes are counted. `None` sums the stakes over
            every subnet listed in `SubnetNames`.

    Returns:
        A `(key2balance, key2stake)` pair, both indexed by local key name
        and ordered by descending amount. Keys with no entry get 0.
    """
    query_result = c_client.query_batch_map(
        {
            "SubspaceModule": [("SubnetNames", [])],
            "System": [("Account", [])],
        }
    )
    balance_map = query_result["Account"]
    all_netuids = list(query_result["SubnetNames"].keys())

    # Either every known subnet or only the requested one.
    netuids = all_netuids if netuid is None else [netuid]

    # Accumulate per-key totals directly. This avoids merging the per-subnet
    # stake lists, which used to extend (mutate) the lists returned by the
    # first query in place.
    stake_totals: dict[Any, int] = {}
    for uid in netuids:
        staketo_map = c_client.query_batch_map(
            {
                "SubspaceModule": [
                    ("StakeTo", [uid]),
                ],
            }
        ).get("StakeTo", {})
        for key, value in staketo_map.items():
            stake_totals[key] = stake_totals.get(key, 0) + sum(
                stake for _, stake in value
            )

    # Account entries without a "data" mapping holding "free" are skipped.
    format_balances: dict[str, int] = {
        key: value["data"]["free"]
        for key, value in balance_map.items()
        if "data" in value and "free" in value["data"]
    }

    key2balance: dict[str, int] = concat_to_local_keys(format_balances, local_keys)
    key2stake: dict[str, int] = concat_to_local_keys(stake_totals, local_keys)

    # Largest amounts first; dicts preserve this insertion order.
    key2balance = dict(
        sorted(key2balance.items(), key=lambda item: item[1], reverse=True)
    )
    key2stake = dict(
        sorted(key2stake.items(), key=lambda item: item[1], reverse=True)
    )

    return key2balance, key2stake
IPFS_REGEX = re.compile('^Qm[1-9A-HJ-NP-Za-km-z]{44}$')
def get_map_modules( client: communex.client.CommuneClient, netuid: int = 0, include_balances: bool = False) -> dict[str, communex.types.ModuleInfoWithOptionalBalance]:
 19def get_map_modules(
 20    client: CommuneClient,
 21    netuid: int = 0,
 22    include_balances: bool = False,
 23) -> dict[str, ModuleInfoWithOptionalBalance]:
 24    """
 25    Gets all modules info on the network
 26    """
 27
 28    request_dict: dict[Any, Any] = {
 29        "SubspaceModule": [
 30            ("StakeFrom", [netuid]),
 31            ("Keys", [netuid]),
 32            ("Name", [netuid]),
 33            ("Address", [netuid]),
 34            ("RegistrationBlock", [netuid]),
 35            ("DelegationFee", [netuid]),
 36            ("Emission", []),
 37            
 38            ("Incentive", []),
 39            ("Dividends", []),
 40            ("LastUpdate", []),
 41            ("Metadata", [netuid]),
 42        ],
 43    }
 44    if include_balances:
 45        request_dict["System"] = [("Account", [])]
 46
 47    bulk_query = client.query_batch_map(request_dict)
 48   
 49    (
 50        ss58_to_stakefrom,
 51        uid_to_key,
 52        uid_to_name,
 53        uid_to_address,
 54        uid_to_regblock,
 55        ss58_to_delegationfee,
 56        uid_to_emission,
 57        uid_to_incentive,
 58        uid_to_dividend,
 59        uid_to_lastupdate,
 60        ss58_to_balances,
 61        uid_to_metadata,
 62    ) = (
 63        bulk_query.get("StakeFrom", {}),
 64       bulk_query.get("Keys", {}),
 65        bulk_query["Name"],
 66        bulk_query["Address"],
 67        bulk_query["RegistrationBlock"],
 68        bulk_query["DelegationFee"],
 69        bulk_query["Emission"],
 70        bulk_query["Incentive"],
 71        bulk_query["Dividends"],
 72        bulk_query["LastUpdate"],
 73        bulk_query.get("Account", {}),
 74        bulk_query.get("Metadata", {}),
 75    )
 76
 77    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
 78
 79    for uid, key in uid_to_key.items():
 80        key = check_ss58_address(key)
 81
 82        name = uid_to_name[uid]
 83        address = uid_to_address[uid]
 84        emission = uid_to_emission[netuid][uid]
 85        incentive = uid_to_incentive[netuid][uid]
 86        dividend = uid_to_dividend[netuid][uid]
 87        regblock = uid_to_regblock[uid]
 88        stake_from = ss58_to_stakefrom.get(key, [])
 89        last_update = uid_to_lastupdate[netuid][uid]
 90        delegation_fee = ss58_to_delegationfee.get(
 91            key, 20
 92        )  # 20% default delegation fee
 93        metadata = uid_to_metadata.get(uid, None)
 94
 95        balance = None
 96        if include_balances and ss58_to_balances is not None:  # type: ignore
 97            balance_dict = ss58_to_balances.get(key, None)
 98            if balance_dict is not None:
 99                assert isinstance(balance_dict["data"], dict)
100                balance = balance_dict["data"]["free"]
101            else:
102                balance = 0
103        stake = sum(stake for _, stake in stake_from)
104
105        module: ModuleInfoWithOptionalBalance = {
106            "uid": uid,
107            "key": key,
108            "name": name,
109            "address": address,
110            "emission": emission,
111            "incentive": incentive,
112            "dividends": dividend,
113            "stake_from": stake_from,
114            "regblock": regblock,
115            "last_update": last_update,
116            "balance": balance,
117            "stake": stake,
118            "delegation_fee": delegation_fee,
119            "metadata": metadata,
120        }
121
122        result_modules[key] = module
123    return result_modules

Gets all modules info on the network

def to_snake_case(d: dict[str, ~T]) -> dict[str, ~T]:
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of `d` whose camelCase keys are rewritten in snake_case.

    An underscore is inserted before every uppercase letter that is not the
    first character of the key, then the key is lowercased. Values are kept
    untouched.
    """
    word_boundary = re.compile(r'(?<!^)(?=[A-Z])')
    converted: dict[str, T] = {}
    for camel_key, value in d.items():
        converted[word_boundary.sub('_', camel_key).lower()] = value
    return converted

Converts a dictionary with camelCase keys to snake_case keys

def get_map_subnets_params( client: communex.client.CommuneClient, block_hash: str | None = None) -> dict[int, communex.types.SubnetParamsWithEmission]:
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Gets all subnets info on the network

    A single batched map query fetches every per-subnet storage, and one
    record is built for each netuid present in `SubnetNames`.

    Args:
        client: Client used to run the batched storage query.
        block_hash: Forwarded as-is to `query_batch_map`.

    Returns:
        Mapping from netuid to that subnet's parameters and emission.

    Raises:
        KeyError: If a required storage is missing from the response, or has
            no entry for a netuid listed in `SubnetNames`.
    """

    bulk_query = client.query_batch_map(
        {
            "SubspaceModule": [
                ("ImmunityPeriod", []),
                ("MinAllowedWeights", []),
                ("MaxAllowedWeights", []),
                ("MinStake", []),
                ("SubnetEmission", []),
                ("Tempo", []),
                ("MaxAllowedUids", []),
                ("TargetRegistrationsInterval", []),
                ("TargetRegistrationsPerInterval", []),
                ("MaxRegistrationsPerInterval", []),
                ("Founder", []),
                ("FounderShare", []),
                ("IncentiveRatio", []),
                ("TrustRatio", []),
                ("SubnetNames", []),
                ("MaxWeightAge", []),
                ("BondsMovingAverage", []),
                ("MaximumSetWeightCallsPerEpoch", []),
                ("AdjustmentAlpha", []),
            ],
            "GovernanceModule": [
                ("SubnetGovernanceConfig", []),
            ],
        },
        block_hash,
    )
    # Subscripted storages are required; the ones read with `.get` may be
    # absent from the response and then fall back to per-subnet defaults.
    subnet_maps: SubnetParamsMaps = {
        "netuid_to_emission": bulk_query["SubnetEmission"],
        "netuid_to_tempo": bulk_query["Tempo"],
        "netuid_to_immunity_period": bulk_query["ImmunityPeriod"],
        "netuid_to_min_allowed_weights": bulk_query["MinAllowedWeights"],
        "netuid_to_max_allowed_weights": bulk_query["MaxAllowedWeights"],
        "netuid_to_max_allowed_uids": bulk_query["MaxAllowedUids"],
        "netuid_to_min_stake": bulk_query["MinStake"],
        "netuid_to_founder": bulk_query["Founder"],
        "netuid_to_founder_share": bulk_query["FounderShare"],
        "netuid_to_incentive_ratio": bulk_query["IncentiveRatio"],
        "netuid_to_trust_ratio": bulk_query["TrustRatio"],
        "netuid_to_name": bulk_query["SubnetNames"],
        "netuid_to_max_weight_age": bulk_query["MaxWeightAge"],
        "netuid_to_vote_mode": bulk_query["SubnetGovernanceConfig"],
        "netuid_to_adjustment_alpha": bulk_query["AdjustmentAlpha"],
        "netuid_to_bonds_ma": bulk_query.get("BondsMovingAverage", {}),
        "netuid_to_maximum_set_weight_calls_per_epoch": bulk_query.get("MaximumSetWeightCallsPerEpoch", {}),
        # Each map must read the storage of the same name: the two were
        # previously crossed, which swapped the reported values.
        "netuid_to_target_registrations_per_interval": bulk_query.get("TargetRegistrationsPerInterval", {}),
        "netuid_to_target_registrations_interval": bulk_query.get("TargetRegistrationsInterval", {}),
        "netuid_to_max_registrations_per_interval": bulk_query.get("MaxRegistrationsPerInterval", {}),
    }
    result_subnets: dict[int, SubnetParamsWithEmission] = {}

    # Fallbacks used when a subnet has no entry in an optional storage.
    default_target_registrations_interval = 200
    default_target_registrations_per_interval = int(default_target_registrations_interval / 2)
    default_max_registrations_per_interval = 42
    for netuid, name in subnet_maps["netuid_to_name"].items():

        subnet: SubnetParamsWithEmission = {
            "name": name,
            "founder": subnet_maps["netuid_to_founder"][netuid],
            "founder_share": subnet_maps["netuid_to_founder_share"][netuid],
            "immunity_period": subnet_maps["netuid_to_immunity_period"][netuid],
            "incentive_ratio": subnet_maps["netuid_to_incentive_ratio"][netuid],
            "max_allowed_uids": subnet_maps["netuid_to_max_allowed_uids"][netuid],
            "max_allowed_weights": subnet_maps["netuid_to_max_allowed_weights"][netuid],
            "min_allowed_weights": subnet_maps["netuid_to_min_allowed_weights"][netuid],
            "min_stake": subnet_maps["netuid_to_min_stake"][netuid],
            "tempo": subnet_maps["netuid_to_tempo"][netuid],
            "trust_ratio": subnet_maps["netuid_to_trust_ratio"][netuid],
            "emission": subnet_maps["netuid_to_emission"][netuid],
            "max_weight_age": subnet_maps["netuid_to_max_weight_age"][netuid],
            # The governance config is a mapping; only its vote mode is kept.
            "vote_mode": subnet_maps["netuid_to_vote_mode"][netuid]["vote_mode"],
            "adjustment_alpha": subnet_maps["netuid_to_adjustment_alpha"][netuid],
            "bonds_ma": subnet_maps["netuid_to_bonds_ma"].get(netuid, None),
            "maximum_set_weight_calls_per_epoch": subnet_maps["netuid_to_maximum_set_weight_calls_per_epoch"].get(netuid, 30),
            "target_registrations_per_interval": subnet_maps["netuid_to_target_registrations_per_interval"].get(netuid, default_target_registrations_per_interval),
            "target_registrations_interval": subnet_maps["netuid_to_target_registrations_interval"].get(netuid, default_target_registrations_interval),
            "max_registrations_per_interval": subnet_maps["netuid_to_max_registrations_per_interval"].get(netuid, default_max_registrations_per_interval),
        }

        result_subnets[netuid] = subnet

    return result_subnets

Gets all subnets info on the network

def get_global_params(c_client: communex.client.CommuneClient) -> communex.types.NetworkParams:
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Fetches the network-wide parameters of the commune ecosystem.

    All values come from a single batched query against the
    `SubspaceModule` and `GovernanceModule` storages. Numeric values are
    coerced with `int`, and the curator is wrapped in `Ss58Address`.
    """

    storage_request = {
        "SubspaceModule": [
            ("MaxAllowedSubnets", []),
            ("MaxAllowedModules", []),
            ("MaxRegistrationsPerBlock", []),
            ("MaxNameLength", []),
            ("FloorDelegationFee", []),
            ("MaxAllowedWeightsGlobal", []),
            ("SubnetStakeThreshold", []),
            ("MinWeightStake", []),
            ("MinNameLength", []),
            ("BurnConfig", []),
            ("FloorFounderShare", []),
        ],
        "GovernanceModule": [
            ("GlobalGovernanceConfig", []),
            ("GeneralSubnetApplicationCost", []),
            ("Curator", []),
        ],
    }
    raw = c_client.query_batch(storage_request)
    governance: dict[str, int] = raw["GlobalGovernanceConfig"]  # type: ignore

    def as_int(storage: str) -> int:
        # Reads one scalar storage value from the response as an int.
        return int(raw[storage])  # type: ignore

    network_params: NetworkParams = {
        "max_allowed_subnets": as_int("MaxAllowedSubnets"),
        "max_allowed_modules": as_int("MaxAllowedModules"),
        "max_registrations_per_block": as_int("MaxRegistrationsPerBlock"),
        "max_name_length": as_int("MaxNameLength"),
        # Both burn bounds live inside the single `BurnConfig` mapping.
        "min_burn": int(raw["BurnConfig"]["min_burn"]),  # type: ignore
        "max_burn": int(raw["BurnConfig"]["max_burn"]),  # type: ignore
        "min_weight_stake": as_int("MinWeightStake"),
        "floor_delegation_fee": as_int("FloorDelegationFee"),
        "max_allowed_weights": as_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(raw["Curator"]),
        "proposal_cost": int(governance["proposal_cost"]),
        "proposal_expiration": int(governance["proposal_expiration"]),
        "subnet_stake_threshold": as_int("SubnetStakeThreshold"),
        "min_name_length": as_int("MinNameLength"),
        "floor_founder_share": as_int("FloorFounderShare"),
        "general_subnet_application_cost": as_int("GeneralSubnetApplicationCost"),
    }
    return network_params

Returns global parameters of the whole commune ecosystem

def concat_to_local_keys( balance: dict[str, int], local_key_info: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount recorded for its address.

    Args:
        balance: Amounts indexed by address.
        local_key_info: Addresses indexed by local key name.

    Returns:
        Amounts indexed by local key name; an address with no entry in
        `balance` yields 0.
    """
    amount_by_name: dict[str, int] = {}
    for key_name, key_address in local_key_info.items():
        amount_by_name[key_name] = balance.get(key_address, 0)

    return amount_by_name
def local_keys_to_freebalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Looks up the free balance of every local key.

    The whole `System.Account` map is queried once; account entries that
    lack a "data" mapping with a "free" field are skipped.

    Returns:
        Free balance indexed by local key name; keys without a usable
        account entry get 0.
    """
    accounts = c_client.query_batch_map(
        {
            "System": [("Account", [])],
        }
    )["Account"]

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
def local_keys_to_stakedbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address], netuid: int = 0) -> dict[str, int]:
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
    netuid: int = 0,
) -> dict[str, int]:
    """
    Looks up the total amount staked by every local key on one subnet.

    Args:
        c_client: Client used to query the `StakeTo` map.
        local_keys: Addresses indexed by local key name.
        netuid: Subnet whose stakes are summed.

    Returns:
        Total staked amount indexed by local key name; keys with no stake
        entry get 0.
    """
    query_all = c_client.query_batch_map(
        {
            "SubspaceModule": [("StakeTo", [netuid])],
        }
    )

    # The response may carry no "StakeTo" entry; treat that as "no stakes"
    # instead of raising, as `local_keys_allbalance` does.
    staketo_map = query_all.get("StakeTo", {})

    # Each value is a sequence of (target, amount) pairs; only amounts count.
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value) for key, value in staketo_map.items()
    }

    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)

    return key2stake
def local_keys_allbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address], netuid: int | None = None) -> tuple[dict[str, int], dict[str, int]]:
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
    netuid: int | None = None,
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Looks up both the free balance and the staked amount of every local key.

    Args:
        c_client: Client used to run the storage queries.
        local_keys: Addresses indexed by local key name.
        netuid: Subnet whose stakes are counted. `None` sums the stakes over
            every subnet listed in `SubnetNames`.

    Returns:
        A `(key2balance, key2stake)` pair, both indexed by local key name
        and ordered by descending amount. Keys with no entry get 0.
    """
    query_result = c_client.query_batch_map(
        {
            "SubspaceModule": [("SubnetNames", [])],
            "System": [("Account", [])],
        }
    )
    balance_map = query_result["Account"]
    all_netuids = list(query_result["SubnetNames"].keys())

    # Either every known subnet or only the requested one.
    netuids = all_netuids if netuid is None else [netuid]

    # Accumulate per-key totals directly. This avoids merging the per-subnet
    # stake lists, which used to extend (mutate) the lists returned by the
    # first query in place.
    stake_totals: dict[Any, int] = {}
    for uid in netuids:
        staketo_map = c_client.query_batch_map(
            {
                "SubspaceModule": [
                    ("StakeTo", [uid]),
                ],
            }
        ).get("StakeTo", {})
        for key, value in staketo_map.items():
            stake_totals[key] = stake_totals.get(key, 0) + sum(
                stake for _, stake in value
            )

    # Account entries without a "data" mapping holding "free" are skipped.
    format_balances: dict[str, int] = {
        key: value["data"]["free"]
        for key, value in balance_map.items()
        if "data" in value and "free" in value["data"]
    }

    key2balance: dict[str, int] = concat_to_local_keys(format_balances, local_keys)
    key2stake: dict[str, int] = concat_to_local_keys(stake_totals, local_keys)

    # Largest amounts first; dicts preserve this insertion order.
    key2balance = dict(
        sorted(key2balance.items(), key=lambda item: item[1], reverse=True)
    )
    key2stake = dict(
        sorted(key2stake.items(), key=lambda item: item[1], reverse=True)
    )

    return key2balance, key2stake