Source code for core.log_config
"""Structured logging configuration for the distributed Stargazer services.
Provides :class:`StructuredFormatter`, a JSON-lines log formatter that stamps
every record with the process-wide ``NODE_ID`` / ``NODE_ROLE`` (read once from
the ``SG_NODE_ID`` / ``SG_NODE_ROLE`` environment variables) plus any per-event
trace fields, and :func:`configure_logging`, which installs that formatter on
the root logger from a ``Config``. The shared envelope of fields is what makes a
single message greppable as it hops across the gateway / inference / agents /
consolidation / web services over Redis Streams.
Note: this module is the original ``core.log_config`` implementation; the live
runtime path is the newer ``core.structured_logger`` (which exposes its own
``configure_logging``). The symbols here are currently referenced only by the
tests under ``tests/core/`` that pin this module's behaviour.
"""
import logging
import json
import time
import os
from typing import Any
# Every log line from the event bus and stream consumers must include these fields.
# This enables grep-based correlation across distributed nodes.
NODE_ID = os.environ.get("SG_NODE_ID", "standalone")
NODE_ROLE = os.environ.get("SG_NODE_ROLE", "standalone") # gateway | worker | standalone
[docs]
class StructuredFormatter(logging.Formatter):
"""Logging formatter that renders each record as one line of JSON.
A :class:`logging.Formatter` subclass whose :meth:`format` emits a flat JSON
object per log record -- timestamp, level, logger, message, the process-wide
``NODE_ID`` / ``NODE_ROLE``, and any optional trace fields the caller
attached via ``extra`` -- so logs from every Stargazer service can be
aggregated and correlated by ``trace_id`` or ``stream_msg_id`` with simple
text tooling.
Installed on the root logger's stdout handler by :func:`configure_logging`
when structured logging is enabled, and constructed directly by
``tests/core/test_log_config.py``.
"""
[docs]
def format(self, record: logging.LogRecord) -> str:
"""Render a single log record as a one-line JSON string for distributed tracing.
Builds a flat dictionary containing the timestamp, level, logger name,
rendered message, and the process-wide ``NODE_ID`` / ``NODE_ROLE``
(read once at import time from the ``SG_NODE_ID`` and ``SG_NODE_ROLE``
environment variables). It then opportunistically copies a fixed set of
optional trace fields off the record -- ``stream_msg_id``,
``channel_id``, ``platform``, ``user_id``, and ``trace_id`` -- whenever
the caller has attached them via ``logger.x(..., extra={...})``, which is
what enables grep-based correlation of a single message as it flows across
the gateway / inference / agents / consolidation / web services over Redis
Streams. If the record carries exception info, the formatted traceback is
added under ``exception``. The result is serialised with ``json.dumps``.
This is invoked implicitly by the logging framework for every record once
an instance is installed as a handler's formatter; the formatter itself is
instantiated by :func:`configure_logging` in this module and constructed
directly in ``tests/core/test_log_config.py``.
Args:
record: The log record produced by the standard logging machinery.
Returns:
A JSON-encoded string terminated by no newline (the handler adds it),
containing the core fields plus any present optional trace fields and
exception traceback.
"""
log_entry: dict[str, Any] = {
"ts": time.time(),
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
"node_id": NODE_ID,
"node_role": NODE_ROLE,
}
# Attach optional trace fields if set on the record
for field in ("stream_msg_id", "channel_id", "platform", "user_id", "trace_id"):
val = getattr(record, field, None)
if val is not None:
log_entry[field] = val
if record.exc_info and record.exc_info[1]:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry)
[docs]
def configure_logging(cfg: Any) -> None:
"""Reconfigure the root logger from a ``Config`` object.
Sets the root log level from ``cfg.log_level``, strips any existing handlers
to avoid duplicate output on re-init, and attaches a single fresh
``StreamHandler`` writing to ``stdout``. The handler is given either a
:class:`StructuredFormatter` (JSON lines, when ``cfg.structured_logging`` is
truthy) or a plain text formatter, and an ``ApiKeyRedactionFilter`` from
``log_redaction`` so secrets are scrubbed before anything is emitted.
Mutates global logging state (the root logger's level, handlers, and
filters) as a side effect and reads ``log_level`` / ``structured_logging``
off the supplied config. No production callers were found by grep; it is
invoked by the logging tests under ``tests/core/`` and stands in parallel to
the live ``core.structured_logger.configure_logging``.
Args:
cfg: A configuration object exposing ``log_level`` and
``structured_logging`` attributes (both read defensively via
``getattr`` with safe defaults).
"""
log_level_str = getattr(cfg, "log_level", "INFO").upper()
level = getattr(logging, log_level_str, logging.INFO)
root = logging.getLogger()
root.setLevel(level)
# Clean up any existing stream/file handlers to avoid double outputs
for handler in list(root.handlers):
root.removeHandler(handler)
# Create new standard stdout stream handler
import sys
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(level)
# Configure formatter
structured = getattr(cfg, "structured_logging", False)
if structured:
formatter = StructuredFormatter()
else:
formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
ch.setFormatter(formatter)
# Add API key redaction filter to the handler
from log_redaction import ApiKeyRedactionFilter
ch.addFilter(ApiKeyRedactionFilter())
root.addHandler(ch)