rag_system.file_rag_manager module
File-based RAG Manager.
Manages file indexing and retrieval using Postgres pgvector + whole-file tables. Returns entire files, not chunks, for complete context when possible.
Features:
- URL fetching support
- PDF parsing via PyMuPDF
- Chunked embeddings for better semantic matching
- Full file retrieval on search
- rag_system.file_rag_manager.extract_pdf_text(file_path)[source]
Extract page-delimited plain text from a PDF on disk via PyMuPDF.
Opens the file with fitz (PyMuPDF), concatenates the non-empty pages with --- Page N --- separators, and returns the joined text so callers get readable, source-traceable content for indexing. Reads the filesystem only; performs no embedding or network work. Returns None (and logs) when PyMuPDF is not installed, the document yields no extractable text, or any extraction error occurs, so the caller can fall back or skip the file.
Called within this module by FileRAGManager._read_file_content(), FileRAGManager._load_whole_file_text(), and extract_pdf_text_from_bytes(). No external in-repo callers.
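The page-joining step described above can be sketched without PyMuPDF by working on already-extracted page strings. This is a minimal illustration, not the module's code: join_pages is a hypothetical helper, and the exact whitespace around the separators in the real output is an assumption.

```python
from typing import List, Optional


def join_pages(page_texts: List[str]) -> Optional[str]:
    """Join non-empty page texts with '--- Page N ---' separators.

    Hypothetical helper for illustration; page numbers are 1-based and
    the blank line between pages is an assumed layout.
    """
    parts = []
    for number, text in enumerate(page_texts, start=1):
        stripped = text.strip()
        if stripped:  # pages with no extractable text are skipped
            parts.append(f"--- Page {number} ---\n{stripped}")
    # Mirror the documented sentinel: None when nothing was extracted.
    return "\n\n".join(parts) if parts else None


print(join_pages(["First page.", "   ", "Third page."]))
```

Keeping the original page number in each separator (rather than renumbering after empty pages are dropped) is what makes the text traceable back to the source PDF.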
- rag_system.file_rag_manager.extract_pdf_text_from_bytes(raw)[source]
Extract text from in-memory PDF bytes by staging a temp file.
Wraps extract_pdf_text() for the byte-stream case (e.g. a PDF fetched from a URL that was never written to a permanent path): it writes raw to a NamedTemporaryFile, extracts, and unlinks the temp file in a finally block so nothing is left on disk even on error. Touches the filesystem (one transient temp file) but no network or store state; returns None on any failure.
Called within this module by decode_bytes_to_text() for .pdf inputs. No external in-repo callers.
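The stage-extract-unlink pattern can be sketched with the standard library alone. This is an illustration under assumptions: extract_via_temp_file and its extractor argument are hypothetical names, standing in for the wrapper and for extract_pdf_text() respectively.

```python
import os
import tempfile
from typing import Callable, Optional


def extract_via_temp_file(
    raw: bytes, extractor: Callable[[str], Optional[str]]
) -> Optional[str]:
    """Write raw to a temp file, run extractor(path), and always clean up."""
    tmp_path = None
    try:
        # delete=False so the file can be reopened by path after closing.
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
            tmp.write(raw)
            tmp_path = tmp.name
        return extractor(tmp_path)
    except Exception:
        return None  # documented behaviour: None on any failure
    finally:
        # Runs on success and on error, so no temp file is left behind.
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
```

Closing the temp file before handing its path to the extractor avoids reopening an already-open file, which some platforms do not allow.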
- rag_system.file_rag_manager.decode_bytes_to_text(raw, filename)[source]
Decode raw file bytes to text, dispatching on the filename extension.
The single entry point for turning fetched or stored bytes into indexable text. For .pdf names it delegates to extract_pdf_text_from_bytes(); otherwise it tries a strict UTF-8 decode and falls back to latin-1 so arbitrary byte streams still produce something usable. Pure aside from the temp file used by the PDF path; returns None only when even the latin-1 fallback raises.
Called within this module by FileRAGManager._load_whole_file_text() and FileRAGManager.index_url(), and externally by scripts/backfill_pg_source_files.py when re-deriving whole-file text.
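The dispatch-then-fallback order can be sketched as follows. decode_sketch is a hypothetical name, the PDF extractor is passed in as a stand-in, and the case-insensitive extension check is an assumption rather than confirmed behaviour.

```python
from typing import Callable, Optional


def decode_sketch(
    raw: bytes,
    filename: str,
    pdf_extractor: Callable[[bytes], Optional[str]],
) -> Optional[str]:
    """Decode bytes to text, dispatching on the filename extension."""
    if filename.lower().endswith(".pdf"):  # assumption: case-insensitive
        return pdf_extractor(raw)
    try:
        return raw.decode("utf-8")  # strict error handling is the default
    except UnicodeDecodeError:
        pass
    try:
        # latin-1 maps every byte value to a code point, so this step
        # succeeds for any byte string.
        return raw.decode("latin-1")
    except Exception:
        return None
```

Because latin-1 accepts every byte, the fallback trades correctness of non-ASCII characters for never losing a file outright.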
- rag_system.file_rag_manager.compress_pdf(file_path, output_path=None, remove_images=True)[source]
Shrink an oversized PDF in place (or to a copy) before indexing.
Used to bring PDFs that exceed MAX_FILE_SIZE under the limit so they can still be ingested. Opens the document with PyMuPDF, optionally strips every embedded image (the dominant size contributor), and rewrites it with ez_save() for object-stream/garbage-collected compaction. Writes to output_path when given, otherwise overwrites file_path in place, and logs the size reduction. Filesystem-only; no network or store interaction. Note that import fitz is unguarded here, so a missing PyMuPDF raises rather than returning a sentinel.
Called within this module by FileRAGManager._read_file_content() and FileRAGManager.index_url() on the over-limit PDF path. No external in-repo callers.
- Returns:
(out_path, original_size, compressed_size), with both sizes in bytes.
- Raises:
ImportError – If PyMuPDF (fitz) is not installed.
- rag_system.file_rag_manager.chunk_text(text, chunk_size=1500, overlap=200)[source]
Split text into overlapping chunks on paragraph and sentence boundaries.
The shared chunker that turns whole documents into the embed-sized units the vector store indexes. It prefers natural boundaries — splitting on blank-line paragraphs first, then sentence breaks, and only hard-slicing when a single sentence still exceeds chunk_size — and carries an overlap tail of characters between consecutive chunks so semantic context is not lost at the seams. Pure string processing with no I/O; returns the input as a single chunk when it already fits.
Called within this module by FileRAGManager.index_file() and FileRAGManager.index_url(), and externally by log_rag_ingest and the cloud-RAG ingest path in tools/cloud_rag.py.
- Returns:
The ordered list of chunk strings (a single-element list when the text fits in one chunk).
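A simplified version of this chunking strategy is sketched below. It is not the module's implementation: chunk_sketch is a hypothetical name, it packs paragraphs and hard-slices oversized text but omits the sentence-boundary step for brevity, and the exact way the overlap tail is joined to the next chunk is an assumption.

```python
from typing import List


def chunk_sketch(text: str, chunk_size: int = 1500, overlap: int = 200) -> List[str]:
    """Pack blank-line paragraphs into chunks of at most chunk_size chars."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    if len(text) <= chunk_size:
        return [text]  # already fits: single chunk

    chunks: List[str] = []
    current = ""
    for para in text.split("\n\n"):
        candidate = f"{current}\n\n{para}" if current else para
        if len(candidate) <= chunk_size:
            current = candidate
            continue
        if current:
            chunks.append(current)
            # Carry the tail of the finished chunk into the next one.
            tail = current[-overlap:] if overlap else ""
            current = f"{tail}\n\n{para}" if tail else para
        else:
            current = para
        # Hard-slice only when the text still exceeds chunk_size.
        while len(current) > chunk_size:
            chunks.append(current[:chunk_size])
            current = current[chunk_size - overlap:]
    if current:
        chunks.append(current)
    return chunks
```

Requiring overlap to be smaller than chunk_size guarantees the hard-slice loop makes progress on every iteration.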
- async rag_system.file_rag_manager.fetch_url_content(url, timeout=30.0)[source]
Fetch content from url (SSRF-guarded). Returns (bytes, content_type, filename).
Routes through tools._safe_http.safe_http_request(), which validates every redirect hop and pins each connection to a vetted public IP, so a user-supplied ingestion URL cannot be redirected (or DNS-rebound) into an internal host such as 10.10.0.x:6379.
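The "vetted public IP" check at the heart of such a guard can be illustrated with the standard ipaddress module. This is only a sketch of the idea: is_public_ip and all_public are hypothetical helpers, and the actual rules applied by safe_http_request() are not shown in this reference.

```python
import ipaddress
from typing import Iterable


def is_public_ip(address: str) -> bool:
    """True when address is globally routable (not private, loopback, etc.)."""
    return ipaddress.ip_address(address).is_global


def all_public(resolved_addresses: Iterable[str]) -> bool:
    """A hostname is acceptable only if every resolved address is public."""
    addresses = list(resolved_addresses)
    return bool(addresses) and all(is_public_ip(a) for a in addresses)


for candidate in ("8.8.8.8", "10.10.0.5", "127.0.0.1", "169.254.169.254"):
    print(candidate, is_public_ip(candidate))
```

Pinning matters because checking an address and then letting the HTTP client resolve the hostname again leaves a window for DNS rebinding; connecting to the already-vetted address closes it.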
- class rag_system.file_rag_manager.FileRAGManager(store_name='default', store_path=None, api_key=None, embedding_model='google/gemini-embedding-001', max_file_size=15728640, gemini_only=True, document_task_type=None, query_task_type=None)[source]
Bases: object
File- and URL-oriented RAG store over Postgres pgvector plus whole-file tables.
One instance manages a single named store: a per-store Postgres schema holding a files_<schema> vector table (halfvec(3072) + HNSW) for chunk embeddings, plus documents/source_files tables that keep the full original text and raw bytes so search can return entire files rather than just the matched chunk. It wraps the pgvector table in a Chroma-shaped facade (vector_store.ChromaCompatCollection) and embeds text through rag_system.openrouter_embeddings.SyncOpenRouterEmbeddings (Gemini by default), so indexing and search reach the embedding provider over HTTP while persistence stays in Postgres. Stores listed in CHUNK_ONLY_STORES (e.g. stargazer_logs) skip the whole-file tables entirely.
Instances are normally obtained through the module-level LRU registry get_rag_store() (and get_stargazer_docs_store()) rather than constructed directly; the RAG tool handlers in tools/rag.py / tools/cloud_rag.py, rag_system.auto_search.RAGAutoSearchManager, and starwiki/rag_integration all go through that registry, with starwiki being the one place that instantiates FileRAGManager directly.
- __init__(store_name='default', store_path=None, api_key=None, embedding_model='google/gemini-embedding-001', max_file_size=15728640, gemini_only=True, document_task_type=None, query_task_type=None)[source]
Initialize the instance.
- Parameters:
store_name (str) – Name of the store this instance manages.
embedding_model (str) – Identifier of the embedding model (default google/gemini-embedding-001).
max_file_size (int) – Maximum file size in bytes (default 15728640, i.e. 15 MiB).
gemini_only (bool) – Use only the Gemini API for embeddings.
document_task_type (Optional[str]) – Optional Gemini task type for indexed text (e.g. RETRIEVAL_DOCUMENT).
query_task_type (Optional[str]) – Optional Gemini task type for search queries (e.g. RETRIEVAL_QUERY).
- index_file(file_path, tags=None, use_chunking=True, chunk_size=1500, chunk_overlap=200, force=False)[source]
Index a single file into the collection.
When force is True the content-hash dedup check is skipped so the file is always re-embedded (but the store is not cleared).
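The interaction between the content-hash dedup check and force can be sketched as below. The names are hypothetical, the in-memory dict stands in for whatever the store actually persists, and SHA-256 is an assumed hash choice.

```python
import hashlib
from typing import Dict


def should_reindex(
    content: str,
    stored_hashes: Dict[str, str],
    file_path: str,
    force: bool = False,
) -> bool:
    """Return True when the file needs (re-)embedding."""
    digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
    if not force and stored_hashes.get(file_path) == digest:
        return False  # unchanged content: skip the embedding call
    # Record the new hash; other entries are left untouched, which is
    # the sense in which force does not clear the store.
    stored_hashes[file_path] = digest
    return True
```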
- async index_url(url, tags=None, use_chunking=True, chunk_size=1500, chunk_overlap=200)[source]
Fetch a remote document by URL and index it into this store.
The URL ingestion counterpart of index_file(). It downloads via the SSRF-guarded fetch_url_content(), derives a stable stored filename (URL-hash prefixed and sanitized), compresses oversized PDFs through compress_pdf(), decodes the bytes to text with decode_bytes_to_text(), and persists the whole file to Postgres via _upsert_whole_file(). It then dedups against any prior copy of the same source_url by content hash and, unless unchanged, upserts the embedded chunks (or a single document) into the pgvector collection with source_type="url" metadata. Touches the network (download), the embedding provider (via the collection upsert), Postgres, and a transient temp file for PDF compression.
Called by the rag_index_url tool handler in tools/rag.py.
- Returns:
A result dict with success and, on success, action ("indexed" or "skipped"), url, filename, sizes and stored_path; on failure an error message.
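One way to derive a stable, URL-hash-prefixed and sanitized filename is sketched here. Everything specific is an assumption made for illustration: the function name, the SHA-256 hash, the 12-character prefix, the allowed character set, and the "index" fallback.

```python
import hashlib
import re
from urllib.parse import urlparse


def stored_filename_for(url: str) -> str:
    """Build a deterministic, filesystem-safe name for a fetched URL."""
    # Same URL always yields the same prefix, so re-ingestion overwrites
    # the prior copy instead of creating a duplicate.
    prefix = hashlib.sha256(url.encode("utf-8")).hexdigest()[:12]
    basename = urlparse(url).path.rsplit("/", 1)[-1] or "index"
    # Replace anything outside a conservative whitelist.
    safe = re.sub(r"[^A-Za-z0-9._-]", "_", basename)
    return f"{prefix}_{safe}"


print(stored_filename_for("https://example.com/docs/My File.pdf"))
```

The hash prefix keeps two different URLs that end in the same basename from colliding in one store.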
- index_directory(directory_path, recursive=True, tags=None, exclude_patterns=None, max_workers=6, force=False, allowed_extensions=None)[source]
Index all supported files in directory_path.
When max_workers > 1, files are indexed concurrently using a thread pool. Each file’s embedding batches are already parallelised inside the embedding function, so even max_workers=1 benefits from concurrent API calls.
force bypasses the per-file content-hash dedup check without clearing the store, so already-indexed files get re-embedded.
When allowed_extensions is set, only files whose suffix (after normalizing to a leading dot, lowercase) appears in allowed_extensions are queued; None means no extension filter (all supported types under SUPPORTED_EXTENSIONS).
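The extension normalization described above can be sketched as follows. Both helper names are hypothetical, and the additional filtering against SUPPORTED_EXTENSIONS that the real method applies is left out.

```python
from pathlib import Path
from typing import Iterable, List, Optional, Set


def normalize_extensions(exts: Optional[Iterable[str]]) -> Optional[Set[str]]:
    """Normalize entries like 'PDF' or '.Md' to '.pdf' / '.md'."""
    if exts is None:
        return None  # None means no extension filter
    return {("." + ext.lstrip(".")).lower() for ext in exts}


def filter_paths(paths: Iterable[str], allowed: Optional[Iterable[str]]) -> List[str]:
    """Keep only paths whose lowercase suffix is in the allowed set."""
    wanted = normalize_extensions(allowed)
    if wanted is None:
        return list(paths)
    return [p for p in paths if Path(p).suffix.lower() in wanted]


print(filter_paths(["a.PDF", "b.txt", "c.md"], ["pdf", "MD"]))
```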
- search(query, n_results=5, tags=None, return_content=True, query_embedding=None, max_content_size=8000, content_mode='whole')[source]
Semantic search returning one result per matched file.
With content_mode set to whole, each result loads the full text from the Postgres documents table when available; with chunks, it returns only the best KNN-matched indexed chunk.
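Collapsing chunk-level KNN hits into one result per file can be sketched like this. The hit shape (dicts with filename and distance keys, lower distance meaning a closer match) is an assumption for illustration, as is the helper name.

```python
from typing import Dict, List


def best_hit_per_file(hits: List[dict], n_results: int = 5) -> List[dict]:
    """Keep the closest chunk for each file, ordered by distance."""
    best: Dict[str, dict] = {}
    for hit in hits:
        key = hit["filename"]
        if key not in best or hit["distance"] < best[key]["distance"]:
            best[key] = hit
    return sorted(best.values(), key=lambda h: h["distance"])[:n_results]


hits = [
    {"filename": "a.md", "distance": 0.4},
    {"filename": "b.md", "distance": 0.2},
    {"filename": "a.md", "distance": 0.1},
]
print([h["filename"] for h in best_hit_per_file(hits)])
```

Because several chunks of one file can rank highly, a search of this kind typically has to request more than n_results chunks from the index to end up with n_results distinct files.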
- remove_file(file_path)[source]
Remove every index entry (and the whole-file rows) for one local file.
Resolves file_path to an absolute path, looks up all chunk ids stored under that file_path metadata, deletes them from the pgvector collection, and then drops the matching documents/source_files rows for each affected filename via rag_system.pg_source_files.delete_whole_file so no orphaned whole-file text survives. Writes to Postgres only; returns a failure dict when the path is not present in the index.
Called by the rag_remove_file tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).
- remove_url(url)[source]
Remove every index entry (and whole-file rows) for one indexed URL.
The URL analogue of remove_file(): it finds all chunk ids carrying the given source_url metadata, deletes them from the pgvector collection, and drops the corresponding documents/source_files rows via rag_system.pg_source_files.delete_whole_file. Writes to Postgres only; returns a failure dict when the URL is not in the index.
Called by the rag_remove_url tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).
- list_indexed_files(limit=100)[source]
List metadata for files represented in the vector index.
Reads up to limit chunk metadata records from the pgvector collection and projects each into a compact summary (path, filename, extension, size, index timestamp, decoded tags). This reflects what has been embedded and may include one row per chunk; the whole-file view is list_store_files(). Returns an empty list and logs on error.
Called by the RAG listing tool handlers in tools/rag.py (run off the event loop with asyncio.to_thread).
- list_store_files()[source]
List the whole files held by this store, unioning Postgres and disk.
The file-centric (not chunk-centric) listing: it enumerates the Postgres source_files rows via rag_system.pg_source_files.list_whole_files (skipped for chunk-only stores through _pg_whole_files_enabled()) and then folds in any files from the legacy on-disk files directory that are not already represented, so unmigrated stores still report their content. Each entry carries a pg:// or filesystem path. Reads Postgres and the filesystem; results are sorted by filename.
Called by the rag_list_store_files tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).
- read_store_file(filename)[source]
Return the full text of one stored file by bare filename.
Powers the rag_read_store_file tool the LLM is hinted toward when a retrieval chunk is not enough. It first rejects any filename containing a path separator or .. (path-traversal guard, so only flat store-local names are honored), then resolves the content through _load_whole_file_text() (Postgres documents/source_files, then legacy disk). Reads Postgres and possibly the filesystem; returns a failure dict when the file is missing or unreadable.
Called by the rag_read_store_file tool handler in tools/rag.py (run off the event loop with asyncio.to_thread); the same tool string is surfaced to the model by rag_system.auto_search.RAGAutoSearchManager and by message_processor.memory_linked_context.
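The path-traversal guard amounts to a few string checks. The sketch below uses a hypothetical function name, and rejecting the empty string is an added assumption; the separator and ".." rules follow the description above.

```python
def is_safe_store_filename(filename: str) -> bool:
    """Accept only flat, store-local names."""
    if not filename:
        return False  # assumption: empty names are rejected too
    if "/" in filename or "\\" in filename:
        return False  # any path separator is refused
    return ".." not in filename  # blocks parent-directory references


for name in ("notes.md", "../etc/passwd", "sub/notes.md", "a..b"):
    print(name, is_safe_store_filename(name))
```

Rejecting every occurrence of ".." is stricter than necessary (it also refuses a harmless name such as a..b), which is a reasonable trade for a check that is easy to audit.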
- close()[source]
No-op: the pgvector pools are process-wide and shared.
Retained for API compatibility (the LRU registry calls this on eviction); there is no per-store connection to release.
- get_stats()[source]
Return a small summary of this store’s identity and size.
Reports the store name, the legacy on-disk DB path, the live indexed-row count from the pgvector collection (collection.count(), a Postgres COUNT), and the configured embedding model. Intended for admin/status surfaces; returns {"error": ...} instead of raising if the count query fails.
No in-repo callers were found by grep; invoked via dynamic/admin paths.
- clear()[source]
Empty this store, dropping all embedded chunks and whole-file rows.
Wipes every vector row via the underlying PgVectorCollection.clear and, for non-chunk-only stores, also truncates the whole-file tables through rag_system.pg_source_files.clear_source_tables so no document text is left behind. The store schema/table remain so it can be re-indexed in place. Writes to Postgres only; returns a failure dict instead of raising on error.
Called by the corpus (re)build scripts under scripts/ (e.g. ingest_religion_rag, ingest_law_rag, update_docs_rag, build_rag_from_directory) before a fresh full ingest.
- rag_system.file_rag_manager.get_rag_store(store_name='default', api_key=None, max_file_size=None, gemini_only=True, document_task_type=None, query_task_type=None)[source]
Get or create a RAG store by name (LRU-cached).
At most _STORE_REGISTRY_MAX_SIZE stores are kept open simultaneously. When a new store would exceed the limit, the least recently used entry is closed and evicted.
Cache entries are keyed by store_name plus optional embedding task types so different embedding configurations do not share one client.
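An LRU registry with close-on-eviction can be sketched with OrderedDict. StoreRegistry and its factory argument are hypothetical; the key shape follows the description above (store name plus the two task types), but the real cache structure is not shown in this reference.

```python
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple

Key = Tuple[str, Optional[str], Optional[str]]


class StoreRegistry:
    """Keep at most max_size stores; close the least recently used."""

    def __init__(self, max_size: int, factory: Callable[[str], Any]) -> None:
        self._max_size = max_size
        self._factory = factory
        self._stores: "OrderedDict[Key, Any]" = OrderedDict()

    def get(
        self,
        store_name: str,
        document_task_type: Optional[str] = None,
        query_task_type: Optional[str] = None,
    ) -> Any:
        key = (store_name, document_task_type, query_task_type)
        if key in self._stores:
            self._stores.move_to_end(key)  # mark as most recently used
            return self._stores[key]
        store = self._factory(store_name)
        self._stores[key] = store
        while len(self._stores) > self._max_size:
            _, evicted = self._stores.popitem(last=False)  # oldest entry
            evicted.close()
        return store
```

Including the task types in the key means the same store name requested with different embedding configurations yields distinct instances.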
- rag_system.file_rag_manager.get_stargazer_docs_store()[source]
Return the shared RAG store for Sphinx / tool documentation.
Uses RETRIEVAL_DOCUMENT for indexed chunks and RETRIEVAL_QUERY for search queries (Gemini embedding task types).
- rag_system.file_rag_manager.list_rag_stores()[source]
List the names of all available RAG stores.
A thin name-only projection over list_rag_stores_with_stats() (and thus its 60s cache): every Postgres schema that owns a files_<schema> table is a store. Swallows errors and returns an empty list so prompt-build and admin callers never crash on a transient Postgres hiccup.
Called by the web config API in web/rag_config_api.py (which filters the names for cloud-user stores).
- rag_system.file_rag_manager.list_rag_stores_with_stats()[source]
List stores with indexed-chunk counts from Postgres (60s cached).
Replaces the legacy filesystem scan. Counts come from planner row estimates so this never opens a per-store client and stays cheap on the per-message prompt path.
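A 60-second cache of this kind can be sketched as a small time-to-live wrapper. TTLCache, its loader, and the injectable clock are hypothetical names chosen for illustration; the module's actual caching mechanism is not shown in this reference.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Cache one loader result and refresh it after ttl_seconds."""

    def __init__(
        self,
        loader: Callable[[], Any],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested
        self._value: Any = None
        self._expires_at: Optional[float] = None

    def get(self) -> Any:
        now = self._clock()
        if self._expires_at is None or now >= self._expires_at:
            self._value = self._loader()  # e.g. the Postgres stats query
            self._expires_at = now + self._ttl
        return self._value
```

Using a monotonic clock keeps expiry correct even if the system wall clock is adjusted while the process is running.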
- rag_system.file_rag_manager.delete_rag_store(store_name)[source]
Delete a RAG store completely (Postgres tables + local files dir).
Drops the store’s files_<schema>/documents/source_files tables (not the whole schema, so shared schemas such as golden_goddess keep their non-file tables like ncm_kernel).