communex.cli.subnet
1import re 2from typing import Any, cast 3 4import typer 5from typer import Context 6 7from communex.cli._common import ( 8 make_custom_context, 9 print_table_from_plain_dict, 10 print_table_standardize, 11) 12from communex.compat.key import resolve_key_ss58 13from communex.errors import ChainTransactionError 14from communex.misc import ( 15 IPFS_REGEX, 16 get_map_displayable_subnets, 17 get_map_subnets_params, 18) 19from communex.types import SubnetParams, VoteMode 20 21subnet_app = typer.Typer(no_args_is_help=True) 22 23 24@subnet_app.command() 25def list(ctx: Context): 26 """ 27 Gets subnets. 28 """ 29 context = make_custom_context(ctx) 30 client = context.com_client() 31 32 with context.progress_status("Getting subnets ..."): 33 subnets = get_map_displayable_subnets(client) 34 subnets_with_netuids = [ 35 {"netuid": key, **value} for key, value in subnets.items() 36 ] 37 for dict in subnets_with_netuids: # type: ignore 38 print_table_from_plain_dict(dict, ["Params", "Values"], context.console) # type: ignore 39 40 41@subnet_app.command() 42def distribution(ctx: Context): 43 context = make_custom_context(ctx) 44 client = context.com_client() 45 46 with context.progress_status("Getting emission distribution..."): 47 subnets_emission = client.query_map_subnet_emission() 48 subnet_consensus = client.query_map_subnet_consensus() 49 subnet_names = client.query_map_subnet_names() 50 total_emission = sum(subnets_emission.values()) 51 subnet_emission_percentages = { 52 key: value / total_emission * 100 53 for key, value in subnets_emission.items() 54 } 55 56 # Prepare the data for the table 57 table_data: dict[str, Any] = { 58 "Subnet": [], 59 "Name": [], 60 "Consensus": [], 61 "Emission %": [], 62 } 63 64 for subnet, emission_percentage in subnet_emission_percentages.items(): 65 if emission_percentage > 0: 66 table_data["Subnet"].append(str(subnet)) 67 table_data["Name"].append(subnet_names.get(subnet, "N/A")) 68 table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A")) 69 table_data["Emission %"].append(f"{round(emission_percentage, 2)}%") 70 71 print_table_standardize(table_data, context.console) 72 73 74@subnet_app.command() 75def legit_whitelist(ctx: Context): 76 """ 77 Gets the legitimate whitelist of modules for the general subnet 0 78 """ 79 80 context = make_custom_context(ctx) 81 client = context.com_client() 82 83 with context.progress_status("Getting legitimate whitelist ..."): 84 whitelist = cast(dict[str, int], client.query_map_legit_whitelist()) 85 86 print_table_from_plain_dict( 87 whitelist, ["Module", "Recommended weight"], context.console 88 ) 89 90 91@subnet_app.command() 92def info(ctx: Context, netuid: int): 93 """ 94 Gets subnet info. 95 """ 96 context = make_custom_context(ctx) 97 client = context.com_client() 98 99 with context.progress_status(f"Getting subnet with netuid '{netuid}'..."): 100 subnets = get_map_displayable_subnets(client) 101 subnet = subnets.get(netuid, None) 102 103 if subnet is None: 104 raise ValueError("Subnet not found") 105 106 general_subnet: dict[str, Any] = cast(dict[str, Any], subnet) 107 print_table_from_plain_dict( 108 general_subnet, ["Params", "Values"], context.console 109 ) 110 111 112@subnet_app.command() 113def register( 114 ctx: Context, key: str, name: str, metadata: str = typer.Option(None) 115): 116 """ 117 Registers a new subnet. 118 """ 119 context = make_custom_context(ctx) 120 client = context.com_client() 121 resolved_key = context.load_key(key, None) 122 123 with context.progress_status("Registering subnet ..."): 124 response = client.register_subnet(resolved_key, name, metadata) 125 126 if response.is_success: 127 context.info(f"Successfully registered subnet {name}") 128 else: 129 raise ChainTransactionError(response.error_message) # type: ignore 130 131 132@subnet_app.command() 133def update( 134 ctx: Context, 135 key: str, 136 netuid: int, 137 founder: str = typer.Option(None), 138 founder_share: int = typer.Option(None), 139 name: str = typer.Option(None), 140 metadata: str = typer.Option(None), 141 immunity_period: int = typer.Option(None), 142 incentive_ratio: int = typer.Option(None), 143 max_allowed_uids: int = typer.Option(None), 144 max_allowed_weights: int = typer.Option(None), 145 min_allowed_weights: int = typer.Option(None), 146 max_weight_age: float = typer.Option(None), 147 tempo: int = typer.Option(None), 148 trust_ratio: int = typer.Option(None), 149 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 150 # GovernanceConfiguration 151 vote_mode: VoteMode = typer.Option(None), 152 bonds_ma: int = typer.Option(None), 153 # BurnConfiguration 154 min_burn: int = typer.Option(None), 155 max_burn: int = typer.Option(None), 156 adjustment_alpha: int = typer.Option(None), 157 target_registrations_interval: int = typer.Option(None), 158 target_registrations_per_interval: int = typer.Option(None), 159 max_registrations_per_interval: int = typer.Option(None), 160 min_validator_stake: int = typer.Option(None), 161 max_allowed_validators: int = typer.Option(None), 162 max_encryption_period: int = typer.Option(None), 163 copier_margin: int = typer.Option(None), 164 use_weights_encryption: bool = typer.Option(None), 165): 166 """ 167 Updates a subnet. 168 """ 169 provided_params = locals().copy() 170 provided_params.pop("ctx") 171 provided_params.pop("key") 172 provided_params.pop("netuid") 173 174 provided_params = { 175 key: value 176 for key, value in provided_params.items() 177 if value is not None 178 } 179 if vote_mode is not None: # type: ignore 180 provided_params["vote_mode"] = vote_mode.value 181 context = make_custom_context(ctx) 182 client = context.com_client() 183 subnets_info = get_map_subnets_params(client) 184 subnet_params = subnets_info[netuid] 185 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 186 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 187 # intersection update 188 for param, value in provided_params.items(): 189 if param in subnet_burn_config and value is not None: 190 subnet_burn_config[param] = value 191 192 subnet_params = dict(subnet_params) 193 subnet_params.pop("emission") 194 subnet_params.pop("governance_config") 195 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 196 197 subnet_params = cast(SubnetParams, subnet_params) 198 provided_params = cast(SubnetParams, provided_params) 199 subnet_params.update(provided_params) 200 # because bonds_ma and maximum_set_weights dont have a default value 201 if subnet_params.get("bonds_ma", None) is None: 202 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 203 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 204 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 205 "MaximumSetWeightCallsPerEpoch" 206 ) 207 resolved_key = context.load_key(key, None) 208 with context.progress_status("Updating subnet ..."): 209 response = client.update_subnet( 210 key=resolved_key, params=subnet_params, netuid=netuid 211 ) 212 213 if response.is_success: 214 context.info( 215 f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}" 216 ) 217 else: 218 raise ChainTransactionError(response.error_message) # type: ignore 219 220 221@subnet_app.command() 222def propose_on_subnet( 223 ctx: Context, 224 key: str, 225 netuid: int, 226 cid: str, 227 founder: str = typer.Option(None), 228 founder_share: int = typer.Option(None), 229 metadata: str = typer.Option(None), 230 name: str = typer.Option(None), 231 immunity_period: int = typer.Option(None), 232 incentive_ratio: int = typer.Option(None), 233 max_allowed_uids: int = typer.Option(None), 234 max_allowed_weights: int = typer.Option(None), 235 min_allowed_weights: int = typer.Option(None), 236 max_weight_age: int = typer.Option(None), 237 tempo: int = typer.Option(None), 238 trust_ratio: int = typer.Option(None), 239 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 240 bonds_ma: int = typer.Option(None), 241 vote_mode: VoteMode = typer.Option( 242 None, help="0 for Authority, 1 for Vote" 243 ), 244 # BurnConfiguration 245 min_burn: int = typer.Option(None), 246 max_burn: int = typer.Option(None), 247 adjustment_alpha: int = typer.Option(None), 248 target_registrations_interval: int = typer.Option(None), 249 target_registrations_per_interval: int = typer.Option(None), 250 max_registrations_per_interval: int = typer.Option(None), 251 min_validator_stake: int = typer.Option(None), 252 max_allowed_validators: int = typer.Option(None), 253 max_encryption_period: int = typer.Option(None), 254 copier_margin: int = typer.Option(None), 255 use_weights_encryption: int = typer.Option(None), 256): 257 """ 258 Adds a proposal to a specific subnet. 259 """ 260 context = make_custom_context(ctx) 261 if not re.match(IPFS_REGEX, cid): 262 context.error(f"CID provided is invalid: {cid}") 263 exit(1) 264 else: 265 ipfs_prefix = "ipfs://" 266 cid = ipfs_prefix + cid 267 268 provided_params = locals().copy() 269 provided_params.pop("ctx") 270 provided_params.pop("context") 271 provided_params.pop("key") 272 provided_params.pop("ipfs_prefix") 273 if provided_params["founder"] is not None: 274 resolve_founder = resolve_key_ss58(founder) 275 provided_params["founder"] = resolve_founder 276 277 provided_params = { 278 key: value 279 for key, value in provided_params.items() 280 if value is not None 281 } 282 283 client = context.com_client() 284 subnets_info = get_map_subnets_params(client) 285 subnet_params = subnets_info[netuid] 286 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 287 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 288 # intersection update 289 for param, value in provided_params.items(): 290 if param in subnet_burn_config and value is not None: 291 subnet_burn_config[param] = value 292 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 293 294 subnet_params = dict(subnet_params) 295 subnet_params.pop("emission") 296 subnet_params.pop("governance_config") 297 298 subnet_params.update(provided_params) 299 # because bonds_ma and maximum_set_weights dont have a default value 300 if subnet_params.get("bonds_ma", None) is None: 301 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 302 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 303 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 304 "MaximumSetWeightCallsPerEpoch" 305 ) 306 307 resolved_key = context.load_key(key, None) 308 with context.progress_status("Adding a proposal..."): 309 client.add_subnet_proposal( 310 resolved_key, subnet_params, cid, netuid=netuid 311 ) 312 context.info("Proposal added.") 313 314 315@subnet_app.command() 316def submit_general_subnet_application( 317 ctx: Context, key: str, application_key: str, cid: str 318): 319 """ 320 Submits a legitimate whitelist application to the general subnet, netuid 0. 321 """ 322 323 context = make_custom_context(ctx) 324 if not re.match(IPFS_REGEX, cid): 325 context.error(f"CID provided is invalid: {cid}") 326 exit(1) 327 328 client = context.com_client() 329 330 resolved_key = context.load_key(key, None) 331 resolved_application_key = resolve_key_ss58(application_key) 332 333 # append the ipfs hash 334 ipfs_prefix = "ipfs://" 335 cid = ipfs_prefix + cid 336 337 with context.progress_status("Adding a application..."): 338 client.add_dao_application(resolved_key, resolved_application_key, cid) 339 340 341@subnet_app.command() 342def add_custom_proposal( 343 ctx: Context, 344 key: str, 345 cid: str, 346 netuid: int, 347): 348 """ 349 Adds a custom proposal to a specific subnet. 350 """ 351 context = make_custom_context(ctx) 352 353 if not re.match(IPFS_REGEX, cid): 354 context.error(f"CID provided is invalid: {cid}") 355 exit(1) 356 357 client = context.com_client() 358 359 resolved_key = context.load_key(key, None) 360 361 ipfs_prefix = "ipfs://" 362 cid = ipfs_prefix + cid 363 364 with context.progress_status("Adding a proposal..."): 365 client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid) 366 367 368@subnet_app.command() 369def list_curator_applications(ctx: Context): 370 """ 371 Lists all curator applications. 372 """ 373 context = make_custom_context(ctx) 374 client = context.com_client() 375 376 with context.progress_status("Querying applications..."): 377 apps = client.query_map_curator_applications() 378 print(apps)
subnet_app =
<typer.main.Typer object>
@subnet_app.command()
def
list(ctx: typer.models.Context):
25@subnet_app.command() 26def list(ctx: Context): 27 """ 28 Gets subnets. 29 """ 30 context = make_custom_context(ctx) 31 client = context.com_client() 32 33 with context.progress_status("Getting subnets ..."): 34 subnets = get_map_displayable_subnets(client) 35 subnets_with_netuids = [ 36 {"netuid": key, **value} for key, value in subnets.items() 37 ] 38 for dict in subnets_with_netuids: # type: ignore 39 print_table_from_plain_dict(dict, ["Params", "Values"], context.console) # type: ignore
Gets subnets.
@subnet_app.command()
def
distribution(ctx: typer.models.Context):
42@subnet_app.command() 43def distribution(ctx: Context): 44 context = make_custom_context(ctx) 45 client = context.com_client() 46 47 with context.progress_status("Getting emission distribution..."): 48 subnets_emission = client.query_map_subnet_emission() 49 subnet_consensus = client.query_map_subnet_consensus() 50 subnet_names = client.query_map_subnet_names() 51 total_emission = sum(subnets_emission.values()) 52 subnet_emission_percentages = { 53 key: value / total_emission * 100 54 for key, value in subnets_emission.items() 55 } 56 57 # Prepare the data for the table 58 table_data: dict[str, Any] = { 59 "Subnet": [], 60 "Name": [], 61 "Consensus": [], 62 "Emission %": [], 63 } 64 65 for subnet, emission_percentage in subnet_emission_percentages.items(): 66 if emission_percentage > 0: 67 table_data["Subnet"].append(str(subnet)) 68 table_data["Name"].append(subnet_names.get(subnet, "N/A")) 69 table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A")) 70 table_data["Emission %"].append(f"{round(emission_percentage, 2)}%") 71 72 print_table_standardize(table_data, context.console)
@subnet_app.command()
def
legit_whitelist(ctx: typer.models.Context):
75@subnet_app.command() 76def legit_whitelist(ctx: Context): 77 """ 78 Gets the legitimate whitelist of modules for the general subnet 0 79 """ 80 81 context = make_custom_context(ctx) 82 client = context.com_client() 83 84 with context.progress_status("Getting legitimate whitelist ..."): 85 whitelist = cast(dict[str, int], client.query_map_legit_whitelist()) 86 87 print_table_from_plain_dict( 88 whitelist, ["Module", "Recommended weight"], context.console 89 )
Gets the legitimate whitelist of modules for the general subnet 0
@subnet_app.command()
def
info(ctx: typer.models.Context, netuid: int):
92@subnet_app.command() 93def info(ctx: Context, netuid: int): 94 """ 95 Gets subnet info. 96 """ 97 context = make_custom_context(ctx) 98 client = context.com_client() 99 100 with context.progress_status(f"Getting subnet with netuid '{netuid}'..."): 101 subnets = get_map_displayable_subnets(client) 102 subnet = subnets.get(netuid, None) 103 104 if subnet is None: 105 raise ValueError("Subnet not found") 106 107 general_subnet: dict[str, Any] = cast(dict[str, Any], subnet) 108 print_table_from_plain_dict( 109 general_subnet, ["Params", "Values"], context.console 110 )
Gets subnet info.
@subnet_app.command()
def
register( ctx: typer.models.Context, key: str, name: str, metadata: str = <typer.models.OptionInfo object>):
113@subnet_app.command() 114def register( 115 ctx: Context, key: str, name: str, metadata: str = typer.Option(None) 116): 117 """ 118 Registers a new subnet. 119 """ 120 context = make_custom_context(ctx) 121 client = context.com_client() 122 resolved_key = context.load_key(key, None) 123 124 with context.progress_status("Registering subnet ..."): 125 response = client.register_subnet(resolved_key, name, metadata) 126 127 if response.is_success: 128 context.info(f"Successfully registered subnet {name}") 129 else: 130 raise ChainTransactionError(response.error_message) # type: ignore
Registers a new subnet.
@subnet_app.command()
def
update( ctx: typer.models.Context, key: str, netuid: int, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: float = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>, max_encryption_period: int = <typer.models.OptionInfo object>, copier_margin: int = <typer.models.OptionInfo object>, use_weights_encryption: bool = <typer.models.OptionInfo object>):
133@subnet_app.command() 134def update( 135 ctx: Context, 136 key: str, 137 netuid: int, 138 founder: str = typer.Option(None), 139 founder_share: int = typer.Option(None), 140 name: str = typer.Option(None), 141 metadata: str = typer.Option(None), 142 immunity_period: int = typer.Option(None), 143 incentive_ratio: int = typer.Option(None), 144 max_allowed_uids: int = typer.Option(None), 145 max_allowed_weights: int = typer.Option(None), 146 min_allowed_weights: int = typer.Option(None), 147 max_weight_age: float = typer.Option(None), 148 tempo: int = typer.Option(None), 149 trust_ratio: int = typer.Option(None), 150 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 151 # GovernanceConfiguration 152 vote_mode: VoteMode = typer.Option(None), 153 bonds_ma: int = typer.Option(None), 154 # BurnConfiguration 155 min_burn: int = typer.Option(None), 156 max_burn: int = typer.Option(None), 157 adjustment_alpha: int = typer.Option(None), 158 target_registrations_interval: int = typer.Option(None), 159 target_registrations_per_interval: int = typer.Option(None), 160 max_registrations_per_interval: int = typer.Option(None), 161 min_validator_stake: int = typer.Option(None), 162 max_allowed_validators: int = typer.Option(None), 163 max_encryption_period: int = typer.Option(None), 164 copier_margin: int = typer.Option(None), 165 use_weights_encryption: bool = typer.Option(None), 166): 167 """ 168 Updates a subnet. 169 """ 170 provided_params = locals().copy() 171 provided_params.pop("ctx") 172 provided_params.pop("key") 173 provided_params.pop("netuid") 174 175 provided_params = { 176 key: value 177 for key, value in provided_params.items() 178 if value is not None 179 } 180 if vote_mode is not None: # type: ignore 181 provided_params["vote_mode"] = vote_mode.value 182 context = make_custom_context(ctx) 183 client = context.com_client() 184 subnets_info = get_map_subnets_params(client) 185 subnet_params = subnets_info[netuid] 186 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 187 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 188 # intersection update 189 for param, value in provided_params.items(): 190 if param in subnet_burn_config and value is not None: 191 subnet_burn_config[param] = value 192 193 subnet_params = dict(subnet_params) 194 subnet_params.pop("emission") 195 subnet_params.pop("governance_config") 196 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 197 198 subnet_params = cast(SubnetParams, subnet_params) 199 provided_params = cast(SubnetParams, provided_params) 200 subnet_params.update(provided_params) 201 # because bonds_ma and maximum_set_weights dont have a default value 202 if subnet_params.get("bonds_ma", None) is None: 203 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 204 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 205 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 206 "MaximumSetWeightCallsPerEpoch" 207 ) 208 resolved_key = context.load_key(key, None) 209 with context.progress_status("Updating subnet ..."): 210 response = client.update_subnet( 211 key=resolved_key, params=subnet_params, netuid=netuid 212 ) 213 214 if response.is_success: 215 context.info( 216 f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}" 217 ) 218 else: 219 raise ChainTransactionError(response.error_message) # type: ignore
Updates a subnet.
@subnet_app.command()
def
propose_on_subnet( ctx: typer.models.Context, key: str, netuid: int, cid: str, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: int = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>, max_encryption_period: int = <typer.models.OptionInfo object>, copier_margin: int = <typer.models.OptionInfo object>, use_weights_encryption: int = <typer.models.OptionInfo object>):
222@subnet_app.command() 223def propose_on_subnet( 224 ctx: Context, 225 key: str, 226 netuid: int, 227 cid: str, 228 founder: str = typer.Option(None), 229 founder_share: int = typer.Option(None), 230 metadata: str = typer.Option(None), 231 name: str = typer.Option(None), 232 immunity_period: int = typer.Option(None), 233 incentive_ratio: int = typer.Option(None), 234 max_allowed_uids: int = typer.Option(None), 235 max_allowed_weights: int = typer.Option(None), 236 min_allowed_weights: int = typer.Option(None), 237 max_weight_age: int = typer.Option(None), 238 tempo: int = typer.Option(None), 239 trust_ratio: int = typer.Option(None), 240 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 241 bonds_ma: int = typer.Option(None), 242 vote_mode: VoteMode = typer.Option( 243 None, help="0 for Authority, 1 for Vote" 244 ), 245 # BurnConfiguration 246 min_burn: int = typer.Option(None), 247 max_burn: int = typer.Option(None), 248 adjustment_alpha: int = typer.Option(None), 249 target_registrations_interval: int = typer.Option(None), 250 target_registrations_per_interval: int = typer.Option(None), 251 max_registrations_per_interval: int = typer.Option(None), 252 min_validator_stake: int = typer.Option(None), 253 max_allowed_validators: int = typer.Option(None), 254 max_encryption_period: int = typer.Option(None), 255 copier_margin: int = typer.Option(None), 256 use_weights_encryption: int = typer.Option(None), 257): 258 """ 259 Adds a proposal to a specific subnet. 260 """ 261 context = make_custom_context(ctx) 262 if not re.match(IPFS_REGEX, cid): 263 context.error(f"CID provided is invalid: {cid}") 264 exit(1) 265 else: 266 ipfs_prefix = "ipfs://" 267 cid = ipfs_prefix + cid 268 269 provided_params = locals().copy() 270 provided_params.pop("ctx") 271 provided_params.pop("context") 272 provided_params.pop("key") 273 provided_params.pop("ipfs_prefix") 274 if provided_params["founder"] is not None: 275 resolve_founder = resolve_key_ss58(founder) 276 provided_params["founder"] = resolve_founder 277 278 provided_params = { 279 key: value 280 for key, value in provided_params.items() 281 if value is not None 282 } 283 284 client = context.com_client() 285 subnets_info = get_map_subnets_params(client) 286 subnet_params = subnets_info[netuid] 287 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 288 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 289 # intersection update 290 for param, value in provided_params.items(): 291 if param in subnet_burn_config and value is not None: 292 subnet_burn_config[param] = value 293 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 294 295 subnet_params = dict(subnet_params) 296 subnet_params.pop("emission") 297 subnet_params.pop("governance_config") 298 299 subnet_params.update(provided_params) 300 # because bonds_ma and maximum_set_weights dont have a default value 301 if subnet_params.get("bonds_ma", None) is None: 302 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 303 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 304 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 305 "MaximumSetWeightCallsPerEpoch" 306 ) 307 308 resolved_key = context.load_key(key, None) 309 with context.progress_status("Adding a proposal..."): 310 client.add_subnet_proposal( 311 resolved_key, subnet_params, cid, netuid=netuid 312 ) 313 context.info("Proposal added.")
Adds a proposal to a specific subnet.
@subnet_app.command()
def
submit_general_subnet_application(ctx: typer.models.Context, key: str, application_key: str, cid: str):
316@subnet_app.command() 317def submit_general_subnet_application( 318 ctx: Context, key: str, application_key: str, cid: str 319): 320 """ 321 Submits a legitimate whitelist application to the general subnet, netuid 0. 322 """ 323 324 context = make_custom_context(ctx) 325 if not re.match(IPFS_REGEX, cid): 326 context.error(f"CID provided is invalid: {cid}") 327 exit(1) 328 329 client = context.com_client() 330 331 resolved_key = context.load_key(key, None) 332 resolved_application_key = resolve_key_ss58(application_key) 333 334 # append the ipfs hash 335 ipfs_prefix = "ipfs://" 336 cid = ipfs_prefix + cid 337 338 with context.progress_status("Adding a application..."): 339 client.add_dao_application(resolved_key, resolved_application_key, cid)
Submits a legitimate whitelist application to the general subnet, netuid 0.
@subnet_app.command()
def
add_custom_proposal(ctx: typer.models.Context, key: str, cid: str, netuid: int):
342@subnet_app.command() 343def add_custom_proposal( 344 ctx: Context, 345 key: str, 346 cid: str, 347 netuid: int, 348): 349 """ 350 Adds a custom proposal to a specific subnet. 351 """ 352 context = make_custom_context(ctx) 353 354 if not re.match(IPFS_REGEX, cid): 355 context.error(f"CID provided is invalid: {cid}") 356 exit(1) 357 358 client = context.com_client() 359 360 resolved_key = context.load_key(key, None) 361 362 ipfs_prefix = "ipfs://" 363 cid = ipfs_prefix + cid 364 365 with context.progress_status("Adding a proposal..."): 366 client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid)
Adds a custom proposal to a specific subnet.
@subnet_app.command()
def
list_curator_applications(ctx: typer.models.Context):
369@subnet_app.command() 370def list_curator_applications(ctx: Context): 371 """ 372 Lists all curator applications. 373 """ 374 context = make_custom_context(ctx) 375 client = context.com_client() 376 377 with context.progress_status("Querying applications..."): 378 apps = client.query_map_curator_applications() 379 print(apps)
Lists all curator applications.