Source code for core.event_types

"""Versioned event envelope for the Redis Streams bus.

Defines :class:`EventEnvelope` (a frozen dataclass carrying event
identity, source service/instance, trace id, payload, and
``schema_version``) plus forward-compatible deserialization:
:func:`deserialize_envelope` ignores unknown fields and rejects
envelopes newer than :data:`SUPPORTED_SCHEMA_VERSION` (returning
``None`` so the caller can route them to the dead-letter queue).
"""

import json
import logging
from dataclasses import dataclass, fields

logger = logging.getLogger(__name__)

SUPPORTED_SCHEMA_VERSION = 1

[docs] @dataclass(frozen=True) class EventEnvelope: event_id: str = "" event_type: str = "" source_service: str = "" source_instance: str = "" trace_id: str = "" timestamp: float = 0.0 payload: dict = None data: bytes = b"" retry_count: int = 0 schema_version: int = 1
[docs] def deserialize_envelope(raw: dict) -> EventEnvelope | None: """Deserialize with forward compatibility. - Unknown fields are silently ignored. - If schema_version > SUPPORTED_SCHEMA_VERSION, return None (caller routes to DLQ). """ # Convert byte keys/values to strings for easier processing processed = {} for k, v in raw.items(): key_str = k.decode('utf-8') if isinstance(k, bytes) else str(k) if key_str == "data": processed[key_str] = v continue val_str = v.decode('utf-8') if isinstance(v, bytes) else v processed[key_str] = val_str version = int(processed.get("schema_version", 1)) if version > SUPPORTED_SCHEMA_VERSION: logger.warning( "unsupported_schema_version received=%d supported=%d", version, SUPPORTED_SCHEMA_VERSION, ) return None # Caller sends to DLQ # Parse payload if it's a JSON string if "payload" in processed and isinstance(processed["payload"], str): try: processed["payload"] = json.loads(processed["payload"]) except json.JSONDecodeError: pass # Keep as string if parsing fails, or could raise # Type casting for specific fields if "timestamp" in processed: processed["timestamp"] = float(processed["timestamp"]) if "retry_count" in processed: processed["retry_count"] = int(processed["retry_count"]) if "schema_version" in processed: processed["schema_version"] = int(processed["schema_version"]) # Build envelope, ignoring unknown keys known_keys = {f.name for f in fields(EventEnvelope)} filtered = {k: v for k, v in processed.items() if k in known_keys} try: return EventEnvelope(**filtered) except TypeError as e: logger.error(f"failed_to_build_envelope error={e}") return None