Edit on GitHub

communex.util

  1import ipaddress
  2import json
  3import os.path
  4import re
  5from typing import Any, Callable, Generic, Optional, Protocol, TypeVar
  6
  7import requests
  8
  9
 10def check_str(x: Any) -> str:
 11    assert isinstance(x, str)
 12    return x
 13
 14
 15def ensure_dir_exists(path: str) -> None:
 16    if not os.path.exists(path):
 17        os.makedirs(path, exist_ok=True)
 18
 19
 20def ensure_parent_dir_exists(path: str) -> None:
 21    ensure_dir_exists(os.path.dirname(path))
 22
 23
 24def bytes_to_hex(value: str | bytes) -> str:
 25    """
 26    Converts a string or bytes object to its hexadecimal representation.
 27
 28    If the input is already a string, it assumes that the string is already in
 29    hexadecimal format and returns it as is. If the input is bytes, it converts
 30    the bytes to their hexadecimal string representation.
 31
 32    Args:
 33        x: The input string or bytes object to be converted to hexadecimal.
 34
 35    Returns:
 36        The hexadecimal representation of the input.
 37
 38    Examples:
 39        _to_hex(b'hello') returns '68656c6c6f'
 40        _to_hex('68656c6c6f') returns '68656c6c6f'
 41    """
 42    if isinstance(value, bytes):
 43        return value.hex()
 44    assert isinstance(value, str)
 45    # TODO: Check that `value` is a valid hexadecimal string
 46    return value
 47
 48
 49def is_ip_valid(ip: str) -> bool:
 50    """
 51    Checks if an ip address is valid
 52    """
 53    try:
 54        ipaddress.ip_address(ip)
 55        return True
 56    except ValueError:
 57        return False
 58
 59
 60T = TypeVar("T")
 61
 62
 63class SetterGetterFn(Generic[T], Protocol):
 64    def __call__(self, x: T = ..., /) -> T:
 65        ...
 66
 67
 68def create_state_fn(default: Callable[..., T]) -> SetterGetterFn[T]:
 69    """
 70    Creates a state function that can be used to get or set a value.
 71    """
 72    value = default()
 73
 74    def state_function(input: Optional[T] = None):
 75        nonlocal value
 76        if input is not None:
 77            value = input
 78        return value
 79
 80    return state_function
 81
 82
 83def get_json_from_cid(cid: str) -> dict[Any, Any] | None:
 84    gateway = "https://ipfs.io/ipfs/"
 85    try:
 86        result = requests.get(gateway + cid)
 87        if result.ok:
 88            return result.json()
 89        return None
 90    except Exception:
 91        return None
 92
 93
 94def convert_cid_on_proposal(proposals: dict[int, dict[str, Any]]):
 95    unwrapped: dict[int, dict[str, Any]] = {}
 96    for prop_id, proposal in proposals.items():
 97        data = proposal.get("data")
 98        if data and "Custom" in data:
 99            metadata = proposal["metadata"]
100            cid = metadata.split("ipfs://")[-1]
101            queried_cid = get_json_from_cid(cid)
102            if queried_cid:
103                body = queried_cid.get("body")
104                if body:
105                    try:
106                        queried_cid["body"] = json.loads(body)
107                    except Exception:
108                        pass
109            proposal["Custom"] = queried_cid
110        unwrapped[prop_id] = proposal
111    return unwrapped
112
113
114HEX_PATTERN = re.compile(r"^[0-9a-fA-F]+$")
115
116
117# TODO: merge `is_hex_string` into `parse_hex`
118def is_hex_string(string: str):
119    return bool(HEX_PATTERN.match(string))
120
121
122def parse_hex(hex_str: str) -> bytes:
123    if hex_str[0:2] == "0x":
124        return bytes.fromhex(hex_str[2:])
125    else:
126        return bytes.fromhex(hex_str)
def check_str(x: Any) -> str:
11def check_str(x: Any) -> str:
12    assert isinstance(x, str)
13    return x
def ensure_dir_exists(path: str) -> None:
16def ensure_dir_exists(path: str) -> None:
17    if not os.path.exists(path):
18        os.makedirs(path, exist_ok=True)
def ensure_parent_dir_exists(path: str) -> None:
21def ensure_parent_dir_exists(path: str) -> None:
22    ensure_dir_exists(os.path.dirname(path))
def bytes_to_hex(value: str | bytes) -> str:
25def bytes_to_hex(value: str | bytes) -> str:
26    """
27    Converts a string or bytes object to its hexadecimal representation.
28
29    If the input is already a string, it assumes that the string is already in
30    hexadecimal format and returns it as is. If the input is bytes, it converts
31    the bytes to their hexadecimal string representation.
32
33    Args:
34        x: The input string or bytes object to be converted to hexadecimal.
35
36    Returns:
37        The hexadecimal representation of the input.
38
39    Examples:
40        _to_hex(b'hello') returns '68656c6c6f'
41        _to_hex('68656c6c6f') returns '68656c6c6f'
42    """
43    if isinstance(value, bytes):
44        return value.hex()
45    assert isinstance(value, str)
46    # TODO: Check that `value` is a valid hexadecimal string
47    return value

Converts a string or bytes object to its hexadecimal representation.

If the input is already a string, it assumes that the string is already in hexadecimal format and returns it as is. If the input is bytes, it converts the bytes to their hexadecimal string representation.

Arguments:
  • x: The input string or bytes object to be converted to hexadecimal.
Returns:

The hexadecimal representation of the input.

Examples:

_to_hex(b'hello') returns '68656c6c6f' _to_hex('68656c6c6f') returns '68656c6c6f'

def is_ip_valid(ip: str) -> bool:
50def is_ip_valid(ip: str) -> bool:
51    """
52    Checks if an ip address is valid
53    """
54    try:
55        ipaddress.ip_address(ip)
56        return True
57    except ValueError:
58        return False

Checks if an ip address is valid

class SetterGetterFn(typing.Generic[~T], typing.Protocol):
64class SetterGetterFn(Generic[T], Protocol):
65    def __call__(self, x: T = ..., /) -> T:
66        ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
SetterGetterFn(*args, **kwargs)
1927def _no_init_or_replace_init(self, *args, **kwargs):
1928    cls = type(self)
1929
1930    if cls._is_protocol:
1931        raise TypeError('Protocols cannot be instantiated')
1932
1933    # Already using a custom `__init__`. No need to calculate correct
1934    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1935    if cls.__init__ is not _no_init_or_replace_init:
1936        return
1937
1938    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1939    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1940    # searches for a proper new `__init__` in the MRO. The new `__init__`
1941    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1942    # instantiation of the protocol subclass will thus use the new
1943    # `__init__` and no longer call `_no_init_or_replace_init`.
1944    for base in cls.__mro__:
1945        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1946        if init is not _no_init_or_replace_init:
1947            cls.__init__ = init
1948            break
1949    else:
1950        # should not happen
1951        cls.__init__ = object.__init__
1952
1953    cls.__init__(self, *args, **kwargs)
def create_state_fn(default: Callable[..., ~T]) -> SetterGetterFn[~T]:
69def create_state_fn(default: Callable[..., T]) -> SetterGetterFn[T]:
70    """
71    Creates a state function that can be used to get or set a value.
72    """
73    value = default()
74
75    def state_function(input: Optional[T] = None):
76        nonlocal value
77        if input is not None:
78            value = input
79        return value
80
81    return state_function

Creates a state function that can be used to get or set a value.

def get_json_from_cid(cid: str) -> dict[typing.Any, typing.Any] | None:
84def get_json_from_cid(cid: str) -> dict[Any, Any] | None:
85    gateway = "https://ipfs.io/ipfs/"
86    try:
87        result = requests.get(gateway + cid)
88        if result.ok:
89            return result.json()
90        return None
91    except Exception:
92        return None
def convert_cid_on_proposal(proposals: dict[int, dict[str, typing.Any]]):
 95def convert_cid_on_proposal(proposals: dict[int, dict[str, Any]]):
 96    unwrapped: dict[int, dict[str, Any]] = {}
 97    for prop_id, proposal in proposals.items():
 98        data = proposal.get("data")
 99        if data and "Custom" in data:
100            metadata = proposal["metadata"]
101            cid = metadata.split("ipfs://")[-1]
102            queried_cid = get_json_from_cid(cid)
103            if queried_cid:
104                body = queried_cid.get("body")
105                if body:
106                    try:
107                        queried_cid["body"] = json.loads(body)
108                    except Exception:
109                        pass
110            proposal["Custom"] = queried_cid
111        unwrapped[prop_id] = proposal
112    return unwrapped
HEX_PATTERN = re.compile('^[0-9a-fA-F]+$')
def is_hex_string(string: str):
119def is_hex_string(string: str):
120    return bool(HEX_PATTERN.match(string))
def parse_hex(hex_str: str) -> bytes:
123def parse_hex(hex_str: str) -> bytes:
124    if hex_str[0:2] == "0x":
125        return bytes.fromhex(hex_str[2:])
126    else:
127        return bytes.fromhex(hex_str)