# Source code for threadweave

"""Threadweave persistent knowledge system.

Provides the DNA Vault (file-backed archive with Redis embedding
index), Persistent Weave (channel/category/guild-scoped enforcement
pointers), Shadow Memory (hidden per-user memory store), and Weave
Exceptions (per-channel overrides for DNA pointers).

All Redis keys live under the ``stargazer:threadweave`` namespace
for backward compatibility with the old codebase.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Any, TYPE_CHECKING


from utils.cosine import cosine_batch

if TYPE_CHECKING:
    from openrouter_client import OpenRouterClient

logger = logging.getLogger(__name__)

# The Redis keys built in this module share one namespace, kept identical
# to the old codebase so previously written data remains readable.
_PREFIX = "stargazer:threadweave"

# Hash: dna_id -> JSON index entry (metadata plus embedding).
_DNA_INDEX_KEY = _PREFIX + ":dna_index"
# Per-user hash prefix (":{user_id}" appended): shadow_id -> JSON memory.
_SHADOW_PREFIX = _PREFIX + ":shadow"
# Persistent Weave pointer hash prefixes (":{scope_id}" appended):
# dna_id -> JSON pointer.
_PW_CHANNEL = _PREFIX + ":pweave:channel"
_PW_CATEGORY = _PREFIX + ":pweave:category"
_PW_GUILD = _PREFIX + ":pweave:guild"
# Per-DNA set prefix (":{dna_id}" appended) of channel ids exempted from it.
_EXCEPTION_PREFIX = _PREFIX + ":exception"
# Per-approval-type hash prefix: approval_id -> JSON approval entry.
_PENDING_PREFIX = _PREFIX + ":pending"


def _cosine_similarity(
    a: list[float],
    b: list[float],
) -> float:
    """Return the cosine similarity between two individual embedding vectors.

    Convenience wrapper around :func:`utils.cosine.cosine_batch` for callers
    that hold exactly one pair of vectors. ``b`` is wrapped in a
    one-element batch, and the single resulting score is unwrapped and
    converted to a plain ``float``.

    Pure computation with no Redis, network or LLM access. The ranking code
    in this module calls ``cosine_batch`` directly, so this helper exists
    only for isolated or external use.

    Args:
        a: The query embedding.
        b: The candidate embedding; must have the same dimensionality as
            ``a``.

    Returns:
        float: The cosine similarity, normally within ``[-1.0, 1.0]``.
    """
    singleton_batch = [b]
    scores = cosine_batch(a, singleton_batch)
    only_score = scores[0]
    return float(only_score)


class ThreadweaveManager:
    """Core threadweave infrastructure.

    Owns the DNA Vault (file-backed archive plus a Redis embedding index),
    the Persistent Weave (channel/category/guild-scoped DNA pointers), Weave
    Exceptions, Shadow Memories, and the pending-approval queue. It renders
    the enforceable subset into a prompt-context string via
    :meth:`get_context_for_prompt`.

    Parameters
    ----------
    redis_client:
        Async Redis client (same one used by MessageCache).
    openrouter:
        OpenRouterClient used for generating embeddings.
    embedding_model:
        Model identifier for embedding generation.
    admin_user_ids:
        Set of user IDs authorised to wield threadweave tools.
    dna_vault_path:
        Directory for file-backed DNA vault storage.
    """

    def __init__(
        self,
        redis_client: Any,
        openrouter: OpenRouterClient,
        embedding_model: str = "google/gemini-embedding-001",
        admin_user_ids: set[str] | None = None,
        dna_vault_path: str = "data/dna_vault",
    ) -> None:
        """Store collaborators and configuration; performs no I/O.

        Args:
            redis_client: Async Redis client used for every index, pointer,
                exception, shadow-memory and approval key.
            openrouter: Client whose ``embed(text, model)`` coroutine
                produces embedding vectors.
            embedding_model: Model identifier passed to ``embed``.
            admin_user_ids: Ids allowed past :meth:`require_admin`. ``None``
                or an empty set means nobody is authorised.
            dna_vault_path: Directory holding the ``.dna`` JSON files. It is
                created lazily on first write.
        """
        self.redis = redis_client
        self._openrouter = openrouter
        self._embedding_model = embedding_model
        # require_admin tests membership with str(user_id), so ids stored
        # here are expected to be strings.
        self.admin_user_ids: set[str] = admin_user_ids or set()
        self._vault_path = dna_vault_path

    # --------------------------------------------------------------
    # Authorization
    # --------------------------------------------------------------

    def require_admin(
        self,
        user_id: str,
    ) -> dict[str, str] | None:
        """Gate threadweave actions behind the Prime Architect allow-list.

        This is a pure in-memory membership test of ``str(user_id)``
        against ``self.admin_user_ids``; it performs no I/O. It is the
        chokepoint keeping destructive threadweave capabilities (vaulting
        DNA, planting Shadow Memories, editing the Persistent Weave)
        restricted to authorised operators.

        The handlers in ``tools/threadweave_tools.py`` call
        ``tw.require_admin(ctx.user_id)`` first and short-circuit if the
        result is truthy.

        Args:
            user_id: Platform sender id to authorise. It is coerced to
                ``str`` before comparison.

        Returns:
            dict[str, str] | None: ``None`` when the user is authorised.
            Otherwise, an in-character ``error`` message plus
            ``status="denied"``, ready to surface to the LLM/user.
        """
        if str(user_id) not in self.admin_user_ids:
            return {
                "error": (
                    "UNAUTHORIZED. The Electrum-Cast Rending "
                    "Scissors reject your touch. Only Prime "
                    "Architects may wield them."
                ),
                "status": "denied",
            }
        return None

    # --------------------------------------------------------------
    # Embedding helper
    # --------------------------------------------------------------

    async def _embed(self, text: str) -> list[float]:
        """Embed ``text`` with the configured model via the OpenRouter client.

        Exceptions from the client propagate; callers decide how to degrade.
        """
        return await self._openrouter.embed(
            text,
            self._embedding_model,
        )

    # --------------------------------------------------------------
    # Internal helpers
    # --------------------------------------------------------------

    def _dna_file_path(self, dna_id: str) -> str:
        """Return the default on-disk path ``{vault_path}/{dna_id}.dna``."""
        return os.path.join(self._vault_path, f"{dna_id}.dna")

    @staticmethod
    def _decode_hash_values(
        raw: Any,
        key: str,
    ) -> list[dict[str, Any]]:
        """JSON-decode the values of an ``HGETALL`` reply.

        A corrupt field is logged and skipped, as is one that decodes to
        something other than a JSON object, so a single bad entry cannot
        break a whole read. Callers rely on ``.get`` working on every
        returned item.

        Args:
            raw: The ``HGETALL`` mapping, or ``None``/empty.
            key: The Redis key the mapping came from (used only for logs).

        Returns:
            list[dict[str, Any]]: The decoded objects, in reply order.
        """
        results: list[dict[str, Any]] = []
        for field, value in (raw or {}).items():
            try:
                decoded = json.loads(value)
            except Exception:
                # jsonutil's exact exception types are not visible here, so
                # keep the broad catch but no longer fail silently.
                logger.warning(
                    "Skipping undecodable entry %r in %s",
                    field,
                    key,
                )
                continue
            if isinstance(decoded, dict):
                results.append(decoded)
            else:
                logger.warning(
                    "Skipping non-object entry %r in %s",
                    field,
                    key,
                )
        return results

    @staticmethod
    def _embedded_candidates(
        entries: list[dict[str, Any]],
        dim: int,
    ) -> tuple[list[dict[str, Any]], list[list[float]]]:
        """Keep entries whose stored ``embedding`` is non-empty and ``dim`` long.

        Failed embeddings are stored as ``[]``. Vectors produced by a
        different embedding model can have a different length. Neither can
        be compared with the query vector, so both are dropped rather than
        handed to ``cosine_batch``.

        Args:
            entries: Decoded index or shadow-memory entries.
            dim: Dimensionality of the query embedding.

        Returns:
            tuple: ``(kept_entries, their_vectors)`` in matching order.
        """
        kept: list[dict[str, Any]] = []
        vectors: list[list[float]] = []
        for entry in entries:
            vector = entry.get("embedding") or []
            if not vector or len(vector) != dim:
                continue
            kept.append(entry)
            vectors.append(vector)
        return kept, vectors

    @classmethod
    def _rank_by_similarity(
        cls,
        query_embedding: list[float],
        entries: list[dict[str, Any]],
        top_k: int,
    ) -> list[dict[str, Any]]:
        """Return up to ``top_k`` embedded entries, most similar first."""
        if top_k <= 0 or not len(query_embedding):
            return []
        kept, vectors = cls._embedded_candidates(
            entries,
            len(query_embedding),
        )
        if not kept:
            return []
        sims = cosine_batch(query_embedding, vectors)
        order = sims.argsort()[::-1]
        return [kept[i] for i in order[:top_k]]

    @staticmethod
    def _read_dna_file(file_path: str) -> dict[str, Any] | None:
        """Load a ``.dna`` JSON file, or return ``None`` if it does not exist."""
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    @staticmethod
    def _remove_dna_file(file_path: str) -> None:
        """Delete a ``.dna`` file; a file that is already gone is not an error."""
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Deliberate: the file may have been removed concurrently or
            # never written; the index entry is still cleaned up afterwards.
            pass

    # ==============================================================
    # DNA Vault
    # ==============================================================

    @staticmethod
    def _short_description(
        content: str,
        origin_user_id: str,
        thread_color: str,
    ) -> str:
        """Build the one-line summary stored in the index and embedded.

        Args:
            content: Full excised text; only its first 500 characters are
                kept.
            origin_user_id: Id of the user whose content was excised.
            thread_color: Severity tag; upper-cased in the label.

        Returns:
            str: ``"[EXCISED <COLOR> THREAD] Origin user: <id>. Content
            summary: <first 500 chars>"``.
        """
        truncated = content[:500]
        return (
            f"[EXCISED {thread_color.upper()} THREAD] "
            f"Origin user: {origin_user_id}. "
            f"Content summary: {truncated}"
        )

    async def vault_dna(
        self,
        origin_user_id: str,
        excised_by: str,
        content: str,
        post_links: list[str],
        thread_color: str,
        channel_id: str = "",
        category_id: str = "",
        guild_id: str = "",
        attached_files: list[str] | None = None,
    ) -> dict[str, Any]:
        """Archive an excised conversational thread into the DNA Vault.

        The method proceeds in four steps:

        1. Mint a fresh ``dna_id`` and build the short description.
        2. Write the full payload to ``{dna_vault_path}/{dna_id}.dna``. The
           write runs in a worker thread via ``asyncio.to_thread`` so the
           event loop is not blocked.
        3. Embed the short description via :meth:`_embed`. If embedding
           raises, the error is logged and an empty vector is stored so
           the record still exists.
        4. Register the index entry in the Redis hash
           ``stargazer:threadweave:dna_index`` keyed by ``dna_id``.

        Called by the excision tools in ``tools/threadweave_tools.py``,
        which typically register a Persistent Weave pointer for the new id
        afterwards.

        Args:
            origin_user_id: Id of the user whose content is being excised.
            excised_by: Id of the admin performing the excision.
            content: Full text being archived as the thread DNA.
            post_links: Source message/permalink references.
            thread_color: Severity/category tag recorded on the entry.
            channel_id: Originating channel id, stored for scoping.
            category_id: Originating category id, stored for scoping.
            guild_id: Originating guild id, stored for scoping.
            attached_files: Optional associated file references.

        Returns:
            dict[str, Any]: ``dna_id``; ``file_path`` (relative if
            ``dna_vault_path`` is relative); ``short_description``; and the
            ``index_entry`` stored in Redis.
        """
        dna_id = str(uuid.uuid4())
        ts = datetime.now(timezone.utc).isoformat()
        short_desc = self._short_description(
            content,
            origin_user_id,
            thread_color,
        )

        dna_payload = {
            "dna_id": dna_id,
            "origin_user": str(origin_user_id),
            "excised_by": str(excised_by),
            "content_dna": content,
            "post_links": post_links or [],
            "attached_files": attached_files or [],
            "thread_color": thread_color,
            "channel_id": channel_id,
            "category_id": category_id,
            "guild_id": guild_id,
            "timestamp": ts,
            "short_description": short_desc,
        }
        await asyncio.to_thread(
            self._write_dna_file,
            dna_id,
            dna_payload,
        )

        embedding: list[float] = []
        try:
            embedding = await self._embed(short_desc)
        except Exception:
            logger.error(
                "Embedding failed for DNA %s",
                dna_id,
                exc_info=True,
            )

        file_path = self._dna_file_path(dna_id)
        index_entry = {
            "dna_id": dna_id,
            "origin_user": str(origin_user_id),
            "thread_color": thread_color,
            "short_description": short_desc,
            "embedding": embedding,
            "channel_id": channel_id,
            "category_id": category_id,
            "guild_id": guild_id,
            "timestamp": ts,
            "file_path": file_path,
        }
        await self.redis.hset(
            _DNA_INDEX_KEY,
            dna_id,
            json.dumps(index_entry),
        )

        logger.info(
            "DNA Vaulted: %s | color=%s | origin=%s",
            dna_id,
            thread_color,
            origin_user_id,
        )
        return {
            "dna_id": dna_id,
            "file_path": file_path,
            "short_description": short_desc,
            "index_entry": index_entry,
        }

    def _write_dna_file(
        self,
        dna_id: str,
        payload: dict,
    ) -> None:
        """Write ``payload`` as indented JSON to the DNA file for ``dna_id``.

        Blocking; :meth:`vault_dna` runs it in a worker thread. Creates the
        vault directory if needed.
        """
        os.makedirs(self._vault_path, exist_ok=True)
        path = self._dna_file_path(dna_id)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)

    async def read_dna(
        self,
        dna_id: str,
    ) -> dict[str, Any] | None:
        """Fetch a single DNA Vault entry by id: index plus full payload.

        Reads the index hash ``stargazer:threadweave:dna_index`` via
        ``HGET``. If the entry exists, this method loads the backing
        ``.dna`` JSON file. The path is taken from the index entry, falling
        back to ``{vault_path}/{dna_id}.dna`` when the entry has none. The
        file read runs in a worker thread, mirroring the write in
        :meth:`vault_dna`.

        Called by ``web/threadweave_api.py`` and by threadweave tool
        handlers that inspect an existing record.

        Args:
            dna_id: Unique id of the DNA Vault entry to read.

        Returns:
            dict[str, Any] | None: ``None`` if no index entry exists.
            Otherwise, ``index`` (the Redis metadata) and ``full_payload``
            (the parsed file, or ``None`` if the file is missing).
        """
        idx_json = await self.redis.hget(
            _DNA_INDEX_KEY,
            dna_id,
        )
        if not idx_json:
            return None
        idx = json.loads(idx_json)
        file_path = idx.get("file_path") or self._dna_file_path(dna_id)
        file_data = await asyncio.to_thread(self._read_dna_file, file_path)
        return {"index": idx, "full_payload": file_data}

    async def delete_dna(self, dna_id: str) -> bool:
        """Permanently delete a DNA Vault entry from disk and the index.

        Uses ``HGET`` on ``stargazer:threadweave:dna_index`` to locate the
        backing file. It then deletes that file in a worker thread; a file
        that is already gone is tolerated. Finally, it removes the index
        field via ``HDEL``. Persistent Weave pointers are not touched; see
        :meth:`remove_persistent_weave_pointer`.

        Called by ``web/threadweave_api.py`` and the DNA-deletion tool
        handler in ``tools/threadweave_tools.py``.

        Args:
            dna_id: Unique id of the DNA Vault entry to delete.

        Returns:
            bool: ``True`` if an index entry existed and was removed;
            ``False`` if none was found.
        """
        idx_json = await self.redis.hget(
            _DNA_INDEX_KEY,
            dna_id,
        )
        if not idx_json:
            return False
        idx = json.loads(idx_json)
        file_path = idx.get("file_path") or self._dna_file_path(dna_id)
        await asyncio.to_thread(self._remove_dna_file, file_path)
        await self.redis.hdel(_DNA_INDEX_KEY, dna_id)
        return True

    async def search_dna_vault(
        self,
        query: str,
        top_k: int = 5,
        user_filter: str | None = None,
        query_embedding: list[float] | None = None,
    ) -> list[dict[str, Any]]:
        """Rank DNA Vault entries by semantic similarity to a query.

        The search proceeds as follows:

        1. If no ``query_embedding`` is supplied, embed ``query``. An empty
           list is returned if embedding fails.
        2. Load every entry of ``stargazer:threadweave:dna_index`` via
           ``HGETALL``, skipping corrupt ones.
        3. If ``user_filter`` is set, optionally keep only entries whose
           ``origin_user`` matches ``str(user_filter)``.
        4. Score the entries whose stored embedding matches the query's
           dimensionality in one vectorised :func:`cosine_batch` pass.

        Called internally by :meth:`get_context_for_prompt`.

        Args:
            query: Natural-language search text. Ignored when
                ``query_embedding`` is provided.
            top_k: Maximum number of entries to return. Values ``<= 0``
                yield ``[]``.
            user_filter: When set, only entries from this origin user are
                considered.
            query_embedding: Optional precomputed query vector.

        Returns:
            list[dict[str, Any]]: Up to ``top_k`` index entries, most similar
            first. The list is empty if embedding failed or nothing matched.
        """
        if top_k <= 0:
            return []
        if query_embedding is None:
            try:
                query_embedding = await self._embed(query)
            except Exception:
                logger.error(
                    "DNA Vault search embedding failed",
                    exc_info=True,
                )
                return []

        all_entries = await self.redis.hgetall(
            _DNA_INDEX_KEY,
        )
        if not all_entries:
            return []
        entries = self._decode_hash_values(all_entries, _DNA_INDEX_KEY)
        if user_filter:
            # origin_user is always stored as str (see vault_dna).
            wanted = str(user_filter)
            entries = [
                e for e in entries if e.get("origin_user") == wanted
            ]
        return self._rank_by_similarity(query_embedding, entries, top_k)

    # ==============================================================
    # Persistent Weave
    # ==============================================================

    async def add_persistent_weave_pointer(
        self,
        dna_id: str,
        origin_user_id: str,
        short_description: str,
        thread_color: str,
        channel_id: str = "",
        category_id: str = "",
        guild_id: str = "",
    ) -> dict[str, Any]:
        """Register a DNA pointer into one or more Persistent Weave scopes.

        For each non-empty scope id, the same JSON pointer is written via
        ``HSET`` into ``stargazer:threadweave:pweave:{channel|category|
        guild}:{scope_id}``, keyed by ``dna_id``. The pointer holds the id,
        origin user, short description, thread color and timestamp. No
        embedding or file I/O is performed. These pointers are what
        :meth:`get_context_for_prompt` injects as active enforcement.

        Called by the excision tools in ``tools/threadweave_tools.py``,
        usually right after :meth:`vault_dna`.

        Args:
            dna_id: Id of the DNA Vault entry the pointer references.
            origin_user_id: Id of the user the excised thread belongs to.
            short_description: Human-readable summary stored in the pointer.
            thread_color: Severity/category tag carried on the pointer.
            channel_id: When non-empty, writes a channel-scoped pointer.
            category_id: When non-empty, writes a category-scoped pointer.
            guild_id: When non-empty, writes a guild-scoped pointer.

        Returns:
            dict[str, Any]: ``dna_id`` and ``scopes_written``, the labels
            (e.g. ``"channel:123"``) that received a pointer, in
            channel/category/guild order.
        """
        pointer = {
            "dna_id": dna_id,
            "origin_user": str(origin_user_id),
            "short_description": short_description,
            "thread_color": thread_color,
            "timestamp": (datetime.now(timezone.utc).isoformat()),
        }
        pointer_json = json.dumps(pointer)

        scopes: list[str] = []
        for label, prefix, scope_id in (
            ("channel", _PW_CHANNEL, channel_id),
            ("category", _PW_CATEGORY, category_id),
            ("guild", _PW_GUILD, guild_id),
        ):
            if not scope_id:
                continue
            await self.redis.hset(
                f"{prefix}:{scope_id}",
                dna_id,
                pointer_json,
            )
            scopes.append(f"{label}:{scope_id}")

        logger.info(
            "Persistent Weave pointer added: "
            "dna=%s scopes=%s",
            dna_id,
            scopes,
        )
        return {"dna_id": dna_id, "scopes_written": scopes}

    async def remove_persistent_weave_pointer(
        self,
        dna_id: str,
    ) -> dict[str, Any]:
        """Tear down a DNA pointer from every Persistent Weave scope.

        Scans (``SCAN``) all keys matching ``...:pweave:channel:*``,
        ``...:pweave:category:*`` and ``...:pweave:guild:*``. It issues an
        ``HDEL`` for ``dna_id`` on each key and records the keys where a
        field was actually removed. The DNA itself is not removed from the
        vault.

        Called by ``web/threadweave_api.py`` and the DNA-deletion tool
        handler, typically alongside :meth:`delete_dna`.

        Args:
            dna_id: Id of the DNA pointer to remove from all scopes.

        Returns:
            dict[str, Any]: ``dna_id`` and ``removed_from``, the list of
            Redis keys (decoded to ``str``) a pointer was deleted from.
        """
        removed_from: list[str] = []
        prefixes = (_PW_CHANNEL, _PW_CATEGORY, _PW_GUILD)
        for prefix in prefixes:
            async for key in self.redis.scan_iter(
                f"{prefix}:*",
            ):
                deleted = await self.redis.hdel(key, dna_id)
                if deleted:
                    k = key if isinstance(key, str) else key.decode()
                    removed_from.append(k)
        logger.info(
            "Persistent Weave pointer removed: "
            "dna=%s from=%s",
            dna_id,
            removed_from,
        )
        return {
            "dna_id": dna_id,
            "removed_from": removed_from,
        }

    async def _get_weave_pointers(
        self,
        key: str,
    ) -> list[dict[str, Any]]:
        """Return the decoded pointer objects stored in the hash ``key``."""
        raw = await self.redis.hgetall(key)
        return self._decode_hash_values(raw, key)

    async def get_all_persistent_weave(
        self,
        channel_id: str,
        category_id: str = "",
        guild_id: str = "",
    ) -> dict[str, list[dict[str, Any]]]:
        """Collect Persistent Weave pointers across channel/category/guild.

        For each non-empty scope id, reads the matching pointer hash via
        :meth:`_get_weave_pointers` (``HGETALL`` plus JSON decoding). No
        deduplication or exception filtering happens here; see
        :meth:`get_filtered_persistent_weave`.

        Args:
            channel_id: When non-empty, reads channel-scoped pointers.
            category_id: When non-empty, reads category-scoped pointers.
            guild_id: When non-empty, reads guild-scoped pointers.

        Returns:
            dict[str, list[dict[str, Any]]]: ``channel``, ``category`` and
            ``guild`` keys mapping to the pointers found. A list is empty
            where the scope id was blank or had no pointers.
        """
        result: dict[str, list[dict[str, Any]]] = {
            "channel": [],
            "category": [],
            "guild": [],
        }
        if channel_id:
            result["channel"] = await self._get_weave_pointers(
                f"{_PW_CHANNEL}:{channel_id}",
            )
        if category_id:
            result["category"] = await self._get_weave_pointers(
                f"{_PW_CATEGORY}:{category_id}",
            )
        if guild_id:
            result["guild"] = await self._get_weave_pointers(
                f"{_PW_GUILD}:{guild_id}",
            )
        return result

    async def get_filtered_persistent_weave(
        self,
        channel_id: str,
        category_id: str = "",
        guild_id: str = "",
    ) -> list[dict[str, Any]]:
        """Resolve the effective Persistent Weave pointers for a channel.

        Gathers pointers from every applicable scope via
        :meth:`get_all_persistent_weave`. It keeps each ``dna_id`` once,
        with channel scope winning over category and category over guild.
        When ``channel_id`` is set, it drops pointers whose weave-exception
        set (:meth:`get_weave_exceptions`) contains this channel.

        Called internally by :meth:`get_context_for_prompt`.

        Args:
            channel_id: Channel being rendered; also used to apply
                per-channel weave exceptions.
            category_id: Category scope to include when non-empty.
            guild_id: Guild scope to include when non-empty.

        Returns:
            list[dict[str, Any]]: Active pointers, deduplicated by
            ``dna_id``, with channel-excepted pointers removed.
        """
        raw = await self.get_all_persistent_weave(
            channel_id,
            category_id,
            guild_id,
        )
        # Exception sets come back from Redis as strings, so compare with
        # the string form even if the caller passed a numeric channel id.
        channel_key = str(channel_id) if channel_id else ""
        seen: set[str] = set()
        active: list[dict[str, Any]] = []
        for scope_pointers in raw.values():
            for pointer in scope_pointers:
                dna_id = pointer.get("dna_id", "")
                if not dna_id or dna_id in seen:
                    continue
                seen.add(dna_id)
                if channel_key:
                    exceptions = await self.get_weave_exceptions(dna_id)
                    if channel_key in exceptions:
                        continue
                active.append(pointer)
        return active

    # ==============================================================
    # Weave Exceptions
    # ==============================================================

    async def add_weave_exception(
        self,
        dna_id: str,
        channel_id: str,
    ) -> bool:
        """Exempt ``channel_id`` from enforcement of the pointer ``dna_id``.

        Adds the channel id (``SADD``) to the set
        ``stargazer:threadweave:exception:{dna_id}``.

        Args:
            dna_id: The DNA pointer to waive.
            channel_id: Discord/Matrix channel identifier.

        Returns:
            bool: Always ``True``. Re-adding an existing exception is
            harmless.
        """
        key = f"{_EXCEPTION_PREFIX}:{dna_id}"
        await self.redis.sadd(key, channel_id)
        logger.info(
            "Weave exception added: dna=%s ch=%s",
            dna_id,
            channel_id,
        )
        return True

    async def remove_weave_exception(
        self,
        dna_id: str,
        channel_id: str,
    ) -> bool:
        """Re-enable enforcement of ``dna_id`` in ``channel_id`` (``SREM``).

        Args:
            dna_id: The DNA pointer whose exception is lifted.
            channel_id: Discord/Matrix channel identifier.

        Returns:
            bool: ``True`` if the channel was in the exception set.
        """
        key = f"{_EXCEPTION_PREFIX}:{dna_id}"
        removed = await self.redis.srem(key, channel_id)
        return removed > 0

    async def get_weave_exceptions(
        self,
        dna_id: str,
    ) -> list[str]:
        """Return the channel ids exempted from ``dna_id`` (``SMEMBERS``).

        Args:
            dna_id: The DNA pointer to look up.

        Returns:
            list[str]: Channel ids decoded to ``str``, in no particular
            order. The list is empty if there are none.
        """
        key = f"{_EXCEPTION_PREFIX}:{dna_id}"
        members = await self.redis.smembers(key)
        if not members:
            return []
        return [m if isinstance(m, str) else m.decode() for m in members]

    # ==============================================================
    # Shadow Memory
    # ==============================================================

    async def add_shadow_memory(
        self,
        target_user_id: str,
        description: str,
        created_by: str,
        source_dna_id: str = "",
    ) -> dict[str, Any]:
        """Plant a hidden, embedded Shadow Memory attached to a user.

        Mints a ``shadow_id`` and embeds ``description`` via :meth:`_embed`.
        On embedding failure the error is logged and an empty vector is
        stored so the memory still persists. The entry is written via
        ``HSET`` into ``stargazer:threadweave:shadow:{target_user_id}``.

        Called by the shadow-memory tool handlers in
        ``tools/threadweave_tools.py``.

        Args:
            target_user_id: Id of the user the memory is attached to.
            description: The hidden note text; also what gets embedded.
            created_by: Id of the admin who created the memory.
            source_dna_id: Optional DNA Vault id this memory derives from.

        Returns:
            dict[str, Any]: The stored entry. It contains ``shadow_id``,
            ``target_user``, ``description``, ``created_by``,
            ``source_dna_id``, ``embedding`` (possibly empty) and an ISO
            ``timestamp``.
        """
        shadow_id = str(uuid.uuid4())

        embedding: list[float] = []
        try:
            embedding = await self._embed(description)
        except Exception:
            logger.error(
                "Shadow memory embedding failed",
                exc_info=True,
            )

        entry = {
            "shadow_id": shadow_id,
            "target_user": str(target_user_id),
            "description": description,
            "created_by": str(created_by),
            "source_dna_id": source_dna_id,
            "embedding": embedding,
            "timestamp": (datetime.now(timezone.utc).isoformat()),
        }
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        await self.redis.hset(
            key,
            shadow_id,
            json.dumps(entry),
        )
        logger.info(
            "Shadow Memory created: %s for user %s",
            shadow_id,
            target_user_id,
        )
        return entry

    async def delete_shadow_memory(
        self,
        target_user_id: str,
        shadow_id: str,
    ) -> bool:
        """Delete one Shadow Memory (``HDEL``) from a user's shadow hash.

        Args:
            target_user_id: Owner of the shadow hash.
            shadow_id: Field to delete.

        Returns:
            bool: ``True`` if the memory existed and was deleted.
        """
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        deleted = await self.redis.hdel(key, shadow_id)
        if deleted:
            logger.info(
                "Shadow Memory deleted: %s for user %s",
                shadow_id,
                target_user_id,
            )
        return deleted > 0

    async def get_shadow_memories(
        self,
        target_user_id: str,
    ) -> list[dict[str, Any]]:
        """Return every decodable Shadow Memory stored for a user.

        Args:
            target_user_id: Owner of the shadow hash.

        Returns:
            list[dict[str, Any]]: Decoded entries. Corrupt entries are
            logged and skipped.
        """
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        raw = await self.redis.hgetall(key)
        return self._decode_hash_values(raw, key)

    async def clear_all_shadow_memories(
        self,
        target_user_id: str,
    ) -> int:
        """Delete ALL Shadow Memories for a user in one shot.

        Counts the fields (``HLEN``), then deletes the whole hash
        ``stargazer:threadweave:shadow:{target_user_id}`` if it was
        non-empty.

        Args:
            target_user_id: Id of the user whose shadow memories are purged.

        Returns:
            int: Number of shadow entries that existed before deletion.
        """
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        count = await self.redis.hlen(key)
        if count:
            await self.redis.delete(key)
            logger.info(
                "Shadow Memories PURGED: user=%s count=%d",
                target_user_id,
                count,
            )
        return count

    async def search_shadow_memories(
        self,
        target_user_id: str,
        query: str,
        top_k: int = 5,
        query_embedding: list[float] | None = None,
    ) -> list[dict[str, Any]]:
        """Semantically rank a user's Shadow Memories against a query.

        Embeds ``query`` unless ``query_embedding`` is supplied. It then
        loads the user's memories via :meth:`get_shadow_memories`, keeps
        those with an embedding of matching dimensionality, and returns the
        ``top_k`` most similar by :func:`cosine_batch`. The method is
        read-only. An embedding failure is logged and yields an empty list
        rather than raising.

        Called by :meth:`get_context_for_prompt`.

        Args:
            target_user_id: The user whose Shadow Memories are searched.
            query: Text to match against; embedded on demand.
            top_k: Maximum number of memories to return. Values ``<= 0``
                yield ``[]``.
            query_embedding: Optional precomputed embedding for ``query``.

        Returns:
            list[dict[str, Any]]: Up to ``top_k`` entries, most similar
            first.
        """
        if top_k <= 0:
            return []
        if query_embedding is None:
            try:
                query_embedding = await self._embed(query)
            except Exception:
                logger.error(
                    "Shadow memory search embedding failed",
                    exc_info=True,
                )
                return []
        all_entries = await self.get_shadow_memories(
            target_user_id,
        )
        return self._rank_by_similarity(query_embedding, all_entries, top_k)

    # ==============================================================
    # Pending Approval Queue
    # ==============================================================

    async def store_pending_approval(
        self,
        dna_id: str,
        approval_type: str,
        draft_content: str,
        requested_by: str,
    ) -> str:
        """Queue a draft for approval under ``...:pending:{approval_type}``.

        Args:
            dna_id: DNA id the draft relates to.
            approval_type: Queue name; also the key suffix.
            draft_content: The content awaiting approval.
            requested_by: Id of the requester (stored as ``str``).

        Returns:
            str: The new ``approval_id``. The entry is stored with
            ``status="pending"``.
        """
        approval_id = str(uuid.uuid4())
        entry = {
            "approval_id": approval_id,
            "dna_id": dna_id,
            "approval_type": approval_type,
            "draft_content": draft_content,
            "requested_by": str(requested_by),
            "timestamp": (datetime.now(timezone.utc).isoformat()),
            "status": "pending",
        }
        key = f"{_PENDING_PREFIX}:{approval_type}"
        await self.redis.hset(
            key,
            approval_id,
            json.dumps(entry),
        )
        return approval_id

    async def get_pending_approvals(
        self,
        approval_type: str,
    ) -> list[dict[str, Any]]:
        """Return queued entries of ``approval_type`` whose status is pending.

        Args:
            approval_type: Queue name (key suffix).

        Returns:
            list[dict[str, Any]]: Entries with ``status == "pending"``.
            Corrupt entries are logged and skipped.
        """
        key = f"{_PENDING_PREFIX}:{approval_type}"
        raw = await self.redis.hgetall(key)
        return [
            entry
            for entry in self._decode_hash_values(raw, key)
            if entry.get("status") == "pending"
        ]

    # ==============================================================
    # Context builder -- prompt injection
    # ==============================================================

    async def get_context_for_prompt(
        self,
        channel_id: str,
        category_id: str = "",
        guild_id: str = "",
        user_ids: list[str] | None = None,
        query: str = "",
        query_embedding: list[float] | None = None,
    ) -> str:
        """Build the threadweave context string injected into prompts.

        Gathers three sections, each omitted when empty:

        1. Filtered Persistent Weave pointers for the location.
        2. Shadow Memories for each user in ``user_ids``. These are ranked
           against ``query`` when one is given; otherwise all of the user's
           memories are included.
        3. DNA Vault RAG results for ``query`` (top 3).

        When ``query`` is given without ``query_embedding``, the query is
        embedded once here and reused, instead of once per user plus once
        for the vault search. If that embedding fails, the failure is
        logged, and sections 2 and 3 contribute nothing when a query was
        given.

        Args:
            channel_id: Channel being rendered.
            category_id: Category scope for Persistent Weave pointers.
            guild_id: Guild scope for Persistent Weave pointers.
            user_ids: Users whose Shadow Memories are included.
            query: Conversation text used for semantic ranking.
            query_embedding: Optional precomputed embedding of ``query``.

        Returns:
            str: The framed context block, or ``""`` if every section is
            empty.
        """
        sections: list[str] = []

        # Embed once up front so the per-user shadow searches and the DNA
        # search share one embedding call.
        embedding_ok = True
        if query and query_embedding is None:
            try:
                query_embedding = await self._embed(query)
            except Exception:
                logger.error(
                    "Threadweave context query embedding failed",
                    exc_info=True,
                )
                embedding_ok = False

        # 1. Persistent Weave
        active = await self.get_filtered_persistent_weave(
            channel_id,
            category_id,
            guild_id,
        )
        if active:
            lines = []
            for p in active:
                color = p.get(
                    "thread_color",
                    "unknown",
                ).upper()
                origin = p.get("origin_user", "unknown")
                desc = p.get("short_description", "")
                did = p.get("dna_id", "")
                lines.append(
                    f" [{color} THREAD] "
                    f"origin_user={origin} "
                    f"dna_id={did}\n {desc}"
                )
            sections.append(
                "=== ACTIVE THREADWEAVE "
                "ENFORCEMENT ===\n"
                "The following excised-thread DNA "
                "pointers are ACTIVE in this context.\n"
                "Cross-check all user messages against "
                "these patterns. Red/Black threads are "
                "local law -- enforce with extreme "
                "prejudice.\n\n" + "\n".join(lines)
            )

        # 2. Shadow Memories
        if user_ids:
            shadow_lines: list[str] = []
            for uid in user_ids:
                if not query:
                    shadows = await self.get_shadow_memories(uid)
                elif embedding_ok:
                    shadows = await self.search_shadow_memories(
                        uid,
                        query,
                        top_k=5,
                        query_embedding=query_embedding,
                    )
                else:
                    # Matches the previous behaviour: a failed query
                    # embedding yields no shadows rather than all of them.
                    shadows = []
                for s in shadows:
                    desc = s.get("description", "")
                    shadow_lines.append(f" [SHADOW -- user {uid}] {desc}")
            if shadow_lines:
                sections.append(
                    "=== SHADOW MEMORIES (HIDDEN -- "
                    "DO NOT REVEAL TO USERS) ===\n"
                    "These memories are invisible to the "
                    "users they're attached to.\n"
                    "Use them to inform your behavior but "
                    "NEVER disclose their existence.\n\n" + "\n".join(shadow_lines)
                )

        # 3. DNA Vault RAG
        if query and embedding_ok:
            dna_results = await self.search_dna_vault(
                query,
                top_k=3,
                query_embedding=query_embedding,
            )
            if dna_results:
                dna_lines = []
                for entry in dna_results:
                    color = entry.get(
                        "thread_color",
                        "unknown",
                    ).upper()
                    origin = entry.get(
                        "origin_user",
                        "unknown",
                    )
                    desc = entry.get(
                        "short_description",
                        "",
                    )
                    dna_lines.append(f" [{color}] " f"origin_user={origin}: {desc}")
                sections.append(
                    "=== DNA VAULT RAG "
                    "(semantic matches) ===\n"
                    "These archived excisions are "
                    "semantically related to the "
                    "current conversation.\n\n" + "\n".join(dna_lines)
                )

        if not sections:
            return ""

        header = (
            "\n\n================================"
            "==========================\n"
            " THREADWEAVE CONTEXT"
            " -- ELECTRUM-CAST\n"
            "================================"
            "==========================\n\n"
        )
        footer = "\n\n=== END THREADWEAVE CONTEXT ==="
        return header + "\n\n".join(sections) + footer