communex.cli._common
1from dataclasses import dataclass 2from getpass import getpass 3from typing import Any, Callable, Mapping, TypeVar, cast 4 5import rich 6import rich.prompt 7import typer 8from rich import box 9from rich.console import Console 10from rich.table import Table 11from substrateinterface import Keypair 12from typer import Context 13 14from communex._common import ComxSettings, get_node_url 15from communex.balance import dict_from_nano, from_horus, from_nano 16from communex.client import CommuneClient 17from communex.compat.key import resolve_key_ss58_encrypted, try_classic_load_key 18from communex.errors import InvalidPasswordError, PasswordNotProvidedError 19from communex.types import ( 20 ModuleInfoWithOptionalBalance, 21 NetworkParams, 22 Ss58Address, 23 SubnetParamsWithEmission, 24) 25 26 27@dataclass 28class ExtraCtxData: 29 output_json: bool 30 use_testnet: bool 31 yes_to_all: bool 32 33 34class ExtendedContext(Context): 35 obj: ExtraCtxData 36 37 38class CliPasswordProvider: 39 def __init__( 40 self, settings: ComxSettings, prompt_secret: Callable[[str], str] 41 ): 42 self.settings = settings 43 self.prompt_secret = prompt_secret 44 45 def get_password(self, key_name: str) -> str | None: 46 key_map = self.settings.KEY_PASSWORDS 47 if key_map is not None: 48 password = key_map.get(key_name) 49 if password is not None: 50 return password.get_secret_value() 51 # fallback to universal password 52 password = self.settings.UNIVERSAL_PASSWORD 53 if password is not None: 54 return password.get_secret_value() 55 else: 56 return None 57 58 def ask_password(self, key_name: str) -> str: 59 password = self.prompt_secret( 60 f"Please provide the password for the key '{key_name}'" 61 ) 62 return password 63 64 65class CustomCtx: 66 ctx: ExtendedContext 67 settings: ComxSettings 68 console: rich.console.Console 69 console_err: rich.console.Console 70 password_manager: CliPasswordProvider 71 _com_client: CommuneClient | None = None 72 73 def __init__( 74 self, 75 ctx: ExtendedContext, 76 settings: ComxSettings, 77 console: rich.console.Console, 78 console_err: rich.console.Console, 79 com_client: CommuneClient | None = None, 80 ): 81 self.ctx = ctx 82 self.settings = settings 83 self.console = console 84 self.console_err = console_err 85 self._com_client = com_client 86 self.password_manager = CliPasswordProvider( 87 self.settings, self.prompt_secret 88 ) 89 90 def get_use_testnet(self) -> bool: 91 return self.ctx.obj.use_testnet 92 93 def get_node_url(self) -> str: 94 use_testnet = self.get_use_testnet() 95 return get_node_url(self.settings, use_testnet=use_testnet) 96 97 def com_client(self) -> CommuneClient: 98 if self._com_client is None: 99 node_url = self.get_node_url() 100 self.info(f"Using node: {node_url}") 101 for _ in range(5): 102 try: 103 self._com_client = CommuneClient( 104 url=node_url, 105 num_connections=1, 106 wait_for_finalization=False, 107 timeout=65, 108 ) 109 except Exception: 110 self.info(f"Failed to connect to node: {node_url}") 111 node_url = self.get_node_url() 112 self.info(f"Will retry with node {node_url}") 113 continue 114 else: 115 break 116 if self._com_client is None: 117 raise ConnectionError("Could not connect to any node") 118 119 return self._com_client 120 121 def output( 122 self, 123 message: str, 124 *args: tuple[Any, ...], 125 **kwargs: dict[str, Any], 126 ) -> None: 127 self.console.print(message, *args, **kwargs) # type: ignore 128 129 def info( 130 self, 131 message: str, 132 *args: tuple[Any, ...], 133 **kwargs: dict[str, Any], 134 ) -> None: 135 self.console_err.print(message, *args, **kwargs) # type: ignore 136 137 def error( 138 self, 139 message: str, 140 *args: tuple[Any, ...], 141 **kwargs: dict[str, Any], 142 ) -> None: 143 message = f"ERROR: {message}" 144 self.console_err.print(message, *args, style="bold red", **kwargs) # type: ignore 145 146 def progress_status(self, message: str): 147 return self.console_err.status(message) 148 149 def confirm(self, message: str) -> bool: 150 if self.ctx.obj.yes_to_all: 151 print(f"{message} (--yes)") 152 return True 153 return typer.confirm(message, err=True) 154 155 def prompt_secret(self, message: str) -> str: 156 return rich.prompt.Prompt.ask( 157 message, password=True, console=self.console_err 158 ) 159 160 def load_key(self, key: str, password: str | None = None) -> Keypair: 161 try: 162 keypair = try_classic_load_key( 163 key, password, password_provider=self.password_manager 164 ) 165 return keypair 166 except PasswordNotProvidedError: 167 self.error(f"Password not provided for key '{key}'") 168 raise typer.Exit(code=1) 169 except InvalidPasswordError: 170 self.error(f"Incorrect password for key '{key}'") 171 raise typer.Exit(code=1) 172 173 def resolve_key_ss58( 174 self, key: Ss58Address | Keypair | str, password: str | None = None 175 ) -> Ss58Address: 176 try: 177 address = resolve_key_ss58_encrypted( 178 key, password, password_provider=self.password_manager 179 ) 180 return address 181 except PasswordNotProvidedError: 182 self.error(f"Password not provided for key '{key}'") 183 raise typer.Exit(code=1) 184 except InvalidPasswordError: 185 self.error(f"Incorrect password for key '{key}'") 186 raise typer.Exit(code=1) 187 188 189def make_custom_context(ctx: typer.Context) -> CustomCtx: 190 return CustomCtx( 191 ctx=cast(ExtendedContext, ctx), # TODO: better check 192 settings=ComxSettings(), 193 console=Console(), 194 console_err=Console(stderr=True), 195 ) 196 197 198# Formatting 199 200 201def eprint(e: Any) -> None: 202 """ 203 Pretty prints an error. 204 """ 205 206 console = Console() 207 208 console.print(f"[bold red]ERROR: {e}", style="italic") 209 210 211def print_table_from_plain_dict( 212 result: Mapping[str, str | int | float | dict[Any, Any] | Ss58Address], 213 column_names: list[str], 214 console: Console, 215) -> None: 216 """ 217 Creates a table for a plain dictionary. 218 """ 219 220 table = Table(show_header=True, header_style="bold magenta") 221 222 for name in column_names: 223 table.add_column(name, style="white", vertical="middle") 224 225 # Add non-dictionary values to the table first 226 for key, value in result.items(): 227 if not isinstance(value, dict): 228 table.add_row(key, str(value)) 229 # Add subtables for nested dictionaries. 230 # Important to add after so that the display of the table is nicer. 231 for key, value in result.items(): 232 if isinstance(value, dict): 233 subtable = Table( 234 show_header=False, 235 padding=(0, 0, 0, 0), 236 border_style="bright_black", 237 ) 238 for subkey, subvalue in value.items(): 239 subtable.add_row(f"{subkey}: {subvalue}") 240 table.add_row(key, subtable) 241 242 console.print(table) 243 244 245def print_table_standardize( 246 result: dict[str, list[Any]], console: Console 247) -> None: 248 """ 249 Creates a table for a standardized dictionary. 250 """ 251 table = Table(show_header=True, header_style="bold magenta") 252 253 for key in result.keys(): 254 table.add_column(key, style="white") 255 rows = [*result.values()] 256 zipped_rows = [list(column) for column in zip(*rows)] 257 for row in zipped_rows: 258 table.add_row(*row, style="white") 259 260 console.print(table) 261 262 263def transform_module_into( 264 to_exclude: list[str], 265 last_block: int, 266 immunity_period: int, 267 modules: list[ModuleInfoWithOptionalBalance], 268 tempo: int, 269): 270 mods = cast(list[dict[str, Any]], modules) 271 transformed_modules: list[dict[str, Any]] = [] 272 for mod in mods: 273 module = mod.copy() 274 module_regblock = module["regblock"] 275 module["in_immunity"] = module_regblock + immunity_period > last_block 276 277 for key in to_exclude: 278 del module[key] 279 module["stake"] = round(from_nano(module["stake"]), 2) # type: ignore 280 module["emission"] = round(from_horus(module["emission"], tempo), 4) # type: ignore 281 if module.get("balance") is not None: 282 module["balance"] = from_nano(module["balance"]) # type: ignore 283 else: 284 # user should not see None values 285 del module["balance"] 286 transformed_modules.append(module) 287 288 return transformed_modules 289 290 291def print_module_info( 292 client: CommuneClient, 293 modules: list[ModuleInfoWithOptionalBalance], 294 console: Console, 295 netuid: int, 296 title: str | None = None, 297) -> None: 298 """ 299 Prints information about a module. 300 """ 301 if not modules: 302 return 303 304 # Get the current block number, we will need this to caluclate immunity period 305 block = client.get_block() 306 if block: 307 last_block = block["header"]["number"] 308 else: 309 raise ValueError("Could not get block info") 310 311 # Get the immunity period on the netuid 312 immunity_period = client.get_immunity_period(netuid) 313 tempo = client.get_tempo(netuid) 314 315 # Transform the module dictionary to have immunity_period 316 table = Table( 317 show_header=True, 318 header_style="bold magenta", 319 box=box.DOUBLE_EDGE, 320 title=title, 321 caption_style="chartreuse3", 322 title_style="bold magenta", 323 ) 324 325 to_exclude = ["stake_from", "last_update", "regblock"] 326 tranformed_modules = transform_module_into( 327 to_exclude, last_block, immunity_period, modules, tempo 328 ) 329 330 sample_mod = tranformed_modules[0] 331 for key in sample_mod.keys(): 332 # add columns 333 table.add_column(key, style="white") 334 335 total_stake = 0 336 total_balance = 0 337 338 for mod in tranformed_modules: 339 total_stake += mod["stake"] 340 if mod.get("balance") is not None: 341 total_balance += mod["balance"] 342 343 row: list[str] = [] 344 for val in mod.values(): 345 row.append(str(val)) 346 table.add_row(*row) 347 348 table.caption = "total balance: " + f"{total_balance + total_stake}J" 349 console.print(table) 350 for _ in range(3): 351 console.print() 352 353 354def get_universal_password(ctx: CustomCtx) -> str: 355 ctx.info("Please provide the universal password for all keys") 356 universal_password = getpass() 357 return universal_password 358 359 360def tranform_network_params(params: NetworkParams): 361 """Transform network params to be human readable.""" 362 governance_config = params["governance_config"] 363 allocation = governance_config["proposal_reward_treasury_allocation"] 364 governance_config = cast(dict[str, Any], governance_config) 365 governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%" 366 params_ = cast(dict[str, Any], params) 367 params_["governance_config"] = governance_config 368 general_params = dict_from_nano( 369 params_, 370 [ 371 "min_weight_stake", 372 "general_subnet_application_cost", 373 "subnet_registration_cost", 374 "proposal_cost", 375 "max_proposal_reward_treasury_allocation", 376 ], 377 ) 378 379 return general_params 380 381 382T = TypeVar("T") 383V = TypeVar("V") 384 385 386def remove_none_values(data: dict[T, V | None]) -> dict[T, V]: 387 """ 388 Removes key-value pairs from a dictionary where the value is None. 389 Works recursively for nested dictionaries. 390 """ 391 cleaned_data: dict[T, V] = {} 392 for key, value in data.items(): 393 if isinstance(value, dict): 394 cleaned_value = remove_none_values(value) # type: ignore 395 if cleaned_value is not None: # type: ignore 396 cleaned_data[key] = cleaned_value 397 elif value is not None: 398 cleaned_data[key] = value 399 return cleaned_data 400 401 402def transform_subnet_params(params: dict[int, SubnetParamsWithEmission]): 403 """Transform subnet params to be human readable.""" 404 params_ = cast(dict[int, Any], params) 405 display_params = remove_none_values(params_) 406 display_params = dict_from_nano( 407 display_params, 408 [ 409 "bonds_ma", 410 "min_burn", 411 "max_burn", 412 "min_weight_stake", 413 "proposal_cost", 414 "max_proposal_reward_treasury_allocation", 415 "min_validator_stake", 416 ], 417 ) 418 return display_params
The context is a special internal object that holds state relevant for the script execution at every single level. It's normally invisible to commands unless they opt-in to getting access to it.
The context is useful as it can pass internal objects around and can control special execution features such as reading data from environment variables.
A context can be used as context manager in which case it will call
close()
on teardown.
Parameters
- command: the command class for this context.
- parent: the parent context.
- info_name: the info name for this invocation. Generally this is the most descriptive name for the script or command. For the toplevel script it is usually the name of the script, for commands below it it's the name of the script.
- obj: an arbitrary object of user data.
- auto_envvar_prefix: the prefix to use for automatic environment
variables. If this is
None
then reading from environment variables is disabled. This does not affect manually set environment variables which are always read. - default_map: a dictionary (like object) with default values for parameters.
- terminal_width: the width of the terminal. The default is inherit from parent context. If no context defines the terminal width then auto detection will be applied.
- max_content_width: the maximum width for content rendered by Click (this currently only affects help pages). This defaults to 80 characters if not overridden. In other words: even if the terminal is larger than that, Click will not format things wider than 80 characters by default. In addition to that, formatters might add some safety mapping on the right.
- resilient_parsing: if this flag is enabled then Click will parse without any interactivity or callback invocation. Default values will also be ignored. This is useful for implementing things such as completion support.
- allow_extra_args: if this is set to
True
then extra arguments at the end will not raise an error and will be kept on the context. The default is to inherit from the command. - allow_interspersed_args: if this is set to
False
then options and arguments cannot be mixed. The default is to inherit from the command. - ignore_unknown_options: instructs click to ignore options it does not know and keeps them for later processing.
- help_option_names: optionally a list of strings that define how
the default help parameter is named. The
default is
['--help']
. - token_normalize_func: an optional function that is used to normalize tokens (options, choices, etc.). This for instance can be used to implement case insensitive behavior.
- color: controls if the terminal supports ANSI colors or not. The default is autodetection. This is only needed if ANSI codes are used in texts that Click prints which is by default not the case. This for instance would affect help output.
- show_default: Show the default value for commands. If this
value is not set, it defaults to the value from the parent
context.
Command.show_default
overrides this default for the specific command.
Changed in version 8.1:
The show_default
parameter is overridden by
Command.show_default
, instead of the other way around.
Changed in version 8.0:
The show_default
parameter defaults to the value from the
parent context.
Changed in version 7.1:
Added the show_default
parameter.
Changed in version 4.0:
Added the color
, ignore_unknown_options
, and
max_content_width
parameters.
Changed in version 3.0:
Added the allow_extra_args
and allow_interspersed_args
parameters.
Changed in version 2.0:
Added the resilient_parsing
, help_option_names
, and
token_normalize_func
parameters.
Inherited Members
- click.core.Context
- Context
- formatter_class
- parent
- command
- info_name
- params
- args
- protected_args
- default_map
- invoked_subcommand
- terminal_width
- max_content_width
- allow_extra_args
- allow_interspersed_args
- ignore_unknown_options
- help_option_names
- token_normalize_func
- resilient_parsing
- auto_envvar_prefix
- color
- show_default
- to_info_dict
- scope
- meta
- make_formatter
- with_resource
- call_on_close
- close
- command_path
- find_root
- find_object
- ensure_object
- lookup_default
- fail
- abort
- exit
- get_usage
- get_help
- invoke
- forward
- set_parameter_source
- get_parameter_source
39class CliPasswordProvider: 40 def __init__( 41 self, settings: ComxSettings, prompt_secret: Callable[[str], str] 42 ): 43 self.settings = settings 44 self.prompt_secret = prompt_secret 45 46 def get_password(self, key_name: str) -> str | None: 47 key_map = self.settings.KEY_PASSWORDS 48 if key_map is not None: 49 password = key_map.get(key_name) 50 if password is not None: 51 return password.get_secret_value() 52 # fallback to universal password 53 password = self.settings.UNIVERSAL_PASSWORD 54 if password is not None: 55 return password.get_secret_value() 56 else: 57 return None 58 59 def ask_password(self, key_name: str) -> str: 60 password = self.prompt_secret( 61 f"Please provide the password for the key '{key_name}'" 62 ) 63 return password
46 def get_password(self, key_name: str) -> str | None: 47 key_map = self.settings.KEY_PASSWORDS 48 if key_map is not None: 49 password = key_map.get(key_name) 50 if password is not None: 51 return password.get_secret_value() 52 # fallback to universal password 53 password = self.settings.UNIVERSAL_PASSWORD 54 if password is not None: 55 return password.get_secret_value() 56 else: 57 return None
66class CustomCtx: 67 ctx: ExtendedContext 68 settings: ComxSettings 69 console: rich.console.Console 70 console_err: rich.console.Console 71 password_manager: CliPasswordProvider 72 _com_client: CommuneClient | None = None 73 74 def __init__( 75 self, 76 ctx: ExtendedContext, 77 settings: ComxSettings, 78 console: rich.console.Console, 79 console_err: rich.console.Console, 80 com_client: CommuneClient | None = None, 81 ): 82 self.ctx = ctx 83 self.settings = settings 84 self.console = console 85 self.console_err = console_err 86 self._com_client = com_client 87 self.password_manager = CliPasswordProvider( 88 self.settings, self.prompt_secret 89 ) 90 91 def get_use_testnet(self) -> bool: 92 return self.ctx.obj.use_testnet 93 94 def get_node_url(self) -> str: 95 use_testnet = self.get_use_testnet() 96 return get_node_url(self.settings, use_testnet=use_testnet) 97 98 def com_client(self) -> CommuneClient: 99 if self._com_client is None: 100 node_url = self.get_node_url() 101 self.info(f"Using node: {node_url}") 102 for _ in range(5): 103 try: 104 self._com_client = CommuneClient( 105 url=node_url, 106 num_connections=1, 107 wait_for_finalization=False, 108 timeout=65, 109 ) 110 except Exception: 111 self.info(f"Failed to connect to node: {node_url}") 112 node_url = self.get_node_url() 113 self.info(f"Will retry with node {node_url}") 114 continue 115 else: 116 break 117 if self._com_client is None: 118 raise ConnectionError("Could not connect to any node") 119 120 return self._com_client 121 122 def output( 123 self, 124 message: str, 125 *args: tuple[Any, ...], 126 **kwargs: dict[str, Any], 127 ) -> None: 128 self.console.print(message, *args, **kwargs) # type: ignore 129 130 def info( 131 self, 132 message: str, 133 *args: tuple[Any, ...], 134 **kwargs: dict[str, Any], 135 ) -> None: 136 self.console_err.print(message, *args, **kwargs) # type: ignore 137 138 def error( 139 self, 140 message: str, 141 *args: tuple[Any, ...], 142 **kwargs: dict[str, Any], 143 ) -> None: 144 message = f"ERROR: {message}" 145 self.console_err.print(message, *args, style="bold red", **kwargs) # type: ignore 146 147 def progress_status(self, message: str): 148 return self.console_err.status(message) 149 150 def confirm(self, message: str) -> bool: 151 if self.ctx.obj.yes_to_all: 152 print(f"{message} (--yes)") 153 return True 154 return typer.confirm(message, err=True) 155 156 def prompt_secret(self, message: str) -> str: 157 return rich.prompt.Prompt.ask( 158 message, password=True, console=self.console_err 159 ) 160 161 def load_key(self, key: str, password: str | None = None) -> Keypair: 162 try: 163 keypair = try_classic_load_key( 164 key, password, password_provider=self.password_manager 165 ) 166 return keypair 167 except PasswordNotProvidedError: 168 self.error(f"Password not provided for key '{key}'") 169 raise typer.Exit(code=1) 170 except InvalidPasswordError: 171 self.error(f"Incorrect password for key '{key}'") 172 raise typer.Exit(code=1) 173 174 def resolve_key_ss58( 175 self, key: Ss58Address | Keypair | str, password: str | None = None 176 ) -> Ss58Address: 177 try: 178 address = resolve_key_ss58_encrypted( 179 key, password, password_provider=self.password_manager 180 ) 181 return address 182 except PasswordNotProvidedError: 183 self.error(f"Password not provided for key '{key}'") 184 raise typer.Exit(code=1) 185 except InvalidPasswordError: 186 self.error(f"Incorrect password for key '{key}'") 187 raise typer.Exit(code=1)
74 def __init__( 75 self, 76 ctx: ExtendedContext, 77 settings: ComxSettings, 78 console: rich.console.Console, 79 console_err: rich.console.Console, 80 com_client: CommuneClient | None = None, 81 ): 82 self.ctx = ctx 83 self.settings = settings 84 self.console = console 85 self.console_err = console_err 86 self._com_client = com_client 87 self.password_manager = CliPasswordProvider( 88 self.settings, self.prompt_secret 89 )
98 def com_client(self) -> CommuneClient: 99 if self._com_client is None: 100 node_url = self.get_node_url() 101 self.info(f"Using node: {node_url}") 102 for _ in range(5): 103 try: 104 self._com_client = CommuneClient( 105 url=node_url, 106 num_connections=1, 107 wait_for_finalization=False, 108 timeout=65, 109 ) 110 except Exception: 111 self.info(f"Failed to connect to node: {node_url}") 112 node_url = self.get_node_url() 113 self.info(f"Will retry with node {node_url}") 114 continue 115 else: 116 break 117 if self._com_client is None: 118 raise ConnectionError("Could not connect to any node") 119 120 return self._com_client
161 def load_key(self, key: str, password: str | None = None) -> Keypair: 162 try: 163 keypair = try_classic_load_key( 164 key, password, password_provider=self.password_manager 165 ) 166 return keypair 167 except PasswordNotProvidedError: 168 self.error(f"Password not provided for key '{key}'") 169 raise typer.Exit(code=1) 170 except InvalidPasswordError: 171 self.error(f"Incorrect password for key '{key}'") 172 raise typer.Exit(code=1)
174 def resolve_key_ss58( 175 self, key: Ss58Address | Keypair | str, password: str | None = None 176 ) -> Ss58Address: 177 try: 178 address = resolve_key_ss58_encrypted( 179 key, password, password_provider=self.password_manager 180 ) 181 return address 182 except PasswordNotProvidedError: 183 self.error(f"Password not provided for key '{key}'") 184 raise typer.Exit(code=1) 185 except InvalidPasswordError: 186 self.error(f"Incorrect password for key '{key}'") 187 raise typer.Exit(code=1)
202def eprint(e: Any) -> None: 203 """ 204 Pretty prints an error. 205 """ 206 207 console = Console() 208 209 console.print(f"[bold red]ERROR: {e}", style="italic")
Pretty prints an error.
212def print_table_from_plain_dict( 213 result: Mapping[str, str | int | float | dict[Any, Any] | Ss58Address], 214 column_names: list[str], 215 console: Console, 216) -> None: 217 """ 218 Creates a table for a plain dictionary. 219 """ 220 221 table = Table(show_header=True, header_style="bold magenta") 222 223 for name in column_names: 224 table.add_column(name, style="white", vertical="middle") 225 226 # Add non-dictionary values to the table first 227 for key, value in result.items(): 228 if not isinstance(value, dict): 229 table.add_row(key, str(value)) 230 # Add subtables for nested dictionaries. 231 # Important to add after so that the display of the table is nicer. 232 for key, value in result.items(): 233 if isinstance(value, dict): 234 subtable = Table( 235 show_header=False, 236 padding=(0, 0, 0, 0), 237 border_style="bright_black", 238 ) 239 for subkey, subvalue in value.items(): 240 subtable.add_row(f"{subkey}: {subvalue}") 241 table.add_row(key, subtable) 242 243 console.print(table)
Creates a table for a plain dictionary.
246def print_table_standardize( 247 result: dict[str, list[Any]], console: Console 248) -> None: 249 """ 250 Creates a table for a standardized dictionary. 251 """ 252 table = Table(show_header=True, header_style="bold magenta") 253 254 for key in result.keys(): 255 table.add_column(key, style="white") 256 rows = [*result.values()] 257 zipped_rows = [list(column) for column in zip(*rows)] 258 for row in zipped_rows: 259 table.add_row(*row, style="white") 260 261 console.print(table)
Creates a table for a standardized dictionary.
264def transform_module_into( 265 to_exclude: list[str], 266 last_block: int, 267 immunity_period: int, 268 modules: list[ModuleInfoWithOptionalBalance], 269 tempo: int, 270): 271 mods = cast(list[dict[str, Any]], modules) 272 transformed_modules: list[dict[str, Any]] = [] 273 for mod in mods: 274 module = mod.copy() 275 module_regblock = module["regblock"] 276 module["in_immunity"] = module_regblock + immunity_period > last_block 277 278 for key in to_exclude: 279 del module[key] 280 module["stake"] = round(from_nano(module["stake"]), 2) # type: ignore 281 module["emission"] = round(from_horus(module["emission"], tempo), 4) # type: ignore 282 if module.get("balance") is not None: 283 module["balance"] = from_nano(module["balance"]) # type: ignore 284 else: 285 # user should not see None values 286 del module["balance"] 287 transformed_modules.append(module) 288 289 return transformed_modules
292def print_module_info( 293 client: CommuneClient, 294 modules: list[ModuleInfoWithOptionalBalance], 295 console: Console, 296 netuid: int, 297 title: str | None = None, 298) -> None: 299 """ 300 Prints information about a module. 301 """ 302 if not modules: 303 return 304 305 # Get the current block number, we will need this to caluclate immunity period 306 block = client.get_block() 307 if block: 308 last_block = block["header"]["number"] 309 else: 310 raise ValueError("Could not get block info") 311 312 # Get the immunity period on the netuid 313 immunity_period = client.get_immunity_period(netuid) 314 tempo = client.get_tempo(netuid) 315 316 # Transform the module dictionary to have immunity_period 317 table = Table( 318 show_header=True, 319 header_style="bold magenta", 320 box=box.DOUBLE_EDGE, 321 title=title, 322 caption_style="chartreuse3", 323 title_style="bold magenta", 324 ) 325 326 to_exclude = ["stake_from", "last_update", "regblock"] 327 tranformed_modules = transform_module_into( 328 to_exclude, last_block, immunity_period, modules, tempo 329 ) 330 331 sample_mod = tranformed_modules[0] 332 for key in sample_mod.keys(): 333 # add columns 334 table.add_column(key, style="white") 335 336 total_stake = 0 337 total_balance = 0 338 339 for mod in tranformed_modules: 340 total_stake += mod["stake"] 341 if mod.get("balance") is not None: 342 total_balance += mod["balance"] 343 344 row: list[str] = [] 345 for val in mod.values(): 346 row.append(str(val)) 347 table.add_row(*row) 348 349 table.caption = "total balance: " + f"{total_balance + total_stake}J" 350 console.print(table) 351 for _ in range(3): 352 console.print()
Prints information about a module.
361def tranform_network_params(params: NetworkParams): 362 """Transform network params to be human readable.""" 363 governance_config = params["governance_config"] 364 allocation = governance_config["proposal_reward_treasury_allocation"] 365 governance_config = cast(dict[str, Any], governance_config) 366 governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%" 367 params_ = cast(dict[str, Any], params) 368 params_["governance_config"] = governance_config 369 general_params = dict_from_nano( 370 params_, 371 [ 372 "min_weight_stake", 373 "general_subnet_application_cost", 374 "subnet_registration_cost", 375 "proposal_cost", 376 "max_proposal_reward_treasury_allocation", 377 ], 378 ) 379 380 return general_params
Transform network params to be human readable.
387def remove_none_values(data: dict[T, V | None]) -> dict[T, V]: 388 """ 389 Removes key-value pairs from a dictionary where the value is None. 390 Works recursively for nested dictionaries. 391 """ 392 cleaned_data: dict[T, V] = {} 393 for key, value in data.items(): 394 if isinstance(value, dict): 395 cleaned_value = remove_none_values(value) # type: ignore 396 if cleaned_value is not None: # type: ignore 397 cleaned_data[key] = cleaned_value 398 elif value is not None: 399 cleaned_data[key] = value 400 return cleaned_data
Removes key-value pairs from a dictionary where the value is None. Works recursively for nested dictionaries.
403def transform_subnet_params(params: dict[int, SubnetParamsWithEmission]): 404 """Transform subnet params to be human readable.""" 405 params_ = cast(dict[int, Any], params) 406 display_params = remove_none_values(params_) 407 display_params = dict_from_nano( 408 display_params, 409 [ 410 "bonds_ma", 411 "min_burn", 412 "max_burn", 413 "min_weight_stake", 414 "proposal_cost", 415 "max_proposal_reward_treasury_allocation", 416 "min_validator_stake", 417 ], 418 ) 419 return display_params
Transform subnet params to be human readable.