# Source code for log_rag_ingest

"""Background task: ingest journalctl logs into the stargazer_logs RAG store.

Periodically reads new log lines since the last ingestion (across every live
service tier — see ``core.control_ops.fleet_units``), chunks them, and upserts
into the pgvector stargazer_logs store for semantic search.
"""

from __future__ import annotations

import asyncio
import hashlib
import logging
import os
from datetime import datetime, timezone

# Module-level logger; the ingest tick reports its outcomes through logging.
logger = logging.getLogger(__name__)

# Directory containing this module; the cursor marker file lives next to it.
_PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
# Marker file holding the timestamp of the last successfully ingested log line.
_TIMESTAMP_FILE = os.path.join(_PROJECT_ROOT, ".last_log_ingest_ts")
# Project-level label stamped on each ingested chunk (the store is
# ``stargazer_logs``). The journal *source* is now the whole fleet — every tier
# from ``core.control_ops.fleet_units`` — not a single ``stargazer`` systemd unit.
_SERVICE_NAME = "stargazer"
# Name of the RAG store requested from ``rag_system.get_rag_store``.
_STORE_NAME = "stargazer_logs"
# Size and overlap arguments handed to ``chunk_text``. The unit is whatever
# ``chunk_text`` expects — presumably characters; TODO confirm.
_CHUNK_SIZE = 1500
_CHUNK_OVERLAP = 200
# Tags JSON-encoded into the ``tags`` metadata field of every chunk.
_TAGS = ["stargazer", "logs", "systemd"]
# Maximum number of records sent in one ``collection.upsert`` call.
# NOTE(review): the name mentions Chroma while the module docstring describes a
# pgvector store — looks like a holdover name; confirm before renaming.
_CHROMA_BATCH_SIZE = 5000
# Page size used when listing ids/metadata during pruning.
_PRUNE_PAGE_SIZE = 500
# Maximum number of ids passed to a single ``collection.delete`` call.
_PRUNE_DELETE_BATCH_SIZE = 500


def _read_last_timestamp() -> str | None:
    """Read the high-water-mark timestamp of the last successful ingest.

    Reads the ``.last_log_ingest_ts`` marker file (``_TIMESTAMP_FILE``, next to
    this module) so the next tick can ask ``journalctl`` only for lines newer
    than what was already indexed, avoiding re-embedding the entire journal each
    run. Any filesystem error is treated as "no marker yet" so a missing or
    unreadable file simply triggers a fresh first-run ingest.

    Called by :func:`ingest_logs_tick` at the start of each tick to bound the
    ``--since`` window; it is a module-private helper with no external callers.

    Returns:
        str | None: The persisted ISO timestamp string, or ``None`` if the file
        is absent, empty, or unreadable.
    """
    try:
        if os.path.exists(_TIMESTAMP_FILE):
            with open(_TIMESTAMP_FILE) as f:
                ts = f.read().strip()
                return ts if ts else None
    except OSError:
        pass
    return None


def _write_last_timestamp(ts: str) -> None:
    """Persist the new high-water-mark timestamp after a successful ingest.

    Replaces the ``.last_log_ingest_ts`` marker file (``_TIMESTAMP_FILE``) with
    the timestamp of the most recent log line just indexed, advancing the cursor
    that :func:`_read_last_timestamp` will return next tick. The value is first
    written to a sibling ``.tmp`` file and then moved into place with
    ``os.replace``, so a crash or failed write never leaves an empty or
    truncated cursor behind — the previous marker stays intact instead.

    Write failures are logged but swallowed so an ingest that already succeeded
    is not reported as a failure merely because the marker could not be updated.

    Called by :func:`ingest_logs_tick` near the end of a successful tick; it is a
    module-private helper with no external callers.

    Args:
        ts: The timestamp string (journalctl-style or ISO) to record as the new
            ingestion high-water mark.

    Returns:
        None.
    """
    tmp_path = f"{_TIMESTAMP_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(ts)
        # Swap the fully written file into place in a single step.
        os.replace(tmp_path, _TIMESTAMP_FILE)
    except OSError as e:
        logger.warning("Could not write log ingest timestamp: %s", e)
        # Best-effort cleanup of a partially written temp file; it may not
        # exist at all if the open itself failed.
        try:
            os.remove(tmp_path)
        except OSError:
            pass


async def _terminate_process(proc: asyncio.subprocess.Process | None) -> None:
    """Best-effort stop of a child process that is still running.

    Sends ``terminate()`` and waits up to two seconds; if that fails or times
    out, falls back to ``kill()``. Errors are deliberately swallowed because
    this runs on an already-failing path and must not mask the original error.

    Args:
        proc: The subprocess handle, or ``None`` if it was never created.
    """
    if proc is None or proc.returncode is not None:
        return
    try:
        proc.terminate()
        await asyncio.wait_for(proc.wait(), timeout=2.0)
    except Exception:
        try:
            proc.kill()
            await proc.wait()
        except Exception:
            pass


async def ingest_logs_tick() -> None:
    """Run one cycle of the journal-to-RAG ingestion pipeline.

    Steps, in order:

    1. Read the saved cursor via :func:`_read_last_timestamp`.
    2. Run ``journalctl`` over every unit returned by
       ``core.control_ops.fleet_units`` (30s timeout, child process cleaned up
       on failure) to collect lines since the cursor, or the last two hours on
       a first run.
    3. Chunk the output with ``rag_system.file_rag_manager.chunk_text``.
    4. Embed the chunks through ``gemini_embed_pool.openrouter_embed_batch``
       (model ``google/gemini-embedding-001``) in sub-batches of 50.
    5. Upsert documents with their pre-computed embeddings into the
       ``stargazer_logs`` store obtained from ``rag_system.get_rag_store``.
    6. Prune the collection back to 2000 entries, oldest ``ingested_at`` first.
    7. Advance the cursor via :func:`_write_last_timestamp`.

    Store calls run through ``asyncio.to_thread`` so they do not block the event
    loop. A missing ``journalctl`` binary and other errors are logged and turned
    into early returns rather than raised. A pruning failure is logged and does
    not prevent the cursor from advancing; a failed upsert does.

    Called by :func:`background_tasks._log_rag_ingest_task` (gated behind the
    ``background_scheduler_log_rag_ingest_enabled`` opt-in flag) on the periodic
    scheduler, and invoked directly by
    ``tests/test_background_tasks_shutdown.py``.

    Returns:
        None. All outcomes (indexed, pruned, skipped, or failed) are surfaced
        via logging only.
    """
    last_ts = _read_last_timestamp()

    # Build journalctl command — tail every live service tier (the retired
    # monolithic ``stargazer`` unit no longer exists post-T3), merging the
    # persistent and runtime journals like the admin journal endpoint does.
    from core.control_ops import fleet_units

    cmd = ["journalctl", "--no-pager", "--output=short-iso", "--merge"]
    for unit in fleet_units():
        cmd.extend(["-u", unit])
    # First run (no cursor yet): ingest the last 2 hours of logs.
    # NOTE(review): ``--since`` appears to be inclusive, so entries stamped with
    # the cursor's own second are likely fetched again next tick — confirm.
    cmd.extend(["--since", last_ts or "2 hours ago"])

    # Captured before the journal is read so the fallback cursor below can never
    # skip lines that were logged while this tick was embedding/upserting.
    started_at = datetime.now(timezone.utc)

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=30.0,
        )
    except asyncio.TimeoutError:
        logger.warning("Log ingest: journalctl timed out")
        await _terminate_process(proc)
        return
    except FileNotFoundError:
        logger.debug("Log ingest: journalctl not found — skipping")
        return
    except Exception:
        logger.exception("Log ingest: failed to run journalctl")
        await _terminate_process(proc)
        return

    if proc.returncode:
        # Surface journalctl's own complaint (e.g. an unparsable --since value)
        # instead of silently reporting "no new log lines" forever.
        logger.warning(
            "Log ingest: journalctl exited with status %s: %s",
            proc.returncode,
            (stderr or b"").decode("utf-8", errors="replace").strip(),
        )

    output = stdout.decode("utf-8", errors="replace").strip()
    if not output:
        logger.debug("Log ingest: no new log lines")
        return

    # A non-empty stripped string always yields at least one line.
    lines = output.splitlines()

    # Timestamp range of the batch. Scan inward from both ends rather than
    # looking only at the first/last line: header lines and the continuation
    # lines of multi-line entries carry no timestamp of their own.
    first_ts = next((ts for ts in map(_extract_timestamp, lines) if ts), None)
    last_line_ts = next(
        (ts for ts in map(_extract_timestamp, reversed(lines)) if ts),
        None,
    )

    # Chunk the log text
    from rag_system.file_rag_manager import chunk_text

    chunks = chunk_text(output, _CHUNK_SIZE, _CHUNK_OVERLAP)
    if not chunks:
        return

    # Get or create the RAG store
    try:
        from gemini_embed_pool import openrouter_embed_batch
        from rag_system import get_rag_store

        store = get_rag_store(_STORE_NAME)
    except Exception:
        logger.exception("Log ingest: failed to get RAG store '%s'", _STORE_NAME)
        return

    # Build IDs, documents, and metadata for upsert
    import jsonutil as json

    now_iso = datetime.now(timezone.utc).isoformat()
    timestamp_range = f"{first_ts or 'unknown'} to {last_line_ts or 'unknown'}"
    tags_json = json.dumps(_TAGS)
    chunk_count = len(chunks)
    line_count = len(lines)

    ids = []
    documents = []
    metadatas = []
    for i, chunk in enumerate(chunks):
        chunk_hash = hashlib.md5(chunk.encode("utf-8")).hexdigest()[:12]
        ids.append(f"log_{now_iso}_{i}_{chunk_hash}")
        documents.append(f"Service: {_SERVICE_NAME}\nLog chunk:\n\n{chunk}")
        metadatas.append(
            {
                "source": "journalctl",
                "service": _SERVICE_NAME,
                "timestamp_range": timestamp_range,
                "chunk_index": i,
                "chunk_count": chunk_count,
                "ingested_at": now_iso,
                "tags": tags_json,
                "line_count": line_count,
            }
        )

    # Embed via OpenRouter (async), then upsert with pre-computed embeddings
    embed_batch_size = 50  # OpenRouter API batch limit
    try:
        for start in range(0, len(ids), _CHROMA_BATCH_SIZE):
            end = start + _CHROMA_BATCH_SIZE
            batch_ids = ids[start:end]
            batch_docs = documents[start:end]
            batch_metas = metadatas[start:end]

            all_embeddings: list[list[float]] = []
            for sub_start in range(0, len(batch_docs), embed_batch_size):
                sub_docs = batch_docs[sub_start:sub_start + embed_batch_size]
                sub_embs = await openrouter_embed_batch(
                    sub_docs,
                    model="google/gemini-embedding-001",
                )
                all_embeddings.extend(sub_embs)

            await asyncio.to_thread(
                store.collection.upsert,
                ids=batch_ids,
                documents=batch_docs,
                embeddings=all_embeddings,
                metadatas=batch_metas,
            )
        logger.info(
            "Log ingest: indexed %d chunks (%d lines) into '%s'",
            len(chunks),
            len(lines),
            _STORE_NAME,
        )
    except Exception:
        logger.exception("Log ingest: pgvector upsert failed")
        return

    # Prune old entries to keep the store from growing unbounded
    # Keep at most ~2000 entries (roughly 24h of logs at 6-hour intervals)
    max_entries = 2000
    try:
        count = await asyncio.to_thread(store.collection.count)
        if count > max_entries:
            all_ids: list[str] = []
            all_metadatas: list[dict] = []
            offset = 0
            while True:
                page = await asyncio.to_thread(
                    store.collection.get,
                    include=["metadatas"],
                    limit=_PRUNE_PAGE_SIZE,
                    offset=offset,
                )
                if not page or not page.get("ids"):
                    break
                all_ids.extend(page["ids"])
                all_metadatas.extend(page["metadatas"])
                if len(page["ids"]) < _PRUNE_PAGE_SIZE:
                    break
                offset += len(page["ids"])

            if all_ids:
                paired = list(zip(all_ids, all_metadatas))
                # Entries with missing metadata or no ``ingested_at`` sort
                # first (treated as oldest) instead of crashing the sort.
                paired.sort(key=lambda p: (p[1] or {}).get("ingested_at") or "")
                excess = len(paired) - max_entries
                if excess > 0:
                    to_delete = [p[0] for p in paired[:excess]]
                    for start in range(0, len(to_delete), _PRUNE_DELETE_BATCH_SIZE):
                        end = start + _PRUNE_DELETE_BATCH_SIZE
                        await asyncio.to_thread(
                            store.collection.delete,
                            ids=to_delete[start:end],
                        )
                    logger.info(
                        "Log ingest: pruned %d old entries",
                        len(to_delete),
                    )
    except Exception:
        logger.warning("Log ingest: pruning failed", exc_info=True)

    # Update the timestamp marker. The fallback is only reached when no line in
    # the batch carried a timestamp; it is tagged "UTC" because journalctl
    # otherwise interprets a zone-less --since value as local time.
    new_ts = last_line_ts or started_at.strftime("%Y-%m-%d %H:%M:%S UTC")
    _write_last_timestamp(new_ts)
def _extract_timestamp(line: str) -> str | None: """Pull the leading ISO timestamp off a ``short-iso`` journalctl line. Cheaply recognizes the ``2026-03-10T23:15:42+0000 hostname ...`` shape by checking the fixed dash/``T`` positions rather than full parsing, then slices out everything up to the first space (the timestamp plus timezone). Used to label each ingested chunk with the first and last log line's time so the stored metadata carries a human-readable ``timestamp_range``. Pure string work with no I/O. Called by :func:`ingest_logs_tick` on the first and last lines of a journal batch; it is a module-private helper with no external callers. Args: line: A single raw journalctl output line. Returns: str | None: The extracted timestamp substring, or ``None`` if the line does not match the expected ``short-iso`` prefix. """ # short-iso format: "2026-03-10T23:15:42+0000 hostname ..." if len(line) >= 19 and line[4] == "-" and line[7] == "-" and line[10] == "T": # Take up to the space after the timezone space_idx = line.find(" ") if space_idx > 10: return line[:space_idx] return None