"""User privilege management with a 64-bit bitmask stored in Redis.
Each bit is a named privilege. The canonical registry is :data:`PRIVILEGES`
(name → bit index, 0–63). Admins (``config.admin_user_ids``) implicitly have
all bits set. Unknown set bits are reported as ``bit_N``.
Other modules check privileges with::

    from tools.alter_privileges import has_privilege, PRIVILEGES

Tool-facing descriptions below are built from :data:`PRIVILEGES` so they stay
in sync when bits are added or renamed.
"""
from __future__ import annotations
import jsonutil as json
import logging
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
# Redis key namespace for global (unscoped) per-user masks; ``_redis_key``
# appends ``:<user_id>`` to this prefix.
REDIS_KEY_PREFIX = "stargazer:user_privileges"
# Mask with all 64 privilege bits set; this is what configured admins are
# reported as holding.
ALL_BITS = (1 << 64) - 1
# Canonical registry: privilege name -> bit index (0-63).
# Indices 23-62 are currently unassigned. A set bit that has no entry here is
# rendered as ``bit_N`` when a mask is decoded into names.
PRIVILEGES: dict[str, int] = {
    "ALTER_PRIVILEGES": 0,
    "CORE_MEMORY": 1,
    "UNSANDBOXED_EXEC": 2,
    "SQLITE_CROSS_USER": 3,
    "DUMP_NOTES": 4,
    "DUMP_GOALS": 5,
    "ANCHOR_ADMIN": 6,
    "SUBAGENT_ACCESS": 7,
    "WEB_SEARCH": 8,
    "LONG_TERM_GOALS": 9,
    "READ_DM": 10,
    "CHAT_ANALYTICS": 11,
    "BYPASS_RATELIMIT": 12,
    "READ_TOOL_CODE": 13,
    "SHADOW_BAN_ADMIN": 14,
    "CTX_MANAGE": 15,
    "OPUS_ACCESS": 16,
    "GUILD_ADMIN": 17,
    "CHANNEL_ADMIN": 18,
    "GAME_ADMIN": 19,
    "FILE_OPS_ADMIN": 20,
    "VULTR_ADMIN": 21,
    "ENTRAINMENT_ADMIN": 22,
    "STARGAZER_USE": 63,
}
# ------------------------------------------------------------------
# Per-bit resolution modes
# ------------------------------------------------------------------
# 0 = NORMAL: channel > guild > global (most specific scope wins)
# 1 = INVERTED: global > guild > channel (most general scope wins)
# 2 = DANGEROUS: global only, scoped masks completely ignored
# 3 = ANY: global OR guild OR channel — granted if set at any scope
#
# NOTE: as implemented in ``resolve_privilege_bit``, INVERTED consults only
# the global mask, so today it resolves exactly like DANGEROUS. The modes still
# differ for scoped writes, which are refused only for DANGEROUS bits.
RES_NORMAL = 0
RES_INVERTED = 1
RES_DANGEROUS = 2
RES_ANY = 3
# Bit index -> resolution mode. Only exceptions to the RES_NORMAL default need
# an entry; no bit is currently assigned RES_INVERTED or RES_ANY.
BIT_RESOLUTION_MODE: dict[int, int] = {
    PRIVILEGES["ALTER_PRIVILEGES"]: RES_DANGEROUS,  # 0 — self-escalation
    PRIVILEGES["UNSANDBOXED_EXEC"]: RES_DANGEROUS,  # 2 — code execution
    PRIVILEGES["SHADOW_BAN_ADMIN"]: RES_DANGEROUS,  # 14 — lift own shadow ban
    PRIVILEGES["GUILD_ADMIN"]: RES_DANGEROUS,  # 17 — scoped self-escalation
    PRIVILEGES["CHANNEL_ADMIN"]: RES_DANGEROUS,  # 18 — scoped self-escalation
    PRIVILEGES["FILE_OPS_ADMIN"]: RES_DANGEROUS,  # 20 — filesystem access
    PRIVILEGES["VULTR_ADMIN"]: RES_DANGEROUS,  # 21 — vultr access
    # Listed explicitly for visibility even though RES_NORMAL is the default.
    PRIVILEGES[
        "STARGAZER_USE"
    ]: RES_NORMAL,  # 63 — access gate: channel > guild > global
    # All unlisted bits default to RES_NORMAL
}
def _bit_mode(bit: int) -> int:
    """Look up how a privilege bit combines its global/guild/channel masks.

    Bits absent from :data:`BIT_RESOLUTION_MODE` resolve as ``RES_NORMAL``
    (channel overrides guild overrides global), so a newly registered
    privilege needs no table entry to get the default behaviour.

    Args:
        bit: Privilege bit index (0-63).

    Returns:
        One of ``RES_NORMAL``, ``RES_INVERTED``, ``RES_DANGEROUS`` or
        ``RES_ANY``.
    """
    try:
        return BIT_RESOLUTION_MODE[bit]
    except KeyError:
        # Unlisted bit: fall back to most-specific-scope-wins.
        return RES_NORMAL
# Bit indices whose resolution mode is RES_DANGEROUS. Derived from the mode
# table instead of being listed by hand; the guild/channel alter handlers use
# it to refuse scoped writes of these bits.
DANGEROUS_BITS: frozenset[int] = frozenset(
    filter(
        lambda candidate: BIT_RESOLUTION_MODE[candidate] == RES_DANGEROUS,
        BIT_RESOLUTION_MODE,
    )
)

# Redis key namespaces for guild- and channel-scoped masks.
GUILD_KEY_PREFIX = "stargazer:guild_privileges"
CHANNEL_KEY_PREFIX = "stargazer:channel_privileges"

# Guild in which OPUS_ACCESS is granted unconditionally.
_OPUS_HARDCODE_GUILD = "1405695643913027737"

# Reverse registry (bit index -> privilege name) used when decoding masks.
_REVERSE: dict[int, str] = dict(zip(PRIVILEGES.values(), PRIVILEGES.keys()))
def _privileges_catalog_text() -> str:
    """Render :data:`PRIVILEGES` as a ``NAME=bit`` list ordered by bit index.

    The result is a single comma-separated line intended for embedding in
    human- or LLM-facing descriptions, so that such text is generated from
    the registry instead of being maintained by hand.

    Returns:
        A string such as ``ALTER_PRIVILEGES=0, CORE_MEMORY=1, ...``.
    """
    # sorted() is stable, so names sharing an index keep registry order.
    ordered_names = sorted(PRIVILEGES, key=PRIVILEGES.__getitem__)
    return ", ".join(f"{name}={PRIVILEGES[name]}" for name in ordered_names)
def _redis_key(user_id: str) -> str:
    """Return the Redis key holding a user's *global* privilege mask.

    The key is :data:`REDIS_KEY_PREFIX` followed by ``:`` and the user ID.
    Guild- and channel-scoped masks live under different prefixes and are
    addressed by :func:`_scoped_redis_key` instead.

    Args:
        user_id: Platform user ID.

    Returns:
        The fully qualified Redis key string.
    """
    return REDIS_KEY_PREFIX + ":" + str(user_id)
def _is_admin(user_id: str, config: Any) -> bool:
    """Tell whether ``user_id`` is listed in ``config.admin_user_ids``.

    Purely an in-memory check: no Redis access. A missing config, a config
    without the attribute, or an empty/``None`` admin list all yield False.

    Args:
        user_id: Platform user ID to look up.
        config: Object exposing ``admin_user_ids``; may be ``None``.

    Returns:
        True when the ID appears in the configured admin list, else False.
    """
    configured = None
    if config is not None:
        configured = getattr(config, "admin_user_ids", None)
    # Falsy values (None, empty list) are treated as "no admins".
    return user_id in (configured or [])
# [docs]
async def get_user_privileges(redis: Any, user_id: str, config: Any = None) -> int:
    """Fetch the global privilege bitmask for ``user_id``.

    Configured admins short-circuit to :data:`ALL_BITS` without touching
    Redis. Otherwise the mask is read from the user's global key; a missing
    Redis client or a missing key both mean "no privileges".

    Args:
        redis: Async Redis client, or ``None``.
        user_id: Platform user ID.
        config: Optional config object used for the admin check.

    Returns:
        The stored mask as an ``int``, :data:`ALL_BITS` for admins, or ``0``.
    """
    if _is_admin(user_id, config):
        return ALL_BITS
    if redis is None:
        return 0
    stored = await redis.get(_redis_key(user_id))
    return 0 if stored is None else int(stored)
# [docs]
async def has_privilege(
    redis: Any,
    user_id: str,
    bit: int,
    config: Any = None,
    user_aliases: list[str] | None = None,
) -> bool:
    """Check one privilege bit against the global mask of a user or its aliases.

    Every identity in ``user_aliases`` (or just ``user_id`` when no aliases
    are supplied) is stripped of an optional ``platform:`` prefix and looked
    up with :func:`get_user_privileges`. The first identity whose mask has
    ``bit`` set grants access. Only global masks are consulted; guild and
    channel overrides are ignored here.

    Args:
        redis: Async Redis client used to read each identity's mask.
        user_id: Primary user ID, used when ``user_aliases`` is empty.
        bit: Zero-based bit index to test.
        config: Optional config forwarded for the admin short-circuit.
        user_aliases: Optional list of equivalent identities.

    Returns:
        True if any checked identity holds the bit, else False.
    """
    for candidate in user_aliases or [user_id]:
        # "platform:id" -> "id"; strings without a colon are used unchanged.
        _, separator, remainder = candidate.partition(":")
        identity = remainder if separator else candidate
        held = await get_user_privileges(redis, identity, config)
        if held & (1 << bit):
            return True
    return False
# ------------------------------------------------------------------
# Scoped privilege resolution (global < guild < channel)
# ------------------------------------------------------------------
# Placeholder used in Redis keys when no guild_id is available (e.g. Matrix rooms, DMs).
# ``_scoped_redis_key`` substitutes it for an empty guild_id so channel-level
# masks can still be addressed.
_NO_GUILD_SENTINEL = "__global__"
def _scoped_redis_key(
    scope: str,
    guild_id: str,
    channel_id: str | None,
    user_id: str,
) -> str:
    """Compose the Redis key for a guild- or channel-scoped mask.

    An empty ``guild_id`` is replaced by :data:`_NO_GUILD_SENTINEL`, so
    channel masks remain addressable where no guild exists. A channel key is
    produced only when ``scope`` is ``'channel'`` *and* a channel ID is
    given; every other combination yields the guild key.

    Args:
        scope: ``'guild'`` or ``'channel'``.
        guild_id: Guild/server ID, possibly empty.
        channel_id: Channel ID; only used for channel scope.
        user_id: Platform user ID.

    Returns:
        The fully qualified Redis key string.
    """
    namespace = guild_id if guild_id else _NO_GUILD_SENTINEL
    wants_channel_key = scope == "channel" and bool(channel_id)
    if not wants_channel_key:
        return f"{GUILD_KEY_PREFIX}:{namespace}:{user_id}"
    return f"{CHANNEL_KEY_PREFIX}:{namespace}:{channel_id}:{user_id}"
async def _get_scoped_mask(
    redis: Any,
    scope: str,
    guild_id: str,
    channel_id: str | None,
    user_id: str,
) -> int | None:
    """Read a guild- or channel-scoped mask, preserving "absent" vs. zero.

    ``None`` means no scoped override is stored (or no Redis client is
    available), which lets resolvers fall through to a broader scope. A
    stored ``0`` is returned as ``0`` and is a genuine override.

    Args:
        redis: Async Redis client, or ``None``.
        scope: ``'guild'`` or ``'channel'``.
        guild_id: Guild/server ID (empty selects the sentinel namespace).
        channel_id: Channel ID; only used for channel scope.
        user_id: Platform user ID.

    Returns:
        The stored mask as an ``int``, or ``None`` when no key exists.
    """
    if redis is None:
        return None
    stored = await redis.get(
        _scoped_redis_key(scope, guild_id, channel_id, user_id)
    )
    return int(stored) if stored is not None else None
async def _set_scoped_mask(
    redis: Any,
    scope: str,
    guild_id: str,
    channel_id: str | None,
    user_id: str,
    mask: int,
) -> None:
    """Store ``mask`` as the user's guild- or channel-scoped override.

    The value is written as a decimal string under the key built by
    :func:`_scoped_redis_key`, replacing any previous override. Nothing
    happens when ``redis`` is ``None``.

    Args:
        redis: Async Redis client, or ``None`` (no-op).
        scope: ``'guild'`` or ``'channel'``.
        guild_id: Guild/server ID (empty selects the sentinel namespace).
        channel_id: Channel ID; only used for channel scope.
        user_id: Platform user ID.
        mask: Bitmask to persist.
    """
    if redis is not None:
        destination = _scoped_redis_key(scope, guild_id, channel_id, user_id)
        await redis.set(destination, str(mask))
# [docs]
def resolve_privilege_bit(
    bit: int,
    global_mask: int,
    guild_mask: int | None,
    channel_mask: int | None,
) -> bool:
    """Decide whether ``bit`` is granted given the three scope masks.

    The bit's mode (see :func:`_bit_mode`) selects the rule:

    * ``RES_DANGEROUS`` -- only the global mask counts.
    * ``RES_INVERTED`` -- most general scope wins; because a global mask is
      always supplied, this also reduces to the global mask alone.
    * ``RES_ANY`` -- granted when the bit is set in any supplied mask.
    * ``RES_NORMAL`` -- the most specific *present* mask decides
      (channel, then guild, then global). A present mask with the bit
      cleared therefore denies, even if a broader scope grants it.

    Args:
        bit: Privilege bit index (0-63).
        global_mask: The user's global mask.
        guild_mask: Guild-scoped mask, or ``None`` when no override exists.
        channel_mask: Channel-scoped mask, or ``None`` when none exists.

    Returns:
        True when the bit resolves as granted, else False.
    """
    mode = _bit_mode(bit)
    flag = 1 << bit
    set_globally = bool(global_mask & flag)

    if mode in (RES_DANGEROUS, RES_INVERTED):
        # Scoped masks can never escalate these bits.
        return set_globally

    if mode == RES_ANY:
        return set_globally or any(
            scoped is not None and bool(scoped & flag)
            for scoped in (guild_mask, channel_mask)
        )

    # RES_NORMAL: first present mask, most specific first, has the last word.
    for scoped in (channel_mask, guild_mask):
        if scoped is not None:
            return bool(scoped & flag)
    return set_globally
# [docs]
async def has_scoped_privilege(
    redis: Any,
    user_id: str,
    bit: int,
    config: Any = None,
    guild_id: str | None = None,
    channel_id: str | None = None,
    user_aliases: list[str] | None = None,
) -> bool:
    """Scope-aware privilege check over a user and all linked aliases.

    Each identity in ``user_aliases`` (or just ``user_id`` when none are
    given) has any ``platform:`` prefix removed and is then evaluated with
    :func:`_has_scoped_privilege_single`, which applies the bit's resolution
    mode to the global, guild and channel masks. Evaluation stops at the
    first identity that passes.

    Args:
        redis: Async Redis client used to read the masks.
        user_id: Platform user ID, used when ``user_aliases`` is empty.
        bit: Privilege bit index (0-63).
        config: Optional config for the admin short-circuit.
        guild_id: Optional guild/server ID for scoped resolution.
        channel_id: Optional channel ID for the most specific scope.
        user_aliases: Optional full alias list to test.

    Returns:
        True if any identity holds the bit under its resolution mode.
    """
    for candidate in user_aliases or [user_id]:
        # "platform:id" -> "id"; strings without a colon are used unchanged.
        _, separator, remainder = candidate.partition(":")
        identity = remainder if separator else candidate
        granted = await _has_scoped_privilege_single(
            redis, identity, bit, config, guild_id, channel_id
        )
        if granted:
            return True
    return False
async def _has_scoped_privilege_single(
    redis: Any,
    user_id: str,
    bit: int,
    config: Any = None,
    guild_id: str | None = None,
    channel_id: str | None = None,
) -> bool:
    """Resolve one privilege bit for one identity across all scopes.

    Order of evaluation:

    1. Configured admins always pass.
    2. ``OPUS_ACCESS`` is granted unconditionally inside the hard-coded
       guild :data:`_OPUS_HARDCODE_GUILD`.
    3. Otherwise the global mask is loaded and, unless the bit is
       ``RES_DANGEROUS``, the guild mask (when ``guild_id`` is set) and the
       channel mask (when ``channel_id`` is set) are loaded as well. With no
       guild, the channel mask is looked up under the no-guild sentinel.
    4. :func:`resolve_privilege_bit` combines the masks; the outcome is
       logged at DEBUG level.

    Args:
        redis: Async Redis client.
        user_id: Platform user ID (already stripped of any platform prefix).
        bit: Privilege bit index (0-63).
        config: Optional config for the admin short-circuit.
        guild_id: Optional guild/server ID.
        channel_id: Optional channel ID.

    Returns:
        True when the bit is granted, else False.
    """
    if _is_admin(user_id, config):
        return True
    if guild_id == _OPUS_HARDCODE_GUILD and bit == PRIVILEGES["OPUS_ACCESS"]:
        return True

    global_mask = await get_user_privileges(redis, user_id, config)
    guild_mask: int | None = None
    channel_mask: int | None = None

    # DANGEROUS bits resolve globally only, so skip the scoped reads entirely.
    scoped_lookups_apply = _bit_mode(bit) != RES_DANGEROUS
    if scoped_lookups_apply and guild_id:
        guild_mask = await _get_scoped_mask(redis, "guild", guild_id, None, user_id)
    if scoped_lookups_apply and channel_id:
        # An empty guild maps to the sentinel namespace inside the key builder.
        channel_mask = await _get_scoped_mask(
            redis, "channel", guild_id or "", channel_id, user_id
        )

    granted = resolve_privilege_bit(bit, global_mask, guild_mask, channel_mask)

    def _show(mask: int | None) -> str:
        return "None" if mask is None else hex(mask)

    logger.debug(
        "[privilege] user=%s bit=%s(%d) guild=%r channel=%r "
        "global=0x%x guild_mask=%s channel_mask=%s -> %s",
        user_id,
        _REVERSE.get(bit, f"bit_{bit}"),
        bit,
        guild_id or "(none)",
        channel_id or "(none)",
        global_mask,
        _show(guild_mask),
        _show(channel_mask),
        granted,
    )
    return granted
async def _can_alter_scope(
    redis: Any,
    user_id: str,
    scope: str,
    config: Any = None,
    guild_id: str | None = None,
) -> bool:
    """Check whether ``user_id`` may modify masks at ``scope``.

    * Admins and holders of global ``ALTER_PRIVILEGES`` may alter any scope.
    * ``'global'`` scope has no other route.
    * ``'guild'`` and ``'channel'`` scope are additionally allowed when the
      user's *guild-scoped* mask for ``guild_id`` has ``GUILD_ADMIN`` set.
    * A direct ``CHANNEL_ADMIN`` check is not performed here; callers are
      expected to handle it.
    * Any other scope string is refused.

    Args:
        redis: Async Redis client.
        user_id: Platform user ID of the would-be editor.
        scope: ``'global'``, ``'guild'`` or ``'channel'``.
        config: Optional config for the admin short-circuit.
        guild_id: Guild whose scoped mask is consulted for ``GUILD_ADMIN``.

    Returns:
        True when the user may alter masks at that scope, else False.
    """
    if _is_admin(user_id, config):
        return True
    if await has_privilege(redis, user_id, PRIVILEGES["ALTER_PRIVILEGES"], config):
        return True

    # Only guild/channel scope have a fallback, and it needs a guild to look in.
    if scope not in ("guild", "channel") or not guild_id:
        return False

    guild_scoped = await _get_scoped_mask(redis, "guild", guild_id, None, user_id)
    if guild_scoped is None:
        return False
    # GUILD_ADMIN implies CHANNEL_ADMIN, hence the shared check for both scopes.
    return bool(guild_scoped & (1 << PRIVILEGES["GUILD_ADMIN"]))
def _resolve_privileges(names: list) -> tuple[int, list[str]]:
    """Convert privilege names and/or bit numbers into a bitmask.

    Accepted entries:

    * an ``int`` bit index in ``0..63``;
    * a string of digits naming such an index (surrounding whitespace is
      ignored);
    * a privilege name from :data:`PRIVILEGES`, case-insensitive, with
      surrounding whitespace ignored.

    Everything else is collected, as ``str(entry)``, in the list of bad names.
    In particular:

    * ``bool`` values are rejected rather than being read as bit 0/1, so a
      stray JSON ``true`` cannot grant a privilege.
    * Digit-like characters that ``int()`` cannot parse (e.g. ``"²"``) are
      rejected instead of raising ``ValueError``.
    * A bare string or int passed instead of a list is treated as a single
      entry, so ``"12"`` means bit 12 rather than bits 1 and 2.
    * ``None`` is treated as an empty list.

    Args:
        names: Iterable of names / bit numbers (or a single such value).

    Returns:
        ``(bitmask, list_of_bad_names)``.
    """
    if names is None:
        return 0, []
    if isinstance(names, (str, int)):
        # Guard against per-character iteration of a bare string.
        names = [names]

    mask = 0
    bad: list[str] = []
    for entry in names:
        bit: int | None = None
        if isinstance(entry, bool):
            # bool is an int subclass; never interpret it as a bit index.
            bit = None
        elif isinstance(entry, int):
            bit = entry
        elif isinstance(entry, str):
            token = entry.strip()
            if token.isdigit():
                try:
                    bit = int(token)
                except ValueError:
                    # isdigit() is True for characters such as superscripts
                    # that int() refuses to parse.
                    bit = None
            else:
                bit = PRIVILEGES.get(token.upper())
        else:
            bit = PRIVILEGES.get(str(entry).upper())

        if bit is not None and 0 <= bit < 64:
            mask |= 1 << bit
        else:
            bad.append(str(entry))
    return mask, bad
def _mask_to_names(mask: int) -> list[str]:
    """Translate a privilege bitmask into privilege names.

    Bits 0-63 are inspected in ascending order. A set bit that is registered
    in :data:`_REVERSE` yields its name; an unregistered set bit yields
    ``bit_N`` so that unknown grants stay visible.

    Args:
        mask: Privilege bitmask.

    Returns:
        One name per set bit, ordered by bit index.
    """
    return [
        _REVERSE.get(position, f"bit_{position}")
        for position in range(64)
        if mask & (1 << position)
    ]
# ------------------------------------------------------------------
# Tool handlers
# ------------------------------------------------------------------
async def _alter_privileges(
    target_user_id: str,
    action: str,
    privileges: list,
    ctx: ToolContext | None = None,
) -> str:
    """Grant or revoke bits on *target_user_id*'s stored global mask.

    The caller (``ctx.user_id``) must hold ``ALTER_PRIVILEGES``. Only bits
    present in the caller's own mask are applied; requested bits the caller
    lacks are reported under ``denied`` and left untouched on the target.
    The target's *stored* mask is edited (the admin short-circuit is not
    applied to the target), and the result is always written back.

    Args:
        target_user_id: User whose global mask is modified; must be non-empty.
        action: ``'grant'`` or ``'revoke'`` (case-insensitive).
        privileges: Privilege names and/or bit numbers.
        ctx: Tool context supplying ``user_id``, ``redis`` and ``config``.

    Returns:
        A JSON string. On success it carries ``previous``/``new`` hex masks
        and ``applied`` names, plus ``denied`` and ``unrecognized`` when
        relevant; on failure ``success`` is false and ``error`` explains why.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)
    if redis is None:
        return json.dumps({"success": False, "error": "Redis not available"})

    # An empty target would create a junk "<prefix>:" key in Redis.
    if isinstance(target_user_id, str):
        target_user_id = target_user_id.strip()
    if not target_user_id:
        return json.dumps({"success": False, "error": "target_user_id is required."})

    caller_mask = await get_user_privileges(redis, user_id, config)
    if not (caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])):
        return json.dumps(
            {
                "success": False,
                "error": "You do not have the ALTER_PRIVILEGES privilege.",
            }
        )

    requested_mask, bad_names = _resolve_privileges(privileges)
    if requested_mask == 0 and not bad_names:
        return json.dumps({"success": False, "error": "No valid privileges specified."})

    # Validate the action before touching the target's record. The mapping
    # also supplies the past tense for the audit log.
    action = str(action or "").lower()
    past_tense = {"grant": "granted", "revoke": "revoked"}.get(action)
    if past_tense is None:
        return json.dumps(
            {
                "success": False,
                "error": f"Unknown action '{action}'. Use 'grant' or 'revoke'.",
            }
        )

    # You cannot grant (or revoke) what you do not hold yourself.
    allowed_mask = requested_mask & caller_mask
    denied_mask = requested_mask & ~caller_mask

    # config=None: edit the stored mask, not the implicit admin mask.
    target_mask = await get_user_privileges(redis, target_user_id, config=None)
    if action == "grant":
        new_mask = target_mask | allowed_mask
    else:
        new_mask = target_mask & ~allowed_mask
    await redis.set(_redis_key(target_user_id), str(new_mask))

    result: dict[str, Any] = {
        "success": True,
        "action": action,
        "target_user_id": target_user_id,
        "previous": hex(target_mask),
        "new": hex(new_mask),
        "applied": _mask_to_names(allowed_mask),
    }
    if denied_mask:
        result["denied"] = _mask_to_names(denied_mask)
        result["denied_reason"] = "You do not have these privileges yourself."
    if bad_names:
        result["unrecognized"] = bad_names

    logger.info(
        "User %s %s privileges for %s: %s -> %s (applied=%s denied=%s)",
        user_id,
        past_tense,
        target_user_id,
        hex(target_mask),
        hex(new_mask),
        hex(allowed_mask),
        hex(denied_mask),
    )
    return json.dumps(result, indent=2)
async def _get_privileges(
    target_user_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Report a user's global mask as hex, decimal and decoded names.

    With no (or a blank) ``target_user_id`` the caller inspects themselves.
    Inspecting anyone else requires the caller to hold ``ALTER_PRIVILEGES``.
    The response also carries the target's admin flag and the list of all
    registered privilege names.

    Args:
        target_user_id: User to inspect; blank means the caller.
        ctx: Tool context supplying ``user_id``, ``redis`` and ``config``.

    Returns:
        A JSON string with the mask details, or an error object when Redis
        is unavailable or the caller is not allowed to read the target.
    """
    caller_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)
    if redis is None:
        return json.dumps({"success": False, "error": "Redis not available"})

    subject = (target_user_id or "").strip()
    if not subject:
        subject = caller_id

    if subject != caller_id:
        # Reading someone else's mask is a privileged operation.
        caller_mask = await get_user_privileges(redis, caller_id, config)
        alter_flag = 1 << PRIVILEGES["ALTER_PRIVILEGES"]
        if (caller_mask & alter_flag) == 0:
            denial = {
                "success": False,
                "error": "Reading another user's privileges requires ALTER_PRIVILEGES.",
            }
            return json.dumps(denial)

    subject_mask = await get_user_privileges(redis, subject, config)
    report = {
        "success": True,
        "user_id": subject,
        "mask_hex": hex(subject_mask),
        "mask_decimal": subject_mask,
        "privileges": _mask_to_names(subject_mask),
        "is_admin": _is_admin(subject, config),
        "available_privilege_names": list(PRIVILEGES),
    }
    return json.dumps(report, indent=2)
async def _audit_privileges(
    ctx: ToolContext | None = None,
) -> str:
    """Report every user holding a non-zero global privilege mask.

    After confirming that the caller holds ``ALTER_PRIVILEGES``, this SCANs
    Redis for all ``stargazer:user_privileges:*`` keys and decodes each
    non-zero mask. Configured ``admin_user_ids`` are then merged in, so
    admins appear even without a stored entry (shown with all bits). No
    writes are performed.

    A stored value that cannot be parsed as an integer no longer aborts the
    audit: the key is skipped, a warning is logged, and the affected user ID
    is listed under ``unreadable_user_ids`` in the response.

    Args:
        ctx: Tool context supplying ``user_id``, ``redis`` and ``config``.

    Returns:
        A JSON string with ``total_privileged_users`` and a ``users`` map of
        user ID to mask hex, decoded names and admin flag (plus
        ``unreadable_user_ids`` when any value was skipped); or an error
        object when Redis is unavailable or the caller lacks
        ``ALTER_PRIVILEGES``.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)
    if redis is None:
        return json.dumps({"success": False, "error": "Redis not available"})

    # Caller must have ALTER_PRIVILEGES to see the full audit.
    caller_mask = await get_user_privileges(redis, user_id, config)
    if not (caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])):
        return json.dumps(
            {
                "success": False,
                "error": "You do not have the ALTER_PRIVILEGES privilege.",
            }
        )

    users: dict[str, Any] = {}
    unreadable: list[str] = []

    # 1. Scan Redis for every stored privilege bitmask.
    prefix = f"{REDIS_KEY_PREFIX}:"
    pattern = f"{prefix}*"
    cursor: Any = b"0"
    while True:
        cursor, keys = await redis.scan(cursor, match=pattern, count=200)
        for key in keys:
            key_str = key.decode() if isinstance(key, bytes) else key
            uid = key_str[len(prefix):]
            raw = await redis.get(key)
            if raw is None:
                # Key vanished between SCAN and GET.
                continue
            try:
                mask = int(raw)
            except (TypeError, ValueError):
                logger.warning(
                    "Skipping unreadable privilege mask at %s: %r", key_str, raw
                )
                unreadable.append(uid)
                continue
            if mask == 0:
                continue
            users[uid] = {
                "mask_hex": hex(mask),
                "privileges": _mask_to_names(mask),
                "is_admin": _is_admin(uid, config),
            }
        # Clients may hand the cursor back as int, bytes or str; normalise it
        # so the scan reliably stops when the cursor returns to zero.
        if int(cursor) == 0:
            break

    # 2. Ensure admins from config are included even if they have
    #    no explicit Redis entry (they implicitly have ALL bits).
    admin_ids = getattr(config, "admin_user_ids", None) or []
    for aid in admin_ids:
        if aid not in users:
            users[aid] = {
                "mask_hex": hex(ALL_BITS),
                "privileges": ["ALL (admin)"],
                "is_admin": True,
            }
        else:
            users[aid]["is_admin"] = True

    report: dict[str, Any] = {
        "success": True,
        "total_privileged_users": len(users),
        "users": users,
    }
    if unreadable:
        report["unreadable_user_ids"] = unreadable
    return json.dumps(report, indent=2)
async def _alter_guild_privileges(
    target_user_id: str,
    action: str,
    privileges: list,
    guild_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Grant or revoke privilege bits at guild scope.

    Writes to ``stargazer:guild_privileges:{guild_id}:{user_id}``.

    The caller needs ``ALTER_PRIVILEGES`` or ``GUILD_ADMIN`` in their global
    mask, and can only change bits they hold themselves. Any bit listed in
    :data:`DANGEROUS_BITS` (resolution mode DANGEROUS) is rejected outright,
    because those bits resolve at global scope only.

    When the target has no guild mask yet, the new mask starts from the
    target's stored global mask, so that creating the override does not
    silently drop privileges the user already has.

    Args:
        target_user_id: User whose guild mask is modified; must be non-empty.
        action: ``'grant'`` or ``'revoke'`` (case-insensitive).
        privileges: Privilege names and/or bit numbers.
        guild_id: Guild to modify; defaults to ``ctx.guild_id``.
        ctx: Tool context supplying ``user_id``, ``redis``, ``config`` and
            optionally ``guild_id``.

    Returns:
        A JSON string describing the change, or an error object.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)
    if redis is None:
        return json.dumps({"success": False, "error": "Redis not available"})

    # An empty target would create a junk key ending in ":".
    if isinstance(target_user_id, str):
        target_user_id = target_user_id.strip()
    if not target_user_id:
        return json.dumps({"success": False, "error": "target_user_id is required."})

    # Resolve guild_id from context if not provided
    guild_id = (guild_id or "").strip()
    if not guild_id:
        guild_id = getattr(ctx, "guild_id", "") or ""
    if not guild_id:
        return json.dumps(
            {
                "success": False,
                "error": "guild_id is required (not in a guild context).",
            }
        )

    # Caller must have ALTER_PRIVILEGES or GUILD_ADMIN
    caller_mask = await get_user_privileges(redis, user_id, config)
    has_alter = bool(caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"]))
    has_guild_admin = bool(caller_mask & (1 << PRIVILEGES["GUILD_ADMIN"]))
    if not (has_alter or has_guild_admin):
        return json.dumps(
            {
                "success": False,
                "error": "Requires ALTER_PRIVILEGES or GUILD_ADMIN.",
            }
        )

    requested_mask, bad_names = _resolve_privileges(privileges)
    if requested_mask == 0 and not bad_names:
        return json.dumps({"success": False, "error": "No valid privileges specified."})

    # Block dangerous bits at scoped level
    dangerous_requested = requested_mask & sum(1 << b for b in DANGEROUS_BITS)
    if dangerous_requested:
        return json.dumps(
            {
                "success": False,
                "error": (
                    f"Bits {_mask_to_names(dangerous_requested)} are DANGEROUS_BITS "
                    "and can only be set at global scope."
                ),
            }
        )

    # Validate the action before reading the target's masks. The mapping also
    # supplies the past tense for the audit log.
    action = str(action or "").lower()
    past_tense = {"grant": "granted", "revoke": "revoked"}.get(action)
    if past_tense is None:
        return json.dumps({"success": False, "error": f"Unknown action '{action}'."})

    # Scope: caller can only grant what they have
    allowed_mask = requested_mask & caller_mask
    denied_mask = requested_mask & ~caller_mask

    current = await _get_scoped_mask(redis, "guild", guild_id, None, target_user_id)
    if current is not None:
        target_mask = current
    else:
        # Inherit the stored global mask so granting one bit doesn't silently
        # revoke all others via the RES_NORMAL specific-scope-wins logic.
        # NOTE(review): this snapshot also copies DANGEROUS bits into the
        # guild mask; resolution ignores them there, but any code reading the
        # raw guild mask would see a stale copy — confirm this is intended.
        target_mask = await get_user_privileges(redis, target_user_id, config=None)

    if action == "grant":
        new_mask = target_mask | allowed_mask
    else:
        new_mask = target_mask & ~allowed_mask
    await _set_scoped_mask(redis, "guild", guild_id, None, target_user_id, new_mask)

    result: dict[str, Any] = {
        "success": True,
        "scope": "guild",
        "guild_id": guild_id,
        "action": action,
        "target_user_id": target_user_id,
        "previous": hex(target_mask),
        "new": hex(new_mask),
        "applied": _mask_to_names(allowed_mask),
    }
    if denied_mask:
        result["denied"] = _mask_to_names(denied_mask)
    if bad_names:
        result["unrecognized"] = bad_names

    logger.info(
        "Guild %s: user %s %s scoped privileges for %s: %s -> %s",
        guild_id,
        user_id,
        past_tense,
        target_user_id,
        hex(target_mask),
        hex(new_mask),
    )
    return json.dumps(result, indent=2)
async def _alter_channel_privileges(
    target_user_id: str,
    action: str,
    privileges: list,
    guild_id: str = "",
    channel_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Grant or revoke privilege bits at channel scope.

    Writes to ``stargazer:channel_privileges:{guild_id}:{channel_id}:{user_id}``.

    The caller needs ``ALTER_PRIVILEGES``, ``GUILD_ADMIN`` or
    ``CHANNEL_ADMIN`` in their global mask, and can only change bits they
    hold themselves. Any bit listed in :data:`DANGEROUS_BITS` (resolution
    mode DANGEROUS) is rejected, because those bits resolve globally only.

    When the target has no channel mask yet, the new mask starts from the
    target's guild mask, or from the stored global mask if there is no guild
    mask either, so that creating the override does not silently drop
    existing privileges.

    Args:
        target_user_id: User whose channel mask is modified; must be non-empty.
        action: ``'grant'`` or ``'revoke'`` (case-insensitive).
        privileges: Privilege names and/or bit numbers.
        guild_id: Guild containing the channel; defaults to ``ctx.guild_id``.
        channel_id: Channel to modify; defaults to ``ctx.channel_id``.
        ctx: Tool context supplying ``user_id``, ``redis``, ``config`` and
            optionally ``guild_id`` / ``channel_id``.

    Returns:
        A JSON string describing the change, or an error object.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)
    if redis is None:
        return json.dumps({"success": False, "error": "Redis not available"})

    # An empty target would create a junk key ending in ":".
    if isinstance(target_user_id, str):
        target_user_id = target_user_id.strip()
    if not target_user_id:
        return json.dumps({"success": False, "error": "target_user_id is required."})

    # NOTE(review): the resolver supports channel masks without a guild (via
    # the no-guild sentinel), but this handler still insists on a guild_id —
    # confirm whether guild-less channels should be writable here.
    guild_id = (guild_id or "").strip()
    if not guild_id:
        guild_id = getattr(ctx, "guild_id", "") or ""
    if not guild_id:
        return json.dumps(
            {
                "success": False,
                "error": "guild_id is required (not in a guild context).",
            }
        )

    channel_id = (channel_id or "").strip()
    if not channel_id:
        channel_id = getattr(ctx, "channel_id", "") or ""
    if not channel_id:
        return json.dumps(
            {
                "success": False,
                "error": "channel_id is required.",
            }
        )

    # Caller must have ALTER_PRIVILEGES, GUILD_ADMIN, or CHANNEL_ADMIN
    caller_mask = await get_user_privileges(redis, user_id, config)
    has_alter = bool(caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"]))
    has_guild_admin = bool(caller_mask & (1 << PRIVILEGES["GUILD_ADMIN"]))
    has_channel_admin = bool(caller_mask & (1 << PRIVILEGES["CHANNEL_ADMIN"]))
    if not (has_alter or has_guild_admin or has_channel_admin):
        return json.dumps(
            {
                "success": False,
                "error": "Requires ALTER_PRIVILEGES, GUILD_ADMIN, or CHANNEL_ADMIN.",
            }
        )

    requested_mask, bad_names = _resolve_privileges(privileges)
    if requested_mask == 0 and not bad_names:
        return json.dumps({"success": False, "error": "No valid privileges specified."})

    # Block dangerous bits at scoped level
    dangerous_requested = requested_mask & sum(1 << b for b in DANGEROUS_BITS)
    if dangerous_requested:
        return json.dumps(
            {
                "success": False,
                "error": (
                    f"Bits {_mask_to_names(dangerous_requested)} are DANGEROUS_BITS "
                    "and can only be set at global scope."
                ),
            }
        )

    # Validate the action before reading the target's masks. The mapping also
    # supplies the past tense for the audit log.
    action = str(action or "").lower()
    past_tense = {"grant": "granted", "revoke": "revoked"}.get(action)
    if past_tense is None:
        return json.dumps({"success": False, "error": f"Unknown action '{action}'."})

    # Caller can only grant what they have.
    allowed_mask = requested_mask & caller_mask
    denied_mask = requested_mask & ~caller_mask

    current = await _get_scoped_mask(
        redis, "channel", guild_id, channel_id, target_user_id
    )
    if current is not None:
        target_mask = current
    else:
        # Inherit the guild mask (falling back to global) so granting one
        # channel-level bit doesn't silently revoke all others.
        inherited = await _get_scoped_mask(
            redis, "guild", guild_id, None, target_user_id
        )
        if inherited is not None:
            target_mask = inherited
        else:
            target_mask = await get_user_privileges(
                redis, target_user_id, config=None
            )

    if action == "grant":
        new_mask = target_mask | allowed_mask
    else:
        new_mask = target_mask & ~allowed_mask
    await _set_scoped_mask(
        redis, "channel", guild_id, channel_id, target_user_id, new_mask
    )

    result: dict[str, Any] = {
        "success": True,
        "scope": "channel",
        "guild_id": guild_id,
        "channel_id": channel_id,
        "action": action,
        "target_user_id": target_user_id,
        "previous": hex(target_mask),
        "new": hex(new_mask),
        "applied": _mask_to_names(allowed_mask),
    }
    if denied_mask:
        result["denied"] = _mask_to_names(denied_mask)
    if bad_names:
        result["unrecognized"] = bad_names

    logger.info(
        "Channel %s/%s: user %s %s scoped privileges for %s: %s -> %s",
        guild_id,
        channel_id,
        user_id,
        past_tense,
        target_user_id,
        hex(target_mask),
        hex(new_mask),
    )
    return json.dumps(result, indent=2)
async def _clear_privilege_flag_at_key(redis: Any, key: Any, flag: int) -> bool:
    """Clear ``flag`` in the integer mask stored at ``key``.

    Reads the value with GET and, only when the key exists and currently has
    ``flag`` set, writes the mask back with that bit removed. The GET/SET pair
    is a plain read-modify-write and is not atomic.

    Args:
        redis: Async Redis client exposing ``get`` and ``set``.
        key: Redis key (``str`` or ``bytes``) holding a decimal integer mask.
        flag: Bit mask to clear from the stored value.

    Returns:
        ``True`` if the key existed with ``flag`` set and was rewritten,
        ``False`` if the key was missing or the flag was already clear.
    """
    raw = await redis.get(key)
    if raw is None:
        return False
    mask = int(raw)
    if not mask & flag:
        return False
    await redis.set(key, str(mask & ~flag))
    return True


async def _stargazer_full_revoke(
    target_user_id: str,
    *,
    redis: Any,
    config: Any,
    caller_id: str,
    reason: str = "",
) -> dict[str, Any]:
    """Strip ``STARGAZER_USE`` (bit 63) from a user at every scope.

    Clears bit 63 in the user's global mask, then SCANs every guild- and
    channel-scoped key (``{GUILD_KEY_PREFIX}:*`` / ``{CHANNEL_KEY_PREFIX}:*``)
    and clears the bit in each key whose trailing ``:``-separated segment
    equals ``target_user_id``, so no scoped mask keeps the bit set. Only bit 63
    is touched; all other bits are written back unchanged (use
    :func:`_purge_all_privileges` to wipe everything). Each key is updated
    with a separate GET followed by SET, and one audit line is logged at INFO.

    Called by :func:`_stargazer_full_revoke_wrapper`, which performs the auth
    and admin-safety checks; this function performs none itself.

    Args:
        target_user_id: User whose access is being revoked.
        redis: Async Redis client exposing ``get``, ``set`` and ``scan``.
        config: Bot config (accepted for symmetry; not consulted here).
        caller_id: ID of the actor, recorded in the audit log.
        reason: Optional human-readable reason, recorded in the audit log.

    Returns:
        A results dict with ``success``, ``target_user_id``,
        ``global_was_set``, and the number of scoped keys rewritten in
        ``guild_keys_cleared`` / ``channel_keys_cleared``.
    """
    flag = 1 << 63  # STARGAZER_USE

    results: dict[str, Any] = {
        "success": True,
        "target_user_id": target_user_id,
        "global_was_set": False,
        "guild_keys_cleared": 0,
        "channel_keys_cleared": 0,
    }

    # 1. Global scope
    global_key = _redis_key(target_user_id)
    results["global_was_set"] = await _clear_privilege_flag_at_key(
        redis, global_key, flag
    )

    # 2. Scoped scans: walk every guild/channel key and filter by user suffix.
    scans = (
        ("guild", f"{GUILD_KEY_PREFIX}:*"),
        ("channel", f"{CHANNEL_KEY_PREFIX}:*"),
    )
    for scope_type, pattern in scans:
        counter_key = f"{scope_type}_keys_cleared"
        cursor = b"0"
        while True:
            cursor, keys = await redis.scan(cursor, match=pattern, count=200)
            for key in keys:
                key_str = key.decode() if isinstance(key, bytes) else key
                # The user id is always the last ":"-separated segment.
                if key_str.rsplit(":", 1)[-1] != target_user_id:
                    continue
                if await _clear_privilege_flag_at_key(redis, key, flag):
                    results[counter_key] += 1
            # The iteration is complete when the server hands back cursor 0.
            # Clients differ in whether that arrives as int, bytes or str, so
            # normalise before comparing; otherwise a "0" string would never
            # terminate the loop.
            if int(cursor) == 0:
                break

    logger.info(
        "Full revoke: caller %s revoked STARGAZER_USE for %s. Results: %s. Reason: %s",
        caller_id,
        target_user_id,
        results,
        reason,
    )
    return results
async def _purge_all_privileges(
    target_user_id: str,
    *,
    redis: Any,
    config: Any,
    caller_id: str,
    reason: str = "",
) -> dict[str, Any]:
    """Delete every stored privilege key for a user at all scopes.

    Unlike :func:`_stargazer_full_revoke`, which rewrites masks with only
    bit 63 cleared, this removes the keys themselves: the global key is
    deleted, then every guild- and channel-scoped key
    (``{GUILD_KEY_PREFIX}:*`` / ``{CHANNEL_KEY_PREFIX}:*``) whose trailing
    ``:``-separated segment equals ``target_user_id`` is deleted, so no stored
    bits remain for the user. One audit line is logged at WARNING.

    No authorization is performed here; the tool wrapper is responsible for
    that.

    Args:
        target_user_id: User whose privilege keys are removed.
        redis: Async Redis client exposing ``delete`` and ``scan``.
        config: Bot config (accepted for symmetry; not consulted here).
        caller_id: ID of the actor, recorded in the audit log.
        reason: Optional human-readable reason, recorded in the audit log.

    Returns:
        A results dict with ``success``, ``target_user_id``,
        ``global_cleared`` (truthiness of the global DELETE reply), and the
        number of matching scoped keys a DELETE was issued for in
        ``guild_keys_cleared`` / ``channel_keys_cleared``.
    """
    results: dict[str, Any] = {
        "success": True,
        "target_user_id": target_user_id,
        "global_cleared": False,
        "guild_keys_cleared": 0,
        "channel_keys_cleared": 0,
    }

    # 1. Global scope: drop the key entirely.
    global_key = _redis_key(target_user_id)
    deleted = await redis.delete(global_key)
    results["global_cleared"] = bool(deleted)

    # 2. Scoped scans: drop every guild/channel key belonging to this user.
    scans = (
        ("guild", f"{GUILD_KEY_PREFIX}:*"),
        ("channel", f"{CHANNEL_KEY_PREFIX}:*"),
    )
    for scope_type, pattern in scans:
        counter_key = f"{scope_type}_keys_cleared"
        cursor = b"0"
        while True:
            cursor, keys = await redis.scan(cursor, match=pattern, count=200)
            for key in keys:
                key_str = key.decode() if isinstance(key, bytes) else key
                # The user id is always the last ":"-separated segment.
                if key_str.rsplit(":", 1)[-1] == target_user_id:
                    await redis.delete(key)
                    # Counted per DELETE issued, regardless of its reply.
                    results[counter_key] += 1
            # The iteration is complete when the server hands back cursor 0.
            # Clients differ in whether that arrives as int, bytes or str, so
            # normalise before comparing; otherwise a "0" string would never
            # terminate the loop.
            if int(cursor) == 0:
                break

    logger.warning(
        "Purge all privileges: caller %s wiped ALL bits for %s. Results: %s. Reason: %s",
        caller_id,
        target_user_id,
        results,
        reason,
    )
    return results
async def _purge_all_privileges_wrapper(
    target_user_id: str,
    reason: str = "",
    ctx: "ToolContext | None" = None,
) -> str:
    """Tool entry point for ``purge_all_privileges``: gate, then wipe every bit.

    Collects ``user_id``, ``redis`` and ``config`` from ``ctx`` (each lookup
    tolerates a missing attribute or a ``None`` context) and applies three
    guards, in order, before handing off to :func:`_purge_all_privileges`:

    1. a Redis client must be present;
    2. the mask returned by ``get_user_privileges`` for the caller must
       contain the ``ALTER_PRIVILEGES`` bit;
    3. the target must not be a configured admin (``_is_admin``).

    Registered as the ``handler`` of the ``purge_all_privileges`` entry in
    this module's ``TOOLS`` list rather than being called directly.

    Args:
        target_user_id: User whose privilege keys will be removed.
        reason: Optional reason string, forwarded to the audit log.
        ctx: Tool context supplying ``user_id``, ``redis``, and ``config``.

    Returns:
        A JSON string: the delegate's results (indented) on success, or a
        ``{"success": false, "error": ...}`` object when a guard fails.
    """
    caller_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)

    def _refuse(message: str) -> str:
        # Every rejection shares the same two-key JSON envelope.
        return json.dumps({"success": False, "error": message})

    if redis is None:
        return _refuse("Redis not available")

    granted = await get_user_privileges(redis, caller_id, config)
    required_flag = 1 << PRIVILEGES["ALTER_PRIVILEGES"]
    if (granted & required_flag) == 0:
        return _refuse("You do not have the ALTER_PRIVILEGES privilege.")

    # Configured admins are never valid targets for a wipe.
    if _is_admin(target_user_id, config):
        return _refuse(f"Refused: User {target_user_id} is a configured admin.")

    outcome = await _purge_all_privileges(
        target_user_id=target_user_id,
        redis=redis,
        config=config,
        caller_id=caller_id,
        reason=reason,
    )
    return json.dumps(outcome, indent=2)
async def _stargazer_full_revoke_wrapper(
    target_user_id: str,
    reason: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Tool entry point for ``stargazer_full_revoke``: gate, then ban everywhere.

    Collects ``user_id``, ``redis`` and ``config`` from ``ctx`` (each lookup
    tolerates a missing attribute or a ``None`` context) and applies three
    guards, in order, before handing off to :func:`_stargazer_full_revoke`,
    which clears ``STARGAZER_USE`` (bit 63) at every scope:

    1. a Redis client must be present;
    2. the mask returned by ``get_user_privileges`` for the caller must
       contain the ``ALTER_PRIVILEGES`` bit;
    3. the target must not be a configured admin (``_is_admin``).

    Registered as the ``handler`` of the ``stargazer_full_revoke`` entry in
    this module's ``TOOLS`` list rather than being called directly.

    Args:
        target_user_id: User to fully revoke access from.
        reason: Optional reason string, forwarded to the audit log.
        ctx: Tool context supplying ``user_id``, ``redis``, and ``config``.

    Returns:
        A JSON string: the delegate's results (indented) on success, or a
        ``{"success": false, "error": ...}`` object when a guard fails.
    """
    caller_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)

    def _refuse(message: str) -> str:
        # Every rejection shares the same two-key JSON envelope.
        return json.dumps({"success": False, "error": message})

    if redis is None:
        return _refuse("Redis not available")

    # Authorization: the caller needs the ALTER_PRIVILEGES bit.
    granted = await get_user_privileges(redis, caller_id, config)
    required_flag = 1 << PRIVILEGES["ALTER_PRIVILEGES"]
    if (granted & required_flag) == 0:
        return _refuse("You do not have the ALTER_PRIVILEGES privilege.")

    # Safety: configured admins can never be the target of a revoke.
    if _is_admin(target_user_id, config):
        return _refuse(f"Refused: User {target_user_id} is a configured admin.")

    outcome = await _stargazer_full_revoke(
        target_user_id=target_user_id,
        redis=redis,
        config=config,
        caller_id=caller_id,
        reason=reason,
    )
    return json.dumps(outcome, indent=2)
# ------------------------------------------------------------------
# Multi-tool registration
# ------------------------------------------------------------------
# Tool-facing description strings. Each one is assembled from whole sentences
# joined by a single space; where a catalog of privilege names is included it
# is produced by _privileges_catalog_text() at import time.

# Description of the global-scope grant/revoke tool.
_ALTER_PRIVILEGES_DESCRIPTION = " ".join(
    (
        "Grant or revoke privilege bits for a user.",
        "Requires ALTER_PRIVILEGES.",
        "You can only grant or revoke bits that you yourself have enabled.",
        f"Known privileges (name=bit): {_privileges_catalog_text()}",
    )
)

# Shared description of the ``privileges`` array parameter.
_PRIVILEGES_PARAM_DESCRIPTION = " ".join(
    (
        "Privilege names from the registry or decimal bit numbers 0-63.",
        f"Known names: {_privileges_catalog_text()}",
    )
)

# Description of the read-only lookup tool.
_TOOLS_GET_DESCRIPTION = " ".join(
    (
        "View the privilege bitmask for a user (hex, decoded names, is_admin).",
        "Response includes available_privilege_names (full registry).",
        "Defaults to the calling user if target_user_id is omitted.",
        "Reading another user requires ALTER_PRIVILEGES.",
    )
)

# Description of the guild-scope grant/revoke tool.
_GUILD_PRIV_DESCRIPTION = " ".join(
    (
        "Grant or revoke privilege bits at GUILD scope.",
        "Guild-scoped privileges override the global mask for a specific server"
        " (for bits with NORMAL resolution mode).",
        "Requires ALTER_PRIVILEGES or GUILD_ADMIN.",
        "DANGEROUS mode bits (ALTER_PRIVILEGES, UNSANDBOXED_EXEC, SHADOW_BAN_ADMIN)"
        " cannot be set at guild scope — they only resolve globally.",
        f"Known privileges (name=bit): {_privileges_catalog_text()}",
    )
)

# Description of the channel-scope grant/revoke tool.
_CHANNEL_PRIV_DESCRIPTION = " ".join(
    (
        "Grant or revoke privilege bits at CHANNEL scope.",
        "Channel-scoped privileges override both global and guild masks"
        " (for bits with NORMAL resolution mode).",
        "Requires ALTER_PRIVILEGES, GUILD_ADMIN, or CHANNEL_ADMIN.",
        "DANGEROUS mode bits (ALTER_PRIVILEGES, UNSANDBOXED_EXEC, SHADOW_BAN_ADMIN)"
        " cannot be set at channel scope — they only resolve globally.",
        f"Known privileges (name=bit): {_privileges_catalog_text()}",
    )
)
def _tool_string_param(description: str) -> dict[str, Any]:
    """Return a JSON-schema string property carrying ``description``."""
    return {"type": "string", "description": description}


def _tool_action_param(description: str) -> dict[str, Any]:
    """Return the ``action`` property: a string restricted to grant/revoke."""
    return {
        "type": "string",
        "enum": ["grant", "revoke"],
        "description": description,
    }


def _tool_privileges_param() -> dict[str, Any]:
    """Return the ``privileges`` property: an array of strings."""
    return {
        "type": "array",
        "items": {"type": "string"},
        "description": _PRIVILEGES_PARAM_DESCRIPTION,
    }


def _tool_object_schema(
    properties: dict[str, Any],
    required: tuple[str, ...] = (),
) -> dict[str, Any]:
    """Return an object schema; ``required`` is emitted only when non-empty."""
    schema: dict[str, Any] = {"type": "object", "properties": properties}
    if required:
        schema["required"] = list(required)
    return schema


# Tool registry: one entry per tool, each with its name, description,
# JSON-schema parameters, and the coroutine that handles the call.
TOOLS = [
    # Global-scope grant/revoke.
    {
        "name": "alter_privileges",
        "description": _ALTER_PRIVILEGES_DESCRIPTION,
        "parameters": _tool_object_schema(
            {
                "target_user_id": _tool_string_param(
                    "Platform user ID of the target user."
                ),
                "action": _tool_action_param(
                    "Whether to grant or revoke the specified privileges."
                ),
                "privileges": _tool_privileges_param(),
            },
            required=("target_user_id", "action", "privileges"),
        ),
        "handler": _alter_privileges,
    },
    # Read-only lookup; the target is optional, so nothing is required.
    {
        "name": "get_privileges",
        "description": _TOOLS_GET_DESCRIPTION,
        "parameters": _tool_object_schema(
            {
                "target_user_id": _tool_string_param(
                    "Platform user ID to look up. Omit to check your own privileges."
                ),
            }
        ),
        "handler": _get_privileges,
    },
    # Audit listing; takes no parameters.
    {
        "name": "audit_privileges",
        "description": (
            "List users with a non-zero privilege mask in Redis plus admins "
            "from config (admins implicitly have all bits). Requires "
            "ALTER_PRIVILEGES. For the canonical name=bit registry, use "
            "get_privileges or see alter_privileges."
        ),
        "parameters": _tool_object_schema({}),
        "handler": _audit_privileges,
    },
    # Guild-scope grant/revoke.
    {
        "name": "alter_guild_privileges",
        "description": _GUILD_PRIV_DESCRIPTION,
        "parameters": _tool_object_schema(
            {
                "target_user_id": _tool_string_param(
                    "Platform user ID of the target user."
                ),
                "action": _tool_action_param("Whether to grant or revoke."),
                "privileges": _tool_privileges_param(),
                "guild_id": _tool_string_param(
                    "Target guild/server ID. Defaults to the current "
                    "guild if omitted."
                ),
            },
            required=("target_user_id", "action", "privileges"),
        ),
        "handler": _alter_guild_privileges,
    },
    # Channel-scope grant/revoke.
    {
        "name": "alter_channel_privileges",
        "description": _CHANNEL_PRIV_DESCRIPTION,
        "parameters": _tool_object_schema(
            {
                "target_user_id": _tool_string_param(
                    "Platform user ID of the target user."
                ),
                "action": _tool_action_param("Whether to grant or revoke."),
                "privileges": _tool_privileges_param(),
                "guild_id": _tool_string_param(
                    "Target guild/server ID. Defaults to the current "
                    "guild if omitted."
                ),
                "channel_id": _tool_string_param(
                    "Target channel ID. Defaults to the current "
                    "channel if omitted."
                ),
            },
            required=("target_user_id", "action", "privileges"),
        ),
        "handler": _alter_channel_privileges,
    },
    # All-scope removal of STARGAZER_USE only.
    {
        "name": "stargazer_full_revoke",
        "description": (
            "Revoke STARGAZER_USE (bit 63) for a user at ALL scopes — global, "
            "guild, and channel. This is a complete ban that cannot be bypassed "
            "by any scoped override. Requires ALTER_PRIVILEGES. Cannot target admins."
        ),
        "parameters": _tool_object_schema(
            {
                "target_user_id": _tool_string_param(
                    "The user ID to fully revoke access from."
                ),
                "reason": _tool_string_param("Reason for the revoke (logged)."),
            },
            required=("target_user_id",),
        ),
        "handler": _stargazer_full_revoke_wrapper,
    },
    # All-scope removal of every privilege bit.
    {
        "name": "purge_all_privileges",
        "description": (
            "Wipe the ENTIRE privilege bitmask (all 64 bits) for a user at ALL scopes — "
            "global, guild, and channel. This removes every granted privilege, not just "
            "STARGAZER_USE. Requires ALTER_PRIVILEGES. Cannot target configured admins."
        ),
        "parameters": _tool_object_schema(
            {
                "target_user_id": _tool_string_param(
                    "The user ID whose entire privilege bitmask will be zeroed."
                ),
                "reason": _tool_string_param("Reason for the purge (logged)."),
            },
            required=("target_user_id",),
        ),
        "handler": _purge_all_privileges_wrapper,
    },
]