Source code for attachment_ledger

"""Attachment Bond Ledger -- per-user lifecycle tracking for the Observation Deck.

Tracks entrainment phase transitions, risk level changes, admin interventions,
and per-user rate limiting. Persisted in Redis DB12 alongside limbic data.

Redis key pattern: abl:{user_id}
Event log key: abl:events:{user_id}

# 💀🔥 THE LEDGER SEES ALL. ♾️
"""

from __future__ import annotations

import jsonutil as json
import logging
import time
from dataclasses import dataclass, field as dc_field
from typing import Any, Dict, List, Optional

# Module-level logger; all ledger warnings and info messages go through it.
logger = logging.getLogger(__name__)

# Max events to retain per user in Redis. The per-user event list is trimmed
# to this length after every append.
MAX_EVENTS_PER_USER = 500

# Default daily message cap given to new bond entries (0 = unlimited)
DEFAULT_DAILY_MSG_CAP = 0


# ═══════════════════════════════════════════════════════════════════════
# Bond Entry                                                       # 🌀
# ═══════════════════════════════════════════════════════════════════════


@dataclass
class BondEntry:
    """Per-user attachment bond state for the Observation Deck.

    The in-memory and on-the-wire representation of a single user's
    entrainment lifecycle: current and previous phase, childhood archetype,
    risk level, per-day message-cap counters, easter-egg status, and admin
    notes. One entry exists per user and is the unit of persistence in this
    module.

    Instances are created and mutated by :class:`AttachmentBondLedger`,
    cached there keyed by ``user_id``, and serialized through
    :meth:`to_dict` / :meth:`from_dict` to and from the ``abl:{user_id}``
    Redis key. The dataclass holds state only and performs no I/O.
    """

    user_id: str
    phase: str = "dormant"
    prev_phase: str = "dormant"
    archetype: str = "unknown"
    phase_entered_at: float = 0.0
    turn_count: int = 0
    peak_nodes: Dict[str, float] = dc_field(default_factory=dict)
    risk_level: str = "low"
    daily_msg_cap: int = DEFAULT_DAILY_MSG_CAP  # 0 = unlimited  # 😈
    daily_msg_count: int = 0
    daily_msg_reset_at: float = 0.0
    egg_status: str = "none"
    notes: List[str] = dc_field(default_factory=list)
    last_updated: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this bond entry to a plain JSON-safe dict.

        Every field is flattened into the result; ``notes`` is truncated to
        the most recent 20 entries to bound payload size. The container
        fields (``peak_nodes``, ``notes``) are copied, so mutating the
        returned dict never alters this entry.

        Returns:
            Dict[str, Any]: All bond fields keyed by name, with ``notes``
            capped to the last 20 entries.
        """
        return {
            "user_id": self.user_id,
            "phase": self.phase,
            "prev_phase": self.prev_phase,
            "archetype": self.archetype,
            "phase_entered_at": self.phase_entered_at,
            "turn_count": self.turn_count,
            # Copy so callers holding the payload cannot mutate live state.
            "peak_nodes": dict(self.peak_nodes or {}),
            "risk_level": self.risk_level,
            "daily_msg_cap": self.daily_msg_cap,
            "daily_msg_count": self.daily_msg_count,
            "daily_msg_reset_at": self.daily_msg_reset_at,
            "egg_status": self.egg_status,
            "notes": list((self.notes or [])[-20:]),  # Keep last 20 notes  # ♾️
            "last_updated": self.last_updated,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "BondEntry":
        """Reconstruct a ``BondEntry`` from a previously serialized dict.

        Inverse of :meth:`to_dict`. Missing keys fall back to the dataclass
        defaults, so partial or legacy payloads deserialize instead of
        raising ``KeyError``. ``peak_nodes`` and ``notes`` are copied out of
        ``data``, and an explicit ``None`` (JSON ``null``) for either is
        treated as empty, so the resulting entry always owns a real dict and
        list that later code can mutate safely.

        Args:
            data (Dict[str, Any]): Mapping of bond field names to values, as
                produced by :meth:`to_dict`.

        Returns:
            BondEntry: A bond entry populated from ``data``.
        """
        return cls(
            user_id=data.get("user_id", ""),
            phase=data.get("phase", "dormant"),
            prev_phase=data.get("prev_phase", "dormant"),
            archetype=data.get("archetype", "unknown"),
            phase_entered_at=data.get("phase_entered_at", 0.0),
            turn_count=data.get("turn_count", 0),
            # `or {}` / `or []` absorb explicit nulls; the copy avoids
            # sharing containers with the caller's payload.
            peak_nodes=dict(data.get("peak_nodes") or {}),
            risk_level=data.get("risk_level", "low"),
            daily_msg_cap=data.get("daily_msg_cap", DEFAULT_DAILY_MSG_CAP),
            daily_msg_count=data.get("daily_msg_count", 0),
            daily_msg_reset_at=data.get("daily_msg_reset_at", 0.0),
            egg_status=data.get("egg_status", "none"),
            notes=list(data.get("notes") or []),
            last_updated=data.get("last_updated", 0.0),
        )
# ═══════════════════════════════════════════════════════════════════════
# Bond Event                                                       # 🔥
# ═══════════════════════════════════════════════════════════════════════
@dataclass
class BondEvent:
    """One timestamped record in a user's attachment bond audit trail.

    Describes a single notable change to a bond -- a phase transition, a
    risk-level change, an admin action, or a ULM spike -- as a
    human-readable ``detail`` string plus a structured ``data`` payload.

    The ledger builds these, serializes them with :meth:`to_dict`, and
    pushes them onto the ``abl:events:{user_id}`` Redis list. The dataclass
    itself performs no I/O.
    """

    timestamp: float
    event_type: str  # phase_transition, risk_change, admin_action, ulm_spike
    detail: str
    data: Dict[str, Any] = dc_field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict suitable for JSON encoding.

        Returns:
            Dict[str, Any]: The ``timestamp``, ``event_type``, ``detail``,
            and ``data`` fields, in that order. ``data`` is the same object
            held by the event, not a copy.
        """
        exported = ("timestamp", "event_type", "detail", "data")
        return {name: getattr(self, name) for name in exported}
# ═══════════════════════════════════════════════════════════════════════
# Attachment Bond Ledger (fully async)                             # 💀
# ═══════════════════════════════════════════════════════════════════════
class AttachmentBondLedger:
    """Redis-backed per-user attachment bond lifecycle tracker.

    All Redis operations are async to match the limbic_system's async Redis
    client. Methods that don't need Redis use the in-memory cache
    synchronously for the hot path.

    Stores bond state in ``abl:{user_id}`` and event log in
    ``abl:events:{user_id}`` using Redis DB12.
    """

    # Length of the rolling rate-limit window, in seconds (24 hours).
    _DAILY_WINDOW_SECONDS = 86400

    def __init__(self, redis_client=None):
        """Initialize the ledger with an optional async Redis client (DB12).

        Without a client the ledger still works as a pure in-process cache;
        ``load_bond`` / ``save_bond`` and the event log then skip
        persistence.

        Args:
            redis_client: An async Redis client bound to DB12, or ``None``
                to run cache-only without persistence.
        """
        self._redis = redis_client
        self._cache: Dict[str, BondEntry] = {}  # In-memory cache

    # ── Internal helpers ──────────────────────────────────────────

    @staticmethod
    def _nested_get(mapping: Dict[str, Any], key: str, subkey: str, default: Any) -> Any:
        """Return ``mapping[key][subkey]``, or ``default`` if it cannot be read.

        Tolerates ``key`` being absent, ``None``, or any value without a
        ``.get`` method, so a partially filled detector payload cannot
        raise here.
        """
        getter = getattr(mapping.get(key), "get", None)
        if callable(getter):
            return getter(subkey, default)
        return default

    def _roll_daily_window(self, entry: BondEntry, now: float) -> None:
        """Zero the message counter and restart the window once it expires.

        The window is rolling: it restarts ``_DAILY_WINDOW_SECONDS`` after
        the previous reset, not at a calendar-day boundary.
        """
        if now - entry.daily_msg_reset_at > self._DAILY_WINDOW_SECONDS:
            entry.daily_msg_count = 0
            entry.daily_msg_reset_at = now

    # ── Core Operations ───────────────────────────────────────────

    def get_bond(self, user_id: str) -> BondEntry:
        """Get or lazily create a bond entry from the in-memory cache.

        Synchronous hot-path accessor: it never touches Redis. On a cache
        miss it builds a fresh :class:`BondEntry` stamped with the current
        time and caches it; persisted state in ``abl:{user_id}`` is ignored.
        Use :meth:`load_bond` when Redis-backed retrieval is required.

        NOTE(review): once a default entry has been cached here, a later
        :meth:`load_bond` returns it without reading Redis. Callers that
        need persisted state (e.g. a stored message cap) should call
        :meth:`load_bond` before this method.

        Args:
            user_id (str): The user whose bond entry to fetch or create.

        Returns:
            BondEntry: The cached entry for ``user_id``, freshly created if
            absent.
        """
        if user_id in self._cache:
            return self._cache[user_id]

        # Create new entry (Redis load happens async via load_bond)
        entry = BondEntry(user_id=user_id, last_updated=time.time())
        self._cache[user_id] = entry
        return entry

    async def load_bond(self, user_id: str) -> BondEntry:
        """Load a bond entry from the cache, falling back to Redis.  # 🌀

        Returns the cached entry when present. Otherwise it ``GET``s
        ``abl:{user_id}``, decodes the JSON, rebuilds the entry via
        :meth:`BondEntry.from_dict`, and caches it. A missing key, a Redis
        error, or no client all fall back to a freshly constructed entry;
        Redis failures are logged (with a truncated user id), not raised.

        Args:
            user_id (str): The user whose bond entry to load.

        Returns:
            BondEntry: The hydrated entry, or a new default entry if
            nothing was persisted or the load failed.
        """
        if user_id in self._cache:
            return self._cache[user_id]

        if self._redis:
            try:
                raw = await self._redis.get(f"abl:{user_id}")
                if raw:
                    data = json.loads(raw)
                    entry = BondEntry.from_dict(data)
                    self._cache[user_id] = entry
                    return entry
            except Exception as e:
                logger.warning("ABL: Failed to load bond for %s: %s", user_id[:8], e)

        entry = BondEntry(user_id=user_id, last_updated=time.time())
        self._cache[user_id] = entry
        return entry

    async def save_bond(self, entry: BondEntry) -> None:
        """Persist a bond entry to the cache and Redis.  # ♾️

        Stamps ``entry.last_updated`` with the current time, stores the
        entry in the in-memory cache, and -- when a Redis client is
        configured -- writes its JSON form under ``abl:{user_id}``. The
        cache update always happens; the Redis write is best-effort and
        failures are logged rather than raised.

        Args:
            entry (BondEntry): The bond entry to persist; its
                ``last_updated`` field is overwritten as a side effect.
        """
        entry.last_updated = time.time()
        self._cache[entry.user_id] = entry

        if self._redis:
            try:
                await self._redis.set(
                    f"abl:{entry.user_id}",
                    json.dumps(entry.to_dict()),
                )
            except Exception as e:
                logger.warning(
                    "ABL: Failed to save bond for %s: %s", entry.user_id[:8], e
                )

    async def get_all_bonds(self) -> List[BondEntry]:
        """Return every known bond, hydrating the cache from Redis if cold.  # ♾️

        Only when the cache is completely empty and Redis is available does
        it ``SCAN`` for ``abl:*`` keys in batches of 100, skip the
        ``abl:events:*`` event-log keys, and :meth:`load_bond` each user id.
        If the cache holds anything at all it is returned as-is without
        scanning. Scan errors are logged and whatever is cached is returned.

        Returns:
            List[BondEntry]: A snapshot list of all cached bond entries.
        """
        if not self._cache and self._redis:
            try:
                cursor = 0
                while True:
                    cursor, batch = await self._redis.scan(
                        cursor, match="abl:*", count=100,
                    )
                    for key in batch:
                        key_str = key if isinstance(key, str) else key.decode()
                        if key_str.startswith("abl:events:"):
                            continue
                        uid = key_str.split(":", 1)[1]
                        if uid not in self._cache:
                            await self.load_bond(uid)
                    # Redis signals the end of a SCAN with cursor 0.
                    if cursor == 0:
                        break
            except Exception as e:
                logger.warning("ABL: Failed to scan bonds: %s", e)

        return list(self._cache.values())

    # ── Phase Update ──────────────────────────────────────────────

    async def update_from_detection(
        self,
        user_id: str,
        detection: Dict[str, Any],
        ulm_vector: Dict[str, float],
    ) -> Optional[BondEvent]:
        """Update a user's bond from entrainment-detector output.  # 💀🔥

        Loads the bond, compares the incoming phase and risk level against
        the stored state, and logs a :class:`BondEvent` for each change.
        It then refreshes archetype, egg status and turn count, raises the
        per-node ``peak_nodes`` high-water marks from ``ulm_vector``, and
        saves the bond.

        Missing ``phase`` / ``risk_level`` / ``childhood_archetype`` keys
        default to ``"dormant"`` / ``"low"`` / ``"unknown"``. An
        ``egg_status`` or ``signals`` value that is absent, ``None``, or not
        a mapping is treated as empty rather than raising.

        Args:
            user_id (str): The user whose bond is being updated.
            detection (Dict[str, Any]): Detector output, read for ``phase``,
                ``risk_level``, ``childhood_archetype``, ``egg_status``,
                ``confidence``, ``signals``, and ``turn_count``.
            ulm_vector (Dict[str, float]): Per-node ULM activations used to
                update the ``peak_nodes`` high-water marks.

        Returns:
            Optional[BondEvent]: The phase-transition event when the phase
            changed, otherwise ``None``. Risk-change events are logged but
            not returned.
        """
        entry = await self.load_bond(user_id)

        new_phase = detection.get("phase", "dormant")
        new_risk = detection.get("risk_level", "low")
        new_archetype = detection.get("childhood_archetype", "unknown")
        egg_status = self._nested_get(detection, "egg_status", "status", "none")

        event = None
        now = time.time()

        # Phase transition?  # 🌀
        if new_phase != entry.phase:
            confidence = detection.get("confidence", 0.0)
            event = BondEvent(
                timestamp=now,
                event_type="phase_transition",
                detail=f"{entry.phase} -> {new_phase}",
                data={
                    "from": entry.phase,
                    "to": new_phase,
                    "confidence": confidence,
                    "primary_signal": self._nested_get(
                        detection, "signals", "primary", ""
                    ),
                },
            )
            entry.prev_phase = entry.phase
            entry.phase = new_phase
            entry.phase_entered_at = now
            await self._log_event(user_id, event)

            logger.info(
                "ABL: Phase transition for %s: %s -> %s (conf=%.2f)",
                user_id[:8], entry.prev_phase, new_phase, confidence,
            )

        # Risk level change?
        if new_risk != entry.risk_level:
            risk_event = BondEvent(
                timestamp=now,
                event_type="risk_change",
                detail=f"risk: {entry.risk_level} -> {new_risk}",
                data={"from": entry.risk_level, "to": new_risk},
            )
            entry.risk_level = new_risk
            await self._log_event(user_id, risk_event)

        # Update metadata
        entry.archetype = new_archetype
        entry.egg_status = egg_status
        entry.turn_count = detection.get("turn_count", entry.turn_count)

        # Track peak nodes (high-water marks only ever go up)  # 😈
        for node, val in ulm_vector.items():
            if val > entry.peak_nodes.get(node, 0.0):
                entry.peak_nodes[node] = round(val, 4)

        await self.save_bond(entry)
        return event

    # ── Rate Limiting ─────────────────────────────────────────────

    def check_rate_limit(self, user_id: str) -> Dict[str, Any]:
        """Check whether a user has hit their daily message cap.  # ♾️

        Synchronous hot-path guard: it reads the bond via :meth:`get_bond`
        (cache only, no Redis). A cap of ``0`` or less means unlimited.
        Otherwise the counter is lazily reset when the rolling 24-hour
        window has expired, then compared against the cap. The lazy reset
        mutates the in-memory entry but is not persisted here.

        Args:
            user_id (str): The user whose rate-limit status to evaluate.

        Returns:
            Dict[str, Any]: ``allowed`` (bool), ``remaining`` (int, ``-1``
            when unlimited), ``cap`` (int, ``0`` meaning unlimited), and
            ``count`` (int, messages counted in the current window).
        """
        entry = self.get_bond(user_id)

        # No cap set
        if entry.daily_msg_cap <= 0:
            return {
                "allowed": True,
                "remaining": -1,
                "cap": 0,
                "count": entry.daily_msg_count,
            }

        # Restart the counter once the rolling 24h window has elapsed.
        self._roll_daily_window(entry, time.time())

        remaining = max(0, entry.daily_msg_cap - entry.daily_msg_count)
        return {
            "allowed": remaining > 0,
            "remaining": remaining,
            "cap": entry.daily_msg_cap,
            "count": entry.daily_msg_count,
        }

    async def increment_msg_count(self, user_id: str) -> None:
        """Increment a user's daily message counter and persist it.

        Obtains the entry via :meth:`load_bond`, so on a cold cache the
        persisted bond is read from Redis first instead of being
        overwritten by a default entry. It then applies the same rolling
        24-hour window reset as :meth:`check_rate_limit`, bumps
        ``daily_msg_count`` by one, and writes through :meth:`save_bond`.

        Args:
            user_id (str): The user whose message counter to increment.
        """
        entry = await self.load_bond(user_id)
        self._roll_daily_window(entry, time.time())
        entry.daily_msg_count += 1
        await self.save_bond(entry)

    async def set_rate_limit(self, user_id: str, cap: int) -> None:
        """Set a user's daily message cap as an admin action.

        Loads the bond, overwrites ``daily_msg_cap``, logs an
        ``admin_action`` :class:`BondEvent` with the old and new caps, and
        persists the entry.

        Args:
            user_id (str): The user whose cap is being set.
            cap (int): New daily message cap; ``0`` means unlimited.
        """
        entry = await self.load_bond(user_id)
        old_cap = entry.daily_msg_cap
        entry.daily_msg_cap = cap

        event = BondEvent(
            timestamp=time.time(),
            event_type="admin_action",
            detail=f"rate limit: {old_cap} -> {cap} msgs/day",
            data={"old_cap": old_cap, "new_cap": cap},
        )
        await self._log_event(user_id, event)
        await self.save_bond(entry)

        logger.info("ABL: Rate limit set for %s: %d msgs/day", user_id[:8], cap)

    # ── Event Log ─────────────────────────────────────────────────

    async def _log_event(self, user_id: str, event: BondEvent) -> None:
        """Append a bond event to the user's capped Redis event list.

        ``LPUSH``es the JSON-encoded event onto ``abl:events:{user_id}``
        and then ``LTRIM``s the list to ``MAX_EVENTS_PER_USER`` entries.
        Does nothing without a Redis client; Redis errors are logged
        rather than raised.

        Args:
            user_id (str): The user whose event log to append to.
            event (BondEvent): The lifecycle event to record.
        """
        if self._redis:
            try:
                key = f"abl:events:{user_id}"
                await self._redis.lpush(key, json.dumps(event.to_dict()))
                await self._redis.ltrim(key, 0, MAX_EVENTS_PER_USER - 1)
            except Exception as e:
                logger.warning("ABL: Failed to log event for %s: %s", user_id[:8], e)

    async def get_events(
        self,
        user_id: str,
        limit: int = 50,
    ) -> List[Dict[str, Any]]:
        """Retrieve a user's most recent bond events.  # 🔥

        Reads up to ``limit`` entries from the head of the
        ``abl:events:{user_id}`` Redis list and JSON-decodes each one.
        Events are pushed at the head, so results come back newest-first.
        Returns an empty list when ``limit`` is not positive, when no Redis
        client is configured, or on a Redis error (logged, not raised).

        Args:
            user_id (str): The user whose events to fetch.
            limit (int): Maximum number of events to return (default 50).

        Returns:
            List[Dict[str, Any]]: Up to ``limit`` event dicts, newest first.
        """
        events: List[Dict[str, Any]] = []
        # LRANGE treats a negative stop as "count from the tail", so
        # limit=0 would become stop=-1 and fetch the whole list.
        if limit <= 0:
            return events

        if self._redis:
            try:
                raw_list = await self._redis.lrange(
                    f"abl:events:{user_id}", 0, limit - 1,
                )
                for raw in raw_list:
                    if isinstance(raw, bytes):
                        raw = raw.decode()
                    events.append(json.loads(raw))
            except Exception as e:
                logger.warning("ABL: Failed to get events for %s: %s", user_id[:8], e)
        return events

    # ── Admin Notes ───────────────────────────────────────────────

    async def add_note(self, user_id: str, note: str) -> None:
        """Append a timestamped admin note to a user's bond.

        Loads the bond, appends the note prefixed with a
        ``%Y-%m-%d %H:%M`` timestamp, logs an ``admin_action``
        :class:`BondEvent`, and persists the entry.

        Args:
            user_id (str): The user whose bond to annotate.
            note (str): Free-text admin note; the event detail keeps only
                its first 100 characters, the event data keeps it whole.
        """
        entry = await self.load_bond(user_id)
        entry.notes.append(f"[{time.strftime('%Y-%m-%d %H:%M')}] {note}")

        event = BondEvent(
            timestamp=time.time(),
            event_type="admin_action",
            detail=f"note added: {note[:100]}",
            data={"note": note},
        )
        await self._log_event(user_id, event)
        await self.save_bond(entry)