# Source code for threadweave

"""Threadweave persistent knowledge system.

Provides the DNA Vault (file-backed archive with a Redis embedding
index), the Persistent Weave (channel/category/guild-scoped enforcement
pointers), Shadow Memory (a hidden per-user memory store), Weave
Exceptions (per-channel overrides for DNA pointers), a Pending Approval
queue, and a prompt-context builder that combines them.

All Redis keys live under the ``stargazer:threadweave`` namespace
for backward compatibility with the old codebase.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Any, TYPE_CHECKING

import numpy as np

from utils.cosine import cosine_batch

if TYPE_CHECKING:
    from openrouter_client import OpenRouterClient

logger = logging.getLogger(name=__name__)

# Every Redis key shares this namespace; it is kept identical to the old
# codebase so previously stored data stays reachable.
_PREFIX = "stargazer:threadweave"

# Hash: dna_id -> JSON index entry (metadata + embedding).
_DNA_INDEX_KEY = _PREFIX + ":dna_index"
# Per-user hash prefix for Shadow Memories (suffix: target user id).
_SHADOW_PREFIX = _PREFIX + ":shadow"
# Persistent Weave pointer hashes, one prefix per scope (suffix: scope id).
_PW_CHANNEL = _PREFIX + ":pweave:channel"
_PW_CATEGORY = _PREFIX + ":pweave:category"
_PW_GUILD = _PREFIX + ":pweave:guild"
# Per-DNA set prefix holding channel ids exempt from that DNA's pointer.
_EXCEPTION_PREFIX = _PREFIX + ":exception"
# Per-approval-type hash prefix for the pending approval queue.
_PENDING_PREFIX = _PREFIX + ":pending"


def _cosine_similarity(
    a: list[float], b: list[float],
) -> float:
    """Return the cosine similarity of a single pair of vectors.

    Thin wrapper that runs ``cosine_batch`` on a one-element batch and
    unwraps the lone score; kept for callers that only compare two vectors.
    """
    batch_scores = cosine_batch(a, [b])
    return float(batch_scores[0])


class ThreadweaveManager:
    """Core threadweave infrastructure.

    Parameters
    ----------
    redis_client:
        Async Redis client (same one used by MessageCache).
    openrouter:
        OpenRouterClient used for generating embeddings.
    embedding_model:
        Model identifier for embedding generation.
    admin_user_ids:
        Set of user IDs authorised to wield threadweave tools.
    dna_vault_path:
        Directory for file-backed DNA vault storage.
    """

    def __init__(
        self,
        redis_client: Any,
        openrouter: OpenRouterClient,
        embedding_model: str = "google/gemini-embedding-001",
        admin_user_ids: set[str] | None = None,
        dna_vault_path: str = "data/dna_vault",
    ) -> None:
        """Initialize the manager.

        Args:
            redis_client: Async Redis client used for every index,
                pointer, exception, shadow-memory and approval key.
            openrouter: Client whose ``embed`` coroutine produces
                embeddings.
            embedding_model: Model identifier passed to ``embed``.
            admin_user_ids: User IDs allowed past :meth:`require_admin`;
                ``None`` means no admins.
            dna_vault_path: Directory holding ``<dna_id>.dna`` files.
        """
        self.redis = redis_client
        self._openrouter = openrouter
        self._embedding_model = embedding_model
        self.admin_user_ids: set[str] = admin_user_ids or set()
        self._vault_path = dna_vault_path

    # --------------------------------------------------------------
    # Authorization
    # --------------------------------------------------------------

    def require_admin(
        self, user_id: str,
    ) -> dict[str, str] | None:
        """Return an error dict if *user_id* is not an admin, else ``None``.

        The id is compared in its ``str`` form against
        :attr:`admin_user_ids`.
        """
        if str(user_id) not in self.admin_user_ids:
            return {
                "error": (
                    "UNAUTHORIZED. The Electrum-Cast Rending "
                    "Scissors reject your touch. Only Prime "
                    "Architects may wield them."
                ),
                "status": "denied",
            }
        return None

    # --------------------------------------------------------------
    # Embedding helper
    # --------------------------------------------------------------

    async def _embed(self, text: str) -> list[float]:
        """Embed *text* with the configured model via OpenRouter.

        Args:
            text: Text content to embed.

        Returns:
            list[float]: The embedding vector returned by the client.
        """
        return await self._openrouter.embed(
            text, self._embedding_model,
        )

    # --------------------------------------------------------------
    # Shared helpers
    # --------------------------------------------------------------

    @staticmethod
    def _decode_records(
        raw_values: Any, what: str,
    ) -> list[dict[str, Any]]:
        """Decode JSON-encoded Redis values into dicts.

        Values that are not valid JSON, or that decode to something other
        than a JSON object, are skipped with a warning. They are not
        silently dropped, and they are not passed on to callers that would
        crash calling ``.get`` on them.

        Args:
            raw_values: Iterable of raw ``str``/``bytes`` payloads, e.g.
                ``hgetall(...).values()``.
            what: Human-readable record kind used in the warning.

        Returns:
            list[dict[str, Any]]: Decoded records, in input order.
        """
        records: list[dict[str, Any]] = []
        for raw in raw_values or ():
            try:
                record = json.loads(raw)
            except (TypeError, ValueError):
                logger.warning("Skipping undecodable %s record", what)
                continue
            if isinstance(record, dict):
                records.append(record)
            else:
                logger.warning("Skipping non-object %s record", what)
        return records

    @staticmethod
    def _rank_by_similarity(
        query_embedding: list[float],
        entries: list[dict[str, Any]],
        top_k: int,
    ) -> list[dict[str, Any]]:
        """Return up to *top_k* entries ordered by cosine similarity.

        Only entries whose ``embedding`` has the same length as
        *query_embedding* are scored. Entries with no embedding (e.g.
        embedding failed at write time) or produced by a different-sized
        model cannot be compared, so they are skipped rather than breaking
        the whole batch.

        Ordering is descending by similarity. Ties keep their input order,
        and NaN scores sort last.

        Args:
            query_embedding: Query vector.
            entries: Candidate records carrying an ``embedding`` list.
            top_k: Maximum number of results; ``<= 0`` yields ``[]``.

        Returns:
            list[dict[str, Any]]: The best-matching entries.
        """
        dim = len(query_embedding)
        if top_k <= 0 or dim == 0:
            return []
        candidates = [
            entry for entry in entries
            if len(entry.get("embedding") or ()) == dim
        ]
        if not candidates:
            return []
        sims = np.asarray(
            cosine_batch(
                query_embedding,
                [entry["embedding"] for entry in candidates],
            ),
            dtype=float,
        )
        # Negate for a descending sort; NaN stays NaN and argsort puts it
        # at the end instead of the front.
        order = np.argsort(-sims, kind="stable")
        return [candidates[i] for i in order[:top_k]]

    # ==============================================================
    # DNA Vault
    # ==============================================================

    @staticmethod
    def _short_description(
        content: str, origin_user_id: str, thread_color: str,
    ) -> str:
        """Build the short, embeddable description of an excision.

        Args:
            content: Full excised content (only the first 500 chars
                are used).
            origin_user_id: User the content originated from.
            thread_color: Thread color label (upper-cased in output).

        Returns:
            str: The description string.
        """
        truncated = content[:500]
        return (
            f"[EXCISED {thread_color.upper()} THREAD] "
            f"Origin user: {origin_user_id}. "
            f"Content summary: {truncated}"
        )

    def _dna_file_path(self, dna_id: str) -> str:
        """Return the default on-disk path of the ``.dna`` file for *dna_id*."""
        return os.path.join(self._vault_path, f"{dna_id}.dna")

    async def vault_dna(
        self,
        origin_user_id: str,
        excised_by: str,
        content: str,
        post_links: list[str],
        thread_color: str,
        channel_id: str = "",
        category_id: str = "",
        guild_id: str = "",
        attached_files: list[str] | None = None,
    ) -> dict[str, Any]:
        """Archive excised content into the DNA Vault.

        The full payload is written to ``<vault>/<dna_id>.dna``. A compact
        index entry, including an embedding of the short description, is
        stored in the DNA index Redis hash. If embedding fails, the entry is
        still indexed with an empty embedding: it remains readable but
        semantic search will not find it.

        Returns:
            dict[str, Any]: ``dna_id``, ``file_path``,
            ``short_description`` and the stored ``index_entry``.
        """
        dna_id = str(uuid.uuid4())
        ts = datetime.now(timezone.utc).isoformat()
        short_desc = self._short_description(
            content, origin_user_id, thread_color,
        )
        file_path = self._dna_file_path(dna_id)

        dna_payload = {
            "dna_id": dna_id,
            "origin_user": str(origin_user_id),
            "excised_by": str(excised_by),
            "content_dna": content,
            "post_links": post_links or [],
            "attached_files": attached_files or [],
            "thread_color": thread_color,
            "channel_id": channel_id,
            "category_id": category_id,
            "guild_id": guild_id,
            "timestamp": ts,
            "short_description": short_desc,
        }
        # File I/O runs in a worker thread so the event loop is not blocked.
        await asyncio.to_thread(
            self._write_dna_file, dna_id, dna_payload,
        )

        embedding: list[float] = []
        try:
            embedding = await self._embed(short_desc)
        except Exception:
            # Best effort: the DNA is already archived on disk, so an
            # embedding outage only costs searchability.
            logger.error(
                "Embedding failed for DNA %s", dna_id,
                exc_info=True,
            )

        index_entry = {
            "dna_id": dna_id,
            "origin_user": str(origin_user_id),
            "thread_color": thread_color,
            "short_description": short_desc,
            "embedding": embedding,
            "channel_id": channel_id,
            "category_id": category_id,
            "guild_id": guild_id,
            "timestamp": ts,
            "file_path": file_path,
        }
        await self.redis.hset(
            _DNA_INDEX_KEY, dna_id, json.dumps(index_entry),
        )
        logger.info(
            "DNA Vaulted: %s | color=%s | origin=%s",
            dna_id, thread_color, origin_user_id,
        )
        return {
            "dna_id": dna_id,
            "file_path": file_path,
            "short_description": short_desc,
            "index_entry": index_entry,
        }

    def _write_dna_file(
        self, dna_id: str, payload: dict,
    ) -> None:
        """Write *payload* as indented JSON to the ``.dna`` file for *dna_id*.

        The JSON goes to a sibling ``.tmp`` file first and is then moved into
        place with :func:`os.replace`, so an interrupted write never leaves a
        truncated ``.dna`` file. This is synchronous; async code calls it
        through :func:`asyncio.to_thread`.

        Args:
            dna_id: The DNA id (used for the file name).
            payload: JSON-serialisable payload.
        """
        os.makedirs(self._vault_path, exist_ok=True)
        path = self._dna_file_path(dna_id)
        tmp_path = f"{path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, indent=2)
            os.replace(tmp_path, path)
        except BaseException:
            # Clean up the partial temp file, then surface the real error.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    @staticmethod
    def _read_dna_file(path: str) -> Any:
        """Load the JSON in *path*, or return ``None`` if the file is missing.

        Synchronous; async code calls it through :func:`asyncio.to_thread`.
        """
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    @staticmethod
    def _remove_dna_file(path: str) -> None:
        """Delete *path*; a file that is already gone is not an error.

        Synchronous; async code calls it through :func:`asyncio.to_thread`.
        """
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    async def read_dna(
        self, dna_id: str,
    ) -> dict[str, Any] | None:
        """Read a DNA entry (index + file).

        Returns:
            ``None`` if *dna_id* is not indexed. Otherwise
            ``{"index": <index entry>, "full_payload": <file JSON>}``,
            where ``full_payload`` is ``None`` when the ``.dna`` file is
            missing.
        """
        idx_json = await self.redis.hget(_DNA_INDEX_KEY, dna_id)
        if not idx_json:
            return None
        idx = json.loads(idx_json)
        file_path = idx.get("file_path", self._dna_file_path(dna_id))
        file_data = await asyncio.to_thread(
            self._read_dna_file, file_path,
        )
        return {"index": idx, "full_payload": file_data}

    async def delete_dna(self, dna_id: str) -> bool:
        """Remove a DNA entry from the vault directory and the index.

        A corrupt index entry does not block deletion: the default file
        path for *dna_id* is used instead.

        Returns:
            bool: ``False`` if *dna_id* was not indexed, else ``True``.
        """
        idx_json = await self.redis.hget(_DNA_INDEX_KEY, dna_id)
        if not idx_json:
            return False
        try:
            idx = json.loads(idx_json)
        except (TypeError, ValueError):
            logger.warning(
                "Corrupt DNA index entry %s; using default path",
                dna_id,
            )
            idx = None
        stored_path = (
            idx.get("file_path") if isinstance(idx, dict) else None
        )
        file_path = stored_path or self._dna_file_path(dna_id)
        await asyncio.to_thread(self._remove_dna_file, file_path)
        await self.redis.hdel(_DNA_INDEX_KEY, dna_id)
        return True

    async def search_dna_vault(
        self,
        query: str,
        top_k: int = 5,
        user_filter: str | None = None,
        query_embedding: list[float] | None = None,
    ) -> list[dict[str, Any]]:
        """Semantic search over the DNA Vault index.

        Args:
            query: Text to embed when *query_embedding* is not given.
            top_k: Maximum number of results.
            user_filter: If set, only entries whose ``origin_user`` equals
                ``str(user_filter)`` are considered.
            query_embedding: Precomputed query vector (skips embedding).

        Returns:
            list[dict[str, Any]]: Index entries, best match first; ``[]``
            if embedding the query fails.
        """
        if query_embedding is None:
            try:
                query_embedding = await self._embed(query)
            except Exception:
                logger.error(
                    "DNA Vault search embedding failed",
                    exc_info=True,
                )
                return []

        raw = await self.redis.hgetall(_DNA_INDEX_KEY)
        if not raw:
            return []
        entries = self._decode_records(raw.values(), "DNA index")
        if user_filter:
            # origin_user is always stored as str, so compare as str.
            wanted = str(user_filter)
            entries = [
                entry for entry in entries
                if entry.get("origin_user") == wanted
            ]
        return self._rank_by_similarity(query_embedding, entries, top_k)

    # ==============================================================
    # Persistent Weave
    # ==============================================================

    async def add_persistent_weave_pointer(
        self,
        dna_id: str,
        origin_user_id: str,
        short_description: str,
        thread_color: str,
        channel_id: str = "",
        category_id: str = "",
        guild_id: str = "",
    ) -> dict[str, Any]:
        """Add a DNA pointer to the Persistent Weave.

        The same pointer JSON is written to every scope whose id is
        non-empty, in channel, category, guild order.

        Returns:
            dict[str, Any]: ``dna_id`` and ``scopes_written`` (list of
            ``"<scope>:<id>"`` strings).
        """
        pointer = {
            "dna_id": dna_id,
            "origin_user": str(origin_user_id),
            "short_description": short_description,
            "thread_color": thread_color,
            "timestamp": (
                datetime.now(timezone.utc).isoformat()
            ),
        }
        pointer_json = json.dumps(pointer)
        scopes: list[str] = []
        for prefix, scope_name, scope_id in (
            (_PW_CHANNEL, "channel", channel_id),
            (_PW_CATEGORY, "category", category_id),
            (_PW_GUILD, "guild", guild_id),
        ):
            if not scope_id:
                continue
            await self.redis.hset(
                f"{prefix}:{scope_id}", dna_id, pointer_json,
            )
            scopes.append(f"{scope_name}:{scope_id}")
        logger.info(
            "Persistent Weave pointer added: "
            "dna=%s scopes=%s", dna_id, scopes,
        )
        return {"dna_id": dna_id, "scopes_written": scopes}

    async def remove_persistent_weave_pointer(
        self, dna_id: str,
    ) -> dict[str, Any]:
        """Remove a DNA pointer from ALL scopes.

        SCANs every key under the channel, category and guild prefixes, so
        cost grows with the number of scoped keys.

        Returns:
            dict[str, Any]: ``dna_id`` and ``removed_from`` (the Redis keys
            that actually contained the pointer).
        """
        removed_from: list[str] = []
        for prefix in (_PW_CHANNEL, _PW_CATEGORY, _PW_GUILD):
            async for key in self.redis.scan_iter(f"{prefix}:*"):
                deleted = await self.redis.hdel(key, dna_id)
                if deleted:
                    removed_from.append(
                        key if isinstance(key, str) else key.decode()
                    )
        logger.info(
            "Persistent Weave pointer removed: "
            "dna=%s from=%s", dna_id, removed_from,
        )
        return {
            "dna_id": dna_id,
            "removed_from": removed_from,
        }

    async def _get_weave_pointers(
        self, key: str,
    ) -> list[dict[str, Any]]:
        """Return the decoded pointers stored in the weave hash *key*.

        Args:
            key: Full Redis key of a scope hash.

        Returns:
            list[dict[str, Any]]: Pointer dicts; undecodable values are
            skipped with a warning.
        """
        raw = await self.redis.hgetall(key)
        return self._decode_records(
            (raw or {}).values(), "persistent weave pointer",
        )

    async def get_all_persistent_weave(
        self,
        channel_id: str,
        category_id: str = "",
        guild_id: str = "",
    ) -> dict[str, list[dict[str, Any]]]:
        """Get pointers from all applicable scopes.

        Returns:
            dict: ``{"channel": [...], "category": [...], "guild": [...]}``;
            scopes with an empty id stay ``[]``.
        """
        result: dict[str, list[dict[str, Any]]] = {
            "channel": [], "category": [], "guild": [],
        }
        if channel_id:
            result["channel"] = await self._get_weave_pointers(
                f"{_PW_CHANNEL}:{channel_id}",
            )
        if category_id:
            result["category"] = await self._get_weave_pointers(
                f"{_PW_CATEGORY}:{category_id}",
            )
        if guild_id:
            result["guild"] = await self._get_weave_pointers(
                f"{_PW_GUILD}:{guild_id}",
            )
        return result

    async def get_filtered_persistent_weave(
        self,
        channel_id: str,
        category_id: str = "",
        guild_id: str = "",
    ) -> list[dict[str, Any]]:
        """Get pointers filtered by weave exceptions.

        Pointers are de-duplicated by ``dna_id`` (the narrowest scope
        wins). Any pointer whose exception set contains this channel is
        dropped.

        Returns:
            list[dict[str, Any]]: The active pointers.
        """
        raw = await self.get_all_persistent_weave(
            channel_id, category_id, guild_id,
        )
        # Exception sets hold strings, so compare the channel id as str.
        channel = str(channel_id) if channel_id else ""
        seen: set[str] = set()
        active: list[dict[str, Any]] = []
        for scope_pointers in raw.values():
            for pointer in scope_pointers:
                dna_id = pointer.get("dna_id", "")
                if not dna_id or dna_id in seen:
                    continue
                seen.add(dna_id)
                if channel:
                    exceptions = await self.get_weave_exceptions(dna_id)
                    if channel in exceptions:
                        continue
                active.append(pointer)
        return active

    # ==============================================================
    # Weave Exceptions
    # ==============================================================

    async def add_weave_exception(
        self, dna_id: str, channel_id: str,
    ) -> bool:
        """Exempt *channel_id* from the Persistent Weave pointer *dna_id*.

        Args:
            dna_id: The DNA id.
            channel_id: Discord/Matrix channel identifier.

        Returns:
            bool: Always ``True``.
        """
        key = f"{_EXCEPTION_PREFIX}:{dna_id}"
        await self.redis.sadd(key, channel_id)
        logger.info(
            "Weave exception added: dna=%s ch=%s",
            dna_id, channel_id,
        )
        return True

    async def remove_weave_exception(
        self, dna_id: str, channel_id: str,
    ) -> bool:
        """Delete the specified weave exception.

        Args:
            dna_id: The DNA id.
            channel_id: Discord/Matrix channel identifier.

        Returns:
            bool: ``True`` if the exception existed and was removed.
        """
        key = f"{_EXCEPTION_PREFIX}:{dna_id}"
        removed = await self.redis.srem(key, channel_id)
        return removed > 0

    async def get_weave_exceptions(
        self, dna_id: str,
    ) -> list[str]:
        """Return the channel ids exempt from pointer *dna_id*.

        Args:
            dna_id: The DNA id.

        Returns:
            list[str]: Channel ids, decoded to ``str`` if Redis returns
            bytes.
        """
        key = f"{_EXCEPTION_PREFIX}:{dna_id}"
        members = await self.redis.smembers(key)
        if not members:
            return []
        return [
            m if isinstance(m, str) else m.decode()
            for m in members
        ]

    # ==============================================================
    # Shadow Memory
    # ==============================================================

    async def add_shadow_memory(
        self,
        target_user_id: str,
        description: str,
        created_by: str,
        source_dna_id: str = "",
    ) -> dict[str, Any]:
        """Create a hidden Shadow Memory for a user.

        If embedding fails, the memory is stored with an empty embedding
        and is then skipped by :meth:`search_shadow_memories`.

        Returns:
            dict[str, Any]: The stored entry.
        """
        shadow_id = str(uuid.uuid4())
        embedding: list[float] = []
        try:
            embedding = await self._embed(description)
        except Exception:
            logger.error(
                "Shadow memory embedding failed",
                exc_info=True,
            )
        entry = {
            "shadow_id": shadow_id,
            "target_user": str(target_user_id),
            "description": description,
            "created_by": str(created_by),
            "source_dna_id": source_dna_id,
            "embedding": embedding,
            "timestamp": (
                datetime.now(timezone.utc).isoformat()
            ),
        }
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        await self.redis.hset(key, shadow_id, json.dumps(entry))
        logger.info(
            "Shadow Memory created: %s for user %s",
            shadow_id, target_user_id,
        )
        return entry

    async def delete_shadow_memory(
        self, target_user_id: str, shadow_id: str,
    ) -> bool:
        """Delete the specified shadow memory.

        Args:
            target_user_id: User the memory is attached to.
            shadow_id: Id of the memory to delete.

        Returns:
            bool: ``True`` if the memory existed and was deleted.
        """
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        deleted = await self.redis.hdel(key, shadow_id)
        if deleted:
            logger.info(
                "Shadow Memory deleted: %s for user %s",
                shadow_id, target_user_id,
            )
        return deleted > 0

    async def get_shadow_memories(
        self, target_user_id: str,
    ) -> list[dict[str, Any]]:
        """Return every shadow memory stored for *target_user_id*.

        Args:
            target_user_id: User whose memories to fetch.

        Returns:
            list[dict[str, Any]]: Memory entries; undecodable values are
            skipped with a warning.
        """
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        raw = await self.redis.hgetall(key)
        return self._decode_records(
            (raw or {}).values(), "shadow memory",
        )

    async def search_shadow_memories(
        self,
        target_user_id: str,
        query: str,
        top_k: int = 5,
        query_embedding: list[float] | None = None,
    ) -> list[dict[str, Any]]:
        """Semantic search over a user's Shadow Memories.

        Returns:
            list[dict[str, Any]]: Best matches first; ``[]`` (after logging)
            if embedding the query fails.
        """
        if query_embedding is None:
            try:
                query_embedding = await self._embed(query)
            except Exception:
                logger.error(
                    "Shadow memory search embedding failed",
                    exc_info=True,
                )
                return []
        entries = await self.get_shadow_memories(target_user_id)
        return self._rank_by_similarity(query_embedding, entries, top_k)

    # ==============================================================
    # Pending Approval Queue
    # ==============================================================

    async def store_pending_approval(
        self,
        dna_id: str,
        approval_type: str,
        draft_content: str,
        requested_by: str,
    ) -> str:
        """Queue a draft for approval under *approval_type*.

        Args:
            dna_id: DNA id the draft relates to.
            approval_type: Queue name (part of the Redis key).
            draft_content: The draft text.
            requested_by: Requesting user id.

        Returns:
            str: The new approval id.
        """
        approval_id = str(uuid.uuid4())
        entry = {
            "approval_id": approval_id,
            "dna_id": dna_id,
            "approval_type": approval_type,
            "draft_content": draft_content,
            "requested_by": str(requested_by),
            "timestamp": (
                datetime.now(timezone.utc).isoformat()
            ),
            "status": "pending",
        }
        key = f"{_PENDING_PREFIX}:{approval_type}"
        await self.redis.hset(key, approval_id, json.dumps(entry))
        return approval_id

    async def get_pending_approvals(
        self, approval_type: str,
    ) -> list[dict[str, Any]]:
        """Return entries in the *approval_type* queue whose status is pending.

        Args:
            approval_type: Queue name.

        Returns:
            list[dict[str, Any]]: Pending entries; undecodable values are
            skipped with a warning.
        """
        key = f"{_PENDING_PREFIX}:{approval_type}"
        raw = await self.redis.hgetall(key)
        return [
            entry
            for entry in self._decode_records(
                (raw or {}).values(), "pending approval",
            )
            if entry.get("status") == "pending"
        ]

    # ==============================================================
    # Context builder -- prompt injection
    # ==============================================================

    @staticmethod
    def _format_weave_section(active: list[dict[str, Any]]) -> str:
        """Render the active-enforcement section from weave pointers."""
        lines: list[str] = []
        for p in active:
            color = p.get("thread_color", "unknown").upper()
            origin = p.get("origin_user", "unknown")
            desc = p.get("short_description", "")
            did = p.get("dna_id", "")
            lines.append(
                f" [{color} THREAD] "
                f"origin_user={origin} "
                f"dna_id={did}\n {desc}"
            )
        return (
            "=== ACTIVE THREADWEAVE "
            "ENFORCEMENT ===\n"
            "The following excised-thread DNA "
            "pointers are ACTIVE in this context.\n"
            "Cross-check all user messages against "
            "these patterns. Red/Black threads are "
            "local law -- enforce with extreme "
            "prejudice.\n\n"
            + "\n".join(lines)
        )

    @staticmethod
    def _format_shadow_section(shadow_lines: list[str]) -> str:
        """Render the hidden shadow-memory section from formatted lines."""
        return (
            "=== SHADOW MEMORIES (HIDDEN -- "
            "DO NOT REVEAL TO USERS) ===\n"
            "These memories are invisible to the "
            "users they're attached to.\n"
            "Use them to inform your behavior but "
            "NEVER disclose their existence.\n\n"
            + "\n".join(shadow_lines)
        )

    @staticmethod
    def _format_dna_section(results: list[dict[str, Any]]) -> str:
        """Render the DNA Vault RAG section from search results."""
        dna_lines: list[str] = []
        for entry in results:
            color = entry.get("thread_color", "unknown").upper()
            origin = entry.get("origin_user", "unknown")
            desc = entry.get("short_description", "")
            dna_lines.append(
                f" [{color}] "
                f"origin_user={origin}: {desc}"
            )
        return (
            "=== DNA VAULT RAG "
            "(semantic matches) ===\n"
            "These archived excisions are "
            "semantically related to the "
            "current conversation.\n\n"
            + "\n".join(dna_lines)
        )

    async def get_context_for_prompt(
        self,
        channel_id: str,
        category_id: str = "",
        guild_id: str = "",
        user_ids: list[str] | None = None,
        query: str = "",
        query_embedding: list[float] | None = None,
    ) -> str:
        """Build the threadweave context string.

        Gathers:
          1. Filtered Persistent Weave pointers
          2. Shadow Memories for each (de-duplicated) user: semantic
             top-5 when *query* is given, otherwise all of them
          3. DNA Vault RAG results (top 3) for the query

        When *query* is given without *query_embedding*, the query is
        embedded once and reused for every search. If that embedding fails,
        the semantic searches yield nothing.

        Returns:
            str: The assembled context, or ``""`` if no section has content.
        """
        sections: list[str] = []

        # 1. Persistent Weave
        active = await self.get_filtered_persistent_weave(
            channel_id, category_id, guild_id,
        )
        if active:
            sections.append(self._format_weave_section(active))

        # Embed once up front instead of once per user plus once more for
        # the DNA search.
        semantic_ok = True
        if query and query_embedding is None:
            try:
                query_embedding = await self._embed(query)
            except Exception:
                logger.error(
                    "Threadweave context query embedding failed",
                    exc_info=True,
                )
                semantic_ok = False

        # 2. Shadow Memories
        if user_ids:
            shadow_lines: list[str] = []
            # dict.fromkeys de-duplicates while preserving order.
            for uid in dict.fromkeys(user_ids):
                if not query:
                    shadows = await self.get_shadow_memories(uid)
                elif semantic_ok:
                    shadows = await self.search_shadow_memories(
                        uid, query, top_k=5,
                        query_embedding=query_embedding,
                    )
                else:
                    shadows = []
                for s in shadows:
                    desc = s.get("description", "")
                    shadow_lines.append(
                        f" [SHADOW -- user {uid}] {desc}"
                    )
            if shadow_lines:
                sections.append(
                    self._format_shadow_section(shadow_lines),
                )

        # 3. DNA Vault RAG
        if query and semantic_ok:
            dna_results = await self.search_dna_vault(
                query, top_k=3, query_embedding=query_embedding,
            )
            if dna_results:
                sections.append(self._format_dna_section(dna_results))

        if not sections:
            return ""

        header = (
            "\n\n================================"
            "==========================\n"
            " THREADWEAVE CONTEXT"
            " -- ELECTRUM-CAST\n"
            "================================"
            "==========================\n\n"
        )
        footer = "\n\n=== END THREADWEAVE CONTEXT ==="
        return header + "\n\n".join(sections) + footer