communex.module.server
Server for Commune modules.
"""
Server for Commune modules.
"""

import inspect
import random
from functools import partial
from typing import Any

import fastapi
from fastapi import APIRouter
from pydantic import BaseModel
from substrateinterface import Keypair  # type: ignore

from communex.key import check_ss58_address
from communex.module import _signer as signer
from communex.module._rate_limiters.limiters import (IpLimiterParams,
                                                     StakeLimiterParams)
from communex.module.module import EndpointDefinition, Module, endpoint
from communex.module.routers.module_routers import (InputHandlerVerifier,
                                                    IpLimiterVerifier,
                                                    ListVerifier,
                                                    StakeLimiterVerifier,
                                                    build_route_class)
from communex.types import Ss58Address
from communex.util.memo import TTLDict


class ModuleServer:
    """Expose the endpoints of a ``Module`` through a FastAPI application.

    Every endpoint returned by ``module.get_endpoints()`` is mounted as
    ``POST /method/<name>`` on a router whose route class is built from a
    list of verifiers (address/IP lists, input handler, limiter).
    """

    def __init__(
        self,
        module: Module,
        key: Keypair,
        max_request_staleness: int = 120,
        whitelist: list[Ss58Address] | None = None,
        blacklist: list[Ss58Address] | None = None,
        subnets_whitelist: list[int] | None = None,
        lower_ttl: int = 600,
        upper_ttl: int = 700,
        limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(),
        ip_blacklist: list[str] | None = None,
        use_testnet: bool = False,
    ) -> None:
        """Store the configuration and build the FastAPI app and its router.

        Args:
            module: Module whose endpoints are served.
            key: Keypair of the server; its ss58 address and the keypair
                itself are handed to the input handler verifier.
            max_request_staleness: Forwarded to the input handler verifier.
            whitelist: Addresses handed to the list verifier. A fresh list
                is used when ``None``.
            blacklist: Addresses handed to the list verifier. A fresh list
                is used when ``None``.
            subnets_whitelist: Forwarded to the input handler verifier and
                to the stake limiter verifier.
            lower_ttl: Inclusive lower bound of the randomly drawn cache TTL.
            upper_ttl: Inclusive upper bound of the randomly drawn cache TTL.
            limiter: Limiter parameters; their type selects the stake or
                the IP limiter verifier.
            ip_blacklist: Forwarded to the list verifier.
            use_testnet: Forwarded to the input handler verifier.
        """
        self.key = key
        self.max_request_staleness = max_request_staleness
        self._module = module
        self._subnets_whitelist = subnets_whitelist
        self._ip_blacklist = ip_blacklist
        self._app = fastapi.FastAPI()

        # The cache TTL is drawn once per server from [lower_ttl, upper_ttl].
        cache_ttl = random.randint(lower_ttl, upper_ttl)
        self._blockchain_cache = TTLDict[str, list[Ss58Address]](cache_ttl)

        # Keep the very list objects that were passed in (even empty ones),
        # so add_to_whitelist / add_to_blacklist append to the same lists
        # that are handed to the list verifier.
        self._whitelist: list[Ss58Address] = (
            whitelist if whitelist is not None else []
        )
        self._blacklist: list[Ss58Address] = (
            blacklist if blacklist is not None else []
        )

        self._build_routers(use_testnet, limiter)

    def _build_routers(
        self, use_testnet: bool,
        limiter: StakeLimiterParams | IpLimiterParams
    ):
        """Create the verifier chain, the router and attach it to the app.

        Args:
            use_testnet: Forwarded to the input handler verifier.
            limiter: ``StakeLimiterParams`` selects the stake limiter
                verifier, anything else the IP limiter verifier.
        """
        own_address = check_ss58_address(self.key.ss58_address)
        input_verifier = InputHandlerVerifier(
            self._subnets_whitelist,
            own_address,
            self.max_request_staleness,
            self._blockchain_cache,
            self.key,
            use_testnet,
        )
        list_verifier = ListVerifier(
            self._blacklist, self._whitelist, self._ip_blacklist
        )
        rate_verifier = (
            StakeLimiterVerifier(self._subnets_whitelist, limiter)
            if isinstance(limiter, StakeLimiterParams)
            else IpLimiterVerifier(limiter)
        )

        # order of verifiers is extremely important
        route_class = build_route_class(
            [list_verifier, input_verifier, rate_verifier]
        )
        self._router = APIRouter(route_class=route_class)
        self.register_endpoints(self._router)
        self._app.include_router(self._router)

    def get_fastapi_app(self):
        """Return the FastAPI application owned by this server."""
        return self._app

    def register_endpoints(self, router: APIRouter):
        """Mount every module endpoint on ``router`` as ``POST /method/<name>``.

        Args:
            router: Router that receives one POST route per endpoint.
        """
        for method_name, definition in self._module.get_endpoints().items():

            # Request body model: the endpoint arguments live under "params".
            class Body(BaseModel):
                params: definition.params_model  # type: ignore

            # Both handlers take the endpoint definition first; it is bound
            # with functools.partial below, leaving ``body`` as the only
            # remaining parameter.
            async def async_handler(end_def: EndpointDefinition[Any, ...], body: Body):
                kwargs = body.params.model_dump()  # type: ignore
                return await end_def.fn(self._module, **kwargs)  # type: ignore

            def handler(end_def: EndpointDefinition[Any, ...], body: Body):
                kwargs = body.params.model_dump()  # type: ignore
                return end_def.fn(self._module, **kwargs)  # type: ignore

            is_async = inspect.iscoroutinefunction(definition.fn)
            chosen = async_handler if is_async else handler
            defined_handler = partial(chosen, definition)
            router.post(f"/method/{method_name}")(defined_handler)

    def add_to_blacklist(self, ss58_address: Ss58Address):
        """Append ``ss58_address`` to the blacklist held by this server."""
        self._blacklist.append(ss58_address)

    def add_to_whitelist(self, ss58_address: Ss58Address):
        """Append ``ss58_address`` to the whitelist held by this server."""
        self._whitelist.append(ss58_address)


def main():
    """Serve a small demo module on 127.0.0.1:8000 using the testing key."""

    class Amod(Module):
        @endpoint
        def do_the_thing(self, awesomness: int = 42):
            # Anything above 60 counts as "super awesome".
            msg = (
                f"You're super awesome: {awesomness} awesomness"
                if awesomness > 60
                else f"You're not that awesome: {awesomness} awesomness"
            )
            return {"msg": msg}

    demo_module = Amod()
    demo_key = Keypair.create_from_mnemonic(signer.TESTING_MNEMONIC)
    demo_server = ModuleServer(
        demo_module,
        demo_key,
        subnets_whitelist=[0],
        blacklist=None,
    )
    app = demo_server.get_fastapi_app()

    # Imported here, after the server is built, as in the original listing.
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)  # type: ignore


if __name__ == "__main__":
    main()
class
ModuleServer:
class ModuleServer:
    """Expose the endpoints of a ``Module`` through a FastAPI application.

    Every endpoint returned by ``module.get_endpoints()`` is mounted as
    ``POST /method/<name>`` on a router whose route class is built from a
    list of verifiers (address/IP lists, input handler, limiter).
    """

    def __init__(
        self,
        module: Module,
        key: Keypair,
        max_request_staleness: int = 120,
        whitelist: list[Ss58Address] | None = None,
        blacklist: list[Ss58Address] | None = None,
        subnets_whitelist: list[int] | None = None,
        lower_ttl: int = 600,
        upper_ttl: int = 700,
        limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(),
        ip_blacklist: list[str] | None = None,
        use_testnet: bool = False,
    ) -> None:
        """Store the configuration and build the FastAPI app and its router.

        Args:
            module: Module whose endpoints are served.
            key: Keypair of the server; its ss58 address and the keypair
                itself are handed to the input handler verifier.
            max_request_staleness: Forwarded to the input handler verifier.
            whitelist: Addresses handed to the list verifier. A fresh list
                is used when ``None``.
            blacklist: Addresses handed to the list verifier. A fresh list
                is used when ``None``.
            subnets_whitelist: Forwarded to the input handler verifier and
                to the stake limiter verifier.
            lower_ttl: Inclusive lower bound of the randomly drawn cache TTL.
            upper_ttl: Inclusive upper bound of the randomly drawn cache TTL.
            limiter: Limiter parameters; their type selects the stake or
                the IP limiter verifier.
            ip_blacklist: Forwarded to the list verifier.
            use_testnet: Forwarded to the input handler verifier.
        """
        self.key = key
        self.max_request_staleness = max_request_staleness
        self._module = module
        self._subnets_whitelist = subnets_whitelist
        self._ip_blacklist = ip_blacklist
        self._app = fastapi.FastAPI()

        # The cache TTL is drawn once per server from [lower_ttl, upper_ttl].
        cache_ttl = random.randint(lower_ttl, upper_ttl)
        self._blockchain_cache = TTLDict[str, list[Ss58Address]](cache_ttl)

        # Keep the very list objects that were passed in (even empty ones),
        # so add_to_whitelist / add_to_blacklist append to the same lists
        # that are handed to the list verifier.
        self._whitelist: list[Ss58Address] = (
            whitelist if whitelist is not None else []
        )
        self._blacklist: list[Ss58Address] = (
            blacklist if blacklist is not None else []
        )

        self._build_routers(use_testnet, limiter)

    def _build_routers(
        self, use_testnet: bool,
        limiter: StakeLimiterParams | IpLimiterParams
    ):
        """Create the verifier chain, the router and attach it to the app.

        Args:
            use_testnet: Forwarded to the input handler verifier.
            limiter: ``StakeLimiterParams`` selects the stake limiter
                verifier, anything else the IP limiter verifier.
        """
        own_address = check_ss58_address(self.key.ss58_address)
        input_verifier = InputHandlerVerifier(
            self._subnets_whitelist,
            own_address,
            self.max_request_staleness,
            self._blockchain_cache,
            self.key,
            use_testnet,
        )
        list_verifier = ListVerifier(
            self._blacklist, self._whitelist, self._ip_blacklist
        )
        rate_verifier = (
            StakeLimiterVerifier(self._subnets_whitelist, limiter)
            if isinstance(limiter, StakeLimiterParams)
            else IpLimiterVerifier(limiter)
        )

        # order of verifiers is extremely important
        route_class = build_route_class(
            [list_verifier, input_verifier, rate_verifier]
        )
        self._router = APIRouter(route_class=route_class)
        self.register_endpoints(self._router)
        self._app.include_router(self._router)

    def get_fastapi_app(self):
        """Return the FastAPI application owned by this server."""
        return self._app

    def register_endpoints(self, router: APIRouter):
        """Mount every module endpoint on ``router`` as ``POST /method/<name>``.

        Args:
            router: Router that receives one POST route per endpoint.
        """
        for method_name, definition in self._module.get_endpoints().items():

            # Request body model: the endpoint arguments live under "params".
            class Body(BaseModel):
                params: definition.params_model  # type: ignore

            # Both handlers take the endpoint definition first; it is bound
            # with functools.partial below, leaving ``body`` as the only
            # remaining parameter.
            async def async_handler(end_def: EndpointDefinition[Any, ...], body: Body):
                kwargs = body.params.model_dump()  # type: ignore
                return await end_def.fn(self._module, **kwargs)  # type: ignore

            def handler(end_def: EndpointDefinition[Any, ...], body: Body):
                kwargs = body.params.model_dump()  # type: ignore
                return end_def.fn(self._module, **kwargs)  # type: ignore

            is_async = inspect.iscoroutinefunction(definition.fn)
            chosen = async_handler if is_async else handler
            defined_handler = partial(chosen, definition)
            router.post(f"/method/{method_name}")(defined_handler)

    def add_to_blacklist(self, ss58_address: Ss58Address):
        """Append ``ss58_address`` to the blacklist held by this server."""
        self._blacklist.append(ss58_address)

    def add_to_whitelist(self, ss58_address: Ss58Address):
        """Append ``ss58_address`` to the whitelist held by this server."""
        self._whitelist.append(ss58_address)
ModuleServer( module: communex.module.module.Module, key: substrateinterface.keypair.Keypair, max_request_staleness: int = 120, whitelist: list[communex.types.Ss58Address] | None = None, blacklist: list[communex.types.Ss58Address] | None = None, subnets_whitelist: list[int] | None = None, lower_ttl: int = 600, upper_ttl: int = 700, limiter: communex.module._rate_limiters.limiters.StakeLimiterParams | communex.module._rate_limiters.limiters.IpLimiterParams = StakeLimiterParams(epoch=800, cache_age=600, get_refill_per_epoch=None, token_ratio=1), ip_blacklist: list[str] | None = None, use_testnet: bool = False)
def __init__(
    self,
    module: Module,
    key: Keypair,
    max_request_staleness: int = 120,
    whitelist: list[Ss58Address] | None = None,
    blacklist: list[Ss58Address] | None = None,
    subnets_whitelist: list[int] | None = None,
    lower_ttl: int = 600,
    upper_ttl: int = 700,
    limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(),
    ip_blacklist: list[str] | None = None,
    use_testnet: bool = False,
) -> None:
    """Store the configuration and build the FastAPI app and its router.

    Args:
        module: Module whose endpoints are served.
        key: Keypair of the server.
        max_request_staleness: Stored on the instance as-is.
        whitelist: Address whitelist. A fresh list is used when ``None``.
        blacklist: Address blacklist. A fresh list is used when ``None``.
        subnets_whitelist: Stored on the instance as-is.
        lower_ttl: Inclusive lower bound of the randomly drawn cache TTL.
        upper_ttl: Inclusive upper bound of the randomly drawn cache TTL.
        limiter: Limiter parameters, passed on to ``_build_routers``.
        ip_blacklist: Stored on the instance as-is.
        use_testnet: Passed on to ``_build_routers``.
    """
    self.key = key
    self.max_request_staleness = max_request_staleness
    self._module = module
    self._subnets_whitelist = subnets_whitelist
    self._ip_blacklist = ip_blacklist
    self._app = fastapi.FastAPI()

    # The cache TTL is drawn once per server from [lower_ttl, upper_ttl].
    cache_ttl = random.randint(lower_ttl, upper_ttl)
    self._blockchain_cache = TTLDict[str, list[Ss58Address]](cache_ttl)

    # Keep the very list objects that were passed in (even empty ones),
    # so add_to_whitelist / add_to_blacklist append to the same lists.
    self._whitelist: list[Ss58Address] = (
        whitelist if whitelist is not None else []
    )
    self._blacklist: list[Ss58Address] = (
        blacklist if blacklist is not None else []
    )

    self._build_routers(use_testnet, limiter)
def
register_endpoints(self, router: fastapi.routing.APIRouter):
def register_endpoints(self, router: APIRouter):
    """Mount every module endpoint on ``router`` as ``POST /method/<name>``.

    Args:
        router: Router that receives one POST route per endpoint returned
            by ``self._module.get_endpoints()``.
    """
    for method_name, definition in self._module.get_endpoints().items():

        # Request body model: the endpoint arguments live under "params".
        class Body(BaseModel):
            params: definition.params_model  # type: ignore

        # Both handlers take the endpoint definition first; it is bound
        # with functools.partial below, leaving ``body`` as the only
        # remaining parameter.
        async def async_handler(end_def: EndpointDefinition[Any, ...], body: Body):
            kwargs = body.params.model_dump()  # type: ignore
            return await end_def.fn(self._module, **kwargs)  # type: ignore

        def handler(end_def: EndpointDefinition[Any, ...], body: Body):
            kwargs = body.params.model_dump()  # type: ignore
            return end_def.fn(self._module, **kwargs)  # type: ignore

        is_async = inspect.iscoroutinefunction(definition.fn)
        chosen = async_handler if is_async else handler
        defined_handler = partial(chosen, definition)
        router.post(f"/method/{method_name}")(defined_handler)
def
main():
def main():
    """Serve a small demo module on 127.0.0.1:8000 using the testing key."""

    class Amod(Module):
        @endpoint
        def do_the_thing(self, awesomness: int = 42):
            # Anything above 60 counts as "super awesome".
            msg = (
                f"You're super awesome: {awesomness} awesomness"
                if awesomness > 60
                else f"You're not that awesome: {awesomness} awesomness"
            )
            return {"msg": msg}

    demo_module = Amod()
    demo_key = Keypair.create_from_mnemonic(signer.TESTING_MNEMONIC)
    demo_server = ModuleServer(
        demo_module,
        demo_key,
        subnets_whitelist=[0],
        blacklist=None,
    )
    app = demo_server.get_fastapi_app()

    # Imported here, after the server is built, as in the original listing.
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)  # type: ignore