observability

Unified observability events: Redis HASH + RediSearch (idx:observability) + pub/sub.

Each event is stored as obs:{uuid} with fields aligned to the RediSearch index, then published on stargazer:observability for live admin dashboards.

observability.set_observability_redis(client)

Attach the shared async Redis client used for all observability storage.

Sets the module-level _redis handle that every publish_*, fetch_* and search_* function in this module reads; until it is set those calls short-circuit and become no-ops (returning None or empty results). This is the wiring step each microservice performs at startup so observability events can be persisted and fanned out. Called from the service entrypoints agents_main.py, inference_main.py, consolidation_main.py and web_main.py, and re-bound by web/observability_routes.py.

Parameters:

client (aioredis.Redis | None) – The shared async Redis client, or None to detach and disable observability persistence.

Return type:

None
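A minimal sketch of the module-level handle pattern described above. It assumes a single global `_redis` that each emitter checks before doing any I/O. `publish_example` is a hypothetical stand-in for the real `publish_*` functions, and the client is anything exposing the awaited method it calls.

```python
from typing import Any, Optional

# Module-level handle shared by every emitter (name mirrors the docs; sketch only).
_redis: Optional[Any] = None


def set_observability_redis(client: Optional[Any]) -> None:
    """Attach (or, with None, detach) the shared async Redis client."""
    global _redis
    _redis = client


async def publish_example(key: str, value: str) -> Optional[str]:
    """Hypothetical emitter: a no-op returning None until a client is attached."""
    if _redis is None:
        return None  # short-circuit: observability persistence is disabled
    await _redis.set(key, value)
    return key
```

Each service entrypoint would call `set_observability_redis(client)` once at startup; passing `None` later turns every emitter back into a no-op.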

observability.get_http_call_origin()

Identify the application frame that triggered an outbound HTTP call.

Walks the live call stack and returns basename:lineno in function for the first frame that is not part of the HTTP/observability plumbing, skipping openrouter_client, this module, the embedding pools, and stdlib/test machinery (importlib, contextlib, asyncio, pytest). This gives HTTP error telemetry a meaningful “who made this call” attribution instead of pointing at the transport layer. Pure stack inspection with no I/O; any exception is swallowed and "unknown" returned. Called by the OpenRouter transport (openrouter_client/transport.py) and the embedding pool (gemini_embed_pool.py) when recording HTTP failures.

Returns:

A "basename:lineno in function" origin string, or "unknown" if no suitable frame is found or inspection fails.

Return type:

str
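The frame walk can be sketched with CPython's `sys._getframe`. The skip list below is an assumption modelled on the modules named above, not the module's actual list, and matching is a simple substring test on each frame's filename.

```python
import os
import sys

# Filename fragments marking plumbing frames to skip (assumed, modelled on the docs).
_SKIP_FRAGMENTS = (
    "openrouter_client", "observability", "embed_pool",
    "importlib", "contextlib", "asyncio", "pytest",
)


def get_http_call_origin() -> str:
    """Return 'basename:lineno in function' for the first non-plumbing frame."""
    try:
        frame = sys._getframe(1)  # start at the caller, not this function
        while frame is not None:
            filename = frame.f_code.co_filename
            if not any(part in filename for part in _SKIP_FRAGMENTS):
                name = os.path.basename(filename)
                return f"{name}:{frame.f_lineno} in {frame.f_code.co_name}"
            frame = frame.f_back
    except Exception:
        pass  # pure inspection: attribution must never break the caller
    return "unknown"
```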

observability.generate_request_id()

Mint a fresh correlation id for a single bot turn.

Produces a short req_{hex} token that is threaded through one turn’s observability events (inbound message, LLM request, tool calls, response phases) so they can be stitched together in the dashboard. Pure id generation with no I/O. Called by the message processor (message_processor/processor.py), which stashes the value under observability_request_id on the message for downstream emitters.

Returns:

A new req_ correlation id.

Return type:

str
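A one-line sketch of the id format. The `req_` prefix comes from the description above; the 12-hex-character length is an assumption.

```python
import uuid


def generate_request_id() -> str:
    """Return a short 'req_'-prefixed correlation id (hex length is assumed)."""
    return f"req_{uuid.uuid4().hex[:12]}"
```

The processor would then stash the value on the message, e.g. `message["observability_request_id"] = generate_request_id()`, so every downstream emitter can pass it as `request_id`.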

async observability.publish_observability_event(*, event_type, platform='', channel_id='', user_id='', tool_name='', request_id='', preview='', timestamp=None, http_status=0, http_service='', payload=None)

Persist and broadcast one observability event — the core emitter.

Builds a flat hash for the RediSearch index (every TAG field passed through _tag_val(), preview clamped by _truncate_preview(), structured payload JSON-encoded), writes it to obs:{uuid}, sets a TTL_SECONDS expiry, and publishes the full JSON wire form on the OBSERVABILITY_CHANNEL (stargazer:observability) pub/sub channel for live admin dashboards — all in a single Redis pipeline. The HSET fields align with the idx:observability schema so events are searchable by search_observability_events(). This is the shared backend that every typed publish_* helper in this module delegates to (publish_tool_event(), publish_response_event(), publish_http_error_event(), etc.); it returns None and does nothing when Redis is unattached or the pipeline raises (failures are logged at debug and never propagate).

Parameters:
  • event_type (str) – Event type tag (e.g. tool_call, http_error).

  • platform (str) – Originating platform, if any.

  • channel_id (str) – Originating channel/conversation id, if any.

  • user_id (str) – Originating user id, if any.

  • tool_name (str) – Tool name, for tool events.

  • request_id (str) – Per-turn correlation id.

  • preview (str) – Human-readable preview (truncated when stored).

  • timestamp (float | None) – Event time; defaults to now when None.

  • http_status (int) – HTTP status for HTTP events, else 0.

  • http_service (str) – HTTP service name for HTTP events.

  • payload (dict[str, Any] | None) – Structured payload stored as JSON.

Returns:

The obs:{uuid} Redis key on success, or None when Redis is unavailable or the write failed.

Return type:

str | None
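A minimal sketch of the emitter's shape: flatten the event into string fields, then queue HSET, EXPIRE and PUBLISH on one non-transactional pipeline. `TTL_SECONDS` and `PREVIEW_MAX` values, the `_tag_val` normalisation and the wire-message layout are assumptions; the `payload_json` field name and channel name come from this page. The client is expected to behave like a `redis.asyncio` client (`pipeline()`, `hset(..., mapping=...)`, `expire`, `publish`, awaited `execute()`).

```python
import json
import time
import uuid
from typing import Any, Optional

OBSERVABILITY_CHANNEL = "stargazer:observability"
TTL_SECONDS = 7 * 24 * 3600  # assumed value; the real constant may differ
PREVIEW_MAX = 400            # assumed preview clamp


def _tag_val(value: Any) -> str:
    # Assumed normalisation: TAG fields are stored as plain stripped strings.
    return str(value or "").strip()


def build_event(event_type: str, *, preview: str = "", timestamp: Optional[float] = None,
                http_status: int = 0, payload: Optional[dict] = None,
                **tags: Any) -> dict[str, str]:
    """Flatten one event into the string fields an HSET can store."""
    fields = {"event_type": _tag_val(event_type)}
    fields.update({k: _tag_val(v) for k, v in tags.items()})
    fields["preview"] = preview[:PREVIEW_MAX]
    fields["timestamp"] = str(timestamp if timestamp is not None else time.time())
    fields["http_status"] = str(int(http_status))
    fields["payload_json"] = json.dumps(payload or {}, default=str)
    return fields


async def publish_event(client: Any, event_type: str, **kwargs: Any) -> Optional[str]:
    """HSET + EXPIRE + PUBLISH in one pipeline; None when detached or on error."""
    if client is None:
        return None
    key = f"obs:{uuid.uuid4()}"
    fields = build_event(event_type, **kwargs)
    try:
        pipe = client.pipeline(transaction=False)
        pipe.hset(key, mapping=fields)
        pipe.expire(key, TTL_SECONDS)
        pipe.publish(OBSERVABILITY_CHANNEL, json.dumps({"key": key, **fields}))
        await pipe.execute()
    except Exception:
        return None  # telemetry must never break the caller
    return key
```

Sending all three commands in one round trip keeps the hash, its expiry and the live broadcast together without paying for a MULTI/EXEC transaction.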

async observability.publish_tool_event(tool_name, arguments=None, result_preview='', duration_ms=0, success=True, user_id='', channel_id='', request_id='', platform='')

Record a single tool invocation as a tool_call observability event.

Builds a compact one-line preview (tool name, success flag, duration, truncated args and result) and a structured payload, then persists and fans out the event. Used to populate the live admin dashboard’s tool-activity view.

Delegates to publish_observability_event() (writing the obs:{uuid} hash, indexing it under idx:observability and publishing JSON on stargazer:observability); arguments and result_preview are size-clamped via _truncate() and _truncate_preview() so large tool I/O does not bloat the store. Called by the OpenRouter tool executor after each tool finishes (openrouter_client/executor.py around line 799).

Parameters:
  • tool_name (str) – Name of the tool that ran.

  • arguments (dict[str, Any] | None) – The arguments dict the tool was called with (clamped to ~800 chars in the payload), or None.

  • result_preview (str) – A short textual preview of the tool result.

  • duration_ms (float) – Wall-clock execution time in milliseconds.

  • success (bool) – Whether the tool call completed without error.

  • user_id (str) – ID of the user whose turn triggered the tool.

  • channel_id (str) – ID of the originating channel/conversation.

  • request_id (str) – Correlation id for the bot turn.

  • platform (str) – Originating platform (e.g. discord).

Return type:

None

Returns:

None.
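A sketch of how the one-line preview could be assembled. The `name ok/FAIL 12ms args=... -> ...` layout and the 200-char result clamp are assumptions; the ~800-char argument clamp follows the parameter docs. `_truncate` here is a local re-creation, not the module's helper.

```python
import json
from typing import Any, Optional

ARGS_MAX = 800    # "~800 chars" per the parameter docs
RESULT_MAX = 200  # assumed clamp for the result part of the preview


def _truncate(text: str, limit: int) -> str:
    """Clamp text to `limit` chars, marking the cut with an ellipsis."""
    return text if len(text) <= limit else text[: limit - 1] + "…"


def tool_preview(tool_name: str, arguments: Optional[dict[str, Any]],
                 result_preview: str, duration_ms: float, success: bool) -> str:
    """One-line summary in an assumed 'name ok/FAIL 12ms args=... -> ...' layout."""
    args = _truncate(json.dumps(arguments or {}, default=str), ARGS_MAX)
    status = "ok" if success else "FAIL"
    return (f"{tool_name} {status} {duration_ms:.0f}ms "
            f"args={args} -> {_truncate(result_preview, RESULT_MAX)}")
```

For example, `tool_preview("search", {"q": "cats"}, "3 hits", 12.4, True)` yields `search ok 12ms args={"q": "cats"} -> 3 hits`.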

async observability.publish_response_event(phase, request_id='', channel_id='', user_id='', model='', duration_ms=0, tool_rounds=0, error='', platform='', **extra)

Record a phase of the response-generation lifecycle as a response_phase event.

Captures milestones of one bot turn (e.g. start/end of LLM generation), including the model used, elapsed time, number of tool rounds and any error, so the dashboard can reconstruct per-turn timing. Arbitrary extra keyword fields are merged verbatim into the payload.

Delegates to publish_observability_event(), persisting the event to obs:{uuid}, indexing it and publishing it on stargazer:observability; error is clamped to 500 chars in the payload (120 in the preview). Called by the message generator at several points (message_processor/generate_and_send.py around lines 1312, 1428 and 1486) to mark response phases.

Parameters:
  • phase (str) – Name of the lifecycle phase (e.g. start, complete).

  • request_id (str) – Correlation id for the bot turn.

  • channel_id (str) – ID of the originating channel/conversation.

  • user_id (str) – ID of the user being responded to.

  • model (str) – Model identifier used for generation.

  • duration_ms (float) – Elapsed time for this phase in milliseconds.

  • tool_rounds (int) – Number of tool-call rounds in the turn so far.

  • error (str) – Error message if the phase failed (truncated when stored).

  • platform (str) – Originating platform.

  • **extra (Any) – Additional key/value fields merged into the event payload.

Return type:

None

Returns:

None.
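A sketch of the payload and preview for a response phase, applying the documented 500-char (payload) and 120-char (preview) error clamps. The preview wording is an assumption, and so is key precedence: the docs say extras are merged verbatim but not whether they may overwrite fixed keys (here they can).

```python
from typing import Any


def response_payload(phase: str, *, model: str = "", duration_ms: float = 0,
                     tool_rounds: int = 0, error: str = "",
                     **extra: Any) -> tuple[str, dict[str, Any]]:
    """Return (preview, payload) for a response_phase event."""
    payload: dict[str, Any] = {
        "phase": phase,
        "model": model,
        "duration_ms": duration_ms,
        "tool_rounds": tool_rounds,
        "error": error[:500],  # documented 500-char payload clamp
    }
    payload.update(extra)  # extra kwargs merged verbatim
    preview = f"{phase} model={model} {duration_ms:.0f}ms rounds={tool_rounds}"
    if error:
        preview += f" error={error[:120]}"  # documented 120-char preview clamp
    return preview, payload
```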

async observability.publish_background_event(task_name, status='running', task_id='', progress=None, result='', error='', **extra)

Record the state of a background task as a background_task event.

Reports task lifecycle (running/done/failed), optional progress fraction, and truncated result/error text for the admin dashboard’s background-task view. If no task_id is supplied a short random hex id is generated so successive updates for the same task can be correlated by the caller.

Delegates to publish_observability_event(), writing the obs:{uuid} hash, indexing under idx:observability and publishing on stargazer:observability; result is clamped to 1000 chars and error to 500. No internal callers were found for this function in the repo (it is a public helper available for background workers to call).

Parameters:
  • task_name (str) – Human-readable name of the background task.

  • status (str) – Task status string (default running).

  • task_id (str) – Stable id for the task; auto-generated when empty.

  • progress (float | None) – Optional completion fraction (0.0–1.0); omitted when None.

  • result (str) – Optional result text (truncated when stored).

  • error (str) – Optional error text (truncated when stored).

  • **extra (Any) – Additional key/value fields merged into the event payload.

Return type:

None

Returns:

None.
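A sketch of the background-task payload: a short random hex id is minted only when `task_id` is empty, `progress` is omitted when `None`, and result/error use the documented 1000/500-char clamps. The 8-hex-character id length and the payload key names are assumptions.

```python
import secrets
from typing import Any, Optional


def background_payload(task_name: str, status: str = "running", task_id: str = "",
                       progress: Optional[float] = None, result: str = "",
                       error: str = "", **extra: Any) -> dict[str, Any]:
    """Build a background_task payload; task_id is minted when the caller omits it."""
    payload: dict[str, Any] = {
        "task_name": task_name,
        "status": status,
        "task_id": task_id or secrets.token_hex(4),  # short random hex (length assumed)
        "result": result[:1000],
        "error": error[:500],
    }
    if progress is not None:  # omitted entirely when None
        payload["progress"] = progress
    payload.update(extra)
    return payload
```

Because the documented function returns `None`, a caller that wants several updates to share one `task_id` has to pass its own id explicitly on every call.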

async observability.publish_classifier_event(*, request_id='', channel_id='', user_id='', platform='', strategy='', tools=None, top_matches=None, phase='user_message')

Record a classifier tool-selection decision as a classifier_selection event.

Captures which routing strategy fired, the list of tools the classifier selected, and the top scoring vector matches, so the dashboard can audit how tools were chosen for a turn. Lists are capped (tools to 256, top matches to 20) before storage.

Delegates to publish_observability_event(), persisting to obs:{uuid}, indexing under idx:observability and publishing on stargazer:observability. Called by the vector tool classifier after it resolves the tool set (classifiers/vector_classifier.py around line 57).

Parameters:
  • request_id (str) – Correlation id for the bot turn.

  • channel_id (str) – ID of the originating channel/conversation.

  • user_id (str) – ID of the user being classified for.

  • platform (str) – Originating platform.

  • strategy (str) – Name of the classification strategy/path taken.

  • tools (list[str] | None) – Tool names selected by the classifier (capped at 256).

  • top_matches (list[dict[str, Any]] | None) – Per-tool scoring dicts for the best matches (capped at 20).

  • phase (str) – Classification phase label (default user_message).

Return type:

None

Returns:

None.
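The list caps can be sketched as plain slicing. This assumes the caller already orders `top_matches` best-first, so keeping the head of the list keeps the strongest matches; the payload key names are also assumptions.

```python
from typing import Any, Optional

MAX_TOOLS = 256       # documented cap
MAX_TOP_MATCHES = 20  # documented cap


def classifier_payload(strategy: str, tools: Optional[list[str]] = None,
                       top_matches: Optional[list[dict[str, Any]]] = None,
                       phase: str = "user_message") -> dict[str, Any]:
    """Cap both lists before storage so one turn cannot bloat the event."""
    return {
        "strategy": strategy,
        "phase": phase,
        "tools": list(tools or [])[:MAX_TOOLS],
        "top_matches": list(top_matches or [])[:MAX_TOP_MATCHES],
    }
```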

async observability.publish_embedding_event(*, source, model, duration_ms, success, text_len=0, batch_size=0, request_id='', channel_id='', user_id='', platform='', error='')

Record one embedding-API call as an embedding_api observability event.

Tracks latency, success, text/batch sizes and the model for each embedding request, giving the dashboard visibility into embedding throughput and failures. The error text is clamped to 500 chars in the payload.

Delegates to publish_observability_event(), writing obs:{uuid}, indexing under idx:observability and publishing on stargazer:observability. Called from several call sites in the OpenRouter embeddings transport (openrouter_client/transport.py, around lines 534, 603, 629 and 839), covering single and batched embedding requests.

Parameters:
  • source (str) – Logical source/caller of the embedding request.

  • model (str) – Embedding model identifier.

  • duration_ms (float) – Wall-clock duration of the call in milliseconds.

  • success (bool) – Whether the embedding call succeeded.

  • text_len (int) – Character length of the embedded text.

  • batch_size (int) – Number of items in the batch (0 for single).

  • request_id (str) – Correlation id for the bot turn, if any.

  • channel_id (str) – Originating channel id, if any.

  • user_id (str) – Originating user id, if any.

  • platform (str) – Originating platform, if any.

  • error (str) – Error message when the call failed (truncated when stored).

Return type:

None

Returns:

None.
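A hypothetical call-site wrapper showing how an embedding request might be timed and reported. `timed_embed`, `embed` and `publish` are illustrative names: in the real code `publish` would be `publish_embedding_event`, and only its keyword names are taken from the documented signature.

```python
import time
from typing import Any, Awaitable, Callable


async def timed_embed(embed: Callable[[str], Awaitable[list[float]]],
                      publish: Callable[..., Awaitable[Any]],
                      text: str, *, source: str, model: str) -> list[float]:
    """Time one embedding call, report success or failure, and re-raise errors."""
    start = time.perf_counter()
    try:
        vector = await embed(text)
    except Exception as exc:
        await publish(source=source, model=model, success=False, text_len=len(text),
                      duration_ms=(time.perf_counter() - start) * 1000, error=str(exc))
        raise  # reporting must not swallow the caller's failure
    await publish(source=source, model=model, success=True, text_len=len(text),
                  duration_ms=(time.perf_counter() - start) * 1000)
    return vector
```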

async observability.publish_message_observability_event(*, kind, platform, channel_id, user_id, text_preview, request_id='', message_id='')

Record an inbound or outbound chat message as a message_in/message_out event.

Logs the text preview of a user message or bot reply so the dashboard can show a live conversation timeline. The event type is chosen from kind: user_in maps to message_in; anything else maps to message_out.

Delegates to publish_observability_event(), persisting to obs:{uuid}, indexing under idx:observability and publishing on stargazer:observability; text_preview is clamped by _truncate_preview(). Called when ingesting a user message (message_processor/processor.py around line 2067) and after sending a reply (message_processor/generate_and_send.py around line 2549).

Parameters:
  • kind (str) – user_in for inbound, otherwise treated as outbound.

  • platform (str) – Originating platform (e.g. discord).

  • channel_id (str) – ID of the channel/conversation.

  • user_id (str) – ID of the user.

  • text_preview (str) – Message text (truncated when stored).

  • request_id (str) – Correlation id for the bot turn, if any.

  • message_id (str) – Platform message id, stored in the payload.

Return type:

None

Returns:

None.
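The `kind` to event-type mapping described above is a single conditional. Any value other than `user_in` (the `bot_out` used in the check below is hypothetical) is treated as outbound.

```python
def message_event_type(kind: str) -> str:
    """Map `kind` onto the two documented message event types."""
    return "message_in" if kind == "user_in" else "message_out"
```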

async observability.publish_http_error_event(*, http_service, http_status=0, request_id='', channel_id='', user_id='', platform='', duration_ms=0, endpoint='', detail='', error_kind='')

Record an HTTP/API failure as an http_error observability event.

Captures the failing service, HTTP status (0 for transport-level failures), endpoint, latency and a detail/error-kind string for every outbound HTTP error across the system. These events back the dashboard’s error views and the count_http_errors_since() / aggregate_http_errors_by_status() queries via the HTTP_ERROR_SEARCH_QUERY constant.

Delegates to publish_observability_event(), writing obs:{uuid} with http_status/http_service indexed, and publishing on stargazer:observability; endpoint is clamped to 500 chars and detail to 800. Called from many HTTP clients/tools across the repo, including anamnesis_engine.py, response_postprocessor.py, gemini_embed_pool.py, rag_system/file_rag_manager.py, tools/brave_search.py and message_processor/openrouter_structured_preprocess.py.

Parameters:
  • http_service (str) – Name of the service/API that failed.

  • http_status (int) – HTTP status code, or 0 for transport errors.

  • request_id (str) – Correlation id for the bot turn, if any.

  • channel_id (str) – Originating channel id, if any.

  • user_id (str) – Originating user id, if any.

  • platform (str) – Originating platform, if any.

  • duration_ms (float) – Latency until failure in milliseconds.

  • endpoint (str) – The URL/endpoint that was called (truncated when stored).

  • detail (str) – Free-form error detail (truncated when stored).

  • error_kind (str) – Optional error category; added to payload when set.

Return type:

None

Returns:

None.

async observability.publish_llm_request_event(*, request_id='', channel_id='', user_id='', platform='', model='', messages=None, tool_names=None, response_text='', error='', http_status=0, duration_ms=0.0, timestamp=None)

Persist one full LLM request/response record for debugging.

Captures the heavyweight detail of a single model call — the complete messages list, tool names offered, the response text, error, HTTP status and latency — and writes it as a hash under llm_req:{uuid} (LLM_REQUEST_KEY_PREFIX) with a short LLM_REQUEST_TTL (30-minute) expiry. It is deliberately kept out of the main idx:observability index and instead indexed by idx:llm_req, because the payloads are large and only useful for forensic replay; the messages list is JSON-encoded defensively. Written via a Redis pipeline (HSET + EXPIRE); returns None and logs at debug if Redis is unattached or the write fails. Called by the OpenRouter transport (openrouter_client/transport.py) after each model round-trip, and read back by fetch_llm_requests().

Parameters:
  • request_id (str) – Per-turn correlation id.

  • channel_id (str) – Originating channel id.

  • user_id (str) – Originating user id.

  • platform (str) – Originating platform.

  • model (str) – Model identifier used for the call.

  • messages (list[dict] | None) – The full request message list.

  • tool_names (list[str] | None) – Names of tools offered to the model.

  • response_text (str) – The model’s response text.

  • error (str) – Error message if the call failed.

  • http_status (int) – HTTP status of the call.

  • duration_ms (float) – Wall-clock latency in milliseconds.

  • timestamp (float | None) – Event time; defaults to now when None.

Returns:

The llm_req:{uuid} key on success, or None when Redis is unavailable or the write failed.

Return type:

str | None
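One way to read "JSON-encoded defensively" is sketched below, as an assumption rather than the module's code. `default=str` stringifies values JSON cannot represent (bytes, datetimes, SDK objects), and a `repr` fallback covers cases such as circular references, which `json.dumps` rejects with `ValueError`.

```python
import json
from typing import Any, Optional


def encode_messages(messages: Optional[list[dict[str, Any]]]) -> str:
    """JSON-encode a chat message list without ever raising."""
    try:
        # default=str turns unknown objects (bytes, datetimes, SDK types) into strings
        return json.dumps(messages or [], default=str, ensure_ascii=False)
    except (TypeError, ValueError):
        # e.g. circular references; keep something readable for forensic replay
        return json.dumps(repr(messages))
```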

async observability.fetch_llm_requests(*, channel_id='', model='', limit=50, offset=0)

Return paginated LLM request records (newest first) for the debug UI.

The read side of publish_llm_request_event(). It prefers an FT.SEARCH against the idx:llm_req index (escaping channel_id and model filter values and sorting by timestamp descending), then hydrates each matching doc with a pipelined HGETALL and decodes the stored fields (parsing tool_names JSON, coercing numerics). If the index is not yet available — e.g. during warm-up — it falls back to a full SCAN of llm_req:* keys with in-process filtering and sorting. Called by the observability web layer (web/obs.py) to drive the LLM-requests debugging view.

Parameters:
  • channel_id (str) – Optional exact channel filter.

  • model (str) – Optional model filter (exact tag in FT mode, substring in the SCAN fallback).

  • limit (int) – Page size (clamped to 1..500 in the FT path).

  • offset (int) – Page offset.

Returns:

(rows, total_matching) where rows are the hydrated request records for this page.

Return type:

tuple[list[dict], int]
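A sketch of the filter-escaping step. RediSearch treats punctuation and spaces inside a TAG query as syntax, so values such as model ids (`google/gemini-2.5-pro`) must be backslash-escaped. The `@channel_id` and `@model` field names in `idx:llm_req` are assumed to match the parameter names.

```python
import re

# Characters RediSearch parses as query syntax; each is backslash-escaped.
_TAG_SPECIAL = re.compile(r"([,.<>{}\[\]\"':;!@#$%^&*()\-+=~|/\\ ])")


def escape_tag(value: str) -> str:
    """Escape a value for use inside a @field:{...} TAG clause."""
    return _TAG_SPECIAL.sub(r"\\\1", value)


def llm_req_query(channel_id: str = "", model: str = "") -> str:
    """Build the FT.SEARCH query; '*' matches everything when no filter is given."""
    clauses = []
    if channel_id:
        clauses.append(f"@channel_id:{{{escape_tag(channel_id)}}}")
    if model:
        clauses.append(f"@model:{{{escape_tag(model)}}}")
    return " ".join(clauses) or "*"
```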

async observability.search_observability_events(query, *, limit=50, offset=0, sort_desc=True)

Query the idx:observability index and hydrate matching event docs.

Runs an FT.SEARCH for the given RediSearch query (optionally sorted by timestamp descending, paged and clamped to at most 500), then pipelines an HGETALL per hit to rebuild each event dict — decoding fields via _hdecode(), parsing the structured payload_json and coercing timestamp/http_status. This is the generic dashboard query used to read back events written by publish_observability_event(). Called by the observability routes (web/observability_routes.py) and internally by get_recent_events(). All failures are logged at debug and yield empty results rather than raising.

Parameters:
  • query (str) – A RediSearch query string against idx:observability.

  • limit (int) – Page size (clamped to 1..500).

  • offset (int) – Page offset.

  • sort_desc (bool) – Sort by timestamp descending when True.

Returns:

(documents, total_hits) where each document includes doc_id, the indexed fields and a parsed payload; empty when Redis is unavailable or the search fails.

Return type:

tuple[list[dict[str, Any]], int]
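A sketch of the per-hit hydration step, turning one HGETALL reply into a typed event dict. `_hdecode` here is a local re-creation of the helper named above. Dropping `payload_json` once it is parsed into `payload`, and defaulting unparseable numerics to zero, are assumptions.

```python
import json
from typing import Any, Callable


def _hdecode(value: Any) -> str:
    """Decode a Redis reply value that may be bytes or str."""
    return value.decode("utf-8", "replace") if isinstance(value, bytes) else str(value)


def _coerce(value: Any, cast: Callable[[Any], Any], default: Any) -> Any:
    try:
        return cast(value)
    except (TypeError, ValueError):
        return default


def hydrate(doc_id: str, raw: dict) -> dict[str, Any]:
    """Turn one HGETALL reply into an event dict with typed fields."""
    doc: dict[str, Any] = {_hdecode(k): _hdecode(v) for k, v in raw.items()}
    doc["doc_id"] = doc_id
    try:
        doc["payload"] = json.loads(doc.pop("payload_json", "") or "{}")
    except json.JSONDecodeError:
        doc["payload"] = {}  # a corrupt payload must not hide the event itself
    doc["timestamp"] = _coerce(doc.get("timestamp"), float, 0.0)
    doc["http_status"] = _coerce(doc.get("http_status"), int, 0)
    return doc
```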

async observability.count_http_errors_since(since_ts)

Count http_error events recorded at or after a cutoff timestamp.

Issues a zero-row FT.SEARCH (paging(0, 0)) against idx:observability combining the HTTP_ERROR_SEARCH_QUERY tag filter with a @timestamp:[since_ts +inf] range and returns only the total hit count, making it a cheap way to surface a recent-error tally on the dashboard. Called by the observability routes (web/observability_routes.py). Returns 0 when Redis is unavailable or the search fails.

Parameters:

since_ts (float) – Inclusive lower-bound epoch timestamp.

Returns:

The number of matching http_error events.

Return type:

int
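A sketch of the count query. The `HTTP_ERROR_SEARCH_QUERY` value below is an assumption; the real constant may differ. The search call uses redis-py's `Query(...).paging(0, 0)` and the async `client.ft(index).search(...)` interface, imported lazily so the query builder works without redis-py installed.

```python
from typing import Any

# Assumed value; the module's real HTTP_ERROR_SEARCH_QUERY may differ.
HTTP_ERROR_SEARCH_QUERY = "@event_type:{http_error}"


def http_errors_since_query(since_ts: float) -> str:
    """Tag filter plus an inclusive numeric range on @timestamp."""
    return f"{HTTP_ERROR_SEARCH_QUERY} @timestamp:[{since_ts} +inf]"


async def count_http_errors_since(client: Any, since_ts: float) -> int:
    try:
        from redis.commands.search.query import Query  # lazy: redis-py is optional here

        query = Query(http_errors_since_query(since_ts)).paging(0, 0)  # no rows, total only
        result = await client.ft("idx:observability").search(query)
        return int(result.total)
    except Exception:
        return 0  # unavailable Redis or a failed search reads as "no errors"
```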

async observability.aggregate_http_errors_by_status(since_ts)

Break recent HTTP errors down into per-status-code counts.

Runs an FT.AGGREGATE over idx:observability filtered by HTTP_ERROR_SEARCH_QUERY and a @timestamp:[since_ts +inf] range, grouping by @http_status with a COUNT reducer, then defensively decodes the aggregation rows (which may arrive as flat key/value lists or dicts, with bytes or str) into clean integer pairs sorted by status code. This feeds the dashboard’s error-by-status breakdown alongside count_http_errors_since(). Called by the observability routes (web/observability_routes.py). Returns an empty list when Redis is unavailable or the aggregation fails.

Parameters:

since_ts (float) – Inclusive lower-bound epoch timestamp.

Returns:

One {"http_status": int, "count": int} entry per observed status code, ascending by status.

Return type:

list[dict[str, Any]]
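A sketch of the defensive row decoding. Each FT.AGGREGATE row may arrive as a flat `[key, value, key, value, ...]` list or as a dict, with bytes or str entries. The reducer alias `count` (as in `REDUCE COUNT 0 AS count`) is an assumption. Malformed rows are skipped.

```python
from typing import Any


def _s(value: Any) -> str:
    return value.decode() if isinstance(value, bytes) else str(value)


def decode_status_rows(rows: list[Any]) -> list[dict[str, int]]:
    """Normalise FT.AGGREGATE rows into sorted {"http_status", "count"} pairs."""
    out: list[dict[str, int]] = []
    for row in rows:
        if isinstance(row, dict):
            pairs = {_s(k): _s(v) for k, v in row.items()}
        else:  # flat [key, value, key, value, ...]
            items = [_s(x) for x in row]
            pairs = dict(zip(items[::2], items[1::2]))
        try:
            out.append({"http_status": int(pairs["http_status"]),
                        "count": int(pairs["count"])})
        except (KeyError, ValueError):
            continue  # skip malformed rows rather than failing the dashboard
    return sorted(out, key=lambda r: r["http_status"])
```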

async observability.get_recent_events(category='tools', limit=100)

Fetch the most recent events for a coarse category (compat shim).

A backward-compatible convenience that maps a category name (tools, responses or background) to the corresponding @event_type tag query — or * for anything else — and delegates to search_observability_events(), sourcing data from RediSearch rather than the legacy sorted-set storage that predated it. No internal callers remain in this repository; it is retained for older dashboard/API consumers.

Parameters:
  • category (str) – One of tools, responses, background; any other value matches all event types.

  • limit (int) – Maximum number of events to return.

Returns:

The matching event documents, newest first.

Return type:

list[dict[str, Any]]
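The category shim reduces to a lookup table. The event-type names come from the `publish_tool_event`, `publish_response_event` and `publish_background_event` descriptions on this page; the exact query strings are an assumption.

```python
_CATEGORY_QUERIES = {
    "tools": "@event_type:{tool_call}",
    "responses": "@event_type:{response_phase}",
    "background": "@event_type:{background_task}",
}


def category_query(category: str) -> str:
    """Map a legacy category name to a RediSearch query; unknown names match all."""
    return _CATEGORY_QUERIES.get(category, "*")
```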

async observability.publish_debug_event(event_type, subsystem, *, platform='', channel_id='', hub_id='', user_id='', request_id='', run_id='', phase='', status='ok', preview='', llm_output='', duration_ms=0.0, token_est=0, payload=None)

Persist and broadcast one short-lived forensic debug event.

The debug-tier analogue of publish_observability_event(), backing the 1-hour forensic event bus. It builds a hash matching the idx:obs_debug schema (every TAG field via _tag_val(), preview clamped by _truncate_preview()), writes it to obs_debug:{uid} with an OBS_DEBUG_TTL expiry, and publishes the wire form on OBS_DEBUG_PUBSUB_CHANNEL (stargazer:obs_debug) via a single Redis pipeline. The large llm_output is stored in the hash for forensic retrieval but intentionally kept out of the indexed and pub/sub fields to avoid index bloat and wasted bandwidth. Read back by search_debug_events(). Called broadly across the codebase (anamnesis_engine.py, task_manager.py, proactive_triage.py, background_tasks.py) and internally by Observability and Timer; failures are logged at debug and never raised.

Parameters:
  • event_type (str) – Debug event type tag.

  • subsystem (str) – Subsystem that produced the event.

  • platform (str) – Originating platform, if any.

  • channel_id (str) – Originating channel id, if any.

  • hub_id (str) – Originating hub id, if any.

  • user_id (str) – Originating user id, if any.

  • request_id (str) – Per-turn correlation id, if any.

  • run_id (str) – Run/job correlation id, if any.

  • phase (str) – Lifecycle phase label, if any.

  • status (str) – Outcome status (default ok).

  • preview (str) – Human-readable preview (truncated when stored).

  • llm_output (str) – Optional large LLM output, stored but not indexed.

  • duration_ms (float) – Elapsed time in milliseconds.

  • token_est (int) – Estimated token count, if relevant.

  • payload (dict[str, Any] | None) – Structured payload stored as JSON.

Return type:

None

Returns:

None.
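A sketch of the split between what is stored and what is broadcast: every field, including `llm_output`, goes into the hash, while the pub/sub message omits `llm_output`. Keeping it out of the index is a matter of the `idx:obs_debug` schema not declaring that field, which is assumed here rather than shown.

```python
import json
from typing import Any

# Fields stored in the hash but left out of the pub/sub wire form (per the docs).
_STORED_ONLY = {"llm_output"}


def split_debug_event(fields: dict[str, Any]) -> tuple[dict[str, str], str]:
    """Return (hash_fields, wire_json); llm_output lives only in the hash."""
    hash_fields = {k: str(v) for k, v in fields.items()}
    wire = {k: v for k, v in hash_fields.items() if k not in _STORED_ONLY}
    return hash_fields, json.dumps(wire)
```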

async observability.search_debug_events(query, *, limit=50, offset=0)

Query the idx:obs_debug index and hydrate full forensic docs.

The read side of publish_debug_event(). It runs an FT.SEARCH (sorted by timestamp descending, paged and clamped to 500) and then pipelines an HGETALL per hit so each returned row carries not just the indexed fields but also the stored-only llm_output and parsed payload, decoded via _hdecode() with numerics coerced. This is what lets the debug dashboard replay the large LLM output that was deliberately excluded from the index. Called extensively by the observability web layer (web/obs.py) for history, per-request and per-run forensic views. Returns an empty list when Redis is unavailable, the index module is not yet importable, or the search fails.

Parameters:
  • query (str) – A RediSearch query string against idx:obs_debug.

  • limit (int) – Page size (clamped to 1..500).

  • offset (int) – Page offset.

Returns:

Hydrated debug-event dicts (indexed fields plus llm_output and payload), newest first.

Return type:

list[dict[str, Any]]

async observability.publish_proxy_telemetry_event(event)

Persist one gemini-cli-proxy telemetry event for durable inspection.

Ingests a raw telemetry object emitted by the proxy’s telemetry.ts and stores it as a hash under proxy_tel:{uuid} (PROXY_TELEMETRY_KEY_PREFIX) with a 7-day PROXY_TELEMETRY_TTL expiry via a Redis pipeline. It normalises a few fields (event type, request id, model, account, HTTP status) for cheap in-process filtering and tolerantly parses the timestamp from a numeric or ISO-8601 ts/timestamp value (falling back to now), while preserving the entire original object in payload_json for forensic retrieval. Unlike the main observability events, these are not RediSearch-indexed and are read back by scanning in fetch_proxy_telemetry_events(). Called by the observability web layer (web/obs.py) as proxy telemetry is received.

Parameters:

event (dict[str, Any]) – The raw telemetry object from the proxy.

Returns:

The proxy_tel:{uuid} key on success, or None when Redis is unavailable or the write failed.

Return type:

str | None
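A sketch of the tolerant timestamp parsing. It checks `ts` and then `timestamp`, accepts numbers or numeric strings, then ISO-8601 (normalising a trailing `Z`, which `datetime.fromisoformat` only accepts from Python 3.11), and otherwise falls back to now. Whether the proxy's numeric values are seconds or milliseconds is not stated; this sketch treats them as epoch seconds.

```python
import time
from datetime import datetime
from typing import Any


def parse_ts(event: dict[str, Any]) -> float:
    """Epoch seconds from a numeric or ISO-8601 'ts'/'timestamp'; falls back to now."""
    raw = event.get("ts", event.get("timestamp"))
    if isinstance(raw, (int, float)) and not isinstance(raw, bool):
        return float(raw)
    if isinstance(raw, str) and raw:
        try:
            return float(raw)
        except ValueError:
            pass
        try:
            return datetime.fromisoformat(raw.replace("Z", "+00:00")).timestamp()
        except ValueError:
            pass
    return time.time()  # unparseable or missing: stamp with the ingest time
```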

async observability.fetch_proxy_telemetry_events(*, event_type='', model='', req_id='', limit=200, offset=0)

Return paginated gemini-cli-proxy telemetry rows, newest first.

The read side of publish_proxy_telemetry_event(). Because these records are not RediSearch-indexed, it performs a full SCAN of proxy_tel:* keys, pipelines an HGETALL for each, applies optional in-process filters, sorts by timestamp descending and pages the result. Each row exposes the normalised columns plus the parsed full payload. Called by the observability web layer (web/obs.py) to render the proxy telemetry view. Returns empty when Redis is unavailable or no keys exist.

Parameters:
  • event_type (str) – Optional exact event-type filter.

  • model (str) – Optional case-insensitive substring model filter.

  • req_id (str) – Optional exact request-id filter.

  • limit (int) – Page size.

  • offset (int) – Page offset.

Returns:

(rows, total_matching) for the requested page after filtering and sorting.

Return type:

tuple[list[dict[str, Any]], int]
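Once the SCAN and pipelined HGETALL have produced rows, the in-process step can be sketched as below. The column names (`event_type`, `model`, `req_id`, `timestamp`) are assumed to match the filter parameters. The total is counted after filtering but before paging, matching the documented `(rows, total_matching)` return.

```python
from typing import Any


def filter_page(rows: list[dict[str, Any]], *, event_type: str = "", model: str = "",
                req_id: str = "", limit: int = 200,
                offset: int = 0) -> tuple[list[dict[str, Any]], int]:
    """In-process filtering, newest-first sort and paging over scanned rows."""
    needle = model.lower()
    matched = [
        r for r in rows
        if (not event_type or r.get("event_type") == event_type)
        and (not needle or needle in str(r.get("model", "")).lower())
        and (not req_id or r.get("req_id") == req_id)
    ]
    matched.sort(key=lambda r: float(r.get("timestamp") or 0), reverse=True)
    return matched[offset : offset + limit], len(matched)
```

A full SCAN is linear in the number of `proxy_tel:*` keys, which the 7-day TTL keeps bounded.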

class observability.Observability

Bases: object

Centralized, ergonomic facade for emitting metrics, timers and alerts.

A thin synchronous-friendly client that callers reach through the module-level observability singleton. Its methods log immediately and then best-effort fan the event into the forensic obs_debug store by scheduling publish_debug_event() as a fire-and-forget task on the running event loop (silently skipped when no loop is running), so call sites never have to await telemetry. It exposes increment() for counters, timer() for timing via the Timer context manager/decorator, and alert() for critical alerts. Used pervasively across the codebase (e.g. prompt_renderer.py, prompt_context.py, background_tasks.py, message_processor/processor.py, limbic_system/coordinator.py).

increment(metric_name, tags=None)

Increment a named counter metric, logging and best-effort persisting it.

Emits a debug log line and, when a Redis client is attached and an event loop is running, schedules a fire-and-forget metric_increment debug event so the counter shows up in the forensic obs_debug store. If no loop is running (e.g. called from sync startup code) the persistence is silently skipped.

Interacts with publish_debug_event() via loop.create_task(...) (writing obs_debug:{uid} and publishing on stargazer:obs_debug). Called widely across the codebase through the module-level observability singleton, e.g. prompt_renderer.py, prompt_context.py, background_tasks.py and kg_consolidation.py.

Parameters:
  • metric_name (str) – Name of the counter metric to increment.

  • tags (dict[str, str]) – Optional dimension tags attached to the metric event.

Return type:

None

Returns:

None.

timer(metric_name, subsystem='observability')

Create a Timer for metric_name, usable as a context manager or decorator.

The returned Timer measures elapsed wall-clock time and, on exit, best-effort publishes a timer debug event tagged with subsystem.

Constructs and returns a Timer instance; no I/O happens here. Called via the observability singleton both as with observability.timer(...) (e.g. anamnesis_engine.py, parallax_engine.py, message_processor/processor.py) and as a decorator @observability.timer(...) (e.g. limbic_system/coordinator.py, message_processor/processor.py).

Parameters:
  • metric_name (str) – Name of the timing metric to record.

  • subsystem (str) – Subsystem label stored on the emitted timer event.

Returns:

A new timer bound to metric_name and subsystem.

Return type:

Timer

alert(severity, message, metadata=None)

Raise a critical alert, logging a warning and best-effort persisting it.

Logs the alert at WARNING level and, when a Redis client is attached and an event loop is running, schedules a fire-and-forget alert debug event (with status set to the lowercased severity and a 200-char message preview) into the forensic obs_debug store. Persistence is skipped silently when no event loop is running.

Interacts with publish_debug_event() via loop.create_task(...) (writing obs_debug:{uid} and publishing on stargazer:obs_debug). Called through the observability singleton from validation/guard paths such as config.py, prompt_context.py, wallet_key_utils.py and message_processor/processor.py.

Parameters:
  • severity (str) – Alert severity label (also used, lowercased, as the event status).

  • message (str) – Human-readable alert message (preview truncated to 200 chars when stored).

  • metadata (dict) – Optional structured context attached to the event payload.

Return type:

None

Returns:

None.

class observability.Timer(metric_name, subsystem='observability')

Bases: object

A dual-purpose wall-clock timer usable as a context manager or decorator.

Measures elapsed time with time.perf_counter and, when the timed region finishes, logs the duration and best-effort emits a timer debug event via publish_debug_event() (fire-and-forget on the running loop). The same instance works as with observability.timer(...): (see __enter__() / __exit__()) and as @observability.timer(...) (see __call__(), which detects sync vs. async callables); it is constructed by Observability.timer(). Used across hot paths such as anamnesis_engine.py, parallax_engine.py, message_processor/processor.py and limbic_system/coordinator.py.

Parameters:
  • metric_name (str)

  • subsystem (str)

__init__(metric_name, subsystem='observability')

Initialise the timer with a metric name and subsystem label.

Stores configuration only; timing does not begin until the timer is entered as a context manager or the wrapped function is invoked. Constructed by Observability.timer().

Parameters:
  • metric_name (str) – Name of the timing metric this timer records.

  • subsystem (str) – Subsystem label stored on emitted timer events.

__enter__()

Start the high-resolution clock and return self for with blocks.

Records time.perf_counter() into self.start_time. Invoked implicitly by the Python runtime when the timer is used as with observability.timer(...):.

Returns:

This timer instance.

Return type:

Timer

__exit__(exc_type, exc_val, exc_tb)

Compute the elapsed duration on context exit and emit a timer event.

Logs the elapsed milliseconds and, when a Redis client is attached and an event loop is running, schedules a fire-and-forget timer debug event via publish_debug_event() (writing obs_debug:{uid} and publishing on stargazer:obs_debug). Always returns False so any exception raised inside the with block propagates normally. Invoked implicitly by the runtime when the with block ends.

Parameters:
  • exc_type – Exception type if the block raised, else None.

  • exc_val – Exception instance if the block raised, else None.

  • exc_tb – Traceback if the block raised, else None.

Returns:

False (do not suppress exceptions).

Return type:

bool

__call__(func)

Wrap func as a decorator that times each invocation.

Detects whether func is a coroutine function and returns either an async or sync wrapper that measures the call’s wall-clock duration and emits a timer debug event afterward (see the nested wrappers). Invoked implicitly when the timer is used as a decorator, e.g. @observability.timer("message_processing_total", subsystem=...) in message_processor/processor.py and limbic_system/coordinator.py.

Parameters:

func – The sync or async callable to wrap with timing.

Returns:

A functools.wraps-preserving wrapper around func.

Return type:

Callable
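A condensed sketch of the dual-purpose Timer described above, with logging only. The comment in `_emit` marks where the real class schedules its timer debug event. `last_ms` is an illustrative attribute that is not part of the documented class. Emitting in a `finally` block, so failed calls are also timed, is an assumption. The decorator wrappers keep their start time in a local rather than in `self.start_time`, so concurrent calls through one decorated function do not overwrite each other's start.

```python
import functools
import inspect
import logging
import time
from typing import Any, Callable

log = logging.getLogger("observability")


class Timer:
    """Context manager and decorator that logs elapsed wall-clock milliseconds."""

    def __init__(self, metric_name: str, subsystem: str = "observability") -> None:
        self.metric_name = metric_name
        self.subsystem = subsystem
        self.start_time = 0.0
        self.last_ms = 0.0  # illustrative: last measured duration

    def _emit(self, elapsed_ms: float) -> None:
        self.last_ms = elapsed_ms
        log.debug("%s [%s] took %.2f ms", self.metric_name, self.subsystem, elapsed_ms)
        # The real class also schedules a fire-and-forget timer debug event here.

    def __enter__(self) -> "Timer":
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self._emit((time.perf_counter() - self.start_time) * 1000)
        return False  # never suppress exceptions raised in the with block

    def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                start = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    self._emit((time.perf_counter() - start) * 1000)
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                self._emit((time.perf_counter() - start) * 1000)
        return sync_wrapper
```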