communex.client
1import json 2import queue 3from concurrent.futures import Future, ThreadPoolExecutor 4from contextlib import contextmanager 5from copy import deepcopy 6from dataclasses import dataclass 7from typing import Any, Mapping, TypeVar, cast 8 9from substrateinterface import ( 10 ExtrinsicReceipt, 11 Keypair, 12 SubstrateInterface, 13) 14from substrateinterface.storage import StorageKey 15 16from communex._common import transform_stake_dmap 17from communex.errors import ChainTransactionError, NetworkQueryError 18from communex.types import NetworkParams, Ss58Address, SubnetParams 19 20# TODO: InsufficientBalanceError, MismatchedLengthError etc 21 22MAX_REQUEST_SIZE = 9_000_000 23 24 25@dataclass 26class Chunk: 27 batch_requests: list[tuple[Any, Any]] 28 prefix_list: list[list[str]] 29 fun_params: list[tuple[Any, Any, Any, Any, str]] 30 31 32T1 = TypeVar("T1") 33T2 = TypeVar("T2") 34 35 36class CommuneClient: 37 """ 38 A client for interacting with Commune network nodes, querying storage, 39 submitting transactions, etc. 40 41 Attributes: 42 wait_for_finalization: Whether to wait for transaction finalization. 43 44 Example: 45 ```py 46 client = CommuneClient() 47 client.query(name='function_name', params=['param1', 'param2']) 48 ``` 49 50 Raises: 51 AssertionError: If the maximum connections value is less than or equal 52 to zero. 53 """ 54 55 wait_for_finalization: bool 56 _num_connections: int 57 _connection_queue: queue.Queue[SubstrateInterface] 58 url: str 59 60 def __init__( 61 self, 62 url: str, 63 num_connections: int = 1, 64 wait_for_finalization: bool = False, 65 timeout: int | None = None, 66 ): 67 """ 68 Args: 69 url: The URL of the network node to connect to. 70 num_connections: The number of websocket connections to be opened. 71 """ 72 assert num_connections > 0 73 self._num_connections = num_connections 74 self.wait_for_finalization = wait_for_finalization 75 self._connection_queue = queue.Queue(num_connections) 76 self.url = url 77 ws_options: dict[str, int] = {} 78 if timeout is not None: 79 ws_options["timeout"] = timeout 80 self.ws_options = ws_options 81 for _ in range(num_connections): 82 self._connection_queue.put( 83 SubstrateInterface(url, ws_options=ws_options) 84 ) 85 86 @property 87 def connections(self) -> int: 88 """ 89 Gets the maximum allowed number of simultaneous connections to the 90 network node. 91 """ 92 return self._num_connections 93 94 @contextmanager 95 def get_conn(self, timeout: float | None = None, init: bool = False): 96 """ 97 Context manager to get a connection from the pool. 98 99 Tries to get a connection from the pool queue. If the queue is empty, 100 it blocks for `timeout` seconds until a connection is available. If 101 `timeout` is None, it blocks indefinitely. 102 103 Args: 104 timeout: The maximum time in seconds to wait for a connection. 105 106 Yields: 107 The connection object from the pool. 108 109 Raises: 110 QueueEmptyError: If no connection is available within the timeout 111 period. 
112 """ 113 conn = self._connection_queue.get(timeout=timeout) 114 if init: 115 conn.init_runtime() # type: ignore 116 try: 117 if conn.websocket and conn.websocket.connected: # type: ignore 118 yield conn 119 else: 120 conn = SubstrateInterface(self.url, ws_options=self.ws_options) 121 yield conn 122 finally: 123 self._connection_queue.put(conn) 124 125 def _get_storage_keys( 126 self, 127 storage: str, 128 queries: list[tuple[str, list[Any]]], 129 block_hash: str | None, 130 ): 131 send: list[tuple[str, list[Any]]] = [] 132 prefix_list: list[Any] = [] 133 134 key_idx = 0 135 with self.get_conn(init=True) as substrate: 136 for function, params in queries: 137 storage_key = StorageKey.create_from_storage_function( # type: ignore 138 storage, 139 function, 140 params, 141 runtime_config=substrate.runtime_config, # type: ignore 142 metadata=substrate.metadata, # type: ignore 143 ) 144 145 prefix = storage_key.to_hex() 146 prefix_list.append(prefix) 147 send.append(("state_getKeys", [prefix, block_hash])) 148 key_idx += 1 149 return send, prefix_list 150 151 def _get_lists( 152 self, 153 storage_module: str, 154 queries: list[tuple[str, list[Any]]], 155 substrate: SubstrateInterface, 156 ) -> list[tuple[Any, Any, Any, Any, str]]: 157 """ 158 Generates a list of tuples containing parameters for each storage function based on the given functions and substrate interface. 159 160 Args: 161 functions (dict[str, list[query_call]]): A dictionary where keys are storage module names and values are lists of tuples. 162 Each tuple consists of a storage function name and its parameters. 163 substrate: An instance of the SubstrateInterface class used to interact with the substrate. 164 165 Returns: 166 A list of tuples in the format `(value_type, param_types, key_hashers, params, storage_function)` for each storage function in the given functions. 167 168 Example: 169 >>> _get_lists( 170 functions={'storage_module': [('storage_function', ['param1', 'param2'])]}, 171 substrate=substrate_instance 172 ) 173 [('value_type', 'param_types', 'key_hashers', ['param1', 'param2'], 'storage_function'), ...] 174 """ 175 176 function_parameters: list[tuple[Any, Any, Any, Any, str]] = [] 177 178 metadata_pallet = substrate.metadata.get_metadata_pallet( # type: ignore 179 storage_module 180 ) 181 for storage_function, params in queries: 182 storage_item = metadata_pallet.get_storage_function( # type: ignore 183 storage_function 184 ) 185 186 value_type = storage_item.get_value_type_string() # type: ignore 187 param_types = storage_item.get_params_type_string() # type: ignore 188 key_hashers = storage_item.get_param_hashers() # type: ignore 189 function_parameters.append( 190 ( 191 value_type, 192 param_types, 193 key_hashers, 194 params, 195 storage_function, 196 ) # type: ignore 197 ) 198 return function_parameters 199 200 def _send_batch( 201 self, 202 batch_payload: list[Any], 203 request_ids: list[int], 204 extract_result: bool = True, 205 ): 206 """ 207 Sends a batch of requests to the substrate and collects the results. 208 209 Args: 210 substrate: An instance of the substrate interface. 211 batch_payload: The payload of the batch request. 212 request_ids: A list of request IDs for tracking responses. 213 results: A list to store the results of the requests. 214 extract_result: Whether to extract the result from the response. 215 216 Raises: 217 NetworkQueryError: If there is an `error` in the response message. 218 219 Note: 220 No explicit return value as results are appended to the provided 'results' list. 
221 """ 222 results: list[str | dict[Any, Any]] = [] 223 with self.get_conn(init=True) as substrate: 224 try: 225 substrate.websocket.send( # Â type: ignore 226 json.dumps(batch_payload) 227 ) 228 except NetworkQueryError: 229 pass 230 while len(results) < len(request_ids): 231 received_messages = json.loads( 232 substrate.websocket.recv() # type: ignore 233 ) 234 if isinstance(received_messages, dict): 235 received_messages: list[dict[Any, Any]] = [ 236 received_messages 237 ] 238 239 for message in received_messages: 240 if message.get("id") in request_ids: 241 if extract_result: 242 try: 243 results.append(message["result"]) 244 except Exception: 245 raise ( 246 RuntimeError( 247 f"Error extracting result from message: {message}" 248 ) 249 ) 250 else: 251 results.append(message) 252 if "error" in message: 253 raise NetworkQueryError(message["error"]) 254 255 return results 256 257 def _make_request_smaller( 258 self, 259 batch_request: list[tuple[T1, T2]], 260 prefix_list: list[list[str]], 261 fun_params: list[tuple[Any, Any, Any, Any, str]], 262 ) -> tuple[list[list[tuple[T1, T2]]], list[Chunk]]: 263 """ 264 Splits a batch of requests into smaller batches, each not exceeding the specified maximum size. 265 266 Args: 267 batch_request: A list of requests to be sent in a batch. 268 max_size: Maximum size of each batch in bytes. 269 270 Returns: 271 A list of smaller request batches. 272 273 Example: 274 >>> _make_request_smaller(batch_request=[('method1', 'params1'), ('method2', 'params2')], max_size=1000) 275 [[('method1', 'params1')], [('method2', 'params2')]] 276 """ 277 assert len(prefix_list) == len(fun_params) == len(batch_request) 278 279 def estimate_size(request: tuple[T1, T2]): 280 """Convert the batch request to a string and measure its length""" 281 return len(json.dumps(request)) 282 283 # Initialize variables 284 result: list[list[tuple[T1, T2]]] = [] 285 current_batch = [] 286 current_prefix_batch = [] 287 current_params_batch = [] 288 current_size = 0 289 290 chunk_list: list[Chunk] = [] 291 292 # Iterate through each request in the batch 293 for request, prefix, params in zip( 294 batch_request, prefix_list, fun_params 295 ): 296 request_size = estimate_size(request) 297 298 # Check if adding this request exceeds the max size 299 if current_size + request_size > MAX_REQUEST_SIZE: 300 # If so, start a new batch 301 302 # Essentiatly checks that it's not the first iteration 303 if current_batch: 304 chunk = Chunk( 305 current_batch, 306 current_prefix_batch, 307 current_params_batch, 308 ) 309 chunk_list.append(chunk) 310 result.append(current_batch) 311 312 current_batch = [request] 313 current_prefix_batch = [prefix] 314 current_params_batch = [params] 315 current_size = request_size 316 else: 317 # Otherwise, add to the current batch 318 current_batch.append(request) 319 current_size += request_size 320 current_prefix_batch.append(prefix) 321 current_params_batch.append(params) 322 323 # Add the last batch if it's not empty 324 if current_batch: 325 result.append(current_batch) 326 chunk = Chunk( 327 current_batch, current_prefix_batch, current_params_batch 328 ) 329 chunk_list.append(chunk) 330 331 return result, chunk_list 332 333 def _are_changes_equal(self, change_a: Any, change_b: Any): 334 for (a, b), (c, d) in zip(change_a, change_b): 335 if a != c or b != d: 336 return False 337 338 def _rpc_request_batch( 339 self, 340 batch_requests: list[tuple[str, list[Any]]], 341 extract_result: bool = True, 342 ) -> list[str]: 343 """ 344 Sends batch requests to the 
substrate node using multiple threads and collects the results. 345 346 Args: 347 substrate: An instance of the substrate interface. 348 batch_requests : A list of requests to be sent in batches. 349 max_size: Maximum size of each batch in bytes. 350 extract_result: Whether to extract the result from the response message. 351 352 Returns: 353 A list of results from the batch requests. 354 355 Example: 356 >>> _rpc_request_batch(substrate_instance, [('method1', ['param1']), ('method2', ['param2'])]) 357 ['result1', 'result2', ...] 358 """ 359 360 chunk_results: list[Any] = [] 361 # smaller_requests = self._make_request_smaller(batch_requests) 362 request_id = 0 363 with ThreadPoolExecutor() as executor: 364 futures: list[Future[list[str | dict[Any, Any]]]] = [] 365 for chunk in [batch_requests]: 366 request_ids: list[int] = [] 367 batch_payload: list[Any] = [] 368 for method, params in chunk: 369 request_id += 1 370 request_ids.append(request_id) 371 batch_payload.append( 372 { 373 "jsonrpc": "2.0", 374 "method": method, 375 "params": params, 376 "id": request_id, 377 } 378 ) 379 380 futures.append( 381 executor.submit( 382 self._send_batch, 383 batch_payload=batch_payload, 384 request_ids=request_ids, 385 extract_result=extract_result, 386 ) 387 ) 388 for future in futures: 389 resul = future.result() 390 chunk_results.append(resul) 391 return chunk_results 392 393 def _rpc_request_batch_chunked( 394 self, chunk_requests: list[Chunk], extract_result: bool = True 395 ): 396 """ 397 Sends batch requests to the substrate node using multiple threads and collects the results. 398 399 Args: 400 substrate: An instance of the substrate interface. 401 batch_requests : A list of requests to be sent in batches. 402 max_size: Maximum size of each batch in bytes. 403 extract_result: Whether to extract the result from the response message. 404 405 Returns: 406 A list of results from the batch requests. 407 408 Example: 409 >>> _rpc_request_batch(substrate_instance, [('method1', ['param1']), ('method2', ['param2'])]) 410 ['result1', 'result2', ...] 
411 """ 412 413 def split_chunks( 414 chunk: Chunk, chunk_info: list[Chunk], chunk_info_idx: int 415 ): 416 manhattam_chunks: list[tuple[Any, Any]] = [] 417 mutaded_chunk_info = deepcopy(chunk_info) 418 max_n_keys = 35000 419 for query in chunk.batch_requests: 420 result_keys = query[1][0] 421 keys_amount = len(result_keys) 422 if keys_amount > max_n_keys: 423 mutaded_chunk_info.pop(chunk_info_idx) 424 for i in range(0, keys_amount, max_n_keys): 425 new_chunk = deepcopy(chunk) 426 splitted_keys = result_keys[i : i + max_n_keys] 427 splitted_query = deepcopy(query) 428 splitted_query[1][0] = splitted_keys 429 new_chunk.batch_requests = [splitted_query] 430 manhattam_chunks.append(splitted_query) 431 mutaded_chunk_info.insert(chunk_info_idx, new_chunk) 432 else: 433 manhattam_chunks.append(query) 434 return manhattam_chunks, mutaded_chunk_info 435 436 assert len(chunk_requests) > 0 437 mutated_chunk_info: list[Chunk] = [] 438 chunk_results: list[Any] = [] 439 # smaller_requests = self._make_request_smaller(batch_requests) 440 request_id = 0 441 442 with ThreadPoolExecutor() as executor: 443 futures: list[Future[list[str | dict[Any, Any]]]] = [] 444 for idx, macro_chunk in enumerate(chunk_requests): 445 _, mutated_chunk_info = split_chunks( 446 macro_chunk, chunk_requests, idx 447 ) 448 for chunk in mutated_chunk_info: 449 request_ids: list[int] = [] 450 batch_payload: list[Any] = [] 451 for method, params in chunk.batch_requests: 452 # for method, params in micro_chunk: 453 request_id += 1 454 request_ids.append(request_id) 455 batch_payload.append( 456 { 457 "jsonrpc": "2.0", 458 "method": method, 459 "params": params, 460 "id": request_id, 461 } 462 ) 463 futures.append( 464 executor.submit( 465 self._send_batch, 466 batch_payload=batch_payload, 467 request_ids=request_ids, 468 extract_result=extract_result, 469 ) 470 ) 471 for future in futures: 472 resul = future.result() 473 chunk_results.append(resul) 474 return chunk_results, mutated_chunk_info 475 476 def _decode_response( 477 self, 478 response: list[str], 479 function_parameters: list[tuple[Any, Any, Any, Any, str]], 480 prefix_list: list[Any], 481 block_hash: str, 482 ) -> dict[str, dict[Any, Any]]: 483 """ 484 Decodes a response from the substrate interface and organizes the data into a dictionary. 485 486 Args: 487 response: A list of encoded responses from a substrate query. 488 function_parameters: A list of tuples containing the parameters for each storage function. 489 last_keys: A list of the last keys used in the substrate query. 490 prefix_list: A list of prefixes used in the substrate query. 491 substrate: An instance of the SubstrateInterface class. 492 block_hash: The hash of the block to be queried. 493 494 Returns: 495 A dictionary where each key is a storage function name and the value is another dictionary. 496 This inner dictionary's key is the decoded key from the response and the value is the corresponding decoded value. 497 498 Raises: 499 ValueError: If an unsupported hash type is encountered in the `concat_hash_len` function. 500 501 Example: 502 >>> _decode_response( 503 response=[...], 504 function_parameters=[...], 505 last_keys=[...], 506 prefix_list=[...], 507 substrate=substrate_instance, 508 block_hash="0x123..." 509 ) 510 {'storage_function_name': {decoded_key: decoded_value, ...}, ...} 511 """ 512 513 def get_item_key_value( 514 item_key: tuple[Any, ...] | Any, 515 ) -> tuple[Any, ...] 
| Any: 516 if isinstance(item_key, tuple): 517 return tuple(k.value for k in item_key) # type: ignore 518 return item_key.value 519 520 def concat_hash_len(key_hasher: str) -> int: 521 """ 522 Determines the length of the hash based on the given key hasher type. 523 524 Args: 525 key_hasher: The type of key hasher. 526 527 Returns: 528 The length of the hash corresponding to the given key hasher type. 529 530 Raises: 531 ValueError: If the key hasher type is not supported. 532 533 Example: 534 >>> concat_hash_len("Blake2_128Concat") 535 16 536 """ 537 538 if key_hasher == "Blake2_128Concat": 539 return 16 540 elif key_hasher == "Twox64Concat": 541 return 8 542 elif key_hasher == "Identity": 543 return 0 544 else: 545 raise ValueError("Unsupported hash type") 546 547 assert len(response) == len(function_parameters) == len(prefix_list) 548 result_dict: dict[str, dict[Any, Any]] = {} 549 for res, fun_params_tuple, prefix in zip( 550 response, function_parameters, prefix_list 551 ): 552 if not res: 553 continue 554 res = res[0] 555 changes = res["changes"] # type: ignore 556 value_type, param_types, key_hashers, params, storage_function = ( 557 fun_params_tuple 558 ) 559 with self.get_conn(init=True) as substrate: 560 for item in changes: 561 # Determine type string 562 key_type_string: list[Any] = [] 563 for n in range(len(params), len(param_types)): 564 key_type_string.append( 565 f"[u8; {concat_hash_len(key_hashers[n])}]" 566 ) 567 key_type_string.append(param_types[n]) 568 569 item_key_obj = substrate.decode_scale( # type: ignore 570 type_string=f"({', '.join(key_type_string)})", 571 scale_bytes="0x" + item[0][len(prefix) :], 572 return_scale_obj=True, 573 block_hash=block_hash, 574 ) 575 # strip key_hashers to use as item key 576 if len(param_types) - len(params) == 1: 577 item_key = item_key_obj.value_object[1] # type: ignore 578 else: 579 item_key = tuple( # type: ignore 580 item_key_obj.value_object[key + 1] # type: ignore 581 for key in range( # type: ignore 582 len(params), len(param_types) + 1, 2 583 ) 584 ) 585 586 item_value = substrate.decode_scale( # type: ignore 587 type_string=value_type, 588 scale_bytes=item[1], 589 return_scale_obj=True, 590 block_hash=block_hash, 591 ) 592 result_dict.setdefault(storage_function, {}) 593 key = get_item_key_value(item_key) # type: ignore 594 result_dict[storage_function][key] = item_value.value # type: ignore 595 596 return result_dict 597 598 def query_batch( 599 self, functions: dict[str, list[tuple[str, list[Any]]]] 600 ) -> dict[str, str]: 601 """ 602 Executes batch queries on a substrate and returns results in a dictionary format. 603 604 Args: 605 substrate: An instance of SubstrateInterface to interact with the substrate. 606 functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls (function name and parameters). 607 608 Returns: 609 A dictionary where keys are storage function names and values are the query results. 610 611 Raises: 612 Exception: If no result is found from the batch queries. 
613 614 Example: 615 >>> query_batch(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]}) 616 {'function_name': 'query_result', ...} 617 """ 618 619 result: dict[str, str] = {} 620 if not functions: 621 raise Exception("No result") 622 with self.get_conn(init=True) as substrate: 623 for module, queries in functions.items(): 624 storage_keys: list[Any] = [] 625 for fn, params in queries: 626 storage_function = substrate.create_storage_key( # type: ignore 627 pallet=module, storage_function=fn, params=params 628 ) 629 storage_keys.append(storage_function) 630 631 block_hash = substrate.get_block_hash() 632 responses: list[Any] = substrate.query_multi( # type: ignore 633 storage_keys=storage_keys, block_hash=block_hash 634 ) 635 636 for item in responses: 637 fun = item[0] 638 query = item[1] 639 storage_fun = fun.storage_function 640 result[storage_fun] = query.value 641 642 return result 643 644 def query_batch_map( 645 self, 646 functions: dict[str, list[tuple[str, list[Any]]]], 647 block_hash: str | None = None, 648 ) -> dict[str, dict[Any, Any]]: 649 """ 650 Queries multiple storage functions using a map batch approach and returns the combined result. 651 652 Args: 653 substrate: An instance of SubstrateInterface for substrate interaction. 654 functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls. 655 656 Returns: 657 The combined result of the map batch query. 658 659 Example: 660 >>> query_batch_map(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]}) 661 # Returns the combined result of the map batch query 662 """ 663 multi_result: dict[str, dict[Any, Any]] = {} 664 665 def recursive_update( 666 d: dict[str, dict[T1, T2] | dict[str, Any]], 667 u: Mapping[str, dict[Any, Any] | str], 668 ) -> dict[str, dict[T1, T2]]: 669 for k, v in u.items(): 670 if isinstance(v, dict): 671 d[k] = recursive_update(d.get(k, {}), v) # type: ignore 672 else: 673 d[k] = v # type: ignore 674 return d # type: ignore 675 676 def get_page(): 677 send, prefix_list = self._get_storage_keys( 678 storage, queries, block_hash 679 ) 680 with self.get_conn(init=True) as substrate: 681 function_parameters = self._get_lists( 682 storage, queries, substrate 683 ) 684 responses = self._rpc_request_batch(send) 685 # assumption because send is just the storage_function keys 686 # so it should always be really small regardless of the amount of queries 687 assert len(responses) == 1 688 res = responses[0] 689 built_payload: list[tuple[str, list[Any]]] = [] 690 for result_keys in res: 691 built_payload.append( 692 ("state_queryStorageAt", [result_keys, block_hash]) 693 ) 694 _, chunks_info = self._make_request_smaller( 695 built_payload, prefix_list, function_parameters 696 ) 697 chunks_response, chunks_info = self._rpc_request_batch_chunked( 698 chunks_info 699 ) 700 return chunks_response, chunks_info 701 702 if not block_hash: 703 with self.get_conn(init=True) as substrate: 704 block_hash = substrate.get_block_hash() 705 for storage, queries in functions.items(): 706 chunks, chunks_info = get_page() 707 # if this doesn't happen something is wrong on the code 708 # and we won't be able to decode the data properly 709 assert len(chunks) == len(chunks_info) 710 for chunk_info, response in zip(chunks_info, chunks): 711 storage_result = self._decode_response( 712 response, 713 chunk_info.fun_params, 714 chunk_info.prefix_list, 715 block_hash, 716 ) 717 multi_result = recursive_update(multi_result, storage_result) 718 719 return 
multi_result 720 721 def query( 722 self, 723 name: str, 724 params: list[Any] = [], 725 module: str = "SubspaceModule", 726 block_hash: str | None = None, 727 ) -> Any: 728 """ 729 Queries a storage function on the network. 730 731 Sends a query to the network and retrieves data from a 732 specified storage function. 733 734 Args: 735 name: The name of the storage function to query. 736 params: The parameters to pass to the storage function. 737 module: The module where the storage function is located. 738 739 Returns: 740 The result of the query from the network. 741 742 Raises: 743 NetworkQueryError: If the query fails or is invalid. 744 """ 745 746 result = self.query_batch({module: [(name, params)]}) 747 748 return result[name] 749 750 def query_map( 751 self, 752 name: str, 753 params: list[Any] = [], 754 module: str = "SubspaceModule", 755 extract_value: bool = True, 756 block_hash: str | None = None, 757 ) -> dict[Any, Any]: 758 """ 759 Queries a storage map from a network node. 760 761 Args: 762 name: The name of the storage map to query. 763 params: A list of parameters for the query. 764 module: The module in which the storage map is located. 765 766 Returns: 767 A dictionary representing the key-value pairs 768 retrieved from the storage map. 769 770 Raises: 771 QueryError: If the query to the network fails or is invalid. 772 """ 773 774 result = self.query_batch_map({module: [(name, params)]}, block_hash) 775 776 if extract_value: 777 return {k.value: v.value for k, v in result} # type: ignore 778 779 return result 780 781 def compose_call( 782 self, 783 fn: str, 784 params: dict[str, Any], 785 key: Keypair | None, 786 module: str = "SubspaceModule", 787 wait_for_inclusion: bool = True, 788 wait_for_finalization: bool | None = None, 789 sudo: bool = False, 790 unsigned: bool = False, 791 ) -> ExtrinsicReceipt: 792 """ 793 Composes and submits a call to the network node. 794 795 Composes and signs a call with the provided keypair, and submits it to 796 the network. The call can be a standard extrinsic or a sudo extrinsic if 797 elevated permissions are required. The method can optionally wait for 798 the call's inclusion in a block and/or its finalization. 799 800 Args: 801 fn: The function name to call on the network. 802 params: A dictionary of parameters for the call. 803 key: The keypair for signing the extrinsic. 804 module: The module containing the function. 805 wait_for_inclusion: Wait for the call's inclusion in a block. 806 wait_for_finalization: Wait for the transaction's finalization. 807 sudo: Execute the call as a sudo (superuser) operation. 808 809 Returns: 810 The receipt of the submitted extrinsic, if 811 `wait_for_inclusion` is True. Otherwise, returns a string 812 identifier of the extrinsic. 813 814 Raises: 815 ChainTransactionError: If the transaction fails. 
816 """ 817 818 if key is None and not unsigned: 819 raise ValueError("Key must be provided for signed extrinsics.") 820 821 with self.get_conn() as substrate: 822 if wait_for_finalization is None: 823 wait_for_finalization = self.wait_for_finalization 824 825 call = substrate.compose_call( # type: ignore 826 call_module=module, call_function=fn, call_params=params 827 ) 828 if sudo: 829 call = substrate.compose_call( # type: ignore 830 call_module="Sudo", 831 call_function="sudo", 832 call_params={ 833 "call": call.value, # type: ignore 834 }, 835 ) 836 837 if not unsigned: 838 assert key is not None 839 extrinsic = substrate.create_signed_extrinsic( # type: ignore 840 call=call, 841 keypair=key, 842 ) 843 else: 844 extrinsic = substrate.create_unsigned_extrinsic(call=call) # type: ignore 845 846 response = substrate.submit_extrinsic( 847 extrinsic=extrinsic, 848 wait_for_inclusion=wait_for_inclusion, 849 wait_for_finalization=wait_for_finalization, 850 ) 851 if wait_for_inclusion: 852 if not response.is_success: 853 raise ChainTransactionError( 854 response.error_message, # type: ignore 855 response, # type: ignore 856 ) 857 858 return response 859 860 def compose_call_multisig( 861 self, 862 fn: str, 863 params: dict[str, Any], 864 key: Keypair, 865 signatories: list[Ss58Address], 866 threshold: int, 867 module: str = "SubspaceModule", 868 wait_for_inclusion: bool = True, 869 wait_for_finalization: bool | None = None, 870 sudo: bool = False, 871 era: dict[str, int] | None = None, 872 ) -> ExtrinsicReceipt: 873 """ 874 Composes and submits a multisignature call to the network node. 875 876 This method allows the composition and submission of a call that 877 requires multiple signatures for execution, known as a multisignature 878 call. It supports specifying signatories, a threshold of signatures for 879 the call's execution, and an optional era for the call's mortality. The 880 call can be a standard extrinsic, a sudo extrinsic for elevated 881 permissions, or a multisig extrinsic if multiple signatures are 882 required. Optionally, the method can wait for the call's inclusion in a 883 block and/or its finalization. Make sure to pass all keys, 884 that are part of the multisignature. 885 886 Args: 887 fn: The function name to call on the network. params: A dictionary 888 of parameters for the call. key: The keypair for signing the 889 extrinsic. signatories: List of SS58 addresses of the signatories. 890 Include ALL KEYS that are part of the multisig. threshold: The 891 minimum number of signatories required to execute the extrinsic. 892 module: The module containing the function to call. 893 wait_for_inclusion: Whether to wait for the call's inclusion in a 894 block. wait_for_finalization: Whether to wait for the transaction's 895 finalization. sudo: Execute the call as a sudo (superuser) 896 operation. era: Specifies the call's mortality in terms of blocks in 897 the format 898 {'period': amount_blocks}. If omitted, the extrinsic is 899 immortal. 900 901 Returns: 902 The receipt of the submitted extrinsic if `wait_for_inclusion` is 903 True. Otherwise, returns a string identifier of the extrinsic. 904 905 Raises: 906 ChainTransactionError: If the transaction fails. 
907 """ 908 909 # getting the call ready 910 with self.get_conn() as substrate: 911 if wait_for_finalization is None: 912 wait_for_finalization = self.wait_for_finalization 913 914 # prepares the `GenericCall` object 915 call = substrate.compose_call( # type: ignore 916 call_module=module, call_function=fn, call_params=params 917 ) 918 if sudo: 919 call = substrate.compose_call( # type: ignore 920 call_module="Sudo", 921 call_function="sudo", 922 call_params={ 923 "call": call.value, # type: ignore 924 }, 925 ) 926 927 # modify the rpc methods at runtime, to allow for correct payment 928 # fee calculation parity has a bug in this version, 929 # where the method has to be removed 930 rpc_methods = substrate.config.get("rpc_methods") # type: ignore 931 932 if "state_call" in rpc_methods: # type: ignore 933 rpc_methods.remove("state_call") # type: ignore 934 935 # create the multisig account 936 multisig_acc = substrate.generate_multisig_account( # type: ignore 937 signatories, threshold 938 ) 939 940 # send the multisig extrinsic 941 extrinsic = substrate.create_multisig_extrinsic( # type: ignore 942 call=call, # type: ignore 943 keypair=key, 944 multisig_account=multisig_acc, # type: ignore 945 era=era, # type: ignore 946 ) # type: ignore 947 948 response = substrate.submit_extrinsic( 949 extrinsic=extrinsic, 950 wait_for_inclusion=wait_for_inclusion, 951 wait_for_finalization=wait_for_finalization, 952 ) 953 954 if wait_for_inclusion: 955 if not response.is_success: 956 raise ChainTransactionError( 957 response.error_message, # type: ignore 958 response, # type: ignore 959 ) 960 961 return response 962 963 def transfer( 964 self, 965 key: Keypair, 966 amount: int, 967 dest: Ss58Address, 968 ) -> ExtrinsicReceipt: 969 """ 970 Transfers a specified amount of tokens from the signer's account to the 971 specified account. 972 973 Args: 974 key: The keypair associated with the sender's account. 975 amount: The amount to transfer, in nanotokens. 976 dest: The SS58 address of the recipient. 977 978 Returns: 979 A receipt of the transaction. 980 981 Raises: 982 InsufficientBalanceError: If the sender's account does not have 983 enough balance. 984 ChainTransactionError: If the transaction fails. 985 """ 986 987 params = {"dest": dest, "value": amount} 988 989 return self.compose_call( 990 module="Balances", fn="transfer_keep_alive", params=params, key=key 991 ) 992 993 def transfer_multiple( 994 self, 995 key: Keypair, 996 destinations: list[Ss58Address], 997 amounts: list[int], 998 netuid: str | int = 0, 999 ) -> ExtrinsicReceipt: 1000 """ 1001 Transfers specified amounts of tokens from the signer's account to 1002 multiple target accounts. 1003 1004 The `destinations` and `amounts` lists must be of the same length. 1005 1006 Args: 1007 key: The keypair associated with the sender's account. 1008 destinations: A list of SS58 addresses of the recipients. 1009 amounts: Amount to transfer to each recipient, in nanotokens. 1010 netuid: The network identifier. 1011 1012 Returns: 1013 A receipt of the transaction. 1014 1015 Raises: 1016 InsufficientBalanceError: If the sender's account does not have 1017 enough balance for all transfers. 1018 ChainTransactionError: If the transaction fails. 
1019 """ 1020 1021 assert len(destinations) == len(amounts) 1022 1023 # extract existential deposit from amounts 1024 existential_deposit = self.get_existential_deposit() 1025 amounts = [a - existential_deposit for a in amounts] 1026 1027 params = { 1028 "netuid": netuid, 1029 "destinations": destinations, 1030 "amounts": amounts, 1031 } 1032 1033 return self.compose_call( 1034 module="SubspaceModule", 1035 fn="transfer_multiple", 1036 params=params, 1037 key=key, 1038 ) 1039 1040 def stake( 1041 self, 1042 key: Keypair, 1043 amount: int, 1044 dest: Ss58Address, 1045 ) -> ExtrinsicReceipt: 1046 """ 1047 Stakes the specified amount of tokens to a module key address. 1048 1049 Args: 1050 key: The keypair associated with the staker's account. 1051 amount: The amount of tokens to stake, in nanotokens. 1052 dest: The SS58 address of the module key to stake to. 1053 netuid: The network identifier. 1054 1055 Returns: 1056 A receipt of the staking transaction. 1057 1058 Raises: 1059 InsufficientBalanceError: If the staker's account does not have 1060 enough balance. 1061 ChainTransactionError: If the transaction fails. 1062 """ 1063 1064 params = {"amount": amount, "module_key": dest} 1065 1066 return self.compose_call(fn="add_stake", params=params, key=key) 1067 1068 def unstake( 1069 self, 1070 key: Keypair, 1071 amount: int, 1072 dest: Ss58Address, 1073 ) -> ExtrinsicReceipt: 1074 """ 1075 Unstakes the specified amount of tokens from a module key address. 1076 1077 Args: 1078 key: The keypair associated with the unstaker's account. 1079 amount: The amount of tokens to unstake, in nanotokens. 1080 dest: The SS58 address of the module key to unstake from. 1081 netuid: The network identifier. 1082 1083 Returns: 1084 A receipt of the unstaking transaction. 1085 1086 Raises: 1087 InsufficientStakeError: If the staked key does not have enough 1088 staked tokens by the signer key. 1089 ChainTransactionError: If the transaction fails. 1090 """ 1091 1092 params = {"amount": amount, "module_key": dest} 1093 return self.compose_call(fn="remove_stake", params=params, key=key) 1094 1095 def update_module( 1096 self, 1097 key: Keypair, 1098 name: str, 1099 address: str, 1100 metadata: str | None = None, 1101 delegation_fee: int = 20, 1102 netuid: int = 0, 1103 ) -> ExtrinsicReceipt: 1104 """ 1105 Updates the parameters of a registered module. 1106 1107 The delegation fee must be an integer between 0 and 100. 1108 1109 Args: 1110 key: The keypair associated with the module's account. 1111 name: The new name for the module. If None, the name is not updated. 1112 address: The new address for the module. 1113 If None, the address is not updated. 1114 delegation_fee: The new delegation fee for the module, 1115 between 0 and 100. 1116 netuid: The network identifier. 1117 1118 Returns: 1119 A receipt of the module update transaction. 1120 1121 Raises: 1122 InvalidParameterError: If the provided parameters are invalid. 1123 ChainTransactionError: If the transaction fails. 
1124 """ 1125 1126 assert isinstance(delegation_fee, int) 1127 params = { 1128 "netuid": netuid, 1129 "name": name, 1130 "address": address, 1131 "delegation_fee": delegation_fee, 1132 "metadata": metadata, 1133 } 1134 1135 response = self.compose_call("update_module", params=params, key=key) 1136 1137 return response 1138 1139 def register_module( 1140 self, 1141 key: Keypair, 1142 name: str, 1143 address: str | None = None, 1144 subnet: str = "Rootnet", 1145 metadata: str | None = None, 1146 ) -> ExtrinsicReceipt: 1147 """ 1148 Registers a new module in the network. 1149 1150 Args: 1151 key: The keypair used for registering the module. 1152 name: The name of the module. If None, a default or previously 1153 set name is used. # How does this work? 1154 address: The address of the module. If None, a default or 1155 previously set address is used. # How does this work? 1156 subnet: The network subnet to register the module in. 1157 min_stake: The minimum stake required for the module, in nanotokens. 1158 If None, a default value is used. 1159 1160 Returns: 1161 A receipt of the registration transaction. 1162 1163 Raises: 1164 InvalidParameterError: If the provided parameters are invalid. 1165 ChainTransactionError: If the transaction fails. 1166 """ 1167 1168 key_addr = key.ss58_address 1169 1170 params = { 1171 "network_name": subnet, 1172 "address": address, 1173 "name": name, 1174 "module_key": key_addr, 1175 "metadata": metadata, 1176 } 1177 1178 response = self.compose_call("register", params=params, key=key) 1179 return response 1180 1181 def deregister_module(self, key: Keypair, netuid: int) -> ExtrinsicReceipt: 1182 """ 1183 Deregisters a module from the network. 1184 1185 Args: 1186 key: The keypair associated with the module's account. 1187 netuid: The network identifier. 1188 1189 Returns: 1190 A receipt of the module deregistration transaction. 1191 1192 Raises: 1193 ChainTransactionError: If the transaction fails. 1194 """ 1195 1196 params = {"netuid": netuid} 1197 1198 response = self.compose_call("deregister", params=params, key=key) 1199 1200 return response 1201 1202 def register_subnet( 1203 self, key: Keypair, name: str, metadata: str | None = None 1204 ) -> ExtrinsicReceipt: 1205 """ 1206 Registers a new subnet in the network. 1207 1208 Args: 1209 key (Keypair): The keypair used for registering the subnet. 1210 name (str): The name of the subnet to be registered. 1211 metadata (str | None, optional): Additional metadata for the subnet. Defaults to None. 1212 1213 Returns: 1214 ExtrinsicReceipt: A receipt of the subnet registration transaction. 1215 1216 Raises: 1217 ChainTransactionError: If the transaction fails. 1218 """ 1219 1220 params = { 1221 "name": name, 1222 "metadata": metadata, 1223 } 1224 1225 response = self.compose_call("register_subnet", params=params, key=key) 1226 1227 return response 1228 1229 def vote( 1230 self, 1231 key: Keypair, 1232 uids: list[int], 1233 weights: list[int], 1234 netuid: int = 0, 1235 ) -> ExtrinsicReceipt: 1236 """ 1237 Casts votes on a list of module UIDs with corresponding weights. 1238 1239 The length of the UIDs list and the weights list should be the same. 1240 Each weight corresponds to the UID at the same index. 1241 1242 Args: 1243 key: The keypair used for signing the vote transaction. 1244 uids: A list of module UIDs to vote on. 1245 weights: A list of weights corresponding to each UID. 1246 netuid: The network identifier. 1247 1248 Returns: 1249 A receipt of the voting transaction. 
1250 1251 Raises: 1252 InvalidParameterError: If the lengths of UIDs and weights lists 1253 do not match. 1254 ChainTransactionError: If the transaction fails. 1255 """ 1256 1257 assert len(uids) == len(weights) 1258 1259 params = { 1260 "uids": uids, 1261 "weights": weights, 1262 "netuid": netuid, 1263 } 1264 1265 response = self.compose_call("set_weights", params=params, key=key) 1266 1267 return response 1268 1269 def update_subnet( 1270 self, 1271 key: Keypair, 1272 params: SubnetParams, 1273 netuid: int = 0, 1274 ) -> ExtrinsicReceipt: 1275 """ 1276 Update a subnet's configuration. 1277 1278 It requires the founder key for authorization. 1279 1280 Args: 1281 key: The founder keypair of the subnet. 1282 params: The new parameters for the subnet. 1283 netuid: The network identifier. 1284 1285 Returns: 1286 A receipt of the subnet update transaction. 1287 1288 Raises: 1289 AuthorizationError: If the key is not authorized. 1290 ChainTransactionError: If the transaction fails. 1291 """ 1292 1293 general_params = dict(params) 1294 general_params["netuid"] = netuid 1295 if general_params.get("subnet_metadata") is None: 1296 general_params["metadata"] = None 1297 else: 1298 general_params["metadata"] = general_params["subnet_metadata"] 1299 1300 response = self.compose_call( 1301 fn="update_subnet", 1302 params=general_params, 1303 key=key, 1304 ) 1305 1306 return response 1307 1308 def transfer_stake( 1309 self, 1310 key: Keypair, 1311 amount: int, 1312 from_module_key: Ss58Address, 1313 dest_module_address: Ss58Address, 1314 ) -> ExtrinsicReceipt: 1315 """ 1316 Realocate staked tokens from one staked module to another module. 1317 1318 Args: 1319 key: The keypair associated with the account that is delegating the tokens. 1320 amount: The amount of staked tokens to transfer, in nanotokens. 1321 from_module_key: The SS58 address of the module you want to transfer from (currently delegated by the key). 1322 dest_module_address: The SS58 address of the destination (newly delegated key). 1323 netuid: The network identifier. 1324 1325 Returns: 1326 A receipt of the stake transfer transaction. 1327 1328 Raises: 1329 InsufficientStakeError: If the source module key does not have 1330 enough staked tokens. ChainTransactionError: If the transaction 1331 fails. 1332 """ 1333 1334 amount = amount - self.get_existential_deposit() 1335 1336 params = { 1337 "amount": amount, 1338 "module_key": from_module_key, 1339 "new_module_key": dest_module_address, 1340 } 1341 1342 response = self.compose_call("transfer_stake", key=key, params=params) 1343 1344 return response 1345 1346 def multiunstake( 1347 self, 1348 key: Keypair, 1349 keys: list[Ss58Address], 1350 amounts: list[int], 1351 ) -> ExtrinsicReceipt: 1352 """ 1353 Unstakes tokens from multiple module keys. 1354 1355 And the lists `keys` and `amounts` must be of the same length. Each 1356 amount corresponds to the module key at the same index. 1357 1358 Args: 1359 key: The keypair associated with the unstaker's account. 1360 keys: A list of SS58 addresses of the module keys to unstake from. 1361 amounts: A list of amounts to unstake from each module key, 1362 in nanotokens. 1363 netuid: The network identifier. 1364 1365 Returns: 1366 A receipt of the multi-unstaking transaction. 1367 1368 Raises: 1369 MismatchedLengthError: If the lengths of keys and amounts lists do 1370 not match. InsufficientStakeError: If any of the module keys do not 1371 have enough staked tokens. ChainTransactionError: If the transaction 1372 fails. 
1373 """ 1374 1375 assert len(keys) == len(amounts) 1376 1377 params = {"module_keys": keys, "amounts": amounts} 1378 1379 response = self.compose_call( 1380 "remove_stake_multiple", params=params, key=key 1381 ) 1382 1383 return response 1384 1385 def multistake( 1386 self, 1387 key: Keypair, 1388 keys: list[Ss58Address], 1389 amounts: list[int], 1390 ) -> ExtrinsicReceipt: 1391 """ 1392 Stakes tokens to multiple module keys. 1393 1394 The lengths of the `keys` and `amounts` lists must be the same. Each 1395 amount corresponds to the module key at the same index. 1396 1397 Args: 1398 key: The keypair associated with the staker's account. 1399 keys: A list of SS58 addresses of the module keys to stake to. 1400 amounts: A list of amounts to stake to each module key, 1401 in nanotokens. 1402 netuid: The network identifier. 1403 1404 Returns: 1405 A receipt of the multi-staking transaction. 1406 1407 Raises: 1408 MismatchedLengthError: If the lengths of keys and amounts lists 1409 do not match. 1410 ChainTransactionError: If the transaction fails. 1411 """ 1412 1413 assert len(keys) == len(amounts) 1414 1415 params = { 1416 "module_keys": keys, 1417 "amounts": amounts, 1418 } 1419 1420 response = self.compose_call( 1421 "add_stake_multiple", params=params, key=key 1422 ) 1423 1424 return response 1425 1426 def add_profit_shares( 1427 self, 1428 key: Keypair, 1429 keys: list[Ss58Address], 1430 shares: list[int], 1431 ) -> ExtrinsicReceipt: 1432 """ 1433 Allocates profit shares to multiple keys. 1434 1435 The lists `keys` and `shares` must be of the same length, 1436 with each share amount corresponding to the key at the same index. 1437 1438 Args: 1439 key: The keypair associated with the account 1440 distributing the shares. 1441 keys: A list of SS58 addresses to allocate shares to. 1442 shares: A list of share amounts to allocate to each key, 1443 in nanotokens. 1444 1445 Returns: 1446 A receipt of the profit sharing transaction. 1447 1448 Raises: 1449 MismatchedLengthError: If the lengths of keys and shares 1450 lists do not match. 1451 ChainTransactionError: If the transaction fails. 1452 """ 1453 1454 assert len(keys) == len(shares) 1455 1456 params = {"keys": keys, "shares": shares} 1457 1458 response = self.compose_call( 1459 "add_profit_shares", params=params, key=key 1460 ) 1461 1462 return response 1463 1464 def add_subnet_proposal( 1465 self, key: Keypair, params: dict[str, Any], ipfs: str, netuid: int = 0 1466 ) -> ExtrinsicReceipt: 1467 """ 1468 Submits a proposal for creating or modifying a subnet within the 1469 network. 1470 1471 The proposal includes various parameters like the name, founder, share 1472 allocations, and other subnet-specific settings. 1473 1474 Args: 1475 key: The keypair used for signing the proposal transaction. 1476 params: The parameters for the subnet proposal. 1477 netuid: The network identifier. 1478 1479 Returns: 1480 A receipt of the subnet proposal transaction. 1481 1482 Raises: 1483 InvalidParameterError: If the provided subnet 1484 parameters are invalid. 1485 ChainTransactionError: If the transaction fails. 
1486 """ 1487 1488 general_params = dict(params) 1489 general_params["netuid"] = netuid 1490 general_params["data"] = ipfs 1491 if "metadata" not in general_params: 1492 general_params["metadata"] = None 1493 1494 # general_params["burn_config"] = json.dumps(general_params["burn_config"]) 1495 response = self.compose_call( 1496 fn="add_subnet_params_proposal", 1497 params=general_params, 1498 key=key, 1499 module="GovernanceModule", 1500 ) 1501 1502 return response 1503 1504 def add_custom_proposal( 1505 self, 1506 key: Keypair, 1507 cid: str, 1508 ) -> ExtrinsicReceipt: 1509 params = {"data": cid} 1510 1511 response = self.compose_call( 1512 fn="add_global_custom_proposal", 1513 params=params, 1514 key=key, 1515 module="GovernanceModule", 1516 ) 1517 return response 1518 1519 def add_custom_subnet_proposal( 1520 self, 1521 key: Keypair, 1522 cid: str, 1523 netuid: int = 0, 1524 ) -> ExtrinsicReceipt: 1525 """ 1526 Submits a proposal for creating or modifying a custom subnet within the 1527 network. 1528 1529 The proposal includes various parameters like the name, founder, share 1530 allocations, and other subnet-specific settings. 1531 1532 Args: 1533 key: The keypair used for signing the proposal transaction. 1534 params: The parameters for the subnet proposal. 1535 netuid: The network identifier. 1536 1537 Returns: 1538 A receipt of the subnet proposal transaction. 1539 """ 1540 1541 params = { 1542 "data": cid, 1543 "netuid": netuid, 1544 } 1545 1546 response = self.compose_call( 1547 fn="add_subnet_custom_proposal", 1548 params=params, 1549 key=key, 1550 module="GovernanceModule", 1551 ) 1552 1553 return response 1554 1555 def add_global_proposal( 1556 self, 1557 key: Keypair, 1558 params: NetworkParams, 1559 cid: str | None, 1560 ) -> ExtrinsicReceipt: 1561 """ 1562 Submits a proposal for altering the global network parameters. 1563 1564 Allows for the submission of a proposal to 1565 change various global parameters 1566 of the network, such as emission rates, rate limits, and voting 1567 thresholds. It is used to 1568 suggest changes that affect the entire network's operation. 1569 1570 Args: 1571 key: The keypair used for signing the proposal transaction. 1572 params: A dictionary containing global network parameters 1573 like maximum allowed subnets, modules, 1574 transaction rate limits, and others. 1575 1576 Returns: 1577 A receipt of the global proposal transaction. 1578 1579 Raises: 1580 InvalidParameterError: If the provided network 1581 parameters are invalid. 1582 ChainTransactionError: If the transaction fails. 1583 """ 1584 general_params = cast(dict[str, Any], params) 1585 cid = cid or "" 1586 general_params["data"] = cid 1587 1588 response = self.compose_call( 1589 fn="add_global_params_proposal", 1590 params=general_params, 1591 key=key, 1592 module="GovernanceModule", 1593 ) 1594 1595 return response 1596 1597 def vote_on_proposal( 1598 self, 1599 key: Keypair, 1600 proposal_id: int, 1601 agree: bool, 1602 ) -> ExtrinsicReceipt: 1603 """ 1604 Casts a vote on a specified proposal within the network. 1605 1606 Args: 1607 key: The keypair used for signing the vote transaction. 1608 proposal_id: The unique identifier of the proposal to vote on. 1609 1610 Returns: 1611 A receipt of the voting transaction in nanotokens. 1612 1613 Raises: 1614 InvalidProposalIDError: If the provided proposal ID does not 1615 exist or is invalid. 1616 ChainTransactionError: If the transaction fails. 
1617 """ 1618 1619 params = {"proposal_id": proposal_id, "agree": agree} 1620 1621 response = self.compose_call( 1622 "vote_proposal", 1623 key=key, 1624 params=params, 1625 module="GovernanceModule", 1626 ) 1627 1628 return response 1629 1630 def unvote_on_proposal( 1631 self, 1632 key: Keypair, 1633 proposal_id: int, 1634 ) -> ExtrinsicReceipt: 1635 """ 1636 Retracts a previously cast vote on a specified proposal. 1637 1638 Args: 1639 key: The keypair used for signing the unvote transaction. 1640 proposal_id: The unique identifier of the proposal to withdraw the 1641 vote from. 1642 1643 Returns: 1644 A receipt of the unvoting transaction in nanotokens. 1645 1646 Raises: 1647 InvalidProposalIDError: If the provided proposal ID does not 1648 exist or is invalid. 1649 ChainTransactionError: If the transaction fails to be processed, or 1650 if there was no prior vote to retract. 1651 """ 1652 1653 params = {"proposal_id": proposal_id} 1654 1655 response = self.compose_call( 1656 "remove_vote_proposal", 1657 key=key, 1658 params=params, 1659 module="GovernanceModule", 1660 ) 1661 1662 return response 1663 1664 def enable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt: 1665 """ 1666 Enables vote power delegation for the signer's account. 1667 1668 Args: 1669 key: The keypair used for signing the delegation transaction. 1670 1671 Returns: 1672 A receipt of the vote power delegation transaction. 1673 1674 Raises: 1675 ChainTransactionError: If the transaction fails. 1676 """ 1677 1678 response = self.compose_call( 1679 "enable_vote_power_delegation", 1680 params={}, 1681 key=key, 1682 module="GovernanceModule", 1683 ) 1684 1685 return response 1686 1687 def disable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt: 1688 """ 1689 Disables vote power delegation for the signer's account. 1690 1691 Args: 1692 key: The keypair used for signing the delegation transaction. 1693 1694 Returns: 1695 A receipt of the vote power delegation transaction. 1696 1697 Raises: 1698 ChainTransactionError: If the transaction fails. 1699 """ 1700 1701 response = self.compose_call( 1702 "disable_vote_power_delegation", 1703 params={}, 1704 key=key, 1705 module="GovernanceModule", 1706 ) 1707 1708 return response 1709 1710 def add_dao_application( 1711 self, key: Keypair, application_key: Ss58Address, data: str 1712 ) -> ExtrinsicReceipt: 1713 """ 1714 Submits a new application to the general subnet DAO. 1715 1716 Args: 1717 key: The keypair used for signing the application transaction. 1718 application_key: The SS58 address of the application key. 1719 data: The data associated with the application. 1720 1721 Returns: 1722 A receipt of the application transaction. 1723 1724 Raises: 1725 ChainTransactionError: If the transaction fails. 1726 """ 1727 1728 params = {"application_key": application_key, "data": data} 1729 1730 response = self.compose_call( 1731 "add_dao_application", 1732 module="GovernanceModule", 1733 key=key, 1734 params=params, 1735 ) 1736 1737 return response 1738 1739 def query_map_curator_applications(self) -> dict[str, dict[str, str]]: 1740 query_result = self.query_map( 1741 "CuratorApplications", 1742 module="GovernanceModule", 1743 params=[], 1744 extract_value=False, 1745 ) 1746 applications = query_result.get("CuratorApplications", {}) 1747 return applications 1748 1749 def query_map_proposals( 1750 self, extract_value: bool = False 1751 ) -> dict[int, dict[str, Any]]: 1752 """ 1753 Retrieves a mappping of proposals from the network. 
1754 1755 Queries the network and returns a mapping of proposal IDs to 1756 their respective parameters. 1757 1758 Returns: 1759 A dictionary mapping proposal IDs 1760 to dictionaries of their parameters. 1761 1762 Raises: 1763 QueryError: If the query to the network fails or is invalid. 1764 """ 1765 1766 return self.query_map( 1767 "Proposals", extract_value=extract_value, module="GovernanceModule" 1768 )["Proposals"] 1769 1770 def query_map_weights( 1771 self, netuid: int = 0, extract_value: bool = False 1772 ) -> dict[int, list[tuple[int, int]]] | None: 1773 """ 1774 Retrieves a mapping of weights for keys on the network. 1775 1776 Queries the network and returns a mapping of key UIDs to 1777 their respective weights. 1778 1779 Args: 1780 netuid: The network UID from which to get the weights. 1781 1782 Returns: 1783 A dictionary mapping key UIDs to lists of their weights. 1784 1785 Raises: 1786 QueryError: If the query to the network fails or is invalid. 1787 """ 1788 1789 weights_dict = self.query_map( 1790 "Weights", [netuid], extract_value=extract_value 1791 ).get("Weights") 1792 return weights_dict 1793 1794 def query_map_key( 1795 self, 1796 netuid: int = 0, 1797 extract_value: bool = False, 1798 ) -> dict[int, Ss58Address]: 1799 """ 1800 Retrieves a map of keys from the network. 1801 1802 Fetches a mapping of key UIDs to their associated 1803 addresses on the network. 1804 The query can be targeted at a specific network UID if required. 1805 1806 Args: 1807 netuid: The network UID from which to get the keys. 1808 1809 Returns: 1810 A dictionary mapping key UIDs to their addresses. 1811 1812 Raises: 1813 QueryError: If the query to the network fails or is invalid. 1814 """ 1815 return self.query_map("Keys", [netuid], extract_value=extract_value)[ 1816 "Keys" 1817 ] 1818 1819 def query_map_address( 1820 self, netuid: int = 0, extract_value: bool = False 1821 ) -> dict[int, str]: 1822 """ 1823 Retrieves a map of key addresses from the network. 1824 1825 Queries the network for a mapping of key UIDs to their addresses. 1826 1827 Args: 1828 netuid: The network UID from which to get the addresses. 1829 1830 Returns: 1831 A dictionary mapping key UIDs to their addresses. 1832 1833 Raises: 1834 QueryError: If the query to the network fails or is invalid. 1835 """ 1836 1837 return self.query_map("Address", [netuid], extract_value=extract_value)[ 1838 "Address" 1839 ] 1840 1841 def query_map_emission( 1842 self, extract_value: bool = False 1843 ) -> dict[int, list[int]]: 1844 """ 1845 Retrieves a map of emissions for keys on the network. 1846 1847 Queries the network to get a mapping of 1848 key UIDs to their emission values. 1849 1850 Returns: 1851 A dictionary mapping key UIDs to lists of their emission values. 1852 1853 Raises: 1854 QueryError: If the query to the network fails or is invalid. 1855 """ 1856 1857 return self.query_map("Emission", extract_value=extract_value)[ 1858 "Emission" 1859 ] 1860 1861 def query_map_pending_emission(self, extract_value: bool = False) -> int: 1862 """ 1863 Retrieves a map of pending emissions for the subnets. 1864 1865 Queries the network for a mapping of subnet UIDs to their pending emission values. 1866 1867 Returns: 1868 A dictionary mapping subnet UIDs to their pending emission values. 1869 1870 Raises: 1871 QueryError: If the query to the network fails or is invalid. 
1872 """ 1873 return self.query_map( 1874 "PendingEmission", 1875 extract_value=extract_value, 1876 module="SubnetEmissionModule", 1877 )["PendingEmission"] 1878 1879 def query_map_subnet_emission( 1880 self, extract_value: bool = False 1881 ) -> dict[int, int]: 1882 """ 1883 Retrieves a map of subnet emissions for the network. 1884 1885 Queries the network for a mapping of subnet UIDs to their emission values. 1886 1887 Returns: 1888 A dictionary mapping subnet UIDs to their emission values. 1889 1890 Raises: 1891 QueryError: If the query to the network fails or is invalid. 1892 """ 1893 1894 return self.query_map( 1895 "SubnetEmission", 1896 extract_value=extract_value, 1897 module="SubnetEmissionModule", 1898 )["SubnetEmission"] 1899 1900 def query_map_subnet_consensus( 1901 self, extract_value: bool = False 1902 ) -> dict[int, str]: 1903 """ 1904 Retrieves a map of subnet consensus types for the network. 1905 1906 Queries the network for a mapping of subnet UIDs to their consensus types. 1907 1908 Returns: 1909 A dictionary mapping subnet UIDs to their consensus types. 1910 1911 Raises: 1912 QueryError: If the query to the network fails or is invalid. 1913 """ 1914 1915 return self.query_map( 1916 "SubnetConsensusType", 1917 extract_value=extract_value, 1918 module="SubnetEmissionModule", 1919 )["SubnetConsensusType"] 1920 1921 def query_map_incentive( 1922 self, extract_value: bool = False 1923 ) -> dict[int, list[int]]: 1924 """ 1925 Retrieves a mapping of incentives for keys on the network. 1926 1927 Queries the network and returns a mapping of key UIDs to 1928 their respective incentive values. 1929 1930 Returns: 1931 A dictionary mapping key UIDs to lists of their incentive values. 1932 1933 Raises: 1934 QueryError: If the query to the network fails or is invalid. 1935 """ 1936 1937 return self.query_map("Incentive", extract_value=extract_value)[ 1938 "Incentive" 1939 ] 1940 1941 def query_map_dividend( 1942 self, extract_value: bool = False 1943 ) -> dict[int, list[int]]: 1944 """ 1945 Retrieves a mapping of dividends for keys on the network. 1946 1947 Queries the network for a mapping of key UIDs to 1948 their dividend values. 1949 1950 Returns: 1951 A dictionary mapping key UIDs to lists of their dividend values. 1952 1953 Raises: 1954 QueryError: If the query to the network fails or is invalid. 1955 """ 1956 1957 return self.query_map("Dividends", extract_value=extract_value)[ 1958 "Dividends" 1959 ] 1960 1961 def query_map_regblock( 1962 self, netuid: int = 0, extract_value: bool = False 1963 ) -> dict[int, int]: 1964 """ 1965 Retrieves a mapping of registration blocks for keys on the network. 1966 1967 Queries the network for a mapping of key UIDs to 1968 the blocks where they were registered. 1969 1970 Args: 1971 netuid: The network UID from which to get the registration blocks. 1972 1973 Returns: 1974 A dictionary mapping key UIDs to their registration blocks. 1975 1976 Raises: 1977 QueryError: If the query to the network fails or is invalid. 1978 """ 1979 1980 return self.query_map( 1981 "RegistrationBlock", [netuid], extract_value=extract_value 1982 )["RegistrationBlock"] 1983 1984 def query_map_lastupdate( 1985 self, extract_value: bool = False 1986 ) -> dict[int, list[int]]: 1987 """ 1988 Retrieves a mapping of the last update times for keys on the network. 1989 1990 Queries the network for a mapping of key UIDs to their last update times. 1991 1992 Returns: 1993 A dictionary mapping key UIDs to lists of their last update times. 
1994 1995 Raises: 1996 QueryError: If the query to the network fails or is invalid. 1997 """ 1998 1999 return self.query_map("LastUpdate", extract_value=extract_value)[ 2000 "LastUpdate" 2001 ] 2002 2003 def query_map_stakefrom( 2004 self, extract_value: bool = False 2005 ) -> dict[Ss58Address, list[tuple[Ss58Address, int]]]: 2006 """ 2007 Retrieves a mapping of stakes from various sources for keys on the network. 2008 2009 Queries the network to obtain a mapping of key addresses to the sources 2010 and amounts of stakes they have received. 2011 2012 Args: 2013 netuid: The network UID from which to get the stakes. 2014 2015 Returns: 2016 A dictionary mapping key addresses to lists of tuples 2017 (module_key_address, amount). 2018 2019 Raises: 2020 QueryError: If the query to the network fails or is invalid. 2021 """ 2022 2023 result = self.query_map("StakeFrom", [], extract_value=extract_value)[ 2024 "StakeFrom" 2025 ] 2026 2027 return transform_stake_dmap(result) 2028 2029 def query_map_staketo( 2030 self, extract_value: bool = False 2031 ) -> dict[Ss58Address, list[tuple[Ss58Address, int]]]: 2032 """ 2033 Retrieves a mapping of stakes to destinations for keys on the network. 2034 2035 Queries the network for a mapping of key addresses to the destinations 2036 and amounts of stakes they have made. 2037 2038 Args: 2039 netuid: The network UID from which to get the stakes. 2040 2041 Returns: 2042 A dictionary mapping key addresses to lists of tuples 2043 (module_key_address, amount). 2044 2045 Raises: 2046 QueryError: If the query to the network fails or is invalid. 2047 """ 2048 2049 result = self.query_map("StakeTo", [], extract_value=extract_value)[ 2050 "StakeTo" 2051 ] 2052 return transform_stake_dmap(result) 2053 2054 def query_map_delegationfee( 2055 self, netuid: int = 0, extract_value: bool = False 2056 ) -> dict[str, int]: 2057 """ 2058 Retrieves a mapping of delegation fees for keys on the network. 2059 2060 Queries the network to obtain a mapping of key addresses to their 2061 respective delegation fees. 2062 2063 Args: 2064 netuid: The network UID to filter the delegation fees. 2065 2066 Returns: 2067 A dictionary mapping key addresses to their delegation fees. 2068 2069 Raises: 2070 QueryError: If the query to the network fails or is invalid. 2071 """ 2072 2073 return self.query_map( 2074 "DelegationFee", [netuid], extract_value=extract_value 2075 )["DelegationFee"] 2076 2077 def query_map_tempo(self, extract_value: bool = False) -> dict[int, int]: 2078 """ 2079 Retrieves a mapping of tempo settings for the network. 2080 2081 Queries the network to obtain the tempo (rate of reward distributions) 2082 settings for various network subnets. 2083 2084 Returns: 2085 A dictionary mapping network UIDs to their tempo settings. 2086 2087 Raises: 2088 QueryError: If the query to the network fails or is invalid. 2089 """ 2090 2091 return self.query_map("Tempo", extract_value=extract_value)["Tempo"] 2092 2093 def query_map_immunity_period(self, extract_value: bool) -> dict[int, int]: 2094 """ 2095 Retrieves a mapping of immunity periods for the network. 2096 2097 Queries the network for the immunity period settings, 2098 which represent the time duration during which modules 2099 can not get deregistered. 2100 2101 Returns: 2102 A dictionary mapping network UIDs to their immunity period settings. 2103 2104 Raises: 2105 QueryError: If the query to the network fails or is invalid. 
2106 """ 2107 2108 return self.query_map("ImmunityPeriod", extract_value=extract_value)[ 2109 "ImmunityPeriod" 2110 ] 2111 2112 def query_map_min_allowed_weights( 2113 self, extract_value: bool = False 2114 ) -> dict[int, int]: 2115 """ 2116 Retrieves a mapping of minimum allowed weights for the network. 2117 2118 Queries the network to obtain the minimum allowed weights, 2119 which are the lowest permissible weight values that can be set by 2120 validators. 2121 2122 Returns: 2123 A dictionary mapping network UIDs to 2124 their minimum allowed weight values. 2125 2126 Raises: 2127 QueryError: If the query to the network fails or is invalid. 2128 """ 2129 2130 return self.query_map("MinAllowedWeights", extract_value=extract_value)[ 2131 "MinAllowedWeights" 2132 ] 2133 2134 def query_map_max_allowed_weights( 2135 self, extract_value: bool = False 2136 ) -> dict[int, int]: 2137 """ 2138 Retrieves a mapping of maximum allowed weights for the network. 2139 2140 Queries the network for the maximum allowed weights, 2141 which are the highest permissible 2142 weight values that can be set by validators. 2143 2144 Returns: 2145 A dictionary mapping network UIDs to 2146 their maximum allowed weight values. 2147 2148 Raises: 2149 QueryError: If the query to the network fails or is invalid. 2150 """ 2151 2152 return self.query_map("MaxAllowedWeights", extract_value=extract_value)[ 2153 "MaxAllowedWeights" 2154 ] 2155 2156 def query_map_max_allowed_uids( 2157 self, extract_value: bool = False 2158 ) -> dict[int, int]: 2159 """ 2160 Queries the network for the maximum number of allowed user IDs (UIDs) 2161 for each network subnet. 2162 2163 Fetches a mapping of network subnets to their respective 2164 limits on the number of user IDs that can be created or used. 2165 2166 Returns: 2167 A dictionary mapping network UIDs (unique identifiers) to their 2168 maximum allowed number of UIDs. 2169 Each entry represents a network subnet 2170 with its corresponding UID limit. 2171 2172 Raises: 2173 QueryError: If the query to the network fails or is invalid. 2174 """ 2175 2176 return self.query_map("MaxAllowedUids", extract_value=extract_value)[ 2177 "MaxAllowedUids" 2178 ] 2179 2180 def query_map_min_stake( 2181 self, extract_value: bool = False 2182 ) -> dict[int, int]: 2183 """ 2184 Retrieves a mapping of minimum allowed stake on the network. 2185 2186 Queries the network to obtain the minimum number of stake, 2187 which is represented in nanotokens. 2188 2189 Returns: 2190 A dictionary mapping network UIDs to 2191 their minimum allowed stake values. 2192 2193 Raises: 2194 QueryError: If the query to the network fails or is invalid. 2195 """ 2196 2197 return self.query_map("MinStake", extract_value=extract_value)[ 2198 "MinStake" 2199 ] 2200 2201 def query_map_max_stake( 2202 self, extract_value: bool = False 2203 ) -> dict[int, int]: 2204 """ 2205 Retrieves a mapping of the maximum stake values for the network. 2206 2207 Queries the network for the maximum stake values across various s 2208 ubnets of the network. 2209 2210 Returns: 2211 A dictionary mapping network UIDs to their maximum stake values. 2212 2213 Raises: 2214 QueryError: If the query to the network fails or is invalid. 2215 """ 2216 2217 return self.query_map("MaxStake", extract_value=extract_value)[ 2218 "MaxStake" 2219 ] 2220 2221 def query_map_founder(self, extract_value: bool = False) -> dict[int, str]: 2222 """ 2223 Retrieves a mapping of founders for the network. 
2224 2225 Queries the network to obtain the founders associated with 2226 various subnets. 2227 2228 Returns: 2229 A dictionary mapping network UIDs to their respective founders. 2230 2231 Raises: 2232 QueryError: If the query to the network fails or is invalid. 2233 """ 2234 2235 return self.query_map("Founder", extract_value=extract_value)["Founder"] 2236 2237 def query_map_founder_share( 2238 self, extract_value: bool = False 2239 ) -> dict[int, int]: 2240 """ 2241 Retrieves a mapping of founder shares for the network. 2242 2243 Queries the network for the share percentages 2244 allocated to founders across different subnets. 2245 2246 Returns: 2247 A dictionary mapping network UIDs to their founder share percentages. 2248 2249 Raises: 2250 QueryError: If the query to the network fails or is invalid. 2251 """ 2252 2253 return self.query_map("FounderShare", extract_value=extract_value)[ 2254 "FounderShare" 2255 ] 2256 2257 def query_map_incentive_ratio( 2258 self, extract_value: bool = False 2259 ) -> dict[int, int]: 2260 """ 2261 Retrieves a mapping of incentive ratios for the network. 2262 2263 Queries the network for the incentive ratios, 2264 which are the proportions of rewards or incentives 2265 allocated in different subnets of the network. 2266 2267 Returns: 2268 A dictionary mapping network UIDs to their incentive ratios. 2269 2270 Raises: 2271 QueryError: If the query to the network fails or is invalid. 2272 """ 2273 2274 return self.query_map("IncentiveRatio", extract_value=extract_value)[ 2275 "IncentiveRatio" 2276 ] 2277 2278 def query_map_trust_ratio( 2279 self, extract_value: bool = False 2280 ) -> dict[int, int]: 2281 """ 2282 Retrieves a mapping of trust ratios for the network. 2283 2284 Queries the network for trust ratios, 2285 indicative of the level of trust or credibility assigned 2286 to different subnets of the network. 2287 2288 Returns: 2289 A dictionary mapping network UIDs to their trust ratios. 2290 2291 Raises: 2292 QueryError: If the query to the network fails or is invalid. 2293 """ 2294 2295 return self.query_map("TrustRatio", extract_value=extract_value)[ 2296 "TrustRatio" 2297 ] 2298 2299 def query_map_vote_mode_subnet( 2300 self, extract_value: bool = False 2301 ) -> dict[int, str]: 2302 """ 2303 Retrieves a mapping of vote modes for subnets within the network. 2304 2305 Queries the network for the voting modes used in different 2306 subnets, which define the methodology or approach of voting within those 2307 subnets. 2308 2309 Returns: 2310 A dictionary mapping network UIDs to their vote 2311 modes for subnets. 2312 2313 Raises: 2314 QueryError: If the query to the network fails or is invalid. 2315 """ 2316 2317 return self.query_map("VoteModeSubnet", extract_value=extract_value)[ 2318 "VoteModeSubnet" 2319 ] 2320 2321 def query_map_legit_whitelist( 2322 self, extract_value: bool = False 2323 ) -> dict[Ss58Address, int]: 2324 """ 2325 Retrieves a mapping of whitelisted addresses for the network. 2326 2327 Queries the network for a mapping of whitelisted addresses 2328 and their respective legitimacy status. 2329 2330 Returns: 2331 A dictionary mapping addresses to their legitimacy status. 2332 2333 Raises: 2334 QueryError: If the query to the network fails or is invalid. 
2335 """ 2336 2337 return self.query_map( 2338 "LegitWhitelist", 2339 module="GovernanceModule", 2340 extract_value=extract_value, 2341 )["LegitWhitelist"] 2342 2343 def query_map_subnet_names( 2344 self, extract_value: bool = False 2345 ) -> dict[int, str]: 2346 """ 2347 Retrieves a mapping of subnet names within the network. 2348 2349 Queries the network for the names of various subnets, 2350 providing an overview of the different 2351 subnets within the network. 2352 2353 Returns: 2354 A dictionary mapping network UIDs to their subnet names. 2355 2356 Raises: 2357 QueryError: If the query to the network fails or is invalid. 2358 """ 2359 2360 return self.query_map("SubnetNames", extract_value=extract_value)[ 2361 "SubnetNames" 2362 ] 2363 2364 def query_map_balances( 2365 self, extract_value: bool = False, block_hash: str | None = None 2366 ) -> dict[str, dict[str, int | dict[str, int | float]]]: 2367 """ 2368 Retrieves a mapping of account balances within the network. 2369 2370 Queries the network for the balances associated with different accounts. 2371 It provides detailed information including various types of 2372 balances for each account. 2373 2374 Returns: 2375 A dictionary mapping account addresses to their balance details. 2376 2377 Raises: 2378 QueryError: If the query to the network fails or is invalid. 2379 """ 2380 2381 return self.query_map( 2382 "Account", 2383 module="System", 2384 extract_value=extract_value, 2385 block_hash=block_hash, 2386 )["Account"] 2387 2388 def query_map_registration_blocks( 2389 self, netuid: int = 0, extract_value: bool = False 2390 ) -> dict[int, int]: 2391 """ 2392 Retrieves a mapping of registration blocks for UIDs on the network. 2393 2394 Queries the network to find the block numbers at which various 2395 UIDs were registered. 2396 2397 Args: 2398 netuid: The network UID from which to get the registrations. 2399 2400 Returns: 2401 A dictionary mapping UIDs to their registration block numbers. 2402 2403 Raises: 2404 QueryError: If the query to the network fails or is invalid. 2405 """ 2406 2407 return self.query_map( 2408 "RegistrationBlock", [netuid], extract_value=extract_value 2409 )["RegistrationBlock"] 2410 2411 def query_map_name( 2412 self, netuid: int = 0, extract_value: bool = False 2413 ) -> dict[int, str]: 2414 """ 2415 Retrieves a mapping of names for keys on the network. 2416 2417 Queries the network for the names associated with different keys. 2418 It provides a mapping of key UIDs to their registered names. 2419 2420 Args: 2421 netuid: The network UID from which to get the names. 2422 2423 Returns: 2424 A dictionary mapping key UIDs to their names. 2425 2426 Raises: 2427 QueryError: If the query to the network fails or is invalid. 2428 """ 2429 2430 return self.query_map("Name", [netuid], extract_value=extract_value)[ 2431 "Name" 2432 ] 2433 2434 # Â == QUERY FUNCTIONS == # 2435 2436 def get_immunity_period(self, netuid: int = 0) -> int: 2437 """ 2438 Queries the network for the immunity period setting. 2439 2440 The immunity period is a time duration during which a module 2441 can not be deregistered from the network. 2442 Fetches the immunity period for a specified network subnet. 2443 2444 Args: 2445 netuid: The network UID for which to query the immunity period. 2446 2447 Returns: 2448 The immunity period setting for the specified network subnet. 2449 2450 Raises: 2451 QueryError: If the query to the network fails or is invalid. 
2452 """ 2453 2454 return self.query( 2455 "ImmunityPeriod", 2456 params=[netuid], 2457 ) 2458 2459 def get_max_set_weights_per_epoch(self): 2460 return self.query("MaximumSetWeightCallsPerEpoch") 2461 2462 def get_min_allowed_weights(self, netuid: int = 0) -> int: 2463 """ 2464 Queries the network for the minimum allowed weights setting. 2465 2466 Retrieves the minimum weight values that are possible to set 2467 by a validator within a specific network subnet. 2468 2469 Args: 2470 netuid: The network UID for which to query the minimum allowed 2471 weights. 2472 2473 Returns: 2474 The minimum allowed weight values for the specified network 2475 subnet. 2476 2477 Raises: 2478 QueryError: If the query to the network fails or is invalid. 2479 """ 2480 2481 return self.query( 2482 "MinAllowedWeights", 2483 params=[netuid], 2484 ) 2485 2486 def get_dao_treasury_address(self) -> Ss58Address: 2487 return self.query("DaoTreasuryAddress", module="GovernanceModule") 2488 2489 def get_max_allowed_weights(self, netuid: int = 0) -> int: 2490 """ 2491 Queries the network for the maximum allowed weights setting. 2492 2493 Retrieves the maximum weight values that are possible to set 2494 by a validator within a specific network subnet. 2495 2496 Args: 2497 netuid: The network UID for which to query the maximum allowed 2498 weights. 2499 2500 Returns: 2501 The maximum allowed weight values for the specified network 2502 subnet. 2503 2504 Raises: 2505 QueryError: If the query to the network fails or is invalid. 2506 """ 2507 2508 return self.query("MaxAllowedWeights", params=[netuid]) 2509 2510 def get_max_allowed_uids(self, netuid: int = 0) -> int: 2511 """ 2512 Queries the network for the maximum allowed UIDs setting. 2513 2514 Fetches the upper limit on the number of user IDs that can 2515 be allocated or used within a specific network subnet. 2516 2517 Args: 2518 netuid: The network UID for which to query the maximum allowed UIDs. 2519 2520 Returns: 2521 The maximum number of allowed UIDs for the specified network subnet. 2522 2523 Raises: 2524 QueryError: If the query to the network fails or is invalid. 2525 """ 2526 2527 return self.query("MaxAllowedUids", params=[netuid]) 2528 2529 def get_name(self, netuid: int = 0) -> str: 2530 """ 2531 Queries the network for the name of a specific subnet. 2532 2533 Args: 2534 netuid: The network UID for which to query the name. 2535 2536 Returns: 2537 The name of the specified network subnet. 2538 2539 Raises: 2540 QueryError: If the query to the network fails or is invalid. 2541 """ 2542 2543 return self.query("Name", params=[netuid]) 2544 2545 def get_subnet_name(self, netuid: int = 0) -> str: 2546 """ 2547 Queries the network for the name of a specific subnet. 2548 2549 Args: 2550 netuid: The network UID for which to query the name. 2551 2552 Returns: 2553 The name of the specified network subnet. 2554 2555 Raises: 2556 QueryError: If the query to the network fails or is invalid. 2557 """ 2558 2559 return self.query("SubnetNames", params=[netuid]) 2560 2561 def get_global_dao_treasury(self): 2562 return self.query("GlobalDaoTreasury", module="GovernanceModule") 2563 2564 def get_n(self, netuid: int = 0) -> int: 2565 """ 2566 Queries the network for the 'N' hyperparameter, which represents how 2567 many modules are on the network. 2568 2569 Args: 2570 netuid: The network UID for which to query the 'N' hyperparameter. 2571 2572 Returns: 2573 The value of the 'N' hyperparameter for the specified network 2574 subnet. 
2575 2576 Raises: 2577 QueryError: If the query to the network fails or is invalid. 2578 """ 2579 2580 return self.query("N", params=[netuid]) 2581 2582 def get_tempo(self, netuid: int = 0) -> int: 2583 """ 2584 Queries the network for the tempo setting, measured in blocks, for the 2585 specified subnet. 2586 2587 Args: 2588 netuid: The network UID for which to query the tempo. 2589 2590 Returns: 2591 The tempo setting for the specified subnet. 2592 2593 Raises: 2594 QueryError: If the query to the network fails or is invalid. 2595 """ 2596 2597 return self.query("Tempo", params=[netuid]) 2598 2599 def get_total_free_issuance(self, block_hash: str | None = None) -> int: 2600 """ 2601 Queries the network for the total free issuance. 2602 2603 Fetches the total amount of free issuance tokens available 2604 2605 Returns: 2606 The total free issuance amount. 2607 2608 Raises: 2609 QueryError: If the query to the network fails or is invalid. 2610 """ 2611 2612 return self.query( 2613 "TotalIssuance", module="Balances", block_hash=block_hash 2614 ) 2615 2616 def get_total_stake(self, block_hash: str | None = None) -> int: 2617 """ 2618 Retrieves a mapping of total stakes for keys on the network. 2619 2620 Queries the network for a mapping of key UIDs to their total stake amounts. 2621 2622 Returns: 2623 A dictionary mapping key UIDs to their total stake amounts. 2624 2625 Raises: 2626 QueryError: If the query to the network fails or is invalid. 2627 """ 2628 2629 return self.query("TotalStake", block_hash=block_hash) 2630 2631 def get_registrations_per_block(self): 2632 """ 2633 Queries the network for the number of registrations per block. 2634 2635 Fetches the number of registrations that are processed per 2636 block within the network. 2637 2638 Returns: 2639 The number of registrations processed per block. 2640 2641 Raises: 2642 QueryError: If the query to the network fails or is invalid. 2643 """ 2644 2645 return self.query( 2646 "RegistrationsPerBlock", 2647 ) 2648 2649 def max_registrations_per_block(self, netuid: int = 0): 2650 """ 2651 Queries the network for the maximum number of registrations per block. 2652 2653 Retrieves the upper limit of registrations that can be processed in 2654 each block within a specific network subnet. 2655 2656 Args: 2657 netuid: The network UID for which to query. 2658 2659 Returns: 2660 The maximum number of registrations per block for 2661 the specified network subnet. 2662 2663 Raises: 2664 QueryError: If the query to the network fails or is invalid. 2665 """ 2666 2667 return self.query( 2668 "MaxRegistrationsPerBlock", 2669 params=[netuid], 2670 ) 2671 2672 def get_proposal(self, proposal_id: int = 0): 2673 """ 2674 Queries the network for a specific proposal. 2675 2676 Args: 2677 proposal_id: The ID of the proposal to query. 2678 2679 Returns: 2680 The details of the specified proposal. 2681 2682 Raises: 2683 QueryError: If the query to the network fails, is invalid, 2684 or if the proposal ID does not exist. 2685 """ 2686 2687 return self.query( 2688 "Proposals", 2689 params=[proposal_id], 2690 ) 2691 2692 def get_trust(self, netuid: int = 0): 2693 """ 2694 Queries the network for the trust setting of a specific network subnet. 2695 2696 Retrieves the trust level or score, which may represent the 2697 level of trustworthiness or reliability within a 2698 particular network subnet. 2699 2700 Args: 2701 netuid: The network UID for which to query the trust setting. 2702 2703 Returns: 2704 The trust level or score for the specified network subnet. 
2705 2706 Raises: 2707 QueryError: If the query to the network fails or is invalid. 2708 """ 2709 2710 return self.query( 2711 "Trust", 2712 params=[netuid], 2713 ) 2714 2715 def get_uids(self, key: Ss58Address, netuid: int = 0) -> bool | None: 2716 """ 2717 Queries the network for module UIDs associated with a specific key. 2718 2719 Args: 2720 key: The key address for which to query UIDs. 2721 netuid: The network UID within which to search for the key. 2722 2723 Returns: 2724 A list of UIDs associated with the specified key. 2725 2726 Raises: 2727 QueryError: If the query to the network fails or is invalid. 2728 """ 2729 2730 return self.query( 2731 "Uids", 2732 params=[netuid, key], 2733 ) 2734 2735 def get_unit_emission(self) -> int: 2736 """ 2737 Queries the network for the unit emission setting. 2738 2739 Retrieves the unit emission value, which represents the 2740 emission rate or quantity for the $COMM token. 2741 2742 Returns: 2743 The unit emission value in nanos for the network. 2744 2745 Raises: 2746 QueryError: If the query to the network fails or is invalid. 2747 """ 2748 2749 return self.query("UnitEmission", module="SubnetEmissionModule") 2750 2751 def get_tx_rate_limit(self) -> int: 2752 """ 2753 Queries the network for the transaction rate limit. 2754 2755 Retrieves the rate limit for transactions within the network, 2756 which defines the maximum number of transactions that can be 2757 processed within a certain timeframe. 2758 2759 Returns: 2760 The transaction rate limit for the network. 2761 2762 Raises: 2763 QueryError: If the query to the network fails or is invalid. 2764 """ 2765 2766 return self.query( 2767 "TxRateLimit", 2768 ) 2769 2770 def get_subnet_burn(self) -> int: 2771 """Queries the network for the subnet burn value. 2772 2773 Retrieves the subnet burn value from the network, which represents 2774 the amount of tokens that are burned (permanently removed from 2775 circulation) for subnet-related operations. 2776 2777 Returns: 2778 int: The subnet burn value. 2779 2780 Raises: 2781 QueryError: If the query to the network fails or returns invalid data. 2782 """ 2783 2784 return self.query( 2785 "SubnetBurn", 2786 ) 2787 2788 def get_burn_rate(self) -> int: 2789 """ 2790 Queries the network for the burn rate setting. 2791 2792 Retrieves the burn rate, which represents the rate at 2793 which the $COMM token is permanently 2794 removed or 'burned' from circulation. 2795 2796 Returns: 2797 The burn rate for the network. 2798 2799 Raises: 2800 QueryError: If the query to the network fails or is invalid. 2801 """ 2802 2803 return self.query( 2804 "BurnRate", 2805 params=[], 2806 ) 2807 2808 def get_burn(self, netuid: int = 0) -> int: 2809 """ 2810 Queries the network for the burn setting. 2811 2812 Retrieves the burn value, which represents the amount of the 2813 $COMM token that is 'burned' or permanently removed from 2814 circulation. 2815 2816 Args: 2817 netuid: The network UID for which to query the burn value. 2818 2819 Returns: 2820 The burn value for the specified network subnet. 2821 2822 Raises: 2823 QueryError: If the query to the network fails or is invalid. 2824 """ 2825 2826 return self.query("Burn", params=[netuid]) 2827 2828 def get_min_burn(self) -> int: 2829 """ 2830 Queries the network for the minimum burn setting. 2831 2832 Retrieves the minimum burn value, indicating the lowest 2833 amount of the $COMM tokens that can be 'burned' or 2834 permanently removed from circulation. 2835 2836 Returns: 2837 The minimum burn value for the network. 
2838 2839 Raises: 2840 QueryError: If the query to the network fails or is invalid. 2841 """ 2842 2843 return self.query( 2844 "BurnConfig", 2845 params=[], 2846 )["min_burn"] 2847 2848 def get_min_weight_stake(self) -> int: 2849 """ 2850 Queries the network for the minimum weight stake setting. 2851 2852 Retrieves the minimum weight stake, which represents the lowest 2853 stake weight that is allowed for certain operations or 2854 transactions within the network. 2855 2856 Returns: 2857 The minimum weight stake for the network. 2858 2859 Raises: 2860 QueryError: If the query to the network fails or is invalid. 2861 """ 2862 2863 return self.query("MinWeightStake", params=[]) 2864 2865 def get_vote_mode_global(self) -> str: 2866 """ 2867 Queries the network for the global vote mode setting. 2868 2869 Retrieves the global vote mode, which defines the overall voting 2870 methodology or approach used across the network in default. 2871 2872 Returns: 2873 The global vote mode setting for the network. 2874 2875 Raises: 2876 QueryError: If the query to the network fails or is invalid. 2877 """ 2878 2879 return self.query( 2880 "VoteModeGlobal", 2881 ) 2882 2883 def get_max_proposals(self) -> int: 2884 """ 2885 Queries the network for the maximum number of proposals allowed. 2886 2887 Retrieves the upper limit on the number of proposals that can be 2888 active or considered at any given time within the network. 2889 2890 Returns: 2891 The maximum number of proposals allowed on the network. 2892 2893 Raises: 2894 QueryError: If the query to the network fails or is invalid. 2895 """ 2896 2897 return self.query( 2898 "MaxProposals", 2899 ) 2900 2901 def get_max_registrations_per_block(self) -> int: 2902 """ 2903 Queries the network for the maximum number of registrations per block. 2904 2905 Retrieves the maximum number of registrations that can 2906 be processed in each block within the network. 2907 2908 Returns: 2909 The maximum number of registrations per block on the network. 2910 2911 Raises: 2912 QueryError: If the query to the network fails or is invalid. 2913 """ 2914 2915 return self.query( 2916 "MaxRegistrationsPerBlock", 2917 params=[], 2918 ) 2919 2920 def get_max_name_length(self) -> int: 2921 """ 2922 Queries the network for the maximum length allowed for names. 2923 2924 Retrieves the maximum character length permitted for names 2925 within the network. Such as the module names 2926 2927 Returns: 2928 The maximum length allowed for names on the network. 2929 2930 Raises: 2931 QueryError: If the query to the network fails or is invalid. 2932 """ 2933 2934 return self.query( 2935 "MaxNameLength", 2936 params=[], 2937 ) 2938 2939 def get_global_vote_threshold(self) -> int: 2940 """ 2941 Queries the network for the global vote threshold. 2942 2943 Retrieves the global vote threshold, which is the critical value or 2944 percentage required for decisions in the network's governance process. 2945 2946 Returns: 2947 The global vote threshold for the network. 2948 2949 Raises: 2950 QueryError: If the query to the network fails or is invalid. 2951 """ 2952 2953 return self.query( 2954 "GlobalVoteThreshold", 2955 ) 2956 2957 def get_max_allowed_subnets(self) -> int: 2958 """ 2959 Queries the network for the maximum number of allowed subnets. 2960 2961 Retrieves the upper limit on the number of subnets that can 2962 be created or operated within the network. 2963 2964 Returns: 2965 The maximum number of allowed subnets on the network. 
2966 2967 Raises: 2968 QueryError: If the query to the network fails or is invalid. 2969 """ 2970 2971 return self.query( 2972 "MaxAllowedSubnets", 2973 params=[], 2974 ) 2975 2976 def get_max_allowed_modules(self) -> int: 2977 """ 2978 Queries the network for the maximum number of allowed modules. 2979 2980 Retrieves the upper limit on the number of modules that 2981 can be registered within the network. 2982 2983 Returns: 2984 The maximum number of allowed modules on the network. 2985 2986 Raises: 2987 QueryError: If the query to the network fails or is invalid. 2988 """ 2989 2990 return self.query( 2991 "MaxAllowedModules", 2992 params=[], 2993 ) 2994 2995 def get_min_stake(self, netuid: int = 0) -> int: 2996 """ 2997 Queries the network for the minimum stake required to register a key. 2998 2999 Retrieves the minimum amount of stake necessary for 3000 registering a key within a specific network subnet. 3001 3002 Args: 3003 netuid: The network UID for which to query the minimum stake. 3004 3005 Returns: 3006 The minimum stake required for key registration in nanos. 3007 3008 Raises: 3009 QueryError: If the query to the network fails or is invalid. 3010 """ 3011 3012 return self.query("MinStake", params=[netuid]) 3013 3014 def get_stakefrom( 3015 self, 3016 key: Ss58Address, 3017 ) -> dict[str, int]: 3018 """ 3019 Retrieves the stake amounts from all stakers to a specific staked address. 3020 3021 Queries the network for the stakes received by a particular staked address 3022 from all stakers. 3023 3024 Args: 3025 key: The address of the key receiving the stakes. 3026 3027 Returns: 3028 A dictionary mapping staker addresses to their respective stake amounts. 3029 3030 Raises: 3031 QueryError: If the query to the network fails or is invalid. 3032 """ 3033 3034 # Has to use query map in order to iterate through the storage prefix. 3035 return self.query_map("StakeFrom", [key], extract_value=False).get( 3036 "StakeFrom", {} 3037 ) 3038 3039 def get_staketo( 3040 self, 3041 key: Ss58Address, 3042 ) -> dict[str, int]: 3043 """ 3044 Retrieves the stake amounts provided by a specific staker to all staked addresses. 3045 3046 Queries the network for the stakes provided by a particular staker to 3047 all staked addresses. 3048 3049 Args: 3050 key: The address of the key providing the stakes. 3051 3052 Returns: 3053 A dictionary mapping staked addresses to their respective received stake amounts. 3054 3055 Raises: 3056 QueryError: If the query to the network fails or is invalid. 3057 """ 3058 3059 # Has to use query map in order to iterate through the storage prefix. 3060 return self.query_map("StakeTo", [key], extract_value=False).get( 3061 "StakeTo", {} 3062 ) 3063 3064 def get_balance( 3065 self, 3066 addr: Ss58Address, 3067 ) -> int: 3068 """ 3069 Retrieves the balance of a specific key. 3070 3071 Args: 3072 addr: The address of the key to query the balance for. 3073 3074 Returns: 3075 The balance of the specified key. 3076 3077 Raises: 3078 QueryError: If the query to the network fails or is invalid. 3079 """ 3080 3081 result = self.query("Account", module="System", params=[addr]) 3082 3083 return result["data"]["free"] 3084 3085 def get_block(self, block_hash: str | None = None) -> dict[Any, Any] | None: 3086 """ 3087 Retrieves information about a specific block in the network. 3088 3089 Queries the network for details about a block, such as its number, 3090 hash, and other relevant information. 
3091 3092 Returns: 3093 The requested information about the block, 3094 or None if the block does not exist 3095 or the information is not available. 3096 3097 Raises: 3098 QueryError: If the query to the network fails or is invalid. 3099 """ 3100 3101 with self.get_conn() as substrate: 3102 block: dict[Any, Any] | None = substrate.get_block( # type: ignore 3103 block_hash # type: ignore 3104 ) 3105 3106 return block 3107 3108 def get_existential_deposit(self, block_hash: str | None = None) -> int: 3109 """ 3110 Retrieves the existential deposit value for the network. 3111 3112 The existential deposit is the minimum balance that must be maintained 3113 in an account to prevent it from being purged. Denotated in nano units. 3114 3115 Returns: 3116 The existential deposit value in nano units. 3117 Note: 3118 The value returned is a fixed value defined in the 3119 client and may not reflect changes in the network's configuration. 3120 """ 3121 3122 with self.get_conn() as substrate: 3123 result: int = substrate.get_constant( # Â type: ignore 3124 "Balances", "ExistentialDeposit", block_hash 3125 ).value # Â type: ignore 3126 3127 return result 3128 3129 def get_voting_power_delegators(self) -> list[Ss58Address]: 3130 result = self.query( 3131 "NotDelegatingVotingPower", [], module="GovernanceModule" 3132 ) 3133 return result 3134 3135 def add_transfer_dao_treasury_proposal( 3136 self, 3137 key: Keypair, 3138 data: str, 3139 amount_nano: int, 3140 dest: Ss58Address, 3141 ): 3142 params = {"dest": dest, "value": amount_nano, "data": data} 3143 3144 return self.compose_call( 3145 module="GovernanceModule", 3146 fn="add_transfer_dao_treasury_proposal", 3147 params=params, 3148 key=key, 3149 ) 3150 3151 def delegate_rootnet_control(self, key: Keypair, dest: Ss58Address): 3152 params = {"origin": key, "target": dest} 3153 3154 return self.compose_call( 3155 module="SubspaceModule", 3156 fn="delegate_rootnet_control", 3157 params=params, 3158 key=key, 3159 )
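The `query_map_*` helpers above all return plain dictionaries keyed by module UID or SS58 address, so building a per-subnet snapshot is mostly a matter of joining their results. A minimal sketch, assuming a reachable node: the websocket URL is a placeholder and the joined record layout is illustrative, not part of the client API.

```py
from communex.client import CommuneClient

# Placeholder endpoint; substitute the websocket URL of a node you trust.
client = CommuneClient(url="wss://commune-node.example:443")

netuid = 0
keys = client.query_map_key(netuid)           # uid -> SS58 key
names = client.query_map_name(netuid)         # uid -> registered module name
addresses = client.query_map_address(netuid)  # uid -> announced address

# Join the three maps into one record per module UID.
modules = {
    uid: {
        "key": keys[uid],
        "name": names.get(uid),
        "address": addresses.get(uid),
    }
    for uid in keys
}

print(f"subnet {netuid}: {len(modules)} registered modules")
print(f"tempo: {client.get_tempo(netuid)} blocks")
print(f"min stake: {client.get_min_stake(netuid)} nanos")
```

Each lookup above is an independent storage query; `query_batch_map`, also defined in this module, can bundle several of them into one batched round trip.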
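`get_staketo` and `get_stakefrom` return per-address maps of stake amounts (in nanos), so splitting a key's holdings into free balance, stake placed, and stake received takes three calls. Another hedged sketch; the URL and SS58 address are placeholders.

```py
from typing import cast

from communex.client import CommuneClient
from communex.types import Ss58Address

client = CommuneClient(url="wss://commune-node.example:443")  # placeholder URL

# Placeholder SS58 address of the key to inspect.
key = cast(Ss58Address, "5F...")

outgoing = client.get_staketo(key)    # staked key -> amount, in nanos
incoming = client.get_stakefrom(key)  # staker key -> amount, in nanos

print(f"free balance:   {client.get_balance(key)} nanos")
print(f"stake placed:   {sum(outgoing.values())} nanos across {len(outgoing)} modules")
print(f"stake received: {sum(incoming.values())} nanos from {len(incoming)} stakers")
```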
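The governance helpers at the end of the listing wrap `compose_call` against `GovernanceModule`, so proposing a DAO treasury transfer only needs a funded keypair. The sketch below uses the well-known Substrate development mnemonic and placeholder values for the URL, destination, and `data` payload; submitting it on a live network spends real fees.

```py
from substrateinterface import Keypair

from communex.client import CommuneClient

client = CommuneClient(url="wss://commune-node.example:443")  # placeholder URL

# Development mnemonic, shown only as a placeholder; never hard-code a real one.
proposer = Keypair.create_from_mnemonic(
    "bottom drive obey lake curtain smoke basket hold race lonely fit walk"
)

print("DAO treasury address:", client.get_dao_treasury_address())

receipt = client.add_transfer_dao_treasury_proposal(
    key=proposer,
    data="ipfs://<proposal-metadata>",  # placeholder metadata reference
    amount_nano=1_000_000_000,          # amount requested, in nanos
    dest=proposer.ss58_address,         # placeholder recipient
)
print("included in block:", receipt.is_success)
```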
MAX_REQUEST_SIZE = 9_000_000
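`MAX_REQUEST_SIZE` caps how large a single batched websocket payload may grow, measured as the length of its JSON encoding. `_make_request_smaller` estimates each request with `len(json.dumps(request))` and starts a new chunk once the running total would cross the cap. Below is a standalone sketch of that greedy split; `split_requests` is an illustrative name, not part of the client API.

```py
import json
from typing import Any

MAX_REQUEST_SIZE = 9_000_000  # same cap used by communex.client

def split_requests(
    requests: list[tuple[str, list[Any]]],
    max_size: int = MAX_REQUEST_SIZE,
) -> list[list[tuple[str, list[Any]]]]:
    """Greedily group (method, params) pairs so each batch's JSON stays under max_size."""
    batches: list[list[tuple[str, list[Any]]]] = []
    current: list[tuple[str, list[Any]]] = []
    current_size = 0
    for request in requests:
        size = len(json.dumps(request))  # same size estimate the client uses
        if current and current_size + size > max_size:
            batches.append(current)
            current, current_size = [], 0
        current.append(request)
        current_size += size
    if current:
        batches.append(current)
    return batches

# Two ~8 MB key lists cannot share one 9 MB payload, so they land in separate batches.
payload = [
    ("state_queryStorageAt", [["ab" * 4_000_000], None]),
    ("state_queryStorageAt", [["cd" * 4_000_000], None]),
]
assert len(split_requests(payload)) == 2
```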
@dataclass
class Chunk:
26@dataclass 27class Chunk: 28 batch_requests: list[tuple[Any, Any]] 29 prefix_list: list[list[str]] 30 fun_params: list[tuple[Any, Any, Any, Any, str]]
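`Chunk` keeps its three lists index-aligned: entry `i` of `batch_requests` is the RPC call whose storage prefix sits at `prefix_list[i]` and whose decoding metadata sits at `fun_params[i]`, which is how `_decode_response` later maps results back to their storage functions. Although the field is annotated `list[list[str]]`, the helpers in this module append the hex prefix strings returned by `StorageKey.to_hex()`, and the purely illustrative construction below (all values are placeholders) follows that runtime shape.

```py
from communex.client import Chunk

chunk = Chunk(
    batch_requests=[("state_queryStorageAt", [["0x<storage-key>"], None])],
    prefix_list=["0x<storage-prefix>"],  # hex prefix, as produced by _get_storage_keys
    fun_params=[(
        "u64",          # value_type (placeholder)
        ["u16"],        # param_types (placeholder)
        ["Identity"],   # key_hashers (placeholder)
        [],             # params already bound to the key
        "Emission",     # storage function name (placeholder)
    )],
)

# The batching helpers rely on the three lists staying the same length.
assert len(chunk.batch_requests) == len(chunk.prefix_list) == len(chunk.fun_params)
```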
class CommuneClient:
37class CommuneClient: 38 """ 39 A client for interacting with Commune network nodes, querying storage, 40 submitting transactions, etc. 41 42 Attributes: 43 wait_for_finalization: Whether to wait for transaction finalization. 44 45 Example: 46 ```py 47 client = CommuneClient() 48 client.query(name='function_name', params=['param1', 'param2']) 49 ``` 50 51 Raises: 52 AssertionError: If the maximum connections value is less than or equal 53 to zero. 54 """ 55 56 wait_for_finalization: bool 57 _num_connections: int 58 _connection_queue: queue.Queue[SubstrateInterface] 59 url: str 60 61 def __init__( 62 self, 63 url: str, 64 num_connections: int = 1, 65 wait_for_finalization: bool = False, 66 timeout: int | None = None, 67 ): 68 """ 69 Args: 70 url: The URL of the network node to connect to. 71 num_connections: The number of websocket connections to be opened. 72 """ 73 assert num_connections > 0 74 self._num_connections = num_connections 75 self.wait_for_finalization = wait_for_finalization 76 self._connection_queue = queue.Queue(num_connections) 77 self.url = url 78 ws_options: dict[str, int] = {} 79 if timeout is not None: 80 ws_options["timeout"] = timeout 81 self.ws_options = ws_options 82 for _ in range(num_connections): 83 self._connection_queue.put( 84 SubstrateInterface(url, ws_options=ws_options) 85 ) 86 87 @property 88 def connections(self) -> int: 89 """ 90 Gets the maximum allowed number of simultaneous connections to the 91 network node. 92 """ 93 return self._num_connections 94 95 @contextmanager 96 def get_conn(self, timeout: float | None = None, init: bool = False): 97 """ 98 Context manager to get a connection from the pool. 99 100 Tries to get a connection from the pool queue. If the queue is empty, 101 it blocks for `timeout` seconds until a connection is available. If 102 `timeout` is None, it blocks indefinitely. 103 104 Args: 105 timeout: The maximum time in seconds to wait for a connection. 106 107 Yields: 108 The connection object from the pool. 109 110 Raises: 111 QueueEmptyError: If no connection is available within the timeout 112 period. 113 """ 114 conn = self._connection_queue.get(timeout=timeout) 115 if init: 116 conn.init_runtime() # type: ignore 117 try: 118 if conn.websocket and conn.websocket.connected: # type: ignore 119 yield conn 120 else: 121 conn = SubstrateInterface(self.url, ws_options=self.ws_options) 122 yield conn 123 finally: 124 self._connection_queue.put(conn) 125 126 def _get_storage_keys( 127 self, 128 storage: str, 129 queries: list[tuple[str, list[Any]]], 130 block_hash: str | None, 131 ): 132 send: list[tuple[str, list[Any]]] = [] 133 prefix_list: list[Any] = [] 134 135 key_idx = 0 136 with self.get_conn(init=True) as substrate: 137 for function, params in queries: 138 storage_key = StorageKey.create_from_storage_function( # type: ignore 139 storage, 140 function, 141 params, 142 runtime_config=substrate.runtime_config, # type: ignore 143 metadata=substrate.metadata, # type: ignore 144 ) 145 146 prefix = storage_key.to_hex() 147 prefix_list.append(prefix) 148 send.append(("state_getKeys", [prefix, block_hash])) 149 key_idx += 1 150 return send, prefix_list 151 152 def _get_lists( 153 self, 154 storage_module: str, 155 queries: list[tuple[str, list[Any]]], 156 substrate: SubstrateInterface, 157 ) -> list[tuple[Any, Any, Any, Any, str]]: 158 """ 159 Generates a list of tuples containing parameters for each storage function based on the given functions and substrate interface. 
160 161 Args: 162 functions (dict[str, list[query_call]]): A dictionary where keys are storage module names and values are lists of tuples. 163 Each tuple consists of a storage function name and its parameters. 164 substrate: An instance of the SubstrateInterface class used to interact with the substrate. 165 166 Returns: 167 A list of tuples in the format `(value_type, param_types, key_hashers, params, storage_function)` for each storage function in the given functions. 168 169 Example: 170 >>> _get_lists( 171 functions={'storage_module': [('storage_function', ['param1', 'param2'])]}, 172 substrate=substrate_instance 173 ) 174 [('value_type', 'param_types', 'key_hashers', ['param1', 'param2'], 'storage_function'), ...] 175 """ 176 177 function_parameters: list[tuple[Any, Any, Any, Any, str]] = [] 178 179 metadata_pallet = substrate.metadata.get_metadata_pallet( # type: ignore 180 storage_module 181 ) 182 for storage_function, params in queries: 183 storage_item = metadata_pallet.get_storage_function( # type: ignore 184 storage_function 185 ) 186 187 value_type = storage_item.get_value_type_string() # type: ignore 188 param_types = storage_item.get_params_type_string() # type: ignore 189 key_hashers = storage_item.get_param_hashers() # type: ignore 190 function_parameters.append( 191 ( 192 value_type, 193 param_types, 194 key_hashers, 195 params, 196 storage_function, 197 ) # type: ignore 198 ) 199 return function_parameters 200 201 def _send_batch( 202 self, 203 batch_payload: list[Any], 204 request_ids: list[int], 205 extract_result: bool = True, 206 ): 207 """ 208 Sends a batch of requests to the substrate and collects the results. 209 210 Args: 211 substrate: An instance of the substrate interface. 212 batch_payload: The payload of the batch request. 213 request_ids: A list of request IDs for tracking responses. 214 results: A list to store the results of the requests. 215 extract_result: Whether to extract the result from the response. 216 217 Raises: 218 NetworkQueryError: If there is an `error` in the response message. 219 220 Note: 221 No explicit return value as results are appended to the provided 'results' list. 222 """ 223 results: list[str | dict[Any, Any]] = [] 224 with self.get_conn(init=True) as substrate: 225 try: 226 substrate.websocket.send( # Â type: ignore 227 json.dumps(batch_payload) 228 ) 229 except NetworkQueryError: 230 pass 231 while len(results) < len(request_ids): 232 received_messages = json.loads( 233 substrate.websocket.recv() # type: ignore 234 ) 235 if isinstance(received_messages, dict): 236 received_messages: list[dict[Any, Any]] = [ 237 received_messages 238 ] 239 240 for message in received_messages: 241 if message.get("id") in request_ids: 242 if extract_result: 243 try: 244 results.append(message["result"]) 245 except Exception: 246 raise ( 247 RuntimeError( 248 f"Error extracting result from message: {message}" 249 ) 250 ) 251 else: 252 results.append(message) 253 if "error" in message: 254 raise NetworkQueryError(message["error"]) 255 256 return results 257 258 def _make_request_smaller( 259 self, 260 batch_request: list[tuple[T1, T2]], 261 prefix_list: list[list[str]], 262 fun_params: list[tuple[Any, Any, Any, Any, str]], 263 ) -> tuple[list[list[tuple[T1, T2]]], list[Chunk]]: 264 """ 265 Splits a batch of requests into smaller batches, each not exceeding the specified maximum size. 266 267 Args: 268 batch_request: A list of requests to be sent in a batch. 269 max_size: Maximum size of each batch in bytes. 
270 271 Returns: 272 A list of smaller request batches. 273 274 Example: 275 >>> _make_request_smaller(batch_request=[('method1', 'params1'), ('method2', 'params2')], max_size=1000) 276 [[('method1', 'params1')], [('method2', 'params2')]] 277 """ 278 assert len(prefix_list) == len(fun_params) == len(batch_request) 279 280 def estimate_size(request: tuple[T1, T2]): 281 """Convert the batch request to a string and measure its length""" 282 return len(json.dumps(request)) 283 284 # Initialize variables 285 result: list[list[tuple[T1, T2]]] = [] 286 current_batch = [] 287 current_prefix_batch = [] 288 current_params_batch = [] 289 current_size = 0 290 291 chunk_list: list[Chunk] = [] 292 293 # Iterate through each request in the batch 294 for request, prefix, params in zip( 295 batch_request, prefix_list, fun_params 296 ): 297 request_size = estimate_size(request) 298 299 # Check if adding this request exceeds the max size 300 if current_size + request_size > MAX_REQUEST_SIZE: 301 # If so, start a new batch 302 303 # Essentiatly checks that it's not the first iteration 304 if current_batch: 305 chunk = Chunk( 306 current_batch, 307 current_prefix_batch, 308 current_params_batch, 309 ) 310 chunk_list.append(chunk) 311 result.append(current_batch) 312 313 current_batch = [request] 314 current_prefix_batch = [prefix] 315 current_params_batch = [params] 316 current_size = request_size 317 else: 318 # Otherwise, add to the current batch 319 current_batch.append(request) 320 current_size += request_size 321 current_prefix_batch.append(prefix) 322 current_params_batch.append(params) 323 324 # Add the last batch if it's not empty 325 if current_batch: 326 result.append(current_batch) 327 chunk = Chunk( 328 current_batch, current_prefix_batch, current_params_batch 329 ) 330 chunk_list.append(chunk) 331 332 return result, chunk_list 333 334 def _are_changes_equal(self, change_a: Any, change_b: Any): 335 for (a, b), (c, d) in zip(change_a, change_b): 336 if a != c or b != d: 337 return False 338 339 def _rpc_request_batch( 340 self, 341 batch_requests: list[tuple[str, list[Any]]], 342 extract_result: bool = True, 343 ) -> list[str]: 344 """ 345 Sends batch requests to the substrate node using multiple threads and collects the results. 346 347 Args: 348 substrate: An instance of the substrate interface. 349 batch_requests : A list of requests to be sent in batches. 350 max_size: Maximum size of each batch in bytes. 351 extract_result: Whether to extract the result from the response message. 352 353 Returns: 354 A list of results from the batch requests. 355 356 Example: 357 >>> _rpc_request_batch(substrate_instance, [('method1', ['param1']), ('method2', ['param2'])]) 358 ['result1', 'result2', ...] 
359 """ 360 361 chunk_results: list[Any] = [] 362 # smaller_requests = self._make_request_smaller(batch_requests) 363 request_id = 0 364 with ThreadPoolExecutor() as executor: 365 futures: list[Future[list[str | dict[Any, Any]]]] = [] 366 for chunk in [batch_requests]: 367 request_ids: list[int] = [] 368 batch_payload: list[Any] = [] 369 for method, params in chunk: 370 request_id += 1 371 request_ids.append(request_id) 372 batch_payload.append( 373 { 374 "jsonrpc": "2.0", 375 "method": method, 376 "params": params, 377 "id": request_id, 378 } 379 ) 380 381 futures.append( 382 executor.submit( 383 self._send_batch, 384 batch_payload=batch_payload, 385 request_ids=request_ids, 386 extract_result=extract_result, 387 ) 388 ) 389 for future in futures: 390 resul = future.result() 391 chunk_results.append(resul) 392 return chunk_results 393 394 def _rpc_request_batch_chunked( 395 self, chunk_requests: list[Chunk], extract_result: bool = True 396 ): 397 """ 398 Sends batch requests to the substrate node using multiple threads and collects the results. 399 400 Args: 401 substrate: An instance of the substrate interface. 402 batch_requests : A list of requests to be sent in batches. 403 max_size: Maximum size of each batch in bytes. 404 extract_result: Whether to extract the result from the response message. 405 406 Returns: 407 A list of results from the batch requests. 408 409 Example: 410 >>> _rpc_request_batch(substrate_instance, [('method1', ['param1']), ('method2', ['param2'])]) 411 ['result1', 'result2', ...] 412 """ 413 414 def split_chunks( 415 chunk: Chunk, chunk_info: list[Chunk], chunk_info_idx: int 416 ): 417 manhattam_chunks: list[tuple[Any, Any]] = [] 418 mutaded_chunk_info = deepcopy(chunk_info) 419 max_n_keys = 35000 420 for query in chunk.batch_requests: 421 result_keys = query[1][0] 422 keys_amount = len(result_keys) 423 if keys_amount > max_n_keys: 424 mutaded_chunk_info.pop(chunk_info_idx) 425 for i in range(0, keys_amount, max_n_keys): 426 new_chunk = deepcopy(chunk) 427 splitted_keys = result_keys[i : i + max_n_keys] 428 splitted_query = deepcopy(query) 429 splitted_query[1][0] = splitted_keys 430 new_chunk.batch_requests = [splitted_query] 431 manhattam_chunks.append(splitted_query) 432 mutaded_chunk_info.insert(chunk_info_idx, new_chunk) 433 else: 434 manhattam_chunks.append(query) 435 return manhattam_chunks, mutaded_chunk_info 436 437 assert len(chunk_requests) > 0 438 mutated_chunk_info: list[Chunk] = [] 439 chunk_results: list[Any] = [] 440 # smaller_requests = self._make_request_smaller(batch_requests) 441 request_id = 0 442 443 with ThreadPoolExecutor() as executor: 444 futures: list[Future[list[str | dict[Any, Any]]]] = [] 445 for idx, macro_chunk in enumerate(chunk_requests): 446 _, mutated_chunk_info = split_chunks( 447 macro_chunk, chunk_requests, idx 448 ) 449 for chunk in mutated_chunk_info: 450 request_ids: list[int] = [] 451 batch_payload: list[Any] = [] 452 for method, params in chunk.batch_requests: 453 # for method, params in micro_chunk: 454 request_id += 1 455 request_ids.append(request_id) 456 batch_payload.append( 457 { 458 "jsonrpc": "2.0", 459 "method": method, 460 "params": params, 461 "id": request_id, 462 } 463 ) 464 futures.append( 465 executor.submit( 466 self._send_batch, 467 batch_payload=batch_payload, 468 request_ids=request_ids, 469 extract_result=extract_result, 470 ) 471 ) 472 for future in futures: 473 resul = future.result() 474 chunk_results.append(resul) 475 return chunk_results, mutated_chunk_info 476 477 def _decode_response( 478 
self, 479 response: list[str], 480 function_parameters: list[tuple[Any, Any, Any, Any, str]], 481 prefix_list: list[Any], 482 block_hash: str, 483 ) -> dict[str, dict[Any, Any]]: 484 """ 485 Decodes a response from the substrate interface and organizes the data into a dictionary. 486 487 Args: 488 response: A list of encoded responses from a substrate query. 489 function_parameters: A list of tuples containing the parameters for each storage function. 490 last_keys: A list of the last keys used in the substrate query. 491 prefix_list: A list of prefixes used in the substrate query. 492 substrate: An instance of the SubstrateInterface class. 493 block_hash: The hash of the block to be queried. 494 495 Returns: 496 A dictionary where each key is a storage function name and the value is another dictionary. 497 This inner dictionary's key is the decoded key from the response and the value is the corresponding decoded value. 498 499 Raises: 500 ValueError: If an unsupported hash type is encountered in the `concat_hash_len` function. 501 502 Example: 503 >>> _decode_response( 504 response=[...], 505 function_parameters=[...], 506 last_keys=[...], 507 prefix_list=[...], 508 substrate=substrate_instance, 509 block_hash="0x123..." 510 ) 511 {'storage_function_name': {decoded_key: decoded_value, ...}, ...} 512 """ 513 514 def get_item_key_value( 515 item_key: tuple[Any, ...] | Any, 516 ) -> tuple[Any, ...] | Any: 517 if isinstance(item_key, tuple): 518 return tuple(k.value for k in item_key) # type: ignore 519 return item_key.value 520 521 def concat_hash_len(key_hasher: str) -> int: 522 """ 523 Determines the length of the hash based on the given key hasher type. 524 525 Args: 526 key_hasher: The type of key hasher. 527 528 Returns: 529 The length of the hash corresponding to the given key hasher type. 530 531 Raises: 532 ValueError: If the key hasher type is not supported. 
533 534 Example: 535 >>> concat_hash_len("Blake2_128Concat") 536 16 537 """ 538 539 if key_hasher == "Blake2_128Concat": 540 return 16 541 elif key_hasher == "Twox64Concat": 542 return 8 543 elif key_hasher == "Identity": 544 return 0 545 else: 546 raise ValueError("Unsupported hash type") 547 548 assert len(response) == len(function_parameters) == len(prefix_list) 549 result_dict: dict[str, dict[Any, Any]] = {} 550 for res, fun_params_tuple, prefix in zip( 551 response, function_parameters, prefix_list 552 ): 553 if not res: 554 continue 555 res = res[0] 556 changes = res["changes"] # type: ignore 557 value_type, param_types, key_hashers, params, storage_function = ( 558 fun_params_tuple 559 ) 560 with self.get_conn(init=True) as substrate: 561 for item in changes: 562 # Determine type string 563 key_type_string: list[Any] = [] 564 for n in range(len(params), len(param_types)): 565 key_type_string.append( 566 f"[u8; {concat_hash_len(key_hashers[n])}]" 567 ) 568 key_type_string.append(param_types[n]) 569 570 item_key_obj = substrate.decode_scale( # type: ignore 571 type_string=f"({', '.join(key_type_string)})", 572 scale_bytes="0x" + item[0][len(prefix) :], 573 return_scale_obj=True, 574 block_hash=block_hash, 575 ) 576 # strip key_hashers to use as item key 577 if len(param_types) - len(params) == 1: 578 item_key = item_key_obj.value_object[1] # type: ignore 579 else: 580 item_key = tuple( # type: ignore 581 item_key_obj.value_object[key + 1] # type: ignore 582 for key in range( # type: ignore 583 len(params), len(param_types) + 1, 2 584 ) 585 ) 586 587 item_value = substrate.decode_scale( # type: ignore 588 type_string=value_type, 589 scale_bytes=item[1], 590 return_scale_obj=True, 591 block_hash=block_hash, 592 ) 593 result_dict.setdefault(storage_function, {}) 594 key = get_item_key_value(item_key) # type: ignore 595 result_dict[storage_function][key] = item_value.value # type: ignore 596 597 return result_dict 598 599 def query_batch( 600 self, functions: dict[str, list[tuple[str, list[Any]]]] 601 ) -> dict[str, str]: 602 """ 603 Executes batch queries on a substrate and returns results in a dictionary format. 604 605 Args: 606 substrate: An instance of SubstrateInterface to interact with the substrate. 607 functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls (function name and parameters). 608 609 Returns: 610 A dictionary where keys are storage function names and values are the query results. 611 612 Raises: 613 Exception: If no result is found from the batch queries. 
614 615 Example: 616 >>> query_batch(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]}) 617 {'function_name': 'query_result', ...} 618 """ 619 620 result: dict[str, str] = {} 621 if not functions: 622 raise Exception("No result") 623 with self.get_conn(init=True) as substrate: 624 for module, queries in functions.items(): 625 storage_keys: list[Any] = [] 626 for fn, params in queries: 627 storage_function = substrate.create_storage_key( # type: ignore 628 pallet=module, storage_function=fn, params=params 629 ) 630 storage_keys.append(storage_function) 631 632 block_hash = substrate.get_block_hash() 633 responses: list[Any] = substrate.query_multi( # type: ignore 634 storage_keys=storage_keys, block_hash=block_hash 635 ) 636 637 for item in responses: 638 fun = item[0] 639 query = item[1] 640 storage_fun = fun.storage_function 641 result[storage_fun] = query.value 642 643 return result 644 645 def query_batch_map( 646 self, 647 functions: dict[str, list[tuple[str, list[Any]]]], 648 block_hash: str | None = None, 649 ) -> dict[str, dict[Any, Any]]: 650 """ 651 Queries multiple storage functions using a map batch approach and returns the combined result. 652 653 Args: 654 substrate: An instance of SubstrateInterface for substrate interaction. 655 functions (dict[str, list[query_call]]): A dictionary mapping module names to lists of query calls. 656 657 Returns: 658 The combined result of the map batch query. 659 660 Example: 661 >>> query_batch_map(substrate_instance, {'module_name': [('function_name', ['param1', 'param2'])]}) 662 # Returns the combined result of the map batch query 663 """ 664 multi_result: dict[str, dict[Any, Any]] = {} 665 666 def recursive_update( 667 d: dict[str, dict[T1, T2] | dict[str, Any]], 668 u: Mapping[str, dict[Any, Any] | str], 669 ) -> dict[str, dict[T1, T2]]: 670 for k, v in u.items(): 671 if isinstance(v, dict): 672 d[k] = recursive_update(d.get(k, {}), v) # type: ignore 673 else: 674 d[k] = v # type: ignore 675 return d # type: ignore 676 677 def get_page(): 678 send, prefix_list = self._get_storage_keys( 679 storage, queries, block_hash 680 ) 681 with self.get_conn(init=True) as substrate: 682 function_parameters = self._get_lists( 683 storage, queries, substrate 684 ) 685 responses = self._rpc_request_batch(send) 686 # assumption because send is just the storage_function keys 687 # so it should always be really small regardless of the amount of queries 688 assert len(responses) == 1 689 res = responses[0] 690 built_payload: list[tuple[str, list[Any]]] = [] 691 for result_keys in res: 692 built_payload.append( 693 ("state_queryStorageAt", [result_keys, block_hash]) 694 ) 695 _, chunks_info = self._make_request_smaller( 696 built_payload, prefix_list, function_parameters 697 ) 698 chunks_response, chunks_info = self._rpc_request_batch_chunked( 699 chunks_info 700 ) 701 return chunks_response, chunks_info 702 703 if not block_hash: 704 with self.get_conn(init=True) as substrate: 705 block_hash = substrate.get_block_hash() 706 for storage, queries in functions.items(): 707 chunks, chunks_info = get_page() 708 # if this doesn't happen something is wrong on the code 709 # and we won't be able to decode the data properly 710 assert len(chunks) == len(chunks_info) 711 for chunk_info, response in zip(chunks_info, chunks): 712 storage_result = self._decode_response( 713 response, 714 chunk_info.fun_params, 715 chunk_info.prefix_list, 716 block_hash, 717 ) 718 multi_result = recursive_update(multi_result, storage_result) 719 720 return 
multi_result 721 722 def query( 723 self, 724 name: str, 725 params: list[Any] = [], 726 module: str = "SubspaceModule", 727 block_hash: str | None = None, 728 ) -> Any: 729 """ 730 Queries a storage function on the network. 731 732 Sends a query to the network and retrieves data from a 733 specified storage function. 734 735 Args: 736 name: The name of the storage function to query. 737 params: The parameters to pass to the storage function. 738 module: The module where the storage function is located. 739 740 Returns: 741 The result of the query from the network. 742 743 Raises: 744 NetworkQueryError: If the query fails or is invalid. 745 """ 746 747 result = self.query_batch({module: [(name, params)]}) 748 749 return result[name] 750 751 def query_map( 752 self, 753 name: str, 754 params: list[Any] = [], 755 module: str = "SubspaceModule", 756 extract_value: bool = True, 757 block_hash: str | None = None, 758 ) -> dict[Any, Any]: 759 """ 760 Queries a storage map from a network node. 761 762 Args: 763 name: The name of the storage map to query. 764 params: A list of parameters for the query. 765 module: The module in which the storage map is located. 766 767 Returns: 768 A dictionary representing the key-value pairs 769 retrieved from the storage map. 770 771 Raises: 772 QueryError: If the query to the network fails or is invalid. 773 """ 774 775 result = self.query_batch_map({module: [(name, params)]}, block_hash) 776 777 if extract_value: 778 return {k.value: v.value for k, v in result} # type: ignore 779 780 return result 781 782 def compose_call( 783 self, 784 fn: str, 785 params: dict[str, Any], 786 key: Keypair | None, 787 module: str = "SubspaceModule", 788 wait_for_inclusion: bool = True, 789 wait_for_finalization: bool | None = None, 790 sudo: bool = False, 791 unsigned: bool = False, 792 ) -> ExtrinsicReceipt: 793 """ 794 Composes and submits a call to the network node. 795 796 Composes and signs a call with the provided keypair, and submits it to 797 the network. The call can be a standard extrinsic or a sudo extrinsic if 798 elevated permissions are required. The method can optionally wait for 799 the call's inclusion in a block and/or its finalization. 800 801 Args: 802 fn: The function name to call on the network. 803 params: A dictionary of parameters for the call. 804 key: The keypair for signing the extrinsic. 805 module: The module containing the function. 806 wait_for_inclusion: Wait for the call's inclusion in a block. 807 wait_for_finalization: Wait for the transaction's finalization. 808 sudo: Execute the call as a sudo (superuser) operation. 809 810 Returns: 811 The receipt of the submitted extrinsic, if 812 `wait_for_inclusion` is True. Otherwise, returns a string 813 identifier of the extrinsic. 814 815 Raises: 816 ChainTransactionError: If the transaction fails. 
817 """ 818 819 if key is None and not unsigned: 820 raise ValueError("Key must be provided for signed extrinsics.") 821 822 with self.get_conn() as substrate: 823 if wait_for_finalization is None: 824 wait_for_finalization = self.wait_for_finalization 825 826 call = substrate.compose_call( # type: ignore 827 call_module=module, call_function=fn, call_params=params 828 ) 829 if sudo: 830 call = substrate.compose_call( # type: ignore 831 call_module="Sudo", 832 call_function="sudo", 833 call_params={ 834 "call": call.value, # type: ignore 835 }, 836 ) 837 838 if not unsigned: 839 assert key is not None 840 extrinsic = substrate.create_signed_extrinsic( # type: ignore 841 call=call, 842 keypair=key, 843 ) 844 else: 845 extrinsic = substrate.create_unsigned_extrinsic(call=call) # type: ignore 846 847 response = substrate.submit_extrinsic( 848 extrinsic=extrinsic, 849 wait_for_inclusion=wait_for_inclusion, 850 wait_for_finalization=wait_for_finalization, 851 ) 852 if wait_for_inclusion: 853 if not response.is_success: 854 raise ChainTransactionError( 855 response.error_message, # type: ignore 856 response, # type: ignore 857 ) 858 859 return response 860 861 def compose_call_multisig( 862 self, 863 fn: str, 864 params: dict[str, Any], 865 key: Keypair, 866 signatories: list[Ss58Address], 867 threshold: int, 868 module: str = "SubspaceModule", 869 wait_for_inclusion: bool = True, 870 wait_for_finalization: bool | None = None, 871 sudo: bool = False, 872 era: dict[str, int] | None = None, 873 ) -> ExtrinsicReceipt: 874 """ 875 Composes and submits a multisignature call to the network node. 876 877 This method allows the composition and submission of a call that 878 requires multiple signatures for execution, known as a multisignature 879 call. It supports specifying signatories, a threshold of signatures for 880 the call's execution, and an optional era for the call's mortality. The 881 call can be a standard extrinsic, a sudo extrinsic for elevated 882 permissions, or a multisig extrinsic if multiple signatures are 883 required. Optionally, the method can wait for the call's inclusion in a 884 block and/or its finalization. Make sure to pass all keys, 885 that are part of the multisignature. 886 887 Args: 888 fn: The function name to call on the network. params: A dictionary 889 of parameters for the call. key: The keypair for signing the 890 extrinsic. signatories: List of SS58 addresses of the signatories. 891 Include ALL KEYS that are part of the multisig. threshold: The 892 minimum number of signatories required to execute the extrinsic. 893 module: The module containing the function to call. 894 wait_for_inclusion: Whether to wait for the call's inclusion in a 895 block. wait_for_finalization: Whether to wait for the transaction's 896 finalization. sudo: Execute the call as a sudo (superuser) 897 operation. era: Specifies the call's mortality in terms of blocks in 898 the format 899 {'period': amount_blocks}. If omitted, the extrinsic is 900 immortal. 901 902 Returns: 903 The receipt of the submitted extrinsic if `wait_for_inclusion` is 904 True. Otherwise, returns a string identifier of the extrinsic. 905 906 Raises: 907 ChainTransactionError: If the transaction fails. 
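
        Example (illustrative sketch; the node URL, keys, addresses, and
        threshold are placeholders):
        ```py
        client = CommuneClient("wss://commune-node.example")  # placeholder URL
        key = Keypair.create_from_uri("//Alice")  # one of the signatories (placeholder)
        signatories = ["5F...alice", "5F...bob", "5F...charlie"]  # ALL multisig keys
        receipt = client.compose_call_multisig(
            fn="transfer_keep_alive",
            params={"dest": "5F...recipient", "value": 1_000_000_000},  # placeholders
            key=key,
            signatories=signatories,
            threshold=2,
            module="Balances",
            era={"period": 100},  # optional mortality in blocks; omit for an immortal extrinsic
        )
        ```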
908 """ 909 910 # getting the call ready 911 with self.get_conn() as substrate: 912 if wait_for_finalization is None: 913 wait_for_finalization = self.wait_for_finalization 914 915 # prepares the `GenericCall` object 916 call = substrate.compose_call( # type: ignore 917 call_module=module, call_function=fn, call_params=params 918 ) 919 if sudo: 920 call = substrate.compose_call( # type: ignore 921 call_module="Sudo", 922 call_function="sudo", 923 call_params={ 924 "call": call.value, # type: ignore 925 }, 926 ) 927 928 # modify the rpc methods at runtime, to allow for correct payment 929 # fee calculation parity has a bug in this version, 930 # where the method has to be removed 931 rpc_methods = substrate.config.get("rpc_methods") # type: ignore 932 933 if "state_call" in rpc_methods: # type: ignore 934 rpc_methods.remove("state_call") # type: ignore 935 936 # create the multisig account 937 multisig_acc = substrate.generate_multisig_account( # type: ignore 938 signatories, threshold 939 ) 940 941 # send the multisig extrinsic 942 extrinsic = substrate.create_multisig_extrinsic( # type: ignore 943 call=call, # type: ignore 944 keypair=key, 945 multisig_account=multisig_acc, # type: ignore 946 era=era, # type: ignore 947 ) # type: ignore 948 949 response = substrate.submit_extrinsic( 950 extrinsic=extrinsic, 951 wait_for_inclusion=wait_for_inclusion, 952 wait_for_finalization=wait_for_finalization, 953 ) 954 955 if wait_for_inclusion: 956 if not response.is_success: 957 raise ChainTransactionError( 958 response.error_message, # type: ignore 959 response, # type: ignore 960 ) 961 962 return response 963 964 def transfer( 965 self, 966 key: Keypair, 967 amount: int, 968 dest: Ss58Address, 969 ) -> ExtrinsicReceipt: 970 """ 971 Transfers a specified amount of tokens from the signer's account to the 972 specified account. 973 974 Args: 975 key: The keypair associated with the sender's account. 976 amount: The amount to transfer, in nanotokens. 977 dest: The SS58 address of the recipient. 978 979 Returns: 980 A receipt of the transaction. 981 982 Raises: 983 InsufficientBalanceError: If the sender's account does not have 984 enough balance. 985 ChainTransactionError: If the transaction fails. 986 """ 987 988 params = {"dest": dest, "value": amount} 989 990 return self.compose_call( 991 module="Balances", fn="transfer_keep_alive", params=params, key=key 992 ) 993 994 def transfer_multiple( 995 self, 996 key: Keypair, 997 destinations: list[Ss58Address], 998 amounts: list[int], 999 netuid: str | int = 0, 1000 ) -> ExtrinsicReceipt: 1001 """ 1002 Transfers specified amounts of tokens from the signer's account to 1003 multiple target accounts. 1004 1005 The `destinations` and `amounts` lists must be of the same length. 1006 1007 Args: 1008 key: The keypair associated with the sender's account. 1009 destinations: A list of SS58 addresses of the recipients. 1010 amounts: Amount to transfer to each recipient, in nanotokens. 1011 netuid: The network identifier. 1012 1013 Returns: 1014 A receipt of the transaction. 1015 1016 Raises: 1017 InsufficientBalanceError: If the sender's account does not have 1018 enough balance for all transfers. 1019 ChainTransactionError: If the transaction fails. 
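
        Example (illustrative sketch; the node URL, key, addresses, and
        amounts in nanotokens are placeholders):
        ```py
        client = CommuneClient("wss://commune-node.example")  # placeholder URL
        key = Keypair.create_from_uri("//Alice")  # sender key (placeholder)
        receipt = client.transfer_multiple(
            key=key,
            destinations=["5F...dest1", "5F...dest2"],  # placeholder SS58 addresses
            amounts=[2_000_000_000, 3_000_000_000],  # one amount per destination
        )
        # note: the existential deposit is subtracted from each amount before sending
        ```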
1020 """ 1021 1022 assert len(destinations) == len(amounts) 1023 1024 # extract existential deposit from amounts 1025 existential_deposit = self.get_existential_deposit() 1026 amounts = [a - existential_deposit for a in amounts] 1027 1028 params = { 1029 "netuid": netuid, 1030 "destinations": destinations, 1031 "amounts": amounts, 1032 } 1033 1034 return self.compose_call( 1035 module="SubspaceModule", 1036 fn="transfer_multiple", 1037 params=params, 1038 key=key, 1039 ) 1040 1041 def stake( 1042 self, 1043 key: Keypair, 1044 amount: int, 1045 dest: Ss58Address, 1046 ) -> ExtrinsicReceipt: 1047 """ 1048 Stakes the specified amount of tokens to a module key address. 1049 1050 Args: 1051 key: The keypair associated with the staker's account. 1052 amount: The amount of tokens to stake, in nanotokens. 1053 dest: The SS58 address of the module key to stake to. 1054 netuid: The network identifier. 1055 1056 Returns: 1057 A receipt of the staking transaction. 1058 1059 Raises: 1060 InsufficientBalanceError: If the staker's account does not have 1061 enough balance. 1062 ChainTransactionError: If the transaction fails. 1063 """ 1064 1065 params = {"amount": amount, "module_key": dest} 1066 1067 return self.compose_call(fn="add_stake", params=params, key=key) 1068 1069 def unstake( 1070 self, 1071 key: Keypair, 1072 amount: int, 1073 dest: Ss58Address, 1074 ) -> ExtrinsicReceipt: 1075 """ 1076 Unstakes the specified amount of tokens from a module key address. 1077 1078 Args: 1079 key: The keypair associated with the unstaker's account. 1080 amount: The amount of tokens to unstake, in nanotokens. 1081 dest: The SS58 address of the module key to unstake from. 1082 netuid: The network identifier. 1083 1084 Returns: 1085 A receipt of the unstaking transaction. 1086 1087 Raises: 1088 InsufficientStakeError: If the staked key does not have enough 1089 staked tokens by the signer key. 1090 ChainTransactionError: If the transaction fails. 1091 """ 1092 1093 params = {"amount": amount, "module_key": dest} 1094 return self.compose_call(fn="remove_stake", params=params, key=key) 1095 1096 def update_module( 1097 self, 1098 key: Keypair, 1099 name: str, 1100 address: str, 1101 metadata: str | None = None, 1102 delegation_fee: int = 20, 1103 netuid: int = 0, 1104 ) -> ExtrinsicReceipt: 1105 """ 1106 Updates the parameters of a registered module. 1107 1108 The delegation fee must be an integer between 0 and 100. 1109 1110 Args: 1111 key: The keypair associated with the module's account. 1112 name: The new name for the module. If None, the name is not updated. 1113 address: The new address for the module. 1114 If None, the address is not updated. 1115 delegation_fee: The new delegation fee for the module, 1116 between 0 and 100. 1117 netuid: The network identifier. 1118 1119 Returns: 1120 A receipt of the module update transaction. 1121 1122 Raises: 1123 InvalidParameterError: If the provided parameters are invalid. 1124 ChainTransactionError: If the transaction fails. 
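
        Example (illustrative sketch; the node URL, key, module name, and
        address are placeholders):
        ```py
        client = CommuneClient("wss://commune-node.example")  # placeholder URL
        key = Keypair.create_from_uri("//Alice")  # module owner key (placeholder)
        receipt = client.update_module(
            key=key,
            name="my-module",             # placeholder module name
            address="203.0.113.10:8000",  # placeholder address
            delegation_fee=10,            # integer between 0 and 100
            netuid=0,
        )
        ```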
        """

        assert isinstance(delegation_fee, int)
        params = {
            "netuid": netuid,
            "name": name,
            "address": address,
            "delegation_fee": delegation_fee,
            "metadata": metadata,
        }

        response = self.compose_call("update_module", params=params, key=key)

        return response

    def register_module(
        self,
        key: Keypair,
        name: str,
        address: str | None = None,
        subnet: str = "Rootnet",
        metadata: str | None = None,
    ) -> ExtrinsicReceipt:
        """
        Registers a new module in the network.

        Args:
            key: The keypair used for registering the module.
            name: The name of the module.
            address: The address of the module, if any.
            subnet: The network subnet to register the module in.
            metadata: Optional metadata for the module.

        Returns:
            A receipt of the registration transaction.

        Raises:
            InvalidParameterError: If the provided parameters are invalid.
            ChainTransactionError: If the transaction fails.
        """

        key_addr = key.ss58_address

        params = {
            "network_name": subnet,
            "address": address,
            "name": name,
            "module_key": key_addr,
            "metadata": metadata,
        }

        response = self.compose_call("register", params=params, key=key)
        return response

    def deregister_module(self, key: Keypair, netuid: int) -> ExtrinsicReceipt:
        """
        Deregisters a module from the network.

        Args:
            key: The keypair associated with the module's account.
            netuid: The network identifier.

        Returns:
            A receipt of the module deregistration transaction.

        Raises:
            ChainTransactionError: If the transaction fails.
        """

        params = {"netuid": netuid}

        response = self.compose_call("deregister", params=params, key=key)

        return response

    def register_subnet(
        self, key: Keypair, name: str, metadata: str | None = None
    ) -> ExtrinsicReceipt:
        """
        Registers a new subnet in the network.

        Args:
            key: The keypair used for registering the subnet.
            name: The name of the subnet to be registered.
            metadata: Additional metadata for the subnet. Defaults to None.

        Returns:
            A receipt of the subnet registration transaction.

        Raises:
            ChainTransactionError: If the transaction fails.
        """

        params = {
            "name": name,
            "metadata": metadata,
        }

        response = self.compose_call("register_subnet", params=params, key=key)

        return response

    def vote(
        self,
        key: Keypair,
        uids: list[int],
        weights: list[int],
        netuid: int = 0,
    ) -> ExtrinsicReceipt:
        """
        Casts votes on a list of module UIDs with corresponding weights.

        The UIDs list and the weights list must be of the same length.
        Each weight corresponds to the UID at the same index.

        Args:
            key: The keypair used for signing the vote transaction.
            uids: A list of module UIDs to vote on.
            weights: A list of weights corresponding to each UID.
            netuid: The network identifier.

        Returns:
            A receipt of the voting transaction.
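
        Example (illustrative sketch; the node URL, key, UIDs, and weights are
        placeholders):
        ```py
        client = CommuneClient("wss://commune-node.example")  # placeholder URL
        key = Keypair.create_from_uri("//Alice")  # validator key (placeholder)
        receipt = client.vote(
            key=key,
            uids=[0, 7, 12],       # module UIDs to weight
            weights=[30, 50, 20],  # one weight per UID, same order
            netuid=0,
        )
        ```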

        Raises:
            InvalidParameterError: If the lengths of the UIDs and weights lists
                do not match.
            ChainTransactionError: If the transaction fails.
        """

        assert len(uids) == len(weights)

        params = {
            "uids": uids,
            "weights": weights,
            "netuid": netuid,
        }

        response = self.compose_call("set_weights", params=params, key=key)

        return response

    def update_subnet(
        self,
        key: Keypair,
        params: SubnetParams,
        netuid: int = 0,
    ) -> ExtrinsicReceipt:
        """
        Updates a subnet's configuration.

        Requires the founder key for authorization.

        Args:
            key: The founder keypair of the subnet.
            params: The new parameters for the subnet.
            netuid: The network identifier.

        Returns:
            A receipt of the subnet update transaction.

        Raises:
            AuthorizationError: If the key is not authorized.
            ChainTransactionError: If the transaction fails.
        """

        general_params = dict(params)
        general_params["netuid"] = netuid
        if general_params.get("subnet_metadata") is None:
            general_params["metadata"] = None
        else:
            general_params["metadata"] = general_params["subnet_metadata"]

        response = self.compose_call(
            fn="update_subnet",
            params=general_params,
            key=key,
        )

        return response

    def transfer_stake(
        self,
        key: Keypair,
        amount: int,
        from_module_key: Ss58Address,
        dest_module_address: Ss58Address,
    ) -> ExtrinsicReceipt:
        """
        Reallocates staked tokens from one staked module to another.

        Args:
            key: The keypair associated with the account that is delegating the tokens.
            amount: The amount of staked tokens to transfer, in nanotokens.
            from_module_key: The SS58 address of the module to transfer from (currently delegated by the key).
            dest_module_address: The SS58 address of the destination (newly delegated key).

        Returns:
            A receipt of the stake transfer transaction.

        Raises:
            InsufficientStakeError: If the source module key does not have
                enough staked tokens.
            ChainTransactionError: If the transaction fails.
        """

        amount = amount - self.get_existential_deposit()

        params = {
            "amount": amount,
            "module_key": from_module_key,
            "new_module_key": dest_module_address,
        }

        response = self.compose_call("transfer_stake", key=key, params=params)

        return response

    def multiunstake(
        self,
        key: Keypair,
        keys: list[Ss58Address],
        amounts: list[int],
    ) -> ExtrinsicReceipt:
        """
        Unstakes tokens from multiple module keys.

        The lists `keys` and `amounts` must be of the same length. Each
        amount corresponds to the module key at the same index.

        Args:
            key: The keypair associated with the unstaker's account.
            keys: A list of SS58 addresses of the module keys to unstake from.
            amounts: A list of amounts to unstake from each module key,
                in nanotokens.

        Returns:
            A receipt of the multi-unstaking transaction.

        Raises:
            MismatchedLengthError: If the lengths of the keys and amounts lists
                do not match.
            InsufficientStakeError: If any of the module keys do not have
                enough staked tokens.
            ChainTransactionError: If the transaction fails.
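
        Example (illustrative sketch; the node URL, key, module keys, and
        amounts in nanotokens are placeholders):
        ```py
        client = CommuneClient("wss://commune-node.example")  # placeholder URL
        key = Keypair.create_from_uri("//Alice")  # staker key (placeholder)
        receipt = client.multiunstake(
            key=key,
            keys=["5F...module1", "5F...module2"],  # placeholder module keys
            amounts=[1_000_000_000, 500_000_000],   # one amount per module key
        )
        ```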
1374 """ 1375 1376 assert len(keys) == len(amounts) 1377 1378 params = {"module_keys": keys, "amounts": amounts} 1379 1380 response = self.compose_call( 1381 "remove_stake_multiple", params=params, key=key 1382 ) 1383 1384 return response 1385 1386 def multistake( 1387 self, 1388 key: Keypair, 1389 keys: list[Ss58Address], 1390 amounts: list[int], 1391 ) -> ExtrinsicReceipt: 1392 """ 1393 Stakes tokens to multiple module keys. 1394 1395 The lengths of the `keys` and `amounts` lists must be the same. Each 1396 amount corresponds to the module key at the same index. 1397 1398 Args: 1399 key: The keypair associated with the staker's account. 1400 keys: A list of SS58 addresses of the module keys to stake to. 1401 amounts: A list of amounts to stake to each module key, 1402 in nanotokens. 1403 netuid: The network identifier. 1404 1405 Returns: 1406 A receipt of the multi-staking transaction. 1407 1408 Raises: 1409 MismatchedLengthError: If the lengths of keys and amounts lists 1410 do not match. 1411 ChainTransactionError: If the transaction fails. 1412 """ 1413 1414 assert len(keys) == len(amounts) 1415 1416 params = { 1417 "module_keys": keys, 1418 "amounts": amounts, 1419 } 1420 1421 response = self.compose_call( 1422 "add_stake_multiple", params=params, key=key 1423 ) 1424 1425 return response 1426 1427 def add_profit_shares( 1428 self, 1429 key: Keypair, 1430 keys: list[Ss58Address], 1431 shares: list[int], 1432 ) -> ExtrinsicReceipt: 1433 """ 1434 Allocates profit shares to multiple keys. 1435 1436 The lists `keys` and `shares` must be of the same length, 1437 with each share amount corresponding to the key at the same index. 1438 1439 Args: 1440 key: The keypair associated with the account 1441 distributing the shares. 1442 keys: A list of SS58 addresses to allocate shares to. 1443 shares: A list of share amounts to allocate to each key, 1444 in nanotokens. 1445 1446 Returns: 1447 A receipt of the profit sharing transaction. 1448 1449 Raises: 1450 MismatchedLengthError: If the lengths of keys and shares 1451 lists do not match. 1452 ChainTransactionError: If the transaction fails. 1453 """ 1454 1455 assert len(keys) == len(shares) 1456 1457 params = {"keys": keys, "shares": shares} 1458 1459 response = self.compose_call( 1460 "add_profit_shares", params=params, key=key 1461 ) 1462 1463 return response 1464 1465 def add_subnet_proposal( 1466 self, key: Keypair, params: dict[str, Any], ipfs: str, netuid: int = 0 1467 ) -> ExtrinsicReceipt: 1468 """ 1469 Submits a proposal for creating or modifying a subnet within the 1470 network. 1471 1472 The proposal includes various parameters like the name, founder, share 1473 allocations, and other subnet-specific settings. 1474 1475 Args: 1476 key: The keypair used for signing the proposal transaction. 1477 params: The parameters for the subnet proposal. 1478 netuid: The network identifier. 1479 1480 Returns: 1481 A receipt of the subnet proposal transaction. 1482 1483 Raises: 1484 InvalidParameterError: If the provided subnet 1485 parameters are invalid. 1486 ChainTransactionError: If the transaction fails. 
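
        Example (illustrative sketch; the node URL, key, and CID are
        placeholders, and `subnet_params` stands for a full subnet-parameter
        dictionary, which is not spelled out here):
        ```py
        from typing import Any

        client = CommuneClient("wss://commune-node.example")  # placeholder URL
        key = Keypair.create_from_uri("//Alice")  # proposer key (placeholder)
        subnet_params: dict[str, Any] = {}  # fill in the proposed subnet parameters
        receipt = client.add_subnet_proposal(
            key=key,
            params=subnet_params,
            ipfs="Qm...",  # placeholder CID of the proposal document
            netuid=0,
        )
        ```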
        """

        general_params = dict(params)
        general_params["netuid"] = netuid
        general_params["data"] = ipfs
        if "metadata" not in general_params:
            general_params["metadata"] = None

        # general_params["burn_config"] = json.dumps(general_params["burn_config"])
        response = self.compose_call(
            fn="add_subnet_params_proposal",
            params=general_params,
            key=key,
            module="GovernanceModule",
        )

        return response

    def add_custom_proposal(
        self,
        key: Keypair,
        cid: str,
    ) -> ExtrinsicReceipt:
        """
        Submits a custom global proposal referenced by a content identifier (CID).

        Args:
            key: The keypair used for signing the proposal transaction.
            cid: The content identifier of the proposal data.

        Returns:
            A receipt of the proposal transaction.

        Raises:
            ChainTransactionError: If the transaction fails.
        """
        params = {"data": cid}

        response = self.compose_call(
            fn="add_global_custom_proposal",
            params=params,
            key=key,
            module="GovernanceModule",
        )
        return response

    def add_custom_subnet_proposal(
        self,
        key: Keypair,
        cid: str,
        netuid: int = 0,
    ) -> ExtrinsicReceipt:
        """
        Submits a custom proposal to a specific subnet.

        The proposal is referenced by a content identifier (CID).

        Args:
            key: The keypair used for signing the proposal transaction.
            cid: The content identifier of the proposal data.
            netuid: The network identifier of the target subnet.

        Returns:
            A receipt of the subnet proposal transaction.
        """

        params = {
            "data": cid,
            "netuid": netuid,
        }

        response = self.compose_call(
            fn="add_subnet_custom_proposal",
            params=params,
            key=key,
            module="GovernanceModule",
        )

        return response

    def add_global_proposal(
        self,
        key: Keypair,
        params: NetworkParams,
        cid: str | None,
    ) -> ExtrinsicReceipt:
        """
        Submits a proposal for altering the global network parameters.

        Allows the submission of a proposal to change various global
        parameters of the network, such as emission rates, rate limits, and
        voting thresholds. It is used to suggest changes that affect the
        entire network's operation.

        Args:
            key: The keypair used for signing the proposal transaction.
            params: A dictionary containing global network parameters
                like maximum allowed subnets, modules,
                transaction rate limits, and others.
            cid: The content identifier of additional proposal data, if any.

        Returns:
            A receipt of the global proposal transaction.

        Raises:
            InvalidParameterError: If the provided network
                parameters are invalid.
            ChainTransactionError: If the transaction fails.
        """
        general_params = cast(dict[str, Any], params)
        cid = cid or ""
        general_params["data"] = cid

        response = self.compose_call(
            fn="add_global_params_proposal",
            params=general_params,
            key=key,
            module="GovernanceModule",
        )

        return response

    def vote_on_proposal(
        self,
        key: Keypair,
        proposal_id: int,
        agree: bool,
    ) -> ExtrinsicReceipt:
        """
        Casts a vote on a specified proposal within the network.

        Args:
            key: The keypair used for signing the vote transaction.
            proposal_id: The unique identifier of the proposal to vote on.
            agree: Whether to vote in favor of (True) or against (False) the
                proposal.

        Returns:
            A receipt of the voting transaction.

        Raises:
            InvalidProposalIDError: If the provided proposal ID does not
                exist or is invalid.
            ChainTransactionError: If the transaction fails.
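
        Example (illustrative sketch; the node URL, key, and proposal ID are
        placeholders):
        ```py
        client = CommuneClient("wss://commune-node.example")  # placeholder URL
        key = Keypair.create_from_uri("//Alice")  # voter key (placeholder)
        receipt = client.vote_on_proposal(key=key, proposal_id=42, agree=True)
        ```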
        """

        params = {"proposal_id": proposal_id, "agree": agree}

        response = self.compose_call(
            "vote_proposal",
            key=key,
            params=params,
            module="GovernanceModule",
        )

        return response

    def unvote_on_proposal(
        self,
        key: Keypair,
        proposal_id: int,
    ) -> ExtrinsicReceipt:
        """
        Retracts a previously cast vote on a specified proposal.

        Args:
            key: The keypair used for signing the unvote transaction.
            proposal_id: The unique identifier of the proposal to withdraw the
                vote from.

        Returns:
            A receipt of the unvoting transaction.

        Raises:
            InvalidProposalIDError: If the provided proposal ID does not
                exist or is invalid.
            ChainTransactionError: If the transaction fails to be processed, or
                if there was no prior vote to retract.
        """

        params = {"proposal_id": proposal_id}

        response = self.compose_call(
            "remove_vote_proposal",
            key=key,
            params=params,
            module="GovernanceModule",
        )

        return response

    def enable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt:
        """
        Enables vote power delegation for the signer's account.

        Args:
            key: The keypair used for signing the delegation transaction.

        Returns:
            A receipt of the vote power delegation transaction.

        Raises:
            ChainTransactionError: If the transaction fails.
        """

        response = self.compose_call(
            "enable_vote_power_delegation",
            params={},
            key=key,
            module="GovernanceModule",
        )

        return response

    def disable_vote_power_delegation(self, key: Keypair) -> ExtrinsicReceipt:
        """
        Disables vote power delegation for the signer's account.

        Args:
            key: The keypair used for signing the delegation transaction.

        Returns:
            A receipt of the vote power delegation transaction.

        Raises:
            ChainTransactionError: If the transaction fails.
        """

        response = self.compose_call(
            "disable_vote_power_delegation",
            params={},
            key=key,
            module="GovernanceModule",
        )

        return response

    def add_dao_application(
        self, key: Keypair, application_key: Ss58Address, data: str
    ) -> ExtrinsicReceipt:
        """
        Submits a new application to the general subnet DAO.

        Args:
            key: The keypair used for signing the application transaction.
            application_key: The SS58 address of the application key.
            data: The data associated with the application.

        Returns:
            A receipt of the application transaction.

        Raises:
            ChainTransactionError: If the transaction fails.
        """

        params = {"application_key": application_key, "data": data}

        response = self.compose_call(
            "add_dao_application",
            module="GovernanceModule",
            key=key,
            params=params,
        )

        return response

    def query_map_curator_applications(self) -> dict[str, dict[str, str]]:
        """
        Retrieves the curator applications stored in the governance module.

        Returns:
            A dictionary containing the curator applications.
        """
        query_result = self.query_map(
            "CuratorApplications",
            module="GovernanceModule",
            params=[],
            extract_value=False,
        )
        applications = query_result.get("CuratorApplications", {})
        return applications

    def query_map_proposals(
        self, extract_value: bool = False
    ) -> dict[int, dict[str, Any]]:
        """
        Retrieves a mapping of proposals from the network.
1755 1756 Queries the network and returns a mapping of proposal IDs to 1757 their respective parameters. 1758 1759 Returns: 1760 A dictionary mapping proposal IDs 1761 to dictionaries of their parameters. 1762 1763 Raises: 1764 QueryError: If the query to the network fails or is invalid. 1765 """ 1766 1767 return self.query_map( 1768 "Proposals", extract_value=extract_value, module="GovernanceModule" 1769 )["Proposals"] 1770 1771 def query_map_weights( 1772 self, netuid: int = 0, extract_value: bool = False 1773 ) -> dict[int, list[tuple[int, int]]] | None: 1774 """ 1775 Retrieves a mapping of weights for keys on the network. 1776 1777 Queries the network and returns a mapping of key UIDs to 1778 their respective weights. 1779 1780 Args: 1781 netuid: The network UID from which to get the weights. 1782 1783 Returns: 1784 A dictionary mapping key UIDs to lists of their weights. 1785 1786 Raises: 1787 QueryError: If the query to the network fails or is invalid. 1788 """ 1789 1790 weights_dict = self.query_map( 1791 "Weights", [netuid], extract_value=extract_value 1792 ).get("Weights") 1793 return weights_dict 1794 1795 def query_map_key( 1796 self, 1797 netuid: int = 0, 1798 extract_value: bool = False, 1799 ) -> dict[int, Ss58Address]: 1800 """ 1801 Retrieves a map of keys from the network. 1802 1803 Fetches a mapping of key UIDs to their associated 1804 addresses on the network. 1805 The query can be targeted at a specific network UID if required. 1806 1807 Args: 1808 netuid: The network UID from which to get the keys. 1809 1810 Returns: 1811 A dictionary mapping key UIDs to their addresses. 1812 1813 Raises: 1814 QueryError: If the query to the network fails or is invalid. 1815 """ 1816 return self.query_map("Keys", [netuid], extract_value=extract_value)[ 1817 "Keys" 1818 ] 1819 1820 def query_map_address( 1821 self, netuid: int = 0, extract_value: bool = False 1822 ) -> dict[int, str]: 1823 """ 1824 Retrieves a map of key addresses from the network. 1825 1826 Queries the network for a mapping of key UIDs to their addresses. 1827 1828 Args: 1829 netuid: The network UID from which to get the addresses. 1830 1831 Returns: 1832 A dictionary mapping key UIDs to their addresses. 1833 1834 Raises: 1835 QueryError: If the query to the network fails or is invalid. 1836 """ 1837 1838 return self.query_map("Address", [netuid], extract_value=extract_value)[ 1839 "Address" 1840 ] 1841 1842 def query_map_emission( 1843 self, extract_value: bool = False 1844 ) -> dict[int, list[int]]: 1845 """ 1846 Retrieves a map of emissions for keys on the network. 1847 1848 Queries the network to get a mapping of 1849 key UIDs to their emission values. 1850 1851 Returns: 1852 A dictionary mapping key UIDs to lists of their emission values. 1853 1854 Raises: 1855 QueryError: If the query to the network fails or is invalid. 1856 """ 1857 1858 return self.query_map("Emission", extract_value=extract_value)[ 1859 "Emission" 1860 ] 1861 1862 def query_map_pending_emission(self, extract_value: bool = False) -> int: 1863 """ 1864 Retrieves a map of pending emissions for the subnets. 1865 1866 Queries the network for a mapping of subnet UIDs to their pending emission values. 1867 1868 Returns: 1869 A dictionary mapping subnet UIDs to their pending emission values. 1870 1871 Raises: 1872 QueryError: If the query to the network fails or is invalid. 
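
        Example (illustrative sketch; the node URL is a placeholder):
        ```py
        client = CommuneClient("wss://commune-node.example")  # placeholder URL
        pending = client.query_map_pending_emission()
        # dict mapping subnet UIDs to their pending emission values
        ```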
1873 """ 1874 return self.query_map( 1875 "PendingEmission", 1876 extract_value=extract_value, 1877 module="SubnetEmissionModule", 1878 )["PendingEmission"] 1879 1880 def query_map_subnet_emission( 1881 self, extract_value: bool = False 1882 ) -> dict[int, int]: 1883 """ 1884 Retrieves a map of subnet emissions for the network. 1885 1886 Queries the network for a mapping of subnet UIDs to their emission values. 1887 1888 Returns: 1889 A dictionary mapping subnet UIDs to their emission values. 1890 1891 Raises: 1892 QueryError: If the query to the network fails or is invalid. 1893 """ 1894 1895 return self.query_map( 1896 "SubnetEmission", 1897 extract_value=extract_value, 1898 module="SubnetEmissionModule", 1899 )["SubnetEmission"] 1900 1901 def query_map_subnet_consensus( 1902 self, extract_value: bool = False 1903 ) -> dict[int, str]: 1904 """ 1905 Retrieves a map of subnet consensus types for the network. 1906 1907 Queries the network for a mapping of subnet UIDs to their consensus types. 1908 1909 Returns: 1910 A dictionary mapping subnet UIDs to their consensus types. 1911 1912 Raises: 1913 QueryError: If the query to the network fails or is invalid. 1914 """ 1915 1916 return self.query_map( 1917 "SubnetConsensusType", 1918 extract_value=extract_value, 1919 module="SubnetEmissionModule", 1920 )["SubnetConsensusType"] 1921 1922 def query_map_incentive( 1923 self, extract_value: bool = False 1924 ) -> dict[int, list[int]]: 1925 """ 1926 Retrieves a mapping of incentives for keys on the network. 1927 1928 Queries the network and returns a mapping of key UIDs to 1929 their respective incentive values. 1930 1931 Returns: 1932 A dictionary mapping key UIDs to lists of their incentive values. 1933 1934 Raises: 1935 QueryError: If the query to the network fails or is invalid. 1936 """ 1937 1938 return self.query_map("Incentive", extract_value=extract_value)[ 1939 "Incentive" 1940 ] 1941 1942 def query_map_dividend( 1943 self, extract_value: bool = False 1944 ) -> dict[int, list[int]]: 1945 """ 1946 Retrieves a mapping of dividends for keys on the network. 1947 1948 Queries the network for a mapping of key UIDs to 1949 their dividend values. 1950 1951 Returns: 1952 A dictionary mapping key UIDs to lists of their dividend values. 1953 1954 Raises: 1955 QueryError: If the query to the network fails or is invalid. 1956 """ 1957 1958 return self.query_map("Dividends", extract_value=extract_value)[ 1959 "Dividends" 1960 ] 1961 1962 def query_map_regblock( 1963 self, netuid: int = 0, extract_value: bool = False 1964 ) -> dict[int, int]: 1965 """ 1966 Retrieves a mapping of registration blocks for keys on the network. 1967 1968 Queries the network for a mapping of key UIDs to 1969 the blocks where they were registered. 1970 1971 Args: 1972 netuid: The network UID from which to get the registration blocks. 1973 1974 Returns: 1975 A dictionary mapping key UIDs to their registration blocks. 1976 1977 Raises: 1978 QueryError: If the query to the network fails or is invalid. 1979 """ 1980 1981 return self.query_map( 1982 "RegistrationBlock", [netuid], extract_value=extract_value 1983 )["RegistrationBlock"] 1984 1985 def query_map_lastupdate( 1986 self, extract_value: bool = False 1987 ) -> dict[int, list[int]]: 1988 """ 1989 Retrieves a mapping of the last update times for keys on the network. 1990 1991 Queries the network for a mapping of key UIDs to their last update times. 1992 1993 Returns: 1994 A dictionary mapping key UIDs to lists of their last update times. 
1995 1996 Raises: 1997 QueryError: If the query to the network fails or is invalid. 1998 """ 1999 2000 return self.query_map("LastUpdate", extract_value=extract_value)[ 2001 "LastUpdate" 2002 ] 2003 2004 def query_map_stakefrom( 2005 self, extract_value: bool = False 2006 ) -> dict[Ss58Address, list[tuple[Ss58Address, int]]]: 2007 """ 2008 Retrieves a mapping of stakes from various sources for keys on the network. 2009 2010 Queries the network to obtain a mapping of key addresses to the sources 2011 and amounts of stakes they have received. 2012 2013 Args: 2014 netuid: The network UID from which to get the stakes. 2015 2016 Returns: 2017 A dictionary mapping key addresses to lists of tuples 2018 (module_key_address, amount). 2019 2020 Raises: 2021 QueryError: If the query to the network fails or is invalid. 2022 """ 2023 2024 result = self.query_map("StakeFrom", [], extract_value=extract_value)[ 2025 "StakeFrom" 2026 ] 2027 2028 return transform_stake_dmap(result) 2029 2030 def query_map_staketo( 2031 self, extract_value: bool = False 2032 ) -> dict[Ss58Address, list[tuple[Ss58Address, int]]]: 2033 """ 2034 Retrieves a mapping of stakes to destinations for keys on the network. 2035 2036 Queries the network for a mapping of key addresses to the destinations 2037 and amounts of stakes they have made. 2038 2039 Args: 2040 netuid: The network UID from which to get the stakes. 2041 2042 Returns: 2043 A dictionary mapping key addresses to lists of tuples 2044 (module_key_address, amount). 2045 2046 Raises: 2047 QueryError: If the query to the network fails or is invalid. 2048 """ 2049 2050 result = self.query_map("StakeTo", [], extract_value=extract_value)[ 2051 "StakeTo" 2052 ] 2053 return transform_stake_dmap(result) 2054 2055 def query_map_delegationfee( 2056 self, netuid: int = 0, extract_value: bool = False 2057 ) -> dict[str, int]: 2058 """ 2059 Retrieves a mapping of delegation fees for keys on the network. 2060 2061 Queries the network to obtain a mapping of key addresses to their 2062 respective delegation fees. 2063 2064 Args: 2065 netuid: The network UID to filter the delegation fees. 2066 2067 Returns: 2068 A dictionary mapping key addresses to their delegation fees. 2069 2070 Raises: 2071 QueryError: If the query to the network fails or is invalid. 2072 """ 2073 2074 return self.query_map( 2075 "DelegationFee", [netuid], extract_value=extract_value 2076 )["DelegationFee"] 2077 2078 def query_map_tempo(self, extract_value: bool = False) -> dict[int, int]: 2079 """ 2080 Retrieves a mapping of tempo settings for the network. 2081 2082 Queries the network to obtain the tempo (rate of reward distributions) 2083 settings for various network subnets. 2084 2085 Returns: 2086 A dictionary mapping network UIDs to their tempo settings. 2087 2088 Raises: 2089 QueryError: If the query to the network fails or is invalid. 2090 """ 2091 2092 return self.query_map("Tempo", extract_value=extract_value)["Tempo"] 2093 2094 def query_map_immunity_period(self, extract_value: bool) -> dict[int, int]: 2095 """ 2096 Retrieves a mapping of immunity periods for the network. 2097 2098 Queries the network for the immunity period settings, 2099 which represent the time duration during which modules 2100 can not get deregistered. 2101 2102 Returns: 2103 A dictionary mapping network UIDs to their immunity period settings. 2104 2105 Raises: 2106 QueryError: If the query to the network fails or is invalid. 
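
        Example (illustrative sketch; the node URL is a placeholder; note that
        `extract_value` has no default and must be passed explicitly):
        ```py
        client = CommuneClient("wss://commune-node.example")  # placeholder URL
        immunity = client.query_map_immunity_period(extract_value=False)
        # dict mapping network UIDs to their immunity period settings
        ```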
2107 """ 2108 2109 return self.query_map("ImmunityPeriod", extract_value=extract_value)[ 2110 "ImmunityPeriod" 2111 ] 2112 2113 def query_map_min_allowed_weights( 2114 self, extract_value: bool = False 2115 ) -> dict[int, int]: 2116 """ 2117 Retrieves a mapping of minimum allowed weights for the network. 2118 2119 Queries the network to obtain the minimum allowed weights, 2120 which are the lowest permissible weight values that can be set by 2121 validators. 2122 2123 Returns: 2124 A dictionary mapping network UIDs to 2125 their minimum allowed weight values. 2126 2127 Raises: 2128 QueryError: If the query to the network fails or is invalid. 2129 """ 2130 2131 return self.query_map("MinAllowedWeights", extract_value=extract_value)[ 2132 "MinAllowedWeights" 2133 ] 2134 2135 def query_map_max_allowed_weights( 2136 self, extract_value: bool = False 2137 ) -> dict[int, int]: 2138 """ 2139 Retrieves a mapping of maximum allowed weights for the network. 2140 2141 Queries the network for the maximum allowed weights, 2142 which are the highest permissible 2143 weight values that can be set by validators. 2144 2145 Returns: 2146 A dictionary mapping network UIDs to 2147 their maximum allowed weight values. 2148 2149 Raises: 2150 QueryError: If the query to the network fails or is invalid. 2151 """ 2152 2153 return self.query_map("MaxAllowedWeights", extract_value=extract_value)[ 2154 "MaxAllowedWeights" 2155 ] 2156 2157 def query_map_max_allowed_uids( 2158 self, extract_value: bool = False 2159 ) -> dict[int, int]: 2160 """ 2161 Queries the network for the maximum number of allowed user IDs (UIDs) 2162 for each network subnet. 2163 2164 Fetches a mapping of network subnets to their respective 2165 limits on the number of user IDs that can be created or used. 2166 2167 Returns: 2168 A dictionary mapping network UIDs (unique identifiers) to their 2169 maximum allowed number of UIDs. 2170 Each entry represents a network subnet 2171 with its corresponding UID limit. 2172 2173 Raises: 2174 QueryError: If the query to the network fails or is invalid. 2175 """ 2176 2177 return self.query_map("MaxAllowedUids", extract_value=extract_value)[ 2178 "MaxAllowedUids" 2179 ] 2180 2181 def query_map_min_stake( 2182 self, extract_value: bool = False 2183 ) -> dict[int, int]: 2184 """ 2185 Retrieves a mapping of minimum allowed stake on the network. 2186 2187 Queries the network to obtain the minimum number of stake, 2188 which is represented in nanotokens. 2189 2190 Returns: 2191 A dictionary mapping network UIDs to 2192 their minimum allowed stake values. 2193 2194 Raises: 2195 QueryError: If the query to the network fails or is invalid. 2196 """ 2197 2198 return self.query_map("MinStake", extract_value=extract_value)[ 2199 "MinStake&