message_queue
Per-channel message queue with batching.
Ensures temporal consistency: messages are processed in order per channel, and rapid-succession messages can be collected into batches for a single combined response.
- class message_queue.QueuedMessage(platform, channel_id, user_id, user_name, text, queued_at=<factory>, extra=<factory>, raw=None)[source]
Bases: object
One message captured while waiting in a channel’s batching queue.
A normalized snapshot of an inbound message (platform, channel_id, user_id/user_name and text), stamped with queued_at and carrying the original IncomingMessage in raw plus any adapter-specific extra metadata. Created by MessageQueue when a message arrives and coalesced into a MessageBatch before the processor flushes a channel’s accumulated traffic.
- Parameters:
- class message_queue.MessageBatch(messages=<factory>, first_at=<factory>, last_at=<factory>)[source]
Bases: object
A set of QueuedMessage objects collected in a rolling window.
Groups rapid-succession messages on one channel so they are processed together rather than one at a time: messages holds the arrivals while first_at/last_at track the window that add() slides forward on each append. Held in memory by MessageQueue and flushed once the batch hits its size cap or the window’s quiet period elapses.
- Parameters:
messages (list[QueuedMessage])
first_at (datetime)
last_at (datetime)
- messages: list[QueuedMessage]
- add(msg)[source]
Append a message to the batch and extend its rolling window.
Records the new arrival, advancing last_at to now so the batch’s time window slides forward with each message; the first message also sets first_at. This is how rapid-succession messages get coalesced into one batch before the in-memory processor flushes them.
Called by MessageQueue.enqueue() (the in-memory batching path) for each message that arrives while a batch is open for the channel.
- Parameters:
msg (QueuedMessage) – The freshly queued message to add to this batch.
- Return type:
- property size: int
Number of messages currently in the batch.
Convenience accessor over the backing messages list, used to test the batch against max_batch_size and to label log lines.
Read by MessageQueue.enqueue() and MessageQueue._finalize_batch_unlocked() (size-limit checks and logging) and by MessageQueue._process_loop() when describing a processed item.
- Returns:
The count of messages held in this batch.
- Return type:
int
- property channel_id: str
Channel identifier shared by every message in the batch.
A batch only ever holds messages for a single channel, so this returns the channel of the first message. Used by MessageQueue.enqueue_front() to route a batch back to the correct per-channel queue.
- Returns:
The fully-qualified channel id of the batch’s messages.
- Return type:
str
- Raises:
ValueError – If the batch is empty and therefore has no channel.
- unique_authors()[source]
List the distinct author ids in the batch, preserving first-seen order.
Walks the batched messages once, de-duplicating by user_id while keeping the order in which each author first appears, so a multi-author batch can be summarised without losing conversational ordering. Pure computation with no side effects.
Defined as a helper on the batch; no callers were found in the repo, so it is available for downstream consumers (e.g. prompt-context assembly) but currently unused by core paths.
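The batch behaviour described above can be approximated with two dataclasses. This is a minimal sketch and not the module's source: the field subset on QueuedMessage, the UTC timestamps, and the example platform and channel values are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


def _now() -> datetime:
    # Assumption: timezone-aware UTC timestamps; the real module may differ.
    return datetime.now(timezone.utc)


@dataclass
class QueuedMessage:
    """Illustrative stand-in that models only the fields used below."""
    platform: str
    channel_id: str
    user_id: str
    user_name: str
    text: str


@dataclass
class MessageBatch:
    """Hypothetical re-creation of the documented batch behaviour."""
    messages: list = field(default_factory=list)
    first_at: datetime = field(default_factory=_now)
    last_at: datetime = field(default_factory=_now)

    def add(self, msg: QueuedMessage) -> None:
        now = _now()
        if not self.messages:
            self.first_at = now   # the first message opens the window
        self.messages.append(msg)
        self.last_at = now        # every append slides the window forward

    @property
    def size(self) -> int:
        return len(self.messages)

    @property
    def channel_id(self) -> str:
        if not self.messages:
            raise ValueError("empty batch has no channel")
        return self.messages[0].channel_id

    def unique_authors(self) -> list:
        # dict preserves insertion order, so first-seen order is kept.
        return list(dict.fromkeys(m.user_id for m in self.messages))


batch = MessageBatch()
for uid, name in [("u1", "ann"), ("u2", "bob"), ("u1", "ann")]:
    batch.add(QueuedMessage("example", "example:42", uid, name, "hi"))
```

With three messages from two authors, size is 3 while unique_authors() yields each author id once, in order of first appearance.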
- class message_queue.RedisQueue(redis, channel_key)[source]
Bases: object
Redis-list-backed FIFO queue for one channel’s queue items.
Serializes QueuedMessage/MessageBatch items into a single Redis list keyed message_queue:redis:{channel_key} so the per-channel processor loop can pop them in order even across process restarts. Used by MessageQueue (via MessageQueue._queue()) whenever both a Redis client and an event bus are configured; otherwise InMemoryRedisQueue is substituted. Both classes expose the same interface so the processor loop is agnostic to which is in play.
- Parameters:
redis (Any)
channel_key (str)
- __init__(redis, channel_key)[source]
Bind the queue to a Redis client and a channel.
Computes the Redis list key (message_queue:redis:{channel_key}) used by every other method. Instantiated by MessageQueue._queue().
- async put(item)[source]
Append a queue item to the tail of the channel’s Redis list.
Serializes item with _serialize_item() and RPUSH-es it onto the backing list. Called by MessageQueue._finalize_batch_unlocked() when an in-memory-batched item is committed to the queue.
- Parameters:
item (Union[QueuedMessage, MessageBatch]) – The message or batch to enqueue.
- Return type:
- async enqueue_front(item)[source]
Push a queue item onto the head of the channel’s Redis list.
Serializes item with _serialize_item() and LPUSH-es it so it will be consumed before any items already queued. Called by MessageQueue.enqueue_front() to give an item priority.
- Parameters:
item (Union[QueuedMessage, MessageBatch]) – The message or batch to enqueue at the front.
- Return type:
- async get()[source]
Block until an item is available, then pop and decode it.
Performs a blocking BLPOP (no timeout) against the backing list and decodes the popped value with _deserialize_item(). Called by MessageQueue._process_loop() (wrapped in a 60 s asyncio.wait_for()).
- Returns:
The next QueuedMessage or MessageBatch.
- Return type:
Union[QueuedMessage, MessageBatch]
- Raises:
RuntimeError – If BLPOP returns None (should not occur with an infinite timeout; this guards against a misbehaving client).
- async qsize()[source]
Return the number of items currently in the channel’s Redis list.
Issues an LLEN against the backing list, returning 0 when no Redis client is bound.
- Returns:
The current list length, or 0 if Redis is unavailable.
- Return type:
int
- async empty()[source]
Report whether the channel’s Redis list has no items.
- Returns:
True if the backing list is empty (or no Redis client is bound), False otherwise.
- Return type:
bool
- async clear()[source]
Delete the channel’s Redis list, discarding all queued items.
Reads the length first so the count of removed items can be reported, then DEL-etes the backing key. Called by MessageQueue.clear().
- Returns:
The number of items that were in the list before deletion (0 if no Redis client is bound).
- Return type:
int
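To make the list operations behind RedisQueue concrete, the sketch below maps each documented method onto the corresponding Redis command. It is an illustration under stated assumptions: FakeRedis is a hypothetical in-process stand-in so the example runs without a server, SketchRedisQueue is not the real class, and JSON stands in for the undocumented _serialize_item()/_deserialize_item() format.

```python
import asyncio
import json
from collections import deque


class FakeRedis:
    """Hypothetical stand-in for an async Redis client (list commands only)."""

    def __init__(self):
        self.lists = {}

    async def rpush(self, key, value):
        self.lists.setdefault(key, deque()).append(value)

    async def lpush(self, key, value):
        self.lists.setdefault(key, deque()).appendleft(value)

    async def blpop(self, key, timeout=0):
        # Poll until an item exists; a real server blocks instead.
        while not self.lists.get(key):
            await asyncio.sleep(0.01)
        return key, self.lists[key].popleft()

    async def llen(self, key):
        return len(self.lists.get(key, ()))

    async def delete(self, key):
        self.lists.pop(key, None)


class SketchRedisQueue:
    def __init__(self, redis, channel_key):
        self.redis = redis
        self.key = f"message_queue:redis:{channel_key}"

    async def put(self, item):
        await self.redis.rpush(self.key, json.dumps(item))   # tail

    async def enqueue_front(self, item):
        await self.redis.lpush(self.key, json.dumps(item))   # head

    async def get(self):
        popped = await self.redis.blpop(self.key, timeout=0)
        if popped is None:
            raise RuntimeError("BLPOP returned nothing")
        _key, raw = popped
        return json.loads(raw)

    async def qsize(self):
        return await self.redis.llen(self.key) if self.redis is not None else 0

    async def clear(self):
        if self.redis is None:
            return 0
        removed = await self.redis.llen(self.key)  # read length before DEL
        await self.redis.delete(self.key)
        return removed


async def demo():
    q = SketchRedisQueue(FakeRedis(), "example:42")
    await q.put({"text": "first"})
    await q.put({"text": "second"})
    await q.enqueue_front({"text": "urgent"})
    head = await q.get()
    remaining = await q.qsize()
    removed = await q.clear()
    return head["text"], remaining, removed, await q.qsize()


result = asyncio.run(demo())
```

The item pushed with enqueue_front is popped first even though it arrived last, and clear reports the two items that were still waiting.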
- class message_queue.InMemoryRedisQueue[source]
Bases: object
In-process fallback queue with the same interface as RedisQueue.
Wraps a plain asyncio.Queue so MessageQueue can operate without Redis (standalone / single-process mode). Selected by MessageQueue._queue() unless both a Redis client and an event bus are configured; items live only for the lifetime of the process. Note that enqueue_front() and clear() reach into the underlying queue’s private attributes to achieve head-insertion and draining.
- __init__()[source]
Create the backing in-memory asyncio.Queue.
Allocates the single unbounded asyncio.Queue (self.q) that backs this fallback queue; put(), get(), enqueue_front() and clear() all operate on it. Called by MessageQueue._queue() when no Redis client/event bus is configured (standalone single-process mode). No I/O.
- Return type:
None
- async put(item)[source]
Append a queue item to the tail of the in-memory queue.
Mirrors RedisQueue.put(); called by MessageQueue._finalize_batch_unlocked().
- Parameters:
item (Union[QueuedMessage, MessageBatch]) – The message or batch to enqueue.
- Return type:
- async enqueue_front(item)[source]
Insert a queue item at the head of the in-memory queue.
Since asyncio.Queue has no native front-insert, this reaches into its private _queue deque to appendleft the item, clears the _finished event, and wakes a waiting getter if one is parked so the item is consumed before existing entries. Mirrors RedisQueue.enqueue_front(); called by MessageQueue.enqueue_front().
- Parameters:
item (Union[QueuedMessage, MessageBatch]) – The message or batch to enqueue at the front.
- Return type:
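The head-insertion trick can be sketched as follows. This is an approximation rather than the module's code: it relies on private CPython attributes of asyncio.Queue (_queue, _unfinished_tasks, _finished, _getters, _wakeup_next), which are implementation details that may change between Python versions. The _unfinished_tasks bump is an assumption added here to mirror what put_nowait() does; the description above does not mention it.

```python
import asyncio


def push_front(q: asyncio.Queue, item) -> None:
    """Insert item at the head of an unbounded asyncio.Queue (CPython internals)."""
    q._queue.appendleft(item)     # private deque behind the default FIFO queue
    q._unfinished_tasks += 1      # assumption: keep task_done()/join() balanced
    q._finished.clear()           # join() must now wait for the new item
    q._wakeup_next(q._getters)    # wake one parked getter, if any


async def demo():
    q = asyncio.Queue()
    await q.put("a")
    await q.put("b")
    push_front(q, "urgent")
    return [await q.get() for _ in range(3)]


order = asyncio.run(demo())
```

The sketch ignores maxsize, which is acceptable only because the backing queue is documented as unbounded.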
- async get()[source]
Block until an item is available, then pop and return it.
Mirrors RedisQueue.get(); called by MessageQueue._process_loop().
- Returns:
The next QueuedMessage or MessageBatch.
- Return type:
Union[QueuedMessage, MessageBatch]
- async qsize()[source]
Return the number of items currently buffered.
- Returns:
The current size of the underlying asyncio.Queue.
- Return type:
int
- async empty()[source]
Report whether the in-memory queue has no items.
- Returns:
True if the underlying queue is empty, False otherwise.
- Return type:
bool
- async clear()[source]
Drain every buffered item, discarding them.
Repeatedly calls get_nowait/task_done until the queue is empty, counting the removed items. Mirrors RedisQueue.clear(); called by MessageQueue.clear().
- Returns:
The number of items removed from the queue.
- Return type:
int
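The drain loop described for clear() might look like the sketch below, which uses only the public asyncio.Queue API. The function name drain is hypothetical, and the sketch operates on a bare asyncio.Queue rather than on the wrapper class.

```python
import asyncio


def drain(q: asyncio.Queue) -> int:
    """Discard every buffered item and return how many were removed."""
    removed = 0
    while True:
        try:
            q.get_nowait()
        except asyncio.QueueEmpty:
            break
        q.task_done()   # balance the unfinished-task count for each item
        removed += 1
    return removed


async def demo():
    q = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)
    removed = drain(q)
    # join() returns promptly because every get was matched by task_done().
    await asyncio.wait_for(q.join(), timeout=1)
    return removed, q.empty()


result = asyncio.run(demo())
```

Pairing each get_nowait() with task_done() matters if anything awaits join() on the queue; skipping it would leave join() blocked on items that no longer exist.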
- task_done()[source]
Mark a previously retrieved item as processed.
Delegates to asyncio.Queue.task_done() so callers of get() keep the underlying queue’s unfinished-task count balanced. Called by MessageQueue._process_loop().
- Return type:
- class message_queue.MessageQueue(default_batch_window=5.0, max_batch_size=10, redis=None, event_bus=None)[source]
Bases: object
Per-channel queue that batches rapid-succession messages.
Operates in two modes:
- In-memory (no redis/event_bus): batches are buffered locally and handed to the processor loop started via start_processing().
- Distributed (both redis and event_bus supplied): batching state lives in Redis and finalised batches are published to the cross-service inbound stream via event_bus.publish_inbound; the local processor loop is not used for delivery in this mode.
- Parameters:
default_batch_window (float) – Seconds to wait for additional messages before finalising a batch.
max_batch_size (int) – Maximum messages per batch before immediate finalisation.
redis (Any) – Optional async Redis client. When provided together with event_bus, enables the distributed batching path (Redis-backed batch state); otherwise consulted only for per-channel batch config (window override / disable flag).
event_bus (Any) – Optional event bus used to publish finalised batches to the cross-service inbound stream. Required (with redis) for the distributed path.
- __init__(default_batch_window=5.0, max_batch_size=10, redis=None, event_bus=None)[source]
Set up per-channel batching state and, in distributed mode, the scheduler.
Allocates the per-channel bookkeeping dicts (queues, locks, active batches, timers, processor tasks, lengths, cancellation flags) and stores the batching tunables and optional Redis/event-bus handles. When both redis and event_bus are supplied the constructor immediately starts the _batch_scheduler_loop() background asyncio.Task that drives the distributed sliding-window flush; otherwise the queue runs in local in-memory mode.
Instantiated once per process by the owning runner/gateway and exercised directly by the batching tests; requires a running event loop in distributed mode because of the scheduler task it creates.
- Parameters:
default_batch_window (float) – Seconds to wait for additional messages before finalising a batch (the default sliding-window length).
max_batch_size (int) – Maximum messages allowed in one batch before it is flushed immediately regardless of the window.
redis (Any) – Optional async Redis client; together with event_bus it enables the distributed Redis-backed batching path.
event_bus (Any) – Optional event bus whose publish_inbound delivers finalised batches to the cross-service inbound stream.
- Return type:
None
- set_event_bus(event_bus)[source]
Attach an event bus after construction, lazily starting the scheduler.
Lets a queue created before the event bus existed be upgraded to the distributed path. Once both a Redis client and the event bus are present and no scheduler is running yet, it spawns the _batch_scheduler_loop() asyncio.Task so Redis-backed batches begin flushing to the inbound stream.
Provided for the runner/gateway wiring step; no in-repo callers were found, so it is invoked from the owning process’s startup sequence.
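The lazy scheduler start can be illustrated with a small stand-in class. Everything here is hypothetical apart from the rule it demonstrates: the scheduler task is created only once both a Redis client and an event bus are present, and never twice. The attribute names, the distributed property, and the placeholder loop body are assumptions.

```python
import asyncio


class SketchQueue:
    """Hypothetical model of the mode selection; not the real MessageQueue."""

    def __init__(self, redis=None, event_bus=None):
        self.redis = redis
        self.event_bus = event_bus
        self._scheduler = None
        self._maybe_start_scheduler()

    @property
    def distributed(self) -> bool:
        return self.redis is not None and self.event_bus is not None

    def _maybe_start_scheduler(self) -> None:
        # Needs a running event loop, as the constructor notes point out.
        if self.distributed and self._scheduler is None:
            self._scheduler = asyncio.create_task(self._scheduler_loop())

    def set_event_bus(self, event_bus) -> None:
        self.event_bus = event_bus
        self._maybe_start_scheduler()

    async def _scheduler_loop(self) -> None:
        while True:                  # placeholder for the real flush logic
            await asyncio.sleep(1)


async def demo():
    q = SketchQueue(redis=object())          # no event bus yet: in-memory mode
    idle_before = q._scheduler is None
    q.set_event_bus(object())                # upgrade to the distributed path
    first = q._scheduler
    q.set_event_bus(object())                # a second call must not respawn
    same_task = q._scheduler is first
    first.cancel()
    await asyncio.gather(first, return_exceptions=True)
    return idle_before, q.distributed, same_task


result = asyncio.run(demo())
```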
- async enqueue(msg)[source]
Accept a message into the channel’s batching window.
In distributed mode (redis and event_bus set), the message is appended to the Redis-backed batch and ultimately published to the cross-service inbound stream via event_bus.publish_inbound (either immediately when batching is disabled or once the window/size limit is reached). In the in-memory mode it is buffered in a local MessageBatch for the processor loop.
- Parameters:
msg (QueuedMessage)
- Return type:
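The in-memory path combines a quiet-period timer with a size cap. The sketch below shows one way that interplay could work; it is not the module's implementation. SketchBatcher, its attribute names, and the use of plain strings instead of QueuedMessage objects are all assumptions, and finalised batches are simply collected in a list rather than handed to a queue.

```python
import asyncio


class SketchBatcher:
    """Hypothetical sliding-window batcher for a single process."""

    def __init__(self, window=5.0, max_batch_size=10):
        self.window = window
        self.max_batch_size = max_batch_size
        self.open = {}       # channel -> texts accumulating in the open batch
        self.timers = {}     # channel -> pending flush task
        self.flushed = []    # finalised batches, in order

    async def enqueue(self, channel, text):
        self.open.setdefault(channel, []).append(text)
        timer = self.timers.pop(channel, None)
        if timer is not None:
            timer.cancel()               # each arrival restarts the quiet period
        if len(self.open[channel]) >= self.max_batch_size:
            self._finalize(channel)      # size cap reached: flush immediately
        else:
            self.timers[channel] = asyncio.create_task(self._flush_after(channel))

    async def _flush_after(self, channel):
        await asyncio.sleep(self.window)
        self.timers.pop(channel, None)
        self._finalize(channel)

    def _finalize(self, channel):
        batch = self.open.pop(channel, None)
        if batch:
            self.flushed.append((channel, batch))


async def demo():
    b = SketchBatcher(window=0.05, max_batch_size=3)
    await b.enqueue("example:42", "a")
    await b.enqueue("example:42", "b")
    await asyncio.sleep(0.2)             # quiet period elapses: first flush
    for text in ("c", "d", "e"):
        await b.enqueue("example:42", text)   # third message hits the cap
    return b.flushed


flushed = asyncio.run(demo())
```

The first batch is flushed by the timer after the channel goes quiet; the second is flushed by the size cap without waiting for the window.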
- async enqueue_front(item)[source]
Push a message or batch to the head of its channel queue (priority).
Resolves the channel from item (via its channel_id), fetches the backing queue with _queue(), and delegates to that queue’s enqueue_front so the item is consumed before anything already waiting; the cached per-channel length is bumped to match. Used to inject a high-priority item (e.g. a re-queued or system message) ahead of the normal FIFO order.
Within the repo this is exercised by the queue tests; otherwise called by the owning runner when an item must jump the line.
- Parameters:
item (Union[QueuedMessage, MessageBatch]) – The QueuedMessage or MessageBatch to enqueue at the front of its channel.
- Return type:
- is_channel_processing(channel)[source]
Report whether a channel is currently running its processor callback.
Reads the cached _processing flag, which _process_loop() sets while a queue item’s callback is in flight for the channel. A cheap, non-blocking status probe.
No in-repo callers were found, so this is consumed by external status or admin surfaces.
- queue_size(channel)[source]
Return the cached number of items waiting in a channel’s queue.
Reads the in-process _lengths counter that the enqueue and processor paths keep in sync, rather than querying the backend, so it is cheap and synchronous. Note this reflects committed queue items, not messages still accumulating in an open batch.
No in-repo callers were found, so this is consumed by external status or admin surfaces.
- async start_processing(channel, callback)[source]
Ensure a per-channel processor loop is running, starting one if not.
Idempotently launches the _process_loop() asyncio.Task for channel (creating the backing queue via _queue() if needed). Holds the channel’s processor lock while checking for and replacing a finished task so two concurrent calls cannot start duplicate loops. The loop then pops items and invokes callback for each. Used by the in-memory delivery mode; in distributed mode batches are delivered via the event bus instead.
Within the repo this is exercised by the queue tests; otherwise called by the owning runner when a channel first needs servicing.
- Parameters:
channel (str) – The channel to start processing.
callback (Callable[[Union[QueuedMessage, MessageBatch]], Awaitable[None]]) – Async callable invoked with each dequeued QueuedMessage or MessageBatch.
- Return type:
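The idempotent start described above follows a common check-under-lock pattern, sketched here with a hypothetical ProcessorRegistry class. The sketch returns the task purely so the demo can compare the two calls; the real method's return value is not documented here, and the loop omits the 60 s asyncio.wait_for() wrapper mentioned for the real processor.

```python
import asyncio


class ProcessorRegistry:
    """Hypothetical holder of per-channel queues, locks and loop tasks."""

    def __init__(self):
        self.queues = {}
        self.tasks = {}
        self.locks = {}

    async def start_processing(self, channel, callback):
        lock = self.locks.setdefault(channel, asyncio.Lock())
        async with lock:
            task = self.tasks.get(channel)
            if task is not None and not task.done():
                return task          # a live loop already services this channel
            queue = self.queues.setdefault(channel, asyncio.Queue())
            task = asyncio.create_task(self._loop(queue, callback))
            self.tasks[channel] = task   # replaces a missing or finished task
            return task

    async def _loop(self, queue, callback):
        while True:
            item = await queue.get()
            try:
                await callback(item)
            finally:
                queue.task_done()


async def demo():
    reg = ProcessorRegistry()
    seen = []

    async def callback(item):
        seen.append(item)

    # Two concurrent calls for the same channel must share one loop task.
    first, second = await asyncio.gather(
        reg.start_processing("example:42", callback),
        reg.start_processing("example:42", callback),
    )
    await reg.queues["example:42"].put("hello")
    await reg.queues["example:42"].join()
    first.cancel()
    await asyncio.gather(first, return_exceptions=True)
    return first is second, seen


result = asyncio.run(demo())
```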
- async cancel_batch_timer(channel)[source]
Cancel and await a channel’s pending in-memory batch flush timer.
Pops the channel’s timer task (if any), cancels it, and awaits it so the cancellation fully settles before returning, swallowing the expected asyncio.CancelledError. Prevents a pending flush from firing while the channel is being torn down or after it has stopped. No-ops when no timer is armed.
Called by stop_processing() as part of stopping a channel.
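The cancel-then-await pattern described above can be sketched in a few lines. The function cancel_timer and the timers dict are hypothetical, and the boolean return value exists only so the demo can show the no-op case.

```python
import asyncio


async def cancel_timer(timers: dict, channel: str) -> bool:
    """Cancel and await a channel's flush timer; report whether one was armed."""
    task = timers.pop(channel, None)
    if task is None:
        return False                 # nothing armed: no-op
    task.cancel()
    try:
        await task                   # wait until the cancellation has settled
    except asyncio.CancelledError:
        pass                         # expected outcome of cancelling the timer
    return True


async def demo():
    fired = []

    async def flush_later():
        await asyncio.sleep(0.05)
        fired.append("flushed")

    timers = {"example:42": asyncio.create_task(flush_later())}
    await asyncio.sleep(0)           # let the timer task start sleeping
    cancelled = await cancel_timer(timers, "example:42")
    again = await cancel_timer(timers, "example:42")
    await asyncio.sleep(0.1)         # long enough for a live timer to have fired
    return cancelled, again, fired


result = asyncio.run(demo())
```

Awaiting the cancelled task is what guarantees the flush cannot fire after the function returns; calling cancel() alone only requests cancellation.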
- async stop_processing(channel)[source]
Tear down all processing for a channel: in-flight task, timer, and loop.
Cancels the channel’s currently-running callback via cancel_current(), cancels its pending batch timer via cancel_batch_timer(), then pops and cancels the _process_loop() task itself, awaiting each cancellation so it fully settles. Used to stop servicing a channel entirely (e.g. on a user stop/cancel request).
Called by web/redis_platform_api.py (the platform cancel/stop endpoint) and exercised by the queue tests.
- async clear(channel)[source]
Discard every queued item for a channel and reset its cached length.
Delegates to the backing queue’s clear (which deletes the Redis list or drains the in-memory queue), then zeroes the channel’s cached length. Returns 0 for a channel that has no queue. Used to drop pending work for a channel (e.g. during a cancel/reset).
No in-repo callers were found, so this is invoked by the owning runner or an admin/reset path.
- stats()[source]
Snapshot per-channel queue health for every known channel.
Builds a mapping from channel key to a small status dict (cached queue_size, the is_processing flag, and whether a live processor task exists), reading purely from in-process bookkeeping so it is cheap and synchronous. Intended for diagnostics and monitoring surfaces.
No in-repo callers were found, so this is consumed by external status or admin surfaces.