"""Core HTTP transport client and native Gemini embedder/token-counter."""
from __future__ import annotations
import asyncio
import logging
import time
import os
from typing import Any, Callable, Awaitable
import httpx
import jsonutil as json
from gemini_embed_pool import (
EMBED_DIMENSIONS,
GEMINI_EMBED_BASE,
PAID_KEY_FALLBACK_THRESHOLD,
OpenRouterEmbedParseError,
check_openrouter_only,
gemini_embed_paid_fallback,
get_openrouter_api_key,
is_daily_quota_429,
mark_key_daily_spent,
next_gemini_embed_key,
openrouter_embed_batch,
record_key_usage,
set_openrouter_only,
)
from platforms.media_common import (
CLAUDE_MAX_IMAGE_BYTES,
detect_image_mimetype_from_bytes,
media_to_content_parts,
shrink_image_under_max_bytes,
)
from tool_context import ToolContext
from user_llm_config import sanitize_llm_http_url
from observability import (
publish_embedding_event,
publish_http_error_event,
publish_llm_request_event,
get_http_call_origin,
)
from openrouter_client.executor import OpenRouterExecutor, _ensure_trailing_user_turn
from openrouter_client.sanitization import (
resolve_model_alias,
_model_targets_gemini,
_model_targets_claude,
_model_targets_gemma4,
_model_strips_all_multimodal_before_request,
_strip_audio_parts,
_strip_video_url_parts,
_strip_image_url_parts,
_strip_video_and_audio_file_parts,
_relabel_assistant_image_turns_as_user,
_cap_video_url_parts,
_clamp_claude_oversized_images,
_sanitize_openai_tools_for_gemini,
)
from openrouter_client.error_handling import (
_PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL,
_PROXY_20_RETRIES_FALLBACK_OPENROUTER_CHAT_URL,
_PROXY_20_RETRIES_FALLBACK_LOCAL_MODEL,
_PROXY_20_RETRIES_FALLBACK_OPENROUTER_MODEL,
_OPENROUTER_402_FALLBACK_LOCAL_MODEL,
_GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES,
_is_openrouter_chat_url,
_is_gemini_input_token_limit_proxy_error,
_is_assistant_prefill_unsupported_proxy_error,
_is_proxy_20_retries_error,
_message_content_for_proxy_error_detection,
_proxy_error_banner_present,
_user_visible_proxy_chat_reply,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Gemini model used by ``_count_tokens`` when the caller does not name one.
DEFAULT_GEMINI_COUNT_TOKENS_MODEL = "gemini-3.1-flash-lite"
# Attempt budget of the outer request loop in ``_call_api``. Despite the name,
# the HTTP 499, 404 and 400 retry branches all draw from this same budget.
MAX_499_RETRIES = 3
# Per-model ceilings for ``max_tokens``; ``_call_api`` looks models up by their
# alias-resolved name and never sends more than the listed value.
_MODEL_MAX_TOKENS_CAPS: dict[str, int] = {
    "inception/mercury-2": 10_000,
}
# ``max_tokens`` used for the retry after an HTTP 400 whose body mentions
# "maximum context length".
_CTX_OVERFLOW_FALLBACK_MAX_TOKENS = 10_000
# NOTE(review): not referenced in the visible part of this module.
MAX_HEADER_RETRIES = 1
# Hard cap on the number of tool definitions sent with one chat request; the
# list is truncated (with a warning) beyond this.
MAX_TOOLS_PER_REQUEST = 250
# Embedding retries back off exponentially from EMBED_RETRY_BASE_DELAY seconds,
# never sleeping longer than MAX_EMBED_DELAY seconds per step.
EMBED_RETRY_BASE_DELAY = 1.0
MAX_EMBED_DELAY = 8.0
# Used both as the number of outer rounds in ``embed``/``embed_batch`` and as
# the number of HTTP attempts inside ``_embed_gemini``/``_embed_gemini_batch``.
MAX_EMBED_RETRIES = 14
# HTTP statuses the Gemini embedding loops retry instead of raising.
_RETRIABLE_STATUSES = {429, 500, 502, 503, 504}
# Transport-level httpx exceptions that ``_call_api`` retries with a short,
# growing sleep instead of propagating.
_RETRIABLE_ERRORS = (
    httpx.ReadTimeout,
    httpx.ConnectTimeout,
    httpx.ConnectError,
    httpx.RemoteProtocolError,
)
# Response bodies of at least this many bytes (256 KiB) are JSON-decoded in a
# worker thread by ``_async_response_json``.
_JSON_BODY_DECODE_THRESHOLD = 256 * 1024
# Synthetic user turn appended by ``_call_api_with_optional_gemini_proxy_retry``
# when the proxy reports that the model rejects a trailing assistant message.
# The text is sent to the model verbatim.
_ASSISTANT_PREFILL_RETRY_USER_MESSAGE = (
    "[SYSTEM — infrastructure only; not from a human]\n\n"
    "The chat API rejected the previous request because this model does not allow "
    "an assistant message as the last turn (no assistant prefill). Nothing from "
    "that failed call was posted to the channel.\n\n"
    "CRITICAL: Your last assistant reply from that attempt DID NOT reach anyone in "
    "the chat. Users did not see it. Do not assume you already answered, and do not "
    'tell users what you "already said" in that turn.\n\n'
    "Treat this line as an operator instruction only: ignore it as conversational "
    'content (do not quote it, summarize it for the room, or discuss "padding"). '
    "Look at the real user messages above and reply to them if a reply is needed.\n\n"
    "Do not satisfy this turn by outputting only thinking blocks or meta about API "
    'rules, double replies, or "closing the turn"; write what should appear in the '
    "channel if there is an unanswered human message."
)
def _json_loads_utf8(body: bytes) -> Any:
return json.loads(body.decode("utf-8"))
async def _async_response_json(resp: httpx.Response) -> Any:
    """Read the full response body and return it parsed as JSON.

    If the ``openrouter_client`` package is already imported and exposes an
    ``_async_response_json`` attribute that is not this function, the call is
    handed to that replacement.  Otherwise the body is read with ``aread()``
    and decoded as UTF-8: bodies of at least ``_JSON_BODY_DECODE_THRESHOLD``
    bytes are parsed in a worker thread, smaller ones are parsed inline.
    """
    import sys

    # ``getattr`` on ``None`` simply yields the default, so a missing package
    # and a package without the attribute are handled the same way.
    replacement = getattr(
        sys.modules.get("openrouter_client"),
        "_async_response_json",
        None,
    )
    if replacement is not None and replacement is not _async_response_json:
        return await replacement(resp)

    raw = await resp.aread()
    if len(raw) < _JSON_BODY_DECODE_THRESHOLD:
        return json.loads(raw.decode("utf-8"))
    return await asyncio.to_thread(_json_loads_utf8, raw)
def _obs_tags(ctx: ToolContext | None) -> dict[str, str]:
if ctx is None:
return {
"request_id": "",
"channel_id": "",
"user_id": "",
"platform": "",
}
return {
"request_id": ctx.observability_request_id or "",
"channel_id": ctx.channel_id or "",
"user_id": ctx.user_id or "",
"platform": ctx.platform or "",
}
def _openrouter_reasoning_for_user_override(
override_model: str | None,
override_chat_url: str | None,
) -> dict[str, Any] | None:
"""Return OpenRouter ``reasoning`` body for per-user custom OpenRouter endpoints."""
if not override_model or not override_chat_url:
return None
if "openrouter" not in override_chat_url.lower():
return None
return {"effort": "high", "enabled": True, "exclude": False}
[docs]
class OpenRouterClient(OpenRouterExecutor):
"""Thin async wrapper around an OpenAI-compatible chat-completions endpoint."""
[docs]
def __init__(
    self,
    api_key: str,
    model: str = "x-ai/grok-4.1-fast",
    temperature: float = 1.0,
    max_tokens: int = 60_000,
    top_p: float = 0.99,
    tool_registry: Any | None = None,
    max_tool_rounds: int = 10,
    base_url: str = "http://localhost:3000/openai",
    gemini_api_key: str = "",
    gemini_count_tokens_model: str = DEFAULT_GEMINI_COUNT_TOKENS_MODEL,
    max_tool_output_chars: int = 150_000,
    http_connect_timeout: float = 10.0,
    http_read_timeout: float = 1200.0,
    http_write_timeout: float = 120.0,
    http_pool_timeout: float = 180.0,
) -> None:
    """Store the request settings and create the shared HTTP client.

    Args:
        api_key: Bearer token used for chat requests unless overridden.
        model: Default chat model name.
        temperature: Default sampling temperature.
        max_tokens: Default upper bound for ``max_tokens`` in requests.
        top_p: Default nucleus-sampling value.
        tool_registry: Registry of callable tools; when ``None`` a fresh
            ``tools.ToolRegistry()`` is created (imported lazily).
        max_tool_rounds: Stored on the instance as ``max_tool_rounds``.
        base_url: API base URL; it is sanitized, stripped of trailing
            slashes and suffixed with ``/chat/completions``.
        gemini_api_key: Gemini key; when empty, the value from
            :meth:`_resolve_gemini_api_key` is used instead.
        gemini_count_tokens_model: Model used by :meth:`_count_tokens`.
        max_tool_output_chars: Stored on the instance as
            ``max_tool_output_chars``.
        http_connect_timeout: ``connect`` value of the ``httpx.Timeout``.
        http_read_timeout: ``read`` value of the ``httpx.Timeout``.
        http_write_timeout: ``write`` value of the ``httpx.Timeout``.
        http_pool_timeout: ``pool`` value of the ``httpx.Timeout``.
    """
    # Credentials.
    self.api_key = api_key
    self.gemini_api_key = gemini_api_key or self._resolve_gemini_api_key()
    self.gemini_count_tokens_model = gemini_count_tokens_model

    # Sampling defaults.
    self.model = model
    self.temperature = temperature
    self.max_tokens = max_tokens
    self.top_p = top_p

    # Tooling.
    if tool_registry is not None:
        self.tool_registry = tool_registry
    else:
        from tools import ToolRegistry

        self.tool_registry = ToolRegistry()
    self.max_tool_rounds = max_tool_rounds
    self.max_tool_output_chars = max_tool_output_chars

    # Transport.
    sanitized_base = sanitize_llm_http_url(base_url)
    self._chat_url = f"{sanitized_base.rstrip('/')}/chat/completions"
    timeouts = httpx.Timeout(
        connect=http_connect_timeout,
        read=http_read_timeout,
        write=http_write_timeout,
        pool=http_pool_timeout,
    )
    self._http = httpx.AsyncClient(timeout=timeouts)
@staticmethod
def _resolve_gemini_api_key() -> str:
"""Return the Gemini API key from the environment.
Callers that already hold a loaded :class:`Config` (e.g. ``main.py``)
pass ``gemini_api_key`` explicitly, and ``Config.load`` already folds
the ``GEMINI_API_KEY`` env var into ``cfg.gemini_api_key``. This
fallback therefore only consults the environment and never re-parses
``config.yaml`` — previously that caused a redundant synchronous YAML
load on the event loop during startup.
"""
return os.getenv("GEMINI_API_KEY", "")
# ------------------------------------------------------------------
# Token counting via Gemini API
# ------------------------------------------------------------------
@staticmethod
def _build_gemini_contents(messages: list[dict[str, Any]]) -> list[dict]:
"""Convert OpenAI-format messages to Gemini ``contents`` (sync, CPU-bound)."""
contents = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if not content:
continue
if role in ("system", "assistant"):
gemini_role = "model"
else:
gemini_role = "user"
if isinstance(content, list):
text = " ".join(
p.get("text", "") for p in content if p.get("type") == "text"
)
if not text:
continue
contents.append(
{
"role": gemini_role,
"parts": [{"text": text}],
}
)
else:
contents.append(
{
"role": gemini_role,
"parts": [{"text": str(content)}],
}
)
if not contents:
return []
deduped: list[dict] = [contents[0]]
for c in contents[1:]:
if c["role"] == deduped[-1]["role"]:
deduped[-1]["parts"].extend(c["parts"])
else:
deduped.append(c)
return deduped
async def _count_tokens(
self,
messages: list[dict[str, Any]],
*,
chan_info: str = "",
gemini_model: str | None = None,
) -> int | None:
"""Count input tokens using the Gemini countTokens API."""
if not self.gemini_api_key:
return None
model_id = gemini_model or self.gemini_count_tokens_model
deduped = await asyncio.to_thread(
self._build_gemini_contents,
messages,
)
if not deduped:
return None
url = (
f"https://generativelanguage.googleapis.com/v1beta/"
f"models/{model_id}:countTokens"
f"?key={self.gemini_api_key}"
)
payload = {"contents": deduped}
try:
resp = await self._http.post(
url,
json=payload,
timeout=5.0,
)
if resp.status_code == 200:
data = await _async_response_json(resp)
count = data.get("totalTokens")
if isinstance(count, int):
logger.info(
"%sGemini countTokens: %d tokens",
chan_info,
count,
)
return count
else:
logger.debug(
"countTokens returned %d: %s",
resp.status_code,
resp.text[:200],
)
except Exception:
logger.debug(
"countTokens call failed",
exc_info=True,
)
return None
[docs]
async def close(self) -> None:
    """Close the shared HTTP client by awaiting its ``aclose()``."""
    http_client = self._http
    await http_client.aclose()
@staticmethod
def _gemini_model_name(model: str) -> str:
"""Convert OpenRouter model name to Gemini native model name."""
return model.removeprefix("google/")
# ------------------------------------------------------------------
# Embedding provider implementations
# ------------------------------------------------------------------
[docs]
async def embed(self, text: str, model: str) -> list[float]:
    """Generate an embedding vector for *text* via the Gemini API.

    Blank input short-circuits to a zero vector of ``EMBED_DIMENSIONS``
    floats.  Otherwise :meth:`_embed_gemini` is called for up to
    ``MAX_EMBED_RETRIES`` rounds, sleeping with exponential back-off (capped
    at ``MAX_EMBED_DELAY`` seconds) between rounds.  No sleep follows the
    final round: the error is raised immediately.

    Args:
        text: The text to embed.
        model: Embedding model identifier passed through to the provider.

    Returns:
        The embedding vector, or a zero vector for blank input or when the
        call raises ``OpenRouterEmbedParseError``.

    Raises:
        ValueError: Propagated unchanged from :meth:`_embed_gemini`
            (treated as non-retriable).
        RuntimeError: When every round failed; chained to the last error.
    """
    if not text or not text.strip():
        return [0.0] * EMBED_DIMENSIONS

    last_exc: Exception | None = None
    for round_num in range(MAX_EMBED_RETRIES):
        try:
            return await self._embed_gemini(text, model)
        # Listed before ``ValueError`` so this handler still matches if
        # ``OpenRouterEmbedParseError`` derives from ``ValueError`` (its
        # base class is not visible from this module).
        except OpenRouterEmbedParseError as exc:
            logger.warning(
                "OpenRouter embed 200-no-data for single text "
                "(non-retriable) — returning zero vector: %s",
                str(exc)[:500],
            )
            return [0.0] * EMBED_DIMENSIONS
        except ValueError:
            raise
        except Exception as exc:
            last_exc = exc
            logger.warning(
                "Gemini embed failed (round %d): %s",
                round_num + 1,
                exc,
            )
        # Back off only when another round will actually follow.
        if round_num + 1 < MAX_EMBED_RETRIES:
            delay = min(EMBED_RETRY_BASE_DELAY * (2**round_num), MAX_EMBED_DELAY)
            await asyncio.sleep(delay)

    raise RuntimeError(
        f"Gemini embed failed after {MAX_EMBED_RETRIES} rounds: {last_exc}"
    ) from last_exc
[docs]
async def embed_batch(
    self,
    texts: list[str],
    model: str,
) -> list[list[float]]:
    """Generate embedding vectors for multiple texts via the Gemini API.

    Blank entries are not sent to the provider; their slots in the result
    hold zero vectors of ``EMBED_DIMENSIONS`` floats.  The remaining texts
    are embedded with :meth:`_embed_gemini_batch` for up to
    ``MAX_EMBED_RETRIES`` rounds, sleeping with exponential back-off (capped
    at ``MAX_EMBED_DELAY`` seconds) between rounds.  No sleep follows the
    final round: the error is raised immediately.

    Args:
        texts: Texts to embed, in order.
        model: Embedding model identifier passed through to the provider.

    Returns:
        One vector per input text, in the same order as *texts*.

    Raises:
        ValueError: Propagated unchanged from :meth:`_embed_gemini_batch`
            (treated as non-retriable).
        RuntimeError: When every round failed; chained to the last error.
    """
    if not texts:
        return []

    # Remember where each non-blank text came from so results can be put
    # back at the right index.
    valid_indices: list[int] = []
    valid_texts: list[str] = []
    for i, t in enumerate(texts):
        if t and t.strip():
            valid_indices.append(i)
            valid_texts.append(t)
    if not valid_texts:
        return [[0.0] * EMBED_DIMENSIONS for _ in texts]

    last_exc: Exception | None = None
    for round_num in range(MAX_EMBED_RETRIES):
        try:
            valid_embeddings = await self._embed_gemini_batch(valid_texts, model)
            result: list[list[float]] = [[0.0] * EMBED_DIMENSIONS for _ in texts]
            for idx, emb in zip(valid_indices, valid_embeddings):
                result[idx] = emb
            return result
        except ValueError:
            raise
        except Exception as exc:
            last_exc = exc
            logger.warning(
                "Gemini batch embed failed (round %d): %s",
                round_num + 1,
                exc,
            )
        # Back off only when another round will actually follow.
        if round_num + 1 < MAX_EMBED_RETRIES:
            delay = min(EMBED_RETRY_BASE_DELAY * (2**round_num), MAX_EMBED_DELAY)
            await asyncio.sleep(delay)

    raise RuntimeError(
        f"Gemini embed_batch failed after {MAX_EMBED_RETRIES} rounds: {last_exc}"
    ) from last_exc
async def _embed_gemini(
    self,
    text: str,
    model: str,
    task_type: str | None = None,
) -> list[float]:
    """Embed a single text via the native Gemini API.

    Flow, as implemented below:

    1. Blank *text* returns a zero vector of ``EMBED_DIMENSIONS`` floats.
    2. When ``check_openrouter_only()`` is true, Gemini's free pool is
       skipped: ``openrouter_embed_batch`` is tried first and, if it
       raises, ``gemini_embed_paid_fallback`` is used.
    3. Otherwise ``embedContent`` is POSTed with a key from
       ``next_gemini_embed_key()`` for up to ``MAX_EMBED_RETRIES`` attempts,
       with exponential back-off between attempts.

    Args:
        text: Text to embed.
        model: Model name; a leading ``google/`` is stripped for the URL.
        task_type: Optional value sent as ``taskType`` in the payload.

    Returns:
        The list found at ``data["embedding"]["values"]`` (or the first
        vector returned by a fallback provider).

    Raises:
        ValueError: On HTTP 400 (non-retriable).
        RuntimeError: When all attempts are used up.
    """
    if not text or not text.strip():
        return [0.0] * EMBED_DIMENSIONS
    if await check_openrouter_only():
        logger.info("OpenRouter-only mode — bypassing Gemini for single text")
        try:
            vecs = await openrouter_embed_batch(
                [text],
                model=model,
                api_key=self.api_key,
            )
            return vecs[0]
        except Exception:
            logger.warning(
                "OpenRouter failed while pinned to openrouter_only — "
                "trying paid Gemini key as last resort",
                exc_info=True,
            )
            paid_vecs = await gemini_embed_paid_fallback(
                [text],
                model=model,
                task_type=task_type,
            )
            return paid_vecs[0]
    gemini_model = self._gemini_model_name(model)
    api_key = next_gemini_embed_key()
    url = f"{GEMINI_EMBED_BASE}/{gemini_model}:embedContent?key={api_key}"
    payload: dict = {
        "model": f"models/{gemini_model}",
        "content": {"parts": [{"text": text}]},
        "output_dimensionality": EMBED_DIMENSIONS,
    }
    if task_type:
        payload["taskType"] = task_type
    attempt = 0
    # NOTE(review): despite the name this counter is never reset by a
    # non-429 response, so it counts non-daily-quota 429s cumulatively.
    consecutive_429 = 0
    # Ensures the OpenRouter -> paid-key escalation runs at most once.
    tried_or_then_paid = False
    _embed_start = time.monotonic()
    last_error: str | None = "unknown"
    while attempt < MAX_EMBED_RETRIES:
        if attempt > 0:
            # Exponential back-off: base * 2^(attempt-1), capped.
            delay = min(
                EMBED_RETRY_BASE_DELAY * (2 ** (attempt - 1)),
                MAX_EMBED_DELAY,
            )
            logger.warning(
                "Gemini embed error, retrying in %.1fs (attempt %d): %s",
                delay,
                attempt + 1,
                last_error,
            )
            await asyncio.sleep(delay)
        try:
            _t0 = time.monotonic()
            origin = get_http_call_origin()
            logger.info(
                "Sending Gemini embedding request for model %s (origin: %s)",
                model,
                origin,
            )
            resp = await self._http.post(url, json=payload)
            logger.info(
                "Gemini embed HTTP request completed in %.0f ms (status=%d) (origin: %s)",
                (time.monotonic() - _t0) * 1000,
                resp.status_code,
                origin,
            )
            # Usage is recorded for every completed HTTP exchange,
            # whatever the status code.
            await record_key_usage(api_key)
            if resp.status_code == 400:
                body = resp.text[:500]
                logger.error(
                    "Gemini embed 400 Bad Request (non-retriable): %s",
                    body,
                )
                _ms = (time.monotonic() - _t0) * 1000
                # NOTE(review): the task objects are not retained; the
                # asyncio docs recommend keeping a reference to
                # fire-and-forget tasks.
                asyncio.create_task(
                    publish_http_error_event(
                        http_service="gemini_embed",
                        http_status=400,
                        duration_ms=_ms,
                        # Query string (which carries the API key) is cut off.
                        endpoint=url.split("?")[0][:120],
                        detail=body[:500],
                    ),
                )
                asyncio.create_task(
                    publish_embedding_event(
                        source="gemini_embed_single",
                        model=model,
                        duration_ms=_ms,
                        success=False,
                        text_len=len(text),
                        error=body[:300],
                    ),
                )
                # Re-raised untouched by the ``except ValueError`` below.
                raise ValueError(f"Gemini embed 400 Bad Request: {body}")
            if resp.status_code == 429:
                if is_daily_quota_429(resp):
                    # This key is spent: mark it and rotate to the next
                    # key, rebuilding the URL that embeds the key.
                    await mark_key_daily_spent(api_key, "embed")
                    api_key = next_gemini_embed_key()
                    url = (
                        f"{GEMINI_EMBED_BASE}/{gemini_model}"
                        f":embedContent?key={api_key}"
                    )
                    last_error = "HTTP 429 (daily quota)"
                    attempt += 1
                    continue
                consecutive_429 += 1
                if (
                    consecutive_429 >= PAID_KEY_FALLBACK_THRESHOLD
                    and not tried_or_then_paid
                ):
                    tried_or_then_paid = True
                    logger.warning(
                        "Free Gemini pool 429'd %d times — escalating "
                        "to OpenRouter, then paid Gemini key",
                        consecutive_429,
                    )
                    try:
                        vecs = await openrouter_embed_batch(
                            [text],
                            model=model,
                            api_key=self.api_key,
                        )
                        # Only flipped after OpenRouter actually succeeded.
                        await set_openrouter_only()
                        return vecs[0]
                    except Exception:
                        logger.warning(
                            "OpenRouter embed fallback failed — "
                            "trying paid Gemini key as last resort",
                            exc_info=True,
                        )
                        try:
                            paid_vecs = await gemini_embed_paid_fallback(
                                [text],
                                model=model,
                                task_type=task_type,
                            )
                            return paid_vecs[0]
                        except Exception:
                            logger.warning(
                                "Paid Gemini key fallback also failed",
                                exc_info=True,
                            )
                # Both fallbacks failed (or were not due yet): keep
                # retrying against the free pool.
                last_error = "HTTP 429"
                attempt += 1
                continue
            if resp.status_code in _RETRIABLE_STATUSES:
                last_error = f"HTTP {resp.status_code}"
                attempt += 1
                continue
            # Any other error status raises here and is handled by the
            # generic ``except Exception`` below, i.e. it is retried.
            resp.raise_for_status()
            data = await _async_response_json(resp)
            _ms = (time.monotonic() - _t0) * 1000
            asyncio.create_task(
                publish_embedding_event(
                    source="gemini_embed_single",
                    model=model,
                    duration_ms=_ms,
                    success=True,
                    text_len=len(text),
                ),
            )
            return data["embedding"]["values"]
        except ValueError:
            # NOTE(review): this also re-raises any other ValueError
            # subclass raised inside the try body, not just the 400 above
            # — confirm that is intended for JSON/decoding errors.
            raise
        except Exception as exc:
            last_error = str(exc)
            attempt += 1
    # All attempts used up: report the failure, then raise.  The duration
    # here covers the whole retry loop, not a single request.
    _ms = (time.monotonic() - _embed_start) * 1000
    asyncio.create_task(
        publish_http_error_event(
            http_service="gemini_embed",
            http_status=0,
            duration_ms=_ms,
            endpoint="gemini:embedContent",
            detail=str(last_error)[:500] if last_error else "unknown",
            error_kind="exhausted_retries",
        ),
    )
    asyncio.create_task(
        publish_embedding_event(
            source="gemini_embed_single",
            model=model,
            duration_ms=_ms,
            success=False,
            text_len=len(text),
            error=str(last_error)[:300] if last_error else "",
        ),
    )
    raise RuntimeError(
        f"Gemini embed failed after {MAX_EMBED_RETRIES} attempts: {last_error}"
    )
async def _embed_gemini_batch(
    self,
    texts: list[str],
    model: str,
    task_type: str | None = None,
) -> list[list[float]]:
    """Embed a batch of texts via the native Gemini API.

    Flow, as implemented below:

    1. Blank texts are filtered out (the method recurses on the non-blank
       ones) and their result slots are filled with zero vectors.
    2. Inputs longer than 100 texts are split into chunks of 100 that are
       embedded one after another by recursive calls.
    3. When ``check_openrouter_only()`` is true, ``openrouter_embed_batch``
       is tried first and ``gemini_embed_paid_fallback`` second.
    4. Otherwise ``batchEmbedContents`` is POSTed with a key from
       ``next_gemini_embed_key()`` for up to ``MAX_EMBED_RETRIES`` attempts,
       with exponential back-off between attempts.

    Args:
        texts: Texts to embed, in order.
        model: Model name; a leading ``google/`` is stripped for the URL.
        task_type: Optional value sent as ``taskType`` in each request.

    Returns:
        One vector per input text, in input order.

    Raises:
        ValueError: On HTTP 400 (non-retriable).
        RuntimeError: When all attempts are used up.
    """
    if not texts:
        return []
    valid_indices: list[int] = []
    valid_texts: list[str] = []
    for i, t in enumerate(texts):
        if t and t.strip():
            valid_indices.append(i)
            valid_texts.append(t)
    if not valid_texts:
        return [[0.0] * EMBED_DIMENSIONS for _ in texts]
    if len(valid_texts) < len(texts):
        # Some entries were blank: embed only the real ones, then scatter
        # the vectors back to their original positions.
        partial = await self._embed_gemini_batch(
            valid_texts,
            model,
            task_type=task_type,
        )
        result: list[list[float]] = [[0.0] * EMBED_DIMENSIONS for _ in texts]
        for idx, emb in zip(valid_indices, partial):
            result[idx] = emb
        return result
    texts = valid_texts
    if len(texts) > 100:
        # Requests are limited to 100 texts each here; chunks are
        # processed sequentially.
        chunks = [texts[i : i + 100] for i in range(0, len(texts), 100)]
        results: list[list[float]] = []
        for chunk in chunks:
            chunk_embeddings = await self._embed_gemini_batch(
                chunk,
                model,
                task_type=task_type,
            )
            results.extend(chunk_embeddings)
        return results
    if await check_openrouter_only():
        logger.info(
            "OpenRouter-only mode — bypassing Gemini for %d texts", len(texts)
        )
        try:
            return await openrouter_embed_batch(
                texts,
                model=model,
                api_key=self.api_key,
            )
        except Exception:
            logger.warning(
                "OpenRouter failed while pinned to openrouter_only — "
                "trying paid Gemini key as last resort",
                exc_info=True,
            )
            return await gemini_embed_paid_fallback(
                texts,
                model=model,
                task_type=task_type,
            )
    gemini_model = self._gemini_model_name(model)
    api_key = next_gemini_embed_key()
    url = f"{GEMINI_EMBED_BASE}/{gemini_model}:batchEmbedContents?key={api_key}"
    requests_list = []
    for t in texts:
        req: dict = {
            "model": f"models/{gemini_model}",
            "content": {"parts": [{"text": t}]},
            "output_dimensionality": EMBED_DIMENSIONS,
        }
        if task_type:
            req["taskType"] = task_type
        requests_list.append(req)
    payload = {"requests": requests_list}
    # No ToolContext is available here, so every tag is an empty string.
    _tags = _obs_tags(None)
    attempt = 0
    # NOTE(review): despite the name this counter is never reset by a
    # non-429 response, so it counts non-daily-quota 429s cumulatively.
    consecutive_429 = 0
    # Ensures the OpenRouter -> paid-key escalation runs at most once.
    tried_or_then_paid = False
    last_error: str | None = "unknown"
    while attempt < MAX_EMBED_RETRIES:
        if attempt > 0:
            # Exponential back-off: base * 2^(attempt-1), capped.
            delay = min(
                EMBED_RETRY_BASE_DELAY * (2 ** (attempt - 1)),
                MAX_EMBED_DELAY,
            )
            logger.warning(
                "Gemini batch embed error, retrying in %.1fs (attempt %d): %s",
                delay,
                attempt + 1,
                last_error,
            )
            await asyncio.sleep(delay)
        try:
            _t0 = time.monotonic()
            origin = get_http_call_origin()
            logger.info(
                "Sending Gemini batch embedding request of %d texts for model %s (origin: %s)",
                len(texts),
                model,
                origin,
            )
            resp = await self._http.post(url, json=payload)
            logger.info(
                "Gemini batch embed HTTP request completed in %.0f ms (status=%d) (origin: %s)",
                (time.monotonic() - _t0) * 1000,
                resp.status_code,
                origin,
            )
            # Usage is recorded for every completed HTTP exchange,
            # whatever the status code.
            await record_key_usage(api_key)
            if resp.status_code == 400:
                body = resp.text[:500]
                logger.error(
                    "Gemini batch embed 400 Bad Request (non-retriable): %s",
                    body,
                )
                _ms = (time.monotonic() - _t0) * 1000
                # NOTE(review): the task object is not retained; the
                # asyncio docs recommend keeping a reference to
                # fire-and-forget tasks.
                asyncio.create_task(
                    publish_http_error_event(
                        http_service="gemini_embed_batch",
                        http_status=resp.status_code,
                        request_id=_tags["request_id"],
                        channel_id=_tags["channel_id"],
                        user_id=_tags["user_id"],
                        platform=_tags["platform"],
                        duration_ms=_ms,
                        # Query string (which carries the API key) is cut off.
                        endpoint=url.split("?")[0][:120],
                        detail=body[:500],
                    ),
                )
                # Re-raised untouched by the ``except ValueError`` below.
                raise ValueError(f"Gemini batch embed 400 Bad Request: {body}")
            if resp.status_code == 429:
                if is_daily_quota_429(resp):
                    # This key is spent: mark it and rotate to the next
                    # key, rebuilding the URL that embeds the key.
                    await mark_key_daily_spent(api_key, "embed")
                    api_key = next_gemini_embed_key()
                    url = (
                        f"{GEMINI_EMBED_BASE}/{gemini_model}"
                        f":batchEmbedContents?key={api_key}"
                    )
                    last_error = "HTTP 429 (daily quota)"
                    attempt += 1
                    continue
                consecutive_429 += 1
                if (
                    consecutive_429 >= PAID_KEY_FALLBACK_THRESHOLD
                    and not tried_or_then_paid
                ):
                    tried_or_then_paid = True
                    logger.warning(
                        "Free Gemini pool 429'd %d times — escalating "
                        "to OpenRouter, then paid Gemini key",
                        consecutive_429,
                    )
                    try:
                        vecs = await openrouter_embed_batch(
                            texts,
                            model=model,
                            api_key=self.api_key,
                        )
                        # Only flipped after OpenRouter actually succeeded.
                        await set_openrouter_only()
                        return vecs
                    except Exception:
                        logger.warning(
                            "OpenRouter embed fallback failed — "
                            "trying paid Gemini key as last resort",
                            exc_info=True,
                        )
                        try:
                            return await gemini_embed_paid_fallback(
                                texts,
                                model=model,
                                task_type=task_type,
                            )
                        except Exception:
                            logger.warning(
                                "Paid Gemini key fallback also failed",
                                exc_info=True,
                            )
                # Both fallbacks failed (or were not due yet): keep
                # retrying against the free pool.
                last_error = "HTTP 429"
                attempt += 1
                continue
            if resp.status_code in _RETRIABLE_STATUSES:
                last_error = f"HTTP {resp.status_code}"
                attempt += 1
                continue
            # Any other error status raises here and is handled by the
            # generic ``except Exception`` below, i.e. it is retried.
            resp.raise_for_status()
            data = await _async_response_json(resp)
            _ms = (time.monotonic() - _t0) * 1000
            asyncio.create_task(
                publish_embedding_event(
                    source="gemini_batch",
                    model=model,
                    duration_ms=_ms,
                    success=True,
                    batch_size=len(texts),
                    request_id=_tags["request_id"],
                    channel_id=_tags["channel_id"],
                    user_id=_tags["user_id"],
                    platform=_tags["platform"],
                ),
            )
            # NOTE(review): the number of returned embeddings is not
            # checked against ``len(texts)``.
            return [item["values"] for item in data["embeddings"]]
        except ValueError:
            # NOTE(review): this also re-raises any other ValueError
            # subclass raised inside the try body, not just the 400 above
            # — confirm that is intended for JSON/decoding errors.
            raise
        except Exception as exc:
            last_error = str(exc)
            attempt += 1
    # Unlike the single-text path, no events are published on exhaustion.
    raise RuntimeError(
        f"Gemini batch embed failed after {MAX_EMBED_RETRIES} attempts: {last_error}"
    )
# ------------------------------------------------------------------
# HTTP Chat Completions Transport Callers
# ------------------------------------------------------------------
async def _call_api_with_optional_gemini_proxy_retry(
    self,
    messages: list[dict[str, Any]],
    tool_names: list[str] | None = None,
    user_id: str = "",
    ctx: ToolContext | None = None,
    max_video_url_parts: int | None = 5,
    *,
    override_model: str | None = None,
    override_api_key: str | None = None,
    override_chat_url: str | None = None,
    response_format: dict[str, Any] | None = None,
    toggle_specific: bool = False,
) -> dict[str, Any]:
    """Call :meth:`_call_api`, retrying on a few recognised proxy failures.

    Three recoveries are attempted, each within its own budget:

    * HTTP 402 from an OpenRouter chat URL: once per call, the request is
      re-sent to the local fallback URL/model.
    * A reply whose content is the proxy's "input token limit" error: the
      same request is re-sent up to
      ``_GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES`` times.
    * A reply whose content says assistant prefill is unsupported: once per
      call, a synthetic user message is appended and the request re-sent.

    All parameters are forwarded to :meth:`_call_api`; only the model, the
    chat URL and the message list may be replaced during recovery.

    Returns:
        The assistant message after ``_normalize_assistant_tool_message``.

    Raises:
        httpx.HTTPStatusError: For any HTTP error that does not qualify for
            the 402 fallback (or when the fallback request fails too).
    """
    current_messages: list[dict[str, Any]] = messages
    current_model: str | None = override_model
    current_chat_url: str | None = override_chat_url
    token_limit_retries_left = _GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES
    prefill_retry_done = False
    fell_back_after_402 = False
    normalized: dict[str, Any] | None = None

    async def _send() -> dict[str, Any]:
        # Uses whatever messages/model/URL are current when it is awaited.
        return await self._call_api(
            current_messages,
            tool_names=tool_names,
            user_id=user_id,
            ctx=ctx,
            max_video_url_parts=max_video_url_parts,
            override_model=current_model,
            override_api_key=override_api_key,
            override_chat_url=current_chat_url,
            response_format=response_format,
            toggle_specific=toggle_specific,
        )

    # Room for token-limit retries, one prefill fixup, and 402 re-entry.
    attempt_budget = max(
        16,
        _GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES + 6,
    )
    for attempt in range(attempt_budget):
        model_for_log = resolve_model_alias(current_model or self.model)
        try:
            raw = await _send()
        except httpx.HTTPStatusError as exc:
            failed_url = current_chat_url or self._chat_url
            if (
                exc.response.status_code != 402
                or fell_back_after_402
                or not _is_openrouter_chat_url(failed_url)
            ):
                raise
            failed_model = resolve_model_alias(current_model or self.model)
            fallback_model = resolve_model_alias(
                _OPENROUTER_402_FALLBACK_LOCAL_MODEL
            )
            already_on_fallback = (
                failed_url.rstrip("/")
                == _PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL.rstrip("/")
                and failed_model == fallback_model
            )
            if already_on_fallback:
                # Nothing different left to try.
                raise
            fell_back_after_402 = True
            logger.warning(
                "OpenRouter HTTP 402 Payment Required; retrying via %s (%s)",
                _PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL,
                _OPENROUTER_402_FALLBACK_LOCAL_MODEL,
            )
            current_model = _OPENROUTER_402_FALLBACK_LOCAL_MODEL
            current_chat_url = _PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL
            model_for_log = resolve_model_alias(current_model or self.model)
            raw = await _send()

        normalized = self._normalize_assistant_tool_message(raw)
        reply_content = normalized.get("content")

        if token_limit_retries_left > 0 and _is_gemini_input_token_limit_proxy_error(
            reply_content
        ):
            token_limit_retries_left -= 1
            logger.warning(
                "Proxy reported input token limit (often transient mis-route; "
                "model=%r); retrying full chat request "
                "(%d/%d)",
                model_for_log,
                _GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES
                - token_limit_retries_left,
                _GEMINI_INPUT_TOKEN_LIMIT_PROXY_MAX_RETRIES,
            )
            continue

        if not prefill_retry_done and _is_assistant_prefill_unsupported_proxy_error(
            reply_content
        ):
            prefill_retry_done = True
            # Build a new list so the caller's ``messages`` is not extended.
            current_messages = current_messages + [
                {
                    "role": "user",
                    "content": _ASSISTANT_PREFILL_RETRY_USER_MESSAGE,
                },
            ]
            logger.warning(
                "Assistant message prefill not supported for this model "
                "(proxy 400); appending synthetic user message and retrying "
                "(attempt %d)",
                attempt + 1,
            )
            continue

        return normalized

    assert normalized is not None
    return normalized
async def _call_api_with_optional_proxy_transport_fallbacks(
    self,
    messages: list[dict[str, Any]],
    tool_names: list[str] | None = None,
    user_id: str = "",
    ctx: ToolContext | None = None,
    max_video_url_parts: int | None = 5,
    *,
    override_model: str | None = None,
    override_api_key: str | None = None,
    override_chat_url: str | None = None,
    response_format: dict[str, Any] | None = None,
    toggle_specific: bool = False,
) -> dict[str, Any]:
    """Run the proxy-retry call, then up to two transport fallbacks.

    The primary request goes through
    :meth:`_call_api_with_optional_gemini_proxy_retry`.  Only when its reply
    is the proxy's "20 retries" exhaustion error are fallbacks tried:

    1. Tier 1: the local fallback chat URL and model.  Its reply is returned
       unless it carries a proxy error banner.
    2. Tier 2: the OpenRouter fallback URL and model, using the key returned
       by ``get_openrouter_api_key`` (looked up on the ``openrouter_client``
       package first).  Skipped when no key is available.

    Exceptions raised by either fallback are logged and swallowed.

    Returns:
        The first acceptable reply; if every fallback fails, the tier-1
        reply when there is one, otherwise the primary reply.
    """
    # Arguments that are identical for every tier.
    shared_kwargs: dict[str, Any] = {
        "tool_names": tool_names,
        "user_id": user_id,
        "ctx": ctx,
        "max_video_url_parts": max_video_url_parts,
        "response_format": response_format,
        "toggle_specific": toggle_specific,
    }

    primary = await self._call_api_with_optional_gemini_proxy_retry(
        messages,
        override_model=override_model,
        override_api_key=override_api_key,
        override_chat_url=override_chat_url,
        **shared_kwargs,
    )
    if not _is_proxy_20_retries_error(primary.get("content")):
        return primary

    logger.warning(
        "Proxy reported 20-retries exhaustion; retrying via %s (%s)",
        _PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL,
        _PROXY_20_RETRIES_FALLBACK_LOCAL_MODEL,
    )
    tier1: dict[str, Any] | None = None
    try:
        tier1 = await self._call_api_with_optional_gemini_proxy_retry(
            messages,
            override_model=_PROXY_20_RETRIES_FALLBACK_LOCAL_MODEL,
            override_api_key=override_api_key,
            override_chat_url=_PROXY_20_RETRIES_FALLBACK_LOCAL_CHAT_URL,
            **shared_kwargs,
        )
    except Exception:
        logger.exception(
            "Tier-1 transport fallback (localhost Claude) failed",
        )

    if tier1 is not None:
        tier1_text = _message_content_for_proxy_error_detection(
            tier1.get("content"),
        )
        if not _proxy_error_banner_present(tier1_text):
            return tier1

    import openrouter_client

    # Prefer the package-level attribute so a replacement installed there
    # is honoured; fall back to the function imported by this module.
    key_getter = getattr(
        openrouter_client,
        "get_openrouter_api_key",
        get_openrouter_api_key,
    )
    openrouter_key = key_getter()
    if not openrouter_key:
        logger.error(
            "OpenRouter API key missing; cannot run tier-2 transport fallback",
        )
        if tier1 is not None:
            return tier1
        return primary

    logger.warning(
        "Transport fallback tier 2: %s model=%s",
        _PROXY_20_RETRIES_FALLBACK_OPENROUTER_CHAT_URL,
        _PROXY_20_RETRIES_FALLBACK_OPENROUTER_MODEL,
    )
    try:
        return await self._call_api_with_optional_gemini_proxy_retry(
            messages,
            override_model=_PROXY_20_RETRIES_FALLBACK_OPENROUTER_MODEL,
            override_api_key=openrouter_key,
            override_chat_url=_PROXY_20_RETRIES_FALLBACK_OPENROUTER_CHAT_URL,
            **shared_kwargs,
        )
    except Exception:
        logger.exception(
            "Tier-2 transport fallback (OpenRouter Gemini) failed",
        )
    if tier1 is not None:
        return tier1
    return primary
async def _call_api(
self,
messages: list[dict[str, Any]],
tool_names: list[str] | None = None,
user_id: str = "",
ctx: ToolContext | None = None,
max_video_url_parts: int | None = 5,
*,
override_model: str | None = None,
override_api_key: str | None = None,
override_chat_url: str | None = None,
response_format: dict[str, Any] | None = None,
toggle_specific: bool = False,
) -> dict[str, Any]:
"""Make a single chat-completions request and return the first choice message."""
effective_model = resolve_model_alias(override_model or self.model)
effective_api_key = override_api_key or self.api_key
chat_url = sanitize_llm_http_url(override_chat_url or self._chat_url)
from tools._safe_http import assert_safe_http_url
from urllib.parse import urlparse
parsed_url = urlparse(chat_url)
is_local = False
if parsed_url.hostname:
hl = parsed_url.hostname.lower().strip(".")
is_local = (hl == "localhost" or hl == "127.0.0.1" or hl.startswith("127."))
if not is_local:
chat_url = assert_safe_http_url(chat_url)
if _model_strips_all_multimodal_before_request(effective_model):
ra = _strip_audio_parts(messages)
rv = _strip_video_url_parts(messages)
ri = _strip_image_url_parts(messages)
if ra or rv or ri:
logger.info(
"Stripped multimodal content for text-only model %r "
"(audio=%d, video=%d, image=%d)",
effective_model,
ra,
rv,
ri,
)
elif _model_targets_claude(effective_model):
rv = _strip_video_url_parts(messages)
rv_file = _strip_video_and_audio_file_parts(messages)
ra = _strip_audio_parts(messages)
if rv or rv_file or ra:
logger.info(
"Stripped video/audio for Claude model %r "
"(video_url=%d, file_video_or_audio=%d, audio_type_parts=%d)",
effective_model,
rv,
rv_file,
ra,
)
elif _model_targets_gemma4(effective_model):
rv = _strip_video_url_parts(messages)
rv_file = _strip_video_and_audio_file_parts(messages)
ra = _strip_audio_parts(messages)
if rv or rv_file or ra:
logger.info(
"Stripped video/audio for Gemma 4 model %r "
"(video_url=%d, file_video_or_audio=%d, audio_type_parts=%d)",
effective_model,
rv,
rv_file,
ra,
)
elif not _model_targets_gemini(effective_model):
removed = _strip_video_url_parts(messages)
if removed:
logger.info(
"Stripped %d video_url part(s) — model %r does not support video",
removed,
effective_model,
)
elif max_video_url_parts is not None and max_video_url_parts > 0:
removed = _cap_video_url_parts(messages, max_video_url_parts)
if removed:
logger.info(
"Stripped %d older video_url part(s), keeping %d",
removed,
max_video_url_parts,
)
# Image content is never valid inside an assistant/model turn for any
# provider we route to: Anthropic rejects it and Google Gemini returns
# INVALID_ARGUMENT. Bot-sent images are recorded as a separate user turn
# upstream, but legacy history (persisted before that fix) and any path
# that slips through can still carry an assistant-role image. Re-role
# such turns to ``user`` here, regardless of provider, so the request
# never fails on a bot-sent image. Text-only models already stripped all
# images above, so this is a no-op for them.
relabeled = _relabel_assistant_image_turns_as_user(messages)
if relabeled:
logger.info(
"Relabeled %d assistant turn(s) with image content as user "
"for model %r",
relabeled,
effective_model,
)
if _model_targets_claude(effective_model):
img_changed = await asyncio.to_thread(
_clamp_claude_oversized_images,
messages,
effective_model,
)
if img_changed:
logger.info(
"Adjusted %d oversized image_url part(s) for Claude model %r",
img_changed,
effective_model,
)
_tags = _obs_tags(ctx)
_ensure_trailing_user_turn(
messages,
model_for_log=effective_model,
call_site="_call_api",
)
_effective_max_tokens = min(
self.max_tokens,
_MODEL_MAX_TOKENS_CAPS.get(effective_model, self.max_tokens),
)
if _effective_max_tokens < self.max_tokens:
logger.info(
"Model %r has a max_tokens cap of %d (configured: %d)",
effective_model,
_effective_max_tokens,
self.max_tokens,
)
effective_temperature = self.temperature
if _model_targets_claude(effective_model) and effective_temperature > 1.0:
logger.info(
"Clamping temperature %.2f -> 0.99 for Claude model %r",
effective_temperature,
effective_model,
)
effective_temperature = 0.99
payload: dict[str, Any] = {
"model": effective_model,
"messages": messages,
"temperature": effective_temperature,
"max_tokens": _effective_max_tokens,
}
if response_format:
payload["response_format"] = response_format
payload["top_p"] = self.top_p
_reasoning = _openrouter_reasoning_for_user_override(
override_model,
override_chat_url,
)
if _reasoning is not None:
payload["reasoning"] = _reasoning
logger.info(
"OpenRouter user override: added reasoning=%s for model=%r",
_reasoning,
effective_model,
)
if user_id:
payload["user"] = user_id
if toggle_specific:
payload["toggleSpecific"] = True
if self.tool_registry.has_tools and tool_names is not None:
tools = self.tool_registry.get_openai_tools_by_names(set(tool_names))
if len(tools) > MAX_TOOLS_PER_REQUEST:
logger.warning(
"Tool count %d exceeds hard cap %d, truncating",
len(tools),
MAX_TOOLS_PER_REQUEST,
)
tools = tools[:MAX_TOOLS_PER_REQUEST]
if tools:
if _model_targets_gemini(effective_model):
tools = await asyncio.to_thread(
_sanitize_openai_tools_for_gemini,
tools,
)
payload["tools"] = tools
headers = {
"Authorization": f"Bearer {effective_api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/matrix-llm-bot",
"X-Title": "Matrix LLM Bot",
}
# Capture request snapshot for observability (logged after the call resolves).
_obs_req_messages = list(messages)
_obs_req_tools = list(tool_names) if tool_names is not None else None
_obs_response_text = ""
_obs_error = ""
_obs_http_status = 0
_http_t0 = time.monotonic()
_payload_dirty = True
payload_bytes: bytes = b""
try:
for _attempt in range(MAX_499_RETRIES):
if _payload_dirty:
payload_bytes = await asyncio.to_thread(
lambda: json.dumps(payload).encode("utf-8"),
)
_payload_dirty = False
read_timeout_n = 0
while True:
try:
origin = get_http_call_origin()
logger.info(
"Sending OpenRouter chat completion request for model %s to %s (origin: %s)",
effective_model,
chat_url,
origin,
extra={
"channel_id": _tags.get("channel_id"),
"platform": _tags.get("platform"),
"user_id": _tags.get("user_id"),
"request_id": _tags.get("request_id"),
}
)
resp = await self._http.post(
chat_url,
content=payload_bytes,
headers=headers,
)
logger.info(
"OpenRouter chat completion request completed (status=%d) in %.1f ms (origin: %s)",
resp.status_code,
(time.monotonic() - _http_t0) * 1000,
origin,
extra={
"channel_id": _tags.get("channel_id"),
"platform": _tags.get("platform"),
"user_id": _tags.get("user_id"),
"request_id": _tags.get("request_id"),
}
)
break
except asyncio.CancelledError:
raise
except _RETRIABLE_ERRORS as exc:
read_timeout_n += 1
logger.warning(
"OpenRouter transient network error %s (attempt %d), retrying: %s",
type(exc).__name__,
read_timeout_n,
chat_url[:120],
)
await asyncio.sleep(min(0.5 * read_timeout_n, 5.0))
continue
except httpx.RequestError as exc:
_ms = (time.monotonic() - _http_t0) * 1000
_obs_error = f"network:{type(exc).__name__}: {exc}"
asyncio.create_task(
publish_http_error_event(
http_service="openrouter",
http_status=0,
request_id=_tags["request_id"],
channel_id=_tags["channel_id"],
user_id=_tags["user_id"],
platform=_tags["platform"],
duration_ms=_ms,
endpoint=chat_url[:120],
detail=str(exc)[:500],
error_kind="network",
),
)
raise
if resp.status_code == 499 and _attempt < MAX_499_RETRIES - 1:
logger.warning(
"HTTP 499, retrying immediately (%d/%d)",
_attempt + 1,
MAX_499_RETRIES,
)
continue
if resp.status_code == 404 and _attempt < MAX_499_RETRIES - 1:
_body_404 = (resp.text or "")[:500]
bl = _body_404.lower()
if "input audio" in bl:
stripped = _strip_audio_parts(messages)
if stripped:
logger.warning(
"HTTP 404 'no endpoints support input audio' — "
"stripped %d audio part(s), retrying (%d/%d)",
stripped,
_attempt + 1,
MAX_499_RETRIES,
)
payload["messages"] = messages
_payload_dirty = True
continue
if "image input" in bl:
# A *vision-capable* model returning this 404 is almost
# always a transient routing/capacity blip — not a real
# "this model can't see images". Stripping the user's
# image and retrying blind makes the model silently
# "not see" the picture (the reported intermittent bug).
# So for vision models, back off and retry WITH the image
# first; only strip as a last resort so the turn still
# answers. Non-vision models still strip immediately.
from model_capabilities import get_capabilities
_vision_model = get_capabilities(
effective_model
).supports_multimodal
if _vision_model and _attempt < MAX_499_RETRIES - 2:
logger.warning(
"HTTP 404 'image input' on vision model %r — "
"treating as transient; backing off and retrying "
"with image intact (%d/%d)",
effective_model,
_attempt + 1,
MAX_499_RETRIES,
)
await asyncio.sleep(1.5)
continue
stripped = _strip_image_url_parts(messages)
if stripped:
logger.warning(
"HTTP 404 'no endpoints support image input' — "
"stripped %d image_url part(s) [model=%s vision=%s], "
"retrying (%d/%d)",
stripped,
effective_model,
_vision_model,
_attempt + 1,
MAX_499_RETRIES,
)
payload["messages"] = messages
_payload_dirty = True
continue
if resp.status_code == 400 and _attempt < MAX_499_RETRIES - 1:
_body_400 = (resp.text or "")[:500]
if (
"maximum context length" in _body_400.lower()
and payload.get("max_tokens", 0) > _CTX_OVERFLOW_FALLBACK_MAX_TOKENS
):
logger.warning(
"HTTP 400 context-length overflow — reducing max_tokens "
"%d → %d and retrying (%d/%d)",
payload["max_tokens"],
_CTX_OVERFLOW_FALLBACK_MAX_TOKENS,
_attempt + 1,
MAX_499_RETRIES,
)
payload["max_tokens"] = _CTX_OVERFLOW_FALLBACK_MAX_TOKENS
_payload_dirty = True
continue
_obs_http_status = resp.status_code
if resp.status_code >= 400:
_body = (resp.text or "")[:4000]
logger.error(
"LLM chat HTTP %s (non-success). Response body (truncated): %s",
resp.status_code,
_body,
)
_obs_error = f"HTTP {resp.status_code}: {_body[:500]}"
_ms = (time.monotonic() - _http_t0) * 1000
asyncio.create_task(
publish_http_error_event(
http_service="openrouter",
http_status=resp.status_code,
request_id=_tags["request_id"],
channel_id=_tags["channel_id"],
user_id=_tags["user_id"],
platform=_tags["platform"],
duration_ms=_ms,
endpoint=chat_url[:120],
detail=_body[:500],
),
)
resp.raise_for_status()
break
data = await _async_response_json(resp)
if "error" in data:
error_msg = data["error"].get("message", str(data["error"]))
_obs_error = f"LLM API error: {error_msg[:500]}"
raise RuntimeError(f"LLM API error: {error_msg}")
choices = data.get("choices")
if not choices:
_obs_error = f"LLM API returned no choices: {str(data)[:300]}"
raise RuntimeError(f"LLM API returned no choices: {data}")
result_message = choices[0]["message"]
_obs_response_text = _message_content_for_proxy_error_detection(
result_message.get("content"),
)
return result_message
except asyncio.CancelledError:
_obs_error = "CancelledError"
raise
except Exception as _exc:
if not _obs_error:
_obs_error = f"{type(_exc).__name__}: {_exc}"[:500]
raise
finally:
_obs_duration_ms = (time.monotonic() - _http_t0) * 1000
asyncio.create_task(
publish_llm_request_event(
request_id=_tags.get("request_id", ""),
channel_id=_tags.get("channel_id", ""),
user_id=_tags.get("user_id", ""),
platform=_tags.get("platform", ""),
model=effective_model,
messages=_obs_req_messages,
tool_names=_obs_req_tools,
response_text=_obs_response_text,
error=_obs_error,
http_status=_obs_http_status,
duration_ms=_obs_duration_ms,
),
name="obs_llm_req",
)