Edit on GitHub

communex.util

  1import ipaddress
  2import json
  3import os.path
  4import re
  5from typing import Any, Callable, Generic, Optional, Protocol, TypeVar
  6
  7import requests
  8
  9
 10def check_str(x: Any) -> str:
 11    assert isinstance(x, str)
 12    return x
 13
 14
 15def ensure_dir_exists(path: str) -> None:
 16    if not os.path.exists(path):
 17        os.makedirs(path, exist_ok=True)
 18
 19
 20def ensure_parent_dir_exists(path: str) -> None:
 21    ensure_dir_exists(os.path.dirname(path))
 22
 23
 24def bytes_to_hex(value: str | bytes) -> str:
 25    """
 26    Converts a string or bytes object to its hexadecimal representation.
 27
 28    If the input is already a string, it assumes that the string is already in
 29    hexadecimal format and returns it as is. If the input is bytes, it converts
 30    the bytes to their hexadecimal string representation.
 31
 32    Args:
 33        x: The input string or bytes object to be converted to hexadecimal.
 34
 35    Returns:
 36        The hexadecimal representation of the input.
 37
 38    Examples:
 39        _to_hex(b'hello') returns '68656c6c6f'
 40        _to_hex('68656c6c6f') returns '68656c6c6f'
 41    """
 42    if isinstance(value, bytes):
 43        return value.hex()
 44    assert isinstance(value, str)
 45    # TODO: Check that `value` is a valid hexadecimal string
 46    return value
 47
 48
 49def is_ip_valid(ip: str) -> bool:
 50    """
 51    Checks if an ip address is valid
 52    """
 53    try:
 54        ipaddress.ip_address(ip)
 55        return True
 56    except ValueError:
 57        return False
 58
 59
 60T = TypeVar("T")
 61
 62
 63class SetterGetterFn(Generic[T], Protocol):
 64    def __call__(self, x: T = ..., /) -> T: ...
 65
 66
 67def create_state_fn(default: Callable[..., T]) -> SetterGetterFn[T]:
 68    """
 69    Creates a state function that can be used to get or set a value.
 70    """
 71    value = default()
 72
 73    def state_function(input: Optional[T] = None):
 74        nonlocal value
 75        if input is not None:
 76            value = input
 77        return value
 78
 79    return state_function
 80
 81
 82def get_json_from_cid(cid: str) -> dict[Any, Any] | None:
 83    gateway = "https://ipfs.io/ipfs/"
 84    try:
 85        result = requests.get(gateway + cid)
 86        if result.ok:
 87            return result.json()
 88        return None
 89    except Exception:
 90        return None
 91
 92
 93def convert_cid_on_proposal(proposals: dict[int, dict[str, Any]]):
 94    unwrapped: dict[int, dict[str, Any]] = {}
 95    for prop_id, proposal in proposals.items():
 96        data = proposal.get("data")
 97        if data and "Custom" in data:
 98            metadata = proposal["metadata"]
 99            cid = metadata.split("ipfs://")[-1]
100            queried_cid = get_json_from_cid(cid)
101            if queried_cid:
102                body = queried_cid.get("body")
103                if body:
104                    try:
105                        queried_cid["body"] = json.loads(body)
106                    except Exception:
107                        pass
108            proposal["Custom"] = queried_cid
109        unwrapped[prop_id] = proposal
110    return unwrapped
111
112
113HEX_PATTERN = re.compile(r"^[0-9a-fA-F]+$")
114
115
116# TODO: merge `is_hex_string` into `parse_hex`
117def is_hex_string(string: str):
118    return bool(HEX_PATTERN.match(string))
119
120
121def parse_hex(hex_str: str) -> bytes:
122    if hex_str[0:2] == "0x":
123        return bytes.fromhex(hex_str[2:])
124    else:
125        return bytes.fromhex(hex_str)
def check_str(x: Any) -> str:
11def check_str(x: Any) -> str:
12    assert isinstance(x, str)
13    return x
def ensure_dir_exists(path: str) -> None:
16def ensure_dir_exists(path: str) -> None:
17    if not os.path.exists(path):
18        os.makedirs(path, exist_ok=True)
def ensure_parent_dir_exists(path: str) -> None:
21def ensure_parent_dir_exists(path: str) -> None:
22    ensure_dir_exists(os.path.dirname(path))
def bytes_to_hex(value: str | bytes) -> str:
25def bytes_to_hex(value: str | bytes) -> str:
26    """
27    Converts a string or bytes object to its hexadecimal representation.
28
29    If the input is already a string, it assumes that the string is already in
30    hexadecimal format and returns it as is. If the input is bytes, it converts
31    the bytes to their hexadecimal string representation.
32
33    Args:
34        x: The input string or bytes object to be converted to hexadecimal.
35
36    Returns:
37        The hexadecimal representation of the input.
38
39    Examples:
40        _to_hex(b'hello') returns '68656c6c6f'
41        _to_hex('68656c6c6f') returns '68656c6c6f'
42    """
43    if isinstance(value, bytes):
44        return value.hex()
45    assert isinstance(value, str)
46    # TODO: Check that `value` is a valid hexadecimal string
47    return value

Converts a string or bytes object to its hexadecimal representation.

If the input is already a string, it assumes that the string is already in hexadecimal format and returns it as is. If the input is bytes, it converts the bytes to their hexadecimal string representation.

Arguments:
  • x: The input string or bytes object to be converted to hexadecimal.
Returns:

The hexadecimal representation of the input.

Examples:

_to_hex(b'hello') returns '68656c6c6f' _to_hex('68656c6c6f') returns '68656c6c6f'

def is_ip_valid(ip: str) -> bool:
50def is_ip_valid(ip: str) -> bool:
51    """
52    Checks if an ip address is valid
53    """
54    try:
55        ipaddress.ip_address(ip)
56        return True
57    except ValueError:
58        return False

Checks if an ip address is valid

class SetterGetterFn(typing.Generic[~T], typing.Protocol):
64class SetterGetterFn(Generic[T], Protocol):
65    def __call__(self, x: T = ..., /) -> T: ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
SetterGetterFn(*args, **kwargs)
1927def _no_init_or_replace_init(self, *args, **kwargs):
1928    cls = type(self)
1929
1930    if cls._is_protocol:
1931        raise TypeError('Protocols cannot be instantiated')
1932
1933    # Already using a custom `__init__`. No need to calculate correct
1934    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1935    if cls.__init__ is not _no_init_or_replace_init:
1936        return
1937
1938    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1939    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1940    # searches for a proper new `__init__` in the MRO. The new `__init__`
1941    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1942    # instantiation of the protocol subclass will thus use the new
1943    # `__init__` and no longer call `_no_init_or_replace_init`.
1944    for base in cls.__mro__:
1945        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1946        if init is not _no_init_or_replace_init:
1947            cls.__init__ = init
1948            break
1949    else:
1950        # should not happen
1951        cls.__init__ = object.__init__
1952
1953    cls.__init__(self, *args, **kwargs)
def create_state_fn(default: Callable[..., ~T]) -> SetterGetterFn[~T]:
68def create_state_fn(default: Callable[..., T]) -> SetterGetterFn[T]:
69    """
70    Creates a state function that can be used to get or set a value.
71    """
72    value = default()
73
74    def state_function(input: Optional[T] = None):
75        nonlocal value
76        if input is not None:
77            value = input
78        return value
79
80    return state_function

Creates a state function that can be used to get or set a value.

def get_json_from_cid(cid: str) -> dict[typing.Any, typing.Any] | None:
83def get_json_from_cid(cid: str) -> dict[Any, Any] | None:
84    gateway = "https://ipfs.io/ipfs/"
85    try:
86        result = requests.get(gateway + cid)
87        if result.ok:
88            return result.json()
89        return None
90    except Exception:
91        return None
def convert_cid_on_proposal(proposals: dict[int, dict[str, typing.Any]]):
 94def convert_cid_on_proposal(proposals: dict[int, dict[str, Any]]):
 95    unwrapped: dict[int, dict[str, Any]] = {}
 96    for prop_id, proposal in proposals.items():
 97        data = proposal.get("data")
 98        if data and "Custom" in data:
 99            metadata = proposal["metadata"]
100            cid = metadata.split("ipfs://")[-1]
101            queried_cid = get_json_from_cid(cid)
102            if queried_cid:
103                body = queried_cid.get("body")
104                if body:
105                    try:
106                        queried_cid["body"] = json.loads(body)
107                    except Exception:
108                        pass
109            proposal["Custom"] = queried_cid
110        unwrapped[prop_id] = proposal
111    return unwrapped
HEX_PATTERN = re.compile('^[0-9a-fA-F]+$')
def is_hex_string(string: str):
118def is_hex_string(string: str):
119    return bool(HEX_PATTERN.match(string))
def parse_hex(hex_str: str) -> bytes:
122def parse_hex(hex_str: str) -> bytes:
123    if hex_str[0:2] == "0x":
124        return bytes.fromhex(hex_str[2:])
125    else:
126        return bytes.fromhex(hex_str)