message_queue

Per-channel message queue with batching.

Ensures temporal consistency: messages are processed in order per channel, and rapid-succession messages can be collected into batches for a single combined response.

class message_queue.QueuedMessage(platform, channel_id, user_id, user_name, text, queued_at=<factory>, extra=<factory>, raw=None)[source]

Bases: object

One message captured while waiting in a channel’s batching queue.

A normalized snapshot of an inbound message – platform, channel_id, user_id/user_name and text – stamped with queued_at and carrying the original IncomingMessage in raw plus any adapter-specific extra metadata. Created by MessageQueue when a message arrives and coalesced into a MessageBatch before the processor flushes a channel’s accumulated traffic.

Parameters:
platform: str
channel_id: str
user_id: str
user_name: str
text: str
queued_at: datetime
extra: dict[str, Any]
raw: Any = None
class message_queue.MessageBatch(messages=<factory>, first_at=<factory>, last_at=<factory>)[source]

Bases: object

A set of QueuedMessage objects collected in a rolling window.

Groups rapid-succession messages on one channel so they are processed together rather than one-at-a-time: messages holds the arrivals while first_at/last_at track the window that add() slides forward on each append. Held in memory by MessageQueue and flushed once the batch hits its size cap or the window’s quiet period elapses.

Parameters:
messages: list[QueuedMessage]
first_at: datetime
last_at: datetime
add(msg)[source]

Append a message to the batch and extend its rolling window.

Records the new arrival, advancing last_at to now so the batch’s time window slides forward with each message; the first message also sets first_at. This is how rapid-succession messages get coalesced into one batch before the in-memory processor flushes them.

Called by MessageQueue.enqueue() (the in-memory batching path) for each message that arrives while a batch is open for the channel.

Parameters:

msg (QueuedMessage) – The freshly queued message to add to this batch.

Return type:

None

property size: int

Number of messages currently in the batch.

Convenience accessor over the backing messages list, used to test the batch against max_batch_size and to label log lines.

Read by MessageQueue.enqueue() and MessageQueue._finalize_batch_unlocked() (size-limit checks and logging) and by MessageQueue._process_loop() when describing a processed item.

Returns:

The count of messages held in this batch.

Return type:

int

property channel_id: str

Channel identifier shared by every message in the batch.

A batch only ever holds messages for a single channel, so this returns the channel of the first message. Used by MessageQueue.enqueue_front() to route a batch back to the correct per-channel queue.

Returns:

The fully-qualified channel id of the batch’s messages.

Return type:

str

Raises:

ValueError – If the batch is empty and therefore has no channel.

unique_authors()[source]

List the distinct author ids in the batch, preserving first-seen order.

Walks the batched messages once, de-duplicating by user_id while keeping the order in which each author first appears, so a multi-author batch can be summarised without losing conversational ordering. Pure computation with no side effects.

Defined as a helper on the batch; no callers were found in the repo, so it is available for downstream consumers (e.g. prompt-context assembly) but currently unused by core paths.

Returns:

The unique author ids, in the order they first sent a message in this batch.

Return type:

list[str]

class message_queue.RedisQueue(redis, channel_key)[source]

Bases: object

Redis-list-backed FIFO queue for one channel’s queue items.

Serializes QueuedMessage/MessageBatch items into a single Redis list keyed message_queue:redis:{channel_key} so the per-channel processor loop can pop them in order even across process restarts. Used by MessageQueue (via MessageQueue._queue()) whenever both a Redis client and an event bus are configured; otherwise InMemoryRedisQueue is substituted. Both classes expose the same interface so the processor loop is agnostic to which is in play.

Parameters:
  • redis (Any)

  • channel_key (str)

__init__(redis, channel_key)[source]

Bind the queue to a Redis client and a channel.

Computes the Redis list key (message_queue:redis:{channel_key}) used by every other method. Instantiated by MessageQueue._queue().

Parameters:
  • redis (Any) – An async Redis client (e.g. redis.asyncio.Redis).

  • channel_key (str) – The fully-qualified channel identifier this queue serves, used to namespace the backing list key.

Return type:

None

async put(item)[source]

Append a queue item to the tail of the channel’s Redis list.

Serializes item with _serialize_item() and RPUSH-es it onto the backing list. Called by MessageQueue._finalize_batch_unlocked() when an in-memory-batched item is committed to the queue.

Parameters:

item (Union[QueuedMessage, MessageBatch]) – The message or batch to enqueue.

Return type:

None

async enqueue_front(item)[source]

Push a queue item onto the head of the channel’s Redis list.

Serializes item with _serialize_item() and LPUSH-es it so it will be consumed before any items already queued. Called by MessageQueue.enqueue_front() to give an item priority.

Parameters:

item (Union[QueuedMessage, MessageBatch]) – The message or batch to enqueue at the front.

Return type:

None

async get()[source]

Block until an item is available, then pop and decode it.

Performs a blocking BLPOP (no timeout) against the backing list and decodes the popped value with _deserialize_item(). Called by MessageQueue._process_loop() (wrapped in a 60 s asyncio.wait_for()).

Returns:

The next QueuedMessage or MessageBatch.

Return type:

Union[QueuedMessage, MessageBatch]

Raises:

RuntimeError – If BLPOP returns None (should not occur with an infinite timeout, guarding against a misbehaving client).

async qsize()[source]

Return the number of items currently in the channel’s Redis list.

Issues an LLEN against the backing list, returning 0 when no Redis client is bound.

Returns:

The current list length, or 0 if Redis is unavailable.

Return type:

int

async empty()[source]

Report whether the channel’s Redis list has no items.

Returns:

True if the backing list is empty (or no Redis client is bound), False otherwise.

Return type:

bool

async clear()[source]

Delete the channel’s Redis list, discarding all queued items.

Reads the length first so the count of removed items can be reported, then DEL-etes the backing key. Called by MessageQueue.clear().

Returns:

The number of items that were in the list before deletion (0 if no Redis client is bound).

Return type:

int

task_done()[source]

No-op completion hook mirroring asyncio.Queue.task_done.

Exists so MessageQueue._process_loop() can call task_done() uniformly regardless of whether the channel is backed by a Redis or an in-memory queue; the Redis variant has nothing to track.

Return type:

None

class message_queue.InMemoryRedisQueue[source]

Bases: object

In-process fallback queue with the same interface as RedisQueue.

Wraps a plain asyncio.Queue so MessageQueue can operate without Redis (standalone / single-process mode). Selected by MessageQueue._queue() when no Redis client and event bus are configured; items live only for the lifetime of the process. Note that enqueue_front() and clear() reach into the underlying queue’s private attributes to achieve head-insertion and draining.

__init__()[source]

Create the backing in-memory asyncio.Queue.

Allocates the single unbounded asyncio.Queue (self.q) that backs this fallback queue; put(), get(), enqueue_front() and clear() all operate on it. Called by MessageQueue._queue() when no Redis client/event bus is configured (standalone single-process mode). No I/O.

Return type:

None

async put(item)[source]

Append a queue item to the tail of the in-memory queue.

Mirrors RedisQueue.put(); called by MessageQueue._finalize_batch_unlocked().

Parameters:

item (Union[QueuedMessage, MessageBatch]) – The message or batch to enqueue.

Return type:

None

async enqueue_front(item)[source]

Insert a queue item at the head of the in-memory queue.

Since asyncio.Queue has no native front-insert, this reaches into its private _queue deque to appendleft the item, clears the _finished event, and wakes a waiting getter if one is parked so the item is consumed before existing entries. Mirrors RedisQueue.enqueue_front(); called by MessageQueue.enqueue_front().

Parameters:

item (Union[QueuedMessage, MessageBatch]) – The message or batch to enqueue at the front.

Return type:

None

async get()[source]

Block until an item is available, then pop and return it.

Mirrors RedisQueue.get(); called by MessageQueue._process_loop().

Returns:

The next QueuedMessage or MessageBatch.

Return type:

Union[QueuedMessage, MessageBatch]

async qsize()[source]

Return the number of items currently buffered.

Returns:

The current size of the underlying asyncio.Queue.

Return type:

int

async empty()[source]

Report whether the in-memory queue has no items.

Returns:

True if the underlying queue is empty, False otherwise.

Return type:

bool

async clear()[source]

Drain every buffered item, discarding them.

Repeatedly get_nowait/task_done until empty, counting the removed items. Mirrors RedisQueue.clear(); called by MessageQueue.clear().

Returns:

The number of items removed from the queue.

Return type:

int

task_done()[source]

Mark a previously retrieved item as processed.

Delegates to asyncio.Queue.task_done() so callers of get() keep the underlying queue’s unfinished-task count balanced. Called by MessageQueue._process_loop().

Return type:

None

class message_queue.MessageQueue(default_batch_window=5.0, max_batch_size=10, redis=None, event_bus=None)[source]

Bases: object

Per-channel queue that batches rapid-succession messages.

Operates in two modes:

  • In-memory (no redis/event_bus): batches are buffered locally and handed to the processor loop started via start_processing().

  • Distributed (both redis and event_bus supplied): batching state lives in Redis and finalised batches are published to the cross-service inbound stream via event_bus.publish_inbound; the local processor loop is not used for delivery in this mode.

Parameters:
  • default_batch_window (float) – Seconds to wait for additional messages before finalising a batch.

  • max_batch_size (int) – Maximum messages per batch before immediate finalisation.

  • redis (Any) – Optional async Redis client. When provided together with event_bus, enables the distributed batching path (Redis-backed batch state); otherwise consulted only for per-channel batch config (window override / disable flag).

  • event_bus (Any) – Optional event bus used to publish finalised batches to the cross-service inbound stream. Required (with redis) for the distributed path.

__init__(default_batch_window=5.0, max_batch_size=10, redis=None, event_bus=None)[source]

Set up per-channel batching state and, in distributed mode, the scheduler.

Allocates the per-channel bookkeeping dicts (queues, locks, active batches, timers, processor tasks, lengths, cancellation flags) and stores the batching tunables and optional Redis/event-bus handles. When both redis and event_bus are supplied the constructor immediately starts the _batch_scheduler_loop() background asyncio.Task that drives the distributed sliding-window flush; otherwise the queue runs in local in-memory mode.

Instantiated once per process by the owning runner/gateway and exercised directly by the batching tests; requires a running event loop in distributed mode because of the scheduler task it creates.

Parameters:
  • default_batch_window (float) – Seconds to wait for additional messages before finalising a batch (the default sliding-window length).

  • max_batch_size (int) – Maximum messages allowed in one batch before it is flushed immediately regardless of the window.

  • redis (Any) – Optional async Redis client; together with event_bus it enables the distributed Redis-backed batching path.

  • event_bus (Any) – Optional event bus whose publish_inbound delivers finalised batches to the cross-service inbound stream.

Return type:

None

set_event_bus(event_bus)[source]

Attach an event bus after construction, lazily starting the scheduler.

Lets a queue created before the event bus existed be upgraded to the distributed path. Once both a Redis client and the event bus are present and no scheduler is running yet, it spawns the _batch_scheduler_loop() asyncio.Task so Redis-backed batches begin flushing to the inbound stream.

Provided for the runner/gateway wiring step; no in-repo callers were found, so it is invoked from the owning process’s startup sequence.

Parameters:

event_bus (Any) – The event bus whose publish_inbound will deliver finalised batches to the cross-service inbound stream.

Return type:

None

async enqueue(msg)[source]

Accept a message into the channel’s batching window.

In distributed mode (redis and event_bus set), the message is appended to the Redis-backed batch and ultimately published to the cross-service inbound stream via event_bus.publish_inbound (either immediately when batching is disabled or once the window/size limit is reached). In the in-memory mode it is buffered in a local MessageBatch for the processor loop.

Return type:

None

Parameters:

msg (QueuedMessage)

async enqueue_front(item)[source]

Push a message or batch to the head of its channel queue (priority).

Resolves the channel from item (via its channel_id), fetches the backing queue with _queue(), and delegates to that queue’s enqueue_front so the item is consumed before anything already waiting; the cached per-channel length is bumped to match. Used to inject a high-priority item (e.g. a re-queued or system message) ahead of the normal FIFO order.

Within the repo this is exercised by the queue tests; otherwise called by the owning runner when an item must jump the line.

Parameters:

item (Union[QueuedMessage, MessageBatch]) – The QueuedMessage or MessageBatch to enqueue at the front of its channel.

Return type:

None

is_channel_processing(channel)[source]

Report whether a channel is currently running its processor callback.

Reads the cached _processing flag, which _process_loop() sets while a queue item’s callback is in flight for the channel. A cheap, non-blocking status probe.

No in-repo callers were found, so this is consumed by external status or admin surfaces.

Parameters:

channel (str) – The channel key to check.

Returns:

True if the channel is actively processing an item, False otherwise (including unknown channels).

Return type:

bool

queue_size(channel)[source]

Return the cached number of items waiting in a channel’s queue.

Reads the in-process _lengths counter that the enqueue and processor paths keep in sync, rather than querying the backend, so it is cheap and synchronous. Note this reflects committed queue items, not messages still accumulating in an open batch.

No in-repo callers were found, so this is consumed by external status or admin surfaces.

Parameters:

channel (str) – The channel key to size.

Returns:

The cached queue length for the channel (0 if unknown).

Return type:

int

async start_processing(channel, callback)[source]

Ensure a per-channel processor loop is running, starting one if not.

Idempotently launches the _process_loop() asyncio.Task for channel (creating the backing queue via _queue() if needed). Holds the channel’s processor lock while checking for and replacing a finished task so two concurrent calls cannot start duplicate loops. The loop then pops items and invokes callback for each. Used by the in-memory delivery mode; in distributed mode batches are delivered via the event bus instead.

Within the repo this is exercised by the queue tests; otherwise called by the owning runner when a channel first needs servicing.

Parameters:
Return type:

None

async cancel_batch_timer(channel)[source]

Cancel and await a channel’s pending in-memory batch flush timer.

Pops the channel’s timer task (if any), cancels it, and awaits it so the cancellation fully settles before returning, swallowing the expected asyncio.CancelledError. Prevents a pending flush from firing after the channel is being torn down or stopped. No-ops when no timer is armed.

Called by stop_processing() as part of stopping a channel.

Parameters:

channel (str) – The channel whose flush timer should be cancelled.

Return type:

None

async stop_processing(channel)[source]

Tear down all processing for a channel: in-flight task, timer, and loop.

Cancels the channel’s currently-running callback via cancel_current(), cancels its pending batch timer via cancel_batch_timer(), then pops and cancels the _process_loop() task itself, awaiting each cancellation so it fully settles. Used to stop servicing a channel entirely (e.g. on a user stop/cancel request).

Called by web/redis_platform_api.py (the platform cancel/stop endpoint) and exercised by the queue tests.

Parameters:

channel (str) – The channel to stop processing.

Return type:

None

async clear(channel)[source]

Discard every queued item for a channel and reset its cached length.

Delegates to the backing queue’s clear (which deletes the Redis list or drains the in-memory queue), then zeroes the channel’s cached length. Returns 0 for a channel that has no queue. Used to drop pending work for a channel (e.g. during a cancel/reset).

No in-repo callers were found, so this is invoked by the owning runner or an admin/reset path.

Parameters:

channel (str) – The channel whose queue should be emptied.

Returns:

The number of items removed from the channel’s queue.

Return type:

int

stats()[source]

Snapshot per-channel queue health for every known channel.

Builds a mapping from channel key to a small status dict — cached queue_size, the is_processing flag, and whether a live processor task exists — reading purely from in-process bookkeeping so it is cheap and synchronous. Intended for diagnostics and monitoring surfaces.

No in-repo callers were found, so this is consumed by external status or admin surfaces.

Returns:

Per-channel status, each entry holding queue_size, is_processing, and has_processor keys.

Return type:

dict[str, dict[str, Any]]

cancel_current(channel)[source]

Cancel the in-flight processing task for channel.

Returns True if a task was found and cancelled, False if no processing was active for the channel.

Return type:

bool

Parameters:

channel (str)