"""Unified observability events via Redis pub/sub and sorted sets.
Publishes events for tool calls, response phases, and background tasks.
All events are both streamed (pub/sub) and persisted (sorted sets) so
dashboards can consume them in real-time or query history.
"""
from __future__ import annotations
import json
import logging
import time
import uuid
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
# Redis channels / storage keys
_TOOL_CHANNEL = "stargazer:tools"
_TOOL_STORAGE = "stargazer:tools:history"
_RESPONSE_CHANNEL = "stargazer:response_phases"
_RESPONSE_STORAGE = "stargazer:response_phases:history"
_BG_CHANNEL = "stargazer:background_tasks"
_BG_STORAGE = "stargazer:background_tasks:history"
_MAX_STORED = 10_000
_TTL_DAYS = 7
# Module-level Redis client (set during init)
_redis: aioredis.Redis | None = None
def set_observability_redis(client: aioredis.Redis | None) -> None:
    """Install (or clear) the module-level Redis client used for publishing.

    Args:
        client (aioredis.Redis | None): Async Redis client to publish
            through, or ``None`` to disable all observability output.
    """
    global _redis
    _redis = client
def generate_request_id() -> str:
    """Create a short unique identifier for one request lifecycle.

    Returns:
        str: An id of the form ``req_`` followed by 12 random hex chars.
    """
    return "req_" + uuid.uuid4().hex[:12]
# ------------------------------------------------------------------
# Internal publish + persist
# ------------------------------------------------------------------
async def _publish(
    channel: str,
    storage_key: str,
    event: dict[str, Any],
    max_stored: int = _MAX_STORED,
    ttl_days: int = _TTL_DAYS,
) -> None:
    """Stream *event* over pub/sub and persist it in a sorted set.

    Best-effort by design: a no-op when no Redis client is configured,
    and all serialization/Redis errors are swallowed (debug-logged) so
    observability never breaks the caller.

    Args:
        channel (str): Pub/sub channel the event is streamed on.
        storage_key (str): Sorted-set key holding the event history.
        event (dict[str, Any]): JSON-serializable event payload.
        max_stored (int): Cap on the number of persisted events.
        ttl_days (int): Age in days after which history entries are pruned.
    """
    if _redis is None:
        return
    try:
        serialized = json.dumps(event, default=str)
        score = event.get("timestamp", time.time())
        await _redis.publish(channel, serialized)
        await _redis.zadd(storage_key, {serialized: score})
        # Amortize cleanup: trim on only ~1% of calls.
        if hash(serialized) % 100 == 0:
            await _redis.zremrangebyrank(storage_key, 0, -max_stored - 1)
            oldest_allowed = time.time() - ttl_days * 86_400
            await _redis.zremrangebyscore(storage_key, "-inf", oldest_allowed)
    except Exception:
        logger.debug("Observability publish failed", exc_info=True)
# ------------------------------------------------------------------
# Tool call events
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Response phase events
# ------------------------------------------------------------------
async def publish_response_event(
    phase: str,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    model: str = "",
    duration_ms: float = 0,
    tool_rounds: int = 0,
    error: str = "",
    **extra: Any,
) -> None:
    """Emit a ``response_phase`` observability event.

    Args:
        phase (str): Name of the response lifecycle phase.
        request_id (str): Correlating request identifier.
        channel_id (str): Discord/Matrix channel identifier.
        user_id (str): Unique identifier for the user.
        model (str): Model name involved in the response.
        duration_ms (float): Phase duration in milliseconds (rounded to 0.1).
        tool_rounds (int): Number of tool-call rounds performed.
        error (str): Error text, truncated to 500 chars; omitted when empty.
        **extra: Extra fields merged into the event (may override base keys).
    """
    event: dict[str, Any] = dict(
        event_type="response_phase",
        phase=phase,
        request_id=request_id,
        channel_id=channel_id,
        user_id=user_id,
        model=model,
        duration_ms=round(duration_ms, 1),
        tool_rounds=tool_rounds,
        timestamp=time.time(),
    )
    if error:
        event["error"] = error[:500]
    event.update(extra)
    await _publish(_RESPONSE_CHANNEL, _RESPONSE_STORAGE, event)
# ------------------------------------------------------------------
# Background task events
# ------------------------------------------------------------------
async def publish_background_event(
    task_name: str,
    status: str = "running",
    task_id: str = "",
    progress: float | None = None,
    result: str = "",
    error: str = "",
    **extra: Any,
) -> None:
    """Emit a ``background_task`` observability event.

    Args:
        task_name (str): Name of the background task.
        status (str): Task status (e.g. ``"running"``).
        task_id (str): Background task identifier; a random 12-char hex id
            is generated when empty.
        progress (float | None): Progress value; omitted when ``None``.
        result (str): Result data, truncated to 1000 chars; omitted when empty.
        error (str): Error text, truncated to 500 chars; omitted when empty.
        **extra: Extra fields merged into the event (may override base keys).
    """
    event: dict[str, Any] = dict(
        event_type="background_task",
        task_name=task_name,
        status=status,
        task_id=task_id or uuid.uuid4().hex[:12],
        timestamp=time.time(),
    )
    if progress is not None:
        event["progress"] = progress
    if result:
        event["result"] = result[:1000]
    if error:
        event["error"] = error[:500]
    event.update(extra)
    await _publish(_BG_CHANNEL, _BG_STORAGE, event)
# ------------------------------------------------------------------
# Query helpers
# ------------------------------------------------------------------
async def get_recent_events(
    category: str = "tools",
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Return up to *limit* most-recent events for *category*.

    Args:
        category (str): One of ``"tools"``, ``"responses"``, or
            ``"background"``.
        limit (int): Maximum number of events returned, newest first.

    Returns:
        list[dict[str, Any]]: Decoded events, newest first. The stored
        sorted-set score replaces each event's ``timestamp`` field.
        Empty when Redis is unconfigured, the category is unknown,
        ``limit`` is non-positive, or the query fails.
    """
    # Guard limit <= 0 explicitly: zrevrange(key, 0, limit - 1) with
    # limit == 0 becomes zrevrange(key, 0, -1), which Redis interprets
    # as "the ENTIRE set" — the opposite of the intent.
    if _redis is None or limit <= 0:
        return []
    key_map = {
        "tools": _TOOL_STORAGE,
        "responses": _RESPONSE_STORAGE,
        "background": _BG_STORAGE,
    }
    storage_key = key_map.get(category)
    if not storage_key:
        return []
    try:
        raw = await _redis.zrevrange(storage_key, 0, limit - 1, withscores=True)
        out: list[dict[str, Any]] = []
        for payload, score in raw:
            try:
                ev = json.loads(payload)
                ev["timestamp"] = score
                out.append(ev)
            except json.JSONDecodeError:
                # Skip corrupt history entries rather than failing the query.
                continue
        return out
    except Exception:
        logger.debug("Failed to query events", exc_info=True)
        return []
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _truncate(obj: Any, max_len: int = 500) -> Any:
"""Internal helper: truncate.
Args:
obj (Any): The obj value.
max_len (int): The max len value.
Returns:
Any: The result.
"""
if obj is None:
return None
s = str(obj)
return s[:max_len] + "..." if len(s) > max_len else s