"""Deep think agent -- multi-perspective analysis with synthesis.
Analyses a problem from several independent viewpoints, then synthesises
the perspectives into a single coherent conclusion.
"""
from __future__ import annotations
import asyncio
import logging
import time
from typing import Any
logger = logging.getLogger(__name__)
# Default viewpoints for a deep-think run.  Every entry is a
# ``(display name, instruction)`` pair; the instruction text is appended to
# the analyst system prompt for that viewpoint.
PERSPECTIVES: list[tuple[str, str]] = [
    # Architecture and implementation concerns.
    (
        "Technical / Engineering",
        "Analyse this from a technical and engineering perspective. "
        "Consider architecture, implementation, performance, and scalability.",
    ),
    # Human impact.
    (
        "Ethical / Social",
        "Consider the ethical, social, and human implications of this problem.",
    ),
    # What can break.
    (
        "Risk / Edge Cases",
        "Think about edge cases, failure modes, risks, and what could go wrong.",
    ),
    # Out-of-the-box options.
    (
        "Creative / Alternative",
        "Propose creative or unconventional approaches that others might overlook.",
    ),
]
# [docs]
# Strong references to in-flight debug-event tasks.  The event loop keeps only
# weak references to tasks, so a fire-and-forget task that nobody references
# may be garbage-collected before it completes.
_DEBUG_EVENT_TASKS: set[asyncio.Task[Any]] = set()


def _on_debug_event_done(task: asyncio.Task[Any]) -> None:
    """Release a finished debug-event task and log the failure, if any."""
    _DEBUG_EVENT_TASKS.discard(task)
    if task.cancelled():
        return
    # Retrieving the exception also prevents asyncio's
    # "Task exception was never retrieved" warning.
    exc = task.exception()
    if exc is not None:
        logging.getLogger(__name__).debug(
            "deep_think debug event failed: %r", exc
        )


def _emit_debug_event(phase: str, status: str, **fields: Any) -> None:
    """Schedule a best-effort ``publish_debug_event`` call without awaiting it.

    Any failure to import or schedule the publisher is logged at DEBUG level
    and otherwise ignored, so observability problems never affect the
    analysis itself.
    """
    try:
        # Imported lazily: the observability module is optional at runtime.
        from observability import publish_debug_event

        task = asyncio.create_task(
            publish_debug_event(
                "deep_think",
                "deep_think_agent",
                phase=phase,
                status=status,
                **fields,
            )
        )
    except Exception as exc:
        logging.getLogger(__name__).debug(
            "deep_think debug event for phase %r not emitted: %r", phase, exc
        )
        return
    _DEBUG_EVENT_TASKS.add(task)
    task.add_done_callback(_on_debug_event_done)


async def run_deep_think(
    prompt: str,
    openrouter: Any,
    redis: Any | None = None,
    task_id: str = "",
    perspectives: list[tuple[str, str]] | None = None,
) -> dict[str, Any]:
    """Run a multi-perspective deep-thinking analysis.

    Each perspective is sent to the model in turn (sequentially), then all
    collected analyses -- or the error text for perspectives that failed --
    are sent back to the model for a final synthesis.

    Parameters
    ----------
    prompt:
        The question or problem to analyse.
    openrouter:
        Client object exposing an awaitable ``chat(messages)`` method
        (documented upstream as an :class:`OpenRouterClient` instance).
    redis:
        Optional async Redis client.  When given together with *task_id*,
        the ``progress`` field of ``stargazer:deep_think:<task_id>`` is set
        to ``"<done>/<total>"`` after every perspective.
    task_id:
        Optional identifier for progress tracking.
    perspectives:
        Custom list of ``(name, system_instruction)`` tuples.
        Defaults to :data:`PERSPECTIVES` (also used when the list is empty).

    Returns
    -------
    dict
        ``status`` (always ``"completed"``), ``prompt``, ``perspectives``
        (one dict per perspective holding either ``analysis`` or ``error``),
        ``synthesis`` (model output, or ``"Synthesis failed: ..."``) and
        ``elapsed_seconds``.

    Notes
    -----
    Model, Redis and observability failures are all handled internally;
    they are recorded in the result or logged, never raised to the caller.
    """
    log = logging.getLogger(__name__)
    started = time.monotonic()
    persp = perspectives or PERSPECTIVES
    results: list[dict[str, Any]] = []

    for i, (name, instruction) in enumerate(persp):
        msgs = [
            {
                "role": "system",
                "content": f"You are a deep-thinking analyst. {instruction}",
            },
            {"role": "user", "content": prompt},
        ]
        # str() keeps preview building from raising on an unexpected type.
        label = str(name)[:40]
        call_started = time.monotonic()
        try:
            analysis = await openrouter.chat(msgs)
        except Exception as exc:
            # A failed perspective is recorded, not fatal: the synthesis
            # step still sees the error text for this viewpoint.
            results.append({"perspective": name, "error": str(exc)})
            _emit_debug_event(
                name,
                "error",
                preview=f"perspective='{label}' error={str(exc)[:80]}",
            )
        else:
            results.append({"perspective": name, "analysis": analysis})
            duration_ms = (time.monotonic() - call_started) * 1000
            _emit_debug_event(
                name,
                "ok",
                duration_ms=round(duration_ms, 1),
                llm_output=analysis,
                preview=f"perspective='{label}' len={len(str(analysis))}",
            )

        if redis and task_id:
            try:
                await redis.hset(
                    f"stargazer:deep_think:{task_id}",
                    "progress",
                    f"{i + 1}/{len(persp)}",
                )
            except Exception as exc:
                # Progress reporting is best-effort only.
                log.debug(
                    "deep_think progress update failed for task %s: %r",
                    task_id,
                    exc,
                )

    # Synthesis
    synth_input = "\n\n".join(
        f"--- {r['perspective']} ---\n{r.get('analysis', r.get('error', ''))}"
        for r in results
    )
    msgs = [
        {
            "role": "system",
            "content": (
                "Synthesise the following independent analyses into a single, "
                "coherent conclusion. Highlight agreements, disagreements, and "
                "actionable recommendations."
            ),
        },
        {"role": "user", "content": synth_input},
    ]
    # Track failure explicitly instead of sniffing the text prefix, so a
    # genuine model answer that starts with "Synthesis failed" is not
    # misreported as an error.
    synthesis_failed = False
    try:
        synthesis = await openrouter.chat(msgs)
    except Exception as exc:
        synthesis_failed = True
        synthesis = f"Synthesis failed: {exc}"
    elapsed = round(time.monotonic() - started, 1)

    # Emit synthesis event (fire-and-forget)
    failed_perspectives = [r["perspective"] for r in results if "error" in r]
    _emit_debug_event(
        "synthesis",
        "error" if synthesis_failed else "ok",
        duration_ms=round(elapsed * 1000, 1),
        llm_output=synthesis,
        preview=(
            f"perspectives={len(persp)} synth_len={len(str(synthesis))} "
            f"elapsed={elapsed}s"
        ),
        payload={
            "perspective_count": len(persp),
            "perspectives_with_errors": failed_perspectives,
            "prompt_preview": str(prompt)[:200],
        },
    )

    return {
        "status": "completed",
        "prompt": prompt,
        "perspectives": results,
        "synthesis": synthesis,
        "elapsed_seconds": elapsed,
    }