# Source code for core.log_config

"""Structured logging configuration for the distributed Stargazer services.

Provides :class:`StructuredFormatter`, a JSON-lines log formatter that stamps
every record with the process-wide ``NODE_ID`` / ``NODE_ROLE`` (read once from
the ``SG_NODE_ID`` / ``SG_NODE_ROLE`` environment variables) plus any per-event
trace fields, and :func:`configure_logging`, which installs that formatter on
the root logger from a ``Config``. The shared envelope of fields is what makes a
single message greppable as it hops across the gateway / inference / agents /
consolidation / web services over Redis Streams.

Note: this module is the original ``core.log_config`` implementation; the live
runtime path is the newer ``core.structured_logger`` (which exposes its own
``configure_logging``). The symbols here are currently referenced only by the
tests under ``tests/core/`` that pin this module's behaviour.
"""

import logging
import json
import time
import os
from typing import Any

# Every log line from the event bus and stream consumers must include these fields.
# This enables grep-based correlation across distributed nodes.

# Process-wide node identity, resolved once at import time from the
# environment. Both values fall back to "standalone" when the variable is unset.
NODE_ID = os.getenv("SG_NODE_ID", "standalone")
# Role label for this node: gateway | worker | standalone
NODE_ROLE = os.getenv("SG_NODE_ROLE", "standalone")


class StructuredFormatter(logging.Formatter):
    """Logging formatter that renders each record as one line of JSON.

    A :class:`logging.Formatter` subclass whose :meth:`format` emits a flat
    JSON object per log record: timestamp, level, logger name, rendered
    message, the module-level ``NODE_ID`` / ``NODE_ROLE``, and any optional
    trace fields the caller attached via ``extra``. :func:`configure_logging`
    in this module installs an instance on the root logger's stdout handler
    when structured logging is enabled.
    """

    # Optional per-event correlation fields. Each one is copied into the JSON
    # object only when it is present on the record and not ``None``.
    TRACE_FIELDS = ("stream_msg_id", "channel_id", "platform", "user_id", "trace_id")

    def format(self, record: logging.LogRecord) -> str:
        """Render a single log record as a one-line JSON string.

        The object always contains ``ts``, ``level``, ``logger``, ``msg``,
        ``node_id`` and ``node_role``. Every name in :attr:`TRACE_FIELDS` that
        is set on the record (typically via ``logger.x(..., extra={...})``)
        is added under the same key. When the record carries an exception
        instance, its formatted traceback is added under ``exception``.

        Args:
            record: The log record produced by the standard logging machinery.

        Returns:
            The JSON-encoded entry, without a trailing newline. Values that
            are not natively JSON-serialisable are rendered with ``str()``.
        """
        log_entry: dict[str, Any] = {
            # Wall-clock time at formatting, not ``record.created``.
            "ts": time.time(),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
            "node_id": NODE_ID,
            "node_role": NODE_ROLE,
        }

        # Attach optional trace fields if set on the record.
        for field in self.TRACE_FIELDS:
            val = getattr(record, field, None)
            if val is not None:
                log_entry[field] = val

        # Only records with an actual exception instance get a traceback;
        # an ``exc_info`` of ``(None, None, None)`` is skipped.
        if record.exc_info and record.exc_info[1]:
            log_entry["exception"] = self.formatException(record.exc_info)

        # ``default=str`` is deliberate: a trace field holding a non-JSON type
        # (e.g. a UUID) would otherwise raise TypeError here and the logging
        # framework would drop the whole line. A stringified value is still
        # usable for correlation.
        return json.dumps(log_entry, default=str)
def configure_logging(cfg: Any) -> None:
    """Reconfigure the root logger from a ``Config``-like object.

    Sets the root log level from ``cfg.log_level``, removes every handler
    currently attached to the root logger (to avoid duplicate output on
    re-init), and attaches one fresh ``StreamHandler`` writing to ``stdout``.
    The handler gets a :class:`StructuredFormatter` when
    ``cfg.structured_logging`` is truthy, otherwise a plain text formatter,
    plus an ``ApiKeyRedactionFilter`` imported from ``log_redaction``.

    Mutates global logging state (the root logger's level and handlers).
    Removed handlers are detached but not closed.

    Args:
        cfg: Object optionally exposing ``log_level`` and
            ``structured_logging``; both are read via ``getattr`` with
            defaults of ``"INFO"`` and ``False``. ``log_level`` may be a
            level name in any case (``"debug"``) or a numeric level
            (``logging.DEBUG``). Anything else, including an unknown name or
            ``None``, falls back to ``logging.INFO``.
    """
    raw_level = getattr(cfg, "log_level", "INFO")
    level = logging.INFO
    if isinstance(raw_level, int) and not isinstance(raw_level, bool):
        # Numeric levels (e.g. ``logging.DEBUG``) are used as given.
        level = raw_level
    elif isinstance(raw_level, str):
        candidate = getattr(logging, raw_level.upper(), None)
        # Guard against names that resolve to a non-level attribute of the
        # logging module (e.g. ``BASIC_FORMAT`` is a str), which would make
        # ``setLevel`` raise.
        if isinstance(candidate, int) and not isinstance(candidate, bool):
            level = candidate

    root = logging.getLogger()
    root.setLevel(level)

    # Clean up any existing handlers to avoid double outputs. Iterate over a
    # copy because removeHandler mutates ``root.handlers``.
    for handler in list(root.handlers):
        root.removeHandler(handler)

    # Create new standard stdout stream handler.
    import sys

    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(level)

    # Configure formatter: JSON lines when structured logging is requested,
    # otherwise a conventional human-readable line.
    structured = getattr(cfg, "structured_logging", False)
    if structured:
        formatter = StructuredFormatter()
    else:
        formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
    ch.setFormatter(formatter)

    # Add the API key redaction filter to the handler. Imported lazily here,
    # as in the original. NOTE(review): the filter's behaviour is defined in
    # ``log_redaction`` and is not visible from this module; presumably it
    # scrubs secrets from records -- confirm there.
    from log_redaction import ApiKeyRedactionFilter

    ch.addFilter(ApiKeyRedactionFilter())

    root.addHandler(ch)