message_queue

Per-channel message queue with batching.

Ensures temporal consistency: messages are processed in order per channel, and rapid-succession messages can be collected into batches for a single combined response.
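
The batching rule described above can be illustrated with a small, self-contained sketch. It is not the module's implementation: the helper name `group_by_rolling_window` is hypothetical, and it assumes the window is measured from the most recent message in the batch (a rolling window) and that a full batch closes immediately.

```python
from datetime import datetime, timedelta


def group_by_rolling_window(arrivals, window_s=5.0, max_size=10):
    """Group sorted arrival times into batches.

    A batch stays open while each new arrival lands within `window_s`
    seconds of the previous one, and closes early once it holds
    `max_size` items. (Assumed semantics, for illustration only.)
    """
    batches = []
    current = []
    for t in arrivals:
        gap_too_long = bool(current) and (t - current[-1]).total_seconds() > window_s
        if gap_too_long or len(current) >= max_size:
            batches.append(current)
            current = []
        current.append(t)
    if current:
        batches.append(current)
    return batches


start = datetime(2024, 1, 1, 12, 0, 0)
offsets = [0, 2, 4, 20, 21]  # seconds after `start`
arrivals = [start + timedelta(seconds=s) for s in offsets]
batches = group_by_rolling_window(arrivals)
print([len(b) for b in batches])  # [3, 2]
```

The first three messages arrive within 5 seconds of one another and form one batch; the 16-second gap before the fourth message starts a new one.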

class message_queue.QueuedMessage(platform, channel_id, user_id, user_name, text, queued_at=<factory>, extra=<factory>, raw=None)[source]

Bases: object

A message waiting in the channel queue.

Parameters:
platform: str
channel_id: str
user_id: str
user_name: str
text: str
queued_at: datetime
extra: dict[str, Any]
raw: Any = None

class message_queue.MessageBatch(messages=<factory>, first_at=<factory>, last_at=<factory>)[source]

Bases: object

A group of messages collected within a rolling time window.

Parameters:
messages: list[QueuedMessage]
first_at: datetime
last_at: datetime

add(msg)[source]

Add msg to the batch.

Parameters:

msg (QueuedMessage) – Incoming message object.

Return type:

None

property size: int

Number of messages in the batch.

Returns:

The message count.

Return type:

int

property channel_id: str

ID of the channel this batch belongs to.

Returns:

The channel ID.

Return type:

str

unique_authors()[source]

Return the distinct authors of the messages in the batch.

Returns:

One entry per distinct author.

Return type:

list[str]
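
As a rough illustration of how the two classes above fit together, the following sketch defines stand-in dataclasses that mirror the documented fields and members. The method bodies are assumptions, not the library's code: in particular, the UTC timestamp factory, the choice of `user_name` for `unique_authors()`, the empty-string fallback for `channel_id`, and the `"discord"` platform value are all hypothetical.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


def _now() -> datetime:
    # Assumption: timestamps are timezone-aware UTC.
    return datetime.now(timezone.utc)


@dataclass
class QueuedMessage:
    platform: str
    channel_id: str
    user_id: str
    user_name: str
    text: str
    queued_at: datetime = field(default_factory=_now)
    extra: dict[str, Any] = field(default_factory=dict)
    raw: Any = None


@dataclass
class MessageBatch:
    messages: list[QueuedMessage] = field(default_factory=list)
    first_at: datetime = field(default_factory=_now)
    last_at: datetime = field(default_factory=_now)

    def add(self, msg: QueuedMessage) -> None:
        # Assumption: each added message moves the end of the window.
        if not self.messages:
            self.first_at = msg.queued_at
        self.messages.append(msg)
        self.last_at = msg.queued_at

    @property
    def size(self) -> int:
        return len(self.messages)

    @property
    def channel_id(self) -> str:
        # Assumption: every message in a batch shares one channel.
        return self.messages[0].channel_id if self.messages else ""

    def unique_authors(self) -> list[str]:
        # Assumption: authors are user names, kept in first-seen order.
        return list(dict.fromkeys(m.user_name for m in self.messages))


batch = MessageBatch()
for user, text in [("ana", "hi"), ("bo", "hello"), ("ana", "still there?")]:
    batch.add(QueuedMessage("discord", "c1", user, user, text))
print(batch.size, batch.channel_id, batch.unique_authors())
```

`dict.fromkeys` is used here because it removes duplicates while preserving insertion order, which keeps the author list stable across calls.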

class message_queue.MessageQueue(default_batch_window=5.0, max_batch_size=10, redis=None)[source]

Bases: object

Per-channel queue that processes messages in order.

Parameters:
  • default_batch_window (float) – Seconds to wait for additional messages before finalising a batch.

  • max_batch_size (int) – Maximum messages per batch before immediate finalisation.

  • redis (Any) – Optional async Redis client for per-channel batch config.
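
One way the optional Redis client could supply a per-channel window is sketched below. This is only a guess at the shape of such a lookup: the helper `resolve_batch_window`, the key format `batch_window:<channel>`, and the in-memory `FakeRedis` stand-in are all hypothetical. The only client behaviour assumed is an awaitable `get(key)` that returns the stored value or `None`.

```python
import asyncio


async def resolve_batch_window(redis, channel, default=5.0):
    """Return the per-channel window, or `default` if none is stored."""
    if redis is None:
        return default
    raw = await redis.get(f"batch_window:{channel}")  # hypothetical key format
    if raw is None:
        return default
    if isinstance(raw, bytes):
        raw = raw.decode()
    try:
        return float(raw)
    except (TypeError, ValueError):
        return default  # fall back on malformed config


class FakeRedis:
    """In-memory stand-in exposing only an async `get`."""

    def __init__(self, data):
        self._data = data

    async def get(self, key):
        return self._data.get(key)


async def main():
    redis = FakeRedis({"batch_window:c1": b"2.5", "batch_window:c3": b"soon"})
    return [
        await resolve_batch_window(redis, "c1"),
        await resolve_batch_window(redis, "c2"),
        await resolve_batch_window(redis, "c3"),
        await resolve_batch_window(None, "c1"),
    ]


windows = asyncio.run(main())
print(windows)  # [2.5, 5.0, 5.0, 5.0]
```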

__init__(default_batch_window=5.0, max_batch_size=10, redis=None)[source]

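Initialize the queue.

Parameters:
  • default_batch_window (float) – Seconds to wait for additional messages before finalising a batch.

  • max_batch_size (int) – Maximum messages per batch before immediate finalisation.

  • redis (Any) – Optional async Redis client for per-channel batch config.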

Return type:

None

async enqueue(msg)[source]

Add a message to the channel queue (may batch).

Return type:

None

Parameters:

msg (QueuedMessage)

is_channel_processing(channel)[source]

Check whether processing is active for channel.

Parameters:

channel (str) – The channel to check.

Returns:

True if processing is active for the channel, False otherwise.

Return type:

bool

queue_size(channel)[source]

Return the number of messages queued for channel.

Parameters:

channel (str) – The channel to inspect.

Returns:

The number of queued messages.

Return type:

int

async start_processing(channel, callback)[source]

Ensure a processor loop is running for channel.

Return type:

None

Parameters:
  • channel (str)

  • callback

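
The enqueue-then-process pattern can be sketched with plain asyncio. The class below is a simplified stand-in, not `MessageQueue` itself: it skips batching entirely, and the callback signature `handle(channel, text)` and the `drain_and_stop` helper are assumptions made for the example. It shows one worker task per channel consuming its own queue, which is what keeps messages in order within a channel.

```python
import asyncio
from collections import defaultdict


class MiniChannelQueue:
    """Stand-in: one asyncio.Queue and one worker task per channel."""

    def __init__(self):
        self._queues = defaultdict(asyncio.Queue)
        self._tasks = {}

    async def enqueue(self, channel, text):
        await self._queues[channel].put(text)

    async def start_processing(self, channel, callback):
        # Idempotent: start a loop only if none is running for this channel.
        task = self._tasks.get(channel)
        if task is None or task.done():
            self._tasks[channel] = asyncio.create_task(self._run(channel, callback))

    async def _run(self, channel, callback):
        queue = self._queues[channel]
        while True:
            text = await queue.get()
            await callback(channel, text)  # one item at a time preserves order
            queue.task_done()

    async def drain_and_stop(self):
        for queue in list(self._queues.values()):
            await queue.join()
        for task in self._tasks.values():
            task.cancel()
        await asyncio.gather(*self._tasks.values(), return_exceptions=True)


async def main():
    seen = []

    async def handle(channel, text):
        await asyncio.sleep(0)  # yield control, as real I/O would
        seen.append((channel, text))

    q = MiniChannelQueue()
    for channel, text in [("a", "1"), ("b", "x"), ("a", "2"), ("a", "3")]:
        await q.enqueue(channel, text)
        await q.start_processing(channel, handle)
    await q.drain_and_stop()
    return seen


seen = asyncio.run(main())
print([t for c, t in seen if c == "a"])  # ['1', '2', '3']
```

Calling `start_processing` after every `enqueue` is safe in this sketch because a second call for a channel with a live worker does nothing.
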
async stop_processing(channel)[source]

Stop the processor loop for channel.

Parameters:

channel (str) – The channel whose processor loop should stop.

Return type:

None

async clear(channel)[source]

Clear the queue for channel.

Parameters:

channel (str) – The channel whose queue should be cleared.

Returns:

An integer count, presumably the number of messages removed.

Return type:

int

stats()[source]

Return queue statistics.

Returns:

A nested dictionary of statistics, keyed by string.

Return type:

dict[str, dict[str, Any]]

cancel_current(channel)[source]

Cancel the in-flight processing task for channel.

Returns True if a task was found and cancelled, False if no processing was active for the channel.

Return type:

bool

Parameters:

channel (str)
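
The return contract described for cancel_current can be mimicked with `asyncio.Task.cancel()`. The `InFlight` tracker below is a hypothetical stand-in written for this example; how the real queue records its in-flight task is not documented here.

```python
import asyncio


class InFlight:
    """Stand-in that tracks one in-flight task per channel."""

    def __init__(self):
        self._current = {}

    def track(self, channel, task):
        self._current[channel] = task

    def cancel_current(self, channel):
        task = self._current.get(channel)
        if task is None or task.done():
            return False  # nothing active for this channel
        task.cancel()
        return True


async def main():
    tracker = InFlight()
    task = asyncio.create_task(asyncio.sleep(60))
    tracker.track("c1", task)
    await asyncio.sleep(0)  # let the task start running
    first = tracker.cancel_current("c1")
    try:
        await task
    except asyncio.CancelledError:
        pass
    second = tracker.cancel_current("c1")  # task has already finished
    missing = tracker.cancel_current("c2")  # channel was never tracked
    return first, second, missing, task.cancelled()


result = asyncio.run(main())
print(result)  # (True, False, False, True)
```

Note that `Task.cancel()` only requests cancellation; the task actually ends when it next reaches an await point, which is why the sketch awaits it before checking again.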