communex.cli.module
import importlib.util
from typing import Any, Optional, cast

import typer
import uvicorn
from typer import Context

import communex.balance as c_balance
from communex._common import intersection_update
from communex.cli._common import (
    make_custom_context,
    print_module_info,
    print_table_from_plain_dict,
)
from communex.errors import ChainTransactionError
from communex.key import check_ss58_address
from communex.misc import get_map_modules
from communex.module._rate_limiters.limiters import (
    IpLimiterParams,
    StakeLimiterParams,
)
from communex.module.server import ModuleServer
from communex.types import Ss58Address
from communex.util import is_ip_valid

# Typer sub-application that groups every `module` CLI command defined below.
module_app = typer.Typer(no_args_is_help=True)


def list_to_ss58(str_list: list[str] | None) -> list[Ss58Address] | None:
    """
    Validate a list of address strings with `check_ss58_address`.

    `None` is passed through unchanged; otherwise a new list holding the
    result of `check_ss58_address` for every entry, in order, is returned.

    Raises AssertionError if some input is not a valid Ss58Address.
    """
    if str_list is None:
        return None
    return [check_ss58_address(candidate) for candidate in str_list]


# TODO: refactor module register CLI
# - module address should be a single (arbitrary) parameter
# - key can be inferred from name or vice-versa?
@module_app.command()
def register(
    ctx: Context,
    name: str,
    key: str,
    netuid: int,
    ip: Optional[str] = None,
    port: Optional[int] = None,
    metadata: Optional[str] = None,
):
    """
    Registers a module on a subnet.

    For any subnet other than 0 the current burn amount is shown and the
    user must confirm before the registration is submitted.
    """
    context = make_custom_context(ctx)
    client = context.com_client()
    if metadata and len(metadata) > 59:
        raise ValueError("Metadata must be less than 60 characters")

    if netuid != 0:
        # The burn value is only needed for the confirmation prompt, so it is
        # queried only when that prompt is actually shown.
        burn = client.get_burn(netuid=netuid)
        do_burn = context.confirm(
            f"{c_balance.from_nano(burn)} $COMAI will be permanently burned. Do you want to continue?"
        )

        if not do_burn:
            context.info("Not registering")
            raise typer.Abort()

    resolved_key = context.load_key(key, None)

    with context.progress_status(f"Registering Module {name}..."):
        subnet_name = client.get_subnet_name(netuid)
        # NOTE(review): when `ip` and/or `port` are omitted this produces the
        # literal text "None" in the address (e.g. "None:None"). Kept as-is
        # because existing callers may depend on it -- confirm whether that
        # is intended.
        address = f"{ip}:{port}"

        response = client.register_module(
            resolved_key,
            name=name,
            subnet=subnet_name,
            address=address,
            metadata=metadata,
        )

        if response.is_success:
            context.info(f"Module {name} registered")
        else:
            raise ChainTransactionError(response.error_message)  # type: ignore


@module_app.command()
def deregister(ctx: Context, key: str, netuid: int):
    """
    Deregisters a module from a subnet.

    Raises ChainTransactionError when the chain reports a failure.
    """
    context = make_custom_context(ctx)
    client = context.com_client()
    signer = context.load_key(key, None)

    status_message = f"Deregistering your module on subnet {netuid}..."
    with context.progress_status(status_message):
        response = client.deregister_module(key=signer, netuid=netuid)

        if not response.is_success:
            raise ChainTransactionError(response.error_message)  # type: ignore
        context.info("Module deregistered")


@module_app.command()
def update(
    ctx: Context,
    key: str,
    netuid: int,
    name: Optional[str] = None,
    ip: Optional[str] = None,
    port: Optional[int] = None,
    stake_delegation_fee: Optional[int] = None,
    validator_weight_fee: Optional[int] = None,
    metadata: Optional[str] = None,
):
    """
    Update module with custom parameters.

    The module registered under `key` on subnet `netuid` is looked up and
    every option that was not supplied keeps its current on-chain value.
    """

    context = make_custom_context(ctx)
    client = context.com_client()

    if metadata and len(metadata) > 59:
        raise ValueError("Metadata must be less than 60 characters")

    resolved_key = context.load_key(key, None)

    if ip and not is_ip_valid(ip):
        raise ValueError("Invalid ip address")

    modules = get_map_modules(client, netuid=netuid, include_balances=False)
    module = next(
        (
            item
            for item in modules.values()
            if item["key"] == resolved_key.ss58_address
        ),
        None,
    )

    if module is None:
        # The lookup is by key, so report the key (`name` may be None here).
        raise ValueError(
            f"Module with key {resolved_key.ss58_address} not found on subnet {netuid}"
        )

    # Split on the LAST colon only, so an address containing several colons
    # does not blow up the two-value unpacking.
    current_address = module["address"]
    current_ip, separator, current_port = current_address.rpartition(":")
    if not separator:
        current_ip, current_port = current_address, None

    new_ip = current_ip if ip is None else ip
    new_port = current_port if port is None else port
    address = new_ip if new_port is None else f"{new_ip}:{new_port}"

    # Explicit overrides: a supplied option wins, otherwise the current
    # on-chain value is re-submitted. This is what makes
    # `stake_delegation_fee` actually reach `update_module`.
    new_name = module["name"] if name is None else name
    new_metadata = module["metadata"] if metadata is None else metadata
    new_delegation_fee = (
        module["stake_delegation_fee"]
        if stake_delegation_fee is None
        else stake_delegation_fee
    )

    with context.progress_status(
        f"Updating Module on a subnet with netuid '{netuid}' ..."
    ):
        response = client.update_module(
            key=resolved_key,
            name=new_name,
            address=address,
            delegation_fee=new_delegation_fee,
            weight_setting_delegation_fee=validator_weight_fee,
            netuid=netuid,
            metadata=new_metadata,
        )

        if response.is_success:
            context.info(f"Module {key} updated")
        else:
            raise ChainTransactionError(response.error_message)  # type: ignore


@module_app.command()
def serve(
    ctx: typer.Context,
    class_path: str,
    key: str,
    port: int = 8000,
    ip: Optional[str] = None,
    subnets_whitelist: Optional[list[int]] = [0],
    whitelist: Optional[list[str]] = None,
    blacklist: Optional[list[str]] = None,
    ip_blacklist: Optional[list[str]] = None,
    test_mode: Optional[bool] = False,
    request_staleness: int = typer.Option(120),
    use_ip_limiter: Optional[bool] = typer.Option(
        False, help=("If this value is passed, the ip limiter will be used")
    ),
    token_refill_rate_base_multiplier: Optional[int] = typer.Option(
        None,
        help=(
            "Multiply the base limit per stake. Only used in stake limiter mode."
        ),
    ),
):
    """
    Serves a module on `ip` (default `127.0.0.1`) on port `port`. `class_path`
    should specify the dotted path to the module class e.g.
    `module.submodule.ClassName`.
    """
    context = make_custom_context(ctx)
    use_testnet = context.get_use_testnet()

    # Everything before the last dot is the importable module, the rest is
    # the attribute to fetch from it.
    module_path, _, class_name = class_path.rpartition(".")
    if not module_path:
        # This could do some kind of relative import somehow?
        raise ValueError(
            f"Invalid class path: `{class_path}`, module name is missing"
        )
    if not class_name:
        raise ValueError(
            f"Invalid class path: `{class_path}`, class name is missing"
        )

    try:
        module = importlib.import_module(module_path)
    except ModuleNotFoundError:
        context.error(f"Module `{module_path}` not found")
        raise typer.Exit(code=1)

    try:
        class_obj = getattr(module, class_name)
    except AttributeError:
        context.error(
            f"Class `{class_name}` not found in module `{module_path}`"
        )
        raise typer.Exit(code=1)

    keypair = context.load_key(key, None)

    if test_mode:
        # Test mode drops the subnet restriction entirely.
        subnets_whitelist = None
    token_refill_rate = token_refill_rate_base_multiplier or 1
    limiter_params = (
        IpLimiterParams()
        if use_ip_limiter
        else StakeLimiterParams(token_ratio=token_refill_rate)
    )

    if whitelist is None:
        context.info(
            "WARNING: No whitelist provided, will accept calls from any key"
        )

    # Invalid addresses exit the same way as the import failures above.
    try:
        whitelist_ss58 = list_to_ss58(whitelist)
    except AssertionError:
        context.error("Invalid SS58 address passed to whitelist")
        raise typer.Exit(code=1)
    try:
        blacklist_ss58 = list_to_ss58(blacklist)
    except AssertionError:
        context.error("Invalid SS58 address passed on blacklist")
        raise typer.Exit(code=1)

    server = ModuleServer(
        class_obj(),
        keypair,
        whitelist=whitelist_ss58,
        blacklist=blacklist_ss58,
        subnets_whitelist=subnets_whitelist,
        max_request_staleness=request_staleness,
        limiter=limiter_params,
        ip_blacklist=ip_blacklist,
        use_testnet=use_testnet,
    )
    app = server.get_fastapi_app()
    host = ip or "127.0.0.1"
    uvicorn.run(app, host=host, port=port)  # type: ignore


@module_app.command()
def info(ctx: Context, name: str, balance: bool = False, netuid: int = 0):
    """
    Gets module info

    Prints the first module on subnet `netuid` whose name equals `name`.
    """
    context = make_custom_context(ctx)
    client = context.com_client()

    status_message = f"Getting Module {name} on a subnet with netuid {netuid}…"
    with context.progress_status(status_message):
        modules = get_map_modules(
            client, netuid=netuid, include_balances=balance
        )

        found = None
        for candidate in modules.values():
            if candidate["name"] == name:
                found = candidate
                break

    if found is None:
        raise ValueError("Module not found")

    print_table_from_plain_dict(
        cast(dict[str, Any], found), ["Params", "Values"], context.console
    )


@module_app.command(name="list")
def inventory(ctx: Context, balances: bool = False, netuid: int = 0):
    """
    Modules stats on the network.

    Modules are printed in three groups: miners, validators and inactive.
    """
    context = make_custom_context(ctx)
    client = context.com_client()

    modules = cast(
        dict[str, Any],
        get_map_modules(client, netuid=netuid, include_balances=balances),
    )

    # Insertion order of this dict is also the print order.
    groups: dict[str, list[Any]] = {
        "miners": [],
        "validators": [],
        "inactive": [],
    }

    for entry in modules.values():
        incentive = entry["incentive"]
        dividends = entry["dividends"]
        if incentive == dividends == 0:
            label = "inactive"
        elif incentive > dividends:
            label = "miners"
        else:
            label = "validators"
        groups[label].append(entry)

    for label, members in groups.items():
        print_module_info(client, members, context.console, netuid, label)
module_app =
<typer.main.Typer object>
def list_to_ss58(str_list: list[str] | None) -> list[Ss58Address] | None:
    """
    Validate a list of address strings with `check_ss58_address`.

    `None` is passed through unchanged; otherwise a new list holding the
    result of `check_ss58_address` for every entry, in order, is returned.

    Raises AssertionError if some input is not a valid Ss58Address.
    """
    if str_list is None:
        return None
    return [check_ss58_address(candidate) for candidate in str_list]
Raises AssertionError if some input is not a valid Ss58Address.
@module_app.command()
def
register( ctx: typer.models.Context, name: str, key: str, netuid: int, ip: Optional[str] = None, port: Optional[int] = None, metadata: Optional[str] = None):
@module_app.command()
def register(
    ctx: Context,
    name: str,
    key: str,
    netuid: int,
    ip: Optional[str] = None,
    port: Optional[int] = None,
    metadata: Optional[str] = None,
):
    """
    Registers a module on a subnet.

    For any subnet other than 0 the current burn amount is shown and the
    user must confirm before the registration is submitted.
    """
    context = make_custom_context(ctx)
    client = context.com_client()
    if metadata and len(metadata) > 59:
        raise ValueError("Metadata must be less than 60 characters")

    if netuid != 0:
        # The burn value is only needed for the confirmation prompt, so it is
        # queried only when that prompt is actually shown.
        burn = client.get_burn(netuid=netuid)
        do_burn = context.confirm(
            f"{c_balance.from_nano(burn)} $COMAI will be permanently burned. Do you want to continue?"
        )

        if not do_burn:
            context.info("Not registering")
            raise typer.Abort()

    resolved_key = context.load_key(key, None)

    with context.progress_status(f"Registering Module {name}..."):
        subnet_name = client.get_subnet_name(netuid)
        # NOTE(review): when `ip` and/or `port` are omitted this produces the
        # literal text "None" in the address (e.g. "None:None"). Kept as-is
        # because existing callers may depend on it -- confirm whether that
        # is intended.
        address = f"{ip}:{port}"

        response = client.register_module(
            resolved_key,
            name=name,
            subnet=subnet_name,
            address=address,
            metadata=metadata,
        )

        if response.is_success:
            context.info(f"Module {name} registered")
        else:
            raise ChainTransactionError(response.error_message)  # type: ignore
Registers a module on a subnet.
@module_app.command()
def
deregister(ctx: typer.models.Context, key: str, netuid: int):
@module_app.command()
def deregister(ctx: Context, key: str, netuid: int):
    """
    Deregisters a module from a subnet.

    Raises ChainTransactionError when the chain reports a failure.
    """
    context = make_custom_context(ctx)
    client = context.com_client()
    signer = context.load_key(key, None)

    status_message = f"Deregistering your module on subnet {netuid}..."
    with context.progress_status(status_message):
        response = client.deregister_module(key=signer, netuid=netuid)

        if not response.is_success:
            raise ChainTransactionError(response.error_message)  # type: ignore
        context.info("Module deregistered")
Deregisters a module from a subnet.
@module_app.command()
def
update( ctx: typer.models.Context, key: str, netuid: int, name: Optional[str] = None, ip: Optional[str] = None, port: Optional[int] = None, stake_delegation_fee: Optional[int] = None, validator_weight_fee: Optional[int] = None, metadata: Optional[str] = None):
@module_app.command()
def update(
    ctx: Context,
    key: str,
    netuid: int,
    name: Optional[str] = None,
    ip: Optional[str] = None,
    port: Optional[int] = None,
    stake_delegation_fee: Optional[int] = None,
    validator_weight_fee: Optional[int] = None,
    metadata: Optional[str] = None,
):
    """
    Update module with custom parameters.

    The module registered under `key` on subnet `netuid` is looked up and
    every option that was not supplied keeps its current on-chain value.
    """

    context = make_custom_context(ctx)
    client = context.com_client()

    if metadata and len(metadata) > 59:
        raise ValueError("Metadata must be less than 60 characters")

    resolved_key = context.load_key(key, None)

    if ip and not is_ip_valid(ip):
        raise ValueError("Invalid ip address")

    modules = get_map_modules(client, netuid=netuid, include_balances=False)
    module = next(
        (
            item
            for item in modules.values()
            if item["key"] == resolved_key.ss58_address
        ),
        None,
    )

    if module is None:
        # The lookup is by key, so report the key (`name` may be None here).
        raise ValueError(
            f"Module with key {resolved_key.ss58_address} not found on subnet {netuid}"
        )

    # Split on the LAST colon only, so an address containing several colons
    # does not blow up the two-value unpacking.
    current_address = module["address"]
    current_ip, separator, current_port = current_address.rpartition(":")
    if not separator:
        current_ip, current_port = current_address, None

    new_ip = current_ip if ip is None else ip
    new_port = current_port if port is None else port
    address = new_ip if new_port is None else f"{new_ip}:{new_port}"

    # Explicit overrides: a supplied option wins, otherwise the current
    # on-chain value is re-submitted. Previously the fee was stored under
    # "delegation_fee" while the call below read "stake_delegation_fee", so
    # a user-supplied `stake_delegation_fee` never reached `update_module`.
    new_name = module["name"] if name is None else name
    new_metadata = module["metadata"] if metadata is None else metadata
    new_delegation_fee = (
        module["stake_delegation_fee"]
        if stake_delegation_fee is None
        else stake_delegation_fee
    )

    with context.progress_status(
        f"Updating Module on a subnet with netuid '{netuid}' ..."
    ):
        response = client.update_module(
            key=resolved_key,
            name=new_name,
            address=address,
            delegation_fee=new_delegation_fee,
            weight_setting_delegation_fee=validator_weight_fee,
            netuid=netuid,
            metadata=new_metadata,
        )

        if response.is_success:
            context.info(f"Module {key} updated")
        else:
            raise ChainTransactionError(response.error_message)  # type: ignore
Update module with custom parameters.
@module_app.command()
def
serve( ctx: typer.models.Context, class_path: str, key: str, port: int = 8000, ip: Optional[str] = None, subnets_whitelist: Optional[list[int]] = [0], whitelist: Optional[list[str]] = None, blacklist: Optional[list[str]] = None, ip_blacklist: Optional[list[str]] = None, test_mode: Optional[bool] = False, request_staleness: int = <typer.models.OptionInfo object>, use_ip_limiter: Optional[bool] = <typer.models.OptionInfo object>, token_refill_rate_base_multiplier: Optional[int] = <typer.models.OptionInfo object>):
@module_app.command()
def serve(
    ctx: typer.Context,
    class_path: str,
    key: str,
    port: int = 8000,
    ip: Optional[str] = None,
    subnets_whitelist: Optional[list[int]] = [0],
    whitelist: Optional[list[str]] = None,
    blacklist: Optional[list[str]] = None,
    ip_blacklist: Optional[list[str]] = None,
    test_mode: Optional[bool] = False,
    request_staleness: int = typer.Option(120),
    use_ip_limiter: Optional[bool] = typer.Option(
        False, help=("If this value is passed, the ip limiter will be used")
    ),
    token_refill_rate_base_multiplier: Optional[int] = typer.Option(
        None,
        help=(
            "Multiply the base limit per stake. Only used in stake limiter mode."
        ),
    ),
):
    """
    Serves a module on `ip` (default `127.0.0.1`) on port `port`. `class_path`
    should specify the dotted path to the module class e.g.
    `module.submodule.ClassName`.
    """
    context = make_custom_context(ctx)
    use_testnet = context.get_use_testnet()

    # Everything before the last dot is the importable module, the rest is
    # the attribute to fetch from it. `rpartition` always yields three parts,
    # so no "impossible" fallback branch is needed.
    module_path, _, class_name = class_path.rpartition(".")
    if not module_path:
        # This could do some kind of relative import somehow?
        raise ValueError(
            f"Invalid class path: `{class_path}`, module name is missing"
        )
    if not class_name:
        raise ValueError(
            f"Invalid class path: `{class_path}`, class name is missing"
        )

    try:
        module = importlib.import_module(module_path)
    except ModuleNotFoundError:
        context.error(f"Module `{module_path}` not found")
        raise typer.Exit(code=1)

    try:
        class_obj = getattr(module, class_name)
    except AttributeError:
        # Report the dotted path the user typed, not the module object's repr.
        context.error(
            f"Class `{class_name}` not found in module `{module_path}`"
        )
        raise typer.Exit(code=1)

    keypair = context.load_key(key, None)

    if test_mode:
        # Test mode drops the subnet restriction entirely.
        subnets_whitelist = None
    token_refill_rate = token_refill_rate_base_multiplier or 1
    limiter_params = (
        IpLimiterParams()
        if use_ip_limiter
        else StakeLimiterParams(token_ratio=token_refill_rate)
    )

    if whitelist is None:
        context.info(
            "WARNING: No whitelist provided, will accept calls from any key"
        )

    # Invalid addresses exit through `typer.Exit`, the same way as the import
    # failures above, instead of the interactive-only `exit()` builtin.
    try:
        whitelist_ss58 = list_to_ss58(whitelist)
    except AssertionError:
        context.error("Invalid SS58 address passed to whitelist")
        raise typer.Exit(code=1)
    try:
        blacklist_ss58 = list_to_ss58(blacklist)
    except AssertionError:
        context.error("Invalid SS58 address passed on blacklist")
        raise typer.Exit(code=1)

    server = ModuleServer(
        class_obj(),
        keypair,
        whitelist=whitelist_ss58,
        blacklist=blacklist_ss58,
        subnets_whitelist=subnets_whitelist,
        max_request_staleness=request_staleness,
        limiter=limiter_params,
        ip_blacklist=ip_blacklist,
        use_testnet=use_testnet,
    )
    app = server.get_fastapi_app()
    host = ip or "127.0.0.1"
    uvicorn.run(app, host=host, port=port)  # type: ignore
Serves a module on `ip` (default `127.0.0.1`) on port `port`. `class_path` should specify the dotted path to the module class, e.g. `module.submodule.ClassName`.
@module_app.command()
def
info( ctx: typer.models.Context, name: str, balance: bool = False, netuid: int = 0):
@module_app.command()
def info(ctx: Context, name: str, balance: bool = False, netuid: int = 0):
    """
    Gets module info

    Prints the first module on subnet `netuid` whose name equals `name`.
    """
    context = make_custom_context(ctx)
    client = context.com_client()

    status_message = f"Getting Module {name} on a subnet with netuid {netuid}…"
    with context.progress_status(status_message):
        modules = get_map_modules(
            client, netuid=netuid, include_balances=balance
        )

        found = None
        for candidate in modules.values():
            if candidate["name"] == name:
                found = candidate
                break

    if found is None:
        raise ValueError("Module not found")

    print_table_from_plain_dict(
        cast(dict[str, Any], found), ["Params", "Values"], context.console
    )
Gets module info
@module_app.command(name='list')
def
inventory(ctx: typer.models.Context, balances: bool = False, netuid: int = 0):
@module_app.command(name="list")
def inventory(ctx: Context, balances: bool = False, netuid: int = 0):
    """
    Modules stats on the network.

    Modules are printed in three groups: miners, validators and inactive.
    """
    context = make_custom_context(ctx)
    client = context.com_client()

    modules = cast(
        dict[str, Any],
        get_map_modules(client, netuid=netuid, include_balances=balances),
    )

    # Insertion order of this dict is also the print order.
    groups: dict[str, list[Any]] = {
        "miners": [],
        "validators": [],
        "inactive": [],
    }

    for entry in modules.values():
        incentive = entry["incentive"]
        dividends = entry["dividends"]
        if incentive == dividends == 0:
            label = "inactive"
        elif incentive > dividends:
            label = "miners"
        else:
            label = "validators"
        groups[label].append(entry)

    for label, members in groups.items():
        print_module_info(client, members, context.console, netuid, label)
Modules stats on the network.