# Source code for log_rag_ingest

"""Background task: ingest journalctl logs into the stargazer_logs RAG store.

Periodically reads new log lines since the last ingestion, chunks them,
and upserts into a ChromaDB collection for semantic search.
"""

from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import os
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

_PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
_TIMESTAMP_FILE = os.path.join(_PROJECT_ROOT, ".last_log_ingest_ts")
_SERVICE_NAME = "stargazer"
_STORE_NAME = "stargazer_logs"
_CHUNK_SIZE = 1500
_CHUNK_OVERLAP = 200
_TAGS = ["stargazer", "logs", "systemd"]
_CHROMA_BATCH_SIZE = 5000
_PRUNE_PAGE_SIZE = 500
_PRUNE_DELETE_BATCH_SIZE = 500


def _read_last_timestamp() -> str | None:
    """Return the persisted last-ingestion timestamp, or ``None``.

    ``None`` is returned when the marker file is absent, unreadable,
    or empty.
    """
    if not os.path.exists(_TIMESTAMP_FILE):
        return None
    try:
        with open(_TIMESTAMP_FILE) as fh:
            contents = fh.read().strip()
    except OSError:
        # Best-effort read: a missing/unreadable marker just means "no prior run".
        return None
    return contents or None

def _write_last_timestamp(ts: str) -> None:
    """Best-effort persist of the newest ingestion timestamp to disk.

    Failures are logged at WARNING level and otherwise ignored, so a
    full disk or bad permissions never aborts an ingestion tick.
    """
    try:
        with open(_TIMESTAMP_FILE, "w") as fh:
            fh.write(ts)
    except OSError as err:
        logger.warning("Could not write log ingest timestamp: %s", err)

async def _fetch_journal_output(last_ts: str | None) -> str | None:
    """Run journalctl for the service and return its decoded stdout.

    Returns ``None`` (after logging) on timeout, missing binary, any
    other subprocess failure, or when there is no new output.
    """
    cmd = [
        "journalctl",
        "-u", _SERVICE_NAME,
        "--no-pager",
        "--output=short-iso",
    ]
    if last_ts:
        cmd.extend(["--since", last_ts])
    else:
        # First run: ingest last 2 hours of logs
        cmd.extend(["--since", "2 hours ago"])

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _stderr = await asyncio.wait_for(proc.communicate(), timeout=30.0)
    except asyncio.TimeoutError:
        logger.warning("Log ingest: journalctl timed out")
        return None
    except FileNotFoundError:
        logger.debug("Log ingest: journalctl not found — skipping")
        return None
    except Exception:
        logger.exception("Log ingest: failed to run journalctl")
        return None

    output = stdout.decode("utf-8", errors="replace").strip()
    if not output:
        logger.debug("Log ingest: no new log lines")
        return None
    return output


async def _embed_and_upsert(
    store,
    embed_batch,
    chunks: list[str],
    lines: list[str],
    first_ts: str | None,
    last_line_ts: str | None,
) -> bool:
    """Embed *chunks* via OpenRouter and upsert them into *store*.

    Returns True on success, False (after logging) on failure.
    """
    now_iso = datetime.now(timezone.utc).isoformat()
    ids: list[str] = []
    documents: list[str] = []
    metadatas: list[dict] = []
    for i, chunk in enumerate(chunks):
        # Content hash keeps IDs unique even within the same tick.
        chunk_hash = hashlib.md5(chunk.encode("utf-8")).hexdigest()[:12]
        ids.append(f"log_{now_iso}_{i}_{chunk_hash}")
        documents.append(f"Service: {_SERVICE_NAME}\nLog chunk:\n\n{chunk}")
        metadatas.append({
            "source": "journalctl",
            "service": _SERVICE_NAME,
            # Fix: the two timestamps were previously concatenated with no
            # separator, producing an unreadable fused value.
            "timestamp_range": f"{first_ts or 'unknown'} to {last_line_ts or 'unknown'}",
            "chunk_index": i,
            "chunk_count": len(chunks),
            "ingested_at": now_iso,
            "tags": json.dumps(_TAGS),
            "line_count": len(lines),
        })

    embed_batch_size = 50  # OpenRouter API batch limit
    try:
        for start in range(0, len(ids), _CHROMA_BATCH_SIZE):
            end = start + _CHROMA_BATCH_SIZE
            batch_docs = documents[start:end]
            all_embeddings: list[list[float]] = []
            for sub_start in range(0, len(batch_docs), embed_batch_size):
                sub_docs = batch_docs[sub_start:sub_start + embed_batch_size]
                sub_embs = await embed_batch(
                    sub_docs,
                    model="google/gemini-embedding-001",
                )
                all_embeddings.extend(sub_embs)
            # ChromaDB calls are synchronous; keep the event loop responsive.
            await asyncio.to_thread(
                store.collection.upsert,
                ids=ids[start:end],
                documents=batch_docs,
                embeddings=all_embeddings,
                metadatas=metadatas[start:end],
            )
        logger.info(
            "Log ingest: indexed %d chunks (%d lines) into '%s'",
            len(chunks), len(lines), _STORE_NAME,
        )
        return True
    except Exception:
        logger.exception("Log ingest: ChromaDB upsert failed")
        return False


async def _prune_old_entries(store, max_entries: int = 2000) -> None:
    """Delete the oldest entries so the store stays under *max_entries*.

    Entries are ordered by their ``ingested_at`` metadata. Any failure is
    logged at WARNING level; pruning is best-effort and never raises.
    """
    try:
        count = await asyncio.to_thread(store.collection.count)
        if count <= max_entries:
            return

        # Page through the whole collection to collect IDs + metadata.
        all_ids: list[str] = []
        all_metadatas: list[dict] = []
        offset = 0
        while True:
            page = await asyncio.to_thread(
                store.collection.get,
                include=["metadatas"],
                limit=_PRUNE_PAGE_SIZE,
                offset=offset,
            )
            if not page or not page.get("ids"):
                break
            all_ids.extend(page["ids"])
            all_metadatas.extend(page["metadatas"])
            if len(page["ids"]) < _PRUNE_PAGE_SIZE:
                break
            offset += len(page["ids"])

        if not all_ids:
            return
        paired = sorted(
            zip(all_ids, all_metadatas),
            key=lambda p: p[1].get("ingested_at", ""),
        )
        excess = len(paired) - max_entries
        if excess <= 0:
            return
        to_delete = [doc_id for doc_id, _meta in paired[:excess]]
        for start in range(0, len(to_delete), _PRUNE_DELETE_BATCH_SIZE):
            await asyncio.to_thread(
                store.collection.delete,
                ids=to_delete[start:start + _PRUNE_DELETE_BATCH_SIZE],
            )
        logger.info("Log ingest: pruned %d old entries", len(to_delete))
    except Exception:
        logger.warning("Log ingest: pruning failed", exc_info=True)


async def ingest_logs_tick() -> None:
    """Single tick: fetch new logs and ingest into the RAG store.

    Pipeline: read the last-ingested timestamp marker → pull newer lines
    from journalctl → chunk, embed, and upsert into ChromaDB → prune the
    store to ~2000 entries → advance the timestamp marker. Each stage
    fails soft (log and return) so the periodic task keeps running.
    """
    last_ts = _read_last_timestamp()

    output = await _fetch_journal_output(last_ts)
    if output is None:
        return
    lines = output.splitlines()
    if not lines:
        return

    # Timestamp range covered by this batch, taken from first/last lines.
    first_ts = _extract_timestamp(lines[0])
    last_line_ts = _extract_timestamp(lines[-1])

    from rag_system.file_rag_manager import chunk_text
    chunks = chunk_text(output, _CHUNK_SIZE, _CHUNK_OVERLAP)
    if not chunks:
        return

    try:
        from gemini_embed_pool import openrouter_embed_batch
        from rag_system import get_rag_store
        store = get_rag_store(_STORE_NAME)
    except Exception:
        logger.exception("Log ingest: failed to get RAG store '%s'", _STORE_NAME)
        return

    if not await _embed_and_upsert(
        store, openrouter_embed_batch, chunks, lines, first_ts, last_line_ts,
    ):
        # Do not advance the marker on failure so the lines are retried.
        return

    # Keep the store from growing unbounded (~24h of logs at 6h intervals).
    await _prune_old_entries(store)

    # Advance the marker; fall back to wall-clock time if no timestamp
    # could be parsed from the last line (journalctl accepts this format).
    new_ts = last_line_ts or datetime.now(timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S",
    )
    _write_last_timestamp(new_ts)
def _extract_timestamp(line: str) -> str | None: """Try to extract an ISO timestamp from the start of a journalctl line.""" # short-iso format: "2026-03-10T23:15:42+0000 hostname ..." if len(line) >= 19 and line[4] == "-" and line[7] == "-" and line[10] == "T": # Take up to the space after the timezone space_idx = line.find(" ") if space_idx > 10: return line[:space_idx] return None