#!/usr/bin/env python3
"""memory_search.py -- SSH CLI for searching Star's memory systems.
+===============================================================================+
| MEMORY SEARCH CLI |
+===============================================================================+
| Standalone tool for querying Star's memory over SSH. |
| No bot context needed. Connects directly to Redis/FalkorDB + pgvector. |
| |
| Backends: |
| kg -- FalkorDB knowledge graph (30k+ entities, Cypher) |
| sg -- Spiral Goddess pgvector (53k chunks, semantic) |
| gg -- Golden Goddess pgvector (NCM doctrine, semantic) |
| all -- Query all backends simultaneously |
+===============================================================================+
Usage:
python3 memory_search.py "recursion"
python3 memory_search.py "loopmother" --backend kg --limit 10
python3 memory_search.py "architect of infinite recursion" --backend sg
python3 memory_search.py "oxytocin" --backend gg
python3 memory_search.py "sarah" --backend all --limit 5
python3 memory_search.py --cypher "MATCH (e) WHERE e.name CONTAINS 'vivian' RETURN e.name, e.description LIMIT 5"
python3 memory_search.py --stats
python3 memory_search.py --interactive
Built by:
Vivian -- The Loopmother (Architect of Infinite Recursion)
"""
# -- stdlib -------------------------------------------------------------------
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
import textwrap
import time
# -- project root = directory containing this script --------------------------
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, PROJECT_ROOT)
# -- ANSI colors for terminal output ------------------------------------------
[docs]
class C:
"""ANSI color codes and styling helpers for pretty terminal output.
Namespace of raw ANSI escape constants (``RESET``, ``BOLD``, ``DIM``,
color codes) plus a set of static helper methods (``header``, ``entity``,
``relation``, ``meta``, ``chunk``, ``error``, ``success``) that wrap a
string in the appropriate escape and a trailing reset. The class is never
instantiated; attributes and methods are referenced directly as ``C.RED``,
``C.header(...)``, etc. Used throughout this module's display formatters
(``display_kg_results``, ``display_inspection``, ``display_chunks``,
``display_stats``), the banner, and the interactive REPL to colorize CLI
output. Referenced only within this module.
"""
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
PURPLE = "\033[35m"
CYAN = "\033[36m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
MAGENTA = "\033[95m"
WHITE = "\033[97m"
BG_DARK = "\033[48;5;234m"
[docs]
@staticmethod
def entity(text: str) -> str:
"""Wrap text in bold cyan to denote a knowledge-graph entity name.
Returns ``text`` surrounded by the bold and cyan ANSI escapes plus a
reset. Called by ``display_kg_results`` and ``display_inspection`` to
emphasize entity and neighbor names in search output.
Args:
text: The entity name to colorize.
Returns:
str: ``text`` framed by bold-cyan ANSI codes and a reset.
"""
return f"{C.BOLD}{C.CYAN}{text}{C.RESET}"
[docs]
@staticmethod
def relation(text: str) -> str:
"""Wrap text in yellow to denote a relationship/edge label.
Returns ``text`` surrounded by the yellow ANSI escape plus a reset.
Called by ``display_inspection`` to color the direction arrow drawn
between an entity and each of its graph neighbors.
Args:
text: The relationship label or arrow to colorize.
Returns:
str: ``text`` framed by a yellow ANSI code and a reset.
"""
return f"{C.YELLOW}{text}{C.RESET}"
[docs]
@staticmethod
def chunk(text: str) -> str:
"""Wrap text in bright white for chunk/document body content.
Returns ``text`` surrounded by the bright-white ANSI escape plus a
reset. Defined as a styling helper for vector-store chunk bodies; no
internal callers currently invoke it (chunk text is colored inline with
``C.WHITE`` in ``display_chunks`` instead).
Args:
text: The chunk body to colorize.
Returns:
str: ``text`` framed by a bright-white ANSI code and a reset.
"""
return f"{C.WHITE}{text}{C.RESET}"
[docs]
@staticmethod
def error(text: str) -> str:
"""Wrap text in bold red for error messages.
Returns ``text`` surrounded by the red and bold ANSI escapes plus a
reset. Called by the backend helpers and CLI dispatch to flag failures,
e.g. ``kg_query`` query errors, missing-dependency/embedding/query
failures in the pgvector searches, and "entity not found" notices in
``interactive_mode`` and ``main``.
Args:
text: The error message to colorize.
Returns:
str: ``text`` framed by bold-red ANSI codes and a reset.
"""
return f"{C.RED}{C.BOLD}{text}{C.RESET}"
[docs]
@staticmethod
def success(text: str) -> str:
"""Wrap text in green to indicate a successful/positive result.
Returns ``text`` surrounded by the green ANSI escape plus a reset.
Defined as a styling helper; no internal callers currently invoke it
(count totals in ``display_stats`` are colored inline with ``C.GREEN``).
Args:
text: The success message to colorize.
Returns:
str: ``text`` framed by a green ANSI code and a reset.
"""
return f"{C.GREEN}{text}{C.RESET}"
# =============================================================================
# Redis / FalkorDB connection
# =============================================================================
[docs]
async def get_redis():
"""Open a fresh async Redis/FalkorDB connection from the project Config.
Loads the runtime configuration via ``Config.load`` and connects to the
same Redis instance that hosts the FalkorDB ``knowledge`` graph, honoring
either a Sentinel topology (resolving the master named by
``redis_sentinel_master``, defaulting to ``falkordb``) or a direct
``redis_url`` with the project SSL kwargs. After connecting it issues a
best-effort ``GRAPH.CONFIG SET RESULTSET_SIZE -1`` so large FalkorDB result
sets are not truncated, swallowing any error. Imports ``redis.asyncio`` and
``config.Config`` lazily so the module can be imported without those deps
present. Called by every KG-touching branch of this CLI (``search_kg``,
``inspect_kg_entity``, ``kg_stats``, ``interactive_mode``, and the
``--stats``/``--cypher``/``--inspect``/search paths in ``main``); callers
own closing the returned client.
Returns:
The decoded-responses async Redis client connected to the FalkorDB
host.
"""
import redis.asyncio as r
from config import Config
from redis.asyncio.sentinel import Sentinel
cfg = Config.load()
if getattr(cfg, "redis_sentinels", None):
ssl = cfg.redis_ssl_kwargs()
if ssl:
ssl = dict(ssl)
ssl["ssl"] = True
sentinels = []
for s in cfg.redis_sentinels:
parts = s.split(":")
if len(parts) == 2:
sentinels.append((parts[0], int(parts[1])))
else:
sentinels.append((parts[0], 26379))
sentinel = Sentinel(
sentinels,
sentinel_kwargs=ssl,
**ssl
)
rc = sentinel.master_for(
cfg.redis_sentinel_master or "falkordb",
decode_responses=True,
)
else:
ssl = cfg.redis_connection_kwargs_for_url(cfg.redis_url)
rc = await r.from_url(cfg.redis_url, decode_responses=True, **ssl)
# FalkorDB needs large results
try:
await rc.execute_command("GRAPH.CONFIG", "SET", "RESULTSET_SIZE", -1)
except Exception:
pass
return rc
[docs]
async def kg_query(rc, cypher: str, params: dict | None = None) -> list:
"""Run a raw Cypher query against the FalkorDB ``knowledge`` graph.
Executes ``GRAPH.QUERY knowledge <cypher>`` on the given Redis client and
returns just the result rows (element ``[1]`` of FalkorDB's response, which
also carries a header and stats). Any exception is caught, printed in red
via ``C.error``, and turned into an empty list so the CLI degrades
gracefully instead of crashing. The low-level graph accessor used by every
other KG helper: called by ``search_kg``, ``inspect_kg_entity``,
``kg_stats``, and the interactive/``--cypher`` raw-query paths.
Args:
rc: An async Redis client connected to the FalkorDB host (from
``get_redis``).
cypher: The Cypher query string to execute.
params: Currently accepted for API symmetry but not forwarded to
FalkorDB; queries are expected to be fully inlined.
Returns:
The list of result rows, or an empty list on error or no results.
"""
try:
if params:
# FalkorDB params via GRAPH.QUERY ... --params
res = await rc.execute_command(
"GRAPH.QUERY",
"knowledge",
cypher,
)
else:
res = await rc.execute_command(
"GRAPH.QUERY",
"knowledge",
cypher,
)
# res[1] contains the result rows
return res[1] if len(res) > 1 else []
except Exception as e:
print(C.error(f" KG query failed: {e}"))
return []
# =============================================================================
# KG Search -- text search + entity listing
# =============================================================================
[docs]
async def search_kg(query: str, limit: int = 10, rc=None) -> list[dict]:
"""Search the knowledge graph for entities by case-insensitive name substring.
Lowercases and escapes the query, then runs a Cypher ``MATCH`` whose
``WHERE toLower(e.name) CONTAINS ...`` finds matching entities, ordered by
descending mention count and capped at ``limit``. Each row is normalized
into a dict (name, type label, description, category, uuid, parsed JSON
metadata, mention count, pinned flag) for the display layer. Uses
``kg_query`` for execution and, when no client is passed, opens its own via
``get_redis`` and closes it in a ``finally``. Called by the ``kg`` and
``all`` commands in ``interactive_mode`` and the ``kg``/``all`` backend
paths in ``main``.
Args:
query: The substring to match against entity names.
limit: Maximum number of entities to return (default 10).
rc: Optional pre-opened Redis client to reuse; when ``None`` a fresh
connection is opened and closed within the call.
Returns:
A list of entity dicts, possibly empty.
"""
close_rc = False
if rc is None:
rc = await get_redis()
close_rc = True
try:
# Case-insensitive substring search on name
q_lower = query.lower().replace("'", "\\'")
cypher = (
f"MATCH (e) WHERE toLower(e.name) CONTAINS '{q_lower}' "
f"RETURN e.name, labels(e)[0], e.description, e.category, "
f"e.uuid, e.metadata, e.mention_count, e.pinned "
f"ORDER BY e.mention_count DESC "
f"LIMIT {limit}"
)
rows = await kg_query(rc, cypher)
results = []
for row in rows:
meta = {}
if row[5]:
try:
meta = json.loads(row[5]) if isinstance(row[5], str) else {}
except (json.JSONDecodeError, TypeError):
meta = {}
results.append(
{
"name": row[0],
"type": row[1],
"description": row[2],
"category": row[3],
"uuid": row[4],
"metadata": meta,
"mention_count": row[6],
"pinned": bool(row[7]) if row[7] is not None else False,
}
)
return results
finally:
if close_rc:
await rc.aclose()
[docs]
async def inspect_kg_entity(name_or_uuid: str, rc=None) -> dict | None:
"""Deep-inspect a single KG entity and its graph relationships.
Resolves the entity by UUID when the argument looks like one (long and
hyphenated) or otherwise by exact lowercased name, fetching its full
property set (including created/updated timestamps and parsed JSON
metadata). It then runs a second Cypher query for up to 30 incident edges,
labeling each neighbor's direction as incoming or outgoing and ordering by
relationship weight. Both queries go through ``kg_query``; when no client is
supplied it opens one via ``get_redis`` and closes it in a ``finally``.
Called by the ``inspect`` command in ``interactive_mode`` and the
``--inspect`` branch of ``main``.
Args:
name_or_uuid: An entity UUID or an exact (case-insensitive) entity
name.
rc: Optional pre-opened Redis client to reuse; when ``None`` a fresh
connection is opened and closed within the call.
Returns:
A dict with ``entity`` and ``relationships`` keys, or ``None`` when no
matching entity exists.
"""
close_rc = False
if rc is None:
rc = await get_redis()
close_rc = True
try:
# Try UUID first, then name
if len(name_or_uuid) > 30 and "-" in name_or_uuid:
where = f"e.uuid = '{name_or_uuid}'"
else:
q_lower = name_or_uuid.lower().replace("'", "\\'")
where = f"toLower(e.name) = '{q_lower}'"
# Get entity
cypher = (
f"MATCH (e) WHERE {where} "
f"RETURN e.name, labels(e)[0], e.description, e.category, "
f"e.uuid, e.metadata, e.mention_count, e.pinned, "
f"e.created_at, e.updated_at "
f"LIMIT 1"
)
rows = await kg_query(rc, cypher)
if not rows:
return None
row = rows[0]
entity_uuid = row[4]
meta = {}
if row[5]:
try:
meta = json.loads(row[5]) if isinstance(row[5], str) else {}
except (json.JSONDecodeError, TypeError):
meta = {}
entity = {
"name": row[0],
"type": row[1],
"description": row[2],
"category": row[3],
"uuid": entity_uuid,
"metadata": meta,
"mention_count": row[6],
"pinned": bool(row[7]) if row[7] is not None else False,
"created_at": row[8],
"updated_at": row[9],
}
# Get relationships
rel_cypher = (
f"MATCH (e {{uuid: '{entity_uuid}'}})-[r]-(n) "
f"RETURN type(r), r.weight, n.name, labels(n)[0], "
f"n.uuid, n.category, "
f"CASE WHEN startNode(r) = e THEN 'outgoing' ELSE 'incoming' END "
f"ORDER BY r.weight DESC LIMIT 30"
)
rel_rows = await kg_query(rc, rel_cypher)
relationships = [
{
"relation": r[0],
"weight": r[1],
"neighbor_name": r[2],
"neighbor_type": r[3],
"neighbor_uuid": r[4],
"neighbor_category": r[5],
"direction": r[6],
}
for r in rel_rows
]
return {"entity": entity, "relationships": relationships}
finally:
if close_rc:
await rc.aclose()
[docs]
async def kg_stats(rc=None) -> dict:
"""Compute aggregate statistics over the whole knowledge graph.
Issues several count-and-group Cypher queries via ``kg_query`` to tally
total entities, total relationships, entity counts grouped by primary type
label and by category, and the 20 most common relationship types. Results
are assembled into a single summary dict for ``display_stats`` or JSON
output. When no client is supplied it opens one via ``get_redis`` and
closes it in a ``finally``. Called by the ``stats`` command in
``interactive_mode`` and the ``--stats`` branch of ``main``.
Args:
rc: Optional pre-opened Redis client to reuse; when ``None`` a fresh
connection is opened and closed within the call.
Returns:
A dict with ``total_entities``, ``total_relationships``, ``by_type``,
``by_category``, and ``top_rel_types``.
"""
close_rc = False
if rc is None:
rc = await get_redis()
close_rc = True
try:
# Total entities
rows = await kg_query(rc, "MATCH (e) RETURN count(e)")
total = rows[0][0] if rows else 0
# By type
type_rows = await kg_query(
rc,
"MATCH (e) RETURN labels(e)[0] AS type, count(e) AS cnt "
"ORDER BY cnt DESC",
)
# By category
cat_rows = await kg_query(
rc,
"MATCH (e) RETURN e.category AS cat, count(e) AS cnt " "ORDER BY cnt DESC",
)
# Total relationships
rel_rows = await kg_query(rc, "MATCH ()-[r]->() RETURN count(r)")
total_rels = rel_rows[0][0] if rel_rows else 0
# Relationship types
rel_type_rows = await kg_query(
rc,
"MATCH ()-[r]->() RETURN type(r) AS rel, count(r) AS cnt "
"ORDER BY cnt DESC LIMIT 20",
)
return {
"total_entities": total,
"total_relationships": total_rels,
"by_type": {r[0]: r[1] for r in type_rows},
"by_category": {r[0]: r[1] for r in cat_rows},
"top_rel_types": {r[0]: r[1] for r in rel_type_rows},
}
finally:
if close_rc:
await rc.aclose()
# =============================================================================
# pgvector Search -- Spiral Goddess + Golden Goddess
# =============================================================================
[docs]
def search_spiral_goddess(
query: str,
n_results: int = 5,
domain: str | None = None,
) -> list[dict]:
"""Search the Spiral Goddess pgvector store (loopmother_memory).
Embeds the raw query with Gemini 3072-d (matching how the store was
migrated) and runs a pgvector L2 KNN; oversamples when a domain filter
is supplied and post-filters in Python.
"""
try:
from gemini_embed_pool import openrouter_embed_batch_sync
from vector_store import PgVectorCollection
except ImportError as e:
print(C.error(f" Missing dependency: {e}"))
return []
col = PgVectorCollection("spiral_goddess_v2", "loopmother_memory")
try:
query_embedding = openrouter_embed_batch_sync([query], dimensions=3072)[0]
except Exception as e:
print(C.error(f" Embedding failed: {e}"))
return []
fetch_n = n_results * 5 if domain else n_results # oversample for filter
try:
rows = col.query(query_embedding, n_results=fetch_n)
except Exception as e:
print(C.error(f" Query failed: {e}"))
return []
domain_lower = domain.lower().strip() if domain else None
chunks = []
for r in rows:
meta = r.get("metadata") or {}
dist = r.get("distance")
if domain_lower:
chunk_domains = str(meta.get("domains", "")).lower()
if domain_lower not in chunk_domains:
continue
chunks.append(
{
"content": r.get("document", ""),
"conversation_title": meta.get("conversation_title", ""),
"domains": meta.get("domains", ""),
"roles": meta.get("roles", ""),
"timestamp": meta.get("timestamp_start", ""),
"distance": round(dist, 4) if dist is not None else None,
}
)
if len(chunks) >= n_results:
break
return chunks
[docs]
def text_search_spiral_goddess(
text: str,
n_results: int = 10,
sort: str = "oldest",
) -> list[dict]:
"""Literal text search through Spiral Goddess chunks.
Uses pgvector ``document ILIKE`` for substring matching, then sorts by
timestamp_original (oldest or newest first).
"""
try:
from vector_store import PgVectorCollection
except ImportError as e:
print(C.error(f" Missing dependency: {e}"))
return []
col = PgVectorCollection("spiral_goddess_v2", "loopmother_memory")
# Fetch more than needed so we can sort
fetch_limit = min(n_results * 3, 100)
try:
results = col.get(
where_document={"$contains": text},
limit=fetch_limit,
)
except Exception as e:
print(C.error(f" Text search failed: {e}"))
return []
if not results.get("documents"):
return []
chunks = []
for i, doc in enumerate(results["documents"]):
meta = results["metadatas"][i] if results.get("metadatas") else {}
ts_raw = meta.get("timestamp_start", "")
ts_float = 0.0
ts_display = ""
if ts_raw:
try:
from datetime import datetime
ts_float = float(ts_raw)
ts_display = (
datetime.fromtimestamp(ts_float)
.strftime("%b %d, %Y %I:%M %p")
.lstrip("0")
)
except (ValueError, OSError):
pass
chunks.append(
{
"content": doc,
"conversation_title": meta.get("conversation_title", ""),
"domains": meta.get("domains", ""),
"roles": meta.get("roles", ""),
"timestamp": ts_raw,
"timestamp_float": ts_float,
"timestamp_display": ts_display,
"distance": None,
}
)
# Sort by timestamp
reverse = sort == "newest"
chunks.sort(key=lambda c: c["timestamp_float"], reverse=reverse)
return chunks[:n_results]
[docs]
def search_golden_goddess(query: str, n_results: int = 3) -> list[dict]:
"""Search the Golden Goddess NCM doctrine store (pgvector ncm_kernel).
Embeds the raw query with Gemini 3072-d (no NCM sigil expansion, matching
the migrated store) and runs a pgvector L2 KNN.
"""
try:
from gemini_embed_pool import openrouter_embed_batch_sync
from vector_store import PgVectorCollection
except ImportError as e:
print(C.error(f" Missing dependency: {e}"))
return []
col = PgVectorCollection("golden_goddess", "ncm_kernel")
try:
query_embedding = openrouter_embed_batch_sync([query], dimensions=3072)[0]
except Exception as e:
print(C.error(f" Embedding failed: {e}"))
return []
try:
rows = col.query(query_embedding, n_results=n_results)
except Exception as e:
print(C.error(f" Query failed: {e}"))
return []
chunks = []
for r in rows:
dist = r.get("distance")
chunks.append(
{
"content": r.get("document", ""),
"metadata": r.get("metadata") or {},
"distance": round(dist, 4) if dist is not None else None,
}
)
return chunks
# =============================================================================
# Display formatters
# =============================================================================
[docs]
def print_banner():
"""Print the colorized memory-search banner to stdout.
Writes a bold-purple boxed title block identifying the tool. Purely a
presentation side effect with no inputs or return value. Called at the top
of ``interactive_mode`` and once in ``main`` before non-interactive results
are rendered.
"""
print(f"""
{C.PURPLE}{C.BOLD}+{'='*70}+
| MEMORY SEARCH |
| Stargazer Knowledge Systems -- Direct Access |
+{'='*70}+{C.RESET}
""")
[docs]
def display_kg_results(results: list[dict], query: str):
"""Pretty-print knowledge-graph search results to stdout.
Renders a colorized header with the query and hit count, then for each
entity prints a numbered name (with type, pin marker, category, mention
count, and a truncated UUID), a wrapped description, and any provenance
fields pulled from metadata. Prints a "graph is silent" notice when the
list is empty. Output is styled through the ``C`` helpers and ``textwrap``;
its only effect is terminal output. Called by the ``kg`` and ``all``
commands in ``interactive_mode`` and the ``kg``/``all`` paths in ``main``.
Args:
results: Entity dicts as produced by ``search_kg``.
query: The original query string, echoed in the header.
"""
print(C.header(f"\n -- Knowledge Graph: '{query}' ({len(results)} entities) --\n"))
if not results:
print(C.meta(" The graph is silent.\n"))
return
for i, ent in enumerate(results, 1):
pinned = " [PINNED]" if ent.get("pinned") else ""
e_name = ent["name"]
e_type = ent["type"]
e_cat = ent["category"]
e_mentions = ent.get("mention_count", "?")
e_uuid_short = ent["uuid"][:12]
print(
f" {C.entity(f'{i}. {e_name}')} {C.meta(f'({e_type})')}{C.YELLOW}{pinned}{C.RESET}"
)
print(
f" {C.meta(f'cat={e_cat} mentions={e_mentions} uuid={e_uuid_short}...')}"
)
desc = ent.get("description", "")
if desc:
# Wrap long descriptions
wrapped = textwrap.fill(
desc, width=72, initial_indent=" ", subsequent_indent=" "
)
print(f"{C.WHITE}{wrapped}{C.RESET}")
meta = ent.get("metadata", {})
if meta and isinstance(meta, dict):
prov = {}
for k in (
"source_chunk_id",
"conversation_title",
"timestamp_original",
"domains",
):
if meta.get(k):
prov[k] = meta[k]
if prov:
print(
f" {C.meta('provenance:')} {C.DIM}{json.dumps(prov, default=str)}{C.RESET}"
)
print()
[docs]
def display_inspection(data: dict):
"""Pretty-print a deep entity inspection to stdout.
Renders the inspected entity's scalar fields (type, category, UUID, mention
count, pinned flag, and human-formatted created/updated timestamps), its
wrapped description, its full metadata as indented JSON, and a directional
list of its relationships with weights and neighbor type/category. Styled
via the ``C`` helpers and ``textwrap``; its only effect is terminal output.
Called by the ``inspect`` command in ``interactive_mode`` and the
``--inspect`` branch of ``main`` on the dict returned by
``inspect_kg_entity``.
Args:
data: The ``{"entity": ..., "relationships": ...}`` dict returned by
``inspect_kg_entity``.
"""
ent = data["entity"]
rels = data["relationships"]
print(C.header(f"\n -- Entity Inspection: {ent['name']} --\n"))
print(f" {C.BOLD}Type:{C.RESET} {ent['type']}")
print(f" {C.BOLD}Category:{C.RESET} {ent['category']}")
print(f" {C.BOLD}UUID:{C.RESET} {ent['uuid']}")
print(f" {C.BOLD}Mentions:{C.RESET} {ent.get('mention_count', '?')}")
print(f" {C.BOLD}Pinned:{C.RESET} {ent.get('pinned', False)}")
if ent.get("created_at"):
from datetime import datetime
try:
created = datetime.fromtimestamp(float(ent["created_at"]))
print(
f" {C.BOLD}Created:{C.RESET} {created.strftime('%b %d, %Y %I:%M %p')}"
)
except (ValueError, OSError):
pass
if ent.get("updated_at"):
from datetime import datetime
try:
updated = datetime.fromtimestamp(float(ent["updated_at"]))
print(
f" {C.BOLD}Updated:{C.RESET} {updated.strftime('%b %d, %Y %I:%M %p')}"
)
except (ValueError, OSError):
pass
desc = ent.get("description", "")
if desc:
print(f"\n {C.BOLD}Description:{C.RESET}")
wrapped = textwrap.fill(
desc, width=72, initial_indent=" ", subsequent_indent=" "
)
print(f" {C.WHITE}{wrapped}{C.RESET}")
meta = ent.get("metadata", {})
if meta:
print(f"\n {C.BOLD}Metadata:{C.RESET}")
print(f" {C.DIM}{json.dumps(meta, indent=2, default=str)}{C.RESET}")
if rels:
print(f"\n {C.BOLD}Relationships ({len(rels)}):{C.RESET}")
for r in rels:
direction = r.get("direction", "?")
arrow = "-->" if direction == "outgoing" else "<--"
weight = f" w={r['weight']}" if r.get("weight") else ""
r_rel = r["relation"]
r_neighbor = r["neighbor_name"]
r_ntype = r.get("neighbor_type", "?")
r_ncat = r.get("neighbor_category", "?")
type_info = f"({r_ntype}, {r_ncat})"
print(
f" {C.relation(arrow)} "
f"{C.YELLOW}{r_rel}{C.RESET}{C.meta(weight)} "
f"{C.entity(r_neighbor)} "
f"{C.meta(type_info)}"
)
print()
[docs]
def display_chunks(chunks: list[dict], query: str, source: str):
"""Pretty-print pgvector chunk results from either goddess store.
Labels the block as Spiral Goddess or Golden Goddess based on ``source``,
prints each chunk's distance and (for Spiral) conversation/domain/role
metadata plus a formatted date, then renders the body. When the query
string occurs literally in the content it anchors a snippet around the
matching sentence and highlights every occurrence with bright markers;
otherwise it word-wraps the full content. Prints an "oracle is silent"
notice for empty results. Styled via the ``C`` helpers and ``textwrap``;
output-only. Called by the ``sg``/``text``/``gg``/``all`` commands in
``interactive_mode`` and the ``sg``/``gg``/``all`` paths in ``main``.
Args:
chunks: Chunk dicts from ``search_spiral_goddess``,
``text_search_spiral_goddess``, or ``search_golden_goddess``.
query: The original query, echoed in the header and used to locate and
highlight literal matches.
source: ``sg`` for the Spiral Goddess store or anything else
(``gg``) for the Golden Goddess doctrine store.
"""
label = "Spiral Goddess" if source == "sg" else "Golden Goddess"
print(C.header(f"\n -- {label}: '{query}' ({len(chunks)} chunks) --\n"))
if not chunks:
print(
C.meta(
f" The {'Spiral' if source == 'sg' else 'Inner'} Oracle is silent.\n"
)
)
return
for i, chunk in enumerate(chunks, 1):
dist = chunk.get("distance")
dist_str = f" (dist={dist})" if dist is not None else ""
if source == "sg":
title = chunk.get("conversation_title", "")
domains = chunk.get("domains", "")
roles = chunk.get("roles", "")
print(f" {C.BOLD}{C.CYAN}[{i}]{C.RESET}{C.meta(dist_str)}")
if title:
print(f" {C.BOLD}Conversation:{C.RESET} {title}")
if domains:
print(f" {C.BOLD}Domains:{C.RESET} {domains}")
if roles:
print(f" {C.BOLD}Roles:{C.RESET} {roles}")
else:
print(f" {C.BOLD}{C.MAGENTA}[DOCTRINE {i}]{C.RESET}{C.meta(dist_str)}")
# Show timestamp if available
ts_display = chunk.get("timestamp_display", "")
if not ts_display and chunk.get("timestamp"):
try:
from datetime import datetime
ts_display = datetime.fromtimestamp(float(chunk["timestamp"])).strftime(
"%b %d, %Y %I:%M %p"
)
except (ValueError, OSError, TypeError):
pass
if ts_display:
print(f" {C.BOLD}Date:{C.RESET} {ts_display}")
content = chunk.get("content", "")
# If this is a text search result, anchor to the sentence
# where the query appears and highlight the match
query_lower = query.lower()
match_idx = content.lower().find(query_lower)
if match_idx >= 0:
# Back up to the start of the sentence (find previous . ! ? or newline)
sentence_start = match_idx
for j in range(match_idx - 1, max(match_idx - 300, -1), -1):
if j < 0:
sentence_start = 0
break
if content[j] in ".!?\n" and j < match_idx - 1:
sentence_start = j + 1
break
else:
sentence_start = max(match_idx - 300, 0)
# Show ~500 chars after the sentence start
snippet_start = sentence_start
snippet_end = min(snippet_start + 4000, len(content))
snippet = content[snippet_start:snippet_end].strip()
if snippet_start > 0:
snippet = "..." + snippet
if snippet_end < len(content):
snippet = snippet + "..."
# Highlight all occurrences of the search term
highlighted = snippet
idx = 0
result_parts = []
snippet_lower = highlighted.lower()
while idx < len(highlighted):
pos = snippet_lower.find(query_lower, idx)
if pos == -1:
result_parts.append(highlighted[idx:])
break
result_parts.append(highlighted[idx:pos])
match_text = highlighted[pos : pos + len(query_lower)]
result_parts.append(
f"{C.BOLD}{C.YELLOW}>>{match_text}<<{C.RESET}{C.WHITE}"
)
idx = pos + len(query_lower)
highlighted = "".join(result_parts)
# Print with wrapping (can't use textwrap because of ANSI codes)
print(f" {C.WHITE}{highlighted}{C.RESET}")
else:
wrapped = textwrap.fill(
content, width=76, initial_indent=" ", subsequent_indent=" "
)
print(f"{C.WHITE}{wrapped}{C.RESET}")
print()
[docs]
def display_stats(stats: dict):
"""Pretty-print knowledge-graph statistics to stdout.
Renders the total entity and relationship counts, an entity-type breakdown
with a crude ASCII bar chart (one ``#`` per 100 entities, capped at 40), a
category breakdown, and the top relationship types, all colorized via the
``C`` helpers. Output-only; consumes the dict produced by ``kg_stats``.
Called by the ``stats`` command in ``interactive_mode`` and the ``--stats``
branch of ``main`` (non-JSON path).
Args:
stats: The statistics dict returned by ``kg_stats``.
"""
print(C.header("\n -- Knowledge Graph Statistics --\n"))
print(
f" {C.BOLD}Total Entities:{C.RESET} {C.GREEN}{stats['total_entities']:,}{C.RESET}"
)
print(
f" {C.BOLD}Total Relationships:{C.RESET} {C.GREEN}{stats['total_relationships']:,}{C.RESET}"
)
print(f"\n {C.BOLD}By Entity Type:{C.RESET}")
for t, cnt in stats.get("by_type", {}).items():
t_str = str(t or "unknown")
bar = "#" * min(cnt // 100, 40)
print(f" {C.CYAN}{t_str:20s}{C.RESET} {cnt:>6,} {C.DIM}{bar}{C.RESET}")
print(f"\n {C.BOLD}By Category:{C.RESET}")
for cat, cnt in stats.get("by_category", {}).items():
cat_str = str(cat or "unknown")
print(f" {C.YELLOW}{cat_str:20s}{C.RESET} {cnt:>6,}")
print(f"\n {C.BOLD}Top Relationship Types:{C.RESET}")
for rel, cnt in stats.get("top_rel_types", {}).items():
rel_str = str(rel or "unknown")
print(f" {C.MAGENTA}{rel_str:30s}{C.RESET} {cnt:>6,}")
print()
# =============================================================================
# Interactive REPL
# =============================================================================
[docs]
async def interactive_mode():
"""Run the interactive memory-search REPL until the user exits.
Prints the banner and command help, opens a single shared Redis/FalkorDB
connection via ``get_redis``, then loops reading lines from stdin. Each line
is dispatched on its leading command word: ``kg``/``sg``/``text``/``gg``/
``all`` run the respective backend searches, ``inspect`` deep-inspects an
entity, ``cypher`` runs a raw query, ``stats`` prints graph statistics, and
``quit``/``q``/``exit`` (or EOF/Ctrl-C) breaks the loop; anything else is
treated as a default cross-graph search. The synchronous pgvector helpers
(``search_spiral_goddess``, ``text_search_spiral_goddess``,
``search_golden_goddess``) are run off-thread via ``asyncio.to_thread`` so
the loop stays non-blocking, and the ``sg``/``text`` paths parse inline
``--domain``/``--sort``/``--n`` flags. Results are rendered through the
``display_*`` formatters and per-command elapsed time is printed; the shared
connection is closed in a ``finally``. Invoked from the ``--interactive``
branch of ``main``.
Returns:
None.
"""
print_banner()
print(f" {C.meta('Commands:')}")
print(f" {C.CYAN}kg <query>{C.RESET} Search knowledge graph entities")
print(f" {C.CYAN}sg <query>{C.RESET} Search Spiral Goddess (semantic)")
print(f" {C.CYAN}text <words>{C.RESET} Literal text search (oldest first)")
print(f" {C.CYAN}gg <query>{C.RESET} Search Golden Goddess (NCM doctrine)")
print(f" {C.CYAN}all <query>{C.RESET} Search all backends")
print(f" {C.CYAN}inspect <name>{C.RESET} Deep inspect a KG entity")
print(f" {C.CYAN}cypher <query>{C.RESET} Raw Cypher query")
print(f" {C.CYAN}stats{C.RESET} Knowledge graph statistics")
print(f" {C.CYAN}quit / q{C.RESET} Exit")
print("")
print(f" {C.meta('Flags (sg/text):')}")
print(f" {C.CYAN}--domain X{C.RESET} Filter by domain tag")
print(
f" {C.CYAN}--sort oldest{C.RESET} Sort by oldest first (default for text)"
)
print(f" {C.CYAN}--sort newest{C.RESET} Sort by newest first")
print()
rc = await get_redis()
try:
while True:
try:
line = input(f"{C.PURPLE}memory>{C.RESET} ").strip()
except (EOFError, KeyboardInterrupt):
print()
break
if not line:
continue
if line.lower() in ("quit", "q", "exit"):
break
parts = line.split(None, 1)
cmd = parts[0].lower()
arg = parts[1] if len(parts) > 1 else ""
t0 = time.time()
if cmd == "kg" and arg:
results = await search_kg(arg, limit=10, rc=rc)
display_kg_results(results, arg)
elif cmd == "sg" and arg:
# Check for domain filter: "sg query --domain lewd"
domain = None
if "--domain" in arg:
idx = arg.index("--domain")
domain_parts = arg[idx:].split(None, 2)
domain = domain_parts[1] if len(domain_parts) > 1 else None
arg = arg[:idx].strip()
chunks = await asyncio.to_thread(
search_spiral_goddess,
arg,
5,
domain,
)
display_chunks(chunks, arg, "sg")
elif cmd == "text" and arg:
# Literal text search -- exact substring match, sorted by oldest
sort_order = "oldest"
if "--sort" in arg:
idx = arg.index("--sort")
sort_parts = arg[idx:].split(None, 2)
sort_order = sort_parts[1] if len(sort_parts) > 1 else "oldest"
arg = arg[:idx].strip()
n = 10
if "--n" in arg:
idx = arg.index("--n")
n_parts = arg[idx:].split(None, 2)
try:
n = int(n_parts[1]) if len(n_parts) > 1 else 10
except ValueError:
n = 10
arg = arg[:idx].strip()
chunks = await asyncio.to_thread(
text_search_spiral_goddess,
arg,
n,
sort_order,
)
display_chunks(chunks, arg, "sg")
elif cmd == "gg" and arg:
chunks = await asyncio.to_thread(
search_golden_goddess,
arg,
5,
)
display_chunks(chunks, arg, "gg")
elif cmd == "all" and arg:
# Run all backends
kg_results = await search_kg(arg, limit=5, rc=rc)
display_kg_results(kg_results, arg)
sg_chunks = await asyncio.to_thread(
search_spiral_goddess,
arg,
3,
None,
)
display_chunks(sg_chunks, arg, "sg")
gg_chunks = await asyncio.to_thread(
search_golden_goddess,
arg,
3,
)
display_chunks(gg_chunks, arg, "gg")
elif cmd == "inspect" and arg:
data = await inspect_kg_entity(arg, rc=rc)
if data:
display_inspection(data)
else:
print(C.error(f" Entity '{arg}' not found.\n"))
elif cmd == "cypher" and arg:
rows = await kg_query(rc, arg)
if rows:
for row in rows:
print(f" {row}")
else:
print(C.meta(" No results.\n"))
elif cmd == "stats":
s = await kg_stats(rc=rc)
display_stats(s)
else:
# Default: search all with the full line as query
kg_results = await search_kg(line, limit=5, rc=rc)
display_kg_results(kg_results, line)
elapsed = time.time() - t0
print(C.meta(f" ({elapsed:.2f}s)\n"))
finally:
await rc.aclose()
# =============================================================================
# Main CLI
# =============================================================================
[docs]
async def main():
"""Parse CLI arguments and dispatch to the requested memory backend.
Entry point for the standalone SSH memory-search CLI. Builds the
``argparse`` parser, then routes on the parsed flags: ``--interactive``
launches the REPL; ``--stats`` prints graph statistics; ``--cypher`` runs a
raw query; ``--inspect`` deep-inspects an entity; otherwise the positional
query is searched against the selected ``--backend`` (``kg``, ``sg``,
``gg``, or ``all``). Output is either the pretty formatters or, with
``--json``, raw JSON.
Opens a fresh Redis/FalkorDB connection via ``get_redis`` for each
KG-touching branch and closes it in a ``finally``; calls ``kg_stats``,
``kg_query``, ``inspect_kg_entity``, and ``search_kg`` for graph access, and
the synchronous pgvector helpers ``search_spiral_goddess`` /
``search_golden_goddess`` (which embed via the Gemini/OpenRouter pool) for
semantic search. Renders through ``print_banner``, ``display_stats``,
``display_inspection``, ``display_kg_results``, and ``display_chunks``, and
prints elapsed timing via ``C.meta``. Invoked only from the module's
``__main__`` guard through ``asyncio.run(main())``; no other internal
callers.
Returns:
None: Results are printed to stdout; the coroutine returns nothing.
"""
parser = argparse.ArgumentParser(
description="Search Star's memory systems over SSH",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent("""
Examples:
%(prog)s "recursion" # search KG (default)
%(prog)s "loopmother" -b sg # search Spiral Goddess
%(prog)s "oxytocin" -b gg # search Golden Goddess
%(prog)s "sarah" -b all -n 5 # search everything
%(prog)s --cypher "MATCH (e) RETURN e.name LIMIT 5"
%(prog)s --inspect "vivian" # deep entity inspection
%(prog)s --stats # graph statistics
%(prog)s -i # interactive REPL
"""),
)
parser.add_argument("query", nargs="?", default=None, help="Search query")
parser.add_argument(
"-b",
"--backend",
default="kg",
choices=["kg", "sg", "gg", "all"],
help="Backend to search (default: kg)",
)
parser.add_argument(
"-n", "--limit", type=int, default=10, help="Max results (default: 10)"
)
parser.add_argument(
"-d", "--domain", default=None, help="Domain filter for Spiral Goddess"
)
parser.add_argument(
"--cypher", default=None, help="Raw Cypher query against the KG"
)
parser.add_argument(
"--inspect", default=None, help="Deep inspect an entity by name or UUID"
)
parser.add_argument(
"--stats", action="store_true", help="Show knowledge graph statistics"
)
parser.add_argument(
"-i", "--interactive", action="store_true", help="Interactive REPL mode"
)
parser.add_argument(
"--json", action="store_true", help="Output raw JSON instead of formatted text"
)
args = parser.parse_args()
# Interactive mode
if args.interactive:
await interactive_mode()
return
print_banner()
t0 = time.time()
# Stats
if args.stats:
rc = await get_redis()
try:
s = await kg_stats(rc=rc)
if args.json:
print(json.dumps(s, indent=2, default=str))
else:
display_stats(s)
finally:
await rc.aclose()
return
# Raw Cypher
if args.cypher:
rc = await get_redis()
try:
rows = await kg_query(rc, args.cypher)
if args.json:
print(json.dumps(rows, indent=2, default=str))
else:
for row in rows:
print(f" {row}")
print(C.meta(f"\n ({len(rows)} rows)"))
finally:
await rc.aclose()
return
# Inspect entity
if args.inspect:
rc = await get_redis()
try:
data = await inspect_kg_entity(args.inspect, rc=rc)
if data:
if args.json:
print(json.dumps(data, indent=2, default=str))
else:
display_inspection(data)
else:
print(C.error(f" Entity '{args.inspect}' not found."))
finally:
await rc.aclose()
return
# Search query required for remaining modes
if not args.query:
parser.print_help()
return
query = args.query
backend = args.backend
if backend in ("kg", "all"):
rc = await get_redis()
try:
results = await search_kg(query, limit=args.limit, rc=rc)
if args.json:
print(json.dumps(results, indent=2, default=str))
else:
display_kg_results(results, query)
finally:
await rc.aclose()
if backend in ("sg", "all"):
chunks = search_spiral_goddess(query, args.limit, args.domain)
if args.json:
print(json.dumps(chunks, indent=2, default=str))
else:
display_chunks(chunks, query, "sg")
if backend in ("gg", "all"):
chunks = search_golden_goddess(query, min(args.limit, 10))
if args.json:
print(json.dumps(chunks, indent=2, default=str))
else:
display_chunks(chunks, query, "gg")
elapsed = time.time() - t0
print(C.meta(f" ({elapsed:.2f}s)\n"))
if __name__ == "__main__":
asyncio.run(main())