communex.cli.module
1import importlib.util 2from typing import Any, Optional, cast 3 4import typer 5import uvicorn 6from typer import Context 7 8import communex.balance as c_balance 9from communex._common import intersection_update 10from communex.cli._common import (make_custom_context, print_module_info, 11 print_table_from_plain_dict) 12from communex.compat.key import try_classic_load_key 13from communex.errors import ChainTransactionError 14from communex.key import check_ss58_address 15from communex.misc import get_map_modules 16from communex.module._rate_limiters.limiters import (IpLimiterParams, 17 StakeLimiterParams) 18from communex.module.server import ModuleServer 19from communex.types import Ss58Address 20from communex.util import is_ip_valid 21 22module_app = typer.Typer(no_args_is_help=True) 23 24 25def list_to_ss58(str_list: list[str] | None) -> list[Ss58Address] | None: 26 """Raises AssertionError if some input is not a valid Ss58Address.""" 27 28 if str_list is None: 29 return None 30 new_list: list[Ss58Address] = [] 31 for item in str_list: 32 new_item = check_ss58_address(item) 33 new_list.append(new_item) 34 return new_list 35 36 37# TODO: refactor module register CLI 38# - module address should be a single (arbitrary) parameter 39# - key can be infered from name or vice-versa? 40@module_app.command() 41def register( 42 ctx: Context, 43 name: str, 44 key: str, 45 netuid: int, 46 ip: Optional[str] = None, 47 port: Optional[int] = None, 48 metadata: Optional[str] = None, 49): 50 """ 51 Registers a module on a subnet. 52 """ 53 context = make_custom_context(ctx) 54 client = context.com_client() 55 if metadata and len(metadata) > 59: 56 raise ValueError("Metadata must be less than 60 characters") 57 58 burn = client.get_burn(netuid=netuid) 59 60 if netuid != 0: 61 do_burn = context.confirm( 62 f"{c_balance.from_nano(burn)} $COMAI will be permanently burned. Do you want to continue?" 63 ) 64 65 if not do_burn: 66 context.info("Not registering") 67 raise typer.Abort() 68 69 resolved_key = try_classic_load_key(key, context) 70 with context.progress_status(f"Registering Module {name}..."): 71 subnet_name = client.get_subnet_name(netuid) 72 address = f"{ip}:{port}" 73 74 response = client.register_module( 75 resolved_key, 76 name=name, 77 subnet=subnet_name, 78 address=address, 79 metadata=metadata, 80 ) 81 82 if response.is_success: 83 context.info(f"Module {name} registered") 84 else: 85 raise ChainTransactionError(response.error_message) # type: ignore 86 87 88@module_app.command() 89def deregister(ctx: Context, key: str, netuid: int): 90 """ 91 Deregisters a module from a subnet. 92 """ 93 context = make_custom_context(ctx) 94 client = context.com_client() 95 96 resolved_key = try_classic_load_key(key, context) 97 with context.progress_status(f"Deregistering your module on subnet {netuid}..."): 98 99 response = client.deregister_module(key=resolved_key, netuid=netuid) 100 101 if response.is_success: 102 context.info("Module deregistered") 103 else: 104 raise ChainTransactionError(response.error_message) # type: ignore 105 106 107@module_app.command() 108def update( 109 ctx: Context, 110 key: str, 111 netuid: int, 112 name: Optional[str] = None, 113 ip: Optional[str] = None, 114 port: Optional[int] = None, 115 delegation_fee: Optional[int] = None, 116 metadata: Optional[str] = None, 117): 118 """ 119 Update module with custom parameters. 120 """ 121 122 context = make_custom_context(ctx) 123 client = context.com_client() 124 if metadata and len(metadata) > 59: 125 raise ValueError("Metadata must be less than 60 characters") 126 resolved_key = try_classic_load_key(key) 127 128 if ip and not is_ip_valid(ip): 129 raise ValueError("Invalid ip address") 130 modules = get_map_modules(client, netuid=netuid, include_balances=False) 131 modules_to_list = [value for _, value in modules.items()] 132 133 module = next( 134 ( 135 item for item in modules_to_list if item["key"] == resolved_key.ss58_address 136 ), 137 None 138 ) 139 140 if module is None: 141 raise ValueError(f"Module {name} not found") 142 module_params = { 143 "name": name, 144 "ip": ip, 145 "port": port, 146 "delegation_fee": delegation_fee, 147 "metadata": metadata, 148 } 149 to_update = { 150 key: value for key, value in module_params.items() if value is not None 151 } 152 current_ip, current_port = module["address"].split(":") 153 new_ip = to_update.get("ip", current_ip) 154 new_port = to_update.get("port", current_port) 155 address = f"{new_ip}:{new_port}" 156 to_update["address"] = address 157 updated_module = intersection_update(dict(module), to_update) 158 module.update(updated_module) # type: ignore 159 with context.progress_status( 160 f"Updating Module on a subnet with netuid '{netuid}' ..." 161 ): 162 response = client.update_module( 163 key=resolved_key, 164 name=module["name"], 165 address=module["address"], 166 delegation_fee=module["delegation_fee"], 167 netuid=netuid, 168 metadata=module["metadata"], 169 ) 170 171 if response.is_success: 172 context.info(f"Module {key} updated") 173 else: 174 raise ChainTransactionError(response.error_message) # type: ignore 175 176 177@module_app.command() 178def serve( 179 ctx: typer.Context, 180 class_path: str, 181 key: str, 182 port: int = 8000, 183 ip: Optional[str] = None, 184 subnets_whitelist: Optional[list[int]] = [0], 185 whitelist: Optional[list[str]] = None, 186 blacklist: Optional[list[str]] = None, 187 ip_blacklist: Optional[list[str]] = None, 188 test_mode: Optional[bool] = False, 189 request_staleness: int = typer.Option(120), 190 use_ip_limiter: Optional[bool] = typer.Option( 191 False, help=("If this value is passed, the ip limiter will be used") 192 ), 193 token_refill_rate_base_multiplier: Optional[int] = typer.Option( 194 None, help=( 195 "Multiply the base limit per stake. Only used in stake limiter mode." 196 ) 197 ) 198): 199 """ 200 Serves a module on `127.0.0.1` on port `port`. `class_path` should specify 201 the dotted path to the module class e.g. `module.submodule.ClassName`. 202 """ 203 context = make_custom_context(ctx) 204 use_testnet = context.get_use_testnet() 205 path_parts = class_path.split(".") 206 match path_parts: 207 case [*module_parts, class_name]: 208 module_path = ".".join(module_parts) 209 if not module_path: 210 # This could do some kind of relative import somehow? 211 raise ValueError( 212 f"Invalid class path: `{class_path}`, module name is missing" 213 ) 214 if not class_name: 215 raise ValueError( 216 f"Invalid class path: `{class_path}`, class name is missing" 217 ) 218 case _: 219 # This is impossible 220 raise Exception(f"Invalid class path: `{class_path}`") 221 222 try: 223 module = importlib.import_module(module_path) 224 except ModuleNotFoundError: 225 context.error(f"Module `{module_path}` not found") 226 raise typer.Exit(code=1) 227 228 try: 229 class_obj = getattr(module, class_name) 230 except AttributeError: 231 context.error(f"Class `{class_name}` not found in module `{module}`") 232 raise typer.Exit(code=1) 233 234 keypair = try_classic_load_key(key, context) 235 if test_mode: 236 subnets_whitelist = None 237 token_refill_rate = token_refill_rate_base_multiplier or 1 238 limiter_params = IpLimiterParams() if use_ip_limiter else StakeLimiterParams(token_ratio=token_refill_rate) 239 240 if whitelist is None: 241 context.info( 242 "WARNING: No whitelist provided, will accept calls from any key" 243 ) 244 245 try: 246 whitelist_ss58 = list_to_ss58(whitelist) 247 except AssertionError: 248 context.error( 249 "Invalid SS58 address passed to whitelist" 250 ) 251 exit(1) 252 try: 253 blacklist_ss58 = list_to_ss58(blacklist) 254 except AssertionError: 255 context.error( 256 "Invalid SS58 address passed on blacklist" 257 ) 258 exit(1) 259 cast(list[Ss58Address] | None, whitelist) 260 261 server = ModuleServer( 262 class_obj(), keypair, 263 whitelist=whitelist_ss58, blacklist=blacklist_ss58, 264 subnets_whitelist=subnets_whitelist, 265 max_request_staleness=request_staleness, 266 limiter=limiter_params, 267 ip_blacklist=ip_blacklist, 268 use_testnet=use_testnet 269 ) 270 app = server.get_fastapi_app() 271 host = ip or "127.0.0.1" 272 uvicorn.run(app, host=host, port=port) # type: ignore 273 274 275@module_app.command() 276def info(ctx: Context, name: str, balance: bool = False, netuid: int = 0): 277 """ 278 Gets module info 279 """ 280 context = make_custom_context(ctx) 281 client = context.com_client() 282 283 with context.progress_status( 284 f"Getting Module {name} on a subnet with netuid {netuid}…" 285 ): 286 modules = get_map_modules(client, netuid=netuid, include_balances=balance) 287 modules_to_list = [value for _, value in modules.items()] 288 289 module = next((item for item in modules_to_list if item["name"] == name), None) 290 291 if module is None: 292 raise ValueError("Module not found") 293 294 general_module = cast(dict[str, Any], module) 295 print_table_from_plain_dict(general_module, ["Params", "Values"], context.console) 296 297 298@module_app.command(name="list") 299def inventory(ctx: Context, balances: bool = False, netuid: int = 0): 300 """ 301 Modules stats on the network. 302 """ 303 context = make_custom_context(ctx) 304 client = context.com_client() 305 306 # with context.progress_status( 307 # f"Getting Modules on a subnet with netuid {netuid}..." 308 # ): 309 modules = cast(dict[str, Any], get_map_modules( 310 client, netuid=netuid, include_balances=balances)) 311 312 # Convert the values to a human readable format 313 modules_to_list = [value for _, value in modules.items()] 314 315 miners: list[Any] = [] 316 validators: list[Any] = [] 317 inactive: list[Any] = [] 318 319 for module in modules_to_list: 320 if module["incentive"] == module["dividends"] == 0: 321 inactive.append(module) 322 elif module["incentive"] > module["dividends"]: 323 miners.append(module) 324 else: 325 validators.append(module) 326 327 print_module_info(client, miners, context.console, netuid, "miners") 328 print_module_info(client, validators, context.console, netuid, "validators") 329 print_module_info(client, inactive, context.console, netuid, "inactive")
module_app =
<typer.main.Typer object>
26def list_to_ss58(str_list: list[str] | None) -> list[Ss58Address] | None: 27 """Raises AssertionError if some input is not a valid Ss58Address.""" 28 29 if str_list is None: 30 return None 31 new_list: list[Ss58Address] = [] 32 for item in str_list: 33 new_item = check_ss58_address(item) 34 new_list.append(new_item) 35 return new_list
Raises AssertionError if some input is not a valid Ss58Address.
@module_app.command()
def
register( ctx: typer.models.Context, name: str, key: str, netuid: int, ip: Optional[str] = None, port: Optional[int] = None, metadata: Optional[str] = None):
41@module_app.command() 42def register( 43 ctx: Context, 44 name: str, 45 key: str, 46 netuid: int, 47 ip: Optional[str] = None, 48 port: Optional[int] = None, 49 metadata: Optional[str] = None, 50): 51 """ 52 Registers a module on a subnet. 53 """ 54 context = make_custom_context(ctx) 55 client = context.com_client() 56 if metadata and len(metadata) > 59: 57 raise ValueError("Metadata must be less than 60 characters") 58 59 burn = client.get_burn(netuid=netuid) 60 61 if netuid != 0: 62 do_burn = context.confirm( 63 f"{c_balance.from_nano(burn)} $COMAI will be permanently burned. Do you want to continue?" 64 ) 65 66 if not do_burn: 67 context.info("Not registering") 68 raise typer.Abort() 69 70 resolved_key = try_classic_load_key(key, context) 71 with context.progress_status(f"Registering Module {name}..."): 72 subnet_name = client.get_subnet_name(netuid) 73 address = f"{ip}:{port}" 74 75 response = client.register_module( 76 resolved_key, 77 name=name, 78 subnet=subnet_name, 79 address=address, 80 metadata=metadata, 81 ) 82 83 if response.is_success: 84 context.info(f"Module {name} registered") 85 else: 86 raise ChainTransactionError(response.error_message) # type: ignore
Registers a module on a subnet.
@module_app.command()
def
deregister(ctx: typer.models.Context, key: str, netuid: int):
89@module_app.command() 90def deregister(ctx: Context, key: str, netuid: int): 91 """ 92 Deregisters a module from a subnet. 93 """ 94 context = make_custom_context(ctx) 95 client = context.com_client() 96 97 resolved_key = try_classic_load_key(key, context) 98 with context.progress_status(f"Deregistering your module on subnet {netuid}..."): 99 100 response = client.deregister_module(key=resolved_key, netuid=netuid) 101 102 if response.is_success: 103 context.info("Module deregistered") 104 else: 105 raise ChainTransactionError(response.error_message) # type: ignore
Deregisters a module from a subnet.
@module_app.command()
def
update( ctx: typer.models.Context, key: str, netuid: int, name: Optional[str] = None, ip: Optional[str] = None, port: Optional[int] = None, delegation_fee: Optional[int] = None, metadata: Optional[str] = None):
108@module_app.command() 109def update( 110 ctx: Context, 111 key: str, 112 netuid: int, 113 name: Optional[str] = None, 114 ip: Optional[str] = None, 115 port: Optional[int] = None, 116 delegation_fee: Optional[int] = None, 117 metadata: Optional[str] = None, 118): 119 """ 120 Update module with custom parameters. 121 """ 122 123 context = make_custom_context(ctx) 124 client = context.com_client() 125 if metadata and len(metadata) > 59: 126 raise ValueError("Metadata must be less than 60 characters") 127 resolved_key = try_classic_load_key(key) 128 129 if ip and not is_ip_valid(ip): 130 raise ValueError("Invalid ip address") 131 modules = get_map_modules(client, netuid=netuid, include_balances=False) 132 modules_to_list = [value for _, value in modules.items()] 133 134 module = next( 135 ( 136 item for item in modules_to_list if item["key"] == resolved_key.ss58_address 137 ), 138 None 139 ) 140 141 if module is None: 142 raise ValueError(f"Module {name} not found") 143 module_params = { 144 "name": name, 145 "ip": ip, 146 "port": port, 147 "delegation_fee": delegation_fee, 148 "metadata": metadata, 149 } 150 to_update = { 151 key: value for key, value in module_params.items() if value is not None 152 } 153 current_ip, current_port = module["address"].split(":") 154 new_ip = to_update.get("ip", current_ip) 155 new_port = to_update.get("port", current_port) 156 address = f"{new_ip}:{new_port}" 157 to_update["address"] = address 158 updated_module = intersection_update(dict(module), to_update) 159 module.update(updated_module) # type: ignore 160 with context.progress_status( 161 f"Updating Module on a subnet with netuid '{netuid}' ..." 162 ): 163 response = client.update_module( 164 key=resolved_key, 165 name=module["name"], 166 address=module["address"], 167 delegation_fee=module["delegation_fee"], 168 netuid=netuid, 169 metadata=module["metadata"], 170 ) 171 172 if response.is_success: 173 context.info(f"Module {key} updated") 174 else: 175 raise ChainTransactionError(response.error_message) # type: ignore
Update module with custom parameters.
@module_app.command()
def
serve( ctx: typer.models.Context, class_path: str, key: str, port: int = 8000, ip: Optional[str] = None, subnets_whitelist: Optional[list[int]] = [0], whitelist: Optional[list[str]] = None, blacklist: Optional[list[str]] = None, ip_blacklist: Optional[list[str]] = None, test_mode: Optional[bool] = False, request_staleness: int = <typer.models.OptionInfo object>, use_ip_limiter: Optional[bool] = <typer.models.OptionInfo object>, token_refill_rate_base_multiplier: Optional[int] = <typer.models.OptionInfo object>):
178@module_app.command() 179def serve( 180 ctx: typer.Context, 181 class_path: str, 182 key: str, 183 port: int = 8000, 184 ip: Optional[str] = None, 185 subnets_whitelist: Optional[list[int]] = [0], 186 whitelist: Optional[list[str]] = None, 187 blacklist: Optional[list[str]] = None, 188 ip_blacklist: Optional[list[str]] = None, 189 test_mode: Optional[bool] = False, 190 request_staleness: int = typer.Option(120), 191 use_ip_limiter: Optional[bool] = typer.Option( 192 False, help=("If this value is passed, the ip limiter will be used") 193 ), 194 token_refill_rate_base_multiplier: Optional[int] = typer.Option( 195 None, help=( 196 "Multiply the base limit per stake. Only used in stake limiter mode." 197 ) 198 ) 199): 200 """ 201 Serves a module on `127.0.0.1` on port `port`. `class_path` should specify 202 the dotted path to the module class e.g. `module.submodule.ClassName`. 203 """ 204 context = make_custom_context(ctx) 205 use_testnet = context.get_use_testnet() 206 path_parts = class_path.split(".") 207 match path_parts: 208 case [*module_parts, class_name]: 209 module_path = ".".join(module_parts) 210 if not module_path: 211 # This could do some kind of relative import somehow? 212 raise ValueError( 213 f"Invalid class path: `{class_path}`, module name is missing" 214 ) 215 if not class_name: 216 raise ValueError( 217 f"Invalid class path: `{class_path}`, class name is missing" 218 ) 219 case _: 220 # This is impossible 221 raise Exception(f"Invalid class path: `{class_path}`") 222 223 try: 224 module = importlib.import_module(module_path) 225 except ModuleNotFoundError: 226 context.error(f"Module `{module_path}` not found") 227 raise typer.Exit(code=1) 228 229 try: 230 class_obj = getattr(module, class_name) 231 except AttributeError: 232 context.error(f"Class `{class_name}` not found in module `{module}`") 233 raise typer.Exit(code=1) 234 235 keypair = try_classic_load_key(key, context) 236 if test_mode: 237 subnets_whitelist = None 238 token_refill_rate = token_refill_rate_base_multiplier or 1 239 limiter_params = IpLimiterParams() if use_ip_limiter else StakeLimiterParams(token_ratio=token_refill_rate) 240 241 if whitelist is None: 242 context.info( 243 "WARNING: No whitelist provided, will accept calls from any key" 244 ) 245 246 try: 247 whitelist_ss58 = list_to_ss58(whitelist) 248 except AssertionError: 249 context.error( 250 "Invalid SS58 address passed to whitelist" 251 ) 252 exit(1) 253 try: 254 blacklist_ss58 = list_to_ss58(blacklist) 255 except AssertionError: 256 context.error( 257 "Invalid SS58 address passed on blacklist" 258 ) 259 exit(1) 260 cast(list[Ss58Address] | None, whitelist) 261 262 server = ModuleServer( 263 class_obj(), keypair, 264 whitelist=whitelist_ss58, blacklist=blacklist_ss58, 265 subnets_whitelist=subnets_whitelist, 266 max_request_staleness=request_staleness, 267 limiter=limiter_params, 268 ip_blacklist=ip_blacklist, 269 use_testnet=use_testnet 270 ) 271 app = server.get_fastapi_app() 272 host = ip or "127.0.0.1" 273 uvicorn.run(app, host=host, port=port) # type: ignore
Serves a module on 127.0.0.1
on port port
. class_path
should specify
the dotted path to the module class e.g. module.submodule.ClassName
.
@module_app.command()
def
info( ctx: typer.models.Context, name: str, balance: bool = False, netuid: int = 0):
276@module_app.command() 277def info(ctx: Context, name: str, balance: bool = False, netuid: int = 0): 278 """ 279 Gets module info 280 """ 281 context = make_custom_context(ctx) 282 client = context.com_client() 283 284 with context.progress_status( 285 f"Getting Module {name} on a subnet with netuid {netuid}…" 286 ): 287 modules = get_map_modules(client, netuid=netuid, include_balances=balance) 288 modules_to_list = [value for _, value in modules.items()] 289 290 module = next((item for item in modules_to_list if item["name"] == name), None) 291 292 if module is None: 293 raise ValueError("Module not found") 294 295 general_module = cast(dict[str, Any], module) 296 print_table_from_plain_dict(general_module, ["Params", "Values"], context.console)
Gets module info
@module_app.command(name='list')
def
inventory(ctx: typer.models.Context, balances: bool = False, netuid: int = 0):
299@module_app.command(name="list") 300def inventory(ctx: Context, balances: bool = False, netuid: int = 0): 301 """ 302 Modules stats on the network. 303 """ 304 context = make_custom_context(ctx) 305 client = context.com_client() 306 307 # with context.progress_status( 308 # f"Getting Modules on a subnet with netuid {netuid}..." 309 # ): 310 modules = cast(dict[str, Any], get_map_modules( 311 client, netuid=netuid, include_balances=balances)) 312 313 # Convert the values to a human readable format 314 modules_to_list = [value for _, value in modules.items()] 315 316 miners: list[Any] = [] 317 validators: list[Any] = [] 318 inactive: list[Any] = [] 319 320 for module in modules_to_list: 321 if module["incentive"] == module["dividends"] == 0: 322 inactive.append(module) 323 elif module["incentive"] > module["dividends"]: 324 miners.append(module) 325 else: 326 validators.append(module) 327 328 print_module_info(client, miners, context.console, netuid, "miners") 329 print_module_info(client, validators, context.console, netuid, "validators") 330 print_module_info(client, inactive, context.console, netuid, "inactive")
Modules stats on the network.