Edit on GitHub

communex.cli.module

  1import importlib.util
  2from typing import Any, Optional, cast
  3
  4import typer
  5import uvicorn
  6from typer import Context
  7
  8import communex.balance as c_balance
  9from communex._common import intersection_update
 10from communex.cli._common import (make_custom_context, print_module_info,
 11                                  print_table_from_plain_dict)
 12from communex.compat.key import try_classic_load_key
 13from communex.errors import ChainTransactionError
 14from communex.key import check_ss58_address
 15from communex.misc import get_map_modules
 16from communex.module._rate_limiters.limiters import (IpLimiterParams,
 17                                                     StakeLimiterParams)
 18from communex.module.server import ModuleServer
 19from communex.types import Ss58Address
 20from communex.util import is_ip_valid
 21
 22module_app = typer.Typer(no_args_is_help=True)
 23
 24
 25def list_to_ss58(str_list: list[str] | None) -> list[Ss58Address] | None:
 26    """Raises AssertionError if some input is not a valid Ss58Address."""
 27
 28    if str_list is None:
 29        return None
 30    new_list: list[Ss58Address] = []
 31    for item in str_list:
 32        new_item = check_ss58_address(item)
 33        new_list.append(new_item)
 34    return new_list
 35
 36
 37# TODO: refactor module register CLI
 38# - module address should be a single (arbitrary) parameter
 39# - key can be infered from name or vice-versa?
 40@module_app.command()
 41def register(
 42    ctx: Context,
 43    name: str,
 44    key: str,
 45    netuid: int,
 46    ip: Optional[str] = None,
 47    port: Optional[int] = None,
 48    metadata: Optional[str] = None,
 49):
 50    """
 51    Registers a module on a subnet.
 52    """
 53    context = make_custom_context(ctx)
 54    client = context.com_client()
 55    if metadata and len(metadata) > 59:
 56        raise ValueError("Metadata must be less than 60 characters")
 57
 58    burn = client.get_burn(netuid=netuid)
 59
 60    if netuid != 0:
 61        do_burn = context.confirm(
 62            f"{c_balance.from_nano(burn)} $COMAI will be permanently burned. Do you want to continue?"
 63        )
 64
 65        if not do_burn:
 66            context.info("Not registering")
 67            raise typer.Abort()
 68
 69    resolved_key = try_classic_load_key(key, context)
 70    with context.progress_status(f"Registering Module {name}..."):
 71        subnet_name = client.get_subnet_name(netuid)
 72        address = f"{ip}:{port}"
 73
 74        response = client.register_module(
 75            resolved_key,
 76            name=name,
 77            subnet=subnet_name,
 78            address=address,
 79            metadata=metadata,
 80        )
 81
 82        if response.is_success:
 83            context.info(f"Module {name} registered")
 84        else:
 85            raise ChainTransactionError(response.error_message)  # type: ignore
 86
 87
 88@module_app.command()
 89def deregister(ctx: Context, key: str, netuid: int):
 90    """
 91    Deregisters a module from a subnet.
 92    """
 93    context = make_custom_context(ctx)
 94    client = context.com_client()
 95
 96    resolved_key = try_classic_load_key(key, context)
 97    with context.progress_status(f"Deregistering your module on subnet {netuid}..."):
 98
 99        response = client.deregister_module(key=resolved_key, netuid=netuid)
100
101        if response.is_success:
102            context.info("Module deregistered")
103        else:
104            raise ChainTransactionError(response.error_message)  # type: ignore
105
106
107@module_app.command()
108def update(
109    ctx: Context,
110    key: str,
111    netuid: int,
112    name: Optional[str] = None,
113    ip: Optional[str] = None,
114    port: Optional[int] = None,
115    delegation_fee: Optional[int] = None,
116    metadata: Optional[str] = None,
117):
118    """
119    Update module with custom parameters.
120    """
121
122    context = make_custom_context(ctx)
123    client = context.com_client()
124    if metadata and len(metadata) > 59:
125        raise ValueError("Metadata must be less than 60 characters")
126    resolved_key = try_classic_load_key(key)
127
128    if ip and not is_ip_valid(ip):
129        raise ValueError("Invalid ip address")
130    modules = get_map_modules(client, netuid=netuid, include_balances=False)
131    modules_to_list = [value for _, value in modules.items()]
132
133    module = next(
134        (
135            item for item in modules_to_list if item["key"] == resolved_key.ss58_address
136        ),
137        None
138    )
139
140    if module is None:
141        raise ValueError(f"Module {name} not found")
142    module_params = {
143        "name": name,
144        "ip": ip,
145        "port": port,
146        "delegation_fee": delegation_fee,
147        "metadata": metadata,
148    }
149    to_update = {
150        key: value for key, value in module_params.items() if value is not None
151    }
152    current_ip, current_port = module["address"].split(":")
153    new_ip = to_update.get("ip", current_ip)
154    new_port = to_update.get("port", current_port)
155    address = f"{new_ip}:{new_port}"
156    to_update["address"] = address
157    updated_module = intersection_update(dict(module), to_update)
158    module.update(updated_module)  # type: ignore
159    with context.progress_status(
160        f"Updating Module on a subnet with netuid '{netuid}' ..."
161    ):
162        response = client.update_module(
163            key=resolved_key,
164            name=module["name"],
165            address=module["address"],
166            delegation_fee=module["delegation_fee"],
167            netuid=netuid,
168            metadata=module["metadata"],
169        )
170
171    if response.is_success:
172        context.info(f"Module {key} updated")
173    else:
174        raise ChainTransactionError(response.error_message)  # type: ignore
175
176
177@module_app.command()
178def serve(
179    ctx: typer.Context,
180    class_path: str,
181    key: str,
182    port: int = 8000,
183    ip: Optional[str] = None,
184    subnets_whitelist: Optional[list[int]] = [0],
185    whitelist: Optional[list[str]] = None,
186    blacklist: Optional[list[str]] = None,
187    ip_blacklist: Optional[list[str]] = None,
188    test_mode: Optional[bool] = False,
189    request_staleness: int = typer.Option(120),
190    use_ip_limiter: Optional[bool] = typer.Option(
191        False, help=("If this value is passed, the ip limiter will be used")
192    ),
193    token_refill_rate_base_multiplier: Optional[int] = typer.Option(
194        None, help=(
195            "Multiply the base limit per stake. Only used in stake limiter mode."
196        )
197    )
198):
199    """
200    Serves a module on `127.0.0.1` on port `port`. `class_path` should specify
201    the dotted path to the module class e.g. `module.submodule.ClassName`.
202    """
203    context = make_custom_context(ctx)
204    use_testnet = context.get_use_testnet()
205    path_parts = class_path.split(".")
206    match path_parts:
207        case [*module_parts, class_name]:
208            module_path = ".".join(module_parts)
209            if not module_path:
210                # This could do some kind of relative import somehow?
211                raise ValueError(
212                    f"Invalid class path: `{class_path}`, module name is missing"
213                )
214            if not class_name:
215                raise ValueError(
216                    f"Invalid class path: `{class_path}`, class name is missing"
217                )
218        case _:
219            # This is impossible
220            raise Exception(f"Invalid class path: `{class_path}`")
221
222    try:
223        module = importlib.import_module(module_path)
224    except ModuleNotFoundError:
225        context.error(f"Module `{module_path}` not found")
226        raise typer.Exit(code=1)
227
228    try:
229        class_obj = getattr(module, class_name)
230    except AttributeError:
231        context.error(f"Class `{class_name}` not found in module `{module}`")
232        raise typer.Exit(code=1)
233
234    keypair = try_classic_load_key(key, context)
235    if test_mode:
236        subnets_whitelist = None
237    token_refill_rate = token_refill_rate_base_multiplier or 1
238    limiter_params = IpLimiterParams() if use_ip_limiter else StakeLimiterParams(token_ratio=token_refill_rate)
239
240    if whitelist is None:
241        context.info(
242            "WARNING: No whitelist provided, will accept calls from any key"
243        )
244
245    try:
246        whitelist_ss58 = list_to_ss58(whitelist)
247    except AssertionError:
248        context.error(
249            "Invalid SS58 address passed to whitelist"
250        )
251        exit(1)
252    try:
253        blacklist_ss58 = list_to_ss58(blacklist)
254    except AssertionError:
255        context.error(
256            "Invalid SS58 address passed on blacklist"
257        )
258        exit(1)
259    cast(list[Ss58Address] | None, whitelist)
260
261    server = ModuleServer(
262        class_obj(), keypair,
263        whitelist=whitelist_ss58, blacklist=blacklist_ss58,
264        subnets_whitelist=subnets_whitelist,
265        max_request_staleness=request_staleness,
266        limiter=limiter_params,
267        ip_blacklist=ip_blacklist,
268        use_testnet=use_testnet
269    )
270    app = server.get_fastapi_app()
271    host = ip or "127.0.0.1"
272    uvicorn.run(app, host=host, port=port)  # type: ignore
273
274
275@module_app.command()
276def info(ctx: Context, name: str, balance: bool = False, netuid: int = 0):
277    """
278    Gets module info
279    """
280    context = make_custom_context(ctx)
281    client = context.com_client()
282
283    with context.progress_status(
284        f"Getting Module {name} on a subnet with netuid {netuid}…"
285    ):
286        modules = get_map_modules(client, netuid=netuid, include_balances=balance)
287        modules_to_list = [value for _, value in modules.items()]
288
289        module = next((item for item in modules_to_list if item["name"] == name), None)
290
291    if module is None:
292        raise ValueError("Module not found")
293
294    general_module = cast(dict[str, Any], module)
295    print_table_from_plain_dict(general_module, ["Params", "Values"], context.console)
296
297
298@module_app.command(name="list")
299def inventory(ctx: Context, balances: bool = False, netuid: int = 0):
300    """
301    Modules stats on the network.
302    """
303    context = make_custom_context(ctx)
304    client = context.com_client()
305
306    # with context.progress_status(
307    #     f"Getting Modules on a subnet with netuid {netuid}..."
308    # ):
309    modules = cast(dict[str, Any], get_map_modules(
310        client, netuid=netuid, include_balances=balances))
311
312    # Convert the values to a human readable format
313    modules_to_list = [value for _, value in modules.items()]
314
315    miners: list[Any] = []
316    validators: list[Any] = []
317    inactive: list[Any] = []
318
319    for module in modules_to_list:
320        if module["incentive"] == module["dividends"] == 0:
321            inactive.append(module)
322        elif module["incentive"] > module["dividends"]:
323            miners.append(module)
324        else:
325            validators.append(module)
326
327    print_module_info(client, miners, context.console, netuid, "miners")
328    print_module_info(client, validators, context.console, netuid, "validators")
329    print_module_info(client, inactive, context.console, netuid, "inactive")
module_app = <typer.main.Typer object>
def list_to_ss58(str_list: list[str] | None) -> list[communex.types.Ss58Address] | None:
26def list_to_ss58(str_list: list[str] | None) -> list[Ss58Address] | None:
27    """Raises AssertionError if some input is not a valid Ss58Address."""
28
29    if str_list is None:
30        return None
31    new_list: list[Ss58Address] = []
32    for item in str_list:
33        new_item = check_ss58_address(item)
34        new_list.append(new_item)
35    return new_list

Raises AssertionError if some input is not a valid Ss58Address.

@module_app.command()
def register( ctx: typer.models.Context, name: str, key: str, netuid: int, ip: Optional[str] = None, port: Optional[int] = None, metadata: Optional[str] = None):
41@module_app.command()
42def register(
43    ctx: Context,
44    name: str,
45    key: str,
46    netuid: int,
47    ip: Optional[str] = None,
48    port: Optional[int] = None,
49    metadata: Optional[str] = None,
50):
51    """
52    Registers a module on a subnet.
53    """
54    context = make_custom_context(ctx)
55    client = context.com_client()
56    if metadata and len(metadata) > 59:
57        raise ValueError("Metadata must be less than 60 characters")
58
59    burn = client.get_burn(netuid=netuid)
60
61    if netuid != 0:
62        do_burn = context.confirm(
63            f"{c_balance.from_nano(burn)} $COMAI will be permanently burned. Do you want to continue?"
64        )
65
66        if not do_burn:
67            context.info("Not registering")
68            raise typer.Abort()
69
70    resolved_key = try_classic_load_key(key, context)
71    with context.progress_status(f"Registering Module {name}..."):
72        subnet_name = client.get_subnet_name(netuid)
73        address = f"{ip}:{port}"
74
75        response = client.register_module(
76            resolved_key,
77            name=name,
78            subnet=subnet_name,
79            address=address,
80            metadata=metadata,
81        )
82
83        if response.is_success:
84            context.info(f"Module {name} registered")
85        else:
86            raise ChainTransactionError(response.error_message)  # type: ignore

Registers a module on a subnet.

@module_app.command()
def deregister(ctx: typer.models.Context, key: str, netuid: int):
 89@module_app.command()
 90def deregister(ctx: Context, key: str, netuid: int):
 91    """
 92    Deregisters a module from a subnet.
 93    """
 94    context = make_custom_context(ctx)
 95    client = context.com_client()
 96
 97    resolved_key = try_classic_load_key(key, context)
 98    with context.progress_status(f"Deregistering your module on subnet {netuid}..."):
 99
100        response = client.deregister_module(key=resolved_key, netuid=netuid)
101
102        if response.is_success:
103            context.info("Module deregistered")
104        else:
105            raise ChainTransactionError(response.error_message)  # type: ignore

Deregisters a module from a subnet.

@module_app.command()
def update( ctx: typer.models.Context, key: str, netuid: int, name: Optional[str] = None, ip: Optional[str] = None, port: Optional[int] = None, delegation_fee: Optional[int] = None, metadata: Optional[str] = None):
108@module_app.command()
109def update(
110    ctx: Context,
111    key: str,
112    netuid: int,
113    name: Optional[str] = None,
114    ip: Optional[str] = None,
115    port: Optional[int] = None,
116    delegation_fee: Optional[int] = None,
117    metadata: Optional[str] = None,
118):
119    """
120    Update module with custom parameters.
121    """
122
123    context = make_custom_context(ctx)
124    client = context.com_client()
125    if metadata and len(metadata) > 59:
126        raise ValueError("Metadata must be less than 60 characters")
127    resolved_key = try_classic_load_key(key)
128
129    if ip and not is_ip_valid(ip):
130        raise ValueError("Invalid ip address")
131    modules = get_map_modules(client, netuid=netuid, include_balances=False)
132    modules_to_list = [value for _, value in modules.items()]
133
134    module = next(
135        (
136            item for item in modules_to_list if item["key"] == resolved_key.ss58_address
137        ),
138        None
139    )
140
141    if module is None:
142        raise ValueError(f"Module {name} not found")
143    module_params = {
144        "name": name,
145        "ip": ip,
146        "port": port,
147        "delegation_fee": delegation_fee,
148        "metadata": metadata,
149    }
150    to_update = {
151        key: value for key, value in module_params.items() if value is not None
152    }
153    current_ip, current_port = module["address"].split(":")
154    new_ip = to_update.get("ip", current_ip)
155    new_port = to_update.get("port", current_port)
156    address = f"{new_ip}:{new_port}"
157    to_update["address"] = address
158    updated_module = intersection_update(dict(module), to_update)
159    module.update(updated_module)  # type: ignore
160    with context.progress_status(
161        f"Updating Module on a subnet with netuid '{netuid}' ..."
162    ):
163        response = client.update_module(
164            key=resolved_key,
165            name=module["name"],
166            address=module["address"],
167            delegation_fee=module["delegation_fee"],
168            netuid=netuid,
169            metadata=module["metadata"],
170        )
171
172    if response.is_success:
173        context.info(f"Module {key} updated")
174    else:
175        raise ChainTransactionError(response.error_message)  # type: ignore

Update module with custom parameters.

@module_app.command()
def serve( ctx: typer.models.Context, class_path: str, key: str, port: int = 8000, ip: Optional[str] = None, subnets_whitelist: Optional[list[int]] = [0], whitelist: Optional[list[str]] = None, blacklist: Optional[list[str]] = None, ip_blacklist: Optional[list[str]] = None, test_mode: Optional[bool] = False, request_staleness: int = <typer.models.OptionInfo object>, use_ip_limiter: Optional[bool] = <typer.models.OptionInfo object>, token_refill_rate_base_multiplier: Optional[int] = <typer.models.OptionInfo object>):
178@module_app.command()
179def serve(
180    ctx: typer.Context,
181    class_path: str,
182    key: str,
183    port: int = 8000,
184    ip: Optional[str] = None,
185    subnets_whitelist: Optional[list[int]] = [0],
186    whitelist: Optional[list[str]] = None,
187    blacklist: Optional[list[str]] = None,
188    ip_blacklist: Optional[list[str]] = None,
189    test_mode: Optional[bool] = False,
190    request_staleness: int = typer.Option(120),
191    use_ip_limiter: Optional[bool] = typer.Option(
192        False, help=("If this value is passed, the ip limiter will be used")
193    ),
194    token_refill_rate_base_multiplier: Optional[int] = typer.Option(
195        None, help=(
196            "Multiply the base limit per stake. Only used in stake limiter mode."
197        )
198    )
199):
200    """
201    Serves a module on `127.0.0.1` on port `port`. `class_path` should specify
202    the dotted path to the module class e.g. `module.submodule.ClassName`.
203    """
204    context = make_custom_context(ctx)
205    use_testnet = context.get_use_testnet()
206    path_parts = class_path.split(".")
207    match path_parts:
208        case [*module_parts, class_name]:
209            module_path = ".".join(module_parts)
210            if not module_path:
211                # This could do some kind of relative import somehow?
212                raise ValueError(
213                    f"Invalid class path: `{class_path}`, module name is missing"
214                )
215            if not class_name:
216                raise ValueError(
217                    f"Invalid class path: `{class_path}`, class name is missing"
218                )
219        case _:
220            # This is impossible
221            raise Exception(f"Invalid class path: `{class_path}`")
222
223    try:
224        module = importlib.import_module(module_path)
225    except ModuleNotFoundError:
226        context.error(f"Module `{module_path}` not found")
227        raise typer.Exit(code=1)
228
229    try:
230        class_obj = getattr(module, class_name)
231    except AttributeError:
232        context.error(f"Class `{class_name}` not found in module `{module}`")
233        raise typer.Exit(code=1)
234
235    keypair = try_classic_load_key(key, context)
236    if test_mode:
237        subnets_whitelist = None
238    token_refill_rate = token_refill_rate_base_multiplier or 1
239    limiter_params = IpLimiterParams() if use_ip_limiter else StakeLimiterParams(token_ratio=token_refill_rate)
240
241    if whitelist is None:
242        context.info(
243            "WARNING: No whitelist provided, will accept calls from any key"
244        )
245
246    try:
247        whitelist_ss58 = list_to_ss58(whitelist)
248    except AssertionError:
249        context.error(
250            "Invalid SS58 address passed to whitelist"
251        )
252        exit(1)
253    try:
254        blacklist_ss58 = list_to_ss58(blacklist)
255    except AssertionError:
256        context.error(
257            "Invalid SS58 address passed on blacklist"
258        )
259        exit(1)
260    cast(list[Ss58Address] | None, whitelist)
261
262    server = ModuleServer(
263        class_obj(), keypair,
264        whitelist=whitelist_ss58, blacklist=blacklist_ss58,
265        subnets_whitelist=subnets_whitelist,
266        max_request_staleness=request_staleness,
267        limiter=limiter_params,
268        ip_blacklist=ip_blacklist,
269        use_testnet=use_testnet
270    )
271    app = server.get_fastapi_app()
272    host = ip or "127.0.0.1"
273    uvicorn.run(app, host=host, port=port)  # type: ignore

Serves a module on 127.0.0.1 on port port. class_path should specify the dotted path to the module class e.g. module.submodule.ClassName.

@module_app.command()
def info( ctx: typer.models.Context, name: str, balance: bool = False, netuid: int = 0):
276@module_app.command()
277def info(ctx: Context, name: str, balance: bool = False, netuid: int = 0):
278    """
279    Gets module info
280    """
281    context = make_custom_context(ctx)
282    client = context.com_client()
283
284    with context.progress_status(
285        f"Getting Module {name} on a subnet with netuid {netuid}…"
286    ):
287        modules = get_map_modules(client, netuid=netuid, include_balances=balance)
288        modules_to_list = [value for _, value in modules.items()]
289
290        module = next((item for item in modules_to_list if item["name"] == name), None)
291
292    if module is None:
293        raise ValueError("Module not found")
294
295    general_module = cast(dict[str, Any], module)
296    print_table_from_plain_dict(general_module, ["Params", "Values"], context.console)

Gets module info

@module_app.command(name='list')
def inventory(ctx: typer.models.Context, balances: bool = False, netuid: int = 0):
299@module_app.command(name="list")
300def inventory(ctx: Context, balances: bool = False, netuid: int = 0):
301    """
302    Modules stats on the network.
303    """
304    context = make_custom_context(ctx)
305    client = context.com_client()
306
307    # with context.progress_status(
308    #     f"Getting Modules on a subnet with netuid {netuid}..."
309    # ):
310    modules = cast(dict[str, Any], get_map_modules(
311        client, netuid=netuid, include_balances=balances))
312
313    # Convert the values to a human readable format
314    modules_to_list = [value for _, value in modules.items()]
315
316    miners: list[Any] = []
317    validators: list[Any] = []
318    inactive: list[Any] = []
319
320    for module in modules_to_list:
321        if module["incentive"] == module["dividends"] == 0:
322            inactive.append(module)
323        elif module["incentive"] > module["dividends"]:
324            miners.append(module)
325        else:
326            validators.append(module)
327
328    print_module_info(client, miners, context.console, netuid, "miners")
329    print_module_info(client, validators, context.console, netuid, "validators")
330    print_module_info(client, inactive, context.console, netuid, "inactive")

Modules stats on the network.