# Source code for tools_main
"""Tools service entry point — dedicated, isolatable tool-execution worker.
Runs :class:`ToolsService`, a :class:`~core.service_base.StargazerService` whose
sole job is to execute tools on behalf of the inference tier. It is the **only**
process that imports the tool handler modules in ``tools/``: on boot it loads the
full :class:`~tools.ToolRegistry`, publishes the tool **catalog** to Redis (so
inference can present tools without importing them), and consumes tool-execution
requests off ``sg:stream:tools`` (group ``sg:tools``, load-balanced across
instances). Each request reconstructs a :class:`~tool_context.ToolContext` from
the request's identity fields plus this service's shared managers and a
:class:`~core.proxy_adapter.ProxyPlatformAdapter` (so tool sends still publish to
the gateway and raw-SDK ops still delegate to it), runs the tool, and replies on
the originating worker's reply stream with the result plus the write-back
channels (``sent_files`` media bytes, ``injected_tools``, ``sent_rich_messages``).
Deliberately lighter than inference: no message processor, no swarm/subagent
system, no presence manager, no inbound consumer. Compound tools that spawn
nested LLM loops stay pinned to inference (``INFERENCE_PINNED_TOOLS``); the
``OpenRouterClient`` wired here exists only for the embedding/KG managers some
delegated tools use. Launched standalone (``python tools_main.py``) by
``scripts/systemd/stargazer-tools.service``.
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import signal
import time
import uuid
from typing import Any
import msgpack
from config import Config
from core.event_bus import RedisEventBus
from core.proxy_adapter import ProxyPlatformAdapter
from core.service_base import StargazerService
from core.tool_catalog import publish_catalog
from core.tools_consumer import ToolExecConsumer
# Authoritative privileged/dangerous-tool name set (frozenset of strings, no
# handler imports) — every delegated call to one of these is audit-logged with
# its arguments. See tools/unsandboxed_exec_tool_names.py and the
# unsandboxed-exec-surface component spec.
from tools.unsandboxed_exec_tool_names import TOOL_NAMES_REQUIRING_UNSANDBOXED_EXEC
# Module-wide logger for the tools service.
logger = logging.getLogger("tools")

# Redis key prefixes for the idempotency guard: the claim key marks a request
# as taken, the result key holds the cached reply for re-delivery.
_CLAIM_PREFIX = "sg:tools:claim:"
_RESULT_PREFIX = "sg:tools:result:"

# Audit-log redaction: an argument whose KEY matches this pattern has its
# value replaced before the arguments are logged.
_SENSITIVE_ARG_RE = re.compile(
    r"(pass|secret|token|api[_-]?key|auth|credential|private[_-]?key|cookie)",
    re.IGNORECASE,
)

# Maximum number of characters of serialized arguments kept in the audit log.
_AUDIT_ARG_LIMIT = 2000

# Tools report a permission denial as *returned* data (e.g.
# ``{"success": false, "error": "...does not have the UNSANDBOXED_EXEC
# privilege..."}``) rather than by raising, so "returned cleanly" does not
# mean "executed". This pattern recognises the canonical refusal strings
# emitted by the privilege gates (winrm_tools.check_unsandboxed_exec et al.).
_DENIAL_RE = re.compile(
    r"does not have the .*privilege|access denied|permission denied|"
    r"not authori[sz]ed|unauthori[sz]ed",
    re.IGNORECASE,
)
def _classify_tool_outcome(result: Any) -> tuple[str, str]:
    """Map a tool's RETURN value onto ``ok`` | ``denied`` | ``failed``.

    Tools report privilege refusals and internal errors inside their result
    rather than by raising, so a clean return must not be read as success.

    Args:
        result: Whatever the tool returned. A string that looks like a JSON
            object/array is parsed first; a dict is inspected directly.

    Returns:
        ``(outcome, detail)``. *detail* is the first 200 characters of the
        error text for non-ok outcomes and ``""`` when the outcome is ``ok``.
        Raised exceptions are the caller's responsibility.
    """
    parsed: Any = result
    if isinstance(result, str):
        stripped = result.lstrip()
        # Only try JSON when the text starts like an object or an array.
        if stripped[:1] in "{[":
            try:
                parsed = json.loads(stripped)
            except Exception:
                parsed = result

    if not isinstance(parsed, dict):
        # Plain text (or non-dict JSON): scan the raw string for a refusal.
        if isinstance(result, str) and _DENIAL_RE.search(result):
            return "denied", result[:200]
        return "ok", ""

    flag = parsed.get("success")
    message = str(parsed.get("error") or parsed.get("message") or "")
    refused = _DENIAL_RE.search(message) is not None

    if flag is False:
        return ("denied" if refused else "failed"), message[:200]
    # No explicit success flag: only a recognisable refusal counts as non-ok.
    if flag is None and refused:
        return "denied", message[:200]
    return "ok", ""
def _format_audit_args(args: Any) -> str:
    """Serialize tool arguments for the audit log, redacted and truncated.

    Values stored under secret-shaped keys (keys matching
    ``_SENSITIVE_ARG_RE``) are replaced with ``"<redacted>"`` at every
    nesting level, so a secret tucked inside a nested dict or a list of dicts
    does not reach the log. The function is meant to be given the arguments
    as received, before any ``secret:`` reference is resolved, so references
    are logged as references rather than as secret values.

    Args:
        args: The tool arguments. A dict is redacted and JSON-encoded; any
            other value is rendered with ``str()``.

    Returns:
        The serialized arguments, cut to ``_AUDIT_ARG_LIMIT`` characters with
        a ``…[+N chars]`` suffix when longer.
    """
    # Bound the walk so pathological nesting cannot raise out of the audit
    # logger and take the request handler down with it.
    max_depth = 32

    def _redact(value: Any, depth: int) -> Any:
        if depth > max_depth:
            return "<nested>"
        if isinstance(value, dict):
            return {
                key: (
                    "<redacted>"
                    if _SENSITIVE_ARG_RE.search(str(key))
                    else _redact(item, depth + 1)
                )
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [_redact(item, depth + 1) for item in value]
        return value

    if isinstance(args, dict):
        redacted = _redact(args, 0)
        try:
            text = json.dumps(redacted, default=str, ensure_ascii=False)
        except Exception:
            # Best effort: an unserializable payload must not block the audit line.
            text = str(redacted)
    else:
        text = str(args)

    overflow = len(text) - _AUDIT_ARG_LIMIT
    if overflow > 0:
        text = text[:_AUDIT_ARG_LIMIT] + f"…[+{overflow} chars]"
    return text
class ToolsService(StargazerService):
    """Dedicated tool-execution service (service tier ``"tools"``).

    ``on_start`` loads the tool registry, builds the shared managers, publishes
    the tool catalog and starts a :class:`ToolExecConsumer` whose callback is
    :meth:`_handle_tool_exec`. Each request is deduplicated through a Redis
    claim key, checked against the published catalog hash, bound to an
    authenticated session record, executed, and answered on the requester's
    reply stream.
    """

    def __init__(
        self,
        config: Config,
        redis_client: Any,
        instance_id: str,
        use_health_server: bool = True,
    ) -> None:
        """Store the configuration and declare the dependencies ``on_start`` builds.

        Args:
            config: Loaded service configuration.
            redis_client: Redis client handed to the service base class.
            instance_id: Unique name of this instance; also used as the
                stream consumer name and event-bus node id.
            use_health_server: Forwarded to the service base class.
        """
        super().__init__(
            service_name="tools",
            instance_id=instance_id,
            redis_client=redis_client,
            redis_required=True,
            use_health_server=use_health_server,
        )
        self.cfg = config
        # Everything below stays None/empty until on_start() populates it.
        self.tool_registry: Any = None
        self.event_bus: RedisEventBus | None = None
        self.consumer: ToolExecConsumer | None = None
        self.openrouter: Any = None
        self.message_cache: Any = None
        self.conversation_mgr: Any = None
        self.kg_manager: Any = None
        self.threadweave: Any = None
        self.task_manager: Any = None
        self.persona_pref_manager: Any = None
        # Raw Redis client taken from the message cache (binary msgpack payloads).
        self._raw: Any = None
        # Hash of the catalog last published by this instance.
        self._schema_hash: str = ""
        self._stop_event = asyncio.Event()
        self._config_listener_task: asyncio.Task | None = None

    def get_adapter(self, platform_name: str) -> ProxyPlatformAdapter:
        """Return a proxy adapter so tool sends publish to the gateway's stream."""
        return ProxyPlatformAdapter(self.event_bus, platform_name)

    async def on_start(self) -> None:
        """Load tools, build the shared managers, publish the catalog, start consuming."""
        logger.info("Initializing Tools service dependencies...")

        # The one place tools load for execution.
        from tools import ToolRegistry
        from tool_loader import load_tools

        self.tool_registry = ToolRegistry()
        if self.cfg.tool_permissions:
            self.tool_registry.set_permissions(self.cfg.tool_permissions)
        # Loading runs in a worker thread so it does not block the event loop.
        await asyncio.to_thread(load_tools, self.cfg.tools_dir, self.tool_registry)
        logger.info("Loaded %d tools", len(self.tool_registry))

        # LLM client — needed by the embedding/KG/threadweave managers below, NOT
        # for running agentic compound tools (those stay pinned to inference).
        from openrouter_client import OpenRouterClient

        self.openrouter = OpenRouterClient(
            api_key=self.cfg.api_key,
            model=self.cfg.model,
            temperature=self.cfg.temperature,
            max_tokens=self.cfg.max_tokens,
            top_p=self.cfg.top_p,
            tool_registry=self.tool_registry,
            base_url=self.cfg.llm_base_url,
            gemini_api_key=self.cfg.gemini_api_key,
        )

        # Shared Redis-backed managers (subset of the inference stack).
        from message_cache import MessageCache

        if self.cfg.redis_sentinels:
            ssl_kw = self.cfg.redis_ssl_kwargs()
        elif self.cfg.redis_url:
            ssl_kw = self.cfg.redis_connection_kwargs_for_url(self.cfg.redis_url)
        else:
            ssl_kw = {}
        self.message_cache = MessageCache(
            redis_url=self.cfg.redis_url,
            openrouter_client=self.openrouter,
            embedding_model=self.cfg.embedding_model,
            ssl_kwargs=ssl_kw,
            redis_sentinels=self.cfg.redis_sentinels,
            redis_sentinel_master=self.cfg.redis_sentinel_master,
            resilience_kwargs=self.cfg.redis_resilience_kwargs(),
        )
        redis_client = self.message_cache.redis_client
        self._raw = self.message_cache.redis_raw_client

        from observability import set_observability_redis

        set_observability_redis(redis_client)

        from conversation import ConversationManager
        from knowledge_graph import KnowledgeGraphManager
        from prompt_renderer import PromptRenderer
        from threadweave import ThreadweaveManager
        from task_manager import TaskManager

        renderer = PromptRenderer(self.cfg.system_prompt_file)
        self.conversation_mgr = ConversationManager(
            prompt_renderer=renderer,
            max_history=self.cfg.max_history,
            redis=redis_client,
        )
        self.kg_manager = KnowledgeGraphManager(
            redis_client=redis_client,
            openrouter=self.openrouter,
            embedding_model=self.cfg.embedding_model,
        )
        self.threadweave = ThreadweaveManager(
            redis_client=redis_client,
            openrouter=self.openrouter,
            embedding_model=self.cfg.embedding_model,
            dna_vault_path=self.cfg.dna_vault_path,
        )
        self.task_manager = TaskManager(timeout=10.0, redis=redis_client)
        self.tool_registry.task_manager = self.task_manager
        # No persona-preference manager is built on this tier.
        self.persona_pref_manager = None

        # Event bus + tools-stream consumer.
        self.event_bus = RedisEventBus(
            redis=self._raw,  # decode_responses=False for binary msgpack
            node_role="tools",
            node_id=self.instance_id,
        )
        await self.event_bus.ensure_streams()

        # Publish the catalog so inference can present tools without importing them.
        _seq, self._schema_hash = await publish_catalog(self._raw, self.tool_registry)

        self.consumer = ToolExecConsumer(
            redis=self._raw,
            consumer_name=self.instance_id,
            process_fn=self._handle_tool_exec,
        )
        await self.consumer.start()
        self._config_listener_task = asyncio.create_task(
            self._listen_config_updates(), name="tools_config_listener"
        )
        logger.info("ToolsService ready (schema_hash=%s)", self._schema_hash[:12])

    # ── request handling ──────────────────────────────────────────
    def _build_ctx(self, ctx_payload: dict[str, Any], identity: dict[str, Any]) -> Any:
        """Build the ``ToolContext`` for one request.

        Args:
            ctx_payload: The ``ctx`` section of the request envelope (untrusted).
            identity: The identity record the request is bound to.

        Returns:
            A ``ToolContext`` wired to this service's shared managers and to a
            proxy adapter for the resolved platform.
        """
        from tool_context import ToolContext

        # SECURITY: identity-sensitive fields (user_id, guild_id, channel_id,
        # platform) come from the trusted *identity* (the authenticated session
        # record written by inference, keyed by trace_id) — NOT from the request
        # envelope, which is untrusted. Non-identity fields (user_name display,
        # message_id, skill catalog, room_context) come from the envelope.
        # NOTE(review): platform and channel_id still fall back to the envelope
        # when the identity record leaves them empty — confirm that fallback is
        # intended, given the envelope is untrusted.
        platform = identity.get("platform") or ctx_payload.get("platform") or "discord"
        trace_id = ctx_payload.get("trace_id") or ctx_payload.get("observability_request_id") or ""
        adapter = ProxyPlatformAdapter(self.event_bus, platform, trace_id=trace_id)
        return ToolContext(
            platform=platform,
            channel_id=identity.get("channel_id", "") or ctx_payload.get("channel_id", ""),
            user_id=identity.get("user_id", ""),
            user_name=ctx_payload.get("user_name", ""),
            guild_id=identity.get("guild_id", ""),
            message_id=ctx_payload.get("message_id", ""),
            observability_request_id=ctx_payload.get("observability_request_id", ""),
            disclosed_skill_ids=list(ctx_payload.get("disclosed_skill_ids") or []),
            room_context=ctx_payload.get("room_context"),
            adapter=adapter,
            config=self.cfg,
            redis=self.message_cache.redis_client,
            message_cache=self.message_cache,
            kg_manager=self.kg_manager,
            conversation_manager=self.conversation_mgr,
            threadweave=self.threadweave,
            task_manager=self.task_manager,
            tool_registry=self.tool_registry,
            openrouter=self.openrouter,
            persona_pref_manager=self.persona_pref_manager,
            adapters_by_name={platform: adapter},
        )

    async def _handle_tool_exec(self, payload: dict[str, Any]) -> None:
        """Process one tool-execution request delivered by the consumer.

        Steps: claim the request (idempotency), reject it if the requester's
        catalog hash is stale, bind it to the authenticated session record,
        audit-log it, run the tool, cache the reply and publish it.

        A request that is rejected *before* the tool runs gives its claim back,
        so a retry carrying the same idempotency key is evaluated again instead
        of being dropped as a duplicate with no reply.

        Args:
            payload: The decoded request envelope.
        """
        correlation_id = payload.get("correlation_id")
        reply_to = payload.get("reply_to")
        idem = payload.get("idem_key") or correlation_id or uuid.uuid4().hex
        claim_key = f"{_CLAIM_PREFIX}{idem}"
        result_key = f"{_RESULT_PREFIX}{idem}"

        # Idempotency: dedup autoclaim re-drives / retries.
        acquired = await self._raw.set(claim_key, b"1", nx=True, ex=300)
        if not acquired:
            cached = await self._raw.get(result_key)
            if cached is not None and reply_to:
                # Same guarded publish as the normal reply path.
                await self._reply(reply_to, msgpack.unpackb(cached, raw=False))
            return  # already done (re-replied) or still in flight elsewhere

        # Catalog staleness guard.
        req_hash = payload.get("schema_hash")
        if req_hash and self._schema_hash and req_hash != self._schema_hash:
            # Nothing ran: release the claim before answering so the retry
            # that follows a catalog refresh is not swallowed.
            await self._release_claim(claim_key)
            await self._reply(reply_to, {"correlation_id": correlation_id, "status": "CATALOG_STALE"})
            return

        name = payload.get("tool_name", "")
        args = payload.get("tool_args") or {}
        ctx_payload = payload.get("ctx") or {}
        trace_id = ctx_payload.get("trace_id") or ""

        # Resolve identity from the authenticated session record (NOT the
        # envelope). Fail closed when it is missing unless explicitly relaxed.
        from core.tool_session import read_session

        session = await read_session(self._raw, trace_id)
        if session is None:
            if getattr(self.cfg, "tools_require_session_record", True):
                logger.warning("Rejecting tool %r: no authenticated session (trace_id=%s)", name, trace_id)
                # Nothing ran: release the claim so a later retry is re-checked.
                await self._release_claim(claim_key)
                await self._reply(
                    reply_to,
                    {"correlation_id": correlation_id, "error": "Tool request rejected: no authenticated session."},
                )
                return
            logger.warning("No session record for trace_id=%s; falling back to envelope identity", trace_id)
            session = {
                "user_id": ctx_payload.get("user_id", ""),
                "guild_id": ctx_payload.get("guild_id", ""),
                "channel_id": ctx_payload.get("channel_id", ""),
                "platform": ctx_payload.get("platform", ""),
            }

        user_id = session.get("user_id", "")  # authoritative
        channel_id = session.get("channel_id", "") or ctx_payload.get("channel_id", "")
        ctx = self._build_ctx(ctx_payload, session)

        # Audit log: every delegated execution, with arguments for privileged
        # (UNSANDBOXED_EXEC) tools so there is a trail of what dangerous ops ran.
        if name in TOOL_NAMES_REQUIRING_UNSANDBOXED_EXEC:
            logger.warning(
                "tool_exec[privileged] name=%s user=%s channel=%s platform=%s args=%s",
                name, user_id, channel_id, session.get("platform", ""),
                _format_audit_args(args),
            )
        else:
            logger.info(
                "tool_exec name=%s user=%s channel=%s", name, user_id, channel_id
            )

        reply = await self._run_tool(name, args, user_id, ctx, correlation_id)

        # Cache the reply for idempotent re-delivery, then send it.
        # NOTE(review): the cached reply (120 s) expires before the claim
        # (300 s); a re-delivery inside that gap is dropped without a reply —
        # confirm the two TTLs are intentional.
        try:
            await self._raw.set(result_key, msgpack.packb(reply, use_bin_type=True), ex=120)
        except Exception:
            logger.debug("failed to cache tool result", exc_info=True)
        await self._reply(reply_to, reply)

    async def _run_tool(
        self,
        name: str,
        args: Any,
        user_id: Any,
        ctx: Any,
        correlation_id: Any,
    ) -> dict[str, Any]:
        """Run one tool, log its outcome, and return the reply to send back.

        A raised exception is converted into an ``error`` reply; a clean return
        is classified with ``_classify_tool_outcome`` because denials and
        failures come back as result data.
        """
        started = time.monotonic()
        outcome = "ok"
        detail = ""
        try:
            result = await self.tool_registry.call(name, args, user_id=user_id, ctx=ctx, nested=False)
            # A clean return is NOT proof of execution: privilege denials and
            # internal failures come back as result data, not exceptions.
            outcome, detail = _classify_tool_outcome(result)
            reply: dict[str, Any] = {
                "correlation_id": correlation_id,
                "result": str(result),
                "writeback": {
                    "sent_files": list(ctx.sent_files),
                    "injected_tools": list(ctx.injected_tools or []),
                    "sent_rich_messages": list(ctx.sent_rich_messages),
                },
            }
        except Exception as exc:
            outcome = "exception"
            detail = type(exc).__name__
            logger.exception("Tool %r failed on tools service", name)
            reply = {
                "correlation_id": correlation_id,
                "error": f"Tool '{name}' raised {type(exc).__name__} on the tools service.",
            }

        # Non-ok outcomes (denied/failed/exception) escalate to WARNING so the
        # audit trail surfaces refusals of privileged tools, which return a
        # denial string and would otherwise read as a successful call.
        logger.log(
            logging.INFO if outcome == "ok" else logging.WARNING,
            "tool_exec done name=%s user=%s outcome=%s%s %.0fms",
            name, user_id, outcome,
            f" detail={detail!r}" if detail else "",
            (time.monotonic() - started) * 1000,
        )
        return reply

    async def _release_claim(self, claim_key: str) -> None:
        """Delete an idempotency claim for a request that never ran (best effort)."""
        try:
            await self._raw.delete(claim_key)
        except Exception:
            # The claim expires on its own; failing to delete it is not fatal.
            logger.debug("failed to release tool claim", exc_info=True)

    async def _reply(self, reply_to: str | None, reply: dict[str, Any]) -> None:
        """Publish *reply* on *reply_to*; a missing target or a publish error is not fatal."""
        if reply_to:
            try:
                await self.event_bus.publish_tools_reply(reply_to, reply)
            except Exception:
                logger.warning("failed to publish tool reply", exc_info=True)

    async def _listen_config_updates(self) -> None:
        """Reload tool permissions and re-publish the catalog on config changes.

        Runs until cancelled. Failures to subscribe or to keep listening are
        logged (the service keeps running with its current configuration), and
        the pubsub object is always closed on exit.
        """
        try:
            pubsub = self._raw.pubsub()
        except Exception:
            logger.warning("config listener unavailable: cannot create pubsub", exc_info=True)
            return
        try:
            await pubsub.subscribe("sys:config:updated")
            async for msg in pubsub.listen():
                if msg.get("type") != "message":
                    continue
                try:
                    self.cfg = Config.load()
                    self.tool_registry.set_permissions(self.cfg.tool_permissions or {})
                    _seq, self._schema_hash = await publish_catalog(self._raw, self.tool_registry)
                    logger.info("Reloaded config + re-published catalog")
                except Exception:
                    logger.warning("config reload failed", exc_info=True)
        except asyncio.CancelledError:
            # Normal shutdown path: on_stop() cancels this task.
            pass
        except Exception:
            # Without this the task would die silently and config updates
            # would stop being applied with nothing in the log.
            logger.warning("config listener stopped; config updates will not be applied", exc_info=True)
        finally:
            try:
                await pubsub.aclose()
            except Exception:
                pass

    # ── lifecycle ─────────────────────────────────────────────────
    async def on_stop(self) -> None:
        """Stop the config listener, then the tools-stream consumer."""
        logger.info("Shutting down ToolsService...")
        self._stop_event.set()
        if self._config_listener_task and not self._config_listener_task.done():
            self._config_listener_task.cancel()
            try:
                await self._config_listener_task
            except asyncio.CancelledError:
                pass
        if self.consumer:
            await self.consumer.stop()
async def main() -> None:
    """Configure logging, build the service, and run it until it shuts down.

    SIGINT and SIGTERM each schedule ``service.shutdown()`` on the running
    loop. The Redis client built here is closed when the service exits,
    whether it stopped cleanly or failed.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    cfg = Config.load()
    instance_id = f"tools-{uuid.uuid4().hex[:8]}"
    redis_client = cfg.build_async_redis_client(decode_responses=False)
    service = ToolsService(cfg, redis_client, instance_id)

    # The event loop keeps only weak references to tasks, so a task created
    # from the signal handler and then dropped can be garbage-collected before
    # the shutdown finishes. Hold a strong reference until it completes.
    shutdown_tasks: set[asyncio.Task] = set()

    def _request_shutdown() -> None:
        task = asyncio.create_task(service.shutdown())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _request_shutdown)

    try:
        await service.boot()
        await service.run()
    finally:
        if redis_client:
            await redis_client.aclose()
# Script entry point: run the service on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())