communex.module.server
Server for Commune modules.
1""" 2Server for Commune modules. 3""" 4 5import inspect 6import random 7from functools import partial 8from typing import Any 9 10import fastapi 11from fastapi import APIRouter 12from pydantic import BaseModel 13from substrateinterface import Keypair 14 15from communex.key import check_ss58_address 16from communex.module import _signer as signer 17from communex.module._rate_limiters.limiters import ( 18 IpLimiterParams, 19 StakeLimiterParams, 20) 21from communex.module.module import EndpointDefinition, Module, endpoint 22from communex.module.routers.module_routers import ( 23 InputHandlerVerifier, 24 IpLimiterVerifier, 25 ListVerifier, 26 StakeLimiterVerifier, 27 build_route_class, 28) 29from communex.types import Ss58Address 30from communex.util.memo import TTLDict 31 32 33class ModuleServer: 34 def __init__( 35 self, 36 module: Module, 37 key: Keypair, 38 max_request_staleness: int = 120, 39 whitelist: list[Ss58Address] | None = None, 40 blacklist: list[Ss58Address] | None = None, 41 subnets_whitelist: list[int] | None = None, 42 lower_ttl: int = 600, 43 upper_ttl: int = 700, 44 limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(), 45 ip_blacklist: list[str] | None = None, 46 use_testnet: bool = False, 47 ) -> None: 48 self._module = module 49 self._app = fastapi.FastAPI() 50 self._subnets_whitelist = subnets_whitelist 51 self.key = key 52 self.max_request_staleness = max_request_staleness 53 ttl = random.randint(lower_ttl, upper_ttl) 54 self._blockchain_cache = TTLDict[str, list[Ss58Address]](ttl) 55 self._ip_blacklist = ip_blacklist 56 57 # to keep reference to add_to_blacklist and add_to_whitelist 58 whitelist_: list[Ss58Address] = [] if whitelist is None else whitelist 59 blacklist_: list[Ss58Address] = [] if blacklist is None else blacklist 60 61 self._whitelist = whitelist_ 62 self._blacklist = blacklist_ 63 64 self._build_routers(use_testnet, limiter) 65 66 def _build_routers( 67 self, use_testnet: bool, limiter: StakeLimiterParams | IpLimiterParams 68 ): 69 input_handler = InputHandlerVerifier( 70 self._subnets_whitelist, 71 check_ss58_address(self.key.ss58_address), 72 self.max_request_staleness, 73 self._blockchain_cache, 74 self.key, 75 use_testnet, 76 ) 77 check_lists = ListVerifier( 78 self._blacklist, self._whitelist, self._ip_blacklist 79 ) 80 if isinstance(limiter, StakeLimiterParams): 81 limiter_verifier = StakeLimiterVerifier( 82 self._subnets_whitelist, limiter 83 ) 84 else: 85 limiter_verifier = IpLimiterVerifier(limiter) 86 87 # order of verifiers is extremely important 88 verifiers = [check_lists, input_handler, limiter_verifier] 89 route_class = build_route_class(verifiers) 90 self._router = APIRouter(route_class=route_class) 91 self.register_endpoints(self._router) 92 self._app.include_router(self._router) 93 94 def get_fastapi_app(self): 95 return self._app 96 97 def register_endpoints(self, router: APIRouter): 98 endpoints = self._module.get_endpoints() 99 for name, endpoint_def in endpoints.items(): 100 101 class Body(BaseModel): 102 params: endpoint_def.params_model # type: ignore 103 104 async def async_handler( 105 end_def: EndpointDefinition[Any, ...], body: Body 106 ): 107 return await end_def.fn( 108 self._module, 109 **body.params.model_dump(), # type: ignore 110 ) 111 112 def handler(end_def: EndpointDefinition[Any, ...], body: Body): 113 return end_def.fn(self._module, **body.params.model_dump()) # type: ignore 114 115 if inspect.iscoroutinefunction(endpoint_def.fn): 116 defined_handler = partial(async_handler, endpoint_def) 117 else: 118 defined_handler = partial(handler, endpoint_def) 119 router.post(f"/method/{name}")(defined_handler) 120 121 def add_to_blacklist(self, ss58_address: Ss58Address): 122 self._blacklist.append(ss58_address) 123 124 def add_to_whitelist(self, ss58_address: Ss58Address): 125 self._whitelist.append(ss58_address) 126 127 128def main(): 129 class Amod(Module): 130 @endpoint 131 def do_the_thing(self, awesomness: int = 42): 132 if awesomness > 60: 133 msg = f"You're super awesome: {awesomness} awesomness" 134 else: 135 msg = f"You're not that awesome: {awesomness} awesomness" 136 return {"msg": msg} 137 138 a_mod = Amod() 139 keypair = Keypair.create_from_mnemonic(signer.TESTING_MNEMONIC) 140 server = ModuleServer( 141 a_mod, 142 keypair, 143 subnets_whitelist=[0], 144 blacklist=None, 145 ) 146 app = server.get_fastapi_app() 147 148 import uvicorn 149 150 uvicorn.run(app, host="127.0.0.1", port=8000) # type: ignore 151 152 153if __name__ == "__main__": 154 main()
class
ModuleServer:
34class ModuleServer: 35 def __init__( 36 self, 37 module: Module, 38 key: Keypair, 39 max_request_staleness: int = 120, 40 whitelist: list[Ss58Address] | None = None, 41 blacklist: list[Ss58Address] | None = None, 42 subnets_whitelist: list[int] | None = None, 43 lower_ttl: int = 600, 44 upper_ttl: int = 700, 45 limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(), 46 ip_blacklist: list[str] | None = None, 47 use_testnet: bool = False, 48 ) -> None: 49 self._module = module 50 self._app = fastapi.FastAPI() 51 self._subnets_whitelist = subnets_whitelist 52 self.key = key 53 self.max_request_staleness = max_request_staleness 54 ttl = random.randint(lower_ttl, upper_ttl) 55 self._blockchain_cache = TTLDict[str, list[Ss58Address]](ttl) 56 self._ip_blacklist = ip_blacklist 57 58 # to keep reference to add_to_blacklist and add_to_whitelist 59 whitelist_: list[Ss58Address] = [] if whitelist is None else whitelist 60 blacklist_: list[Ss58Address] = [] if blacklist is None else blacklist 61 62 self._whitelist = whitelist_ 63 self._blacklist = blacklist_ 64 65 self._build_routers(use_testnet, limiter) 66 67 def _build_routers( 68 self, use_testnet: bool, limiter: StakeLimiterParams | IpLimiterParams 69 ): 70 input_handler = InputHandlerVerifier( 71 self._subnets_whitelist, 72 check_ss58_address(self.key.ss58_address), 73 self.max_request_staleness, 74 self._blockchain_cache, 75 self.key, 76 use_testnet, 77 ) 78 check_lists = ListVerifier( 79 self._blacklist, self._whitelist, self._ip_blacklist 80 ) 81 if isinstance(limiter, StakeLimiterParams): 82 limiter_verifier = StakeLimiterVerifier( 83 self._subnets_whitelist, limiter 84 ) 85 else: 86 limiter_verifier = IpLimiterVerifier(limiter) 87 88 # order of verifiers is extremely important 89 verifiers = [check_lists, input_handler, limiter_verifier] 90 route_class = build_route_class(verifiers) 91 self._router = APIRouter(route_class=route_class) 92 self.register_endpoints(self._router) 93 self._app.include_router(self._router) 94 95 def get_fastapi_app(self): 96 return self._app 97 98 def register_endpoints(self, router: APIRouter): 99 endpoints = self._module.get_endpoints() 100 for name, endpoint_def in endpoints.items(): 101 102 class Body(BaseModel): 103 params: endpoint_def.params_model # type: ignore 104 105 async def async_handler( 106 end_def: EndpointDefinition[Any, ...], body: Body 107 ): 108 return await end_def.fn( 109 self._module, 110 **body.params.model_dump(), # type: ignore 111 ) 112 113 def handler(end_def: EndpointDefinition[Any, ...], body: Body): 114 return end_def.fn(self._module, **body.params.model_dump()) # type: ignore 115 116 if inspect.iscoroutinefunction(endpoint_def.fn): 117 defined_handler = partial(async_handler, endpoint_def) 118 else: 119 defined_handler = partial(handler, endpoint_def) 120 router.post(f"/method/{name}")(defined_handler) 121 122 def add_to_blacklist(self, ss58_address: Ss58Address): 123 self._blacklist.append(ss58_address) 124 125 def add_to_whitelist(self, ss58_address: Ss58Address): 126 self._whitelist.append(ss58_address)
ModuleServer( module: communex.module.module.Module, key: substrateinterface.keypair.Keypair, max_request_staleness: int = 120, whitelist: list[communex.types.Ss58Address] | None = None, blacklist: list[communex.types.Ss58Address] | None = None, subnets_whitelist: list[int] | None = None, lower_ttl: int = 600, upper_ttl: int = 700, limiter: communex.module._rate_limiters.limiters.StakeLimiterParams | communex.module._rate_limiters.limiters.IpLimiterParams = StakeLimiterParams(epoch=800, cache_age=600, get_refill_per_epoch=None, token_ratio=1), ip_blacklist: list[str] | None = None, use_testnet: bool = False)
35 def __init__( 36 self, 37 module: Module, 38 key: Keypair, 39 max_request_staleness: int = 120, 40 whitelist: list[Ss58Address] | None = None, 41 blacklist: list[Ss58Address] | None = None, 42 subnets_whitelist: list[int] | None = None, 43 lower_ttl: int = 600, 44 upper_ttl: int = 700, 45 limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(), 46 ip_blacklist: list[str] | None = None, 47 use_testnet: bool = False, 48 ) -> None: 49 self._module = module 50 self._app = fastapi.FastAPI() 51 self._subnets_whitelist = subnets_whitelist 52 self.key = key 53 self.max_request_staleness = max_request_staleness 54 ttl = random.randint(lower_ttl, upper_ttl) 55 self._blockchain_cache = TTLDict[str, list[Ss58Address]](ttl) 56 self._ip_blacklist = ip_blacklist 57 58 # to keep reference to add_to_blacklist and add_to_whitelist 59 whitelist_: list[Ss58Address] = [] if whitelist is None else whitelist 60 blacklist_: list[Ss58Address] = [] if blacklist is None else blacklist 61 62 self._whitelist = whitelist_ 63 self._blacklist = blacklist_ 64 65 self._build_routers(use_testnet, limiter)
def
register_endpoints(self, router: fastapi.routing.APIRouter):
98 def register_endpoints(self, router: APIRouter): 99 endpoints = self._module.get_endpoints() 100 for name, endpoint_def in endpoints.items(): 101 102 class Body(BaseModel): 103 params: endpoint_def.params_model # type: ignore 104 105 async def async_handler( 106 end_def: EndpointDefinition[Any, ...], body: Body 107 ): 108 return await end_def.fn( 109 self._module, 110 **body.params.model_dump(), # type: ignore 111 ) 112 113 def handler(end_def: EndpointDefinition[Any, ...], body: Body): 114 return end_def.fn(self._module, **body.params.model_dump()) # type: ignore 115 116 if inspect.iscoroutinefunction(endpoint_def.fn): 117 defined_handler = partial(async_handler, endpoint_def) 118 else: 119 defined_handler = partial(handler, endpoint_def) 120 router.post(f"/method/{name}")(defined_handler)
def
main():
129def main(): 130 class Amod(Module): 131 @endpoint 132 def do_the_thing(self, awesomness: int = 42): 133 if awesomness > 60: 134 msg = f"You're super awesome: {awesomness} awesomness" 135 else: 136 msg = f"You're not that awesome: {awesomness} awesomness" 137 return {"msg": msg} 138 139 a_mod = Amod() 140 keypair = Keypair.create_from_mnemonic(signer.TESTING_MNEMONIC) 141 server = ModuleServer( 142 a_mod, 143 keypair, 144 subnets_whitelist=[0], 145 blacklist=None, 146 ) 147 app = server.get_fastapi_app() 148 149 import uvicorn 150 151 uvicorn.run(app, host="127.0.0.1", port=8000) # type: ignore