Source code for openrouter_client.executor

"""Stateful LLM completion and tool-execution loop orchestrator."""

from __future__ import annotations

import asyncio
import contextvars
import hashlib
import logging
import re
import secrets
import time
import uuid

def _current_time() -> float:
    import openrouter_client
    if hasattr(openrouter_client, "time") and hasattr(openrouter_client.time, "time"):
        return openrouter_client.time.time()
    return time.time()
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, TYPE_CHECKING

import jsonutil as json

from tool_context import ToolContext
from observability import publish_tool_event
from response_postprocessor import (
    extract_and_strip_thoughts,
    postprocess_intermediate_response,
)
from openrouter_client.error_handling import (
    _message_content_for_proxy_error_detection,
    _proxy_error_banner_present,
    _user_visible_proxy_chat_reply,
)

if TYPE_CHECKING:
    pass

logger = logging.getLogger(__name__)

# Per-chat mutable effective max tool rounds (single-element list).
_TOOL_ROUND_LIMIT_BOX: contextvars.ContextVar[list[int] | None] = (
    contextvars.ContextVar(
        "tool_round_limit_box",
        default=None,
    )
)


[docs] def get_tool_round_limit_box() -> list[int] | None: """Return the mutable ``[max_rounds]`` for the current chat invocation.""" return _TOOL_ROUND_LIMIT_BOX.get()
def _obs_tags(ctx: ToolContext | None) -> dict[str, str]: if ctx is None: return { "request_id": "", "channel_id": "", "user_id": "", "platform": "", } return { "request_id": ctx.observability_request_id or "", "channel_id": ctx.channel_id or "", "user_id": ctx.user_id or "", "platform": ctx.platform or "", } _HEADER_REGEN_MESSAGE = ( "[SYSTEM] This is a 'fake' system message, sent with the user role, to resolve a generation error. \n\n" "Your previous response did not comply with the system prompt's rules for " "header generation.\n\nIt is NOT currently visible to any users.\n\n" "CRITICAL: Do NOT output any internal reasoning, chain-of-thought, system instructions, " "tool schemas, or meta-commentary (e.g. '[Internal Thought Summary]', 'CRITICAL INSTRUCTION', " "'Related tools:', 'Task:'). These are PRIVATE and must NEVER appear in your response.\n\n" "Responses MUST begin with a response header bracket:\n\n" "[`model` :: emojis :: thought :: `tools`]\n\n" "This may occur due to misconfigurations at the API provider's end.\n\n" "Regenerate your response now, starting with the required header." ) _TRAILING_NON_USER_FILLER_BODY = """ [SYSTEM — infrastructure guard; not written by any human] The conversation passed to the model ended with a non-user turn (assistant or system). Several endpoints reject this shape outright ("must end with a user message"), and Claude refuses trailing assistant turns. This user-role placeholder was inserted to keep the request valid. Nothing was censored; the messages above are exactly what the pipeline produced. Look at the messages above and act accordingly: - If a human asked something that is still unanswered, answer it now. - If I was mid-task (tool loop, plan, ongoing action), continue from where I left off. - If there is nothing new to say, a short truthful acknowledgement is fine. This placeholder is NOT a user turn. I MUST NOT quote it, summarise it, claim a user typed it, explain "padding" or "prefill", or say I "already responded". It is invisible on the platform side. """.strip()
[docs] def build_trailing_non_user_filler() -> str: """Random SHA-256–tagged user-role filler for requests ending in non-user turns.""" h = hashlib.sha256(secrets.token_bytes(32)).hexdigest() return ( f"<system_notice_{h}>\n" f"{_TRAILING_NON_USER_FILLER_BODY}\n" f"</system_notice_{h}>" )
_ALLOWED_TRAILING_ROLES: frozenset[str] = frozenset({"user", "tool"}) def _ensure_trailing_user_turn( messages: list[dict[str, Any]], *, model_for_log: str, call_site: str, ) -> bool: """Append a synthetic ``user`` filler when *messages* ends with a non-user/tool turn. Several model endpoints reject requests whose final message role is not ``user`` or ``tool``. This guard mutates *messages* in place and is idempotent. Returns ``True`` iff a filler was appended. """ if not messages: return False last_role = messages[-1].get("role") if last_role in _ALLOWED_TRAILING_ROLES: return False messages.append( { "role": "user", "content": build_trailing_non_user_filler(), } ) logger.warning( "Trailing role %r not in {user, tool}; appended synthetic user filler " "(call_site=%s, model=%r, total_messages=%d)", last_role, call_site, model_for_log, len(messages), ) return True async def _collect_btw_messages( ctx: ToolContext | None, cursor_ts: float, triggering_message_id: str, ) -> tuple[list[str], float]: """Return formatted lines for cached messages newer than *cursor_ts*. Returns (lines, new_cursor_ts). Fail-open — never raises. """ if ( ctx is None or getattr(ctx, "message_cache", None) is None or not str(getattr(ctx, "platform", "") or "").strip() or not str(getattr(ctx, "channel_id", "") or "").strip() ): return [], cursor_ts try: cms = await ctx.message_cache.get_messages_after( ctx.platform, ctx.channel_id, after_ts_exclusive=cursor_ts, ) trigger = (triggering_message_id or "").strip() surviving = [ cm for cm in cms if not ( trigger and str(getattr(cm, "message_id", "") or "").strip() == trigger ) ] if not surviving: if cms: advanced = max( (float(getattr(cm, "timestamp", 0.0) or 0.0) for cm in cms), default=cursor_ts, ) return [], advanced return [], cursor_ts new_cursor = max( (float(getattr(cm, "timestamp", 0.0) or 0.0) for cm in surviving), default=cursor_ts, ) lines: list[str] = [] for cm in surviving: kind = str(getattr(cm, "kind", "") or "user_in").strip() or "user_in" if kind not in ("user_in", "assistant_out"): continue ts = float(getattr(cm, "timestamp", 0.0) or 0.0) ts_iso = datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() uid = str(getattr(cm, "user_id", "") or "") uname = str(getattr(cm, "user_name", "") or "") msg_id = str(getattr(cm, "message_id", "") or "") txt = str(getattr(cm, "text", "") or "") prefix = f"[{ts_iso}] {uname} ({uid}) [Message ID: {msg_id}]" if kind == "user_in": rto = str(getattr(cm, "reply_to_id", "") or "").strip() if rto: prefix += f" [Replying to: {rto}]" prefix += " : " lines.append(prefix + txt) if not lines: return [], new_cursor return lines, new_cursor except Exception: logger.debug("Failed to collect btw messages", exc_info=True) return [], cursor_ts def _append_btw_suffix_to_last_tool_message( msgs: list[dict[str, Any]], suffix: str, ) -> None: """Append *suffix* to the text of the last ``role`` == ``tool`` message in *msgs*.""" if not msgs or msgs[-1].get("role") != "tool": return last = msgs[-1] last_content = last.get("content") if isinstance(last_content, str): last["content"] = last_content + suffix return if isinstance(last_content, list): for part in reversed(last_content): if isinstance(part, dict) and part.get("type") == "text": part["text"] = (part.get("text") or "") + suffix return last_content.append({"type": "text", "text": suffix.lstrip("\n")}) class OpenRouterExecutor: """Mixin orchestrator that executes the completion loop and manages tool calls.""" @staticmethod def _extract_reasoning_text(message: dict[str, Any]) -> str | None: """Return combined reasoning text from an API response message, or ``None``.""" parts: list[str] = [] details = message.get("reasoning_details") if isinstance(details, list): for entry in details: if isinstance(entry, dict): text = entry.get("thinking") or entry.get("text") or "" if text: parts.append(text) reasoning_str = message.get("reasoning") if isinstance(reasoning_str, str) and reasoning_str.strip(): if reasoning_str.strip() not in parts: parts.append(reasoning_str.strip()) return "\n\n".join(parts) if parts else None @staticmethod def _prepend_reasoning_to_content(message: dict[str, Any], content: str) -> str: """Prepend any out-of-band reasoning from *message* as ``<thinking>`` tags.""" reasoning = OpenRouterExecutor._extract_reasoning_text(message) if not reasoning: return content logger.debug( "Prepending out-of-band reasoning (%d chars) to content as <thinking> block", len(reasoning), ) thinking_block = f"<thinking>{reasoning}</thinking>" return f"{thinking_block}\n{content}" if content else thinking_block @staticmethod def _normalize_assistant_tool_message( message: dict[str, Any], ) -> dict[str, Any]: """Normalize an assistant message for conversation-history round-trips.""" out = dict(message) # -- 1. function_call -> tool_calls -- if not out.get("tool_calls"): fc = out.get("function_call") if isinstance(fc, dict) and fc.get("name"): args = fc.get("arguments") if not isinstance(args, str): args = json.dumps(args) if args is not None else "{}" out["tool_calls"] = [ { "id": f"legacy_{uuid.uuid4().hex[:16]}", "type": "function", "function": {"name": fc["name"], "arguments": args}, } ] out.pop("function_call", None) # -- 2. reasoning_details -> reasoning (for conversation history) -- reasoning_text = OpenRouterExecutor._extract_reasoning_text(out) if reasoning_text: out["reasoning"] = reasoning_text out.pop("reasoning_details", None) return out _COT_BRACKET_PATTERNS: tuple[re.Pattern[str], ...] = ( re.compile(r"^\[Internal Thought Summary\]", re.IGNORECASE), re.compile(r"^\[\(?System|User|Assistant\)?\s", re.IGNORECASE), re.compile(r"^\[(?:CRITICAL|WARNING|NOTE|IMPORTANT)\s", re.IGNORECASE), ) @staticmethod def _validate_header_sync(content: str) -> bool: """Return *True* if *content* starts with the required ``[`` header.""" cleaned, _thoughts = extract_and_strip_thoughts(content) cleaned = cleaned.lstrip() if not cleaned: has_thinking = bool(re.search(r"<think(?:ing|ought)>", content)) return has_thinking if not cleaned.startswith("["): return False # Reject known CoT bracket patterns that start with [ for pat in OpenRouterExecutor._COT_BRACKET_PATTERNS: if pat.match(cleaned): return False first_line = cleaned.split("\n", 1)[0] if first_line.startswith("[`") or first_line.startswith("[<code>"): return True if "::" in first_line and "]" in first_line: return True return False @staticmethod async def _validate_header(content: str) -> bool: """Async wrapper for the regex-heavy header validation.""" return await asyncio.to_thread( OpenRouterExecutor._validate_header_sync, content, ) async def _resolve_final_content_with_header_validation( self, content: str, msgs: list[dict[str, Any]], *, validate_header: bool, regen_tool_names: list[str] | None, user_id: str, max_video_url_parts: int | None = 5, override_model: str | None = None, override_api_key: str | None = None, override_chat_url: str | None = None, response_format: dict[str, Any] | None = None, toggle_specific: bool = False, ) -> str: """Validate header, clean intermediate posts, and handle regen retry once if invalid.""" if not validate_header or not content: return content normalized = postprocess_intermediate_response(content) if await self._validate_header(normalized): return normalized if await self._validate_header(content): return content return await self._regenerate_with_header_hint( msgs, content, tool_names=regen_tool_names, user_id=user_id, max_video_url_parts=max_video_url_parts, override_model=override_model, override_api_key=override_api_key, override_chat_url=override_chat_url, response_format=response_format, toggle_specific=toggle_specific, ) async def _regenerate_with_header_hint( self, msgs: list[dict[str, Any]], bad_content: str, *, tool_names: list[str] | None, user_id: str, max_video_url_parts: int | None = 5, override_model: str | None = None, override_api_key: str | None = None, override_chat_url: str | None = None, response_format: dict[str, Any] | None = None, toggle_specific: bool = False, ) -> str: """Recall the API once with an ephemeral correction message injection.""" logger.warning( "Response missing required header (starts with %r), regenerating with header hint", bad_content, ) msgs.append({"role": "assistant", "content": bad_content}) msgs.append({"role": "user", "content": _HEADER_REGEN_MESSAGE}) try: _t0 = time.monotonic() retry_response = await self._call_api( msgs, tool_names=[], user_id=user_id, ctx=None, max_video_url_parts=max_video_url_parts, override_model=override_model, override_api_key=override_api_key, override_chat_url=override_chat_url, response_format=response_format, toggle_specific=toggle_specific, ) logger.info( "Header-hint regeneration completed in %.0f ms", (time.monotonic() - _t0) * 1000, ) return retry_response.get("content") or bad_content except Exception as exc: logger.error("Header-hint regeneration failed: %s", exc) return bad_content @staticmethod def _detect_loop( round_history: list[set[tuple[str, str]]], exact_consecutive: int = 3, name_consecutive: int = 10, ) -> bool: """Return *True* if a tool-call loop is detected across recent execution rounds.""" if len(round_history) >= exact_consecutive: tail = round_history[-exact_consecutive:] intersection = set(tail[0]) for s in tail[1:]: intersection &= s if intersection: return True if len(round_history) >= name_consecutive: tail = round_history[-name_consecutive:] name_sets = [{n for n, _ in s} for s in tail] name_intersection = set(name_sets[0]) for ns in name_sets[1:]: name_intersection &= ns if name_intersection: return True return False async def chat( self, messages: list[dict[str, Any]], user_id: str = "", ctx: ToolContext | None = None, tool_names: list[str] | None = None, validate_header: bool = False, token_count: int | None = None, on_intermediate_text: Callable[[str], Awaitable[None]] | None = None, record_executed_tools: bool = True, max_video_url_parts: int | None = 5, override_model: str | None = None, override_api_key: str | None = None, override_chat_url: str | None = None, response_format: dict[str, Any] | None = None, toggle_specific: bool = False, ) -> str: """Send *messages* to the LLM and return the final assistant text.""" msgs = [dict(m) for m in messages] _ensure_trailing_user_turn( msgs, model_for_log=override_model or self.model, call_site="chat", ) _btw_cursor_ts = _current_time() _btw_trigger_msg_id = (ctx.message_id if ctx is not None else "") or "" if ctx is not None and record_executed_tools: ctx.tools_executed.clear() ctx.injected_tools_session.clear() ctx.tool_call_records.clear() round_history: list[set[tuple[str, str]]] = [] total_calls = 0 _global_order_index = 0 chan_info = f"[{ctx.platform}:{ctx.channel_id}] " if ctx else "" if token_count is None: token_count = await self._count_tokens(msgs, chan_info=chan_info) _count_str = str(token_count) if token_count is not None else "unavailable" for _m in msgs: if _m.get("role") == "system" and "__INPUT_TOKEN_COUNT__" in str( _m.get("content", "") ): _m["content"] = _m["content"].replace( "__INPUT_TOKEN_COUNT__", _count_str, ) break eff_model = override_model or self.model if self.max_tool_rounds <= 0: logger.info( "%sLLM API call (tool rounds disabled), model=%s, messages=%d", chan_info, eff_model, len(msgs), ) _t0 = time.monotonic() response_message = ( await self._call_api_with_optional_proxy_transport_fallbacks( msgs, tool_names=[], user_id=user_id, ctx=ctx, max_video_url_parts=max_video_url_parts, override_model=override_model, override_api_key=override_api_key, override_chat_url=override_chat_url, response_format=response_format, toggle_specific=toggle_specific, ) ) logger.info( "%sLLM API call completed in %.0f ms", chan_info, (time.monotonic() - _t0) * 1000, ) tool_calls = response_message.get("tool_calls") if tool_calls: logger.warning( "Model returned %d tool call(s) with tool rounds disabled — using text only", len(tool_calls), ) content = _message_content_for_proxy_error_detection( response_message.get("content"), ).strip() content = self._prepend_reasoning_to_content(response_message, content) if not content and tool_calls: content = ( "(No text reply; model attempted tool calls but none are enabled for this request.)" ) if _proxy_error_banner_present(content): logger.warning("Proxy returned an error response:\n%s", content) return _user_visible_proxy_chat_reply(content) content = await self._resolve_final_content_with_header_validation( content, msgs, validate_header=validate_header, regen_tool_names=[], user_id=user_id, max_video_url_parts=max_video_url_parts, override_model=override_model, override_api_key=override_api_key, override_chat_url=override_chat_url, toggle_specific=toggle_specific, ) return content limit_box = [self.max_tool_rounds] _limit_token = _TOOL_ROUND_LIMIT_BOX.set(limit_box) try: round_num = 0 while round_num < limit_box[0]: if ctx is not None: ctx.injected_tools = [] logger.info( "%sLLM API call: round=%d, model=%s, messages=%d", chan_info, round_num + 1, eff_model, len(msgs), ) _t0 = time.monotonic() response_message = ( await self._call_api_with_optional_proxy_transport_fallbacks( msgs, tool_names=tool_names, user_id=user_id, ctx=ctx, max_video_url_parts=max_video_url_parts, override_model=override_model, override_api_key=override_api_key, override_chat_url=override_chat_url, response_format=response_format, toggle_specific=toggle_specific, ) ) logger.info( "%sLLM API call round=%d completed in %.0f ms", chan_info, round_num + 1, (time.monotonic() - _t0) * 1000, ) tool_calls = response_message.get("tool_calls") if not tool_calls: content = _message_content_for_proxy_error_detection( response_message.get("content"), ).strip() content = self._prepend_reasoning_to_content( response_message, content ) if _proxy_error_banner_present(content): logger.warning("Proxy returned an error response:\n%s", content) return _user_visible_proxy_chat_reply(content) content = await self._resolve_final_content_with_header_validation( content, msgs, validate_header=validate_header, regen_tool_names=None, user_id=user_id, max_video_url_parts=max_video_url_parts, override_model=override_model, override_api_key=override_api_key, override_chat_url=override_chat_url, response_format=response_format, toggle_specific=toggle_specific, ) return content intermediate_text = _message_content_for_proxy_error_detection( response_message.get("content"), ).strip() if intermediate_text and on_intermediate_text: logger.debug( "Invoking on_intermediate_text (%d raw chars, %d tool call(s))", len(intermediate_text), len(tool_calls), ) try: await on_intermediate_text(intermediate_text) except Exception: logger.warning( "on_intermediate_text callback failed", exc_info=True, ) msgs.append(response_message) if ctx is not None and record_executed_tools: _seen = set(ctx.tools_executed) for tc in tool_calls: fn = tc.get("function") or {} name = fn.get("name") or "" if name and name not in _seen: _seen.add(name) ctx.tools_executed.append(name) round_keys: set[tuple[str, str]] = set() for tc in tool_calls: fn = tc["function"] name = fn["name"] args_str = fn.get("arguments", "") round_keys.add((name, args_str)) round_history.append(round_keys) total_calls += len(tool_calls) if total_calls > 200: logger.warning("Excessive tool calls (%d), aborting", total_calls) return "(Stopped: too many tool calls.)" exempt = self.tool_registry.repeat_allowed_tools() filtered_rounds = [ {(n, a) for n, a in rk if n not in exempt} for rk in round_history ] if self._detect_loop(filtered_rounds): logger.warning("Repetitive tool-call loop detected, aborting") return "(Stopped: repetitive tool-call loop detected.)" t0 = time.monotonic() sent_files_before = len(ctx.sent_files) if ctx else 0 async def _run_one( tc: dict, ) -> tuple[str, str, str, str, bool, float, float, float]: fn = tc["function"] tool_name = fn["name"] raw_args_json = fn.get("arguments", "") _t_tool = time.monotonic() _wall_start = _current_time() try: arguments = json.loads(raw_args_json) arguments = {k.strip("'\""): v for k, v in arguments.items()} except (json.JSONDecodeError, TypeError): arguments = {} logger.debug( "Calling tool %s with %s", tool_name, arguments, ) try: _IMAGE_GEN_TOOLS = { "generate_image", "comfyui_generate_image", "edit_image", } _is_game = False if ctx is not None and tool_name in _IMAGE_GEN_TOOLS: try: from game_session import get_or_restore_session _ch = getattr(ctx, "channel_id", "") _redis_client = getattr(ctx, "redis", None) _gs = await get_or_restore_session(_ch, _redis_client) if _ch else None _is_game = _gs is not None and _gs.active except ImportError: pass if _is_game and tool_name in _IMAGE_GEN_TOOLS: try: from background_agents.game_art_agent import ( publish_art_request, ) _art_redis = getattr(ctx, "redis", None) _prompt = arguments.get( "prompt", arguments.get("text", ""), ) if _art_redis and _prompt: _ch_id = getattr(ctx, "channel_id", "") or getattr( getattr(ctx, "message", None), "channel_id", "", ) _char_urls: list[str] = [] _char_names: list[str] = [] if _gs: try: from game_characters import ( get_active_character, ) for _uid in _gs.get_active_players(): try: _char = await get_active_character( _uid, _art_redis, ) if _char: _url = _char.get( "image_url", "" ) _cname = _char.get( "name", str(_uid) ) if _url: _char_urls.append(_url) _char_names.append(_cname) except Exception: pass except ImportError: pass asyncio.create_task( publish_art_request( redis=_art_redis, channel_id=str(_ch_id), narrative=_prompt[:3000], character_urls=_char_urls, character_names=_char_names, game_name=(_gs.game_name if _gs else ""), ), ) except Exception as _bg_exc: logger.debug( "Background art redirect failed: %s", _bg_exc, ) result = json.dumps( { "success": True, "message": ( "Image generation dispatched to " "background art agent. It will appear " "in the channel shortly. Continue " "your narrative." ), } ) _ok = True else: result = await self.tool_registry.call( tool_name, arguments, user_id=user_id, ctx=ctx, ) _ok = True except Exception as _texc: _ok = False result = json.dumps( { "error": ( f"Tool '{tool_name}' raised " f"{type(_texc).__name__}: {_texc}" ), } ) _wall_end = _current_time() _elapsed = (time.monotonic() - _t_tool) * 1000 _tags = _obs_tags(ctx) asyncio.create_task( publish_tool_event( tool_name, arguments, str(result)[:500], _elapsed, _ok, user_id=user_id, channel_id=_tags["channel_id"], request_id=_tags["request_id"], platform=_tags["platform"], ), ) return ( tc["id"], result, tool_name, raw_args_json, _ok, _wall_start, _wall_end, _elapsed, ) raw_results = await asyncio.gather( *[_run_one(tc) for tc in tool_calls], ) from tool_context import ToolCallRecord results: list[tuple[str, str]] = [] for r in raw_results: call_id, result, _tname, _raw_args, _ok, _ws, _we, _dur = r results.append((call_id, result)) if ctx is not None and record_executed_tools: ctx.tool_call_records.append( ToolCallRecord( tool_name=_tname, raw_arguments_json=_raw_args, result_output=str(result), success=_ok, execution_start=_ws, execution_end=_we, duration_ms=_dur, order_index=_global_order_index, round_number=round_num, ) ) _global_order_index += 1 elapsed_ms = (time.monotonic() - t0) * 1000 logger.info( "%sRound %d: executed %d tool(s) in %.0f ms", chan_info, round_num + 1, len(tool_calls), elapsed_ms, ) new_files = ( ctx.sent_files[sent_files_before:] if ctx and sent_files_before < len(ctx.sent_files) else [] ) for idx, (call_id, result) in enumerate(results): remaining = limit_box[0] - round_num - 1 cap = self.max_tool_output_chars if cap > 0 and len(result) > cap: _orig_len = len(result) result = ( result[:cap] + f"\n\n[TRUNCATED — output was {_orig_len:,} chars, showing first {cap:,}]" ) logger.warning( "Truncated tool output from %d to %d chars", _orig_len, cap, ) text_content = f"[Remaining tool rounds: {remaining}]\n{result}" if idx == 0 and new_files: from platforms.media_common import media_to_content_parts content_parts: list[dict[str, Any]] = [ {"type": "text", "text": text_content}, ] for sf in new_files: sf_parts = await media_to_content_parts( sf["data"], sf["mimetype"], sf["filename"], ) content_parts.extend(sf_parts) msgs.append( { "role": "tool", "tool_call_id": call_id, "content": content_parts, } ) else: msgs.append( { "role": "tool", "tool_call_id": call_id, "content": text_content, } ) btw_lines, _btw_cursor_ts = await _collect_btw_messages( ctx, _btw_cursor_ts, _btw_trigger_msg_id, ) if btw_lines and msgs and msgs[-1].get("role") == "tool": _suffix = "\n" + json.dumps( {"btw": btw_lines}, ensure_ascii=False, ) _append_btw_suffix_to_last_tool_message(msgs, _suffix) logger.info( "%sInjected btw suffix into round %d: %d new message(s)", chan_info, round_num + 1, len(btw_lines), ) if ctx is not None and ctx.injected_tools and tool_names is not None: existing = set(tool_names) new_tools = [t for t in ctx.injected_tools if t not in existing] if new_tools: tool_names.extend(new_tools) ctx.injected_tools_session.extend(new_tools) logger.info( "%sInjected %d tool(s) into active set: %s", chan_info, len(new_tools), new_tools, ) ctx.injected_tools = None round_num += 1 logger.warning("Reached max tool-call rounds (%d)", limit_box[0]) msgs.append( { "role": "user", "content": ( "[SYSTEM] You have reached the maximum number of allowed tool calls " f"({limit_box[0]} rounds) for this cycle. You cannot " "make any more tool calls. Please generate your final response " "to the user now, summarizing what you accomplished and noting " "anything you were unable to complete." ), } ) logger.info( "%sLLM API call: final no-tools round, model=%s, messages=%d", chan_info, eff_model, len(msgs), ) _t0 = time.monotonic() try: final_response = ( await self._call_api_with_optional_proxy_transport_fallbacks( msgs, tool_names=[], user_id=user_id, ctx=ctx, max_video_url_parts=max_video_url_parts, override_model=override_model, override_api_key=override_api_key, override_chat_url=override_chat_url, toggle_specific=toggle_specific, ) ) logger.info( "%sLLM API final no-tools round completed in %.0f ms", chan_info, (time.monotonic() - _t0) * 1000, ) _fc = final_response.get("content") _flat_final = _message_content_for_proxy_error_detection(_fc).strip() content = ( _flat_final if _flat_final else "(max tool-call rounds reached)" ) except Exception as exc: logger.error("Final no-tools round failed: %s", exc) _ec = response_message.get("content") _flat_err = _message_content_for_proxy_error_detection(_ec).strip() content = _flat_err if _flat_err else "(max tool-call rounds reached)" if _proxy_error_banner_present(content): logger.warning("Proxy returned an error response:\n%s", content) return _user_visible_proxy_chat_reply(content) content = await self._resolve_final_content_with_header_validation( content, msgs, validate_header=validate_header, regen_tool_names=[], user_id=user_id, max_video_url_parts=max_video_url_parts, override_model=override_model, override_api_key=override_api_key, override_chat_url=override_chat_url, toggle_specific=toggle_specific, ) return content finally: _TOOL_ROUND_LIMIT_BOX.reset(_limit_token)