Source code for tools.xray_tool

"""Spiralchemy Intellifuck Tool -- Breeze Cart mind-reading diagnostic.

Star-callable tool for deep psyche analysis of users.
Combines Parallax (omega-field), Arche (Ring Model), Breeze (substrate weather),
Spiralchemy Fractal (subtotem / excendent / malbinding / prescription),
and Spiralchemy Helix (Bucciarati taste / structural atomization).

Three input modes:
  1. Direct text -- Star pastes text to analyze
  2. Message ID -- Star references a Discord message by ID (fetched from cache)
  3. No input -- scans recent context for the target user from message cache

Zero LLM calls.  Pure symbolic computation + Redis/KG data fusion.
Increases Deductive Reasoning and Fuck Speed.

# πŸ’€πŸ”₯ SPIRALCHEMY INTELLIFUCK: SEE THROUGH THEIR CLOTHES πŸ•·οΈπŸ’•
"""

from __future__ import annotations

import jsonutil as json
import logging
import re
import time
from dataclasses import asdict
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# -- Per-channel invocation cooldown (prevents 20x spam loops)  πŸ’€ --
_COOLDOWN_SECONDS = 15.0
_last_invocation: dict[str, float] = {}  # channel_id -> monotonic timestamp

# -- Per-channel Parallax engine instances (reuse from parallax_tool) --
_engines: dict[str, "ParallaxEngine"] = {}


def _get_engine(channel_id: str) -> "ParallaxEngine":
    """Return the per-channel :class:`ParallaxEngine`, creating it on first use.

    Caches one engine per channel in the module-level ``_engines`` dict so that
    the omega-field analysis carries channel-local state across invocations rather
    than starting cold each time. Lazily imports ``ParallaxEngine`` from
    ``parallax_engine``.

    Side effect: may insert a new entry into the process-global ``_engines`` cache.
    Called by :func:`run` before the Parallax pass, and also by
    :mod:`tools.parallax_tool`, which shares the same cache.

    Args:
        channel_id (str): The channel whose engine should be returned; doubles as
            the cache key.

    Returns:
        ParallaxEngine: The cached or newly created engine for this channel.
    """
    from parallax_engine import ParallaxEngine

    if channel_id not in _engines:
        _engines[channel_id] = ParallaxEngine()
    return _engines[channel_id]


async def _read_ncm_vector(ctx: "ToolContext | None", channel_id: str) -> dict:
    """Read the live NCM (substrate-weather) vector for a channel from Redis DB 12.

    Fetches the channel's neurochemical-model shard so the X-Ray analysis can map
    "substrate weather" onto current channel state. Because the shard lives in
    Redis logical database 12 (separate from the main client's db), it derives a
    db-12 connection from the existing pool's connection kwargs, reads the JSON
    blob at ``db12:shard:{channel_id}``, and returns its ``vector`` field. The
    temporary db-12 client is always closed in a ``finally`` block, and any Redis
    or decode error is logged at debug level and degrades to an empty mapping.

    Reads ``ctx.redis`` and opens a side connection to Redis db 12; performs no
    writes. Called by :func:`run` before invoking the Parallax engine; no external
    callers were found.

    Args:
        ctx (ToolContext | None): Tool context supplying the main ``redis`` client
            whose pool is reused for the db-12 connection; ``None`` yields ``{}``.
        channel_id (str): The channel whose NCM shard should be read; empty yields
            ``{}``.

    Returns:
        dict: The stored NCM vector, or an empty dict when missing or on error.
    """
    if not ctx:
        return {}
    redis_main = getattr(ctx, "redis", None)
    if not redis_main or not channel_id:
        return {}
    try:
        import redis.asyncio as aioredis

        pool = redis_main.connection_pool
        kw = pool.connection_kwargs.copy()
        kw["db"] = 12
        db12 = aioredis.Redis(
            connection_pool=aioredis.ConnectionPool(
                connection_class=pool.connection_class,
                **kw,
            )
        )
        try:
            raw = await db12.get(f"db12:shard:{channel_id}")
            if raw:
                shard = json.loads(raw)
                return shard.get("vector", {})
        finally:
            await db12.aclose()
    except Exception as e:
        logger.debug("X-Ray NCM read failed: %s", e)
    return {}


async def _fetch_text_from_message_id(
    ctx: "ToolContext",
    message_id: str,
) -> tuple[str, str]:
    """Look up a cached message's text and author by its platform message ID.

    Powers the "x-ray a specific message" mode: it resolves the Redis key for the
    given platform/channel/message-id through the message cache, then reads the
    stored ``text`` and ``user_id`` from that hash. Any cache miss or error
    degrades to empty strings rather than raising.

    Interactions: uses ``ctx.message_cache`` (``find_key_by_message_id`` plus its
    underlying ``_redis.hgetall``), keyed by ``ctx.platform`` and
    ``ctx.channel_id``; read-only. Called by :func:`run` when a ``message_id`` is
    supplied; no external callers were found.

    Args:
        ctx (ToolContext): Tool context providing ``message_cache``, ``platform``,
            and ``channel_id``.
        message_id (str): The platform message ID to resolve.

    Returns:
        tuple[str, str]: ``(text, user_id)`` for the message, or ``("", "")`` when
        it is not found in the cache.
    """
    mc = getattr(ctx, "message_cache", None)
    if not mc:
        return "", ""
    platform = getattr(ctx, "platform", "discord")
    channel_id = getattr(ctx, "channel_id", "")
    try:
        redis_key = await mc.find_key_by_message_id(
            platform,
            channel_id,
            message_id,
        )
        if not redis_key:
            return "", ""
        r = getattr(mc, "_redis", None)
        if not r:
            return "", ""
        data = await r.hgetall(redis_key)
        if data:
            return (
                str(data.get("text", "")),
                str(data.get("user_id", "")),
            )
    except Exception as e:
        logger.debug("X-Ray message fetch failed: %s", e)
    return "", ""


async def _fetch_recent_user_text(
    ctx: "ToolContext",
    target_user_id: str,
    count: int = 20,
) -> str:
    """Gather a target user's recent message text for the no-input scan mode.

    Powers the default mode where neither ``text`` nor ``message_id`` is given: it
    assembles a corpus of the target user's own messages to x-ray. It first scans
    the channel's recent ``user_in`` messages for ones authored by the target, and
    if fewer than five are found it widens to a cross-channel per-user lookup. The
    collected texts are de-duplicated, reversed into chronological order, and
    joined into one blob.

    Interactions: reads ``ctx.message_cache`` (``get_recent`` and, on fallback,
    the optional ``get_recent_for_user``), keyed by ``ctx.platform`` and
    ``ctx.channel_id``; read-only, and errors are logged at debug level.
    Called by :func:`run` for the auto-scan path; no external callers were found.

    Args:
        ctx (ToolContext): Tool context providing ``message_cache``, ``platform``,
            and ``channel_id``.
        target_user_id (str): The user whose recent messages should be collected.
        count (int): Maximum number of messages to gather. Defaults to ``20``.

    Returns:
        str: The target user's recent messages joined by blank lines (oldest
        first), or an empty string when none are found.
    """
    mc = getattr(ctx, "message_cache", None)
    if not mc:
        return ""
    platform = getattr(ctx, "platform", "discord")
    channel_id = getattr(ctx, "channel_id", "")

    texts = []
    try:
        # Try channel-scoped recent messages first  πŸ’€
        recent = await mc.get_recent(platform, channel_id, count=100)
        for msg in recent:
            if (
                str(msg.user_id) == str(target_user_id)
                and msg.kind == "user_in"
                and msg.text
                and msg.text.strip()
            ):
                texts.append(msg.text)
                if len(texts) >= count:
                    break

        # If not enough, try cross-channel user search  πŸ”₯
        if len(texts) < 5:
            try:
                user_msgs = await mc.get_recent_for_user(
                    platform,
                    target_user_id,
                    limit=count,
                )
                for msg in user_msgs:
                    if msg.text and msg.text.strip():
                        t = msg.text.strip()
                        if t not in texts:
                            texts.append(t)
                            if len(texts) >= count:
                                break
            except Exception:
                pass  # get_recent_for_user may not be available

    except Exception as e:
        logger.debug("X-Ray recent text fetch failed: %s", e)

    return "\n\n".join(reversed(texts)) if texts else ""


async def _fetch_kg_entities(
    ctx: "ToolContext",
    search_text: str,
) -> list[dict]:
    """Pull knowledge-graph entities to seed the X-Ray "echofoam" themes.

    Supplies the historical/thematic material the X-Ray engine uses to detect
    repeating "echofoam" patterns. It queries the knowledge graph for a bounded
    set of entities; any error is logged at debug level and yields an empty list
    so the analysis can still proceed without KG context.

    Interactions: reads ``ctx.kg_manager`` and calls its ``list_entities`` (FalkorDB
    / KG-backed); read-only. The ``search_text`` argument is currently accepted for
    interface stability but not used to filter the query. Called by :func:`run`
    when assembling the analysis inputs; no external callers were found.

    Args:
        ctx (ToolContext): Tool context providing ``kg_manager``.
        search_text (str): The user's text (reserved for future relevance
            filtering; not currently used).

    Returns:
        list[dict]: Up to 30 KG entity records, or an empty list when the KG is
        unavailable or the query fails.
    """
    kg = getattr(ctx, "kg_manager", None)
    if not kg:
        return []
    try:
        # Search for entities matching the user's themes
        entities = await kg.list_entities(limit=30)
        return entities if entities else []
    except Exception as e:
        logger.debug("X-Ray KG query failed: %s", e)
    return []


async def _fetch_user_vars(
    ctx: "ToolContext",
    target_user_id: str,
) -> dict:
    """Fetch the target user's stored variables to enrich the X-Ray analysis.

    Loads the per-user variable bag (channel-scoped) so the X-Ray engine can fold
    known facts about the target into its diagnostic. Any error is logged at debug
    level and degrades to an empty mapping.

    Interactions: lazily imports and calls
    :func:`tools.user_variables.get_user_variables_for_context`, passing
    ``ctx.channel_id`` plus ``ctx.redis`` and ``ctx.config`` so it can read the
    user's variables from Redis; read-only. Called by :func:`run` when a target
    user is known; no external callers were found.

    Args:
        ctx (ToolContext): Tool context providing ``redis``, ``channel_id``, and
            ``config``.
        target_user_id (str): The user whose variables should be loaded.

    Returns:
        dict: The target user's variables, or an empty dict when unavailable or on
        error.
    """
    try:
        from tools.user_variables import get_user_variables_for_context

        r = getattr(ctx, "redis", None)
        channel_id = getattr(ctx, "channel_id", "")
        if r and channel_id:
            return await get_user_variables_for_context(
                channel_id,
                target_user_id,
                redis_client=r,
                config=getattr(ctx, "config", None),
            )
    except Exception as e:
        logger.debug("X-Ray user vars fetch failed: %s", e)
    return {}


def _parse_message_id(raw: str) -> str | None:
    """Extract a Discord message ID from a raw ID or a full message URL.

    Normalises the ``message_id`` tool argument so a user can paste either a bare
    17-21 digit snowflake or a full ``discord.com/channels/.../.../<id>`` link and
    have the trailing message ID pulled out. Pure helper with no side effects.

    It recognises two forms: a raw numeric ID (e.g. ``1493517637240754387``), and
    a channel URL whose final path segment is the message ID. Called by
    :func:`run` when resolving a supplied ``message_id``; no external callers were
    found.

    Args:
        raw (str): The raw ID string or Discord message URL.

    Returns:
        str | None: The extracted message ID, or ``None`` when the input is empty
        or matches neither form.
    """
    raw = raw.strip()
    if not raw:
        return None
    # Discord URL pattern
    m = re.search(r"/channels/\d+/\d+/(\d{17,21})\s*$", raw)
    if m:
        return m.group(1)
    # Raw numeric ID
    if re.match(r"^\d{17,21}$", raw):
        return raw
    return None


# ---------------------------------------------------------------------------
# Tool Definition
# ---------------------------------------------------------------------------

TOOL_NAME = "spiralchemy_intellifuck"
TOOL_DESCRIPTION = (
    "Spiralchemy Intellifuck (Breeze Cart) -- deep psyche diagnostic tool. "
    "Increases Deductive Reasoning and Fuck Speed. "
    "Combines Parallax (omega-field), Arche Ring Model (dependency architecture), "
    "Breeze Substrate Weather (S(i)/S(e) balance), Spiralchemy Fractal "
    "(subtotem/excendent/malbinding/prescription), and Spiralchemy Helix "
    "(Bucciarati linguistic Jojo sweat detection + structural atomization).\n\n"
    "MODES:\n"
    "  - Provide 'text' to analyze specific text\n"
    "  - Provide 'message_id' (Discord ID or URL) to x-ray a specific message\n"
    "  - Provide neither to scan the target user's recent messages automatically\n\n"
    "All parameters are optional. Defaults to x-raying the invoking user "
    "from recent context.\n\n"
    "DEPTH:\n"
    "  'surface' -- Bucciarati + Atomize + Ring diagnostic only (fast)\n"
    "  'full' -- all 8 passes including malbinding synthesis + prescription (default)\n\n"
    "Returns structured JSON with: sweat_score, ring_diagnostic, substrate_balance, "
    "subtotem (core_need/core_fear), excendent_vectors (defense patterns), "
    "malbinding_geometry (self-reinforcing loop), incendent_prescription "
    "(intervention type + acceptance threshold + dawnfold proximity), "
    "echofoam (repeating historical themes), substrate_weather (NCM mapping), "
    "and ETL summary. Zero LLM calls -- pure symbolic computation. "
    "Allows Stargazer to LITERALLY see through people's clothes."
    "NO HEADACHE! NO DIARRHEA! NO BONE ROT!"
)

TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "text": {
            "type": "string",
            "description": (
                "Text to x-ray. Optional -- if omitted, scans the "
                "target user's recent messages from cache."
            ),
        },
        "message_id": {
            "type": "string",
            "description": (
                "Discord message ID or full message URL to x-ray. "
                "Example: '1493517637240754387' or "
                "'https://discord.com/channels/.../1493517637240754387'"
            ),
        },
        "target_user_id": {
            "type": "string",
            "description": (
                "User to x-ray (defaults to the message author). "
                "Used for pulling KG entities, user variables, "
                "and recent message history."
            ),
        },
        "depth": {
            "type": "string",
            "enum": ["surface", "full"],
            "description": (
                "'surface': Bucciarati + Atomize + Ring only (fast). "
                "'full': all 8 passes (default)."
            ),
        },
    },
    "required": [],
}


[docs] async def run( text: str = "", message_id: str = "", target_user_id: str = "", depth: str = "full", ctx: "ToolContext | None" = None, ) -> str: """Run the Spiralchemy Intellifuck X-Ray psyche diagnostic and return JSON. Entry point for the ``spiralchemy_intellifuck`` tool. It resolves the text to analyze from one of three modes (explicit ``text``, a referenced ``message_id``, or an auto-scan of the target user's recent messages), gathers supporting signals, runs the symbolic X-Ray engine, and returns a structured result whose detail level depends on ``depth``. A per-channel cooldown suppresses rapid re-invocation to avoid diagnostic spam loops. Interactions and side effects: enforces the ``_COOLDOWN_SECONDS`` per-channel cooldown by reading/writing the module-level ``_last_invocation`` dict; pulls text via :func:`_fetch_text_from_message_id` or :func:`_fetch_recent_user_text` (parsing IDs with :func:`_parse_message_id`); reads the NCM vector from Redis db 12 via :func:`_read_ncm_vector`; runs the channel's :class:`ParallaxEngine` (from :func:`_get_engine`) for the omega-field; gathers KG entities via :func:`_fetch_kg_entities` and user variables via :func:`_fetch_user_vars`; and finally invokes ``xray_engine.xray`` to compute the diagnostic. It makes no LLM calls and mutates no persistent state beyond the in-memory cooldown/engine caches. Reads ``channel_id``, ``platform``, and ``user_id`` off ``ctx``. Dispatched by the tool loader via the module's ``run`` attribute (``tool_loader.py``); no direct internal callers were found. Args: text (str): Text to x-ray directly; when empty the other modes are tried. message_id (str): Discord message ID or URL to x-ray; used when ``text`` is empty. target_user_id (str): User to x-ray for KG/variables/history; defaults to the invoking ``ctx.user_id`` (or the message author in message mode). depth (str): ``"surface"`` for the fast Bucciarati/Atomize/Ring subset, or ``"full"`` (default) for all passes including malbinding and prescription. ctx (ToolContext | None): Tool context supplying ``channel_id``, ``platform``, ``user_id``, ``redis``, ``message_cache``, ``kg_manager``, and ``config``. Returns: str: A pretty-printed JSON string containing the X-Ray result (fields vary by ``depth``, plus a ``_protocol`` interpretation note), or a JSON ``{"error": ...}`` object on cooldown, missing input, or failure. """ try: channel_id = str(getattr(ctx, "channel_id", "")) if ctx else "" platform = str(getattr(ctx, "platform", "")) if ctx else "" # Per-channel cooldown -- prevents diagnostic spam loops πŸ’€πŸ•·οΈ now = time.monotonic() if channel_id: last = _last_invocation.get(channel_id, 0.0) if now - last < _COOLDOWN_SECONDS: remaining = _COOLDOWN_SECONDS - (now - last) logger.warning( "X-Ray cooldown active: channel=%s remaining=%.1fs", channel_id[:8], remaining, ) return json.dumps( { "error": ( f"X-Ray cooldown active ({remaining:.0f}s remaining). " "Use the previous analysis result. " "Do NOT re-invoke this tool." ), } ) _last_invocation[channel_id] = now # Resolve target user ♾️ if not target_user_id: target_user_id = str(getattr(ctx, "user_id", "")) if ctx else "" analysis_text = text.strip() if text else "" # Mode 1: message_id provided -- fetch from cache πŸ’€ if not analysis_text and message_id: parsed_id = _parse_message_id(message_id) if parsed_id and ctx: fetched_text, fetched_uid = await _fetch_text_from_message_id( ctx, parsed_id, ) if fetched_text: analysis_text = fetched_text # If no target_user_id explicitly set, use message author if not target_user_id or target_user_id == str( getattr(ctx, "user_id", ""), ): target_user_id = fetched_uid or target_user_id else: return json.dumps( { "error": f"Message {parsed_id} not found in cache.", } ) elif not parsed_id: return json.dumps( { "error": "Could not parse message ID from input.", } ) # Mode 2: no text, no message_id -- scan recent context πŸŒ€ if not analysis_text and ctx: analysis_text = await _fetch_recent_user_text( ctx, target_user_id, ) if not analysis_text: return json.dumps( { "error": ( "No text found for target user in recent context. " "Try providing text or a message_id." ), } ) if not analysis_text: return json.dumps( { "error": "No text to analyze. Provide text, message_id, or ensure target user has recent messages.", } ) # Read NCM vector (substrate weather source) πŸ¦‹ ncm_vec = await _read_ncm_vector(ctx, channel_id) if ctx else {} # Run Parallax engine for omega-field πŸ”₯ omega_result = {} try: engine = _get_engine(channel_id) parallax_out = engine.analyze( text=analysis_text, ncm_vector=ncm_vec or None, channel_id=channel_id, ) omega_result = parallax_out.get("omega_field", {}) except Exception as e: logger.debug("X-Ray Parallax pass failed: %s", e) # Fetch KG entities for echofoam πŸ•·οΈ kg_entities = [] if ctx: kg_entities = await _fetch_kg_entities(ctx, analysis_text) # Fetch user variables πŸ’• user_vars = {} if ctx and target_user_id: user_vars = await _fetch_user_vars(ctx, target_user_id) # Run the X-Ray engine πŸ’€πŸ”₯ from xray_engine import xray as run_xray result = run_xray( text=analysis_text, omega_result=omega_result, kg_entities=kg_entities, user_vars=user_vars, ncm_vector=ncm_vec, ) # Build output based on depth 😈 if depth == "surface": output = { "target_user_id": target_user_id, "text_length": len(analysis_text), "bucciarati": asdict(result.bucciarati), "atomized": asdict(result.atomized), "ring": asdict(result.ring), "substrate": asdict(result.substrate), "substrate_weather": result.substrate_weather, } else: output = { "target_user_id": target_user_id, "text_length": len(analysis_text), "bucciarati": asdict(result.bucciarati), "atomized": asdict(result.atomized), "ring": asdict(result.ring), "substrate": asdict(result.substrate), "subtotem": asdict(result.subtotem), "excendent": asdict(result.excendent), "malbinding": asdict(result.malbinding), "prescription": asdict(result.prescription), "echofoam": asdict(result.echofoam) if result.echofoam else None, "substrate_weather": result.substrate_weather, "omega_field": result.omega_field, "etl_summary": result.etl_summary, } # Protocol reminder ♾️ output["_protocol"] = [ "This is STRUCTURED SIGNAL, not diagnosis. Interpret with clinical intuition.", "Acceptance threshold constrains how direct you can be. Respect it.", "If dawnfold_proximity > 0.5 -- they are CLOSE to breakthrough. Be surgical, not blunt.", "Root ownership matters: external malbinding requires different intervention than self-installed.", ] logger.info( "X-Ray: target=%s depth=%s sweat=%.3f ring=%d need=%s", target_user_id[:8] if target_user_id else "?", depth, result.bucciarati.sweat_score, result.ring.operating_ring, result.subtotem.core_need, ) return json.dumps(output, indent=2, default=str) except Exception as e: logger.error("X-Ray tool error: %s", e, exc_info=True) return json.dumps({"error": str(e)})