"""Dead Letter Queue (DLQ) handling for failed Redis Stream messages.
Provides the parking lot for inbound/outbound envelopes that the stream
consumers could not process after repeated retries, plus the tooling to
inspect and replay them. Once a message has failed ``MAX_RETRIES`` times it is
re-published to the ``sg:stream:dlq`` stream (approximately capped at 5000
entries) and only then acknowledged on its source stream, so it leaves the
live pipeline without being lost and operators can later resurrect it via
:func:`replay_dlq_entry`.
The functions here are the seam between the consumers in
``core/stream_consumer.py`` and ``core/outbound_consumer.py`` (which call
:func:`handle_failed_message` from their error paths) and the operational
``scripts/dlq_replay.py`` CLI (which drives :func:`replay_dlq_entry`).
All payloads are msgpack-framed by ``core.serialization``.
"""
import logging
from typing import Any
from redis.asyncio import Redis
from core.serialization import deserialize_stream_payload
# Redis Stream that holds parked (dead-lettered) messages.
DLQ_STREAM: str = "sg:stream:dlq"
# Failure count at which handle_failed_message stops leaving a message in the
# PEL for redelivery and moves it to DLQ_STREAM instead.
MAX_RETRIES: int = 3
# Module logger, registered under an explicit "stargazer.dlq" name.
logger: logging.Logger = logging.getLogger("stargazer.dlq")
[docs]
async def handle_failed_message(
    redis: Redis,
    source_stream: str,
    group_name: str,
    msg_id: str,
    raw: dict[Any, Any],
    error: Exception,
    attempt: int,
) -> None:
    """Move a message to the DLQ after MAX_RETRIES failures.

    If ``attempt < MAX_RETRIES`` the message is deliberately left
    un-acknowledged, so it stays in the consumer group's PEL for XAUTOCLAIM
    to reassign. If ``attempt >= MAX_RETRIES`` the payload (plus its ``ts``
    and ``trace_id`` sidecars and the failure metadata) is XADDed to
    ``sg:stream:dlq``, approximately trimmed to 5000 entries. Only after
    that write succeeds is the original XACKed on ``source_stream``.

    Args:
        redis: Async Redis client connected to the stream backend.
        source_stream: Stream the failed message was read from.
        group_name: Consumer group holding the pending entry.
        msg_id: Stream id of the failed message on ``source_stream``.
        raw: Raw field mapping of the stream entry.
        error: Exception raised while processing the message.
        attempt: Number of failed processing attempts so far.
    """
    error_text = str(error)

    if attempt < MAX_RETRIES:
        logger.info(
            "Message processing failed (attempt %d/%d), leaving in PEL for retry",
            attempt,
            MAX_RETRIES,
            extra={"stream_msg_id": msg_id, "error": error_text},
        )
        return

    # Only decode the envelope on the path that actually uses it.
    # NOTE(review): extract_stream_payload_bytes / extract_stream_aux_fields
    # are not imported in this module's visible import block -- confirm they
    # are in scope (presumably from core.serialization).
    payload_bytes = extract_stream_payload_bytes(raw)
    aux = extract_stream_aux_fields(raw)

    # redis-py rejects None field values with a DataError. If XADD raised
    # here, the message would stay pending and never reach the DLQ, so
    # missing values are coerced to empty ones instead.
    if payload_bytes is None:
        payload_bytes = b""
    ts = aux.get("ts")
    trace_id = aux.get("trace_id")

    logger.warning(
        "Moving message to DLQ after %d failures",
        attempt,
        extra={
            "stream_msg_id": msg_id,
            "source_stream": source_stream,
            "error": error_text,
        },
        exc_info=error,
    )
    await redis.xadd(
        DLQ_STREAM,
        {
            "original_stream": source_stream,
            "original_msg_id": msg_id,
            "error": error_text,
            "attempt": str(attempt),
            "data": payload_bytes,
            "ts": "" if ts is None else ts,
            "trace_id": "" if trace_id is None else trace_id,
        },
        maxlen=5000,
        approximate=True,
    )
    # ACK only after the DLQ write succeeded. If XADD raises, the message
    # remains pending on the source stream instead of being dropped.
    await redis.xack(source_stream, group_name, msg_id)
[docs]
async def inspect_dlq_entry(redis: Redis, msg_id: str) -> dict[str, Any] | None:
    """Fetch and decode a single DLQ entry into an operator-friendly dict.

    Reads one entry from the ``sg:stream:dlq`` stream by exact id, via an
    ``XRANGE`` bounded to ``msg_id``..``msg_id``. It unpacks the failure
    metadata that :func:`handle_failed_message` recorded: the originating
    stream, original message id, error string, attempt count, and the
    preserved ``ts`` / ``trace_id`` sidecars. When a body is present, the
    msgpack payload itself is decoded through
    ``core.serialization.deserialize_stream_payload``.

    A payload that fails to deserialize is logged at debug and reported as
    ``None`` rather than aborting. Metadata bytes that are not valid UTF-8
    are decoded with replacement characters. Both choices mean a poison
    message can still be inspected.

    Issues one read against Redis and performs no writes. No internal callers
    were found by grep; this is an inspection/diagnostic helper intended for
    DLQ tooling and interactive use.

    Args:
        redis: Async Redis client connected to the stream backend.
        msg_id: The DLQ stream entry id to look up.

    Returns:
        A dict with ``dlq_msg_id``, ``original_stream``, ``original_msg_id``,
        ``error``, ``attempt``, ``ts``, ``trace_id`` and the decoded
        ``payload`` (possibly ``None``). Returns ``None`` when no entry
        exists at that id. A metadata field absent from the entry is
        reported as ``None``; a field stored with an empty value is reported
        as ``""``.
    """
    rows = await redis.xrange(DLQ_STREAM, msg_id, msg_id)
    if not rows:
        return None
    entry_id, raw = rows[0]

    def _field(name: str) -> Any:
        # Keys are bytes or str depending on the client's decode_responses
        # setting. Test key presence rather than value truthiness, so a
        # stored empty value (e.g. an exception whose str() is "") is
        # reported as "" instead of falling through to None.
        key = name.encode()
        value = raw[key] if key in raw else raw.get(name)
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="replace")
        return value

    payload: dict[str, Any] | None = None
    if extract_stream_payload_bytes(raw):
        try:
            payload = deserialize_stream_payload(raw)
        except Exception:
            # Best-effort by design: the metadata is still worth showing.
            logger.debug("DLQ entry %s could not be deserialized", msg_id, exc_info=True)

    return {
        "dlq_msg_id": entry_id.decode() if isinstance(entry_id, bytes) else str(entry_id),
        "original_stream": _field("original_stream"),
        "original_msg_id": _field("original_msg_id"),
        "error": _field("error"),
        "attempt": _field("attempt"),
        "ts": _field("ts"),
        "trace_id": _field("trace_id"),
        "payload": payload,
    }
[docs]
async def replay_dlq_entry(
    redis: Redis,
    dlq_msg_id: str,
    *,
    target_stream: str | None = None,
    migration_fn: Any | None = None,
) -> str | None:
    """Resurrect one DLQ entry back onto a live stream, optionally migrating it.

    Reads the entry by exact id from ``sg:stream:dlq``. It re-publishes the
    msgpack body, with the preserved ``ts`` and ``trace_id`` sidecars, onto
    a target stream via ``XADD``. It then deletes the DLQ row with ``XDEL``
    so a successful replay is not double-counted.

    The destination defaults to the ``original_stream`` captured at park
    time but can be overridden via ``target_stream`` (used to redirect to a
    different stream). When ``migration_fn`` is supplied, the payload is
    msgpack-unpacked, passed through that callable, and re-packed before
    publishing. This lets an operator reshape an envelope to a newer schema
    during replay.

    Any of these conditions is logged and aborts the replay (returns
    ``None``) without touching the streams:

    * an empty payload;
    * an entry with no recorded ``original_stream`` when no
      ``target_stream`` is given;
    * a failing migration.

    Performs an ``XRANGE`` read plus, on success, an ``XADD`` and an ``XDEL``
    against Redis. Called by ``scripts/dlq_replay.py`` (the operator CLI that
    replays entries one-by-one or in bulk) and exercised by the migration tests
    in ``tests/core/migration/test_dlq_replay.py`` and
    ``tests/core/test_context_assembly_hardening.py``.

    Args:
        redis: Async Redis client connected to the stream backend.
        dlq_msg_id: The DLQ stream entry id to replay.
        target_stream: Optional override destination stream; when ``None`` the
            entry returns to the stream it originally failed on.
        migration_fn: Optional callable applied to the unpacked payload to
            transform it before re-publishing.

    Returns:
        The new stream message id of the re-published entry, or ``None`` when
        the entry is missing, has an empty payload, has no resolvable
        destination, or migration failed.
    """
    rows = await redis.xrange(DLQ_STREAM, dlq_msg_id, dlq_msg_id)
    if not rows:
        return None
    _, raw = rows[0]

    data_bytes = extract_stream_payload_bytes(raw)
    if not data_bytes:
        logger.warning("DLQ entry %s has empty payload — cannot replay", dlq_msg_id)
        return None

    # Resolve the destination before running any migration.
    stream = target_stream
    if not stream:
        orig = raw.get(b"original_stream") or raw.get("original_stream")
        if isinstance(orig, bytes):
            orig = orig.decode()
        if not orig:
            # Without this guard, str(None) would publish to a stream
            # literally named "None" and then XDEL the DLQ row, losing it.
            logger.error(
                "DLQ entry %s has no original_stream and no target_stream was given — cannot replay",
                dlq_msg_id,
            )
            return None
        stream = str(orig)

    if migration_fn is not None:
        try:
            import msgpack

            payload = msgpack.unpackb(data_bytes, raw=False)
            migrated = migration_fn(payload)
            data_bytes = msgpack.packb(migrated, use_bin_type=True)
        except Exception as e:
            logger.error(
                "Migration function failed for %s: %s", dlq_msg_id, e, exc_info=True
            )
            return None

    # Sidecars are re-published as bytes; a missing value becomes b"".
    ts_val = raw.get(b"ts") or raw.get("ts") or b""
    trace_val = raw.get(b"trace_id") or raw.get("trace_id") or b""
    if isinstance(ts_val, str):
        ts_val = ts_val.encode("utf-8")
    if isinstance(trace_val, str):
        trace_val = trace_val.encode("utf-8")

    new_id = await redis.xadd(
        stream,
        {b"data": data_bytes, b"ts": ts_val, b"trace_id": trace_val},
    )
    # Delete only after the re-publish succeeded, so a failed XADD leaves the
    # entry in the DLQ for another attempt.
    await redis.xdel(DLQ_STREAM, dlq_msg_id)
    return new_id.decode() if isinstance(new_id, bytes) else str(new_id)