communex.util.mutex
1from threading import Lock 2from types import TracebackType 3from typing import ContextManager, Generic, TypeVar 4 5T = TypeVar("T") 6 7 8class MutexBox(Generic[T], ContextManager[T]): 9 _mutex: Lock 10 _value: T 11 12 def __init__(self, value: T): 13 self._mutex = Lock() 14 self._value = value 15 16 def __enter__(self) -> T: 17 self._mutex.acquire() 18 return self._value 19 20 def __exit__( 21 self, 22 exc_type: type[BaseException] | None, 23 exc_value: BaseException | None, 24 traceback: TracebackType | None, 25 ): 26 self._mutex.release()
class
MutexBox(typing.Generic[~T], typing.ContextManager[~T]):
9class MutexBox(Generic[T], ContextManager[T]): 10 _mutex: Lock 11 _value: T 12 13 def __init__(self, value: T): 14 self._mutex = Lock() 15 self._value = value 16 17 def __enter__(self) -> T: 18 self._mutex.acquire() 19 return self._value 20 21 def __exit__( 22 self, 23 exc_type: type[BaseException] | None, 24 exc_value: BaseException | None, 25 traceback: TracebackType | None, 26 ): 27 self._mutex.release()
An abstract base class for context managers.