"""File-based RAG Manager.
Manages file indexing and retrieval using Postgres pgvector + whole-file tables.
Returns entire files, not chunks, for complete context when possible.
Features:
- URL fetching support
- PDF parsing via PyMuPDF
- Chunked embeddings for better semantic matching
- Full file retrieval on search
"""
import fnmatch
import hashlib
import jsonutil as json
import logging
import os
import re
import tempfile
from collections import OrderedDict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Collection, Dict, List, Literal, Optional, Tuple
from urllib.parse import unquote, urlparse
import httpx
from .openrouter_embeddings import SyncOpenRouterEmbeddings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Parent of the directory that contains this file; DEFAULT_STORE_PATH is
# built relative to it.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def _sanitize_collection_name(name: str) -> str:
"""Sanitize *name* for use as a ChromaDB collection name.
ChromaDB requires 3-512 characters, only ``[a-zA-Z0-9._-]``,
must start and end with ``[a-zA-Z0-9]``.
"""
sanitized = re.sub(r"[^a-zA-Z0-9._-]", "_", name)
sanitized = re.sub(r"_+", "_", sanitized)
sanitized = sanitized.strip("._-")
if len(sanitized) < 3:
sanitized = f"store_{sanitized}" if sanitized else "store_default"
if len(sanitized) > 500:
sanitized = sanitized[:500].rstrip("._-")
return sanitized
# Default root directory for on-disk store data; FileRAGManager falls back to
# it when no ``store_path`` is passed.
DEFAULT_STORE_PATH = os.path.join(_PROJECT_ROOT, "rag_stores")
# Name of the per-store subdirectory that holds legacy on-disk file copies.
STORE_FILES_SUBDIR = "files"
# File suffixes accepted for indexing. FileRAGManager._is_supported_file
# compares the *lower-cased* ``Path.suffix`` against this set.
SUPPORTED_EXTENSIONS = {
    ".txt",
    ".md",
    ".py",
    ".js",
    ".ts",
    ".jsx",
    ".tsx",
    ".json",
    ".yaml",
    ".yml",
    ".toml",
    ".ini",
    ".cfg",
    ".html",
    ".css",
    ".scss",
    ".less",
    ".c",
    ".cpp",
    ".h",
    ".hpp",
    ".rs",
    ".go",
    ".java",
    ".kt",
    ".sh",
    ".bash",
    ".zsh",
    ".fish",
    ".sql",
    ".graphql",
    ".xml",
    ".csv",
    ".r",
    # NOTE(review): the suffix is lower-cased before lookup, so this
    # upper-case entry is only reachable through the ".r" entry above.
    ".R",
    ".rmd",
    ".lua",
    ".rb",
    ".php",
    ".pl",
    ".pm",
    ".dockerfile",
    ".makefile",
    ".env",
    ".gitignore",
    ".dockerignore",
    ".rst",
    ".tex",
    ".bib",
    ".pdf",
}
# Default per-file size ceiling, in bytes, for FileRAGManager.max_file_size.
MAX_FILE_SIZE = 15 * 1024 * 1024 # 15 MB
# Default chunk length and inter-chunk overlap for chunk_text, in characters.
DEFAULT_CHUNK_SIZE = 1500
DEFAULT_CHUNK_OVERLAP = 200
# Maximum number of records sent in one ``collection.upsert`` call when
# FileRAGManager.index_file writes chunks.
CHROMA_MAX_BATCH = 5000
# Stores where content is log chunks only — no whole-file PG tables used on read.
CHUNK_ONLY_STORES = frozenset({"stargazer_logs"})
# ---------------------------------------------------------------------------
# Sensitive-path deny list (defense-in-depth for index_file/index_directory)
# ---------------------------------------------------------------------------
#
# Even when an admin runs ``rag_index_directory("/root/stargazer-v3")``
# legitimately, accidentally surfacing ``config.yaml`` or
# ``matrix_credentials.json`` into a queryable Chroma store would be a
# disaster: anyone with ``rag_search`` can then exfil the embedded chunks.
# These patterns are checked *after* the per-tool UNSANDBOXED_EXEC gate in
# ``tools/rag.py`` and apply to every caller including admins.
# Glob patterns matched with ``fnmatch`` against the final path component of
# the resolved (``realpath``) file.
_SENSITIVE_BASENAME_PATTERNS: Tuple[str, ...] = (
    "config.yaml",
    "config.yml",
    ".env",
    ".env.*",
    "matrix_credentials*.json",
    "*.pem",
    "*.key",
    "id_rsa*",
    "id_ed25519*",
    "*_rsa",
    "*_ed25519",
    "*_rsa.pub",
    "*_ed25519.pub",
    "api_key_encryption_keys.db",
    "*.rdb",
    "memories_export*.json",
)
"""Filename glob patterns refused regardless of directory."""
# Directory names compared, lower-cased, against every component of the
# resolved path.
_SENSITIVE_ANCESTOR_DIRS: Tuple[str, ...] = (
    "dna_vault",
    "nio_store",
    ".ssh",
    ".docker",
    ".gnupg",
)
"""Any path with one of these directory components in its ancestry is refused."""
# ``~``-relative directories; each is expanded for the current user when
# _is_sensitive_path runs, not at import time.
_SENSITIVE_HOME_SUBPATHS: Tuple[str, ...] = (
    "~/.ssh",
    "~/.docker",
    "~/.config/gcloud",
    "~/.aws",
)
"""Resolved at call time via ``os.path.expanduser``; refused as prefix."""
# Fixed absolute locations; a resolved path equal to or nested under any of
# them is refused.
_SENSITIVE_ABSOLUTE_PREFIXES: Tuple[str, ...] = (
    "/etc/shadow",
    "/etc/gshadow",
    "/etc/sudoers",
    "/etc/sudoers.d",
    "/etc/ssl/private",
    "/root/.ssh",
    "/root/.docker",
    "/root/.config/gcloud",
    "/root/.aws",
)
"""Absolute path prefixes refused (matched against ``realpath``)."""
def _is_sensitive_path(file_path: str) -> bool:
    """Return True if *file_path* (or its symlink target) is on the deny list.

    The path is resolved with ``os.path.realpath`` first so a symlink cannot
    be used to smuggle a denied file in under an innocent name. The resolved
    path is then refused when any of the following holds:

    * its basename matches a glob in ``_SENSITIVE_BASENAME_PATTERNS``
      (compared case-insensitively, like the directory check below);
    * any of its components equals a name in ``_SENSITIVE_ANCESTOR_DIRS``
      (case-insensitive);
    * it equals or lies under an expanded ``_SENSITIVE_HOME_SUBPATHS`` entry;
    * it equals or lies under a ``_SENSITIVE_ABSOLUTE_PREFIXES`` entry.

    Args:
        file_path: Path to test; an empty value is treated as not sensitive.

    Returns:
        ``True`` when the path must not be indexed. A path that cannot be
        resolved is refused as well (fail closed).
    """
    if not file_path:
        return False
    try:
        real = os.path.realpath(file_path)
    except (OSError, ValueError):
        # Fail closed: an unresolvable path is treated as sensitive.
        return True
    resolved = Path(real)

    # Lower-case both sides so "CONFIG.YAML" or ".ENV" cannot slip past on
    # case-sensitive platforms (the ancestor check below is already
    # case-insensitive).
    name_lower = resolved.name.lower()
    if any(
        fnmatch.fnmatch(name_lower, pattern.lower())
        for pattern in _SENSITIVE_BASENAME_PATTERNS
    ):
        return True

    parts_lower = {part.lower() for part in resolved.parts}
    if any(name.lower() in parts_lower for name in _SENSITIVE_ANCESTOR_DIRS):
        return True

    for sub in _SENSITIVE_HOME_SUBPATHS:
        try:
            expanded = os.path.expanduser(sub)
            # expanduser returns its input unchanged when no home directory
            # can be determined. This must be tested *before* resolve(),
            # which would otherwise turn "~/.ssh" into "<cwd>/~/.ssh".
            if expanded.startswith("~"):
                continue
            base = Path(expanded).resolve()
        except (OSError, RuntimeError, ValueError):
            continue
        if _path_is_at_or_under(resolved, base):
            return True

    return any(
        _path_is_at_or_under(resolved, Path(prefix))
        for prefix in _SENSITIVE_ABSOLUTE_PREFIXES
    )


def _path_is_at_or_under(path: Path, base: Path) -> bool:
    """Return True when *path* equals *base* or is nested beneath it."""
    if path == base:
        return True
    try:
        return path.is_relative_to(base)
    except (AttributeError, ValueError):
        # Path.is_relative_to needs Python 3.9+; fall back to a
        # separator-aware string prefix test.
        return str(path).startswith(str(base) + os.sep)
# ---------------------------------------------------------------------------
# PDF helpers
# ---------------------------------------------------------------------------
[docs]
def decode_bytes_to_text(raw: bytes, filename: str) -> Optional[str]:
    """Turn raw file bytes into text, choosing the decoder from *filename*.

    Names ending in ``.pdf`` (case-insensitive) are handed to
    :func:`extract_pdf_text_from_bytes`. Anything else is decoded as strict
    UTF-8, and if that fails with ``UnicodeDecodeError`` it is decoded again
    as ``latin-1``.

    Args:
        raw (bytes): The raw file contents.
        filename (str): Only inspected for a ``.pdf`` ending.

    Returns:
        Optional[str]: The decoded text, or ``None`` if the ``latin-1``
        fallback itself raised.
    """
    if filename.lower().endswith(".pdf"):
        return extract_pdf_text_from_bytes(raw)

    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: fall through to the permissive decoder below.
        pass

    try:
        return raw.decode("latin-1")
    except Exception:
        return None
[docs]
def compress_pdf(
    file_path: str,
    output_path: Optional[str] = None,
    remove_images: bool = True,
) -> Tuple[str, int, int]:
    """Shrink a PDF by optionally stripping images and re-saving it compacted.

    Opens *file_path* with PyMuPDF, deletes every embedded image when
    *remove_images* is true, and rewrites the document with ``ez_save()``.
    The result goes to *output_path* when given; otherwise the source file is
    replaced. The size change is logged.

    In-place replacement is done by saving to a scratch file in the same
    directory and moving it over the source with ``os.replace``. PyMuPDF
    rejects a non-incremental save onto the file a document was opened from,
    so calling ``ez_save`` on the source path directly would fail. The
    scratch file is removed if anything goes wrong, and the document handle
    is always closed.

    Args:
        file_path (str): Path to the source PDF.
        output_path (Optional[str]): Destination path; ``None`` replaces the
            source file.
        remove_images (bool): Whether to delete embedded images before saving.

    Returns:
        Tuple[str, int, int]: ``(out_path, original_size, compressed_size)``
        with both sizes in bytes.

    Raises:
        ImportError: If PyMuPDF (``fitz``) is not installed.
    """
    import fitz  # PyMuPDF

    original_size = os.path.getsize(file_path)
    out_path = output_path or file_path
    source_real = os.path.realpath(file_path)
    in_place = os.path.realpath(out_path) == source_real

    save_path = out_path
    if in_place:
        # Same directory as the source so os.replace stays on one filesystem.
        fd, save_path = tempfile.mkstemp(
            suffix=".pdf", dir=os.path.dirname(source_real)
        )
        os.close(fd)

    finished = False
    try:
        doc = fitz.open(file_path)
        try:
            if remove_images:
                images_removed = 0
                for page in doc:
                    for img in page.get_images(full=True):
                        try:
                            # img[0] is the image's xref number.
                            page.delete_image(img[0])
                            images_removed += 1
                        except Exception as exc:
                            # Best effort: one stubborn image must not abort
                            # the whole compression.
                            logger.debug(
                                "Could not remove image %s: %s", img[0], exc
                            )
                if images_removed:
                    logger.info("Removed %d images from PDF", images_removed)
            doc.ez_save(save_path)
        finally:
            doc.close()

        if in_place:
            try:
                # mkstemp creates the file owner-only; keep the source's mode.
                os.chmod(save_path, os.stat(source_real).st_mode & 0o7777)
            except OSError:
                pass
            os.replace(save_path, source_real)
        finished = True
    finally:
        if in_place and not finished:
            try:
                os.remove(save_path)
            except OSError:
                pass

    compressed_size = os.path.getsize(out_path)
    reduction = 100 - (compressed_size * 100 // original_size) if original_size else 0
    logger.info(
        "Compressed PDF: %d -> %d bytes (%d%% reduction)",
        original_size,
        compressed_size,
        reduction,
    )
    return out_path, original_size, compressed_size
# ---------------------------------------------------------------------------
# Text chunking
# ---------------------------------------------------------------------------
[docs]
def chunk_text(
    text: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> List[str]:
    """Split *text* into chunks, preferring paragraph then sentence boundaries.

    Text that already fits in *chunk_size* characters is returned as a
    single-element list. Otherwise paragraphs (separated by blank lines) are
    packed greedily into chunks. A paragraph longer than *chunk_size* is split
    into sentences and packed the same way. A single sentence that is still
    too long is hard-sliced into windows of *chunk_size* characters that
    advance by ``chunk_size - overlap``. Pure string processing, no I/O.

    Args:
        text (str): The full document text to split.
        chunk_size (int): Target maximum chunk length in characters. Must be
            at least 1 whenever the text does not already fit.
        overlap (int): Characters shared between consecutive hard-sliced
            windows (and carried as a tail into an oversized paragraph's
            first sentence). Values outside ``[0, chunk_size - 1]`` are
            clamped into that range.

    Returns:
        List[str]: The ordered, non-empty, whitespace-trimmed chunks (or
        ``[text]`` unchanged when the text fits in one chunk).

    Raises:
        ValueError: If the text needs splitting and *chunk_size* is below 1.
    """
    if len(text) <= chunk_size:
        return [text]
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    # An overlap >= chunk_size would make the hard-slice step zero (a
    # ValueError from range) or negative (the text silently dropped).
    overlap = max(0, min(overlap, chunk_size - 1))
    step = chunk_size - overlap

    chunks: List[str] = []

    def emit(piece: str) -> None:
        # Append a finished chunk, skipping whitespace-only remnants.
        stripped = piece.strip()
        if stripped:
            chunks.append(stripped)

    def tail_of(piece: str) -> str:
        # piece[-0:] is the *whole* string, so zero overlap needs its own guard.
        return piece[-overlap:] if 0 < overlap < len(piece) else ""

    current = ""
    for para in re.split(r"\n\n+", text):
        if len(current) + len(para) + 2 <= chunk_size:
            current = f"{current}\n\n{para}" if current else para
            continue

        if current:
            emit(current)
            current = tail_of(current)

        if len(para) <= chunk_size:
            # NOTE(review): this starts the next chunk from the paragraph
            # alone, discarding the tail computed above. Kept as-is to
            # preserve existing chunk boundaries.
            current = para
            continue

        for sentence in re.split(r"(?<=[.!?])\s+", para):
            if len(current) + len(sentence) + 1 <= chunk_size:
                current = f"{current} {sentence}" if current else sentence
                continue

            if current:
                emit(current)
                current = tail_of(current)

            if len(sentence) > chunk_size:
                chunks.extend(
                    sentence[start : start + chunk_size]
                    for start in range(0, len(sentence), step)
                )
                # The pending tail predates this sentence; carrying it past
                # the slices would glue out-of-order text onto what follows.
                current = ""
            else:
                current = sentence

    emit(current)
    return chunks
# ---------------------------------------------------------------------------
# URL fetching
# ---------------------------------------------------------------------------
[docs]
# Strong references to in-flight telemetry tasks. The event loop only keeps
# weak references, so an unreferenced task can be garbage-collected before it
# finishes.
_PENDING_FETCH_ERROR_EVENTS: set = set()


def _filename_from_content_disposition(header: str) -> Optional[str]:
    """Extract the filename from a ``Content-Disposition`` header value.

    Prefers the extended ``filename*=charset'lang'pct-encoded`` form and
    falls back to plain ``filename=``, which may be a quoted string
    (spaces allowed) or a bare token. Directory components are dropped so
    only a basename is returned. Returns ``None`` when no usable name is
    present.
    """
    if not header:
        return None

    name = ""
    extended = re.search(r"filename\*\s*=\s*([^;]+)", header, re.IGNORECASE)
    if extended:
        value = extended.group(1).strip().strip('"')
        charset, quote, remainder = value.partition("'")
        if quote:
            # Layout is charset'language'value; the language part may be empty.
            encoded = remainder.partition("'")[2]
            try:
                name = unquote(encoded, encoding=charset or "utf-8", errors="replace")
            except LookupError:
                # Unknown charset label: decode as UTF-8 instead.
                name = unquote(encoded)
        else:
            name = unquote(value)

    if not name:
        plain = re.search(
            r'filename\s*=\s*(?:"([^"]*)"|([^;\s]+))', header, re.IGNORECASE
        )
        if plain:
            raw_name = plain.group(1) if plain.group(1) is not None else plain.group(2)
            name = unquote(raw_name.strip("'\""))

    # The header is server-controlled: never let it carry a directory part.
    name = name.replace("\\", "/").rsplit("/", 1)[-1].strip()
    return name or None


async def fetch_url_content(
    url: str,
    timeout: float = 30.0,
) -> Tuple[Optional[bytes], Optional[str], Optional[str]]:
    """Fetch content from *url* (SSRF-guarded). Returns ``(bytes, content_type, filename)``.

    Routes through :func:`tools._safe_http.safe_http_request`, which validates
    every redirect hop and pins each connect to a vetted public IP, so a
    user-supplied ingestion URL cannot be redirected (or DNS-rebound) into an
    internal host such as 10.10.0.x:6379.

    The filename is taken from the ``Content-Disposition`` header when it
    provides one, otherwise from the last URL path segment if that segment
    contains a dot, otherwise from a small content-type table (defaulting to
    ``"downloaded_file"``).

    Args:
        url (str): The URL to download.
        timeout (float): Timeout passed to the HTTP client.

    Returns:
        Tuple[Optional[bytes], Optional[str], Optional[str]]: The body, the
        media type (parameters stripped) and a filename. On any failure the
        error is logged, an HTTP error event is scheduled on a best-effort
        basis, and ``(None, None, None)`` is returned.
    """
    from tools._safe_http import safe_http_request, safe_httpx_client

    try:
        async with safe_httpx_client(timeout=timeout) as client:
            response = await safe_http_request(
                client, "GET", url, max_redirects=5
            )
            response.raise_for_status()
            content = response.content
            content_type = (
                response.headers.get("content-type", "").split(";")[0].strip()
            )
            filename = _filename_from_content_disposition(
                response.headers.get("content-disposition", "")
            )
            if not filename:
                path = unquote(urlparse(url).path)
                if path and "/" in path:
                    filename = path.split("/")[-1]
                # A path segment without a dot carries no usable extension.
                if not filename or "." not in filename:
                    filename = None
            if not filename:
                ext_map = {
                    "application/pdf": "document.pdf",
                    "text/plain": "document.txt",
                    "application/json": "data.json",
                    "text/html": "page.html",
                }
                filename = ext_map.get(content_type, "downloaded_file")
            return content, content_type, filename
    except Exception as e:
        logger.error("Error fetching %s: %s", url, e)
        status = getattr(getattr(e, "response", None), "status_code", 0)
        try:
            import asyncio

            from observability import publish_http_error_event

            task = asyncio.create_task(
                publish_http_error_event(
                    http_service="file_rag_fetch",
                    http_status=status,
                    endpoint=url[:120],
                    detail=str(e)[:500],
                    error_kind="network" if status == 0 else "",
                )
            )
            _PENDING_FETCH_ERROR_EVENTS.add(task)
            task.add_done_callback(_PENDING_FETCH_ERROR_EVENTS.discard)
        except Exception as telemetry_error:
            # Telemetry is best-effort and must never mask the fetch failure.
            logger.debug(
                "Could not publish fetch error event: %s", telemetry_error
            )
        return None, None, None
# ---------------------------------------------------------------------------
# FileRAGManager
# ---------------------------------------------------------------------------
[docs]
class FileRAGManager:
"""File- and URL-oriented RAG store over Postgres pgvector plus whole-file tables.
One instance manages a single named store: a per-store Postgres schema holding a
``files_<schema>`` vector table (``halfvec(3072)`` + HNSW) for chunk embeddings,
plus ``documents`` / ``source_files`` tables that keep the full original text and
raw bytes so search can return entire files rather than just the matched chunk.
It wraps the pgvector table in a Chroma-shaped facade
(``vector_store.ChromaCompatCollection``) and embeds text through
:class:`rag_system.openrouter_embeddings.SyncOpenRouterEmbeddings`
(Gemini by default), so indexing and search reach the embedding provider over
HTTP while persistence stays in Postgres. Stores listed in ``CHUNK_ONLY_STORES``
(e.g. ``stargazer_logs``) skip the whole-file tables entirely.
Instances are normally obtained through the module-level LRU registry
:func:`get_rag_store` (and :func:`get_stargazer_docs_store`) rather than
constructed directly; the RAG tool handlers in ``tools/rag.py`` /
``tools/cloud_rag.py``, :class:`rag_system.auto_search.RAGAutoSearchManager`, and
``starwiki/rag_integration`` all go through that registry, with ``starwiki`` being
the one place that instantiates :class:`FileRAGManager` directly.
"""
[docs]
def __init__(
    self,
    store_name: str = "default",
    store_path: Optional[str] = None,
    api_key: Optional[str] = None,
    embedding_model: str = "google/gemini-embedding-001",
    max_file_size: int = MAX_FILE_SIZE,
    gemini_only: bool = True,
    document_task_type: Optional[str] = None,
    query_task_type: Optional[str] = None,
):
    """Set up the embedding client and the Postgres-backed collection.

    Derives the sanitized store name, builds the embedding function, makes
    sure the pgvector table exists (and, unless the store is listed in
    ``CHUNK_ONLY_STORES``, the whole-file source tables too), then wraps the
    table in a ``ChromaCompatCollection`` exposed as ``self.collection``.

    Args:
        store_name (str): Logical store name; sanitized to derive the
            schema and table identifiers.
        store_path (Optional[str]): Root of the legacy on-disk store
            layout; defaults to ``DEFAULT_STORE_PATH``.
        api_key (Optional[str]): Forwarded to the embedding client.
        embedding_model (str): Embedding model name, forwarded to the
            embedding client.
        max_file_size (int): Size ceiling in bytes applied when reading
            local files.
        gemini_only (bool): Use only the Gemini API for embeddings.
        document_task_type: Optional Gemini task type for indexed text
            (e.g. ``RETRIEVAL_DOCUMENT``).
        query_task_type: Optional Gemini task type for search queries
            (e.g. ``RETRIEVAL_QUERY``).
    """
    sanitized = _sanitize_collection_name(store_name)
    root_dir = store_path or DEFAULT_STORE_PATH

    self.store_name = store_name
    self.store_path = root_dir
    self.embedding_model = embedding_model
    self.max_file_size = max_file_size
    self._sanitized_name = sanitized
    self._collection_name = f"files_{sanitized}"

    # Legacy disk dir (read-only fallback for unmigrated files).
    self.db_path = os.path.join(root_dir, sanitized)
    self.files_path = os.path.join(self.db_path, STORE_FILES_SUBDIR)

    self.embedding_fn = SyncOpenRouterEmbeddings(
        api_key=api_key,
        model=embedding_model,
        gemini_only=gemini_only,
        document_task_type=document_task_type,
        query_task_type=query_task_type,
    )

    # Postgres + pgvector: one schema per store with a ``files_<store>``
    # vector table. ``self.collection`` presents it through a Chroma-shaped
    # facade so the other methods can keep using the collection API.
    from vector_store import (
        ChromaCompatCollection,
        PgVectorCollection,
        pg_ident,
    )

    self._schema = pg_ident(sanitized)
    self._table = pg_ident(self._collection_name)
    self._pg = PgVectorCollection(self._schema, self._table)
    self._pg.ensure()

    from .pg_source_files import ensure_source_tables

    if sanitized not in CHUNK_ONLY_STORES:
        ensure_source_tables(self._schema)

    self.collection = ChromaCompatCollection(self._pg, self.embedding_fn)
    logger.info(
        "Initialized FileRAGManager '%s' (pgvector %s.%s; whole files in PG)",
        store_name,
        self._schema,
        self._table,
    )
# -- helpers -------------------------------------------------------------
def _pg_whole_files_enabled(self) -> bool:
    """Tell whether this store uses the whole-file Postgres tables.

    A store is "chunk-only" when its sanitized name appears in the
    module-level ``CHUNK_ONLY_STORES`` frozenset; such stores skip the
    whole-file tables. The check is a plain in-memory membership test on
    ``self._sanitized_name``.

    Returns:
        bool: ``False`` for chunk-only stores, ``True`` for every other store.
    """
    is_chunk_only = self._sanitized_name in CHUNK_ONLY_STORES
    return not is_chunk_only
def _upsert_whole_file(
self,
filename: str,
content: str,
raw_bytes: bytes,
extraction_method: str = "text",
) -> None:
"""Persist a file's full text and raw bytes into the whole-file PG tables.
Backs the "return the entire file on search" behaviour: it computes a
SHA-256 over *raw_bytes* and delegates to
``rag_system.pg_source_files.upsert_whole_file`` to write the
``documents`` (extracted text) and ``source_files`` (raw bytes) rows for
this store's schema. A no-op for chunk-only stores, gated up front by
:meth:`_pg_whole_files_enabled`. Side effect is a Postgres write only; the
vector/chunk upsert happens separately in the index methods.
Called within this class by :meth:`index_file` and :meth:`index_url` after
the source content has been read/decoded.
Args:
filename (str): Stored filename key for the document.
content (str): Extracted plain-text content to persist.
raw_bytes (bytes): Original file bytes (also hashed for the SHA).
extraction_method (str): How *content* was produced, e.g. ``"text"``
or ``"pdf"``.
Returns:
None
"""
if not self._pg_whole_files_enabled():
return
from .pg_source_files import upsert_whole_file
sha = hashlib.sha256(raw_bytes).hexdigest()
upsert_whole_file(
self._schema,
filename,
sha,
content,
raw_bytes,
extraction_method=extraction_method,
)
def _load_whole_file_text(self, filename: str) -> Optional[str]:
"""Resolve the full text of a stored file across all storage tiers.
The canonical "give me the whole file" lookup, tried in priority order:
the Postgres ``documents`` table (pre-extracted text), then the
``source_files`` raw bytes decoded via :func:`decode_bytes_to_text`, and
finally the legacy on-disk ``files`` directory (PDF-extracted or
UTF-8/latin-1 decoded) for stores predating the Postgres migration. The PG
tiers are skipped entirely for chunk-only stores via
:meth:`_pg_whole_files_enabled`. Reads Postgres and the filesystem;
returns ``None`` when the file is found in none of the tiers.
Called within this class by :meth:`search` (whole-file content mode) and
:meth:`read_store_file`.
Args:
filename (str): Stored filename to resolve.
Returns:
Optional[str]: The full file text, or ``None`` if not found anywhere.
"""
if not filename:
return None
if self._pg_whole_files_enabled():
from .pg_source_files import get_document_text, get_source_file_bytes
text = get_document_text(self._schema, filename)
if text:
return text
raw = get_source_file_bytes(self._schema, filename)
if raw is not None:
decoded = decode_bytes_to_text(raw, filename)
if decoded:
return decoded
if os.path.isdir(self.files_path):
fp = os.path.join(self.files_path, filename)
if os.path.isfile(fp):
if filename.lower().endswith(".pdf"):
return extract_pdf_text(fp)
try:
with open(fp, "r", encoding="utf-8") as f:
return f.read()
except UnicodeDecodeError:
with open(fp, "r", encoding="latin-1") as f:
return f.read()
return None
@staticmethod
def _compute_file_hash(content: str) -> str:
"""Compute the SHA-256 content hash used for index deduplication.
Returns the hex digest of the UTF-8-encoded *content*. Stored in chunk
metadata as ``content_hash`` so re-indexing an unchanged file (same path or
URL, same hash) can be skipped instead of re-embedding. Pure, no I/O.
Called within this class by :meth:`index_file` and :meth:`index_url`.
Args:
content (str): The extracted file text to hash.
Returns:
str: The hex-encoded SHA-256 digest.
"""
return hashlib.sha256(content.encode("utf-8")).hexdigest()
@staticmethod
def _get_file_id(file_path: str) -> str:
"""Derive a stable, path-based document id for a local file.
Normalizes *file_path* to an absolute, canonical form and returns its MD5
hex digest, giving every distinct on-disk path one deterministic id that is
reused as the chunk-id prefix (``<file_id>_chunk_<n>``). Re-indexing the same
path therefore lands on the same ids, enabling clean upsert/replace. Pure,
no I/O.
Called within this class by :meth:`index_file` to seed chunk ids.
Args:
file_path (str): The local file path to derive an id from.
Returns:
str: A hex-encoded MD5 digest of the normalized absolute path.
"""
normalized = os.path.normpath(os.path.abspath(file_path))
return hashlib.md5(normalized.encode("utf-8")).hexdigest()
@staticmethod
def _is_supported_file(file_path: str) -> bool:
    """Decide whether a file's type is eligible for indexing.

    A file is accepted when any of these holds:

    * its lower-cased suffix is in ``SUPPORTED_EXTENSIONS``;
    * it is a bare dotfile whose whole name is listed there (for example
      ``.gitignore``) and that name is not on the sensitive-basename deny
      list. ``Path(".gitignore").suffix`` is empty, so the suffix test alone
      can never match such files;
    * its lower-cased name is one of the extensionless special files
      ``dockerfile``, ``makefile``, ``readme``, ``license`` or ``changelog``.

    Pure string inspection with no filesystem access.

    Args:
        file_path (str): Path whose extension / basename is examined.

    Returns:
        bool: ``True`` if the file type is supported for indexing.
    """
    path = Path(file_path)
    ext = path.suffix.lower()
    filename = path.name.lower()
    known_files = {
        "dockerfile",
        "makefile",
        "readme",
        "license",
        "changelog",
    }
    if ext in SUPPORTED_EXTENSIONS or filename in known_files:
        return True
    if filename.startswith(".") and filename in SUPPORTED_EXTENSIONS:
        # Keep deny-listed dotfiles (e.g. ".env") unsupported here as well,
        # so callers that skip the sensitive-path check see no change.
        return not any(
            fnmatch.fnmatch(filename, pattern.lower())
            for pattern in _SENSITIVE_BASENAME_PATTERNS
        )
    return False
def _read_file_content(self, file_path: str) -> Optional[str]:
"""Read a local file's text content, honoring size limits and PDFs.
The disk-side counterpart used by indexing to turn a path into text. It
enforces ``self.max_file_size``: oversized PDFs are routed through
:func:`compress_pdf` and re-extracted if they fit afterwards, while other
oversized files are refused (logged, ``None``). PDFs are extracted with
:func:`extract_pdf_text`; everything else is read as UTF-8 with a
``latin-1`` fallback. Reads the filesystem (and may write/remove a transient
``.compressed.pdf``); returns ``None`` on any read/extract failure.
Called within this class by :meth:`index_file`.
Args:
file_path (str): Absolute path to the file to read.
Returns:
Optional[str]: The file's text content, or ``None`` if it could not be
read, was too large, or yielded no text.
"""
try:
file_size = os.path.getsize(file_path)
is_pdf = file_path.lower().endswith(".pdf")
if file_size > self.max_file_size:
if is_pdf:
logger.info(
"PDF exceeds size limit, attempting compression...",
)
compressed_path = file_path + ".compressed.pdf"
try:
_, _, compressed_size = compress_pdf(
file_path,
compressed_path,
)
if compressed_size <= self.max_file_size:
content = extract_pdf_text(compressed_path)
try:
os.remove(compressed_path)
except OSError:
pass
return content
try:
os.remove(compressed_path)
except OSError:
pass
except Exception:
pass
return None
logger.warning(
"File too large (>%d bytes): %s",
self.max_file_size,
file_path,
)
return None
if is_pdf:
return extract_pdf_text(file_path)
try:
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
except UnicodeDecodeError:
with open(file_path, "r", encoding="latin-1") as f:
return f.read()
except Exception as e:
logger.error("Failed to read file %s: %s", file_path, e)
return None
@staticmethod
def _create_embedding_text(file_path: str, content: str) -> str:
"""Prepend a File/Path/Type header to chunk text before embedding.
Enriches each chunk with lightweight provenance — the filename, the last
three path components, and the file extension — so the embedding (and any
later keyword match) carries signal about where the content came from, not
just the body. The header is later stripped on read by
:meth:`_strip_embedding_header` so it never reaches the user. Pure string
formatting, no I/O.
Called within this class by :meth:`index_file` for both the chunked and
single-document indexing paths.
Args:
file_path (str): Source path used to derive the header metadata.
content (str): The chunk body to embed.
Returns:
str: The header-prefixed text to embed.
"""
path_parts = Path(file_path).parts
filename = Path(file_path).name
ext = Path(file_path).suffix
return (
f"File: {filename}\nPath: {'/'.join(path_parts[-3:])}\n"
f"Type: {ext or 'unknown'}\n\n{content}"
)
@staticmethod
def _strip_embedding_header(doc: str) -> str:
"""Strip the File/Path/Type provenance header back off a stored chunk.
The inverse of :meth:`_create_embedding_text`: it drops everything up to and
including the first blank-line separator so callers get the raw chunk body
without the embedding-only header. Falls back to the trimmed original when
no separator is present (or the body would be empty), so it never returns
nothing for non-empty input. Pure, no I/O.
Called within this class by :meth:`search` when assembling chunk-mode result
content.
Args:
doc (str): The stored chunk text, possibly header-prefixed.
Returns:
str: The chunk body with the header removed (or the trimmed input as a
fallback).
"""
if not doc:
return ""
marker = "\n\n"
idx = doc.find(marker)
if idx == -1:
return doc.strip()
body = doc[idx + len(marker) :].strip()
return body if body else doc.strip()
# -- index ---------------------------------------------------------------
[docs]
def index_file(
    self,
    file_path: str,
    tags: Optional[List[str]] = None,
    use_chunking: bool = True,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
    force: bool = False,
) -> Dict[str, Any]:
    """Index a single file into the collection.

    The path is made absolute and rejected when it does not exist, is not a
    regular file, is on the sensitive-path deny list, or has an unsupported
    type. The text and raw bytes are then read and persisted through
    :meth:`_upsert_whole_file`. If records for the same ``file_path`` already
    exist with an identical content hash, the file is skipped; otherwise the
    old records are deleted and the content is upserted, either as chunks
    (ids ``<file_id>_chunk_<n>``) or as a single document (id ``<file_id>``).

    When *force* is True the content-hash dedup check is skipped so
    the file is always re-embedded (but the store is **not** cleared).

    Args:
        file_path (str): Path of the file to index.
        tags (Optional[List[str]]): Tags stored JSON-encoded in the metadata.
        use_chunking (bool): Chunk content longer than *chunk_size*.
        chunk_size (int): Target chunk length in characters.
        chunk_overlap (int): Overlap passed to :func:`chunk_text`.
        force (bool): Re-embed even when the content hash is unchanged.

    Returns:
        Dict[str, Any]: Always contains ``success``. On success it also has
        ``action`` (``"indexed"`` or ``"skipped"``) and ``file_path``, plus
        ``file_id`` / ``file_size`` (and ``chunk_count`` when chunked) for
        indexed files. On failure it has ``error``, and
        ``skipped_sensitive`` for deny-listed paths.
    """
    file_path = os.path.abspath(file_path)
    if not os.path.exists(file_path):
        return {"success": False, "error": f"File not found: {file_path}"}
    if not os.path.isfile(file_path):
        return {"success": False, "error": f"Not a file: {file_path}"}
    if _is_sensitive_path(file_path):
        logger.warning(
            "RAG index_file refused sensitive path: %s",
            file_path,
        )
        return {
            "success": False,
            "skipped_sensitive": True,
            "error": (
                f"Refusing to index sensitive path: {file_path} "
                "(matches the RAG sensitive-file deny list)."
            ),
        }
    if not self._is_supported_file(file_path):
        return {
            "success": False,
            "error": f"Unsupported file type: {file_path}",
        }

    content = self._read_file_content(file_path)
    if content is None:
        return {
            "success": False,
            "error": f"Failed to read file: {file_path}",
        }
    try:
        with open(file_path, "rb") as f:
            raw_bytes = f.read()
    except OSError as e:
        return {"success": False, "error": f"Failed to read file bytes: {e}"}

    filename = Path(file_path).name
    extraction_method = "pdf" if filename.lower().endswith(".pdf") else "text"
    try:
        self._upsert_whole_file(
            filename,
            content,
            raw_bytes,
            extraction_method=extraction_method,
        )
    except Exception as e:
        # Report storage failures the same way as every other failure in
        # this method, instead of letting them escape to the caller.
        logger.error(
            "Failed to persist whole file %s: %s",
            file_path,
            e,
            exc_info=True,
        )
        return {"success": False, "error": str(e)}

    file_id = self._get_file_id(file_path)
    content_hash = self._compute_file_hash(content)

    try:
        existing = self.collection.get(
            where={"file_path": file_path},
            include=["metadatas"],
        )
        if existing and existing.get("metadatas"):
            if not force:
                meta0 = existing["metadatas"][0]
                if meta0.get("content_hash") == content_hash:
                    return {
                        "success": True,
                        "action": "skipped",
                        "file_path": file_path,
                        "reason": "content unchanged",
                    }
            # Drop every previous record so a shorter re-index leaves no
            # orphaned trailing chunks behind.
            old_ids = existing.get("ids", [])
            if old_ids:
                self.collection.delete(ids=old_ids)
    except Exception as e:
        # Best effort: indexing proceeds, but stale records may remain, so
        # leave a trace instead of failing silently.
        logger.warning(
            "Dedup/cleanup lookup failed for %s (continuing): %s",
            file_path,
            e,
        )

    base_metadata: Dict[str, Any] = {
        "file_path": file_path,
        "filename": filename,
        "extension": Path(file_path).suffix,
        "content_hash": content_hash,
        # Length of the extracted text in characters, not bytes on disk.
        "file_size": len(content),
        "indexed_at": datetime.now(timezone.utc).isoformat(),
        "tags": json.dumps(tags or []),
        "store_name": self.store_name,
        "source_type": "local",
    }

    try:
        if use_chunking and len(content) > chunk_size:
            chunks = [
                c
                for c in chunk_text(content, chunk_size, chunk_overlap)
                if c and c.strip()
            ]
            if not chunks:
                chunks = [
                    content[:chunk_size] if len(content) > chunk_size else content
                ]

            ids, documents, metadatas = [], [], []
            for i, chk in enumerate(chunks):
                chunk_meta = base_metadata.copy()
                chunk_meta.update(
                    {
                        "chunk_index": i,
                        "chunk_count": len(chunks),
                        "is_chunked": True,
                    }
                )
                ids.append(f"{file_id}_chunk_{i}")
                documents.append(
                    self._create_embedding_text(file_path, chk),
                )
                metadatas.append(chunk_meta)

            # Upsert in slices of at most CHROMA_MAX_BATCH records.
            for start in range(0, len(ids), CHROMA_MAX_BATCH):
                end = start + CHROMA_MAX_BATCH
                self.collection.upsert(
                    ids=ids[start:end],
                    documents=documents[start:end],
                    metadatas=metadatas[start:end],
                )
            logger.info(
                "Indexed file with %d chunks: %s",
                len(chunks),
                file_path,
            )
            return {
                "success": True,
                "action": "indexed",
                "file_id": file_id,
                "file_path": file_path,
                "file_size": len(content),
                "chunk_count": len(chunks),
            }

        base_metadata.update(
            {
                "is_chunked": False,
                "chunk_index": 0,
                "chunk_count": 1,
            }
        )
        self.collection.upsert(
            ids=[file_id],
            documents=[self._create_embedding_text(file_path, content)],
            metadatas=[base_metadata],
        )
        logger.info("Indexed file: %s", file_path)
        return {
            "success": True,
            "action": "indexed",
            "file_id": file_id,
            "file_path": file_path,
            "file_size": len(content),
        }
    except Exception as e:
        logger.error(
            "Failed to index file %s: %s",
            file_path,
            e,
            exc_info=True,
        )
        return {"success": False, "error": str(e)}
[docs]
async def index_url(
    self,
    url: str,
    tags: Optional[List[str]] = None,
    use_chunking: bool = True,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> Dict[str, Any]:
    """Fetch a remote document by URL and index it into this store.

    The URL ingestion counterpart of :meth:`index_file`. It downloads via
    :func:`fetch_url_content`, derives a stable stored filename (URL-hash
    prefixed and sanitized), compresses oversized PDFs through
    :func:`compress_pdf`, decodes the bytes to text with
    :func:`decode_bytes_to_text`, and persists the whole file via
    :meth:`_upsert_whole_file`. It then dedups against any prior copy of the
    same ``source_url`` by content hash and, unless unchanged, upserts the
    chunks (or a single document) into the collection with
    ``source_type="url"`` metadata.

    Chunk upserts are sent in batches of ``CHROMA_MAX_BATCH`` rows, matching
    :meth:`index_file`, so very large documents do not exceed the backend's
    per-call limit.

    Called by the ``rag_index_url`` tool handler in ``tools/rag.py``.

    Args:
        url (str): The document URL to fetch and index.
        tags (Optional[List[str]]): Optional tag labels stored in metadata.
        use_chunking (bool): Whether to chunk large content before embedding.
        chunk_size (int): Target chunk length in characters.
        chunk_overlap (int): Overlap carried between chunks.

    Returns:
        Dict[str, Any]: A result dict with ``success`` and, on success,
        ``action`` (``"indexed"`` or ``"skipped"``), ``url`` and, when
        indexed, ``filename``, ``file_size``, ``stored_path`` (plus
        ``chunk_count`` for chunked content); on failure an ``error``
        message.
    """
    content_bytes, content_type, filename = await fetch_url_content(url)
    if content_bytes is None:
        return {"success": False, "error": f"Failed to fetch URL: {url}"}

    url_hash = hashlib.md5(url.encode()).hexdigest()[:12]

    # Prefer the extension of the reported filename; otherwise fall back to
    # the content type, defaulting to plain text.
    ext = Path(filename).suffix if filename else ""
    if not ext:
        ext_map = {
            "application/pdf": ".pdf",
            "text/plain": ".txt",
            "application/json": ".json",
            "text/html": ".html",
        }
        ext = ext_map.get(content_type, ".txt")
    is_pdf = ext.lower() == ".pdf"

    file_size = len(content_bytes)
    # Oversized PDFs get a chance to be compressed below the limit further
    # down; every other oversized type is rejected immediately.
    if file_size > self.max_file_size and not is_pdf:
        return {
            "success": False,
            "error": (
                f"File too large: {file_size} bytes "
                f"(limit: {self.max_file_size})"
            ),
        }

    base_filename = Path(filename).name if filename else "downloaded"
    base_filename = re.sub(r'[/\\:*?"<>|]', "_", base_filename)
    stored_filename = f"{url_hash}_{base_filename}"
    if not stored_filename.endswith(ext):
        stored_filename += ext

    work_bytes = content_bytes
    if is_pdf and file_size > self.max_file_size:
        # Both paths start as None so the cleanup below is safe even when
        # the temp file could not be created or written.
        in_path: Optional[str] = None
        compressed_path: Optional[str] = None
        try:
            with tempfile.NamedTemporaryFile(
                suffix=".pdf", delete=False
            ) as tmp_in:
                # Record the name before writing so a failed write still
                # gets its temp file removed.
                in_path = tmp_in.name
                tmp_in.write(content_bytes)
            compressed_path = in_path + ".compressed.pdf"
            _, _, compressed_size = compress_pdf(in_path, compressed_path)
            if compressed_size > self.max_file_size:
                return {
                    "success": False,
                    "error": (
                        f"PDF still too large after compression: "
                        f"{compressed_size}"
                    ),
                }
            with open(compressed_path, "rb") as f:
                work_bytes = f.read()
        except Exception as e:
            return {"success": False, "error": f"PDF compression failed: {e}"}
        finally:
            for p in (compressed_path, in_path):
                if p is None:
                    continue
                try:
                    os.unlink(p)
                except OSError:
                    pass

    content = decode_bytes_to_text(work_bytes, stored_filename)
    if content is None:
        return {
            "success": False,
            "error": (
                "Failed to extract text from PDF"
                if is_pdf
                else "Failed to decode content"
            ),
        }

    self._upsert_whole_file(
        stored_filename,
        content,
        work_bytes,
        extraction_method="pdf" if is_pdf else "text",
    )
    stored_path = f"pg://{self.store_name}/{stored_filename}"
    content_hash = self._compute_file_hash(content)
    file_id = hashlib.md5(url.encode()).hexdigest()

    # Dedup: skip when the stored hash matches, otherwise drop the stale rows
    # so a shorter re-index does not leave orphaned chunks behind.
    try:
        existing = self.collection.get(
            where={"source_url": url},
            include=["metadatas"],
        )
        if existing and existing.get("metadatas"):
            meta0 = existing["metadatas"][0]
            if meta0.get("content_hash") == content_hash:
                return {
                    "success": True,
                    "action": "skipped",
                    "url": url,
                    "reason": "content unchanged",
                }
            old_ids = existing.get("ids", [])
            if old_ids:
                self.collection.delete(ids=old_ids)
    except Exception:
        # Best-effort: a failed lookup must not block indexing, but leave a
        # trace because stale chunks may survive.
        logger.warning(
            "Dedup check failed for URL %s; indexing anyway",
            url,
            exc_info=True,
        )

    base_metadata: Dict[str, Any] = {
        "file_path": stored_path,
        "filename": stored_filename,
        "extension": ext,
        "content_hash": content_hash,
        "file_size": len(content),
        "indexed_at": datetime.now(timezone.utc).isoformat(),
        "tags": json.dumps(tags or []),
        "store_name": self.store_name,
        "source_type": "url",
        "source_url": url,
    }
    # Header prepended to every embedded document so the URL and type take
    # part in semantic matching.
    doc_header = f"File: {stored_filename}\nURL: {url}\nType: {ext}\n\n"

    try:
        if use_chunking and len(content) > chunk_size:
            chunks = [
                c
                for c in chunk_text(content, chunk_size, chunk_overlap)
                if c and c.strip()
            ]
            if not chunks:
                chunks = [
                    content[:chunk_size] if len(content) > chunk_size else content
                ]
            ids: List[str] = []
            documents: List[str] = []
            metadatas: List[Dict[str, Any]] = []
            for i, chk in enumerate(chunks):
                chunk_meta = base_metadata.copy()
                chunk_meta.update(
                    {
                        "chunk_index": i,
                        "chunk_count": len(chunks),
                        "is_chunked": True,
                    }
                )
                ids.append(f"{file_id}_chunk_{i}")
                documents.append(doc_header + chk)
                metadatas.append(chunk_meta)
            # Batch exactly like index_file so both ingestion paths respect
            # the same per-call row limit.
            for start in range(0, len(ids), CHROMA_MAX_BATCH):
                end = start + CHROMA_MAX_BATCH
                self.collection.upsert(
                    ids=ids[start:end],
                    documents=documents[start:end],
                    metadatas=metadatas[start:end],
                )
            logger.info(
                "Indexed URL with %d chunks: %s",
                len(chunks),
                url,
            )
            return {
                "success": True,
                "action": "indexed",
                "url": url,
                "filename": stored_filename,
                "file_size": len(content),
                "chunk_count": len(chunks),
                "stored_path": stored_path,
            }

        base_metadata.update(
            {
                "is_chunked": False,
                "chunk_index": 0,
                "chunk_count": 1,
            }
        )
        self.collection.upsert(
            ids=[file_id],
            documents=[doc_header + content],
            metadatas=[base_metadata],
        )
        logger.info("Indexed URL: %s", url)
        return {
            "success": True,
            "action": "indexed",
            "url": url,
            "filename": stored_filename,
            "file_size": len(content),
            "stored_path": stored_path,
        }
    except Exception as e:
        logger.error("Failed to index URL %s: %s", url, e, exc_info=True)
        return {"success": False, "error": str(e)}
[docs]
def index_directory(
    self,
    directory_path: str,
    recursive: bool = True,
    tags: Optional[List[str]] = None,
    exclude_patterns: Optional[List[str]] = None,
    max_workers: int = 6,
    force: bool = False,
    allowed_extensions: Optional[Collection[str]] = None,
) -> Dict[str, Any]:
    """Index the files found under *directory_path*.

    When *max_workers* > 1 (and there is more than one file), files are
    indexed concurrently using a thread pool; otherwise they are indexed
    one after another in the calling thread.

    *force* is forwarded to :meth:`index_file` for every file.

    When *allowed_extensions* is set, only files whose suffix (after
    normalizing to a leading dot, lowercase) appears in the collection
    are queued; ``None`` (or a collection with no usable entries) means
    no extension filter is applied here.

    A file whose indexing raises is recorded as a failure instead of
    aborting the whole run, so the totals always cover every queued file.

    Args:
        directory_path (str): Directory to scan; resolved to an absolute path.
        recursive (bool): Walk subdirectories when true, otherwise only the
            directory's immediate entries.
        tags (Optional[List[str]]): Tags forwarded to :meth:`index_file`.
        exclude_patterns (Optional[List[str]]): Patterns to skip. A pattern
            starting with ``*`` is a suffix match against the lower-cased
            path; any other pattern is a case-sensitive substring match.
            Defaults to common build/VCS/virtualenv names.
        max_workers (int): Upper bound on concurrent indexing threads.
        force (bool): Forwarded to :meth:`index_file`.
        allowed_extensions (Optional[Collection[str]]): Optional suffix
            allow-list.

    Returns:
        Dict[str, Any]: ``success``, ``indexed`` / ``skipped`` / ``failed``
        counters, the per-file result dicts under ``files`` and, when any
        sensitive files were skipped, ``sensitive_skipped``. If the path is
        not a directory, ``{"success": False, "error": ...}``.
    """
    directory_path = os.path.abspath(directory_path)
    if not os.path.isdir(directory_path):
        return {
            "success": False,
            "error": f"Not a directory: {directory_path}",
        }
    exclude_patterns = exclude_patterns or [
        "__pycache__",
        ".git",
        "node_modules",
        ".venv",
        "venv",
        ".env",
        "*.pyc",
        "*.pyo",
        ".DS_Store",
    ]
    results: Dict[str, Any] = {
        "success": True,
        "indexed": 0,
        "skipped": 0,
        "failed": 0,
        "files": [],
    }

    def should_exclude(path: str) -> bool:
        """Return True if *path* matches any of the exclude patterns.

        Pure string logic, no filesystem access. Patterns beginning with
        ``*`` are suffix matches against the lower-cased path; all others
        are plain case-sensitive substring tests.
        """
        path_lower = path.lower()
        for pat in exclude_patterns:  # type: ignore[union-attr]
            if pat.startswith("*"):
                if path_lower.endswith(pat[1:]):
                    return True
            elif pat in path:
                return True
        return False

    file_paths: List[str] = []
    sensitive_skipped: List[str] = []
    walker = (
        os.walk(directory_path)
        if recursive
        else [(directory_path, [], os.listdir(directory_path))]
    )
    for root, dirs, files in walker:
        if recursive:
            # Prune in place so os.walk never descends into excluded or
            # sensitive directories.
            dirs[:] = [
                d
                for d in dirs
                if not should_exclude(d)
                and not _is_sensitive_path(os.path.join(root, d))
            ]
        for fname in files:
            file_path = os.path.join(root, fname)
            if should_exclude(file_path):
                continue
            if _is_sensitive_path(file_path):
                sensitive_skipped.append(file_path)
                continue
            # os.listdir also yields directories; drop them in flat mode.
            if not recursive and not os.path.isfile(file_path):
                continue
            file_paths.append(file_path)

    if sensitive_skipped:
        logger.warning(
            "RAG index_directory skipped %d sensitive files under %s",
            len(sensitive_skipped),
            directory_path,
        )
        results["sensitive_skipped"] = len(sensitive_skipped)

    if allowed_extensions is not None:
        ext_set: set[str] = set()
        for raw in allowed_extensions:
            e = raw.strip().lower()
            if not e:
                continue
            if not e.startswith("."):
                e = "." + e
            ext_set.add(e)
        if ext_set:
            file_paths = [
                fp for fp in file_paths if Path(fp).suffix.lower() in ext_set
            ]

    total_files = len(file_paths)
    done_count = 0

    def _index_one(fp: str) -> Dict[str, Any]:
        """Index one file, converting any raised exception into a failure dict."""
        try:
            return self.index_file(fp, tags=tags, force=force)
        except Exception as exc:
            logger.error(
                "Failed to index file %s: %s",
                fp,
                exc,
                exc_info=True,
            )
            return {"success": False, "file_path": fp, "error": str(exc)}

    def _record(result: Dict[str, Any]) -> None:
        """Fold one per-file result into the running totals and log progress.

        Always invoked from the thread that called ``index_directory``
        (worker threads only produce results), so no locking is required.
        """
        nonlocal done_count
        results["files"].append(result)
        if result.get("success"):
            key = "indexed" if result.get("action") == "indexed" else "skipped"
            results[key] += 1
        else:
            results["failed"] += 1
        done_count += 1
        if done_count % 25 == 0 or done_count == total_files:
            logger.info(
                "Progress: %d / %d files (%d indexed, %d skipped, %d failed)",
                done_count,
                total_files,
                results["indexed"],
                results["skipped"],
                results["failed"],
            )

    if max_workers > 1 and len(file_paths) > 1:
        from concurrent.futures import ThreadPoolExecutor, as_completed

        workers = min(max_workers, len(file_paths))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = [pool.submit(_index_one, fp) for fp in file_paths]
            # Results are consumed here, in the calling thread, as each
            # worker finishes.
            for future in as_completed(futures):
                _record(future.result())
    else:
        for file_path in file_paths:
            _record(_index_one(file_path))

    logger.info(
        "Directory indexing complete: %d indexed, %d skipped, %d failed",
        results["indexed"],
        results["skipped"],
        results["failed"],
    )
    return results
# -- search --------------------------------------------------------------
@staticmethod
def _merge_chunks(
chunks: List[tuple],
max_content_size: int,
) -> str:
"""Merge matched chunks into a single string.
*chunks* is a list of ``(chunk_index, chunk_text)`` tuples sorted
by ``chunk_index``. Adjacent chunks (index differs by 1) are
joined directly; non-adjacent chunks are separated by a ``[...]``
marker so the reader knows content was skipped.
"""
if not chunks:
return ""
merged_parts: List[str] = []
current_size = 0
prev_idx: int | None = None
for idx, text in chunks:
text = text.strip()
if not text:
continue
if current_size + len(text) + 10 > max_content_size:
remaining = max_content_size - current_size - 10
if remaining > 200:
merged_parts.append(text[:remaining].rstrip())
break
if prev_idx is not None:
if idx == prev_idx + 1:
merged_parts.append(text)
else:
merged_parts.append("\n\n[...]\n\n" + text)
else:
merged_parts.append(text)
current_size += len(text) + 10
prev_idx = idx
result = "\n".join(merged_parts)
return result
[docs]
def search(
self,
query: str,
n_results: int = 5,
tags: Optional[List[str]] = None,
return_content: bool = True,
query_embedding: list[float] | None = None,
max_content_size: int = 8000,
content_mode: Literal["whole", "chunks"] = "whole",
) -> List[Dict[str, Any]]:
"""Semantic search returning one result per matched file.
*content_mode* ``whole`` loads Postgres ``documents`` when available;
``chunks`` returns the best KNN-matched indexed chunk only.
"""
import time
t0 = time.monotonic()
if query_embedding is None and not (query or "").strip():
return []
try:
where_filter = None
if tags:
where_filter = {
"$or": [{"tags": {"$contains": tag}} for tag in tags],
}
query_kwargs: Dict[str, Any] = {
"n_results": n_results * 5,
"where": where_filter,
"include": ["metadatas", "distances", "documents"],
}
if query_embedding is not None:
query_kwargs["query_embeddings"] = [query_embedding]
else:
query_kwargs["query_texts"] = [query]
results = self.collection.query(**query_kwargs)
if not results or not results.get("metadatas"):
return []
documents = results.get("documents", [[]])[0]
file_chunks: Dict[str, Dict[str, Any]] = {}
for i, metadata in enumerate(results["metadatas"][0]):
file_path = metadata.get("file_path", "")
distance = (
results["distances"][0][i] if results.get("distances") else 1.0
)
chunk_index = int(metadata.get("chunk_index", 0))
chunk_text = documents[i] if i < len(documents) else ""
if file_path not in file_chunks:
file_chunks[file_path] = {
"metadata": metadata,
"best_distance": distance,
"chunks": [],
}
entry = file_chunks[file_path]
if distance < entry["best_distance"]:
entry["best_distance"] = distance
entry["metadata"] = metadata
entry["chunks"].append((distance, chunk_index, chunk_text))
output = []
sorted_files = sorted(
file_chunks.items(),
key=lambda x: x[1]["best_distance"],
)
for file_path, data in sorted_files[:n_results]:
metadata = data["metadata"]
best_distance = data["best_distance"]
entry: Dict[str, Any] = {
"file_path": file_path,
"filename": metadata.get("filename", ""),
"extension": metadata.get("extension", ""),
"file_size": metadata.get("file_size", 0),
"indexed_at": metadata.get("indexed_at", ""),
"tags": json.loads(metadata.get("tags", "[]")),
"similarity_score": (
1.0 - best_distance if best_distance is not None else None
),
"source_type": metadata.get("source_type", "local"),
"source_url": metadata.get("source_url", None),
}
if return_content:
if content_mode == "chunks":
best_distance, chunk_index, chunk_text = min(
data["chunks"],
key=lambda c: c[0],
)
entry["chunk_index"] = chunk_index
stripped = self._strip_embedding_header(chunk_text)
if len(stripped) > max_content_size:
stripped = stripped[:max_content_size]
entry["content"] = (
stripped if stripped else "[No chunk text available]"
)
else:
fname = metadata.get("filename", "") or Path(
file_path
).name
whole_text = (
self._load_whole_file_text(fname)
if fname and self._pg_whole_files_enabled()
else None
)
if whole_text:
entry["content"] = (
whole_text[:max_content_size]
if len(whole_text) > max_content_size
else whole_text
)
else:
ranked = sorted(
data["chunks"],
key=lambda c: c[0],
)
by_index = sorted(
[(ci, ct) for _, ci, ct in ranked],
key=lambda x: x[0],
)
seen_indices: set = set()
deduped = []
for ci, ct in by_index:
if ci not in seen_indices:
seen_indices.add(ci)
deduped.append((ci, ct))
content = self._merge_chunks(deduped, max_content_size)
entry["content"] = (
content
if content
else "[No chunk text available]"
)
output.append(entry)
logger.info(
"Search query '%s...' returned %d unique files",
query[:50],
len(output),
)
best_sim = (
output[0]["similarity_score"]
if output and output[0].get("similarity_score") is not None
else 0.0
)
from observability import publish_debug_event
import asyncio
try:
loop = asyncio.get_running_loop()
loop.create_task(
publish_debug_event(
"rag_retrieval",
"rag_system",
phase="search",
status="ok" if output else "empty",
duration_ms=(time.monotonic() - t0) * 1000,
preview=f"hits={len(output)} best_sim={best_sim:.2f} query_len={len(query)}",
payload={
"hits": len(output),
"best_sim": best_sim,
"query_len": len(query),
},
),
name="obs_rag_retrieval_ok",
)
except RuntimeError:
# No running loop (sync context)
pass
return output
except Exception as e:
logger.error("Search failed: %s", e, exc_info=True)
from observability import publish_debug_event
import asyncio
try:
loop = asyncio.get_running_loop()
loop.create_task(
publish_debug_event(
"rag_retrieval",
"rag_system",
phase="search",
status="error",
duration_ms=(time.monotonic() - t0) * 1000,
preview=f"error={str(e)[:100]}",
payload={"error": str(e), "query_len": len(query)},
),
name="obs_rag_retrieval_err",
)
except RuntimeError:
pass
return []
# -- remove / list / stats -----------------------------------------------
[docs]
def remove_file(self, file_path: str) -> Dict[str, Any]:
    """Drop all index rows and whole-file rows belonging to one local file.

    *file_path* is made absolute and used to look up every row whose
    ``file_path`` metadata matches. Those rows are deleted from the
    collection, then ``delete_whole_file`` (from ``pg_source_files``) is
    invoked once per distinct ``filename`` found in their metadata so the
    stored whole-file text does not outlive the index entries.

    Called by the ``rag_remove_file`` tool handler in ``tools/rag.py`` (run
    off the event loop with ``asyncio.to_thread``).

    Args:
        file_path (str): Path of the indexed file to remove.

    Returns:
        Dict[str, Any]: ``{"success": True, "file_path", "entries_removed"}``
        when rows were deleted; ``{"success": False, "error": ...}`` when the
        path is not indexed or any step raises.
    """
    file_path = os.path.abspath(file_path)
    try:
        found = self.collection.get(
            where={"file_path": file_path},
            include=["metadatas"],
        )
        # Guard clause: nothing indexed under this path.
        if not (found and found.get("ids")):
            return {
                "success": False,
                "error": f"File not found in index: {file_path}",
            }

        # Collect the distinct stored filenames before the rows disappear.
        stored_names = set()
        for meta in found.get("metadatas") or []:
            if meta and meta.get("filename"):
                stored_names.add(meta.get("filename"))

        self.collection.delete(ids=found["ids"])

        from .pg_source_files import delete_whole_file

        for stored_name in stored_names:
            delete_whole_file(self._schema, stored_name)

        removed = len(found["ids"])
        return {
            "success": True,
            "file_path": file_path,
            "entries_removed": removed,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
[docs]
def remove_url(self, url: str) -> Dict[str, Any]:
    """Drop all index rows and whole-file rows belonging to one indexed URL.

    Mirrors :meth:`remove_file`, but matches rows on their ``source_url``
    metadata. The matching rows are deleted from the collection and
    ``delete_whole_file`` (from ``pg_source_files``) is then invoked once
    per distinct ``filename`` found in their metadata.

    Called by the ``rag_remove_url`` tool handler in ``tools/rag.py`` (run
    off the event loop with ``asyncio.to_thread``).

    Args:
        url (str): The previously indexed source URL to remove.

    Returns:
        Dict[str, Any]: ``{"success": True, "url", "entries_removed"}`` when
        rows were deleted; ``{"success": False, "error": ...}`` when the URL
        is not indexed or any step raises.
    """
    try:
        found = self.collection.get(
            where={"source_url": url},
            include=["metadatas"],
        )
        # Guard clause: nothing indexed for this URL.
        if not (found and found.get("ids")):
            return {
                "success": False,
                "error": f"URL not found in index: {url}",
            }

        # Collect the distinct stored filenames before the rows disappear.
        stored_names = set()
        for meta in found.get("metadatas") or []:
            if meta and meta.get("filename"):
                stored_names.add(meta.get("filename"))

        self.collection.delete(ids=found["ids"])

        from .pg_source_files import delete_whole_file

        for stored_name in stored_names:
            delete_whole_file(self._schema, stored_name)

        removed = len(found["ids"])
        return {
            "success": True,
            "url": url,
            "entries_removed": removed,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
[docs]
def list_indexed_files(self, limit: int = 100) -> List[Dict[str, Any]]:
    """Summarise up to *limit* metadata records from the vector index.

    Every fetched metadata record becomes one compact dict (path, filename,
    extension, size, index timestamp, decoded tags). Records are per index
    row, so a chunked file can appear once per chunk; the per-file view is
    :meth:`list_store_files`. Any error is logged and yields an empty list.

    Called by the RAG listing tool handlers in ``tools/rag.py`` (run off the
    event loop with ``asyncio.to_thread``).

    Args:
        limit (int): Maximum number of metadata records to fetch.

    Returns:
        List[Dict[str, Any]]: One summary dict per fetched record (empty on
        error).
    """
    try:
        fetched = self.collection.get(limit=limit, include=["metadatas"])
        summaries: List[Dict[str, Any]] = []
        for meta in fetched.get("metadatas", []):
            summary = {
                "file_path": meta.get("file_path", ""),
                "filename": meta.get("filename", ""),
                "extension": meta.get("extension", ""),
                "file_size": meta.get("file_size", 0),
                "indexed_at": meta.get("indexed_at", ""),
                # Tags are stored as a JSON-encoded list.
                "tags": json.loads(meta.get("tags", "[]")),
            }
            summaries.append(summary)
        return summaries
    except Exception as exc:
        logger.error("Failed to list files: %s", exc)
        return []
[docs]
def list_store_files(self) -> List[Dict[str, Any]]:
    """Enumerate the whole files of this store from Postgres and from disk.

    File-centric (not chunk-centric) listing. Rows reported by
    ``list_whole_files`` (from ``pg_source_files``) are taken first, but only
    when :meth:`_pg_whole_files_enabled` is true. Regular files in the
    ``files_path`` directory are then added for any filename not already
    present. Postgres entries carry a ``pg://`` path, disk entries their
    filesystem path.

    Called by the ``rag_list_store_files`` tool handler in ``tools/rag.py``
    (run off the event loop with ``asyncio.to_thread``).

    Returns:
        List[Dict[str, Any]]: ``filename`` / ``size`` / ``modified`` /
        ``path`` dicts, one per file, sorted by filename.
    """
    from .pg_source_files import list_whole_files

    catalog: dict[str, dict[str, Any]] = {}

    if self._pg_whole_files_enabled():
        for record in list_whole_files(self._schema):
            name = record["filename"]
            catalog[name] = {
                "filename": name,
                "size": record.get("size", 0),
                "modified": record.get("modified"),
                "path": f"pg://{self.store_name}/{name}",
            }

    if os.path.isdir(self.files_path):
        for name in os.listdir(self.files_path):
            # Postgres entries take precedence over same-named disk files.
            if name in catalog:
                continue
            full_path = os.path.join(self.files_path, name)
            if not os.path.isfile(full_path):
                continue
            try:
                info = os.stat(full_path)
                modified_at = datetime.fromtimestamp(
                    info.st_mtime,
                    tz=timezone.utc,
                )
                catalog[name] = {
                    "filename": name,
                    "size": info.st_size,
                    "modified": modified_at.isoformat(),
                    "path": full_path,
                }
            except Exception:
                # Best-effort: an unreadable entry is simply left out.
                pass

    return sorted(catalog.values(), key=lambda item: item.get("filename", ""))
[docs]
def read_store_file(self, filename: str) -> Dict[str, Any]:
    """Fetch the complete text of a stored file given its bare filename.

    Names containing ``/``, ``\\`` or ``..`` are rejected up front as a
    path-traversal guard, so only flat store-local names are looked up. The
    text itself comes from :meth:`_load_whole_file_text`; a ``None`` result
    is reported as "not found" and any raised exception as a failure.

    Called by the ``rag_read_store_file`` tool handler in ``tools/rag.py``
    (run off the event loop with ``asyncio.to_thread``).

    Args:
        filename (str): Flat, store-local filename (no slashes or ``..``).

    Returns:
        Dict[str, Any]: ``{"success": True, "filename", "content", "size"}``
        on success, otherwise ``{"success": False, "error": ...}``.
    """
    forbidden_tokens = ("/", "\\", "..")
    if any(token in filename for token in forbidden_tokens):
        return {"success": False, "error": "Invalid filename."}

    try:
        text = self._load_whole_file_text(filename)
        if text is not None:
            return {
                "success": True,
                "filename": filename,
                "content": text,
                "size": len(text),
            }
        return {"success": False, "error": f"File not found: {filename}"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
[docs]
def close(self) -> None:
    """Do nothing; kept so callers can always invoke ``close()``.

    The pgvector pools are process-wide and shared, so a store holds no
    connection of its own to release. The LRU registry calls this when it
    evicts a store.
    """
    # Intentionally empty: there is no per-store resource to tear down.
    return
[docs]
def get_stats(self) -> Dict[str, Any]:
    """Summarise this store's identity and indexed-row count.

    Gathers the store name, the ``db_path`` attribute, the value of
    ``collection.count()`` and the configured embedding model. Failures are
    reported in-band rather than raised.

    No in-repo callers were found by grep; invoked via dynamic/admin paths.

    Returns:
        Dict[str, Any]: ``store_name``, ``store_path``, ``file_count`` and
        ``embedding_model`` keys, or a single ``error`` key on failure.
    """
    try:
        stats: Dict[str, Any] = {"store_name": self.store_name}
        stats["store_path"] = self.db_path
        # "file_count" is the collection's row count.
        stats["file_count"] = self.collection.count()
        stats["embedding_model"] = self.embedding_model
        return stats
    except Exception as exc:
        return {"error": str(exc)}
[docs]
def clear(self) -> Dict[str, Any]:
    """Wipe this store's vector rows and, where enabled, its whole-file rows.

    Calls ``self._pg.clear()`` and then, only when
    :meth:`_pg_whole_files_enabled` is true, ``clear_source_tables`` (from
    ``pg_source_files``) for this store's schema. Errors are reported
    in-band rather than raised.

    Called by the corpus (re)build scripts under ``scripts/`` before a fresh
    full ingest.

    Returns:
        Dict[str, Any]: ``{"success": True, "message": ...}`` on success,
        otherwise ``{"success": False, "error": ...}``.
    """
    try:
        self._pg.clear()
        if self._pg_whole_files_enabled():
            from .pg_source_files import clear_source_tables as _truncate_sources

            _truncate_sources(self._schema)
        logger.info("Cleared RAG store: %s", self.store_name)
        outcome: Dict[str, Any] = {
            "success": True,
            "message": f"Store '{self.store_name}' cleared",
        }
    except Exception as exc:
        outcome = {"success": False, "error": str(exc)}
    return outcome
# ---------------------------------------------------------------------------
# Global store registry (LRU-bounded)
# ---------------------------------------------------------------------------
# Maximum number of store managers kept in the registry at once. Read a single
# time at import from the RAG_STORE_CACHE_SIZE environment variable
# (default "5").
_STORE_REGISTRY_MAX_SIZE = int(os.environ.get("RAG_STORE_CACHE_SIZE", "5"))
# Ordered cache of open managers. get_rag_store() keys entries by
# (store_name, document_task_type, query_task_type), moves an entry to the end
# on every hit and evicts from the front, so the first entry is always the
# least recently used one.
_store_registry: OrderedDict[tuple, FileRAGManager] = OrderedDict()
# Sphinx-generated docs store: Gemini retrieval task types for index vs query.
STARGAZER_DOCS_STORE_NAME = "stargazer_docs"
STARGAZER_DOCS_DOCUMENT_TASK = "RETRIEVAL_DOCUMENT"
STARGAZER_DOCS_QUERY_TASK = "RETRIEVAL_QUERY"
[docs]
def get_rag_store(
    store_name: str = "default",
    api_key: Optional[str] = None,
    max_file_size: Optional[int] = None,
    gemini_only: bool = True,
    document_task_type: Optional[str] = None,
    query_task_type: Optional[str] = None,
) -> FileRAGManager:
    """Return the cached manager for a store, creating it on first use.

    The registry is an LRU cache holding at most
    ``_STORE_REGISTRY_MAX_SIZE`` managers: a hit is moved to the
    most-recently-used end, and after an insert the least recently used
    entries are popped and closed until the size limit holds again.

    The cache key is ``(store_name, document_task_type, query_task_type)``
    so different embedding task configurations never share one manager.
    ``api_key``, ``max_file_size`` and ``gemini_only`` only matter when a
    manager is first constructed; they are not part of the key.
    """
    cache_key = (store_name, document_task_type, query_task_type)

    # Cache hit: refresh recency and hand back the existing manager.
    try:
        _store_registry.move_to_end(cache_key)
    except KeyError:
        pass
    else:
        return _store_registry[cache_key]

    # Cache miss: pass only the options the caller actually set, leaving the
    # manager's own defaults in charge of the rest.
    init_args: Dict[str, Any] = {"store_name": store_name, "api_key": api_key}
    if max_file_size is not None:
        init_args["max_file_size"] = max_file_size
    if gemini_only:
        init_args["gemini_only"] = True
    for option, value in (
        ("document_task_type", document_task_type),
        ("query_task_type", query_task_type),
    ):
        if value is not None:
            init_args[option] = value

    new_store = FileRAGManager(**init_args)
    _store_registry[cache_key] = new_store

    # Trim from the least-recently-used end until the cache fits again.
    while len(_store_registry) > _STORE_REGISTRY_MAX_SIZE:
        stale_key, stale_store = _store_registry.popitem(last=False)
        logger.info("Evicting RAG store %s from cache (LRU)", stale_key)
        stale_store.close()
    return new_store
[docs]
def get_stargazer_docs_store() -> FileRAGManager:
    """Return the shared RAG store used for Sphinx / tool documentation.

    Delegates to :func:`get_rag_store` with the docs store name, using
    ``RETRIEVAL_DOCUMENT`` as the document task type and ``RETRIEVAL_QUERY``
    as the query task type (Gemini embedding task types).
    """
    task_types = {
        "document_task_type": STARGAZER_DOCS_DOCUMENT_TASK,
        "query_task_type": STARGAZER_DOCS_QUERY_TASK,
    }
    return get_rag_store(STARGAZER_DOCS_STORE_NAME, **task_types)
def _enumerate_pg_file_stores() -> List[Dict[str, Any]]:
    """List every Postgres file-RAG store as ``{'name', 'file_count'}`` dicts.

    A schema counts as a file-RAG store when it contains a table named
    ``files_<schema>``. ``file_count`` is taken from ``pg_class.reltuples``
    (the planner's row estimate, clamped at zero) rather than an exact
    count, keeping this cheap enough for the per-message system-prompt build.
    Results are ordered by schema name.
    """
    from vector_store import get_sync_pool

    store_query = (
        "SELECT t.table_schema, "
        "       GREATEST(COALESCE(c.reltuples, 0), 0)::bigint "
        "FROM information_schema.tables t "
        "JOIN pg_namespace n ON n.nspname = t.table_schema "
        "JOIN pg_class c ON c.relname = t.table_name "
        "     AND c.relnamespace = n.oid "
        "WHERE t.table_name = 'files_' || t.table_schema "
        "  AND t.table_schema NOT IN "
        "      ('pg_catalog', 'information_schema') "
        "ORDER BY t.table_schema"
    )
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        cur.execute(store_query)
        return [
            {"name": schema_name, "file_count": int(estimate)}
            for schema_name, estimate in cur.fetchall()
        ]
# Last successful store enumeration as a ``(monotonic timestamp, stores)``
# pair; ``None`` until list_rag_stores_with_stats() succeeds for the first time.
_rag_stores_cache: "tuple[float, List[Dict[str, Any]]] | None" = None
# Seconds a cached enumeration is served before Postgres is queried again.
_RAG_STORES_CACHE_TTL = 60.0
[docs]
def list_rag_stores() -> List[str]:
    """Return just the names of all available RAG stores.

    Name-only projection of :func:`list_rag_stores_with_stats`, so it shares
    that function's cache. Any exception is logged at debug level and turned
    into an empty list so callers never crash on a lookup failure.

    Called by the web config API in ``web/rag_config_api.py`` (which filters
    the names for cloud-user stores).

    Returns:
        List[str]: Store names (empty on error).
    """
    try:
        names: List[str] = []
        for store in list_rag_stores_with_stats():
            names.append(store["name"])
        return names
    except Exception:
        logger.debug("list_rag_stores (pgvector) failed", exc_info=True)
        return []
[docs]
def list_rag_stores_with_stats() -> List[Dict[str, Any]]:
    """List every file-RAG store with its estimated file count (60s cached).

    Replaces the legacy filesystem scan. Each entry is a
    ``{"name", "file_count"}`` dict produced by
    :func:`_enumerate_pg_file_stores`; the count is the planner's row estimate
    for the store's ``files_<schema>`` table, so this never opens a per-store
    client and stays cheap on the per-message prompt path.

    The listing is kept in the module-level ``_rag_stores_cache`` for
    ``_RAG_STORES_CACHE_TTL`` seconds. When enumeration fails, the error is
    logged at debug level and the last cached listing (even if expired) is
    returned, or an empty list when nothing has been cached yet; a failed
    attempt does not refresh the cache.

    Returns:
        List[Dict[str, Any]]: Fresh copies of the store entries. Callers may
        mutate the returned list or its dicts without affecting the cache.
    """
    global _rag_stores_cache
    import time as _t

    now = _t.monotonic()
    cached = _rag_stores_cache
    if cached is not None and (now - cached[0]) < _RAG_STORES_CACHE_TTL:
        # Copy each entry as well as the list: handing out the cached dicts
        # would let one caller's mutation leak to every other caller.
        return [dict(entry) for entry in cached[1]]
    try:
        stores = _enumerate_pg_file_stores()
    except Exception:
        logger.debug("list_rag_stores_with_stats (pgvector) failed", exc_info=True)
        # Serve stale data rather than nothing when Postgres is unavailable.
        return [dict(entry) for entry in cached[1]] if cached else []
    _rag_stores_cache = (now, stores)
    return [dict(entry) for entry in stores]
[docs]
def delete_rag_store(store_name: str) -> Dict[str, Any]:
    """Delete a RAG store completely (Postgres tables + local files dir).

    Drops the store's ``files_<schema>`` / ``documents`` / ``source_files``
    tables (not the whole schema, so shared schemas such as
    ``golden_goddess`` keep their non-file tables like ``ncm_kernel``). The
    schema itself is removed only when nothing else is left in it.

    Steps, in order:

    1. Close and evict cached managers registered under ``store_name``.
    2. Drop the store's tables, then attempt a ``RESTRICT`` schema drop.
    3. Remove the store's directory under ``DEFAULT_STORE_PATH``.
    4. Invalidate the store-listing cache.

    Args:
        store_name: Store name as supplied by the caller. It is passed through
            :func:`_sanitize_collection_name` to derive the schema name, the
            ``files_<name>`` table name and the on-disk directory name.

    Returns:
        Dict[str, Any]: ``{"success": True, "message": ...}`` when the
        ``files_<schema>`` table existed or the store directory was removed;
        ``{"success": False, "error": ...}`` when the database step raised or
        when neither the table nor the directory existed.
    """
    global _rag_stores_cache
    import shutil
    from vector_store import get_sync_pool, pg_ident

    sanitized_name = _sanitize_collection_name(store_name)
    schema = pg_ident(sanitized_name)
    table = pg_ident(f"files_{sanitized_name}")
    store_path = os.path.join(DEFAULT_STORE_PATH, sanitized_name)

    # Evict any cached managers for this store first. Iterate a snapshot of
    # the keys because entries are popped while looping.
    keys_to_drop = [k for k in list(_store_registry.keys()) if k[0] == store_name]
    for k in keys_to_drop:
        evicted = _store_registry.pop(k, None)
        if evicted is not None:
            evicted.close()

    dropped = False
    try:
        pool = get_sync_pool()
        with pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT 1 FROM information_schema.tables "
                    "WHERE table_schema = %s AND table_name = %s",
                    (schema, table),
                )
                # "Existed" is judged by the files_<schema> table alone.
                dropped = cur.fetchone() is not None
            conn.execute(f'DROP TABLE IF EXISTS "{schema}"."{table}" CASCADE')
            conn.execute(f'DROP TABLE IF EXISTS "{schema}"."documents" CASCADE')
            conn.execute(f'DROP TABLE IF EXISTS "{schema}"."source_files" CASCADE')
            # Remove the schema only if nothing else is left in it. RESTRICT
            # fails for shared schemas, and that failure is expected. Run it
            # in its own transaction block (a savepoint when a transaction is
            # already open) so the error cannot abort the surrounding
            # transaction and silently roll back the table drops above.
            try:
                with conn.transaction():
                    conn.execute(f'DROP SCHEMA IF EXISTS "{schema}" RESTRICT')
            except Exception:
                logger.debug(
                    "Schema %s kept (RESTRICT drop failed)", schema, exc_info=True
                )
    except Exception as e:
        return {"success": False, "error": f"Failed to drop '{schema}': {e}"}

    dir_removed = False
    if os.path.exists(store_path):
        try:
            shutil.rmtree(store_path)
            dir_removed = True
        except Exception:
            logger.debug("Failed to remove store dir %s", store_path, exc_info=True)

    _rag_stores_cache = None  # invalidate listing cache

    if not dropped and not dir_removed:
        return {
            "success": False,
            "error": f"Store '{store_name}' does not exist",
        }

    logger.info(
        "Deleted RAG store: %s (pg dropped=%s, dir removed=%s)",
        store_name,
        dropped,
        dir_removed,
    )
    return {
        "success": True,
        "message": f"Store '{store_name}' deleted successfully",
    }