# Source code for observability

"""Unified observability events: Redis HASH + RediSearch (``idx:observability``) + pub/sub.

Each event is stored as ``obs:{uuid}`` with fields aligned to the RediSearch index,
then published on ``stargazer:observability`` for live admin dashboards.
"""

from __future__ import annotations

import functools
import inspect
import jsonutil as json
import logging
import time
import uuid
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    import redis.asyncio as aioredis

logger = logging.getLogger(__name__)

# Fanout channel for WebSocket subscribers (full JSON including payload).
# Every stored event is also published here in its JSON wire form.
OBSERVABILITY_CHANNEL = "stargazer:observability"

# Expiry applied to each persisted ``obs:{uuid}`` hash: 3 days, in seconds.
TTL_SECONDS = 3 * 86400

# RediSearch query: all persisted HTTP/API failures (incl. transport with http_status=0).
HTTP_ERROR_EVENT_TYPE = "http_error"
# The tripled braces render as ``@event_type:{http_error}`` (TAG-match syntax).
HTTP_ERROR_SEARCH_QUERY = f"@event_type:{{{HTTP_ERROR_EVENT_TYPE}}}"

# Module-level Redis client (decode_responses=True recommended for FT + HSET).
# Stays ``None`` until ``set_observability_redis`` attaches a client; the
# emitters and readers in this module return early while it is unset.
_redis: aioredis.Redis | None = None


def set_observability_redis(client: aioredis.Redis | None) -> None:
    """Bind (or unbind) the async Redis client this module writes through.

    The module keeps a single ``_redis`` handle that the ``publish_*``,
    ``fetch_*`` and ``search_*`` functions read. While that handle is
    ``None`` those functions short-circuit and return ``None`` or empty
    results, so nothing is persisted or published until a service calls
    this at startup.

    Per the module's own notes, the service entrypoints
    (``agents_main.py``, ``inference_main.py``, ``consolidation_main.py``,
    ``web_main.py``) perform this wiring and
    ``web/observability_routes.py`` may re-bind it.

    Args:
        client: The shared async Redis client, or ``None`` to detach and
            disable observability persistence.
    """
    # Rebinding the module global is the whole job; no validation is done.
    global _redis
    _redis = client
def get_http_call_origin() -> str:
    """Identify the application frame that triggered an outbound HTTP call.

    Inspects the current call stack from the innermost frame outwards and
    returns a description of the first frame whose filename does not belong
    to the HTTP/observability plumbing or to stdlib/test machinery. This
    gives HTTP error telemetry a "who made this call" attribution instead of
    pointing at the transport layer.

    Frames are skipped when their lower-cased, forward-slashed filename
    contains any of: ``openrouter_client``, ``observability.py``,
    ``gemini_embed_pool.py``, ``openrouter_embeddings.py``, ``importlib``,
    ``contextlib``, ``asyncio``, ``pytest``, ``pluggy`` or ``_pytest``.

    The function performs no Redis or network I/O, and any exception raised
    during inspection is swallowed on purpose: attribution is best-effort
    and must never break the error path that is asking for it.

    Returns:
        str: ``"basename:lineno in function"`` for the first eligible frame,
        or ``"unknown"`` when no frame qualifies or inspection fails.
    """
    import os

    # Substring markers matched against the normalised filename of each frame.
    skip_markers = (
        "openrouter_client",
        "observability.py",
        "gemini_embed_pool.py",
        "openrouter_embeddings.py",
        "importlib",
        "contextlib",
        "asyncio",
        "pytest",
        "pluggy",
        "_pytest",
    )

    frames: list = []
    try:
        # context=0 stops inspect from loading source lines for every frame;
        # only filename, line number and function name are needed here.
        frames = inspect.stack(context=0)
        for frame_info in frames:
            filename = frame_info.filename
            if not filename:
                continue
            normalized = filename.replace("\\", "/").lower()
            if any(marker in normalized for marker in skip_markers):
                continue
            base_file = os.path.basename(filename)
            return f"{base_file}:{frame_info.lineno} in {frame_info.function}"
    except Exception:
        # Deliberate best-effort: fall through to the "unknown" sentinel.
        pass
    finally:
        # Drop the frame references promptly; holding frame objects can keep
        # reference cycles (and the locals they pin) alive longer than needed.
        del frames
    return "unknown"
def generate_request_id() -> str:
    """Create a new correlation id for one bot turn.

    The id has the form ``req_`` followed by the first 12 hex characters of
    a random UUID4. It is meant to be threaded through a turn's
    observability events (inbound message, LLM request, tool calls, response
    phases) so they can be stitched together afterwards. No I/O is involved.

    Returns:
        str: A fresh ``req_``-prefixed correlation id.
    """
    hex_digits = uuid.uuid4().hex
    return "req_" + hex_digits[:12]
def _tag_val(s: str) -> str: """Normalise a value for a RediSearch ``TAG`` field, substituting a sentinel. RediSearch cannot index or match an empty ``TAG`` value, so this trims the input and replaces an empty/whitespace-only string with the ``"-"`` sentinel, keeping every indexed field queryable. Pure helper used by the hash-building code in :func:`publish_observability_event` and :func:`publish_debug_event` to populate ``TAG`` columns such as ``event_type``, ``platform`` and ``user_id``; it has no callers outside this module. Args: s (str): The raw field value (possibly empty or ``None``-ish). Returns: str: The stripped value, or ``"-"`` when it would otherwise be empty. """ t = (s or "").strip() return t if t else "-" def _truncate_preview(s: str, max_len: int = 2000) -> str: """Clamp a preview string to *max_len* characters, appending ``...`` if cut. Keeps the human-readable ``preview`` field bounded so it stays cheap to index in RediSearch and to ship over pub/sub. An empty/falsy input yields an empty string. This is a pure helper called throughout this module by every ``publish_*`` event builder (``publish_observability_event``, ``publish_tool_event``, ``publish_response_event``, ``publish_debug_event``, etc.) to shape the ``preview`` value; it has no callers outside this file. Args: s: Raw preview text (may be empty). max_len: Maximum number of characters to keep before truncating. Returns: str: The original string, or its first *max_len* chars plus ``...``. """ if not s: return "" return s[:max_len] + ("..." if len(s) > max_len else "")
async def publish_observability_event(
    *,
    event_type: str,
    platform: str = "",
    channel_id: str = "",
    user_id: str = "",
    tool_name: str = "",
    request_id: str = "",
    preview: str = "",
    timestamp: float | None = None,
    http_status: int = 0,
    http_service: str = "",
    payload: dict[str, Any] | None = None,
) -> str | None:
    """Persist and broadcast one observability event — the core emitter.

    Builds a flat hash (``TAG``-style fields passed through :func:`_tag_val`,
    ``preview`` clamped by :func:`_truncate_preview`, ``payload`` JSON-encoded
    into ``payload_json``), then in a single Redis pipeline:

    1. ``HSET`` the hash at ``obs:{uuid}``,
    2. ``EXPIRE`` it after :data:`TTL_SECONDS`,
    3. ``PUBLISH`` the JSON wire form on :data:`OBSERVABILITY_CHANNEL`.

    The wire form carries the raw (un-normalised) field values, the parsed
    ``payload`` and the ``doc_id`` of the stored hash.

    This function is best-effort. Value coercion, JSON encoding and the
    Redis round-trip all run inside one guarded section, so a malformed
    timestamp, a non-numeric status or an unserialisable payload is logged
    at debug level and reported as ``None`` instead of raising into the
    caller.

    Args:
        event_type: Event type tag (e.g. ``tool_call``, ``http_error``).
        platform: Originating platform, if any.
        channel_id: Originating channel/conversation id, if any.
        user_id: Originating user id, if any.
        tool_name: Tool name, for tool events.
        request_id: Per-turn correlation id.
        preview: Human-readable preview (truncated when stored).
        timestamp: Event time; defaults to ``time.time()`` when ``None``.
        http_status: HTTP status for HTTP events, else ``0``.
        http_service: HTTP service name for HTTP events.
        payload: Structured payload stored as JSON; ``None`` becomes ``{}``.

    Returns:
        str | None: The ``obs:{uuid}`` key on success, or ``None`` when Redis
        is unattached or building/writing the event failed.
    """
    client = _redis
    if client is None:
        return None

    key = f"obs:{uuid.uuid4().hex}"
    try:
        ts = time.time() if timestamp is None else float(timestamp)
        status = int(http_status)
        flat_payload = payload or {}
        stored_preview = _truncate_preview(preview)

        # Stored form: every value is a string so it maps onto hash fields.
        mapping: dict[str, str | bytes] = {
            "event_type": _tag_val(event_type),
            "platform": _tag_val(platform),
            "channel_id": _tag_val(channel_id),
            "user_id": _tag_val(user_id),
            "tool_name": _tag_val(tool_name),
            "request_id": _tag_val(request_id),
            "preview": stored_preview,
            "timestamp": str(ts),
            "http_status": str(status),
            "http_service": _tag_val(http_service),
            "payload_json": json.dumps(flat_payload, default=str),
        }
        # Wire form: raw values plus the structured payload and the doc id.
        wire: dict[str, Any] = {
            "event_type": event_type,
            "platform": platform,
            "channel_id": channel_id,
            "user_id": user_id,
            "tool_name": tool_name,
            "request_id": request_id,
            "preview": stored_preview,
            "timestamp": ts,
            "http_status": status,
            "http_service": http_service,
            "payload": flat_payload,
            "doc_id": key,
        }
        wire_json = json.dumps(wire, default=str)

        pipe = client.pipeline()
        pipe.hset(key, mapping=mapping)  # type: ignore[arg-type]
        pipe.expire(key, TTL_SECONDS)
        pipe.publish(OBSERVABILITY_CHANNEL, wire_json)
        await pipe.execute()
    except Exception:
        logger.debug("Observability publish failed", exc_info=True)
        return None
    return key
async def publish_tool_event(
    tool_name: str,
    arguments: dict[str, Any] | None = None,
    result_preview: str = "",
    duration_ms: float = 0,
    success: bool = True,
    user_id: str = "",
    channel_id: str = "",
    request_id: str = "",
    platform: str = "",
) -> None:
    """Emit a ``tool_call`` observability event for one tool invocation.

    Composes a one-line preview (tool name, success flag, duration, the
    first 400 characters of the stringified arguments and of the result)
    and a structured payload, then hands both to
    :func:`publish_observability_event`, which stores and publishes them.

    In the payload the stringified arguments are bounded to 800 characters
    via :func:`_truncate` (``None`` stays ``None``) and the result preview is
    cut at 800 characters.

    Args:
        tool_name: Name of the tool that ran.
        arguments: Arguments the tool was called with, or ``None``.
        result_preview: Short textual preview of the tool result.
        duration_ms: Wall-clock execution time in milliseconds.
        success: Whether the call completed without error.
        user_id: ID of the user whose turn triggered the tool.
        channel_id: ID of the originating channel/conversation.
        request_id: Correlation id for the bot turn.
        platform: Originating platform (e.g. ``discord``).

    Returns:
        None.
    """
    args_text = str(arguments)
    result_text = result_preview or ""

    summary = (
        f"{tool_name} ok={success} {duration_ms:.1f}ms | "
        f"{_truncate_preview(args_text, 400)} → "
        f"{result_text[:400]}"
    )

    # A missing arguments dict is recorded as null, not as the text "None".
    stored_args = None if arguments is None else _truncate(args_text, 800)
    details = {
        "arguments": stored_args,
        "result_preview": result_text[:800],
        "duration_ms": round(duration_ms, 1),
        "success": success,
    }

    await publish_observability_event(
        event_type="tool_call",
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        tool_name=tool_name,
        request_id=request_id,
        preview=_truncate_preview(summary),
        payload=details,
    )
async def publish_response_event(
    phase: str,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    model: str = "",
    duration_ms: float = 0,
    tool_rounds: int = 0,
    error: str = "",
    platform: str = "",
    **extra: Any,
) -> None:
    """Emit a ``response_phase`` event marking a step of response generation.

    Records the phase name, model, elapsed time and tool-round count for one
    bot turn, plus any error text. Extra keyword arguments are merged into
    the payload after the core fields, so an extra key with the same name
    replaces the core value. The event is stored and published by
    :func:`publish_observability_event`.

    Args:
        phase: Name of the lifecycle phase (e.g. ``start``, ``complete``).
        request_id: Correlation id for the bot turn.
        channel_id: ID of the originating channel/conversation.
        user_id: ID of the user being responded to.
        model: Model identifier used for generation.
        duration_ms: Elapsed time for this phase in milliseconds.
        tool_rounds: Number of tool-call rounds in the turn so far.
        error: Error message; cut to 500 chars in the payload and 120 in the
            preview. Omitted from the payload when empty.
        platform: Originating platform.
        **extra: Additional key/value fields merged into the payload.

    Returns:
        None.
    """
    details: dict[str, Any] = {
        "phase": phase,
        "model": model,
        "duration_ms": round(duration_ms, 1),
        "tool_rounds": tool_rounds,
    }
    details.update(extra)
    if error:
        details["error"] = error[:500]

    err_snippet = error[:120] if error else ""
    summary = f"phase={phase} model={model} err={err_snippet}"

    await publish_observability_event(
        event_type="response_phase",
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        preview=_truncate_preview(summary),
        payload=details,
    )
async def publish_background_event(
    task_name: str,
    status: str = "running",
    task_id: str = "",
    progress: float | None = None,
    result: str = "",
    error: str = "",
    **extra: Any,
) -> None:
    """Emit a ``background_task`` event describing a background task's state.

    The payload carries the task name, status and task id, followed by any
    extra keyword fields, and then the optional ``progress``, ``result`` and
    ``error`` entries when they are provided. When ``task_id`` is empty a
    12-character random hex id is generated for this event. Storage and
    publication are delegated to :func:`publish_observability_event`.

    Args:
        task_name: Human-readable name of the background task.
        status: Task status string (default ``running``).
        task_id: Id for the task; auto-generated when empty.
        progress: Optional completion value; omitted when ``None``.
        result: Optional result text, cut to 1000 characters.
        error: Optional error text, cut to 500 characters.
        **extra: Additional key/value fields merged into the payload.

    Returns:
        None.
    """
    tid = task_id if task_id else uuid.uuid4().hex[:12]

    details: dict[str, Any] = {
        "task_name": task_name,
        "status": status,
        "task_id": tid,
    }
    details.update(extra)

    # Optional fields are only recorded when the caller supplied them.
    if progress is not None:
        details["progress"] = progress
    if result:
        details["result"] = result[:1000]
    if error:
        details["error"] = error[:500]

    summary = f"bg {task_name} {status} {tid}"
    await publish_observability_event(
        event_type="background_task",
        preview=_truncate_preview(summary),
        payload=details,
    )
async def publish_classifier_event(
    *,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    platform: str = "",
    strategy: str = "",
    tools: list[str] | None = None,
    top_matches: list[dict[str, Any]] | None = None,
    phase: str = "user_message",
) -> None:
    """Emit a ``classifier_selection`` event for a tool-selection decision.

    Records which strategy fired, the selected tool names and the best
    scoring matches. The preview reports the full number of selected tools,
    while the payload stores at most 256 tool names and 20 match entries.
    Storage and publication are delegated to
    :func:`publish_observability_event`.

    Args:
        request_id: Correlation id for the bot turn.
        channel_id: ID of the originating channel/conversation.
        user_id: ID of the user being classified for.
        platform: Originating platform.
        strategy: Name of the classification strategy/path taken.
        tools: Tool names selected by the classifier; ``None`` means none.
        top_matches: Per-tool scoring dicts; ``None`` means none.
        phase: Classification phase label (default ``user_message``).

    Returns:
        None.
    """
    selected = tools if tools else []
    matches = top_matches if top_matches else []

    summary = (
        f"classifier phase={phase} strategy={strategy} n_tools={len(selected)}"
    )
    details = {
        "phase": phase,
        "strategy": strategy,
        "tools": selected[:256],
        "top_matches": matches[:20],
    }

    await publish_observability_event(
        event_type="classifier_selection",
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        preview=_truncate_preview(summary),
        payload=details,
    )
async def publish_embedding_event(
    *,
    source: str,
    model: str,
    duration_ms: float,
    success: bool,
    text_len: int = 0,
    batch_size: int = 0,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    platform: str = "",
    error: str = "",
) -> None:
    """Emit an ``embedding_api`` event for one embedding request.

    Captures the caller/source, model, latency, success flag and the text
    and batch sizes. The error text is always present in the payload (empty
    when there was none) and is cut to 500 characters. Storage and
    publication are delegated to :func:`publish_observability_event`.

    Args:
        source: Logical source/caller of the embedding request.
        model: Embedding model identifier.
        duration_ms: Wall-clock duration of the call in milliseconds.
        success: Whether the embedding call succeeded.
        text_len: Character length of the embedded text.
        batch_size: Number of items in the batch (0 for single).
        request_id: Correlation id for the bot turn, if any.
        channel_id: Originating channel id, if any.
        user_id: Originating user id, if any.
        platform: Originating platform, if any.
        error: Error message when the call failed.

    Returns:
        None.
    """
    summary = (
        f"embed {source} model={model} ok={success} {duration_ms:.1f}ms "
        f"len={text_len} batch={batch_size}"
    )
    error_text = error or ""
    details = {
        "source": source,
        "model": model,
        "duration_ms": round(duration_ms, 1),
        "success": success,
        "text_len": text_len,
        "batch_size": batch_size,
        "error": error_text[:500],
    }

    await publish_observability_event(
        event_type="embedding_api",
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        preview=_truncate_preview(summary),
        payload=details,
    )
async def publish_message_observability_event(
    *,
    kind: str,
    platform: str,
    channel_id: str,
    user_id: str,
    text_preview: str,
    request_id: str = "",
    message_id: str = "",
) -> None:
    """Emit a ``message_in`` or ``message_out`` event for a chat message.

    The event type is derived from *kind*: exactly ``"user_in"`` produces
    ``message_in``; every other value produces ``message_out``. The message
    text becomes the (truncated) preview, and the payload records the
    original *kind* together with the platform message id. Storage and
    publication are delegated to :func:`publish_observability_event`.

    Args:
        kind: ``user_in`` for inbound; anything else is treated as outbound.
        platform: Originating platform (e.g. ``discord``).
        channel_id: ID of the channel/conversation.
        user_id: ID of the user.
        text_preview: Message text (truncated when stored).
        request_id: Correlation id for the bot turn, if any.
        message_id: Platform message id, stored in the payload.

    Returns:
        None.
    """
    if kind == "user_in":
        event_name = "message_in"
    else:
        event_name = "message_out"

    details = {"kind": kind, "message_id": message_id}
    await publish_observability_event(
        event_type=event_name,
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        preview=_truncate_preview(text_preview),
        payload=details,
    )
async def publish_http_error_event(
    *,
    http_service: str,
    http_status: int = 0,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    platform: str = "",
    duration_ms: float = 0,
    endpoint: str = "",
    detail: str = "",
    error_kind: str = "",
) -> None:
    """Emit an ``http_error`` event for a failed outbound HTTP/API call.

    Records the failing service, HTTP status (``0`` for transport-level
    failures), endpoint, latency and a free-form detail string. The event
    type is :data:`HTTP_ERROR_EVENT_TYPE`, which is what
    ``HTTP_ERROR_SEARCH_QUERY`` matches. Storage and publication are
    delegated to :func:`publish_observability_event`.

    ``endpoint`` and ``detail`` are normalised once, so a ``None`` passed by
    a caller is treated as empty in both the payload and the preview. A
    ``None`` status is likewise treated as ``0``.

    Args:
        http_service: Name of the service/API that failed.
        http_status: HTTP status code, or ``0`` for transport errors.
        request_id: Correlation id for the bot turn, if any.
        channel_id: Originating channel id, if any.
        user_id: Originating user id, if any.
        platform: Originating platform, if any.
        duration_ms: Latency until failure in milliseconds.
        endpoint: The URL/endpoint called; cut to 500 chars in the payload
            and 80 in the preview.
        detail: Free-form error detail; cut to 800 chars in the payload and
            120 in the preview.
        error_kind: Optional error category; added to the payload when set.

    Returns:
        None.
    """
    # Normalise once so the payload and the preview agree on None handling.
    endpoint_text = endpoint or ""
    detail_text = detail or ""

    pl: dict[str, Any] = {
        "duration_ms": round(duration_ms, 1),
        "endpoint": endpoint_text[:500],
        "detail": detail_text[:800],
    }
    if error_kind:
        pl["error_kind"] = error_kind

    summary = (
        f"{http_service} status={http_status} "
        f"{endpoint_text[:80]} {detail_text[:120]}"
    )
    await publish_observability_event(
        event_type=HTTP_ERROR_EVENT_TYPE,
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        http_status=int(http_status or 0),
        http_service=http_service,
        preview=_truncate_preview(summary),
        payload=pl,
    )
# Expiry for ``llm_req:*`` hashes, in seconds.
LLM_REQUEST_TTL = 30 * 60  # 30 minutes — large payloads, debugging only
# Key prefix for LLM request records; the full key is ``llm_req:{uuid}``.
LLM_REQUEST_KEY_PREFIX = "llm_req:"
async def publish_llm_request_event(
    *,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    platform: str = "",
    model: str = "",
    messages: list[dict] | None = None,
    tool_names: list[str] | None = None,
    response_text: str = "",
    error: str = "",
    http_status: int = 0,
    duration_ms: float = 0.0,
    timestamp: float | None = None,
) -> str | None:
    """Persist one full LLM request/response record for debugging.

    Writes a hash at ``llm_req:{uuid}`` (:data:`LLM_REQUEST_KEY_PREFIX`)
    containing the JSON-encoded ``messages`` list, the offered tool names,
    the response text, error, HTTP status and latency, and sets a
    :data:`LLM_REQUEST_TTL` expiry — both in one Redis pipeline. Nothing is
    published on the pub/sub channel. Empty identifier fields are stored as
    the ``"-"`` sentinel.

    The function is best-effort. ``messages`` and ``tool_names`` each fall
    back to ``"[]"`` if they cannot be JSON-encoded, and any other failure
    (bad numeric value, Redis error) is logged at debug level and reported
    as ``None`` rather than raised.

    Args:
        request_id: Per-turn correlation id.
        channel_id: Originating channel id.
        user_id: Originating user id.
        platform: Originating platform.
        model: Model identifier used for the call.
        messages: The full request message list.
        tool_names: Names of tools offered to the model.
        response_text: The model's response text.
        error: Error message if the call failed.
        http_status: HTTP status of the call.
        duration_ms: Wall-clock latency in milliseconds.
        timestamp: Event time; defaults to ``time.time()`` when ``None``.

    Returns:
        str | None: The ``llm_req:{uuid}`` key on success, or ``None`` when
        Redis is unattached or the record could not be built or written.
    """
    client = _redis
    if client is None:
        return None

    key = f"{LLM_REQUEST_KEY_PREFIX}{uuid.uuid4().hex}"
    try:
        ts = time.time() if timestamp is None else float(timestamp)

        # The two JSON blobs degrade independently: losing one must not
        # discard the rest of the record.
        try:
            msg_json = json.dumps(messages or [], default=str)
        except Exception:
            msg_json = "[]"
        try:
            tools_json = json.dumps(tool_names or [], default=str)
        except Exception:
            tools_json = "[]"

        mapping: dict[str, str] = {
            "request_id": (request_id or "-"),
            "channel_id": (channel_id or "-"),
            "user_id": (user_id or "-"),
            "platform": (platform or "-"),
            "model": (model or "-"),
            "timestamp": str(ts),
            "tool_names": tools_json,
            "messages_json": msg_json,
            "msg_count": str(len(messages or [])),
            "response_text": response_text or "",
            "error": error or "",
            "http_status": str(int(http_status)),
            "duration_ms": str(round(duration_ms, 1)),
        }

        pipe = client.pipeline()
        pipe.hset(key, mapping=mapping)
        pipe.expire(key, LLM_REQUEST_TTL)
        await pipe.execute()
    except Exception:
        logger.debug("publish_llm_request_event failed", exc_info=True)
        return None
    return key
def _escape_llm_req_tag(value: str) -> str:
    """Backslash-escape every non-word character for use inside a TAG ``{...}`` filter."""
    import re

    # Escaping all punctuation and whitespace (not just "-", ":", ".", "/")
    # keeps a caller-supplied filter value from closing the braces or adding
    # query operators.
    return re.sub(r"([^\w])", r"\\\1", value)


def _hydrate_llm_req_row(doc_id: Any, raw: dict[Any, Any]) -> dict[str, Any] | None:
    """Turn one raw ``HGETALL`` result into an LLM-request row, or ``None`` if undecodable."""

    def _text(item: Any) -> Any:
        return item.decode() if isinstance(item, bytes) else item

    def _number(cast: Any, value: Any, default: Any) -> Any:
        # A malformed numeric field degrades to its default instead of
        # discarding the row or aborting the whole fetch.
        try:
            return cast(value or default)
        except (TypeError, ValueError):
            return default

    try:
        fields = {_text(k): _text(v) for k, v in raw.items()}
        row_id = _text(doc_id)
    except Exception:
        return None

    try:
        tool_names = json.loads(fields.get("tool_names", "[]") or "[]")
    except Exception:
        tool_names = []

    return {
        "doc_id": row_id,
        "request_id": fields.get("request_id", ""),
        "channel_id": fields.get("channel_id", ""),
        "user_id": fields.get("user_id", ""),
        "platform": fields.get("platform", ""),
        "model": fields.get("model", ""),
        "timestamp": _number(float, fields.get("timestamp", 0), 0.0),
        "msg_count": _number(int, fields.get("msg_count", 0), 0),
        "tool_names": tool_names,
        "messages_json": fields.get("messages_json", "[]"),
        "response_text": fields.get("response_text", ""),
        "error": fields.get("error", ""),
        "http_status": _number(int, fields.get("http_status", 0), 0),
        "duration_ms": _number(float, fields.get("duration_ms", 0), 0.0),
    }


async def fetch_llm_requests(
    *,
    channel_id: str = "",
    model: str = "",
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[dict], int]:
    """Return paginated LLM request records (newest first) for the debug UI.

    This is the read side of :func:`publish_llm_request_event`. Two
    strategies are tried in order:

    1. ``FT.SEARCH`` on the index named by ``LLM_REQ_INDEX_NAME`` with
       optional ``@channel_id`` / ``@model`` TAG filters, sorted by
       ``timestamp`` descending, followed by a pipelined ``HGETALL`` per hit.
    2. If anything in step 1 raises (index missing, import failure, ...), a
       full ``SCAN`` of ``llm_req:*`` keys with in-process filtering and
       sorting.

    Each row's ``doc_id`` is the Redis key of the record in both paths; the
    stored hash has no ``doc_id`` field, so it is taken from the search hit
    or the scanned key. The page window is normalised once (``limit`` to
    1..500, ``offset`` to >= 0) and applied identically in both paths.

    Args:
        channel_id: Optional exact channel filter.
        model: Optional model filter (exact tag in FT mode, case-insensitive
            substring in the SCAN fallback).
        limit: Page size, clamped to 1..500.
        offset: Page offset; negative values are treated as 0.

    Returns:
        tuple[list[dict], int]: ``(rows, total_matching)``; ``([], 0)`` when
        Redis is unattached.
    """
    client = _redis
    if client is None:
        return [], 0

    page_size = max(1, min(limit, 500))
    start = max(0, offset)

    # --- Attempt FT.SEARCH path ---
    try:
        from redis.commands.search.query import Query as _FTQuery
        from init_redis_indexes import LLM_REQ_INDEX_NAME

        filter_parts: list[str] = []
        if channel_id:
            filter_parts.append(
                f"@channel_id:{{{_escape_llm_req_tag(channel_id)}}}"
            )
        if model:
            filter_parts.append(
                f"@model:{{{_escape_llm_req_tag(model.lower())}}}"
            )
        query_str = " ".join(filter_parts) if filter_parts else "*"

        q = _FTQuery(query_str)
        q.sort_by("timestamp", asc=False)
        q.paging(start, page_size)
        q.dialect(2)

        result = await client.ft(LLM_REQ_INDEX_NAME).search(q)
        total = int(getattr(result, "total", 0))
        docs = result.docs
        if not docs:
            return [], total

        pipe = client.pipeline()
        for doc in docs:
            pipe.hgetall(doc.id)
        raw_results = await pipe.execute()

        rows: list[dict] = []
        # Pair every hash with the hit it came from so doc_id is the real key.
        for doc, raw in zip(docs, raw_results):
            if not raw:
                continue
            row = _hydrate_llm_req_row(doc.id, raw)
            if row is not None:
                rows.append(row)
        return rows, total
    except Exception as _ft_exc:
        logger.debug(
            "fetch_llm_requests: FT.SEARCH unavailable (%s), falling back to SCAN",
            _ft_exc,
        )

    # --- Fallback: full SCAN + in-process filter (index not yet ready) ---
    # SCAN may return the same key more than once during a full iteration, so
    # keys are collected in an insertion-ordered dict to drop duplicates.
    unique_keys: dict[Any, None] = {}
    cursor = 0
    while True:
        cursor, batch = await client.scan(
            cursor, match=f"{LLM_REQUEST_KEY_PREFIX}*", count=500
        )
        unique_keys.update(dict.fromkeys(batch))
        if cursor == 0:
            break

    keys = list(unique_keys)
    if not keys:
        return [], 0

    pipe = client.pipeline()
    for k in keys:
        pipe.hgetall(k)
    raw_results = await pipe.execute()

    wanted_model = model.lower()
    rows = []
    for key, raw in zip(keys, raw_results):
        if not raw:
            continue
        row = _hydrate_llm_req_row(key, raw)
        if row is None:
            continue
        if channel_id and row["channel_id"] != channel_id:
            continue
        if model and wanted_model not in str(row["model"]).lower():
            continue
        rows.append(row)

    rows.sort(key=lambda x: x["timestamp"], reverse=True)
    total = len(rows)
    return rows[start : start + page_size], total
def _truncate(obj: Any, max_len: int = 500) -> Any: """Stringify *obj* and clamp it to *max_len* characters for safe storage. Differs from :func:`_truncate_preview` in that it accepts any object (str-coercing it) and preserves ``None`` as ``None`` instead of ``""``, suitable for nullable payload fields. Used within this module by :func:`publish_tool_event` to bound the serialized tool ``arguments``; there are no callers of this particular helper outside ``observability.py``. Args: obj: Any value to stringify and bound, or ``None``. max_len: Maximum number of characters before truncation. Returns: Any: ``None`` when *obj* is ``None``; otherwise the (possibly truncated, with trailing ``...``) string form of *obj*. """ if obj is None: return None s = str(obj) return s[:max_len] + "..." if len(s) > max_len else s def _hdecode(hdata: dict[Any, Any]) -> dict[str, str]: """Decode a raw Redis HGETALL result into a ``str``→``str`` mapping. Redis may return keys/values as ``bytes`` (when the client is not in ``decode_responses`` mode), so this normalises both to UTF-8 strings, replacing undecodable bytes rather than raising. Pure helper used to hydrate hash docs inside :func:`search_observability_events` and :func:`search_debug_events`; not called outside this module. Args: hdata: A raw hash mapping whose keys/values may be ``bytes`` or ``str``. Returns: dict[str, str]: The same mapping with every key and value as a string. """ out: dict[str, str] = {} for k, v in hdata.items(): ks = k.decode("utf-8", errors="replace") if isinstance(k, bytes) else str(k) vs = v.decode("utf-8", errors="replace") if isinstance(v, bytes) else str(v) out[ks] = vs return out
async def search_observability_events(
    query: str,
    *,
    limit: int = 50,
    offset: int = 0,
    sort_desc: bool = True,
) -> tuple[list[dict[str, Any]], int]:
    """Run a RediSearch query against the observability index and hydrate the hits.

    The query is executed through ``FT.SEARCH`` (dialect 2), optionally sorted
    by ``timestamp`` descending, with the page size clamped to 1..500. Each hit
    is then re-read with a pipelined ``HGETALL`` so ``payload_json`` can be
    parsed and ``timestamp`` / ``http_status`` coerced to numbers.

    The function is best-effort: a missing index module, a failed search, or a
    failed pipeline round-trip is logged at debug level and never raised.

    Args:
        query: RediSearch query string.
        limit: Requested page size (clamped to 1..500).
        offset: Page offset handed to the query's paging clause.
        sort_desc: Sort by ``timestamp`` descending when ``True``.

    Returns:
        tuple[list[dict[str, Any]], int]: ``(documents, total_hits)``. Every
        document carries ``doc_id`` and ``payload``, plus whichever of the
        copied hash fields were present. ``([], 0)`` is returned when no Redis
        client is attached, the search modules cannot be imported, or the
        search itself fails.
    """
    if _redis is None:
        return [], 0

    # Guard the imports as well: the documented contract is "never raise".
    try:
        from init_redis_indexes import OBS_INDEX_NAME
        from redis.commands.search.query import Query
    except ImportError:
        logger.debug("observability search modules unavailable", exc_info=True)
        return [], 0

    try:
        q = Query(query)
        if sort_desc:
            q.sort_by("timestamp", asc=False)
        q.paging(offset, max(1, min(limit, 500)))
        q.dialect(2)
        result = await _redis.ft(OBS_INDEX_NAME).search(q)
    except Exception:
        logger.debug("observability FT.SEARCH failed", exc_info=True)
        return [], 0

    docs = result.docs
    if not docs:
        return [], int(getattr(result, "total", 0))

    pipe = _redis.pipeline()
    for doc in docs:
        pipe.hgetall(doc.id)
    try:
        raw_results = await pipe.execute()
    except Exception:
        # Keep the hit list (doc ids + total) even if hydration is impossible.
        logger.debug("observability pipeline execute failed", exc_info=True)
        raw_results = [{} for _ in docs]

    copied_fields = (
        "event_type",
        "platform",
        "channel_id",
        "user_id",
        "tool_name",
        "request_id",
        "preview",
        "http_service",
    )

    out: list[dict[str, Any]] = []
    for doc, raw_h in zip(docs, raw_results):
        row: dict[str, Any] = {"doc_id": doc.id}
        try:
            hdata = _hdecode(raw_h) if raw_h else {}
            try:
                row["payload"] = json.loads(hdata.get("payload_json", "{}"))
            except (ValueError, TypeError):
                # Covers JSON decode errors (ValueError subclasses) without
                # relying on the ``json`` alias exposing ``JSONDecodeError``.
                row["payload"] = {}
            for name in copied_fields:
                if name in hdata:
                    row[name] = hdata[name]
            if "timestamp" in hdata:
                row["timestamp"] = float(hdata["timestamp"])
            if "http_status" in hdata:
                row["http_status"] = int(hdata["http_status"])
        except Exception:
            # A malformed hash still yields a (partial) row rather than a gap.
            logger.debug("observability doc hydrate failed", exc_info=True)
        out.append(row)

    total = int(getattr(result, "total", len(out)))
    return out, total
async def count_http_errors_since(since_ts: float) -> int:
    """Return how many ``http_error`` events have a timestamp >= *since_ts*.

    Combines ``HTTP_ERROR_SEARCH_QUERY`` with a ``@timestamp:[since_ts +inf]``
    range and runs it with ``paging(0, 0)``, so only the hit total is read
    from the reply.

    Args:
        since_ts: Lower bound of the timestamp range.

    Returns:
        int: The reported hit total, or ``0`` when no Redis client is attached
        or the search raises.
    """
    if _redis is None:
        return 0

    from init_redis_indexes import OBS_INDEX_NAME
    from redis.commands.search.query import Query

    try:
        query_text = f"{HTTP_ERROR_SEARCH_QUERY} @timestamp:[{since_ts} +inf]"
        count_query = Query(query_text)
        count_query.paging(0, 0)  # no rows wanted, only the total
        reply = await _redis.ft(OBS_INDEX_NAME).search(count_query)
        return int(getattr(reply, "total", 0))
    except Exception:
        logger.debug("count_http_errors_since failed", exc_info=True)
    return 0
async def aggregate_http_errors_by_status(since_ts: float) -> list[dict[str, Any]]:
    """Count ``http_error`` events per HTTP status code since *since_ts*.

    Issues an aggregation over ``HTTP_ERROR_SEARCH_QUERY`` restricted to
    ``@timestamp:[since_ts +inf]``, grouped by ``@http_status`` with a count
    reducer aliased ``count``. Reply rows are accepted either as flat
    ``[key, value, key, value, ...]`` sequences or as dicts, with ``bytes`` or
    ``str`` members.

    Args:
        since_ts: Lower bound of the timestamp range.

    Returns:
        list[dict[str, Any]]: ``{"http_status": int, "count": int}`` entries
        sorted ascending by status. Rows whose values are not integers are
        skipped. An empty list is returned when no Redis client is attached or
        the aggregation raises.
    """
    if _redis is None:
        return []

    from init_redis_indexes import OBS_INDEX_NAME
    from redis.commands.search.aggregation import AggregateRequest
    from redis.commands.search import reducers

    def _text(item: Any) -> str:
        return item.decode() if isinstance(item, bytes) else str(item)

    try:
        request = AggregateRequest(
            f"{HTTP_ERROR_SEARCH_QUERY} @timestamp:[{since_ts} +inf]"
        ).group_by("@http_status", reducers.count().alias("count"))
        reply = await _redis.ft(OBS_INDEX_NAME).aggregate(request)

        breakdown: list[dict[str, Any]] = []
        for raw_row in reply.rows:
            fields: dict[str, str] = {}
            if isinstance(raw_row, (list, tuple)):
                # Flat key/value layout, e.g. [b'http_status', b'404', b'count', b'3'].
                # A missing (or None) value is recorded as "0".
                for idx in range(0, len(raw_row), 2):
                    value = raw_row[idx + 1] if idx + 1 < len(raw_row) else None
                    fields[_text(raw_row[idx])] = "0" if value is None else _text(value)
            elif isinstance(raw_row, dict):
                fields = {_text(k): _text(v) for k, v in raw_row.items()}

            try:
                entry = {
                    "http_status": int(fields.get("http_status", 0)),
                    "count": int(fields.get("count", 0)),
                }
            except (ValueError, TypeError):
                continue
            breakdown.append(entry)

        breakdown.sort(key=lambda item: item["http_status"])
        return breakdown
    except Exception:
        logger.debug("aggregate_http_errors_by_status failed", exc_info=True)
        return []
async def get_recent_events(
    category: str = "tools",
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Return the latest events for a coarse category name.

    Translates *category* into an ``@event_type`` tag query and delegates to
    :func:`search_observability_events` with ``offset=0``; the hit total that
    call returns is discarded.

    Args:
        category: ``tools``, ``responses`` or ``background``. Any other value
            falls back to the match-all query ``*``.
        limit: Page size forwarded to the search.

    Returns:
        list[dict[str, Any]]: The documents returned by the search.
    """
    category_queries = {
        "tools": "@event_type:{tool_call}",
        "responses": "@event_type:{response_phase}",
        "background": "@event_type:{background_task}",
    }
    try:
        search_query = category_queries[category]
    except KeyError:
        search_query = "*"

    documents, _total = await search_observability_events(
        search_query, limit=limit, offset=0
    )
    return documents
# ---------------------------------------------------------------------------
# obs_debug store — 1-hour forensic event bus
# ---------------------------------------------------------------------------

# Expiry, in seconds, applied to every ``obs_debug:*`` hash (one hour).
OBS_DEBUG_TTL = 60 * 60

# Pub/sub channel on which each debug event is published as JSON.
OBS_DEBUG_PUBSUB_CHANNEL = "stargazer:obs_debug"
async def publish_debug_event(
    event_type: str,
    subsystem: str,
    *,
    platform: str = "",
    channel_id: str = "",
    hub_id: str = "",
    user_id: str = "",
    request_id: str = "",
    run_id: str = "",
    phase: str = "",
    status: str = "ok",
    preview: str = "",
    llm_output: str = "",
    duration_ms: float = 0.0,
    token_est: int = 0,
    payload: dict[str, Any] | None = None,
) -> None:
    """Store one debug event as a hash and publish it on the debug channel.

    A single pipeline writes the hash ``obs_debug:{uuid}``, sets its expiry to
    ``OBS_DEBUG_TTL`` and publishes a JSON copy on
    ``OBS_DEBUG_PUBSUB_CHANNEL``. The hash keeps ``llm_output`` and the
    serialized ``payload_json``; the published copy omits ``llm_output`` and
    carries the raw (un-normalised) tag values, the parsed ``payload`` and the
    ``doc_id``. Pipeline failures are logged at debug level and not raised.

    Args:
        event_type: Event type tag.
        subsystem: Subsystem tag.
        platform: Platform tag.
        channel_id: Channel id tag.
        hub_id: Hub id tag.
        user_id: User id tag.
        request_id: Request correlation id tag.
        run_id: Run correlation id tag.
        phase: Phase tag.
        status: Status tag (default ``ok``).
        preview: Preview text, passed through ``_truncate_preview(..., 2000)``.
        llm_output: Text stored in the hash only.
        duration_ms: Duration, rounded to three decimals.
        token_est: Token estimate, coerced with ``int()``.
        payload: Structured payload; ``None`` is treated as ``{}``.

    Returns:
        None.
    """
    if _redis is None:
        return None

    now = time.time()
    doc_key = f"obs_debug:{uuid.uuid4().hex}"

    # Raw tag values, in the order used by both the hash and the wire message.
    raw_tags: dict[str, Any] = {
        "event_type": event_type,
        "subsystem": subsystem,
        "platform": platform,
        "channel_id": channel_id,
        "hub_id": hub_id,
        "user_id": user_id,
        "request_id": request_id,
        "run_id": run_id,
        "phase": phase,
        "status": status,
    }

    # Hash fields: each tag goes through _tag_val, numbers are stored as text.
    stored: dict[str, Any] = {
        name: _tag_val(value) for name, value in raw_tags.items()
    }
    stored["preview"] = _truncate_preview(preview, 2000)
    stored["timestamp"] = str(now)
    stored["duration_ms"] = str(round(duration_ms, 3))
    stored["token_est"] = str(int(token_est))
    # Kept in the hash only; deliberately absent from the live message below.
    stored["llm_output"] = llm_output or ""
    stored["payload_json"] = json.dumps(payload or {}, default=str)

    # Live message: raw tags plus native numeric types and the parsed payload.
    live: dict[str, Any] = dict(raw_tags)
    live["preview"] = stored["preview"]
    live["timestamp"] = now
    live["duration_ms"] = round(duration_ms, 3)
    live["token_est"] = int(token_est)
    live["payload"] = payload or {}
    live["doc_id"] = doc_key

    try:
        pipe = _redis.pipeline()
        pipe.hset(doc_key, mapping=stored)
        pipe.expire(doc_key, OBS_DEBUG_TTL)
        pipe.publish(OBS_DEBUG_PUBSUB_CHANNEL, json.dumps(live, default=str))
        await pipe.execute()
    except Exception:
        logger.debug("publish_debug_event failed", exc_info=True)
    return None
async def search_debug_events(
    query: str,
    *,
    limit: int = 50,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Search the debug-event index and return fully hydrated rows.

    Runs ``FT.SEARCH`` (dialect 2, sorted by ``timestamp`` descending, page
    size clamped to 1..500) and then reads every hit back with a pipelined
    ``HGETALL``. Each row starts from ``doc_id``, gets ``payload`` parsed from
    ``payload_json`` (``{}`` when unparsable), is merged with all remaining
    hash fields, and has ``timestamp`` / ``duration_ms`` / ``token_est``
    coerced to numbers.

    Args:
        query: RediSearch query string.
        limit: Requested page size (clamped to 1..500).
        offset: Page offset handed to the query's paging clause.

    Returns:
        list[dict[str, Any]]: Hydrated rows in search order. Empty when no
        Redis client is attached, the search modules cannot be imported, the
        search fails, or there are no hits.
    """
    if _redis is None:
        return []

    try:
        from init_redis_indexes import OBS_DEBUG_INDEX_NAME  # type: ignore[import]
        from redis.commands.search.query import Query
    except ImportError:
        logger.debug("search_debug_events: init_redis_indexes not ready")
        return []

    try:
        ft_query = Query(query)
        ft_query.sort_by("timestamp", asc=False)
        ft_query.paging(offset, max(1, min(limit, 500)))
        ft_query.dialect(2)
        result = await _redis.ft(OBS_DEBUG_INDEX_NAME).search(ft_query)
    except Exception:
        logger.debug("search_debug_events FT.SEARCH failed", exc_info=True)
        return []

    if not result.docs:
        return []

    # One HGETALL per hit, so stored-only fields come back with the row.
    pipe = _redis.pipeline()
    for hit in result.docs:
        pipe.hgetall(hit.id)
    try:
        hashes = await pipe.execute()
    except Exception:
        logger.debug("search_debug_events pipeline execute failed", exc_info=True)
        hashes = [{} for _ in result.docs]

    numeric_casts = (("timestamp", float), ("duration_ms", float), ("token_est", int))

    events: list[dict[str, Any]] = []
    for hit, raw_hash in zip(result.docs, hashes):
        event: dict[str, Any] = {"doc_id": hit.id}
        try:
            fields = _hdecode(raw_hash) if raw_hash else {}
            raw_payload = fields.pop("payload_json", "{}")
            try:
                event["payload"] = json.loads(raw_payload)
            except Exception:
                event["payload"] = {}
            # Everything left in the hash (including llm_output) is merged in.
            event.update(fields)
            for name, cast in numeric_casts:
                if name in event:
                    event[name] = cast(event[name])
        except Exception:
            logger.debug("search_debug_events doc hydrate failed", exc_info=True)
        events.append(event)
    return events
# ---------------------------------------------------------------------------
# proxy_tel store — 7-day durable telemetry for gemini-cli-proxy events
# ---------------------------------------------------------------------------

# Expiry, in seconds, applied to every proxy telemetry hash: 7 days = 604 800.
PROXY_TELEMETRY_TTL = 7 * 24 * 60 * 60

# Key prefix of the proxy telemetry hashes (``proxy_tel:{uuid}``).
PROXY_TELEMETRY_KEY_PREFIX = "proxy_tel:"
async def publish_proxy_telemetry_event(event: dict[str, Any]) -> str | None:
    """Store one proxy telemetry event as a hash with a fixed expiry.

    The event is written to ``{PROXY_TELEMETRY_KEY_PREFIX}{uuid}`` with an
    expiry of ``PROXY_TELEMETRY_TTL`` seconds. A few columns are normalised
    for later filtering (``event_type``, ``req_id``, ``model``, ``account``,
    ``http_status``, ``timestamp``) and the whole original object is kept in
    ``payload_json``.

    Timestamp handling: ``ts`` (or ``timestamp``) is used when it converts to
    ``float``; a string containing ``T`` is parsed as ISO-8601 (a trailing
    ``Z`` is treated as ``+00:00``); anything else falls back to the current
    time.

    HTTP status handling: ``http_status`` (or ``status``) is stored as an
    integer; a value that is not an integer — for example a textual status —
    is stored as ``0`` instead of aborting the write. The original value
    remains available in ``payload_json``.

    Args:
        event: The raw telemetry object.

    Returns:
        str | None: The key written, or ``None`` when no Redis client is
        attached or serializing/writing the event failed.
    """
    if _redis is None:
        return None

    raw_ts = event.get("ts") or event.get("timestamp")
    try:
        ts = float(raw_ts)
    except (TypeError, ValueError):
        ts = time.time()
        if isinstance(raw_ts, str) and "T" in raw_ts:
            from datetime import datetime

            try:
                ts = datetime.fromisoformat(raw_ts.replace("Z", "+00:00")).timestamp()
            except Exception:
                # Unparsable ISO text: keep the "now" fallback set above.
                pass

    raw_status = event.get("http_status") or event.get("status") or 0
    try:
        http_status = int(raw_status)
    except (TypeError, ValueError):
        # ``status`` may be non-numeric; do not lose the event over it.
        http_status = 0

    key = f"{PROXY_TELEMETRY_KEY_PREFIX}{uuid.uuid4().hex}"
    try:
        # Serialization sits inside the guard so the function stays
        # best-effort even for an event that cannot be dumped to JSON.
        mapping: dict[str, str] = {
            "event_type": str(event.get("type") or event.get("event_type") or "-"),
            "req_id": str(event.get("req_id") or "-"),
            "model": str(event.get("model") or "-"),
            "account": str(event.get("account") or "-"),
            "http_status": str(http_status),
            "timestamp": str(ts),
            "payload_json": json.dumps(event, default=str),
        }
        pipe = _redis.pipeline()
        pipe.hset(key, mapping=mapping)
        pipe.expire(key, PROXY_TELEMETRY_TTL)
        await pipe.execute()
    except Exception:
        logger.debug("publish_proxy_telemetry_event failed", exc_info=True)
        return None
    return key
async def fetch_proxy_telemetry_events(
    *,
    event_type: str = "",
    model: str = "",
    req_id: str = "",
    limit: int = 200,
    offset: int = 0,
) -> tuple[list[dict[str, Any]], int]:
    """Return one page of proxy telemetry rows, newest first.

    Scans all keys matching ``{PROXY_TELEMETRY_KEY_PREFIX}*``, reads each hash
    with a pipelined ``HGETALL``, applies the optional filters in process,
    sorts by ``timestamp`` descending and slices out the requested page.

    Args:
        event_type: Exact event-type filter (empty = no filter).
        model: Case-insensitive substring filter on the model (empty = no
            filter).
        req_id: Exact request-id filter (empty = no filter).
        limit: Page size; negative values are treated as ``0``.
        offset: Page offset; negative values are treated as ``0``.

    Returns:
        tuple[list[dict[str, Any]], int]: ``(rows, total_matching)`` where
        ``total_matching`` counts every row that passed the filters, not just
        the page. ``([], 0)`` when no Redis client is attached or no keys
        exist.
    """
    if _redis is None:
        return [], 0

    def _text(item: Any) -> Any:
        # Strict decode: a hash with invalid UTF-8 is skipped by the caller.
        return item.decode("utf-8") if isinstance(item, bytes) else item

    def _number(cast: Any, value: Any) -> Any:
        # One malformed field must not abort the whole listing.
        try:
            return cast(value or 0)
        except (TypeError, ValueError):
            return cast(0)

    # SCAN may hand back the same key more than once during a full iteration;
    # an insertion-ordered dict removes duplicates so rows and the total are
    # not inflated.
    unique_keys: dict[Any, None] = {}
    cursor = 0
    while True:
        cursor, batch = await _redis.scan(
            cursor, match=f"{PROXY_TELEMETRY_KEY_PREFIX}*", count=500
        )
        unique_keys.update(dict.fromkeys(batch))
        if cursor == 0:
            break

    if not unique_keys:
        return [], 0
    keys = list(unique_keys)

    pipe = _redis.pipeline()
    for k in keys:
        pipe.hgetall(k)
    raw_results = await pipe.execute()

    model_needle = model.lower()
    rows: list[dict[str, Any]] = []
    for key, raw in zip(keys, raw_results):
        if not raw:
            # Key expired or vanished between SCAN and HGETALL.
            continue
        try:
            r: dict[str, str] = {_text(k): _text(v) for k, v in raw.items()}
        except Exception:
            continue

        # Apply optional filters
        if event_type and r.get("event_type", "-") != event_type:
            continue
        if model_needle and model_needle not in r.get("model", "").lower():
            continue
        if req_id and r.get("req_id", "-") != req_id:
            continue

        try:
            payload = json.loads(r.get("payload_json", "{}") or "{}")
        except Exception:
            payload = {}

        rows.append(
            {
                # Keys arrive as bytes when the client does not decode
                # responses; expose text so the row stays JSON-serializable.
                "doc_id": (
                    key.decode("utf-8", errors="replace")
                    if isinstance(key, bytes)
                    else key
                ),
                "event_type": r.get("event_type", ""),
                "req_id": r.get("req_id", ""),
                "model": r.get("model", ""),
                "account": r.get("account", ""),
                "http_status": _number(int, r.get("http_status", 0)),
                "timestamp": _number(float, r.get("timestamp", 0)),
                "payload": payload,
            }
        )

    rows.sort(key=lambda x: x["timestamp"], reverse=True)
    total = len(rows)
    start = max(0, offset)
    return rows[start : start + max(0, limit)], total
import asyncio import contextlib
class Observability:
    """Facade for emitting counters, timers and alerts without awaiting.

    Each method logs immediately and then, when a Redis client is attached
    and an event loop is running, schedules :func:`publish_debug_event` as a
    background task on that loop. When no loop is running the persistence
    step is skipped silently, so the methods are safe to call from
    synchronous code.
    """

    # Strong references to in-flight background tasks. The event loop keeps
    # only weak references to tasks, so a task nobody else references can be
    # garbage-collected before it finishes; entries remove themselves on
    # completion.
    _background_tasks: set = set()

    def _emit(self, **fields: Any) -> None:
        """Schedule ``publish_debug_event(**fields)`` on the running loop, if any."""
        if not _redis:
            return
        try:
            loop = asyncio.get_running_loop()
            # The coroutine is created only after a running loop is
            # confirmed, so the no-loop path never leaves an un-awaited
            # coroutine behind.
            task = loop.create_task(publish_debug_event(**fields))
        except RuntimeError:
            return
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def increment(self, metric_name: str, tags: dict[str, str] | None = None) -> None:
        """Log a counter increment and best-effort persist it.

        Emits a debug log line and schedules a ``metric_increment`` debug
        event whose payload is ``{"metric": metric_name, "tags": tags}``.

        Args:
            metric_name: Name of the counter metric.
            tags: Optional dimension tags attached to the event payload.

        Returns:
            None.
        """
        logger.debug("[METRIC] Increment: %s tags=%s", metric_name, tags)
        self._emit(
            event_type="metric_increment",
            subsystem="observability",
            payload={"metric": metric_name, "tags": tags},
        )

    def timer(self, metric_name: str, subsystem: str = "observability") -> Timer:
        """Create a :class:`Timer` bound to *metric_name* and *subsystem*.

        Only constructs the object; nothing is measured or published here.

        Args:
            metric_name: Name of the timing metric.
            subsystem: Subsystem label passed to the timer.

        Returns:
            Timer: A new timer instance.
        """
        return Timer(metric_name, subsystem)

    def alert(self, severity: str, message: str, metadata: dict | None = None) -> None:
        """Log an alert at WARNING level and best-effort persist it.

        The scheduled ``alert`` debug event uses the lowercased severity as
        its ``status`` and the first 200 characters of the message as its
        ``preview``. Both are stringified first so a non-string severity or
        message cannot make the alert itself raise.

        Args:
            severity: Alert severity label.
            message: Human-readable alert message.
            metadata: Optional structured context used as the event payload.

        Returns:
            None.
        """
        logger.warning(
            "[ALERT] Severity: %s - Message: %s metadata=%s",
            severity,
            message,
            metadata,
        )
        self._emit(
            event_type="alert",
            subsystem="observability",
            status=str(severity).lower(),
            preview=str(message)[:200],
            payload=metadata,
        )
class Timer:
    """Wall-clock timer usable as a context manager or as a decorator.

    Elapsed time is measured with ``time.perf_counter``. When the timed
    region ends the duration is logged at debug level and, if a Redis client
    is attached and an event loop is running, a ``timer`` debug event is
    scheduled through :func:`publish_debug_event` as a background task.
    """

    # Strong references to in-flight background tasks. The event loop keeps
    # only weak references to tasks, so a task nobody else references can be
    # garbage-collected before it finishes; entries remove themselves on
    # completion.
    _background_tasks: set = set()

    def __init__(self, metric_name: str, subsystem: str = "observability"):
        """Store the metric name and subsystem label; timing starts later.

        Args:
            metric_name: Name of the timing metric.
            subsystem: Subsystem label stored on emitted timer events.
        """
        self.metric_name = metric_name
        self.subsystem = subsystem
        self.start_time = None

    def _record(self, duration_ms: float) -> None:
        """Log *duration_ms* and best-effort schedule the ``timer`` event."""
        logger.debug("[TIMER] %s took %.2f ms", self.metric_name, duration_ms)
        if not _redis:
            return
        try:
            loop = asyncio.get_running_loop()
            # Created only once a running loop is confirmed, so the no-loop
            # path never leaves an un-awaited coroutine behind.
            task = loop.create_task(
                publish_debug_event(
                    event_type="timer",
                    subsystem=self.subsystem,
                    duration_ms=duration_ms,
                    preview=f"[TIMER] {self.metric_name} took {duration_ms:.1f}ms",
                    payload={"metric": self.metric_name},
                )
            )
        except RuntimeError:
            return
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def __enter__(self) -> Timer:
        """Start the clock and return this timer.

        Returns:
            Timer: This timer instance.
        """
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Record the elapsed time; never suppress the block's exception.

        Args:
            exc_type: Exception type if the block raised, else ``None``.
            exc_val: Exception instance if the block raised, else ``None``.
            exc_tb: Traceback if the block raised, else ``None``.

        Returns:
            bool: Always ``False``.
        """
        # Without a prior __enter__ there is nothing to measure.
        if self.start_time is not None:
            self._record((time.perf_counter() - self.start_time) * 1000.0)
        return False

    def __call__(self, func):
        """Wrap *func* so that every invocation is timed.

        Coroutine functions get an ``async`` wrapper, everything else a plain
        one. In both cases the duration is recorded in a ``finally`` block,
        so it is reported whether the call returns or raises, and the
        wrapped result is returned unchanged.

        Args:
            func: The sync or async callable to wrap.

        Returns:
            Callable: A ``functools.wraps``-preserving wrapper around *func*.
        """
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                # A local start time keeps concurrent calls independent.
                started = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    self._record((time.perf_counter() - started) * 1000.0)

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                self._record((time.perf_counter() - started) * 1000.0)

        return sync_wrapper
# Shared module-level instance of the facade; its ``increment``, ``timer`` and
# ``alert`` methods are meant to be called directly on this object.
observability = Observability()