rag_system package

RAG System for Stargazer Bot v3.

A file-based Retrieval-Augmented Generation system using:

  • Gemini API for embeddings (google/gemini-embedding-001) via shared key pool

  • Postgres + pgvector for vector storage (exposed through a Chroma-shaped compatibility layer)

  • Whole-file retrieval where available, falling back to retrieval chunks

  • Per-channel auto-search with context injection

class rag_system.OpenRouterEmbeddings(api_key=None, model='google/gemini-embedding-001', dimensions=None, timeout=30.0, gemini_api_key=None, gemini_only=True)[source]

Bases: object

Async embeddings client that calls the Gemini API via the shared key pool.

Despite the historical name, this client targets Google’s Gemini embeddings endpoint using keys drawn from the shared pool rather than OpenRouter. It batches inputs (bounded by MAX_BATCH_SIZE / MAX_BATCH_CHARS), retries with backoff, and exposes embed_text()/embed_texts(), which return dense numpy.ndarray vectors of width dimensions (default 3072). It is instantiated across the codebase wherever embeddings are needed: the vector tool classifier (classifiers.vector_classifier), the tool/skill/dangerous-command embedding refreshers under classifiers/, and tools.search_tools. The file-RAG manager uses the sync sibling SyncOpenRouterEmbeddings.

Parameters:
  • api_key (str | None)

  • model (str)

  • dimensions (int | None)

  • timeout (float)

  • gemini_api_key (str | None)

  • gemini_only (bool)

DEFAULT_MODEL = 'google/gemini-embedding-001'
MAX_BATCH_SIZE = 50
MAX_BATCH_CHARS = 50000
__init__(api_key=None, model='google/gemini-embedding-001', dimensions=None, timeout=30.0, gemini_api_key=None, gemini_only=True)[source]

Initialize the instance.

Parameters:
  • api_key (Optional[str]) – Unused; kept for backward compatibility.

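  • model (str) – Embedding model identifier (defaults to DEFAULT_MODEL, 'google/gemini-embedding-001').

  • dimensions (Optional[int]) – Width of the returned vectors; the effective default is 3072.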

  • timeout (float) – Maximum wait time in seconds.

  • gemini_api_key (Optional[str]) – Unused; pool is used instead.

  • gemini_only (bool) – Always True; embeddings use Gemini API only.

async embed_text(text)[source]

Embed a single string into one dense vector.

Thin convenience wrapper that wraps text in a one-element list, delegates to embed_texts() (which handles batching, retries, and the Gemini-then-OpenRouter-then-paid fallback chain), and returns the lone resulting vector. Performs no network I/O of its own beyond what embed_texts() does. Called by the vector classifier (classifiers/vector_classifier.py) and the search tool (tools/search_tools.py) to embed an incoming query before a similarity lookup.

Parameters:

text (str) – The text to embed.

Returns:

A single float32 embedding vector of length self.dimensions.

Return type:

ndarray

async embed_texts(texts)[source]

Embed one or more texts into dense vectors, batching as needed.

Top-level async entry point for embedding. It coerces texts to a list via _normalize_embed_texts_input() (so a bare string is treated as a single document rather than iterated character by character), splits the input into size- and char-bounded batches with _create_batches(), and embeds each batch via _embed_batch() — which drives the Gemini API through the shared key pool and falls back to OpenRouter and the paid Gemini key on sustained rate limits. Called by embed_text() here, and by the classifier embedding refresh helpers (classifiers/tool_embedding_batch.py, classifiers/update_skill_embeddings.py) when rebuilding routing vectors.

Parameters:

texts (Union[str, Sequence[str]]) – A list of strings, or a single string (treated as one document — not iterated by character).

Returns:

One float32 vector per input text, in input order. Returns an empty list when texts is empty.

Return type:

List[ndarray]
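The input coercion and batching described for embed_texts() can be sketched as follows. This is a minimal illustration rather than the package's code: normalize_input and create_batches are hypothetical stand-ins for _normalize_embed_texts_input() and _create_batches(), and the greedy packing rule is an assumption. Only the two limits come from the class attributes documented above.

```python
from typing import List, Sequence, Union

MAX_BATCH_SIZE = 50      # documented class attribute
MAX_BATCH_CHARS = 50000  # documented class attribute


def normalize_input(texts: Union[str, Sequence[str]]) -> List[str]:
    """Treat a bare string as one document instead of iterating its characters."""
    if isinstance(texts, str):
        return [texts]
    return list(texts)


def create_batches(texts: Sequence[str]) -> List[List[str]]:
    """Greedily pack texts into batches bounded by count and total characters."""
    batches: List[List[str]] = []
    current: List[str] = []
    current_chars = 0
    for text in texts:
        too_many = len(current) >= MAX_BATCH_SIZE
        too_long = current_chars + len(text) > MAX_BATCH_CHARS
        if current and (too_many or too_long):
            batches.append(current)  # flush before adding the next text
            current, current_chars = [], 0
        current.append(text)
        current_chars += len(text)
    if current:
        batches.append(current)
    return batches
```

In this sketch a single text longer than MAX_BATCH_CHARS still gets a batch of its own; how the real client treats oversized inputs is not stated in the source.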

Embed a single text using the Gemini API only, with a task type.

Intended for pre-computing a query embedding before passing it to FileRAGManager.search(query_embedding=...). Retries on transient errors with exponential back-off.

Return type:

List[float]
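Retrying with exponential back-off, as mentioned above, might look like the sketch below. The helper name, the attempt count, the delays, and the choice of which exceptions count as transient are all assumptions; the source does not state them.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    attempts: int = 4,            # assumed value
    base_delay: float = 0.5,      # assumed value, in seconds
    transient: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run call(), sleeping base_delay * 2**n after the n-th failed attempt."""
    for attempt in range(attempts):
        try:
            return call()
        except transient:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("attempts must be at least 1")
```

Passing sleep as a parameter keeps the helper easy to exercise without real waiting.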

async close()[source]

Close the underlying httpx async client and release its connections.

Calls aclose on the shared httpx.AsyncClient created in __init__(), freeing pooled sockets. Invoked directly by callers that manage the client’s lifetime, and automatically by __aexit__() when the instance is used as an async context manager.

async __aenter__()[source]

Enter the async context manager, returning this client unchanged.

Lets the embeddings client be used with async with so its httpx connections are guaranteed to be closed on exit via __aexit__(). Invoked by the Python runtime at the start of an async with block.

Returns:

This same instance.

Return type:

OpenRouterEmbeddings

async __aexit__(exc_type, exc_val, exc_tb)[source]

Exit the async context manager, closing the httpx client.

Delegates to close() to release the pooled connections regardless of whether the async with block exited normally or via an exception. Invoked by the Python runtime at the end of an async with block. Does not suppress exceptions.

Parameters:
  • exc_type – Exception type if the block raised, else None.

  • exc_val – Exception instance if the block raised, else None.

  • exc_tb – Traceback if the block raised, else None.
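The context-manager lifecycle described above can be illustrated with a stand-in class. FakeEmbeddingsClient is hypothetical and performs no network I/O; it only mirrors the documented shape of close(), __aenter__() and __aexit__() so the behaviour on an exception is visible.

```python
import asyncio


class FakeEmbeddingsClient:
    """Hypothetical stand-in that mirrors the documented lifecycle methods."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True  # the real client closes its httpx.AsyncClient here

    async def __aenter__(self) -> "FakeEmbeddingsClient":
        return self  # same instance, unchanged

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()  # returning None means exceptions are not suppressed


async def demo() -> bool:
    client = FakeEmbeddingsClient()
    try:
        async with client:
            raise ValueError("boom")
    except ValueError:
        pass  # the exception propagated out of the async with block
    return client.closed
```

Running asyncio.run(demo()) shows that the client is closed even though the block raised.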

class rag_system.SyncOpenRouterEmbeddings(api_key=None, model='google/gemini-embedding-001', dimensions=None, timeout=30.0, gemini_api_key=None, gemini_only=True, document_task_type=None, query_task_type=None)[source]

Bases: object

Synchronous wrapper used by ChromaDB’s embedding function interface.

Uses Gemini API via the shared key pool. Batches are dispatched concurrently via a ThreadPoolExecutor when there are multiple batches.
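A rough sketch of dispatching batches concurrently while keeping results in input order is shown below. embed_batches and embed_one_batch are hypothetical names, and using Executor.map is an assumption about one way to do this; only MAX_EMBED_WORKERS comes from this class.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence

MAX_EMBED_WORKERS = 20  # documented class attribute


def embed_batches(
    batches: Sequence[List[str]],
    embed_one_batch: Callable[[List[str]], List[List[float]]],
) -> List[List[float]]:
    """Embed batches, concurrently when there are several, keeping input order."""
    if not batches:
        return []
    if len(batches) == 1:
        return embed_one_batch(batches[0])  # no thread pool for a single batch
    workers = min(MAX_EMBED_WORKERS, len(batches))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in submission order, not completion order.
        per_batch = list(pool.map(embed_one_batch, batches))
    return [vector for batch in per_batch for vector in batch]
```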

Parameters:
  • api_key (str | None)

  • model (str)

  • dimensions (int | None)

  • timeout (float)

  • gemini_api_key (str | None)

  • gemini_only (bool)

  • document_task_type (str | None)

  • query_task_type (str | None)

MAX_BATCH_SIZE = 50
MAX_BATCH_CHARS = 50000
MAX_EMBED_WORKERS = 20
__init__(api_key=None, model='google/gemini-embedding-001', dimensions=None, timeout=30.0, gemini_api_key=None, gemini_only=True, document_task_type=None, query_task_type=None)[source]

Initialize the instance.

Parameters:
  • api_key (Optional[str]) – Unused; kept for backward compatibility.

  • model (str) – Embedding model identifier (default 'google/gemini-embedding-001').

  • dimensions (Optional[int]) – Width of the returned vectors; dimension() reports 3072.

  • timeout (float) – Maximum wait time in seconds.

  • gemini_api_key (Optional[str]) – Unused; pool is used instead.

  • gemini_only (bool) – Unused; always Gemini API.

  • document_task_type (Optional[str]) – Optional Gemini taskType for corpus (e.g. RETRIEVAL_DOCUMENT); used by embed_documents.

  • query_task_type (Optional[str]) – Optional Gemini taskType for queries (e.g. RETRIEVAL_QUERY); used by embed_query.

name()[source]

Return the stable identifier ChromaDB uses for this embedder.

Part of the ChromaDB EmbeddingFunction contract; the value (derived in __init__() from the model name) lets ChromaDB detect when a collection’s embedding function changes. Pure getter with no I/O.

Returns:

The embedder’s name, e.g. openrouter_google_gemini-embedding-001.

Return type:

str

dimension()[source]

Return the fixed embedding dimensionality reported to ChromaDB.

Part of the ChromaDB EmbeddingFunction contract, used to validate that stored vectors match the collection’s expected width. Returns the constant 3072 produced by the Gemini embedding model. Pure getter with no I/O.

Returns:

The vector length (3072).

Return type:

int

__call__(input)[source]

Embed a list of texts via the legacy ChromaDB callable interface.

Implements the original ChromaDB EmbeddingFunction protocol where the embedder itself is invoked as a function. Treats inputs as corpus documents, applying document_task_type (matching embed_documents()), and delegates the actual batching and HTTP work to _embed_inputs(). Invoked by older ChromaDB versions and any call site that calls the embedder object directly.

Parameters:

input (List[str]) – Texts to embed.

Returns:

One embedding (list of floats) per input text.

Return type:

List[List[float]]

embed_documents(input)[source]

Embed corpus documents for the ChromaDB upsert path.

The modern (ChromaDB >= 0.6) entry point used when adding documents to a collection. Applies document_task_type so vectors are optimized for the retrieval-corpus side, then delegates to _embed_inputs(). Reached via the vector-store compatibility layer (vector_store.ChromaCompatCollection), which prefers this method over __call__() when present.

Parameters:

input (List[str]) – Document texts to embed.

Returns:

One embedding per document, in input order.

Return type:

List[List[float]]
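The preference for embed_documents() over __call__() could be expressed roughly as below. pick_document_embedder and the two embedder classes are hypothetical; the actual dispatch inside vector_store.ChromaCompatCollection is not shown in the source.

```python
from typing import Callable, List


def pick_document_embedder(embedder) -> Callable[[List[str]], List[List[float]]]:
    """Prefer embed_documents() when the embedder has it, else call it directly."""
    method = getattr(embedder, "embed_documents", None)
    return method if callable(method) else embedder


class LegacyEmbedder:
    """Only supports the original callable protocol."""

    def __call__(self, input: List[str]) -> List[List[float]]:
        return [[0.0] for _ in input]


class ModernEmbedder(LegacyEmbedder):
    """Also offers the newer document entry point."""

    def embed_documents(self, input: List[str]) -> List[List[float]]:
        return [[1.0] for _ in input]
```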

embed_query(input)[source]

Embed query texts for the ChromaDB query path.

The modern (ChromaDB >= 0.6) entry point used when searching a collection. Applies query_task_type so vectors are optimized for the query side of asymmetric retrieval, then delegates to _embed_inputs(). Reached via the vector-store compatibility layer (vector_store.ChromaCompatCollection) when issuing a similarity search.

Parameters:

input (List[str]) – Query texts to embed.

Returns:

One embedding per query, in input order.

Return type:

List[List[float]]

class rag_system.FileRAGManager(store_name='default', store_path=None, api_key=None, embedding_model='google/gemini-embedding-001', max_file_size=15728640, gemini_only=True, document_task_type=None, query_task_type=None)[source]

Bases: object

File- and URL-oriented RAG store over Postgres pgvector plus whole-file tables.

One instance manages a single named store: a per-store Postgres schema holding a files_<schema> vector table (halfvec(3072) + HNSW) for chunk embeddings, plus documents / source_files tables that keep the full original text and raw bytes so search can return entire files rather than just the matched chunk. It wraps the pgvector table in a Chroma-shaped facade (vector_store.ChromaCompatCollection) and embeds text through rag_system.openrouter_embeddings.SyncOpenRouterEmbeddings (Gemini by default), so indexing and search reach the embedding provider over HTTP while persistence stays in Postgres. Stores listed in CHUNK_ONLY_STORES (e.g. stargazer_logs) skip the whole-file tables entirely.

Instances are normally obtained through the module-level LRU registry get_rag_store() (and get_stargazer_docs_store()) rather than constructed directly. The RAG tool handlers in tools/rag.py / tools/cloud_rag.py, rag_system.auto_search.RAGAutoSearchManager, and starwiki/rag_integration all use that registry; starwiki is also the one place that instantiates FileRAGManager directly.

Parameters:
  • store_name (str)

  • store_path (str | None)

  • api_key (str | None)

  • embedding_model (str)

  • max_file_size (int)

  • gemini_only (bool)

  • document_task_type (str | None)

  • query_task_type (str | None)

__init__(store_name='default', store_path=None, api_key=None, embedding_model='google/gemini-embedding-001', max_file_size=15728640, gemini_only=True, document_task_type=None, query_task_type=None)[source]

Initialize the instance.

Parameters:
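  • store_name (str) – Name of the store this instance manages (one per-store Postgres schema).

  • store_path (Optional[str]) – Optional on-disk path for the store; get_stats() reports it as the legacy DB path.

  • api_key (Optional[str]) – Optional API key.

  • embedding_model (str) – Embedding model identifier (default 'google/gemini-embedding-001').

  • max_file_size (int) – Maximum file size in bytes (default 15728640, i.e. 15 MiB).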

  • gemini_only (bool) – Use only the Gemini API for embeddings.

  • document_task_type (Optional[str]) – Optional Gemini task type for indexed text (e.g. RETRIEVAL_DOCUMENT).

  • query_task_type (Optional[str]) – Optional Gemini task type for search queries (e.g. RETRIEVAL_QUERY).

index_file(file_path, tags=None, use_chunking=True, chunk_size=1500, chunk_overlap=200, force=False)[source]

Index a single file into the collection.

When force is True the content-hash dedup check is skipped so the file is always re-embedded (but the store is not cleared).

Return type:

Dict[str, Any]
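The interaction between the content-hash dedup check and force can be sketched like this. should_reindex and the in-memory known_hashes mapping are hypothetical, and SHA-256 is an assumed hash; the source only says that an unchanged file is skipped unless force is True.

```python
import hashlib
from typing import Dict


def should_reindex(content: bytes, known_hashes: Dict[str, str], path: str,
                   force: bool = False) -> bool:
    """Return True when the file must be embedded (or re-embedded)."""
    digest = hashlib.sha256(content).hexdigest()  # assumed hash algorithm
    unchanged = known_hashes.get(path) == digest
    if unchanged and not force:
        return False  # same bytes as last time: skip
    known_hashes[path] = digest  # record the hash; nothing else is cleared
    return True
```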

async index_url(url, tags=None, use_chunking=True, chunk_size=1500, chunk_overlap=200)[source]

Fetch a remote document by URL and index it into this store.

The URL ingestion counterpart of index_file(). It downloads via the SSRF-guarded fetch_url_content(), derives a stable stored filename (URL-hash prefixed and sanitized), compresses oversized PDFs through compress_pdf(), decodes the bytes to text with decode_bytes_to_text(), and persists the whole file to Postgres via _upsert_whole_file(). It then dedups against any prior copy of the same source_url by content hash and, unless unchanged, upserts the embedded chunks (or a single document) into the pgvector collection with source_type="url" metadata. Touches the network (download), the embedding provider (via the collection upsert), Postgres, and a transient temp file for PDF compression.

Called by the rag_index_url tool handler in tools/rag.py.

Parameters:
  • url (str) – The document URL to fetch and index.

  • tags (Optional[List[str]]) – Optional tag labels stored in metadata.

  • use_chunking (bool) – Whether to chunk large content before embedding.

  • chunk_size (int) – Target chunk length in characters.

  • chunk_overlap (int) – Overlap carried between chunks.

Returns:

A result dict with success and, on success, action ("indexed" or "skipped"), url, filename, sizes and stored_path; on failure an error message.

Return type:

Dict[str, Any]
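To make chunk_size and chunk_overlap concrete, here is a minimal character-window chunker. chunk_text is a hypothetical helper and a plain sliding window is an assumption; the real chunker may split on sentence or paragraph boundaries instead.

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 1500, chunk_overlap: int = 200) -> List[str]:
    """Split text into fixed-width character windows that overlap."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks
```

With the defaults, consecutive chunks share 200 characters, so a sentence that straddles a boundary still appears whole in at least one chunk if it is shorter than the overlap.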

index_directory(directory_path, recursive=True, tags=None, exclude_patterns=None, max_workers=6, force=False, allowed_extensions=None)[source]

Index all supported files in directory_path.

When max_workers > 1, files are indexed concurrently using a thread pool. Each file’s embedding batches are already parallelised inside the embedding function, so even max_workers=1 benefits from concurrent API calls.

force bypasses the per-file content-hash dedup check without clearing the store, so already-indexed files get re-embedded.

When allowed_extensions is set, only files whose suffix (normalized to lowercase with a leading dot) appears in allowed_extensions are queued; None means no extension filter (all supported types under SUPPORTED_EXTENSIONS).

Return type:

Dict[str, Any]
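The extension normalization described for allowed_extensions might be implemented along these lines. Both helper names are hypothetical, and the sketch does not model the separate SUPPORTED_EXTENSIONS check that applies when no filter is given.

```python
from pathlib import Path
from typing import Iterable, Optional, Set


def normalize_extensions(exts: Optional[Iterable[str]]) -> Optional[Set[str]]:
    """Lowercase each entry and give it exactly one leading dot; None passes through."""
    if exts is None:
        return None
    return {
        ("." + e.strip().lstrip(".")).lower()
        for e in exts
        if e.strip().strip(".")  # ignore empty entries such as "" or "."
    }


def is_allowed(path: str, allowed: Optional[Set[str]]) -> bool:
    """True when there is no filter or the file's suffix is in the filter."""
    return allowed is None or Path(path).suffix.lower() in allowed
```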

search(query, n_results=5, tags=None, return_content=True, query_embedding=None, max_content_size=8000, content_mode='whole')[source]

Semantic search returning one result per matched file.

With content_mode='whole', each result carries the whole-file text stored in Postgres when it is available; with content_mode='chunks', only the best KNN-matched indexed chunk is returned.

Return type:

List[Dict[str, Any]]
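Collapsing chunk-level KNN hits to one result per file could look like the sketch below. The hit dictionaries, their filename/score/chunk keys, and the convention that a higher score is better are all assumptions made for illustration.

```python
from typing import Any, Dict, List


def best_hit_per_file(hits: List[Dict[str, Any]], n_results: int = 5) -> List[Dict[str, Any]]:
    """Keep the highest-scoring hit for each filename, best files first."""
    best: Dict[str, Dict[str, Any]] = {}
    for hit in hits:
        name = hit["filename"]
        if name not in best or hit["score"] > best[name]["score"]:
            best[name] = hit
    ranked = sorted(best.values(), key=lambda h: h["score"], reverse=True)
    return ranked[:n_results]
```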

remove_file(file_path)[source]

Remove every index entry (and the whole-file rows) for one local file.

Resolves file_path to an absolute path, looks up all chunk ids stored under that file_path metadata, deletes them from the pgvector collection, and then drops the matching documents / source_files rows for each affected filename via rag_system.pg_source_files.delete_whole_file so no orphaned whole-file text survives. Writes to Postgres only; returns a failure dict when the path is not present in the index.

Called by the rag_remove_file tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).

Parameters:

file_path (str) – Path of the indexed file to remove.

Returns:

{"success": True, "file_path", "entries_removed"} on success, otherwise {"success": False, "error": ...}.

Return type:

Dict[str, Any]

remove_url(url)[source]

Remove every index entry (and whole-file rows) for one indexed URL.

The URL analogue of remove_file(): it finds all chunk ids carrying the given source_url metadata, deletes them from the pgvector collection, and drops the corresponding documents / source_files rows via rag_system.pg_source_files.delete_whole_file. Writes to Postgres only; returns a failure dict when the URL is not in the index.

Called by the rag_remove_url tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).

Parameters:

url (str) – The previously indexed source URL to remove.

Returns:

{"success": True, "url", "entries_removed"} on success, otherwise {"success": False, "error": ...}.

Return type:

Dict[str, Any]

list_indexed_files(limit=100)[source]

List metadata for files represented in the vector index.

Reads up to limit chunk metadata records from the pgvector collection and projects each into a compact summary (path, filename, extension, size, index timestamp, decoded tags). This reflects what has been embedded and may include one row per chunk; the whole-file view is list_store_files(). Returns an empty list and logs on error.

Called by the RAG listing tool handlers in tools/rag.py (run off the event loop with asyncio.to_thread).

Parameters:

limit (int) – Maximum number of metadata records to fetch.

Returns:

One summary dict per fetched index entry (empty on error).

Return type:

List[Dict[str, Any]]

list_store_files()[source]

List the whole files held by this store, unioning Postgres and disk.

The file-centric (not chunk-centric) listing: it enumerates the Postgres source_files rows via rag_system.pg_source_files.list_whole_files (skipped for chunk-only stores through _pg_whole_files_enabled()) and then folds in any files from the legacy on-disk files directory that are not already represented, so unmigrated stores still report their content. Each entry carries a pg:// or filesystem path. Reads Postgres and the filesystem; results are sorted by filename.

Called by the rag_list_store_files tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).

Returns:

Filename/size/modified/path dicts, one per whole file, sorted by filename.

Return type:

List[Dict[str, Any]]

read_store_file(filename)[source]

Return the full text of one stored file by bare filename.

Powers the rag_read_store_file tool the LLM is hinted toward when a retrieval chunk is not enough. It first rejects any filename containing a path separator or .. (path-traversal guard, so only flat store-local names are honored), then resolves the content through _load_whole_file_text() (Postgres documents/source_files, then legacy disk). Reads Postgres and possibly the filesystem; returns a failure dict when the file is missing or unreadable.

Called by the rag_read_store_file tool handler in tools/rag.py (run off the event loop with asyncio.to_thread); the same tool string is surfaced to the model by rag_system.auto_search.RAGAutoSearchManager and by message_processor.memory_linked_context.

Parameters:

filename (str) – Flat, store-local filename (no slashes or ..).

Returns:

{"success": True, "filename", "content", "size"} on success, otherwise {"success": False, "error": ...}.

Return type:

Dict[str, Any]
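The path-traversal guard described above amounts to a check like the following. is_flat_filename is a hypothetical name, and rejecting the empty string is an added assumption; the source only mentions path separators and "..".

```python
def is_flat_filename(filename: str) -> bool:
    """Accept only bare names: no path separators and no '..' anywhere."""
    if not filename:
        return False  # assumption: an empty name is never valid
    if "/" in filename or "\\" in filename:
        return False
    return ".." not in filename
```

A caller would return the documented failure dict when this check fails, before touching Postgres or the filesystem.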

close()[source]

No-op: the pgvector pools are process-wide and shared.

Retained for API compatibility (the LRU registry calls this on eviction); there is no per-store connection to release.

Return type:

None

get_stats()[source]

Return a small summary of this store’s identity and size.

Reports the store name, the legacy on-disk DB path, the live indexed-row count from the pgvector collection (collection.count(), a Postgres COUNT), and the configured embedding model. Intended for admin/status surfaces; returns {"error": ...} instead of raising if the count query fails.

No in-repo callers were found by grep; invoked via dynamic/admin paths.

Returns:

store_name, store_path, file_count and embedding_model keys, or an error key on failure.

Return type:

Dict[str, Any]

clear()[source]

Empty this store, dropping all embedded chunks and whole-file rows.

Wipes every vector row via the underlying PgVectorCollection.clear and, for non-chunk-only stores, also truncates the whole-file tables through rag_system.pg_source_files.clear_source_tables so no document text is left behind. The store schema/table remain so it can be re-indexed in place. Writes to Postgres only; returns a failure dict instead of raising on error.

Called by the corpus (re)build scripts under scripts/ (e.g. ingest_religion_rag, ingest_law_rag, update_docs_rag, build_rag_from_directory) before a fresh full ingest.

Returns:

{"success": True, "message": ...} on success, otherwise {"success": False, "error": ...}.

Return type:

Dict[str, Any]

rag_system.get_rag_store(store_name='default', api_key=None, max_file_size=None, gemini_only=True, document_task_type=None, query_task_type=None)[source]

Get or create a RAG store by name (LRU-cached).

At most _STORE_REGISTRY_MAX_SIZE stores are kept open simultaneously. When a new store would exceed the limit the least recently used entry is closed and evicted.

Cache entries are keyed by store_name plus optional embedding task types so different embedding configurations do not share one client.

Return type:

FileRAGManager

Parameters:
  • store_name (str)

  • api_key (str | None)

  • max_file_size (int | None)

  • gemini_only (bool)

  • document_task_type (str | None)

  • query_task_type (str | None)
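An LRU registry with close-on-eviction, in the spirit of get_rag_store(), can be sketched with OrderedDict. LRURegistry and FakeStore are hypothetical, and the tuple key of (store_name, document_task_type, query_task_type) is an assumed shape for "store_name plus optional embedding task types".

```python
from collections import OrderedDict
from typing import Callable, Hashable


class FakeStore:
    """Hypothetical stand-in for a store object with a close() method."""

    def __init__(self, key: Hashable) -> None:
        self.key = key
        self.closed = False

    def close(self) -> None:
        self.closed = True


class LRURegistry:
    """Keep at most max_size objects; close and evict the least recently used."""

    def __init__(self, max_size: int, factory: Callable[[Hashable], FakeStore]) -> None:
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self._max_size = max_size
        self._factory = factory
        self._items: "OrderedDict[Hashable, FakeStore]" = OrderedDict()

    def get(self, key: Hashable) -> FakeStore:
        if key in self._items:
            self._items.move_to_end(key)  # mark as most recently used
            return self._items[key]
        while len(self._items) >= self._max_size:
            _, evicted = self._items.popitem(last=False)  # oldest entry first
            evicted.close()
        self._items[key] = self._factory(key)
        return self._items[key]
```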

rag_system.get_stargazer_docs_store()[source]

Return the shared RAG store for Sphinx / tool documentation.

Uses RETRIEVAL_DOCUMENT for indexed chunks and RETRIEVAL_QUERY for search queries (Gemini embedding task types).

Return type:

FileRAGManager

rag_system.list_rag_stores()[source]

List the names of all available RAG stores.

A thin name-only projection over list_rag_stores_with_stats() (and thus its 60s cache): every Postgres schema that owns a files_<schema> table is a store. Swallows errors and returns an empty list so prompt-build and admin callers never crash on a transient Postgres hiccup.

Called by the web config API in web/rag_config_api.py (which filters the names for cloud-user stores).

Returns:

Store names (empty on error).

Return type:

List[str]

rag_system.list_rag_stores_with_stats()[source]

List stores with indexed-chunk counts from Postgres (60s cached).

Replaces the legacy filesystem scan. Counts come from planner row estimates so this never opens a per-store client and stays cheap on the per-message prompt path.

Return type:

List[Dict[str, Any]]
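A 60-second cache around an expensive listing query can be sketched as below. TTLCache is a hypothetical helper; how the package actually stores its cached result is not described in the source.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Cache one loader result for ttl_seconds, then reload on the next get()."""

    def __init__(self, loader: Callable[[], Any], ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Any = None
        self._expires_at: Optional[float] = None

    def get(self) -> Any:
        now = self._clock()
        if self._expires_at is None or now >= self._expires_at:
            self._value = self._loader()  # e.g. one catalog query for all stores
            self._expires_at = now + self._ttl
        return self._value
```

Injecting the clock keeps the expiry logic checkable without waiting a full minute.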

rag_system.delete_rag_store(store_name)[source]

Delete a RAG store completely (Postgres tables + local files dir).

Drops the store’s files_<schema> / documents / source_files tables (not the whole schema, so shared schemas such as golden_goddess keep their non-file tables like ncm_kernel).

Return type:

Dict[str, Any]

Parameters:

store_name (str)

class rag_system.RAGAutoSearchManager(redis_client)[source]

Bases: object

Per-channel RAG auto-search configuration and query fan-out, backed by Redis.

Stores one config record per channel under the stargazer:v3:rag:auto_search: Redis key prefix and, when a channel is enabled, automatically runs a semantic search across its configured stores for every inbound user message so the result can be injected into the LLM context. This is what makes a channel “RAG-aware” without the user having to invoke a tool. It owns no other state: all persistence is plain Redis GET / SET / SCAN / DELETE on string keys (config) and SISMEMBER on stargazer:v3:cloud_rag:shared: sets (cloud-store access control), while the actual vector search is delegated to rag_system.file_rag_manager.FileRAGManager instances resolved via rag_system.file_rag_manager.get_rag_store().

Constructed with a live redis.asyncio client by the RAG tool layer (tools/rag.py, tools/cloud_rag.py), the web config API (web/rag_config_api.py), and the inference message pipeline, where message_processor.generate_and_send calls search_for_message() once per message.

Parameters:

redis_client (aioredis.Redis)

__init__(redis_client)[source]

Initialize the instance.

Parameters:

redis_client (Redis) – Redis connection client.

Return type:

None

async set_channel_config(channel_key, store_names, enabled=True, n_results=3, min_score=0.5)[source]

Write (create or overwrite) the auto-search config for one channel.

Builds the canonical config dict, clamps n_results to the 1-10 range and min_score to 0.0-1.0 so callers cannot persist out-of-range values, stamps a fresh updated_at timestamp, and JSON-serializes it into the single Redis key stargazer:v3:rag:auto_search:<channel_key> via SET. Once written, search_for_message() will start auto-searching that channel on the next message. An info log line records the change.

Called by the RAG admin tools in tools/rag.py and tools/cloud_rag.py (enable/share handlers) and by the web config API in web/rag_config_api.py to persist user edits.

Parameters:
  • channel_key (str) – Composite "platform:channel_id" identifier.

  • store_names (List[str]) – RAG store names to search for this channel.

  • enabled (bool) – Whether auto-search is active for the channel.

  • n_results (int) – Number of chunks to inject; clamped to 1-10.

  • min_score (float) – Minimum similarity to keep a chunk; clamped to 0.0-1.0.

Returns:

The persisted config dict (post-clamping, with the new updated_at value).

Return type:

Dict[str, Any]
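The clamping and key construction described above can be sketched without Redis. build_channel_config and redis_key are hypothetical helpers; the channel_key field inside the dict and the use of time.time() for updated_at are assumptions, while the key prefix and the clamp ranges come from the text.

```python
import json
import time
from typing import Any, Dict, List

KEY_PREFIX = "stargazer:v3:rag:auto_search:"  # documented key prefix


def build_channel_config(channel_key: str, store_names: List[str], enabled: bool = True,
                         n_results: int = 3, min_score: float = 0.5) -> Dict[str, Any]:
    """Build the config dict with n_results and min_score clamped into range."""
    return {
        "channel_key": channel_key,            # assumed field
        "store_names": list(store_names),
        "enabled": bool(enabled),
        "n_results": max(1, min(10, int(n_results))),
        "min_score": max(0.0, min(1.0, float(min_score))),
        "updated_at": time.time(),             # assumed timestamp format
    }


def redis_key(channel_key: str) -> str:
    """Return the Redis key that holds one channel's config."""
    return KEY_PREFIX + channel_key
```

The resulting dict is plain JSON-serializable data, so json.dumps(config) is what would be passed to SET under redis_key(channel_key).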

async get_channel_config(channel_key)[source]

Load and decode the persisted auto-search config for a channel.

Reads the stargazer:v3:rag:auto_search:<channel_key> Redis key with a single GET and JSON-decodes it, returning None when the channel was never configured. This is the canonical read used both to render current settings and to gate whether a message should be auto-searched at all.

Called internally by search_for_message() (the enablement check) and disable_channel(), by the web config API in web/rag_config_api.py, by the cloud-RAG share/unshare tools in tools/cloud_rag.py, and by the RAG status tool in tools/rag.py.

Parameters:

channel_key (str) – Composite "platform:channel_id" identifier.

Returns:

The decoded config dict, or None if the channel has no stored config.

Return type:

Optional[Dict[str, Any]]

async disable_channel(channel_key)[source]

Turn off auto-search for a channel without discarding its store list.

Loads the existing config via get_channel_config(), flips enabled to False, refreshes updated_at, and writes the record back with a Redis SET. This is a soft toggle: the configured store_names are preserved so the channel can be re-enabled later without re-selecting stores. Returns False (a no-op) when the channel was never configured.

Called by the RAG admin tool in tools/rag.py (the disable action).

Parameters:

channel_key (str) – Composite "platform:channel_id" identifier.

Returns:

True if a config existed and was updated, False if there was nothing to disable.

Return type:

bool

async remove_channel_config(channel_key)[source]

Permanently delete a channel’s auto-search config from Redis.

Issues a single DEL on the stargazer:v3:rag:auto_search:<channel_key> key. Unlike disable_channel(), this discards the stored store_names entirely, so the channel reverts to having no RAG configuration at all.

Called by the web config API in web/rag_config_api.py, by the RAG admin tool in tools/rag.py, and by the cloud-RAG unshare flow in tools/cloud_rag.py when the last shared store is removed.

Parameters:

channel_key (str) – Composite "platform:channel_id" identifier.

Returns:

True if a key was actually deleted, False if no config existed for the channel.

Return type:

bool
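The difference between the soft toggle and the hard delete can be shown against an in-memory stand-in. FakeRedis is hypothetical and implements only get, set and delete; the two functions are simplified sketches that leave out the updated_at refresh and logging.

```python
import asyncio
import json
from typing import Dict, Optional

KEY_PREFIX = "stargazer:v3:rag:auto_search:"  # documented key prefix


class FakeRedis:
    """In-memory stand-in exposing only the calls used below."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    async def get(self, key: str) -> Optional[str]:
        return self.data.get(key)

    async def set(self, key: str, value: str) -> None:
        self.data[key] = value

    async def delete(self, key: str) -> int:
        return 1 if self.data.pop(key, None) is not None else 0


async def disable_channel(redis: FakeRedis, channel_key: str) -> bool:
    raw = await redis.get(KEY_PREFIX + channel_key)
    if raw is None:
        return False  # never configured: nothing to disable
    config = json.loads(raw)
    config["enabled"] = False  # store_names stay in place
    await redis.set(KEY_PREFIX + channel_key, json.dumps(config))
    return True


async def remove_channel_config(redis: FakeRedis, channel_key: str) -> bool:
    return await redis.delete(KEY_PREFIX + channel_key) > 0


async def demo():
    redis = FakeRedis()
    key = KEY_PREFIX + "discord:123"
    await redis.set(key, json.dumps({"store_names": ["docs"], "enabled": True}))
    disabled = await disable_channel(redis, "discord:123")
    after_disable = json.loads(await redis.get(key))
    removed = await remove_channel_config(redis, "discord:123")
    removed_again = await remove_channel_config(redis, "discord:123")
    return disabled, after_disable, removed, removed_again
```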

async list_configured_channels()[source]

Enumerate every channel that currently has an auto-search config.

Walks all stargazer:v3:rag:auto_search:* keys with a non-blocking SCAN iterator and GET-decodes each into its config dict. Used to render an admin overview of which channels have RAG enabled and against which stores; the scan plus per-key fetch makes this O(number of configured channels) rather than a single bulk read.

Called by the web config API in web/rag_config_api.py and by the RAG status/listing tool in tools/rag.py.

Returns:

One decoded config dict per configured channel (order follows the Redis scan, i.e. unspecified).

Return type:

List[Dict[str, Any]]

async search_for_message(channel_key, message_content, chunk_size=10000, query_embedding=None, user_id='')[source]

Perform auto-search if the channel is configured.

Parameters:
  • channel_key (str)

  • message_content (str)

  • chunk_size (int)

  • query_embedding (list[float] | None) – Pre-computed 3072-d embedding for message_content. When provided it is forwarded to the pgvector store as the KNN query vector, skipping a redundant embedding call.

  • user_id (str) – The message author. Used to enforce access control on cloud_usr_ stores.

Returns:

XML-formatted RAG context string, or None.

Return type:

Optional[str]

Submodules