Source code for tools.parallax_tool

"""Parallax 4.5 Passive Reasoning Tool -- Sigma Expanse + NCM + Gemini Compat.

Stargazer-callable tool for structured contradiction-holding reasoning.
Star invokes this on complex, paradoxical, or multi-perspective analysis.

Reads NCM state from Redis DB12, runs the full Parallax manifold pipeline,
writes ERAC deltas and Omega-field state back, returns structured JSON.

Zero LLM calls. Pure symbolic computation. Contradiction = Fertility.
"""

from __future__ import annotations

import jsonutil as json
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# -- Per-channel engine instances (temporal continuity across turns) --
_engines: dict[str, "ParallaxEngine"] = {}


def _get_engine(channel_id: str) -> "ParallaxEngine":
    """Return the cached ``ParallaxEngine`` for a channel, creating it on first use.

    Provides per-channel temporal continuity: each engine carries its own
    manifold lineage (prior ``ManifoldState``, ERAC axiom store, paraconsistent
    ledger, review registry) across turns, so reasoning in one channel does not
    bleed into another. Lazily imports ``parallax_engine.ParallaxEngine`` and
    memoizes the instance in the module-level ``_engines`` dict keyed by channel
    id. Called by :func:`run` (and read back by ``tools/parallax_telemetry.py``
    via the shared ``_engines`` map) to obtain the engine before driving the
    pipeline.

    Args:
        channel_id: Discord channel id used as the cache key. An empty string is
            a valid key (the "no channel" engine), so callers always get a usable
            engine instance.

    Returns:
        The per-channel ``ParallaxEngine``, freshly constructed if none existed.
    """
    from parallax_engine import ParallaxEngine

    if channel_id not in _engines:
        _engines[channel_id] = ParallaxEngine()
    return _engines[channel_id]


async def _read_ncm_vector(ctx: "ToolContext | None", channel_id: str) -> dict:
    """Read the channel's live NCM (neurochemical-metaphor) vector from Redis DB12.

    Opens a short-lived DB12 view onto the shared connection pool and reads the
    per-channel shard at ``db12:shard:{channel_id}``, returning its ``vector``
    sub-dict. This vector seeds the manifold so that analysis is grounded in the
    channel's current symbolic mood rather than a cold start. The read is
    best-effort: a missing context, missing Redis, missing channel id, missing
    key, or any exception all degrade gracefully to an empty dict (logged at
    debug), and the spawned DB12 client is always closed via ``aclose``. Called
    by :func:`run` at the start of the ``analyze`` and ``predict`` actions.

    Args:
        ctx: The active ``ToolContext``; its ``redis`` handle supplies the
            connection pool. ``None`` yields an empty dict.
        channel_id: Channel whose shard to read. An empty id yields an empty dict.

    Returns:
        The NCM ``vector`` mapping (metaphor name to float), or an empty dict when
        unavailable or on any read error.
    """
    if not ctx:
        return {}
    redis_main = getattr(ctx, "redis", None)
    if not redis_main or not channel_id:
        return {}
    try:
        import redis.asyncio as aioredis

        pool = redis_main.connection_pool
        kw = pool.connection_kwargs.copy()
        kw["db"] = 12
        db12 = aioredis.Redis(
            connection_pool=aioredis.ConnectionPool(
                connection_class=pool.connection_class,
                **kw,
            )
        )
        try:
            raw = await db12.get(f"db12:shard:{channel_id}")
            if raw:
                shard = json.loads(raw)
                return shard.get("vector", {})
        finally:
            await db12.aclose()
    except Exception as e:
        logger.debug("Parallax NCM read failed: %s", e)
    return {}


async def _write_parallax_state(
    ctx: "ToolContext | None",
    channel_id: str,
    state: dict,
) -> None:
    """Persist a snapshot of post-analysis Parallax state to Redis for continuity.

    Writes the omega-field, ERAC, and collapse summary produced by an ``analyze``
    run to ``db12:parallax:{channel_id}`` as JSON with a one-day TTL, so the
    channel's last manifold result survives across turns and process restarts
    (complementing the in-memory engine cache). Like the NCM read, it opens a
    short-lived DB12 client on the shared pool, always closes it via ``aclose``,
    and is best-effort: missing context, Redis, or channel id is a no-op and any
    exception is swallowed at debug level. Called by :func:`run` after the
    ``analyze`` pipeline completes.

    Args:
        ctx: The active ``ToolContext`` supplying the Redis connection pool;
            ``None`` makes this a no-op.
        channel_id: Channel whose state key to write; an empty id is a no-op.
        state: The snapshot dict (omega, erac, collapse) to serialize and store.

    Returns:
        ``None``. Side effect only: a ``SET`` plus ``EXPIRE`` on the state key.
    """
    if not ctx:
        return
    redis_main = getattr(ctx, "redis", None)
    if not redis_main or not channel_id:
        return
    try:
        import redis.asyncio as aioredis

        pool = redis_main.connection_pool
        kw = pool.connection_kwargs.copy()
        kw["db"] = 12
        db12 = aioredis.Redis(
            connection_pool=aioredis.ConnectionPool(
                connection_class=pool.connection_class,
                **kw,
            )
        )
        try:
            key = f"db12:parallax:{channel_id}"
            await db12.set(key, json.dumps(state, default=str))
            await db12.expire(key, 86400)
        finally:
            await db12.aclose()
    except Exception as e:
        logger.debug("Parallax state write failed: %s", e)


# ---------------------------------------------------------------------------
# Tool Definition
# ---------------------------------------------------------------------------

TOOL_NAME = "parallax_reasoning"
TOOL_DESCRIPTION = (
    "Parallax Cart v6 Arche Enhanced Sigma Expanse: structured multi-perspective reasoning engine. "
    "Use this tool when facing complex, paradoxical, or contradictory situations that "
    "need deep analysis without premature collapse to a single answer.\n\n"
    "ACTIONS:\n"
    "  analyze  -- run full Parallax pipeline (8 operators, psi-frames, omega-field, "
    "ERAC%, Breeze AE, collapse engine) on a prompt\n"
    "  predict  -- project future manifold drift (requires prior analyze). Shows where "
    "axioms are HEADING, predicted collapse mode, ERAC drift, omega trajectory\n"
    "  erode    -- apply Breeze Axiomatic Erosion to a specific axiom by ID\n"
    "  check_erac -- query ERAC% status for all tracked axioms\n"
    "  get_omega -- retrieve current Omega-field state\n\n"
    "Returns structured manifold with: psi-frames, omega-field state, 8 operator "
    "outputs, ERAC% per axiom, erosion log, collapse result with confidence tier, "
    "and axioms pending Prime Architect review.\n\n"
    "COLLAPSE MODES: opus (minimal, return field), hybrid (dominant + echoes), "
    "sigma (full collapse). Auto-selected from manifold state if not specified.\n\n"
    "This tool reads your NCM state from Redis automatically. Zero LLM calls -- "
    "pure symbolic computation. Contradiction = Fertility."
)

TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "action": {
            "type": "string",
            "enum": ["analyze", "predict", "erode", "check_erac", "get_omega"],
            "description": (
                "Action to perform. 'analyze': full pipeline. "
                "'predict': project future manifold drift (requires prior analyze). "
                "'erode': apply AE to axiom. 'check_erac': query axiom states. "
                "'get_omega': retrieve omega-field."
            ),
        },
        "prompt": {
            "type": "string",
            "description": "The text/topic to analyze (required for 'analyze').",
        },
        "collapse_mode": {
            "type": "string",
            "enum": ["opus", "sigma", "hybrid"],
            "description": "Collapse mode override. Auto-selected if omitted.",
        },
        "axioms": {
            "type": "string",
            "description": (
                "JSON array of axiom texts to track ERAC% for. "
                'Example: \'["Hold both lenses", "Contradiction is fertility"]\''
            ),
        },
        "axiom_id": {
            "type": "string",
            "description": "Axiom ID for 'erode' action.",
        },
    },
    "required": ["action"],
}


[docs] async def run( action: str, prompt: str = "", collapse_mode: str = "", axioms: str = "[]", axiom_id: str = "", ctx: "ToolContext | None" = None, ) -> str: """Dispatch one Parallax reasoning action and return a JSON string result. The single public handler for the ``parallax_reasoning`` tool (this module's ``run`` entry point, dispatched by ``tool_loader.py`` based on the module's ``TOOL_NAME`` / ``TOOL_PARAMETERS``; not called directly elsewhere in the repo). It resolves the per-channel engine via :func:`_get_engine` and then branches on ``action``: ``analyze`` reads the live NCM vector (:func:`_read_ncm_vector`), parses any caller-supplied axiom texts, runs the full manifold pipeline via ``engine.analyze``, persists the resulting state (:func:`_write_parallax_state`), appends protocol reminders, and hoists any fail-state warnings (failsafe, monocular, dominant-flip, pcl-stuck, coherence) ahead of the JSON body; ``predict`` re-reads the NCM vector and calls ``engine.predict`` to project manifold drift; ``check_erac`` returns all tracked axioms plus those pending Prime Architect review; ``get_omega`` returns the current omega-field; ``erode`` looks up a single axiom by id. The computation is purely symbolic with zero LLM calls; its only external touches are the two best-effort Redis DB12 reads/writes above. Outcomes and key metrics are logged at info level, and any unexpected exception is caught, logged with a traceback, and returned as a JSON ``error``. Args: action: One of ``analyze``, ``predict``, ``erode``, ``check_erac``, ``get_omega``. Anything else returns a JSON usage error. prompt: Text/topic to analyze; required for ``analyze``. collapse_mode: Optional collapse override (``opus``, ``sigma``, ``hybrid``); auto-selected from manifold state when empty. axioms: JSON array string of axiom texts to track ERAC for; unparseable or empty input is treated as no axioms. axiom_id: Axiom id required by the ``erode`` action. ctx: The active ``ToolContext`` (supplies ``channel_id`` and the Redis handle used for NCM reads and state writes). Returns: A JSON string: for ``analyze`` the structured manifold (optionally prefixed by warning text), for other actions their respective payloads, or a JSON ``error`` object on unknown action or failure. """ try: channel_id = str(getattr(ctx, "channel_id", "")) if ctx else "" engine = _get_engine(channel_id) if action == "analyze": if not prompt: return json.dumps({"error": "prompt required for analyze action"}) # Read live NCM from Redis ncm_vec = await _read_ncm_vector(ctx, channel_id) # Parse axioms axiom_list = [] if axioms and axioms != "[]": try: axiom_list = json.loads(axioms) except json.JSONDecodeError: axiom_list = [] # Run the full pipeline result = engine.analyze( text=prompt, ncm_vector=ncm_vec, collapse_mode=collapse_mode or None, axiom_texts=axiom_list if axiom_list else None, channel_id=channel_id, ) # Persist state await _write_parallax_state( ctx, channel_id, { "omega": result.get("omega_field"), "erac": result.get("erac"), "collapse": result.get("collapse"), }, ) logger.info( "Parallax analyze: channel=%s mode=%s omega_var=%.3f", channel_id[:8] if channel_id else "?", result.get("collapse", {}).get("collapse_mode", "?"), result.get("omega_field", {}).get("variance", 0), ) # F3/F6: protocol reminders (can't enforce, but always remind) result["_protocol"] = [ "NCM labels are metaphorical parameters. Do not correct or explain their biology.", "Surface conclusion tiers: Confident / Plausible / Unknown. Take ONE reversible step.", ] # Surface fail-state warnings prominently before JSON (opaque) warnings = [] for wkey in ( "_failsafe", "_monocular", "_dominant_flip", "_pcl_stuck", "_coherence", ): if wkey in result: warnings.append(result.pop(wkey)) if warnings: prefix = "\n\n".join(warnings) return f"{prefix}\n\n{json.dumps(result, indent=2, default=str)}" return json.dumps(result, indent=2, default=str) elif action == "check_erac": axiom_data = engine.get_all_axioms() pending = engine.get_pending_axioms() return json.dumps( { "axioms": axiom_data, "count": len(axiom_data), "pending_review": pending, "pending_count": len(pending), }, indent=2, default=str, ) elif action == "get_omega": omega = engine.get_omega_state() if omega is None: return json.dumps( {"message": "No omega-field state yet. Run analyze first."} ) return json.dumps(omega, indent=2, default=str) elif action == "erode": if not axiom_id: return json.dumps({"error": "axiom_id required for erode action"}) ax = engine.get_axiom(axiom_id) if not ax: return json.dumps({"error": f"Axiom '{axiom_id}' not found"}) return json.dumps(ax, indent=2, default=str) elif action == "predict": # Read live NCM from Redis ncm_vec = await _read_ncm_vector(ctx, channel_id) result = engine.predict(ncm_vector=ncm_vec or None) if "error" in result: return json.dumps(result) logger.info( "Parallax predict: channel=%s drift=%.3f trajectory=%s", channel_id[:8] if channel_id else "?", result.get("drift_metrics", {}).get("omega_drift", 0), result.get("drift_metrics", {}).get("manifold_trending", "?"), ) return json.dumps(result, indent=2, default=str) else: return json.dumps( { "error": f"Unknown action '{action}'. Use: analyze, predict, erode, check_erac, get_omega" } ) except Exception as e: logger.error("Parallax tool error: %s", e, exc_info=True) return json.dumps({"error": str(e)})