vector_store

Shared Postgres + pgvector store layer.

This module replaces the legacy chroma_registry / on-disk ChromaDB PersistentClient model. All vector storage now lives in a single Postgres database (stargazer_filerag) using one schema per logical store and one table per collection with the shape:

id        text PRIMARY KEY
document  text
metadata  jsonb
embedding halfvec(3072)        -- HNSW index, halfvec_l2_ops (L2)

Embeddings are always supplied by the caller (computed via the shared Gemini embed pool); this layer never embeds implicitly the way a ChromaDB embedding_function did.

Two drivers are provided because the codebase mixes async and sync call sites:

  • AsyncPgVectorCollection (asyncpg) for genuinely-async hot paths running on the bot event loop (e.g. prompt_context._divine_reflex).

  • PgVectorCollection (psycopg3, thread-safe pool) for synchronous code: FileRAGManager worker threads, CLIs, and scripts/.

Both expose the same Chroma-like surface: ensure / upsert / query / get / delete / count / clear.

Built by Stargazer Project.

vector_store.EMBED_DIM = 3072

All migrated stores use Gemini gemini-embedding-001 at 3072 dims.

vector_store.configure(**overrides)[source]

Override the resolved Postgres connection parameters at runtime.

Seeds the module-level _conn_params cache from _resolve_params() (env plus defaults) and then layers the given overrides on top, so every subsequent pool built by get_async_pool() or get_sync_pool() uses them. Mutates shared module state under _lock; accepts host, port, database, user, password, sslmode, min_size, max_size, or a full dsn string. Empty/None values are ignored so callers can pass through optional config fields without clobbering defaults. Called by configure_from_config(); no external callers were found by grep.

Parameters:

**overrides (Any) – Any of the connection-param keys above, or dsn.

Return type:

None

vector_store.configure_from_config(config)[source]

Apply a Config object’s vector-store fields to the connection params.

Pulls the vector_pg_* attributes off config (via getattr with a None default, so missing fields are simply skipped) and forwards them to configure(), which ignores the None values. Intended as a startup hook so a service’s loaded Config can point this module at the right Postgres without touching the environment. No callers were found by grep, so it is invoked (if at all) by service bootstrap code.

Parameters:

config (Any) – A config object exposing optional vector_pg_dsn / vector_pg_host / vector_pg_port / vector_pg_database / vector_pg_user / vector_pg_password / vector_pg_sslmode / vector_pg_min_size / vector_pg_max_size attributes, or None (a no-op).

Return type:

None

vector_store.pg_ident(name)[source]

Coerce an arbitrary string into a safe Postgres identifier.

Unlike _validate_ident() (which rejects unsafe input), this sanitises: it mirrors the underscore-only convention used by the migration (e.g. us_law, files_us_law). Non-alphanumeric characters collapse to a single _, an empty result becomes store, a leading digit is prefixed with s_, and the result is truncated to Postgres’s 63-char identifier limit. Pure string helper with no I/O.

Used across the repo to derive schema/table names from store paths and collection names: anamnesis_engine, extract_tags_to_concepts, backfill_entity_provenance, rag_system/file_rag_manager, rag_system/pg_source_files (which then re-validates via _validate_ident()), tools/chromadb_tools, and scripts/migrate_vector_dbs_to_pgvector.

Parameters:

name (str) – The raw name to sanitise (e.g. a store directory or collection).

Returns:

A Postgres-safe identifier of at most 63 characters.

Return type:

str

vector_store.vector_literal(embedding)[source]

Render an embedding vector as a pgvector text literal.

Produces the bracketed comma-separated form ([v1,v2,...]) that pgvector’s halfvec/vector input parser accepts, coercing each element to float and using repr so full precision survives the round-trip. Pure helper with no I/O. Called wherever a vector is bound into SQL within this module: _BaseCollection._upsert_rows() (per upserted row) and the query methods of both PgVectorCollection and AsyncPgVectorCollection, where it fills the reserved None placeholder.

Parameters:

embedding (Sequence[float]) – The numeric embedding vector.

Returns:

A pgvector text literal of the form [v1,v2,...].

Return type:

str

vector_store.l2_to_similarity(distance)[source]

Convert an L2 distance to a cosine-style similarity in [0, 1].

Because Gemini gemini-embedding-001 at 3072 dims is pre-normalised, for unit vectors L2^2 = 2 - 2*cos, so cos = 1 - L2^2/2; the result is clamped into [0, 1]. This keeps the reported similarity aligned with the old Chroma 1 - distance convention. Pure helper with no I/O. Called by the query methods of both PgVectorCollection and AsyncPgVectorCollection to attach a similarity to each hit.

Parameters:

distance (float | None) – The raw L2 distance from the pgvector <-> operator, or None.

Returns:

The similarity in [0, 1], or None when distance is None.

Return type:

float | None

async vector_store.get_async_pool()[source]

Return the asyncpg pool for the current event loop, creating it lazily.

Pools are cached per running event loop (keyed by id(loop) in the module-level _async_pools dict) because asyncpg pools are bound to the loop that created them; this matters in the multi-service bot where worker threads run their own loops. On a cache miss it double-checks under _async_pool_lock then opens an asyncpg.create_pool sized by _pool_sizes() and configured by _asyncpg_kwargs(), opening real Postgres connections and logging the creation. Awaited by warm_async_pool(), close_async_pool(), the ensure/upsert/ query/get/delete/count methods of AsyncPgVectorCollection, and external async callers such as sword/overlay and tools/chromadb_tools.

Returns:

The pool bound to the current event loop.

Return type:

asyncpg.Pool

async vector_store.warm_async_pool()[source]

Warm the asyncpg pool and initialise the SWORD overlay cache schema.

A startup hook that establishes the per-loop pool via get_async_pool(), runs SELECT 1 to confirm connectivity, and then idempotently creates the sword_cache schema and its overlay_cache table (used by sword/overlay) so the first real overlay write does not pay DDL latency. Acquires a pooled connection and issues DDL/DML; logs and re-raises on failure so a broken database surfaces at boot. Awaited by prompt_context during startup and by the SWORD overlay Postgres tests.

Raises:

Exception – If the SWORD cache schema/table cannot be created (the SELECT 1 connectivity error propagates likewise).

Return type:

None

async vector_store.close_async_pool()[source]

Close and forget the asyncpg pool bound to the current event loop.

Pops the pool keyed by the running loop’s id out of the module-level _async_pools cache and awaits its close(); a no-op if no pool exists for this loop. Intended for orderly shutdown of a worker’s event loop. No internal callers were found in this module, and no external callers were found by grep, so it is invoked (if at all) by service teardown code.

Raises:

RuntimeError – If called outside a running event loop (from asyncio.get_running_loop).

Return type:

None

vector_store.get_sync_pool()[source]

Return the shared psycopg3 connection pool, creating it lazily.

Unlike the per-loop async pools, a single process-wide thread-safe ConnectionPool serves all synchronous callers (worker threads, CLIs, scripts). On first use it double-checks under _sync_pool_lock then opens an autocommit pool with conninfo from _psycopg_conninfo(), sized by _pool_sizes(), opening real Postgres connections and logging the creation. Called by warm_sync_pool(), list_store_schemas(), table_count(), every method of PgVectorCollection, and many external sync paths (rag_system/pg_source_files, rag_system/file_rag_manager, sword/overlay, and scripts/).

Returns:

The shared autocommit pool.

Return type:

psycopg_pool.ConnectionPool

vector_store.warm_sync_pool()[source]

Open the synchronous psycopg3 pool and verify connectivity.

Calls get_sync_pool() to lazily create the shared pool, then runs a trivial SELECT 1 on a borrowed connection so the first real query doesn’t pay connection-setup latency. Mirrors warm_async_pool() for sync call sites (worker threads, CLIs). No internal or external callers were found by grep; it is a startup-warm entry point for synchronous services.

Raises:

Exception – Propagates any psycopg connection/query error if the database is unreachable.

Return type:

None

class vector_store.PgVectorCollection(schema, table, dim=3072)[source]

Bases: _BaseCollection

Synchronous pgvector collection backed by the shared psycopg3 pool.

Concrete _BaseCollection for blocking call sites — worker threads, CLIs, and scripts/ — that cannot await. Each method borrows a connection from get_sync_pool() and runs the driver-neutral SQL the base class builds (with psycopg placeholders). Exposes the Chroma-like surface ensure / upsert / query / get / delete / count / clear / drop; embeddings are always supplied by the caller. Wrapped by ChromaCompatCollection for auto-embedding call sites and instantiated directly by anamnesis_engine, extract_tags_to_concepts, backfill_entity_provenance, memory_search, and rag_system/file_rag_manager.

Parameters:
ensure()[source]

Create the collection’s schema, table, and HNSW index if absent.

Borrows a connection from the synchronous pool (get_sync_pool()) and runs each idempotent DDL statement from _ensure_sql(). Safe to call repeatedly. Used directly (e.g. by rag_system/file_rag_manager and verification scripts) and internally by clear().

Raises:

Exception – Propagates any psycopg error if the DDL fails.

Return type:

None

upsert(ids, documents, metadatas=None, embeddings=None)[source]

Insert or replace rows by primary key, requiring explicit embeddings.

Shapes the parallel lists via _upsert_rows(), builds the upsert SQL with _upsert_sql(), and runs it with cursor.executemany on a pooled (autocommit) connection so each id is inserted or fully updated. Returns early when ids is empty. Unlike ChromaCompatCollection, this layer never embeds implicitly — callers supply Gemini embeddings.

Reached from synchronous indexing paths such as extract_tags_to_concepts and backfill_entity_provenance, which instantiate the collection and call upsert directly.

Parameters:
Raises:

ValueError – If embeddings is None.

Return type:

None

query(embedding, n_results=5, where=None, where_document=None)[source]

Run a synchronous nearest-neighbour search and return scored hits.

Builds the query via _query_sql(), substitutes the rendered vector_literal() into the reserved None placeholders, executes it on a pooled connection, and maps each row to a dict including the raw L2 distance and a derived similarity from l2_to_similarity().

Used by synchronous search paths (e.g. rag_system/file_rag_manager via ChromaCompatCollection, and direct callers like anamnesis_engine).

Parameters:
  • embedding (Sequence[float]) – The query embedding vector.

  • n_results (int) – Maximum neighbours to return.

  • where (Any) – Chroma-style metadata filter, or None.

  • where_document (Any) – Chroma-style document filter, or None.

Returns:

Per-hit dicts with id, document, metadata, distance, and similarity, nearest first.

Return type:

list[dict[str, Any]]

get(ids=None, where=None, where_document=None, offset=None, limit=None)[source]

Fetch rows by id and/or filter (no vector search) synchronously.

Builds the statement with _get_sql(), runs it on a pooled connection, and reshapes the rows via _rows_to_get(). Surfaced to Chroma-style callers through ChromaCompatCollection.get().

Parameters:
  • ids (Optional[Sequence[str]]) – Ids to restrict to, or None for all matching rows.

  • where (Any) – Chroma-style metadata filter, or None.

  • where_document (Any) – Chroma-style document filter, or None.

  • offset (int | None) – Row offset to skip, or None.

  • limit (int | None) – Maximum rows to return, or None.

Returns:

{"ids", "documents", "metadatas"} aligned lists.

Return type:

dict[str, list]

delete(ids=None, where=None)[source]

Delete rows by id and/or metadata filter synchronously.

No-ops when neither ids nor where is supplied (guarding against an accidental full-table delete), otherwise builds the statement via _delete_sql() and runs it on a pooled connection. Surfaced through ChromaCompatCollection.delete().

Parameters:
  • ids (Optional[Sequence[str]]) – Ids to delete, or None.

  • where (Any) – Chroma-style metadata filter, or None.

Return type:

None

count()[source]

Return the total number of rows in the collection.

Runs SELECT count(*) on a pooled connection. Surfaced through ChromaCompatCollection.count().

Returns:

The row count (0 if the query yields no row).

Return type:

int

clear()[source]

Empty the collection, creating it first if necessary.

Calls ensure() so the TRUNCATE cannot fail on a missing table, then truncates on a pooled connection. Data is removed but the table and index remain.

Return type:

None

drop()[source]

Drop the collection’s entire schema and everything in it.

Runs DROP SCHEMA IF EXISTS CASCADE on a pooled connection, destroying every table/index under this store’s schema (not just this one table). Irreversible; intended for teardown/reset tooling.

Return type:

None

class vector_store.ChromaCompatCollection(pg, embedding_fn)[source]

Bases: object

Chroma-compatible sync facade over PgVectorCollection.

Auto-embeds documents/queries via embedding_fn (e.g. SyncOpenRouterEmbeddings) so call sites that relied on Chroma’s embedding_function keep working with minimal changes.

query returns Chroma-shaped nested lists and reports distances as cosine distance (1 - similarity), so callers that compute 1 - distance recover a cosine similarity in [0, 1] exactly as they did against Chroma.

Parameters:
__init__(pg, embedding_fn)[source]

Wrap a PgVectorCollection with an auto-embedding facade.

Stores the backing collection and an embedder. Instantiated by rag_system/file_rag_manager (self.collection = ChromaCompatCollection(self._pg, self.embedding_fn)) so existing Chroma-style call sites keep working over pgvector.

Parameters:
  • pg (PgVectorCollection) – The synchronous pgvector collection doing the real I/O.

  • embedding_fn (Any) – An embedder exposing either embed_documents / embed_query methods or a plain callable taking a list of texts (e.g. SyncOpenRouterEmbeddings).

ensure()[source]

Ensure the backing collection’s schema/table/index exist.

Thin pass-through to PgVectorCollection.ensure().

Return type:

None

upsert(ids, documents, metadatas=None, embeddings=None)[source]

Upsert documents, auto-embedding them when no vectors are given.

Computes embeddings via _embed_documents() if embeddings is None (the Chroma embedding_function behaviour), then delegates to PgVectorCollection.upsert().

Parameters:
Return type:

None

add(ids, documents, metadatas=None, embeddings=None)[source]

Alias for upsert(), matching Chroma’s add API.

Forwards all arguments to upsert() so call sites using Chroma’s collection.add semantics keep working.

Parameters:
Return type:

None

get(ids=None, where=None, where_document=None, limit=None, offset=None, include=None)[source]

Fetch rows by id/filter, mirroring Chroma’s get signature.

Delegates to PgVectorCollection.get(). The Chroma-only include argument is accepted for signature compatibility but ignored (this layer always returns ids/documents/metadatas).

Parameters:
  • ids (Optional[Sequence[str]]) – Ids to restrict to, or None.

  • where (Any) – Chroma-style metadata filter, or None.

  • where_document (Any) – Chroma-style document filter, or None.

  • limit (int | None) – Maximum rows to return, or None.

  • offset (int | None) – Row offset to skip, or None.

  • include (Any) – Accepted for Chroma compatibility; ignored.

Returns:

{"ids", "documents", "metadatas"} lists.

Return type:

dict[str, list]

delete(ids=None, where=None)[source]

Delete rows by id and/or metadata filter.

Thin pass-through to PgVectorCollection.delete().

Parameters:
  • ids (Optional[Sequence[str]]) – Ids to delete, or None.

  • where (Any) – Chroma-style metadata filter, or None.

Return type:

None

count()[source]

Return the collection’s row count via PgVectorCollection.count().

Returns:

Total number of stored rows.

Return type:

int

query(query_embeddings=None, query_texts=None, n_results=10, where=None, where_document=None, include=None)[source]

Run a nearest-neighbour search returning Chroma-shaped nested lists.

Resolves the query vector from query_embeddings (unwrapping a list-of-lists batch to its first row) or by embedding query_texts[0] via _embed_query(); with neither it returns the empty Chroma shape. Delegates the search to PgVectorCollection.query(), then repackages results into single-batch nested lists and converts each similarity back to a cosine distance (1 - similarity) so callers computing 1 - distance recover the similarity exactly as they did against Chroma. The include argument is ignored.

Parameters:
  • query_embeddings (Any) – A query vector or a one-element batch of vectors, or None.

  • query_texts (Any) – A list of query strings (only the first is used), or None.

  • n_results (int) – Maximum neighbours to return.

  • where (Any) – Chroma-style metadata filter, or None.

  • where_document (Any) – Chroma-style document filter, or None.

  • include (Any) – Accepted for Chroma compatibility; ignored.

Returns:

{"ids", "documents", "metadatas", "distances"} each wrapped in a single-query outer list; distances are cosine distances. Returns empty single-batch lists when no query is given.

Return type:

dict[str, list]

class vector_store.AsyncPgVectorCollection(schema, table, dim=3072)[source]

Bases: _BaseCollection

Asynchronous pgvector collection backed by the per-loop asyncpg pool.

Concrete _BaseCollection for genuinely-async hot paths running on the bot event loop. Each coroutine acquires a connection from get_async_pool() and runs the driver-neutral SQL the base class builds (with asyncpg numbered placeholders). Exposes the async Chroma-like surface ensure / upsert / query / get / delete / count / clear; embeddings are always supplied by the caller. Drives async paths such as prompt_context._divine_reflex (Golden Goddess reflex), tools/query_golden_goddess_v2, and tools/chromadb_tools.

Parameters:
async ensure()[source]

Create the schema, table, and HNSW index for this collection (async).

Async counterpart of PgVectorCollection.ensure(): runs the idempotent DDL from _ensure_sql() on a connection acquired from get_async_pool(). Called by tools/chromadb_tools before upserting into a vector DB, and internally by clear().

Return type:

None

async upsert(ids, documents, metadatas=None, embeddings=None)[source]

Insert or update rows on the async (asyncpg) path.

No-ops on empty ids; otherwise rows are built by _upsert_rows(), the SQL by _upsert_sql(), and executed with executemany on a pooled connection from get_async_pool(). Used by async ingestion code such as tools/chromadb_tools.

Parameters:
Raises:

ValueError – If embeddings is None.

Return type:

None

async query(embedding, n_results=5, where=None, where_document=None)[source]

Run an async nearest-neighbour search and return scored hits.

Builds the SQL via _query_sql(), fills the reserved vector slots with the rendered vector_literal(), fetches rows on a pooled connection from get_async_pool(), and maps each row to a dict with the raw L2 distance and a similarity from l2_to_similarity().

Drives async hot paths such as prompt_context._divine_reflex (Golden Goddess reflex), tools/query_golden_goddess_v2, and tools/chromadb_tools.

Parameters:
  • embedding (Sequence[float]) – The query embedding vector.

  • n_results (int) – Maximum number of neighbours to return.

  • where (Any) – Chroma-style metadata filter dict, or None.

  • where_document (Any) – Chroma-style document filter dict, or None.

Returns:

Per-hit dicts with id, document, metadata, distance, and similarity, nearest first.

Return type:

list[dict[str, Any]]

async get(ids=None, where=None, where_document=None, offset=None, limit=None)[source]

Fetch rows by id and/or filters on the async path.

Builds the statement via _get_sql(), fetches rows on a pooled connection from get_async_pool(), then normalises the asyncpg Record rows into Chroma-style parallel lists via _rows_to_get(). Used by async tools like tools/chromadb_tools.

Parameters:
  • ids (Optional[Sequence[str]]) – Specific ids to fetch, or None.

  • where (Any) – Chroma-style metadata filter dict, or None.

  • where_document (Any) – Chroma-style document filter dict, or None.

  • offset (int | None) – Row offset for pagination, or None.

  • limit (int | None) – Maximum rows to return, or None.

Returns:

Parallel ids / documents / metadatas lists.

Return type:

dict[str, list]

async delete(ids=None, where=None)[source]

Delete rows by id list and/or metadata filter (async).

No-ops when neither ids nor where is given. Otherwise builds the statement via _delete_sql() and executes it on a pooled connection from get_async_pool(). Used by async maintenance code such as tools/chromadb_tools.

Parameters:
  • ids (Optional[Sequence[str]]) – Ids to delete, or None.

  • where (Any) – Chroma-style metadata filter selecting rows to delete, or None.

Return type:

None

async count()[source]

Return the collection’s row count (async).

Runs SELECT count(*) via fetchval on a pooled connection from get_async_pool(). Used by async tools such as tools/chromadb_tools.

Returns:

The row count, or 0 when the value is None.

Return type:

int

async clear()[source]

Empty the collection on the async path, ensuring it exists first.

Calls ensure() then TRUNCATEs the table on a pooled connection from get_async_pool(); the schema and HNSW index are preserved. No internal or grep-found external callers; invoked ad hoc by async maintenance code.

Return type:

None

vector_store.list_store_schemas()[source]

List the user (non-system) schemas in the vector database.

Replaces the old filesystem listing of rag_stores/ directories: it queries information_schema.schemata on the synchronous pool (get_sync_pool()), excluding Postgres’s own schemas (pg_catalog, information_schema, public, pg_toast, and the pg_temp_* / pg_toast_temp_* families) so only logical store schemas remain. No callers were found by grep, so it is used (if at all) by introspection tooling or scripts.

Returns:

Candidate store schema names, ordered alphabetically.

Return type:

list[str]

vector_store.table_count(schema, table)[source]

Return the row count for a given schema.table, or 0 if absent.

Validates both identifiers with _validate_ident() (since the table name is interpolated into the count(*) query), then first checks information_schema.tables to confirm the table exists before counting, so a missing collection yields 0 rather than raising. Runs on the synchronous pool from get_sync_pool(). No callers were found by grep, so it is used (if at all) by introspection/verification tooling.

Parameters:
  • schema (str) – The Postgres schema name (validated).

  • table (str) – The Postgres table name (validated).

Returns:

The row count, or 0 when the table does not exist.

Return type:

int

Raises:

ValueError – If schema or table is not a safe identifier.