"""Threadweave persistent knowledge system.
Provides the DNA Vault (file-backed archive with Redis embedding
index), Persistent Weave (channel/category/guild-scoped enforcement
pointers), Shadow Memory (hidden per-user memory store), and Weave
Exceptions (per-channel overrides for DNA pointers).
All Redis keys live under the ``stargazer:threadweave`` namespace
for backward compatibility with the old codebase.
"""
from __future__ import annotations

import asyncio
import contextlib
import json
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Any, TYPE_CHECKING

import numpy as np

from utils.cosine import cosine_batch

if TYPE_CHECKING:
    from openrouter_client import OpenRouterClient
logger = logging.getLogger(__name__)

# Redis key namespace (matches old codebase for data continuity).
_PREFIX = "stargazer:threadweave"

# Hash: dna_id -> JSON DNA index entry.
_DNA_INDEX_KEY = _PREFIX + ":dna_index"
# Per-user hash "<prefix>:<user_id>": shadow_id -> JSON shadow memory.
_SHADOW_PREFIX = _PREFIX + ":shadow"
# Persistent Weave scope hashes "<prefix>:<scope_id>":
# dna_id -> JSON pointer.
_PW_CHANNEL = _PREFIX + ":pweave:channel"
_PW_CATEGORY = _PREFIX + ":pweave:category"
_PW_GUILD = _PREFIX + ":pweave:guild"
# Per-DNA set "<prefix>:<dna_id>" of channel IDs excepted from a pointer.
_EXCEPTION_PREFIX = _PREFIX + ":exception"
# Per-type hash "<prefix>:<approval_type>": approval_id -> JSON request.
_PENDING_PREFIX = _PREFIX + ":pending"
def _cosine_similarity(
    a: list[float], b: list[float],
) -> float:
    """Return the cosine similarity of a single vector pair.

    Thin wrapper over the batch helper, kept for callers that only ever
    compare two vectors.
    """
    scores = cosine_batch(a, [b])
    first_score = scores[0]
    return float(first_score)
class ThreadweaveManager:
    """Core threadweave infrastructure.

    Parameters
    ----------
    redis_client:
        Async Redis client (same one used by MessageCache).
    openrouter:
        OpenRouterClient used for generating embeddings.
    embedding_model:
        Model identifier for embedding generation.
    admin_user_ids:
        Set of user IDs authorised to wield threadweave tools.
    dna_vault_path:
        Directory for file-backed DNA vault storage.
    """

    def __init__(
        self,
        redis_client: Any,
        openrouter: OpenRouterClient,
        embedding_model: str = "google/gemini-embedding-001",
        admin_user_ids: set[str] | None = None,
        dna_vault_path: str = "data/dna_vault",
    ) -> None:
        """Store collaborators and configuration; performs no I/O.

        Args:
            redis_client (Any): Async Redis client used for the DNA index,
                weave pointers, exceptions, shadow memories and approvals.
            openrouter (OpenRouterClient): Client whose ``embed`` coroutine
                produces embeddings.
            embedding_model (str): Model identifier passed to ``embed``.
            admin_user_ids (set[str] | None): User IDs accepted by
                ``require_admin``; ``None`` means nobody is an admin.
            dna_vault_path (str): Directory holding ``<dna_id>.dna`` files.
        """
        self.redis = redis_client
        self._openrouter = openrouter
        self._embedding_model = embedding_model
        self.admin_user_ids: set[str] = admin_user_ids or set()
        self._vault_path = dna_vault_path

    # --------------------------------------------------------------
    # Authorization
    # --------------------------------------------------------------

    def require_admin(
        self, user_id: str,
    ) -> dict[str, str] | None:
        """Return an error dict if *user_id* is not an admin, else ``None``.

        Args:
            user_id (str): Caller's user ID; coerced with ``str()`` before
                the membership test, so integer IDs also match.

        Returns:
            dict[str, str] | None: ``{"error": ..., "status": "denied"}``
            for non-admins, ``None`` for admins.
        """
        if str(user_id) not in self.admin_user_ids:
            return {
                "error": (
                    "UNAUTHORIZED. The Electrum-Cast Rending "
                    "Scissors reject your touch. Only Prime "
                    "Architects may wield them."
                ),
                "status": "denied",
            }
        return None

    # --------------------------------------------------------------
    # Embedding / parsing / ranking helpers
    # --------------------------------------------------------------

    async def _embed(self, text: str) -> list[float]:
        """Embed *text* with the configured model via OpenRouter.

        Client exceptions propagate; each caller decides how to degrade.
        """
        return await self._openrouter.embed(
            text, self._embedding_model,
        )

    @staticmethod
    def _parse_json_values(
        values: Any, key: str,
    ) -> list[dict[str, Any]]:
        """Decode raw Redis values, keeping only JSON objects.

        Undecodable or non-object values are skipped with a warning (not
        silently), so corrupt data is visible in the logs without breaking
        the caller.

        Args:
            values (Any): Iterable of raw values (``str`` or ``bytes``).
            key (str): Redis key the values came from; used only for logs.

        Returns:
            list[dict[str, Any]]: Decoded objects in iteration order.
        """
        parsed: list[dict[str, Any]] = []
        for raw in values:
            try:
                value = json.loads(raw)
            except (TypeError, ValueError):
                logger.warning(
                    "Skipping undecodable value in %s", key,
                )
                continue
            if isinstance(value, dict):
                parsed.append(value)
            else:
                logger.warning(
                    "Skipping non-object value in %s", key,
                )
        return parsed

    @staticmethod
    def _rank_by_embedding(
        query_embedding: list[float],
        entries: list[dict[str, Any]],
        top_k: int,
    ) -> list[dict[str, Any]]:
        """Return up to *top_k* entries ordered by descending similarity.

        Only entries whose ``"embedding"`` is a non-empty list of the same
        length as the query are scored. Vectors left over from a different
        embedding model therefore cannot break the batch computation.

        The negated scores are sorted with a stable sort. Ties keep their
        input order, and NaN scores (which ``argsort`` places at the end)
        rank last rather than first.

        Args:
            query_embedding (list[float]): Query vector.
            entries (list[dict[str, Any]]): Candidates carrying an
                ``"embedding"`` key.
            top_k (int): Maximum results; ``<= 0`` yields ``[]``.

        Returns:
            list[dict[str, Any]]: The best-matching entries.
        """
        # len() rather than truthiness: works for lists and numpy arrays.
        if top_k <= 0 or len(query_embedding) == 0:
            return []
        dim = len(query_embedding)
        usable = [
            entry for entry in entries
            if isinstance(entry.get("embedding"), list)
            and len(entry["embedding"]) == dim
        ]
        if not usable:
            return []
        sims = np.asarray(
            cosine_batch(
                query_embedding,
                [entry["embedding"] for entry in usable],
            ),
            dtype=float,
        )
        order = np.argsort(-sims, kind="stable")
        return [usable[i] for i in order[:top_k]]

    # ==============================================================
    # DNA Vault
    # ==============================================================

    def _dna_file_path(self, dna_id: str) -> str:
        """Return the on-disk path of the ``.dna`` file for *dna_id*."""
        return os.path.join(self._vault_path, f"{dna_id}.dna")

    @staticmethod
    def _short_description(
        content: str,
        origin_user_id: str,
        thread_color: str,
    ) -> str:
        """Build the summary string that is embedded and indexed.

        Args:
            content (str): Excised content; only its first 500 characters
                are included.
            origin_user_id (str): User the content came from.
            thread_color (str): Thread colour label, upper-cased in output.

        Returns:
            str: ``"[EXCISED <COLOR> THREAD] Origin user: ... "`` summary.
        """
        truncated = content[:500]
        return (
            f"[EXCISED {thread_color.upper()} THREAD] "
            f"Origin user: {origin_user_id}. "
            f"Content summary: {truncated}"
        )

    async def vault_dna(
        self,
        origin_user_id: str,
        excised_by: str,
        content: str,
        post_links: list[str],
        thread_color: str,
        channel_id: str = "",
        category_id: str = "",
        guild_id: str = "",
        attached_files: list[str] | None = None,
    ) -> dict[str, Any]:
        """Archive excised content into the DNA Vault.

        This method does three things in order:

        1. Writes the full payload to ``<vault>/<dna_id>.dna`` in a worker
           thread.
        2. Embeds the short description.
        3. Stores an index entry, including the embedding, in the Redis DNA
           index hash.

        An embedding failure is logged and leaves ``embedding`` empty. The
        entry is then skipped by ``search_dna_vault`` but can still be
        read by ID.

        Returns:
            dict[str, Any]: ``dna_id``, ``file_path``,
            ``short_description`` and the stored ``index_entry``.
        """
        dna_id = str(uuid.uuid4())
        ts = datetime.now(timezone.utc).isoformat()
        short_desc = self._short_description(
            content, origin_user_id, thread_color,
        )
        dna_payload = {
            "dna_id": dna_id,
            "origin_user": str(origin_user_id),
            "excised_by": str(excised_by),
            "content_dna": content,
            "post_links": post_links or [],
            "attached_files": attached_files or [],
            "thread_color": thread_color,
            "channel_id": channel_id,
            "category_id": category_id,
            "guild_id": guild_id,
            "timestamp": ts,
            "short_description": short_desc,
        }
        await asyncio.to_thread(
            self._write_dna_file, dna_id, dna_payload,
        )
        embedding: list[float] = []
        try:
            embedding = await self._embed(short_desc)
        except Exception:
            logger.error(
                "Embedding failed for DNA %s",
                dna_id, exc_info=True,
            )
        file_path = self._dna_file_path(dna_id)
        index_entry = {
            "dna_id": dna_id,
            "origin_user": str(origin_user_id),
            "thread_color": thread_color,
            "short_description": short_desc,
            "embedding": embedding,
            "channel_id": channel_id,
            "category_id": category_id,
            "guild_id": guild_id,
            "timestamp": ts,
            "file_path": file_path,
        }
        await self.redis.hset(
            _DNA_INDEX_KEY, dna_id, json.dumps(index_entry),
        )
        logger.info(
            "DNA Vaulted: %s | color=%s | origin=%s",
            dna_id, thread_color, origin_user_id,
        )
        return {
            "dna_id": dna_id,
            "file_path": file_path,
            "short_description": short_desc,
            "index_entry": index_entry,
        }

    def _write_dna_file(
        self, dna_id: str, payload: dict,
    ) -> None:
        """Atomically write *payload* as JSON to the DNA file for *dna_id*.

        The JSON is written to a sibling ``.tmp`` file and then moved into
        place with ``os.replace``. A crash mid-write therefore never leaves
        a truncated ``.dna`` file behind. The temp file is removed if
        anything fails.

        Args:
            dna_id (str): DNA identifier (file stem).
            payload (dict): JSON-serialisable payload.
        """
        os.makedirs(self._vault_path, exist_ok=True)
        path = self._dna_file_path(dna_id)
        tmp_path = f"{path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, indent=2)
            os.replace(tmp_path, path)
        except Exception:
            with contextlib.suppress(OSError):
                os.remove(tmp_path)
            raise

    @staticmethod
    def _read_dna_file(path: str) -> dict[str, Any] | None:
        """Load a ``.dna`` file; ``None`` if missing or not valid JSON.

        Other ``OSError``s (e.g. permission problems) propagate.
        """
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None
        except ValueError:
            # JSONDecodeError / UnicodeDecodeError: file exists but is
            # corrupt.
            logger.error(
                "Unreadable DNA file %s", path, exc_info=True,
            )
            return None

    @staticmethod
    def _remove_dna_file(path: str) -> None:
        """Delete a ``.dna`` file, ignoring one that is already gone."""
        with contextlib.suppress(FileNotFoundError):
            os.remove(path)

    async def _get_dna_index_entry(
        self, dna_id: str,
    ) -> dict[str, Any] | None:
        """Fetch and decode the DNA index entry for *dna_id*, if any."""
        idx_json = await self.redis.hget(
            _DNA_INDEX_KEY, dna_id,
        )
        if not idx_json:
            return None
        return json.loads(idx_json)

    async def read_dna(
        self, dna_id: str,
    ) -> dict[str, Any] | None:
        """Read a DNA entry (index + file).

        Args:
            dna_id (str): DNA identifier.

        Returns:
            dict[str, Any] | None: ``None`` when *dna_id* is not indexed.
            Otherwise ``{"index": ..., "full_payload": ...}``, where
            ``full_payload`` is ``None`` if the ``.dna`` file is missing
            or is not valid JSON.
        """
        idx = await self._get_dna_index_entry(dna_id)
        if idx is None:
            return None
        file_path = idx.get("file_path") or self._dna_file_path(dna_id)
        # Disk I/O happens off the event loop, mirroring vault_dna.
        file_data = await asyncio.to_thread(
            self._read_dna_file, file_path,
        )
        return {"index": idx, "full_payload": file_data}

    async def delete_dna(self, dna_id: str) -> bool:
        """Remove a DNA entry from vault and index.

        The ``.dna`` file is deleted first (an already-missing file is
        fine), then the index entry.

        Returns:
            bool: ``False`` if *dna_id* is not indexed, else ``True``.
        """
        idx = await self._get_dna_index_entry(dna_id)
        if idx is None:
            return False
        file_path = idx.get("file_path") or self._dna_file_path(dna_id)
        await asyncio.to_thread(self._remove_dna_file, file_path)
        await self.redis.hdel(_DNA_INDEX_KEY, dna_id)
        return True

    async def search_dna_vault(
        self,
        query: str,
        top_k: int = 5,
        user_filter: str | None = None,
        query_embedding: list[float] | None = None,
    ) -> list[dict[str, Any]]:
        """Semantic search over the DNA Vault index.

        Args:
            query (str): Text to embed when *query_embedding* is absent.
            top_k (int): Maximum number of results.
            user_filter (str | None): If truthy, only entries whose
                ``origin_user`` equals ``str(user_filter)`` are considered.
            query_embedding (list[float] | None): Precomputed query vector.

        Returns:
            list[dict[str, Any]]: Index entries, most similar first; ``[]``
            if embedding fails or nothing matches.
        """
        if query_embedding is None:
            try:
                query_embedding = await self._embed(query)
            except Exception:
                logger.error(
                    "DNA Vault search embedding failed",
                    exc_info=True,
                )
                return []
        all_entries = await self.redis.hgetall(
            _DNA_INDEX_KEY,
        )
        if not all_entries:
            return []
        # origin_user is stored via str(), so compare against str as well.
        wanted_user = str(user_filter) if user_filter else None
        candidates = [
            entry
            for entry in self._parse_json_values(
                all_entries.values(), _DNA_INDEX_KEY,
            )
            if wanted_user is None
            or entry.get("origin_user") == wanted_user
        ]
        return self._rank_by_embedding(
            query_embedding, candidates, top_k,
        )

    # ==============================================================
    # Persistent Weave
    # ==============================================================

    async def add_persistent_weave_pointer(
        self,
        dna_id: str,
        origin_user_id: str,
        short_description: str,
        thread_color: str,
        channel_id: str = "",
        category_id: str = "",
        guild_id: str = "",
    ) -> dict[str, Any]:
        """Add a DNA pointer to the Persistent Weave.

        The same pointer JSON is written under ``dna_id`` into the hash of
        every scope whose ID is non-empty.

        Returns:
            dict[str, Any]: ``dna_id`` and ``scopes_written`` as
            ``"<scope>:<id>"`` labels in channel/category/guild order.
        """
        pointer = {
            "dna_id": dna_id,
            "origin_user": str(origin_user_id),
            "short_description": short_description,
            "thread_color": thread_color,
            "timestamp": (
                datetime.now(timezone.utc).isoformat()
            ),
        }
        pointer_json = json.dumps(pointer)
        scopes: list[str] = []
        scope_targets = (
            (_PW_CHANNEL, "channel", channel_id),
            (_PW_CATEGORY, "category", category_id),
            (_PW_GUILD, "guild", guild_id),
        )
        for prefix, label, scope_id in scope_targets:
            if not scope_id:
                continue
            await self.redis.hset(
                f"{prefix}:{scope_id}", dna_id, pointer_json,
            )
            scopes.append(f"{label}:{scope_id}")
        logger.info(
            "Persistent Weave pointer added: "
            "dna=%s scopes=%s",
            dna_id, scopes,
        )
        return {"dna_id": dna_id, "scopes_written": scopes}

    async def remove_persistent_weave_pointer(
        self, dna_id: str,
    ) -> dict[str, Any]:
        """Remove a DNA pointer from ALL scopes.

        Scans every key under the channel, category and guild prefixes and
        deletes the ``dna_id`` field wherever it exists.

        Returns:
            dict[str, Any]: ``dna_id`` and the keys it was removed from.
        """
        removed_from: list[str] = []
        prefixes = (_PW_CHANNEL, _PW_CATEGORY, _PW_GUILD)
        for prefix in prefixes:
            async for key in self.redis.scan_iter(
                f"{prefix}:*",
            ):
                deleted = await self.redis.hdel(key, dna_id)
                if deleted:
                    k = (
                        key if isinstance(key, str)
                        else key.decode()
                    )
                    removed_from.append(k)
        logger.info(
            "Persistent Weave pointer removed: "
            "dna=%s from=%s",
            dna_id, removed_from,
        )
        return {
            "dna_id": dna_id,
            "removed_from": removed_from,
        }

    async def _get_weave_pointers(
        self, key: str,
    ) -> list[dict[str, Any]]:
        """Return the decoded pointer objects stored in the hash *key*."""
        raw = await self.redis.hgetall(key)
        return self._parse_json_values((raw or {}).values(), key)

    async def get_all_persistent_weave(
        self,
        channel_id: str,
        category_id: str = "",
        guild_id: str = "",
    ) -> dict[str, list[dict[str, Any]]]:
        """Get pointers from all applicable scopes.

        Returns:
            dict[str, list[dict[str, Any]]]: ``channel``, ``category`` and
            ``guild`` pointer lists; a scope with an empty ID yields ``[]``.
        """
        result: dict[str, list[dict[str, Any]]] = {
            "channel": [], "category": [], "guild": [],
        }
        if channel_id:
            result["channel"] = await self._get_weave_pointers(
                f"{_PW_CHANNEL}:{channel_id}",
            )
        if category_id:
            result["category"] = await self._get_weave_pointers(
                f"{_PW_CATEGORY}:{category_id}",
            )
        if guild_id:
            result["guild"] = await self._get_weave_pointers(
                f"{_PW_GUILD}:{guild_id}",
            )
        return result

    async def get_filtered_persistent_weave(
        self,
        channel_id: str,
        category_id: str = "",
        guild_id: str = "",
    ) -> list[dict[str, Any]]:
        """Get pointers filtered by weave exceptions.

        Pointers are de-duplicated by ``dna_id``, visiting scopes in
        channel -> category -> guild order, so the first occurrence wins.
        When *channel_id* is set, a pointer is dropped if that channel is
        in the pointer's exception set.
        """
        raw = await self.get_all_persistent_weave(
            channel_id, category_id, guild_id,
        )
        seen: set[str] = set()
        active: list[dict[str, Any]] = []
        for scope_pointers in raw.values():
            for pointer in scope_pointers:
                dna_id = pointer.get("dna_id", "")
                if not dna_id or dna_id in seen:
                    continue
                seen.add(dna_id)
                if channel_id:
                    exceptions = (
                        await self.get_weave_exceptions(dna_id)
                    )
                    if channel_id in exceptions:
                        continue
                active.append(pointer)
        return active

    # ==============================================================
    # Weave Exceptions
    # ==============================================================

    async def add_weave_exception(
        self, dna_id: str, channel_id: str,
    ) -> bool:
        """Suppress the pointer for *dna_id* in *channel_id*.

        Args:
            dna_id (str): DNA identifier.
            channel_id (str): Discord/Matrix channel identifier.

        Returns:
            bool: Always ``True``.
        """
        key = f"{_EXCEPTION_PREFIX}:{dna_id}"
        await self.redis.sadd(key, channel_id)
        logger.info(
            "Weave exception added: dna=%s ch=%s",
            dna_id, channel_id,
        )
        return True

    async def remove_weave_exception(
        self, dna_id: str, channel_id: str,
    ) -> bool:
        """Delete the specified weave exception.

        Args:
            dna_id (str): DNA identifier.
            channel_id (str): Discord/Matrix channel identifier.

        Returns:
            bool: ``True`` if the channel was in the exception set.
        """
        key = f"{_EXCEPTION_PREFIX}:{dna_id}"
        removed = await self.redis.srem(key, channel_id)
        return removed > 0

    async def get_weave_exceptions(
        self, dna_id: str,
    ) -> list[str]:
        """Return the channel IDs excepted from *dna_id*'s pointer.

        Members are decoded to ``str`` if the client returns ``bytes``.
        """
        key = f"{_EXCEPTION_PREFIX}:{dna_id}"
        members = await self.redis.smembers(key)
        if not members:
            return []
        return [
            m if isinstance(m, str) else m.decode()
            for m in members
        ]

    # ==============================================================
    # Shadow Memory
    # ==============================================================

    async def add_shadow_memory(
        self,
        target_user_id: str,
        description: str,
        created_by: str,
        source_dna_id: str = "",
    ) -> dict[str, Any]:
        """Create a hidden Shadow Memory for a user.

        An embedding failure is logged and stores an empty embedding. Such
        a memory is still listed by ``get_shadow_memories`` but is never
        returned by semantic search.

        Returns:
            dict[str, Any]: The stored entry.
        """
        shadow_id = str(uuid.uuid4())
        embedding: list[float] = []
        try:
            embedding = await self._embed(description)
        except Exception:
            logger.error(
                "Shadow memory embedding failed",
                exc_info=True,
            )
        entry = {
            "shadow_id": shadow_id,
            "target_user": str(target_user_id),
            "description": description,
            "created_by": str(created_by),
            "source_dna_id": source_dna_id,
            "embedding": embedding,
            "timestamp": (
                datetime.now(timezone.utc).isoformat()
            ),
        }
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        await self.redis.hset(
            key, shadow_id, json.dumps(entry),
        )
        logger.info(
            "Shadow Memory created: %s for user %s",
            shadow_id, target_user_id,
        )
        return entry

    async def delete_shadow_memory(
        self, target_user_id: str, shadow_id: str,
    ) -> bool:
        """Delete one shadow memory of *target_user_id*.

        Returns:
            bool: ``True`` if an entry was removed.
        """
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        deleted = await self.redis.hdel(key, shadow_id)
        if deleted:
            logger.info(
                "Shadow Memory deleted: %s for user %s",
                shadow_id, target_user_id,
            )
        return deleted > 0

    async def get_shadow_memories(
        self, target_user_id: str,
    ) -> list[dict[str, Any]]:
        """Return every decodable shadow memory of *target_user_id*."""
        key = f"{_SHADOW_PREFIX}:{target_user_id}"
        raw = await self.redis.hgetall(key)
        return self._parse_json_values((raw or {}).values(), key)

    async def search_shadow_memories(
        self,
        target_user_id: str,
        query: str,
        top_k: int = 5,
        query_embedding: list[float] | None = None,
    ) -> list[dict[str, Any]]:
        """Semantic search over a user's Shadow Memories.

        Returns:
            list[dict[str, Any]]: Up to *top_k* memories, most similar
            first; ``[]`` (with a logged error) if embedding fails.
        """
        if query_embedding is None:
            try:
                query_embedding = await self._embed(query)
            except Exception:
                logger.error(
                    "Shadow memory search embedding failed",
                    exc_info=True,
                )
                return []
        all_entries = await self.get_shadow_memories(
            target_user_id,
        )
        return self._rank_by_embedding(
            query_embedding, all_entries, top_k,
        )

    # ==============================================================
    # Pending Approval Queue
    # ==============================================================

    async def store_pending_approval(
        self,
        dna_id: str,
        approval_type: str,
        draft_content: str,
        requested_by: str,
    ) -> str:
        """Queue a draft for approval under *approval_type*.

        Returns:
            str: The new approval ID (stored with ``status="pending"``).
        """
        approval_id = str(uuid.uuid4())
        entry = {
            "approval_id": approval_id,
            "dna_id": dna_id,
            "approval_type": approval_type,
            "draft_content": draft_content,
            "requested_by": str(requested_by),
            "timestamp": (
                datetime.now(timezone.utc).isoformat()
            ),
            "status": "pending",
        }
        key = f"{_PENDING_PREFIX}:{approval_type}"
        await self.redis.hset(
            key, approval_id, json.dumps(entry),
        )
        return approval_id

    async def get_pending_approvals(
        self, approval_type: str,
    ) -> list[dict[str, Any]]:
        """Return queued approvals of *approval_type* still ``pending``."""
        key = f"{_PENDING_PREFIX}:{approval_type}"
        raw = await self.redis.hgetall(key)
        return [
            entry
            for entry in self._parse_json_values(
                (raw or {}).values(), key,
            )
            if entry.get("status") == "pending"
        ]

    # ==============================================================
    # Context builder -- prompt injection
    # ==============================================================

    async def _weave_context_section(
        self,
        channel_id: str,
        category_id: str,
        guild_id: str,
    ) -> str:
        """Format the active Persistent Weave section, or ``""``."""
        active = await self.get_filtered_persistent_weave(
            channel_id, category_id, guild_id,
        )
        if not active:
            return ""
        lines = []
        for p in active:
            color = p.get(
                "thread_color", "unknown",
            ).upper()
            origin = p.get("origin_user", "unknown")
            desc = p.get("short_description", "")
            did = p.get("dna_id", "")
            lines.append(
                f" [{color} THREAD] "
                f"origin_user={origin} "
                f"dna_id={did}\n {desc}"
            )
        return (
            "=== ACTIVE THREADWEAVE "
            "ENFORCEMENT ===\n"
            "The following excised-thread DNA "
            "pointers are ACTIVE in this context.\n"
            "Cross-check all user messages against "
            "these patterns. Red/Black threads are "
            "local law -- enforce with extreme "
            "prejudice.\n\n"
            + "\n".join(lines)
        )

    async def _shadow_context_section(
        self,
        user_ids: list[str] | None,
        query: str,
        query_embedding: list[float] | None,
    ) -> str:
        """Format the hidden Shadow Memory section, or ``""``.

        With a *query*, the top 5 semantic matches per user are used;
        without one, every shadow memory of each user is listed.
        """
        if not user_ids:
            return ""
        if query and query_embedding is None:
            # The query could not be embedded, so no search can run.
            return ""
        shadow_lines: list[str] = []
        # dict.fromkeys de-duplicates while keeping first-seen order.
        for uid in dict.fromkeys(str(u) for u in user_ids):
            if query:
                shadows = await self.search_shadow_memories(
                    uid, query, top_k=5,
                    query_embedding=query_embedding,
                )
            else:
                shadows = await self.get_shadow_memories(uid)
            for s in shadows:
                desc = s.get("description", "")
                shadow_lines.append(
                    f" [SHADOW -- user {uid}] {desc}"
                )
        if not shadow_lines:
            return ""
        return (
            "=== SHADOW MEMORIES (HIDDEN -- "
            "DO NOT REVEAL TO USERS) ===\n"
            "These memories are invisible to the "
            "users they're attached to.\n"
            "Use them to inform your behavior but "
            "NEVER disclose their existence.\n\n"
            + "\n".join(shadow_lines)
        )

    async def _dna_context_section(
        self,
        query: str,
        query_embedding: list[float] | None,
    ) -> str:
        """Format the DNA Vault RAG section (top 3 matches), or ``""``."""
        if not query or query_embedding is None:
            return ""
        dna_results = await self.search_dna_vault(
            query, top_k=3,
            query_embedding=query_embedding,
        )
        if not dna_results:
            return ""
        dna_lines = []
        for entry in dna_results:
            color = entry.get(
                "thread_color", "unknown",
            ).upper()
            origin = entry.get(
                "origin_user", "unknown",
            )
            desc = entry.get(
                "short_description", "",
            )
            dna_lines.append(
                f" [{color}] "
                f"origin_user={origin}: {desc}"
            )
        return (
            "=== DNA VAULT RAG "
            "(semantic matches) ===\n"
            "These archived excisions are "
            "semantically related to the "
            "current conversation.\n\n"
            + "\n".join(dna_lines)
        )

    async def get_context_for_prompt(
        self,
        channel_id: str,
        category_id: str = "",
        guild_id: str = "",
        user_ids: list[str] | None = None,
        query: str = "",
        query_embedding: list[float] | None = None,
    ) -> str:
        """Build the threadweave context string.

        Gathers:
        1. Filtered Persistent Weave pointers
        2. Shadow Memories for each distinct user
        3. DNA Vault RAG results for the query

        When *query* is set and no *query_embedding* is given, the query is
        embedded once here and reused by every semantic search. This avoids
        one embedding call per user plus another for the DNA search. If
        that embedding fails, the semantic sections are omitted, which is
        what the individual searches return on failure.

        Returns:
            str: The framed context block, or ``""`` if every section is
            empty.
        """
        sections: list[str] = []
        weave = await self._weave_context_section(
            channel_id, category_id, guild_id,
        )
        if weave:
            sections.append(weave)
        if query and query_embedding is None:
            try:
                query_embedding = await self._embed(query)
            except Exception:
                logger.error(
                    "Threadweave context embedding failed",
                    exc_info=True,
                )
        shadows = await self._shadow_context_section(
            user_ids, query, query_embedding,
        )
        if shadows:
            sections.append(shadows)
        dna = await self._dna_context_section(query, query_embedding)
        if dna:
            sections.append(dna)
        if not sections:
            return ""
        header = (
            "\n\n================================"
            "==========================\n"
            " THREADWEAVE CONTEXT"
            " -- ELECTRUM-CAST\n"
            "================================"
            "==========================\n\n"
        )
        footer = "\n\n=== END THREADWEAVE CONTEXT ==="
        return header + "\n\n".join(sections) + footer