Source code for tools.alter_privileges

"""User privilege management with a 64-bit bitmask stored in Redis.

Each bit is a named privilege. The canonical registry is :data:`PRIVILEGES`
(name → bit index, 0–63). Admins (``config.admin_user_ids``) implicitly have
all bits set. Unknown set bits are reported as ``bit_N``.

Other modules check privileges with::

    from tools.alter_privileges import has_privilege, PRIVILEGES

Tool-facing descriptions below are built from :data:`PRIVILEGES` so they stay
in sync when bits are added or renamed.
"""

from __future__ import annotations

import jsonutil as json
import logging
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

REDIS_KEY_PREFIX = "stargazer:user_privileges"

ALL_BITS = (1 << 64) - 1

PRIVILEGES: dict[str, int] = {
    "ALTER_PRIVILEGES": 0,
    "CORE_MEMORY": 1,
    "UNSANDBOXED_EXEC": 2,
    "SQLITE_CROSS_USER": 3,
    "DUMP_NOTES": 4,
    "DUMP_GOALS": 5,
    "ANCHOR_ADMIN": 6,
    "SUBAGENT_ACCESS": 7,
    "WEB_SEARCH": 8,
    "LONG_TERM_GOALS": 9,
    "READ_DM": 10,
    "CHAT_ANALYTICS": 11,
    "BYPASS_RATELIMIT": 12,
    "READ_TOOL_CODE": 13,
    "SHADOW_BAN_ADMIN": 14,
    "CTX_MANAGE": 15,
    "OPUS_ACCESS": 16,
    "GUILD_ADMIN": 17,
    "CHANNEL_ADMIN": 18,
    "GAME_ADMIN": 19,
    "FILE_OPS_ADMIN": 20,
    "VULTR_ADMIN": 21,
    "ENTRAINMENT_ADMIN": 22,
    "STARGAZER_USE": 63,
}

# ------------------------------------------------------------------
# Per-bit resolution modes
# ------------------------------------------------------------------
# 0 = NORMAL:    channel > guild > global  (most specific scope wins)
# 1 = INVERTED:  global > guild > channel  (most general scope wins)
# 2 = DANGEROUS: global only, scoped masks completely ignored
# 3 = ANY:       global OR guild OR channel — granted if set at any scope

RES_NORMAL = 0
RES_INVERTED = 1
RES_DANGEROUS = 2
RES_ANY = 3

BIT_RESOLUTION_MODE: dict[int, int] = {
    PRIVILEGES["ALTER_PRIVILEGES"]: RES_DANGEROUS,  # 0  — self-escalation
    PRIVILEGES["UNSANDBOXED_EXEC"]: RES_DANGEROUS,  # 2  — code execution
    PRIVILEGES["SHADOW_BAN_ADMIN"]: RES_DANGEROUS,  # 14 — lift own shadow ban
    PRIVILEGES["GUILD_ADMIN"]: RES_DANGEROUS,  # 17 — scoped self-escalation
    PRIVILEGES["CHANNEL_ADMIN"]: RES_DANGEROUS,  # 18 — scoped self-escalation
    PRIVILEGES["FILE_OPS_ADMIN"]: RES_DANGEROUS,  # 20 — filesystem access
    PRIVILEGES["VULTR_ADMIN"]: RES_DANGEROUS,  # 21 — vultr access
    PRIVILEGES[
        "STARGAZER_USE"
    ]: RES_NORMAL,  # 63 — access gate: channel > guild > global
    # All unlisted bits default to RES_NORMAL
}


def _bit_mode(bit: int) -> int:
    """Return the scope-resolution mode for a privilege bit.

    Looks the bit up in :data:`BIT_RESOLUTION_MODE` and falls back to
    ``RES_NORMAL`` (channel beats guild beats global) for any bit not
    explicitly listed, so newly added privileges default to the safe
    most-specific-scope-wins behaviour. This is the single source of truth
    consulted by :func:`resolve_privilege_bit` and
    :func:`_has_scoped_privilege_single` when deciding how the three scope
    masks combine.

    Args:
        bit: Privilege bit index (0-63).

    Returns:
        One of ``RES_NORMAL``, ``RES_INVERTED``, ``RES_DANGEROUS``, or
        ``RES_ANY``.
    """
    return BIT_RESOLUTION_MODE.get(bit, RES_NORMAL)


# Derived for backwards compatibility and scoped-tool write guards
DANGEROUS_BITS: frozenset[int] = frozenset(
    bit for bit, mode in BIT_RESOLUTION_MODE.items() if mode == RES_DANGEROUS
)

# Scoped privilege Redis key prefixes
GUILD_KEY_PREFIX = "stargazer:guild_privileges"
CHANNEL_KEY_PREFIX = "stargazer:channel_privileges"

# Opus hardcoded guild
_OPUS_HARDCODE_GUILD = "1405695643913027737"

_REVERSE: dict[int, str] = {v: k for k, v in PRIVILEGES.items()}


def _privileges_catalog_text() -> str:
    """Render the privilege registry as a compact ``name=bit`` catalog string.

    Sorts :data:`PRIVILEGES` by bit index and joins each entry as
    ``NAME=index``, producing a stable, human-readable list. This text is
    embedded at import time into the module-level tool/parameter description
    constants (``_ALTER_PRIVILEGES_DESCRIPTION``, ``_PRIVILEGES_PARAM_DESCRIPTION``,
    the guild/channel descriptions) so the LLM-facing tool schemas stay in sync
    with the registry whenever bits are added or renamed. Called only within
    this module while those constants are being built.

    Returns:
        A comma-separated string such as ``ALTER_PRIVILEGES=0, CORE_MEMORY=1, ...``.
    """
    pairs = sorted(PRIVILEGES.items(), key=lambda kv: kv[1])
    return ", ".join(f"{name}={bit}" for name, bit in pairs)


def _redis_key(user_id: str) -> str:
    """Build the Redis key that stores a user's global privilege bitmask.

    Namespaces the user ID under :data:`REDIS_KEY_PREFIX`
    (``stargazer:user_privileges``); the value at this key is the decimal
    string of the 64-bit mask. This is the canonical key for the *global*
    scope, distinct from the guild/channel scoped keys built by
    :func:`_scoped_redis_key`. Used throughout this module by
    :func:`get_user_privileges`, :func:`_alter_privileges`,
    :func:`_audit_privileges`, :func:`_stargazer_full_revoke`, and
    :func:`_purge_all_privileges`, and imported directly by
    ``tools/stargazer_ban.py`` and ``tools/stargazer_shadowban.py`` to read
    and rewrite the same global mask.

    Args:
        user_id: Platform user ID.

    Returns:
        The fully qualified Redis key string.
    """
    return f"{REDIS_KEY_PREFIX}:{user_id}"


def _is_admin(user_id: str, config: Any) -> bool:
    """Report whether a user is a hard-coded admin in the bot config.

    Admins are listed in ``config.admin_user_ids`` and implicitly hold every
    privilege bit, so this short-circuits the bitmask machinery: callers treat
    a True result as "all 64 bits set". Reads only the in-memory config object
    (no Redis or network) and returns False when ``config`` is missing. Used
    here by :func:`get_user_privileges`, :func:`_has_scoped_privilege_single`,
    :func:`_can_alter_scope`, :func:`_get_privileges`, :func:`_audit_privileges`,
    and the purge/revoke wrappers to refuse demoting an admin; also imported
    directly by ``tools/stargazer_ban.py``, ``tools/stargazer_shadowban.py``,
    and ``tools/parallax_telemetry.py``.

    Args:
        user_id: Platform user ID to check.
        config: Bot configuration object exposing ``admin_user_ids``; may be
            ``None``.

    Returns:
        True if ``user_id`` appears in the configured admin list, else False.
    """
    if config is None:
        return False
    admin_ids = getattr(config, "admin_user_ids", None) or []
    return user_id in admin_ids


[docs] async def get_user_privileges(redis: Any, user_id: str, config: Any = None) -> int: """Return the privilege bitmask for *user_id*. Admins get all 64 bits. Returns ``0`` when the user has no stored privileges and is not an admin. """ if _is_admin(user_id, config): return ALL_BITS if redis is None: return 0 raw = await redis.get(_redis_key(user_id)) if raw is None: return 0 return int(raw)
[docs] async def has_privilege( redis: Any, user_id: str, bit: int, config: Any = None, user_aliases: list[str] | None = None, ) -> bool: """Report whether a user (or any alias) has one privilege bit set. Checks the single privilege flag ``bit`` for ``user_id``: for each alias in ``user_aliases`` (defaulting to just ``user_id``) it strips any ``platform:`` prefix, loads that identity's bitmask via :func:`get_user_privileges`, and returns ``True`` on the first alias whose mask has ``bit`` set. Read-only against Redis. This is the shared gate the tool layer uses to authorize privileged actions -- e.g. :func:`tools.prowlarr_search._check_web_search_access` and the other per-tool access checks pass a ``PRIVILEGES[...]`` constant as ``bit``. Args: redis: Async Redis client used to read each identity's privilege mask. user_id: The primary user id to check (used when ``user_aliases`` is empty). bit: Zero-based bit index of the privilege to test (``1 << bit``). config: Optional config forwarded to :func:`get_user_privileges` (e.g. for owner/superuser short-circuits). user_aliases: Optional list of equivalent identities (cross-platform links); any one of them holding the bit grants access. Returns: bool: ``True`` if any checked identity has ``bit`` enabled, else ``False``. """ aliases = user_aliases if user_aliases else [user_id] for alias in aliases: p_uid = alias.split(":", 1)[1] if ":" in alias else alias mask = await get_user_privileges(redis, p_uid, config) if bool(mask & (1 << bit)): return True return False
# ------------------------------------------------------------------ # Scoped privilege resolution (global < guild < channel) # ------------------------------------------------------------------ # Placeholder used in Redis keys when no guild_id is available (e.g. Matrix rooms, DMs). _NO_GUILD_SENTINEL = "__global__" def _scoped_redis_key( scope: str, guild_id: str, channel_id: str | None, user_id: str, ) -> str: """Build Redis key for guild/channel scoped masks. When ``guild_id`` is empty (e.g. Matrix rooms or Discord DMs) the sentinel ``'__global__'`` is used so that channel-level overrides can still be stored and resolved without a guild namespace. """ effective_guild = guild_id or _NO_GUILD_SENTINEL if scope == "channel" and channel_id: return f"{CHANNEL_KEY_PREFIX}:{effective_guild}:{channel_id}:{user_id}" return f"{GUILD_KEY_PREFIX}:{effective_guild}:{user_id}" async def _get_scoped_mask( redis: Any, scope: str, guild_id: str, channel_id: str | None, user_id: str, ) -> int | None: """Fetch a user's guild- or channel-scoped privilege mask from Redis. Reads the key produced by :func:`_scoped_redis_key` and distinguishes an *absent* scoped override (no key) from an explicit zero mask: a missing key returns ``None`` so the resolver can fall back to a broader scope, whereas a stored ``0`` is a real value. Performs a single Redis GET and never raises on a missing key. Called internally by :func:`_has_scoped_privilege_single`, :func:`_alter_guild_privileges`, and :func:`_alter_channel_privileges`, and imported by ``prompt_context.py``, ``message_processor/proxy_status_commands.py``, and ``tools/privilege_capsh.py`` to inspect effective scoped grants. Args: redis: Async Redis client, or ``None`` (treated as no data). scope: Either ``'guild'`` or ``'channel'``. guild_id: Guild/server ID (or empty for the global sentinel). channel_id: Channel ID; required only for channel scope. user_id: Platform user ID. Returns: The stored bitmask as an ``int``, or ``None`` when no scoped key exists. """ if redis is None: return None key = _scoped_redis_key(scope, guild_id, channel_id, user_id) raw = await redis.get(key) if raw is None: return None return int(raw) async def _set_scoped_mask( redis: Any, scope: str, guild_id: str, channel_id: str | None, user_id: str, mask: int, ) -> None: """Persist a guild- or channel-scoped privilege mask to Redis. Writes the decimal string of ``mask`` to the key built by :func:`_scoped_redis_key`, creating or overwriting the scoped override for the user. A ``None`` Redis client is a silent no-op. Called by :func:`_alter_guild_privileges` and :func:`_alter_channel_privileges` after they compute the new mask; the global scope writes directly via :func:`_redis_key` instead of this helper. Args: redis: Async Redis client, or ``None`` (no-op). scope: Either ``'guild'`` or ``'channel'``. guild_id: Guild/server ID (or empty for the global sentinel). channel_id: Channel ID; required only for channel scope. user_id: Platform user ID. mask: The bitmask to store. """ if redis is None: return key = _scoped_redis_key(scope, guild_id, channel_id, user_id) await redis.set(key, str(mask))
[docs] def resolve_privilege_bit( bit: int, global_mask: int, guild_mask: int | None, channel_mask: int | None, ) -> bool: """Resolve a single privilege bit across all three scope masks. Resolution modes: 0 (NORMAL): channel > guild > global — most specific scope wins. 1 (INVERTED): global > guild > channel — most general scope wins. 2 (DANGEROUS): global only — scoped masks ignored. 3 (ANY): granted if set at any scope (global OR guild OR channel). """ mode = _bit_mode(bit) flag = 1 << bit if mode == RES_DANGEROUS: return bool(global_mask & flag) if mode == RES_INVERTED: # Most general wins: global, then guild, then channel return bool(global_mask & flag) # (guild/channel could restrict in future, but can't escalate) if mode == RES_ANY: # Granted if the bit is set at ANY scope — global, guild, or channel. # This is the access-gate pattern: guild or channel whitelisting is # sufficient, but a global ban (bit cleared at global AND no scoped # grant) still blocks the user. if bool(global_mask & flag): return True if guild_mask is not None and bool(guild_mask & flag): return True if channel_mask is not None and bool(channel_mask & flag): return True return False # RES_NORMAL: most specific scope wins (channel > guild > global) if channel_mask is not None: return bool(channel_mask & flag) if guild_mask is not None: return bool(guild_mask & flag) return bool(global_mask & flag)
[docs] async def has_scoped_privilege( redis: Any, user_id: str, bit: int, config: Any = None, guild_id: str | None = None, channel_id: str | None = None, user_aliases: list[str] | None = None, ) -> bool: """Check a scoped privilege bit across all of a user's linked aliases. The primary public scope-aware access check: a user may appear under several platform identities (cross-platform alias links), so this strips any ``platform:`` prefix from each alias and grants access if *any* alias passes :func:`_has_scoped_privilege_single`, which applies the bit's per-scope resolution mode against the global, guild, and channel masks read from Redis. Short-circuits on the first alias that succeeds. Used pervasively as the gatekeeper for feature and tool access — by ``message_processor/processor.py`` and ``generate_and_send.py``, ``feature_toggles.py``, ``platforms/discord.py``, ``game_ui/action_handler.py``, and tools such as ``set_user_timezone`` and ``context_window_tools`` — typically for the ``STARGAZER_USE`` access gate and admin checks. Args: redis: Async Redis client used to read the scoped masks. user_id: Platform user ID (used when ``user_aliases`` is not given). bit: Privilege bit index (0-63) to check. config: Bot config for the admin short-circuit; optional. guild_id: Guild/server ID for scoped resolution; optional. channel_id: Channel ID for the most-specific scope; optional. user_aliases: Full alias list to test; defaults to ``[user_id]``. Returns: True if any alias holds the bit under its resolution mode, else False. """ aliases = user_aliases if user_aliases else [user_id] for alias in aliases: p_uid = alias.split(":", 1)[1] if ":" in alias else alias if await _has_scoped_privilege_single(redis, p_uid, bit, config, guild_id, channel_id): return True return False
async def _has_scoped_privilege_single( redis: Any, user_id: str, bit: int, config: Any = None, guild_id: str | None = None, channel_id: str | None = None, ) -> bool: """Check privilege with per-bit resolution mode. Resolution modes (see :data:`BIT_RESOLUTION_MODE`): 0 (NORMAL): channel > guild > global — most specific scope wins. 1 (INVERTED): global > guild > channel — most general scope wins. 2 (DANGEROUS): global only — scoped masks ignored. 3 (ANY): granted if set at any scope. When ``guild_id`` is absent (Matrix rooms, Discord DMs) channel-level overrides are still resolved via the ``_NO_GUILD_SENTINEL`` placeholder, allowing per-room whitelisting/blacklisting on platforms without guild IDs. OPUS_ACCESS has a hardcoded guild shortcircuit for guild ``1405695643913027737``. """ # Admins always pass if _is_admin(user_id, config): return True # Opus guild hardcode if bit == PRIVILEGES["OPUS_ACCESS"] and guild_id == _OPUS_HARDCODE_GUILD: return True global_mask = await get_user_privileges(redis, user_id, config) guild_mask: int | None = None channel_mask: int | None = None mode = _bit_mode(bit) if mode != RES_DANGEROUS: # Always attempt guild lookup if a guild_id is present. if guild_id: guild_mask = await _get_scoped_mask(redis, "guild", guild_id, None, user_id) # Channel lookup: works even if guild_id is empty — the sentinel is used # in the Redis key so Matrix rooms / DMs get channel-level resolution. if channel_id: ch_guild = guild_id or "" channel_mask = await _get_scoped_mask( redis, "channel", ch_guild, channel_id, user_id, ) result = resolve_privilege_bit(bit, global_mask, guild_mask, channel_mask) bit_name = _REVERSE.get(bit, f"bit_{bit}") logger.debug( "[privilege] user=%s bit=%s(%d) guild=%r channel=%r " "global=0x%x guild_mask=%s channel_mask=%s -> %s", user_id, bit_name, bit, guild_id or "(none)", channel_id or "(none)", global_mask, hex(guild_mask) if guild_mask is not None else "None", hex(channel_mask) if channel_mask is not None else "None", result, ) return result async def _can_alter_scope( redis: Any, user_id: str, scope: str, config: Any = None, guild_id: str | None = None, ) -> bool: """Check whether *user_id* can alter masks at the given scope. - Global scope: requires ALTER_PRIVILEGES (bit 0) - Guild scope: requires GUILD_ADMIN (17) at guild level, or ALTER_PRIVILEGES - Channel scope: requires CHANNEL_ADMIN (18), or GUILD_ADMIN (17) at guild, or ALTER_PRIVILEGES """ if _is_admin(user_id, config): return True if await has_privilege(redis, user_id, PRIVILEGES["ALTER_PRIVILEGES"], config): return True if scope == "global": return False if scope == "guild": if guild_id: gu_mask = await _get_scoped_mask(redis, "guild", guild_id, None, user_id) if gu_mask is not None and (gu_mask & (1 << PRIVILEGES["GUILD_ADMIN"])): return True return False if scope == "channel": # GUILD_ADMIN implies CHANNEL_ADMIN if guild_id: gu_mask = await _get_scoped_mask(redis, "guild", guild_id, None, user_id) if gu_mask is not None and (gu_mask & (1 << PRIVILEGES["GUILD_ADMIN"])): return True # Direct CHANNEL_ADMIN check at channel scope handled by caller return False return False def _resolve_privileges(names: list) -> tuple[int, list[str]]: """Convert a list of privilege names/bit numbers into a bitmask. Returns ``(bitmask, list_of_bad_names)``. """ mask = 0 bad: list[str] = [] for entry in names: if isinstance(entry, int) or (isinstance(entry, str) and entry.isdigit()): bit = int(entry) if 0 <= bit < 64: mask |= 1 << bit else: bad.append(str(entry)) else: name = str(entry).upper() bit = PRIVILEGES.get(name) if bit is not None: mask |= 1 << bit else: bad.append(str(entry)) return mask, bad def _mask_to_names(mask: int) -> list[str]: """Decode a privilege bitmask into its human-readable privilege names. Walks all 64 bit positions and, for each set bit, looks the index up in the reverse registry :data:`_REVERSE`, falling back to ``bit_N`` for indices not in :data:`PRIVILEGES` so unknown grants are still reported rather than dropped. The inverse of :func:`_resolve_privileges`. Called internally by the tool handlers (:func:`_alter_privileges`, :func:`_get_privileges`, :func:`_audit_privileges`, and the guild/channel alterers) to build the ``applied`` / ``denied`` / ``privileges`` fields of their JSON responses. Args: mask: A 64-bit privilege bitmask. Returns: Privilege names for every set bit, in ascending bit order. """ names: list[str] = [] for bit in range(64): if mask & (1 << bit): name = _REVERSE.get(bit, f"bit_{bit}") names.append(name) return names # ------------------------------------------------------------------ # Tool handlers # ------------------------------------------------------------------ async def _alter_privileges( target_user_id: str, action: str, privileges: list, ctx: ToolContext | None = None, ) -> str: """Grant or revoke bits on *target_user_id*'s stored mask. Caller must have ``ALTER_PRIVILEGES``; only bits present in the caller's mask can be changed on the target (you cannot grant what you do not have). """ user_id = getattr(ctx, "user_id", "") or "" redis = getattr(ctx, "redis", None) config = getattr(ctx, "config", None) if redis is None: return json.dumps({"success": False, "error": "Redis not available"}) caller_mask = await get_user_privileges(redis, user_id, config) if not (caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])): return json.dumps( { "success": False, "error": "You do not have the ALTER_PRIVILEGES privilege.", } ) requested_mask, bad_names = _resolve_privileges(privileges) if requested_mask == 0 and not bad_names: return json.dumps({"success": False, "error": "No valid privileges specified."}) allowed_mask = requested_mask & caller_mask denied_mask = requested_mask & ~caller_mask target_mask = await get_user_privileges(redis, target_user_id, config=None) action = action.lower() if action == "grant": new_mask = target_mask | allowed_mask elif action == "revoke": new_mask = target_mask & ~allowed_mask else: return json.dumps( { "success": False, "error": f"Unknown action '{action}'. Use 'grant' or 'revoke'.", } ) await redis.set(_redis_key(target_user_id), str(new_mask)) result: dict[str, Any] = { "success": True, "action": action, "target_user_id": target_user_id, "previous": hex(target_mask), "new": hex(new_mask), "applied": _mask_to_names(allowed_mask), } if denied_mask: result["denied"] = _mask_to_names(denied_mask) result["denied_reason"] = "You do not have these privileges yourself." if bad_names: result["unrecognized"] = bad_names logger.info( "User %s %sd privileges for %s: %s -> %s (applied=%s denied=%s)", user_id, action, target_user_id, hex(target_mask), hex(new_mask), hex(allowed_mask), hex(denied_mask), ) return json.dumps(result, indent=2) async def _get_privileges( target_user_id: str = "", ctx: ToolContext | None = None, ) -> str: """Return hex mask, decoded names, admin flag, and ``available_privilege_names``. Defaults to the caller. Reading another user requires ``ALTER_PRIVILEGES``. """ user_id = getattr(ctx, "user_id", "") or "" redis = getattr(ctx, "redis", None) config = getattr(ctx, "config", None) if redis is None: return json.dumps({"success": False, "error": "Redis not available"}) target = (target_user_id or "").strip() or user_id if target != user_id: caller_mask = await get_user_privileges(redis, user_id, config) if not (caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])): return json.dumps( { "success": False, "error": ( "Reading another user's privileges requires " "ALTER_PRIVILEGES." ), } ) mask = await get_user_privileges(redis, target, config) return json.dumps( { "success": True, "user_id": target, "mask_hex": hex(mask), "mask_decimal": mask, "privileges": _mask_to_names(mask), "is_admin": _is_admin(target, config), "available_privilege_names": list(PRIVILEGES.keys()), }, indent=2, ) async def _audit_privileges( ctx: ToolContext | None = None, ) -> str: """Report every user holding a non-zero global privilege mask. Backs the ``audit_privileges`` LLM tool: after confirming the caller holds ``ALTER_PRIVILEGES`` (admins implicitly pass), it SCANs Redis for all ``stargazer:user_privileges:*`` keys, decodes each non-zero mask via :func:`_mask_to_names`, then merges in the configured ``admin_user_ids`` so admins appear even without a stored entry (shown with all bits). Reads Redis and the bot config from the :class:`ToolContext`; performs no writes. Not called directly in code — it is dispatched by name through this module's ``TOOLS`` registry, which ``tool_loader.py`` discovers via ``getattr(module, "TOOLS")``. Args: ctx: Tool context supplying ``user_id``, ``redis``, and ``config``. Returns: A JSON string with ``total_privileged_users`` and a ``users`` map of user ID to mask hex, decoded privilege names, and admin flag; or an error object when Redis is unavailable or the caller lacks ``ALTER_PRIVILEGES``. """ user_id = getattr(ctx, "user_id", "") or "" redis = getattr(ctx, "redis", None) config = getattr(ctx, "config", None) if redis is None: return json.dumps({"success": False, "error": "Redis not available"}) # Caller must have ALTER_PRIVILEGES to see the full audit. caller_mask = await get_user_privileges(redis, user_id, config) if not (caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])): return json.dumps( { "success": False, "error": "You do not have the ALTER_PRIVILEGES privilege.", } ) users: dict[str, Any] = {} # 1. Scan Redis for every stored privilege bitmask. prefix = f"{REDIS_KEY_PREFIX}:" cursor = b"0" while True: cursor, keys = await redis.scan(cursor, match=f"{prefix}*", count=200) for key in keys: key_str = key.decode() if isinstance(key, bytes) else key uid = key_str[len(prefix) :] raw = await redis.get(key) if raw is None: continue mask = int(raw) if mask == 0: continue users[uid] = { "mask_hex": hex(mask), "privileges": _mask_to_names(mask), "is_admin": _is_admin(uid, config), } if cursor == b"0" or cursor == 0: break # 2. Ensure admins from config are included even if they have # no explicit Redis entry (they implicitly have ALL bits). admin_ids = getattr(config, "admin_user_ids", None) or [] for aid in admin_ids: if aid not in users: users[aid] = { "mask_hex": hex(ALL_BITS), "privileges": ["ALL (admin)"], "is_admin": True, } else: users[aid]["is_admin"] = True return json.dumps( { "success": True, "total_privileged_users": len(users), "users": users, }, indent=2, ) async def _alter_guild_privileges( target_user_id: str, action: str, privileges: list, guild_id: str = "", ctx: ToolContext | None = None, ) -> str: """Grant or revoke privilege bits at guild scope. Writes to ``stargazer:guild_privileges:{guild_id}:{user_id}``. Bits with resolution mode DANGEROUS (ALTER_PRIVILEGES, UNSANDBOXED_EXEC, SHADOW_BAN_ADMIN) are rejected — they only resolve globally. """ user_id = getattr(ctx, "user_id", "") or "" redis = getattr(ctx, "redis", None) config = getattr(ctx, "config", None) if redis is None: return json.dumps({"success": False, "error": "Redis not available"}) # Resolve guild_id from context if not provided guild_id = (guild_id or "").strip() if not guild_id: guild_id = getattr(ctx, "guild_id", "") or "" if not guild_id: return json.dumps( { "success": False, "error": "guild_id is required (not in a guild context).", } ) # Caller must have ALTER_PRIVILEGES or GUILD_ADMIN caller_mask = await get_user_privileges(redis, user_id, config) has_alter = bool(caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])) has_guild_admin = bool(caller_mask & (1 << PRIVILEGES["GUILD_ADMIN"])) if not (has_alter or has_guild_admin): return json.dumps( { "success": False, "error": "Requires ALTER_PRIVILEGES or GUILD_ADMIN.", } ) requested_mask, bad_names = _resolve_privileges(privileges) if requested_mask == 0 and not bad_names: return json.dumps({"success": False, "error": "No valid privileges specified."}) # Block dangerous bits at scoped level dangerous_requested = requested_mask & sum(1 << b for b in DANGEROUS_BITS) if dangerous_requested: return json.dumps( { "success": False, "error": ( f"Bits {_mask_to_names(dangerous_requested)} are DANGEROUS_BITS " "and can only be set at global scope." ), } ) # Scope: caller can only grant what they have allowed_mask = requested_mask & caller_mask denied_mask = requested_mask & ~caller_mask current = await _get_scoped_mask(redis, "guild", guild_id, None, target_user_id) if current is not None: target_mask = current else: # Inherit effective global mask so granting one bit doesn't silently # revoke all others via RES_NORMAL channel-wins-over-global logic. target_mask = await get_user_privileges(redis, target_user_id, config=None) action = action.lower() if action == "grant": new_mask = target_mask | allowed_mask elif action == "revoke": new_mask = target_mask & ~allowed_mask else: return json.dumps({"success": False, "error": f"Unknown action '{action}'."}) await _set_scoped_mask(redis, "guild", guild_id, None, target_user_id, new_mask) result: dict[str, Any] = { "success": True, "scope": "guild", "guild_id": guild_id, "action": action, "target_user_id": target_user_id, "previous": hex(target_mask), "new": hex(new_mask), "applied": _mask_to_names(allowed_mask), } if denied_mask: result["denied"] = _mask_to_names(denied_mask) if bad_names: result["unrecognized"] = bad_names logger.info( "Guild %s: user %s %sd scoped privileges for %s: %s -> %s", guild_id, user_id, action, target_user_id, hex(target_mask), hex(new_mask), ) return json.dumps(result, indent=2) async def _alter_channel_privileges( target_user_id: str, action: str, privileges: list, guild_id: str = "", channel_id: str = "", ctx: ToolContext | None = None, ) -> str: """Grant or revoke privilege bits at channel scope. Writes to ``stargazer:channel_privileges:{guild_id}:{channel_id}:{user_id}``. Bits with resolution mode DANGEROUS are rejected — they only resolve globally. """ user_id = getattr(ctx, "user_id", "") or "" redis = getattr(ctx, "redis", None) config = getattr(ctx, "config", None) if redis is None: return json.dumps({"success": False, "error": "Redis not available"}) guild_id = (guild_id or "").strip() if not guild_id: guild_id = getattr(ctx, "guild_id", "") or "" if not guild_id: return json.dumps( { "success": False, "error": "guild_id is required (not in a guild context).", } ) channel_id = (channel_id or "").strip() if not channel_id: channel_id = getattr(ctx, "channel_id", "") or "" if not channel_id: return json.dumps( { "success": False, "error": "channel_id is required.", } ) # Caller must have ALTER_PRIVILEGES, GUILD_ADMIN, or CHANNEL_ADMIN caller_mask = await get_user_privileges(redis, user_id, config) has_alter = bool(caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])) has_guild_admin = bool(caller_mask & (1 << PRIVILEGES["GUILD_ADMIN"])) has_channel_admin = bool(caller_mask & (1 << PRIVILEGES["CHANNEL_ADMIN"])) if not (has_alter or has_guild_admin or has_channel_admin): return json.dumps( { "success": False, "error": "Requires ALTER_PRIVILEGES, GUILD_ADMIN, or CHANNEL_ADMIN.", } ) requested_mask, bad_names = _resolve_privileges(privileges) if requested_mask == 0 and not bad_names: return json.dumps({"success": False, "error": "No valid privileges specified."}) dangerous_requested = requested_mask & sum(1 << b for b in DANGEROUS_BITS) if dangerous_requested: return json.dumps( { "success": False, "error": ( f"Bits {_mask_to_names(dangerous_requested)} are DANGEROUS_BITS " "and can only be set at global scope." ), } ) allowed_mask = requested_mask & caller_mask denied_mask = requested_mask & ~caller_mask current = await _get_scoped_mask( redis, "channel", guild_id, channel_id, target_user_id ) if current is not None: target_mask = current else: # Inherit the guild mask (falling back to global) so granting one # channel-level bit doesn't silently revoke all others. inherited = await _get_scoped_mask( redis, "guild", guild_id, None, target_user_id ) target_mask = ( inherited if inherited is not None else await get_user_privileges( redis, target_user_id, config=None, ) ) action = action.lower() if action == "grant": new_mask = target_mask | allowed_mask elif action == "revoke": new_mask = target_mask & ~allowed_mask else: return json.dumps({"success": False, "error": f"Unknown action '{action}'."}) await _set_scoped_mask( redis, "channel", guild_id, channel_id, target_user_id, new_mask ) result: dict[str, Any] = { "success": True, "scope": "channel", "guild_id": guild_id, "channel_id": channel_id, "action": action, "target_user_id": target_user_id, "previous": hex(target_mask), "new": hex(new_mask), "applied": _mask_to_names(allowed_mask), } if denied_mask: result["denied"] = _mask_to_names(denied_mask) if bad_names: result["unrecognized"] = bad_names logger.info( "Channel %s/%s: user %s %sd scoped privileges for %s: %s -> %s", guild_id, channel_id, user_id, action, target_user_id, hex(target_mask), hex(new_mask), ) return json.dumps(result, indent=2) async def _stargazer_full_revoke( target_user_id: str, *, redis: Any, config: Any, caller_id: str, reason: str = "", ) -> dict[str, Any]: """Strip ``STARGAZER_USE`` (bit 63) from a user at every scope. Enforces a hard, un-bypassable ban: clears bit 63 in the global mask and then SCANs every guild- and channel-scoped key (``stargazer:guild_privileges:*`` / ``stargazer:channel_privileges:*``) whose trailing segment is the target user, clearing the bit wherever it is set so no scoped override can re-grant access. Only bit 63 is touched; all other privilege bits are left intact (use :func:`_purge_all_privileges` to wipe everything). Issues multiple Redis GET/SET calls and logs an audit line. Called by :func:`_stargazer_full_revoke_wrapper` (which performs the auth and admin-safety checks) and exercised directly in ``tests/test_stargazer_full_revoke.py``. Args: target_user_id: User whose access is being revoked. redis: Async Redis client. config: Bot config (accepted for symmetry; not consulted here). caller_id: ID of the actor, recorded in the audit log. reason: Optional human-readable reason, recorded in the audit log. Returns: A results dict with ``success``, ``global_was_set``, and the count of ``guild_keys_cleared`` / ``channel_keys_cleared``. """ bit = 63 flag = 1 << bit results = { "success": True, "target_user_id": target_user_id, "global_was_set": False, "guild_keys_cleared": 0, "channel_keys_cleared": 0, } # 1. Global scope global_key = _redis_key(target_user_id) raw_global = await redis.get(global_key) if raw_global is not None: mask = int(raw_global) if mask & flag: new_mask = mask & ~flag await redis.set(global_key, str(new_mask)) results["global_was_set"] = True # 2. Scoped scans # Patterns to scan scans = [ ("guild", f"{GUILD_KEY_PREFIX}:*"), ("channel", f"{CHANNEL_KEY_PREFIX}:*"), ] for scope_type, pattern in scans: cursor = b"0" while True: cursor, keys = await redis.scan(cursor, match=pattern, count=200) for key in keys: key_str = key.decode() if isinstance(key, bytes) else key # Suffix is always the user_id uid = key_str.rsplit(":", 1)[-1] if uid == target_user_id: raw = await redis.get(key) if raw is not None: mask = int(raw) if mask & flag: new_mask = mask & ~flag await redis.set(key, str(new_mask)) if scope_type == "guild": results["guild_keys_cleared"] += 1 else: results["channel_keys_cleared"] += 1 if cursor == b"0" or cursor == 0: break logger.info( "Full revoke: caller %s revoked STARGAZER_USE for %s. Results: %s. Reason: %s", caller_id, target_user_id, results, reason, ) return results async def _purge_all_privileges( target_user_id: str, *, redis: Any, config: Any, caller_id: str, reason: str = "", ) -> dict[str, Any]: """Zero out the entire privilege bitmask for a user at all scopes. Unlike _stargazer_full_revoke which only clears bit 63, this deletes the key entirely (equivalent to mask = 0) at global, guild, and channel scopes, stripping every privilege bit. """ results = { "success": True, "target_user_id": target_user_id, "global_cleared": False, "guild_keys_cleared": 0, "channel_keys_cleared": 0, } # 1. Global scope — delete the key entirely (mask → 0) global_key = _redis_key(target_user_id) deleted = await redis.delete(global_key) results["global_cleared"] = bool(deleted) # 2. Scoped scans — delete guild and channel keys for this user scans = [ ("guild", f"{GUILD_KEY_PREFIX}:*"), ("channel", f"{CHANNEL_KEY_PREFIX}:*"), ] for scope_type, pattern in scans: cursor = b"0" while True: cursor, keys = await redis.scan(cursor, match=pattern, count=200) for key in keys: key_str = key.decode() if isinstance(key, bytes) else key uid = key_str.rsplit(":", 1)[-1] if uid == target_user_id: await redis.delete(key) if scope_type == "guild": results["guild_keys_cleared"] += 1 else: results["channel_keys_cleared"] += 1 if cursor == b"0" or cursor == 0: break logger.warning( "Purge all privileges: caller %s wiped ALL bits for %s. Results: %s. Reason: %s", caller_id, target_user_id, results, reason, ) return results async def _purge_all_privileges_wrapper( target_user_id: str, reason: str = "", ctx: "ToolContext | None" = None, ) -> str: """Authorize and run a full privilege wipe for the ``purge_all_privileges`` tool. The LLM-facing front end for :func:`_purge_all_privileges`: it pulls the caller, Redis client, and config off the :class:`ToolContext`, requires the caller to hold ``ALTER_PRIVILEGES``, and refuses to target a configured admin before delegating the destructive scan-and-delete to the inner function. Reads and deletes Redis keys (via the delegate) and returns a JSON string. Not called directly in code — dispatched by name through this module's ``TOOLS`` registry, which ``tool_loader.py`` discovers via ``getattr(module, "TOOLS")``. Args: target_user_id: User whose entire privilege mask will be zeroed. reason: Optional reason string, forwarded to the audit log. ctx: Tool context supplying ``user_id``, ``redis``, and ``config``. Returns: A JSON string: the delegate's results on success, or an error object when Redis is unavailable, the caller lacks ``ALTER_PRIVILEGES``, or the target is an admin. """ user_id = getattr(ctx, "user_id", "") or "" redis = getattr(ctx, "redis", None) config = getattr(ctx, "config", None) if redis is None: return json.dumps({"success": False, "error": "Redis not available"}) caller_mask = await get_user_privileges(redis, user_id, config) if not (caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])): return json.dumps( { "success": False, "error": "You do not have the ALTER_PRIVILEGES privilege.", } ) if _is_admin(target_user_id, config): return json.dumps( { "success": False, "error": f"Refused: User {target_user_id} is a configured admin.", } ) result = await _purge_all_privileges( target_user_id=target_user_id, redis=redis, config=config, caller_id=user_id, reason=reason, ) return json.dumps(result, indent=2) async def _stargazer_full_revoke_wrapper( target_user_id: str, reason: str = "", ctx: ToolContext | None = None, ) -> str: """Authorize and run a full access ban for the ``stargazer_full_revoke`` tool. The LLM-facing front end for :func:`_stargazer_full_revoke`: it reads the caller, Redis client, and config from the :class:`ToolContext`, requires the caller to hold ``ALTER_PRIVILEGES``, and refuses to target a configured admin before delegating the all-scope revoke of ``STARGAZER_USE`` (bit 63). Reads and rewrites Redis keys (via the delegate) and returns a JSON string. Not called directly in code — dispatched by name through this module's ``TOOLS`` registry, which ``tool_loader.py`` discovers via ``getattr(module, "TOOLS")``. Args: target_user_id: User to fully revoke access from. reason: Optional reason string, forwarded to the audit log. ctx: Tool context supplying ``user_id``, ``redis``, and ``config``. Returns: A JSON string: the delegate's results on success, or an error object when Redis is unavailable, the caller lacks ``ALTER_PRIVILEGES``, or the target is an admin. """ user_id = getattr(ctx, "user_id", "") or "" redis = getattr(ctx, "redis", None) config = getattr(ctx, "config", None) if redis is None: return json.dumps({"success": False, "error": "Redis not available"}) # Auth check: caller must have ALTER_PRIVILEGES caller_mask = await get_user_privileges(redis, user_id, config) if not (caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])): return json.dumps( { "success": False, "error": "You do not have the ALTER_PRIVILEGES privilege.", } ) # Safety: refuse to revoke admins if _is_admin(target_user_id, config): return json.dumps( { "success": False, "error": f"Refused: User {target_user_id} is a configured admin.", } ) result = await _stargazer_full_revoke( target_user_id=target_user_id, redis=redis, config=config, caller_id=user_id, reason=reason, ) return json.dumps(result, indent=2) # ------------------------------------------------------------------ # Multi-tool registration # ------------------------------------------------------------------ _ALTER_PRIVILEGES_DESCRIPTION = ( "Grant or revoke privilege bits for a user. Requires ALTER_PRIVILEGES. " "You can only grant or revoke bits that you yourself have enabled. " "Known privileges (name=bit): " + _privileges_catalog_text() ) _PRIVILEGES_PARAM_DESCRIPTION = ( "Privilege names from the registry or decimal bit numbers 0-63. " "Known names: " + _privileges_catalog_text() ) _TOOLS_GET_DESCRIPTION = ( "View the privilege bitmask for a user (hex, decoded names, is_admin). " "Response includes available_privilege_names (full registry). " "Defaults to the calling user if target_user_id is omitted. " "Reading another user requires ALTER_PRIVILEGES." ) _GUILD_PRIV_DESCRIPTION = ( "Grant or revoke privilege bits at GUILD scope. " "Guild-scoped privileges override the global mask for a specific server " "(for bits with NORMAL resolution mode). " "Requires ALTER_PRIVILEGES or GUILD_ADMIN. " "DANGEROUS mode bits (ALTER_PRIVILEGES, UNSANDBOXED_EXEC, SHADOW_BAN_ADMIN) " "cannot be set at guild scope — they only resolve globally. " "Known privileges (name=bit): " + _privileges_catalog_text() ) _CHANNEL_PRIV_DESCRIPTION = ( "Grant or revoke privilege bits at CHANNEL scope. " "Channel-scoped privileges override both global and guild masks " "(for bits with NORMAL resolution mode). " "Requires ALTER_PRIVILEGES, GUILD_ADMIN, or CHANNEL_ADMIN. " "DANGEROUS mode bits (ALTER_PRIVILEGES, UNSANDBOXED_EXEC, SHADOW_BAN_ADMIN) " "cannot be set at channel scope — they only resolve globally. " "Known privileges (name=bit): " + _privileges_catalog_text() ) TOOLS = [ { "name": "alter_privileges", "description": _ALTER_PRIVILEGES_DESCRIPTION, "parameters": { "type": "object", "properties": { "target_user_id": { "type": "string", "description": "Platform user ID of the target user.", }, "action": { "type": "string", "enum": ["grant", "revoke"], "description": "Whether to grant or revoke the specified privileges.", }, "privileges": { "type": "array", "items": {"type": "string"}, "description": _PRIVILEGES_PARAM_DESCRIPTION, }, }, "required": ["target_user_id", "action", "privileges"], }, "handler": _alter_privileges, }, { "name": "get_privileges", "description": _TOOLS_GET_DESCRIPTION, "parameters": { "type": "object", "properties": { "target_user_id": { "type": "string", "description": "Platform user ID to look up. Omit to check your own privileges.", }, }, }, "handler": _get_privileges, }, { "name": "audit_privileges", "description": ( "List users with a non-zero privilege mask in Redis plus admins " "from config (admins implicitly have all bits). Requires " "ALTER_PRIVILEGES. For the canonical name=bit registry, use " "get_privileges or see alter_privileges." ), "parameters": { "type": "object", "properties": {}, }, "handler": _audit_privileges, }, { "name": "alter_guild_privileges", "description": _GUILD_PRIV_DESCRIPTION, "parameters": { "type": "object", "properties": { "target_user_id": { "type": "string", "description": "Platform user ID of the target user.", }, "action": { "type": "string", "enum": ["grant", "revoke"], "description": "Whether to grant or revoke.", }, "privileges": { "type": "array", "items": {"type": "string"}, "description": _PRIVILEGES_PARAM_DESCRIPTION, }, "guild_id": { "type": "string", "description": ( "Target guild/server ID. Defaults to the current " "guild if omitted." ), }, }, "required": ["target_user_id", "action", "privileges"], }, "handler": _alter_guild_privileges, }, { "name": "alter_channel_privileges", "description": _CHANNEL_PRIV_DESCRIPTION, "parameters": { "type": "object", "properties": { "target_user_id": { "type": "string", "description": "Platform user ID of the target user.", }, "action": { "type": "string", "enum": ["grant", "revoke"], "description": "Whether to grant or revoke.", }, "privileges": { "type": "array", "items": {"type": "string"}, "description": _PRIVILEGES_PARAM_DESCRIPTION, }, "guild_id": { "type": "string", "description": ( "Target guild/server ID. Defaults to the current " "guild if omitted." ), }, "channel_id": { "type": "string", "description": ( "Target channel ID. Defaults to the current " "channel if omitted." ), }, }, "required": ["target_user_id", "action", "privileges"], }, "handler": _alter_channel_privileges, }, { "name": "stargazer_full_revoke", "description": ( "Revoke STARGAZER_USE (bit 63) for a user at ALL scopes — global, " "guild, and channel. This is a complete ban that cannot be bypassed " "by any scoped override. Requires ALTER_PRIVILEGES. Cannot target admins." ), "parameters": { "type": "object", "properties": { "target_user_id": { "type": "string", "description": "The user ID to fully revoke access from.", }, "reason": { "type": "string", "description": "Reason for the revoke (logged).", }, }, "required": ["target_user_id"], }, "handler": _stargazer_full_revoke_wrapper, }, { "name": "purge_all_privileges", "description": ( "Wipe the ENTIRE privilege bitmask (all 64 bits) for a user at ALL scopes — " "global, guild, and channel. This removes every granted privilege, not just " "STARGAZER_USE. Requires ALTER_PRIVILEGES. Cannot target configured admins." ), "parameters": { "type": "object", "properties": { "target_user_id": { "type": "string", "description": "The user ID whose entire privilege bitmask will be zeroed.", }, "reason": { "type": "string", "description": "Reason for the purge (logged).", }, }, "required": ["target_user_id"], }, "handler": _purge_all_privileges_wrapper, }, ]