Source code for vector_store

"""Shared Postgres + pgvector store layer.

This module replaces the legacy ``chroma_registry`` / on-disk ChromaDB
``PersistentClient`` model.  All vector storage now lives in a single
Postgres database (``stargazer_filerag``) using one **schema per logical
store** and one table per collection with the shape::

    id        text PRIMARY KEY
    document  text
    metadata  jsonb
    embedding halfvec(3072)        -- HNSW index, halfvec_l2_ops (L2)

Embeddings are **always supplied by the caller** (computed via the shared
Gemini embed pool); this layer never embeds implicitly the way a ChromaDB
``embedding_function`` did.

Two drivers are provided because the codebase mixes async and sync call
sites:

* :class:`AsyncPgVectorCollection` (asyncpg) for genuinely-async hot paths
  running on the bot event loop (e.g. ``prompt_context._divine_reflex``).
* :class:`PgVectorCollection` (psycopg3, thread-safe pool) for synchronous
  code: ``FileRAGManager`` worker threads, CLIs, and ``scripts/``.

Both expose the same Chroma-like surface: ``ensure`` / ``upsert`` /
``query`` / ``get`` / ``delete`` / ``count`` / ``clear``.

Built by Stargazer Project.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import re
import threading
from typing import Any, Iterable, Sequence

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

EMBED_DIM = 3072
"""All migrated stores use Gemini ``gemini-embedding-001`` at 3072 dims."""

# Whitelist for schema/table names that are interpolated directly into SQL
# (ASCII letters, digits and underscore only).
# NOTE(review): with ``re.match`` the ``$`` anchor also matches just before a
# single trailing "\n"; use ``fullmatch`` when validating against it.
_IDENT_RE = re.compile(r"^[A-Za-z0-9_]+$")

# Default connection target.  Mirrors the ``_DEFAULT_OPENROUTER_KEY`` pattern
# already used in ``gemini_embed_pool`` so CLIs/scripts work out of the box;
# overridable via env (``STARGAZER_VECTOR_PG_*`` / ``STARGAZER_VECTOR_PG_DSN``)
# or programmatically via :func:`configure`.
# SECURITY NOTE(review): a literal database password is committed below.
# Credentials in source control should be rotated and supplied through
# STARGAZER_VECTOR_PG_PASSWORD / STARGAZER_VECTOR_PG_DSN or configure();
# the value is left untouched here so existing deployments keep connecting.
_DEFAULT_CONN: dict[str, Any] = {
    "host": "10.10.0.7",
    "port": 5432,
    "database": "stargazer_filerag",
    "user": "stargazer",
    "password": "r9yJpys5vLFrnrx7YuYCRiwF3VW8",
    "sslmode": "prefer",
    "min_size": 1,
    "max_size": 8,
}

# Held by configure() while it rebuilds and replaces _conn_params.
_lock = threading.Lock()
# Memoised effective connection parameters; filled lazily by
# _resolve_params() or explicitly by configure().  None = not resolved yet.
_conn_params: dict[str, Any] | None = None


# ---------------------------------------------------------------------------
# Connection-parameter resolution
# ---------------------------------------------------------------------------


def configure(**overrides: Any) -> None:
    """Layer runtime overrides on top of the resolved connection parameters.

    The effective parameters from :func:`_resolve_params` (env over module
    defaults) are copied, the non-empty *overrides* are applied on top, and
    the result replaces the module-level ``_conn_params`` cache while
    ``_lock`` is held.  Pools created afterwards by :func:`get_async_pool` /
    :func:`get_sync_pool` use the new values; pools that already exist are
    not rebuilt.

    ``None`` values and blank/whitespace-only strings are skipped, so
    optional config fields can be forwarded without erasing defaults.
    Called by :func:`configure_from_config`.

    Args:
        **overrides: Any of ``host``, ``port``, ``database``, ``user``,
            ``password``, ``sslmode``, ``min_size``, ``max_size`` or ``dsn``.
    """
    global _conn_params

    def _is_meaningful(value: Any) -> bool:
        # Reject None and strings that are empty once whitespace is removed.
        if value is None:
            return False
        return not (isinstance(value, str) and not value.strip())

    accepted = {key: value for key, value in overrides.items() if _is_meaningful(value)}

    with _lock:
        merged = dict(_resolve_params())
        merged.update(accepted)
        _conn_params = merged
def _resolve_params() -> dict[str, Any]:
    """Resolve and memoise the effective Postgres connection parameters.

    Returns the cached ``_conn_params`` when it is already populated.
    Otherwise it starts from :data:`_DEFAULT_CONN` and overlays any
    non-blank ``STARGAZER_VECTOR_PG_*`` environment variables.
    ``STARGAZER_VECTOR_PG_DSN`` is stored as ``dsn``.  ``port`` /
    ``min_size`` / ``max_size`` are coerced to ``int``; unparseable values
    are ignored with a warning, so the default stays in effect.  The
    precedence is therefore env over module defaults.

    The freshly built dict is stored only if no other caller populated the
    cache in the meantime (e.g. :func:`configure` on another thread), so
    explicit overrides are not clobbered by a racing lazy resolution.

    Called by :func:`configure`, :func:`_asyncpg_kwargs`,
    :func:`_psycopg_conninfo` and :func:`_pool_sizes`.

    Returns:
        dict[str, Any]: The resolved connection parameters.
    """
    global _conn_params
    if _conn_params is not None:
        return _conn_params

    params = dict(_DEFAULT_CONN)
    dsn = os.environ.get("STARGAZER_VECTOR_PG_DSN", "").strip()
    if dsn:
        params["dsn"] = dsn

    env_map = {
        "STARGAZER_VECTOR_PG_HOST": "host",
        "STARGAZER_VECTOR_PG_PORT": "port",
        "STARGAZER_VECTOR_PG_DATABASE": "database",
        "STARGAZER_VECTOR_PG_USER": "user",
        "STARGAZER_VECTOR_PG_PASSWORD": "password",
        "STARGAZER_VECTOR_PG_SSLMODE": "sslmode",
        "STARGAZER_VECTOR_PG_MIN_SIZE": "min_size",
        "STARGAZER_VECTOR_PG_MAX_SIZE": "max_size",
    }
    int_keys = ("port", "min_size", "max_size")
    for env_var, key in env_map.items():
        val = os.environ.get(env_var)
        if val is None or not val.strip():
            continue
        if key in int_keys:
            try:
                params[key] = int(val)
            except ValueError:
                # Keep the default, but make the misconfiguration visible
                # instead of silently discarding it.
                logger.warning(
                    "vector_store: ignoring non-integer %s=%r", env_var, val
                )
        else:
            params[key] = val.strip()

    # Do not overwrite parameters another thread stored while we were
    # building ours (configure() writes under _lock; this path does not).
    if _conn_params is None:
        _conn_params = params
    return _conn_params
def configure_from_config(config: Any) -> None:
    """Point this module at the Postgres described by a ``Config`` object.

    Each ``vector_pg_*`` attribute is read with ``getattr(..., None)``, so
    absent fields become ``None``.  All of them are forwarded to
    :func:`configure`, which discards ``None`` and blank values.  A
    ``None`` *config* is a no-op.

    Args:
        config: Object exposing optional ``vector_pg_dsn``,
            ``vector_pg_host``, ``vector_pg_port``, ``vector_pg_database``,
            ``vector_pg_user``, ``vector_pg_password``,
            ``vector_pg_sslmode``, ``vector_pg_min_size`` and
            ``vector_pg_max_size`` attributes, or ``None``.
    """
    if config is None:
        return

    # (keyword accepted by configure(), attribute looked up on config)
    field_map = (
        ("dsn", "vector_pg_dsn"),
        ("host", "vector_pg_host"),
        ("port", "vector_pg_port"),
        ("database", "vector_pg_database"),
        ("user", "vector_pg_user"),
        ("password", "vector_pg_password"),
        ("sslmode", "vector_pg_sslmode"),
        ("min_size", "vector_pg_min_size"),
        ("max_size", "vector_pg_max_size"),
    )
    configure(**{kwarg: getattr(config, attr, None) for kwarg, attr in field_map})
def _asyncpg_kwargs() -> dict[str, Any]: """Build the keyword arguments for ``asyncpg.create_pool``. Reads the resolved connection params and returns either a single ``dsn`` entry (when a DSN string was configured) or the discrete ``host``/``port``/ ``user``/``password``/``database`` fields asyncpg expects. An empty password is normalised to ``None``. Calls :func:`_resolve_params`. Called by :func:`get_async_pool` when it constructs the per-event-loop asyncpg pool. No other module references it. Returns: dict[str, Any]: Keyword arguments suitable for ``asyncpg.create_pool``. """ p = _resolve_params() if p.get("dsn"): return {"dsn": p["dsn"]} return { "host": p["host"], "port": int(p["port"]), "user": p["user"], "password": p["password"] or None, "database": p["database"], } def _psycopg_conninfo() -> str: """Build the libpq-style ``conninfo`` string for the psycopg3 pool. Returns the configured ``dsn`` verbatim when present; otherwise assembles a space-separated ``key=value`` connection string from the discrete params, including ``password`` and ``sslmode`` only when set. Calls :func:`_resolve_params`. Called by :func:`get_sync_pool` when it opens the shared synchronous ``ConnectionPool``. No external callers found. Returns: str: A libpq ``conninfo`` string consumable by psycopg3. """ p = _resolve_params() if p.get("dsn"): return p["dsn"] parts = [ f"host={p['host']}", f"port={int(p['port'])}", f"dbname={p['database']}", f"user={p['user']}", ] if p.get("password"): parts.append(f"password={p['password']}") if p.get("sslmode"): parts.append(f"sslmode={p['sslmode']}") return " ".join(parts) def _pool_sizes() -> tuple[int, int]: """Return the ``(min_size, max_size)`` pool bounds from resolved params. Reads ``min_size``/``max_size`` (defaulting to ``1`` and ``8``) and coerces them to ``int``. Calls :func:`_resolve_params`; called by both :func:`get_async_pool` and :func:`get_sync_pool` when sizing their pools. 
Returns: tuple[int, int]: The minimum and maximum connection counts. """ p = _resolve_params() return int(p.get("min_size", 1)), int(p.get("max_size", 8)) def _validate_ident(name: str, kind: str) -> str: """Validate that *name* is a safe Postgres identifier and return it. Identifiers are interpolated directly into SQL DDL/DML (schema and table names), so this guards against injection by requiring *name* to match ``_IDENT_RE`` (alphanumerics and underscores only). Unlike :func:`pg_ident`, it does not sanitise — it rejects. Called throughout this module wherever a schema/table name reaches SQL: :class:`_BaseCollection.__init__`, :func:`_ensure_sql`, and :func:`table_count`. Also imported by ``rag_system/pg_source_files.py``, which pairs it with :func:`pg_ident` to validate a sanitised schema name. Args: name: The candidate identifier (schema or table name). kind: A short label (e.g. ``"schema"``/``"table"``) used in the error message. Returns: str: The validated *name*, unchanged. Raises: ValueError: If *name* is empty or contains characters outside ``[A-Za-z0-9_]``. """ if not name or not _IDENT_RE.match(name): raise ValueError(f"Unsafe {kind} identifier: {name!r}") return name
def pg_ident(name: str) -> str:
    """Sanitise an arbitrary string into a safe Postgres identifier.

    This is the lenient counterpart of :func:`_validate_ident`: instead of
    rejecting input it rewrites it using the migration's underscore style
    (``us_law``, ``files_us_law``).

    - Every character outside ``[A-Za-z0-9_]`` becomes ``_``.
    - Runs of underscores collapse to one, and leading/trailing
      underscores are trimmed.
    - An empty result becomes ``store``.
    - A leading digit gets an ``s_`` prefix.
    - The result is cut to Postgres's 63-character identifier limit.

    No I/O.

    Args:
        name: Raw name (e.g. a store directory or collection); ``None`` is
            treated as empty.

    Returns:
        str: A Postgres-safe identifier of at most 63 characters.
    """
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name or "")
    cleaned = re.sub(r"_+", "_", cleaned).strip("_") or "store"
    if cleaned[0].isdigit():
        cleaned = f"s_{cleaned}"
    return cleaned[:63]
# --------------------------------------------------------------------------- # Helpers: vector / metadata / filter translation # ---------------------------------------------------------------------------
def vector_literal(embedding: Sequence[float]) -> str:
    """Format *embedding* as a pgvector text literal ``[v1,v2,...]``.

    Each component is converted with ``float`` and rendered with ``repr``,
    the shortest string that round-trips, so no precision is lost before
    pgvector parses it into ``halfvec``/``vector``.  No I/O.  Used by
    :meth:`_BaseCollection._upsert_rows` and by the collection ``query``
    methods to bind vectors.

    Args:
        embedding: The numeric embedding vector.

    Returns:
        str: The bracketed, comma-separated literal.
    """
    body = ",".join(map(repr, map(float, embedding)))
    return f"[{body}]"
def _coerce_metadata(meta: Any) -> dict[str, Any]: """Normalise a row's ``metadata`` value into a plain ``dict``. Postgres ``jsonb`` may surface to a driver as an already-parsed ``dict`` or as a raw JSON ``str`` (asyncpg returns the latter unless a codec is set), so this decodes strings via ``json.loads`` and treats anything unparseable or of an unexpected type as an empty mapping. Pure helper with no I/O. Called when shaping query/get results back into Python dicts: :meth:`PgVectorCollection.query`, :meth:`_BaseCollection._rows_to_get`, and :meth:`AsyncPgVectorCollection.query`. Args: meta: A metadata value from a result row (``None``, JSON ``str``, ``dict``, or other). Returns: dict[str, Any]: The decoded metadata, or ``{}`` when absent/invalid. """ if meta is None: return {} if isinstance(meta, str): try: return json.loads(meta) except (ValueError, TypeError): return {} if isinstance(meta, dict): return meta return {}
def l2_to_similarity(distance: float | None) -> float | None:
    """Map an L2 distance to a cosine-style similarity clamped to ``[0, 1]``.

    Assumes unit-normalised embeddings (the module treats Gemini
    ``gemini-embedding-001`` output as such).  For unit vectors
    ``L2^2 = 2 - 2*cos``, hence ``cos = 1 - L2^2 / 2``.  This keeps the
    reported ``similarity`` in line with the old Chroma
    ``1 - distance`` convention.  No I/O.

    Args:
        distance: Raw distance from pgvector's ``<->`` operator, or
            ``None``.

    Returns:
        float | None: Similarity in ``[0, 1]``, or ``None`` for ``None``.
    """
    if distance is None:
        return None
    cosine = 1.0 - (distance * distance) / 2.0
    return 0.0 if cosine < 0.0 else (1.0 if cosine > 1.0 else cosine)
class _ParamBuilder: """Accumulates bind values and emits driver-correct SQL placeholders. Lets one set of SQL builders target both drivers used in this module: asyncpg (numbered ``$1``, ``$2`` … placeholders) and psycopg (``%s``). Each :meth:`add` records a value in call order and hands back the matching placeholder text, so the accumulated :attr:`values` list lines up positionally with the assembled statement. A fresh instance is created per statement by every SQL builder here (:func:`_translate_clause`, :func:`_build_filter_sql`, :meth:`_BaseCollection._upsert_sql`, ``_query_sql``, ``_get_sql``, ``_delete_sql``). """ def __init__(self, style: str): """Initialise an empty parameter accumulator for one driver style. Args: style: Either ``"asyncpg"`` (emits numbered ``$1``, ``$2`` … placeholders) or ``"psycopg"`` (emits ``%s`` placeholders). """ self.style = style # "asyncpg" | "psycopg" self.values: list[Any] = [] def add(self, value: Any) -> str: """Record a bind value and return its driver-correct placeholder. Appends *value* to :attr:`values` (preserving call order) and returns the placeholder text to splice into the SQL string. Used by every SQL builder in this module (:func:`_translate_clause`, :func:`_build_filter_sql`, :meth:`_BaseCollection._query_sql`, etc.) so the same builder code targets both asyncpg and psycopg. Args: value: The Python value to bind for this placeholder. Returns: str: ``"$N"`` for the asyncpg style (1-based positional index) or ``"%s"`` for psycopg. """ self.values.append(value) if self.style == "asyncpg": return f"${len(self.values)}" return "%s" def _translate_clause(node: Any, pb: _ParamBuilder) -> str | None: """Translate a Chroma-style ``where`` dict into a SQL boolean clause. Recursively walks the filter, handling the boolean combinators ``$and`` and ``$or`` plus the per-field operators ``$eq`` / ``$ne`` / ``$in`` / ``$contains`` and bare equality ``{"field": value}``. 
Field comparisons run against the ``jsonb`` metadata column as text (``metadata->>key``), and every operand is parameterised through *pb* (never inlined) so the clause is injection-safe; values are coerced to ``str`` to match ``->>`` semantics. Unknown operators fall back to equality. Called by :func:`_build_filter_sql` and :meth:`_BaseCollection._delete_sql`, and recursively by itself for nested ``$and`` / ``$or`` branches. Args: node: A Chroma-style filter dict (or anything else, treated as no clause). pb: The :class:`_ParamBuilder` collecting bind values for this query. Returns: str | None: A SQL boolean fragment, or ``None`` when *node* yields no predicate. """ if not node or not isinstance(node, dict): return None clauses: list[str] = [] for key, val in node.items(): if key == "$and": sub = [c for c in (_translate_clause(x, pb) for x in val) if c] if sub: clauses.append("(" + " AND ".join(sub) + ")") continue if key == "$or": sub = [c for c in (_translate_clause(x, pb) for x in val) if c] if sub: clauses.append("(" + " OR ".join(sub) + ")") continue col = f"metadata->>{pb.add(key)}" if isinstance(val, dict): for op, opval in val.items(): if op == "$eq": clauses.append(f"{col} = {pb.add(str(opval))}") elif op == "$ne": clauses.append( f"({col} IS DISTINCT FROM {pb.add(str(opval))})" ) elif op == "$contains": clauses.append(f"{col} ILIKE {pb.add('%' + str(opval) + '%')}") elif op == "$in": vals = [str(x) for x in (opval or [])] clauses.append(f"{col} = ANY({pb.add(vals)})") else: clauses.append(f"{col} = {pb.add(str(opval))}") else: clauses.append(f"{col} = {pb.add(str(val))}") if not clauses: return None return " AND ".join(clauses) def _translate_where_document(where_document: Any, pb: _ParamBuilder) -> str | None: """Translate a Chroma ``where_document`` filter into a SQL clause. Supports only the ``$contains`` operator, mapping it to a case-insensitive ``document ILIKE '%…%'`` predicate; any other shape yields no clause. 
Binds the search pattern via *pb* so the substring is parameterised, not inlined. Called by :func:`_build_filter_sql`, which combines its result with the metadata ``where`` clause. No external callers. Args: where_document: A Chroma-style document filter dict (e.g. ``{"$contains": "text"}``) or anything else (ignored). pb: The :class:`_ParamBuilder` collecting bind values for this query. Returns: str | None: A SQL boolean fragment, or ``None`` when no filter applies. """ if not where_document or not isinstance(where_document, dict): return None contains = where_document.get("$contains") if contains: return f"document ILIKE {pb.add('%' + str(contains) + '%')}" return None def _build_filter_sql( pb: _ParamBuilder, where: Any = None, where_document: Any = None, ) -> str: """Combine metadata and document filters into a ``WHERE`` suffix. Translates the Chroma-style *where* clause via :func:`_translate_clause` and the *where_document* clause via :func:`_translate_where_document`, joining any resulting fragments with ``AND``. Returns an empty string when neither produces a clause (so it can be concatenated unconditionally). Bind values accumulate in *pb*. Called by :meth:`_BaseCollection._query_sql` and :meth:`_BaseCollection._get_sql` while assembling their statements. Args: pb: The :class:`_ParamBuilder` collecting bind values. where: A Chroma-style metadata filter dict, or ``None``. where_document: A Chroma-style document filter dict, or ``None``. Returns: str: A leading-space ``" WHERE …"`` fragment, or ``""`` if no filters. 
""" parts: list[str] = [] w = _translate_clause(where, pb) if w: parts.append(w) wd = _translate_where_document(where_document, pb) if wd: parts.append(wd) if not parts: return "" return " WHERE " + " AND ".join(parts) # --------------------------------------------------------------------------- # Async pool (asyncpg) — keyed per running event loop # --------------------------------------------------------------------------- _async_pools: dict[int, Any] = {} _async_pool_lock = asyncio.Lock()
# Per-event-loop locks that serialise first-time pool creation, keyed like
# ``_async_pools`` by ``id(loop)``.  Each entry stores the loop next to its
# lock.  Holding that reference prevents the id from being reused by a later
# loop while the entry exists, and the identity check below is a cheap
# safeguard.  Cost: one small entry per loop that ever requested a pool.
_async_pool_locks: dict[int, tuple[Any, asyncio.Lock]] = {}


def _pool_creation_lock(loop: Any) -> asyncio.Lock:
    """Return the pool-creation lock owned by *loop*, creating it lazily.

    Must be called from a coroutine running on *loop*.  There is no
    ``await`` between the lookup and the store, so coroutines on the same
    loop cannot interleave here, and different loops use different keys.
    """
    key = id(loop)
    entry = _async_pool_locks.get(key)
    if entry is None or entry[0] is not loop:
        entry = (loop, asyncio.Lock())
        _async_pool_locks[key] = entry
    return entry[1]


async def get_async_pool():
    """Return the asyncpg pool for the current event loop, creating it lazily.

    Pools are cached in ``_async_pools`` keyed by ``id(loop)`` because
    asyncpg pools are bound to the loop that created them, and worker
    threads run their own loops.  On a cache miss the lookup is repeated
    under a lock owned by *this* loop (see :func:`_pool_creation_lock`).
    The previous single module-level ``asyncio.Lock`` could not be safely
    shared across loops.  The pool is then created with
    ``asyncpg.create_pool``, sized by :func:`_pool_sizes` and configured by
    :func:`_asyncpg_kwargs`; this opens real Postgres connections and logs
    the creation.

    Awaited by :func:`warm_async_pool` and the async collection methods.

    Returns:
        asyncpg.Pool: The pool bound to the current event loop.
    """
    import asyncpg

    loop = asyncio.get_running_loop()
    key = id(loop)
    pool = _async_pools.get(key)
    if pool is not None:
        return pool
    async with _pool_creation_lock(loop):
        # Re-check: another coroutine on this loop may have created it
        # while we waited for the lock.
        pool = _async_pools.get(key)
        if pool is None:
            min_size, max_size = _pool_sizes()
            pool = await asyncpg.create_pool(
                min_size=min_size,
                max_size=max_size,
                command_timeout=60,
                **_asyncpg_kwargs(),
            )
            _async_pools[key] = pool
            logger.info("vector_store: created asyncpg pool for loop %s", key)
    return pool
async def warm_async_pool() -> None:
    """Warm the asyncpg pool and create the SWORD overlay cache table.

    Startup hook with the following steps:

    1. Obtain the per-loop pool via :func:`get_async_pool`.
    2. Run ``SELECT 1`` to confirm connectivity.
    3. Idempotently create the ``sword_cache`` schema and its
       ``overlay_cache`` table (``key`` TEXT PRIMARY KEY, ``payload``
       JSONB, ``updated_at`` DOUBLE PRECISION), so the first overlay write
       does not pay DDL latency.

    DDL failures are logged with their traceback and re-raised, so a broken
    database surfaces at boot.

    Raises:
        Exception: Any connectivity or DDL error from the driver.
    """
    pool = await get_async_pool()
    async with pool.acquire() as conn:
        await conn.execute("SELECT 1")
        try:
            # Ensure SWORD overlay cache table exists at startup
            await conn.execute('CREATE SCHEMA IF NOT EXISTS "sword_cache"')
            await conn.execute(
                'CREATE TABLE IF NOT EXISTS "sword_cache"."overlay_cache" ('
                ' key TEXT PRIMARY KEY,'
                ' payload JSONB,'
                ' updated_at DOUBLE PRECISION'
                ')'
            )
        except Exception as e:
            # logger.exception keeps the traceback; bare ``raise`` re-raises
            # without adding this frame as a new raise point.
            logger.exception("[sword] Failed to initialize PostgreSQL SWORD cache schema: %s", e)
            raise
        else:
            logger.info("[sword] PostgreSQL SWORD cache schema and table verified/initialized successfully.")
async def close_async_pool() -> None:
    """Close and evict the asyncpg pool owned by the running event loop.

    Removes the entry keyed by the running loop's ``id`` from
    ``_async_pools`` and awaits its ``close()``.  Does nothing when this
    loop never created a pool.  Intended for orderly shutdown of a loop.

    Raises:
        RuntimeError: When called with no running event loop (raised by
            ``asyncio.get_running_loop``).
    """
    loop_key = id(asyncio.get_running_loop())
    pool = _async_pools.pop(loop_key, None)
    if pool is None:
        return
    await pool.close()
# --------------------------------------------------------------------------- # Sync pool (psycopg3) — single thread-safe pool # --------------------------------------------------------------------------- _sync_pool: Any = None _sync_pool_lock = threading.Lock()
def get_sync_pool():
    """Return the process-wide psycopg3 pool, building it on first use.

    All synchronous callers (worker threads, CLIs, scripts) share one
    ``psycopg_pool.ConnectionPool``, unlike the per-loop async pools.  The
    first caller creates it while holding ``_sync_pool_lock`` (re-checking
    after acquiring it).  The pool is created with autocommit connections,
    conninfo from :func:`_psycopg_conninfo` and bounds from
    :func:`_pool_sizes`, and is opened immediately.

    Used by :func:`warm_sync_pool` and the synchronous collection code.

    Returns:
        psycopg_pool.ConnectionPool: The shared autocommit pool.
    """
    global _sync_pool
    existing = _sync_pool
    if existing is not None:
        return existing
    with _sync_pool_lock:
        if _sync_pool is not None:
            # Another thread finished creating it while we waited.
            return _sync_pool
        from psycopg_pool import ConnectionPool

        lower, upper = _pool_sizes()
        _sync_pool = ConnectionPool(
            conninfo=_psycopg_conninfo(),
            min_size=lower,
            max_size=upper,
            kwargs={"autocommit": True},
            open=True,
        )
        logger.info("vector_store: created psycopg3 sync pool")
    return _sync_pool
def warm_sync_pool() -> None:
    """Create the shared psycopg3 pool and prove it can reach the database.

    Borrows one connection from :func:`get_sync_pool` and runs ``SELECT 1``,
    so the first real query does not pay connection-setup latency.  This
    is the synchronous twin of :func:`warm_async_pool`.

    Raises:
        Exception: Any psycopg connection/query error if the database is
            unreachable.
    """
    with get_sync_pool().connection() as conn:
        conn.execute("SELECT 1")
# ---------------------------------------------------------------------------
# DDL
# ---------------------------------------------------------------------------


def _ensure_sql(schema: str, table: str, dim: int) -> list[str]:
    """Build the idempotent DDL statements that create a collection's table.

    Returns three ``IF NOT EXISTS`` statements, in order:

    1. Create the schema.
    2. Create the collection table
       (``id``/``document``/``metadata``/``embedding halfvec(dim)``).
    3. Create an HNSW index on the embedding using ``halfvec_l2_ops``.

    Both identifiers are checked by :func:`_validate_ident` first because
    they are interpolated directly into the SQL.

    Args:
        schema: Postgres schema name for the store.
        table: Postgres table name for the collection.
        dim: Embedding dimensionality for the ``halfvec`` column
            (e.g. :data:`EMBED_DIM`).

    Returns:
        list[str]: SQL DDL statements to execute in order.

    Raises:
        ValueError: If *schema* or *table* is not a safe identifier.
    """
    _validate_ident(schema, "schema")
    _validate_ident(table, "table")
    # NOTE(review): the index name is "<table>_embedding_idx".  Postgres
    # truncates identifiers longer than 63 bytes, so for tables longer than
    # 49 characters the name is truncated.  Two such tables in one schema
    # that share a long prefix could then collide, and IF NOT EXISTS would
    # silently skip the second index.  Confirm whether this can occur.
    return [
        f'CREATE SCHEMA IF NOT EXISTS "{schema}"',
        (
            f'CREATE TABLE IF NOT EXISTS "{schema}"."{table}" ('
            "id text PRIMARY KEY, document text, metadata jsonb, "
            f"embedding halfvec({dim}))"
        ),
        (
            f'CREATE INDEX IF NOT EXISTS "{table}_embedding_idx" '
            f'ON "{schema}"."{table}" USING hnsw (embedding halfvec_l2_ops)'
        ),
    ]


# ---------------------------------------------------------------------------
# Collections
# ---------------------------------------------------------------------------


class _BaseCollection:
    """Driver-neutral base for a single pgvector collection (schema.table).

    Holds the validated schema/table identifiers and the embedding
    dimension.  It also provides the SQL builders shared by the sync
    (:class:`PgVectorCollection`) and async (:class:`AsyncPgVectorCollection`)
    subclasses.  It performs no I/O itself; subclasses execute the SQL it
    emits on their respective pools.
    """

    def __init__(self, schema: str, table: str, dim: int = EMBED_DIM):
        """Validate identifiers and cache the quoted ``schema.table`` name.

        Both names go through :func:`_validate_ident` (raising on unsafe
        input).  ``self._qname`` is the double-quoted qualified name that
        every statement below splices in.

        Args:
            schema: Postgres schema name for the logical store.
            table: Postgres table name for the collection.
            dim: Embedding dimensionality for the ``halfvec`` column;
                defaults to :data:`EMBED_DIM`.

        Raises:
            ValueError: If *schema* or *table* is not a safe identifier.
        """
        self.schema = _validate_ident(schema, "schema")
        self.table = _validate_ident(table, "table")
        self.dim = dim
        # Safe to interpolate: both parts passed _validate_ident above.
        self._qname = f'"{self.schema}"."{self.table}"'

    # -- SQL builders (driver-neutral via _ParamBuilder) -------------------

    def _upsert_sql(self, style: str) -> str:
        """Build the parameterised ``INSERT ... ON CONFLICT`` upsert statement.

        Single-row upsert with placeholders for ``id``, ``document``,
        ``metadata`` (cast ``::jsonb``) and ``embedding`` (cast
        ``::halfvec(dim)``).  On a primary-key conflict every non-key column
        is updated.  The :class:`_ParamBuilder` is used only for placeholder
        syntax: the four ``None`` adds reserve positions, and real values
        come per row from :meth:`_upsert_rows` via ``executemany``.

        Args:
            style: ``"asyncpg"`` or ``"psycopg"`` placeholder style.

        Returns:
            str: The upsert SQL for the configured driver.
        """
        pb = _ParamBuilder(style)
        idp = pb.add(None)
        docp = pb.add(None)
        metap = pb.add(None)
        vecp = pb.add(None)
        return (
            f"INSERT INTO {self._qname} (id, document, metadata, embedding) "
            f"VALUES ({idp}, {docp}, {metap}::jsonb, {vecp}::halfvec({self.dim})) "
            "ON CONFLICT (id) DO UPDATE SET "
            "document = EXCLUDED.document, "
            "metadata = EXCLUDED.metadata, "
            "embedding = EXCLUDED.embedding"
        )

    def _query_sql(
        self, style: str, n_results: int, where: Any, where_document: Any
    ) -> tuple[str, list[Any]]:
        """Build the nearest-neighbour ``SELECT`` and its bind-value list.

        The query returns ``id``/``document``/``metadata`` plus the L2
        distance (``embedding <-> vec``), ordered by that same distance
        with a ``LIMIT``.  The vector is referenced twice, in the select
        expression and in ``ORDER BY``, and both positions are reserved as
        ``None`` for the caller to fill.  Placeholder order in the SQL
        text is: select vector, filter values, order-by vector, limit.
        That matches the order of the returned values, which psycopg's
        positional ``%s`` requires.

        NOTE(review): the full vector literal is bound twice per query.
        ``ORDER BY distance`` would bind it once, but whether that
        preserves HNSW index use (and what callers expect of the ``None``
        slots) should be confirmed before changing it.

        Args:
            style: ``"asyncpg"`` or ``"psycopg"`` placeholder style.
            n_results: Maximum number of neighbours (``LIMIT``).
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.

        Returns:
            tuple[str, list[Any]]: The SQL and its ordered bind values.
            The two embedding slots are still ``None``, pending
            substitution.
        """
        pb = _ParamBuilder(style)
        vecp = pb.add(None)  # placeholder reserved for the embedding (select)
        # We need the vector value bound; fill after building.
        filt = _build_filter_sql(pb, where, where_document)
        vecp2 = pb.add(None)  # order-by vector
        limitp = pb.add(n_results)
        sql = (
            f"SELECT id, document, metadata, "
            f"(embedding <-> {vecp}::halfvec({self.dim})) AS distance "
            f"FROM {self._qname}{filt} "
            f"ORDER BY embedding <-> {vecp2}::halfvec({self.dim}) "
            f"LIMIT {limitp}"
        )
        return sql, pb.values

    def _get_sql(
        self,
        style: str,
        ids: Any,
        where: Any,
        where_document: Any,
        offset: int | None,
        limit: int | None,
    ) -> tuple[str, list[Any]]:
        """Build a non-vector ``SELECT`` fetching rows by id and/or filter.

        Emits ``SELECT id, document, metadata``, constrained by an optional
        ``id = ANY(...)`` predicate and/or the filters from
        :func:`_build_filter_sql`.  When both are present they are joined
        with ``AND``.  When *offset* or *limit* is given, a stable
        ``ORDER BY id`` plus the matching ``LIMIT``/``OFFSET`` is appended.

        Args:
            style: ``"asyncpg"`` or ``"psycopg"`` placeholder style.
            ids: Iterable of ids to restrict to, or ``None``.
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.
            offset: Row offset to skip, or ``None``.
            limit: Maximum rows to return, or ``None``.

        Returns:
            tuple[str, list[Any]]: The SQL and its ordered bind values.
        """
        pb = _ParamBuilder(style)
        conds: list[str] = []
        if ids is not None:
            conds.append(f"id = ANY({pb.add(list(ids))})")
        filt = _build_filter_sql(pb, where, where_document)
        where_sql = ""
        if conds and filt:
            # _build_filter_sql returns " WHERE <expr>"; strip that prefix
            # so the filter can be AND-ed onto the id predicate.
            where_sql = " WHERE " + conds[0] + " AND " + filt[len(" WHERE "):]
        elif conds:
            where_sql = " WHERE " + conds[0]
        elif filt:
            where_sql = filt
        tail = ""
        if offset is not None or limit is not None:
            # Deterministic order so paging with LIMIT/OFFSET is stable.
            tail += " ORDER BY id"
            if limit is not None:
                tail += f" LIMIT {pb.add(int(limit))}"
            if offset is not None:
                tail += f" OFFSET {pb.add(int(offset))}"
        sql = f"SELECT id, document, metadata FROM {self._qname}{where_sql}{tail}"
        return sql, pb.values

    def _delete_sql(
        self, style: str, ids: Any, where: Any
    ) -> tuple[str, list[Any]]:
        """Build a ``DELETE`` statement scoped by ids and/or a metadata filter.

        Combines an optional ``id = ANY(...)`` predicate with the metadata
        clause from :func:`_translate_clause`, joined by ``AND``.  With no
        ids and no *where* this returns an unfiltered ``DELETE FROM table``.
        The original design notes say the public ``delete`` methods guard
        against that case; this is not visible here.

        Args:
            style: ``"asyncpg"`` or ``"psycopg"`` placeholder style.
            ids: Iterable of ids to delete, or ``None``.
            where: Chroma-style metadata filter, or ``None``.

        Returns:
            tuple[str, list[Any]]: The ``DELETE`` SQL and its bind values.
        """
        pb = _ParamBuilder(style)
        conds: list[str] = []
        if ids is not None:
            conds.append(f"id = ANY({pb.add(list(ids))})")
        w = _translate_clause(where, pb)
        if w:
            conds.append(w)
        where_sql = (" WHERE " + " AND ".join(conds)) if conds else ""
        return f"DELETE FROM {self._qname}{where_sql}", pb.values

    @staticmethod
    def _upsert_rows(
        ids: Sequence[str],
        documents: Sequence[str],
        metadatas: Sequence[dict] | None,
        embeddings: Sequence[Sequence[float]],
    ) -> list[tuple]:
        """Shape parallel column lists into per-row tuples for ``executemany``.

        Zips the inputs positionally, with these defaults and conversions:

        - A document index past the end of *documents* becomes ``""``.
        - Missing or ``None`` metadata becomes ``{}``.
        - Metadata is serialised with ``json.dumps``.
        - Each embedding is rendered with :func:`vector_literal`.

        The resulting tuples line up with the placeholders of
        :meth:`_upsert_sql`.

        NOTE(review): *documents* itself must be a sequence (``len`` is
        called on it), so ``None`` raises ``TypeError``.  An *embeddings*
        list shorter than *ids* raises ``IndexError``.

        Args:
            ids: Row primary keys (coerced to ``str``).
            documents: Document texts, positionally aligned with *ids*.
            metadatas: Per-row metadata dicts, or ``None``.
            embeddings: Per-row embedding vectors, positionally aligned.

        Returns:
            list[tuple]: ``(id, document, metadata_json, vector_literal)``
            rows.
        """
        rows: list[tuple] = []
        for i, _id in enumerate(ids):
            meta = metadatas[i] if metadatas and i < len(metadatas) else {}
            rows.append(
                (
                    str(_id),
                    documents[i] if i < len(documents) else "",
                    json.dumps(meta or {}),
                    vector_literal(embeddings[i]),
                )
            )
        return rows

    @staticmethod
    def _rows_to_get(records: Iterable[Any]) -> dict[str, list]:
        """Convert fetched rows into the Chroma-style ``get`` result mapping.

        Rows that are ``dict`` instances are read by key (``"id"``,
        ``"document"``, ``"metadata"``); anything else is read positionally
        (``r[0]``/``r[1]``/``r[2]``).  Each metadata cell is normalised
        through :func:`_coerce_metadata`.

        Args:
            records: Iterable of result rows (tuples or dicts).

        Returns:
            dict[str, list]: ``{"ids": [...], "documents": [...],
            "metadatas": [...]}`` with positionally aligned lists.
        """
        ids: list[str] = []
        docs: list[str] = []
        metas: list[dict] = []
        for r in records:
            ids.append(r[0] if not isinstance(r, dict) else r["id"])
            docs.append(r[1] if not isinstance(r, dict) else r["document"])
            metas.append(
                _coerce_metadata(
                    r[2] if not isinstance(r, dict) else r["metadata"]
                )
            )
        return {"ids": ids, "documents": docs, "metadatas": metas}
class PgVectorCollection(_BaseCollection):
    """Synchronous pgvector collection backed by the shared psycopg3 pool.

    Concrete :class:`_BaseCollection` for blocking call sites — worker
    threads, CLIs, and ``scripts/`` — that cannot await.  Each method borrows
    a connection from :func:`get_sync_pool` and runs the driver-neutral SQL
    the base class builds (with ``psycopg`` placeholders).

    Exposes the Chroma-like surface ``ensure`` / ``upsert`` / ``query`` /
    ``get`` / ``delete`` / ``count`` / ``clear`` / ``drop``; embeddings are
    always supplied by the caller.  Wrapped by :class:`ChromaCompatCollection`
    for auto-embedding call sites and instantiated directly by
    ``anamnesis_engine``, ``extract_tags_to_concepts``,
    ``backfill_entity_provenance``, ``memory_search``, and
    ``rag_system/file_rag_manager``.
    """

    def ensure(self) -> None:
        """Create the collection's schema, table, and HNSW index if absent.

        Borrows a connection from the synchronous pool (:func:`get_sync_pool`)
        and runs, in order, each idempotent DDL statement produced by
        :func:`_ensure_sql`.  Safe to call repeatedly.  Used directly (e.g.
        by ``rag_system/file_rag_manager`` and verification scripts) and
        internally by :meth:`clear`.

        Raises:
            Exception: Propagates any psycopg error if the DDL fails.
        """
        pool = get_sync_pool()
        with pool.connection() as conn:
            for stmt in _ensure_sql(self.schema, self.table, self.dim):
                conn.execute(stmt)

    def upsert(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        metadatas: Sequence[dict] | None = None,
        embeddings: Sequence[Sequence[float]] | None = None,
    ) -> None:
        """Insert or replace rows by primary key, requiring explicit embeddings.

        Shapes the parallel lists via :meth:`_upsert_rows`, builds the upsert
        SQL with :meth:`_upsert_sql`, and runs it with ``cursor.executemany``
        on a pooled connection.  Returns early when *ids* is empty.  Unlike
        :class:`ChromaCompatCollection`, this layer never embeds implicitly;
        callers supply the vectors.

        Args:
            ids: Row primary keys.
            documents: Document texts aligned with *ids*.
            metadatas: Per-row metadata dicts, or ``None`` (treated as empty).
            embeddings: Per-row embedding vectors; required.

        Raises:
            ValueError: If *embeddings* is ``None``.
        """
        if embeddings is None:
            raise ValueError("PgVectorCollection.upsert requires embeddings")
        if not ids:
            return
        rows = self._upsert_rows(ids, documents, metadatas, embeddings)
        sql = self._upsert_sql("psycopg")
        pool = get_sync_pool()
        with pool.connection() as conn:
            with conn.cursor() as cur:
                cur.executemany(sql, rows)

    def query(
        self,
        embedding: Sequence[float],
        n_results: int = 5,
        where: Any = None,
        where_document: Any = None,
    ) -> list[dict[str, Any]]:
        """Run a synchronous nearest-neighbour search and return scored hits.

        Builds the query via :meth:`_query_sql`, substitutes the rendered
        :func:`vector_literal` into the reserved ``None`` placeholders,
        executes it on a pooled connection, and maps each row to a dict that
        includes the raw L2 ``distance`` and a derived ``similarity`` from
        :func:`l2_to_similarity`.

        Args:
            embedding: The query embedding vector.
            n_results: Maximum neighbours to return.
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.

        Returns:
            list[dict[str, Any]]: Per-hit dicts with ``id``, ``document``,
            ``metadata``, ``distance``, and ``similarity``, nearest first.
        """
        sql, params = self._query_sql("psycopg", n_results, where, where_document)
        vec = vector_literal(embedding)
        # ``_query_sql`` reserves ``None`` bind values for the vector slots.
        # NOTE(review): this assumes filter translation never binds a literal
        # ``None`` value; confirm against ``_build_filter_sql``.
        params = [vec if v is None else v for v in params]
        pool = get_sync_pool()
        with pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                rows = cur.fetchall()
        out: list[dict[str, Any]] = []
        for r in rows:
            dist = float(r[3]) if r[3] is not None else None
            out.append(
                {
                    "id": r[0],
                    "document": r[1],
                    "metadata": _coerce_metadata(r[2]),
                    "distance": dist,
                    "similarity": l2_to_similarity(dist),
                }
            )
        return out

    def get(
        self,
        ids: Sequence[str] | None = None,
        where: Any = None,
        where_document: Any = None,
        offset: int | None = None,
        limit: int | None = None,
    ) -> dict[str, list]:
        """Fetch rows by id and/or filter (no vector search) synchronously.

        Builds the statement with :meth:`_get_sql`, runs it on a pooled
        connection, and reshapes the rows via :meth:`_rows_to_get`.

        Args:
            ids: Ids to restrict to, or ``None`` for all matching rows.
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.
            offset: Row offset to skip, or ``None``.
            limit: Maximum rows to return, or ``None``.

        Returns:
            dict[str, list]: ``{"ids", "documents", "metadatas"}`` aligned lists.
        """
        sql, params = self._get_sql(
            "psycopg", ids, where, where_document, offset, limit
        )
        pool = get_sync_pool()
        with pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                rows = cur.fetchall()
        return self._rows_to_get(rows)

    def delete(
        self, ids: Sequence[str] | None = None, where: Any = None
    ) -> None:
        """Delete rows by id and/or metadata filter synchronously.

        No-ops when neither *ids* nor *where* is supplied.  It also no-ops,
        logging a warning, when *where* is truthy but translates to no SQL
        predicate.  Both checks guard against an accidental full-table
        delete.  Otherwise runs the statement from :meth:`_delete_sql` on a
        pooled connection.

        Args:
            ids: Ids to delete, or ``None``.
            where: Chroma-style metadata filter, or ``None``.
        """
        if ids is None and not where:
            return
        sql, params = self._delete_sql("psycopg", ids, where)
        # ``_delete_sql`` drops the WHERE clause entirely when neither an id
        # predicate nor a translated metadata clause was produced.  A truthy
        # *where* can still translate to nothing, so check the built SQL.
        if " WHERE " not in sql:
            logger.warning(
                "Refusing unfiltered DELETE on %s: where=%r produced no predicate",
                self._qname,
                where,
            )
            return
        pool = get_sync_pool()
        with pool.connection() as conn:
            conn.execute(sql, params)

    def count(self) -> int:
        """Return the total number of rows in the collection.

        Runs ``SELECT count(*)`` on a pooled connection.

        Returns:
            int: The row count (``0`` if the query yields no row).
        """
        pool = get_sync_pool()
        with pool.connection() as conn:
            with conn.cursor() as cur:
                cur.execute(f"SELECT count(*) FROM {self._qname}")
                row = cur.fetchone()
        return int(row[0]) if row else 0

    def clear(self) -> None:
        """Empty the collection, creating it first if necessary.

        Calls :meth:`ensure` so the ``TRUNCATE`` cannot fail on a missing
        table, then truncates on a pooled connection.  The data is removed;
        the table and index remain.
        """
        self.ensure()
        pool = get_sync_pool()
        with pool.connection() as conn:
            conn.execute(f"TRUNCATE TABLE {self._qname}")

    def drop(self) -> None:
        """Drop the collection's entire schema and everything in it.

        Runs ``DROP SCHEMA IF EXISTS … CASCADE`` on a pooled connection.
        This destroys every table and index under this store's schema, not
        just this one table.  Irreversible; intended for teardown and reset
        tooling.
        """
        pool = get_sync_pool()
        with pool.connection() as conn:
            conn.execute(f'DROP SCHEMA IF EXISTS "{self.schema}" CASCADE')
class ChromaCompatCollection:
    """Chroma-compatible sync facade over :class:`PgVectorCollection`.

    Auto-embeds documents and queries via *embedding_fn* (e.g.
    ``SyncOpenRouterEmbeddings``), so call sites that relied on Chroma's
    ``embedding_function`` keep working with minimal changes.  ``query``
    returns Chroma-shaped nested lists and reports ``distances`` as
    ``1 - similarity``.  Callers that compute ``1 - distance`` therefore
    recover the similarity exactly as they did against Chroma.
    """

    def __init__(self, pg: PgVectorCollection, embedding_fn: Any):
        """Wrap a :class:`PgVectorCollection` with an auto-embedding facade.

        Args:
            pg: The synchronous pgvector collection doing the real I/O.
            embedding_fn: An embedder exposing either ``embed_documents`` /
                ``embed_query`` methods, or a plain callable taking a list of
                texts (e.g. ``SyncOpenRouterEmbeddings``).
        """
        self.pg = pg
        self.embedding_fn = embedding_fn

    def _embed_documents(self, docs: Sequence[str]) -> list[list[float]]:
        """Embed a batch of document texts using :attr:`embedding_fn`.

        Prefers an ``embed_documents`` method on the embedder and falls back
        to calling the embedder directly with the list of texts.

        Args:
            docs: Document texts to embed.

        Returns:
            list[list[float]]: One embedding vector per input document.
        """
        fn = getattr(self.embedding_fn, "embed_documents", None)
        if fn is not None:
            return list(fn(list(docs)))
        return list(self.embedding_fn(list(docs)))

    def _embed_query(self, text: str) -> list[float]:
        """Embed a single query string using :attr:`embedding_fn`.

        Calls ``embed_query`` when the embedder has one, otherwise the
        embedder itself.  Either way the call receives a one-element list,
        and the first returned vector is used.
        NOTE(review): this treats ``embed_query`` as batch-style (list in,
        list of vectors out); confirm against the embedder in use.

        Args:
            text: The query string to embed.

        Returns:
            list[float]: The query's embedding vector.
        """
        fn = getattr(self.embedding_fn, "embed_query", None)
        if fn is not None:
            return list(fn([text]))[0]
        return list(self.embedding_fn([text]))[0]

    def ensure(self) -> None:
        """Ensure the backing collection's schema/table/index exist.

        Thin pass-through to :meth:`PgVectorCollection.ensure`.
        """
        self.pg.ensure()

    def upsert(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        metadatas: Sequence[dict] | None = None,
        embeddings: Sequence[Sequence[float]] | None = None,
    ) -> None:
        """Upsert documents, auto-embedding them when no vectors are given.

        Returns immediately when *ids* is empty, so the embedder is not
        called for nothing.  Otherwise computes embeddings via
        :meth:`_embed_documents` if *embeddings* is ``None`` (the Chroma
        ``embedding_function`` behaviour), then delegates to
        :meth:`PgVectorCollection.upsert`.

        Args:
            ids: Row primary keys.
            documents: Document texts aligned with *ids*.
            metadatas: Per-row metadata dicts, or ``None``.
            embeddings: Precomputed vectors; embedded on demand if ``None``.
        """
        if not ids:
            # The backing upsert is a no-op for empty ids anyway.
            return
        if embeddings is None:
            embeddings = self._embed_documents(documents)
        self.pg.upsert(ids, documents, metadatas, embeddings)

    def add(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        metadatas: Sequence[dict] | None = None,
        embeddings: Sequence[Sequence[float]] | None = None,
    ) -> None:
        """Alias for :meth:`upsert`, matching Chroma's ``add`` API.

        Args:
            ids: Row primary keys.
            documents: Document texts aligned with *ids*.
            metadatas: Per-row metadata dicts, or ``None``.
            embeddings: Precomputed vectors, or ``None`` to auto-embed.
        """
        self.upsert(ids, documents, metadatas=metadatas, embeddings=embeddings)

    def get(
        self,
        ids: Sequence[str] | None = None,
        where: Any = None,
        where_document: Any = None,
        limit: int | None = None,
        offset: int | None = None,
        include: Any = None,
    ) -> dict[str, list]:
        """Fetch rows by id/filter, mirroring Chroma's ``get`` signature.

        Delegates to :meth:`PgVectorCollection.get`.  The Chroma-only
        *include* argument is accepted for signature compatibility but
        ignored; ids, documents, and metadatas are always returned.

        Args:
            ids: Ids to restrict to, or ``None``.
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.
            limit: Maximum rows to return, or ``None``.
            offset: Row offset to skip, or ``None``.
            include: Accepted for Chroma compatibility; ignored.

        Returns:
            dict[str, list]: ``{"ids", "documents", "metadatas"}`` lists.
        """
        return self.pg.get(
            ids=ids,
            where=where,
            where_document=where_document,
            offset=offset,
            limit=limit,
        )

    def delete(
        self, ids: Sequence[str] | None = None, where: Any = None
    ) -> None:
        """Delete rows by id and/or metadata filter.

        Thin pass-through to :meth:`PgVectorCollection.delete`.

        Args:
            ids: Ids to delete, or ``None``.
            where: Chroma-style metadata filter, or ``None``.
        """
        self.pg.delete(ids=ids, where=where)

    def count(self) -> int:
        """Return the collection's row count via :meth:`PgVectorCollection.count`.

        Returns:
            int: Total number of stored rows.
        """
        return self.pg.count()

    def query(
        self,
        query_embeddings: Any = None,
        query_texts: Any = None,
        n_results: int = 10,
        where: Any = None,
        where_document: Any = None,
        include: Any = None,
    ) -> dict[str, list]:
        """Run a nearest-neighbour search returning Chroma-shaped nested lists.

        The query vector is resolved in order:

        * *query_embeddings*: either a single vector, or a one-query batch
          whose first row is used.  The batch may be a list/tuple of
          vectors, a list of arrays, or a 2-D array.
        * *query_texts*: either a single string (as Chroma allows) or a list
          of strings whose first element is used.  It is embedded via
          :meth:`_embed_query`.

        With neither input, or with an empty embedding, the empty Chroma
        shape is returned.  Results come from :meth:`PgVectorCollection.query`.
        Each ``similarity`` is converted back to a distance
        (``1 - similarity``).  The *include* argument is ignored.

        Args:
            query_embeddings: A query vector or a one-query batch of vectors,
                or ``None``.
            query_texts: A query string or list of query strings (only the
                first is used), or ``None``.
            n_results: Maximum neighbours to return.
            where: Chroma-style metadata filter, or ``None``.
            where_document: Chroma-style document filter, or ``None``.
            include: Accepted for Chroma compatibility; ignored.

        Returns:
            dict[str, list]: ``{"ids", "documents", "metadatas", "distances"}``,
            each wrapped in a single-query outer list.  These are empty
            single-batch lists when no query is given.
        """
        # Local import: only needed to recognise scalar vector components
        # (numpy scalar types register themselves with ``numbers``).
        import numbers

        empty = {"ids": [[]], "documents": [[]], "metadatas": [[]], "distances": [[]]}
        if query_embeddings is not None:
            # ``len`` rather than truthiness so numpy arrays work too.
            if len(query_embeddings) == 0:
                return empty
            emb = query_embeddings
            first = emb[0]
            # A non-scalar first element means a batch of vectors; use row 0.
            if not isinstance(first, (numbers.Number, str, bytes)):
                emb = first
        elif query_texts:
            # Chroma accepts a bare string; indexing it would embed one char.
            text = query_texts if isinstance(query_texts, str) else query_texts[0]
            emb = self._embed_query(text)
        else:
            return empty
        rows = self.pg.query(
            emb, n_results=n_results, where=where, where_document=where_document
        )
        return {
            "ids": [[r["id"] for r in rows]],
            "documents": [[r["document"] for r in rows]],
            "metadatas": [[r["metadata"] for r in rows]],
            "distances": [
                [
                    (1.0 - r["similarity"]) if r["similarity"] is not None else None
                    for r in rows
                ]
            ],
        }
class AsyncPgVectorCollection(_BaseCollection):
    """Asynchronous pgvector collection backed by the per-loop asyncpg pool.

    Concrete :class:`_BaseCollection` for genuinely-async hot paths running
    on the bot event loop.  Each coroutine acquires a connection from
    :func:`get_async_pool` and runs the driver-neutral SQL the base class
    builds (with ``asyncpg`` numbered placeholders).

    Exposes the async Chroma-like surface ``ensure`` / ``upsert`` / ``query``
    / ``get`` / ``delete`` / ``count`` / ``clear``; embeddings are always
    supplied by the caller.  Drives async paths such as
    ``prompt_context._divine_reflex`` (Golden Goddess reflex),
    ``tools/query_golden_goddess_v2``, and ``tools/chromadb_tools``.
    """

    async def ensure(self) -> None:
        """Create the schema, table, and HNSW index for this collection (async).

        Async counterpart of :meth:`PgVectorCollection.ensure`.  It runs, in
        order, the idempotent DDL from :func:`_ensure_sql` on a connection
        acquired from :func:`get_async_pool`.  Called by
        ``tools/chromadb_tools`` before upserting into a vector DB, and
        internally by :meth:`clear`.
        """
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            for stmt in _ensure_sql(self.schema, self.table, self.dim):
                await conn.execute(stmt)

    async def upsert(
        self,
        ids: Sequence[str],
        documents: Sequence[str],
        metadatas: Sequence[dict] | None = None,
        embeddings: Sequence[Sequence[float]] | None = None,
    ) -> None:
        """Insert or update rows on the async (asyncpg) path.

        No-ops on empty *ids*.  Otherwise :meth:`_upsert_rows` builds the
        rows and :meth:`_upsert_sql` builds the SQL, which is executed with
        ``executemany`` on a pooled connection from :func:`get_async_pool`.

        Args:
            ids: Stable ids for each row.
            documents: The document texts.
            metadatas: Per-row metadata dicts, or ``None``.
            embeddings: Required precomputed embeddings (this layer never
                embeds).

        Raises:
            ValueError: If *embeddings* is ``None``.
        """
        if embeddings is None:
            raise ValueError("AsyncPgVectorCollection.upsert requires embeddings")
        if not ids:
            return
        rows = self._upsert_rows(ids, documents, metadatas, embeddings)
        sql = self._upsert_sql("asyncpg")
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            await conn.executemany(sql, rows)

    async def query(
        self,
        embedding: Sequence[float],
        n_results: int = 5,
        where: Any = None,
        where_document: Any = None,
    ) -> list[dict[str, Any]]:
        """Run an async nearest-neighbour search and return scored hits.

        Builds the SQL via :meth:`_query_sql` and fills the reserved vector
        slots with the rendered :func:`vector_literal`.  It then fetches rows
        on a pooled connection from :func:`get_async_pool`.  Each row is
        mapped to a dict with the raw L2 ``distance`` and a ``similarity``
        from :func:`l2_to_similarity`.

        Args:
            embedding: The query embedding vector.
            n_results: Maximum number of neighbours to return.
            where: Chroma-style metadata filter dict, or ``None``.
            where_document: Chroma-style document filter dict, or ``None``.

        Returns:
            list[dict[str, Any]]: Per-hit dicts with ``id``, ``document``,
            ``metadata``, ``distance``, and ``similarity``, nearest first.
        """
        sql, params = self._query_sql("asyncpg", n_results, where, where_document)
        vec = vector_literal(embedding)
        # ``_query_sql`` reserves ``None`` bind values for the vector slots.
        # NOTE(review): this assumes filter translation never binds a literal
        # ``None`` value; confirm against ``_build_filter_sql``.
        params = [vec if v is None else v for v in params]
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)
        out: list[dict[str, Any]] = []
        for r in rows:
            dist = float(r["distance"]) if r["distance"] is not None else None
            out.append(
                {
                    "id": r["id"],
                    "document": r["document"],
                    "metadata": _coerce_metadata(r["metadata"]),
                    "distance": dist,
                    "similarity": l2_to_similarity(dist),
                }
            )
        return out

    async def get(
        self,
        ids: Sequence[str] | None = None,
        where: Any = None,
        where_document: Any = None,
        offset: int | None = None,
        limit: int | None = None,
    ) -> dict[str, list]:
        """Fetch rows by id and/or filters on the async path.

        Builds the statement via :meth:`_get_sql` and fetches rows on a
        pooled connection from :func:`get_async_pool`.  The asyncpg
        ``Record`` rows are turned into ``(id, document, metadata)`` tuples
        and reshaped by :meth:`_rows_to_get`.

        Args:
            ids: Specific ids to fetch, or ``None``.
            where: Chroma-style metadata filter dict, or ``None``.
            where_document: Chroma-style document filter dict, or ``None``.
            offset: Row offset for pagination, or ``None``.
            limit: Maximum rows to return, or ``None``.

        Returns:
            dict[str, list]: Parallel ``ids`` / ``documents`` / ``metadatas``
            lists.
        """
        sql, params = self._get_sql(
            "asyncpg", ids, where, where_document, offset, limit
        )
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)
        return self._rows_to_get(
            [(r["id"], r["document"], r["metadata"]) for r in rows]
        )

    async def delete(
        self, ids: Sequence[str] | None = None, where: Any = None
    ) -> None:
        """Delete rows by id list and/or metadata filter (async).

        No-ops when neither *ids* nor *where* is given.  It also no-ops,
        logging a warning, when *where* is truthy but translates to no SQL
        predicate.  Both checks guard against an accidental full-table
        delete.  Otherwise executes the statement from :meth:`_delete_sql`
        on a pooled connection from :func:`get_async_pool`.

        Args:
            ids: Ids to delete, or ``None``.
            where: Chroma-style metadata filter selecting rows to delete, or
                ``None``.
        """
        if ids is None and not where:
            return
        sql, params = self._delete_sql("asyncpg", ids, where)
        # ``_delete_sql`` drops the WHERE clause entirely when neither an id
        # predicate nor a translated metadata clause was produced.  A truthy
        # *where* can still translate to nothing, so check the built SQL.
        if " WHERE " not in sql:
            logger.warning(
                "Refusing unfiltered DELETE on %s: where=%r produced no predicate",
                self._qname,
                where,
            )
            return
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            await conn.execute(sql, *params)

    async def count(self) -> int:
        """Return the collection's row count (async).

        Runs ``SELECT count(*)`` via ``fetchval`` on a pooled connection
        from :func:`get_async_pool`.

        Returns:
            int: The row count, or ``0`` when the value is ``None``.
        """
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            val = await conn.fetchval(f"SELECT count(*) FROM {self._qname}")
        return int(val) if val is not None else 0

    async def clear(self) -> None:
        """Empty the collection on the async path, ensuring it exists first.

        Calls :meth:`ensure`, then ``TRUNCATE``\\ s the table on a pooled
        connection from :func:`get_async_pool`.  The schema and HNSW index
        are preserved.
        """
        await self.ensure()
        pool = await get_async_pool()
        async with pool.acquire() as conn:
            await conn.execute(f"TRUNCATE TABLE {self._qname}")
# --------------------------------------------------------------------------- # Store introspection (replaces filesystem listing of rag_stores/) # --------------------------------------------------------------------------- _SYSTEM_SCHEMAS = frozenset( {"pg_catalog", "information_schema", "public", "pg_toast"} )
def list_store_schemas() -> list[str]:
    """List the user (non-system) schemas in the vector database.

    Replaces the old filesystem listing of ``rag_stores/`` directories.  It
    reads ``information_schema.schemata`` on the synchronous pool
    (:func:`get_sync_pool`) and drops Postgres's own schemas, so only
    logical store schemas remain.  Dropped are every name in
    ``_SYSTEM_SCHEMAS`` plus the ``pg_temp_*`` / ``pg_toast_temp_*``
    families.

    Returns:
        list[str]: Candidate store schema names, in the database's
        ``ORDER BY schema_name`` order.
    """
    pool = get_sync_pool()
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT schema_name FROM information_schema.schemata "
                "ORDER BY schema_name"
            )
            rows = cur.fetchall()
    # Filter in Python for two reasons.  First, the exclusion list then has a
    # single source of truth (``_SYSTEM_SCHEMAS``).  Second, the temp-schema
    # prefixes are matched literally: in SQL ``LIKE`` an unescaped ``_`` is a
    # one-character wildcard, so ``'pg_temp_%'`` also hid names such as
    # ``pgXtempY...``.
    temp_prefixes = ("pg_temp_", "pg_toast_temp_")
    return [
        name
        for name in (r[0] for r in rows)
        if name not in _SYSTEM_SCHEMAS and not name.startswith(temp_prefixes)
    ]
def table_count(schema: str, table: str) -> int:
    """Count the rows in ``schema.table``; a missing table counts as ``0``.

    Both names are checked with :func:`_validate_ident` first, because the
    ``count(*)`` statement interpolates them.  The table's existence is then
    confirmed through ``information_schema.tables``, so an absent collection
    yields ``0`` instead of an error.  All queries run on one connection from
    :func:`get_sync_pool`.

    Args:
        schema: The Postgres schema name (validated).
        table: The Postgres table name (validated).

    Returns:
        int: The row count, or ``0`` when the table does not exist.

    Raises:
        ValueError: If *schema* or *table* is not a safe identifier.
    """
    for ident, label in ((schema, "schema"), (table, "table")):
        _validate_ident(ident, label)

    existence_sql = (
        "SELECT EXISTS (SELECT 1 FROM information_schema.tables "
        "WHERE table_schema=%s AND table_name=%s)"
    )
    with get_sync_pool().connection() as conn, conn.cursor() as cur:
        cur.execute(existence_sql, (schema, table))
        present = cur.fetchone()[0]
        if not present:
            return 0
        cur.execute(f'SELECT count(*) FROM "{schema}"."{table}"')
        counted = cur.fetchone()
    if not counted:
        return 0
    return int(counted[0])