message_queue
Per-channel message queue with batching.
Ensures temporal consistency: messages are processed in order per channel, and rapid-succession messages can be collected into batches for a single combined response.
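The batching behaviour described above can be illustrated with a small, self-contained sketch. It is not the module's implementation: `collect_batch` and `demo` are hypothetical names, and it assumes the rolling window works by restarting a timeout each time a new message arrives, up to a size cap.

```python
import asyncio


async def collect_batch(queue: asyncio.Queue, window: float, max_size: int) -> list:
    """Hypothetical helper: wait for one item, then keep collecting until the
    queue stays quiet for `window` seconds or `max_size` items are gathered."""
    batch = [await queue.get()]
    while len(batch) < max_size:
        try:
            # Each new arrival restarts the wait, so the window "rolls".
            item = await asyncio.wait_for(queue.get(), timeout=window)
        except asyncio.TimeoutError:
            break  # the channel went quiet; close the batch
        batch.append(item)
    return batch


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    # Three messages arrive in rapid succession on one channel.
    for text in ("hi", "are you there?", "hello??"):
        queue.put_nowait(text)
    first = await collect_batch(queue, window=0.05, max_size=10)
    # A later message arrives after the window has closed.
    queue.put_nowait("later message")
    second = await collect_batch(queue, window=0.05, max_size=10)
    return [first, second]


batches = asyncio.run(demo())
```

Because a FIFO queue is drained by a single consumer, the messages keep their arrival order, and the three rapid messages end up in one batch while the later one is handled separately.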
- class message_queue.QueuedMessage(platform, channel_id, user_id, user_name, text, queued_at=<factory>, extra=<factory>, raw=None)[source]
Bases: object
A message waiting in the channel queue.
- Parameters:
- class message_queue.MessageBatch(messages=<factory>, first_at=<factory>, last_at=<factory>)[source]
Bases: object
A group of messages collected within a rolling time window.
- Parameters:
messages (list[QueuedMessage])
first_at (datetime)
last_at (datetime)
- messages: list[QueuedMessage]
- add(msg)[source]
Add a message to the batch.
- Parameters:
msg (QueuedMessage) – Incoming message object.
- Return type:
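The `<factory>` defaults in the signatures above suggest dataclasses with `default_factory` fields. The following is a rough sketch under that assumption, not the module's source. The field types on `QueuedMessage`, the `_now` helper, the `"discord"` platform value, and the way `add` updates `last_at` are all guesses made for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


def _now() -> datetime:
    # Hypothetical helper used as the default factory for timestamps.
    return datetime.now(timezone.utc)


@dataclass
class QueuedMessage:
    # Field names follow the documented signature; the types are assumed.
    platform: str
    channel_id: str
    user_id: str
    user_name: str
    text: str
    queued_at: datetime = field(default_factory=_now)
    extra: dict = field(default_factory=dict)
    raw: Any = None


@dataclass
class MessageBatch:
    messages: list = field(default_factory=list)
    first_at: datetime = field(default_factory=_now)
    last_at: datetime = field(default_factory=_now)

    def add(self, msg: QueuedMessage) -> None:
        # Assumed behaviour: append the message and move the end of the
        # window forward to the time it was queued.
        self.messages.append(msg)
        self.last_at = msg.queued_at


batch = MessageBatch()
batch.add(QueuedMessage("discord", "general", "u1", "alice", "hello"))
batch.add(QueuedMessage("discord", "general", "u1", "alice", "anyone here?"))
texts = [m.text for m in batch.messages]
```

Using `default_factory` gives every batch its own list and its own timestamps, which a shared mutable default would not.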
- class message_queue.MessageQueue(default_batch_window=5.0, max_batch_size=10, redis=None)[source]
Bases: object
Per-channel queue that processes messages in order.
- Parameters:
- async enqueue(msg)[source]
Add a message to the channel queue (may batch).
- Return type:
- Parameters:
msg (QueuedMessage)
- async start_processing(channel, callback)[source]
Ensure a processor loop is running for channel.
- Return type:
- Parameters:
channel (str)
callback (Callable[[QueuedMessage | MessageBatch], Awaitable[None]])
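Since the callback type is `Callable[[QueuedMessage | MessageBatch], Awaitable[None]]`, a callback has to cope with either a single message or a batch. A minimal sketch of such a callback follows. The two classes here are cut-down stand-ins defined only so the example runs on its own, and `handle`, `replies`, and `demo` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class QueuedMessage:  # stand-in with only the fields this sketch uses
    user_name: str
    text: str


@dataclass
class MessageBatch:  # stand-in
    messages: list = field(default_factory=list)


replies: list = []


async def handle(item) -> None:
    """Callback that accepts either a single message or a batch."""
    if isinstance(item, MessageBatch):
        # Combine the batched texts into one response.
        combined = " / ".join(m.text for m in item.messages)
        replies.append(f"batch of {len(item.messages)}: {combined}")
    else:
        replies.append(f"single from {item.user_name}: {item.text}")


async def demo() -> None:
    # Call the callback directly, the way a processor loop presumably would.
    await handle(QueuedMessage("alice", "hello"))
    await handle(MessageBatch([QueuedMessage("bob", "one"), QueuedMessage("bob", "two")]))


asyncio.run(demo())
```

Going by the signature above, a coroutine function like `handle` would be passed as `callback` to `start_processing` together with the channel identifier.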