communex.cli._common
1from dataclasses import dataclass 2from getpass import getpass 3from typing import Any, Callable, Mapping, TypeVar, cast 4 5import rich 6import rich.prompt 7import typer 8from rich import box 9from rich.console import Console 10from rich.table import Table 11from substrateinterface import Keypair 12from typer import Context 13 14from communex._common import ComxSettings, get_node_url 15from communex.balance import dict_from_nano, from_horus, from_nano 16from communex.client import CommuneClient 17from communex.compat.key import resolve_key_ss58_encrypted, try_classic_load_key 18from communex.errors import InvalidPasswordError, PasswordNotProvidedError 19from communex.types import ( 20 ModuleInfoWithOptionalBalance, 21 NetworkParams, 22 Ss58Address, 23 SubnetParamsWithEmission, 24) 25 26 27@dataclass 28class ExtraCtxData: 29 output_json: bool 30 use_testnet: bool 31 yes_to_all: bool 32 33 34class ExtendedContext(Context): 35 obj: ExtraCtxData 36 37 38class CliPasswordProvider: 39 def __init__( 40 self, settings: ComxSettings, prompt_secret: Callable[[str], str] 41 ): 42 self.settings = settings 43 self.prompt_secret = prompt_secret 44 45 def get_password(self, key_name: str) -> str | None: 46 key_map = self.settings.KEY_PASSWORDS 47 if key_map is not None: 48 password = key_map.get(key_name) 49 if password is not None: 50 return password.get_secret_value() 51 # fallback to universal password 52 password = self.settings.UNIVERSAL_PASSWORD 53 if password is not None: 54 return password.get_secret_value() 55 else: 56 return None 57 58 def ask_password(self, key_name: str) -> str: 59 password = self.prompt_secret( 60 f"Please provide the password for the key '{key_name}'" 61 ) 62 return password 63 64 65class CustomCtx: 66 ctx: ExtendedContext 67 settings: ComxSettings 68 console: rich.console.Console 69 console_err: rich.console.Console 70 password_manager: CliPasswordProvider 71 _com_client: CommuneClient | None = None 72 73 def __init__( 74 self, 75 ctx: ExtendedContext, 76 settings: ComxSettings, 77 console: rich.console.Console, 78 console_err: rich.console.Console, 79 com_client: CommuneClient | None = None, 80 ): 81 self.ctx = ctx 82 self.settings = settings 83 self.console = console 84 self.console_err = console_err 85 self._com_client = com_client 86 self.password_manager = CliPasswordProvider( 87 self.settings, self.prompt_secret 88 ) 89 90 def get_use_testnet(self) -> bool: 91 return self.ctx.obj.use_testnet 92 93 def get_node_url(self) -> str: 94 use_testnet = self.get_use_testnet() 95 return get_node_url(self.settings, use_testnet=use_testnet) 96 97 def com_client(self) -> CommuneClient: 98 if self._com_client is None: 99 node_url = self.get_node_url() 100 self.info(f"Using node: {node_url}") 101 for _ in range(5): 102 try: 103 self._com_client = CommuneClient( 104 url=node_url, 105 num_connections=1, 106 wait_for_finalization=False, 107 ) 108 except Exception: 109 self.info(f"Failed to connect to node: {node_url}") 110 node_url = self.get_node_url() 111 self.info(f"Will retry with node {node_url}") 112 continue 113 if self._com_client is None: 114 raise ConnectionError("Could not connect to any node") 115 116 return self._com_client 117 118 def output( 119 self, 120 message: str, 121 *args: tuple[Any, ...], 122 **kwargs: dict[str, Any], 123 ) -> None: 124 self.console.print(message, *args, **kwargs) # type: ignore 125 126 def info( 127 self, 128 message: str, 129 *args: tuple[Any, ...], 130 **kwargs: dict[str, Any], 131 ) -> None: 132 self.console_err.print(message, *args, **kwargs) # type: ignore 133 134 def error( 135 self, 136 message: str, 137 *args: tuple[Any, ...], 138 **kwargs: dict[str, Any], 139 ) -> None: 140 message = f"ERROR: {message}" 141 self.console_err.print(message, *args, style="bold red", **kwargs) # type: ignore 142 143 def progress_status(self, message: str): 144 return self.console_err.status(message) 145 146 def confirm(self, message: str) -> bool: 147 if self.ctx.obj.yes_to_all: 148 print(f"{message} (--yes)") 149 return True 150 return typer.confirm(message, err=True) 151 152 def prompt_secret(self, message: str) -> str: 153 return rich.prompt.Prompt.ask( 154 message, password=True, console=self.console_err 155 ) 156 157 def load_key(self, key: str, password: str | None = None) -> Keypair: 158 try: 159 keypair = try_classic_load_key( 160 key, password, password_provider=self.password_manager 161 ) 162 return keypair 163 except PasswordNotProvidedError: 164 self.error(f"Password not provided for key '{key}'") 165 raise typer.Exit(code=1) 166 except InvalidPasswordError: 167 self.error(f"Incorrect password for key '{key}'") 168 raise typer.Exit(code=1) 169 170 def resolve_key_ss58( 171 self, key: Ss58Address | Keypair | str, password: str | None = None 172 ) -> Ss58Address: 173 try: 174 address = resolve_key_ss58_encrypted( 175 key, password, password_provider=self.password_manager 176 ) 177 return address 178 except PasswordNotProvidedError: 179 self.error(f"Password not provided for key '{key}'") 180 raise typer.Exit(code=1) 181 except InvalidPasswordError: 182 self.error(f"Incorrect password for key '{key}'") 183 raise typer.Exit(code=1) 184 185 186def make_custom_context(ctx: typer.Context) -> CustomCtx: 187 return CustomCtx( 188 ctx=cast(ExtendedContext, ctx), # TODO: better check 189 settings=ComxSettings(), 190 console=Console(), 191 console_err=Console(stderr=True), 192 ) 193 194 195# Formatting 196 197 198def eprint(e: Any) -> None: 199 """ 200 Pretty prints an error. 201 """ 202 203 console = Console() 204 205 console.print(f"[bold red]ERROR: {e}", style="italic") 206 207 208def print_table_from_plain_dict( 209 result: Mapping[str, str | int | float | dict[Any, Any] | Ss58Address], 210 column_names: list[str], 211 console: Console, 212) -> None: 213 """ 214 Creates a table for a plain dictionary. 215 """ 216 217 table = Table(show_header=True, header_style="bold magenta") 218 219 for name in column_names: 220 table.add_column(name, style="white", vertical="middle") 221 222 # Add non-dictionary values to the table first 223 for key, value in result.items(): 224 if not isinstance(value, dict): 225 table.add_row(key, str(value)) 226 # Add subtables for nested dictionaries. 227 # Important to add after so that the display of the table is nicer. 228 for key, value in result.items(): 229 if isinstance(value, dict): 230 subtable = Table( 231 show_header=False, 232 padding=(0, 0, 0, 0), 233 border_style="bright_black", 234 ) 235 for subkey, subvalue in value.items(): 236 subtable.add_row(f"{subkey}: {subvalue}") 237 table.add_row(key, subtable) 238 239 console.print(table) 240 241 242def print_table_standardize( 243 result: dict[str, list[Any]], console: Console 244) -> None: 245 """ 246 Creates a table for a standardized dictionary. 247 """ 248 table = Table(show_header=True, header_style="bold magenta") 249 250 for key in result.keys(): 251 table.add_column(key, style="white") 252 rows = [*result.values()] 253 zipped_rows = [list(column) for column in zip(*rows)] 254 for row in zipped_rows: 255 table.add_row(*row, style="white") 256 257 console.print(table) 258 259 260def transform_module_into( 261 to_exclude: list[str], 262 last_block: int, 263 immunity_period: int, 264 modules: list[ModuleInfoWithOptionalBalance], 265 tempo: int, 266): 267 mods = cast(list[dict[str, Any]], modules) 268 transformed_modules: list[dict[str, Any]] = [] 269 for mod in mods: 270 module = mod.copy() 271 module_regblock = module["regblock"] 272 module["in_immunity"] = module_regblock + immunity_period > last_block 273 274 for key in to_exclude: 275 del module[key] 276 module["stake"] = round(from_nano(module["stake"]), 2) # type: ignore 277 module["emission"] = round(from_horus(module["emission"], tempo), 4) # type: ignore 278 if module.get("balance") is not None: 279 module["balance"] = from_nano(module["balance"]) # type: ignore 280 else: 281 # user should not see None values 282 del module["balance"] 283 transformed_modules.append(module) 284 285 return transformed_modules 286 287 288def print_module_info( 289 client: CommuneClient, 290 modules: list[ModuleInfoWithOptionalBalance], 291 console: Console, 292 netuid: int, 293 title: str | None = None, 294) -> None: 295 """ 296 Prints information about a module. 297 """ 298 if not modules: 299 return 300 301 # Get the current block number, we will need this to caluclate immunity period 302 block = client.get_block() 303 if block: 304 last_block = block["header"]["number"] 305 else: 306 raise ValueError("Could not get block info") 307 308 # Get the immunity period on the netuid 309 immunity_period = client.get_immunity_period(netuid) 310 tempo = client.get_tempo(netuid) 311 312 # Transform the module dictionary to have immunity_period 313 table = Table( 314 show_header=True, 315 header_style="bold magenta", 316 box=box.DOUBLE_EDGE, 317 title=title, 318 caption_style="chartreuse3", 319 title_style="bold magenta", 320 ) 321 322 to_exclude = ["stake_from", "last_update", "regblock"] 323 tranformed_modules = transform_module_into( 324 to_exclude, last_block, immunity_period, modules, tempo 325 ) 326 327 sample_mod = tranformed_modules[0] 328 for key in sample_mod.keys(): 329 # add columns 330 table.add_column(key, style="white") 331 332 total_stake = 0 333 total_balance = 0 334 335 for mod in tranformed_modules: 336 total_stake += mod["stake"] 337 if mod.get("balance") is not None: 338 total_balance += mod["balance"] 339 340 row: list[str] = [] 341 for val in mod.values(): 342 row.append(str(val)) 343 table.add_row(*row) 344 345 table.caption = "total balance: " + f"{total_balance + total_stake}J" 346 console.print(table) 347 for _ in range(3): 348 console.print() 349 350 351def get_universal_password(ctx: CustomCtx) -> str: 352 ctx.info("Please provide the universal password for all keys") 353 universal_password = getpass() 354 return universal_password 355 356 357def tranform_network_params(params: NetworkParams): 358 """Transform network params to be human readable.""" 359 governance_config = params["governance_config"] 360 allocation = governance_config["proposal_reward_treasury_allocation"] 361 governance_config = cast(dict[str, Any], governance_config) 362 governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%" 363 params_ = cast(dict[str, Any], params) 364 params_["governance_config"] = governance_config 365 general_params = dict_from_nano( 366 params_, 367 [ 368 "min_weight_stake", 369 "general_subnet_application_cost", 370 "subnet_registration_cost", 371 "proposal_cost", 372 "max_proposal_reward_treasury_allocation", 373 ], 374 ) 375 376 return general_params 377 378 379T = TypeVar("T") 380V = TypeVar("V") 381 382 383def remove_none_values(data: dict[T, V | None]) -> dict[T, V]: 384 """ 385 Removes key-value pairs from a dictionary where the value is None. 386 Works recursively for nested dictionaries. 387 """ 388 cleaned_data: dict[T, V] = {} 389 for key, value in data.items(): 390 if isinstance(value, dict): 391 cleaned_value = remove_none_values(value) # type: ignore 392 if cleaned_value is not None: # type: ignore 393 cleaned_data[key] = cleaned_value 394 elif value is not None: 395 cleaned_data[key] = value 396 return cleaned_data 397 398 399def transform_subnet_params(params: dict[int, SubnetParamsWithEmission]): 400 """Transform subnet params to be human readable.""" 401 params_ = cast(dict[int, Any], params) 402 display_params = remove_none_values(params_) 403 display_params = dict_from_nano( 404 display_params, 405 [ 406 "bonds_ma", 407 "min_burn", 408 "max_burn", 409 "min_weight_stake", 410 "proposal_cost", 411 "max_proposal_reward_treasury_allocation", 412 "min_validator_stake", 413 ], 414 ) 415 return display_params
The context is a special internal object that holds state relevant for the script execution at every single level. It's normally invisible to commands unless they opt-in to getting access to it.
The context is useful as it can pass internal objects around and can control special execution features such as reading data from environment variables.
A context can be used as context manager in which case it will call
close()
on teardown.
Parameters
- command: the command class for this context.
- parent: the parent context.
- info_name: the info name for this invocation. Generally this is the most descriptive name for the script or command. For the toplevel script it is usually the name of the script, for commands below it it's the name of the script.
- obj: an arbitrary object of user data.
- auto_envvar_prefix: the prefix to use for automatic environment
variables. If this is
None
then reading from environment variables is disabled. This does not affect manually set environment variables which are always read. - default_map: a dictionary (like object) with default values for parameters.
- terminal_width: the width of the terminal. The default is inherit from parent context. If no context defines the terminal width then auto detection will be applied.
- max_content_width: the maximum width for content rendered by Click (this currently only affects help pages). This defaults to 80 characters if not overridden. In other words: even if the terminal is larger than that, Click will not format things wider than 80 characters by default. In addition to that, formatters might add some safety mapping on the right.
- resilient_parsing: if this flag is enabled then Click will parse without any interactivity or callback invocation. Default values will also be ignored. This is useful for implementing things such as completion support.
- allow_extra_args: if this is set to
True
then extra arguments at the end will not raise an error and will be kept on the context. The default is to inherit from the command. - allow_interspersed_args: if this is set to
False
then options and arguments cannot be mixed. The default is to inherit from the command. - ignore_unknown_options: instructs click to ignore options it does not know and keeps them for later processing.
- help_option_names: optionally a list of strings that define how
the default help parameter is named. The
default is
['--help']
. - token_normalize_func: an optional function that is used to normalize tokens (options, choices, etc.). This for instance can be used to implement case insensitive behavior.
- color: controls if the terminal supports ANSI colors or not. The default is autodetection. This is only needed if ANSI codes are used in texts that Click prints which is by default not the case. This for instance would affect help output.
- show_default: Show the default value for commands. If this
value is not set, it defaults to the value from the parent
context.
Command.show_default
overrides this default for the specific command.
Changed in version 8.1:
The show_default
parameter is overridden by
Command.show_default
, instead of the other way around.
Changed in version 8.0:
The show_default
parameter defaults to the value from the
parent context.
Changed in version 7.1:
Added the show_default
parameter.
Changed in version 4.0:
Added the color
, ignore_unknown_options
, and
max_content_width
parameters.
Changed in version 3.0:
Added the allow_extra_args
and allow_interspersed_args
parameters.
Changed in version 2.0:
Added the resilient_parsing
, help_option_names
, and
token_normalize_func
parameters.
Inherited Members
- click.core.Context
- Context
- formatter_class
- parent
- command
- info_name
- params
- args
- protected_args
- default_map
- invoked_subcommand
- terminal_width
- max_content_width
- allow_extra_args
- allow_interspersed_args
- ignore_unknown_options
- help_option_names
- token_normalize_func
- resilient_parsing
- auto_envvar_prefix
- color
- show_default
- to_info_dict
- scope
- meta
- make_formatter
- with_resource
- call_on_close
- close
- command_path
- find_root
- find_object
- ensure_object
- lookup_default
- fail
- abort
- exit
- get_usage
- get_help
- invoke
- forward
- set_parameter_source
- get_parameter_source
39class CliPasswordProvider: 40 def __init__( 41 self, settings: ComxSettings, prompt_secret: Callable[[str], str] 42 ): 43 self.settings = settings 44 self.prompt_secret = prompt_secret 45 46 def get_password(self, key_name: str) -> str | None: 47 key_map = self.settings.KEY_PASSWORDS 48 if key_map is not None: 49 password = key_map.get(key_name) 50 if password is not None: 51 return password.get_secret_value() 52 # fallback to universal password 53 password = self.settings.UNIVERSAL_PASSWORD 54 if password is not None: 55 return password.get_secret_value() 56 else: 57 return None 58 59 def ask_password(self, key_name: str) -> str: 60 password = self.prompt_secret( 61 f"Please provide the password for the key '{key_name}'" 62 ) 63 return password
46 def get_password(self, key_name: str) -> str | None: 47 key_map = self.settings.KEY_PASSWORDS 48 if key_map is not None: 49 password = key_map.get(key_name) 50 if password is not None: 51 return password.get_secret_value() 52 # fallback to universal password 53 password = self.settings.UNIVERSAL_PASSWORD 54 if password is not None: 55 return password.get_secret_value() 56 else: 57 return None
66class CustomCtx: 67 ctx: ExtendedContext 68 settings: ComxSettings 69 console: rich.console.Console 70 console_err: rich.console.Console 71 password_manager: CliPasswordProvider 72 _com_client: CommuneClient | None = None 73 74 def __init__( 75 self, 76 ctx: ExtendedContext, 77 settings: ComxSettings, 78 console: rich.console.Console, 79 console_err: rich.console.Console, 80 com_client: CommuneClient | None = None, 81 ): 82 self.ctx = ctx 83 self.settings = settings 84 self.console = console 85 self.console_err = console_err 86 self._com_client = com_client 87 self.password_manager = CliPasswordProvider( 88 self.settings, self.prompt_secret 89 ) 90 91 def get_use_testnet(self) -> bool: 92 return self.ctx.obj.use_testnet 93 94 def get_node_url(self) -> str: 95 use_testnet = self.get_use_testnet() 96 return get_node_url(self.settings, use_testnet=use_testnet) 97 98 def com_client(self) -> CommuneClient: 99 if self._com_client is None: 100 node_url = self.get_node_url() 101 self.info(f"Using node: {node_url}") 102 for _ in range(5): 103 try: 104 self._com_client = CommuneClient( 105 url=node_url, 106 num_connections=1, 107 wait_for_finalization=False, 108 ) 109 except Exception: 110 self.info(f"Failed to connect to node: {node_url}") 111 node_url = self.get_node_url() 112 self.info(f"Will retry with node {node_url}") 113 continue 114 if self._com_client is None: 115 raise ConnectionError("Could not connect to any node") 116 117 return self._com_client 118 119 def output( 120 self, 121 message: str, 122 *args: tuple[Any, ...], 123 **kwargs: dict[str, Any], 124 ) -> None: 125 self.console.print(message, *args, **kwargs) # type: ignore 126 127 def info( 128 self, 129 message: str, 130 *args: tuple[Any, ...], 131 **kwargs: dict[str, Any], 132 ) -> None: 133 self.console_err.print(message, *args, **kwargs) # type: ignore 134 135 def error( 136 self, 137 message: str, 138 *args: tuple[Any, ...], 139 **kwargs: dict[str, Any], 140 ) -> None: 141 message = f"ERROR: {message}" 142 self.console_err.print(message, *args, style="bold red", **kwargs) # type: ignore 143 144 def progress_status(self, message: str): 145 return self.console_err.status(message) 146 147 def confirm(self, message: str) -> bool: 148 if self.ctx.obj.yes_to_all: 149 print(f"{message} (--yes)") 150 return True 151 return typer.confirm(message, err=True) 152 153 def prompt_secret(self, message: str) -> str: 154 return rich.prompt.Prompt.ask( 155 message, password=True, console=self.console_err 156 ) 157 158 def load_key(self, key: str, password: str | None = None) -> Keypair: 159 try: 160 keypair = try_classic_load_key( 161 key, password, password_provider=self.password_manager 162 ) 163 return keypair 164 except PasswordNotProvidedError: 165 self.error(f"Password not provided for key '{key}'") 166 raise typer.Exit(code=1) 167 except InvalidPasswordError: 168 self.error(f"Incorrect password for key '{key}'") 169 raise typer.Exit(code=1) 170 171 def resolve_key_ss58( 172 self, key: Ss58Address | Keypair | str, password: str | None = None 173 ) -> Ss58Address: 174 try: 175 address = resolve_key_ss58_encrypted( 176 key, password, password_provider=self.password_manager 177 ) 178 return address 179 except PasswordNotProvidedError: 180 self.error(f"Password not provided for key '{key}'") 181 raise typer.Exit(code=1) 182 except InvalidPasswordError: 183 self.error(f"Incorrect password for key '{key}'") 184 raise typer.Exit(code=1)
74 def __init__( 75 self, 76 ctx: ExtendedContext, 77 settings: ComxSettings, 78 console: rich.console.Console, 79 console_err: rich.console.Console, 80 com_client: CommuneClient | None = None, 81 ): 82 self.ctx = ctx 83 self.settings = settings 84 self.console = console 85 self.console_err = console_err 86 self._com_client = com_client 87 self.password_manager = CliPasswordProvider( 88 self.settings, self.prompt_secret 89 )
98 def com_client(self) -> CommuneClient: 99 if self._com_client is None: 100 node_url = self.get_node_url() 101 self.info(f"Using node: {node_url}") 102 for _ in range(5): 103 try: 104 self._com_client = CommuneClient( 105 url=node_url, 106 num_connections=1, 107 wait_for_finalization=False, 108 ) 109 except Exception: 110 self.info(f"Failed to connect to node: {node_url}") 111 node_url = self.get_node_url() 112 self.info(f"Will retry with node {node_url}") 113 continue 114 if self._com_client is None: 115 raise ConnectionError("Could not connect to any node") 116 117 return self._com_client
158 def load_key(self, key: str, password: str | None = None) -> Keypair: 159 try: 160 keypair = try_classic_load_key( 161 key, password, password_provider=self.password_manager 162 ) 163 return keypair 164 except PasswordNotProvidedError: 165 self.error(f"Password not provided for key '{key}'") 166 raise typer.Exit(code=1) 167 except InvalidPasswordError: 168 self.error(f"Incorrect password for key '{key}'") 169 raise typer.Exit(code=1)
171 def resolve_key_ss58( 172 self, key: Ss58Address | Keypair | str, password: str | None = None 173 ) -> Ss58Address: 174 try: 175 address = resolve_key_ss58_encrypted( 176 key, password, password_provider=self.password_manager 177 ) 178 return address 179 except PasswordNotProvidedError: 180 self.error(f"Password not provided for key '{key}'") 181 raise typer.Exit(code=1) 182 except InvalidPasswordError: 183 self.error(f"Incorrect password for key '{key}'") 184 raise typer.Exit(code=1)
199def eprint(e: Any) -> None: 200 """ 201 Pretty prints an error. 202 """ 203 204 console = Console() 205 206 console.print(f"[bold red]ERROR: {e}", style="italic")
Pretty prints an error.
209def print_table_from_plain_dict( 210 result: Mapping[str, str | int | float | dict[Any, Any] | Ss58Address], 211 column_names: list[str], 212 console: Console, 213) -> None: 214 """ 215 Creates a table for a plain dictionary. 216 """ 217 218 table = Table(show_header=True, header_style="bold magenta") 219 220 for name in column_names: 221 table.add_column(name, style="white", vertical="middle") 222 223 # Add non-dictionary values to the table first 224 for key, value in result.items(): 225 if not isinstance(value, dict): 226 table.add_row(key, str(value)) 227 # Add subtables for nested dictionaries. 228 # Important to add after so that the display of the table is nicer. 229 for key, value in result.items(): 230 if isinstance(value, dict): 231 subtable = Table( 232 show_header=False, 233 padding=(0, 0, 0, 0), 234 border_style="bright_black", 235 ) 236 for subkey, subvalue in value.items(): 237 subtable.add_row(f"{subkey}: {subvalue}") 238 table.add_row(key, subtable) 239 240 console.print(table)
Creates a table for a plain dictionary.
243def print_table_standardize( 244 result: dict[str, list[Any]], console: Console 245) -> None: 246 """ 247 Creates a table for a standardized dictionary. 248 """ 249 table = Table(show_header=True, header_style="bold magenta") 250 251 for key in result.keys(): 252 table.add_column(key, style="white") 253 rows = [*result.values()] 254 zipped_rows = [list(column) for column in zip(*rows)] 255 for row in zipped_rows: 256 table.add_row(*row, style="white") 257 258 console.print(table)
Creates a table for a standardized dictionary.
261def transform_module_into( 262 to_exclude: list[str], 263 last_block: int, 264 immunity_period: int, 265 modules: list[ModuleInfoWithOptionalBalance], 266 tempo: int, 267): 268 mods = cast(list[dict[str, Any]], modules) 269 transformed_modules: list[dict[str, Any]] = [] 270 for mod in mods: 271 module = mod.copy() 272 module_regblock = module["regblock"] 273 module["in_immunity"] = module_regblock + immunity_period > last_block 274 275 for key in to_exclude: 276 del module[key] 277 module["stake"] = round(from_nano(module["stake"]), 2) # type: ignore 278 module["emission"] = round(from_horus(module["emission"], tempo), 4) # type: ignore 279 if module.get("balance") is not None: 280 module["balance"] = from_nano(module["balance"]) # type: ignore 281 else: 282 # user should not see None values 283 del module["balance"] 284 transformed_modules.append(module) 285 286 return transformed_modules
289def print_module_info( 290 client: CommuneClient, 291 modules: list[ModuleInfoWithOptionalBalance], 292 console: Console, 293 netuid: int, 294 title: str | None = None, 295) -> None: 296 """ 297 Prints information about a module. 298 """ 299 if not modules: 300 return 301 302 # Get the current block number, we will need this to caluclate immunity period 303 block = client.get_block() 304 if block: 305 last_block = block["header"]["number"] 306 else: 307 raise ValueError("Could not get block info") 308 309 # Get the immunity period on the netuid 310 immunity_period = client.get_immunity_period(netuid) 311 tempo = client.get_tempo(netuid) 312 313 # Transform the module dictionary to have immunity_period 314 table = Table( 315 show_header=True, 316 header_style="bold magenta", 317 box=box.DOUBLE_EDGE, 318 title=title, 319 caption_style="chartreuse3", 320 title_style="bold magenta", 321 ) 322 323 to_exclude = ["stake_from", "last_update", "regblock"] 324 tranformed_modules = transform_module_into( 325 to_exclude, last_block, immunity_period, modules, tempo 326 ) 327 328 sample_mod = tranformed_modules[0] 329 for key in sample_mod.keys(): 330 # add columns 331 table.add_column(key, style="white") 332 333 total_stake = 0 334 total_balance = 0 335 336 for mod in tranformed_modules: 337 total_stake += mod["stake"] 338 if mod.get("balance") is not None: 339 total_balance += mod["balance"] 340 341 row: list[str] = [] 342 for val in mod.values(): 343 row.append(str(val)) 344 table.add_row(*row) 345 346 table.caption = "total balance: " + f"{total_balance + total_stake}J" 347 console.print(table) 348 for _ in range(3): 349 console.print()
Prints information about a module.
358def tranform_network_params(params: NetworkParams): 359 """Transform network params to be human readable.""" 360 governance_config = params["governance_config"] 361 allocation = governance_config["proposal_reward_treasury_allocation"] 362 governance_config = cast(dict[str, Any], governance_config) 363 governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%" 364 params_ = cast(dict[str, Any], params) 365 params_["governance_config"] = governance_config 366 general_params = dict_from_nano( 367 params_, 368 [ 369 "min_weight_stake", 370 "general_subnet_application_cost", 371 "subnet_registration_cost", 372 "proposal_cost", 373 "max_proposal_reward_treasury_allocation", 374 ], 375 ) 376 377 return general_params
Transform network params to be human readable.
384def remove_none_values(data: dict[T, V | None]) -> dict[T, V]: 385 """ 386 Removes key-value pairs from a dictionary where the value is None. 387 Works recursively for nested dictionaries. 388 """ 389 cleaned_data: dict[T, V] = {} 390 for key, value in data.items(): 391 if isinstance(value, dict): 392 cleaned_value = remove_none_values(value) # type: ignore 393 if cleaned_value is not None: # type: ignore 394 cleaned_data[key] = cleaned_value 395 elif value is not None: 396 cleaned_data[key] = value 397 return cleaned_data
Removes key-value pairs from a dictionary where the value is None. Works recursively for nested dictionaries.
400def transform_subnet_params(params: dict[int, SubnetParamsWithEmission]): 401 """Transform subnet params to be human readable.""" 402 params_ = cast(dict[int, Any], params) 403 display_params = remove_none_values(params_) 404 display_params = dict_from_nano( 405 display_params, 406 [ 407 "bonds_ma", 408 "min_burn", 409 "max_burn", 410 "min_weight_stake", 411 "proposal_cost", 412 "max_proposal_reward_treasury_allocation", 413 "min_validator_stake", 414 ], 415 ) 416 return display_params
Transform subnet params to be human readable.