communex.cli._common
from dataclasses import dataclass
from getpass import getpass
from typing import Any, Mapping, cast, TypeVar

import rich
import typer
from rich import box
from rich.console import Console
from rich.markup import escape
from rich.table import Table
from typer import Context

from communex._common import get_node_url
from communex.balance import from_horus, from_nano, dict_from_nano
from communex.client import CommuneClient
from communex.types import (
    ModuleInfoWithOptionalBalance,
    NetworkParams,
    SubnetParamsWithEmission,
)


@dataclass
class ExtraCtxData:
    """User data stored on the typer context (``ctx.obj``)."""

    # Not read in this module; presumably selects JSON output in commands.
    output_json: bool
    # Forwarded to ``get_node_url`` when a client is created.
    use_testnet: bool
    # When true, ``CustomCtx.confirm`` accepts without prompting.
    yes_to_all: bool


class ExtendedContext(Context):
    """A typer ``Context`` whose ``obj`` is typed as ``ExtraCtxData``."""

    obj: ExtraCtxData


@dataclass
class CustomCtx:
    """
    Bundles the typer context with stdout/stderr consoles and a lazily
    created ``CommuneClient``.
    """

    ctx: ExtendedContext
    console: rich.console.Console
    console_err: rich.console.Console
    _com_client: CommuneClient | None = None

    def com_client(self) -> CommuneClient:
        """
        Returns the cached client, connecting on first use.

        Up to five connection attempts are made; after each failure a new
        URL is requested from ``get_node_url``.

        Raises:
            ConnectionError: if every attempt fails. The last underlying
                exception is attached as the cause.
        """
        if self._com_client is not None:
            return self._com_client

        use_testnet = self.ctx.obj.use_testnet
        max_attempts = 5
        node_url = get_node_url(None, use_testnet=use_testnet)
        self.info(f"Using node: {node_url}")

        last_error: Exception | None = None
        for attempt in range(1, max_attempts + 1):
            try:
                self._com_client = CommuneClient(
                    url=node_url, num_connections=1, wait_for_finalization=False
                )
                # Stop as soon as one connection succeeds; without this the
                # loop would keep constructing clients.
                break
            except Exception as err:
                # Broad on purpose: any failure to build the client is
                # treated as "this node is unusable, try another".
                last_error = err
                self.info(f"Failed to connect to node: {node_url}")
                if attempt < max_attempts:
                    node_url = get_node_url(None, use_testnet=use_testnet)
                    self.info(f"Will retry with node {node_url}")

        if self._com_client is None:
            raise ConnectionError("Could not connect to any node") from last_error

        return self._com_client

    def get_use_testnet(self) -> bool:
        """Returns the ``use_testnet`` flag stored on the context."""
        return self.ctx.obj.use_testnet

    def output(
        self,
        message: str,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Prints ``message`` to the stdout console."""
        self.console.print(message, *args, **kwargs)

    def info(
        self,
        message: str,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Prints ``message`` to the stderr console."""
        self.console_err.print(message, *args, **kwargs)

    def error(self, message: str) -> None:
        """Prints ``message`` prefixed with ``ERROR: `` in bold red to stderr."""
        message = f"ERROR: {message}"
        self.console_err.print(message, style="bold red")

    def progress_status(self, message: str):
        """Returns a status (spinner) object bound to the stderr console."""
        return self.console_err.status(message)

    def confirm(self, message: str) -> bool:
        """
        Asks the user to confirm ``message``.

        When ``yes_to_all`` is set on the context the question is echoed
        and accepted without prompting.
        """
        if self.ctx.obj.yes_to_all:
            print(f"{message} (--yes)")
            return True
        return typer.confirm(message)


def make_custom_context(ctx: typer.Context) -> CustomCtx:
    """Wraps a typer context in a ``CustomCtx`` with fresh consoles."""
    return CustomCtx(
        ctx=cast(ExtendedContext, ctx),
        console=Console(),
        console_err=Console(stderr=True),
    )


# Formatting


def eprint(e: Any) -> None:
    """
    Pretty prints an error.

    The error text is escaped so that square brackets inside it are shown
    literally instead of being interpreted as Rich markup tags.
    """

    console = Console()

    console.print(f"[bold red]ERROR: {escape(str(e))}", style="italic")


def print_table_from_plain_dict(
    result: Mapping[str, str | int | float | dict[Any, Any]], column_names: list[str], console: Console
) -> None:
    """
    Creates a table for a plain dictionary.

    Non-dict values are rendered as ``key | str(value)`` rows. Dict values
    are rendered afterwards as a nested one-column table of
    ``subkey: subvalue`` lines.
    """

    table = Table(show_header=True, header_style="bold magenta")

    for name in column_names:
        table.add_column(name, style="white", vertical="middle")

    # Add non-dictionary values to the table first
    for key, value in result.items():
        if not isinstance(value, dict):
            table.add_row(key, str(value))
    # Add subtables for nested dictionaries.
    # Important to add after so that the display of the table is nicer.
    for key, value in result.items():
        if isinstance(value, dict):
            subtable = Table(show_header=False, padding=(0, 0, 0, 0), border_style="bright_black")
            for subkey, subvalue in value.items():
                subtable.add_row(f"{subkey}: {subvalue}")
            table.add_row(key, subtable)

    console.print(table)


def print_table_standardize(result: dict[str, list[Any]], console: Console) -> None:
    """
    Creates a table for a standardized dictionary.

    Each key becomes a column header and each list holds that column's
    cells. Rows are built with ``zip``, so they stop at the shortest list.
    """
    table = Table(show_header=True, header_style="bold magenta")

    for key in result.keys():
        table.add_column(key, style="white")
    rows = [*result.values()]
    zipped_rows = [list(column) for column in zip(*rows)]
    for row in zipped_rows:
        table.add_row(*row, style="white")

    console.print(table)


def transform_module_into(
    to_exclude: list[str], last_block: int,
    immunity_period: int, modules: list[ModuleInfoWithOptionalBalance],
    tempo: int
):
    """
    Builds display-ready copies of ``modules``.

    For every module a shallow copy is made, then:
    - ``in_immunity`` is set to ``regblock + immunity_period > last_block``;
    - every key in ``to_exclude`` is deleted (``KeyError`` if one is missing);
    - ``stake`` is converted with ``from_nano`` and rounded to 2 places;
    - ``emission`` is converted with ``from_horus(..., tempo)`` and rounded
      to 4 places;
    - ``balance`` is converted with ``from_nano`` when it is not None,
      otherwise the key is dropped.

    Returns:
        A new list of plain dicts; the input modules are not modified.
    """
    mods = cast(list[dict[str, Any]], modules)
    transformed_modules: list[dict[str, Any]] = []
    for mod in mods:
        module = mod.copy()
        module_regblock = module["regblock"]
        module["in_immunity"] = module_regblock + immunity_period > last_block

        for key in to_exclude:
            del module[key]
        module["stake"] = round(from_nano(module["stake"]), 2)  # type: ignore
        module["emission"] = round(
            from_horus(module["emission"], tempo),
            4,
        )  # type: ignore
        if module.get("balance") is not None:
            module["balance"] = from_nano(module["balance"])  # type: ignore
        else:
            # user should not see None values; pop() also tolerates the key
            # being absent altogether, where `del` would raise KeyError.
            module.pop("balance", None)
        transformed_modules.append(module)

    return transformed_modules


def print_module_info(
    client: CommuneClient,
    modules: list[ModuleInfoWithOptionalBalance],
    console: Console,
    netuid: int,
    title: str | None = None,
) -> None:
    """
    Prints a table with one row per module, followed by three blank lines.

    Does nothing when ``modules`` is empty. The caption shows the sum of
    all stakes and all balances.

    Raises:
        ValueError: if the client returns no block.
    """
    if not modules:
        return

    # Get the current block number, we will need this to calculate immunity period
    block = client.get_block()
    if block:
        last_block = block["header"]["number"]
    else:
        raise ValueError("Could not get block info")

    # Get the immunity period and tempo on the netuid
    immunity_period = client.get_immunity_period(netuid)
    tempo = client.get_tempo(netuid)

    table = Table(
        show_header=True, header_style="bold magenta",
        box=box.DOUBLE_EDGE, title=title,
        caption_style="chartreuse3",
        title_style="bold magenta",
    )

    to_exclude = ["stake_from", "last_update", "regblock"]
    transformed_modules = transform_module_into(
        to_exclude, last_block, immunity_period, modules, tempo
    )

    # Modules without a balance have no "balance" key, so take the union of
    # keys (first-seen order) and fill rows by column name to keep every
    # cell under its own header.
    columns = list(dict.fromkeys(
        key for mod in transformed_modules for key in mod
    ))
    for key in columns:
        table.add_column(key, style="white")

    total_stake = 0
    total_balance = 0

    for mod in transformed_modules:
        total_stake += mod["stake"]
        if mod.get("balance") is not None:
            total_balance += mod["balance"]

        table.add_row(*(str(mod[key]) if key in mod else "" for key in columns))

    table.caption = f"total balance: {total_balance + total_stake}J"
    console.print(table)
    for _ in range(3):
        console.print()


def get_universal_password(ctx: CustomCtx) -> str:
    """Prompts (via ``getpass``) for the password used for all keys."""
    ctx.info("Please provide the universal password for all keys")
    universal_password = getpass()
    return universal_password


def tranform_network_params(params: NetworkParams):
    """
    Transform network params to be human readable.

    ``proposal_reward_treasury_allocation`` inside ``governance_config``
    gets a ``%`` suffix and the listed amount fields are passed through
    ``dict_from_nano``. The top-level dict and ``governance_config`` are
    copied first so the caller's ``params`` is not mutated by this function.
    """
    governance_config = dict(cast(dict[str, Any], params["governance_config"]))
    allocation = governance_config["proposal_reward_treasury_allocation"]
    governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%"
    params_ = dict(cast(dict[str, Any], params))
    params_["governance_config"] = governance_config
    general_params = dict_from_nano(params_, [
        "min_weight_stake",
        "general_subnet_application_cost",
        "subnet_registration_cost",
        "proposal_cost",
        "max_proposal_reward_treasury_allocation",
    ])

    return general_params


T = TypeVar("T")
V = TypeVar("V")


def remove_none_values(data: dict[T, V | None]) -> dict[T, V]:
    """
    Removes key-value pairs from a dictionary where the value is None.
    Works recursively for nested dictionaries.

    Nested dictionaries are always kept, even when they end up empty.
    The input is not modified; a new dictionary is returned.
    """
    cleaned_data: dict[T, V] = {}
    for key, value in data.items():
        if isinstance(value, dict):
            # The recursive call always returns a dict, so no None check.
            cleaned_data[key] = remove_none_values(value)  # type: ignore
        elif value is not None:
            cleaned_data[key] = value
    return cleaned_data


def transform_subnet_params(params: SubnetParamsWithEmission):
    """
    Transform subnet params to be human readable.

    None values are dropped (recursively) and the listed amount fields are
    passed through ``dict_from_nano``.
    """
    params_ = cast(dict[str, Any], params)
    display_params = remove_none_values(params_)
    display_params = dict_from_nano(
        display_params, [
            "bonds_ma",
            "min_burn",
            "max_burn",
            "min_weight_stake",
            "proposal_cost",
            "max_proposal_reward_treasury_allocation",
        ]
    )
    return display_params
The context is a special internal object that holds state relevant for the script execution at every single level. It's normally invisible to commands unless they opt-in to getting access to it.
The context is useful as it can pass internal objects around and can control special execution features such as reading data from environment variables.
A context can be used as a context manager, in which case it will call `close()` on teardown.
Parameters
- command: the command class for this context.
- parent: the parent context.
- info_name: the info name for this invocation. Generally this is the most descriptive name for the script or command. For the top-level script it is usually the name of the script; for commands below it, it is the name of the command.
- obj: an arbitrary object of user data.
- auto_envvar_prefix: the prefix to use for automatic environment variables. If this is `None` then reading from environment variables is disabled. This does not affect manually set environment variables, which are always read.
- default_map: a dictionary (like object) with default values for parameters.
- terminal_width: the width of the terminal. The default is to inherit from the parent context. If no context defines the terminal width then auto-detection will be applied.
- max_content_width: the maximum width for content rendered by Click (this currently only affects help pages). This defaults to 80 characters if not overridden. In other words: even if the terminal is larger than that, Click will not format things wider than 80 characters by default. In addition to that, formatters might add some safety margin on the right.
- resilient_parsing: if this flag is enabled then Click will parse without any interactivity or callback invocation. Default values will also be ignored. This is useful for implementing things such as completion support.
- allow_extra_args: if this is set to `True` then extra arguments at the end will not raise an error and will be kept on the context. The default is to inherit from the command.
- allow_interspersed_args: if this is set to `False` then options and arguments cannot be mixed. The default is to inherit from the command.
- ignore_unknown_options: instructs click to ignore options it does not know and keeps them for later processing.
- help_option_names: optionally a list of strings that define how the default help parameter is named. The default is `['--help']`.
- token_normalize_func: an optional function that is used to normalize tokens (options, choices, etc.). This for instance can be used to implement case insensitive behavior.
- color: controls if the terminal supports ANSI colors or not. The default is autodetection. This is only needed if ANSI codes are used in texts that Click prints which is by default not the case. This for instance would affect help output.
- show_default: Show the default value for commands. If this value is not set, it defaults to the value from the parent context. `Command.show_default` overrides this default for the specific command.
Changed in version 8.1: The `show_default` parameter is overridden by `Command.show_default`, instead of the other way around.
Changed in version 8.0: The `show_default` parameter defaults to the value from the parent context.
Changed in version 7.1: Added the `show_default` parameter.
Changed in version 4.0: Added the `color`, `ignore_unknown_options`, and `max_content_width` parameters.
Changed in version 3.0: Added the `allow_extra_args` and `allow_interspersed_args` parameters.
Changed in version 2.0: Added the `resilient_parsing`, `help_option_names`, and `token_normalize_func` parameters.
Inherited Members
- click.core.Context
- Context
- formatter_class
- parent
- command
- info_name
- params
- args
- protected_args
- default_map
- invoked_subcommand
- terminal_width
- max_content_width
- allow_extra_args
- allow_interspersed_args
- ignore_unknown_options
- help_option_names
- token_normalize_func
- resilient_parsing
- auto_envvar_prefix
- color
- show_default
- to_info_dict
- scope
- meta
- make_formatter
- with_resource
- call_on_close
- close
- command_path
- find_root
- find_object
- ensure_object
- lookup_default
- fail
- abort
- exit
- get_usage
- get_help
- invoke
- forward
- set_parameter_source
- get_parameter_source
@dataclass
class CustomCtx:
    """
    Bundles the typer context with stdout/stderr consoles and a lazily
    created ``CommuneClient``.
    """

    ctx: ExtendedContext
    console: rich.console.Console
    console_err: rich.console.Console
    _com_client: CommuneClient | None = None

    def com_client(self) -> CommuneClient:
        """
        Returns the cached client, connecting on first use.

        Up to five connection attempts are made; after each failure a new
        URL is requested from ``get_node_url``.

        Raises:
            ConnectionError: if every attempt fails. The last underlying
                exception is attached as the cause.
        """
        if self._com_client is not None:
            return self._com_client

        use_testnet = self.ctx.obj.use_testnet
        max_attempts = 5
        node_url = get_node_url(None, use_testnet=use_testnet)
        self.info(f"Using node: {node_url}")

        last_error: Exception | None = None
        for attempt in range(1, max_attempts + 1):
            try:
                self._com_client = CommuneClient(
                    url=node_url, num_connections=1, wait_for_finalization=False
                )
                # Stop as soon as one connection succeeds; without this the
                # loop would keep constructing clients.
                break
            except Exception as err:
                # Broad on purpose: any failure to build the client is
                # treated as "this node is unusable, try another".
                last_error = err
                self.info(f"Failed to connect to node: {node_url}")
                if attempt < max_attempts:
                    node_url = get_node_url(None, use_testnet=use_testnet)
                    self.info(f"Will retry with node {node_url}")

        if self._com_client is None:
            raise ConnectionError("Could not connect to any node") from last_error

        return self._com_client

    def get_use_testnet(self) -> bool:
        """Returns the ``use_testnet`` flag stored on the context."""
        return self.ctx.obj.use_testnet

    def output(
        self,
        message: str,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Prints ``message`` to the stdout console."""
        self.console.print(message, *args, **kwargs)

    def info(
        self,
        message: str,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Prints ``message`` to the stderr console."""
        self.console_err.print(message, *args, **kwargs)

    def error(self, message: str) -> None:
        """Prints ``message`` prefixed with ``ERROR: `` in bold red to stderr."""
        message = f"ERROR: {message}"
        self.console_err.print(message, style="bold red")

    def progress_status(self, message: str):
        """Returns a status (spinner) object bound to the stderr console."""
        return self.console_err.status(message)

    def confirm(self, message: str) -> bool:
        """
        Asks the user to confirm ``message``.

        When ``yes_to_all`` is set on the context the question is echoed
        and accepted without prompting.
        """
        if self.ctx.obj.yes_to_all:
            print(f"{message} (--yes)")
            return True
        return typer.confirm(message)
def com_client(self) -> CommuneClient:
    """
    Returns the cached client, connecting on first use.

    Up to five connection attempts are made; after each failure a new
    URL is requested from ``get_node_url``.

    Raises:
        ConnectionError: if every attempt fails. The last underlying
            exception is attached as the cause.
    """
    if self._com_client is not None:
        return self._com_client

    use_testnet = self.ctx.obj.use_testnet
    max_attempts = 5
    node_url = get_node_url(None, use_testnet=use_testnet)
    self.info(f"Using node: {node_url}")

    last_error: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            self._com_client = CommuneClient(
                url=node_url, num_connections=1, wait_for_finalization=False
            )
            # Stop as soon as one connection succeeds; without this the
            # loop would keep constructing clients.
            break
        except Exception as err:
            # Broad on purpose: any failure to build the client is
            # treated as "this node is unusable, try another".
            last_error = err
            self.info(f"Failed to connect to node: {node_url}")
            if attempt < max_attempts:
                node_url = get_node_url(None, use_testnet=use_testnet)
                self.info(f"Will retry with node {node_url}")

    if self._com_client is None:
        raise ConnectionError("Could not connect to any node") from last_error

    return self._com_client
def eprint(e: Any) -> None:
    """
    Pretty prints an error.

    The error text is escaped so that square brackets inside it are shown
    literally instead of being interpreted as Rich markup tags.
    """
    # Local import keeps this block self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    console = Console()

    console.print(f"[bold red]ERROR: {escape(str(e))}", style="italic")
Pretty prints an error.
def print_table_from_plain_dict(
    result: Mapping[str, str | int | float | dict[Any, Any]], column_names: list[str], console: Console
) -> None:
    """
    Renders a flat mapping as a table and prints it on ``console``.

    Scalar entries come first as ``key | str(value)`` rows. Entries whose
    value is a dict follow, each shown as a nested headerless table of
    ``subkey: subvalue`` lines.
    """
    outer = Table(show_header=True, header_style="bold magenta")
    for heading in column_names:
        outer.add_column(heading, style="white", vertical="middle")

    # Split once so the scalar rows can be emitted before the nested ones,
    # which makes the rendered table easier to read.
    scalar_items = [
        (label, item) for label, item in result.items()
        if not isinstance(item, dict)
    ]
    nested_items = [
        (label, item) for label, item in result.items()
        if isinstance(item, dict)
    ]

    for label, item in scalar_items:
        outer.add_row(label, str(item))

    for label, mapping in nested_items:
        inner = Table(
            show_header=False,
            padding=(0, 0, 0, 0),
            border_style="bright_black",
        )
        for subkey, subvalue in mapping.items():
            inner.add_row(f"{subkey}: {subvalue}")
        outer.add_row(label, inner)

    console.print(outer)
Creates a table for a plain dictionary.
def print_table_standardize(result: dict[str, list[Any]], console: Console) -> None:
    """
    Renders a column-oriented dictionary as a table and prints it.

    Every key is a column header and its list holds that column's cells.
    Rows are formed with ``zip``, so output stops at the shortest list.
    """
    grid = Table(show_header=True, header_style="bold magenta")

    for heading in result:
        grid.add_column(heading, style="white")

    # Transpose the columns into rows.
    for cells in zip(*result.values()):
        grid.add_row(*cells, style="white")

    console.print(grid)
Creates a table for a standardized dictionary.
def transform_module_into(
    to_exclude: list[str], last_block: int,
    immunity_period: int, modules: list[ModuleInfoWithOptionalBalance],
    tempo: int
):
    """
    Builds display-ready copies of ``modules``.

    For every module a shallow copy is made, then:
    - ``in_immunity`` is set to ``regblock + immunity_period > last_block``;
    - every key in ``to_exclude`` is deleted (``KeyError`` if one is missing);
    - ``stake`` is converted with ``from_nano`` and rounded to 2 places;
    - ``emission`` is converted with ``from_horus(..., tempo)`` and rounded
      to 4 places;
    - ``balance`` is converted with ``from_nano`` when it is not None,
      otherwise the key is dropped.

    Returns:
        A new list of plain dicts; the input modules are not modified.
    """
    mods = cast(list[dict[str, Any]], modules)
    transformed_modules: list[dict[str, Any]] = []
    for mod in mods:
        module = mod.copy()
        module_regblock = module["regblock"]
        module["in_immunity"] = module_regblock + immunity_period > last_block

        for key in to_exclude:
            del module[key]
        module["stake"] = round(from_nano(module["stake"]), 2)  # type: ignore
        module["emission"] = round(
            from_horus(module["emission"], tempo),
            4,
        )  # type: ignore
        if module.get("balance") is not None:
            module["balance"] = from_nano(module["balance"])  # type: ignore
        else:
            # user should not see None values; pop() also tolerates the key
            # being absent altogether, where `del` would raise KeyError.
            module.pop("balance", None)
        transformed_modules.append(module)

    return transformed_modules
def print_module_info(
    client: CommuneClient,
    modules: list[ModuleInfoWithOptionalBalance],
    console: Console,
    netuid: int,
    title: str | None = None,
) -> None:
    """
    Prints a table with one row per module, followed by three blank lines.

    Does nothing when ``modules`` is empty. The caption shows the sum of
    all stakes and all balances.

    Raises:
        ValueError: if the client returns no block.
    """
    if not modules:
        return

    # Get the current block number, we will need this to calculate immunity period
    block = client.get_block()
    if block:
        last_block = block["header"]["number"]
    else:
        raise ValueError("Could not get block info")

    # Get the immunity period and tempo on the netuid
    immunity_period = client.get_immunity_period(netuid)
    tempo = client.get_tempo(netuid)

    table = Table(
        show_header=True, header_style="bold magenta",
        box=box.DOUBLE_EDGE, title=title,
        caption_style="chartreuse3",
        title_style="bold magenta",
    )

    to_exclude = ["stake_from", "last_update", "regblock"]
    transformed_modules = transform_module_into(
        to_exclude, last_block, immunity_period, modules, tempo
    )

    # Modules without a balance have no "balance" key, so take the union of
    # keys (first-seen order) and fill rows by column name to keep every
    # cell under its own header.
    columns = list(dict.fromkeys(
        key for mod in transformed_modules for key in mod
    ))
    for key in columns:
        table.add_column(key, style="white")

    total_stake = 0
    total_balance = 0

    for mod in transformed_modules:
        total_stake += mod["stake"]
        if mod.get("balance") is not None:
            total_balance += mod["balance"]

        table.add_row(*(str(mod[key]) if key in mod else "" for key in columns))

    table.caption = f"total balance: {total_balance + total_stake}J"
    console.print(table)
    for _ in range(3):
        console.print()
Prints information about a module.
def tranform_network_params(params: NetworkParams):
    """
    Transform network params to be human readable.

    ``proposal_reward_treasury_allocation`` inside ``governance_config``
    gets a ``%`` suffix and the listed amount fields are passed through
    ``dict_from_nano``. The top-level dict and ``governance_config`` are
    copied first so the caller's ``params`` is not mutated by this function
    (otherwise a second call would produce ``"N%%"``).
    """
    governance_config = dict(cast(dict[str, Any], params["governance_config"]))
    allocation = governance_config["proposal_reward_treasury_allocation"]
    governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%"
    params_ = dict(cast(dict[str, Any], params))
    params_["governance_config"] = governance_config
    general_params = dict_from_nano(params_, [
        "min_weight_stake",
        "general_subnet_application_cost",
        "subnet_registration_cost",
        "proposal_cost",
        "max_proposal_reward_treasury_allocation",
    ])

    return general_params
Transform network params to be human readable.
280def remove_none_values(data: dict[T, V | None]) -> dict[T, V]: 281 """ 282 Removes key-value pairs from a dictionary where the value is None. 283 Works recursively for nested dictionaries. 284 """ 285 cleaned_data: dict[T, V] = {} 286 for key, value in data.items(): 287 if isinstance(value, dict): 288 cleaned_value = remove_none_values(value) # type: ignore 289 if cleaned_value is not None: # type: ignore 290 cleaned_data[key] = cleaned_value 291 elif value is not None: 292 cleaned_data[key] = value 293 return cleaned_data
Removes key-value pairs from a dictionary where the value is None. Works recursively for nested dictionaries.
def transform_subnet_params(params: SubnetParamsWithEmission):
    """
    Produces a human-readable version of subnet params.

    None values are dropped (recursively) and the listed amount fields are
    passed through ``dict_from_nano``.
    """
    nano_fields = [
        "bonds_ma",
        "min_burn",
        "max_burn",
        "min_weight_stake",
        "proposal_cost",
        "max_proposal_reward_treasury_allocation",
    ]
    without_nones = remove_none_values(cast(dict[str, Any], params))
    return dict_from_nano(without_nones, nano_fields)
Transform subnet params to be human readable.