Edit on GitHub

communex.compat.key

Key storage compatible with the classic commune library.

WIP

  1"""
  2Key storage compatible with the classic `commune` library.
  3
  4WIP
  5"""
  6
  7import json
  8import os
  9from getpass import getpass
 10from pathlib import Path
 11from typing import Any, Protocol, cast
 12
 13from substrateinterface import Keypair  # type: ignore
 14
 15from communex.compat.storage import COMMUNE_HOME, classic_load, classic_put
 16from communex.compat.types import CommuneKeyDict
 17from communex.key import check_ss58_address, is_ss58_address
 18from communex.types import Ss58Address
 19from communex.util import bytes_to_hex, check_str
 20
 21
 22class GenericCtx(Protocol):
 23    def info(self, message: str):
 24        ...
 25
 26
 27def check_key_dict(key_dict: Any) -> CommuneKeyDict:
 28    """
 29    Validates a given dictionary as a commune key dictionary and returns it.
 30
 31    This function checks if the provided dictionary adheres to the structure of
 32    a CommuneKeyDict, that is used by the classic `commune` library and returns
 33    it if valid.
 34
 35    Args:
 36        key_dict: The dictionary to validate.
 37
 38    Returns:
 39        The validated commune key dictionary. Same as input.
 40
 41    Raises:
 42      AssertionError: If the dictionary does not conform to the expected
 43        structure.
 44    """
 45
 46    assert isinstance(key_dict, dict)
 47    assert isinstance(key_dict["crypto_type"], int)
 48    assert isinstance(key_dict["seed_hex"], str)
 49    assert isinstance(key_dict["derive_path"], str | None)
 50    assert isinstance(key_dict["path"], str) or key_dict["path"] is None
 51    assert isinstance(key_dict["public_key"], str)
 52    assert isinstance(key_dict["ss58_format"], int)
 53    assert isinstance(key_dict["ss58_address"], str)
 54    assert is_ss58_address(key_dict["ss58_address"])
 55    assert isinstance(key_dict["private_key"], str)
 56    assert isinstance(key_dict["mnemonic"], str)
 57    return cast(CommuneKeyDict, key_dict)
 58
 59
 60def classic_key_path(name: str) -> str:
 61    """
 62    Constructs the file path for a key name in the classic commune format.
 63    """
 64
 65    home = Path.home()
 66    root_path = home / '.commune' / "key"
 67    name = name + ".json"
 68    return str(root_path / name)
 69
 70
 71def from_classic_dict(
 72        data: dict[Any, Any],
 73        from_mnemonic: bool = False
 74) -> Keypair:
 75    """
 76    Creates a `Key` from a dict conforming to the classic `commune` format.
 77
 78    Args:
 79        data: The key data in a classic commune format.
 80        name: The name to assign to the key.
 81
 82    Returns:
 83        The reconstructed `Key` instance.
 84
 85    Raises:
 86        AssertionError: If `data` does not conform to the expected format.
 87    """
 88
 89    data_ = check_key_dict(data)
 90
 91    ss58_address = data_["ss58_address"]
 92    private_key = data_["private_key"]
 93    mnemonic_key = data_["mnemonic"]
 94    public_key = data_["public_key"]
 95    ss58_format = data_["ss58_format"]
 96    if from_mnemonic:
 97        key = Keypair.create_from_mnemonic(
 98            mnemonic_key, ss58_format
 99        )
100    else:
101        key = Keypair.create_from_private_key(
102            private_key, public_key, ss58_address, ss58_format
103        )
104
105    return key
106
107
108def to_classic_dict(keypair: Keypair, path: str) -> CommuneKeyDict:
109    """
110    Converts a keypair to a dictionary conforming to the classic commune format.
111
112    Args:
113        keypair: The keypair to convert.
114        path: The path/name of the key file.
115    """
116
117    return {
118        "path": path,
119        "mnemonic": check_str(keypair.mnemonic),
120        "public_key": bytes_to_hex(keypair.public_key),
121        "private_key": bytes_to_hex(keypair.private_key),
122        "ss58_address": check_ss58_address(keypair.ss58_address),
123        "seed_hex": bytes_to_hex(keypair.seed_hex),
124        "ss58_format": keypair.ss58_format,
125        "crypto_type": keypair.crypto_type,
126        "derive_path": keypair.derive_path,
127    }
128
129
130def classic_load_key(
131    name: str,
132    password: str | None = None,
133    from_mnemonic: bool = False,
134) -> Keypair:
135    """
136    Loads the keypair with the given name from a disk.
137    """
138    path = classic_key_path(name)
139    key_dict_json = classic_load(path, password=password)
140    key_dict = json.loads(key_dict_json)
141    return from_classic_dict(key_dict, from_mnemonic=from_mnemonic)
142
143
144def is_encrypted(name: str) -> bool:
145    """
146    Checks if the key with the given name is encrypted.
147    """
148    path = classic_key_path(name)
149    full_path = os.path.expanduser(os.path.join(COMMUNE_HOME, path))
150    with open(full_path, "r") as file:
151        body = json.load(file)
152    return body["encrypted"]
153
154
155def classic_store_key(keypair: Keypair, name: str, password: str | None = None) -> None:
156    """
157    Stores the given keypair on a disk under the given name.
158    """
159    key_dict = to_classic_dict(keypair, name)
160    key_dict_json = json.dumps(key_dict)
161    path = classic_key_path(name)
162    classic_put(path, key_dict_json, password=password)
163
164
165def try_classic_load_key(
166    name: str, context: GenericCtx | None = None,
167    password: str | None = None,
168    from_mnemonic: bool = False,
169) -> Keypair:
170    try:
171        keypair = classic_load_key(
172            name,
173            password=password,
174            from_mnemonic=from_mnemonic
175        )
176    except json.JSONDecodeError:
177        prompt = f"Please provide the password for the key {name}"
178        if context is not None:
179            context.info(prompt)
180        else:
181            print(prompt)
182        password = getpass()
183        keypair = classic_load_key(
184            name,
185            password=password,
186            from_mnemonic=from_mnemonic
187        )
188    return keypair
189
190
191def try_load_key(
192    name: str,
193    context: GenericCtx | None = None,
194    password: str | None = None
195):
196    try:
197        key_dict = classic_load(name, password=password)
198    except json.JSONDecodeError:
199        prompt = f"Please provide the password for the key {name}"
200        if context is not None:
201            context.info(prompt)
202        else:
203            print(prompt)
204        password = getpass()
205        key_dict = classic_load(name, password=password)
206    return key_dict
207
208
209def local_key_addresses(
210    ctx: GenericCtx | None = None,
211    universal_password: str | None = None
212) -> dict[str, Ss58Address]:
213    """
214    Retrieves a mapping of local key names to their SS58 addresses.
215    If password is passed, it will be used to decrypt every key.
216    If password is not passed and ctx is,
217    the user will be prompted for the password.
218    """
219    home = Path.home()
220    key_dir = home / '.commune' / "key"
221
222    key_names = [f.stem for f in key_dir.iterdir() if f.is_file() and not f.name.startswith('.')]
223
224    addresses_map: dict[str, Ss58Address] = {}
225
226    for key_name in key_names:
227        # issue #11 https://github.com/agicommies/communex/issues/12 added check for key2address to stop error from being thrown by wrong key type.
228        if key_name == "key2address":
229            print("key2address is saved in an invalid format. It will be ignored.")
230            continue
231        encrypted = is_encrypted(key_name)
232        if encrypted:
233            if universal_password:
234                password = universal_password
235            elif ctx:
236                ctx.info(f"Please provide the password for the key '{key_name}'")
237                password = getpass()
238            else:
239                print(f"Please provide the password for the key '{key_name}'")
240                password = getpass()
241        else:
242            password = None
243        key_dict = classic_load_key(key_name, password=password)
244        addresses_map[key_name] = check_ss58_address(key_dict.ss58_address)
245
246    return addresses_map
247
248
249def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
250    """
251    Resolves a keypair or key name to its corresponding SS58 address.
252
253    If the input is already an SS58 address, it is returned as is.
254    """
255
256    if isinstance(key, Keypair):
257        return key.ss58_address  # type: ignore
258
259    if is_ss58_address(key):
260        return key
261
262    try:
263        keypair = classic_load_key(key)
264    except FileNotFoundError:
265        raise ValueError(
266            f"Key is not a valid SS58 address nor a valid key name: {key}")
267
268    address = keypair.ss58_address
269
270    return check_ss58_address(address)
271
272
273def resolve_key_ss58_encrypted(
274        key: Ss58Address | Keypair | str, context: GenericCtx,
275        password: str | None = None
276
277) -> Ss58Address:
278    """
279    Resolves a keypair or key name to its corresponding SS58 address.
280
281    If the input is already an SS58 address, it is returned as is.
282    """
283
284    if isinstance(key, Keypair):
285        return check_ss58_address(key.ss58_address, key.ss58_format)
286
287    if is_ss58_address(key):
288        return key
289
290    try:
291        keypair = classic_load_key(key, password=password)
292    except json.JSONDecodeError:
293        context.info(f"Please provide the password for the key {key}")
294        password = getpass()
295        keypair = classic_load_key(key, password=password)
296    except FileNotFoundError:
297        raise ValueError(
298            f"Key is not a valid SS58 address nor a valid key name: {key}")
299
300    address = keypair.ss58_address
301
302    return check_ss58_address(address, keypair.ss58_format)
class GenericCtx(typing.Protocol):
23class GenericCtx(Protocol):
24    def info(self, message: str):
25        ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
GenericCtx(*args, **kwargs)
1927def _no_init_or_replace_init(self, *args, **kwargs):
1928    cls = type(self)
1929
1930    if cls._is_protocol:
1931        raise TypeError('Protocols cannot be instantiated')
1932
1933    # Already using a custom `__init__`. No need to calculate correct
1934    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1935    if cls.__init__ is not _no_init_or_replace_init:
1936        return
1937
1938    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1939    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1940    # searches for a proper new `__init__` in the MRO. The new `__init__`
1941    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1942    # instantiation of the protocol subclass will thus use the new
1943    # `__init__` and no longer call `_no_init_or_replace_init`.
1944    for base in cls.__mro__:
1945        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1946        if init is not _no_init_or_replace_init:
1947            cls.__init__ = init
1948            break
1949    else:
1950        # should not happen
1951        cls.__init__ = object.__init__
1952
1953    cls.__init__(self, *args, **kwargs)
def info(self, message: str):
24    def info(self, message: str):
25        ...
def check_key_dict(key_dict: Any) -> communex.compat.types.CommuneKeyDict:
28def check_key_dict(key_dict: Any) -> CommuneKeyDict:
29    """
30    Validates a given dictionary as a commune key dictionary and returns it.
31
32    This function checks if the provided dictionary adheres to the structure of
33    a CommuneKeyDict, that is used by the classic `commune` library and returns
34    it if valid.
35
36    Args:
37        key_dict: The dictionary to validate.
38
39    Returns:
40        The validated commune key dictionary. Same as input.
41
42    Raises:
43      AssertionError: If the dictionary does not conform to the expected
44        structure.
45    """
46
47    assert isinstance(key_dict, dict)
48    assert isinstance(key_dict["crypto_type"], int)
49    assert isinstance(key_dict["seed_hex"], str)
50    assert isinstance(key_dict["derive_path"], str | None)
51    assert isinstance(key_dict["path"], str) or key_dict["path"] is None
52    assert isinstance(key_dict["public_key"], str)
53    assert isinstance(key_dict["ss58_format"], int)
54    assert isinstance(key_dict["ss58_address"], str)
55    assert is_ss58_address(key_dict["ss58_address"])
56    assert isinstance(key_dict["private_key"], str)
57    assert isinstance(key_dict["mnemonic"], str)
58    return cast(CommuneKeyDict, key_dict)

Validates a given dictionary as a commune key dictionary and returns it.

This function checks if the provided dictionary adheres to the structure of a CommuneKeyDict, that is used by the classic commune library and returns it if valid.

Arguments:
  • key_dict: The dictionary to validate.
Returns:

The validated commune key dictionary. Same as input.

Raises:
  • AssertionError: If the dictionary does not conform to the expected structure.
def classic_key_path(name: str) -> str:
61def classic_key_path(name: str) -> str:
62    """
63    Constructs the file path for a key name in the classic commune format.
64    """
65
66    home = Path.home()
67    root_path = home / '.commune' / "key"
68    name = name + ".json"
69    return str(root_path / name)

Constructs the file path for a key name in the classic commune format.

def from_classic_dict( data: dict[typing.Any, typing.Any], from_mnemonic: bool = False) -> substrateinterface.keypair.Keypair:
 72def from_classic_dict(
 73        data: dict[Any, Any],
 74        from_mnemonic: bool = False
 75) -> Keypair:
 76    """
 77    Creates a `Key` from a dict conforming to the classic `commune` format.
 78
 79    Args:
 80        data: The key data in a classic commune format.
 81        name: The name to assign to the key.
 82
 83    Returns:
 84        The reconstructed `Key` instance.
 85
 86    Raises:
 87        AssertionError: If `data` does not conform to the expected format.
 88    """
 89
 90    data_ = check_key_dict(data)
 91
 92    ss58_address = data_["ss58_address"]
 93    private_key = data_["private_key"]
 94    mnemonic_key = data_["mnemonic"]
 95    public_key = data_["public_key"]
 96    ss58_format = data_["ss58_format"]
 97    if from_mnemonic:
 98        key = Keypair.create_from_mnemonic(
 99            mnemonic_key, ss58_format
100        )
101    else:
102        key = Keypair.create_from_private_key(
103            private_key, public_key, ss58_address, ss58_format
104        )
105
106    return key

Creates a Key from a dict conforming to the classic commune format.

Arguments:
  • data: The key data in a classic commune format.
  • name: The name to assign to the key.
Returns:

The reconstructed Key instance.

Raises:
  • AssertionError: If data does not conform to the expected format.
def to_classic_dict( keypair: substrateinterface.keypair.Keypair, path: str) -> communex.compat.types.CommuneKeyDict:
109def to_classic_dict(keypair: Keypair, path: str) -> CommuneKeyDict:
110    """
111    Converts a keypair to a dictionary conforming to the classic commune format.
112
113    Args:
114        keypair: The keypair to convert.
115        path: The path/name of the key file.
116    """
117
118    return {
119        "path": path,
120        "mnemonic": check_str(keypair.mnemonic),
121        "public_key": bytes_to_hex(keypair.public_key),
122        "private_key": bytes_to_hex(keypair.private_key),
123        "ss58_address": check_ss58_address(keypair.ss58_address),
124        "seed_hex": bytes_to_hex(keypair.seed_hex),
125        "ss58_format": keypair.ss58_format,
126        "crypto_type": keypair.crypto_type,
127        "derive_path": keypair.derive_path,
128    }

Converts a keypair to a dictionary conforming to the classic commune format.

Arguments:
  • keypair: The keypair to convert.
  • path: The path/name of the key file.
def classic_load_key( name: str, password: str | None = None, from_mnemonic: bool = False) -> substrateinterface.keypair.Keypair:
131def classic_load_key(
132    name: str,
133    password: str | None = None,
134    from_mnemonic: bool = False,
135) -> Keypair:
136    """
137    Loads the keypair with the given name from a disk.
138    """
139    path = classic_key_path(name)
140    key_dict_json = classic_load(path, password=password)
141    key_dict = json.loads(key_dict_json)
142    return from_classic_dict(key_dict, from_mnemonic=from_mnemonic)

Loads the keypair with the given name from a disk.

def is_encrypted(name: str) -> bool:
145def is_encrypted(name: str) -> bool:
146    """
147    Checks if the key with the given name is encrypted.
148    """
149    path = classic_key_path(name)
150    full_path = os.path.expanduser(os.path.join(COMMUNE_HOME, path))
151    with open(full_path, "r") as file:
152        body = json.load(file)
153    return body["encrypted"]

Checks if the key with the given name is encrypted.

def classic_store_key( keypair: substrateinterface.keypair.Keypair, name: str, password: str | None = None) -> None:
156def classic_store_key(keypair: Keypair, name: str, password: str | None = None) -> None:
157    """
158    Stores the given keypair on a disk under the given name.
159    """
160    key_dict = to_classic_dict(keypair, name)
161    key_dict_json = json.dumps(key_dict)
162    path = classic_key_path(name)
163    classic_put(path, key_dict_json, password=password)

Stores the given keypair on a disk under the given name.

def try_classic_load_key( name: str, context: GenericCtx | None = None, password: str | None = None, from_mnemonic: bool = False) -> substrateinterface.keypair.Keypair:
166def try_classic_load_key(
167    name: str, context: GenericCtx | None = None,
168    password: str | None = None,
169    from_mnemonic: bool = False,
170) -> Keypair:
171    try:
172        keypair = classic_load_key(
173            name,
174            password=password,
175            from_mnemonic=from_mnemonic
176        )
177    except json.JSONDecodeError:
178        prompt = f"Please provide the password for the key {name}"
179        if context is not None:
180            context.info(prompt)
181        else:
182            print(prompt)
183        password = getpass()
184        keypair = classic_load_key(
185            name,
186            password=password,
187            from_mnemonic=from_mnemonic
188        )
189    return keypair
def try_load_key( name: str, context: GenericCtx | None = None, password: str | None = None):
192def try_load_key(
193    name: str,
194    context: GenericCtx | None = None,
195    password: str | None = None
196):
197    try:
198        key_dict = classic_load(name, password=password)
199    except json.JSONDecodeError:
200        prompt = f"Please provide the password for the key {name}"
201        if context is not None:
202            context.info(prompt)
203        else:
204            print(prompt)
205        password = getpass()
206        key_dict = classic_load(name, password=password)
207    return key_dict
def local_key_addresses( ctx: GenericCtx | None = None, universal_password: str | None = None) -> dict[str, communex.types.Ss58Address]:
210def local_key_addresses(
211    ctx: GenericCtx | None = None,
212    universal_password: str | None = None
213) -> dict[str, Ss58Address]:
214    """
215    Retrieves a mapping of local key names to their SS58 addresses.
216    If password is passed, it will be used to decrypt every key.
217    If password is not passed and ctx is,
218    the user will be prompted for the password.
219    """
220    home = Path.home()
221    key_dir = home / '.commune' / "key"
222
223    key_names = [f.stem for f in key_dir.iterdir() if f.is_file() and not f.name.startswith('.')]
224
225    addresses_map: dict[str, Ss58Address] = {}
226
227    for key_name in key_names:
228        # issue #11 https://github.com/agicommies/communex/issues/12 added check for key2address to stop error from being thrown by wrong key type.
229        if key_name == "key2address":
230            print("key2address is saved in an invalid format. It will be ignored.")
231            continue
232        encrypted = is_encrypted(key_name)
233        if encrypted:
234            if universal_password:
235                password = universal_password
236            elif ctx:
237                ctx.info(f"Please provide the password for the key '{key_name}'")
238                password = getpass()
239            else:
240                print(f"Please provide the password for the key '{key_name}'")
241                password = getpass()
242        else:
243            password = None
244        key_dict = classic_load_key(key_name, password=password)
245        addresses_map[key_name] = check_ss58_address(key_dict.ss58_address)
246
247    return addresses_map

Retrieves a mapping of local key names to their SS58 addresses. If password is passed, it will be used to decrypt every key. If password is not passed and ctx is, the user will be prompted for the password.

def resolve_key_ss58( key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str]) -> communex.types.Ss58Address:
250def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
251    """
252    Resolves a keypair or key name to its corresponding SS58 address.
253
254    If the input is already an SS58 address, it is returned as is.
255    """
256
257    if isinstance(key, Keypair):
258        return key.ss58_address  # type: ignore
259
260    if is_ss58_address(key):
261        return key
262
263    try:
264        keypair = classic_load_key(key)
265    except FileNotFoundError:
266        raise ValueError(
267            f"Key is not a valid SS58 address nor a valid key name: {key}")
268
269    address = keypair.ss58_address
270
271    return check_ss58_address(address)

Resolves a keypair or key name to its corresponding SS58 address.

If the input is already an SS58 address, it is returned as is.

def resolve_key_ss58_encrypted( key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str], context: GenericCtx, password: str | None = None) -> communex.types.Ss58Address:
274def resolve_key_ss58_encrypted(
275        key: Ss58Address | Keypair | str, context: GenericCtx,
276        password: str | None = None
277
278) -> Ss58Address:
279    """
280    Resolves a keypair or key name to its corresponding SS58 address.
281
282    If the input is already an SS58 address, it is returned as is.
283    """
284
285    if isinstance(key, Keypair):
286        return check_ss58_address(key.ss58_address, key.ss58_format)
287
288    if is_ss58_address(key):
289        return key
290
291    try:
292        keypair = classic_load_key(key, password=password)
293    except json.JSONDecodeError:
294        context.info(f"Please provide the password for the key {key}")
295        password = getpass()
296        keypair = classic_load_key(key, password=password)
297    except FileNotFoundError:
298        raise ValueError(
299            f"Key is not a valid SS58 address nor a valid key name: {key}")
300
301    address = keypair.ss58_address
302
303    return check_ss58_address(address, keypair.ss58_format)

Resolves a keypair or key name to its corresponding SS58 address.

If the input is already an SS58 address, it is returned as is.