communex.cli.subnet
1import re 2from typing import Any, cast 3 4import typer 5from typer import Context 6 7from communex.cli._common import ( 8 make_custom_context, 9 print_table_from_plain_dict, 10 print_table_standardize, 11) 12from communex.compat.key import resolve_key_ss58 13from communex.errors import ChainTransactionError 14from communex.misc import ( 15 IPFS_REGEX, 16 get_map_displayable_subnets, 17 get_map_subnets_params, 18) 19from communex.types import SubnetParams, VoteMode 20 21subnet_app = typer.Typer(no_args_is_help=True) 22 23 24@subnet_app.command() 25def list(ctx: Context): 26 """ 27 Gets subnets. 28 """ 29 context = make_custom_context(ctx) 30 client = context.com_client() 31 32 with context.progress_status("Getting subnets ..."): 33 subnets = get_map_displayable_subnets(client) 34 subnets_with_netuids = [ 35 {"netuid": key, **value} for key, value in subnets.items() 36 ] 37 for dict in subnets_with_netuids: # type: ignore 38 print_table_from_plain_dict(dict, ["Params", "Values"], context.console) # type: ignore 39 40 41@subnet_app.command() 42def distribution(ctx: Context): 43 context = make_custom_context(ctx) 44 client = context.com_client() 45 46 with context.progress_status("Getting emission distribution..."): 47 subnets_emission = client.query_map_subnet_emission() 48 subnet_consensus = client.query_map_subnet_consensus() 49 subnet_names = client.query_map_subnet_names() 50 total_emission = sum(subnets_emission.values()) 51 subnet_emission_percentages = { 52 key: value / total_emission * 100 53 for key, value in subnets_emission.items() 54 } 55 56 # Prepare the data for the table 57 table_data: dict[str, Any] = { 58 "Subnet": [], 59 "Name": [], 60 "Consensus": [], 61 "Emission %": [], 62 } 63 64 for subnet, emission_percentage in subnet_emission_percentages.items(): 65 if emission_percentage > 0: 66 table_data["Subnet"].append(str(subnet)) 67 table_data["Name"].append(subnet_names.get(subnet, "N/A")) 68 table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A")) 69 table_data["Emission %"].append(f"{round(emission_percentage, 2)}%") 70 71 print_table_standardize(table_data, context.console) 72 73 74@subnet_app.command() 75def legit_whitelist(ctx: Context): 76 """ 77 Gets the legitimate whitelist of modules for the general subnet 0 78 """ 79 80 context = make_custom_context(ctx) 81 client = context.com_client() 82 83 with context.progress_status("Getting legitimate whitelist ..."): 84 whitelist = cast(dict[str, int], client.query_map_legit_whitelist()) 85 86 print_table_from_plain_dict( 87 whitelist, ["Module", "Recommended weight"], context.console 88 ) 89 90 91@subnet_app.command() 92def info(ctx: Context, netuid: int): 93 """ 94 Gets subnet info. 95 """ 96 context = make_custom_context(ctx) 97 client = context.com_client() 98 99 with context.progress_status(f"Getting subnet with netuid '{netuid}'..."): 100 subnets = get_map_displayable_subnets(client) 101 subnet = subnets.get(netuid, None) 102 103 if subnet is None: 104 raise ValueError("Subnet not found") 105 106 general_subnet: dict[str, Any] = cast(dict[str, Any], subnet) 107 print_table_from_plain_dict( 108 general_subnet, ["Params", "Values"], context.console 109 ) 110 111 112@subnet_app.command() 113def register( 114 ctx: Context, key: str, name: str, metadata: str = typer.Option(None) 115): 116 """ 117 Registers a new subnet. 118 """ 119 context = make_custom_context(ctx) 120 client = context.com_client() 121 resolved_key = context.load_key(key, None) 122 123 with context.progress_status("Registering subnet ..."): 124 response = client.register_subnet(resolved_key, name, metadata) 125 126 if response.is_success: 127 context.info(f"Successfully registered subnet {name}") 128 else: 129 raise ChainTransactionError(response.error_message) # type: ignore 130 131 132@subnet_app.command() 133def update( 134 ctx: Context, 135 key: str, 136 netuid: int, 137 founder: str = typer.Option(None), 138 founder_share: int = typer.Option(None), 139 name: str = typer.Option(None), 140 metadata: str = typer.Option(None), 141 immunity_period: int = typer.Option(None), 142 incentive_ratio: int = typer.Option(None), 143 max_allowed_uids: int = typer.Option(None), 144 max_allowed_weights: int = typer.Option(None), 145 min_allowed_weights: int = typer.Option(None), 146 max_weight_age: float = typer.Option(None), 147 tempo: int = typer.Option(None), 148 trust_ratio: int = typer.Option(None), 149 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 150 # GovernanceConfiguration 151 vote_mode: VoteMode = typer.Option(None), 152 bonds_ma: int = typer.Option(None), 153 # BurnConfiguration 154 min_burn: int = typer.Option(None), 155 max_burn: int = typer.Option(None), 156 adjustment_alpha: int = typer.Option(None), 157 target_registrations_interval: int = typer.Option(None), 158 target_registrations_per_interval: int = typer.Option(None), 159 max_registrations_per_interval: int = typer.Option(None), 160 min_validator_stake: int = typer.Option(None), 161 max_allowed_validators: int = typer.Option(None), 162): 163 """ 164 Updates a subnet. 165 """ 166 provided_params = locals().copy() 167 provided_params.pop("ctx") 168 provided_params.pop("key") 169 provided_params.pop("netuid") 170 171 provided_params = { 172 key: value 173 for key, value in provided_params.items() 174 if value is not None 175 } 176 if vote_mode is not None: # type: ignore 177 provided_params["vote_mode"] = vote_mode.value 178 context = make_custom_context(ctx) 179 client = context.com_client() 180 subnets_info = get_map_subnets_params(client) 181 subnet_params = subnets_info[netuid] 182 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 183 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 184 # intersection update 185 for param, value in provided_params.items(): 186 if param in subnet_burn_config and value is not None: 187 subnet_burn_config[param] = value 188 189 subnet_params = dict(subnet_params) 190 subnet_params.pop("emission") 191 subnet_params.pop("governance_config") 192 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 193 194 subnet_params = cast(SubnetParams, subnet_params) 195 provided_params = cast(SubnetParams, provided_params) 196 subnet_params.update(provided_params) 197 # because bonds_ma and maximum_set_weights dont have a default value 198 if subnet_params.get("bonds_ma", None) is None: 199 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 200 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 201 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 202 "MaximumSetWeightCallsPerEpoch" 203 ) 204 resolved_key = context.load_key(key, None) 205 with context.progress_status("Updating subnet ..."): 206 response = client.update_subnet( 207 key=resolved_key, params=subnet_params, netuid=netuid 208 ) 209 210 if response.is_success: 211 context.info( 212 f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}" 213 ) 214 else: 215 raise ChainTransactionError(response.error_message) # type: ignore 216 217 218@subnet_app.command() 219def propose_on_subnet( 220 ctx: Context, 221 key: str, 222 netuid: int, 223 cid: str, 224 founder: str = typer.Option(None), 225 founder_share: int = typer.Option(None), 226 metadata: str = typer.Option(None), 227 name: str = typer.Option(None), 228 immunity_period: int = typer.Option(None), 229 incentive_ratio: int = typer.Option(None), 230 max_allowed_uids: int = typer.Option(None), 231 max_allowed_weights: int = typer.Option(None), 232 min_allowed_weights: int = typer.Option(None), 233 max_weight_age: int = typer.Option(None), 234 tempo: int = typer.Option(None), 235 trust_ratio: int = typer.Option(None), 236 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 237 bonds_ma: int = typer.Option(None), 238 vote_mode: VoteMode = typer.Option( 239 None, help="0 for Authority, 1 for Vote" 240 ), 241 # BurnConfiguration 242 min_burn: int = typer.Option(None), 243 max_burn: int = typer.Option(None), 244 adjustment_alpha: int = typer.Option(None), 245 target_registrations_interval: int = typer.Option(None), 246 target_registrations_per_interval: int = typer.Option(None), 247 max_registrations_per_interval: int = typer.Option(None), 248 min_validator_stake: int = typer.Option(None), 249 max_allowed_validators: int = typer.Option(None), 250): 251 """ 252 Adds a proposal to a specific subnet. 253 """ 254 context = make_custom_context(ctx) 255 if not re.match(IPFS_REGEX, cid): 256 context.error(f"CID provided is invalid: {cid}") 257 exit(1) 258 else: 259 ipfs_prefix = "ipfs://" 260 cid = ipfs_prefix + cid 261 262 provided_params = locals().copy() 263 provided_params.pop("ctx") 264 provided_params.pop("context") 265 provided_params.pop("key") 266 provided_params.pop("ipfs_prefix") 267 if provided_params["founder"] is not None: 268 resolve_founder = resolve_key_ss58(founder) 269 provided_params["founder"] = resolve_founder 270 271 provided_params = { 272 key: value 273 for key, value in provided_params.items() 274 if value is not None 275 } 276 277 client = context.com_client() 278 subnets_info = get_map_subnets_params(client) 279 subnet_params = subnets_info[netuid] 280 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 281 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 282 # intersection update 283 for param, value in provided_params.items(): 284 if param in subnet_burn_config and value is not None: 285 subnet_burn_config[param] = value 286 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 287 288 subnet_params = dict(subnet_params) 289 subnet_params.pop("emission") 290 subnet_params.pop("governance_config") 291 292 subnet_params.update(provided_params) 293 # because bonds_ma and maximum_set_weights dont have a default value 294 if subnet_params.get("bonds_ma", None) is None: 295 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 296 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 297 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 298 "MaximumSetWeightCallsPerEpoch" 299 ) 300 301 resolved_key = context.load_key(key, None) 302 with context.progress_status("Adding a proposal..."): 303 client.add_subnet_proposal( 304 resolved_key, subnet_params, cid, netuid=netuid 305 ) 306 context.info("Proposal added.") 307 308 309@subnet_app.command() 310def submit_general_subnet_application( 311 ctx: Context, key: str, application_key: str, cid: str 312): 313 """ 314 Submits a legitimate whitelist application to the general subnet, netuid 0. 315 """ 316 317 context = make_custom_context(ctx) 318 if not re.match(IPFS_REGEX, cid): 319 context.error(f"CID provided is invalid: {cid}") 320 exit(1) 321 322 client = context.com_client() 323 324 resolved_key = context.load_key(key, None) 325 resolved_application_key = resolve_key_ss58(application_key) 326 327 # append the ipfs hash 328 ipfs_prefix = "ipfs://" 329 cid = ipfs_prefix + cid 330 331 with context.progress_status("Adding a application..."): 332 client.add_dao_application(resolved_key, resolved_application_key, cid) 333 334 335@subnet_app.command() 336def add_custom_proposal( 337 ctx: Context, 338 key: str, 339 cid: str, 340 netuid: int, 341): 342 """ 343 Adds a custom proposal to a specific subnet. 344 """ 345 context = make_custom_context(ctx) 346 347 if not re.match(IPFS_REGEX, cid): 348 context.error(f"CID provided is invalid: {cid}") 349 exit(1) 350 351 client = context.com_client() 352 353 resolved_key = context.load_key(key, None) 354 355 ipfs_prefix = "ipfs://" 356 cid = ipfs_prefix + cid 357 358 with context.progress_status("Adding a proposal..."): 359 client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid) 360 361 362@subnet_app.command() 363def list_curator_applications(ctx: Context): 364 """ 365 Lists all curator applications. 366 """ 367 context = make_custom_context(ctx) 368 client = context.com_client() 369 370 with context.progress_status("Querying applications..."): 371 apps = client.query_map_curator_applications() 372 print(apps)
subnet_app =
<typer.main.Typer object>
@subnet_app.command()
def
list(ctx: typer.models.Context):
25@subnet_app.command() 26def list(ctx: Context): 27 """ 28 Gets subnets. 29 """ 30 context = make_custom_context(ctx) 31 client = context.com_client() 32 33 with context.progress_status("Getting subnets ..."): 34 subnets = get_map_displayable_subnets(client) 35 subnets_with_netuids = [ 36 {"netuid": key, **value} for key, value in subnets.items() 37 ] 38 for dict in subnets_with_netuids: # type: ignore 39 print_table_from_plain_dict(dict, ["Params", "Values"], context.console) # type: ignore
Gets subnets.
@subnet_app.command()
def
distribution(ctx: typer.models.Context):
42@subnet_app.command() 43def distribution(ctx: Context): 44 context = make_custom_context(ctx) 45 client = context.com_client() 46 47 with context.progress_status("Getting emission distribution..."): 48 subnets_emission = client.query_map_subnet_emission() 49 subnet_consensus = client.query_map_subnet_consensus() 50 subnet_names = client.query_map_subnet_names() 51 total_emission = sum(subnets_emission.values()) 52 subnet_emission_percentages = { 53 key: value / total_emission * 100 54 for key, value in subnets_emission.items() 55 } 56 57 # Prepare the data for the table 58 table_data: dict[str, Any] = { 59 "Subnet": [], 60 "Name": [], 61 "Consensus": [], 62 "Emission %": [], 63 } 64 65 for subnet, emission_percentage in subnet_emission_percentages.items(): 66 if emission_percentage > 0: 67 table_data["Subnet"].append(str(subnet)) 68 table_data["Name"].append(subnet_names.get(subnet, "N/A")) 69 table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A")) 70 table_data["Emission %"].append(f"{round(emission_percentage, 2)}%") 71 72 print_table_standardize(table_data, context.console)
@subnet_app.command()
def
legit_whitelist(ctx: typer.models.Context):
75@subnet_app.command() 76def legit_whitelist(ctx: Context): 77 """ 78 Gets the legitimate whitelist of modules for the general subnet 0 79 """ 80 81 context = make_custom_context(ctx) 82 client = context.com_client() 83 84 with context.progress_status("Getting legitimate whitelist ..."): 85 whitelist = cast(dict[str, int], client.query_map_legit_whitelist()) 86 87 print_table_from_plain_dict( 88 whitelist, ["Module", "Recommended weight"], context.console 89 )
Gets the legitimate whitelist of modules for the general subnet 0
@subnet_app.command()
def
info(ctx: typer.models.Context, netuid: int):
92@subnet_app.command() 93def info(ctx: Context, netuid: int): 94 """ 95 Gets subnet info. 96 """ 97 context = make_custom_context(ctx) 98 client = context.com_client() 99 100 with context.progress_status(f"Getting subnet with netuid '{netuid}'..."): 101 subnets = get_map_displayable_subnets(client) 102 subnet = subnets.get(netuid, None) 103 104 if subnet is None: 105 raise ValueError("Subnet not found") 106 107 general_subnet: dict[str, Any] = cast(dict[str, Any], subnet) 108 print_table_from_plain_dict( 109 general_subnet, ["Params", "Values"], context.console 110 )
Gets subnet info.
@subnet_app.command()
def
register( ctx: typer.models.Context, key: str, name: str, metadata: str = <typer.models.OptionInfo object>):
113@subnet_app.command() 114def register( 115 ctx: Context, key: str, name: str, metadata: str = typer.Option(None) 116): 117 """ 118 Registers a new subnet. 119 """ 120 context = make_custom_context(ctx) 121 client = context.com_client() 122 resolved_key = context.load_key(key, None) 123 124 with context.progress_status("Registering subnet ..."): 125 response = client.register_subnet(resolved_key, name, metadata) 126 127 if response.is_success: 128 context.info(f"Successfully registered subnet {name}") 129 else: 130 raise ChainTransactionError(response.error_message) # type: ignore
Registers a new subnet.
@subnet_app.command()
def
update( ctx: typer.models.Context, key: str, netuid: int, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: float = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>):
133@subnet_app.command() 134def update( 135 ctx: Context, 136 key: str, 137 netuid: int, 138 founder: str = typer.Option(None), 139 founder_share: int = typer.Option(None), 140 name: str = typer.Option(None), 141 metadata: str = typer.Option(None), 142 immunity_period: int = typer.Option(None), 143 incentive_ratio: int = typer.Option(None), 144 max_allowed_uids: int = typer.Option(None), 145 max_allowed_weights: int = typer.Option(None), 146 min_allowed_weights: int = typer.Option(None), 147 max_weight_age: float = typer.Option(None), 148 tempo: int = typer.Option(None), 149 trust_ratio: int = typer.Option(None), 150 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 151 # GovernanceConfiguration 152 vote_mode: VoteMode = typer.Option(None), 153 bonds_ma: int = typer.Option(None), 154 # BurnConfiguration 155 min_burn: int = typer.Option(None), 156 max_burn: int = typer.Option(None), 157 adjustment_alpha: int = typer.Option(None), 158 target_registrations_interval: int = typer.Option(None), 159 target_registrations_per_interval: int = typer.Option(None), 160 max_registrations_per_interval: int = typer.Option(None), 161 min_validator_stake: int = typer.Option(None), 162 max_allowed_validators: int = typer.Option(None), 163): 164 """ 165 Updates a subnet. 166 """ 167 provided_params = locals().copy() 168 provided_params.pop("ctx") 169 provided_params.pop("key") 170 provided_params.pop("netuid") 171 172 provided_params = { 173 key: value 174 for key, value in provided_params.items() 175 if value is not None 176 } 177 if vote_mode is not None: # type: ignore 178 provided_params["vote_mode"] = vote_mode.value 179 context = make_custom_context(ctx) 180 client = context.com_client() 181 subnets_info = get_map_subnets_params(client) 182 subnet_params = subnets_info[netuid] 183 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 184 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 185 # intersection update 186 for param, value in provided_params.items(): 187 if param in subnet_burn_config and value is not None: 188 subnet_burn_config[param] = value 189 190 subnet_params = dict(subnet_params) 191 subnet_params.pop("emission") 192 subnet_params.pop("governance_config") 193 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 194 195 subnet_params = cast(SubnetParams, subnet_params) 196 provided_params = cast(SubnetParams, provided_params) 197 subnet_params.update(provided_params) 198 # because bonds_ma and maximum_set_weights dont have a default value 199 if subnet_params.get("bonds_ma", None) is None: 200 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 201 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 202 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 203 "MaximumSetWeightCallsPerEpoch" 204 ) 205 resolved_key = context.load_key(key, None) 206 with context.progress_status("Updating subnet ..."): 207 response = client.update_subnet( 208 key=resolved_key, params=subnet_params, netuid=netuid 209 ) 210 211 if response.is_success: 212 context.info( 213 f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}" 214 ) 215 else: 216 raise ChainTransactionError(response.error_message) # type: ignore
Updates a subnet.
@subnet_app.command()
def
propose_on_subnet( ctx: typer.models.Context, key: str, netuid: int, cid: str, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: int = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>):
219@subnet_app.command() 220def propose_on_subnet( 221 ctx: Context, 222 key: str, 223 netuid: int, 224 cid: str, 225 founder: str = typer.Option(None), 226 founder_share: int = typer.Option(None), 227 metadata: str = typer.Option(None), 228 name: str = typer.Option(None), 229 immunity_period: int = typer.Option(None), 230 incentive_ratio: int = typer.Option(None), 231 max_allowed_uids: int = typer.Option(None), 232 max_allowed_weights: int = typer.Option(None), 233 min_allowed_weights: int = typer.Option(None), 234 max_weight_age: int = typer.Option(None), 235 tempo: int = typer.Option(None), 236 trust_ratio: int = typer.Option(None), 237 maximum_set_weight_calls_per_epoch: int = typer.Option(None), 238 bonds_ma: int = typer.Option(None), 239 vote_mode: VoteMode = typer.Option( 240 None, help="0 for Authority, 1 for Vote" 241 ), 242 # BurnConfiguration 243 min_burn: int = typer.Option(None), 244 max_burn: int = typer.Option(None), 245 adjustment_alpha: int = typer.Option(None), 246 target_registrations_interval: int = typer.Option(None), 247 target_registrations_per_interval: int = typer.Option(None), 248 max_registrations_per_interval: int = typer.Option(None), 249 min_validator_stake: int = typer.Option(None), 250 max_allowed_validators: int = typer.Option(None), 251): 252 """ 253 Adds a proposal to a specific subnet. 254 """ 255 context = make_custom_context(ctx) 256 if not re.match(IPFS_REGEX, cid): 257 context.error(f"CID provided is invalid: {cid}") 258 exit(1) 259 else: 260 ipfs_prefix = "ipfs://" 261 cid = ipfs_prefix + cid 262 263 provided_params = locals().copy() 264 provided_params.pop("ctx") 265 provided_params.pop("context") 266 provided_params.pop("key") 267 provided_params.pop("ipfs_prefix") 268 if provided_params["founder"] is not None: 269 resolve_founder = resolve_key_ss58(founder) 270 provided_params["founder"] = resolve_founder 271 272 provided_params = { 273 key: value 274 for key, value in provided_params.items() 275 if value is not None 276 } 277 278 client = context.com_client() 279 subnets_info = get_map_subnets_params(client) 280 subnet_params = subnets_info[netuid] 281 subnet_vote_mode = subnet_params["governance_config"]["vote_mode"] # type: ignore 282 subnet_burn_config = subnet_params["module_burn_config"] # type: ignore 283 # intersection update 284 for param, value in provided_params.items(): 285 if param in subnet_burn_config and value is not None: 286 subnet_burn_config[param] = value 287 subnet_params["vote_mode"] = subnet_vote_mode # type: ignore 288 289 subnet_params = dict(subnet_params) 290 subnet_params.pop("emission") 291 subnet_params.pop("governance_config") 292 293 subnet_params.update(provided_params) 294 # because bonds_ma and maximum_set_weights dont have a default value 295 if subnet_params.get("bonds_ma", None) is None: 296 subnet_params["bonds_ma"] = client.query("BondsMovingAverage") 297 if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None: 298 subnet_params["maximum_set_weight_calls_per_epoch"] = client.query( 299 "MaximumSetWeightCallsPerEpoch" 300 ) 301 302 resolved_key = context.load_key(key, None) 303 with context.progress_status("Adding a proposal..."): 304 client.add_subnet_proposal( 305 resolved_key, subnet_params, cid, netuid=netuid 306 ) 307 context.info("Proposal added.")
Adds a proposal to a specific subnet.
@subnet_app.command()
def
submit_general_subnet_application(ctx: typer.models.Context, key: str, application_key: str, cid: str):
310@subnet_app.command() 311def submit_general_subnet_application( 312 ctx: Context, key: str, application_key: str, cid: str 313): 314 """ 315 Submits a legitimate whitelist application to the general subnet, netuid 0. 316 """ 317 318 context = make_custom_context(ctx) 319 if not re.match(IPFS_REGEX, cid): 320 context.error(f"CID provided is invalid: {cid}") 321 exit(1) 322 323 client = context.com_client() 324 325 resolved_key = context.load_key(key, None) 326 resolved_application_key = resolve_key_ss58(application_key) 327 328 # append the ipfs hash 329 ipfs_prefix = "ipfs://" 330 cid = ipfs_prefix + cid 331 332 with context.progress_status("Adding a application..."): 333 client.add_dao_application(resolved_key, resolved_application_key, cid)
Submits a legitimate whitelist application to the general subnet, netuid 0.
@subnet_app.command()
def
add_custom_proposal(ctx: typer.models.Context, key: str, cid: str, netuid: int):
336@subnet_app.command() 337def add_custom_proposal( 338 ctx: Context, 339 key: str, 340 cid: str, 341 netuid: int, 342): 343 """ 344 Adds a custom proposal to a specific subnet. 345 """ 346 context = make_custom_context(ctx) 347 348 if not re.match(IPFS_REGEX, cid): 349 context.error(f"CID provided is invalid: {cid}") 350 exit(1) 351 352 client = context.com_client() 353 354 resolved_key = context.load_key(key, None) 355 356 ipfs_prefix = "ipfs://" 357 cid = ipfs_prefix + cid 358 359 with context.progress_status("Adding a proposal..."): 360 client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid)
Adds a custom proposal to a specific subnet.
@subnet_app.command()
def
list_curator_applications(ctx: typer.models.Context):
363@subnet_app.command() 364def list_curator_applications(ctx: Context): 365 """ 366 Lists all curator applications. 367 """ 368 context = make_custom_context(ctx) 369 client = context.com_client() 370 371 with context.progress_status("Querying applications..."): 372 apps = client.query_map_curator_applications() 373 print(apps)
Lists all curator applications.