# Source code for rag_system.file_rag_manager

"""File-based RAG Manager.

Manages file indexing and retrieval using Postgres pgvector + whole-file tables.
Returns entire files, not chunks, for complete context when possible.

Features:
- URL fetching support
- PDF parsing via PyMuPDF
- Chunked embeddings for better semantic matching
- Full file retrieval on search
"""

import fnmatch
import hashlib
import jsonutil as json
import logging
import os
import re
import tempfile
from collections import OrderedDict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Collection, Dict, List, Literal, Optional, Tuple
from urllib.parse import unquote, urlparse

import httpx

from .openrouter_embeddings import SyncOpenRouterEmbeddings

logger = logging.getLogger(__name__)

_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


def _sanitize_collection_name(name: str) -> str:
    """Sanitize *name* for use as a ChromaDB collection name.

    ChromaDB requires 3-512 characters, only ``[a-zA-Z0-9._-]``,
    must start and end with ``[a-zA-Z0-9]``.
    """
    sanitized = re.sub(r"[^a-zA-Z0-9._-]", "_", name)
    sanitized = re.sub(r"_+", "_", sanitized)
    sanitized = sanitized.strip("._-")
    if len(sanitized) < 3:
        sanitized = f"store_{sanitized}" if sanitized else "store_default"
    if len(sanitized) > 500:
        sanitized = sanitized[:500].rstrip("._-")
    return sanitized


# Default root directory for on-disk store data: ``<project root>/rag_stores``.
# FileRAGManager falls back to it when no ``store_path`` is passed.
DEFAULT_STORE_PATH = os.path.join(_PROJECT_ROOT, "rag_stores")
# Name of the subdirectory, inside a store's directory, used for its files.
STORE_FILES_SUBDIR = "files"

SUPPORTED_EXTENSIONS = {
    ".txt",
    ".md",
    ".py",
    ".js",
    ".ts",
    ".jsx",
    ".tsx",
    ".json",
    ".yaml",
    ".yml",
    ".toml",
    ".ini",
    ".cfg",
    ".html",
    ".css",
    ".scss",
    ".less",
    ".c",
    ".cpp",
    ".h",
    ".hpp",
    ".rs",
    ".go",
    ".java",
    ".kt",
    ".sh",
    ".bash",
    ".zsh",
    ".fish",
    ".sql",
    ".graphql",
    ".xml",
    ".csv",
    ".r",
    ".R",
    ".rmd",
    ".lua",
    ".rb",
    ".php",
    ".pl",
    ".pm",
    ".dockerfile",
    ".makefile",
    ".env",
    ".gitignore",
    ".dockerignore",
    ".rst",
    ".tex",
    ".bib",
    ".pdf",
}

# Default per-file size limit in bytes; FileRAGManager uses it as the default
# for its ``max_file_size`` argument.
MAX_FILE_SIZE = 15 * 1024 * 1024  # 15 MB
# Defaults for chunk_text(): target chunk length and carried-over tail,
# both measured in characters.
DEFAULT_CHUNK_SIZE = 1500
DEFAULT_CHUNK_OVERLAP = 200
# NOTE(review): presumably the maximum number of records per vector-store
# write batch; its use is not visible in this part of the file — confirm.
CHROMA_MAX_BATCH = 5000

# Stores where content is log chunks only — no whole-file PG tables used on read.
CHUNK_ONLY_STORES = frozenset({"stargazer_logs"})


# ---------------------------------------------------------------------------
# Sensitive-path deny list (defense-in-depth for index_file/index_directory)
# ---------------------------------------------------------------------------
#
# Even when an admin runs ``rag_index_directory("/root/stargazer-v3")``
# legitimately, accidentally surfacing ``config.yaml`` or
# ``matrix_credentials.json`` into a queryable Chroma store would be a
# disaster: anyone with ``rag_search`` can then exfil the embedded chunks.
# These patterns are checked *after* the per-tool UNSANDBOXED_EXEC gate in
# ``tools/rag.py`` and apply to every caller including admins.

_SENSITIVE_BASENAME_PATTERNS: Tuple[str, ...] = (
    "config.yaml",
    "config.yml",
    ".env",
    ".env.*",
    "matrix_credentials*.json",
    "*.pem",
    "*.key",
    "id_rsa*",
    "id_ed25519*",
    "*_rsa",
    "*_ed25519",
    "*_rsa.pub",
    "*_ed25519.pub",
    "api_key_encryption_keys.db",
    "*.rdb",
    "memories_export*.json",
)
"""Filename glob patterns refused regardless of directory."""

_SENSITIVE_ANCESTOR_DIRS: Tuple[str, ...] = (
    "dna_vault",
    "nio_store",
    ".ssh",
    ".docker",
    ".gnupg",
)
"""Any path with one of these directory components in its ancestry is refused."""

_SENSITIVE_HOME_SUBPATHS: Tuple[str, ...] = (
    "~/.ssh",
    "~/.docker",
    "~/.config/gcloud",
    "~/.aws",
)
"""Resolved at call time via ``os.path.expanduser``; refused as prefix."""

_SENSITIVE_ABSOLUTE_PREFIXES: Tuple[str, ...] = (
    "/etc/shadow",
    "/etc/gshadow",
    "/etc/sudoers",
    "/etc/sudoers.d",
    "/etc/ssl/private",
    "/root/.ssh",
    "/root/.docker",
    "/root/.config/gcloud",
    "/root/.aws",
)
"""Absolute path prefixes refused (matched against ``realpath``)."""


def _path_is_under(candidate: Path, root: Path) -> bool:
    """Return True when *candidate* equals *root* or lies beneath it."""
    if candidate == root:
        return True
    try:
        return candidate.is_relative_to(root)
    except (AttributeError, ValueError):
        # ``Path.is_relative_to`` only exists on Python >= 3.9; fall back to
        # a separator-terminated string prefix test.
        return str(candidate).startswith(str(root) + os.sep)


def _is_sensitive_path(file_path: str) -> bool:
    """Return True if *file_path* (or its real target) is on the deny list.

    The check operates on ``os.path.realpath`` so symlinks don't bypass it.
    Four rules are applied in order:

    1. the final path component matches one of
       ``_SENSITIVE_BASENAME_PATTERNS`` (``fnmatch`` globs);
    2. any path component (the final one included) equals, ignoring case,
       one of ``_SENSITIVE_ANCESTOR_DIRS``;
    3. the path is, or lies under, one of ``_SENSITIVE_HOME_SUBPATHS`` after
       ``~`` expansion;
    4. the path is, or lies under, one of ``_SENSITIVE_ABSOLUTE_PREFIXES``.

    Args:
        file_path: Path to test; may be relative or a symlink.

    Returns:
        ``True`` when the path must be refused, ``False`` otherwise. An empty
        path returns ``False``; a path that cannot be resolved returns
        ``True`` (fail closed).
    """
    if not file_path:
        return False
    try:
        real = os.path.realpath(file_path)
    except (OSError, ValueError):
        # Cannot tell where the path really points: refuse it.
        return True

    resolved = Path(real)

    if any(
        fnmatch.fnmatch(resolved.name, pattern)
        for pattern in _SENSITIVE_BASENAME_PATTERNS
    ):
        return True

    components = {part.lower() for part in resolved.parts}
    if any(name.lower() in components for name in _SENSITIVE_ANCESTOR_DIRS):
        return True

    for sub in _SENSITIVE_HOME_SUBPATHS:
        try:
            expanded = os.path.expanduser(sub)
            # ``expanduser`` returns its input unchanged when no home
            # directory can be determined. This must be tested *before*
            # resolving: ``Path.resolve`` always yields an absolute path, so
            # a leading "~" can no longer be detected afterwards.
            if expanded.startswith("~"):
                continue
            base = Path(expanded).resolve()
        except (OSError, RuntimeError, ValueError):
            continue
        if _path_is_under(resolved, base):
            return True

    return any(
        _path_is_under(resolved, Path(prefix))
        for prefix in _SENSITIVE_ABSOLUTE_PREFIXES
    )


# ---------------------------------------------------------------------------
# PDF helpers
# ---------------------------------------------------------------------------


def extract_pdf_text(file_path: str) -> Optional[str]:
    """Extract page-delimited plain text from a PDF on disk via PyMuPDF.

    Opens the file with ``fitz`` (PyMuPDF) and joins the text of every page
    that has non-whitespace content, each prefixed with a
    ``--- Page N ---`` header (1-based page number) and separated by a blank
    line. Reads the filesystem only.

    Called within this module by :meth:`FileRAGManager._read_file_content`,
    :meth:`FileRAGManager._load_whole_file_text`, and
    :func:`extract_pdf_text_from_bytes`.

    Args:
        file_path (str): Path to the PDF file.

    Returns:
        Optional[str]: The extracted text, or ``None`` (after logging) when
        PyMuPDF is not installed, no page yields text, or extraction fails.
    """
    try:
        import fitz  # PyMuPDF
    except ImportError:
        logger.error(
            "PyMuPDF (fitz) not installed. Install with: pip install pymupdf",
        )
        return None

    try:
        doc = fitz.open(file_path)
        try:
            text_parts = []
            for page_num, page in enumerate(doc, 1):
                page_text = page.get_text()
                if page_text.strip():
                    text_parts.append(f"--- Page {page_num} ---\n{page_text}")
        finally:
            # Release the document handle even if a page fails to render.
            doc.close()
    except Exception as e:
        logger.error("Failed to extract PDF text from %s: %s", file_path, e)
        return None

    if not text_parts:
        logger.warning("No text extracted from PDF: %s", file_path)
        return None
    return "\n\n".join(text_parts)
def extract_pdf_text_from_bytes(raw: bytes) -> Optional[str]:
    """Extract text from in-memory PDF bytes by staging a temp file.

    Wraps :func:`extract_pdf_text` for the byte-stream case: writes *raw* to
    a ``NamedTemporaryFile`` (``.pdf`` suffix), extracts from that path, and
    always removes the temp file afterwards — including when the write
    itself fails.

    Called within this module by :func:`decode_bytes_to_text` for ``.pdf``
    inputs.

    Args:
        raw (bytes): The raw PDF file contents.

    Returns:
        Optional[str]: The extracted text, or ``None`` on failure.
    """
    path: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
            # Record the name before writing so a failed write (e.g. disk
            # full) still gets cleaned up below.
            path = tmp.name
            tmp.write(raw)
        return extract_pdf_text(path)
    except Exception as e:
        logger.error("Failed to extract PDF from bytes: %s", e)
        return None
    finally:
        if path is not None:
            try:
                os.unlink(path)
            except OSError:
                # Best effort: the file may already be gone.
                pass
def decode_bytes_to_text(raw: bytes, filename: str) -> Optional[str]:
    """Decode raw file bytes to text, dispatching on the filename extension.

    Names ending in ``.pdf`` (case-insensitive) are handed to
    :func:`extract_pdf_text_from_bytes`. Anything else is decoded as strict
    UTF-8 first and as ``latin-1`` when that fails.

    Called within this module by
    :meth:`FileRAGManager._load_whole_file_text` and
    :meth:`FileRAGManager.index_url`, and externally by
    ``scripts/backfill_pg_source_files.py``.

    Args:
        raw (bytes): The raw file contents.
        filename (str): Name used only to detect a ``.pdf`` extension.

    Returns:
        Optional[str]: The decoded text, or ``None`` if decoding failed.
    """
    looks_like_pdf = filename.lower().endswith(".pdf")
    if looks_like_pdf:
        return extract_pdf_text_from_bytes(raw)

    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        pass

    # Not valid UTF-8: latin-1 maps every byte to a code point.
    try:
        return raw.decode("latin-1")
    except Exception:
        return None
def compress_pdf(
    file_path: str,
    output_path: Optional[str] = None,
    remove_images: bool = True,
) -> Tuple[str, int, int]:
    """Shrink an oversized PDF in place (or to a copy) before indexing.

    Opens the document with PyMuPDF, optionally deletes every embedded image,
    and rewrites it with ``ez_save()``. Writes to *output_path* when given,
    otherwise replaces *file_path*, and logs the size reduction.

    When the destination is the source file itself, the compacted document
    is first written to a temporary file in the same directory and then
    moved over the original with ``os.replace``. PyMuPDF rejects a
    non-incremental save onto the file it has open, and the two-step write
    also means a failed save never leaves a truncated original behind.

    ``import fitz`` is unguarded here, so a missing PyMuPDF raises rather
    than returning a sentinel.

    Called within this module by :meth:`FileRAGManager._read_file_content`
    and :meth:`FileRAGManager.index_url` on the over-limit PDF path.

    Args:
        file_path (str): Path to the source PDF.
        output_path (Optional[str]): Destination path; ``None`` overwrites
            the source in place.
        remove_images (bool): Whether to delete embedded images before
            saving.

    Returns:
        Tuple[str, int, int]: ``(out_path, original_size, compressed_size)``
        with both sizes in bytes.

    Raises:
        ImportError: If PyMuPDF (``fitz``) is not installed.
    """
    import fitz  # PyMuPDF

    original_size = os.path.getsize(file_path)
    out_path = output_path or file_path

    # Resolve symlinks so "same file" is detected however it is spelled.
    source_real = os.path.realpath(file_path)
    in_place = os.path.realpath(out_path) == source_real
    tmp_path: Optional[str] = None

    try:
        doc = fitz.open(file_path)
        try:
            if remove_images:
                images_removed = 0
                for page in doc:
                    for img in page.get_images(full=True):
                        try:
                            page.delete_image(img[0])
                            images_removed += 1
                        except Exception as e:
                            # Best effort: one stubborn image must not abort
                            # the whole compression.
                            logger.debug(
                                "Could not remove image xref %s: %s", img[0], e
                            )
                if images_removed:
                    logger.info("Removed %d images from PDF", images_removed)

            if in_place:
                fd, tmp_path = tempfile.mkstemp(
                    suffix=".pdf", dir=os.path.dirname(source_real)
                )
                os.close(fd)
                doc.ez_save(tmp_path)
            else:
                doc.ez_save(out_path)
        finally:
            # Always release the document, also when saving fails.
            doc.close()

        if tmp_path is not None:
            try:
                # mkstemp creates the file owner-only; keep the source's mode.
                os.chmod(tmp_path, os.stat(source_real).st_mode & 0o7777)
            except OSError as e:
                logger.debug("Could not copy file mode to %s: %s", tmp_path, e)
            os.replace(tmp_path, source_real)
            tmp_path = None
    finally:
        if tmp_path is not None:
            # Save or replace failed: do not leave the staging file behind.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

    compressed_size = os.path.getsize(out_path)
    reduction = 100 - (compressed_size * 100 // original_size) if original_size else 0
    logger.info(
        "Compressed PDF: %d -> %d bytes (%d%% reduction)",
        original_size,
        compressed_size,
        reduction,
    )
    return out_path, original_size, compressed_size
# ---------------------------------------------------------------------------
# Text chunking
# ---------------------------------------------------------------------------
def chunk_text(
    text: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> List[str]:
    """Split text into chunks on paragraph and sentence boundaries.

    Prefers natural boundaries: blank-line paragraphs first, then sentence
    breaks (whitespace after ``.``, ``!`` or ``?``), and hard slices only
    when a single sentence is still longer than *chunk_size*. Hard slices
    advance by ``chunk_size - overlap`` characters so consecutive slices
    share *overlap* characters. Pure string processing with no I/O.

    When a chunk is flushed, its last *overlap* characters are kept as the
    start of the next chunk.

    NOTE(review): that tail survives only if the next sentence is appended
    to it. A paragraph or sentence that starts a fresh chunk replaces it.
    This matches the previous behaviour and is left unchanged so existing
    chunk boundaries stay stable.

    Called within this module by :meth:`FileRAGManager.index_file` and
    :meth:`FileRAGManager.index_url`, and externally by ``log_rag_ingest``
    and the cloud-RAG ingest path in ``tools/cloud_rag.py``.

    Args:
        text (str): The full document text to split.
        chunk_size (int): Target maximum chunk length in characters.
        overlap (int): Number of trailing characters repeated for context
            continuity. Values ``<= 0`` disable overlap; values
            ``>= chunk_size`` fall back to non-overlapping hard slices.

    Returns:
        List[str]: The ordered list of chunk strings (a single-element list
        when the text fits in one chunk).
    """
    if len(text) <= chunk_size:
        return [text]

    # Normalise the overlap once. A zero overlap must not reach the slice
    # below: ``s[-0:]`` is the *whole* string, which used to duplicate chunks.
    carry = max(overlap, 0)
    # Stride for hard slicing. A non-positive stride would make ``range``
    # raise (0) or yield nothing (negative), silently dropping the sentence.
    step = chunk_size - carry
    if step <= 0:
        step = max(chunk_size, 1)

    def _tail(chunk: str) -> str:
        """Return the trailing overlap of *chunk* (empty if it is too short)."""
        if carry and len(chunk) > carry:
            return chunk[-carry:]
        return ""

    chunks: List[str] = []
    current = ""

    for para in re.split(r"\n\n+", text):
        # +2 accounts for the "\n\n" joiner between paragraphs.
        if len(current) + len(para) + 2 <= chunk_size:
            current += ("\n\n" + para) if current else para
            continue

        if current:
            chunks.append(current.strip())
            current = _tail(current)

        if len(para) <= chunk_size:
            current = para
            continue

        # Paragraph alone is too long: fall back to sentence granularity.
        for sentence in re.split(r"(?<=[.!?])\s+", para):
            # +1 accounts for the single-space joiner between sentences.
            if len(current) + len(sentence) + 1 <= chunk_size:
                current += (" " + sentence) if current else sentence
                continue

            if current:
                chunks.append(current.strip())
                current = _tail(current)

            if len(sentence) > chunk_size:
                # Even one sentence does not fit: cut fixed-size windows.
                chunks.extend(
                    sentence[i : i + chunk_size]
                    for i in range(0, len(sentence), step)
                )
            else:
                current = sentence

    if current.strip():
        chunks.append(current.strip())
    return chunks
# ---------------------------------------------------------------------------
# URL fetching
# ---------------------------------------------------------------------------
# Strong references to in-flight error-event tasks. The event loop keeps only
# weak references to tasks, so an unreferenced fire-and-forget task may be
# garbage-collected before it finishes.
_ERROR_EVENT_TASKS: set = set()


def _filename_from_content_disposition(header: str) -> Optional[str]:
    """Extract a bare filename from a ``Content-Disposition`` header value.

    Prefers the extended ``filename*=charset'lang'value`` parameter over the
    plain ``filename=`` one, accepts quoted values containing spaces, and
    percent-decodes the result. The header is remote-controlled, so any
    directory components are stripped. Returns ``None`` when no usable name
    is present.
    """
    if not header:
        return None

    extended = re.search(
        r"filename\*\s*=\s*([^'\s;]*)'[^']*'([^;\s]+)", header, re.IGNORECASE
    )
    if extended:
        charset = extended.group(1) or "utf-8"
        try:
            name = unquote(extended.group(2), encoding=charset, errors="replace")
        except LookupError:
            # Unknown charset label: fall back to UTF-8 decoding.
            name = unquote(extended.group(2))
    else:
        plain = re.search(
            r"filename\s*=\s*(?:\"([^\"]*)\"|'([^']*)'|([^;\s]+))",
            header,
            re.IGNORECASE,
        )
        if not plain:
            return None
        name = unquote(next(g for g in plain.groups() if g is not None))

    # Keep only the final path component (either separator style).
    name = name.replace("\\", "/").rsplit("/", 1)[-1].strip()
    if not name or name in {".", ".."}:
        return None
    return name


async def fetch_url_content(
    url: str,
    timeout: float = 30.0,
) -> Tuple[Optional[bytes], Optional[str], Optional[str]]:
    """Fetch content from *url* (SSRF-guarded).

    Routes through :func:`tools._safe_http.safe_http_request`, which
    validates every redirect hop and pins each connect to a vetted public
    IP, so a user-supplied ingestion URL cannot be redirected (or
    DNS-rebound) into an internal host such as 10.10.0.x:6379.

    The filename is taken, in order, from the ``Content-Disposition``
    header, from the last segment of the URL path (only if it contains a
    dot), or from a default chosen by content type.

    Args:
        url (str): The URL to download.
        timeout (float): Timeout passed to the HTTP client, in seconds.

    Returns:
        Tuple[Optional[bytes], Optional[str], Optional[str]]:
        ``(bytes, content_type, filename)``. On any failure the error is
        logged, an HTTP error event is published in the background, and
        ``(None, None, None)`` is returned.
    """
    from tools._safe_http import safe_http_request, safe_httpx_client

    try:
        async with safe_httpx_client(timeout=timeout) as client:
            response = await safe_http_request(
                client, "GET", url, max_redirects=5
            )
            response.raise_for_status()

            content = response.content
            # Drop parameters such as "; charset=utf-8".
            content_type = (
                response.headers.get("content-type", "").split(";")[0].strip()
            )

            filename = _filename_from_content_disposition(
                response.headers.get("content-disposition", "")
            )

            if not filename:
                path = unquote(urlparse(url).path)
                if path and "/" in path:
                    candidate = path.split("/")[-1]
                    # A last segment without an extension is not a filename.
                    if candidate and "." in candidate:
                        filename = candidate

            if not filename:
                ext_map = {
                    "application/pdf": "document.pdf",
                    "text/plain": "document.txt",
                    "application/json": "data.json",
                    "text/html": "page.html",
                }
                filename = ext_map.get(content_type, "downloaded_file")

            return content, content_type, filename

    except Exception as e:
        import asyncio

        from observability import publish_http_error_event

        # Exceptions without an HTTP response (DNS, connect, timeout) count
        # as status 0 / "network".
        status = getattr(getattr(e, "response", None), "status_code", 0)
        task = asyncio.create_task(
            publish_http_error_event(
                http_service="file_rag_fetch",
                http_status=status,
                endpoint=url[:120],
                detail=str(e)[:500],
                error_kind="network" if status == 0 else "",
            )
        )
        _ERROR_EVENT_TASKS.add(task)
        task.add_done_callback(_ERROR_EVENT_TASKS.discard)
        logger.error("Error fetching %s: %s", url, e)
        return None, None, None
# ---------------------------------------------------------------------------
# FileRAGManager
# ---------------------------------------------------------------------------
class FileRAGManager:
    """File- and URL-oriented RAG store over Postgres pgvector plus whole-file tables.

    Each instance owns exactly one named store. The store maps to a dedicated
    Postgres schema that holds a ``files_<schema>`` vector table
    (``halfvec(3072)`` + HNSW) for chunk embeddings, and ``documents`` /
    ``source_files`` tables that keep the full extracted text and the raw
    bytes, so a search hit can return the entire file rather than only the
    matched chunk.

    The pgvector table is wrapped in a Chroma-shaped facade
    (``vector_store.ChromaCompatCollection``) and text is embedded through
    :class:`rag_system.openrouter_embeddings.SyncOpenRouterEmbeddings`
    (Gemini by default): indexing and search reach the embedding provider
    over HTTP while persistence stays in Postgres. Stores listed in
    ``CHUNK_ONLY_STORES`` (e.g. ``stargazer_logs``) skip the whole-file
    tables entirely.

    Instances are normally obtained through the module-level LRU registry
    :func:`get_rag_store` (and :func:`get_stargazer_docs_store`) rather than
    constructed directly; the RAG tool handlers in ``tools/rag.py`` /
    ``tools/cloud_rag.py``,
    :class:`rag_system.auto_search.RAGAutoSearchManager`, and
    ``starwiki/rag_integration`` all go through that registry, with
    ``starwiki`` being the one place that instantiates
    :class:`FileRAGManager` directly.
    """

    def __init__(
        self,
        store_name: str = "default",
        store_path: Optional[str] = None,
        api_key: Optional[str] = None,
        embedding_model: str = "google/gemini-embedding-001",
        max_file_size: int = MAX_FILE_SIZE,
        gemini_only: bool = True,
        document_task_type: Optional[str] = None,
        query_task_type: Optional[str] = None,
    ):
        """Create (or attach to) the Postgres-backed store *store_name*.

        Construction has side effects: it ensures the pgvector table exists
        and, for stores that are not chunk-only, ensures the whole-file
        tables exist as well.

        Args:
            store_name (str): Logical store name; sanitized before it is
                used for the schema and table identifiers.
            store_path (Optional[str]): Root of the legacy on-disk stores;
                defaults to ``DEFAULT_STORE_PATH``. Used only as a read
                fallback for files that were never migrated.
            api_key (Optional[str]): API key forwarded to the embedding
                client.
            embedding_model (str): Embedding model identifier forwarded to
                the embedding client.
            max_file_size (int): Size limit, in bytes, applied when reading
                local files and downloaded URLs.
            gemini_only (bool): Use only the Gemini API for embeddings.
            document_task_type: Optional Gemini task type for indexed text
                (e.g. ``RETRIEVAL_DOCUMENT``).
            query_task_type: Optional Gemini task type for search queries
                (e.g. ``RETRIEVAL_QUERY``).
        """
        sanitized = _sanitize_collection_name(store_name)
        collection_name = f"files_{sanitized}"
        legacy_root = store_path or DEFAULT_STORE_PATH

        self.store_name = store_name
        self.store_path = legacy_root
        self.embedding_model = embedding_model
        self.max_file_size = max_file_size
        self._sanitized_name = sanitized
        self._collection_name = collection_name

        # Legacy disk dir (read-only fallback for unmigrated files).
        self.db_path = os.path.join(legacy_root, sanitized)
        self.files_path = os.path.join(self.db_path, STORE_FILES_SUBDIR)

        self.embedding_fn = SyncOpenRouterEmbeddings(
            api_key=api_key,
            model=embedding_model,
            gemini_only=gemini_only,
            document_task_type=document_task_type,
            query_task_type=query_task_type,
        )

        # Postgres + pgvector: one schema per store, ``files_<store>`` vector
        # table (halfvec(3072) + HNSW). ``self.collection`` is a Chroma-shaped
        # facade so the rest of this class can keep using the Chroma API.
        from vector_store import (
            ChromaCompatCollection,
            PgVectorCollection,
            pg_ident,
        )

        self._schema = pg_ident(sanitized)
        self._table = pg_ident(collection_name)
        self._pg = PgVectorCollection(self._schema, self._table)
        self._pg.ensure()

        from .pg_source_files import ensure_source_tables

        # Chunk-only stores never get ``documents`` / ``source_files`` tables.
        if sanitized not in CHUNK_ONLY_STORES:
            ensure_source_tables(self._schema)

        self.collection = ChromaCompatCollection(self._pg, self.embedding_fn)
        logger.info(
            "Initialized FileRAGManager '%s' (pgvector %s.%s; whole files in PG)",
            store_name,
            self._schema,
            self._table,
        )
# -- helpers -------------------------------------------------------------
def _pg_whole_files_enabled(self) -> bool:
    """Report whether this store keeps whole-file Postgres tables.

    Chunk-only stores (those whose sanitized name is in the module-level
    ``CHUNK_ONLY_STORES`` collection, e.g. ``stargazer_logs``) index chunks
    only and never populate or read ``documents`` / ``source_files``. The
    check is a plain membership test on ``self._sanitized_name``; no I/O.

    Returns:
        bool: ``True`` if whole-file PG storage applies to this store,
        ``False`` for chunk-only stores.
    """
    return self._sanitized_name not in CHUNK_ONLY_STORES

def _upsert_whole_file(
    self,
    filename: str,
    content: str,
    raw_bytes: bytes,
    extraction_method: str = "text",
) -> None:
    """Persist a file's full text and raw bytes into the whole-file PG tables.

    Computes a SHA-256 over *raw_bytes* and delegates to
    ``rag_system.pg_source_files.upsert_whole_file`` for this store's
    schema. A no-op for chunk-only stores. The vector/chunk upsert is done
    separately by the index methods.

    Args:
        filename (str): Stored filename key for the document.
        content (str): Extracted plain-text content to persist.
        raw_bytes (bytes): Original file bytes (also hashed for the SHA).
        extraction_method (str): How *content* was produced, e.g.
            ``"text"`` or ``"pdf"``.

    Returns:
        None
    """
    if not self._pg_whole_files_enabled():
        return
    from .pg_source_files import upsert_whole_file

    sha = hashlib.sha256(raw_bytes).hexdigest()
    upsert_whole_file(
        self._schema,
        filename,
        sha,
        content,
        raw_bytes,
        extraction_method=extraction_method,
    )

def _load_whole_file_text(self, filename: str) -> Optional[str]:
    """Resolve the full text of a stored file across all storage tiers.

    Tiers are tried in priority order: the Postgres ``documents`` text,
    then the ``source_files`` raw bytes decoded via
    :func:`decode_bytes_to_text`, and finally the legacy on-disk ``files``
    directory (PDF-extracted, or UTF-8 with a latin-1 fallback). The two
    Postgres tiers are skipped for chunk-only stores.

    The legacy lookup refuses any *filename* that resolves outside
    ``self.files_path`` (for example ``"../secret"``), so a caller-supplied
    name cannot be used to read arbitrary files from disk.

    Args:
        filename (str): Stored filename to resolve.

    Returns:
        Optional[str]: The full file text, or ``None`` if not found in any
        tier (or if the name escapes the legacy directory).
    """
    if not filename:
        return None

    if self._pg_whole_files_enabled():
        from .pg_source_files import get_document_text, get_source_file_bytes

        text = get_document_text(self._schema, filename)
        if text:
            return text
        raw = get_source_file_bytes(self._schema, filename)
        if raw is not None:
            decoded = decode_bytes_to_text(raw, filename)
            if decoded:
                return decoded

    if not os.path.isdir(self.files_path):
        return None

    base_dir = os.path.abspath(self.files_path)
    fp = os.path.abspath(os.path.join(base_dir, filename))
    try:
        inside = os.path.commonpath([base_dir, fp]) == base_dir
    except ValueError:
        # Raised e.g. for paths on different drives: treat as outside.
        inside = False
    if not inside:
        logger.warning(
            "Refusing to read %r: resolves outside legacy store dir %s",
            filename,
            base_dir,
        )
        return None

    if not os.path.isfile(fp):
        return None
    if filename.lower().endswith(".pdf"):
        return extract_pdf_text(fp)
    try:
        with open(fp, "r", encoding="utf-8") as f:
            return f.read()
    except UnicodeDecodeError:
        with open(fp, "r", encoding="latin-1") as f:
            return f.read()

@staticmethod
def _compute_file_hash(content: str) -> str:
    """Return the SHA-256 hex digest of the UTF-8 encoded *content*.

    The digest is stored in chunk metadata as ``content_hash`` so that
    re-indexing unchanged content can be skipped.

    Args:
        content (str): The extracted file text to hash.

    Returns:
        str: The hex-encoded SHA-256 digest.
    """
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

@staticmethod
def _get_file_id(file_path: str) -> str:
    """Derive a stable, path-based document id for a local file.

    The path is made absolute and normalized, then hashed with MD5. The
    digest is only an identifier (used as the ``<file_id>_chunk_<n>`` id
    prefix), not a security measure, hence ``usedforsecurity=False``.

    Args:
        file_path (str): The local file path to derive an id from.

    Returns:
        str: A hex-encoded MD5 digest of the normalized absolute path.
    """
    normalized = os.path.normpath(os.path.abspath(file_path))
    return hashlib.md5(
        normalized.encode("utf-8"), usedforsecurity=False
    ).hexdigest()

@staticmethod
def _is_supported_file(file_path: str) -> bool:
    """Decide whether a file's type is eligible for indexing.

    A file is accepted when its lower-cased suffix is in the module-level
    ``SUPPORTED_EXTENSIONS`` set, or when its lower-cased basename is one
    of a few well-known extensionless files. Pure string inspection; the
    filesystem is not touched.

    Args:
        file_path (str): Path whose extension / basename is examined.

    Returns:
        bool: ``True`` if the file type is supported for indexing.
    """
    path = Path(file_path)
    known_files = {
        "dockerfile",
        "makefile",
        "readme",
        "license",
        "changelog",
    }
    return path.suffix.lower() in SUPPORTED_EXTENSIONS or path.name.lower() in known_files

def _read_file_content(self, file_path: str) -> Optional[str]:
    """Read a local file's text content, honoring size limits and PDFs.

    Files larger than ``self.max_file_size`` are refused, except PDFs,
    which are first passed through :func:`compress_pdf` and extracted from
    the compressed copy if that fits the limit. PDFs within the limit are
    extracted with :func:`extract_pdf_text`; everything else is read as
    UTF-8 with a latin-1 fallback.

    The transient ``<file>.compressed.pdf`` is always removed, including
    when compression or extraction raises, and such failures are logged
    instead of being silently discarded.

    Args:
        file_path (str): Path to the file to read.

    Returns:
        Optional[str]: The file's text content, or ``None`` if it could not
        be read, was too large, or compression/extraction failed.
    """
    try:
        file_size = os.path.getsize(file_path)
        is_pdf = file_path.lower().endswith(".pdf")

        if file_size > self.max_file_size:
            if not is_pdf:
                logger.warning(
                    "File too large (>%d bytes): %s",
                    self.max_file_size,
                    file_path,
                )
                return None

            logger.info(
                "PDF exceeds size limit, attempting compression...",
            )
            compressed_path = file_path + ".compressed.pdf"
            try:
                _, _, compressed_size = compress_pdf(
                    file_path,
                    compressed_path,
                )
                if compressed_size <= self.max_file_size:
                    return extract_pdf_text(compressed_path)
                logger.warning(
                    "PDF still too large after compression (%d bytes): %s",
                    compressed_size,
                    file_path,
                )
                return None
            except Exception as e:
                logger.warning(
                    "PDF compression/extraction failed for %s: %s",
                    file_path,
                    e,
                )
                return None
            finally:
                # Never leave the scratch file behind, whatever happened.
                try:
                    os.remove(compressed_path)
                except OSError:
                    pass

        if is_pdf:
            return extract_pdf_text(file_path)

        try:
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read()
        except UnicodeDecodeError:
            with open(file_path, "r", encoding="latin-1") as f:
                return f.read()
    except Exception as e:
        logger.error("Failed to read file %s: %s", file_path, e)
        return None

@staticmethod
def _create_embedding_text(file_path: str, content: str) -> str:
    """Prepend a File/Path/Type header to chunk text before embedding.

    The header carries the filename, the last three path components and
    the file extension, followed by a blank line and the chunk body.
    :meth:`_strip_embedding_header` removes it again on read.

    Args:
        file_path (str): Source path used to derive the header metadata.
        content (str): The chunk body to embed.

    Returns:
        str: The header-prefixed text to embed.
    """
    path = Path(file_path)
    return (
        f"File: {path.name}\nPath: {'/'.join(path.parts[-3:])}\n"
        f"Type: {path.suffix or 'unknown'}\n\n{content}"
    )

@staticmethod
def _strip_embedding_header(doc: str) -> str:
    """Strip the provenance header back off a stored chunk.

    Drops everything up to and including the first blank-line separator.
    When there is no separator, or the remaining body would be empty, the
    trimmed original is returned instead.

    Args:
        doc (str): The stored chunk text, possibly header-prefixed.

    Returns:
        str: The chunk body with the header removed (or the trimmed input
        as a fallback); ``""`` for empty input.
    """
    if not doc:
        return ""
    _, separator, remainder = doc.partition("\n\n")
    if not separator:
        return doc.strip()
    body = remainder.strip()
    return body if body else doc.strip()

# -- index ---------------------------------------------------------------
def index_file(
    self,
    file_path: str,
    tags: Optional[List[str]] = None,
    use_chunking: bool = True,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
    force: bool = False,
) -> Dict[str, Any]:
    """Index a single local file into the collection.

    The file is validated (exists, is a regular file, is not on the
    sensitive-path deny list, has a supported type), read to text, stored
    whole via :meth:`_upsert_whole_file`, and then embedded either as
    several chunks or as one document.

    Unless *force* is set, a file whose stored ``content_hash`` matches the
    current content is skipped. When *force* is True the content-hash dedup
    check is bypassed so the file is always re-embedded (but the store is
    **not** cleared). Previously indexed entries for the same path are
    deleted before the new ones are written.

    Every failure, including a failed whole-file write, is reported through
    the returned dict rather than raised.

    Args:
        file_path (str): Path of the file to index; made absolute first.
        tags (Optional[List[str]]): Tag labels stored (JSON-encoded) in the
            chunk metadata.
        use_chunking (bool): Chunk the content when it is longer than
            *chunk_size*.
        chunk_size (int): Target chunk length in characters.
        chunk_overlap (int): Overlap carried between consecutive chunks.
        force (bool): Re-embed even if the content hash is unchanged.

    Returns:
        Dict[str, Any]: ``success`` plus, on success, ``action``
        (``"indexed"`` or ``"skipped"``) and file details; on failure an
        ``error`` message (and ``skipped_sensitive`` for denied paths).
    """
    file_path = os.path.abspath(file_path)

    if not os.path.exists(file_path):
        return {"success": False, "error": f"File not found: {file_path}"}
    if not os.path.isfile(file_path):
        return {"success": False, "error": f"Not a file: {file_path}"}
    if _is_sensitive_path(file_path):
        logger.warning(
            "RAG index_file refused sensitive path: %s",
            file_path,
        )
        return {
            "success": False,
            "skipped_sensitive": True,
            "error": (
                f"Refusing to index sensitive path: {file_path} "
                "(matches the RAG sensitive-file deny list)."
            ),
        }
    if not self._is_supported_file(file_path):
        return {
            "success": False,
            "error": f"Unsupported file type: {file_path}",
        }

    content = self._read_file_content(file_path)
    if content is None:
        return {
            "success": False,
            "error": f"Failed to read file: {file_path}",
        }

    try:
        with open(file_path, "rb") as f:
            raw_bytes = f.read()
    except OSError as e:
        return {"success": False, "error": f"Failed to read file bytes: {e}"}

    filename = Path(file_path).name
    extraction_method = "pdf" if filename.lower().endswith(".pdf") else "text"
    try:
        self._upsert_whole_file(
            filename,
            content,
            raw_bytes,
            extraction_method=extraction_method,
        )
    except Exception as e:
        # Keep the "errors come back as a dict" contract: one bad write
        # must not blow up a caller that is indexing many files.
        logger.error(
            "Failed to store whole file %s: %s",
            file_path,
            e,
            exc_info=True,
        )
        return {"success": False, "error": f"Failed to store whole file: {e}"}

    file_id = self._get_file_id(file_path)
    content_hash = self._compute_file_hash(content)

    # Dedup against (and clear out) whatever is already indexed for this
    # path. Best-effort: if the lookup fails we still index the file.
    try:
        existing = self.collection.get(
            where={"file_path": file_path},
            include=["metadatas"],
        )
        if existing and existing.get("metadatas"):
            if not force:
                meta0 = existing["metadatas"][0]
                if meta0.get("content_hash") == content_hash:
                    return {
                        "success": True,
                        "action": "skipped",
                        "file_path": file_path,
                        "reason": "content unchanged",
                    }
            old_ids = existing.get("ids", [])
            if old_ids:
                self.collection.delete(ids=old_ids)
    except Exception as e:
        logger.warning(
            "Dedup lookup failed for %s, indexing anyway: %s",
            file_path,
            e,
        )

    base_metadata: Dict[str, Any] = {
        "file_path": file_path,
        "filename": filename,
        "extension": Path(file_path).suffix,
        "content_hash": content_hash,
        "file_size": len(content),
        "indexed_at": datetime.now(timezone.utc).isoformat(),
        "tags": json.dumps(tags or []),
        "store_name": self.store_name,
        "source_type": "local",
    }

    try:
        if use_chunking and len(content) > chunk_size:
            chunks = [
                c
                for c in chunk_text(content, chunk_size, chunk_overlap)
                if c and c.strip()
            ]
            if not chunks:
                # The chunker produced nothing usable; fall back to the
                # leading slice (content is longer than chunk_size here).
                chunks = [content[:chunk_size]]

            chunk_count = len(chunks)
            ids = [f"{file_id}_chunk_{i}" for i in range(chunk_count)]
            documents = [
                self._create_embedding_text(file_path, chk) for chk in chunks
            ]
            metadatas = [
                {
                    **base_metadata,
                    "chunk_index": i,
                    "chunk_count": chunk_count,
                    "is_chunked": True,
                }
                for i in range(chunk_count)
            ]

            for start in range(0, chunk_count, CHROMA_MAX_BATCH):
                end = start + CHROMA_MAX_BATCH
                self.collection.upsert(
                    ids=ids[start:end],
                    documents=documents[start:end],
                    metadatas=metadatas[start:end],
                )

            logger.info(
                "Indexed file with %d chunks: %s",
                chunk_count,
                file_path,
            )
            return {
                "success": True,
                "action": "indexed",
                "file_id": file_id,
                "file_path": file_path,
                "file_size": len(content),
                "chunk_count": chunk_count,
            }

        base_metadata.update(
            {
                "is_chunked": False,
                "chunk_index": 0,
                "chunk_count": 1,
            }
        )
        self.collection.upsert(
            ids=[file_id],
            documents=[self._create_embedding_text(file_path, content)],
            metadatas=[base_metadata],
        )
        logger.info("Indexed file: %s", file_path)
        return {
            "success": True,
            "action": "indexed",
            "file_id": file_id,
            "file_path": file_path,
            "file_size": len(content),
        }
    except Exception as e:
        logger.error(
            "Failed to index file %s: %s",
            file_path,
            e,
            exc_info=True,
        )
        return {"success": False, "error": str(e)}
async def index_url(
    self,
    url: str,
    tags: Optional[List[str]] = None,
    use_chunking: bool = True,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
    force: bool = False,
) -> Dict[str, Any]:
    """Fetch a remote document by URL and index it into this store.

    The URL counterpart of :meth:`index_file`. The document is downloaded
    with :func:`fetch_url_content`, given a stored filename made of a
    URL-hash prefix plus the sanitized remote name, decoded to text with
    :func:`decode_bytes_to_text`, stored whole via
    :meth:`_upsert_whole_file`, and then embedded as chunks or as a single
    document with ``source_type="url"`` metadata.

    Oversized non-PDF downloads are rejected. Oversized PDFs are written to
    a temp file and passed through :func:`compress_pdf`; both temp files
    are removed afterwards.

    When no extension can be taken from the filename, it is derived from
    the Content-Type with any parameters (such as ``; charset=utf-8``)
    ignored. Chunk upserts are written in ``CHROMA_MAX_BATCH``-sized
    batches, the same way :meth:`index_file` does.

    Args:
        url (str): The document URL to fetch and index.
        tags (Optional[List[str]]): Optional tag labels stored in metadata.
        use_chunking (bool): Whether to chunk large content before
            embedding.
        chunk_size (int): Target chunk length in characters.
        chunk_overlap (int): Overlap carried between chunks.
        force (bool): Re-embed even when the stored content hash for this
            URL is unchanged. Defaults to ``False``.

    Returns:
        Dict[str, Any]: A result dict with ``success`` and, on success,
        ``action`` (``"indexed"`` or ``"skipped"``), ``url``, ``filename``,
        sizes and ``stored_path``; on failure an ``error`` message.
    """
    content_bytes, content_type, filename = await fetch_url_content(url)
    if content_bytes is None:
        return {"success": False, "error": f"Failed to fetch URL: {url}"}

    url_hash = hashlib.md5(url.encode()).hexdigest()[:12]

    ext = Path(filename).suffix if filename else ""
    if not ext:
        ext_map = {
            "application/pdf": ".pdf",
            "text/plain": ".txt",
            "application/json": ".json",
            "text/html": ".html",
        }
        # "text/html; charset=utf-8" must still map to ".html".
        mime = (content_type or "").split(";", 1)[0].strip().lower()
        ext = ext_map.get(mime, ".txt")

    is_pdf = ext.lower() == ".pdf"
    file_size = len(content_bytes)

    if file_size > self.max_file_size and not is_pdf:
        return {
            "success": False,
            "error": (
                f"File too large: {file_size} bytes "
                f"(limit: {self.max_file_size})"
            ),
        }

    base_filename = Path(filename).name if filename else "downloaded"
    base_filename = re.sub(r'[/\\:*?"<>|]', "_", base_filename)
    stored_filename = f"{url_hash}_{base_filename}"
    if not stored_filename.endswith(ext):
        stored_filename += ext

    work_bytes = content_bytes
    if is_pdf and file_size > self.max_file_size:
        try:
            with tempfile.NamedTemporaryFile(
                suffix=".pdf", delete=False
            ) as tmp_in:
                tmp_in.write(content_bytes)
                in_path = tmp_in.name
            compressed_path = in_path + ".compressed.pdf"
            try:
                _, _, compressed_size = compress_pdf(in_path, compressed_path)
                if compressed_size > self.max_file_size:
                    return {
                        "success": False,
                        "error": (
                            f"PDF still too large after compression: "
                            f"{compressed_size}"
                        ),
                    }
                with open(compressed_path, "rb") as f:
                    work_bytes = f.read()
            finally:
                for p in (compressed_path, in_path):
                    try:
                        os.unlink(p)
                    except OSError:
                        pass
        except Exception as e:
            return {"success": False, "error": f"PDF compression failed: {e}"}

    content = decode_bytes_to_text(work_bytes, stored_filename)
    if content is None:
        return {
            "success": False,
            "error": (
                "Failed to extract text from PDF"
                if is_pdf
                else "Failed to decode content"
            ),
        }

    try:
        self._upsert_whole_file(
            stored_filename,
            content,
            work_bytes,
            extraction_method="pdf" if is_pdf else "text",
        )
    except Exception as e:
        # Report storage failures through the result dict like every
        # other failure path of this method.
        logger.error(
            "Failed to store whole file for URL %s: %s",
            url,
            e,
            exc_info=True,
        )
        return {"success": False, "error": f"Failed to store whole file: {e}"}

    stored_path = f"pg://{self.store_name}/{stored_filename}"
    content_hash = self._compute_file_hash(content)
    file_id = hashlib.md5(url.encode()).hexdigest()

    # Dedup against (and clear out) entries already indexed for this URL.
    # Best-effort: a failed lookup must not prevent indexing.
    try:
        existing = self.collection.get(
            where={"source_url": url},
            include=["metadatas"],
        )
        if existing and existing.get("metadatas"):
            if not force:
                meta0 = existing["metadatas"][0]
                if meta0.get("content_hash") == content_hash:
                    return {
                        "success": True,
                        "action": "skipped",
                        "url": url,
                        "reason": "content unchanged",
                    }
            old_ids = existing.get("ids", [])
            if old_ids:
                self.collection.delete(ids=old_ids)
    except Exception as e:
        logger.warning(
            "Dedup lookup failed for URL %s, indexing anyway: %s",
            url,
            e,
        )

    base_metadata: Dict[str, Any] = {
        "file_path": stored_path,
        "filename": stored_filename,
        "extension": ext,
        "content_hash": content_hash,
        "file_size": len(content),
        "indexed_at": datetime.now(timezone.utc).isoformat(),
        "tags": json.dumps(tags or []),
        "store_name": self.store_name,
        "source_type": "url",
        "source_url": url,
    }
    header = f"File: {stored_filename}\nURL: {url}\nType: {ext}\n\n"

    try:
        if use_chunking and len(content) > chunk_size:
            chunks = [
                c
                for c in chunk_text(content, chunk_size, chunk_overlap)
                if c and c.strip()
            ]
            if not chunks:
                # The chunker produced nothing usable; fall back to the
                # leading slice (content is longer than chunk_size here).
                chunks = [content[:chunk_size]]

            chunk_count = len(chunks)
            ids = [f"{file_id}_chunk_{i}" for i in range(chunk_count)]
            documents = [header + chk for chk in chunks]
            metadatas = [
                {
                    **base_metadata,
                    "chunk_index": i,
                    "chunk_count": chunk_count,
                    "is_chunked": True,
                }
                for i in range(chunk_count)
            ]

            for start in range(0, chunk_count, CHROMA_MAX_BATCH):
                end = start + CHROMA_MAX_BATCH
                self.collection.upsert(
                    ids=ids[start:end],
                    documents=documents[start:end],
                    metadatas=metadatas[start:end],
                )

            logger.info(
                "Indexed URL with %d chunks: %s",
                chunk_count,
                url,
            )
            return {
                "success": True,
                "action": "indexed",
                "url": url,
                "filename": stored_filename,
                "file_size": len(content),
                "chunk_count": chunk_count,
                "stored_path": stored_path,
            }

        base_metadata.update(
            {
                "is_chunked": False,
                "chunk_index": 0,
                "chunk_count": 1,
            }
        )
        self.collection.upsert(
            ids=[file_id],
            documents=[header + content],
            metadatas=[base_metadata],
        )
        logger.info("Indexed URL: %s", url)
        return {
            "success": True,
            "action": "indexed",
            "url": url,
            "filename": stored_filename,
            "file_size": len(content),
            "stored_path": stored_path,
        }
    except Exception as e:
        logger.error("Failed to index URL %s: %s", url, e, exc_info=True)
        return {"success": False, "error": str(e)}
def index_directory(
    self,
    directory_path: str,
    recursive: bool = True,
    tags: Optional[List[str]] = None,
    exclude_patterns: Optional[List[str]] = None,
    max_workers: int = 6,
    force: bool = False,
    allowed_extensions: Optional[Collection[str]] = None,
) -> Dict[str, Any]:
    """Index the files under *directory_path* via :meth:`index_file`.

    When *max_workers* > 1 (and there is more than one file), files are
    indexed concurrently in a thread pool; otherwise they are processed
    sequentially.

    *force* bypasses the per-file content-hash dedup check without clearing
    the store, so already-indexed files get re-embedded.

    *exclude_patterns* are matched against directory names while walking
    and against each file's path **relative to** *directory_path*, so the
    location of the directory itself (e.g. a parent folder whose name
    contains ``venv``) never causes its contents to be excluded. Patterns
    starting with ``*`` are case-insensitive suffix matches; all others are
    case-sensitive substring matches.

    When *allowed_extensions* is set, only files whose suffix (after
    normalizing to a leading dot, lowercase) appears in the collection are
    queued; ``None`` means no extension filter.

    An exception escaping :meth:`index_file` for one file is logged and
    counted as a failure for that file; it does not abort the run.

    Args:
        directory_path (str): Directory to index; made absolute first.
        recursive (bool): Walk subdirectories as well.
        tags (Optional[List[str]]): Tags applied to every indexed file.
        exclude_patterns (Optional[List[str]]): Exclusion patterns; a
            built-in default list is used when omitted or empty.
        max_workers (int): Upper bound on worker threads.
        force (bool): Re-embed files even if unchanged.
        allowed_extensions (Optional[Collection[str]]): Optional extension
            allow-list.

    Returns:
        Dict[str, Any]: ``success``, the ``indexed`` / ``skipped`` /
        ``failed`` counters, the per-file ``files`` results and, when
        applicable, ``sensitive_skipped``; or ``success=False`` with an
        ``error`` if *directory_path* is not a directory.
    """
    directory_path = os.path.abspath(directory_path)
    if not os.path.isdir(directory_path):
        return {
            "success": False,
            "error": f"Not a directory: {directory_path}",
        }

    import threading
    from concurrent.futures import ThreadPoolExecutor, as_completed

    patterns: List[str] = exclude_patterns or [
        "__pycache__",
        ".git",
        "node_modules",
        ".venv",
        "venv",
        ".env",
        "*.pyc",
        "*.pyo",
        ".DS_Store",
    ]

    results: Dict[str, Any] = {
        "success": True,
        "indexed": 0,
        "skipped": 0,
        "failed": 0,
        "files": [],
    }

    def should_exclude(path: str) -> bool:
        """Return True if *path* matches any exclude pattern.

        ``*``-prefixed patterns are suffix matches on the lower-cased
        path; everything else is a case-sensitive substring test.
        """
        lowered = path.lower()
        return any(
            lowered.endswith(pat[1:]) if pat.startswith("*") else pat in path
            for pat in patterns
        )

    file_paths: List[str] = []
    sensitive_skipped: List[str] = []

    walker = (
        os.walk(directory_path)
        if recursive
        else [(directory_path, [], os.listdir(directory_path))]
    )
    for root, dirs, files in walker:
        if recursive:
            # Prune in place so os.walk does not descend into these.
            dirs[:] = [
                d
                for d in dirs
                if not should_exclude(d)
                and not _is_sensitive_path(os.path.join(root, d))
            ]
        for fname in files:
            file_path = os.path.join(root, fname)
            # Match on the path below the indexed root only; components
            # above it are not part of what the caller asked to filter.
            if should_exclude(os.path.relpath(file_path, directory_path)):
                continue
            if _is_sensitive_path(file_path):
                sensitive_skipped.append(file_path)
                continue
            if not recursive and not os.path.isfile(file_path):
                continue
            file_paths.append(file_path)

    if sensitive_skipped:
        logger.warning(
            "RAG index_directory skipped %d sensitive files under %s",
            len(sensitive_skipped),
            directory_path,
        )
        results["sensitive_skipped"] = len(sensitive_skipped)

    if allowed_extensions is not None:
        ext_set: set[str] = set()
        for raw in allowed_extensions:
            e = raw.strip().lower()
            if not e:
                continue
            ext_set.add(e if e.startswith(".") else "." + e)
        if ext_set:
            file_paths = [
                fp for fp in file_paths if Path(fp).suffix.lower() in ext_set
            ]

    total_files = len(file_paths)
    done_count = 0
    lock = threading.Lock()

    def _record(result: Dict[str, Any]) -> None:
        """Fold one per-file result into the running totals (thread-safe)."""
        nonlocal done_count
        with lock:
            results["files"].append(result)
            if result.get("success"):
                key = "indexed" if result.get("action") == "indexed" else "skipped"
                results[key] += 1
            else:
                results["failed"] += 1
            done_count += 1
            if done_count % 25 == 0 or done_count == total_files:
                logger.info(
                    "Progress: %d / %d files (%d indexed, %d skipped, %d failed)",
                    done_count,
                    total_files,
                    results["indexed"],
                    results["skipped"],
                    results["failed"],
                )

    def _index_one(fp: str) -> Dict[str, Any]:
        """Index one file, converting an unexpected exception to a failure."""
        try:
            return self.index_file(fp, tags=tags, force=force)
        except Exception as e:
            logger.error(
                "Unhandled error while indexing %s: %s",
                fp,
                e,
                exc_info=True,
            )
            return {"success": False, "file_path": fp, "error": str(e)}

    if max_workers > 1 and total_files > 1:
        workers = min(max_workers, total_files)
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = [pool.submit(_index_one, fp) for fp in file_paths]
            for future in as_completed(futures):
                _record(future.result())
    else:
        for fp in file_paths:
            _record(_index_one(fp))

    logger.info(
        "Directory indexing complete: %d indexed, %d skipped, %d failed",
        results["indexed"],
        results["skipped"],
        results["failed"],
    )
    return results
# -- search --------------------------------------------------------------
@staticmethod
def _merge_chunks(
    chunks: List[tuple],
    max_content_size: int,
) -> str:
    """Stitch matched chunks of one file into a single string.

    *chunks* is a list of ``(chunk_index, chunk_text)`` tuples, expected in
    ascending ``chunk_index`` order. Each chunk is stripped and blank ones
    are dropped. A chunk that directly follows the previously kept one
    (index + 1) is appended as-is; any other chunk is prefixed with a
    ``[...]`` marker to show that content was skipped. Parts are joined
    with a newline.

    Every kept chunk is charged its length plus 10 against
    *max_content_size*. The first chunk that would exceed the budget ends
    the merge; it is included in truncated form only if more than 200
    characters of budget remain.

    Args:
        chunks (List[tuple]): ``(chunk_index, chunk_text)`` pairs.
        max_content_size (int): Character budget for the merged text.

    Returns:
        str: The merged text, or ``""`` when there is nothing to merge.
    """
    if not chunks:
        return ""

    overhead = 10
    gap_marker = "\n\n[...]\n\n"
    pieces: List[str] = []
    used = 0
    last_index = None

    for index, raw in chunks:
        body = raw.strip()
        if not body:
            continue

        cost = len(body) + overhead
        if used + cost > max_content_size:
            room = max_content_size - used - overhead
            if room > 200:
                pieces.append(body[:room].rstrip())
            break

        follows_previous = last_index is None or index == last_index + 1
        pieces.append(body if follows_previous else gap_marker + body)
        used += cost
        last_index = index

    return "\n".join(pieces)
[docs] def search( self, query: str, n_results: int = 5, tags: Optional[List[str]] = None, return_content: bool = True, query_embedding: list[float] | None = None, max_content_size: int = 8000, content_mode: Literal["whole", "chunks"] = "whole", ) -> List[Dict[str, Any]]: """Semantic search returning one result per matched file. *content_mode* ``whole`` loads Postgres ``documents`` when available; ``chunks`` returns the best KNN-matched indexed chunk only. """ import time t0 = time.monotonic() if query_embedding is None and not (query or "").strip(): return [] try: where_filter = None if tags: where_filter = { "$or": [{"tags": {"$contains": tag}} for tag in tags], } query_kwargs: Dict[str, Any] = { "n_results": n_results * 5, "where": where_filter, "include": ["metadatas", "distances", "documents"], } if query_embedding is not None: query_kwargs["query_embeddings"] = [query_embedding] else: query_kwargs["query_texts"] = [query] results = self.collection.query(**query_kwargs) if not results or not results.get("metadatas"): return [] documents = results.get("documents", [[]])[0] file_chunks: Dict[str, Dict[str, Any]] = {} for i, metadata in enumerate(results["metadatas"][0]): file_path = metadata.get("file_path", "") distance = ( results["distances"][0][i] if results.get("distances") else 1.0 ) chunk_index = int(metadata.get("chunk_index", 0)) chunk_text = documents[i] if i < len(documents) else "" if file_path not in file_chunks: file_chunks[file_path] = { "metadata": metadata, "best_distance": distance, "chunks": [], } entry = file_chunks[file_path] if distance < entry["best_distance"]: entry["best_distance"] = distance entry["metadata"] = metadata entry["chunks"].append((distance, chunk_index, chunk_text)) output = [] sorted_files = sorted( file_chunks.items(), key=lambda x: x[1]["best_distance"], ) for file_path, data in sorted_files[:n_results]: metadata = data["metadata"] best_distance = data["best_distance"] entry: Dict[str, Any] = { "file_path": file_path, 
"filename": metadata.get("filename", ""), "extension": metadata.get("extension", ""), "file_size": metadata.get("file_size", 0), "indexed_at": metadata.get("indexed_at", ""), "tags": json.loads(metadata.get("tags", "[]")), "similarity_score": ( 1.0 - best_distance if best_distance is not None else None ), "source_type": metadata.get("source_type", "local"), "source_url": metadata.get("source_url", None), } if return_content: if content_mode == "chunks": best_distance, chunk_index, chunk_text = min( data["chunks"], key=lambda c: c[0], ) entry["chunk_index"] = chunk_index stripped = self._strip_embedding_header(chunk_text) if len(stripped) > max_content_size: stripped = stripped[:max_content_size] entry["content"] = ( stripped if stripped else "[No chunk text available]" ) else: fname = metadata.get("filename", "") or Path( file_path ).name whole_text = ( self._load_whole_file_text(fname) if fname and self._pg_whole_files_enabled() else None ) if whole_text: entry["content"] = ( whole_text[:max_content_size] if len(whole_text) > max_content_size else whole_text ) else: ranked = sorted( data["chunks"], key=lambda c: c[0], ) by_index = sorted( [(ci, ct) for _, ci, ct in ranked], key=lambda x: x[0], ) seen_indices: set = set() deduped = [] for ci, ct in by_index: if ci not in seen_indices: seen_indices.add(ci) deduped.append((ci, ct)) content = self._merge_chunks(deduped, max_content_size) entry["content"] = ( content if content else "[No chunk text available]" ) output.append(entry) logger.info( "Search query '%s...' 
returned %d unique files", query[:50], len(output), ) best_sim = ( output[0]["similarity_score"] if output and output[0].get("similarity_score") is not None else 0.0 ) from observability import publish_debug_event import asyncio try: loop = asyncio.get_running_loop() loop.create_task( publish_debug_event( "rag_retrieval", "rag_system", phase="search", status="ok" if output else "empty", duration_ms=(time.monotonic() - t0) * 1000, preview=f"hits={len(output)} best_sim={best_sim:.2f} query_len={len(query)}", payload={ "hits": len(output), "best_sim": best_sim, "query_len": len(query), }, ), name="obs_rag_retrieval_ok", ) except RuntimeError: # No running loop (sync context) pass return output except Exception as e: logger.error("Search failed: %s", e, exc_info=True) from observability import publish_debug_event import asyncio try: loop = asyncio.get_running_loop() loop.create_task( publish_debug_event( "rag_retrieval", "rag_system", phase="search", status="error", duration_ms=(time.monotonic() - t0) * 1000, preview=f"error={str(e)[:100]}", payload={"error": str(e), "query_len": len(query)}, ), name="obs_rag_retrieval_err", ) except RuntimeError: pass return []
# -- remove / list / stats -----------------------------------------------
def remove_file(self, file_path: str) -> Dict[str, Any]:
    """Drop all index entries and whole-file rows belonging to one local file.

    The path is made absolute, every chunk whose ``file_path`` metadata
    equals it is looked up and deleted from the collection, and then
    ``delete_whole_file`` from ``pg_source_files`` is called once per
    distinct ``filename`` found in the deleted chunks' metadata.

    Per the tool wiring, this is invoked by the ``rag_remove_file`` handler
    in ``tools/rag.py`` off the event loop.

    Args:
        file_path (str): Path of the indexed file to remove.

    Returns:
        Dict[str, Any]: ``{"success": True, "file_path", "entries_removed"}``
        when entries were deleted; ``{"success": False, "error": ...}`` when
        the path is not indexed or any step raises.
    """
    file_path = os.path.abspath(file_path)
    try:
        match = self.collection.get(
            where={"file_path": file_path},
            include=["metadatas"],
        )
        if not match or not match.get("ids"):
            return {
                "success": False,
                "error": f"File not found in index: {file_path}",
            }

        # Collect the distinct stored filenames before the chunks vanish.
        stored_names = set()
        for meta in match.get("metadatas") or []:
            if meta and meta.get("filename"):
                stored_names.add(meta.get("filename"))

        self.collection.delete(ids=match["ids"])

        from .pg_source_files import delete_whole_file

        for stored_name in stored_names:
            delete_whole_file(self._schema, stored_name)

        return {
            "success": True,
            "file_path": file_path,
            "entries_removed": len(match["ids"]),
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
def remove_url(self, url: str) -> Dict[str, Any]:
    """Drop all index entries and whole-file rows belonging to one URL.

    Mirrors the local-file removal but matches on the ``source_url``
    metadata: the matching chunk ids are deleted from the collection and
    ``delete_whole_file`` from ``pg_source_files`` is called for each
    distinct ``filename`` carried by those chunks.

    Per the tool wiring, this is invoked by the ``rag_remove_url`` handler
    in ``tools/rag.py`` off the event loop.

    Args:
        url (str): The previously indexed source URL to remove.

    Returns:
        Dict[str, Any]: ``{"success": True, "url", "entries_removed"}`` when
        entries were deleted; ``{"success": False, "error": ...}`` when the
        URL is not indexed or any step raises.
    """
    try:
        hits = self.collection.get(
            where={"source_url": url},
            include=["metadatas"],
        )
        chunk_ids = hits.get("ids") if hits else None
        if chunk_ids:
            names_to_purge = set()
            for record in hits.get("metadatas") or []:
                if record and record.get("filename"):
                    names_to_purge.add(record.get("filename"))

            self.collection.delete(ids=hits["ids"])

            from .pg_source_files import delete_whole_file

            for name in names_to_purge:
                delete_whole_file(self._schema, name)

            outcome = {
                "success": True,
                "url": url,
                "entries_removed": len(hits["ids"]),
            }
        else:
            outcome = {
                "success": False,
                "error": f"URL not found in index: {url}",
            }
        return outcome
    except Exception as e:
        return {"success": False, "error": str(e)}
def list_indexed_files(self, limit: int = 100) -> List[Dict[str, Any]]:
    """List metadata for entries represented in the vector index.

    Fetches up to *limit* metadata records from the collection and projects
    each one into a compact summary. The listing reflects what has been
    embedded, so it may contain one row per chunk rather than one per file.

    A single bad record no longer empties the whole listing: empty/``None``
    metadata entries are skipped, and a ``tags`` value that is missing,
    ``None``, empty, or not valid JSON is reported as ``[]`` for that record
    only. A ``tags`` value that is already a list is passed through.

    Args:
        limit (int): Maximum number of metadata records to fetch.

    Returns:
        List[Dict[str, Any]]: One summary dict per usable index entry with
        ``file_path``, ``filename``, ``extension``, ``file_size``,
        ``indexed_at`` and ``tags`` keys. Empty when the collection query
        itself fails (the error is logged).
    """

    def _decode_tags(raw: Any, filename: str) -> Any:
        # Tags are stored JSON-encoded; tolerate already-decoded sequences.
        if isinstance(raw, (list, tuple)):
            return list(raw)
        if not raw:
            return []
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            logger.warning("Ignoring undecodable tags for %r", filename)
            return []

    try:
        results = self.collection.get(limit=limit, include=["metadatas"])
        summaries: List[Dict[str, Any]] = []
        for meta in (results or {}).get("metadatas") or []:
            if not meta:
                continue
            filename = meta.get("filename", "")
            summaries.append(
                {
                    "file_path": meta.get("file_path", ""),
                    "filename": filename,
                    "extension": meta.get("extension", ""),
                    "file_size": meta.get("file_size", 0),
                    "indexed_at": meta.get("indexed_at", ""),
                    "tags": _decode_tags(meta.get("tags", "[]"), filename),
                }
            )
        return summaries
    except Exception as e:
        logger.error("Failed to list files: %s", e)
        return []
def list_store_files(self) -> List[Dict[str, Any]]:
    """List the whole files of this store, merging Postgres rows and disk.

    File-centric counterpart to the chunk-centric index listing. When
    ``_pg_whole_files_enabled()`` is true, every row returned by
    ``list_whole_files`` becomes an entry with a ``pg://<store>/<name>``
    path. Regular files found in ``self.files_path`` are then added for any
    filename not already present, using their filesystem path, size and
    UTC modification time. Files that cannot be stat'ed are skipped.

    Per the tool wiring, this is invoked by the ``rag_list_store_files``
    handler in ``tools/rag.py`` off the event loop.

    Returns:
        List[Dict[str, Any]]: ``filename`` / ``size`` / ``modified`` /
        ``path`` dicts, one per whole file, sorted by filename.
    """
    from .pg_source_files import list_whole_files

    catalog: dict[str, dict[str, Any]] = {}

    if self._pg_whole_files_enabled():
        for pg_row in list_whole_files(self._schema):
            name = pg_row["filename"]
            catalog[name] = {
                "filename": name,
                "size": pg_row.get("size", 0),
                "modified": pg_row.get("modified"),
                "path": f"pg://{self.store_name}/{name}",
            }

    if os.path.isdir(self.files_path):
        for name in os.listdir(self.files_path):
            # Postgres wins when the same filename exists in both places.
            if name in catalog:
                continue
            disk_path = os.path.join(self.files_path, name)
            if not os.path.isfile(disk_path):
                continue
            try:
                info = os.stat(disk_path)
                catalog[name] = {
                    "filename": name,
                    "size": info.st_size,
                    "modified": datetime.fromtimestamp(
                        info.st_mtime,
                        tz=timezone.utc,
                    ).isoformat(),
                    "path": disk_path,
                }
            except Exception:
                # Best effort: an unreadable entry is left out of the listing.
                pass

    return sorted(catalog.values(), key=lambda item: item.get("filename", ""))
def read_store_file(self, filename: str) -> Dict[str, Any]:
    """Return the complete text of one stored file, addressed by bare name.

    Any *filename* containing ``/``, ``\\`` or ``..`` is rejected up front
    so only flat, store-local names reach the loader. The content is then
    resolved through ``_load_whole_file_text``; a ``None`` result is
    reported as "not found" and any exception is converted into a failure
    dict rather than raised.

    Per the tool wiring, this backs the ``rag_read_store_file`` tool in
    ``tools/rag.py``.

    Args:
        filename (str): Flat, store-local filename (no slashes or ``..``).

    Returns:
        Dict[str, Any]: ``{"success": True, "filename", "content", "size"}``
        on success (``size`` is ``len(content)``), otherwise
        ``{"success": False, "error": ...}``.
    """
    forbidden_tokens = ("/", "\\", "..")
    if any(token in filename for token in forbidden_tokens):
        return {"success": False, "error": "Invalid filename."}

    try:
        text = self._load_whole_file_text(filename)
        if text is not None:
            return {
                "success": True,
                "filename": filename,
                "content": text,
                "size": len(text),
            }
        return {"success": False, "error": f"File not found: {filename}"}
    except Exception as e:
        return {"success": False, "error": str(e)}
def close(self) -> None:
    """Do nothing; kept so callers can treat stores as closable.

    The docstring of the original states that the pgvector pools are
    process-wide and shared, so there is no per-store connection to
    release. The LRU registry calls this method when it evicts a store.
    """
def get_stats(self) -> Dict[str, Any]:
    """Summarise this store's identity and indexed-row count.

    Reads ``store_name``, ``db_path`` and ``embedding_model`` from the
    instance and asks the collection for its current ``count()``. Any
    exception raised while gathering these values is reported in the
    result instead of propagating.

    Returns:
        Dict[str, Any]: ``store_name``, ``store_path``, ``file_count`` and
        ``embedding_model`` keys, or a single ``error`` key on failure.
    """
    try:
        summary: Dict[str, Any] = {
            "store_name": self.store_name,
            "store_path": self.db_path,
        }
        summary["file_count"] = self.collection.count()
        summary["embedding_model"] = self.embedding_model
        return summary
    except Exception as e:
        return {"error": str(e)}
def clear(self) -> Dict[str, Any]:
    """Empty this store: vector rows first, then the whole-file tables.

    Calls ``self._pg.clear()`` and, when ``_pg_whole_files_enabled()`` is
    true, ``clear_source_tables`` from ``pg_source_files`` for this store's
    schema. Failures are returned as a dict rather than raised.

    According to the original documentation, the corpus (re)build scripts
    under ``scripts/`` call this before a fresh full ingest.

    Returns:
        Dict[str, Any]: ``{"success": True, "message": ...}`` on success,
        otherwise ``{"success": False, "error": ...}``.
    """
    try:
        self._pg.clear()

        keeps_whole_files = self._pg_whole_files_enabled()
        if keeps_whole_files:
            from .pg_source_files import clear_source_tables

            clear_source_tables(self._schema)

        logger.info("Cleared RAG store: %s", self.store_name)
        outcome = {
            "success": True,
            "message": f"Store '{self.store_name}' cleared",
        }
    except Exception as e:
        outcome = {"success": False, "error": str(e)}
    return outcome
# ---------------------------------------------------------------------------
# Global store registry (LRU-bounded)
# ---------------------------------------------------------------------------

# Maximum number of managers kept in the registry, overridable through the
# RAG_STORE_CACHE_SIZE environment variable. A non-integer value must not
# make the whole module fail to import, so it falls back to the default of 5.
try:
    _STORE_REGISTRY_MAX_SIZE = int(os.environ.get("RAG_STORE_CACHE_SIZE", "5"))
except ValueError:
    logger.warning(
        "Ignoring non-integer RAG_STORE_CACHE_SIZE=%r; using 5",
        os.environ.get("RAG_STORE_CACHE_SIZE"),
    )
    _STORE_REGISTRY_MAX_SIZE = 5

# Insertion/access-ordered cache of open managers; the oldest entry is the
# eviction candidate. Keys are (store_name, document_task_type,
# query_task_type) tuples built by get_rag_store().
_store_registry: "OrderedDict[tuple, FileRAGManager]" = OrderedDict()

# Sphinx-generated docs store: Gemini retrieval task types for index vs query.
STARGAZER_DOCS_STORE_NAME = "stargazer_docs"
STARGAZER_DOCS_DOCUMENT_TASK = "RETRIEVAL_DOCUMENT"
STARGAZER_DOCS_QUERY_TASK = "RETRIEVAL_QUERY"
def get_rag_store(
    store_name: str = "default",
    api_key: Optional[str] = None,
    max_file_size: Optional[int] = None,
    gemini_only: bool = True,
    document_task_type: Optional[str] = None,
    query_task_type: Optional[str] = None,
) -> FileRAGManager:
    """Return the cached manager for a store, creating it on first use.

    The registry is keyed by ``(store_name, document_task_type,
    query_task_type)`` so differently configured embedding task types get
    separate managers. A hit is moved to the most-recently-used position.
    On a miss a new ``FileRAGManager`` is built, registered, and the
    least-recently-used entries are popped and closed until the registry
    is back within ``_STORE_REGISTRY_MAX_SIZE``.

    Note that ``api_key``, ``max_file_size`` and ``gemini_only`` are not
    part of the cache key; they only take effect when the manager is
    first created.

    Args:
        store_name (str): Name of the store.
        api_key (Optional[str]): Forwarded to the manager constructor.
        max_file_size (Optional[int]): Forwarded only when not ``None``.
        gemini_only (bool): Forwarded (as ``True``) only when truthy.
        document_task_type (Optional[str]): Forwarded only when not ``None``.
        query_task_type (Optional[str]): Forwarded only when not ``None``.

    Returns:
        FileRAGManager: The cached or newly created manager.
    """
    cache_key = (store_name, document_task_type, query_task_type)

    try:
        _store_registry.move_to_end(cache_key)
    except KeyError:
        pass
    else:
        return _store_registry[cache_key]

    kwargs: Dict[str, Any] = {"store_name": store_name, "api_key": api_key}
    optional_settings = (
        ("max_file_size", max_file_size, max_file_size is not None),
        ("gemini_only", True, bool(gemini_only)),
        ("document_task_type", document_task_type, document_task_type is not None),
        ("query_task_type", query_task_type, query_task_type is not None),
    )
    for option, value, wanted in optional_settings:
        if wanted:
            kwargs[option] = value

    store = FileRAGManager(**kwargs)
    _store_registry[cache_key] = store

    # Trim from the least-recently-used end until within the size bound.
    while len(_store_registry) > _STORE_REGISTRY_MAX_SIZE:
        _evicted_key, evicted_store = _store_registry.popitem(last=False)
        logger.info("Evicting RAG store %s from cache (LRU)", _evicted_key)
        evicted_store.close()

    return store
def get_stargazer_docs_store() -> FileRAGManager:
    """Return the shared store holding Sphinx / tool documentation.

    Delegates to ``get_rag_store`` with the ``STARGAZER_DOCS_*`` constants,
    so indexed chunks use the document task type and search queries use
    the query task type.

    Returns:
        FileRAGManager: The (cached) documentation store manager.
    """
    task_types = {
        "document_task_type": STARGAZER_DOCS_DOCUMENT_TASK,
        "query_task_type": STARGAZER_DOCS_QUERY_TASK,
    }
    return get_rag_store(STARGAZER_DOCS_STORE_NAME, **task_types)
def _enumerate_pg_file_stores() -> List[Dict[str, Any]]:
    """Return ``[{'name', 'file_count'}]`` for every Postgres file-RAG store.

    A store is any schema that contains a table named ``files_<schema>``
    (system schemas excluded). ``file_count`` is the ``reltuples`` estimate
    from ``pg_class`` clamped at zero, not an exact ``COUNT``; results are
    ordered by schema name.

    Returns:
        List[Dict[str, Any]]: One ``{"name": ..., "file_count": ...}`` dict
        per store schema.
    """
    from vector_store import get_sync_pool

    query = (
        "SELECT t.table_schema, "
        " GREATEST(COALESCE(c.reltuples, 0), 0)::bigint "
        "FROM information_schema.tables t "
        "JOIN pg_namespace n ON n.nspname = t.table_schema "
        "JOIN pg_class c ON c.relname = t.table_name "
        " AND c.relnamespace = n.oid "
        "WHERE t.table_name = 'files_' || t.table_schema "
        " AND t.table_schema NOT IN "
        " ('pg_catalog', 'information_schema') "
        "ORDER BY t.table_schema"
    )

    stores: List[Dict[str, Any]] = []
    pool = get_sync_pool()
    with pool.connection() as conn, conn.cursor() as cur:
        cur.execute(query)
        stores.extend(
            {"name": schema_name, "file_count": int(row_estimate)}
            for schema_name, row_estimate in cur.fetchall()
        )
    return stores


# Last successful store listing as (monotonic timestamp, stores), or None.
_rag_stores_cache: "tuple[float, List[Dict[str, Any]]] | None" = None
# Seconds a cached listing is served before Postgres is queried again.
_RAG_STORES_CACHE_TTL = 60.0
def list_rag_stores() -> List[str]:
    """Return just the names of all available RAG stores.

    Projects the ``name`` field out of ``list_rag_stores_with_stats()`` and
    therefore shares its cache. Any exception is logged at debug level and
    turned into an empty list so callers never crash on a listing failure.

    Returns:
        List[str]: Store names (empty on error).
    """
    try:
        names: List[str] = []
        for store in list_rag_stores_with_stats():
            names.append(store["name"])
        return names
    except Exception:
        logger.debug("list_rag_stores (pgvector) failed", exc_info=True)
        return []
def list_rag_stores_with_stats() -> List[Dict[str, Any]]:
    """List stores with estimated row counts from Postgres, cached briefly.

    A listing younger than ``_RAG_STORES_CACHE_TTL`` seconds is served from
    the module-level cache. Otherwise ``_enumerate_pg_file_stores()`` is
    called and its result cached. If that call fails, the failure is logged
    at debug level and the previous (stale) listing is returned when one
    exists, else an empty list; the cache timestamp is not refreshed.

    Returns:
        List[Dict[str, Any]]: A new list of ``{"name", "file_count"}``
        dicts (the dicts themselves are shared with the cache).
    """
    global _rag_stores_cache
    import time as _t

    now = _t.monotonic()

    snapshot = _rag_stores_cache
    if snapshot is not None:
        cached_at = snapshot[0]
        if (now - cached_at) < _RAG_STORES_CACHE_TTL:
            return list(snapshot[1])

    try:
        stores = _enumerate_pg_file_stores()
    except Exception:
        logger.debug("list_rag_stores_with_stats (pgvector) failed", exc_info=True)
        stale = _rag_stores_cache
        if stale:
            return list(stale[1])
        return []

    _rag_stores_cache = (now, stores)
    return list(stores)
def delete_rag_store(store_name: str) -> Dict[str, Any]:
    """Delete a RAG store completely (Postgres tables + local files dir).

    Drops the store's ``files_<schema>`` / ``documents`` / ``source_files``
    tables rather than the whole schema, so a schema shared with other
    tables keeps them. The schema itself is removed only if it ends up
    empty. The on-disk store directory under ``DEFAULT_STORE_PATH`` is
    removed as well, cached managers for the store are evicted, and the
    store-listing cache is invalidated.

    Args:
        store_name (str): Name of the store; it is sanitized with
            ``_sanitize_collection_name`` to derive the schema, table and
            directory names.

    Returns:
        Dict[str, Any]: ``{"success": True, "message": ...}`` when a table
        existed or a directory was removed; ``{"success": False,
        "error": ...}`` when the drop failed or nothing existed.
    """
    global _rag_stores_cache

    import shutil

    from vector_store import get_sync_pool, pg_ident

    sanitized_name = _sanitize_collection_name(store_name)
    schema = pg_ident(sanitized_name)
    table = pg_ident(f"files_{sanitized_name}")
    store_path = os.path.join(DEFAULT_STORE_PATH, sanitized_name)

    # Evict any cached managers for this store first. Compare sanitized
    # names so a manager cached under an alias that maps to the same schema
    # (e.g. "my store" vs "my_store") does not outlive its tables.
    # NOTE(review): assumes FileRAGManager derives its schema with the same
    # sanitizer used here -- confirm against the constructor.
    stale_keys = [
        key
        for key in _store_registry
        if _sanitize_collection_name(key[0]) == sanitized_name
    ]
    for key in stale_keys:
        evicted = _store_registry.pop(key, None)
        if evicted is not None:
            evicted.close()

    dropped = False
    try:
        pool = get_sync_pool()
        with pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT 1 FROM information_schema.tables "
                    "WHERE table_schema = %s AND table_name = %s",
                    (schema, table),
                )
                dropped = cur.fetchone() is not None
            for relation in (table, "documents", "source_files"):
                conn.execute(
                    f'DROP TABLE IF EXISTS "{schema}"."{relation}" CASCADE'
                )
            # Remove the schema only if nothing else is left in it. RESTRICT
            # makes this fail for non-empty schemas; run it in its own
            # (sub)transaction so that expected failure cannot abort the
            # surrounding transaction and roll back the table drops above
            # when the pooled connection is not in autocommit mode.
            try:
                with conn.transaction():
                    conn.execute(f'DROP SCHEMA IF EXISTS "{schema}" RESTRICT')
            except Exception:
                logger.debug(
                    "Schema %s kept (not empty or not droppable)",
                    schema,
                    exc_info=True,
                )
    except Exception as e:
        return {"success": False, "error": f"Failed to drop '{schema}': {e}"}

    dir_removed = False
    if os.path.exists(store_path):
        try:
            shutil.rmtree(store_path)
            dir_removed = True
        except Exception:
            logger.debug("Failed to remove store dir %s", store_path, exc_info=True)

    _rag_stores_cache = None  # invalidate listing cache

    if not dropped and not dir_removed:
        return {
            "success": False,
            "error": f"Store '{store_name}' does not exist",
        }

    logger.info(
        "Deleted RAG store: %s (pg dropped=%s, dir removed=%s)",
        store_name,
        dropped,
        dir_removed,
    )
    return {
        "success": True,
        "message": f"Store '{store_name}' deleted successfully",
    }