embedding_queue

Redis-backed batched embedding queue for non-critical embedding generation.

Accumulates (redis_key, text) pairs in a Redis sorted set (embed_queue:pending) and flushes them to the Gemini batch embeddings API on a timer or when the batch reaches a size threshold. Resolved embeddings are written back to the corresponding Redis hashes so they become vector-searchable.

Persistence guarantees:

  • Items are written to the ZSET before any API call, so a crash or restart never loses pending work.

  • On startup, any leftover items from a previous run are drained automatically (fire-and-forget — no in-process Future for those).

  • On API failure, items are re-added to the ZSET for the next drain cycle.

A single background asyncio.Task (_drain_task) drains the queue — it pops a batch from the ZSET with ZPOPMIN, calls the embedding API, and writes results back. After each successful batch it loops back to check for more items; when the ZSET is empty it exits and the next enqueue() call schedules a new drain after the batching interval.
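The drain loop described above can be sketched as follows. This is a simplified model, not the actual implementation: `FakeRedis` stands in for the async Redis client, and the JSON member encoding and `drain` signature are illustrative assumptions.

```python
import asyncio
import json

class FakeRedis:
    """In-memory stand-in for the two async Redis commands the drain uses."""
    def __init__(self, members):
        self.zset = dict(members)

    async def zpopmin(self, key, count):
        items = sorted(self.zset.items(), key=lambda kv: kv[1])[:count]
        for member, _score in items:
            del self.zset[member]
        return items

    async def zadd(self, key, mapping):
        self.zset.update(mapping)

async def drain(redis, embed_batch, max_batch_size=50):
    """Pop batches until the ZSET is empty; on API failure, re-add and stop."""
    results = {}
    while True:
        batch = await redis.zpopmin("embed_queue:pending", max_batch_size)
        if not batch:
            return results  # empty: the task exits; next enqueue() reschedules
        texts = [json.loads(member)["text"] for member, _ in batch]
        try:
            vectors = await embed_batch(texts)  # Gemini batch call in the real code
        except Exception:
            # Failure: put the popped items back for the next drain cycle.
            await redis.zadd("embed_queue:pending", dict(batch))
            return results
        for (member, _), vector in zip(batch, vectors):
            # The real code writes each vector into its Redis hash here.
            results[json.loads(member)["key"]] = vector

async def fake_embed(texts):
    return [[0.0] for _ in texts]

members = {json.dumps({"key": f"msg:{i}", "text": "t"}): i for i in range(3)}
out = asyncio.run(drain(FakeRedis(members), fake_embed, max_batch_size=2))
print(len(out))  # all three items embedded across two batches
```

Because the items are popped with ZPOPMIN and re-added on failure, a crash between pop and write-back loses at most one in-flight batch of futures, never the persisted queue.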

Usage:

queue = EmbeddingBatchQueue(
    openrouter, redis, model="google/gemini-embedding-001",
)
await queue.start()   # drains any leftovers from previous run

future = await queue.enqueue("msg:abc-123", "Hello world")
embedding = await future

await queue.stop()

class embedding_queue.EmbeddingBatchQueue(openrouter, redis, model='google/gemini-embedding-001', flush_interval=3600.0, max_batch_size=50)

Bases: object

Redis-backed queue that batches embedding requests and flushes them periodically via the Gemini batch API.

Parameters:
  • openrouter (OpenRouterClient) – Shared API client with embed_batch() support.

  • redis (Redis) – Async Redis client for the persistent ZSET and writing embeddings.

  • model (str) – Embedding model identifier.

  • flush_interval (float) – Seconds to wait after the first enqueue before starting a flush.

  • max_batch_size (int) – Flush immediately when the queue reaches this size.
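The interaction between the two flush triggers can be sketched like this. `FlushTrigger` and its internals are illustrative assumptions, not the real class: hitting `max_batch_size` flushes immediately, otherwise the first enqueue starts a `flush_interval` timer.

```python
import asyncio

class FlushTrigger:
    """Sketch of the two flush triggers (assumed logic, not the actual class)."""
    def __init__(self, flush, flush_interval=3600.0, max_batch_size=50):
        self.flush = flush
        self.interval = flush_interval
        self.max = max_batch_size
        self.count = 0
        self.timer = None

    async def on_enqueue(self):
        self.count += 1
        if self.count >= self.max:
            if self.timer is not None:
                self.timer.cancel()   # size threshold beat the timer
                self.timer = None
            self.count = 0
            await self.flush()        # flush immediately
        elif self.timer is None:
            # First pending item: flush after the batching interval.
            self.timer = asyncio.get_running_loop().create_task(self._flush_later())

    async def _flush_later(self):
        await asyncio.sleep(self.interval)
        self.timer = None
        self.count = 0
        await self.flush()

flushes = []

async def record_flush():
    flushes.append(len(flushes))

async def main():
    t = FlushTrigger(record_flush, flush_interval=0.01, max_batch_size=3)
    for _ in range(3):
        await t.on_enqueue()   # third enqueue hits the size threshold
    await t.on_enqueue()       # starts the interval timer again
    await asyncio.sleep(0.05)  # timer fires

asyncio.run(main())
print(len(flushes))  # one size-triggered flush, one timer-triggered flush
```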

async start()

Start the queue and drain any leftovers from a previous run.

Return type:

None

async stop()

Cancel the background drain task.

Items remain in the Redis ZSET and will be recovered on next start. In-process futures are cancelled.

Return type:

None

async enqueue(redis_key, text)

Add text for deferred embedding. Returns a Future that resolves with the embedding vector once the batch is flushed.

If the same redis_key is already pending, the existing future is returned (dedup). Empty/whitespace text returns a pre-resolved zero-vector future.

Return type:

Future[list[float]]

Parameters:
  • redis_key (str) – Redis hash key that will receive the embedding.

  • text (str) – Text to embed.
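The dedup and empty-text behaviour can be modelled with a per-key future map. This is a simplified sketch, not the actual implementation; `DedupQueue` and the 768-dimension zero vector are assumptions.

```python
import asyncio

class DedupQueue:
    """Simplified model of enqueue(): one in-process Future per redis_key."""
    def __init__(self, dim=768):
        self._futures: dict[str, asyncio.Future] = {}
        self._dim = dim  # assumed vector size; the real size depends on the model

    async def enqueue(self, redis_key: str, text: str) -> asyncio.Future:
        if redis_key in self._futures:
            return self._futures[redis_key]  # dedup: reuse the pending future
        fut = asyncio.get_running_loop().create_future()
        if not text.strip():
            fut.set_result([0.0] * self._dim)  # empty text: pre-resolved zero-vector
            return fut
        self._futures[redis_key] = fut  # resolved later when the batch flushes
        return fut

async def main():
    q = DedupQueue()
    f1 = await q.enqueue("msg:1", "hello")
    f2 = await q.enqueue("msg:1", "hello")   # same key: same future back
    f3 = await q.enqueue("msg:2", "   ")     # whitespace-only text
    return f1 is f2, f3.done()

same, empty_done = asyncio.run(main())
print(same, empty_done)
```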
async enqueue_many(items)

Atomically enqueue multiple (redis_key, text) pairs.

Returns a list of Futures (one per item, in the same order).

Return type:

list[Future[list[float]]]

Parameters:

• items (list[tuple[str, str]]) – (redis_key, text) pairs to enqueue.
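One way to make the batch land atomically is a single multi-member ZADD, sketched here with an in-memory Redis stand-in; the JSON member encoding is an assumption.

```python
import asyncio
import json
import time

class FakeRedis:
    """In-memory stand-in for an async Redis ZSET."""
    def __init__(self):
        self.zset: dict[str, float] = {}

    async def zadd(self, key, mapping):
        self.zset.update(mapping)

    async def zcard(self, key):
        return len(self.zset)

async def enqueue_many(redis, items):
    now = time.time()
    # A single multi-member ZADD: the whole batch is persisted in one command.
    mapping = {json.dumps({"key": k, "text": t}): now for k, t in items}
    await redis.zadd("embed_queue:pending", mapping)

async def main():
    r = FakeRedis()
    await enqueue_many(r, [("msg:1", "hello"), ("msg:2", "world")])
    return await r.zcard("embed_queue:pending")

count = asyncio.run(main())
print(count)  # both items are now pending
```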

async pending_count()

Number of items waiting in the persistent queue.

Return type:

int

flush_now()

Trigger an immediate drain of the persistent queue.

Call this just before an LLM inference so that all pending embeddings are available for vector search. Non-blocking: the drain runs in a background task.

Return type:

None
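The non-blocking behaviour can be sketched as scheduling the drain on the event loop and returning immediately. Both `drain` and the `state`/`tasks` plumbing are illustrative stand-ins, not the actual implementation.

```python
import asyncio

async def drain(state):
    # Stand-in for the real drain: pops pending items and embeds them.
    state["drained"] = True

def flush_now(state, tasks):
    # Non-blocking: schedule the drain on the event loop and return at once.
    tasks.append(asyncio.get_running_loop().create_task(drain(state)))

async def main():
    state, tasks = {"drained": False}, []
    flush_now(state, tasks)          # returns immediately
    before = state["drained"]        # drain has not run yet
    await asyncio.gather(*tasks)     # ...but completes in the background
    return before, state["drained"]

before, after = asyncio.run(main())
print(before, after)
```

The caller can therefore invoke `flush_now()` right before building an LLM prompt without paying the embedding latency inline; the vectors become searchable once the background task finishes.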