"""Shared Postgres + pgvector store layer.
This module replaces the legacy ``chroma_registry`` / on-disk ChromaDB
``PersistentClient`` model. All vector storage now lives in a single
Postgres database (``stargazer_filerag``) using one **schema per logical
store** and one table per collection with the shape::
id text PRIMARY KEY
document text
metadata jsonb
embedding halfvec(3072) -- HNSW index, halfvec_l2_ops (L2)
Embeddings are **always supplied by the caller** (computed via the shared
Gemini embed pool); this layer never embeds implicitly the way a ChromaDB
``embedding_function`` did.
Two drivers are provided because the codebase mixes async and sync call
sites:
* :class:`AsyncPgVectorCollection` (asyncpg) for genuinely-async hot paths
running on the bot event loop (e.g. ``prompt_context._divine_reflex``).
* :class:`PgVectorCollection` (psycopg3, thread-safe pool) for synchronous
code: ``FileRAGManager`` worker threads, CLIs, and ``scripts/``.
Both expose the same Chroma-like surface: ``ensure`` / ``upsert`` /
``query`` / ``get`` / ``delete`` / ``count`` / ``clear``.
Built by Stargazer Project.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import threading
from typing import Any, Iterable, Sequence
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Default embedding width: _BaseCollection uses it as the default ``dim`` for
# the ``halfvec(dim)`` column it declares and casts vectors to.
EMBED_DIM = 3072
"""All migrated stores use Gemini ``gemini-embedding-001`` at 3072 dims."""
# Allow-list for schema/table names, which are interpolated into SQL as quoted
# identifiers (see _validate_ident): ASCII letters, digits and underscore only.
_IDENT_RE = re.compile(r"^[A-Za-z0-9_]+$")
# Default connection target. Mirrors the ``_DEFAULT_OPENROUTER_KEY`` pattern
# already used in ``gemini_embed_pool`` so CLIs/scripts work out of the box;
# overridable via env (``STARGAZER_VECTOR_PG_*`` / ``STARGAZER_VECTOR_PG_DSN``)
# or programmatically via :func:`configure`.
#
# NOTE(review, security): the "password" entry below is a plaintext database
# credential committed to source. If it is a real credential, anyone who can
# read this file and reach the host can authenticate as this user.
# Recommended: rotate it, supply it only via STARGAZER_VECTOR_PG_PASSWORD or
# STARGAZER_VECTOR_PG_DSN, and remove it from these defaults. It is left
# unchanged here only so existing CLIs/scripts keep working.
_DEFAULT_CONN: dict[str, Any] = {
    "host": "10.10.0.7",
    "port": 5432,
    "database": "stargazer_filerag",
    "user": "stargazer",
    "password": "r9yJpys5vLFrnrx7YuYCRiwF3VW8",
    "sslmode": "prefer",
    # Pool bounds read by _pool_sizes() for both the asyncpg and psycopg3 pools.
    "min_size": 1,
    "max_size": 8,
}
# Not referenced by the code in this section; presumably serialises
# configure() updates to _conn_params (TODO confirm against configure()).
_lock = threading.Lock()
# Memoised result of _resolve_params(); None until the first resolution.
_conn_params: dict[str, Any] | None = None
# ---------------------------------------------------------------------------
# Connection-parameter resolution
# ---------------------------------------------------------------------------
def _resolve_params() -> dict[str, Any]:
    """Resolve and cache the effective Postgres connection parameters.

    Returns the cached ``_conn_params`` if already populated. Otherwise it
    builds a fresh dict from :data:`_DEFAULT_CONN`, overlays any
    ``STARGAZER_VECTOR_PG_*`` environment variables (and
    ``STARGAZER_VECTOR_PG_DSN`` as a full ``dsn``), and memoises the result.
    Environment values take precedence over the module defaults.

    ``port``/``min_size``/``max_size`` are coerced to ``int``. A value that
    does not parse as an integer is ignored and the default is kept. A warning
    is logged so a typo such as ``STARGAZER_VECTOR_PG_PORT=54x2`` does not
    silently connect to the default port. Blank env values are skipped.

    Reads ``os.environ`` and mutates the module-level cache. Called by
    :func:`configure`, :func:`_asyncpg_kwargs`, :func:`_psycopg_conninfo`, and
    :func:`_pool_sizes`.

    Returns:
        dict[str, Any]: The resolved connection parameters.
    """
    global _conn_params
    if _conn_params is not None:
        return _conn_params
    dsn = os.environ.get("STARGAZER_VECTOR_PG_DSN", "").strip()
    params = dict(_DEFAULT_CONN)
    if dsn:
        params["dsn"] = dsn
    env_map = {
        "STARGAZER_VECTOR_PG_HOST": "host",
        "STARGAZER_VECTOR_PG_PORT": "port",
        "STARGAZER_VECTOR_PG_DATABASE": "database",
        "STARGAZER_VECTOR_PG_USER": "user",
        "STARGAZER_VECTOR_PG_PASSWORD": "password",
        "STARGAZER_VECTOR_PG_SSLMODE": "sslmode",
        "STARGAZER_VECTOR_PG_MIN_SIZE": "min_size",
        "STARGAZER_VECTOR_PG_MAX_SIZE": "max_size",
    }
    for env_var, key in env_map.items():
        val = os.environ.get(env_var)
        if val is None or not val.strip():
            continue
        if key in ("port", "min_size", "max_size"):
            try:
                params[key] = int(val)
            except ValueError:
                # Keep the default, but make the misconfiguration visible.
                # Only integer-typed keys reach here, so no secret is logged.
                logger.warning(
                    "vector_store: ignoring %s=%r (not an integer); using %r",
                    env_var,
                    val,
                    params.get(key),
                )
        else:
            params[key] = val.strip()
    _conn_params = params
    return params
def _asyncpg_kwargs() -> dict[str, Any]:
    """Build the keyword arguments for ``asyncpg.create_pool``.

    Reads the resolved connection params and returns one of two shapes:

    * a single ``dsn`` entry, when a DSN string was configured; or
    * the discrete ``host``/``port``/``user``/``password``/``database`` fields
      asyncpg expects, plus ``ssl`` set to the configured ``sslmode`` string
      when one is present.

    An empty password is normalised to ``None``.

    Forwarding ``sslmode`` keeps the async driver consistent with
    :func:`_psycopg_conninfo`, which passes it to libpq. Without it, a setting
    such as ``STARGAZER_VECTOR_PG_SSLMODE=require`` would only be enforced on
    the synchronous path.

    Calls :func:`_resolve_params`. Called by :func:`get_async_pool` when it
    constructs the per-event-loop asyncpg pool.

    Returns:
        dict[str, Any]: Keyword arguments suitable for ``asyncpg.create_pool``.
    """
    p = _resolve_params()
    if p.get("dsn"):
        return {"dsn": p["dsn"]}
    kwargs: dict[str, Any] = {
        "host": p["host"],
        "port": int(p["port"]),
        "user": p["user"],
        "password": p["password"] or None,
        "database": p["database"],
    }
    sslmode = p.get("sslmode")
    if sslmode:
        # asyncpg accepts libpq sslmode strings ("disable" .. "verify-full").
        kwargs["ssl"] = sslmode
    return kwargs
def _psycopg_conninfo() -> str:
"""Build the libpq-style ``conninfo`` string for the psycopg3 pool.
Returns the configured ``dsn`` verbatim when present; otherwise assembles a
space-separated ``key=value`` connection string from the discrete params,
including ``password`` and ``sslmode`` only when set.
Calls :func:`_resolve_params`. Called by :func:`get_sync_pool` when it opens
the shared synchronous ``ConnectionPool``. No external callers found.
Returns:
str: A libpq ``conninfo`` string consumable by psycopg3.
"""
p = _resolve_params()
if p.get("dsn"):
return p["dsn"]
parts = [
f"host={p['host']}",
f"port={int(p['port'])}",
f"dbname={p['database']}",
f"user={p['user']}",
]
if p.get("password"):
parts.append(f"password={p['password']}")
if p.get("sslmode"):
parts.append(f"sslmode={p['sslmode']}")
return " ".join(parts)
def _pool_sizes() -> tuple[int, int]:
    """Report the configured connection-pool bounds.

    Looks up ``min_size`` and ``max_size`` in the params returned by
    :func:`_resolve_params`, falling back to ``1`` and ``8`` respectively,
    and converts both to ``int``. Shared by :func:`get_async_pool` and
    :func:`get_sync_pool` so both drivers size their pools identically.

    Returns:
        tuple[int, int]: ``(min_size, max_size)``.
    """
    resolved = _resolve_params()
    lower = int(resolved.get("min_size", 1))
    upper = int(resolved.get("max_size", 8))
    return lower, upper
def _validate_ident(name: str, kind: str) -> str:
    """Validate that *name* is a safe Postgres identifier and return it.

    Identifiers are interpolated directly into SQL DDL/DML (schema and table
    names). This function guards against injection by requiring the whole of
    *name* to match ``_IDENT_RE`` (ASCII alphanumerics and underscores only).
    Unlike :func:`pg_ident`, it does not sanitise; it rejects.

    The check uses ``fullmatch``. With ``match``, the pattern's trailing ``$``
    also matches just before a final newline, so ``"abc\\n"`` would have been
    accepted.

    Called throughout this module wherever a schema/table name reaches SQL:
    :class:`_BaseCollection.__init__`, :func:`_ensure_sql`, and
    :func:`table_count`. Also imported by ``rag_system/pg_source_files.py``,
    which pairs it with :func:`pg_ident` to validate a sanitised schema name.

    Args:
        name: The candidate identifier (schema or table name).
        kind: A short label (e.g. ``"schema"``/``"table"``) used in the error
            message.

    Returns:
        str: The validated *name*, unchanged.

    Raises:
        ValueError: If *name* is not a string, is empty, or contains
            characters outside ``[A-Za-z0-9_]`` (including a trailing newline).
    """
    if not isinstance(name, str) or not name or not _IDENT_RE.fullmatch(name):
        raise ValueError(f"Unsafe {kind} identifier: {name!r}")
    return name
[docs]
def pg_ident(name: str) -> str:
    """Turn any string into a Postgres-safe, underscore-style identifier.

    This is the sanitising counterpart of :func:`_validate_ident`, which
    rejects bad input instead. It follows the migration's naming convention
    (e.g. ``us_law``, ``files_us_law``):

    * every run of characters outside ``[A-Za-z0-9]`` (underscores included)
      becomes one ``_``;
    * leading and trailing underscores are stripped;
    * an empty result becomes ``store``;
    * a result starting with a digit gets an ``s_`` prefix;
    * the result is cut to Postgres's 63-character identifier limit.

    Pure string transformation with no I/O. Used across the repo to derive
    schema/table names from store paths and collection names:
    ``anamnesis_engine``, ``extract_tags_to_concepts``,
    ``backfill_entity_provenance``, ``rag_system/file_rag_manager``,
    ``rag_system/pg_source_files`` (which then re-validates via
    :func:`_validate_ident`), ``tools/chromadb_tools``, and
    ``scripts/migrate_vector_dbs_to_pgvector``.

    Args:
        name: The raw name to sanitise (e.g. a store directory or collection).

    Returns:
        str: A Postgres-safe identifier of at most 63 characters.
    """
    # Replacing each non-alnum char with "_" and then collapsing "_" runs is
    # the same as replacing each run of non-alnum chars (underscores
    # included) with a single "_".
    cleaned = re.sub(r"[^A-Za-z0-9]+", "_", name or "").strip("_") or "store"
    if cleaned[0].isdigit():
        cleaned = f"s_{cleaned}"
    return cleaned[:63]
# ---------------------------------------------------------------------------
# Helpers: vector / metadata / filter translation
# ---------------------------------------------------------------------------
[docs]
def vector_literal(embedding: Sequence[float]) -> str:
    """Format an embedding as pgvector's textual array literal.

    Each component is converted with ``float`` and rendered with ``repr``,
    which preserves full precision through the round-trip. The components are
    joined with commas (no spaces) inside square brackets, the form accepted
    by the ``halfvec``/``vector`` input parser. No I/O.

    Used wherever this module binds a vector into SQL:
    :meth:`_BaseCollection._upsert_rows` (once per row) and the ``query``
    methods of :class:`PgVectorCollection` and
    :class:`AsyncPgVectorCollection`, which put the result into the reserved
    ``None`` placeholder slots.

    Args:
        embedding: The numeric embedding vector.

    Returns:
        str: Text of the form ``[v1,v2,...]``.
    """
    body = ",".join(map(repr, map(float, embedding)))
    return f"[{body}]"
def _coerce_metadata(meta: Any) -> dict[str, Any]:
"""Normalise a row's ``metadata`` value into a plain ``dict``.
Postgres ``jsonb`` may surface to a driver as an already-parsed ``dict`` or
as a raw JSON ``str`` (asyncpg returns the latter unless a codec is set), so
this decodes strings via ``json.loads`` and treats anything unparseable or
of an unexpected type as an empty mapping.
Pure helper with no I/O. Called when shaping query/get results back into
Python dicts: :meth:`PgVectorCollection.query`, :meth:`_BaseCollection._rows_to_get`,
and :meth:`AsyncPgVectorCollection.query`.
Args:
meta: A metadata value from a result row (``None``, JSON ``str``,
``dict``, or other).
Returns:
dict[str, Any]: The decoded metadata, or ``{}`` when absent/invalid.
"""
if meta is None:
return {}
if isinstance(meta, str):
try:
return json.loads(meta)
except (ValueError, TypeError):
return {}
if isinstance(meta, dict):
return meta
return {}
[docs]
def l2_to_similarity(distance: float | None) -> float | None:
    """Map a pgvector L2 distance onto a cosine-style similarity in ``[0, 1]``.

    Gemini ``gemini-embedding-001`` vectors at 3072 dims are pre-normalised.
    For unit vectors ``L2^2 = 2 - 2*cos``, so ``cos = 1 - L2^2 / 2``. The
    value is then clamped to ``[0, 1]`` so the reported ``similarity`` follows
    the old Chroma ``1 - distance`` convention. No I/O. Used by the ``query``
    methods of :class:`PgVectorCollection` and
    :class:`AsyncPgVectorCollection` to attach a ``similarity`` to each hit.

    Args:
        distance: The raw L2 distance from the ``<->`` operator, or ``None``.

    Returns:
        float | None: The clamped similarity, or ``None`` for a ``None``
        distance.
    """
    if distance is None:
        return None
    similarity = 1.0 - (distance * distance) / 2.0
    # Clamp into [0, 1]; a NaN passes through because both comparisons fail.
    if similarity < 0.0:
        similarity = 0.0
    elif similarity > 1.0:
        similarity = 1.0
    return similarity
class _ParamBuilder:
"""Accumulates bind values and emits driver-correct SQL placeholders.
Lets one set of SQL builders target both drivers used in this module:
asyncpg (numbered ``$1``, ``$2`` … placeholders) and psycopg (``%s``).
Each :meth:`add` records a value in call order and hands back the matching
placeholder text, so the accumulated :attr:`values` list lines up
positionally with the assembled statement. A fresh instance is created per
statement by every SQL builder here (:func:`_translate_clause`,
:func:`_build_filter_sql`, :meth:`_BaseCollection._upsert_sql`,
``_query_sql``, ``_get_sql``, ``_delete_sql``).
"""
def __init__(self, style: str):
"""Initialise an empty parameter accumulator for one driver style.
Args:
style: Either ``"asyncpg"`` (emits numbered ``$1``, ``$2`` …
placeholders) or ``"psycopg"`` (emits ``%s`` placeholders).
"""
self.style = style # "asyncpg" | "psycopg"
self.values: list[Any] = []
def add(self, value: Any) -> str:
"""Record a bind value and return its driver-correct placeholder.
Appends *value* to :attr:`values` (preserving call order) and returns
the placeholder text to splice into the SQL string. Used by every SQL
builder in this module (:func:`_translate_clause`,
:func:`_build_filter_sql`, :meth:`_BaseCollection._query_sql`, etc.) so
the same builder code targets both asyncpg and psycopg.
Args:
value: The Python value to bind for this placeholder.
Returns:
str: ``"$N"`` for the asyncpg style (1-based positional index) or
``"%s"`` for psycopg.
"""
self.values.append(value)
if self.style == "asyncpg":
return f"${len(self.values)}"
return "%s"
def _translate_clause(node: Any, pb: _ParamBuilder) -> str | None:
"""Translate a Chroma-style ``where`` dict into a SQL boolean clause.
Recursively walks the filter, handling the boolean combinators ``$and`` and
``$or`` plus the per-field operators ``$eq`` / ``$ne`` / ``$in`` /
``$contains`` and bare equality ``{"field": value}``. Field comparisons run
against the ``jsonb`` metadata column as text (``metadata->>key``), and every
operand is parameterised through *pb* (never inlined) so the clause is
injection-safe; values are coerced to ``str`` to match ``->>`` semantics.
Unknown operators fall back to equality.
Called by :func:`_build_filter_sql` and :meth:`_BaseCollection._delete_sql`,
and recursively by itself for nested ``$and`` / ``$or`` branches.
Args:
node: A Chroma-style filter dict (or anything else, treated as no
clause).
pb: The :class:`_ParamBuilder` collecting bind values for this query.
Returns:
str | None: A SQL boolean fragment, or ``None`` when *node* yields no
predicate.
"""
if not node or not isinstance(node, dict):
return None
clauses: list[str] = []
for key, val in node.items():
if key == "$and":
sub = [c for c in (_translate_clause(x, pb) for x in val) if c]
if sub:
clauses.append("(" + " AND ".join(sub) + ")")
continue
if key == "$or":
sub = [c for c in (_translate_clause(x, pb) for x in val) if c]
if sub:
clauses.append("(" + " OR ".join(sub) + ")")
continue
col = f"metadata->>{pb.add(key)}"
if isinstance(val, dict):
for op, opval in val.items():
if op == "$eq":
clauses.append(f"{col} = {pb.add(str(opval))}")
elif op == "$ne":
clauses.append(
f"({col} IS DISTINCT FROM {pb.add(str(opval))})"
)
elif op == "$contains":
clauses.append(f"{col} ILIKE {pb.add('%' + str(opval) + '%')}")
elif op == "$in":
vals = [str(x) for x in (opval or [])]
clauses.append(f"{col} = ANY({pb.add(vals)})")
else:
clauses.append(f"{col} = {pb.add(str(opval))}")
else:
clauses.append(f"{col} = {pb.add(str(val))}")
if not clauses:
return None
return " AND ".join(clauses)
def _translate_where_document(where_document: Any, pb: _ParamBuilder) -> str | None:
"""Translate a Chroma ``where_document`` filter into a SQL clause.
Supports only the ``$contains`` operator, mapping it to a case-insensitive
``document ILIKE '%…%'`` predicate; any other shape yields no clause. Binds
the search pattern via *pb* so the substring is parameterised, not inlined.
Called by :func:`_build_filter_sql`, which combines its result with the
metadata ``where`` clause. No external callers.
Args:
where_document: A Chroma-style document filter dict (e.g.
``{"$contains": "text"}``) or anything else (ignored).
pb: The :class:`_ParamBuilder` collecting bind values for this query.
Returns:
str | None: A SQL boolean fragment, or ``None`` when no filter applies.
"""
if not where_document or not isinstance(where_document, dict):
return None
contains = where_document.get("$contains")
if contains:
return f"document ILIKE {pb.add('%' + str(contains) + '%')}"
return None
def _build_filter_sql(
    pb: _ParamBuilder,
    where: Any = None,
    where_document: Any = None,
) -> str:
    """Produce the ``WHERE`` suffix for a metadata filter plus a document filter.

    The metadata filter goes through :func:`_translate_clause` and the
    document filter through :func:`_translate_where_document`. Whichever
    fragments come back non-empty are ``AND``-ed together. The result is
    either ``" WHERE …"`` (with a leading space) or ``""``, so callers can
    append it unconditionally. Bind values accumulate in *pb*.

    Used by :meth:`_BaseCollection._query_sql` and
    :meth:`_BaseCollection._get_sql`.

    Args:
        pb: The :class:`_ParamBuilder` collecting bind values.
        where: A Chroma-style metadata filter dict, or ``None``.
        where_document: A Chroma-style document filter dict, or ``None``.

    Returns:
        str: ``" WHERE …"`` or ``""`` when neither filter yields a clause.
    """
    # Order matters: the metadata clause must claim its placeholders before
    # the document clause so the bind values stay positionally aligned.
    fragments = [
        fragment
        for fragment in (
            _translate_clause(where, pb),
            _translate_where_document(where_document, pb),
        )
        if fragment
    ]
    if fragments:
        return " WHERE " + " AND ".join(fragments)
    return ""
# ---------------------------------------------------------------------------
# Async pool (asyncpg) — keyed per running event loop
# ---------------------------------------------------------------------------
# Cached asyncpg pools, keyed by id() of the event loop that created them.
_async_pools: dict[int, Any] = {}


class _PerLoopAsyncLock:
    """``async with``-compatible mutex that keeps a separate lock per event loop.

    A single module-level ``asyncio.Lock`` cannot safely be shared by several
    event loops:

    * On Python 3.10+ it binds to the first loop that has to wait on it, and
      a later waiter on another loop raises ``RuntimeError``. Older versions
      bind it at construction.
    * asyncio primitives are not thread-safe, so a release on one thread's
      loop cannot reliably wake a waiter parked on another thread's loop.

    The per-loop pool cache exists because worker threads run their own
    loops, so each running loop gets its own ``asyncio.Lock`` here. The
    registry itself is guarded by a ``threading.Lock``.
    """

    def __init__(self) -> None:
        self._guard = threading.Lock()
        # id(loop) -> (loop, lock). Holding the loop object stops its id from
        # being recycled by a later loop while the entry exists, so a new loop
        # can never inherit a lock bound to a dead one.
        self._locks: dict[int, tuple[Any, asyncio.Lock]] = {}

    def _current(self) -> asyncio.Lock:
        """Return (creating on first use) the lock for the running loop."""
        loop = asyncio.get_running_loop()
        with self._guard:
            entry = self._locks.get(id(loop))
            if entry is None:
                entry = (loop, asyncio.Lock())
                self._locks[id(loop)] = entry
            return entry[1]

    async def __aenter__(self) -> None:
        await self._current().acquire()

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Same task, same loop as __aenter__, so this is the lock it acquired.
        self._current().release()


# Serialises pool creation within each event loop (see _PerLoopAsyncLock).
_async_pool_lock = _PerLoopAsyncLock()
[docs]
async def get_async_pool():
    """Fetch, or lazily build, the asyncpg pool for the running event loop.

    asyncpg pools belong to the loop that created them. Pools are therefore
    cached in ``_async_pools`` under ``id(loop)``, so that worker threads with
    their own loops in the multi-service bot each get a separate pool.

    A cached pool is returned straight away. Otherwise the cache is checked
    again while holding ``_async_pool_lock``. Only if it is still empty does
    the function open a new ``asyncpg.create_pool``, sized by
    :func:`_pool_sizes` and configured by :func:`_asyncpg_kwargs`. The new
    pool is stored and its creation logged.

    Awaited by :func:`warm_async_pool`, :func:`close_async_pool`, the
    ``ensure``/``upsert``/``query``/``get``/``delete``/``count`` methods of
    :class:`AsyncPgVectorCollection`, and external async callers such as
    ``sword/overlay`` and ``tools/chromadb_tools``.

    Returns:
        asyncpg.Pool: The pool bound to the current event loop.
    """
    import asyncpg

    loop_key = id(asyncio.get_running_loop())
    cached = _async_pools.get(loop_key)
    if cached is not None:
        return cached
    async with _async_pool_lock:
        # Another coroutine may have built the pool while we waited.
        cached = _async_pools.get(loop_key)
        if cached is not None:
            return cached
        lower, upper = _pool_sizes()
        cached = await asyncpg.create_pool(
            min_size=lower,
            max_size=upper,
            command_timeout=60,
            **_asyncpg_kwargs(),
        )
        _async_pools[loop_key] = cached
        logger.info("vector_store: created asyncpg pool for loop %s", loop_key)
    return cached
[docs]
async def warm_async_pool() -> None:
    """Warm the asyncpg pool and initialise the SWORD overlay cache schema.

    A startup hook that:

    1. establishes the per-loop pool via :func:`get_async_pool`;
    2. runs ``SELECT 1`` to confirm connectivity;
    3. idempotently creates the ``sword_cache`` schema and its
       ``overlay_cache`` table (used by ``sword/overlay``), so the first real
       overlay write does not pay DDL latency.

    If the DDL fails, the error is logged with its traceback and re-raised, so
    a broken database surfaces at boot. Awaited by ``prompt_context`` during
    startup and by the SWORD overlay Postgres tests.

    Raises:
        Exception: If the SWORD cache schema/table cannot be created (the
            ``SELECT 1`` connectivity error propagates likewise).
    """
    pool = await get_async_pool()
    async with pool.acquire() as conn:
        await conn.execute("SELECT 1")
        try:
            # Ensure SWORD overlay cache table exists at startup
            await conn.execute('CREATE SCHEMA IF NOT EXISTS "sword_cache"')
            await conn.execute(
                'CREATE TABLE IF NOT EXISTS "sword_cache"."overlay_cache" ('
                ' key TEXT PRIMARY KEY,'
                ' payload JSONB,'
                ' updated_at DOUBLE PRECISION'
                ')'
            )
            logger.info("[sword] PostgreSQL SWORD cache schema and table verified/initialized successfully.")
        except Exception as e:
            # logger.exception records the traceback; a bare ``raise``
            # re-raises the original exception without adding this frame.
            logger.exception("[sword] Failed to initialize PostgreSQL SWORD cache schema: %s", e)
            raise
[docs]
async def close_async_pool() -> None:
    """Drop and close the asyncpg pool cached for the running event loop.

    Removes the ``_async_pools`` entry keyed by the running loop's ``id`` and,
    if there was one, awaits its ``close()``. If no pool was cached for this
    loop, nothing happens. Meant for orderly shutdown of a worker's loop. No
    callers were found in this module or by grep, so it is presumably invoked
    (if at all) from service teardown code.

    Raises:
        RuntimeError: When called without a running event loop (raised by
            ``asyncio.get_running_loop``).
    """
    pool = _async_pools.pop(id(asyncio.get_running_loop()), None)
    if pool is None:
        return
    await pool.close()
# ---------------------------------------------------------------------------
# Sync pool (psycopg3) — single thread-safe pool
# ---------------------------------------------------------------------------
# Guards lazy construction of the process-wide psycopg3 pool below.
_sync_pool_lock = threading.Lock()
# Shared psycopg_pool.ConnectionPool, built on the first get_sync_pool() call.
_sync_pool: Any = None
[docs]
def get_sync_pool():
    """Fetch, or lazily build, the single process-wide psycopg3 pool.

    All synchronous callers (worker threads, CLIs, scripts) share one
    thread-safe ``ConnectionPool``, unlike the per-loop async pools. The
    first caller builds it while holding ``_sync_pool_lock``, re-checking
    after acquiring the lock. The pool is opened in autocommit mode, with
    conninfo from :func:`_psycopg_conninfo` and bounds from
    :func:`_pool_sizes`, and its creation is logged.

    Used by :func:`warm_sync_pool`, :func:`list_store_schemas`,
    :func:`table_count`, every method of :class:`PgVectorCollection`, and many
    external sync paths (``rag_system/pg_source_files``,
    ``rag_system/file_rag_manager``, ``sword/overlay``, and ``scripts/``).

    Returns:
        psycopg_pool.ConnectionPool: The shared autocommit pool.
    """
    global _sync_pool
    if _sync_pool is None:
        with _sync_pool_lock:
            # Another thread may have built the pool while we waited.
            if _sync_pool is None:
                from psycopg_pool import ConnectionPool

                lower, upper = _pool_sizes()
                _sync_pool = ConnectionPool(
                    conninfo=_psycopg_conninfo(),
                    min_size=lower,
                    max_size=upper,
                    kwargs={"autocommit": True},
                    open=True,
                )
                logger.info("vector_store: created psycopg3 sync pool")
    return _sync_pool
[docs]
def warm_sync_pool() -> None:
    """Make sure the shared psycopg3 pool exists and can reach Postgres.

    Obtains the pool through :func:`get_sync_pool` (creating it if needed)
    and runs ``SELECT 1`` on a borrowed connection, so later queries skip
    connection setup. This is the synchronous counterpart of
    :func:`warm_async_pool`. No callers were found by grep; it is a
    startup-warm entry point for synchronous services.

    Raises:
        Exception: Any psycopg connection/query error when the database is
            unreachable.
    """
    with get_sync_pool().connection() as conn:
        conn.execute("SELECT 1")
# ---------------------------------------------------------------------------
# DDL
# ---------------------------------------------------------------------------
def _ensure_sql(schema: str, table: str, dim: int) -> list[str]:
    """Build the idempotent DDL statements that create a collection's table.

    Returns three ``IF NOT EXISTS`` statements, to be run in order:

    1. create the schema;
    2. create the collection table
       (``id``/``document``/``metadata``/``embedding halfvec(dim)``);
    3. create the HNSW index on the embedding using ``halfvec_l2_ops``.

    Both identifiers are checked by :func:`_validate_ident`, because they are
    interpolated directly into the SQL. ``dim`` is interpolated too, so it
    gets the same treatment: it must be an integer (``operator.index``
    semantics, so numpy integers are accepted) and positive.

    Called by :meth:`PgVectorCollection.ensure` and
    :meth:`AsyncPgVectorCollection.ensure`, which execute each returned
    statement on a pooled connection.

    Args:
        schema: The Postgres schema name for the store.
        table: The Postgres table name for the collection.
        dim: Embedding dimensionality for the ``halfvec`` column (e.g.
            :data:`EMBED_DIM`).

    Returns:
        list[str]: SQL DDL statements to execute in order.

    Raises:
        ValueError: If *schema* or *table* is not a safe identifier, or *dim*
            is not a positive integer.
    """
    import operator

    _validate_ident(schema, "schema")
    _validate_ident(table, "table")
    try:
        dim = operator.index(dim)
    except TypeError:
        raise ValueError(f"Unsafe embedding dimension: {dim!r}") from None
    if dim <= 0:
        raise ValueError(f"Unsafe embedding dimension: {dim!r}")
    return [
        f'CREATE SCHEMA IF NOT EXISTS "{schema}"',
        (
            f'CREATE TABLE IF NOT EXISTS "{schema}"."{table}" ('
            "id text PRIMARY KEY, document text, metadata jsonb, "
            f"embedding halfvec({dim}))"
        ),
        (
            f'CREATE INDEX IF NOT EXISTS "{table}_embedding_idx" '
            f'ON "{schema}"."{table}" USING hnsw (embedding halfvec_l2_ops)'
        ),
    ]
# ---------------------------------------------------------------------------
# Collections
# ---------------------------------------------------------------------------
class _BaseCollection:
"""Driver-neutral base for a single pgvector collection (schema.table).
Holds the validated schema/table identifiers and the embedding dimension,
and provides the SQL builders shared by the sync
(:class:`PgVectorCollection`) and async (:class:`AsyncPgVectorCollection`)
subclasses. It performs no I/O itself; subclasses execute the SQL it emits
on their respective pools.
"""
def __init__(self, schema: str, table: str, dim: int = EMBED_DIM):
"""Validate identifiers and cache the quoted ``schema.table`` name.
Runs both names through :func:`_validate_ident` (raising on unsafe
input) and precomputes ``self._qname``, the double-quoted qualified
name spliced into every statement. Invoked via the concrete subclasses,
which are instantiated across the repo (e.g. ``prompt_context``\\ 's
Golden-Goddess collection, ``anamnesis_engine``, ``extract_tags_to_concepts``,
and ``tools/chromadb_tools``).
Args:
schema: Postgres schema name for the logical store.
table: Postgres table name for the collection.
dim: Embedding dimensionality for the ``halfvec`` column; defaults
to :data:`EMBED_DIM`.
Raises:
ValueError: If *schema* or *table* is not a safe identifier.
"""
self.schema = _validate_ident(schema, "schema")
self.table = _validate_ident(table, "table")
self.dim = dim
self._qname = f'"{self.schema}"."{self.table}"'
# -- SQL builders (driver-neutral via _ParamBuilder) -------------------
def _upsert_sql(self, style: str) -> str:
"""Build the parameterised ``INSERT … ON CONFLICT`` upsert statement.
Produces a single-row upsert with placeholders for ``id``, ``document``,
``metadata`` (cast ``::jsonb``) and ``embedding`` (cast
``::halfvec(dim)``), updating all non-key columns on primary-key
conflict. The placeholders are filled per row by ``executemany``. Uses a
:class:`_ParamBuilder` only to emit driver-correct placeholder syntax;
the four ``None`` adds reserve positions, not values.
Called by :meth:`PgVectorCollection.upsert` and
:meth:`AsyncPgVectorCollection.upsert`.
Args:
style: ``"asyncpg"`` or ``"psycopg"`` placeholder style.
Returns:
str: The upsert SQL for the configured driver.
"""
pb = _ParamBuilder(style)
idp = pb.add(None)
docp = pb.add(None)
metap = pb.add(None)
vecp = pb.add(None)
return (
f"INSERT INTO {self._qname} (id, document, metadata, embedding) "
f"VALUES ({idp}, {docp}, {metap}::jsonb, {vecp}::halfvec({self.dim})) "
"ON CONFLICT (id) DO UPDATE SET "
"document = EXCLUDED.document, "
"metadata = EXCLUDED.metadata, "
"embedding = EXCLUDED.embedding"
)
def _query_sql(
self, style: str, n_results: int, where: Any, where_document: Any
) -> tuple[str, list[Any]]:
"""Build the nearest-neighbour ``SELECT`` and its bind-value list.
Emits a query that returns ``id``/``document``/``metadata`` plus the L2
distance (``embedding <-> $vec``) and orders by that same distance with
a ``LIMIT``. The embedding appears twice (select expression and
``ORDER BY``); both positions are reserved as ``None`` placeholders for
the caller to fill with the real vector literal, while optional metadata
and document filters are injected via :func:`_build_filter_sql`.
Called by :meth:`PgVectorCollection.query` and
:meth:`AsyncPgVectorCollection.query`, which replace the ``None`` slots
with the rendered vector before executing.
Args:
style: ``"asyncpg"`` or ``"psycopg"`` placeholder style.
n_results: Maximum number of neighbours to return (``LIMIT``).
where: Chroma-style metadata filter, or ``None``.
where_document: Chroma-style document filter, or ``None``.
Returns:
tuple[str, list[Any]]: The SQL and its ordered bind values, where
the two embedding slots are still ``None`` pending substitution.
"""
pb = _ParamBuilder(style)
vecp = pb.add(None) # placeholder reserved for the embedding (select)
# We need the vector value bound; fill after building.
filt = _build_filter_sql(pb, where, where_document)
vecp2 = pb.add(None) # order-by vector
limitp = pb.add(n_results)
sql = (
f"SELECT id, document, metadata, "
f"(embedding <-> {vecp}::halfvec({self.dim})) AS distance "
f"FROM {self._qname}{filt} "
f"ORDER BY embedding <-> {vecp2}::halfvec({self.dim}) "
f"LIMIT {limitp}"
)
return sql, pb.values
def _get_sql(
self,
style: str,
ids: Any,
where: Any,
where_document: Any,
offset: int | None,
limit: int | None,
) -> tuple[str, list[Any]]:
"""Build a non-vector ``SELECT`` for fetching rows by id and/or filter.
Emits ``SELECT id, document, metadata`` constrained by an optional
``id = ANY(...)`` predicate and/or the metadata/document filters from
:func:`_build_filter_sql`, splicing the two together with ``AND`` when
both are present (it strips the leading ``" WHERE "`` from the filter
fragment to do so). When *offset* or *limit* is given it appends a stable
``ORDER BY id`` plus the corresponding ``LIMIT``/``OFFSET``.
Called by :meth:`PgVectorCollection.get` and
:meth:`AsyncPgVectorCollection.get`.
Args:
style: ``"asyncpg"`` or ``"psycopg"`` placeholder style.
ids: An iterable of ids to restrict to, or ``None`` for no id filter.
where: Chroma-style metadata filter, or ``None``.
where_document: Chroma-style document filter, or ``None``.
offset: Row offset to skip, or ``None``.
limit: Maximum rows to return, or ``None``.
Returns:
tuple[str, list[Any]]: The SQL and its ordered bind values.
"""
pb = _ParamBuilder(style)
conds: list[str] = []
if ids is not None:
conds.append(f"id = ANY({pb.add(list(ids))})")
filt = _build_filter_sql(pb, where, where_document)
where_sql = ""
if conds and filt:
where_sql = " WHERE " + conds[0] + " AND " + filt[len(" WHERE "):]
elif conds:
where_sql = " WHERE " + conds[0]
elif filt:
where_sql = filt
tail = ""
if offset is not None or limit is not None:
tail += " ORDER BY id"
if limit is not None:
tail += f" LIMIT {pb.add(int(limit))}"
if offset is not None:
tail += f" OFFSET {pb.add(int(offset))}"
sql = f"SELECT id, document, metadata FROM {self._qname}{where_sql}{tail}"
return sql, pb.values
def _delete_sql(
self, style: str, ids: Any, where: Any
) -> tuple[str, list[Any]]:
"""Build a ``DELETE`` statement scoped by ids and/or a metadata filter.
Combines an optional ``id = ANY(...)`` predicate with the metadata
``where`` clause from :func:`_translate_clause` (joined by ``AND``).
Note that with no ids and no *where* this returns an unfiltered
``DELETE FROM table``; the public ``delete`` methods guard against that
case before calling here.
Called by :meth:`PgVectorCollection.delete` and
:meth:`AsyncPgVectorCollection.delete`.
Args:
style: ``"asyncpg"`` or ``"psycopg"`` placeholder style.
ids: An iterable of ids to delete, or ``None``.
where: Chroma-style metadata filter, or ``None``.
Returns:
tuple[str, list[Any]]: The ``DELETE`` SQL and its bind values.
"""
pb = _ParamBuilder(style)
conds: list[str] = []
if ids is not None:
conds.append(f"id = ANY({pb.add(list(ids))})")
w = _translate_clause(where, pb)
if w:
conds.append(w)
where_sql = (" WHERE " + " AND ".join(conds)) if conds else ""
return f"DELETE FROM {self._qname}{where_sql}", pb.values
@staticmethod
def _upsert_rows(
ids: Sequence[str],
documents: Sequence[str],
metadatas: Sequence[dict] | None,
embeddings: Sequence[Sequence[float]],
) -> list[tuple]:
"""Shape parallel column lists into per-row tuples for ``executemany``.
Zips *ids*/*documents*/*metadatas*/*embeddings* positionally, defaulting
a missing document to ``""`` and missing/None metadata to ``{}``. Each
metadata dict is serialised with ``json.dumps`` and each embedding is
rendered via :func:`vector_literal`, so the resulting tuples line up with
the ``(id, document, metadata, embedding)`` placeholders of
:meth:`_upsert_sql`.
Called by both ``upsert`` implementations to prepare their rows.
Args:
ids: Row primary keys (coerced to ``str``).
documents: Document texts, positionally aligned with *ids*.
metadatas: Per-row metadata dicts, or ``None``.
embeddings: Per-row embedding vectors, positionally aligned.
Returns:
list[tuple]: ``(id, document, metadata_json, vector_literal)`` rows.
"""
rows: list[tuple] = []
for i, _id in enumerate(ids):
meta = metadatas[i] if metadatas and i < len(metadatas) else {}
rows.append(
(
str(_id),
documents[i] if i < len(documents) else "",
json.dumps(meta or {}),
vector_literal(embeddings[i]),
)
)
return rows
@staticmethod
def _rows_to_get(records: Iterable[Any]) -> dict[str, list]:
    """Reshape fetched rows into Chroma's ``get`` result mapping.

    A row may be a positional sequence (id, document and metadata at indices
    0, 1 and 2) or a ``dict`` keyed by ``"id"``, ``"document"`` and
    ``"metadata"``. This covers psycopg tuple rows and the pre-tupled
    asyncpg records. Every metadata cell is passed through
    :func:`_coerce_metadata`.

    Used by :meth:`PgVectorCollection.get` and
    :meth:`AsyncPgVectorCollection.get` to build their return value.

    Args:
        records: Iterable of result rows (sequences or dicts).

    Returns:
        dict[str, list]: ``{"ids": [...], "documents": [...],
        "metadatas": [...]}`` with index-aligned lists.
    """
    result: dict[str, list] = {"ids": [], "documents": [], "metadatas": []}
    for record in records:
        if isinstance(record, dict):
            row_id, document, raw_meta = (
                record["id"],
                record["document"],
                record["metadata"],
            )
        else:
            row_id, document, raw_meta = record[0], record[1], record[2]
        result["ids"].append(row_id)
        result["documents"].append(document)
        result["metadatas"].append(_coerce_metadata(raw_meta))
    return result
[docs]
class PgVectorCollection(_BaseCollection):
    """Blocking pgvector collection that runs on the shared psycopg3 pool.

    The synchronous concrete subclass of :class:`_BaseCollection`, for call
    sites that cannot ``await``: worker threads, CLIs and ``scripts/``. Each
    method checks a connection out of :func:`get_sync_pool` and executes the
    driver-neutral SQL assembled by the base class with ``psycopg``
    placeholders.

    Public (Chroma-like) surface: ``ensure``, ``upsert``, ``query``, ``get``,
    ``delete``, ``count``, ``clear`` and ``drop``. Vectors are never computed
    here; callers pass them in. :class:`ChromaCompatCollection` wraps this
    class for auto-embedding call sites, and it is also instantiated directly
    by ``anamnesis_engine``, ``extract_tags_to_concepts``,
    ``backfill_entity_provenance``, ``memory_search`` and
    ``rag_system/file_rag_manager``.
    """

    def ensure(self) -> None:
        """Idempotently create this collection's schema, table and HNSW index.

        Every statement produced by :func:`_ensure_sql` runs on one pooled
        connection, so repeated calls are safe. Used by
        ``rag_system/file_rag_manager``, verification scripts and
        :meth:`clear`.

        Raises:
            Exception: Whatever psycopg raises if a DDL statement fails.
        """
        pool = get_sync_pool()
        with pool.connection() as conn:
            ddl_statements = _ensure_sql(self.schema, self.table, self.dim)
            for ddl in ddl_statements:
                conn.execute(ddl)

    def upsert(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        metadatas: Sequence[dict] | None = None,
        embeddings: Sequence[Sequence[float]] | None = None,
    ) -> None:
        """Insert rows, or fully replace existing ones, keyed by id.

        The batch is shaped by :meth:`_upsert_rows` and written with
        ``cursor.executemany`` using the statement from :meth:`_upsert_sql`.
        An empty *ids* is a no-op once *embeddings* has been checked. This
        layer never embeds; callers such as ``extract_tags_to_concepts`` and
        ``backfill_entity_provenance`` supply the vectors.

        Args:
            ids: Row primary keys.
            documents: Document texts aligned with *ids*.
            metadatas: Per-row metadata dicts, or ``None`` (treated as empty).
            embeddings: Per-row embedding vectors; mandatory.

        Raises:
            ValueError: If *embeddings* is ``None``.
        """
        if embeddings is None:
            raise ValueError("PgVectorCollection.upsert requires embeddings")
        if ids:
            batch = self._upsert_rows(ids, documents, metadatas, embeddings)
            statement = self._upsert_sql("psycopg")
            with get_sync_pool().connection() as conn, conn.cursor() as cur:
                cur.executemany(statement, batch)

    def query(
        self,
        embedding: Sequence[float],
        n_results: int = 5,
        where: Any = None,
        where_document: Any = None,
    ) -> list[dict[str, Any]]:
        """Nearest-neighbour search returning scored hits, nearest first.

        The SQL comes from :meth:`_query_sql`. Its reserved ``None`` bind slots
        are filled with the :func:`vector_literal` rendering of *embedding*.
        Each returned row becomes a dict carrying the raw L2 ``distance`` and
        the ``similarity`` derived by :func:`l2_to_similarity`.

        Args:
            embedding: The query embedding vector.
            n_results: Maximum number of neighbours to return.
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.

        Returns:
            list[dict[str, Any]]: Hits with ``id``, ``document``,
            ``metadata``, ``distance`` and ``similarity`` keys.
        """
        statement, bind = self._query_sql(
            "psycopg", n_results, where, where_document
        )
        literal = vector_literal(embedding)
        # NOTE(review): every None bind value is treated as the vector slot;
        # confirm _translate_clause never binds a literal None for a filter.
        bind = [literal if value is None else value for value in bind]
        with get_sync_pool().connection() as conn, conn.cursor() as cur:
            cur.execute(statement, bind)
            fetched = cur.fetchall()

        def to_hit(row: Any) -> dict[str, Any]:
            raw_distance = row[3]
            distance = None if raw_distance is None else float(raw_distance)
            return {
                "id": row[0],
                "document": row[1],
                "metadata": _coerce_metadata(row[2]),
                "distance": distance,
                "similarity": l2_to_similarity(distance),
            }

        return [to_hit(row) for row in fetched]

    def get(
        self,
        ids: Sequence[str] | None = None,
        where: Any = None,
        where_document: Any = None,
        offset: int | None = None,
        limit: int | None = None,
    ) -> dict[str, list]:
        """Fetch rows by id and/or filter without any vector search.

        Runs the statement from :meth:`_get_sql` and reshapes the result with
        :meth:`_rows_to_get`. Exposed to Chroma-style callers through
        :meth:`ChromaCompatCollection.get`.

        Args:
            ids: Ids to restrict to, or ``None`` for every matching row.
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.
            offset: Number of rows to skip, or ``None``.
            limit: Maximum rows to return, or ``None``.

        Returns:
            dict[str, list]: Aligned ``ids`` / ``documents`` / ``metadatas``.
        """
        statement, bind = self._get_sql(
            "psycopg", ids, where, where_document, offset, limit
        )
        pool = get_sync_pool()
        with pool.connection() as conn, conn.cursor() as cur:
            cur.execute(statement, bind)
            fetched = cur.fetchall()
        return self._rows_to_get(fetched)

    def delete(
        self, ids: Sequence[str] | None = None, where: Any = None
    ) -> None:
        """Delete rows by id and/or metadata filter.

        Nothing happens unless *ids* is given or *where* is truthy, so an
        argument-less call can never wipe the table. Otherwise the statement
        from :meth:`_delete_sql` runs on a pooled connection. Exposed through
        :meth:`ChromaCompatCollection.delete`.

        Args:
            ids: Ids to delete, or ``None``.
            where: Chroma-style metadata filter, or ``None``.
        """
        if ids is not None or where:
            statement, bind = self._delete_sql("psycopg", ids, where)
            pool = get_sync_pool()
            with pool.connection() as conn:
                conn.execute(statement, bind)

    def count(self) -> int:
        """Return how many rows the collection holds.

        Executes ``SELECT count(*)`` on a pooled connection. Exposed through
        :meth:`ChromaCompatCollection.count`.

        Returns:
            int: The row count, or ``0`` if no row comes back.
        """
        pool = get_sync_pool()
        with pool.connection() as conn, conn.cursor() as cur:
            cur.execute(f"SELECT count(*) FROM {self._qname}")
            first = cur.fetchone()
        if not first:
            return 0
        return int(first[0])

    def clear(self) -> None:
        """Remove every row, creating the collection first when missing.

        :meth:`ensure` runs first so the ``TRUNCATE`` cannot hit a missing
        table. The table and its index survive; only the data is removed.
        """
        self.ensure()
        with get_sync_pool().connection() as conn:
            conn.execute(f"TRUNCATE TABLE {self._qname}")

    def drop(self) -> None:
        """Drop this store's whole schema, including every object inside it.

        Issues ``DROP SCHEMA IF EXISTS … CASCADE`` on a pooled connection, so
        every table and index in the schema is destroyed, not only this
        collection's table. Irreversible; meant for teardown/reset tooling.
        """
        pool = get_sync_pool()
        with pool.connection() as conn:
            conn.execute('DROP SCHEMA IF EXISTS "{}" CASCADE'.format(self.schema))
[docs]
class ChromaCompatCollection:
    """Chroma-compatible sync facade over :class:`PgVectorCollection`.

    Auto-embeds documents/queries via *embedding_fn* (e.g.
    ``SyncOpenRouterEmbeddings``) so call sites that relied on Chroma's
    ``embedding_function`` keep working with minimal changes.

    As in Chroma, the "one or many" arguments accept a single value in place
    of a list:
    - a bare ``str`` for ``ids`` / ``documents`` / ``query_texts``;
    - a bare ``dict`` for ``metadatas``;
    - a single flat vector for ``query_embeddings``, which may be any
      indexable sequence of numbers, e.g. a NumPy array.

    Without this normalisation a bare string would be iterated character by
    character.

    ``query`` returns Chroma-shaped nested lists and reports ``distances`` as
    cosine distance (``1 - similarity``), so callers that compute
    ``1 - distance`` recover a cosine similarity in ``[0, 1]`` exactly as
    they did against Chroma.
    """

    def __init__(self, pg: PgVectorCollection, embedding_fn: Any):
        """Wrap a :class:`PgVectorCollection` with an auto-embedding facade.

        Stores the backing collection and an embedder. Instantiated by
        ``rag_system/file_rag_manager`` (``self.collection =
        ChromaCompatCollection(self._pg, self.embedding_fn)``) so existing
        Chroma-style call sites keep working over pgvector.

        Args:
            pg: The synchronous pgvector collection doing the real I/O.
            embedding_fn: An embedder exposing either ``embed_documents`` /
                ``embed_query`` methods or a plain callable taking a list of
                texts (e.g. ``SyncOpenRouterEmbeddings``).
        """
        self.pg = pg
        self.embedding_fn = embedding_fn

    @staticmethod
    def _one_or_many(value: Any) -> Any:
        """Wrap a single Chroma "one" value in a list; pass others through.

        Chroma accepts a bare ``str`` (id, document) or a bare ``dict``
        (metadata) where a list is expected. Iterating those directly would
        yield characters or keys, so they are wrapped.

        Args:
            value: ``None``, a single ``str``/``dict``, or a sequence.

        Returns:
            ``[value]`` for a ``str``/``dict``; otherwise *value* unchanged
            (including ``None``).
        """
        if isinstance(value, (str, dict)):
            return [value]
        return value

    @staticmethod
    def _first_query_text(query_texts: Any) -> str | None:
        """Pick the text to embed from Chroma's ``query_texts`` argument.

        Args:
            query_texts: ``None``, a single query string, or a sequence of
                query strings (only the first is used).

        Returns:
            The query string, or ``None`` when no text was supplied.
        """
        if not query_texts:
            return None
        if isinstance(query_texts, str):
            # A bare string is one query, not a sequence of characters.
            return query_texts
        return query_texts[0]

    @staticmethod
    def _first_query_vector(query_embeddings: Any) -> list[float]:
        """Extract a single query vector as ``list[float]``.

        Accepts a flat vector or a batch of vectors, in which case only the
        first row is used. A batch is detected by its first element not being
        a real number. That test works for lists, tuples and NumPy arrays,
        which cannot be tested with ``if emb`` or matched by an
        ``isinstance(..., (list, tuple))`` check.

        Args:
            query_embeddings: A non-empty vector or batch of vectors.

        Returns:
            list[float]: The query vector with plain Python floats.
        """
        import numbers

        emb = query_embeddings
        if len(emb) and not isinstance(emb[0], numbers.Real):
            emb = emb[0]
        return [float(x) for x in emb]

    def _embed_documents(self, docs: Sequence[str]) -> list[list[float]]:
        """Embed a batch of document texts using :attr:`embedding_fn`.

        Prefers an ``embed_documents`` method on the embedder and falls back to
        calling the embedder directly. Called by :meth:`upsert` when the caller
        did not pass precomputed embeddings.

        Args:
            docs: Document texts to embed.

        Returns:
            list[list[float]]: One embedding vector per input document.
        """
        fn = getattr(self.embedding_fn, "embed_documents", None)
        if fn is not None:
            return list(fn(list(docs)))
        return list(self.embedding_fn(list(docs)))

    def _embed_query(self, text: str) -> list[float]:
        """Embed a single query string using :attr:`embedding_fn`.

        Prefers an ``embed_query`` method, else calls the embedder with a
        one-element list and takes the first vector. Called by :meth:`query`
        when only ``query_texts`` (not embeddings) are supplied.

        NOTE(review): ``embed_query`` is called with a one-element *list* and
        the first item of its result is taken. This assumes a list-in /
        vectors-out convention. Confirm against the embedder in use, since a
        ``embed_query(str) -> vector`` style method would not fit.

        Args:
            text: The query string to embed.

        Returns:
            list[float]: The query's embedding vector.
        """
        fn = getattr(self.embedding_fn, "embed_query", None)
        if fn is not None:
            return list(fn([text]))[0]
        return list(self.embedding_fn([text]))[0]

    def ensure(self) -> None:
        """Ensure the backing collection's schema/table/index exist.

        Thin pass-through to :meth:`PgVectorCollection.ensure`.
        """
        self.pg.ensure()

    def upsert(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        metadatas: Sequence[dict] | None = None,
        embeddings: Sequence[Sequence[float]] | None = None,
    ) -> None:
        """Upsert documents, auto-embedding them when no vectors are given.

        A bare ``str`` id/document or bare ``dict`` metadata is treated as a
        one-element list (Chroma ``OneOrMany`` semantics). Computes embeddings
        via :meth:`_embed_documents` if *embeddings* is ``None`` (the Chroma
        ``embedding_function`` behaviour), then delegates to
        :meth:`PgVectorCollection.upsert`.

        Args:
            ids: Row primary key(s).
            documents: Document text(s) aligned with *ids*.
            metadatas: Per-row metadata dict(s), or ``None``.
            embeddings: Precomputed vectors; embedded on demand if ``None``.
        """
        ids = self._one_or_many(ids)
        documents = self._one_or_many(documents)
        metadatas = self._one_or_many(metadatas)
        if embeddings is None:
            embeddings = self._embed_documents(documents)
        self.pg.upsert(ids, documents, metadatas, embeddings)

    def add(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        metadatas: Sequence[dict] | None = None,
        embeddings: Sequence[Sequence[float]] | None = None,
    ) -> None:
        """Alias for :meth:`upsert`, matching Chroma's ``add`` API.

        Forwards all arguments to :meth:`upsert` so call sites using Chroma's
        ``collection.add`` semantics keep working.

        Args:
            ids: Row primary key(s).
            documents: Document text(s) aligned with *ids*.
            metadatas: Per-row metadata dict(s), or ``None``.
            embeddings: Precomputed vectors, or ``None`` to auto-embed.
        """
        self.upsert(ids, documents, metadatas=metadatas, embeddings=embeddings)

    def get(
        self,
        ids: Sequence[str] | None = None,
        where: Any = None,
        where_document: Any = None,
        limit: int | None = None,
        offset: int | None = None,
        include: Any = None,
    ) -> dict[str, list]:
        """Fetch rows by id/filter, mirroring Chroma's ``get`` signature.

        Delegates to :meth:`PgVectorCollection.get`. A bare ``str`` id is
        treated as a one-element list. The Chroma-only *include* argument is
        accepted for signature compatibility but ignored; this layer always
        returns ids, documents and metadatas.

        Args:
            ids: Id(s) to restrict to, or ``None``.
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.
            limit: Maximum rows to return, or ``None``.
            offset: Row offset to skip, or ``None``.
            include: Accepted for Chroma compatibility; ignored.

        Returns:
            dict[str, list]: ``{"ids", "documents", "metadatas"}`` lists.
        """
        return self.pg.get(
            ids=self._one_or_many(ids),
            where=where,
            where_document=where_document,
            offset=offset,
            limit=limit,
        )

    def delete(
        self, ids: Sequence[str] | None = None, where: Any = None
    ) -> None:
        """Delete rows by id and/or metadata filter.

        Thin pass-through to :meth:`PgVectorCollection.delete`. A bare ``str``
        id is treated as a one-element list.

        Args:
            ids: Id(s) to delete, or ``None``.
            where: Chroma-style metadata filter, or ``None``.
        """
        self.pg.delete(ids=self._one_or_many(ids), where=where)

    def count(self) -> int:
        """Return the collection's row count via :meth:`PgVectorCollection.count`.

        Returns:
            int: Total number of stored rows.
        """
        return self.pg.count()

    def query(
        self,
        query_embeddings: Any = None,
        query_texts: Any = None,
        n_results: int = 10,
        where: Any = None,
        where_document: Any = None,
        include: Any = None,
    ) -> dict[str, list]:
        """Run a nearest-neighbour search returning Chroma-shaped nested lists.

        Resolves the query vector in this order:
        - from non-empty *query_embeddings*, either a flat vector or a batch
          whose first row is used (lists, tuples and NumPy arrays all work);
        - otherwise by embedding the first of *query_texts* via
          :meth:`_embed_query`, where a bare string counts as one query;
        - with neither, it returns the empty Chroma shape.

        It then delegates the search to :meth:`PgVectorCollection.query` and
        repackages the results into single-batch nested lists. Each
        ``similarity`` is converted back to a cosine ``distance``
        (``1 - similarity``), so callers computing ``1 - distance`` recover
        the similarity exactly as they did against Chroma. The *include*
        argument is ignored.

        Args:
            query_embeddings: A query vector or a batch of vectors, or
                ``None``.
            query_texts: A query string or list of query strings (only the
                first is used), or ``None``.
            n_results: Maximum neighbours to return.
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.
            include: Accepted for Chroma compatibility; ignored.

        Returns:
            dict[str, list]: ``{"ids", "documents", "metadatas", "distances"}``
            each wrapped in a single-query outer list; ``distances`` are cosine
            distances. Returns empty single-batch lists when no query is given.
        """
        empty = {"ids": [[]], "documents": [[]], "metadatas": [[]], "distances": [[]]}
        if query_embeddings is not None and len(query_embeddings) > 0:
            emb = self._first_query_vector(query_embeddings)
        else:
            text = self._first_query_text(query_texts)
            if text is None:
                return empty
            emb = self._embed_query(text)
        rows = self.pg.query(
            emb, n_results=n_results, where=where, where_document=where_document
        )
        return {
            "ids": [[r["id"] for r in rows]],
            "documents": [[r["document"] for r in rows]],
            "metadatas": [[r["metadata"] for r in rows]],
            "distances": [
                [
                    (1.0 - r["similarity"]) if r["similarity"] is not None else None
                    for r in rows
                ]
            ],
        }
[docs]
class AsyncPgVectorCollection(_BaseCollection):
    """Asynchronous pgvector collection backed by the per-loop asyncpg pool.

    Concrete :class:`_BaseCollection` for genuinely-async hot paths running on
    the bot event loop. Each coroutine acquires a connection from
    :func:`get_async_pool` and runs the driver-neutral SQL the base class builds
    (with ``asyncpg`` numbered placeholders). Exposes the async Chroma-like
    surface ``ensure`` / ``upsert`` / ``query`` / ``get`` / ``delete`` /
    ``count`` / ``clear`` / ``drop`` (the same surface as
    :class:`PgVectorCollection`); embeddings are always supplied by the
    caller. Drives async paths such as ``prompt_context._divine_reflex``
    (Golden Goddess reflex), ``tools/query_golden_goddess_v2``, and
    ``tools/chromadb_tools``.
    """

    async def ensure(self) -> None:
        """Create the schema, table, and HNSW index for this collection (async).

        Async counterpart of :meth:`PgVectorCollection.ensure`: runs the
        idempotent DDL from :func:`_ensure_sql` on a connection acquired from
        :func:`get_async_pool`. Called by ``tools/chromadb_tools`` before
        upserting into a vector DB, and internally by :meth:`clear`.
        """
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            for stmt in _ensure_sql(self.schema, self.table, self.dim):
                await conn.execute(stmt)

    async def upsert(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        metadatas: Sequence[dict] | None = None,
        embeddings: Sequence[Sequence[float]] | None = None,
    ) -> None:
        """Insert or update rows on the async (asyncpg) path.

        No-ops on empty *ids*. Otherwise rows are built by
        :meth:`_upsert_rows`, the SQL by :meth:`_upsert_sql`, and both are
        executed with ``executemany`` on a pooled connection from
        :func:`get_async_pool`. Used by async ingestion code such as
        ``tools/chromadb_tools``.

        Args:
            ids: Stable ids for each row.
            documents: The document texts.
            metadatas: Per-row metadata dicts, or ``None``.
            embeddings: Required precomputed embeddings (this layer never embeds).

        Raises:
            ValueError: If *embeddings* is ``None``.
        """
        if embeddings is None:
            raise ValueError("AsyncPgVectorCollection.upsert requires embeddings")
        if not ids:
            return
        rows = self._upsert_rows(ids, documents, metadatas, embeddings)
        sql = self._upsert_sql("asyncpg")
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            await conn.executemany(sql, rows)

    async def query(
        self,
        embedding: Sequence[float],
        n_results: int = 5,
        where: Any = None,
        where_document: Any = None,
    ) -> list[dict[str, Any]]:
        """Run an async nearest-neighbour search and return scored hits.

        Builds the SQL via :meth:`_query_sql`, fills the reserved vector slots
        (the ``None`` bind values) with the rendered :func:`vector_literal`,
        fetches rows on a pooled connection from :func:`get_async_pool`, and
        maps each row to a dict with the raw L2 ``distance`` and a
        ``similarity`` from :func:`l2_to_similarity`.

        Drives async hot paths such as ``prompt_context._divine_reflex`` (Golden
        Goddess reflex), ``tools/query_golden_goddess_v2``, and
        ``tools/chromadb_tools``.

        Args:
            embedding: The query embedding vector.
            n_results: Maximum number of neighbours to return.
            where: Chroma-style metadata filter dict, or ``None``.
            where_document: Chroma-style document filter dict, or ``None``.

        Returns:
            list[dict[str, Any]]: Per-hit dicts with ``id``, ``document``,
            ``metadata``, ``distance``, and ``similarity``, nearest first.
        """
        sql, params = self._query_sql("asyncpg", n_results, where, where_document)
        vec = vector_literal(embedding)
        params = [vec if v is None else v for v in params]
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)
        out: list[dict[str, Any]] = []
        for r in rows:
            dist = float(r["distance"]) if r["distance"] is not None else None
            out.append(
                {
                    "id": r["id"],
                    "document": r["document"],
                    "metadata": _coerce_metadata(r["metadata"]),
                    "distance": dist,
                    "similarity": l2_to_similarity(dist),
                }
            )
        return out

    async def get(
        self,
        ids: Sequence[str] | None = None,
        where: Any = None,
        where_document: Any = None,
        offset: int | None = None,
        limit: int | None = None,
    ) -> dict[str, list]:
        """Fetch rows by id and/or filters on the async path.

        Builds the statement via :meth:`_get_sql`, fetches rows on a pooled
        connection from :func:`get_async_pool`, then converts each asyncpg
        ``Record`` to an ``(id, document, metadata)`` tuple and reshapes them
        into Chroma-style parallel lists via :meth:`_rows_to_get`. Used by
        async tools like ``tools/chromadb_tools``.

        Args:
            ids: Specific ids to fetch, or ``None``.
            where: Chroma-style metadata filter dict, or ``None``.
            where_document: Chroma-style document filter dict, or ``None``.
            offset: Row offset for pagination, or ``None``.
            limit: Maximum rows to return, or ``None``.

        Returns:
            dict[str, list]: Parallel ``ids`` / ``documents`` / ``metadatas``
            lists.
        """
        sql, params = self._get_sql(
            "asyncpg", ids, where, where_document, offset, limit
        )
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)
        return self._rows_to_get(
            [(r["id"], r["document"], r["metadata"]) for r in rows]
        )

    async def delete(
        self, ids: Sequence[str] | None = None, where: Any = None
    ) -> None:
        """Delete rows by id list and/or metadata filter (async).

        No-ops when neither *ids* nor *where* is given, which guards against
        an accidental full-table delete. Otherwise builds the statement via
        :meth:`_delete_sql` and executes it on a pooled connection from
        :func:`get_async_pool`. Used by async maintenance code such as
        ``tools/chromadb_tools``.

        Args:
            ids: Ids to delete, or ``None``.
            where: Chroma-style metadata filter selecting rows to delete, or
                ``None``.
        """
        if ids is None and not where:
            return
        sql, params = self._delete_sql("asyncpg", ids, where)
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            await conn.execute(sql, *params)

    async def count(self) -> int:
        """Return the collection's row count (async).

        Runs ``SELECT count(*)`` via ``fetchval`` on a pooled connection from
        :func:`get_async_pool`. Used by async tools such as
        ``tools/chromadb_tools``.

        Returns:
            int: The row count, or ``0`` when the value is ``None``.
        """
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            val = await conn.fetchval(f"SELECT count(*) FROM {self._qname}")
        return int(val) if val is not None else 0

    async def clear(self) -> None:
        """Empty the collection on the async path, ensuring it exists first.

        Calls :meth:`ensure`, then ``TRUNCATE``\\ s the table on a pooled
        connection from :func:`get_async_pool`. The schema and HNSW index are
        preserved. No internal or grep-found external callers; invoked ad hoc
        by async maintenance code.
        """
        await self.ensure()
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            await conn.execute(f"TRUNCATE TABLE {self._qname}")

    async def drop(self) -> None:
        """Drop the collection's entire schema and everything in it (async).

        Async counterpart of :meth:`PgVectorCollection.drop`, added for API
        parity. Runs ``DROP SCHEMA IF EXISTS … CASCADE`` on a connection
        acquired from :func:`get_async_pool`. This destroys every table and
        index under this store's schema, not just this one table. It is
        irreversible and intended for teardown/reset tooling.
        """
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            await conn.execute(f'DROP SCHEMA IF EXISTS "{self.schema}" CASCADE')
# ---------------------------------------------------------------------------
# Store introspection (replaces filesystem listing of rag_stores/)
# ---------------------------------------------------------------------------
# Built-in / default Postgres schemas that are never logical vector stores.
_SYSTEM_SCHEMAS = frozenset(
    ("information_schema", "pg_catalog", "pg_toast", "public")
)
[docs]
def list_store_schemas() -> list[str]:
    """List the user (non-system) schemas in the vector database.

    Replaces the old filesystem listing of ``rag_stores/`` directories: it
    queries ``information_schema.schemata`` on the synchronous pool
    (:func:`get_sync_pool`). It excludes every name in ``_SYSTEM_SCHEMAS``
    (``pg_catalog``, ``information_schema``, ``public``, ``pg_toast``) and
    the ``pg_temp_*`` / ``pg_toast_temp_*`` families, so only logical store
    schemas remain. The exclusion list is bound from ``_SYSTEM_SCHEMAS`` as
    a query parameter, so the constant and the SQL cannot drift apart. No
    callers were found by grep, so it is used (if at all) by introspection
    tooling or scripts.

    Returns:
        list[str]: Candidate store schema names, ordered alphabetically.
    """
    pool = get_sync_pool()
    with pool.connection() as conn:
        with conn.cursor() as cur:
            # The statement now has a bound parameter, so psycopg needs
            # literal ``%`` written as ``%%`` in the LIKE patterns.
            cur.execute(
                "SELECT schema_name FROM information_schema.schemata "
                "WHERE NOT (schema_name::text = ANY(%s::text[])) "
                "AND schema_name NOT LIKE 'pg_temp_%%' "
                "AND schema_name NOT LIKE 'pg_toast_temp_%%' "
                "ORDER BY schema_name",
                (sorted(_SYSTEM_SCHEMAS),),
            )
            rows = cur.fetchall()
    return [r[0] for r in rows]
[docs]
def table_count(schema: str, table: str) -> int:
    """Count the rows of ``schema.table``, returning ``0`` if it is absent.

    Both names go through :func:`_validate_ident` first, because they are
    spliced into the ``count(*)`` statement. Existence is then checked against
    ``information_schema.tables``, so a missing collection yields ``0``
    instead of an error. Runs on the synchronous pool from
    :func:`get_sync_pool`. No callers were found by grep; presumably it
    serves introspection/verification tooling.

    Args:
        schema: Postgres schema name (validated).
        table: Postgres table name (validated).

    Returns:
        int: Number of rows, or ``0`` when the table does not exist.

    Raises:
        ValueError: If *schema* or *table* is not a safe identifier.
    """
    for ident, label in ((schema, "schema"), (table, "table")):
        _validate_ident(ident, label)
    pool = get_sync_pool()
    with pool.connection() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT EXISTS (SELECT 1 FROM information_schema.tables "
            "WHERE table_schema=%s AND table_name=%s)",
            (schema, table),
        )
        present = cur.fetchone()[0]
        if not present:
            return 0
        cur.execute(f'SELECT count(*) FROM "{schema}"."{table}"')
        counted = cur.fetchone()
    if counted:
        return int(counted[0])
    return 0