core.serialization module

Serialization helpers for Redis Stream payloads and Hash fields.

class core.serialization.InboundEnvelopeModel(**data)[source]

Bases: BaseModel

Pydantic schema for the gateway-to-inference inbound message envelope.

The validated, runtime-checked contract for a message as it crosses the inbound Redis Stream: sender/channel/content fields plus the microservice transport extras (routing decision, alias set, unified user id, reaction summary). validate_inbound_envelope() builds this from a raw dict to reject malformed payloads at the boundary; InboundEnvelope is the lighter TypedDict view of the same shape used for static typing.

Parameters:
channel_key: str
platform: str
channel_id: str
user_id: str
username: str
display_name: str | None
content: str
message_id: str
timestamp: float
attachments: List[Dict[str, Any]]
reply_to: str | None
embeds: List[Dict[str, Any]] | None
room_name: str | None
is_dm: bool
guild_id: str | None
member_roles: List[str]
trace_id: str
enqueued_at: float
is_addressed: bool

Whether the bot was explicitly addressed. Checked by InboundStreamConsumer to decide whether to acquire the per-channel distributed lock.

strangler_route: str

Routing decision from StranglerRouter ('microservice' | 'monolith' | 'shadow').

user_aliases: List[str]

Full alias set for the sender (e.g. ['discord:12345']).

unified_user_id: str | None

Resolved Stargazer UUID if the user is linked, else None.

reactions: str

Serialized reaction summary for this message, e.g. '👍×3, 🔥×1'.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class core.serialization.OutboundEnvelopeModel(**data)[source]

Bases: BaseModel

Pydantic schema for the inference-to-gateway outbound response envelope.

The validated contract for a response leaving the inference worker on the outbound Redis Stream: target channel_id/platform, a type tag (message/file/buttons/reaction/…), the optional per-type payload fields, and the idempotency message_key plus tracing metadata. Validated by validate_outbound_envelope(); OutboundEnvelope is the TypedDict view of the same shape.

Parameters:
channel_id: str
platform: str
type: str
text: str | None
file_data: bytes | None
filename: str | None
buttons: List[Dict[str, Any]] | None
message_id: str | None
emoji: str | None
message_key: str | None
trace_id: str
processed_at: float

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

core.serialization.validate_inbound_envelope(raw)[source]

Validate and normalize a raw inbound envelope through its Pydantic model.

Constructs an InboundEnvelopeModel from raw (which enforces required fields, types, and defaults for the gateway-to-inference stream contract) and returns it as a plain dict, so a malformed payload is rejected at the boundary rather than failing deeper in processing. Pure with respect to external state; the only side effect is raising on invalid input.

Used as a schema guard before trusting a stream payload; in this repo it is exercised by the PEL-leak adversarial tests (tests/adversarial/test_pel_leak_protection.py).

Parameters:

raw (dict) – Decoded inbound stream payload to validate.

Returns:

The validated, field-normalized envelope.

Return type:

dict

Raises:

pydantic.ValidationError – If raw violates the inbound schema.
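The schema-guard pattern described above can be sketched as follows. This is a minimal illustration, not the module's code: it assumes Pydantic v2 (suggested by the model_config/ConfigDict attribute), MiniInboundEnvelope and validate_mini_envelope are hypothetical names covering only a handful of fields, the defaults shown are assumptions, and model_dump() is an assumed way of producing the plain dict.

```python
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field, ValidationError


class MiniInboundEnvelope(BaseModel):
    """Hypothetical reduced envelope; the real model has many more fields."""

    channel_id: str
    content: str
    timestamp: float
    # Assumed defaults, chosen only to show normalization of absent fields.
    member_roles: List[str] = Field(default_factory=list)
    reply_to: Optional[str] = None


def validate_mini_envelope(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Build the model from ``raw`` and hand back a plain dict.

    Raises pydantic.ValidationError when a required field is missing
    or a value cannot be coerced to the declared type.
    """
    return MiniInboundEnvelope(**raw).model_dump()


def is_valid(raw: Dict[str, Any]) -> bool:
    """Convenience wrapper showing how a caller might reject bad payloads."""
    try:
        validate_mini_envelope(raw)
    except ValidationError:
        return False
    return True
```

A caller at the stream boundary would typically run the guard first and route the entry elsewhere (for example to a dead-letter path) when it raises, rather than letting a missing key surface later in processing.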

core.serialization.validate_outbound_envelope(raw)[source]

Validate and normalize a raw outbound envelope through its Pydantic model.

Constructs an OutboundEnvelopeModel from raw (enforcing the inference-to-gateway response contract: required channel_id/platform/trace_id/processed_at plus the optional per-type payload fields) and returns it as a plain dict, catching a malformed response before it reaches the outbound stream. The only side effect is raising on invalid input.

A schema guard for the response path; no internal caller was found in this repo (it complements validate_inbound_envelope() as the outbound counterpart and is available to validators/tests).

Parameters:

raw (dict) – Outbound response payload to validate.

Returns:

The validated, field-normalized envelope.

Return type:

dict

Raises:

pydantic.ValidationError – If raw violates the outbound schema.

class core.serialization.InboundEnvelope[source]

Bases: TypedDict

TypedDict shape of an inbound message envelope for static typing.

The structural, annotation-only view of an inbound stream payload (routing, sender, content, context, and tracing groups), mirroring InboundEnvelopeModel without runtime validation. Used to type dict-shaped envelopes throughout the inference path; verified by tests/core/test_envelopes.py.
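What "annotation-only, without runtime validation" means in practice can be shown with a small sketch. InboundSubset is a hypothetical three-field subset used only for illustration; it is not the class documented here.

```python
from typing import Optional, TypedDict


class InboundSubset(TypedDict):
    """Hypothetical three-field subset of the inbound shape."""

    channel_id: str
    content: str
    reply_to: Optional[str]


def describe(envelope: InboundSubset) -> str:
    """A type checker verifies the keys used here; nothing is checked at runtime."""
    return f"{envelope['channel_id']}: {envelope['content']}"


good: InboundSubset = {"channel_id": "c1", "content": "hi", "reply_to": None}

# A static type checker would flag the int below, but at runtime a TypedDict
# is a plain dict, so construction succeeds without any error.
unchecked = InboundSubset(channel_id=123, content="hi", reply_to=None)  # type: ignore[typeddict-item]
```

This is why the TypedDict suits internal code paths that already trust the payload, while untrusted input goes through the Pydantic model.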

channel_key: str
platform: str
channel_id: str
user_id: str
username: str
display_name: str | None
content: str
message_id: str
timestamp: float
attachments: List[Dict[str, Any]]
reply_to: str | None
embeds: List[Dict[str, Any]] | None
room_name: str | None
is_dm: bool
guild_id: str | None
member_roles: List[str]
trace_id: str
enqueued_at: float

class core.serialization.OutboundEnvelope[source]

Bases: TypedDict

TypedDict shape of an outbound response envelope for static typing.

The structural, annotation-only view of an outbound stream payload (routing, typed payload, and metadata groups), mirroring OutboundEnvelopeModel without runtime validation. Used to type the response dicts the worker publishes; verified by tests/core/test_envelopes.py.

channel_id: str
platform: str
type: str
text: str | None
file_data: bytes | None
filename: str | None
buttons: List[Dict[str, Any]] | None
message_id: str | None
emoji: str | None
message_key: str | None
trace_id: str
processed_at: float

core.serialization.serialize_stream_payload(data)[source]

Serialize a dict into a flat {field: msgpack_bytes} map for XADD.

Redis Streams store field-value pairs. We pack the entire payload into a single 'data' field to avoid flattening nested structures.

Return type:

dict[str, bytes]

Parameters:

data (dict[str, Any])
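The single-field packing idea can be sketched as follows. To stay runnable with only the standard library, this sketch swaps JSON in for msgpack; pack_into_data_field is a hypothetical name, and any metadata fields the real function may write alongside the payload are omitted.

```python
import json
from typing import Any, Dict


def pack_into_data_field(payload: Dict[str, Any]) -> Dict[str, bytes]:
    """Pack the whole payload under one 'data' field.

    JSON is a stand-in for msgpack here; the point is that nested
    lists and dicts survive because they are never flattened into
    separate stream fields.
    """
    return {"data": json.dumps(payload).encode("utf-8")}


payload = {"channel_id": "c1", "attachments": [{"name": "a.png", "size": 10}]}
fields = pack_into_data_field(payload)

# Decoding the single field recovers the nested structure unchanged.
restored = json.loads(fields["data"])
```

The resulting flat map is the shape a stream write expects: every value is a byte string, and the nesting lives inside the packed blob.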

core.serialization.deserialize_stream_payload(raw)[source]

Unpack a Redis Stream entry’s packed data field back into a dict.

The inverse of serialize_stream_payload(): pulls the single data field (tolerating both bytes and str keys, since redis-py may return either depending on decode_responses) and msgpack-unpacks it into the original Python dict. The transport metadata fields (ts, trace_id, schema_version) are intentionally ignored. Pure, with no I/O.

Called on the read side of every stream by the consumers and dead-letter handler (core/stream_consumer.py, core/outbound_consumer.py, and core/dlq.py) and by the serialization/end-to-end tests.

Parameters:

raw (dict[Any, Any]) – A stream entry’s field map as returned by xreadgroup/xautoclaim.

Returns:

The original payload dict.

Return type:

dict[str, Any]

Raises:

KeyError – If the entry has no data field to unpack.
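The key-tolerance step described above can be sketched like this. It is an illustration only: unpack_data_field is a hypothetical name, and JSON again stands in for msgpack so the block needs nothing beyond the standard library.

```python
import json
from typing import Any, Dict


def unpack_data_field(raw: Dict[Any, Any]) -> Dict[str, Any]:
    """Pull the packed 'data' field from a stream entry and decode it.

    The client may hand back bytes keys or str keys depending on
    decode_responses, so both spellings are tried. Other fields in the
    entry are ignored. A missing field raises KeyError.
    """
    packed = raw[b"data"] if b"data" in raw else raw["data"]
    return json.loads(packed)  # JSON stand-in for the msgpack unpack


# Entry as it might look with bytes keys, plus an ignored metadata field.
from_bytes_keys = unpack_data_field({b"data": b'{"a": 1}', b"ts": b"1"})

# Same entry as it might look with decoded str keys.
from_str_keys = unpack_data_field({"data": '{"a": 1}'})
```

Checking for the bytes key first and falling through to a plain str lookup keeps the KeyError behavior for entries that carry no packed field at all.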

core.serialization.serialize_hash_field(value)[source]

Encode a Python value as a JSON string for storage in a Redis Hash field.

Redis Hash values are flat strings, so structured data (dicts, lists) must be JSON-encoded first. Uses default=str so otherwise non-serializable values (e.g. datetime, UUID) degrade to their string form instead of raising. Pairs with deserialize_hash_field() on read. Pure, with no I/O.

A general hash-field codec helper; in this repo it is exercised directly by the serialization tests (tests/core/test_serialization.py).

Parameters:

value (Any) – Any JSON-encodable (or stringifiable) Python value.

Returns:

The JSON-encoded representation.

Return type:

str
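The effect of default=str can be seen in a short sketch. to_hash_field is a hypothetical stand-in that applies only the one option named above; any other json.dumps arguments the real helper passes are unknown.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any


def to_hash_field(value: Any) -> str:
    """JSON-encode a value, stringifying anything json cannot handle natively."""
    return json.dumps(value, default=str)


record = {
    "id": uuid.UUID(int=1),
    "seen": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "tags": ["a", "b"],
}
encoded = to_hash_field(record)

# Decoding shows what was actually stored: the UUID and datetime are now strings.
decoded = json.loads(encoded)
```

The degradation is one-way: reading the field back yields the string forms, so a caller that needs a datetime or UUID again has to parse them explicitly.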

core.serialization.deserialize_hash_field(raw, fallback=None)[source]

Decode a JSON string read from a Redis Hash field back into a Python value.

The inverse of serialize_hash_field(): json.loads the stored string, but returns fallback unchanged when raw is None (the common case of a missing hash field via hget) so callers need not special-case absent keys. Pure, with no I/O.

A general hash-field codec helper; in this repo it is exercised directly by the serialization tests (tests/core/test_serialization.py).

Parameters:
  • raw (str | None) – The JSON string read from the hash field, or None if absent.

  • fallback (Any) – Value to return when raw is None. Defaults to None.

Returns:

The decoded value, or fallback when the field was absent.

Return type:

Any

Raises:

json.JSONDecodeError – If raw is a non-None but invalid JSON string.
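The fallback behavior can be sketched as follows. from_hash_field is a hypothetical stand-in written from the description above, not the module's source.

```python
import json
from typing import Any, Optional


def from_hash_field(raw: Optional[str], fallback: Any = None) -> Any:
    """Decode a stored JSON string, or return ``fallback`` for a missing field."""
    if raw is None:
        # Absent hash field: hand back the fallback unchanged.
        return fallback
    return json.loads(raw)


# A missing field yields the caller's default instead of an error.
absent = from_hash_field(None, fallback=[])

# A present field is decoded back into Python structures.
present = from_hash_field('{"a": [1, 2]}')
```

Only None triggers the fallback in this sketch. Any other value, including an empty string, goes to json.loads and raises json.JSONDecodeError if it is not valid JSON.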