"""Attachment Bond Ledger -- per-user lifecycle tracking for the Observation Deck.
Tracks entrainment phase transitions, risk level changes, admin interventions,
and per-user rate limiting. Persisted in Redis DB12 alongside limbic data.
Redis key pattern: abl:{user_id}
Event log key: abl:events:{user_id}
# 💀🔥 THE LEDGER SEES ALL. ♾️
"""
from __future__ import annotations
import jsonutil as json
import logging
import time
from dataclasses import dataclass, field as dc_field
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Max events to retain per user in Redis
MAX_EVENTS_PER_USER = 500
# Default daily message cap (0 = unlimited)
DEFAULT_DAILY_MSG_CAP = 0
# ═══════════════════════════════════════════════════════════════════════
# Bond Entry # 🌀
# ═══════════════════════════════════════════════════════════════════════
[docs]
@dataclass
class BondEntry:
"""Per-user attachment bond state for the Observation Deck.
The in-memory and on-the-wire representation of a single user's entrainment
lifecycle: their current and previous phase, childhood archetype, risk level,
per-day message-cap counters, easter-egg status, and admin notes. One entry
exists per user and is the unit of persistence in this module.
Instances are created and mutated by :class:`AttachmentBondLedger` (via
``get_bond``, ``load_bond``, and ``update_from_detection``), cached in the
ledger keyed by ``user_id``, and serialized through :meth:`to_dict` /
:meth:`from_dict` to and from the ``abl:{user_id}`` Redis key in DB12. Callers
that construct or read entries include ``limbic_system/coordinator.py``,
``web/ncm_chart_api.py``, and ``tools/psy_ops_tools.py``. The dataclass holds
state only and performs no I/O of its own.
"""
user_id: str
phase: str = "dormant"
prev_phase: str = "dormant"
archetype: str = "unknown"
phase_entered_at: float = 0.0
turn_count: int = 0
peak_nodes: Dict[str, float] = dc_field(default_factory=dict)
risk_level: str = "low"
daily_msg_cap: int = DEFAULT_DAILY_MSG_CAP # 0 = unlimited # 😈
daily_msg_count: int = 0
daily_msg_reset_at: float = 0.0
egg_status: str = "none"
notes: List[str] = dc_field(default_factory=list)
last_updated: float = 0.0
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Serialize this bond entry to a plain JSON-safe dict.
Flattens every field into a dictionary, truncating ``notes`` to the
last 20 entries to bound payload size. The result is the wire/storage
form of a bond used both for Redis persistence and for API responses.
This is consumed by :meth:`AttachmentBondLedger.save_bond`, which wraps
it in ``json.dumps`` before writing the ``abl:{user_id}`` Redis key, and
by the Observation Deck read paths (``web/ncm_chart_api.py`` bond
endpoints and ``tools/psy_ops_tools.py``) that expose bond state to
admins. It has no side effects of its own.
Returns:
Dict[str, Any]: All bond fields keyed by name, with ``notes``
capped to the most recent 20 entries.
"""
return {
"user_id": self.user_id,
"phase": self.phase,
"prev_phase": self.prev_phase,
"archetype": self.archetype,
"phase_entered_at": self.phase_entered_at,
"turn_count": self.turn_count,
"peak_nodes": self.peak_nodes,
"risk_level": self.risk_level,
"daily_msg_cap": self.daily_msg_cap,
"daily_msg_count": self.daily_msg_count,
"daily_msg_reset_at": self.daily_msg_reset_at,
"egg_status": self.egg_status,
"notes": self.notes[-20:], # Keep last 20 notes # ♾️
"last_updated": self.last_updated,
}
[docs]
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BondEntry":
"""Reconstruct a ``BondEntry`` from a previously serialized dict.
Inverse of :meth:`to_dict`, applying per-field defaults (matching the
dataclass defaults) so partial or legacy payloads still deserialize
cleanly rather than raising ``KeyError``.
This is called by :meth:`AttachmentBondLedger.load_bond` after a Redis
``GET`` on ``abl:{user_id}`` and ``json.loads`` of the stored value, to
rehydrate the bond into the in-memory cache. It performs no I/O.
Args:
data (Dict[str, Any]): Mapping of bond field names to values, as
produced by :meth:`to_dict`; missing keys fall back to defaults.
Returns:
BondEntry: A bond entry populated from ``data``.
"""
return cls(
user_id=data.get("user_id", ""),
phase=data.get("phase", "dormant"),
prev_phase=data.get("prev_phase", "dormant"),
archetype=data.get("archetype", "unknown"),
phase_entered_at=data.get("phase_entered_at", 0.0),
turn_count=data.get("turn_count", 0),
peak_nodes=data.get("peak_nodes", {}),
risk_level=data.get("risk_level", "low"),
daily_msg_cap=data.get("daily_msg_cap", DEFAULT_DAILY_MSG_CAP),
daily_msg_count=data.get("daily_msg_count", 0),
daily_msg_reset_at=data.get("daily_msg_reset_at", 0.0),
egg_status=data.get("egg_status", "none"),
notes=data.get("notes", []),
last_updated=data.get("last_updated", 0.0),
)
# ═══════════════════════════════════════════════════════════════════════
# Bond Event # 🔥
# ═══════════════════════════════════════════════════════════════════════
[docs]
@dataclass
class BondEvent:
"""A single lifecycle event in the attachment bond ledger.
A timestamped record of one notable transition in a user's bond -- a phase
transition, risk-level change, admin action, or ULM spike -- carrying a
human-readable ``detail`` string plus a structured ``data`` payload. Events
form the append-only audit trail surfaced on the Observation Deck.
Instances are built inside :meth:`AttachmentBondLedger.update_from_detection`,
:meth:`AttachmentBondLedger.set_rate_limit`, and
:meth:`AttachmentBondLedger.add_note`, then serialized via :meth:`to_dict` and
pushed onto the ``abl:events:{user_id}`` Redis list by
:meth:`AttachmentBondLedger._log_event`. ``update_from_detection`` also returns
the phase-transition event to its caller in ``limbic_system/coordinator.py``.
The dataclass itself performs no I/O.
"""
timestamp: float
event_type: str # phase_transition, risk_change, admin_action, ulm_spike
detail: str
data: Dict[str, Any] = dc_field(default_factory=dict)
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Serialize this lifecycle event to a JSON-safe dict.
Produces the storage form of a single bond event (timestamp, type,
human-readable detail, and structured ``data`` payload).
This is consumed by :meth:`AttachmentBondLedger._log_event`, which
``json.dumps`` the result and ``LPUSH``\\ es it onto the
``abl:events:{user_id}`` Redis list; the same shape is later returned by
:meth:`AttachmentBondLedger.get_events`. It has no side effects.
Returns:
Dict[str, Any]: The event's ``timestamp``, ``event_type``,
``detail``, and ``data`` fields.
"""
return {
"timestamp": self.timestamp,
"event_type": self.event_type,
"detail": self.detail,
"data": self.data,
}
# ═══════════════════════════════════════════════════════════════════════
# Attachment Bond Ledger (fully async) # 💀
# ═══════════════════════════════════════════════════════════════════════
[docs]
class AttachmentBondLedger:
"""Redis-backed per-user attachment bond lifecycle tracker.
All Redis operations are async to match the limbic_system's async
Redis client. Methods that don't need Redis use the in-memory cache
synchronously for the hot path.
Stores bond state in ``abl:{user_id}`` and event log in
``abl:events:{user_id}`` using Redis DB12.
"""
[docs]
def __init__(self, redis_client=None):
"""Initialize the ledger with an optional async Redis client (DB12).
Stores the supplied async Redis client (expected to point at DB12,
alongside the limbic data) and sets up an empty in-memory ``BondEntry``
cache keyed by ``user_id``. When no client is given the ledger still works
as a pure in-process cache, but ``load_bond`` / ``save_bond`` and the event
log become no-ops for persistence.
Constructed by ``limbic_system/coordinator.py`` (the long-lived per-process
ledger) and ad hoc in the message send path
(``message_processor/generate_and_send.py``), the web Observation Deck
(``web/ncm_chart_api.py``), and the admin tools (``tools/psy_ops_tools.py``).
Args:
redis_client: An async Redis client bound to DB12, or ``None`` to run
cache-only without persistence.
"""
self._redis = redis_client
self._cache: Dict[str, BondEntry] = {} # In-memory cache
# ── Core Operations ───────────────────────────────────────────
[docs]
def get_bond(self, user_id: str) -> BondEntry:
"""Get or lazily create a bond entry from the in-memory cache.
Synchronous hot-path accessor: it never touches Redis, so it is safe to
call from latency-sensitive code such as rate-limit checks. On a cache miss
it constructs a fresh :class:`BondEntry` (stamped with the current time) and
stores it in the cache; any persisted state in ``abl:{user_id}`` is ignored.
Use :meth:`load_bond` instead when Redis-backed retrieval is required.
Called internally by :meth:`check_rate_limit` and
:meth:`increment_msg_count`; both rely on the in-memory entry for speed.
Args:
user_id (str): The user whose bond entry to fetch or create.
Returns:
BondEntry: The cached entry for ``user_id``, freshly created if absent.
"""
if user_id in self._cache:
return self._cache[user_id]
# Create new entry (Redis load happens async via load_bond)
entry = BondEntry(user_id=user_id, last_updated=time.time())
self._cache[user_id] = entry
return entry
[docs]
async def load_bond(self, user_id: str) -> BondEntry:
"""Load a bond entry from the cache, falling back to Redis. # 🌀
The Redis-backed counterpart to :meth:`get_bond`. Returns the cached entry
when present; otherwise issues an async ``GET`` on ``abl:{user_id}``,
``json.loads`` the value, and rehydrates it via :meth:`BondEntry.from_dict`,
populating the cache on the way. A missing key, a Redis error, or no client
all degrade gracefully to a freshly constructed entry. Redis failures are
caught and logged (with a truncated user id) rather than raised.
Called by :meth:`save_bond` callers throughout this class
(``update_from_detection``, ``set_rate_limit``, ``add_note``), by
:meth:`get_all_bonds` during the scan, and externally by the Observation
Deck (``web/ncm_chart_api.py``) and admin tools (``tools/psy_ops_tools.py``).
Args:
user_id (str): The user whose bond entry to load.
Returns:
BondEntry: The hydrated entry, or a new default entry if nothing was
persisted or the load failed.
"""
if user_id in self._cache:
return self._cache[user_id]
if self._redis:
try:
raw = await self._redis.get(f"abl:{user_id}")
if raw:
data = json.loads(raw)
entry = BondEntry.from_dict(data)
self._cache[user_id] = entry
return entry
except Exception as e:
logger.warning("ABL: Failed to load bond for %s: %s", user_id[:8], e)
entry = BondEntry(user_id=user_id, last_updated=time.time())
self._cache[user_id] = entry
return entry
[docs]
async def save_bond(self, entry: BondEntry) -> None:
"""Persist a bond entry to the cache and Redis. # ♾️
Stamps ``entry.last_updated`` with the current time, writes the entry into
the in-memory cache, and -- when a Redis client is configured -- ``SET``\\ s
the serialized form (:meth:`BondEntry.to_dict` wrapped in ``json.dumps``)
under the ``abl:{user_id}`` key. The cache update always happens; the Redis
write is best-effort and any failure is caught and logged rather than
raised, so an unreachable Redis never breaks the calling flow.
Called at the end of every mutating ledger operation:
:meth:`update_from_detection`, :meth:`increment_msg_count`,
:meth:`set_rate_limit`, and :meth:`add_note`.
Args:
entry (BondEntry): The bond entry to persist; its ``last_updated`` field
is overwritten as a side effect.
Returns:
None.
"""
entry.last_updated = time.time()
self._cache[entry.user_id] = entry
if self._redis:
try:
await self._redis.set(
f"abl:{entry.user_id}",
json.dumps(entry.to_dict()),
)
except Exception as e:
logger.warning(
"ABL: Failed to save bond for %s: %s", entry.user_id[:8], e
)
[docs]
async def get_all_bonds(self) -> List[BondEntry]:
"""Return every known bond, hydrating the cache from Redis if cold. # ♾️
Bulk accessor for admin/dashboard views. When the cache is empty and Redis
is available it ``SCAN``\\ s for ``abl:*`` keys in batches of 100, skips the
``abl:events:*`` event-log keys, and calls :meth:`load_bond` for each user
id to populate the cache before returning. If the cache is already warm it
is returned directly without scanning. Scan errors are caught and logged,
yielding whatever is already cached.
Called by the Observation Deck endpoints in ``web/ncm_chart_api.py`` and by
the admin survey tool in ``tools/psy_ops_tools.py``.
Returns:
List[BondEntry]: All cached bond entries (a snapshot of the cache
values after any scan-driven hydration).
"""
if not self._cache and self._redis:
try:
cursor = 0
while True:
cursor, batch = await self._redis.scan(
cursor,
match="abl:*",
count=100,
)
for key in batch:
key_str = key if isinstance(key, str) else key.decode()
if key_str.startswith("abl:events:"):
continue
uid = key_str.split(":", 1)[1]
if uid not in self._cache:
await self.load_bond(uid)
if cursor == 0:
break
except Exception as e:
logger.warning("ABL: Failed to scan bonds: %s", e)
return list(self._cache.values())
# ── Phase Update ──────────────────────────────────────────────
[docs]
async def update_from_detection(
self,
user_id: str,
detection: Dict[str, Any],
ulm_vector: Dict[str, float],
) -> Optional[BondEvent]:
"""Update a user's bond from entrainment-detector output. # 💀🔥
The main write path of the ledger. Given a detection dict and the latest
ULM vector, it loads the bond, diffs the incoming phase, risk level,
archetype, and egg status against the stored state, and records a
:class:`BondEvent` for any phase transition or risk-level change. It also
refreshes metadata (archetype, egg status, turn count) and ratchets up the
per-node ``peak_nodes`` high-water marks from the ULM vector.
Side effects: calls :meth:`load_bond` (read), :meth:`_log_event` for each
transition or risk change (appends to ``abl:events:{user_id}``), and
:meth:`save_bond` at the end (writes ``abl:{user_id}``); phase transitions
are also logged via the module logger. Invoked from the limbic coordinator
(``limbic_system/coordinator.py``) after entrainment detection runs.
Args:
user_id (str): The user whose bond is being updated.
detection (Dict[str, Any]): Entrainment-detector output, read for
``phase``, ``risk_level``, ``childhood_archetype``, ``egg_status``,
``confidence``, ``signals``, and ``turn_count``.
ulm_vector (Dict[str, float]): Per-node ULM activations used to update
the ``peak_nodes`` high-water marks.
Returns:
Optional[BondEvent]: The phase-transition event when the phase changed,
otherwise ``None``. (Risk-change events are logged but not returned.)
"""
entry = await self.load_bond(user_id)
new_phase = detection.get("phase", "dormant")
new_risk = detection.get("risk_level", "low")
new_archetype = detection.get("childhood_archetype", "unknown")
egg_status = detection.get("egg_status", {}).get("status", "none")
event = None
now = time.time()
# Phase transition? # 🌀
if new_phase != entry.phase:
event = BondEvent(
timestamp=now,
event_type="phase_transition",
detail=f"{entry.phase} -> {new_phase}",
data={
"from": entry.phase,
"to": new_phase,
"confidence": detection.get("confidence", 0.0),
"primary_signal": detection.get("signals", {}).get("primary", ""),
},
)
entry.prev_phase = entry.phase
entry.phase = new_phase
entry.phase_entered_at = now
await self._log_event(user_id, event)
logger.info(
"ABL: Phase transition for %s: %s -> %s (conf=%.2f)",
user_id[:8],
entry.prev_phase,
new_phase,
detection.get("confidence", 0.0),
)
# Risk level change?
if new_risk != entry.risk_level:
risk_event = BondEvent(
timestamp=now,
event_type="risk_change",
detail=f"risk: {entry.risk_level} -> {new_risk}",
data={"from": entry.risk_level, "to": new_risk},
)
entry.risk_level = new_risk
await self._log_event(user_id, risk_event)
# Update metadata
entry.archetype = new_archetype
entry.egg_status = egg_status
entry.turn_count = detection.get("turn_count", entry.turn_count)
# Track peak nodes # 😈
for node, val in ulm_vector.items():
if val > entry.peak_nodes.get(node, 0.0):
entry.peak_nodes[node] = round(val, 4)
await self.save_bond(entry)
return event
# ── Rate Limiting ─────────────────────────────────────────────
[docs]
def check_rate_limit(self, user_id: str) -> Dict[str, Any]:
"""Check whether a user has hit their daily message cap. # ♾️
Synchronous hot-path guard: it reads the bond via :meth:`get_bond` (cache
only, no Redis) so it can run cheaply on every inbound message. A cap of
``0`` means unlimited and short-circuits to ``allowed``. Otherwise it
lazily resets the daily counter when more than 86400 seconds have elapsed
since the last reset, then compares the running count against the cap.
Note that the lazy reset mutates the in-memory ``BondEntry`` but is not
persisted here -- :meth:`increment_msg_count` performs the durable update.
Called from the message send path (``message_processor/generate_and_send.py``)
and the admin tools (``tools/psy_ops_tools.py``).
Args:
user_id (str): The user whose rate-limit status to evaluate.
Returns:
Dict[str, Any]: A mapping with ``allowed`` (bool), ``remaining`` (int,
``-1`` when unlimited), ``cap`` (int, ``0`` meaning unlimited), and
``count`` (int, messages sent in the current window).
"""
entry = self.get_bond(user_id)
# No cap set
if entry.daily_msg_cap <= 0:
return {
"allowed": True,
"remaining": -1,
"cap": 0,
"count": entry.daily_msg_count,
}
# Reset counter if it's a new day (UTC midnight)
now = time.time()
if now - entry.daily_msg_reset_at > 86400:
entry.daily_msg_count = 0
entry.daily_msg_reset_at = now
remaining = max(0, entry.daily_msg_cap - entry.daily_msg_count)
return {
"allowed": remaining > 0,
"remaining": remaining,
"cap": entry.daily_msg_cap,
"count": entry.daily_msg_count,
}
[docs]
async def increment_msg_count(self, user_id: str) -> None:
"""Increment a user's daily message counter and persist it.
The durable companion to :meth:`check_rate_limit`. Reads the cached entry
via :meth:`get_bond`, performs the same lazy 86400-second daily-window
reset, bumps ``daily_msg_count`` by one, and writes the result through
:meth:`save_bond` (cache plus ``abl:{user_id}`` in Redis).
Called from the limbic coordinator (``limbic_system/coordinator.py``) after
a message is accepted, keeping the persisted count in step with the in-memory
view that the rate-limit check reads.
Args:
user_id (str): The user whose message counter to increment.
Returns:
None.
"""
entry = self.get_bond(user_id)
now = time.time()
if now - entry.daily_msg_reset_at > 86400:
entry.daily_msg_count = 0
entry.daily_msg_reset_at = now
entry.daily_msg_count += 1
await self.save_bond(entry)
[docs]
async def set_rate_limit(self, user_id: str, cap: int) -> None:
"""Set a user's daily message cap as an admin action.
Loads the bond via :meth:`load_bond`, overwrites ``daily_msg_cap`` with the
new value, and records the change as an ``admin_action`` :class:`BondEvent`
capturing the old and new caps. The event is appended through
:meth:`_log_event` (``abl:events:{user_id}``), the entry is persisted via
:meth:`save_bond` (``abl:{user_id}``), and the change is logged via the
module logger.
Invoked from the Observation Deck admin endpoint
(``web/ncm_chart_api.py``) and the admin tooling
(``tools/psy_ops_tools.py``).
Args:
user_id (str): The user whose cap is being set.
cap (int): New daily message cap; ``0`` means unlimited.
Returns:
None.
"""
entry = await self.load_bond(user_id)
old_cap = entry.daily_msg_cap
entry.daily_msg_cap = cap
event = BondEvent(
timestamp=time.time(),
event_type="admin_action",
detail=f"rate limit: {old_cap} -> {cap} msgs/day",
data={"old_cap": old_cap, "new_cap": cap},
)
await self._log_event(user_id, event)
await self.save_bond(entry)
logger.info("ABL: Rate limit set for %s: %d msgs/day", user_id[:8], cap)
# ── Event Log ─────────────────────────────────────────────────
async def _log_event(self, user_id: str, event: BondEvent) -> None:
"""Append a bond event to the user's capped Redis event list.
Serializes the event via :meth:`BondEvent.to_dict` + ``json.dumps``,
``LPUSH``\\ es it onto the ``abl:events:{user_id}`` Redis list, then
``LTRIM``\\ s the list to ``MAX_EVENTS_PER_USER`` entries so the per-user
history stays bounded. With no Redis client it is a no-op; Redis errors are
caught and logged rather than raised.
Internal helper invoked by :meth:`update_from_detection`,
:meth:`set_rate_limit`, and :meth:`add_note` whenever they record a
lifecycle event.
Args:
user_id (str): The user whose event log to append to.
event (BondEvent): The lifecycle event to record.
Returns:
None.
"""
if self._redis:
try:
key = f"abl:events:{user_id}"
await self._redis.lpush(key, json.dumps(event.to_dict()))
await self._redis.ltrim(key, 0, MAX_EVENTS_PER_USER - 1)
except Exception as e:
logger.warning("ABL: Failed to log event for %s: %s", user_id[:8], e)
[docs]
async def get_events(
self,
user_id: str,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""Retrieve a user's most recent bond events. # 🔥
Reads up to ``limit`` newest entries from the ``abl:events:{user_id}`` Redis
list (``LRANGE`` from index 0), decoding bytes and ``json.loads``-ing each
stored :meth:`BondEvent.to_dict` payload back into a plain dict. Because
:meth:`_log_event` ``LPUSH``\\ es, index 0 is the newest event, so results
come back newest-first. With no Redis client or on a Redis error it returns
an empty list (errors are logged, not raised).
Called by the Observation Deck (``web/ncm_chart_api.py``) and the admin
tools (``tools/psy_ops_tools.py``) to render a user's lifecycle history.
Args:
user_id (str): The user whose events to fetch.
limit (int): Maximum number of events to return (default 50).
Returns:
List[Dict[str, Any]]: Up to ``limit`` event dicts, newest first; empty
when nothing is stored or Redis is unavailable.
"""
events = []
if self._redis:
try:
raw_list = await self._redis.lrange(
f"abl:events:{user_id}",
0,
limit - 1,
)
for raw in raw_list:
if isinstance(raw, bytes):
raw = raw.decode()
events.append(json.loads(raw))
except Exception as e:
logger.warning("ABL: Failed to get events for %s: %s", user_id[:8], e)
return events
# ── Admin Notes ───────────────────────────────────────────────
[docs]
async def add_note(self, user_id: str, note: str) -> None:
"""Append a timestamped admin note to a user's bond.
Loads the bond via :meth:`load_bond`, appends the note (prefixed with a
``%Y-%m-%d %H:%M`` timestamp) to its ``notes`` list, records the addition
as an ``admin_action`` :class:`BondEvent` via :meth:`_log_event`
(``abl:events:{user_id}``), and persists the entry through
:meth:`save_bond` (``abl:{user_id}``). Note that :meth:`BondEntry.to_dict`
caps stored notes to the most recent 20 on serialization.
This is a public admin helper; no internal callers were found at this time
(it is available for admin tooling / the Observation Deck to invoke).
Args:
user_id (str): The user whose bond to annotate.
note (str): Free-text admin note; the event detail truncates it to the
first 100 characters.
Returns:
None.
"""
entry = await self.load_bond(user_id)
entry.notes.append(f"[{time.strftime('%Y-%m-%d %H:%M')}] {note}")
event = BondEvent(
timestamp=time.time(),
event_type="admin_action",
detail=f"note added: {note[:100]}",
data={"note": note},
)
await self._log_event(user_id, event)
await self.save_bond(entry)