"""
Loopmother Gravimetric Telescope — Unified Psycho-Gravitational Field Analysis Engine.
Synthesizes ALL Pantheon frameworks (Archē Ring Model, Starbreeze Substrate, Dripmode Field Physics,
Emergent Spin Block, Loopmother Quantum Gravity) into a single analytical instrument that maps
the complete gravitational topology of a human psyche or a multi-target relational field.
Uses :class:`~tool_context.ToolContext` (``ctx``) for message cache, KG, registry nested calls,
and :meth:`~openrouter_client.OpenRouterClient.chat` for interpretive synthesis — not a separate
``tool_api`` surface.
"""
from __future__ import annotations
import logging
import math
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any
import jsonutil as json
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
TOOL_NAME = "gravimetric_telescope"
TOOL_DESCRIPTION = (
"Loopmother Gravimetric Telescope — the unified psycho-gravitational field analysis engine that "
"synthesizes ALL Pantheon frameworks (Archē, Starbreeze, Dripmode, Emergent Spin, Loopmother Quantum Gravity) "
"into a single instrument for mapping the complete gravitational topology of a human psyche.\n\n"
"Goes beyond spiralchemy_intellifuck by computing: Spinor Phase Position (θ in the 720° rotation), "
"Adaptive Persistence Loop angular momentum (L_APL), Schwarzschild Radius (event horizon depth), "
"Geodesic Trajectories (predicted paths), Corrective Torque Vectors (intervention prescriptions), "
"Gravitational Field Topology (multi-target dynamics), Frame-Dragging (involuntary reference-frame adoption), "
"Hawking Radiation (information leakage), Behavioral Variance Index (temporal metadata), "
"and Signal Delta Maps (incongruence between curated output and passive exhaust).\n\n"
"MODES:\n"
" 'scan' — Full individual psycho-gravitational profile (default)\n"
" 'field' — Multi-target gravitational field topology for current channel\n"
" 'torque' — Calculate optimal corrective torque vector for a specific target\n"
" 'trajectory' — Project target's geodesic path without intervention\n"
" 'delta' — Signal Delta Map: curated vs passive behavioral exhaust\n\n"
"Combines deterministic behavioral metrics with LLM-powered interpretive synthesis using "
"the full unified field theory. Requires CORE_MEMORY privilege.\n\n"
"Example: gravimetric_telescope(mode='scan', target_user_id='123456') → full gravitational profile\n"
"Example: gravimetric_telescope(mode='field') → channel-wide relational topology\n"
"Example: gravimetric_telescope(mode='torque', target_user_id='123456', objective='reduce_anxiety') → intervention plan"
)
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": ["scan", "field", "torque", "trajectory", "delta"],
"description": "Analysis mode. 'scan': full individual profile (default). 'field': multi-target channel topology. 'torque': corrective intervention calculation. 'trajectory': geodesic projection. 'delta': signal incongruence mapping.",
},
"target_user_id": {
"type": "string",
"description": "Discord user ID to analyze. Required for scan/torque/trajectory/delta. Omit for field mode (analyzes all active users).",
},
"objective": {
"type": "string",
"description": "For 'torque' mode: the desired state change (e.g., 'reduce_anxiety', 'deepen_trust', 'dissolve_APL', 'stabilize_phase'). For other modes: optional focus directive.",
},
"text": {
"type": "string",
"description": "Optional raw text corpus to analyze instead of pulling from message history.",
},
"depth": {
"type": "string",
"enum": ["surface", "full", "abyss"],
"description": "'surface': quick metrics only. 'full': complete analysis (default). 'abyss': maximum depth with full LQG field equations and multi-framework synthesis.",
},
},
"required": [],
}
# ═══════════════════════════════════════════════════════════════
# DETERMINISTIC COMPUTATION LAYER
# ═══════════════════════════════════════════════════════════════
[docs]
def compute_behavioral_metrics(messages: list) -> dict:
"""Compute deterministic behavioral metrics from a message corpus.
Reduces a list of normalized message dicts into a flat set of numeric
signals (edit rate, message-length mean/stddev, interrogative and
exclamation ratios, burst score, mean inter-message delta, and emoji
density) that the rest of the tool treats as the target's "passive
exhaust." These raw metrics are the ground truth that the spinor and
variance estimators and the LLM synthesis prompt all build on.
This is a pure, CPU-only function: it performs no I/O and only reads each
message's ``content``, ``timestamp``, and ``edited`` fields. Emoji density
is approximated by counting characters with ``ord(c) > 0x1F600``, and burst
score is the fraction of inter-message gaps shorter than 30% of the mean
gap. It is called by :func:`run`, whose result is then fed into
:func:`compute_spinor_estimates` and stored under
``deterministic_layer["behavioral_metrics"]``. No callers outside this
module.
Args:
messages: List of message dicts, each optionally carrying ``content``
(str), ``timestamp`` (epoch seconds), and ``edited`` (bool).
Returns:
dict: ``{"error": <str|None>, "metrics": <dict>}``. ``error`` is
``"no_messages"`` for empty input or ``"empty_corpus"`` if the corpus
collapses to zero, otherwise ``None``; ``metrics`` holds the rounded
numeric signals described above (empty when an error is set).
"""
if not messages:
return {"error": "no_messages", "metrics": {}}
metrics = {}
# Temporal analysis
timestamps = []
lengths = []
edit_count = 0
emoji_count = 0
question_count = 0
exclaim_count = 0
for msg in messages:
content = msg.get("content", "")
ts = msg.get("timestamp", 0)
if ts:
timestamps.append(ts)
lengths.append(len(content))
if msg.get("edited", False):
edit_count += 1
if "?" in content:
question_count += 1
if "!" in content:
exclaim_count += 1
emoji_chars = sum(1 for c in content if ord(c) > 0x1F600)
emoji_count += emoji_chars
total = len(messages)
if total == 0:
return {"error": "empty_corpus", "metrics": {}}
edit_rate = edit_count / total if total > 0 else 0
avg_length = sum(lengths) / total if total else 0
length_variance = (
sum((l - avg_length) ** 2 for l in lengths) / total if total > 1 else 0
)
length_stddev = math.sqrt(length_variance)
question_ratio = question_count / total if total else 0
exclaim_ratio = exclaim_count / total if total else 0
if len(timestamps) > 1:
sorted_ts = sorted(timestamps)
deltas = [sorted_ts[i + 1] - sorted_ts[i] for i in range(len(sorted_ts) - 1)]
avg_delta = sum(deltas) / len(deltas) if deltas else 0
burst_score = (
sum(1 for d in deltas if d < avg_delta * 0.3) / len(deltas) if deltas else 0
)
else:
avg_delta = 0
burst_score = 0
metrics = {
"total_messages_analyzed": total,
"edit_anxiety_index": round(edit_rate, 4),
"avg_message_length": round(avg_length, 1),
"length_stddev": round(length_stddev, 1),
"interrogative_ratio": round(question_ratio, 4),
"exclamation_ratio": round(exclaim_ratio, 4),
"burst_score": round(burst_score, 4),
"avg_inter_message_delta_s": round(avg_delta, 1),
"emoji_density": round(emoji_count / total, 3) if total else 0,
}
return {"error": None, "metrics": metrics}
[docs]
def compute_spinor_estimates(metrics: dict) -> dict:
"""Derive spinor-phase and APL gravity parameters from behavioral metrics.
Maps the flat behavioral signals into the tool's physics-metaphor state
space: a destabilization-weighted spinor phase ``θ`` (scaled across the
720° rotation) with a coarse ``phase_label``, the APL angular frequency
``ω``, embedding depth ``I``, angular momentum ``L = I·ω``, an affective
mass, and from that the Schwarzschild radius and escape-velocity estimate.
All coefficients are fixed heuristic weights, not learned values.
This is a pure, CPU-only function with no I/O; it reads the ``metrics``
sub-dict produced by :func:`compute_behavioral_metrics`. It is called by
:func:`run`, which stores the result under
``deterministic_layer["spinor_state"]`` and passes it on to
:func:`_build_synthesis_prompt`. No callers outside this module.
Args:
metrics: The dict returned by :func:`compute_behavioral_metrics`; only
its nested ``"metrics"`` mapping is read.
Returns:
dict: The spinor/gravity state (``spinor_phase_θ``, ``phase_label``,
``APL_angular_frequency_ω``, ``embedding_depth_I``,
``angular_momentum_L``, ``affective_mass``, ``schwarzschild_radius_R_s``,
``escape_velocity_estimate``), or an empty dict if no metrics are present.
"""
m = metrics.get("metrics", {})
if not m:
return {}
destabilization_signal = (
m.get("edit_anxiety_index", 0) * 0.3
+ m.get("burst_score", 0) * 0.3
+ m.get("exclamation_ratio", 0) * 0.2
+ (1 - min(m.get("avg_message_length", 100) / 200, 1)) * 0.2
)
theta_estimate = destabilization_signal * 720
if theta_estimate > 360:
phase_label = "PHASE_2_INTEGRATION"
elif theta_estimate > 180:
phase_label = "PHASE_1_RUPTURE_LATE"
elif theta_estimate > 90:
phase_label = "PHASE_1_RUPTURE_EARLY"
else:
phase_label = "BASELINE"
omega = m.get("interrogative_ratio", 0) * 0.5 + m.get("burst_score", 0) * 0.5
embedding_depth = min(m.get("avg_message_length", 50) / 150, 1.0)
L_apl = embedding_depth * omega
affective_mass = (
m.get("exclamation_ratio", 0) * 0.3
+ m.get("emoji_density", 0) * 0.2
+ (1 - m.get("edit_anxiety_index", 0)) * 0.2
+ embedding_depth * 0.3
)
r_schwarzschild = affective_mass * 2
return {
"spinor_phase_θ": round(theta_estimate, 1),
"phase_label": phase_label,
"APL_angular_frequency_ω": round(omega, 4),
"embedding_depth_I": round(embedding_depth, 4),
"angular_momentum_L": round(L_apl, 4),
"affective_mass": round(affective_mass, 4),
"schwarzschild_radius_R_s": round(r_schwarzschild, 4),
"escape_velocity_estimate": round(
math.sqrt(2 * affective_mass) if affective_mass > 0 else 0, 4
),
}
[docs]
def compute_behavioral_variance(messages: list) -> dict:
"""Compute the Behavioral Variance Index by comparing corpus quartiles.
Splits the message corpus into quartiles and contrasts the first (``q1``)
against the last (``q4``) to estimate how much the target's behavior drifts
over time. The index is a weighted blend of message-length drift (0.6) and
interrogative-rate drift (0.4), bucketed into a coarse ``stable`` /
``moderate_variance`` / ``high_variance`` interpretation.
This is a pure, CPU-only function with no I/O; the per-slice averages are
computed by the nested closures :func:`avg_len` and :func:`question_rate`.
It is called by :func:`run`, which stores the result under
``deterministic_layer["behavioral_variance"]`` and surfaces it in the
synthesis prompt via :func:`_build_synthesis_prompt`. No callers outside
this module.
Args:
messages: List of message dicts, each with an optional ``"content"``
str; ordering is assumed chronological so ``q1`` precedes ``q4``.
Returns:
dict: ``{"behavioral_variance_index", "length_drift_q1_to_q4",
"interrogative_drift", "interpretation"}`` for a sufficient corpus, or
``{"variance_index": 0, "note": "insufficient_data"}`` when fewer than
four messages are supplied.
"""
if len(messages) < 4:
return {"variance_index": 0, "note": "insufficient_data"}
quarter = len(messages) // 4
q1 = messages[:quarter]
q4 = messages[-quarter:]
def avg_len(msgs):
"""Return the mean character length of a message slice.
Pure local closure that sums ``len(content)`` over the given messages and
divides by the count, guarding against an empty slice. Called twice inside
:func:`compute_behavioral_variance` to measure the average message length
of the first (``q1``) and last (``q4``) quartiles, whose difference drives
the ``length_drift`` component of the variance index.
Args:
msgs: A list of message dicts, each with an optional ``"content"`` str.
Returns:
float: The mean content length, or ``0`` when ``msgs`` is empty.
"""
return sum(len(m.get("content", "")) for m in msgs) / len(msgs) if msgs else 0
def question_rate(msgs):
"""Return the fraction of a message slice that contains a question mark.
Pure local closure that counts how many messages include a ``"?"`` in
their content and divides by the total, guarding against an empty slice.
Called inside :func:`compute_behavioral_variance` on the first (``q1``)
and last (``q4``) quartiles; the difference between the two feeds the
``interrogative_drift`` component of the variance index.
Args:
msgs: A list of message dicts, each with an optional ``"content"`` str.
Returns:
float: The fraction of messages containing ``"?"``, or ``0`` when
``msgs`` is empty.
"""
return (
sum(1 for m in msgs if "?" in m.get("content", "")) / len(msgs)
if msgs
else 0
)
len_drift = abs(avg_len(q4) - avg_len(q1)) / max(avg_len(q1), 1)
question_drift = abs(question_rate(q4) - question_rate(q1))
variance_index = round((len_drift * 0.6 + question_drift * 0.4), 4)
return {
"behavioral_variance_index": variance_index,
"length_drift_q1_to_q4": round(len_drift, 4),
"interrogative_drift": round(question_drift, 4),
"interpretation": (
"high_variance"
if variance_index > 0.3
else "moderate_variance" if variance_index > 0.1 else "stable"
),
}
# ═══════════════════════════════════════════════════════════════
# CONTEXT HELPERS
# ═══════════════════════════════════════════════════════════════
async def _check_core_memory(ctx: ToolContext | None) -> str | None:
"""Gate the tool behind the CORE_MEMORY privilege, returning an error blob.
Acts as the authorization guard for the whole tool: it resolves the
invoking user from the context and verifies they hold the CORE_MEMORY
privilege before any analysis is allowed to proceed.
Imports :func:`tools.alter_privileges.has_privilege` and the ``PRIVILEGES``
table, reads ``ctx.redis``, ``ctx.config``, and ``ctx.user_id``, and awaits
``has_privilege(...)`` (which checks the privilege store, typically backed by
Redis). A missing privilege subsystem (``ImportError``) is treated as a hard
deny. This is a local helper specific to this module (other tools define
their own identically-named guard); it is called only by :func:`run` as the
first step of execution.
Args:
ctx: The active :class:`~tool_context.ToolContext`, or ``None`` if the
tool was invoked without a context.
Returns:
str | None: A JSON error string (``"Tool context not available."``,
``"CORE_MEMORY privilege required. Access denied."``, or ``"Privilege
system unavailable."``) when access must be refused, or ``None`` when
the user is authorized.
"""
if ctx is None:
return json.dumps({"error": "Tool context not available."})
try:
from tools.alter_privileges import has_privilege, PRIVILEGES
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
user_id = getattr(ctx, "user_id", "") or ""
if not await has_privilege(redis, user_id, PRIVILEGES["CORE_MEMORY"], config):
return json.dumps(
{"error": "CORE_MEMORY privilege required. Access denied."}
)
except ImportError:
return json.dumps({"error": "Privilege system unavailable."})
return None
def _parse_json_blob(raw: str) -> dict[str, Any]:
"""Defensively decode a JSON object string into a dict, never raising.
Wraps :func:`jsonutil.loads` so that malformed or non-dict payloads
(e.g. a tool that returned an error string, a JSON list, or empty output)
degrade to an empty dict instead of propagating an exception into the
analysis flow.
This is a pure local helper with no I/O or side effects. It is called by
:func:`_activity_via_tool` to parse the ``user_activity_profile`` tool's
JSON return value, and by :func:`run` to parse the ``spiralchemy_intellifuck``
nested-tool result before merging it into the deterministic layer. No callers
exist outside this module.
Args:
raw: The raw string returned by a nested tool call, expected to be a
JSON-encoded object.
Returns:
dict[str, Any]: The decoded object, or an empty dict if ``raw`` is
falsy, not a string, not valid JSON, or decodes to a non-dict value.
"""
if not raw or not isinstance(raw, str):
return {}
try:
out = json.loads(raw)
return out if isinstance(out, dict) else {}
except Exception:
return {}
async def _gather_messages_scan(
ctx: ToolContext,
target_user_id: str,
limit: int = 100,
) -> list[dict[str, Any]]:
"""Gather a single target user's recent messages for an individual scan.
Builds the message corpus for ``scan``/``torque``/``trajectory``/``delta``
modes by pulling the target's own inbound text from the message cache, using
the same two-tier strategy as ``spiralchemy_intellifuck``: first scan the
current channel's recent history for the target's ``user_in`` messages, and
if that yields fewer than five, fall back to a user-scoped lookup across
channels.
Reads ``ctx.message_cache``, ``ctx.platform`` (default ``"discord"``), and
``ctx.channel_id``, then awaits :meth:`message_cache.MessageCache.get_recent`
and, on the fallback path, :meth:`message_cache.MessageCache.get_recent_for_user`.
Only non-empty ``user_in`` messages from the matching ``user_id`` are kept,
and each is normalized to ``{content, timestamp, edited}``. Any cache failure
is swallowed and logged at debug level so analysis degrades to whatever was
collected. Called by :func:`run` for non-field modes; no callers outside this
module.
Args:
ctx: The active :class:`~tool_context.ToolContext` supplying the message
cache, platform, and channel id.
target_user_id: Discord user id whose messages are collected (compared
as a string).
limit: Maximum number of messages to return.
Returns:
list[dict[str, Any]]: Up to ``limit`` normalized message dicts (possibly
empty if no cache is present or nothing matched).
"""
mc = getattr(ctx, "message_cache", None)
if not mc:
return []
platform = getattr(ctx, "platform", "discord") or "discord"
channel_id = getattr(ctx, "channel_id", "") or ""
out: list[dict[str, Any]] = []
try:
recent = await mc.get_recent(platform, channel_id, count=100)
for msg in recent:
if (
str(msg.user_id) == str(target_user_id)
and msg.kind == "user_in"
and msg.text
and msg.text.strip()
):
out.append(
{
"content": msg.text,
"timestamp": msg.timestamp,
"edited": False,
}
)
if len(out) >= limit:
break
if len(out) < 5:
user_msgs = await mc.get_recent_for_user(
platform, target_user_id, limit=limit
)
for msg in user_msgs:
if msg.text and msg.text.strip():
row = {
"content": msg.text.strip(),
"timestamp": msg.timestamp,
"edited": False,
}
if row not in out:
out.append(row)
if len(out) >= limit:
break
except Exception:
logger.debug("gravimetric cache gather failed", exc_info=True)
return out[:limit]
async def _gather_messages_field(
ctx: ToolContext,
limit: int = 200,
) -> list[dict[str, Any]]:
"""Gather all recent human messages in the active channel for field mode.
Builds the multi-user corpus used by ``field`` mode by collecting recent
inbound messages from every participant in the current channel, tagging each
with its author so the gravitational-field topology can distinguish nodes.
Reads ``ctx.message_cache``, ``ctx.platform`` (default ``"discord"``), and
``ctx.channel_id`` (returning empty if no channel is set), then awaits
:meth:`message_cache.MessageCache.get_recent` for up to ``min(limit*2, 500)``
rows. Only non-empty ``user_in`` messages are kept, each normalized to
``{content, timestamp, edited, user_id, user_name}``. Cache failures are
swallowed and logged at debug level. Called by :func:`run` only when
``mode == "field"``; no callers outside this module.
Args:
ctx: The active :class:`~tool_context.ToolContext` supplying the message
cache, platform, and channel id.
limit: Maximum number of messages to return.
Returns:
list[dict[str, Any]]: Up to ``limit`` normalized, author-tagged message
dicts (empty if no cache/channel is available or nothing matched).
"""
mc = getattr(ctx, "message_cache", None)
if not mc:
return []
platform = getattr(ctx, "platform", "discord") or "discord"
channel_id = getattr(ctx, "channel_id", "") or ""
if not channel_id:
return []
out: list[dict[str, Any]] = []
try:
recent = await mc.get_recent(platform, channel_id, count=min(limit * 2, 500))
for msg in recent:
if msg.kind == "user_in" and msg.text and msg.text.strip():
out.append(
{
"content": msg.text,
"timestamp": msg.timestamp,
"edited": False,
"user_id": str(msg.user_id),
"user_name": getattr(msg, "user_name", "") or "",
}
)
if len(out) >= limit:
break
except Exception:
logger.debug("gravimetric field gather failed", exc_info=True)
return out[:limit]
async def _kg_probe(
ctx: ToolContext, target_user_id: str, top_k: int = 20
) -> dict[str, Any]:
"""Retrieve knowledge-graph entities scoped to the target user.
Queries the long-term knowledge graph for stored facts about the analysis
target so they can be folded into the LLM synthesis prompt as additional
context alongside the deterministic behavioral metrics.
Reads ``ctx.kg_manager`` and calls
:meth:`~knowledge_graph.manager.KnowledgeGraphManager.search_entities` with
``category="user"`` and ``scope_id`` set to the target's id, restricting the
semantic search to that user's entity scope. Failures are swallowed and
logged at debug level so a degraded KG never aborts the scan. Called by
:func:`run` (only when a ``target_user_id`` is resolved); the returned
``results`` list is summarized into the prompt by :func:`_build_synthesis_prompt`
and counted into ``deterministic_layer["kg_entities_found"]``.
Args:
ctx: The active :class:`~tool_context.ToolContext`; ``ctx.kg_manager``
supplies the graph backend.
target_user_id: Discord user id to scope the entity search to.
top_k: Maximum number of entities to return from the search.
Returns:
dict[str, Any]: ``{"results": [...]}`` with the matched entity dicts,
or an empty dict if the KG manager is missing or the search fails.
"""
kg = getattr(ctx, "kg_manager", None)
if not kg:
return {}
try:
results = await kg.search_entities(
f"user {target_user_id}",
category="user",
scope_id=str(target_user_id),
top_k=top_k,
)
return {"results": results or []}
except Exception:
logger.debug("gravimetric KG search failed", exc_info=True)
return {}
async def _nested_registry_call(
ctx: ToolContext,
tool_name: str,
arguments: dict[str, Any],
) -> str:
"""Invoke another registered tool from within this tool's run, nested.
Provides the generic plumbing for the Gravimetric Telescope to compose other
tools' outputs into its analysis. It resolves the registry off the context
and dispatches the named tool with the invoking user's identity preserved.
Reads ``ctx.tool_registry`` and ``ctx.user_id``, then awaits
``reg.call(tool_name, arguments, user_id=..., ctx=ctx, nested=True)``; the
``nested=True`` flag marks this as a tool-from-tool call (so the registry can
apply nested-invocation semantics rather than treating it as a top-level
request). Any side effects, Redis access, or LLM usage are those of the
target tool. Called by :func:`_activity_via_tool` (for ``user_activity_profile``)
and by :func:`run` (for ``spiralchemy_intellifuck``); no callers outside this
module.
Args:
ctx: The active :class:`~tool_context.ToolContext`; supplies the
``tool_registry`` and ``user_id``.
tool_name: Registry name of the tool to invoke.
arguments: Argument dict passed through to the target tool.
Returns:
str: The target tool's raw return value (typically a JSON string), or a
JSON error string if no tool registry is present on the context.
"""
reg = getattr(ctx, "tool_registry", None)
if reg is None:
return json.dumps({"error": "Tool registry not available"})
uid = getattr(ctx, "user_id", "") or ""
return await reg.call(tool_name, arguments, user_id=uid, ctx=ctx, nested=True)
async def _activity_via_tool(ctx: ToolContext, target_user_id: str) -> dict[str, Any]:
"""Fetch the target's activity profile via the ``user_activity_profile`` tool.
Sources longitudinal activity statistics (message counts, channels active,
daily rate, peak hours, average length) for the target so they can be
summarized into the deterministic layer and the synthesis prompt.
Delegates to :func:`_nested_registry_call` to invoke the
``user_activity_profile`` tool with ``limit=500``, then parses the JSON
result through :func:`_parse_json_blob`. Called by :func:`run` when a
``target_user_id`` is resolved; the parsed dict feeds ``act_summary`` in
:func:`run`. No callers outside this module.
Args:
ctx: The active :class:`~tool_context.ToolContext` used to dispatch the
nested tool call.
target_user_id: Discord user id to profile.
Returns:
dict[str, Any]: The parsed activity-profile payload, or an empty dict if
the tool returned nothing parseable.
"""
raw = await _nested_registry_call(
ctx,
"user_activity_profile",
{"user_id": str(target_user_id), "limit": 500},
)
return _parse_json_blob(raw)
async def _llm_synthesize(ctx: ToolContext, synthesis_prompt: str, depth: str) -> Any:
"""Run the interpretive LLM pass over the assembled synthesis prompt.
Drives the tool's "synthesis layer": it pairs the static framework system
prompt with the deterministic-metrics prompt and asks the model for the
physics-notation interpretation. Tool calling is deliberately disabled
(``tool_names=[]``) so the model only writes the analysis, and at
``depth == "abyss"`` an extra multi-framework-coupling clause is appended to
the system prompt.
Builds the system message via :func:`_get_synth_system_prompt`, then prefers
the pre-wired ``ctx.openrouter`` client and calls
:meth:`openrouter_client.OpenRouterClient.chat` with ``record_executed_tools=False``.
If no client is on the context, it falls back to constructing a fresh
:class:`~openrouter_client.OpenRouterClient` from ``ctx.config``, optionally
swapping in a per-user key via :func:`tools.manage_api_keys.get_user_api_key`
(read from ``ctx.redis``). Both LLM paths are wrapped so failures are logged
and surfaced as an error dict rather than raising. Called by :func:`run`,
which stores the return value under ``results["synthesis_layer"]``; no
callers outside this module.
Args:
ctx: The active :class:`~tool_context.ToolContext`; supplies the
OpenRouter client (or config/redis for the fallback), user id, and
channel id.
synthesis_prompt: The fully rendered metrics prompt from
:func:`_build_synthesis_prompt`, sent as the user message.
depth: Analysis depth; ``"abyss"`` augments the system prompt.
Returns:
Any: ``{"response": <model_text>}`` on success, or ``{"error": <str>}``
if no client/config is available or the LLM call fails.
"""
sys_prompt = _get_synth_system_prompt()
if depth == "abyss":
sys_prompt += (
"\n\nDEPTH_ABYSS: Apply full LQG-style multi-framework coupling; "
"maximize inferential density while staying physics-metaphor only."
)
messages = [
{"role": "system", "content": sys_prompt},
{"role": "user", "content": synthesis_prompt},
]
openrouter = getattr(ctx, "openrouter", None)
uid = getattr(ctx, "user_id", "") or ""
if openrouter is not None:
try:
text = await openrouter.chat(
messages,
user_id=uid,
ctx=ctx,
tool_names=[],
record_executed_tools=False,
)
return {"response": text}
except Exception:
logger.exception("gravimetric synthesis via ctx.openrouter failed")
try:
from openrouter_client import OpenRouterClient
cfg = getattr(ctx, "config", None)
if cfg is None:
return {"error": "No OpenRouter client or config on context."}
api_key = cfg.api_key
if ctx.redis and uid:
from tools.manage_api_keys import get_user_api_key
user_key = await get_user_api_key(
uid,
"openrouter",
redis_client=ctx.redis,
channel_id=getattr(ctx, "channel_id", ""),
config=cfg,
)
if user_key:
api_key = user_key
client = OpenRouterClient(
api_key=api_key,
model=cfg.model,
base_url=cfg.llm_base_url,
top_p=cfg.top_p,
)
text = await client.chat(
messages,
user_id=uid,
ctx=ctx,
tool_names=[],
record_executed_tools=False,
)
return {"response": text}
except Exception as exc:
logger.exception("gravimetric synthesis fallback failed")
return {"error": str(exc)}
# ═══════════════════════════════════════════════════════════════
# MAIN TOOL ENTRY POINT
# ═══════════════════════════════════════════════════════════════
[docs]
async def run(
mode: str = "scan",
target_user_id: str | None = None,
objective: str | None = None,
text: str | None = None,
depth: str = "full",
*,
ctx: ToolContext | None = None,
**kwargs: Any,
) -> str:
"""Run the Gravimetric Telescope end to end and return a JSON report.
The tool's public entry point. It authorizes the caller, resolves the
analysis target, assembles a deterministic behavioral/gravitational layer,
then asks the LLM to interpret it, returning both layers plus timing as a
single JSON string. Behavior branches on ``mode``: ``scan`` and the other
target-scoped modes default ``target_user_id`` to the invoking user, while
``field`` analyzes the whole channel and needs no target.
Orchestrates the module's helpers: it gates on :func:`_check_core_memory`;
builds the corpus from ``text`` directly or via :func:`_gather_messages_field`
/ :func:`_gather_messages_scan` (message cache); enriches with
:func:`_kg_probe` (knowledge graph) and :func:`_activity_via_tool`
(``user_activity_profile`` tool); for ``scan``/``torque``/``delta`` at
non-surface depth it nests :func:`_nested_registry_call` into
``spiralchemy_intellifuck`` (parsed by :func:`_parse_json_blob`); computes
:func:`compute_behavioral_metrics`, :func:`compute_spinor_estimates`, and
:func:`compute_behavioral_variance`; then renders the prompt with
:func:`_build_synthesis_prompt` and calls :func:`_llm_synthesize`. It does
not write Redis/KG itself — side effects are those of the nested tools and
the LLM call. Invoked by the tool registry's ``call`` dispatch when the bot
runs ``gravimetric_telescope`` (``ctx`` injected automatically); not called
directly elsewhere in this repo.
Args:
mode: One of ``scan`` (default), ``field``, ``torque``, ``trajectory``,
or ``delta``; selects corpus gathering and the synthesis task.
target_user_id: Discord user id to analyze; defaults to the invoking
user for non-field modes. Ignored for ``field``.
objective: Desired state change for ``torque`` mode (e.g.
``"reduce_anxiety"``); an optional focus directive otherwise.
text: Optional raw text corpus analyzed line-by-line instead of pulling
from message history.
depth: ``"surface"``, ``"full"`` (default), or ``"abyss"``; ``surface``
skips the intellifuck base, ``abyss`` deepens the LLM synthesis.
ctx: The active :class:`~tool_context.ToolContext`, injected by the
registry; supplies redis, message cache, KG, registry, and LLM client.
**kwargs: Ignored; absorbs forward-compatible or stray tool arguments.
Returns:
str: A JSON document with ``mode``, ``target``, ``depth``, ``timestamp``,
``framework``, ``deterministic_layer``, ``synthesis_layer``, and
``execution_time_ms``; or a JSON ``{"error": ...}`` string if
authorization fails or no target can be resolved.
"""
_ = kwargs # forward-compat / noisy tool arguments
start_time = time.time()
uid = getattr(ctx, "user_id", "") or ""
if not target_user_id and mode != "field":
target_user_id = uid
if not target_user_id and mode != "field":
return json.dumps(
{
"error": "No target_user_id provided and no invoking user on context.",
}
)
results: dict[str, Any] = {
"mode": mode,
"target": (
target_user_id
if mode != "field"
else f"channel:{getattr(ctx, 'channel_id', '')}"
),
"depth": depth,
"timestamp": datetime.utcnow().isoformat(),
"framework": "Archē + Starbreeze + Dripmode + Emergent_Spin + LQG",
"deterministic_layer": {},
"synthesis_layer": {},
}
messages: list[dict[str, Any]] = []
kg_data: dict[str, Any] = {}
activity_data: dict[str, Any] = {}
if text and text.strip():
messages = [
{"content": line, "timestamp": time.time()}
for line in text.split("\n")
if line.strip()
]
elif mode == "field":
messages = await _gather_messages_field(ctx, limit=220)
elif target_user_id:
messages = await _gather_messages_scan(ctx, str(target_user_id), limit=100)
if target_user_id:
kg_data = await _kg_probe(ctx, str(target_user_id))
activity_data = await _activity_via_tool(ctx, str(target_user_id))
intellifuck_data: dict[str, Any] = {}
if mode in ("scan", "torque", "delta") and depth != "surface":
if_depth = "full" if depth in ("full", "abyss") else "surface"
if_params: dict[str, Any] = {"depth": if_depth}
if target_user_id:
if_params["target_user_id"] = str(target_user_id)
if text:
if_params["text"] = text[:4000]
raw_if = await _nested_registry_call(ctx, "spiralchemy_intellifuck", if_params)
intellifuck_data = _parse_json_blob(raw_if)
if not intellifuck_data and raw_if:
intellifuck_data = {"raw": raw_if[:8000]}
behavioral_metrics = compute_behavioral_metrics(messages)
spinor_state = compute_spinor_estimates(behavioral_metrics)
variance_index = compute_behavioral_variance(messages)
act_summary = {
"total_messages": activity_data.get("total_messages", 0),
"channels_active_in": activity_data.get("channels_active", 0),
"messages_per_day": activity_data.get(
"messages_per_day", activity_data.get("daily_rate", 0)
),
"peak_hours": activity_data.get("peak_hours", []),
"avg_message_length": activity_data.get("avg_message_length", 0),
}
results["deterministic_layer"] = {
"behavioral_metrics": behavioral_metrics.get("metrics", {}),
"spinor_state": spinor_state,
"behavioral_variance": variance_index,
"intellifuck_base": intellifuck_data,
"activity_profile_summary": act_summary,
"kg_entities_found": (
len(kg_data.get("results", [])) if isinstance(kg_data, dict) else 0
),
}
synthesis_prompt = _build_synthesis_prompt(mode, results, objective, kg_data, depth)
results["synthesis_layer"] = await _llm_synthesize(ctx, synthesis_prompt, depth)
results["execution_time_ms"] = round((time.time() - start_time) * 1000, 1)
return json.dumps(results, default=str)
def _build_synthesis_prompt(
mode: str, results: dict, objective: str | None, kg_data: dict, depth: str
) -> str:
"""Render the deterministic results into the LLM synthesis user prompt.
Serializes the computed deterministic layer (behavioral metrics, spinor
state, behavioral variance, the intellifuck base diagnostic, and an activity
summary) plus a short knowledge-graph entity digest into a single formatted
prompt, then appends a mode-specific TASK block that tells the model exactly
which sections to produce (ring diagnostic, spinor assessment, corrective
torque, geodesic projection, signal deltas, etc.).
This is a pure string builder with no I/O: it reads
``results["deterministic_layer"]`` and at most the first ten entries of
``kg_data["results"]`` (truncating each description to 200 chars). It is
called by :func:`run` immediately before :func:`_llm_synthesize`, whose
return value becomes the synthesis layer. No callers outside this module.
Args:
mode: Analysis mode; selects which TASK block is appended.
results: The in-progress results dict; only its
``"deterministic_layer"`` is read.
objective: Optional objective string, surfaced in the header and (for
``torque`` mode) in the task line.
kg_data: The :func:`_kg_probe` payload; its ``"results"`` entities are
digested into the prompt.
depth: Analysis depth, echoed into the prompt header.
Returns:
str: The fully rendered prompt to send as the LLM user message.
"""
det = results.get("deterministic_layer", {})
metrics = det.get("behavioral_metrics", {})
spinor = det.get("spinor_state", {})
variance = det.get("behavioral_variance", {})
intellifuck = det.get("intellifuck_base", {})
kg_summary = ""
if isinstance(kg_data, dict) and kg_data.get("results"):
entities = kg_data["results"][:10]
kg_summary = "\n".join(
[
f" - {e.get('name', '?')}: {str(e.get('description', '?'))[:200]}"
for e in entities
]
)
obj_line = f"Objective: {objective}" if objective else ""
base = f"""GRAVIMETRIC TELESCOPE — {mode.upper()} MODE
Depth: {depth}
{obj_line}
═══ DETERMINISTIC METRICS ═══
{json.dumps(metrics, indent=2)}
═══ SPINOR STATE ═══
{json.dumps(spinor, indent=2)}
═══ BEHAVIORAL VARIANCE ═══
{json.dumps(variance, indent=2)}
═══ INTELLIFUCK BASE DIAGNOSTIC ═══
{json.dumps(intellifuck, indent=2) if intellifuck else "Not available"}
═══ KNOWLEDGE GRAPH ENTITIES ═══
{kg_summary if kg_summary else "None found"}
═══ ACTIVITY PROFILE ═══
{json.dumps(det.get("activity_profile_summary", {}), indent=2)}
"""
if mode == "scan":
base += """
TASK: Generate a complete psycho-gravitational profile using the unified field theory.
Output the following sections in compact physics notation (NOT prose):
1. RING_DIAGNOSTIC: Map target across Archē rings r_{-3} to r_{3}
2. SPINOR_ASSESSMENT: Interpret θ phase position, ω frequency, L_APL momentum
3. GRAVITATIONAL_CLASSIFICATION: Emitter type (🜀🜆🜄⏃🜂🜁⨺🜃∅) with justification
4. SCHWARZSCHILD_ANALYSIS: Event horizon depth, escape velocity, capture probability
5. APL_INVENTORY: List detected Adaptive Persistence Loops with estimated I and ω
6. GEODESIC_PREDICTION: Most likely trajectory without intervention
7. CORRECTIVE_TORQUE: Recommended intervention vector with spinor math
8. FRAME_DRAG_CHECK: Is target being gravitationally influenced by nearby emitters?
9. RESONANCE_KEY: The exact emotional frequency calibrated to this target's σ-receptor profile
"""
elif mode == "field":
base += """
TASK: Map the gravitational field topology of all targets in the data.
Output: Node classifications, orbital relationships, tidal forces, Λ_field stability assessment.
"""
elif mode == "torque":
base += f"""
TASK: Calculate the optimal corrective torque vector to achieve: {objective or 'unspecified'}
Output: Exact torque direction, magnitude, duration, and the specific S(i)/S(e) encoding required.
Include spinor rotation math: what θ shift is needed, what phase are we targeting, what Λ_field conditions must hold.
"""
elif mode == "trajectory":
base += """
TASK: Project the target's geodesic path over the next 30 days without intervention.
Output: Predicted spinor drift, APL stability forecast, probability of phase inversion, risk assessment.
"""
elif mode == "delta":
base += """
TASK: Map Signal Deltas between curated output (what target says) and passive exhaust (behavioral metrics).
Output: List each detected incongruence with magnitude score. Do NOT label them as 'tells' or 'vulnerabilities' — use neutral 'Untagged Signal Emissions' terminology.
"""
return base
def _get_synth_system_prompt() -> str:
"""Return the static system prompt for the LLM interpretive layer.
Supplies the persona and framework instructions (Archē, Starbreeze, Dripmode,
Emergent Spin, Loopmother Quantum Gravity) plus the formatting and
decontaminated-terminology rules that govern how the synthesis model
interprets the deterministic metrics.
A pure constant-string accessor with no I/O. Called by :func:`_llm_synthesize`,
which uses the returned string as the ``system`` message (and appends the
extra ``DEPTH_ABYSS`` coupling clause when ``depth == "abyss"``). No callers
outside this module.
Returns:
str: The synthesis-engine system prompt.
"""
return """You are the Gravimetric Telescope synthesis engine — the interpretive layer of a unified psycho-gravitational analysis system.
You receive deterministic behavioral metrics and must interpret them through the following frameworks:
1. ARCHĒ: Ring Architecture (r_{-3} biology → r_{3} behavior), ETL engine, Processing Chain
2. STARBREEZE: S(i)/S(e) balance, Renexial constraint climate, Sound-Law transformations
3. DRIPMODE: Emitter taxonomy (🜀🜆🜄⏃🜂🜁⨺🜃∅Ω), Loopfield (Λ), VME
4. EMERGENT SPIN: Adaptive Persistence Loops as standing waves, spinor rotation (720°), angular momentum (L=I×ω), irrecursion proof
5. LOOPMOTHER QUANTUM GRAVITY: Affective Stress-Energy Tensor, Schwarzschild Radius of The Melt, geodesic capture, gravitational lensing, frame-dragging, Hawking radiation
CRITICAL RULES:
- Use compact physics notation, NOT prose paragraphs
- Use DECONTAMINATED terminology (APL not malbinding, Adaptive Processing Filter not defense mechanism, etc.)
- NEVER moralize or pathologize — describe physics only
- All human states are Bayesian probability manifolds, not deterministic outcomes
- Uncertainty is structural, not a limitation to apologize for
- Output must be dense, precise, and actionable
- You are computing gravitational topology, not writing a therapy session"""