API reference

Queues

class culsans.Queue

Bases: MixedQueue[_T]

A mixed sync-async queue that is:

  • MPMC

  • FIFO

Compliant with the Janus API version 2.0.0.

__init__(maxsize: int = 0, *, sizer: Callable[[_T], int] | DefaultType = DEFAULT) None

Create a queue object with the given maximum size.

Parameters:

sizer – A function used to calculate the size of each item in the queue.

close() None

This method is provided for compatibility with the Janus queues. Use queue.shutdown(immediate=True) as a direct substitute.

async wait_closed() None

This method is provided for compatibility with the Janus queues. It does nothing.

Raises:

RuntimeError – if called on a queue that has not been closed.

async aclose() None

This method is provided for compatibility with the Janus queues. Use queue.shutdown(immediate=True) as a direct substitute.

property putting: int

The current number of threads/tasks waiting to put.

It represents the length of the wait queue and thus changes immediately.

property getting: int

The current number of threads/tasks waiting to get/peek.

It represents the length of the wait queue and thus changes immediately.

property waiting: int

The current number of threads/tasks waiting to access.

It is roughly equivalent to the sum of the putting and getting properties, but is more reliable than the sum in a multithreaded environment.

class culsans.LifoQueue

Bases: Queue[_T]

A variant of Queue that retrieves most recently added entries first (LIFO).

class culsans.PriorityQueue

Bases: Queue[_RichComparableT]

A variant of Queue that retrieves entries in priority order (lowest first).

Proxies

class culsans.BaseQueueProxy

Bases: BaseQueue[_T]

A proxy that implements the BaseQueue protocol by wrapping a mixed queue.

class culsans.SyncQueueProxy

Bases: BaseQueueProxy[_T], SyncQueue[_T]

A proxy that implements the SyncQueue protocol by wrapping a mixed queue.

class culsans.GreenQueueProxy

Bases: BaseQueueProxy[_T], GreenQueue[_T]

A proxy that implements the GreenQueue protocol by wrapping a mixed queue.

class culsans.AsyncQueueProxy

Bases: BaseQueueProxy[_T], AsyncQueue[_T]

A proxy that implements the AsyncQueue protocol by wrapping a mixed queue.

Protocols

class culsans.BaseQueue

Bases: Protocol[_T]

A base queue protocol that includes all methods except the blocking ones.

Useful when you need to ensure that none of the methods will block (other than on the underlying lock).

__bool__() bool

Return True if the queue is not empty, False otherwise.

Note that bool(queue) does not guarantee that subsequent get/peek calls will not block: the queue can shrink before the result is used, which is a race condition.

To wait for all queued tasks to be completed, use the join methods.

__len__() int

Return the number of items in the queue.

Note that len(queue) > 0 does not guarantee that subsequent get/peek calls will not block: the queue can shrink before the result is used, which is a race condition.

To wait for all queued tasks to be completed, use the join methods.
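The race described in the notes above is usually avoided by attempting the operation and handling the exception, rather than checking the queue first. A minimal sketch, using the standard queue.Queue as a stand-in; the helper name try_take is hypothetical, and with a culsans queue the exception to catch would be QueueEmpty, as listed under get_nowait():

```python
import queue


def try_take(q, default=None):
    """Fetch one item without blocking, or return `default` if none is ready."""
    # Attempt the operation directly instead of testing emptiness first:
    # a size check can already be stale by the time get is called.
    try:
        return q.get_nowait()
    except queue.Empty:
        return default


q = queue.Queue()  # stand-in for a queue with the same non-blocking methods
q.put_nowait("job-1")

first = try_take(q)
second = try_take(q, default="nothing ready")
```

Here first holds the queued item and second falls back to the default, with no window in which another consumer could invalidate a prior check.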

peekable() bool

Return True if the queue is peekable, False otherwise.

clearable() bool

Return True if the queue is clearable, False otherwise.

isize(item: _T) int

Return the size of the item.

This method uses the sizer passed during initialization. If it was not passed (and the corresponding protected attribute/method was not defined), it returns 1 for any object.

qsize() int

This method is provided for compatibility with the standard queues. Use len(queue) as a direct substitute.

empty() bool

This method is provided for compatibility with the standard queues. Use not queue as a direct substitute.

full() bool

Return True if the queue is full, False otherwise.

Note that not queue.full() does not guarantee that subsequent put calls will not block: the queue can grow before the result is used, which is a race condition.

put_nowait(item: _T) None

Put item into the queue without blocking.

Only put (enqueue) the item if a free slot is immediately available.

Raises:
  • QueueFull – if the queue is full.

  • QueueShutDown – if the queue has been shut down.

get_nowait() _T

Remove and return an item from the queue without blocking.

Only get (dequeue) an item if one is immediately available.

Raises:
  • QueueEmpty – if the queue is empty.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

peek_nowait() _T

Return an item from the queue without blocking.

Only peek (front) an item if one is immediately available.

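Raises:
  • QueueEmpty – if the queue is empty.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

  • UnsupportedOperation – if the queue is not peekable.

task_done() None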

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get call used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join call is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put into the queue).

Raises:

ValueError – if called more times than there were items placed in the queue.
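The task_done()/join contract can be sketched with a single worker thread. This example uses the standard queue.Queue as a stand-in, on the assumption that a culsans queue accessed through its sync-style methods follows the same accounting; the None sentinel is an illustrative convention, not part of the API:

```python
import queue
import threading

q = queue.Queue()
results = []


def worker():
    while True:
        item = q.get()
        if item is None:       # sentinel: stop the worker
            q.task_done()
            break
        results.append(item * 2)
        q.task_done()          # exactly one task_done() per successful get()


t = threading.Thread(target=worker)
t.start()
for n in (1, 2, 3):
    q.put(n)
q.join()                       # returns once every put has a matching task_done()
q.put(None)
t.join()

# Calling task_done() more often than items were put raises ValueError.
try:
    q.task_done()
    extra_call_rejected = False
except ValueError:
    extra_call_rejected = True
```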

shutdown(immediate: bool = False) None

Put the queue into a shutdown mode.

The queue can no longer grow. Future calls to the put methods raise QueueShutDown. Currently blocked callers of the put methods will be unblocked and will raise QueueShutDown in the formerly blocked thread/task.

Once the queue is empty, the get/peek methods will also raise QueueShutDown.

Parameters:

immediate – If set to True, the queue is drained to be completely empty, and task_done() is called for each item removed from the queue (but joiners are unblocked regardless of the number of unfinished tasks).
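The non-immediate shutdown sequence can be sketched with the standard queue.Queue, whose shutdown() method was added in Python 3.13 and which the protocols in this reference are stated to follow. The culsans exception would be QueueShutDown rather than queue.ShutDown, and the function name shutdown_demo is hypothetical:

```python
import queue
import sys


def shutdown_demo():
    q = queue.Queue()
    q.put("a")
    q.put("b")
    q.shutdown()                # immediate=False: queued items stay retrievable

    events = []
    try:
        q.put("c")              # the queue can no longer grow
    except queue.ShutDown:
        events.append("put rejected")

    while True:
        try:
            events.append(q.get())
        except queue.ShutDown:  # raised once the queue is empty
            break
    return events


# queue.Queue.shutdown() exists only on Python 3.13 and later.
events = shutdown_demo() if sys.version_info >= (3, 13) else None
```

On a supporting interpreter, the rejected put is recorded first, then the two remaining items are drained before get starts raising.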

clear() None

Clear all items from the queue atomically.

Also calls task_done() for each removed item.

Raises:

UnsupportedOperation – if the queue is not clearable.

property unfinished_tasks: int

The current number of tasks remaining to be processed.

See the task_done() method.

property is_shutdown: bool

A boolean that is True if the queue has been shut down, False otherwise.

See the shutdown() method.

property closed: bool

This property is provided for compatibility with the Janus queues. Use is_shutdown as a direct substitute.

property maxsize: int

The maximum cumulative size of items which the queue can hold.

If maxsize is <= 0, the size is unlimited. If it is an integer greater than 0, the put methods block once the queue reaches maxsize, until an item is removed by the get methods.

It can be changed dynamically by setting the attribute.

property size: int

The current cumulative size of items in the queue.

class culsans.MixedQueue

Bases: BaseQueue[_T], Protocol[_T]

A mixed queue protocol that includes both types of blocking methods via prefixes.

Provides specialized proxies via properties.

sync_put(item: _T, block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) None

This method is provided for consistency with the Janus queues. Use green_put() as a direct substitute.

green_put(item: _T, block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) None

Put item into the queue.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until a free slot is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueFull exception if no free slot was available within that time.

Raises:
  • QueueFull – if the queue is full and the timeout has expired.

  • QueueShutDown – if the queue has been shut down.

async async_put(item: _T) None

Put item into the queue.

If the queue is full, wait until a free slot is available.

Raises:

QueueShutDown – if the queue has been shut down.

sync_get(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

This method is provided for consistency with the Janus queues. Use green_get() as a direct substitute.

green_get(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

Remove and return an item from the queue.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until an item is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueEmpty exception if no item was available within that time.

Raises:
  • QueueEmpty – if the queue is empty and the timeout has expired.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

async async_get() _T

Remove and return an item from the queue.

If the queue is empty, wait until an item is available.

Raises:

QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

sync_peek(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

This method is provided for consistency with the Janus queues. Use green_peek() as a direct substitute.

green_peek(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

Return an item from the queue without removing it.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until an item is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueEmpty exception if no item was available within that time.

Raises:
  • QueueEmpty – if the queue is empty and the timeout has expired.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

  • UnsupportedOperation – if the queue is not peekable.

async async_peek() _T

Return an item from the queue without removing it.

If the queue is empty, wait until an item is available.

Raises:
  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

  • UnsupportedOperation – if the queue is not peekable.

sync_join() None

This method is provided for consistency with the Janus queues. Use green_join() as a direct substitute.

green_join() None

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, the caller unblocks.

async async_join() None

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, the caller unblocks.

property green_proxy: GreenQueue[_T]

An interface compatible with the standard queues from the queue module.

property async_proxy: AsyncQueue[_T]

An interface compatible with the standard queues from the asyncio module.

property sync_q: SyncQueue[_T]

This property is provided for compatibility with the Janus queues. Use green_proxy instead.

property async_q: AsyncQueue[_T]

This property is provided for compatibility with the Janus queues. Use async_proxy as a direct substitute.

class culsans.SyncQueue

Bases: BaseQueue[_T], Protocol[_T]

This protocol is provided for compatibility with the Janus queues. Use GreenQueue instead.

class culsans.GreenQueue

Bases: BaseQueue[_T], Protocol[_T]

A queue protocol that covers the standard queues’ interface from the queue module.

Compliant with the Python API version 3.13.

put(item: _T, block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) None

Put item into the queue.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until a free slot is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueFull exception if no free slot was available within that time.

Raises:
  • QueueFull – if the queue is full and the timeout has expired.

  • QueueShutDown – if the queue has been shut down.

get(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

Remove and return an item from the queue.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until an item is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueEmpty exception if no item was available within that time.

Raises:
  • QueueEmpty – if the queue is empty and the timeout has expired.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.
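Since this protocol mirrors the queue module, the timeout behaviour of put and get can be sketched with queue.Queue itself. This is an illustration under that assumption only: the stand-in accepts block but not the blocking keyword documented above, and a culsans queue would raise QueueFull and QueueEmpty instead of queue.Full and queue.Empty:

```python
import queue

q = queue.Queue(maxsize=1)
q.put("first")                      # fills the only slot

try:
    q.put("second", timeout=0.05)   # waits up to 50 ms for a free slot
    put_outcome = "stored"
except queue.Full:
    put_outcome = "full"

item = q.get(block=False)           # non-blocking get, equivalent to get_nowait()

try:
    q.get(timeout=0.05)             # nothing left, so the wait times out
    get_outcome = "got item"
except queue.Empty:
    get_outcome = "empty"
```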

peek(block: bool | DefaultType = DEFAULT, timeout: float | None = None, *, blocking: bool | DefaultType = DEFAULT) _T

Return an item from the queue without removing it.

Parameters:
  • block – This parameter is provided for compatibility with the standard queues. Use blocking as a direct substitute.

  • blocking – Unless set to False, the method will block if necessary until an item is available. Otherwise, timeout=0 is implied.

  • timeout – If set to a non-negative number, the method will block at most timeout seconds and raise the QueueEmpty exception if no item was available within that time.

Raises:
  • QueueEmpty – if the queue is empty and the timeout has expired.

  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

  • UnsupportedOperation – if the queue is not peekable.

join() None

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, the caller unblocks.

class culsans.AsyncQueue

Bases: BaseQueue[_T], Protocol[_T]

A queue protocol that covers the standard queues’ interface from the asyncio module.

Compliant with the Python API version 3.13.

async put(item: _T) None

Put item into the queue.

If the queue is full, wait until a free slot is available.

Raises:

QueueShutDown – if the queue has been shut down.

async get() _T

Remove and return an item from the queue.

If the queue is empty, wait until an item is available.

Raises:

QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

async peek() _T

Return an item from the queue without removing it.

If the queue is empty, wait until an item is available.

Raises:
  • QueueShutDown – if the queue has been shut down and is empty, or if the queue has been shut down immediately.

  • UnsupportedOperation – if the queue is not peekable.

async join() None

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, the caller unblocks.
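The asynchronous side follows the same pattern with awaitable calls. A minimal producer/consumer sketch using asyncio.Queue as a stand-in for an object implementing this protocol; the maxsize of 2 and the item values are arbitrary choices for illustration:

```python
import asyncio


async def main():
    q = asyncio.Queue(maxsize=2)
    processed = []

    async def consumer():
        while True:
            item = await q.get()
            processed.append(item)
            q.task_done()       # lets join() account for this item

    task = asyncio.create_task(consumer())
    for n in range(5):
        await q.put(n)          # waits whenever the queue is full
    await q.join()              # resumes once every item has been marked done

    task.cancel()               # the consumer loops forever, so stop it explicitly
    await asyncio.gather(task, return_exceptions=True)
    return processed


processed = asyncio.run(main())
```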

Exceptions

exception culsans.QueueEmpty

Bases: SyncQueueEmpty, AsyncQueueEmpty

Exception raised when non-blocking get/peek is called on a Queue object which is empty.

exception culsans.SyncQueueEmpty

Bases: Exception

The same as queue.Empty.

exception culsans.AsyncQueueEmpty

Bases: Exception

The same as asyncio.QueueEmpty.
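Because QueueEmpty derives from both SyncQueueEmpty and AsyncQueueEmpty, handlers written for either standard exception should also catch it. The sketch below demonstrates the mechanism with a hypothetical class, MixedEmpty, that has the same two bases; it is a stand-in and not the library's own definition:

```python
import asyncio
import queue


class MixedEmpty(queue.Empty, asyncio.QueueEmpty):
    """Hypothetical stand-in mirroring the two documented bases."""


def caught_by(handler_type):
    """Return True if MixedEmpty is caught by an `except handler_type` clause."""
    try:
        raise MixedEmpty
    except handler_type:
        return True


sync_ok = caught_by(queue.Empty)            # threading-style handler
async_ok = caught_by(asyncio.QueueEmpty)    # asyncio-style handler
```

The same reasoning applies to QueueFull and QueueShutDown, which are documented with the same two-base layout.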

exception culsans.QueueFull

Bases: SyncQueueFull, AsyncQueueFull

Exception raised when non-blocking put is called on a Queue object which is full.

exception culsans.SyncQueueFull

Bases: Exception

The same as queue.Full.

exception culsans.AsyncQueueFull

Bases: Exception

The same as asyncio.QueueFull.

exception culsans.QueueShutDown

Bases: SyncQueueShutDown, AsyncQueueShutDown

Exception raised when put/get/peek is called on a Queue object which has been shut down.

exception culsans.SyncQueueShutDown

Bases: Exception

The same as queue.ShutDown.

exception culsans.AsyncQueueShutDown

Bases: Exception

The same as asyncio.QueueShutDown.

exception culsans.UnsupportedOperation

Bases: ValueError

Exception raised when peek/clear is called on a Queue object which is not peekable/clearable.