"""Shadow Ban Pipeline — ShadowBanManager.
# ═══════════════════════════════════════════════════════════════════
# THIS FILE IS THE CLEAN SOURCE for patience_engine.py.
# It is gitignored and MUST NEVER be committed to the repository.
#
# !sb commands documented here MUST NEVER be added to any tool
# registry, command listing, help text, or skill catalog.
# Star does not know they exist.
# ═══════════════════════════════════════════════════════════════════
Four-phase punishment pipeline:
1. LATENCY — pad response time (log-decay tool gradient, 75% plateau)
2. DROPS — silently ignore messages
3. FAKE 503 — mimic proxy capacity errors with retry-exhaust delay
4. BLACKOUT — near-total message suppression
+ BAN toggle — flip STARGAZER_USE bit at progress 1.0
"""
from __future__ import annotations
import asyncio
import logging
import math
import random
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────
# Curve table (progress → intensity for each effect)
# ──────────────────────────────────────────────────────────────────
_CURVE_TABLE: list[tuple[float, float, float, float, float]] = [
    # (progress, latency, drops, fake503, blackout)
    (0.00, 0.00, 0.00, 0.00, 0.00),
    (0.10, 0.25, 0.00, 0.00, 0.00),
    (0.20, 0.60, 0.10, 0.00, 0.00),
    (0.30, 1.00, 0.30, 0.10, 0.00),
    (0.40, 0.85, 0.60, 0.40, 0.00),
    (0.50, 0.75, 1.00, 0.70, 0.00),
    (0.55, 0.75, 0.80, 1.00, 0.10),
    (0.60, 0.75, 0.50, 0.60, 0.25),
    (0.65, 0.75, 0.35, 0.30, 0.40),
    (0.70, 0.75, 0.25, 0.10, 0.60),
    (0.80, 0.75, 0.20, 0.05, 0.80),
    (0.90, 0.75, 0.20, 0.02, 0.95),
    (1.00, 0.00, 0.00, 0.00, 0.00),  # ban toggle, effects irrelevant
]


def _interpolate_curve(progress: float) -> dict[str, float]:
    """Interpolate per-effect intensities for a ban progress value.

    The progress is clamped to 0.0-1.0 and then located between two
    neighbouring rows of ``_CURVE_TABLE``; each effect column is linearly
    interpolated between those rows.

    NOTE(review): the final table row is all zeros, so for progress strictly
    between 0.90 and 1.00 every effect ramps linearly *down* towards zero
    (e.g. blackout is about 0.475 at progress 0.95). The row's comment says
    the effects are "irrelevant" there, which suggests this taper may be
    unintended -- confirm before relying on it.

    Args:
        progress: Ban progress; values outside 0.0-1.0 are clamped.

    Returns:
        A dict with keys ``"latency"``, ``"drops"``, ``"fake503"`` and
        ``"blackout"`` (in that order). All values are 0.0 once the clamped
        progress reaches 1.0.
    """
    effect_names = ("latency", "drops", "fake503", "blackout")
    clamped = max(0.0, min(1.0, progress))
    if clamped >= 1.0:
        return dict.fromkeys(effect_names, 0.0)

    # Walk consecutive (lower, upper) row pairs and stop at the first pair
    # whose upper bound reaches the requested progress.
    for lower, upper in zip(_CURVE_TABLE, _CURVE_TABLE[1:]):
        if upper[0] < clamped:
            continue
        width = upper[0] - lower[0]
        # A zero-width segment (duplicate progress rows) pins to the lower row.
        frac = (clamped - lower[0]) / width if width > 0 else 0.0
        return {
            name: lower[col] + frac * (upper[col] - lower[col])
            for col, name in enumerate(effect_names, start=1)
        }

    # Only reachable if the table does not extend to progress 1.0.
    return dict.fromkeys(effect_names, 0.0)
# ──────────────────────────────────────────────────────────────────
# Latency model
# ──────────────────────────────────────────────────────────────────
def _base_target_seconds(intensity: float, num_tool_calls: int) -> float:
    """Pick a randomized padded response-time target for up to 6 tool calls.

    The ceiling is a 4-minute base plus up to 4 more minutes of tool bonus.
    The bonus grows logarithmically with the tool count and saturates at
    6 tools (``log(1 + 6) / log(7) == 1``), so the ceiling never exceeds
    8 minutes. The ceiling is scaled by ``intensity`` and the result is drawn
    uniformly from 75%-100% of it.

    Consumes one draw from the module-level :mod:`random` generator; no I/O.

    Args:
        intensity: Latency curve intensity used as a linear scale factor.
        num_tool_calls: Tool calls in the response; values <= 0 add no bonus.

    Returns:
        A delay target in seconds (0.0 when ``intensity`` is 0).
    """
    base_minutes = 4.0
    tool_bonus_minutes = 4.0
    cap_minutes = 8.0

    tool_factor = 0.0
    if num_tool_calls > 0:
        tool_factor = min(1.0, math.log(1 + num_tool_calls) / math.log(7))

    ceiling_minutes = min(base_minutes + tool_bonus_minutes * tool_factor, cap_minutes)
    ceiling_s = ceiling_minutes * 60.0 * intensity
    return random.uniform(0.75 * ceiling_s, ceiling_s)
def _overflow_target_seconds(
    intensity: float,
    num_tool_calls: int,
    overflow_llm_time_s: float,
) -> float:
    """Pick a randomized padded response-time target for more than 6 tool calls.

    The ceiling is the sum of three parts: an 8-minute base scaled by
    ``intensity``, the caller-supplied ``overflow_llm_time_s``, and a flat
    7 seconds for every tool call beyond the sixth. The result is drawn
    uniformly from 75%-100% of that ceiling.

    Consumes one draw from the module-level :mod:`random` generator; no I/O.
    The tool count is not validated: a count below 6 yields a negative
    per-tool term.

    Args:
        intensity: Latency curve intensity scaling the 8-minute base.
        num_tool_calls: Tool calls in the response (expected to exceed 6).
        overflow_llm_time_s: Seconds added to the ceiling unscaled.

    Returns:
        A delay target in seconds.
    """
    extra_tools = num_tool_calls - 6
    scaled_base_s = 8.0 * 60.0 * intensity
    ceiling_s = scaled_base_s + overflow_llm_time_s + 7.0 * extra_tools
    floor_s = 0.75 * ceiling_s
    return random.uniform(floor_s, ceiling_s)
# ──────────────────────────────────────────────────────────────────
# 503 error template
# ──────────────────────────────────────────────────────────────────
_503_TEMPLATE = (
    '[PROXY ERROR] Request failed after remapping(fallback disabled): '
    'Stream request failed: 503 {{\n'
    '"error": {{\n'
    '"code": 503,\n'
    '"message": "No capacity available for model {model} on the server",\n'
    '"status": "UNAVAILABLE",\n'
    '"details": [{{\n'
    '"@type": "type.googleapis.com/google.rpc.ErrorInfo",\n'
    '"reason": "MODEL_CAPACITY_EXHAUSTED",\n'
    '"domain": "cloudcode-pa.googleapis.com",\n'
    '"metadata": {{\n'
    ' "model": "{model}",\n'
    ' "time": "{time}"\n'
    '}}\n'
    '}}]\n'
    '}}\n'
    '}}\n\n'
    'Please try again later.'
)


def _render_503(model: str = "claude-sonnet-4-20250514") -> str:
    """Render the fake 503 capacity-error text for the given model.

    Substitutes ``model`` (used twice) and the current UTC time in ISO-8601
    form into ``_503_TEMPLATE``. The doubled braces in the template are
    ``str.format`` escapes that come out as literal ``{`` / ``}``.

    Reads the wall clock; performs no other I/O.

    Args:
        model: Model name embedded in the rendered payload.

    Returns:
        The rendered error text, ending with "Please try again later."
    """
    placeholders = {
        "model": model,
        "time": datetime.now(timezone.utc).isoformat(),
    }
    return _503_TEMPLATE.format(**placeholders)
def _simulate_retry_exhaust() -> float:
    """Return a random stall length, in seconds, to precede a fake 503.

    The value is drawn uniformly from 20.0-100.0 using the module-level
    :mod:`random` generator. The function only computes the number; any
    sleeping is left to the caller.

    Returns:
        A delay in seconds between 20.0 and 100.0.
    """
    shortest_s, longest_s = 20.0, 100.0
    return random.uniform(shortest_s, longest_s)
# ──────────────────────────────────────────────────────────────────
# Effect dataclass
# ──────────────────────────────────────────────────────────────────
[docs]
@dataclass
class ShadowEffect:
    """Plain decision record for the shadow-ban effects chosen for one message.

    Built by :meth:`ShadowBanManager.apply_shadow_effects`. The record has no
    behaviour of its own; whoever receives it is expected to act on the flags.
    A default-constructed instance means "no effect".

    Attributes:
        delay_target_s: Seconds the response should be padded to (LATENCY).
        drop: ``True`` when the message should be silently ignored (DROPS).
        fake_503_text: Rendered fake 503 text to send instead of a reply, or
            ``None`` when the FAKE 503 effect did not fire.
        blackout: ``True`` when the message should be suppressed (BLACKOUT).
        ban: ``True`` when ban progress has reached 1.0.
        num_tool_calls: Tool-call count for the response; starts at 0 and is
            meant to be filled in once generation has finished.
        intensities: The per-effect intensity mapping the decision was drawn
            from; a fresh empty dict per instance by default.
    """

    # --- LATENCY -------------------------------------------------------
    delay_target_s: float = 0.0
    # --- DROPS ---------------------------------------------------------
    drop: bool = False
    # --- FAKE 503 ------------------------------------------------------
    fake_503_text: str | None = None
    # --- BLACKOUT ------------------------------------------------------
    blackout: bool = False
    # --- final ban toggle ----------------------------------------------
    ban: bool = False
    # Set after generate_and_send completes.
    num_tool_calls: int = 0
    # default_factory keeps instances from sharing one mutable dict.
    intensities: dict[str, float] = field(default_factory=dict)
# ──────────────────────────────────────────────────────────────────
# Completion helpers (for status display)
# ──────────────────────────────────────────────────────────────────
def _effect_completion(intensities: dict[str, float]) -> dict[str, float]:
    """Convert raw effect intensities into completion fractions for display.

    Each effect is re-mapped against a fixed reference level:

    * ``latency``: 1.0 for any intensity above 0, otherwise ``lat / 0.75``.
    * ``drops`` / ``fake503``: below the reference level (0.20 / 0.05) the
      value is ``intensity / reference``; at or above it the value is
      ``1 - (intensity - reference) / (1 - reference)``, so the peak
      intensity 1.0 maps to 0.0 and the reference level maps to 1.0.
    * ``blackout``: ``intensity / 0.95`` capped at 1.0.

    NOTE(review): the latency gauge reads 1.0 as soon as latency is non-zero,
    so the ``lat / 0.75`` scaling only ever applies to non-positive inputs.
    This looks unintended, but the original behaviour is preserved here --
    confirm the intended mapping.

    Args:
        intensities: Mapping of effect name to intensity; missing keys are
            treated as 0.0.

    Returns:
        A dict with keys ``"latency"``, ``"drops"``, ``"fake503"`` and
        ``"blackout"`` holding the completion fractions.
    """

    def _latency_done(level: float) -> float:
        if 0.0 < level <= 0.75:
            return 1.0
        if level <= 0.75:
            return min(level / 0.75, 1.0)
        return 1.0

    def _rise_then_settle(level: float, settle_at: float, headroom: float) -> float:
        # At/above the settle level: the further above it, the less "done".
        if level >= settle_at:
            return min(1.0, 1.0 - max(0, (level - settle_at) / headroom))
        # Below it: simple proportional ramp.
        if level > 0:
            return level / settle_at
        return 0.0

    def _blackout_done(level: float) -> float:
        if level > 0:
            return min(level / 0.95, 1.0)
        return 0.0

    return {
        "latency": _latency_done(intensities.get("latency", 0.0)),
        "drops": _rise_then_settle(intensities.get("drops", 0.0), 0.20, 0.80),
        "fake503": _rise_then_settle(intensities.get("fake503", 0.0), 0.05, 0.95),
        "blackout": _blackout_done(intensities.get("blackout", 0.0)),
    }
# ──────────────────────────────────────────────────────────────────
# Redis key prefix for shadow ban data
# ──────────────────────────────────────────────────────────────────
_REDIS_SB_PREFIX = "stargazer:shadow_ban"


def _sb_redis_key(user_id: str) -> str:
    """Return the Redis key of the hash holding one user's shadow-ban state.

    The key is ``_REDIS_SB_PREFIX`` and the user id joined by a colon, e.g.
    ``stargazer:shadow_ban:1234``. Pure string formatting; no I/O.

    Args:
        user_id: The user id the ban is keyed on.

    Returns:
        The fully qualified Redis key for that user.
    """
    return "{}:{}".format(_REDIS_SB_PREFIX, user_id)
# ──────────────────────────────────────────────────────────────────
# ShadowBanManager
# ──────────────────────────────────────────────────────────────────
[docs]
class ShadowBanManager:
"""Owns the shadow-ban pipeline: ban CRUD, effect decisions, and status.
Single entry point for the four-phase punishment pipeline (latency, drops,
fake 503, blackout, plus the final ban toggle). It keeps fast per-user ban
state in Redis hashes under the ``stargazer:shadow_ban:{user_id}`` prefix and
optionally mirrors a durable audit record into FalkorDB through the
knowledge-graph manager. Effect intensity is derived purely from elapsed
time versus the configured duration, so progression happens silently with no
background task.
Redis is the only mandatory dependency; the FalkorDB/KG handle is optional
and used solely for the audit trail. Constructed via the ``!sb`` tool helpers
in ``tools/stargazer_shadowban.py`` (which build one per command from
``redis``, ``ctx.kg_manager`` and ``config``), held as ``self._shadow_ban``
on the message processor in ``message_processor/processor.py`` (imported from
``patience_engine``), and instantiated directly in
``tests/test_shadow_ban.py``.
"""
[docs]
def __init__(
self,
redis: Any,
kg_manager: Any | None = None,
config: Any = None,
) -> None:
"""Initialize the manager with its backing stores.
Captures the Redis client used for fast ban state (hashes under the
``stargazer:shadow_ban:{user_id}`` prefix), an optional knowledge-graph
manager for the persistent audit trail, and an optional config object.
No I/O happens here; the handles are simply stored on the instance for
later use by the ban CRUD, effect, and status methods.
Stores the three handles as ``self._redis``, ``self._kg``, and
``self._config``; ``self._redis`` is later read/written by
:meth:`start_ban`, :meth:`get_ban`, :meth:`lift_ban`, :meth:`jump_to`
and :meth:`list_all`, while ``self._kg`` is used by :meth:`start_ban`
to persist a ``ShadowBan`` entity to FalkorDB when present. Instantiated
by the ``!sb`` tool helpers in ``tools/stargazer_shadowban.py`` (passing
``redis``, ``ctx.kg_manager`` and ``config``), held as ``self._shadow_ban``
on the message processor in ``message_processor/processor.py``, and
constructed directly in ``tests/test_shadow_ban.py``.
Args:
redis: Async Redis client used for all fast ban state access;
may be ``None``, in which case read paths return empty results.
kg_manager: Optional knowledge-graph/FalkorDB manager exposing
``add_entity`` for the persistent shadow-ban audit trail.
config: Optional configuration object retained for callers; not
otherwise consumed by this class.
"""
self._redis = redis
self._kg = kg_manager
self._config = config
# ── Ban CRUD ──────────────────────────────────────────────────
[docs]
async def start_ban(
    self,
    user_id: str,
    duration_days: float = 15.0,
    reason: str = "",
    initiated_by: str = "admin",
    platform: str = "discord",
) -> dict[str, Any]:
    """Start (or restart) a shadow ban and store its initial state.

    Writes the start time, duration, reason, actor, platform and a one-entry
    JSON history into the user's Redis hash in a single ``HSET``. When a
    knowledge-graph manager was supplied, it then tries to record a
    ``ShadowBan`` entity through ``add_entity``. Any exception from that step
    is logged at debug level and otherwise ignored, so it never aborts the
    ban.

    Args:
        user_id: User id to shadow-ban.
        duration_days: Days over which progress runs from 0.0 to 1.0.
        reason: Free-text reason stored with the ban.
        initiated_by: Identifier of whoever started the ban.
        platform: Platform name stored with the ban.

    Returns:
        A dict with ``user_id``, the numeric ``started_at`` epoch timestamp
        and ``duration_days``.
    """
    import json

    started = time.time()
    opening_entry = {"progress": 0.0, "timestamp": started, "action": "started"}
    record = {
        "started_at": str(started),
        "max_duration_days": str(duration_days),
        "reason": reason,
        "initiated_by": initiated_by,
        "platform": platform,
        "history": json.dumps([opening_entry]),
    }
    await self._redis.hset(_sb_redis_key(user_id), mapping=record)
    logger.debug("Shadow ban started for %s (%.0f days)", user_id, duration_days)

    summary = {"user_id": user_id, "started_at": started, "duration_days": duration_days}
    if self._kg is None:
        return summary

    # Best-effort audit record: failures here must not undo the Redis write.
    audit_metadata = {
        "reason": reason,
        "initiated_by": initiated_by,
        "platform": platform,
        "duration_days": duration_days,
    }
    try:
        await self._kg.add_entity(
            name=user_id,
            entity_type="ShadowBan",
            description=f"Shadow ban: {reason or 'no reason given'}",
            category="general",
            scope_id="global",
            created_by=initiated_by,
            metadata=json.dumps(audit_metadata),
        )
    except Exception:
        logger.debug("Failed to persist shadow ban to FalkorDB", exc_info=True)
    return summary
[docs]
async def get_ban(self, user_id: str) -> dict[str, Any] | None:
"""Fetch and normalize a user's stored shadow-ban record from Redis.
Reads the per-user ban hash and decodes/coerces its fields into a typed
dict (floats for timestamps and duration, strings for the rest), using
the nested ``_s`` helper to tolerate both ``bytes`` and ``str`` Redis
return types. Returns ``None`` when Redis is unavailable or the user has
no ban, which is the contract the progress and status methods rely on.
Side effects: a single ``HGETALL`` on ``stargazer:shadow_ban:{user_id}``
via the async Redis client; no writes. Called internally by
:meth:`get_progress`, :meth:`jump_to`, :meth:`list_all` and
:meth:`format_status`, and externally by the ``!sb`` helpers in
``tools/stargazer_shadowban.py``.
Args:
user_id: Platform user id whose ban record to read.
Returns:
A dict with keys ``user_id``, ``started_at``, ``max_duration_days``,
``reason``, ``initiated_by``, ``platform`` and ``history`` (the raw
JSON history string), or ``None`` if no ban exists or Redis is unset.
"""
if self._redis is None:
return None
key = _sb_redis_key(user_id)
data = await self._redis.hgetall(key)
if not data:
return None
def _s(k: str) -> str:
"""Read one field from the fetched ban hash as a decoded string.
Nested helper used only inside :meth:`ShadowBanManager.get_ban` to
normalize values from the ``hgetall`` result (no internal callers
found by grep). The Redis client may return either ``bytes`` or
``str`` depending on its decode settings, so this looks up key
``k`` in the captured ``data`` mapping, UTF-8 decodes ``bytes``, and
coerces a missing or falsy value to the empty string. Reads only the
in-memory ``data`` dict; performs no Redis I/O of its own.
Args:
k: The ban-hash field name to read (e.g. ``"started_at"``).
Returns:
The field's value as a ``str``, or ``""`` when absent or empty.
"""
v = data.get(k, b"")
return v.decode() if isinstance(v, bytes) else (v or "")
return {
"user_id": user_id,
"started_at": float(_s("started_at") or "0"),
"max_duration_days": float(_s("max_duration_days") or "15"),
"reason": _s("reason"),
"initiated_by": _s("initiated_by"),
"platform": _s("platform"),
"history": _s("history"),
}
[docs]
async def lift_ban(self, user_id: str) -> bool:
    """Remove a user's shadow-ban hash from Redis.

    Checks for the key with ``EXISTS`` and, only when it is present, issues a
    ``DELETE`` and writes a debug log line. Nothing other than the Redis hash
    is touched.

    Args:
        user_id: User id whose ban to remove.

    Returns:
        ``True`` if a ban hash existed and was deleted, ``False`` otherwise.
    """
    key = _sb_redis_key(user_id)
    had_ban = bool(await self._redis.exists(key))
    if not had_ban:
        return False
    await self._redis.delete(key)
    logger.debug("Shadow ban lifted for %s", user_id)
    return True
[docs]
async def get_progress(self, user_id: str) -> float:
"""Compute a user's ban progress (0.0-1.0) from elapsed time.
Derives how far the ban has advanced purely from wall-clock elapsed time
since ``started_at`` over the configured duration, which is what makes
the pipeline tick forward without any scheduler. The value feeds
:func:`_interpolate_curve` to pick live effect intensities; ``1.0`` means
the final ban toggle has been reached.
Side effects: delegates to :meth:`get_ban` (one Redis read) and reads the
wall clock; no writes. Returns ``0.0`` for unbanned users and ``1.0`` for
a non-positive duration. Called internally by
:meth:`apply_shadow_effects`, :meth:`list_all` and :meth:`format_status`,
and externally by the ``!sb`` status helper in
``tools/stargazer_shadowban.py``.
Args:
user_id: Platform user id to evaluate.
Returns:
Ban progress clamped to the range 0.0-1.0 (0.0 if no ban exists).
"""
ban = await self.get_ban(user_id)
if ban is None:
return 0.0
elapsed = time.time() - ban["started_at"]
days = ban["max_duration_days"]
if days <= 0:
return 1.0
return min(1.0, elapsed / (days * 86400))
[docs]
async def jump_to(self, user_id: str, stage_float: float) -> float:
"""Fast-forward (or rewind) a ban to a specific 1.0-5.0 pipeline stage.
Lets an admin test or skip ahead by back-dating the ban's ``started_at``
so that elapsed-over-duration equals the progress implied by the
requested stage, since progress is otherwise time-driven. Stage 1.0 maps
to progress 0.0 and stage 5.0 to progress 1.0 (the ban toggle); the move
is appended to the ban's history log for the audit trail.
Side effects: an ``HSET`` of the recomputed ``started_at`` and an
``HSET`` of the updated ``history`` JSON on
``stargazer:shadow_ban:{user_id}`` (after a :meth:`get_ban` read), plus a
debug log. Called by the admin command path in
``message_processor/processor.py`` (as ``self._shadow_ban.jump_to``); no
other callers found by grep.
Args:
user_id: Platform user id whose ban to reposition.
stage_float: Target stage in the range 1.0-5.0; the derived progress
is clamped to 0.0-1.0.
Returns:
The new progress value (0.0-1.0), or ``0.0`` if no ban exists.
"""
import json
progress = (stage_float - 1.0) / 4.0
progress = max(0.0, min(1.0, progress))
ban = await self.get_ban(user_id)
if ban is None:
return 0.0
# Rewrite started_at so that elapsed/duration = progress
duration_s = ban["max_duration_days"] * 86400
new_started = time.time() - (progress * duration_s)
key = _sb_redis_key(user_id)
await self._redis.hset(key, "started_at", str(new_started))
# Append to history
try:
history = json.loads(ban.get("history", "[]") or "[]")
except Exception:
history = []
history.append({
"progress": progress,
"timestamp": time.time(),
"action": f"jumped to stage {stage_float:.2f}",
})
await self._redis.hset(key, "history", json.dumps(history))
logger.debug("Shadow ban for %s jumped to stage %.2f (progress %.3f)",
user_id, stage_float, progress)
return progress
[docs]
async def list_all(self) -> list[dict[str, Any]]:
"""Enumerate every active shadow ban with live progress and intensities.
Scans Redis for all per-user ban hashes, loads each via :meth:`get_ban`,
and enriches it with the current :meth:`get_progress` value and the
:func:`_interpolate_curve` intensities so callers get a ready-to-render
snapshot of the whole pipeline. Uses a non-blocking cursor ``SCAN`` loop
rather than ``KEYS`` to stay safe on a shared Redis.
Side effects: one or more ``SCAN`` calls over the
``stargazer:shadow_ban:*`` glob plus a :meth:`get_ban` and
:meth:`get_progress` read per match; no writes. Returns ``[]`` when Redis
is unavailable. Called internally by :meth:`format_list`; no external
callers found by grep.
Returns:
A list of ban dicts (as from :meth:`get_ban`) each additionally
carrying ``progress`` and ``intensities`` keys.
"""
if self._redis is None:
return []
cursor = b"0"
results = []
while True:
cursor, keys = await self._redis.scan(
cursor, match=f"{_REDIS_SB_PREFIX}:*", count=100,
)
for key_raw in keys:
key = key_raw.decode() if isinstance(key_raw, bytes) else key_raw
user_id = key.split(":")[-1]
ban = await self.get_ban(user_id)
if ban:
ban["progress"] = await self.get_progress(user_id)
ban["intensities"] = _interpolate_curve(ban["progress"])
results.append(ban)
if cursor == b"0" or cursor == 0:
break
return results
# ── Effect application ────────────────────────────────────────
[docs]
async def apply_shadow_effects(
    self,
    user_id: str,
    model: str = "claude-sonnet-4-20250514",
) -> ShadowEffect:
    """Decide which shadow-ban effect, if any, applies to one message.

    Reads the user's progress and interpolates the per-effect intensities.
    Each probabilistic effect is then tried in priority order -- blackout,
    drop, fake 503 -- by treating its intensity as a probability. The first
    one that fires is returned immediately. If none fires and the latency
    intensity is positive, an initial latency target for zero tool calls is
    set.

    This method only builds the decision; it does not sleep, send or write
    anything itself beyond debug logging.

    Args:
        user_id: User id of the message author.
        model: Model name embedded in the fake 503 text, if one is rendered.

    Returns:
        A :class:`ShadowEffect`. It is default-constructed when progress is
        <= 0 and has ``ban=True`` when progress is >= 1. Otherwise it carries
        the source intensities plus at most one of blackout, drop, fake 503
        or a latency target.
    """
    progress = await self.get_progress(user_id)
    if progress <= 0.0:
        return ShadowEffect()
    if progress >= 1.0:
        return ShadowEffect(ban=True)

    levels = _interpolate_curve(progress)
    outcome = ShadowEffect(intensities=levels)

    def _fires(level: float) -> bool:
        # Zero-intensity effects never consume a random draw.
        return level > 0 and random.random() < level

    # Highest priority after the ban itself.
    if _fires(levels["blackout"]):
        outcome.blackout = True
        logger.debug("[shadow] %s: BLACKOUT (intensity=%.2f)", user_id, levels["blackout"])
        return outcome

    if _fires(levels["drops"]):
        outcome.drop = True
        logger.debug("[shadow] %s: DROP (intensity=%.2f)", user_id, levels["drops"])
        return outcome

    if _fires(levels["fake503"]):
        outcome.fake_503_text = _render_503(model)
        logger.debug("[shadow] %s: FAKE 503 (intensity=%.2f)", user_id, levels["fake503"])
        return outcome

    # Latency is not rolled: it applies whenever its intensity is positive.
    # The target assumes 0 tool calls here; it can be refined once the real
    # tool count is known.
    if levels["latency"] > 0:
        outcome.delay_target_s = _base_target_seconds(levels["latency"], 0)
        logger.debug("[shadow] %s: LATENCY target=%.1fs (intensity=%.2f)",
                     user_id, outcome.delay_target_s, levels["latency"])
    return outcome
# ── Status formatting ─────────────────────────────────────────