# Source code for tools.redis_admin

"""Admin tool to inspect and modify Redis keys directly.

Uses the bot's live TLS-aware async Redis pool (``ctx.redis``) so it works on
the same database the rest of Stargazer talks to. Gated by the
``UNSANDBOXED_EXEC`` privilege; users in ``config.admin_user_ids`` pass
implicitly.

Without the gate this tool is a privilege-escalation primitive: any caller
could read every other user's encrypted secret blob (``stargazer:user_secrets:*``)
or grant themselves admin via ``stargazer:user_privileges:<self>``.
"""

from __future__ import annotations

import jsonutil as json
import logging
from typing import TYPE_CHECKING

from tools.alter_privileges import has_privilege, PRIVILEGES

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# Tool metadata. The name, description, and JSON-schema parameter block are
# module-level constants; presumably the tool loader reads them to register
# this module -- verify against the loader.
TOOL_NAME = "redis_admin"

# Summary of what the tool does and which privilege it requires.
TOOL_DESCRIPTION = (
    "Admin tool to get, set, delete, and scan Redis keys "
    "on the bot's live Redis instance. "
    "Operates against the configured TLS Redis pool. "
    "Requires UNSANDBOXED_EXEC privilege (admins implicitly). "
    "Without this gate, any caller could read every user's "
    "encrypted secrets or grant themselves arbitrary privileges."
)

# JSON-schema description of the arguments accepted by ``run``. Only
# ``action`` is required by the schema; ``run`` itself validates that ``key``
# and ``value`` are present for the actions that need them.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "action": {
            "type": "string",
            "enum": ["get", "set", "delete", "keys"],
            "description": (
                "'get' returns the value of a key; "
                "'set' writes value to key; "
                "'delete' removes key; "
                "'keys' returns matching keys "
                "(use a pattern like 'stargazer:user_privileges:*')."
            ),
        },
        "key": {
            "type": "string",
            "description": "Key name (or glob pattern when action='keys').",
        },
        "value": {
            "type": "string",
            "description": "Value to set (action='set' only).",
        },
    },
    "required": ["action"],
}


_KEYS_SCAN_LIMIT = 5_000
"""Maximum number of keys returned by action='keys', bounding payload size."""


async def _check_unsandboxed_exec(ctx: "ToolContext | None") -> str | None:
    """Authorize the caller for ``redis_admin``, returning an error or None.

    Gatekeeper for this privilege-escalation-sensitive tool: without it any
    caller could read every other user's encrypted secret blob or grant
    themselves admin, so :func:`run` calls this first and aborts on any non-None
    return. It pulls ``redis``, ``config``, and ``user_id`` off the context and
    consults :func:`tools.alter_privileges.has_privilege` for the
    ``UNSANDBOXED_EXEC`` privilege (which treats configured admins as implicitly
    granted). A denied attempt is logged at warning level with a ``SECURITY:``
    prefix; the only Redis touch is whatever ``has_privilege`` performs to read
    the privilege records.

    Called by :func:`run` in this module; no other callers were found (each
    privileged tool keeps its own copy of this helper).

    Args:
        ctx: Tool execution context, or None. Supplies ``redis``, ``config``,
            and ``user_id`` for the privilege check.

    Returns:
        str | None: ``None`` when the caller is authorized; otherwise a JSON
        error string explaining the missing context or denied privilege, ready
        to return straight to the model.
    """
    if ctx is None:
        return json.dumps(
            {
                "success": False,
                "error": (
                    "Authorization failed: tool context is required to verify "
                    "privileges for redis_admin."
                ),
            }
        )
    try:
        redis = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""

        if not await has_privilege(
            redis,
            user_id,
            PRIVILEGES["UNSANDBOXED_EXEC"],
            config,
        ):
            logger.warning(
                "SECURITY: User %s attempted redis_admin without "
                "UNSANDBOXED_EXEC - DENIED",
                user_id,
            )
            return json.dumps(
                {
                    "success": False,
                    "error": (
                        "The user does not have the UNSANDBOXED_EXEC privilege. "
                        "Ask an admin to grant it with the alter_privileges tool."
                    ),
                }
            )
    except ImportError:
        return json.dumps(
            {
                "success": False,
                "error": "Privilege system unavailable.",
            }
        )
    return None


def _decode(value: object) -> object:
    """Turn a raw Redis value into something JSON-friendly.

    ``bytes`` are decoded as UTF-8; when the payload is not valid UTF-8 the
    hexadecimal representation of the bytes is returned instead, so binary
    values are still visible rather than lost. Anything that is not ``bytes``
    (``str``, ``None``, numbers, ...) is returned unchanged. No I/O is
    performed.

    Args:
        value: A value read from Redis, typically ``bytes`` or ``str``.

    Returns:
        The decoded ``str`` for valid UTF-8 bytes, a hex ``str`` for
        undecodable bytes, or the original object for non-bytes input.
    """
    # Non-bytes values need no conversion.
    if not isinstance(value, bytes):
        return value

    try:
        text = value.decode("utf-8")
    except UnicodeDecodeError:
        # Binary payload: fall back to a lossless hex rendering.
        return value.hex()
    return text


def _error_json(message: str) -> str:
    """Build the standard ``{"success": false, "error": ...}`` JSON reply."""
    return json.dumps({"success": False, "error": message})


async def _scan_keys_capped(redis, pattern: str, limit: int) -> tuple[list[str], bool]:
    """Cursor-scan for keys matching ``pattern``, returning at most ``limit``.

    Args:
        redis: Async Redis client exposing ``scan(cursor=, match=, count=)``
            that returns a ``(cursor, batch)`` pair.
        pattern: Glob pattern passed to ``SCAN`` as ``match``.
        limit: Maximum number of distinct keys to return.

    Returns:
        ``(keys, truncated)``. ``keys`` holds distinct decoded key names in
        first-seen order. ``truncated`` is True only when the cap stopped the
        collection while more keys were (or may still be) available; a scan
        that completes with exactly ``limit`` keys is not truncated.
    """
    # SCAN may return the same key more than once; a dict is used as an
    # insertion-ordered set so duplicates neither inflate the count nor eat
    # into the cap.
    found: dict[str, None] = {}
    cursor: int | bytes | str = 0
    while True:
        cursor, batch = await redis.scan(cursor=cursor, match=pattern, count=500)
        for raw in batch:
            name = _decode(raw)
            if not isinstance(name, str):
                name = str(name)
            if name in found:
                continue
            if len(found) >= limit:
                # A further distinct key exists beyond the cap.
                return list(found), True
            found[name] = None
        # A zero cursor (int, bytes, or str depending on the client's
        # decoding) marks the end of the iteration.
        if cursor in (0, b"0", "0"):
            return list(found), False
        if len(found) >= limit:
            # Cap reached with the iteration unfinished: stop issuing SCAN
            # calls and report that the result may be incomplete.
            return list(found), True


async def run(
    action: str,
    key: str | None = None,
    value: str | None = None,
    ctx: "ToolContext | None" = None,
    **_kwargs,
) -> str:
    """Execute one of the four Redis admin actions against ``ctx.redis``.

    Every call is first gated through :func:`_check_unsandboxed_exec`; a
    denied or context-less call returns that helper's error unchanged. The
    function then dispatches on ``action``:

    * ``get`` -- return the key's value, decoded via :func:`_decode`.
    * ``set`` -- write ``value`` to ``key``.
    * ``delete`` -- remove ``key`` and report how many keys were removed.
    * ``keys`` -- ``SCAN`` for keys matching the glob in ``key`` (``*`` when
      omitted), returning at most ``_KEYS_SCAN_LIMIT`` distinct keys and
      flagging truncation.

    Mutating actions (``set``, ``delete``) emit an info log line tagged with
    the invoking ``user_id``. Any exception raised while talking to Redis is
    logged and returned as a JSON error rather than raised.

    Args:
        action: One of ``get``, ``set``, ``delete``, or ``keys``.
        key: Target key name, or a glob pattern when ``action`` is ``keys``.
            Required for ``get`` / ``set`` / ``delete``.
        value: Value to write; required only when ``action`` is ``set``.
        ctx: Tool execution context supplying ``redis``, ``config``, and
            ``user_id``.
        **_kwargs: Extra arguments are accepted and ignored.

    Returns:
        A JSON string with the action result, or a JSON error describing a
        failed authorization, missing Redis, missing argument, unknown
        action, or underlying Redis exception.
    """
    auth_err = await _check_unsandboxed_exec(ctx)
    if auth_err:
        return auth_err

    redis = getattr(ctx, "redis", None) if ctx is not None else None
    if redis is None:
        return _error_json("Redis is not available on this context.")

    user_id = getattr(ctx, "user_id", "") or ""

    # get / set / delete all operate on a single named key.
    if action in ("get", "set", "delete") and not key:
        return _error_json(f"'key' is required for action='{action}'.")

    try:
        if action == "get":
            raw = await redis.get(key)
            return json.dumps(
                {
                    "success": True,
                    "action": "get",
                    "key": key,
                    "value": _decode(raw),
                }
            )

        if action == "set":
            if value is None:
                return _error_json("'value' is required for action='set'.")
            await redis.set(key, value)
            # The schema declares a string, but a caller may still pass a
            # number. The audit line must not raise after the write has
            # already landed, or a successful set would be reported as an
            # error.
            if isinstance(value, (str, bytes)):
                value_len = len(value)
            else:
                value_len = len(str(value))
            logger.info(
                "redis_admin set by user=%s key=%s len(value)=%d",
                user_id,
                key,
                value_len,
            )
            return json.dumps(
                {
                    "success": True,
                    "action": "set",
                    "key": key,
                    "message": f"Key {key} set successfully.",
                }
            )

        if action == "delete":
            removed = await redis.delete(key)
            logger.info(
                "redis_admin delete by user=%s key=%s removed=%s",
                user_id,
                key,
                removed,
            )
            return json.dumps(
                {
                    "success": True,
                    "action": "delete",
                    "key": key,
                    "removed": int(removed or 0),
                }
            )

        if action == "keys":
            pattern = key or "*"
            collected, truncated = await _scan_keys_capped(
                redis, pattern, _KEYS_SCAN_LIMIT
            )
            return json.dumps(
                {
                    "success": True,
                    "action": "keys",
                    "pattern": pattern,
                    "count": len(collected),
                    "truncated": truncated,
                    "keys": collected,
                }
            )

        return _error_json(
            f"Unknown action '{action}'. Use get, set, delete, or keys."
        )
    except Exception as exc:
        # Tool boundary: report the failure to the caller as JSON and keep
        # the traceback in the log.
        logger.exception("redis_admin error: %s", exc)
        return _error_json(f"Redis error: {exc}")