"""Background task: ingest journalctl logs into the stargazer_logs RAG store.
Periodically reads new log lines since the last ingestion, chunks them,
and upserts into a ChromaDB collection for semantic search.
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import os
from datetime import datetime, timezone
logger = logging.getLogger(__name__)

# Marker file that persists the last successfully ingested log timestamp
# across process restarts; stored next to this module.
_PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
_TIMESTAMP_FILE = os.path.join(_PROJECT_ROOT, ".last_log_ingest_ts")

# systemd unit whose journal is ingested, and the target RAG store name.
_SERVICE_NAME = "stargazer"
_STORE_NAME = "stargazer_logs"

# Chunking parameters (characters) passed to chunk_text().
_CHUNK_SIZE = 1500
_CHUNK_OVERLAP = 200

# Tags attached to every chunk's metadata (stored JSON-encoded).
_TAGS = ["stargazer", "logs", "systemd"]

# Batch sizing: ChromaDB upsert batches, prune pagination, prune deletes.
_CHROMA_BATCH_SIZE = 5000
_PRUNE_PAGE_SIZE = 500
_PRUNE_DELETE_BATCH_SIZE = 500
def _read_last_timestamp() -> str | None:
    """Read the last ingestion timestamp from disk.

    Returns None when the marker file is missing, empty, or unreadable.
    """
    try:
        if not os.path.exists(_TIMESTAMP_FILE):
            return None
        with open(_TIMESTAMP_FILE) as fh:
            content = fh.read().strip()
    except OSError:
        # Unreadable marker is treated the same as a missing one.
        return None
    return content or None
def _write_last_timestamp(ts: str) -> None:
    """Persist the latest ingestion timestamp.

    Best-effort: a failed write is logged as a warning, never raised —
    the worst case is re-ingesting the same window on the next tick.
    """
    try:
        with open(_TIMESTAMP_FILE, "w") as fh:
            fh.write(ts)
    except OSError as err:
        logger.warning("Could not write log ingest timestamp: %s", err)
async def ingest_logs_tick() -> None:
    """Single tick: fetch new logs and ingest into the RAG store.

    Pipeline:
      1. Run ``journalctl -u <service>`` for lines since the persisted
         timestamp (first run: the last 2 hours).
      2. Chunk the output text and embed each chunk via OpenRouter.
      3. Upsert chunks + embeddings into the ChromaDB collection.
      4. Prune the collection down to at most ~2000 entries.
      5. Persist the newest log-line timestamp for the next tick.

    Best-effort: every failure path logs and returns early; the next
    scheduled tick retries from the last persisted timestamp.
    """
    last_ts = _read_last_timestamp()
    # Build journalctl command
    cmd = [
        "journalctl",
        "-u", _SERVICE_NAME,
        "--no-pager",
        "--output=short-iso",  # each line starts with an ISO timestamp
    ]
    if last_ts:
        cmd.extend(["--since", last_ts])
    else:
        # First run: ingest last 2 hours of logs
        cmd.extend(["--since", "2 hours ago"])
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # Bound journalctl's wall-clock time so a hung journal can't
        # stall the background loop.
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(), timeout=30.0,
        )
    except asyncio.TimeoutError:
        logger.warning("Log ingest: journalctl timed out")
        return
    except FileNotFoundError:
        # Host without journalctl (e.g. non-systemd dev box) — not an error.
        logger.debug("Log ingest: journalctl not found — skipping")
        return
    except Exception:
        logger.exception("Log ingest: failed to run journalctl")
        return
    # NOTE(review): stderr is captured but never inspected here.
    output = stdout.decode("utf-8", errors="replace").strip()
    if not output:
        logger.debug("Log ingest: no new log lines")
        return
    lines = output.splitlines()
    if not lines:
        return
    # Parse timestamp range from first and last lines (either may be None
    # if the line doesn't start with a well-formed short-iso timestamp).
    first_ts = _extract_timestamp(lines[0])
    last_line_ts = _extract_timestamp(lines[-1])
    # Chunk the log text
    from rag_system.file_rag_manager import chunk_text
    chunks = chunk_text(output, _CHUNK_SIZE, _CHUNK_OVERLAP)
    if not chunks:
        return
    # Get or create the RAG store (imports deferred so module import
    # doesn't pull in the embedding/RAG stack).
    try:
        from gemini_embed_pool import openrouter_embed_batch
        from rag_system import get_rag_store
        store = get_rag_store(_STORE_NAME)
    except Exception:
        logger.exception("Log ingest: failed to get RAG store '%s'", _STORE_NAME)
        return
    # Build IDs, documents, and metadata for upsert
    import json
    now_iso = datetime.now(timezone.utc).isoformat()
    ids: list[str] = []
    documents: list[str] = []
    metadatas: list[dict] = []
    for i, chunk in enumerate(chunks):
        # Content-hash prefix keeps IDs unique within a tick even for
        # near-identical chunks; now_iso keeps them unique across ticks.
        chunk_hash = hashlib.md5(chunk.encode("utf-8")).hexdigest()[:12]
        doc_id = f"log_{now_iso}_{i}_{chunk_hash}"
        documents.append(f"Service: {_SERVICE_NAME}\nLog chunk:\n\n{chunk}")
        ids.append(doc_id)
        metadatas.append({
            "source": "journalctl",
            "service": _SERVICE_NAME,
            "timestamp_range": f"{first_ts or 'unknown'} — {last_line_ts or 'unknown'}",
            "chunk_index": i,
            "chunk_count": len(chunks),
            "ingested_at": now_iso,
            "tags": json.dumps(_TAGS),
            "line_count": len(lines),
        })
    # Embed via OpenRouter (async), then upsert with pre-computed embeddings
    _EMBED_BATCH_SIZE = 50  # OpenRouter API batch limit
    try:
        for start in range(0, len(ids), _CHROMA_BATCH_SIZE):
            end = start + _CHROMA_BATCH_SIZE
            batch_ids = ids[start:end]
            batch_docs = documents[start:end]
            batch_metas = metadatas[start:end]
            all_embeddings: list[list[float]] = []
            # Sub-batch the embedding calls to stay under the API limit.
            for sub_start in range(0, len(batch_docs), _EMBED_BATCH_SIZE):
                sub_end = sub_start + _EMBED_BATCH_SIZE
                sub_docs = batch_docs[sub_start:sub_end]
                sub_embs = await openrouter_embed_batch(
                    sub_docs, model="google/gemini-embedding-001",
                )
                all_embeddings.extend(sub_embs)
            # Chroma's client is synchronous — run it off the event loop.
            await asyncio.to_thread(
                store.collection.upsert,
                ids=batch_ids,
                documents=batch_docs,
                embeddings=all_embeddings,
                metadatas=batch_metas,
            )
        logger.info(
            "Log ingest: indexed %d chunks (%d lines) into '%s'",
            len(chunks), len(lines), _STORE_NAME,
        )
    except Exception:
        logger.exception("Log ingest: ChromaDB upsert failed")
        return
    # Prune old entries to keep the store from growing unbounded
    # Keep at most ~2000 entries (roughly 24h of logs at 6-hour intervals)
    try:
        count = await asyncio.to_thread(store.collection.count)
        if count > 2000:
            # Page through the whole collection to collect ids + metadata.
            all_ids: list[str] = []
            all_metadatas: list[dict] = []
            offset = 0
            while True:
                page = await asyncio.to_thread(
                    store.collection.get,
                    include=["metadatas"],
                    limit=_PRUNE_PAGE_SIZE,
                    offset=offset,
                )
                if not page or not page.get("ids"):
                    break
                all_ids.extend(page["ids"])
                all_metadatas.extend(page["metadatas"])
                if len(page["ids"]) < _PRUNE_PAGE_SIZE:
                    break
                offset += len(page["ids"])
            if all_ids:
                # Sort oldest-first by ingestion time; entries without an
                # ingested_at sort first and are pruned earliest.
                paired = list(zip(all_ids, all_metadatas))
                paired.sort(key=lambda p: p[1].get("ingested_at", ""))
                excess = len(paired) - 2000
                if excess > 0:
                    to_delete = [p[0] for p in paired[:excess]]
                    for start in range(0, len(to_delete), _PRUNE_DELETE_BATCH_SIZE):
                        end = start + _PRUNE_DELETE_BATCH_SIZE
                        await asyncio.to_thread(
                            store.collection.delete, ids=to_delete[start:end],
                        )
                    logger.info(
                        "Log ingest: pruned %d old entries", len(to_delete),
                    )
    except Exception:
        # Pruning is non-critical; the timestamp is still advanced below.
        logger.warning("Log ingest: pruning failed", exc_info=True)
    # Update the timestamp marker so the next tick resumes after this batch.
    # Falls back to "now" when the last line had no parseable timestamp.
    new_ts = last_line_ts or datetime.now(timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S",
    )
    _write_last_timestamp(new_ts)
def _extract_timestamp(line: str) -> str | None:
"""Try to extract an ISO timestamp from the start of a journalctl line."""
# short-iso format: "2026-03-10T23:15:42+0000 hostname ..."
if len(line) >= 19 and line[4] == "-" and line[7] == "-" and line[10] == "T":
# Take up to the space after the timezone
space_idx = line.find(" ")
if space_idx > 10:
return line[:space_idx]
return None