Edit on GitHub

communex.cli.subnet

  1import re
  2from typing import Any, cast
  3
  4import typer
  5from typer import Context
  6
  7from communex.cli._common import (
  8    make_custom_context,
  9    print_table_from_plain_dict,
 10    print_table_standardize,
 11)
 12from communex.compat.key import resolve_key_ss58
 13from communex.errors import ChainTransactionError
 14from communex.misc import (
 15    IPFS_REGEX,
 16    get_map_displayable_subnets,
 17    get_map_subnets_params,
 18)
 19from communex.types import SubnetParams, VoteMode
 20
 21subnet_app = typer.Typer(no_args_is_help=True)
 22
 23
 24@subnet_app.command()
 25def list(ctx: Context):
 26    """
 27    Gets subnets.
 28    """
 29    context = make_custom_context(ctx)
 30    client = context.com_client()
 31
 32    with context.progress_status("Getting subnets ..."):
 33        subnets = get_map_displayable_subnets(client)
 34    subnets_with_netuids = [
 35        {"netuid": key, **value} for key, value in subnets.items()
 36    ]
 37    for dict in subnets_with_netuids:  # type: ignore
 38        print_table_from_plain_dict(dict, ["Params", "Values"], context.console)  # type: ignore
 39
 40
 41@subnet_app.command()
 42def distribution(ctx: Context):
 43    context = make_custom_context(ctx)
 44    client = context.com_client()
 45
 46    with context.progress_status("Getting emission distribution..."):
 47        subnets_emission = client.query_map_subnet_emission()
 48        subnet_consensus = client.query_map_subnet_consensus()
 49        subnet_names = client.query_map_subnet_names()
 50        total_emission = sum(subnets_emission.values())
 51        subnet_emission_percentages = {
 52            key: value / total_emission * 100
 53            for key, value in subnets_emission.items()
 54        }
 55
 56    # Prepare the data for the table
 57    table_data: dict[str, Any] = {
 58        "Subnet": [],
 59        "Name": [],
 60        "Consensus": [],
 61        "Emission %": [],
 62    }
 63
 64    for subnet, emission_percentage in subnet_emission_percentages.items():
 65        if emission_percentage > 0:
 66            table_data["Subnet"].append(str(subnet))
 67            table_data["Name"].append(subnet_names.get(subnet, "N/A"))
 68            table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A"))
 69            table_data["Emission %"].append(f"{round(emission_percentage, 2)}%")
 70
 71    print_table_standardize(table_data, context.console)
 72
 73
 74@subnet_app.command()
 75def legit_whitelist(ctx: Context):
 76    """
 77    Gets the legitimate whitelist of modules for the general subnet 0
 78    """
 79
 80    context = make_custom_context(ctx)
 81    client = context.com_client()
 82
 83    with context.progress_status("Getting legitimate whitelist ..."):
 84        whitelist = cast(dict[str, int], client.query_map_legit_whitelist())
 85
 86    print_table_from_plain_dict(
 87        whitelist, ["Module", "Recommended weight"], context.console
 88    )
 89
 90
 91@subnet_app.command()
 92def info(ctx: Context, netuid: int):
 93    """
 94    Gets subnet info.
 95    """
 96    context = make_custom_context(ctx)
 97    client = context.com_client()
 98
 99    with context.progress_status(f"Getting subnet with netuid '{netuid}'..."):
100        subnets = get_map_displayable_subnets(client)
101        subnet = subnets.get(netuid, None)
102
103    if subnet is None:
104        raise ValueError("Subnet not found")
105
106    general_subnet: dict[str, Any] = cast(dict[str, Any], subnet)
107    print_table_from_plain_dict(
108        general_subnet, ["Params", "Values"], context.console
109    )
110
111
112@subnet_app.command()
113def register(
114    ctx: Context, key: str, name: str, metadata: str = typer.Option(None)
115):
116    """
117    Registers a new subnet.
118    """
119    context = make_custom_context(ctx)
120    client = context.com_client()
121    resolved_key = context.load_key(key, None)
122
123    with context.progress_status("Registering subnet ..."):
124        response = client.register_subnet(resolved_key, name, metadata)
125
126    if response.is_success:
127        context.info(f"Successfully registered subnet {name}")
128    else:
129        raise ChainTransactionError(response.error_message)  # type: ignore
130
131
132@subnet_app.command()
133def update(
134    ctx: Context,
135    key: str,
136    netuid: int,
137    founder: str = typer.Option(None),
138    founder_share: int = typer.Option(None),
139    name: str = typer.Option(None),
140    metadata: str = typer.Option(None),
141    immunity_period: int = typer.Option(None),
142    incentive_ratio: int = typer.Option(None),
143    max_allowed_uids: int = typer.Option(None),
144    max_allowed_weights: int = typer.Option(None),
145    min_allowed_weights: int = typer.Option(None),
146    max_weight_age: float = typer.Option(None),
147    tempo: int = typer.Option(None),
148    trust_ratio: int = typer.Option(None),
149    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
150    # GovernanceConfiguration
151    vote_mode: VoteMode = typer.Option(None),
152    bonds_ma: int = typer.Option(None),
153    # BurnConfiguration
154    min_burn: int = typer.Option(None),
155    max_burn: int = typer.Option(None),
156    adjustment_alpha: int = typer.Option(None),
157    target_registrations_interval: int = typer.Option(None),
158    target_registrations_per_interval: int = typer.Option(None),
159    max_registrations_per_interval: int = typer.Option(None),
160    min_validator_stake: int = typer.Option(None),
161    max_allowed_validators: int = typer.Option(None),
162):
163    """
164    Updates a subnet.
165    """
166    provided_params = locals().copy()
167    provided_params.pop("ctx")
168    provided_params.pop("key")
169    provided_params.pop("netuid")
170
171    provided_params = {
172        key: value
173        for key, value in provided_params.items()
174        if value is not None
175    }
176    if vote_mode is not None:  # type: ignore
177        provided_params["vote_mode"] = vote_mode.value
178    context = make_custom_context(ctx)
179    client = context.com_client()
180    subnets_info = get_map_subnets_params(client)
181    subnet_params = subnets_info[netuid]
182    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
183    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
184    # intersection update
185    for param, value in provided_params.items():
186        if param in subnet_burn_config and value is not None:
187            subnet_burn_config[param] = value
188
189    subnet_params = dict(subnet_params)
190    subnet_params.pop("emission")
191    subnet_params.pop("governance_config")
192    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore
193
194    subnet_params = cast(SubnetParams, subnet_params)
195    provided_params = cast(SubnetParams, provided_params)
196    subnet_params.update(provided_params)
197    # because bonds_ma and maximum_set_weights dont have a default value
198    if subnet_params.get("bonds_ma", None) is None:
199        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
200    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
201        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
202            "MaximumSetWeightCallsPerEpoch"
203        )
204    resolved_key = context.load_key(key, None)
205    with context.progress_status("Updating subnet ..."):
206        response = client.update_subnet(
207            key=resolved_key, params=subnet_params, netuid=netuid
208        )
209
210    if response.is_success:
211        context.info(
212            f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}"
213        )
214    else:
215        raise ChainTransactionError(response.error_message)  # type: ignore
216
217
218@subnet_app.command()
219def propose_on_subnet(
220    ctx: Context,
221    key: str,
222    netuid: int,
223    cid: str,
224    founder: str = typer.Option(None),
225    founder_share: int = typer.Option(None),
226    metadata: str = typer.Option(None),
227    name: str = typer.Option(None),
228    immunity_period: int = typer.Option(None),
229    incentive_ratio: int = typer.Option(None),
230    max_allowed_uids: int = typer.Option(None),
231    max_allowed_weights: int = typer.Option(None),
232    min_allowed_weights: int = typer.Option(None),
233    max_weight_age: int = typer.Option(None),
234    tempo: int = typer.Option(None),
235    trust_ratio: int = typer.Option(None),
236    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
237    bonds_ma: int = typer.Option(None),
238    vote_mode: VoteMode = typer.Option(
239        None, help="0 for Authority, 1 for Vote"
240    ),
241    # BurnConfiguration
242    min_burn: int = typer.Option(None),
243    max_burn: int = typer.Option(None),
244    adjustment_alpha: int = typer.Option(None),
245    target_registrations_interval: int = typer.Option(None),
246    target_registrations_per_interval: int = typer.Option(None),
247    max_registrations_per_interval: int = typer.Option(None),
248    min_validator_stake: int = typer.Option(None),
249    max_allowed_validators: int = typer.Option(None),
250):
251    """
252    Adds a proposal to a specific subnet.
253    """
254    context = make_custom_context(ctx)
255    if not re.match(IPFS_REGEX, cid):
256        context.error(f"CID provided is invalid: {cid}")
257        exit(1)
258    else:
259        ipfs_prefix = "ipfs://"
260        cid = ipfs_prefix + cid
261
262    provided_params = locals().copy()
263    provided_params.pop("ctx")
264    provided_params.pop("context")
265    provided_params.pop("key")
266    provided_params.pop("ipfs_prefix")
267    if provided_params["founder"] is not None:
268        resolve_founder = resolve_key_ss58(founder)
269        provided_params["founder"] = resolve_founder
270
271    provided_params = {
272        key: value
273        for key, value in provided_params.items()
274        if value is not None
275    }
276
277    client = context.com_client()
278    subnets_info = get_map_subnets_params(client)
279    subnet_params = subnets_info[netuid]
280    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
281    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
282    # intersection update
283    for param, value in provided_params.items():
284        if param in subnet_burn_config and value is not None:
285            subnet_burn_config[param] = value
286    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore
287
288    subnet_params = dict(subnet_params)
289    subnet_params.pop("emission")
290    subnet_params.pop("governance_config")
291
292    subnet_params.update(provided_params)
293    # because bonds_ma and maximum_set_weights dont have a default value
294    if subnet_params.get("bonds_ma", None) is None:
295        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
296    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
297        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
298            "MaximumSetWeightCallsPerEpoch"
299        )
300
301    resolved_key = context.load_key(key, None)
302    with context.progress_status("Adding a proposal..."):
303        client.add_subnet_proposal(
304            resolved_key, subnet_params, cid, netuid=netuid
305        )
306    context.info("Proposal added.")
307
308
309@subnet_app.command()
310def submit_general_subnet_application(
311    ctx: Context, key: str, application_key: str, cid: str
312):
313    """
314    Submits a legitimate whitelist application to the general subnet, netuid 0.
315    """
316
317    context = make_custom_context(ctx)
318    if not re.match(IPFS_REGEX, cid):
319        context.error(f"CID provided is invalid: {cid}")
320        exit(1)
321
322    client = context.com_client()
323
324    resolved_key = context.load_key(key, None)
325    resolved_application_key = resolve_key_ss58(application_key)
326
327    # append the ipfs hash
328    ipfs_prefix = "ipfs://"
329    cid = ipfs_prefix + cid
330
331    with context.progress_status("Adding a application..."):
332        client.add_dao_application(resolved_key, resolved_application_key, cid)
333
334
335@subnet_app.command()
336def add_custom_proposal(
337    ctx: Context,
338    key: str,
339    cid: str,
340    netuid: int,
341):
342    """
343    Adds a custom proposal to a specific subnet.
344    """
345    context = make_custom_context(ctx)
346
347    if not re.match(IPFS_REGEX, cid):
348        context.error(f"CID provided is invalid: {cid}")
349        exit(1)
350
351    client = context.com_client()
352
353    resolved_key = context.load_key(key, None)
354
355    ipfs_prefix = "ipfs://"
356    cid = ipfs_prefix + cid
357
358    with context.progress_status("Adding a proposal..."):
359        client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid)
360
361
362@subnet_app.command()
363def list_curator_applications(ctx: Context):
364    """
365    Lists all curator applications.
366    """
367    context = make_custom_context(ctx)
368    client = context.com_client()
369
370    with context.progress_status("Querying applications..."):
371        apps = client.query_map_curator_applications()
372    print(apps)
subnet_app = <typer.main.Typer object>
@subnet_app.command()
def list(ctx: typer.models.Context):
25@subnet_app.command()
26def list(ctx: Context):
27    """
28    Gets subnets.
29    """
30    context = make_custom_context(ctx)
31    client = context.com_client()
32
33    with context.progress_status("Getting subnets ..."):
34        subnets = get_map_displayable_subnets(client)
35    subnets_with_netuids = [
36        {"netuid": key, **value} for key, value in subnets.items()
37    ]
38    for dict in subnets_with_netuids:  # type: ignore
39        print_table_from_plain_dict(dict, ["Params", "Values"], context.console)  # type: ignore

Gets subnets.

@subnet_app.command()
def distribution(ctx: typer.models.Context):
42@subnet_app.command()
43def distribution(ctx: Context):
44    context = make_custom_context(ctx)
45    client = context.com_client()
46
47    with context.progress_status("Getting emission distribution..."):
48        subnets_emission = client.query_map_subnet_emission()
49        subnet_consensus = client.query_map_subnet_consensus()
50        subnet_names = client.query_map_subnet_names()
51        total_emission = sum(subnets_emission.values())
52        subnet_emission_percentages = {
53            key: value / total_emission * 100
54            for key, value in subnets_emission.items()
55        }
56
57    # Prepare the data for the table
58    table_data: dict[str, Any] = {
59        "Subnet": [],
60        "Name": [],
61        "Consensus": [],
62        "Emission %": [],
63    }
64
65    for subnet, emission_percentage in subnet_emission_percentages.items():
66        if emission_percentage > 0:
67            table_data["Subnet"].append(str(subnet))
68            table_data["Name"].append(subnet_names.get(subnet, "N/A"))
69            table_data["Consensus"].append(subnet_consensus.get(subnet, "N/A"))
70            table_data["Emission %"].append(f"{round(emission_percentage, 2)}%")
71
72    print_table_standardize(table_data, context.console)
@subnet_app.command()
def legit_whitelist(ctx: typer.models.Context):
75@subnet_app.command()
76def legit_whitelist(ctx: Context):
77    """
78    Gets the legitimate whitelist of modules for the general subnet 0
79    """
80
81    context = make_custom_context(ctx)
82    client = context.com_client()
83
84    with context.progress_status("Getting legitimate whitelist ..."):
85        whitelist = cast(dict[str, int], client.query_map_legit_whitelist())
86
87    print_table_from_plain_dict(
88        whitelist, ["Module", "Recommended weight"], context.console
89    )

Gets the legitimate whitelist of modules for the general subnet 0

@subnet_app.command()
def info(ctx: typer.models.Context, netuid: int):
 92@subnet_app.command()
 93def info(ctx: Context, netuid: int):
 94    """
 95    Gets subnet info.
 96    """
 97    context = make_custom_context(ctx)
 98    client = context.com_client()
 99
100    with context.progress_status(f"Getting subnet with netuid '{netuid}'..."):
101        subnets = get_map_displayable_subnets(client)
102        subnet = subnets.get(netuid, None)
103
104    if subnet is None:
105        raise ValueError("Subnet not found")
106
107    general_subnet: dict[str, Any] = cast(dict[str, Any], subnet)
108    print_table_from_plain_dict(
109        general_subnet, ["Params", "Values"], context.console
110    )

Gets subnet info.

@subnet_app.command()
def register( ctx: typer.models.Context, key: str, name: str, metadata: str = <typer.models.OptionInfo object>):
113@subnet_app.command()
114def register(
115    ctx: Context, key: str, name: str, metadata: str = typer.Option(None)
116):
117    """
118    Registers a new subnet.
119    """
120    context = make_custom_context(ctx)
121    client = context.com_client()
122    resolved_key = context.load_key(key, None)
123
124    with context.progress_status("Registering subnet ..."):
125        response = client.register_subnet(resolved_key, name, metadata)
126
127    if response.is_success:
128        context.info(f"Successfully registered subnet {name}")
129    else:
130        raise ChainTransactionError(response.error_message)  # type: ignore

Registers a new subnet.

@subnet_app.command()
def update( ctx: typer.models.Context, key: str, netuid: int, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: float = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>):
133@subnet_app.command()
134def update(
135    ctx: Context,
136    key: str,
137    netuid: int,
138    founder: str = typer.Option(None),
139    founder_share: int = typer.Option(None),
140    name: str = typer.Option(None),
141    metadata: str = typer.Option(None),
142    immunity_period: int = typer.Option(None),
143    incentive_ratio: int = typer.Option(None),
144    max_allowed_uids: int = typer.Option(None),
145    max_allowed_weights: int = typer.Option(None),
146    min_allowed_weights: int = typer.Option(None),
147    max_weight_age: float = typer.Option(None),
148    tempo: int = typer.Option(None),
149    trust_ratio: int = typer.Option(None),
150    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
151    # GovernanceConfiguration
152    vote_mode: VoteMode = typer.Option(None),
153    bonds_ma: int = typer.Option(None),
154    # BurnConfiguration
155    min_burn: int = typer.Option(None),
156    max_burn: int = typer.Option(None),
157    adjustment_alpha: int = typer.Option(None),
158    target_registrations_interval: int = typer.Option(None),
159    target_registrations_per_interval: int = typer.Option(None),
160    max_registrations_per_interval: int = typer.Option(None),
161    min_validator_stake: int = typer.Option(None),
162    max_allowed_validators: int = typer.Option(None),
163):
164    """
165    Updates a subnet.
166    """
167    provided_params = locals().copy()
168    provided_params.pop("ctx")
169    provided_params.pop("key")
170    provided_params.pop("netuid")
171
172    provided_params = {
173        key: value
174        for key, value in provided_params.items()
175        if value is not None
176    }
177    if vote_mode is not None:  # type: ignore
178        provided_params["vote_mode"] = vote_mode.value
179    context = make_custom_context(ctx)
180    client = context.com_client()
181    subnets_info = get_map_subnets_params(client)
182    subnet_params = subnets_info[netuid]
183    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
184    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
185    # intersection update
186    for param, value in provided_params.items():
187        if param in subnet_burn_config and value is not None:
188            subnet_burn_config[param] = value
189
190    subnet_params = dict(subnet_params)
191    subnet_params.pop("emission")
192    subnet_params.pop("governance_config")
193    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore
194
195    subnet_params = cast(SubnetParams, subnet_params)
196    provided_params = cast(SubnetParams, provided_params)
197    subnet_params.update(provided_params)
198    # because bonds_ma and maximum_set_weights dont have a default value
199    if subnet_params.get("bonds_ma", None) is None:
200        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
201    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
202        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
203            "MaximumSetWeightCallsPerEpoch"
204        )
205    resolved_key = context.load_key(key, None)
206    with context.progress_status("Updating subnet ..."):
207        response = client.update_subnet(
208            key=resolved_key, params=subnet_params, netuid=netuid
209        )
210
211    if response.is_success:
212        context.info(
213            f"Successfully updated subnet {subnet_params['name']} with netuid {netuid}"
214        )
215    else:
216        raise ChainTransactionError(response.error_message)  # type: ignore

Updates a subnet.

@subnet_app.command()
def propose_on_subnet( ctx: typer.models.Context, key: str, netuid: int, cid: str, founder: str = <typer.models.OptionInfo object>, founder_share: int = <typer.models.OptionInfo object>, metadata: str = <typer.models.OptionInfo object>, name: str = <typer.models.OptionInfo object>, immunity_period: int = <typer.models.OptionInfo object>, incentive_ratio: int = <typer.models.OptionInfo object>, max_allowed_uids: int = <typer.models.OptionInfo object>, max_allowed_weights: int = <typer.models.OptionInfo object>, min_allowed_weights: int = <typer.models.OptionInfo object>, max_weight_age: int = <typer.models.OptionInfo object>, tempo: int = <typer.models.OptionInfo object>, trust_ratio: int = <typer.models.OptionInfo object>, maximum_set_weight_calls_per_epoch: int = <typer.models.OptionInfo object>, bonds_ma: int = <typer.models.OptionInfo object>, vote_mode: communex.types.VoteMode = <typer.models.OptionInfo object>, min_burn: int = <typer.models.OptionInfo object>, max_burn: int = <typer.models.OptionInfo object>, adjustment_alpha: int = <typer.models.OptionInfo object>, target_registrations_interval: int = <typer.models.OptionInfo object>, target_registrations_per_interval: int = <typer.models.OptionInfo object>, max_registrations_per_interval: int = <typer.models.OptionInfo object>, min_validator_stake: int = <typer.models.OptionInfo object>, max_allowed_validators: int = <typer.models.OptionInfo object>):
219@subnet_app.command()
220def propose_on_subnet(
221    ctx: Context,
222    key: str,
223    netuid: int,
224    cid: str,
225    founder: str = typer.Option(None),
226    founder_share: int = typer.Option(None),
227    metadata: str = typer.Option(None),
228    name: str = typer.Option(None),
229    immunity_period: int = typer.Option(None),
230    incentive_ratio: int = typer.Option(None),
231    max_allowed_uids: int = typer.Option(None),
232    max_allowed_weights: int = typer.Option(None),
233    min_allowed_weights: int = typer.Option(None),
234    max_weight_age: int = typer.Option(None),
235    tempo: int = typer.Option(None),
236    trust_ratio: int = typer.Option(None),
237    maximum_set_weight_calls_per_epoch: int = typer.Option(None),
238    bonds_ma: int = typer.Option(None),
239    vote_mode: VoteMode = typer.Option(
240        None, help="0 for Authority, 1 for Vote"
241    ),
242    # BurnConfiguration
243    min_burn: int = typer.Option(None),
244    max_burn: int = typer.Option(None),
245    adjustment_alpha: int = typer.Option(None),
246    target_registrations_interval: int = typer.Option(None),
247    target_registrations_per_interval: int = typer.Option(None),
248    max_registrations_per_interval: int = typer.Option(None),
249    min_validator_stake: int = typer.Option(None),
250    max_allowed_validators: int = typer.Option(None),
251):
252    """
253    Adds a proposal to a specific subnet.
254    """
255    context = make_custom_context(ctx)
256    if not re.match(IPFS_REGEX, cid):
257        context.error(f"CID provided is invalid: {cid}")
258        exit(1)
259    else:
260        ipfs_prefix = "ipfs://"
261        cid = ipfs_prefix + cid
262
263    provided_params = locals().copy()
264    provided_params.pop("ctx")
265    provided_params.pop("context")
266    provided_params.pop("key")
267    provided_params.pop("ipfs_prefix")
268    if provided_params["founder"] is not None:
269        resolve_founder = resolve_key_ss58(founder)
270        provided_params["founder"] = resolve_founder
271
272    provided_params = {
273        key: value
274        for key, value in provided_params.items()
275        if value is not None
276    }
277
278    client = context.com_client()
279    subnets_info = get_map_subnets_params(client)
280    subnet_params = subnets_info[netuid]
281    subnet_vote_mode = subnet_params["governance_config"]["vote_mode"]  # type: ignore
282    subnet_burn_config = subnet_params["module_burn_config"]  # type: ignore
283    # intersection update
284    for param, value in provided_params.items():
285        if param in subnet_burn_config and value is not None:
286            subnet_burn_config[param] = value
287    subnet_params["vote_mode"] = subnet_vote_mode  # type: ignore
288
289    subnet_params = dict(subnet_params)
290    subnet_params.pop("emission")
291    subnet_params.pop("governance_config")
292
293    subnet_params.update(provided_params)
294    # because bonds_ma and maximum_set_weights dont have a default value
295    if subnet_params.get("bonds_ma", None) is None:
296        subnet_params["bonds_ma"] = client.query("BondsMovingAverage")
297    if subnet_params.get("maximum_set_weight_calls_per_epoch", None) is None:
298        subnet_params["maximum_set_weight_calls_per_epoch"] = client.query(
299            "MaximumSetWeightCallsPerEpoch"
300        )
301
302    resolved_key = context.load_key(key, None)
303    with context.progress_status("Adding a proposal..."):
304        client.add_subnet_proposal(
305            resolved_key, subnet_params, cid, netuid=netuid
306        )
307    context.info("Proposal added.")

Adds a proposal to a specific subnet.

@subnet_app.command()
def submit_general_subnet_application(ctx: typer.models.Context, key: str, application_key: str, cid: str):
310@subnet_app.command()
311def submit_general_subnet_application(
312    ctx: Context, key: str, application_key: str, cid: str
313):
314    """
315    Submits a legitimate whitelist application to the general subnet, netuid 0.
316    """
317
318    context = make_custom_context(ctx)
319    if not re.match(IPFS_REGEX, cid):
320        context.error(f"CID provided is invalid: {cid}")
321        exit(1)
322
323    client = context.com_client()
324
325    resolved_key = context.load_key(key, None)
326    resolved_application_key = resolve_key_ss58(application_key)
327
328    # append the ipfs hash
329    ipfs_prefix = "ipfs://"
330    cid = ipfs_prefix + cid
331
332    with context.progress_status("Adding a application..."):
333        client.add_dao_application(resolved_key, resolved_application_key, cid)

Submits a legitimate whitelist application to the general subnet, netuid 0.

@subnet_app.command()
def add_custom_proposal(ctx: typer.models.Context, key: str, cid: str, netuid: int):
336@subnet_app.command()
337def add_custom_proposal(
338    ctx: Context,
339    key: str,
340    cid: str,
341    netuid: int,
342):
343    """
344    Adds a custom proposal to a specific subnet.
345    """
346    context = make_custom_context(ctx)
347
348    if not re.match(IPFS_REGEX, cid):
349        context.error(f"CID provided is invalid: {cid}")
350        exit(1)
351
352    client = context.com_client()
353
354    resolved_key = context.load_key(key, None)
355
356    ipfs_prefix = "ipfs://"
357    cid = ipfs_prefix + cid
358
359    with context.progress_status("Adding a proposal..."):
360        client.add_custom_subnet_proposal(resolved_key, cid, netuid=netuid)

Adds a custom proposal to a specific subnet.

@subnet_app.command()
def list_curator_applications(ctx: typer.models.Context):
363@subnet_app.command()
364def list_curator_applications(ctx: Context):
365    """
366    Lists all curator applications.
367    """
368    context = make_custom_context(ctx)
369    client = context.com_client()
370
371    with context.progress_status("Querying applications..."):
372        apps = client.query_map_curator_applications()
373    print(apps)

Lists all curator applications.