Edit on GitHub

communex.compat.key

Key storage compatible with the classic commune library.

WIP

  1"""
  2Key storage compatible with the classic `commune` library.
  3
  4WIP
  5"""
  6
  7import json
  8import os
  9from pathlib import Path
 10from typing import Any, cast
 11
 12from nacl.exceptions import CryptoError
 13from substrateinterface import Keypair
 14
 15from communex.compat.storage import COMMUNE_HOME, classic_load, classic_put
 16from communex.compat.types import CommuneKeyDict
 17from communex.errors import (
 18    InvalidPasswordError,
 19    KeyNotFoundError,
 20    PasswordNotProvidedError,
 21)
 22from communex.key import check_ss58_address, is_ss58_address
 23from communex.password import NoPassword, PasswordProvider
 24from communex.types import Ss58Address
 25from communex.util import bytes_to_hex, check_str
 26
 27
 28def check_key_dict(key_dict: Any) -> CommuneKeyDict:
 29    """
 30    Validates a given dictionary as a commune key dictionary and returns it.
 31
 32    This function checks if the provided dictionary adheres to the structure of
 33    a CommuneKeyDict, that is used by the classic `commune` library and returns
 34    it if valid.
 35
 36    Args:
 37        key_dict: The dictionary to validate.
 38
 39    Returns:
 40        The validated commune key dictionary. Same as input.
 41
 42    Raises:
 43      AssertionError: If the dictionary does not conform to the expected
 44        structure.
 45    """
 46
 47    assert isinstance(key_dict, dict)
 48    assert isinstance(key_dict["crypto_type"], int)
 49    assert isinstance(key_dict["seed_hex"], str)
 50    assert isinstance(key_dict["derive_path"], str | None)
 51    assert isinstance(key_dict["path"], str) or key_dict["path"] is None
 52    assert isinstance(key_dict["public_key"], str)
 53    assert isinstance(key_dict["ss58_format"], int)
 54    assert isinstance(key_dict["ss58_address"], str)
 55    assert is_ss58_address(key_dict["ss58_address"])
 56    assert isinstance(key_dict["private_key"], str)
 57    assert isinstance(key_dict["mnemonic"], str)
 58    return cast(CommuneKeyDict, key_dict)
 59
 60
 61def classic_key_path(name: str) -> str:
 62    """
 63    Constructs the file path for a key name in the classic commune format.
 64    """
 65
 66    home = Path.home()
 67    root_path = home / ".commune" / "key"
 68    name = name + ".json"
 69    return str(root_path / name)
 70
 71
 72def from_classic_dict(
 73    data: dict[Any, Any], from_mnemonic: bool = True
 74) -> Keypair:
 75    """
 76    Creates a `Key` from a dict conforming to the classic `commune` format.
 77
 78    Args:
 79        data: The key data in a classic commune format.
 80        name: The name to assign to the key.
 81
 82    Returns:
 83        The reconstructed `Key` instance.
 84
 85    Raises:
 86        AssertionError: If `data` does not conform to the expected format.
 87    """
 88
 89    data_ = check_key_dict(data)
 90
 91    ss58_address = data_["ss58_address"]
 92    private_key = data_["private_key"]
 93    mnemonic_key = data_["mnemonic"]
 94    public_key = data_["public_key"]
 95    ss58_format = data_["ss58_format"]
 96    if from_mnemonic:
 97        key = Keypair.create_from_mnemonic(mnemonic_key, ss58_format)
 98    else:
 99        key = Keypair.create_from_private_key(
100            private_key, public_key, ss58_address, ss58_format
101        )
102
103    return key
104
105
106def to_classic_dict(keypair: Keypair, path: str) -> CommuneKeyDict:
107    """
108    Converts a keypair to a dictionary conforming to the classic commune format.
109
110    Args:
111        keypair: The keypair to convert.
112        path: The path/name of the key file.
113    """
114
115    return {
116        "path": path,
117        "mnemonic": check_str(keypair.mnemonic),
118        "public_key": bytes_to_hex(keypair.public_key),
119        "private_key": bytes_to_hex(keypair.private_key),
120        "ss58_address": check_ss58_address(keypair.ss58_address),
121        "seed_hex": bytes_to_hex(keypair.seed_hex),
122        "ss58_format": keypair.ss58_format,
123        "crypto_type": keypair.crypto_type,
124        "derive_path": keypair.derive_path,
125    }
126
127
128def classic_load_key(
129    name: str,
130    password: str | None = None,
131    from_mnemonic: bool = True,
132) -> Keypair:
133    """
134    Loads the keypair with the given name from a disk.
135    """
136    path = classic_key_path(name)
137    key_dict_json = classic_load(path, password=password)
138    key_dict = json.loads(key_dict_json)
139    return from_classic_dict(key_dict, from_mnemonic=from_mnemonic)
140
141
142def try_classic_load_key(
143    key_name: str,
144    password: str | None = None,
145    *,
146    password_provider: PasswordProvider = NoPassword(),
147) -> Keypair:
148    password = password or password_provider.get_password(key_name)
149    try:
150        try:
151            keypair = classic_load_key(key_name, password=password)
152        except PasswordNotProvidedError:
153            password = password_provider.ask_password(key_name)
154            keypair = classic_load_key(key_name, password=password)
155    except FileNotFoundError as err:
156        raise KeyNotFoundError(
157            f"Key '{key_name}' is not a valid SS58 address nor a valid key name",
158            err,
159        )
160    except CryptoError as err:
161        raise InvalidPasswordError(
162            f"Invalid password for key '{key_name}'", err
163        )
164
165    return keypair
166
167
168def try_load_key(name: str, password: str | None = None):
169    """
170    DEPRECATED
171    """
172    raise DeprecationWarning("Use try_classic_load_key instead")
173    # try:
174    #     key_dict = classic_load(name, password=password)
175    # except json.JSONDecodeError:
176    #     prompt = f"Please provide the password for the key {name}"
177    #     print(prompt)
178    #     password = getpass()
179    #     key_dict = classic_load(name, password=password)
180    # return key_dict
181
182
183def is_encrypted(name: str) -> bool:
184    """
185    Checks if the key with the given name is encrypted.
186    """
187    path = classic_key_path(name)
188    full_path = os.path.expanduser(os.path.join(COMMUNE_HOME, path))
189    with open(full_path, "r") as file:
190        body = json.load(file)
191    return body["encrypted"]
192
193
194def classic_store_key(
195    keypair: Keypair, name: str, password: str | None = None
196) -> None:
197    """
198    Stores the given keypair on a disk under the given name.
199    """
200    key_dict = to_classic_dict(keypair, name)
201    key_dict_json = json.dumps(key_dict)
202    path = classic_key_path(name)
203    classic_put(path, key_dict_json, password=password)
204
205
206def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
207    """
208    Resolves a keypair or key name to its corresponding SS58 address.
209
210    If the input is already an SS58 address, it is returned as is.
211
212    DEPRECATED
213    """
214
215    if isinstance(key, Keypair):
216        return key.ss58_address  # type: ignore
217
218    if is_ss58_address(key):
219        return key
220
221    try:
222        keypair = classic_load_key(key)
223    except FileNotFoundError:
224        raise ValueError(
225            f"Key is not a valid SS58 address nor a valid key name: {key}"
226        )
227
228    address = keypair.ss58_address
229
230    return check_ss58_address(address)
231
232
233def resolve_key_ss58_encrypted(
234    key: Ss58Address | Keypair | str,
235    password: str | None = None,
236    password_provider: PasswordProvider = NoPassword(),
237) -> Ss58Address:
238    """
239    Resolves a keypair or key name to its corresponding SS58 address.
240
241    If the input is already an SS58 address, it is returned as is.
242    """
243
244    if isinstance(key, Keypair):
245        return check_ss58_address(key.ss58_address, key.ss58_format)
246
247    if is_ss58_address(key):
248        return key
249
250    keypair = try_classic_load_key(
251        key, password=password, password_provider=password_provider
252    )
253
254    address = keypair.ss58_address
255
256    return check_ss58_address(address, keypair.ss58_format)
257
258
259def local_key_addresses(
260    password_provider: PasswordProvider = NoPassword(),
261) -> dict[str, Ss58Address]:
262    """
263    Retrieves a mapping of local key names to their SS58 addresses.
264    If password is passed, it will be used to decrypt every key.
265    If password is not passed and ctx is,
266    the user will be prompted for the password.
267    """
268
269    # TODO: refactor to return mapping of (key_name -> Keypair)
270    # Outside of this, Keypair can be mapped to Ss58Address
271
272    home = Path.home()
273    key_dir = home / ".commune" / "key"
274
275    key_names = [
276        f.stem
277        for f in key_dir.iterdir()
278        if f.is_file() and not f.name.startswith(".")
279    ]
280
281    addresses_map: dict[str, Ss58Address] = {}
282
283    for key_name in key_names:
284        # issue #12 https://github.com/agicommies/communex/issues/12
285        # added check for key2address to stop error
286        # from being thrown by wrong key type.
287        if key_name == "key2address":
288            print(
289                "key2address is saved in an invalid format. It will be ignored."
290            )
291            continue
292
293        password = password_provider.get_password(key_name)
294        try:
295            keypair = classic_load_key(key_name, password=password)
296        except PasswordNotProvidedError:
297            password = password_provider.ask_password(key_name)
298            keypair = classic_load_key(key_name, password=password)
299
300        addresses_map[key_name] = check_ss58_address(keypair.ss58_address)
301
302    return addresses_map
def check_key_dict(key_dict: Any) -> communex.compat.types.CommuneKeyDict:
29def check_key_dict(key_dict: Any) -> CommuneKeyDict:
30    """
31    Validates a given dictionary as a commune key dictionary and returns it.
32
33    This function checks if the provided dictionary adheres to the structure of
34    a CommuneKeyDict, that is used by the classic `commune` library and returns
35    it if valid.
36
37    Args:
38        key_dict: The dictionary to validate.
39
40    Returns:
41        The validated commune key dictionary. Same as input.
42
43    Raises:
44      AssertionError: If the dictionary does not conform to the expected
45        structure.
46    """
47
48    assert isinstance(key_dict, dict)
49    assert isinstance(key_dict["crypto_type"], int)
50    assert isinstance(key_dict["seed_hex"], str)
51    assert isinstance(key_dict["derive_path"], str | None)
52    assert isinstance(key_dict["path"], str) or key_dict["path"] is None
53    assert isinstance(key_dict["public_key"], str)
54    assert isinstance(key_dict["ss58_format"], int)
55    assert isinstance(key_dict["ss58_address"], str)
56    assert is_ss58_address(key_dict["ss58_address"])
57    assert isinstance(key_dict["private_key"], str)
58    assert isinstance(key_dict["mnemonic"], str)
59    return cast(CommuneKeyDict, key_dict)

Validates a given dictionary as a commune key dictionary and returns it.

This function checks if the provided dictionary adheres to the structure of a CommuneKeyDict, that is used by the classic commune library and returns it if valid.

Arguments:
  • key_dict: The dictionary to validate.
Returns:

The validated commune key dictionary. Same as input.

Raises:
  • AssertionError: If the dictionary does not conform to the expected structure.
def classic_key_path(name: str) -> str:
62def classic_key_path(name: str) -> str:
63    """
64    Constructs the file path for a key name in the classic commune format.
65    """
66
67    home = Path.home()
68    root_path = home / ".commune" / "key"
69    name = name + ".json"
70    return str(root_path / name)

Constructs the file path for a key name in the classic commune format.

def from_classic_dict( data: dict[typing.Any, typing.Any], from_mnemonic: bool = True) -> substrateinterface.keypair.Keypair:
 73def from_classic_dict(
 74    data: dict[Any, Any], from_mnemonic: bool = True
 75) -> Keypair:
 76    """
 77    Creates a `Key` from a dict conforming to the classic `commune` format.
 78
 79    Args:
 80        data: The key data in a classic commune format.
 81        name: The name to assign to the key.
 82
 83    Returns:
 84        The reconstructed `Key` instance.
 85
 86    Raises:
 87        AssertionError: If `data` does not conform to the expected format.
 88    """
 89
 90    data_ = check_key_dict(data)
 91
 92    ss58_address = data_["ss58_address"]
 93    private_key = data_["private_key"]
 94    mnemonic_key = data_["mnemonic"]
 95    public_key = data_["public_key"]
 96    ss58_format = data_["ss58_format"]
 97    if from_mnemonic:
 98        key = Keypair.create_from_mnemonic(mnemonic_key, ss58_format)
 99    else:
100        key = Keypair.create_from_private_key(
101            private_key, public_key, ss58_address, ss58_format
102        )
103
104    return key

Creates a Key from a dict conforming to the classic commune format.

Arguments:
  • data: The key data in a classic commune format.
  • name: The name to assign to the key.
Returns:

The reconstructed Key instance.

Raises:
  • AssertionError: If data does not conform to the expected format.
def to_classic_dict( keypair: substrateinterface.keypair.Keypair, path: str) -> communex.compat.types.CommuneKeyDict:
107def to_classic_dict(keypair: Keypair, path: str) -> CommuneKeyDict:
108    """
109    Converts a keypair to a dictionary conforming to the classic commune format.
110
111    Args:
112        keypair: The keypair to convert.
113        path: The path/name of the key file.
114    """
115
116    return {
117        "path": path,
118        "mnemonic": check_str(keypair.mnemonic),
119        "public_key": bytes_to_hex(keypair.public_key),
120        "private_key": bytes_to_hex(keypair.private_key),
121        "ss58_address": check_ss58_address(keypair.ss58_address),
122        "seed_hex": bytes_to_hex(keypair.seed_hex),
123        "ss58_format": keypair.ss58_format,
124        "crypto_type": keypair.crypto_type,
125        "derive_path": keypair.derive_path,
126    }

Converts a keypair to a dictionary conforming to the classic commune format.

Arguments:
  • keypair: The keypair to convert.
  • path: The path/name of the key file.
def classic_load_key( name: str, password: str | None = None, from_mnemonic: bool = True) -> substrateinterface.keypair.Keypair:
129def classic_load_key(
130    name: str,
131    password: str | None = None,
132    from_mnemonic: bool = True,
133) -> Keypair:
134    """
135    Loads the keypair with the given name from a disk.
136    """
137    path = classic_key_path(name)
138    key_dict_json = classic_load(path, password=password)
139    key_dict = json.loads(key_dict_json)
140    return from_classic_dict(key_dict, from_mnemonic=from_mnemonic)

Loads the keypair with the given name from a disk.

def try_classic_load_key( key_name: str, password: str | None = None, *, password_provider: communex.password.PasswordProvider = <communex.password.NoPassword object>) -> substrateinterface.keypair.Keypair:
143def try_classic_load_key(
144    key_name: str,
145    password: str | None = None,
146    *,
147    password_provider: PasswordProvider = NoPassword(),
148) -> Keypair:
149    password = password or password_provider.get_password(key_name)
150    try:
151        try:
152            keypair = classic_load_key(key_name, password=password)
153        except PasswordNotProvidedError:
154            password = password_provider.ask_password(key_name)
155            keypair = classic_load_key(key_name, password=password)
156    except FileNotFoundError as err:
157        raise KeyNotFoundError(
158            f"Key '{key_name}' is not a valid SS58 address nor a valid key name",
159            err,
160        )
161    except CryptoError as err:
162        raise InvalidPasswordError(
163            f"Invalid password for key '{key_name}'", err
164        )
165
166    return keypair
def try_load_key(name: str, password: str | None = None):
169def try_load_key(name: str, password: str | None = None):
170    """
171    DEPRECATED
172    """
173    raise DeprecationWarning("Use try_classic_load_key instead")
174    # try:
175    #     key_dict = classic_load(name, password=password)
176    # except json.JSONDecodeError:
177    #     prompt = f"Please provide the password for the key {name}"
178    #     print(prompt)
179    #     password = getpass()
180    #     key_dict = classic_load(name, password=password)
181    # return key_dict

DEPRECATED

def is_encrypted(name: str) -> bool:
184def is_encrypted(name: str) -> bool:
185    """
186    Checks if the key with the given name is encrypted.
187    """
188    path = classic_key_path(name)
189    full_path = os.path.expanduser(os.path.join(COMMUNE_HOME, path))
190    with open(full_path, "r") as file:
191        body = json.load(file)
192    return body["encrypted"]

Checks if the key with the given name is encrypted.

def classic_store_key( keypair: substrateinterface.keypair.Keypair, name: str, password: str | None = None) -> None:
195def classic_store_key(
196    keypair: Keypair, name: str, password: str | None = None
197) -> None:
198    """
199    Stores the given keypair on a disk under the given name.
200    """
201    key_dict = to_classic_dict(keypair, name)
202    key_dict_json = json.dumps(key_dict)
203    path = classic_key_path(name)
204    classic_put(path, key_dict_json, password=password)

Stores the given keypair on a disk under the given name.

def resolve_key_ss58( key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str]) -> communex.types.Ss58Address:
207def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
208    """
209    Resolves a keypair or key name to its corresponding SS58 address.
210
211    If the input is already an SS58 address, it is returned as is.
212
213    DEPRECATED
214    """
215
216    if isinstance(key, Keypair):
217        return key.ss58_address  # type: ignore
218
219    if is_ss58_address(key):
220        return key
221
222    try:
223        keypair = classic_load_key(key)
224    except FileNotFoundError:
225        raise ValueError(
226            f"Key is not a valid SS58 address nor a valid key name: {key}"
227        )
228
229    address = keypair.ss58_address
230
231    return check_ss58_address(address)

Resolves a keypair or key name to its corresponding SS58 address.

If the input is already an SS58 address, it is returned as is.

DEPRECATED

def resolve_key_ss58_encrypted( key: Union[communex.types.Ss58Address, substrateinterface.keypair.Keypair, str], password: str | None = None, password_provider: communex.password.PasswordProvider = <communex.password.NoPassword object>) -> communex.types.Ss58Address:
234def resolve_key_ss58_encrypted(
235    key: Ss58Address | Keypair | str,
236    password: str | None = None,
237    password_provider: PasswordProvider = NoPassword(),
238) -> Ss58Address:
239    """
240    Resolves a keypair or key name to its corresponding SS58 address.
241
242    If the input is already an SS58 address, it is returned as is.
243    """
244
245    if isinstance(key, Keypair):
246        return check_ss58_address(key.ss58_address, key.ss58_format)
247
248    if is_ss58_address(key):
249        return key
250
251    keypair = try_classic_load_key(
252        key, password=password, password_provider=password_provider
253    )
254
255    address = keypair.ss58_address
256
257    return check_ss58_address(address, keypair.ss58_format)

Resolves a keypair or key name to its corresponding SS58 address.

If the input is already an SS58 address, it is returned as is.

def local_key_addresses( password_provider: communex.password.PasswordProvider = <communex.password.NoPassword object>) -> dict[str, communex.types.Ss58Address]:
260def local_key_addresses(
261    password_provider: PasswordProvider = NoPassword(),
262) -> dict[str, Ss58Address]:
263    """
264    Retrieves a mapping of local key names to their SS58 addresses.
265    If password is passed, it will be used to decrypt every key.
266    If password is not passed and ctx is,
267    the user will be prompted for the password.
268    """
269
270    # TODO: refactor to return mapping of (key_name -> Keypair)
271    # Outside of this, Keypair can be mapped to Ss58Address
272
273    home = Path.home()
274    key_dir = home / ".commune" / "key"
275
276    key_names = [
277        f.stem
278        for f in key_dir.iterdir()
279        if f.is_file() and not f.name.startswith(".")
280    ]
281
282    addresses_map: dict[str, Ss58Address] = {}
283
284    for key_name in key_names:
285        # issue #12 https://github.com/agicommies/communex/issues/12
286        # added check for key2address to stop error
287        # from being thrown by wrong key type.
288        if key_name == "key2address":
289            print(
290                "key2address is saved in an invalid format. It will be ignored."
291            )
292            continue
293
294        password = password_provider.get_password(key_name)
295        try:
296            keypair = classic_load_key(key_name, password=password)
297        except PasswordNotProvidedError:
298            password = password_provider.ask_password(key_name)
299            keypair = classic_load_key(key_name, password=password)
300
301        addresses_map[key_name] = check_ss58_address(keypair.ss58_address)
302
303    return addresses_map

Retrieves a mapping of local key names to their SS58 addresses. If password is passed, it will be used to decrypt every key. If password is not passed and ctx is, the user will be prompted for the password.