"""Admin tool to inspect and modify Redis keys directly.
Uses the bot's live TLS-aware async Redis pool (``ctx.redis``) so it works on
the same database the rest of Stargazer talks to. Gated by the
``UNSANDBOXED_EXEC`` privilege; users in ``config.admin_user_ids`` pass
implicitly.
Without the gate this tool is a privilege-escalation primitive: any caller
could read every other user's encrypted secret blob (``stargazer:user_secrets:*``)
or grant themselves admin via ``stargazer:user_privileges:<self>``.
"""
from __future__ import annotations
import jsonutil as json
import logging
from typing import TYPE_CHECKING
from tools.alter_privileges import has_privilege, PRIVILEGES
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
# Identifier under which this tool is registered; the tool loader is
# described (in ``run``'s docstring) as locating ``run`` via this metadata.
TOOL_NAME = "redis_admin"
# Description surfaced alongside the tool. It deliberately restates the
# privilege requirement and the reason for it.
TOOL_DESCRIPTION = (
    "Admin tool to get, set, delete, and scan Redis keys on the bot's live "
    "Redis instance. Operates against the configured TLS Redis pool. "
    "Requires UNSANDBOXED_EXEC privilege (admins implicitly). "
    "Without this gate, any caller could read every user's encrypted "
    "secrets or grant themselves arbitrary privileges."
)
# JSON-Schema-style declaration of the arguments ``run`` accepts. Only
# ``action`` is marked required here; the per-action requirements on ``key``
# and ``value`` are enforced inside ``run`` itself.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "action": {
            "type": "string",
            "enum": ["get", "set", "delete", "keys"],
            "description": (
                "'get' returns the value of a key; 'set' writes value to "
                "key; 'delete' removes key; 'keys' returns matching keys "
                "(use a pattern like 'stargazer:user_privileges:*')."
            ),
        },
        "key": {
            "type": "string",
            "description": ("Key name (or glob pattern when action='keys')."),
        },
        "value": {
            "type": "string",
            "description": "Value to set (action='set' only).",
        },
    },
    "required": ["action"],
}
# Maximum number of key names ``run`` will return for action='keys'.
_KEYS_SCAN_LIMIT = 5000
"""Hard cap on returned keys for action='keys' to bound payload size."""
async def _check_unsandboxed_exec(ctx: "ToolContext | None") -> str | None:
    """Decide whether the caller may use ``redis_admin``.

    This is the security gate for the tool: :func:`run` awaits it before
    touching Redis and returns immediately if it yields anything other than
    ``None``. The decision is delegated to ``has_privilege`` with the
    ``UNSANDBOXED_EXEC`` entry of ``PRIVILEGES``, using the ``redis``,
    ``config`` and ``user_id`` attributes read off the context (missing
    attributes fall back to ``None`` / ``""``).

    A refusal is logged at warning level with a ``SECURITY:`` prefix.

    Args:
        ctx: Tool execution context, or ``None`` when the tool was invoked
            without one.

    Returns:
        ``None`` if the caller is authorized. Otherwise a JSON string of the
        form ``{"success": false, "error": ...}`` describing why the call was
        rejected (no context, privilege denied, or privilege system
        unavailable).
    """

    def _rejection(reason: str) -> str:
        # Every refusal shares the same two-field JSON shape.
        return json.dumps(
            {
                "success": False,
                "error": reason,
            }
        )

    # Without a context there is no identity to check, so refuse outright.
    if ctx is None:
        return _rejection(
            "Authorization failed: tool context is required to verify "
            "privileges for redis_admin."
        )

    try:
        pool = getattr(ctx, "redis", None)
        settings = getattr(ctx, "config", None)
        caller = getattr(ctx, "user_id", "") or ""
        granted = await has_privilege(
            pool,
            caller,
            PRIVILEGES["UNSANDBOXED_EXEC"],
            settings,
        )
        if granted:
            return None

        logger.warning(
            "SECURITY: User %s attempted redis_admin without "
            "UNSANDBOXED_EXEC - DENIED",
            caller,
        )
        return _rejection(
            "The user does not have the UNSANDBOXED_EXEC privilege. "
            "Ask an admin to grant it with the alter_privileges tool."
        )
    except ImportError:
        # An ImportError while consulting the privilege system is reported
        # as "unavailable" rather than raised.
        return _rejection("Privilege system unavailable.")
def _decode(value: object) -> object:
    """Turn a raw Redis reply into something JSON-serializable.

    ``bytes`` are decoded as UTF-8. When the payload is not valid UTF-8 it
    is rendered as a lowercase hex string instead of being discarded. Any
    value that is not ``bytes`` (for example ``str`` or ``None``) is handed
    back unchanged. No I/O is performed.

    Args:
        value: A value read from Redis.

    Returns:
        The decoded text, the hex representation of undecodable bytes, or
        the original object when it was not ``bytes``.
    """
    # Guard clause: only bytes need any work.
    if not isinstance(value, bytes):
        return value
    try:
        text = value.decode("utf-8")
    except UnicodeDecodeError:
        # Binary payload: keep it inspectable rather than failing.
        return value.hex()
    return text
# ---- redis_admin tool entry point ----
def _error_response(message: str) -> str:
    """Serialize a failed-call payload in the tool's standard JSON shape."""
    return json.dumps(
        {
            "success": False,
            "error": message,
        }
    )


async def _scan_keys(redis, pattern: str) -> tuple[list[str], bool]:
    """Collect distinct key names matching ``pattern`` via cursor ``SCAN``.

    Redis documents that ``SCAN`` may return the same element more than
    once, so names are de-duplicated (first-seen order is kept) before they
    count against ``_KEYS_SCAN_LIMIT``.

    Args:
        redis: Async Redis client exposing ``scan(cursor=, match=, count=)``.
        pattern: Glob pattern passed to ``SCAN ... MATCH``.

    Returns:
        ``(keys, truncated)``. ``truncated`` is True only when collection
        stopped at the cap while the scan had not been exhausted (either a
        further distinct key was seen in the current batch, or the cursor
        had not yet returned to zero), i.e. the result may be incomplete.
    """
    found: list[str] = []
    seen: set[str] = set()
    cursor: int | bytes | str = 0
    while True:
        cursor, batch = await redis.scan(
            cursor=cursor,
            match=pattern,
            count=500,
        )
        # The client may hand the cursor back as int, bytes or str.
        exhausted = cursor in (0, b"0", "0")
        for raw in batch:
            name = str(_decode(raw))
            if name in seen:
                continue
            if len(found) >= _KEYS_SCAN_LIMIT:
                # A distinct key beyond the cap exists: definitely truncated.
                return found, True
            seen.add(name)
            found.append(name)
        if exhausted:
            return found, False
        if len(found) >= _KEYS_SCAN_LIMIT:
            # Cap reached with keyspace left unscanned: stop here rather
            # than keep iterating, and flag the result as possibly partial.
            return found, True


async def run(
    action: str,
    key: str | None = None,
    value: str | None = None,
    ctx: "ToolContext | None" = None,
    **_kwargs,
) -> str:
    """Execute one of the four Redis admin actions against ``ctx.redis``.

    Entry point for the ``redis_admin`` tool. Every call is first gated
    through :func:`_check_unsandboxed_exec`; if that returns an error string
    it is returned unchanged and Redis is never touched.

    Actions:

    * ``get`` -- return the value of ``key`` (decoded via :func:`_decode`;
      a missing key yields ``null``).
    * ``set`` -- write ``value`` to ``key``.
    * ``delete`` -- remove ``key`` and report how many keys were removed.
    * ``keys`` -- cursor-scan (``SCAN``) for keys matching the glob in
      ``key`` (default ``*``). Results are de-duplicated, capped at
      ``_KEYS_SCAN_LIMIT``, and ``truncated`` is set only when the scan was
      cut short by that cap.

    ``set`` and ``delete`` emit an info log line tagged with the invoking
    ``user_id``. Any exception raised while talking to Redis is logged and
    returned as a JSON error instead of propagating.

    Args:
        action: One of ``get``, ``set``, ``delete``, or ``keys``.
        key: Target key name, or a glob pattern when ``action`` is ``keys``.
            Required for ``get`` / ``set`` / ``delete``.
        value: Value to write; required only when ``action`` is ``set``.
        ctx: Tool execution context supplying ``redis``, ``config`` and
            ``user_id``.
        **_kwargs: Ignored; accepted so extra arguments do not break the call.

    Returns:
        str: A JSON string with the action result, or a JSON error describing
        a failed authorization, missing Redis, missing argument, unknown
        action, or underlying Redis exception.
    """
    auth_err = await _check_unsandboxed_exec(ctx)
    if auth_err:
        return auth_err

    redis = getattr(ctx, "redis", None) if ctx is not None else None
    if redis is None:
        return _error_response("Redis is not available on this context.")
    user_id = getattr(ctx, "user_id", "") or ""

    # Argument validation needs no Redis round-trip, so do it up front.
    if action not in ("get", "set", "delete", "keys"):
        return _error_response(
            f"Unknown action '{action}'. Use get, set, delete, or keys."
        )
    if action != "keys" and not key:
        return _error_response(f"'key' is required for action='{action}'.")
    if action == "set" and value is None:
        return _error_response("'value' is required for action='set'.")

    try:
        if action == "get":
            raw = await redis.get(key)
            return json.dumps(
                {
                    "success": True,
                    "action": "get",
                    "key": key,
                    "value": _decode(raw),
                }
            )

        if action == "set":
            await redis.set(key, value)
            # Log only the length of the value, never the value itself.
            logger.info(
                "redis_admin set by user=%s key=%s len(value)=%d",
                user_id,
                key,
                len(value),
            )
            return json.dumps(
                {
                    "success": True,
                    "action": "set",
                    "key": key,
                    "message": f"Key {key} set successfully.",
                }
            )

        if action == "delete":
            removed = await redis.delete(key)
            logger.info(
                "redis_admin delete by user=%s key=%s removed=%s",
                user_id,
                key,
                removed,
            )
            return json.dumps(
                {
                    "success": True,
                    "action": "delete",
                    "key": key,
                    "removed": int(removed or 0),
                }
            )

        # Only "keys" remains after the validation above.
        pattern = key or "*"
        collected, truncated = await _scan_keys(redis, pattern)
        return json.dumps(
            {
                "success": True,
                "action": "keys",
                "pattern": pattern,
                "count": len(collected),
                "truncated": truncated,
                "keys": collected,
            }
        )
    except Exception as exc:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.exception("redis_admin error: %s", exc)
        return _error_response(f"Redis error: {exc}")