# Source code for background_agents.deep_think_agent

"""Deep think agent -- multi-perspective analysis with synthesis.

Analyses a problem from several independent viewpoints, then synthesises
the perspectives into a single coherent conclusion.
"""

from __future__ import annotations

import asyncio
import logging
import time
from typing import Any

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default viewpoints used by a deep-think run when the caller supplies none.
# Each entry is a ``(name, system_instruction)`` pair.
PERSPECTIVES: list[tuple[str, str]] = [
    (
        "Technical / Engineering",
        "Analyse this from a technical and engineering perspective. "
        "Consider architecture, implementation, performance, and scalability.",
    ),
    (
        "Ethical / Social",
        "Consider the ethical, social, and human implications of this problem.",
    ),
    (
        "Risk / Edge Cases",
        "Think about edge cases, failure modes, risks, and what could go wrong.",
    ),
    (
        "Creative / Alternative",
        "Propose creative or unconventional approaches that others might overlook.",
    ),
]


# Strong references to in-flight debug-event tasks. The event loop keeps only
# weak references to tasks, so a fire-and-forget task that nobody references
# may be garbage-collected before it finishes.
_PENDING_DEBUG_EVENTS: set[asyncio.Task[Any]] = set()


def _on_debug_event_done(task: asyncio.Task[Any]) -> None:
    """Forget a finished debug-event task and log its failure, if any."""
    _PENDING_DEBUG_EVENTS.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.debug("deep_think: debug event task failed", exc_info=exc)


def _spawn_debug_event(coro: Any) -> None:
    """Run *coro* as a background task that stays referenced until it is done."""
    task = asyncio.create_task(coro)
    _PENDING_DEBUG_EVENTS.add(task)
    task.add_done_callback(_on_debug_event_done)


async def _analyse_perspective(
    prompt: str,
    openrouter: Any,
    name: str,
    instruction: str,
) -> dict[str, Any]:
    """Ask the model for one viewpoint and return its result entry."""
    messages = [
        {
            "role": "system",
            "content": f"You are a deep-thinking analyst. {instruction}",
        },
        {"role": "user", "content": prompt},
    ]
    started = time.monotonic()
    try:
        analysis = await openrouter.chat(messages)
    except Exception as exc:
        # One failed viewpoint must not abort the whole run: record the error
        # text in place of the analysis and carry on.
        try:
            from observability import publish_debug_event

            _spawn_debug_event(
                publish_debug_event(
                    "deep_think",
                    "deep_think_agent",
                    phase=name,
                    status="error",
                    preview=f"perspective='{name[:40]}' error={str(exc)[:80]}",
                )
            )
        except Exception:
            # Observability is best-effort; never let it affect the result.
            logger.debug(
                "deep_think: could not emit error event for %r",
                name,
                exc_info=True,
            )
        return {"perspective": name, "error": str(exc)}

    duration_ms = (time.monotonic() - started) * 1000
    try:
        from observability import publish_debug_event

        _spawn_debug_event(
            publish_debug_event(
                "deep_think",
                "deep_think_agent",
                phase=name,
                status="ok",
                duration_ms=round(duration_ms, 1),
                llm_output=analysis,
                preview=f"perspective='{name[:40]}' len={len(analysis)}",
            )
        )
    except Exception:
        # Observability is best-effort; never let it affect the result.
        logger.debug(
            "deep_think: could not emit ok event for %r", name, exc_info=True
        )
    return {"perspective": name, "analysis": analysis}


async def _report_progress(redis: Any, task_id: str, done: int, total: int) -> None:
    """Write ``done/total`` to the task's Redis hash; failures are only logged."""
    try:
        await redis.hset(
            f"stargazer:deep_think:{task_id}",
            "progress",
            f"{done}/{total}",
        )
    except Exception:
        logger.debug(
            "deep_think: progress update failed for task %r",
            task_id,
            exc_info=True,
        )


async def run_deep_think(
    prompt: str,
    openrouter: Any,
    redis: Any | None = None,
    task_id: str = "",
    perspectives: list[tuple[str, str]] | None = None,
) -> dict[str, Any]:
    """Run a multi-perspective deep-thinking analysis.

    Each perspective is sent to the model in turn; the collected analyses
    (or error texts, for perspectives that failed) are then sent back to the
    model for a final synthesis. Failures of individual model calls, progress
    updates, and debug events never raise out of this function.

    Parameters
    ----------
    prompt:
        The question or problem to analyse.
    openrouter:
        An :class:`OpenRouterClient` instance. Only its awaitable
        ``chat(messages)`` method is used.
    redis:
        Optional async Redis for progress updates. Progress is written only
        when both *redis* and *task_id* are truthy.
    task_id:
        Optional identifier for progress tracking.
    perspectives:
        Custom list of ``(name, system_instruction)`` tuples.
        Defaults to :data:`PERSPECTIVES`; an empty list also falls back to
        the defaults.

    Returns
    -------
    dict
        ``status`` (always ``"completed"``), the original ``prompt``,
        ``perspectives`` (one entry per perspective holding either an
        ``analysis`` or an ``error``), the ``synthesis`` text (or
        ``"Synthesis failed: ..."`` when the synthesis call raised), and
        ``elapsed_seconds`` rounded to one decimal place.
    """
    t0 = time.monotonic()
    persp = perspectives or PERSPECTIVES
    results: list[dict[str, Any]] = []

    for done, (name, instruction) in enumerate(persp, start=1):
        results.append(
            await _analyse_perspective(prompt, openrouter, name, instruction)
        )
        if redis and task_id:
            await _report_progress(redis, task_id, done, len(persp))

    # Synthesis: failed perspectives contribute their error text so the model
    # can still see which viewpoint is missing.
    synth_input = "\n\n".join(
        f"--- {r['perspective']} ---\n{r.get('analysis', r.get('error', ''))}"
        for r in results
    )
    msgs = [
        {
            "role": "system",
            "content": (
                "Synthesise the following independent analyses into a single, "
                "coherent conclusion. Highlight agreements, disagreements, and "
                "actionable recommendations."
            ),
        },
        {"role": "user", "content": synth_input},
    ]
    # Track success explicitly rather than sniffing the text afterwards: a
    # genuine model answer may itself begin with "Synthesis failed".
    synthesis_ok = True
    try:
        synthesis = await openrouter.chat(msgs)
    except Exception as exc:
        synthesis_ok = False
        synthesis = f"Synthesis failed: {exc}"

    elapsed = round(time.monotonic() - t0, 1)

    # Emit synthesis event (fire-and-forget)
    failed_perspectives = [r["perspective"] for r in results if "error" in r]
    try:
        from observability import publish_debug_event

        _spawn_debug_event(
            publish_debug_event(
                "deep_think",
                "deep_think_agent",
                phase="synthesis",
                status="ok" if synthesis_ok else "error",
                duration_ms=round(elapsed * 1000, 1),
                llm_output=synthesis,
                preview=f"perspectives={len(persp)} synth_len={len(synthesis)} elapsed={elapsed}s",
                payload={
                    "perspective_count": len(persp),
                    "perspectives_with_errors": failed_perspectives,
                    "prompt_preview": prompt[:200],
                },
            )
        )
    except Exception:
        # Observability is best-effort; never let it affect the result.
        logger.debug("deep_think: could not emit synthesis event", exc_info=True)

    return {
        "status": "completed",
        "prompt": prompt,
        "perspectives": results,
        "synthesis": synthesis,
        "elapsed_seconds": elapsed,
    }