Source code for inference_main

"""Inference service entry point — the LLM message-processing worker.

The Inference service runs :class:`InferenceService`, a
:class:`~core.service_base.StargazerService` that does the heavy work the
monolith used to do, but as a horizontally scalable, stateless worker:

* Consumes the ``sg:stream:inbound`` Redis stream via
  :class:`~core.stream_consumer.InboundStreamConsumer` (consumer group
  ``sg:workers``), which acquires a per-channel
  :class:`~core.distributed_lock.DistributedLock` so messages for one
  channel are processed serially.
* Reconstructs the :class:`~platforms.base.IncomingMessage` from the
  envelope (:func:`_incoming_message_from_envelope`, the inverse of the
  gateway's codec) and hands it to
  :meth:`message_processor.processor.MessageProcessor.handle_message`.
* Supplies a :class:`~core.proxy_adapter.ProxyPlatformAdapter` in place of a
  real platform client: every ``send``/``send_file``/reaction the processor
  performs is published to ``sg:stream:outbound:{platform}`` for the Gateway
  to dispatch. Tools needing raw SDK access are delegated back to the
  Gateway over the bus.
* Wires up the processor's collaborators: :class:`ConversationManager`,
  :class:`MessageCache`, :class:`KnowledgeGraphManager`,
  :class:`RAGAutoSearchManager`, :class:`OpenRouterClient`,
  :class:`ToolRegistry`, the SWORD system, and a leader-gated
  :class:`StatusManager` for presence updates.

Launched standalone (``python inference_main.py``) by
``scripts/systemd/stargazer-inference.service``; scale by running multiple
instances (they share the ``sg:workers`` consumer group).
"""

import asyncio
import base64
import json
import logging
import sys
import uuid
import signal
from datetime import datetime, timezone
from typing import Any

from config import Config
from core.service_base import StargazerService
from core.stream_consumer import InboundStreamConsumer
from core.proxy_adapter import ProxyPlatformAdapter
from core.event_bus import RedisEventBus
from message_processor.processor import MessageProcessor
from tools import ToolRegistry
from prompt_renderer import PromptRenderer
from conversation import ConversationManager
from openrouter_client import OpenRouterClient
from message_cache import MessageCache
from knowledge_graph import KnowledgeGraphManager
from rag_system.auto_search import RAGAutoSearchManager
from web_search_context import WebSearchContextManager
from threadweave import ThreadweaveManager
from task_manager import TaskManager
from embedding_queue import EmbeddingBatchQueue
from background_tasks import BackgroundScheduler, channel_heartbeat_task
from sword.factory import build_sword_system
from status_manager import StatusManager

logger = logging.getLogger("inference")


def _incoming_message_from_envelope(payload: dict) -> Any:
    """Rebuild an ``IncomingMessage`` from a deserialized inbound-envelope dict.

    Inverse of the gateway's ``_make_inbound_envelope()``. This is the one place
    where the flat wire dict becomes the ``IncomingMessage`` dataclass that the
    ``MessageProcessor`` API consumes.

    Type restoration performed:
      - attachments: each ``data`` field is base64-decoded to ``bytes``
        (missing/empty ``data`` becomes ``b""``).
      - timestamp:   a Unix float becomes a UTC-aware ``datetime``. A missing or
        zero timestamp falls back to "now" (UTC).
      - extra:       selected top-level envelope fields are copied into the
        nested ``extra`` dict.

    ``extra`` keys:
      _identity_resolved  - always ``False`` here (the processor's identity
                            resolution is expected to flip it).
      guild_id            - ``payload["guild_id"]`` or ``None``.
      guild_name          - ``payload["guild_name"]`` if present, else ``""``.
      channel_id_raw      - same value as ``channel_id``.
      is_dm               - ``payload["is_dm"]`` if present, else ``False``.
      member_roles        - ``payload["member_roles"]``, with a falsy value coerced to ``[]``.
      embeds              - ``payload["embeds"]`` or ``None``.
      member_count        - ``payload["member_count"]`` if present, else ``0``.

    Args:
        payload: The envelope as a plain dict (e.g. from ``model_dump()``).

    Returns:
        A ``platforms.base.IncomingMessage`` instance.
    """
    from platforms.base import IncomingMessage, Attachment

    def _decode_attachment(raw: dict) -> Any:
        # Attachments travel base64-encoded on the wire; an absent or empty
        # payload becomes an empty byte string.
        encoded = raw.get("data")
        return Attachment(
            data=base64.b64decode(encoded) if encoded else b"",
            mimetype=raw.get("mimetype", ""),
            filename=raw.get("filename", ""),
            source_url=raw.get("source_url", ""),
        )

    decoded_attachments = list(map(_decode_attachment, payload.get("attachments") or []))

    unix_ts = payload.get("timestamp", 0.0)
    if unix_ts:
        received_at = datetime.fromtimestamp(unix_ts, tz=timezone.utc)
    else:
        received_at = datetime.now(timezone.utc)

    channel_id = payload.get("channel_id", "")

    extra_fields: dict[str, Any] = {}
    extra_fields["_identity_resolved"] = False
    extra_fields["guild_id"] = payload.get("guild_id")
    extra_fields["guild_name"] = payload.get("guild_name", "")
    extra_fields["channel_id_raw"] = channel_id
    extra_fields["is_dm"] = payload.get("is_dm", False)
    extra_fields["member_roles"] = payload.get("member_roles") or []
    extra_fields["embeds"] = payload.get("embeds")
    extra_fields["member_count"] = payload.get("member_count", 0)

    return IncomingMessage(
        platform=payload.get("platform", ""),
        channel_id=channel_id,
        user_id=payload.get("user_id", ""),
        user_name=payload.get("username", ""),
        text=payload.get("content", ""),
        is_addressed=payload.get("is_addressed", True),
        attachments=decoded_attachments,
        channel_name=payload.get("room_name", "") or "",
        timestamp=received_at,
        message_id=payload.get("message_id", ""),
        reply_to_id=payload.get("reply_to", "") or "",
        extra=extra_fields,
        reactions=payload.get("reactions", ""),
        unified_user_id=payload.get("unified_user_id"),
        user_aliases=payload.get("user_aliases") or [],
    )


class InferenceService(StargazerService):
    """Stateless LLM message-processing worker for the inference service.

    A :class:`~core.service_base.StargazerService` subclass that consumes the
    ``sg:stream:inbound`` Redis stream, reconstructs each
    :class:`~platforms.base.IncomingMessage` from its envelope, and hands it to
    a :class:`~message_processor.processor.MessageProcessor`. The worker holds
    no real platform SDK, so every outbound action is published via a
    :class:`~core.proxy_adapter.ProxyPlatformAdapter` over a
    :class:`~core.event_bus.RedisEventBus` for the Gateway to dispatch.

    Multiple instances scale horizontally by sharing the ``sg:workers``
    consumer group. Presence cycling is leader-gated through
    :meth:`_status_leader_guard`.
    """

    def __init__(
        self,
        config: Config,
        redis_client: Any,
        instance_id: str,
        use_health_server: bool = True,
    ):
        """Construct an inference worker registered as the ``"inference"`` service.

        Stores the loaded config and pre-declares the collaborators (consumer,
        processor, event bus, scheduler, status manager, embedding queue) as
        ``None``. They are wired up later in :meth:`on_start`. Pre-declaring
        them also lets :meth:`on_stop` run safely on a worker whose
        ``on_start`` failed part-way. No connections are opened here.

        Delegates to :meth:`StargazerService.__init__` with
        ``service_name="inference"`` and ``redis_required=True``.

        Args:
            config: The loaded application configuration (API keys, model
                settings, Redis connection details, tool permissions).
            redis_client: A pre-built async Redis client handed to the base
                service. :func:`main` builds it with ``decode_responses=False``.
            instance_id: Unique worker identifier (e.g. ``"worker-ab12cd34"``).
                Used as the inbound consumer name and as the status-leader
                lease value.
            use_health_server: Forwarded to the base service. When True, the
                base service stands up its health endpoint.
        """
        super().__init__(
            service_name="inference",
            instance_id=instance_id,
            redis_client=redis_client,
            redis_required=True,
            use_health_server=use_health_server,
        )
        self.cfg = config
        self.consumer: InboundStreamConsumer | None = None
        self.processor: MessageProcessor | None = None
        self.event_bus: RedisEventBus | None = None
        # Released by on_stop(); run() blocks on it.
        self._stop_event = asyncio.Event()
        self._config_listener_task: asyncio.Task | None = None
        self.scheduler: BackgroundScheduler | None = None
        self.status_manager: StatusManager | None = None
        # Declared up front so on_stop() does not raise AttributeError when
        # on_start() failed before the embedding queue was created.
        self.embedding_queue: EmbeddingBatchQueue | None = None
        self._status_leader_key = "sg:leader:status"

    def get_adapter(self, platform_name: str) -> ProxyPlatformAdapter:
        """Return a worker-side proxy adapter for ``platform_name``.

        The worker has no real platform client, so every adapter is a
        :class:`~core.proxy_adapter.ProxyPlatformAdapter` bound to
        :attr:`event_bus`. A new adapter is created on every call.

        Used by :meth:`on_start` to give the :class:`StatusManager` its
        ``"discord"`` adapter.

        Args:
            platform_name: Platform identifier (e.g. ``"discord"``) that selects
                where the adapter publishes.

        Returns:
            A fresh proxy adapter bound to ``self.event_bus`` and ``platform_name``.
        """
        return ProxyPlatformAdapter(self.event_bus, platform_name)

    async def _status_leader_guard(self) -> bool:
        """Return True when this worker should drive the periodic status loop.

        Uses a Redis lease (``SET NX EX``) on ``sg:leader:status`` so only one
        worker generates and publishes presence at a time. The holder renews its
        own lease on each check. Other workers return False until the lease
        expires and one of them wins the next ``SET NX``.

        The TTL is three times the status manager's ``glitch_interval``, with a
        floor of 60. It is 180 when no status manager exists yet.

        Returns:
            True if there is no Redis client (no coordination possible), or if
            this worker holds or just acquired the lease. False otherwise,
            including when a Redis call fails (the failure is logged at debug).
        """
        redis = self.redis
        if redis is None:
            return True  # standalone / no coordination available
        key = self._status_leader_key
        ttl = (
            max(60, int(self.status_manager.config.glitch_interval) * 3)
            if self.status_manager
            else 180
        )
        try:
            acquired = await redis.set(key, self.instance_id, nx=True, ex=ttl)
            if acquired:
                return True
            current = await redis.get(key)
            # The client may return bytes (decode_responses=False) or str.
            current_str = current.decode() if isinstance(current, bytes) else (current or "")
            if current_str == self.instance_id:
                await redis.expire(key, ttl)  # renew our own lease
                return True
        except Exception:
            logger.debug("Status leader guard check failed", exc_info=True)
        return False

    async def on_start(self) -> None:
        """Initialize tools, models, managers and the message processor.

        This is the heavy startup phase, run by the base service's ``boot``
        sequence. Steps, in order:

        1. Load tools into a :class:`~tools.ToolRegistry` (in a worker thread).
        2. Build the :class:`~prompt_renderer.PromptRenderer` and the
           :class:`~openrouter_client.OpenRouterClient`.
        3. Stand up the Redis-backed managers: message cache, conversation,
           knowledge graph, SWORD, RAG/web-search/threadweave/task managers,
           and the embedding queue.
        4. Create the :class:`~core.event_bus.RedisEventBus` and wrap the
           registry in a ``RemoteToolRegistry``.
        5. Create the leader-gated :class:`StatusManager`, the optional visual
           memory engine, and the :class:`MessageProcessor`.
        6. Start the status manager, the inbound stream consumer (bound to
           :meth:`_process_message`), the heartbeat scheduler, and the
           config-update listener task.
        """
        logger.info("Initializing Inference dependencies...")

        # Tools
        self.tool_registry = ToolRegistry()
        if self.cfg.tool_permissions:
            self.tool_registry.set_permissions(self.cfg.tool_permissions)
        from tool_loader import load_tools

        logger.info("Asynchronously discovering and loading tools from: %s", self.cfg.tools_dir)
        # Tool discovery runs in a thread so it does not block the event loop.
        await asyncio.to_thread(load_tools, self.cfg.tools_dir, self.tool_registry)

        # Prompts
        tool_info = [
            {"name": td.name, "description": td.description}
            for td in self.tool_registry._tools.values()
        ]
        renderer = PromptRenderer(
            self.cfg.system_prompt_file,
            default_extras={
                "tools": tool_info,
                "hosting_provider": "Distributed",
                "server_location": "Inference Node",
            },
        )

        # Connect LLM provider
        self.openrouter = OpenRouterClient(
            api_key=self.cfg.api_key,
            model=self.cfg.model,
            temperature=self.cfg.temperature,
            max_tokens=self.cfg.max_tokens,
            top_p=self.cfg.top_p,
            tool_registry=self.tool_registry,
            base_url=self.cfg.llm_base_url,
            gemini_api_key=self.cfg.gemini_api_key,
        )

        # Redis caches and managers
        if self.cfg.redis_sentinels:
            ssl_kw = self.cfg.redis_ssl_kwargs()
        elif self.cfg.redis_url:
            ssl_kw = self.cfg.redis_connection_kwargs_for_url(self.cfg.redis_url)
        else:
            ssl_kw = {}
        self.message_cache = MessageCache(
            redis_url=self.cfg.redis_url,
            openrouter_client=self.openrouter,
            embedding_model=self.cfg.embedding_model,
            ssl_kwargs=ssl_kw,
            redis_sentinels=self.cfg.redis_sentinels,
            redis_sentinel_master=self.cfg.redis_sentinel_master,
            resilience_kwargs=self.cfg.redis_resilience_kwargs(),
        )

        # Reuse the message cache's client (and its connection pool) for the
        # managers below and for observability.
        redis_client = self.message_cache.redis_client
        from observability import set_observability_redis

        set_observability_redis(redis_client)

        self.conversation_mgr = ConversationManager(
            prompt_renderer=renderer,
            max_history=self.cfg.max_history,
            redis=redis_client,
        )
        self.kg_manager = KnowledgeGraphManager(
            redis_client=redis_client,
            openrouter=self.openrouter,
            embedding_model=self.cfg.embedding_model,
        )
        sword_monitor, sword_graph_manager = build_sword_system(
            cfg=self.cfg,
            redis_client=redis_client,
            openrouter=self.openrouter,
            kg_manager=self.kg_manager,
        )
        if sword_monitor is not None:
            self.kg_manager._sword_monitor = sword_monitor
        self.auto_search = RAGAutoSearchManager(redis_client)
        self.web_search = WebSearchContextManager(
            redis_client,
            default_api_key=self.cfg.api_key,
            config=self.cfg,
        )
        self.threadweave = ThreadweaveManager(
            redis_client=redis_client,
            openrouter=self.openrouter,
            embedding_model=self.cfg.embedding_model,
            dna_vault_path=self.cfg.dna_vault_path,
        )
        self.task_manager = TaskManager(timeout=10.0, redis=redis_client)
        self.tool_registry.task_manager = self.task_manager
        self.embedding_queue = EmbeddingBatchQueue(
            openrouter=self.openrouter,
            redis=self.message_cache.redis_raw_client,
            model=self.cfg.embedding_model,
            flush_interval=self.cfg.embedding_flush_interval,
            max_batch_size=self.cfg.embedding_batch_size,
        )
        await self.embedding_queue.start()

        # Optional features not wired on the inference worker.
        self.persona_pref_manager = None
        self.classifier = None

        # The event bus must exist before the StatusManager: its presence
        # updates are published through a worker-side proxy adapter over this bus.
        self.event_bus = RedisEventBus(
            redis=self.message_cache.redis_raw_client,  # must be decode_responses=False for binary msgpack
            node_role="worker",
            node_id=self.instance_id,
        )
        await self.event_bus.ensure_streams()

        # Wrap the fully-loaded local registry so non-pinned tools can be
        # delegated to the dedicated ``tools`` service.
        #
        # This is dormant by default (``tools_service_mode == "in_process"``):
        # ``start()`` is a no-op, the read surface stays the local registry,
        # and every ``call()`` runs locally, exactly as before the split.
        # Flipping the mode to ``remote`` activates delegation and the catalog
        # read path.
        #
        # The executor reaches the registry through ``openrouter``, so that
        # handle is swapped as well.
        from core.remote_tool_registry import RemoteToolRegistry

        self._local_tool_registry = self.tool_registry
        self.tool_registry = RemoteToolRegistry(
            event_bus=self.event_bus,
            redis=self.message_cache.redis_raw_client,
            config=self.cfg,
            local_registry=self._local_tool_registry,
            worker_id=self.instance_id,
        )
        await self.tool_registry.start()
        self.openrouter.tool_registry = self.tool_registry

        # AI-generated presence/status. Generation (LLM) happens here on the
        # worker; application (change_presence) happens on the Gateway. The
        # StatusManager publishes a ``presence`` action via the discord proxy
        # adapter, which the Gateway's outbound consumer applies to the live
        # client. The reply-driven ``<dstatus>`` tag path runs through the
        # MessageProcessor. The periodic cycling loop is leader-gated so only
        # one worker drives it.
        self.status_manager = StatusManager(
            openrouter=self.openrouter,
            discord_adapter=self.get_adapter("discord"),
            leader_guard=self._status_leader_guard,
        )

        # Visual Memory Engine: cross-channel image recognition.
        # Optional; a missing module or failed init leaves it as None.
        self.visual_memory = None
        if getattr(self.cfg, "visual_memory_enabled", True):
            try:
                from visual_memory import VisualMemoryEngine

                self.visual_memory = VisualMemoryEngine(
                    kg_manager=self.kg_manager,
                    redis_client=redis_client,
                    config=self.cfg,
                )
                logger.info("VisualMemoryEngine initialized (models lazy-loaded on first image)")
            except ImportError:
                logger.debug("visual_memory module not available")
            except Exception:
                logger.warning("VisualMemoryEngine init failed", exc_info=True)

        self.processor = MessageProcessor(
            config=self.cfg,
            conversation_manager=self.conversation_mgr,
            openrouter=self.openrouter,
            message_cache=self.message_cache,
            kg_manager=self.kg_manager,
            auto_search=self.auto_search,
            task_manager=self.task_manager,
            classifier=self.classifier,
            threadweave=self.threadweave,
            embedding_queue=self.embedding_queue,
            web_search=self.web_search,
            persona_pref_manager=self.persona_pref_manager,
            sword_graph_manager=sword_graph_manager,
            status_manager=self.status_manager,
            visual_memory=self.visual_memory,
        )
        self.status_manager.start()
        logger.info("StatusManager started (presence published via event bus)")

        # Start inbound consumer (only after the processor exists, since it
        # immediately begins calling _process_message).
        self.consumer = InboundStreamConsumer(
            redis=self.message_cache.redis_raw_client,  # must be decode_responses=False for xreadgroup binary payloads
            consumer_name=self.instance_id,
            process_fn=self._process_message,
        )
        await self.consumer.start()
        logger.info("InboundStreamConsumer started")

        # Start heartbeat scheduler.
        # NOTE(review): interval/initial_delay presumably in seconds (daily / 2 min).
        self.scheduler = BackgroundScheduler()
        self.scheduler.register(
            "channel_heartbeat",
            channel_heartbeat_task,
            interval=86400,
            initial_delay=120,
            args=(self,),
        )
        await self.scheduler.start()
        logger.info("Heartbeat scheduler started")

        # Start config updates listener
        self._config_listener_task = asyncio.create_task(
            self._listen_config_updates(), name="sg-inference-config-listener"
        )
        logger.info("Config update pubsub listener started")

    async def _process_message(self, payload: dict[str, Any]) -> None:
        """Route one inbound envelope to the processor through a proxy adapter.

        The envelope dict is first rebuilt into an ``IncomingMessage``. The
        processor mutates ``.extra``, ``.text`` and ``.unified_user_id`` in
        place, so a raw dict would fail on the first attribute access.

        The envelope's ``trace_id`` (if any) is attached to the proxy adapter
        used for this message's outbound actions.

        Args:
            payload: Deserialized inbound envelope as delivered by the
                :class:`InboundStreamConsumer`.
        """
        msg = _incoming_message_from_envelope(payload)
        trace_id = payload.get("trace_id")
        proxy = ProxyPlatformAdapter(self.event_bus, msg.platform, trace_id=trace_id)
        await self.processor.handle_message(msg, proxy)

    async def _listen_config_updates(self) -> None:
        """Subscribe to ``sg:pubsub:config`` and hot-reload config on change.

        Runs as a long-lived task. On each message whose JSON payload has
        ``"event": "sys:config:updated"`` it does the following:

        - Reloads config via :meth:`Config.load` into ``self.cfg``.
        - Repoints ``self.processor.config`` and its ``_ctx_builder.config``, if
          present.
        - Re-applies tool permissions (an empty dict when none are configured).

        Other collaborators built in :meth:`on_start` keep the config object
        they were constructed with.

        A bad message (decode or JSON error, or a failure while applying it) is
        logged and skipped so the listener keeps running. Cancellation is
        logged and re-raised. The ``finally`` block always unsubscribes and
        closes the pubsub connection.

        NOTE(review): after on_start, ``self.tool_registry`` is the
        ``RemoteToolRegistry`` wrapper. This assumes it exposes
        ``set_permissions``; if not, each update logs an error instead of
        applying permissions. Confirm.
        """
        pubsub = self.redis.pubsub()
        try:
            await pubsub.subscribe("sg:pubsub:config")
            logger.info("Subscribed to sg:pubsub:config channel")
            async for message in pubsub.listen():
                # Skip subscribe/unsubscribe confirmations.
                if message["type"] != "message":
                    continue
                try:
                    data = message.get("data")
                    if isinstance(data, bytes):
                        data = data.decode("utf-8")
                    payload = json.loads(data)
                    if payload.get("event") != "sys:config:updated":
                        continue
                    logger.info("Received config update event, reloading config.yaml")
                    self.cfg = Config.load()
                    if self.processor:
                        self.processor.config = self.cfg
                        ctx_builder = getattr(self.processor, "_ctx_builder", None)
                        if ctx_builder:
                            ctx_builder.config = self.cfg
                    self.tool_registry.set_permissions(self.cfg.tool_permissions or {})
                    logger.info("Config reloaded and tool permissions updated successfully")
                except Exception as e:
                    logger.exception("Error processing config update message: %s", e)
        except asyncio.CancelledError:
            logger.info("Config updates listener task cancelled")
            # Propagate so the task is reported as cancelled; on_stop() expects this.
            raise
        except Exception:
            logger.exception("Config updates listener crashed")
        finally:
            try:
                await pubsub.unsubscribe("sg:pubsub:config")
                await pubsub.aclose()
            except Exception:
                logger.debug("Config pubsub cleanup failed", exc_info=True)

    async def run(self) -> None:
        """Block until shutdown is requested.

        Message processing is driven by the inbound consumer and the background
        tasks started in :meth:`on_start`. This method only waits on
        ``_stop_event``, which :meth:`on_stop` sets.
        """
        await self._stop_event.wait()

    async def on_stop(self) -> None:
        """Tear down the worker's background tasks and collaborators.

        Base-service ``on_stop`` hook. It first sets ``_stop_event`` (releasing
        :meth:`run`), then cancels and awaits the config-update listener. It
        then stops, in order: the status manager, the inbound consumer, the
        tool registry (if it has ``stop``), the heartbeat scheduler, and the
        embedding queue.

        Every step is guarded: components that were never created (partial
        ``on_start``) are skipped, and a failing ``stop()`` is logged without
        preventing the remaining steps from running.
        """
        logger.info("Shutting down InferenceService...")
        # Set first so run() is released regardless of how teardown goes.
        self._stop_event.set()

        listener = getattr(self, "_config_listener_task", None)
        if listener is not None and not listener.done():
            logger.info("Stopping config updates listener...")
            listener.cancel()
            try:
                await listener
            except asyncio.CancelledError:
                pass
            logger.info("Config updates listener stopped.")

        status_manager = getattr(self, "status_manager", None)
        if status_manager is not None:
            try:
                status_manager.stop()
            except Exception:
                logger.warning("StatusManager stop failed", exc_info=True)

        consumer = getattr(self, "consumer", None)
        if consumer is not None:
            try:
                await consumer.stop()
            except Exception:
                logger.warning("InboundStreamConsumer stop failed", exc_info=True)

        registry = getattr(self, "tool_registry", None)
        if registry is not None and hasattr(registry, "stop"):
            try:
                await registry.stop()
            except Exception:
                logger.debug("RemoteToolRegistry stop failed", exc_info=True)

        scheduler = getattr(self, "scheduler", None)
        if scheduler is not None:
            try:
                await scheduler.stop()
            except Exception:
                logger.warning("Heartbeat scheduler stop failed", exc_info=True)

        embedding_queue = getattr(self, "embedding_queue", None)
        if embedding_queue is not None:
            try:
                await embedding_queue.stop()
            except Exception:
                logger.warning("EmbeddingBatchQueue stop failed", exc_info=True)
async def main() -> None:
    """Process entry point: boot and run a single inference worker.

    The sequence is:

    1. Configure root logging, load the :class:`~config.Config`, and mint a
       ``worker-{8 hex}`` instance id.
    2. Build the async Redis client with ``decode_responses=False`` (stream
       payloads are binary msgpack).
    3. Construct the :class:`InferenceService` and install ``SIGINT``/``SIGTERM``
       handlers that start :meth:`StargazerService.shutdown` exactly once.
    4. Drive ``boot()`` and then ``run()``, which blocks until the stop event
       fires.

    ``on_stop`` sets the stop event before it tears down its collaborators, so
    ``run()`` can return while shutdown is still in progress. The ``finally``
    block therefore waits for the pending shutdown task before closing the
    Redis client. The Redis client is always closed.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    cfg = Config.load()
    instance_id = f"worker-{uuid.uuid4().hex[:8]}"
    redis_client = cfg.build_async_redis_client(decode_responses=False)
    service = InferenceService(cfg, redis_client, instance_id)

    loop = asyncio.get_running_loop()
    shutdown_task: asyncio.Task | None = None

    def _request_shutdown() -> None:
        # Keep a strong reference to the task (the event loop only holds weak
        # references to tasks) and ignore repeat signals while shutdown runs.
        nonlocal shutdown_task
        if shutdown_task is None:
            shutdown_task = asyncio.create_task(service.shutdown())

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _request_shutdown)

    try:
        await service.boot()
        await service.run()
    finally:
        try:
            # Let teardown finish before the client it depends on is closed.
            if shutdown_task is not None:
                try:
                    await shutdown_task
                except Exception:
                    logger.exception("Inference service shutdown failed")
        finally:
            if redis_client is not None:
                await redis_client.aclose()


if __name__ == "__main__":
    # Standalone launch (e.g. from the systemd unit).
    asyncio.run(main())