# Source code for core.tool_catalog

"""Redis-backed tool catalog: the bridge that lets the inference tier present
tools to the LLM without importing any tool handler module.

The dedicated ``tools`` service owns ``tools/`` and is the *only* process that
imports tool handlers. On boot (and on every ``reload_tools``) it serializes its
``ToolRegistry`` into a **catalog** — each tool's name / description / JSON
``parameters`` / behaviour flags plus the permission map — and publishes it to
Redis. The inference tier's ``RemoteToolRegistry`` loads that catalog and
reproduces the registry's read surface (OpenAI schemas, ``tool_names``,
``repeat_allowed_tools``, ``is_allowed`` …) byte-for-byte, so prompt rendering,
the vector classifier, and the function-calling schema all keep working while
execution is delegated.

Storage (all keys binary-safe; services use a ``decode_responses=False`` client):

* ``sg:tools:catalog:seq``        — monotonic ``INCR`` counter for blob versions.
* ``sg:tools:catalog:v{seq}``     — msgpack blob ``{tools, permissions, schema_hash}``.
* ``sg:tools:catalog:current``    — the active ``seq`` (atomic pointer swap).
* ``sg:pubsub:tools:catalog``     — JSON ``{version, schema_hash}`` published on change.

``schema_hash`` is a content hash of the tool schemas (independent of ``seq``,
which advances on every restart). Inference stamps its loaded ``schema_hash``
into each delegated call; the tools service rejects a mismatch with
``CATALOG_STALE`` so a deploy that changed a schema cannot silently run the model
against the wrong parameters.
"""

from __future__ import annotations

import hashlib
import json
import logging
from typing import Any, Optional, TYPE_CHECKING

import msgpack

if TYPE_CHECKING:  # pragma: no cover - typing only, avoids importing tools/
    from tools import ToolRegistry

# Named logger for catalog publish / prune diagnostics (used by publish_catalog).
logger = logging.getLogger("stargazer.tool_catalog")

# Redis key layout for the tool catalog (see the module docstring for the full
# storage contract).
# Counter that publish_catalog INCRs to mint each new catalog version number.
CATALOG_SEQ_KEY = "sg:tools:catalog:seq"
# Prefix of the versioned msgpack blobs; the full key is f"{prefix}{seq}".
CATALOG_BLOB_PREFIX = "sg:tools:catalog:v"
# Holds the seq of the active blob; publish_catalog writes it after the blob.
CATALOG_CURRENT_KEY = "sg:tools:catalog:current"
# Pub/sub channel that receives a JSON {"version", "schema_hash"} notice on publish.
CATALOG_PUBSUB_CHANNEL = "sg:pubsub:tools:catalog"


def _as_int(raw: Any) -> Optional[int]:
    if raw is None:
        return None
    if isinstance(raw, bytes):
        raw = raw.decode()
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def schema_hash(tools: list[dict[str, Any]]) -> str:
    """Return a deterministic SHA-256 hex digest of the catalog's tool schemas.

    Only the presentation-relevant fields are hashed: name, description,
    parameters, and the ``no_background`` / ``allow_repeat`` flags (coerced to
    ``bool``). Entries are ordered by name before hashing, and the JSON encoding
    uses sorted keys. Two registries with the same tool definitions therefore
    hash identically regardless of load order or process restarts. The
    tools service uses this to detect stale callers.
    """

    def _canonical(entry: dict[str, Any]) -> dict[str, Any]:
        # Missing optional fields fall back to the same defaults the catalog uses.
        return {
            "name": entry["name"],
            "description": entry.get("description", ""),
            "parameters": entry.get("parameters", {}),
            "no_background": bool(entry.get("no_background", False)),
            "allow_repeat": bool(entry.get("allow_repeat", False)),
        }

    ordered = sorted(tools, key=lambda entry: entry["name"])
    normalized = [_canonical(entry) for entry in ordered]
    encoded = json.dumps(normalized, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256(encoded.encode("utf-8"))
    return digest.hexdigest()


def build_catalog_payload(registry: "ToolRegistry") -> dict[str, Any]:
    """Snapshot a live ``ToolRegistry`` as a plain, serializable catalog dict.

    Performs no I/O. The result holds everything the inference tier needs to
    present tools without importing handler code:

    * ``tools`` - one entry per registered tool with its name, description,
      JSON ``parameters`` and the ``no_background`` / ``allow_repeat`` flags
      coerced to ``bool``;
    * ``permissions`` - the registry's private ``_permissions`` map (empty if
      the attribute is absent), with each value copied into a ``list``;
    * ``schema_hash`` - the content hash of ``tools`` (see ``schema_hash``).
    """

    def _describe(td: Any) -> dict[str, Any]:
        # Key order is kept stable so the packed msgpack bytes stay identical.
        return {
            "name": td.name,
            "description": td.description,
            "parameters": td.parameters,
            "no_background": bool(td.no_background),
            "allow_repeat": bool(td.allow_repeat),
        }

    tool_entries = [_describe(td) for td in registry.list_tools()]
    raw_permissions = getattr(registry, "_permissions", {})
    permission_map = {name: list(allowed) for name, allowed in raw_permissions.items()}
    catalog: dict[str, Any] = {}
    catalog["tools"] = tool_entries
    catalog["permissions"] = permission_map
    catalog["schema_hash"] = schema_hash(tool_entries)
    return catalog


def openai_tool_dict(entry: dict[str, Any]) -> dict[str, Any]:
    """Convert one catalog entry into an OpenAI function-calling tool dict.

    The shape is intended to match ``ToolRegistry._ensure_cache_unlocked`` exactly,
    so the model receives the same schema whether tools come from the live
    registry or from the Redis catalog. A missing description defaults to ``""``
    and missing parameters default to ``{}``. Other entry keys, such as the
    behaviour flags, are not included.
    """
    function_spec = {
        "name": entry["name"],
        "description": entry.get("description", ""),
        "parameters": entry.get("parameters", {}),
    }
    return {"type": "function", "function": function_spec}


async def publish_catalog(
    redis: Any,
    registry: "ToolRegistry",
    *,
    prune_keep: int = 3,
) -> tuple[int, str]:
    """Publish *registry*'s catalog to Redis under a fresh version and announce it.

    Order of operations:

    1. serialize the registry and msgpack it;
    2. ``INCR`` the sequence counter to obtain a new version ``seq``;
    3. write the blob under ``{CATALOG_BLOB_PREFIX}{seq}``;
    4. point ``CATALOG_CURRENT_KEY`` at ``seq``. This happens only after the
       blob exists, so readers never follow a dangling pointer;
    5. publish a ``{version, schema_hash}`` JSON notice on the pubsub channel.
       This step is best-effort and failures are logged at debug level;
    6. best-effort delete of older blob versions.

    Args:
        redis: async Redis client exposing ``incr`` / ``set`` / ``publish`` /
            ``delete``.
        registry: the live tool registry to snapshot.
        prune_keep: how many of the most recent blob versions to retain,
            including the one just published. Values below 1 are treated as 1,
            so the active blob is never pruned.

    Returns:
        ``(seq, schema_hash)`` of the published catalog.
    """
    payload = build_catalog_payload(registry)
    blob = msgpack.packb(payload, use_bin_type=True)
    seq = int(await redis.incr(CATALOG_SEQ_KEY))
    await redis.set(f"{CATALOG_BLOB_PREFIX}{seq}", blob)
    await redis.set(CATALOG_CURRENT_KEY, str(seq).encode())
    try:
        await redis.publish(
            CATALOG_PUBSUB_CHANNEL,
            json.dumps({"version": seq, "schema_hash": payload["schema_hash"]}),
        )
    except Exception:
        logger.debug("catalog pubsub publish failed", exc_info=True)

    # Best-effort prune of superseded blobs to avoid unbounded growth. Clamp so
    # a non-positive prune_keep can never delete the blob `current` now names.
    keep = max(1, prune_keep)
    for old in range(seq - keep, 0, -1):
        try:
            deleted = await redis.delete(f"{CATALOG_BLOB_PREFIX}{old}")
        except Exception:
            logger.debug("catalog blob prune failed", exc_info=True)
            break
        if not deleted:
            break  # earlier publishes already pruned everything older

    logger.info(
        "Published tool catalog",
        extra={
            "version": seq,
            "tool_count": len(payload["tools"]),
            "schema_hash": payload["schema_hash"][:12],
        },
    )
    return seq, payload["schema_hash"]


async def load_catalog(redis: Any) -> Optional[dict[str, Any]]:
    """Fetch the active tool catalog from Redis.

    Reads ``CATALOG_CURRENT_KEY`` to find the active version. It then loads and
    msgpack-decodes the blob stored under that version and adds a ``version``
    key holding the version number.

    Returns:
        The decoded ``{tools, permissions, schema_hash, version}`` dict. Returns
        ``None`` when no valid version pointer is set or when the blob it points
        at is missing.
    """
    current = _as_int(await redis.get(CATALOG_CURRENT_KEY))
    if current is not None:
        raw_blob = await redis.get(f"{CATALOG_BLOB_PREFIX}{current}")
        if raw_blob is not None:
            decoded = msgpack.unpackb(raw_blob, raw=False)
            decoded["version"] = current
            return decoded
    return None