Source code for tools
"""Extensible tool-calling framework.

Register tools with the ``@registry.tool`` decorator. Each tool is an async
callable that receives keyword arguments and returns a string result.

Example usage::

    from tools import ToolRegistry

    registry = ToolRegistry()

    @registry.tool(
        name="get_weather",
        description="Get the current weather for a location.",
        parameters={
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City and state, e.g. 'San Francisco, CA'",
                },
            },
            "required": ["location"],
        },
    )
    async def get_weather(location: str) -> str:
        return f"The weather in {location} is sunny and 72°F."
"""
from __future__ import annotations

import inspect
import logging
import threading
import weakref
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Awaitable, Callable

if TYPE_CHECKING:
    from task_manager import TaskManager
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# Tool-routing pin sets now live in ``core.gateway_pinned`` so services (e.g.
# inference) can import the routing decision as plain data without importing any
# tool handler module. Re-exported here for backward compatibility.
from core.gateway_pinned import (  # noqa: E402,F401
    GATEWAY_PINNED_TOOLS,
    INFERENCE_PINNED_TOOLS,
)
[docs]
@dataclass
class ToolDefinition:
    """Record describing one tool registered with the framework.

    Holds everything :class:`ToolRegistry` needs to expose a tool to the LLM
    and dispatch a call to it:

    - the LLM-visible ``name`` and ``description``;
    - the JSON-Schema ``parameters`` block;
    - the async ``handler``;
    - two behavioural flags.

    ``no_background`` keeps the call inline: the result is awaited directly
    instead of being handed to ``TaskManager`` for background execution.
    ``allow_repeat`` exempts the tool from the executor's repetition-loop
    guard. Both flags default to ``False``.

    Instances are created by :meth:`ToolRegistry.tool` when a handler is
    decorated. They are read back by the schema-export and dispatch paths
    (:meth:`ToolRegistry.call`, :meth:`ToolRegistry.list_tools`,
    :meth:`ToolRegistry.repeat_allowed_tools`).

    This is a plain, *non-frozen* dataclass: every field, including the two
    flags, can be reassigned after registration, and equality compares the
    fields one by one. The OpenAI schema cache is built from ``name``,
    ``description`` and ``parameters`` and kept until it is invalidated. Code
    that reassigns one of those fields on a registered definition should
    therefore call :meth:`ToolRegistry.invalidate_cache` afterwards.

    Attributes:
        name: Tool name as advertised to the model.
        description: Human-readable summary of what the tool does.
        parameters: JSON Schema object describing the accepted arguments.
        handler: The async callable invoked to run the tool.
        no_background: If true, never offload the call to ``TaskManager``.
        allow_repeat: If true, the tool is exempt from repetition detection.
    """

    name: str
    description: str
    parameters: dict[str, Any]
    handler: Callable[..., Awaitable[str]]
    no_background: bool = False
    allow_repeat: bool = False
[docs]
class ToolRegistry:
    """Registry that stores tool definitions and executes tool calls.

    A :class:`threading.RLock` serializes access to ``_tools``, the
    permission map and the tool caches. This keeps
    :func:`~tools.reload_tools.run` and concurrent :meth:`call` / OpenAI
    schema reads from racing.
    """

    def __init__(
        self,
        task_manager: TaskManager | None = None,
    ) -> None:
        """Initialize an empty registry, optionally bound to a ``TaskManager``.

        Sets up:

        - the internal tool table;
        - the per-tool permission map;
        - the :class:`threading.RLock` that guards both of them and the
          caches;
        - the two lazily-populated caches of OpenAI-format schemas.

        The optional *task_manager* is stored so that non-nested :meth:`call`
        invocations of background-eligible tools can be offloaded to it. When
        it is ``None``, every call runs inline. A registry is constructed once
        per microservice worker (the inference and web services build one
        during startup) and then populated by the tool loader.

        Args:
            task_manager: The :class:`~task_manager.TaskManager` used to run
                long tools in the background, or ``None`` to always run inline.
        """
        self._tools: dict[str, ToolDefinition] = {}
        self._permissions: dict[str, list[str]] = {}
        self.task_manager: TaskManager | None = task_manager
        # Re-entrant: ``tool()`` calls ``invalidate_cache()`` while already
        # holding the lock.
        self._lock = threading.RLock()
        # Cached OpenAI-format tool lists, rebuilt lazily on first access
        # after any mutation of ``_tools``.
        self._openai_tools_cache: list[dict[str, Any]] | None = None
        self._openai_tools_by_name: dict[str, dict[str, Any]] | None = None

    # ------------------------------------------------------------------
    # Permissions
    # ------------------------------------------------------------------

    def set_permissions(self, permissions: dict[str, list[str]]) -> None:
        """Set per-tool user whitelists.

        *permissions* maps tool names to lists of allowed user IDs.
        A special value ``"*"`` in the list means *everyone*.
        Tools **not** present in the dict are allowed for all users.

        The outer mapping is copied, so later insertions into or removals
        from the caller's dict do not affect the registry.
        """
        with self._lock:
            self._permissions = dict(permissions)

    def is_allowed(self, tool_name: str, user_id: str) -> bool:
        """Return ``True`` if *user_id* may execute *tool_name*.

        Rules:

        1. Tool not in the permissions dict -> allow (backward compatible).
        2. ``"*"`` in the tool's allowed list -> allow.
        3. *user_id* in the tool's allowed list -> allow.
        4. Otherwise -> deny.
        """
        with self._lock:
            allowed = self._permissions.get(tool_name)
        if allowed is None:
            return True
        if "*" in allowed:
            return True
        return user_id in allowed

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def tool(
        self,
        name: str,
        description: str,
        parameters: dict[str, Any],
        *,
        no_background: bool = False,
        allow_repeat: bool = False,
    ) -> Callable:
        """Decorator to register an async function as a tool.

        Registering a name that is already present replaces the previous
        definition.

        Parameters
        ----------
        name:
            The tool name exposed to the LLM.
        description:
            Human-readable description of what the tool does.
        parameters:
            JSON Schema object describing the tool's parameters.
        no_background:
            Keyword-only, default ``False``. If true, :meth:`call` always
            awaits the tool inline instead of handing it to the
            ``TaskManager``.
        allow_repeat:
            Keyword-only, default ``False``. If true, the tool is listed by
            :meth:`repeat_allowed_tools` and is therefore exempt from
            repetition-loop detection.
        """

        def decorator(
            fn: Callable[..., Awaitable[str]],
        ) -> Callable[..., Awaitable[str]]:
            """Register *fn* under the captured name and return it unchanged.

            This is the closure returned by :meth:`tool`; the
            ``@registry.tool(...)`` line applies it to the decorated
            coroutine function. It:

            - builds a :class:`ToolDefinition` from the captured arguments,
              with *fn* as the handler;
            - stores it in ``_tools`` under the lock;
            - calls :meth:`invalidate_cache`, so the OpenAI schema caches
              are rebuilt on the next read.

            The original function is returned, so the decorated name still
            refers to the plain coroutine function and stays directly
            callable.

            Args:
                fn: The async tool handler being registered.

            Returns:
                The same *fn*, unmodified.
            """
            with self._lock:
                self._tools[name] = ToolDefinition(
                    name=name,
                    description=description,
                    parameters=parameters,
                    handler=fn,
                    no_background=no_background,
                    allow_repeat=allow_repeat,
                )
                self.invalidate_cache()
            return fn

        return decorator

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------

    async def call(
        self,
        name: str,
        arguments: dict[str, Any],
        user_id: str = "",
        ctx: ToolContext | None = None,
        *,
        nested: bool = False,
    ) -> str:
        """Execute a registered tool by name and return the result string.

        If *user_id* is provided, the tool's permission whitelist is
        checked first. If *ctx* is provided **and** the handler declares
        a ``ctx`` parameter, the context is injected automatically.

        If *nested* is true, the coroutine is awaited directly and the
        TaskManager is skipped. Compound tools therefore receive concrete
        results instead of background ``task_id`` envelopes when inner calls
        exceed the timeout.

        ``secret:`` references in the arguments are resolved on a private
        copy. The caller's *arguments* dict is never modified, so plaintext
        credentials never flow back to the caller.

        Unknown tools and permission denials return an error string. If the
        tool raises an exception, the traceback is logged and a short error
        message is returned, so the LLM can see what went wrong and recover.

        Args:
            name: Registered tool name to run.
            arguments: Keyword arguments for the tool (not mutated).
            user_id: Caller identity used for the permission check and for
                secret resolution; empty skips both.
            ctx: Active tool context, if any.
            nested: Await inline and skip Gateway delegation / TaskManager.

        Returns:
            The tool result as a string, an error message, or whatever the
            Gateway delegation or ``TaskManager`` returns.
        """
        with self._lock:
            tool_def = self._tools.get(name)
        if tool_def is None:
            error_msg = f"Unknown tool: {name}"
            logger.error(error_msg)
            return error_msg

        # Permission check
        if user_id and not self.is_allowed(name, user_id):
            error_msg = (
                f"Permission denied: user '{user_id}' is not allowed "
                f"to run tool '{name}'."
            )
            logger.warning(error_msg)
            return error_msg

        # Gateway-pinned delegation check.
        #
        # Some tools require the live platform client (e.g. discord.py
        # ``Client``), which only exists on the Gateway node. On worker nodes,
        # ``ctx.adapter`` is a ProxyPlatformAdapter with no ``.client``, so
        # these tools would fail with "Discord client not available". They
        # are delegated to the Gateway's ``execute_tool`` RPC, where they run
        # against the real adapter.
        #
        # ``GATEWAY_PINNED_TOOLS`` covers two groups, pinned for different
        # reasons:
        # - voice/music tools hold node-local audio state on the Gateway;
        # - the discord_* / guild tools call ``require_discord_client`` and
        #   thus need the native client. They were added when the
        #   microservice split left them stranded on worker nodes.
        if not nested and ctx is not None and ctx.adapter is not None:
            delegate_method = getattr(ctx.adapter, "delegate_to_gateway", None)
            if delegate_method is not None and name in GATEWAY_PINNED_TOOLS:
                logger.info("Delegating gateway-pinned tool '%s' to GatewayService", name)
                return await delegate_method(
                    "execute_tool",
                    tool_name=name,
                    tool_args=arguments,
                    tool_ctx={
                        "platform": ctx.platform,
                        "channel_id": ctx.channel_id,
                        "user_id": ctx.user_id,
                        "user_name": ctx.user_name,
                        "guild_id": ctx.guild_id,
                        "message_id": ctx.message_id,
                    },
                )

        try:
            # Work on a private copy. ``_resolve_secret_refs`` writes the
            # plaintext secrets into the dict it is given, and
            # ``_filter_arguments`` may return the caller's own dict
            # unchanged. Without the copy, the decrypted credential would be
            # written back into the caller's arguments.
            call_args = dict(_filter_arguments(tool_def.handler, arguments, name))
            if ctx is not None and user_id:
                err = await _resolve_secret_refs(call_args, user_id, ctx)
                if err is not None:
                    return err
            if ctx is not None and _handler_accepts_ctx(tool_def.handler):
                coro = tool_def.handler(**call_args, ctx=ctx)
            else:
                coro = tool_def.handler(**call_args)
            if (
                not nested
                and self.task_manager is not None
                and not tool_def.no_background
            ):
                return await self.task_manager.execute(
                    coro,
                    tool_name=name,
                    user_id=user_id,
                    channel_id=ctx.channel_id if ctx else "",
                    platform=ctx.platform if ctx else "",
                )
            return str(await coro)
        except Exception as exc:
            logger.exception("Tool %r failed", name)
            return (
                f"Tool '{name}' raised {type(exc).__name__}. "
                "See server logs for details."
            )

    # ------------------------------------------------------------------
    # Schema export & caching
    # ------------------------------------------------------------------

    def invalidate_cache(self) -> None:
        """Clear the cached OpenAI tool representations.

        Called automatically when tools are registered via the ``tool``
        decorator. It must also be called explicitly after bulk mutations
        such as ``_tools.clear()`` (e.g. in ``reload_tools``).
        """
        with self._lock:
            self._openai_tools_cache = None
            self._openai_tools_by_name = None

    def _ensure_cache_unlocked(self) -> None:
        """Rebuild the OpenAI-format schema caches when they have been invalidated.

        Lazily materializes two caches by walking every
        :class:`ToolDefinition` in ``_tools``:

        - ``_openai_tools_cache``: the full list of function-calling tool
          dicts;
        - ``_openai_tools_by_name``: the same dicts keyed by name, for O(1)
          lookup.

        It is a no-op when the cache is already warm, so the cost is paid
        only after a registration or an explicit :meth:`invalidate_cache`.

        The caller MUST already hold :attr:`_lock`. The public readers
        :meth:`get_openai_tools` and :meth:`get_openai_tools_by_names` are
        its only callers, and both invoke it from inside the lock.
        """
        if self._openai_tools_cache is not None:
            return
        tools = [
            {
                "type": "function",
                "function": {
                    "name": td.name,
                    "description": td.description,
                    "parameters": td.parameters,
                },
            }
            for td in self._tools.values()
        ]
        self._openai_tools_cache = tools
        self._openai_tools_by_name = {t["function"]["name"]: t for t in tools}

    def get_openai_tools(self) -> list[dict[str, Any]]:
        """Return tool definitions in the OpenAI function-calling JSON format.

        Tools appear in registration order. Returns an empty list when no
        tools are registered, which means the ``tools`` parameter can be
        omitted from the API call.
        """
        with self._lock:
            self._ensure_cache_unlocked()
            return list(self._openai_tools_cache)

    def get_openai_tools_by_names(
        self,
        names: set[str],
    ) -> list[dict[str, Any]]:
        """Return only the OpenAI tool dicts whose names are in *names*.

        Uses the cached name-to-dict mapping for O(1) per-name lookup instead
        of rebuilding and filtering the full list each time. Names that are
        not registered are skipped.

        Names are visited in sorted order so the result is deterministic.
        Iterating a ``set`` of strings directly follows hash order, which is
        randomized per process (``PYTHONHASHSEED``). That would make the
        order of tools in outgoing requests differ between workers and
        restarts.
        """
        with self._lock:
            self._ensure_cache_unlocked()
            by_name = self._openai_tools_by_name or {}
            return [by_name[n] for n in sorted(names) if n in by_name]

    def list_tools(self) -> list[ToolDefinition]:
        """Return a snapshot list of every registered :class:`ToolDefinition`.

        Takes the lock and copies the current values of ``_tools``. Callers
        can then iterate the full set of tools (name, description,
        parameters and flags) without holding the lock or racing a reload.

        It backs a wide range of consumers across the repo, each of which
        reads fields off the returned definitions:

        - the tool-embedding indexer in
          ``classifiers.update_tool_embeddings`` and
          ``classifiers.build_tool_index``;
        - the web ``tools_tasks_api`` listing endpoint;
        - the subagent tool filter in ``tools/subagent_tools.py``;
        - ``tools/search_tools.py``;
        - the write/import-tool flows.

        Returns:
            A new list of the registered tool definitions.
        """
        with self._lock:
            return list(self._tools.values())

    def tool_names(self) -> frozenset[str]:
        """Return the set of all registered tool names as an immutable snapshot.

        Reads the keys of ``_tools`` under the lock and freezes them, giving a
        thread-safe, cheaply comparable view of which tools currently exist.

        It is used by ``message_processor.generate_and_send`` to validate and
        gate tool selection (for example, checking that ``activate_skill`` is
        present). The per-service tool-loading tests also use it to assert
        that a service finished registering its tools.

        Returns:
            A frozenset of the registered tool names.
        """
        with self._lock:
            return frozenset(self._tools.keys())

    def repeat_allowed_tools(self) -> frozenset[str]:
        """Return the names of tools exempt from repetition-loop detection.

        Collects, under the lock, the name of every definition whose
        ``allow_repeat`` flag is set. The executor
        (``openrouter_client.executor``) reads this set so it can suppress
        its "the model keeps calling the same tool" guard for tools that are
        legitimately meant to be invoked repeatedly.

        Returns:
            A frozenset of repeat-exempt tool names.
        """
        with self._lock:
            return frozenset(td.name for td in self._tools.values() if td.allow_repeat)

    @property
    def has_tools(self) -> bool:
        """Return whether any tool is currently registered.

        A cheap, lock-guarded predicate used to decide whether tool-calling is
        even possible. The transport layer (``openrouter_client.transport``)
        checks it before attaching a ``tools`` block to an outbound request.
        The per-service loading tests use it to confirm that the inference
        service registered tools while the web service did not.

        Returns:
            ``True`` if at least one tool is registered, else ``False``.
        """
        with self._lock:
            return len(self._tools) > 0

    def __len__(self) -> int:
        """Return the number of registered tools.

        Lets the registry be used directly in ``len(registry)`` and in
        boolean contexts. Reads the size of ``_tools`` under the lock.

        Returns:
            The count of registered tool definitions.
        """
        with self._lock:
            return len(self._tools)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
# Per-handler introspection caches, so each handler is inspected with
# ``inspect.signature`` only once.
#
# They are keyed weakly by the handler object itself, NOT by ``id(handler)``.
# CPython reuses the id of a garbage-collected object. Once old handlers are
# dropped (e.g. when the tools are cleared and registered again), an
# id-keyed entry could hand a brand-new, unrelated handler the cached
# signature of a dead one. That would silently drop its real arguments or
# mis-inject ``ctx``. Weak keys also make entries disappear together with
# their handlers instead of accumulating across reloads.
_ctx_cache: "weakref.WeakKeyDictionary[Callable, bool]" = (
    weakref.WeakKeyDictionary()
)
_sig_cache: "weakref.WeakKeyDictionary[Callable, inspect.Signature | None]" = (
    weakref.WeakKeyDictionary()
)

# Parameter kinds that can be bound by a ``name=value`` keyword argument.
_KEYWORD_KINDS = frozenset(
    {inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY}
)


def _memoize_per_handler(
    cache: "weakref.WeakKeyDictionary[Callable, Any]",
    handler: Callable,
    compute: Callable[[], Any],
) -> Any:
    """Return ``cache[handler]``, computing and storing it on first use.

    Handlers that cannot be weakly referenced or hashed are not cached; for
    those, *compute* simply runs on every call. An example is an instance
    of a ``__slots__`` class without ``__weakref__``.
    """
    try:
        return cache[handler]
    except KeyError:
        pass
    except TypeError:
        # Not weak-referenceable / hashable: skip caching for this handler.
        return compute()
    value = compute()
    cache[handler] = value
    return value


def _safe_signature(handler: Callable) -> inspect.Signature | None:
    """Return ``inspect.signature(handler)``, or ``None`` if it cannot be introspected."""
    try:
        return inspect.signature(handler)
    except (ValueError, TypeError):
        return None


def _filter_arguments(
    handler: Callable,
    arguments: dict[str, Any],
    tool_name: str,
) -> dict[str, Any]:
    """Strip keyword arguments that *handler* does not accept.

    In two cases every argument is passed through and the very same dict
    object is returned:

    - the handler accepts ``**kwargs``;
    - its signature cannot be introspected.

    Otherwise, only names that can actually be bound by keyword are kept.
    Positional-only and ``*args`` names would still raise ``TypeError``, so
    they are not kept. ``ctx`` is never kept either, because the registry
    injects it itself. Dropped keys are logged as a warning, so
    hallucinated params don't crash the call.

    Args:
        handler: The tool callable whose signature is consulted.
        arguments: Keyword arguments proposed for the call; never mutated.
        tool_name: Tool name used in the warning message.

    Returns:
        *arguments* itself when nothing needs dropping, otherwise a new dict
        containing only the accepted keys.
    """
    sig = _memoize_per_handler(_sig_cache, handler, lambda: _safe_signature(handler))
    if sig is None:
        return arguments  # can't introspect — pass everything through

    params = list(sig.parameters.values())
    # If the handler has a **kwargs catch-all, everything is fine.
    if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params):
        return arguments

    valid_names = {p.name for p in params if p.kind in _KEYWORD_KINDS} - {"ctx"}
    unexpected = set(arguments.keys()) - valid_names
    if not unexpected:
        return arguments

    logger.warning(
        "Tool '%s': dropping unexpected argument(s) %s",
        tool_name,
        ", ".join(sorted(unexpected)),
    )
    return {k: v for k, v in arguments.items() if k in valid_names}


def _handler_accepts_ctx(handler: Callable) -> bool:
    """Return whether *handler* declares a ``ctx`` parameter.

    Tells :meth:`ToolRegistry.call` whether to inject the active
    :class:`~tool_context.ToolContext` when dispatching. This way,
    context-unaware tools are not handed an argument they do not expect.

    The answer is memoized per handler object in ``_ctx_cache``, and the
    underlying signature is shared with :func:`_filter_arguments` through
    ``_sig_cache``. Signatures that cannot be introspected are treated as
    not accepting ``ctx``.

    Args:
        handler: The tool coroutine function to inspect.

    Returns:
        ``True`` if the handler accepts a ``ctx`` keyword, else ``False``.
    """

    def _compute() -> bool:
        sig = _memoize_per_handler(_sig_cache, handler, lambda: _safe_signature(handler))
        return sig is not None and "ctx" in sig.parameters

    return _memoize_per_handler(_ctx_cache, handler, _compute)
async def _resolve_secret_refs(
    arguments: dict[str, Any],
    user_id: str,
    ctx: "ToolContext",
) -> str | None:
    """Replace ``secret:<name>`` argument values with the user's stored secrets.

    Only top-level string values that start with ``SECRET_REF_PREFIX`` are
    examined; values of any other type are left untouched. For each match:

    - the remainder after the prefix, stripped of surrounding whitespace,
      is the secret name;
    - it is looked up for *user_id* via
      ``tools.manage_secrets.resolve_user_secret``, which receives
      ``ctx.redis`` plus ``ctx.config`` when the context has one;
    - the plaintext is stored back into *arguments* under the same key.

    This lets a user refer to a credential by name without the model ever
    seeing its value. :meth:`ToolRegistry.call` runs this just before
    dispatch whenever both a context and a ``user_id`` are present.

    Scanning stops at the first failure. Substitution happens key by key,
    so references visited before the failing one may already have been
    replaced in *arguments* by then.

    Args:
        arguments: The tool's keyword arguments; updated in place.
        user_id: The owner whose secret store is consulted.
        ctx: The active tool context supplying Redis and configuration.

    Returns:
        ``None`` when every reference resolved (or none were present).
        Otherwise, a human-readable error string for the first failure:
        an empty secret name, or a secret that was not found or could not
        be decrypted.
    """
    from tools.manage_secrets import SECRET_REF_PREFIX, resolve_user_secret

    prefix_length = len(SECRET_REF_PREFIX)
    # Iterate over a snapshot because matching keys are reassigned below.
    for arg_name, raw_value in list(arguments.items()):
        is_reference = isinstance(raw_value, str) and raw_value.startswith(
            SECRET_REF_PREFIX
        )
        if not is_reference:
            continue

        secret_name = raw_value[prefix_length:].strip()
        if not secret_name:
            return "Invalid secret reference: secret: must be followed by a name."

        plaintext = await resolve_user_secret(
            user_id,
            secret_name,
            redis_client=ctx.redis,
            config=getattr(ctx, "config", None),
        )
        if plaintext is None:
            return f"Secret '{secret_name}' not found or could not be decrypted."
        arguments[arg_name] = plaintext

    return None