communex.misc
1import re 2from typing import Any, TypeVar, cast 3 4from communex._common import transform_stake_dmap 5from communex.balance import to_nano 6from communex.cli._common import transform_subnet_params 7from communex.client import CommuneClient 8from communex.key import check_ss58_address 9from communex.types import ( 10 BurnConfiguration, 11 GovernanceConfiguration, 12 ModuleInfoWithOptionalBalance, 13 NetworkParams, 14 Ss58Address, 15 SubnetParamsMaps, 16 SubnetParamsWithEmission, 17) 18 19IPFS_REGEX = re.compile(r"^Qm[1-9A-HJ-NP-Za-km-z]{44}$") 20 21T = TypeVar("T") 22 23 24def get_map_modules( 25 client: CommuneClient, 26 netuid: int = 0, 27 include_balances: bool = False, 28) -> dict[str, ModuleInfoWithOptionalBalance]: 29 """ 30 Gets all modules info on the network 31 """ 32 33 request_dict: dict[Any, Any] = { 34 "SubspaceModule": [ 35 ("StakeFrom", []), 36 ("Keys", [netuid]), 37 ("Name", [netuid]), 38 ("Address", [netuid]), 39 ("RegistrationBlock", [netuid]), 40 ("DelegationFee", []), 41 ("Emission", []), 42 ("Incentive", []), 43 ("Dividends", []), 44 ("LastUpdate", []), 45 ("Metadata", [netuid]), 46 ("StakeTo", []), 47 ], 48 } 49 if include_balances: 50 request_dict["System"] = [("Account", [])] 51 bulk_query = client.query_batch_map(request_dict) 52 ( 53 ss58_to_stakefrom, 54 uid_to_key, 55 uid_to_name, 56 uid_to_address, 57 uid_to_regblock, 58 ss58_to_delegationfee, 59 uid_to_emission, 60 uid_to_incentive, 61 uid_to_dividend, 62 uid_to_lastupdate, 63 ss58_to_balances, 64 ss58_to_metadata, 65 ) = ( 66 bulk_query.get("StakeFrom", {}), 67 bulk_query.get("Keys", {}), 68 bulk_query["Name"], 69 bulk_query["Address"], 70 bulk_query["RegistrationBlock"], 71 bulk_query["DelegationFee"], 72 bulk_query["Emission"], 73 bulk_query["Incentive"], 74 bulk_query["Dividends"], 75 bulk_query["LastUpdate"], 76 bulk_query.get("Account", {}), 77 bulk_query.get("Metadata", {}), 78 ) 79 result_modules: dict[str, ModuleInfoWithOptionalBalance] = {} 80 ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom) 81 for uid, key in uid_to_key.items(): 82 key = check_ss58_address(key) 83 name = uid_to_name[uid] 84 address = uid_to_address[uid] 85 emission = uid_to_emission[netuid][uid] 86 incentive = uid_to_incentive[netuid][uid] 87 dividend = uid_to_dividend[netuid][uid] 88 regblock = uid_to_regblock[uid] 89 stake_from = ss58_to_stakefrom.get(key, []) 90 last_update = uid_to_lastupdate[netuid][uid] 91 delegation_fee = ss58_to_delegationfee.get( 92 key, 5 93 ) # 5% default delegation fee 94 metadata = ss58_to_metadata.get(key, None) 95 96 balance = None 97 if include_balances and ss58_to_balances is not None: # type: ignore 98 balance_dict = ss58_to_balances.get(key, None) 99 if balance_dict is not None: 100 assert isinstance(balance_dict["data"], dict) 101 balance = balance_dict["data"]["free"] 102 else: 103 balance = 0 104 stake = sum(stake for _, stake in stake_from) 105 106 module: ModuleInfoWithOptionalBalance = { 107 "uid": uid, 108 "key": key, 109 "name": name, 110 "address": address, 111 "emission": emission, 112 "incentive": incentive, 113 "dividends": dividend, 114 "stake_from": stake_from, 115 "regblock": regblock, 116 "last_update": last_update, 117 "balance": balance, 118 "stake": stake, 119 "delegation_fee": delegation_fee, 120 "metadata": metadata, 121 } 122 123 result_modules[key] = module 124 return result_modules 125 126 127def to_snake_case(d: dict[str, T]) -> dict[str, T]: 128 """ 129 Converts a dictionary with camelCase keys to snake_case keys 130 """ 131 132 def snakerize(camel: str) -> str: 133 return re.sub(r"(?<!^)(?=[A-Z])", "_", camel).lower() 134 135 snaked: dict[str, T] = {snakerize(k): v for k, v in d.items()} 136 return snaked 137 138 139def get_map_displayable_subnets(client: CommuneClient): 140 subnets = get_map_subnets_params(client) 141 display_values = transform_subnet_params(subnets) 142 return display_values 143 144 145def get_map_subnets_params( 146 client: CommuneClient, block_hash: str | None = None 147) -> dict[int, SubnetParamsWithEmission]: 148 """ 149 Gets all subnets info on the network 150 """ 151 bulk_query = client.query_batch_map( 152 { 153 "SubspaceModule": [ 154 ("ImmunityPeriod", []), 155 ("MinAllowedWeights", []), 156 ("MaxAllowedWeights", []), 157 ("Tempo", []), 158 ("MaxAllowedUids", []), 159 ("Founder", []), 160 ("FounderShare", []), 161 ("IncentiveRatio", []), 162 ("TrustRatio", []), 163 ("SubnetNames", []), 164 ("MaxWeightAge", []), 165 ("BondsMovingAverage", []), 166 ("MaximumSetWeightCallsPerEpoch", []), 167 ("MinValidatorStake", []), 168 ("MaxAllowedValidators", []), 169 ("ModuleBurnConfig", []), 170 ("SubnetMetadata", []), 171 ], 172 "GovernanceModule": [ 173 ("SubnetGovernanceConfig", []), 174 ], 175 "SubnetEmissionModule": [ 176 ("SubnetEmission", []), 177 ], 178 }, 179 block_hash, 180 ) 181 subnet_maps: SubnetParamsMaps = { 182 "netuid_to_emission": bulk_query["SubnetEmission"], 183 "netuid_to_tempo": bulk_query["Tempo"], 184 "netuid_to_min_allowed_weights": bulk_query["MinAllowedWeights"], 185 "netuid_to_max_allowed_weights": bulk_query["MaxAllowedWeights"], 186 "netuid_to_max_allowed_uids": bulk_query["MaxAllowedUids"], 187 "netuid_to_founder": bulk_query["Founder"], 188 "netuid_to_founder_share": bulk_query["FounderShare"], 189 "netuid_to_incentive_ratio": bulk_query["IncentiveRatio"], 190 "netuid_to_trust_ratio": bulk_query["TrustRatio"], 191 "netuid_to_name": bulk_query["SubnetNames"], 192 "netuid_to_max_weight_age": bulk_query["MaxWeightAge"], 193 "netuid_to_bonds_ma": bulk_query.get("BondsMovingAverage", {}), 194 "netuid_to_maximum_set_weight_calls_per_epoch": bulk_query.get( 195 "MaximumSetWeightCallsPerEpoch", {} 196 ), 197 "netuid_to_governance_configuration": bulk_query[ 198 "SubnetGovernanceConfig" 199 ], 200 "netuid_to_immunity_period": bulk_query["ImmunityPeriod"], 201 "netuid_to_min_validator_stake": bulk_query.get( 202 "MinValidatorStake", {} 203 ), 204 "netuid_to_max_allowed_validators": bulk_query.get( 205 "MaxAllowedValidators", {} 206 ), 207 "netuid_to_module_burn_config": bulk_query.get("ModuleBurnConfig", {}), 208 "netuid_to_subnet_metadata": bulk_query.get("SubnetMetadata", {}), 209 } 210 result_subnets: dict[int, SubnetParamsWithEmission] = {} 211 212 for netuid, name in subnet_maps["netuid_to_name"].items(): 213 subnet: SubnetParamsWithEmission = { 214 "name": name, 215 "founder": subnet_maps["netuid_to_founder"][netuid], 216 "founder_share": subnet_maps["netuid_to_founder_share"][netuid], 217 "incentive_ratio": subnet_maps["netuid_to_incentive_ratio"][netuid], 218 "max_allowed_uids": subnet_maps["netuid_to_max_allowed_uids"][ 219 netuid 220 ], 221 "max_allowed_weights": subnet_maps["netuid_to_max_allowed_weights"][ 222 netuid 223 ], 224 "min_allowed_weights": subnet_maps["netuid_to_min_allowed_weights"][ 225 netuid 226 ], 227 "tempo": subnet_maps["netuid_to_tempo"][netuid], 228 "trust_ratio": subnet_maps["netuid_to_trust_ratio"][netuid], 229 "emission": subnet_maps["netuid_to_emission"][netuid], 230 "max_weight_age": subnet_maps["netuid_to_max_weight_age"][netuid], 231 "bonds_ma": subnet_maps["netuid_to_bonds_ma"].get(netuid, None), 232 "maximum_set_weight_calls_per_epoch": subnet_maps[ 233 "netuid_to_maximum_set_weight_calls_per_epoch" 234 ].get(netuid, 30), 235 "governance_config": subnet_maps[ 236 "netuid_to_governance_configuration" 237 ][netuid], 238 "immunity_period": subnet_maps["netuid_to_immunity_period"][netuid], 239 "min_validator_stake": subnet_maps[ 240 "netuid_to_min_validator_stake" 241 ].get(netuid, to_nano(50_000)), 242 "max_allowed_validators": subnet_maps[ 243 "netuid_to_max_allowed_validators" 244 ].get(netuid, 50), 245 "module_burn_config": cast( 246 BurnConfiguration, 247 subnet_maps["netuid_to_module_burn_config"].get(netuid, None), 248 ), 249 "subnet_metadata": subnet_maps["netuid_to_subnet_metadata"].get( 250 netuid, None 251 ), 252 } 253 254 result_subnets[netuid] = subnet 255 256 return result_subnets 257 258 259def get_global_params(c_client: CommuneClient) -> NetworkParams: 260 """ 261 Returns global parameters of the whole commune ecosystem 262 """ 263 264 query_all = c_client.query_batch( 265 { 266 "SubspaceModule": [ 267 ("MaxNameLength", []), 268 ("MinNameLength", []), 269 ("MaxAllowedSubnets", []), 270 ("MaxAllowedModules", []), 271 ("MaxRegistrationsPerBlock", []), 272 ("MaxAllowedWeightsGlobal", []), 273 ("FloorDelegationFee", []), 274 ("FloorFounderShare", []), 275 ("MinWeightStake", []), 276 ("Kappa", []), 277 ("Rho", []), 278 ("SubnetImmunityPeriod", []), 279 ("SubnetBurn", []), 280 ], 281 "GovernanceModule": [ 282 ("GlobalGovernanceConfig", []), 283 ("GeneralSubnetApplicationCost", []), 284 ("Curator", []), 285 ], 286 } 287 ) 288 global_config = cast( 289 GovernanceConfiguration, query_all["GlobalGovernanceConfig"] 290 ) 291 global_params: NetworkParams = { 292 "max_allowed_subnets": int(query_all["MaxAllowedSubnets"]), 293 "max_allowed_modules": int(query_all["MaxAllowedModules"]), 294 "max_registrations_per_block": int( 295 query_all["MaxRegistrationsPerBlock"] 296 ), 297 "max_name_length": int(query_all["MaxNameLength"]), 298 "min_weight_stake": int(query_all["MinWeightStake"]), 299 "floor_delegation_fee": int(query_all["FloorDelegationFee"]), 300 "max_allowed_weights": int(query_all["MaxAllowedWeightsGlobal"]), 301 "curator": Ss58Address(query_all["Curator"]), 302 "min_name_length": int(query_all["MinNameLength"]), 303 "floor_founder_share": int(query_all["FloorFounderShare"]), 304 "general_subnet_application_cost": int( 305 query_all["GeneralSubnetApplicationCost"] 306 ), 307 "kappa": int(query_all["Kappa"]), 308 "rho": int(query_all["Rho"]), 309 "subnet_immunity_period": int(query_all["SubnetImmunityPeriod"]), 310 "subnet_registration_cost": int(query_all["SubnetBurn"]), 311 "governance_config": { 312 "proposal_cost": int(global_config["proposal_cost"]), 313 "proposal_expiration": int(global_config["proposal_expiration"]), 314 "vote_mode": global_config["vote_mode"], 315 "proposal_reward_treasury_allocation": int( 316 global_config["proposal_reward_treasury_allocation"] 317 ), 318 "max_proposal_reward_treasury_allocation": int( 319 global_config["max_proposal_reward_treasury_allocation"] 320 ), 321 "proposal_reward_interval": int( 322 global_config["proposal_reward_interval"] 323 ), 324 }, 325 } 326 return global_params 327 328 329def concat_to_local_keys( 330 balance: dict[str, int], local_key_info: dict[str, Ss58Address] 331) -> dict[str, int]: 332 key2: dict[str, int] = { 333 key_name: balance.get(key_address, 0) 334 for key_name, key_address in local_key_info.items() 335 } 336 337 return key2 338 339 340def local_keys_to_freebalance( 341 c_client: CommuneClient, 342 local_keys: dict[str, Ss58Address], 343) -> dict[str, int]: 344 query_all = c_client.query_batch_map( 345 { 346 "System": [("Account", [])], 347 } 348 ) 349 balance_map = query_all["Account"] 350 351 format_balances: dict[str, int] = { 352 key: value["data"]["free"] 353 for key, value in balance_map.items() 354 if "data" in value and "free" in value["data"] 355 } 356 357 key2balance: dict[str, int] = concat_to_local_keys( 358 format_balances, local_keys 359 ) 360 361 return key2balance 362 363 364def local_keys_to_stakedbalance( 365 c_client: CommuneClient, 366 local_keys: dict[str, Ss58Address], 367) -> dict[str, int]: 368 staketo_map = c_client.query_map_staketo() 369 370 format_stake: dict[str, int] = { 371 key: sum(stake for _, stake in value) 372 for key, value in staketo_map.items() 373 } 374 375 key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys) 376 377 return key2stake 378 379 380def local_keys_to_stakedfrom_balance( 381 c_client: CommuneClient, 382 local_keys: dict[str, Ss58Address], 383) -> dict[str, int]: 384 stakefrom_map = c_client.query_map_stakefrom() 385 386 format_stake: dict[str, int] = { 387 key: sum(stake for _, stake in value) 388 for key, value in stakefrom_map.items() 389 } 390 391 key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys) 392 key2stake = {key: stake for key, stake in key2stake.items()} 393 return key2stake 394 395 396def local_keys_allbalance( 397 c_client: CommuneClient, 398 local_keys: dict[str, Ss58Address], 399) -> tuple[dict[str, int], dict[str, int]]: 400 query_all = c_client.query_batch_map( 401 { 402 "System": [("Account", [])], 403 "SubspaceModule": [ 404 ("StakeTo", []), 405 ], 406 } 407 ) 408 409 balance_map, staketo_map = ( 410 query_all["Account"], 411 transform_stake_dmap(query_all["StakeTo"]), 412 ) 413 414 format_balances: dict[str, int] = { 415 key: value["data"]["free"] 416 for key, value in balance_map.items() 417 if "data" in value and "free" in value["data"] 418 } 419 key2balance: dict[str, int] = concat_to_local_keys( 420 format_balances, local_keys 421 ) 422 format_stake: dict[str, int] = { 423 key: sum(stake for _, stake in value) 424 for key, value in staketo_map.items() 425 } 426 427 key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys) 428 429 key2balance = { 430 k: v 431 for k, v in sorted( 432 key2balance.items(), key=lambda item: item[1], reverse=True 433 ) 434 } 435 436 key2stake = { 437 k: v 438 for k, v in sorted( 439 key2stake.items(), key=lambda item: item[1], reverse=True 440 ) 441 } 442 443 return key2balance, key2stake 444 445 446if __name__ == "__main__": 447 from communex._common import get_node_url 448 449 client = CommuneClient(get_node_url(use_testnet=True)) 450 get_global_params(client)
IPFS_REGEX =
re.compile('^Qm[1-9A-HJ-NP-Za-km-z]{44}$')
def
get_map_modules( client: communex.client.CommuneClient, netuid: int = 0, include_balances: bool = False) -> dict[str, communex.types.ModuleInfoWithOptionalBalance]:
25def get_map_modules( 26 client: CommuneClient, 27 netuid: int = 0, 28 include_balances: bool = False, 29) -> dict[str, ModuleInfoWithOptionalBalance]: 30 """ 31 Gets all modules info on the network 32 """ 33 34 request_dict: dict[Any, Any] = { 35 "SubspaceModule": [ 36 ("StakeFrom", []), 37 ("Keys", [netuid]), 38 ("Name", [netuid]), 39 ("Address", [netuid]), 40 ("RegistrationBlock", [netuid]), 41 ("DelegationFee", []), 42 ("Emission", []), 43 ("Incentive", []), 44 ("Dividends", []), 45 ("LastUpdate", []), 46 ("Metadata", [netuid]), 47 ("StakeTo", []), 48 ], 49 } 50 if include_balances: 51 request_dict["System"] = [("Account", [])] 52 bulk_query = client.query_batch_map(request_dict) 53 ( 54 ss58_to_stakefrom, 55 uid_to_key, 56 uid_to_name, 57 uid_to_address, 58 uid_to_regblock, 59 ss58_to_delegationfee, 60 uid_to_emission, 61 uid_to_incentive, 62 uid_to_dividend, 63 uid_to_lastupdate, 64 ss58_to_balances, 65 ss58_to_metadata, 66 ) = ( 67 bulk_query.get("StakeFrom", {}), 68 bulk_query.get("Keys", {}), 69 bulk_query["Name"], 70 bulk_query["Address"], 71 bulk_query["RegistrationBlock"], 72 bulk_query["DelegationFee"], 73 bulk_query["Emission"], 74 bulk_query["Incentive"], 75 bulk_query["Dividends"], 76 bulk_query["LastUpdate"], 77 bulk_query.get("Account", {}), 78 bulk_query.get("Metadata", {}), 79 ) 80 result_modules: dict[str, ModuleInfoWithOptionalBalance] = {} 81 ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom) 82 for uid, key in uid_to_key.items(): 83 key = check_ss58_address(key) 84 name = uid_to_name[uid] 85 address = uid_to_address[uid] 86 emission = uid_to_emission[netuid][uid] 87 incentive = uid_to_incentive[netuid][uid] 88 dividend = uid_to_dividend[netuid][uid] 89 regblock = uid_to_regblock[uid] 90 stake_from = ss58_to_stakefrom.get(key, []) 91 last_update = uid_to_lastupdate[netuid][uid] 92 delegation_fee = ss58_to_delegationfee.get( 93 key, 5 94 ) # 5% default delegation fee 95 metadata = ss58_to_metadata.get(key, None) 96 97 balance = None 98 if include_balances and ss58_to_balances is not None: # type: ignore 99 balance_dict = ss58_to_balances.get(key, None) 100 if balance_dict is not None: 101 assert isinstance(balance_dict["data"], dict) 102 balance = balance_dict["data"]["free"] 103 else: 104 balance = 0 105 stake = sum(stake for _, stake in stake_from) 106 107 module: ModuleInfoWithOptionalBalance = { 108 "uid": uid, 109 "key": key, 110 "name": name, 111 "address": address, 112 "emission": emission, 113 "incentive": incentive, 114 "dividends": dividend, 115 "stake_from": stake_from, 116 "regblock": regblock, 117 "last_update": last_update, 118 "balance": balance, 119 "stake": stake, 120 "delegation_fee": delegation_fee, 121 "metadata": metadata, 122 } 123 124 result_modules[key] = module 125 return result_modules
Gets all modules info on the network
def
to_snake_case(d: dict[str, ~T]) -> dict[str, ~T]:
128def to_snake_case(d: dict[str, T]) -> dict[str, T]: 129 """ 130 Converts a dictionary with camelCase keys to snake_case keys 131 """ 132 133 def snakerize(camel: str) -> str: 134 return re.sub(r"(?<!^)(?=[A-Z])", "_", camel).lower() 135 136 snaked: dict[str, T] = {snakerize(k): v for k, v in d.items()} 137 return snaked
Converts a dictionary with camelCase keys to snake_case keys
def
get_map_subnets_params( client: communex.client.CommuneClient, block_hash: str | None = None) -> dict[int, communex.types.SubnetParamsWithEmission]:
146def get_map_subnets_params( 147 client: CommuneClient, block_hash: str | None = None 148) -> dict[int, SubnetParamsWithEmission]: 149 """ 150 Gets all subnets info on the network 151 """ 152 bulk_query = client.query_batch_map( 153 { 154 "SubspaceModule": [ 155 ("ImmunityPeriod", []), 156 ("MinAllowedWeights", []), 157 ("MaxAllowedWeights", []), 158 ("Tempo", []), 159 ("MaxAllowedUids", []), 160 ("Founder", []), 161 ("FounderShare", []), 162 ("IncentiveRatio", []), 163 ("TrustRatio", []), 164 ("SubnetNames", []), 165 ("MaxWeightAge", []), 166 ("BondsMovingAverage", []), 167 ("MaximumSetWeightCallsPerEpoch", []), 168 ("MinValidatorStake", []), 169 ("MaxAllowedValidators", []), 170 ("ModuleBurnConfig", []), 171 ("SubnetMetadata", []), 172 ], 173 "GovernanceModule": [ 174 ("SubnetGovernanceConfig", []), 175 ], 176 "SubnetEmissionModule": [ 177 ("SubnetEmission", []), 178 ], 179 }, 180 block_hash, 181 ) 182 subnet_maps: SubnetParamsMaps = { 183 "netuid_to_emission": bulk_query["SubnetEmission"], 184 "netuid_to_tempo": bulk_query["Tempo"], 185 "netuid_to_min_allowed_weights": bulk_query["MinAllowedWeights"], 186 "netuid_to_max_allowed_weights": bulk_query["MaxAllowedWeights"], 187 "netuid_to_max_allowed_uids": bulk_query["MaxAllowedUids"], 188 "netuid_to_founder": bulk_query["Founder"], 189 "netuid_to_founder_share": bulk_query["FounderShare"], 190 "netuid_to_incentive_ratio": bulk_query["IncentiveRatio"], 191 "netuid_to_trust_ratio": bulk_query["TrustRatio"], 192 "netuid_to_name": bulk_query["SubnetNames"], 193 "netuid_to_max_weight_age": bulk_query["MaxWeightAge"], 194 "netuid_to_bonds_ma": bulk_query.get("BondsMovingAverage", {}), 195 "netuid_to_maximum_set_weight_calls_per_epoch": bulk_query.get( 196 "MaximumSetWeightCallsPerEpoch", {} 197 ), 198 "netuid_to_governance_configuration": bulk_query[ 199 "SubnetGovernanceConfig" 200 ], 201 "netuid_to_immunity_period": bulk_query["ImmunityPeriod"], 202 "netuid_to_min_validator_stake": bulk_query.get( 203 "MinValidatorStake", {} 204 ), 205 "netuid_to_max_allowed_validators": bulk_query.get( 206 "MaxAllowedValidators", {} 207 ), 208 "netuid_to_module_burn_config": bulk_query.get("ModuleBurnConfig", {}), 209 "netuid_to_subnet_metadata": bulk_query.get("SubnetMetadata", {}), 210 } 211 result_subnets: dict[int, SubnetParamsWithEmission] = {} 212 213 for netuid, name in subnet_maps["netuid_to_name"].items(): 214 subnet: SubnetParamsWithEmission = { 215 "name": name, 216 "founder": subnet_maps["netuid_to_founder"][netuid], 217 "founder_share": subnet_maps["netuid_to_founder_share"][netuid], 218 "incentive_ratio": subnet_maps["netuid_to_incentive_ratio"][netuid], 219 "max_allowed_uids": subnet_maps["netuid_to_max_allowed_uids"][ 220 netuid 221 ], 222 "max_allowed_weights": subnet_maps["netuid_to_max_allowed_weights"][ 223 netuid 224 ], 225 "min_allowed_weights": subnet_maps["netuid_to_min_allowed_weights"][ 226 netuid 227 ], 228 "tempo": subnet_maps["netuid_to_tempo"][netuid], 229 "trust_ratio": subnet_maps["netuid_to_trust_ratio"][netuid], 230 "emission": subnet_maps["netuid_to_emission"][netuid], 231 "max_weight_age": subnet_maps["netuid_to_max_weight_age"][netuid], 232 "bonds_ma": subnet_maps["netuid_to_bonds_ma"].get(netuid, None), 233 "maximum_set_weight_calls_per_epoch": subnet_maps[ 234 "netuid_to_maximum_set_weight_calls_per_epoch" 235 ].get(netuid, 30), 236 "governance_config": subnet_maps[ 237 "netuid_to_governance_configuration" 238 ][netuid], 239 "immunity_period": subnet_maps["netuid_to_immunity_period"][netuid], 240 "min_validator_stake": subnet_maps[ 241 "netuid_to_min_validator_stake" 242 ].get(netuid, to_nano(50_000)), 243 "max_allowed_validators": subnet_maps[ 244 "netuid_to_max_allowed_validators" 245 ].get(netuid, 50), 246 "module_burn_config": cast( 247 BurnConfiguration, 248 subnet_maps["netuid_to_module_burn_config"].get(netuid, None), 249 ), 250 "subnet_metadata": subnet_maps["netuid_to_subnet_metadata"].get( 251 netuid, None 252 ), 253 } 254 255 result_subnets[netuid] = subnet 256 257 return result_subnets
Gets all subnets info on the network
260def get_global_params(c_client: CommuneClient) -> NetworkParams: 261 """ 262 Returns global parameters of the whole commune ecosystem 263 """ 264 265 query_all = c_client.query_batch( 266 { 267 "SubspaceModule": [ 268 ("MaxNameLength", []), 269 ("MinNameLength", []), 270 ("MaxAllowedSubnets", []), 271 ("MaxAllowedModules", []), 272 ("MaxRegistrationsPerBlock", []), 273 ("MaxAllowedWeightsGlobal", []), 274 ("FloorDelegationFee", []), 275 ("FloorFounderShare", []), 276 ("MinWeightStake", []), 277 ("Kappa", []), 278 ("Rho", []), 279 ("SubnetImmunityPeriod", []), 280 ("SubnetBurn", []), 281 ], 282 "GovernanceModule": [ 283 ("GlobalGovernanceConfig", []), 284 ("GeneralSubnetApplicationCost", []), 285 ("Curator", []), 286 ], 287 } 288 ) 289 global_config = cast( 290 GovernanceConfiguration, query_all["GlobalGovernanceConfig"] 291 ) 292 global_params: NetworkParams = { 293 "max_allowed_subnets": int(query_all["MaxAllowedSubnets"]), 294 "max_allowed_modules": int(query_all["MaxAllowedModules"]), 295 "max_registrations_per_block": int( 296 query_all["MaxRegistrationsPerBlock"] 297 ), 298 "max_name_length": int(query_all["MaxNameLength"]), 299 "min_weight_stake": int(query_all["MinWeightStake"]), 300 "floor_delegation_fee": int(query_all["FloorDelegationFee"]), 301 "max_allowed_weights": int(query_all["MaxAllowedWeightsGlobal"]), 302 "curator": Ss58Address(query_all["Curator"]), 303 "min_name_length": int(query_all["MinNameLength"]), 304 "floor_founder_share": int(query_all["FloorFounderShare"]), 305 "general_subnet_application_cost": int( 306 query_all["GeneralSubnetApplicationCost"] 307 ), 308 "kappa": int(query_all["Kappa"]), 309 "rho": int(query_all["Rho"]), 310 "subnet_immunity_period": int(query_all["SubnetImmunityPeriod"]), 311 "subnet_registration_cost": int(query_all["SubnetBurn"]), 312 "governance_config": { 313 "proposal_cost": int(global_config["proposal_cost"]), 314 "proposal_expiration": int(global_config["proposal_expiration"]), 315 "vote_mode": global_config["vote_mode"], 316 "proposal_reward_treasury_allocation": int( 317 global_config["proposal_reward_treasury_allocation"] 318 ), 319 "max_proposal_reward_treasury_allocation": int( 320 global_config["max_proposal_reward_treasury_allocation"] 321 ), 322 "proposal_reward_interval": int( 323 global_config["proposal_reward_interval"] 324 ), 325 }, 326 } 327 return global_params
Returns global parameters of the whole commune ecosystem
def
concat_to_local_keys( balance: dict[str, int], local_key_info: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def
local_keys_to_freebalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
341def local_keys_to_freebalance( 342 c_client: CommuneClient, 343 local_keys: dict[str, Ss58Address], 344) -> dict[str, int]: 345 query_all = c_client.query_batch_map( 346 { 347 "System": [("Account", [])], 348 } 349 ) 350 balance_map = query_all["Account"] 351 352 format_balances: dict[str, int] = { 353 key: value["data"]["free"] 354 for key, value in balance_map.items() 355 if "data" in value and "free" in value["data"] 356 } 357 358 key2balance: dict[str, int] = concat_to_local_keys( 359 format_balances, local_keys 360 ) 361 362 return key2balance
def
local_keys_to_stakedbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
365def local_keys_to_stakedbalance( 366 c_client: CommuneClient, 367 local_keys: dict[str, Ss58Address], 368) -> dict[str, int]: 369 staketo_map = c_client.query_map_staketo() 370 371 format_stake: dict[str, int] = { 372 key: sum(stake for _, stake in value) 373 for key, value in staketo_map.items() 374 } 375 376 key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys) 377 378 return key2stake
def
local_keys_to_stakedfrom_balance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
381def local_keys_to_stakedfrom_balance( 382 c_client: CommuneClient, 383 local_keys: dict[str, Ss58Address], 384) -> dict[str, int]: 385 stakefrom_map = c_client.query_map_stakefrom() 386 387 format_stake: dict[str, int] = { 388 key: sum(stake for _, stake in value) 389 for key, value in stakefrom_map.items() 390 } 391 392 key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys) 393 key2stake = {key: stake for key, stake in key2stake.items()} 394 return key2stake
def
local_keys_allbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> tuple[dict[str, int], dict[str, int]]:
397def local_keys_allbalance( 398 c_client: CommuneClient, 399 local_keys: dict[str, Ss58Address], 400) -> tuple[dict[str, int], dict[str, int]]: 401 query_all = c_client.query_batch_map( 402 { 403 "System": [("Account", [])], 404 "SubspaceModule": [ 405 ("StakeTo", []), 406 ], 407 } 408 ) 409 410 balance_map, staketo_map = ( 411 query_all["Account"], 412 transform_stake_dmap(query_all["StakeTo"]), 413 ) 414 415 format_balances: dict[str, int] = { 416 key: value["data"]["free"] 417 for key, value in balance_map.items() 418 if "data" in value and "free" in value["data"] 419 } 420 key2balance: dict[str, int] = concat_to_local_keys( 421 format_balances, local_keys 422 ) 423 format_stake: dict[str, int] = { 424 key: sum(stake for _, stake in value) 425 for key, value in staketo_map.items() 426 } 427 428 key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys) 429 430 key2balance = { 431 k: v 432 for k, v in sorted( 433 key2balance.items(), key=lambda item: item[1], reverse=True 434 ) 435 } 436 437 key2stake = { 438 k: v 439 for k, v in sorted( 440 key2stake.items(), key=lambda item: item[1], reverse=True 441 ) 442 } 443 444 return key2balance, key2stake