Edit on GitHub

communex.module.server

Server for Commune modules.

  1"""
  2Server for Commune modules.
  3"""
  4
  5import inspect
  6import random
  7from functools import partial
  8from typing import Any
  9
 10import fastapi
 11from fastapi import APIRouter
 12from pydantic import BaseModel
 13from substrateinterface import Keypair
 14
 15from communex.key import check_ss58_address
 16from communex.module import _signer as signer
 17from communex.module._rate_limiters.limiters import (
 18    IpLimiterParams,
 19    StakeLimiterParams,
 20)
 21from communex.module.module import EndpointDefinition, Module, endpoint
 22from communex.module.routers.module_routers import (
 23    InputHandlerVerifier,
 24    IpLimiterVerifier,
 25    ListVerifier,
 26    StakeLimiterVerifier,
 27    build_route_class,
 28)
 29from communex.types import Ss58Address
 30from communex.util.memo import TTLDict
 31
 32
 33class ModuleServer:
 34    def __init__(
 35        self,
 36        module: Module,
 37        key: Keypair,
 38        max_request_staleness: int = 120,
 39        whitelist: list[Ss58Address] | None = None,
 40        blacklist: list[Ss58Address] | None = None,
 41        subnets_whitelist: list[int] | None = None,
 42        lower_ttl: int = 600,
 43        upper_ttl: int = 700,
 44        limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(),
 45        ip_blacklist: list[str] | None = None,
 46        use_testnet: bool = False,
 47    ) -> None:
 48        self._module = module
 49        self._app = fastapi.FastAPI()
 50        self._subnets_whitelist = subnets_whitelist
 51        self.key = key
 52        self.max_request_staleness = max_request_staleness
 53        ttl = random.randint(lower_ttl, upper_ttl)
 54        self._blockchain_cache = TTLDict[str, list[Ss58Address]](ttl)
 55        self._ip_blacklist = ip_blacklist
 56
 57        # to keep reference to add_to_blacklist and add_to_whitelist
 58        whitelist_: list[Ss58Address] = [] if whitelist is None else whitelist
 59        blacklist_: list[Ss58Address] = [] if blacklist is None else blacklist
 60
 61        self._whitelist = whitelist_
 62        self._blacklist = blacklist_
 63
 64        self._build_routers(use_testnet, limiter)
 65
 66    def _build_routers(
 67        self, use_testnet: bool, limiter: StakeLimiterParams | IpLimiterParams
 68    ):
 69        input_handler = InputHandlerVerifier(
 70            self._subnets_whitelist,
 71            check_ss58_address(self.key.ss58_address),
 72            self.max_request_staleness,
 73            self._blockchain_cache,
 74            self.key,
 75            use_testnet,
 76        )
 77        check_lists = ListVerifier(
 78            self._blacklist, self._whitelist, self._ip_blacklist
 79        )
 80        if isinstance(limiter, StakeLimiterParams):
 81            limiter_verifier = StakeLimiterVerifier(
 82                self._subnets_whitelist, limiter
 83            )
 84        else:
 85            limiter_verifier = IpLimiterVerifier(limiter)
 86
 87        # order of verifiers is extremely important
 88        verifiers = [check_lists, input_handler, limiter_verifier]
 89        route_class = build_route_class(verifiers)
 90        self._router = APIRouter(route_class=route_class)
 91        self.register_endpoints(self._router)
 92        self._app.include_router(self._router)
 93
 94    def get_fastapi_app(self):
 95        return self._app
 96
 97    def register_endpoints(self, router: APIRouter):
 98        endpoints = self._module.get_endpoints()
 99        for name, endpoint_def in endpoints.items():
100
101            class Body(BaseModel):
102                params: endpoint_def.params_model  # type: ignore
103
104            async def async_handler(
105                end_def: EndpointDefinition[Any, ...], body: Body
106            ):
107                return await end_def.fn(
108                    self._module,
109                    **body.params.model_dump(),  # type: ignore
110                )
111
112            def handler(end_def: EndpointDefinition[Any, ...], body: Body):
113                return end_def.fn(self._module, **body.params.model_dump())  # type: ignore
114
115            if inspect.iscoroutinefunction(endpoint_def.fn):
116                defined_handler = partial(async_handler, endpoint_def)
117            else:
118                defined_handler = partial(handler, endpoint_def)
119            router.post(f"/method/{name}")(defined_handler)
120
121    def add_to_blacklist(self, ss58_address: Ss58Address):
122        self._blacklist.append(ss58_address)
123
124    def add_to_whitelist(self, ss58_address: Ss58Address):
125        self._whitelist.append(ss58_address)
126
127
def main():
    """Run a demo server on 127.0.0.1:8000 with a one-endpoint module.

    The server is keyed with the testing mnemonic from the signer module
    and only whitelists subnet 0.
    """

    # Minimal example module with a single endpoint.
    class Amod(Module):
        @endpoint
        def do_the_thing(self, awesomness: int = 42):
            msg = (
                f"You're super awesome: {awesomness} awesomness"
                if awesomness > 60
                else f"You're not that awesome: {awesomness} awesomness"
            )
            return {"msg": msg}

    demo_module = Amod()
    demo_key = Keypair.create_from_mnemonic(signer.TESTING_MNEMONIC)
    demo_server = ModuleServer(
        demo_module,
        demo_key,
        subnets_whitelist=[0],
        blacklist=None,
    )
    app = demo_server.get_fastapi_app()

    # Imported here so uvicorn is only needed when the demo is run.
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)  # type: ignore
152
153if __name__ == "__main__":
154    main()
class ModuleServer:
 34class ModuleServer:
 35    def __init__(
 36        self,
 37        module: Module,
 38        key: Keypair,
 39        max_request_staleness: int = 120,
 40        whitelist: list[Ss58Address] | None = None,
 41        blacklist: list[Ss58Address] | None = None,
 42        subnets_whitelist: list[int] | None = None,
 43        lower_ttl: int = 600,
 44        upper_ttl: int = 700,
 45        limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(),
 46        ip_blacklist: list[str] | None = None,
 47        use_testnet: bool = False,
 48    ) -> None:
 49        self._module = module
 50        self._app = fastapi.FastAPI()
 51        self._subnets_whitelist = subnets_whitelist
 52        self.key = key
 53        self.max_request_staleness = max_request_staleness
 54        ttl = random.randint(lower_ttl, upper_ttl)
 55        self._blockchain_cache = TTLDict[str, list[Ss58Address]](ttl)
 56        self._ip_blacklist = ip_blacklist
 57
 58        # to keep reference to add_to_blacklist and add_to_whitelist
 59        whitelist_: list[Ss58Address] = [] if whitelist is None else whitelist
 60        blacklist_: list[Ss58Address] = [] if blacklist is None else blacklist
 61
 62        self._whitelist = whitelist_
 63        self._blacklist = blacklist_
 64
 65        self._build_routers(use_testnet, limiter)
 66
 67    def _build_routers(
 68        self, use_testnet: bool, limiter: StakeLimiterParams | IpLimiterParams
 69    ):
 70        input_handler = InputHandlerVerifier(
 71            self._subnets_whitelist,
 72            check_ss58_address(self.key.ss58_address),
 73            self.max_request_staleness,
 74            self._blockchain_cache,
 75            self.key,
 76            use_testnet,
 77        )
 78        check_lists = ListVerifier(
 79            self._blacklist, self._whitelist, self._ip_blacklist
 80        )
 81        if isinstance(limiter, StakeLimiterParams):
 82            limiter_verifier = StakeLimiterVerifier(
 83                self._subnets_whitelist, limiter
 84            )
 85        else:
 86            limiter_verifier = IpLimiterVerifier(limiter)
 87
 88        # order of verifiers is extremely important
 89        verifiers = [check_lists, input_handler, limiter_verifier]
 90        route_class = build_route_class(verifiers)
 91        self._router = APIRouter(route_class=route_class)
 92        self.register_endpoints(self._router)
 93        self._app.include_router(self._router)
 94
 95    def get_fastapi_app(self):
 96        return self._app
 97
 98    def register_endpoints(self, router: APIRouter):
 99        endpoints = self._module.get_endpoints()
100        for name, endpoint_def in endpoints.items():
101
102            class Body(BaseModel):
103                params: endpoint_def.params_model  # type: ignore
104
105            async def async_handler(
106                end_def: EndpointDefinition[Any, ...], body: Body
107            ):
108                return await end_def.fn(
109                    self._module,
110                    **body.params.model_dump(),  # type: ignore
111                )
112
113            def handler(end_def: EndpointDefinition[Any, ...], body: Body):
114                return end_def.fn(self._module, **body.params.model_dump())  # type: ignore
115
116            if inspect.iscoroutinefunction(endpoint_def.fn):
117                defined_handler = partial(async_handler, endpoint_def)
118            else:
119                defined_handler = partial(handler, endpoint_def)
120            router.post(f"/method/{name}")(defined_handler)
121
122    def add_to_blacklist(self, ss58_address: Ss58Address):
123        self._blacklist.append(ss58_address)
124
125    def add_to_whitelist(self, ss58_address: Ss58Address):
126        self._whitelist.append(ss58_address)
ModuleServer( module: communex.module.module.Module, key: substrateinterface.keypair.Keypair, max_request_staleness: int = 120, whitelist: list[communex.types.Ss58Address] | None = None, blacklist: list[communex.types.Ss58Address] | None = None, subnets_whitelist: list[int] | None = None, lower_ttl: int = 600, upper_ttl: int = 700, limiter: communex.module._rate_limiters.limiters.StakeLimiterParams | communex.module._rate_limiters.limiters.IpLimiterParams = StakeLimiterParams(epoch=800, cache_age=600, get_refill_per_epoch=None, token_ratio=1), ip_blacklist: list[str] | None = None, use_testnet: bool = False)
35    def __init__(
36        self,
37        module: Module,
38        key: Keypair,
39        max_request_staleness: int = 120,
40        whitelist: list[Ss58Address] | None = None,
41        blacklist: list[Ss58Address] | None = None,
42        subnets_whitelist: list[int] | None = None,
43        lower_ttl: int = 600,
44        upper_ttl: int = 700,
45        limiter: StakeLimiterParams | IpLimiterParams = StakeLimiterParams(),
46        ip_blacklist: list[str] | None = None,
47        use_testnet: bool = False,
48    ) -> None:
49        self._module = module
50        self._app = fastapi.FastAPI()
51        self._subnets_whitelist = subnets_whitelist
52        self.key = key
53        self.max_request_staleness = max_request_staleness
54        ttl = random.randint(lower_ttl, upper_ttl)
55        self._blockchain_cache = TTLDict[str, list[Ss58Address]](ttl)
56        self._ip_blacklist = ip_blacklist
57
58        # to keep reference to add_to_blacklist and add_to_whitelist
59        whitelist_: list[Ss58Address] = [] if whitelist is None else whitelist
60        blacklist_: list[Ss58Address] = [] if blacklist is None else blacklist
61
62        self._whitelist = whitelist_
63        self._blacklist = blacklist_
64
65        self._build_routers(use_testnet, limiter)
key
max_request_staleness
def get_fastapi_app(self):
95    def get_fastapi_app(self):
96        return self._app
def register_endpoints(self, router: fastapi.routing.APIRouter):
 98    def register_endpoints(self, router: APIRouter):
 99        endpoints = self._module.get_endpoints()
100        for name, endpoint_def in endpoints.items():
101
102            class Body(BaseModel):
103                params: endpoint_def.params_model  # type: ignore
104
105            async def async_handler(
106                end_def: EndpointDefinition[Any, ...], body: Body
107            ):
108                return await end_def.fn(
109                    self._module,
110                    **body.params.model_dump(),  # type: ignore
111                )
112
113            def handler(end_def: EndpointDefinition[Any, ...], body: Body):
114                return end_def.fn(self._module, **body.params.model_dump())  # type: ignore
115
116            if inspect.iscoroutinefunction(endpoint_def.fn):
117                defined_handler = partial(async_handler, endpoint_def)
118            else:
119                defined_handler = partial(handler, endpoint_def)
120            router.post(f"/method/{name}")(defined_handler)
def add_to_blacklist(self, ss58_address: communex.types.Ss58Address):
122    def add_to_blacklist(self, ss58_address: Ss58Address):
123        self._blacklist.append(ss58_address)
def add_to_whitelist(self, ss58_address: communex.types.Ss58Address):
125    def add_to_whitelist(self, ss58_address: Ss58Address):
126        self._whitelist.append(ss58_address)
def main():
def main():
    """Run a demo server on 127.0.0.1:8000 with a one-endpoint module.

    The server is keyed with the testing mnemonic from the signer module
    and only whitelists subnet 0.
    """

    # Minimal example module with a single endpoint.
    class Amod(Module):
        @endpoint
        def do_the_thing(self, awesomness: int = 42):
            msg = (
                f"You're super awesome: {awesomness} awesomness"
                if awesomness > 60
                else f"You're not that awesome: {awesomness} awesomness"
            )
            return {"msg": msg}

    demo_module = Amod()
    demo_key = Keypair.create_from_mnemonic(signer.TESTING_MNEMONIC)
    demo_server = ModuleServer(
        demo_module,
        demo_key,
        subnets_whitelist=[0],
        blacklist=None,
    )
    app = demo_server.get_fastapi_app()

    # Imported here so uvicorn is only needed when the demo is run.
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)  # type: ignore