Edit on GitHub

communex.compat.key

Key storage compatible with the classic commune library.

WIP

  1"""
  2Key storage compatible with the classic `commune` library.
  3
  4WIP
  5"""
  6
  7import json
  8import os
  9from getpass import getpass
 10from pathlib import Path
 11from typing import Any, Protocol, cast
 12
 13from substrateinterface import Keypair  # type: ignore
 14
 15from communex.compat.storage import COMMUNE_HOME, classic_load, classic_put
 16from communex.compat.types import CommuneKeyDict
 17from communex.key import check_ss58_address, is_ss58_address
 18from communex.types import Ss58Address
 19from communex.util import bytes_to_hex, check_str
 20
 21
 22class GenericCtx(Protocol):
 23    def info(self, message: str):
 24        ...
 25
 26
 27def check_key_dict(key_dict: Any) -> CommuneKeyDict:
 28    """
 29    Validates a given dictionary as a commune key dictionary and returns it.
 30
 31    This function checks if the provided dictionary adheres to the structure of
 32    a CommuneKeyDict, that is used by the classic `commune` library and returns
 33    it if valid.
 34
 35    Args:
 36        key_dict: The dictionary to validate.
 37
 38    Returns:
 39        The validated commune key dictionary. Same as input.
 40
 41    Raises:
 42      AssertionError: If the dictionary does not conform to the expected
 43        structure.
 44    """
 45
 46    assert isinstance(key_dict, dict)
 47    assert isinstance(key_dict["crypto_type"], int)
 48    assert isinstance(key_dict["seed_hex"], str)
 49    assert isinstance(key_dict["derive_path"], str | None)
 50    assert isinstance(key_dict["path"], str) or key_dict["path"] is None
 51    assert isinstance(key_dict["public_key"], str)
 52    assert isinstance(key_dict["ss58_format"], int)
 53    assert isinstance(key_dict["ss58_address"], str)
 54    assert is_ss58_address(key_dict["ss58_address"])
 55    assert isinstance(key_dict["private_key"], str)
 56    assert isinstance(key_dict["mnemonic"], str)
 57    return cast(CommuneKeyDict, key_dict)
 58
 59
 60def classic_key_path(name: str) -> str:
 61    """
 62    Constructs the file path for a key name in the classic commune format.
 63    """
 64
 65    home = Path.home()
 66    root_path = home / '.commune' / "key"
 67    name = name + ".json"
 68    return str(root_path / name)
 69
 70
 71def from_classic_dict(data: dict[Any, Any]) -> Keypair:
 72    """
 73    Creates a `Key` from a dict conforming to the classic `commune` format.
 74
 75    Args:
 76        data: The key data in a classic commune format.
 77        name: The name to assign to the key.
 78
 79    Returns:
 80        The reconstructed `Key` instance.
 81
 82    Raises:
 83        AssertionError: If `data` does not conform to the expected format.
 84    """
 85
 86    data_ = check_key_dict(data)
 87
 88    ss58_address = data_["ss58_address"]
 89    private_key = data_["private_key"]
 90    public_key = data_["public_key"]
 91    ss58_format = data_["ss58_format"]
 92
 93    key = Keypair.create_from_private_key(
 94        private_key, public_key, ss58_address, ss58_format)
 95
 96    return key
 97
 98
 99def to_classic_dict(keypair: Keypair, path: str) -> CommuneKeyDict:
100    """
101    Converts a keypair to a dictionary conforming to the classic commune format.
102
103    Args:
104        keypair: The keypair to convert.
105        path: The path/name of the key file.
106    """
107
108    return {
109        "path": path,
110        "mnemonic": check_str(keypair.mnemonic),
111        "public_key": bytes_to_hex(keypair.public_key),
112        "private_key": bytes_to_hex(keypair.private_key),
113        "ss58_address": check_ss58_address(keypair.ss58_address),
114        "seed_hex": bytes_to_hex(keypair.seed_hex),
115        "ss58_format": keypair.ss58_format,
116        "crypto_type": keypair.crypto_type,
117        "derive_path": keypair.derive_path,
118    }
119
120
121def classic_load_key(name: str, password: str | None = None) -> Keypair:
122    """
123    Loads the keypair with the given name from a disk.
124    """
125    path = classic_key_path(name)
126    key_dict_json = classic_load(path, password=password)
127    key_dict = json.loads(key_dict_json)
128    return from_classic_dict(key_dict)
129
130
131def is_encrypted(name: str) -> bool:
132    """
133    Checks if the key with the given name is encrypted.
134    """
135    path = classic_key_path(name)
136    full_path = os.path.expanduser(os.path.join(COMMUNE_HOME, path))
137    with open(full_path, "r") as file:
138        body = json.load(file)
139    return body["encrypted"]
140
141
142def classic_store_key(keypair: Keypair, name: str, password: str | None = None) -> None:
143    """
144    Stores the given keypair on a disk under the given name.
145    """
146    key_dict = to_classic_dict(keypair, name)
147    key_dict_json = json.dumps(key_dict)
148    path = classic_key_path(name)
149    classic_put(path, key_dict_json, password=password)
150
151
152def try_classic_load_key(
153    name: str, context: GenericCtx | None = None,
154    password: str | None = None
155) -> Keypair:
156    try:
157        keypair = classic_load_key(name, password=password)
158    except json.JSONDecodeError:
159        prompt = f"Please provide the password for the key {name}"
160        if context is not None:
161            context.info(prompt)
162        else:
163            print(prompt)
164        password = getpass()
165        keypair = classic_load_key(name, password=password)
166    return keypair
167
168
169def try_load_key(
170        name: str, 
171        context: GenericCtx | None = None, 
172        password: str | None = None
173    ):
174    try:
175        key_dict = classic_load(name, password=password)
176    except json.JSONDecodeError:
177        prompt = f"Please provide the password for the key {name}"
178        if context is not None:
179            context.info(prompt)
180        else:
181            print(prompt)
182        password = getpass()
183        key_dict = classic_load(name, password=password)
184    return key_dict
185
186
187def local_key_addresses(
188    ctx: GenericCtx | None = None,
189    universal_password: str | None = None
190) -> dict[str, Ss58Address]:
191    """
192    Retrieves a mapping of local key names to their SS58 addresses.
193    If password is passed, it will be used to decrypt every key.
194    If password is not passed and ctx is,
195    the user will be prompted for the password.
196    """
197    home = Path.home()
198    key_dir = home / '.commune' / "key"
199
200    key_names = [f.stem for f in key_dir.iterdir() if f.is_file() and not f.name.startswith('.')]
201
202    addresses_map: dict[str, Ss58Address] = {}
203
204    for key_name in key_names:
205        # issue #11 https://github.com/agicommies/communex/issues/12 added check for key2address to stop error from being thrown by wrong key type.
206        if key_name == "key2address":
207            print("key2address is saved in an invalid format. It will be ignored.")
208            continue
209        encrypted = is_encrypted(key_name)
210        if encrypted:
211            if universal_password:
212                password = universal_password
213            elif ctx:
214                ctx.info(f"Please provide the password for the key '{key_name}'")
215                password = getpass()
216            else:
217                print(f"Please provide the password for the key '{key_name}'")
218                password = getpass()
219        else:
220            password = None
221        key_dict = classic_load_key(key_name, password=password)
222        addresses_map[key_name] = check_ss58_address(key_dict.ss58_address)
223
224    return addresses_map
225
226
227def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
228    """
229    Resolves a keypair or key name to its corresponding SS58 address.
230
231    If the input is already an SS58 address, it is returned as is.
232    """
233
234    if isinstance(key, Keypair):
235        return key.ss58_address  # type: ignore
236
237    if is_ss58_address(key):
238        return key
239
240    try:
241        keypair = classic_load_key(key)
242    except FileNotFoundError:
243        raise ValueError(
244            f"Key is not a valid SS58 address nor a valid key name: {key}")
245
246    address = keypair.ss58_address
247
248    return check_ss58_address(address)
249
250
251def resolve_key_ss58_encrypted(
252        key: Ss58Address | Keypair | str, context: GenericCtx,
253        password: str | None = None
254
255) -> Ss58Address:
256    """
257    Resolves a keypair or key name to its corresponding SS58 address.
258
259    If the input is already an SS58 address, it is returned as is.
260    """
261
262    if isinstance(key, Keypair):
263        return check_ss58_address(key.ss58_address, key.ss58_format)
264
265    if is_ss58_address(key):
266        return key
267
268    try:
269        keypair = classic_load_key(key, password=password)
270    except json.JSONDecodeError:
271        context.info(f"Please provide the password for the key {key}")
272        password = getpass()
273        keypair = classic_load_key(key, password=password)
274    except FileNotFoundError:
275        raise ValueError(
276            f"Key is not a valid SS58 address nor a valid key name: {key}")
277
278    address = keypair.ss58_address
279
280    return check_ss58_address(address, keypair.ss58_format)
class GenericCtx(typing.Protocol):
23class GenericCtx(Protocol):
24    def info(self, message: str):
25        ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
GenericCtx(*args, **kwargs)
1927def _no_init_or_replace_init(self, *args, **kwargs):
1928    cls = type(self)
1929
1930    if cls._is_protocol:
1931        raise TypeError('Protocols cannot be instantiated')
1932
1933    # Already using a custom `__init__`. No need to calculate correct
1934    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1935    if cls.__init__ is not _no_init_or_replace_init:
1936        return
1937
1938    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1939    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1940    # searches for a proper new `__init__` in the MRO. The new `__init__`
1941    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1942    # instantiation of the protocol subclass will thus use the new
1943    # `__init__` and no longer call `_no_init_or_replace_init`.
1944    for base in cls.__mro__:
1945        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1946        if init is not _no_init_or_replace_init:
1947            cls.__init__ = init
1948            break
1949    else:
1950        # should not happen
1951        cls.__init__ = object.__init__
1952
1953    cls.__init__(self, *args, **kwargs)
def info(self, message: str):
24    def info(self, message: str):
25        ...
def check_key_dict(key_dict: Any) -> communex.compat.types.CommuneKeyDict:
28def check_key_dict(key_dict: Any) -> CommuneKeyDict:
29    """
30    Validates a given dictionary as a commune key dictionary and returns it.
31
32    This function checks if the provided dictionary adheres to the structure of
33    a CommuneKeyDict, that is used by the classic `commune` library and returns
34    it if valid.
35
36    Args:
37        key_dict: The dictionary to validate.
38
39    Returns:
40        The validated commune key dictionary. Same as input.
41
42    Raises:
43      AssertionError: If the dictionary does not conform to the expected
44        structure.
45    """
46
47    assert isinstance(key_dict, dict)
48    assert isinstance(key_dict["crypto_type"], int)
49    assert isinstance(key_dict["seed_hex"], str)
50    assert isinstance(key_dict["derive_path"], str | None)
51    assert isinstance(key_dict["path"], str) or key_dict["path"] is None
52    assert isinstance(key_dict["public_key"], str)
53    assert isinstance(key_dict["ss58_format"], int)
54    assert isinstance(key_dict["ss58_address"], str)
55    assert is_ss58_address(key_dict["ss58_address"])
56    assert isinstance(key_dict["private_key"], str)
57    assert isinstance(key_dict["mnemonic"], str)
58    return cast(CommuneKeyDict, key_dict)

Validates a given dictionary as a commune key dictionary and returns it.

This function checks if the provided dictionary adheres to the structure of a CommuneKeyDict, that is used by the classic commune library and returns it if valid.

Arguments:
  • key_dict: The dictionary to validate.
Returns:

The validated commune key dictionary. Same as input.

Raises:
  • AssertionError: If the dictionary does not conform to the expected structure.
def classic_key_path(name: str) -> str:
61def classic_key_path(name: str) -> str:
62    """
63    Constructs the file path for a key name in the classic commune format.
64    """
65
66    home = Path.home()
67    root_path = home / '.commune' / "key"
68    name = name + ".json"
69    return str(root_path / name)

Constructs the file path for a key name in the classic commune format.

def from_classic_dict(data: dict[typing.Any, typing.Any]) -> substrateinterface.keypair.Keypair:
72def from_classic_dict(data: dict[Any, Any]) -> Keypair:
73    """
74    Creates a `Key` from a dict conforming to the classic `commune` format.
75
76    Args:
77        data: The key data in a classic commune format.
78        name: The name to assign to the key.
79
80    Returns:
81        The reconstructed `Key` instance.
82
83    Raises:
84        AssertionError: If `data` does not conform to the expected format.
85    """
86
87    data_ = check_key_dict(data)
88
89    ss58_address = data_["ss58_address"]
90    private_key = data_["private_key"]
91    public_key = data_["public_key"]
92    ss58_format = data_["ss58_format"]
93
94    key = Keypair.create_from_private_key(
95        private_key, public_key, ss58_address, ss58_format)
96
97    return key

Creates a Key from a dict conforming to the classic commune format.

Arguments:
  • data: The key data in a classic commune format.
  • name: The name to assign to the key.
Returns:

The reconstructed Key instance.

Raises:
  • AssertionError: If data does not conform to the expected format.
def to_classic_dict( keypair: substrateinterface.keypair.Keypair, path: str) -> communex.compat.types.CommuneKeyDict:
100def to_classic_dict(keypair: Keypair, path: str) -> CommuneKeyDict:
101    """
102    Converts a keypair to a dictionary conforming to the classic commune format.
103
104    Args:
105        keypair: The keypair to convert.
106        path: The path/name of the key file.
107    """
108
109    return {
110        "path": path,
111        "mnemonic": check_str(keypair.mnemonic),
112        "public_key": bytes_to_hex(keypair.public_key),
113        "private_key": bytes_to_hex(keypair.private_key),
114        "ss58_address": check_ss58_address(keypair.ss58_address),
115        "seed_hex": bytes_to_hex(keypair.seed_hex),
116        "ss58_format": keypair.ss58_format,
117        "crypto_type": keypair.crypto_type,
118        "derive_path": keypair.derive_path,
119    }

Converts a keypair to a dictionary conforming to the classic commune format.

Arguments:
  • keypair: The keypair to convert.
  • path: The path/name of the key file.
def classic_load_key( name: str, password: str | None = None) -> substrateinterface.keypair.Keypair:
122def classic_load_key(name: str, password: str | None = None) -> Keypair:
123    """
124    Loads the keypair with the given name from a disk.
125    """
126    path = classic_key_path(name)
127    key_dict_json = classic_load(path, password=password)
128    key_dict = json.loads(key_dict_json)
129    return from_classic_dict(key_dict)

Loads the keypair with the given name from a disk.

def is_encrypted(name: str) -> bool:
132def is_encrypted(name: str) -> bool:
133    """
134    Checks if the key with the given name is encrypted.
135    """
136    path = classic_key_path(name)
137    full_path = os.path.expanduser(os.path.join(COMMUNE_HOME, path))
138    with open(full_path, "r") as file:
139        body = json.load(file)
140    return body["encrypted"]

Checks if the key with the given name is encrypted.

def classic_store_key( keypair: substrateinterface.keypair.Keypair, name: str, password: str | None = None) -> None:
143def classic_store_key(keypair: Keypair, name: str, password: str | None = None) -> None:
144    """
145    Stores the given keypair on a disk under the given name.
146    """
147    key_dict = to_classic_dict(keypair, name)
148    key_dict_json = json.dumps(key_dict)
149    path = classic_key_path(name)
150    classic_put(path, key_dict_json, password=password)

Stores the given keypair on a disk under the given name.

def try_classic_load_key( name: str, context: GenericCtx | None = None, password: str | None = None) -> substrateinterface.keypair.Keypair:
153def try_classic_load_key(
154    name: str, context: GenericCtx | None = None,
155    password: str | None = None
156) -> Keypair:
157    try:
158        keypair = classic_load_key(name, password=password)
159    except json.JSONDecodeError:
160        prompt = f"Please provide the password for the key {name}"
161        if context is not None:
162            context.info(prompt)
163        else:
164            print(prompt)
165        password = getpass()
166        keypair = classic_load_key(name, password=password)
167    return keypair
def try_load_key( name: str, context: GenericCtx | None = None, password: str | None = None):
170def try_load_key(
171        name: str, 
172        context: GenericCtx | None = None, 
173        password: str | None = None
174    ):
175    try:
176        key_dict = classic_load(name, password=password)
177    except json.JSONDecodeError:
178        prompt = f"Please provide the password for the key {name}"
179        if context is not None:
180            context.info(prompt)
181        else:
182            print(prompt)
183        password = getpass()
184        key_dict = classic_load(name, password=password)
185    return key_dict
def local_key_addresses( ctx: GenericCtx | None = None, universal_password: str | None = None) -> dict[str, communex.types.Ss58Address]:
188def local_key_addresses(
189    ctx: GenericCtx | None = None,
190    universal_password: str | None = None
191) -> dict[str, Ss58Address]:
192    """
193    Retrieves a mapping of local key names to their SS58 addresses.
194    If password is passed, it will be used to decrypt every key.
195    If password is not passed and ctx is,
196    the user will be prompted for the password.
197    """
198    home = Path.home()
199    key_dir = home / '.commune' / "key"
200
201    key_names = [f.stem for f in key_dir.iterdir() if f.is_file() and not f.name.startswith('.')]
202
203    addresses_map: dict[str, Ss58Address] = {}
204
205    for key_name in key_names:
206        # issue #11 https://github.com/agicommies/communex/issues/12 added check for key2address to stop error from being thrown by wrong key type.
207        if key_name == "key2address":
208            print("key2address is saved in an invalid format. It will be ignored.")
209            continue
210        encrypted = is_encrypted(key_name)
211        if encrypted:
212            if universal_password:
213                password = universal_password
214            elif ctx:
215                ctx.info(f"Please provide the password for the key '{key_name}'")
216                password = getpass()
217            else:
218                print(f"Please provide the password for the key '{key_name}'")
219                password = getpass()
220        else:
221            password = None
222        key_dict = classic_load_key(key_name, password=password)
223        addresses_map[key_name] = check_ss58_address(key_dict.ss58_address)
224
225    return addresses_map

Retrieves a mapping of local key names to their SS58 addresses. If password is passed, it will be used to decrypt every key. If password is not passed and ctx is, the user will be prompted for the password.

def resolve_key_ss58( key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str]) -> communex.types.Ss58Address:
228def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
229    """
230    Resolves a keypair or key name to its corresponding SS58 address.
231
232    If the input is already an SS58 address, it is returned as is.
233    """
234
235    if isinstance(key, Keypair):
236        return key.ss58_address  # type: ignore
237
238    if is_ss58_address(key):
239        return key
240
241    try:
242        keypair = classic_load_key(key)
243    except FileNotFoundError:
244        raise ValueError(
245            f"Key is not a valid SS58 address nor a valid key name: {key}")
246
247    address = keypair.ss58_address
248
249    return check_ss58_address(address)

Resolves a keypair or key name to its corresponding SS58 address.

If the input is already an SS58 address, it is returned as is.

def resolve_key_ss58_encrypted( key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str], context: GenericCtx, password: str | None = None) -> communex.types.Ss58Address:
252def resolve_key_ss58_encrypted(
253        key: Ss58Address | Keypair | str, context: GenericCtx,
254        password: str | None = None
255
256) -> Ss58Address:
257    """
258    Resolves a keypair or key name to its corresponding SS58 address.
259
260    If the input is already an SS58 address, it is returned as is.
261    """
262
263    if isinstance(key, Keypair):
264        return check_ss58_address(key.ss58_address, key.ss58_format)
265
266    if is_ss58_address(key):
267        return key
268
269    try:
270        keypair = classic_load_key(key, password=password)
271    except json.JSONDecodeError:
272        context.info(f"Please provide the password for the key {key}")
273        password = getpass()
274        keypair = classic_load_key(key, password=password)
275    except FileNotFoundError:
276        raise ValueError(
277            f"Key is not a valid SS58 address nor a valid key name: {key}")
278
279    address = keypair.ss58_address
280
281    return check_ss58_address(address, keypair.ss58_format)

Resolves a keypair or key name to its corresponding SS58 address.

If the input is already an SS58 address, it is returned as is.