"""Distributed trace IDs and orphaned-trace cleanup.
Provides time-sortable UUIDv7 trace IDs (:func:`generate_trace_id`,
with fallbacks for older Python) that flow from gateway ingress
through inference to egress, helpers to create and track traces in
Redis (the ``sg:trace:*`` keyspace) under an absolute failsafe TTL,
and a :class:`TraceWatchdog` that sweeps traces abandoned by a
crashed worker so they don't leak.
"""
import logging
import time
import uuid

# ``uuid6`` is an optional third-party package; when it is not installed the
# name is bound to ``None`` so callers can test for it with a truthiness check.
try:
    import uuid6
except ImportError:
    uuid6 = None

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Expiry, in seconds, applied to every trace hash when it is created: 24 hours.
ABSOLUTE_FAILSAFE_TTL = 24 * 60 * 60
def generate_trace_id() -> str:
    """Return a new trace ID, time-sortable (UUIDv7) whenever possible.

    Three sources are tried in order of preference:

    1. ``uuid.uuid7`` from the standard library (Python 3.14+).
    2. ``uuid7`` from the optional ``uuid6`` package, used when the standard
       library call raises ``AttributeError`` and the package was importable.
    3. ``uuid.uuid4`` as a last resort -- still unique, but random rather
       than chronologically ordered.

    The function performs no I/O.

    Returns:
        The generated UUID in its canonical string form.
    """
    try:
        trace_uuid = uuid.uuid7()
    except AttributeError:
        # No native uuid7 on this interpreter: use the optional package if it
        # was importable, otherwise give up time-ordering and use uuid4.
        trace_uuid = uuid6.uuid7() if uuid6 else uuid.uuid4()
    return str(trace_uuid)
async def create_trace(redis, trace_id: str, initial_state: str, metadata: dict):
    """Register a new trace in Redis with an absolute failsafe TTL.

    Three commands are queued on a single Redis pipeline and sent with one
    ``execute()`` call:

    * ``HSET sg:trace:{trace_id}`` with the caller metadata plus the reserved
      ``state`` and ``created_at`` fields;
    * ``EXPIRE`` on that key with ``ABSOLUTE_FAILSAFE_TTL`` so the hash is
      garbage-collected even if no watchdog ever sweeps it;
    * ``SADD sg:traces:active`` so :class:`TraceWatchdog` can find the trace.

    NOTE(review): whether the three commands run atomically depends on the
    pipeline's transaction setting (redis-py pipelines default to MULTI/EXEC);
    confirm for the client in use.

    Args:
        redis: Async Redis client exposing ``pipeline()``.
        trace_id: Trace correlation ID, used to build the ``sg:trace:{id}`` key.
        initial_state: Starting lifecycle state stored under the ``state`` field.
        metadata: Extra fields merged into the trace hash. ``None`` or an empty
            dict is treated as "no extra fields". Keys named ``state`` or
            ``created_at`` are ignored in favour of the values set here.
    """
    trace_key = f"sg:trace:{trace_id}"

    # Reserved fields are written last so caller metadata cannot overwrite
    # them: the watchdog parses ``created_at`` as a float and decides on
    # ``state``, so both must hold the values produced by this function.
    fields = {
        **(metadata or {}),
        "state": initial_state,
        "created_at": str(time.time()),
    }

    pipe = redis.pipeline()
    pipe.hset(trace_key, mapping=fields)
    # Absolute failsafe: even if the watchdog dies, this trace is garbage
    # collected once the TTL elapses.
    pipe.expire(trace_key, ABSOLUTE_FAILSAFE_TTL)
    # Index the trace so the watchdog's sweep can discover it.
    pipe.sadd("sg:traces:active", trace_id)
    await pipe.execute()
class TraceWatchdog:
    """Reaper for traces left behind in the ``sg:traces:active`` index.

    The class holds the timing constants and a single :meth:`sweep` pass; the
    loop that calls :meth:`sweep` every ``SWEEP_INTERVAL`` seconds is not part
    of this class. Per the original author's note it is intended to run under
    TaskSupervisor leader election so that only one instance is active
    cluster-wide.
    """

    SWEEP_INTERVAL = 300  # 5 minutes between sweeps
    ORPHAN_THRESHOLD = 1800  # 30 minutes before a trace counts as orphaned
    TERMINAL_TTL = 3600  # 1 hour retention once a trace is marked ERRORED

    # States that mean the trace already finished and must not be rewritten.
    _TERMINAL_STATES = ("COMPLETED", "ERRORED")

    @staticmethod
    def _as_text(value):
        """Return a Redis reply as ``str`` for both bytes and decoded clients."""
        if isinstance(value, (bytes, bytearray)):
            return value.decode()
        return value

    async def sweep(self, redis):
        """Mark long-running, unfinished traces as ERRORED and retire them.

        For every id in the ``sg:traces:active`` set:

        * if the ``sg:trace:{id}`` hash has no ``created_at`` (the key is
          gone), the stale id is removed from the active set;
        * if ``created_at`` cannot be parsed as a number, a warning is logged
          and the trace is skipped so one bad record cannot abort the pass
          (its id is dropped by a later sweep once the hash has expired);
        * if the trace is older than ``ORPHAN_THRESHOLD`` and its ``state`` is
          set to something other than ``COMPLETED`` / ``ERRORED``, the state
          is set to ``ERRORED``, the key gets a ``TERMINAL_TTL`` expiry, the
          id is removed from the active set, and a ``trace_orphaned`` warning
          is logged.

        Replies are accepted as either ``bytes`` or ``str``, so the sweep works
        whether or not the client was created with ``decode_responses=True``.

        Args:
            redis: Async Redis client providing ``smembers``, ``hget``,
                ``hset``, ``expire`` and ``srem``.
        """
        active = await redis.smembers("sg:traces:active")
        now = time.time()

        for raw_id in active:
            trace_id = self._as_text(raw_id)
            trace_key = f"sg:trace:{trace_id}"

            created = await redis.hget(trace_key, "created_at")
            if created is None:
                # The trace hash no longer exists (e.g. expired by its TTL);
                # only the index entry is left to clean up.
                await redis.srem("sg:traces:active", trace_id)
                continue

            try:
                # float() accepts both str and bytes replies.
                age = now - float(created)
            except ValueError:
                logger.warning(
                    "trace_bad_created_at trace_id=%s created_at=%r",
                    trace_id,
                    created,
                )
                continue

            if age <= self.ORPHAN_THRESHOLD:
                continue

            state = await redis.hget(trace_key, "state")
            if not state or self._as_text(state) in self._TERMINAL_STATES:
                continue

            await redis.hset(trace_key, "state", "ERRORED")
            await redis.expire(trace_key, self.TERMINAL_TTL)
            await redis.srem("sg:traces:active", trace_id)
            logger.warning(
                "trace_orphaned trace_id=%s age_s=%s", trace_id, age
            )