core.serialization moduleο
Serialization helpers for Redis Stream payloads and Hash fields.
- class core.serialization.InboundEnvelopeModel(**data)[source]ο
Bases:
BaseModelPydantic schema for the gateway-to-inference inbound message envelope.
The validated, runtime-checked contract for a message as it crosses the inbound Redis Stream: sender/channel/content fields plus the microservice transport extras (routing decision, alias set, unified user id, reaction summary).
validate_inbound_envelope()builds this from a raw dict to reject malformed payloads at the boundary;InboundEnvelopeis the lighterTypedDictview of the same shape used for static typing.- Parameters:
channel_key (str)
platform (str)
channel_id (str)
user_id (str)
username (str)
display_name (str | None)
content (str)
message_id (str)
timestamp (float)
reply_to (str | None)
room_name (str | None)
is_dm (bool)
guild_id (str | None)
trace_id (str)
enqueued_at (float)
is_addressed (bool)
strangler_route (str)
unified_user_id (str | None)
reactions (str)
- is_addressed: boolο
Whether the bot was explicitly addressed. Checked by InboundStreamConsumer to decide whether to acquire the per-channel distributed lock.
- class core.serialization.OutboundEnvelopeModel(**data)[source]ο
Bases:
BaseModelPydantic schema for the inference-to-gateway outbound response envelope.
The validated contract for a response leaving the inference worker on the outbound Redis Stream: target
channel_id/platform, atypetag (message/file/buttons/reaction/β¦), the optional per-type payload fields, and the idempotencymessage_keyplus tracing metadata. Validated byvalidate_outbound_envelope();OutboundEnvelopeis theTypedDictview of the same shape.- Parameters:
- core.serialization.validate_inbound_envelope(raw)[source]ο
Validate and normalize a raw inbound envelope through its Pydantic model.
Constructs an
InboundEnvelopeModelfromraw(which enforces required fields, types, and defaults for the gateway-to-inference stream contract) and returns it as a plain dict, so a malformed payload is rejected at the boundary rather than failing deeper in processing. Pure with respect to external state; the only side effect is raising on invalid input.Used as a schema guard before trusting a stream payload; in this repo it is exercised by the PEL-leak adversarial tests (
tests/adversarial/test_pel_leak_protection.py).
- core.serialization.validate_outbound_envelope(raw)[source]ο
Validate and normalize a raw outbound envelope through its Pydantic model.
Constructs an
OutboundEnvelopeModelfromraw(enforcing the inference-to-gateway response contract: requiredchannel_id/platform/trace_id/processed_atplus the optional per-type payload fields) and returns it as a plain dict, catching a malformed response before it reaches the outbound stream. The only side effect is raising on invalid input.A schema guard for the response path; no internal caller was found in this repo (it complements
validate_inbound_envelope()as the outbound counterpart and is available to validators/tests).
- class core.serialization.InboundEnvelope[source]ο
Bases:
TypedDictTypedDict shape of an inbound message envelope for static typing.
The structural, annotation-only view of an inbound stream payload (routing, sender, content, context, and tracing groups), mirroring
InboundEnvelopeModelwithout runtime validation. Used to type dict-shaped envelopes throughout the inference path; verified bytests/core/test_envelopes.py.
- class core.serialization.OutboundEnvelope[source]ο
Bases:
TypedDictTypedDict shape of an outbound response envelope for static typing.
The structural, annotation-only view of an outbound stream payload (routing, typed payload, and metadata groups), mirroring
OutboundEnvelopeModelwithout runtime validation. Used to type the response dicts the worker publishes; verified bytests/core/test_envelopes.py.
- core.serialization.serialize_stream_payload(data)[source]ο
Serialize a dict into a flat {field: msgpack_bytes} map for XADD.
Redis Streams store field-value pairs. We pack the entire payload into a single βdataβ field to avoid flattening nested structures.
- core.serialization.deserialize_stream_payload(raw)[source]ο
Unpack a Redis Stream entryβs packed
datafield back into a dict.The inverse of
serialize_stream_payload(): pulls the singledatafield (tolerating bothbytesandstrkeys, since redis-py may return either depending ondecode_responses) andmsgpack-unpacks it into the original Python dict. The transport metadata fields (ts,trace_id,schema_version) are intentionally ignored. Pure, with no I/O.Called on the read side of every stream by the consumers and dead-letter handler β
core/stream_consumer.py,core/outbound_consumer.py, andcore/dlq.pyβ and by the serialization/end-to-end tests.
- core.serialization.serialize_hash_field(value)[source]ο
Encode a Python value as a JSON string for storage in a Redis Hash field.
Redis Hash values are flat strings, so structured data (dicts, lists) must be JSON-encoded first. Uses
default=strso otherwise non-serializable values (e.g.datetime,UUID) degrade to their string form instead of raising. Pairs withdeserialize_hash_field()on read. Pure, with no I/O.A general hash-field codec helper; in this repo it is exercised directly by the serialization tests (
tests/core/test_serialization.py).
- core.serialization.deserialize_hash_field(raw, fallback=None)[source]ο
Decode a JSON string read from a Redis Hash field back into a Python value.
The inverse of
serialize_hash_field():json.loadsthe stored string, but returnsfallbackunchanged whenrawisNone(the common case of a missing hash field viahget) so callers need not special-case absent keys. Pure, with no I/O.A general hash-field codec helper; in this repo it is exercised directly by the serialization tests (
tests/core/test_serialization.py).- Parameters:
- Returns:
The decoded value, or
fallbackwhen the field was absent.- Return type:
- Raises:
json.JSONDecodeError β If
rawis a non-Nonebut invalid JSON string.