Edit on GitHub

communex.cli.subnet

  1import re
  2from typing import Any, cast
  3
  4import typer
  5from typer import Context
  6
  7from communex.balance import from_nano
  8from communex.cli._common import (make_custom_context,
  9                                  print_table_from_plain_dict,
 10                                  print_table_standardize)
 11from communex.compat.key import resolve_key_ss58, try_classic_load_key
 12from communex.errors import ChainTransactionError
 13from communex.misc import IPFS_REGEX, get_map_subnets_params
 14from communex.types import SubnetParams, VoteMode
 15
 16subnet_app = typer.Typer(no_args_is_help=True)
 17
 18
 19@subnet_app.command()
 20def list(ctx: Context):
 21    """
 22    Gets subnets.
 23    """
 24    context = make_custom_context(ctx)
 25    client = context.com_client()
 26
 27    with context.progress_status("Getting subnets ..."):
 28        subnets = get_map_subnets_params(client)
 29
 30    keys, values = subnets.keys(), subnets.values()
 31    subnets_with_netuids = [
 32        {"netuid": key, **value} for key, value in zip(keys, values)
 33    ]
 34
 35    for subnet_dict in subnets_with_netuids:  # type: ignore
 36        bonds = subnet_dict["bonds_ma"]  # type: ignore
 37        if bonds:
 38            subnet_dict["bonds_ma"] = str(
 39                from_nano(subnet_dict["bonds_ma"])) + " J"  # type: ignore
 40
 41    for dict in subnets_with_netuids:  # type: ignore
 42        print_table_from_plain_dict(
 43            dict, ["Params", "Values"], context.console)  # type: ignore
 44
 45
 46@subnet_app.command()
 47def distribution(ctx: Context):
 48    context = make_custom_context(ctx)
 49    client = context.com_client()
 50
 51    with context.progress_status("Getting emission distribution..."):
 52        subnets_emission = client.query_map_subnet_emission()
 53        subnet_consensus = client.query_map_subnet_consensus()
 54        subnet_names = client.query_map_subnet_names()
 55        total_emission = sum(subnets_emission.values())
 56        subnet_emission_percentages = {
 57            key: value / total_emission * 100 for key, value in subnets_emission.items()
 58        }
 59
 60    # Prepare the data for the table
 61    table_data: dict[str, Any] = {
 62        "Subnet": [],
 63        "Name": [],
 64        "Consensus": [],
 65        "Emission %": []
 66    }
 67
 68    for subnet, emission_percentage in subnet_emission_percentages.items():
 69        if emission_percentage > 0:
 70            table_data["Subnet"].append(str(subnet))
 71            table_data["Name"].append(subnet_names.get(subnet, "N/A"))
 72            table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A"))
 73            table_data["Emission %"].append(f"{round(emission_percentage, 2)}%")
 74
 75    print_table_standardize(table_data, context.console)
 76
 77
 78@subnet_app.command()
 79def legit_whitelist(ctx: Context):
 80    """
 81    Gets the legitimate whitelist of modules for the general subnet 0
 82    """
 83
 84    context = make_custom_context(ctx)
 85    client = context.com_client()
 86
 87    with context.progress_status("Getting legitimate whitelist ..."):
 88        whitelist = cast(dict[str, int], client.query_map_legit_whitelist())
 89
 90    print_table_from_plain_dict(
 91        whitelist, ["Module", "Recommended weight"], context.console
 92    )
 93
 94
 95@subnet_app.command()
 96def info(ctx: Context, netuid: int):
 97    """
 98    Gets subnet info.
 99    """
100    context = make_custom_context(ctx)
101    client = context.com_client()
102
103    with context.progress_status(f"Getting subnet with netuid '{netuid}'..."):
104
105        subnets = get_map_subnets_params(client)
106        subnet = subnets.get(netuid, None)
107
108    if subnet is None:
109        raise ValueError("Subnet not found")
110
111    general_subnet: dict[str, Any] = cast(dict[str, Any], subnet)
112    print_table_from_plain_dict(
113        general_subnet, ["Params", "Values"], context.console)
114
115
116@subnet_app.command()
117def register(
118    ctx: Context,
119    key: str,
120    name: str,
121    metadata: str = typer.Option(None)
122):
123    """
124    Registers a new subnet.
125    """
126    context = make_custom_context(ctx)
127    resolved_key = try_classic_load_key(key)
128    client = context.com_client()
129
130    with context.progress_status("Registering subnet ..."):
131        response = client.register_subnet(resolved_key, name, metadata)
132
133    if response.is_success:
134        context.info(f"Successfully registered subnet {name}")
135    else:
136        raise ChainTransactionError(response.error_message)  # type: ignore
137
138
139@subnet_app.command()
140def update(
141    ctx: Context,
142    key: str,
143    netuid: int,
144    founder: str = typer.Option(None),
145    founder_share: int = typer.Option(None),
146    name: str = typer.Option(None),
147    metadata: str = typer.Option(None),
148    immunity_period: int = typer.Option(None),
149    incentive_ratio: int = typer.Option(None),
150    max_allowed_uids: int = typer.Option(None),
151    max_allowed_weights: int = typer.Option(None),
152    min_allowed_weights: int = typer.Option(None),
153    max_weight_age: float = typer.Option(None),
154    tempo: int = typer.Option(None),
155    trust_ratio: int = typer.Option(None),
156    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
157
158    # GovernanceConfiguration
159    vote_mode: VoteMode = typer.Option(None),
160
161    bonds_ma: int = typer.Option(None),
162
163    # BurnConfiguration
164    min_burn: int = typer.Option(None),
165    max_burn: int = typer.Option(None),
166    adjustment_alpha: int = typer.Option(None),
167    target_registrations_interval: int = typer.Option(None),
168    target_registrations_per_interval: int = typer.Option(None),
169    max_registrations_per_interval: int = typer.Option(None),
170
171    min_validator_stake: int = typer.Option(None),
172    max_allowed_validators: int = typer.Option(None),
173):
174    """
175    Updates a subnet.
176    """
177    provided_params = locals().copy()
178    provided_params.pop("ctx")
179    provided_params.pop("key")
180    provided_params.pop("netuid")
181
182    provided_params = {
183        key: value for key, value in provided_params.items() if value is not None
184    }
185    if vote_mode is not None:  # type: ignore
186        provided_params["vote_mode"] = vote_mode.value
187    context = make_custom_context(ctx)
188    client = context.com_client()
189    subnets_info = get_map_subnets_params(client)
190    subnet_params = subnets_info[netuid]
191    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
192    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
193    # intersection update
194    for param, value in provided_params.items():
195        if param in subnet_burn_config and value is not None:
196            subnet_burn_config[param] = value
197
198    subnet_params = dict(subnet_params)
199    subnet_params.pop("emission")
200    subnet_params.pop("governance_config")
201    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore
202
203    subnet_params = cast(SubnetParams, subnet_params)
204    provided_params = cast(SubnetParams, provided_params)
205    subnet_params.update(provided_params)
206    # because bonds_ma and maximum_set_weights dont have a default value
207    if subnet_params.get("bonds_ma", None) is None:
208        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
209    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
210        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
211            "MaximumSetWeightCallsPerEpoch"
212        )
213    resolved_key = try_classic_load_key(key)
214    with context.progress_status("Updating subnet ..."):
215        response = client.update_subnet(
216            key=resolved_key, params=subnet_params, netuid=netuid
217        )
218
219    if response.is_success:
220        context.info(
221            f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}"
222        )
223    else:
224        raise ChainTransactionError(response.error_message)  # type: ignore
225
226
227@subnet_app.command()
228def propose_on_subnet(
229    ctx: Context,
230    key: str,
231    netuid: int,
232    cid: str,
233    founder: str = typer.Option(None),
234    founder_share: int = typer.Option(None),
235    metadata: str = typer.Option(None),
236    name: str = typer.Option(None),
237    immunity_period: int = typer.Option(None),
238    incentive_ratio: int = typer.Option(None),
239    max_allowed_uids: int = typer.Option(None),
240    max_allowed_weights: int = typer.Option(None),
241    min_allowed_weights: int = typer.Option(None),
242    max_weight_age: int = typer.Option(None),
243    tempo: int = typer.Option(None),
244    trust_ratio: int = typer.Option(None),
245    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
246    bonds_ma: int = typer.Option(None),
247
248    vote_mode: VoteMode = typer.Option(None, help="0 for Authority, 1 for Vote"),
249
250    # BurnConfiguration
251    min_burn: int = typer.Option(None),
252    max_burn: int = typer.Option(None),
253    adjustment_alpha: int = typer.Option(None),
254    target_registrations_interval: int = typer.Option(None),
255    target_registrations_per_interval: int = typer.Option(None),
256    max_registrations_per_interval: int = typer.Option(None),
257
258    min_validator_stake: int = typer.Option(None),
259    max_allowed_validators: int = typer.Option(None),
260):
261    """
262    Adds a proposal to a specific subnet.
263    """
264    context = make_custom_context(ctx)
265    if not re.match(IPFS_REGEX, cid):
266        context.error(f"CID provided is invalid: {cid}")
267        exit(1)
268    else:
269        ipfs_prefix = "ipfs://"
270        cid = ipfs_prefix + cid
271
272    provided_params = locals().copy()
273    provided_params.pop("ctx")
274    provided_params.pop("context")
275    provided_params.pop("key")
276    provided_params.pop("ipfs_prefix")
277    if provided_params["founder"] is not None:
278        resolve_founder = resolve_key_ss58(founder)
279        provided_params["founder"] = resolve_founder
280
281    provided_params = {
282        key: value for key, value in provided_params.items() if value is not None
283    }
284
285    client = context.com_client()
286    subnets_info = get_map_subnets_params(client)
287    subnet_params = subnets_info[netuid]
288    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
289    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
290    # intersection update
291    for param, value in provided_params.items():
292        if param in subnet_burn_config and value is not None:
293            subnet_burn_config[param] = value
294    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore
295
296    subnet_params = dict(subnet_params)
297    subnet_params.pop("emission")
298    subnet_params.pop("governance_config")
299
300    subnet_params.update(provided_params)
301    # because bonds_ma and maximum_set_weights dont have a default value
302    if subnet_params.get("bonds_ma", None) is None:
303        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
304    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
305        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
306            "MaximumSetWeightCallsPerEpoch"
307        )
308
309    resolved_key = try_classic_load_key(key)
310    with context.progress_status("Adding a proposal..."):
311        client.add_subnet_proposal(
312            resolved_key,
313            subnet_params,
314            cid,
315            netuid=netuid
316        )
317    context.info("Proposal added.")
318
319
320@subnet_app.command()
321def submit_general_subnet_application(
322    ctx: Context, key: str, application_key: str, cid: str
323):
324    """
325    Submits a legitimate whitelist application to the general subnet, netuid 0.
326    """
327
328    context = make_custom_context(ctx)
329    if not re.match(IPFS_REGEX, cid):
330        context.error(f"CID provided is invalid: {cid}")
331        exit(1)
332
333    client = context.com_client()
334
335    resolved_key = try_classic_load_key(key)
336    resolved_application_key = resolve_key_ss58(application_key)
337
338    # append the ipfs hash
339    ipfs_prefix = "ipfs://"
340    cid = ipfs_prefix + cid
341
342    with context.progress_status("Adding a application..."):
343        client.add_dao_application(resolved_key, resolved_application_key, cid)
344
345
346@subnet_app.command()
347def add_custom_proposal(
348    ctx: Context,
349    key: str,
350    cid: str,
351    netuid: int,
352):
353    """
354    Adds a custom proposal to a specific subnet.
355    """
356
357    context = make_custom_context(ctx)
358    if not re.match(IPFS_REGEX, cid):
359        context.error(f"CID provided is invalid: {cid}")
360        exit(1)
361
362    client = context.com_client()
363
364    resolved_key = try_classic_load_key(key)
365
366    # append the ipfs hash
367    ipfs_prefix = "ipfs://"
368    cid = ipfs_prefix + cid
369
370    with context.progress_status("Adding a proposal..."):
371        client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid)
372
373
374@subnet_app.command()
375def list_curator_applications(
376    ctx: Context
377):
378    """
379    Lists all curator applications.
380    """
381    context = make_custom_context(ctx)
382    client = context.com_client()
383
384    with context.progress_status("Querying applications..."):
385        apps = client.query_map_curator_applications()
386    print(apps)
subnet_app = <typer.main.Typer object>
@subnet_app.command()
def list(ctx: typer.models.Context):
20@subnet_app.command()
21def list(ctx: Context):
22    """
23    Gets subnets.
24    """
25    context = make_custom_context(ctx)
26    client = context.com_client()
27
28    with context.progress_status("Getting subnets ..."):
29        subnets = get_map_subnets_params(client)
30
31    keys, values = subnets.keys(), subnets.values()
32    subnets_with_netuids = [
33        {"netuid": key, **value} for key, value in zip(keys, values)
34    ]
35
36    for subnet_dict in subnets_with_netuids:  # type: ignore
37        bonds = subnet_dict["bonds_ma"]  # type: ignore
38        if bonds:
39            subnet_dict["bonds_ma"] = str(
40                from_nano(subnet_dict["bonds_ma"])) + " J"  # type: ignore
41
42    for dict in subnets_with_netuids:  # type: ignore
43        print_table_from_plain_dict(
44            dict, ["Params", "Values"], context.console)  # type: ignore

Gets subnets.

@subnet_app.command()
def distribution(ctx: typer.models.Context):
47@subnet_app.command()
48def distribution(ctx: Context):
49    context = make_custom_context(ctx)
50    client = context.com_client()
51
52    with context.progress_status("Getting emission distribution..."):
53        subnets_emission = client.query_map_subnet_emission()
54        subnet_consensus = client.query_map_subnet_consensus()
55        subnet_names = client.query_map_subnet_names()
56        total_emission = sum(subnets_emission.values())
57        subnet_emission_percentages = {
58            key: value / total_emission * 100 for key, value in subnets_emission.items()
59        }
60
61    # Prepare the data for the table
62    table_data: dict[str, Any] = {
63        "Subnet": [],
64        "Name": [],
65        "Consensus": [],
66        "Emission %": []
67    }
68
69    for subnet, emission_percentage in subnet_emission_percentages.items():
70        if emission_percentage > 0:
71            table_data["Subnet"].append(str(subnet))
72            table_data["Name"].append(subnet_names.get(subnet, "N/A"))
73            table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A"))
74            table_data["Emission %"].append(f"{round(emission_percentage, 2)}%")
75
76    print_table_standardize(table_data, context.console)
@subnet_app.command()
def legit_whitelist(ctx: typer.models.Context):
79@subnet_app.command()
80def legit_whitelist(ctx: Context):
81    """
82    Gets the legitimate whitelist of modules for the general subnet 0
83    """
84
85    context = make_custom_context(ctx)
86    client = context.com_client()
87
88    with context.progress_status("Getting legitimate whitelist ..."):
89        whitelist = cast(dict[str, int], client.query_map_legit_whitelist())
90
91    print_table_from_plain_dict(
92        whitelist, ["Module", "Recommended weight"], context.console
93    )

Gets the legitimate whitelist of modules for the general subnet 0

@subnet_app.command()
def info(ctx: typer.models.Context, netuid: int):
 96@subnet_app.command()
 97def info(ctx: Context, netuid: int):
 98    """
 99    Gets subnet info.
100    """
101    context = make_custom_context(ctx)
102    client = context.com_client()
103
104    with context.progress_status(f"Getting subnet with netuid '{netuid}'..."):
105
106        subnets = get_map_subnets_params(client)
107        subnet = subnets.get(netuid, None)
108
109    if subnet is None:
110        raise ValueError("Subnet not found")
111
112    general_subnet: dict[str, Any] = cast(dict[str, Any], subnet)
113    print_table_from_plain_dict(
114        general_subnet, ["Params", "Values"], context.console)

Gets subnet info.

@subnet_app.command()
def register( ctx: typer.models.Context, key: str, name: str, metadata: str = <typer.models.OptionInfo object>):
117@subnet_app.command()
118def register(
119    ctx: Context,
120    key: str,
121    name: str,
122    metadata: str = typer.Option(None)
123):
124    """
125    Registers a new subnet.
126    """
127    context = make_custom_context(ctx)
128    resolved_key = try_classic_load_key(key)
129    client = context.com_client()
130
131    with context.progress_status("Registering subnet ..."):
132        response = client.register_subnet(resolved_key, name, metadata)
133
134    if response.is_success:
135        context.info(f"Successfully registered subnet {name}")
136    else:
137        raise ChainTransactionError(response.error_message)  # type: ignore

Registers a new subnet.

@subnet_app.command()
def update( ctx: typer.models.Context, key: str, netuid: int, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: float = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>):
140@subnet_app.command()
141def update(
142    ctx: Context,
143    key: str,
144    netuid: int,
145    founder: str = typer.Option(None),
146    founder_share: int = typer.Option(None),
147    name: str = typer.Option(None),
148    metadata: str = typer.Option(None),
149    immunity_period: int = typer.Option(None),
150    incentive_ratio: int = typer.Option(None),
151    max_allowed_uids: int = typer.Option(None),
152    max_allowed_weights: int = typer.Option(None),
153    min_allowed_weights: int = typer.Option(None),
154    max_weight_age: float = typer.Option(None),
155    tempo: int = typer.Option(None),
156    trust_ratio: int = typer.Option(None),
157    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
158
159    # GovernanceConfiguration
160    vote_mode: VoteMode = typer.Option(None),
161
162    bonds_ma: int = typer.Option(None),
163
164    # BurnConfiguration
165    min_burn: int = typer.Option(None),
166    max_burn: int = typer.Option(None),
167    adjustment_alpha: int = typer.Option(None),
168    target_registrations_interval: int = typer.Option(None),
169    target_registrations_per_interval: int = typer.Option(None),
170    max_registrations_per_interval: int = typer.Option(None),
171
172    min_validator_stake: int = typer.Option(None),
173    max_allowed_validators: int = typer.Option(None),
174):
175    """
176    Updates a subnet.
177    """
178    provided_params = locals().copy()
179    provided_params.pop("ctx")
180    provided_params.pop("key")
181    provided_params.pop("netuid")
182
183    provided_params = {
184        key: value for key, value in provided_params.items() if value is not None
185    }
186    if vote_mode is not None:  # type: ignore
187        provided_params["vote_mode"] = vote_mode.value
188    context = make_custom_context(ctx)
189    client = context.com_client()
190    subnets_info = get_map_subnets_params(client)
191    subnet_params = subnets_info[netuid]
192    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
193    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
194    # intersection update
195    for param, value in provided_params.items():
196        if param in subnet_burn_config and value is not None:
197            subnet_burn_config[param] = value
198
199    subnet_params = dict(subnet_params)
200    subnet_params.pop("emission")
201    subnet_params.pop("governance_config")
202    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore
203
204    subnet_params = cast(SubnetParams, subnet_params)
205    provided_params = cast(SubnetParams, provided_params)
206    subnet_params.update(provided_params)
207    # because bonds_ma and maximum_set_weights dont have a default value
208    if subnet_params.get("bonds_ma", None) is None:
209        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
210    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
211        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
212            "MaximumSetWeightCallsPerEpoch"
213        )
214    resolved_key = try_classic_load_key(key)
215    with context.progress_status("Updating subnet ..."):
216        response = client.update_subnet(
217            key=resolved_key, params=subnet_params, netuid=netuid
218        )
219
220    if response.is_success:
221        context.info(
222            f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}"
223        )
224    else:
225        raise ChainTransactionError(response.error_message)  # type: ignore

Updates a subnet.

@subnet_app.command()
def propose_on_subnet( ctx: typer.models.Context, key: str, netuid: int, cid: str, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: int = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>):
228@subnet_app.command()
229def propose_on_subnet(
230    ctx: Context,
231    key: str,
232    netuid: int,
233    cid: str,
234    founder: str = typer.Option(None),
235    founder_share: int = typer.Option(None),
236    metadata: str = typer.Option(None),
237    name: str = typer.Option(None),
238    immunity_period: int = typer.Option(None),
239    incentive_ratio: int = typer.Option(None),
240    max_allowed_uids: int = typer.Option(None),
241    max_allowed_weights: int = typer.Option(None),
242    min_allowed_weights: int = typer.Option(None),
243    max_weight_age: int = typer.Option(None),
244    tempo: int = typer.Option(None),
245    trust_ratio: int = typer.Option(None),
246    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
247    bonds_ma: int = typer.Option(None),
248
249    vote_mode: VoteMode = typer.Option(None, help="0 for Authority, 1 for Vote"),
250
251    # BurnConfiguration
252    min_burn: int = typer.Option(None),
253    max_burn: int = typer.Option(None),
254    adjustment_alpha: int = typer.Option(None),
255    target_registrations_interval: int = typer.Option(None),
256    target_registrations_per_interval: int = typer.Option(None),
257    max_registrations_per_interval: int = typer.Option(None),
258
259    min_validator_stake: int = typer.Option(None),
260    max_allowed_validators: int = typer.Option(None),
261):
262    """
263    Adds a proposal to a specific subnet.
264    """
265    context = make_custom_context(ctx)
266    if not re.match(IPFS_REGEX, cid):
267        context.error(f"CID provided is invalid: {cid}")
268        exit(1)
269    else:
270        ipfs_prefix = "ipfs://"
271        cid = ipfs_prefix + cid
272
273    provided_params = locals().copy()
274    provided_params.pop("ctx")
275    provided_params.pop("context")
276    provided_params.pop("key")
277    provided_params.pop("ipfs_prefix")
278    if provided_params["founder"] is not None:
279        resolve_founder = resolve_key_ss58(founder)
280        provided_params["founder"] = resolve_founder
281
282    provided_params = {
283        key: value for key, value in provided_params.items() if value is not None
284    }
285
286    client = context.com_client()
287    subnets_info = get_map_subnets_params(client)
288    subnet_params = subnets_info[netuid]
289    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
290    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
291    # intersection update
292    for param, value in provided_params.items():
293        if param in subnet_burn_config and value is not None:
294            subnet_burn_config[param] = value
295    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore
296
297    subnet_params = dict(subnet_params)
298    subnet_params.pop("emission")
299    subnet_params.pop("governance_config")
300
301    subnet_params.update(provided_params)
302    # because bonds_ma and maximum_set_weights dont have a default value
303    if subnet_params.get("bonds_ma", None) is None:
304        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
305    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
306        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
307            "MaximumSetWeightCallsPerEpoch"
308        )
309
310    resolved_key = try_classic_load_key(key)
311    with context.progress_status("Adding a proposal..."):
312        client.add_subnet_proposal(
313            resolved_key,
314            subnet_params,
315            cid,
316            netuid=netuid
317        )
318    context.info("Proposal added.")

Adds a proposal to a specific subnet.

@subnet_app.command()
def submit_general_subnet_application(ctx: typer.models.Context, key: str, application_key: str, cid: str):
321@subnet_app.command()
322def submit_general_subnet_application(
323    ctx: Context, key: str, application_key: str, cid: str
324):
325    """
326    Submits a legitimate whitelist application to the general subnet, netuid 0.
327    """
328
329    context = make_custom_context(ctx)
330    if not re.match(IPFS_REGEX, cid):
331        context.error(f"CID provided is invalid: {cid}")
332        exit(1)
333
334    client = context.com_client()
335
336    resolved_key = try_classic_load_key(key)
337    resolved_application_key = resolve_key_ss58(application_key)
338
339    # append the ipfs hash
340    ipfs_prefix = "ipfs://"
341    cid = ipfs_prefix + cid
342
343    with context.progress_status("Adding a application..."):
344        client.add_dao_application(resolved_key, resolved_application_key, cid)

Submits a legitimate whitelist application to the general subnet, netuid 0.

@subnet_app.command()
def add_custom_proposal(ctx: typer.models.Context, key: str, cid: str, netuid: int):
347@subnet_app.command()
348def add_custom_proposal(
349    ctx: Context,
350    key: str,
351    cid: str,
352    netuid: int,
353):
354    """
355    Adds a custom proposal to a specific subnet.
356    """
357
358    context = make_custom_context(ctx)
359    if not re.match(IPFS_REGEX, cid):
360        context.error(f"CID provided is invalid: {cid}")
361        exit(1)
362
363    client = context.com_client()
364
365    resolved_key = try_classic_load_key(key)
366
367    # append the ipfs hash
368    ipfs_prefix = "ipfs://"
369    cid = ipfs_prefix + cid
370
371    with context.progress_status("Adding a proposal..."):
372        client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid)

Adds a custom proposal to a specific subnet.

@subnet_app.command()
def list_curator_applications(ctx: typer.models.Context):
375@subnet_app.command()
376def list_curator_applications(
377    ctx: Context
378):
379    """
380    Lists all curator applications.
381    """
382    context = make_custom_context(ctx)
383    client = context.com_client()
384
385    with context.progress_status("Querying applications..."):
386        apps = client.query_map_curator_applications()
387    print(apps)

Lists all curator applications.