Edit on GitHub

communex.cli._common

  1from dataclasses import dataclass
  2from getpass import getpass
  3from typing import Any, Callable, Mapping, TypeVar, cast
  4
  5import rich
  6import rich.prompt
  7import typer
  8from rich import box
  9from rich.console import Console
 10from rich.table import Table
 11from substrateinterface import Keypair
 12from typer import Context
 13
 14from communex._common import ComxSettings, get_node_url
 15from communex.balance import dict_from_nano, from_horus, from_nano
 16from communex.client import CommuneClient
 17from communex.compat.key import resolve_key_ss58_encrypted, try_classic_load_key
 18from communex.errors import InvalidPasswordError, PasswordNotProvidedError
 19from communex.types import (
 20    ModuleInfoWithOptionalBalance,
 21    NetworkParams,
 22    Ss58Address,
 23    SubnetParamsWithEmission,
 24)
 25
 26
@dataclass
class ExtraCtxData:
    """Global CLI flags carried on the typer context's ``obj`` attribute."""

    # NOTE(review): not read anywhere in this module; presumably selects JSON
    # output in the individual commands -- confirm against callers.
    output_json: bool
    # Read by CustomCtx.get_use_testnet() when resolving the node URL.
    use_testnet: bool
    # When true, CustomCtx.confirm() returns True without prompting.
    yes_to_all: bool
 32
 33
class ExtendedContext(Context):
    """typer ``Context`` whose ``obj`` payload is annotated as ExtraCtxData."""

    # Annotation only: narrows the type of ``obj`` for type checkers.
    obj: ExtraCtxData
 36
 37
 38class CliPasswordProvider:
 39    def __init__(
 40        self, settings: ComxSettings, prompt_secret: Callable[[str], str]
 41    ):
 42        self.settings = settings
 43        self.prompt_secret = prompt_secret
 44
 45    def get_password(self, key_name: str) -> str | None:
 46        key_map = self.settings.KEY_PASSWORDS
 47        if key_map is not None:
 48            password = key_map.get(key_name)
 49            if password is not None:
 50                return password.get_secret_value()
 51        # fallback to universal password
 52        password = self.settings.UNIVERSAL_PASSWORD
 53        if password is not None:
 54            return password.get_secret_value()
 55        else:
 56            return None
 57
 58    def ask_password(self, key_name: str) -> str:
 59        password = self.prompt_secret(
 60            f"Please provide the password for the key '{key_name}'"
 61        )
 62        return password
 63
 64
 65class CustomCtx:
 66    ctx: ExtendedContext
 67    settings: ComxSettings
 68    console: rich.console.Console
 69    console_err: rich.console.Console
 70    password_manager: CliPasswordProvider
 71    _com_client: CommuneClient | None = None
 72
 73    def __init__(
 74        self,
 75        ctx: ExtendedContext,
 76        settings: ComxSettings,
 77        console: rich.console.Console,
 78        console_err: rich.console.Console,
 79        com_client: CommuneClient | None = None,
 80    ):
 81        self.ctx = ctx
 82        self.settings = settings
 83        self.console = console
 84        self.console_err = console_err
 85        self._com_client = com_client
 86        self.password_manager = CliPasswordProvider(
 87            self.settings, self.prompt_secret
 88        )
 89
 90    def get_use_testnet(self) -> bool:
 91        return self.ctx.obj.use_testnet
 92
 93    def get_node_url(self) -> str:
 94        use_testnet = self.get_use_testnet()
 95        return get_node_url(self.settings, use_testnet=use_testnet)
 96
 97    def com_client(self) -> CommuneClient:
 98        if self._com_client is None:
 99            node_url = self.get_node_url()
100            self.info(f"Using node: {node_url}")
101            for _ in range(5):
102                try:
103                    self._com_client = CommuneClient(
104                        url=node_url,
105                        num_connections=1,
106                        wait_for_finalization=False,
107                    )
108                except Exception:
109                    self.info(f"Failed to connect to node: {node_url}")
110                    node_url = self.get_node_url()
111                    self.info(f"Will retry with node {node_url}")
112                    continue
113            if self._com_client is None:
114                raise ConnectionError("Could not connect to any node")
115
116        return self._com_client
117
118    def output(
119        self,
120        message: str,
121        *args: tuple[Any, ...],
122        **kwargs: dict[str, Any],
123    ) -> None:
124        self.console.print(message, *args, **kwargs)  # type: ignore
125
126    def info(
127        self,
128        message: str,
129        *args: tuple[Any, ...],
130        **kwargs: dict[str, Any],
131    ) -> None:
132        self.console_err.print(message, *args, **kwargs)  # type: ignore
133
134    def error(
135        self,
136        message: str,
137        *args: tuple[Any, ...],
138        **kwargs: dict[str, Any],
139    ) -> None:
140        message = f"ERROR: {message}"
141        self.console_err.print(message, *args, style="bold red", **kwargs)  # type: ignore
142
143    def progress_status(self, message: str):
144        return self.console_err.status(message)
145
146    def confirm(self, message: str) -> bool:
147        if self.ctx.obj.yes_to_all:
148            print(f"{message} (--yes)")
149            return True
150        return typer.confirm(message, err=True)
151
152    def prompt_secret(self, message: str) -> str:
153        return rich.prompt.Prompt.ask(
154            message, password=True, console=self.console_err
155        )
156
157    def load_key(self, key: str, password: str | None = None) -> Keypair:
158        try:
159            keypair = try_classic_load_key(
160                key, password, password_provider=self.password_manager
161            )
162            return keypair
163        except PasswordNotProvidedError:
164            self.error(f"Password not provided for key '{key}'")
165            raise typer.Exit(code=1)
166        except InvalidPasswordError:
167            self.error(f"Incorrect password for key '{key}'")
168            raise typer.Exit(code=1)
169
170    def resolve_key_ss58(
171        self, key: Ss58Address | Keypair | str, password: str | None = None
172    ) -> Ss58Address:
173        try:
174            address = resolve_key_ss58_encrypted(
175                key, password, password_provider=self.password_manager
176            )
177            return address
178        except PasswordNotProvidedError:
179            self.error(f"Password not provided for key '{key}'")
180            raise typer.Exit(code=1)
181        except InvalidPasswordError:
182            self.error(f"Incorrect password for key '{key}'")
183            raise typer.Exit(code=1)
184
185
def make_custom_context(ctx: typer.Context) -> CustomCtx:
    """Wrap a typer context in a CustomCtx with fresh settings and consoles."""
    extended_ctx = cast(ExtendedContext, ctx)  # TODO: better check
    settings = ComxSettings()
    stdout_console = Console()
    stderr_console = Console(stderr=True)
    return CustomCtx(
        ctx=extended_ctx,
        settings=settings,
        console=stdout_console,
        console_err=stderr_console,
    )
193
194
195# Formatting
196
197
def eprint(e: Any) -> None:
    """
    Pretty prints an error.

    The text of ``e`` is escaped before being placed in the markup string, so
    square brackets in the message are printed literally instead of being
    parsed as rich style tags.
    """
    from rich.markup import escape

    console = Console()

    console.print(f"[bold red]ERROR: {escape(str(e))}", style="italic")
206
207
def print_table_from_plain_dict(
    result: Mapping[str, str | int | float | dict[Any, Any] | Ss58Address],
    column_names: list[str],
    console: Console,
) -> None:
    """
    Renders a flat mapping as a table, one row per entry.

    Plain values are shown as text; dictionary values are shown as a nested
    borderless sub-table and are placed after all plain rows.
    """

    table = Table(show_header=True, header_style="bold magenta")
    for column in column_names:
        table.add_column(column, style="white", vertical="middle")

    # Plain rows go in right away; nested ones are kept aside so that they
    # end up at the bottom, which makes the table display nicer.
    nested_entries: list[tuple[str, dict[Any, Any]]] = []
    for field, item in result.items():
        if isinstance(item, dict):
            nested_entries.append((field, item))
        else:
            table.add_row(field, str(item))

    for field, mapping in nested_entries:
        subtable = Table(
            show_header=False,
            padding=(0, 0, 0, 0),
            border_style="bright_black",
        )
        for subkey, subvalue in mapping.items():
            subtable.add_row(f"{subkey}: {subvalue}")
        table.add_row(field, subtable)

    console.print(table)
240
241
def print_table_standardize(
    result: dict[str, list[Any]], console: Console
) -> None:
    """
    Renders a column-oriented dictionary as a table.

    Each key becomes a column header and the value lists are read in
    lockstep, so every table row holds the n-th item of every column.
    """
    table = Table(show_header=True, header_style="bold magenta")
    for header in result:
        table.add_column(header, style="white")

    # zip stops at the shortest column, exactly like the row transposition.
    for cells in zip(*result.values()):
        table.add_row(*cells, style="white")

    console.print(table)
258
259
def transform_module_into(
    to_exclude: list[str],
    last_block: int,
    immunity_period: int,
    modules: list[ModuleInfoWithOptionalBalance],
    tempo: int,
) -> list[dict[str, Any]]:
    """
    Builds display-ready copies of ``modules``.

    For every module a shallow copy is made and then:
      * ``in_immunity`` is set to ``regblock + immunity_period > last_block``;
      * every key listed in ``to_exclude`` is removed;
      * ``stake`` is converted with ``from_nano`` and rounded to 2 decimals;
      * ``emission`` is converted with ``from_horus`` and rounded to 4;
      * ``balance`` is converted with ``from_nano`` when it has a value and
        dropped otherwise.

    The input dictionaries are left untouched.
    """
    mods = cast(list[dict[str, Any]], modules)
    transformed_modules: list[dict[str, Any]] = []
    for mod in mods:
        module = mod.copy()
        # Must be computed before the exclusion loop: "regblock" is usually
        # one of the excluded keys.
        module["in_immunity"] = module["regblock"] + immunity_period > last_block

        for key in to_exclude:
            del module[key]
        module["stake"] = round(from_nano(module["stake"]), 2)  # type: ignore
        module["emission"] = round(from_horus(module["emission"], tempo), 4)  # type: ignore
        if module.get("balance") is not None:
            module["balance"] = from_nano(module["balance"])  # type: ignore
        else:
            # user should not see None values; pop() also tolerates a module
            # that has no "balance" key at all (or had it excluded above).
            module.pop("balance", None)
        transformed_modules.append(module)

    return transformed_modules
286
287
def print_module_info(
    client: CommuneClient,
    modules: list[ModuleInfoWithOptionalBalance],
    console: Console,
    netuid: int,
    title: str | None = None,
) -> None:
    """
    Prints a table with one row per module, followed by three blank lines.

    Nothing is printed when ``modules`` is empty. The table caption shows the
    sum of the displayed stakes and balances.

    Raises:
        ValueError: if the client returns no block information.
    """
    if not modules:
        return

    # Get the current block number, we will need this to calculate the
    # immunity period
    block = client.get_block()
    if not block:
        raise ValueError("Could not get block info")
    last_block = block["header"]["number"]

    # Get the immunity period and tempo on the netuid
    immunity_period = client.get_immunity_period(netuid)
    tempo = client.get_tempo(netuid)

    # Transform the module dictionaries into their display form
    to_exclude = ["stake_from", "last_update", "regblock"]
    transformed_modules = transform_module_into(
        to_exclude, last_block, immunity_period, modules, tempo
    )

    table = Table(
        show_header=True,
        header_style="bold magenta",
        box=box.DOUBLE_EDGE,
        title=title,
        caption_style="chartreuse3",
        title_style="bold magenta",
    )

    # Use the union of keys over all modules (first-seen order) instead of
    # the first module only: "balance" is optional, and a row that lacks it
    # would otherwise shift its remaining cells under the wrong headers.
    columns: list[str] = []
    for mod in transformed_modules:
        for key in mod:
            if key not in columns:
                columns.append(key)
    for column in columns:
        table.add_column(column, style="white")

    total_stake = 0
    total_balance = 0

    for mod in transformed_modules:
        total_stake += mod["stake"]
        if mod.get("balance") is not None:
            total_balance += mod["balance"]

        row = [str(mod[column]) if column in mod else "" for column in columns]
        table.add_row(*row)

    table.caption = "total balance: " + f"{total_balance + total_stake}J"
    console.print(table)
    for _ in range(3):
        console.print()
349
350
def get_universal_password(ctx: CustomCtx) -> str:
    """Announce the request through ``ctx.info`` and read a hidden password."""
    ctx.info("Please provide the universal password for all keys")
    return getpass()
355
356
def tranform_network_params(params: NetworkParams):
    """Transform network params to be human readable.

    The treasury allocation inside ``governance_config`` is rendered as a
    percentage string and the listed amount fields are passed to
    ``dict_from_nano``. The work is done on shallow copies, so the caller's
    ``params`` and its ``governance_config`` keep their original values and
    the function can be called repeatedly on the same input.
    """
    governance_config = dict(
        cast(dict[str, Any], params["governance_config"])
    )
    allocation = governance_config["proposal_reward_treasury_allocation"]
    governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%"

    params_ = dict(cast(dict[str, Any], params))
    params_["governance_config"] = governance_config
    general_params = dict_from_nano(
        params_,
        [
            "min_weight_stake",
            "general_subnet_application_cost",
            "subnet_registration_cost",
            "proposal_cost",
            "max_proposal_reward_treasury_allocation",
        ],
    )

    return general_params
377
378
379T = TypeVar("T")
380V = TypeVar("V")
381
382
383def remove_none_values(data: dict[T, V | None]) -> dict[T, V]:
384    """
385    Removes key-value pairs from a dictionary where the value is None.
386    Works recursively for nested dictionaries.
387    """
388    cleaned_data: dict[T, V] = {}
389    for key, value in data.items():
390        if isinstance(value, dict):
391            cleaned_value = remove_none_values(value)  # type: ignore
392            if cleaned_value is not None:  # type: ignore
393                cleaned_data[key] = cleaned_value
394        elif value is not None:
395            cleaned_data[key] = value
396    return cleaned_data
397
398
def transform_subnet_params(params: dict[int, SubnetParamsWithEmission]):
    """Make subnet params human readable.

    None-valued entries are stripped with ``remove_none_values`` and the
    result is handed to ``dict_from_nano`` together with the names of the
    amount fields listed below.
    """
    nano_fields = [
        "bonds_ma",
        "min_burn",
        "max_burn",
        "min_weight_stake",
        "proposal_cost",
        "max_proposal_reward_treasury_allocation",
        "min_validator_stake",
    ]
    without_nones = remove_none_values(cast(dict[int, Any], params))
    return dict_from_nano(without_nones, nano_fields)
@dataclass
class ExtraCtxData:
28@dataclass
29class ExtraCtxData:
30    output_json: bool
31    use_testnet: bool
32    yes_to_all: bool
ExtraCtxData(output_json: bool, use_testnet: bool, yes_to_all: bool)
output_json: bool
use_testnet: bool
yes_to_all: bool
class ExtendedContext(typer.models.Context):
35class ExtendedContext(Context):
36    obj: ExtraCtxData

The context is a special internal object that holds state relevant for the script execution at every single level. It's normally invisible to commands unless they opt-in to getting access to it.

The context is useful as it can pass internal objects around and can control special execution features such as reading data from environment variables.

A context can be used as context manager in which case it will call close() on teardown.

Parameters
  • command: the command class for this context.
  • parent: the parent context.
  • info_name: the info name for this invocation. Generally this is the most descriptive name for the script or command. For the toplevel script it is usually the name of the script; for commands below it, it is the name of the command.
  • obj: an arbitrary object of user data.
  • auto_envvar_prefix: the prefix to use for automatic environment variables. If this is None then reading from environment variables is disabled. This does not affect manually set environment variables which are always read.
  • default_map: a dictionary (like object) with default values for parameters.
  • terminal_width: the width of the terminal. The default is inherit from parent context. If no context defines the terminal width then auto detection will be applied.
  • max_content_width: the maximum width for content rendered by Click (this currently only affects help pages). This defaults to 80 characters if not overridden. In other words: even if the terminal is larger than that, Click will not format things wider than 80 characters by default. In addition to that, formatters might add some safety margin on the right.
  • resilient_parsing: if this flag is enabled then Click will parse without any interactivity or callback invocation. Default values will also be ignored. This is useful for implementing things such as completion support.
  • allow_extra_args: if this is set to True then extra arguments at the end will not raise an error and will be kept on the context. The default is to inherit from the command.
  • allow_interspersed_args: if this is set to False then options and arguments cannot be mixed. The default is to inherit from the command.
  • ignore_unknown_options: instructs click to ignore options it does not know and keeps them for later processing.
  • help_option_names: optionally a list of strings that define how the default help parameter is named. The default is ['--help'].
  • token_normalize_func: an optional function that is used to normalize tokens (options, choices, etc.). This for instance can be used to implement case insensitive behavior.
  • color: controls if the terminal supports ANSI colors or not. The default is autodetection. This is only needed if ANSI codes are used in texts that Click prints which is by default not the case. This for instance would affect help output.
  • show_default: Show the default value for commands. If this value is not set, it defaults to the value from the parent context. Command.show_default overrides this default for the specific command.

Changed in version 8.1: The show_default parameter is overridden by Command.show_default, instead of the other way around.

Changed in version 8.0: The show_default parameter defaults to the value from the parent context.

Changed in version 7.1: Added the show_default parameter.

Changed in version 4.0: Added the color, ignore_unknown_options, and max_content_width parameters.

Changed in version 3.0: Added the allow_extra_args and allow_interspersed_args parameters.

Changed in version 2.0: Added the resilient_parsing, help_option_names, and token_normalize_func parameters.

Inherited Members
click.core.Context
Context
formatter_class
parent
command
info_name
params
args
protected_args
default_map
invoked_subcommand
terminal_width
max_content_width
allow_extra_args
allow_interspersed_args
ignore_unknown_options
help_option_names
token_normalize_func
resilient_parsing
auto_envvar_prefix
color
show_default
to_info_dict
scope
meta
make_formatter
with_resource
call_on_close
close
command_path
find_root
find_object
ensure_object
lookup_default
fail
abort
exit
get_usage
get_help
invoke
forward
set_parameter_source
get_parameter_source
class CliPasswordProvider:
39class CliPasswordProvider:
40    def __init__(
41        self, settings: ComxSettings, prompt_secret: Callable[[str], str]
42    ):
43        self.settings = settings
44        self.prompt_secret = prompt_secret
45
46    def get_password(self, key_name: str) -> str | None:
47        key_map = self.settings.KEY_PASSWORDS
48        if key_map is not None:
49            password = key_map.get(key_name)
50            if password is not None:
51                return password.get_secret_value()
52        # fallback to universal password
53        password = self.settings.UNIVERSAL_PASSWORD
54        if password is not None:
55            return password.get_secret_value()
56        else:
57            return None
58
59    def ask_password(self, key_name: str) -> str:
60        password = self.prompt_secret(
61            f"Please provide the password for the key '{key_name}'"
62        )
63        return password
CliPasswordProvider( settings: communex._common.ComxSettings, prompt_secret: Callable[[str], str])
40    def __init__(
41        self, settings: ComxSettings, prompt_secret: Callable[[str], str]
42    ):
43        self.settings = settings
44        self.prompt_secret = prompt_secret
settings
prompt_secret
def get_password(self, key_name: str) -> str | None:
46    def get_password(self, key_name: str) -> str | None:
47        key_map = self.settings.KEY_PASSWORDS
48        if key_map is not None:
49            password = key_map.get(key_name)
50            if password is not None:
51                return password.get_secret_value()
52        # fallback to universal password
53        password = self.settings.UNIVERSAL_PASSWORD
54        if password is not None:
55            return password.get_secret_value()
56        else:
57            return None
def ask_password(self, key_name: str) -> str:
59    def ask_password(self, key_name: str) -> str:
60        password = self.prompt_secret(
61            f"Please provide the password for the key '{key_name}'"
62        )
63        return password
class CustomCtx:
 66class CustomCtx:
 67    ctx: ExtendedContext
 68    settings: ComxSettings
 69    console: rich.console.Console
 70    console_err: rich.console.Console
 71    password_manager: CliPasswordProvider
 72    _com_client: CommuneClient | None = None
 73
 74    def __init__(
 75        self,
 76        ctx: ExtendedContext,
 77        settings: ComxSettings,
 78        console: rich.console.Console,
 79        console_err: rich.console.Console,
 80        com_client: CommuneClient | None = None,
 81    ):
 82        self.ctx = ctx
 83        self.settings = settings
 84        self.console = console
 85        self.console_err = console_err
 86        self._com_client = com_client
 87        self.password_manager = CliPasswordProvider(
 88            self.settings, self.prompt_secret
 89        )
 90
 91    def get_use_testnet(self) -> bool:
 92        return self.ctx.obj.use_testnet
 93
 94    def get_node_url(self) -> str:
 95        use_testnet = self.get_use_testnet()
 96        return get_node_url(self.settings, use_testnet=use_testnet)
 97
 98    def com_client(self) -> CommuneClient:
 99        if self._com_client is None:
100            node_url = self.get_node_url()
101            self.info(f"Using node: {node_url}")
102            for _ in range(5):
103                try:
104                    self._com_client = CommuneClient(
105                        url=node_url,
106                        num_connections=1,
107                        wait_for_finalization=False,
108                    )
109                except Exception:
110                    self.info(f"Failed to connect to node: {node_url}")
111                    node_url = self.get_node_url()
112                    self.info(f"Will retry with node {node_url}")
113                    continue
114            if self._com_client is None:
115                raise ConnectionError("Could not connect to any node")
116
117        return self._com_client
118
119    def output(
120        self,
121        message: str,
122        *args: tuple[Any, ...],
123        **kwargs: dict[str, Any],
124    ) -> None:
125        self.console.print(message, *args, **kwargs)  # type: ignore
126
127    def info(
128        self,
129        message: str,
130        *args: tuple[Any, ...],
131        **kwargs: dict[str, Any],
132    ) -> None:
133        self.console_err.print(message, *args, **kwargs)  # type: ignore
134
135    def error(
136        self,
137        message: str,
138        *args: tuple[Any, ...],
139        **kwargs: dict[str, Any],
140    ) -> None:
141        message = f"ERROR: {message}"
142        self.console_err.print(message, *args, style="bold red", **kwargs)  # type: ignore
143
144    def progress_status(self, message: str):
145        return self.console_err.status(message)
146
147    def confirm(self, message: str) -> bool:
148        if self.ctx.obj.yes_to_all:
149            print(f"{message} (--yes)")
150            return True
151        return typer.confirm(message, err=True)
152
153    def prompt_secret(self, message: str) -> str:
154        return rich.prompt.Prompt.ask(
155            message, password=True, console=self.console_err
156        )
157
158    def load_key(self, key: str, password: str | None = None) -> Keypair:
159        try:
160            keypair = try_classic_load_key(
161                key, password, password_provider=self.password_manager
162            )
163            return keypair
164        except PasswordNotProvidedError:
165            self.error(f"Password not provided for key '{key}'")
166            raise typer.Exit(code=1)
167        except InvalidPasswordError:
168            self.error(f"Incorrect password for key '{key}'")
169            raise typer.Exit(code=1)
170
171    def resolve_key_ss58(
172        self, key: Ss58Address | Keypair | str, password: str | None = None
173    ) -> Ss58Address:
174        try:
175            address = resolve_key_ss58_encrypted(
176                key, password, password_provider=self.password_manager
177            )
178            return address
179        except PasswordNotProvidedError:
180            self.error(f"Password not provided for key '{key}'")
181            raise typer.Exit(code=1)
182        except InvalidPasswordError:
183            self.error(f"Incorrect password for key '{key}'")
184            raise typer.Exit(code=1)
CustomCtx( ctx: ExtendedContext, settings: communex._common.ComxSettings, console: rich.console.Console, console_err: rich.console.Console, com_client: communex.client.CommuneClient | None = None)
74    def __init__(
75        self,
76        ctx: ExtendedContext,
77        settings: ComxSettings,
78        console: rich.console.Console,
79        console_err: rich.console.Console,
80        com_client: CommuneClient | None = None,
81    ):
82        self.ctx = ctx
83        self.settings = settings
84        self.console = console
85        self.console_err = console_err
86        self._com_client = com_client
87        self.password_manager = CliPasswordProvider(
88            self.settings, self.prompt_secret
89        )
console: rich.console.Console
console_err: rich.console.Console
password_manager: CliPasswordProvider
def get_use_testnet(self) -> bool:
91    def get_use_testnet(self) -> bool:
92        return self.ctx.obj.use_testnet
def get_node_url(self) -> str:
94    def get_node_url(self) -> str:
95        use_testnet = self.get_use_testnet()
96        return get_node_url(self.settings, use_testnet=use_testnet)
def com_client(self) -> communex.client.CommuneClient:
 98    def com_client(self) -> CommuneClient:
 99        if self._com_client is None:
100            node_url = self.get_node_url()
101            self.info(f"Using node: {node_url}")
102            for _ in range(5):
103                try:
104                    self._com_client = CommuneClient(
105                        url=node_url,
106                        num_connections=1,
107                        wait_for_finalization=False,
108                    )
109                except Exception:
110                    self.info(f"Failed to connect to node: {node_url}")
111                    node_url = self.get_node_url()
112                    self.info(f"Will retry with node {node_url}")
113                    continue
114            if self._com_client is None:
115                raise ConnectionError("Could not connect to any node")
116
117        return self._com_client
def output( self, message: str, *args: tuple[typing.Any, ...], **kwargs: dict[str, typing.Any]) -> None:
119    def output(
120        self,
121        message: str,
122        *args: tuple[Any, ...],
123        **kwargs: dict[str, Any],
124    ) -> None:
125        self.console.print(message, *args, **kwargs)  # type: ignore
def info( self, message: str, *args: tuple[typing.Any, ...], **kwargs: dict[str, typing.Any]) -> None:
127    def info(
128        self,
129        message: str,
130        *args: tuple[Any, ...],
131        **kwargs: dict[str, Any],
132    ) -> None:
133        self.console_err.print(message, *args, **kwargs)  # type: ignore
def error( self, message: str, *args: tuple[typing.Any, ...], **kwargs: dict[str, typing.Any]) -> None:
135    def error(
136        self,
137        message: str,
138        *args: tuple[Any, ...],
139        **kwargs: dict[str, Any],
140    ) -> None:
141        message = f"ERROR: {message}"
142        self.console_err.print(message, *args, style="bold red", **kwargs)  # type: ignore
def progress_status(self, message: str):
144    def progress_status(self, message: str):
145        return self.console_err.status(message)
def confirm(self, message: str) -> bool:
147    def confirm(self, message: str) -> bool:
148        if self.ctx.obj.yes_to_all:
149            print(f"{message} (--yes)")
150            return True
151        return typer.confirm(message, err=True)
def prompt_secret(self, message: str) -> str:
153    def prompt_secret(self, message: str) -> str:
154        return rich.prompt.Prompt.ask(
155            message, password=True, console=self.console_err
156        )
def load_key( self, key: str, password: str | None = None) -> substrateinterface.keypair.Keypair:
158    def load_key(self, key: str, password: str | None = None) -> Keypair:
159        try:
160            keypair = try_classic_load_key(
161                key, password, password_provider=self.password_manager
162            )
163            return keypair
164        except PasswordNotProvidedError:
165            self.error(f"Password not provided for key '{key}'")
166            raise typer.Exit(code=1)
167        except InvalidPasswordError:
168            self.error(f"Incorrect password for key '{key}'")
169            raise typer.Exit(code=1)
def resolve_key_ss58( self, key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str], password: str | None = None) -> communex.types.Ss58Address:
171    def resolve_key_ss58(
172        self, key: Ss58Address | Keypair | str, password: str | None = None
173    ) -> Ss58Address:
174        try:
175            address = resolve_key_ss58_encrypted(
176                key, password, password_provider=self.password_manager
177            )
178            return address
179        except PasswordNotProvidedError:
180            self.error(f"Password not provided for key '{key}'")
181            raise typer.Exit(code=1)
182        except InvalidPasswordError:
183            self.error(f"Incorrect password for key '{key}'")
184            raise typer.Exit(code=1)
def make_custom_context(ctx: typer.models.Context) -> CustomCtx:
187def make_custom_context(ctx: typer.Context) -> CustomCtx:
188    return CustomCtx(
189        ctx=cast(ExtendedContext, ctx),  # TODO: better check
190        settings=ComxSettings(),
191        console=Console(),
192        console_err=Console(stderr=True),
193    )
def eprint(e: Any) -> None:
199def eprint(e: Any) -> None:
200    """
201    Pretty prints an error.
202    """
203
204    console = Console()
205
206    console.print(f"[bold red]ERROR: {e}", style="italic")

Pretty prints an error.

def transform_module_into( to_exclude: list[str], last_block: int, immunity_period: int, modules: list[communex.types.ModuleInfoWithOptionalBalance], tempo: int):
261def transform_module_into(
262    to_exclude: list[str],
263    last_block: int,
264    immunity_period: int,
265    modules: list[ModuleInfoWithOptionalBalance],
266    tempo: int,
267):
268    mods = cast(list[dict[str, Any]], modules)
269    transformed_modules: list[dict[str, Any]] = []
270    for mod in mods:
271        module = mod.copy()
272        module_regblock = module["regblock"]
273        module["in_immunity"] = module_regblock + immunity_period > last_block
274
275        for key in to_exclude:
276            del module[key]
277        module["stake"] = round(from_nano(module["stake"]), 2)  # type: ignore
278        module["emission"] = round(from_horus(module["emission"], tempo), 4)  # type: ignore
279        if module.get("balance") is not None:
280            module["balance"] = from_nano(module["balance"])  # type: ignore
281        else:
282            # user should not see None values
283            del module["balance"]
284        transformed_modules.append(module)
285
286    return transformed_modules
def get_universal_password(ctx: CustomCtx) -> str:
352def get_universal_password(ctx: CustomCtx) -> str:
353    ctx.info("Please provide the universal password for all keys")
354    universal_password = getpass()
355    return universal_password
def tranform_network_params(params: communex.types.NetworkParams):
358def tranform_network_params(params: NetworkParams):
359    """Transform network params to be human readable."""
360    governance_config = params["governance_config"]
361    allocation = governance_config["proposal_reward_treasury_allocation"]
362    governance_config = cast(dict[str, Any], governance_config)
363    governance_config["proposal_reward_treasury_allocation"] = f"{allocation}%"
364    params_ = cast(dict[str, Any], params)
365    params_["governance_config"] = governance_config
366    general_params = dict_from_nano(
367        params_,
368        [
369            "min_weight_stake",
370            "general_subnet_application_cost",
371            "subnet_registration_cost",
372            "proposal_cost",
373            "max_proposal_reward_treasury_allocation",
374        ],
375    )
376
377    return general_params

Transform network params to be human readable.

def remove_none_values(data: dict[~T, typing.Optional[~V]]) -> dict[~T, ~V]:
384def remove_none_values(data: dict[T, V | None]) -> dict[T, V]:
385    """
386    Removes key-value pairs from a dictionary where the value is None.
387    Works recursively for nested dictionaries.
388    """
389    cleaned_data: dict[T, V] = {}
390    for key, value in data.items():
391        if isinstance(value, dict):
392            cleaned_value = remove_none_values(value)  # type: ignore
393            if cleaned_value is not None:  # type: ignore
394                cleaned_data[key] = cleaned_value
395        elif value is not None:
396            cleaned_data[key] = value
397    return cleaned_data

Removes key-value pairs from a dictionary where the value is None. Works recursively for nested dictionaries.

def transform_subnet_params(params: dict[int, communex.types.SubnetParamsWithEmission]):
400def transform_subnet_params(params: dict[int, SubnetParamsWithEmission]):
401    """Transform subnet params to be human readable."""
402    params_ = cast(dict[int, Any], params)
403    display_params = remove_none_values(params_)
404    display_params = dict_from_nano(
405        display_params,
406        [
407            "bonds_ma",
408            "min_burn",
409            "max_burn",
410            "min_weight_stake",
411            "proposal_cost",
412            "max_proposal_reward_treasury_allocation",
413            "min_validator_stake",
414        ],
415    )
416    return display_params

Transform subnet params to be human readable.