embedding_queue

Redis-backed batched embedding queue for non-critical embedding generation.

Accumulates (redis_key, text) pairs in a Redis sorted set (embed_queue:pending) and flushes them to the Gemini batch embeddings API on a timer or when the batch reaches a size threshold. Resolved embeddings are written back to the corresponding Redis hashes so they become vector-searchable.

Persistence guarantees:

  • Items are written to the ZSET before any API call, so a crash or restart never loses pending work.

  • On startup, any leftover items from a previous run are drained automatically. This drain is fire-and-forget: no in-process Future exists for those items, because the producers that enqueued them are gone.

  • On API failure, items are re-added to the ZSET for the next drain cycle.

A single background asyncio.Task (_drain_task) drains the queue — it pops a batch from the ZSET with ZPOPMIN, calls the embedding API, and writes results back. After each successful batch it loops back to check for more items; when the ZSET is empty it exits and the next enqueue() call schedules a new drain after the batching interval.
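The drain cycle described above can be sketched with an in-memory stand-in for the sorted set. This is only an illustration of the pop, embed, write-back, and re-add-on-failure pattern, not the module's code: FakeZSet, drain, embed_batch and the results dict are hypothetical names, and a real queue would issue Redis commands instead of manipulating a Python list.

```python
import asyncio


class FakeZSet:
    """Hypothetical in-memory stand-in for the pending sorted set."""

    def __init__(self):
        self._items = []  # sorted list of (score, member) pairs

    def zadd(self, member, score):
        # Re-adding an existing member replaces its score, as in Redis.
        self._items = [(s, m) for s, m in self._items if m != member]
        self._items.append((score, member))
        self._items.sort()

    def zpopmin(self, count):
        # Remove and return up to `count` lowest-scored members.
        popped, self._items = self._items[:count], self._items[count:]
        return popped

    def zcard(self):
        return len(self._items)


async def drain(pending, embed_batch, results, max_batch_size=50):
    """Pop batches until the set is empty; re-add a batch if the API call fails."""
    while pending.zcard() > 0:
        batch = pending.zpopmin(max_batch_size)
        texts = [text for _, (_, text) in batch]
        try:
            vectors = await embed_batch(texts)
        except Exception:
            # Put the whole batch back so the next drain cycle retries it.
            for score, member in batch:
                pending.zadd(member, score)
            return False
        for (_, (redis_key, _)), vector in zip(batch, vectors):
            results[redis_key] = vector  # stands in for the hash write-back
    return True
```

Members here are (redis_key, text) tuples scored by insertion order; how the real module serializes members and chooses scores is not stated in this page.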

Usage:

queue = EmbeddingBatchQueue(
    openrouter, redis, model="google/gemini-embedding-001",
)
await queue.start()   # drains any leftovers from previous run

future = await queue.enqueue("msg:abc-123", "Hello world")
embedding = await future   # resolves once the batch containing this item is flushed

await queue.stop()
class embedding_queue.EmbeddingBatchQueue(openrouter, redis, model='google/gemini-embedding-001', flush_interval=3600.0, max_batch_size=50)[source]

Bases: object

Redis-backed queue that batches embedding requests and flushes them periodically via the Gemini batch API.

Parameters:
  • openrouter (OpenRouterClient) – Shared API client with embed_batch() support.

  • redis (Redis) – Async Redis client for the persistent ZSET and writing embeddings.

  • model (str) – Embedding model identifier.

  • flush_interval (float) – Seconds to wait after the first enqueue before starting a flush.

  • max_batch_size (int) – Flush immediately when the queue reaches this size.

__init__(openrouter, redis, model='google/gemini-embedding-001', flush_interval=3600.0, max_batch_size=50)[source]

Configure the queue and prepare its background-task and locking state.

Stores the injected collaborators and tuning parameters and initialises the timer/drain task slots (left None until start() or enqueue() schedules them), the cross-thread coordination lock, and the Lua _claim_script used by _claim_pending_batch() to atomically move a batch from the pending ZSET into the in-flight ZSET. No Redis or network I/O happens here; the background drain begins only once start() is called.

Constructed once per worker process by InferenceWorker in inference_main.py and AgentsWorker in agents_main.py, and is then passed down to the message processor and knowledge-anchoring worker.

Parameters:
  • openrouter (OpenRouterClient) – Shared API client; its _embed_gemini_batch() method performs the actual embedding calls.

  • redis (Redis) – Async Redis client backing the persistent pending/in-flight ZSETs, dedup/retry hashes, the result hashes, and the sg:embed:done:* pub/sub channels.

  • model (str) – Embedding model identifier passed to the batch API.

  • flush_interval (float) – Seconds to wait after the first enqueue before a timer-triggered flush starts.

  • max_batch_size (int) – Queue size that triggers an immediate flush, and the maximum number of items claimed per drain iteration.

Return type:

None

async start()[source]

Bring the queue online and recover any work left by a previous run.

The lifecycle entry point: it first runs _reclaim_inflight() to rescue items orphaned by a crashed worker, then counts the pending and in-flight ZSETs (ZCARD) and, if either holds leftovers, immediately kicks off a background drain via _start_drain_now() (fire-and-forget, with no per-item future since those producers are gone). Beyond that it only logs; the steady-state drain is scheduled lazily by enqueue().

Awaited once per worker process at startup by InferenceWorker in inference_main.py (line 350) and AgentsWorker in agents_main.py (line 250).

Return type:

None

Returns:

None
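As a rough illustration of the startup recovery described for start(), the sketch below models both sorted sets as plain dicts of member to score. It assumes that reclaiming means moving every in-flight member back to pending; the actual behaviour of _reclaim_inflight() is not spelled out here, and reclaim_inflight and needs_startup_drain are hypothetical helper names.

```python
def reclaim_inflight(pending, inflight):
    """Move every in-flight member back to pending; return how many were moved."""
    moved = 0
    for member, score in list(inflight.items()):
        # Assumption: an entry already pending for the same member is kept as-is.
        pending.setdefault(member, score)
        del inflight[member]
        moved += 1
    return moved


def needs_startup_drain(pending, inflight):
    """Mirror the ZCARD check: drain if either set still holds leftovers."""
    return len(pending) > 0 or len(inflight) > 0
```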

async stop()[source]

Tear down the background tasks, leaving queued work safely persisted.

The lifecycle shutdown counterpart of start(). It cancels the pending flush timer and the drain task (awaiting the latter so its cancellation settles), then counts (ZCARD) and logs how many items remain in embed_queue:pending. Nothing is dropped: because every item lives in Redis, in-progress and pending work is recovered by the next start(). No embeddings are computed or written here.

Awaited during worker shutdown by InferenceWorker in inference_main.py (line 506) and AgentsWorker in agents_main.py (line 454).

Return type:

None

Returns:

None
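The "cancel, then await so the cancellation settles" step in stop() follows a common asyncio pattern, sketched below. cancel_and_settle is a hypothetical helper, not a function of this module.

```python
import asyncio
import contextlib


async def cancel_and_settle(task):
    """Cancel a background task and wait until it has actually finished."""
    if task is None or task.done():
        return
    task.cancel()
    # Awaiting the task lets its cleanup (finally blocks) run before we return.
    with contextlib.suppress(asyncio.CancelledError):
        await task
```

One caveat of this simple form: suppressing CancelledError also hides a cancellation aimed at the caller itself, so production code may want to distinguish the two cases.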

async enqueue(redis_key, text)[source]

Queue one text for deferred embedding and hand back a result future.

The main public producer entry point. Empty or whitespace-only text short-circuits to a pre-resolved zero-vector future (sized by openrouter_client.EMBED_DIMENSIONS) and never touches Redis. Otherwise it creates a future and spawns _await_pubsub_result() as a background task to resolve it once the embedding lands in Redis; this is the cross-process bridge, since the actual drain may run in another service. It then de-dups any prior pending member for the key via _zrem_by_redis_key() and, in one pipeline, writes the dedup hash (HSET) and adds the serialized member to embed_queue:pending (ZADD). Finally it calls _maybe_trigger_drain() to schedule or fire the flush. The future is resolved out-of-band when the embedding is written and the sg:embed:done:{redis_key} channel is published.

Called from the message-processing path: message_processor/processor.py (lines 2088, 2107, 2305), message_processor/channel_heartbeat.py (line 576) and message_processor/generate_and_send.py (line 2573); also driven by the pub/sub tests.

Parameters:
  • redis_key (str) – Destination hash whose embedding field will receive the resolved vector.

  • text (str) – The text to embed.

Returns:

A future resolving to the embedding vector. For blank text it is already resolved to a zero vector; otherwise it resolves once the batch containing the item has been flushed.

Return type:

Future[list[float]]
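The two branches of enqueue() (blank text versus real text) can be sketched without Redis as follows. This is a simplified model under stated assumptions: EMBED_DIMENSIONS is a placeholder value, a plain dict stands in for the dedup hash and pending ZSET, and enqueue_sketch is a hypothetical name. The pub/sub listener that eventually resolves the future is omitted.

```python
import asyncio

EMBED_DIMENSIONS = 8  # placeholder; the real constant lives in openrouter_client


async def enqueue_sketch(redis_key, text, pending):
    """Return a future for the embedding of `text`, queuing it if non-blank."""
    future = asyncio.get_running_loop().create_future()
    if not text.strip():
        # Blank input: resolve immediately with a zero vector, skip the queue.
        future.set_result([0.0] * EMBED_DIMENSIONS)
        return future
    # De-dup by key: a newer text for the same key replaces the older one.
    pending[redis_key] = text
    return future  # resolved later, once the embedding has been written
```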

async enqueue_many(items)[source]

Queue a batch of (redis_key, text) pairs for deferred embedding.

The bulk counterpart of enqueue(), applying the same per-item rules: blank text yields a pre-resolved zero-vector future, while real text gets a future plus a background _await_pubsub_result() task, is de-duped via _zrem_by_redis_key(), and is written to the dedup hash and embed_queue:pending (each item in its own small pipeline). A single _maybe_trigger_drain() runs at the end if anything was actually queued. The returned futures line up positionally with the input list.

Called by background_tasks.py (line 1344, queuing pending message embeddings) and by MessageProcessor in message_processor/processor.py (line 4222).

Parameters:

items (list[tuple[str, str]]) – (redis_key, text) pairs to queue, in order.

Returns:

One future per input item, in the same order; an empty list when items is empty.

Return type:

list[Future[list[float]]]
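Because the returned futures line up positionally with the input, a caller can collect results in input order. With the default flush_interval of 3600 seconds a future may stay unresolved for a long time, so the sketch below bounds the wait. await_all is a hypothetical helper, not part of this module.

```python
import asyncio


async def await_all(futures, timeout):
    """Return results in input order, with None for futures not done in time."""
    if not futures:
        return []  # asyncio.wait() rejects an empty collection
    done, _ = await asyncio.wait(futures, timeout=timeout)
    # result() re-raises if a future failed; callers may want to catch that.
    return [f.result() if f in done else None for f in futures]
```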

async pending_count()[source]

Return how many items are currently waiting in the persistent queue.

A thin ZCARD over embed_queue:pending exposing the backlog size. Counts only pending work, not items already claimed into the in-flight ZSET. Used internally as the drain-completion signal.

Called by flush_and_wait() to poll for an empty queue; no other in-repo callers were found.

Returns:

The number of members in the pending ZSET.

Return type:

int

flush_now()[source]

Kick off an immediate, non-blocking drain of the pending queue.

A synchronous fire-and-forget trigger meant to be called just before an LLM inference so freshly-enqueued embeddings become vector-searchable in time. It simply delegates to _start_drain_now(), which cancels any pending flush timer and ensures the background _drain_loop() task is running; it does not wait for completion (use flush_and_wait() for that).

Called synchronously by MessageProcessor in message_processor/generate_and_send.py (line 1323).

Return type:

None

Returns:

None
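The interplay between the flush timer, the size threshold, and an immediate flush can be modelled as below. This is a sketch under assumptions: DrainScheduler is a hypothetical class, the timer is modelled with loop.call_later() (the module may use a task instead), and both methods must be called from inside a running event loop.

```python
import asyncio


class DrainScheduler:
    """Hypothetical model of the timer and drain-task slots."""

    def __init__(self, drain, flush_interval=3600.0, max_batch_size=50):
        self._drain = drain  # coroutine function that performs the drain
        self._flush_interval = flush_interval
        self._max_batch_size = max_batch_size
        self._timer = None
        self._drain_task = None

    def _drain_running(self):
        return self._drain_task is not None and not self._drain_task.done()

    def start_drain_now(self):
        # Cancel any pending timer, then make sure exactly one drain is running.
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if not self._drain_running():
            self._drain_task = asyncio.ensure_future(self._drain())

    def maybe_trigger(self, queue_size):
        if queue_size >= self._max_batch_size:
            self.start_drain_now()  # size threshold reached: flush immediately
        elif self._timer is None and not self._drain_running():
            loop = asyncio.get_running_loop()
            self._timer = loop.call_later(self._flush_interval, self.start_drain_now)
```

In this model flush_now() corresponds to calling start_drain_now() directly, which returns without waiting for the drain to finish.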

async flush_and_wait(timeout=15.0)[source]

Force a drain and await it until the queue empties or time runs out.

The blocking counterpart of flush_now(), used when a caller needs all pending embeddings written before proceeding. It calls _start_drain_now(), then loops polling pending_count(): while items remain it awaits a shielded slice of the live drain task (re-kicking the drain if none is running) until the pending ZSET reaches zero or the timeout budget elapses. The shield prevents the drain from being cancelled when an individual wait times out.

Awaited by the knowledge-anchoring worker in knowledge_anchoring/worker.py (line 1185) before it relies on message embeddings being present.

Parameters:

timeout (float) – Maximum seconds to wait for the queue to drain before returning regardless of remaining items.

Return type:

None

Returns:

None
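The polling loop with a shielded wait can be sketched as follows. The names are hypothetical: get_drain_task stands in for "return the live drain task, starting one if none is running", and slice_s is an assumed per-wait slice length that this page does not specify. The sketch returns a bool for convenience, whereas flush_and_wait() itself returns None.

```python
import asyncio
import time


async def flush_and_wait_sketch(get_drain_task, pending_count, timeout=15.0, slice_s=0.5):
    """Wait until pending_count() reaches zero or the timeout budget is spent."""
    deadline = time.monotonic() + timeout
    while await pending_count() > 0:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False  # budget exhausted with items still pending
        task = get_drain_task()
        try:
            # shield() ensures a timed-out wait cancels only the wait,
            # never the drain task itself.
            await asyncio.wait_for(asyncio.shield(task), timeout=min(slice_s, remaining))
        except asyncio.TimeoutError:
            pass  # the drain is still running; poll the count again
    return True
```

Without the shield, asyncio.wait_for() would cancel the drain task on every slice timeout, which would interrupt the batch being processed.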