communex.misc
import re
from typing import Any, TypeVar

from communex._common import transform_stake_dmap
from communex.client import CommuneClient
from communex.key import check_ss58_address
from communex.types import (ModuleInfoWithOptionalBalance, NetworkParams,
                            Ss58Address, SubnetParamsMaps,
                            SubnetParamsWithEmission)

# Matches "Qm" followed by exactly 44 characters of the base58 alphabet.
IPFS_REGEX = re.compile(r"^Qm[1-9A-HJ-NP-Za-km-z]{44}$")

T = TypeVar("T")


def get_map_modules(
    client: CommuneClient,
    netuid: int = 0,
    include_balances: bool = False,
) -> dict[str, ModuleInfoWithOptionalBalance]:
    """
    Gets the info of every module registered on subnet `netuid`.

    Args:
        client: Client used to run one batched map query.
        netuid: Subnet whose modules are listed.
        include_balances: When true, `System.Account` is queried as well and
            each module's `balance` is its free balance (0 when the key has
            no account entry); otherwise `balance` is None.

    Returns:
        A dict keyed by module key (SS58 address) holding one module info
        entry per uid found in the `Keys` map.

    Raises:
        KeyError: If a required storage map is missing from the query
            result, or lacks an entry for a listed uid.
    """

    request_dict: dict[Any, Any] = {
        "SubspaceModule": [
            ("StakeFrom", []),
            ("Keys", [netuid]),
            ("Name", [netuid]),
            ("Address", [netuid]),
            ("RegistrationBlock", [netuid]),
            ("DelegationFee", [netuid]),
            ("Emission", []),
            ("Incentive", []),
            ("Dividends", []),
            ("LastUpdate", []),
            ("Metadata", [netuid]),
        ],
    }
    if include_balances:
        request_dict["System"] = [("Account", [])]

    bulk_query = client.query_batch_map(request_dict)
    # The order of the targets below must match the order of the values.
    (
        ss58_to_stakefrom,
        uid_to_key,
        uid_to_name,
        uid_to_address,
        uid_to_regblock,
        ss58_to_delegationfee,
        uid_to_emission,
        uid_to_incentive,
        uid_to_dividend,
        uid_to_lastupdate,
        ss58_to_balances,
        uid_to_metadata,
    ) = (
        bulk_query.get("StakeFrom", {}),
        bulk_query.get("Keys", {}),
        bulk_query["Name"],
        bulk_query["Address"],
        bulk_query["RegistrationBlock"],
        bulk_query["DelegationFee"],
        bulk_query["Emission"],
        bulk_query["Incentive"],
        bulk_query["Dividends"],
        bulk_query["LastUpdate"],
        bulk_query.get("Account", {}),
        bulk_query.get("Metadata", {}),
    )

    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
    for uid, key in uid_to_key.items():
        key = check_ss58_address(key)
        name = uid_to_name[uid]
        address = uid_to_address[uid]
        # Emission, Incentive, Dividends and LastUpdate were queried without
        # a netuid argument, so they are indexed by netuid first, then uid.
        emission = uid_to_emission[netuid][uid]
        incentive = uid_to_incentive[netuid][uid]
        dividend = uid_to_dividend[netuid][uid]
        regblock = uid_to_regblock[uid]
        stake_from = ss58_to_stakefrom.get(key, [])
        last_update = uid_to_lastupdate[netuid][uid]
        delegation_fee = ss58_to_delegationfee.get(
            key, 20
        )  # 20% default delegation fee
        metadata = uid_to_metadata.get(uid, None)

        balance = None
        if include_balances and ss58_to_balances is not None:  # type: ignore
            balance_dict = ss58_to_balances.get(key, None)
            if balance_dict is not None:
                assert isinstance(balance_dict["data"], dict)
                balance = balance_dict["data"]["free"]
            else:
                # Keys without an account entry are reported with balance 0.
                balance = 0
        stake = sum(stake for _, stake in stake_from)

        module: ModuleInfoWithOptionalBalance = {
            "uid": uid,
            "key": key,
            "name": name,
            "address": address,
            "emission": emission,
            "incentive": incentive,
            "dividends": dividend,
            "stake_from": stake_from,
            "regblock": regblock,
            "last_update": last_update,
            "balance": balance,
            "stake": stake,
            "delegation_fee": delegation_fee,
            "metadata": metadata,
        }

        result_modules[key] = module
    return result_modules


def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Converts a dictionary with camelCase keys to snake_case keys.

    An underscore is inserted before every uppercase letter that is not the
    first character, then the key is lowercased. Values are kept as they are.
    """
    def snakerize(camel: str) -> str:
        return re.sub(r'(?<!^)(?=[A-Z])', '_', camel).lower()
    snaked: dict[str, T] = {snakerize(k): v for k, v in d.items()}
    return snaked


def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Gets the parameters and emission of every subnet on the network.

    Args:
        client: Client used to run one batched map query.
        block_hash: Passed through to `query_batch_map` as its second
            argument.

    Returns:
        A dict keyed by netuid with one entry per subnet found in the
        `SubnetNames` map.

    Raises:
        KeyError: If a required storage map is missing from the query
            result, or lacks an entry for a listed netuid.
    """
    bulk_query = client.query_batch_map(
        {
            "SubspaceModule": [
                ("ImmunityPeriod", []),
                ("MinAllowedWeights", []),
                ("MaxAllowedWeights", []),
                ("Tempo", []),
                ("MaxAllowedUids", []),
                ("TargetRegistrationsInterval", []),
                ("TargetRegistrationsPerInterval", []),
                ("MaxRegistrationsPerInterval", []),
                ("Founder", []),
                ("FounderShare", []),
                ("IncentiveRatio", []),
                ("TrustRatio", []),
                ("SubnetNames", []),
                ("MaxWeightAge", []),
                ("BondsMovingAverage", []),
                ("MaximumSetWeightCallsPerEpoch", []),
                ("AdjustmentAlpha", []),
                ("MinImmunityStake", []),
            ],
            "GovernanceModule": [
                ("SubnetGovernanceConfig", []),
            ],
            "SubnetEmissionModule": [
                ("SubnetEmission", []),
            ],

        },
        block_hash,
    )

    # Maps read with `.get(..., {})` may be absent from the result; their
    # per-subnet values fall back to the defaults used in the loop below.
    subnet_maps: SubnetParamsMaps = {
        "netuid_to_emission": bulk_query["SubnetEmission"],
        "netuid_to_tempo": bulk_query["Tempo"],
        "netuid_to_min_allowed_weights": bulk_query["MinAllowedWeights"],
        "netuid_to_max_allowed_weights": bulk_query["MaxAllowedWeights"],
        "netuid_to_max_allowed_uids": bulk_query["MaxAllowedUids"],
        "netuid_to_founder": bulk_query["Founder"],
        "netuid_to_founder_share": bulk_query["FounderShare"],
        "netuid_to_incentive_ratio": bulk_query["IncentiveRatio"],
        "netuid_to_trust_ratio": bulk_query["TrustRatio"],
        "netuid_to_name": bulk_query["SubnetNames"],
        "netuid_to_max_weight_age": bulk_query["MaxWeightAge"],
        "netuid_to_adjustment_alpha": bulk_query["AdjustmentAlpha"],
        "netuid_to_bonds_ma": bulk_query.get("BondsMovingAverage", {}),
        "netuid_to_maximum_set_weight_calls_per_epoch": bulk_query.get("MaximumSetWeightCallsPerEpoch", {}),
        "netuid_to_target_registrations_per_interval": bulk_query.get("TargetRegistrationsPerInterval", {}),
        "netuid_to_target_registrations_interval": bulk_query.get("TargetRegistrationsInterval", {}),
        "netuid_to_max_registrations_per_interval": bulk_query.get("MaxRegistrationsPerInterval", {}),
        "netuid_to_min_immunity_stake": bulk_query.get("MinImmunityStake", {}),
        "netuid_to_governance_configuration": bulk_query["SubnetGovernanceConfig"],
        "netuid_to_immunity_period": bulk_query["ImmunityPeriod"],
    }
    result_subnets: dict[int, SubnetParamsWithEmission] = {}

    default_target_registrations_interval = 200
    # Integer division keeps the default an int without a float round-trip.
    default_target_registrations_per_interval = default_target_registrations_interval // 2
    default_max_registrations_per_interval = 42

    for netuid, name in subnet_maps["netuid_to_name"].items():

        subnet: SubnetParamsWithEmission = {
            "name": name,
            "founder": subnet_maps["netuid_to_founder"][netuid],
            "founder_share": subnet_maps["netuid_to_founder_share"][netuid],
            "incentive_ratio": subnet_maps["netuid_to_incentive_ratio"][netuid],
            "max_allowed_uids": subnet_maps["netuid_to_max_allowed_uids"][netuid],
            "max_allowed_weights": subnet_maps["netuid_to_max_allowed_weights"][netuid],
            "min_allowed_weights": subnet_maps["netuid_to_min_allowed_weights"][netuid],
            "tempo": subnet_maps["netuid_to_tempo"][netuid],
            "trust_ratio": subnet_maps["netuid_to_trust_ratio"][netuid],
            "emission": subnet_maps["netuid_to_emission"][netuid],
            "max_weight_age": subnet_maps["netuid_to_max_weight_age"][netuid],
            "adjustment_alpha": subnet_maps["netuid_to_adjustment_alpha"][netuid],
            "bonds_ma": subnet_maps["netuid_to_bonds_ma"].get(netuid, None),
            "maximum_set_weight_calls_per_epoch": subnet_maps["netuid_to_maximum_set_weight_calls_per_epoch"].get(netuid, 30),
            "target_registrations_per_interval": subnet_maps["netuid_to_target_registrations_per_interval"].get(netuid, default_target_registrations_per_interval),
            "target_registrations_interval": subnet_maps["netuid_to_target_registrations_interval"].get(netuid, default_target_registrations_interval),
            "max_registrations_per_interval": subnet_maps["netuid_to_max_registrations_per_interval"].get(netuid, default_max_registrations_per_interval),
            "min_immunity_stake": subnet_maps["netuid_to_min_immunity_stake"].get(netuid, 0),
            "governance_config": subnet_maps["netuid_to_governance_configuration"][netuid],
            "immunity_period": subnet_maps["netuid_to_immunity_period"][netuid],
        }

        result_subnets[netuid] = subnet

    return result_subnets


def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Returns global parameters of the whole commune ecosystem.

    All values come from a single `query_batch` call. Every value except
    the curator address and the governance config is converted with `int`.

    Raises:
        KeyError: If a queried storage name is missing from the result.
    """

    query_all = c_client.query_batch(
        {
            "SubspaceModule": [
                ("MaxNameLength", []),
                ("MinNameLength", []),
                ("MaxAllowedSubnets", []),
                ("MaxAllowedModules", []),
                ("MaxRegistrationsPerBlock", []),
                ("MaxAllowedWeightsGlobal", []),
                ("FloorDelegationFee", []),
                ("FloorFounderShare", []),
                ("MinWeightStake", []),
                ("BurnConfig", []),
                ("Kappa", []),
                ("Rho", []),
                ("SubnetImmunityPeriod", []),
            ],
            "GovernanceModule": [
                ("GlobalGovernanceConfig", []),
                ("GeneralSubnetApplicationCost", []),
                ("Curator", []),
            ]
        }
    )
    global_params: NetworkParams = {
        "max_allowed_subnets": int(query_all["MaxAllowedSubnets"]),
        "max_allowed_modules": int(query_all["MaxAllowedModules"]),
        "max_registrations_per_block": int(query_all["MaxRegistrationsPerBlock"]),
        "max_name_length": int(query_all["MaxNameLength"]),
        "min_burn": int(query_all["BurnConfig"]["min_burn"]),  # type: ignore
        "max_burn": int(query_all["BurnConfig"]["max_burn"]),  # type: ignore
        "min_weight_stake": int(query_all["MinWeightStake"]),
        "floor_delegation_fee": int(query_all["FloorDelegationFee"]),
        "max_allowed_weights": int(query_all["MaxAllowedWeightsGlobal"]),
        "curator": Ss58Address(query_all["Curator"]),
        "min_name_length": int(query_all["MinNameLength"]),
        "floor_founder_share": int(query_all["FloorFounderShare"]),
        "general_subnet_application_cost": int(query_all["GeneralSubnetApplicationCost"]),
        "kappa": int(query_all["Kappa"]),
        "rho": int(query_all["Rho"]),
        "subnet_immunity_period": int(query_all["SubnetImmunityPeriod"]),
        "governance_config": query_all["GlobalGovernanceConfig"]  # type: ignore
    }
    return global_params


def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount recorded for its address.

    Args:
        balance: Amounts keyed by address.
        local_key_info: Addresses keyed by local key name.

    Returns:
        A dict from key name to `balance[address]`, or 0 when the address
        has no entry in `balance`.
    """
    key2: dict[str, int] = {
        key_name: balance.get(key_address, 0)
        for key_name, key_address in local_key_info.items()
    }

    return key2


def _free_balances(account_map: dict[Any, Any]) -> dict[str, int]:
    """Maps each account key to its `data.free` value, skipping entries without one."""
    return {
        key: value["data"]["free"]
        for key, value in account_map.items()
        if "data" in value and "free" in value["data"]
    }


def _total_stakes(stake_map: dict[Any, Any]) -> dict[str, int]:
    """Maps each key to the sum of the amounts in its `(key, amount)` pairs."""
    return {
        key: sum(stake for _, stake in value) for key, value in stake_map.items()
    }


def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns the free balance of each local key, keyed by key name.

    The whole `System.Account` map is fetched; keys whose address has no
    usable entry get 0.
    """
    query_all = c_client.query_batch_map(
        {
            "System": [("Account", [])],
        }
    )
    format_balances = _free_balances(query_all["Account"])

    return concat_to_local_keys(format_balances, local_keys)


def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the summed amounts of its address's entry
    in `query_map_staketo()` (0 when the address has no entry).
    """
    format_stake = _total_stakes(c_client.query_map_staketo())

    return concat_to_local_keys(format_stake, local_keys)


def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the summed amounts of its address's entry
    in `query_map_stakefrom()` (0 when the address has no entry).
    """
    format_stake = _total_stakes(c_client.query_map_stakefrom())

    return concat_to_local_keys(format_stake, local_keys)


def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns the free balance and the staked amount of each local key.

    Both maps come from one batched query (`System.Account` and
    `SubspaceModule.StakeTo`).

    Returns:
        `(key2balance, key2stake)`, both keyed by local key name and
        ordered from the largest amount to the smallest.

    Raises:
        KeyError: If `Account` or `StakeTo` is missing from the result.
    """
    query_all = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [
                ("StakeTo", []),
            ],
        }
    )

    balance_map, staketo_map = query_all["Account"], transform_stake_dmap(query_all["StakeTo"])

    key2balance = concat_to_local_keys(_free_balances(balance_map), local_keys)
    key2stake = concat_to_local_keys(_total_stakes(staketo_map), local_keys)

    # dict() keeps the order produced by sorted(): largest amount first.
    key2balance = dict(
        sorted(key2balance.items(), key=lambda item: item[1], reverse=True)
    )
    key2stake = dict(
        sorted(key2stake.items(), key=lambda item: item[1], reverse=True)
    )

    return key2balance, key2stake


if __name__ == "__main__":
    # Manual smoke check: fetch the global parameters from the testnet node.
    from communex._common import get_node_url
    client = CommuneClient(get_node_url(use_testnet=True))
    get_global_params(client)
# Matches "Qm" followed by exactly 44 characters of the base58 alphabet.
IPFS_REGEX = re.compile('^Qm[1-9A-HJ-NP-Za-km-z]{44}$')
def
get_map_modules( client: communex.client.CommuneClient, netuid: int = 0, include_balances: bool = False) -> dict[str, communex.types.ModuleInfoWithOptionalBalance]:
def get_map_modules(
    client: CommuneClient,
    netuid: int = 0,
    include_balances: bool = False,
) -> dict[str, ModuleInfoWithOptionalBalance]:
    """
    Collects the info of every module registered on subnet `netuid`.

    All storage maps are fetched with one batched query. `System.Account`
    is only part of that query when `include_balances` is set; a module's
    `balance` is then its free balance (0 if the key has no account entry)
    and None otherwise.

    Returns a dict keyed by the module's SS58 key.
    """

    subspace_queries: list[Any] = [
        ("StakeFrom", []),
        ("Keys", [netuid]),
        ("Name", [netuid]),
        ("Address", [netuid]),
        ("RegistrationBlock", [netuid]),
        ("DelegationFee", [netuid]),
        ("Emission", []),
        ("Incentive", []),
        ("Dividends", []),
        ("LastUpdate", []),
        ("Metadata", [netuid]),
    ]
    storage_request: dict[Any, Any] = {"SubspaceModule": subspace_queries}
    if include_balances:
        storage_request["System"] = [("Account", [])]

    maps = client.query_batch_map(storage_request)

    # Maps read with `.get` may be absent from the result and default to an
    # empty dict; the others are required and raise KeyError when missing.
    stakers_by_key = maps.get("StakeFrom", {})
    keys_by_uid = maps.get("Keys", {})
    names_by_uid = maps["Name"]
    addresses_by_uid = maps["Address"]
    regblocks_by_uid = maps["RegistrationBlock"]
    fees_by_key = maps["DelegationFee"]
    emissions = maps["Emission"]
    incentives = maps["Incentive"]
    dividends = maps["Dividends"]
    last_updates = maps["LastUpdate"]
    accounts_by_key = maps.get("Account", {})
    metadata_by_uid = maps.get("Metadata", {})

    modules: dict[str, ModuleInfoWithOptionalBalance] = {}
    stakers_by_key = transform_stake_dmap(stakers_by_key)

    for uid, raw_key in keys_by_uid.items():
        module_key = check_ss58_address(raw_key)
        module_name = names_by_uid[uid]
        module_address = addresses_by_uid[uid]
        # These four maps were queried without a netuid argument, so they
        # are indexed by netuid first and by uid second.
        module_emission = emissions[netuid][uid]
        module_incentive = incentives[netuid][uid]
        module_dividends = dividends[netuid][uid]
        registered_at = regblocks_by_uid[uid]
        stakers = stakers_by_key.get(module_key, [])
        updated_at = last_updates[netuid][uid]
        # 20% default delegation fee
        fee = fees_by_key.get(module_key, 20)
        module_metadata = metadata_by_uid.get(uid, None)

        free_balance = None
        if include_balances and accounts_by_key is not None:  # type: ignore
            account = accounts_by_key.get(module_key, None)
            if account is None:
                free_balance = 0
            else:
                assert isinstance(account["data"], dict)
                free_balance = account["data"]["free"]
        total_stake = sum(amount for _, amount in stakers)

        info: ModuleInfoWithOptionalBalance = {
            "uid": uid,
            "key": module_key,
            "name": module_name,
            "address": module_address,
            "emission": module_emission,
            "incentive": module_incentive,
            "dividends": module_dividends,
            "stake_from": stakers,
            "regblock": registered_at,
            "last_update": updated_at,
            "balance": free_balance,
            "stake": total_stake,
            "delegation_fee": fee,
            "metadata": module_metadata,
        }
        modules[module_key] = info

    return modules
Gets the info of all modules registered on the given subnet
def
to_snake_case(d: dict[str, ~T]) -> dict[str, ~T]:
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of `d` whose camelCase keys are rewritten in snake_case.

    An underscore is placed before every uppercase letter that is not the
    first character of the key, then the key is lowercased. Values are
    carried over untouched.
    """
    converted: dict[str, T] = {}
    for camel_key, value in d.items():
        # Zero-width match: before each capital, except at the string start.
        underscored = re.sub(r'(?<!^)(?=[A-Z])', '_', camel_key)
        converted[underscored.lower()] = value
    return converted
Converts a dictionary with camelCase keys to snake_case keys
def
get_map_subnets_params( client: communex.client.CommuneClient, block_hash: str | None = None) -> dict[int, communex.types.SubnetParamsWithEmission]:
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Collects the parameters and emission of every subnet on the network.

    Everything is read with one batched map query; `block_hash` is handed
    to `query_batch_map` as its second argument. The result has one entry
    per netuid present in the `SubnetNames` map.
    """
    subspace_queries = [
        ("ImmunityPeriod", []),
        ("MinAllowedWeights", []),
        ("MaxAllowedWeights", []),
        ("Tempo", []),
        ("MaxAllowedUids", []),
        ("TargetRegistrationsInterval", []),
        ("TargetRegistrationsPerInterval", []),
        ("MaxRegistrationsPerInterval", []),
        ("Founder", []),
        ("FounderShare", []),
        ("IncentiveRatio", []),
        ("TrustRatio", []),
        ("SubnetNames", []),
        ("MaxWeightAge", []),
        ("BondsMovingAverage", []),
        ("MaximumSetWeightCallsPerEpoch", []),
        ("AdjustmentAlpha", []),
        ("MinImmunityStake", []),
    ]
    governance_queries = [("SubnetGovernanceConfig", [])]
    emission_queries = [("SubnetEmission", [])]

    maps = client.query_batch_map(
        {
            "SubspaceModule": subspace_queries,
            "GovernanceModule": governance_queries,
            "SubnetEmissionModule": emission_queries,
        },
        block_hash,
    )

    # Required maps: a missing one raises KeyError.
    emission_of = maps["SubnetEmission"]
    tempo_of = maps["Tempo"]
    min_weights_of = maps["MinAllowedWeights"]
    max_weights_of = maps["MaxAllowedWeights"]
    max_uids_of = maps["MaxAllowedUids"]
    founder_of = maps["Founder"]
    founder_share_of = maps["FounderShare"]
    incentive_ratio_of = maps["IncentiveRatio"]
    trust_ratio_of = maps["TrustRatio"]
    name_of = maps["SubnetNames"]
    max_weight_age_of = maps["MaxWeightAge"]
    adjustment_alpha_of = maps["AdjustmentAlpha"]
    # Maps that may be absent: per-subnet values fall back to defaults.
    bonds_ma_of = maps.get("BondsMovingAverage", {})
    max_set_weight_calls_of = maps.get("MaximumSetWeightCallsPerEpoch", {})
    target_regs_per_interval_of = maps.get("TargetRegistrationsPerInterval", {})
    target_regs_interval_of = maps.get("TargetRegistrationsInterval", {})
    max_regs_per_interval_of = maps.get("MaxRegistrationsPerInterval", {})
    min_immunity_stake_of = maps.get("MinImmunityStake", {})
    governance_config_of = maps["SubnetGovernanceConfig"]
    immunity_period_of = maps["ImmunityPeriod"]

    fallback_regs_interval = 200
    fallback_regs_per_interval = fallback_regs_interval // 2
    fallback_max_regs_per_interval = 42

    subnets: dict[int, SubnetParamsWithEmission] = {}
    for netuid, subnet_name in name_of.items():
        params: SubnetParamsWithEmission = {
            "name": subnet_name,
            "founder": founder_of[netuid],
            "founder_share": founder_share_of[netuid],
            "incentive_ratio": incentive_ratio_of[netuid],
            "max_allowed_uids": max_uids_of[netuid],
            "max_allowed_weights": max_weights_of[netuid],
            "min_allowed_weights": min_weights_of[netuid],
            "tempo": tempo_of[netuid],
            "trust_ratio": trust_ratio_of[netuid],
            "emission": emission_of[netuid],
            "max_weight_age": max_weight_age_of[netuid],
            "adjustment_alpha": adjustment_alpha_of[netuid],
            "bonds_ma": bonds_ma_of.get(netuid, None),
            "maximum_set_weight_calls_per_epoch": max_set_weight_calls_of.get(netuid, 30),
            "target_registrations_per_interval": target_regs_per_interval_of.get(
                netuid, fallback_regs_per_interval
            ),
            "target_registrations_interval": target_regs_interval_of.get(
                netuid, fallback_regs_interval
            ),
            "max_registrations_per_interval": max_regs_per_interval_of.get(
                netuid, fallback_max_regs_per_interval
            ),
            "min_immunity_stake": min_immunity_stake_of.get(netuid, 0),
            "governance_config": governance_config_of[netuid],
            "immunity_period": immunity_period_of[netuid],
        }
        subnets[netuid] = params

    return subnets
Gets the parameters and emission of all subnets on the network
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Fetches the global parameters of the whole commune ecosystem.

    One `query_batch` call reads every value. All of them are converted
    with `int`, except the curator (wrapped in `Ss58Address`) and the
    global governance config (passed through as returned).
    """

    subspace_queries = [
        ("MaxNameLength", []),
        ("MinNameLength", []),
        ("MaxAllowedSubnets", []),
        ("MaxAllowedModules", []),
        ("MaxRegistrationsPerBlock", []),
        ("MaxAllowedWeightsGlobal", []),
        ("FloorDelegationFee", []),
        ("FloorFounderShare", []),
        ("MinWeightStake", []),
        ("BurnConfig", []),
        ("Kappa", []),
        ("Rho", []),
        ("SubnetImmunityPeriod", []),
    ]
    governance_queries = [
        ("GlobalGovernanceConfig", []),
        ("GeneralSubnetApplicationCost", []),
        ("Curator", []),
    ]
    values = c_client.query_batch(
        {
            "SubspaceModule": subspace_queries,
            "GovernanceModule": governance_queries,
        }
    )

    def as_int(storage_name: str) -> int:
        # Look up one queried storage value and coerce it to int.
        return int(values[storage_name])

    params: NetworkParams = {
        "max_allowed_subnets": as_int("MaxAllowedSubnets"),
        "max_allowed_modules": as_int("MaxAllowedModules"),
        "max_registrations_per_block": as_int("MaxRegistrationsPerBlock"),
        "max_name_length": as_int("MaxNameLength"),
        "min_burn": int(values["BurnConfig"]["min_burn"]),  # type: ignore
        "max_burn": int(values["BurnConfig"]["max_burn"]),  # type: ignore
        "min_weight_stake": as_int("MinWeightStake"),
        "floor_delegation_fee": as_int("FloorDelegationFee"),
        "max_allowed_weights": as_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(values["Curator"]),
        "min_name_length": as_int("MinNameLength"),
        "floor_founder_share": as_int("FloorFounderShare"),
        "general_subnet_application_cost": as_int("GeneralSubnetApplicationCost"),
        "kappa": as_int("Kappa"),
        "rho": as_int("Rho"),
        "subnet_immunity_period": as_int("SubnetImmunityPeriod"),
        "governance_config": values["GlobalGovernanceConfig"]  # type: ignore
    }
    return params
Returns global parameters of the whole commune ecosystem
def
concat_to_local_keys( balance: dict[str, int], local_key_info: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def
local_keys_to_freebalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Looks up the free balance of each local key.

    The whole `System.Account` map is fetched, reduced to address -> free
    balance, and then re-keyed by local key name through
    `concat_to_local_keys`.
    """
    accounts = c_client.query_batch_map({"System": [("Account", [])]})["Account"]

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        # Entries without a "data"/"free" field are left out.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
def
local_keys_to_stakedbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Sums, for each local key, the amounts listed under its address in the
    map returned by `query_map_staketo()`.

    The per-address totals are re-keyed by local key name through
    `concat_to_local_keys`.
    """
    staked_by_address: dict[str, int] = {}
    for address, stakes in c_client.query_map_staketo().items():
        # Each entry is a pair whose second element is the amount.
        staked_by_address[address] = sum(amount for _, amount in stakes)

    return concat_to_local_keys(staked_by_address, local_keys)
def
local_keys_to_stakedfrom_balance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Sums, for each local key, the amounts listed under its address in the
    map returned by `query_map_stakefrom()`.

    Args:
        c_client: Client used to fetch the stake-from map.
        local_keys: Addresses keyed by local key name.

    Returns:
        A dict from local key name to the summed amount, as produced by
        `concat_to_local_keys`.
    """
    stakefrom_map = c_client.query_map_stakefrom()

    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value) for key, value in stakefrom_map.items()
    }

    # concat_to_local_keys already returns a fresh dict, so it is returned
    # directly rather than copied through an identity comprehension.
    return concat_to_local_keys(format_stake, local_keys)
def
local_keys_allbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> tuple[dict[str, int], dict[str, int]]:
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns the free balance and the staked amount of each local key.

    Both maps come from one batched query (`System.Account` and
    `SubspaceModule.StakeTo`).

    Args:
        c_client: Client used to run the batched map query.
        local_keys: Addresses keyed by local key name.

    Returns:
        `(key2balance, key2stake)`, both keyed by local key name and
        ordered from the largest amount to the smallest.

    Raises:
        KeyError: If `Account` or `StakeTo` is missing from the result.
    """
    query_all = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [
                ("StakeTo", []),
            ],
        }
    )

    balance_map, staketo_map = query_all["Account"], transform_stake_dmap(query_all["StakeTo"])

    def largest_first(amounts: dict[str, int]) -> dict[str, int]:
        # dict() preserves the order produced by sorted().
        return dict(sorted(amounts.items(), key=lambda item: item[1], reverse=True))

    format_balances: dict[str, int] = {
        key: value["data"]["free"]
        for key, value in balance_map.items()
        if "data" in value and "free" in value["data"]
    }
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value) for key, value in staketo_map.items()
    }

    key2balance = largest_first(concat_to_local_keys(format_balances, local_keys))
    key2stake = largest_first(concat_to_local_keys(format_stake, local_keys))

    return key2balance, key2stake