core.trace module

Distributed trace IDs and orphaned-trace cleanup.

Provides time-sortable UUIDv7 trace IDs via generate_trace_id() (with fallbacks for older Python versions) that flow from gateway ingress through inference to egress. It also provides helpers to create and track traces in Redis (the sg:trace:* keyspace) under an absolute failsafe TTL, and a TraceWatchdog that sweeps traces abandoned by a crashed worker so they don’t leak.

core.trace.generate_trace_id()[source]

Generate a time-sortable, globally unique UUIDv7 trace ID.

Produces the correlation ID that follows a request from gateway ingress through inference to egress; using UUIDv7 means the IDs sort chronologically, which makes the sg:trace:* keyspace easy to scan in order. It prefers the native uuid.uuid7 (Python 3.14+), falls back to uuid7 from the third-party uuid6 package when the native function is unavailable, and finally degrades to a random uuid4 if neither is available, losing time-ordering but never failing. Pure computation with no Redis or other I/O. Called at request ingress and exercised by tests/core/migration/test_trace.py.

Return type:

str

Returns:

The trace ID as a string.
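The fallback order described above can be sketched as follows. This is a minimal illustration, not the module's source: the name generate_trace_id_sketch is hypothetical, and the only external call assumed is uuid7() from the third-party uuid6 package.

```python
import uuid


def generate_trace_id_sketch() -> str:
    """Return a UUIDv7 string when possible, else a random UUIDv4 string.

    Hypothetical stand-in for generate_trace_id(); the real function
    may differ in detail.
    """
    # 1. Native implementation, available from Python 3.14 onwards.
    native_uuid7 = getattr(uuid, "uuid7", None)
    if native_uuid7 is not None:
        return str(native_uuid7())

    # 2. Third-party uuid6 package, which exposes uuid7().
    try:
        from uuid6 import uuid7
    except ImportError:
        # 3. Last resort: random and not time-sortable, but it never fails.
        return str(uuid.uuid4())
    return str(uuid7())


if __name__ == "__main__":
    print(generate_trace_id_sketch())
```

Whichever branch runs, the caller receives a 36-character canonical UUID string, so downstream key construction does not need to know which generator was used.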

async core.trace.create_trace(redis, trace_id, initial_state, metadata)[source]

Register a new trace in Redis with an absolute failsafe TTL.

Records the start of a traced request so its lifecycle can be tracked and so abandoned traces can later be swept. In a single Redis pipeline it runs an HSET on the sg:trace:{trace_id} hash with the initial state, a created_at timestamp, and any caller metadata; sets a 24-hour ABSOLUTE_FAILSAFE_TTL on that key so the trace is garbage-collected even if TraceWatchdog never runs; and adds the id via SADD to the sg:traces:active index that the watchdog scans. All three operations are pipelined and executed atomically against Redis. Called at request ingress (where traces are first created) and exercised by tests/core/migration/test_trace.py.

Parameters:
  • redis – Async Redis client used for the pipelined HSET / EXPIRE / SADD.

  • trace_id (str) – Trace correlation ID, used to build the sg:trace:{id} key.

  • initial_state (str) – Starting lifecycle state stored under the state field (e.g. "RECEIVED").

  • metadata (dict) – Extra fields merged into the trace hash (e.g. platform, channel).
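The pipelined HSET / EXPIRE / SADD sequence might look roughly like the sketch below. Several details are assumptions rather than facts from this module: created_at is stored as epoch seconds, metadata is merged first so it cannot overwrite state or created_at, and the client exposes a redis-py style pipeline() whose execute() is awaited. FakeRedis and FakePipeline are hypothetical in-memory stand-ins so the example runs without a server.

```python
import asyncio
import time

ABSOLUTE_FAILSAFE_TTL = 24 * 60 * 60  # 24 hours, in seconds
ACTIVE_SET = "sg:traces:active"


async def create_trace_sketch(redis, trace_id, initial_state, metadata):
    """Hypothetical sketch of create_trace(); not the module's source."""
    key = f"sg:trace:{trace_id}"
    # Assumption: metadata goes first so reserved fields always win.
    fields = {**metadata, "state": initial_state, "created_at": str(time.time())}
    pipe = redis.pipeline()
    pipe.hset(key, mapping=fields)           # write the trace hash
    pipe.expire(key, ABSOLUTE_FAILSAFE_TTL)  # failsafe garbage collection
    pipe.sadd(ACTIVE_SET, trace_id)          # index scanned by the watchdog
    await pipe.execute()                     # send all three commands together


class FakePipeline:
    """In-memory stand-in that records queued commands (illustration only)."""

    def __init__(self, sink):
        self.sink, self.queued = sink, []

    def hset(self, key, mapping):
        self.queued.append(("hset", key, dict(mapping)))

    def expire(self, key, seconds):
        self.queued.append(("expire", key, seconds))

    def sadd(self, key, member):
        self.queued.append(("sadd", key, member))

    async def execute(self):
        # Nothing becomes visible until execute() is awaited.
        self.sink.extend(self.queued)
        self.queued = []


class FakeRedis:
    def __init__(self):
        self.commands = []

    def pipeline(self):
        return FakePipeline(self.commands)


if __name__ == "__main__":
    client = FakeRedis()
    asyncio.run(create_trace_sketch(client, "abc", "RECEIVED", {"platform": "web"}))
    for command in client.commands:
        print(command)
```

Queuing the commands on one pipeline keeps the hash, its TTL, and the index entry from drifting apart if the caller is interrupted between steps.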

class core.trace.TraceWatchdog[source]

Bases: object

Sweeps orphaned traces every 5 minutes.

Runs under TaskSupervisor leader election to ensure exactly one instance is active cluster-wide.

SWEEP_INTERVAL = 300
ORPHAN_THRESHOLD = 1800
async sweep(redis)[source]

Mark traces abandoned by a crashed worker as ERRORED and retire them.

One pass of the watchdog’s reaper. It reads the sg:traces:active set and, for each id, fetches created_at from the sg:trace:{id} hash: a missing hash means the 24-hour failsafe already expired the key, so the stale id is simply SREM-ed from the active set. For traces older than ORPHAN_THRESHOLD (30 minutes) whose state is neither COMPLETED nor ERRORED, it runs an HSET to set the state to ERRORED, applies a one-hour terminal TTL, removes the id from the active set, and logs a trace_orphaned warning – preventing leaks from requests whose worker died mid-flight. Reads and mutates the sg:trace:* keyspace and the active index in Redis. Driven on the SWEEP_INTERVAL cadence by the watchdog’s supervised loop (and called directly in tests/core/migration/test_trace.py).

Parameters:

redis – Async Redis client used for SMEMBERS / HGET / HSET / EXPIRE / SREM against the trace keyspace.
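The per-trace decision that sweep() makes can be isolated as a pure function, sketched below under stated assumptions: created_at is epoch seconds stored as a string, "older than" means strictly greater than ORPHAN_THRESHOLD, and traces that need no action are left untouched. The function name and the returned action labels are hypothetical.

```python
ORPHAN_THRESHOLD = 1800  # seconds (30 minutes), as on TraceWatchdog
TERMINAL_STATES = {"COMPLETED", "ERRORED"}


def classify_trace(created_at, state, now):
    """Decide what one sweep pass should do with a single active trace id.

    created_at and state are the values read from the sg:trace:{id} hash
    (None when the hash no longer exists); now is the current epoch time.
    """
    if created_at is None:
        # The hash already expired under the failsafe TTL: only the stale
        # id in the active set remains, so it should be SREM-ed.
        return "drop_stale_id"

    age = now - float(created_at)
    if age > ORPHAN_THRESHOLD and state not in TERMINAL_STATES:
        # Worker presumably died mid-flight: set ERRORED, apply the
        # terminal TTL, and remove the id from the active set.
        return "mark_errored"

    # Young traces and traces already in a terminal state need no action here.
    return "leave"


if __name__ == "__main__":
    print(classify_trace(None, None, 1000.0))
    print(classify_trace("0", "RECEIVED", 1801.0))
    print(classify_trace("0", "COMPLETED", 5000.0))
```

Keeping the decision separate from the Redis calls makes the threshold and state rules easy to unit-test without a running server.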