Source code for observability

"""Unified observability events via Redis pub/sub and sorted sets.

Publishes events for tool calls, response phases, and background tasks.
All events are both streamed (pub/sub) and persisted (sorted sets) so
dashboards can consume them in real-time or query history.
"""

from __future__ import annotations

import json
import logging
import time
import uuid
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    import redis.asyncio as aioredis

logger = logging.getLogger(__name__)

# Redis channels / storage keys
_TOOL_CHANNEL = "stargazer:tools"
_TOOL_STORAGE = "stargazer:tools:history"
_RESPONSE_CHANNEL = "stargazer:response_phases"
_RESPONSE_STORAGE = "stargazer:response_phases:history"
_BG_CHANNEL = "stargazer:background_tasks"
_BG_STORAGE = "stargazer:background_tasks:history"

_MAX_STORED = 10_000
_TTL_DAYS = 7

# Module-level Redis client (set during init)
_redis: aioredis.Redis | None = None


def set_observability_redis(client: aioredis.Redis | None) -> None:
    """Install or clear the Redis client used by this module.

    Args:
        client (aioredis.Redis | None): Client to store in the module-level
            ``_redis`` slot. Passing ``None`` clears it; while it is ``None``
            ``_publish`` returns without doing anything and
            ``get_recent_events`` returns an empty list.
    """
    # Rebind the module global so every helper in this file sees the new client.
    global _redis
    _redis = client
def generate_request_id() -> str:
    """Build a fresh request identifier.

    Returns:
        str: ``"req_"`` followed by the first 12 hex characters of a
        newly generated UUID4.
    """
    hex_digits = uuid.uuid4().hex
    return "req_" + hex_digits[:12]
# ------------------------------------------------------------------
# Internal publish + persist
# ------------------------------------------------------------------

async def _publish(
    channel: str,
    storage_key: str,
    event: dict[str, Any],
    max_stored: int = _MAX_STORED,
    ttl_days: int = _TTL_DAYS,
) -> None:
    """Stream *event* on a channel and record it in a sorted set.

    Does nothing when no Redis client has been installed. Any exception
    raised while serialising or talking to Redis is logged at DEBUG level
    and swallowed, so callers are never interrupted by observability
    failures.

    Args:
        channel (str): Channel name passed to ``publish``.
        storage_key (str): Sorted-set key passed to ``zadd`` and the trims.
        event (dict[str, Any]): Event to serialise as JSON; values that are
            not JSON-serialisable are converted with ``str``.
        max_stored (int): Number of highest-ranked entries kept when the
            rank trim runs.
        ttl_days (int): Age in days beyond which entries are removed when
            the score trim runs.
    """
    if _redis is None:
        return

    try:
        serialized = json.dumps(event, default=str)
        # The event's own timestamp is the sort score; fall back to "now".
        score = event.get("timestamp", time.time())

        await _redis.publish(channel, serialized)
        await _redis.zadd(storage_key, {serialized: score})

        # Probabilistic trim (~1% of calls): only payloads whose hash is a
        # multiple of 100 pay for the housekeeping below.
        if hash(serialized) % 100 != 0:
            return

        # Drop everything except the last `max_stored` ranks.
        await _redis.zremrangebyrank(storage_key, 0, -max_stored - 1)

        # Drop entries whose score is older than the retention window.
        oldest_allowed = time.time() - ttl_days * 86_400
        await _redis.zremrangebyscore(storage_key, "-inf", oldest_allowed)
    except Exception:
        logger.debug("Observability publish failed", exc_info=True)


# ------------------------------------------------------------------
# Tool call events
# ------------------------------------------------------------------
async def publish_tool_event(
    tool_name: str,
    arguments: dict[str, Any] | None = None,
    result_preview: str = "",
    duration_ms: float = 0,
    success: bool = True,
    user_id: str = "",
    channel_id: str = "",
    request_id: str = "",
) -> None:
    """Emit a ``tool_call`` event on the tool channel and history key.

    Args:
        tool_name (str): Name of the tool that was invoked.
        arguments (dict[str, Any] | None): Tool arguments; stored as a
            string shortened by ``_truncate`` (``None`` is kept as ``None``).
        result_preview (str): Result text; only the first 500 characters
            are kept, and a falsy value is stored as ``""``.
        duration_ms (float): Duration in milliseconds, rounded to one
            decimal place.
        success (bool): Whether the call succeeded.
        user_id (str): Unique identifier for the user.
        channel_id (str): Discord/Matrix channel identifier.
        request_id (str): Identifier of the originating request.
    """
    # Derived fields are computed in the same order the event lists them.
    shortened_args = _truncate(arguments)
    preview = (result_preview or "")[:500]
    rounded_duration = round(duration_ms, 1)
    now = time.time()

    tool_event: dict[str, Any] = {
        "event_type": "tool_call",
        "tool_name": tool_name,
        "arguments": shortened_args,
        "result_preview": preview,
        "duration_ms": rounded_duration,
        "success": success,
        "user_id": user_id,
        "channel_id": channel_id,
        "request_id": request_id,
        "timestamp": now,
    }
    await _publish(_TOOL_CHANNEL, _TOOL_STORAGE, tool_event)
# ------------------------------------------------------------------
# Response phase events
# ------------------------------------------------------------------
async def publish_response_event(
    phase: str,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    model: str = "",
    duration_ms: float = 0,
    tool_rounds: int = 0,
    error: str = "",
    **extra: Any,
) -> None:
    """Emit a ``response_phase`` event on the response channel and history key.

    Args:
        phase (str): Name of the response phase being reported.
        request_id (str): Identifier of the originating request.
        channel_id (str): Discord/Matrix channel identifier.
        user_id (str): Unique identifier for the user.
        model (str): Model name recorded with the event.
        duration_ms (float): Duration in milliseconds, rounded to one
            decimal place.
        tool_rounds (int): Tool-round count recorded with the event.
        error (str): Error text; included (first 500 characters) only when
            non-empty.
        **extra: Additional fields merged into the event last, so they
            override any same-named field set above.
    """
    phase_event: dict[str, Any] = dict(
        event_type="response_phase",
        phase=phase,
        request_id=request_id,
        channel_id=channel_id,
        user_id=user_id,
        model=model,
        duration_ms=round(duration_ms, 1),
        tool_rounds=tool_rounds,
        timestamp=time.time(),
    )

    if error:
        phase_event["error"] = error[:500]

    # Caller-supplied extras are applied after the fixed fields.
    phase_event.update(extra)

    await _publish(_RESPONSE_CHANNEL, _RESPONSE_STORAGE, phase_event)
# ------------------------------------------------------------------
# Background task events
# ------------------------------------------------------------------
async def publish_background_event(
    task_name: str,
    status: str = "running",
    task_id: str = "",
    progress: float | None = None,
    result: str = "",
    error: str = "",
    **extra: Any,
) -> None:
    """Emit a ``background_task`` event on the background channel and history key.

    Args:
        task_name (str): Name of the background task.
        status (str): Status label recorded with the event.
        task_id (str): Background task identifier; when empty, the first 12
            hex characters of a new UUID4 are used instead.
        progress (float | None): Progress value; included only when not
            ``None``.
        result (str): Result text; included (first 1000 characters) only
            when non-empty.
        error (str): Error text; included (first 500 characters) only when
            non-empty.
        **extra: Additional fields merged into the event last, so they
            override any same-named field set above.
    """
    resolved_task_id = task_id if task_id else uuid.uuid4().hex[:12]

    task_event: dict[str, Any] = dict(
        event_type="background_task",
        task_name=task_name,
        status=status,
        task_id=resolved_task_id,
        timestamp=time.time(),
    )

    # Optional fields are added only when the caller supplied them.
    if progress is not None:
        task_event["progress"] = progress
    if result:
        task_event["result"] = result[:1000]
    if error:
        task_event["error"] = error[:500]

    # Caller-supplied extras are applied after the fixed fields.
    task_event.update(extra)

    await _publish(_BG_CHANNEL, _BG_STORAGE, task_event)
# ------------------------------------------------------------------
# Query helpers
# ------------------------------------------------------------------
async def get_recent_events(
    category: str = "tools",
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Return recent events for *category* (tools, responses, background).

    Events are read newest-first from the category's history sorted set.
    Each returned event has its ``"timestamp"`` field overwritten with the
    score stored in the sorted set.

    Args:
        category (str): One of ``"tools"``, ``"responses"`` or
            ``"background"``. Any other value yields an empty list.
        limit (int): Maximum number of events to return. Values less than
            or equal to zero yield an empty list.

    Returns:
        list[dict[str, Any]]: Decoded events, newest first. Empty when no
        Redis client is installed, the category is unknown, the limit is
        not positive, or the query fails (the failure is logged at DEBUG
        level).
    """
    # A non-positive limit must not reach Redis: the stop index is computed
    # as ``limit - 1`` and a negative stop counts from the end of the set,
    # so ``limit=0`` would become ``0..-1`` and return the whole history.
    if limit <= 0:
        return []

    if _redis is None:
        return []

    key_map = {
        "tools": _TOOL_STORAGE,
        "responses": _RESPONSE_STORAGE,
        "background": _BG_STORAGE,
    }
    storage_key = key_map.get(category)
    if not storage_key:
        return []

    try:
        raw = await _redis.zrevrange(storage_key, 0, limit - 1, withscores=True)
        out: list[dict[str, Any]] = []
        for payload, score in raw:
            try:
                ev = json.loads(payload)
                ev["timestamp"] = score
                out.append(ev)
            except json.JSONDecodeError:
                # Skip entries that are not valid JSON rather than failing
                # the whole query.
                continue
        return out
    except Exception:
        logger.debug("Failed to query events", exc_info=True)
        return []
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

def _truncate(obj: Any, max_len: int = 500) -> Any:
    """Render *obj* as a string capped at *max_len* characters.

    Args:
        obj (Any): Value to render; ``None`` is passed through unchanged.
        max_len (int): Maximum number of characters kept from ``str(obj)``.

    Returns:
        Any: ``None`` when *obj* is ``None``; otherwise ``str(obj)``, cut to
        *max_len* characters with ``"..."`` appended when it was longer.
    """
    if obj is None:
        return None

    text = str(obj)
    if len(text) <= max_len:
        return text
    return text[:max_len] + "..."