core.trace module
Distributed trace IDs and orphaned-trace cleanup.
Provides time-sortable UUIDv7 trace IDs (generate_trace_id(),
with fallbacks for older Python) that flow from gateway ingress
through inference to egress, helpers to create and track traces in
Redis (the sg:trace:* keyspace) under an absolute failsafe TTL,
and a TraceWatchdog that sweeps traces abandoned by a
crashed worker so they don’t leak.
- core.trace.generate_trace_id()
Generate a time-sortable, globally unique UUIDv7 trace ID.
Produces the correlation ID that follows a request from gateway ingress through inference to egress. Because UUIDv7 IDs sort chronologically, the sg:trace:* keyspace is easy to scan in order. It prefers the native uuid.uuid7 (Python 3.14+), falls back to uuid7 from the third-party uuid6 package when the native function is missing, and finally degrades to a random uuid4 if neither is available, losing time-ordering but never failing. Pure computation with no Redis or other I/O. Called at request ingress and exercised by tests/core/migration/test_trace.py.
- Returns:
The trace ID as a string.
- Return type:
str
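The fallback chain described above can be sketched as follows. This is a minimal illustration, not the module's source: it assumes the third-party uuid6 package exposes a uuid7() function, and it adds a hypothetical uuid7_timestamp_ms() helper (not part of core.trace) to show why the IDs sort chronologically. Per RFC 9562, the top 48 bits of a UUIDv7 hold the Unix timestamp in milliseconds.

```python
import uuid
from typing import Optional


def generate_trace_id() -> str:
    """Sketch of the documented chain: native uuid7, then uuid6.uuid7, then uuid4."""
    native = getattr(uuid, "uuid7", None)  # only present on Python 3.14+
    if native is not None:
        return str(native())
    try:
        from uuid6 import uuid7  # third-party backport (assumed installed when available)
    except ImportError:
        # Last resort: a random UUIDv4 is still unique but no longer time-sortable.
        return str(uuid.uuid4())
    return str(uuid7())


def uuid7_timestamp_ms(trace_id: str) -> Optional[int]:
    """Hypothetical helper: recover the Unix-ms timestamp encoded in a UUIDv7."""
    u = uuid.UUID(trace_id)
    if u.version != 7:
        return None  # e.g. a uuid4 fallback ID carries no timestamp
    # RFC 9562: the most significant 48 bits of a UUIDv7 are unix_ts_ms.
    return u.int >> 80
```

Because the timestamp occupies the most significant bits and the string form is fixed-width lowercase hex, comparing two UUIDv7 strings orders them by creation time to millisecond resolution. A uuid4 fallback ID has no such prefix, which is why the helper returns None for it.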
- async core.trace.create_trace(redis, trace_id, initial_state, metadata)
Register a new trace in Redis with an absolute failsafe TTL.
Records the start of a traced request so its lifecycle can be tracked and abandoned traces can later be swept. In a single Redis pipeline it runs HSET on the sg:trace:{trace_id} hash with the initial state, a created_at timestamp, and any caller metadata; uses EXPIRE to set the 24-hour ABSOLUTE_FAILSAFE_TTL on that key so the trace is garbage-collected even if TraceWatchdog never runs; and uses SADD to add the id to the sg:traces:active index that the watchdog scans. All three commands are pipelined and executed atomically against Redis. Called at request ingress (where traces are first created) and exercised by tests/core/migration/test_trace.py.
- Parameters:
redis – Async Redis client used for the pipelined HSET/EXPIRE/SADD.
trace_id (str) – Trace correlation ID, used to build the sg:trace:{id} key.
initial_state (str) – Starting lifecycle state stored under the state field (e.g. "RECEIVED").
metadata (dict) – Extra fields merged into the trace hash (e.g. platform, channel).
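A minimal sketch of the pipelined registration, assuming a redis-py redis.asyncio client. The key names and the 24-hour ABSOLUTE_FAILSAFE_TTL come from the description above; storing created_at as Unix epoch seconds is an assumption, as is writing state and created_at after the caller metadata so that metadata cannot overwrite them.

```python
import time
from typing import Any, Mapping

ABSOLUTE_FAILSAFE_TTL = 24 * 60 * 60  # 24 hours, in seconds
ACTIVE_INDEX = "sg:traces:active"


async def create_trace(redis: Any, trace_id: str, initial_state: str,
                       metadata: Mapping[str, Any]) -> None:
    """Sketch: register a trace with HSET + EXPIRE + SADD in one pipeline.

    Assumes a redis-py ``redis.asyncio.Redis`` client; ``transaction=True``
    wraps the queued commands in MULTI/EXEC.
    """
    key = f"sg:trace:{trace_id}"
    fields = {
        **metadata,                        # caller metadata first...
        "state": initial_state,            # ...so these two always win
        "created_at": str(time.time()),    # assumed format: Unix epoch seconds
    }
    pipe = redis.pipeline(transaction=True)
    pipe.hset(key, mapping=fields)            # initial state + metadata
    pipe.expire(key, ABSOLUTE_FAILSAFE_TTL)   # failsafe GC even without the watchdog
    pipe.sadd(ACTIVE_INDEX, trace_id)         # index scanned by TraceWatchdog
    await pipe.execute()                      # one round trip for all three commands
```

With redis-py, pipeline(transaction=True) buffers the commands client-side and sends them inside MULTI/EXEC, so other clients never observe the hash without its TTL or its active-index entry.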
- class core.trace.TraceWatchdog
Bases: object
Sweeps orphaned traces every 5 minutes.
Runs under TaskSupervisor leader election to ensure exactly one instance is active cluster-wide.
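- SWEEP_INTERVAL = 300
Seconds between sweeps (5 minutes).
- ORPHAN_THRESHOLD = 1800
Age in seconds (30 minutes) after which a trace that is neither COMPLETED nor ERRORED counts as orphaned.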
- async sweep(redis)
Mark traces abandoned by a crashed worker as ERRORED and retire them.
One pass of the watchdog’s reaper. It reads the sg:traces:active set and, for each id, fetches created_at from the sg:trace:{id} hash. A missing hash means the 24-hour failsafe TTL has already expired the key, so the stale id is simply removed from the active set with SREM. For traces older than ORPHAN_THRESHOLD (30 minutes) whose state is neither COMPLETED nor ERRORED, it uses HSET to set the state to ERRORED, applies a one-hour terminal TTL, removes the id from the active set, and logs a trace_orphaned warning. This prevents leaks from requests whose worker died mid-flight. Reads and mutates the sg:trace:* keyspace and the active index in Redis. Driven on the SWEEP_INTERVAL cadence by the watchdog’s supervised loop (and called directly in tests/core/migration/test_trace.py).
- Parameters:
redis – Async Redis client used for SMEMBERS/HGET/HSET/EXPIRE/SREM against the trace keyspace.
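The reaper pass and its supervised loop can be sketched as below. This is an illustrative TraceWatchdogSketch class, not the real TraceWatchdog: it assumes a redis-py asyncio client created with decode_responses=True and created_at stored as Unix epoch seconds. The optional now argument, the returned count, the is_orphaned() helper, and the run() loop are hypothetical additions for clarity and testing; leader election via TaskSupervisor is deliberately left out.

```python
import asyncio
import logging
import time
from typing import Any, Optional

logger = logging.getLogger("core.trace")

ACTIVE_INDEX = "sg:traces:active"
TERMINAL_STATES = {"COMPLETED", "ERRORED"}
TERMINAL_TTL = 60 * 60  # keep an ERRORED trace for one hour, then let Redis drop it


class TraceWatchdogSketch:
    SWEEP_INTERVAL = 300     # seconds between sweeps (5 minutes)
    ORPHAN_THRESHOLD = 1800  # seconds before a non-terminal trace is orphaned (30 minutes)

    def is_orphaned(self, created_at: float, state: Optional[str], now: float) -> bool:
        """Pure check: older than the threshold and not already terminal."""
        return now - created_at > self.ORPHAN_THRESHOLD and state not in TERMINAL_STATES

    async def sweep(self, redis: Any, now: Optional[float] = None) -> int:
        """One reaper pass; returns how many traces were marked ERRORED."""
        now = time.time() if now is None else now
        reaped = 0
        for trace_id in await redis.smembers(ACTIVE_INDEX):
            key = f"sg:trace:{trace_id}"
            created_at = await redis.hget(key, "created_at")
            if created_at is None:
                # Hash already expired via the 24-hour failsafe: drop the stale index entry.
                await redis.srem(ACTIVE_INDEX, trace_id)
                continue
            state = await redis.hget(key, "state")
            if self.is_orphaned(float(created_at), state, now):
                await redis.hset(key, "state", "ERRORED")
                await redis.expire(key, TERMINAL_TTL)
                await redis.srem(ACTIVE_INDEX, trace_id)
                logger.warning("trace_orphaned trace_id=%s state=%s", trace_id, state)
                reaped += 1
        return reaped

    async def run(self, redis: Any) -> None:
        """Hypothetical supervised loop: sweep, then sleep for SWEEP_INTERVAL."""
        while True:
            await self.sweep(redis)
            await asyncio.sleep(self.SWEEP_INTERVAL)
```

In this sketch, terminal traces that are still indexed are skipped; if nothing else removes them, the missing-hash branch drops their ids once the key expires.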