Edit on GitHub

communex.cli.subnet

  1import re
  2from typing import Any, cast
  3
  4import typer
  5from typer import Context
  6
  7from communex.cli._common import (
  8    make_custom_context,
  9    print_table_from_plain_dict,
 10    print_table_standardize,
 11)
 12from communex.compat.key import resolve_key_ss58
 13from communex.errors import ChainTransactionError
 14from communex.misc import (
 15    IPFS_REGEX,
 16    get_map_displayable_subnets,
 17    get_map_subnets_params,
 18)
 19from communex.types import SubnetParams, VoteMode
 20
# Typer application that the subnet commands below register themselves on;
# no_args_is_help=True shows the help text when it is invoked without arguments.
subnet_app = typer.Typer(no_args_is_help=True)
 22
 23
 24@subnet_app.command()
 25def list(ctx: Context):
 26    """
 27    Gets subnets.
 28    """
 29    context = make_custom_context(ctx)
 30    client = context.com_client()
 31
 32    with context.progress_status("Getting subnets ..."):
 33        subnets = get_map_displayable_subnets(client)
 34    subnets_with_netuids = [
 35        {"netuid": key, **value} for key, value in subnets.items()
 36    ]
 37    for dict in subnets_with_netuids:  # type: ignore
 38        print_table_from_plain_dict(dict, ["Params", "Values"], context.console)  # type: ignore
 39
 40
 41@subnet_app.command()
 42def distribution(ctx: Context):
 43    context = make_custom_context(ctx)
 44    client = context.com_client()
 45
 46    with context.progress_status("Getting emission distribution..."):
 47        subnets_emission = client.query_map_subnet_emission()
 48        subnet_consensus = client.query_map_subnet_consensus()
 49        subnet_names = client.query_map_subnet_names()
 50        total_emission = sum(subnets_emission.values())
 51        subnet_emission_percentages = {
 52            key: value / total_emission * 100
 53            for key, value in subnets_emission.items()
 54        }
 55
 56    # Prepare the data for the table
 57    table_data: dict[str, Any] = {
 58        "Subnet": [],
 59        "Name": [],
 60        "Consensus": [],
 61        "Emission %": [],
 62    }
 63
 64    for subnet, emission_percentage in subnet_emission_percentages.items():
 65        if emission_percentage > 0:
 66            table_data["Subnet"].append(str(subnet))
 67            table_data["Name"].append(subnet_names.get(subnet, "N/A"))
 68            table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A"))
 69            table_data["Emission %"].append(f"{round(emission_percentage, 2)}%")
 70
 71    print_table_standardize(table_data, context.console)
 72
 73
 74@subnet_app.command()
 75def legit_whitelist(ctx: Context):
 76    """
 77    Gets the legitimate whitelist of modules for the general subnet 0
 78    """
 79
 80    context = make_custom_context(ctx)
 81    client = context.com_client()
 82
 83    with context.progress_status("Getting legitimate whitelist ..."):
 84        whitelist = cast(dict[str, int], client.query_map_legit_whitelist())
 85
 86    print_table_from_plain_dict(
 87        whitelist, ["Module", "Recommended weight"], context.console
 88    )
 89
 90
 91@subnet_app.command()
 92def info(ctx: Context, netuid: int):
 93    """
 94    Gets subnet info.
 95    """
 96    context = make_custom_context(ctx)
 97    client = context.com_client()
 98
 99    with context.progress_status(f"Getting subnet with netuid '{netuid}'..."):
100        subnets = get_map_displayable_subnets(client)
101        subnet = subnets.get(netuid, None)
102
103    if subnet is None:
104        raise ValueError("Subnet not found")
105
106    general_subnet: dict[str, Any] = cast(dict[str, Any], subnet)
107    print_table_from_plain_dict(
108        general_subnet, ["Params", "Values"], context.console
109    )
110
111
@subnet_app.command()
def register(
    ctx: Context, key: str, name: str, metadata: str = typer.Option(None)
):
    """
    Registers a new subnet.
    """
    context = make_custom_context(ctx)
    client = context.com_client()
    signer = context.load_key(key, None)

    with context.progress_status("Registering subnet ..."):
        receipt = client.register_subnet(signer, name, metadata)

    # Surface a failed extrinsic as an error; otherwise report success.
    if not receipt.is_success:
        raise ChainTransactionError(receipt.error_message)  # type: ignore

    context.info(f"Successfully registered subnet {name}")
130
131
@subnet_app.command()
def update(
    ctx: Context,
    key: str,
    netuid: int,
    founder: str = typer.Option(None),
    founder_share: int = typer.Option(None),
    name: str = typer.Option(None),
    metadata: str = typer.Option(None),
    immunity_period: int = typer.Option(None),
    incentive_ratio: int = typer.Option(None),
    max_allowed_uids: int = typer.Option(None),
    max_allowed_weights: int = typer.Option(None),
    min_allowed_weights: int = typer.Option(None),
    max_weight_age: float = typer.Option(None),
    tempo: int = typer.Option(None),
    trust_ratio: int = typer.Option(None),
    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
    # GovernanceConfiguration
    vote_mode: VoteMode = typer.Option(None),
    bonds_ma: int = typer.Option(None),
    # BurnConfiguration
    min_burn: int = typer.Option(None),
    max_burn: int = typer.Option(None),
    adjustment_alpha: int = typer.Option(None),
    target_registrations_interval: int = typer.Option(None),
    target_registrations_per_interval: int = typer.Option(None),
    max_registrations_per_interval: int = typer.Option(None),
    min_validator_stake: int = typer.Option(None),
    max_allowed_validators: int = typer.Option(None),
    max_encryption_period: int = typer.Option(None),
    copier_margin: int = typer.Option(None),
    use_weights_encryption: bool = typer.Option(None),
):
    """
    Updates a subnet.
    """
    # Snapshot the arguments first, before any other local is bound, so the
    # copy holds exactly the command parameters.
    provided_params = locals().copy()
    for non_param in ("ctx", "key", "netuid"):
        provided_params.pop(non_param)

    # Keep only the options that were actually passed on the command line.
    provided_params = {
        param: value
        for param, value in provided_params.items()
        if value is not None
    }
    if vote_mode is not None:  # type: ignore
        # The enum's value, not the enum object, is what gets submitted.
        provided_params["vote_mode"] = vote_mode.value
    if founder is not None:  # type: ignore
        # Resolve the founder the same way propose_on_subnet does.
        provided_params["founder"] = resolve_key_ss58(founder)

    context = make_custom_context(ctx)
    client = context.com_client()
    subnets_info = get_map_subnets_params(client)
    if netuid not in subnets_info:
        # Report an unknown subnet cleanly instead of a KeyError traceback.
        context.error(f"Subnet with netuid '{netuid}' not found")
        raise typer.Exit(code=1)

    subnet_params = subnets_info[netuid]
    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
    # Intersection update: provided options that are burn settings are written
    # into the nested burn config. The shallow copy below shares that nested
    # dict, so the changes are carried into the final parameters.
    for param, value in provided_params.items():
        if param in subnet_burn_config:
            subnet_burn_config[param] = value

    # Work on a copy with the entries that are not submitted removed, and the
    # current vote mode flattened to the top level.
    subnet_params = dict(subnet_params)
    subnet_params.pop("emission")
    subnet_params.pop("governance_config")
    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore

    subnet_params = cast(SubnetParams, subnet_params)
    provided_params = cast(SubnetParams, provided_params)
    # Provided options override the subnet's current values.
    subnet_params.update(provided_params)
    # because bonds_ma and maximum_set_weights dont have a default value
    if subnet_params.get("bonds_ma", None) is None:
        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
            "MaximumSetWeightCallsPerEpoch"
        )

    resolved_key = context.load_key(key, None)
    with context.progress_status("Updating subnet ..."):
        response = client.update_subnet(
            key=resolved_key, params=subnet_params, netuid=netuid
        )

    if response.is_success:
        context.info(
            f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}"
        )
    else:
        raise ChainTransactionError(response.error_message)  # type: ignore
219
220
@subnet_app.command()
def propose_on_subnet(
    ctx: Context,
    key: str,
    netuid: int,
    cid: str,
    founder: str = typer.Option(None),
    founder_share: int = typer.Option(None),
    metadata: str = typer.Option(None),
    name: str = typer.Option(None),
    immunity_period: int = typer.Option(None),
    incentive_ratio: int = typer.Option(None),
    max_allowed_uids: int = typer.Option(None),
    max_allowed_weights: int = typer.Option(None),
    min_allowed_weights: int = typer.Option(None),
    max_weight_age: int = typer.Option(None),
    tempo: int = typer.Option(None),
    trust_ratio: int = typer.Option(None),
    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
    bonds_ma: int = typer.Option(None),
    vote_mode: VoteMode = typer.Option(
        None, help="0 for Authority, 1 for Vote"
    ),
    # BurnConfiguration
    min_burn: int = typer.Option(None),
    max_burn: int = typer.Option(None),
    adjustment_alpha: int = typer.Option(None),
    target_registrations_interval: int = typer.Option(None),
    target_registrations_per_interval: int = typer.Option(None),
    max_registrations_per_interval: int = typer.Option(None),
    min_validator_stake: int = typer.Option(None),
    max_allowed_validators: int = typer.Option(None),
    max_encryption_period: int = typer.Option(None),
    copier_margin: int = typer.Option(None),
    use_weights_encryption: int = typer.Option(None),
):
    """
    Adds a proposal to a specific subnet.
    """
    context = make_custom_context(ctx)
    if not re.match(IPFS_REGEX, cid):
        context.error(f"CID provided is invalid: {cid}")
        # typer.Exit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        raise typer.Exit(code=1)
    cid = "ipfs://" + cid

    # Snapshot the arguments; "context" is the only extra local bound so far.
    provided_params = locals().copy()
    for non_param in ("ctx", "context", "key"):
        provided_params.pop(non_param)
    # NOTE(review): "netuid" and "cid" stay in provided_params and are merged
    # into subnet_params below, as before - confirm add_subnet_proposal
    # tolerates these extra keys.
    if founder is not None:  # type: ignore
        provided_params["founder"] = resolve_key_ss58(founder)
    if vote_mode is not None:  # type: ignore
        # Submit the enum's value, matching what the update command sends.
        provided_params["vote_mode"] = vote_mode.value

    # Keep only the options that were actually passed on the command line.
    provided_params = {
        param: value
        for param, value in provided_params.items()
        if value is not None
    }

    client = context.com_client()
    subnets_info = get_map_subnets_params(client)
    if netuid not in subnets_info:
        # Report an unknown subnet cleanly instead of a KeyError traceback.
        context.error(f"Subnet with netuid '{netuid}' not found")
        raise typer.Exit(code=1)

    subnet_params = subnets_info[netuid]
    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
    # Intersection update: provided options that are burn settings are written
    # into the nested burn config. The shallow copy below shares that nested
    # dict, so the changes are carried into the final parameters.
    for param, value in provided_params.items():
        if param in subnet_burn_config:
            subnet_burn_config[param] = value

    # Work on a copy with the entries that are not submitted removed, and the
    # current vote mode flattened to the top level.
    subnet_params = dict(subnet_params)
    subnet_params.pop("emission")
    subnet_params.pop("governance_config")
    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore

    # Provided options override the subnet's current values.
    subnet_params.update(provided_params)
    # because bonds_ma and maximum_set_weights dont have a default value
    if subnet_params.get("bonds_ma", None) is None:
        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
            "MaximumSetWeightCallsPerEpoch"
        )

    resolved_key = context.load_key(key, None)
    with context.progress_status("Adding a proposal..."):
        client.add_subnet_proposal(
            resolved_key, subnet_params, cid, netuid=netuid
        )
    context.info("Proposal added.")
313
314
@subnet_app.command()
def submit_general_subnet_application(
    ctx: Context, key: str, application_key: str, cid: str
):
    """
    Submits a legitimate whitelist application to the general subnet, netuid 0.
    """

    context = make_custom_context(ctx)
    if not re.match(IPFS_REGEX, cid):
        context.error(f"CID provided is invalid: {cid}")
        # typer.Exit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        raise typer.Exit(code=1)

    client = context.com_client()

    resolved_key = context.load_key(key, None)
    resolved_application_key = resolve_key_ss58(application_key)

    # Prefix the validated CID with the IPFS URI scheme.
    ipfs_prefix = "ipfs://"
    cid = ipfs_prefix + cid

    with context.progress_status("Adding a application..."):
        client.add_dao_application(resolved_key, resolved_application_key, cid)
339
340
@subnet_app.command()
def add_custom_proposal(
    ctx: Context,
    key: str,
    cid: str,
    netuid: int,
):
    """
    Adds a custom proposal to a specific subnet.
    """
    context = make_custom_context(ctx)

    if not re.match(IPFS_REGEX, cid):
        context.error(f"CID provided is invalid: {cid}")
        # typer.Exit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        raise typer.Exit(code=1)

    client = context.com_client()

    resolved_key = context.load_key(key, None)

    # Prefix the validated CID with the IPFS URI scheme.
    ipfs_prefix = "ipfs://"
    cid = ipfs_prefix + cid

    with context.progress_status("Adding a proposal..."):
        client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid)
366
367
@subnet_app.command()
def list_curator_applications(ctx: Context):
    """
    Lists all curator applications.
    """
    context = make_custom_context(ctx)
    client = context.com_client()

    with context.progress_status("Querying applications..."):
        curator_applications = client.query_map_curator_applications()

    # The query result is written to stdout as-is, without table formatting.
    print(curator_applications)
subnet_app = <typer.main.Typer object>
@subnet_app.command()
def list(ctx: typer.models.Context):
@subnet_app.command()
def list(ctx: Context):
    """
    Gets subnets.
    """
    context = make_custom_context(ctx)
    client = context.com_client()

    with context.progress_status("Getting subnets ..."):
        subnets = get_map_displayable_subnets(client)

    # Build every row up front; the netuid is inserted as the first key of
    # each subnet's parameter mapping.
    rows = [
        {"netuid": netuid, **params} for netuid, params in subnets.items()
    ]

    # One "Params"/"Values" table is printed per subnet.
    columns = ["Params", "Values"]
    for row in rows:  # type: ignore
        print_table_from_plain_dict(row, columns, context.console)  # type: ignore

Gets subnets.

@subnet_app.command()
def distribution(ctx: typer.models.Context):
@subnet_app.command()
def distribution(ctx: Context):
    """
    Gets the emission distribution across subnets.
    """
    context = make_custom_context(ctx)
    client = context.com_client()

    with context.progress_status("Getting emission distribution..."):
        subnets_emission = client.query_map_subnet_emission()
        subnet_consensus = client.query_map_subnet_consensus()
        subnet_names = client.query_map_subnet_names()
        total_emission = sum(subnets_emission.values())
        # When nothing is being emitted the total is zero; report 0% for every
        # subnet instead of dividing by zero.
        subnet_emission_percentages = {
            key: (value / total_emission * 100) if total_emission else 0
            for key, value in subnets_emission.items()
        }

    # Prepare the data for the table (one list per column).
    table_data: dict[str, Any] = {
        "Subnet": [],
        "Name": [],
        "Consensus": [],
        "Emission %": [],
    }

    for subnet, emission_percentage in subnet_emission_percentages.items():
        # Subnets without any emission are left out of the table.
        if emission_percentage > 0:
            table_data["Subnet"].append(str(subnet))
            table_data["Name"].append(subnet_names.get(subnet, "N/A"))
            table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A"))
            table_data["Emission %"].append(f"{round(emission_percentage, 2)}%")

    print_table_standardize(table_data, context.console)
@subnet_app.command()
def legit_whitelist(ctx: typer.models.Context):
@subnet_app.command()
def legit_whitelist(ctx: Context):
    """
    Gets the legitimate whitelist of modules for the general subnet 0
    """
    context = make_custom_context(ctx)
    client = context.com_client()

    with context.progress_status("Getting legitimate whitelist ..."):
        raw_whitelist = client.query_map_legit_whitelist()
        # cast() only narrows the static type; the value is not converted.
        module_weights = cast(dict[str, int], raw_whitelist)

    columns = ["Module", "Recommended weight"]
    print_table_from_plain_dict(module_weights, columns, context.console)

Gets the legitimate whitelist of modules for the general subnet 0

@subnet_app.command()
def info(ctx: typer.models.Context, netuid: int):
 92@subnet_app.command()
 93def info(ctx: Context, netuid: int):
 94    """
 95    Gets subnet info.
 96    """
 97    context = make_custom_context(ctx)
 98    client = context.com_client()
 99
100    with context.progress_status(f"Getting subnet with netuid '{netuid}'..."):
101        subnets = get_map_displayable_subnets(client)
102        subnet = subnets.get(netuid, None)
103
104    if subnet is None:
105        raise ValueError("Subnet not found")
106
107    general_subnet: dict[str, Any] = cast(dict[str, Any], subnet)
108    print_table_from_plain_dict(
109        general_subnet, ["Params", "Values"], context.console
110    )

Gets subnet info.

@subnet_app.command()
def register( ctx: typer.models.Context, key: str, name: str, metadata: str = <typer.models.OptionInfo object>):
@subnet_app.command()
def register(
    ctx: Context, key: str, name: str, metadata: str = typer.Option(None)
):
    """
    Registers a new subnet.
    """
    context = make_custom_context(ctx)
    client = context.com_client()
    signer = context.load_key(key, None)

    with context.progress_status("Registering subnet ..."):
        receipt = client.register_subnet(signer, name, metadata)

    # Surface a failed extrinsic as an error; otherwise report success.
    if not receipt.is_success:
        raise ChainTransactionError(receipt.error_message)  # type: ignore

    context.info(f"Successfully registered subnet {name}")

Registers a new subnet.

@subnet_app.command()
def update( ctx: typer.models.Context, key: str, netuid: int, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: float = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>, max_encryption_period: int = <typer.models.OptionInfo object>, copier_margin: int = <typer.models.OptionInfo object>, use_weights_encryption: bool = <typer.models.OptionInfo object>):
@subnet_app.command()
def update(
    ctx: Context,
    key: str,
    netuid: int,
    founder: str = typer.Option(None),
    founder_share: int = typer.Option(None),
    name: str = typer.Option(None),
    metadata: str = typer.Option(None),
    immunity_period: int = typer.Option(None),
    incentive_ratio: int = typer.Option(None),
    max_allowed_uids: int = typer.Option(None),
    max_allowed_weights: int = typer.Option(None),
    min_allowed_weights: int = typer.Option(None),
    max_weight_age: float = typer.Option(None),
    tempo: int = typer.Option(None),
    trust_ratio: int = typer.Option(None),
    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
    # GovernanceConfiguration
    vote_mode: VoteMode = typer.Option(None),
    bonds_ma: int = typer.Option(None),
    # BurnConfiguration
    min_burn: int = typer.Option(None),
    max_burn: int = typer.Option(None),
    adjustment_alpha: int = typer.Option(None),
    target_registrations_interval: int = typer.Option(None),
    target_registrations_per_interval: int = typer.Option(None),
    max_registrations_per_interval: int = typer.Option(None),
    min_validator_stake: int = typer.Option(None),
    max_allowed_validators: int = typer.Option(None),
    max_encryption_period: int = typer.Option(None),
    copier_margin: int = typer.Option(None),
    use_weights_encryption: bool = typer.Option(None),
):
    """
    Updates a subnet.
    """
    # Snapshot the arguments first, before any other local is bound, so the
    # copy holds exactly the command parameters.
    provided_params = locals().copy()
    for non_param in ("ctx", "key", "netuid"):
        provided_params.pop(non_param)

    # Keep only the options that were actually passed on the command line.
    provided_params = {
        param: value
        for param, value in provided_params.items()
        if value is not None
    }
    if vote_mode is not None:  # type: ignore
        # The enum's value, not the enum object, is what gets submitted.
        provided_params["vote_mode"] = vote_mode.value
    if founder is not None:  # type: ignore
        # Resolve the founder the same way propose_on_subnet does.
        provided_params["founder"] = resolve_key_ss58(founder)

    context = make_custom_context(ctx)
    client = context.com_client()
    subnets_info = get_map_subnets_params(client)
    if netuid not in subnets_info:
        # Report an unknown subnet cleanly instead of a KeyError traceback.
        context.error(f"Subnet with netuid '{netuid}' not found")
        raise typer.Exit(code=1)

    subnet_params = subnets_info[netuid]
    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
    # Intersection update: provided options that are burn settings are written
    # into the nested burn config. The shallow copy below shares that nested
    # dict, so the changes are carried into the final parameters.
    for param, value in provided_params.items():
        if param in subnet_burn_config:
            subnet_burn_config[param] = value

    # Work on a copy with the entries that are not submitted removed, and the
    # current vote mode flattened to the top level.
    subnet_params = dict(subnet_params)
    subnet_params.pop("emission")
    subnet_params.pop("governance_config")
    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore

    subnet_params = cast(SubnetParams, subnet_params)
    provided_params = cast(SubnetParams, provided_params)
    # Provided options override the subnet's current values.
    subnet_params.update(provided_params)
    # because bonds_ma and maximum_set_weights dont have a default value
    if subnet_params.get("bonds_ma", None) is None:
        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
            "MaximumSetWeightCallsPerEpoch"
        )

    resolved_key = context.load_key(key, None)
    with context.progress_status("Updating subnet ..."):
        response = client.update_subnet(
            key=resolved_key, params=subnet_params, netuid=netuid
        )

    if response.is_success:
        context.info(
            f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}"
        )
    else:
        raise ChainTransactionError(response.error_message)  # type: ignore

Updates a subnet.

@subnet_app.command()
def propose_on_subnet( ctx: typer.models.Context, key: str, netuid: int, cid: str, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: int = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>, max_encryption_period: int = <typer.models.OptionInfo object>, copier_margin: int = <typer.models.OptionInfo object>, use_weights_encryption: int = <typer.models.OptionInfo object>):
@subnet_app.command()
def propose_on_subnet(
    ctx: Context,
    key: str,
    netuid: int,
    cid: str,
    founder: str = typer.Option(None),
    founder_share: int = typer.Option(None),
    metadata: str = typer.Option(None),
    name: str = typer.Option(None),
    immunity_period: int = typer.Option(None),
    incentive_ratio: int = typer.Option(None),
    max_allowed_uids: int = typer.Option(None),
    max_allowed_weights: int = typer.Option(None),
    min_allowed_weights: int = typer.Option(None),
    max_weight_age: int = typer.Option(None),
    tempo: int = typer.Option(None),
    trust_ratio: int = typer.Option(None),
    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
    bonds_ma: int = typer.Option(None),
    vote_mode: VoteMode = typer.Option(
        None, help="0 for Authority, 1 for Vote"
    ),
    # BurnConfiguration
    min_burn: int = typer.Option(None),
    max_burn: int = typer.Option(None),
    adjustment_alpha: int = typer.Option(None),
    target_registrations_interval: int = typer.Option(None),
    target_registrations_per_interval: int = typer.Option(None),
    max_registrations_per_interval: int = typer.Option(None),
    min_validator_stake: int = typer.Option(None),
    max_allowed_validators: int = typer.Option(None),
    max_encryption_period: int = typer.Option(None),
    copier_margin: int = typer.Option(None),
    use_weights_encryption: int = typer.Option(None),
):
    """
    Adds a proposal to a specific subnet.
    """
    context = make_custom_context(ctx)
    if not re.match(IPFS_REGEX, cid):
        context.error(f"CID provided is invalid: {cid}")
        # typer.Exit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        raise typer.Exit(code=1)
    cid = "ipfs://" + cid

    # Snapshot the arguments; "context" is the only extra local bound so far.
    provided_params = locals().copy()
    for non_param in ("ctx", "context", "key"):
        provided_params.pop(non_param)
    # NOTE(review): "netuid" and "cid" stay in provided_params and are merged
    # into subnet_params below, as before - confirm add_subnet_proposal
    # tolerates these extra keys.
    if founder is not None:  # type: ignore
        provided_params["founder"] = resolve_key_ss58(founder)
    if vote_mode is not None:  # type: ignore
        # Submit the enum's value, matching what the update command sends.
        provided_params["vote_mode"] = vote_mode.value

    # Keep only the options that were actually passed on the command line.
    provided_params = {
        param: value
        for param, value in provided_params.items()
        if value is not None
    }

    client = context.com_client()
    subnets_info = get_map_subnets_params(client)
    if netuid not in subnets_info:
        # Report an unknown subnet cleanly instead of a KeyError traceback.
        context.error(f"Subnet with netuid '{netuid}' not found")
        raise typer.Exit(code=1)

    subnet_params = subnets_info[netuid]
    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
    # Intersection update: provided options that are burn settings are written
    # into the nested burn config. The shallow copy below shares that nested
    # dict, so the changes are carried into the final parameters.
    for param, value in provided_params.items():
        if param in subnet_burn_config:
            subnet_burn_config[param] = value

    # Work on a copy with the entries that are not submitted removed, and the
    # current vote mode flattened to the top level.
    subnet_params = dict(subnet_params)
    subnet_params.pop("emission")
    subnet_params.pop("governance_config")
    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore

    # Provided options override the subnet's current values.
    subnet_params.update(provided_params)
    # because bonds_ma and maximum_set_weights dont have a default value
    if subnet_params.get("bonds_ma", None) is None:
        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
            "MaximumSetWeightCallsPerEpoch"
        )

    resolved_key = context.load_key(key, None)
    with context.progress_status("Adding a proposal..."):
        client.add_subnet_proposal(
            resolved_key, subnet_params, cid, netuid=netuid
        )
    context.info("Proposal added.")

Adds a proposal to a specific subnet.

@subnet_app.command()
def submit_general_subnet_application(ctx: typer.models.Context, key: str, application_key: str, cid: str):
@subnet_app.command()
def submit_general_subnet_application(
    ctx: Context, key: str, application_key: str, cid: str
):
    """
    Submits a legitimate whitelist application to the general subnet, netuid 0.
    """

    context = make_custom_context(ctx)
    if not re.match(IPFS_REGEX, cid):
        context.error(f"CID provided is invalid: {cid}")
        # typer.Exit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        raise typer.Exit(code=1)

    client = context.com_client()

    resolved_key = context.load_key(key, None)
    resolved_application_key = resolve_key_ss58(application_key)

    # Prefix the validated CID with the IPFS URI scheme.
    ipfs_prefix = "ipfs://"
    cid = ipfs_prefix + cid

    with context.progress_status("Adding a application..."):
        client.add_dao_application(resolved_key, resolved_application_key, cid)

Submits a legitimate whitelist application to the general subnet, netuid 0.

@subnet_app.command()
def add_custom_proposal(ctx: typer.models.Context, key: str, cid: str, netuid: int):
@subnet_app.command()
def add_custom_proposal(
    ctx: Context,
    key: str,
    cid: str,
    netuid: int,
):
    """
    Adds a custom proposal to a specific subnet.
    """
    context = make_custom_context(ctx)

    if not re.match(IPFS_REGEX, cid):
        context.error(f"CID provided is invalid: {cid}")
        # typer.Exit instead of the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        raise typer.Exit(code=1)

    client = context.com_client()

    resolved_key = context.load_key(key, None)

    # Prefix the validated CID with the IPFS URI scheme.
    ipfs_prefix = "ipfs://"
    cid = ipfs_prefix + cid

    with context.progress_status("Adding a proposal..."):
        client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid)

Adds a custom proposal to a specific subnet.

@subnet_app.command()
def list_curator_applications(ctx: typer.models.Context):
@subnet_app.command()
def list_curator_applications(ctx: Context):
    """
    Lists all curator applications.
    """
    context = make_custom_context(ctx)
    client = context.com_client()

    with context.progress_status("Querying applications..."):
        curator_applications = client.query_map_curator_applications()

    # The query result is written to stdout as-is, without table formatting.
    print(curator_applications)

Lists all curator applications.