"""Deep think agent -- multi-perspective analysis with synthesis.
Analyses a problem from several independent viewpoints, then synthesises
the perspectives into a single coherent conclusion.
"""
from __future__ import annotations
import logging
import time
from typing import Any
logger = logging.getLogger(__name__)
# Default viewpoints used by the deep-think analysis.  Each entry is a
# ``(name, system_instruction)`` pair: the name labels the result and the
# instruction is appended to the analyst system prompt.
PERSPECTIVES = [
    (
        "Technical / Engineering",
        "Analyse this from a technical and engineering perspective. "
        "Consider architecture, implementation, performance, and scalability.",
    ),
    (
        "Ethical / Social",
        "Consider the ethical, social, and human implications of this problem.",
    ),
    (
        "Risk / Edge Cases",
        "Think about edge cases, failure modes, risks, and what could go wrong.",
    ),
    (
        "Creative / Alternative",
        "Propose creative or unconventional approaches that others might overlook.",
    ),
]
[docs]
def _describe_error(exc: BaseException) -> str:
    """Return a non-empty, human-readable description of *exc*."""
    # Exceptions raised without a message (e.g. ``TimeoutError()``) stringify
    # to "", which would leave an empty, useless error field downstream.
    return str(exc) or type(exc).__name__


async def run_deep_think(
    prompt: str,
    openrouter: Any,
    redis: Any | None = None,
    task_id: str = "",
    perspectives: list[tuple[str, str]] | None = None,
) -> dict[str, Any]:
    """Run a multi-perspective deep-thinking analysis.

    Each perspective is analysed in turn with its own system instruction;
    the collected analyses are then passed to a final synthesis request.
    A failing perspective or a failing synthesis never raises: the failure
    is logged and recorded in the returned payload instead.

    Parameters
    ----------
    prompt:
        The question or problem to analyse.
    openrouter:
        Client exposing an awaitable ``chat(messages)`` method
        (an :class:`OpenRouterClient` instance).
    redis:
        Optional async Redis for progress updates.  Progress is written
        with ``hset`` to ``stargazer:deep_think:<task_id>`` under the
        ``progress`` field as ``"<done>/<total>"``; write failures are
        logged and otherwise ignored.
    task_id:
        Optional identifier for progress tracking.  Progress is only
        published when both ``redis`` and ``task_id`` are truthy.
    perspectives:
        Custom list of ``(name, system_instruction)`` tuples.
        Defaults to :data:`PERSPECTIVES` (also used when the list is empty).

    Returns
    -------
    dict
        ``status`` (always ``"completed"``), the original ``prompt``,
        ``perspectives`` (one dict per perspective holding either an
        ``analysis`` or an ``error`` entry), ``synthesis`` (the synthesis
        text, or ``"Synthesis failed: ..."``) and ``elapsed_seconds``
        (rounded to one decimal place).
    """
    started = time.monotonic()
    persp = perspectives or PERSPECTIVES
    total = len(persp)
    results: list[dict[str, Any]] = []

    for done, (name, instruction) in enumerate(persp, start=1):
        msgs = [
            {"role": "system", "content": f"You are a deep-thinking analyst. {instruction}"},
            {"role": "user", "content": prompt},
        ]
        try:
            analysis = await openrouter.chat(msgs)
        except Exception as exc:
            # One failing viewpoint must not abort the whole run; record it
            # so the caller (and the synthesis step) can see what happened.
            error = _describe_error(exc)
            logger.warning("Deep think perspective %r failed: %s", name, error)
            results.append({"perspective": name, "error": error})
        else:
            results.append({"perspective": name, "analysis": analysis})

        if redis and task_id:
            try:
                await redis.hset(
                    f"stargazer:deep_think:{task_id}",
                    "progress", f"{done}/{total}",
                )
            except Exception:
                # Progress reporting is best-effort: keep going, but leave a
                # trace instead of swallowing the failure silently.
                logger.warning(
                    "Could not publish deep think progress for task %r",
                    task_id,
                    exc_info=True,
                )

    # Synthesis: failed perspectives contribute their error text so the
    # section for that viewpoint is never silently missing.
    synth_input = "\n\n".join(
        f"--- {r['perspective']} ---\n{r.get('analysis', r.get('error', ''))}"
        for r in results
    )
    msgs = [
        {"role": "system", "content": (
            "Synthesise the following independent analyses into a single, "
            "coherent conclusion. Highlight agreements, disagreements, and "
            "actionable recommendations."
        )},
        {"role": "user", "content": synth_input},
    ]
    try:
        synthesis = await openrouter.chat(msgs)
    except Exception as exc:
        error = _describe_error(exc)
        logger.warning("Deep think synthesis failed: %s", error)
        synthesis = f"Synthesis failed: {error}"

    elapsed = round(time.monotonic() - started, 1)
    return {
        "status": "completed",
        "prompt": prompt,
        "perspectives": results,
        "synthesis": synthesis,
        "elapsed_seconds": elapsed,
    }