"""Serialization helpers for Redis Stream payloads and Hash fields."""
import msgpack
import json
import time
import uuid
from typing import Any, TypedDict, Optional, List, Dict
from pydantic import BaseModel, Field
[docs]
class InboundEnvelopeModel(BaseModel):
    """Runtime-validated schema for the gateway-to-inference inbound envelope.

    Describes a single message as it crosses the inbound Redis Stream: the
    routing, sender, content, context and tracing fields, followed by the
    extras added for microservice stream transport (addressed flag, routing
    decision, alias set, unified user id, reaction summary).

    :func:`validate_inbound_envelope` instantiates this model from a raw dict
    so that malformed payloads are rejected at the boundary.
    :class:`InboundEnvelope` is the lighter ``TypedDict`` used for static
    typing of dict-shaped envelopes.

    Fields declared without a default are required; every other field falls
    back to the default shown next to it.
    """

    # Routing
    channel_key: str
    platform: str
    channel_id: str

    # Sender
    user_id: str
    username: str
    display_name: Optional[str] = None

    # Content
    content: str
    message_id: str
    timestamp: float
    attachments: List[Dict[str, Any]] = Field(default_factory=list)
    reply_to: Optional[str] = None
    embeds: Optional[List[Dict[str, Any]]] = None

    # Context
    room_name: Optional[str] = None
    is_dm: bool = False
    guild_id: Optional[str] = None
    member_roles: List[str] = Field(default_factory=list)

    # Tracing metadata
    trace_id: str
    enqueued_at: float

    # --- Fields added for microservice stream transport ---

    #: Whether the bot was explicitly addressed. Checked by
    #: InboundStreamConsumer to decide whether to acquire the per-channel
    #: distributed lock.
    is_addressed: bool = True

    #: Routing decision from StranglerRouter
    #: ('microservice' | 'monolith' | 'shadow').
    strangler_route: str = "microservice"

    #: Full alias set for the sender (e.g. ['discord:12345']).
    user_aliases: List[str] = Field(default_factory=list)

    #: Resolved Stargazer UUID if the user is linked, else None.
    unified_user_id: Optional[str] = None

    #: Serialized reaction summary for this message, e.g. '👍×3, 🔥×1'.
    reactions: str = ""
[docs]
class OutboundEnvelopeModel(BaseModel):
    """Runtime-validated schema for the inference-to-gateway outbound envelope.

    Describes a response leaving the inference worker on the outbound Redis
    Stream. It carries the routing target (``channel_id`` / ``platform``), a
    ``type`` tag (``message`` / ``file`` / ``buttons`` / ``reaction`` / ...),
    the optional per-type payload fields, the idempotency ``message_key`` and
    the tracing metadata.

    :func:`validate_outbound_envelope` instantiates this model from a raw
    dict; :class:`OutboundEnvelope` is the ``TypedDict`` view of the same
    field set.

    ``channel_id``, ``platform``, ``trace_id`` and ``processed_at`` are
    required. ``type`` defaults to ``"message"`` and every payload field
    defaults to ``None``.
    """

    # Routing target
    channel_id: str
    platform: str

    # Payload: which of the optional fields is meaningful depends on ``type``
    type: str = "message"
    text: Optional[str] = None
    file_data: Optional[bytes] = None
    filename: Optional[str] = None
    buttons: Optional[List[Dict[str, Any]]] = None
    message_id: Optional[str] = None
    emoji: Optional[str] = None
    message_key: Optional[str] = None

    # Tracing metadata
    trace_id: str
    processed_at: float
[docs]
def validate_inbound_envelope(raw: dict) -> dict:
    """Validate and normalize a raw inbound envelope through its Pydantic model.

    Builds an :class:`InboundEnvelopeModel` from ``raw`` (which enforces the
    required fields, types and defaults declared on that model) and returns
    the result as a plain dict, so a malformed payload is rejected here
    rather than failing deeper in processing. The function touches no
    external state; its only side effect is raising on invalid input.

    Args:
        raw: Decoded inbound stream payload to validate. Its items are passed
            to the model as keyword arguments.

    Returns:
        dict: The validated envelope with defaults filled in.

    Raises:
        pydantic.ValidationError: If ``raw`` violates the inbound schema.
    """
    model = InboundEnvelopeModel(**raw)
    # Pydantic v2 renamed ``.dict()`` to ``.model_dump()`` and deprecated the
    # old spelling (it warns on every call). Prefer the new method when it
    # exists and fall back to ``.dict()`` so Pydantic v1 keeps working.
    dump = getattr(model, "model_dump", None)
    if dump is None:
        dump = model.dict
    return dump()
[docs]
def validate_outbound_envelope(raw: dict) -> dict:
    """Validate and normalize a raw outbound envelope through its Pydantic model.

    Builds an :class:`OutboundEnvelopeModel` from ``raw`` (enforcing the
    required ``channel_id`` / ``platform`` / ``trace_id`` / ``processed_at``
    fields plus the optional per-type payload fields) and returns the result
    as a plain dict, so a malformed response is caught before it reaches the
    outbound stream. The function touches no external state; its only side
    effect is raising on invalid input.

    Args:
        raw: Outbound response payload to validate. Its items are passed to
            the model as keyword arguments.

    Returns:
        dict: The validated envelope with defaults filled in.

    Raises:
        pydantic.ValidationError: If ``raw`` violates the outbound schema.
    """
    model = OutboundEnvelopeModel(**raw)
    # Pydantic v2 renamed ``.dict()`` to ``.model_dump()`` and deprecated the
    # old spelling (it warns on every call). Prefer the new method when it
    # exists and fall back to ``.dict()`` so Pydantic v1 keeps working.
    dump = getattr(model, "model_dump", None)
    if dump is None:
        dump = model.dict
    return dump()
[docs]
class InboundEnvelope(TypedDict):
    """Static-typing shape of an inbound message envelope.

    An annotation-only description of a dict-shaped inbound stream payload,
    grouped into routing, sender, content, context and metadata keys. It
    performs no runtime validation; :class:`InboundEnvelopeModel` is the
    validated counterpart.

    NOTE(review): the Pydantic model additionally declares ``is_addressed``,
    ``strangler_route``, ``user_aliases``, ``unified_user_id`` and
    ``reactions``, which are not mirrored here. Confirm whether that is
    intentional.
    """

    # --- Routing ---
    # "{platform}:{channel_id}"
    channel_key: str
    # "discord" | "matrix" | "webchat"
    platform: str
    # Platform-specific channel identifier
    channel_id: str

    # --- Sender ---
    user_id: str
    username: str
    display_name: Optional[str]

    # --- Content ---
    content: str
    message_id: str
    timestamp: float
    # Items look like {url, filename, content_type, size}
    attachments: List[Dict[str, Any]]
    # Message ID being replied to
    reply_to: Optional[str]
    # Discord embeds
    embeds: Optional[List[Dict[str, Any]]]

    # --- Context ---
    room_name: Optional[str]
    is_dm: bool
    # Discord-specific
    guild_id: Optional[str]
    # Discord roles for permission checks
    member_roles: List[str]

    # --- Metadata ---
    # UUID for distributed tracing
    trace_id: str
    # time.time() at gateway
    enqueued_at: float
[docs]
class OutboundEnvelope(TypedDict):
    """Static-typing shape of an outbound response envelope.

    An annotation-only description of a dict-shaped outbound stream payload,
    grouped into routing, payload and metadata keys. It performs no runtime
    validation; :class:`OutboundEnvelopeModel` is the validated counterpart
    and declares the same field names.
    """

    # --- Routing ---
    channel_id: str
    platform: str

    # --- Payload ---
    # "message" | "file" | "buttons" | "typing" | "reaction"
    type: str
    text: Optional[str]
    # The original note says "Base64-encoded for msgpack", yet the annotation
    # is bytes. NOTE(review): confirm which encoding producers actually use.
    file_data: Optional[bytes]
    filename: Optional[str]
    buttons: Optional[List[Dict[str, Any]]]
    # For reactions/edits
    message_id: Optional[str]
    emoji: Optional[str]
    message_key: Optional[str]

    # --- Metadata ---
    trace_id: str
    processed_at: float
[docs]
def serialize_stream_payload(data: dict[str, Any]) -> dict[str, bytes]:
    """Build the flat field map for an XADD from a payload dict.

    Redis Streams store field-value pairs, so the whole payload is
    msgpack-packed into a single ``data`` field instead of being flattened.
    Three transport metadata fields are added next to it.

    Args:
        data: The payload to pack. It may contain nested structures.

    Returns:
        dict[str, bytes]: A map with four entries, in this order:
        ``data`` (msgpack bytes of ``data``), ``ts`` (``time.time()`` rendered
        as text), ``trace_id`` (a freshly generated UUID4 as text) and
        ``schema_version`` (``b"1"``).
    """
    packed = msgpack.packb(data, use_bin_type=True)
    stamp = str(time.time()).encode()
    # A new UUID is minted for every call; any ``trace_id`` inside ``data``
    # is not consulted here.
    trace = str(uuid.uuid4()).encode()

    fields: dict[str, bytes] = {}
    fields["data"] = packed
    fields["ts"] = stamp
    fields["trace_id"] = trace
    fields["schema_version"] = b"1"
    return fields
[docs]
def deserialize_stream_payload(raw: dict[Any, Any]) -> dict[str, Any]:
    """Recover the payload dict from a Redis Stream entry's ``data`` field.

    This is the read-side inverse of :func:`serialize_stream_payload`. The
    packed payload is looked up under the ``bytes`` key ``b"data"`` first and
    under the ``str`` key ``"data"`` second, so entries work with either key
    type. Any other fields in the entry (``ts``, ``trace_id``,
    ``schema_version``) are not read.

    Args:
        raw: A stream entry's field map.

    Returns:
        dict[str, Any]: The msgpack-unpacked payload.

    Raises:
        KeyError: If no ``data`` field could be found.
    """
    packed = raw.get(b"data")
    # A falsy value under the bytes key (missing, None or empty) defers to the
    # str key, mirroring an ``a or b`` lookup.
    if not packed:
        packed = raw.get("data")

    if packed is None:
        raise KeyError("data")

    return msgpack.unpackb(packed, raw=False)
[docs]
def serialize_hash_field(value: Any) -> str:
    """Render a Python value as a JSON string suitable for a Redis Hash field.

    Hash values are flat strings, so structured data (dicts, lists) has to be
    JSON-encoded before it is stored. ``default=str`` is passed to the
    encoder, so values JSON cannot represent natively (for example ``UUID`` or
    ``datetime`` objects) are written as their ``str()`` form instead of
    raising. :func:`deserialize_hash_field` is the matching reader.

    Args:
        value: Any JSON-encodable (or stringifiable) Python value.

    Returns:
        str: The JSON text for ``value``.
    """
    encoded: str = json.dumps(value, default=str)
    return encoded
[docs]
def deserialize_hash_field(raw: Optional[str], fallback: Any = None) -> Any:
    """Decode a JSON string read from a Redis Hash field into a Python value.

    This is the inverse of :func:`serialize_hash_field`. A ``raw`` of ``None``
    (a missing hash field) yields ``fallback`` unchanged, so callers do not
    have to special-case absent keys. Any other input is handed to
    ``json.loads``.

    ``raw`` is annotated ``Optional[str]`` because ``None`` is an explicitly
    supported input.

    Args:
        raw: The JSON string read from the hash field, or ``None`` if absent.
        fallback: Value returned when ``raw`` is ``None``. Defaults to ``None``.

    Returns:
        Any: The decoded value, or ``fallback`` when ``raw`` is ``None``. A
        stored JSON ``null`` decodes to ``None`` and does not trigger
        ``fallback``.

    Raises:
        json.JSONDecodeError: If ``raw`` is not ``None`` and is not valid JSON.
    """
    # Only a true ``None`` means "absent"; an empty string is still passed to
    # the JSON decoder, which rejects it.
    if raw is None:
        return fallback
    return json.loads(raw)