rag_system.pg_source_files module

Postgres whole-file storage for File RAG stores.

Each file-RAG schema may contain:

  • documents — extracted text (PK filename)

  • source_files — raw bytes (PK filename)

Chunk vectors live in files_<schema> via vector_store.

rag_system.pg_source_files.source_tables_ddl(schema)[source]

Build the CREATE statements for a store’s whole-file tables.

Returns the ordered DDL that provisions the per-store schema plus the two whole-file tables (documents for extracted text keyed on filename, source_files for raw bytes keyed on filename) so the layout matches the migrated production schema. Nothing is executed here; it is a pure string builder. The schema portion of every statement is sanitised through _qschema() before interpolation, which is the only injection guard on the returned SQL.

Called by ensure_source_tables(), which iterates the list and runs each statement against the synchronous pool; there are no callers outside this module.

Parameters:

schema (str) – The store/schema name the tables belong to.

Returns:

Three DDL strings, in dependency order — create schema, create documents, create source_files — each safe to run with CREATE ... IF NOT EXISTS semantics.

Return type:

list[str]

rag_system.pg_source_files.ensure_source_tables(schema)[source]

Provision a store’s whole-file schema and tables if they do not exist.

Idempotently creates the per-store schema and its documents / source_files tables so subsequent upserts have somewhere to land. This is the one write-side bootstrap for whole-file storage; every other writer assumes the tables already exist.

Borrows a connection from the shared synchronous pool via vector_store.get_sync_pool() and executes each statement produced by source_tables_ddl() (the only side effect is the Postgres DDL). Called by upsert_whole_file() before it writes, by rag_system.file_rag_manager when a store is initialised, and by scripts/verify_pgvector_stores.py during store verification.

Parameters:

schema (str) – The store/schema name to provision.

Return type:

None

Returns:

None

rag_system.pg_source_files.table_exists(schema, table)[source]

Return True if table exists in schema per information_schema.

Sanitises the schema name through _qschema(), borrows a connection from the shared synchronous pool returned by vector_store.get_sync_pool(), and runs a parameterised SELECT EXISTS against information_schema.tables (both schema and table are passed as bind parameters, not interpolated). It performs a read-only catalog lookup with no side effects on the data tables.

This is the existence guard used before touching the whole-file tables: it is called by get_document_text(), get_source_file_bytes(), list_whole_files(), delete_whole_file() and clear_source_tables() so those operations no-op gracefully when a store has not yet created its documents / source_files tables.

Parameters:
  • schema (str) – The store schema to inspect.

  • table (str) – The table name to look for (e.g. documents or source_files).

Returns:

True if the table is present, False otherwise.

Return type:

bool

rag_system.pg_source_files.upsert_document(schema, filename, sha256, content, extraction_method='text')[source]

Upsert the extracted-text row for a file into the documents table.

Inserts (or, on a primary-key conflict, replaces) the documents row for filename, recording the content hash, the text itself, its character count, how it was extracted, and a fresh UTC extraction timestamp. This is the text half of whole-file storage; the raw-bytes half is handled by upsert_source_file().

Sanitises the schema via _qschema(), stamps extracted_at with the current UTC time, and runs a parameterised INSERT ... ON CONFLICT (filename) DO UPDATE on a connection from vector_store.get_sync_pool(); the sole side effect is the Postgres write. Assumes the table already exists (see ensure_source_tables()). Called by upsert_whole_file() (the only caller).

Parameters:
  • schema (str) – The store/schema whose documents table to write.

  • filename (str) – Primary-key filename for the row.

  • sha256 (str) – Hex SHA-256 of the source content, stored for change detection.

  • content (str) – Extracted plain text; its length is stored as char_count.

  • extraction_method (str) – How content was produced, e.g. "text" or "pdf".

Return type:

None

Returns:

None

rag_system.pg_source_files.upsert_source_file(schema, filename, raw_bytes, sha256)[source]

Upsert the raw-bytes row for a file into the source_files table.

Inserts (or, on a primary-key conflict, replaces) the source_files row for filename, storing the original bytes as bytea alongside their size and content hash. This is the byte half of whole-file storage that lets a search return the exact original file; the extracted-text half lives in upsert_document().

Sanitises the schema via _qschema() and runs a parameterised INSERT ... ON CONFLICT (filename) DO UPDATE on a connection from vector_store.get_sync_pool(); the only side effect is the Postgres write, and the table is assumed to exist already. Called by upsert_whole_file() (the only caller).

Parameters:
  • schema (str) – The store/schema whose source_files table to write.

  • filename (str) – Primary-key filename for the row.

  • raw_bytes (bytes) – Original file bytes; their length is stored as size_bytes.

  • sha256 (str) – Hex SHA-256 of raw_bytes, stored for change detection.

Return type:

None

Returns:

None

rag_system.pg_source_files.upsert_whole_file(schema, filename, sha256, content, raw_bytes, extraction_method='text')[source]

Persist both the extracted text and raw bytes of one file in one call.

The single public entry point for writing whole-file storage: it guarantees the tables exist, then writes the text and byte halves together so a store holds both a searchable documents row and a downloadable source_files row for filename. There is no transaction spanning the two writes — each upsert runs on its own pooled connection.

Calls ensure_source_tables() to provision the schema, then upsert_document() (text) and upsert_source_file() (bytes), all of which touch Postgres via vector_store.get_sync_pool(). Called by rag_system.file_rag_manager (its _upsert_whole_file wrapper, which first hashes the bytes) when whole-file storage is enabled for a store.

Parameters:
  • schema (str) – The store/schema to write into.

  • filename (str) – Primary-key filename shared by both rows.

  • sha256 (str) – Hex SHA-256 used for both the text and byte rows.

  • content (str) – Extracted plain text for the documents row.

  • raw_bytes (bytes) – Original file bytes for the source_files row.

  • extraction_method (str) – How content was produced, e.g. "text" or "pdf".

Return type:

None

Returns:

None

rag_system.pg_source_files.get_document_text(schema, filename)[source]

Fetch a file’s stored extracted text from the documents table.

Reads back the plain text previously written by upsert_document(), returning None when the store has no documents table yet, the file is absent, or its stored content is NULL — so callers can fall back to re-extraction gracefully. Read-only; no side effects on the data tables.

Guards on table_exists() first, then sanitises the schema via _qschema() and runs a parameterised SELECT content ... WHERE filename = %s on a connection from vector_store.get_sync_pool(). Called by rag_system.file_rag_manager when serving a whole-file result.

Parameters:
  • schema (str) – The store/schema to read from.

  • filename (str) – Filename whose extracted text is requested.

Returns:

The stored text, or None if the table is absent, the row is missing, or the content is NULL.

Return type:

str | None

rag_system.pg_source_files.get_source_file_bytes(schema, filename)[source]

Fetch a file’s stored raw bytes from the source_files table.

Reads back the original bytes previously written by upsert_source_file(), returning None when the source_files table is missing, the row is absent, or the stored content is NULL — which lets callers decide whether the exact original file is retrievable. Read-only; no side effects on the data tables.

Guards on table_exists() first, then sanitises the schema via _qschema() and runs a parameterised SELECT content ... WHERE filename = %s on a connection from vector_store.get_sync_pool(), coercing the returned bytea to a bytes object. Called by rag_system.file_rag_manager when serving the original file for a result.

Parameters:
  • schema (str) – The store/schema to read from.

  • filename (str) – Filename whose raw bytes are requested.

Returns:

The original file bytes, or None if the table is absent, the row is missing, or the content is NULL.

Return type:

bytes | None

rag_system.pg_source_files.list_whole_files(schema)[source]

List every whole file in a store with size, hash, and modified time.

Produces one record per filename for the store’s whole-file tables, preferring the richer documents rows (which carry char_count as the size and an extracted_at timestamp) and filling in any filenames that exist only in source_files (using size_bytes as the size and no timestamp). Files present in both tables are deduplicated by filename, with the documents entry winning. Read-only; no side effects.

Sanitises the schema via _qschema(), guards each table with table_exists(), and runs ordered SELECT queries on a connection from vector_store.get_sync_pool(). Called by rag_system.file_rag_manager to enumerate stored whole files.

Parameters:

schema (str) – The store/schema to enumerate.

Returns:

Records sorted by filename, each with filename, size, sha256, and modified (an ISO timestamp or None). Empty when neither whole-file table exists.

Return type:

list[dict[str, Any]]

rag_system.pg_source_files.delete_whole_file(schema, filename)[source]

Delete a single file’s rows from both whole-file tables.

Removes filename from documents and source_files so a deleted file leaves no whole-file remnants behind; each table is only touched if it exists, making the call a safe no-op for chunk-only stores or stores that never created the tables. The chunk/vector rows for the file are deleted elsewhere by the caller — this handles only whole-file storage.

Sanitises the schema via _qschema(), guards each table with table_exists(), and issues parameterised DELETE ... WHERE filename = %s statements on a connection from vector_store.get_sync_pool() (the side effect is the Postgres deletes). Called by rag_system.file_rag_manager when removing or replacing an indexed file.

Parameters:
  • schema (str) – The store/schema to delete from.

  • filename (str) – Filename to remove from both whole-file tables.

Return type:

None

Returns:

None

rag_system.pg_source_files.clear_source_tables(schema)[source]

Truncate both whole-file tables, wiping all stored files for a store.

Empties documents and source_files in one pass so a store can be fully reset without dropping its schema; each table is only truncated if it exists, so the call is a no-op for stores that never created them. This clears whole-file storage only — chunk/vector data is reset separately by the caller.

Sanitises the schema via _qschema(), guards each table with table_exists(), and runs TRUNCATE TABLE on a connection from vector_store.get_sync_pool() (the side effect is the Postgres truncates). Called by rag_system.file_rag_manager when clearing or rebuilding a store.

Parameters:

schema (str) – The store/schema whose whole-file tables to empty.

Return type:

None

Returns:

None