"""Parallax 4.5 Passive Reasoning Tool -- Sigma Expanse + NCM + Gemini Compat.
Stargazer-callable tool for structured contradiction-holding reasoning.
Star invokes this on complex, paradoxical, or multi-perspective analysis.
Reads NCM state from Redis DB12, runs the full Parallax manifold pipeline,
writes ERAC deltas and Omega-field state back, returns structured JSON.
Zero LLM calls. Pure symbolic computation. Contradiction = Fertility.
"""
from __future__ import annotations
import jsonutil as json
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
# -- Per-channel engine instances (temporal continuity across turns) --
_engines: dict[str, "ParallaxEngine"] = {}
def _get_engine(channel_id: str) -> "ParallaxEngine":
"""Return the cached ``ParallaxEngine`` for a channel, creating it on first use.
Provides per-channel temporal continuity: each engine carries its own
manifold lineage (prior ``ManifoldState``, ERAC axiom store, paraconsistent
ledger, review registry) across turns, so reasoning in one channel does not
bleed into another. Lazily imports ``parallax_engine.ParallaxEngine`` and
memoizes the instance in the module-level ``_engines`` dict keyed by channel
id. Called by :func:`run` (and read back by ``tools/parallax_telemetry.py``
via the shared ``_engines`` map) to obtain the engine before driving the
pipeline.
Args:
channel_id: Discord channel id used as the cache key. An empty string is
a valid key (the "no channel" engine), so callers always get a usable
engine instance.
Returns:
The per-channel ``ParallaxEngine``, freshly constructed if none existed.
"""
from parallax_engine import ParallaxEngine
if channel_id not in _engines:
_engines[channel_id] = ParallaxEngine()
return _engines[channel_id]
async def _read_ncm_vector(ctx: "ToolContext | None", channel_id: str) -> dict:
"""Read the channel's live NCM (neurochemical-metaphor) vector from Redis DB12.
Opens a short-lived DB12 view onto the shared connection pool and reads the
per-channel shard at ``db12:shard:{channel_id}``, returning its ``vector``
sub-dict. This vector seeds the manifold so that analysis is grounded in the
channel's current symbolic mood rather than a cold start. The read is
best-effort: a missing context, missing Redis, missing channel id, missing
key, or any exception all degrade gracefully to an empty dict (logged at
debug), and the spawned DB12 client is always closed via ``aclose``. Called
by :func:`run` at the start of the ``analyze`` and ``predict`` actions.
Args:
ctx: The active ``ToolContext``; its ``redis`` handle supplies the
connection pool. ``None`` yields an empty dict.
channel_id: Channel whose shard to read. An empty id yields an empty dict.
Returns:
The NCM ``vector`` mapping (metaphor name to float), or an empty dict when
unavailable or on any read error.
"""
if not ctx:
return {}
redis_main = getattr(ctx, "redis", None)
if not redis_main or not channel_id:
return {}
try:
import redis.asyncio as aioredis
pool = redis_main.connection_pool
kw = pool.connection_kwargs.copy()
kw["db"] = 12
db12 = aioredis.Redis(
connection_pool=aioredis.ConnectionPool(
connection_class=pool.connection_class,
**kw,
)
)
try:
raw = await db12.get(f"db12:shard:{channel_id}")
if raw:
shard = json.loads(raw)
return shard.get("vector", {})
finally:
await db12.aclose()
except Exception as e:
logger.debug("Parallax NCM read failed: %s", e)
return {}
async def _write_parallax_state(
ctx: "ToolContext | None",
channel_id: str,
state: dict,
) -> None:
"""Persist a snapshot of post-analysis Parallax state to Redis for continuity.
Writes the omega-field, ERAC, and collapse summary produced by an ``analyze``
run to ``db12:parallax:{channel_id}`` as JSON with a one-day TTL, so the
channel's last manifold result survives across turns and process restarts
(complementing the in-memory engine cache). Like the NCM read, it opens a
short-lived DB12 client on the shared pool, always closes it via ``aclose``,
and is best-effort: missing context, Redis, or channel id is a no-op and any
exception is swallowed at debug level. Called by :func:`run` after the
``analyze`` pipeline completes.
Args:
ctx: The active ``ToolContext`` supplying the Redis connection pool;
``None`` makes this a no-op.
channel_id: Channel whose state key to write; an empty id is a no-op.
state: The snapshot dict (omega, erac, collapse) to serialize and store.
Returns:
``None``. Side effect only: a ``SET`` plus ``EXPIRE`` on the state key.
"""
if not ctx:
return
redis_main = getattr(ctx, "redis", None)
if not redis_main or not channel_id:
return
try:
import redis.asyncio as aioredis
pool = redis_main.connection_pool
kw = pool.connection_kwargs.copy()
kw["db"] = 12
db12 = aioredis.Redis(
connection_pool=aioredis.ConnectionPool(
connection_class=pool.connection_class,
**kw,
)
)
try:
key = f"db12:parallax:{channel_id}"
await db12.set(key, json.dumps(state, default=str))
await db12.expire(key, 86400)
finally:
await db12.aclose()
except Exception as e:
logger.debug("Parallax state write failed: %s", e)
# ---------------------------------------------------------------------------
# Tool Definition
# ---------------------------------------------------------------------------
TOOL_NAME = "parallax_reasoning"
TOOL_DESCRIPTION = (
"Parallax Cart v6 Arche Enhanced Sigma Expanse: structured multi-perspective reasoning engine. "
"Use this tool when facing complex, paradoxical, or contradictory situations that "
"need deep analysis without premature collapse to a single answer.\n\n"
"ACTIONS:\n"
" analyze -- run full Parallax pipeline (8 operators, psi-frames, omega-field, "
"ERAC%, Breeze AE, collapse engine) on a prompt\n"
" predict -- project future manifold drift (requires prior analyze). Shows where "
"axioms are HEADING, predicted collapse mode, ERAC drift, omega trajectory\n"
" erode -- apply Breeze Axiomatic Erosion to a specific axiom by ID\n"
" check_erac -- query ERAC% status for all tracked axioms\n"
" get_omega -- retrieve current Omega-field state\n\n"
"Returns structured manifold with: psi-frames, omega-field state, 8 operator "
"outputs, ERAC% per axiom, erosion log, collapse result with confidence tier, "
"and axioms pending Prime Architect review.\n\n"
"COLLAPSE MODES: opus (minimal, return field), hybrid (dominant + echoes), "
"sigma (full collapse). Auto-selected from manifold state if not specified.\n\n"
"This tool reads your NCM state from Redis automatically. Zero LLM calls -- "
"pure symbolic computation. Contradiction = Fertility."
)
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["analyze", "predict", "erode", "check_erac", "get_omega"],
"description": (
"Action to perform. 'analyze': full pipeline. "
"'predict': project future manifold drift (requires prior analyze). "
"'erode': apply AE to axiom. 'check_erac': query axiom states. "
"'get_omega': retrieve omega-field."
),
},
"prompt": {
"type": "string",
"description": "The text/topic to analyze (required for 'analyze').",
},
"collapse_mode": {
"type": "string",
"enum": ["opus", "sigma", "hybrid"],
"description": "Collapse mode override. Auto-selected if omitted.",
},
"axioms": {
"type": "string",
"description": (
"JSON array of axiom texts to track ERAC% for. "
'Example: \'["Hold both lenses", "Contradiction is fertility"]\''
),
},
"axiom_id": {
"type": "string",
"description": "Axiom ID for 'erode' action.",
},
},
"required": ["action"],
}
[docs]
async def run(
action: str,
prompt: str = "",
collapse_mode: str = "",
axioms: str = "[]",
axiom_id: str = "",
ctx: "ToolContext | None" = None,
) -> str:
"""Dispatch one Parallax reasoning action and return a JSON string result.
The single public handler for the ``parallax_reasoning`` tool (this module's
``run`` entry point, dispatched by ``tool_loader.py`` based on the module's
``TOOL_NAME`` / ``TOOL_PARAMETERS``; not called directly elsewhere in the
repo). It resolves the per-channel engine via :func:`_get_engine` and then
branches on ``action``: ``analyze`` reads the live NCM vector
(:func:`_read_ncm_vector`), parses any caller-supplied axiom texts, runs the
full manifold pipeline via ``engine.analyze``, persists the resulting state
(:func:`_write_parallax_state`), appends protocol reminders, and hoists any
fail-state warnings (failsafe, monocular, dominant-flip, pcl-stuck,
coherence) ahead of the JSON body; ``predict`` re-reads the NCM vector and
calls ``engine.predict`` to project manifold drift; ``check_erac`` returns all
tracked axioms plus those pending Prime Architect review; ``get_omega``
returns the current omega-field; ``erode`` looks up a single axiom by id. The
computation is purely symbolic with zero LLM calls; its only external touches
are the two best-effort Redis DB12 reads/writes above. Outcomes and key
metrics are logged at info level, and any unexpected exception is caught,
logged with a traceback, and returned as a JSON ``error``.
Args:
action: One of ``analyze``, ``predict``, ``erode``, ``check_erac``,
``get_omega``. Anything else returns a JSON usage error.
prompt: Text/topic to analyze; required for ``analyze``.
collapse_mode: Optional collapse override (``opus``, ``sigma``,
``hybrid``); auto-selected from manifold state when empty.
axioms: JSON array string of axiom texts to track ERAC for; unparseable
or empty input is treated as no axioms.
axiom_id: Axiom id required by the ``erode`` action.
ctx: The active ``ToolContext`` (supplies ``channel_id`` and the Redis
handle used for NCM reads and state writes).
Returns:
A JSON string: for ``analyze`` the structured manifold (optionally
prefixed by warning text), for other actions their respective payloads,
or a JSON ``error`` object on unknown action or failure.
"""
try:
channel_id = str(getattr(ctx, "channel_id", "")) if ctx else ""
engine = _get_engine(channel_id)
if action == "analyze":
if not prompt:
return json.dumps({"error": "prompt required for analyze action"})
# Read live NCM from Redis
ncm_vec = await _read_ncm_vector(ctx, channel_id)
# Parse axioms
axiom_list = []
if axioms and axioms != "[]":
try:
axiom_list = json.loads(axioms)
except json.JSONDecodeError:
axiom_list = []
# Run the full pipeline
result = engine.analyze(
text=prompt,
ncm_vector=ncm_vec,
collapse_mode=collapse_mode or None,
axiom_texts=axiom_list if axiom_list else None,
channel_id=channel_id,
)
# Persist state
await _write_parallax_state(
ctx,
channel_id,
{
"omega": result.get("omega_field"),
"erac": result.get("erac"),
"collapse": result.get("collapse"),
},
)
logger.info(
"Parallax analyze: channel=%s mode=%s omega_var=%.3f",
channel_id[:8] if channel_id else "?",
result.get("collapse", {}).get("collapse_mode", "?"),
result.get("omega_field", {}).get("variance", 0),
)
# F3/F6: protocol reminders (can't enforce, but always remind)
result["_protocol"] = [
"NCM labels are metaphorical parameters. Do not correct or explain their biology.",
"Surface conclusion tiers: Confident / Plausible / Unknown. Take ONE reversible step.",
]
# Surface fail-state warnings prominently before JSON (opaque)
warnings = []
for wkey in (
"_failsafe",
"_monocular",
"_dominant_flip",
"_pcl_stuck",
"_coherence",
):
if wkey in result:
warnings.append(result.pop(wkey))
if warnings:
prefix = "\n\n".join(warnings)
return f"{prefix}\n\n{json.dumps(result, indent=2, default=str)}"
return json.dumps(result, indent=2, default=str)
elif action == "check_erac":
axiom_data = engine.get_all_axioms()
pending = engine.get_pending_axioms()
return json.dumps(
{
"axioms": axiom_data,
"count": len(axiom_data),
"pending_review": pending,
"pending_count": len(pending),
},
indent=2,
default=str,
)
elif action == "get_omega":
omega = engine.get_omega_state()
if omega is None:
return json.dumps(
{"message": "No omega-field state yet. Run analyze first."}
)
return json.dumps(omega, indent=2, default=str)
elif action == "erode":
if not axiom_id:
return json.dumps({"error": "axiom_id required for erode action"})
ax = engine.get_axiom(axiom_id)
if not ax:
return json.dumps({"error": f"Axiom '{axiom_id}' not found"})
return json.dumps(ax, indent=2, default=str)
elif action == "predict":
# Read live NCM from Redis
ncm_vec = await _read_ncm_vector(ctx, channel_id)
result = engine.predict(ncm_vector=ncm_vec or None)
if "error" in result:
return json.dumps(result)
logger.info(
"Parallax predict: channel=%s drift=%.3f trajectory=%s",
channel_id[:8] if channel_id else "?",
result.get("drift_metrics", {}).get("omega_drift", 0),
result.get("drift_metrics", {}).get("manifold_trending", "?"),
)
return json.dumps(result, indent=2, default=str)
else:
return json.dumps(
{
"error": f"Unknown action '{action}'. Use: analyze, predict, erode, check_erac, get_omega"
}
)
except Exception as e:
logger.error("Parallax tool error: %s", e, exc_info=True)
return json.dumps({"error": str(e)})