embedding_queue
Redis-backed batched embedding queue for non-critical embedding generation.
Accumulates (redis_key, text) pairs in a Redis sorted set
(embed_queue:pending) and flushes them to the Gemini batch embeddings
API on a timer or when the batch reaches a size threshold. Resolved
embeddings are written back to the corresponding Redis hashes so they
become vector-searchable.
Persistence guarantees:
- Items are written to the ZSET before any API call, so a crash or restart never loses pending work.
- On startup, any leftover items from a previous run are drained automatically (fire-and-forget, with no in-process Future for those items).
- On API failure, items are re-added to the ZSET for the next drain cycle.
A single background asyncio.Task (_drain_task) drains the
queue — it pops a batch from the ZSET with ZPOPMIN, calls the
embedding API, and writes results back. After each successful batch it
loops back to check for more items; when the ZSET is empty it exits and
the next enqueue() call schedules a new drain after the batching
interval.
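The drain cycle described above can be sketched with in-memory stand-ins. This is a minimal illustration, not the module's implementation: FakePendingQueue, drain(), and the embed_batch callable are hypothetical names, and a plain list takes the place of the Redis ZSET.

```python
import asyncio


class FakePendingQueue:
    """In-memory stand-in for the pending ZSET (hypothetical, not Redis)."""

    def __init__(self):
        self.items = []  # (redis_key, text) pairs, oldest first

    def pop_batch(self, n):
        # Take up to n of the oldest items, as a ZSET pop of the lowest scores would.
        batch, self.items = self.items[:n], self.items[n:]
        return batch

    def readd(self, batch):
        # Put a failed batch back at the front so it is retried first.
        self.items = batch + self.items


async def drain(queue, embed_batch, store, max_batch_size=50):
    """Pop batches until the queue is empty; re-add a batch if the API call fails.

    Returns True when the queue was emptied and False when a batch failed.
    """
    while True:
        batch = queue.pop_batch(max_batch_size)
        if not batch:
            return True  # nothing left: the drain task exits
        try:
            vectors = await embed_batch([text for _, text in batch])
        except Exception:
            queue.readd(batch)  # keep the work for the next drain cycle
            return False
        for (redis_key, _), vector in zip(batch, vectors):
            store[redis_key] = vector  # stands in for the write-back to the Redis hash
```

The loop only returns once a pop yields nothing, which mirrors the "loop back and check for more items" behaviour; the boolean return value is an assumption made for the sketch.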
Usage:
    queue = EmbeddingBatchQueue(
        openrouter, redis, model="google/gemini-embedding-001",
    )
    await queue.start()  # drains any leftovers from previous run
    future = await queue.enqueue("msg:abc-123", "Hello world")
    embedding = await future
    await queue.stop()
- class embedding_queue.EmbeddingBatchQueue(openrouter, redis, model='google/gemini-embedding-001', flush_interval=3600.0, max_batch_size=50)[source]
Bases: object
Redis-backed queue that batches embedding requests and flushes them periodically via the Gemini batch API.
- Parameters:
openrouter (OpenRouterClient) – Shared API client with embed_batch() support.
redis (Redis) – Async Redis client for the persistent ZSET and writing embeddings.
model (str) – Embedding model identifier.
flush_interval (float) – Seconds to wait after the first enqueue before starting a flush.
max_batch_size (int) – Flush immediately when the queue reaches this size.
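How flush_interval and max_batch_size interact can be pictured as a small decision function. The sketch below is an assumption-laden illustration: Trigger and decide_trigger() are hypothetical names, and the rule that an already running drain needs no new trigger is inferred from the drain loop re-checking the queue after each batch.

```python
from enum import Enum


class Trigger(Enum):
    DRAIN_NOW = "drain_now"      # size threshold reached: flush immediately
    START_TIMER = "start_timer"  # first item queued: wait flush_interval seconds
    NOTHING = "nothing"          # a timer or drain is already taking care of it


def decide_trigger(pending_count, max_batch_size, timer_running, drain_running):
    """Pick the action to take after an enqueue (hypothetical helper)."""
    if drain_running:
        # Assumed: a running drain picks up new items on its next loop iteration.
        return Trigger.NOTHING
    if pending_count >= max_batch_size:
        return Trigger.DRAIN_NOW
    if pending_count > 0 and not timer_running:
        return Trigger.START_TIMER
    return Trigger.NOTHING
```

With the defaults shown in the signature (flush_interval=3600.0, max_batch_size=50), a lone item would wait up to an hour under this model, while the fiftieth pending item would cause an immediate flush.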
- __init__(openrouter, redis, model='google/gemini-embedding-001', flush_interval=3600.0, max_batch_size=50)[source]
Configure the queue and prepare its background-task and locking state.
Stores the injected collaborators and tuning parameters and initialises the timer/drain task slots (left None until start() or enqueue() schedules them), the cross-thread coordination lock, and the Lua _claim_script used by _claim_pending_batch() to atomically move a batch from the pending ZSET into the in-flight ZSET. No Redis or network I/O happens here; the background drain begins only once start() is called.
Constructed once per worker process by InferenceWorker in inference_main.py and AgentsWorker in agents_main.py, and is then passed down to the message processor and knowledge-anchoring worker.
- Parameters:
openrouter (OpenRouterClient) – Shared API client; its _embed_gemini_batch() method performs the actual embedding calls.
redis (Redis) – Async Redis client backing the persistent pending/in-flight ZSETs, dedup/retry hashes, the result hashes, and the sg:embed:done:* pub/sub channels.
model (str) – Embedding model identifier passed to the batch API.
flush_interval (float) – Seconds to wait after the first enqueue before a timer-triggered flush starts.
max_batch_size (int) – Queue size that triggers an immediate flush, and the maximum number of items claimed per drain iteration.
- Return type:
None
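The effect of the claim step can be modelled in plain Python. The real logic lives in a Lua script whose body is not shown here, so this is only a sketch of the intended semantics: claim_batch() is a hypothetical name, dicts mapping member to score stand in for the two ZSETs, and using the claim time as the in-flight score is an assumption.

```python
def claim_batch(pending, inflight, max_batch_size, now):
    """Move up to max_batch_size lowest-scored members from pending to in-flight.

    pending and inflight are dicts of member -> score standing in for ZSETs.
    In Redis the whole move would run inside one script so no other worker
    can observe a member in both sets or in neither.
    """
    # Redis orders ZSET members by score, then lexicographically by member.
    claimed = sorted(pending, key=lambda member: (pending[member], member))
    claimed = claimed[:max_batch_size]
    for member in claimed:
        del pending[member]
        inflight[member] = now  # assumed: score records when the claim happened
    return claimed
```

Keeping claimed members in a second set, rather than simply popping them, is what lets a later start() find work that a crashed worker had taken but never finished.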
- async start()[source]
Bring the queue online and recover any work left by a previous run.
The lifecycle entry point: it first runs _reclaim_inflight() to rescue items orphaned by a crashed worker, then counts the pending and in-flight ZSETs (ZCARD) and, if either holds leftovers, immediately kicks off a background drain via _start_drain_now() (fire-and-forget, with no per-item future since those producers are gone). Beyond that it only logs; the steady-state drain is scheduled lazily by enqueue().
Awaited once per worker process at startup by InferenceWorker in inference_main.py (line 350) and AgentsWorker in agents_main.py (line 250).
- Return type:
None
- Returns:
None
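The startup recovery described above might look roughly like the following model. It is a sketch under stated assumptions: reclaim_inflight() and needs_startup_drain() are hypothetical stand-ins, dicts of member to score replace the ZSETs, and the staleness threshold (stale_after) is an invented criterion, since the text does not say how orphaned items are recognised.

```python
def reclaim_inflight(pending, inflight, now, stale_after):
    """Move in-flight members claimed at least stale_after seconds ago back to pending.

    Returns the number of members reclaimed. The staleness rule is an
    assumption made for this sketch.
    """
    stale = [m for m, claimed_at in inflight.items() if now - claimed_at >= stale_after]
    for member in stale:
        del inflight[member]
        pending[member] = now  # re-queued with a fresh score (assumed)
    return len(stale)


def needs_startup_drain(pending, inflight):
    """Mirror the ZCARD check: drain immediately if either set holds leftovers."""
    return len(pending) > 0 or len(inflight) > 0
```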
- async stop()[source]
Tear down the background tasks, leaving queued work safely persisted.
The lifecycle shutdown counterpart of start(). It cancels the pending flush timer and the drain task (awaiting the latter so its cancellation settles), then counts (ZCARD) and logs how many items remain in embed_queue:pending. Nothing is dropped: because every item lives in Redis, in-progress and pending work is recovered by the next start(). No embeddings are computed or written here.
Awaited during worker shutdown by InferenceWorker in inference_main.py (line 506) and AgentsWorker in agents_main.py (line 454).
- Return type:
None
- Returns:
None
- async enqueue(redis_key, text)[source]
Queue one text for deferred embedding and hand back a result future.
The main public producer entry point. Empty/whitespace text short-circuits to a pre-resolved zero-vector future (using openrouter_client.EMBED_DIMENSIONS) and never touches Redis. Otherwise it creates a future, spawns _await_pubsub_result() as a background task to resolve it once the embedding lands in Redis (the cross-process bridge, since the actual drain may run in another service), de-dups any prior pending member for the key via _zrem_by_redis_key(), then in one pipeline writes the dedup hash (HSET) and adds the serialized member onto embed_queue:pending (ZADD). Finally it calls _maybe_trigger_drain() to schedule or fire the flush. The future is resolved out-of-band when the embedding is written and the sg:embed:done:{redis_key} channel is published.
Called from the message-processing path: message_processor/processor.py (lines 2088, 2107, 2305), message_processor/channel_heartbeat.py (line 576) and message_processor/generate_and_send.py (line 2573); also driven by the pub/sub tests.
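The producer-side rules (zero vector for blank text, one pending entry per key, out-of-band resolution) can be illustrated with an in-memory model. Everything here is hypothetical: TinyQueue and publish_done() are invented for the sketch, EMBED_DIMENSIONS is set to a placeholder value of 4, and a dict replaces the Redis ZSET, dedup hash, and pub/sub channel.

```python
import asyncio

EMBED_DIMENSIONS = 4  # placeholder; the real constant lives in openrouter_client


class TinyQueue:
    """In-memory model of the enqueue rules (not the real class)."""

    def __init__(self):
        self.pending = {}  # redis_key -> text, at most one entry per key
        self.waiters = {}  # redis_key -> futures waiting for that key

    async def enqueue(self, redis_key, text):
        future = asyncio.get_running_loop().create_future()
        if not text or not text.strip():
            # Blank text: resolve immediately with a zero vector, queue nothing.
            future.set_result([0.0] * EMBED_DIMENSIONS)
            return future
        self.pending.pop(redis_key, None)  # de-dup any earlier entry for this key
        self.pending[redis_key] = text
        self.waiters.setdefault(redis_key, []).append(future)
        return future

    def publish_done(self, redis_key, embedding):
        # Stands in for the "done" notification that resolves waiting futures.
        for future in self.waiters.pop(redis_key, []):
            if not future.done():
                future.set_result(embedding)
```

In this model a second enqueue for the same key replaces the queued text, and every future waiting on that key receives the embedding that is eventually published for it.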
- async enqueue_many(items)[source]
Queue a batch of (redis_key, text) pairs for deferred embedding.
The bulk counterpart of enqueue(), applying the same per-item rules: blank text yields a pre-resolved zero-vector future, while real text gets a future plus a background _await_pubsub_result() task, is de-duped via _zrem_by_redis_key(), and is written to the dedup hash and embed_queue:pending (each item in its own small pipeline). A single _maybe_trigger_drain() runs at the end if anything was actually queued. The returned futures line up positionally with the input list.
Called by background_tasks.py (line 1344, queuing pending message embeddings) and by MessageProcessor in message_processor/processor.py (line 4222).
- async pending_count()[source]
Return how many items are currently waiting in the persistent queue.
A thin ZCARD over embed_queue:pending exposing the backlog size. Counts only pending work, not items already claimed into the in-flight ZSET. Used internally as the drain-completion signal.
Called by flush_and_wait() to poll for an empty queue; no other in-repo callers were found.
- Returns:
The number of members in the pending ZSET.
- Return type:
int
- flush_now()[source]
Kick off an immediate, non-blocking drain of the pending queue.
A synchronous fire-and-forget trigger meant to be called just before an LLM inference so freshly-enqueued embeddings become vector-searchable in time. It simply delegates to _start_drain_now(), which cancels any pending flush timer and ensures the background _drain_loop() task is running; it does not wait for completion (use flush_and_wait() for that).
Called synchronously by MessageProcessor in message_processor/generate_and_send.py (line 1323).
- Return type:
None
- Returns:
None
- async flush_and_wait(timeout=15.0)[source]
Force a drain and await it until the queue empties or time runs out.
The blocking counterpart of flush_now(), used when a caller needs all pending embeddings written before proceeding. It calls _start_drain_now(), then loops polling pending_count(): while items remain it awaits a shielded slice of the live drain task (re-kicking the drain if none is running) until the pending ZSET reaches zero or the timeout budget elapses. The shield prevents the drain from being cancelled when an individual wait times out.
Awaited by the knowledge-anchoring worker in knowledge_anchoring/worker.py (line 1185) before it relies on message embeddings being present.
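The shielded polling loop can be sketched as a free function. This is an illustration rather than the method's actual body: the callables pending_count, get_drain_task, and start_drain, the slice_s parameter, and the boolean return value are all assumptions introduced for the example.

```python
import asyncio
import time


async def flush_and_wait_sketch(pending_count, get_drain_task, start_drain,
                                timeout=15.0, slice_s=0.05):
    """Kick a drain and wait until pending_count() is zero or timeout elapses.

    Returns True if the queue emptied in time, False otherwise (assumed contract).
    """
    deadline = time.monotonic() + timeout
    start_drain()
    while await pending_count() > 0:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        task = get_drain_task()
        if task is None or task.done():
            start_drain()  # re-kick the drain if none is running
            task = get_drain_task()
        try:
            # shield() means a timed-out slice cancels only this wait,
            # never the drain task itself.
            await asyncio.wait_for(asyncio.shield(task), timeout=min(slice_s, remaining))
        except asyncio.TimeoutError:
            pass  # slice elapsed; poll the count again
    return True
```

Waiting in short slices keeps the caller responsive to the overall timeout while the drain continues in the background.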