# Source code for tools

"""Extensible tool-calling framework.

Register tools with the ``@registry.tool`` decorator. Each tool is an async
callable that receives keyword arguments and returns a string result.

Example usage::

    from tools import ToolRegistry

    registry = ToolRegistry()

    @registry.tool(
        name="get_weather",
        description="Get the current weather for a location.",
        parameters={
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City and state, e.g. 'San Francisco, CA'",
                },
            },
            "required": ["location"],
        },
    )
    async def get_weather(location: str) -> str:
        return f"The weather in {location} is sunny and 72°F."
"""

from __future__ import annotations

import inspect
import logging
import threading
import weakref
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable, TYPE_CHECKING

if TYPE_CHECKING:
    from task_manager import TaskManager
    from tool_context import ToolContext

logger = logging.getLogger(__name__)


# Tool-routing pin sets now live in ``core.gateway_pinned`` so services (e.g.
# inference) can import the routing decision as plain data without importing any
# tool handler module. Re-exported here for backward compatibility.
from core.gateway_pinned import (  # noqa: E402,F401
    GATEWAY_PINNED_TOOLS,
    INFERENCE_PINNED_TOOLS,
)


@dataclass
class ToolDefinition:
    """Record describing one tool registered with the framework.

    Holds everything :class:`ToolRegistry` needs to expose a tool to the LLM
    and dispatch a call to it: the LLM-visible ``name`` and ``description``,
    the JSON-Schema ``parameters`` block, the async ``handler`` coroutine, and
    two behavioural flags.

    ``no_background`` keeps the call inline: the result is awaited directly
    instead of being handed to ``TaskManager`` for background execution.
    ``allow_repeat`` exempts the tool from the executor's repetition-loop guard.

    This is a regular (non-frozen) dataclass, so fields can be reassigned after
    construction. Treat registered instances as read-only unless you own the
    registry, because the dispatch and schema-export paths read them live.

    Instances are created by :meth:`ToolRegistry.tool` when a handler is
    decorated. They are read back out by the schema-export and dispatch paths
    (:meth:`ToolRegistry.call`, :meth:`ToolRegistry.list_tools`,
    :meth:`ToolRegistry.repeat_allowed_tools`).

    Attributes:
        name: Tool name as advertised to the model.
        description: Human-readable summary of what the tool does.
        parameters: JSON Schema object describing the accepted arguments.
        handler: The async callable invoked to run the tool.
        no_background: If true, never offload the call to ``TaskManager``.
        allow_repeat: If true, the tool is exempt from repetition detection.
    """

    name: str
    description: str
    parameters: dict[str, Any]
    handler: Callable[..., Awaitable[str]]
    no_background: bool = False
    allow_repeat: bool = False
class ToolRegistry:
    """Registry that stores tool definitions and executes tool calls.

    A :class:`threading.RLock` serializes access to ``_tools``, the permission
    map and the OpenAI schema caches, so :func:`~tools.reload_tools.run` and
    concurrent :meth:`call` / OpenAI schema reads cannot race. The lock only
    wraps short synchronous sections and is never held across an ``await``.
    """

    def __init__(
        self,
        task_manager: TaskManager | None = None,
    ) -> None:
        """Initialize an empty registry, optionally bound to a ``TaskManager``.

        Sets up the internal tool table, the per-tool permission map, and the
        :class:`threading.RLock` that serializes access to both. Also sets up
        the two lazily-populated caches of OpenAI-format schemas.

        The optional *task_manager* is stored so that non-nested :meth:`call`
        invocations of background-eligible tools can be offloaded to it. When
        it is ``None``, every call runs inline.

        Args:
            task_manager: The :class:`~task_manager.TaskManager` used to run
                long tools in the background, or ``None`` to always run inline.
        """
        self._tools: dict[str, ToolDefinition] = {}
        self._permissions: dict[str, list[str]] = {}
        self.task_manager: TaskManager | None = task_manager
        self._lock = threading.RLock()
        # Cached OpenAI-format tool lists, rebuilt lazily on first access
        # after any mutation of ``_tools``.
        self._openai_tools_cache: list[dict[str, Any]] | None = None
        self._openai_tools_by_name: dict[str, dict[str, Any]] | None = None

    # ------------------------------------------------------------------
    # Permissions
    # ------------------------------------------------------------------

    def set_permissions(self, permissions: dict[str, list[str]]) -> None:
        """Set per-tool user whitelists.

        *permissions* maps tool names to lists of allowed user IDs. A special
        value ``"*"`` in the list means *everyone*. Tools **not** present in
        the dict are allowed for all users. The mapping is shallow-copied, so
        later changes to the caller's dict do not affect the registry.
        """
        with self._lock:
            self._permissions = dict(permissions)

    def is_allowed(self, tool_name: str, user_id: str) -> bool:
        """Return ``True`` if *user_id* may execute *tool_name*.

        Rules:

        1. Tool not in the permissions dict -> allow (backward compatible).
        2. ``"*"`` in the tool's allowed list -> allow.
        3. *user_id* in the tool's allowed list -> allow.
        4. Otherwise -> deny.
        """
        # Read under the lock like every other access to shared state. The
        # lock is reentrant, so this is safe when called from ``call``.
        with self._lock:
            allowed = self._permissions.get(tool_name)
        if allowed is None:
            return True
        return "*" in allowed or user_id in allowed

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def tool(
        self,
        name: str,
        description: str,
        parameters: dict[str, Any],
        *,
        no_background: bool = False,
        allow_repeat: bool = False,
    ) -> Callable[[Callable[..., Awaitable[str]]], Callable[..., Awaitable[str]]]:
        """Decorator to register an async function as a tool.

        Parameters
        ----------
        name:
            The tool name exposed to the LLM.
        description:
            Human-readable description of what the tool does.
        parameters:
            JSON Schema object describing the tool's parameters.
        no_background:
            Keyword-only. If true, calls always run inline and are never
            offloaded to ``TaskManager``. Defaults to ``False``.
        allow_repeat:
            Keyword-only. If true, the tool is exempt from the executor's
            repetition-loop guard. Defaults to ``False``.
        """

        def decorator(
            fn: Callable[..., Awaitable[str]],
        ) -> Callable[..., Awaitable[str]]:
            """Register *fn* under the captured name and return it unchanged.

            Builds a :class:`ToolDefinition` from the captured arguments with
            *fn* as the handler and stores it in ``_tools`` under the lock.
            Any previous tool with the same name is replaced. It then calls
            :meth:`invalidate_cache` so the OpenAI schema caches are rebuilt
            on the next read.

            Args:
                fn: The async tool handler being registered.

            Returns:
                The same *fn*, unmodified, so it stays directly callable.
            """
            with self._lock:
                self._tools[name] = ToolDefinition(
                    name=name,
                    description=description,
                    parameters=parameters,
                    handler=fn,
                    no_background=no_background,
                    allow_repeat=allow_repeat,
                )
                self.invalidate_cache()
            return fn

        return decorator

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------

    async def call(
        self,
        name: str,
        arguments: dict[str, Any],
        user_id: str = "",
        ctx: ToolContext | None = None,
        *,
        nested: bool = False,
    ) -> str:
        """Execute a registered tool by name and return the result string.

        If *user_id* is provided, the tool's permission whitelist is checked
        first. If *ctx* is provided **and** the handler declares a ``ctx``
        parameter, the context is injected automatically.

        If *nested* is true, the coroutine is awaited directly (TaskManager is
        skipped). This way compound tools receive concrete results instead of
        background ``task_id`` envelopes when inner calls exceed the timeout.

        *arguments* is never modified. ``secret:`` references are resolved
        into a private copy, so plaintext secrets cannot leak back into the
        caller's data (e.g. conversation history or logs).

        If the tool raises an exception, a short error message is returned so
        the LLM can see what went wrong and recover.
        """
        with self._lock:
            tool_def = self._tools.get(name)
        if tool_def is None:
            error_msg = f"Unknown tool: {name}"
            logger.error(error_msg)
            return error_msg

        # Permission check
        if user_id and not self.is_allowed(name, user_id):
            error_msg = (
                f"Permission denied: user '{user_id}' is not allowed "
                f"to run tool '{name}'."
            )
            logger.warning(error_msg)
            return error_msg

        # Gateway-pinned delegation check.
        #
        # Some tools require the live platform client (e.g. discord.py
        # ``Client``), which only exists on the Gateway node. On worker nodes,
        # ``ctx.adapter`` is a ProxyPlatformAdapter with no ``.client``, so
        # these tools would fail with "Discord client not available". They are
        # delegated to the Gateway's ``execute_tool`` RPC, where they run
        # against the real adapter.
        #
        # The pinned tools fall into two groups, pinned for different reasons:
        #   - voice/music tools hold node-local audio state on the Gateway;
        #   - the discord_* / guild tools call ``require_discord_client`` and
        #     thus need the native client (added when the microservice split
        #     left them stranded on worker nodes).
        if not nested and ctx is not None and ctx.adapter is not None:
            delegate_method = getattr(ctx.adapter, "delegate_to_gateway", None)
            if delegate_method is not None and name in GATEWAY_PINNED_TOOLS:
                logger.info("Delegating gateway-pinned tool '%s' to GatewayService", name)
                return await delegate_method(
                    "execute_tool",
                    tool_name=name,
                    tool_args=arguments,
                    tool_ctx={
                        "platform": ctx.platform,
                        "channel_id": ctx.channel_id,
                        "user_id": ctx.user_id,
                        "user_name": ctx.user_name,
                        "guild_id": ctx.guild_id,
                        "message_id": ctx.message_id,
                    },
                )

        try:
            # Work on a private copy. ``_filter_arguments`` may hand back the
            # caller's own dict, and ``_resolve_secret_refs`` rewrites values
            # in place, so without the copy decrypted secrets would be written
            # into the caller's arguments.
            arguments = dict(_filter_arguments(tool_def.handler, arguments, name))

            if ctx is not None and user_id:
                err = await _resolve_secret_refs(arguments, user_id, ctx)
                if err is not None:
                    return err

            if ctx is not None and _handler_accepts_ctx(tool_def.handler):
                coro = tool_def.handler(**arguments, ctx=ctx)
            else:
                coro = tool_def.handler(**arguments)

            if (
                not nested
                and self.task_manager is not None
                and not tool_def.no_background
            ):
                return await self.task_manager.execute(
                    coro,
                    tool_name=name,
                    user_id=user_id,
                    channel_id=ctx.channel_id if ctx else "",
                    platform=ctx.platform if ctx else "",
                )
            return str(await coro)
        except Exception as exc:
            logger.exception("Tool %r failed", name)
            return (
                f"Tool '{name}' raised {type(exc).__name__}. "
                "See server logs for details."
            )

    # ------------------------------------------------------------------
    # Schema export & caching
    # ------------------------------------------------------------------

    def invalidate_cache(self) -> None:
        """Clear the cached OpenAI tool representations.

        Called automatically when tools are registered via the ``tool``
        decorator. Must also be called explicitly after bulk mutations such as
        ``_tools.clear()`` (e.g. in ``reload_tools``).
        """
        with self._lock:
            self._openai_tools_cache = None
            self._openai_tools_by_name = None

    def _ensure_cache_unlocked(self) -> None:
        """Rebuild the OpenAI-format schema caches if they were invalidated.

        Builds two caches by walking every :class:`ToolDefinition` in
        ``_tools``:

        - ``_openai_tools_cache``: the full list of function-calling tool
          dicts.
        - ``_openai_tools_by_name``: the same dicts keyed by name.

        This is a no-op when the cache is already warm.

        The caller MUST already hold :attr:`_lock`.
        """
        if self._openai_tools_cache is not None:
            return
        tools = [
            {
                "type": "function",
                "function": {
                    "name": td.name,
                    "description": td.description,
                    "parameters": td.parameters,
                },
            }
            for td in self._tools.values()
        ]
        self._openai_tools_cache = tools
        self._openai_tools_by_name = {t["function"]["name"]: t for t in tools}

    def get_openai_tools(self) -> list[dict[str, Any]]:
        """Return tool definitions in the OpenAI function-calling JSON format.

        Returns an empty list when no tools are registered, which means the
        ``tools`` parameter can be omitted from the API call. The returned
        list is a new list, but the dicts inside it are shared with the cache.
        """
        with self._lock:
            self._ensure_cache_unlocked()
            return list(self._openai_tools_cache)

    def get_openai_tools_by_names(
        self,
        names: set[str],
    ) -> list[dict[str, Any]]:
        """Return only the OpenAI tool dicts whose names are in *names*.

        Uses the cached by-name ``dict`` for O(1) per-name lookup instead of
        rebuilding and filtering the full list each time. Unknown names are
        silently skipped.
        """
        with self._lock:
            self._ensure_cache_unlocked()
            by_name = self._openai_tools_by_name or {}
            return [by_name[n] for n in names if n in by_name]

    def list_tools(self) -> list[ToolDefinition]:
        """Return a snapshot list of every registered :class:`ToolDefinition`.

        Copies the current values of ``_tools`` under the lock, so callers can
        iterate without holding the lock or racing a reload.

        Returns:
            A new list of the registered tool definitions.
        """
        with self._lock:
            return list(self._tools.values())

    def tool_names(self) -> frozenset[str]:
        """Return the set of all registered tool names as an immutable snapshot.

        Returns:
            A frozenset of the registered tool names, read under the lock.
        """
        with self._lock:
            return frozenset(self._tools.keys())

    def repeat_allowed_tools(self) -> frozenset[str]:
        """Return the names of tools exempt from repetition-loop detection.

        Collects the names of every definition whose ``allow_repeat`` flag is
        set, snapshotting them under the lock.

        Returns:
            A frozenset of repeat-exempt tool names.
        """
        with self._lock:
            return frozenset(td.name for td in self._tools.values() if td.allow_repeat)

    @property
    def has_tools(self) -> bool:
        """Return whether any tool is currently registered.

        Returns:
            ``True`` if at least one tool is registered, else ``False``.
        """
        with self._lock:
            return len(self._tools) > 0

    def __len__(self) -> int:
        """Return the number of registered tools.

        Lets the registry be used in ``len(registry)`` and, because no
        ``__bool__`` is defined, in boolean contexts.

        Returns:
            The count of registered tool definitions.
        """
        with self._lock:
            return len(self._tools)
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ # Cache so we only inspect each handler once. _ctx_cache: dict[int, bool] = {} _sig_cache: dict[int, inspect.Signature | None] = {} def _filter_arguments( handler: Callable, arguments: dict[str, Any], tool_name: str, ) -> dict[str, Any]: """Strip keyword arguments that *handler* does not accept. If the handler accepts ``**kwargs`` all arguments are passed through. Otherwise only recognised parameter names are kept, and any dropped keys are logged as a warning so hallucinated params don't crash the call. """ key = id(handler) sig = _sig_cache.get(key, ...) # sentinel to distinguish "not cached" from "None" if sig is ...: try: sig = inspect.signature(handler) except (ValueError, TypeError): sig = None _sig_cache[key] = sig if sig is None: return arguments # can't introspect — pass everything through # If the handler has a **kwargs catch-all, everything is fine. if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()): return arguments valid_names = set(sig.parameters.keys()) - {"ctx"} unexpected = set(arguments.keys()) - valid_names if not unexpected: return arguments logger.warning( "Tool '%s': dropping unexpected argument(s) %s", tool_name, ", ".join(sorted(unexpected)), ) return {k: v for k, v in arguments.items() if k in valid_names} def _handler_accepts_ctx(handler: Callable) -> bool: """Return whether *handler* declares a ``ctx`` parameter. Tells :meth:`ToolRegistry.call` whether to inject the active :class:`~tool_context.ToolContext` when dispatching, so that context-unaware tools are not handed an argument they do not expect. The result is memoized in the module-level ``_ctx_cache`` keyed by the handler's :func:`id`, so each handler is introspected with :func:`inspect.signature` only once; signatures that cannot be introspected are treated as not accepting ``ctx``. 
Args: handler: The tool coroutine to inspect. Returns: ``True`` if the handler accepts a ``ctx`` keyword, else ``False``. """ key = id(handler) cached = _ctx_cache.get(key) if cached is not None: return cached try: sig = inspect.signature(handler) result = "ctx" in sig.parameters except (ValueError, TypeError): result = False _ctx_cache[key] = result return result async def _resolve_secret_refs( arguments: dict[str, Any], user_id: str, ctx: "ToolContext", ) -> str | None: """Expand ``secret:`` references in tool arguments into their plaintext values. Walks the string arguments and, for any value beginning with the ``secret:`` prefix, looks up the named secret for *user_id* and substitutes the decrypted value in place. This lets a user pass a token by name without the model ever seeing the real credential. Resolution is delegated to ``tools.manage_secrets.resolve_user_secret``, which reads the user's encrypted secret store via ``ctx.redis`` (and ``ctx.config`` for the key-database path); *arguments* is mutated in place on success. It is called by :meth:`ToolRegistry.call` just before dispatch whenever both a context and a ``user_id`` are present. Args: arguments: The tool's keyword arguments; mutated in place when a reference is resolved. user_id: The owner whose secret store is consulted. ctx: The active tool context supplying Redis and configuration. Returns: ``None`` if all references resolved (or none were present), or a human-readable error string describing the first failure: an empty name, a missing secret, or a decryption failure. """ from tools.manage_secrets import SECRET_REF_PREFIX, resolve_user_secret for key, val in list(arguments.items()): if isinstance(val, str) and val.startswith(SECRET_REF_PREFIX): name = val[len(SECRET_REF_PREFIX) :].strip() if not name: return "Invalid secret reference: secret: must be followed by a name." 
resolved = await resolve_user_secret( user_id, name, redis_client=ctx.redis, config=getattr(ctx, "config", None), ) if resolved is None: return f"Secret '{name}' not found or could not be decrypted." arguments[key] = resolved return None