"""Query Wolfram|Alpha via the LLM API.
Uses ``https://www.wolframalpha.com/api/v1/llm-api`` which returns
plain-text results optimised for consumption by large language models.
Resolve the App ID via per-user key, ``WOLFRAM_ALPHA_APP_ID`` env var,
or ``api_keys.wolfram_alpha`` in config.yaml.
Optional: set ``WOLFRAM_ALPHA_USE_BEARER_AUTH=1`` to send the App ID in the
``Authorization: Bearer`` header instead of the ``appid`` query parameter.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import os
from typing import Any, TYPE_CHECKING
import aiohttp
if TYPE_CHECKING:
from tool_context import ToolContext
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Endpoint of the Wolfram|Alpha LLM API that run() sends its GET request to.
LLM_API_URL = "https://www.wolframalpha.com/api/v1/llm-api"
# ``totaltimeout`` value (seconds) used when the caller supplies no ``timeout``.
DEFAULT_TIMEOUT = 25
# Inclusive bounds that ``maxchars`` is clamped to before it is sent.
MAX_MAXCHARS = 32000
MIN_MAXCHARS = 500
# Inclusive bounds (seconds) for scantimeout / formattimeout / parsetimeout.
MIN_SUB_TIMEOUT = 1
MAX_SUB_TIMEOUT = 120
# Inclusive bounds for the width / maxwidth / plotwidth hints.
MIN_PLOT_DIM = 1
MAX_PLOT_DIM = 2048
async def _resolve_app_id(ctx: ToolContext | None) -> str | None:
    """Return the Wolfram|Alpha App ID using the standard credential stack.

    Sources are tried in priority order and the first one that yields a
    non-blank value wins:

    1. the per-user key, via ``tools.manage_api_keys.get_user_api_key`` --
       consulted only when ``ctx`` carries both a truthy ``redis`` and a
       truthy ``user_id``;
    2. the ``WOLFRAM_ALPHA_APP_ID`` environment variable;
    3. ``ctx.config.API_KEYS["wolfram_alpha"]``.

    Every candidate is stripped before it is judged, so a blank or
    whitespace-only value in one source no longer hides a valid credential
    in a lower-priority source, and the function never returns an empty
    string.

    Args:
        ctx (ToolContext | None): Tool context supplying ``redis``,
            ``user_id``, ``channel_id`` and ``config``; ``None`` skips
            straight to the environment variable.

    Returns:
        str | None: The stripped App ID, or ``None`` if no source provided a
        non-blank credential.
    """

    def _clean(candidate: object) -> str | None:
        # Treat None, "" and whitespace-only values as "not configured".
        if not candidate:
            return None
        return str(candidate).strip() or None

    if ctx and getattr(ctx, "redis", None) and getattr(ctx, "user_id", None):
        # Imported lazily, only on the path that actually needs it.
        from tools.manage_api_keys import get_user_api_key

        user_key = _clean(
            await get_user_api_key(
                ctx.user_id,
                "wolfram_alpha",
                redis_client=ctx.redis,
                channel_id=getattr(ctx, "channel_id", None),
                config=getattr(ctx, "config", None),
            )
        )
        if user_key:
            return user_key

    env_key = _clean(os.environ.get("WOLFRAM_ALPHA_APP_ID"))
    if env_key:
        return env_key

    config = getattr(ctx, "config", None) if ctx else None
    if config:
        # ``API_KEYS`` may be absent or explicitly None on the config object.
        api_keys = getattr(config, "API_KEYS", None) or {}
        return _clean(api_keys.get("wolfram_alpha"))
    return None
def _use_bearer_auth() -> bool:
    """Tell whether the App ID should travel in an ``Authorization`` header.

    Looks at the ``WOLFRAM_ALPHA_USE_BEARER_AUTH`` environment variable. After
    trimming and lower-casing, the values ``1``, ``true``, ``yes`` and ``on``
    enable bearer authentication; anything else (including an unset
    variable) leaves it disabled.

    Returns:
        bool: ``True`` when bearer-header authentication is enabled.
    """
    raw_flag = os.environ.get("WOLFRAM_ALPHA_USE_BEARER_AUTH", "")
    normalized_flag = raw_flag.strip().lower()
    return normalized_flag in {"1", "true", "yes", "on"}
def _normalize_query_input(q: str) -> str:
    """Flatten a query onto one line with single spaces between words.

    Splits ``q`` on any run of whitespace (spaces, tabs, newlines) and joins
    the pieces back with one space each, which also removes leading and
    trailing whitespace.

    Args:
        q (str): The raw, possibly multi-line query text.

    Returns:
        str: The query with every whitespace run collapsed to a single space.
    """
    words = q.split()
    return " ".join(words)
def _optional_str(value: str | None) -> str | None:
    """Return the stripped text of ``value``, or ``None`` when it is blank.

    ``None`` stays ``None``. Any other value is converted with ``str`` and
    stripped; an empty result is reported as ``None`` as well.

    Args:
        value (str | None): The candidate value to normalise.

    Returns:
        str | None: The stripped string, or ``None`` if empty/``None``.
    """
    if value is not None:
        stripped = str(value).strip()
        if stripped:
            return stripped
    return None
def _clamp_int(value: int | None, lo: int, hi: int) -> int | None:
    """Bound an optional integer to the inclusive range ``[lo, hi]``.

    ``None`` is returned untouched so callers can tell "omit this parameter"
    apart from a clamped number. Any other value is converted with ``int``
    first, then capped at ``hi`` and finally raised to at least ``lo``.

    Args:
        value (int | None): The candidate integer, or ``None`` to skip.
        lo (int): Inclusive lower bound.
        hi (int): Inclusive upper bound.

    Returns:
        int | None: The clamped integer, or ``None`` when ``value`` is ``None``.

    Raises:
        ValueError: If ``value`` is non-``None`` and cannot be coerced to ``int``.
    """
    if value is None:
        return None
    number = int(value)
    # Cap first, then apply the floor -- the same order as max(lo, min(hi, n)).
    capped = number if number < hi else hi
    return capped if capped > lo else lo
def _append_if_str(
    pairs: list[tuple[str, str]],
    key: str,
    value: str | None,
) -> None:
    """Add ``(key, value)`` to ``pairs`` unless the value is blank.

    The value is normalised with :func:`_optional_str`; when that yields
    ``None`` (the value was ``None``, empty, or whitespace-only) nothing is
    appended. Otherwise the stripped string is appended to ``pairs`` in place.

    Args:
        pairs (list[tuple[str, str]]): The accumulating list of query-string
            pairs; mutated in place when a value is appended.
        key (str): The query-parameter name to use for the pair.
        value (str | None): The candidate value; appended only if non-empty.

    Returns:
        None
    """
    normalized = _optional_str(value)
    if normalized is None:
        return
    pairs.append((key, normalized))
def _build_query_pairs(
    *,
    normalized_input: str,
    app_id: str,
    use_bearer: bool,
    maxchars: int | None,
    units: str | None,
    req_timeout: int,
    assumptions: list[str] | None,
    languagecode: str | None,
    location: str | None,
    timezone: str | None,
    latlong: str | None,
    countrycode: str | None,
    currency: str | None,
    ip: str | None,
    scantimeout: int | None,
    formattimeout: int | None,
    parsetimeout: int | None,
    width: int | None,
    maxwidth: int | None,
    plotwidth: int | None,
    mag: str | None,
) -> list[tuple[str, str]]:
    """Assemble the ordered query-string pairs for the Wolfram|Alpha LLM API.

    The list always starts with ``input`` and always contains
    ``totaltimeout``; every other pair is added only when its argument
    carries a usable value. Integer arguments go through :func:`_clamp_int`,
    optional strings through :func:`_append_if_str`, and every entry of
    ``assumptions`` becomes its own repeated ``assumption`` pair. No I/O is
    performed.

    Emission order: ``input``, ``appid``, ``maxchars``, ``units``,
    ``totaltimeout``, the three phase timeouts, the string hints, the three
    width hints, then the ``assumption`` pairs.

    Args:
        normalized_input (str): Query text, emitted as ``input``.
        app_id (str): App ID, emitted as ``appid`` unless ``use_bearer``.
        use_bearer (bool): When ``True`` the ``appid`` pair is omitted.
        maxchars (int | None): Clamped to ``[MIN_MAXCHARS, MAX_MAXCHARS]``.
        units (str | None): Emitted only when exactly ``metric`` or
            ``imperial``.
        req_timeout (int): Emitted unconditionally as ``totaltimeout``.
        assumptions (list[str] | None): Disambiguation tokens. ``None``
            entries and blank entries are skipped; a single bare string is
            treated as a one-element list rather than iterated per character.
        languagecode (str | None): Optional ``languagecode`` value.
        location (str | None): Optional ``location`` value.
        timezone (str | None): Optional ``timezone`` value.
        latlong (str | None): Optional ``latlong`` value.
        countrycode (str | None): Optional ``countrycode`` value.
        currency (str | None): Optional ``currency`` value.
        ip (str | None): Optional ``ip`` value.
        scantimeout (int | None): Clamped to
            ``[MIN_SUB_TIMEOUT, MAX_SUB_TIMEOUT]``.
        formattimeout (int | None): Clamped like ``scantimeout``.
        parsetimeout (int | None): Clamped like ``scantimeout``.
        width (int | None): Clamped to ``[MIN_PLOT_DIM, MAX_PLOT_DIM]``.
        maxwidth (int | None): Clamped like ``width``.
        plotwidth (int | None): Clamped like ``width``.
        mag (str | None): Optional ``mag`` value.

    Returns:
        list[tuple[str, str]]: Ordered ``(key, value)`` pairs.

    Raises:
        ValueError: If an integer argument cannot be coerced by
            :func:`_clamp_int`.
    """
    pairs: list[tuple[str, str]] = [("input", normalized_input)]

    def _add_clamped(key: str, value: int | None, lo: int, hi: int) -> None:
        # Append the clamped integer as text; None means "omit the parameter".
        clamped = _clamp_int(value, lo, hi)
        if clamped is not None:
            pairs.append((key, str(clamped)))

    if not use_bearer:
        pairs.append(("appid", app_id))
    _add_clamped("maxchars", maxchars, MIN_MAXCHARS, MAX_MAXCHARS)
    if units in ("metric", "imperial"):
        pairs.append(("units", units))
    pairs.append(("totaltimeout", str(req_timeout)))

    for key, value in (
        ("scantimeout", scantimeout),
        ("formattimeout", formattimeout),
        ("parsetimeout", parsetimeout),
    ):
        _add_clamped(key, value, MIN_SUB_TIMEOUT, MAX_SUB_TIMEOUT)

    for key, text in (
        ("languagecode", languagecode),
        ("location", location),
        ("timezone", timezone),
        ("latlong", latlong),
        ("countrycode", countrycode),
        ("currency", currency),
        ("ip", ip),
        ("mag", mag),
    ):
        _append_if_str(pairs, key, text)

    for key, value in (
        ("width", width),
        ("maxwidth", maxwidth),
        ("plotwidth", plotwidth),
    ):
        _add_clamped(key, value, MIN_PLOT_DIM, MAX_PLOT_DIM)

    # A bare string would otherwise be iterated character by character.
    if isinstance(assumptions, str):
        assumptions = [assumptions]
    for raw in assumptions or ():
        # _optional_str already coerces to str; passing the raw entry keeps a
        # None entry from turning into the literal token "None".
        token = _optional_str(raw)
        if token is not None:
            pairs.append(("assumption", token))
    return pairs
# ---------------------------------------------------------------------------
# v3 tool interface
# ---------------------------------------------------------------------------
# Module-level metadata describing this tool: its name, a free-text
# description, and (below) the schema of the arguments accepted by run().

# Guidance on disambiguation retries; appended to TOOL_DESCRIPTION.
_TOOL_DESC_ASSUMPTIONS = (
    "If Wolfram suggests disambiguation, repeat the same query and pass "
    "each choice as an entry in 'assumptions' (API assumption tokens)."
)
# Name of the tool.
TOOL_NAME = "wolfram_alpha_query"
# Description of what the tool does and what its result contains.
TOOL_DESCRIPTION = (
    "Query Wolfram|Alpha for computational knowledge: math, science, "
    "conversions, statistics, geography, finance, and more. Returns "
    "LLM-optimised plain-text results including computed answers, tables, "
    "image URLs, and a link to the full Wolfram|Alpha page. " + _TOOL_DESC_ASSUMPTIONS
)
# JSON-Schema-style description of the tool arguments. The property names
# mirror the keyword parameters of run() (all except ``ctx``); only ``query``
# is required.
TOOL_PARAMETERS: dict[str, Any] = {
    "type": "object",
    "properties": {
        "query": {
            "type": "string",
            "description": (
                "Natural-language input for Wolfram|Alpha. Prefer simplified "
                "keyword queries when possible (e.g. 'France population' "
                "instead of 'how many people live in France'). Use single-"
                "letter variable names for math (e.g. 'solve x^2 - 4 = 0')."
            ),
        },
        # Clamped to [MIN_MAXCHARS, MAX_MAXCHARS] by _build_query_pairs().
        "maxchars": {
            "type": "integer",
            "description": (
                "Maximum characters in the response (500-32000, default 6800). "
                "Reduce for concise answers, increase for detailed data."
            ),
        },
        # Any other value is dropped silently by _build_query_pairs().
        "units": {
            "type": "string",
            "enum": ["metric", "imperial"],
            "description": "Unit system for measurements (default: auto-detect from location).",
        },
        # run() bounds this to 5..30 and falls back to DEFAULT_TIMEOUT when omitted.
        "timeout": {
            "type": "integer",
            "description": (
                "Maps to Wolfram's totaltimeout (default 25, max 30 seconds)."
            ),
        },
        # Each entry is sent as its own repeated ``assumption`` query parameter.
        "assumptions": {
            "type": "array",
            "items": {"type": "string"},
            "description": (
                "Disambiguation tokens from Wolfram (repeat 'assumption' query "
                "params). Use when re-trying after ambiguous input."
            ),
        },
        # Optional string hints; blank values are never sent.
        "languagecode": {
            "type": "string",
            "description": "Response language code (e.g. en, de).",
        },
        "location": {
            "type": "string",
            "description": "Location hint for locale-aware results.",
        },
        "timezone": {
            "type": "string",
            "description": "Timezone hint (Wolfram API string).",
        },
        "latlong": {
            "type": "string",
            "description": "Latitude,longitude for geo context.",
        },
        "countrycode": {
            "type": "string",
            "description": "Country code for regional defaults.",
        },
        "currency": {
            "type": "string",
            "description": "Preferred currency for financial queries.",
        },
        "ip": {
            "type": "string",
            "description": "Client IP hint for geo (when available).",
        },
        # Per-phase timeouts; clamped to [MIN_SUB_TIMEOUT, MAX_SUB_TIMEOUT].
        "scantimeout": {
            "type": "integer",
            "description": "Scan phase timeout in seconds (1-120).",
        },
        "formattimeout": {
            "type": "integer",
            "description": "Format phase timeout in seconds (1-120).",
        },
        "parsetimeout": {
            "type": "integer",
            "description": "Parse phase timeout in seconds (1-120).",
        },
        # Width hints; clamped to [MIN_PLOT_DIM, MAX_PLOT_DIM].
        "width": {
            "type": "integer",
            "description": "Plot/image width hint (1-2048).",
        },
        "maxwidth": {
            "type": "integer",
            "description": "Max width hint (1-2048).",
        },
        "plotwidth": {
            "type": "integer",
            "description": "Plot width hint (1-2048).",
        },
        "mag": {
            "type": "string",
            "description": "Magnification string (Full Results API).",
        },
    },
    "required": ["query"],
}
# Tool entry point.
async def run(
    query: str,
    maxchars: int | None = None,
    units: str | None = None,
    timeout: int | None = None,
    assumptions: list[str] | None = None,
    languagecode: str | None = None,
    location: str | None = None,
    timezone: str | None = None,
    latlong: str | None = None,
    countrycode: str | None = None,
    currency: str | None = None,
    ip: str | None = None,
    scantimeout: int | None = None,
    formattimeout: int | None = None,
    parsetimeout: int | None = None,
    width: int | None = None,
    maxwidth: int | None = None,
    plotwidth: int | None = None,
    mag: str | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Execute a Wolfram|Alpha LLM-API query and return a JSON-encoded result.

    Entry point for the ``wolfram_alpha_query`` tool. The steps are:

    1. reject a blank ``query``;
    2. resolve the App ID with :func:`_resolve_app_id`, returning
       ``missing_api_key_error("wolfram_alpha")`` as the error when none is
       found;
    3. build the query pairs with :func:`_build_query_pairs`, sending the App
       ID either as ``appid`` or as an ``Authorization: Bearer`` header
       depending on :func:`_use_bearer_auth`;
    4. GET ``LLM_API_URL`` with an HTTP timeout of ``totaltimeout + 5``
       seconds and translate the response into a JSON string.

    Failures are reported as ``error`` payloads rather than raised: HTTP 501,
    403 and 400 get dedicated messages, other non-200 statuses a generic one,
    and non-numeric integer arguments, network errors, timeouts and
    unexpected exceptions each produce their own error message. Non-200
    responses, network errors and timeouts additionally schedule a telemetry
    event through :func:`_emit_error_event`.

    Args:
        query (str): Natural-language Wolfram|Alpha input; must be non-blank.
        maxchars (int | None): Max characters in the response (clamped
            500-32000).
        units (str | None): Unit system, ``metric`` or ``imperial``.
        timeout (int | None): Sent as ``totaltimeout``; bounded to ``[5, 30]``
            seconds, defaulting to ``DEFAULT_TIMEOUT``.
        assumptions (list[str] | None): Disambiguation tokens for re-tries.
        languagecode (str | None): Response language code.
        location (str | None): Location hint for locale-aware results.
        timezone (str | None): Timezone hint.
        latlong (str | None): ``lat,long`` geo hint.
        countrycode (str | None): Country code for regional defaults.
        currency (str | None): Preferred currency for financial queries.
        ip (str | None): Client IP hint for geolocation.
        scantimeout (int | None): Scan-phase timeout in seconds (1-120).
        formattimeout (int | None): Format-phase timeout in seconds (1-120).
        parsetimeout (int | None): Parse-phase timeout in seconds (1-120).
        width (int | None): Plot/image width hint (1-2048).
        maxwidth (int | None): Max width hint (1-2048).
        plotwidth (int | None): Plot width hint (1-2048).
        mag (str | None): Magnification string.
        ctx (ToolContext | None): Tool context used for per-user App ID
            resolution; may be ``None``.

    Returns:
        str: A JSON string. On success it contains ``query`` and ``answer``;
        on failure it contains ``error`` (plus ``http_status`` and, for some
        statuses, ``detail`` or ``suggestions``).
    """
    from tools.manage_api_keys import missing_api_key_error

    if not query or not query.strip():
        return json.dumps({"error": "query is required"})
    normalized = _normalize_query_input(query)

    app_id = await _resolve_app_id(ctx)
    if not app_id:
        return json.dumps({"error": missing_api_key_error("wolfram_alpha")})

    use_bearer = _use_bearer_auth()
    try:
        # int() coercion here and inside _build_query_pairs can fail on
        # non-numeric tool arguments; report that instead of raising.
        req_timeout = DEFAULT_TIMEOUT
        if timeout is not None:
            req_timeout = max(5, min(30, int(timeout)))
        params = _build_query_pairs(
            normalized_input=normalized,
            app_id=app_id,
            use_bearer=use_bearer,
            maxchars=maxchars,
            units=units,
            req_timeout=req_timeout,
            assumptions=assumptions,
            languagecode=languagecode,
            location=location,
            timezone=timezone,
            latlong=latlong,
            countrycode=countrycode,
            currency=currency,
            ip=ip,
            scantimeout=scantimeout,
            formattimeout=formattimeout,
            parsetimeout=parsetimeout,
            width=width,
            maxwidth=maxwidth,
            plotwidth=plotwidth,
            mag=mag,
        )
    except (TypeError, ValueError) as exc:
        return json.dumps({"error": f"Invalid numeric parameter: {exc}"})

    headers: dict[str, str] = {}
    if use_bearer:
        headers["Authorization"] = f"Bearer {app_id}"

    # Give the HTTP layer a little slack beyond Wolfram's own totaltimeout.
    http_timeout = req_timeout + 5
    try:
        async with aiohttp.ClientSession() as session:
            logger.info("Wolfram|Alpha LLM API query: %s", normalized[:120])
            async with session.get(
                LLM_API_URL,
                params=params,
                headers=headers or None,
                timeout=aiohttp.ClientTimeout(total=http_timeout),
            ) as response:
                body = await response.text()
                status = response.status
                if status == 200:
                    logger.info("Wolfram|Alpha OK (%d chars)", len(body))
                    return json.dumps(
                        {"query": normalized, "answer": body},
                        ensure_ascii=False,
                    )

                _emit_error_event(status, body)
                if status == 501:
                    return json.dumps(
                        {
                            "error": "Wolfram|Alpha could not interpret the input",
                            "http_status": 501,
                            "suggestions": body[:1000] if body else None,
                        },
                        ensure_ascii=False,
                    )
                if status == 403:
                    return json.dumps(
                        {
                            "error": "Invalid or missing Wolfram|Alpha App ID",
                            "http_status": 403,
                        }
                    )
                if status == 400:
                    return json.dumps(
                        {
                            "error": "Bad request — missing input parameter",
                            "http_status": 400,
                        }
                    )
                return json.dumps(
                    {
                        "error": f"Wolfram|Alpha HTTP {status}",
                        "http_status": status,
                        "detail": body[:500] if body else None,
                    },
                    ensure_ascii=False,
                )
    except aiohttp.ClientError as exc:
        logger.exception("Wolfram|Alpha network error")
        _emit_error_event(0, str(exc), error_kind="network")
        return json.dumps({"error": f"Network error: {exc}"})
    except asyncio.TimeoutError:
        # An expired total timeout is not an aiohttp.ClientError and its
        # str() is empty, so it needs its own message and telemetry event.
        logger.warning("Wolfram|Alpha request timed out after %ds", http_timeout)
        _emit_error_event(0, "request timed out", error_kind="timeout")
        return json.dumps(
            {"error": f"Wolfram|Alpha request timed out after {http_timeout}s"}
        )
    except Exception as exc:
        logger.exception("Wolfram|Alpha unexpected error")
        return json.dumps({"error": f"Unexpected error: {exc}"})
# Strong references to in-flight telemetry tasks. The event loop keeps only
# weak references to tasks, so an otherwise unreferenced fire-and-forget task
# could be garbage-collected before it finishes.
_PENDING_ERROR_EVENTS: set = set()


def _forget_error_event(task: asyncio.Future) -> None:
    """Done-callback: release the task reference and note any failure."""
    _PENDING_ERROR_EVENTS.discard(task)
    if task.cancelled():
        return
    failure = task.exception()
    if failure is not None:
        logging.getLogger(__name__).debug(
            "Wolfram|Alpha error event failed: %r", failure
        )


def _emit_error_event(status: int, detail: str, error_kind: str | None = None) -> None:
    """Schedule a fire-and-forget observability event for a failed HTTP call.

    Lazily imports ``observability.publish_http_error_event`` and schedules
    it on the currently running event loop without awaiting it, so the caller
    continues immediately. The task is kept referenced until it completes.
    This function never raises: if the import fails, no event loop is
    running, or scheduling fails, the event is dropped and the reason is
    logged at debug level.

    Args:
        status (int): The HTTP status code; callers pass ``0`` when there was
            no HTTP response (network error or timeout).
        detail (str): Response/exception text; truncated to 500 characters.
        error_kind (str | None): Optional classifier (e.g. ``"network"``)
            added to the event when truthy.

    Returns:
        None
    """
    try:
        from observability import publish_http_error_event

        kwargs: dict[str, Any] = {
            "http_service": "wolfram_alpha",
            "http_status": status,
            "endpoint": "www.wolframalpha.com/api/v1/llm-api",
            "detail": (detail or "")[:500],
        }
        if error_kind:
            kwargs["error_kind"] = error_kind

        # Look up the loop before creating the coroutine, so nothing is left
        # un-awaited when there is no running loop.
        loop = asyncio.get_running_loop()
        task = asyncio.ensure_future(publish_http_error_event(**kwargs), loop=loop)
        _PENDING_ERROR_EVENTS.add(task)
        task.add_done_callback(_forget_error_event)
    except Exception:
        # Telemetry is best-effort and must never fail the actual query.
        logging.getLogger(__name__).debug(
            "Could not schedule Wolfram|Alpha error event", exc_info=True
        )