communex.cli.subnet
1import re 2from typing import Any, cast 3 4import typer 5from typer import Context 6 7from communex.balance import from_nano 8from communex.cli._common import (make_custom_context, 9 print_table_from_plain_dict, 10 print_table_standardize) 11from communex.compat.key import resolve_key_ss58, try_classic_load_key 12from communex.errors import ChainTransactionError 13from communex.misc import IPFS_REGEX, get_map_subnets_params 14from communex.types import SubnetParams, VoteMode 15 16subnet_app = typer.Typer(no_args_is_help=True) 17 18 19@subnet_app.command() 20def list(ctx: Context): 21 """ 22 Gets subnets. 23 """ 24 context = make_custom_context(ctx) 25 client = context.com_client() 26 27 with context.progress_status("Getting subnets ..."): 28 subnets = get_map_subnets_params(client) 29 30 keys, values = subnets.keys(), subnets.values() 31 subnets_with_netuids = [ 32 {"netuid": key, **value} for key, value in zip(keys, values) 33 ] 34 35 for subnet_dict in subnets_with_netuids: # type: ignore 36 bonds = subnet_dict["bonds_ma"] # type: ignore 37 if bonds: 38 subnet_dict["bonds_ma"] = str( 39 from_nano(subnet_dict["bonds_ma"])) + " J" # type: ignore 40 41 for dict in subnets_with_netuids: # type: ignore 42 print_table_from_plain_dict( 43 dict, ["Params", "Values"], context.console) # type: ignore 44 45 46@subnet_app.command() 47def distribution(ctx: Context): 48 context = make_custom_context(ctx) 49 client = context.com_client() 50 51 with context.progress_status("Getting emission distribution..."): 52 subnets_emission = client.query_map_subnet_emission() 53 subnet_consensus = client.query_map_subnet_consensus() 54 subnet_names = client.query_map_subnet_names() 55 total_emission = sum(subnets_emission.values()) 56 subnet_emission_percentages = { 57 key: value / total_emission * 100 for key, value in subnets_emission.items() 58 } 59 60 # Prepare the data for the table 61 table_data: dict[str, Any] = { 62 "Subnet": [], 63 "Name": [], 64 "Consensus": [], 65 "Emission %": [] 66 } 67 68 for subnet, emission_percentage in subnet_emission_percentages.items(): 69 if emission_percentage > 0: 70 table_data["Subnet"].append(str(subnet)) 71 table_data["Name"].append(subnet_names.get(subnet, "N/A")) 72 table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A")) 73 table_data["Emission %"].append(f"{round(emission_percentage, 2)}%") 74 75 print_table_standardize(table_data, context.console) 76 77 78@subnet_app.command() 79def legit_whitelist(ctx: Context): 80 """ 81 Gets the legitimate whitelist of modules for the general subnet 0 82 """ 83 84 context = make_custom_context(ctx) 85 client = context.com_client() 86 87 with context.progress_status("Getting legitimate whitelist ..."): 88 whitelist = cast(dict[str, int], client.query_map_legit_whitelist()) 89 90 print_table_from_plain_dict( 91 whitelist, ["Module", "Recommended weight"], context.console 92 ) 93 94 95@subnet_app.command() 96def info(ctx: Context, netuid: int): 97 """ 98 Gets subnet info. 99 """ 100 context = make_custom_context(ctx) 101 client = context.com_client() 102 103 with context.progress_status(f"Getting subnet with netuid '{netuid}'..."): 104 105 subnets = get_map_subnets_params(client) 106 subnet = subnets.get(netuid, None) 107 108 if subnet is None: 109 raise ValueError("Subnet not found") 110 111 general_subnet: dict[str, Any] = cast(dict[str, Any], subnet) 112 print_table_from_plain_dict( 113 general_subnet, ["Params", "Values"], context.console) 114 115 116@subnet_app.command() 117def register( 118 ctx: Context, 119 key: str, 120 name: str, 121 metadata: str = typer.Option(None) 122): 123 """ 124 Registers a new subnet. 125 """ 126 context = make_custom_context(ctx) 127 resolved_key = try_classic_load_key(key) 128 client = context.com_client() 129 130 with context.progress_status("Registering subnet ..."): 131 response = client.register_subnet(resolved_key, name, metadata) 132 133 if response.is_success: 134 context.info(f"Successfully registered subnet {name}") 135 else: 136 raise ChainTransactionError(response.error_message) # type: ignore 137 138 139@subnet_app.command() 140def update( 141 ctx: Context, 142 key: str, 143 netuid: int, 144 founder: str = typer.Option(None), 145 founder_share: int = typer.Option(None), 146 name: str = typer.Option(None), 147 metadata: str = typer.Option(None), 148 immunity_period: int = typer.Option(None), 149 incentive_ratio: int = typer.Option(None), 150 max_allowed_uids: int = typer.Option(None), 151 max_allowed_weights: int = typer.Option(None), 152 min_allowed_weights: int = typer.Option(None), 153 max_weight_age: float = typer.Option(None), 154 tempo: int = typer.Option(None), 155 trust_ratio: int = typer.Option(None), 156 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 157 158 # GovernanceConfiguration 159 vote_mode: VoteMode = typer.Option(None), 160 161 bonds_ma: int = typer.Option(None), 162 163 # BurnConfiguration 164 min_burn: int = typer.Option(None), 165 max_burn: int = typer.Option(None), 166 adjustment_alpha: int = typer.Option(None), 167 target_registrations_interval: int = typer.Option(None), 168 target_registrations_per_interval: int = typer.Option(None), 169 max_registrations_per_interval: int = typer.Option(None), 170 171 min_validator_stake: int = typer.Option(None), 172 max_allowed_validators: int = typer.Option(None), 173): 174 """ 175 Updates a subnet. 176 """ 177 provided_params = locals().copy() 178 provided_params.pop("ctx") 179 provided_params.pop("key") 180 provided_params.pop("netuid") 181 182 provided_params = { 183 key: value for key, value in provided_params.items() if value is not None 184 } 185 if vote_mode is not None: # type: ignore 186 provided_params["vote_mode"] = vote_mode.value 187 context = make_custom_context(ctx) 188 client = context.com_client() 189 subnets_info = get_map_subnets_params(client) 190 subnet_params = subnets_info[netuid] 191 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 192 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 193 # intersection update 194 for param, value in provided_params.items(): 195 if param in subnet_burn_config and value is not None: 196 subnet_burn_config[param] = value 197 198 subnet_params = dict(subnet_params) 199 subnet_params.pop("emission") 200 subnet_params.pop("governance_config") 201 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 202 203 subnet_params = cast(SubnetParams, subnet_params) 204 provided_params = cast(SubnetParams, provided_params) 205 subnet_params.update(provided_params) 206 # because bonds_ma and maximum_set_weights dont have a default value 207 if subnet_params.get("bonds_ma", None) is None: 208 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 209 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 210 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 211 "MaximumSetWeightCallsPerEpoch" 212 ) 213 resolved_key = try_classic_load_key(key) 214 with context.progress_status("Updating subnet ..."): 215 response = client.update_subnet( 216 key=resolved_key, params=subnet_params, netuid=netuid 217 ) 218 219 if response.is_success: 220 context.info( 221 f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}" 222 ) 223 else: 224 raise ChainTransactionError(response.error_message) # type: ignore 225 226 227@subnet_app.command() 228def propose_on_subnet( 229 ctx: Context, 230 key: str, 231 netuid: int, 232 cid: str, 233 founder: str = typer.Option(None), 234 founder_share: int = typer.Option(None), 235 metadata: str = typer.Option(None), 236 name: str = typer.Option(None), 237 immunity_period: int = typer.Option(None), 238 incentive_ratio: int = typer.Option(None), 239 max_allowed_uids: int = typer.Option(None), 240 max_allowed_weights: int = typer.Option(None), 241 min_allowed_weights: int = typer.Option(None), 242 max_weight_age: int = typer.Option(None), 243 tempo: int = typer.Option(None), 244 trust_ratio: int = typer.Option(None), 245 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 246 bonds_ma: int = typer.Option(None), 247 248 vote_mode: VoteMode = typer.Option(None, help="0 for Authority, 1 for Vote"), 249 250 # BurnConfiguration 251 min_burn: int = typer.Option(None), 252 max_burn: int = typer.Option(None), 253 adjustment_alpha: int = typer.Option(None), 254 target_registrations_interval: int = typer.Option(None), 255 target_registrations_per_interval: int = typer.Option(None), 256 max_registrations_per_interval: int = typer.Option(None), 257 258 min_validator_stake: int = typer.Option(None), 259 max_allowed_validators: int = typer.Option(None), 260): 261 """ 262 Adds a proposal to a specific subnet. 263 """ 264 context = make_custom_context(ctx) 265 if not re.match(IPFS_REGEX, cid): 266 context.error(f"CID provided is invalid: {cid}") 267 exit(1) 268 else: 269 ipfs_prefix = "ipfs://" 270 cid = ipfs_prefix + cid 271 272 provided_params = locals().copy() 273 provided_params.pop("ctx") 274 provided_params.pop("context") 275 provided_params.pop("key") 276 provided_params.pop("ipfs_prefix") 277 if provided_params["founder"] is not None: 278 resolve_founder = resolve_key_ss58(founder) 279 provided_params["founder"] = resolve_founder 280 281 provided_params = { 282 key: value for key, value in provided_params.items() if value is not None 283 } 284 285 client = context.com_client() 286 subnets_info = get_map_subnets_params(client) 287 subnet_params = subnets_info[netuid] 288 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 289 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 290 # intersection update 291 for param, value in provided_params.items(): 292 if param in subnet_burn_config and value is not None: 293 subnet_burn_config[param] = value 294 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 295 296 subnet_params = dict(subnet_params) 297 subnet_params.pop("emission") 298 subnet_params.pop("governance_config") 299 300 subnet_params.update(provided_params) 301 # because bonds_ma and maximum_set_weights dont have a default value 302 if subnet_params.get("bonds_ma", None) is None: 303 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 304 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 305 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 306 "MaximumSetWeightCallsPerEpoch" 307 ) 308 309 resolved_key = try_classic_load_key(key) 310 with context.progress_status("Adding a proposal..."): 311 client.add_subnet_proposal( 312 resolved_key, 313 subnet_params, 314 cid, 315 netuid=netuid 316 ) 317 context.info("Proposal added.") 318 319 320@subnet_app.command() 321def submit_general_subnet_application( 322 ctx: Context, key: str, application_key: str, cid: str 323): 324 """ 325 Submits a legitimate whitelist application to the general subnet, netuid 0. 326 """ 327 328 context = make_custom_context(ctx) 329 if not re.match(IPFS_REGEX, cid): 330 context.error(f"CID provided is invalid: {cid}") 331 exit(1) 332 333 client = context.com_client() 334 335 resolved_key = try_classic_load_key(key) 336 resolved_application_key = resolve_key_ss58(application_key) 337 338 # append the ipfs hash 339 ipfs_prefix = "ipfs://" 340 cid = ipfs_prefix + cid 341 342 with context.progress_status("Adding a application..."): 343 client.add_dao_application(resolved_key, resolved_application_key, cid) 344 345 346@subnet_app.command() 347def add_custom_proposal( 348 ctx: Context, 349 key: str, 350 cid: str, 351 netuid: int, 352): 353 """ 354 Adds a custom proposal to a specific subnet. 355 """ 356 357 context = make_custom_context(ctx) 358 if not re.match(IPFS_REGEX, cid): 359 context.error(f"CID provided is invalid: {cid}") 360 exit(1) 361 362 client = context.com_client() 363 364 resolved_key = try_classic_load_key(key) 365 366 # append the ipfs hash 367 ipfs_prefix = "ipfs://" 368 cid = ipfs_prefix + cid 369 370 with context.progress_status("Adding a proposal..."): 371 client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid) 372 373 374@subnet_app.command() 375def list_curator_applications( 376 ctx: Context 377): 378 """ 379 Lists all curator applications. 380 """ 381 context = make_custom_context(ctx) 382 client = context.com_client() 383 384 with context.progress_status("Querying applications..."): 385 apps = client.query_map_curator_applications() 386 print(apps)
subnet_app =
<typer.main.Typer object>
@subnet_app.command()
def
list(ctx: typer.models.Context):
20@subnet_app.command() 21def list(ctx: Context): 22 """ 23 Gets subnets. 24 """ 25 context = make_custom_context(ctx) 26 client = context.com_client() 27 28 with context.progress_status("Getting subnets ..."): 29 subnets = get_map_subnets_params(client) 30 31 keys, values = subnets.keys(), subnets.values() 32 subnets_with_netuids = [ 33 {"netuid": key, **value} for key, value in zip(keys, values) 34 ] 35 36 for subnet_dict in subnets_with_netuids: # type: ignore 37 bonds = subnet_dict["bonds_ma"] # type: ignore 38 if bonds: 39 subnet_dict["bonds_ma"] = str( 40 from_nano(subnet_dict["bonds_ma"])) + " J" # type: ignore 41 42 for dict in subnets_with_netuids: # type: ignore 43 print_table_from_plain_dict( 44 dict, ["Params", "Values"], context.console) # type: ignore
Gets subnets.
@subnet_app.command()
def
distribution(ctx: typer.models.Context):
47@subnet_app.command() 48def distribution(ctx: Context): 49 context = make_custom_context(ctx) 50 client = context.com_client() 51 52 with context.progress_status("Getting emission distribution..."): 53 subnets_emission = client.query_map_subnet_emission() 54 subnet_consensus = client.query_map_subnet_consensus() 55 subnet_names = client.query_map_subnet_names() 56 total_emission = sum(subnets_emission.values()) 57 subnet_emission_percentages = { 58 key: value / total_emission * 100 for key, value in subnets_emission.items() 59 } 60 61 # Prepare the data for the table 62 table_data: dict[str, Any] = { 63 "Subnet": [], 64 "Name": [], 65 "Consensus": [], 66 "Emission %": [] 67 } 68 69 for subnet, emission_percentage in subnet_emission_percentages.items(): 70 if emission_percentage > 0: 71 table_data["Subnet"].append(str(subnet)) 72 table_data["Name"].append(subnet_names.get(subnet, "N/A")) 73 table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A")) 74 table_data["Emission %"].append(f"{round(emission_percentage, 2)}%") 75 76 print_table_standardize(table_data, context.console)
@subnet_app.command()
def
legit_whitelist(ctx: typer.models.Context):
79@subnet_app.command() 80def legit_whitelist(ctx: Context): 81 """ 82 Gets the legitimate whitelist of modules for the general subnet 0 83 """ 84 85 context = make_custom_context(ctx) 86 client = context.com_client() 87 88 with context.progress_status("Getting legitimate whitelist ..."): 89 whitelist = cast(dict[str, int], client.query_map_legit_whitelist()) 90 91 print_table_from_plain_dict( 92 whitelist, ["Module", "Recommended weight"], context.console 93 )
Gets the legitimate whitelist of modules for the general subnet 0
@subnet_app.command()
def
info(ctx: typer.models.Context, netuid: int):
96@subnet_app.command() 97def info(ctx: Context, netuid: int): 98 """ 99 Gets subnet info. 100 """ 101 context = make_custom_context(ctx) 102 client = context.com_client() 103 104 with context.progress_status(f"Getting subnet with netuid '{netuid}'..."): 105 106 subnets = get_map_subnets_params(client) 107 subnet = subnets.get(netuid, None) 108 109 if subnet is None: 110 raise ValueError("Subnet not found") 111 112 general_subnet: dict[str, Any] = cast(dict[str, Any], subnet) 113 print_table_from_plain_dict( 114 general_subnet, ["Params", "Values"], context.console)
Gets subnet info.
@subnet_app.command()
def
register( ctx: typer.models.Context, key: str, name: str, metadata: str = <typer.models.OptionInfo object>):
117@subnet_app.command() 118def register( 119 ctx: Context, 120 key: str, 121 name: str, 122 metadata: str = typer.Option(None) 123): 124 """ 125 Registers a new subnet. 126 """ 127 context = make_custom_context(ctx) 128 resolved_key = try_classic_load_key(key) 129 client = context.com_client() 130 131 with context.progress_status("Registering subnet ..."): 132 response = client.register_subnet(resolved_key, name, metadata) 133 134 if response.is_success: 135 context.info(f"Successfully registered subnet {name}") 136 else: 137 raise ChainTransactionError(response.error_message) # type: ignore
Registers a new subnet.
@subnet_app.command()
def
update( ctx: typer.models.Context, key: str, netuid: int, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: float = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>):
140@subnet_app.command() 141def update( 142 ctx: Context, 143 key: str, 144 netuid: int, 145 founder: str = typer.Option(None), 146 founder_share: int = typer.Option(None), 147 name: str = typer.Option(None), 148 metadata: str = typer.Option(None), 149 immunity_period: int = typer.Option(None), 150 incentive_ratio: int = typer.Option(None), 151 max_allowed_uids: int = typer.Option(None), 152 max_allowed_weights: int = typer.Option(None), 153 min_allowed_weights: int = typer.Option(None), 154 max_weight_age: float = typer.Option(None), 155 tempo: int = typer.Option(None), 156 trust_ratio: int = typer.Option(None), 157 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 158 159 # GovernanceConfiguration 160 vote_mode: VoteMode = typer.Option(None), 161 162 bonds_ma: int = typer.Option(None), 163 164 # BurnConfiguration 165 min_burn: int = typer.Option(None), 166 max_burn: int = typer.Option(None), 167 adjustment_alpha: int = typer.Option(None), 168 target_registrations_interval: int = typer.Option(None), 169 target_registrations_per_interval: int = typer.Option(None), 170 max_registrations_per_interval: int = typer.Option(None), 171 172 min_validator_stake: int = typer.Option(None), 173 max_allowed_validators: int = typer.Option(None), 174): 175 """ 176 Updates a subnet. 177 """ 178 provided_params = locals().copy() 179 provided_params.pop("ctx") 180 provided_params.pop("key") 181 provided_params.pop("netuid") 182 183 provided_params = { 184 key: value for key, value in provided_params.items() if value is not None 185 } 186 if vote_mode is not None: # type: ignore 187 provided_params["vote_mode"] = vote_mode.value 188 context = make_custom_context(ctx) 189 client = context.com_client() 190 subnets_info = get_map_subnets_params(client) 191 subnet_params = subnets_info[netuid] 192 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 193 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 194 # intersection update 195 for param, value in provided_params.items(): 196 if param in subnet_burn_config and value is not None: 197 subnet_burn_config[param] = value 198 199 subnet_params = dict(subnet_params) 200 subnet_params.pop("emission") 201 subnet_params.pop("governance_config") 202 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 203 204 subnet_params = cast(SubnetParams, subnet_params) 205 provided_params = cast(SubnetParams, provided_params) 206 subnet_params.update(provided_params) 207 # because bonds_ma and maximum_set_weights dont have a default value 208 if subnet_params.get("bonds_ma", None) is None: 209 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 210 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 211 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 212 "MaximumSetWeightCallsPerEpoch" 213 ) 214 resolved_key = try_classic_load_key(key) 215 with context.progress_status("Updating subnet ..."): 216 response = client.update_subnet( 217 key=resolved_key, params=subnet_params, netuid=netuid 218 ) 219 220 if response.is_success: 221 context.info( 222 f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}" 223 ) 224 else: 225 raise ChainTransactionError(response.error_message) # type: ignore
Updates a subnet.
@subnet_app.command()
def
propose_on_subnet( ctx: typer.models.Context, key: str, netuid: int, cid: str, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: int = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>):
228@subnet_app.command() 229def propose_on_subnet( 230 ctx: Context, 231 key: str, 232 netuid: int, 233 cid: str, 234 founder: str = typer.Option(None), 235 founder_share: int = typer.Option(None), 236 metadata: str = typer.Option(None), 237 name: str = typer.Option(None), 238 immunity_period: int = typer.Option(None), 239 incentive_ratio: int = typer.Option(None), 240 max_allowed_uids: int = typer.Option(None), 241 max_allowed_weights: int = typer.Option(None), 242 min_allowed_weights: int = typer.Option(None), 243 max_weight_age: int = typer.Option(None), 244 tempo: int = typer.Option(None), 245 trust_ratio: int = typer.Option(None), 246 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 247 bonds_ma: int = typer.Option(None), 248 249 vote_mode: VoteMode = typer.Option(None, help="0 for Authority, 1 for Vote"), 250 251 # BurnConfiguration 252 min_burn: int = typer.Option(None), 253 max_burn: int = typer.Option(None), 254 adjustment_alpha: int = typer.Option(None), 255 target_registrations_interval: int = typer.Option(None), 256 target_registrations_per_interval: int = typer.Option(None), 257 max_registrations_per_interval: int = typer.Option(None), 258 259 min_validator_stake: int = typer.Option(None), 260 max_allowed_validators: int = typer.Option(None), 261): 262 """ 263 Adds a proposal to a specific subnet. 264 """ 265 context = make_custom_context(ctx) 266 if not re.match(IPFS_REGEX, cid): 267 context.error(f"CID provided is invalid: {cid}") 268 exit(1) 269 else: 270 ipfs_prefix = "ipfs://" 271 cid = ipfs_prefix + cid 272 273 provided_params = locals().copy() 274 provided_params.pop("ctx") 275 provided_params.pop("context") 276 provided_params.pop("key") 277 provided_params.pop("ipfs_prefix") 278 if provided_params["founder"] is not None: 279 resolve_founder = resolve_key_ss58(founder) 280 provided_params["founder"] = resolve_founder 281 282 provided_params = { 283 key: value for key, value in provided_params.items() if value is not None 284 } 285 286 client = context.com_client() 287 subnets_info = get_map_subnets_params(client) 288 subnet_params = subnets_info[netuid] 289 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 290 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 291 # intersection update 292 for param, value in provided_params.items(): 293 if param in subnet_burn_config and value is not None: 294 subnet_burn_config[param] = value 295 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 296 297 subnet_params = dict(subnet_params) 298 subnet_params.pop("emission") 299 subnet_params.pop("governance_config") 300 301 subnet_params.update(provided_params) 302 # because bonds_ma and maximum_set_weights dont have a default value 303 if subnet_params.get("bonds_ma", None) is None: 304 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 305 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 306 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 307 "MaximumSetWeightCallsPerEpoch" 308 ) 309 310 resolved_key = try_classic_load_key(key) 311 with context.progress_status("Adding a proposal..."): 312 client.add_subnet_proposal( 313 resolved_key, 314 subnet_params, 315 cid, 316 netuid=netuid 317 ) 318 context.info("Proposal added.")
Adds a proposal to a specific subnet.
@subnet_app.command()
def
submit_general_subnet_application(ctx: typer.models.Context, key: str, application_key: str, cid: str):
321@subnet_app.command() 322def submit_general_subnet_application( 323 ctx: Context, key: str, application_key: str, cid: str 324): 325 """ 326 Submits a legitimate whitelist application to the general subnet, netuid 0. 327 """ 328 329 context = make_custom_context(ctx) 330 if not re.match(IPFS_REGEX, cid): 331 context.error(f"CID provided is invalid: {cid}") 332 exit(1) 333 334 client = context.com_client() 335 336 resolved_key = try_classic_load_key(key) 337 resolved_application_key = resolve_key_ss58(application_key) 338 339 # append the ipfs hash 340 ipfs_prefix = "ipfs://" 341 cid = ipfs_prefix + cid 342 343 with context.progress_status("Adding a application..."): 344 client.add_dao_application(resolved_key, resolved_application_key, cid)
Submits a legitimate whitelist application to the general subnet, netuid 0.
@subnet_app.command()
def
add_custom_proposal(ctx: typer.models.Context, key: str, cid: str, netuid: int):
347@subnet_app.command() 348def add_custom_proposal( 349 ctx: Context, 350 key: str, 351 cid: str, 352 netuid: int, 353): 354 """ 355 Adds a custom proposal to a specific subnet. 356 """ 357 358 context = make_custom_context(ctx) 359 if not re.match(IPFS_REGEX, cid): 360 context.error(f"CID provided is invalid: {cid}") 361 exit(1) 362 363 client = context.com_client() 364 365 resolved_key = try_classic_load_key(key) 366 367 # append the ipfs hash 368 ipfs_prefix = "ipfs://" 369 cid = ipfs_prefix + cid 370 371 with context.progress_status("Adding a proposal..."): 372 client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid)
Adds a custom proposal to a specific subnet.
@subnet_app.command()
def
list_curator_applications(ctx: typer.models.Context):
375@subnet_app.command() 376def list_curator_applications( 377 ctx: Context 378): 379 """ 380 Lists all curator applications. 381 """ 382 context = make_custom_context(ctx) 383 client = context.com_client() 384 385 with context.progress_status("Querying applications..."): 386 apps = client.query_map_curator_applications() 387 print(apps)
Lists all curator applications.