Edit on GitHub

communex.client

   1import json
   2import queue
   3from concurrent.futures import Future, ThreadPoolExecutor
   4from contextlib import contextmanager
   5from copy import deepcopy
   6from dataclasses import dataclass
   7from typing import Any, Mapping, TypeVar, cast
   8
   9from substrateinterface import ExtrinsicReceipt  # type: ignore
  10from substrateinterface import Keypair  # type: ignore
  11from substrateinterface import SubstrateInterface  # type: ignore
  12from substrateinterface.storage import StorageKey  # type: ignore
  13
  14from communex._common import transform_stake_dmap
  15from communex.errors import ChainTransactionError, NetworkQueryError
  16from communex.types import NetworkParams, Ss58Address, SubnetParams
  17
  18# TODO: InsufficientBalanceError, MismatchedLengthError etc
  19
  20MAX_REQUEST_SIZE = 9_000_000
  21
  22
  23@dataclass
  24class Chunk:
  25    batch_requests: list[tuple[Any, Any]]
  26    prefix_list: list[list[str]]
  27    fun_params: list[tuple[Any, Any, Any, Any, str]]
  28
  29
  30T1 = TypeVar("T1")
  31T2 = TypeVar("T2")
  32
  33
  34class CommuneClient:
  35    """
  36    A client for interacting with Commune network nodes, querying storage,
  37    submitting transactions, etc.
  38
  39    Attributes:
  40        wait_for_finalization: Whether to wait for transaction finalization.
  41
  42    Example:
  43    ```py
  44    client = CommuneClient()
  45    client.query(name='function_name', params=['param1', 'param2'])
  46    ```
  47
  48    Raises:
  49        AssertionError: If the maximum connections value is less than or equal
  50          to zero.
  51    """
  52
  53    wait_for_finalization: bool
  54    _num_connections: int
  55    _connection_queue: queue.Queue[SubstrateInterface]
  56    url: str
  57
  58    def __init__(
  59        self,
  60        url: str,
  61        num_connections: int = 1,
  62        wait_for_finalization: bool = False,
  63    ):
  64        """
  65        Args:
  66            url: The URL of the network node to connect to.
  67            num_connections: The number of websocket connections to be opened.
  68        """
  69        assert num_connections > 0
  70        self._num_connections = num_connections
  71        self.wait_for_finalization = wait_for_finalization
  72        self._connection_queue = queue.Queue(num_connections)
  73        self.url = url
  74
  75        for _ in range(num_connections):
  76            self._connection_queue.put(SubstrateInterface(url))
  77
  78    @property
  79    def connections(self) -> int:
  80        """
  81        Gets the maximum allowed number of simultaneous connections to the
  82        network node.
  83        """
  84        return self._num_connections
  85
  86    @contextmanager
  87    def get_conn(self, timeout: float | None = None, init: bool = False):
  88        """
  89        Context manager to get a connection from the pool.
  90
  91        Tries to get a connection from the pool queue. If the queue is empty,
  92        it blocks for `timeout` seconds until a connection is available. If
  93        `timeout` is None, it blocks indefinitely.
  94
  95        Args:
  96            timeout: The maximum time in seconds to wait for a connection.
  97
  98        Yields:
  99            The connection object from the pool.
 100
 101        Raises:
 102            QueueEmptyError: If no connection is available within the timeout
 103              period.
 104        """
 105        conn = self._connection_queue.get(timeout=timeout)
 106        if init:
 107            conn.init_runtime()  # type: ignore
 108        try:
 109            yield conn
 110        finally:
 111            self._connection_queue.put(conn)
 112
 113    def _get_storage_keys(
 114        self,
 115        storage: str,
 116        queries: list[tuple[str, list[Any]]],
 117        block_hash: str | None,
 118    ):
 119
 120        send: list[tuple[str, list[Any]]] = []
 121        prefix_list: list[Any] = []
 122
 123        key_idx = 0
 124        with self.get_conn(init=True) as substrate:
 125            for function, params in queries:
 126                storage_key = StorageKey.create_from_storage_function(  # type: ignore
 127                    storage, function, params, runtime_config=substrate.runtime_config, metadata=substrate.metadata  # type: ignore
 128                )
 129
 130                prefix = storage_key.to_hex()
 131                prefix_list.append(prefix)
 132                send.append(("state_getKeys", [prefix, block_hash]))
 133                key_idx += 1
 134        return send, prefix_list
 135
 136    def _get_lists(
 137        self,
 138        storage_module: str,
 139        queries: list[tuple[str, list[Any]]],
 140        substrate: SubstrateInterface,
 141    ) -> list[tuple[Any, Any, Any, Any, str]]:
 142        """
 143        Generates a list of tuples containing parameters for each storage function based on the given functions and substrate interface.
 144
 145        Args:
 146            functions (dict[str, list[query_call]]): A dictionary where keys are storage module names and values are lists of tuples.
 147                Each tuple consists of a storage function name and its parameters.
 148            substrate: An instance of the SubstrateInterface class used to interact with the substrate.
 149
 150        Returns:
 151            A list of tuples in the format `(value_type, param_types, key_hashers, params, storage_function)` for each storage function in the given functions.
 152
 153        Example:
 154            >>> _get_lists(
 155                    functions={'storage_module': [('storage_function', ['param1', 'param2'])]},
 156                    substrate=substrate_instance
 157                )
 158            [('value_type', 'param_types', 'key_hashers', ['param1', 'param2'], 'storage_function'), ...]
 159        """
 160
 161        function_parameters: list[tuple[Any, Any, Any, Any, str]] = []
 162
 163        metadata_pallet = substrate.metadata.get_metadata_pallet(  # type: ignore
 164            storage_module
 165        )
 166        for storage_function, params in queries:
 167            storage_item = metadata_pallet.get_storage_function(  # type: ignore
 168                storage_function
 169            )
 170
 171            value_type = storage_item.get_value_type_string()  # type: ignore
 172            param_types = storage_item.get_params_type_string()  # type: ignore
 173            key_hashers = storage_item.get_param_hashers()  # type: ignore
 174            function_parameters.append(
 175                (
 176                    value_type,
 177                    param_types,
 178                    key_hashers,
 179                    params,
 180                    storage_function,
 181                )  # type: ignore
 182            )
 183        return function_parameters
 184
 185    def _send_batch(
 186        self,
 187        batch_payload: list[Any],
 188        request_ids: list[int],
 189        extract_result: bool = True,
 190    ):
 191        """
 192        Sends a batch of requests to the substrate and collects the results.
 193
 194        Args:
 195            substrate: An instance of the substrate interface.
 196            batch_payload: The payload of the batch request.
 197            request_ids: A list of request IDs for tracking responses.
 198            results: A list to store the results of the requests.
 199            extract_result: Whether to extract the result from the response.
 200
 201        Raises:
 202            NetworkQueryError: If there is an `error` in the response message.
 203
 204        Note:
 205            No explicit return value as results are appended to the provided 'results' list.
 206        """
 207        results: list[str | dict[Any, Any]] = []
 208        with self.get_conn(init=True) as substrate:
 209            try:
 210
 211                substrate.websocket.send(  #  type: ignore
 212                    json.dumps(batch_payload)
 213                )  # type: ignore
 214            except NetworkQueryError:
 215                pass
 216            while len(results) < len(request_ids):
 217                received_messages = json.loads(
 218                    substrate.websocket.recv()  # type: ignore
 219                )  # type: ignore
 220                if isinstance(received_messages, dict):
 221                    received_messages: list[dict[Any, Any]] = [received_messages]
 222
 223                for message in received_messages:
 224                    if message.get("id") in request_ids:
 225                        if extract_result:
 226                            try:
 227                                results.append(message["result"])
 228                            except Exception:
 229                                raise (
 230                                    RuntimeError(
 231                                        f"Error extracting result from message: {message}"
 232                                    )
 233                                )
 234                        else:
 235                            results.append(message)
 236                    if "error" in message:
 237                        raise NetworkQueryError(message["error"])
 238
 239            return results
 240
 241    def _make_request_smaller(
 242        self,
 243        batch_request: list[tuple[T1, T2]],
 244        prefix_list: list[list[str]],
 245        fun_params: list[tuple[Any, Any, Any, Any, str]],
 246    ) -> tuple[list[list[tuple[T1, T2]]], list[Chunk]]:
 247        """
 248        Splits a batch of requests into smaller batches, each not exceeding the specified maximum size.
 249
 250        Args:
 251            batch_request: A list of requests to be sent in a batch.
 252            max_size: Maximum size of each batch in bytes.
 253
 254        Returns:
 255            A list of smaller request batches.
 256
 257        Example:
 258            >>> _make_request_smaller(batch_request=[('method1', 'params1'), ('method2', 'params2')], max_size=1000)
 259            [[('method1', 'params1')], [('method2', 'params2')]]
 260        """
 261        assert len(prefix_list) == len(fun_params) == len(batch_request)
 262
 263        def estimate_size(request: tuple[T1, T2]):
 264            """Convert the batch request to a string and measure its length"""
 265            return len(json.dumps(request))
 266
 267        # Initialize variables
 268        result: list[list[tuple[T1, T2]]] = []
 269        current_batch = []
 270        current_prefix_batch = []
 271        current_params_batch = []
 272        current_size = 0
 273
 274        chunk_list: list[Chunk] = []
 275
 276        # Iterate through each request in the batch
 277        for request, prefix, params in zip(batch_request, prefix_list, fun_params):
 278            request_size = estimate_size(request)
 279
 280            # Check if adding this request exceeds the max size
 281            if current_size + request_size > MAX_REQUEST_SIZE:
 282                # If so, start a new batch
 283
 284                # Essentiatly checks that it's not the first iteration
 285                if current_batch:
 286                    chunk = Chunk(
 287                        current_batch, current_prefix_batch, current_params_batch
 288                    )
 289                    chunk_list.append(chunk)
 290                    result.append(current_batch)
 291
 292                current_batch = [request]
 293                current_prefix_batch = [prefix]
 294                current_params_batch = [params]
 295                current_size = request_size
 296            else:
 297                # Otherwise, add to the current batch
 298                current_batch.append(request)
 299                current_size += request_size
 300                current_prefix_batch.append(prefix)
 301                current_params_batch.append(params)
 302
 303        # Add the last batch if it's not empty
 304        if current_batch:
 305            result.append(current_batch)
 306            chunk = Chunk(current_batch, current_prefix_batch, current_params_batch)
 307            chunk_list.append(chunk)
 308
 309        return result, chunk_list
 310
 311    def _are_changes_equal(self, change_a: Any, change_b: Any):
 312        for (a, b), (c, d) in zip(change_a, change_b):
 313            if a != c or b != d:
 314                return False
 315
 316    def _rpc_request_batch(
 317        self, batch_requests: list[tuple[str, list[Any]]], extract_result: bool = True
 318    ) -> list[str]:
 319        """
 320        Sends batch requests to the substrate node using multiple threads and collects the results.
 321
 322        Args:
 323            substrate: An instance of the substrate interface.
 324            batch_requests : A list of requests to be sent in batches.
 325            max_size: Maximum size of each batch in bytes.
 326            extract_result: Whether to extract the result from the response message.
 327
 328        Returns:
 329            A list of results from the batch requests.
 330
 331        Example:
 332            >>> _rpc_request_batch(substrate_instance, [('method1', ['param1']), ('method2', ['param2'])])
 333            ['result1', 'result2', ...]
 334        """
 335
 336        chunk_results: list[Any] = []
 337        # smaller_requests = self._make_request_smaller(batch_requests)
 338        request_id = 0
 339        with ThreadPoolExecutor() as executor:
 340            futures: list[Future[list[str | dict[Any, Any]]]] = []
 341            for chunk in [batch_requests]:
 342                request_ids: list[int] = []
 343                batch_payload: list[Any] = []
 344                for method, params in chunk:
 345                    request_id += 1
 346                    request_ids.append(request_id)
 347                    batch_payload.append(
 348                        {
 349                            "jsonrpc": "2.0",
 350                            "method": method,
 351                            "params": params,
 352                            "id": request_id,
 353                        }
 354                    )
 355
 356                futures.append(
 357                    executor.submit(
 358                        self._send_batch,
 359                        batch_payload=batch_payload,
 360                        request_ids=request_ids,
 361                        extract_result=extract_result,
 362                    )
 363                )
 364            for future in futures:
 365                resul = future.result()
 366                chunk_results.append(resul)
 367        return chunk_results
 368
 369    def _rpc_request_batch_chunked(
 370        self, chunk_requests: list[Chunk], extract_result: bool = True
 371    ):
 372        """
 373        Sends batch requests to the substrate node using multiple threads and collects the results.
 374
 375        Args:
 376            substrate: An instance of the substrate interface.
 377            batch_requests : A list of requests to be sent in batches.
 378            max_size: Maximum size of each batch in bytes.
 379            extract_result: Whether to extract the result from the response message.
 380
 381        Returns:
 382            A list of results from the batch requests.
 383
 384        Example:
 385            >>> _rpc_request_batch(substrate_instance, [('method1', ['param1']), ('method2', ['param2'])])
 386            ['result1', 'result2', ...]
 387        """
 388
 389        def split_chunks(chunk: Chunk, chunk_info: list[Chunk], chunk_info_idx: int):
 390            manhattam_chunks: list[tuple[Any, Any]] = []
 391            mutaded_chunk_info = deepcopy(chunk_info)
 392            max_n_keys = 35000
 393            for query in chunk.batch_requests:
 394                result_keys = query[1][0]
 395                keys_amount = len(result_keys)
 396                if keys_amount > max_n_keys:
 397                    mutaded_chunk_info.pop(chunk_info_idx)
 398                    for i in range(0, keys_amount, max_n_keys):
 399                        new_chunk = deepcopy(chunk)
 400                        splitted_keys = result_keys[i: i + max_n_keys]
 401                        splitted_query = deepcopy(query)
 402                        splitted_query[1][0] = splitted_keys
 403                        new_chunk.batch_requests = [splitted_query]
 404                        manhattam_chunks.append(splitted_query)
 405                        mutaded_chunk_info.insert(chunk_info_idx, new_chunk)
 406                else:
 407                    manhattam_chunks.append(query)
 408            return manhattam_chunks, mutaded_chunk_info
 409
 410        assert len(chunk_requests) > 0
 411        mutated_chunk_info: list[Chunk] = []
 412        chunk_results: list[Any] = []
 413        # smaller_requests = self._make_request_smaller(batch_requests)
 414        request_id = 0
 415
 416        with ThreadPoolExecutor() as executor:
 417            futures: list[Future[list[str | dict[Any, Any]]]] = []
 418            for idx, macro_chunk in enumerate(chunk_requests):
 419                _, mutated_chunk_info = split_chunks(macro_chunk, chunk_requests, idx)
 420            for chunk in mutated_chunk_info:
 421                request_ids: list[int] = []
 422                batch_payload: list[Any] = []
 423                for method, params in chunk.batch_requests:
 424                    # for method, params in micro_chunk:
 425                    request_id += 1
 426                    request_ids.append(request_id)
 427                    batch_payload.append(
 428                        {
 429                            "jsonrpc": "2.0",
 430                            "method": method,
 431                            "params": params,
 432                            "id": request_id,
 433                        }
 434                    )
 435                futures.append(
 436                    executor.submit(
 437                        self._send_batch,
 438                        batch_payload=batch_payload,
 439                        request_ids=request_ids,
 440                        extract_result=extract_result,
 441                    )
 442                )
 443            for future in futures:
 444                resul = future.result()
 445                chunk_results.append(resul)
 446        return chunk_results, mutated_chunk_info
 447
 448    def _decode_response(
 449        self,
 450        response: list[str],
 451        function_parameters: list[tuple[Any, Any, Any, Any, str]],
 452        prefix_list: list[Any],
 453        block_hash: str,
 454    ) -> dict[str, dict[Any, Any]]:
 455        """
 456        Decodes a response from the substrate interface and organizes the data into a dictionary.
 457
 458        Args:
 459            response: A list of encoded responses from a substrate query.
 460            function_parameters: A list of tuples containing the parameters for each storage function.
 461            last_keys: A list of the last keys used in the substrate query.
 462            prefix_list: A list of prefixes used in the substrate query.
 463            substrate: An instance of the SubstrateInterface class.
 464            block_hash: The hash of the block to be queried.
 465
 466        Returns:
 467            A dictionary where each key is a storage function name and the value is another dictionary.
 468            This inner dictionary's key is the decoded key from the response and the value is the corresponding decoded value.
 469
 470        Raises:
 471            ValueError: If an unsupported hash type is encountered in the `concat_hash_len` function.
 472
 473        Example:
 474            >>> _decode_response(
 475                    response=[...],
 476                    function_parameters=[...],
 477                    last_keys=[...],
 478                    prefix_list=[...],
 479                    substrate=substrate_instance,
 480                    block_hash="0x123..."
 481                )
 482            {'storage_function_name': {decoded_key: decoded_value, ...}, ...}
 483        """
 484
 485        def get_item_key_value(item_key: tuple[Any, ...] | Any) -> tuple[Any, ...] | Any:
 486            if isinstance(item_key, tuple):
 487                return tuple(k.value for k in item_key)
 488            return item_key.value
 489
 490        def concat_hash_len(key_hasher: str) -> int:
 491            """
 492            Determines the length of the hash based on the given key hasher type.
 493
 494            Args:
 495                key_hasher: The type of key hasher.
 496
 497            Returns:
 498                The length of the hash corresponding to the given key hasher type.
 499
 500            Raises:
 501                ValueError: If the key hasher type is not supported.
 502
 503            Example:
 504                >>> concat_hash_len("Blake2_128Concat")
 505                16
 506            """
 507
 508            if key_hasher == "Blake2_128Concat":
 509                return 16
 510            elif key_hasher == "Twox64Concat":
 511                return 8
 512            elif key_hasher == "Identity":
 513                return 0
 514            else:
 515                raise ValueError("Unsupported hash type")
 516
 517        assert len(response) == len(function_parameters) == len(prefix_list)
 518        result_dict: dict[str, dict[Any, Any]] = {}
 519        for res, fun_params_tuple, prefix in zip(
 520            response, function_parameters, prefix_list
 521        ):
 522            if not res:
 523                continue
 524            res = res[0]
 525            changes = res["changes"]  # type: ignore
 526            value_type, param_types, key_hashers, params, storage_function = (
 527                fun_params_tuple
 528            )
 529            with self.get_conn(init=True) as substrate:
 530                for item in changes:
 531                    # Determine type string
 532                    key_type_string: list[Any] = []
 533                    for n in range(len(params), len(param_types)):
 534                        key_type_string.append(
 535                            f"[u8; {concat_hash_len(key_hashers[n])}]"
 536                        )
 537                        key_type_string.append(param_types[n])
 538
 539                    item_key_obj = substrate.decode_scale(  # type: ignore
 540                        type_string=f"({', '.join(key_type_string)})",
 541                        scale_bytes="0x" + item[0][len(prefix):],
 542                        return_scale_obj=True,
 543                        block_hash=block_hash,
 544                    )
 545                    # strip key_hashers to use as item key
 546                    if len(param_types) - len(params) == 1:
 547                        item_key = item_key_obj.value_object[1]  # type: ignore
 548                    else:
 549                        item_key = tuple(  # type: ignore
 550                            item_key_obj.value_object[key + 1]  # type: ignore
 551                            for key in range(  # type: ignore
 552                                len(params), len(param_types) + 1, 2
 553                            )
 554                        )
 555
 556                    item_value = substrate.decode_scale(  # type: ignore
 557                        type_string=value_type,
 558                        scale_bytes=item[1],
 559                        return_scale_obj=True,
 560                        block_hash=block_hash,
 561                    )
 562                    result_dict.setdefault(storage_function, {})
 563                    key = get_item_key_value(item_key)  # type: ignore
 564                    result_dict[storage_function][key] = item_value.value  # type: ignore
 565
 566        return result_dict
 567
 568    def query_batch(
 569        self, functions: dict[str, list[tuple[str, list[Any]]]]
 570    ) -> dict[str, str]:
 571        """
 572        Executes batch queries on a substrate and returns results in a dictionary format.
 573
 574        Args:
 575            substrate: An instance of SubstrateInterface to interact with the substrate.
 576            functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls (function name and parameters).
 577
 578        Returns:
 579            A dictionary where keys are storage function names and values are the query results.
 580
 581        Raises:
 582            Exception: If no result is found from the batch queries.
 583
 584        Example:
 585            >>> query_batch(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]})
 586            {'function_name': 'query_result', ...}
 587        """
 588
 589        result: dict[str, str] = {}
 590        if not functions:
 591            raise Exception("No result")
 592        with self.get_conn(init=True) as substrate:
 593            for module, queries in functions.items():
 594                storage_keys: list[Any] = []
 595                for fn, params in queries:
 596                    storage_function = substrate.create_storage_key(  # type: ignore
 597                        pallet=module, storage_function=fn, params=params
 598                    )
 599                    storage_keys.append(storage_function)
 600
 601                block_hash = substrate.get_block_hash()
 602                responses: list[Any] = substrate.query_multi(  # type: ignore
 603                    storage_keys=storage_keys, block_hash=block_hash
 604                )
 605
 606                for item in responses:
 607                    fun = item[0]
 608                    query = item[1]
 609                    storage_fun = fun.storage_function
 610                    result[storage_fun] = query.value
 611
 612        return result
 613
 614    def query_batch_map(
 615        self,
 616        functions: dict[str, list[tuple[str, list[Any]]]],
 617        block_hash: str | None = None,
 618    ) -> dict[str, dict[Any, Any]]:
 619        """
 620        Queries multiple storage functions using a map batch approach and returns the combined result.
 621
 622        Args:
 623            substrate: An instance of SubstrateInterface for substrate interaction.
 624            functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls.
 625
 626        Returns:
 627            The combined result of the map batch query.
 628
 629        Example:
 630            >>> query_batch_map(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]})
 631            # Returns the combined result of the map batch query
 632        """
 633        multi_result: dict[str, dict[Any, Any]] = {}
 634
 635        def recursive_update(
 636            d: dict[str, dict[T1, T2] | dict[str, Any]],
 637            u: Mapping[str, dict[Any, Any] | str],
 638        ) -> dict[str, dict[T1, T2]]:
 639            for k, v in u.items():
 640                if isinstance(v, dict):
 641                    d[k] = recursive_update(d.get(k, {}), v)  # type: ignore
 642                else:
 643                    d[k] = v  # type: ignore
 644            return d  # type: ignore
 645
 646        def get_page():
 647            send, prefix_list = self._get_storage_keys(storage, queries, block_hash)
 648            with self.get_conn(init=True) as substrate:
 649                function_parameters = self._get_lists(storage, queries, substrate)
 650            responses = self._rpc_request_batch(send)
 651            # assumption because send is just the storage_function keys
 652            # so it should always be really small regardless of the amount of queries
 653            assert len(responses) == 1
 654            res = responses[0]
 655            built_payload: list[tuple[str, list[Any]]] = []
 656            for result_keys in res:
 657                built_payload.append(
 658                    ("state_queryStorageAt", [result_keys, block_hash])
 659                )
 660            _, chunks_info = self._make_request_smaller(
 661                built_payload, prefix_list, function_parameters
 662            )
 663            chunks_response, chunks_info = self._rpc_request_batch_chunked(chunks_info)
 664            return chunks_response, chunks_info
 665
 666        if not block_hash:
 667            with self.get_conn(init=True) as substrate:
 668                block_hash = substrate.get_block_hash()
 669        for storage, queries in functions.items():
 670            chunks, chunks_info = get_page()
 671            # if this doesn't happen something is wrong on the code
 672            # and we won't be able to decode the data properly
 673            assert len(chunks) == len(chunks_info)
 674            for chunk_info, response in zip(chunks_info, chunks):
 675                storage_result = self._decode_response(
 676                    response, chunk_info.fun_params, chunk_info.prefix_list, block_hash
 677                )
 678                multi_result = recursive_update(multi_result, storage_result)
 679
 680        return multi_result
 681
 682    def query(
 683        self,
 684        name: str,
 685        params: list[Any] = [],
 686        module: str = "SubspaceModule",
 687        block_hash: str | None = None,
 688    ) -> Any:
 689        """
 690        Queries a storage function on the network.
 691
 692        Sends a query to the network and retrieves data from a
 693        specified storage function.
 694
 695        Args:
 696            name: The name of the storage function to query.
 697            params: The parameters to pass to the storage function.
 698            module: The module where the storage function is located.
 699
 700        Returns:
 701            The result of the query from the network.
 702
 703        Raises:
 704            NetworkQueryError: If the query fails or is invalid.
 705        """
 706
 707        result = self.query_batch({module: [(name, params)]})
 708
 709        return result[name]
 710
 711    def query_map(
 712        self,
 713        name: str,
 714        params: list[Any] = [],
 715        module: str = "SubspaceModule",
 716        extract_value: bool = True,
 717    ) -> dict[Any, Any]:
 718        """
 719        Queries a storage map from a network node.
 720
 721        Args:
 722            name: The name of the storage map to query.
 723            params: A list of parameters for the query.
 724            module: The module in which the storage map is located.
 725
 726        Returns:
 727            A dictionary representing the key-value pairs
 728              retrieved from the storage map.
 729
 730        Raises:
 731            QueryError: If the query to the network fails or is invalid.
 732        """
 733
 734        result = self.query_batch_map({module: [(name, params)]})
 735
 736        if extract_value:
 737            return {k.value: v.value for k, v in result}  # type: ignore
 738
 739        return result
 740
 741    def compose_call(
 742        self,
 743        fn: str,
 744        params: dict[str, Any],
 745        key: Keypair | None,
 746        module: str = "SubspaceModule",
 747        wait_for_inclusion: bool = True,
 748        wait_for_finalization: bool | None = None,
 749        sudo: bool = False,
 750        unsigned: bool = False,
 751    ) -> ExtrinsicReceipt:
 752        """
 753        Composes and submits a call to the network node.
 754
 755        Composes and signs a call with the provided keypair, and submits it to
 756        the network. The call can be a standard extrinsic or a sudo extrinsic if
 757        elevated permissions are required. The method can optionally wait for
 758        the call's inclusion in a block and/or its finalization.
 759
 760        Args:
 761            fn: The function name to call on the network.
 762            params: A dictionary of parameters for the call.
 763            key: The keypair for signing the extrinsic.
 764            module: The module containing the function.
 765            wait_for_inclusion: Wait for the call's inclusion in a block.
 766            wait_for_finalization: Wait for the transaction's finalization.
 767            sudo: Execute the call as a sudo (superuser) operation.
 768
 769        Returns:
 770            The receipt of the submitted extrinsic, if
 771              `wait_for_inclusion` is True. Otherwise, returns a string
 772              identifier of the extrinsic.
 773
 774        Raises:
 775            ChainTransactionError: If the transaction fails.
 776        """
 777
 778        if key is None and not unsigned:
 779            raise ValueError("Key must be provided for signed extrinsics.")
 780
 781        with self.get_conn() as substrate:
 782            if wait_for_finalization is None:
 783                wait_for_finalization = self.wait_for_finalization
 784
 785            call = substrate.compose_call(  # type: ignore
 786                call_module=module, call_function=fn, call_params=params
 787            )
 788            if sudo:
 789                call = substrate.compose_call(  # type: ignore
 790                    call_module="Sudo",
 791                    call_function="sudo",
 792                    call_params={
 793                        "call": call.value,  # type: ignore
 794                    },
 795                )
 796
 797            if not unsigned:
 798                extrinsic = substrate.create_signed_extrinsic(  # type: ignore
 799                    call=call, keypair=key  # type: ignore
 800                )  # type: ignore
 801            else:
 802                extrinsic = substrate.create_unsigned_extrinsic(call=call)  # type: ignore
 803
 804            response = substrate.submit_extrinsic(
 805                extrinsic=extrinsic,
 806                wait_for_inclusion=wait_for_inclusion,
 807                wait_for_finalization=wait_for_finalization,
 808            )
 809        if wait_for_inclusion:
 810            if not response.is_success:
 811                raise ChainTransactionError(
 812                    response.error_message, response  # type: ignore
 813                )
 814
 815        return response
 816
 817    def compose_call_multisig(
 818        self,
 819        fn: str,
 820        params: dict[str, Any],
 821        key: Keypair,
 822        signatories: list[Ss58Address],
 823        threshold: int,
 824        module: str = "SubspaceModule",
 825        wait_for_inclusion: bool = True,
 826        wait_for_finalization: bool | None = None,
 827        sudo: bool = False,
 828        era: dict[str, int] | None = None,
 829    ) -> ExtrinsicReceipt:
 830        """
 831        Composes and submits a multisignature call to the network node.
 832
 833        This method allows the composition and submission of a call that
 834        requires multiple signatures for execution, known as a multisignature
 835        call. It supports specifying signatories, a threshold of signatures for
 836        the call's execution, and an optional era for the call's mortality. The
 837        call can be a standard extrinsic, a sudo extrinsic for elevated
 838        permissions, or a multisig extrinsic if multiple signatures are
 839        required. Optionally, the method can wait for the call's inclusion in a
 840        block and/or its finalization. Make sure to pass all keys,
 841        that are part of the multisignature.
 842
 843        Args:
 844            fn: The function name to call on the network. params: A dictionary
 845            of parameters for the call. key: The keypair for signing the
 846            extrinsic. signatories: List of SS58 addresses of the signatories.
 847            Include ALL KEYS that are part of the multisig. threshold: The
 848            minimum number of signatories required to execute the extrinsic.
 849            module: The module containing the function to call.
 850            wait_for_inclusion: Whether to wait for the call's inclusion in a
 851            block. wait_for_finalization: Whether to wait for the transaction's
 852            finalization. sudo: Execute the call as a sudo (superuser)
 853            operation. era: Specifies the call's mortality in terms of blocks in
 854            the format
 855                {'period': amount_blocks}. If omitted, the extrinsic is
 856                immortal.
 857
 858        Returns:
 859            The receipt of the submitted extrinsic if `wait_for_inclusion` is
 860            True. Otherwise, returns a string identifier of the extrinsic.
 861
 862        Raises:
 863            ChainTransactionError: If the transaction fails.
 864        """
 865
 866        # getting the call ready
 867        with self.get_conn() as substrate:
 868            if wait_for_finalization is None:
 869                wait_for_finalization = self.wait_for_finalization
 870
 871            # prepares the `GenericCall` object
 872            call = substrate.compose_call(  # type: ignore
 873                call_module=module, call_function=fn, call_params=params
 874            )
 875            if sudo:
 876                call = substrate.compose_call(  # type: ignore
 877                    call_module="Sudo",
 878                    call_function="sudo",
 879                    call_params={
 880                        "call": call.value,  # type: ignore
 881                    },
 882                )
 883
 884            # modify the rpc methods at runtime, to allow for correct payment
 885            # fee calculation parity has a bug in this version,
 886            # where the method has to be removed
 887            rpc_methods = substrate.config.get("rpc_methods")  # type: ignore
 888
 889            if "state_call" in rpc_methods:  # type: ignore
 890                rpc_methods.remove("state_call")  # type: ignore
 891
 892            # create the multisig account
 893            multisig_acc = substrate.generate_multisig_account(  # type: ignore
 894                signatories, threshold
 895            )
 896
 897            # send the multisig extrinsic
 898            extrinsic = substrate.create_multisig_extrinsic(  # type: ignore
 899                call=call,  # type: ignore
 900                keypair=key,
 901                multisig_account=multisig_acc,  # type: ignore
 902                era=era,  # type: ignore
 903            )  # type: ignore
 904
 905            response = substrate.submit_extrinsic(
 906                extrinsic=extrinsic,
 907                wait_for_inclusion=wait_for_inclusion,
 908                wait_for_finalization=wait_for_finalization,
 909            )
 910
 911        if wait_for_inclusion:
 912            if not response.is_success:
 913                raise ChainTransactionError(
 914                    response.error_message, response  # type: ignore
 915                )
 916
 917        return response
 918
 919    def transfer(
 920        self,
 921        key: Keypair,
 922        amount: int,
 923        dest: Ss58Address,
 924    ) -> ExtrinsicReceipt:
 925        """
 926        Transfers a specified amount of tokens from the signer's account to the
 927        specified account.
 928
 929        Args:
 930            key: The keypair associated with the sender's account.
 931            amount: The amount to transfer, in nanotokens.
 932            dest: The SS58 address of the recipient.
 933
 934        Returns:
 935            A receipt of the transaction.
 936
 937        Raises:
 938            InsufficientBalanceError: If the sender's account does not have
 939              enough balance.
 940            ChainTransactionError: If the transaction fails.
 941        """
 942
 943        params = {"dest": dest, "value": amount}
 944
 945        return self.compose_call(
 946            module="Balances", fn="transfer_keep_alive", params=params, key=key
 947        )
 948
 949    def transfer_multiple(
 950        self,
 951        key: Keypair,
 952        destinations: list[Ss58Address],
 953        amounts: list[int],
 954        netuid: str | int = 0,
 955    ) -> ExtrinsicReceipt:
 956        """
 957        Transfers specified amounts of tokens from the signer's account to
 958        multiple target accounts.
 959
 960        The `destinations` and `amounts` lists must be of the same length.
 961
 962        Args:
 963            key: The keypair associated with the sender's account.
 964            destinations: A list of SS58 addresses of the recipients.
 965            amounts: Amount to transfer to each recipient, in nanotokens.
 966            netuid: The network identifier.
 967
 968        Returns:
 969            A receipt of the transaction.
 970
 971        Raises:
 972            InsufficientBalanceError: If the sender's account does not have
 973              enough balance for all transfers.
 974            ChainTransactionError: If the transaction fails.
 975        """
 976
 977        assert len(destinations) == len(amounts)
 978
 979        # extract existential deposit from amounts
 980        existential_deposit = self.get_existential_deposit()
 981        amounts = [a - existential_deposit for a in amounts]
 982
 983        params = {
 984            "netuid": netuid,
 985            "destinations": destinations,
 986            "amounts": amounts,
 987        }
 988
 989        return self.compose_call(
 990            module="SubspaceModule", fn="transfer_multiple", params=params, key=key
 991        )
 992
 993    def stake(
 994        self,
 995        key: Keypair,
 996        amount: int,
 997        dest: Ss58Address,
 998    ) -> ExtrinsicReceipt:
 999        """
1000        Stakes the specified amount of tokens to a module key address.
1001
1002        Args:
1003            key: The keypair associated with the staker's account.
1004            amount: The amount of tokens to stake, in nanotokens.
1005            dest: The SS58 address of the module key to stake to.
1006            netuid: The network identifier.
1007
1008        Returns:
1009            A receipt of the staking transaction.
1010
1011        Raises:
1012            InsufficientBalanceError: If the staker's account does not have
1013              enough balance.
1014            ChainTransactionError: If the transaction fails.
1015        """
1016
1017        params = {"amount": amount, "module_key": dest}
1018
1019        return self.compose_call(fn="add_stake", params=params, key=key)
1020
1021    def unstake(
1022        self,
1023        key: Keypair,
1024        amount: int,
1025        dest: Ss58Address,
1026    ) -> ExtrinsicReceipt:
1027        """
1028        Unstakes the specified amount of tokens from a module key address.
1029
1030        Args:
1031            key: The keypair associated with the unstaker's account.
1032            amount: The amount of tokens to unstake, in nanotokens.
1033            dest: The SS58 address of the module key to unstake from.
1034            netuid: The network identifier.
1035
1036        Returns:
1037            A receipt of the unstaking transaction.
1038
1039        Raises:
1040            InsufficientStakeError: If the staked key does not have enough
1041              staked tokens by the signer key.
1042            ChainTransactionError: If the transaction fails.
1043        """
1044
1045        params = {"amount": amount, "module_key": dest}
1046        return self.compose_call(fn="remove_stake", params=params, key=key)
1047
1048    def update_module(
1049        self,
1050        key: Keypair,
1051        name: str,
1052        address: str,
1053        metadata: str | None = None,
1054        delegation_fee: int = 20,
1055        netuid: int = 0,
1056    ) -> ExtrinsicReceipt:
1057        """
1058        Updates the parameters of a registered module.
1059
1060        The delegation fee must be an integer between 0 and 100.
1061
1062        Args:
1063            key: The keypair associated with the module's account.
1064            name: The new name for the module. If None, the name is not updated.
1065            address: The new address for the module.
1066                If None, the address is not updated.
1067            delegation_fee: The new delegation fee for the module,
1068                between 0 and 100.
1069            netuid: The network identifier.
1070
1071        Returns:
1072            A receipt of the module update transaction.
1073
1074        Raises:
1075            InvalidParameterError: If the provided parameters are invalid.
1076            ChainTransactionError: If the transaction fails.
1077        """
1078
1079        assert isinstance(delegation_fee, int)
1080
1081        params = {
1082            "netuid": netuid,
1083            "name": name,
1084            "address": address,
1085            "delegation_fee": delegation_fee,
1086            "metadata": metadata,
1087        }
1088
1089        response = self.compose_call("update_module", params=params, key=key)
1090
1091        return response
1092
1093    def register_module(
1094        self,
1095        key: Keypair,
1096        name: str,
1097        address: str | None = None,
1098        subnet: str = "Rootnet",
1099        metadata: str | None = None,
1100    ) -> ExtrinsicReceipt:
1101        """
1102        Registers a new module in the network.
1103
1104        Args:
1105            key: The keypair used for registering the module.
1106            name: The name of the module. If None, a default or previously
1107                set name is used. # How does this work?
1108            address: The address of the module. If None, a default or
1109                previously set address is used. # How does this work?
1110            subnet: The network subnet to register the module in.
1111            min_stake: The minimum stake required for the module, in nanotokens.
1112                If None, a default value is used.
1113
1114        Returns:
1115            A receipt of the registration transaction.
1116
1117        Raises:
1118            InvalidParameterError: If the provided parameters are invalid.
1119            ChainTransactionError: If the transaction fails.
1120        """
1121
1122        key_addr = key.ss58_address
1123
1124        params = {
1125            "network": subnet,
1126            "address": address,
1127            "name": name,
1128            "module_key": key_addr,
1129            "metadata": metadata,
1130        }
1131
1132        response = self.compose_call("register", params=params, key=key)
1133        return response
1134
1135    def vote(
1136        self,
1137        key: Keypair,
1138        uids: list[int],
1139        weights: list[int],
1140        netuid: int = 0,
1141    ) -> ExtrinsicReceipt:
1142        """
1143        Casts votes on a list of module UIDs with corresponding weights.
1144
1145        The length of the UIDs list and the weights list should be the same.
1146        Each weight corresponds to the UID at the same index.
1147
1148        Args:
1149            key: The keypair used for signing the vote transaction.
1150            uids: A list of module UIDs to vote on.
1151            weights: A list of weights corresponding to each UID.
1152            netuid: The network identifier.
1153
1154        Returns:
1155            A receipt of the voting transaction.
1156
1157        Raises:
1158            InvalidParameterError: If the lengths of UIDs and weights lists
1159                do not match.
1160            ChainTransactionError: If the transaction fails.
1161        """
1162
1163        assert len(uids) == len(weights)
1164
1165        params = {
1166            "uids": uids,
1167            "weights": weights,
1168            "netuid": netuid,
1169        }
1170
1171        response = self.compose_call("set_weights", params=params, key=key)
1172
1173        return response
1174
1175    def update_subnet(
1176        self,
1177        key: Keypair,
1178        params: SubnetParams,
1179        netuid: int = 0,
1180    ) -> ExtrinsicReceipt:
1181        """
1182        Update a subnet's configuration.
1183
1184        It requires the founder key for authorization.
1185
1186        Args:
1187            key: The founder keypair of the subnet.
1188            params: The new parameters for the subnet.
1189            netuid: The network identifier.
1190
1191        Returns:
1192            A receipt of the subnet update transaction.
1193
1194        Raises:
1195            AuthorizationError: If the key is not authorized.
1196            ChainTransactionError: If the transaction fails.
1197        """
1198
1199        general_params = dict(params)
1200        general_params["netuid"] = netuid
1201
1202        response = self.compose_call(
1203            fn="update_subnet",
1204            params=general_params,
1205            key=key,
1206        )
1207
1208        return response
1209
1210    def transfer_stake(
1211        self,
1212        key: Keypair,
1213        amount: int,
1214        from_module_key: Ss58Address,
1215        dest_module_address: Ss58Address,
1216    ) -> ExtrinsicReceipt:
1217        """
1218        Realocate staked tokens from one staked module to another module.
1219
1220        Args:
1221            key: The keypair associated with the account that is delegating the tokens.
1222            amount: The amount of staked tokens to transfer, in nanotokens.
1223            from_module_key: The SS58 address of the module you want to transfer from (currently delegated by the key).
1224            dest_module_address: The SS58 address of the destination (newly delegated key).
1225            netuid: The network identifier.
1226
1227        Returns:
1228            A receipt of the stake transfer transaction.
1229
1230        Raises:
1231            InsufficientStakeError: If the source module key does not have
1232            enough staked tokens. ChainTransactionError: If the transaction
1233            fails.
1234        """
1235
1236        amount = amount - self.get_existential_deposit()
1237
1238        params = {
1239            "amount": amount,
1240            "module_key": from_module_key,
1241            "new_module_key": dest_module_address,
1242        }
1243
1244        response = self.compose_call("transfer_stake", key=key, params=params)
1245
1246        return response
1247
1248    def multiunstake(
1249        self,
1250        key: Keypair,
1251        keys: list[Ss58Address],
1252        amounts: list[int],
1253    ) -> ExtrinsicReceipt:
1254        """
1255        Unstakes tokens from multiple module keys.
1256
1257        And the lists `keys` and `amounts` must be of the same length. Each
1258        amount corresponds to the module key at the same index.
1259
1260        Args:
1261            key: The keypair associated with the unstaker's account.
1262            keys: A list of SS58 addresses of the module keys to unstake from.
1263            amounts: A list of amounts to unstake from each module key,
1264              in nanotokens.
1265            netuid: The network identifier.
1266
1267        Returns:
1268            A receipt of the multi-unstaking transaction.
1269
1270        Raises:
1271            MismatchedLengthError: If the lengths of keys and amounts lists do
1272            not match. InsufficientStakeError: If any of the module keys do not
1273            have enough staked tokens. ChainTransactionError: If the transaction
1274            fails.
1275        """
1276
1277        assert len(keys) == len(amounts)
1278
1279        params = {"module_keys": keys, "amounts": amounts}
1280
1281        response = self.compose_call("remove_stake_multiple", params=params, key=key)
1282
1283        return response
1284
1285    def multistake(
1286        self,
1287        key: Keypair,
1288        keys: list[Ss58Address],
1289        amounts: list[int],
1290    ) -> ExtrinsicReceipt:
1291        """
1292        Stakes tokens to multiple module keys.
1293
1294        The lengths of the `keys` and `amounts` lists must be the same. Each
1295        amount corresponds to the module key at the same index.
1296
1297        Args:
1298            key: The keypair associated with the staker's account.
1299            keys: A list of SS58 addresses of the module keys to stake to.
1300            amounts: A list of amounts to stake to each module key,
1301                in nanotokens.
1302            netuid: The network identifier.
1303
1304        Returns:
1305            A receipt of the multi-staking transaction.
1306
1307        Raises:
1308            MismatchedLengthError: If the lengths of keys and amounts lists
1309                do not match.
1310            ChainTransactionError: If the transaction fails.
1311        """
1312
1313        assert len(keys) == len(amounts)
1314
1315        params = {
1316            "module_keys": keys,
1317            "amounts": amounts,
1318        }
1319
1320        response = self.compose_call("add_stake_multiple", params=params, key=key)
1321
1322        return response
1323
1324    def add_profit_shares(
1325        self,
1326        key: Keypair,
1327        keys: list[Ss58Address],
1328        shares: list[int],
1329    ) -> ExtrinsicReceipt:
1330        """
1331        Allocates profit shares to multiple keys.
1332
1333        The lists `keys` and `shares` must be of the same length,
1334        with each share amount corresponding to the key at the same index.
1335
1336        Args:
1337            key: The keypair associated with the account
1338                distributing the shares.
1339            keys: A list of SS58 addresses to allocate shares to.
1340            shares: A list of share amounts to allocate to each key,
1341                in nanotokens.
1342
1343        Returns:
1344            A receipt of the profit sharing transaction.
1345
1346        Raises:
1347            MismatchedLengthError: If the lengths of keys and shares
1348                lists do not match.
1349            ChainTransactionError: If the transaction fails.
1350        """
1351
1352        assert len(keys) == len(shares)
1353
1354        params = {"keys": keys, "shares": shares}
1355
1356        response = self.compose_call("add_profit_shares", params=params, key=key)
1357
1358        return response
1359
1360    def add_subnet_proposal(
1361        self, key: Keypair,
1362        params: dict[str, Any],
1363        ipfs: str,
1364        netuid: int = 0
1365    ) -> ExtrinsicReceipt:
1366        """
1367        Submits a proposal for creating or modifying a subnet within the
1368        network.
1369
1370        The proposal includes various parameters like the name, founder, share
1371        allocations, and other subnet-specific settings.
1372
1373        Args:
1374            key: The keypair used for signing the proposal transaction.
1375            params: The parameters for the subnet proposal.
1376            netuid: The network identifier.
1377
1378        Returns:
1379            A receipt of the subnet proposal transaction.
1380
1381        Raises:
1382            InvalidParameterError: If the provided subnet
1383                parameters are invalid.
1384            ChainTransactionError: If the transaction fails.
1385        """
1386
1387        general_params = dict(params)
1388        general_params["subnet_id"] = netuid
1389        general_params["data"] = ipfs
1390        # breakpoint()
1391        # general_params["burn_config"] = json.dumps(general_params["burn_config"])
1392        response = self.compose_call(
1393            fn="add_subnet_params_proposal",
1394            params=general_params,
1395            key=key,
1396            module="GovernanceModule",
1397        )
1398
1399        return response
1400
1401    def add_custom_proposal(
1402        self,
1403        key: Keypair,
1404        cid: str,
1405    ) -> ExtrinsicReceipt:
1406
1407        params = {"data": cid}
1408
1409        response = self.compose_call(
1410            fn="add_global_custom_proposal",
1411            params=params,
1412            key=key,
1413            module="GovernanceModule",
1414        )
1415        return response
1416
1417    def add_custom_subnet_proposal(
1418        self,
1419        key: Keypair,
1420        cid: str,
1421        netuid: int = 0,
1422    ) -> ExtrinsicReceipt:
1423        """
1424        Submits a proposal for creating or modifying a custom subnet within the
1425        network.
1426
1427        The proposal includes various parameters like the name, founder, share
1428        allocations, and other subnet-specific settings.
1429
1430        Args:
1431            key: The keypair used for signing the proposal transaction.
1432            params: The parameters for the subnet proposal.
1433            netuid: The network identifier.
1434
1435        Returns:
1436            A receipt of the subnet proposal transaction.
1437        """
1438
1439        params = {
1440            "data": cid,
1441            "subnet_id": netuid,
1442        }
1443
1444        response = self.compose_call(
1445            fn="add_subnet_custom_proposal",
1446            params=params,
1447            key=key,
1448            module="GovernanceModule",
1449        )
1450
1451        return response
1452
1453    def add_global_proposal(
1454        self,
1455        key: Keypair,
1456        params: NetworkParams,
1457        cid: str | None,
1458    ) -> ExtrinsicReceipt:
1459        """
1460        Submits a proposal for altering the global network parameters.
1461
1462        Allows for the submission of a proposal to
1463        change various global parameters
1464        of the network, such as emission rates, rate limits, and voting
1465        thresholds. It is used to
1466        suggest changes that affect the entire network's operation.
1467
1468        Args:
1469            key: The keypair used for signing the proposal transaction.
1470            params: A dictionary containing global network parameters
1471                    like maximum allowed subnets, modules,
1472                    transaction rate limits, and others.
1473
1474        Returns:
1475            A receipt of the global proposal transaction.
1476
1477        Raises:
1478            InvalidParameterError: If the provided network
1479                parameters are invalid.
1480            ChainTransactionError: If the transaction fails.
1481        """
1482        general_params = cast(dict[str, Any], params)
1483        cid = cid or ""
1484        general_params["data"] = cid
1485
1486        response = self.compose_call(
1487            fn="add_global_params_proposal",
1488            params=general_params,
1489            key=key,
1490            module="GovernanceModule",
1491        )
1492
1493        return response
1494
1495    def vote_on_proposal(
1496        self,
1497        key: Keypair,
1498        proposal_id: int,
1499        agree: bool,
1500    ) -> ExtrinsicReceipt:
1501        """
1502        Casts a vote on a specified proposal within the network.
1503
1504        Args:
1505            key: The keypair used for signing the vote transaction.
1506            proposal_id: The unique identifier of the proposal to vote on.
1507
1508        Returns:
1509            A receipt of the voting transaction in nanotokens.
1510
1511        Raises:
1512            InvalidProposalIDError: If the provided proposal ID does not
1513                exist or is invalid.
1514            ChainTransactionError: If the transaction fails.
1515        """
1516
1517        params = {"proposal_id": proposal_id, "agree": agree}
1518
1519        response = self.compose_call(
1520            "vote_proposal",
1521            key=key,
1522            params=params,
1523            module="GovernanceModule",
1524        )
1525
1526        return response
1527
1528    def unvote_on_proposal(
1529        self,
1530        key: Keypair,
1531        proposal_id: int,
1532    ) -> ExtrinsicReceipt:
1533        """
1534        Retracts a previously cast vote on a specified proposal.
1535
1536        Args:
1537            key: The keypair used for signing the unvote transaction.
1538            proposal_id: The unique identifier of the proposal to withdraw the
1539                vote from.
1540
1541        Returns:
1542            A receipt of the unvoting transaction in nanotokens.
1543
1544        Raises:
1545            InvalidProposalIDError: If the provided proposal ID does not
1546                exist or is invalid.
1547            ChainTransactionError: If the transaction fails to be processed, or
1548                if there was no prior vote to retract.
1549        """
1550
1551        params = {"proposal_id": proposal_id}
1552
1553        response = self.compose_call(
1554            "remove_vote_proposal",
1555            key=key,
1556            params=params,
1557            module="GovernanceModule",
1558        )
1559
1560        return response
1561
1562    def enable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt:
1563        """
1564        Enables vote power delegation for the signer's account.
1565
1566        Args:
1567            key: The keypair used for signing the delegation transaction.
1568
1569        Returns:
1570            A receipt of the vote power delegation transaction.
1571
1572        Raises:
1573            ChainTransactionError: If the transaction fails.
1574        """
1575
1576        response = self.compose_call(
1577            "enable_vote_power_delegation",
1578            params={},
1579            key=key,
1580            module="GovernanceModule",
1581        )
1582
1583        return response
1584
1585    def disable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt:
1586        """
1587        Disables vote power delegation for the signer's account.
1588
1589        Args:
1590            key: The keypair used for signing the delegation transaction.
1591
1592        Returns:
1593            A receipt of the vote power delegation transaction.
1594
1595        Raises:
1596            ChainTransactionError: If the transaction fails.
1597        """
1598
1599        response = self.compose_call(
1600            "disable_vote_power_delegation",
1601            params={},
1602            key=key,
1603            module="GovernanceModule",
1604        )
1605
1606        return response
1607
1608    def add_dao_application(
1609        self, key: Keypair, application_key: Ss58Address, data: str
1610    ) -> ExtrinsicReceipt:
1611        """
1612        Submits a new application to the general subnet DAO.
1613
1614        Args:
1615            key: The keypair used for signing the application transaction.
1616            application_key: The SS58 address of the application key.
1617            data: The data associated with the application.
1618
1619        Returns:
1620            A receipt of the application transaction.
1621
1622        Raises:
1623            ChainTransactionError: If the transaction fails.
1624        """
1625
1626        params = {"application_key": application_key, "data": data}
1627
1628        response = self.compose_call(
1629            "add_dao_application", module="GovernanceModule", key=key,
1630            params=params
1631        )
1632
1633        return response
1634
1635    def query_map_curator_applications(self) -> dict[str, dict[str, str]]:
1636        query_result = self.query_map(
1637            "CuratorApplications", module="GovernanceModule", params=[],
1638            extract_value=False
1639        )
1640        applications = query_result.get("CuratorApplications", {})
1641        return applications
1642
1643    def query_map_proposals(
1644        self, extract_value: bool = False
1645    ) -> dict[int, dict[str, Any]]:
1646        """
1647        Retrieves a mappping of proposals from the network.
1648
1649        Queries the network and returns a mapping of proposal IDs to
1650        their respective parameters.
1651
1652        Returns:
1653            A dictionary mapping proposal IDs
1654            to dictionaries of their parameters.
1655
1656        Raises:
1657            QueryError: If the query to the network fails or is invalid.
1658        """
1659
1660        return self.query_map(
1661            "Proposals", extract_value=extract_value, module="GovernanceModule"
1662        )["Proposals"]
1663
1664    def query_map_weights(
1665        self, netuid: int = 0, extract_value: bool = False
1666    ) -> dict[int, list[tuple[int, int]]] | None:
1667        """
1668        Retrieves a mapping of weights for keys on the network.
1669
1670        Queries the network and returns a mapping of key UIDs to
1671        their respective weights.
1672
1673        Args:
1674            netuid: The network UID from which to get the weights.
1675
1676        Returns:
1677            A dictionary mapping key UIDs to lists of their weights.
1678
1679        Raises:
1680            QueryError: If the query to the network fails or is invalid.
1681        """
1682
1683        weights_dict = self.query_map(
1684            "Weights",
1685            [netuid],
1686            extract_value=extract_value
1687        ).get("Weights")
1688        return weights_dict
1689
1690    def query_map_key(
1691        self,
1692        netuid: int = 0,
1693        extract_value: bool = False,
1694    ) -> dict[int, Ss58Address]:
1695        """
1696        Retrieves a map of keys from the network.
1697
1698        Fetches a mapping of key UIDs to their associated
1699        addresses on the network.
1700        The query can be targeted at a specific network UID if required.
1701
1702        Args:
1703            netuid: The network UID from which to get the keys.
1704
1705        Returns:
1706            A dictionary mapping key UIDs to their addresses.
1707
1708        Raises:
1709            QueryError: If the query to the network fails or is invalid.
1710        """
1711        return self.query_map("Keys", [netuid], extract_value=extract_value)["Keys"]
1712
1713    def query_map_address(
1714        self, netuid: int = 0, extract_value: bool = False
1715    ) -> dict[int, str]:
1716        """
1717        Retrieves a map of key addresses from the network.
1718
1719        Queries the network for a mapping of key UIDs to their addresses.
1720
1721        Args:
1722            netuid: The network UID from which to get the addresses.
1723
1724        Returns:
1725            A dictionary mapping key UIDs to their addresses.
1726
1727        Raises:
1728            QueryError: If the query to the network fails or is invalid.
1729        """
1730
1731        return self.query_map("Address", [netuid], extract_value=extract_value)[
1732            "Address"
1733        ]
1734
1735    def query_map_emission(self, extract_value: bool = False) -> dict[int, list[int]]:
1736        """
1737        Retrieves a map of emissions for keys on the network.
1738
1739        Queries the network to get a mapping of
1740        key UIDs to their emission values.
1741
1742        Returns:
1743            A dictionary mapping key UIDs to lists of their emission values.
1744
1745        Raises:
1746            QueryError: If the query to the network fails or is invalid.
1747        """
1748
1749        return self.query_map("Emission", extract_value=extract_value)["Emission"]
1750
1751    def query_map_pending_emission(self, extract_value: bool = False) -> int:
1752        """
1753        Retrieves a map of pending emissions for the subnets.
1754
1755        Queries the network for a mapping of subnet UIDs to their pending emission values.
1756
1757        Returns:
1758            A dictionary mapping subnet UIDs to their pending emission values.
1759
1760        Raises:
1761            QueryError: If the query to the network fails or is invalid.
1762        """
1763        return self.query_map("PendingEmission", extract_value=extract_value, module="SubnetEmissionModule")["PendingEmission"]
1764
1765    def query_map_subnet_emission(self, extract_value: bool = False) -> dict[int, int]:
1766        """
1767        Retrieves a map of subnet emissions for the network.
1768
1769        Queries the network for a mapping of subnet UIDs to their emission values.
1770
1771        Returns:
1772            A dictionary mapping subnet UIDs to their emission values.
1773
1774        Raises:
1775            QueryError: If the query to the network fails or is invalid.
1776        """
1777
1778        return self.query_map("SubnetEmission", extract_value=extract_value, module="SubnetEmissionModule")["SubnetEmission"]
1779
1780    def query_map_subnet_consensus(self, extract_value: bool = False) -> dict[int, str]:
1781        """
1782        Retrieves a map of subnet consensus types for the network.
1783
1784        Queries the network for a mapping of subnet UIDs to their consensus types.
1785
1786        Returns:
1787            A dictionary mapping subnet UIDs to their consensus types.
1788
1789        Raises:
1790            QueryError: If the query to the network fails or is invalid.
1791        """
1792
1793        return self.query_map("SubnetConsensusType", extract_value=extract_value, module="SubnetEmissionModule")["SubnetConsensusType"]
1794
1795    def query_map_incentive(self, extract_value: bool = False) -> dict[int, list[int]]:
1796        """
1797        Retrieves a mapping of incentives for keys on the network.
1798
1799        Queries the network and returns a mapping of key UIDs to
1800        their respective incentive values.
1801
1802        Returns:
1803            A dictionary mapping key UIDs to lists of their incentive values.
1804
1805        Raises:
1806            QueryError: If the query to the network fails or is invalid.
1807        """
1808
1809        return self.query_map("Incentive", extract_value=extract_value)["Incentive"]
1810
1811    def query_map_dividend(self, extract_value: bool = False) -> dict[int, list[int]]:
1812        """
1813        Retrieves a mapping of dividends for keys on the network.
1814
1815        Queries the network for a mapping of key UIDs to
1816        their dividend values.
1817
1818        Returns:
1819            A dictionary mapping key UIDs to lists of their dividend values.
1820
1821        Raises:
1822            QueryError: If the query to the network fails or is invalid.
1823        """
1824
1825        return self.query_map("Dividends", extract_value=extract_value)["Dividends"]
1826
1827    def query_map_regblock(
1828        self, netuid: int = 0, extract_value: bool = False
1829    ) -> dict[int, int]:
1830        """
1831        Retrieves a mapping of registration blocks for keys on the network.
1832
1833        Queries the network for a mapping of key UIDs to
1834        the blocks where they were registered.
1835
1836        Args:
1837            netuid: The network UID from which to get the registration blocks.
1838
1839        Returns:
1840            A dictionary mapping key UIDs to their registration blocks.
1841
1842        Raises:
1843            QueryError: If the query to the network fails or is invalid.
1844        """
1845
1846        return self.query_map(
1847            "RegistrationBlock", [netuid], extract_value=extract_value
1848        )["RegistrationBlock"]
1849
1850    def query_map_lastupdate(self, extract_value: bool = False) -> dict[int, list[int]]:
1851        """
1852        Retrieves a mapping of the last update times for keys on the network.
1853
1854        Queries the network for a mapping of key UIDs to their last update times.
1855
1856        Returns:
1857            A dictionary mapping key UIDs to lists of their last update times.
1858
1859        Raises:
1860            QueryError: If the query to the network fails or is invalid.
1861        """
1862
1863        return self.query_map("LastUpdate", extract_value=extract_value)["LastUpdate"]
1864
1865    def query_map_stakefrom(
1866        self, extract_value: bool = False
1867    ) -> dict[Ss58Address, list[tuple[Ss58Address, int]]]:
1868        """
1869        Retrieves a mapping of stakes from various sources for keys on the network.
1870
1871        Queries the network to obtain a mapping of key addresses to the sources
1872        and amounts of stakes they have received.
1873
1874        Args:
1875            netuid: The network UID from which to get the stakes.
1876
1877        Returns:
1878            A dictionary mapping key addresses to lists of tuples
1879            (module_key_address, amount).
1880
1881        Raises:
1882            QueryError: If the query to the network fails or is invalid.
1883        """
1884
1885        result = self.query_map("StakeFrom", [], extract_value=extract_value)[
1886            "StakeFrom"
1887        ]
1888
1889        return transform_stake_dmap(result)
1890
1891    def query_map_staketo(
1892        self, extract_value: bool = False
1893    ) -> dict[str, list[tuple[str, int]]]:
1894        """
1895        Retrieves a mapping of stakes to destinations for keys on the network.
1896
1897        Queries the network for a mapping of key addresses to the destinations
1898        and amounts of stakes they have made.
1899
1900        Args:
1901            netuid: The network UID from which to get the stakes.
1902
1903        Returns:
1904            A dictionary mapping key addresses to lists of tuples
1905            (module_key_address, amount).
1906
1907        Raises:
1908            QueryError: If the query to the network fails or is invalid.
1909        """
1910
1911        result = self.query_map("StakeTo", [], extract_value=extract_value)[
1912            "StakeTo"
1913        ]
1914        return transform_stake_dmap(result)
1915
1916    def query_map_delegationfee(
1917        self, netuid: int = 0, extract_value: bool = False
1918    ) -> dict[str, int]:
1919        """
1920        Retrieves a mapping of delegation fees for keys on the network.
1921
1922        Queries the network to obtain a mapping of key addresses to their
1923        respective delegation fees.
1924
1925        Args:
1926            netuid: The network UID to filter the delegation fees.
1927
1928        Returns:
1929            A dictionary mapping key addresses to their delegation fees.
1930
1931        Raises:
1932            QueryError: If the query to the network fails or is invalid.
1933        """
1934
1935        return self.query_map("DelegationFee", [netuid], extract_value=extract_value)[
1936            "DelegationFee"
1937        ]
1938
1939    def query_map_tempo(self, extract_value: bool = False) -> dict[int, int]:
1940        """
1941        Retrieves a mapping of tempo settings for the network.
1942
1943        Queries the network to obtain the tempo (rate of reward distributions)
1944        settings for various network subnets.
1945
1946        Returns:
1947            A dictionary mapping network UIDs to their tempo settings.
1948
1949        Raises:
1950            QueryError: If the query to the network fails or is invalid.
1951        """
1952
1953        return self.query_map("Tempo", extract_value=extract_value)["Tempo"]
1954
1955    def query_map_immunity_period(self, extract_value: bool) -> dict[int, int]:
1956        """
1957        Retrieves a mapping of immunity periods for the network.
1958
1959        Queries the network for the immunity period settings,
1960        which represent the time duration during which modules
1961        can not get deregistered.
1962
1963        Returns:
1964            A dictionary mapping network UIDs to their immunity period settings.
1965
1966        Raises:
1967            QueryError: If the query to the network fails or is invalid.
1968        """
1969
1970        return self.query_map("ImmunityPeriod", extract_value=extract_value)[
1971            "ImmunityPeriod"
1972        ]
1973
1974    def query_map_min_allowed_weights(
1975        self, extract_value: bool = False
1976    ) -> dict[int, int]:
1977        """
1978        Retrieves a mapping of minimum allowed weights for the network.
1979
1980        Queries the network to obtain the minimum allowed weights,
1981        which are the lowest permissible weight values that can be set by
1982        validators.
1983
1984        Returns:
1985            A dictionary mapping network UIDs to
1986            their minimum allowed weight values.
1987
1988        Raises:
1989            QueryError: If the query to the network fails or is invalid.
1990        """
1991
1992        return self.query_map("MinAllowedWeights", extract_value=extract_value)[
1993            "MinAllowedWeights"
1994        ]
1995
1996    def query_map_max_allowed_weights(
1997        self, extract_value: bool = False
1998    ) -> dict[int, int]:
1999        """
2000        Retrieves a mapping of maximum allowed weights for the network.
2001
2002        Queries the network for the maximum allowed weights,
2003        which are the highest permissible
2004        weight values that can be set by validators.
2005
2006        Returns:
2007            A dictionary mapping network UIDs to
2008            their maximum allowed weight values.
2009
2010        Raises:
2011            QueryError: If the query to the network fails or is invalid.
2012        """
2013
2014        return self.query_map("MaxAllowedWeights", extract_value=extract_value)[
2015            "MaxAllowedWeights"
2016        ]
2017
2018    def query_map_max_allowed_uids(self, extract_value: bool = False) -> dict[int, int]:
2019        """
2020        Queries the network for the maximum number of allowed user IDs (UIDs)
2021        for each network subnet.
2022
2023        Fetches a mapping of network subnets to their respective
2024        limits on the number of user IDs that can be created or used.
2025
2026        Returns:
2027            A dictionary mapping network UIDs (unique identifiers) to their
2028            maximum allowed number of UIDs.
2029            Each entry represents a network subnet
2030            with its corresponding UID limit.
2031
2032        Raises:
2033            QueryError: If the query to the network fails or is invalid.
2034        """
2035
2036        return self.query_map("MaxAllowedUids", extract_value=extract_value)[
2037            "MaxAllowedUids"
2038        ]
2039
2040    def query_map_min_stake(self, extract_value: bool = False) -> dict[int, int]:
2041        """
2042        Retrieves a mapping of minimum allowed stake on the network.
2043
2044        Queries the network to obtain the minimum number of stake,
2045        which is represented in nanotokens.
2046
2047        Returns:
2048            A dictionary mapping network UIDs to
2049            their minimum allowed stake values.
2050
2051        Raises:
2052            QueryError: If the query to the network fails or is invalid.
2053        """
2054
2055        return self.query_map("MinStake", extract_value=extract_value)["MinStake"]
2056
2057    def query_map_max_stake(self, extract_value: bool = False) -> dict[int, int]:
2058        """
2059        Retrieves a mapping of the maximum stake values for the network.
2060
2061        Queries the network for the maximum stake values across various s
2062        ubnets of the network.
2063
2064        Returns:
2065            A dictionary mapping network UIDs to their maximum stake values.
2066
2067        Raises:
2068            QueryError: If the query to the network fails or is invalid.
2069        """
2070
2071        return self.query_map("MaxStake", extract_value=extract_value)["MaxStake"]
2072
2073    def query_map_founder(self, extract_value: bool = False) -> dict[int, str]:
2074        """
2075        Retrieves a mapping of founders for the network.
2076
2077        Queries the network to obtain the founders associated with
2078        various subnets.
2079
2080        Returns:
2081            A dictionary mapping network UIDs to their respective founders.
2082
2083        Raises:
2084            QueryError: If the query to the network fails or is invalid.
2085        """
2086
2087        return self.query_map("Founder", extract_value=extract_value)["Founder"]
2088
2089    def query_map_founder_share(self, extract_value: bool = False) -> dict[int, int]:
2090        """
2091        Retrieves a mapping of founder shares for the network.
2092
2093        Queries the network for the share percentages
2094        allocated to founders across different subnets.
2095
2096        Returns:
2097            A dictionary mapping network UIDs to their founder share percentages.
2098
2099        Raises:
2100            QueryError: If the query to the network fails or is invalid.
2101        """
2102
2103        return self.query_map("FounderShare", extract_value=extract_value)[
2104            "FounderShare"
2105        ]
2106
2107    def query_map_incentive_ratio(self, extract_value: bool = False) -> dict[int, int]:
2108        """
2109        Retrieves a mapping of incentive ratios for the network.
2110
2111        Queries the network for the incentive ratios,
2112        which are the proportions of rewards or incentives
2113        allocated in different subnets of the network.
2114
2115        Returns:
2116            A dictionary mapping network UIDs to their incentive ratios.
2117
2118        Raises:
2119            QueryError: If the query to the network fails or is invalid.
2120        """
2121
2122        return self.query_map("IncentiveRatio", extract_value=extract_value)[
2123            "IncentiveRatio"
2124        ]
2125
2126    def query_map_trust_ratio(self, extract_value: bool = False) -> dict[int, int]:
2127        """
2128        Retrieves a mapping of trust ratios for the network.
2129
2130        Queries the network for trust ratios,
2131        indicative of the level of trust or credibility assigned
2132        to different subnets of the network.
2133
2134        Returns:
2135            A dictionary mapping network UIDs to their trust ratios.
2136
2137        Raises:
2138            QueryError: If the query to the network fails or is invalid.
2139        """
2140
2141        return self.query_map("TrustRatio", extract_value=extract_value)["TrustRatio"]
2142
2143    def query_map_vote_mode_subnet(self, extract_value: bool = False) -> dict[int, str]:
2144        """
2145        Retrieves a mapping of vote modes for subnets within the network.
2146
2147        Queries the network for the voting modes used in different
2148        subnets, which define the methodology or approach of voting within those
2149        subnets.
2150
2151        Returns:
2152            A dictionary mapping network UIDs to their vote
2153            modes for subnets.
2154
2155        Raises:
2156            QueryError: If the query to the network fails or is invalid.
2157        """
2158
2159        return self.query_map("VoteModeSubnet", extract_value=extract_value)[
2160            "VoteModeSubnet"
2161        ]
2162
2163    def query_map_legit_whitelist(
2164        self, extract_value: bool = False
2165    ) -> dict[Ss58Address, int]:
2166        """
2167        Retrieves a mapping of whitelisted addresses for the network.
2168
2169        Queries the network for a mapping of whitelisted addresses
2170        and their respective legitimacy status.
2171
2172        Returns:
2173            A dictionary mapping addresses to their legitimacy status.
2174
2175        Raises:
2176            QueryError: If the query to the network fails or is invalid.
2177        """
2178
2179        return self.query_map(
2180            "LegitWhitelist", module="GovernanceModule", extract_value=extract_value)[
2181            "LegitWhitelist"
2182        ]
2183
2184    def query_map_subnet_names(self, extract_value: bool = False) -> dict[int, str]:
2185        """
2186        Retrieves a mapping of subnet names within the network.
2187
2188        Queries the network for the names of various subnets,
2189        providing an overview of the different
2190        subnets within the network.
2191
2192        Returns:
2193            A dictionary mapping network UIDs to their subnet names.
2194
2195        Raises:
2196            QueryError: If the query to the network fails or is invalid.
2197        """
2198
2199        return self.query_map("SubnetNames", extract_value=extract_value)["SubnetNames"]
2200
2201    def query_map_balances(
2202        self, extract_value: bool = False
2203    ) -> dict[str, dict[str, int | dict[str, int | float]]]:
2204        """
2205        Retrieves a mapping of account balances within the network.
2206
2207        Queries the network for the balances associated with different accounts.
2208        It provides detailed information including various types of
2209        balances for each account.
2210
2211        Returns:
2212            A dictionary mapping account addresses to their balance details.
2213
2214        Raises:
2215            QueryError: If the query to the network fails or is invalid.
2216        """
2217
2218        return self.query_map("Account", module="System", extract_value=extract_value)[
2219            "Account"
2220        ]
2221
2222    def query_map_registration_blocks(
2223        self, netuid: int = 0, extract_value: bool = False
2224    ) -> dict[int, int]:
2225        """
2226        Retrieves a mapping of registration blocks for UIDs on the network.
2227
2228        Queries the network to find the block numbers at which various
2229        UIDs were registered.
2230
2231        Args:
2232            netuid: The network UID from which to get the registrations.
2233
2234        Returns:
2235            A dictionary mapping UIDs to their registration block numbers.
2236
2237        Raises:
2238            QueryError: If the query to the network fails or is invalid.
2239        """
2240
2241        return self.query_map(
2242            "RegistrationBlock", [netuid], extract_value=extract_value
2243        )["RegistrationBlock"]
2244
2245    def query_map_name(
2246        self, netuid: int = 0, extract_value: bool = False
2247    ) -> dict[int, str]:
2248        """
2249        Retrieves a mapping of names for keys on the network.
2250
2251        Queries the network for the names associated with different keys.
2252        It provides a mapping of key UIDs to their registered names.
2253
2254        Args:
2255            netuid: The network UID from which to get the names.
2256
2257        Returns:
2258            A dictionary mapping key UIDs to their names.
2259
2260        Raises:
2261            QueryError: If the query to the network fails or is invalid.
2262        """
2263
2264        return self.query_map("Name", [netuid], extract_value=extract_value)["Name"]
2265
2266    #  == QUERY FUNCTIONS == #
2267
2268    def get_immunity_period(self, netuid: int = 0) -> int:
2269        """
2270        Queries the network for the immunity period setting.
2271
2272        The immunity period is a time duration during which a module
2273        can not be deregistered from the network.
2274        Fetches the immunity period for a specified network subnet.
2275
2276        Args:
2277            netuid: The network UID for which to query the immunity period.
2278
2279        Returns:
2280            The immunity period setting for the specified network subnet.
2281
2282        Raises:
2283            QueryError: If the query to the network fails or is invalid.
2284        """
2285
2286        return self.query(
2287            "ImmunityPeriod",
2288            params=[netuid],
2289        )
2290
2291    def get_max_set_weights_per_epoch(self):
2292        return self.query("MaximumSetWeightCallsPerEpoch")
2293
2294    def get_min_allowed_weights(self, netuid: int = 0) -> int:
2295        """
2296        Queries the network for the minimum allowed weights setting.
2297
2298        Retrieves the minimum weight values that are possible to set
2299        by a validator within a specific network subnet.
2300
2301        Args:
2302            netuid: The network UID for which to query the minimum allowed
2303              weights.
2304
2305        Returns:
2306            The minimum allowed weight values for the specified network
2307              subnet.
2308
2309        Raises:
2310            QueryError: If the query to the network fails or is invalid.
2311        """
2312
2313        return self.query(
2314            "MinAllowedWeights",
2315            params=[netuid],
2316        )
2317
2318    def get_dao_treasury_address(self) -> Ss58Address:
2319        return self.query("DaoTreasuryAddress", module="GovernanceModule")
2320
2321    def get_max_allowed_weights(self, netuid: int = 0) -> int:
2322        """
2323        Queries the network for the maximum allowed weights setting.
2324
2325        Retrieves the maximum weight values that are possible to set
2326        by a validator within a specific network subnet.
2327
2328        Args:
2329            netuid: The network UID for which to query the maximum allowed
2330              weights.
2331
2332        Returns:
2333            The maximum allowed weight values for the specified network
2334              subnet.
2335
2336        Raises:
2337            QueryError: If the query to the network fails or is invalid.
2338        """
2339
2340        return self.query("MaxAllowedWeights", params=[netuid])
2341
2342    def get_max_allowed_uids(self, netuid: int = 0) -> int:
2343        """
2344        Queries the network for the maximum allowed UIDs setting.
2345
2346        Fetches the upper limit on the number of user IDs that can
2347        be allocated or used within a specific network subnet.
2348
2349        Args:
2350            netuid: The network UID for which to query the maximum allowed UIDs.
2351
2352        Returns:
2353            The maximum number of allowed UIDs for the specified network subnet.
2354
2355        Raises:
2356            QueryError: If the query to the network fails or is invalid.
2357        """
2358
2359        return self.query("MaxAllowedUids", params=[netuid])
2360
2361    def get_name(self, netuid: int = 0) -> str:
2362        """
2363        Queries the network for the name of a specific subnet.
2364
2365        Args:
2366            netuid: The network UID for which to query the name.
2367
2368        Returns:
2369            The name of the specified network subnet.
2370
2371        Raises:
2372            QueryError: If the query to the network fails or is invalid.
2373        """
2374
2375        return self.query("Name", params=[netuid])
2376
2377    def get_subnet_name(self, netuid: int = 0) -> str:
2378        """
2379        Queries the network for the name of a specific subnet.
2380
2381        Args:
2382            netuid: The network UID for which to query the name.
2383
2384        Returns:
2385            The name of the specified network subnet.
2386
2387        Raises:
2388            QueryError: If the query to the network fails or is invalid.
2389        """
2390
2391        return self.query("SubnetNames", params=[netuid])
2392
2393    def get_global_dao_treasury(self):
2394        return self.query("GlobalDaoTreasury", module="GovernanceModule")
2395
2396    def get_n(self, netuid: int = 0) -> int:
2397        """
2398        Queries the network for the 'N' hyperparameter, which represents how
2399        many modules are on the network.
2400
2401        Args:
2402            netuid: The network UID for which to query the 'N' hyperparameter.
2403
2404        Returns:
2405            The value of the 'N' hyperparameter for the specified network
2406              subnet.
2407
2408        Raises:
2409            QueryError: If the query to the network fails or is invalid.
2410        """
2411
2412        return self.query("N", params=[netuid])
2413
2414    def get_tempo(self, netuid: int = 0) -> int:
2415        """
2416        Queries the network for the tempo setting, measured in blocks, for the
2417        specified subnet.
2418
2419        Args:
2420            netuid: The network UID for which to query the tempo.
2421
2422        Returns:
2423            The tempo setting for the specified subnet.
2424
2425        Raises:
2426            QueryError: If the query to the network fails or is invalid.
2427        """
2428
2429        return self.query("Tempo", params=[netuid])
2430
2431    def get_total_stake(self) -> int:
2432        """
2433        Retrieves a mapping of total stakes for keys on the network.
2434
2435        Queries the network for a mapping of key UIDs to their total stake amounts.
2436
2437        Returns:
2438            A dictionary mapping key UIDs to their total stake amounts.
2439
2440        Raises:
2441            QueryError: If the query to the network fails or is invalid.
2442        """
2443
2444        return self.query("TotalStake")
2445
2446    def get_registrations_per_block(self):
2447        """
2448        Queries the network for the number of registrations per block.
2449
2450        Fetches the number of registrations that are processed per
2451        block within the network.
2452
2453        Returns:
2454            The number of registrations processed per block.
2455
2456        Raises:
2457            QueryError: If the query to the network fails or is invalid.
2458        """
2459
2460        return self.query(
2461            "RegistrationsPerBlock",
2462        )
2463
2464    def max_registrations_per_block(self, netuid: int = 0):
2465        """
2466        Queries the network for the maximum number of registrations per block.
2467
2468        Retrieves the upper limit of registrations that can be processed in
2469        each block within a specific network subnet.
2470
2471        Args:
2472            netuid: The network UID for which to query.
2473
2474        Returns:
2475            The maximum number of registrations per block for
2476            the specified network subnet.
2477
2478        Raises:
2479            QueryError: If the query to the network fails or is invalid.
2480        """
2481
2482        return self.query(
2483            "MaxRegistrationsPerBlock",
2484            params=[netuid],
2485        )
2486
2487    def get_proposal(self, proposal_id: int = 0):
2488        """
2489        Queries the network for a specific proposal.
2490
2491        Args:
2492            proposal_id: The ID of the proposal to query.
2493
2494        Returns:
2495            The details of the specified proposal.
2496
2497        Raises:
2498            QueryError: If the query to the network fails, is invalid,
2499                or if the proposal ID does not exist.
2500        """
2501
2502        return self.query(
2503            "Proposals",
2504            params=[proposal_id],
2505        )
2506
2507    def get_trust(self, netuid: int = 0):
2508        """
2509        Queries the network for the trust setting of a specific network subnet.
2510
2511        Retrieves the trust level or score, which may represent the
2512        level of trustworthiness or reliability within a
2513        particular network subnet.
2514
2515        Args:
2516            netuid: The network UID for which to query the trust setting.
2517
2518        Returns:
2519            The trust level or score for the specified network subnet.
2520
2521        Raises:
2522            QueryError: If the query to the network fails or is invalid.
2523        """
2524
2525        return self.query(
2526            "Trust",
2527            params=[netuid],
2528        )
2529
2530    def get_uids(self, key: Ss58Address, netuid: int = 0) -> bool | None:
2531        """
2532        Queries the network for module UIDs associated with a specific key.
2533
2534        Args:
2535            key: The key address for which to query UIDs.
2536            netuid: The network UID within which to search for the key.
2537
2538        Returns:
2539            A list of UIDs associated with the specified key.
2540
2541        Raises:
2542            QueryError: If the query to the network fails or is invalid.
2543        """
2544
2545        return self.query(
2546            "Uids",
2547            params=[netuid, key],
2548        )
2549
2550    def get_unit_emission(self) -> int:
2551        """
2552        Queries the network for the unit emission setting.
2553
2554        Retrieves the unit emission value, which represents the
2555        emission rate or quantity for the $COMM token.
2556
2557        Returns:
2558            The unit emission value in nanos for the network.
2559
2560        Raises:
2561            QueryError: If the query to the network fails or is invalid.
2562        """
2563
2564        return self.query("UnitEmission", module="SubnetEmissionModule")
2565
2566    def get_tx_rate_limit(self) -> int:
2567        """
2568        Queries the network for the transaction rate limit.
2569
2570        Retrieves the rate limit for transactions within the network,
2571        which defines the maximum number of transactions that can be
2572        processed within a certain timeframe.
2573
2574        Returns:
2575            The transaction rate limit for the network.
2576
2577        Raises:
2578            QueryError: If the query to the network fails or is invalid.
2579        """
2580
2581        return self.query(
2582            "TxRateLimit",
2583        )
2584
2585    def get_subnet_burn(self) -> int:
2586        """Queries the network for the subnet burn value.
2587
2588        Retrieves the subnet burn value from the network, which represents
2589        the amount of tokens that are burned (permanently removed from
2590        circulation) for subnet-related operations.
2591
2592        Returns:
2593            int: The subnet burn value.
2594
2595        Raises:
2596            QueryError: If the query to the network fails or returns invalid data.
2597        """
2598
2599        return self.query(
2600            "SubnetBurn",
2601        )
2602
2603    def get_burn_rate(self) -> int:
2604        """
2605        Queries the network for the burn rate setting.
2606
2607        Retrieves the burn rate, which represents the rate at
2608        which the $COMM token is permanently
2609        removed or 'burned' from circulation.
2610
2611        Returns:
2612            The burn rate for the network.
2613
2614        Raises:
2615            QueryError: If the query to the network fails or is invalid.
2616        """
2617
2618        return self.query(
2619            "BurnRate",
2620            params=[],
2621        )
2622
2623    def get_burn(self, netuid: int = 0) -> int:
2624        """
2625        Queries the network for the burn setting.
2626
2627        Retrieves the burn value, which represents the amount of the
2628        $COMM token that is 'burned' or permanently removed from
2629        circulation.
2630
2631        Args:
2632            netuid: The network UID for which to query the burn value.
2633
2634        Returns:
2635            The burn value for the specified network subnet.
2636
2637        Raises:
2638            QueryError: If the query to the network fails or is invalid.
2639        """
2640
2641        return self.query("Burn", params=[netuid])
2642
2643    def get_min_burn(self) -> int:
2644        """
2645        Queries the network for the minimum burn setting.
2646
2647        Retrieves the minimum burn value, indicating the lowest
2648        amount of the $COMM tokens that can be 'burned' or
2649        permanently removed from circulation.
2650
2651        Returns:
2652            The minimum burn value for the network.
2653
2654        Raises:
2655            QueryError: If the query to the network fails or is invalid.
2656        """
2657
2658        return self.query(
2659            "BurnConfig",
2660            params=[],
2661        )["min_burn"]
2662
2663    def get_min_weight_stake(self) -> int:
2664        """
2665        Queries the network for the minimum weight stake setting.
2666
2667        Retrieves the minimum weight stake, which represents the lowest
2668        stake weight that is allowed for certain operations or
2669        transactions within the network.
2670
2671        Returns:
2672            The minimum weight stake for the network.
2673
2674        Raises:
2675            QueryError: If the query to the network fails or is invalid.
2676        """
2677
2678        return self.query("MinWeightStake", params=[])
2679
2680    def get_vote_mode_global(self) -> str:
2681        """
2682        Queries the network for the global vote mode setting.
2683
2684        Retrieves the global vote mode, which defines the overall voting
2685        methodology or approach used across the network in default.
2686
2687        Returns:
2688            The global vote mode setting for the network.
2689
2690        Raises:
2691            QueryError: If the query to the network fails or is invalid.
2692        """
2693
2694        return self.query(
2695            "VoteModeGlobal",
2696        )
2697
2698    def get_max_proposals(self) -> int:
2699        """
2700        Queries the network for the maximum number of proposals allowed.
2701
2702        Retrieves the upper limit on the number of proposals that can be
2703        active or considered at any given time within the network.
2704
2705        Returns:
2706            The maximum number of proposals allowed on the network.
2707
2708        Raises:
2709            QueryError: If the query to the network fails or is invalid.
2710        """
2711
2712        return self.query(
2713            "MaxProposals",
2714        )
2715
2716    def get_max_registrations_per_block(self) -> int:
2717        """
2718        Queries the network for the maximum number of registrations per block.
2719
2720        Retrieves the maximum number of registrations that can
2721        be processed in each block within the network.
2722
2723        Returns:
2724            The maximum number of registrations per block on the network.
2725
2726        Raises:
2727            QueryError: If the query to the network fails or is invalid.
2728        """
2729
2730        return self.query(
2731            "MaxRegistrationsPerBlock",
2732            params=[],
2733        )
2734
2735    def get_max_name_length(self) -> int:
2736        """
2737        Queries the network for the maximum length allowed for names.
2738
2739        Retrieves the maximum character length permitted for names
2740        within the network. Such as the module names
2741
2742        Returns:
2743            The maximum length allowed for names on the network.
2744
2745        Raises:
2746            QueryError: If the query to the network fails or is invalid.
2747        """
2748
2749        return self.query(
2750            "MaxNameLength",
2751            params=[],
2752        )
2753
2754    def get_global_vote_threshold(self) -> int:
2755        """
2756        Queries the network for the global vote threshold.
2757
2758        Retrieves the global vote threshold, which is the critical value or
2759        percentage required for decisions in the network's governance process.
2760
2761        Returns:
2762            The global vote threshold for the network.
2763
2764        Raises:
2765            QueryError: If the query to the network fails or is invalid.
2766        """
2767
2768        return self.query(
2769            "GlobalVoteThreshold",
2770        )
2771
2772    def get_max_allowed_subnets(self) -> int:
2773        """
2774        Queries the network for the maximum number of allowed subnets.
2775
2776        Retrieves the upper limit on the number of subnets that can
2777        be created or operated within the network.
2778
2779        Returns:
2780            The maximum number of allowed subnets on the network.
2781
2782        Raises:
2783            QueryError: If the query to the network fails or is invalid.
2784        """
2785
2786        return self.query(
2787            "MaxAllowedSubnets",
2788            params=[],
2789        )
2790
2791    def get_max_allowed_modules(self) -> int:
2792        """
2793        Queries the network for the maximum number of allowed modules.
2794
2795        Retrieves the upper limit on the number of modules that
2796        can be registered within the network.
2797
2798        Returns:
2799            The maximum number of allowed modules on the network.
2800
2801        Raises:
2802            QueryError: If the query to the network fails or is invalid.
2803        """
2804
2805        return self.query(
2806            "MaxAllowedModules",
2807            params=[],
2808        )
2809
2810    def get_min_stake(self, netuid: int = 0) -> int:
2811        """
2812        Queries the network for the minimum stake required to register a key.
2813
2814        Retrieves the minimum amount of stake necessary for
2815        registering a key within a specific network subnet.
2816
2817        Args:
2818            netuid: The network UID for which to query the minimum stake.
2819
2820        Returns:
2821            The minimum stake required for key registration in nanos.
2822
2823        Raises:
2824            QueryError: If the query to the network fails or is invalid.
2825        """
2826
2827        return self.query("MinStake", params=[netuid])
2828
2829    def get_stakefrom(
2830        self,
2831        key: Ss58Address,
2832    ) -> dict[str, int]:
2833        """
2834        Retrieves the stake amounts from all stakers to a specific staked address.
2835
2836        Queries the network for the stakes received by a particular staked address
2837        from all stakers.
2838
2839        Args:
2840            key: The address of the key receiving the stakes.
2841
2842        Returns:
2843            A dictionary mapping staker addresses to their respective stake amounts.
2844
2845        Raises:
2846            QueryError: If the query to the network fails or is invalid.
2847        """
2848
2849        # Has to use query map in order to iterate through the storage prefix.
2850        return self.query_map("StakeFrom", [key], extract_value=False).get("StakeFrom", {})
2851
2852    def get_staketo(
2853        self,
2854        key: Ss58Address,
2855    ) -> dict[str, int]:
2856        """
2857        Retrieves the stake amounts provided by a specific staker to all staked addresses.
2858
2859        Queries the network for the stakes provided by a particular staker to
2860        all staked addresses.
2861
2862        Args:
2863            key: The address of the key providing the stakes.
2864
2865        Returns:
2866            A dictionary mapping staked addresses to their respective received stake amounts.
2867
2868        Raises:
2869            QueryError: If the query to the network fails or is invalid.
2870        """
2871
2872        # Has to use query map in order to iterate through the storage prefix.
2873        return self.query_map("StakeTo", [key], extract_value=False).get("StakeTo", {})
2874
2875    def get_balance(
2876        self,
2877        addr: Ss58Address,
2878    ) -> int:
2879        """
2880        Retrieves the balance of a specific key.
2881
2882        Args:
2883            addr: The address of the key to query the balance for.
2884
2885        Returns:
2886            The balance of the specified key.
2887
2888        Raises:
2889            QueryError: If the query to the network fails or is invalid.
2890        """
2891
2892        result = self.query("Account", module="System", params=[addr])
2893
2894        return result["data"]["free"]
2895
2896    def get_block(self, block_hash: str | None = None) -> dict[Any, Any] | None:
2897        """
2898        Retrieves information about a specific block in the network.
2899
2900        Queries the network for details about a block, such as its number,
2901        hash, and other relevant information.
2902
2903        Returns:
2904            The requested information about the block,
2905            or None if the block does not exist
2906            or the information is not available.
2907
2908        Raises:
2909            QueryError: If the query to the network fails or is invalid.
2910        """
2911
2912        with self.get_conn() as substrate:
2913            block: dict[Any, Any] | None = substrate.get_block(  # type: ignore
2914                block_hash  # type: ignore
2915            )
2916
2917        return block
2918
2919    def get_existential_deposit(self, block_hash: str | None = None) -> int:
2920        """
2921        Retrieves the existential deposit value for the network.
2922
2923        The existential deposit is the minimum balance that must be maintained
2924        in an account to prevent it from being purged. Denotated in nano units.
2925
2926        Returns:
2927            The existential deposit value in nano units.
2928        Note:
2929            The value returned is a fixed value defined in the
2930            client and may not reflect changes in the network's configuration.
2931        """
2932
2933        with self.get_conn() as substrate:
2934            result: int = substrate.get_constant(  #  type: ignore
2935                "Balances", "ExistentialDeposit", block_hash
2936            ).value  #  type: ignore
2937
2938        return result
2939
2940    def get_voting_power_delegators(self) -> list[Ss58Address]:
2941        result = self.query("NotDelegatingVotingPower", [], module="GovernanceModule")
2942        return result
2943
2944    def add_transfer_dao_treasury_proposal(
2945        self,
2946        key: Keypair,
2947        data: str,
2948        amount_nano: int,
2949        dest: Ss58Address,
2950    ):
2951        params = {"dest": dest, "value": amount_nano, "data": data}
2952
2953        return self.compose_call(
2954            module="GovernanceModule",
2955            fn="add_transfer_dao_treasury_proposal",
2956            params=params,
2957            key=key,
2958        )
2959
2960    def delegate_rootnet_control(self, key: Keypair, dest: Ss58Address):
2961        params = {"origin": key, "target": dest}
2962
2963        return self.compose_call(
2964            module="SubspaceModule",
2965            fn="delegate_rootnet_control",
2966            params=params,
2967            key=key,
2968        )
MAX_REQUEST_SIZE = 9000000
@dataclass
class Chunk:
24@dataclass
25class Chunk:
26    batch_requests: list[tuple[Any, Any]]
27    prefix_list: list[list[str]]
28    fun_params: list[tuple[Any, Any, Any, Any, str]]
Chunk( batch_requests: list[tuple[typing.Any, typing.Any]], prefix_list: list[list[str]], fun_params: list[tuple[typing.Any, typing.Any, typing.Any, typing.Any, str]])
batch_requests: list[tuple[typing.Any, typing.Any]]
prefix_list: list[list[str]]
fun_params: list[tuple[typing.Any, typing.Any, typing.Any, typing.Any, str]]
class CommuneClient:
  35class CommuneClient:
  36    """
  37    A client for interacting with Commune network nodes, querying storage,
  38    submitting transactions, etc.
  39
  40    Attributes:
  41        wait_for_finalization: Whether to wait for transaction finalization.
  42
  43    Example:
  44    ```py
  45    client = CommuneClient()
  46    client.query(name='function_name', params=['param1', 'param2'])
  47    ```
  48
  49    Raises:
  50        AssertionError: If the maximum connections value is less than or equal
  51          to zero.
  52    """
  53
  54    wait_for_finalization: bool
  55    _num_connections: int
  56    _connection_queue: queue.Queue[SubstrateInterface]
  57    url: str
  58
  59    def __init__(
  60        self,
  61        url: str,
  62        num_connections: int = 1,
  63        wait_for_finalization: bool = False,
  64    ):
  65        """
  66        Args:
  67            url: The URL of the network node to connect to.
  68            num_connections: The number of websocket connections to be opened.
  69        """
  70        assert num_connections > 0
  71        self._num_connections = num_connections
  72        self.wait_for_finalization = wait_for_finalization
  73        self._connection_queue = queue.Queue(num_connections)
  74        self.url = url
  75
  76        for _ in range(num_connections):
  77            self._connection_queue.put(SubstrateInterface(url))
  78
  79    @property
  80    def connections(self) -> int:
  81        """
  82        Gets the maximum allowed number of simultaneous connections to the
  83        network node.
  84        """
  85        return self._num_connections
  86
  87    @contextmanager
  88    def get_conn(self, timeout: float | None = None, init: bool = False):
  89        """
  90        Context manager to get a connection from the pool.
  91
  92        Tries to get a connection from the pool queue. If the queue is empty,
  93        it blocks for `timeout` seconds until a connection is available. If
  94        `timeout` is None, it blocks indefinitely.
  95
  96        Args:
  97            timeout: The maximum time in seconds to wait for a connection.
  98
  99        Yields:
 100            The connection object from the pool.
 101
 102        Raises:
 103            QueueEmptyError: If no connection is available within the timeout
 104              period.
 105        """
 106        conn = self._connection_queue.get(timeout=timeout)
 107        if init:
 108            conn.init_runtime()  # type: ignore
 109        try:
 110            yield conn
 111        finally:
 112            self._connection_queue.put(conn)
 113
 114    def _get_storage_keys(
 115        self,
 116        storage: str,
 117        queries: list[tuple[str, list[Any]]],
 118        block_hash: str | None,
 119    ):
 120
 121        send: list[tuple[str, list[Any]]] = []
 122        prefix_list: list[Any] = []
 123
 124        key_idx = 0
 125        with self.get_conn(init=True) as substrate:
 126            for function, params in queries:
 127                storage_key = StorageKey.create_from_storage_function(  # type: ignore
 128                    storage, function, params, runtime_config=substrate.runtime_config, metadata=substrate.metadata  # type: ignore
 129                )
 130
 131                prefix = storage_key.to_hex()
 132                prefix_list.append(prefix)
 133                send.append(("state_getKeys", [prefix, block_hash]))
 134                key_idx += 1
 135        return send, prefix_list
 136
 137    def _get_lists(
 138        self,
 139        storage_module: str,
 140        queries: list[tuple[str, list[Any]]],
 141        substrate: SubstrateInterface,
 142    ) -> list[tuple[Any, Any, Any, Any, str]]:
 143        """
 144        Generates a list of tuples containing parameters for each storage function based on the given functions and substrate interface.
 145
 146        Args:
 147            functions (dict[str, list[query_call]]): A dictionary where keys are storage module names and values are lists of tuples.
 148                Each tuple consists of a storage function name and its parameters.
 149            substrate: An instance of the SubstrateInterface class used to interact with the substrate.
 150
 151        Returns:
 152            A list of tuples in the format `(value_type, param_types, key_hashers, params, storage_function)` for each storage function in the given functions.
 153
 154        Example:
 155            >>> _get_lists(
 156                    functions={'storage_module': [('storage_function', ['param1', 'param2'])]},
 157                    substrate=substrate_instance
 158                )
 159            [('value_type', 'param_types', 'key_hashers', ['param1', 'param2'], 'storage_function'), ...]
 160        """
 161
 162        function_parameters: list[tuple[Any, Any, Any, Any, str]] = []
 163
 164        metadata_pallet = substrate.metadata.get_metadata_pallet(  # type: ignore
 165            storage_module
 166        )
 167        for storage_function, params in queries:
 168            storage_item = metadata_pallet.get_storage_function(  # type: ignore
 169                storage_function
 170            )
 171
 172            value_type = storage_item.get_value_type_string()  # type: ignore
 173            param_types = storage_item.get_params_type_string()  # type: ignore
 174            key_hashers = storage_item.get_param_hashers()  # type: ignore
 175            function_parameters.append(
 176                (
 177                    value_type,
 178                    param_types,
 179                    key_hashers,
 180                    params,
 181                    storage_function,
 182                )  # type: ignore
 183            )
 184        return function_parameters
 185
 186    def _send_batch(
 187        self,
 188        batch_payload: list[Any],
 189        request_ids: list[int],
 190        extract_result: bool = True,
 191    ):
 192        """
 193        Sends a batch of requests to the substrate and collects the results.
 194
 195        Args:
 196            substrate: An instance of the substrate interface.
 197            batch_payload: The payload of the batch request.
 198            request_ids: A list of request IDs for tracking responses.
 199            results: A list to store the results of the requests.
 200            extract_result: Whether to extract the result from the response.
 201
 202        Raises:
 203            NetworkQueryError: If there is an `error` in the response message.
 204
 205        Note:
 206            No explicit return value as results are appended to the provided 'results' list.
 207        """
 208        results: list[str | dict[Any, Any]] = []
 209        with self.get_conn(init=True) as substrate:
 210            try:
 211
 212                substrate.websocket.send(  #  type: ignore
 213                    json.dumps(batch_payload)
 214                )  # type: ignore
 215            except NetworkQueryError:
 216                pass
 217            while len(results) < len(request_ids):
 218                received_messages = json.loads(
 219                    substrate.websocket.recv()  # type: ignore
 220                )  # type: ignore
 221                if isinstance(received_messages, dict):
 222                    received_messages: list[dict[Any, Any]] = [received_messages]
 223
 224                for message in received_messages:
 225                    if message.get("id") in request_ids:
 226                        if extract_result:
 227                            try:
 228                                results.append(message["result"])
 229                            except Exception:
 230                                raise (
 231                                    RuntimeError(
 232                                        f"Error extracting result from message: {message}"
 233                                    )
 234                                )
 235                        else:
 236                            results.append(message)
 237                    if "error" in message:
 238                        raise NetworkQueryError(message["error"])
 239
 240            return results
 241
 242    def _make_request_smaller(
 243        self,
 244        batch_request: list[tuple[T1, T2]],
 245        prefix_list: list[list[str]],
 246        fun_params: list[tuple[Any, Any, Any, Any, str]],
 247    ) -> tuple[list[list[tuple[T1, T2]]], list[Chunk]]:
 248        """
 249        Splits a batch of requests into smaller batches, each not exceeding the specified maximum size.
 250
 251        Args:
 252            batch_request: A list of requests to be sent in a batch.
 253            max_size: Maximum size of each batch in bytes.
 254
 255        Returns:
 256            A list of smaller request batches.
 257
 258        Example:
 259            >>> _make_request_smaller(batch_request=[('method1', 'params1'), ('method2', 'params2')], max_size=1000)
 260            [[('method1', 'params1')], [('method2', 'params2')]]
 261        """
 262        assert len(prefix_list) == len(fun_params) == len(batch_request)
 263
 264        def estimate_size(request: tuple[T1, T2]):
 265            """Convert the batch request to a string and measure its length"""
 266            return len(json.dumps(request))
 267
 268        # Initialize variables
 269        result: list[list[tuple[T1, T2]]] = []
 270        current_batch = []
 271        current_prefix_batch = []
 272        current_params_batch = []
 273        current_size = 0
 274
 275        chunk_list: list[Chunk] = []
 276
 277        # Iterate through each request in the batch
 278        for request, prefix, params in zip(batch_request, prefix_list, fun_params):
 279            request_size = estimate_size(request)
 280
 281            # Check if adding this request exceeds the max size
 282            if current_size + request_size > MAX_REQUEST_SIZE:
 283                # If so, start a new batch
 284
 285                # Essentiatly checks that it's not the first iteration
 286                if current_batch:
 287                    chunk = Chunk(
 288                        current_batch, current_prefix_batch, current_params_batch
 289                    )
 290                    chunk_list.append(chunk)
 291                    result.append(current_batch)
 292
 293                current_batch = [request]
 294                current_prefix_batch = [prefix]
 295                current_params_batch = [params]
 296                current_size = request_size
 297            else:
 298                # Otherwise, add to the current batch
 299                current_batch.append(request)
 300                current_size += request_size
 301                current_prefix_batch.append(prefix)
 302                current_params_batch.append(params)
 303
 304        # Add the last batch if it's not empty
 305        if current_batch:
 306            result.append(current_batch)
 307            chunk = Chunk(current_batch, current_prefix_batch, current_params_batch)
 308            chunk_list.append(chunk)
 309
 310        return result, chunk_list
 311
 312    def _are_changes_equal(self, change_a: Any, change_b: Any):
 313        for (a, b), (c, d) in zip(change_a, change_b):
 314            if a != c or b != d:
 315                return False
 316
 317    def _rpc_request_batch(
 318        self, batch_requests: list[tuple[str, list[Any]]], extract_result: bool = True
 319    ) -> list[str]:
 320        """
 321        Sends batch requests to the substrate node using multiple threads and collects the results.
 322
 323        Args:
 324            substrate: An instance of the substrate interface.
 325            batch_requests : A list of requests to be sent in batches.
 326            max_size: Maximum size of each batch in bytes.
 327            extract_result: Whether to extract the result from the response message.
 328
 329        Returns:
 330            A list of results from the batch requests.
 331
 332        Example:
 333            >>> _rpc_request_batch(substrate_instance, [('method1', ['param1']), ('method2', ['param2'])])
 334            ['result1', 'result2', ...]
 335        """
 336
 337        chunk_results: list[Any] = []
 338        # smaller_requests = self._make_request_smaller(batch_requests)
 339        request_id = 0
 340        with ThreadPoolExecutor() as executor:
 341            futures: list[Future[list[str | dict[Any, Any]]]] = []
 342            for chunk in [batch_requests]:
 343                request_ids: list[int] = []
 344                batch_payload: list[Any] = []
 345                for method, params in chunk:
 346                    request_id += 1
 347                    request_ids.append(request_id)
 348                    batch_payload.append(
 349                        {
 350                            "jsonrpc": "2.0",
 351                            "method": method,
 352                            "params": params,
 353                            "id": request_id,
 354                        }
 355                    )
 356
 357                futures.append(
 358                    executor.submit(
 359                        self._send_batch,
 360                        batch_payload=batch_payload,
 361                        request_ids=request_ids,
 362                        extract_result=extract_result,
 363                    )
 364                )
 365            for future in futures:
 366                resul = future.result()
 367                chunk_results.append(resul)
 368        return chunk_results
 369
 370    def _rpc_request_batch_chunked(
 371        self, chunk_requests: list[Chunk], extract_result: bool = True
 372    ):
 373        """
 374        Sends batch requests to the substrate node using multiple threads and collects the results.
 375
 376        Args:
 377            substrate: An instance of the substrate interface.
 378            batch_requests : A list of requests to be sent in batches.
 379            max_size: Maximum size of each batch in bytes.
 380            extract_result: Whether to extract the result from the response message.
 381
 382        Returns:
 383            A list of results from the batch requests.
 384
 385        Example:
 386            >>> _rpc_request_batch(substrate_instance, [('method1', ['param1']), ('method2', ['param2'])])
 387            ['result1', 'result2', ...]
 388        """
 389
 390        def split_chunks(chunk: Chunk, chunk_info: list[Chunk], chunk_info_idx: int):
 391            manhattam_chunks: list[tuple[Any, Any]] = []
 392            mutaded_chunk_info = deepcopy(chunk_info)
 393            max_n_keys = 35000
 394            for query in chunk.batch_requests:
 395                result_keys = query[1][0]
 396                keys_amount = len(result_keys)
 397                if keys_amount > max_n_keys:
 398                    mutaded_chunk_info.pop(chunk_info_idx)
 399                    for i in range(0, keys_amount, max_n_keys):
 400                        new_chunk = deepcopy(chunk)
 401                        splitted_keys = result_keys[i: i + max_n_keys]
 402                        splitted_query = deepcopy(query)
 403                        splitted_query[1][0] = splitted_keys
 404                        new_chunk.batch_requests = [splitted_query]
 405                        manhattam_chunks.append(splitted_query)
 406                        mutaded_chunk_info.insert(chunk_info_idx, new_chunk)
 407                else:
 408                    manhattam_chunks.append(query)
 409            return manhattam_chunks, mutaded_chunk_info
 410
 411        assert len(chunk_requests) > 0
 412        mutated_chunk_info: list[Chunk] = []
 413        chunk_results: list[Any] = []
 414        # smaller_requests = self._make_request_smaller(batch_requests)
 415        request_id = 0
 416
 417        with ThreadPoolExecutor() as executor:
 418            futures: list[Future[list[str | dict[Any, Any]]]] = []
 419            for idx, macro_chunk in enumerate(chunk_requests):
 420                _, mutated_chunk_info = split_chunks(macro_chunk, chunk_requests, idx)
 421            for chunk in mutated_chunk_info:
 422                request_ids: list[int] = []
 423                batch_payload: list[Any] = []
 424                for method, params in chunk.batch_requests:
 425                    # for method, params in micro_chunk:
 426                    request_id += 1
 427                    request_ids.append(request_id)
 428                    batch_payload.append(
 429                        {
 430                            "jsonrpc": "2.0",
 431                            "method": method,
 432                            "params": params,
 433                            "id": request_id,
 434                        }
 435                    )
 436                futures.append(
 437                    executor.submit(
 438                        self._send_batch,
 439                        batch_payload=batch_payload,
 440                        request_ids=request_ids,
 441                        extract_result=extract_result,
 442                    )
 443                )
 444            for future in futures:
 445                resul = future.result()
 446                chunk_results.append(resul)
 447        return chunk_results, mutated_chunk_info
 448
 449    def _decode_response(
 450        self,
 451        response: list[str],
 452        function_parameters: list[tuple[Any, Any, Any, Any, str]],
 453        prefix_list: list[Any],
 454        block_hash: str,
 455    ) -> dict[str, dict[Any, Any]]:
 456        """
 457        Decodes a response from the substrate interface and organizes the data into a dictionary.
 458
 459        Args:
 460            response: A list of encoded responses from a substrate query.
 461            function_parameters: A list of tuples containing the parameters for each storage function.
 462            last_keys: A list of the last keys used in the substrate query.
 463            prefix_list: A list of prefixes used in the substrate query.
 464            substrate: An instance of the SubstrateInterface class.
 465            block_hash: The hash of the block to be queried.
 466
 467        Returns:
 468            A dictionary where each key is a storage function name and the value is another dictionary.
 469            This inner dictionary's key is the decoded key from the response and the value is the corresponding decoded value.
 470
 471        Raises:
 472            ValueError: If an unsupported hash type is encountered in the `concat_hash_len` function.
 473
 474        Example:
 475            >>> _decode_response(
 476                    response=[...],
 477                    function_parameters=[...],
 478                    last_keys=[...],
 479                    prefix_list=[...],
 480                    substrate=substrate_instance,
 481                    block_hash="0x123..."
 482                )
 483            {'storage_function_name': {decoded_key: decoded_value, ...}, ...}
 484        """
 485
 486        def get_item_key_value(item_key: tuple[Any, ...] | Any) -> tuple[Any, ...] | Any:
 487            if isinstance(item_key, tuple):
 488                return tuple(k.value for k in item_key)
 489            return item_key.value
 490
 491        def concat_hash_len(key_hasher: str) -> int:
 492            """
 493            Determines the length of the hash based on the given key hasher type.
 494
 495            Args:
 496                key_hasher: The type of key hasher.
 497
 498            Returns:
 499                The length of the hash corresponding to the given key hasher type.
 500
 501            Raises:
 502                ValueError: If the key hasher type is not supported.
 503
 504            Example:
 505                >>> concat_hash_len("Blake2_128Concat")
 506                16
 507            """
 508
 509            if key_hasher == "Blake2_128Concat":
 510                return 16
 511            elif key_hasher == "Twox64Concat":
 512                return 8
 513            elif key_hasher == "Identity":
 514                return 0
 515            else:
 516                raise ValueError("Unsupported hash type")
 517
 518        assert len(response) == len(function_parameters) == len(prefix_list)
 519        result_dict: dict[str, dict[Any, Any]] = {}
 520        for res, fun_params_tuple, prefix in zip(
 521            response, function_parameters, prefix_list
 522        ):
 523            if not res:
 524                continue
 525            res = res[0]
 526            changes = res["changes"]  # type: ignore
 527            value_type, param_types, key_hashers, params, storage_function = (
 528                fun_params_tuple
 529            )
 530            with self.get_conn(init=True) as substrate:
 531                for item in changes:
 532                    # Determine type string
 533                    key_type_string: list[Any] = []
 534                    for n in range(len(params), len(param_types)):
 535                        key_type_string.append(
 536                            f"[u8; {concat_hash_len(key_hashers[n])}]"
 537                        )
 538                        key_type_string.append(param_types[n])
 539
 540                    item_key_obj = substrate.decode_scale(  # type: ignore
 541                        type_string=f"({', '.join(key_type_string)})",
 542                        scale_bytes="0x" + item[0][len(prefix):],
 543                        return_scale_obj=True,
 544                        block_hash=block_hash,
 545                    )
 546                    # strip key_hashers to use as item key
 547                    if len(param_types) - len(params) == 1:
 548                        item_key = item_key_obj.value_object[1]  # type: ignore
 549                    else:
 550                        item_key = tuple(  # type: ignore
 551                            item_key_obj.value_object[key + 1]  # type: ignore
 552                            for key in range(  # type: ignore
 553                                len(params), len(param_types) + 1, 2
 554                            )
 555                        )
 556
 557                    item_value = substrate.decode_scale(  # type: ignore
 558                        type_string=value_type,
 559                        scale_bytes=item[1],
 560                        return_scale_obj=True,
 561                        block_hash=block_hash,
 562                    )
 563                    result_dict.setdefault(storage_function, {})
 564                    key = get_item_key_value(item_key)  # type: ignore
 565                    result_dict[storage_function][key] = item_value.value  # type: ignore
 566
 567        return result_dict
 568
 569    def query_batch(
 570        self, functions: dict[str, list[tuple[str, list[Any]]]]
 571    ) -> dict[str, str]:
 572        """
 573        Executes batch queries on a substrate and returns results in a dictionary format.
 574
 575        Args:
 576            substrate: An instance of SubstrateInterface to interact with the substrate.
 577            functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls (function name and parameters).
 578
 579        Returns:
 580            A dictionary where keys are storage function names and values are the query results.
 581
 582        Raises:
 583            Exception: If no result is found from the batch queries.
 584
 585        Example:
 586            >>> query_batch(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]})
 587            {'function_name': 'query_result', ...}
 588        """
 589
 590        result: dict[str, str] = {}
 591        if not functions:
 592            raise Exception("No result")
 593        with self.get_conn(init=True) as substrate:
 594            for module, queries in functions.items():
 595                storage_keys: list[Any] = []
 596                for fn, params in queries:
 597                    storage_function = substrate.create_storage_key(  # type: ignore
 598                        pallet=module, storage_function=fn, params=params
 599                    )
 600                    storage_keys.append(storage_function)
 601
 602                block_hash = substrate.get_block_hash()
 603                responses: list[Any] = substrate.query_multi(  # type: ignore
 604                    storage_keys=storage_keys, block_hash=block_hash
 605                )
 606
 607                for item in responses:
 608                    fun = item[0]
 609                    query = item[1]
 610                    storage_fun = fun.storage_function
 611                    result[storage_fun] = query.value
 612
 613        return result
 614
 615    def query_batch_map(
 616        self,
 617        functions: dict[str, list[tuple[str, list[Any]]]],
 618        block_hash: str | None = None,
 619    ) -> dict[str, dict[Any, Any]]:
 620        """
 621        Queries multiple storage functions using a map batch approach and returns the combined result.
 622
 623        Args:
 624            substrate: An instance of SubstrateInterface for substrate interaction.
 625            functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls.
 626
 627        Returns:
 628            The combined result of the map batch query.
 629
 630        Example:
 631            >>> query_batch_map(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]})
 632            # Returns the combined result of the map batch query
 633        """
 634        multi_result: dict[str, dict[Any, Any]] = {}
 635
 636        def recursive_update(
 637            d: dict[str, dict[T1, T2] | dict[str, Any]],
 638            u: Mapping[str, dict[Any, Any] | str],
 639        ) -> dict[str, dict[T1, T2]]:
 640            for k, v in u.items():
 641                if isinstance(v, dict):
 642                    d[k] = recursive_update(d.get(k, {}), v)  # type: ignore
 643                else:
 644                    d[k] = v  # type: ignore
 645            return d  # type: ignore
 646
 647        def get_page():
 648            send, prefix_list = self._get_storage_keys(storage, queries, block_hash)
 649            with self.get_conn(init=True) as substrate:
 650                function_parameters = self._get_lists(storage, queries, substrate)
 651            responses = self._rpc_request_batch(send)
 652            # assumption because send is just the storage_function keys
 653            # so it should always be really small regardless of the amount of queries
 654            assert len(responses) == 1
 655            res = responses[0]
 656            built_payload: list[tuple[str, list[Any]]] = []
 657            for result_keys in res:
 658                built_payload.append(
 659                    ("state_queryStorageAt", [result_keys, block_hash])
 660                )
 661            _, chunks_info = self._make_request_smaller(
 662                built_payload, prefix_list, function_parameters
 663            )
 664            chunks_response, chunks_info = self._rpc_request_batch_chunked(chunks_info)
 665            return chunks_response, chunks_info
 666
 667        if not block_hash:
 668            with self.get_conn(init=True) as substrate:
 669                block_hash = substrate.get_block_hash()
 670        for storage, queries in functions.items():
 671            chunks, chunks_info = get_page()
 672            # if this doesn't happen something is wrong on the code
 673            # and we won't be able to decode the data properly
 674            assert len(chunks) == len(chunks_info)
 675            for chunk_info, response in zip(chunks_info, chunks):
 676                storage_result = self._decode_response(
 677                    response, chunk_info.fun_params, chunk_info.prefix_list, block_hash
 678                )
 679                multi_result = recursive_update(multi_result, storage_result)
 680
 681        return multi_result
 682
 683    def query(
 684        self,
 685        name: str,
 686        params: list[Any] = [],
 687        module: str = "SubspaceModule",
 688        block_hash: str | None = None,
 689    ) -> Any:
 690        """
 691        Queries a storage function on the network.
 692
 693        Sends a query to the network and retrieves data from a
 694        specified storage function.
 695
 696        Args:
 697            name: The name of the storage function to query.
 698            params: The parameters to pass to the storage function.
 699            module: The module where the storage function is located.
 700
 701        Returns:
 702            The result of the query from the network.
 703
 704        Raises:
 705            NetworkQueryError: If the query fails or is invalid.
 706        """
 707
 708        result = self.query_batch({module: [(name, params)]})
 709
 710        return result[name]
 711
 712    def query_map(
 713        self,
 714        name: str,
 715        params: list[Any] = [],
 716        module: str = "SubspaceModule",
 717        extract_value: bool = True,
 718    ) -> dict[Any, Any]:
 719        """
 720        Queries a storage map from a network node.
 721
 722        Args:
 723            name: The name of the storage map to query.
 724            params: A list of parameters for the query.
 725            module: The module in which the storage map is located.
 726
 727        Returns:
 728            A dictionary representing the key-value pairs
 729              retrieved from the storage map.
 730
 731        Raises:
 732            QueryError: If the query to the network fails or is invalid.
 733        """
 734
 735        result = self.query_batch_map({module: [(name, params)]})
 736
 737        if extract_value:
 738            return {k.value: v.value for k, v in result}  # type: ignore
 739
 740        return result
 741
 742    def compose_call(
 743        self,
 744        fn: str,
 745        params: dict[str, Any],
 746        key: Keypair | None,
 747        module: str = "SubspaceModule",
 748        wait_for_inclusion: bool = True,
 749        wait_for_finalization: bool | None = None,
 750        sudo: bool = False,
 751        unsigned: bool = False,
 752    ) -> ExtrinsicReceipt:
 753        """
 754        Composes and submits a call to the network node.
 755
 756        Composes and signs a call with the provided keypair, and submits it to
 757        the network. The call can be a standard extrinsic or a sudo extrinsic if
 758        elevated permissions are required. The method can optionally wait for
 759        the call's inclusion in a block and/or its finalization.
 760
 761        Args:
 762            fn: The function name to call on the network.
 763            params: A dictionary of parameters for the call.
 764            key: The keypair for signing the extrinsic.
 765            module: The module containing the function.
 766            wait_for_inclusion: Wait for the call's inclusion in a block.
 767            wait_for_finalization: Wait for the transaction's finalization.
 768            sudo: Execute the call as a sudo (superuser) operation.
 769
 770        Returns:
 771            The receipt of the submitted extrinsic, if
 772              `wait_for_inclusion` is True. Otherwise, returns a string
 773              identifier of the extrinsic.
 774
 775        Raises:
 776            ChainTransactionError: If the transaction fails.
 777        """
 778
 779        if key is None and not unsigned:
 780            raise ValueError("Key must be provided for signed extrinsics.")
 781
 782        with self.get_conn() as substrate:
 783            if wait_for_finalization is None:
 784                wait_for_finalization = self.wait_for_finalization
 785
 786            call = substrate.compose_call(  # type: ignore
 787                call_module=module, call_function=fn, call_params=params
 788            )
 789            if sudo:
 790                call = substrate.compose_call(  # type: ignore
 791                    call_module="Sudo",
 792                    call_function="sudo",
 793                    call_params={
 794                        "call": call.value,  # type: ignore
 795                    },
 796                )
 797
 798            if not unsigned:
 799                extrinsic = substrate.create_signed_extrinsic(  # type: ignore
 800                    call=call, keypair=key  # type: ignore
 801                )  # type: ignore
 802            else:
 803                extrinsic = substrate.create_unsigned_extrinsic(call=call)  # type: ignore
 804
 805            response = substrate.submit_extrinsic(
 806                extrinsic=extrinsic,
 807                wait_for_inclusion=wait_for_inclusion,
 808                wait_for_finalization=wait_for_finalization,
 809            )
 810        if wait_for_inclusion:
 811            if not response.is_success:
 812                raise ChainTransactionError(
 813                    response.error_message, response  # type: ignore
 814                )
 815
 816        return response
 817
 818    def compose_call_multisig(
 819        self,
 820        fn: str,
 821        params: dict[str, Any],
 822        key: Keypair,
 823        signatories: list[Ss58Address],
 824        threshold: int,
 825        module: str = "SubspaceModule",
 826        wait_for_inclusion: bool = True,
 827        wait_for_finalization: bool | None = None,
 828        sudo: bool = False,
 829        era: dict[str, int] | None = None,
 830    ) -> ExtrinsicReceipt:
 831        """
 832        Composes and submits a multisignature call to the network node.
 833
 834        This method allows the composition and submission of a call that
 835        requires multiple signatures for execution, known as a multisignature
 836        call. It supports specifying signatories, a threshold of signatures for
 837        the call's execution, and an optional era for the call's mortality. The
 838        call can be a standard extrinsic, a sudo extrinsic for elevated
 839        permissions, or a multisig extrinsic if multiple signatures are
 840        required. Optionally, the method can wait for the call's inclusion in a
 841        block and/or its finalization. Make sure to pass all keys,
 842        that are part of the multisignature.
 843
 844        Args:
 845            fn: The function name to call on the network. params: A dictionary
 846            of parameters for the call. key: The keypair for signing the
 847            extrinsic. signatories: List of SS58 addresses of the signatories.
 848            Include ALL KEYS that are part of the multisig. threshold: The
 849            minimum number of signatories required to execute the extrinsic.
 850            module: The module containing the function to call.
 851            wait_for_inclusion: Whether to wait for the call's inclusion in a
 852            block. wait_for_finalization: Whether to wait for the transaction's
 853            finalization. sudo: Execute the call as a sudo (superuser)
 854            operation. era: Specifies the call's mortality in terms of blocks in
 855            the format
 856                {'period': amount_blocks}. If omitted, the extrinsic is
 857                immortal.
 858
 859        Returns:
 860            The receipt of the submitted extrinsic if `wait_for_inclusion` is
 861            True. Otherwise, returns a string identifier of the extrinsic.
 862
 863        Raises:
 864            ChainTransactionError: If the transaction fails.
 865        """
 866
 867        # getting the call ready
 868        with self.get_conn() as substrate:
 869            if wait_for_finalization is None:
 870                wait_for_finalization = self.wait_for_finalization
 871
 872            # prepares the `GenericCall` object
 873            call = substrate.compose_call(  # type: ignore
 874                call_module=module, call_function=fn, call_params=params
 875            )
 876            if sudo:
 877                call = substrate.compose_call(  # type: ignore
 878                    call_module="Sudo",
 879                    call_function="sudo",
 880                    call_params={
 881                        "call": call.value,  # type: ignore
 882                    },
 883                )
 884
 885            # modify the rpc methods at runtime, to allow for correct payment
 886            # fee calculation parity has a bug in this version,
 887            # where the method has to be removed
 888            rpc_methods = substrate.config.get("rpc_methods")  # type: ignore
 889
 890            if "state_call" in rpc_methods:  # type: ignore
 891                rpc_methods.remove("state_call")  # type: ignore
 892
 893            # create the multisig account
 894            multisig_acc = substrate.generate_multisig_account(  # type: ignore
 895                signatories, threshold
 896            )
 897
 898            # send the multisig extrinsic
 899            extrinsic = substrate.create_multisig_extrinsic(  # type: ignore
 900                call=call,  # type: ignore
 901                keypair=key,
 902                multisig_account=multisig_acc,  # type: ignore
 903                era=era,  # type: ignore
 904            )  # type: ignore
 905
 906            response = substrate.submit_extrinsic(
 907                extrinsic=extrinsic,
 908                wait_for_inclusion=wait_for_inclusion,
 909                wait_for_finalization=wait_for_finalization,
 910            )
 911
 912        if wait_for_inclusion:
 913            if not response.is_success:
 914                raise ChainTransactionError(
 915                    response.error_message, response  # type: ignore
 916                )
 917
 918        return response
 919
 920    def transfer(
 921        self,
 922        key: Keypair,
 923        amount: int,
 924        dest: Ss58Address,
 925    ) -> ExtrinsicReceipt:
 926        """
 927        Transfers a specified amount of tokens from the signer's account to the
 928        specified account.
 929
 930        Args:
 931            key: The keypair associated with the sender's account.
 932            amount: The amount to transfer, in nanotokens.
 933            dest: The SS58 address of the recipient.
 934
 935        Returns:
 936            A receipt of the transaction.
 937
 938        Raises:
 939            InsufficientBalanceError: If the sender's account does not have
 940              enough balance.
 941            ChainTransactionError: If the transaction fails.
 942        """
 943
 944        params = {"dest": dest, "value": amount}
 945
 946        return self.compose_call(
 947            module="Balances", fn="transfer_keep_alive", params=params, key=key
 948        )
 949
 950    def transfer_multiple(
 951        self,
 952        key: Keypair,
 953        destinations: list[Ss58Address],
 954        amounts: list[int],
 955        netuid: str | int = 0,
 956    ) -> ExtrinsicReceipt:
 957        """
 958        Transfers specified amounts of tokens from the signer's account to
 959        multiple target accounts.
 960
 961        The `destinations` and `amounts` lists must be of the same length.
 962
 963        Args:
 964            key: The keypair associated with the sender's account.
 965            destinations: A list of SS58 addresses of the recipients.
 966            amounts: Amount to transfer to each recipient, in nanotokens.
 967            netuid: The network identifier.
 968
 969        Returns:
 970            A receipt of the transaction.
 971
 972        Raises:
 973            InsufficientBalanceError: If the sender's account does not have
 974              enough balance for all transfers.
 975            ChainTransactionError: If the transaction fails.
 976        """
 977
 978        assert len(destinations) == len(amounts)
 979
 980        # extract existential deposit from amounts
 981        existential_deposit = self.get_existential_deposit()
 982        amounts = [a - existential_deposit for a in amounts]
 983
 984        params = {
 985            "netuid": netuid,
 986            "destinations": destinations,
 987            "amounts": amounts,
 988        }
 989
 990        return self.compose_call(
 991            module="SubspaceModule", fn="transfer_multiple", params=params, key=key
 992        )
 993
 994    def stake(
 995        self,
 996        key: Keypair,
 997        amount: int,
 998        dest: Ss58Address,
 999    ) -> ExtrinsicReceipt:
1000        """
1001        Stakes the specified amount of tokens to a module key address.
1002
1003        Args:
1004            key: The keypair associated with the staker's account.
1005            amount: The amount of tokens to stake, in nanotokens.
1006            dest: The SS58 address of the module key to stake to.
1007            netuid: The network identifier.
1008
1009        Returns:
1010            A receipt of the staking transaction.
1011
1012        Raises:
1013            InsufficientBalanceError: If the staker's account does not have
1014              enough balance.
1015            ChainTransactionError: If the transaction fails.
1016        """
1017
1018        params = {"amount": amount, "module_key": dest}
1019
1020        return self.compose_call(fn="add_stake", params=params, key=key)
1021
1022    def unstake(
1023        self,
1024        key: Keypair,
1025        amount: int,
1026        dest: Ss58Address,
1027    ) -> ExtrinsicReceipt:
1028        """
1029        Unstakes the specified amount of tokens from a module key address.
1030
1031        Args:
1032            key: The keypair associated with the unstaker's account.
1033            amount: The amount of tokens to unstake, in nanotokens.
1034            dest: The SS58 address of the module key to unstake from.
1035            netuid: The network identifier.
1036
1037        Returns:
1038            A receipt of the unstaking transaction.
1039
1040        Raises:
1041            InsufficientStakeError: If the staked key does not have enough
1042              staked tokens by the signer key.
1043            ChainTransactionError: If the transaction fails.
1044        """
1045
1046        params = {"amount": amount, "module_key": dest}
1047        return self.compose_call(fn="remove_stake", params=params, key=key)
1048
1049    def update_module(
1050        self,
1051        key: Keypair,
1052        name: str,
1053        address: str,
1054        metadata: str | None = None,
1055        delegation_fee: int = 20,
1056        netuid: int = 0,
1057    ) -> ExtrinsicReceipt:
1058        """
1059        Updates the parameters of a registered module.
1060
1061        The delegation fee must be an integer between 0 and 100.
1062
1063        Args:
1064            key: The keypair associated with the module's account.
1065            name: The new name for the module. If None, the name is not updated.
1066            address: The new address for the module.
1067                If None, the address is not updated.
1068            delegation_fee: The new delegation fee for the module,
1069                between 0 and 100.
1070            netuid: The network identifier.
1071
1072        Returns:
1073            A receipt of the module update transaction.
1074
1075        Raises:
1076            InvalidParameterError: If the provided parameters are invalid.
1077            ChainTransactionError: If the transaction fails.
1078        """
1079
1080        assert isinstance(delegation_fee, int)
1081
1082        params = {
1083            "netuid": netuid,
1084            "name": name,
1085            "address": address,
1086            "delegation_fee": delegation_fee,
1087            "metadata": metadata,
1088        }
1089
1090        response = self.compose_call("update_module", params=params, key=key)
1091
1092        return response
1093
1094    def register_module(
1095        self,
1096        key: Keypair,
1097        name: str,
1098        address: str | None = None,
1099        subnet: str = "Rootnet",
1100        metadata: str | None = None,
1101    ) -> ExtrinsicReceipt:
1102        """
1103        Registers a new module in the network.
1104
1105        Args:
1106            key: The keypair used for registering the module.
1107            name: The name of the module. If None, a default or previously
1108                set name is used. # How does this work?
1109            address: The address of the module. If None, a default or
1110                previously set address is used. # How does this work?
1111            subnet: The network subnet to register the module in.
1112            min_stake: The minimum stake required for the module, in nanotokens.
1113                If None, a default value is used.
1114
1115        Returns:
1116            A receipt of the registration transaction.
1117
1118        Raises:
1119            InvalidParameterError: If the provided parameters are invalid.
1120            ChainTransactionError: If the transaction fails.
1121        """
1122
1123        key_addr = key.ss58_address
1124
1125        params = {
1126            "network": subnet,
1127            "address": address,
1128            "name": name,
1129            "module_key": key_addr,
1130            "metadata": metadata,
1131        }
1132
1133        response = self.compose_call("register", params=params, key=key)
1134        return response
1135
1136    def vote(
1137        self,
1138        key: Keypair,
1139        uids: list[int],
1140        weights: list[int],
1141        netuid: int = 0,
1142    ) -> ExtrinsicReceipt:
1143        """
1144        Casts votes on a list of module UIDs with corresponding weights.
1145
1146        The length of the UIDs list and the weights list should be the same.
1147        Each weight corresponds to the UID at the same index.
1148
1149        Args:
1150            key: The keypair used for signing the vote transaction.
1151            uids: A list of module UIDs to vote on.
1152            weights: A list of weights corresponding to each UID.
1153            netuid: The network identifier.
1154
1155        Returns:
1156            A receipt of the voting transaction.
1157
1158        Raises:
1159            InvalidParameterError: If the lengths of UIDs and weights lists
1160                do not match.
1161            ChainTransactionError: If the transaction fails.
1162        """
1163
1164        assert len(uids) == len(weights)
1165
1166        params = {
1167            "uids": uids,
1168            "weights": weights,
1169            "netuid": netuid,
1170        }
1171
1172        response = self.compose_call("set_weights", params=params, key=key)
1173
1174        return response
1175
1176    def update_subnet(
1177        self,
1178        key: Keypair,
1179        params: SubnetParams,
1180        netuid: int = 0,
1181    ) -> ExtrinsicReceipt:
1182        """
1183        Update a subnet's configuration.
1184
1185        It requires the founder key for authorization.
1186
1187        Args:
1188            key: The founder keypair of the subnet.
1189            params: The new parameters for the subnet.
1190            netuid: The network identifier.
1191
1192        Returns:
1193            A receipt of the subnet update transaction.
1194
1195        Raises:
1196            AuthorizationError: If the key is not authorized.
1197            ChainTransactionError: If the transaction fails.
1198        """
1199
1200        general_params = dict(params)
1201        general_params["netuid"] = netuid
1202
1203        response = self.compose_call(
1204            fn="update_subnet",
1205            params=general_params,
1206            key=key,
1207        )
1208
1209        return response
1210
1211    def transfer_stake(
1212        self,
1213        key: Keypair,
1214        amount: int,
1215        from_module_key: Ss58Address,
1216        dest_module_address: Ss58Address,
1217    ) -> ExtrinsicReceipt:
1218        """
1219        Realocate staked tokens from one staked module to another module.
1220
1221        Args:
1222            key: The keypair associated with the account that is delegating the tokens.
1223            amount: The amount of staked tokens to transfer, in nanotokens.
1224            from_module_key: The SS58 address of the module you want to transfer from (currently delegated by the key).
1225            dest_module_address: The SS58 address of the destination (newly delegated key).
1226            netuid: The network identifier.
1227
1228        Returns:
1229            A receipt of the stake transfer transaction.
1230
1231        Raises:
1232            InsufficientStakeError: If the source module key does not have
1233            enough staked tokens. ChainTransactionError: If the transaction
1234            fails.
1235        """
1236
1237        amount = amount - self.get_existential_deposit()
1238
1239        params = {
1240            "amount": amount,
1241            "module_key": from_module_key,
1242            "new_module_key": dest_module_address,
1243        }
1244
1245        response = self.compose_call("transfer_stake", key=key, params=params)
1246
1247        return response
1248
1249    def multiunstake(
1250        self,
1251        key: Keypair,
1252        keys: list[Ss58Address],
1253        amounts: list[int],
1254    ) -> ExtrinsicReceipt:
1255        """
1256        Unstakes tokens from multiple module keys.
1257
1258        And the lists `keys` and `amounts` must be of the same length. Each
1259        amount corresponds to the module key at the same index.
1260
1261        Args:
1262            key: The keypair associated with the unstaker's account.
1263            keys: A list of SS58 addresses of the module keys to unstake from.
1264            amounts: A list of amounts to unstake from each module key,
1265              in nanotokens.
1266            netuid: The network identifier.
1267
1268        Returns:
1269            A receipt of the multi-unstaking transaction.
1270
1271        Raises:
1272            MismatchedLengthError: If the lengths of keys and amounts lists do
1273            not match. InsufficientStakeError: If any of the module keys do not
1274            have enough staked tokens. ChainTransactionError: If the transaction
1275            fails.
1276        """
1277
1278        assert len(keys) == len(amounts)
1279
1280        params = {"module_keys": keys, "amounts": amounts}
1281
1282        response = self.compose_call("remove_stake_multiple", params=params, key=key)
1283
1284        return response
1285
1286    def multistake(
1287        self,
1288        key: Keypair,
1289        keys: list[Ss58Address],
1290        amounts: list[int],
1291    ) -> ExtrinsicReceipt:
1292        """
1293        Stakes tokens to multiple module keys.
1294
1295        The lengths of the `keys` and `amounts` lists must be the same. Each
1296        amount corresponds to the module key at the same index.
1297
1298        Args:
1299            key: The keypair associated with the staker's account.
1300            keys: A list of SS58 addresses of the module keys to stake to.
1301            amounts: A list of amounts to stake to each module key,
1302                in nanotokens.
1303            netuid: The network identifier.
1304
1305        Returns:
1306            A receipt of the multi-staking transaction.
1307
1308        Raises:
1309            MismatchedLengthError: If the lengths of keys and amounts lists
1310                do not match.
1311            ChainTransactionError: If the transaction fails.
1312        """
1313
1314        assert len(keys) == len(amounts)
1315
1316        params = {
1317            "module_keys": keys,
1318            "amounts": amounts,
1319        }
1320
1321        response = self.compose_call("add_stake_multiple", params=params, key=key)
1322
1323        return response
1324
1325    def add_profit_shares(
1326        self,
1327        key: Keypair,
1328        keys: list[Ss58Address],
1329        shares: list[int],
1330    ) -> ExtrinsicReceipt:
1331        """
1332        Allocates profit shares to multiple keys.
1333
1334        The lists `keys` and `shares` must be of the same length,
1335        with each share amount corresponding to the key at the same index.
1336
1337        Args:
1338            key: The keypair associated with the account
1339                distributing the shares.
1340            keys: A list of SS58 addresses to allocate shares to.
1341            shares: A list of share amounts to allocate to each key,
1342                in nanotokens.
1343
1344        Returns:
1345            A receipt of the profit sharing transaction.
1346
1347        Raises:
1348            MismatchedLengthError: If the lengths of keys and shares
1349                lists do not match.
1350            ChainTransactionError: If the transaction fails.
1351        """
1352
1353        assert len(keys) == len(shares)
1354
1355        params = {"keys": keys, "shares": shares}
1356
1357        response = self.compose_call("add_profit_shares", params=params, key=key)
1358
1359        return response
1360
1361    def add_subnet_proposal(
1362        self, key: Keypair,
1363        params: dict[str, Any],
1364        ipfs: str,
1365        netuid: int = 0
1366    ) -> ExtrinsicReceipt:
1367        """
1368        Submits a proposal for creating or modifying a subnet within the
1369        network.
1370
1371        The proposal includes various parameters like the name, founder, share
1372        allocations, and other subnet-specific settings.
1373
1374        Args:
1375            key: The keypair used for signing the proposal transaction.
1376            params: The parameters for the subnet proposal.
1377            netuid: The network identifier.
1378
1379        Returns:
1380            A receipt of the subnet proposal transaction.
1381
1382        Raises:
1383            InvalidParameterError: If the provided subnet
1384                parameters are invalid.
1385            ChainTransactionError: If the transaction fails.
1386        """
1387
1388        general_params = dict(params)
1389        general_params["subnet_id"] = netuid
1390        general_params["data"] = ipfs
1391        # breakpoint()
1392        # general_params["burn_config"] = json.dumps(general_params["burn_config"])
1393        response = self.compose_call(
1394            fn="add_subnet_params_proposal",
1395            params=general_params,
1396            key=key,
1397            module="GovernanceModule",
1398        )
1399
1400        return response
1401
1402    def add_custom_proposal(
1403        self,
1404        key: Keypair,
1405        cid: str,
1406    ) -> ExtrinsicReceipt:
1407
1408        params = {"data": cid}
1409
1410        response = self.compose_call(
1411            fn="add_global_custom_proposal",
1412            params=params,
1413            key=key,
1414            module="GovernanceModule",
1415        )
1416        return response
1417
1418    def add_custom_subnet_proposal(
1419        self,
1420        key: Keypair,
1421        cid: str,
1422        netuid: int = 0,
1423    ) -> ExtrinsicReceipt:
1424        """
1425        Submits a proposal for creating or modifying a custom subnet within the
1426        network.
1427
1428        The proposal includes various parameters like the name, founder, share
1429        allocations, and other subnet-specific settings.
1430
1431        Args:
1432            key: The keypair used for signing the proposal transaction.
1433            params: The parameters for the subnet proposal.
1434            netuid: The network identifier.
1435
1436        Returns:
1437            A receipt of the subnet proposal transaction.
1438        """
1439
1440        params = {
1441            "data": cid,
1442            "subnet_id": netuid,
1443        }
1444
1445        response = self.compose_call(
1446            fn="add_subnet_custom_proposal",
1447            params=params,
1448            key=key,
1449            module="GovernanceModule",
1450        )
1451
1452        return response
1453
1454    def add_global_proposal(
1455        self,
1456        key: Keypair,
1457        params: NetworkParams,
1458        cid: str | None,
1459    ) -> ExtrinsicReceipt:
1460        """
1461        Submits a proposal for altering the global network parameters.
1462
1463        Allows for the submission of a proposal to
1464        change various global parameters
1465        of the network, such as emission rates, rate limits, and voting
1466        thresholds. It is used to
1467        suggest changes that affect the entire network's operation.
1468
1469        Args:
1470            key: The keypair used for signing the proposal transaction.
1471            params: A dictionary containing global network parameters
1472                    like maximum allowed subnets, modules,
1473                    transaction rate limits, and others.
1474
1475        Returns:
1476            A receipt of the global proposal transaction.
1477
1478        Raises:
1479            InvalidParameterError: If the provided network
1480                parameters are invalid.
1481            ChainTransactionError: If the transaction fails.
1482        """
1483        general_params = cast(dict[str, Any], params)
1484        cid = cid or ""
1485        general_params["data"] = cid
1486
1487        response = self.compose_call(
1488            fn="add_global_params_proposal",
1489            params=general_params,
1490            key=key,
1491            module="GovernanceModule",
1492        )
1493
1494        return response
1495
1496    def vote_on_proposal(
1497        self,
1498        key: Keypair,
1499        proposal_id: int,
1500        agree: bool,
1501    ) -> ExtrinsicReceipt:
1502        """
1503        Casts a vote on a specified proposal within the network.
1504
1505        Args:
1506            key: The keypair used for signing the vote transaction.
1507            proposal_id: The unique identifier of the proposal to vote on.
1508
1509        Returns:
1510            A receipt of the voting transaction in nanotokens.
1511
1512        Raises:
1513            InvalidProposalIDError: If the provided proposal ID does not
1514                exist or is invalid.
1515            ChainTransactionError: If the transaction fails.
1516        """
1517
1518        params = {"proposal_id": proposal_id, "agree": agree}
1519
1520        response = self.compose_call(
1521            "vote_proposal",
1522            key=key,
1523            params=params,
1524            module="GovernanceModule",
1525        )
1526
1527        return response
1528
1529    def unvote_on_proposal(
1530        self,
1531        key: Keypair,
1532        proposal_id: int,
1533    ) -> ExtrinsicReceipt:
1534        """
1535        Retracts a previously cast vote on a specified proposal.
1536
1537        Args:
1538            key: The keypair used for signing the unvote transaction.
1539            proposal_id: The unique identifier of the proposal to withdraw the
1540                vote from.
1541
1542        Returns:
1543            A receipt of the unvoting transaction in nanotokens.
1544
1545        Raises:
1546            InvalidProposalIDError: If the provided proposal ID does not
1547                exist or is invalid.
1548            ChainTransactionError: If the transaction fails to be processed, or
1549                if there was no prior vote to retract.
1550        """
1551
1552        params = {"proposal_id": proposal_id}
1553
1554        response = self.compose_call(
1555            "remove_vote_proposal",
1556            key=key,
1557            params=params,
1558            module="GovernanceModule",
1559        )
1560
1561        return response
1562
1563    def enable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt:
1564        """
1565        Enables vote power delegation for the signer's account.
1566
1567        Args:
1568            key: The keypair used for signing the delegation transaction.
1569
1570        Returns:
1571            A receipt of the vote power delegation transaction.
1572
1573        Raises:
1574            ChainTransactionError: If the transaction fails.
1575        """
1576
1577        response = self.compose_call(
1578            "enable_vote_power_delegation",
1579            params={},
1580            key=key,
1581            module="GovernanceModule",
1582        )
1583
1584        return response
1585
1586    def disable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt:
1587        """
1588        Disables vote power delegation for the signer's account.
1589
1590        Args:
1591            key: The keypair used for signing the delegation transaction.
1592
1593        Returns:
1594            A receipt of the vote power delegation transaction.
1595
1596        Raises:
1597            ChainTransactionError: If the transaction fails.
1598        """
1599
1600        response = self.compose_call(
1601            "disable_vote_power_delegation",
1602            params={},
1603            key=key,
1604            module="GovernanceModule",
1605        )
1606
1607        return response
1608
1609    def add_dao_application(
1610        self, key: Keypair, application_key: Ss58Address, data: str
1611    ) -> ExtrinsicReceipt:
1612        """
1613        Submits a new application to the general subnet DAO.
1614
1615        Args:
1616            key: The keypair used for signing the application transaction.
1617            application_key: The SS58 address of the application key.
1618            data: The data associated with the application.
1619
1620        Returns:
1621            A receipt of the application transaction.
1622
1623        Raises:
1624            ChainTransactionError: If the transaction fails.
1625        """
1626
1627        params = {"application_key": application_key, "data": data}
1628
1629        response = self.compose_call(
1630            "add_dao_application", module="GovernanceModule", key=key,
1631            params=params
1632        )
1633
1634        return response
1635
1636    def query_map_curator_applications(self) -> dict[str, dict[str, str]]:
1637        query_result = self.query_map(
1638            "CuratorApplications", module="GovernanceModule", params=[],
1639            extract_value=False
1640        )
1641        applications = query_result.get("CuratorApplications", {})
1642        return applications
1643
1644    def query_map_proposals(
1645        self, extract_value: bool = False
1646    ) -> dict[int, dict[str, Any]]:
1647        """
1648        Retrieves a mappping of proposals from the network.
1649
1650        Queries the network and returns a mapping of proposal IDs to
1651        their respective parameters.
1652
1653        Returns:
1654            A dictionary mapping proposal IDs
1655            to dictionaries of their parameters.
1656
1657        Raises:
1658            QueryError: If the query to the network fails or is invalid.
1659        """
1660
1661        return self.query_map(
1662            "Proposals", extract_value=extract_value, module="GovernanceModule"
1663        )["Proposals"]
1664
1665    def query_map_weights(
1666        self, netuid: int = 0, extract_value: bool = False
1667    ) -> dict[int, list[tuple[int, int]]] | None:
1668        """
1669        Retrieves a mapping of weights for keys on the network.
1670
1671        Queries the network and returns a mapping of key UIDs to
1672        their respective weights.
1673
1674        Args:
1675            netuid: The network UID from which to get the weights.
1676
1677        Returns:
1678            A dictionary mapping key UIDs to lists of their weights.
1679
1680        Raises:
1681            QueryError: If the query to the network fails or is invalid.
1682        """
1683
1684        weights_dict = self.query_map(
1685            "Weights",
1686            [netuid],
1687            extract_value=extract_value
1688        ).get("Weights")
1689        return weights_dict
1690
1691    def query_map_key(
1692        self,
1693        netuid: int = 0,
1694        extract_value: bool = False,
1695    ) -> dict[int, Ss58Address]:
1696        """
1697        Retrieves a map of keys from the network.
1698
1699        Fetches a mapping of key UIDs to their associated
1700        addresses on the network.
1701        The query can be targeted at a specific network UID if required.
1702
1703        Args:
1704            netuid: The network UID from which to get the keys.
1705
1706        Returns:
1707            A dictionary mapping key UIDs to their addresses.
1708
1709        Raises:
1710            QueryError: If the query to the network fails or is invalid.
1711        """
1712        return self.query_map("Keys", [netuid], extract_value=extract_value)["Keys"]
1713
1714    def query_map_address(
1715        self, netuid: int = 0, extract_value: bool = False
1716    ) -> dict[int, str]:
1717        """
1718        Retrieves a map of key addresses from the network.
1719
1720        Queries the network for a mapping of key UIDs to their addresses.
1721
1722        Args:
1723            netuid: The network UID from which to get the addresses.
1724
1725        Returns:
1726            A dictionary mapping key UIDs to their addresses.
1727
1728        Raises:
1729            QueryError: If the query to the network fails or is invalid.
1730        """
1731
1732        return self.query_map("Address", [netuid], extract_value=extract_value)[
1733            "Address"
1734        ]
1735
1736    def query_map_emission(self, extract_value: bool = False) -> dict[int, list[int]]:
1737        """
1738        Retrieves a map of emissions for keys on the network.
1739
1740        Queries the network to get a mapping of
1741        key UIDs to their emission values.
1742
1743        Returns:
1744            A dictionary mapping key UIDs to lists of their emission values.
1745
1746        Raises:
1747            QueryError: If the query to the network fails or is invalid.
1748        """
1749
1750        return self.query_map("Emission", extract_value=extract_value)["Emission"]
1751
1752    def query_map_pending_emission(self, extract_value: bool = False) -> int:
1753        """
1754        Retrieves a map of pending emissions for the subnets.
1755
1756        Queries the network for a mapping of subnet UIDs to their pending emission values.
1757
1758        Returns:
1759            A dictionary mapping subnet UIDs to their pending emission values.
1760
1761        Raises:
1762            QueryError: If the query to the network fails or is invalid.
1763        """
1764        return self.query_map("PendingEmission", extract_value=extract_value, module="SubnetEmissionModule")["PendingEmission"]
1765
1766    def query_map_subnet_emission(self, extract_value: bool = False) -> dict[int, int]:
1767        """
1768        Retrieves a map of subnet emissions for the network.
1769
1770        Queries the network for a mapping of subnet UIDs to their emission values.
1771
1772        Returns:
1773            A dictionary mapping subnet UIDs to their emission values.
1774
1775        Raises:
1776            QueryError: If the query to the network fails or is invalid.
1777        """
1778
1779        return self.query_map("SubnetEmission", extract_value=extract_value, module="SubnetEmissionModule")["SubnetEmission"]
1780
1781    def query_map_subnet_consensus(self, extract_value: bool = False) -> dict[int, str]:
1782        """
1783        Retrieves a map of subnet consensus types for the network.
1784
1785        Queries the network for a mapping of subnet UIDs to their consensus types.
1786
1787        Returns:
1788            A dictionary mapping subnet UIDs to their consensus types.
1789
1790        Raises:
1791            QueryError: If the query to the network fails or is invalid.
1792        """
1793
1794        return self.query_map("SubnetConsensusType", extract_value=extract_value, module="SubnetEmissionModule")["SubnetConsensusType"]
1795
1796    def query_map_incentive(self, extract_value: bool = False) -> dict[int, list[int]]:
1797        """
1798        Retrieves a mapping of incentives for keys on the network.
1799
1800        Queries the network and returns a mapping of key UIDs to
1801        their respective incentive values.
1802
1803        Returns:
1804            A dictionary mapping key UIDs to lists of their incentive values.
1805
1806        Raises:
1807            QueryError: If the query to the network fails or is invalid.
1808        """
1809
1810        return self.query_map("Incentive", extract_value=extract_value)["Incentive"]
1811
1812    def query_map_dividend(self, extract_value: bool = False) -> dict[int, list[int]]:
1813        """
1814        Retrieves a mapping of dividends for keys on the network.
1815
1816        Queries the network for a mapping of key UIDs to
1817        their dividend values.
1818
1819        Returns:
1820            A dictionary mapping key UIDs to lists of their dividend values.
1821
1822        Raises:
1823            QueryError: If the query to the network fails or is invalid.
1824        """
1825
1826        return self.query_map("Dividends", extract_value=extract_value)["Dividends"]
1827
1828    def query_map_regblock(
1829        self, netuid: int = 0, extract_value: bool = False
1830    ) -> dict[int, int]:
1831        """
1832        Retrieves a mapping of registration blocks for keys on the network.
1833
1834        Queries the network for a mapping of key UIDs to
1835        the blocks where they were registered.
1836
1837        Args:
1838            netuid: The network UID from which to get the registration blocks.
1839
1840        Returns:
1841            A dictionary mapping key UIDs to their registration blocks.
1842
1843        Raises:
1844            QueryError: If the query to the network fails or is invalid.
1845        """
1846
1847        return self.query_map(
1848            "RegistrationBlock", [netuid], extract_value=extract_value
1849        )["RegistrationBlock"]
1850
1851    def query_map_lastupdate(self, extract_value: bool = False) -> dict[int, list[int]]:
1852        """
1853        Retrieves a mapping of the last update times for keys on the network.
1854
1855        Queries the network for a mapping of key UIDs to their last update times.
1856
1857        Returns:
1858            A dictionary mapping key UIDs to lists of their last update times.
1859
1860        Raises:
1861            QueryError: If the query to the network fails or is invalid.
1862        """
1863
1864        return self.query_map("LastUpdate", extract_value=extract_value)["LastUpdate"]
1865
1866    def query_map_stakefrom(
1867        self, extract_value: bool = False
1868    ) -> dict[Ss58Address, list[tuple[Ss58Address, int]]]:
1869        """
1870        Retrieves a mapping of stakes from various sources for keys on the network.
1871
1872        Queries the network to obtain a mapping of key addresses to the sources
1873        and amounts of stakes they have received.
1874
1875        Args:
1876            netuid: The network UID from which to get the stakes.
1877
1878        Returns:
1879            A dictionary mapping key addresses to lists of tuples
1880            (module_key_address, amount).
1881
1882        Raises:
1883            QueryError: If the query to the network fails or is invalid.
1884        """
1885
1886        result = self.query_map("StakeFrom", [], extract_value=extract_value)[
1887            "StakeFrom"
1888        ]
1889
1890        return transform_stake_dmap(result)
1891
1892    def query_map_staketo(
1893        self, extract_value: bool = False
1894    ) -> dict[str, list[tuple[str, int]]]:
1895        """
1896        Retrieves a mapping of stakes to destinations for keys on the network.
1897
1898        Queries the network for a mapping of key addresses to the destinations
1899        and amounts of stakes they have made.
1900
1901        Args:
1902            netuid: The network UID from which to get the stakes.
1903
1904        Returns:
1905            A dictionary mapping key addresses to lists of tuples
1906            (module_key_address, amount).
1907
1908        Raises:
1909            QueryError: If the query to the network fails or is invalid.
1910        """
1911
1912        result = self.query_map("StakeTo", [], extract_value=extract_value)[
1913            "StakeTo"
1914        ]
1915        return transform_stake_dmap(result)
1916
1917    def query_map_delegationfee(
1918        self, netuid: int = 0, extract_value: bool = False
1919    ) -> dict[str, int]:
1920        """
1921        Retrieves a mapping of delegation fees for keys on the network.
1922
1923        Queries the network to obtain a mapping of key addresses to their
1924        respective delegation fees.
1925
1926        Args:
1927            netuid: The network UID to filter the delegation fees.
1928
1929        Returns:
1930            A dictionary mapping key addresses to their delegation fees.
1931
1932        Raises:
1933            QueryError: If the query to the network fails or is invalid.
1934        """
1935
1936        return self.query_map("DelegationFee", [netuid], extract_value=extract_value)[
1937            "DelegationFee"
1938        ]
1939
1940    def query_map_tempo(self, extract_value: bool = False) -> dict[int, int]:
1941        """
1942        Retrieves a mapping of tempo settings for the network.
1943
1944        Queries the network to obtain the tempo (rate of reward distributions)
1945        settings for various network subnets.
1946
1947        Returns:
1948            A dictionary mapping network UIDs to their tempo settings.
1949
1950        Raises:
1951            QueryError: If the query to the network fails or is invalid.
1952        """
1953
1954        return self.query_map("Tempo", extract_value=extract_value)["Tempo"]
1955
1956    def query_map_immunity_period(self, extract_value: bool) -> dict[int, int]:
1957        """
1958        Retrieves a mapping of immunity periods for the network.
1959
1960        Queries the network for the immunity period settings,
1961        which represent the time duration during which modules
1962        can not get deregistered.
1963
1964        Returns:
1965            A dictionary mapping network UIDs to their immunity period settings.
1966
1967        Raises:
1968            QueryError: If the query to the network fails or is invalid.
1969        """
1970
1971        return self.query_map("ImmunityPeriod", extract_value=extract_value)[
1972            "ImmunityPeriod"
1973        ]
1974
1975    def query_map_min_allowed_weights(
1976        self, extract_value: bool = False
1977    ) -> dict[int, int]:
1978        """
1979        Retrieves a mapping of minimum allowed weights for the network.
1980
1981        Queries the network to obtain the minimum allowed weights,
1982        which are the lowest permissible weight values that can be set by
1983        validators.
1984
1985        Returns:
1986            A dictionary mapping network UIDs to
1987            their minimum allowed weight values.
1988
1989        Raises:
1990            QueryError: If the query to the network fails or is invalid.
1991        """
1992
1993        return self.query_map("MinAllowedWeights", extract_value=extract_value)[
1994            "MinAllowedWeights"
1995        ]
1996
1997    def query_map_max_allowed_weights(
1998        self, extract_value: bool = False
1999    ) -> dict[int, int]:
2000        """
2001        Retrieves a mapping of maximum allowed weights for the network.
2002
2003        Queries the network for the maximum allowed weights,
2004        which are the highest permissible
2005        weight values that can be set by validators.
2006
2007        Returns:
2008            A dictionary mapping network UIDs to
2009            their maximum allowed weight values.
2010
2011        Raises:
2012            QueryError: If the query to the network fails or is invalid.
2013        """
2014
2015        return self.query_map("MaxAllowedWeights", extract_value=extract_value)[
2016            "MaxAllowedWeights"
2017        ]
2018
2019    def query_map_max_allowed_uids(self, extract_value: bool = False) -> dict[int, int]:
2020        """
2021        Queries the network for the maximum number of allowed user IDs (UIDs)
2022        for each network subnet.
2023
2024        Fetches a mapping of network subnets to their respective
2025        limits on the number of user IDs that can be created or used.
2026
2027        Returns:
2028            A dictionary mapping network UIDs (unique identifiers) to their
2029            maximum allowed number of UIDs.
2030            Each entry represents a network subnet
2031            with its corresponding UID limit.
2032
2033        Raises:
2034            QueryError: If the query to the network fails or is invalid.
2035        """
2036
2037        return self.query_map("MaxAllowedUids", extract_value=extract_value)[
2038            "MaxAllowedUids"
2039        ]
2040
2041    def query_map_min_stake(self, extract_value: bool = False) -> dict[int, int]:
2042        """
2043        Retrieves a mapping of minimum allowed stake on the network.
2044
2045        Queries the network to obtain the minimum number of stake,
2046        which is represented in nanotokens.
2047
2048        Returns:
2049            A dictionary mapping network UIDs to
2050            their minimum allowed stake values.
2051
2052        Raises:
2053            QueryError: If the query to the network fails or is invalid.
2054        """
2055
2056        return self.query_map("MinStake", extract_value=extract_value)["MinStake"]
2057
2058    def query_map_max_stake(self, extract_value: bool = False) -> dict[int, int]:
2059        """
2060        Retrieves a mapping of the maximum stake values for the network.
2061
2062        Queries the network for the maximum stake values across various s
2063        ubnets of the network.
2064
2065        Returns:
2066            A dictionary mapping network UIDs to their maximum stake values.
2067
2068        Raises:
2069            QueryError: If the query to the network fails or is invalid.
2070        """
2071
2072        return self.query_map("MaxStake", extract_value=extract_value)["MaxStake"]
2073
2074    def query_map_founder(self, extract_value: bool = False) -> dict[int, str]:
2075        """
2076        Retrieves a mapping of founders for the network.
2077
2078        Queries the network to obtain the founders associated with
2079        various subnets.
2080
2081        Returns:
2082            A dictionary mapping network UIDs to their respective founders.
2083
2084        Raises:
2085            QueryError: If the query to the network fails or is invalid.
2086        """
2087
2088        return self.query_map("Founder", extract_value=extract_value)["Founder"]
2089
2090    def query_map_founder_share(self, extract_value: bool = False) -> dict[int, int]:
2091        """
2092        Retrieves a mapping of founder shares for the network.
2093
2094        Queries the network for the share percentages
2095        allocated to founders across different subnets.
2096
2097        Returns:
2098            A dictionary mapping network UIDs to their founder share percentages.
2099
2100        Raises:
2101            QueryError: If the query to the network fails or is invalid.
2102        """
2103
2104        return self.query_map("FounderShare", extract_value=extract_value)[
2105            "FounderShare"
2106        ]
2107
2108    def query_map_incentive_ratio(self, extract_value: bool = False) -> dict[int, int]:
2109        """
2110        Retrieves a mapping of incentive ratios for the network.
2111
2112        Queries the network for the incentive ratios,
2113        which are the proportions of rewards or incentives
2114        allocated in different subnets of the network.
2115
2116        Returns:
2117            A dictionary mapping network UIDs to their incentive ratios.
2118
2119        Raises:
2120            QueryError: If the query to the network fails or is invalid.
2121        """
2122
2123        return self.query_map("IncentiveRatio", extract_value=extract_value)[
2124            "IncentiveRatio"
2125        ]
2126
2127    def query_map_trust_ratio(self, extract_value: bool = False) -> dict[int, int]:
2128        """
2129        Retrieves a mapping of trust ratios for the network.
2130
2131        Queries the network for trust ratios,
2132        indicative of the level of trust or credibility assigned
2133        to different subnets of the network.
2134
2135        Returns:
2136            A dictionary mapping network UIDs to their trust ratios.
2137
2138        Raises:
2139            QueryError: If the query to the network fails or is invalid.
2140        """
2141
2142        return self.query_map("TrustRatio", extract_value=extract_value)["TrustRatio"]
2143
2144    def query_map_vote_mode_subnet(self, extract_value: bool = False) -> dict[int, str]:
2145        """
2146        Retrieves a mapping of vote modes for subnets within the network.
2147
2148        Queries the network for the voting modes used in different
2149        subnets, which define the methodology or approach of voting within those
2150        subnets.
2151
2152        Returns:
2153            A dictionary mapping network UIDs to their vote
2154            modes for subnets.
2155
2156        Raises:
2157            QueryError: If the query to the network fails or is invalid.
2158        """
2159
2160        return self.query_map("VoteModeSubnet", extract_value=extract_value)[
2161            "VoteModeSubnet"
2162        ]
2163
2164    def query_map_legit_whitelist(
2165        self, extract_value: bool = False
2166    ) -> dict[Ss58Address, int]:
2167        """
2168        Retrieves a mapping of whitelisted addresses for the network.
2169
2170        Queries the network for a mapping of whitelisted addresses
2171        and their respective legitimacy status.
2172
2173        Returns:
2174            A dictionary mapping addresses to their legitimacy status.
2175
2176        Raises:
2177            QueryError: If the query to the network fails or is invalid.
2178        """
2179
2180        return self.query_map(
2181            "LegitWhitelist", module="GovernanceModule", extract_value=extract_value)[
2182            "LegitWhitelist"
2183        ]
2184
2185    def query_map_subnet_names(self, extract_value: bool = False) -> dict[int, str]:
2186        """
2187        Retrieves a mapping of subnet names within the network.
2188
2189        Queries the network for the names of various subnets,
2190        providing an overview of the different
2191        subnets within the network.
2192
2193        Returns:
2194            A dictionary mapping network UIDs to their subnet names.
2195
2196        Raises:
2197            QueryError: If the query to the network fails or is invalid.
2198        """
2199
2200        return self.query_map("SubnetNames", extract_value=extract_value)["SubnetNames"]
2201
2202    def query_map_balances(
2203        self, extract_value: bool = False
2204    ) -> dict[str, dict[str, int | dict[str, int | float]]]:
2205        """
2206        Retrieves a mapping of account balances within the network.
2207
2208        Queries the network for the balances associated with different accounts.
2209        It provides detailed information including various types of
2210        balances for each account.
2211
2212        Returns:
2213            A dictionary mapping account addresses to their balance details.
2214
2215        Raises:
2216            QueryError: If the query to the network fails or is invalid.
2217        """
2218
2219        return self.query_map("Account", module="System", extract_value=extract_value)[
2220            "Account"
2221        ]
2222
2223    def query_map_registration_blocks(
2224        self, netuid: int = 0, extract_value: bool = False
2225    ) -> dict[int, int]:
2226        """
2227        Retrieves a mapping of registration blocks for UIDs on the network.
2228
2229        Queries the network to find the block numbers at which various
2230        UIDs were registered.
2231
2232        Args:
2233            netuid: The network UID from which to get the registrations.
2234
2235        Returns:
2236            A dictionary mapping UIDs to their registration block numbers.
2237
2238        Raises:
2239            QueryError: If the query to the network fails or is invalid.
2240        """
2241
2242        return self.query_map(
2243            "RegistrationBlock", [netuid], extract_value=extract_value
2244        )["RegistrationBlock"]
2245
2246    def query_map_name(
2247        self, netuid: int = 0, extract_value: bool = False
2248    ) -> dict[int, str]:
2249        """
2250        Retrieves a mapping of names for keys on the network.
2251
2252        Queries the network for the names associated with different keys.
2253        It provides a mapping of key UIDs to their registered names.
2254
2255        Args:
2256            netuid: The network UID from which to get the names.
2257
2258        Returns:
2259            A dictionary mapping key UIDs to their names.
2260
2261        Raises:
2262            QueryError: If the query to the network fails or is invalid.
2263        """
2264
2265        return self.query_map("Name", [netuid], extract_value=extract_value)["Name"]
2266
2267    #  == QUERY FUNCTIONS == #
2268
2269    def get_immunity_period(self, netuid: int = 0) -> int:
2270        """
2271        Queries the network for the immunity period setting.
2272
2273        The immunity period is a time duration during which a module
2274        can not be deregistered from the network.
2275        Fetches the immunity period for a specified network subnet.
2276
2277        Args:
2278            netuid: The network UID for which to query the immunity period.
2279
2280        Returns:
2281            The immunity period setting for the specified network subnet.
2282
2283        Raises:
2284            QueryError: If the query to the network fails or is invalid.
2285        """
2286
2287        return self.query(
2288            "ImmunityPeriod",
2289            params=[netuid],
2290        )
2291
2292    def get_max_set_weights_per_epoch(self):
2293        return self.query("MaximumSetWeightCallsPerEpoch")
2294
2295    def get_min_allowed_weights(self, netuid: int = 0) -> int:
2296        """
2297        Queries the network for the minimum allowed weights setting.
2298
2299        Retrieves the minimum weight values that are possible to set
2300        by a validator within a specific network subnet.
2301
2302        Args:
2303            netuid: The network UID for which to query the minimum allowed
2304              weights.
2305
2306        Returns:
2307            The minimum allowed weight values for the specified network
2308              subnet.
2309
2310        Raises:
2311            QueryError: If the query to the network fails or is invalid.
2312        """
2313
2314        return self.query(
2315            "MinAllowedWeights",
2316            params=[netuid],
2317        )
2318
2319    def get_dao_treasury_address(self) -> Ss58Address:
2320        return self.query("DaoTreasuryAddress", module="GovernanceModule")
2321
2322    def get_max_allowed_weights(self, netuid: int = 0) -> int:
2323        """
2324        Queries the network for the maximum allowed weights setting.
2325
2326        Retrieves the maximum weight values that are possible to set
2327        by a validator within a specific network subnet.
2328
2329        Args:
2330            netuid: The network UID for which to query the maximum allowed
2331              weights.
2332
2333        Returns:
2334            The maximum allowed weight values for the specified network
2335              subnet.
2336
2337        Raises:
2338            QueryError: If the query to the network fails or is invalid.
2339        """
2340
2341        return self.query("MaxAllowedWeights", params=[netuid])
2342
2343    def get_max_allowed_uids(self, netuid: int = 0) -> int:
2344        """
2345        Queries the network for the maximum allowed UIDs setting.
2346
2347        Fetches the upper limit on the number of user IDs that can
2348        be allocated or used within a specific network subnet.
2349
2350        Args:
2351            netuid: The network UID for which to query the maximum allowed UIDs.
2352
2353        Returns:
2354            The maximum number of allowed UIDs for the specified network subnet.
2355
2356        Raises:
2357            QueryError: If the query to the network fails or is invalid.
2358        """
2359
2360        return self.query("MaxAllowedUids", params=[netuid])
2361
2362    def get_name(self, netuid: int = 0) -> str:
2363        """
2364        Queries the network for the name of a specific subnet.
2365
2366        Args:
2367            netuid: The network UID for which to query the name.
2368
2369        Returns:
2370            The name of the specified network subnet.
2371
2372        Raises:
2373            QueryError: If the query to the network fails or is invalid.
2374        """
2375
2376        return self.query("Name", params=[netuid])
2377
2378    def get_subnet_name(self, netuid: int = 0) -> str:
2379        """
2380        Queries the network for the name of a specific subnet.
2381
2382        Args:
2383            netuid: The network UID for which to query the name.
2384
2385        Returns:
2386            The name of the specified network subnet.
2387
2388        Raises:
2389            QueryError: If the query to the network fails or is invalid.
2390        """
2391
2392        return self.query("SubnetNames", params=[netuid])
2393
2394    def get_global_dao_treasury(self):
2395        return self.query("GlobalDaoTreasury", module="GovernanceModule")
2396
2397    def get_n(self, netuid: int = 0) -> int:
2398        """
2399        Queries the network for the 'N' hyperparameter, which represents how
2400        many modules are on the network.
2401
2402        Args:
2403            netuid: The network UID for which to query the 'N' hyperparameter.
2404
2405        Returns:
2406            The value of the 'N' hyperparameter for the specified network
2407              subnet.
2408
2409        Raises:
2410            QueryError: If the query to the network fails or is invalid.
2411        """
2412
2413        return self.query("N", params=[netuid])
2414
2415    def get_tempo(self, netuid: int = 0) -> int:
2416        """
2417        Queries the network for the tempo setting, measured in blocks, for the
2418        specified subnet.
2419
2420        Args:
2421            netuid: The network UID for which to query the tempo.
2422
2423        Returns:
2424            The tempo setting for the specified subnet.
2425
2426        Raises:
2427            QueryError: If the query to the network fails or is invalid.
2428        """
2429
2430        return self.query("Tempo", params=[netuid])
2431
2432    def get_total_stake(self) -> int:
2433        """
2434        Retrieves a mapping of total stakes for keys on the network.
2435
2436        Queries the network for a mapping of key UIDs to their total stake amounts.
2437
2438        Returns:
2439            A dictionary mapping key UIDs to their total stake amounts.
2440
2441        Raises:
2442            QueryError: If the query to the network fails or is invalid.
2443        """
2444
2445        return self.query("TotalStake")
2446
2447    def get_registrations_per_block(self):
2448        """
2449        Queries the network for the number of registrations per block.
2450
2451        Fetches the number of registrations that are processed per
2452        block within the network.
2453
2454        Returns:
2455            The number of registrations processed per block.
2456
2457        Raises:
2458            QueryError: If the query to the network fails or is invalid.
2459        """
2460
2461        return self.query(
2462            "RegistrationsPerBlock",
2463        )
2464
2465    def max_registrations_per_block(self, netuid: int = 0):
2466        """
2467        Queries the network for the maximum number of registrations per block.
2468
2469        Retrieves the upper limit of registrations that can be processed in
2470        each block within a specific network subnet.
2471
2472        Args:
2473            netuid: The network UID for which to query.
2474
2475        Returns:
2476            The maximum number of registrations per block for
2477            the specified network subnet.
2478
2479        Raises:
2480            QueryError: If the query to the network fails or is invalid.
2481        """
2482
2483        return self.query(
2484            "MaxRegistrationsPerBlock",
2485            params=[netuid],
2486        )
2487
2488    def get_proposal(self, proposal_id: int = 0):
2489        """
2490        Queries the network for a specific proposal.
2491
2492        Args:
2493            proposal_id: The ID of the proposal to query.
2494
2495        Returns:
2496            The details of the specified proposal.
2497
2498        Raises:
2499            QueryError: If the query to the network fails, is invalid,
2500                or if the proposal ID does not exist.
2501        """
2502
2503        return self.query(
2504            "Proposals",
2505            params=[proposal_id],
2506        )
2507
2508    def get_trust(self, netuid: int = 0):
2509        """
2510        Queries the network for the trust setting of a specific network subnet.
2511
2512        Retrieves the trust level or score, which may represent the
2513        level of trustworthiness or reliability within a
2514        particular network subnet.
2515
2516        Args:
2517            netuid: The network UID for which to query the trust setting.
2518
2519        Returns:
2520            The trust level or score for the specified network subnet.
2521
2522        Raises:
2523            QueryError: If the query to the network fails or is invalid.
2524        """
2525
2526        return self.query(
2527            "Trust",
2528            params=[netuid],
2529        )
2530
2531    def get_uids(self, key: Ss58Address, netuid: int = 0) -> bool | None:
2532        """
2533        Queries the network for module UIDs associated with a specific key.
2534
2535        Args:
2536            key: The key address for which to query UIDs.
2537            netuid: The network UID within which to search for the key.
2538
2539        Returns:
2540            A list of UIDs associated with the specified key.
2541
2542        Raises:
2543            QueryError: If the query to the network fails or is invalid.
2544        """
2545
2546        return self.query(
2547            "Uids",
2548            params=[netuid, key],
2549        )
2550
2551    def get_unit_emission(self) -> int:
2552        """
2553        Queries the network for the unit emission setting.
2554
2555        Retrieves the unit emission value, which represents the
2556        emission rate or quantity for the $COMM token.
2557
2558        Returns:
2559            The unit emission value in nanos for the network.
2560
2561        Raises:
2562            QueryError: If the query to the network fails or is invalid.
2563        """
2564
2565        return self.query("UnitEmission", module="SubnetEmissionModule")
2566
2567    def get_tx_rate_limit(self) -> int:
2568        """
2569        Queries the network for the transaction rate limit.
2570
2571        Retrieves the rate limit for transactions within the network,
2572        which defines the maximum number of transactions that can be
2573        processed within a certain timeframe.
2574
2575        Returns:
2576            The transaction rate limit for the network.
2577
2578        Raises:
2579            QueryError: If the query to the network fails or is invalid.
2580        """
2581
2582        return self.query(
2583            "TxRateLimit",
2584        )
2585
2586    def get_subnet_burn(self) -> int:
2587        """Queries the network for the subnet burn value.
2588
2589        Retrieves the subnet burn value from the network, which represents
2590        the amount of tokens that are burned (permanently removed from
2591        circulation) for subnet-related operations.
2592
2593        Returns:
2594            int: The subnet burn value.
2595
2596        Raises:
2597            QueryError: If the query to the network fails or returns invalid data.
2598        """
2599
2600        return self.query(
2601            "SubnetBurn",
2602        )
2603
2604    def get_burn_rate(self) -> int:
2605        """
2606        Queries the network for the burn rate setting.
2607
2608        Retrieves the burn rate, which represents the rate at
2609        which the $COMM token is permanently
2610        removed or 'burned' from circulation.
2611
2612        Returns:
2613            The burn rate for the network.
2614
2615        Raises:
2616            QueryError: If the query to the network fails or is invalid.
2617        """
2618
2619        return self.query(
2620            "BurnRate",
2621            params=[],
2622        )
2623
2624    def get_burn(self, netuid: int = 0) -> int:
2625        """
2626        Queries the network for the burn setting.
2627
2628        Retrieves the burn value, which represents the amount of the
2629        $COMM token that is 'burned' or permanently removed from
2630        circulation.
2631
2632        Args:
2633            netuid: The network UID for which to query the burn value.
2634
2635        Returns:
2636            The burn value for the specified network subnet.
2637
2638        Raises:
2639            QueryError: If the query to the network fails or is invalid.
2640        """
2641
2642        return self.query("Burn", params=[netuid])
2643
2644    def get_min_burn(self) -> int:
2645        """
2646        Queries the network for the minimum burn setting.
2647
2648        Retrieves the minimum burn value, indicating the lowest
2649        amount of the $COMM tokens that can be 'burned' or
2650        permanently removed from circulation.
2651
2652        Returns:
2653            The minimum burn value for the network.
2654
2655        Raises:
2656            QueryError: If the query to the network fails or is invalid.
2657        """
2658
2659        return self.query(
2660            "BurnConfig",
2661            params=[],
2662        )["min_burn"]
2663
2664    def get_min_weight_stake(self) -> int:
2665        """
2666        Queries the network for the minimum weight stake setting.
2667
2668        Retrieves the minimum weight stake, which represents the lowest
2669        stake weight that is allowed for certain operations or
2670        transactions within the network.
2671
2672        Returns:
2673            The minimum weight stake for the network.
2674
2675        Raises:
2676            QueryError: If the query to the network fails or is invalid.
2677        """
2678
2679        return self.query("MinWeightStake", params=[])
2680
2681    def get_vote_mode_global(self) -> str:
2682        """
2683        Queries the network for the global vote mode setting.
2684
2685        Retrieves the global vote mode, which defines the overall voting
2686        methodology or approach used across the network in default.
2687
2688        Returns:
2689            The global vote mode setting for the network.
2690
2691        Raises:
2692            QueryError: If the query to the network fails or is invalid.
2693        """
2694
2695        return self.query(
2696            "VoteModeGlobal",
2697        )
2698
2699    def get_max_proposals(self) -> int:
2700        """
2701        Queries the network for the maximum number of proposals allowed.
2702
2703        Retrieves the upper limit on the number of proposals that can be
2704        active or considered at any given time within the network.
2705
2706        Returns:
2707            The maximum number of proposals allowed on the network.
2708
2709        Raises:
2710            QueryError: If the query to the network fails or is invalid.
2711        """
2712
2713        return self.query(
2714            "MaxProposals",
2715        )
2716
2717    def get_max_registrations_per_block(self) -> int:
2718        """
2719        Queries the network for the maximum number of registrations per block.
2720
2721        Retrieves the maximum number of registrations that can
2722        be processed in each block within the network.
2723
2724        Returns:
2725            The maximum number of registrations per block on the network.
2726
2727        Raises:
2728            QueryError: If the query to the network fails or is invalid.
2729        """
2730
2731        return self.query(
2732            "MaxRegistrationsPerBlock",
2733            params=[],
2734        )
2735
2736    def get_max_name_length(self) -> int:
2737        """
2738        Queries the network for the maximum length allowed for names.
2739
2740        Retrieves the maximum character length permitted for names
2741        within the network. Such as the module names
2742
2743        Returns:
2744            The maximum length allowed for names on the network.
2745
2746        Raises:
2747            QueryError: If the query to the network fails or is invalid.
2748        """
2749
2750        return self.query(
2751            "MaxNameLength",
2752            params=[],
2753        )
2754
2755    def get_global_vote_threshold(self) -> int:
2756        """
2757        Queries the network for the global vote threshold.
2758
2759        Retrieves the global vote threshold, which is the critical value or
2760        percentage required for decisions in the network's governance process.
2761
2762        Returns:
2763            The global vote threshold for the network.
2764
2765        Raises:
2766            QueryError: If the query to the network fails or is invalid.
2767        """
2768
2769        return self.query(
2770            "GlobalVoteThreshold",
2771        )
2772
2773    def get_max_allowed_subnets(self) -> int:
2774        """
2775        Queries the network for the maximum number of allowed subnets.
2776
2777        Retrieves the upper limit on the number of subnets that can
2778        be created or operated within the network.
2779
2780        Returns:
2781            The maximum number of allowed subnets on the network.
2782
2783        Raises:
2784            QueryError: If the query to the network fails or is invalid.
2785        """
2786
2787        return self.query(
2788            "MaxAllowedSubnets",
2789            params=[],
2790        )
2791
2792    def get_max_allowed_modules(self) -> int:
2793        """
2794        Queries the network for the maximum number of allowed modules.
2795
2796        Retrieves the upper limit on the number of modules that
2797        can be registered within the network.
2798
2799        Returns:
2800            The maximum number of allowed modules on the network.
2801
2802        Raises:
2803            QueryError: If the query to the network fails or is invalid.
2804        """
2805
2806        return self.query(
2807            "MaxAllowedModules",
2808            params=[],
2809        )
2810
2811    def get_min_stake(self, netuid: int = 0) -> int:
2812        """
2813        Queries the network for the minimum stake required to register a key.
2814
2815        Retrieves the minimum amount of stake necessary for
2816        registering a key within a specific network subnet.
2817
2818        Args:
2819            netuid: The network UID for which to query the minimum stake.
2820
2821        Returns:
2822            The minimum stake required for key registration in nanos.
2823
2824        Raises:
2825            QueryError: If the query to the network fails or is invalid.
2826        """
2827
2828        return self.query("MinStake", params=[netuid])
2829
2830    def get_stakefrom(
2831        self,
2832        key: Ss58Address,
2833    ) -> dict[str, int]:
2834        """
2835        Retrieves the stake amounts from all stakers to a specific staked address.
2836
2837        Queries the network for the stakes received by a particular staked address
2838        from all stakers.
2839
2840        Args:
2841            key: The address of the key receiving the stakes.
2842
2843        Returns:
2844            A dictionary mapping staker addresses to their respective stake amounts.
2845
2846        Raises:
2847            QueryError: If the query to the network fails or is invalid.
2848        """
2849
2850        # Has to use query map in order to iterate through the storage prefix.
2851        return self.query_map("StakeFrom", [key], extract_value=False).get("StakeFrom", {})
2852
2853    def get_staketo(
2854        self,
2855        key: Ss58Address,
2856    ) -> dict[str, int]:
2857        """
2858        Retrieves the stake amounts provided by a specific staker to all staked addresses.
2859
2860        Queries the network for the stakes provided by a particular staker to
2861        all staked addresses.
2862
2863        Args:
2864            key: The address of the key providing the stakes.
2865
2866        Returns:
2867            A dictionary mapping staked addresses to their respective received stake amounts.
2868
2869        Raises:
2870            QueryError: If the query to the network fails or is invalid.
2871        """
2872
2873        # Has to use query map in order to iterate through the storage prefix.
2874        return self.query_map("StakeTo", [key], extract_value=False).get("StakeTo", {})
2875
2876    def get_balance(
2877        self,
2878        addr: Ss58Address,
2879    ) -> int:
2880        """
2881        Retrieves the balance of a specific key.
2882
2883        Args:
2884            addr: The address of the key to query the balance for.
2885
2886        Returns:
2887            The balance of the specified key.
2888
2889        Raises:
2890            QueryError: If the query to the network fails or is invalid.
2891        """
2892
2893        result = self.query("Account", module="System", params=[addr])
2894
2895        return result["data"]["free"]
2896
2897    def get_block(self, block_hash: str | None = None) -> dict[Any, Any] | None:
2898        """
2899        Retrieves information about a specific block in the network.
2900
2901        Queries the network for details about a block, such as its number,
2902        hash, and other relevant information.
2903
2904        Returns:
2905            The requested information about the block,
2906            or None if the block does not exist
2907            or the information is not available.
2908
2909        Raises:
2910            QueryError: If the query to the network fails or is invalid.
2911        """
2912
2913        with self.get_conn() as substrate:
2914            block: dict[Any, Any] | None = substrate.get_block(  # type: ignore
2915                block_hash  # type: ignore
2916            )
2917
2918        return block
2919
2920    def get_existential_deposit(self, block_hash: str | None = None) -> int:
2921        """
2922        Retrieves the existential deposit value for the network.
2923
2924        The existential deposit is the minimum balance that must be maintained
2925        in an account to prevent it from being purged. Denotated in nano units.
2926
2927        Returns:
2928            The existential deposit value in nano units.
2929        Note:
2930            The value returned is a fixed value defined in the
2931            client and may not reflect changes in the network's configuration.
2932        """
2933
2934        with self.get_conn() as substrate:
2935            result: int = substrate.get_constant(  #  type: ignore
2936                "Balances", "ExistentialDeposit", block_hash
2937            ).value  #  type: ignore
2938
2939        return result
2940
2941    def get_voting_power_delegators(self) -> list[Ss58Address]:
2942        result = self.query("NotDelegatingVotingPower", [], module="GovernanceModule")
2943        return result
2944
2945    def add_transfer_dao_treasury_proposal(
2946        self,
2947        key: Keypair,
2948        data: str,
2949        amount_nano: int,
2950        dest: Ss58Address,
2951    ):
2952        params = {"dest": dest, "value": amount_nano, "data": data}
2953
2954        return self.compose_call(
2955            module="GovernanceModule",
2956            fn="add_transfer_dao_treasury_proposal",
2957            params=params,
2958            key=key,
2959        )
2960
2961    def delegate_rootnet_control(self, key: Keypair, dest: Ss58Address):
2962        params = {"origin": key, "target": dest}
2963
2964        return self.compose_call(
2965            module="SubspaceModule",
2966            fn="delegate_rootnet_control",
2967            params=params,
2968            key=key,
2969        )

A client for interacting with Commune network nodes, querying storage, submitting transactions, etc.

Attributes:
  • wait_for_finalization: Whether to wait for transaction finalization.

Example:

client = CommuneClient()
client.query(name='function_name', params=['param1', 'param2'])
Raises:
  • AssertionError: If the maximum connections value is less than or equal to zero.
CommuneClient( url: str, num_connections: int = 1, wait_for_finalization: bool = False)
59    def __init__(
60        self,
61        url: str,
62        num_connections: int = 1,
63        wait_for_finalization: bool = False,
64    ):
65        """
66        Args:
67            url: The URL of the network node to connect to.
68            num_connections: The number of websocket connections to be opened.
69        """
70        assert num_connections > 0
71        self._num_connections = num_connections
72        self.wait_for_finalization = wait_for_finalization
73        self._connection_queue = queue.Queue(num_connections)
74        self.url = url
75
76        for _ in range(num_connections):
77            self._connection_queue.put(SubstrateInterface(url))
Arguments:
  • url: The URL of the network node to connect to.
  • num_connections: The number of websocket connections to be opened.
wait_for_finalization: bool
url: str
connections: int
79    @property
80    def connections(self) -> int:
81        """
82        Gets the maximum allowed number of simultaneous connections to the
83        network node.
84        """
85        return self._num_connections

Gets the maximum allowed number of simultaneous connections to the network node.

@contextmanager
def get_conn(self, timeout: float | None = None, init: bool = False):
 87    @contextmanager
 88    def get_conn(self, timeout: float | None = None, init: bool = False):
 89        """
 90        Context manager to get a connection from the pool.
 91
 92        Tries to get a connection from the pool queue. If the queue is empty,
 93        it blocks for `timeout` seconds until a connection is available. If
 94        `timeout` is None, it blocks indefinitely.
 95
 96        Args:
 97            timeout: The maximum time in seconds to wait for a connection.
 98
 99        Yields:
100            The connection object from the pool.
101
102        Raises:
103            QueueEmptyError: If no connection is available within the timeout
104              period.
105        """
106        conn = self._connection_queue.get(timeout=timeout)
107        if init:
108            conn.init_runtime()  # type: ignore
109        try:
110            yield conn
111        finally:
112            self._connection_queue.put(conn)

Context manager to get a connection from the pool.

Tries to get a connection from the pool queue. If the queue is empty, it blocks for timeout seconds until a connection is available. If timeout is None, it blocks indefinitely.

Arguments:
  • timeout: The maximum time in seconds to wait for a connection.
Yields:

The connection object from the pool.

Raises:
  • QueueEmptyError: If no connection is available within the timeout period.
def query_batch( self, functions: dict[str, list[tuple[str, list[typing.Any]]]]) -> dict[str, str]:
569    def query_batch(
570        self, functions: dict[str, list[tuple[str, list[Any]]]]
571    ) -> dict[str, str]:
572        """
573        Executes batch queries on a substrate and returns results in a dictionary format.
574
575        Args:
576            substrate: An instance of SubstrateInterface to interact with the substrate.
577            functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls (function name and parameters).
578
579        Returns:
580            A dictionary where keys are storage function names and values are the query results.
581
582        Raises:
583            Exception: If no result is found from the batch queries.
584
585        Example:
586            >>> query_batch(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]})
587            {'function_name': 'query_result', ...}
588        """
589
590        result: dict[str, str] = {}
591        if not functions:
592            raise Exception("No result")
593        with self.get_conn(init=True) as substrate:
594            for module, queries in functions.items():
595                storage_keys: list[Any] = []
596                for fn, params in queries:
597                    storage_function = substrate.create_storage_key(  # type: ignore
598                        pallet=module, storage_function=fn, params=params
599                    )
600                    storage_keys.append(storage_function)
601
602                block_hash = substrate.get_block_hash()
603                responses: list[Any] = substrate.query_multi(  # type: ignore
604                    storage_keys=storage_keys, block_hash=block_hash
605                )
606
607                for item in responses:
608                    fun = item[0]
609                    query = item[1]
610                    storage_fun = fun.storage_function
611                    result[storage_fun] = query.value
612
613        return result

Executes batch queries on a substrate and returns results in a dictionary format.

Arguments:
  • substrate: An instance of SubstrateInterface to interact with the substrate.
  • functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls (function name and parameters).
Returns:

A dictionary where keys are storage function names and values are the query results.

Raises:
  • Exception: If no result is found from the batch queries.
Example:
>>> query_batch(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]})
{'function_name': 'query_result', ...}
def query_batch_map( self, functions: dict[str, list[tuple[str, list[typing.Any]]]], block_hash: str | None = None) -> dict[str, dict[typing.Any, typing.Any]]:
615    def query_batch_map(
616        self,
617        functions: dict[str, list[tuple[str, list[Any]]]],
618        block_hash: str | None = None,
619    ) -> dict[str, dict[Any, Any]]:
620        """
621        Queries multiple storage functions using a map batch approach and returns the combined result.
622
623        Args:
624            substrate: An instance of SubstrateInterface for substrate interaction.
625            functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls.
626
627        Returns:
628            The combined result of the map batch query.
629
630        Example:
631            >>> query_batch_map(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]})
632            # Returns the combined result of the map batch query
633        """
634        multi_result: dict[str, dict[Any, Any]] = {}
635
636        def recursive_update(
637            d: dict[str, dict[T1, T2] | dict[str, Any]],
638            u: Mapping[str, dict[Any, Any] | str],
639        ) -> dict[str, dict[T1, T2]]:
640            for k, v in u.items():
641                if isinstance(v, dict):
642                    d[k] = recursive_update(d.get(k, {}), v)  # type: ignore
643                else:
644                    d[k] = v  # type: ignore
645            return d  # type: ignore
646
647        def get_page():
648            send, prefix_list = self._get_storage_keys(storage, queries, block_hash)
649            with self.get_conn(init=True) as substrate:
650                function_parameters = self._get_lists(storage, queries, substrate)
651            responses = self._rpc_request_batch(send)
652            # assumption because send is just the storage_function keys
653            # so it should always be really small regardless of the amount of queries
654            assert len(responses) == 1
655            res = responses[0]
656            built_payload: list[tuple[str, list[Any]]] = []
657            for result_keys in res:
658                built_payload.append(
659                    ("state_queryStorageAt", [result_keys, block_hash])
660                )
661            _, chunks_info = self._make_request_smaller(
662                built_payload, prefix_list, function_parameters
663            )
664            chunks_response, chunks_info = self._rpc_request_batch_chunked(chunks_info)
665            return chunks_response, chunks_info
666
667        if not block_hash:
668            with self.get_conn(init=True) as substrate:
669                block_hash = substrate.get_block_hash()
670        for storage, queries in functions.items():
671            chunks, chunks_info = get_page()
672            # if this doesn't happen something is wrong on the code
673            # and we won't be able to decode the data properly
674            assert len(chunks) == len(chunks_info)
675            for chunk_info, response in zip(chunks_info, chunks):
676                storage_result = self._decode_response(
677                    response, chunk_info.fun_params, chunk_info.prefix_list, block_hash
678                )
679                multi_result = recursive_update(multi_result, storage_result)
680
681        return multi_result

Queries multiple storage functions using a map batch approach and returns the combined result.

Arguments:
  • substrate: An instance of SubstrateInterface for substrate interaction.
  • functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls.
Returns:

The combined result of the map batch query.

Example:
>>> query_batch_map(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]})
<h1 id="returns-the-combined-result-of-the-map-batch-query">Returns the combined result of the map batch query</h1>
def query( self, name: str, params: list[typing.Any] = [], module: str = 'SubspaceModule', block_hash: str | None = None) -> Any:
683    def query(
684        self,
685        name: str,
686        params: list[Any] = [],
687        module: str = "SubspaceModule",
688        block_hash: str | None = None,
689    ) -> Any:
690        """
691        Queries a storage function on the network.
692
693        Sends a query to the network and retrieves data from a
694        specified storage function.
695
696        Args:
697            name: The name of the storage function to query.
698            params: The parameters to pass to the storage function.
699            module: The module where the storage function is located.
700
701        Returns:
702            The result of the query from the network.
703
704        Raises:
705            NetworkQueryError: If the query fails or is invalid.
706        """
707
708        result = self.query_batch({module: [(name, params)]})
709
710        return result[name]

Queries a storage function on the network.

Sends a query to the network and retrieves data from a specified storage function.

Arguments:
  • name: The name of the storage function to query.
  • params: The parameters to pass to the storage function.
  • module: The module where the storage function is located.
Returns:

The result of the query from the network.

Raises:
  • NetworkQueryError: If the query fails or is invalid.
def query_map( self, name: str, params: list[typing.Any] = [], module: str = 'SubspaceModule', extract_value: bool = True) -> dict[typing.Any, typing.Any]:
712    def query_map(
713        self,
714        name: str,
715        params: list[Any] = [],
716        module: str = "SubspaceModule",
717        extract_value: bool = True,
718    ) -> dict[Any, Any]:
719        """
720        Queries a storage map from a network node.
721
722        Args:
723            name: The name of the storage map to query.
724            params: A list of parameters for the query.
725            module: The module in which the storage map is located.
726
727        Returns:
728            A dictionary representing the key-value pairs
729              retrieved from the storage map.
730
731        Raises:
732            QueryError: If the query to the network fails or is invalid.
733        """
734
735        result = self.query_batch_map({module: [(name, params)]})
736
737        if extract_value:
738            return {k.value: v.value for k, v in result}  # type: ignore
739
740        return result

Queries a storage map from a network node.

Arguments:
  • name: The name of the storage map to query.
  • params: A list of parameters for the query.
  • module: The module in which the storage map is located.
Returns:

A dictionary representing the key-value pairs retrieved from the storage map.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def compose_call( self, fn: str, params: dict[str, typing.Any], key: substrateinterface.keypair.Keypair | None, module: str = 'SubspaceModule', wait_for_inclusion: bool = True, wait_for_finalization: bool | None = None, sudo: bool = False, unsigned: bool = False) -> substrateinterface.base.ExtrinsicReceipt:
742    def compose_call(
743        self,
744        fn: str,
745        params: dict[str, Any],
746        key: Keypair | None,
747        module: str = "SubspaceModule",
748        wait_for_inclusion: bool = True,
749        wait_for_finalization: bool | None = None,
750        sudo: bool = False,
751        unsigned: bool = False,
752    ) -> ExtrinsicReceipt:
753        """
754        Composes and submits a call to the network node.
755
756        Composes and signs a call with the provided keypair, and submits it to
757        the network. The call can be a standard extrinsic or a sudo extrinsic if
758        elevated permissions are required. The method can optionally wait for
759        the call's inclusion in a block and/or its finalization.
760
761        Args:
762            fn: The function name to call on the network.
763            params: A dictionary of parameters for the call.
764            key: The keypair for signing the extrinsic.
765            module: The module containing the function.
766            wait_for_inclusion: Wait for the call's inclusion in a block.
767            wait_for_finalization: Wait for the transaction's finalization.
768            sudo: Execute the call as a sudo (superuser) operation.
769
770        Returns:
771            The receipt of the submitted extrinsic, if
772              `wait_for_inclusion` is True. Otherwise, returns a string
773              identifier of the extrinsic.
774
775        Raises:
776            ChainTransactionError: If the transaction fails.
777        """
778
779        if key is None and not unsigned:
780            raise ValueError("Key must be provided for signed extrinsics.")
781
782        with self.get_conn() as substrate:
783            if wait_for_finalization is None:
784                wait_for_finalization = self.wait_for_finalization
785
786            call = substrate.compose_call(  # type: ignore
787                call_module=module, call_function=fn, call_params=params
788            )
789            if sudo:
790                call = substrate.compose_call(  # type: ignore
791                    call_module="Sudo",
792                    call_function="sudo",
793                    call_params={
794                        "call": call.value,  # type: ignore
795                    },
796                )
797
798            if not unsigned:
799                extrinsic = substrate.create_signed_extrinsic(  # type: ignore
800                    call=call, keypair=key  # type: ignore
801                )  # type: ignore
802            else:
803                extrinsic = substrate.create_unsigned_extrinsic(call=call)  # type: ignore
804
805            response = substrate.submit_extrinsic(
806                extrinsic=extrinsic,
807                wait_for_inclusion=wait_for_inclusion,
808                wait_for_finalization=wait_for_finalization,
809            )
810        if wait_for_inclusion:
811            if not response.is_success:
812                raise ChainTransactionError(
813                    response.error_message, response  # type: ignore
814                )
815
816        return response

Composes and submits a call to the network node.

Composes and signs a call with the provided keypair, and submits it to the network. The call can be a standard extrinsic or a sudo extrinsic if elevated permissions are required. The method can optionally wait for the call's inclusion in a block and/or its finalization.

Arguments:
  • fn: The function name to call on the network.
  • params: A dictionary of parameters for the call.
  • key: The keypair for signing the extrinsic.
  • module: The module containing the function.
  • wait_for_inclusion: Wait for the call's inclusion in a block.
  • wait_for_finalization: Wait for the transaction's finalization.
  • sudo: Execute the call as a sudo (superuser) operation.
Returns:

The receipt of the submitted extrinsic, if wait_for_inclusion is True. Otherwise, returns a string identifier of the extrinsic.

Raises:
  • ChainTransactionError: If the transaction fails.
def compose_call_multisig( self, fn: str, params: dict[str, typing.Any], key: substrateinterface.keypair.Keypair, signatories: list[communex.types.Ss58Address], threshold: int, module: str = 'SubspaceModule', wait_for_inclusion: bool = True, wait_for_finalization: bool | None = None, sudo: bool = False, era: dict[str, int] | None = None) -> substrateinterface.base.ExtrinsicReceipt:
818    def compose_call_multisig(
819        self,
820        fn: str,
821        params: dict[str, Any],
822        key: Keypair,
823        signatories: list[Ss58Address],
824        threshold: int,
825        module: str = "SubspaceModule",
826        wait_for_inclusion: bool = True,
827        wait_for_finalization: bool | None = None,
828        sudo: bool = False,
829        era: dict[str, int] | None = None,
830    ) -> ExtrinsicReceipt:
831        """
832        Composes and submits a multisignature call to the network node.
833
834        This method allows the composition and submission of a call that
835        requires multiple signatures for execution, known as a multisignature
836        call. It supports specifying signatories, a threshold of signatures for
837        the call's execution, and an optional era for the call's mortality. The
838        call can be a standard extrinsic, a sudo extrinsic for elevated
839        permissions, or a multisig extrinsic if multiple signatures are
840        required. Optionally, the method can wait for the call's inclusion in a
841        block and/or its finalization. Make sure to pass all keys,
842        that are part of the multisignature.
843
844        Args:
845            fn: The function name to call on the network. params: A dictionary
846            of parameters for the call. key: The keypair for signing the
847            extrinsic. signatories: List of SS58 addresses of the signatories.
848            Include ALL KEYS that are part of the multisig. threshold: The
849            minimum number of signatories required to execute the extrinsic.
850            module: The module containing the function to call.
851            wait_for_inclusion: Whether to wait for the call's inclusion in a
852            block. wait_for_finalization: Whether to wait for the transaction's
853            finalization. sudo: Execute the call as a sudo (superuser)
854            operation. era: Specifies the call's mortality in terms of blocks in
855            the format
856                {'period': amount_blocks}. If omitted, the extrinsic is
857                immortal.
858
859        Returns:
860            The receipt of the submitted extrinsic if `wait_for_inclusion` is
861            True. Otherwise, returns a string identifier of the extrinsic.
862
863        Raises:
864            ChainTransactionError: If the transaction fails.
865        """
866
867        # getting the call ready
868        with self.get_conn() as substrate:
869            if wait_for_finalization is None:
870                wait_for_finalization = self.wait_for_finalization
871
872            # prepares the `GenericCall` object
873            call = substrate.compose_call(  # type: ignore
874                call_module=module, call_function=fn, call_params=params
875            )
876            if sudo:
877                call = substrate.compose_call(  # type: ignore
878                    call_module="Sudo",
879                    call_function="sudo",
880                    call_params={
881                        "call": call.value,  # type: ignore
882                    },
883                )
884
885            # modify the rpc methods at runtime, to allow for correct payment
886            # fee calculation parity has a bug in this version,
887            # where the method has to be removed
888            rpc_methods = substrate.config.get("rpc_methods")  # type: ignore
889
890            if "state_call" in rpc_methods:  # type: ignore
891                rpc_methods.remove("state_call")  # type: ignore
892
893            # create the multisig account
894            multisig_acc = substrate.generate_multisig_account(  # type: ignore
895                signatories, threshold
896            )
897
898            # send the multisig extrinsic
899            extrinsic = substrate.create_multisig_extrinsic(  # type: ignore
900                call=call,  # type: ignore
901                keypair=key,
902                multisig_account=multisig_acc,  # type: ignore
903                era=era,  # type: ignore
904            )  # type: ignore
905
906            response = substrate.submit_extrinsic(
907                extrinsic=extrinsic,
908                wait_for_inclusion=wait_for_inclusion,
909                wait_for_finalization=wait_for_finalization,
910            )
911
912        if wait_for_inclusion:
913            if not response.is_success:
914                raise ChainTransactionError(
915                    response.error_message, response  # type: ignore
916                )
917
918        return response

Composes and submits a multisignature call to the network node.

This method allows the composition and submission of a call that requires multiple signatures for execution, known as a multisignature call. It supports specifying signatories, a threshold of signatures for the call's execution, and an optional era for the call's mortality. The call can be a standard extrinsic, a sudo extrinsic for elevated permissions, or a multisig extrinsic if multiple signatures are required. Optionally, the method can wait for the call's inclusion in a block and/or its finalization. Make sure to pass all keys, that are part of the multisignature.

Arguments:
  • fn: The function name to call on the network. params: A dictionary
  • of parameters for the call. key: The keypair for signing the
  • extrinsic. signatories: List of SS58 addresses of the signatories.
  • Include ALL KEYS that are part of the multisig. threshold: The
  • minimum number of signatories required to execute the extrinsic.
  • module: The module containing the function to call.
  • wait_for_inclusion: Whether to wait for the call's inclusion in a
  • block. wait_for_finalization: Whether to wait for the transaction's
  • finalization. sudo: Execute the call as a sudo (superuser)
  • operation. era: Specifies the call's mortality in terms of blocks in
  • the format {'period': amount_blocks}. If omitted, the extrinsic is immortal.
Returns:

The receipt of the submitted extrinsic if wait_for_inclusion is True. Otherwise, returns a string identifier of the extrinsic.

Raises:
  • ChainTransactionError: If the transaction fails.
def transfer( self, key: substrateinterface.keypair.Keypair, amount: int, dest: communex.types.Ss58Address) -> substrateinterface.base.ExtrinsicReceipt:
920    def transfer(
921        self,
922        key: Keypair,
923        amount: int,
924        dest: Ss58Address,
925    ) -> ExtrinsicReceipt:
926        """
927        Transfers a specified amount of tokens from the signer's account to the
928        specified account.
929
930        Args:
931            key: The keypair associated with the sender's account.
932            amount: The amount to transfer, in nanotokens.
933            dest: The SS58 address of the recipient.
934
935        Returns:
936            A receipt of the transaction.
937
938        Raises:
939            InsufficientBalanceError: If the sender's account does not have
940              enough balance.
941            ChainTransactionError: If the transaction fails.
942        """
943
944        params = {"dest": dest, "value": amount}
945
946        return self.compose_call(
947            module="Balances", fn="transfer_keep_alive", params=params, key=key
948        )

Transfers a specified amount of tokens from the signer's account to the specified account.

Arguments:
  • key: The keypair associated with the sender's account.
  • amount: The amount to transfer, in nanotokens.
  • dest: The SS58 address of the recipient.
Returns:

A receipt of the transaction.

Raises:
  • InsufficientBalanceError: If the sender's account does not have enough balance.
  • ChainTransactionError: If the transaction fails.
def transfer_multiple( self, key: substrateinterface.keypair.Keypair, destinations: list[communex.types.Ss58Address], amounts: list[int], netuid: str | int = 0) -> substrateinterface.base.ExtrinsicReceipt:
950    def transfer_multiple(
951        self,
952        key: Keypair,
953        destinations: list[Ss58Address],
954        amounts: list[int],
955        netuid: str | int = 0,
956    ) -> ExtrinsicReceipt:
957        """
958        Transfers specified amounts of tokens from the signer's account to
959        multiple target accounts.
960
961        The `destinations` and `amounts` lists must be of the same length.
962
963        Args:
964            key: The keypair associated with the sender's account.
965            destinations: A list of SS58 addresses of the recipients.
966            amounts: Amount to transfer to each recipient, in nanotokens.
967            netuid: The network identifier.
968
969        Returns:
970            A receipt of the transaction.
971
972        Raises:
973            InsufficientBalanceError: If the sender's account does not have
974              enough balance for all transfers.
975            ChainTransactionError: If the transaction fails.
976        """
977
978        assert len(destinations) == len(amounts)
979
980        # extract existential deposit from amounts
981        existential_deposit = self.get_existential_deposit()
982        amounts = [a - existential_deposit for a in amounts]
983
984        params = {
985            "netuid": netuid,
986            "destinations": destinations,
987            "amounts": amounts,
988        }
989
990        return self.compose_call(
991            module="SubspaceModule", fn="transfer_multiple", params=params, key=key
992        )

Transfers specified amounts of tokens from the signer's account to multiple target accounts.

The destinations and amounts lists must be of the same length.

Arguments:
  • key: The keypair associated with the sender's account.
  • destinations: A list of SS58 addresses of the recipients.
  • amounts: Amount to transfer to each recipient, in nanotokens.
  • netuid: The network identifier.
Returns:

A receipt of the transaction.

Raises:
  • InsufficientBalanceError: If the sender's account does not have enough balance for all transfers.
  • ChainTransactionError: If the transaction fails.
def stake( self, key: substrateinterface.keypair.Keypair, amount: int, dest: communex.types.Ss58Address) -> substrateinterface.base.ExtrinsicReceipt:
 994    def stake(
 995        self,
 996        key: Keypair,
 997        amount: int,
 998        dest: Ss58Address,
 999    ) -> ExtrinsicReceipt:
1000        """
1001        Stakes the specified amount of tokens to a module key address.
1002
1003        Args:
1004            key: The keypair associated with the staker's account.
1005            amount: The amount of tokens to stake, in nanotokens.
1006            dest: The SS58 address of the module key to stake to.
1007            netuid: The network identifier.
1008
1009        Returns:
1010            A receipt of the staking transaction.
1011
1012        Raises:
1013            InsufficientBalanceError: If the staker's account does not have
1014              enough balance.
1015            ChainTransactionError: If the transaction fails.
1016        """
1017
1018        params = {"amount": amount, "module_key": dest}
1019
1020        return self.compose_call(fn="add_stake", params=params, key=key)

Stakes the specified amount of tokens to a module key address.

Arguments:
  • key: The keypair associated with the staker's account.
  • amount: The amount of tokens to stake, in nanotokens.
  • dest: The SS58 address of the module key to stake to.
  • netuid: The network identifier.
Returns:

A receipt of the staking transaction.

Raises:
  • InsufficientBalanceError: If the staker's account does not have enough balance.
  • ChainTransactionError: If the transaction fails.
def unstake( self, key: substrateinterface.keypair.Keypair, amount: int, dest: communex.types.Ss58Address) -> substrateinterface.base.ExtrinsicReceipt:
1022    def unstake(
1023        self,
1024        key: Keypair,
1025        amount: int,
1026        dest: Ss58Address,
1027    ) -> ExtrinsicReceipt:
1028        """
1029        Unstakes the specified amount of tokens from a module key address.
1030
1031        Args:
1032            key: The keypair associated with the unstaker's account.
1033            amount: The amount of tokens to unstake, in nanotokens.
1034            dest: The SS58 address of the module key to unstake from.
1035            netuid: The network identifier.
1036
1037        Returns:
1038            A receipt of the unstaking transaction.
1039
1040        Raises:
1041            InsufficientStakeError: If the staked key does not have enough
1042              staked tokens by the signer key.
1043            ChainTransactionError: If the transaction fails.
1044        """
1045
1046        params = {"amount": amount, "module_key": dest}
1047        return self.compose_call(fn="remove_stake", params=params, key=key)

Unstakes the specified amount of tokens from a module key address.

Arguments:
  • key: The keypair associated with the unstaker's account.
  • amount: The amount of tokens to unstake, in nanotokens.
  • dest: The SS58 address of the module key to unstake from.
  • netuid: The network identifier.
Returns:

A receipt of the unstaking transaction.

Raises:
  • InsufficientStakeError: If the staked key does not have enough staked tokens by the signer key.
  • ChainTransactionError: If the transaction fails.
def update_module( self, key: substrateinterface.keypair.Keypair, name: str, address: str, metadata: str | None = None, delegation_fee: int = 20, netuid: int = 0) -> substrateinterface.base.ExtrinsicReceipt:
1049    def update_module(
1050        self,
1051        key: Keypair,
1052        name: str,
1053        address: str,
1054        metadata: str | None = None,
1055        delegation_fee: int = 20,
1056        netuid: int = 0,
1057    ) -> ExtrinsicReceipt:
1058        """
1059        Updates the parameters of a registered module.
1060
1061        The delegation fee must be an integer between 0 and 100.
1062
1063        Args:
1064            key: The keypair associated with the module's account.
1065            name: The new name for the module. If None, the name is not updated.
1066            address: The new address for the module.
1067                If None, the address is not updated.
1068            delegation_fee: The new delegation fee for the module,
1069                between 0 and 100.
1070            netuid: The network identifier.
1071
1072        Returns:
1073            A receipt of the module update transaction.
1074
1075        Raises:
1076            InvalidParameterError: If the provided parameters are invalid.
1077            ChainTransactionError: If the transaction fails.
1078        """
1079
1080        assert isinstance(delegation_fee, int)
1081
1082        params = {
1083            "netuid": netuid,
1084            "name": name,
1085            "address": address,
1086            "delegation_fee": delegation_fee,
1087            "metadata": metadata,
1088        }
1089
1090        response = self.compose_call("update_module", params=params, key=key)
1091
1092        return response

Updates the parameters of a registered module.

The delegation fee must be an integer between 0 and 100.

Arguments:
  • key: The keypair associated with the module's account.
  • name: The new name for the module. If None, the name is not updated.
  • address: The new address for the module. If None, the address is not updated.
  • delegation_fee: The new delegation fee for the module, between 0 and 100.
  • netuid: The network identifier.
Returns:

A receipt of the module update transaction.

Raises:
  • InvalidParameterError: If the provided parameters are invalid.
  • ChainTransactionError: If the transaction fails.
def register_module( self, key: substrateinterface.keypair.Keypair, name: str, address: str | None = None, subnet: str = 'Rootnet', metadata: str | None = None) -> substrateinterface.base.ExtrinsicReceipt:
1094    def register_module(
1095        self,
1096        key: Keypair,
1097        name: str,
1098        address: str | None = None,
1099        subnet: str = "Rootnet",
1100        metadata: str | None = None,
1101    ) -> ExtrinsicReceipt:
1102        """
1103        Registers a new module in the network.
1104
1105        Args:
1106            key: The keypair used for registering the module.
1107            name: The name of the module. If None, a default or previously
1108                set name is used. # How does this work?
1109            address: The address of the module. If None, a default or
1110                previously set address is used. # How does this work?
1111            subnet: The network subnet to register the module in.
1112            min_stake: The minimum stake required for the module, in nanotokens.
1113                If None, a default value is used.
1114
1115        Returns:
1116            A receipt of the registration transaction.
1117
1118        Raises:
1119            InvalidParameterError: If the provided parameters are invalid.
1120            ChainTransactionError: If the transaction fails.
1121        """
1122
1123        key_addr = key.ss58_address
1124
1125        params = {
1126            "network": subnet,
1127            "address": address,
1128            "name": name,
1129            "module_key": key_addr,
1130            "metadata": metadata,
1131        }
1132
1133        response = self.compose_call("register", params=params, key=key)
1134        return response

Registers a new module in the network.

Arguments:
  • key: The keypair used for registering the module.
  • name: The name of the module. If None, a default or previously set name is used. # How does this work?
  • address: The address of the module. If None, a default or previously set address is used. # How does this work?
  • subnet: The network subnet to register the module in.
  • min_stake: The minimum stake required for the module, in nanotokens. If None, a default value is used.
Returns:

A receipt of the registration transaction.

Raises:
  • InvalidParameterError: If the provided parameters are invalid.
  • ChainTransactionError: If the transaction fails.
def vote( self, key: substrateinterface.keypair.Keypair, uids: list[int], weights: list[int], netuid: int = 0) -> substrateinterface.base.ExtrinsicReceipt:
1136    def vote(
1137        self,
1138        key: Keypair,
1139        uids: list[int],
1140        weights: list[int],
1141        netuid: int = 0,
1142    ) -> ExtrinsicReceipt:
1143        """
1144        Casts votes on a list of module UIDs with corresponding weights.
1145
1146        The length of the UIDs list and the weights list should be the same.
1147        Each weight corresponds to the UID at the same index.
1148
1149        Args:
1150            key: The keypair used for signing the vote transaction.
1151            uids: A list of module UIDs to vote on.
1152            weights: A list of weights corresponding to each UID.
1153            netuid: The network identifier.
1154
1155        Returns:
1156            A receipt of the voting transaction.
1157
1158        Raises:
1159            InvalidParameterError: If the lengths of UIDs and weights lists
1160                do not match.
1161            ChainTransactionError: If the transaction fails.
1162        """
1163
1164        assert len(uids) == len(weights)
1165
1166        params = {
1167            "uids": uids,
1168            "weights": weights,
1169            "netuid": netuid,
1170        }
1171
1172        response = self.compose_call("set_weights", params=params, key=key)
1173
1174        return response

Casts votes on a list of module UIDs with corresponding weights.

The length of the UIDs list and the weights list should be the same. Each weight corresponds to the UID at the same index.

Arguments:
  • key: The keypair used for signing the vote transaction.
  • uids: A list of module UIDs to vote on.
  • weights: A list of weights corresponding to each UID.
  • netuid: The network identifier.
Returns:

A receipt of the voting transaction.

Raises:
  • InvalidParameterError: If the lengths of UIDs and weights lists do not match.
  • ChainTransactionError: If the transaction fails.
def update_subnet( self, key: substrateinterface.keypair.Keypair, params: communex.types.SubnetParams, netuid: int = 0) -> substrateinterface.base.ExtrinsicReceipt:
1176    def update_subnet(
1177        self,
1178        key: Keypair,
1179        params: SubnetParams,
1180        netuid: int = 0,
1181    ) -> ExtrinsicReceipt:
1182        """
1183        Update a subnet's configuration.
1184
1185        It requires the founder key for authorization.
1186
1187        Args:
1188            key: The founder keypair of the subnet.
1189            params: The new parameters for the subnet.
1190            netuid: The network identifier.
1191
1192        Returns:
1193            A receipt of the subnet update transaction.
1194
1195        Raises:
1196            AuthorizationError: If the key is not authorized.
1197            ChainTransactionError: If the transaction fails.
1198        """
1199
1200        general_params = dict(params)
1201        general_params["netuid"] = netuid
1202
1203        response = self.compose_call(
1204            fn="update_subnet",
1205            params=general_params,
1206            key=key,
1207        )
1208
1209        return response

Update a subnet's configuration.

It requires the founder key for authorization.

Arguments:
  • key: The founder keypair of the subnet.
  • params: The new parameters for the subnet.
  • netuid: The network identifier.
Returns:

A receipt of the subnet update transaction.

Raises:
  • AuthorizationError: If the key is not authorized.
  • ChainTransactionError: If the transaction fails.
def transfer_stake( self, key: substrateinterface.keypair.Keypair, amount: int, from_module_key: communex.types.Ss58Address, dest_module_address: communex.types.Ss58Address) -> substrateinterface.base.ExtrinsicReceipt:
1211    def transfer_stake(
1212        self,
1213        key: Keypair,
1214        amount: int,
1215        from_module_key: Ss58Address,
1216        dest_module_address: Ss58Address,
1217    ) -> ExtrinsicReceipt:
1218        """
1219        Realocate staked tokens from one staked module to another module.
1220
1221        Args:
1222            key: The keypair associated with the account that is delegating the tokens.
1223            amount: The amount of staked tokens to transfer, in nanotokens.
1224            from_module_key: The SS58 address of the module you want to transfer from (currently delegated by the key).
1225            dest_module_address: The SS58 address of the destination (newly delegated key).
1226            netuid: The network identifier.
1227
1228        Returns:
1229            A receipt of the stake transfer transaction.
1230
1231        Raises:
1232            InsufficientStakeError: If the source module key does not have
1233            enough staked tokens. ChainTransactionError: If the transaction
1234            fails.
1235        """
1236
1237        amount = amount - self.get_existential_deposit()
1238
1239        params = {
1240            "amount": amount,
1241            "module_key": from_module_key,
1242            "new_module_key": dest_module_address,
1243        }
1244
1245        response = self.compose_call("transfer_stake", key=key, params=params)
1246
1247        return response

Realocate staked tokens from one staked module to another module.

Arguments:
  • key: The keypair associated with the account that is delegating the tokens.
  • amount: The amount of staked tokens to transfer, in nanotokens.
  • from_module_key: The SS58 address of the module you want to transfer from (currently delegated by the key).
  • dest_module_address: The SS58 address of the destination (newly delegated key).
  • netuid: The network identifier.
Returns:

A receipt of the stake transfer transaction.

Raises:
  • InsufficientStakeError: If the source module key does not have
  • enough staked tokens. ChainTransactionError: If the transaction
  • fails.
def multiunstake( self, key: substrateinterface.keypair.Keypair, keys: list[communex.types.Ss58Address], amounts: list[int]) -> substrateinterface.base.ExtrinsicReceipt:
1249    def multiunstake(
1250        self,
1251        key: Keypair,
1252        keys: list[Ss58Address],
1253        amounts: list[int],
1254    ) -> ExtrinsicReceipt:
1255        """
1256        Unstakes tokens from multiple module keys.
1257
1258        And the lists `keys` and `amounts` must be of the same length. Each
1259        amount corresponds to the module key at the same index.
1260
1261        Args:
1262            key: The keypair associated with the unstaker's account.
1263            keys: A list of SS58 addresses of the module keys to unstake from.
1264            amounts: A list of amounts to unstake from each module key,
1265              in nanotokens.
1266            netuid: The network identifier.
1267
1268        Returns:
1269            A receipt of the multi-unstaking transaction.
1270
1271        Raises:
1272            MismatchedLengthError: If the lengths of keys and amounts lists do
1273            not match. InsufficientStakeError: If any of the module keys do not
1274            have enough staked tokens. ChainTransactionError: If the transaction
1275            fails.
1276        """
1277
1278        assert len(keys) == len(amounts)
1279
1280        params = {"module_keys": keys, "amounts": amounts}
1281
1282        response = self.compose_call("remove_stake_multiple", params=params, key=key)
1283
1284        return response

Unstakes tokens from multiple module keys.

And the lists keys and amounts must be of the same length. Each amount corresponds to the module key at the same index.

Arguments:
  • key: The keypair associated with the unstaker's account.
  • keys: A list of SS58 addresses of the module keys to unstake from.
  • amounts: A list of amounts to unstake from each module key, in nanotokens.
  • netuid: The network identifier.
Returns:

A receipt of the multi-unstaking transaction.

Raises:
  • MismatchedLengthError: If the lengths of keys and amounts lists do
  • not match. InsufficientStakeError: If any of the module keys do not
  • have enough staked tokens. ChainTransactionError: If the transaction
  • fails.
def multistake( self, key: substrateinterface.keypair.Keypair, keys: list[communex.types.Ss58Address], amounts: list[int]) -> substrateinterface.base.ExtrinsicReceipt:
1286    def multistake(
1287        self,
1288        key: Keypair,
1289        keys: list[Ss58Address],
1290        amounts: list[int],
1291    ) -> ExtrinsicReceipt:
1292        """
1293        Stakes tokens to multiple module keys.
1294
1295        The lengths of the `keys` and `amounts` lists must be the same. Each
1296        amount corresponds to the module key at the same index.
1297
1298        Args:
1299            key: The keypair associated with the staker's account.
1300            keys: A list of SS58 addresses of the module keys to stake to.
1301            amounts: A list of amounts to stake to each module key,
1302                in nanotokens.
1303            netuid: The network identifier.
1304
1305        Returns:
1306            A receipt of the multi-staking transaction.
1307
1308        Raises:
1309            MismatchedLengthError: If the lengths of keys and amounts lists
1310                do not match.
1311            ChainTransactionError: If the transaction fails.
1312        """
1313
1314        assert len(keys) == len(amounts)
1315
1316        params = {
1317            "module_keys": keys,
1318            "amounts": amounts,
1319        }
1320
1321        response = self.compose_call("add_stake_multiple", params=params, key=key)
1322
1323        return response

Stakes tokens to multiple module keys.

The lengths of the keys and amounts lists must be the same. Each amount corresponds to the module key at the same index.

Arguments:
  • key: The keypair associated with the staker's account.
  • keys: A list of SS58 addresses of the module keys to stake to.
  • amounts: A list of amounts to stake to each module key, in nanotokens.
  • netuid: The network identifier.
Returns:

A receipt of the multi-staking transaction.

Raises:
  • MismatchedLengthError: If the lengths of keys and amounts lists do not match.
  • ChainTransactionError: If the transaction fails.
def add_profit_shares( self, key: substrateinterface.keypair.Keypair, keys: list[communex.types.Ss58Address], shares: list[int]) -> substrateinterface.base.ExtrinsicReceipt:
1325    def add_profit_shares(
1326        self,
1327        key: Keypair,
1328        keys: list[Ss58Address],
1329        shares: list[int],
1330    ) -> ExtrinsicReceipt:
1331        """
1332        Allocates profit shares to multiple keys.
1333
1334        The lists `keys` and `shares` must be of the same length,
1335        with each share amount corresponding to the key at the same index.
1336
1337        Args:
1338            key: The keypair associated with the account
1339                distributing the shares.
1340            keys: A list of SS58 addresses to allocate shares to.
1341            shares: A list of share amounts to allocate to each key,
1342                in nanotokens.
1343
1344        Returns:
1345            A receipt of the profit sharing transaction.
1346
1347        Raises:
1348            MismatchedLengthError: If the lengths of keys and shares
1349                lists do not match.
1350            ChainTransactionError: If the transaction fails.
1351        """
1352
1353        assert len(keys) == len(shares)
1354
1355        params = {"keys": keys, "shares": shares}
1356
1357        response = self.compose_call("add_profit_shares", params=params, key=key)
1358
1359        return response

Allocates profit shares to multiple keys.

The lists keys and shares must be of the same length, with each share amount corresponding to the key at the same index.

Arguments:
  • key: The keypair associated with the account distributing the shares.
  • keys: A list of SS58 addresses to allocate shares to.
  • shares: A list of share amounts to allocate to each key, in nanotokens.
Returns:

A receipt of the profit sharing transaction.

Raises:
  • MismatchedLengthError: If the lengths of keys and shares lists do not match.
  • ChainTransactionError: If the transaction fails.
def add_subnet_proposal( self, key: substrateinterface.keypair.Keypair, params: dict[str, typing.Any], ipfs: str, netuid: int = 0) -> substrateinterface.base.ExtrinsicReceipt:
1361    def add_subnet_proposal(
1362        self, key: Keypair,
1363        params: dict[str, Any],
1364        ipfs: str,
1365        netuid: int = 0
1366    ) -> ExtrinsicReceipt:
1367        """
1368        Submits a proposal for creating or modifying a subnet within the
1369        network.
1370
1371        The proposal includes various parameters like the name, founder, share
1372        allocations, and other subnet-specific settings.
1373
1374        Args:
1375            key: The keypair used for signing the proposal transaction.
1376            params: The parameters for the subnet proposal.
1377            netuid: The network identifier.
1378
1379        Returns:
1380            A receipt of the subnet proposal transaction.
1381
1382        Raises:
1383            InvalidParameterError: If the provided subnet
1384                parameters are invalid.
1385            ChainTransactionError: If the transaction fails.
1386        """
1387
1388        general_params = dict(params)
1389        general_params["subnet_id"] = netuid
1390        general_params["data"] = ipfs
1391        # breakpoint()
1392        # general_params["burn_config"] = json.dumps(general_params["burn_config"])
1393        response = self.compose_call(
1394            fn="add_subnet_params_proposal",
1395            params=general_params,
1396            key=key,
1397            module="GovernanceModule",
1398        )
1399
1400        return response

Submits a proposal for creating or modifying a subnet within the network.

The proposal includes various parameters like the name, founder, share allocations, and other subnet-specific settings.

Arguments:
  • key: The keypair used for signing the proposal transaction.
  • params: The parameters for the subnet proposal.
  • netuid: The network identifier.
Returns:

A receipt of the subnet proposal transaction.

Raises:
  • InvalidParameterError: If the provided subnet parameters are invalid.
  • ChainTransactionError: If the transaction fails.
def add_custom_proposal( self, key: substrateinterface.keypair.Keypair, cid: str) -> substrateinterface.base.ExtrinsicReceipt:
1402    def add_custom_proposal(
1403        self,
1404        key: Keypair,
1405        cid: str,
1406    ) -> ExtrinsicReceipt:
1407
1408        params = {"data": cid}
1409
1410        response = self.compose_call(
1411            fn="add_global_custom_proposal",
1412            params=params,
1413            key=key,
1414            module="GovernanceModule",
1415        )
1416        return response
def add_custom_subnet_proposal( self, key: substrateinterface.keypair.Keypair, cid: str, netuid: int = 0) -> substrateinterface.base.ExtrinsicReceipt:
1418    def add_custom_subnet_proposal(
1419        self,
1420        key: Keypair,
1421        cid: str,
1422        netuid: int = 0,
1423    ) -> ExtrinsicReceipt:
1424        """
1425        Submits a proposal for creating or modifying a custom subnet within the
1426        network.
1427
1428        The proposal includes various parameters like the name, founder, share
1429        allocations, and other subnet-specific settings.
1430
1431        Args:
1432            key: The keypair used for signing the proposal transaction.
1433            params: The parameters for the subnet proposal.
1434            netuid: The network identifier.
1435
1436        Returns:
1437            A receipt of the subnet proposal transaction.
1438        """
1439
1440        params = {
1441            "data": cid,
1442            "subnet_id": netuid,
1443        }
1444
1445        response = self.compose_call(
1446            fn="add_subnet_custom_proposal",
1447            params=params,
1448            key=key,
1449            module="GovernanceModule",
1450        )
1451
1452        return response

Submits a proposal for creating or modifying a custom subnet within the network.

The proposal includes various parameters like the name, founder, share allocations, and other subnet-specific settings.

Arguments:
  • key: The keypair used for signing the proposal transaction.
  • params: The parameters for the subnet proposal.
  • netuid: The network identifier.
Returns:

A receipt of the subnet proposal transaction.

def add_global_proposal( self, key: substrateinterface.keypair.Keypair, params: communex.types.NetworkParams, cid: str | None) -> substrateinterface.base.ExtrinsicReceipt:
1454    def add_global_proposal(
1455        self,
1456        key: Keypair,
1457        params: NetworkParams,
1458        cid: str | None,
1459    ) -> ExtrinsicReceipt:
1460        """
1461        Submits a proposal for altering the global network parameters.
1462
1463        Allows for the submission of a proposal to
1464        change various global parameters
1465        of the network, such as emission rates, rate limits, and voting
1466        thresholds. It is used to
1467        suggest changes that affect the entire network's operation.
1468
1469        Args:
1470            key: The keypair used for signing the proposal transaction.
1471            params: A dictionary containing global network parameters
1472                    like maximum allowed subnets, modules,
1473                    transaction rate limits, and others.
1474
1475        Returns:
1476            A receipt of the global proposal transaction.
1477
1478        Raises:
1479            InvalidParameterError: If the provided network
1480                parameters are invalid.
1481            ChainTransactionError: If the transaction fails.
1482        """
1483        general_params = cast(dict[str, Any], params)
1484        cid = cid or ""
1485        general_params["data"] = cid
1486
1487        response = self.compose_call(
1488            fn="add_global_params_proposal",
1489            params=general_params,
1490            key=key,
1491            module="GovernanceModule",
1492        )
1493
1494        return response

Submits a proposal for altering the global network parameters.

Allows for the submission of a proposal to change various global parameters of the network, such as emission rates, rate limits, and voting thresholds. It is used to suggest changes that affect the entire network's operation.

Arguments:
  • key: The keypair used for signing the proposal transaction.
  • params: A dictionary containing global network parameters like maximum allowed subnets, modules, transaction rate limits, and others.
Returns:

A receipt of the global proposal transaction.

Raises:
  • InvalidParameterError: If the provided network parameters are invalid.
  • ChainTransactionError: If the transaction fails.
def vote_on_proposal( self, key: substrateinterface.keypair.Keypair, proposal_id: int, agree: bool) -> substrateinterface.base.ExtrinsicReceipt:
1496    def vote_on_proposal(
1497        self,
1498        key: Keypair,
1499        proposal_id: int,
1500        agree: bool,
1501    ) -> ExtrinsicReceipt:
1502        """
1503        Casts a vote on a specified proposal within the network.
1504
1505        Args:
1506            key: The keypair used for signing the vote transaction.
1507            proposal_id: The unique identifier of the proposal to vote on.
1508
1509        Returns:
1510            A receipt of the voting transaction in nanotokens.
1511
1512        Raises:
1513            InvalidProposalIDError: If the provided proposal ID does not
1514                exist or is invalid.
1515            ChainTransactionError: If the transaction fails.
1516        """
1517
1518        params = {"proposal_id": proposal_id, "agree": agree}
1519
1520        response = self.compose_call(
1521            "vote_proposal",
1522            key=key,
1523            params=params,
1524            module="GovernanceModule",
1525        )
1526
1527        return response

Casts a vote on a specified proposal within the network.

Arguments:
  • key: The keypair used for signing the vote transaction.
  • proposal_id: The unique identifier of the proposal to vote on.
Returns:

A receipt of the voting transaction in nanotokens.

Raises:
  • InvalidProposalIDError: If the provided proposal ID does not exist or is invalid.
  • ChainTransactionError: If the transaction fails.
def unvote_on_proposal( self, key: substrateinterface.keypair.Keypair, proposal_id: int) -> substrateinterface.base.ExtrinsicReceipt:
1529    def unvote_on_proposal(
1530        self,
1531        key: Keypair,
1532        proposal_id: int,
1533    ) -> ExtrinsicReceipt:
1534        """
1535        Retracts a previously cast vote on a specified proposal.
1536
1537        Args:
1538            key: The keypair used for signing the unvote transaction.
1539            proposal_id: The unique identifier of the proposal to withdraw the
1540                vote from.
1541
1542        Returns:
1543            A receipt of the unvoting transaction in nanotokens.
1544
1545        Raises:
1546            InvalidProposalIDError: If the provided proposal ID does not
1547                exist or is invalid.
1548            ChainTransactionError: If the transaction fails to be processed, or
1549                if there was no prior vote to retract.
1550        """
1551
1552        params = {"proposal_id": proposal_id}
1553
1554        response = self.compose_call(
1555            "remove_vote_proposal",
1556            key=key,
1557            params=params,
1558            module="GovernanceModule",
1559        )
1560
1561        return response

Retracts a previously cast vote on a specified proposal.

Arguments:
  • key: The keypair used for signing the unvote transaction.
  • proposal_id: The unique identifier of the proposal to withdraw the vote from.
Returns:

A receipt of the unvoting transaction in nanotokens.

Raises:
  • InvalidProposalIDError: If the provided proposal ID does not exist or is invalid.
  • ChainTransactionError: If the transaction fails to be processed, or if there was no prior vote to retract.
def enable_vote_power_delegation( self, key: substrateinterface.keypair.Keypair) -> substrateinterface.base.ExtrinsicReceipt:
1563    def enable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt:
1564        """
1565        Enables vote power delegation for the signer's account.
1566
1567        Args:
1568            key: The keypair used for signing the delegation transaction.
1569
1570        Returns:
1571            A receipt of the vote power delegation transaction.
1572
1573        Raises:
1574            ChainTransactionError: If the transaction fails.
1575        """
1576
1577        response = self.compose_call(
1578            "enable_vote_power_delegation",
1579            params={},
1580            key=key,
1581            module="GovernanceModule",
1582        )
1583
1584        return response

Enables vote power delegation for the signer's account.

Arguments:
  • key: The keypair used for signing the delegation transaction.
Returns:

A receipt of the vote power delegation transaction.

Raises:
  • ChainTransactionError: If the transaction fails.
def disable_vote_power_delegation( self, key: substrateinterface.keypair.Keypair) -> substrateinterface.base.ExtrinsicReceipt:
1586    def disable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt:
1587        """
1588        Disables vote power delegation for the signer's account.
1589
1590        Args:
1591            key: The keypair used for signing the delegation transaction.
1592
1593        Returns:
1594            A receipt of the vote power delegation transaction.
1595
1596        Raises:
1597            ChainTransactionError: If the transaction fails.
1598        """
1599
1600        response = self.compose_call(
1601            "disable_vote_power_delegation",
1602            params={},
1603            key=key,
1604            module="GovernanceModule",
1605        )
1606
1607        return response

Disables vote power delegation for the signer's account.

Arguments:
  • key: The keypair used for signing the delegation transaction.
Returns:

A receipt of the vote power delegation transaction.

Raises:
  • ChainTransactionError: If the transaction fails.
def add_dao_application( self, key: substrateinterface.keypair.Keypair, application_key: communex.types.Ss58Address, data: str) -> substrateinterface.base.ExtrinsicReceipt:
1609    def add_dao_application(
1610        self, key: Keypair, application_key: Ss58Address, data: str
1611    ) -> ExtrinsicReceipt:
1612        """
1613        Submits a new application to the general subnet DAO.
1614
1615        Args:
1616            key: The keypair used for signing the application transaction.
1617            application_key: The SS58 address of the application key.
1618            data: The data associated with the application.
1619
1620        Returns:
1621            A receipt of the application transaction.
1622
1623        Raises:
1624            ChainTransactionError: If the transaction fails.
1625        """
1626
1627        params = {"application_key": application_key, "data": data}
1628
1629        response = self.compose_call(
1630            "add_dao_application", module="GovernanceModule", key=key,
1631            params=params
1632        )
1633
1634        return response

Submits a new application to the general subnet DAO.

Arguments:
  • key: The keypair used for signing the application transaction.
  • application_key: The SS58 address of the application key.
  • data: The data associated with the application.
Returns:

A receipt of the application transaction.

Raises:
  • ChainTransactionError: If the transaction fails.
def query_map_curator_applications(self) -> dict[str, dict[str, str]]:
1636    def query_map_curator_applications(self) -> dict[str, dict[str, str]]:
1637        query_result = self.query_map(
1638            "CuratorApplications", module="GovernanceModule", params=[],
1639            extract_value=False
1640        )
1641        applications = query_result.get("CuratorApplications", {})
1642        return applications
def query_map_proposals(self, extract_value: bool = False) -> dict[int, dict[str, typing.Any]]:
1644    def query_map_proposals(
1645        self, extract_value: bool = False
1646    ) -> dict[int, dict[str, Any]]:
1647        """
1648        Retrieves a mappping of proposals from the network.
1649
1650        Queries the network and returns a mapping of proposal IDs to
1651        their respective parameters.
1652
1653        Returns:
1654            A dictionary mapping proposal IDs
1655            to dictionaries of their parameters.
1656
1657        Raises:
1658            QueryError: If the query to the network fails or is invalid.
1659        """
1660
1661        return self.query_map(
1662            "Proposals", extract_value=extract_value, module="GovernanceModule"
1663        )["Proposals"]

Retrieves a mappping of proposals from the network.

Queries the network and returns a mapping of proposal IDs to their respective parameters.

Returns:

A dictionary mapping proposal IDs to dictionaries of their parameters.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_weights( self, netuid: int = 0, extract_value: bool = False) -> dict[int, list[tuple[int, int]]] | None:
1665    def query_map_weights(
1666        self, netuid: int = 0, extract_value: bool = False
1667    ) -> dict[int, list[tuple[int, int]]] | None:
1668        """
1669        Retrieves a mapping of weights for keys on the network.
1670
1671        Queries the network and returns a mapping of key UIDs to
1672        their respective weights.
1673
1674        Args:
1675            netuid: The network UID from which to get the weights.
1676
1677        Returns:
1678            A dictionary mapping key UIDs to lists of their weights.
1679
1680        Raises:
1681            QueryError: If the query to the network fails or is invalid.
1682        """
1683
1684        weights_dict = self.query_map(
1685            "Weights",
1686            [netuid],
1687            extract_value=extract_value
1688        ).get("Weights")
1689        return weights_dict

Retrieves a mapping of weights for keys on the network.

Queries the network and returns a mapping of key UIDs to their respective weights.

Arguments:
  • netuid: The network UID from which to get the weights.
Returns:

A dictionary mapping key UIDs to lists of their weights.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_key( self, netuid: int = 0, extract_value: bool = False) -> dict[int, communex.types.Ss58Address]:
1691    def query_map_key(
1692        self,
1693        netuid: int = 0,
1694        extract_value: bool = False,
1695    ) -> dict[int, Ss58Address]:
1696        """
1697        Retrieves a map of keys from the network.
1698
1699        Fetches a mapping of key UIDs to their associated
1700        addresses on the network.
1701        The query can be targeted at a specific network UID if required.
1702
1703        Args:
1704            netuid: The network UID from which to get the keys.
1705
1706        Returns:
1707            A dictionary mapping key UIDs to their addresses.
1708
1709        Raises:
1710            QueryError: If the query to the network fails or is invalid.
1711        """
1712        return self.query_map("Keys", [netuid], extract_value=extract_value)["Keys"]

Retrieves a map of keys from the network.

Fetches a mapping of key UIDs to their associated addresses on the network. The query can be targeted at a specific network UID if required.

Arguments:
  • netuid: The network UID from which to get the keys.
Returns:

A dictionary mapping key UIDs to their addresses.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_address(self, netuid: int = 0, extract_value: bool = False) -> dict[int, str]:
1714    def query_map_address(
1715        self, netuid: int = 0, extract_value: bool = False
1716    ) -> dict[int, str]:
1717        """
1718        Retrieves a map of key addresses from the network.
1719
1720        Queries the network for a mapping of key UIDs to their addresses.
1721
1722        Args:
1723            netuid: The network UID from which to get the addresses.
1724
1725        Returns:
1726            A dictionary mapping key UIDs to their addresses.
1727
1728        Raises:
1729            QueryError: If the query to the network fails or is invalid.
1730        """
1731
1732        return self.query_map("Address", [netuid], extract_value=extract_value)[
1733            "Address"
1734        ]

Retrieves a map of key addresses from the network.

Queries the network for a mapping of key UIDs to their addresses.

Arguments:
  • netuid: The network UID from which to get the addresses.
Returns:

A dictionary mapping key UIDs to their addresses.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_emission(self, extract_value: bool = False) -> dict[int, list[int]]:
1736    def query_map_emission(self, extract_value: bool = False) -> dict[int, list[int]]:
1737        """
1738        Retrieves a map of emissions for keys on the network.
1739
1740        Queries the network to get a mapping of
1741        key UIDs to their emission values.
1742
1743        Returns:
1744            A dictionary mapping key UIDs to lists of their emission values.
1745
1746        Raises:
1747            QueryError: If the query to the network fails or is invalid.
1748        """
1749
1750        return self.query_map("Emission", extract_value=extract_value)["Emission"]

Retrieves a map of emissions for keys on the network.

Queries the network to get a mapping of key UIDs to their emission values.

Returns:

A dictionary mapping key UIDs to lists of their emission values.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_pending_emission(self, extract_value: bool = False) -> int:
1752    def query_map_pending_emission(self, extract_value: bool = False) -> int:
1753        """
1754        Retrieves a map of pending emissions for the subnets.
1755
1756        Queries the network for a mapping of subnet UIDs to their pending emission values.
1757
1758        Returns:
1759            A dictionary mapping subnet UIDs to their pending emission values.
1760
1761        Raises:
1762            QueryError: If the query to the network fails or is invalid.
1763        """
1764        return self.query_map("PendingEmission", extract_value=extract_value, module="SubnetEmissionModule")["PendingEmission"]

Retrieves a map of pending emissions for the subnets.

Queries the network for a mapping of subnet UIDs to their pending emission values.

Returns:

A dictionary mapping subnet UIDs to their pending emission values.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_subnet_emission(self, extract_value: bool = False) -> dict[int, int]:
1766    def query_map_subnet_emission(self, extract_value: bool = False) -> dict[int, int]:
1767        """
1768        Retrieves a map of subnet emissions for the network.
1769
1770        Queries the network for a mapping of subnet UIDs to their emission values.
1771
1772        Returns:
1773            A dictionary mapping subnet UIDs to their emission values.
1774
1775        Raises:
1776            QueryError: If the query to the network fails or is invalid.
1777        """
1778
1779        return self.query_map("SubnetEmission", extract_value=extract_value, module="SubnetEmissionModule")["SubnetEmission"]

Retrieves a map of subnet emissions for the network.

Queries the network for a mapping of subnet UIDs to their emission values.

Returns:

A dictionary mapping subnet UIDs to their emission values.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_subnet_consensus(self, extract_value: bool = False) -> dict[int, str]:
1781    def query_map_subnet_consensus(self, extract_value: bool = False) -> dict[int, str]:
1782        """
1783        Retrieves a map of subnet consensus types for the network.
1784
1785        Queries the network for a mapping of subnet UIDs to their consensus types.
1786
1787        Returns:
1788            A dictionary mapping subnet UIDs to their consensus types.
1789
1790        Raises:
1791            QueryError: If the query to the network fails or is invalid.
1792        """
1793
1794        return self.query_map("SubnetConsensusType", extract_value=extract_value, module="SubnetEmissionModule")["SubnetConsensusType"]

Retrieves a map of subnet consensus types for the network.

Queries the network for a mapping of subnet UIDs to their consensus types.

Returns:

A dictionary mapping subnet UIDs to their consensus types.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_incentive(self, extract_value: bool = False) -> dict[int, list[int]]:
1796    def query_map_incentive(self, extract_value: bool = False) -> dict[int, list[int]]:
1797        """
1798        Retrieves a mapping of incentives for keys on the network.
1799
1800        Queries the network and returns a mapping of key UIDs to
1801        their respective incentive values.
1802
1803        Returns:
1804            A dictionary mapping key UIDs to lists of their incentive values.
1805
1806        Raises:
1807            QueryError: If the query to the network fails or is invalid.
1808        """
1809
1810        return self.query_map("Incentive", extract_value=extract_value)["Incentive"]

Retrieves a mapping of incentives for keys on the network.

Queries the network and returns a mapping of key UIDs to their respective incentive values.

Returns:

A dictionary mapping key UIDs to lists of their incentive values.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_dividend(self, extract_value: bool = False) -> dict[int, list[int]]:
1812    def query_map_dividend(self, extract_value: bool = False) -> dict[int, list[int]]:
1813        """
1814        Retrieves a mapping of dividends for keys on the network.
1815
1816        Queries the network for a mapping of key UIDs to
1817        their dividend values.
1818
1819        Returns:
1820            A dictionary mapping key UIDs to lists of their dividend values.
1821
1822        Raises:
1823            QueryError: If the query to the network fails or is invalid.
1824        """
1825
1826        return self.query_map("Dividends", extract_value=extract_value)["Dividends"]

Retrieves a mapping of dividends for keys on the network.

Queries the network for a mapping of key UIDs to their dividend values.

Returns:

A dictionary mapping key UIDs to lists of their dividend values.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_regblock(self, netuid: int = 0, extract_value: bool = False) -> dict[int, int]:
1828    def query_map_regblock(
1829        self, netuid: int = 0, extract_value: bool = False
1830    ) -> dict[int, int]:
1831        """
1832        Retrieves a mapping of registration blocks for keys on the network.
1833
1834        Queries the network for a mapping of key UIDs to
1835        the blocks where they were registered.
1836
1837        Args:
1838            netuid: The network UID from which to get the registration blocks.
1839
1840        Returns:
1841            A dictionary mapping key UIDs to their registration blocks.
1842
1843        Raises:
1844            QueryError: If the query to the network fails or is invalid.
1845        """
1846
1847        return self.query_map(
1848            "RegistrationBlock", [netuid], extract_value=extract_value
1849        )["RegistrationBlock"]

Retrieves a mapping of registration blocks for keys on the network.

Queries the network for a mapping of key UIDs to the blocks where they were registered.

Arguments:
  • netuid: The network UID from which to get the registration blocks.
Returns:

A dictionary mapping key UIDs to their registration blocks.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_lastupdate(self, extract_value: bool = False) -> dict[int, list[int]]:
1851    def query_map_lastupdate(self, extract_value: bool = False) -> dict[int, list[int]]:
1852        """
1853        Retrieves a mapping of the last update times for keys on the network.
1854
1855        Queries the network for a mapping of key UIDs to their last update times.
1856
1857        Returns:
1858            A dictionary mapping key UIDs to lists of their last update times.
1859
1860        Raises:
1861            QueryError: If the query to the network fails or is invalid.
1862        """
1863
1864        return self.query_map("LastUpdate", extract_value=extract_value)["LastUpdate"]

Retrieves a mapping of the last update times for keys on the network.

Queries the network for a mapping of key UIDs to their last update times.

Returns:

A dictionary mapping key UIDs to lists of their last update times.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_stakefrom( self, extract_value: bool = False) -> dict[communex.types.Ss58Address, list[tuple[communex.types.Ss58Address, int]]]:
1866    def query_map_stakefrom(
1867        self, extract_value: bool = False
1868    ) -> dict[Ss58Address, list[tuple[Ss58Address, int]]]:
1869        """
1870        Retrieves a mapping of stakes from various sources for keys on the network.
1871
1872        Queries the network to obtain a mapping of key addresses to the sources
1873        and amounts of stakes they have received.
1874
1875        Args:
1876            netuid: The network UID from which to get the stakes.
1877
1878        Returns:
1879            A dictionary mapping key addresses to lists of tuples
1880            (module_key_address, amount).
1881
1882        Raises:
1883            QueryError: If the query to the network fails or is invalid.
1884        """
1885
1886        result = self.query_map("StakeFrom", [], extract_value=extract_value)[
1887            "StakeFrom"
1888        ]
1889
1890        return transform_stake_dmap(result)

Retrieves a mapping of stakes from various sources for keys on the network.

Queries the network to obtain a mapping of key addresses to the sources and amounts of stakes they have received.

Arguments:
  • netuid: The network UID from which to get the stakes.
Returns:

A dictionary mapping key addresses to lists of tuples (module_key_address, amount).

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_staketo(self, extract_value: bool = False) -> dict[str, list[tuple[str, int]]]:
1892    def query_map_staketo(
1893        self, extract_value: bool = False
1894    ) -> dict[str, list[tuple[str, int]]]:
1895        """
1896        Retrieves a mapping of stakes to destinations for keys on the network.
1897
1898        Queries the network for a mapping of key addresses to the destinations
1899        and amounts of stakes they have made.
1900
1901        Args:
1902            netuid: The network UID from which to get the stakes.
1903
1904        Returns:
1905            A dictionary mapping key addresses to lists of tuples
1906            (module_key_address, amount).
1907
1908        Raises:
1909            QueryError: If the query to the network fails or is invalid.
1910        """
1911
1912        result = self.query_map("StakeTo", [], extract_value=extract_value)[
1913            "StakeTo"
1914        ]
1915        return transform_stake_dmap(result)

Retrieves a mapping of stakes to destinations for keys on the network.

Queries the network for a mapping of key addresses to the destinations and amounts of stakes they have made.

Arguments:
  • netuid: The network UID from which to get the stakes.
Returns:

A dictionary mapping key addresses to lists of tuples (module_key_address, amount).

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_delegationfee(self, netuid: int = 0, extract_value: bool = False) -> dict[str, int]:
1917    def query_map_delegationfee(
1918        self, netuid: int = 0, extract_value: bool = False
1919    ) -> dict[str, int]:
1920        """
1921        Retrieves a mapping of delegation fees for keys on the network.
1922
1923        Queries the network to obtain a mapping of key addresses to their
1924        respective delegation fees.
1925
1926        Args:
1927            netuid: The network UID to filter the delegation fees.
1928
1929        Returns:
1930            A dictionary mapping key addresses to their delegation fees.
1931
1932        Raises:
1933            QueryError: If the query to the network fails or is invalid.
1934        """
1935
1936        return self.query_map("DelegationFee", [netuid], extract_value=extract_value)[
1937            "DelegationFee"
1938        ]

Retrieves a mapping of delegation fees for keys on the network.

Queries the network to obtain a mapping of key addresses to their respective delegation fees.

Arguments:
  • netuid: The network UID to filter the delegation fees.
Returns:

A dictionary mapping key addresses to their delegation fees.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_tempo(self, extract_value: bool = False) -> dict[int, int]:
1940    def query_map_tempo(self, extract_value: bool = False) -> dict[int, int]:
1941        """
1942        Retrieves a mapping of tempo settings for the network.
1943
1944        Queries the network to obtain the tempo (rate of reward distributions)
1945        settings for various network subnets.
1946
1947        Returns:
1948            A dictionary mapping network UIDs to their tempo settings.
1949
1950        Raises:
1951            QueryError: If the query to the network fails or is invalid.
1952        """
1953
1954        return self.query_map("Tempo", extract_value=extract_value)["Tempo"]

Retrieves a mapping of tempo settings for the network.

Queries the network to obtain the tempo (rate of reward distributions) settings for various network subnets.

Returns:

A dictionary mapping network UIDs to their tempo settings.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_immunity_period(self, extract_value: bool) -> dict[int, int]:
1956    def query_map_immunity_period(self, extract_value: bool) -> dict[int, int]:
1957        """
1958        Retrieves a mapping of immunity periods for the network.
1959
1960        Queries the network for the immunity period settings,
1961        which represent the time duration during which modules
1962        can not get deregistered.
1963
1964        Returns:
1965            A dictionary mapping network UIDs to their immunity period settings.
1966
1967        Raises:
1968            QueryError: If the query to the network fails or is invalid.
1969        """
1970
1971        return self.query_map("ImmunityPeriod", extract_value=extract_value)[
1972            "ImmunityPeriod"
1973        ]

Retrieves a mapping of immunity periods for the network.

Queries the network for the immunity period settings, which represent the time duration during which modules can not get deregistered.

Returns:

A dictionary mapping network UIDs to their immunity period settings.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_min_allowed_weights(self, extract_value: bool = False) -> dict[int, int]:
1975    def query_map_min_allowed_weights(
1976        self, extract_value: bool = False
1977    ) -> dict[int, int]:
1978        """
1979        Retrieves a mapping of minimum allowed weights for the network.
1980
1981        Queries the network to obtain the minimum allowed weights,
1982        which are the lowest permissible weight values that can be set by
1983        validators.
1984
1985        Returns:
1986            A dictionary mapping network UIDs to
1987            their minimum allowed weight values.
1988
1989        Raises:
1990            QueryError: If the query to the network fails or is invalid.
1991        """
1992
1993        return self.query_map("MinAllowedWeights", extract_value=extract_value)[
1994            "MinAllowedWeights"
1995        ]

Retrieves a mapping of minimum allowed weights for the network.

Queries the network to obtain the minimum allowed weights, which are the lowest permissible weight values that can be set by validators.

Returns:

A dictionary mapping network UIDs to their minimum allowed weight values.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_max_allowed_weights(self, extract_value: bool = False) -> dict[int, int]:
1997    def query_map_max_allowed_weights(
1998        self, extract_value: bool = False
1999    ) -> dict[int, int]:
2000        """
2001        Retrieves a mapping of maximum allowed weights for the network.
2002
2003        Queries the network for the maximum allowed weights,
2004        which are the highest permissible
2005        weight values that can be set by validators.
2006
2007        Returns:
2008            A dictionary mapping network UIDs to
2009            their maximum allowed weight values.
2010
2011        Raises:
2012            QueryError: If the query to the network fails or is invalid.
2013        """
2014
2015        return self.query_map("MaxAllowedWeights", extract_value=extract_value)[
2016            "MaxAllowedWeights"
2017        ]

Retrieves a mapping of maximum allowed weights for the network.

Queries the network for the maximum allowed weights, which are the highest permissible weight values that can be set by validators.

Returns:

A dictionary mapping network UIDs to their maximum allowed weight values.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_max_allowed_uids(self, extract_value: bool = False) -> dict[int, int]:
2019    def query_map_max_allowed_uids(self, extract_value: bool = False) -> dict[int, int]:
2020        """
2021        Queries the network for the maximum number of allowed user IDs (UIDs)
2022        for each network subnet.
2023
2024        Fetches a mapping of network subnets to their respective
2025        limits on the number of user IDs that can be created or used.
2026
2027        Returns:
2028            A dictionary mapping network UIDs (unique identifiers) to their
2029            maximum allowed number of UIDs.
2030            Each entry represents a network subnet
2031            with its corresponding UID limit.
2032
2033        Raises:
2034            QueryError: If the query to the network fails or is invalid.
2035        """
2036
2037        return self.query_map("MaxAllowedUids", extract_value=extract_value)[
2038            "MaxAllowedUids"
2039        ]

Queries the network for the maximum number of allowed user IDs (UIDs) for each network subnet.

Fetches a mapping of network subnets to their respective limits on the number of user IDs that can be created or used.

Returns:

A dictionary mapping network UIDs (unique identifiers) to their maximum allowed number of UIDs. Each entry represents a network subnet with its corresponding UID limit.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_min_stake(self, extract_value: bool = False) -> dict[int, int]:
2041    def query_map_min_stake(self, extract_value: bool = False) -> dict[int, int]:
2042        """
2043        Retrieves a mapping of minimum allowed stake on the network.
2044
2045        Queries the network to obtain the minimum number of stake,
2046        which is represented in nanotokens.
2047
2048        Returns:
2049            A dictionary mapping network UIDs to
2050            their minimum allowed stake values.
2051
2052        Raises:
2053            QueryError: If the query to the network fails or is invalid.
2054        """
2055
2056        return self.query_map("MinStake", extract_value=extract_value)["MinStake"]

Retrieves a mapping of minimum allowed stake on the network.

Queries the network to obtain the minimum number of stake, which is represented in nanotokens.

Returns:

A dictionary mapping network UIDs to their minimum allowed stake values.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_max_stake(self, extract_value: bool = False) -> dict[int, int]:
2058    def query_map_max_stake(self, extract_value: bool = False) -> dict[int, int]:
2059        """
2060        Retrieves a mapping of the maximum stake values for the network.
2061
2062        Queries the network for the maximum stake values across various s
2063        ubnets of the network.
2064
2065        Returns:
2066            A dictionary mapping network UIDs to their maximum stake values.
2067
2068        Raises:
2069            QueryError: If the query to the network fails or is invalid.
2070        """
2071
2072        return self.query_map("MaxStake", extract_value=extract_value)["MaxStake"]

Retrieves a mapping of the maximum stake values for the network.

Queries the network for the maximum stake values across various s ubnets of the network.

Returns:

A dictionary mapping network UIDs to their maximum stake values.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_founder(self, extract_value: bool = False) -> dict[int, str]:
2074    def query_map_founder(self, extract_value: bool = False) -> dict[int, str]:
2075        """
2076        Retrieves a mapping of founders for the network.
2077
2078        Queries the network to obtain the founders associated with
2079        various subnets.
2080
2081        Returns:
2082            A dictionary mapping network UIDs to their respective founders.
2083
2084        Raises:
2085            QueryError: If the query to the network fails or is invalid.
2086        """
2087
2088        return self.query_map("Founder", extract_value=extract_value)["Founder"]

Retrieves a mapping of founders for the network.

Queries the network to obtain the founders associated with various subnets.

Returns:

A dictionary mapping network UIDs to their respective founders.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_founder_share(self, extract_value: bool = False) -> dict[int, int]:
2090    def query_map_founder_share(self, extract_value: bool = False) -> dict[int, int]:
2091        """
2092        Retrieves a mapping of founder shares for the network.
2093
2094        Queries the network for the share percentages
2095        allocated to founders across different subnets.
2096
2097        Returns:
2098            A dictionary mapping network UIDs to their founder share percentages.
2099
2100        Raises:
2101            QueryError: If the query to the network fails or is invalid.
2102        """
2103
2104        return self.query_map("FounderShare", extract_value=extract_value)[
2105            "FounderShare"
2106        ]

Retrieves a mapping of founder shares for the network.

Queries the network for the share percentages allocated to founders across different subnets.

Returns:

A dictionary mapping network UIDs to their founder share percentages.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_incentive_ratio(self, extract_value: bool = False) -> dict[int, int]:
2108    def query_map_incentive_ratio(self, extract_value: bool = False) -> dict[int, int]:
2109        """
2110        Retrieves a mapping of incentive ratios for the network.
2111
2112        Queries the network for the incentive ratios,
2113        which are the proportions of rewards or incentives
2114        allocated in different subnets of the network.
2115
2116        Returns:
2117            A dictionary mapping network UIDs to their incentive ratios.
2118
2119        Raises:
2120            QueryError: If the query to the network fails or is invalid.
2121        """
2122
2123        return self.query_map("IncentiveRatio", extract_value=extract_value)[
2124            "IncentiveRatio"
2125        ]

Retrieves a mapping of incentive ratios for the network.

Queries the network for the incentive ratios, which are the proportions of rewards or incentives allocated in different subnets of the network.

Returns:

A dictionary mapping network UIDs to their incentive ratios.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_trust_ratio(self, extract_value: bool = False) -> dict[int, int]:
2127    def query_map_trust_ratio(self, extract_value: bool = False) -> dict[int, int]:
2128        """
2129        Retrieves a mapping of trust ratios for the network.
2130
2131        Queries the network for trust ratios,
2132        indicative of the level of trust or credibility assigned
2133        to different subnets of the network.
2134
2135        Returns:
2136            A dictionary mapping network UIDs to their trust ratios.
2137
2138        Raises:
2139            QueryError: If the query to the network fails or is invalid.
2140        """
2141
2142        return self.query_map("TrustRatio", extract_value=extract_value)["TrustRatio"]

Retrieves a mapping of trust ratios for the network.

Queries the network for trust ratios, indicative of the level of trust or credibility assigned to different subnets of the network.

Returns:

A dictionary mapping network UIDs to their trust ratios.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_vote_mode_subnet(self, extract_value: bool = False) -> dict[int, str]:
2144    def query_map_vote_mode_subnet(self, extract_value: bool = False) -> dict[int, str]:
2145        """
2146        Retrieves a mapping of vote modes for subnets within the network.
2147
2148        Queries the network for the voting modes used in different
2149        subnets, which define the methodology or approach of voting within those
2150        subnets.
2151
2152        Returns:
2153            A dictionary mapping network UIDs to their vote
2154            modes for subnets.
2155
2156        Raises:
2157            QueryError: If the query to the network fails or is invalid.
2158        """
2159
2160        return self.query_map("VoteModeSubnet", extract_value=extract_value)[
2161            "VoteModeSubnet"
2162        ]

Retrieves a mapping of vote modes for subnets within the network.

Queries the network for the voting modes used in different subnets, which define the methodology or approach of voting within those subnets.

Returns:

A dictionary mapping network UIDs to their vote modes for subnets.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_legit_whitelist( self, extract_value: bool = False) -> dict[communex.types.Ss58Address, int]:
2164    def query_map_legit_whitelist(
2165        self, extract_value: bool = False
2166    ) -> dict[Ss58Address, int]:
2167        """
2168        Retrieves a mapping of whitelisted addresses for the network.
2169
2170        Queries the network for a mapping of whitelisted addresses
2171        and their respective legitimacy status.
2172
2173        Returns:
2174            A dictionary mapping addresses to their legitimacy status.
2175
2176        Raises:
2177            QueryError: If the query to the network fails or is invalid.
2178        """
2179
2180        return self.query_map(
2181            "LegitWhitelist", module="GovernanceModule", extract_value=extract_value)[
2182            "LegitWhitelist"
2183        ]

Retrieves a mapping of whitelisted addresses for the network.

Queries the network for a mapping of whitelisted addresses and their respective legitimacy status.

Returns:

A dictionary mapping addresses to their legitimacy status.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_subnet_names(self, extract_value: bool = False) -> dict[int, str]:
2185    def query_map_subnet_names(self, extract_value: bool = False) -> dict[int, str]:
2186        """
2187        Retrieves a mapping of subnet names within the network.
2188
2189        Queries the network for the names of various subnets,
2190        providing an overview of the different
2191        subnets within the network.
2192
2193        Returns:
2194            A dictionary mapping network UIDs to their subnet names.
2195
2196        Raises:
2197            QueryError: If the query to the network fails or is invalid.
2198        """
2199
2200        return self.query_map("SubnetNames", extract_value=extract_value)["SubnetNames"]

Retrieves a mapping of subnet names within the network.

Queries the network for the names of various subnets, providing an overview of the different subnets within the network.

Returns:

A dictionary mapping network UIDs to their subnet names.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_balances( self, extract_value: bool = False) -> dict[str, dict[str, int | dict[str, int | float]]]:
2202    def query_map_balances(
2203        self, extract_value: bool = False
2204    ) -> dict[str, dict[str, int | dict[str, int | float]]]:
2205        """
2206        Retrieves a mapping of account balances within the network.
2207
2208        Queries the network for the balances associated with different accounts.
2209        It provides detailed information including various types of
2210        balances for each account.
2211
2212        Returns:
2213            A dictionary mapping account addresses to their balance details.
2214
2215        Raises:
2216            QueryError: If the query to the network fails or is invalid.
2217        """
2218
2219        return self.query_map("Account", module="System", extract_value=extract_value)[
2220            "Account"
2221        ]

Retrieves a mapping of account balances within the network.

Queries the network for the balances associated with different accounts. It provides detailed information including various types of balances for each account.

Returns:

A dictionary mapping account addresses to their balance details.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_registration_blocks(self, netuid: int = 0, extract_value: bool = False) -> dict[int, int]:
2223    def query_map_registration_blocks(
2224        self, netuid: int = 0, extract_value: bool = False
2225    ) -> dict[int, int]:
2226        """
2227        Retrieves a mapping of registration blocks for UIDs on the network.
2228
2229        Queries the network to find the block numbers at which various
2230        UIDs were registered.
2231
2232        Args:
2233            netuid: The network UID from which to get the registrations.
2234
2235        Returns:
2236            A dictionary mapping UIDs to their registration block numbers.
2237
2238        Raises:
2239            QueryError: If the query to the network fails or is invalid.
2240        """
2241
2242        return self.query_map(
2243            "RegistrationBlock", [netuid], extract_value=extract_value
2244        )["RegistrationBlock"]

Retrieves a mapping of registration blocks for UIDs on the network.

Queries the network to find the block numbers at which various UIDs were registered.

Arguments:
  • netuid: The network UID from which to get the registrations.
Returns:

A dictionary mapping UIDs to their registration block numbers.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def query_map_name(self, netuid: int = 0, extract_value: bool = False) -> dict[int, str]:
2246    def query_map_name(
2247        self, netuid: int = 0, extract_value: bool = False
2248    ) -> dict[int, str]:
2249        """
2250        Retrieves a mapping of names for keys on the network.
2251
2252        Queries the network for the names associated with different keys.
2253        It provides a mapping of key UIDs to their registered names.
2254
2255        Args:
2256            netuid: The network UID from which to get the names.
2257
2258        Returns:
2259            A dictionary mapping key UIDs to their names.
2260
2261        Raises:
2262            QueryError: If the query to the network fails or is invalid.
2263        """
2264
2265        return self.query_map("Name", [netuid], extract_value=extract_value)["Name"]

Retrieves a mapping of names for keys on the network.

Queries the network for the names associated with different keys. It provides a mapping of key UIDs to their registered names.

Arguments:
  • netuid: The network UID from which to get the names.
Returns:

A dictionary mapping key UIDs to their names.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_immunity_period(self, netuid: int = 0) -> int:
2269    def get_immunity_period(self, netuid: int = 0) -> int:
2270        """
2271        Queries the network for the immunity period setting.
2272
2273        The immunity period is a time duration during which a module
2274        can not be deregistered from the network.
2275        Fetches the immunity period for a specified network subnet.
2276
2277        Args:
2278            netuid: The network UID for which to query the immunity period.
2279
2280        Returns:
2281            The immunity period setting for the specified network subnet.
2282
2283        Raises:
2284            QueryError: If the query to the network fails or is invalid.
2285        """
2286
2287        return self.query(
2288            "ImmunityPeriod",
2289            params=[netuid],
2290        )

Queries the network for the immunity period setting.

The immunity period is a time duration during which a module can not be deregistered from the network. Fetches the immunity period for a specified network subnet.

Arguments:
  • netuid: The network UID for which to query the immunity period.
Returns:

The immunity period setting for the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_max_set_weights_per_epoch(self):
2292    def get_max_set_weights_per_epoch(self):
2293        return self.query("MaximumSetWeightCallsPerEpoch")
def get_min_allowed_weights(self, netuid: int = 0) -> int:
2295    def get_min_allowed_weights(self, netuid: int = 0) -> int:
2296        """
2297        Queries the network for the minimum allowed weights setting.
2298
2299        Retrieves the minimum weight values that are possible to set
2300        by a validator within a specific network subnet.
2301
2302        Args:
2303            netuid: The network UID for which to query the minimum allowed
2304              weights.
2305
2306        Returns:
2307            The minimum allowed weight values for the specified network
2308              subnet.
2309
2310        Raises:
2311            QueryError: If the query to the network fails or is invalid.
2312        """
2313
2314        return self.query(
2315            "MinAllowedWeights",
2316            params=[netuid],
2317        )

Queries the network for the minimum allowed weights setting.

Retrieves the minimum weight values that are possible to set by a validator within a specific network subnet.

Arguments:
  • netuid: The network UID for which to query the minimum allowed weights.
Returns:

The minimum allowed weight values for the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_dao_treasury_address(self) -> communex.types.Ss58Address:
2319    def get_dao_treasury_address(self) -> Ss58Address:
2320        return self.query("DaoTreasuryAddress", module="GovernanceModule")
def get_max_allowed_weights(self, netuid: int = 0) -> int:
2322    def get_max_allowed_weights(self, netuid: int = 0) -> int:
2323        """
2324        Queries the network for the maximum allowed weights setting.
2325
2326        Retrieves the maximum weight values that are possible to set
2327        by a validator within a specific network subnet.
2328
2329        Args:
2330            netuid: The network UID for which to query the maximum allowed
2331              weights.
2332
2333        Returns:
2334            The maximum allowed weight values for the specified network
2335              subnet.
2336
2337        Raises:
2338            QueryError: If the query to the network fails or is invalid.
2339        """
2340
2341        return self.query("MaxAllowedWeights", params=[netuid])

Queries the network for the maximum allowed weights setting.

Retrieves the maximum weight values that are possible to set by a validator within a specific network subnet.

Arguments:
  • netuid: The network UID for which to query the maximum allowed weights.
Returns:

The maximum allowed weight values for the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_max_allowed_uids(self, netuid: int = 0) -> int:
2343    def get_max_allowed_uids(self, netuid: int = 0) -> int:
2344        """
2345        Queries the network for the maximum allowed UIDs setting.
2346
2347        Fetches the upper limit on the number of user IDs that can
2348        be allocated or used within a specific network subnet.
2349
2350        Args:
2351            netuid: The network UID for which to query the maximum allowed UIDs.
2352
2353        Returns:
2354            The maximum number of allowed UIDs for the specified network subnet.
2355
2356        Raises:
2357            QueryError: If the query to the network fails or is invalid.
2358        """
2359
2360        return self.query("MaxAllowedUids", params=[netuid])

Queries the network for the maximum allowed UIDs setting.

Fetches the upper limit on the number of user IDs that can be allocated or used within a specific network subnet.

Arguments:
  • netuid: The network UID for which to query the maximum allowed UIDs.
Returns:

The maximum number of allowed UIDs for the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_name(self, netuid: int = 0) -> str:
2362    def get_name(self, netuid: int = 0) -> str:
2363        """
2364        Queries the network for the name of a specific subnet.
2365
2366        Args:
2367            netuid: The network UID for which to query the name.
2368
2369        Returns:
2370            The name of the specified network subnet.
2371
2372        Raises:
2373            QueryError: If the query to the network fails or is invalid.
2374        """
2375
2376        return self.query("Name", params=[netuid])

Queries the network for the name of a specific subnet.

Arguments:
  • netuid: The network UID for which to query the name.
Returns:

The name of the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_subnet_name(self, netuid: int = 0) -> str:
2378    def get_subnet_name(self, netuid: int = 0) -> str:
2379        """
2380        Queries the network for the name of a specific subnet.
2381
2382        Args:
2383            netuid: The network UID for which to query the name.
2384
2385        Returns:
2386            The name of the specified network subnet.
2387
2388        Raises:
2389            QueryError: If the query to the network fails or is invalid.
2390        """
2391
2392        return self.query("SubnetNames", params=[netuid])

Queries the network for the name of a specific subnet.

Arguments:
  • netuid: The network UID for which to query the name.
Returns:

The name of the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_global_dao_treasury(self):
2394    def get_global_dao_treasury(self):
2395        return self.query("GlobalDaoTreasury", module="GovernanceModule")
def get_n(self, netuid: int = 0) -> int:
2397    def get_n(self, netuid: int = 0) -> int:
2398        """
2399        Queries the network for the 'N' hyperparameter, which represents how
2400        many modules are on the network.
2401
2402        Args:
2403            netuid: The network UID for which to query the 'N' hyperparameter.
2404
2405        Returns:
2406            The value of the 'N' hyperparameter for the specified network
2407              subnet.
2408
2409        Raises:
2410            QueryError: If the query to the network fails or is invalid.
2411        """
2412
2413        return self.query("N", params=[netuid])

Queries the network for the 'N' hyperparameter, which represents how many modules are on the network.

Arguments:
  • netuid: The network UID for which to query the 'N' hyperparameter.
Returns:

The value of the 'N' hyperparameter for the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_tempo(self, netuid: int = 0) -> int:
2415    def get_tempo(self, netuid: int = 0) -> int:
2416        """
2417        Queries the network for the tempo setting, measured in blocks, for the
2418        specified subnet.
2419
2420        Args:
2421            netuid: The network UID for which to query the tempo.
2422
2423        Returns:
2424            The tempo setting for the specified subnet.
2425
2426        Raises:
2427            QueryError: If the query to the network fails or is invalid.
2428        """
2429
2430        return self.query("Tempo", params=[netuid])

Queries the network for the tempo setting, measured in blocks, for the specified subnet.

Arguments:
  • netuid: The network UID for which to query the tempo.
Returns:

The tempo setting for the specified subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_total_stake(self) -> int:
2432    def get_total_stake(self) -> int:
2433        """
2434        Retrieves a mapping of total stakes for keys on the network.
2435
2436        Queries the network for a mapping of key UIDs to their total stake amounts.
2437
2438        Returns:
2439            A dictionary mapping key UIDs to their total stake amounts.
2440
2441        Raises:
2442            QueryError: If the query to the network fails or is invalid.
2443        """
2444
2445        return self.query("TotalStake")

Retrieves a mapping of total stakes for keys on the network.

Queries the network for a mapping of key UIDs to their total stake amounts.

Returns:

A dictionary mapping key UIDs to their total stake amounts.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_registrations_per_block(self):
2447    def get_registrations_per_block(self):
2448        """
2449        Queries the network for the number of registrations per block.
2450
2451        Fetches the number of registrations that are processed per
2452        block within the network.
2453
2454        Returns:
2455            The number of registrations processed per block.
2456
2457        Raises:
2458            QueryError: If the query to the network fails or is invalid.
2459        """
2460
2461        return self.query(
2462            "RegistrationsPerBlock",
2463        )

Queries the network for the number of registrations per block.

Fetches the number of registrations that are processed per block within the network.

Returns:

The number of registrations processed per block.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def max_registrations_per_block(self, netuid: int = 0):
2465    def max_registrations_per_block(self, netuid: int = 0):
2466        """
2467        Queries the network for the maximum number of registrations per block.
2468
2469        Retrieves the upper limit of registrations that can be processed in
2470        each block within a specific network subnet.
2471
2472        Args:
2473            netuid: The network UID for which to query.
2474
2475        Returns:
2476            The maximum number of registrations per block for
2477            the specified network subnet.
2478
2479        Raises:
2480            QueryError: If the query to the network fails or is invalid.
2481        """
2482
2483        return self.query(
2484            "MaxRegistrationsPerBlock",
2485            params=[netuid],
2486        )

Queries the network for the maximum number of registrations per block.

Retrieves the upper limit of registrations that can be processed in each block within a specific network subnet.

Arguments:
  • netuid: The network UID for which to query.
Returns:

The maximum number of registrations per block for the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_proposal(self, proposal_id: int = 0):
2488    def get_proposal(self, proposal_id: int = 0):
2489        """
2490        Queries the network for a specific proposal.
2491
2492        Args:
2493            proposal_id: The ID of the proposal to query.
2494
2495        Returns:
2496            The details of the specified proposal.
2497
2498        Raises:
2499            QueryError: If the query to the network fails, is invalid,
2500                or if the proposal ID does not exist.
2501        """
2502
2503        return self.query(
2504            "Proposals",
2505            params=[proposal_id],
2506        )

Queries the network for a specific proposal.

Arguments:
  • proposal_id: The ID of the proposal to query.
Returns:

The details of the specified proposal.

Raises:
  • QueryError: If the query to the network fails, is invalid, or if the proposal ID does not exist.
def get_trust(self, netuid: int = 0):
2508    def get_trust(self, netuid: int = 0):
2509        """
2510        Queries the network for the trust setting of a specific network subnet.
2511
2512        Retrieves the trust level or score, which may represent the
2513        level of trustworthiness or reliability within a
2514        particular network subnet.
2515
2516        Args:
2517            netuid: The network UID for which to query the trust setting.
2518
2519        Returns:
2520            The trust level or score for the specified network subnet.
2521
2522        Raises:
2523            QueryError: If the query to the network fails or is invalid.
2524        """
2525
2526        return self.query(
2527            "Trust",
2528            params=[netuid],
2529        )

Queries the network for the trust setting of a specific network subnet.

Retrieves the trust level or score, which may represent the level of trustworthiness or reliability within a particular network subnet.

Arguments:
  • netuid: The network UID for which to query the trust setting.
Returns:

The trust level or score for the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_uids(self, key: communex.types.Ss58Address, netuid: int = 0) -> bool | None:
2531    def get_uids(self, key: Ss58Address, netuid: int = 0) -> bool | None:
2532        """
2533        Queries the network for module UIDs associated with a specific key.
2534
2535        Args:
2536            key: The key address for which to query UIDs.
2537            netuid: The network UID within which to search for the key.
2538
2539        Returns:
2540            A list of UIDs associated with the specified key.
2541
2542        Raises:
2543            QueryError: If the query to the network fails or is invalid.
2544        """
2545
2546        return self.query(
2547            "Uids",
2548            params=[netuid, key],
2549        )

Queries the network for module UIDs associated with a specific key.

Arguments:
  • key: The key address for which to query UIDs.
  • netuid: The network UID within which to search for the key.
Returns:

A list of UIDs associated with the specified key.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_unit_emission(self) -> int:
2551    def get_unit_emission(self) -> int:
2552        """
2553        Queries the network for the unit emission setting.
2554
2555        Retrieves the unit emission value, which represents the
2556        emission rate or quantity for the $COMM token.
2557
2558        Returns:
2559            The unit emission value in nanos for the network.
2560
2561        Raises:
2562            QueryError: If the query to the network fails or is invalid.
2563        """
2564
2565        return self.query("UnitEmission", module="SubnetEmissionModule")

Queries the network for the unit emission setting.

Retrieves the unit emission value, which represents the emission rate or quantity for the $COMM token.

Returns:

The unit emission value in nanos for the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_tx_rate_limit(self) -> int:
2567    def get_tx_rate_limit(self) -> int:
2568        """
2569        Queries the network for the transaction rate limit.
2570
2571        Retrieves the rate limit for transactions within the network,
2572        which defines the maximum number of transactions that can be
2573        processed within a certain timeframe.
2574
2575        Returns:
2576            The transaction rate limit for the network.
2577
2578        Raises:
2579            QueryError: If the query to the network fails or is invalid.
2580        """
2581
2582        return self.query(
2583            "TxRateLimit",
2584        )

Queries the network for the transaction rate limit.

Retrieves the rate limit for transactions within the network, which defines the maximum number of transactions that can be processed within a certain timeframe.

Returns:

The transaction rate limit for the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_subnet_burn(self) -> int:
2586    def get_subnet_burn(self) -> int:
2587        """Queries the network for the subnet burn value.
2588
2589        Retrieves the subnet burn value from the network, which represents
2590        the amount of tokens that are burned (permanently removed from
2591        circulation) for subnet-related operations.
2592
2593        Returns:
2594            int: The subnet burn value.
2595
2596        Raises:
2597            QueryError: If the query to the network fails or returns invalid data.
2598        """
2599
2600        return self.query(
2601            "SubnetBurn",
2602        )

Queries the network for the subnet burn value.

Retrieves the subnet burn value from the network, which represents the amount of tokens that are burned (permanently removed from circulation) for subnet-related operations.

Returns:

int: The subnet burn value.

Raises:
  • QueryError: If the query to the network fails or returns invalid data.
def get_burn_rate(self) -> int:
2604    def get_burn_rate(self) -> int:
2605        """
2606        Queries the network for the burn rate setting.
2607
2608        Retrieves the burn rate, which represents the rate at
2609        which the $COMM token is permanently
2610        removed or 'burned' from circulation.
2611
2612        Returns:
2613            The burn rate for the network.
2614
2615        Raises:
2616            QueryError: If the query to the network fails or is invalid.
2617        """
2618
2619        return self.query(
2620            "BurnRate",
2621            params=[],
2622        )

Queries the network for the burn rate setting.

Retrieves the burn rate, which represents the rate at which the $COMM token is permanently removed or 'burned' from circulation.

Returns:

The burn rate for the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_burn(self, netuid: int = 0) -> int:
2624    def get_burn(self, netuid: int = 0) -> int:
2625        """
2626        Queries the network for the burn setting.
2627
2628        Retrieves the burn value, which represents the amount of the
2629        $COMM token that is 'burned' or permanently removed from
2630        circulation.
2631
2632        Args:
2633            netuid: The network UID for which to query the burn value.
2634
2635        Returns:
2636            The burn value for the specified network subnet.
2637
2638        Raises:
2639            QueryError: If the query to the network fails or is invalid.
2640        """
2641
2642        return self.query("Burn", params=[netuid])

Queries the network for the burn setting.

Retrieves the burn value, which represents the amount of the $COMM token that is 'burned' or permanently removed from circulation.

Arguments:
  • netuid: The network UID for which to query the burn value.
Returns:

The burn value for the specified network subnet.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_min_burn(self) -> int:
2644    def get_min_burn(self) -> int:
2645        """
2646        Queries the network for the minimum burn setting.
2647
2648        Retrieves the minimum burn value, indicating the lowest
2649        amount of the $COMM tokens that can be 'burned' or
2650        permanently removed from circulation.
2651
2652        Returns:
2653            The minimum burn value for the network.
2654
2655        Raises:
2656            QueryError: If the query to the network fails or is invalid.
2657        """
2658
2659        return self.query(
2660            "BurnConfig",
2661            params=[],
2662        )["min_burn"]

Queries the network for the minimum burn setting.

Retrieves the minimum burn value, indicating the lowest amount of the $COMM tokens that can be 'burned' or permanently removed from circulation.

Returns:

The minimum burn value for the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_min_weight_stake(self) -> int:
2664    def get_min_weight_stake(self) -> int:
2665        """
2666        Queries the network for the minimum weight stake setting.
2667
2668        Retrieves the minimum weight stake, which represents the lowest
2669        stake weight that is allowed for certain operations or
2670        transactions within the network.
2671
2672        Returns:
2673            The minimum weight stake for the network.
2674
2675        Raises:
2676            QueryError: If the query to the network fails or is invalid.
2677        """
2678
2679        return self.query("MinWeightStake", params=[])

Queries the network for the minimum weight stake setting.

Retrieves the minimum weight stake, which represents the lowest stake weight that is allowed for certain operations or transactions within the network.

Returns:

The minimum weight stake for the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_vote_mode_global(self) -> str:
2681    def get_vote_mode_global(self) -> str:
2682        """
2683        Queries the network for the global vote mode setting.
2684
2685        Retrieves the global vote mode, which defines the overall voting
2686        methodology or approach used across the network in default.
2687
2688        Returns:
2689            The global vote mode setting for the network.
2690
2691        Raises:
2692            QueryError: If the query to the network fails or is invalid.
2693        """
2694
2695        return self.query(
2696            "VoteModeGlobal",
2697        )

Queries the network for the global vote mode setting.

Retrieves the global vote mode, which defines the overall voting methodology or approach used across the network in default.

Returns:

The global vote mode setting for the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_max_proposals(self) -> int:
2699    def get_max_proposals(self) -> int:
2700        """
2701        Queries the network for the maximum number of proposals allowed.
2702
2703        Retrieves the upper limit on the number of proposals that can be
2704        active or considered at any given time within the network.
2705
2706        Returns:
2707            The maximum number of proposals allowed on the network.
2708
2709        Raises:
2710            QueryError: If the query to the network fails or is invalid.
2711        """
2712
2713        return self.query(
2714            "MaxProposals",
2715        )

Queries the network for the maximum number of proposals allowed.

Retrieves the upper limit on the number of proposals that can be active or considered at any given time within the network.

Returns:

The maximum number of proposals allowed on the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_max_registrations_per_block(self) -> int:
2717    def get_max_registrations_per_block(self) -> int:
2718        """
2719        Queries the network for the maximum number of registrations per block.
2720
2721        Retrieves the maximum number of registrations that can
2722        be processed in each block within the network.
2723
2724        Returns:
2725            The maximum number of registrations per block on the network.
2726
2727        Raises:
2728            QueryError: If the query to the network fails or is invalid.
2729        """
2730
2731        return self.query(
2732            "MaxRegistrationsPerBlock",
2733            params=[],
2734        )

Queries the network for the maximum number of registrations per block.

Retrieves the maximum number of registrations that can be processed in each block within the network.

Returns:

The maximum number of registrations per block on the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_max_name_length(self) -> int:
2736    def get_max_name_length(self) -> int:
2737        """
2738        Queries the network for the maximum length allowed for names.
2739
2740        Retrieves the maximum character length permitted for names
2741        within the network. Such as the module names
2742
2743        Returns:
2744            The maximum length allowed for names on the network.
2745
2746        Raises:
2747            QueryError: If the query to the network fails or is invalid.
2748        """
2749
2750        return self.query(
2751            "MaxNameLength",
2752            params=[],
2753        )

Queries the network for the maximum length allowed for names.

Retrieves the maximum character length permitted for names within the network. Such as the module names

Returns:

The maximum length allowed for names on the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_global_vote_threshold(self) -> int:
2755    def get_global_vote_threshold(self) -> int:
2756        """
2757        Queries the network for the global vote threshold.
2758
2759        Retrieves the global vote threshold, which is the critical value or
2760        percentage required for decisions in the network's governance process.
2761
2762        Returns:
2763            The global vote threshold for the network.
2764
2765        Raises:
2766            QueryError: If the query to the network fails or is invalid.
2767        """
2768
2769        return self.query(
2770            "GlobalVoteThreshold",
2771        )

Queries the network for the global vote threshold.

Retrieves the global vote threshold, which is the critical value or percentage required for decisions in the network's governance process.

Returns:

The global vote threshold for the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_max_allowed_subnets(self) -> int:
2773    def get_max_allowed_subnets(self) -> int:
2774        """
2775        Queries the network for the maximum number of allowed subnets.
2776
2777        Retrieves the upper limit on the number of subnets that can
2778        be created or operated within the network.
2779
2780        Returns:
2781            The maximum number of allowed subnets on the network.
2782
2783        Raises:
2784            QueryError: If the query to the network fails or is invalid.
2785        """
2786
2787        return self.query(
2788            "MaxAllowedSubnets",
2789            params=[],
2790        )

Queries the network for the maximum number of allowed subnets.

Retrieves the upper limit on the number of subnets that can be created or operated within the network.

Returns:

The maximum number of allowed subnets on the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_max_allowed_modules(self) -> int:
2792    def get_max_allowed_modules(self) -> int:
2793        """
2794        Queries the network for the maximum number of allowed modules.
2795
2796        Retrieves the upper limit on the number of modules that
2797        can be registered within the network.
2798
2799        Returns:
2800            The maximum number of allowed modules on the network.
2801
2802        Raises:
2803            QueryError: If the query to the network fails or is invalid.
2804        """
2805
2806        return self.query(
2807            "MaxAllowedModules",
2808            params=[],
2809        )

Queries the network for the maximum number of allowed modules.

Retrieves the upper limit on the number of modules that can be registered within the network.

Returns:

The maximum number of allowed modules on the network.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_min_stake(self, netuid: int = 0) -> int:
2811    def get_min_stake(self, netuid: int = 0) -> int:
2812        """
2813        Queries the network for the minimum stake required to register a key.
2814
2815        Retrieves the minimum amount of stake necessary for
2816        registering a key within a specific network subnet.
2817
2818        Args:
2819            netuid: The network UID for which to query the minimum stake.
2820
2821        Returns:
2822            The minimum stake required for key registration in nanos.
2823
2824        Raises:
2825            QueryError: If the query to the network fails or is invalid.
2826        """
2827
2828        return self.query("MinStake", params=[netuid])

Queries the network for the minimum stake required to register a key.

Retrieves the minimum amount of stake necessary for registering a key within a specific network subnet.

Arguments:
  • netuid: The network UID for which to query the minimum stake.
Returns:

The minimum stake required for key registration in nanos.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_stakefrom(self, key: communex.types.Ss58Address) -> dict[str, int]:
2830    def get_stakefrom(
2831        self,
2832        key: Ss58Address,
2833    ) -> dict[str, int]:
2834        """
2835        Retrieves the stake amounts from all stakers to a specific staked address.
2836
2837        Queries the network for the stakes received by a particular staked address
2838        from all stakers.
2839
2840        Args:
2841            key: The address of the key receiving the stakes.
2842
2843        Returns:
2844            A dictionary mapping staker addresses to their respective stake amounts.
2845
2846        Raises:
2847            QueryError: If the query to the network fails or is invalid.
2848        """
2849
2850        # Has to use query map in order to iterate through the storage prefix.
2851        return self.query_map("StakeFrom", [key], extract_value=False).get("StakeFrom", {})

Retrieves the stake amounts from all stakers to a specific staked address.

Queries the network for the stakes received by a particular staked address from all stakers.

Arguments:
  • key: The address of the key receiving the stakes.
Returns:

A dictionary mapping staker addresses to their respective stake amounts.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_staketo(self, key: communex.types.Ss58Address) -> dict[str, int]:
2853    def get_staketo(
2854        self,
2855        key: Ss58Address,
2856    ) -> dict[str, int]:
2857        """
2858        Retrieves the stake amounts provided by a specific staker to all staked addresses.
2859
2860        Queries the network for the stakes provided by a particular staker to
2861        all staked addresses.
2862
2863        Args:
2864            key: The address of the key providing the stakes.
2865
2866        Returns:
2867            A dictionary mapping staked addresses to their respective received stake amounts.
2868
2869        Raises:
2870            QueryError: If the query to the network fails or is invalid.
2871        """
2872
2873        # Has to use query map in order to iterate through the storage prefix.
2874        return self.query_map("StakeTo", [key], extract_value=False).get("StakeTo", {})

Retrieves the stake amounts provided by a specific staker to all staked addresses.

Queries the network for the stakes provided by a particular staker to all staked addresses.

Arguments:
  • key: The address of the key providing the stakes.
Returns:

A dictionary mapping staked addresses to their respective received stake amounts.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_balance(self, addr: communex.types.Ss58Address) -> int:
2876    def get_balance(
2877        self,
2878        addr: Ss58Address,
2879    ) -> int:
2880        """
2881        Retrieves the balance of a specific key.
2882
2883        Args:
2884            addr: The address of the key to query the balance for.
2885
2886        Returns:
2887            The balance of the specified key.
2888
2889        Raises:
2890            QueryError: If the query to the network fails or is invalid.
2891        """
2892
2893        result = self.query("Account", module="System", params=[addr])
2894
2895        return result["data"]["free"]

Retrieves the balance of a specific key.

Arguments:
  • addr: The address of the key to query the balance for.
Returns:

The balance of the specified key.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_block( self, block_hash: str | None = None) -> dict[typing.Any, typing.Any] | None:
2897    def get_block(self, block_hash: str | None = None) -> dict[Any, Any] | None:
2898        """
2899        Retrieves information about a specific block in the network.
2900
2901        Queries the network for details about a block, such as its number,
2902        hash, and other relevant information.
2903
2904        Returns:
2905            The requested information about the block,
2906            or None if the block does not exist
2907            or the information is not available.
2908
2909        Raises:
2910            QueryError: If the query to the network fails or is invalid.
2911        """
2912
2913        with self.get_conn() as substrate:
2914            block: dict[Any, Any] | None = substrate.get_block(  # type: ignore
2915                block_hash  # type: ignore
2916            )
2917
2918        return block

Retrieves information about a specific block in the network.

Queries the network for details about a block, such as its number, hash, and other relevant information.

Returns:

The requested information about the block, or None if the block does not exist or the information is not available.

Raises:
  • QueryError: If the query to the network fails or is invalid.
def get_existential_deposit(self, block_hash: str | None = None) -> int:
2920    def get_existential_deposit(self, block_hash: str | None = None) -> int:
2921        """
2922        Retrieves the existential deposit value for the network.
2923
2924        The existential deposit is the minimum balance that must be maintained
2925        in an account to prevent it from being purged. Denotated in nano units.
2926
2927        Returns:
2928            The existential deposit value in nano units.
2929        Note:
2930            The value returned is a fixed value defined in the
2931            client and may not reflect changes in the network's configuration.
2932        """
2933
2934        with self.get_conn() as substrate:
2935            result: int = substrate.get_constant(  #  type: ignore
2936                "Balances", "ExistentialDeposit", block_hash
2937            ).value  #  type: ignore
2938
2939        return result

Retrieves the existential deposit value for the network.

The existential deposit is the minimum balance that must be maintained in an account to prevent it from being purged. Denotated in nano units.

Returns:

The existential deposit value in nano units.

Note:

The value returned is a fixed value defined in the client and may not reflect changes in the network's configuration.

def get_voting_power_delegators(self) -> list[communex.types.Ss58Address]:
2941    def get_voting_power_delegators(self) -> list[Ss58Address]:
2942        result = self.query("NotDelegatingVotingPower", [], module="GovernanceModule")
2943        return result
def add_transfer_dao_treasury_proposal( self, key: substrateinterface.keypair.Keypair, data: str, amount_nano: int, dest: communex.types.Ss58Address):
2945    def add_transfer_dao_treasury_proposal(
2946        self,
2947        key: Keypair,
2948        data: str,
2949        amount_nano: int,
2950        dest: Ss58Address,
2951    ):
2952        params = {"dest": dest, "value": amount_nano, "data": data}
2953
2954        return self.compose_call(
2955            module="GovernanceModule",
2956            fn="add_transfer_dao_treasury_proposal",
2957            params=params,
2958            key=key,
2959        )
def delegate_rootnet_control( self, key: substrateinterface.keypair.Keypair, dest: communex.types.Ss58Address):
2961    def delegate_rootnet_control(self, key: Keypair, dest: Ss58Address):
2962        params = {"origin": key, "target": dest}
2963
2964        return self.compose_call(
2965            module="SubspaceModule",
2966            fn="delegate_rootnet_control",
2967            params=params,
2968            key=key,
2969        )