# Source code for core.dlq

"""Dead Letter Queue (DLQ) handling for failed Redis Stream messages.

Provides the parking lot for inbound/outbound envelopes that the stream
consumers could not process after repeated retries, plus the tooling to
inspect and replay them. Failed messages are re-published to the
``sg:stream:dlq`` stream (approximately capped at 5000 entries via
``XADD MAXLEN ~``, so the oldest entries are trimmed beyond that cap) and
then acknowledged on their source stream. This removes them from the live
pipeline while keeping them available, and operators can later resurrect
them via :func:`replay_dlq_entry`.

The functions here are the seam between the consumers in
``core/stream_consumer.py`` and ``core/outbound_consumer.py`` (which call
:func:`handle_failed_message` from their error paths) and the operational
``scripts/dlq_replay.py`` CLI (which drives :func:`replay_dlq_entry`).
All payloads are msgpack-framed by ``core.serialization``.
"""

import logging
from typing import Any

from redis.asyncio import Redis

from core.serialization import deserialize_stream_payload

logger = logging.getLogger("stargazer.dlq")

MAX_RETRIES = 3
DLQ_STREAM = "sg:stream:dlq"


def extract_stream_payload_bytes(raw: dict[Any, Any]) -> bytes:
    """Return the msgpack-framed ``data`` payload of one Redis Stream entry.

    ``redis.asyncio`` returns entry fields keyed by ``bytes`` or by ``str``
    depending on the connection's decode settings. The bytes key
    ``b"data"`` is therefore probed first. The str key ``"data"`` is used
    only when the bytes key is absent or maps to ``None``.

    A ``str`` value is re-encoded as UTF-8 so callers receive ``bytes``.
    Any other value is handed back unchanged. No I/O is performed.

    Used by :func:`handle_failed_message`, :func:`inspect_dlq_entry` and
    :func:`replay_dlq_entry`.

    Args:
        raw: Field/value mapping of a single stream entry, keyed by bytes
            or str.

    Returns:
        The payload bytes, or ``b""`` when neither key holds a value.
    """
    for field_name in (b"data", "data"):
        value = raw.get(field_name)
        if value is not None:
            break
    else:
        # Neither the bytes- nor the str-keyed form carried a payload.
        return b""
    return value.encode("utf-8") if isinstance(value, str) else value


def extract_stream_aux_fields(raw: dict[Any, Any]) -> dict[str, str]:
    """Collect the ``ts`` and ``trace_id`` sidecar fields of a stream entry.

    These are the non-payload fields that travel alongside the msgpack
    body. They are carried forward when a message is parked in the DLQ.

    The keys are probed in the order ``b"ts"``, ``"ts"``, ``b"trace_id"``,
    ``"trace_id"``. Absent keys and ``None`` values are skipped. Because
    the str-keyed form is probed after the bytes-keyed form, it wins when
    both are present.

    Byte values are decoded as UTF-8 with ``errors="replace"``. Every
    surviving value is converted with ``str()``. No I/O is performed.

    Used by :func:`handle_failed_message`.

    Args:
        raw: Field/value mapping of a single stream entry, keyed by bytes
            or str.

    Returns:
        A mapping from ``"ts"`` / ``"trace_id"`` to string values,
        containing only the fields that were present. It is empty when
        neither field is set.
    """
    collected: dict[str, str] = {}
    for probe in (b"ts", "ts", b"trace_id", "trace_id"):
        if probe not in raw:
            continue
        value = raw[probe]
        if value is None:
            continue
        if isinstance(value, bytes):
            value = value.decode("utf-8", errors="replace")
        name = probe.decode() if isinstance(probe, bytes) else probe
        collected[str(name)] = str(value)
    return collected


async def handle_failed_message(
    redis: Redis,
    source_stream: str,
    group_name: str,
    msg_id: str,
    raw: dict[Any, Any],
    error: Exception,
    attempt: int,
) -> None:
    """Park a message in the DLQ once it has failed ``MAX_RETRIES`` times.

    Below the threshold nothing is written to Redis. The message is left in
    the consumer group's pending entries list (PEL) so ``XAUTOCLAIM`` can
    reassign it, and an info-level log line records the attempt.

    At or above the threshold the entry is copied to ``DLQ_STREAM`` with
    ``XADD``, using an approximate ``maxlen`` of 5000. The copy records:

    - the source stream and original message id;
    - the error text and the attempt count;
    - the payload bytes;
    - the ``ts`` and ``trace_id`` sidecars.

    Only then is the message acknowledged on the source stream with
    ``XACK``.

    Args:
        redis: Async Redis client connected to the stream backend.
        source_stream: Stream the failed message was read from.
        group_name: Consumer group holding the pending entry.
        msg_id: Stream id of the failed message.
        raw: Field/value mapping of the failed entry.
        error: Exception raised while processing the message.
        attempt: Number of processing attempts made so far.
    """
    payload = extract_stream_payload_bytes(raw)
    sidecars = extract_stream_aux_fields(raw)

    if attempt < MAX_RETRIES:
        logger.info(
            "Message processing failed (attempt %d/%d), leaving in PEL for retry",
            attempt,
            MAX_RETRIES,
            extra={"stream_msg_id": msg_id},
        )
        return

    reason = str(error)
    logger.warning(
        "Moving message to DLQ after %d failures",
        attempt,
        extra={
            "stream_msg_id": msg_id,
            "source_stream": source_stream,
            "error": reason,
        },
    )
    dlq_record = {
        "original_stream": source_stream,
        "original_msg_id": msg_id,
        "error": reason,
        "attempt": str(attempt),
        "data": payload,
        "ts": sidecars.get("ts", ""),
        "trace_id": sidecars.get("trace_id", ""),
    }
    await redis.xadd(DLQ_STREAM, dlq_record, maxlen=5000, approximate=True)
    # Acknowledge only after the DLQ copy was written. If XADD raises, the
    # XACK is never reached and the message stays pending.
    await redis.xack(source_stream, group_name, msg_id)


async def inspect_dlq_entry(redis: Redis, msg_id: str) -> dict[str, Any] | None:
    """Fetch and decode a single DLQ entry into an operator-friendly dict.

    Reads one entry from ``DLQ_STREAM`` by exact id, using an ``XRANGE``
    bounded to ``msg_id``..``msg_id``. It unpacks the failure metadata that
    :func:`handle_failed_message` recorded: originating stream, original
    message id, error string and attempt count.

    When a body is present, the msgpack payload is decoded through
    ``core.serialization.deserialize_stream_payload``. A payload that fails
    to deserialize is logged at debug level and reported as ``None``
    instead of aborting, so a poison message can still be inspected.

    Metadata fields are looked up under the bytes key first, then the str
    key. Only a missing or ``None`` value counts as absent, so an empty
    stored value is returned as ``""``. This matters for ``str(ValueError())``,
    which is ``""``. Byte values are decoded as UTF-8 with
    ``errors="replace"``.

    Issues one read against Redis and performs no writes. Intended for DLQ
    tooling and interactive use.

    Args:
        redis: Async Redis client connected to the stream backend.
        msg_id: The DLQ stream entry id to look up.

    Returns:
        A dict with ``dlq_msg_id``, ``original_stream``, ``original_msg_id``,
        ``error``, ``attempt`` and the decoded ``payload`` (possibly
        ``None``). Returns ``None`` when no entry exists at that id.
    """
    rows = await redis.xrange(DLQ_STREAM, msg_id, msg_id)
    if not rows:
        return None
    entry_id, raw = rows[0]

    payload: dict[str, Any] | None = None
    if extract_stream_payload_bytes(raw):
        try:
            payload = deserialize_stream_payload(raw)
        except Exception:
            # Best-effort: inspection must still work for a poison payload.
            logger.debug(
                "DLQ entry %s could not be deserialized", msg_id, exc_info=True
            )

    def _text_field(name: str) -> Any:
        # Probe the bytes key, then the str key. Only None counts as
        # missing, unlike ``a or b``, which would also discard b"".
        value = raw.get(name.encode())
        if value is None:
            value = raw.get(name)
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="replace")
        return value

    return {
        "dlq_msg_id": (
            entry_id.decode() if isinstance(entry_id, bytes) else str(entry_id)
        ),
        "original_stream": _text_field("original_stream"),
        "original_msg_id": _text_field("original_msg_id"),
        "error": _text_field("error"),
        "attempt": _text_field("attempt"),
        "payload": payload,
    }


async def replay_dlq_entry(
    redis: Redis,
    dlq_msg_id: str,
    *,
    target_stream: str | None = None,
    migration_fn: Any | None = None,
) -> str | None:
    """Resurrect one DLQ entry back onto a live stream, optionally migrating it.

    Reads the entry by exact id from ``DLQ_STREAM``. It re-publishes the
    msgpack body, with the preserved ``ts`` and ``trace_id`` sidecars, onto
    a target stream via ``XADD``. It then deletes the DLQ row with ``XDEL``
    so a successful replay is not double-counted.

    The destination defaults to the ``original_stream`` captured at park
    time. It can be overridden with ``target_stream``.

    When ``migration_fn`` is supplied, the payload is msgpack-unpacked,
    passed through that callable, and re-packed before publishing. This
    lets an operator reshape an envelope to a newer schema during replay.

    The replay is aborted, returning ``None`` without writing to any
    stream, when:

    - the entry has an empty payload;
    - the migration fails;
    - no destination is known (no ``target_stream`` and no stored
      ``original_stream``). Previously this case published to a stream
      literally named ``"None"`` and deleted the DLQ row.

    Called by ``scripts/dlq_replay.py`` and exercised by the DLQ replay
    tests.

    Args:
        redis: Async Redis client connected to the stream backend.
        dlq_msg_id: The DLQ stream entry id to replay.
        target_stream: Optional override destination stream. When ``None``
            or empty, the entry returns to the stream it originally failed
            on.
        migration_fn: Optional callable applied to the unpacked payload to
            transform it before re-publishing.

    Returns:
        The new stream message id of the re-published entry. Returns
        ``None`` when the entry is missing, has an empty payload, migration
        failed, or no destination stream could be determined.
    """
    rows = await redis.xrange(DLQ_STREAM, dlq_msg_id, dlq_msg_id)
    if not rows:
        return None
    _, raw = rows[0]

    data_bytes = extract_stream_payload_bytes(raw)
    if not data_bytes:
        logger.warning("DLQ entry %s has empty payload — cannot replay", dlq_msg_id)
        return None

    if migration_fn:
        try:
            # Local import: msgpack is only needed on the migration path.
            import msgpack

            payload = msgpack.unpackb(data_bytes, raw=False)
            migrated = migration_fn(payload)
            data_bytes = msgpack.packb(migrated, use_bin_type=True)
        except Exception as e:
            # migration_fn is arbitrary operator code. Any failure aborts
            # this replay and leaves the DLQ entry untouched.
            logger.error(
                "Migration function failed for %s: %s", dlq_msg_id, e, exc_info=True
            )
            return None

    stream = target_stream
    if not stream:
        orig = raw.get(b"original_stream")
        if orig is None:
            orig = raw.get("original_stream")
        if isinstance(orig, bytes):
            orig = orig.decode()
        if not orig:
            # Without this guard str(None) would route the message to a
            # stream named "None" and then XDEL it from the DLQ.
            logger.warning(
                "DLQ entry %s has no original_stream and no target_stream — cannot replay",
                dlq_msg_id,
            )
            return None
        stream = str(orig)

    ts_val = raw.get(b"ts") or raw.get("ts") or b""
    trace_val = raw.get(b"trace_id") or raw.get("trace_id") or b""
    if isinstance(ts_val, str):
        ts_val = ts_val.encode("utf-8")
    if isinstance(trace_val, str):
        trace_val = trace_val.encode("utf-8")

    new_id = await redis.xadd(
        stream,
        {b"data": data_bytes, b"ts": ts_val, b"trace_id": trace_val},
    )
    # Delete from the DLQ only after the re-publish succeeded. If XADD
    # raises, the entry stays parked.
    await redis.xdel(DLQ_STREAM, dlq_msg_id)
    return new_id.decode() if isinstance(new_id, bytes) else str(new_id)