embedding_queue
Redis-backed batched embedding queue for non-critical embedding generation.
Accumulates (redis_key, text) pairs in a Redis sorted set
(embed_queue:pending) and flushes them to the Gemini batch embeddings
API on a timer or when the batch reaches a size threshold. Resolved
embeddings are written back to the corresponding Redis hashes so they
become vector-searchable.
Persistence guarantees:
- Items are written to the ZSET before any API call, so a crash or restart never loses pending work.
- On startup, any leftover items from a previous run are drained automatically (fire-and-forget — no in-process Future for those).
- On API failure, items are re-added to the ZSET for the next drain cycle.
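The write-before-call ordering behind these guarantees can be sketched with an in-memory stand-in for the `embed_queue:pending` ZSET (the dicts, a counter-based score, and the injected `embed_api` callable are illustrative, not the module's actual internals):

```python
import itertools

# Stand-ins for the Redis ZSET and the per-key text payloads.
# In the real queue these are redis.zadd / redis.zpopmin operations.
pending: dict[str, int] = {}   # redis_key -> score (enqueue order)
texts: dict[str, str] = {}     # redis_key -> text awaiting embedding
_score = itertools.count()     # monotonic score; the real queue uses a timestamp

def enqueue(redis_key: str, text: str) -> None:
    # Persist FIRST: the item is durable before any API call is attempted,
    # so a crash between here and the flush loses nothing.
    pending[redis_key] = next(_score)
    texts[redis_key] = text

def flush(embed_api) -> None:
    # Pop everything pending, lowest score first (ZPOPMIN order).
    batch = sorted(pending, key=pending.get)
    scores = {k: pending.pop(k) for k in batch}
    try:
        embed_api([texts[k] for k in batch])
    except Exception:
        # On API failure, re-add with the original scores so the
        # next drain cycle retries in the same order.
        pending.update(scores)
```

A failed flush leaves every item queued; a later flush with a working API drains them.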
A single background asyncio.Task (_drain_task) drains the
queue — it pops a batch from the ZSET with ZPOPMIN, calls the
embedding API, and writes results back. After each successful batch it
loops back to check for more items; when the ZSET is empty it exits and
the next enqueue() call schedules a new drain after the batching
interval.
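The drain task's pop-embed-write loop can be sketched as follows; `zpopmin` and `embed_and_write` are injected callables standing in for the Redis ZPOPMIN call and the embed-then-write-back step (illustrative names, not the module's real helpers):

```python
import asyncio

async def drain_loop(zpopmin, embed_and_write, batch_size: int = 50) -> None:
    """Single background task: pop a batch, embed it, write results back,
    and loop until the ZSET is empty, then exit."""
    while True:
        batch = await zpopmin(batch_size)
        if not batch:
            # Queue drained: the task exits; the next enqueue()
            # schedules a fresh drain after the batching interval.
            return
        await embed_and_write(batch)
```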
Usage:
    queue = EmbeddingBatchQueue(
        openrouter, redis, model="google/gemini-embedding-001",
    )
    await queue.start()  # drains any leftovers from previous run
    future = await queue.enqueue("msg:abc-123", "Hello world")
    embedding = await future
    await queue.stop()
- class embedding_queue.EmbeddingBatchQueue(openrouter, redis, model='google/gemini-embedding-001', flush_interval=3600.0, max_batch_size=50)[source]
Bases: object
Redis-backed queue that batches embedding requests and flushes them periodically via the Gemini batch API.
- Parameters:
  - openrouter (OpenRouterClient) – Shared API client with embed_batch() support.
  - redis (Redis) – Async Redis client for the persistent ZSET and writing embeddings.
  - model (str) – Embedding model identifier.
  - flush_interval (float) – Seconds to wait after the first enqueue before starting a flush.
  - max_batch_size (int) – Flush immediately when the queue reaches this size.
- async stop()[source]
Cancel the background drain task.
Items remain in the Redis ZSET and will be recovered on next start. In-process futures are cancelled.
- Return type:
  None
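The stop semantics — cancel the drain task and in-process futures while leaving Redis untouched — can be sketched like this (the `drain_task` and `futures` parameters are illustrative stand-ins for the queue's internal state):

```python
import asyncio

async def stop(drain_task, futures: dict) -> None:
    """Cancel the background drain task and any in-process futures.

    Nothing is removed from the Redis ZSET here, so pending items
    survive and are recovered by the next start().
    """
    if drain_task is not None:
        drain_task.cancel()
    for fut in futures.values():
        if not fut.done():
            fut.cancel()
    futures.clear()
```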
- async enqueue(redis_key, text)[source]
Add text for deferred embedding. Returns a Future that resolves with the embedding vector once the batch is flushed.
If the same redis_key is already pending, the existing future is returned (dedup). Empty/whitespace text returns a pre-resolved zero-vector future.
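The dedup and empty-text behavior described above might look like the following sketch (the `futures` mapping and the 768-entry zero vector are assumptions for illustration; the real vector length depends on the embedding model):

```python
import asyncio

def enqueue_future(futures: dict, loop, redis_key: str, text: str,
                   zero_dim: int = 768):
    """Sketch of enqueue()'s future handling: dedup and empty text."""
    if not text.strip():
        # Empty/whitespace text: return a pre-resolved zero-vector future
        # without touching the queue at all.
        fut = loop.create_future()
        fut.set_result([0.0] * zero_dim)
        return fut
    if redis_key in futures:
        # Same key already pending: reuse the existing future (dedup).
        return futures[redis_key]
    fut = loop.create_future()
    futures[redis_key] = fut
    return fut
```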