core.dlq module
Dead Letter Queue (DLQ) handling for failed Redis Stream messages.
Provides the parking lot for inbound/outbound envelopes that the stream
consumers could not process after repeated retries, plus the tooling to
inspect and replay them. Failed messages are acknowledged on their source
stream and re-published to the sg:stream:dlq stream (capped at 5000
entries) so they are removed from the live pipeline without being lost,
and operators can later resurrect them via replay_dlq_entry().
The functions here are the seam between the consumers in
core/stream_consumer.py and core/outbound_consumer.py (which call
handle_failed_message() from their error paths) and the operational
scripts/dlq_replay.py CLI (which drives replay_dlq_entry()).
All payloads are msgpack-framed by core.serialization.
- core.dlq.extract_stream_payload_bytes(raw)
Extract the raw msgpack-framed payload from a Redis Stream entry.
Pulls the data field out of a single XRANGE/XREADGROUP entry. It tolerates both the bytes-keyed (b"data") and string-keyed ("data") forms that redis.asyncio may return depending on decode settings, and coerces a string value back to UTF-8 bytes. This keeps the rest of the DLQ code agnostic to how the connection was configured. A missing payload yields b"" rather than raising, and callers treat that as an empty entry.
A pure helper that touches no Redis or other I/O. Called by handle_failed_message(), inspect_dlq_entry(), and replay_dlq_entry() in this module, and exercised directly by tests/core/test_context_assembly_hardening.py.
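The key-tolerant lookup can be sketched as follows. This is a minimal illustration, not the module's source, and it assumes that raw is the field mapping of one entry (the dict half of a redis-py (id, fields) tuple) rather than the whole tuple.

```python
from typing import Any, Mapping


def extract_stream_payload_bytes(raw: Mapping[Any, Any]) -> bytes:
    """Return the msgpack-framed ``data`` field of one stream entry.

    Assumption: ``raw`` is the entry's field dict, keyed by bytes when the
    connection returns raw responses and by str when it decodes them.
    """
    # Probe the bytes-keyed form first, then the str-keyed form.
    value = raw.get(b"data")
    if value is None:
        value = raw.get("data")
    if value is None:
        # Missing payload: return b"" so callers see an empty entry.
        return b""
    if isinstance(value, str):
        # A decoding connection hands back str; turn it back into bytes.
        return value.encode("utf-8")
    return bytes(value)
```

Returning b"" instead of raising lets every caller use a single emptiness check, whichever way the Redis connection was configured.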
- core.dlq.extract_stream_aux_fields(raw)
Extract the ts and trace_id sidecar fields from a stream entry.
Collects the non-payload metadata that travels alongside the msgpack body so that it can be carried forward when a message is moved to the DLQ. This preserves the original enqueue timestamp and the cross-service trace id used for grep-based correlation across the gateway / inference / agents pipeline. Both bytes- and str-keyed variants are probed, missing values are skipped, and every surviving value is normalised to a str (bytes are decoded with errors="replace").
A pure helper with no I/O. Called by handle_failed_message() to populate the ts and trace_id fields of the DLQ record.
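The probing and normalisation described above can be sketched as below. It assumes, for illustration, that raw is the entry's field mapping and that the result is a plain dict containing only the sidecars that were present. The documented function's exact return shape is not stated here.

```python
from typing import Any, Mapping


def extract_stream_aux_fields(raw: Mapping[Any, Any]) -> dict[str, str]:
    """Collect the ``ts`` and ``trace_id`` sidecars as str values.

    Assumption: the return value is a dict holding only the fields found.
    """
    aux: dict[str, str] = {}
    for name in ("ts", "trace_id"):
        # Probe the bytes-keyed variant, then the str-keyed one.
        value = raw.get(name.encode())
        if value is None:
            value = raw.get(name)
        if value is None:
            continue  # missing sidecars are skipped, not defaulted
        if isinstance(value, bytes):
            # Undecodable bytes become U+FFFD instead of raising.
            value = value.decode("utf-8", errors="replace")
        aux[name] = str(value)
    return aux
```

Using errors="replace" means a corrupted trace id still produces a record that can be grepped, instead of failing the whole DLQ move.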
- async core.dlq.handle_failed_message(redis, source_stream, group_name, msg_id, raw, error, attempt)
Move a message to the DLQ after MAX_RETRIES failures.
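If attempt < MAX_RETRIES, the message is left unacknowledged in the consumer group's pending entries list (PEL), so that XAUTOCLAIM can reclaim it for another attempt. If attempt >= MAX_RETRIES, the original is acknowledged with XACK on source_stream and the message is re-published with XADD to the sg:stream:dlq stream.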
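The retry gate and the move to the DLQ can be sketched as follows. This sketch makes several assumptions. It targets redis-py's asyncio client (xadd with maxlen, and xack). MAX_RETRIES = 3 is an illustrative value. The DLQ record's field names are hypothetical, chosen to match the keys that inspect_dlq_entry() reports. The sketch also inlines the key probing that the two helpers above perform.

```python
from typing import Any, Mapping, Optional

# Illustrative values only; the real constants are defined in core.dlq.
MAX_RETRIES = 3
DLQ_STREAM = "sg:stream:dlq"
DLQ_MAXLEN = 5000


def _probe(raw: Mapping[Any, Any], name: str) -> Any:
    # Stream field maps may be bytes- or str-keyed depending on decode settings.
    return raw.get(name.encode(), raw.get(name))


async def handle_failed_message(
    redis: Any,
    source_stream: str,
    group_name: str,
    msg_id: str,
    raw: Mapping[Any, Any],
    error: Any,
    attempt: int,
) -> Optional[str]:
    """Park a message in the DLQ once it has used up its retries.

    Returns the new DLQ entry id, or None while retries remain.
    """
    if attempt < MAX_RETRIES:
        # No XACK: the entry stays in the PEL so XAUTOCLAIM can reclaim it.
        return None

    # Hypothetical field layout for the DLQ record.
    record: dict[str, Any] = {
        "data": _probe(raw, "data") or b"",
        "original_stream": source_stream,
        "original_msg_id": msg_id,
        "error": str(error),
        "attempt": str(attempt),
    }
    for key in ("ts", "trace_id"):
        value = _probe(raw, key)
        if value is not None:
            # Normalise sidecars to str, as extract_stream_aux_fields() does.
            if isinstance(value, bytes):
                value = value.decode("utf-8", errors="replace")
            record[key] = str(value)

    # Publish first, then acknowledge: a crash between the two calls leaves
    # the message pending on the source stream instead of losing it.
    # maxlen caps the DLQ; redis-py trims approximately (MAXLEN ~) by default.
    dlq_id = await redis.xadd(DLQ_STREAM, record, maxlen=DLQ_MAXLEN)
    await redis.xack(source_stream, group_name, msg_id)
    return dlq_id
```

Publishing before acknowledging is a choice made for this sketch. A crash between the two calls could then, at worst, park the message twice. The documented function may order the calls differently or wrap them in a pipeline.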
- async core.dlq.inspect_dlq_entry(redis, msg_id)
Fetch and decode a single DLQ entry into an operator-friendly dict.
Reads one entry from the sg:stream:dlq stream by exact id, via an XRANGE whose start and end are both msg_id. It unpacks the failure metadata that handle_failed_message() recorded (the originating stream, original message id, error string, and attempt count). When a body is present, it also unpacks the msgpack payload itself through core.serialization.deserialize_stream_payload. A payload that fails to deserialize is logged at debug level and reported as None instead of aborting the call, so a poison message can still be inspected.
Issues one read against Redis and performs no writes. No internal callers were found by grep; this is an inspection/diagnostic helper intended for DLQ tooling and interactive use.
- Parameters:
redis (Redis) – Async Redis client connected to the stream backend.
msg_id (str) – The DLQ stream entry id to look up.
- Return type:
- Returns:
A dict with dlq_msg_id, original_stream, original_msg_id, error, attempt, and the decoded payload (possibly None), or None when no entry exists at that id.
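The exact-id read and the tolerant decode can be sketched as follows, assuming redis-py's asyncio xrange(name, min, max, count). The deserialize argument is a hypothetical addition that stands in for core.serialization.deserialize_stream_payload so the sketch runs without the project; the documented signature takes only redis and msg_id. The stream field names and the conversion of attempt to int are also assumptions.

```python
import logging
from typing import Any, Callable, Mapping, Optional

logger = logging.getLogger(__name__)
DLQ_STREAM = "sg:stream:dlq"


def _text(fields: Mapping[Any, Any], name: str) -> Optional[str]:
    # Probe bytes- then str-keyed forms and decode bytes to str.
    value = fields.get(name.encode(), fields.get(name))
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return None if value is None else str(value)


async def inspect_dlq_entry(
    redis: Any,
    msg_id: str,
    deserialize: Callable[[bytes], Any],  # hypothetical stand-in parameter
) -> Optional[dict[str, Any]]:
    # XRANGE with start == end == msg_id yields at most the one matching entry.
    entries = await redis.xrange(DLQ_STREAM, min=msg_id, max=msg_id, count=1)
    if not entries:
        return None
    entry_id, fields = entries[0]

    data = fields.get(b"data", fields.get("data")) or b""
    if isinstance(data, str):
        data = data.encode("utf-8")
    payload = None
    if data:
        try:
            payload = deserialize(data)
        except Exception:
            # A poison payload stays inspectable: log it and report None.
            logger.debug("could not deserialize DLQ entry %s", msg_id, exc_info=True)

    attempt = _text(fields, "attempt")
    return {
        "dlq_msg_id": entry_id.decode() if isinstance(entry_id, bytes) else entry_id,
        "original_stream": _text(fields, "original_stream"),
        "original_msg_id": _text(fields, "original_msg_id"),
        "error": _text(fields, "error"),
        "attempt": int(attempt) if attempt is not None else None,  # assumed int
        "payload": payload,
    }
```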
- async core.dlq.replay_dlq_entry(redis, dlq_msg_id, *, target_stream=None, migration_fn=None)
Resurrect one DLQ entry back onto a live stream, optionally migrating it.
Reads the entry by exact id from sg:stream:dlq and re-publishes its msgpack body, together with the preserved ts and trace_id sidecars, onto a target stream via XADD. It then deletes the DLQ row with XDEL so that a successful replay is not double-counted. The destination defaults to the original_stream captured at park time; pass target_stream to redirect the entry to a different stream. When migration_fn is supplied, the payload is msgpack-unpacked, passed through that callable, and re-packed before publishing, which lets an operator reshape an envelope to a newer schema during replay. An empty payload or a failing migration is logged and aborts the replay (returns None) without touching the streams.
Performs an XRANGE read plus, on success, an XADD and an XDEL against Redis. Called by scripts/dlq_replay.py (the operator CLI that replays entries one-by-one or in bulk) and exercised by the migration tests in tests/core/migration/test_dlq_replay.py and tests/core/test_context_assembly_hardening.py.
- Parameters:
redis (Redis) – Async Redis client connected to the stream backend.
dlq_msg_id (str) – The DLQ stream entry id to replay.
target_stream (str | None) – Optional override destination stream; when None, the entry returns to the stream it originally failed on.
migration_fn (Any | None) – Optional callable applied to the unpacked payload to transform it before re-publishing.
- Return type:
- Returns:
The new stream message id of the re-published entry, or None when the entry is missing, has an empty payload, or the migration failed.
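The replay flow can be sketched as follows, assuming redis-py's asyncio client (xrange, xadd, and xdel). The unpack and pack keyword arguments are hypothetical additions that stand in for msgpack.unpackb and msgpack.packb, so the sketch does not depend on msgpack. The documented signature does not take them. The data, original_stream, ts, and trace_id field names are also assumed.

```python
import logging
from typing import Any, Callable, Mapping, Optional

logger = logging.getLogger(__name__)
DLQ_STREAM = "sg:stream:dlq"


def _get(fields: Mapping[Any, Any], name: str) -> Any:
    # Probe bytes- then str-keyed forms of a stream field.
    return fields.get(name.encode(), fields.get(name))


async def replay_dlq_entry(
    redis: Any,
    dlq_msg_id: str,
    *,
    unpack: Callable[[bytes], Any],  # hypothetical stand-in for msgpack.unpackb
    pack: Callable[[Any], bytes],  # hypothetical stand-in for msgpack.packb
    target_stream: Optional[str] = None,
    migration_fn: Optional[Callable[[Any], Any]] = None,
) -> Optional[Any]:
    entries = await redis.xrange(DLQ_STREAM, min=dlq_msg_id, max=dlq_msg_id, count=1)
    if not entries:
        logger.warning("DLQ entry %s not found", dlq_msg_id)
        return None
    _, fields = entries[0]

    data = _get(fields, "data") or b""
    if isinstance(data, str):
        data = data.encode("utf-8")
    if not data:
        logger.warning("DLQ entry %s has an empty payload; skipping", dlq_msg_id)
        return None

    if migration_fn is not None:
        try:
            # Unpack, reshape to the newer schema, and re-pack.
            data = pack(migration_fn(unpack(data)))
        except Exception:
            logger.exception("migration failed for DLQ entry %s", dlq_msg_id)
            return None

    destination = target_stream if target_stream is not None else _get(fields, "original_stream")
    if isinstance(destination, bytes):
        destination = destination.decode("utf-8")
    if not destination:
        logger.warning("DLQ entry %s has no destination stream", dlq_msg_id)
        return None

    # Re-publish the body together with its preserved sidecars.
    message = {"data": data}
    for key in ("ts", "trace_id"):
        value = _get(fields, key)
        if value is not None:
            message[key] = value
    new_id = await redis.xadd(destination, message)

    # Drop the parked row only after the re-publish succeeded.
    await redis.xdel(DLQ_STREAM, dlq_msg_id)
    return new_id
```

Calling XDEL only after XADD succeeds means a failure partway through a replay can, at worst, publish the message twice (if XDEL fails and the replay is retried). It cannot drop the message.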