log_rag_ingest

Background task: ingest journalctl logs into the stargazer_logs RAG store.

Periodically reads new log lines since the last ingestion across every live service tier (see core.control_ops.fleet_units), chunks and embeds them, and upserts the results into the pgvector-backed stargazer_logs store for semantic search.
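The incremental read can be sketched as follows. This is a minimal sketch under several assumptions. The cursor is stored as a plain-text timestamp in a format that journalctl --since accepts. The units arrive as a list of systemd unit names. The caller supplies a first-run fallback whenever read_cursor returns None. The names read_cursor, write_cursor, build_journalctl_cmd and run_with_timeout are hypothetical and are not this module's API. The real helpers are _read_last_timestamp() and _write_last_timestamp(), and their file format is not documented here. The journalctl options used (--no-pager, --output=short-iso, --since, -u) are standard, but the exact flags the module passes are not stated.

```python
import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Sequence

log = logging.getLogger("log_rag_ingest_sketch")


def read_cursor(marker: Path) -> Optional[str]:
    """Return the saved --since timestamp, or None on first run or an unreadable marker."""
    try:
        return marker.read_text().strip() or None
    except OSError:
        return None


def write_cursor(marker: Path, when: datetime) -> None:
    """Persist the cursor atomically: write a temp file, then rename it over the marker."""
    tmp = marker.with_name(marker.name + ".tmp")
    tmp.write_text(when.strftime("%Y-%m-%d %H:%M:%S"))  # a format journalctl --since accepts
    tmp.replace(marker)


def build_journalctl_cmd(units: Sequence[str], since: str) -> list[str]:
    """One journalctl call covering every unit; --since limits it to entries after the cursor."""
    cmd = ["journalctl", "--no-pager", "--output=short-iso", "--since", since]
    for unit in units:
        cmd += ["-u", unit]  # repeated -u matches entries from any listed unit
    return cmd


async def run_with_timeout(argv: Sequence[str], timeout: float = 30.0) -> Optional[str]:
    """Return stdout as text, or None if the binary is missing, times out, or exits non-zero."""
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
    except FileNotFoundError:
        log.warning("%s not found; skipping this tick", argv[0])
        return None
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        log.warning("%s timed out after %ss", argv[0], timeout)
        return None
    finally:
        if proc.returncode is None:  # timed out or cancelled: kill and reap the child
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # it exited on its own in the meantime
            await proc.wait()
    if proc.returncode != 0:
        log.warning("%s exited %s: %s", argv[0], proc.returncode,
                    stderr.decode(errors="replace").strip())
        return None
    return stdout.decode(errors="replace")
```

The child is killed and then awaited in the finally clause. This covers both the timeout and task cancellation, so no orphaned or zombie journalctl process outlives the tick.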

async log_rag_ingest.ingest_logs_tick()

Run one cycle of the journal-to-RAG ingestion pipeline.

This is the per-tick workhorse, and it runs the following steps in order:

- It reads the saved cursor via _read_last_timestamp().
- It shells out to journalctl over every fleet tier (core.control_ops.fleet_units), with a 30s timeout and careful subprocess cleanup, and collects only lines newer than the cursor.
- It chunks the output with rag_system.file_rag_manager.chunk_text.
- It embeds the chunks in API-sized sub-batches through gemini_embed_pool.openrouter_embed_batch, using the google/gemini-embedding-001 model.
- It upserts the chunks, together with their pre-computed embeddings, into the stargazer_logs pgvector store obtained from rag_system.get_rag_store.
- It prunes the collection back toward ~2000 entries, removing the oldest first by ingested_at.
- Finally, it advances the cursor via _write_last_timestamp().

Side effects: the timestamp marker file, the journalctl subprocess, the embedding HTTP backend, and the vector store. The heavy store calls run via asyncio.to_thread so they do not block the event loop. A missing journalctl binary and other errors are logged and turned into early returns rather than raised.
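The embed, upsert and prune steps can be sketched as below. All of the helpers are hypothetical. embed_batch stands in for gemini_embed_pool.openrouter_embed_batch, which is assumed here to be an async callable that returns one vector per input. upsert stands in for the store's blocking write call. The sub-batch size of 100 is an arbitrary placeholder for the unspecified API limit. ids_to_prune is also hypothetical. Only the ~2000 cap, the oldest-first ordering by ingested_at, and the use of asyncio.to_thread for store calls come from the description above.

```python
import asyncio
from typing import Awaitable, Callable, Iterator, Sequence, TypeVar

T = TypeVar("T")


def sub_batches(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def embed_and_upsert(
    chunks: Sequence[str],
    embed_batch: Callable[[list[str]], Awaitable[list[list[float]]]],
    upsert: Callable[[list[str], list[list[float]]], None],
    batch_size: int = 100,  # placeholder; the real API limit is not documented
) -> int:
    """Embed chunks in sub-batches and upsert each batch with its precomputed vectors."""
    written = 0
    for batch in sub_batches(chunks, batch_size):
        texts = list(batch)
        vectors = await embed_batch(texts)
        if len(vectors) != len(texts):
            raise ValueError("embedding count does not match chunk count")
        # The store client is blocking, so run the write in a worker thread.
        await asyncio.to_thread(upsert, texts, vectors)
        written += len(texts)
    return written


def ids_to_prune(entries: Sequence[tuple[str, float]], cap: int = 2000) -> list[str]:
    """Given (id, ingested_at) pairs, return the ids of the oldest entries above `cap`."""
    excess = len(entries) - cap
    if excess <= 0:
        return []
    oldest_first = sorted(entries, key=lambda entry: entry[1])
    return [entry_id for entry_id, _ in oldest_first[:excess]]
```

The cursor is advanced only as the last step. An early return earlier in the tick should therefore leave the cursor in place, and the next tick re-reads the same window.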

Scheduled periodically by background_tasks._log_rag_ingest_task(), which is gated behind the opt-in flag background_scheduler_log_rag_ingest_enabled. tests/test_background_tasks_shutdown.py also calls it directly.
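An opt-in periodic driver can be sketched as below. This is a hypothetical loop that assumes the flag value and the interval are passed in by the caller. The docstring does not document how background_tasks._log_rag_ingest_task() reads background_scheduler_log_rag_ingest_enabled, or what interval it uses. run_periodically is an illustrative name only.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

log = logging.getLogger("scheduler_sketch")


async def run_periodically(
    tick: Callable[[], Awaitable[None]],
    enabled: bool,
    interval_s: float,
    max_ticks: Optional[int] = None,
) -> int:
    """Run `tick` every `interval_s` seconds if `enabled`; return how many ticks ran."""
    if not enabled:
        log.info("log RAG ingest is opt-in and disabled; not scheduling")
        return 0
    ran = 0
    while True:
        try:
            await tick()
        except Exception:
            # Ordinary failures are logged. CancelledError is a BaseException
            # (Python 3.8+), so shutdown cancellation still propagates.
            log.exception("ingest tick failed; retrying next interval")
        ran += 1
        if max_ticks is not None and ran >= max_ticks:
            return ran
        await asyncio.sleep(interval_s)
```

Because CancelledError is not swallowed, a shutdown path can stop the loop promptly by cancelling its task. The shutdown test presumably exercises a path of this kind.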

Return type:

None

Returns:

None. All outcomes (indexed, pruned, skipped, or failed) are surfaced via logging only.