communex.misc
"""
Helpers that bulk-query chain storage through a ``CommuneClient`` and reshape
the raw storage maps into per-module, per-subnet, global and per-local-key
dictionaries.
"""

import re
from typing import Any, TypeVar, cast

from communex._common import transform_stake_dmap
from communex.balance import to_nano
from communex.client import CommuneClient
from communex.key import check_ss58_address
from communex.types import (
    BurnConfiguration,
    GovernanceConfiguration,
    ModuleInfoWithOptionalBalance,
    NetworkParams,
    Ss58Address,
    SubnetParamsMaps,
    SubnetParamsWithEmission,
)

# "Qm" followed by exactly 44 characters from the class below (digits and
# letters without 0, O, I and l). Presumably an IPFS CIDv0 hash - confirm.
IPFS_REGEX = re.compile(r"^Qm[1-9A-HJ-NP-Za-km-z]{44}$")

T = TypeVar("T")


def get_map_modules(
    client: CommuneClient,
    netuid: int = 0,
    include_balances: bool = False,
) -> dict[str, ModuleInfoWithOptionalBalance]:
    """
    Gets the info of every module registered on subnet ``netuid``.

    All storages are fetched with a single ``client.query_batch_map`` call;
    ``System.Account`` is added to the request only when ``include_balances``
    is true.

    Returns a dictionary keyed by the module key (as returned by
    ``check_ss58_address``). In each entry ``stake`` is the sum of the amounts
    in ``stake_from``; ``balance`` is ``None`` when balances were not
    requested and ``0`` when the key has no ``Account`` entry.

    Raises ``KeyError`` when one of the storages read with ``[...]`` is
    missing from the query result.
    """

    request_dict: dict[Any, Any] = {
        "SubspaceModule": [
            ("StakeFrom", []),
            ("Keys", [netuid]),
            ("Name", [netuid]),
            ("Address", [netuid]),
            ("RegistrationBlock", [netuid]),
            ("ValidatorFeeConfig", []),
            ("Emission", []),
            ("Incentive", []),
            ("Dividends", []),
            ("LastUpdate", []),
            ("Metadata", [netuid]),
            # NOTE(review): "StakeTo" is requested but never read below.
            ("StakeTo", []),
        ],
    }
    if include_balances:
        request_dict["System"] = [("Account", [])]
    bulk_query = client.query_batch_map(request_dict)
    # Positional unpacking: the n-th name on the left receives the n-th
    # lookup on the right.
    (
        ss58_to_stakefrom,
        uid_to_key,
        uid_to_name,
        uid_to_address,
        uid_to_regblock,
        ss58_to_delegationfee,
        uid_to_emission,
        uid_to_incentive,
        uid_to_dividend,
        uid_to_lastupdate,
        ss58_to_balances,
        ss58_to_metadata,
    ) = (
        bulk_query.get("StakeFrom", {}),
        bulk_query.get("Keys", {}),
        bulk_query["Name"],
        bulk_query["Address"],
        bulk_query["RegistrationBlock"],
        bulk_query["ValidatorFeeConfig"],
        bulk_query["Emission"],
        bulk_query["Incentive"],
        bulk_query["Dividends"],
        bulk_query["LastUpdate"],
        bulk_query.get("Account", {}),
        bulk_query.get("Metadata", {}),
    )
    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
    ss58_to_stakefrom = transform_stake_dmap(ss58_to_stakefrom)
    for uid, key in uid_to_key.items():
        key = check_ss58_address(key)
        name = uid_to_name[uid]
        address = uid_to_address[uid]
        # Emission, Incentive, Dividends and LastUpdate were queried without
        # a netuid argument, so they are indexed by netuid first, then uid.
        emission = uid_to_emission[netuid][uid]
        incentive = uid_to_incentive[netuid][uid]
        dividend = uid_to_dividend[netuid][uid]
        regblock = uid_to_regblock[uid]
        stake_from = ss58_to_stakefrom.get(key, [])
        last_update = uid_to_lastupdate[netuid][uid]
        # Both fees fall back to 0 when the key has no fee config entry.
        stake_delegation_fee = ss58_to_delegationfee.get(key, {}).get(
            "stake_delegation_fee", 0
        )
        validator_weight_fee = ss58_to_delegationfee.get(key, {}).get(
            "validator_weight_fee", 0
        )
        metadata = ss58_to_metadata.get(key, None)

        balance = None
        # NOTE(review): ss58_to_balances defaults to {} above, so the
        # "is not None" half of this condition looks always true.
        if include_balances and ss58_to_balances is not None:  # type: ignore
            balance_dict = ss58_to_balances.get(key, None)
            if balance_dict is not None:
                assert isinstance(balance_dict["data"], dict)
                balance = balance_dict["data"]["free"]
            else:
                balance = 0
        stake = sum(stake for _, stake in stake_from)

        module: ModuleInfoWithOptionalBalance = {
            "uid": uid,
            "key": key,
            "name": name,
            "address": address,
            "emission": emission,
            "incentive": incentive,
            "dividends": dividend,
            "stake_from": stake_from,
            "regblock": regblock,
            "last_update": last_update,
            "balance": balance,
            "stake": stake,
            "stake_delegation_fee": stake_delegation_fee,
            "validator_weight_fee": validator_weight_fee,
            "metadata": metadata,
        }

        result_modules[key] = module
    return result_modules


def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Converts a dictionary with camelCase keys to snake_case keys

    Values are passed through unchanged. An underscore is inserted before
    every capital letter that is not the first character, so a run of
    capitals is split letter by letter ("HTTPCode" -> "h_t_t_p_code").
    """

    def snakerize(camel: str) -> str:
        # Zero-width match before each capital letter, except at position 0.
        return re.sub(r"(?<!^)(?=[A-Z])", "_", camel).lower()

    snaked: dict[str, T] = {snakerize(k): v for k, v in d.items()}
    return snaked


def get_map_displayable_subnets(client: CommuneClient):
    """
    Fetches the params of every subnet and returns them after passing them
    through ``transform_subnet_params``.
    """
    # Imported at call time rather than at module top - presumably to avoid
    # a circular import with the CLI package; confirm before moving it.
    from communex.cli._common import transform_subnet_params

    subnets = get_map_subnets_params(client)
    display_values = transform_subnet_params(subnets)
    return display_values


def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Gets the parameters and emission of every subnet on the network.

    One ``client.query_batch_map`` call fetches every storage; ``block_hash``
    is forwarded to that call unchanged. The result has one entry per netuid
    found in ``SubnetNames``.

    Storages read with ``.get`` fall back to a default for subnets they do
    not cover: 30 set-weight calls per epoch, ``to_nano(50_000)`` minimum
    validator stake, 50 validators, 0 for the encryption/copier settings and
    ``None`` for bonds moving average, burn config and metadata. The
    remaining storages are required and raise ``KeyError`` when absent.
    """
    bulk_query = client.query_batch_map(
        {
            "SubspaceModule": [
                ("ImmunityPeriod", []),
                ("MinAllowedWeights", []),
                ("MaxAllowedWeights", []),
                ("Tempo", []),
                ("MaxAllowedUids", []),
                ("Founder", []),
                ("FounderShare", []),
                ("IncentiveRatio", []),
                ("SubnetNames", []),
                ("MaxWeightAge", []),
                ("BondsMovingAverage", []),
                ("MaximumSetWeightCallsPerEpoch", []),
                ("MinValidatorStake", []),
                ("MaxAllowedValidators", []),
                ("ModuleBurnConfig", []),
                ("SubnetMetadata", []),
                ("MaxEncryptionPeriod", []),
                ("CopierMargin", []),
                ("UseWeightsEncryption", []),
            ],
            "GovernanceModule": [
                ("SubnetGovernanceConfig", []),
            ],
            "SubnetEmissionModule": [
                ("SubnetEmission", []),
            ],
        },
        block_hash,
    )
    # One netuid-keyed map per parameter.
    subnet_maps: SubnetParamsMaps = {
        "netuid_to_emission": bulk_query["SubnetEmission"],
        "netuid_to_tempo": bulk_query["Tempo"],
        "netuid_to_min_allowed_weights": bulk_query["MinAllowedWeights"],
        "netuid_to_max_allowed_weights": bulk_query["MaxAllowedWeights"],
        "netuid_to_max_allowed_uids": bulk_query["MaxAllowedUids"],
        "netuid_to_founder": bulk_query["Founder"],
        "netuid_to_founder_share": bulk_query["FounderShare"],
        "netuid_to_incentive_ratio": bulk_query["IncentiveRatio"],
        "netuid_to_name": bulk_query["SubnetNames"],
        "netuid_to_max_weight_age": bulk_query["MaxWeightAge"],
        "netuid_to_bonds_ma": bulk_query.get("BondsMovingAverage", {}),
        "netuid_to_maximum_set_weight_calls_per_epoch": bulk_query.get(
            "MaximumSetWeightCallsPerEpoch", {}
        ),
        "netuid_to_governance_configuration": bulk_query[
            "SubnetGovernanceConfig"
        ],
        "netuid_to_immunity_period": bulk_query["ImmunityPeriod"],
        "netuid_to_min_validator_stake": bulk_query.get(
            "MinValidatorStake", {}
        ),
        "netuid_to_max_allowed_validators": bulk_query.get(
            "MaxAllowedValidators", {}
        ),
        "netuid_to_module_burn_config": bulk_query.get("ModuleBurnConfig", {}),
        "netuid_to_subnet_metadata": bulk_query.get("SubnetMetadata", {}),
        "netuid_to_max_encryption_period": bulk_query.get(
            "MaxEncryptionPeriod", {}
        ),
        "netuid_to_copier_margin": bulk_query.get("CopierMargin", {}),
        "netuid_to_use_weights_encryption": bulk_query.get(
            "UseWeightsEncryption", {}
        ),
    }
    result_subnets: dict[int, SubnetParamsWithEmission] = {}

    # The subnet name map decides which netuids exist.
    for netuid, name in subnet_maps["netuid_to_name"].items():
        subnet: SubnetParamsWithEmission = {
            "name": name,
            "founder": subnet_maps["netuid_to_founder"][netuid],
            "founder_share": subnet_maps["netuid_to_founder_share"][netuid],
            "incentive_ratio": subnet_maps["netuid_to_incentive_ratio"][netuid],
            "max_allowed_uids": subnet_maps["netuid_to_max_allowed_uids"][
                netuid
            ],
            "max_allowed_weights": subnet_maps["netuid_to_max_allowed_weights"][
                netuid
            ],
            "min_allowed_weights": subnet_maps["netuid_to_min_allowed_weights"][
                netuid
            ],
            "tempo": subnet_maps["netuid_to_tempo"][netuid],
            "emission": subnet_maps["netuid_to_emission"][netuid],
            "max_weight_age": subnet_maps["netuid_to_max_weight_age"][netuid],
            "bonds_ma": subnet_maps["netuid_to_bonds_ma"].get(netuid, None),
            "maximum_set_weight_calls_per_epoch": subnet_maps[
                "netuid_to_maximum_set_weight_calls_per_epoch"
            ].get(netuid, 30),
            "governance_config": subnet_maps[
                "netuid_to_governance_configuration"
            ][netuid],
            "immunity_period": subnet_maps["netuid_to_immunity_period"][netuid],
            "min_validator_stake": subnet_maps[
                "netuid_to_min_validator_stake"
            ].get(netuid, to_nano(50_000)),
            "max_allowed_validators": subnet_maps[
                "netuid_to_max_allowed_validators"
            ].get(netuid, 50),
            # cast() only informs the type checker; the value may be None.
            "module_burn_config": cast(
                BurnConfiguration,
                subnet_maps["netuid_to_module_burn_config"].get(netuid, None),
            ),
            "subnet_metadata": subnet_maps["netuid_to_subnet_metadata"].get(
                netuid, None
            ),
            "max_encryption_period": subnet_maps[
                "netuid_to_max_encryption_period"
            ].get(netuid, 0),
            "copier_margin": subnet_maps["netuid_to_copier_margin"].get(
                netuid, 0
            ),
            "use_weights_encryption": subnet_maps[
                "netuid_to_use_weights_encryption"
            ].get(netuid, 0),
        }

        result_subnets[netuid] = subnet

    return result_subnets


def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Returns global parameters of the whole commune ecosystem

    All values come from one ``c_client.query_batch`` call. Numeric values
    are converted with ``int`` and the curator is wrapped in ``Ss58Address``.
    Raises ``KeyError`` when a queried storage is missing from the result.
    """

    query_all = c_client.query_batch(
        {
            "SubspaceModule": [
                ("MaxNameLength", []),
                ("MinNameLength", []),
                ("MaxAllowedSubnets", []),
                ("MaxAllowedModules", []),
                ("MaxRegistrationsPerBlock", []),
                ("MaxAllowedWeightsGlobal", []),
                ("FloorDelegationFee", []),
                ("FloorFounderShare", []),
                ("MinWeightStake", []),
                ("Kappa", []),
                ("Rho", []),
                ("SubnetImmunityPeriod", []),
                ("SubnetBurn", []),
            ],
            "GovernanceModule": [
                ("GlobalGovernanceConfig", []),
                ("GeneralSubnetApplicationCost", []),
                ("Curator", []),
            ],
        }
    )
    # cast() only informs the type checker; nothing is validated at runtime.
    global_config = cast(
        GovernanceConfiguration, query_all["GlobalGovernanceConfig"]
    )
    global_params: NetworkParams = {
        "max_allowed_subnets": int(query_all["MaxAllowedSubnets"]),
        "max_allowed_modules": int(query_all["MaxAllowedModules"]),
        "max_registrations_per_block": int(
            query_all["MaxRegistrationsPerBlock"]
        ),
        "max_name_length": int(query_all["MaxNameLength"]),
        "min_weight_stake": int(query_all["MinWeightStake"]),
        "floor_delegation_fee": int(query_all["FloorDelegationFee"]),
        "max_allowed_weights": int(query_all["MaxAllowedWeightsGlobal"]),
        "curator": Ss58Address(query_all["Curator"]),
        "min_name_length": int(query_all["MinNameLength"]),
        "floor_founder_share": int(query_all["FloorFounderShare"]),
        "general_subnet_application_cost": int(
            query_all["GeneralSubnetApplicationCost"]
        ),
        "kappa": int(query_all["Kappa"]),
        "rho": int(query_all["Rho"]),
        "subnet_immunity_period": int(query_all["SubnetImmunityPeriod"]),
        # The result field is named differently from the "SubnetBurn" storage.
        "subnet_registration_cost": int(query_all["SubnetBurn"]),
        "governance_config": {
            "proposal_cost": int(global_config["proposal_cost"]),
            "proposal_expiration": int(global_config["proposal_expiration"]),
            "vote_mode": global_config["vote_mode"],
            "proposal_reward_treasury_allocation": int(
                global_config["proposal_reward_treasury_allocation"]
            ),
            "max_proposal_reward_treasury_allocation": int(
                global_config["max_proposal_reward_treasury_allocation"]
            ),
            "proposal_reward_interval": int(
                global_config["proposal_reward_interval"]
            ),
        },
    }
    return global_params


def concat_to_local_keys(
    balance: dict[str, int], local_key_info: dict[str, Ss58Address]
) -> dict[str, int]:
    """
    Maps each local key name to the amount ``balance`` holds for its address.

    Addresses that are absent from ``balance`` map to 0.
    """
    key2: dict[str, int] = {
        key_name: balance.get(key_address, 0)
        for key_name, key_address in local_key_info.items()
    }

    return key2


def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns the free balance of each local key, keyed by key name.

    Reads the whole ``System.Account`` map; keys without an account entry
    get 0.
    """
    query_all = c_client.query_batch_map(
        {
            "System": [("Account", [])],
        }
    )
    balance_map = query_all["Account"]

    # Entries lacking "data" or "free" are skipped instead of raising.
    format_balances: dict[str, int] = {
        key: value["data"]["free"]
        for key, value in balance_map.items()
        if "data" in value and "free" in value["data"]
    }

    key2balance: dict[str, int] = concat_to_local_keys(
        format_balances, local_keys
    )

    return key2balance


def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the sum of the amounts listed for that key
    in ``c_client.query_map_staketo()``; keys without an entry get 0.
    """
    staketo_map = c_client.query_map_staketo()

    # Each value is iterated as (other_key, amount) pairs; only the amounts
    # are summed.
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value)
        for key, value in staketo_map.items()
    }

    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)

    return key2stake


def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the sum of the amounts listed for that key
    in ``c_client.query_map_stakefrom()``; keys without an entry get 0.
    """
    stakefrom_map = c_client.query_map_stakefrom()

    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value)
        for key, value in stakefrom_map.items()
    }

    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)
    # NOTE(review): this rebuilds the dict with identical keys and values.
    key2stake = {key: stake for key, stake in key2stake.items()}
    return key2stake


def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns ``(free balances, staked amounts)`` for the local keys.

    Both dictionaries are keyed by local key name and ordered by amount,
    largest first. ``System.Account`` and ``SubspaceModule.StakeTo`` are
    fetched in a single ``query_batch_map`` call.
    """
    query_all = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [
                ("StakeTo", []),
            ],
        }
    )

    balance_map, staketo_map = (
        query_all["Account"],
        transform_stake_dmap(query_all["StakeTo"]),
    )

    # Entries lacking "data" or "free" are skipped instead of raising.
    format_balances: dict[str, int] = {
        key: value["data"]["free"]
        for key, value in balance_map.items()
        if "data" in value and "free" in value["data"]
    }
    key2balance: dict[str, int] = concat_to_local_keys(
        format_balances, local_keys
    )
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value)
        for key, value in staketo_map.items()
    }

    key2stake: dict[str, int] = concat_to_local_keys(format_stake, local_keys)

    # Dicts keep insertion order, so rebuilding from a sorted sequence
    # yields a dict ordered by amount, descending.
    key2balance = {
        k: v
        for k, v in sorted(
            key2balance.items(), key=lambda item: item[1], reverse=True
        )
    }

    key2stake = {
        k: v
        for k, v in sorted(
            key2stake.items(), key=lambda item: item[1], reverse=True
        )
    }

    return key2balance, key2stake


# Manual smoke check: fetch the global params from the testnet node.
if __name__ == "__main__":
    from communex._common import get_node_url

    client = CommuneClient(get_node_url(use_testnet=True))
    get_global_params(client)
# "Qm" followed by exactly 44 characters from the class below (digits and
# letters without 0, O, I and l). Presumably an IPFS CIDv0 hash - confirm.
IPFS_REGEX = re.compile('^Qm[1-9A-HJ-NP-Za-km-z]{44}$')
def
get_map_modules( client: communex.client.CommuneClient, netuid: int = 0, include_balances: bool = False) -> dict[str, communex.types.ModuleInfoWithOptionalBalance]:
def get_map_modules(
    client: CommuneClient,
    netuid: int = 0,
    include_balances: bool = False,
) -> dict[str, ModuleInfoWithOptionalBalance]:
    """
    Gets the info of every module registered on subnet ``netuid``.

    All storages are fetched with a single ``client.query_batch_map`` call;
    ``System.Account`` is added to the request only when ``include_balances``
    is true.

    Returns a dictionary keyed by the module key (as returned by
    ``check_ss58_address``). In each entry ``stake`` is the sum of the amounts
    in ``stake_from``; ``balance`` is ``None`` when balances were not
    requested and ``0`` when the key has no ``Account`` entry.

    Raises ``KeyError`` when one of the required storages is missing from
    the query result.
    """

    # "StakeTo" is deliberately not requested: nothing below reads it, so
    # fetching that map only made the bulk query heavier.
    request_dict: dict[Any, Any] = {
        "SubspaceModule": [
            ("StakeFrom", []),
            ("Keys", [netuid]),
            ("Name", [netuid]),
            ("Address", [netuid]),
            ("RegistrationBlock", [netuid]),
            ("ValidatorFeeConfig", []),
            ("Emission", []),
            ("Incentive", []),
            ("Dividends", []),
            ("LastUpdate", []),
            ("Metadata", [netuid]),
        ],
    }
    if include_balances:
        request_dict["System"] = [("Account", [])]
    bulk_query = client.query_batch_map(request_dict)

    # Optional storages default to an empty map; the others are required.
    ss58_to_stakefrom = transform_stake_dmap(bulk_query.get("StakeFrom", {}))
    uid_to_key = bulk_query.get("Keys", {})
    uid_to_name = bulk_query["Name"]
    uid_to_address = bulk_query["Address"]
    uid_to_regblock = bulk_query["RegistrationBlock"]
    ss58_to_delegationfee = bulk_query["ValidatorFeeConfig"]
    uid_to_emission = bulk_query["Emission"]
    uid_to_incentive = bulk_query["Incentive"]
    uid_to_dividend = bulk_query["Dividends"]
    uid_to_lastupdate = bulk_query["LastUpdate"]
    ss58_to_balances = bulk_query.get("Account", {})
    ss58_to_metadata = bulk_query.get("Metadata", {})

    result_modules: dict[str, ModuleInfoWithOptionalBalance] = {}
    for uid, key in uid_to_key.items():
        key = check_ss58_address(key)
        stake_from = ss58_to_stakefrom.get(key, [])
        # Looked up once; both fees fall back to 0 when the key has no entry.
        fee_config = ss58_to_delegationfee.get(key, {})

        balance = None
        if include_balances:
            account = ss58_to_balances.get(key)
            if account is None:
                balance = 0
            else:
                # Narrows the type for the checker before indexing "free".
                assert isinstance(account["data"], dict)
                balance = account["data"]["free"]

        module: ModuleInfoWithOptionalBalance = {
            "uid": uid,
            "key": key,
            "name": uid_to_name[uid],
            "address": uid_to_address[uid],
            # These four maps were queried without a netuid argument, so
            # they are indexed by netuid first, then uid.
            "emission": uid_to_emission[netuid][uid],
            "incentive": uid_to_incentive[netuid][uid],
            "dividends": uid_to_dividend[netuid][uid],
            "stake_from": stake_from,
            "regblock": uid_to_regblock[uid],
            "last_update": uid_to_lastupdate[netuid][uid],
            "balance": balance,
            "stake": sum(amount for _, amount in stake_from),
            "stake_delegation_fee": fee_config.get("stake_delegation_fee", 0),
            "validator_weight_fee": fee_config.get("validator_weight_fee", 0),
            "metadata": ss58_to_metadata.get(key),
        }

        result_modules[key] = module
    return result_modules
Gets the info of every module registered on the subnet `netuid`, optionally including each module's free balance.
def
to_snake_case(d: dict[str, ~T]) -> dict[str, ~T]:
def to_snake_case(d: dict[str, T]) -> dict[str, T]:
    """
    Returns a copy of ``d`` whose camelCase keys are rewritten as snake_case.

    Values are passed through unchanged. An underscore is inserted before
    every capital letter that is not the first character, then the key is
    lower-cased.
    """
    # Zero-width match in front of each capital letter, except at position 0.
    capital_boundary = re.compile(r"(?<!^)(?=[A-Z])")

    converted: dict[str, T] = {}
    for camel_key, value in d.items():
        converted[capital_boundary.sub("_", camel_key).lower()] = value
    return converted
Converts a dictionary with camelCase keys to snake_case keys
def
get_map_subnets_params( client: communex.client.CommuneClient, block_hash: str | None = None) -> dict[int, communex.types.SubnetParamsWithEmission]:
def get_map_subnets_params(
    client: CommuneClient, block_hash: str | None = None
) -> dict[int, SubnetParamsWithEmission]:
    """
    Gets the parameters and emission of every subnet on the network.

    One ``client.query_batch_map`` call fetches every storage; ``block_hash``
    is forwarded to that call unchanged. The result has one entry per netuid
    found in ``SubnetNames``. Optional storages fall back to a per-field
    default for subnets they do not cover; required storages raise
    ``KeyError`` when absent.
    """
    request = {
        "SubspaceModule": [
            ("ImmunityPeriod", []),
            ("MinAllowedWeights", []),
            ("MaxAllowedWeights", []),
            ("Tempo", []),
            ("MaxAllowedUids", []),
            ("Founder", []),
            ("FounderShare", []),
            ("IncentiveRatio", []),
            ("SubnetNames", []),
            ("MaxWeightAge", []),
            ("BondsMovingAverage", []),
            ("MaximumSetWeightCallsPerEpoch", []),
            ("MinValidatorStake", []),
            ("MaxAllowedValidators", []),
            ("ModuleBurnConfig", []),
            ("SubnetMetadata", []),
            ("MaxEncryptionPeriod", []),
            ("CopierMargin", []),
            ("UseWeightsEncryption", []),
        ],
        "GovernanceModule": [("SubnetGovernanceConfig", [])],
        "SubnetEmissionModule": [("SubnetEmission", [])],
    }
    storage = client.query_batch_map(request, block_hash)

    # One netuid-keyed map per parameter.
    maps: SubnetParamsMaps = {
        "netuid_to_emission": storage["SubnetEmission"],
        "netuid_to_tempo": storage["Tempo"],
        "netuid_to_min_allowed_weights": storage["MinAllowedWeights"],
        "netuid_to_max_allowed_weights": storage["MaxAllowedWeights"],
        "netuid_to_max_allowed_uids": storage["MaxAllowedUids"],
        "netuid_to_founder": storage["Founder"],
        "netuid_to_founder_share": storage["FounderShare"],
        "netuid_to_incentive_ratio": storage["IncentiveRatio"],
        "netuid_to_name": storage["SubnetNames"],
        "netuid_to_max_weight_age": storage["MaxWeightAge"],
        "netuid_to_bonds_ma": storage.get("BondsMovingAverage", {}),
        "netuid_to_maximum_set_weight_calls_per_epoch": storage.get(
            "MaximumSetWeightCallsPerEpoch", {}
        ),
        "netuid_to_governance_configuration": storage["SubnetGovernanceConfig"],
        "netuid_to_immunity_period": storage["ImmunityPeriod"],
        "netuid_to_min_validator_stake": storage.get("MinValidatorStake", {}),
        "netuid_to_max_allowed_validators": storage.get(
            "MaxAllowedValidators", {}
        ),
        "netuid_to_module_burn_config": storage.get("ModuleBurnConfig", {}),
        "netuid_to_subnet_metadata": storage.get("SubnetMetadata", {}),
        "netuid_to_max_encryption_period": storage.get(
            "MaxEncryptionPeriod", {}
        ),
        "netuid_to_copier_margin": storage.get("CopierMargin", {}),
        "netuid_to_use_weights_encryption": storage.get(
            "UseWeightsEncryption", {}
        ),
    }

    def params_for(uid: int, subnet_name: str) -> SubnetParamsWithEmission:
        # Collects every parameter of one subnet from the netuid-keyed maps.
        return {
            "name": subnet_name,
            "founder": maps["netuid_to_founder"][uid],
            "founder_share": maps["netuid_to_founder_share"][uid],
            "incentive_ratio": maps["netuid_to_incentive_ratio"][uid],
            "max_allowed_uids": maps["netuid_to_max_allowed_uids"][uid],
            "max_allowed_weights": maps["netuid_to_max_allowed_weights"][uid],
            "min_allowed_weights": maps["netuid_to_min_allowed_weights"][uid],
            "tempo": maps["netuid_to_tempo"][uid],
            "emission": maps["netuid_to_emission"][uid],
            "max_weight_age": maps["netuid_to_max_weight_age"][uid],
            "bonds_ma": maps["netuid_to_bonds_ma"].get(uid),
            "maximum_set_weight_calls_per_epoch": maps[
                "netuid_to_maximum_set_weight_calls_per_epoch"
            ].get(uid, 30),
            "governance_config": maps["netuid_to_governance_configuration"][
                uid
            ],
            "immunity_period": maps["netuid_to_immunity_period"][uid],
            "min_validator_stake": maps["netuid_to_min_validator_stake"].get(
                uid, to_nano(50_000)
            ),
            "max_allowed_validators": maps[
                "netuid_to_max_allowed_validators"
            ].get(uid, 50),
            # cast() only informs the type checker; the value may be None.
            "module_burn_config": cast(
                BurnConfiguration,
                maps["netuid_to_module_burn_config"].get(uid),
            ),
            "subnet_metadata": maps["netuid_to_subnet_metadata"].get(uid),
            "max_encryption_period": maps[
                "netuid_to_max_encryption_period"
            ].get(uid, 0),
            "copier_margin": maps["netuid_to_copier_margin"].get(uid, 0),
            "use_weights_encryption": maps[
                "netuid_to_use_weights_encryption"
            ].get(uid, 0),
        }

    # The subnet name map decides which netuids exist.
    return {
        netuid: params_for(netuid, name)
        for netuid, name in maps["netuid_to_name"].items()
    }
Gets the parameters and emission of every subnet on the network; `block_hash` is forwarded to the storage query.
def get_global_params(c_client: CommuneClient) -> NetworkParams:
    """
    Returns the global parameters of the whole commune ecosystem.

    All values come from one ``c_client.query_batch`` call. Numeric values
    are converted with ``int`` and the curator is wrapped in ``Ss58Address``.
    Raises ``KeyError`` when a queried storage is missing from the result.
    """
    subspace_storages = (
        "MaxNameLength",
        "MinNameLength",
        "MaxAllowedSubnets",
        "MaxAllowedModules",
        "MaxRegistrationsPerBlock",
        "MaxAllowedWeightsGlobal",
        "FloorDelegationFee",
        "FloorFounderShare",
        "MinWeightStake",
        "Kappa",
        "Rho",
        "SubnetImmunityPeriod",
        "SubnetBurn",
    )
    governance_storages = (
        "GlobalGovernanceConfig",
        "GeneralSubnetApplicationCost",
        "Curator",
    )
    # None of these storages takes an argument, hence the empty lists.
    query_all = c_client.query_batch(
        {
            "SubspaceModule": [(name, []) for name in subspace_storages],
            "GovernanceModule": [(name, []) for name in governance_storages],
        }
    )
    # cast() only informs the type checker; nothing is validated at runtime.
    global_config = cast(
        GovernanceConfiguration, query_all["GlobalGovernanceConfig"]
    )

    def chain_int(storage_name: str) -> int:
        # Integer value of a top-level storage item.
        return int(query_all[storage_name])

    def governance_int(field: str) -> int:
        # Integer value of a field of the global governance config.
        return int(global_config[field])

    global_params: NetworkParams = {
        "max_allowed_subnets": chain_int("MaxAllowedSubnets"),
        "max_allowed_modules": chain_int("MaxAllowedModules"),
        "max_registrations_per_block": chain_int("MaxRegistrationsPerBlock"),
        "max_name_length": chain_int("MaxNameLength"),
        "min_weight_stake": chain_int("MinWeightStake"),
        "floor_delegation_fee": chain_int("FloorDelegationFee"),
        "max_allowed_weights": chain_int("MaxAllowedWeightsGlobal"),
        "curator": Ss58Address(query_all["Curator"]),
        "min_name_length": chain_int("MinNameLength"),
        "floor_founder_share": chain_int("FloorFounderShare"),
        "general_subnet_application_cost": chain_int(
            "GeneralSubnetApplicationCost"
        ),
        "kappa": chain_int("Kappa"),
        "rho": chain_int("Rho"),
        "subnet_immunity_period": chain_int("SubnetImmunityPeriod"),
        # The result field is named differently from the "SubnetBurn" storage.
        "subnet_registration_cost": chain_int("SubnetBurn"),
        "governance_config": {
            "proposal_cost": governance_int("proposal_cost"),
            "proposal_expiration": governance_int("proposal_expiration"),
            "vote_mode": global_config["vote_mode"],
            "proposal_reward_treasury_allocation": governance_int(
                "proposal_reward_treasury_allocation"
            ),
            "max_proposal_reward_treasury_allocation": governance_int(
                "max_proposal_reward_treasury_allocation"
            ),
            "proposal_reward_interval": governance_int(
                "proposal_reward_interval"
            ),
        },
    }
    return global_params
Returns global parameters of the whole commune ecosystem
def
concat_to_local_keys( balance: dict[str, int], local_key_info: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def
local_keys_to_freebalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_freebalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns the free balance of each local key, keyed by key name.

    Reads the whole ``System.Account`` map; keys without a usable account
    entry get 0.
    """
    accounts = c_client.query_batch_map({"System": [("Account", [])]})[
        "Account"
    ]

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        # Skip entries that lack the "data" -> "free" path instead of raising.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]

    return concat_to_local_keys(free_by_address, local_keys)
def
local_keys_to_stakedbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the sum of the amounts listed for that key
    in ``c_client.query_map_staketo()``; keys without an entry get 0.
    """
    staked_by_address: dict[str, int] = {}
    for address, stakes in c_client.query_map_staketo().items():
        # Each entry is a sequence of (other_key, amount) pairs.
        staked_by_address[address] = sum(amount for _, amount in stakes)

    return concat_to_local_keys(staked_by_address, local_keys)
def
local_keys_to_stakedfrom_balance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> dict[str, int]:
def local_keys_to_stakedfrom_balance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> dict[str, int]:
    """
    Returns, per local key name, the sum of the amounts listed for that key
    in ``c_client.query_map_stakefrom()``; keys without an entry get 0.
    """
    stakefrom_map = c_client.query_map_stakefrom()

    # Each entry is a sequence of (other_key, amount) pairs; only the amounts
    # are summed.
    format_stake: dict[str, int] = {
        key: sum(stake for _, stake in value)
        for key, value in stakefrom_map.items()
    }

    # concat_to_local_keys already returns a fresh dict, so no extra copy is
    # made before returning it.
    return concat_to_local_keys(format_stake, local_keys)
def
local_keys_allbalance( c_client: communex.client.CommuneClient, local_keys: dict[str, communex.types.Ss58Address]) -> tuple[dict[str, int], dict[str, int]]:
def local_keys_allbalance(
    c_client: CommuneClient,
    local_keys: dict[str, Ss58Address],
) -> tuple[dict[str, int], dict[str, int]]:
    """
    Returns ``(free balances, staked amounts)`` for the local keys.

    Both dictionaries are keyed by local key name and ordered by amount,
    largest first. ``System.Account`` and ``SubspaceModule.StakeTo`` are
    fetched in a single ``query_batch_map`` call.
    """

    def largest_first(amounts: dict[str, int]) -> dict[str, int]:
        # Dicts keep insertion order, so this yields a dict sorted by amount.
        ranked = sorted(
            amounts.items(), key=lambda item: item[1], reverse=True
        )
        return dict(ranked)

    storage = c_client.query_batch_map(
        {
            "System": [("Account", [])],
            "SubspaceModule": [("StakeTo", [])],
        }
    )
    accounts = storage["Account"]
    stake_to = transform_stake_dmap(storage["StakeTo"])

    free_by_address: dict[str, int] = {}
    for address, account in accounts.items():
        # Skip entries that lack the "data" -> "free" path instead of raising.
        if "data" not in account or "free" not in account["data"]:
            continue
        free_by_address[address] = account["data"]["free"]
    key2balance = concat_to_local_keys(free_by_address, local_keys)

    staked_by_address: dict[str, int] = {}
    for address, stakes in stake_to.items():
        staked_by_address[address] = sum(amount for _, amount in stakes)
    key2stake = concat_to_local_keys(staked_by_address, local_keys)

    return largest_first(key2balance), largest_first(key2stake)