vector_store
Shared Postgres + pgvector store layer.
This module replaces the legacy chroma_registry / on-disk ChromaDB
PersistentClient model. All vector storage now lives in a single
Postgres database (stargazer_filerag) using one schema per logical
store and one table per collection with the shape:
id text PRIMARY KEY
document text
metadata jsonb
embedding halfvec(3072) -- HNSW index, halfvec_l2_ops (L2)
Embeddings are always supplied by the caller (computed via the shared
Gemini embed pool); this layer never embeds implicitly the way a ChromaDB
embedding_function did.
Two drivers are provided because the codebase mixes async and sync call sites:
- AsyncPgVectorCollection (asyncpg) for genuinely-async hot paths running on the bot event loop (e.g. prompt_context._divine_reflex).
- PgVectorCollection (psycopg3, thread-safe pool) for synchronous code: FileRAGManager worker threads, CLIs, and scripts/.
Both expose the same Chroma-like surface: ensure / upsert /
query / get / delete / count / clear.
Built by Stargazer Project.
- vector_store.EMBED_DIM = 3072
All migrated stores use Gemini gemini-embedding-001 at 3072 dims.
- vector_store.configure(**overrides)
Override the resolved Postgres connection parameters at runtime.
Seeds the module-level _conn_params cache from _resolve_params() (environment plus defaults) and then layers the given overrides on top, so every pool subsequently built by get_async_pool() or get_sync_pool() uses them. Mutates shared module state under _lock; accepts host, port, database, user, password, sslmode, min_size, max_size, or a full dsn string. Empty/None values are ignored, so callers can pass through optional config fields without clobbering defaults. Called by configure_from_config(); no external callers were found by grep.
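The override-merge behaviour of configure() can be sketched in pure Python. This is a minimal illustration under stated assumptions, not the module's code: _resolve_params() here returns hypothetical hard-coded defaults instead of reading the environment, and returning a copy of the merged params is this sketch's own addition so the result can be inspected.

```python
import threading
from typing import Any, Dict, Optional

# Illustrative stand-ins for the module-level cache and lock named above.
_lock = threading.Lock()
_conn_params: Optional[Dict[str, Any]] = None


def _resolve_params() -> Dict[str, Any]:
    # Hypothetical defaults; the real helper merges environment variables.
    return {"host": "localhost", "port": 5432, "database": "stargazer_filerag"}


def configure(**overrides: Any) -> Dict[str, Any]:
    """Layer non-empty overrides on top of the resolved defaults."""
    global _conn_params
    with _lock:
        if _conn_params is None:
            # Seed the cache once from env-plus-defaults.
            _conn_params = _resolve_params()
        for key, value in overrides.items():
            # Skip None and "" so optional config fields never clobber values.
            if value is None or value == "":
                continue
            _conn_params[key] = value
        # Return a copy for inspection (an assumption of this sketch).
        return dict(_conn_params)


params = configure(host="db.internal", password=None, user="")
```

Because pools are built lazily and then cached, overrides of this kind only take effect for pools created after the call.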
- vector_store.configure_from_config(config)
Apply a Config object's vector-store fields to the connection params.
Pulls the vector_pg_* attributes off config (via getattr with a None default, so missing fields are simply skipped) and forwards them to configure(), which ignores the None values. Intended as a startup hook so a service's loaded Config can point this module at the right Postgres without touching the environment. No callers were found by grep, so it is invoked (if at all) by service bootstrap code.
- vector_store.pg_ident(name)
Coerce an arbitrary string into a safe Postgres identifier.
Unlike _validate_ident() (which rejects unsafe input), this sanitises: it mirrors the underscore-only convention used by the migration (e.g. us_law, files_us_law). Non-alphanumeric characters collapse to a single _, an empty result becomes store, a leading digit is prefixed with s_, and the result is truncated to Postgres's 63-char identifier limit. Pure string helper with no I/O.
Used across the repo to derive schema/table names from store paths and collection names: anamnesis_engine, extract_tags_to_concepts, backfill_entity_provenance, rag_system/file_rag_manager, rag_system/pg_source_files (which then re-validates via _validate_ident()), tools/chromadb_tools, and scripts/migrate_vector_dbs_to_pgvector.
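A minimal sketch of the sanitising rules listed above. It assumes only ASCII letters and digits are kept, case is preserved, and leading/trailing underscores are trimmed before the empty check; the last two are guesses, since the description does not say.

```python
import re

_NON_ALNUM = re.compile(r"[^A-Za-z0-9]+")
_PG_MAX_IDENT = 63  # Postgres truncates identifiers longer than 63 bytes


def pg_ident(name: str) -> str:
    """Sanitise an arbitrary string into an underscore-only identifier."""
    # Collapse every run of non-alphanumeric characters into one "_",
    # then trim stray underscores at the ends (assumed behaviour).
    ident = _NON_ALNUM.sub("_", name).strip("_")
    if not ident:
        ident = "store"  # empty result falls back to "store"
    if ident[0].isdigit():
        ident = "s_" + ident  # unquoted identifiers cannot start with a digit
    return ident[:_PG_MAX_IDENT]


print(pg_ident("files/us-law"))  # files_us_law
```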
- vector_store.vector_literal(embedding)
Render an embedding vector as a pgvector text literal.
Produces the bracketed, comma-separated form ([v1,v2,...]) that pgvector's halfvec/vector input parser accepts, coercing each element to float and using repr so full precision survives the round-trip. Pure helper with no I/O. Called wherever a vector is bound into SQL within this module: _BaseCollection._upsert_rows() (per upserted row) and the query methods of both PgVectorCollection and AsyncPgVectorCollection, where it fills the reserved None placeholders.
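A sketch of the literal format described above; it is written from the description, so treat it as illustrative rather than the module's exact code.

```python
from typing import Iterable


def vector_literal(embedding: Iterable[float]) -> str:
    """Render floats as a pgvector text literal such as "[0.1,0.2]"."""
    # repr() gives the shortest string that round-trips to the same float.
    return "[" + ",".join(repr(float(v)) for v in embedding) + "]"


lit = vector_literal([1, 0.5, -2])
print(lit)  # [1.0,0.5,-2.0]
```

Such a literal is typically bound as an ordinary parameter and cast in SQL (for example %s::halfvec). Note that a halfvec column then stores each component at half precision, so repr protects the text round-trip rather than the stored value.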
- vector_store.l2_to_similarity(distance)
Convert an L2 distance to a cosine-style similarity in [0, 1].
Gemini gemini-embedding-001 at 3072 dims is pre-normalised, and for unit vectors L2^2 = 2 - 2*cos, so cos = 1 - L2^2/2. The result is clamped into [0, 1], which means anti-correlated vectors (negative cosine) report 0. This keeps the reported similarity aligned with the old Chroma 1 - distance convention. Pure helper with no I/O. Called by the query methods of both PgVectorCollection and AsyncPgVectorCollection to attach a similarity to each hit.
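The conversion follows from expanding the squared distance for unit vectors a and b: ||a - b||^2 = ||a||^2 + ||b||^2 - 2(a·b) = 2 - 2cos. A minimal sketch that checks this against a hand-made pair of unit vectors (illustrative values, not real embeddings):

```python
import math


def l2_to_similarity(distance: float) -> float:
    """Map an L2 distance between unit vectors to cosine, clamped to [0, 1]."""
    cos = 1.0 - (distance * distance) / 2.0
    return max(0.0, min(1.0, cos))


# Two unit vectors 45 degrees apart; their cosine is sqrt(0.5).
a = (1.0, 0.0)
b = (math.sqrt(0.5), math.sqrt(0.5))
d = math.dist(a, b)
print(round(l2_to_similarity(d), 6), round(a[0] * b[0] + a[1] * b[1], 6))
```

Opposite vectors sit at distance 2 with cosine -1, which the clamp reports as 0. The [0, 1] range therefore does not distinguish "unrelated" from "anti-correlated".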
- async vector_store.get_async_pool()
Return the asyncpg pool for the current event loop, creating it lazily.
Pools are cached per running event loop (keyed by id(loop) in the module-level _async_pools dict) because asyncpg pools are bound to the loop that created them; this matters in the multi-service bot, where worker threads run their own loops. On a cache miss it double-checks under _async_pool_lock, then opens an asyncpg.create_pool sized by _pool_sizes() and configured by _asyncpg_kwargs(), opening real Postgres connections and logging the creation. Awaited by warm_async_pool(), close_async_pool(), the ensure/upsert/query/get/delete/count methods of AsyncPgVectorCollection, and external async callers such as sword/overlay and tools/chromadb_tools.
- Returns:
The pool bound to the current event loop.
- Return type:
asyncpg.Pool
- async vector_store.warm_async_pool()
Warm the asyncpg pool and initialise the SWORD overlay cache schema.
A startup hook that establishes the per-loop pool via get_async_pool(), runs SELECT 1 to confirm connectivity, and then idempotently creates the sword_cache schema and its overlay_cache table (used by sword/overlay), so the first real overlay write does not pay DDL latency. Acquires a pooled connection and issues the statements; logs and re-raises on failure so a broken database surfaces at boot. Awaited by prompt_context during startup and by the SWORD overlay Postgres tests.
- async vector_store.close_async_pool()
Close and forget the asyncpg pool bound to the current event loop.
Pops the pool keyed by the running loop's id out of the module-level _async_pools cache and awaits its close(); a no-op if no pool exists for this loop. Intended for orderly shutdown of a worker's event loop. No internal callers were found in this module, and no external callers were found by grep, so it is invoked (if at all) by service teardown code.
- Raises:
RuntimeError – If called outside a running event loop (from asyncio.get_running_loop).
- vector_store.get_sync_pool()
Return the shared psycopg3 connection pool, creating it lazily.
Unlike the per-loop async pools, a single process-wide, thread-safe ConnectionPool serves all synchronous callers (worker threads, CLIs, scripts). On first use it double-checks under _sync_pool_lock, then opens an autocommit pool with conninfo from _psycopg_conninfo(), sized by _pool_sizes(), opening real Postgres connections and logging the creation. Called by warm_sync_pool(), list_store_schemas(), table_count(), every method of PgVectorCollection, and many external sync paths (rag_system/pg_source_files, rag_system/file_rag_manager, sword/overlay, and scripts/).
- Returns:
The shared autocommit pool.
- Return type:
psycopg_pool.ConnectionPool
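The thread-safe lazy creation can be sketched with an injected factory. In the real module the factory is presumably something along the lines of psycopg_pool.ConnectionPool(conninfo, min_size=..., max_size=..., kwargs={"autocommit": True}); the factory parameter here is hypothetical and exists so the sketch runs without Postgres.

```python
import threading
from typing import Any, Callable, Optional

_sync_pool: Optional[Any] = None
_sync_pool_lock = threading.Lock()


def get_sync_pool(factory: Callable[[], Any]) -> Any:
    """Create the process-wide pool exactly once across threads."""
    global _sync_pool
    pool = _sync_pool
    if pool is None:  # fast path skips the lock once the pool exists
        with _sync_pool_lock:
            if _sync_pool is None:  # re-check: another thread may have won
                _sync_pool = factory()
            pool = _sync_pool
    return pool
```

A plain threading.Lock suffices here because the factory call is blocking and the pool is shared by all threads, unlike the loop-bound async pools.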
- vector_store.warm_sync_pool()
Open the synchronous psycopg3 pool and verify connectivity.
Calls get_sync_pool() to lazily create the shared pool, then runs a trivial SELECT 1 on a borrowed connection so the first real query doesn't pay connection-setup latency. Mirrors warm_async_pool() for sync call sites (worker threads, CLIs). No internal or external callers were found by grep; it is a startup warm-up entry point for synchronous services.
- class vector_store.PgVectorCollection(schema, table, dim=3072)
Bases: _BaseCollection
Synchronous pgvector collection backed by the shared psycopg3 pool.
Concrete _BaseCollection for blocking call sites (worker threads, CLIs, and scripts/) that cannot await. Each method borrows a connection from get_sync_pool() and runs the driver-neutral SQL the base class builds (with psycopg placeholders). Exposes the Chroma-like surface ensure/upsert/query/get/delete/count/clear/drop; embeddings are always supplied by the caller. Wrapped by ChromaCompatCollection for auto-embedding call sites and instantiated directly by anamnesis_engine, extract_tags_to_concepts, backfill_entity_provenance, memory_search, and rag_system/file_rag_manager.
- ensure()
Create the collection's schema, table, and HNSW index if absent.
Borrows a connection from the synchronous pool (get_sync_pool()) and runs each idempotent DDL statement from _ensure_sql(). Safe to call repeatedly. Used directly (e.g. by rag_system/file_rag_manager and verification scripts) and internally by clear().
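_ensure_sql() itself is not shown in this reference. The sketch below reconstructs plausible idempotent statements from the table shape documented at the top of this page; the index name and the double-quoting of identifiers are assumptions.

```python
from typing import List


def ensure_sql(schema: str, table: str, dim: int = 3072) -> List[str]:
    """Idempotent DDL for one collection table (identifiers pre-validated)."""
    qualified = f'"{schema}"."{table}"'
    return [
        f'CREATE SCHEMA IF NOT EXISTS "{schema}"',
        f"CREATE TABLE IF NOT EXISTS {qualified} ("
        "id text PRIMARY KEY, document text, metadata jsonb, "
        f"embedding halfvec({dim}))",
        # Hypothetical index name; halfvec_l2_ops serves the <-> operator.
        f'CREATE INDEX IF NOT EXISTS "{table}_embedding_hnsw" '
        f"ON {qualified} USING hnsw (embedding halfvec_l2_ops)",
    ]


for stmt in ensure_sql("us_law", "files_us_law"):
    print(stmt)
```

Each statement could be run on a borrowed connection, e.g. inside `with pool.connection() as conn:` via `conn.execute(stmt)`. The halfvec type is likely chosen because pgvector's HNSW index supports up to 2,000 dimensions for vector but up to 4,000 for halfvec, and these embeddings have 3072.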
- upsert(ids, documents, metadatas=None, embeddings=None)
Insert or replace rows by primary key, requiring explicit embeddings.
Shapes the parallel lists via _upsert_rows(), builds the upsert SQL with _upsert_sql(), and runs it with cursor.executemany on a pooled (autocommit) connection, so each id is inserted or fully updated. Returns early when ids is empty. Unlike ChromaCompatCollection, this layer never embeds implicitly; callers supply Gemini embeddings.
Reached from synchronous indexing paths such as extract_tags_to_concepts and backfill_entity_provenance, which instantiate the collection and call upsert directly.
- Raises:
ValueError – If embeddings is None.
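A minimal sketch of the row shaping and ON CONFLICT statement described above, assuming psycopg-style %s placeholders and jsonb/halfvec casts. The helper names mirror _upsert_rows() and _upsert_sql() but the bodies are hypothetical, and the length check is this sketch's own safeguard.

```python
import json
from typing import Any, Dict, List, Optional, Sequence, Tuple

Row = Tuple[str, str, str, str]


def upsert_sql(schema: str, table: str) -> str:
    """ON CONFLICT upsert with psycopg-style %s placeholders."""
    return (
        f'INSERT INTO "{schema}"."{table}" (id, document, metadata, embedding) '
        "VALUES (%s, %s, %s::jsonb, %s::halfvec) "
        "ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document, "
        "metadata = EXCLUDED.metadata, embedding = EXCLUDED.embedding"
    )


def upsert_rows(
    ids: Sequence[str],
    documents: Sequence[str],
    metadatas: Optional[Sequence[Optional[Dict[str, Any]]]] = None,
    embeddings: Optional[Sequence[Sequence[float]]] = None,
) -> List[Row]:
    """Zip the parallel lists into one parameter tuple per row."""
    if embeddings is None:
        raise ValueError("embeddings are required; this layer never embeds")
    if metadatas is None:
        metadatas = [None] * len(ids)
    if not (len(ids) == len(documents) == len(metadatas) == len(embeddings)):
        raise ValueError("ids, documents, metadatas and embeddings must align")
    return [
        (i, d, json.dumps(m or {}), "[" + ",".join(repr(float(v)) for v in e) + "]")
        for i, d, m, e in zip(ids, documents, metadatas, embeddings)
    ]
```

A caller would then run `cur.executemany(upsert_sql(schema, table), rows)` on a pooled connection; psycopg's Cursor.executemany takes exactly this list of parameter tuples.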
- query(embedding, n_results=5, where=None, where_document=None)
Run a synchronous nearest-neighbour search and return scored hits.
Builds the query via _query_sql(), substitutes the rendered vector_literal() into the reserved None placeholders, executes it on a pooled connection, and maps each row to a dict including the raw L2 distance and a derived similarity from l2_to_similarity().
Used by synchronous search paths (e.g. rag_system/file_rag_manager via ChromaCompatCollection, and direct callers like anamnesis_engine).
- Returns:
Per-hit dicts with id, document, metadata, distance, and similarity, nearest first.
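_query_sql() is internal and its exact SQL is not shown here. The sketch reconstructs the described pattern (None reserved for the vector slots, filled after the statement is built) under these assumptions: distance uses pgvector's <-> L2 operator, which the halfvec_l2_ops index serves; where is a flat equality filter mapped to jsonb containment; where_document is omitted.

```python
import json
from typing import Any, Dict, List, Optional, Sequence, Tuple


def query_sql(
    schema: str, table: str, n_results: int, where: Optional[Dict[str, Any]] = None
) -> Tuple[str, List[Any]]:
    """Build a k-NN query; None marks the slots reserved for the vector."""
    params: List[Any] = [None]  # vector slot for the distance column
    where_sql = ""
    if where:
        # Equality-only filter via jsonb containment (a simplification).
        where_sql = " WHERE metadata @> %s::jsonb"
        params.append(json.dumps(where))
    params += [None, n_results]  # vector slot for ORDER BY, then LIMIT
    sql = (
        "SELECT id, document, metadata, embedding <-> %s::halfvec AS distance "
        f'FROM "{schema}"."{table}"{where_sql} '
        "ORDER BY embedding <-> %s::halfvec LIMIT %s"
    )
    return sql, params


def fill_vector(params: Sequence[Any], embedding: Sequence[float]) -> List[Any]:
    """Replace each reserved None slot with the rendered vector literal."""
    literal = "[" + ",".join(repr(float(v)) for v in embedding) + "]"
    return [literal if p is None else p for p in params]


def rows_to_hits(rows: Sequence[Tuple[str, str, Any, float]]) -> List[Dict[str, Any]]:
    """Attach a clamped cosine similarity to each (id, doc, meta, distance) row."""
    return [
        {"id": i, "document": d, "metadata": m, "distance": dist,
         "similarity": max(0.0, min(1.0, 1.0 - dist * dist / 2.0))}
        for i, d, m, dist in rows
    ]
```

Ordering by the same <-> expression that the HNSW operator class serves is what lets Postgres answer the LIMIT from the index instead of scanning every row.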
- get(ids=None, where=None, where_document=None, offset=None, limit=None)
Fetch rows by id and/or filter (no vector search) synchronously.
Builds the statement with _get_sql(), runs it on a pooled connection, and reshapes the rows via _rows_to_get(). Surfaced to Chroma-style callers through ChromaCompatCollection.get().
- delete(ids=None, where=None)
Delete rows by id and/or metadata filter synchronously.
No-ops when neither ids nor where is supplied (guarding against an accidental full-table delete); otherwise builds the statement via _delete_sql() and runs it on a pooled connection. Surfaced through ChromaCompatCollection.delete().
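The full-table-delete guard can be sketched as a statement builder that returns None when no filter is given. Three assumptions are made here: ids and where are combined with AND, an empty ids list counts as "not supplied", and where is an equality filter mapped to jsonb containment.

```python
import json
from typing import Any, Dict, List, Optional, Sequence, Tuple


def delete_sql(
    schema: str,
    table: str,
    ids: Optional[Sequence[str]] = None,
    where: Optional[Dict[str, Any]] = None,
) -> Optional[Tuple[str, List[Any]]]:
    """Build a filtered DELETE, or None when no filter is given."""
    if not ids and not where:
        return None  # refuse to emit an unfiltered, full-table DELETE
    clauses: List[str] = []
    params: List[Any] = []
    if ids:
        clauses.append("id = ANY(%s)")  # psycopg adapts a list to an array
        params.append(list(ids))
    if where:
        clauses.append("metadata @> %s::jsonb")
        params.append(json.dumps(where))
    sql = f'DELETE FROM "{schema}"."{table}" WHERE ' + " AND ".join(clauses)
    return sql, params
```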
- count()
Return the total number of rows in the collection.
Runs SELECT count(*) on a pooled connection. Surfaced through ChromaCompatCollection.count().
- Returns:
The row count (0 if the query yields no row).
- class vector_store.ChromaCompatCollection(pg, embedding_fn)
Bases: object
Chroma-compatible sync facade over PgVectorCollection.
Auto-embeds documents/queries via embedding_fn (e.g. SyncOpenRouterEmbeddings) so call sites that relied on Chroma's embedding_function keep working with minimal changes. query returns Chroma-shaped nested lists and reports distances as cosine distance (1 - similarity), so callers that compute 1 - distance recover a cosine similarity in [0, 1] exactly as they did against Chroma.
- Parameters:
pg (PgVectorCollection)
embedding_fn (Any)
- __init__(pg, embedding_fn)
Wrap a PgVectorCollection with an auto-embedding facade.
Stores the backing collection and an embedder. Instantiated by rag_system/file_rag_manager (self.collection = ChromaCompatCollection(self._pg, self.embedding_fn)) so existing Chroma-style call sites keep working over pgvector.
- Parameters:
pg (PgVectorCollection) – The synchronous pgvector collection doing the real I/O.
embedding_fn (Any) – An embedder exposing either embed_documents/embed_query methods or a plain callable taking a list of texts (e.g. SyncOpenRouterEmbeddings).
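The two embedder shapes accepted by __init__ could be normalised roughly as follows. The helper names echo the _embed_documents() and _embed_query() helpers mentioned on this page, but these bodies are hypothetical, and the two fake embedders exist only for illustration.

```python
from typing import Any, List, Sequence


def embed_documents(embedding_fn: Any, texts: Sequence[str]) -> List[List[float]]:
    """Prefer an embed_documents() method, else call the embedder directly."""
    if hasattr(embedding_fn, "embed_documents"):
        vectors = embedding_fn.embed_documents(list(texts))
    else:
        vectors = embedding_fn(list(texts))
    return [list(v) for v in vectors]


def embed_query(embedding_fn: Any, text: str) -> List[float]:
    """Prefer an embed_query() method, else embed a one-item batch."""
    if hasattr(embedding_fn, "embed_query"):
        return list(embedding_fn.embed_query(text))
    return list(embedding_fn([text])[0])


class FakeMethodEmbedder:
    # Hypothetical embedder exposing the method-style interface.
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return [[float(len(t))] for t in texts]

    def embed_query(self, text: str) -> List[float]:
        return [float(len(text))]


def fake_callable_embedder(texts: List[str]) -> List[List[float]]:
    # Hypothetical embedder that is just a function over a batch of texts.
    return [[1.0, float(len(t))] for t in texts]
```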
- ensure()
Ensure the backing collection's schema/table/index exist.
Thin pass-through to PgVectorCollection.ensure().
- upsert(ids, documents, metadatas=None, embeddings=None)
Upsert documents, auto-embedding them when no vectors are given.
Computes embeddings via _embed_documents() if embeddings is None (the Chroma embedding_function behaviour), then delegates to PgVectorCollection.upsert().
- add(ids, documents, metadatas=None, embeddings=None)
Alias for upsert(), matching Chroma's add API.
Forwards all arguments to upsert() so call sites using Chroma's collection.add semantics keep working. Because it is an alias, rows whose ids already exist are overwritten rather than rejected.
- get(ids=None, where=None, where_document=None, limit=None, offset=None, include=None)
Fetch rows by id/filter, mirroring Chroma's get signature.
Delegates to PgVectorCollection.get(). The Chroma-only include argument is accepted for signature compatibility but ignored (this layer always returns ids/documents/metadatas).
- Returns:
A dict with "ids", "documents", and "metadatas" lists.
- delete(ids=None, where=None)
Delete rows by id and/or metadata filter.
Thin pass-through to PgVectorCollection.delete().
- count()
Return the collection's row count via PgVectorCollection.count().
- Returns:
Total number of stored rows.
- query(query_embeddings=None, query_texts=None, n_results=10, where=None, where_document=None, include=None)
Run a nearest-neighbour search returning Chroma-shaped nested lists.
Resolves the query vector from query_embeddings (unwrapping a list-of-lists batch to its first row, so any further rows are ignored) or by embedding query_texts[0] via _embed_query(); with neither, it returns the empty Chroma shape. Delegates the search to PgVectorCollection.query(), then repackages the results into single-batch nested lists and converts each similarity back to a cosine distance (1 - similarity), so callers computing 1 - distance recover the similarity exactly as they did against Chroma. The include argument is ignored.
- Parameters:
query_embeddings (Any) – A query vector or a one-element batch of vectors, or None.
query_texts (Any) – A list of query strings (only the first is used), or None.
n_results (int) – Maximum neighbours to return.
where (Any) – Chroma-style metadata filter, or None.
where_document (Any) – Chroma-style document filter, or None.
include (Any) – Accepted for Chroma compatibility; ignored.
- Returns:
A dict with keys "ids", "documents", "metadatas", and "distances", each value wrapped in a single-query outer list; distances are cosine distances. Returns empty single-batch lists when no query is given.
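The vector resolution and Chroma repackaging described above can be sketched as two pure functions over the hit dicts returned by PgVectorCollection.query(). The function names are hypothetical, and the sketch handles only list/tuple batches (not, say, NumPy arrays).

```python
from typing import Any, Dict, List, Optional


def resolve_query_vector(query_embeddings: Any) -> Optional[List[float]]:
    """Accept a bare vector or a batch of vectors; keep only the first row."""
    if query_embeddings is None or len(query_embeddings) == 0:
        return None
    first = query_embeddings[0]
    if isinstance(first, (list, tuple)):
        return list(first)  # list-of-lists batch: use the first query only
    return list(query_embeddings)


def to_chroma_shape(hits: List[Dict[str, Any]]) -> Dict[str, List[List[Any]]]:
    """Wrap hits in one outer batch and turn similarity into cosine distance."""
    return {
        "ids": [[h["id"] for h in hits]],
        "documents": [[h["document"] for h in hits]],
        "metadatas": [[h["metadata"] for h in hits]],
        "distances": [[1.0 - h["similarity"] for h in hits]],
    }
```

Note that the raw L2 distance on each hit is discarded here: the reported "distances" are derived from the clamped similarity, and to_chroma_shape([]) yields the same empty single-batch lists as the no-query case.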
- class vector_store.AsyncPgVectorCollection(schema, table, dim=3072)
Bases: _BaseCollection
Asynchronous pgvector collection backed by the per-loop asyncpg pool.
Concrete _BaseCollection for genuinely-async hot paths running on the bot event loop. Each coroutine acquires a connection from get_async_pool() and runs the driver-neutral SQL the base class builds (with asyncpg numbered placeholders). Exposes the async Chroma-like surface ensure/upsert/query/get/delete/count/clear; embeddings are always supplied by the caller. Drives async paths such as prompt_context._divine_reflex (Golden Goddess reflex), tools/query_golden_goddess_v2, and tools/chromadb_tools.
- async ensure()
Create the schema, table, and HNSW index for this collection (async).
Async counterpart of PgVectorCollection.ensure(): runs the idempotent DDL from _ensure_sql() on a connection acquired from get_async_pool(). Called by tools/chromadb_tools before upserting into a vector DB, and internally by clear().
- async upsert(ids, documents, metadatas=None, embeddings=None)
Insert or update rows on the async (asyncpg) path.
No-ops on empty ids; otherwise rows are built by _upsert_rows(), the SQL by _upsert_sql(), and the statement is executed with executemany on a pooled connection from get_async_pool(). Used by async ingestion code such as tools/chromadb_tools.
- Raises:
ValueError – If embeddings is None.
- async query(embedding, n_results=5, where=None, where_document=None)
Run an async nearest-neighbour search and return scored hits.
Builds the SQL via _query_sql(), fills the reserved vector slots with the rendered vector_literal(), fetches rows on a pooled connection from get_async_pool(), and maps each row to a dict with the raw L2 distance and a similarity from l2_to_similarity().
Drives async hot paths such as prompt_context._divine_reflex (Golden Goddess reflex), tools/query_golden_goddess_v2, and tools/chromadb_tools.
- Returns:
Per-hit dicts with id, document, metadata, distance, and similarity, nearest first.
- async get(ids=None, where=None, where_document=None, offset=None, limit=None)
Fetch rows by id and/or filters on the async path.
Builds the statement via _get_sql(), fetches rows on a pooled connection from get_async_pool(), then normalises the asyncpg Record rows into Chroma-style parallel lists via _rows_to_get(). Used by async tools like tools/chromadb_tools.
- async delete(ids=None, where=None)
Delete rows by id list and/or metadata filter (async).
No-ops when neither ids nor where is given. Otherwise builds the statement via _delete_sql() and executes it on a pooled connection from get_async_pool(). Used by async maintenance code such as tools/chromadb_tools.
- async count()
Return the collection's row count (async).
Runs SELECT count(*) via fetchval on a pooled connection from get_async_pool(). Used by async tools such as tools/chromadb_tools.
- Returns:
The row count, or 0 when the value is None.
- async clear()
Empty the collection on the async path, ensuring it exists first.
Calls ensure(), then TRUNCATEs the table on a pooled connection from get_async_pool(); the schema and HNSW index are preserved. No internal or grep-found external callers; invoked ad hoc by async maintenance code.
- vector_store.list_store_schemas()
List the user (non-system) schemas in the vector database.
Replaces the old filesystem listing of rag_stores/ directories: it queries information_schema.schemata on the synchronous pool (get_sync_pool()), excluding Postgres's built-in and default schemas (pg_catalog, information_schema, public, pg_toast, and the pg_temp_*/pg_toast_temp_* families) so only logical store schemas remain. No callers were found by grep, so it is used (if at all) by introspection tooling or scripts.
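The exclusion rules can be expressed as a small predicate, with a possible server-side equivalent alongside. Both are sketches of the rules listed above, not the function's actual query; the SQL escapes "_" with backslashes because "_" is a LIKE wildcard, which relies on standard_conforming_strings being on (the default since PostgreSQL 9.1).

```python
SYSTEM_SCHEMAS = frozenset({"pg_catalog", "information_schema", "public", "pg_toast"})
TEMP_PREFIXES = ("pg_temp_", "pg_toast_temp_")


def is_store_schema(name: str) -> bool:
    """True for schemas that hold logical vector stores."""
    return name not in SYSTEM_SCHEMAS and not name.startswith(TEMP_PREFIXES)


# Possible server-side equivalent of the same filter.
LIST_STORE_SCHEMAS_SQL = (
    "SELECT schema_name FROM information_schema.schemata "
    "WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'public', 'pg_toast') "
    "AND schema_name NOT LIKE 'pg\\_temp\\_%' "
    "AND schema_name NOT LIKE 'pg\\_toast\\_temp\\_%'"
)
```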
- vector_store.table_count(schema, table)
Return the row count for a given schema.table, or 0 if absent.
Validates both identifiers with _validate_ident() (since the schema-qualified name is interpolated into the count(*) query), then checks information_schema.tables to confirm the table exists before counting, so a missing collection yields 0 rather than raising. Runs on the synchronous pool from get_sync_pool(). No callers were found by grep, so it is used (if at all) by introspection/verification tooling.
- Returns:
The row count, or 0 when the table does not exist.
- Raises:
ValueError – If schema or table is not a safe identifier.
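A sketch of the validate-then-interpolate pattern. The identifier rule below (a letter or underscore followed by up to 62 letters, digits, or underscores) is an assumption; _validate_ident()'s actual rule is not documented here. The existence check uses ordinary bound parameters, since only the count(*) query needs interpolated names.

```python
import re

# Hypothetical rule: a plain identifier of at most 63 characters.
_IDENT_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")

TABLE_EXISTS_SQL = (
    "SELECT 1 FROM information_schema.tables "
    "WHERE table_schema = %s AND table_name = %s"
)


def validate_ident(name: str) -> str:
    """Reject anything that is not a plain, safe identifier."""
    if not isinstance(name, str) or not _IDENT_RE.fullmatch(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return name


def count_sql(schema: str, table: str) -> str:
    """Interpolate only after both names pass validation."""
    return f'SELECT count(*) FROM "{validate_ident(schema)}"."{validate_ident(table)}"'
```

With psycopg, a caller could run `conn.execute(TABLE_EXISTS_SQL, (schema, table)).fetchone()` and return 0 when it yields None, before executing count_sql(schema, table).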