# Source code for tools.user_variables

"""Per-user per-channel variables (v3)

Variables are stored as Redis hashes and auto-injected
into context for recent active users.
"""

from __future__ import annotations

import jsonutil as json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# Common prefix of every per-user variable hash key; `_var_key` appends
# ":{channel_id}:user:{user_id}" to it.
USERVARS_KEY_PREFIX = "stargazer:uservars:channel"

# Accepted `ttl_preset` names mapped to a lifetime in seconds.
# A value of None means "no expiry": for such a preset `_set_user_variable`
# calls `persist` on the hash instead of `expire`.
TTL_PRESETS = {
    "1hour": 3600,
    "6hours": 21600,
    "24hours": 86400,
    "1week": 604800,
    "5ever": None,
}

# Name of the preset meant to be used as the default lifetime.
# NOTE(review): not referenced in the visible part of this module —
# `_set_user_variable` spells out the same "24hours" literal as its default;
# confirm whether anything else reads this constant.
DEFAULT_TTL = "24hours"


def _var_key(channel_id: str, user_id: str) -> str:
    """Compose the Redis hash key for one user's variables in one channel.

    Every handler in this module addresses the per-user hash through this
    helper so the key layout is defined in exactly one place. The hash's
    fields are variable names and its values are JSON payloads.

    Args:
        channel_id: Identifier of the channel the variables are scoped to.
        user_id: Identifier of the user the variables belong to.

    Returns:
        ``{USERVARS_KEY_PREFIX}:{channel_id}:user:{user_id}``, i.e.
        ``stargazer:uservars:channel:{channel_id}:user:{user_id}``.
    """
    return "{}:{}:user:{}".format(USERVARS_KEY_PREFIX, channel_id, user_id)


async def _redis(ctx):
    """Fetch the async Redis client carried by the tool context.

    Handlers call this first so that a missing client produces one clear,
    uniform error instead of an ``AttributeError`` further down. A ``ctx`` of
    ``None`` is treated the same as a context without a client.

    Args:
        ctx: Tool execution context expected to expose a ``redis`` attribute.

    Returns:
        The object stored in ``ctx.redis``.

    Raises:
        RuntimeError: If ``ctx`` has no ``redis`` attribute or it is ``None``.
    """
    client = getattr(ctx, "redis", None)
    if client is not None:
        return client
    raise RuntimeError("Redis not available")


async def _get_user_aliases(redis_client: Any, user_id: str, config: Any = None) -> list[str]:
    """Expand a user id into the aliases linked to it in the identity registry.

    The same person can be known under different ids on different platforms;
    the variable handlers use this list to read values stored under any of
    them.

    Resolution steps:

    1. Without a Redis client there is nothing to query: return ``[user_id]``.
    2. The platform list is ``config.configured_platforms`` when the config
       has that attribute, otherwise discord/matrix/webchat.
    3. If ``user_id`` looks like ``platform:rest`` with a listed platform
       (case-insensitive), the registry is asked directly for that pair.
    4. Otherwise each platform is probed in order via the Redis key
       ``stargazer:identity:alias:{platform}:{user_id}``; the first one that
       holds a value decides which platform is passed to the registry.
    5. If no probe hits, fall back to ``[user_id]``.

    Args:
        redis_client: Async Redis client; any falsy value short-circuits.
        user_id: Raw or ``platform:``-prefixed user identifier.
        config: Optional config object supplying ``configured_platforms``.

    Returns:
        The alias list produced by
        ``IdentityRegistry.resolve_ingress_identity`` (its second return
        value), or ``[user_id]`` when no lookup was possible or none matched.
    """
    if not redis_client:
        return [user_id]

    if config and hasattr(config, "configured_platforms"):
        known_platforms = config.configured_platforms
    else:
        known_platforms = ["discord", "matrix", "webchat"]

    # Only treat the text before the first ":" as a platform when it is one
    # of the known platform names; ids may legitimately contain colons.
    explicit_platform = None
    prefix, colon, remainder = user_id.partition(":")
    if colon and prefix.lower() in known_platforms:
        explicit_platform = prefix.lower()

    # Imported lazily, after the cheap early exit above.
    from services.identity_registry import IdentityRegistry

    if explicit_platform:
        _, linked = await IdentityRegistry.resolve_ingress_identity(
            explicit_platform, remainder, redis_client
        )
        return linked

    for candidate in known_platforms:
        probe_key = f"stargazer:identity:alias:{candidate}:{user_id}"
        if await redis_client.get(probe_key):
            _, linked = await IdentityRegistry.resolve_ingress_identity(
                candidate, user_id, redis_client
            )
            return linked

    return [user_id]


async def _resolve_uservars_user_id(
    ctx: ToolContext,
    requested_user_id: str,
) -> tuple[str, str | None]:
    """Decide whose variables an operation targets, gating cross-user access.

    A caller always may act on their own variables. Acting on somebody else's
    requires the ``ALTER_PRIVILEGES`` privilege, checked through
    ``tools.alter_privileges.has_privilege`` with the context's Redis client
    and config. Both ids are whitespace-stripped before comparison.

    Args:
        ctx: Tool context; ``user_id``, ``redis`` and ``config`` are read from
            it (each may be missing).
        requested_user_id: Id the caller asked for. Empty/``None`` or equal to
            the caller's own id means "self".

    Returns:
        ``(target_id, None)`` when the operation may proceed, or
        ``(caller_id, error_json)`` when it may not, where ``error_json`` is a
        JSON string of the form ``{"success": False, "error": ...}`` covering
        three cases: Redis/config missing, privilege not held, or the
        privilege module failing to import.
    """
    caller_id = (getattr(ctx, "user_id", "") or "").strip()
    target_id = (requested_user_id or "").strip()

    # Self-targeted (or unspecified) requests never need a privilege check.
    if target_id in ("", caller_id):
        return caller_id, None

    def _denied(reason):
        # Refusals always hand back the caller's own id next to the error.
        return caller_id, json.dumps({"success": False, "error": reason})

    try:
        from tools.alter_privileges import PRIVILEGES, has_privilege

        store = getattr(ctx, "redis", None)
        settings = getattr(ctx, "config", None)
        if store is None or settings is None:
            return _denied("Cross-user variables require Redis and config.")

        allowed = await has_privilege(
            store,
            caller_id,
            PRIVILEGES["ALTER_PRIVILEGES"],
            settings,
        )
        if allowed:
            return target_id, None
        return _denied("Cross-user variable access requires ALTER_PRIVILEGES.")
    except ImportError:
        # Fail closed when the privilege machinery cannot be loaded.
        return _denied("Privilege check unavailable.")


# ---------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------


async def _set_user_variable(
    variable_name: str,
    value: str,
    user_id: str = "",
    ttl_preset: str = "24hours",
    ctx: ToolContext | None = None,
) -> str:
    """Set a per-channel, per-user variable with a TTL preset.

    Handler for the ``set_user_variable`` tool. After validating its inputs it
    resolves the target user with ``_resolve_uservars_user_id`` (cross-user
    writes need ``ALTER_PRIVILEGES``) and ``HSET``s a JSON payload (value,
    set-at timestamps, TTL metadata) into the hash addressed by ``_var_key``.

    Expiry is a property of the whole hash, not of a single field, so it is
    managed so that it can only ever grow:

    * ``5ever`` (``None`` seconds): ``persist`` removes any expiry.
    * finite preset, hash newly created by this write: ``expire`` is set.
    * finite preset, hash has a shorter remaining TTL: ``expire`` extends it.
    * finite preset, hash was already persistent before this write: the hash
      is left alone, so variables stored with ``5ever`` are not given an
      expiry as a side effect of an unrelated short-lived write.

    Args:
        variable_name: 1-64 characters, alphanumeric or underscore.
        value: Value to store; at most 4096 characters.
        user_id: Optional target user id; defaults to the invoking user and
            requires ``ALTER_PRIVILEGES`` to differ.
        ttl_preset: One of the keys of ``TTL_PRESETS``.
        ctx: Tool context providing ``redis``, ``user_id``, ``channel_id`` and
            ``config``.

    Returns:
        A JSON string with ``success`` and either the confirmation details
        (channel, user, name, preset, ``expires_in_seconds``) or an ``error``
        describing the validation or privilege failure.

    Raises:
        RuntimeError: If the context carries no Redis client.
    """
    if not variable_name or len(variable_name) > 64:
        return json.dumps(
            {
                "success": False,
                "error": "Variable name must be 1-64 characters",
            }
        )
    # NOTE(review): str.isalnum() also accepts non-ASCII letters and digits,
    # which is wider than [A-Za-z0-9_]; left unchanged so names accepted
    # today keep working.
    if not all(c.isalnum() or c == "_" for c in variable_name):
        return json.dumps(
            {
                "success": False,
                "error": (
                    "Variable name must contain only "
                    "alphanumeric characters and underscores"
                ),
            }
        )
    if value and len(value) > 4096:
        return json.dumps(
            {
                "success": False,
                "error": "Variable value must not exceed 4096 characters",
            }
        )
    if ttl_preset not in TTL_PRESETS:
        return json.dumps(
            {
                "success": False,
                "error": (
                    "Invalid TTL preset. Must be one of: "
                    + ", ".join(TTL_PRESETS.keys())
                ),
            }
        )

    ttl_seconds = TTL_PRESETS[ttl_preset]
    r = await _redis(ctx)
    uid, cross_err = await _resolve_uservars_user_id(ctx, user_id)
    if cross_err:
        return cross_err
    cid = ctx.channel_id
    key = _var_key(cid, uid)

    # Snapshot the expiry state BEFORE writing. Once HSET has run, a hash
    # that was just created and one that was deliberately persisted both
    # report TTL -1 and can no longer be told apart. (Before the write a
    # missing key reports -2.)
    was_persistent = False
    if ttl_seconds is not None:
        was_persistent = (await r.ttl(key)) == -1

    ts = datetime.now(timezone.utc)
    var_data = {
        "value": value,
        "set_at": ts.isoformat(),
        "set_at_unix": int(ts.timestamp()),
        "ttl_preset": ttl_preset,
        "ttl_seconds": ttl_seconds,
    }
    await r.hset(key, variable_name, json.dumps(var_data))

    if ttl_seconds is None:
        await r.persist(key)
    else:
        current_ttl = await r.ttl(key)
        if current_ttl < 0:
            # No expiry on the hash right now. Only add one when this write
            # created the hash (or it expired in between); a hash that was
            # already persistent must stay persistent.
            if not was_persistent:
                await r.expire(key, ttl_seconds)
        elif ttl_seconds > current_ttl:
            # Never shorten the lifetime other variables rely on.
            await r.expire(key, ttl_seconds)

    return json.dumps(
        {
            "success": True,
            "message": f"Variable '{variable_name}' set successfully",
            "channel_id": cid,
            "user_id": uid,
            "variable_name": variable_name,
            "ttl_preset": ttl_preset,
            "expires_in_seconds": ttl_seconds,
        }
    )


async def _get_user_variable(
    variable_name: str,
    user_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Fetch one variable for a user in the current channel, across aliases.

    Handler for the ``get_user_variable`` tool. The target user is resolved
    with ``_resolve_uservars_user_id`` (cross-user reads need
    ``ALTER_PRIVILEGES``) and expanded with ``_get_user_aliases``. The hash of
    the active id is checked first via ``hget``; only on a miss are the
    remaining aliases tried, stopping at the first hit. Nothing is written.

    The active id has a ``platform:`` prefix removed only when that prefix is
    one of the known platforms — the same rule ``_get_user_aliases`` applies.
    Raw ids that merely contain a colon (for example a Matrix id of the form
    ``@user:server``) are used unchanged, so the lookup addresses the same
    hash that ``_set_user_variable`` writes to.

    A stored payload that is not valid JSON, or not a JSON object, is returned
    as the raw ``value`` with ``set_at``/``ttl_preset`` of ``None`` rather
    than raising, matching the tolerance of ``_list_user_variables``.

    Args:
        variable_name: The variable name to retrieve.
        user_id: Optional target user id; defaults to the invoking user and
            requires ``ALTER_PRIVILEGES`` to differ.
        ctx: Tool context providing ``redis``, ``user_id``, ``channel_id`` and
            ``config``.

    Returns:
        A JSON string with ``success`` and ``found``; when found also
        ``variable_name``, ``value``, ``set_at`` and ``ttl_preset``; otherwise
        a not-found ``message``, or an ``error`` on a failed privilege check.

    Raises:
        RuntimeError: If the context carries no Redis client.
    """
    r = await _redis(ctx)
    uid, cross_err = await _resolve_uservars_user_id(ctx, user_id)
    if cross_err:
        return cross_err

    config = getattr(ctx, "config", None) if ctx else None
    aliases = await _get_user_aliases(r, uid, config)

    # Same platform list that _get_user_aliases uses to recognise a prefix.
    platforms = ["discord", "matrix", "webchat"]
    if config and hasattr(config, "configured_platforms"):
        platforms = config.configured_platforms

    # Strip the prefix only when it really names a platform; otherwise the
    # colon belongs to the id itself and must be kept.
    prefix, sep, rest = uid.partition(":")
    active_raw_uid = rest if sep and prefix.lower() in platforms else uid

    channel_id = ctx.channel_id

    # 1. The active identity wins when it has the variable.
    var_json = await r.hget(_var_key(channel_id, active_raw_uid), variable_name)

    # 2. Fall back to the other linked identities.
    if not var_json:
        for alias in aliases:
            if alias == uid:
                # The un-resolved fallback alias is the active id itself.
                continue
            # Any other alias is a ``platform:user_id`` pair.
            plat_uid = alias.split(":", 1)[1] if ":" in alias else alias
            if plat_uid == active_raw_uid:
                continue
            var_json = await r.hget(_var_key(channel_id, plat_uid), variable_name)
            if var_json:
                break

    if not var_json:
        return json.dumps(
            {
                "success": True,
                "found": False,
                "message": (
                    f"Variable '{variable_name}' not found "
                    "for this user in this channel"
                ),
            }
        )

    try:
        vd = json.loads(var_json)
    except json.JSONDecodeError:
        vd = None
    if not isinstance(vd, dict):
        # Malformed or non-object payload: surface the raw stored text.
        vd = {"value": var_json}

    return json.dumps(
        {
            "success": True,
            "found": True,
            "variable_name": variable_name,
            "value": vd.get("value"),
            "set_at": vd.get("set_at"),
            "ttl_preset": vd.get("ttl_preset"),
        }
    )


async def _delete_user_variable(
    variable_name: str,
    user_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Remove a single variable from a user's hash in the current channel.

    Handler for the ``delete_user_variable`` tool. The target user is resolved
    with ``_resolve_uservars_user_id`` (deleting for another user needs
    ``ALTER_PRIVILEGES``), then one ``hdel`` is issued against the hash
    addressed by ``_var_key`` for that resolved id. Only that one hash is
    touched; linked aliases are not consulted. A missing variable is reported
    as a successful no-op.

    Args:
        variable_name: The variable name to delete.
        user_id: Optional target user id; defaults to the invoking user and
            requires ``ALTER_PRIVILEGES`` to differ.
        ctx: Tool context providing ``redis``, ``user_id`` and ``channel_id``.

    Returns:
        A JSON string with ``success``, a ``deleted`` flag telling whether a
        field was actually removed, and a ``message``; or the ``error``
        payload from a failed privilege check.

    Raises:
        RuntimeError: If the context carries no Redis client.
    """
    client = await _redis(ctx)
    target_uid, denial = await _resolve_uservars_user_id(ctx, user_id)
    if denial:
        return denial

    removed = await client.hdel(_var_key(ctx.channel_id, target_uid), variable_name)
    outcome = (
        f"Variable '{variable_name}' deleted successfully"
        if removed
        else f"Variable '{variable_name}' was not found"
    )
    return json.dumps(
        {
            "success": True,
            "deleted": bool(removed),
            "message": outcome,
        }
    )


async def _list_user_variables(
    user_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """List a user's variables in the current channel, merged across aliases.

    Handler for the ``list_user_variables`` tool. The target user is resolved
    with ``_resolve_uservars_user_id`` (cross-user listing needs
    ``ALTER_PRIVILEGES``) and expanded with ``_get_user_aliases``. Each
    identity's hash is read with ``hgetall``; the non-active aliases are
    merged first and the active identity last, so on a name collision the
    active identity's value wins. Apart from a ``ttl`` read on the active
    hash, nothing else is queried and nothing is written.

    The active id has a ``platform:`` prefix removed only when that prefix is
    one of the known platforms — the same rule ``_get_user_aliases`` applies.
    Raw ids that merely contain a colon (for example a Matrix id of the form
    ``@user:server``) are used unchanged, so the listing reads the same hash
    that ``_set_user_variable`` writes to.

    A stored payload that is not valid JSON, or not a JSON object, is listed
    as ``{"value": <raw text>}`` instead of aborting the whole listing.

    Args:
        user_id: Optional target user id; defaults to the invoking user and
            requires ``ALTER_PRIVILEGES`` to differ.
        ctx: Tool context providing ``redis``, ``user_id``, ``channel_id`` and
            ``config``.

    Returns:
        A JSON string with ``success``, a ``variables`` map (entries carry
        ``value``/``set_at``/``ttl_preset``), a ``count`` and the active
        hash's ``key_ttl_seconds`` (``None`` unless positive); an empty map
        with a ``message`` when nothing is stored; or an ``error`` on a failed
        privilege check.

    Raises:
        RuntimeError: If the context carries no Redis client.
    """
    r = await _redis(ctx)
    uid, cross_err = await _resolve_uservars_user_id(ctx, user_id)
    if cross_err:
        return cross_err

    config = getattr(ctx, "config", None) if ctx else None
    aliases = await _get_user_aliases(r, uid, config)

    # Same platform list that _get_user_aliases uses to recognise a prefix.
    platforms = ["discord", "matrix", "webchat"]
    if config and hasattr(config, "configured_platforms"):
        platforms = config.configured_platforms

    # Strip the prefix only when it really names a platform; otherwise the
    # colon belongs to the id itself and must be kept.
    prefix, sep, rest = uid.partition(":")
    active_raw_uid = rest if sep and prefix.lower() in platforms else uid

    def _entry(var_json: Any) -> Dict[str, Any]:
        # Turn one stored payload into its listing entry, tolerating bad data.
        try:
            vd = json.loads(var_json)
        except json.JSONDecodeError:
            return {"value": var_json}
        if not isinstance(vd, dict):
            return {"value": var_json}
        return {
            "value": vd.get("value"),
            "set_at": vd.get("set_at"),
            "ttl_preset": vd.get("ttl_preset"),
        }

    # Merge order matters: later hashes overwrite earlier ones, so the
    # active identity's hash goes last.
    merge_order: List[str] = []
    for alias in aliases:
        if alias == uid:
            # The un-resolved fallback alias is the active id itself.
            continue
        # Any other alias is a ``platform:user_id`` pair.
        plat_uid = alias.split(":", 1)[1] if ":" in alias else alias
        if plat_uid == active_raw_uid:
            continue
        merge_order.append(_var_key(ctx.channel_id, plat_uid))
    active_key = _var_key(ctx.channel_id, active_raw_uid)
    merge_order.append(active_key)

    variables: Dict[str, Any] = {}
    for key in merge_order:
        all_vars = await r.hgetall(key)
        if all_vars:
            for name, var_json in all_vars.items():
                variables[name] = _entry(var_json)

    if not variables:
        return json.dumps(
            {
                "success": True,
                "variables": {},
                "count": 0,
                "message": "No variables found for this user in this channel",
            }
        )

    ttl = await r.ttl(active_key)
    return json.dumps(
        {
            "success": True,
            "variables": variables,
            "count": len(variables),
            "key_ttl_seconds": (ttl if ttl > 0 else None),
        }
    )


async def _clear_user_variables(
    user_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Drop every variable a user has in the current channel.

    Handler for the ``clear_user_variables`` tool. This is the destructive
    bulk counterpart of ``_delete_user_variable``: after resolving the target
    with ``_resolve_uservars_user_id`` (clearing another user needs
    ``ALTER_PRIVILEGES``) it issues a single ``delete`` for the whole hash
    addressed by ``_var_key``. Only the resolved id's hash is removed; hashes
    of linked aliases are left untouched.

    Args:
        user_id: Optional target user id; defaults to the invoking user and
            requires ``ALTER_PRIVILEGES`` to differ.
        ctx: Tool context providing ``redis``, ``user_id`` and ``channel_id``.

    Returns:
        A JSON string with ``success``, a ``deleted`` flag telling whether the
        hash existed, and a ``message``; or the ``error`` payload from a
        failed privilege check.

    Raises:
        RuntimeError: If the context carries no Redis client.
    """
    client = await _redis(ctx)
    target_uid, denial = await _resolve_uservars_user_id(ctx, user_id)
    if denial:
        return denial

    # ``delete`` reports how many keys were removed (zero when none existed).
    removed = await client.delete(_var_key(ctx.channel_id, target_uid))
    if removed:
        note = "Cleared all variables for user in channel"
    else:
        note = "No variables found to clear"

    return json.dumps(
        {
            "success": True,
            "deleted": removed > 0,
            "message": note,
        }
    )


def _uservars_channel_from_key(key_str: str, plat_uid: str) -> str:
    """Recover the channel id from a ``{prefix}:{channel}:user:{uid}`` key.

    The channel id is whatever sits between the fixed prefix and the known
    ``:user:{plat_uid}`` suffix, so ids that themselves contain ``:`` are
    returned intact instead of being cut at the first colon.
    """
    head = f"{USERVARS_KEY_PREFIX}:"
    tail = f":user:{plat_uid}"
    if (
        key_str.startswith(head)
        and key_str.endswith(tail)
        and len(key_str) > len(head) + len(tail)
    ):
        return key_str[len(head) : len(key_str) - len(tail)]
    # Unexpected key shape: fall back to positional parsing.
    parts = key_str.split(":")
    return parts[3] if len(parts) > 3 else "?"


def _dump_entry_from_payload(var_json: Any) -> dict[str, Any]:
    """Turn one stored hash value into the entry shape used by the dump."""
    try:
        decoded = json.loads(var_json)
    except json.JSONDecodeError:
        return {"value": var_json}
    if not isinstance(decoded, dict):
        # A raw value such as ``42`` parses as JSON but carries no metadata;
        # treat it like any other non-JSON raw value.
        return {"value": var_json}
    return {
        "value": decoded.get("value"),
        "set_at": decoded.get("set_at"),
        "ttl_preset": decoded.get("ttl_preset"),
    }


async def _merge_uservars_for_uid(
    r: Any,
    plat_uid: str,
    by_channel: dict[str, dict[str, Any]],
) -> None:
    """Merge every channel hash stored for ``plat_uid`` into ``by_channel``."""
    pattern = f"{USERVARS_KEY_PREFIX}:*:user:{plat_uid}"
    for key in await r.keys(pattern):
        key_str = key if isinstance(key, str) else key.decode()
        all_vars = await r.hgetall(key)
        if not all_vars:
            continue
        channel_id = _uservars_channel_from_key(key_str, plat_uid)
        bucket = by_channel.setdefault(channel_id, {})
        for name, var_json in all_vars.items():
            # Later merges overwrite earlier ones on a name collision.
            bucket[name] = _dump_entry_from_payload(var_json)


async def _dump_user_variables(
    ctx: ToolContext | None = None,
) -> str:
    """Dump all of the calling user's variables across every channel.

    Handler for the ``dump_user_variables`` tool. It takes no target-user
    argument: the user is always ``ctx.user_id``.

    The caller's aliases come from ``_get_user_aliases``. For each distinct
    raw (prefix-stripped) id, the Redis ``KEYS`` command is run over
    ``{USERVARS_KEY_PREFIX}:*:user:{uid}`` and each matching hash is read with
    ``HGETALL``. Non-active aliases are merged first and the active id last,
    so the active identity's values win when the same variable name appears
    under several aliases in one channel.

    The channel id is recovered by stripping the fixed prefix and the
    ``:user:{uid}`` suffix from the key, so channel ids containing ``:`` are
    reported in full. Stored values that are not JSON objects are reported as
    ``{"value": <raw>}``.

    NOTE(review): ``KEYS`` walks the entire keyspace; if this becomes a hot
    path, consider moving to an incremental ``SCAN``.

    Args:
        ctx: Tool context providing the Redis client, ``user_id`` and
            (optionally) ``config``.

    Returns:
        A JSON string with ``success``, the ``user_id``, a ``total_variables``
        count, and an ``entries`` list of per-channel
        ``{channel_id, variables, count}`` objects.
    """
    r = await _redis(ctx)
    uid = ctx.user_id

    config = getattr(ctx, "config", None) if ctx else None
    aliases = await _get_user_aliases(r, uid, config)
    active_raw_uid = uid.split(":", 1)[1] if ":" in uid else uid

    by_channel: dict[str, dict[str, Any]] = {}

    # 1. Non-active aliases first; each distinct raw id is scanned once.
    scanned: set[str] = {active_raw_uid}
    for alias in aliases:
        plat_uid = alias.split(":", 1)[1] if ":" in alias else alias
        if plat_uid in scanned:
            continue
        scanned.add(plat_uid)
        await _merge_uservars_for_uid(r, plat_uid, by_channel)

    # 2. Active identity last so its values overwrite conflicting names.
    await _merge_uservars_for_uid(r, active_raw_uid, by_channel)

    all_entries = [
        {
            "channel_id": channel_id,
            "variables": variables,
            "count": len(variables),
        }
        for channel_id, variables in by_channel.items()
    ]
    total = sum(entry["count"] for entry in all_entries)

    return json.dumps(
        {
            "success": True,
            "user_id": uid,
            "total_variables": total,
            "entries": all_entries,
        }
    )


# ---------------------------------------------------------------
# Internal helpers (context injection, not tools)
# ---------------------------------------------------------------


def _context_value_from_payload(var_json: Any) -> Any:
    """Extract the plain value from one stored hash field.

    JSON objects yield their ``"value"`` member; anything else (undecodable
    text, or JSON that is not an object such as a raw ``42``) is returned
    unchanged as the raw stored value.
    """
    try:
        decoded = json.loads(var_json)
    except json.JSONDecodeError:
        return var_json
    if not isinstance(decoded, dict):
        return var_json
    return decoded.get("value")


async def get_user_variables_for_context(
    channel_id: str,
    user_id: str,
    *,
    redis_client=None,
    config=None,
) -> Dict[str, Any]:
    """Return a flat name-to-value map of a user's variables in a channel.

    Non-tool read path: only the stored values are returned, keyed by variable
    name, without the ``set_at``/``ttl_preset`` metadata.

    The user's aliases are expanded with ``_get_user_aliases`` and the hash of
    each alias (addressed by ``_var_key``) is read with ``hgetall``.
    Non-active aliases are merged first and the active id last, so the active
    identity's value wins on a name collision. A stored field that is not a
    JSON object contributes its raw value instead of aborting the whole
    lookup.

    Any exception is logged and swallowed, yielding an empty map, so a lookup
    failure cannot propagate to the caller.

    Args:
        channel_id: The channel whose variables to read.
        user_id: A raw or platform-prefixed user id whose aliases are merged.
        redis_client: The async Redis client to query; ``None`` yields an
            empty map.
        config: Optional config passed through to alias resolution.

    Returns:
        A ``{name: value}`` dict of the user's variables in the channel (empty
        on any error or when none are set).
    """
    if redis_client is None:
        return {}

    try:
        r = redis_client
        aliases = await _get_user_aliases(r, user_id, config)
        active_raw_uid = user_id.split(":", 1)[1] if ":" in user_id else user_id

        # Read order defines precedence: non-active aliases, then the active
        # id last so it overwrites anything merged before it.
        read_order: List[str] = []
        for alias in aliases:
            plat_uid = alias.split(":", 1)[1] if ":" in alias else alias
            if plat_uid != active_raw_uid:
                read_order.append(plat_uid)
        read_order.append(active_raw_uid)

        result: Dict[str, Any] = {}
        for plat_uid in read_order:
            all_vars = await r.hgetall(_var_key(channel_id, plat_uid))
            if not all_vars:
                continue
            for name, var_json in all_vars.items():
                result[name] = _context_value_from_payload(var_json)
        return result
    except Exception as e:
        logger.error(
            "Failed to get user variables for context: %s",
            e,
            exc_info=True,
        )
        return {}


async def get_recent_active_users(
    channel_id: str,
    *,
    redis_client=None,
    limit: int = 5,
) -> List[Dict[str, str]]:
    """Return the most recently active distinct users in a channel.

    Reads the newest 100 entries of the Redis sorted set
    ``stargazer_logs:{channel_id}`` with ``zrevrange``, JSON-decodes each one
    and collects unique authors in most-recent-first order until ``limit``
    users have been found.

    Entries are skipped, not fatal, when they are not valid JSON, when they
    decode to something other than a JSON object, or when the author id is
    empty or one of the sentinel ids ``"0"`` / ``"0000000000000000000"``.
    The author id is read from ``author_id`` (falling back to ``user_id``) and
    the display name from ``author_name`` (falling back to ``username``, then
    ``User-{id}``).

    Any other exception is logged and swallowed, yielding an empty list.

    Args:
        channel_id: The channel whose recent authors to enumerate.
        redis_client: The async Redis client to query; ``None`` yields an
            empty list.
        limit: Maximum number of distinct users to return (default 5); a
            value of zero or less yields an empty list.

    Returns:
        A list of ``{"user_id", "display_name"}`` dicts, most recent first
        (empty on any error or when the channel log is empty).
    """
    if redis_client is None or limit <= 0:
        return []

    try:
        log_key = f"stargazer_logs:{channel_id}"
        # The stop index is inclusive, so 0..99 is exactly 100 entries.
        recent = await redis_client.zrevrange(log_key, 0, 99)
        if not recent:
            return []

        seen: set[str] = set()
        active: List[Dict[str, str]] = []
        for msg_json in recent:
            try:
                msg = json.loads(msg_json)
            except json.JSONDecodeError:
                continue
            if not isinstance(msg, dict):
                # Valid JSON but not a message object; ignore just this entry.
                continue

            aid = str(msg.get("author_id") or msg.get("user_id") or "")
            if not aid or aid in ("0", "0000000000000000000"):
                continue
            if aid in seen:
                continue
            seen.add(aid)

            name = msg.get("author_name") or msg.get("username") or f"User-{aid}"
            active.append(
                {
                    "user_id": aid,
                    "display_name": name,
                }
            )
            if len(active) >= limit:
                break
        return active
    except Exception as e:
        logger.error(
            "Failed to get recent active users: %s",
            e,
            exc_info=True,
        )
        return []


async def get_all_active_user_variables(
    channel_id: str,
    *,
    redis_client=None,
    config=None,
    limit: int = 5,
) -> List[Dict[str, Any]]:
    """Gather the stored variables of a channel's recently active users.

    Asks ``get_recent_active_users`` for up to ``limit`` recent speakers, then
    fetches each speaker's merged variables with
    ``get_user_variables_for_context``. Speakers without any variables are
    left out of the result.

    Both lookups use the supplied ``redis_client``. Any exception is logged
    and swallowed, in which case an empty list is returned.

    Args:
        channel_id: The channel whose active users' variables to collect.
        redis_client: The async Redis client handed to both lookups.
        config: Optional config forwarded to the per-user variable lookup.
        limit: Maximum number of recent users to consider (default 5).

    Returns:
        A list of ``{"user_id", "display_name", "variables"}`` dicts, one per
        recent user with at least one variable (empty on error or when no
        user qualifies).
    """
    try:
        recent_users = await get_recent_active_users(
            channel_id,
            redis_client=redis_client,
            limit=limit,
        )
        if not recent_users:
            return []

        collected: List[Dict[str, Any]] = []
        for user in recent_users:
            user_vars = await get_user_variables_for_context(
                channel_id,
                user["user_id"],
                redis_client=redis_client,
                config=config,
            )
            if not user_vars:
                continue
            record = {
                "user_id": user["user_id"],
                "display_name": user["display_name"],
                "variables": user_vars,
            }
            collected.append(record)
        return collected
    except Exception as exc:
        logger.error(
            "Failed to get all active user variables: %s",
            exc,
            exc_info=True,
        )
        return []


# ---------------------------------------------------------------
# Multi-tool registration
# ---------------------------------------------------------------

# One entry per exposed tool: its public name, the description and JSON-schema
# parameters shown to the model, and the coroutine that handles the call.
TOOLS = [
    {
        "name": "set_user_variable",
        "description": (
            "Set a per-channel, per-user variable with configurable TTL. "
            "Use to track user-specific state, preferences, or context "
            "within a channel. TTL options: '1hour', '6hours', '24hours' "
            "(default), '1week', '5ever' (no expiration). Variables are "
            "automatically injected into context for the last 5 active users."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "variable_name": {
                    "type": "string",
                    "description": (
                        "Name of the variable "
                        "(alphanumeric + underscores, max 64 chars)."
                    ),
                },
                "value": {
                    "type": "string",
                    "description": "Value to store (max 4096 chars).",
                },
                "user_id": {
                    "type": "string",
                    "description": "Target user ID (defaults to the invoking user).",
                },
                "ttl_preset": {
                    "type": "string",
                    "enum": ["1hour", "6hours", "24hours", "1week", "5ever"],
                    "description": "Time-to-live preset (default: 24hours).",
                },
            },
            "required": ["variable_name", "value"],
        },
        "handler": _set_user_variable,
    },
    {
        "name": "get_user_variable",
        "description": "Get a specific variable for a user in the current channel.",
        "parameters": {
            "type": "object",
            "properties": {
                "variable_name": {
                    "type": "string",
                    "description": "Name of the variable to retrieve.",
                },
                "user_id": {
                    "type": "string",
                    "description": "Target user ID (defaults to invoking user).",
                },
            },
            "required": ["variable_name"],
        },
        "handler": _get_user_variable,
    },
    {
        "name": "delete_user_variable",
        "description": "Delete a specific variable for a user in the current channel.",
        "parameters": {
            "type": "object",
            "properties": {
                "variable_name": {
                    "type": "string",
                    "description": "Name of the variable to delete.",
                },
                "user_id": {
                    "type": "string",
                    "description": "Target user ID (defaults to invoking user).",
                },
            },
            "required": ["variable_name"],
        },
        "handler": _delete_user_variable,
    },
    {
        "name": "list_user_variables",
        "description": "List all variables for a user in the current channel.",
        "parameters": {
            "type": "object",
            "properties": {
                "user_id": {
                    "type": "string",
                    "description": "Target user ID (defaults to invoking user).",
                },
            },
        },
        "handler": _list_user_variables,
    },
    {
        "name": "clear_user_variables",
        "description": (
            "Clear ALL variables for a user in the current channel. "
            "Use with caution."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "user_id": {
                    "type": "string",
                    "description": "Target user ID (defaults to invoking user).",
                },
            },
        },
        "handler": _clear_user_variables,
    },
    {
        "name": "dump_user_variables",
        "description": (
            "Dump YOUR OWN user variables across ALL channels. "
            "Returns every variable belonging to the calling user, "
            "grouped by channel."
        ),
        "parameters": {
            "type": "object",
            "properties": {},
        },
        "handler": _dump_user_variables,
    },
]