Source code for rag_system.pg_source_files

"""Postgres whole-file storage for File RAG stores.

Each file-RAG schema may contain:

* ``documents`` — extracted text (PK ``filename``)
* ``source_files`` — raw bytes (PK ``filename``)

Chunk vectors live in ``files_<schema>`` via :mod:`vector_store`.
"""

from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Any

from vector_store import _validate_ident, get_sync_pool, pg_ident

logger = logging.getLogger(__name__)

DOCUMENTS_TABLE = "documents"
SOURCE_FILES_TABLE = "source_files"


def _qschema(schema: str) -> str:
    """Return *schema* as a sanitised, validated SQL identifier.

    The raw name is first normalised with :func:`vector_store.pg_ident`; the
    normalised value is then checked by :func:`vector_store._validate_ident`
    under the label ``"schema"``. Statements in this module place the
    returned value between double quotes, so this helper is the one gate
    protecting the schema part of the whole-file SQL from injection.

    Used by the SQL-issuing functions of this module:
    :func:`source_tables_ddl`, :func:`table_exists`,
    :func:`upsert_document`, :func:`upsert_source_file`,
    :func:`get_document_text`, :func:`get_source_file_bytes`,
    :func:`list_whole_files`, :func:`delete_whole_file` and
    :func:`clear_source_tables`.

    Args:
        schema (str): Raw store/schema name to sanitise.

    Returns:
        str: The identifier as returned by ``_validate_ident``.

    Raises:
        ValueError: Propagated from :func:`vector_store._validate_ident`
            when the normalised name is not accepted.
    """
    normalised = pg_ident(schema)
    return _validate_ident(normalised, "schema")


def source_tables_ddl(schema: str) -> list[str]:
    """Return the DDL that provisions a store's whole-file schema and tables.

    Pure string builder: nothing is executed here. The schema name is passed
    through :func:`_qschema` before being interpolated, which is the only
    injection guard on the generated SQL.

    Two tables are described, both keyed on ``filename``:

    * ``documents`` — extracted text plus hash, character count, extraction
      method and extraction timestamp.
    * ``source_files`` — raw bytes plus size and hash.

    :func:`ensure_source_tables` runs the returned statements in order.

    Args:
        schema (str): Store/schema name the tables belong to.

    Returns:
        list[str]: Three ``CREATE ... IF NOT EXISTS`` statements in
        dependency order: schema, ``documents``, ``source_files``.
    """
    quoted = _qschema(schema)

    # Column definitions are joined with ", " so the final statements are
    # character-for-character the same as a hand-written column list.
    document_columns = ", ".join(
        (
            "filename text PRIMARY KEY",
            "sha256 text",
            "content text",
            "char_count bigint",
            "extraction_method text",
            "extracted_at timestamptz",
        )
    )
    source_file_columns = ", ".join(
        (
            "filename text PRIMARY KEY",
            "size_bytes bigint",
            "sha256 text",
            "content bytea",
        )
    )

    create_schema = f'CREATE SCHEMA IF NOT EXISTS "{quoted}"'
    create_documents = (
        f'CREATE TABLE IF NOT EXISTS "{quoted}"."{DOCUMENTS_TABLE}" '
        f"({document_columns})"
    )
    create_source_files = (
        f'CREATE TABLE IF NOT EXISTS "{quoted}"."{SOURCE_FILES_TABLE}" '
        f"({source_file_columns})"
    )
    return [create_schema, create_documents, create_source_files]
def ensure_source_tables(schema: str) -> None:
    """Create a store's whole-file schema and tables when they are missing.

    Borrows one connection from the pool returned by
    :func:`vector_store.get_sync_pool` and executes, in order, every
    statement produced by :func:`source_tables_ddl`. All of those statements
    use ``IF NOT EXISTS``, so calling this repeatedly is harmless. The only
    side effect is the DDL run against Postgres.

    :func:`upsert_whole_file` calls this before writing; the other writers
    in this module assume the tables already exist.

    Args:
        schema (str): Store/schema name to provision.

    Returns:
        None
    """
    with get_sync_pool().connection() as conn:
        for ddl in source_tables_ddl(schema):
            conn.execute(ddl)
def table_exists(schema: str, table: str) -> bool:
    """Report whether *table* is listed under *schema* in the catalog.

    The schema name is sanitised with :func:`_qschema`; the sanitised schema
    and the table name are then sent as bind parameters (never interpolated)
    to a ``SELECT EXISTS`` over ``information_schema.tables``. The lookup
    runs on a connection borrowed from :func:`vector_store.get_sync_pool`
    and does not touch the data tables.

    The read, delete and clear helpers of this module use it as a guard so
    they can no-op when a store has not created its whole-file tables yet.

    Args:
        schema (str): Store schema to inspect.
        table (str): Table name to look for (e.g. ``documents`` or
            ``source_files``).

    Returns:
        bool: ``True`` when the table is present, ``False`` otherwise.
    """
    schema_name = _qschema(schema)
    lookup_sql = (
        " SELECT EXISTS ( SELECT 1 FROM information_schema.tables"
        " WHERE table_schema = %s AND table_name = %s ) "
    )
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        cur.execute(lookup_sql, (schema_name, table))
        row = cur.fetchone()
        return bool(row[0])
def upsert_document(
    schema: str,
    filename: str,
    sha256: str,
    content: str,
    extraction_method: str = "text",
) -> None:
    """Insert or replace the extracted-text row for *filename*.

    Writes one row into the store's ``documents`` table. On a primary-key
    conflict on ``filename`` the existing row's hash, text, character count,
    extraction method and timestamp are all overwritten. ``char_count`` is
    ``len(content)`` and ``extracted_at`` is the current UTC time taken when
    this function runs.

    The schema is sanitised with :func:`_qschema`; all values travel as bind
    parameters on a connection from :func:`vector_store.get_sync_pool`. The
    table must already exist (see :func:`ensure_source_tables`). The raw-byte
    counterpart is :func:`upsert_source_file`.

    Args:
        schema (str): Store/schema whose ``documents`` table is written.
        filename (str): Primary-key filename of the row.
        sha256 (str): Hex SHA-256 stored alongside the text.
        content (str): Extracted plain text.
        extraction_method (str): How ``content`` was produced, e.g.
            ``"text"`` or ``"pdf"``.

    Returns:
        None
    """
    schema_name = _qschema(schema)
    extracted_at = datetime.now(timezone.utc)
    statement = (
        f' INSERT INTO "{schema_name}"."{DOCUMENTS_TABLE}"'
        " (filename, sha256, content, char_count, extraction_method, extracted_at)"
        " VALUES (%s, %s, %s, %s, %s, %s)"
        " ON CONFLICT (filename) DO UPDATE SET"
        " sha256 = EXCLUDED.sha256,"
        " content = EXCLUDED.content,"
        " char_count = EXCLUDED.char_count,"
        " extraction_method = EXCLUDED.extraction_method,"
        " extracted_at = EXCLUDED.extracted_at "
    )
    with get_sync_pool().connection() as conn:
        row_values = (
            filename,
            sha256,
            content,
            len(content),
            extraction_method,
            extracted_at,
        )
        conn.execute(statement, row_values)
def upsert_source_file(
    schema: str,
    filename: str,
    raw_bytes: bytes,
    sha256: str,
) -> None:
    """Insert or replace the raw-bytes row for *filename*.

    Writes one row into the store's ``source_files`` table. On a primary-key
    conflict on ``filename`` the stored size, hash and bytes are overwritten.
    ``size_bytes`` is ``len(raw_bytes)``.

    The schema is sanitised with :func:`_qschema`; all values travel as bind
    parameters on a connection from :func:`vector_store.get_sync_pool`. The
    table must already exist (see :func:`ensure_source_tables`). The
    extracted-text counterpart is :func:`upsert_document`.

    Args:
        schema (str): Store/schema whose ``source_files`` table is written.
        filename (str): Primary-key filename of the row.
        raw_bytes (bytes): Original file bytes.
        sha256 (str): Hex SHA-256 stored alongside the bytes.

    Returns:
        None
    """
    schema_name = _qschema(schema)
    statement = (
        f' INSERT INTO "{schema_name}"."{SOURCE_FILES_TABLE}"'
        " (filename, size_bytes, sha256, content)"
        " VALUES (%s, %s, %s, %s)"
        " ON CONFLICT (filename) DO UPDATE SET"
        " size_bytes = EXCLUDED.size_bytes,"
        " sha256 = EXCLUDED.sha256,"
        " content = EXCLUDED.content "
    )
    with get_sync_pool().connection() as conn:
        row_values = (filename, len(raw_bytes), sha256, raw_bytes)
        conn.execute(statement, row_values)
def upsert_whole_file(
    schema: str,
    filename: str,
    sha256: str,
    content: str,
    raw_bytes: bytes,
    extraction_method: str = "text",
) -> None:
    """Store both the extracted text and the raw bytes of one file.

    Runs three steps in order: :func:`ensure_source_tables` to make sure the
    schema and tables exist, :func:`upsert_document` for the text row, and
    :func:`upsert_source_file` for the byte row. The same ``sha256`` value is
    recorded on both rows.

    The two writes are NOT wrapped in a shared transaction; each helper
    borrows its own pooled connection, so a failure in the second write
    leaves the first one in place.

    Args:
        schema (str): Store/schema to write into.
        filename (str): Primary-key filename shared by both rows.
        sha256 (str): Hex SHA-256 recorded on both rows.
        content (str): Extracted plain text for the ``documents`` row.
        raw_bytes (bytes): Original bytes for the ``source_files`` row.
        extraction_method (str): How ``content`` was produced, e.g.
            ``"text"`` or ``"pdf"``.

    Returns:
        None
    """
    ensure_source_tables(schema)
    upsert_document(
        schema=schema,
        filename=filename,
        sha256=sha256,
        content=content,
        extraction_method=extraction_method,
    )
    upsert_source_file(
        schema=schema,
        filename=filename,
        raw_bytes=raw_bytes,
        sha256=sha256,
    )
def get_document_text(schema: str, filename: str) -> str | None:
    """Read back the extracted text stored for *filename*.

    Returns ``None`` in three situations, which lets callers fall back to
    re-extracting the file: the store has no ``documents`` table, no row
    matches ``filename``, or the stored ``content`` is NULL.

    The table check goes through :func:`table_exists`; the read itself is a
    parameterised ``SELECT`` on a connection from
    :func:`vector_store.get_sync_pool`, with the schema sanitised by
    :func:`_qschema`. Nothing is written.

    Args:
        schema (str): Store/schema to read from.
        filename (str): Filename whose extracted text is wanted.

    Returns:
        str | None: The stored text, or ``None`` as described above.
    """
    if not table_exists(schema, DOCUMENTS_TABLE):
        return None

    schema_name = _qschema(schema)
    query = f'SELECT content FROM "{schema_name}"."{DOCUMENTS_TABLE}" WHERE filename = %s'
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        cur.execute(query, (filename,))
        record = cur.fetchone()

    if record and record[0] is not None:
        return str(record[0])
    return None
def get_source_file_bytes(schema: str, filename: str) -> bytes | None:
    """Read back the raw bytes stored for *filename*.

    Returns ``None`` in three situations: the store has no ``source_files``
    table, no row matches ``filename``, or the stored ``content`` is NULL.
    Otherwise the fetched value is converted with ``bytes(...)`` before
    being returned.

    The table check goes through :func:`table_exists`; the read itself is a
    parameterised ``SELECT`` on a connection from
    :func:`vector_store.get_sync_pool`, with the schema sanitised by
    :func:`_qschema`. Nothing is written.

    Args:
        schema (str): Store/schema to read from.
        filename (str): Filename whose raw bytes are wanted.

    Returns:
        bytes | None: The original file bytes, or ``None`` as described
        above.
    """
    if not table_exists(schema, SOURCE_FILES_TABLE):
        return None

    schema_name = _qschema(schema)
    query = f'SELECT content FROM "{schema_name}"."{SOURCE_FILES_TABLE}" WHERE filename = %s'
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        cur.execute(query, (filename,))
        record = cur.fetchone()

    if record and record[0] is not None:
        return bytes(record[0])
    return None
def list_whole_files(schema: str) -> list[dict[str, Any]]:
    """List every whole file in a store with size, hash and modified time.

    One record is produced per filename. Rows from ``documents`` come first
    and win on duplicates: their ``size`` is ``char_count`` and ``modified``
    is the ISO form of ``extracted_at``. Filenames that appear only in
    ``source_files`` are added afterwards with ``size`` taken from
    ``size_bytes`` and ``modified`` set to ``None``. NULL sizes become ``0``.

    Table presence is resolved through :func:`table_exists` *before* a
    connection is borrowed. ``table_exists`` checks out its own pooled
    connection, so calling it while already holding one would need two
    connections at once and could stall when the pool is at capacity.
    When neither table exists no connection is borrowed at all.

    Read-only. The schema is sanitised with :func:`_qschema` and the queries
    run on a connection from :func:`vector_store.get_sync_pool`.

    Args:
        schema (str): Store/schema to enumerate.

    Returns:
        list[dict[str, Any]]: Records sorted by filename, each with the keys
        ``filename``, ``size``, ``sha256`` and ``modified``. Empty when
        neither whole-file table exists.
    """
    s = _qschema(schema)

    # Do the catalog lookups up front so we never hold one pooled connection
    # while waiting for a second one.
    has_documents = table_exists(schema, DOCUMENTS_TABLE)
    has_source_files = table_exists(schema, SOURCE_FILES_TABLE)
    if not (has_documents or has_source_files):
        return []

    out: dict[str, dict[str, Any]] = {}
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        if has_documents:
            cur.execute(
                f' SELECT filename, char_count, sha256, extracted_at FROM "{s}"."{DOCUMENTS_TABLE}"'
                " ORDER BY filename "
            )
            for fn, cc, sha, ext_at in cur.fetchall():
                out[fn] = {
                    "filename": fn,
                    "size": int(cc or 0),
                    "sha256": sha,
                    "modified": ext_at.isoformat() if ext_at else None,
                }
        if has_source_files:
            cur.execute(
                f' SELECT filename, size_bytes, sha256 FROM "{s}"."{SOURCE_FILES_TABLE}"'
                " ORDER BY filename "
            )
            for fn, size_b, sha in cur.fetchall():
                # A ``documents`` entry for the same filename takes priority.
                if fn not in out:
                    out[fn] = {
                        "filename": fn,
                        "size": int(size_b or 0),
                        "sha256": sha,
                        "modified": None,
                    }

    # Source-only files were appended after the documents rows, so a final
    # sort is needed to return one filename-ordered list.
    return sorted(out.values(), key=lambda x: x["filename"])
def delete_whole_file(schema: str, filename: str) -> None:
    """Delete one file's rows from both whole-file tables.

    Removes ``filename`` from ``documents`` and from ``source_files``. A
    table is only touched when it exists, so the call is a no-op for stores
    that never created the whole-file tables. Chunk/vector rows are not
    handled here.

    Table presence is resolved through :func:`table_exists` *before* a
    connection is borrowed. ``table_exists`` checks out its own pooled
    connection, so calling it while already holding one would need two
    connections at once and could stall when the pool is at capacity.
    When neither table exists no connection is borrowed at all.

    The schema is sanitised with :func:`_qschema`; the deletes are
    parameterised and all run on a single connection from
    :func:`vector_store.get_sync_pool`, ``documents`` first.

    Args:
        schema (str): Store/schema to delete from.
        filename (str): Filename to remove from both whole-file tables.

    Returns:
        None
    """
    s = _qschema(schema)

    # Catalog lookups happen before the checkout below; see the docstring.
    present_tables = [
        table
        for table in (DOCUMENTS_TABLE, SOURCE_FILES_TABLE)
        if table_exists(schema, table)
    ]
    if not present_tables:
        return

    with get_sync_pool().connection() as conn:
        for table in present_tables:
            conn.execute(
                f'DELETE FROM "{s}"."{table}" WHERE filename = %s',
                (filename,),
            )
def clear_source_tables(schema: str) -> None:
    """Truncate both whole-file tables of a store.

    Empties ``documents`` and ``source_files`` without dropping the schema.
    A table is only truncated when it exists, so the call is a no-op for
    stores that never created the whole-file tables. Chunk/vector data is
    not handled here.

    Table presence is resolved through :func:`table_exists` *before* a
    connection is borrowed. ``table_exists`` checks out its own pooled
    connection, so calling it while already holding one would need two
    connections at once and could stall when the pool is at capacity.
    When neither table exists no connection is borrowed at all.

    The schema is sanitised with :func:`_qschema`; the ``TRUNCATE TABLE``
    statements all run on a single connection from
    :func:`vector_store.get_sync_pool`, ``documents`` first.

    Args:
        schema (str): Store/schema whose whole-file tables are emptied.

    Returns:
        None
    """
    s = _qschema(schema)

    # Catalog lookups happen before the checkout below; see the docstring.
    present_tables = [
        table
        for table in (DOCUMENTS_TABLE, SOURCE_FILES_TABLE)
        if table_exists(schema, table)
    ]
    if not present_tables:
        return

    with get_sync_pool().connection() as conn:
        for table in present_tables:
            conn.execute(f'TRUNCATE TABLE "{s}"."{table}"')