Edit on GitHub

communex.module._rate_limiters.limiters

 1from typing import Awaitable, Callable
 2
 3from fastapi import Request, Response
 4from fastapi.responses import JSONResponse
 5from keylimiter import TokenBucketLimiter
 6from pydantic_settings import BaseSettings
 7from starlette.middleware.base import BaseHTTPMiddleware
 8from starlette.types import ASGIApp
 9
10Callback = Callable[[Request], Awaitable[Response]]  # type of the ``call_next`` argument accepted by IpLimiterMiddleware.dispatch
11
12
class IpLimiterParams(BaseSettings):
    """Token-bucket settings for the per-IP rate limiter.

    Values can be overridden through environment variables that carry the
    ``CONFIG_IP_LIMITER_`` prefix (e.g. ``CONFIG_IP_LIMITER_BUCKET_SIZE``).
    """

    # Forwarded to ``TokenBucketLimiter`` as ``bucket_size``.
    bucket_size: int = 15
    # Forwarded to ``TokenBucketLimiter`` as ``refill_rate``.
    refill_rate: int = 1

    # pydantic-settings v2 only reads configuration from ``model_config``.
    # The former nested ``class config`` (lowercase) was never picked up, so
    # neither the env prefix nor ``extra="ignore"`` took effect.
    model_config = {
        "env_prefix": "CONFIG_IP_LIMITER_",
        "extra": "ignore",
    }
20
21
class StakeLimiterParams(BaseSettings):
    """Settings for the stake-based rate limiter.

    Values can be overridden through environment variables that carry the
    ``CONFIG_STAKE_LIMITER_`` prefix (e.g. ``CONFIG_STAKE_LIMITER_EPOCH``).

    NOTE(review): the consumer of these fields is not part of this module, so
    their units and exact meaning should be confirmed against the stake
    limiter implementation.
    """

    epoch: int = 800
    cache_age: int = 600
    # Optional callable taking an int and returning a float; None by default.
    get_refill_per_epoch: Callable[[int], float] | None = None
    token_ratio: int = 1

    # pydantic-settings v2 only reads configuration from ``model_config``.
    # The former nested ``class config`` (lowercase) was never picked up, so
    # neither the env prefix nor ``extra="ignore"`` took effect.
    model_config = {
        "env_prefix": "CONFIG_STAKE_LIMITER_",
        "extra": "ignore",
    }
31
32
class IpLimiterMiddleware(BaseHTTPMiddleware):
    """HTTP middleware that rate-limits requests per client IP address."""

    def __init__(
            self,
            app: ASGIApp,
            params: IpLimiterParams | None = None,
    ):
        """
        :param app: ASGI application to wrap
        :param params: token-bucket settings, or None

        If params is None, a default ``IpLimiterParams()`` is used, i.e.
        bucket_size=15, refill_rate=1 unless overridden through the environment.
        """
        super().__init__(app)

        # fallback to default settings
        if params is None:
            params = IpLimiterParams()
        self._limiter = TokenBucketLimiter(
            bucket_size=params.bucket_size,
            refill_rate=params.refill_rate,
        )

    async def dispatch(self, request: Request, call_next: Callback) -> Response:
        """
        Let the request through if the client's IP still has tokens.

        :param request: incoming request
        :param call_next: callable that forwards the request downstream
        :return: a 400 JSON response when the client address is unavailable,
            a 429 JSON response (with ``X-RateLimit-Remaining``) when the
            limiter rejects the IP, otherwise the downstream response
        """
        client = request.client
        # Explicit check instead of ``assert``: asserts vanish under ``-O`` and
        # an AssertionError here would only surface as an opaque 500.
        if client is None or not client.host:
            return JSONResponse(
                status_code=400,
                content={"error": "request is invalid"},
            )

        ip = client.host

        if not self._limiter.allow(ip):
            return JSONResponse(
                status_code=429,
                headers={"X-RateLimit-Remaining": str(self._limiter.remaining(ip))},
                content={"error": "Rate limit exceeded"},
            )

        return await call_next(request)
Callback = typing.Callable[[starlette.requests.Request], typing.Awaitable[starlette.responses.Response]]
class IpLimiterParams(pydantic_settings.main.BaseSettings):
class IpLimiterParams(BaseSettings):
    """Token-bucket settings for the per-IP rate limiter.

    Values can be overridden through environment variables that carry the
    ``CONFIG_IP_LIMITER_`` prefix (e.g. ``CONFIG_IP_LIMITER_BUCKET_SIZE``).
    """

    # Forwarded to ``TokenBucketLimiter`` as ``bucket_size``.
    bucket_size: int = 15
    # Forwarded to ``TokenBucketLimiter`` as ``refill_rate``.
    refill_rate: int = 1

    # pydantic-settings v2 only reads configuration from ``model_config``.
    # The former nested ``class config`` (lowercase) was never picked up, so
    # neither the env prefix nor ``extra="ignore"`` took effect.
    model_config = {
        "env_prefix": "CONFIG_IP_LIMITER_",
        "extra": "ignore",
    }

Base class for settings, allowing values to be overridden by environment variables.

This is useful in production for secrets you do not wish to save in code, it plays nicely with docker(-compose), Heroku and any 12 factor app design.

All the below attributes can be set via model_config.

Arguments:
  • _case_sensitive: Whether environment variables names should be read with case-sensitivity. Defaults to None.
  • _env_prefix: Prefix for all environment variables. Defaults to None.
  • _env_file: The env file(s) to load settings values from. Defaults to Path(''), which means that the value from model_config['env_file'] should be used. You can also pass None to indicate that environment variables should not be loaded from an env file.
  • _env_file_encoding: The env file encoding, e.g. 'latin-1'. Defaults to None.
  • _env_ignore_empty: Ignore environment variables where the value is an empty string. Default to False.
  • _env_nested_delimiter: The nested env values delimiter. Defaults to None.
  • _env_parse_none_str: The env string value that should be parsed (e.g. "null", "void", "None", etc.) into None type(None). Defaults to None type(None), which means no parsing should occur.
  • _env_parse_enums: Parse enum field names to values. Defaults to None, which means no parsing should occur.
  • _cli_prog_name: The CLI program name to display in help text. Defaults to None if _cli_parse_args is None. Otherwise, defaults to sys.argv[0].
  • _cli_parse_args: The list of CLI arguments to parse. Defaults to None. If set to True, defaults to sys.argv[1:].
  • _cli_settings_source: Override the default CLI settings source with a user defined instance. Defaults to None.
  • _cli_parse_none_str: The CLI string value that should be parsed (e.g. "null", "void", "None", etc.) into None type(None). Defaults to _env_parse_none_str value if set. Otherwise, defaults to "null" if _cli_avoid_json is False, and "None" if _cli_avoid_json is True.
  • _cli_hide_none_type: Hide None values in CLI help text. Defaults to False.
  • _cli_avoid_json: Avoid complex JSON objects in CLI help text. Defaults to False.
  • _cli_enforce_required: Enforce required fields at the CLI. Defaults to False.
  • _cli_use_class_docs_for_groups: Use class docstrings in CLI group help text instead of field descriptions. Defaults to False.
  • _cli_prefix: The root parser command line arguments prefix. Defaults to "".
  • _secrets_dir: The secret files directory. Defaults to None.
bucket_size: int
refill_rate: int
model_config: ClassVar[pydantic_settings.main.SettingsConfigDict] = {'extra': 'forbid', 'arbitrary_types_allowed': True, 'validate_default': True, 'case_sensitive': False, 'env_prefix': '', 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_parse_none_str': None, 'env_parse_enums': None, 'cli_prog_name': None, 'cli_parse_args': None, 'cli_settings_source': None, 'cli_parse_none_str': None, 'cli_hide_none_type': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_use_class_docs_for_groups': False, 'cli_prefix': '', 'json_file': None, 'json_file_encoding': None, 'yaml_file': None, 'yaml_file_encoding': None, 'toml_file': None, 'secrets_dir': None, 'protected_namespaces': ('model_', 'settings_')}
model_fields = {'bucket_size': FieldInfo(annotation=int, required=False, default=15), 'refill_rate': FieldInfo(annotation=int, required=False, default=1)}
model_computed_fields = {}
Inherited Members
pydantic_settings.main.BaseSettings
BaseSettings
settings_customise_sources
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
class IpLimiterParams.config:
18    class config:  # NOTE(review): lowercase ``config``; the model_config rendered for IpLimiterParams on this page still shows env_prefix '' and extra 'forbid', so these values appear not to be applied
19        env_prefix = "CONFIG_IP_LIMITER_"  # presumably the intended prefix for environment-variable overrides
20        extra = "ignore"  # presumably the intended handling of unknown inputs
env_prefix = 'CONFIG_IP_LIMITER_'
extra = 'ignore'
class StakeLimiterParams(pydantic_settings.main.BaseSettings):
class StakeLimiterParams(BaseSettings):
    """Settings for the stake-based rate limiter.

    Values can be overridden through environment variables that carry the
    ``CONFIG_STAKE_LIMITER_`` prefix (e.g. ``CONFIG_STAKE_LIMITER_EPOCH``).

    NOTE(review): the consumer of these fields is not part of this module, so
    their units and exact meaning should be confirmed against the stake
    limiter implementation.
    """

    epoch: int = 800
    cache_age: int = 600
    # Optional callable taking an int and returning a float; None by default.
    get_refill_per_epoch: Callable[[int], float] | None = None
    token_ratio: int = 1

    # pydantic-settings v2 only reads configuration from ``model_config``.
    # The former nested ``class config`` (lowercase) was never picked up, so
    # neither the env prefix nor ``extra="ignore"`` took effect.
    model_config = {
        "env_prefix": "CONFIG_STAKE_LIMITER_",
        "extra": "ignore",
    }

Base class for settings, allowing values to be overridden by environment variables.

This is useful in production for secrets you do not wish to save in code, it plays nicely with docker(-compose), Heroku and any 12 factor app design.

All the below attributes can be set via model_config.

Arguments:
  • _case_sensitive: Whether environment variables names should be read with case-sensitivity. Defaults to None.
  • _env_prefix: Prefix for all environment variables. Defaults to None.
  • _env_file: The env file(s) to load settings values from. Defaults to Path(''), which means that the value from model_config['env_file'] should be used. You can also pass None to indicate that environment variables should not be loaded from an env file.
  • _env_file_encoding: The env file encoding, e.g. 'latin-1'. Defaults to None.
  • _env_ignore_empty: Ignore environment variables where the value is an empty string. Default to False.
  • _env_nested_delimiter: The nested env values delimiter. Defaults to None.
  • _env_parse_none_str: The env string value that should be parsed (e.g. "null", "void", "None", etc.) into None type(None). Defaults to None type(None), which means no parsing should occur.
  • _env_parse_enums: Parse enum field names to values. Defaults to None, which means no parsing should occur.
  • _cli_prog_name: The CLI program name to display in help text. Defaults to None if _cli_parse_args is None. Otherwise, defaults to sys.argv[0].
  • _cli_parse_args: The list of CLI arguments to parse. Defaults to None. If set to True, defaults to sys.argv[1:].
  • _cli_settings_source: Override the default CLI settings source with a user defined instance. Defaults to None.
  • _cli_parse_none_str: The CLI string value that should be parsed (e.g. "null", "void", "None", etc.) into None type(None). Defaults to _env_parse_none_str value if set. Otherwise, defaults to "null" if _cli_avoid_json is False, and "None" if _cli_avoid_json is True.
  • _cli_hide_none_type: Hide None values in CLI help text. Defaults to False.
  • _cli_avoid_json: Avoid complex JSON objects in CLI help text. Defaults to False.
  • _cli_enforce_required: Enforce required fields at the CLI. Defaults to False.
  • _cli_use_class_docs_for_groups: Use class docstrings in CLI group help text instead of field descriptions. Defaults to False.
  • _cli_prefix: The root parser command line arguments prefix. Defaults to "".
  • _secrets_dir: The secret files directory. Defaults to None.
epoch: int
cache_age: int
get_refill_per_epoch: Optional[Callable[[int], float]]
token_ratio: int
model_config: ClassVar[pydantic_settings.main.SettingsConfigDict] = {'extra': 'forbid', 'arbitrary_types_allowed': True, 'validate_default': True, 'case_sensitive': False, 'env_prefix': '', 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_parse_none_str': None, 'env_parse_enums': None, 'cli_prog_name': None, 'cli_parse_args': None, 'cli_settings_source': None, 'cli_parse_none_str': None, 'cli_hide_none_type': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_use_class_docs_for_groups': False, 'cli_prefix': '', 'json_file': None, 'json_file_encoding': None, 'yaml_file': None, 'yaml_file_encoding': None, 'toml_file': None, 'secrets_dir': None, 'protected_namespaces': ('model_', 'settings_')}
model_fields = {'epoch': FieldInfo(annotation=int, required=False, default=800), 'cache_age': FieldInfo(annotation=int, required=False, default=600), 'get_refill_per_epoch': FieldInfo(annotation=Union[Callable[list, float], NoneType], required=False, default=None), 'token_ratio': FieldInfo(annotation=int, required=False, default=1)}
model_computed_fields = {}
Inherited Members
pydantic_settings.main.BaseSettings
BaseSettings
settings_customise_sources
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
class StakeLimiterParams.config:
29    class config:  # NOTE(review): lowercase ``config``; the model_config rendered for StakeLimiterParams on this page still shows env_prefix '' and extra 'forbid', so these values appear not to be applied
30        env_prefix = "CONFIG_STAKE_LIMITER_"  # presumably the intended prefix for environment-variable overrides
31        extra = "ignore"  # presumably the intended handling of unknown inputs
env_prefix = 'CONFIG_STAKE_LIMITER_'
extra = 'ignore'
class IpLimiterMiddleware(starlette.middleware.base.BaseHTTPMiddleware):
class IpLimiterMiddleware(BaseHTTPMiddleware):
    """HTTP middleware that rate-limits requests per client IP address."""

    def __init__(
            self,
            app: ASGIApp,
            params: IpLimiterParams | None = None,
    ):
        """
        :param app: ASGI application to wrap
        :param params: token-bucket settings, or None

        If params is None, a default ``IpLimiterParams()`` is used, i.e.
        bucket_size=15, refill_rate=1 unless overridden through the environment.
        """
        super().__init__(app)

        # fallback to default settings
        if params is None:
            params = IpLimiterParams()
        self._limiter = TokenBucketLimiter(
            bucket_size=params.bucket_size,
            refill_rate=params.refill_rate,
        )

    async def dispatch(self, request: Request, call_next: Callback) -> Response:
        """
        Let the request through if the client's IP still has tokens.

        :param request: incoming request
        :param call_next: callable that forwards the request downstream
        :return: a 400 JSON response when the client address is unavailable,
            a 429 JSON response (with ``X-RateLimit-Remaining``) when the
            limiter rejects the IP, otherwise the downstream response
        """
        client = request.client
        # Explicit check instead of ``assert``: asserts vanish under ``-O`` and
        # an AssertionError here would only surface as an opaque 500.
        if client is None or not client.host:
            return JSONResponse(
                status_code=400,
                content={"error": "request is invalid"},
            )

        ip = client.host

        if not self._limiter.allow(ip):
            return JSONResponse(
                status_code=429,
                headers={"X-RateLimit-Remaining": str(self._limiter.remaining(ip))},
                content={"error": "Rate limit exceeded"},
            )

        return await call_next(request)
IpLimiterMiddleware( app: Callable[[MutableMapping[str, Any], Callable[[], Awaitable[MutableMapping[str, Any]]], Callable[[MutableMapping[str, Any]], Awaitable[NoneType]]], Awaitable[NoneType]], params: IpLimiterParams | None)
35    def __init__(
36            self,
37            app: ASGIApp,
38            params: IpLimiterParams | None,
39    ):
40        '''
41        :param app: FastAPI instance
42        :param limiter: KeyLimiter instance OR None
43
44        If limiter is None, then a default TokenBucketLimiter is used with the following config:
45        bucket_size=200, refill_rate=15
46        '''
47        super().__init__(app)
48
49        # fallback to default limiter
50        if not params:
51            params = IpLimiterParams()
52        self._limiter = TokenBucketLimiter(bucket_size=params.bucket_size, refill_rate=params.refill_rate)
Parameters
  • app: ASGI application to wrap
  • params: IpLimiterParams instance OR None

If params is None, then a default IpLimiterParams() is used to build the TokenBucketLimiter, with the following config: bucket_size=15, refill_rate=1

async def dispatch( self, request: starlette.requests.Request, call_next: Callable[[starlette.requests.Request], Awaitable[starlette.responses.Response]]) -> starlette.responses.Response:
54    async def dispatch(self, request: Request, call_next: Callback) -> Response:
55        assert request.client is not None, "request is invalid"
56        assert request.client.host, "request is invalid."
57
58        ip = request.client.host
59
60        is_allowed = self._limiter.allow(ip)
61
62        if not is_allowed:
63            response = JSONResponse(
64                status_code=429,
65                headers={"X-RateLimit-Remaining": str(self._limiter.remaining(ip))},
66                content={"error": "Rate limit exceeded"}
67            )
68            return response
69
70        response = await call_next(request)
71
72        return response
Inherited Members
starlette.middleware.base.BaseHTTPMiddleware
app
dispatch_func