Source code for core.serialization

"""Serialization helpers for Redis Stream payloads and Hash fields."""

import msgpack
import json
import time
import uuid
from typing import Any, TypedDict, Optional, List, Dict
from pydantic import BaseModel, Field


class InboundEnvelopeModel(BaseModel):
    """Pydantic schema for the gateway-to-inference inbound message envelope.

    The validated, runtime-checked contract for a message as it crosses the
    inbound Redis Stream: sender/channel/content fields plus the microservice
    transport extras (routing decision, alias set, unified user id, reaction
    summary). :func:`validate_inbound_envelope` builds this from a raw dict to
    reject malformed payloads at the boundary; :class:`InboundEnvelope` is the
    lighter ``TypedDict`` view of the same shape used for static typing.

    Fields declared without a default are required when constructing the
    model; every field that carries a default (``None``, ``False``, ``True``,
    ``""``, ``"microservice"`` or a ``default_factory``) may be omitted from
    the raw payload. List-valued defaults use ``Field(default_factory=list)``
    so each instance gets its own fresh list.
    """

    # Routing (required).
    channel_key: str
    platform: str
    channel_id: str

    # Sender: id and username are required, display name is optional.
    user_id: str
    username: str
    display_name: Optional[str] = None

    # Content.
    content: str
    message_id: str
    timestamp: float
    attachments: List[Dict[str, Any]] = Field(default_factory=list)
    reply_to: Optional[str] = None
    embeds: Optional[List[Dict[str, Any]]] = None

    # Context: all optional, defaulting to "not a DM, no guild, no roles".
    room_name: Optional[str] = None
    is_dm: bool = False
    guild_id: Optional[str] = None
    member_roles: List[str] = Field(default_factory=list)

    # Tracing metadata (required).
    trace_id: str
    enqueued_at: float

    # --- Fields added for microservice stream transport ---
    # All of these carry defaults, so payloads that predate them still validate.
    is_addressed: bool = True
    """Whether the bot was explicitly addressed. Checked by InboundStreamConsumer to decide whether to acquire the per-channel distributed lock."""
    strangler_route: str = "microservice"
    """Routing decision from StranglerRouter ('microservice' | 'monolith' | 'shadow')."""
    user_aliases: List[str] = Field(default_factory=list)
    """Full alias set for the sender (e.g. ['discord:12345'])."""
    unified_user_id: Optional[str] = None
    """Resolved Stargazer UUID if the user is linked, else None."""
    reactions: str = ""
    """Serialized reaction summary for this message, e.g. '👍×3, 🔥×1'."""
class OutboundEnvelopeModel(BaseModel):
    """Pydantic schema for the inference-to-gateway outbound response envelope.

    The validated contract for a response leaving the inference worker on the
    outbound Redis Stream: target ``channel_id``/``platform``, a ``type`` tag
    (``message``/``file``/``buttons``/``reaction``/...), the optional per-type
    payload fields, and the idempotency ``message_key`` plus tracing metadata.
    Validated by :func:`validate_outbound_envelope`; :class:`OutboundEnvelope`
    is the ``TypedDict`` view of the same shape.

    Only ``channel_id``, ``platform``, ``trace_id`` and ``processed_at`` are
    required; ``type`` defaults to ``"message"`` and every payload field
    defaults to ``None``.
    """

    # Routing (required).
    channel_id: str
    platform: str

    # Payload: ``type`` tags the response kind; the remaining fields are
    # optional and default to None.
    type: str = "message"
    text: Optional[str] = None
    file_data: Optional[bytes] = None
    filename: Optional[str] = None
    buttons: Optional[List[Dict[str, Any]]] = None
    message_id: Optional[str] = None
    emoji: Optional[str] = None
    message_key: Optional[str] = None

    # Tracing metadata (required).
    trace_id: str
    processed_at: float
def validate_inbound_envelope(raw: dict) -> dict:
    """Validate and normalize a raw inbound envelope through its Pydantic model.

    Constructs an :class:`InboundEnvelopeModel` from ``raw`` (which enforces
    required fields, types, and defaults for the gateway-to-inference stream
    contract) and returns it as a plain dict, so a malformed payload is
    rejected at the boundary rather than failing deeper in processing. The
    input dict is not mutated; the only side effect is raising on invalid
    input.

    Used as a schema guard before trusting a stream payload; in this repo it
    is exercised by the PEL-leak adversarial tests
    (``tests/adversarial/test_pel_leak_protection.py``).

    Args:
        raw: Decoded inbound stream payload to validate. Its keys are passed
            as keyword arguments, so they must be ``str``.

    Returns:
        dict: The validated, field-normalized envelope, with defaults filled
        in for any omitted optional field.

    Raises:
        pydantic.ValidationError: If ``raw`` violates the inbound schema.
        TypeError: If ``raw`` contains non-string keys (they cannot be
            expanded as keyword arguments).
    """
    model = InboundEnvelopeModel(**raw)
    # Pydantic v2 renamed ``.dict()`` to ``.model_dump()`` and warns on the old
    # name; prefer the new method when present, fall back for Pydantic v1.
    dump = getattr(model, "model_dump", None)
    if dump is None:
        return model.dict()
    return dump()
def validate_outbound_envelope(raw: dict) -> dict:
    """Validate and normalize a raw outbound envelope through its Pydantic model.

    Constructs an :class:`OutboundEnvelopeModel` from ``raw`` (enforcing the
    inference-to-gateway response contract: required ``channel_id``/
    ``platform``/``trace_id``/``processed_at`` plus the optional per-type
    payload fields) and returns it as a plain dict, catching a malformed
    response before it reaches the outbound stream. The input dict is not
    mutated; the only side effect is raising on invalid input.

    A schema guard for the response path; no internal caller was found in this
    repo (it complements :func:`validate_inbound_envelope` as the outbound
    counterpart and is available to validators/tests).

    Args:
        raw: Outbound response payload to validate. Its keys are passed as
            keyword arguments, so they must be ``str``.

    Returns:
        dict: The validated, field-normalized envelope, with defaults filled
        in for any omitted optional field.

    Raises:
        pydantic.ValidationError: If ``raw`` violates the outbound schema.
        TypeError: If ``raw`` contains non-string keys (they cannot be
            expanded as keyword arguments).
    """
    model = OutboundEnvelopeModel(**raw)
    # Pydantic v2 renamed ``.dict()`` to ``.model_dump()`` and warns on the old
    # name; prefer the new method when present, fall back for Pydantic v1.
    dump = getattr(model, "model_dump", None)
    if dump is None:
        return model.dict()
    return dump()
class InboundEnvelope(TypedDict):
    """TypedDict shape of an inbound message envelope for static typing.

    The structural, annotation-only view of an inbound stream payload
    (routing, sender, content, context, and tracing groups), mirroring
    :class:`InboundEnvelopeModel` without runtime validation. Used to type
    dict-shaped envelopes throughout the inference path; verified by
    ``tests/core/test_envelopes.py``.

    All keys listed here are required from a type checker's point of view
    (the class uses the default ``total=True``).
    """

    # NOTE(review): InboundEnvelopeModel additionally declares is_addressed,
    # strangler_route, user_aliases, unified_user_id and reactions; those keys
    # are not part of this TypedDict. Confirm whether that is intentional.

    # Routing
    channel_key: str  # "{platform}:{channel_id}"
    platform: str  # "discord" | "matrix" | "webchat"
    channel_id: str  # Platform-specific channel identifier

    # Sender
    user_id: str
    username: str
    display_name: Optional[str]

    # Content
    content: str
    message_id: str
    timestamp: float
    attachments: List[Dict[str, Any]]  # [{url, filename, content_type, size}]
    reply_to: Optional[str]  # Message ID being replied to
    embeds: Optional[List[Dict[str, Any]]]  # Discord embeds

    # Context
    room_name: Optional[str]
    is_dm: bool
    guild_id: Optional[str]  # Discord-specific
    member_roles: List[str]  # Discord roles for permission checks

    # Metadata
    trace_id: str  # UUID for distributed tracing
    enqueued_at: float  # time.time() at gateway
class OutboundEnvelope(TypedDict):
    """TypedDict shape of an outbound response envelope for static typing.

    The structural, annotation-only view of an outbound stream payload
    (routing, typed payload, and metadata groups), mirroring
    :class:`OutboundEnvelopeModel` without runtime validation. Used to type
    the response dicts the worker publishes; verified by
    ``tests/core/test_envelopes.py``.

    All keys listed here are required from a type checker's point of view
    (the class uses the default ``total=True``), even though the Pydantic
    model gives most payload fields a default.
    """

    # Routing
    channel_id: str
    platform: str

    # Payload
    type: str  # "message" | "file" | "buttons" | "typing" | "reaction"
    text: Optional[str]
    # NOTE(review): annotated as bytes while the comment says Base64; confirm
    # which representation producers actually put in this field.
    file_data: Optional[bytes]  # Base64-encoded for msgpack
    filename: Optional[str]
    buttons: Optional[List[Dict[str, Any]]]
    message_id: Optional[str]  # For reactions/edits
    emoji: Optional[str]
    message_key: Optional[str]

    # Metadata
    trace_id: str
    processed_at: float
def serialize_stream_payload(data: dict[str, Any]) -> dict[str, bytes]:
    """Pack a payload dict into the flat field map handed to ``XADD``.

    Redis Streams only store flat field/value pairs, so rather than flattening
    nested structures the whole payload is msgpack-encoded into one ``data``
    field. Three transport fields are written next to it:

    * ``ts`` - ``time.time()`` at serialization, as an encoded decimal string.
    * ``trace_id`` - a freshly generated ``uuid4`` string, encoded.
    * ``schema_version`` - the constant ``b"1"``.

    Args:
        data: The payload to transport; must be packable by msgpack.

    Returns:
        dict[str, bytes]: Field map with the keys ``data``, ``ts``,
        ``trace_id`` and ``schema_version``.

    Raises:
        Whatever ``msgpack.packb`` raises when ``data`` holds a value it
        cannot pack.
    """
    # ``use_bin_type=True`` keeps ``bytes`` values distinct from ``str`` on the wire.
    packed_body = msgpack.packb(data, use_bin_type=True)
    stamped_at = str(time.time()).encode()
    # NOTE(review): this id is generated per call and is not taken from any
    # ``trace_id`` key inside ``data`` - confirm that is the intended behavior.
    entry_trace = str(uuid.uuid4()).encode()

    fields = {
        "data": packed_body,
        "ts": stamped_at,
        "trace_id": entry_trace,
        "schema_version": b"1",
    }
    return fields
def deserialize_stream_payload(raw: dict[Any, Any]) -> dict[str, Any]:
    """Recover the original payload dict from a Redis Stream entry.

    Counterpart of :func:`serialize_stream_payload`. Looks up the packed
    ``data`` field under the ``bytes`` key first and, if that lookup yields
    nothing (missing or empty), under the ``str`` key - redis-py may hand back
    either key type depending on ``decode_responses``. The value is then
    msgpack-unpacked with ``raw=False`` so strings come back as ``str``. The
    transport fields ``ts``, ``trace_id`` and ``schema_version`` are
    deliberately not read. Performs no I/O.

    Called on the read side of every stream by the consumers and dead-letter
    handler (``core/stream_consumer.py``, ``core/outbound_consumer.py``,
    ``core/dlq.py``) and by the serialization/end-to-end tests.

    Args:
        raw: A stream entry's field map as returned by
            ``xreadgroup``/``xautoclaim``.

    Returns:
        dict[str, Any]: The original payload dict.

    Raises:
        KeyError: If the entry carries no usable ``data`` field.
    """
    packed = raw.get(b"data")
    if not packed:
        # Falsy covers both "absent" and "empty"; fall back to the str key.
        packed = raw.get("data")
    if packed is None:
        raise KeyError("data")
    return msgpack.unpackb(packed, raw=False)
def serialize_hash_field(value: Any) -> str:
    """Turn a Python value into the JSON text stored in a Redis Hash field.

    Hash values are flat strings, so dicts, lists and other structured data
    are JSON-encoded before being written. Objects that JSON cannot represent
    natively (for example ``datetime`` or ``UUID``) are passed through ``str``
    instead of raising, which means they come back as plain strings on read.
    Read the value back with :func:`deserialize_hash_field`. Performs no I/O.

    A general hash-field codec helper; in this repo it is exercised directly
    by the serialization tests (``tests/core/test_serialization.py``).

    Args:
        value: Any JSON-encodable (or stringifiable) Python value.

    Returns:
        str: The JSON-encoded representation.
    """
    # An encoder with only ``default`` overridden matches ``json.dumps`` defaults.
    encoder = json.JSONEncoder(default=str)
    return encoder.encode(value)
def deserialize_hash_field(raw: Optional[str], fallback: Any = None) -> Any:
    """Decode a JSON string read from a Redis Hash field back into a Python value.

    The inverse of :func:`serialize_hash_field`: ``json.loads`` the stored
    string, but return ``fallback`` unchanged when ``raw`` is ``None`` (the
    common case of a missing hash field via ``hget``) so callers need not
    special-case absent keys. Only ``None`` triggers the fallback; an empty
    string is handed to ``json.loads`` and fails there. Performs no I/O.

    A general hash-field codec helper; in this repo it is exercised directly
    by the serialization tests (``tests/core/test_serialization.py``).

    Args:
        raw: The JSON string read from the hash field, or ``None`` if absent.
        fallback: Value to return when ``raw`` is ``None``. Defaults to ``None``.

    Returns:
        Any: The decoded value, or ``fallback`` when the field was absent.

    Raises:
        json.JSONDecodeError: If ``raw`` is a non-``None`` but invalid JSON string.
    """
    # Identity check on purpose: a stored JSON "null" must still decode to None
    # rather than being replaced by the fallback.
    return fallback if raw is None else json.loads(raw)