Edit on GitHub

communex.cli.module

  1import importlib.util
  2from typing import Any, Optional, cast
  3
  4import typer
  5import uvicorn
  6from typer import Context
  7
  8import communex.balance as c_balance
  9from communex._common import intersection_update
 10from communex.cli._common import (
 11    make_custom_context,
 12    print_module_info,
 13    print_table_from_plain_dict,
 14)
 15from communex.errors import ChainTransactionError
 16from communex.key import check_ss58_address
 17from communex.misc import get_map_modules
 18from communex.module._rate_limiters.limiters import (
 19    IpLimiterParams,
 20    StakeLimiterParams,
 21)
 22from communex.module.server import ModuleServer
 23from communex.types import Ss58Address
 24from communex.util import is_ip_valid
 25
 26module_app = typer.Typer(no_args_is_help=True)
 27
 28
 29def list_to_ss58(str_list: list[str] | None) -> list[Ss58Address] | None:
 30    """Raises AssertionError if some input is not a valid Ss58Address."""
 31
 32    if str_list is None:
 33        return None
 34    new_list: list[Ss58Address] = []
 35    for item in str_list:
 36        new_item = check_ss58_address(item)
 37        new_list.append(new_item)
 38    return new_list
 39
 40
 41# TODO: refactor module register CLI
 42# - module address should be a single (arbitrary) parameter
 43# - key can be infered from name or vice-versa?
 44@module_app.command()
 45def register(
 46    ctx: Context,
 47    name: str,
 48    key: str,
 49    netuid: int,
 50    ip: Optional[str] = None,
 51    port: Optional[int] = None,
 52    metadata: Optional[str] = None,
 53):
 54    """
 55    Registers a module on a subnet.
 56    """
 57    context = make_custom_context(ctx)
 58    client = context.com_client()
 59    if metadata and len(metadata) > 59:
 60        raise ValueError("Metadata must be less than 60 characters")
 61
 62    burn = client.get_burn(netuid=netuid)
 63
 64    if netuid != 0:
 65        do_burn = context.confirm(
 66            f"{c_balance.from_nano(burn)} $COMAI will be permanently burned. Do you want to continue?"
 67        )
 68
 69        if not do_burn:
 70            context.info("Not registering")
 71            raise typer.Abort()
 72
 73    resolved_key = context.load_key(key, None)
 74
 75    with context.progress_status(f"Registering Module {name}..."):
 76        subnet_name = client.get_subnet_name(netuid)
 77        address = f"{ip}:{port}"
 78
 79        response = client.register_module(
 80            resolved_key,
 81            name=name,
 82            subnet=subnet_name,
 83            address=address,
 84            metadata=metadata,
 85        )
 86
 87        if response.is_success:
 88            context.info(f"Module {name} registered")
 89        else:
 90            raise ChainTransactionError(response.error_message)  # type: ignore
 91
 92
 93@module_app.command()
 94def deregister(ctx: Context, key: str, netuid: int):
 95    """
 96    Deregisters a module from a subnet.
 97    """
 98    context = make_custom_context(ctx)
 99    client = context.com_client()
100
101    resolved_key = context.load_key(key, None)
102
103    with context.progress_status(
104        f"Deregistering your module on subnet {netuid}..."
105    ):
106        response = client.deregister_module(key=resolved_key, netuid=netuid)
107
108        if response.is_success:
109            context.info("Module deregistered")
110        else:
111            raise ChainTransactionError(response.error_message)  # type: ignore
112
113
114@module_app.command()
115def update(
116    ctx: Context,
117    key: str,
118    netuid: int,
119    name: Optional[str] = None,
120    ip: Optional[str] = None,
121    port: Optional[int] = None,
122    stake_delegation_fee: Optional[int] = None,
123    validator_weight_fee: Optional[int] = None,
124    metadata: Optional[str] = None,
125):
126    """
127    Update module with custom parameters.
128    """
129
130    context = make_custom_context(ctx)
131    client = context.com_client()
132
133    if metadata and len(metadata) > 59:
134        raise ValueError("Metadata must be less than 60 characters")
135
136    resolved_key = context.load_key(key, None)
137
138    if ip and not is_ip_valid(ip):
139        raise ValueError("Invalid ip address")
140    modules = get_map_modules(client, netuid=netuid, include_balances=False)
141    modules_to_list = [value for _, value in modules.items()]
142
143    module = next(
144        (
145            item
146            for item in modules_to_list
147            if item["key"] == resolved_key.ss58_address
148        ),
149        None,
150    )
151
152    if module is None:
153        raise ValueError(f"Module {name} not found")
154    module_params = {
155        "name": name,
156        "ip": ip,
157        "port": port,
158        "delegation_fee": stake_delegation_fee,
159        "metadata": metadata,
160    }
161    to_update = {
162        key: value for key, value in module_params.items() if value is not None
163    }
164    current_address = module["address"]
165    if ":" in current_address:
166        current_ip, current_port = current_address.split(":")
167    else:
168        current_ip, current_port = current_address, None
169
170    new_ip = to_update.get("ip", current_ip)
171    new_port = to_update.get("port", current_port)
172
173    if new_port is not None:
174        address = f"{new_ip}:{new_port}"
175    else:
176        address = new_ip
177    to_update["address"] = address
178    updated_module = intersection_update(dict(module), to_update)
179    module.update(updated_module)  # type: ignore
180    with context.progress_status(
181        f"Updating Module on a subnet with netuid '{netuid}' ..."
182    ):
183        response = client.update_module(
184            key=resolved_key,
185            name=module["name"],
186            address=module["address"],
187            delegation_fee=module["stake_delegation_fee"],
188            weight_setting_delegation_fee=validator_weight_fee,
189            netuid=netuid,
190            metadata=module["metadata"],
191        )
192
193    if response.is_success:
194        context.info(f"Module {key} updated")
195    else:
196        raise ChainTransactionError(response.error_message)  # type: ignore
197
198
199@module_app.command()
200def serve(
201    ctx: typer.Context,
202    class_path: str,
203    key: str,
204    port: int = 8000,
205    ip: Optional[str] = None,
206    subnets_whitelist: Optional[list[int]] = [0],
207    whitelist: Optional[list[str]] = None,
208    blacklist: Optional[list[str]] = None,
209    ip_blacklist: Optional[list[str]] = None,
210    test_mode: Optional[bool] = False,
211    request_staleness: int = typer.Option(120),
212    use_ip_limiter: Optional[bool] = typer.Option(
213        False, help=("If this value is passed, the ip limiter will be used")
214    ),
215    token_refill_rate_base_multiplier: Optional[int] = typer.Option(
216        None,
217        help=(
218            "Multiply the base limit per stake. Only used in stake limiter mode."
219        ),
220    ),
221):
222    """
223    Serves a module on `127.0.0.1` on port `port`. `class_path` should specify
224    the dotted path to the module class e.g. `module.submodule.ClassName`.
225    """
226    context = make_custom_context(ctx)
227    use_testnet = context.get_use_testnet()
228    path_parts = class_path.split(".")
229    match path_parts:
230        case [*module_parts, class_name]:
231            module_path = ".".join(module_parts)
232            if not module_path:
233                # This could do some kind of relative import somehow?
234                raise ValueError(
235                    f"Invalid class path: `{class_path}`, module name is missing"
236                )
237            if not class_name:
238                raise ValueError(
239                    f"Invalid class path: `{class_path}`, class name is missing"
240                )
241        case _:
242            # This is impossible
243            raise Exception(f"Invalid class path: `{class_path}`")
244
245    try:
246        module = importlib.import_module(module_path)
247    except ModuleNotFoundError:
248        context.error(f"Module `{module_path}` not found")
249        raise typer.Exit(code=1)
250
251    try:
252        class_obj = getattr(module, class_name)
253    except AttributeError:
254        context.error(f"Class `{class_name}` not found in module `{module}`")
255        raise typer.Exit(code=1)
256
257    keypair = context.load_key(key, None)
258
259    if test_mode:
260        subnets_whitelist = None
261    token_refill_rate = token_refill_rate_base_multiplier or 1
262    limiter_params = (
263        IpLimiterParams()
264        if use_ip_limiter
265        else StakeLimiterParams(token_ratio=token_refill_rate)
266    )
267
268    if whitelist is None:
269        context.info(
270            "WARNING: No whitelist provided, will accept calls from any key"
271        )
272
273    try:
274        whitelist_ss58 = list_to_ss58(whitelist)
275    except AssertionError:
276        context.error("Invalid SS58 address passed to whitelist")
277        exit(1)
278    try:
279        blacklist_ss58 = list_to_ss58(blacklist)
280    except AssertionError:
281        context.error("Invalid SS58 address passed on blacklist")
282        exit(1)
283    cast(list[Ss58Address] | None, whitelist)
284
285    server = ModuleServer(
286        class_obj(),
287        keypair,
288        whitelist=whitelist_ss58,
289        blacklist=blacklist_ss58,
290        subnets_whitelist=subnets_whitelist,
291        max_request_staleness=request_staleness,
292        limiter=limiter_params,
293        ip_blacklist=ip_blacklist,
294        use_testnet=use_testnet,
295    )
296    app = server.get_fastapi_app()
297    host = ip or "127.0.0.1"
298    uvicorn.run(app, host=host, port=port)  # type: ignore
299
300
301@module_app.command()
302def info(ctx: Context, name: str, balance: bool = False, netuid: int = 0):
303    """
304    Gets module info
305    """
306    context = make_custom_context(ctx)
307    client = context.com_client()
308
309    with context.progress_status(
310        f"Getting Module {name} on a subnet with netuid {netuid}…"
311    ):
312        modules = get_map_modules(
313            client, netuid=netuid, include_balances=balance
314        )
315        modules_to_list = [value for _, value in modules.items()]
316
317        module = next(
318            (item for item in modules_to_list if item["name"] == name), None
319        )
320
321    if module is None:
322        raise ValueError("Module not found")
323
324    general_module = cast(dict[str, Any], module)
325    print_table_from_plain_dict(
326        general_module, ["Params", "Values"], context.console
327    )
328
329
330@module_app.command(name="list")
331def inventory(ctx: Context, balances: bool = False, netuid: int = 0):
332    """
333    Modules stats on the network.
334    """
335    context = make_custom_context(ctx)
336    client = context.com_client()
337
338    # with context.progress_status(
339    #     f"Getting Modules on a subnet with netuid {netuid}..."
340    # ):
341    modules = cast(
342        dict[str, Any],
343        get_map_modules(client, netuid=netuid, include_balances=balances),
344    )
345
346    # Convert the values to a human readable format
347    modules_to_list = [value for _, value in modules.items()]
348
349    miners: list[Any] = []
350    validators: list[Any] = []
351    inactive: list[Any] = []
352
353    for module in modules_to_list:
354        if module["incentive"] == module["dividends"] == 0:
355            inactive.append(module)
356        elif module["incentive"] > module["dividends"]:
357            miners.append(module)
358        else:
359            validators.append(module)
360
361    print_module_info(client, miners, context.console, netuid, "miners")
362    print_module_info(client, validators, context.console, netuid, "validators")
363    print_module_info(client, inactive, context.console, netuid, "inactive")
module_app = <typer.main.Typer object>
def list_to_ss58(str_list: list[str] | None) -> list[communex.types.Ss58Address] | None:
30def list_to_ss58(str_list: list[str] | None) -> list[Ss58Address] | None:
31    """Raises AssertionError if some input is not a valid Ss58Address."""
32
33    if str_list is None:
34        return None
35    new_list: list[Ss58Address] = []
36    for item in str_list:
37        new_item = check_ss58_address(item)
38        new_list.append(new_item)
39    return new_list

Raises AssertionError if some input is not a valid Ss58Address.

@module_app.command()
def register( ctx: typer.models.Context, name: str, key: str, netuid: int, ip: Optional[str] = None, port: Optional[int] = None, metadata: Optional[str] = None):
45@module_app.command()
46def register(
47    ctx: Context,
48    name: str,
49    key: str,
50    netuid: int,
51    ip: Optional[str] = None,
52    port: Optional[int] = None,
53    metadata: Optional[str] = None,
54):
55    """
56    Registers a module on a subnet.
57    """
58    context = make_custom_context(ctx)
59    client = context.com_client()
60    if metadata and len(metadata) > 59:
61        raise ValueError("Metadata must be less than 60 characters")
62
63    burn = client.get_burn(netuid=netuid)
64
65    if netuid != 0:
66        do_burn = context.confirm(
67            f"{c_balance.from_nano(burn)} $COMAI will be permanently burned. Do you want to continue?"
68        )
69
70        if not do_burn:
71            context.info("Not registering")
72            raise typer.Abort()
73
74    resolved_key = context.load_key(key, None)
75
76    with context.progress_status(f"Registering Module {name}..."):
77        subnet_name = client.get_subnet_name(netuid)
78        address = f"{ip}:{port}"
79
80        response = client.register_module(
81            resolved_key,
82            name=name,
83            subnet=subnet_name,
84            address=address,
85            metadata=metadata,
86        )
87
88        if response.is_success:
89            context.info(f"Module {name} registered")
90        else:
91            raise ChainTransactionError(response.error_message)  # type: ignore

Registers a module on a subnet.

@module_app.command()
def deregister(ctx: typer.models.Context, key: str, netuid: int):
 94@module_app.command()
 95def deregister(ctx: Context, key: str, netuid: int):
 96    """
 97    Deregisters a module from a subnet.
 98    """
 99    context = make_custom_context(ctx)
100    client = context.com_client()
101
102    resolved_key = context.load_key(key, None)
103
104    with context.progress_status(
105        f"Deregistering your module on subnet {netuid}..."
106    ):
107        response = client.deregister_module(key=resolved_key, netuid=netuid)
108
109        if response.is_success:
110            context.info("Module deregistered")
111        else:
112            raise ChainTransactionError(response.error_message)  # type: ignore

Deregisters a module from a subnet.

@module_app.command()
def update( ctx: typer.models.Context, key: str, netuid: int, name: Optional[str] = None, ip: Optional[str] = None, port: Optional[int] = None, stake_delegation_fee: Optional[int] = None, validator_weight_fee: Optional[int] = None, metadata: Optional[str] = None):
115@module_app.command()
116def update(
117    ctx: Context,
118    key: str,
119    netuid: int,
120    name: Optional[str] = None,
121    ip: Optional[str] = None,
122    port: Optional[int] = None,
123    stake_delegation_fee: Optional[int] = None,
124    validator_weight_fee: Optional[int] = None,
125    metadata: Optional[str] = None,
126):
127    """
128    Update module with custom parameters.
129    """
130
131    context = make_custom_context(ctx)
132    client = context.com_client()
133
134    if metadata and len(metadata) > 59:
135        raise ValueError("Metadata must be less than 60 characters")
136
137    resolved_key = context.load_key(key, None)
138
139    if ip and not is_ip_valid(ip):
140        raise ValueError("Invalid ip address")
141    modules = get_map_modules(client, netuid=netuid, include_balances=False)
142    modules_to_list = [value for _, value in modules.items()]
143
144    module = next(
145        (
146            item
147            for item in modules_to_list
148            if item["key"] == resolved_key.ss58_address
149        ),
150        None,
151    )
152
153    if module is None:
154        raise ValueError(f"Module {name} not found")
155    module_params = {
156        "name": name,
157        "ip": ip,
158        "port": port,
159        "delegation_fee": stake_delegation_fee,
160        "metadata": metadata,
161    }
162    to_update = {
163        key: value for key, value in module_params.items() if value is not None
164    }
165    current_address = module["address"]
166    if ":" in current_address:
167        current_ip, current_port = current_address.split(":")
168    else:
169        current_ip, current_port = current_address, None
170
171    new_ip = to_update.get("ip", current_ip)
172    new_port = to_update.get("port", current_port)
173
174    if new_port is not None:
175        address = f"{new_ip}:{new_port}"
176    else:
177        address = new_ip
178    to_update["address"] = address
179    updated_module = intersection_update(dict(module), to_update)
180    module.update(updated_module)  # type: ignore
181    with context.progress_status(
182        f"Updating Module on a subnet with netuid '{netuid}' ..."
183    ):
184        response = client.update_module(
185            key=resolved_key,
186            name=module["name"],
187            address=module["address"],
188            delegation_fee=module["stake_delegation_fee"],
189            weight_setting_delegation_fee=validator_weight_fee,
190            netuid=netuid,
191            metadata=module["metadata"],
192        )
193
194    if response.is_success:
195        context.info(f"Module {key} updated")
196    else:
197        raise ChainTransactionError(response.error_message)  # type: ignore

Update module with custom parameters.

@module_app.command()
def serve( ctx: typer.models.Context, class_path: str, key: str, port: int = 8000, ip: Optional[str] = None, subnets_whitelist: Optional[list[int]] = [0], whitelist: Optional[list[str]] = None, blacklist: Optional[list[str]] = None, ip_blacklist: Optional[list[str]] = None, test_mode: Optional[bool] = False, request_staleness: int = <typer.models.OptionInfo object>, use_ip_limiter: Optional[bool] = <typer.models.OptionInfo object>, token_refill_rate_base_multiplier: Optional[int] = <typer.models.OptionInfo object>):
200@module_app.command()
201def serve(
202    ctx: typer.Context,
203    class_path: str,
204    key: str,
205    port: int = 8000,
206    ip: Optional[str] = None,
207    subnets_whitelist: Optional[list[int]] = [0],
208    whitelist: Optional[list[str]] = None,
209    blacklist: Optional[list[str]] = None,
210    ip_blacklist: Optional[list[str]] = None,
211    test_mode: Optional[bool] = False,
212    request_staleness: int = typer.Option(120),
213    use_ip_limiter: Optional[bool] = typer.Option(
214        False, help=("If this value is passed, the ip limiter will be used")
215    ),
216    token_refill_rate_base_multiplier: Optional[int] = typer.Option(
217        None,
218        help=(
219            "Multiply the base limit per stake. Only used in stake limiter mode."
220        ),
221    ),
222):
223    """
224    Serves a module on `127.0.0.1` on port `port`. `class_path` should specify
225    the dotted path to the module class e.g. `module.submodule.ClassName`.
226    """
227    context = make_custom_context(ctx)
228    use_testnet = context.get_use_testnet()
229    path_parts = class_path.split(".")
230    match path_parts:
231        case [*module_parts, class_name]:
232            module_path = ".".join(module_parts)
233            if not module_path:
234                # This could do some kind of relative import somehow?
235                raise ValueError(
236                    f"Invalid class path: `{class_path}`, module name is missing"
237                )
238            if not class_name:
239                raise ValueError(
240                    f"Invalid class path: `{class_path}`, class name is missing"
241                )
242        case _:
243            # This is impossible
244            raise Exception(f"Invalid class path: `{class_path}`")
245
246    try:
247        module = importlib.import_module(module_path)
248    except ModuleNotFoundError:
249        context.error(f"Module `{module_path}` not found")
250        raise typer.Exit(code=1)
251
252    try:
253        class_obj = getattr(module, class_name)
254    except AttributeError:
255        context.error(f"Class `{class_name}` not found in module `{module}`")
256        raise typer.Exit(code=1)
257
258    keypair = context.load_key(key, None)
259
260    if test_mode:
261        subnets_whitelist = None
262    token_refill_rate = token_refill_rate_base_multiplier or 1
263    limiter_params = (
264        IpLimiterParams()
265        if use_ip_limiter
266        else StakeLimiterParams(token_ratio=token_refill_rate)
267    )
268
269    if whitelist is None:
270        context.info(
271            "WARNING: No whitelist provided, will accept calls from any key"
272        )
273
274    try:
275        whitelist_ss58 = list_to_ss58(whitelist)
276    except AssertionError:
277        context.error("Invalid SS58 address passed to whitelist")
278        exit(1)
279    try:
280        blacklist_ss58 = list_to_ss58(blacklist)
281    except AssertionError:
282        context.error("Invalid SS58 address passed on blacklist")
283        exit(1)
284    cast(list[Ss58Address] | None, whitelist)
285
286    server = ModuleServer(
287        class_obj(),
288        keypair,
289        whitelist=whitelist_ss58,
290        blacklist=blacklist_ss58,
291        subnets_whitelist=subnets_whitelist,
292        max_request_staleness=request_staleness,
293        limiter=limiter_params,
294        ip_blacklist=ip_blacklist,
295        use_testnet=use_testnet,
296    )
297    app = server.get_fastapi_app()
298    host = ip or "127.0.0.1"
299    uvicorn.run(app, host=host, port=port)  # type: ignore

Serves a module on 127.0.0.1 on port port. class_path should specify the dotted path to the module class e.g. module.submodule.ClassName.

@module_app.command()
def info( ctx: typer.models.Context, name: str, balance: bool = False, netuid: int = 0):
302@module_app.command()
303def info(ctx: Context, name: str, balance: bool = False, netuid: int = 0):
304    """
305    Gets module info
306    """
307    context = make_custom_context(ctx)
308    client = context.com_client()
309
310    with context.progress_status(
311        f"Getting Module {name} on a subnet with netuid {netuid}…"
312    ):
313        modules = get_map_modules(
314            client, netuid=netuid, include_balances=balance
315        )
316        modules_to_list = [value for _, value in modules.items()]
317
318        module = next(
319            (item for item in modules_to_list if item["name"] == name), None
320        )
321
322    if module is None:
323        raise ValueError("Module not found")
324
325    general_module = cast(dict[str, Any], module)
326    print_table_from_plain_dict(
327        general_module, ["Params", "Values"], context.console
328    )

Gets module info

@module_app.command(name='list')
def inventory(ctx: typer.models.Context, balances: bool = False, netuid: int = 0):
331@module_app.command(name="list")
332def inventory(ctx: Context, balances: bool = False, netuid: int = 0):
333    """
334    Modules stats on the network.
335    """
336    context = make_custom_context(ctx)
337    client = context.com_client()
338
339    # with context.progress_status(
340    #     f"Getting Modules on a subnet with netuid {netuid}..."
341    # ):
342    modules = cast(
343        dict[str, Any],
344        get_map_modules(client, netuid=netuid, include_balances=balances),
345    )
346
347    # Convert the values to a human readable format
348    modules_to_list = [value for _, value in modules.items()]
349
350    miners: list[Any] = []
351    validators: list[Any] = []
352    inactive: list[Any] = []
353
354    for module in modules_to_list:
355        if module["incentive"] == module["dividends"] == 0:
356            inactive.append(module)
357        elif module["incentive"] > module["dividends"]:
358            miners.append(module)
359        else:
360            validators.append(module)
361
362    print_module_info(client, miners, context.console, netuid, "miners")
363    print_module_info(client, validators, context.console, netuid, "validators")
364    print_module_info(client, inactive, context.console, netuid, "inactive")

Modules stats on the network.