Edit on GitHub

communex.util.mutex

 1from threading import Lock
 2from types import TracebackType
 3from typing import ContextManager, Generic, TypeVar
 4
 5T = TypeVar("T")
 6
 7
 8class MutexBox(Generic[T], ContextManager[T]):
 9    _mutex: Lock
10    _value: T
11
12    def __init__(self, value: T):
13        self._mutex = Lock()
14        self._value = value
15
16    def __enter__(self) -> T:
17        self._mutex.acquire()
18        return self._value
19
20    def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None):
21        self._mutex.release()
class MutexBox(typing.Generic[~T], typing.ContextManager[~T]):
 9class MutexBox(Generic[T], ContextManager[T]):
10    _mutex: Lock
11    _value: T
12
13    def __init__(self, value: T):
14        self._mutex = Lock()
15        self._value = value
16
17    def __enter__(self) -> T:
18        self._mutex.acquire()
19        return self._value
20
21    def __exit__(self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None):
22        self._mutex.release()

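A context manager that guards a value with a `threading.Lock`: entering the `with` block acquires the lock and yields the wrapped value; leaving the block releases the lock. (Inherits from `typing.ContextManager`, an abstract base class for context managers.)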

MutexBox(value: ~T)
13    def __init__(self, value: T):
14        self._mutex = Lock()
15        self._value = value