API reference

Queues

Types

class culsans.Queue

Bases: MixedQueue[_T]

A mixed sync-async queue that is:

  • MPMC

  • FIFO

Compliant with the Janus API version 2.0.0.

__init__(maxsize: int = 0, *, sizer: Callable[[_T], int] | DefaultType = DEFAULT) None

Create a queue object with the given maximum size.

Parameters:

sizer – A function used to calculate the size of each item in the queue.

close() None

This method is provided for compatibility with the Janus queues. Use queue.shutdown(immediate=True) as a direct substitute.

async wait_closed() None

This method is provided for compatibility with the Janus queues. It actually does nothing.

Raises:

RuntimeError – if called for non-closed queue.

async aclose() None

This method is provided for compatibility with the Janus queues. Use queue.shutdown(immediate=True) as a direct substitute.

property putting: int

The current number of threads/tasks waiting to put.

It represents the length of the wait queue and thus changes immediately.

property getting: int

The current number of threads/tasks waiting to get/peek.

It represents the length of the wait queue and thus changes immediately.

property waiting: int

The current number of threads/tasks waiting to access.

It is roughly equivalent to the sum of the putting and getting properties, but is more reliable than the sum in a multithreaded environment.

class culsans.LifoQueue

Bases: Queue[_T]

A variant of Queue that retrieves most recently added entries first (LIFO).

class culsans.PriorityQueue

Bases: Queue[_RichComparableT]

A variant of Queue that retrieves entries in priority order (lowest first).

Proxies

class culsans.BaseQueueProxy

Bases: BaseQueue[_T]

A proxy that implements the BaseQueue protocol by wrapping a mixed queue.

class culsans.SyncQueueProxy

Bases: BaseQueueProxy[_T], SyncQueue[_T]

A proxy that implements the SyncQueue protocol by wrapping a mixed queue.

class culsans.GreenQueueProxy

Bases: BaseQueueProxy[_T], GreenQueue[_T]

A proxy that implements the GreenQueue protocol by wrapping a mixed queue.

class culsans.AsyncQueueProxy

Bases: BaseQueueProxy[_T], AsyncQueue[_T]

A proxy that implements the AsyncQueue protocol by wrapping a mixed queue.

Protocols

class culsans.BaseQueue

Bases: Protocol[_T]

A base queue protocol that includes all except blocking methods.

Can be useful when you need to make sure that none of the methods will block (except for the underlying lock).

__bool__() bool

Return True if the queue is not empty, False otherwise.

Note, bool(queue) does not guarantee that subsequent get/peek calls will not block (such an approach risks a race condition where the queue can shrink before the result can be used).

To create code that needs to wait for all queued tasks to be completed, the preferred technique is to use the join methods.

__len__() int

Return the number of items in the queue.

Note, len(queue) > 0 does not guarantee that subsequent get/peek calls will not block (such an approach risks a race condition where the queue can shrink before the result can be used).

To create code that needs to wait for all queued tasks to be completed, the preferred technique is to use the join methods.

peekable() bool

Return True if the queue is peekable, False otherwise.

clearable() bool

Return True if the queue is clearable, False otherwise.

isize(item: _T) int

Return the size of the item.

This method uses the sizer passed during initialization. If it was not passed (and the corresponding protected attribute/method was not defined), it returns 1 for any object.

qsize() int

This method is provided for compatibility with the standard queues. Use len(queue) as a direct substitute.

empty() bool

This method is provided for compatibility with the standard queues. Use not queue as a direct substitute.

full() bool

Return True if the queue is full, False otherwise.

Note, not queue.full() does not guarantee that subsequent put calls will not block (such an approach risks a race condition where the queue can grow before the result can be used).

put_nowait(item: _T) None

Put item into the queue without blocking.

Only put (enqueue) the item if a free slot is immediately available.

Raises:
get_nowait() _T

Remove and return an item from the queue without blocking.

Only get (dequeue) an item if one is immediately available.

Raises:
  • QueueEmpty – if the queue is empty.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

peek_nowait() _T

Return an item from the queue without blocking.

Only peek (front) an item if one is immediately available.

Raises:
task_done() None

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get call used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join call is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put into the queue).

Raises:

ValueError – if called more times than there were items placed in the queue.

shutdown(immediate: bool = False) None

Put the queue into a shutdown mode.

The queue can no longer grow. Future calls to the put methods raise QueueShutDown. Currently blocked callers of the put methods will be unblocked and will raise QueueShutDown in the formerly blocked thread/task.

Once the queue is empty, the get/peek methods will also raise QueueShutDown.

Parameters:

immediate – If set to True, the queue is drained to be completely empty, and task_done() is called for each item removed from the queue (but joiners are unblocked regardless of the number of unfinished tasks).

clear() None

Clear all items from the queue atomically.

Also calls task_done() for each removed item.

Raises:

UnsupportedOperation – if the queue is not clearable.

property unfinished_tasks: int

The current number of tasks remaining to be processed.

See the task_done() method.

property is_shutdown: bool

A boolean that is True if the queue has been shut down, False otherwise.

See the shutdown() method.

property closed: bool

This property is provided for compatibility with the Janus queues. Use is_shutdown as a direct substitute.

property maxsize: int

The maximum cumulative size of items which the queue can hold.

if maxsize is <= 0, the size is infinite. If it is an integer greater that 0, then the put methods block when the queue reaches maxsize until an item is removed by the get methods.

It can be changed dynamically by setting the attribute.

property size: int

The current cumulative size of items in the queue.

class culsans.MixedQueue

Bases: BaseQueue[_T], Protocol[_T]

A mixed queue protocol that includes both types of blocking methods via prefixes.

Provides specialized proxies via properties.

sync_put(item: _T, block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) None

This method is provided for consistency with the Janus queues. Use green_put() as a direct substitute.

green_put(item: _T, block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) None

Put item into the queue.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until a free slot is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueFull exception if no free slot was available within that time.

Raises:
  • QueueFull – if the queue is full and the timeout has expired.

  • QueueShutDown – if the queue has been shut down.

async async_put(item: _T) None

Put item into the queue.

If the queue is full, wait until a free slot is available.

Raises:

QueueShutDown – if the queue has been shut down.

sync_get(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

This method is provided for consistency with the Janus queues. Use green_get() as a direct substitute.

green_get(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

Remove and return an item from the queue.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until an item is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueEmpty exception if no item was available within that time.

Raises:
  • QueueEmpty – if the queue is empty and the timeout has expired.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

async async_get() _T

Remove and return an item from the queue.

If the queue is empty, wait until an item is available.

Raises:

QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

sync_peek(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

This method is provided for consistency with the Janus queues. Use green_peek() as a direct substitute.

green_peek(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

Return an item from the queue without removing it.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until an item is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueEmpty exception if no item was available within that time.

Raises:
  • QueueEmpty – if the queue is empty and the timeout has expired.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

  • UnsupportedOperation – if the queue is not peekable.

async async_peek() _T

Return an item from the queue without removing it.

If the queue is empty, wait until an item is available.

Raises:
  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

  • UnsupportedOperation – if the queue is not peekable.

sync_join() None

This method is provided for consistency with the Janus queues. Use green_join() as a direct substitute.

green_join() None

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, the caller unblocks.

async async_join() None

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, the caller unblocks.

property green_proxy: GreenQueue[_T]

An interface compatible with the standard queues from the queue module.

property async_proxy: AsyncQueue[_T]

An interface compatible with the standard queues from the asyncio module.

property sync_q: SyncQueue[_T]

This property is provided for compatibility with the Janus queues. Use green_proxy instead.

property async_q: AsyncQueue[_T]

This property is provided for compatibility with the Janus queues. Use async_proxy as a direct substitute.

class culsans.SyncQueue

Bases: BaseQueue[_T], Protocol[_T]

This protocol is provided for compatibility with the Janus queues. Use GreenQueue instead.

class culsans.GreenQueue

Bases: BaseQueue[_T], Protocol[_T]

A queue protocol that covers the standard queues’ interface from the queue module.

Compliant with the Python API version 3.13.

put(item: _T, block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) None

Put item into the queue.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until a free slot is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueFull exception if no free slot was available within that time.

Raises:
  • QueueFull – if the queue is full and the timeout has expired.

  • QueueShutDown – if the queue has been shut down.

get(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

Remove and return an item from the queue.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until an item is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueEmpty exception if no item was available within that time.

Raises:
  • QueueEmpty – if the queue is empty and the timeout has expired.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

peek(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

Return an item from the queue without removing it.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until an item is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueEmpty exception if no item was available within that time.

Raises:
  • QueueEmpty – if the queue is empty and the timeout has expired.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

  • UnsupportedOperation – if the queue is not peekable.

join() None

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, the caller unblocks.

class culsans.AsyncQueue

Bases: BaseQueue[_T], Protocol[_T]

A queue protocol that covers the standard queues’ interface from the asyncio module.

Compliant with the Python API version 3.13.

async put(item: _T) None

Put item into the queue.

If the queue is full, wait until a free slot is available.

Raises:

QueueShutDown – if the queue has been shut down.

async get() _T

Remove and return an item from the queue.

If the queue is empty, wait until an item is available.

Raises:

QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

async peek() _T

Return an item from the queue without removing it.

If the queue is empty, wait until an item is available.

Raises:
  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

  • UnsupportedOperation – if the queue is not peekable.

async join() None

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, the caller unblocks.

Exceptions

exception culsans.QueueEmpty

Bases: SyncQueueEmpty, AsyncQueueEmpty

Exception raised when non-blocking get/peek is called on a Queue object which is empty.

exception culsans.SyncQueueEmpty

Bases: Exception

The same as queue.Empty.

exception culsans.AsyncQueueEmpty

Bases: Exception

The same as asyncio.QueueEmpty.

exception culsans.QueueFull

Bases: SyncQueueFull, AsyncQueueFull

Exception raised when non-blocking put is called on a Queue object which is full.

exception culsans.SyncQueueFull

Bases: Exception

The same as queue.Full.

exception culsans.AsyncQueueFull

Bases: Exception

The same as asyncio.QueueFull.

exception culsans.QueueShutDown

Bases: SyncQueueShutDown, AsyncQueueShutDown

Exception raised when put/get/peek is called on a Queue object which has been shut down.

exception culsans.SyncQueueShutDown

Bases: Exception

The same as queue.ShutDown.

exception culsans.AsyncQueueShutDown

Bases: Exception

The same as asyncio.QueueShutDown.

exception culsans.UnsupportedOperation

Bases: ValueError

Exception raised when peek/clear is called on a Queue object which is not peekable/clearable.

Groupers

Types

class culsans.Grouper

Bases: object

A reentrant higher-level primitive that synchronizes tasks by groups.

Can be used via the async with/with statements.

__init__(wrapped_or_predicate: Grouper | Callable[[Grouper, Hashable, tuple[str, int] | None], bool] | DefaultType = DEFAULT, /, *, default_group: Hashable | MissingType = MISSING, default_group_factory: Callable[[], Hashable] | MissingType = MISSING, default_group_mode: Literal['exclusive', 'shared'] = 'shared') None

Create a grouper object with the given parameters.

Parameters:
  • wrapped_or_predicate – If another grouper object is passed, it will create a proxy/wrapper that uses the same underlying structures but with different default parameters (see below). Otherwise, the passed object is treated as a predicate (see the predicate property).

  • default_group – See the default_group property.

  • default_group_factory – See the default_group_factory property.

  • default_group_mode – If "exclusive" is passed, only one task from the group will be woken up on release. Otherwise, if "shared" is passed, all tasks from the group will be woken up. It must be consistent with the predicate.

Example

>>> import threading
>>> thread_rlock = Grouper(default_group_factory=threading.get_ident)
>>> async with thread_rlock:
...     ...  # exclusive for threads; shared for the thread's tasks
__bool__() bool

Return True if the grouper is used by any task, False otherwise.

Used by the standard truth testing procedure.

async async_acquire(count: int = 1, *, blocking: bool = True) bool

Acquire the grouper by the current async task on behalf of the default group.

If no default group was passed to the constructor, the task identifier is used instead.

Parameters:
  • count – The recursion level delta (equivalent to the same number of calls).

  • blocking – Unless set to False, the method will block if necessary until it succeeds in acquiring the grouper. Otherwise, it will return False if it fails to do so immediately.

green_acquire(count: int = 1, *, blocking: bool = True, timeout: float | None = None) bool

Acquire the grouper by the current green task on behalf of the default group.

If no default group was passed to the constructor, the task identifier is used instead.

Parameters:
  • count – The recursion level delta (equivalent to the same number of calls).

  • blocking – Unless set to False, the method will block if necessary until it succeeds in acquiring the grouper. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and return False if it fails to acquire the grouper within that time.

async_release(count: int = 1) None

Release the grouper by the current async task on behalf of the default group.

If no default group was passed to the constructor, the task identifier is used instead.

Parameters:

count – The recursion level delta (equivalent to the same number of calls).

green_release(count: int = 1) None

Release the grouper by the current green task on behalf of the default group.

If no default group was passed to the constructor, the task identifier is used instead.

Parameters:

count – The recursion level delta (equivalent to the same number of calls).

async async_acquire_on_behalf_of(group: Hashable, count: int = 1, *, blocking: bool = True) bool

Acquire the grouper by the current async task on behalf of the group.

Parameters:
  • count – The recursion level delta (equivalent to the same number of calls).

  • blocking – Unless set to False, the method will block if necessary until it succeeds in acquiring the grouper. Otherwise, it will return False if it fails to do so immediately.

green_acquire_on_behalf_of(group: Hashable, count: int = 1, *, blocking: bool = True, timeout: float | None = None) bool

Acquire the grouper by the current green task on behalf of the group.

Parameters:
  • count – The recursion level delta (equivalent to the same number of calls).

  • blocking – Unless set to False, the method will block if necessary until it succeeds in acquiring the grouper. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and return False if it fails to acquire the grouper within that time.

async_release_on_behalf_of(group: Hashable, count: int = 1) None

Release the grouper by the current async task on behalf of the group.

Parameters:

count – The recursion level delta (equivalent to the same number of calls).

green_release_on_behalf_of(group: Hashable, count: int = 1) None

Release the grouper by the current green task on behalf of the group.

Parameters:

count – The recursion level delta (equivalent to the same number of calls).

async_owned(group: Hashable | MissingType = MISSING) bool

Return True if the current async task owns the grouper, False otherwise.

Parameters:

group – If specified, then only on behalf of the group.

green_owned(group: Hashable | MissingType = MISSING) bool

Return True if the current green task owns the grouper, False otherwise.

Parameters:

group – If specified, then only on behalf of the group.

async_count(group: Hashable | MissingType = MISSING) int

Return the recursion level of the current async task.

Parameters:

group – If specified, then only for the group. Otherwise, the sum across all groups to which the task belongs.

green_count(group: Hashable | MissingType = MISSING) int

Return the recursion level of the current green task.

Parameters:

group – If specified, then only for the group. Otherwise, the sum across all groups to which the task belongs.

locked() bool

Return True if the grouper is used by any task, False otherwise.

property predicate: Callable[[Grouper, Hashable, tuple[str, int] | None], bool]

A callable object that determines whether a given group (or a task in a given group) can acquire the grouper at a specific point in time.

On acquire, it is called for the current task if it is not yet a member of the desired group (otherwise, access is reentrant and the call does not occur). It is called even when the current task is a member of some other group (but not the desired one). The grouper, the group, and the task identifier are passed.

On release, it is called for the next group in the wait queue (if any) when the last task exits the current group. The grouper, the group, and None (instead of the task identifier) are passed.

Called only when the underlying lock is acquired, and must return True if access is granted, False otherwise. For acquire, this determines whether the task must acquire the grouper immediately or join the wait queue as part of the desired group. For release, this determines whether tasks from the next group in the wait queue must wake up (according to the chosen mode) or remain waiting.

If no predicate was passed to the constructor, then the default predicate is used, which operates according to the phase-fair policy for reading: the grouper can be acquired on behalf of the desired group if there are no owners, or if the desired group is already the owner and there are no other waiting groups. This can be used, for example, to synchronize asynchronous tasks at the thread level (group = thread identifier).

property default_group: Hashable | MissingType

The default group used by the acquire/release methods.

property default_group_factory: Callable[[], Hashable] | MissingType

The default group factory used by the acquire/release methods.

property groups: MappingProxyType[Hashable, MappingProxyType[tuple[str, int], int]]

The read-only proxy of the dictionary that maps groups to their respective information regarding which tasks the group consists of and their recursion level in it. Contains only those groups that own the grouper. Updated automatically when the current state changes.

property owners: MappingProxyType[tuple[str, int], MappingProxyType[Hashable, int]]

The read-only proxy of the dictionary that maps tasks’ identifiers to their respective information regarding which groups the task belongs to and its recursion level in each group. Contains identifiers only of those tasks that own the grouper. Updated automatically when the current state changes.

property mutex: ThreadRLock

The underlying lock.

property waiting: int

The current number of tasks waiting to own.

property wrapped: Grouper | None

The parent of this proxy/wrapper, if any.

class culsans.RWLock

Bases: Grouper

A readers-writer lock that is:

See the reading and writing properties.

property reading: Grouper

The proxy/wrapper for reading.

Example

>>> lock = RWLock()
>>> async with lock.reading:
...     ...  # read
...     async with lock.reading:
...         ...  # nested read
...     ...  # read

Example

>>> lock = RWLock()
>>> with lock.reading:
...     ...  # read
...     with lock.reading:
...         ...  # nested read
...     ...  # read
property writing: Grouper

The proxy/wrapper for writing.

Example

>>> lock = RWLock()
>>> async with lock.writing:
...     ...  # write
...     async with lock.writing:
...         ...  # nested write
...     async with lock.reading:
...         ...  # nested read
...     ...  # write

Example

>>> lock = RWLock()
>>> with lock.writing:
...     ...  # write
...     with lock.writing:
...         ...  # nested write
...     with lock.reading:
...         ...  # nested read
...     ...  # write