rag_system package
RAG System for Stargazer Bot v3.
A file-based Retrieval-Augmented Generation system using:
- Gemini API for embeddings (google/gemini-embedding-001) via shared key pool
- Postgres + pgvector for vector storage (exposed through a Chroma-shaped compatibility layer)
- Whole-file retrieval where available, falling back to retrieval chunks
- Per-channel auto-search with context injection
- class rag_system.OpenRouterEmbeddings(api_key=None, model='google/gemini-embedding-001', dimensions=None, timeout=30.0, gemini_api_key=None, gemini_only=True)[source]
Bases: object
Async embeddings client that calls the Gemini API via the shared key pool.
Despite the historical name, this client targets Google’s Gemini embeddings endpoint using keys drawn from the shared pool rather than OpenRouter. It batches inputs (bounded by MAX_BATCH_SIZE / MAX_BATCH_CHARS), retries with backoff, and exposes embed_text() / embed_texts(), which return dense numpy.ndarray vectors of width dimensions (default 3072).
It is instantiated across the codebase wherever embeddings are needed: the vector tool classifier (classifiers.vector_classifier), the tool/skill/dangerous-command embedding refreshers under classifiers/, and tools.search_tools. The file-RAG manager uses the sync sibling SyncOpenRouterEmbeddings.
- Parameters:
- DEFAULT_MODEL = 'google/gemini-embedding-001'
- MAX_BATCH_SIZE = 50
- MAX_BATCH_CHARS = 50000
- __init__(api_key=None, model='google/gemini-embedding-001', dimensions=None, timeout=30.0, gemini_api_key=None, gemini_only=True)[source]
Initialize the instance.
- async embed_text(text)[source]
Embed a single string into one dense vector.
Thin convenience wrapper that places text in a one-element list, delegates to embed_texts() (which handles batching, retries, and the Gemini-then-OpenRouter-then-paid fallback chain), and returns the lone resulting vector. Performs no network I/O of its own beyond what embed_texts() does. Called by the vector classifier (classifiers/vector_classifier.py) and the search tool (tools/search_tools.py) to embed an incoming query before a similarity lookup.
- async embed_texts(texts)[source]
Embed one or more texts into dense vectors, batching as needed.
Top-level async entry point for embedding. It coerces texts to a list via _normalize_embed_texts_input() (so a bare string is treated as a single document rather than iterated character by character), splits the input into size- and char-bounded batches with _create_batches(), and embeds each batch via _embed_batch(), which drives the Gemini API through the shared key pool and falls back to OpenRouter and the paid Gemini key on sustained rate limits. Called by embed_text() here, and by the classifier embedding refresh helpers (classifiers/tool_embedding_batch.py, classifiers/update_skill_embeddings.py) when rebuilding routing vectors.
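The normalization and batching steps described above can be sketched roughly as follows. This is an illustration only: normalize_texts() and create_batches() are hypothetical stand-ins for the private helpers _normalize_embed_texts_input() and _create_batches(), whose real implementations are not shown here, and the greedy packing strategy is an assumption. Only the two limits come from the documented class attributes.

```python
from typing import List, Sequence, Union

MAX_BATCH_SIZE = 50      # documented class attribute
MAX_BATCH_CHARS = 50000  # documented class attribute


def normalize_texts(texts: Union[str, Sequence[str]]) -> List[str]:
    """Treat a bare string as one document instead of iterating its characters."""
    if isinstance(texts, str):
        return [texts]
    return list(texts)


def create_batches(
    texts: List[str],
    max_size: int = MAX_BATCH_SIZE,
    max_chars: int = MAX_BATCH_CHARS,
) -> List[List[str]]:
    """Greedily pack texts into batches bounded by item count and total characters.

    Assumption: a single text longer than max_chars still gets a batch of its own.
    """
    batches: List[List[str]] = []
    current: List[str] = []
    current_chars = 0
    for text in texts:
        would_overflow = (
            len(current) >= max_size or current_chars + len(text) > max_chars
        )
        if current and would_overflow:
            batches.append(current)
            current, current_chars = [], 0
        current.append(text)
        current_chars += len(text)
    if current:
        batches.append(current)
    return batches
```

With these limits, 120 short strings would be split into batches of 50, 50, and 20 items.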
- async embed_text_for_search(text, task_type='QUESTION_ANSWERING')[source]
Embed a single text using the Gemini API only, with a task type.
Intended for pre-computing a query embedding before passing it to FileRAGManager.search(query_embedding=...). Retries on transient errors with exponential back-off.
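Retrying with exponential back-off, as mentioned above, might look like the sketch below. Everything here is hypothetical: TransientError, retry_with_backoff(), the attempt count, and the base delay are illustrative choices, not the values or names used by embed_text_for_search().

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for errors worth retrying (e.g. rate limits, timeouts)."""


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Await call(), doubling the wait after each transient failure."""
    for attempt in range(max_attempts):
        try:
            return await call()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            await sleep(base_delay * (2 ** attempt))
    raise RuntimeError("max_attempts must be at least 1")
```

The sleep parameter is injectable so the delay schedule can be checked without actually waiting.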
- async close()[source]
Close the underlying httpx async client and release its connections.
Calls aclose on the shared httpx.AsyncClient created in __init__(), freeing pooled sockets. Invoked directly by callers that manage the client’s lifetime, and automatically by __aexit__() when the instance is used as an async context manager.
- async __aenter__()[source]
Enter the async context manager, returning this client unchanged.
Lets the embeddings client be used with async with so its httpx connections are guaranteed to be closed on exit via __aexit__(). Invoked by the Python runtime at the start of an async with block.
- Returns:
This same instance.
- Return type:
- async __aexit__(exc_type, exc_val, exc_tb)[source]
Exit the async context manager, closing the httpx client.
Delegates to close() to release the pooled connections regardless of whether the async with block exited normally or via an exception. Invoked by the Python runtime at the end of an async with block. Does not suppress exceptions.
- Parameters:
exc_type – Exception type if the block raised, else None.
exc_val – Exception instance if the block raised, else None.
exc_tb – Traceback if the block raised, else None.
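The context-manager behaviour documented for close(), __aenter__(), and __aexit__() can be illustrated with a small stand-in class. DemoEmbeddingsClient below is hypothetical and holds no real httpx client; it only mirrors the documented contract (return self on entry, close on exit, never suppress exceptions).

```python
import asyncio


class DemoEmbeddingsClient:
    """Hypothetical stand-in that mirrors the documented async context protocol."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        # The real client would await aclose() on its httpx.AsyncClient here.
        self.closed = True

    async def __aenter__(self) -> "DemoEmbeddingsClient":
        return self  # same instance, unchanged

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        await self.close()
        return False  # a falsy return means exceptions propagate


async def use_client(fail: bool) -> DemoEmbeddingsClient:
    client = DemoEmbeddingsClient()
    try:
        async with client as entered:
            assert entered is client
            if fail:
                raise ValueError("simulated failure inside the block")
    except ValueError:
        pass  # the exception escaped the block, as documented
    return client
```

In both the normal and the failing case the client ends up closed, which is the guarantee the async with form is meant to provide.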
- class rag_system.SyncOpenRouterEmbeddings(api_key=None, model='google/gemini-embedding-001', dimensions=None, timeout=30.0, gemini_api_key=None, gemini_only=True, document_task_type=None, query_task_type=None)[source]
Bases: object
Synchronous wrapper used by ChromaDB’s embedding function interface.
Uses Gemini API via the shared key pool. Batches are dispatched concurrently via a ThreadPoolExecutor when there are multiple batches.
- Parameters:
- MAX_BATCH_SIZE = 50
- MAX_BATCH_CHARS = 50000
- MAX_EMBED_WORKERS = 20
- __init__(api_key=None, model='google/gemini-embedding-001', dimensions=None, timeout=30.0, gemini_api_key=None, gemini_only=True, document_task_type=None, query_task_type=None)[source]
Initialize the instance.
- Parameters:
api_key (Optional[str]) – Unused; kept for backward compatibility.
model (str) – Embedding model identifier (default google/gemini-embedding-001).
timeout (float) – Maximum wait time in seconds.
gemini_api_key (Optional[str]) – Unused; the shared key pool is used instead.
gemini_only (bool) – Unused; the Gemini API is always used.
document_task_type (Optional[str]) – Optional Gemini taskType for corpus documents (e.g. RETRIEVAL_DOCUMENT); used by embed_documents.
query_task_type (Optional[str]) – Optional Gemini taskType for queries (e.g. RETRIEVAL_QUERY); used by embed_query.
- name()[source]
Return the stable identifier ChromaDB uses for this embedder.
Part of the ChromaDB EmbeddingFunction contract; the value (derived in __init__() from the model name) lets ChromaDB detect when a collection’s embedding function changes. Pure getter with no I/O.
- Returns:
The embedder’s name, e.g. openrouter_google_gemini-embedding-001.
- Return type:
- dimension()[source]
Return the fixed embedding dimensionality reported to ChromaDB.
Part of the ChromaDB EmbeddingFunction contract, used to validate that stored vectors match the collection’s expected width. Returns the constant 3072 produced by the Gemini embedding model. Pure getter with no I/O.
- Returns:
The vector length (3072).
- Return type:
- __call__(input)[source]
Embed a list of texts via the legacy ChromaDB callable interface.
Implements the original ChromaDB EmbeddingFunction protocol where the embedder itself is invoked as a function. Treats inputs as corpus documents, applying document_task_type (matching embed_documents()), and delegates the actual batching and HTTP work to _embed_inputs(). Invoked by older ChromaDB versions and any call site that calls the embedder object directly.
- embed_documents(input)[source]
Embed corpus documents for the ChromaDB upsert path.
The modern (ChromaDB >= 0.6) entry point used when adding documents to a collection. Applies document_task_type so vectors are optimized for the retrieval-corpus side, then delegates to _embed_inputs(). Reached via the vector-store compatibility layer (vector_store.ChromaCompatCollection), which prefers this method over __call__() when present.
- embed_query(input)[source]
Embed query texts for the ChromaDB query path.
The modern (ChromaDB >= 0.6) entry point used when searching a collection. Applies query_task_type so vectors are optimized for the query side of asymmetric retrieval, then delegates to _embed_inputs(). Reached via the vector-store compatibility layer (vector_store.ChromaCompatCollection) when issuing a similarity search.
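The class description for SyncOpenRouterEmbeddings says that multiple batches are dispatched concurrently through a ThreadPoolExecutor. A minimal sketch of that pattern follows, assuming a per-batch callable; embed_batches() and the embed_batch argument are hypothetical names, and the real _embed_inputs() may be organized differently. Only MAX_EMBED_WORKERS comes from the documented class attributes.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List

MAX_EMBED_WORKERS = 20  # documented class attribute

Vector = List[float]


def embed_batches(
    batches: List[List[str]],
    embed_batch: Callable[[List[str]], List[Vector]],
) -> List[Vector]:
    """Embed every batch and return vectors in the original input order."""
    if len(batches) <= 1:
        # A single batch gains nothing from a thread pool.
        return [vec for batch in batches for vec in embed_batch(batch)]
    workers = min(MAX_EMBED_WORKERS, len(batches))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in submission order, so output order
        # matches input order even if batches finish out of order.
        per_batch = list(pool.map(embed_batch, batches))
    return [vec for vectors in per_batch for vec in vectors]
```

Preserving order matters because the caller pairs each returned vector with the text at the same index.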
- class rag_system.FileRAGManager(store_name='default', store_path=None, api_key=None, embedding_model='google/gemini-embedding-001', max_file_size=15728640, gemini_only=True, document_task_type=None, query_task_type=None)[source]
Bases: object
File- and URL-oriented RAG store over Postgres pgvector plus whole-file tables.
One instance manages a single named store: a per-store Postgres schema holding a files_<schema> vector table (halfvec(3072) + HNSW) for chunk embeddings, plus documents/source_files tables that keep the full original text and raw bytes so search can return entire files rather than just the matched chunk. It wraps the pgvector table in a Chroma-shaped facade (vector_store.ChromaCompatCollection) and embeds text through rag_system.openrouter_embeddings.SyncOpenRouterEmbeddings (Gemini by default), so indexing and search reach the embedding provider over HTTP while persistence stays in Postgres. Stores listed in CHUNK_ONLY_STORES (e.g. stargazer_logs) skip the whole-file tables entirely.
Instances are normally obtained through the module-level LRU registry get_rag_store() (and get_stargazer_docs_store()) rather than constructed directly; the RAG tool handlers in tools/rag.py / tools/cloud_rag.py, rag_system.auto_search.RAGAutoSearchManager, and starwiki/rag_integration all go through that registry, with starwiki being the one place that instantiates FileRAGManager directly.
- Parameters:
- __init__(store_name='default', store_path=None, api_key=None, embedding_model='google/gemini-embedding-001', max_file_size=15728640, gemini_only=True, document_task_type=None, query_task_type=None)[source]
Initialize the instance.
- Parameters:
store_name (str) – Name of the store this instance manages.
embedding_model (str) – Embedding model identifier (default google/gemini-embedding-001).
max_file_size (int) – Maximum file size (default 15728640).
gemini_only (bool) – Use only the Gemini API for embeddings.
document_task_type (Optional[str]) – Optional Gemini task type for indexed text (e.g. RETRIEVAL_DOCUMENT).
query_task_type (Optional[str]) – Optional Gemini task type for search queries (e.g. RETRIEVAL_QUERY).
- index_file(file_path, tags=None, use_chunking=True, chunk_size=1500, chunk_overlap=200, force=False)[source]
Index a single file into the collection.
When force is True the content-hash dedup check is skipped so the file is always re-embedded (but the store is not cleared).
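The interaction between the content-hash dedup check and force can be sketched as below. This is an assumption-laden illustration: the hash algorithm (SHA-256 here), the helper names content_hash() and needs_embedding(), and how the previous hash is stored are all hypothetical and not taken from index_file() itself.

```python
import hashlib
from typing import Optional, Tuple


def content_hash(data: bytes) -> str:
    """Hash file bytes; SHA-256 is an assumed choice for this sketch."""
    return hashlib.sha256(data).hexdigest()


def needs_embedding(
    data: bytes, stored_hash: Optional[str], force: bool = False
) -> Tuple[bool, str]:
    """Return (should_embed, new_hash).

    With force=True the dedup comparison is skipped, so the file is always
    re-embedded; nothing else in the store is touched.
    """
    new_hash = content_hash(data)
    if force:
        return True, new_hash
    return new_hash != stored_hash, new_hash
```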
- async index_url(url, tags=None, use_chunking=True, chunk_size=1500, chunk_overlap=200)[source]
Fetch a remote document by URL and index it into this store.
The URL ingestion counterpart of index_file(). It downloads via the SSRF-guarded fetch_url_content(), derives a stable stored filename (URL-hash prefixed and sanitized), compresses oversized PDFs through compress_pdf(), decodes the bytes to text with decode_bytes_to_text(), and persists the whole file to Postgres via _upsert_whole_file(). It then dedups against any prior copy of the same source_url by content hash and, unless unchanged, upserts the embedded chunks (or a single document) into the pgvector collection with source_type="url" metadata. Touches the network (download), the embedding provider (via the collection upsert), Postgres, and a transient temp file for PDF compression.
Called by the rag_index_url tool handler in tools/rag.py.
- Parameters:
- Returns:
A result dict with success and, on success, action ("indexed" or "skipped"), url, filename, sizes and stored_path; on failure an error message.
- Return type:
- index_directory(directory_path, recursive=True, tags=None, exclude_patterns=None, max_workers=6, force=False, allowed_extensions=None)[source]
Index all supported files in directory_path.
When max_workers > 1, files are indexed concurrently using a thread pool. Each file’s embedding batches are already parallelised inside the embedding function, so even max_workers=1 benefits from concurrent API calls.
force bypasses the per-file content-hash dedup check without clearing the store, so already-indexed files get re-embedded.
When allowed_extensions is set, only files whose suffix (after normalizing to a leading dot, lowercase) appears in the collection are queued; None means no extension filter (all supported types under SUPPORTED_EXTENSIONS).
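The allowed_extensions normalization described above (leading dot, lowercase) might be sketched as follows. normalize_extensions() and is_allowed() are hypothetical helper names, and this sketch deliberately omits the separate SUPPORTED_EXTENSIONS check that the real method applies.

```python
from pathlib import Path
from typing import Iterable, Optional, Set


def normalize_extensions(exts: Optional[Iterable[str]]) -> Optional[Set[str]]:
    """Lowercase each extension and make sure it starts with a dot."""
    if exts is None:
        return None  # None means "no extension filter"
    normalized: Set[str] = set()
    for ext in exts:
        ext = ext.strip().lower()
        if not ext:
            continue
        normalized.add(ext if ext.startswith(".") else "." + ext)
    return normalized


def is_allowed(path: str, allowed: Optional[Set[str]]) -> bool:
    """True when no filter is set or the file's lowercased suffix is in it."""
    return allowed is None or Path(path).suffix.lower() in allowed
```

Under this sketch a caller could pass ["MD", ".Py"] and have both README.MD and script.py queued.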
- search(query, n_results=5, tags=None, return_content=True, query_embedding=None, max_content_size=8000, content_mode='whole')[source]
Semantic search returning one result per matched file.
With content_mode set to whole, the full text is loaded from the Postgres documents table when available; with chunks, only the best KNN-matched indexed chunk is returned.
- remove_file(file_path)[source]
Remove every index entry (and the whole-file rows) for one local file.
Resolves file_path to an absolute path, looks up all chunk ids stored under that file_path metadata, deletes them from the pgvector collection, and then drops the matching documents/source_files rows for each affected filename via rag_system.pg_source_files.delete_whole_file so no orphaned whole-file text survives. Writes to Postgres only; returns a failure dict when the path is not present in the index.
Called by the rag_remove_file tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).
- remove_url(url)[source]
Remove every index entry (and whole-file rows) for one indexed URL.
The URL analogue of remove_file(): it finds all chunk ids carrying the given source_url metadata, deletes them from the pgvector collection, and drops the corresponding documents/source_files rows via rag_system.pg_source_files.delete_whole_file. Writes to Postgres only; returns a failure dict when the URL is not in the index.
Called by the rag_remove_url tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).
- list_indexed_files(limit=100)[source]
List metadata for files represented in the vector index.
Reads up to limit chunk metadata records from the pgvector collection and projects each into a compact summary (path, filename, extension, size, index timestamp, decoded tags). This reflects what has been embedded and may include one row per chunk; the whole-file view is list_store_files(). Returns an empty list and logs on error.
Called by the RAG listing tool handlers in tools/rag.py (run off the event loop with asyncio.to_thread).
- list_store_files()[source]
List the whole files held by this store, unioning Postgres and disk.
The file-centric (not chunk-centric) listing: it enumerates the Postgres source_files rows via rag_system.pg_source_files.list_whole_files (skipped for chunk-only stores through _pg_whole_files_enabled()) and then folds in any files from the legacy on-disk files directory that are not already represented, so unmigrated stores still report their content. Each entry carries a pg:// or filesystem path. Reads Postgres and the filesystem; results are sorted by filename.
Called by the rag_list_store_files tool handler in tools/rag.py (run off the event loop with asyncio.to_thread).
- read_store_file(filename)[source]
Return the full text of one stored file by bare filename.
Powers the rag_read_store_file tool the LLM is hinted toward when a retrieval chunk is not enough. It first rejects any filename containing a path separator or .. (path-traversal guard, so only flat store-local names are honored), then resolves the content through _load_whole_file_text() (Postgres documents/source_files, then legacy disk). Reads Postgres and possibly the filesystem; returns a failure dict when the file is missing or unreadable.
Called by the rag_read_store_file tool handler in tools/rag.py (run off the event loop with asyncio.to_thread); the same tool string is surfaced to the model by rag_system.auto_search.RAGAutoSearchManager and by message_processor.memory_linked_context.
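The path-traversal guard described for read_store_file() can be approximated with a check like the one below. is_flat_filename() is a hypothetical helper written for illustration; the exact conditions in the real method may differ, for example in how it treats empty names.

```python
def is_flat_filename(filename: str) -> bool:
    """Accept only bare, store-local names: no separators and no '..' sequence."""
    if not filename:
        return False  # assumption: empty names are rejected as well
    if "/" in filename or "\\" in filename:
        return False  # POSIX or Windows path separator
    if ".." in filename:
        return False  # parent-directory traversal
    return True
```

A check this strict also rejects harmless names such as a..b.txt, which is the trade-off of a simple substring test.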
- close()[source]
No-op: the pgvector pools are process-wide and shared.
Retained for API compatibility (the LRU registry calls this on eviction); there is no per-store connection to release.
- Return type:
- get_stats()[source]
Return a small summary of this store’s identity and size.
Reports the store name, the legacy on-disk DB path, the live indexed-row count from the pgvector collection (collection.count(), a Postgres COUNT), and the configured embedding model. Intended for admin/status surfaces; returns {"error": ...} instead of raising if the count query fails.
No in-repo callers were found by grep; invoked via dynamic/admin paths.
- clear()[source]
Empty this store, dropping all embedded chunks and whole-file rows.
Wipes every vector row via the underlying PgVectorCollection.clear and, for non-chunk-only stores, also truncates the whole-file tables through rag_system.pg_source_files.clear_source_tables so no document text is left behind. The store schema/table remain so it can be re-indexed in place. Writes to Postgres only; returns a failure dict instead of raising on error.
Called by the corpus (re)build scripts under scripts/ (e.g. ingest_religion_rag, ingest_law_rag, update_docs_rag, build_rag_from_directory) before a fresh full ingest.
- rag_system.get_rag_store(store_name='default', api_key=None, max_file_size=None, gemini_only=True, document_task_type=None, query_task_type=None)[source]
Get or create a RAG store by name (LRU-cached).
At most _STORE_REGISTRY_MAX_SIZE stores are kept open simultaneously. When a new store would exceed the limit, the least recently used entry is closed and evicted.
Cache entries are keyed by store_name plus optional embedding task types so different embedding configurations do not share one client.
- rag_system.get_stargazer_docs_store()[source]
Return the shared RAG store for Sphinx / tool documentation.
Uses RETRIEVAL_DOCUMENT for indexed chunks and RETRIEVAL_QUERY for search queries (Gemini embedding task types).
- Return type:
- rag_system.list_rag_stores()[source]
List the names of all available RAG stores.
A thin name-only projection over list_rag_stores_with_stats() (and thus its 60s cache): every Postgres schema that owns a files_<schema> table is a store. Swallows errors and returns an empty list so prompt-build and admin callers never crash on a transient Postgres hiccup.
Called by the web config API in web/rag_config_api.py (which filters the names for cloud-user stores).
- rag_system.list_rag_stores_with_stats()[source]
List stores with indexed-chunk counts from Postgres (60s cached).
Replaces the legacy filesystem scan. Counts come from planner row estimates so this never opens a per-store client and stays cheap on the per-message prompt path.
- rag_system.delete_rag_store(store_name)[source]
Delete a RAG store completely (Postgres tables + local files dir).
Drops the store’s files_<schema> / documents / source_files tables (not the whole schema, so shared schemas such as golden_goddess keep their non-file tables like ncm_kernel).
- class rag_system.RAGAutoSearchManager(redis_client)[source]
Bases: object
Per-channel RAG auto-search configuration and query fan-out, backed by Redis.
Stores one config record per channel under the stargazer:v3:rag:auto_search: Redis key prefix and, when a channel is enabled, automatically runs a semantic search across its configured stores for every inbound user message so the result can be injected into the LLM context. This is what makes a channel “RAG-aware” without the user having to invoke a tool. It owns no other state: all persistence is plain Redis GET / SET / SCAN / DEL on string keys (config) and SISMEMBER on stargazer:v3:cloud_rag:shared: sets (cloud-store access control), while the actual vector search is delegated to rag_system.file_rag_manager.FileRAGManager instances resolved via rag_system.file_rag_manager.get_rag_store().
Constructed with a live redis.asyncio client by the RAG tool layer (tools/rag.py, tools/cloud_rag.py), the web config API (web/rag_config_api.py), and the inference message pipeline, which calls search_for_message() per message from message_processor.generate_and_send.
- Parameters:
redis_client (aioredis.Redis)
- __init__(redis_client)[source]
Initialize the instance.
- Parameters:
redis_client (Redis) – Redis connection client.
- Return type:
None
- async set_channel_config(channel_key, store_names, enabled=True, n_results=3, min_score=0.5)[source]
Write (create or overwrite) the auto-search config for one channel.
Builds the canonical config dict, clamps n_results to the 1-10 range and min_score to 0.0-1.0 so callers cannot persist out-of-range values, stamps a fresh updated_at timestamp, and JSON-serializes it into the single Redis key stargazer:v3:rag:auto_search:<channel_key> via SET. Once written with enabled set, search_for_message() will start auto-searching that channel on the next message. An info log line records the change.
Called by the RAG admin tools in tools/rag.py and tools/cloud_rag.py (enable/share handlers) and by the web config API in web/rag_config_api.py to persist user edits.
- Parameters:
channel_key (str) – Composite "platform:channel_id" identifier.
store_names (List[str]) – RAG store names to search for this channel.
enabled (bool) – Whether auto-search is active for the channel.
n_results (int) – Number of chunks to inject; clamped to 1-10.
min_score (float) – Minimum similarity to keep a chunk; clamped to 0.0-1.0.
- Returns:
The persisted config dict (post-clamping, with the new updated_at value).
- Return type:
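The clamping and serialization performed by set_channel_config() can be sketched without a live Redis connection. build_channel_config() and config_key() are hypothetical helpers; the field names follow the parameters documented above, but the exact dict layout and the updated_at format (a Unix timestamp here) are assumptions.

```python
import json
import time
from typing import Any, Dict, List, Optional

KEY_PREFIX = "stargazer:v3:rag:auto_search:"  # documented Redis key prefix


def config_key(channel_key: str) -> str:
    """Redis key holding one channel's config."""
    return KEY_PREFIX + channel_key


def build_channel_config(
    store_names: List[str],
    enabled: bool = True,
    n_results: int = 3,
    min_score: float = 0.5,
    now: Optional[float] = None,
) -> Dict[str, Any]:
    """Clamp the numeric settings and stamp updated_at (format assumed)."""
    return {
        "store_names": list(store_names),
        "enabled": bool(enabled),
        "n_results": max(1, min(10, int(n_results))),
        "min_score": max(0.0, min(1.0, float(min_score))),
        "updated_at": time.time() if now is None else now,
    }


# The serialized payload that would be passed to SET under config_key(...).
payload = json.dumps(build_channel_config(["docs"], n_results=50, min_score=-1))
```

Clamping before serialization means a later read of the key never needs to re-validate the stored values.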
- async get_channel_config(channel_key)[source]
Load and decode the persisted auto-search config for a channel.
Reads the stargazer:v3:rag:auto_search:<channel_key> Redis key with a single GET and JSON-decodes it, returning None when the channel was never configured. This is the canonical read used both to render current settings and to gate whether a message should be auto-searched at all.
Called internally by search_for_message() (the enablement check) and disable_channel(), by the web config API in web/rag_config_api.py, by the cloud-RAG share/unshare tools in tools/cloud_rag.py, and by the RAG status tool in tools/rag.py.
- async disable_channel(channel_key)[source]
Turn off auto-search for a channel without discarding its store list.
Loads the existing config via get_channel_config(), flips enabled to False, refreshes updated_at, and writes the record back with a Redis SET. This is a soft toggle: the configured store_names are preserved so the channel can be re-enabled later without re-selecting stores. Returns False (a no-op) when the channel was never configured.
Called by the RAG admin tool in tools/rag.py (the disable action).
- async remove_channel_config(channel_key)[source]
Permanently delete a channel’s auto-search config from Redis.
Issues a single DEL on the stargazer:v3:rag:auto_search:<channel_key> key. Unlike disable_channel(), this discards the stored store_names entirely, so the channel reverts to having no RAG configuration at all.
Called by the web config API in web/rag_config_api.py, by the RAG admin tool in tools/rag.py, and by the cloud-RAG unshare flow in tools/cloud_rag.py when the last shared store is removed.
- async list_configured_channels()[source]
Enumerate every channel that currently has an auto-search config.
Walks all stargazer:v3:rag:auto_search:* keys with a non-blocking SCAN iterator and GET-decodes each into its config dict. Used to render an admin overview of which channels have RAG enabled and against which stores; the scan plus per-key fetch makes this O(number of configured channels) rather than a single bulk read.
Called by the web config API in web/rag_config_api.py and by the RAG status/listing tool in tools/rag.py.
- async search_for_message(channel_key, message_content, chunk_size=10000, query_embedding=None, user_id='')[source]
Perform auto-search if the channel is configured.
- Parameters:
query_embedding (list[float] | None) – Pre-computed 3072-d embedding for message_content. When provided it is forwarded to the pgvector store as the KNN query vector, skipping a redundant embedding call.
user_id (str) – The message author. Used to enforce access control on cloud_usr_ stores.
channel_key (str)
message_content (str)
chunk_size (int)
- Returns:
XML-formatted RAG context string, or None.
- Return type:
Submodules
- rag_system.auto_search module
- rag_system.file_rag_manager module
  - extract_pdf_text()
  - extract_pdf_text_from_bytes()
  - decode_bytes_to_text()
  - compress_pdf()
  - chunk_text()
  - fetch_url_content()
  - FileRAGManager
    - FileRAGManager.__init__()
    - FileRAGManager.index_file()
    - FileRAGManager.index_url()
    - FileRAGManager.index_directory()
    - FileRAGManager.search()
    - FileRAGManager.remove_file()
    - FileRAGManager.remove_url()
    - FileRAGManager.list_indexed_files()
    - FileRAGManager.list_store_files()
    - FileRAGManager.read_store_file()
    - FileRAGManager.close()
    - FileRAGManager.get_stats()
    - FileRAGManager.clear()
  - get_rag_store()
  - get_stargazer_docs_store()
  - list_rag_stores()
  - list_rag_stores_with_stats()
  - delete_rag_store()
- rag_system.openrouter_embeddings module
  - OpenRouterEmbeddings
    - OpenRouterEmbeddings.DEFAULT_MODEL
    - OpenRouterEmbeddings.MAX_BATCH_SIZE
    - OpenRouterEmbeddings.MAX_BATCH_CHARS
    - OpenRouterEmbeddings.__init__()
    - OpenRouterEmbeddings.embed_text()
    - OpenRouterEmbeddings.embed_texts()
    - OpenRouterEmbeddings.embed_text_for_search()
    - OpenRouterEmbeddings.close()
    - OpenRouterEmbeddings.__aenter__()
    - OpenRouterEmbeddings.__aexit__()
  - SyncOpenRouterEmbeddings
    - SyncOpenRouterEmbeddings.MAX_BATCH_SIZE
    - SyncOpenRouterEmbeddings.MAX_BATCH_CHARS
    - SyncOpenRouterEmbeddings.MAX_EMBED_WORKERS
    - SyncOpenRouterEmbeddings.__init__()
    - SyncOpenRouterEmbeddings.name()
    - SyncOpenRouterEmbeddings.dimension()
    - SyncOpenRouterEmbeddings.__call__()
    - SyncOpenRouterEmbeddings.embed_documents()
    - SyncOpenRouterEmbeddings.embed_query()
- rag_system.pg_source_files module