"""Unified observability events: Redis HASH + RediSearch (``idx:observability``) + pub/sub.
Each event is stored as ``obs:{uuid}`` with fields aligned to the RediSearch index,
then published on ``stargazer:observability`` for live admin dashboards.
"""
from __future__ import annotations
import functools
import inspect
import jsonutil as json
import logging
import time
import uuid
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)

# Redis pub/sub channel fanned out to WebSocket subscribers (live admin
# dashboards); each message is the full event JSON, payload included.
OBSERVABILITY_CHANNEL = "stargazer:observability"

# Lifetime of every ``obs:{uuid}`` hash: three days, expressed in seconds.
TTL_SECONDS = 3 * 24 * 60 * 60

# Event type used for every persisted HTTP/API failure (transport-level
# failures are recorded too, with ``http_status=0``), and the RediSearch TAG
# query that selects exactly those events.
HTTP_ERROR_EVENT_TYPE = "http_error"
HTTP_ERROR_SEARCH_QUERY = "@event_type:{" + HTTP_ERROR_EVENT_TYPE + "}"

# Shared async Redis client installed by ``set_observability_redis``; while it
# is ``None`` every publish/fetch/search helper short-circuits. A client built
# with ``decode_responses=True`` is recommended for FT.SEARCH + HSET use.
_redis: aioredis.Redis | None = None
[docs]
def set_observability_redis(client: aioredis.Redis | None) -> None:
    """Install (or clear) the shared async Redis client for observability.

    Rebinds the module-level ``_redis`` handle. Every ``publish_*``,
    ``fetch_*`` and ``search_*`` helper in this module reads that handle and
    returns early with ``None`` or empty results while it is unset. Services
    call this once at startup, which enables persistence and pub/sub fanout
    of observability events.

    Args:
        client (aioredis.Redis | None): The shared async Redis client, or
            ``None`` to detach it and disable observability persistence.
    """
    global _redis  # the helpers below all read this module-level handle
    _redis = client
[docs]
# Lower-cased, "/"-normalised filename fragments identifying frames that are
# HTTP/observability plumbing or stdlib/test machinery rather than the
# application code that actually issued the outbound call.
_ORIGIN_SKIP_MARKERS: tuple[str, ...] = (
    "openrouter_client",
    "observability.py",
    "gemini_embed_pool.py",
    "openrouter_embeddings.py",
    "importlib",
    "contextlib",
    "asyncio",
    "pytest",
    "pluggy",
    "_pytest",
)


def get_http_call_origin() -> str:
    """Identify the application frame that triggered an outbound HTTP call.

    Walks outward from this function's own frame via ``f_back`` and reports
    the first frame whose lower-cased, ``/``-normalised filename contains none
    of :data:`_ORIGIN_SKIP_MARKERS`. That skips the OpenRouter client, this
    module, the embedding pools and stdlib/test machinery (importlib,
    contextlib, asyncio, pytest, pluggy). HTTP error telemetry then gets a
    "who made this call" attribution instead of one pointing at the
    transport layer.

    Walking ``f_back`` lazily has two advantages over ``inspect.stack()``.
    It stops at the first matching frame instead of materialising the whole
    stack. It also avoids loading source-context lines for every frame. This
    is pure stack inspection with no other I/O. Any exception is swallowed,
    because this is best-effort telemetry, and ``"unknown"`` is returned.

    Returns:
        str: A ``"basename:lineno in function"`` origin string, or
        ``"unknown"`` if no suitable frame is found or inspection fails.
    """
    import os

    try:
        # May be None on interpreters without frame support -> "unknown".
        frame = inspect.currentframe()
        try:
            while frame is not None:
                code = frame.f_code
                filename = code.co_filename
                if filename:
                    normalized = filename.replace("\\", "/").lower()
                    if not any(marker in normalized for marker in _ORIGIN_SKIP_MARKERS):
                        base_file = os.path.basename(filename)
                        return f"{base_file}:{frame.f_lineno} in {code.co_name}"
                frame = frame.f_back
        finally:
            # Release the frame reference promptly; keeping frame objects in
            # locals can create reference cycles.
            del frame
    except Exception:
        # Deliberately best-effort: attribution must never break error reporting.
        pass
    return "unknown"
[docs]
def generate_request_id() -> str:
    """Create a new per-turn correlation id of the form ``req_<12 hex chars>``.

    The id is derived from a random UUID4 and involves no I/O. One id is
    threaded through all observability events of a single bot turn (inbound
    message, LLM request, tool calls, response phases), so the dashboard can
    stitch them together.

    Returns:
        str: A fresh ``req_``-prefixed correlation id.
    """
    suffix = uuid.uuid4().hex[:12]
    return "req_" + suffix
def _tag_val(s: str) -> str:
    """Prepare a value for a RediSearch ``TAG`` field.

    Surrounding whitespace is stripped. An empty result (including a falsy
    input such as ``None``) becomes the ``"-"`` sentinel, so every indexed
    TAG column always holds a matchable value.

    Args:
        s (str): Raw field value; may be empty or falsy.

    Returns:
        str: The stripped value, or ``"-"`` when nothing remains.
    """
    cleaned = s.strip() if s else ""
    return cleaned or "-"
def _truncate_preview(s: str, max_len: int = 2000) -> str:
    """Bound a human-readable preview to *max_len* characters.

    Text longer than *max_len* is cut and suffixed with ``...``. Falsy input
    yields ``""``. This keeps the ``preview`` field cheap to index and to
    publish over pub/sub.

    Args:
        s: Raw preview text; may be empty.
        max_len: Number of characters kept before the ``...`` marker.

    Returns:
        str: ``s`` unchanged when short enough, otherwise its first
        *max_len* characters followed by ``...``.
    """
    if not s:
        return ""
    if len(s) <= max_len:
        return s
    return s[:max_len] + "..."
[docs]
async def publish_observability_event(
    *,
    event_type: str,
    platform: str = "",
    channel_id: str = "",
    user_id: str = "",
    tool_name: str = "",
    request_id: str = "",
    preview: str = "",
    timestamp: float | None = None,
    http_status: int = 0,
    http_service: str = "",
    payload: dict[str, Any] | None = None,
) -> str | None:
    """Persist and broadcast one observability event — the core emitter.

    First, a flat hash is built for the RediSearch index:

    * TAG fields are normalised by :func:`_tag_val`.
    * ``preview`` is clamped by :func:`_truncate_preview`.
    * ``payload`` is JSON-encoded into ``payload_json``.

    Then a single Redis pipeline does three things:

    * writes the hash to ``obs:{uuid}``;
    * sets a :data:`TTL_SECONDS` expiry;
    * publishes the full JSON wire form on :data:`OBSERVABILITY_CHANNEL`.
      The wire form includes the decoded ``payload`` and the ``doc_id`` key.

    These are the hash fields :func:`search_observability_events` reads back.
    The typed helpers delegate here, for example
    :func:`publish_response_event` and :func:`publish_http_error_event`.

    Observability is best-effort, so this function never raises:

    * With no client attached, it returns ``None`` without touching Redis.
    * If serialisation fails, it logs at debug and returns ``None``.
      Examples are a payload the JSON encoder rejects, or a non-numeric
      ``timestamp`` / ``http_status``.
    * If the Redis pipeline fails, it also logs at debug and returns ``None``.

    Args:
        event_type (str): Event type tag (e.g. ``tool_call``, ``http_error``).
        platform (str): Originating platform, if any.
        channel_id (str): Originating channel/conversation id, if any.
        user_id (str): Originating user id, if any.
        tool_name (str): Tool name, for tool events.
        request_id (str): Per-turn correlation id.
        preview (str): Human-readable preview (truncated when stored).
        timestamp (float | None): Event time; defaults to now when ``None``.
        http_status (int): HTTP status for HTTP events, else ``0``.
        http_service (str): HTTP service name for HTTP events.
        payload (dict[str, Any] | None): Structured payload stored as JSON.

    Returns:
        str | None: The ``obs:{uuid}`` Redis key on success, or ``None`` when
        Redis is unavailable or serialisation/the write failed.
    """
    if _redis is None:
        return None
    key = f"obs:{uuid.uuid4().hex}"
    # Building the record is guarded as well as the Redis round-trip. These
    # helpers run inside message handling, so an unencodable payload or a bad
    # numeric argument must not escape into the caller.
    try:
        ts = time.time() if timestamp is None else float(timestamp)
        status = int(http_status)
        flat_payload = payload or {}
        preview_text = _truncate_preview(preview)
        mapping: dict[str, str | bytes] = {
            "event_type": _tag_val(event_type),
            "platform": _tag_val(platform),
            "channel_id": _tag_val(channel_id),
            "user_id": _tag_val(user_id),
            "tool_name": _tag_val(tool_name),
            "request_id": _tag_val(request_id),
            "preview": preview_text,
            "timestamp": str(ts),
            "http_status": str(status),
            "http_service": _tag_val(http_service),
            "payload_json": json.dumps(flat_payload, default=str),
        }
        wire: dict[str, Any] = {
            "event_type": event_type,
            "platform": platform,
            "channel_id": channel_id,
            "user_id": user_id,
            "tool_name": tool_name,
            "request_id": request_id,
            "preview": preview_text,
            "timestamp": ts,
            "http_status": status,
            "http_service": http_service,
            "payload": flat_payload,
            "doc_id": key,
        }
        wire_json = json.dumps(wire, default=str)
    except Exception:
        logger.debug("Observability event serialisation failed", exc_info=True)
        return None
    try:
        pipe = _redis.pipeline()
        pipe.hset(key, mapping=mapping)  # type: ignore[arg-type]
        pipe.expire(key, TTL_SECONDS)
        pipe.publish(OBSERVABILITY_CHANNEL, wire_json)
        await pipe.execute()
    except Exception:
        logger.debug("Observability publish failed", exc_info=True)
        return None
    return key
[docs]
async def publish_response_event(
    phase: str,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    model: str = "",
    duration_ms: float = 0,
    tool_rounds: int = 0,
    error: str = "",
    platform: str = "",
    **extra: Any,
) -> None:
    """Emit a ``response_phase`` event marking one milestone of a bot turn.

    The payload carries the phase name, the model, the elapsed time (rounded
    to 0.1 ms) and the tool-round count. Any extra keyword arguments are then
    merged in verbatim, and they may override those keys. A non-empty
    ``error`` is added last, clamped to 500 characters; the preview shows at
    most its first 120 characters.

    The event is handed to :func:`publish_observability_event`, which persists
    it as ``obs:{uuid}``, indexes it and publishes it on
    ``stargazer:observability``. Per the module notes, the message generator
    (``message_processor/generate_and_send.py``) calls this at several points.

    Args:
        phase: Lifecycle phase name (e.g. ``start``, ``complete``).
        request_id: Correlation id for the bot turn.
        channel_id: ID of the originating channel/conversation.
        user_id: ID of the user being responded to.
        model: Model identifier used for generation.
        duration_ms: Elapsed time for this phase in milliseconds.
        tool_rounds: Number of tool-call rounds in the turn so far.
        error: Error message if the phase failed (truncated when stored).
        platform: Originating platform.
        **extra: Additional key/value fields merged into the event payload.

    Returns:
        None.
    """
    body: dict[str, Any] = dict(
        phase=phase,
        model=model,
        duration_ms=round(duration_ms, 1),
        tool_rounds=tool_rounds,
    )
    body.update(extra)
    if error:
        body["error"] = error[:500]
    short_err = error[:120] if error else ""
    await publish_observability_event(
        event_type="response_phase",
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        preview=_truncate_preview(f"phase={phase} model={model} err={short_err}"),
        payload=body,
    )
[docs]
async def publish_background_event(
    task_name: str,
    status: str = "running",
    task_id: str = "",
    progress: float | None = None,
    result: str = "",
    error: str = "",
    **extra: Any,
) -> str:
    """Record the state of a background task as a ``background_task`` event.

    Reports the task lifecycle status (e.g. running/done/failed), an optional
    progress fraction, and truncated result/error text for the dashboard's
    background-task view. ``result`` is clamped to 1000 characters and
    ``error`` to 500. Extra keyword arguments are merged into the payload.

    When no ``task_id`` is supplied, a short random hex id is generated. That
    id is returned, so the caller can pass it back as ``task_id`` on later
    updates and have them correlate with this one.

    The event is handed to :func:`publish_observability_event`, which writes
    the ``obs:{uuid}`` hash, indexes it under ``idx:observability`` and
    publishes it on ``stargazer:observability``. No in-repo callers were
    found; it is a public helper for background workers.

    Args:
        task_name: Human-readable name of the background task.
        status: Task status string (default ``running``).
        task_id: Stable id for the task; auto-generated when empty.
        progress: Optional completion fraction (0.0–1.0); omitted when ``None``.
        result: Optional result text (truncated when stored).
        error: Optional error text (truncated when stored).
        **extra: Additional key/value fields merged into the event payload.

    Returns:
        str: The task id used for this event (the caller's ``task_id`` or the
        generated one). Callers that ignore the return value are unaffected.
    """
    tid = task_id or uuid.uuid4().hex[:12]
    payload: dict[str, Any] = {
        "task_name": task_name,
        "status": status,
        "task_id": tid,
        **extra,
    }
    if progress is not None:
        payload["progress"] = progress
    if result:
        payload["result"] = result[:1000]
    if error:
        payload["error"] = error[:500]
    await publish_observability_event(
        event_type="background_task",
        preview=_truncate_preview(f"bg {task_name} {status} {tid}"),
        payload=payload,
    )
    return tid
[docs]
async def publish_classifier_event(
    *,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    platform: str = "",
    strategy: str = "",
    tools: list[str] | None = None,
    top_matches: list[dict[str, Any]] | None = None,
    phase: str = "user_message",
) -> None:
    """Emit a ``classifier_selection`` event describing a tool-routing decision.

    The payload records the classification phase, the strategy that fired,
    the selected tool names (first 256 kept) and the best-scoring matches
    (first 20 kept). The preview reports the full, uncapped number of
    selected tools. Together these let the dashboard audit how tools were
    chosen for a turn.

    The event is handed to :func:`publish_observability_event`, which
    persists it as ``obs:{uuid}``, indexes it under ``idx:observability`` and
    publishes it on ``stargazer:observability``. Per the module notes, the
    vector tool classifier (``classifiers/vector_classifier.py``) calls this.

    Args:
        request_id: Correlation id for the bot turn.
        channel_id: ID of the originating channel/conversation.
        user_id: ID of the user being classified for.
        platform: Originating platform.
        strategy: Name of the classification strategy/path taken.
        tools: Tool names selected by the classifier (capped at 256).
        top_matches: Per-tool scoring dicts for the best matches (capped at 20).
        phase: Classification phase label (default ``user_message``).

    Returns:
        None.
    """
    selected = tools or []
    matches = top_matches or []
    summary = f"classifier phase={phase} strategy={strategy} n_tools={len(selected)}"
    await publish_observability_event(
        event_type="classifier_selection",
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        preview=_truncate_preview(summary),
        payload=dict(
            phase=phase,
            strategy=strategy,
            tools=selected[:256],
            top_matches=matches[:20],
        ),
    )
[docs]
async def publish_embedding_event(
    *,
    source: str,
    model: str,
    duration_ms: float,
    success: bool,
    text_len: int = 0,
    batch_size: int = 0,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    platform: str = "",
    error: str = "",
) -> None:
    """Emit an ``embedding_api`` event for a single embedding-API call.

    The event records the calling source, the model, latency (rounded to
    0.1 ms), whether the call succeeded, the text length and the batch size.
    ``error`` is always present in the payload; it is ``""`` when none was
    given and is clamped to 500 characters otherwise. This gives the
    dashboard visibility into embedding throughput and failures.

    The event is handed to :func:`publish_observability_event`, which writes
    ``obs:{uuid}``, indexes it under ``idx:observability`` and publishes it on
    ``stargazer:observability``. Per the module notes, the OpenRouter
    transport (``openrouter_client/transport.py``) calls this for both single
    and batched requests.

    Args:
        source: Logical source/caller of the embedding request.
        model: Embedding model identifier.
        duration_ms: Wall-clock duration of the call in milliseconds.
        success: Whether the embedding call succeeded.
        text_len: Character length of the embedded text.
        batch_size: Number of items in the batch (0 for single).
        request_id: Correlation id for the bot turn, if any.
        channel_id: Originating channel id, if any.
        user_id: Originating user id, if any.
        platform: Originating platform, if any.
        error: Error message when the call failed (truncated when stored).

    Returns:
        None.
    """
    summary = (
        f"embed {source} model={model} ok={success} {duration_ms:.1f}ms "
        f"len={text_len} batch={batch_size}"
    )
    details: dict[str, Any] = dict(
        source=source,
        model=model,
        duration_ms=round(duration_ms, 1),
        success=success,
        text_len=text_len,
        batch_size=batch_size,
        error=(error or "")[:500],
    )
    await publish_observability_event(
        event_type="embedding_api",
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        preview=_truncate_preview(summary),
        payload=details,
    )
[docs]
async def publish_message_observability_event(
    *,
    kind: str,
    platform: str,
    channel_id: str,
    user_id: str,
    text_preview: str,
    request_id: str = "",
    message_id: str = "",
) -> None:
    """Emit a chat-timeline event for an inbound or outbound message.

    A ``kind`` of exactly ``user_in`` produces a ``message_in`` event; every
    other value produces ``message_out``. The message text becomes the
    (truncated) preview. ``kind`` and the platform ``message_id`` are stored
    in the payload.

    The event is handed to :func:`publish_observability_event`, which persists
    it as ``obs:{uuid}``, indexes it under ``idx:observability`` and publishes
    it on ``stargazer:observability``. Per the module notes, the message
    processor calls this on ingest and after sending a reply.

    Args:
        kind: ``user_in`` for inbound; anything else is treated as outbound.
        platform: Originating platform (e.g. ``discord``).
        channel_id: ID of the channel/conversation.
        user_id: ID of the user.
        text_preview: Message text (truncated when stored).
        request_id: Correlation id for the bot turn, if any.
        message_id: Platform message id, stored in the payload.

    Returns:
        None.
    """
    if kind == "user_in":
        event_type = "message_in"
    else:
        event_type = "message_out"
    await publish_observability_event(
        event_type=event_type,
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        preview=_truncate_preview(text_preview),
        payload=dict(kind=kind, message_id=message_id),
    )
[docs]
async def publish_http_error_event(
    *,
    http_service: str,
    http_status: int = 0,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    platform: str = "",
    duration_ms: float = 0,
    endpoint: str = "",
    detail: str = "",
    error_kind: str = "",
) -> None:
    """Record an HTTP/API failure as an ``http_error`` observability event.

    Captures for every outbound HTTP error:

    * the failing service;
    * the HTTP status (``0`` for transport-level failures, and also when the
      status is ``None``/empty);
    * the endpoint, clamped to 500 characters in the payload and 80 in the
      preview;
    * latency until failure;
    * a detail string, clamped to 800 characters in the payload and 120 in
      the preview;
    * an optional error-kind category.

    ``endpoint`` and ``detail`` tolerate ``None``: both the payload and the
    preview treat it as ``""``. These events are selected by
    ``HTTP_ERROR_SEARCH_QUERY`` and feed :func:`count_http_errors_since` and
    :func:`aggregate_http_errors_by_status`.

    The event is handed to :func:`publish_observability_event`, which writes
    ``obs:{uuid}`` with ``http_status``/``http_service`` indexed and publishes
    it on ``stargazer:observability``.

    Args:
        http_service: Name of the service/API that failed.
        http_status: HTTP status code, or ``0`` for transport errors.
        request_id: Correlation id for the bot turn, if any.
        channel_id: Originating channel id, if any.
        user_id: Originating user id, if any.
        platform: Originating platform, if any.
        duration_ms: Latency until failure in milliseconds.
        endpoint: The URL/endpoint that was called (truncated when stored).
        detail: Free-form error detail (truncated when stored).
        error_kind: Optional error category; added to payload when set.

    Returns:
        None.
    """
    # Normalise once so the payload and the preview agree. The preview used
    # to slice the raw values and crashed on None that the payload accepted.
    endpoint_text = endpoint or ""
    detail_text = detail or ""
    status_code = int(http_status) if http_status else 0
    pl: dict[str, Any] = {
        "duration_ms": round(duration_ms, 1),
        "endpoint": endpoint_text[:500],
        "detail": detail_text[:800],
    }
    if error_kind:
        pl["error_kind"] = error_kind
    await publish_observability_event(
        event_type=HTTP_ERROR_EVENT_TYPE,
        platform=platform,
        channel_id=channel_id,
        user_id=user_id,
        request_id=request_id,
        http_status=status_code,
        http_service=http_service,
        preview=_truncate_preview(
            f"{http_service} status={status_code} {endpoint_text[:80]} {detail_text[:120]}",
        ),
        payload=pl,
    )
# Expiry of each ``llm_req:*`` record, in seconds (30 minutes). Kept short
# because every record holds a full request/response and is for debugging.
LLM_REQUEST_TTL = 1800
# Key prefix of the per-call LLM request hashes (``llm_req:{uuid}``).
LLM_REQUEST_KEY_PREFIX = "llm_req:"
[docs]
async def publish_llm_request_event(
    *,
    request_id: str = "",
    channel_id: str = "",
    user_id: str = "",
    platform: str = "",
    model: str = "",
    messages: list[dict] | None = None,
    tool_names: list[str] | None = None,
    response_text: str = "",
    error: str = "",
    http_status: int = 0,
    duration_ms: float = 0.0,
    timestamp: float | None = None,
) -> str | None:
    """Store one full LLM request/response record for forensic debugging.

    The record is written as a hash under ``llm_req:{uuid}``
    (:data:`LLM_REQUEST_KEY_PREFIX`). It holds the complete ``messages`` list,
    the offered tool names, the response text, the error, the HTTP status and
    the latency. Its expiry is the short :data:`LLM_REQUEST_TTL`.

    Records stay out of ``idx:observability`` because they are large. Empty
    identifier fields are stored as ``"-"``. The message list is JSON-encoded
    defensively: if encoding fails, ``"[]"`` is stored instead. The HSET and
    EXPIRE go through one Redis pipeline. :func:`fetch_llm_requests` reads
    these records back.

    Args:
        request_id (str): Per-turn correlation id.
        channel_id (str): Originating channel id.
        user_id (str): Originating user id.
        platform (str): Originating platform.
        model (str): Model identifier used for the call.
        messages (list[dict] | None): The full request message list.
        tool_names (list[str] | None): Names of tools offered to the model.
        response_text (str): The model's response text.
        error (str): Error message if the call failed.
        http_status (int): HTTP status of the call.
        duration_ms (float): Wall-clock latency in milliseconds.
        timestamp (float | None): Event time; defaults to now when ``None``.

    Returns:
        str | None: The ``llm_req:{uuid}`` key on success, or ``None`` when no
        Redis client is attached or the pipeline failed (logged at debug).
    """
    if _redis is None:
        return None
    when = float(timestamp) if timestamp is not None else time.time()
    doc_key = LLM_REQUEST_KEY_PREFIX + uuid.uuid4().hex
    msgs = messages or []
    try:
        messages_blob = json.dumps(msgs, default=str)
    except Exception:
        # An unencodable message list must not cost us the rest of the record.
        messages_blob = "[]"
    fields: dict[str, str] = {
        "request_id": request_id or "-",
        "channel_id": channel_id or "-",
        "user_id": user_id or "-",
        "platform": platform or "-",
        "model": model or "-",
        "timestamp": str(when),
        "tool_names": json.dumps(tool_names or [], default=str),
        "messages_json": messages_blob,
        "msg_count": str(len(msgs)),
        "response_text": response_text or "",
        "error": error or "",
        "http_status": str(int(http_status)),
        "duration_ms": str(round(duration_ms, 1)),
    }
    try:
        pipe = _redis.pipeline()
        pipe.hset(doc_key, mapping=fields)
        pipe.expire(doc_key, LLM_REQUEST_TTL)
        await pipe.execute()
    except Exception:
        logger.debug("publish_llm_request_event failed", exc_info=True)
        return None
    return doc_key
[docs]
def _escape_tag_value(value: str) -> str:
    """Backslash-escape every character of *value* except letters, digits and ``_``.

    RediSearch TAG queries (``@field:{...}``) treat punctuation and spaces as
    syntax. A value containing, for example, ``:`` or ``@`` otherwise
    produces a malformed query. Escaping everything that is not a word
    character is the conservative choice.
    """
    return "".join(ch if ch.isalnum() or ch == "_" else "\\" + ch for ch in value)


def _llm_request_row(doc_id: Any, r: dict[str, str]) -> dict[str, Any]:
    """Shape one decoded ``llm_req:*`` hash into the row dict served to the UI.

    A ``bytes`` *doc_id* is decoded to str. An unparsable ``timestamp``
    becomes ``0.0``. Corrupt ``msg_count``/``http_status``/``duration_ms``
    numbers or ``tool_names`` JSON raise, and callers skip such records.
    """
    if isinstance(doc_id, bytes):
        doc_id = doc_id.decode("utf-8", errors="replace")
    try:
        ts = float(r.get("timestamp", 0) or 0)
    except (TypeError, ValueError):
        ts = 0.0
    return {
        "doc_id": doc_id,
        "request_id": r.get("request_id", ""),
        "channel_id": r.get("channel_id", ""),
        "user_id": r.get("user_id", ""),
        "platform": r.get("platform", ""),
        "model": r.get("model", ""),
        "timestamp": ts,
        "msg_count": int(r.get("msg_count", 0) or 0),
        "tool_names": json.loads(r.get("tool_names", "[]") or "[]"),
        "messages_json": r.get("messages_json", "[]"),
        "response_text": r.get("response_text", ""),
        "error": r.get("error", ""),
        "http_status": int(r.get("http_status", 0) or 0),
        "duration_ms": float(r.get("duration_ms", 0) or 0),
    }


async def fetch_llm_requests(
    *,
    channel_id: str = "",
    model: str = "",
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[dict], int]:
    """Return paginated LLM request records (newest first) for the debug UI.

    This is the read side of :func:`publish_llm_request_event`.

    **FT.SEARCH path (preferred).** Runs a search against the ``idx:llm_req``
    index. The ``channel_id`` and ``model`` filter values are fully
    TAG-escaped, and results are sorted by ``timestamp`` descending. Each
    matching doc is then hydrated with a pipelined ``HGETALL``. ``doc_id`` is
    taken from the search hit's key; the stored hash has no ``doc_id`` field.

    **SCAN fallback.** Used when the FT path fails for any reason, for
    example when the index is not ready yet during warm-up. It performs a
    full ``SCAN`` of ``llm_req:*`` keys, then filters and sorts in process.

    Both paths use the same page bounds: *limit* is clamped to 1..500 and a
    negative *offset* is treated as 0. In both paths, a corrupt record is
    skipped instead of failing the whole page.

    Args:
        channel_id (str): Optional exact channel filter.
        model (str): Optional model filter. It is an exact tag match in FT
            mode and a case-insensitive substring match in the SCAN fallback.
        limit (int): Page size (clamped to 1..500).
        offset (int): Page offset (negative treated as 0).

    Returns:
        tuple[list[dict], int]: ``(rows, total_matching)`` where ``rows`` are
        the hydrated request records for this page.
    """
    if _redis is None:
        return [], 0
    page_size = max(1, min(limit, 500))
    page_start = max(0, offset)

    # --- Attempt FT.SEARCH path ---
    try:
        from redis.commands.search.query import Query as _FTQuery
        from init_redis_indexes import LLM_REQ_INDEX_NAME

        filter_parts: list[str] = []
        if channel_id:
            filter_parts.append(f"@channel_id:{{{_escape_tag_value(channel_id)}}}")
        if model:
            filter_parts.append(f"@model:{{{_escape_tag_value(model.lower())}}}")
        q = _FTQuery(" ".join(filter_parts) if filter_parts else "*")
        q.sort_by("timestamp", asc=False)
        q.paging(page_start, page_size)
        q.dialect(2)
        result = await _redis.ft(LLM_REQ_INDEX_NAME).search(q)
        total = int(getattr(result, "total", 0))
        if not result.docs:
            return [], total
        pipe = _redis.pipeline()
        for doc in result.docs:
            pipe.hgetall(doc.id)
        raw_results = await pipe.execute()
        rows: list[dict] = []
        for doc, raw in zip(result.docs, raw_results):
            if not raw:
                continue  # expired between the search and the HGETALL
            try:
                rows.append(_llm_request_row(doc.id, _hdecode(raw)))
            except Exception:
                continue
        return rows, total
    except Exception as _ft_exc:
        logger.debug(
            "fetch_llm_requests: FT.SEARCH unavailable (%s), falling back to SCAN",
            _ft_exc,
        )

    # --- Fallback: full SCAN + in-process filter (index not yet ready) ---
    keys: list[Any] = []
    cursor = 0
    while True:
        cursor, batch = await _redis.scan(
            cursor, match=f"{LLM_REQUEST_KEY_PREFIX}*", count=500
        )
        keys.extend(batch)
        if cursor == 0:
            break
    if not keys:
        return [], 0
    pipe = _redis.pipeline()
    for k in keys:
        pipe.hgetall(k)
    raw_results = await pipe.execute()
    model_needle = model.lower()
    rows = []
    for key, raw in zip(keys, raw_results):
        if not raw:
            continue
        r = _hdecode(raw)
        if channel_id and r.get("channel_id", "") != channel_id:
            continue
        if model_needle and model_needle not in r.get("model", "").lower():
            continue
        try:
            rows.append(_llm_request_row(key, r))
        except Exception:
            # One corrupt record must not break the whole listing.
            continue
    rows.sort(key=lambda row: row["timestamp"], reverse=True)
    return rows[page_start : page_start + page_size], len(rows)
def _truncate(obj: Any, max_len: int = 500) -> Any:
    """Convert *obj* to text and bound it to *max_len* characters.

    Unlike :func:`_truncate_preview`, this accepts any object (it is passed
    through ``str``) and returns ``None`` unchanged instead of ``""``. That
    makes it suitable for nullable payload fields. Over-long text is cut and
    suffixed with ``...``.

    Args:
        obj: Any value to stringify and bound, or ``None``.
        max_len: Maximum number of characters kept before ``...``.

    Returns:
        Any: ``None`` for ``None`` input; otherwise ``str(obj)``, possibly
        truncated with a trailing ``...``.
    """
    if obj is None:
        return None
    text = str(obj)
    if len(text) <= max_len:
        return text
    return f"{text[:max_len]}..."
def _hdecode(hdata: dict[Any, Any]) -> dict[str, str]:
    """Turn a raw ``HGETALL`` reply into a plain ``str`` -> ``str`` dict.

    Keys and values arrive as ``bytes`` when the client is not in
    ``decode_responses`` mode. Such bytes are decoded as UTF-8, with
    undecodable sequences replaced rather than raising. Anything else is
    passed through ``str``.

    Args:
        hdata: Raw hash mapping whose keys/values may be ``bytes`` or ``str``.

    Returns:
        dict[str, str]: The same mapping with every key and value as text.
    """

    def _text(item: Any) -> str:
        if isinstance(item, bytes):
            return item.decode("utf-8", errors="replace")
        return str(item)

    return {_text(k): _text(v) for k, v in hdata.items()}
[docs]
async def search_observability_events(
    query: str,
    *,
    limit: int = 50,
    offset: int = 0,
    sort_desc: bool = True,
) -> tuple[list[dict[str, Any]], int]:
    """Query the ``idx:observability`` index and hydrate matching event docs.

    Runs ``FT.SEARCH`` for *query* (DIALECT 2). Results are optionally sorted
    by ``timestamp`` descending and paged, with page size clamped to 1..500.
    Each hit is then rebuilt from a pipelined ``HGETALL``:

    * fields are decoded via :func:`_hdecode`;
    * ``payload_json`` is parsed into ``payload``;
    * ``timestamp``/``http_status`` are coerced to numbers.

    This reads back events written by :func:`publish_observability_event`.

    Failure is best-effort and never raises. All of these are logged at debug
    and yield ``([], 0)``:

    * the index-name import;
    * the search itself;
    * the hydration pipeline.

    A single doc that fails to hydrate is still returned with whatever fields
    were recovered.

    Args:
        query (str): A RediSearch query string against ``idx:observability``.
        limit (int): Page size (clamped to 1..500).
        offset (int): Page offset.
        sort_desc (bool): Sort by ``timestamp`` descending when ``True``.

    Returns:
        tuple[list[dict[str, Any]], int]: ``(documents, total_hits)``. Each
        document includes ``doc_id``, the indexed fields and a parsed
        ``payload``. The result is empty when Redis is unavailable or the
        search/hydration fails.
    """
    if _redis is None:
        return [], 0
    try:
        from init_redis_indexes import OBS_INDEX_NAME
        from redis.commands.search.query import Query

        q = Query(query)
        if sort_desc:
            q.sort_by("timestamp", asc=False)
        q.paging(offset, max(1, min(limit, 500)))
        q.dialect(2)
        result = await _redis.ft(OBS_INDEX_NAME).search(q)
    except Exception:
        logger.debug("observability FT.SEARCH failed", exc_info=True)
        return [], 0
    out: list[dict[str, Any]] = []
    if not result.docs:
        return out, int(getattr(result, "total", 0))
    # The hydration round-trip is guarded too: a connection error here must
    # honour the same "empty result, never raise" contract as the search.
    try:
        pipe = _redis.pipeline()
        for doc in result.docs:
            pipe.hgetall(doc.id)
        raw_results = await pipe.execute()
    except Exception:
        logger.debug("observability doc hydrate pipeline failed", exc_info=True)
        return [], 0
    for doc, raw_h in zip(result.docs, raw_results):
        row: dict[str, Any] = {"doc_id": doc.id}
        try:
            hdata = _hdecode(raw_h) if raw_h else {}
            pj = hdata.get("payload_json", "{}")
            try:
                row["payload"] = json.loads(pj)
            except json.JSONDecodeError:
                row["payload"] = {}
            for k in (
                "event_type",
                "platform",
                "channel_id",
                "user_id",
                "tool_name",
                "request_id",
                "preview",
                "http_service",
            ):
                if k in hdata:
                    row[k] = hdata[k]
            if "timestamp" in hdata:
                row["timestamp"] = float(hdata["timestamp"])
            if "http_status" in hdata:
                row["http_status"] = int(hdata["http_status"])
        except Exception:
            logger.debug("observability doc hydrate failed", exc_info=True)
        out.append(row)
    total = int(getattr(result, "total", len(out)))
    return out, total
[docs]
async def count_http_errors_since(since_ts: float) -> int:
    """Return how many ``http_error`` events were recorded at or after *since_ts*.

    Sends a zero-row ``FT.SEARCH`` (``LIMIT 0 0``) to ``idx:observability``
    that combines the :data:`HTTP_ERROR_SEARCH_QUERY` tag filter with an
    inclusive ``@timestamp:[since_ts +inf]`` numeric range. Only the hit total
    is used, which keeps this cheap enough for a dashboard tally. Called by
    the observability routes (``web/observability_routes.py``).

    Args:
        since_ts (float): Inclusive lower-bound epoch timestamp.

    Returns:
        int: Number of matching ``http_error`` events; ``0`` when no Redis
        client is attached or the search fails (logged at debug level).
    """
    if _redis is None:
        return 0
    from init_redis_indexes import OBS_INDEX_NAME
    from redis.commands.search.query import Query

    try:
        window_query = f"{HTTP_ERROR_SEARCH_QUERY} @timestamp:[{since_ts} +inf]"
        # paging(0, 0) asks for no documents: only the total is returned.
        count_only = Query(window_query).paging(0, 0)
        hits = await _redis.ft(OBS_INDEX_NAME).search(count_only)
        return int(getattr(hits, "total", 0))
    except Exception:
        logger.debug("count_http_errors_since failed", exc_info=True)
        return 0
[docs]
async def aggregate_http_errors_by_status(since_ts: float) -> list[dict[str, Any]]:
"""Break recent HTTP errors down into per-status-code counts.
Runs an ``FT.AGGREGATE`` over ``idx:observability`` filtered by
:data:`HTTP_ERROR_SEARCH_QUERY` and a ``@timestamp:[since_ts +inf]`` range,
grouping by ``@http_status`` with a ``COUNT`` reducer, then defensively
decodes the aggregation rows (which may arrive as flat key/value lists or
dicts, with bytes or str) into clean integer pairs sorted by status code.
This feeds the dashboard's error-by-status breakdown alongside
:func:`count_http_errors_since`. Called by the observability routes
(``web/observability_routes.py``). Returns an empty list when Redis is
unavailable or the aggregation fails.
Args:
since_ts (float): Inclusive lower-bound epoch timestamp.
Returns:
list[dict[str, Any]]: One ``{"http_status": int, "count": int}`` entry
per observed status code, ascending by status.
"""
if _redis is None:
return []
from init_redis_indexes import OBS_INDEX_NAME
from redis.commands.search.aggregation import AggregateRequest
from redis.commands.search import reducers
try:
req = AggregateRequest(
f"{HTTP_ERROR_SEARCH_QUERY} @timestamp:[{since_ts} +inf]"
).group_by("@http_status", reducers.count().alias("count"))
result = await _redis.ft(OBS_INDEX_NAME).aggregate(req)
rows: list[dict[str, Any]] = []
for row in result.rows:
# Each row is a list of key-value pairs like [b'http_status', b'404', b'count', b'3']
d: dict[str, str] = {}
if isinstance(row, (list, tuple)):
it = iter(row)
for k in it:
v = next(it, None)
ks = k.decode() if isinstance(k, bytes) else str(k)
vs = (
v.decode()
if isinstance(v, bytes)
else str(v) if v is not None else "0"
)
d[ks] = vs
elif isinstance(row, dict):
d = {
(k.decode() if isinstance(k, bytes) else str(k)): (
v.decode() if isinstance(v, bytes) else str(v)
)
for k, v in row.items()
}
try:
rows.append(
{
"http_status": int(d.get("http_status", 0)),
"count": int(d.get("count", 0)),
}
)
except (ValueError, TypeError):
continue
rows.sort(key=lambda r: r["http_status"])
return rows
except Exception:
logger.debug("aggregate_http_errors_by_status failed", exc_info=True)
return []
[docs]
async def get_recent_events(
    category: str = "tools",
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Fetch the most recent events for a coarse category (compat shim).

    A backward-compatible convenience that maps a category name (``tools``,
    ``responses`` or ``background``) to the corresponding ``@event_type`` tag
    query, or to ``*`` for anything else. It then delegates to
    :func:`search_observability_events`, so data comes from RediSearch rather
    than the legacy sorted-set storage that predated it. The descending sort
    is requested explicitly so the "newest first" contract does not depend on
    the delegate's default. No internal callers remain in this repository; it
    is retained for older dashboard/API consumers.

    Args:
        category (str): One of ``tools``, ``responses``, ``background``; any
            other value matches all event types.
        limit (int): Maximum number of events to return.

    Returns:
        list[dict[str, Any]]: The matching event documents, newest first.
    """
    qmap = {
        "tools": "@event_type:{tool_call}",
        "responses": "@event_type:{response_phase}",
        "background": "@event_type:{background_task}",
    }
    q = qmap.get(category, "*")
    docs, _ = await search_observability_events(
        q, limit=limit, offset=0, sort_desc=True
    )
    return docs
# ---------------------------------------------------------------------------
# obs_debug store — 1-hour forensic event bus
# ---------------------------------------------------------------------------
# Expiry applied to every ``obs_debug:{uid}`` hash, in seconds (one hour).
OBS_DEBUG_TTL = 60 * 60
# Pub/sub channel carrying the live debug stream (llm_output is omitted there).
OBS_DEBUG_PUBSUB_CHANNEL = "stargazer:obs_debug"
[docs]
async def publish_debug_event(
event_type: str,
subsystem: str,
*,
platform: str = "",
channel_id: str = "",
hub_id: str = "",
user_id: str = "",
request_id: str = "",
run_id: str = "",
phase: str = "",
status: str = "ok",
preview: str = "",
llm_output: str = "",
duration_ms: float = 0.0,
token_est: int = 0,
payload: dict[str, Any] | None = None,
) -> None:
"""Persist and broadcast one short-lived forensic debug event.
The debug-tier analogue of :func:`publish_observability_event`, backing the
1-hour forensic event bus. It builds a hash matching the ``idx:obs_debug``
schema (every ``TAG`` field via :func:`_tag_val`, ``preview`` clamped by
:func:`_truncate_preview`), writes it to ``obs_debug:{uid}`` with a
:data:`OBS_DEBUG_TTL` expiry, and publishes the wire form on
:data:`OBS_DEBUG_PUBSUB_CHANNEL` (``stargazer:obs_debug``) via a single Redis
pipeline. The large ``llm_output`` is stored in the hash for forensic
retrieval but intentionally kept out of the indexed and pub/sub fields to
avoid index bloat and wasted bandwidth. Read back by
:func:`search_debug_events`. Called broadly across the codebase
(``anamnesis_engine.py``, ``task_manager.py``, ``proactive_triage.py``,
``background_tasks.py``) and internally by :class:`Observability` and
:class:`Timer`; failures are logged at debug and never raised.
Args:
event_type (str): Debug event type tag.
subsystem (str): Subsystem that produced the event.
platform (str): Originating platform, if any.
channel_id (str): Originating channel id, if any.
hub_id (str): Originating hub id, if any.
user_id (str): Originating user id, if any.
request_id (str): Per-turn correlation id, if any.
run_id (str): Run/job correlation id, if any.
phase (str): Lifecycle phase label, if any.
status (str): Outcome status (default ``ok``).
preview (str): Human-readable preview (truncated when stored).
llm_output (str): Optional large LLM output, stored but not indexed.
duration_ms (float): Elapsed time in milliseconds.
token_est (int): Estimated token count, if relevant.
payload (dict[str, Any] | None): Structured payload stored as JSON.
Returns:
None.
"""
if _redis is None:
return None
ts = time.time()
uid = uuid.uuid4().hex
key = f"obs_debug:{uid}"
# Fields that ARE indexed by idx:obs_debug (all must match the schema)
mapping: dict[str, Any] = {
"event_type": _tag_val(event_type),
"subsystem": _tag_val(subsystem),
"platform": _tag_val(platform),
"channel_id": _tag_val(channel_id),
"hub_id": _tag_val(hub_id),
"user_id": _tag_val(user_id),
"request_id": _tag_val(request_id),
"run_id": _tag_val(run_id),
"phase": _tag_val(phase),
"status": _tag_val(status),
"preview": _truncate_preview(preview, 2000),
"timestamp": str(ts),
"duration_ms": str(round(duration_ms, 3)),
"token_est": str(int(token_est)),
# Stored-only (not FT-indexed):
"llm_output": llm_output or "",
"payload_json": json.dumps(payload or {}, default=str),
}
# Wire payload for pub/sub (omit llm_output from live stream to save bandwidth)
wire: dict[str, Any] = {
"event_type": event_type,
"subsystem": subsystem,
"platform": platform,
"channel_id": channel_id,
"hub_id": hub_id,
"user_id": user_id,
"request_id": request_id,
"run_id": run_id,
"phase": phase,
"status": status,
"preview": mapping["preview"],
"timestamp": ts,
"duration_ms": round(duration_ms, 3),
"token_est": int(token_est),
"payload": payload or {},
"doc_id": key,
}
try:
pipe = _redis.pipeline()
pipe.hset(key, mapping=mapping)
pipe.expire(key, OBS_DEBUG_TTL)
pipe.publish(OBS_DEBUG_PUBSUB_CHANNEL, json.dumps(wire, default=str))
await pipe.execute()
except Exception:
logger.debug("publish_debug_event failed", exc_info=True)
return None
[docs]
async def search_debug_events(
    query: str,
    *,
    limit: int = 50,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Run a query against ``idx:obs_debug`` and return fully hydrated rows.

    This is the read counterpart of :func:`publish_debug_event`. It works in
    two steps:

    1. An ``FT.SEARCH`` locates the page of hits, newest ``timestamp`` first,
       with the page size clamped to 1..500.
    2. One pipelined ``HGETALL`` per hit loads the complete hash.

    Each row therefore also contains the stored-only ``llm_output`` and the
    decoded ``payload``. Hashes are decoded with :func:`_hdecode`, and the
    ``timestamp``/``duration_ms``/``token_est`` fields are converted to
    numbers. The observability web layer (``web/obs.py``) uses this for its
    history, per-request and per-run forensic views.

    Args:
        query (str): RediSearch query string for ``idx:obs_debug``.
        limit (int): Page size (clamped to 1..500).
        offset (int): Page offset.

    Returns:
        list[dict[str, Any]]: Hydrated debug events, newest first. Empty when
        no Redis client is attached, ``init_redis_indexes`` cannot be imported
        yet, or the search itself fails.
    """
    if _redis is None:
        return []
    try:
        from init_redis_indexes import OBS_DEBUG_INDEX_NAME  # type: ignore[import]
        from redis.commands.search.query import Query
    except ImportError:
        logger.debug("search_debug_events: init_redis_indexes not ready")
        return []

    try:
        newest_first = (
            Query(query)
            .sort_by("timestamp", asc=False)
            .paging(offset, max(1, min(limit, 500)))
            .dialect(2)
        )
        found = await _redis.ft(OBS_DEBUG_INDEX_NAME).search(newest_first)
    except Exception:
        logger.debug("search_debug_events FT.SEARCH failed", exc_info=True)
        return []

    hits = found.docs
    if not hits:
        return []

    # One round trip fetches every full hash, including llm_output and
    # payload_json which the index does not return.
    pipe = _redis.pipeline()
    for hit in hits:
        pipe.hgetall(hit.id)
    try:
        hashes = await pipe.execute()
    except Exception:
        logger.debug("search_debug_events pipeline execute failed", exc_info=True)
        hashes = [{}] * len(hits)

    numeric_casts = (("timestamp", float), ("duration_ms", float), ("token_est", int))
    rows: list[dict[str, Any]] = []
    for hit, raw_hash in zip(hits, hashes):
        entry: dict[str, Any] = {"doc_id": hit.id}
        try:
            fields = _hdecode(raw_hash) if raw_hash else {}
            payload_text = fields.pop("payload_json", "{}")
            try:
                entry["payload"] = json.loads(payload_text)
            except Exception:
                entry["payload"] = {}
            entry.update(fields)
            for field_name, cast in numeric_casts:
                if field_name in entry:
                    entry[field_name] = cast(entry[field_name])
        except Exception:
            logger.debug("search_debug_events doc hydrate failed", exc_info=True)
        rows.append(entry)
    return rows
# ---------------------------------------------------------------------------
# proxy_tel store — 7-day durable telemetry for gemini-cli-proxy events
# ---------------------------------------------------------------------------
# Lifetime of each ``proxy_tel:{uuid}`` hash: seven days, in seconds.
PROXY_TELEMETRY_TTL = 7 * 24 * 60 * 60
# Key prefix for proxy telemetry hashes; also the stem of the SCAN match pattern.
PROXY_TELEMETRY_KEY_PREFIX = "proxy_tel:"
[docs]
async def publish_proxy_telemetry_event(event: dict[str, Any]) -> str | None:
"""Persist one gemini-cli-proxy telemetry event for durable inspection.
Ingests a raw telemetry object emitted by the proxy's ``telemetry.ts`` and
stores it as a hash under ``proxy_tel:{uuid}``
(:data:`PROXY_TELEMETRY_KEY_PREFIX`) with a 7-day
:data:`PROXY_TELEMETRY_TTL` expiry via a Redis pipeline. It normalises a few
fields (event type, request id, model, account, HTTP status) for cheap
in-process filtering and tolerantly parses the timestamp from a numeric or
ISO-8601 ``ts``/``timestamp`` value (falling back to now), while preserving
the entire original object in ``payload_json`` for forensic retrieval. Unlike
the main observability events, these are not RediSearch-indexed and are read
back by scanning in :func:`fetch_proxy_telemetry_events`. Called by the
observability web layer (``web/obs.py``) as proxy telemetry is received.
Args:
event (dict[str, Any]): The raw telemetry object from the proxy.
Returns:
str | None: The ``proxy_tel:{uuid}`` key on success, or ``None`` when
Redis is unavailable or the write failed.
"""
if _redis is None:
return None
raw_ts = event.get("ts") or event.get("timestamp")
try:
ts = float(raw_ts)
except (TypeError, ValueError):
if isinstance(raw_ts, str) and "T" in raw_ts:
from datetime import datetime
try:
ts = datetime.fromisoformat(raw_ts.replace("Z", "+00:00")).timestamp()
except Exception:
ts = time.time()
else:
ts = time.time()
key = f"{PROXY_TELEMETRY_KEY_PREFIX}{uuid.uuid4().hex}"
mapping: dict[str, str] = {
"event_type": str(event.get("type") or event.get("event_type") or "-"),
"req_id": str(event.get("req_id") or "-"),
"model": str(event.get("model") or "-"),
"account": str(event.get("account") or "-"),
"http_status": str(int(event.get("http_status") or event.get("status") or 0)),
"timestamp": str(ts),
"payload_json": json.dumps(event, default=str),
}
try:
pipe = _redis.pipeline()
pipe.hset(key, mapping=mapping)
pipe.expire(key, PROXY_TELEMETRY_TTL)
await pipe.execute()
except Exception:
logger.debug("publish_proxy_telemetry_event failed", exc_info=True)
return None
return key
[docs]
async def fetch_proxy_telemetry_events(
*,
event_type: str = "",
model: str = "",
req_id: str = "",
limit: int = 200,
offset: int = 0,
) -> tuple[list[dict[str, Any]], int]:
"""Return paginated gemini-cli-proxy telemetry rows, newest first.
The read side of :func:`publish_proxy_telemetry_event`. Because these records
are not RediSearch-indexed, it performs a full ``SCAN`` of
``proxy_tel:*`` keys, pipelines an ``HGETALL`` for each, applies optional
in-process filters, sorts by ``timestamp`` descending and pages the result.
Each row exposes the normalised columns plus the parsed full ``payload``.
Called by the observability web layer (``web/obs.py``) to render the proxy
telemetry view. Returns empty when Redis is unavailable or no keys exist.
Args:
event_type (str): Optional exact event-type filter.
model (str): Optional case-insensitive substring model filter.
req_id (str): Optional exact request-id filter.
limit (int): Page size.
offset (int): Page offset.
Returns:
tuple[list[dict[str, Any]], int]: ``(rows, total_matching)`` for the
requested page after filtering and sorting.
"""
if _redis is None:
return [], 0
keys: list[str] = []
cursor = 0
while True:
cursor, batch = await _redis.scan(
cursor, match=f"{PROXY_TELEMETRY_KEY_PREFIX}*", count=500
)
keys.extend(batch)
if cursor == 0:
break
if not keys:
return [], 0
pipe = _redis.pipeline()
for k in keys:
pipe.hgetall(k)
raw_results = await pipe.execute()
rows: list[dict[str, Any]] = []
for key, raw in zip(keys, raw_results):
if not raw:
continue
try:
r: dict[str, str] = {
(k.decode("utf-8") if isinstance(k, bytes) else k): (
v.decode("utf-8") if isinstance(v, bytes) else v
)
for k, v in raw.items()
}
except Exception:
continue
# Apply optional filters
if event_type and r.get("event_type", "-") != event_type:
continue
if model and model.lower() not in r.get("model", "").lower():
continue
if req_id and r.get("req_id", "-") != req_id:
continue
try:
payload = json.loads(r.get("payload_json", "{}") or "{}")
except Exception:
payload = {}
rows.append(
{
"doc_id": key,
"event_type": r.get("event_type", ""),
"req_id": r.get("req_id", ""),
"model": r.get("model", ""),
"account": r.get("account", ""),
"http_status": int(r.get("http_status", 0) or 0),
"timestamp": float(r.get("timestamp", 0) or 0),
"payload": payload,
}
)
rows.sort(key=lambda x: x["timestamp"], reverse=True)
total = len(rows)
return rows[offset : offset + limit], total
import asyncio
import contextlib
[docs]
class Observability:
    """Centralized, ergonomic facade for emitting metrics, timers and alerts.

    A thin synchronous-friendly client that callers reach through the
    module-level :data:`observability` singleton. Its methods log immediately.
    They then best-effort fan the event into the forensic ``obs_debug`` store
    by scheduling :func:`publish_debug_event` as a fire-and-forget task on the
    running event loop. This is silently skipped when no loop is running, so
    call sites never have to ``await`` telemetry.

    It exposes:

    - :meth:`increment` for counters;
    - :meth:`timer` for timing via the :class:`Timer` context
      manager/decorator;
    - :meth:`alert` for critical alerts.

    Scheduled tasks are kept in ``_background_tasks`` until they finish. The
    event loop holds only weak references to tasks, so an unreferenced task
    could otherwise be garbage-collected before it runs.

    Used pervasively across the codebase (e.g. ``prompt_renderer.py``,
    ``prompt_context.py``, ``background_tasks.py``,
    ``message_processor/processor.py``, ``limbic_system/coordinator.py``).
    """

    # Strong references to in-flight fire-and-forget publish tasks; each task
    # removes itself through _on_task_done when it completes.
    _background_tasks: set[asyncio.Task] = set()

    @classmethod
    def _spawn_debug_event(cls, **fields: Any) -> None:
        """Schedule ``publish_debug_event(**fields)`` without awaiting it.

        Does nothing when no Redis client is attached, or when called outside
        a running event loop (e.g. synchronous startup code). In the latter
        case the coroutine is never created, so no "never awaited" warning is
        emitted.
        """
        if not _redis:
            return
        try:
            loop = asyncio.get_running_loop()
            task = loop.create_task(publish_debug_event(**fields))
        except RuntimeError:
            return
        cls._background_tasks.add(task)
        task.add_done_callback(cls._on_task_done)

    @classmethod
    def _on_task_done(cls, task: asyncio.Task) -> None:
        """Release a finished task and log (at debug) any exception it raised."""
        cls._background_tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logger.debug(
                "observability background publish failed",
                exc_info=task.exception(),
            )

    def increment(self, metric_name: str, tags: dict[str, str] | None = None) -> None:
        """Increment a named counter metric, logging and best-effort persisting it.

        Emits a debug log line. When a Redis client is attached and an event
        loop is running, it also schedules a fire-and-forget
        ``metric_increment`` debug event, so the counter shows up in the
        forensic ``obs_debug`` store. If no loop is running (e.g. sync startup
        code) persistence is silently skipped.

        The scheduled :func:`publish_debug_event` writes ``obs_debug:{uid}``
        and publishes on ``stargazer:obs_debug``. Called widely through the
        module-level ``observability`` singleton, e.g. ``prompt_renderer.py``,
        ``prompt_context.py``, ``background_tasks.py`` and
        ``kg_consolidation.py``.

        Args:
            metric_name: Name of the counter metric to increment.
            tags: Optional dimension tags attached to the metric event.

        Returns:
            None.
        """
        logger.debug("[METRIC] Increment: %s tags=%s", metric_name, tags)
        self._spawn_debug_event(
            event_type="metric_increment",
            subsystem="observability",
            payload={"metric": metric_name, "tags": tags},
        )

    def timer(self, metric_name: str, subsystem: str = "observability") -> Timer:
        """Create a :class:`Timer` for *metric_name*, usable as a context manager or decorator.

        The returned :class:`Timer` measures elapsed wall-clock time. On exit
        it best-effort publishes a ``timer`` debug event tagged with
        *subsystem*. No I/O happens here.

        Called via the ``observability`` singleton in two forms:

        - ``with observability.timer(...)``, e.g. ``anamnesis_engine.py``,
          ``parallax_engine.py``, ``message_processor/processor.py``;
        - as a decorator ``@observability.timer(...)``, e.g.
          ``limbic_system/coordinator.py``, ``message_processor/processor.py``.

        Args:
            metric_name: Name of the timing metric to record.
            subsystem: Subsystem label stored on the emitted timer event.

        Returns:
            Timer: A new timer bound to *metric_name* and *subsystem*.
        """
        return Timer(metric_name, subsystem)

    def alert(self, severity: str, message: str, metadata: dict | None = None) -> None:
        """Raise a critical alert, logging a warning and best-effort persisting it.

        Logs the alert at WARNING level. When a Redis client is attached and
        an event loop is running, it also schedules a fire-and-forget
        ``alert`` debug event into the forensic ``obs_debug`` store, with:

        - ``status`` set to the lowercased severity;
        - a 200-character message preview.

        Persistence is skipped silently when no event loop is running.
        *severity* and *message* are coerced with ``str()`` first, so passing
        e.g. an exception object as the message cannot make the alert path
        itself raise.

        Called through the ``observability`` singleton from validation/guard
        paths such as ``config.py``, ``prompt_context.py``,
        ``wallet_key_utils.py`` and ``message_processor/processor.py``.

        Args:
            severity: Alert severity label (also used, lowercased, as the event
                ``status``).
            message: Human-readable alert message (preview truncated to 200
                chars when stored).
            metadata: Optional structured context attached to the event payload.

        Returns:
            None.
        """
        logger.warning(
            "[ALERT] Severity: %s - Message: %s metadata=%s",
            severity,
            message,
            metadata,
        )
        self._spawn_debug_event(
            event_type="alert",
            subsystem="observability",
            status=str(severity).lower(),
            preview=str(message)[:200],
            payload=metadata,
        )
[docs]
class Timer:
    """A dual-purpose wall-clock timer usable as a context manager or decorator.

    Measures elapsed time with ``time.perf_counter``. When the timed region
    finishes, :meth:`_record` logs the duration and best-effort emits a
    ``timer`` debug event via :func:`publish_debug_event`, scheduled
    fire-and-forget on the running loop.

    The same instance works in two forms:

    - ``with observability.timer(...):`` (see :meth:`__enter__` /
      :meth:`__exit__`);
    - ``@observability.timer(...)`` (see :meth:`__call__`, which detects sync
      vs. async callables).

    It is constructed by :meth:`Observability.timer`. Scheduled tasks are kept
    in ``_background_tasks`` until they finish, because the event loop holds
    only weak references to tasks.

    Used across hot paths such as ``anamnesis_engine.py``,
    ``parallax_engine.py``, ``message_processor/processor.py`` and
    ``limbic_system/coordinator.py``.
    """

    # Strong references to in-flight fire-and-forget publish tasks; each task
    # removes itself through _on_task_done when it completes.
    _background_tasks: set[asyncio.Task] = set()

    def __init__(self, metric_name: str, subsystem: str = "observability"):
        """Initialise the timer with a metric name and subsystem label.

        Stores configuration only. Timing does not begin until the timer is
        entered as a context manager or the wrapped function is invoked.
        Constructed by :meth:`Observability.timer`.

        Args:
            metric_name: Name of the timing metric this timer records.
            subsystem: Subsystem label stored on emitted timer events.
        """
        self.metric_name = metric_name
        self.subsystem = subsystem
        self.start_time: float | None = None

    @classmethod
    def _on_task_done(cls, task: asyncio.Task) -> None:
        """Release a finished task and log (at debug) any exception it raised."""
        cls._background_tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logger.debug(
                "timer background publish failed",
                exc_info=task.exception(),
            )

    def _record(self, duration_ms: float) -> None:
        """Log *duration_ms* and best-effort schedule a ``timer`` debug event.

        Scheduling only happens when a Redis client is attached and an event
        loop is running. Otherwise only the debug log line is emitted, and the
        publish coroutine is never created.
        """
        logger.debug("[TIMER] %s took %.2f ms", self.metric_name, duration_ms)
        if not _redis:
            return
        try:
            loop = asyncio.get_running_loop()
            task = loop.create_task(
                publish_debug_event(
                    event_type="timer",
                    subsystem=self.subsystem,
                    duration_ms=duration_ms,
                    preview=f"[TIMER] {self.metric_name} took {duration_ms:.1f}ms",
                    payload={"metric": self.metric_name},
                )
            )
        except RuntimeError:
            return
        self._background_tasks.add(task)
        task.add_done_callback(self._on_task_done)

    def __enter__(self) -> Timer:
        """Start the high-resolution clock and return self for ``with`` blocks.

        Records ``time.perf_counter()`` into ``self.start_time``. Invoked
        implicitly by the Python runtime when the timer is used as
        ``with observability.timer(...):``.

        Returns:
            Timer: This timer instance.
        """
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Compute the elapsed duration on context exit and emit a timer event.

        Delegates to :meth:`_record`, which logs the elapsed milliseconds and
        best-effort schedules a ``timer`` debug event. Always returns
        ``False``, so any exception raised inside the ``with`` block
        propagates normally.

        Args:
            exc_type: Exception type if the block raised, else ``None``.
            exc_val: Exception instance if the block raised, else ``None``.
            exc_tb: Traceback if the block raised, else ``None``.

        Returns:
            bool: ``False`` (do not suppress exceptions).
        """
        self._record((time.perf_counter() - self.start_time) * 1000.0)
        return False

    def __call__(self, func):
        """Wrap *func* as a decorator that times each invocation.

        Detects whether *func* is a coroutine function and returns an async or
        sync wrapper accordingly. Each wrapper measures one call's wall-clock
        duration with its own local start time, so concurrent calls do not
        interfere. In a ``finally`` block it hands the duration to
        :meth:`_record`. The wrapped result (or exception) passes through
        unchanged.

        Invoked implicitly when the timer is used as a decorator, e.g.
        ``@observability.timer("message_processing_total", subsystem=...)``
        in ``message_processor/processor.py`` and
        ``limbic_system/coordinator.py``.

        Args:
            func: The sync or async callable to wrap with timing.

        Returns:
            Callable: A ``functools.wraps``-preserving wrapper around *func*.
        """
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                """Await *func*, then record its duration as a timer event."""
                start_time = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    self._record((time.perf_counter() - start_time) * 1000.0)

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            """Call *func*, then record its duration as a timer event."""
            start_time = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                self._record((time.perf_counter() - start_time) * 1000.0)

        return sync_wrapper
# Process-wide facade instance; callers use this shared singleton
# (e.g. ``observability.increment(...)``) rather than building their own.
observability: Observability = Observability()