Source code for core.trace

"""Distributed trace IDs and orphaned-trace cleanup.

Provides time-sortable UUIDv7 trace IDs (:func:`generate_trace_id`,
with fallbacks for older Python) that flow from gateway ingress
through inference to egress, helpers to create and track traces in
Redis (the ``sg:trace:*`` keyspace) under an absolute failsafe TTL,
and a :class:`TraceWatchdog` that sweeps traces abandoned by a
crashed worker so they don't leak.
"""

import time
import uuid
import logging

try:
    import uuid6
except ImportError:
    uuid6 = None

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Expiry, in seconds, that create_trace puts on every ``sg:trace:{id}`` key so
# a trace is eventually evicted by Redis even if no watchdog sweep ever runs.
ABSOLUTE_FAILSAFE_TTL = 86400  # 24 hours

def generate_trace_id() -> str:
    """Return a fresh trace ID, time-sortable whenever a UUIDv7 source exists.

    The ID is produced by the first generator that is available, in this
    order of preference:

    1. ``uuid.uuid7`` from the standard library (Python 3.14+).
    2. ``uuid7`` from the optional third-party ``uuid6`` package, which the
       module imports at load time (``uuid6`` is ``None`` when missing).
    3. ``uuid.uuid4`` -- random, so chronological ordering is lost, but the
       call still succeeds.

    No Redis access or other I/O is performed.

    Returns:
        The trace ID rendered as a string.
    """
    try:
        trace_uuid = uuid.uuid7()
    except AttributeError:
        # This interpreter's stdlib predates uuid7: use the uuid6 backport
        # when it was importable, otherwise settle for a random uuid4.
        make_uuid = uuid6.uuid7 if uuid6 else uuid.uuid4
        trace_uuid = make_uuid()
    return str(trace_uuid)
async def create_trace(redis, trace_id: str, initial_state: str, metadata: dict):
    """Register a new trace in Redis with an absolute failsafe TTL.

    Three commands are queued on one Redis pipeline and sent with a single
    ``execute()``:

    * ``HSET sg:trace:{trace_id}`` with the caller metadata plus the
      reserved ``state`` and ``created_at`` fields,
    * ``EXPIRE`` on that key with ``ABSOLUTE_FAILSAFE_TTL`` so the trace is
      garbage-collected even if the watchdog never runs,
    * ``SADD sg:traces:active`` so :class:`TraceWatchdog` can find the trace.

    The reserved fields always win over same-named metadata keys:
    ``TraceWatchdog.sweep`` parses ``created_at`` as a float and branches on
    ``state``, so letting metadata overwrite them could corrupt the trace
    lifecycle.

    Args:
        redis: Async Redis client exposing ``pipeline()``; the pipeline's
            ``execute()`` is awaited.
        trace_id: Trace correlation ID, used to build the
            ``sg:trace:{id}`` key and stored in the active index.
        initial_state: Starting lifecycle state stored under ``state``
            (e.g. ``"RECEIVED"``).
        metadata: Extra fields merged into the trace hash (e.g. platform,
            channel). ``None`` or an empty dict adds nothing.
    """
    key = f"sg:trace:{trace_id}"

    # Metadata is spread first so the reserved lifecycle fields below
    # overwrite any colliding keys rather than being clobbered by them.
    mapping = {
        **(metadata or {}),
        "state": initial_state,
        "created_at": str(time.time()),
    }

    pipe = redis.pipeline()
    pipe.hset(key, mapping=mapping)

    # Absolute failsafe -- even if the watchdog dies, this trace will be
    # garbage collected within 24 hours.
    pipe.expire(key, ABSOLUTE_FAILSAFE_TTL)

    # Add to the active trace index scanned by the watchdog.
    pipe.sadd("sg:traces:active", trace_id)

    await pipe.execute()
class TraceWatchdog:
    """Sweeps orphaned traces every 5 minutes.

    Runs under TaskSupervisor leader election to ensure exactly one
    instance is active cluster-wide. The scheduling loop itself is not part
    of this class; it only provides the cadence constants and a single
    :meth:`sweep` pass.
    """

    SWEEP_INTERVAL = 300  # 5 minutes, in seconds
    ORPHAN_THRESHOLD = 1800  # 30 minutes, in seconds

    @staticmethod
    def _as_text(value):
        """Return a Redis reply as ``str`` whether the client yields bytes or str.

        Clients created with ``decode_responses=True`` already return ``str``;
        other values (including ``None``) are passed through unchanged.
        """
        if isinstance(value, (bytes, bytearray)):
            return value.decode()
        return value

    async def sweep(self, redis):
        """Mark traces abandoned by a crashed worker as ERRORED and retire them.

        One pass over the ``sg:traces:active`` set. For each id:

        * If the ``sg:trace:{id}`` hash has no ``created_at``, the key has
          already expired (24h failsafe), so the stale id is removed from
          the active set.
        * If ``created_at`` cannot be parsed as a float, a warning is logged
          and the trace is skipped so one bad record cannot abort the sweep.
        * If the trace is older than ``ORPHAN_THRESHOLD`` and its ``state``
          is set and is neither ``COMPLETED`` nor ``ERRORED``, the state is
          set to ``ERRORED``, a one-hour terminal TTL is applied, the id is
          removed from the active set, and a ``trace_orphaned`` warning is
          logged.

        Replies may be ``bytes`` or ``str``; both are handled.

        Args:
            redis: Async Redis client used for ``SMEMBERS`` / ``HGET`` /
                ``HSET`` / ``EXPIRE`` / ``SREM`` against the trace keyspace.
        """
        active = await redis.smembers("sg:traces:active")
        now = time.time()

        for raw_id in active:
            trace_id = self._as_text(raw_id)
            key = f"sg:trace:{trace_id}"

            created = await redis.hget(key, "created_at")
            if created is None:
                # Trace key was already expired by the 24h failsafe.
                await redis.srem("sg:traces:active", trace_id)
                continue

            try:
                age = now - float(self._as_text(created))
            except (TypeError, ValueError):
                # Leave the record for the failsafe TTL to collect instead
                # of letting it poison this and every later sweep.
                logger.warning(
                    "trace_bad_created_at trace_id=%s created_at=%r",
                    trace_id,
                    created,
                )
                continue

            if age <= self.ORPHAN_THRESHOLD:
                continue

            state = self._as_text(await redis.hget(key, "state"))
            if not state or state in ("COMPLETED", "ERRORED"):
                continue

            await redis.hset(key, "state", "ERRORED")
            await redis.expire(key, 3600)  # 1h terminal TTL
            await redis.srem("sg:traces:active", trace_id)
            logger.warning("trace_orphaned trace_id=%s age_s=%s", trace_id, age)