Source code for openrouter_client.transport

"""Core HTTP transport client and native Gemini embedder/token-counter."""

from __future__ import annotations

import asyncio
import logging
import time
import os
from typing import Any, Callable, Awaitable

import httpx
import jsonutil as json

from gemini_embed_pool import (
    EMBED_DIMENSIONS,
    GEMINI_EMBED_BASE,
    PAID_KEY_FALLBACK_THRESHOLD,
    OpenRouterEmbedParseError,
    check_openrouter_only,
    gemini_embed_paid_fallback,
    get_openrouter_api_key,
    is_daily_quota_429,
    mark_key_daily_spent,
    next_gemini_embed_key,
    openrouter_embed_batch,
    record_key_usage,
    set_openrouter_only,
)
from platforms.media_common import (
    CLAUDE_MAX_IMAGE_BYTES,
    detect_image_mimetype_from_bytes,
    media_to_content_parts,
    shrink_image_under_max_bytes,
)
from tool_context import ToolContext
from user_llm_config import sanitize_llm_http_url

from observability import (
    publish_embedding_event,
    publish_http_error_event,
    publish_llm_request_event,
    get_http_call_origin,
)

from openrouter_client.executor import OpenRouterExecutor, _ensure_trailing_user_turn
from openrouter_client.sanitization import (
    resolve_model_alias,
    _model_targets_gemini,
    _model_targets_claude,
    _model_targets_gemma4,
    _model_strips_all_multimodal_before_request,
    _strip_audio_parts,
    _strip_video_url_parts,
    _strip_image_url_parts,
    _strip_video_and_audio_file_parts,
    _relabel_assistant_image_turns_as_user,
    _cap_video_url_parts,
    _clamp_claude_oversized_images,
    _sanitize_openai_tools_for_gemini,
)
from openrouter_client.error_handling import (
    _PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL,
    _PROXY_20_RETRIES_FALLBACK_OPENROUTER_CHAT_URL,
    _PROXY_20_RETRIES_FALLBACK_LOCAL_MODEL,
    _PROXY_20_RETRIES_FALLBACK_OPENROUTER_MODEL,
    _OPENROUTER_402_FALLBACK_LOCAL_MODEL,
    _GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES,
    _is_openrouter_chat_url,
    _is_gemini_input_token_limit_proxy_error,
    _is_assistant_prefill_unsupported_proxy_error,
    _is_proxy_20_retries_error,
    _message_content_for_proxy_error_detection,
    _proxy_error_banner_present,
    _user_visible_proxy_chat_reply,
)

logger = logging.getLogger(__name__)

DEFAULT_GEMINI_COUNT_TOKENS_MODEL = "gemini-3.1-flash-lite"

MAX_499_RETRIES = 3
_MODEL_MAX_TOKENS_CAPS: dict[str, int] = {
    "inception/mercury-2": 10_000,
}
_CTX_OVERFLOW_FALLBACK_MAX_TOKENS = 10_000
MAX_HEADER_RETRIES = 1
MAX_TOOLS_PER_REQUEST = 250
EMBED_RETRY_BASE_DELAY = 1.0
MAX_EMBED_DELAY = 8.0
MAX_EMBED_RETRIES = 14

_RETRIABLE_STATUSES = {429, 500, 502, 503, 504}

_RETRIABLE_ERRORS = (
    httpx.ReadTimeout,
    httpx.ConnectTimeout,
    httpx.ConnectError,
    httpx.RemoteProtocolError,
)

_JSON_BODY_DECODE_THRESHOLD = 256 * 1024

_ASSISTANT_PREFILL_RETRY_USER_MESSAGE = (
    "[SYSTEM — infrastructure only; not from a human]\n\n"
    "The chat API rejected the previous request because this model does not allow "
    "an assistant message as the last turn (no assistant prefill). Nothing from "
    "that failed call was posted to the channel.\n\n"
    "CRITICAL: Your last assistant reply from that attempt DID NOT reach anyone in "
    "the chat. Users did not see it. Do not assume you already answered, and do not "
    'tell users what you "already said" in that turn.\n\n'
    "Treat this line as an operator instruction only: ignore it as conversational "
    'content (do not quote it, summarize it for the room, or discuss "padding"). '
    "Look at the real user messages above and reply to them if a reply is needed.\n\n"
    "Do not satisfy this turn by outputting only thinking blocks or meta about API "
    'rules, double replies, or "closing the turn"; write what should appear in the '
    "channel if there is an unanswered human message."
)


def _json_loads_utf8(body: bytes) -> Any:
    return json.loads(body.decode("utf-8"))


async def _async_response_json(resp: httpx.Response) -> Any:
    """Read the response body without blocking and return the parsed JSON.

    If the ``openrouter_client`` package is loaded and exposes a different
    ``_async_response_json`` attribute, that replacement is awaited instead,
    so a function patched onto the package takes effect here too.

    Bodies of at least ``_JSON_BODY_DECODE_THRESHOLD`` bytes are decoded and
    parsed in a worker thread; smaller bodies are parsed inline.
    """
    import sys

    package = sys.modules.get("openrouter_client")
    if package is not None:
        override = getattr(package, "_async_response_json", None)
        # Only delegate when the package attribute is a different callable.
        if override is not None and override is not _async_response_json:
            return await override(resp)

    raw = await resp.aread()
    if len(raw) < _JSON_BODY_DECODE_THRESHOLD:
        return json.loads(raw.decode("utf-8"))
    return await asyncio.to_thread(_json_loads_utf8, raw)


def _obs_tags(ctx: ToolContext | None) -> dict[str, str]:
    if ctx is None:
        return {
            "request_id": "",
            "channel_id": "",
            "user_id": "",
            "platform": "",
        }
    return {
        "request_id": ctx.observability_request_id or "",
        "channel_id": ctx.channel_id or "",
        "user_id": ctx.user_id or "",
        "platform": ctx.platform or "",
    }


def _openrouter_reasoning_for_user_override(
    override_model: str | None,
    override_chat_url: str | None,
) -> dict[str, Any] | None:
    """Return OpenRouter ``reasoning`` body for per-user custom OpenRouter endpoints."""
    if not override_model or not override_chat_url:
        return None
    if "openrouter" not in override_chat_url.lower():
        return None
    return {"effort": "high", "enabled": True, "exclude": False}


[docs] class OpenRouterClient(OpenRouterExecutor): """Thin async wrapper around an OpenAI-compatible chat-completions endpoint."""
def __init__(
    self,
    api_key: str,
    model: str = "x-ai/grok-4.1-fast",
    temperature: float = 1.0,
    max_tokens: int = 60_000,
    top_p: float = 0.99,
    tool_registry: Any | None = None,
    max_tool_rounds: int = 10,
    base_url: str = "http://localhost:3000/openai",
    gemini_api_key: str = "",
    gemini_count_tokens_model: str = DEFAULT_GEMINI_COUNT_TOKENS_MODEL,
    max_tool_output_chars: int = 150_000,
    http_connect_timeout: float = 10.0,
    http_read_timeout: float = 1200.0,
    http_write_timeout: float = 120.0,
    http_pool_timeout: float = 180.0,
) -> None:
    """Store the client settings and create the shared async HTTP client.

    * ``gemini_api_key`` falls back to :meth:`_resolve_gemini_api_key` when
      empty.
    * ``tool_registry`` defaults to a fresh ``tools.ToolRegistry`` (imported
      lazily, only when no registry is supplied).
    * ``base_url`` is passed through ``sanitize_llm_http_url``; the chat
      endpoint is that value without trailing slashes plus
      ``/chat/completions``.
    * The four ``http_*_timeout`` values configure the ``httpx.Timeout`` of
      the client stored on ``self._http``.
    """
    # Credentials and model selection.
    self.api_key = api_key
    if gemini_api_key:
        self.gemini_api_key = gemini_api_key
    else:
        self.gemini_api_key = self._resolve_gemini_api_key()
    self.gemini_count_tokens_model = gemini_count_tokens_model
    self.model = model

    # Sampling parameters.
    self.temperature = temperature
    self.max_tokens = max_tokens
    self.top_p = top_p

    # Tooling.
    if tool_registry is not None:
        self.tool_registry = tool_registry
    else:
        from tools import ToolRegistry

        self.tool_registry = ToolRegistry()
    self.max_tool_rounds = max_tool_rounds
    self.max_tool_output_chars = max_tool_output_chars

    # Endpoint and transport.
    chat_base = sanitize_llm_http_url(base_url).rstrip("/")
    self._chat_url = chat_base + "/chat/completions"
    timeouts = httpx.Timeout(
        connect=http_connect_timeout,
        read=http_read_timeout,
        write=http_write_timeout,
        pool=http_pool_timeout,
    )
    self._http = httpx.AsyncClient(timeout=timeouts)
@staticmethod def _resolve_gemini_api_key() -> str: """Return the Gemini API key from the environment. Callers that already hold a loaded :class:`Config` (e.g. ``main.py``) pass ``gemini_api_key`` explicitly, and ``Config.load`` already folds the ``GEMINI_API_KEY`` env var into ``cfg.gemini_api_key``. This fallback therefore only consults the environment and never re-parses ``config.yaml`` — previously that caused a redundant synchronous YAML load on the event loop during startup. """ return os.getenv("GEMINI_API_KEY", "") # ------------------------------------------------------------------ # Token counting via Gemini API # ------------------------------------------------------------------ @staticmethod def _build_gemini_contents(messages: list[dict[str, Any]]) -> list[dict]: """Convert OpenAI-format messages to Gemini ``contents`` (sync, CPU-bound).""" contents = [] for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") if not content: continue if role in ("system", "assistant"): gemini_role = "model" else: gemini_role = "user" if isinstance(content, list): text = " ".join( p.get("text", "") for p in content if p.get("type") == "text" ) if not text: continue contents.append( { "role": gemini_role, "parts": [{"text": text}], } ) else: contents.append( { "role": gemini_role, "parts": [{"text": str(content)}], } ) if not contents: return [] deduped: list[dict] = [contents[0]] for c in contents[1:]: if c["role"] == deduped[-1]["role"]: deduped[-1]["parts"].extend(c["parts"]) else: deduped.append(c) return deduped async def _count_tokens( self, messages: list[dict[str, Any]], *, chan_info: str = "", gemini_model: str | None = None, ) -> int | None: """Count input tokens using the Gemini countTokens API.""" if not self.gemini_api_key: return None model_id = gemini_model or self.gemini_count_tokens_model deduped = await asyncio.to_thread( self._build_gemini_contents, messages, ) if not deduped: return None url = ( 
f"https://generativelanguage.googleapis.com/v1beta/" f"models/{model_id}:countTokens" f"?key={self.gemini_api_key}" ) payload = {"contents": deduped} try: resp = await self._http.post( url, json=payload, timeout=5.0, ) if resp.status_code == 200: data = await _async_response_json(resp) count = data.get("totalTokens") if isinstance(count, int): logger.info( "%sGemini countTokens: %d tokens", chan_info, count, ) return count else: logger.debug( "countTokens returned %d: %s", resp.status_code, resp.text[:200], ) except Exception: logger.debug( "countTokens call failed", exc_info=True, ) return None
async def count_input_tokens(
    self,
    messages: list[dict[str, Any]],
    *,
    gemini_model: str | None = None,
) -> int | None:
    """Public entry point for Gemini ``countTokens`` on OpenAI-shaped *messages*.

    Delegates to :meth:`_count_tokens`, forwarding only *messages* and
    *gemini_model*, and returns whatever that call returns.
    """
    token_count = await self._count_tokens(messages, gemini_model=gemini_model)
    return token_count
async def close(self) -> None:
    """Close the HTTP client held on ``self._http`` by awaiting ``aclose``."""
    http_client = self._http
    await http_client.aclose()
@staticmethod def _gemini_model_name(model: str) -> str: """Convert OpenRouter model name to Gemini native model name.""" return model.removeprefix("google/") # ------------------------------------------------------------------ # Embedding provider implementations # ------------------------------------------------------------------
async def embed(self, text: str, model: str) -> list[float]:
    """Generate an embedding vector for *text* via the Gemini API.

    Empty or whitespace-only text yields a zero vector of
    ``EMBED_DIMENSIONS`` entries without any request.

    :meth:`_embed_gemini` is attempted up to ``MAX_EMBED_RETRIES`` rounds.
    Between rounds the coroutine sleeps for an exponentially growing delay
    capped at ``MAX_EMBED_DELAY``.

    Raises:
        ValueError: re-raised immediately, never retried.
        RuntimeError: when every round failed; carries the last error text.
    """
    if not (text and text.strip()):
        return [0.0] * EMBED_DIMENSIONS

    last_error: str | None = None
    for round_num in range(MAX_EMBED_RETRIES):
        try:
            return await self._embed_gemini(text, model)
        except ValueError:
            raise
        # NOTE(review): if OpenRouterEmbedParseError subclasses ValueError the
        # clause above shadows this handler — confirm against its definition.
        except OpenRouterEmbedParseError as exc:
            logger.warning(
                "OpenRouter embed 200-no-data for single text "
                "(non-retriable) — returning zero vector: %s",
                str(exc)[:500],
            )
            return [0.0] * EMBED_DIMENSIONS
        except Exception as exc:
            last_error = str(exc)
            logger.warning(
                "Gemini embed failed (round %d): %s",
                round_num + 1,
                exc,
            )
            backoff = min(
                EMBED_RETRY_BASE_DELAY * (2**round_num),
                MAX_EMBED_DELAY,
            )
            await asyncio.sleep(backoff)

    raise RuntimeError(
        f"Gemini embed failed after {MAX_EMBED_RETRIES} rounds: {last_error}"
    )
async def embed_batch(
    self,
    texts: list[str],
    model: str,
) -> list[list[float]]:
    """Generate embedding vectors for multiple texts via the Gemini API.

    The result has one vector per input, in input order. Empty or
    whitespace-only inputs are not sent and get a zero vector of
    ``EMBED_DIMENSIONS`` entries. An empty *texts* list returns ``[]``.

    :meth:`_embed_gemini_batch` is attempted up to ``MAX_EMBED_RETRIES``
    rounds with an exponentially growing sleep capped at
    ``MAX_EMBED_DELAY`` between rounds.

    Raises:
        ValueError: re-raised immediately, never retried.
        RuntimeError: when every round failed; carries the last error text.
    """
    if not texts:
        return []

    # Remember where each non-blank text sits so results can be scattered back.
    kept = [(pos, item) for pos, item in enumerate(texts) if item and item.strip()]
    if not kept:
        return [[0.0] * EMBED_DIMENSIONS for _ in texts]
    valid_indices = [pos for pos, _ in kept]
    valid_texts = [item for _, item in kept]

    last_error: str | None = None
    for round_num in range(MAX_EMBED_RETRIES):
        try:
            vectors = await self._embed_gemini_batch(valid_texts, model)
            merged: list[list[float]] = [[0.0] * EMBED_DIMENSIONS for _ in texts]
            for pos, vector in zip(valid_indices, vectors):
                merged[pos] = vector
            return merged
        except ValueError:
            raise
        except Exception as exc:
            last_error = str(exc)
            logger.warning(
                "Gemini batch embed failed (round %d): %s",
                round_num + 1,
                exc,
            )
            backoff = min(
                EMBED_RETRY_BASE_DELAY * (2**round_num),
                MAX_EMBED_DELAY,
            )
            await asyncio.sleep(backoff)

    raise RuntimeError(
        f"Gemini embed_batch failed after {MAX_EMBED_RETRIES} rounds: {last_error}"
    )
async def _embed_gemini( self, text: str, model: str, task_type: str | None = None, ) -> list[float]: """Embed a single text via the native Gemini API.""" if not text or not text.strip(): return [0.0] * EMBED_DIMENSIONS if await check_openrouter_only(): logger.info("OpenRouter-only mode — bypassing Gemini for single text") try: vecs = await openrouter_embed_batch( [text], model=model, api_key=self.api_key, ) return vecs[0] except Exception: logger.warning( "OpenRouter failed while pinned to openrouter_only — " "trying paid Gemini key as last resort", exc_info=True, ) paid_vecs = await gemini_embed_paid_fallback( [text], model=model, task_type=task_type, ) return paid_vecs[0] gemini_model = self._gemini_model_name(model) api_key = next_gemini_embed_key() url = f"{GEMINI_EMBED_BASE}/{gemini_model}:embedContent?key={api_key}" payload: dict = { "model": f"models/{gemini_model}", "content": {"parts": [{"text": text}]}, "output_dimensionality": EMBED_DIMENSIONS, } if task_type: payload["taskType"] = task_type attempt = 0 consecutive_429 = 0 tried_or_then_paid = False _embed_start = time.monotonic() last_error: str | None = "unknown" while attempt < MAX_EMBED_RETRIES: if attempt > 0: delay = min( EMBED_RETRY_BASE_DELAY * (2 ** (attempt - 1)), MAX_EMBED_DELAY, ) logger.warning( "Gemini embed error, retrying in %.1fs (attempt %d): %s", delay, attempt + 1, last_error, ) await asyncio.sleep(delay) try: _t0 = time.monotonic() origin = get_http_call_origin() logger.info( "Sending Gemini embedding request for model %s (origin: %s)", model, origin, ) resp = await self._http.post(url, json=payload) logger.info( "Gemini embed HTTP request completed in %.0f ms (status=%d) (origin: %s)", (time.monotonic() - _t0) * 1000, resp.status_code, origin, ) await record_key_usage(api_key) if resp.status_code == 400: body = resp.text[:500] logger.error( "Gemini embed 400 Bad Request (non-retriable): %s", body, ) _ms = (time.monotonic() - _t0) * 1000 asyncio.create_task( 
publish_http_error_event( http_service="gemini_embed", http_status=400, duration_ms=_ms, endpoint=url.split("?")[0][:120], detail=body[:500], ), ) asyncio.create_task( publish_embedding_event( source="gemini_embed_single", model=model, duration_ms=_ms, success=False, text_len=len(text), error=body[:300], ), ) raise ValueError(f"Gemini embed 400 Bad Request: {body}") if resp.status_code == 429: if is_daily_quota_429(resp): await mark_key_daily_spent(api_key, "embed") api_key = next_gemini_embed_key() url = ( f"{GEMINI_EMBED_BASE}/{gemini_model}" f":embedContent?key={api_key}" ) last_error = "HTTP 429 (daily quota)" attempt += 1 continue consecutive_429 += 1 if ( consecutive_429 >= PAID_KEY_FALLBACK_THRESHOLD and not tried_or_then_paid ): tried_or_then_paid = True logger.warning( "Free Gemini pool 429'd %d times — escalating " "to OpenRouter, then paid Gemini key", consecutive_429, ) try: vecs = await openrouter_embed_batch( [text], model=model, api_key=self.api_key, ) await set_openrouter_only() return vecs[0] except Exception: logger.warning( "OpenRouter embed fallback failed — " "trying paid Gemini key as last resort", exc_info=True, ) try: paid_vecs = await gemini_embed_paid_fallback( [text], model=model, task_type=task_type, ) return paid_vecs[0] except Exception: logger.warning( "Paid Gemini key fallback also failed", exc_info=True, ) last_error = "HTTP 429" attempt += 1 continue if resp.status_code in _RETRIABLE_STATUSES: last_error = f"HTTP {resp.status_code}" attempt += 1 continue resp.raise_for_status() data = await _async_response_json(resp) _ms = (time.monotonic() - _t0) * 1000 asyncio.create_task( publish_embedding_event( source="gemini_embed_single", model=model, duration_ms=_ms, success=True, text_len=len(text), ), ) return data["embedding"]["values"] except ValueError: raise except Exception as exc: last_error = str(exc) attempt += 1 _ms = (time.monotonic() - _embed_start) * 1000 asyncio.create_task( publish_http_error_event( 
http_service="gemini_embed", http_status=0, duration_ms=_ms, endpoint="gemini:embedContent", detail=str(last_error)[:500] if last_error else "unknown", error_kind="exhausted_retries", ), ) asyncio.create_task( publish_embedding_event( source="gemini_embed_single", model=model, duration_ms=_ms, success=False, text_len=len(text), error=str(last_error)[:300] if last_error else "", ), ) raise RuntimeError( f"Gemini embed failed after {MAX_EMBED_RETRIES} attempts: {last_error}" ) async def _embed_gemini_batch( self, texts: list[str], model: str, task_type: str | None = None, ) -> list[list[float]]: """Embed a batch of texts via the native Gemini API.""" if not texts: return [] valid_indices: list[int] = [] valid_texts: list[str] = [] for i, t in enumerate(texts): if t and t.strip(): valid_indices.append(i) valid_texts.append(t) if not valid_texts: return [[0.0] * EMBED_DIMENSIONS for _ in texts] if len(valid_texts) < len(texts): partial = await self._embed_gemini_batch( valid_texts, model, task_type=task_type, ) result: list[list[float]] = [[0.0] * EMBED_DIMENSIONS for _ in texts] for idx, emb in zip(valid_indices, partial): result[idx] = emb return result texts = valid_texts if len(texts) > 100: chunks = [texts[i : i + 100] for i in range(0, len(texts), 100)] results: list[list[float]] = [] for chunk in chunks: chunk_embeddings = await self._embed_gemini_batch( chunk, model, task_type=task_type, ) results.extend(chunk_embeddings) return results if await check_openrouter_only(): logger.info( "OpenRouter-only mode — bypassing Gemini for %d texts", len(texts) ) try: return await openrouter_embed_batch( texts, model=model, api_key=self.api_key, ) except Exception: logger.warning( "OpenRouter failed while pinned to openrouter_only — " "trying paid Gemini key as last resort", exc_info=True, ) return await gemini_embed_paid_fallback( texts, model=model, task_type=task_type, ) gemini_model = self._gemini_model_name(model) api_key = next_gemini_embed_key() url = 
f"{GEMINI_EMBED_BASE}/{gemini_model}:batchEmbedContents?key={api_key}" requests_list = [] for t in texts: req: dict = { "model": f"models/{gemini_model}", "content": {"parts": [{"text": t}]}, "output_dimensionality": EMBED_DIMENSIONS, } if task_type: req["taskType"] = task_type requests_list.append(req) payload = {"requests": requests_list} _tags = _obs_tags(None) attempt = 0 consecutive_429 = 0 tried_or_then_paid = False last_error: str | None = "unknown" while attempt < MAX_EMBED_RETRIES: if attempt > 0: delay = min( EMBED_RETRY_BASE_DELAY * (2 ** (attempt - 1)), MAX_EMBED_DELAY, ) logger.warning( "Gemini batch embed error, retrying in %.1fs (attempt %d): %s", delay, attempt + 1, last_error, ) await asyncio.sleep(delay) try: _t0 = time.monotonic() origin = get_http_call_origin() logger.info( "Sending Gemini batch embedding request of %d texts for model %s (origin: %s)", len(texts), model, origin, ) resp = await self._http.post(url, json=payload) logger.info( "Gemini batch embed HTTP request completed in %.0f ms (status=%d) (origin: %s)", (time.monotonic() - _t0) * 1000, resp.status_code, origin, ) await record_key_usage(api_key) if resp.status_code == 400: body = resp.text[:500] logger.error( "Gemini batch embed 400 Bad Request (non-retriable): %s", body, ) _ms = (time.monotonic() - _t0) * 1000 asyncio.create_task( publish_http_error_event( http_service="gemini_embed_batch", http_status=resp.status_code, request_id=_tags["request_id"], channel_id=_tags["channel_id"], user_id=_tags["user_id"], platform=_tags["platform"], duration_ms=_ms, endpoint=url.split("?")[0][:120], detail=body[:500], ), ) raise ValueError(f"Gemini batch embed 400 Bad Request: {body}") if resp.status_code == 429: if is_daily_quota_429(resp): await mark_key_daily_spent(api_key, "embed") api_key = next_gemini_embed_key() url = ( f"{GEMINI_EMBED_BASE}/{gemini_model}" f":batchEmbedContents?key={api_key}" ) last_error = "HTTP 429 (daily quota)" attempt += 1 continue consecutive_429 += 1 if ( 
consecutive_429 >= PAID_KEY_FALLBACK_THRESHOLD and not tried_or_then_paid ): tried_or_then_paid = True logger.warning( "Free Gemini pool 429'd %d times — escalating " "to OpenRouter, then paid Gemini key", consecutive_429, ) try: vecs = await openrouter_embed_batch( texts, model=model, api_key=self.api_key, ) await set_openrouter_only() return vecs except Exception: logger.warning( "OpenRouter embed fallback failed — " "trying paid Gemini key as last resort", exc_info=True, ) try: return await gemini_embed_paid_fallback( texts, model=model, task_type=task_type, ) except Exception: logger.warning( "Paid Gemini key fallback also failed", exc_info=True, ) last_error = "HTTP 429" attempt += 1 continue if resp.status_code in _RETRIABLE_STATUSES: last_error = f"HTTP {resp.status_code}" attempt += 1 continue resp.raise_for_status() data = await _async_response_json(resp) _ms = (time.monotonic() - _t0) * 1000 asyncio.create_task( publish_embedding_event( source="gemini_batch", model=model, duration_ms=_ms, success=True, batch_size=len(texts), request_id=_tags["request_id"], channel_id=_tags["channel_id"], user_id=_tags["user_id"], platform=_tags["platform"], ), ) return [item["values"] for item in data["embeddings"]] except ValueError: raise except Exception as exc: last_error = str(exc) attempt += 1 raise RuntimeError( f"Gemini batch embed failed after {MAX_EMBED_RETRIES} attempts: {last_error}" ) # ------------------------------------------------------------------ # HTTP Chat Completions Transport Callers # ------------------------------------------------------------------ async def _call_api_with_optional_gemini_proxy_retry( self, messages: list[dict[str, Any]], tool_names: list[str] | None = None, user_id: str = "", ctx: ToolContext | None = None, max_video_url_parts: int | None = 5, *, override_model: str | None = None, override_api_key: str | None = None, override_chat_url: str | None = None, response_format: dict[str, Any] | None = None, toggle_specific: bool = 
False, ) -> dict[str, Any]: """Call :meth:`_call_api` with selective retries for known proxy error bodies.""" last_normalized: dict[str, Any] | None = None work_messages: list[dict[str, Any]] = messages gemini_token_retries_remaining = _GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES prefill_fixup_applied = False payment_402_fallback_used = False active_override_model: str | None = override_model active_override_chat_url: str | None = override_chat_url # Room for token-limit retries, one prefill fixup, and 402 re-entry. max_attempts = max( 16, _GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES + 6, ) for attempt in range(max_attempts): effective_model = resolve_model_alias(active_override_model or self.model) try: raw = await self._call_api( work_messages, tool_names=tool_names, user_id=user_id, ctx=ctx, max_video_url_parts=max_video_url_parts, override_model=active_override_model, override_api_key=override_api_key, override_chat_url=active_override_chat_url, response_format=response_format, toggle_specific=toggle_specific, ) except httpx.HTTPStatusError as exc: if exc.response.status_code != 402: raise if payment_402_fallback_used: raise attempt_url = active_override_chat_url or self._chat_url if not _is_openrouter_chat_url(attempt_url): raise resolved_model = resolve_model_alias( active_override_model or self.model ) _fb_model = resolve_model_alias(_OPENROUTER_402_FALLBACK_LOCAL_MODEL) if ( attempt_url.rstrip("/") == _PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL.rstrip("/") and resolved_model == _fb_model ): raise payment_402_fallback_used = True logger.warning( "OpenRouter HTTP 402 Payment Required; retrying via %s (%s)", _PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL, _OPENROUTER_402_FALLBACK_LOCAL_MODEL, ) active_override_model = _OPENROUTER_402_FALLBACK_LOCAL_MODEL active_override_chat_url = _PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL effective_model = resolve_model_alias( active_override_model or self.model ) raw = await self._call_api( work_messages, tool_names=tool_names, 
user_id=user_id, ctx=ctx, max_video_url_parts=max_video_url_parts, override_model=active_override_model, override_api_key=override_api_key, override_chat_url=active_override_chat_url, response_format=response_format, toggle_specific=toggle_specific, ) last_normalized = self._normalize_assistant_tool_message(raw) raw_content = last_normalized.get("content") if ( gemini_token_retries_remaining > 0 and _is_gemini_input_token_limit_proxy_error(raw_content) ): gemini_token_retries_remaining -= 1 logger.warning( "Proxy reported input token limit (often transient mis-route; " "model=%r); retrying full chat request " "(%d/%d)", effective_model, _GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES - gemini_token_retries_remaining, _GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES, ) continue if ( not prefill_fixup_applied and _is_assistant_prefill_unsupported_proxy_error(raw_content) ): prefill_fixup_applied = True work_messages = [ *work_messages, { "role": "user", "content": _ASSISTANT_PREFILL_RETRY_USER_MESSAGE, }, ] logger.warning( "Assistant message prefill not supported for this model " "(proxy 400); appending synthetic user message and retrying " "(attempt %d)", attempt + 1, ) continue return last_normalized assert last_normalized is not None return last_normalized async def _call_api_with_optional_proxy_transport_fallbacks( self, messages: list[dict[str, Any]], tool_names: list[str] | None = None, user_id: str = "", ctx: ToolContext | None = None, max_video_url_parts: int | None = 5, *, override_model: str | None = None, override_api_key: str | None = None, override_chat_url: str | None = None, response_format: dict[str, Any] | None = None, toggle_specific: bool = False, ) -> dict[str, Any]: """Run Gemini/prefill proxy retries, then optional transport fallbacks.""" primary = await self._call_api_with_optional_gemini_proxy_retry( messages, tool_names=tool_names, user_id=user_id, ctx=ctx, max_video_url_parts=max_video_url_parts, override_model=override_model, 
override_api_key=override_api_key, override_chat_url=override_chat_url, response_format=response_format, toggle_specific=toggle_specific, ) if not _is_proxy_20_retries_error(primary.get("content")): return primary logger.warning( "Proxy reported 20-retries exhaustion; retrying via %s (%s)", _PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL, _PROXY_20_RETRIES_FALLBACK_LOCAL_MODEL, ) fallback1: dict[str, Any] | None = None try: fallback1 = await self._call_api_with_optional_gemini_proxy_retry( messages, tool_names=tool_names, user_id=user_id, ctx=ctx, max_video_url_parts=max_video_url_parts, override_model=_PROXY_20_RETRIES_FALLBACK_LOCAL_MODEL, override_api_key=override_api_key, override_chat_url=_PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL, response_format=response_format, toggle_specific=toggle_specific, ) except Exception: logger.exception( "Tier-1 transport fallback (localhost Claude) failed", ) if fallback1 is not None: t1_flat = _message_content_for_proxy_error_detection( fallback1.get("content"), ) if not _proxy_error_banner_present(t1_flat): return fallback1 import openrouter_client get_key_fn = getattr(openrouter_client, "get_openrouter_api_key", get_openrouter_api_key) or_key = get_key_fn() if not or_key: logger.error( "OpenRouter API key missing; cannot run tier-2 transport fallback", ) return fallback1 if fallback1 is not None else primary logger.warning( "Transport fallback tier 2: %s model=%s", _PROXY_20_RETRIES_FALLBACK_OPENROUTER_CHAT_URL, _PROXY_20_RETRIES_FALLBACK_OPENROUTER_MODEL, ) try: return await self._call_api_with_optional_gemini_proxy_retry( messages, tool_names=tool_names, user_id=user_id, ctx=ctx, max_video_url_parts=max_video_url_parts, override_model=_PROXY_20_RETRIES_FALLBACK_OPENROUTER_MODEL, override_api_key=or_key, override_chat_url=_PROXY_20_RETRIES_FALLBACK_OPENROUTER_CHAT_URL, response_format=response_format, toggle_specific=toggle_specific, ) except Exception: logger.exception( "Tier-2 transport fallback (OpenRouter Gemini) failed", ) 
return fallback1 if fallback1 is not None else primary async def _call_api( self, messages: list[dict[str, Any]], tool_names: list[str] | None = None, user_id: str = "", ctx: ToolContext | None = None, max_video_url_parts: int | None = 5, *, override_model: str | None = None, override_api_key: str | None = None, override_chat_url: str | None = None, response_format: dict[str, Any] | None = None, toggle_specific: bool = False, ) -> dict[str, Any]: """Make a single chat-completions request and return the first choice message.""" effective_model = resolve_model_alias(override_model or self.model) effective_api_key = override_api_key or self.api_key chat_url = sanitize_llm_http_url(override_chat_url or self._chat_url) from tools._safe_http import assert_safe_http_url from urllib.parse import urlparse parsed_url = urlparse(chat_url) is_local = False if parsed_url.hostname: hl = parsed_url.hostname.lower().strip(".") is_local = (hl == "localhost" or hl == "127.0.0.1" or hl.startswith("127.")) if not is_local: chat_url = assert_safe_http_url(chat_url) if _model_strips_all_multimodal_before_request(effective_model): ra = _strip_audio_parts(messages) rv = _strip_video_url_parts(messages) ri = _strip_image_url_parts(messages) if ra or rv or ri: logger.info( "Stripped multimodal content for text-only model %r " "(audio=%d, video=%d, image=%d)", effective_model, ra, rv, ri, ) elif _model_targets_claude(effective_model): rv = _strip_video_url_parts(messages) rv_file = _strip_video_and_audio_file_parts(messages) ra = _strip_audio_parts(messages) if rv or rv_file or ra: logger.info( "Stripped video/audio for Claude model %r " "(video_url=%d, file_video_or_audio=%d, audio_type_parts=%d)", effective_model, rv, rv_file, ra, ) elif _model_targets_gemma4(effective_model): rv = _strip_video_url_parts(messages) rv_file = _strip_video_and_audio_file_parts(messages) ra = _strip_audio_parts(messages) if rv or rv_file or ra: logger.info( "Stripped video/audio for Gemma 4 model %r " 
"(video_url=%d, file_video_or_audio=%d, audio_type_parts=%d)", effective_model, rv, rv_file, ra, ) elif not _model_targets_gemini(effective_model): removed = _strip_video_url_parts(messages) if removed: logger.info( "Stripped %d video_url part(s) — model %r does not support video", removed, effective_model, ) elif max_video_url_parts is not None and max_video_url_parts > 0: removed = _cap_video_url_parts(messages, max_video_url_parts) if removed: logger.info( "Stripped %d older video_url part(s), keeping %d", removed, max_video_url_parts, ) # Image content is never valid inside an assistant/model turn for any # provider we route to: Anthropic rejects it and Google Gemini returns # INVALID_ARGUMENT. Bot-sent images are recorded as a separate user turn # upstream, but legacy history (persisted before that fix) and any path # that slips through can still carry an assistant-role image. Re-role # such turns to ``user`` here, regardless of provider, so the request # never fails on a bot-sent image. Text-only models already stripped all # images above, so this is a no-op for them. 
relabeled = _relabel_assistant_image_turns_as_user(messages) if relabeled: logger.info( "Relabeled %d assistant turn(s) with image content as user " "for model %r", relabeled, effective_model, ) if _model_targets_claude(effective_model): img_changed = await asyncio.to_thread( _clamp_claude_oversized_images, messages, effective_model, ) if img_changed: logger.info( "Adjusted %d oversized image_url part(s) for Claude model %r", img_changed, effective_model, ) _tags = _obs_tags(ctx) _ensure_trailing_user_turn( messages, model_for_log=effective_model, call_site="_call_api", ) _effective_max_tokens = min( self.max_tokens, _MODEL_MAX_TOKENS_CAPS.get(effective_model, self.max_tokens), ) if _effective_max_tokens < self.max_tokens: logger.info( "Model %r has a max_tokens cap of %d (configured: %d)", effective_model, _effective_max_tokens, self.max_tokens, ) effective_temperature = self.temperature if _model_targets_claude(effective_model) and effective_temperature > 1.0: logger.info( "Clamping temperature %.2f -> 0.99 for Claude model %r", effective_temperature, effective_model, ) effective_temperature = 0.99 payload: dict[str, Any] = { "model": effective_model, "messages": messages, "temperature": effective_temperature, "max_tokens": _effective_max_tokens, } if response_format: payload["response_format"] = response_format payload["top_p"] = self.top_p _reasoning = _openrouter_reasoning_for_user_override( override_model, override_chat_url, ) if _reasoning is not None: payload["reasoning"] = _reasoning logger.info( "OpenRouter user override: added reasoning=%s for model=%r", _reasoning, effective_model, ) if user_id: payload["user"] = user_id if toggle_specific: payload["toggleSpecific"] = True if self.tool_registry.has_tools and tool_names is not None: tools = self.tool_registry.get_openai_tools_by_names(set(tool_names)) if len(tools) > MAX_TOOLS_PER_REQUEST: logger.warning( "Tool count %d exceeds hard cap %d, truncating", len(tools), MAX_TOOLS_PER_REQUEST, ) tools = 
tools[:MAX_TOOLS_PER_REQUEST] if tools: if _model_targets_gemini(effective_model): tools = await asyncio.to_thread( _sanitize_openai_tools_for_gemini, tools, ) payload["tools"] = tools headers = { "Authorization": f"Bearer {effective_api_key}", "Content-Type": "application/json", "HTTP-Referer": "https://github.com/matrix-llm-bot", "X-Title": "Matrix LLM Bot", } # Capture request snapshot for observability (logged after the call resolves). _obs_req_messages = list(messages) _obs_req_tools = list(tool_names) if tool_names is not None else None _obs_response_text = "" _obs_error = "" _obs_http_status = 0 _http_t0 = time.monotonic() _payload_dirty = True payload_bytes: bytes = b"" try: for _attempt in range(MAX_499_RETRIES): if _payload_dirty: payload_bytes = await asyncio.to_thread( lambda: json.dumps(payload).encode("utf-8"), ) _payload_dirty = False read_timeout_n = 0 while True: try: origin = get_http_call_origin() logger.info( "Sending OpenRouter chat completion request for model %s to %s (origin: %s)", effective_model, chat_url, origin, extra={ "channel_id": _tags.get("channel_id"), "platform": _tags.get("platform"), "user_id": _tags.get("user_id"), "request_id": _tags.get("request_id"), } ) resp = await self._http.post( chat_url, content=payload_bytes, headers=headers, ) logger.info( "OpenRouter chat completion request completed (status=%d) in %.1f ms (origin: %s)", resp.status_code, (time.monotonic() - _http_t0) * 1000, origin, extra={ "channel_id": _tags.get("channel_id"), "platform": _tags.get("platform"), "user_id": _tags.get("user_id"), "request_id": _tags.get("request_id"), } ) break except asyncio.CancelledError: raise except _RETRIABLE_ERRORS as exc: read_timeout_n += 1 logger.warning( "OpenRouter transient network error %s (attempt %d), retrying: %s", type(exc).__name__, read_timeout_n, chat_url[:120], ) await asyncio.sleep(min(0.5 * read_timeout_n, 5.0)) continue except httpx.RequestError as exc: _ms = (time.monotonic() - _http_t0) * 1000 _obs_error = 
f"network:{type(exc).__name__}: {exc}" asyncio.create_task( publish_http_error_event( http_service="openrouter", http_status=0, request_id=_tags["request_id"], channel_id=_tags["channel_id"], user_id=_tags["user_id"], platform=_tags["platform"], duration_ms=_ms, endpoint=chat_url[:120], detail=str(exc)[:500], error_kind="network", ), ) raise if resp.status_code == 499 and _attempt < MAX_499_RETRIES - 1: logger.warning( "HTTP 499, retrying immediately (%d/%d)", _attempt + 1, MAX_499_RETRIES, ) continue if resp.status_code == 404 and _attempt < MAX_499_RETRIES - 1: _body_404 = (resp.text or "")[:500] bl = _body_404.lower() if "input audio" in bl: stripped = _strip_audio_parts(messages) if stripped: logger.warning( "HTTP 404 'no endpoints support input audio' — " "stripped %d audio part(s), retrying (%d/%d)", stripped, _attempt + 1, MAX_499_RETRIES, ) payload["messages"] = messages _payload_dirty = True continue if "image input" in bl: # A *vision-capable* model returning this 404 is almost # always a transient routing/capacity blip — not a real # "this model can't see images". Stripping the user's # image and retrying blind makes the model silently # "not see" the picture (the reported intermittent bug). # So for vision models, back off and retry WITH the image # first; only strip as a last resort so the turn still # answers. Non-vision models still strip immediately. 
from model_capabilities import get_capabilities _vision_model = get_capabilities( effective_model ).supports_multimodal if _vision_model and _attempt < MAX_499_RETRIES - 2: logger.warning( "HTTP 404 'image input' on vision model %r — " "treating as transient; backing off and retrying " "with image intact (%d/%d)", effective_model, _attempt + 1, MAX_499_RETRIES, ) await asyncio.sleep(1.5) continue stripped = _strip_image_url_parts(messages) if stripped: logger.warning( "HTTP 404 'no endpoints support image input' — " "stripped %d image_url part(s) [model=%s vision=%s], " "retrying (%d/%d)", stripped, effective_model, _vision_model, _attempt + 1, MAX_499_RETRIES, ) payload["messages"] = messages _payload_dirty = True continue if resp.status_code == 400 and _attempt < MAX_499_RETRIES - 1: _body_400 = (resp.text or "")[:500] if ( "maximum context length" in _body_400.lower() and payload.get("max_tokens", 0) > _CTX_OVERFLOW_FALLBACK_MAX_TOKENS ): logger.warning( "HTTP 400 context-length overflow — reducing max_tokens " "%d%d and retrying (%d/%d)", payload["max_tokens"], _CTX_OVERFLOW_FALLBACK_MAX_TOKENS, _attempt + 1, MAX_499_RETRIES, ) payload["max_tokens"] = _CTX_OVERFLOW_FALLBACK_MAX_TOKENS _payload_dirty = True continue _obs_http_status = resp.status_code if resp.status_code >= 400: _body = (resp.text or "")[:4000] logger.error( "LLM chat HTTP %s (non-success). 
Response body (truncated): %s", resp.status_code, _body, ) _obs_error = f"HTTP {resp.status_code}: {_body[:500]}" _ms = (time.monotonic() - _http_t0) * 1000 asyncio.create_task( publish_http_error_event( http_service="openrouter", http_status=resp.status_code, request_id=_tags["request_id"], channel_id=_tags["channel_id"], user_id=_tags["user_id"], platform=_tags["platform"], duration_ms=_ms, endpoint=chat_url[:120], detail=_body[:500], ), ) resp.raise_for_status() break data = await _async_response_json(resp) if "error" in data: error_msg = data["error"].get("message", str(data["error"])) _obs_error = f"LLM API error: {error_msg[:500]}" raise RuntimeError(f"LLM API error: {error_msg}") choices = data.get("choices") if not choices: _obs_error = f"LLM API returned no choices: {str(data)[:300]}" raise RuntimeError(f"LLM API returned no choices: {data}") result_message = choices[0]["message"] _obs_response_text = _message_content_for_proxy_error_detection( result_message.get("content"), ) return result_message except asyncio.CancelledError: _obs_error = "CancelledError" raise except Exception as _exc: if not _obs_error: _obs_error = f"{type(_exc).__name__}: {_exc}"[:500] raise finally: _obs_duration_ms = (time.monotonic() - _http_t0) * 1000 asyncio.create_task( publish_llm_request_event( request_id=_tags.get("request_id", ""), channel_id=_tags.get("channel_id", ""), user_id=_tags.get("user_id", ""), platform=_tags.get("platform", ""), model=effective_model, messages=_obs_req_messages, tool_names=_obs_req_tools, response_text=_obs_response_text, error=_obs_error, http_status=_obs_http_status, duration_ms=_obs_duration_ms, ), name="obs_llm_req", )