rag_system.file_rag_manager module

File-based RAG Manager.

Manages file indexing and retrieval using Postgres pgvector + whole-file tables. Returns entire files, not chunks, for complete context when possible.

Features:
  • URL fetching support

  • PDF parsing via PyMuPDF

  • Chunked embeddings for better semantic matching

  • Full file retrieval on search

rag_system.file_rag_manager.extract_pdf_text(file_path)[source]

Extract page-delimited plain text from a PDF on disk via PyMuPDF.

Opens the file with fitz (PyMuPDF), concatenates the non-empty pages with --- Page N --- separators, and returns the joined text so callers get readable, source-traceable content for indexing. Reads the filesystem only; performs no embedding or network work. Returns None (and logs) when PyMuPDF is not installed, the document yields no extractable text, or any extraction error occurs, so the caller can fall back or skip the file.

Called within this module by FileRAGManager._read_file_content(), FileRAGManager._load_whole_file_text(), and extract_pdf_text_from_bytes(). No external in-repo callers.

Parameters:

file_path (str) – Absolute path to the PDF file.

Returns:

The extracted text, or None on failure / empty PDF.

Return type:

Optional[str]
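
The separator layout described above can be illustrated without PyMuPDF. This is a minimal sketch, not the module's code: join_pages is a hypothetical helper, and it assumes page numbers are 1-based and keep their original position even when empty pages are skipped (the source states neither).

```python
from typing import List, Optional


def join_pages(page_texts: List[str]) -> Optional[str]:
    """Join non-empty page texts under '--- Page N ---' headers (hypothetical helper)."""
    parts = []
    for number, text in enumerate(page_texts, start=1):
        stripped = text.strip()
        if stripped:  # pages with no extractable text are skipped
            parts.append(f"--- Page {number} ---\n{stripped}")
    # Mirror the documented contract: None when nothing could be extracted.
    return "\n\n".join(parts) if parts else None
```

Keeping the page number in the separator is what makes the indexed text traceable back to its source page.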

rag_system.file_rag_manager.extract_pdf_text_from_bytes(raw)[source]

Extract text from in-memory PDF bytes by staging a temp file.

Wraps extract_pdf_text() for the byte-stream case (e.g. a PDF fetched from a URL that was never written to a permanent path): it writes raw to a NamedTemporaryFile, extracts, and unlinks the temp file in a finally block so nothing is left on disk even on error. Touches the filesystem (one transient temp file) but no network or store state; returns None on any failure.

Called within this module by decode_bytes_to_text() for .pdf inputs. No external in-repo callers.

Parameters:

raw (bytes) – The raw PDF file contents.

Returns:

The extracted text, or None on failure.

Return type:

Optional[str]
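
The stage-extract-unlink pattern described above might look roughly like the following. It is a sketch under assumptions: extract_via_temp_file and its extractor argument are hypothetical names, and the extractor stands in for extract_pdf_text() so the example runs without PyMuPDF.

```python
import os
import tempfile
from typing import Callable, Optional


def extract_via_temp_file(
    raw: bytes, extractor: Callable[[str], Optional[str]]
) -> Optional[str]:
    """Stage raw bytes in a temp file, run extractor on its path, always clean up."""
    tmp_path = None
    try:
        # delete=False lets the extractor reopen the file by path after it is closed.
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
            tmp.write(raw)
            tmp_path = tmp.name
        return extractor(tmp_path)
    except Exception:
        return None  # any failure maps to None, matching the documented contract
    finally:
        # Runs on success and on error, so no temp file is left behind.
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
```

Closing the file before calling the extractor avoids sharing an open handle, which matters on platforms that lock open files.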

rag_system.file_rag_manager.decode_bytes_to_text(raw, filename)[source]

Decode raw file bytes to text, dispatching on the filename extension.

The single entry point for turning fetched or stored bytes into indexable text. For .pdf names it delegates to extract_pdf_text_from_bytes(); otherwise it tries a strict UTF-8 decode and falls back to latin-1 so arbitrary byte streams still produce something usable. Pure aside from the temp file used by the PDF path; returns None only when even the latin-1 fallback raises.

Called within this module by FileRAGManager._load_whole_file_text() and FileRAGManager.index_url(), and externally by scripts/backfill_pg_source_files.py when re-deriving whole-file text.

Parameters:
  • raw (bytes) – The raw file contents.

  • filename (str) – Name used only to detect a .pdf extension.

Returns:

The decoded text, or None if decoding failed.

Return type:

Optional[str]
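
The dispatch-then-fallback logic can be sketched as below. The name decode_with_fallback, the injected pdf_extractor, and the case-insensitive extension check are assumptions made for illustration. Because latin-1 maps every byte value to a code point, the final fallback cannot fail for genuine bytes input, which is consistent with None being rare on the non-PDF path.

```python
from typing import Callable, Optional


def decode_with_fallback(
    raw: bytes,
    filename: str,
    pdf_extractor: Optional[Callable[[bytes], Optional[str]]] = None,
) -> Optional[str]:
    """Decode bytes to text: PDF path by extension, else UTF-8 then latin-1."""
    # Assumption: the extension check is case-insensitive.
    if filename.lower().endswith(".pdf"):
        return pdf_extractor(raw) if pdf_extractor else None
    try:
        return raw.decode("utf-8")  # errors="strict" is the default
    except UnicodeDecodeError:
        pass
    try:
        return raw.decode("latin-1")  # defined for all 256 byte values
    except Exception:
        return None  # only reachable for non-bytes input
```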

rag_system.file_rag_manager.compress_pdf(file_path, output_path=None, remove_images=True)[source]

Shrink an oversized PDF in place (or to a copy) before indexing.

Used to bring PDFs that exceed MAX_FILE_SIZE under the limit so they can still be ingested. Opens the document with PyMuPDF, optionally strips every embedded image (the dominant size contributor), and rewrites it with ez_save() for object-stream/garbage-collected compaction. Writes to output_path when given, otherwise overwrites file_path in place, and logs the size reduction. Filesystem-only; no network or store interaction. Note that import fitz is unguarded here, so a missing PyMuPDF raises rather than returning a sentinel.

Called within this module by FileRAGManager._read_file_content() and FileRAGManager.index_url() on the over-limit PDF path. No external in-repo callers.

Parameters:
  • file_path (str) – Absolute path to the source PDF.

  • output_path (Optional[str]) – Destination path; None overwrites the source in place.

  • remove_images (bool) – Whether to delete embedded images before saving.

Returns:

(out_path, original_size, compressed_size) in bytes.

Return type:

Tuple[str, int, int]

Raises:

ImportError – If PyMuPDF (fitz) is not installed.

rag_system.file_rag_manager.chunk_text(text, chunk_size=1500, overlap=200)[source]

Split text into overlapping chunks on paragraph and sentence boundaries.

The shared chunker that turns whole documents into the embed-sized units the vector store indexes. It prefers natural boundaries: it splits on blank-line paragraphs first, then on sentence breaks, and hard-slices only when a single sentence still exceeds chunk_size. It repeats the last overlap characters of each chunk at the start of the next so semantic context is not lost at the seams. Pure string processing with no I/O; returns the input as a single chunk when it already fits.

Called within this module by FileRAGManager.index_file() and FileRAGManager.index_url(), and externally by log_rag_ingest and the cloud-RAG ingest path in tools/cloud_rag.py.

Parameters:
  • text (str) – The full document text to split.

  • chunk_size (int) – Target maximum chunk length in characters.

  • overlap (int) – Number of trailing characters repeated into the next chunk for context continuity.

Returns:

The ordered list of chunk strings (a single-element list when the text fits in one chunk).

Return type:

List[str]
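
The boundary preference and overlap carry-over can be illustrated with a simplified chunker. This is a sketch, not the module's implementation: chunk_text_sketch is a hypothetical name, the sentence regex is an assumption, pieces are rejoined with blank lines for simplicity, and a chunk may exceed chunk_size by up to overlap + 2 characters because the carried tail is prepended.

```python
import re
from typing import List


def chunk_text_sketch(text: str, chunk_size: int = 1500, overlap: int = 200) -> List[str]:
    """Split text on paragraphs, then sentences, then hard slices; carry an overlap tail."""
    if len(text) <= chunk_size:
        return [text]

    # 1. Break the text into pieces no longer than chunk_size.
    pieces: List[str] = []
    for para in re.split(r"\n\s*\n", text):
        para = para.strip()
        if not para:
            continue
        if len(para) <= chunk_size:
            pieces.append(para)
            continue
        for sentence in re.split(r"(?<=[.!?])\s+", para):
            # Hard-slice only when a single sentence is still too long.
            for start in range(0, len(sentence), chunk_size):
                pieces.append(sentence[start:start + chunk_size])

    # 2. Greedily pack pieces; seed each new chunk with the previous chunk's tail.
    chunks: List[str] = []
    current = ""
    for piece in pieces:
        candidate = f"{current}\n\n{piece}" if current else piece
        if current and len(candidate) > chunk_size:
            chunks.append(current)
            tail = current[-overlap:] if overlap > 0 else ""
            current = f"{tail}\n\n{piece}" if tail else piece
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks
```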

async rag_system.file_rag_manager.fetch_url_content(url, timeout=30.0)[source]

Fetch content from url (SSRF-guarded). Returns (bytes, content_type, filename).

Routes through tools._safe_http.safe_http_request(), which validates every redirect hop and pins each connect to a vetted public IP, so a user-supplied ingestion URL cannot be redirected (or DNS-rebound) into an internal host such as 10.10.0.x:6379.

Return type:

Tuple[Optional[bytes], Optional[str], Optional[str]]
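
The "vetted public IP" idea can be illustrated with the standard ipaddress module. This sketch does not show how tools._safe_http.safe_http_request() works; is_public_ip is a hypothetical helper covering only the address check. A real guard also has to resolve DNS itself, pin the connection to the vetted address, and repeat the check on every redirect hop, as the description above notes.

```python
import ipaddress


def is_public_ip(address: str) -> bool:
    """Return True only for globally routable IP literals (hypothetical helper)."""
    try:
        return ipaddress.ip_address(address).is_global
    except ValueError:
        return False  # not an IP literal at all


# Private, loopback, and link-local addresses are rejected.
for candidate in ("10.10.0.5", "127.0.0.1", "169.254.169.254", "8.8.8.8"):
    print(candidate, is_public_ip(candidate))
```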

class rag_system.file_rag_manager.FileRAGManager(store_name='default', store_path=None, api_key=None, embedding_model='google/gemini-embedding-001', max_file_size=15728640, gemini_only=True, document_task_type=None, query_task_type=None)[source]

Bases: object

File- and URL-oriented RAG store over Postgres pgvector plus whole-file tables.

One instance manages a single named store: a per-store Postgres schema holding a files_<schema> vector table (halfvec(3072) + HNSW) for chunk embeddings, plus documents / source_files tables that keep the full original text and raw bytes so search can return entire files rather than just the matched chunk. It wraps the pgvector table in a Chroma-shaped facade (vector_store.ChromaCompatCollection) and embeds text through rag_system.openrouter_embeddings.SyncOpenRouterEmbeddings (Gemini by default), so indexing and search reach the embedding provider over HTTP while persistence stays in Postgres. Stores listed in CHUNK_ONLY_STORES (e.g. stargazer_logs) skip the whole-file tables entirely.

Instances are normally obtained through the module-level LRU registry get_rag_store() (and get_stargazer_docs_store()) rather than constructed directly; the RAG tool handlers in tools/rag.py / tools/cloud_rag.py, rag_system.auto_search.RAGAutoSearchManager, and starwiki/rag_integration all go through that registry, with starwiki being the one place that instantiates FileRAGManager directly.

Parameters:
  • store_name (str)

  • store_path (str | None)

  • api_key (str | None)

  • embedding_model (str)

  • max_file_size (int)

  • gemini_only (bool)

  • document_task_type (str | None)

  • query_task_type (str | None)

__init__(store_name='default', store_path=None, api_key=None, embedding_model='google/gemini-embedding-001', max_file_size=15728640, gemini_only=True, document_task_type=None, query_task_type=None)[source]

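Initialize a manager bound to a single named store.

Parameters:
  • store_name (str) – Name of the store this instance manages.

  • store_path (Optional[str]) – Optional store path; get_stats() reports it as the legacy on-disk DB path.

  • api_key (Optional[str]) – Optional API key; None by default.

  • embedding_model (str) – Embedding model identifier; defaults to google/gemini-embedding-001.

  • max_file_size (int) – Maximum file size in bytes; the default 15728640 is 15 MiB.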

  • gemini_only (bool) – Use only the Gemini API for embeddings.

  • document_task_type (Optional[str]) – Optional Gemini task type for indexed text (e.g. RETRIEVAL_DOCUMENT).

  • query_task_type (Optional[str]) – Optional Gemini task type for search queries (e.g. RETRIEVAL_QUERY).

index_file(file_path, tags=None, use_chunking=True, chunk_size=1500, chunk_overlap=200, force=False)[source]

Index a single file into the collection.

When force is True the content-hash dedup check is skipped so the file is always re-embedded (but the store is not cleared).

Return type:

Dict[str, Any]
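
How force interacts with content-hash dedup can be sketched as follows. The helper name should_index, the in-memory known_hashes mapping, and the choice of SHA-256 are all assumptions; the source does not say which hash is used or where it is stored.

```python
import hashlib
from typing import Dict


def should_index(
    path: str, content: str, known_hashes: Dict[str, str], force: bool = False
) -> bool:
    """Decide whether a file needs (re-)embedding based on its content hash."""
    digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
    unchanged = known_hashes.get(path) == digest
    if unchanged and not force:
        return False  # identical content already indexed: skip
    # New, changed, or forced: record the hash and re-embed.
    known_hashes[path] = digest
    return True
```

Note that force only bypasses the check; nothing else in the mapping is cleared, which matches the "store is not cleared" behavior described above.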

async index_url(url, tags=None, use_chunking=True, chunk_size=1500, chunk_overlap=200)[source]

Fetch a remote document by URL and index it into this store.

The URL ingestion counterpart of index_file(). It downloads via the SSRF-guarded fetch_url_content(), derives a stable stored filename (URL-hash prefixed and sanitized), compresses oversized PDFs through compress_pdf(), decodes the bytes to text with decode_bytes_to_text(), and persists the whole file to Postgres via _upsert_whole_file(). It then dedups against any prior copy of the same source_url by content hash and, unless unchanged, upserts the embedded chunks (or a single document) into the pgvector collection with source_type="url" metadata. Touches the network (download), the embedding provider (via the collection upsert), Postgres, and a transient temp file for PDF compression.

Called by the rag_index_url tool handler in tools/rag.py.

Parameters:
  • url (str) – The document URL to fetch and index.

  • tags (Optional[List[str]]) – Optional tag labels stored in metadata.

  • use_chunking (bool) – Whether to chunk large content before embedding.

  • chunk_size (int) – Target chunk length in characters.

  • chunk_overlap (int) – Overlap carried between chunks.

Returns:

A result dict with success and, on success, action ("indexed" or "skipped"), url, filename, sizes and stored_path; on failure an error message.

Return type:

Dict[str, Any]
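
One way to derive a stable, URL-hash-prefixed and sanitized filename is sketched below. The exact scheme used by index_url() is not given in this documentation, so the helper name stored_filename, the SHA-256 digest, the 12-character prefix, the allowed character set, and the "index" fallback are all assumptions.

```python
import hashlib
import re
from urllib.parse import urlparse


def stored_filename(url: str, hash_len: int = 12) -> str:
    """Build a deterministic, filesystem-safe name from a URL (hypothetical scheme)."""
    # The same URL always hashes to the same prefix, so re-indexing reuses the name.
    prefix = hashlib.sha256(url.encode("utf-8")).hexdigest()[:hash_len]
    basename = urlparse(url).path.rsplit("/", 1)[-1] or "index"
    # Replace anything outside a conservative character set.
    safe = re.sub(r"[^A-Za-z0-9._-]", "_", basename)
    return f"{prefix}_{safe}"
```

Prefixing with a hash of the full URL keeps two different URLs that end in the same basename from colliding in the store.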

index_directory(directory_path, recursive=True, tags=None, exclude_patterns=None, max_workers=6, force=False, allowed_extensions=None)[source]

Index all supported files in directory_path.

When max_workers > 1, files are indexed concurrently using a thread pool. Each file’s embedding batches are already parallelised inside the embedding function, so even max_workers=1 benefits from concurrent API calls.

force bypasses the per-file content-hash dedup check without clearing the store, so already-indexed files get re-embedded.

When allowed_extensions is set, only files whose suffix (normalized to lowercase with a leading dot) appears in allowed_extensions are queued; None means no extension filter (all supported types under SUPPORTED_EXTENSIONS).

Return type:

Dict[str, Any]
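
The extension normalization can be sketched as below. normalize_extensions and is_allowed are hypothetical helpers, and the sketch assumes both the filter entries and each file's suffix are normalized the same way before comparison.

```python
from pathlib import Path
from typing import Iterable, Optional, Set


def normalize_extensions(extensions: Optional[Iterable[str]]) -> Optional[Set[str]]:
    """Normalize entries like 'MD' or '.Py' to '.md' / '.py'; None means no filter."""
    if extensions is None:
        return None
    normalized = set()
    for ext in extensions:
        bare = ext.strip().lstrip(".").lower()
        if bare:  # ignore empty entries such as '' or '.'
            normalized.add("." + bare)
    return normalized


def is_allowed(path: str, allowed: Optional[Set[str]]) -> bool:
    """Return True when there is no filter or the file's suffix is in the filter."""
    return allowed is None or Path(path).suffix.lower() in allowed
```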

search(query, n_results=5, tags=None, return_content=True, query_embedding=None, max_content_size=8000, content_mode='whole')[source]

Semantic search returning one result per matched file.

content_mode="whole" loads the full text from the Postgres documents table when available; content_mode="chunks" returns only the best KNN-matched indexed chunk.

Return type:

List[Dict[str, Any]]
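
Collapsing chunk-level KNN hits into one result per file might look like this. It is a sketch under assumptions: best_hit_per_file is a hypothetical helper, each hit is assumed to be a dict with "file_path" and "distance" keys, and a lower distance is assumed to mean a closer match.

```python
from typing import Any, Dict, List


def best_hit_per_file(hits: List[Dict[str, Any]], n_results: int = 5) -> List[Dict[str, Any]]:
    """Keep only the closest chunk hit for each file, ordered by distance."""
    best: Dict[str, Dict[str, Any]] = {}
    for hit in hits:
        key = hit["file_path"]
        if key not in best or hit["distance"] < best[key]["distance"]:
            best[key] = hit
    return sorted(best.values(), key=lambda h: h["distance"])[:n_results]
```

In "whole" mode the surviving hit would then be used to look up the full document text; in "chunks" mode the hit's own chunk is the content.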

remove_file(file_path)[source]

Remove every index entry (and the whole-file rows) for one local file.

Resolves file_path to an absolute path, looks up all chunk ids stored under that file_path metadata, deletes them from the pgvector collection, and then drops the matching documents / source_files rows for each affected filename via rag_system.pg_source_files.delete_whole_file so no orphaned whole-file text survives. Writes to Postgres only; returns a failure dict when the path is not present in the index.

Called by the rag_remove_file tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).

Parameters:

file_path (str) – Path of the indexed file to remove.

Returns:

{"success": True, "file_path": ..., "entries_removed": ...} on success, otherwise {"success": False, "error": ...}.

Return type:

Dict[str, Any]

remove_url(url)[source]

Remove every index entry (and whole-file rows) for one indexed URL.

The URL analogue of remove_file(): it finds all chunk ids carrying the given source_url metadata, deletes them from the pgvector collection, and drops the corresponding documents / source_files rows via rag_system.pg_source_files.delete_whole_file. Writes to Postgres only; returns a failure dict when the URL is not in the index.

Called by the rag_remove_url tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).

Parameters:

url (str) – The previously indexed source URL to remove.

Returns:

{"success": True, "url": ..., "entries_removed": ...} on success, otherwise {"success": False, "error": ...}.

Return type:

Dict[str, Any]

list_indexed_files(limit=100)[source]

List metadata for files represented in the vector index.

Reads up to limit chunk metadata records from the pgvector collection and projects each into a compact summary (path, filename, extension, size, index timestamp, decoded tags). This reflects what has been embedded and may include one row per chunk; the whole-file view is list_store_files(). Returns an empty list and logs on error.

Called by the RAG listing tool handlers in tools/rag.py (run off the event loop with asyncio.to_thread).

Parameters:

limit (int) – Maximum number of metadata records to fetch.

Returns:

One summary dict per fetched index entry (empty on error).

Return type:

List[Dict[str, Any]]

list_store_files()[source]

List the whole files held by this store, unioning Postgres and disk.

The file-centric (not chunk-centric) listing: it enumerates the Postgres source_files rows via rag_system.pg_source_files.list_whole_files (skipped for chunk-only stores through _pg_whole_files_enabled()) and then folds in any files from the legacy on-disk files directory that are not already represented, so unmigrated stores still report their content. Each entry carries a pg:// or filesystem path. Reads Postgres and the filesystem; results are sorted by filename.

Called by the rag_list_store_files tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).

Returns:

Filename/size/modified/path dicts, one per whole file, sorted by filename.

Return type:

List[Dict[str, Any]]
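
The union of Postgres and legacy-disk listings can be sketched as a merge keyed by filename. merge_listings is a hypothetical helper, and the entry dicts are assumed to carry at least a "filename" key; Postgres entries win when both sources hold the same name, matching the "not already represented" rule above.

```python
from typing import Any, Dict, List


def merge_listings(
    pg_files: List[Dict[str, Any]], disk_files: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Union two listings by filename, preferring Postgres entries, sorted by filename."""
    merged = {entry["filename"]: entry for entry in pg_files}
    for entry in disk_files:
        # Only add disk files that Postgres does not already represent.
        merged.setdefault(entry["filename"], entry)
    return sorted(merged.values(), key=lambda e: e["filename"])
```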

read_store_file(filename)[source]

Return the full text of one stored file by bare filename.

Powers the rag_read_store_file tool the LLM is hinted toward when a retrieval chunk is not enough. It first rejects any filename containing a path separator or .. (path-traversal guard, so only flat store-local names are honored), then resolves the content through _load_whole_file_text() (Postgres documents/source_files, then legacy disk). Reads Postgres and possibly the filesystem; returns a failure dict when the file is missing or unreadable.

Called by the rag_read_store_file tool handler in tools/rag.py (run off the event loop with asyncio.to_thread); the same tool string is surfaced to the model by rag_system.auto_search.RAGAutoSearchManager and by message_processor.memory_linked_context.

Parameters:

filename (str) – Flat, store-local filename (no slashes or ..).

Returns:

{"success": True, "filename": ..., "content": ..., "size": ...} on success, otherwise {"success": False, "error": ...}.

Return type:

Dict[str, Any]
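
The path-traversal guard described above can be sketched as a simple predicate. is_flat_filename is a hypothetical name, and rejecting the empty string is an added assumption; the source only states that names containing a path separator or .. are rejected.

```python
def is_flat_filename(filename: str) -> bool:
    """Accept only bare, store-local names: no separators and no '..' sequences."""
    if not filename:
        return False  # assumption: empty names are rejected too
    return "/" not in filename and "\\" not in filename and ".." not in filename


for name in ("notes.md", "../etc/passwd", "sub/notes.md"):
    print(name, is_flat_filename(name))
```

Checking the raw string before any path resolution keeps the rule easy to audit: a name that passes cannot address anything outside the store's flat namespace.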

close()[source]

No-op: the pgvector pools are process-wide and shared.

Retained for API compatibility (the LRU registry calls this on eviction); there is no per-store connection to release.

Return type:

None

get_stats()[source]

Return a small summary of this store’s identity and size.

Reports the store name, the legacy on-disk DB path, the live indexed-row count from the pgvector collection (collection.count(), a Postgres COUNT), and the configured embedding model. Intended for admin/status surfaces; returns {"error": ...} instead of raising if the count query fails.

No in-repo callers were found by grep; invoked via dynamic/admin paths.

Returns:

store_name, store_path, file_count and embedding_model keys, or an error key on failure.

Return type:

Dict[str, Any]

clear()[source]

Empty this store, dropping all embedded chunks and whole-file rows.

Wipes every vector row via the underlying PgVectorCollection.clear and, for non-chunk-only stores, also truncates the whole-file tables through rag_system.pg_source_files.clear_source_tables so no document text is left behind. The store schema/table remain so it can be re-indexed in place. Writes to Postgres only; returns a failure dict instead of raising on error.

Called by the corpus (re)build scripts under scripts/ (e.g. ingest_religion_rag, ingest_law_rag, update_docs_rag, build_rag_from_directory) before a fresh full ingest.

Returns:

{"success": True, "message": ...} on success, otherwise {"success": False, "error": ...}.

Return type:

Dict[str, Any]

rag_system.file_rag_manager.get_rag_store(store_name='default', api_key=None, max_file_size=None, gemini_only=True, document_task_type=None, query_task_type=None)[source]

Get or create a RAG store by name (LRU-cached).

At most _STORE_REGISTRY_MAX_SIZE stores are kept open simultaneously. When a new store would exceed the limit the least recently used entry is closed and evicted.

Cache entries are keyed by store_name plus optional embedding task types so different embedding configurations do not share one client.

Return type:

FileRAGManager

Parameters:
  • store_name (str)

  • api_key (str | None)

  • max_file_size (int | None)

  • gemini_only (bool)

  • document_task_type (str | None)

  • query_task_type (str | None)
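
An LRU registry that closes evicted entries can be sketched with collections.OrderedDict. This is an illustration, not the module's registry: LRURegistry and its factory argument are hypothetical, the key is assumed to be a tuple such as (store_name, document_task_type, query_task_type), and thread safety is not addressed.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable


class LRURegistry:
    """Keep at most max_size open entries; close and evict the least recently used."""

    def __init__(self, max_size: int, factory: Callable[[Hashable], Any]) -> None:
        self._max_size = max_size
        self._factory = factory
        self._entries: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable) -> Any:
        if key in self._entries:
            self._entries.move_to_end(key)  # mark as most recently used
            return self._entries[key]
        # Make room before creating the new entry.
        while self._entries and len(self._entries) >= self._max_size:
            _, evicted = self._entries.popitem(last=False)  # oldest first
            evicted.close()
        entry = self._factory(key)
        self._entries[key] = entry
        return entry
```

Including the task types in the key means two callers asking for the same store name with different embedding configurations get separate instances.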

rag_system.file_rag_manager.get_stargazer_docs_store()[source]

Return the shared RAG store for Sphinx / tool documentation.

Uses RETRIEVAL_DOCUMENT for indexed chunks and RETRIEVAL_QUERY for search queries (Gemini embedding task types).

Return type:

FileRAGManager

rag_system.file_rag_manager.list_rag_stores()[source]

List the names of all available RAG stores.

A thin name-only projection over list_rag_stores_with_stats() (and thus its 60s cache): every Postgres schema that owns a files_<schema> table is a store. Swallows errors and returns an empty list so prompt-build and admin callers never crash on a transient Postgres hiccup.

Called by the web config API in web/rag_config_api.py (which filters the names for cloud-user stores).

Returns:

Store names (empty on error).

Return type:

List[str]

rag_system.file_rag_manager.list_rag_stores_with_stats()[source]

List stores with indexed-chunk counts from Postgres (60s cached).

Replaces the legacy filesystem scan. Counts come from planner row estimates so this never opens a per-store client and stays cheap on the per-message prompt path.

Return type:

List[Dict[str, Any]]
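
A 60-second cache of this kind can be sketched as a small time-to-live wrapper. TTLCache, its loader, and the injectable clock are hypothetical names used for illustration; how the module actually stores its cached listing is not described here.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Cache one loader result and refresh it after ttl_seconds."""

    def __init__(
        self,
        loader: Callable[[], Any],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Any = None
        self._expires_at: Optional[float] = None

    def get(self) -> Any:
        now = self._clock()
        if self._expires_at is None or now >= self._expires_at:
            # First call or expired: reload and restart the TTL window.
            self._value = self._loader()
            self._expires_at = now + self._ttl
        return self._value
```

A monotonic clock is used by default so that wall-clock adjustments cannot extend or shorten the cache window.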

rag_system.file_rag_manager.delete_rag_store(store_name)[source]

Delete a RAG store completely (Postgres tables + local files dir).

Drops the store’s files_<schema> / documents / source_files tables (not the whole schema, so shared schemas such as golden_goddess keep their non-file tables like ncm_kernel).

Return type:

Dict[str, Any]

Parameters:

store_name (str)