Edit on GitHub

communex.compat.key

Key storage compatible with the classic commune library.

WIP

  1"""
  2Key storage compatible with the classic `commune` library.
  3
  4WIP
  5"""
  6
  7import json
  8import os
  9from pathlib import Path
 10from typing import Any, cast
 11
 12from nacl.exceptions import CryptoError
 13from substrateinterface import Keypair
 14
 15from communex.compat.storage import COMMUNE_HOME, classic_load, classic_put
 16from communex.compat.types import CommuneKeyDict
 17from communex.errors import (
 18    InvalidPasswordError,
 19    KeyNotFoundError,
 20    PasswordNotProvidedError,
 21)
 22from communex.key import check_ss58_address, is_ss58_address
 23from communex.password import NoPassword, PasswordProvider
 24from communex.types import Ss58Address
 25from communex.util import bytes_to_hex, check_str
 26
 27
 28def check_key_dict(key_dict: Any) -> CommuneKeyDict:
 29    """
 30    Validates a given dictionary as a commune key dictionary and returns it.
 31
 32    This function checks if the provided dictionary adheres to the structure of
 33    a CommuneKeyDict, that is used by the classic `commune` library and returns
 34    it if valid.
 35
 36    Args:
 37        key_dict: The dictionary to validate.
 38
 39    Returns:
 40        The validated commune key dictionary. Same as input.
 41
 42    Raises:
 43      AssertionError: If the dictionary does not conform to the expected
 44        structure.
 45    """
 46
 47    assert isinstance(key_dict, dict)
 48    assert isinstance(key_dict["crypto_type"], int)
 49    assert isinstance(key_dict["seed_hex"], str)
 50    assert isinstance(key_dict["derive_path"], str | None)
 51    assert isinstance(key_dict["path"], str) or key_dict["path"] is None
 52    assert isinstance(key_dict["public_key"], str)
 53    assert isinstance(key_dict["ss58_format"], int)
 54    assert isinstance(key_dict["ss58_address"], str)
 55    assert is_ss58_address(key_dict["ss58_address"])
 56    assert isinstance(key_dict["private_key"], str)
 57    assert isinstance(key_dict["mnemonic"], str)
 58    return cast(CommuneKeyDict, key_dict)
 59
 60
 61def classic_key_path(name: str) -> str:
 62    """
 63    Constructs the file path for a key name in the classic commune format.
 64    """
 65
 66    home = Path.home()
 67    root_path = home / ".commune" / "key"
 68    name = name + ".json"
 69    return str(root_path / name)
 70
 71
 72def from_classic_dict(
 73    data: dict[Any, Any], from_mnemonic: bool = True
 74) -> Keypair:
 75    """
 76    Creates a `Key` from a dict conforming to the classic `commune` format.
 77
 78    Args:
 79        data: The key data in a classic commune format.
 80        name: The name to assign to the key.
 81
 82    Returns:
 83        The reconstructed `Key` instance.
 84
 85    Raises:
 86        AssertionError: If `data` does not conform to the expected format.
 87    """
 88
 89    data_ = check_key_dict(data)
 90
 91    ss58_address = data_["ss58_address"]
 92    private_key = data_["private_key"]
 93    mnemonic_key = data_["mnemonic"]
 94    public_key = data_["public_key"]
 95    ss58_format = data_["ss58_format"]
 96    if from_mnemonic:
 97        key = Keypair.create_from_mnemonic(mnemonic_key, ss58_format)
 98    else:
 99        key = Keypair.create_from_private_key(
100            private_key, public_key, ss58_address, ss58_format
101        )
102
103    return key
104
105
106def to_classic_dict(keypair: Keypair, path: str) -> CommuneKeyDict:
107    """
108    Converts a keypair to a dictionary conforming to the classic commune format.
109
110    Args:
111        keypair: The keypair to convert.
112        path: The path/name of the key file.
113    """
114
115    return {
116        "path": path,
117        "mnemonic": check_str(keypair.mnemonic),
118        "public_key": bytes_to_hex(keypair.public_key),
119        "private_key": bytes_to_hex(keypair.private_key),
120        "ss58_address": check_ss58_address(keypair.ss58_address),
121        "seed_hex": bytes_to_hex(keypair.seed_hex),
122        "ss58_format": keypair.ss58_format,
123        "crypto_type": keypair.crypto_type,
124        "derive_path": keypair.derive_path,
125    }
126
127
128def classic_load_key(
129    name: str,
130    password: str | None = None,
131    from_mnemonic: bool = True, # Should be removed in future versions
132) -> Keypair:
133    """
134    Loads the keypair with the given name from a disk.
135    """
136    path = classic_key_path(name)
137    key_dict_json = classic_load(path, password=password)
138    key_dict = json.loads(key_dict_json)
139    try:
140        kp = from_classic_dict(key_dict, from_mnemonic=from_mnemonic)
141    except ValueError:
142        kp = from_classic_dict(
143            key_dict, from_mnemonic=(not from_mnemonic)
144        )
145    return kp
146
147
148def try_classic_load_key(
149    key_name: str,
150    password: str | None = None,
151    *,
152    password_provider: PasswordProvider = NoPassword(),
153) -> Keypair:
154    password = password or password_provider.get_password(key_name)
155    try:
156        try:
157            keypair = classic_load_key(key_name, password=password)
158        except PasswordNotProvidedError:
159            password = password_provider.ask_password(key_name)
160            keypair = classic_load_key(key_name, password=password)
161    except FileNotFoundError as err:
162        raise KeyNotFoundError(
163            f"Key '{key_name}' is not a valid SS58 address nor a valid key name",
164            err,
165        )
166    except CryptoError as err:
167        raise InvalidPasswordError(
168            f"Invalid password for key '{key_name}'", err
169        )
170
171    return keypair
172
173
174def try_load_key(name: str, password: str | None = None):
175    """
176    DEPRECATED
177    """
178    raise DeprecationWarning("Use try_classic_load_key instead")
179    # try:
180    #     key_dict = classic_load(name, password=password)
181    # except json.JSONDecodeError:
182    #     prompt = f"Please provide the password for the key {name}"
183    #     print(prompt)
184    #     password = getpass()
185    #     key_dict = classic_load(name, password=password)
186    # return key_dict
187
188
189def is_encrypted(name: str) -> bool:
190    """
191    Checks if the key with the given name is encrypted.
192    """
193    path = classic_key_path(name)
194    full_path = os.path.expanduser(os.path.join(COMMUNE_HOME, path))
195    with open(full_path, "r") as file:
196        body = json.load(file)
197    return body["encrypted"]
198
199
200def classic_store_key(
201    keypair: Keypair, name: str, password: str | None = None
202) -> None:
203    """
204    Stores the given keypair on a disk under the given name.
205    """
206    key_dict = to_classic_dict(keypair, name)
207    key_dict_json = json.dumps(key_dict)
208    path = classic_key_path(name)
209    classic_put(path, key_dict_json, password=password)
210
211
212def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
213    """
214    Resolves a keypair or key name to its corresponding SS58 address.
215
216    If the input is already an SS58 address, it is returned as is.
217
218    DEPRECATED
219    """
220
221    if isinstance(key, Keypair):
222        return key.ss58_address  # type: ignore
223
224    if is_ss58_address(key):
225        return key
226
227    try:
228        keypair = classic_load_key(key)
229    except FileNotFoundError:
230        raise ValueError(
231            f"Key is not a valid SS58 address nor a valid key name: {key}"
232        )
233
234    address = keypair.ss58_address
235
236    return check_ss58_address(address)
237
238
239def resolve_key_ss58_encrypted(
240    key: Ss58Address | Keypair | str,
241    password: str | None = None,
242    password_provider: PasswordProvider = NoPassword(),
243) -> Ss58Address:
244    """
245    Resolves a keypair or key name to its corresponding SS58 address.
246
247    If the input is already an SS58 address, it is returned as is.
248    """
249
250    if isinstance(key, Keypair):
251        return check_ss58_address(key.ss58_address, key.ss58_format)
252
253    if is_ss58_address(key):
254        return key
255
256    keypair = try_classic_load_key(
257        key, password=password, password_provider=password_provider
258    )
259
260    address = keypair.ss58_address
261
262    return check_ss58_address(address, keypair.ss58_format)
263
264
265def local_key_addresses(
266    password_provider: PasswordProvider = NoPassword(),
267) -> dict[str, Ss58Address]:
268    """
269    Retrieves a mapping of local key names to their SS58 addresses.
270    If password is passed, it will be used to decrypt every key.
271    If password is not passed and ctx is,
272    the user will be prompted for the password.
273    """
274
275    # TODO: refactor to return mapping of (key_name -> Keypair)
276    # Outside of this, Keypair can be mapped to Ss58Address
277
278    home = Path.home()
279    key_dir = home / ".commune" / "key"
280
281    key_names = [
282        f.stem
283        for f in key_dir.iterdir()
284        if f.is_file() and not f.name.startswith(".")
285    ]
286
287    addresses_map: dict[str, Ss58Address] = {}
288
289    for key_name in key_names:
290        # issue #12 https://github.com/agicommies/communex/issues/12
291        # added check for key2address to stop error
292        # from being thrown by wrong key type.
293        if key_name == "key2address":
294            print(
295                "key2address is saved in an invalid format. It will be ignored."
296            )
297            continue
298
299        password = password_provider.get_password(key_name)
300        try:
301            keypair = classic_load_key(key_name, password=password)
302        except PasswordNotProvidedError:
303            password = password_provider.ask_password(key_name)
304            keypair = classic_load_key(key_name, password=password)
305
306        addresses_map[key_name] = check_ss58_address(keypair.ss58_address)
307
308    return addresses_map
def check_key_dict(key_dict: Any) -> communex.compat.types.CommuneKeyDict:
29def check_key_dict(key_dict: Any) -> CommuneKeyDict:
30    """
31    Validates a given dictionary as a commune key dictionary and returns it.
32
33    This function checks if the provided dictionary adheres to the structure of
34    a CommuneKeyDict, that is used by the classic `commune` library and returns
35    it if valid.
36
37    Args:
38        key_dict: The dictionary to validate.
39
40    Returns:
41        The validated commune key dictionary. Same as input.
42
43    Raises:
44      AssertionError: If the dictionary does not conform to the expected
45        structure.
46    """
47
48    assert isinstance(key_dict, dict)
49    assert isinstance(key_dict["crypto_type"], int)
50    assert isinstance(key_dict["seed_hex"], str)
51    assert isinstance(key_dict["derive_path"], str | None)
52    assert isinstance(key_dict["path"], str) or key_dict["path"] is None
53    assert isinstance(key_dict["public_key"], str)
54    assert isinstance(key_dict["ss58_format"], int)
55    assert isinstance(key_dict["ss58_address"], str)
56    assert is_ss58_address(key_dict["ss58_address"])
57    assert isinstance(key_dict["private_key"], str)
58    assert isinstance(key_dict["mnemonic"], str)
59    return cast(CommuneKeyDict, key_dict)

Validates a given dictionary as a commune key dictionary and returns it.

This function checks if the provided dictionary adheres to the structure of a CommuneKeyDict, that is used by the classic commune library and returns it if valid.

Arguments:
  • key_dict: The dictionary to validate.
Returns:

The validated commune key dictionary. Same as input.

Raises:
  • AssertionError: If the dictionary does not conform to the expected structure.
def classic_key_path(name: str) -> str:
62def classic_key_path(name: str) -> str:
63    """
64    Constructs the file path for a key name in the classic commune format.
65    """
66
67    home = Path.home()
68    root_path = home / ".commune" / "key"
69    name = name + ".json"
70    return str(root_path / name)

Constructs the file path for a key name in the classic commune format.

def from_classic_dict( data: dict[typing.Any, typing.Any], from_mnemonic: bool = True) -> substrateinterface.keypair.Keypair:
 73def from_classic_dict(
 74    data: dict[Any, Any], from_mnemonic: bool = True
 75) -> Keypair:
 76    """
 77    Creates a `Key` from a dict conforming to the classic `commune` format.
 78
 79    Args:
 80        data: The key data in a classic commune format.
 81        name: The name to assign to the key.
 82
 83    Returns:
 84        The reconstructed `Key` instance.
 85
 86    Raises:
 87        AssertionError: If `data` does not conform to the expected format.
 88    """
 89
 90    data_ = check_key_dict(data)
 91
 92    ss58_address = data_["ss58_address"]
 93    private_key = data_["private_key"]
 94    mnemonic_key = data_["mnemonic"]
 95    public_key = data_["public_key"]
 96    ss58_format = data_["ss58_format"]
 97    if from_mnemonic:
 98        key = Keypair.create_from_mnemonic(mnemonic_key, ss58_format)
 99    else:
100        key = Keypair.create_from_private_key(
101            private_key, public_key, ss58_address, ss58_format
102        )
103
104    return key

Creates a Key from a dict conforming to the classic commune format.

Arguments:
  • data: The key data in a classic commune format.
  • name: The name to assign to the key.
Returns:

The reconstructed Key instance.

Raises:
  • AssertionError: If data does not conform to the expected format.
def to_classic_dict( keypair: substrateinterface.keypair.Keypair, path: str) -> communex.compat.types.CommuneKeyDict:
107def to_classic_dict(keypair: Keypair, path: str) -> CommuneKeyDict:
108    """
109    Converts a keypair to a dictionary conforming to the classic commune format.
110
111    Args:
112        keypair: The keypair to convert.
113        path: The path/name of the key file.
114    """
115
116    return {
117        "path": path,
118        "mnemonic": check_str(keypair.mnemonic),
119        "public_key": bytes_to_hex(keypair.public_key),
120        "private_key": bytes_to_hex(keypair.private_key),
121        "ss58_address": check_ss58_address(keypair.ss58_address),
122        "seed_hex": bytes_to_hex(keypair.seed_hex),
123        "ss58_format": keypair.ss58_format,
124        "crypto_type": keypair.crypto_type,
125        "derive_path": keypair.derive_path,
126    }

Converts a keypair to a dictionary conforming to the classic commune format.

Arguments:
  • keypair: The keypair to convert.
  • path: The path/name of the key file.
def classic_load_key( name: str, password: str | None = None, from_mnemonic: bool = True) -> substrateinterface.keypair.Keypair:
129def classic_load_key(
130    name: str,
131    password: str | None = None,
132    from_mnemonic: bool = True, # Should be removed in future versions
133) -> Keypair:
134    """
135    Loads the keypair with the given name from a disk.
136    """
137    path = classic_key_path(name)
138    key_dict_json = classic_load(path, password=password)
139    key_dict = json.loads(key_dict_json)
140    try:
141        kp = from_classic_dict(key_dict, from_mnemonic=from_mnemonic)
142    except ValueError:
143        kp = from_classic_dict(
144            key_dict, from_mnemonic=(not from_mnemonic)
145        )
146    return kp

Loads the keypair with the given name from a disk.

def try_classic_load_key( key_name: str, password: str | None = None, *, password_provider: communex.password.PasswordProvider = <communex.password.NoPassword object>) -> substrateinterface.keypair.Keypair:
149def try_classic_load_key(
150    key_name: str,
151    password: str | None = None,
152    *,
153    password_provider: PasswordProvider = NoPassword(),
154) -> Keypair:
155    password = password or password_provider.get_password(key_name)
156    try:
157        try:
158            keypair = classic_load_key(key_name, password=password)
159        except PasswordNotProvidedError:
160            password = password_provider.ask_password(key_name)
161            keypair = classic_load_key(key_name, password=password)
162    except FileNotFoundError as err:
163        raise KeyNotFoundError(
164            f"Key '{key_name}' is not a valid SS58 address nor a valid key name",
165            err,
166        )
167    except CryptoError as err:
168        raise InvalidPasswordError(
169            f"Invalid password for key '{key_name}'", err
170        )
171
172    return keypair
def try_load_key(name: str, password: str | None = None):
175def try_load_key(name: str, password: str | None = None):
176    """
177    DEPRECATED
178    """
179    raise DeprecationWarning("Use try_classic_load_key instead")
180    # try:
181    #     key_dict = classic_load(name, password=password)
182    # except json.JSONDecodeError:
183    #     prompt = f"Please provide the password for the key {name}"
184    #     print(prompt)
185    #     password = getpass()
186    #     key_dict = classic_load(name, password=password)
187    # return key_dict

DEPRECATED

def is_encrypted(name: str) -> bool:
190def is_encrypted(name: str) -> bool:
191    """
192    Checks if the key with the given name is encrypted.
193    """
194    path = classic_key_path(name)
195    full_path = os.path.expanduser(os.path.join(COMMUNE_HOME, path))
196    with open(full_path, "r") as file:
197        body = json.load(file)
198    return body["encrypted"]

Checks if the key with the given name is encrypted.

def classic_store_key( keypair: substrateinterface.keypair.Keypair, name: str, password: str | None = None) -> None:
201def classic_store_key(
202    keypair: Keypair, name: str, password: str | None = None
203) -> None:
204    """
205    Stores the given keypair on a disk under the given name.
206    """
207    key_dict = to_classic_dict(keypair, name)
208    key_dict_json = json.dumps(key_dict)
209    path = classic_key_path(name)
210    classic_put(path, key_dict_json, password=password)

Stores the given keypair on a disk under the given name.

def resolve_key_ss58( key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str]) -> communex.types.Ss58Address:
213def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
214    """
215    Resolves a keypair or key name to its corresponding SS58 address.
216
217    If the input is already an SS58 address, it is returned as is.
218
219    DEPRECATED
220    """
221
222    if isinstance(key, Keypair):
223        return key.ss58_address  # type: ignore
224
225    if is_ss58_address(key):
226        return key
227
228    try:
229        keypair = classic_load_key(key)
230    except FileNotFoundError:
231        raise ValueError(
232            f"Key is not a valid SS58 address nor a valid key name: {key}"
233        )
234
235    address = keypair.ss58_address
236
237    return check_ss58_address(address)

Resolves a keypair or key name to its corresponding SS58 address.

If the input is already an SS58 address, it is returned as is.

DEPRECATED

def resolve_key_ss58_encrypted( key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str], password: str | None = None, password_provider: communex.password.PasswordProvider = <communex.password.NoPassword object>) -> communex.types.Ss58Address:
240def resolve_key_ss58_encrypted(
241    key: Ss58Address | Keypair | str,
242    password: str | None = None,
243    password_provider: PasswordProvider = NoPassword(),
244) -> Ss58Address:
245    """
246    Resolves a keypair or key name to its corresponding SS58 address.
247
248    If the input is already an SS58 address, it is returned as is.
249    """
250
251    if isinstance(key, Keypair):
252        return check_ss58_address(key.ss58_address, key.ss58_format)
253
254    if is_ss58_address(key):
255        return key
256
257    keypair = try_classic_load_key(
258        key, password=password, password_provider=password_provider
259    )
260
261    address = keypair.ss58_address
262
263    return check_ss58_address(address, keypair.ss58_format)

Resolves a keypair or key name to its corresponding SS58 address.

If the input is already an SS58 address, it is returned as is.

def local_key_addresses( password_provider: communex.password.PasswordProvider = <communex.password.NoPassword object>) -> dict[str, communex.types.Ss58Address]:
266def local_key_addresses(
267    password_provider: PasswordProvider = NoPassword(),
268) -> dict[str, Ss58Address]:
269    """
270    Retrieves a mapping of local key names to their SS58 addresses.
271    If password is passed, it will be used to decrypt every key.
272    If password is not passed and ctx is,
273    the user will be prompted for the password.
274    """
275
276    # TODO: refactor to return mapping of (key_name -> Keypair)
277    # Outside of this, Keypair can be mapped to Ss58Address
278
279    home = Path.home()
280    key_dir = home / ".commune" / "key"
281
282    key_names = [
283        f.stem
284        for f in key_dir.iterdir()
285        if f.is_file() and not f.name.startswith(".")
286    ]
287
288    addresses_map: dict[str, Ss58Address] = {}
289
290    for key_name in key_names:
291        # issue #12 https://github.com/agicommies/communex/issues/12
292        # added check for key2address to stop error
293        # from being thrown by wrong key type.
294        if key_name == "key2address":
295            print(
296                "key2address is saved in an invalid format. It will be ignored."
297            )
298            continue
299
300        password = password_provider.get_password(key_name)
301        try:
302            keypair = classic_load_key(key_name, password=password)
303        except PasswordNotProvidedError:
304            password = password_provider.ask_password(key_name)
305            keypair = classic_load_key(key_name, password=password)
306
307        addresses_map[key_name] = check_ss58_address(keypair.ss58_address)
308
309    return addresses_map

Retrieves a mapping of local key names to their SS58 addresses. If password is passed, it will be used to decrypt every key. If password is not passed and ctx is, the user will be prompted for the password.