"""User privilege management with 64-bit bitmask system.
Each user has a 64-bit integer stored in Redis whose individual bits
represent specific privileges. Admins (``config.admin_user_ids``)
implicitly have all 64 bits set.
Other tool modules can import the helpers to check privileges::
from tools.alter_privileges import has_privilege, PRIVILEGES
"""
from __future__ import annotations
import json
import logging
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
# Every per-user bitmask lives under "<REDIS_KEY_PREFIX>:<user_id>".
REDIS_KEY_PREFIX = "stargazer:user_privileges"

# Mask with all 64 privilege bits set; this is what admins implicitly hold.
ALL_BITS = 0xFFFF_FFFF_FFFF_FFFF

# Privilege name -> bit position inside the 64-bit mask.
# The positions are spelled out explicitly because stored masks encode them:
# renumbering an existing entry would change the meaning of saved masks.
PRIVILEGES: dict[str, int] = dict(
    ALTER_PRIVILEGES=0,
    CORE_MEMORY=1,
    UNSANDBOXED_EXEC=2,
    SQLITE_CROSS_USER=3,
    DUMP_NOTES=4,
    DUMP_GOALS=5,
    ANCHOR_ADMIN=6,
    SUBAGENT_ACCESS=7,
    WEB_SEARCH=8,
    LONG_TERM_GOALS=9,
    READ_DM=10,
    CHAT_ANALYTICS=11,
    BYPASS_RATELIMIT=12,
)

# Inverse lookup (bit position -> privilege name) used when rendering masks.
_REVERSE: dict[int, str] = {bit: name for name, bit in PRIVILEGES.items()}
def _redis_key(user_id: str) -> str:
    """Build the Redis key that holds the privilege bitmask of *user_id*.

    Args:
        user_id (str): Identifier of the user whose key is wanted.

    Returns:
        str: ``REDIS_KEY_PREFIX`` and *user_id* joined by a colon.
    """
    return "{}:{}".format(REDIS_KEY_PREFIX, user_id)
def _is_admin(user_id: str, config: Any) -> bool:
    """Tell whether *user_id* is listed in ``config.admin_user_ids``.

    Args:
        user_id (str): Identifier of the user being checked.
        config (Any): Configuration object; may be ``None`` or may lack the
            ``admin_user_ids`` attribute.

    Returns:
        bool: ``True`` only when *config* carries a non-empty
        ``admin_user_ids`` collection that contains *user_id*.
    """
    if config is None:
        return False
    admins = getattr(config, "admin_user_ids", None)
    if not admins:
        # Attribute missing, None, or empty: nobody is an admin.
        return False
    return user_id in admins
# Public helper (no leading underscore): importable by other tool modules.
async def get_user_privileges(redis: Any, user_id: str, config: Any = None) -> int:
    """Return the privilege bitmask for *user_id*.

    Admins (as decided by ``_is_admin``) get all 64 bits without Redis being
    consulted. Everyone else gets the integer stored under their Redis key.

    The lookup fails closed: a missing key, a missing Redis client, or a
    stored value that is not a non-negative integer all yield ``0``.

    Args:
        redis (Any): Async Redis client exposing ``get``; may be ``None``.
        user_id (str): Identifier of the user to look up.
        config (Any): Configuration object used for the admin check; with
            ``None`` nobody is treated as an admin.

    Returns:
        int: Bitmask in the range ``0 .. ALL_BITS``.
    """
    if _is_admin(user_id, config):
        return ALL_BITS
    if redis is None:
        return 0
    raw = await redis.get(_redis_key(user_id))
    if raw is None:
        return 0
    try:
        mask = int(raw)
    except (TypeError, ValueError):
        # A corrupt entry must not crash every privilege check for this user.
        logger.warning(
            "Ignoring unparseable privilege mask for user %s: %r", user_id, raw
        )
        return 0
    if mask < 0:
        # In Python a negative int tests true for *every* bit, so a stray
        # "-1" would silently behave like an admin mask. Deny instead.
        logger.warning(
            "Ignoring negative privilege mask for user %s: %r", user_id, raw
        )
        return 0
    # Drop anything above the 64 bits this system defines.
    return mask & ALL_BITS
# Public helper (no leading underscore): importable by other tool modules.
async def has_privilege(redis: Any, user_id: str, bit: int, config: Any = None) -> bool:
    """Report whether privilege *bit* is set in the mask of *user_id*.

    Args:
        redis (Any): Async Redis client passed through to
            ``get_user_privileges``.
        user_id (str): Identifier of the user being checked.
        bit (int): Zero-based bit position of the privilege.
        config (Any): Configuration object used for the admin check.

    Returns:
        bool: ``True`` when the bit is set in the user's mask.
    """
    granted = await get_user_privileges(redis, user_id, config)
    flag = 1 << bit
    return (granted & flag) != 0
def _resolve_privileges(names: list) -> tuple[int, list[str]]:
    """Convert a list of privilege names / bit numbers into a bitmask.

    Each entry may be an ``int`` bit position, a string of digits naming a
    bit position, or a privilege name from ``PRIVILEGES`` (case-insensitive).
    Surrounding whitespace in string entries is ignored.

    Entries that cannot be resolved to a bit in ``0..63`` are collected
    rather than raising, so one bad entry never aborts the whole request.

    Args:
        names (list): Privilege names and/or bit numbers.

    Returns:
        tuple[int, list[str]]: ``(bitmask, bad_entries)`` where
        ``bad_entries`` holds ``str(entry)`` for every rejected entry, in
        input order.
    """
    mask = 0
    bad: list[str] = []
    for entry in names:
        bit: int | None
        if isinstance(entry, bool):
            # bool is a subclass of int; True must not silently mean "bit 1".
            bit = None
        elif isinstance(entry, int):
            bit = entry
        else:
            text = str(entry).strip()
            if text.isdigit():
                # isdigit() also accepts characters such as superscript
                # digits that int() refuses, so the conversion is guarded.
                try:
                    bit = int(text)
                except ValueError:
                    bit = None
            else:
                bit = PRIVILEGES.get(text.upper())
        if bit is not None and 0 <= bit < 64:
            mask |= 1 << bit
        else:
            bad.append(str(entry))
    return mask, bad
def _mask_to_names(mask: int) -> list[str]:
    """List a label for every bit set in *mask*, lowest bit first.

    Bits that have an entry in ``_REVERSE`` are shown by privilege name;
    any other set bit is shown as ``bit_<position>``.

    Args:
        mask (int): Privilege bitmask; only bits 0..63 are inspected.

    Returns:
        list[str]: One label per set bit, in ascending bit order.
    """
    return [
        _REVERSE.get(position, f"bit_{position}")
        for position in range(64)
        if mask & (1 << position)
    ]
# ------------------------------------------------------------------
# Tool handlers
# ------------------------------------------------------------------
async def _alter_privileges(
    target_user_id: str,
    action: str,
    privileges: list,
    ctx: ToolContext | None = None,
) -> str:
    """Grant or revoke privilege bits for *target_user_id*.

    The caller (``ctx.user_id``) must hold ``ALTER_PRIVILEGES`` and can only
    change bits present in their own mask; requested bits they lack are
    reported under ``"denied"`` and left untouched on the target.

    Arguments are validated before any Redis access, so a malformed request
    never reads or writes stored masks.

    Args:
        target_user_id (str): User whose stored mask is modified.
        action (str): ``"grant"`` or ``"revoke"`` (case-insensitive).
        privileges (list): Privilege names and/or bit numbers. A single
            string or int is treated as a one-element list.
        ctx (ToolContext | None): Tool context supplying ``user_id``,
            ``redis`` and ``config``.

    Returns:
        str: JSON document. On failure it holds ``success: false`` and an
        ``error`` message (plus ``unrecognized`` when names were rejected).
        On success it holds the previous and new masks in hex and the list
        of applied privileges, plus optional ``denied`` / ``unrecognized``.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)
    if redis is None:
        return json.dumps({"success": False, "error": "Redis not available"})

    # --- Pure argument validation (no Redis access yet) -------------------
    action = str(action or "").strip().lower()
    if action not in ("grant", "revoke"):
        return json.dumps({"success": False, "error": f"Unknown action '{action}'. Use 'grant' or 'revoke'."})
    target_user_id = str(target_user_id or "").strip()
    if not target_user_id:
        # An empty id would otherwise write to the bare prefix key.
        return json.dumps({"success": False, "error": "target_user_id is required."})
    if privileges is None:
        privileges = []
    elif isinstance(privileges, (str, int)):
        # A bare string would otherwise be iterated character by character.
        privileges = [privileges]

    # --- Authorization ----------------------------------------------------
    caller_mask = await get_user_privileges(redis, user_id, config)
    if not (caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])):
        return json.dumps({
            "success": False,
            "error": "You do not have the ALTER_PRIVILEGES privilege.",
        })

    requested_mask, bad_names = _resolve_privileges(privileges)
    if requested_mask == 0:
        # Nothing resolvable was requested: do not write, do not claim success.
        failure: dict[str, Any] = {
            "success": False,
            "error": "No valid privileges specified.",
        }
        if bad_names:
            failure["unrecognized"] = bad_names
        return json.dumps(failure)

    # Callers may only hand out or take away bits they hold themselves.
    allowed_mask = requested_mask & caller_mask
    denied_mask = requested_mask & ~caller_mask

    # config=None: operate on the target's *stored* mask, not the implicit
    # all-bits mask an admin target would otherwise report.
    target_mask = await get_user_privileges(redis, target_user_id, config=None)
    if action == "grant":
        new_mask = target_mask | allowed_mask
    else:
        new_mask = target_mask & ~allowed_mask
    await redis.set(_redis_key(target_user_id), str(new_mask))

    result: dict[str, Any] = {
        "success": True,
        "action": action,
        "target_user_id": target_user_id,
        "previous": hex(target_mask),
        "new": hex(new_mask),
        "applied": _mask_to_names(allowed_mask),
    }
    if denied_mask:
        result["denied"] = _mask_to_names(denied_mask)
        result["denied_reason"] = "You do not have these privileges yourself."
    if bad_names:
        result["unrecognized"] = bad_names
    logger.info(
        "User %s %s privileges for %s: %s -> %s (applied=%s denied=%s)",
        user_id,
        "granted" if action == "grant" else "revoked",
        target_user_id,
        hex(target_mask), hex(new_mask),
        hex(allowed_mask), hex(denied_mask),
    )
    return json.dumps(result, indent=2)
async def _get_privileges(
    target_user_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Report the privilege mask of a user as a JSON document.

    With no (or a blank) *target_user_id* the caller's own mask is shown.
    Looking at somebody else's mask requires the caller to hold
    ``ALTER_PRIVILEGES``.

    Args:
        target_user_id (str): User to inspect; blank means the caller.
        ctx (ToolContext | None): Tool context supplying ``user_id``,
            ``redis`` and ``config``.

    Returns:
        str: JSON with ``success: false`` and an ``error`` on failure, or
        the mask (hex and decimal), privilege names, admin flag and the
        list of known privilege names on success.
    """
    caller_id = getattr(ctx, "user_id", "") or ""
    store = getattr(ctx, "redis", None)
    cfg = getattr(ctx, "config", None)

    if store is None:
        return json.dumps({"success": False, "error": "Redis not available"})

    requested = (target_user_id or "").strip()
    subject = requested if requested else caller_id

    if subject != caller_id:
        # Peeking at another user is gated behind ALTER_PRIVILEGES.
        own_mask = await get_user_privileges(store, caller_id, cfg)
        alter_flag = 1 << PRIVILEGES["ALTER_PRIVILEGES"]
        if (own_mask & alter_flag) == 0:
            refusal = {
                "success": False,
                "error": (
                    "Reading another user's privileges requires "
                    "ALTER_PRIVILEGES."
                ),
            }
            return json.dumps(refusal)

    subject_mask = await get_user_privileges(store, subject, cfg)
    report = {
        "success": True,
        "user_id": subject,
        "mask_hex": hex(subject_mask),
        "mask_decimal": subject_mask,
        "privileges": _mask_to_names(subject_mask),
        "is_admin": _is_admin(subject, cfg),
        "available_privilege_names": list(PRIVILEGES.keys()),
    }
    return json.dumps(report, indent=2)
async def _audit_privileges(
    ctx: ToolContext | None = None,
) -> str:
    """List every user with elevated privileges and what those privileges are.

    Stored masks are discovered by scanning Redis for keys under
    ``REDIS_KEY_PREFIX``; users whose mask is ``0`` are omitted. Admins from
    ``config.admin_user_ids`` are always included, because they hold all
    bits implicitly even without a stored entry.

    A stored value that is not an integer does not abort the audit: the
    user id is reported under ``"unparseable"`` so it can be repaired.

    Args:
        ctx (ToolContext | None): Tool context supplying ``user_id``,
            ``redis`` and ``config``.

    Returns:
        str: JSON with ``success: false`` and an ``error`` when Redis is
        unavailable or the caller lacks ``ALTER_PRIVILEGES``; otherwise the
        user count, a per-user mapping and, when present, the list of user
        ids with unparseable stored values.
    """
    user_id = getattr(ctx, "user_id", "") or ""
    redis = getattr(ctx, "redis", None)
    config = getattr(ctx, "config", None)
    if redis is None:
        return json.dumps({"success": False, "error": "Redis not available"})

    # Caller must have ALTER_PRIVILEGES to see the full audit.
    caller_mask = await get_user_privileges(redis, user_id, config)
    if not (caller_mask & (1 << PRIVILEGES["ALTER_PRIVILEGES"])):
        return json.dumps({
            "success": False,
            "error": "You do not have the ALTER_PRIVILEGES privilege.",
        })

    users: dict[str, Any] = {}
    unparseable: list[str] = []

    # 1. Scan Redis for every stored privilege bitmask.
    prefix = f"{REDIS_KEY_PREFIX}:"
    pattern = f"{prefix}*"
    cursor: Any = b"0"
    while True:
        cursor, keys = await redis.scan(cursor, match=pattern, count=200)
        for key in keys:
            key_str = key.decode() if isinstance(key, bytes) else key
            uid = key_str[len(prefix):]
            raw = await redis.get(key)
            if raw is None:
                # Key vanished between SCAN and GET.
                continue
            try:
                mask = int(raw)
            except (TypeError, ValueError):
                # One corrupt entry must not hide everybody else.
                logger.warning(
                    "Unparseable privilege mask for user %s: %r", uid, raw
                )
                unparseable.append(uid)
                continue
            if mask == 0:
                continue
            users[uid] = {
                "mask_hex": hex(mask),
                "privileges": _mask_to_names(mask),
                "is_admin": _is_admin(uid, config),
            }
        # The iteration is complete when the server hands back cursor 0;
        # int() accepts the int, bytes and str forms a client may return.
        if int(cursor) == 0:
            break

    # 2. Ensure admins from config are included even if they have
    #    no explicit Redis entry (they implicitly have ALL bits).
    admin_ids = getattr(config, "admin_user_ids", None) or []
    for aid in admin_ids:
        if aid not in users:
            users[aid] = {
                "mask_hex": hex(ALL_BITS),
                "privileges": ["ALL (admin)"],
                "is_admin": True,
            }
        else:
            users[aid]["is_admin"] = True

    report: dict[str, Any] = {
        "success": True,
        "total_privileged_users": len(users),
        "users": users,
    }
    if unparseable:
        report["unparseable"] = unparseable
    return json.dumps(report, indent=2)
# ------------------------------------------------------------------
# Multi-tool registration
# ------------------------------------------------------------------
# Registration table: one entry per tool exposed by this module. Each entry
# carries the tool name, a description, a JSON-Schema style "parameters"
# object and the coroutine that handles the call.
TOOLS = [
    # Modify another user's mask.
    {
        "name": "alter_privileges",
        "description": (
            "Grant or revoke privilege bits for a user. Requires the "
            "ALTER_PRIVILEGES privilege. You can only modify bits that "
            "you yourself have enabled."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "target_user_id": {
                    "type": "string",
                    "description": "Platform user ID of the target user.",
                },
                "action": {
                    "type": "string",
                    "enum": ["grant", "revoke"],
                    "description": "Whether to grant or revoke the specified privileges.",
                },
                "privileges": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": (
                        "List of privilege names (e.g. 'CORE_MEMORY', "
                        "'UNSANDBOXED_EXEC') or bit numbers (e.g. '0', '1')."
                    ),
                },
            },
            "required": ["target_user_id", "action", "privileges"],
        },
        "handler": _alter_privileges,
    },
    # Inspect a single user's mask.
    {
        "name": "get_privileges",
        "description": (
            "View the privilege bitmask for a user. Defaults to the "
            "calling user if no target is specified."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "target_user_id": {
                    "type": "string",
                    "description": "Platform user ID to look up. Omit to check your own privileges.",
                },
            },
        },
        "handler": _get_privileges,
    },
    # Overview of everyone holding privileges; takes no arguments.
    {
        "name": "audit_privileges",
        "description": (
            "List every user with elevated privileges and what "
            "those privileges are. Requires ALTER_PRIVILEGES."
        ),
        "parameters": {"type": "object", "properties": {}},
        "handler": _audit_privileges,
    },
]