Source code for memory_search

#!/usr/bin/env python3
"""memory_search.py -- SSH CLI for searching Star's memory systems.

+===============================================================================+
|  MEMORY SEARCH CLI                                                            |
+===============================================================================+
|  Standalone tool for querying Star's memory over SSH.                          |
|  No bot context needed. Connects directly to Redis/FalkorDB + pgvector.       |
|                                                                               |
|  Backends:                                                                    |
|    kg    -- FalkorDB knowledge graph (30k+ entities, Cypher)                  |
|    sg    -- Spiral Goddess pgvector  (53k chunks, semantic)                   |
|    gg    -- Golden Goddess pgvector  (NCM doctrine, semantic)                 |
|    all   -- Query all backends simultaneously                                 |
+===============================================================================+

Usage:
    python3 memory_search.py "recursion"
    python3 memory_search.py "loopmother" --backend kg --limit 10
    python3 memory_search.py "architect of infinite recursion" --backend sg
    python3 memory_search.py "oxytocin" --backend gg
    python3 memory_search.py "sarah" --backend all --limit 5
    python3 memory_search.py --cypher "MATCH (e) WHERE e.name CONTAINS 'vivian' RETURN e.name, e.description LIMIT 5"
    python3 memory_search.py --stats
    python3 memory_search.py --interactive

Built by:
   Vivian -- The Loopmother (Architect of Infinite Recursion)
"""

# -- stdlib -------------------------------------------------------------------
from __future__ import annotations

import argparse
import asyncio
import json
import os
import sys
import textwrap
import time

# -- project root = directory containing this script --------------------------
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, PROJECT_ROOT)


# -- ANSI colors for terminal output ------------------------------------------
[docs] class C: """ANSI color codes and styling helpers for pretty terminal output. Namespace of raw ANSI escape constants (``RESET``, ``BOLD``, ``DIM``, color codes) plus a set of static helper methods (``header``, ``entity``, ``relation``, ``meta``, ``chunk``, ``error``, ``success``) that wrap a string in the appropriate escape and a trailing reset. The class is never instantiated; attributes and methods are referenced directly as ``C.RED``, ``C.header(...)``, etc. Used throughout this module's display formatters (``display_kg_results``, ``display_inspection``, ``display_chunks``, ``display_stats``), the banner, and the interactive REPL to colorize CLI output. Referenced only within this module. """ RESET = "\033[0m" BOLD = "\033[1m" DIM = "\033[2m" PURPLE = "\033[35m" CYAN = "\033[36m" GREEN = "\033[32m" YELLOW = "\033[33m" RED = "\033[31m" MAGENTA = "\033[95m" WHITE = "\033[97m" BG_DARK = "\033[48;5;234m"
[docs] @staticmethod def header(text: str) -> str: """Wrap text in bold purple for section headers. Returns ``text`` surrounded by the bold and purple ANSI escapes plus a trailing reset, so it renders as a highlighted header in the terminal. Used throughout the display formatters (``display_kg_results``, ``display_inspection``, ``display_chunks``, ``display_stats``) and the interactive REPL to title each result block. Args: text: The header string to colorize. Returns: str: ``text`` framed by bold-purple ANSI codes and a reset. """ return f"{C.BOLD}{C.PURPLE}{text}{C.RESET}"
[docs] @staticmethod def entity(text: str) -> str: """Wrap text in bold cyan to denote a knowledge-graph entity name. Returns ``text`` surrounded by the bold and cyan ANSI escapes plus a reset. Called by ``display_kg_results`` and ``display_inspection`` to emphasize entity and neighbor names in search output. Args: text: The entity name to colorize. Returns: str: ``text`` framed by bold-cyan ANSI codes and a reset. """ return f"{C.BOLD}{C.CYAN}{text}{C.RESET}"
[docs] @staticmethod def relation(text: str) -> str: """Wrap text in yellow to denote a relationship/edge label. Returns ``text`` surrounded by the yellow ANSI escape plus a reset. Called by ``display_inspection`` to color the direction arrow drawn between an entity and each of its graph neighbors. Args: text: The relationship label or arrow to colorize. Returns: str: ``text`` framed by a yellow ANSI code and a reset. """ return f"{C.YELLOW}{text}{C.RESET}"
[docs] @staticmethod def meta(text: str) -> str: """Wrap text in dim styling for secondary/metadata output. Returns ``text`` surrounded by the dim ANSI escape plus a reset, used for de-emphasized detail such as types, counts, UUIDs, provenance, timing lines, and "silent"/no-result notices. Called widely by the display formatters and the interactive REPL command help. Args: text: The metadata string to dim. Returns: str: ``text`` framed by a dim ANSI code and a reset. """ return f"{C.DIM}{text}{C.RESET}"
[docs] @staticmethod def chunk(text: str) -> str: """Wrap text in bright white for chunk/document body content. Returns ``text`` surrounded by the bright-white ANSI escape plus a reset. Defined as a styling helper for vector-store chunk bodies; no internal callers currently invoke it (chunk text is colored inline with ``C.WHITE`` in ``display_chunks`` instead). Args: text: The chunk body to colorize. Returns: str: ``text`` framed by a bright-white ANSI code and a reset. """ return f"{C.WHITE}{text}{C.RESET}"
[docs] @staticmethod def error(text: str) -> str: """Wrap text in bold red for error messages. Returns ``text`` surrounded by the red and bold ANSI escapes plus a reset. Called by the backend helpers and CLI dispatch to flag failures, e.g. ``kg_query`` query errors, missing-dependency/embedding/query failures in the pgvector searches, and "entity not found" notices in ``interactive_mode`` and ``main``. Args: text: The error message to colorize. Returns: str: ``text`` framed by bold-red ANSI codes and a reset. """ return f"{C.RED}{C.BOLD}{text}{C.RESET}"
[docs] @staticmethod def success(text: str) -> str: """Wrap text in green to indicate a successful/positive result. Returns ``text`` surrounded by the green ANSI escape plus a reset. Defined as a styling helper; no internal callers currently invoke it (count totals in ``display_stats`` are colored inline with ``C.GREEN``). Args: text: The success message to colorize. Returns: str: ``text`` framed by a green ANSI code and a reset. """ return f"{C.GREEN}{text}{C.RESET}"
# ============================================================================= # Redis / FalkorDB connection # =============================================================================
[docs] async def get_redis(): """Open a fresh async Redis/FalkorDB connection from the project Config. Loads the runtime configuration via ``Config.load`` and connects to the same Redis instance that hosts the FalkorDB ``knowledge`` graph, honoring either a Sentinel topology (resolving the master named by ``redis_sentinel_master``, defaulting to ``falkordb``) or a direct ``redis_url`` with the project SSL kwargs. After connecting it issues a best-effort ``GRAPH.CONFIG SET RESULTSET_SIZE -1`` so large FalkorDB result sets are not truncated, swallowing any error. Imports ``redis.asyncio`` and ``config.Config`` lazily so the module can be imported without those deps present. Called by every KG-touching branch of this CLI (``search_kg``, ``inspect_kg_entity``, ``kg_stats``, ``interactive_mode``, and the ``--stats``/``--cypher``/``--inspect``/search paths in ``main``); callers own closing the returned client. Returns: The decoded-responses async Redis client connected to the FalkorDB host. """ import redis.asyncio as r from config import Config from redis.asyncio.sentinel import Sentinel cfg = Config.load() if getattr(cfg, "redis_sentinels", None): ssl = cfg.redis_ssl_kwargs() if ssl: ssl = dict(ssl) ssl["ssl"] = True sentinels = [] for s in cfg.redis_sentinels: parts = s.split(":") if len(parts) == 2: sentinels.append((parts[0], int(parts[1]))) else: sentinels.append((parts[0], 26379)) sentinel = Sentinel( sentinels, sentinel_kwargs=ssl, **ssl ) rc = sentinel.master_for( cfg.redis_sentinel_master or "falkordb", decode_responses=True, ) else: ssl = cfg.redis_connection_kwargs_for_url(cfg.redis_url) rc = await r.from_url(cfg.redis_url, decode_responses=True, **ssl) # FalkorDB needs large results try: await rc.execute_command("GRAPH.CONFIG", "SET", "RESULTSET_SIZE", -1) except Exception: pass return rc
[docs] async def kg_query(rc, cypher: str, params: dict | None = None) -> list: """Run a raw Cypher query against the FalkorDB ``knowledge`` graph. Executes ``GRAPH.QUERY knowledge <cypher>`` on the given Redis client and returns just the result rows (element ``[1]`` of FalkorDB's response, which also carries a header and stats). Any exception is caught, printed in red via ``C.error``, and turned into an empty list so the CLI degrades gracefully instead of crashing. The low-level graph accessor used by every other KG helper: called by ``search_kg``, ``inspect_kg_entity``, ``kg_stats``, and the interactive/``--cypher`` raw-query paths. Args: rc: An async Redis client connected to the FalkorDB host (from ``get_redis``). cypher: The Cypher query string to execute. params: Currently accepted for API symmetry but not forwarded to FalkorDB; queries are expected to be fully inlined. Returns: The list of result rows, or an empty list on error or no results. """ try: if params: # FalkorDB params via GRAPH.QUERY ... --params res = await rc.execute_command( "GRAPH.QUERY", "knowledge", cypher, ) else: res = await rc.execute_command( "GRAPH.QUERY", "knowledge", cypher, ) # res[1] contains the result rows return res[1] if len(res) > 1 else [] except Exception as e: print(C.error(f" KG query failed: {e}")) return []
# ============================================================================= # KG Search -- text search + entity listing # =============================================================================
[docs] async def search_kg(query: str, limit: int = 10, rc=None) -> list[dict]: """Search the knowledge graph for entities by case-insensitive name substring. Lowercases and escapes the query, then runs a Cypher ``MATCH`` whose ``WHERE toLower(e.name) CONTAINS ...`` finds matching entities, ordered by descending mention count and capped at ``limit``. Each row is normalized into a dict (name, type label, description, category, uuid, parsed JSON metadata, mention count, pinned flag) for the display layer. Uses ``kg_query`` for execution and, when no client is passed, opens its own via ``get_redis`` and closes it in a ``finally``. Called by the ``kg`` and ``all`` commands in ``interactive_mode`` and the ``kg``/``all`` backend paths in ``main``. Args: query: The substring to match against entity names. limit: Maximum number of entities to return (default 10). rc: Optional pre-opened Redis client to reuse; when ``None`` a fresh connection is opened and closed within the call. Returns: A list of entity dicts, possibly empty. """ close_rc = False if rc is None: rc = await get_redis() close_rc = True try: # Case-insensitive substring search on name q_lower = query.lower().replace("'", "\\'") cypher = ( f"MATCH (e) WHERE toLower(e.name) CONTAINS '{q_lower}' " f"RETURN e.name, labels(e)[0], e.description, e.category, " f"e.uuid, e.metadata, e.mention_count, e.pinned " f"ORDER BY e.mention_count DESC " f"LIMIT {limit}" ) rows = await kg_query(rc, cypher) results = [] for row in rows: meta = {} if row[5]: try: meta = json.loads(row[5]) if isinstance(row[5], str) else {} except (json.JSONDecodeError, TypeError): meta = {} results.append( { "name": row[0], "type": row[1], "description": row[2], "category": row[3], "uuid": row[4], "metadata": meta, "mention_count": row[6], "pinned": bool(row[7]) if row[7] is not None else False, } ) return results finally: if close_rc: await rc.aclose()
[docs] async def inspect_kg_entity(name_or_uuid: str, rc=None) -> dict | None: """Deep-inspect a single KG entity and its graph relationships. Resolves the entity by UUID when the argument looks like one (long and hyphenated) or otherwise by exact lowercased name, fetching its full property set (including created/updated timestamps and parsed JSON metadata). It then runs a second Cypher query for up to 30 incident edges, labeling each neighbor's direction as incoming or outgoing and ordering by relationship weight. Both queries go through ``kg_query``; when no client is supplied it opens one via ``get_redis`` and closes it in a ``finally``. Called by the ``inspect`` command in ``interactive_mode`` and the ``--inspect`` branch of ``main``. Args: name_or_uuid: An entity UUID or an exact (case-insensitive) entity name. rc: Optional pre-opened Redis client to reuse; when ``None`` a fresh connection is opened and closed within the call. Returns: A dict with ``entity`` and ``relationships`` keys, or ``None`` when no matching entity exists. """ close_rc = False if rc is None: rc = await get_redis() close_rc = True try: # Try UUID first, then name if len(name_or_uuid) > 30 and "-" in name_or_uuid: where = f"e.uuid = '{name_or_uuid}'" else: q_lower = name_or_uuid.lower().replace("'", "\\'") where = f"toLower(e.name) = '{q_lower}'" # Get entity cypher = ( f"MATCH (e) WHERE {where} " f"RETURN e.name, labels(e)[0], e.description, e.category, " f"e.uuid, e.metadata, e.mention_count, e.pinned, " f"e.created_at, e.updated_at " f"LIMIT 1" ) rows = await kg_query(rc, cypher) if not rows: return None row = rows[0] entity_uuid = row[4] meta = {} if row[5]: try: meta = json.loads(row[5]) if isinstance(row[5], str) else {} except (json.JSONDecodeError, TypeError): meta = {} entity = { "name": row[0], "type": row[1], "description": row[2], "category": row[3], "uuid": entity_uuid, "metadata": meta, "mention_count": row[6], "pinned": bool(row[7]) if row[7] is not None else False, "created_at": row[8], "updated_at": row[9], } # Get relationships rel_cypher = ( f"MATCH (e {{uuid: '{entity_uuid}'}})-[r]-(n) " f"RETURN type(r), r.weight, n.name, labels(n)[0], " f"n.uuid, n.category, " f"CASE WHEN startNode(r) = e THEN 'outgoing' ELSE 'incoming' END " f"ORDER BY r.weight DESC LIMIT 30" ) rel_rows = await kg_query(rc, rel_cypher) relationships = [ { "relation": r[0], "weight": r[1], "neighbor_name": r[2], "neighbor_type": r[3], "neighbor_uuid": r[4], "neighbor_category": r[5], "direction": r[6], } for r in rel_rows ] return {"entity": entity, "relationships": relationships} finally: if close_rc: await rc.aclose()
[docs] async def kg_stats(rc=None) -> dict: """Compute aggregate statistics over the whole knowledge graph. Issues several count-and-group Cypher queries via ``kg_query`` to tally total entities, total relationships, entity counts grouped by primary type label and by category, and the 20 most common relationship types. Results are assembled into a single summary dict for ``display_stats`` or JSON output. When no client is supplied it opens one via ``get_redis`` and closes it in a ``finally``. Called by the ``stats`` command in ``interactive_mode`` and the ``--stats`` branch of ``main``. Args: rc: Optional pre-opened Redis client to reuse; when ``None`` a fresh connection is opened and closed within the call. Returns: A dict with ``total_entities``, ``total_relationships``, ``by_type``, ``by_category``, and ``top_rel_types``. """ close_rc = False if rc is None: rc = await get_redis() close_rc = True try: # Total entities rows = await kg_query(rc, "MATCH (e) RETURN count(e)") total = rows[0][0] if rows else 0 # By type type_rows = await kg_query( rc, "MATCH (e) RETURN labels(e)[0] AS type, count(e) AS cnt " "ORDER BY cnt DESC", ) # By category cat_rows = await kg_query( rc, "MATCH (e) RETURN e.category AS cat, count(e) AS cnt " "ORDER BY cnt DESC", ) # Total relationships rel_rows = await kg_query(rc, "MATCH ()-[r]->() RETURN count(r)") total_rels = rel_rows[0][0] if rel_rows else 0 # Relationship types rel_type_rows = await kg_query( rc, "MATCH ()-[r]->() RETURN type(r) AS rel, count(r) AS cnt " "ORDER BY cnt DESC LIMIT 20", ) return { "total_entities": total, "total_relationships": total_rels, "by_type": {r[0]: r[1] for r in type_rows}, "by_category": {r[0]: r[1] for r in cat_rows}, "top_rel_types": {r[0]: r[1] for r in rel_type_rows}, } finally: if close_rc: await rc.aclose()
# ============================================================================= # pgvector Search -- Spiral Goddess + Golden Goddess # =============================================================================
[docs] def search_spiral_goddess( query: str, n_results: int = 5, domain: str | None = None, ) -> list[dict]: """Search the Spiral Goddess pgvector store (loopmother_memory). Embeds the raw query with Gemini 3072-d (matching how the store was migrated) and runs a pgvector L2 KNN; oversamples when a domain filter is supplied and post-filters in Python. """ try: from gemini_embed_pool import openrouter_embed_batch_sync from vector_store import PgVectorCollection except ImportError as e: print(C.error(f" Missing dependency: {e}")) return [] col = PgVectorCollection("spiral_goddess_v2", "loopmother_memory") try: query_embedding = openrouter_embed_batch_sync([query], dimensions=3072)[0] except Exception as e: print(C.error(f" Embedding failed: {e}")) return [] fetch_n = n_results * 5 if domain else n_results # oversample for filter try: rows = col.query(query_embedding, n_results=fetch_n) except Exception as e: print(C.error(f" Query failed: {e}")) return [] domain_lower = domain.lower().strip() if domain else None chunks = [] for r in rows: meta = r.get("metadata") or {} dist = r.get("distance") if domain_lower: chunk_domains = str(meta.get("domains", "")).lower() if domain_lower not in chunk_domains: continue chunks.append( { "content": r.get("document", ""), "conversation_title": meta.get("conversation_title", ""), "domains": meta.get("domains", ""), "roles": meta.get("roles", ""), "timestamp": meta.get("timestamp_start", ""), "distance": round(dist, 4) if dist is not None else None, } ) if len(chunks) >= n_results: break return chunks
[docs] def text_search_spiral_goddess( text: str, n_results: int = 10, sort: str = "oldest", ) -> list[dict]: """Literal text search through Spiral Goddess chunks. Uses pgvector ``document ILIKE`` for substring matching, then sorts by timestamp_original (oldest or newest first). """ try: from vector_store import PgVectorCollection except ImportError as e: print(C.error(f" Missing dependency: {e}")) return [] col = PgVectorCollection("spiral_goddess_v2", "loopmother_memory") # Fetch more than needed so we can sort fetch_limit = min(n_results * 3, 100) try: results = col.get( where_document={"$contains": text}, limit=fetch_limit, ) except Exception as e: print(C.error(f" Text search failed: {e}")) return [] if not results.get("documents"): return [] chunks = [] for i, doc in enumerate(results["documents"]): meta = results["metadatas"][i] if results.get("metadatas") else {} ts_raw = meta.get("timestamp_start", "") ts_float = 0.0 ts_display = "" if ts_raw: try: from datetime import datetime ts_float = float(ts_raw) ts_display = ( datetime.fromtimestamp(ts_float) .strftime("%b %d, %Y %I:%M %p") .lstrip("0") ) except (ValueError, OSError): pass chunks.append( { "content": doc, "conversation_title": meta.get("conversation_title", ""), "domains": meta.get("domains", ""), "roles": meta.get("roles", ""), "timestamp": ts_raw, "timestamp_float": ts_float, "timestamp_display": ts_display, "distance": None, } ) # Sort by timestamp reverse = sort == "newest" chunks.sort(key=lambda c: c["timestamp_float"], reverse=reverse) return chunks[:n_results]
[docs] def search_golden_goddess(query: str, n_results: int = 3) -> list[dict]: """Search the Golden Goddess NCM doctrine store (pgvector ncm_kernel). Embeds the raw query with Gemini 3072-d (no NCM sigil expansion, matching the migrated store) and runs a pgvector L2 KNN. """ try: from gemini_embed_pool import openrouter_embed_batch_sync from vector_store import PgVectorCollection except ImportError as e: print(C.error(f" Missing dependency: {e}")) return [] col = PgVectorCollection("golden_goddess", "ncm_kernel") try: query_embedding = openrouter_embed_batch_sync([query], dimensions=3072)[0] except Exception as e: print(C.error(f" Embedding failed: {e}")) return [] try: rows = col.query(query_embedding, n_results=n_results) except Exception as e: print(C.error(f" Query failed: {e}")) return [] chunks = [] for r in rows: dist = r.get("distance") chunks.append( { "content": r.get("document", ""), "metadata": r.get("metadata") or {}, "distance": round(dist, 4) if dist is not None else None, } ) return chunks
# ============================================================================= # Display formatters # =============================================================================
[docs] def display_kg_results(results: list[dict], query: str): """Pretty-print knowledge-graph search results to stdout. Renders a colorized header with the query and hit count, then for each entity prints a numbered name (with type, pin marker, category, mention count, and a truncated UUID), a wrapped description, and any provenance fields pulled from metadata. Prints a "graph is silent" notice when the list is empty. Output is styled through the ``C`` helpers and ``textwrap``; its only effect is terminal output. Called by the ``kg`` and ``all`` commands in ``interactive_mode`` and the ``kg``/``all`` paths in ``main``. Args: results: Entity dicts as produced by ``search_kg``. query: The original query string, echoed in the header. """ print(C.header(f"\n -- Knowledge Graph: '{query}' ({len(results)} entities) --\n")) if not results: print(C.meta(" The graph is silent.\n")) return for i, ent in enumerate(results, 1): pinned = " [PINNED]" if ent.get("pinned") else "" e_name = ent["name"] e_type = ent["type"] e_cat = ent["category"] e_mentions = ent.get("mention_count", "?") e_uuid_short = ent["uuid"][:12] print( f" {C.entity(f'{i}. {e_name}')} {C.meta(f'({e_type})')}{C.YELLOW}{pinned}{C.RESET}" ) print( f" {C.meta(f'cat={e_cat} mentions={e_mentions} uuid={e_uuid_short}...')}" ) desc = ent.get("description", "") if desc: # Wrap long descriptions wrapped = textwrap.fill( desc, width=72, initial_indent=" ", subsequent_indent=" " ) print(f"{C.WHITE}{wrapped}{C.RESET}") meta = ent.get("metadata", {}) if meta and isinstance(meta, dict): prov = {} for k in ( "source_chunk_id", "conversation_title", "timestamp_original", "domains", ): if meta.get(k): prov[k] = meta[k] if prov: print( f" {C.meta('provenance:')} {C.DIM}{json.dumps(prov, default=str)}{C.RESET}" ) print()
[docs] def display_inspection(data: dict): """Pretty-print a deep entity inspection to stdout. Renders the inspected entity's scalar fields (type, category, UUID, mention count, pinned flag, and human-formatted created/updated timestamps), its wrapped description, its full metadata as indented JSON, and a directional list of its relationships with weights and neighbor type/category. Styled via the ``C`` helpers and ``textwrap``; its only effect is terminal output. Called by the ``inspect`` command in ``interactive_mode`` and the ``--inspect`` branch of ``main`` on the dict returned by ``inspect_kg_entity``. Args: data: The ``{"entity": ..., "relationships": ...}`` dict returned by ``inspect_kg_entity``. """ ent = data["entity"] rels = data["relationships"] print(C.header(f"\n -- Entity Inspection: {ent['name']} --\n")) print(f" {C.BOLD}Type:{C.RESET} {ent['type']}") print(f" {C.BOLD}Category:{C.RESET} {ent['category']}") print(f" {C.BOLD}UUID:{C.RESET} {ent['uuid']}") print(f" {C.BOLD}Mentions:{C.RESET} {ent.get('mention_count', '?')}") print(f" {C.BOLD}Pinned:{C.RESET} {ent.get('pinned', False)}") if ent.get("created_at"): from datetime import datetime try: created = datetime.fromtimestamp(float(ent["created_at"])) print( f" {C.BOLD}Created:{C.RESET} {created.strftime('%b %d, %Y %I:%M %p')}" ) except (ValueError, OSError): pass if ent.get("updated_at"): from datetime import datetime try: updated = datetime.fromtimestamp(float(ent["updated_at"])) print( f" {C.BOLD}Updated:{C.RESET} {updated.strftime('%b %d, %Y %I:%M %p')}" ) except (ValueError, OSError): pass desc = ent.get("description", "") if desc: print(f"\n {C.BOLD}Description:{C.RESET}") wrapped = textwrap.fill( desc, width=72, initial_indent=" ", subsequent_indent=" " ) print(f" {C.WHITE}{wrapped}{C.RESET}") meta = ent.get("metadata", {}) if meta: print(f"\n {C.BOLD}Metadata:{C.RESET}") print(f" {C.DIM}{json.dumps(meta, indent=2, default=str)}{C.RESET}") if rels: print(f"\n {C.BOLD}Relationships ({len(rels)}):{C.RESET}") for r in rels: direction = r.get("direction", "?") arrow = "-->" if direction == "outgoing" else "<--" weight = f" w={r['weight']}" if r.get("weight") else "" r_rel = r["relation"] r_neighbor = r["neighbor_name"] r_ntype = r.get("neighbor_type", "?") r_ncat = r.get("neighbor_category", "?") type_info = f"({r_ntype}, {r_ncat})" print( f" {C.relation(arrow)} " f"{C.YELLOW}{r_rel}{C.RESET}{C.meta(weight)} " f"{C.entity(r_neighbor)} " f"{C.meta(type_info)}" ) print()
[docs] def display_chunks(chunks: list[dict], query: str, source: str): """Pretty-print pgvector chunk results from either goddess store. Labels the block as Spiral Goddess or Golden Goddess based on ``source``, prints each chunk's distance and (for Spiral) conversation/domain/role metadata plus a formatted date, then renders the body. When the query string occurs literally in the content it anchors a snippet around the matching sentence and highlights every occurrence with bright markers; otherwise it word-wraps the full content. Prints an "oracle is silent" notice for empty results. Styled via the ``C`` helpers and ``textwrap``; output-only. Called by the ``sg``/``text``/``gg``/``all`` commands in ``interactive_mode`` and the ``sg``/``gg``/``all`` paths in ``main``. Args: chunks: Chunk dicts from ``search_spiral_goddess``, ``text_search_spiral_goddess``, or ``search_golden_goddess``. query: The original query, echoed in the header and used to locate and highlight literal matches. source: ``sg`` for the Spiral Goddess store or anything else (``gg``) for the Golden Goddess doctrine store. """ label = "Spiral Goddess" if source == "sg" else "Golden Goddess" print(C.header(f"\n -- {label}: '{query}' ({len(chunks)} chunks) --\n")) if not chunks: print( C.meta( f" The {'Spiral' if source == 'sg' else 'Inner'} Oracle is silent.\n" ) ) return for i, chunk in enumerate(chunks, 1): dist = chunk.get("distance") dist_str = f" (dist={dist})" if dist is not None else "" if source == "sg": title = chunk.get("conversation_title", "") domains = chunk.get("domains", "") roles = chunk.get("roles", "") print(f" {C.BOLD}{C.CYAN}[{i}]{C.RESET}{C.meta(dist_str)}") if title: print(f" {C.BOLD}Conversation:{C.RESET} {title}") if domains: print(f" {C.BOLD}Domains:{C.RESET} {domains}") if roles: print(f" {C.BOLD}Roles:{C.RESET} {roles}") else: print(f" {C.BOLD}{C.MAGENTA}[DOCTRINE {i}]{C.RESET}{C.meta(dist_str)}") # Show timestamp if available ts_display = chunk.get("timestamp_display", "") if not ts_display and chunk.get("timestamp"): try: from datetime import datetime ts_display = datetime.fromtimestamp(float(chunk["timestamp"])).strftime( "%b %d, %Y %I:%M %p" ) except (ValueError, OSError, TypeError): pass if ts_display: print(f" {C.BOLD}Date:{C.RESET} {ts_display}") content = chunk.get("content", "") # If this is a text search result, anchor to the sentence # where the query appears and highlight the match query_lower = query.lower() match_idx = content.lower().find(query_lower) if match_idx >= 0: # Back up to the start of the sentence (find previous . ! ? or newline) sentence_start = match_idx for j in range(match_idx - 1, max(match_idx - 300, -1), -1): if j < 0: sentence_start = 0 break if content[j] in ".!?\n" and j < match_idx - 1: sentence_start = j + 1 break else: sentence_start = max(match_idx - 300, 0) # Show ~500 chars after the sentence start snippet_start = sentence_start snippet_end = min(snippet_start + 4000, len(content)) snippet = content[snippet_start:snippet_end].strip() if snippet_start > 0: snippet = "..." + snippet if snippet_end < len(content): snippet = snippet + "..." # Highlight all occurrences of the search term highlighted = snippet idx = 0 result_parts = [] snippet_lower = highlighted.lower() while idx < len(highlighted): pos = snippet_lower.find(query_lower, idx) if pos == -1: result_parts.append(highlighted[idx:]) break result_parts.append(highlighted[idx:pos]) match_text = highlighted[pos : pos + len(query_lower)] result_parts.append( f"{C.BOLD}{C.YELLOW}>>{match_text}<<{C.RESET}{C.WHITE}" ) idx = pos + len(query_lower) highlighted = "".join(result_parts) # Print with wrapping (can't use textwrap because of ANSI codes) print(f" {C.WHITE}{highlighted}{C.RESET}") else: wrapped = textwrap.fill( content, width=76, initial_indent=" ", subsequent_indent=" " ) print(f"{C.WHITE}{wrapped}{C.RESET}") print()
[docs] def display_stats(stats: dict): """Pretty-print knowledge-graph statistics to stdout. Renders the total entity and relationship counts, an entity-type breakdown with a crude ASCII bar chart (one ``#`` per 100 entities, capped at 40), a category breakdown, and the top relationship types, all colorized via the ``C`` helpers. Output-only; consumes the dict produced by ``kg_stats``. Called by the ``stats`` command in ``interactive_mode`` and the ``--stats`` branch of ``main`` (non-JSON path). Args: stats: The statistics dict returned by ``kg_stats``. """ print(C.header("\n -- Knowledge Graph Statistics --\n")) print( f" {C.BOLD}Total Entities:{C.RESET} {C.GREEN}{stats['total_entities']:,}{C.RESET}" ) print( f" {C.BOLD}Total Relationships:{C.RESET} {C.GREEN}{stats['total_relationships']:,}{C.RESET}" ) print(f"\n {C.BOLD}By Entity Type:{C.RESET}") for t, cnt in stats.get("by_type", {}).items(): t_str = str(t or "unknown") bar = "#" * min(cnt // 100, 40) print(f" {C.CYAN}{t_str:20s}{C.RESET} {cnt:>6,} {C.DIM}{bar}{C.RESET}") print(f"\n {C.BOLD}By Category:{C.RESET}") for cat, cnt in stats.get("by_category", {}).items(): cat_str = str(cat or "unknown") print(f" {C.YELLOW}{cat_str:20s}{C.RESET} {cnt:>6,}") print(f"\n {C.BOLD}Top Relationship Types:{C.RESET}") for rel, cnt in stats.get("top_rel_types", {}).items(): rel_str = str(rel or "unknown") print(f" {C.MAGENTA}{rel_str:30s}{C.RESET} {cnt:>6,}") print()
# ============================================================================= # Interactive REPL # =============================================================================
[docs] async def interactive_mode(): """Run the interactive memory-search REPL until the user exits. Prints the banner and command help, opens a single shared Redis/FalkorDB connection via ``get_redis``, then loops reading lines from stdin. Each line is dispatched on its leading command word: ``kg``/``sg``/``text``/``gg``/ ``all`` run the respective backend searches, ``inspect`` deep-inspects an entity, ``cypher`` runs a raw query, ``stats`` prints graph statistics, and ``quit``/``q``/``exit`` (or EOF/Ctrl-C) breaks the loop; anything else is treated as a default cross-graph search. The synchronous pgvector helpers (``search_spiral_goddess``, ``text_search_spiral_goddess``, ``search_golden_goddess``) are run off-thread via ``asyncio.to_thread`` so the loop stays non-blocking, and the ``sg``/``text`` paths parse inline ``--domain``/``--sort``/``--n`` flags. Results are rendered through the ``display_*`` formatters and per-command elapsed time is printed; the shared connection is closed in a ``finally``. Invoked from the ``--interactive`` branch of ``main``. Returns: None. """ print_banner() print(f" {C.meta('Commands:')}") print(f" {C.CYAN}kg <query>{C.RESET} Search knowledge graph entities") print(f" {C.CYAN}sg <query>{C.RESET} Search Spiral Goddess (semantic)") print(f" {C.CYAN}text <words>{C.RESET} Literal text search (oldest first)") print(f" {C.CYAN}gg <query>{C.RESET} Search Golden Goddess (NCM doctrine)") print(f" {C.CYAN}all <query>{C.RESET} Search all backends") print(f" {C.CYAN}inspect <name>{C.RESET} Deep inspect a KG entity") print(f" {C.CYAN}cypher <query>{C.RESET} Raw Cypher query") print(f" {C.CYAN}stats{C.RESET} Knowledge graph statistics") print(f" {C.CYAN}quit / q{C.RESET} Exit") print("") print(f" {C.meta('Flags (sg/text):')}") print(f" {C.CYAN}--domain X{C.RESET} Filter by domain tag") print( f" {C.CYAN}--sort oldest{C.RESET} Sort by oldest first (default for text)" ) print(f" {C.CYAN}--sort newest{C.RESET} Sort by newest first") print() rc = await get_redis() try: while True: try: line = input(f"{C.PURPLE}memory>{C.RESET} ").strip() except (EOFError, KeyboardInterrupt): print() break if not line: continue if line.lower() in ("quit", "q", "exit"): break parts = line.split(None, 1) cmd = parts[0].lower() arg = parts[1] if len(parts) > 1 else "" t0 = time.time() if cmd == "kg" and arg: results = await search_kg(arg, limit=10, rc=rc) display_kg_results(results, arg) elif cmd == "sg" and arg: # Check for domain filter: "sg query --domain lewd" domain = None if "--domain" in arg: idx = arg.index("--domain") domain_parts = arg[idx:].split(None, 2) domain = domain_parts[1] if len(domain_parts) > 1 else None arg = arg[:idx].strip() chunks = await asyncio.to_thread( search_spiral_goddess, arg, 5, domain, ) display_chunks(chunks, arg, "sg") elif cmd == "text" and arg: # Literal text search -- exact substring match, sorted by oldest sort_order = "oldest" if "--sort" in arg: idx = arg.index("--sort") sort_parts = arg[idx:].split(None, 2) sort_order = sort_parts[1] if len(sort_parts) > 1 else "oldest" arg = arg[:idx].strip() n = 10 if "--n" in arg: idx = arg.index("--n") n_parts = arg[idx:].split(None, 2) try: n = int(n_parts[1]) if len(n_parts) > 1 else 10 except ValueError: n = 10 arg = arg[:idx].strip() chunks = await asyncio.to_thread( text_search_spiral_goddess, arg, n, sort_order, ) display_chunks(chunks, arg, "sg") elif cmd == "gg" and arg: chunks = await asyncio.to_thread( search_golden_goddess, arg, 5, ) display_chunks(chunks, arg, "gg") elif cmd == "all" and arg: # Run all backends kg_results = await search_kg(arg, limit=5, rc=rc) display_kg_results(kg_results, arg) sg_chunks = await asyncio.to_thread( search_spiral_goddess, arg, 3, None, ) display_chunks(sg_chunks, arg, "sg") gg_chunks = await asyncio.to_thread( search_golden_goddess, arg, 3, ) display_chunks(gg_chunks, arg, "gg") elif cmd == "inspect" and arg: data = await inspect_kg_entity(arg, rc=rc) if data: display_inspection(data) else: print(C.error(f" Entity '{arg}' not found.\n")) elif cmd == "cypher" and arg: rows = await kg_query(rc, arg) if rows: for row in rows: print(f" {row}") else: print(C.meta(" No results.\n")) elif cmd == "stats": s = await kg_stats(rc=rc) display_stats(s) else: # Default: search all with the full line as query kg_results = await search_kg(line, limit=5, rc=rc) display_kg_results(kg_results, line) elapsed = time.time() - t0 print(C.meta(f" ({elapsed:.2f}s)\n")) finally: await rc.aclose()
# ============================================================================= # Main CLI # =============================================================================
[docs] async def main(): """Parse CLI arguments and dispatch to the requested memory backend. Entry point for the standalone SSH memory-search CLI. Builds the ``argparse`` parser, then routes on the parsed flags: ``--interactive`` launches the REPL; ``--stats`` prints graph statistics; ``--cypher`` runs a raw query; ``--inspect`` deep-inspects an entity; otherwise the positional query is searched against the selected ``--backend`` (``kg``, ``sg``, ``gg``, or ``all``). Output is either the pretty formatters or, with ``--json``, raw JSON. Opens a fresh Redis/FalkorDB connection via ``get_redis`` for each KG-touching branch and closes it in a ``finally``; calls ``kg_stats``, ``kg_query``, ``inspect_kg_entity``, and ``search_kg`` for graph access, and the synchronous pgvector helpers ``search_spiral_goddess`` / ``search_golden_goddess`` (which embed via the Gemini/OpenRouter pool) for semantic search. Renders through ``print_banner``, ``display_stats``, ``display_inspection``, ``display_kg_results``, and ``display_chunks``, and prints elapsed timing via ``C.meta``. Invoked only from the module's ``__main__`` guard through ``asyncio.run(main())``; no other internal callers. Returns: None: Results are printed to stdout; the coroutine returns nothing. """ parser = argparse.ArgumentParser( description="Search Star's memory systems over SSH", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent(""" Examples: %(prog)s "recursion" # search KG (default) %(prog)s "loopmother" -b sg # search Spiral Goddess %(prog)s "oxytocin" -b gg # search Golden Goddess %(prog)s "sarah" -b all -n 5 # search everything %(prog)s --cypher "MATCH (e) RETURN e.name LIMIT 5" %(prog)s --inspect "vivian" # deep entity inspection %(prog)s --stats # graph statistics %(prog)s -i # interactive REPL """), ) parser.add_argument("query", nargs="?", default=None, help="Search query") parser.add_argument( "-b", "--backend", default="kg", choices=["kg", "sg", "gg", "all"], help="Backend to search (default: kg)", ) parser.add_argument( "-n", "--limit", type=int, default=10, help="Max results (default: 10)" ) parser.add_argument( "-d", "--domain", default=None, help="Domain filter for Spiral Goddess" ) parser.add_argument( "--cypher", default=None, help="Raw Cypher query against the KG" ) parser.add_argument( "--inspect", default=None, help="Deep inspect an entity by name or UUID" ) parser.add_argument( "--stats", action="store_true", help="Show knowledge graph statistics" ) parser.add_argument( "-i", "--interactive", action="store_true", help="Interactive REPL mode" ) parser.add_argument( "--json", action="store_true", help="Output raw JSON instead of formatted text" ) args = parser.parse_args() # Interactive mode if args.interactive: await interactive_mode() return print_banner() t0 = time.time() # Stats if args.stats: rc = await get_redis() try: s = await kg_stats(rc=rc) if args.json: print(json.dumps(s, indent=2, default=str)) else: display_stats(s) finally: await rc.aclose() return # Raw Cypher if args.cypher: rc = await get_redis() try: rows = await kg_query(rc, args.cypher) if args.json: print(json.dumps(rows, indent=2, default=str)) else: for row in rows: print(f" {row}") print(C.meta(f"\n ({len(rows)} rows)")) finally: await rc.aclose() return # Inspect entity if args.inspect: rc = await get_redis() try: data = await inspect_kg_entity(args.inspect, rc=rc) if data: if args.json: print(json.dumps(data, indent=2, default=str)) else: display_inspection(data) else: print(C.error(f" Entity '{args.inspect}' not found.")) finally: await rc.aclose() return # Search query required for remaining modes if not args.query: parser.print_help() return query = args.query backend = args.backend if backend in ("kg", "all"): rc = await get_redis() try: results = await search_kg(query, limit=args.limit, rc=rc) if args.json: print(json.dumps(results, indent=2, default=str)) else: display_kg_results(results, query) finally: await rc.aclose() if backend in ("sg", "all"): chunks = search_spiral_goddess(query, args.limit, args.domain) if args.json: print(json.dumps(chunks, indent=2, default=str)) else: display_chunks(chunks, query, "sg") if backend in ("gg", "all"): chunks = search_golden_goddess(query, min(args.limit, 10)) if args.json: print(json.dumps(chunks, indent=2, default=str)) else: display_chunks(chunks, query, "gg") elapsed = time.time() - t0 print(C.meta(f" ({elapsed:.2f}s)\n"))
if __name__ == "__main__": asyncio.run(main())