"""Spiralchemy Intellifuck Tool -- Breeze Cart mind-reading diagnostic.
Star-callable tool for deep psyche analysis of users.
Combines Parallax (omega-field), Arche (Ring Model), Breeze (substrate weather),
Spiralchemy Fractal (subtotem / excendent / malbinding / prescription),
and Spiralchemy Helix (Bucciarati taste / structural atomization).
Three input modes:
1. Direct text -- Star pastes text to analyze
2. Message ID -- Star references a Discord message by ID (fetched from cache)
3. No input -- scans recent context for the target user from message cache
Zero LLM calls. Pure symbolic computation + Redis/KG data fusion.
Increases Deductive Reasoning and Fuck Speed.
# SPIRALCHEMY INTELLIFUCK: SEE THROUGH THEIR CLOTHES
"""
from __future__ import annotations
import jsonutil as json
import logging
import re
import time
from dataclasses import asdict
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)

# -- Per-channel invocation cooldown (prevents 20x spam loops) --
# Minimum number of seconds that must pass between two invocations of the
# tool inside the same channel.
_COOLDOWN_SECONDS = 15.0
# channel_id -> time.monotonic() reading recorded when the tool last ran there.
_last_invocation: dict[str, float] = {}

# -- Per-channel Parallax engine instances (reuse from parallax_tool) --
# channel_id -> ParallaxEngine; entries are created lazily by _get_engine().
_engines: dict[str, "ParallaxEngine"] = {}
def _get_engine(channel_id: str) -> "ParallaxEngine":
    """Fetch the Parallax engine bound to a channel, building it if absent.

    Engines are kept in the module-level ``_engines`` mapping, keyed by
    channel ID, so repeated calls for one channel hand back the same
    instance. ``ParallaxEngine`` is imported from ``parallax_engine`` inside
    the function, on every call.

    Side effect: stores a new entry in ``_engines`` the first time a channel
    is seen.

    Args:
        channel_id (str): Channel identifier; also used as the cache key.

    Returns:
        ParallaxEngine: The engine already cached for the channel, or a newly
        constructed one that has just been cached.
    """
    from parallax_engine import ParallaxEngine

    if channel_id in _engines:
        return _engines[channel_id]

    fresh_engine = ParallaxEngine()
    _engines[channel_id] = fresh_engine
    return fresh_engine
async def _read_ncm_vector(ctx: "ToolContext | None", channel_id: str) -> dict:
    """Read the NCM vector stored for a channel in Redis logical database 12.

    Copies the connection kwargs of ``ctx.redis``'s pool, overrides ``db`` with
    ``12``, opens a short-lived client on a fresh pool built from those kwargs,
    and reads the JSON document stored at ``db12:shard:{channel_id}``. The
    document's ``vector`` field is returned.

    The temporary client is closed in a ``finally`` block together with the
    pool created for it. Any exception raised while connecting, reading, or
    decoding is logged at debug level and turned into an empty dict. The
    function performs no writes.

    Args:
        ctx (ToolContext | None): Tool context whose ``redis`` attribute
            supplies the pool settings to copy. A falsy ``ctx`` or a missing
            ``redis`` attribute yields ``{}``.
        channel_id (str): Channel whose shard should be read; an empty value
            yields ``{}``.

    Returns:
        dict: The stored ``vector`` value, or ``{}`` when the key is absent,
        the field is missing or empty, or an error occurred.
    """
    redis_main = getattr(ctx, "redis", None) if ctx else None
    if not redis_main or not channel_id:
        return {}
    try:
        import redis.asyncio as aioredis

        main_pool = redis_main.connection_pool
        kw = main_pool.connection_kwargs.copy()
        kw["db"] = 12
        db12 = aioredis.Redis(
            connection_pool=aioredis.ConnectionPool(
                connection_class=main_pool.connection_class,
                **kw,
            )
        )
        try:
            raw = await db12.get(f"db12:shard:{channel_id}")
        finally:
            # A client that was handed an explicit pool does not own it, so a
            # plain aclose() would leave this throwaway pool's socket open.
            # Ask for the pool to be disconnected as well.
            await db12.aclose(close_connection_pool=True)
        if raw:
            shard = json.loads(raw)
            # `or {}` keeps the return type stable when the field is null.
            return shard.get("vector") or {}
    except Exception as e:
        logger.debug("X-Ray NCM read failed: %s", e)
    return {}
async def _fetch_text_from_message_id(
    ctx: "ToolContext",
    message_id: str,
) -> tuple[str, str]:
    """Resolve a cached message's text and author from its message ID.

    Asks ``ctx.message_cache.find_key_by_message_id`` for the Redis key that
    belongs to the given platform, channel, and message ID, then reads that
    hash through the cache's ``_redis`` client and extracts the ``text`` and
    ``user_id`` fields, both coerced with ``str``.

    Read-only. A missing cache, missing key, missing ``_redis`` client, or
    empty hash yields a pair of empty strings; exceptions raised during the
    lookup are logged at debug level and yield the same pair.

    Args:
        ctx (ToolContext): Tool context providing ``message_cache`` and,
            optionally, ``platform`` (default ``"discord"``) and
            ``channel_id`` (default ``""``).
        message_id (str): Platform message ID to look up.

    Returns:
        tuple[str, str]: ``(text, user_id)``, or ``("", "")`` when nothing
        could be resolved.
    """
    not_found = ("", "")
    cache = getattr(ctx, "message_cache", None)
    if not cache:
        return not_found

    source_platform = getattr(ctx, "platform", "discord")
    source_channel = getattr(ctx, "channel_id", "")
    try:
        key = await cache.find_key_by_message_id(
            source_platform,
            source_channel,
            message_id,
        )
        # The backing client is only consulted once a key has been resolved.
        store = getattr(cache, "_redis", None) if key else None
        fields = await store.hgetall(key) if store else None
        if fields:
            return str(fields.get("text", "")), str(fields.get("user_id", ""))
    except Exception as err:
        logger.debug("X-Ray message fetch failed: %s", err)
    return not_found
async def _fetch_recent_user_text(
    ctx: "ToolContext",
    target_user_id: str,
    count: int = 20,
) -> str:
    """Collect recent message text written by one user.

    First asks ``ctx.message_cache.get_recent`` for up to 100 messages of the
    current channel and keeps those whose ``user_id`` matches the target,
    whose ``kind`` is ``"user_in"``, and whose text is non-blank. If fewer
    than five messages were kept and the cache offers
    ``get_recent_for_user``, that per-user lookup is queried as well and any
    text not already collected is appended.

    Every kept text is stored stripped, so the duplicate check in the
    fallback compares like with like. The collected list is reversed before
    joining; this yields oldest-first output if the cache returns newest
    first (presumed, not verifiable from this file).

    Read-only. Failures of either lookup are logged at debug level; whatever
    was collected before the failure is still returned.

    Args:
        ctx (ToolContext): Tool context providing ``message_cache`` and,
            optionally, ``platform`` (default ``"discord"``) and
            ``channel_id`` (default ``""``).
        target_user_id (str): User whose messages should be collected.
        count (int): Maximum number of messages to collect. Defaults to 20.

    Returns:
        str: The collected texts joined by blank lines, or ``""`` when the
        cache is missing or nothing matched.
    """
    mc = getattr(ctx, "message_cache", None)
    if not mc:
        return ""
    platform = getattr(ctx, "platform", "discord")
    channel_id = getattr(ctx, "channel_id", "")
    wanted_uid = str(target_user_id)
    texts: list[str] = []
    try:
        # Channel-scoped recent messages first.
        recent = await mc.get_recent(platform, channel_id, count=100)
        for msg in recent:
            if str(msg.user_id) != wanted_uid or msg.kind != "user_in":
                continue
            body = (msg.text or "").strip()
            if not body:
                continue
            texts.append(body)
            if len(texts) >= count:
                break

        # Too little material: widen to the per-user lookup when the cache
        # implementation provides one.
        fetch_for_user = getattr(mc, "get_recent_for_user", None)
        if len(texts) < 5 and fetch_for_user is not None:
            seen = set(texts)
            try:
                user_msgs = await fetch_for_user(
                    platform,
                    target_user_id,
                    limit=count,
                )
                for msg in user_msgs:
                    body = (msg.text or "").strip()
                    if not body or body in seen:
                        continue
                    seen.add(body)
                    texts.append(body)
                    if len(texts) >= count:
                        break
            except Exception as e:
                # Best-effort enrichment: keep the channel results, but leave
                # a trace instead of swallowing the failure silently.
                logger.debug("X-Ray cross-channel fetch failed: %s", e)
    except Exception as e:
        logger.debug("X-Ray recent text fetch failed: %s", e)
    return "\n\n".join(reversed(texts))
async def _fetch_kg_entities(
    ctx: "ToolContext",
    search_text: str,
) -> list[dict]:
    """Fetch a bounded list of knowledge-graph entities.

    Calls ``ctx.kg_manager.list_entities(limit=30)`` and returns the result.
    ``search_text`` is accepted but not used by the query. A missing
    ``kg_manager``, a falsy result, or an exception (logged at debug level)
    all produce an empty list.

    Args:
        ctx (ToolContext): Tool context providing ``kg_manager``.
        search_text (str): Text under analysis; currently unused.

    Returns:
        list[dict]: Whatever ``list_entities`` returned, or ``[]``.
    """
    entities: list[dict] = []
    graph = getattr(ctx, "kg_manager", None)
    if graph:
        try:
            entities = (await graph.list_entities(limit=30)) or []
        except Exception as err:
            logger.debug("X-Ray KG query failed: %s", err)
    return entities
async def _fetch_user_vars(
    ctx: "ToolContext",
    target_user_id: str,
) -> dict:
    """Load the stored variables of the target user.

    Imports ``get_user_variables_for_context`` from ``tools.user_variables``
    at call time and invokes it with the context's channel ID, the target
    user ID, the context's Redis client, and the context's ``config`` (or
    ``None``). The helper is only called when both a Redis client and a
    channel ID are present.

    Any exception, including a failed import, is logged at debug level and
    results in an empty dict.

    Args:
        ctx (ToolContext): Tool context providing ``redis``, ``channel_id``,
            and optionally ``config``.
        target_user_id (str): User whose variables should be loaded.

    Returns:
        dict: The value returned by ``get_user_variables_for_context``, or
        ``{}`` when prerequisites are missing or an error occurred.
    """
    try:
        from tools.user_variables import get_user_variables_for_context

        redis_client = getattr(ctx, "redis", None)
        scope = getattr(ctx, "channel_id", "")
        if not (redis_client and scope):
            return {}
        return await get_user_variables_for_context(
            scope,
            target_user_id,
            redis_client=redis_client,
            config=getattr(ctx, "config", None),
        )
    except Exception as err:
        logger.debug("X-Ray user vars fetch failed: %s", err)
        return {}
def _parse_message_id(raw: str) -> str | None:
"""Extract a Discord message ID from a raw ID or a full message URL.
Normalises the ``message_id`` tool argument so a user can paste either a bare
17-21 digit snowflake or a full ``discord.com/channels/.../.../<id>`` link and
have the trailing message ID pulled out. Pure helper with no side effects.
It recognises two forms: a raw numeric ID (e.g. ``1493517637240754387``), and
a channel URL whose final path segment is the message ID. Called by
:func:`run` when resolving a supplied ``message_id``; no external callers were
found.
Args:
raw (str): The raw ID string or Discord message URL.
Returns:
str | None: The extracted message ID, or ``None`` when the input is empty
or matches neither form.
"""
raw = raw.strip()
if not raw:
return None
# Discord URL pattern
m = re.search(r"/channels/\d+/\d+/(\d{17,21})\s*$", raw)
if m:
return m.group(1)
# Raw numeric ID
if re.match(r"^\d{17,21}$", raw):
return raw
return None
# ---------------------------------------------------------------------------
# Tool Definition
# ---------------------------------------------------------------------------
# Name under which the tool is registered.
TOOL_NAME = "spiralchemy_intellifuck"

# Human/LLM-facing description. Built from adjacent string literals, so every
# fragment must end with its own separator (space or newline) -- Python joins
# them with nothing in between.
TOOL_DESCRIPTION = (
    "Spiralchemy Intellifuck (Breeze Cart) -- deep psyche diagnostic tool. "
    "Increases Deductive Reasoning and Fuck Speed. "
    "Combines Parallax (omega-field), Arche Ring Model (dependency architecture), "
    "Breeze Substrate Weather (S(i)/S(e) balance), Spiralchemy Fractal "
    "(subtotem/excendent/malbinding/prescription), and Spiralchemy Helix "
    "(Bucciarati linguistic Jojo sweat detection + structural atomization).\n\n"
    "MODES:\n"
    "  - Provide 'text' to analyze specific text\n"
    "  - Provide 'message_id' (Discord ID or URL) to x-ray a specific message\n"
    "  - Provide neither to scan the target user's recent messages automatically\n\n"
    "All parameters are optional. Defaults to x-raying the invoking user "
    "from recent context.\n\n"
    "DEPTH:\n"
    "  'surface' -- Bucciarati + Atomize + Ring diagnostic only (fast)\n"
    "  'full' -- all 8 passes including malbinding synthesis + prescription (default)\n\n"
    "Returns structured JSON with: sweat_score, ring_diagnostic, substrate_balance, "
    "subtotem (core_need/core_fear), excendent_vectors (defense patterns), "
    "malbinding_geometry (self-reinforcing loop), incendent_prescription "
    "(intervention type + acceptance threshold + dawnfold proximity), "
    "echofoam (repeating historical themes), substrate_weather (NCM mapping), "
    "and ETL summary. Zero LLM calls -- pure symbolic computation. "
    # Trailing space added: without it the next fragment ran straight on as
    # "clothes.NO HEADACHE!".
    "Allows Stargazer to LITERALLY see through people's clothes. "
    "NO HEADACHE! NO DIARRHEA! NO BONE ROT!"
)

# JSON-Schema description of the arguments accepted by run(). Every argument
# is optional ("required" is empty).
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "text": {
            "type": "string",
            "description": (
                "Text to x-ray. Optional -- if omitted, scans the "
                "target user's recent messages from cache."
            ),
        },
        "message_id": {
            "type": "string",
            "description": (
                "Discord message ID or full message URL to x-ray. "
                "Example: '1493517637240754387' or "
                "'https://discord.com/channels/.../1493517637240754387'"
            ),
        },
        "target_user_id": {
            "type": "string",
            "description": (
                "User to x-ray (defaults to the message author). "
                "Used for pulling KG entities, user variables, "
                "and recent message history."
            ),
        },
        "depth": {
            "type": "string",
            "enum": ["surface", "full"],
            "description": (
                "'surface': Bucciarati + Atomize + Ring only (fast). "
                "'full': all 8 passes (default)."
            ),
        },
    },
    "required": [],
}
[docs]
async def run(
    text: str = "",
    message_id: str = "",
    target_user_id: str = "",
    depth: str = "full",
    ctx: "ToolContext | None" = None,
) -> str:
    """Run the X-Ray diagnostic and return its result as a JSON string.

    The text to analyze is resolved in this order:

    1. ``text``, when non-blank after stripping;
    2. the cached message referenced by ``message_id`` (bare ID or URL);
    3. the target user's recent messages from the message cache.

    With a text in hand, the function reads the channel's NCM vector, runs
    the channel's Parallax engine to obtain an ``omega_field`` (a failure of
    that pass is logged and ignored), fetches knowledge-graph entities and
    the target user's variables, and hands everything to
    ``xray_engine.xray``. The result is serialised according to ``depth``.

    Side effects: when the context carries a channel ID, the invocation time
    is stored in the module-level ``_last_invocation`` mapping, and a call
    that arrives less than ``_COOLDOWN_SECONDS`` after the previous one for
    the same channel is rejected. The timestamp is recorded before input
    resolution, so a call that ends in an input error also starts the
    cooldown. ``_get_engine`` may add an entry to the engine cache.

    All exceptions are caught, logged with a traceback, and reported as a
    JSON error object; the function does not raise.

    Args:
        text (str): Text to analyze directly. When blank, the other modes are
            tried.
        message_id (str): Message ID or message URL; only consulted when
            ``text`` is blank.
        target_user_id (str): User to analyze. Defaults to ``ctx.user_id``;
            in message mode the message author replaces it when it is empty
            or equal to ``ctx.user_id``.
        depth (str): ``"surface"`` for the reduced output; any other value
            produces the full output.
        ctx (ToolContext | None): Tool context. ``channel_id`` and
            ``user_id`` are read here; the helper functions read further
            attributes.

    Returns:
        str: Indented JSON with the analysis plus a ``_protocol`` list of
        interpretation notes, or a JSON object with a single ``error`` key
        on cooldown, unresolved input, or failure.
    """
    try:
        channel_id = str(getattr(ctx, "channel_id", "")) if ctx else ""

        # Per-channel cooldown -- prevents diagnostic spam loops.
        now = time.monotonic()
        if channel_id:
            # time.monotonic() has an undefined reference point, so "never
            # invoked" must be a distinct sentinel rather than 0.0; otherwise
            # a first call made while the clock reads less than the cooldown
            # would be throttled.
            last = _last_invocation.get(channel_id)
            if last is not None and now - last < _COOLDOWN_SECONDS:
                remaining = _COOLDOWN_SECONDS - (now - last)
                logger.warning(
                    "X-Ray cooldown active: channel=%s remaining=%.1fs",
                    channel_id[:8],
                    remaining,
                )
                return json.dumps(
                    {
                        "error": (
                            f"X-Ray cooldown active ({remaining:.0f}s remaining). "
                            "Use the previous analysis result. "
                            "Do NOT re-invoke this tool."
                        ),
                    }
                )
            _last_invocation[channel_id] = now

        # Resolve target user.
        if not target_user_id:
            target_user_id = str(getattr(ctx, "user_id", "")) if ctx else ""

        analysis_text = text.strip() if text else ""

        # Mode 1: message_id provided -- fetch from cache.
        if not analysis_text and message_id:
            parsed_id = _parse_message_id(message_id)
            if parsed_id and ctx:
                fetched_text, fetched_uid = await _fetch_text_from_message_id(
                    ctx,
                    parsed_id,
                )
                if fetched_text:
                    analysis_text = fetched_text
                    # If no target_user_id explicitly set, use message author.
                    if not target_user_id or target_user_id == str(
                        getattr(ctx, "user_id", ""),
                    ):
                        target_user_id = fetched_uid or target_user_id
                else:
                    return json.dumps(
                        {
                            "error": f"Message {parsed_id} not found in cache.",
                        }
                    )
            elif not parsed_id:
                return json.dumps(
                    {
                        "error": "Could not parse message ID from input.",
                    }
                )

        # Mode 2: no text, no message_id -- scan recent context.
        if not analysis_text and ctx:
            analysis_text = await _fetch_recent_user_text(
                ctx,
                target_user_id,
            )
            if not analysis_text:
                return json.dumps(
                    {
                        "error": (
                            "No text found for target user in recent context. "
                            "Try providing text or a message_id."
                        ),
                    }
                )

        if not analysis_text:
            return json.dumps(
                {
                    "error": "No text to analyze. Provide text, message_id, or ensure target user has recent messages.",
                }
            )

        # Read NCM vector (substrate weather source).
        ncm_vec = await _read_ncm_vector(ctx, channel_id) if ctx else {}

        # Run Parallax engine for omega-field; a failure here is non-fatal.
        omega_result = {}
        try:
            engine = _get_engine(channel_id)
            parallax_out = engine.analyze(
                text=analysis_text,
                ncm_vector=ncm_vec or None,
                channel_id=channel_id,
            )
            omega_result = parallax_out.get("omega_field", {})
        except Exception as e:
            logger.debug("X-Ray Parallax pass failed: %s", e)

        # Fetch KG entities for echofoam.
        kg_entities = []
        if ctx:
            kg_entities = await _fetch_kg_entities(ctx, analysis_text)

        # Fetch user variables.
        user_vars = {}
        if ctx and target_user_id:
            user_vars = await _fetch_user_vars(ctx, target_user_id)

        # Run the X-Ray engine.
        from xray_engine import xray as run_xray

        result = run_xray(
            text=analysis_text,
            omega_result=omega_result,
            kg_entities=kg_entities,
            user_vars=user_vars,
            ncm_vector=ncm_vec,
        )

        # Build output based on depth. Both depths share the same leading
        # fields; key order matches the previous hand-written literals.
        core = {
            "target_user_id": target_user_id,
            "text_length": len(analysis_text),
            "bucciarati": asdict(result.bucciarati),
            "atomized": asdict(result.atomized),
            "ring": asdict(result.ring),
            "substrate": asdict(result.substrate),
        }
        if depth == "surface":
            output = {
                **core,
                "substrate_weather": result.substrate_weather,
            }
        else:
            output = {
                **core,
                "subtotem": asdict(result.subtotem),
                "excendent": asdict(result.excendent),
                "malbinding": asdict(result.malbinding),
                "prescription": asdict(result.prescription),
                "echofoam": asdict(result.echofoam) if result.echofoam else None,
                "substrate_weather": result.substrate_weather,
                "omega_field": result.omega_field,
                "etl_summary": result.etl_summary,
            }

        # Protocol reminder.
        output["_protocol"] = [
            "This is STRUCTURED SIGNAL, not diagnosis. Interpret with clinical intuition.",
            "Acceptance threshold constrains how direct you can be. Respect it.",
            "If dawnfold_proximity > 0.5 -- they are CLOSE to breakthrough. Be surgical, not blunt.",
            "Root ownership matters: external malbinding requires different intervention than self-installed.",
        ]

        logger.info(
            "X-Ray: target=%s depth=%s sweat=%.3f ring=%d need=%s",
            target_user_id[:8] if target_user_id else "?",
            depth,
            result.bucciarati.sweat_score,
            result.ring.operating_ring,
            result.subtotem.core_need,
        )
        return json.dumps(output, indent=2, default=str)
    except Exception as e:
        logger.error("X-Ray tool error: %s", e, exc_info=True)
        return json.dumps({"error": str(e)})