log_rag_ingest
Background task: ingest journalctl logs into the stargazer_logs RAG store.
Periodically reads new log lines since the last ingestion (across every live
service tier — see core.control_ops.fleet_units), chunks them, and upserts
into the pgvector stargazer_logs store for semantic search.
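The "new lines since the last ingestion" behaviour can be pictured as a small timestamp cursor plus a journalctl command built from it. The following is a minimal sketch only, not the module's actual code: the marker-file format (epoch seconds as text), the helper names, and the choice of `--since` with an `@epoch` value are all assumptions made for illustration.

```python
from pathlib import Path


def read_last_timestamp(marker: Path):
    """Return the saved epoch seconds, or None if no usable marker exists."""
    try:
        return float(marker.read_text().strip())
    except (FileNotFoundError, ValueError):
        return None


def write_last_timestamp(marker: Path, ts: float) -> None:
    """Persist the cursor via a temp file plus rename, so a crash mid-write
    cannot leave a half-written marker behind."""
    tmp = marker.with_name(marker.name + ".tmp")
    tmp.write_text(f"{ts:.6f}")
    tmp.replace(marker)


def journalctl_args(units, since):
    """Build a journalctl command limited to `units` and to lines after `since`.

    Assumption: the cursor is passed as `--since @<epoch seconds>`.
    """
    cmd = ["journalctl", "--no-pager", "--output=short-iso"]
    for unit in units:
        cmd += ["--unit", unit]
    if since is not None:
        # Without a cursor (first run) no --since is added, so the whole
        # journal for these units would be read.
        cmd += ["--since", f"@{int(since)}"]
    return cmd
```

In a sketch like this, the cursor should only be advanced after the upsert has succeeded, so that a failed tick is retried from the same point on the next cycle.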
- async log_rag_ingest.ingest_logs_tick()
Run one cycle of the journal-to-RAG ingestion pipeline.
The full per-tick workhorse: it reads the saved cursor via
_read_last_timestamp(), shells out to journalctl over every fleet tier (core.control_ops.fleet_units, with a 30s timeout and careful subprocess cleanup) to collect only new lines, chunks the output with rag_system.file_rag_manager.chunk_text, embeds the chunks through gemini_embed_pool.openrouter_embed_batch (the google/gemini-embedding-001 model) in API-sized sub-batches, and upserts them with pre-computed embeddings into the stargazer_logs pgvector store obtained from rag_system.get_rag_store. It then prunes the collection back toward ~2000 entries (oldest-first, by ingested_at) and finally advances the cursor via _write_last_timestamp().
Touches the filesystem (the timestamp marker and the journalctl subprocess), the embedding HTTP backend, and the vector store; the heavy store calls run via asyncio.to_thread to stay non-blocking. Missing journalctl and other errors are logged and turned into early returns rather than raised.
Called by background_tasks._log_rag_ingest_task() (gated behind the background_scheduler_log_rag_ingest_enabled opt-in flag) on the periodic scheduler, and invoked directly by tests/test_background_tasks_shutdown.py.
- Return type:
- Returns:
None. All outcomes (indexed, pruned, skipped, or failed) are surfaced via logging only.
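Three of the mechanics described for this function (a subprocess with a timeout and cleanup, API-sized sub-batches, and oldest-first pruning) can be sketched as below. This is an illustrative sketch under stated assumptions, not the module's implementation: `run_with_timeout`, `sub_batches`, `ids_to_prune`, and the `EMBED_BATCH` value are hypothetical, and only the 30s timeout and the ~2000-entry target come from the description above.

```python
import asyncio

MAX_ENTRIES = 2000  # target size, from the "~2000 entries" figure above
EMBED_BATCH = 100   # hypothetical per-request limit of the embedding API


async def run_with_timeout(cmd, timeout=30.0):
    """Run `cmd` and return its stdout as text, or None on timeout or
    when the binary is missing (an early return instead of an exception)."""
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except FileNotFoundError:
        # e.g. journalctl is not installed on this host
        return None
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()  # reap the child so it does not linger as a zombie
        return None
    return stdout.decode("utf-8", errors="replace")


def sub_batches(items, size=EMBED_BATCH):
    """Split a list into consecutive slices of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def ids_to_prune(entries, max_entries=MAX_ENTRIES):
    """Return the ids of the oldest entries beyond `max_entries`.

    `entries` is a list of (entry_id, ingested_at) pairs.
    """
    overflow = len(entries) - max_entries
    if overflow <= 0:
        return []
    oldest_first = sorted(entries, key=lambda e: e[1])
    return [entry_id for entry_id, _ in oldest_first[:overflow]]
```

Blocking store calls (the upsert and the delete of pruned ids) would then be wrapped in `asyncio.to_thread(...)` so the event loop stays responsive while they run.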