"""Research agent -- multi-step research with tool usage.
Performs comprehensive research by:
1. Creating a research plan
2. Executing each step (web search, scraping, etc.)
3. Synthesising findings into a structured report
Designed to be invoked from the ``research_tool`` or directly.
"""
from __future__ import annotations
import json
import logging
import time
from typing import Any
logger = logging.getLogger(__name__)
[docs]
async def run_research(
query: str,
openrouter: Any,
redis: Any | None = None,
task_id: str = "",
) -> dict[str, Any]:
"""Execute a full research workflow and return the report.
Parameters
----------
query:
The research question.
openrouter:
An :class:`OpenRouterClient` instance (provides ``.chat()``).
redis:
Optional async Redis client for progress updates.
task_id:
Optional identifier for progress tracking in Redis.
"""
t0 = time.monotonic()
# Phase 1 -- plan
plan = await _plan(query, openrouter)
if redis and task_id:
await _progress(redis, task_id, "plan_created")
# Phase 2 -- execute
raw_findings = await _execute(query, plan, openrouter)
if redis and task_id:
await _progress(redis, task_id, "research_complete")
# Phase 3 -- synthesise
report = await _synthesise(query, raw_findings, openrouter)
elapsed = round(time.monotonic() - t0, 1)
return {
"status": "completed",
"query": query,
"plan": plan,
"raw_findings": raw_findings,
"report": report,
"elapsed_seconds": elapsed,
}
# ------------------------------------------------------------------
# Internal phases
# ------------------------------------------------------------------
async def _plan(query: str, openrouter: Any) -> str:
"""Internal helper: plan.
Args:
query (str): Search query or input string.
openrouter (Any): The openrouter value.
Returns:
str: Result string.
"""
msgs = [
{"role": "system", "content": (
"You are a research planner. Given a query, produce a concise, "
"numbered research plan (3-5 steps). Each step should describe "
"what information to gather and how."
)},
{"role": "user", "content": query},
]
return await openrouter.chat(msgs)
async def _execute(query: str, plan: str, openrouter: Any) -> str:
"""Internal helper: execute.
Args:
query (str): Search query or input string.
plan (str): The plan value.
openrouter (Any): The openrouter value.
Returns:
str: Result string.
"""
msgs = [
{"role": "system", "content": (
"You are a research agent. Execute the plan step by step. "
"Use tools like brave_web_search and scrape_webpage when "
"available. Report your findings for each step."
)},
{"role": "user", "content": (
f"Research query: {query}\n\nPlan:\n{plan}\n\n"
"Execute each step and report findings."
)},
]
return await openrouter.chat(msgs)
async def _synthesise(query: str, findings: str, openrouter: Any) -> str:
"""Internal helper: synthesise.
Args:
query (str): Search query or input string.
findings (str): The findings value.
openrouter (Any): The openrouter value.
Returns:
str: Result string.
"""
msgs = [
{"role": "system", "content": (
"Synthesise the raw research findings into a clear, "
"well-structured report. Include key takeaways and "
"cite sources where possible."
)},
{"role": "user", "content": (
f"Original query: {query}\n\nFindings:\n{findings}"
)},
]
return await openrouter.chat(msgs)
async def _progress(redis: Any, task_id: str, stage: str) -> None:
"""Internal helper: progress.
Args:
redis (Any): The redis value.
task_id (str): Background task identifier.
stage (str): The stage value.
"""
try:
await redis.hset(
f"stargazer:research:{task_id}", "progress", stage,
)
except Exception:
pass