Source code for tools_main

"""Tools service entry point — dedicated, isolatable tool-execution worker.

Runs :class:`ToolsService`, a :class:`~core.service_base.StargazerService` whose
sole job is to execute tools on behalf of the inference tier. It is the **only**
process that imports the tool handler modules in ``tools/``: on boot it loads the
full :class:`~tools.ToolRegistry`, publishes the tool **catalog** to Redis (so
inference can present tools without importing them), and consumes tool-execution
requests off ``sg:stream:tools`` (group ``sg:tools``, load-balanced across
instances). Each request reconstructs a :class:`~tool_context.ToolContext` from
the request's identity fields plus this service's shared managers and a
:class:`~core.proxy_adapter.ProxyPlatformAdapter` (so tool sends still publish to
the gateway and raw-SDK ops still delegate to it), runs the tool, and replies on
the originating worker's reply stream with the result plus the write-back
channels (``sent_files`` media bytes, ``injected_tools``, ``sent_rich_messages``).

Deliberately lighter than inference: no message processor, no swarm/subagent
system, no presence manager, no inbound consumer. Compound tools that spawn
nested LLM loops stay pinned to inference (``INFERENCE_PINNED_TOOLS``); the
``OpenRouterClient`` wired here exists only for the embedding/KG managers some
delegated tools use. Launched standalone (``python tools_main.py``) by
``scripts/systemd/stargazer-tools.service``.
"""

from __future__ import annotations

import asyncio
import json
import logging
import re
import signal
import time
import uuid
from typing import Any

import msgpack

from config import Config
from core.event_bus import RedisEventBus
from core.proxy_adapter import ProxyPlatformAdapter
from core.service_base import StargazerService
from core.tool_catalog import publish_catalog
from core.tools_consumer import ToolExecConsumer
# Authoritative privileged/dangerous-tool name set (frozenset of strings, no
# handler imports) — every delegated call to one of these is audit-logged with
# its arguments. See tools/unsandboxed_exec_tool_names.py and the
# unsandboxed-exec-surface component spec.
from tools.unsandboxed_exec_tool_names import TOOL_NAMES_REQUIRING_UNSANDBOXED_EXEC

logger = logging.getLogger("tools")

# Redis key namespaces for per-request idempotency: the claim key marks a request
# as taken by an instance; the result key caches its packed reply so a duplicate
# delivery can be answered again without re-running the tool.
_CLAIM_PREFIX = "sg:tools:claim:"
_RESULT_PREFIX = "sg:tools:result:"

# Audit-log redaction: any argument whose *key* contains one of these fragments
# (case-insensitive) is logged as "<redacted>" instead of its value.
_SENSITIVE_ARG_RE = re.compile(
    r"(pass|secret|token|api[_-]?key|auth|credential|private[_-]?key|cookie)",
    re.IGNORECASE,
)
# Upper bound, in characters, on the serialized arguments written to one audit line.
_AUDIT_ARG_LIMIT = 2000

# Tools report permission refusals as *returned* data (for example
# ``{"success": false, "error": "...does not have the UNSANDBOXED_EXEC
# privilege..."}``) rather than by raising, so returning cleanly does not imply
# the tool actually executed. The alternatives below mirror the canonical
# refusal wording produced by the privilege gates (winrm_tools.check_unsandboxed_exec
# and friends).
_DENIAL_RE = re.compile(
    "|".join(
        (
            r"does not have the .*privilege",
            r"access denied",
            r"permission denied",
            r"not authori[sz]ed",
            r"unauthori[sz]ed",
        )
    ),
    re.IGNORECASE,
)


def _classify_tool_outcome(result: Any) -> tuple[str, str]:
    """Classify a tool's RETURN value as ``ok`` | ``denied`` | ``failed``.

    Tools surface privilege refusals and internal errors in their *result*, so
    "returned without raising" must not be logged as success — that footgun
    once made a denied ``execute_python_unsandboxed`` call look like it ran.

    Rules:

    * A ``str`` result that starts (after leading whitespace) with ``{`` or
      ``[`` is decoded as JSON first; undecodable text is kept as-is.
    * For a dict, the error text is ``error`` or else ``message``. If
      ``success is False`` the outcome is ``denied`` when that text matches
      :data:`_DENIAL_RE`, else ``failed``. If ``success`` is absent/``None``
      the outcome is ``denied`` only when the text matches; otherwise ``ok``.
    * Any other string is ``denied`` when it matches :data:`_DENIAL_RE`.
    * Everything else (including blank strings) is ``ok``.

    Args:
        result: Whatever the tool registry returned for the call.

    Returns:
        ``(outcome, detail)`` where *detail* is at most the first 200
        characters of the error text for non-ok outcomes and ``""`` when ok.
        Exceptions raised by the tool are classified by the caller, not here.
    """
    if isinstance(result, str) and not result.strip():
        # Blank output carries no error text to classify.
        return "ok", ""

    obj: Any = result
    if isinstance(result, str):
        text = result.lstrip()
        # startswith rather than ``text[:1] in "{["``: the empty slice is a
        # substring of every string, so the slice test let non-JSON through.
        if text.startswith(("{", "[")):
            try:
                obj = json.loads(text)
            except (ValueError, RecursionError):
                # Not JSON after all (or absurdly nested): classify the raw text.
                obj = result

    if isinstance(obj, dict):
        success = obj.get("success")
        err = str(obj.get("error") or obj.get("message") or "")
        if success is False:
            outcome = "denied" if _DENIAL_RE.search(err) else "failed"
            return outcome, err[:200]
        if success is None and err and _DENIAL_RE.search(err):
            return "denied", err[:200]
        return "ok", ""

    if isinstance(result, str) and _DENIAL_RE.search(result):
        return "denied", result[:200]
    return "ok", ""


def _format_audit_args(args: Any) -> str:
    """Serialize tool arguments for the audit log, redacting secret-shaped keys
    and truncating.

    Redaction is recursive: the value under any key matching
    :data:`_SENSITIVE_ARG_RE` is replaced with ``"<redacted>"`` at every depth
    of nested dicts and lists/tuples. A secret tucked under an innocuous
    top-level key (``{"headers": {"Authorization": ...}}``,
    ``{"env": {"API_KEY": ...}}``) is therefore never logged verbatim.
    Structures nested deeper than a fixed cap are logged as ``"<elided>"``.
    Operates on the pre-secret-resolution args, so ``secret:`` references are
    logged as references, never resolved secret values.

    Args:
        args: The tool arguments from the request (normally a dict).

    Returns:
        For dict *args*, the redacted value as JSON (``str()`` of it if JSON
        encoding fails). For anything else, ``str()`` of the redacted value.
        In both cases the text is truncated to :data:`_AUDIT_ARG_LIMIT`
        characters with a ``…[+N chars]`` suffix.
    """
    max_depth = 32

    def redact(value: Any, depth: int) -> Any:
        # Cap recursion so a pathologically deep payload cannot raise
        # RecursionError here and abort the tool call; elide rather than
        # log the deep part unredacted.
        if depth > max_depth:
            return "<elided>"
        if isinstance(value, dict):
            return {
                k: (
                    "<redacted>"
                    if _SENSITIVE_ARG_RE.search(str(k))
                    else redact(v, depth + 1)
                )
                for k, v in value.items()
            }
        if isinstance(value, (list, tuple)):
            items = [redact(v, depth + 1) for v in value]
            return tuple(items) if isinstance(value, tuple) else items
        return value

    redacted = redact(args, 0)
    if isinstance(args, dict):
        try:
            text = json.dumps(redacted, default=str, ensure_ascii=False)
        except Exception:
            # Best-effort: audit formatting must never break the tool call
            # (e.g. non-JSON key types, or a ``__str__`` that raises).
            text = str(redacted)
    else:
        text = str(redacted)

    if len(text) > _AUDIT_ARG_LIMIT:
        overflow = len(text) - _AUDIT_ARG_LIMIT
        text = f"{text[:_AUDIT_ARG_LIMIT]}…[+{overflow} chars]"
    return text


class ToolsService(StargazerService):
    """Dedicated tool-execution service (service tier ``"tools"``).

    :meth:`on_start` loads the tool registry and the Redis-backed managers that
    tools use, publishes the tool catalog, and starts consuming execution
    requests (handled by :meth:`_handle_tool_exec`). :meth:`run` blocks until
    :meth:`on_stop` has torn the service down.
    """

    def __init__(
        self,
        config: Config,
        redis_client: Any,
        instance_id: str,
        use_health_server: bool = True,
    ) -> None:
        """Create the service. Heavy wiring is deferred to :meth:`on_start`.

        Args:
            config: Loaded configuration; replaced on config-update messages.
            redis_client: Redis client passed through to the base service.
            instance_id: This instance's id; also used as the stream consumer
                name and the event-bus node id.
            use_health_server: Forwarded to :class:`StargazerService`.
        """
        super().__init__(
            service_name="tools",
            instance_id=instance_id,
            redis_client=redis_client,
            redis_required=True,
            use_health_server=use_health_server,
        )
        self.cfg = config
        # Everything below is populated in on_start().
        self.tool_registry: Any = None
        self.event_bus: RedisEventBus | None = None
        self.consumer: ToolExecConsumer | None = None
        self.openrouter: Any = None
        self.message_cache: Any = None
        self.conversation_mgr: Any = None
        self.kg_manager: Any = None
        self.threadweave: Any = None
        self.task_manager: Any = None
        self.persona_pref_manager: Any = None
        # MessageCache's raw client; used for msgpack payloads, claims/results
        # and catalog publishing.
        self._raw: Any = None
        # Hash of the catalog this instance last published; requests carrying
        # a different hash are answered with CATALOG_STALE.
        self._schema_hash: str = ""
        self._stop_event = asyncio.Event()
        self._config_listener_task: asyncio.Task | None = None

    def get_adapter(self, platform_name: str) -> ProxyPlatformAdapter:
        """Return a proxy adapter so tool sends publish to the gateway's stream."""
        return ProxyPlatformAdapter(self.event_bus, platform_name)

    async def on_start(self) -> None:
        """Load tools, wire shared managers, publish the catalog, start consuming."""
        logger.info("Initializing Tools service dependencies...")

        # The one place tools load for execution.
        from tools import ToolRegistry
        from tool_loader import load_tools

        self.tool_registry = ToolRegistry()
        if self.cfg.tool_permissions:
            self.tool_registry.set_permissions(self.cfg.tool_permissions)
        await asyncio.to_thread(load_tools, self.cfg.tools_dir, self.tool_registry)
        logger.info("Loaded %d tools", len(self.tool_registry))

        # LLM client — needed by the embedding/KG/threadweave managers below, NOT
        # for running agentic compound tools (those stay pinned to inference).
        from openrouter_client import OpenRouterClient

        self.openrouter = OpenRouterClient(
            api_key=self.cfg.api_key,
            model=self.cfg.model,
            temperature=self.cfg.temperature,
            max_tokens=self.cfg.max_tokens,
            top_p=self.cfg.top_p,
            tool_registry=self.tool_registry,
            base_url=self.cfg.llm_base_url,
            gemini_api_key=self.cfg.gemini_api_key,
        )

        # Shared Redis-backed managers (subset of the inference stack).
        from message_cache import MessageCache

        if self.cfg.redis_sentinels:
            ssl_kw = self.cfg.redis_ssl_kwargs()
        elif self.cfg.redis_url:
            ssl_kw = self.cfg.redis_connection_kwargs_for_url(self.cfg.redis_url)
        else:
            ssl_kw = {}
        self.message_cache = MessageCache(
            redis_url=self.cfg.redis_url,
            openrouter_client=self.openrouter,
            embedding_model=self.cfg.embedding_model,
            ssl_kwargs=ssl_kw,
            redis_sentinels=self.cfg.redis_sentinels,
            redis_sentinel_master=self.cfg.redis_sentinel_master,
            resilience_kwargs=self.cfg.redis_resilience_kwargs(),
        )
        redis_client = self.message_cache.redis_client
        self._raw = self.message_cache.redis_raw_client

        from observability import set_observability_redis

        set_observability_redis(redis_client)

        from conversation import ConversationManager
        from knowledge_graph import KnowledgeGraphManager
        from prompt_renderer import PromptRenderer
        from threadweave import ThreadweaveManager
        from task_manager import TaskManager

        renderer = PromptRenderer(self.cfg.system_prompt_file)
        self.conversation_mgr = ConversationManager(
            prompt_renderer=renderer,
            max_history=self.cfg.max_history,
            redis=redis_client,
        )
        self.kg_manager = KnowledgeGraphManager(
            redis_client=redis_client,
            openrouter=self.openrouter,
            embedding_model=self.cfg.embedding_model,
        )
        self.threadweave = ThreadweaveManager(
            redis_client=redis_client,
            openrouter=self.openrouter,
            embedding_model=self.cfg.embedding_model,
            dna_vault_path=self.cfg.dna_vault_path,
        )
        self.task_manager = TaskManager(timeout=10.0, redis=redis_client)
        self.tool_registry.task_manager = self.task_manager
        self.persona_pref_manager = None

        # Event bus + tools-stream consumer.
        self.event_bus = RedisEventBus(
            redis=self._raw,  # decode_responses=False for binary msgpack
            node_role="tools",
            node_id=self.instance_id,
        )
        await self.event_bus.ensure_streams()

        # Publish the catalog so inference can present tools without importing them.
        _seq, self._schema_hash = await publish_catalog(self._raw, self.tool_registry)

        self.consumer = ToolExecConsumer(
            redis=self._raw,
            consumer_name=self.instance_id,
            process_fn=self._handle_tool_exec,
        )
        await self.consumer.start()
        self._config_listener_task = asyncio.create_task(
            self._listen_config_updates(), name="tools_config_listener"
        )
        logger.info("ToolsService ready (schema_hash=%s)", self._schema_hash[:12])

    # ── request handling ──────────────────────────────────────────

    def _build_ctx(self, ctx_payload: dict[str, Any], identity: dict[str, Any]) -> Any:
        """Build the ToolContext for one request.

        Args:
            ctx_payload: The request envelope's ``ctx`` dict (untrusted).
            identity: The resolved identity (session record, or the envelope
                fallback when session records are not required).

        Returns:
            A ``ToolContext`` wired with this service's shared managers and a
            :class:`ProxyPlatformAdapter` for the identity's platform.
        """
        from tool_context import ToolContext

        # SECURITY: identity-sensitive fields (user_id, guild_id, channel_id,
        # platform) come from the trusted *identity* (the authenticated session
        # record written by inference, keyed by trace_id) — NOT from the request
        # envelope, which is untrusted. Non-identity fields (user_name display,
        # message_id, skill catalog, room_context) come from the envelope.
        # NOTE(review): platform and channel_id still fall back to the envelope
        # when the identity leaves them empty (user_id/guild_id never do) —
        # confirm that fallback is intended.
        platform = identity.get("platform") or ctx_payload.get("platform") or "discord"
        trace_id = (
            ctx_payload.get("trace_id")
            or ctx_payload.get("observability_request_id")
            or ""
        )
        adapter = ProxyPlatformAdapter(self.event_bus, platform, trace_id=trace_id)
        return ToolContext(
            platform=platform,
            channel_id=identity.get("channel_id", "") or ctx_payload.get("channel_id", ""),
            user_id=identity.get("user_id", ""),
            user_name=ctx_payload.get("user_name", ""),
            guild_id=identity.get("guild_id", ""),
            message_id=ctx_payload.get("message_id", ""),
            observability_request_id=ctx_payload.get("observability_request_id", ""),
            disclosed_skill_ids=list(ctx_payload.get("disclosed_skill_ids") or []),
            room_context=ctx_payload.get("room_context"),
            adapter=adapter,
            config=self.cfg,
            redis=self.message_cache.redis_client,
            message_cache=self.message_cache,
            kg_manager=self.kg_manager,
            conversation_manager=self.conversation_mgr,
            threadweave=self.threadweave,
            task_manager=self.task_manager,
            tool_registry=self.tool_registry,
            openrouter=self.openrouter,
            persona_pref_manager=self.persona_pref_manager,
            adapters_by_name={platform: adapter},
        )

    async def _release_claim(self, claim_key: str) -> None:
        """Best-effort delete of the idempotency claim for a request that did not run.

        A held claim with no cached result makes a retry with the same idem key
        return without any reply until the claim expires. Releasing it lets such
        a retry be evaluated again.
        """
        try:
            await self._raw.delete(claim_key)
        except Exception:
            logger.debug("failed to release tool claim %s", claim_key, exc_info=True)

    async def _resolve_session(
        self, name: str, trace_id: str, ctx_payload: dict[str, Any]
    ) -> dict[str, Any] | None:
        """Return the identity for a request, or ``None`` to reject it.

        Reads the authenticated session record for *trace_id*. If it is missing,
        returns ``None`` (fail closed) unless ``cfg.tools_require_session_record``
        is set to a falsy value. In that case the identity is taken from the
        untrusted envelope and a warning is logged.
        """
        from core.tool_session import read_session

        session = await read_session(self._raw, trace_id)
        if session is not None:
            return session
        if getattr(self.cfg, "tools_require_session_record", True):
            logger.warning(
                "Rejecting tool %r: no authenticated session (trace_id=%s)", name, trace_id
            )
            return None
        logger.warning(
            "No session record for trace_id=%s; falling back to envelope identity", trace_id
        )
        return {
            "user_id": ctx_payload.get("user_id", ""),
            "guild_id": ctx_payload.get("guild_id", ""),
            "channel_id": ctx_payload.get("channel_id", ""),
            "platform": ctx_payload.get("platform", ""),
        }

    async def _handle_tool_exec(self, payload: dict[str, Any]) -> None:
        """Execute one tool request and reply on the requester's reply stream.

        Steps:

        1. Take the idempotency claim; duplicates get the cached reply re-sent.
        2. Apply the catalog-staleness guard.
        3. Resolve identity from the session record.
        4. Audit-log the call.
        5. Run the tool and classify the outcome.
        6. Cache the reply, then publish it.

        The claim is released on every path that returns before the tool runs,
        so a retry with the same idem key is not swallowed as a duplicate.
        """
        correlation_id = payload.get("correlation_id")
        reply_to = payload.get("reply_to")
        idem = payload.get("idem_key") or correlation_id or uuid.uuid4().hex
        claim_key = f"{_CLAIM_PREFIX}{idem}"
        result_key = f"{_RESULT_PREFIX}{idem}"

        # Idempotency: dedup autoclaim re-drives / retries. The claim (300 s)
        # outlives the cached result (120 s); a duplicate arriving in between
        # finds no cached result and gets no reply.
        acquired = await self._raw.set(claim_key, b"1", nx=True, ex=300)
        if not acquired:
            cached = await self._raw.get(result_key)
            if cached is not None and reply_to:
                try:
                    cached_reply = msgpack.unpackb(cached, raw=False)
                except Exception:
                    logger.warning(
                        "undecodable cached tool result for %s; not re-replying",
                        idem,
                        exc_info=True,
                    )
                else:
                    # Via _reply so a publish failure is logged, not raised.
                    await self._reply(reply_to, cached_reply)
            return  # already done (re-replied) or still in flight elsewhere

        # Catalog staleness guard. Nothing ran, so free the claim for a retry
        # issued after the caller refreshes its catalog.
        req_hash = payload.get("schema_hash")
        if req_hash and self._schema_hash and req_hash != self._schema_hash:
            await self._release_claim(claim_key)
            await self._reply(
                reply_to, {"correlation_id": correlation_id, "status": "CATALOG_STALE"}
            )
            return

        name = payload.get("tool_name", "")
        args = payload.get("tool_args") or {}
        ctx_payload = payload.get("ctx") or {}
        trace_id = ctx_payload.get("trace_id") or ""

        # Resolve identity from the authenticated session record (NOT the
        # envelope). Fail closed when it is missing unless explicitly relaxed.
        try:
            session = await self._resolve_session(name, trace_id, ctx_payload)
            if session is None:
                await self._release_claim(claim_key)
                await self._reply(
                    reply_to,
                    {
                        "correlation_id": correlation_id,
                        "error": "Tool request rejected: no authenticated session.",
                    },
                )
                return
            ctx = self._build_ctx(ctx_payload, session)
        except Exception:
            # The tool has not run: release the claim so a redelivery can retry.
            await self._release_claim(claim_key)
            raise

        user_id = session.get("user_id", "")  # authoritative
        channel_id = session.get("channel_id", "") or ctx_payload.get("channel_id", "")

        # Audit log: every delegated execution, with arguments for privileged
        # (UNSANDBOXED_EXEC) tools so there is a trail of what dangerous ops ran.
        if name in TOOL_NAMES_REQUIRING_UNSANDBOXED_EXEC:
            logger.warning(
                "tool_exec[privileged] name=%s user=%s channel=%s platform=%s args=%s",
                name,
                user_id,
                channel_id,
                session.get("platform", ""),
                _format_audit_args(args),
            )
        else:
            logger.info("tool_exec name=%s user=%s channel=%s", name, user_id, channel_id)

        started = time.monotonic()
        outcome = "ok"
        detail = ""
        try:
            result = await self.tool_registry.call(
                name, args, user_id=user_id, ctx=ctx, nested=False
            )
            # A clean return is NOT proof of execution: privilege denials and
            # internal failures come back as result data, not exceptions.
            outcome, detail = _classify_tool_outcome(result)
            reply = {
                "correlation_id": correlation_id,
                "result": str(result),
                "writeback": {
                    "sent_files": list(ctx.sent_files),
                    "injected_tools": list(ctx.injected_tools or []),
                    "sent_rich_messages": list(ctx.sent_rich_messages),
                },
            }
        except Exception as exc:
            outcome = "exception"
            detail = type(exc).__name__
            logger.exception("Tool %r failed on tools service", name)
            reply = {
                "correlation_id": correlation_id,
                "error": f"Tool '{name}' raised {type(exc).__name__} on the tools service.",
            }

        # Non-ok outcomes (denied/failed/exception) escalate to WARNING so the
        # audit trail surfaces refusals of privileged tools, which return a
        # denial string and would otherwise read as a successful call.
        logger.log(
            logging.INFO if outcome == "ok" else logging.WARNING,
            "tool_exec done name=%s user=%s outcome=%s%s %.0fms",
            name,
            user_id,
            outcome,
            f" detail={detail!r}" if detail else "",
            (time.monotonic() - started) * 1000,
        )

        # Cache the reply for idempotent re-delivery, then send it.
        try:
            await self._raw.set(result_key, msgpack.packb(reply, use_bin_type=True), ex=120)
        except Exception:
            logger.debug("failed to cache tool result", exc_info=True)
        await self._reply(reply_to, reply)

    async def _reply(self, reply_to: str | None, reply: dict[str, Any]) -> None:
        """Publish *reply* to *reply_to* if given; publish errors are logged, not raised."""
        if reply_to:
            try:
                await self.event_bus.publish_tools_reply(reply_to, reply)
            except Exception:
                logger.warning("failed to publish tool reply", exc_info=True)

    async def _listen_config_updates(self) -> None:
        """Reload tool permissions and re-publish the catalog on config changes.

        Subscribes to ``sys:config:updated``. Each message:

        * reloads :class:`Config`;
        * re-applies ``tool_permissions`` to the registry;
        * re-publishes the catalog, which refreshes the hash used by the
          staleness guard.

        Runs until cancelled. Failures are logged rather than left silent
        (subscribe) or left as an unretrieved task exception (listen).
        """
        try:
            pubsub = self._raw.pubsub()
        except Exception:
            logger.warning("config listener unavailable; hot reload disabled", exc_info=True)
            return
        try:
            await pubsub.subscribe("sys:config:updated")
            async for msg in pubsub.listen():
                if msg.get("type") != "message":
                    continue
                try:
                    self.cfg = Config.load()
                    self.tool_registry.set_permissions(self.cfg.tool_permissions or {})
                    _seq, self._schema_hash = await publish_catalog(
                        self._raw, self.tool_registry
                    )
                    logger.info("Reloaded config + re-published catalog")
                except Exception:
                    logger.warning("config reload failed", exc_info=True)
        except asyncio.CancelledError:
            pass
        except Exception:
            logger.warning("config listener stopped; hot reload disabled", exc_info=True)
        finally:
            try:
                await pubsub.aclose()
            except Exception:
                pass

    # ── lifecycle ─────────────────────────────────────────────────

    async def run(self) -> None:
        """Block until :meth:`on_stop` has finished tearing the service down."""
        await self._stop_event.wait()

    async def on_stop(self) -> None:
        """Cancel the config listener and stop the consumer, then release :meth:`run`."""
        logger.info("Shutting down ToolsService...")
        try:
            task = self._config_listener_task
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
            if self.consumer:
                await self.consumer.stop()
        finally:
            # Set last (and always): run() returning lets main() close the
            # Redis client, which should not happen while the consumer may
            # still be stopping.
            self._stop_event.set()
async def main() -> None:
    """Run a :class:`ToolsService` until SIGINT/SIGTERM, then close its Redis client."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    cfg = Config.load()
    instance_id = f"tools-{uuid.uuid4().hex[:8]}"
    redis_client = cfg.build_async_redis_client(decode_responses=False)
    service = ToolsService(cfg, redis_client, instance_id)

    # The event loop keeps only weak references to tasks, so hold the shutdown
    # task here (otherwise it may be garbage-collected mid-run). Repeated
    # signals reuse the shutdown already in flight.
    shutdown_tasks: list[asyncio.Task] = []

    def _request_shutdown() -> None:
        if not shutdown_tasks:
            shutdown_tasks.append(asyncio.create_task(service.shutdown()))

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _request_shutdown)
    try:
        await service.boot()
        await service.run()
    finally:
        # Let an in-progress shutdown finish before closing the client it uses.
        for task in shutdown_tasks:
            try:
                await task
            except Exception:
                logger.exception("ToolsService shutdown failed")
        if redis_client:
            await redis_client.aclose()


if __name__ == "__main__":
    asyncio.run(main())