"""Background task: ingest journalctl logs into the stargazer_logs RAG store.
Periodically reads new log lines since the last ingestion (across every live
service tier — see ``core.control_ops.fleet_units``), chunks them, and upserts
into the pgvector stargazer_logs store for semantic search.
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import os
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
# Directory containing this module (despite the name, not necessarily the
# repository root); the ingest cursor marker file is stored beside it.
_PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
# High-water-mark file holding the timestamp of the last ingested log line;
# its contents become journalctl's ``--since`` argument on the next tick.
_TIMESTAMP_FILE = os.path.join(_PROJECT_ROOT, ".last_log_ingest_ts")
# Project-level label stamped on each ingested chunk (the store is
# ``stargazer_logs``). The journal *source* is now the whole fleet — every tier
# from ``core.control_ops.fleet_units`` — not a single ``stargazer`` systemd unit.
_SERVICE_NAME = "stargazer"
# Name of the RAG store (looked up via ``get_rag_store``) that receives chunks.
_STORE_NAME = "stargazer_logs"
# Size and overlap handed to ``chunk_text`` when splitting journal output
# (presumably measured in characters — TODO confirm against chunk_text).
_CHUNK_SIZE = 1500
_CHUNK_OVERLAP = 200
# Tags recorded, JSON-encoded, in every chunk's ``tags`` metadata field.
_TAGS = ["stargazer", "logs", "systemd"]
# Maximum number of chunks sent per ``collection.upsert`` call. NOTE(review):
# the "CHROMA" name appears to predate the pgvector backend — confirm.
_CHROMA_BATCH_SIZE = 5000
# Page size used when listing collection metadata during pruning.
_PRUNE_PAGE_SIZE = 500
# Maximum number of ids passed to a single ``collection.delete`` call.
_PRUNE_DELETE_BATCH_SIZE = 500
def _read_last_timestamp() -> str | None:
"""Read the high-water-mark timestamp of the last successful ingest.
Reads the ``.last_log_ingest_ts`` marker file (``_TIMESTAMP_FILE``, next to
this module) so the next tick can ask ``journalctl`` only for lines newer
than what was already indexed, avoiding re-embedding the entire journal each
run. Any filesystem error is treated as "no marker yet" so a missing or
unreadable file simply triggers a fresh first-run ingest.
Called by :func:`ingest_logs_tick` at the start of each tick to bound the
``--since`` window; it is a module-private helper with no external callers.
Returns:
str | None: The persisted ISO timestamp string, or ``None`` if the file
is absent, empty, or unreadable.
"""
try:
if os.path.exists(_TIMESTAMP_FILE):
with open(_TIMESTAMP_FILE) as f:
ts = f.read().strip()
return ts if ts else None
except OSError:
pass
return None
def _write_last_timestamp(ts: str) -> None:
    """Persist the new high-water-mark timestamp after a successful ingest.

    Writes ``ts`` to a sibling ``.tmp`` file and then atomically renames it over
    the ``.last_log_ingest_ts`` marker (``_TIMESTAMP_FILE``) with
    :func:`os.replace`. A crash mid-write therefore cannot leave a truncated or
    empty marker behind. A damaged marker would otherwise make the next tick
    re-ingest the default 2-hour window or pass a garbled ``--since`` value.
    This advances the cursor that :func:`_read_last_timestamp` returns on the
    next tick.

    Write failures are logged and swallowed, so an ingest that already
    succeeded is not reported as failed merely because the marker could not be
    updated. Any leftover temporary file is removed best-effort.

    Called by :func:`ingest_logs_tick` at the end of a successful tick; it is a
    module-private helper with no external callers.

    Args:
        ts: The timestamp string (journalctl-style or ISO) to record as the new
            ingestion high-water mark.

    Returns:
        None.
    """
    tmp_path = f"{_TIMESTAMP_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(ts)
        # Atomic on POSIX: readers see either the old marker or the new one.
        os.replace(tmp_path, _TIMESTAMP_FILE)
    except OSError as e:
        logger.warning("Could not write log ingest timestamp: %s", e)
        try:
            os.remove(tmp_path)
        except OSError:
            # Temp file was never created or is already gone; nothing to do.
            pass
# Upper bound on chunks kept in the log store; the oldest chunks (by
# ``ingested_at`` metadata) are pruned after each successful upsert.
_MAX_STORED_CHUNKS = 2000


async def _terminate_process(proc: asyncio.subprocess.Process | None) -> None:
    """Best-effort shutdown of a still-running ``journalctl`` child.

    Sends SIGTERM and waits up to 2s for the process to exit. If that raises or
    times out, it escalates to SIGKILL and reaps the process. All errors are
    swallowed because this only runs on an already-failing path.

    Args:
        proc: The subprocess handle, or ``None`` if it was never created.
    """
    if proc is None or proc.returncode is not None:
        return
    try:
        proc.terminate()
        await asyncio.wait_for(proc.wait(), timeout=2.0)
    except Exception:
        try:
            proc.kill()
            await proc.wait()
        except Exception:
            pass


async def _prune_store(store) -> None:
    """Delete the oldest chunks so the collection holds at most ``_MAX_STORED_CHUNKS``.

    Does nothing unless ``collection.count()`` exceeds the cap. Otherwise it:

    1. pages through every entry's metadata, ``_PRUNE_PAGE_SIZE`` per
       ``collection.get`` call;
    2. orders the ids by their ``ingested_at`` metadata (entries without it
       sort first and are deleted first);
    3. deletes the excess in batches of ``_PRUNE_DELETE_BATCH_SIZE``.

    Store calls run via ``asyncio.to_thread``; exceptions propagate to the
    caller.

    Args:
        store: The RAG store returned by ``rag_system.get_rag_store``; only its
            ``collection`` (``count``/``get``/``delete``) is used.
    """
    count = await asyncio.to_thread(store.collection.count)
    if count <= _MAX_STORED_CHUNKS:
        return

    all_ids: list[str] = []
    all_metadatas: list[dict | None] = []
    offset = 0
    while True:
        page = await asyncio.to_thread(
            store.collection.get,
            include=["metadatas"],
            limit=_PRUNE_PAGE_SIZE,
            offset=offset,
        )
        if not page or not page.get("ids"):
            break
        all_ids.extend(page["ids"])
        all_metadatas.extend(page["metadatas"])
        if len(page["ids"]) < _PRUNE_PAGE_SIZE:
            break
        offset += len(page["ids"])

    paired = sorted(
        zip(all_ids, all_metadatas),
        key=lambda pair: (pair[1] or {}).get("ingested_at", ""),
    )
    excess = len(paired) - _MAX_STORED_CHUNKS
    if excess <= 0:
        return
    to_delete = [doc_id for doc_id, _meta in paired[:excess]]
    for start in range(0, len(to_delete), _PRUNE_DELETE_BATCH_SIZE):
        await asyncio.to_thread(
            store.collection.delete,
            ids=to_delete[start : start + _PRUNE_DELETE_BATCH_SIZE],
        )
    logger.info("Log ingest: pruned %d old entries", len(to_delete))


async def ingest_logs_tick() -> None:
    """Run one cycle of the journal-to-RAG ingestion pipeline.

    The steps, in order:

    1. Read the saved cursor via :func:`_read_last_timestamp`.
    2. Run ``journalctl`` over every fleet tier
       (``core.control_ops.fleet_units``) with a 30s timeout, collecting only
       new lines.
    3. Chunk the output with ``rag_system.file_rag_manager.chunk_text``.
    4. Embed the chunks through ``gemini_embed_pool.openrouter_embed_batch``
       (the ``google/gemini-embedding-001`` model) in sub-batches of 50.
    5. Upsert them, with their pre-computed embeddings, into the
       ``stargazer_logs`` store obtained from ``rag_system.get_rag_store``.
    6. Prune the collection back to ``_MAX_STORED_CHUNKS`` entries, oldest
       ``ingested_at`` first.
    7. Advance the cursor via :func:`_write_last_timestamp` to the timestamp of
       the last journal line that carries one.

    Blocking store calls run via ``asyncio.to_thread``. A non-zero journalctl
    exit status is logged as a warning together with its stderr.

    Some failures end the tick early and are only logged:

    * a missing ``journalctl`` binary;
    * a journalctl timeout;
    * output that contains no timestamped lines;
    * store lookup, embedding, or upsert failures.

    After an early exit the cursor is not advanced, so the same window is
    retried on the next tick. A pruning failure, by contrast, is logged and the
    cursor still advances. If the awaiting task is cancelled, a still-running
    ``journalctl`` child is killed and the cancellation propagates.

    Called by :func:`background_tasks._log_rag_ingest_task` (gated behind the
    ``background_scheduler_log_rag_ingest_enabled`` opt-in flag) on the
    periodic scheduler, and invoked directly by
    ``tests/test_background_tasks_shutdown.py``.

    Returns:
        None. All outcomes (indexed, pruned, skipped, or failed) are surfaced
        via logging only.
    """
    last_ts = _read_last_timestamp()

    # Build journalctl command — tail every live service tier (the retired
    # monolithic ``stargazer`` unit no longer exists post-T3), merging the
    # persistent and runtime journals like the admin journal endpoint does.
    from core.control_ops import fleet_units

    cmd = ["journalctl", "--no-pager", "--output=short-iso", "--merge"]
    for unit in fleet_units():
        cmd.extend(["-u", unit])
    # First run ingests the last 2 hours. --since is inclusive ("on or newer"),
    # so lines sharing the cursor's final second are read again next tick.
    cmd.extend(["--since", last_ts or "2 hours ago"])

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=30.0,
        )
    except asyncio.CancelledError:
        # Cancelled (e.g. scheduler shutdown) while journalctl runs: kill the
        # child without awaiting so cancellation is not delayed, then propagate.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except OSError:  # e.g. ProcessLookupError if it just exited
                pass
        raise
    except asyncio.TimeoutError:
        logger.warning("Log ingest: journalctl timed out")
        await _terminate_process(proc)
        return
    except FileNotFoundError:
        logger.debug("Log ingest: journalctl not found — skipping")
        return
    except Exception:
        logger.exception("Log ingest: failed to run journalctl")
        await _terminate_process(proc)
        return

    output = stdout.decode("utf-8", errors="replace").strip()
    if proc.returncode:
        # Without this, a failing journalctl (for example one rejecting the
        # persisted --since value) would look exactly like "no new lines" and
        # stall ingestion silently on every tick.
        err_text = (stderr or b"").decode("utf-8", errors="replace").strip()
        logger.warning(
            "Log ingest: journalctl exited with status %s: %s",
            proc.returncode,
            err_text[:500] or "<no stderr>",
        )
    if not output:
        logger.debug("Log ingest: no new log lines")
        return

    lines = output.splitlines()
    # Take the range from lines that actually start with a timestamp. Two kinds
    # of lines do not: continuation lines of multi-line messages, and
    # journalctl's informational "-- ... --" lines. The last timestamped line
    # also becomes the next cursor, so the cursor never falls back to a
    # wall-clock guess.
    first_ts = next((ts for ts in map(_extract_timestamp, lines) if ts), None)
    if first_ts is None:
        # Only marker/informational lines: nothing worth indexing.
        logger.debug("Log ingest: no new log lines")
        return
    last_line_ts = next(ts for ts in map(_extract_timestamp, reversed(lines)) if ts)

    # Chunk the log text
    from rag_system.file_rag_manager import chunk_text

    chunks = chunk_text(output, _CHUNK_SIZE, _CHUNK_OVERLAP)
    if not chunks:
        return

    # Get or create the RAG store
    try:
        from gemini_embed_pool import openrouter_embed_batch
        from rag_system import get_rag_store

        store = get_rag_store(_STORE_NAME)
    except Exception:
        logger.exception("Log ingest: failed to get RAG store '%s'", _STORE_NAME)
        return

    # Build IDs, documents, and metadata for upsert
    import jsonutil as json

    now_iso = datetime.now(timezone.utc).isoformat()
    tags_json = json.dumps(_TAGS)
    timestamp_range = f"{first_ts} — {last_line_ts}"
    ids: list[str] = []
    documents: list[str] = []
    metadatas: list[dict] = []
    for i, chunk in enumerate(chunks):
        # The MD5 suffix is only an id fingerprint, not a security use; opting
        # out keeps it working on FIPS-mode OpenSSL builds.
        chunk_hash = hashlib.md5(
            chunk.encode("utf-8"), usedforsecurity=False
        ).hexdigest()[:12]
        ids.append(f"log_{now_iso}_{i}_{chunk_hash}")
        documents.append(f"Service: {_SERVICE_NAME}\nLog chunk:\n\n{chunk}")
        metadatas.append(
            {
                "source": "journalctl",
                "service": _SERVICE_NAME,
                "timestamp_range": timestamp_range,
                "chunk_index": i,
                "chunk_count": len(chunks),
                "ingested_at": now_iso,
                "tags": tags_json,
                "line_count": len(lines),
            }
        )

    # Embed via OpenRouter (async), then upsert with pre-computed embeddings
    embed_batch_size = 50  # OpenRouter API batch limit
    try:
        for start in range(0, len(ids), _CHROMA_BATCH_SIZE):
            end = start + _CHROMA_BATCH_SIZE
            batch_docs = documents[start:end]
            embeddings: list[list[float]] = []
            for sub_start in range(0, len(batch_docs), embed_batch_size):
                embeddings.extend(
                    await openrouter_embed_batch(
                        batch_docs[sub_start : sub_start + embed_batch_size],
                        model="google/gemini-embedding-001",
                    )
                )
            await asyncio.to_thread(
                store.collection.upsert,
                ids=ids[start:end],
                documents=batch_docs,
                embeddings=embeddings,
                metadatas=metadatas[start:end],
            )
        logger.info(
            "Log ingest: indexed %d chunks (%d lines) into '%s'",
            len(chunks),
            len(lines),
            _STORE_NAME,
        )
    except Exception:
        logger.exception("Log ingest: pgvector upsert failed")
        return

    # Prune old entries to keep the store from growing unbounded; a pruning
    # failure must not block advancing the cursor past already-indexed lines.
    try:
        await _prune_store(store)
    except Exception:
        logger.warning("Log ingest: pruning failed", exc_info=True)

    # Advance the cursor to the last timestamped journal line just indexed.
    _write_last_timestamp(last_line_ts)
def _extract_timestamp(line: str) -> str | None:
"""Pull the leading ISO timestamp off a ``short-iso`` journalctl line.
Cheaply recognizes the ``2026-03-10T23:15:42+0000 hostname ...`` shape by
checking the fixed dash/``T`` positions rather than full parsing, then slices
out everything up to the first space (the timestamp plus timezone). Used to
label each ingested chunk with the first and last log line's time so the
stored metadata carries a human-readable ``timestamp_range``. Pure string
work with no I/O.
Called by :func:`ingest_logs_tick` on the first and last lines of a journal
batch; it is a module-private helper with no external callers.
Args:
line: A single raw journalctl output line.
Returns:
str | None: The extracted timestamp substring, or ``None`` if the line
does not match the expected ``short-iso`` prefix.
"""
# short-iso format: "2026-03-10T23:15:42+0000 hostname ..."
if len(line) >= 19 and line[4] == "-" and line[7] == "-" and line[10] == "T":
# Take up to the space after the timezone
space_idx = line.find(" ")
if space_idx > 10:
return line[:space_idx]
return None