"""Postgres whole-file storage for File RAG stores.
Each file-RAG schema may contain:
* ``documents`` — extracted text (PK ``filename``)
* ``source_files`` — raw bytes (PK ``filename``)
Chunk vectors live in ``files_<schema>`` via :mod:`vector_store`.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Any
from vector_store import _validate_ident, get_sync_pool, pg_ident
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Name of the per-schema table holding extracted text (primary key ``filename``).
DOCUMENTS_TABLE = "documents"
# Name of the per-schema table holding raw file bytes (primary key ``filename``).
SOURCE_FILES_TABLE = "source_files"
def _qschema(schema: str) -> str:
    """Turn a raw store name into a validated, quote-ready schema identifier.

    The name is first passed through :func:`vector_store.pg_ident` and the
    result is then checked by :func:`vector_store._validate_ident` under the
    label ``"schema"``. Every statement in this module that interpolates a
    schema name between double quotes obtains it from this helper, which makes
    it the single guard protecting the schema portion of that SQL.

    Args:
        schema (str): The raw store/schema name to sanitise.

    Returns:
        str: The identifier returned by ``_validate_ident``.

    Raises:
        ValueError: Propagated from :func:`vector_store._validate_ident` when
            the sanitised name is rejected.
    """
    sanitised = pg_ident(schema)
    return _validate_ident(sanitised, "schema")
[docs]
def source_tables_ddl(schema: str) -> list[str]:
    """Return the DDL that provisions a store's schema and whole-file tables.

    Pure string builder: nothing is executed here. The schema name is
    sanitised through :func:`_qschema` before being interpolated, and that is
    the only injection guard applied to the returned SQL.

    Args:
        schema (str): The store/schema name the tables belong to.

    Returns:
        list[str]: Three statements in dependency order: create the schema,
        create ``documents`` (extracted text keyed on ``filename``), create
        ``source_files`` (raw bytes keyed on ``filename``). Each statement
        uses ``IF NOT EXISTS``.
    """
    ident = _qschema(schema)

    # Column definitions are listed one per entry and joined, so each table
    # layout can be read at a glance.
    document_columns = ", ".join(
        (
            "filename text PRIMARY KEY",
            "sha256 text",
            "content text",
            "char_count bigint",
            "extraction_method text",
            "extracted_at timestamptz",
        )
    )
    source_file_columns = ", ".join(
        (
            "filename text PRIMARY KEY",
            "size_bytes bigint",
            "sha256 text",
            "content bytea",
        )
    )

    create_schema = f'CREATE SCHEMA IF NOT EXISTS "{ident}"'
    create_documents = (
        f'CREATE TABLE IF NOT EXISTS "{ident}"."{DOCUMENTS_TABLE}" '
        f"({document_columns})"
    )
    create_source_files = (
        f'CREATE TABLE IF NOT EXISTS "{ident}"."{SOURCE_FILES_TABLE}" '
        f"({source_file_columns})"
    )
    return [create_schema, create_documents, create_source_files]
[docs]
def ensure_source_tables(schema: str) -> None:
    """Create a store's schema and whole-file tables when they are missing.

    Borrows one connection from the pool returned by
    :func:`vector_store.get_sync_pool` and executes, in order, every statement
    produced by :func:`source_tables_ddl`. Because each statement uses
    ``IF NOT EXISTS``, calling this repeatedly is safe.

    Args:
        schema (str): The store/schema name to provision.

    Returns:
        None
    """
    with get_sync_pool().connection() as conn:
        for ddl in source_tables_ddl(schema):
            conn.execute(ddl)
[docs]
def table_exists(schema: str, table: str) -> bool:
    """Report whether *table* is listed under *schema* in ``information_schema``.

    The schema name is sanitised through :func:`_qschema`; both the schema
    and the table name are then sent as bind parameters (never interpolated)
    to a ``SELECT EXISTS`` query against ``information_schema.tables``. The
    lookup runs on its own connection borrowed from
    :func:`vector_store.get_sync_pool` and does not touch the data tables.

    Args:
        schema (str): The store schema to inspect.
        table (str): The table name to look for (e.g. ``documents`` or
            ``source_files``).

    Returns:
        bool: ``True`` if the table is present, ``False`` otherwise.
    """
    ident = _qschema(schema)
    query = """
        SELECT EXISTS (
            SELECT 1 FROM information_schema.tables
            WHERE table_schema = %s AND table_name = %s
        )
        """
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        cur.execute(query, (ident, table))
        # The query always yields exactly one row holding one boolean.
        found = cur.fetchone()[0]
    return bool(found)
[docs]
def upsert_document(
    schema: str,
    filename: str,
    sha256: str,
    content: str,
    extraction_method: str = "text",
) -> None:
    """Insert or replace the extracted-text row for *filename*.

    Writes one row to the store's ``documents`` table; on a ``filename``
    conflict every non-key column is overwritten with the new values. The row
    records the hash, the text, ``len(content)`` as ``char_count``, the
    extraction method and the current UTC time as ``extracted_at``.

    The table is not created here; see :func:`ensure_source_tables`.

    Args:
        schema (str): The store/schema whose ``documents`` table to write.
        filename (str): Primary-key filename for the row.
        sha256 (str): Hex SHA-256 stored alongside the text.
        content (str): Extracted plain text.
        extraction_method (str): How ``content`` was produced, e.g.
            ``"text"`` or ``"pdf"``.

    Returns:
        None
    """
    ident = _qschema(schema)
    extracted_at = datetime.now(timezone.utc)
    # Only the (sanitised) schema and the constant table name are
    # interpolated; every value travels as a bind parameter.
    statement = f"""
        INSERT INTO "{ident}"."{DOCUMENTS_TABLE}"
        (filename, sha256, content, char_count, extraction_method, extracted_at)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (filename) DO UPDATE SET
        sha256 = EXCLUDED.sha256,
        content = EXCLUDED.content,
        char_count = EXCLUDED.char_count,
        extraction_method = EXCLUDED.extraction_method,
        extracted_at = EXCLUDED.extracted_at
        """
    with get_sync_pool().connection() as conn:
        row = (
            filename,
            sha256,
            content,
            len(content),
            extraction_method,
            extracted_at,
        )
        conn.execute(statement, row)
[docs]
def upsert_source_file(
    schema: str,
    filename: str,
    raw_bytes: bytes,
    sha256: str,
) -> None:
    """Insert or replace the raw-bytes row for *filename*.

    Writes one row to the store's ``source_files`` table; on a ``filename``
    conflict the size, hash and content columns are overwritten. The row
    stores the bytes themselves, ``len(raw_bytes)`` as ``size_bytes`` and the
    supplied hash.

    The table is not created here; see :func:`ensure_source_tables`.

    Args:
        schema (str): The store/schema whose ``source_files`` table to write.
        filename (str): Primary-key filename for the row.
        raw_bytes (bytes): Original file bytes.
        sha256 (str): Hex SHA-256 stored alongside the bytes.

    Returns:
        None
    """
    ident = _qschema(schema)
    # Only the (sanitised) schema and the constant table name are
    # interpolated; every value travels as a bind parameter.
    statement = f"""
        INSERT INTO "{ident}"."{SOURCE_FILES_TABLE}"
        (filename, size_bytes, sha256, content)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (filename) DO UPDATE SET
        size_bytes = EXCLUDED.size_bytes,
        sha256 = EXCLUDED.sha256,
        content = EXCLUDED.content
        """
    with get_sync_pool().connection() as conn:
        row = (filename, len(raw_bytes), sha256, raw_bytes)
        conn.execute(statement, row)
[docs]
def upsert_whole_file(
    schema: str,
    filename: str,
    sha256: str,
    content: str,
    raw_bytes: bytes,
    extraction_method: str = "text",
) -> None:
    """Store both the extracted text and the raw bytes of one file.

    Runs three steps in order: :func:`ensure_source_tables` to make sure the
    tables exist, :func:`upsert_document` for the text row, then
    :func:`upsert_source_file` for the bytes row. Each step borrows its own
    pooled connection, so the two writes are NOT wrapped in one transaction:
    a failure in the second leaves the first in place.

    Args:
        schema (str): The store/schema to write into.
        filename (str): Primary-key filename shared by both rows.
        sha256 (str): Hex SHA-256 recorded on both rows.
        content (str): Extracted plain text for the ``documents`` row.
        raw_bytes (bytes): Original file bytes for the ``source_files`` row.
        extraction_method (str): How ``content`` was produced, e.g.
            ``"text"`` or ``"pdf"``.

    Returns:
        None
    """
    ensure_source_tables(schema)
    upsert_document(
        schema=schema,
        filename=filename,
        sha256=sha256,
        content=content,
        extraction_method=extraction_method,
    )
    upsert_source_file(
        schema=schema,
        filename=filename,
        raw_bytes=raw_bytes,
        sha256=sha256,
    )
[docs]
def get_document_text(schema: str, filename: str) -> str | None:
    """Return the stored extracted text for *filename*, if any.

    Checks :func:`table_exists` first so a store without a ``documents``
    table yields ``None`` instead of an SQL error, then selects the
    ``content`` column for the matching row using a bind parameter for the
    filename. Read-only.

    Args:
        schema (str): The store/schema to read from.
        filename (str): Filename whose extracted text is requested.

    Returns:
        str | None: The stored text, or ``None`` if the table is absent, the
        row is missing, or the content is NULL.
    """
    if not table_exists(schema, DOCUMENTS_TABLE):
        return None

    ident = _qschema(schema)
    query = f'SELECT content FROM "{ident}"."{DOCUMENTS_TABLE}" WHERE filename = %s'
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        cur.execute(query, (filename,))
        row = cur.fetchone()

    # A missing row and a NULL column are both reported as None.
    text = row[0] if row else None
    return None if text is None else str(text)
[docs]
def get_source_file_bytes(schema: str, filename: str) -> bytes | None:
    """Return the stored raw bytes for *filename*, if any.

    Checks :func:`table_exists` first so a store without a ``source_files``
    table yields ``None`` instead of an SQL error, then selects the
    ``content`` column for the matching row using a bind parameter for the
    filename. The driver's value is coerced with ``bytes(...)`` before being
    returned. Read-only.

    Args:
        schema (str): The store/schema to read from.
        filename (str): Filename whose raw bytes are requested.

    Returns:
        bytes | None: The original file bytes, or ``None`` if the table is
        absent, the row is missing, or the content is NULL.
    """
    if not table_exists(schema, SOURCE_FILES_TABLE):
        return None

    ident = _qschema(schema)
    query = f'SELECT content FROM "{ident}"."{SOURCE_FILES_TABLE}" WHERE filename = %s'
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        cur.execute(query, (filename,))
        row = cur.fetchone()

    # A missing row and a NULL column are both reported as None.
    payload = row[0] if row else None
    return None if payload is None else bytes(payload)
[docs]
def list_whole_files(schema: str) -> list[dict[str, Any]]:
    """List every whole file in a store with size, hash, and modified time.

    Produces one record per filename. Rows from ``documents`` take priority
    (``char_count`` is reported as ``size`` and ``extracted_at`` as
    ``modified``); filenames found only in ``source_files`` are added with
    ``size_bytes`` as ``size`` and ``modified`` set to ``None``. Read-only.

    Both :func:`table_exists` checks are resolved *before* a connection is
    borrowed. ``table_exists`` takes its own pooled connection, so calling it
    while this function already holds one would need two connections per call
    and could stall or time out on a small or busy pool.

    Args:
        schema (str): The store/schema to enumerate.

    Returns:
        list[dict[str, Any]]: Records sorted by filename, each with
        ``filename``, ``size``, ``sha256``, and ``modified`` (an ISO timestamp
        or ``None``). Empty when neither whole-file table exists.
    """
    ident = _qschema(schema)

    # Resolve existence up front so the lookups never nest inside the
    # connection borrowed below.
    has_documents = table_exists(schema, DOCUMENTS_TABLE)
    has_source_files = table_exists(schema, SOURCE_FILES_TABLE)
    if not (has_documents or has_source_files):
        return []

    records: dict[str, dict[str, Any]] = {}
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        if has_documents:
            cur.execute(
                f"""
                SELECT filename, char_count, sha256, extracted_at
                FROM "{ident}"."{DOCUMENTS_TABLE}"
                ORDER BY filename
                """
            )
            for fn, char_count, sha, extracted_at in cur.fetchall():
                records[fn] = {
                    "filename": fn,
                    "size": int(char_count or 0),
                    "sha256": sha,
                    "modified": extracted_at.isoformat() if extracted_at else None,
                }
        if has_source_files:
            cur.execute(
                f"""
                SELECT filename, size_bytes, sha256
                FROM "{ident}"."{SOURCE_FILES_TABLE}"
                ORDER BY filename
                """
            )
            for fn, size_bytes, sha in cur.fetchall():
                # A ``documents`` entry for the same filename wins.
                if fn in records:
                    continue
                records[fn] = {
                    "filename": fn,
                    "size": int(size_bytes or 0),
                    "sha256": sha,
                    "modified": None,
                }
    return sorted(records.values(), key=lambda rec: rec["filename"])
[docs]
def delete_whole_file(schema: str, filename: str) -> None:
    """Delete a single file's rows from both whole-file tables.

    Removes ``filename`` from ``documents`` and ``source_files``. Only tables
    that exist are touched, so the call is a no-op for stores that never
    created them. Chunk/vector rows are not handled here.

    The :func:`table_exists` checks are resolved *before* a connection is
    borrowed. ``table_exists`` takes its own pooled connection, so calling it
    while this function already holds one would need two connections per call
    and could stall or time out on a small or busy pool. When neither table
    exists, no connection is borrowed at all.

    Args:
        schema (str): The store/schema to delete from.
        filename (str): Filename to remove from both whole-file tables.

    Returns:
        None
    """
    ident = _qschema(schema)

    # Resolve existence up front so the lookups never nest inside the
    # connection borrowed below.
    targets = [
        table
        for table in (DOCUMENTS_TABLE, SOURCE_FILES_TABLE)
        if table_exists(schema, table)
    ]
    if not targets:
        return

    with get_sync_pool().connection() as conn:
        for table in targets:
            conn.execute(
                f'DELETE FROM "{ident}"."{table}" WHERE filename = %s',
                (filename,),
            )
[docs]
def clear_source_tables(schema: str) -> None:
    """Truncate both whole-file tables, wiping all stored files for a store.

    Empties ``documents`` and ``source_files`` without dropping the schema.
    Only tables that exist are truncated, so the call is a no-op for stores
    that never created them. Chunk/vector data is not handled here.

    The :func:`table_exists` checks are resolved *before* a connection is
    borrowed. ``table_exists`` takes its own pooled connection, so calling it
    while this function already holds one would need two connections per call
    and could stall or time out on a small or busy pool. When neither table
    exists, no connection is borrowed at all.

    Args:
        schema (str): The store/schema whose whole-file tables to empty.

    Returns:
        None
    """
    ident = _qschema(schema)

    # Resolve existence up front so the lookups never nest inside the
    # connection borrowed below.
    targets = [
        table
        for table in (DOCUMENTS_TABLE, SOURCE_FILES_TABLE)
        if table_exists(schema, table)
    ]
    if not targets:
        return

    with get_sync_pool().connection() as conn:
        for table in targets:
            conn.execute(f'TRUNCATE TABLE "{ident}"."{table}"')