Edit on GitHub

communex.cli._common

  1from dataclasses import dataclass
  2from getpass import getpass
  3from typing import Any, Callable, Mapping, TypeVar, cast
  4
  5import rich
  6import rich.prompt
  7import typer
  8from rich import box
  9from rich.console import Console
 10from rich.table import Table
 11from substrateinterface import Keypair
 12from typer import Context
 13
 14from communex._common import ComxSettings, get_node_url
 15from communex.balance import dict_from_nano, from_horus, from_nano
 16from communex.client import CommuneClient
 17from communex.compat.key import resolve_key_ss58_encrypted, try_classic_load_key
 18from communex.errors import InvalidPasswordError, PasswordNotProvidedError
 19from communex.types import (
 20    ModuleInfoWithOptionalBalance,
 21    NetworkParams,
 22    Ss58Address,
 23    SubnetParamsWithEmission,
 24)
 25
 26
 27@dataclass
 28class ExtraCtxData:
 29    output_json: bool
 30    use_testnet: bool
 31    yes_to_all: bool
 32
 33
 34class ExtendedContext(Context):
 35    obj: ExtraCtxData
 36
 37
 38class CliPasswordProvider:
 39    def __init__(
 40        self, settings: ComxSettings, prompt_secret: Callable[[str], str]
 41    ):
 42        self.settings = settings
 43        self.prompt_secret = prompt_secret
 44
 45    def get_password(self, key_name: str) -> str | None:
 46        key_map = self.settings.KEY_PASSWORDS
 47        if key_map is not None:
 48            password = key_map.get(key_name)
 49            if password is not None:
 50                return password.get_secret_value()
 51        # fallback to universal password
 52        password = self.settings.UNIVERSAL_PASSWORD
 53        if password is not None:
 54            return password.get_secret_value()
 55        else:
 56            return None
 57
 58    def ask_password(self, key_name: str) -> str:
 59        password = self.prompt_secret(
 60            f"Please provide the password for the key '{key_name}'"
 61        )
 62        return password
 63
 64
 65class CustomCtx:
 66    ctx: ExtendedContext
 67    settings: ComxSettings
 68    console: rich.console.Console
 69    console_err: rich.console.Console
 70    password_manager: CliPasswordProvider
 71    _com_client: CommuneClient | None = None
 72
 73    def __init__(
 74        self,
 75        ctx: ExtendedContext,
 76        settings: ComxSettings,
 77        console: rich.console.Console,
 78        console_err: rich.console.Console,
 79        com_client: CommuneClient | None = None,
 80    ):
 81        self.ctx = ctx
 82        self.settings = settings
 83        self.console = console
 84        self.console_err = console_err
 85        self._com_client = com_client
 86        self.password_manager = CliPasswordProvider(
 87            self.settings, self.prompt_secret
 88        )
 89
 90    def get_use_testnet(self) -> bool:
 91        return self.ctx.obj.use_testnet
 92
 93    def get_node_url(self) -> str:
 94        use_testnet = self.get_use_testnet()
 95        return get_node_url(self.settings, use_testnet=use_testnet)
 96
 97    def com_client(self) -> CommuneClient:
 98        if self._com_client is None:
 99            node_url = self.get_node_url()
100            self.info(f"Using node: {node_url}")
101            for _ in range(5):
102                try:
103                    self._com_client = CommuneClient(
104                        url=node_url,
105                        num_connections=1,
106                        wait_for_finalization=False,
107                        timeout=65,
108                    )
109                except Exception:
110                    self.info(f"Failed to connect to node: {node_url}")
111                    node_url = self.get_node_url()
112                    self.info(f"Will retry with node {node_url}")
113                    continue
114                else:
115                    break
116            if self._com_client is None:
117                raise ConnectionError("Could not connect to any node")
118
119        return self._com_client
120
121    def output(
122        self,
123        message: str,
124        *args: tuple[Any, ...],
125        **kwargs: dict[str, Any],
126    ) -> None:
127        self.console.print(message, *args, **kwargs)  # type: ignore
128
129    def info(
130        self,
131        message: str,
132        *args: tuple[Any, ...],
133        **kwargs: dict[str, Any],
134    ) -> None:
135        self.console_err.print(message, *args, **kwargs)  # type: ignore
136
137    def error(
138        self,
139        message: str,
140        *args: tuple[Any, ...],
141        **kwargs: dict[str, Any],
142    ) -> None:
143        message = f"ERROR: {message}"
144        self.console_err.print(message, *args, style="bold red", **kwargs)  # type: ignore
145
146    def progress_status(self, message: str):
147        return self.console_err.status(message)
148
149    def confirm(self, message: str) -> bool:
150        if self.ctx.obj.yes_to_all:
151            print(f"{message} (--yes)")
152            return True
153        return typer.confirm(message, err=True)
154
155    def prompt_secret(self, message: str) -> str:
156        return rich.prompt.Prompt.ask(
157            message, password=True, console=self.console_err
158        )
159
160    def load_key(self, key: str, password: str | None = None) -> Keypair:
161        try:
162            keypair = try_classic_load_key(
163                key, password, password_provider=self.password_manager
164            )
165            return keypair
166        except PasswordNotProvidedError:
167            self.error(f"Password not provided for key '{key}'")
168            raise typer.Exit(code=1)
169        except InvalidPasswordError:
170            self.error(f"Incorrect password for key '{key}'")
171            raise typer.Exit(code=1)
172
173    def resolve_key_ss58(
174        self, key: Ss58Address | Keypair | str, password: str | None = None
175    ) -> Ss58Address:
176        try:
177            address = resolve_key_ss58_encrypted(
178                key, password, password_provider=self.password_manager
179            )
180            return address
181        except PasswordNotProvidedError:
182            self.error(f"Password not provided for key '{key}'")
183            raise typer.Exit(code=1)
184        except InvalidPasswordError:
185            self.error(f"Incorrect password for key '{key}'")
186            raise typer.Exit(code=1)
187
188
189def make_custom_context(ctx: typer.Context) -> CustomCtx:
190    return CustomCtx(
191        ctx=cast(ExtendedContext, ctx),  # TODO: better check
192        settings=ComxSettings(),
193        console=Console(),
194        console_err=Console(stderr=True),
195    )
196
197
198# Formatting
199
200
201def eprint(e: Any) -> None:
202    """
203    Pretty prints an error.
204    """
205
206    console = Console()
207
208    console.print(f"[bold red]ERROR: {e}", style="italic")
209
210
211def print_table_from_plain_dict(
212    result: Mapping[str, str | int | float | dict[Any, Any] | Ss58Address],
213    column_names: list[str],
214    console: Console,
215) -> None:
216    """
217    Creates a table for a plain dictionary.
218    """
219
220    table = Table(show_header=True, header_style="bold magenta")
221
222    for name in column_names:
223        table.add_column(name, style="white", vertical="middle")
224
225    # Add non-dictionary values to the table first
226    for key, value in result.items():
227        if not isinstance(value, dict):
228            table.add_row(key, str(value))
229    # Add subtables for nested dictionaries.
230    # Important to add after so that the display of the table is nicer.
231    for key, value in result.items():
232        if isinstance(value, dict):
233            subtable = Table(
234                show_header=False,
235                padding=(0, 0, 0, 0),
236                border_style="bright_black",
237            )
238            for subkey, subvalue in value.items():
239                subtable.add_row(f"{subkey}: {subvalue}")
240            table.add_row(key, subtable)
241
242    console.print(table)
243
244
245def print_table_standardize(
246    result: dict[str, list[Any]], console: Console
247) -> None:
248    """
249    Creates a table for a standardized dictionary.
250    """
251    table = Table(show_header=True, header_style="bold magenta")
252
253    for key in result.keys():
254        table.add_column(key, style="white")
255    rows = [*result.values()]
256    zipped_rows = [list(column) for column in zip(*rows)]
257    for row in zipped_rows:
258        table.add_row(*row, style="white")
259
260    console.print(table)
261
262
263def transform_module_into(
264    to_exclude: list[str],
265    last_block: int,
266    immunity_period: int,
267    modules: list[ModuleInfoWithOptionalBalance],
268    tempo: int,
269):
270    mods = cast(list[dict[str, Any]], modules)
271    transformed_modules: list[dict[str, Any]] = []
272    for mod in mods:
273        module = mod.copy()
274        module_regblock = module["regblock"]
275        module["in_immunity"] = module_regblock + immunity_period > last_block
276
277        for key in to_exclude:
278            del module[key]
279        module["stake"] = round(from_nano(module["stake"]), 2)  # type: ignore
280        module["emission"] = round(from_horus(module["emission"], tempo), 4)  # type: ignore
281        if module.get("balance") is not None:
282            module["balance"] = from_nano(module["balance"])  # type: ignore
283        else:
284            # user should not see None values
285            del module["balance"]
286        transformed_modules.append(module)
287
288    return transformed_modules
289
290
291def print_module_info(
292    client: CommuneClient,
293    modules: list[ModuleInfoWithOptionalBalance],
294    console: Console,
295    netuid: int,
296    title: str | None = None,
297) -> None:
298    """
299    Prints information about a module.
300    """
301    if not modules:
302        return
303
304    # Get the current block number, we will need this to caluclate immunity period
305    block = client.get_block()
306    if block:
307        last_block = block["header"]["number"]
308    else:
309        raise ValueError("Could not get block info")
310
311    # Get the immunity period on the netuid
312    immunity_period = client.get_immunity_period(netuid)
313    tempo = client.get_tempo(netuid)
314
315    # Transform the module dictionary to have immunity_period
316    table = Table(
317        show_header=True,
318        header_style="bold magenta",
319        box=box.DOUBLE_EDGE,
320        title=title,
321        caption_style="chartreuse3",
322        title_style="bold magenta",
323    )
324
325    to_exclude = ["stake_from", "last_update", "regblock"]
326    tranformed_modules = transform_module_into(
327        to_exclude, last_block, immunity_period, modules, tempo
328    )
329
330    sample_mod = tranformed_modules[0]
331    for key in sample_mod.keys():
332        # add columns
333        table.add_column(key, style="white")
334
335    total_stake = 0
336    total_balance = 0
337
338    for mod in tranformed_modules:
339        total_stake += mod["stake"]
340        if mod.get("balance") is not None:
341            total_balance += mod["balance"]
342
343        row: list[str] = []
344        for val in mod.values():
345            row.append(str(val))
346        table.add_row(*row)
347
348    table.caption = "total balance: " + f"{total_balance + total_stake}J"
349    console.print(table)
350    for _ in range(3):
351        console.print()
352
353
354def get_universal_password(ctx: CustomCtx) -> str:
355    ctx.info("Please provide the universal password for all keys")
356    universal_password = getpass()
357    return universal_password
358
359
360def tranform_network_params(params: NetworkParams):
361    """Transform network params to be human readable."""
362    governance_config = params["governance_config"]
363    allocation = governance_config["proposal_reward_treasury_allocation"]
364    governance_config = cast(dict[str, Any], governance_config)
365    governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%"
366    params_ = cast(dict[str, Any], params)
367    params_["governance_config"] = governance_config
368    general_params = dict_from_nano(
369        params_,
370        [
371            "min_weight_stake",
372            "general_subnet_application_cost",
373            "subnet_registration_cost",
374            "proposal_cost",
375            "max_proposal_reward_treasury_allocation",
376        ],
377    )
378
379    return general_params
380
381
382T = TypeVar("T")
383V = TypeVar("V")
384
385
386def remove_none_values(data: dict[T, V | None]) -> dict[T, V]:
387    """
388    Removes key-value pairs from a dictionary where the value is None.
389    Works recursively for nested dictionaries.
390    """
391    cleaned_data: dict[T, V] = {}
392    for key, value in data.items():
393        if isinstance(value, dict):
394            cleaned_value = remove_none_values(value)  # type: ignore
395            if cleaned_value is not None:  # type: ignore
396                cleaned_data[key] = cleaned_value
397        elif value is not None:
398            cleaned_data[key] = value
399    return cleaned_data
400
401
402def transform_subnet_params(params: dict[int, SubnetParamsWithEmission]):
403    """Transform subnet params to be human readable."""
404    params_ = cast(dict[int, Any], params)
405    display_params = remove_none_values(params_)
406    display_params = dict_from_nano(
407        display_params,
408        [
409            "bonds_ma",
410            "min_burn",
411            "max_burn",
412            "min_weight_stake",
413            "proposal_cost",
414            "max_proposal_reward_treasury_allocation",
415            "min_validator_stake",
416        ],
417    )
418    return display_params
@dataclass
class ExtraCtxData:
28@dataclass
29class ExtraCtxData:
30    output_json: bool
31    use_testnet: bool
32    yes_to_all: bool
ExtraCtxData(output_json: bool, use_testnet: bool, yes_to_all: bool)
output_json: bool
use_testnet: bool
yes_to_all: bool
class ExtendedContext(typer.models.Context):
35class ExtendedContext(Context):
36    obj: ExtraCtxData

The context is a special internal object that holds state relevant for the script execution at every single level. It's normally invisible to commands unless they opt-in to getting access to it.

The context is useful as it can pass internal objects around and can control special execution features such as reading data from environment variables.

A context can be used as context manager in which case it will call close() on teardown.

Parameters
  • command: the command class for this context.
  • parent: the parent context.
  • info_name: the info name for this invocation. Generally this is the most descriptive name for the script or command. For the toplevel script it is usually the name of the script, for commands below it it's the name of the script.
  • obj: an arbitrary object of user data.
  • auto_envvar_prefix: the prefix to use for automatic environment variables. If this is None then reading from environment variables is disabled. This does not affect manually set environment variables which are always read.
  • default_map: a dictionary (like object) with default values for parameters.
  • terminal_width: the width of the terminal. The default is inherit from parent context. If no context defines the terminal width then auto detection will be applied.
  • max_content_width: the maximum width for content rendered by Click (this currently only affects help pages). This defaults to 80 characters if not overridden. In other words: even if the terminal is larger than that, Click will not format things wider than 80 characters by default. In addition to that, formatters might add some safety mapping on the right.
  • resilient_parsing: if this flag is enabled then Click will parse without any interactivity or callback invocation. Default values will also be ignored. This is useful for implementing things such as completion support.
  • allow_extra_args: if this is set to True then extra arguments at the end will not raise an error and will be kept on the context. The default is to inherit from the command.
  • allow_interspersed_args: if this is set to False then options and arguments cannot be mixed. The default is to inherit from the command.
  • ignore_unknown_options: instructs click to ignore options it does not know and keeps them for later processing.
  • help_option_names: optionally a list of strings that define how the default help parameter is named. The default is ['--help'].
  • token_normalize_func: an optional function that is used to normalize tokens (options, choices, etc.). This for instance can be used to implement case insensitive behavior.
  • color: controls if the terminal supports ANSI colors or not. The default is autodetection. This is only needed if ANSI codes are used in texts that Click prints which is by default not the case. This for instance would affect help output.
  • show_default: Show the default value for commands. If this value is not set, it defaults to the value from the parent context. Command.show_default overrides this default for the specific command.

Changed in version 8.1: The show_default parameter is overridden by Command.show_default, instead of the other way around.

Changed in version 8.0: The show_default parameter defaults to the value from the parent context.

Changed in version 7.1: Added the show_default parameter.

Changed in version 4.0: Added the color, ignore_unknown_options, and max_content_width parameters.

Changed in version 3.0: Added the allow_extra_args and allow_interspersed_args parameters.

Changed in version 2.0: Added the resilient_parsing, help_option_names, and token_normalize_func parameters.

Inherited Members
click.core.Context
Context
formatter_class
parent
command
info_name
params
args
protected_args
default_map
invoked_subcommand
terminal_width
max_content_width
allow_extra_args
allow_interspersed_args
ignore_unknown_options
help_option_names
token_normalize_func
resilient_parsing
auto_envvar_prefix
color
show_default
to_info_dict
scope
meta
make_formatter
with_resource
call_on_close
close
command_path
find_root
find_object
ensure_object
lookup_default
fail
abort
exit
get_usage
get_help
invoke
forward
set_parameter_source
get_parameter_source
class CliPasswordProvider:
39class CliPasswordProvider:
40    def __init__(
41        self, settings: ComxSettings, prompt_secret: Callable[[str], str]
42    ):
43        self.settings = settings
44        self.prompt_secret = prompt_secret
45
46    def get_password(self, key_name: str) -> str | None:
47        key_map = self.settings.KEY_PASSWORDS
48        if key_map is not None:
49            password = key_map.get(key_name)
50            if password is not None:
51                return password.get_secret_value()
52        # fallback to universal password
53        password = self.settings.UNIVERSAL_PASSWORD
54        if password is not None:
55            return password.get_secret_value()
56        else:
57            return None
58
59    def ask_password(self, key_name: str) -> str:
60        password = self.prompt_secret(
61            f"Please provide the password for the key '{key_name}'"
62        )
63        return password
CliPasswordProvider( settings: communex._common.ComxSettings, prompt_secret: Callable[[str], str])
40    def __init__(
41        self, settings: ComxSettings, prompt_secret: Callable[[str], str]
42    ):
43        self.settings = settings
44        self.prompt_secret = prompt_secret
settings
prompt_secret
def get_password(self, key_name: str) -> str | None:
46    def get_password(self, key_name: str) -> str | None:
47        key_map = self.settings.KEY_PASSWORDS
48        if key_map is not None:
49            password = key_map.get(key_name)
50            if password is not None:
51                return password.get_secret_value()
52        # fallback to universal password
53        password = self.settings.UNIVERSAL_PASSWORD
54        if password is not None:
55            return password.get_secret_value()
56        else:
57            return None
def ask_password(self, key_name: str) -> str:
59    def ask_password(self, key_name: str) -> str:
60        password = self.prompt_secret(
61            f"Please provide the password for the key '{key_name}'"
62        )
63        return password
class CustomCtx:
 66class CustomCtx:
 67    ctx: ExtendedContext
 68    settings: ComxSettings
 69    console: rich.console.Console
 70    console_err: rich.console.Console
 71    password_manager: CliPasswordProvider
 72    _com_client: CommuneClient | None = None
 73
 74    def __init__(
 75        self,
 76        ctx: ExtendedContext,
 77        settings: ComxSettings,
 78        console: rich.console.Console,
 79        console_err: rich.console.Console,
 80        com_client: CommuneClient | None = None,
 81    ):
 82        self.ctx = ctx
 83        self.settings = settings
 84        self.console = console
 85        self.console_err = console_err
 86        self._com_client = com_client
 87        self.password_manager = CliPasswordProvider(
 88            self.settings, self.prompt_secret
 89        )
 90
 91    def get_use_testnet(self) -> bool:
 92        return self.ctx.obj.use_testnet
 93
 94    def get_node_url(self) -> str:
 95        use_testnet = self.get_use_testnet()
 96        return get_node_url(self.settings, use_testnet=use_testnet)
 97
 98    def com_client(self) -> CommuneClient:
 99        if self._com_client is None:
100            node_url = self.get_node_url()
101            self.info(f"Using node: {node_url}")
102            for _ in range(5):
103                try:
104                    self._com_client = CommuneClient(
105                        url=node_url,
106                        num_connections=1,
107                        wait_for_finalization=False,
108                        timeout=65,
109                    )
110                except Exception:
111                    self.info(f"Failed to connect to node: {node_url}")
112                    node_url = self.get_node_url()
113                    self.info(f"Will retry with node {node_url}")
114                    continue
115                else:
116                    break
117            if self._com_client is None:
118                raise ConnectionError("Could not connect to any node")
119
120        return self._com_client
121
122    def output(
123        self,
124        message: str,
125        *args: tuple[Any, ...],
126        **kwargs: dict[str, Any],
127    ) -> None:
128        self.console.print(message, *args, **kwargs)  # type: ignore
129
130    def info(
131        self,
132        message: str,
133        *args: tuple[Any, ...],
134        **kwargs: dict[str, Any],
135    ) -> None:
136        self.console_err.print(message, *args, **kwargs)  # type: ignore
137
138    def error(
139        self,
140        message: str,
141        *args: tuple[Any, ...],
142        **kwargs: dict[str, Any],
143    ) -> None:
144        message = f"ERROR: {message}"
145        self.console_err.print(message, *args, style="bold red", **kwargs)  # type: ignore
146
147    def progress_status(self, message: str):
148        return self.console_err.status(message)
149
150    def confirm(self, message: str) -> bool:
151        if self.ctx.obj.yes_to_all:
152            print(f"{message} (--yes)")
153            return True
154        return typer.confirm(message, err=True)
155
156    def prompt_secret(self, message: str) -> str:
157        return rich.prompt.Prompt.ask(
158            message, password=True, console=self.console_err
159        )
160
161    def load_key(self, key: str, password: str | None = None) -> Keypair:
162        try:
163            keypair = try_classic_load_key(
164                key, password, password_provider=self.password_manager
165            )
166            return keypair
167        except PasswordNotProvidedError:
168            self.error(f"Password not provided for key '{key}'")
169            raise typer.Exit(code=1)
170        except InvalidPasswordError:
171            self.error(f"Incorrect password for key '{key}'")
172            raise typer.Exit(code=1)
173
174    def resolve_key_ss58(
175        self, key: Ss58Address | Keypair | str, password: str | None = None
176    ) -> Ss58Address:
177        try:
178            address = resolve_key_ss58_encrypted(
179                key, password, password_provider=self.password_manager
180            )
181            return address
182        except PasswordNotProvidedError:
183            self.error(f"Password not provided for key '{key}'")
184            raise typer.Exit(code=1)
185        except InvalidPasswordError:
186            self.error(f"Incorrect password for key '{key}'")
187            raise typer.Exit(code=1)
CustomCtx( ctx: ExtendedContext, settings: communex._common.ComxSettings, console: rich.console.Console, console_err: rich.console.Console, com_client: communex.client.CommuneClient | None = None)
74    def __init__(
75        self,
76        ctx: ExtendedContext,
77        settings: ComxSettings,
78        console: rich.console.Console,
79        console_err: rich.console.Console,
80        com_client: CommuneClient | None = None,
81    ):
82        self.ctx = ctx
83        self.settings = settings
84        self.console = console
85        self.console_err = console_err
86        self._com_client = com_client
87        self.password_manager = CliPasswordProvider(
88            self.settings, self.prompt_secret
89        )
console: rich.console.Console
console_err: rich.console.Console
password_manager: CliPasswordProvider
def get_use_testnet(self) -> bool:
91    def get_use_testnet(self) -> bool:
92        return self.ctx.obj.use_testnet
def get_node_url(self) -> str:
94    def get_node_url(self) -> str:
95        use_testnet = self.get_use_testnet()
96        return get_node_url(self.settings, use_testnet=use_testnet)
def com_client(self) -> communex.client.CommuneClient:
 98    def com_client(self) -> CommuneClient:
 99        if self._com_client is None:
100            node_url = self.get_node_url()
101            self.info(f"Using node: {node_url}")
102            for _ in range(5):
103                try:
104                    self._com_client = CommuneClient(
105                        url=node_url,
106                        num_connections=1,
107                        wait_for_finalization=False,
108                        timeout=65,
109                    )
110                except Exception:
111                    self.info(f"Failed to connect to node: {node_url}")
112                    node_url = self.get_node_url()
113                    self.info(f"Will retry with node {node_url}")
114                    continue
115                else:
116                    break
117            if self._com_client is None:
118                raise ConnectionError("Could not connect to any node")
119
120        return self._com_client
def output( self, message: str, *args: tuple[typing.Any, ...], **kwargs: dict[str, typing.Any]) -> None:
122    def output(
123        self,
124        message: str,
125        *args: tuple[Any, ...],
126        **kwargs: dict[str, Any],
127    ) -> None:
128        self.console.print(message, *args, **kwargs)  # type: ignore
def info( self, message: str, *args: tuple[typing.Any, ...], **kwargs: dict[str, typing.Any]) -> None:
130    def info(
131        self,
132        message: str,
133        *args: tuple[Any, ...],
134        **kwargs: dict[str, Any],
135    ) -> None:
136        self.console_err.print(message, *args, **kwargs)  # type: ignore
def error( self, message: str, *args: tuple[typing.Any, ...], **kwargs: dict[str, typing.Any]) -> None:
138    def error(
139        self,
140        message: str,
141        *args: tuple[Any, ...],
142        **kwargs: dict[str, Any],
143    ) -> None:
144        message = f"ERROR: {message}"
145        self.console_err.print(message, *args, style="bold red", **kwargs)  # type: ignore
def progress_status(self, message: str):
147    def progress_status(self, message: str):
148        return self.console_err.status(message)
def confirm(self, message: str) -> bool:
150    def confirm(self, message: str) -> bool:
151        if self.ctx.obj.yes_to_all:
152            print(f"{message} (--yes)")
153            return True
154        return typer.confirm(message, err=True)
def prompt_secret(self, message: str) -> str:
156    def prompt_secret(self, message: str) -> str:
157        return rich.prompt.Prompt.ask(
158            message, password=True, console=self.console_err
159        )
def load_key( self, key: str, password: str | None = None) -> substrateinterface.keypair.Keypair:
161    def load_key(self, key: str, password: str | None = None) -> Keypair:
162        try:
163            keypair = try_classic_load_key(
164                key, password, password_provider=self.password_manager
165            )
166            return keypair
167        except PasswordNotProvidedError:
168            self.error(f"Password not provided for key '{key}'")
169            raise typer.Exit(code=1)
170        except InvalidPasswordError:
171            self.error(f"Incorrect password for key '{key}'")
172            raise typer.Exit(code=1)
def resolve_key_ss58( self, key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str], password: str | None = None) -> communex.types.Ss58Address:
174    def resolve_key_ss58(
175        self, key: Ss58Address | Keypair | str, password: str | None = None
176    ) -> Ss58Address:
177        try:
178            address = resolve_key_ss58_encrypted(
179                key, password, password_provider=self.password_manager
180            )
181            return address
182        except PasswordNotProvidedError:
183            self.error(f"Password not provided for key '{key}'")
184            raise typer.Exit(code=1)
185        except InvalidPasswordError:
186            self.error(f"Incorrect password for key '{key}'")
187            raise typer.Exit(code=1)
def make_custom_context(ctx: typer.models.Context) -> CustomCtx:
190def make_custom_context(ctx: typer.Context) -> CustomCtx:
191    return CustomCtx(
192        ctx=cast(ExtendedContext, ctx),  # TODO: better check
193        settings=ComxSettings(),
194        console=Console(),
195        console_err=Console(stderr=True),
196    )
def eprint(e: Any) -> None:
202def eprint(e: Any) -> None:
203    """
204    Pretty prints an error.
205    """
206
207    console = Console()
208
209    console.print(f"[bold red]ERROR: {e}", style="italic")

Pretty prints an error.

def transform_module_into( to_exclude: list[str], last_block: int, immunity_period: int, modules: list[communex.types.ModuleInfoWithOptionalBalance], tempo: int):
264def transform_module_into(
265    to_exclude: list[str],
266    last_block: int,
267    immunity_period: int,
268    modules: list[ModuleInfoWithOptionalBalance],
269    tempo: int,
270):
271    mods = cast(list[dict[str, Any]], modules)
272    transformed_modules: list[dict[str, Any]] = []
273    for mod in mods:
274        module = mod.copy()
275        module_regblock = module["regblock"]
276        module["in_immunity"] = module_regblock + immunity_period > last_block
277
278        for key in to_exclude:
279            del module[key]
280        module["stake"] = round(from_nano(module["stake"]), 2)  # type: ignore
281        module["emission"] = round(from_horus(module["emission"], tempo), 4)  # type: ignore
282        if module.get("balance") is not None:
283            module["balance"] = from_nano(module["balance"])  # type: ignore
284        else:
285            # user should not see None values
286            del module["balance"]
287        transformed_modules.append(module)
288
289    return transformed_modules
def get_universal_password(ctx: CustomCtx) -> str:
355def get_universal_password(ctx: CustomCtx) -> str:
356    ctx.info("Please provide the universal password for all keys")
357    universal_password = getpass()
358    return universal_password
def tranform_network_params(params: communex.types.NetworkParams):
361def tranform_network_params(params: NetworkParams):
362    """Transform network params to be human readable."""
363    governance_config = params["governance_config"]
364    allocation = governance_config["proposal_reward_treasury_allocation"]
365    governance_config = cast(dict[str, Any], governance_config)
366    governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%"
367    params_ = cast(dict[str, Any], params)
368    params_["governance_config"] = governance_config
369    general_params = dict_from_nano(
370        params_,
371        [
372            "min_weight_stake",
373            "general_subnet_application_cost",
374            "subnet_registration_cost",
375            "proposal_cost",
376            "max_proposal_reward_treasury_allocation",
377        ],
378    )
379
380    return general_params

Transform network params to be human readable.

def remove_none_values(data: dict[~T, typing.Optional[~V]]) -> dict[~T, ~V]:
387def remove_none_values(data: dict[T, V | None]) -> dict[T, V]:
388    """
389    Removes key-value pairs from a dictionary where the value is None.
390    Works recursively for nested dictionaries.
391    """
392    cleaned_data: dict[T, V] = {}
393    for key, value in data.items():
394        if isinstance(value, dict):
395            cleaned_value = remove_none_values(value)  # type: ignore
396            if cleaned_value is not None:  # type: ignore
397                cleaned_data[key] = cleaned_value
398        elif value is not None:
399            cleaned_data[key] = value
400    return cleaned_data

Removes key-value pairs from a dictionary where the value is None. Works recursively for nested dictionaries.

def transform_subnet_params(params: dict[int, communex.types.SubnetParamsWithEmission]):
403def transform_subnet_params(params: dict[int, SubnetParamsWithEmission]):
404    """Transform subnet params to be human readable."""
405    params_ = cast(dict[int, Any], params)
406    display_params = remove_none_values(params_)
407    display_params = dict_from_nano(
408        display_params,
409        [
410            "bonds_ma",
411            "min_burn",
412            "max_burn",
413            "min_weight_stake",
414            "proposal_cost",
415            "max_proposal_reward_treasury_allocation",
416            "min_validator_stake",
417        ],
418    )
419    return display_params

Transform subnet params to be human readable.