core.dlq module

Dead Letter Queue (DLQ) handling for failed Redis Stream messages.

Provides the parking lot for inbound/outbound envelopes that the stream consumers could not process after repeated retries, plus the tooling to inspect and replay them. Failed messages are acknowledged on their source stream and re-published to the sg:stream:dlq stream (capped at 5000 entries). This removes them from the live pipeline without losing them, and operators can later resurrect them via replay_dlq_entry().

The functions here are the seam between the consumers in core/stream_consumer.py and core/outbound_consumer.py (which call handle_failed_message() from their error paths) and the operational scripts/dlq_replay.py CLI (which drives replay_dlq_entry()). All payloads are msgpack-framed by core.serialization.

core.dlq.extract_stream_payload_bytes(raw)

Extract the raw msgpack-framed payload from a Redis Stream entry.

Pulls the data field out of a single XRANGE/XREADGROUP entry, tolerating both the bytes-keyed (b"data") and string-keyed ("data") forms that redis.asyncio may return depending on decode settings, and coercing a string value back to utf-8 bytes. This keeps the rest of the DLQ code agnostic to how the connection was configured. A missing payload yields b"" rather than raising, which callers treat as an empty entry.

A pure helper that touches no Redis or other I/O. Called by handle_failed_message(), inspect_dlq_entry(), and replay_dlq_entry() in this module, and exercised directly by tests/core/test_context_assembly_hardening.py.

Parameters:

raw (dict[Any, Any]) – The field/value mapping of one stream entry, keyed by either bytes or str.

Return type:

bytes

Returns:

The msgpack-encoded payload bytes, or b"" when no data field is present.
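
A minimal sketch of the key probing described above, written against plain dicts so it runs without Redis. The _sketch suffix marks it as illustrative: it is not the core.dlq source, and the real function may differ in detail.

```python
from typing import Any, Dict


def extract_stream_payload_bytes_sketch(raw: Dict[Any, Any]) -> bytes:
    """Return the data field of one stream entry as bytes, or b"" if absent (sketch)."""
    # redis.asyncio yields bytes keys by default and str keys when the client
    # was created with decode_responses=True, so probe both spellings.
    value = raw.get(b"data")
    if value is None:
        value = raw.get("data")
    if value is None:
        return b""  # missing payload: callers treat this as an empty entry
    if isinstance(value, str):
        return value.encode("utf-8")  # coerce a decoded string back to bytes
    return bytes(value)
```

Checking for None rather than truthiness keeps a present-but-empty data field on the same path as a normal payload; both it and a missing field end up as b"".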

core.dlq.extract_stream_aux_fields(raw)

Extract the ts and trace_id sidecar fields from a stream entry.

Collects the non-payload metadata that travels alongside the msgpack body so it can be carried forward when a message is moved to the DLQ, preserving the original enqueue timestamp and the cross-service trace id used for grep-based correlation across the gateway / inference / agents pipeline. Both bytes- and str-keyed variants are probed, missing values are skipped, and every surviving value is normalised to a str (decoding bytes with errors="replace").

A pure helper with no I/O. Called by handle_failed_message() to populate the ts and trace_id fields of the DLQ record.

Parameters:

raw (dict[Any, Any]) – The field/value mapping of one stream entry, keyed by either bytes or str.

Return type:

dict[str, str]

Returns:

A mapping of the present auxiliary field names to their string values; empty when neither field is set.
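
The same probing pattern extends to the sidecar fields. This is an illustrative sketch, not the actual implementation; the AUX_FIELDS tuple and the _sketch name are hypothetical, and checking the bytes key before the str key is an assumed precedence.

```python
from typing import Any, Dict

AUX_FIELDS = ("ts", "trace_id")  # hypothetical constant naming the two sidecars


def extract_stream_aux_fields_sketch(raw: Dict[Any, Any]) -> Dict[str, str]:
    """Collect ts / trace_id from one stream entry as str values (sketch)."""
    out: Dict[str, str] = {}
    for name in AUX_FIELDS:
        # Probe the bytes-keyed form first, then the str-keyed form.
        value = raw.get(name.encode())
        if value is None:
            value = raw.get(name)
        if value is None:
            continue  # absent fields are skipped, not defaulted
        if isinstance(value, bytes):
            value = value.decode("utf-8", errors="replace")
        out[name] = str(value)
    return out
```

Because bytes are decoded with errors="replace", a corrupted trace id still yields a printable string (with U+FFFD in place of bad bytes) instead of aborting the move to the DLQ.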

async core.dlq.handle_failed_message(redis, source_stream, group_name, msg_id, raw, error, attempt)

Move a message to the DLQ after MAX_RETRIES failures.

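If attempt < MAX_RETRIES, the message is left unacknowledged in the consumer group's pending entries list (PEL) so that XAUTOCLAIM can reassign it. Once attempt >= MAX_RETRIES, the original is acknowledged with XACK and the message is re-published to the DLQ with XADD, together with its ts and trace_id sidecars and the failure metadata (originating stream, original message id, error, attempt count).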
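
A sketch of the retry-or-park decision using real redis.asyncio stream calls (xadd with maxlen, xack). MAX_RETRIES = 3 is a hypothetical placeholder, the DLQ field names are assumptions chosen to match what inspect_dlq_entry() reports, and whether core.dlq trims the DLQ approximately or exactly is not stated in the source.

```python
from typing import Any, Dict, Union

MAX_RETRIES = 3            # hypothetical placeholder; the real constant lives in core.dlq
DLQ_STREAM = "sg:stream:dlq"
DLQ_MAXLEN = 5000          # cap stated in the module overview


async def handle_failed_message_sketch(
    redis: Any,
    source_stream: str,
    group_name: str,
    msg_id: str,
    raw: Dict[Any, Any],
    error: Union[BaseException, str],
    attempt: int,
) -> None:
    """Park a message in the DLQ once it has exhausted its retries (sketch)."""
    if attempt < MAX_RETRIES:
        # Leave it unacknowledged: the entry stays in the group's PEL,
        # where XAUTOCLAIM can later reassign it to another consumer.
        return

    data = raw.get(b"data", raw.get("data", b""))
    if isinstance(data, str):
        data = data.encode("utf-8")

    # Assumed DLQ field names, chosen to mirror what inspect_dlq_entry() reports.
    fields: Dict[str, Any] = {
        "original_stream": source_stream,
        "original_msg_id": msg_id,
        "error": str(error),
        "attempt": str(attempt),
        "data": data,
    }
    # Carry the ts / trace_id sidecars forward as str values.
    for key in ("ts", "trace_id"):
        value = raw.get(key.encode(), raw.get(key))
        if value is not None:
            fields[key] = (
                value.decode("utf-8", errors="replace")
                if isinstance(value, bytes)
                else str(value)
            )

    # XADD before XACK: a crash between the two leaves the message pending
    # (and redeliverable) instead of acknowledged but never parked.
    await redis.xadd(DLQ_STREAM, fields, maxlen=DLQ_MAXLEN, approximate=True)
    await redis.xack(source_stream, group_name, msg_id)
```

Writing to the DLQ before acknowledging is a choice of this sketch; the worst case is a duplicate DLQ row rather than a lost message.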

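Parameters:
  • redis – Async Redis client connected to the stream backend.

  • source_stream – The stream the message was read from; recorded as the DLQ entry's originating stream.

  • group_name – The consumer group whose pending entries list holds the message; used for the XACK.

  • msg_id – The id of the failed message on source_stream.

  • raw – The field/value mapping of the failed stream entry, keyed by either bytes or str.

  • error – The failure to record as the DLQ entry's error string.

  • attempt – The number of attempts made so far, compared against MAX_RETRIES.

Return type:

None
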
async core.dlq.inspect_dlq_entry(redis, msg_id)

Fetch and decode a single DLQ entry into an operator-friendly dict.

Reads one entry from the sg:stream:dlq stream by exact id (via an XRANGE whose start and end are both msg_id) and unpacks both the failure metadata that handle_failed_message() recorded – the originating stream, original message id, error string, and attempt count – and, when a body is present, the msgpack payload itself through core.serialization.deserialize_stream_payload. A payload that fails to deserialize is logged at debug level and reported as None rather than aborting, so a poison message can still be inspected.

Issues one read against Redis and performs no writes. No internal callers were found by grep; this is an inspection/diagnostic helper intended for DLQ tooling and interactive use.

Parameters:
  • redis (Redis) – Async Redis client connected to the stream backend.

  • msg_id (str) – The DLQ stream entry id to look up.

Return type:

dict[str, Any] | None

Returns:

A dict with dlq_msg_id, original_stream, original_msg_id, error, attempt, and the decoded payload (possibly None), or None when no entry exists at that id.
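
A sketch of the exact-id lookup and tolerant decoding. json.loads stands in for core.serialization.deserialize_stream_payload so the example runs without msgpack; the stored DLQ field names, the "payload" key, and the int conversion of attempt are assumptions.

```python
import json
import logging
from typing import Any, Callable, Dict, Optional

log = logging.getLogger(__name__)
DLQ_STREAM = "sg:stream:dlq"


def _field(fields: Dict[Any, Any], name: str) -> Any:
    """Look a field up under its bytes key first, then its str key."""
    value = fields.get(name.encode())
    return fields.get(name) if value is None else value


def _text(value: Any) -> Optional[str]:
    if value is None:
        return None
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value)


async def inspect_dlq_entry_sketch(
    redis: Any,
    msg_id: str,
    deserialize: Callable[[bytes], Any] = json.loads,  # stand-in for deserialize_stream_payload
) -> Optional[Dict[str, Any]]:
    # XRANGE with min == max returns at most the one entry carrying that id.
    entries = await redis.xrange(DLQ_STREAM, min=msg_id, max=msg_id, count=1)
    if not entries:
        return None
    entry_id, fields = entries[0]

    body = _field(fields, "data")
    if isinstance(body, str):
        body = body.encode("utf-8")
    payload = None
    if body:
        try:
            payload = deserialize(body)
        except Exception:
            # Keep poison messages inspectable: log and report payload=None.
            log.debug("could not decode DLQ entry %s", msg_id, exc_info=True)

    attempt = _text(_field(fields, "attempt"))
    return {
        "dlq_msg_id": _text(entry_id),
        "original_stream": _text(_field(fields, "original_stream")),
        "original_msg_id": _text(_field(fields, "original_msg_id")),
        "error": _text(_field(fields, "error")),
        "attempt": int(attempt) if attempt is not None else None,
        "payload": payload,
    }
```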

async core.dlq.replay_dlq_entry(redis, dlq_msg_id, *, target_stream=None, migration_fn=None)

Resurrect one DLQ entry back onto a live stream, optionally migrating it.

Reads the entry by exact id from sg:stream:dlq, re-publishes its msgpack body (with the preserved ts and trace_id sidecars) onto a target stream via XADD, and then deletes the DLQ row with XDEL so a successful replay is not double-counted. The destination defaults to the original_stream captured at park time; pass target_stream to redirect the entry to a different stream. When migration_fn is supplied, the payload is msgpack-unpacked, passed through that callable, and re-packed before publishing, which lets an operator reshape an envelope to a newer schema during replay. An empty payload or a failing migration is logged and aborts the replay (returns None) without touching the streams.

Performs an XRANGE read plus, on success, an XADD and an XDEL against Redis. Called by scripts/dlq_replay.py (the operator CLI that replays entries one-by-one or in bulk) and exercised by the migration tests in tests/core/migration/test_dlq_replay.py and tests/core/test_context_assembly_hardening.py.

Parameters:
  • redis (Redis) – Async Redis client connected to the stream backend.

  • dlq_msg_id (str) – The DLQ stream entry id to replay.

  • target_stream (str | None) – Optional override destination stream; when None the entry returns to the stream it originally failed on.

  • migration_fn (Any | None) – Optional callable applied to the unpacked payload to transform it before re-publishing.

Return type:

str | None

Returns:

The new stream message id of the re-published entry, or None when the entry is missing, has an empty payload, or migration failed.
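
A sketch of the replay flow: exact-id read, optional migration, XADD to the destination, then XDEL from the DLQ. JSON stands in for msgpack so the example has no third-party dependencies, the DLQ field names are assumptions, and the guard for an entry with no recorded original_stream is this sketch's own addition.

```python
import json
import logging
from typing import Any, Callable, Dict, Optional

log = logging.getLogger(__name__)
DLQ_STREAM = "sg:stream:dlq"


def _field(fields: Dict[Any, Any], name: str) -> Any:
    """Look a field up under its bytes key first, then its str key."""
    value = fields.get(name.encode())
    return fields.get(name) if value is None else value


def _text(value: Any) -> Optional[str]:
    if value is None:
        return None
    return value.decode("utf-8", errors="replace") if isinstance(value, bytes) else str(value)


def _unpack(body: bytes) -> Any:   # stand-in for msgpack.unpackb
    return json.loads(body)


def _pack(obj: Any) -> bytes:      # stand-in for msgpack.packb
    return json.dumps(obj).encode("utf-8")


async def replay_dlq_entry_sketch(
    redis: Any,
    dlq_msg_id: str,
    *,
    target_stream: Optional[str] = None,
    migration_fn: Optional[Callable[[Any], Any]] = None,
) -> Optional[str]:
    entries = await redis.xrange(DLQ_STREAM, min=dlq_msg_id, max=dlq_msg_id, count=1)
    if not entries:
        return None
    _, fields = entries[0]

    body = _field(fields, "data") or b""
    if isinstance(body, str):
        body = body.encode("utf-8")
    if not body:
        log.warning("DLQ entry %s has an empty payload; not replaying", dlq_msg_id)
        return None

    if migration_fn is not None:
        try:
            body = _pack(migration_fn(_unpack(body)))
        except Exception:
            log.exception("migration failed for DLQ entry %s", dlq_msg_id)
            return None

    dest = target_stream if target_stream is not None else _text(_field(fields, "original_stream"))
    if not dest:
        # Sketch-only guard: nowhere to send an entry with no recorded origin.
        log.warning("DLQ entry %s has no destination stream", dlq_msg_id)
        return None

    out: Dict[str, Any] = {"data": body}
    for key in ("ts", "trace_id"):  # carry the preserved sidecars forward
        value = _field(fields, key)
        if value is not None:
            out[key] = value

    new_id = await redis.xadd(dest, out)
    # Remove the DLQ row only after the re-publish succeeded.
    await redis.xdel(DLQ_STREAM, dlq_msg_id)
    return _text(new_id)
```

In this sketch every early return happens before the XADD, so a rejected replay leaves the DLQ entry in place for a later attempt, and an exception from the XADD itself propagates without the XDEL running.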