# Source code for tools.alter_privileges

"""User privilege management with 64-bit bitmask system.

Each user has a 64-bit integer stored in Redis whose individual bits
represent specific privileges.  Admins (``config.admin_user_ids``)
implicitly have all 64 bits set.

Other tool modules can import the helpers to check privileges::

    from tools.alter_privileges import has_privilege, PRIVILEGES
"""

from __future__ import annotations

import json
import logging
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Every per-user bitmask lives under "<prefix>:<user_id>" in Redis.
REDIS_KEY_PREFIX = "stargazer:user_privileges"

# Mask with all 64 privilege bits set; this is what admins are reported as.
ALL_BITS = 2**64 - 1

# Privilege name -> bit position.  The positions are persisted inside the
# stored masks, so an existing entry must never be renumbered; new
# privileges take the next unused position.
PRIVILEGES: dict[str, int] = dict(
    ALTER_PRIVILEGES=0,
    CORE_MEMORY=1,
    UNSANDBOXED_EXEC=2,
    SQLITE_CROSS_USER=3,
    DUMP_NOTES=4,
    DUMP_GOALS=5,
    ANCHOR_ADMIN=6,
    SUBAGENT_ACCESS=7,
    WEB_SEARCH=8,
    LONG_TERM_GOALS=9,
    READ_DM=10,
    CHAT_ANALYTICS=11,
    BYPASS_RATELIMIT=12,
)

# Bit position -> privilege name, used to render masks for humans.
_REVERSE: dict[int, str] = dict(zip(PRIVILEGES.values(), PRIVILEGES.keys()))


def _redis_key(user_id: str) -> str:
    """Build the Redis key under which a user's privilege mask is stored.

    Args:
        user_id (str): Unique identifier for the user.

    Returns:
        str: ``REDIS_KEY_PREFIX`` and *user_id* joined by a colon.
    """
    return "{}:{}".format(REDIS_KEY_PREFIX, user_id)


def _is_admin(user_id: str, config: Any) -> bool:
    """Tell whether *user_id* is listed in ``config.admin_user_ids``.

    Args:
        user_id (str): Unique identifier for the user.
        config (Any): Bot configuration object, or ``None``.

    Returns:
        bool: ``True`` only when *config* is given and its
        ``admin_user_ids`` collection contains *user_id*.  The lookup is
        a plain membership test, so *user_id* must compare equal to the
        configured entry.
    """
    admins = None if config is None else getattr(config, "admin_user_ids", None)
    if not admins:
        # No config, no attribute, or an empty/None admin list.
        return False
    return user_id in admins


async def get_user_privileges(redis: Any, user_id: str, config: Any = None) -> int:
    """Return the privilege bitmask for *user_id*.

    Admins get all 64 bits.  Returns ``0`` when the user has no stored
    privileges and is not an admin.

    Args:
        redis (Any): Async Redis client, or ``None`` when unavailable.
        user_id (str): Unique identifier for the user.
        config (Any): Bot configuration object used for the admin check.
            Pass ``None`` to read only the stored mask.

    Returns:
        int: The user's privilege bitmask.  A stored value that cannot be
        parsed as an integer is logged and treated as ``0`` so that a
        corrupted entry denies access instead of raising.
    """
    if _is_admin(user_id, config):
        return ALL_BITS
    if redis is None:
        return 0

    raw = await redis.get(_redis_key(user_id))
    if raw is None:
        return 0

    try:
        return int(raw)
    except (TypeError, ValueError):
        # Fail closed: an unreadable mask must never grant anything, and it
        # must not take down every privilege check for this user either.
        logger.warning(
            "Ignoring non-integer privilege value for user %s: %r",
            user_id, raw,
        )
        return 0
async def has_privilege(redis: Any, user_id: str, bit: int, config: Any = None) -> bool:
    """Check whether *user_id* has a single privilege *bit* enabled.

    Args:
        redis (Any): Async Redis client, or ``None`` when unavailable.
        user_id (str): Unique identifier for the user.
        bit (int): Bit position to test (see ``PRIVILEGES``).
        config (Any): Bot configuration object used for the admin check.

    Returns:
        bool: ``True`` when the bit is set in the user's mask.
    """
    granted = await get_user_privileges(redis, user_id, config)
    # Shift the wanted bit down to position 0 and test it there.
    return ((granted >> bit) & 1) == 1
def _resolve_privileges(names: list) -> tuple[int, list[str]]:
    """Convert a list of privilege names/bit numbers into a bitmask.

    Each entry may be an ``int`` bit number, a string of digits, or a
    privilege name from ``PRIVILEGES`` (case-insensitive, surrounding
    whitespace ignored).  A bare string or int is treated as a
    one-element list.

    Args:
        names (list): Privilege names and/or bit numbers.

    Returns:
        tuple[int, list[str]]: ``(bitmask, list_of_bad_names)`` where the
        second item holds ``str(entry)`` for every entry that is neither
        a known name nor a bit number in ``0..63``.
    """
    if names is None:
        names = []
    elif isinstance(names, (str, int)):
        # Iterating a bare string would yield single characters.
        names = [names]

    mask = 0
    bad: list[str] = []
    for entry in names:
        label = str(entry)
        bit: int | None = None
        if isinstance(entry, bool):
            # bool is an int subclass; without this guard ``False`` would
            # silently select bit 0 (ALTER_PRIVILEGES).
            bit = None
        elif isinstance(entry, int):
            bit = entry
        else:
            text = label.strip()
            if text.isdigit():
                # str.isdigit() also accepts characters such as "²" that
                # int() refuses, so the conversion itself must be guarded.
                try:
                    bit = int(text)
                except ValueError:
                    bit = None
            else:
                bit = PRIVILEGES.get(text.upper())

        if bit is not None and 0 <= bit < 64:
            mask |= 1 << bit
        else:
            bad.append(label)
    return mask, bad


def _mask_to_names(mask: int) -> list[str]:
    """Return human-readable names for every set bit.

    Bits without an entry in ``PRIVILEGES`` are rendered as ``bit_<n>``.
    """
    return [
        _REVERSE.get(bit, f"bit_{bit}")
        for bit in range(64)
        if mask & (1 << bit)
    ]


# ------------------------------------------------------------------
# Tool handlers
# ------------------------------------------------------------------

def _failure(message: str) -> str:
    """Return the JSON error payload shared by the tool handlers."""
    return json.dumps({"success": False, "error": message})


def _can_alter(mask: int) -> bool:
    """Tell whether *mask* has the ALTER_PRIVILEGES bit set."""
    return bool(mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"]))


async def _alter_privileges(
    target_user_id: str,
    action: str,
    privileges: list,
    ctx: ToolContext | None = None,
) -> str:
    """Grant or revoke privilege bits on another user's stored mask.

    The caller (``ctx.user_id``) must hold ALTER_PRIVILEGES and can only
    change bits that are set in their own mask; requested bits the caller
    lacks are reported under ``denied`` and left untouched.

    Args:
        target_user_id (str): User whose stored mask is modified.
        action (str): ``"grant"`` or ``"revoke"`` (case-insensitive).
        privileges (list): Privilege names and/or bit numbers.
        ctx (ToolContext | None): Tool execution context; ``user_id``,
            ``redis`` and ``config`` are read from it.

    Returns:
        str: JSON document.  On failure ``{"success": false, "error": ...}``;
        on success the previous/new masks in hex plus the applied, denied
        and unrecognized entries.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)

    if redis is None:
        return _failure("Redis not available")

    caller_mask = await get_user_privileges(redis, user_id, config)
    if not _can_alter(caller_mask):
        return _failure("You do not have the ALTER_PRIVILEGES privilege.")

    # Normalise the target the same way _get_privileges does, so a write
    # never lands on a key that a later read cannot reach.
    target_user_id = str(target_user_id or "").strip()
    if not target_user_id:
        return _failure("target_user_id is required.")

    requested_mask, bad_names = _resolve_privileges(privileges)
    if requested_mask == 0 and not bad_names:
        return _failure("No valid privileges specified.")

    # Reject an unknown action before touching the target's stored mask.
    action = str(action or "").strip().lower()
    if action not in ("grant", "revoke"):
        return _failure(f"Unknown action '{action}'. Use 'grant' or 'revoke'.")

    allowed_mask = requested_mask & caller_mask
    denied_mask = requested_mask & ~caller_mask

    # config=None: read the *stored* mask, not the implicit admin mask.
    # NOTE(review): this read-modify-write is not atomic; two concurrent
    # updates for the same target can overwrite each other.
    target_mask = await get_user_privileges(redis, target_user_id, config=None)

    if action == "grant":
        new_mask = target_mask | allowed_mask
    else:
        new_mask = target_mask & ~allowed_mask

    await redis.set(_redis_key(target_user_id), str(new_mask))

    result: dict[str, Any] = {
        "success": True,
        "action": action,
        "target_user_id": target_user_id,
        "previous": hex(target_mask),
        "new": hex(new_mask),
        "applied": _mask_to_names(allowed_mask),
    }
    if denied_mask:
        result["denied"] = _mask_to_names(denied_mask)
        result["denied_reason"] = "You do not have these privileges yourself."
    if bad_names:
        result["unrecognized"] = bad_names

    # Spell the verb out: appending "d" to the action produced "grantd".
    verb = "granted" if action == "grant" else "revoked"
    logger.info(
        "User %s %s privileges for %s: %s -> %s (applied=%s denied=%s)",
        user_id, verb, target_user_id,
        hex(target_mask), hex(new_mask),
        hex(allowed_mask), hex(denied_mask),
    )

    return json.dumps(result, indent=2)


async def _get_privileges(
    target_user_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Report the privilege mask of a user.

    Args:
        target_user_id (str): User to look up; blank means the caller.
        ctx (ToolContext | None): Tool execution context; ``user_id``,
            ``redis`` and ``config`` are read from it.

    Returns:
        str: JSON document with the mask (hex and decimal), the decoded
        privilege names, the admin flag and all known privilege names, or
        an error payload.  Looking up someone else requires
        ALTER_PRIVILEGES.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)

    if redis is None:
        return _failure("Redis not available")

    target = (target_user_id or "").strip() or user_id

    if target != user_id:
        caller_mask = await get_user_privileges(redis, user_id, config)
        if not _can_alter(caller_mask):
            return _failure(
                "Reading another user's privileges requires "
                "ALTER_PRIVILEGES."
            )

    mask = await get_user_privileges(redis, target, config)
    return json.dumps({
        "success": True,
        "user_id": target,
        "mask_hex": hex(mask),
        "mask_decimal": mask,
        "privileges": _mask_to_names(mask),
        "is_admin": _is_admin(target, config),
        "available_privilege_names": list(PRIVILEGES.keys()),
    }, indent=2)


async def _audit_privileges(
    ctx: ToolContext | None = None,
) -> str:
    """List every user with elevated privileges and what those privileges are.

    Args:
        ctx (ToolContext | None): Tool execution context; ``user_id``,
            ``redis`` and ``config`` are read from it.

    Returns:
        str: JSON document mapping each user with a non-zero stored mask,
        plus every configured admin, to their mask and privilege names;
        or an error payload.  Requires ALTER_PRIVILEGES.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)

    if redis is None:
        return _failure("Redis not available")

    # Caller must have ALTER_PRIVILEGES to see the full audit.
    caller_mask = await get_user_privileges(redis, user_id, config)
    if not _can_alter(caller_mask):
        return _failure("You do not have the ALTER_PRIVILEGES privilege.")

    users: dict[str, Any] = {}

    # 1. Scan Redis for every stored privilege bitmask.
    prefix = f"{REDIS_KEY_PREFIX}:"
    cursor = b"0"
    while True:
        cursor, keys = await redis.scan(cursor, match=f"{prefix}*", count=200)
        for key in keys:
            key_str = key.decode() if isinstance(key, bytes) else key
            uid = key_str[len(prefix):]
            raw = await redis.get(key)
            if raw is None:
                # Key vanished between SCAN and GET.
                continue
            try:
                mask = int(raw)
            except (TypeError, ValueError):
                # One corrupted entry must not abort the whole audit.
                logger.warning(
                    "Skipping non-integer privilege value for key %s: %r",
                    key_str, raw,
                )
                continue
            if mask == 0:
                continue
            users[uid] = {
                "mask_hex": hex(mask),
                "privileges": _mask_to_names(mask),
                "is_admin": _is_admin(uid, config),
            }
        # The end-of-scan cursor may come back as int, bytes or str
        # depending on the client and its response decoding.
        if cursor in (0, b"0", "0"):
            break

    # 2. Ensure admins from config are included even if they have
    #    no explicit Redis entry (they implicitly have ALL bits).
    admin_ids = getattr(config, "admin_user_ids", None) or []
    for aid in admin_ids:
        if aid not in users:
            users[aid] = {
                "mask_hex": hex(ALL_BITS),
                "privileges": ["ALL (admin)"],
                "is_admin": True,
            }
        else:
            users[aid]["is_admin"] = True

    return json.dumps({
        "success": True,
        "total_privileged_users": len(users),
        "users": users,
    }, indent=2)


# ------------------------------------------------------------------
# Multi-tool registration
# ------------------------------------------------------------------

TOOLS = [
    {
        "name": "alter_privileges",
        "description": (
            "Grant or revoke privilege bits for a user. "
            "Requires the ALTER_PRIVILEGES privilege. "
            "You can only modify bits that you yourself have enabled."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "target_user_id": {
                    "type": "string",
                    "description": "Platform user ID of the target user.",
                },
                "action": {
                    "type": "string",
                    "enum": ["grant", "revoke"],
                    "description": "Whether to grant or revoke the specified privileges.",
                },
                "privileges": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": (
                        "List of privilege names (e.g. 'CORE_MEMORY', 'UNSANDBOXED_EXEC') "
                        "or bit numbers (e.g. '0', '1')."
                    ),
                },
            },
            "required": ["target_user_id", "action", "privileges"],
        },
        "handler": _alter_privileges,
    },
    {
        "name": "get_privileges",
        "description": (
            "View the privilege bitmask for a user. "
            "Defaults to the calling user if no target is specified."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "target_user_id": {
                    "type": "string",
                    "description": "Platform user ID to look up. Omit to check your own privileges.",
                },
            },
        },
        "handler": _get_privileges,
    },
    {
        "name": "audit_privileges",
        "description": (
            "List every user with elevated privileges and what those "
            "privileges are. Requires ALTER_PRIVILEGES."
        ),
        "parameters": {
            "type": "object",
            "properties": {},
        },
        "handler": _audit_privileges,
    },
]