Edit on GitHub

communex.module.server

Server for Commune modules.

  1"""
  2Server for Commune modules.
  3"""
  4
  5import inspect
  6import random
  7from functools import partial
  8from typing import Any
  9
 10import fastapi
 11from fastapi import APIRouter
 12from pydantic import BaseModel
 13from substrateinterface import Keypair  # type: ignore
 14
 15from communex.key import check_ss58_address
 16from communex.module import _signer as signer
 17from communex.module._rate_limiters.limiters import (IpLimiterParams,
 18                                                     StakeLimiterParams)
 19from communex.module.module import EndpointDefinition, Module, endpoint
 20from communex.module.routers.module_routers import (InputHandlerVerifier,
 21                                                    IpLimiterVerifier,
 22                                                    ListVerifier,
 23                                                    StakeLimiterVerifier,
 24                                                    build_route_class)
 25from communex.types import Ss58Address
 26from communex.util.memo import TTLDict
 27
 28
 29class ModuleServer:
 30    def __init__(
 31        self,
 32        module: Module,
 33        key: Keypair,
 34        max_request_staleness: int = 120,
 35        whitelist: list[Ss58Address] | None = None,
 36        blacklist: list[Ss58Address] | None = None,
 37        subnets_whitelist: list[int] | None = None,
 38        lower_ttl: int = 600,
 39        upper_ttl: int = 700,
 40        limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(),
 41        ip_blacklist: list[str] | None = None,
 42        use_testnet: bool = False,
 43    ) -> None:
 44        self._module = module
 45        self._app = fastapi.FastAPI()
 46        self._subnets_whitelist = subnets_whitelist
 47        self.key = key
 48        self.max_request_staleness = max_request_staleness
 49        ttl = random.randint(lower_ttl, upper_ttl)
 50        self._blockchain_cache = TTLDict[str, list[Ss58Address]](ttl)
 51        self._ip_blacklist = ip_blacklist
 52
 53        # to keep reference to add_to_blacklist and add_to_whitelist
 54        whitelist_: list[Ss58Address] = [] if whitelist is None else whitelist
 55        blacklist_: list[Ss58Address] = [] if blacklist is None else blacklist
 56
 57        self._whitelist = whitelist_
 58        self._blacklist = blacklist_
 59
 60        self._build_routers(use_testnet, limiter)
 61
 62    def _build_routers(
 63            self, use_testnet: bool,
 64            limiter: StakeLimiterParams | IpLimiterParams
 65    ):
 66        input_handler = InputHandlerVerifier(
 67            self._subnets_whitelist,
 68            check_ss58_address(self.key.ss58_address),
 69            self.max_request_staleness,
 70            self._blockchain_cache,
 71            self.key,
 72            use_testnet
 73        )
 74        check_lists = ListVerifier(
 75            self._blacklist,
 76            self._whitelist,
 77            self._ip_blacklist
 78        )
 79        if isinstance(limiter, StakeLimiterParams):
 80            limiter_verifier = StakeLimiterVerifier(
 81                self._subnets_whitelist, limiter
 82            )
 83        else:
 84            limiter_verifier = IpLimiterVerifier(limiter)
 85
 86        # order of verifiers is extremely important
 87        verifiers = [check_lists, input_handler, limiter_verifier]
 88        route_class = build_route_class(verifiers)
 89        self._router = APIRouter(route_class=route_class)
 90        self.register_endpoints(self._router)
 91        self._app.include_router(self._router)
 92
 93    def get_fastapi_app(self):
 94        return self._app
 95
 96    def register_endpoints(self, router: APIRouter):
 97        endpoints = self._module.get_endpoints()
 98        for name, endpoint_def in endpoints.items():
 99
100            class Body(BaseModel):
101                params: endpoint_def.params_model  # type: ignore
102
103            async def async_handler(end_def: EndpointDefinition[Any, ...], body: Body):
104                return await end_def.fn(self._module, **body.params.model_dump())  # type: ignore
105
106            def handler(end_def: EndpointDefinition[Any, ...], body: Body):
107                return end_def.fn(self._module, **body.params.model_dump())  # type: ignore
108
109            if inspect.iscoroutinefunction(endpoint_def.fn):
110                defined_handler = partial(async_handler, endpoint_def)
111            else:
112                defined_handler = partial(handler, endpoint_def)
113            router.post(f"/method/{name}")(defined_handler)
114
115    def add_to_blacklist(self, ss58_address: Ss58Address):
116        self._blacklist.append(ss58_address)
117
118    def add_to_whitelist(self, ss58_address: Ss58Address):
119        self._whitelist.append(ss58_address)
120
121
122def main():
123    class Amod(Module):
124        @endpoint
125        def do_the_thing(self, awesomness: int = 42):
126            if awesomness > 60:
127                msg = f"You're super awesome: {awesomness} awesomness"
128            else:
129                msg = f"You're not that awesome: {awesomness} awesomness"
130            return {"msg": msg}
131
132    a_mod = Amod()
133    keypair = Keypair.create_from_mnemonic(signer.TESTING_MNEMONIC)
134    server = ModuleServer(
135        a_mod,
136        keypair,
137        subnets_whitelist=[0],
138        blacklist=None,
139    )
140    app = server.get_fastapi_app()
141
142    import uvicorn
143    uvicorn.run(app, host="127.0.0.1", port=8000)  # type: ignore
144
145
146if __name__ == "__main__":
147    main()
class ModuleServer:
 30class ModuleServer:
 31    def __init__(
 32        self,
 33        module: Module,
 34        key: Keypair,
 35        max_request_staleness: int = 120,
 36        whitelist: list[Ss58Address] | None = None,
 37        blacklist: list[Ss58Address] | None = None,
 38        subnets_whitelist: list[int] | None = None,
 39        lower_ttl: int = 600,
 40        upper_ttl: int = 700,
 41        limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(),
 42        ip_blacklist: list[str] | None = None,
 43        use_testnet: bool = False,
 44    ) -> None:
 45        self._module = module
 46        self._app = fastapi.FastAPI()
 47        self._subnets_whitelist = subnets_whitelist
 48        self.key = key
 49        self.max_request_staleness = max_request_staleness
 50        ttl = random.randint(lower_ttl, upper_ttl)
 51        self._blockchain_cache = TTLDict[str, list[Ss58Address]](ttl)
 52        self._ip_blacklist = ip_blacklist
 53
 54        # to keep reference to add_to_blacklist and add_to_whitelist
 55        whitelist_: list[Ss58Address] = [] if whitelist is None else whitelist
 56        blacklist_: list[Ss58Address] = [] if blacklist is None else blacklist
 57
 58        self._whitelist = whitelist_
 59        self._blacklist = blacklist_
 60
 61        self._build_routers(use_testnet, limiter)
 62
 63    def _build_routers(
 64            self, use_testnet: bool,
 65            limiter: StakeLimiterParams | IpLimiterParams
 66    ):
 67        input_handler = InputHandlerVerifier(
 68            self._subnets_whitelist,
 69            check_ss58_address(self.key.ss58_address),
 70            self.max_request_staleness,
 71            self._blockchain_cache,
 72            self.key,
 73            use_testnet
 74        )
 75        check_lists = ListVerifier(
 76            self._blacklist,
 77            self._whitelist,
 78            self._ip_blacklist
 79        )
 80        if isinstance(limiter, StakeLimiterParams):
 81            limiter_verifier = StakeLimiterVerifier(
 82                self._subnets_whitelist, limiter
 83            )
 84        else:
 85            limiter_verifier = IpLimiterVerifier(limiter)
 86
 87        # order of verifiers is extremely important
 88        verifiers = [check_lists, input_handler, limiter_verifier]
 89        route_class = build_route_class(verifiers)
 90        self._router = APIRouter(route_class=route_class)
 91        self.register_endpoints(self._router)
 92        self._app.include_router(self._router)
 93
 94    def get_fastapi_app(self):
 95        return self._app
 96
 97    def register_endpoints(self, router: APIRouter):
 98        endpoints = self._module.get_endpoints()
 99        for name, endpoint_def in endpoints.items():
100
101            class Body(BaseModel):
102                params: endpoint_def.params_model  # type: ignore
103
104            async def async_handler(end_def: EndpointDefinition[Any, ...], body: Body):
105                return await end_def.fn(self._module, **body.params.model_dump())  # type: ignore
106
107            def handler(end_def: EndpointDefinition[Any, ...], body: Body):
108                return end_def.fn(self._module, **body.params.model_dump())  # type: ignore
109
110            if inspect.iscoroutinefunction(endpoint_def.fn):
111                defined_handler = partial(async_handler, endpoint_def)
112            else:
113                defined_handler = partial(handler, endpoint_def)
114            router.post(f"/method/{name}")(defined_handler)
115
116    def add_to_blacklist(self, ss58_address: Ss58Address):
117        self._blacklist.append(ss58_address)
118
119    def add_to_whitelist(self, ss58_address: Ss58Address):
120        self._whitelist.append(ss58_address)
ModuleServer( module: communex.module.module.Module, key: substrateinterface.keypair.Keypair, max_request_staleness: int = 120, whitelist: list[communex.types.Ss58Address] | None = None, blacklist: list[communex.types.Ss58Address] | None = None, subnets_whitelist: list[int] | None = None, lower_ttl: int = 600, upper_ttl: int = 700, limiter: communex.module._rate_limiters.limiters.StakeLimiterParams | communex.module._rate_limiters.limiters.IpLimiterParams = StakeLimiterParams(epoch=800, cache_age=600, get_refill_per_epoch=None, token_ratio=1), ip_blacklist: list[str] | None = None, use_testnet: bool = False)
31    def __init__(
32        self,
33        module: Module,
34        key: Keypair,
35        max_request_staleness: int = 120,
36        whitelist: list[Ss58Address] | None = None,
37        blacklist: list[Ss58Address] | None = None,
38        subnets_whitelist: list[int] | None = None,
39        lower_ttl: int = 600,
40        upper_ttl: int = 700,
41        limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(),
42        ip_blacklist: list[str] | None = None,
43        use_testnet: bool = False,
44    ) -> None:
45        self._module = module
46        self._app = fastapi.FastAPI()
47        self._subnets_whitelist = subnets_whitelist
48        self.key = key
49        self.max_request_staleness = max_request_staleness
50        ttl = random.randint(lower_ttl, upper_ttl)
51        self._blockchain_cache = TTLDict[str, list[Ss58Address]](ttl)
52        self._ip_blacklist = ip_blacklist
53
54        # to keep reference to add_to_blacklist and add_to_whitelist
55        whitelist_: list[Ss58Address] = [] if whitelist is None else whitelist
56        blacklist_: list[Ss58Address] = [] if blacklist is None else blacklist
57
58        self._whitelist = whitelist_
59        self._blacklist = blacklist_
60
61        self._build_routers(use_testnet, limiter)
key
max_request_staleness
def get_fastapi_app(self):
94    def get_fastapi_app(self):
95        return self._app
def register_endpoints(self, router: fastapi.routing.APIRouter):
 97    def register_endpoints(self, router: APIRouter):
 98        endpoints = self._module.get_endpoints()
 99        for name, endpoint_def in endpoints.items():
100
101            class Body(BaseModel):
102                params: endpoint_def.params_model  # type: ignore
103
104            async def async_handler(end_def: EndpointDefinition[Any, ...], body: Body):
105                return await end_def.fn(self._module, **body.params.model_dump())  # type: ignore
106
107            def handler(end_def: EndpointDefinition[Any, ...], body: Body):
108                return end_def.fn(self._module, **body.params.model_dump())  # type: ignore
109
110            if inspect.iscoroutinefunction(endpoint_def.fn):
111                defined_handler = partial(async_handler, endpoint_def)
112            else:
113                defined_handler = partial(handler, endpoint_def)
114            router.post(f"/method/{name}")(defined_handler)
def add_to_blacklist(self, ss58_address: communex.types.Ss58Address):
116    def add_to_blacklist(self, ss58_address: Ss58Address):
117        self._blacklist.append(ss58_address)
def add_to_whitelist(self, ss58_address: communex.types.Ss58Address):
119    def add_to_whitelist(self, ss58_address: Ss58Address):
120        self._whitelist.append(ss58_address)
def main():
123def main():
124    class Amod(Module):
125        @endpoint
126        def do_the_thing(self, awesomness: int = 42):
127            if awesomness > 60:
128                msg = f"You're super awesome: {awesomness} awesomness"
129            else:
130                msg = f"You're not that awesome: {awesomness} awesomness"
131            return {"msg": msg}
132
133    a_mod = Amod()
134    keypair = Keypair.create_from_mnemonic(signer.TESTING_MNEMONIC)
135    server = ModuleServer(
136        a_mod,
137        keypair,
138        subnets_whitelist=[0],
139        blacklist=None,
140    )
141    app = server.get_fastapi_app()
142
143    import uvicorn
144    uvicorn.run(app, host="127.0.0.1", port=8000)  # type: ignore