# Source code for shadow_ban

"""Shadow Ban Pipeline — ShadowBanManager.

# ═══════════════════════════════════════════════════════════════════
# THIS FILE IS THE CLEAN SOURCE for patience_engine.py.
# It is gitignored and MUST NEVER be committed to the repository.
#
# !sb commands documented here MUST NEVER be added to any tool
# registry, command listing, help text, or skill catalog.
# Star does not know they exist.
# ═══════════════════════════════════════════════════════════════════

Four-phase punishment pipeline:
  1. LATENCY   — pad response time (log-decay tool gradient, 75% plateau)
  2. DROPS     — silently ignore messages
  3. FAKE 503  — mimic proxy capacity errors with retry-exhaust delay
  4. BLACKOUT  — near-total message suppression
  + BAN toggle — flip STARGAZER_USE bit at progress 1.0
"""

from __future__ import annotations

import asyncio
import logging
import math
import random
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    pass

# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)

# ──────────────────────────────────────────────────────────────────
# Curve table  (progress → intensity for each effect)
# ──────────────────────────────────────────────────────────────────

# Breakpoints of the hand-tuned effect curves. Each row is
# (progress, latency, drops, fake503, blackout); rows are listed in ascending
# order of progress, and _interpolate_curve() blends linearly between
# neighbouring rows.
#
# Shape of the columns as written below:
#   latency  — peaks at 1.00 (progress 0.30), then holds at 0.75 from 0.50 on
#   drops    — peaks at 1.00 (progress 0.50), then tails off to 0.20
#   fake503  — peaks at 1.00 (progress 0.55), then tails off to 0.02
#   blackout — starts at progress 0.55 and climbs to 0.95 by progress 0.90
#
# NOTE(review): the final row is all zeros because the ban toggle makes the
# effects irrelevant at exactly 1.0, but linear interpolation between the 0.90
# row and this row means every intensity also fades toward 0 for progress
# values in (0.90, 1.00) — confirm that fade-out is intended.
_CURVE_TABLE: list[tuple[float, float, float, float, float]] = [
    # (progress, latency, drops, fake503, blackout)
    (0.00, 0.00, 0.00, 0.00, 0.00),
    (0.10, 0.25, 0.00, 0.00, 0.00),
    (0.20, 0.60, 0.10, 0.00, 0.00),
    (0.30, 1.00, 0.30, 0.10, 0.00),
    (0.40, 0.85, 0.60, 0.40, 0.00),
    (0.50, 0.75, 1.00, 0.70, 0.00),
    (0.55, 0.75, 0.80, 1.00, 0.10),
    (0.60, 0.75, 0.50, 0.60, 0.25),
    (0.65, 0.75, 0.35, 0.30, 0.40),
    (0.70, 0.75, 0.25, 0.10, 0.60),
    (0.80, 0.75, 0.20, 0.05, 0.80),
    (0.90, 0.75, 0.20, 0.02, 0.95),
    (1.00, 0.00, 0.00, 0.00, 0.00),  # ban toggle, effects irrelevant
]


def _interpolate_curve(progress: float) -> dict[str, float]:
    """Translate ban progress into an intensity for each pipeline effect.

    The progress value is clamped into 0.0-1.0 and then located between two
    neighbouring rows of the module-level ``_CURVE_TABLE``; every effect
    column is blended linearly between those two rows. The function is pure:
    it reads the table and performs arithmetic, nothing else.

    Args:
        progress: Ban progress. Values below 0.0 are treated as 0.0 and values
            above 1.0 as 1.0. At 1.0 (after clamping) every intensity is 0.0.

    Returns:
        A dict with the keys ``"latency"``, ``"drops"``, ``"fake503"`` and
        ``"blackout"`` (in that order), each holding the interpolated
        intensity for the clamped progress value.
    """
    effect_names = ("latency", "drops", "fake503", "blackout")

    clamped = max(0.0, min(1.0, progress))
    if clamped >= 1.0:
        # Fully banned: the effect intensities no longer matter.
        return dict.fromkeys(effect_names, 0.0)

    # Walk consecutive (lower, upper) breakpoint pairs and stop at the first
    # segment whose upper bound reaches the clamped progress.
    for lower, upper in zip(_CURVE_TABLE, _CURVE_TABLE[1:]):
        if upper[0] < clamped:
            continue
        width = upper[0] - lower[0]
        # A zero-width (or inverted) segment pins the result to its lower row.
        frac = (clamped - lower[0]) / width if width > 0 else 0.0
        return {
            name: lo + frac * (hi - lo)
            for name, lo, hi in zip(effect_names, lower[1:], upper[1:])
        }

    # Defensive fallback; with a table that ends at progress 1.0 the loop
    # above always returns first.
    return dict.fromkeys(effect_names, 0.0)


# ──────────────────────────────────────────────────────────────────
# Latency model
# ──────────────────────────────────────────────────────────────────

def _base_target_seconds(intensity: float, num_tool_calls: int) -> float:
    """Pick a randomized response-time target, in seconds, for light tool use.

    The ceiling starts at 4 minutes and gains up to 4 more minutes from a
    logarithmic tool weight that saturates at 6 tool calls
    (``log(1 + n) / log(7)``, capped at 1.0), for a maximum of 8 minutes. The
    ceiling is scaled by ``intensity`` and the result is drawn uniformly from
    75%-100% of that scaled ceiling.

    Apart from consuming one value from the global :mod:`random` generator,
    the function has no side effects.

    Args:
        intensity: Multiplier applied to the ceiling (the latency intensity).
        num_tool_calls: Number of tool calls in the response; zero or negative
            counts contribute no tool bonus.

    Returns:
        A delay target in seconds, between 75% and 100% of the scaled ceiling.
    """
    base_minutes = 4.0
    tool_bonus_minutes = 4.0
    cap_minutes = 8.0

    # Logarithmic tool weight: grows quickly for the first few tools and
    # reaches 1.0 at six tool calls.
    tool_weight = 0.0
    if num_tool_calls > 0:
        tool_weight = min(1.0, math.log(1 + num_tool_calls) / math.log(7))

    ceiling_minutes = min(base_minutes + tool_bonus_minutes * tool_weight, cap_minutes)
    ceiling_s = ceiling_minutes * 60.0 * intensity
    floor_s = 0.75 * ceiling_s
    return random.uniform(floor_s, ceiling_s)

def _overflow_target_seconds(
    intensity: float,
    num_tool_calls: int,
    overflow_llm_time_s: float,
) -> float:
    """Compute a padded response-time target (seconds) for more than 6 tool calls.

    The ceiling is the sum of three parts:

    * an 8-minute base scaled by ``intensity``;
    * the LLM time already spent on the overflow tool calls;
    * a fixed 7-second penalty for every tool call beyond the sixth.

    The returned value is drawn uniformly from 75%-100% of that ceiling.

    The per-tool penalty and the LLM-time contribution are clamped at zero, so
    a call with six or fewer tools (or a negative time measurement) can never
    pull the target below the intensity-scaled base or produce a negative
    delay. For the intended inputs (more than 6 tools, non-negative time) the
    result is unchanged by the clamping.

    Apart from consuming one value from the global :mod:`random` generator,
    the function has no side effects.

    Args:
        intensity: The latency curve intensity (0.0-1.0) for this ban progress.
        num_tool_calls: Number of tool calls in the response (expected > 6);
            counts of 6 or fewer add no penalty.
        overflow_llm_time_s: Seconds of LLM time already spent on the overflow
            tool calls; negative values are treated as 0.

    Returns:
        A randomized delay target in seconds.
    """
    tools_before_overflow = 6
    penalty_per_extra_tool_s = 7.0

    base_s = 8.0 * 60.0 * intensity
    # Only tool calls beyond the overflow threshold are penalised; without
    # the clamp a smaller count would subtract time from the target.
    extra_tools = max(0, num_tool_calls - tools_before_overflow)
    penalty_s = penalty_per_extra_tool_s * extra_tools
    llm_time_s = max(0.0, overflow_llm_time_s)

    target_s = base_s + llm_time_s + penalty_s
    return random.uniform(0.75 * target_s, target_s)

# ──────────────────────────────────────────────────────────────────
# 503 error template
# ──────────────────────────────────────────────────────────────────

# str.format() template for the fake proxy error text.
#
# Literal JSON braces are written doubled ("{{" / "}}") so that str.format()
# emits them as single braces; the only real placeholders are {model} (used
# twice) and {time}. The adjacent string literals are concatenated into a
# single multi-line string that ends with "Please try again later.".
_503_TEMPLATE = (
    '[PROXY ERROR] Request failed after remapping(fallback disabled): '
    'Stream request failed: 503 {{\n'
    '"error": {{\n'
    '"code": 503,\n'
    '"message": "No capacity available for model {model} on the server",\n'
    '"status": "UNAVAILABLE",\n'
    '"details": [{{\n'
    '"@type": "type.googleapis.com/google.rpc.ErrorInfo",\n'
    '"reason": "MODEL_CAPACITY_EXHAUSTED",\n'
    '"domain": "cloudcode-pa.googleapis.com",\n'
    '"metadata": {{\n'
    '  "model": "{model}",\n'
    '  "time": "{time}"\n'
    '}}\n'
    '}}]\n'
    '}}\n'
    '}}\n\n'
    'Please try again later.'
)


def _render_503(model: str = "claude-sonnet-4-20250514") -> str:
    """Build the fake proxy 503 "capacity exhausted" error text.

    Substitutes the model name and the current UTC time (ISO-8601, as produced
    by :meth:`datetime.isoformat`) into the module-level ``_503_TEMPLATE``.
    The only external state read is the wall clock.

    Args:
        model: Model name embedded in the error payload.

    Returns:
        The rendered error string, which ends with "Please try again later."
    """
    now_utc = datetime.now(tz=timezone.utc)
    fields = {"model": model, "time": now_utc.isoformat()}
    return _503_TEMPLATE.format(**fields)

def _simulate_retry_exhaust() -> float:
    """Pick a random stall, in seconds, to precede a fake 503.

    The value stands in for the time a proxy would spend retrying a
    capacity-exhausted model before giving up. This function only draws the
    number; it does not sleep.

    Returns:
        A float drawn uniformly from the 20.0-100.0 second range.
    """
    shortest_s, longest_s = 20.0, 100.0
    return random.uniform(shortest_s, longest_s)


# ──────────────────────────────────────────────────────────────────
# Effect dataclass
# ──────────────────────────────────────────────────────────────────

@dataclass
class ShadowEffect:
    """Plain decision record: which shadow-ban effects apply to one message.

    Built by ``ShadowBanManager.apply_shadow_effects`` and handed to the
    caller, which is responsible for acting on the flags. The class has no
    behaviour of its own. ``num_tool_calls`` and ``delay_target_s`` may be
    updated afterwards by ``ShadowBanManager.refine_latency_for_tools``.
    """

    # LATENCY: total number of seconds the response should be padded to.
    delay_target_s: float = 0.0

    # DROPS: when True the message is to be silently ignored.
    drop: bool = False

    # FAKE 503: pre-rendered error text to send instead of a reply, or None
    # when this effect did not fire.
    fake_503_text: str | None = None

    # BLACKOUT: when True the message is to be suppressed.
    blackout: bool = False

    # BAN: set when ban progress has reached 1.0.
    ban: bool = False

    # Tool-call count for the response; set after generate_and_send completes.
    num_tool_calls: int = 0

    # Per-effect intensities (latency/drops/fake503/blackout) the decision was
    # drawn from. ``default_factory`` gives every instance its own dict.
    intensities: dict[str, float] = field(default_factory=dict)
# ──────────────────────────────────────────────────────────────────
# Completion helpers (for status display)
# ──────────────────────────────────────────────────────────────────

def _effect_completion(intensities: dict[str, float]) -> dict[str, float]:
    """Convert raw effect intensities into per-effect "completion" values.

    Each effect is scored against a fixed reference level: 0.75 for latency,
    0.20 for drops, 0.05 for fake 503 and 0.95 for blackout. Missing keys are
    treated as an intensity of 0.0. Pure computation, no I/O.

    Scoring rules, as implemented:
      * drops / fake503: below the reference the score is
        ``intensity / reference``; at or above it the score is 1.0 minus how
        far the intensity sits above the reference (scaled so an intensity of
        1.0 scores 0.0); a non-positive intensity scores 0.0.
      * blackout: ``intensity / 0.95`` capped at 1.0; non-positive scores 0.0.
      * latency: 1.0 for any positive intensity.

    Args:
        intensities: Mapping of effect name to current intensity.

    Returns:
        A dict with the keys ``"latency"``, ``"drops"``, ``"fake503"`` and
        ``"blackout"`` holding the completion values.
    """

    def _against_reference(value: float, reference: float, headroom: float) -> float:
        # Shared rule for the effects that overshoot and then settle back.
        if value >= reference:
            return min(1.0, 1.0 - max(0, (value - reference) / headroom))
        if value > 0:
            return value / reference
        return 0.0

    latency = intensities.get("latency", 0.0)
    drops = intensities.get("drops", 0.0)
    fake503 = intensities.get("fake503", 0.0)
    blackout = intensities.get("blackout", 0.0)

    # NOTE(review): the scaled branch is only reached for non-positive
    # intensities, so every positive latency reports 1.0 — confirm whether a
    # ramp towards 0.75 was intended here.
    if latency <= 0.75 and not latency > 0.0:
        latency_done = min(latency / 0.75, 1.0)
    else:
        latency_done = 1.0

    if blackout > 0:
        blackout_done = min(blackout / 0.95, 1.0)
    else:
        blackout_done = 0.0

    return {
        "latency": latency_done,
        "drops": _against_reference(drops, 0.20, 0.80),
        "fake503": _against_reference(fake503, 0.05, 0.95),
        "blackout": blackout_done,
    }


# ──────────────────────────────────────────────────────────────────
# Redis key prefix for shadow ban data
# ──────────────────────────────────────────────────────────────────

_REDIS_SB_PREFIX = "stargazer:shadow_ban"


def _sb_redis_key(user_id: str) -> str:
    """Return the Redis hash key under which one user's ban state is stored.

    The key is ``_REDIS_SB_PREFIX`` and the user id joined by a colon, i.e.
    ``stargazer:shadow_ban:{user_id}``. Pure string construction, no I/O.

    Args:
        user_id: The user id the ban is keyed on.

    Returns:
        The fully-qualified Redis key for that user's shadow-ban record.
    """
    return "{}:{}".format(_REDIS_SB_PREFIX, user_id)


# ──────────────────────────────────────────────────────────────────
# ShadowBanManager
# ──────────────────────────────────────────────────────────────────
class ShadowBanManager:
    """Shadow-ban pipeline front end: ban CRUD, effect decisions, status text.

    Per-user ban state lives in a Redis hash at
    ``stargazer:shadow_ban:{user_id}``. When a knowledge-graph manager is
    supplied, ``start_ban`` additionally records a ``ShadowBan`` entity
    through its ``add_entity`` method as a best-effort audit trail. Progress
    is never stored: it is recomputed from ``started_at`` and the configured
    duration on every call, so no background task is needed.
    """

    def __init__(
        self,
        redis: Any,
        kg_manager: Any | None = None,
        config: Any = None,
    ) -> None:
        """Store the backing handles; performs no I/O.

        Args:
            redis: Async Redis client used for all ban state. May be ``None``,
                in which case the read paths return empty results.
            kg_manager: Optional object exposing an awaitable ``add_entity``
                used by :meth:`start_ban` for the audit record.
            config: Optional configuration object; stored but not otherwise
                used by this class.
        """
        self._redis = redis
        self._kg = kg_manager
        self._config = config

    # ── Ban CRUD ──────────────────────────────────────────────────

    async def start_ban(
        self,
        user_id: str,
        duration_days: float = 15.0,
        reason: str = "",
        initiated_by: str = "admin",
        platform: str = "discord",
    ) -> dict[str, Any]:
        """Start (or restart) a shadow ban and persist its initial state.

        Writes the start time, duration, reason and metadata plus a
        one-entry history log into the user's Redis hash. If a
        knowledge-graph manager is configured, a ``ShadowBan`` entity is
        also written; a failure there is logged at debug level and does not
        abort the ban.

        Args:
            user_id: User id to shadow-ban.
            duration_days: Days over which progress runs from 0.0 to 1.0.
            reason: Human-readable reason stored with the ban.
            initiated_by: Identifier of whoever started the ban.
            platform: Platform name the ban applies to.

        Returns:
            A dict with ``user_id``, the numeric ``started_at`` epoch
            timestamp and ``duration_days``.
        """
        import json

        now = time.time()
        data = {
            "started_at": str(now),
            "max_duration_days": str(duration_days),
            "reason": reason,
            "initiated_by": initiated_by,
            "platform": platform,
            "history": json.dumps([{
                "progress": 0.0,
                "timestamp": now,
                "action": "started",
            }]),
        }
        key = _sb_redis_key(user_id)
        await self._redis.hset(key, mapping=data)
        logger.debug("Shadow ban started for %s (%.0f days)", user_id, duration_days)

        # Persist to FalkorDB if available. Deliberately best-effort: the
        # Redis record above is what drives the effects.
        if self._kg is not None:
            try:
                await self._kg.add_entity(
                    name=user_id,
                    entity_type="ShadowBan",
                    description=f"Shadow ban: {reason or 'no reason given'}",
                    category="general",
                    scope_id="global",
                    created_by=initiated_by,
                    metadata=json.dumps({
                        "reason": reason,
                        "initiated_by": initiated_by,
                        "platform": platform,
                        "duration_days": duration_days,
                    }),
                )
            except Exception:
                logger.debug("Failed to persist shadow ban to FalkorDB", exc_info=True)

        return {"user_id": user_id, "started_at": now, "duration_days": duration_days}

    async def get_ban(self, user_id: str) -> dict[str, Any] | None:
        """Read a user's ban hash from Redis and normalize it to a typed dict.

        Both field names and values are decoded, so the record is read
        correctly whether the client returns ``bytes`` or ``str``.

        Args:
            user_id: User id whose ban record to read.

        Returns:
            A dict with ``user_id``, ``started_at`` (float),
            ``max_duration_days`` (float), ``reason``, ``initiated_by``,
            ``platform`` and ``history`` (the raw JSON string), or ``None``
            when Redis is unset or no record exists.
        """
        if self._redis is None:
            return None
        raw = await self._redis.hgetall(_sb_redis_key(user_id))
        if not raw:
            return None

        def _text(value: Any) -> Any:
            # bytes -> str; falsy values -> ""; anything else passes through.
            if isinstance(value, bytes):
                return value.decode()
            return value or ""

        # Decode the *keys* as well: a client without response decoding
        # returns bytes field names, which a str lookup would silently miss
        # (making started_at fall back to 0 and the ban look complete).
        fields = {_text(name): _text(value) for name, value in raw.items()}

        return {
            "user_id": user_id,
            "started_at": float(fields.get("started_at", "") or "0"),
            "max_duration_days": float(fields.get("max_duration_days", "") or "15"),
            "reason": fields.get("reason", ""),
            "initiated_by": fields.get("initiated_by", ""),
            "platform": fields.get("platform", ""),
            "history": fields.get("history", ""),
        }

    async def lift_ban(self, user_id: str) -> bool:
        """Delete a user's ban hash so no further effects apply.

        Only the Redis record is removed; nothing is done to the
        knowledge-graph entity.

        Args:
            user_id: User id whose ban to remove.

        Returns:
            ``True`` if a record existed and was deleted, ``False``
            otherwise (including when Redis is unset).
        """
        if self._redis is None:
            # Consistent with the read paths: no store means no ban to lift.
            return False
        key = _sb_redis_key(user_id)
        existed = await self._redis.exists(key)
        if existed:
            await self._redis.delete(key)
            logger.debug("Shadow ban lifted for %s", user_id)
        return bool(existed)

    async def get_progress(self, user_id: str) -> float:
        """Compute ban progress from wall-clock time elapsed since the start.

        Args:
            user_id: User id to evaluate.

        Returns:
            ``elapsed / duration`` clamped to 0.0-1.0; ``0.0`` when no ban
            exists and ``1.0`` when the stored duration is not positive.
        """
        ban = await self.get_ban(user_id)
        if ban is None:
            return 0.0
        days = ban["max_duration_days"]
        if days <= 0:
            return 1.0
        elapsed = time.time() - ban["started_at"]
        # Lower clamp matters when started_at lies in the future (clock
        # skew, hand-edited record): progress must never go negative.
        return max(0.0, min(1.0, elapsed / (days * 86400)))

    async def jump_to(self, user_id: str, stage_float: float) -> float:
        """Reposition a ban at a given stage by back-dating ``started_at``.

        Stage 1.0 maps to progress 0.0 and stage 5.0 to progress 1.0.
        ``started_at`` is rewritten so that elapsed/duration equals the
        requested progress, and the jump is appended to the history log.
        Both fields are written in a single ``HSET``.

        Args:
            user_id: User id whose ban to reposition.
            stage_float: Target stage; the derived progress is clamped to
                0.0-1.0.

        Returns:
            The new progress value, or ``0.0`` if no ban exists.
        """
        import json

        progress = (stage_float - 1.0) / 4.0
        progress = max(0.0, min(1.0, progress))

        ban = await self.get_ban(user_id)
        if ban is None:
            return 0.0

        now = time.time()
        # Rewrite started_at so that elapsed/duration = progress
        duration_s = ban["max_duration_days"] * 86400
        new_started = now - (progress * duration_s)

        # Tolerate a missing or corrupt history field: start a fresh log
        # rather than failing the jump.
        try:
            history = json.loads(ban.get("history") or "[]")
        except (TypeError, ValueError):
            history = []
        if not isinstance(history, list):
            history = []
        history.append({
            "progress": progress,
            "timestamp": now,
            "action": f"jumped to stage {stage_float:.2f}",
        })

        # One write for both fields so a failure cannot leave the new
        # started_at without its matching history entry.
        await self._redis.hset(
            _sb_redis_key(user_id),
            mapping={
                "started_at": str(new_started),
                "history": json.dumps(history),
            },
        )

        logger.debug("Shadow ban for %s jumped to stage %.2f (progress %.3f)",
                     user_id, stage_float, progress)
        return progress

    async def list_all(self) -> list[dict[str, Any]]:
        """Return every stored ban enriched with live progress and intensities.

        Iterates ``stargazer:shadow_ban:*`` with a cursor ``SCAN`` loop, then
        loads each record via :meth:`get_ban` and :meth:`get_progress`.

        Returns:
            A list of ban dicts (as from :meth:`get_ban`), each with added
            ``progress`` and ``intensities`` keys; ``[]`` when Redis is
            unset.
        """
        if self._redis is None:
            return []

        prefix = f"{_REDIS_SB_PREFIX}:"
        cursor: Any = b"0"
        seen: set[str] = set()
        results: list[dict[str, Any]] = []
        while True:
            cursor, keys = await self._redis.scan(
                cursor, match=f"{prefix}*", count=100,
            )
            for key_raw in keys:
                key = key_raw.decode() if isinstance(key_raw, bytes) else key_raw
                # Strip the known prefix instead of taking the last ":"
                # segment, so user ids that themselves contain ":" survive.
                if key.startswith(prefix):
                    user_id = key[len(prefix):]
                else:
                    user_id = key.split(":")[-1]
                # SCAN may hand back the same key more than once.
                if user_id in seen:
                    continue
                seen.add(user_id)
                ban = await self.get_ban(user_id)
                if ban:
                    ban["progress"] = await self.get_progress(user_id)
                    ban["intensities"] = _interpolate_curve(ban["progress"])
                    results.append(ban)
            # The cursor may come back as int, bytes or str depending on the
            # client; int() accepts all three.
            if int(cursor) == 0:
                break
        return results

    # ── Effect application ────────────────────────────────────────

    async def apply_shadow_effects(
        self,
        user_id: str,
        model: str = "claude-sonnet-4-20250514",
    ) -> ShadowEffect:
        """Decide which shadow-ban effect, if any, fires for one message.

        Reads the user's progress, interpolates the per-effect intensities
        and then rolls in priority order: blackout, drop, fake 503, latency.
        The first effect that triggers is returned. This method only
        decides; it does not sleep, send anything, or write to Redis.

        Args:
            user_id: User id of the message author.
            model: Model name embedded in any rendered fake 503 text.

        Returns:
            A :class:`ShadowEffect`: all defaults when progress is 0.0 or
            below, ``ban=True`` when progress has reached 1.0, otherwise a
            record carrying the intensities and at most one triggered effect.
        """
        progress = await self.get_progress(user_id)
        if progress <= 0.0:
            return ShadowEffect()

        if progress >= 1.0:
            return ShadowEffect(ban=True)

        intensities = _interpolate_curve(progress)
        effect = ShadowEffect(intensities=intensities)

        # Blackout check (highest priority after ban)
        if intensities["blackout"] > 0 and random.random() < intensities["blackout"]:
            effect.blackout = True
            logger.debug("[shadow] %s: BLACKOUT (intensity=%.2f)", user_id, intensities["blackout"])
            return effect

        # Drop check
        if intensities["drops"] > 0 and random.random() < intensities["drops"]:
            effect.drop = True
            logger.debug("[shadow] %s: DROP (intensity=%.2f)", user_id, intensities["drops"])
            return effect

        # Fake 503 check
        if intensities["fake503"] > 0 and random.random() < intensities["fake503"]:
            effect.fake_503_text = _render_503(model)
            logger.debug("[shadow] %s: FAKE 503 (intensity=%.2f)", user_id, intensities["fake503"])
            return effect

        # Latency (always applies if intensity > 0, computed later with tool count)
        if intensities["latency"] > 0:
            # Pre-compute base target (0 tools); will be refined post-response
            effect.delay_target_s = _base_target_seconds(intensities["latency"], 0)
            logger.debug("[shadow] %s: LATENCY target=%.1fs (intensity=%.2f)",
                         user_id, effect.delay_target_s, intensities["latency"])

        return effect

    def refine_latency_for_tools(
        self,
        effect: ShadowEffect,
        num_tool_calls: int,
        overflow_llm_time_s: float = 0.0,
    ) -> None:
        """Recompute a latency target in place once the tool count is known.

        Uses ``_base_target_seconds`` for up to 6 tool calls and
        ``_overflow_target_seconds`` above that. Does nothing when the
        effect's latency intensity or ``num_tool_calls`` is not positive.

        Args:
            effect: The :class:`ShadowEffect` to update; ``num_tool_calls``
                and ``delay_target_s`` are overwritten.
            num_tool_calls: Number of tool calls made for the response.
            overflow_llm_time_s: Passed through to
                ``_overflow_target_seconds`` in the more-than-6-tools case.

        Returns:
            ``None``; the result is written onto ``effect``.
        """
        intensity = effect.intensities.get("latency", 0.0)
        if intensity <= 0 or num_tool_calls <= 0:
            return

        effect.num_tool_calls = num_tool_calls
        if num_tool_calls <= 6:
            effect.delay_target_s = _base_target_seconds(intensity, num_tool_calls)
        else:
            effect.delay_target_s = _overflow_target_seconds(
                intensity, num_tool_calls, overflow_llm_time_s,
            )
        logger.debug("[shadow] Refined latency: %d tools, target=%.1fs",
                     num_tool_calls, effect.delay_target_s)

    # ── Status formatting ─────────────────────────────────────────

    async def format_status(self, user_id: str) -> str:
        """Render a boxed text panel describing one user's shadow ban.

        The panel shows stage, progress, start and end times, and one row
        per effect with its intensity, completion value and a 10-cell bar.

        Args:
            user_id: User id whose ban status to render.

        Returns:
            The multi-line panel, followed by a ``Reason:`` line when a
            reason is stored, or a one-line "no active shadow ban" message.
        """
        ban = await self.get_ban(user_id)
        if ban is None:
            return f"No active shadow ban for `{user_id}`."

        progress = await self.get_progress(user_id)
        stage = 1.0 + progress * 4.0
        intensities = _interpolate_curve(progress)
        completions = _effect_completion(intensities)

        started = datetime.fromtimestamp(ban["started_at"], tz=timezone.utc)
        elapsed = time.time() - ban["started_at"]
        # Show days once more than a day has passed, hours before that.
        elapsed_str = f"{elapsed / 86400:.1f}d" if elapsed > 86400 else f"{elapsed / 3600:.1f}h"

        end_ts = ban["started_at"] + ban["max_duration_days"] * 86400
        end_str = datetime.fromtimestamp(end_ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M")

        def _bar(val: float) -> str:
            # Always exactly 10 cells, even for an out-of-range value.
            filled = max(0, min(10, int(val * 10)))
            return "█" * filled + "░" * (10 - filled)

        lines = [
            "╔══════════════════════════════════════════════════════╗",
            f"║ SHADOW BAN — User {user_id:<34}║",
            "╠══════════════════════════════════════════════════════╣",
            f"║ Stage: {stage:.2f} / 5.00 (progress: {progress:.3f}){' ' * 15}║",
            f"║ Started: {started.strftime('%Y-%m-%d %H:%M')} ({elapsed_str} ago){' ' * 14}║",
            f"║ Duration: {ban['max_duration_days']:.0f}d (ends: {end_str}){' ' * 10}║",
            "╠──────────────────────────────────────────────────────╣",
            "║ Effect Intensity Complete Bar ║",
            "║ ─────────── ────────── ──────── ────────────── ║",
        ]

        effects = [
            ("Latency", intensities["latency"], completions["latency"]),
            ("Drops", intensities["drops"], completions["drops"]),
            ("Fake 503", intensities["fake503"], completions["fake503"]),
            ("Blackout", intensities["blackout"], completions["blackout"]),
        ]
        for name, intensity, completion in effects:
            bar = _bar(intensity)
            lines.append(
                f"║ {name:<12}{intensity:<10.2f}{completion:<9.2f}{bar} ║"
            )

        ban_status = "🔒 ACTIVE" if progress >= 1.0 else "at stage 5.0"
        lines.append(f"║ {'Ban':<12}{'—':<10}{'—':<9}{ban_status:<15}║")
        lines.append("╚══════════════════════════════════════════════════════╝")

        if ban.get("reason"):
            lines.append(f"Reason: {ban['reason']}")

        return "\n".join(lines)

    async def format_list(self) -> str:
        """Render a one-line-per-user summary of all stored shadow bans.

        Bans come from :meth:`list_all` and are sorted by progress, most
        advanced first. A ban with an empty reason is shown as "no reason".

        Returns:
            A multi-line summary string, or "No active shadow bans." when
            there are none.
        """
        bans = await self.list_all()
        if not bans:
            return "No active shadow bans."

        lines = ["**Active Shadow Bans:**\n"]
        for ban in sorted(bans, key=lambda b: b.get("progress", 0), reverse=True):
            progress = ban.get("progress", 0)
            stage = 1.0 + progress * 4.0
            # ``or`` rather than a .get default: the key is always present,
            # but it holds "" when no reason was given.
            reason = ban.get("reason") or "no reason"
            lines.append(
                f"• `{ban['user_id']}` — Stage {stage:.2f}/5.00 "
                f"(progress {progress:.3f}) — {reason}"
            )
        return "\n".join(lines)