Key storage compatible with the classic commune library.


  2Key storage compatible with the classic `commune` library.
  7import json
  8import os
  9from getpass import getpass
 10from pathlib import Path
 11from typing import Any, Protocol, cast
 13from substrateinterface import Keypair  # type: ignore
 15from communex.compat.storage import COMMUNE_HOME, classic_load, classic_put
 16from communex.compat.types import CommuneKeyDict
 17from communex.key import check_ss58_address, is_ss58_address
 18from communex.types import Ss58Address
 19from communex.util import bytes_to_hex, check_str
 22class GenericCtx(Protocol):
 23    def info(self, message: str):
 24        ...
 27def check_key_dict(key_dict: Any) -> CommuneKeyDict:
 28    """
 29    Validates a given dictionary as a commune key dictionary and returns it.
 31    This function checks if the provided dictionary adheres to the structure of
 32    a CommuneKeyDict, that is used by the classic `commune` library and returns
 33    it if valid.
 35    Args:
 36        key_dict: The dictionary to validate.
 38    Returns:
 39        The validated commune key dictionary. Same as input.
 41    Raises:
 42      AssertionError: If the dictionary does not conform to the expected
 43        structure.
 44    """
 46    assert isinstance(key_dict, dict)
 47    assert isinstance(key_dict["crypto_type"], int)
 48    assert isinstance(key_dict["seed_hex"], str)
 49    assert isinstance(key_dict["derive_path"], str | None)
 50    assert isinstance(key_dict["path"], str) or key_dict["path"] is None
 51    assert isinstance(key_dict["public_key"], str)
 52    assert isinstance(key_dict["ss58_format"], int)
 53    assert isinstance(key_dict["ss58_address"], str)
 54    assert is_ss58_address(key_dict["ss58_address"])
 55    assert isinstance(key_dict["private_key"], str)
 56    assert isinstance(key_dict["mnemonic"], str)
 57    return cast(CommuneKeyDict, key_dict)
 60def classic_key_path(name: str) -> str:
 61    """
 62    Constructs the file path for a key name in the classic commune format.
 63    """
 65    home = Path.home()
 66    root_path = home / '.commune' / "key"
 67    name = name + ".json"
 68    return str(root_path / name)
 71def from_classic_dict(
 72        data: dict[Any, Any],
 73        from_mnemonic: bool = False
 74) -> Keypair:
 75    """
 76    Creates a `Key` from a dict conforming to the classic `commune` format.
 78    Args:
 79        data: The key data in a classic commune format.
 80        name: The name to assign to the key.
 82    Returns:
 83        The reconstructed `Key` instance.
 85    Raises:
 86        AssertionError: If `data` does not conform to the expected format.
 87    """
 89    data_ = check_key_dict(data)
 91    ss58_address = data_["ss58_address"]
 92    private_key = data_["private_key"]
 93    mnemonic_key = data_["mnemonic"]
 94    public_key = data_["public_key"]
 95    ss58_format = data_["ss58_format"]
 96    if from_mnemonic:
 97        key = Keypair.create_from_mnemonic(
 98            mnemonic_key, ss58_format
 99        )
100    else:
101        key = Keypair.create_from_private_key(
102            private_key, public_key, ss58_address, ss58_format
103        )
105    return key
108def to_classic_dict(keypair: Keypair, path: str) -> CommuneKeyDict:
109    """
110    Converts a keypair to a dictionary conforming to the classic commune format.
112    Args:
113        keypair: The keypair to convert.
114        path: The path/name of the key file.
115    """
117    return {
118        "path": path,
119        "mnemonic": check_str(keypair.mnemonic),
120        "public_key": bytes_to_hex(keypair.public_key),
121        "private_key": bytes_to_hex(keypair.private_key),
122        "ss58_address": check_ss58_address(keypair.ss58_address),
123        "seed_hex": bytes_to_hex(keypair.seed_hex),
124        "ss58_format": keypair.ss58_format,
125        "crypto_type": keypair.crypto_type,
126        "derive_path": keypair.derive_path,
127    }
130def classic_load_key(
131    name: str,
132    password: str | None = None,
133    from_mnemonic: bool = False,
134) -> Keypair:
135    """
136    Loads the keypair with the given name from a disk.
137    """
138    path = classic_key_path(name)
139    key_dict_json = classic_load(path, password=password)
140    key_dict = json.loads(key_dict_json)
141    return from_classic_dict(key_dict, from_mnemonic=from_mnemonic)
144def is_encrypted(name: str) -> bool:
145    """
146    Checks if the key with the given name is encrypted.
147    """
148    path = classic_key_path(name)
149    full_path = os.path.expanduser(os.path.join(COMMUNE_HOME, path))
150    with open(full_path, "r") as file:
151        body = json.load(file)
152    return body["encrypted"]
155def classic_store_key(keypair: Keypair, name: str, password: str | None = None) -> None:
156    """
157    Stores the given keypair on a disk under the given name.
158    """
159    key_dict = to_classic_dict(keypair, name)
160    key_dict_json = json.dumps(key_dict)
161    path = classic_key_path(name)
162    classic_put(path, key_dict_json, password=password)
165def try_classic_load_key(
166    name: str, context: GenericCtx | None = None,
167    password: str | None = None,
168    from_mnemonic: bool = False,
169) -> Keypair:
170    try:
171        keypair = classic_load_key(
172            name,
173            password=password,
174            from_mnemonic=from_mnemonic
175        )
176    except json.JSONDecodeError:
177        prompt = f"Please provide the password for the key {name}"
178        if context is not None:
179            context.info(prompt)
180        else:
181            print(prompt)
182        password = getpass()
183        keypair = classic_load_key(
184            name,
185            password=password,
186            from_mnemonic=from_mnemonic
187        )
188    return keypair
191def try_load_key(
192    name: str,
193    context: GenericCtx | None = None,
194    password: str | None = None
196    try:
197        key_dict = classic_load(name, password=password)
198    except json.JSONDecodeError:
199        prompt = f"Please provide the password for the key {name}"
200        if context is not None:
201            context.info(prompt)
202        else:
203            print(prompt)
204        password = getpass()
205        key_dict = classic_load(name, password=password)
206    return key_dict
209def local_key_addresses(
210    ctx: GenericCtx | None = None,
211    universal_password: str | None = None
212) -> dict[str, Ss58Address]:
213    """
214    Retrieves a mapping of local key names to their SS58 addresses.
215    If password is passed, it will be used to decrypt every key.
216    If password is not passed and ctx is,
217    the user will be prompted for the password.
218    """
219    home = Path.home()
220    key_dir = home / '.commune' / "key"
222    key_names = [f.stem for f in key_dir.iterdir() if f.is_file() and not f.name.startswith('.')]
224    addresses_map: dict[str, Ss58Address] = {}
226    for key_name in key_names:
227        # issue #11 https://github.com/agicommies/communex/issues/12 added check for key2address to stop error from being thrown by wrong key type.
228        if key_name == "key2address":
229            print("key2address is saved in an invalid format. It will be ignored.")
230            continue
231        encrypted = is_encrypted(key_name)
232        if encrypted:
233            if universal_password:
234                password = universal_password
235            elif ctx:
236                ctx.info(f"Please provide the password for the key '{key_name}'")
237                password = getpass()
238            else:
239                print(f"Please provide the password for the key '{key_name}'")
240                password = getpass()
241        else:
242            password = None
243        key_dict = classic_load_key(key_name, password=password)
244        addresses_map[key_name] = check_ss58_address(key_dict.ss58_address)
246    return addresses_map
249def resolve_key_ss58(key: Ss58Address | Keypair | str) -> Ss58Address:
250    """
251    Resolves a keypair or key name to its corresponding SS58 address.
253    If the input is already an SS58 address, it is returned as is.
254    """
256    if isinstance(key, Keypair):
257        return key.ss58_address  # type: ignore
259    if is_ss58_address(key):
260        return key
262    try:
263        keypair = classic_load_key(key)
264    except FileNotFoundError:
265        raise ValueError(
266            f"Key is not a valid SS58 address nor a valid key name: {key}")
268    address = keypair.ss58_address
270    return check_ss58_address(address)
273def resolve_key_ss58_encrypted(
274        key: Ss58Address | Keypair | str, context: GenericCtx,
275        password: str | None = None
277) -> Ss58Address:
278    """
279    Resolves a keypair or key name to its corresponding SS58 address.
281    If the input is already an SS58 address, it is returned as is.
282    """
284    if isinstance(key, Keypair):
285        return check_ss58_address(key.ss58_address, key.ss58_format)
287    if is_ss58_address(key):
288        return key
290    try:
291        keypair = classic_load_key(key, password=password)
292    except json.JSONDecodeError:
293        context.info(f"Please provide the password for the key {key}")
294        password = getpass()
295        keypair = classic_load_key(key, password=password)
296    except FileNotFoundError:
297        raise ValueError(
298            f"Key is not a valid SS58 address nor a valid key name: {key}")
300    address = keypair.ss58_address
302    return check_ss58_address(address, keypair.ss58_format)
