inference_main

Inference service entry point — the LLM message-processing worker.

The Inference service runs InferenceService, a StargazerService subclass that does the heavy work the monolith used to do, but as a horizontally scalable, stateless worker:

  • Consumes the sg:stream:inbound Redis stream via InboundStreamConsumer (consumer group sg:workers), which acquires a per-channel DistributedLock so messages for one channel are processed serially.

  • Reconstructs the IncomingMessage from the envelope (_incoming_message_from_envelope(), the inverse of the Gateway's codec) and hands it to message_processor.processor.MessageProcessor.handle_message().

  • Supplies a ProxyPlatformAdapter in place of a real platform client: every send/send_file/reaction the processor performs is published to sg:stream:outbound:{platform} for the Gateway to dispatch. Tools needing raw SDK access are delegated back to the Gateway over the bus.

  • Wires up the processor’s collaborators: ConversationManager, MessageCache, KnowledgeGraphManager, RAGAutoSearchManager, OpenRouterClient, ToolRegistry, the SWORD system, and a leader-gated StatusManager for presence updates.
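
The per-channel serialization described above can be sketched in-process. This is a minimal sketch, assuming one asyncio.Lock per channel as a stand-in for the Redis-backed DistributedLock, whose real interface is not documented on this page; ChannelSerializer and handle_message are hypothetical names. Two messages for chan-a never overlap, while chan-b proceeds concurrently.

```python
from __future__ import annotations

import asyncio
from collections import defaultdict
from typing import Awaitable, Callable


class ChannelSerializer:
    """Hypothetical in-process stand-in for the per-channel DistributedLock."""

    def __init__(self) -> None:
        # One asyncio.Lock per channel id, created lazily on first use.
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def run(self, channel_id: str,
                  handler: Callable[[str], Awaitable[None]], message: str) -> None:
        # Only one handler per channel holds the lock at a time, so messages
        # for a single channel are processed serially; other channels use
        # different locks and run concurrently.
        async with self._locks[channel_id]:
            await handler(message)


async def demo() -> list[str]:
    serializer = ChannelSerializer()
    log: list[str] = []

    async def handle_message(message: str) -> None:
        log.append(f"start {message}")
        await asyncio.sleep(0.01)  # simulate model latency
        log.append(f"end {message}")

    await asyncio.gather(
        serializer.run("chan-a", handle_message, "a1"),
        serializer.run("chan-a", handle_message, "a2"),
        serializer.run("chan-b", handle_message, "b1"),
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

An in-process lock like this only serializes work inside one worker; because several instances share the sg:workers group and may pull messages for the same channel, the real lock has to live in Redis.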

Launched standalone (python inference_main.py) by scripts/systemd/stargazer-inference.service; scale by running multiple instances (they share the sg:workers consumer group).
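
Horizontal scaling relies on a Redis consumer group handing each new stream entry to only one consumer in the group. The toy model below is hypothetical (ToyConsumerGroup is not the Redis API, and it simplifies entry ids, blocking reads, and redelivery of unacknowledged entries); it shows two instances reading from the same group and never receiving the same entry. Real workers would issue XREADGROUP with their instance id as the consumer name and XACK once an entry is processed.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyConsumerGroup:
    """Toy in-memory model of one Redis stream consumer group (not the Redis API)."""

    undelivered: deque = field(default_factory=deque)  # entries no consumer has read yet
    pending: dict = field(default_factory=dict)        # entry id -> consumer name
    next_id: int = 0

    def add(self, payload: Any) -> int:
        # Roughly XADD: append an entry and return its (simplified, integer) id.
        self.next_id += 1
        self.undelivered.append((self.next_id, payload))
        return self.next_id

    def read(self, consumer: str, count: int = 1) -> list[tuple[int, Any]]:
        # Roughly XREADGROUP with ">": hand out entries never delivered to any
        # consumer in the group and record them as pending for this consumer.
        batch: list[tuple[int, Any]] = []
        while self.undelivered and len(batch) < count:
            entry_id, payload = self.undelivered.popleft()
            self.pending[entry_id] = consumer
            batch.append((entry_id, payload))
        return batch

    def ack(self, entry_id: int) -> None:
        # Roughly XACK: the entry is finished and leaves the pending list.
        self.pending.pop(entry_id, None)


if __name__ == "__main__":
    group = ToyConsumerGroup()
    for i in range(4):
        group.add(f"msg{i}")
    print(group.read("worker-aaaa", count=2))
    print(group.read("worker-bbbb", count=2))
```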

class inference_main.InferenceService(config, redis_client, instance_id, use_health_server=True)

Bases: StargazerService

Stateless LLM message-processing worker for the inference service.

A StargazerService subclass that consumes the sg:stream:inbound Redis stream, reconstructs each IncomingMessage from its envelope, and hands it to a MessageProcessor. Because the worker holds no real platform SDK, every outbound action is published via a ProxyPlatformAdapter over a RedisEventBus for the Gateway to dispatch. Multiple instances scale horizontally by sharing the sg:workers consumer group; presence cycling is leader-gated through _status_leader_guard(). See the module docstring for the full collaborator wiring.
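
Leader gating of this kind is commonly built on a lease: a key set only if absent, with an expiry, whose value is the holder's id (here, the instance_id). The sketch below assumes that design; it is not the actual _status_leader_guard() code. ToyLeaseStore models Redis `SET key value NX PX ttl` in memory with an injectable clock, and is_status_leader, the key name sg:status:leader, and the 30-second TTL are all hypothetical.

```python
from __future__ import annotations

import time
from typing import Callable


class ToyLeaseStore:
    """In-memory stand-in for Redis `SET key value NX PX ttl` lease semantics."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: dict[str, tuple[str, float]] = {}  # key -> (holder, expiry)

    def get(self, key: str) -> str | None:
        entry = self._data.get(key)
        if entry is None or entry[1] <= self._clock():
            return None  # absent or expired
        return entry[0]

    def set_nx_px(self, key: str, value: str, ttl_ms: int) -> bool:
        # Succeeds only when nobody holds a live lease on `key`.
        if self.get(key) is not None:
            return False
        self._data[key] = (value, self._clock() + ttl_ms / 1000)
        return True

    def renew(self, key: str, value: str, ttl_ms: int) -> bool:
        # Extend the lease only if `value` still holds it. Against real Redis
        # this check-and-set would need to be atomic (e.g. a Lua script).
        if self.get(key) != value:
            return False
        self._data[key] = (value, self._clock() + ttl_ms / 1000)
        return True


def is_status_leader(store: ToyLeaseStore, instance_id: str,
                     key: str = "sg:status:leader", ttl_ms: int = 30_000) -> bool:
    """Hypothetical guard: True if this worker holds, or just acquired, the lease."""
    return store.renew(key, instance_id, ttl_ms) or store.set_nx_px(key, instance_id, ttl_ms)
```

Each worker would call such a guard before a presence cycle and skip the update when it returns False, so only one instance drives presence even though every instance wires up a StatusManager. If the leader dies, its lease expires and another worker takes over.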

__init__(config, redis_client, instance_id, use_health_server=True)

Construct an inference worker, registering it as the "inference" service.

Stores the loaded Config and pre-declares the worker’s collaborators (consumer, processor, event bus, scheduler, status manager) as None so they can be wired up later in on_start(). The actual Redis connection, tool/model/processor initialization, and stream consumer all happen in on_start(); this only sets up bookkeeping state and the internal stop event used by run()/on_stop().

Delegates to StargazerService.__init__() with service_name="inference" and redis_required=True, which sets up the shared service lifecycle and (when use_health_server is set) the health server. Called from main() with a freshly built async Redis client, and directly from the migration tests in tests/core/migration.

Parameters:
  • config (Config) – The loaded application configuration supplying API keys, model settings, Redis connection details, and tool permissions.

  • redis_client (Any) – A pre-built async Redis client created with decode_responses=False (raw bytes, as required for the msgpack stream payloads) and handed to the base service; further per-component clients are derived in on_start().

  • instance_id (str) – Unique worker identifier (e.g. "worker-ab12cd34") used as the consumer name in the sg:workers group and as the status-leader lease value.

  • use_health_server (bool) – When True, the base service stands up its HTTP health endpoint; tests pass False to skip it.

get_adapter(platform_name)

Return a worker-side proxy adapter for a platform.

Since the inference worker holds no real platform SDK client, every adapter is a ProxyPlatformAdapter bound to this service’s event_bus; its send/send_file/reaction calls are published to sg:stream:outbound:{platform} for the Gateway to dispatch against the live client. A new adapter instance is created per call (they are cheap, stateless wrappers over the shared bus).

Called by on_start() to supply the "discord" adapter the StatusManager uses to publish presence actions, and by the migration tests in tests/core/migration. (The same method name is provided by the other service entry points so collaborators like the status manager and heartbeat tasks can request an adapter uniformly.)

Parameters:

platform_name (str) – The platform identifier (e.g. "discord") selecting the outbound stream the returned adapter publishes to.

Returns:

A fresh proxy adapter bound to self.event_bus and platform_name.

Return type:

ProxyPlatformAdapter
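
To make the proxy idea concrete, here is a minimal sketch, assuming a bus with an async publish(stream, payload) method. RecordingBus and ToyProxyAdapter are hypothetical stand-ins for RedisEventBus and ProxyPlatformAdapter, and the method names and payload fields are illustrative, not the real schema. Each platform call becomes an event on sg:stream:outbound:{platform}, and get_adapter() returns a fresh wrapper every time.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RecordingBus:
    """Hypothetical stand-in for RedisEventBus: records publishes instead of XADDing."""

    published: list[tuple[str, dict[str, Any]]] = field(default_factory=list)

    async def publish(self, stream: str, payload: dict[str, Any]) -> None:
        self.published.append((stream, payload))


class ToyProxyAdapter:
    """Hypothetical proxy adapter: platform calls become outbound stream events."""

    def __init__(self, bus: RecordingBus, platform: str) -> None:
        self._bus = bus
        self._stream = f"sg:stream:outbound:{platform}"

    async def send(self, channel_id: str, content: str) -> None:
        await self._bus.publish(self._stream, {
            "action": "send", "channel_id": channel_id, "content": content})

    async def add_reaction(self, channel_id: str, message_id: str, emoji: str) -> None:
        await self._bus.publish(self._stream, {
            "action": "reaction", "channel_id": channel_id,
            "message_id": message_id, "emoji": emoji})


def get_adapter(bus: RecordingBus, platform_name: str) -> ToyProxyAdapter:
    # A fresh, cheap wrapper per call; all shared state lives in the bus.
    return ToyProxyAdapter(bus, platform_name)


async def demo() -> RecordingBus:
    bus = RecordingBus()
    await get_adapter(bus, "discord").send("123", "hello")
    await get_adapter(bus, "discord").add_reaction("123", "456", "thumbs_up")
    return bus
```

Because the wrapper holds nothing but a bus reference and a stream name, creating one per call costs little, and the Gateway remains the only process that talks to the platform SDK.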

async on_start()

Initialize models, tools, and the message processor for the worker.

The heavy startup phase, invoked by StargazerService.boot() after the Redis ping. It builds the full processing stack: discovers and loads tools into a ToolRegistry, constructs the PromptRenderer, connects the LLM via OpenRouterClient, and stands up the Redis-backed managers (MessageCache, ConversationManager, KnowledgeGraphManager, the SWORD system, RAG/web-search/threadweave/task managers, and the embedding queue). It wires a RedisEventBus and a leader-gated StatusManager (whose presence updates publish through the discord ProxyPlatformAdapter), assembles the MessageProcessor, then starts the InboundStreamConsumer (bound to _process_message()), the heartbeat BackgroundScheduler, and the config-update pubsub listener (_listen_config_updates()).

Side effects are extensive: it opens Redis connections, registers the observability Redis client, ensures the bus streams exist, and launches several long-lived background tasks. Called only by the base service’s boot sequence (no direct callers); the migration tests in tests/core/migration drive it through boot as well.

Return type:

None
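
Several of the background tasks started here are long-lived loops whose handles must be kept so on_stop() can cancel them. The sketch below shows that pattern for a config-update listener. It assumes an asyncio.Queue in place of the Redis pubsub subscription and a plain dict in place of Config; listen_config_updates and the JSON patch format are hypothetical, not the real _listen_config_updates().

```python
from __future__ import annotations

import asyncio
import contextlib
import json
from typing import Any


async def listen_config_updates(updates: asyncio.Queue[str], config: dict[str, Any]) -> None:
    """Hypothetical listener: apply JSON config patches until cancelled."""
    while True:
        raw = await updates.get()  # stand-in for awaiting a pubsub message
        try:
            patch = json.loads(raw)
            if isinstance(patch, dict):
                config.update(patch)
        except json.JSONDecodeError:
            pass  # a malformed update must not kill the long-lived listener
        finally:
            updates.task_done()


async def demo() -> dict[str, Any]:
    config: dict[str, Any] = {"model": "old-model"}
    updates: asyncio.Queue[str] = asyncio.Queue()
    # on_start()-style: launch the listener and keep the task handle.
    listener = asyncio.create_task(listen_config_updates(updates, config))
    await updates.put('{"model": "new-model"}')
    await updates.put("not json")
    await updates.join()  # wait until both messages have been handled
    # on_stop()-style: cancel the task and wait for it to finish.
    listener.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await listener
    return config
```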

async run()

Block the service’s main loop until shutdown is requested.

The worker does no polling of its own here (message processing is driven by the InboundStreamConsumer and the background tasks launched in on_start()), so this simply awaits the internal _stop_event, which on_stop() sets during shutdown. Implements the abstract run() declared on StargazerService; called by main() after boot() and awaited directly by tests/core/migration/test_service_entrypoints.py.

Return type:

None
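
The run()/on_stop() handshake reduces to one asyncio.Event. This is a minimal sketch; ToyWorker is hypothetical and omits everything except the stop event. run() parks on the event without polling, and on_stop() sets it first so the main loop is released before collaborators are torn down.

```python
from __future__ import annotations

import asyncio


class ToyWorker:
    """Hypothetical skeleton of the run()/on_stop() handshake."""

    def __init__(self) -> None:
        self._stop_event = asyncio.Event()
        self.stopped = False

    async def run(self) -> None:
        # No polling: background tasks do the work; just park until shutdown.
        await self._stop_event.wait()

    async def on_stop(self) -> None:
        # Release run() first, then (in the real service) tear down collaborators.
        self._stop_event.set()
        self.stopped = True


async def demo() -> bool:
    worker = ToyWorker()
    runner = asyncio.create_task(worker.run())
    await asyncio.sleep(0)  # let run() start waiting
    still_running = not runner.done()
    await worker.on_stop()
    await asyncio.wait_for(runner, timeout=1)
    return still_running and worker.stopped
```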

async on_stop()

Tear down the worker’s background tasks and managers on shutdown.

Implements the base service's on_stop hook, invoked from StargazerService.shutdown(). It sets _stop_event to release run(), cancels and awaits the config-update listener task, and then stops the live collaborators in turn: the StatusManager, the InboundStreamConsumer, the heartbeat BackgroundScheduler, and the EmbeddingBatchQueue. Each shutdown step is guarded so a partially started worker still tears down cleanly. Called only via the base service's shutdown path (no direct callers).

Return type:

None
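
A guarded teardown of this kind can be sketched as a loop that skips collaborators on_start() never created. The sketch also isolates failures so one stop() that raises does not strand the rest; that part is an assumption about what "guarded" means here. ToyCollaborator and guarded_teardown are hypothetical, and the real collaborators' stop methods may differ.

```python
from __future__ import annotations

import asyncio
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class ToyCollaborator:
    """Hypothetical collaborator with an async stop(); can be told to fail."""

    def __init__(self, name: str, log: list[str], fail: bool = False) -> None:
        self.name, self._log, self._fail = name, log, fail

    async def stop(self) -> None:
        if self._fail:
            raise RuntimeError(f"{self.name} failed to stop")
        self._log.append(self.name)


async def guarded_teardown(collaborators: list[Optional[ToyCollaborator]]) -> None:
    for collaborator in collaborators:
        if collaborator is None:
            continue  # on_start() never got far enough to create this one
        try:
            await collaborator.stop()
        except Exception:
            # Assumed behavior: log and keep going so later collaborators still stop.
            logger.exception("error stopping %s", collaborator.name)


async def demo() -> list[str]:
    log: list[str] = []
    # A partially started worker: the consumer was never created and the
    # scheduler's stop() raises, yet the embedding queue is still stopped.
    await guarded_teardown([
        ToyCollaborator("status_manager", log),
        None,  # inbound consumer
        ToyCollaborator("scheduler", log, fail=True),
        ToyCollaborator("embedding_queue", log),
    ])
    return log
```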

async inference_main.main()

Process entry point that boots and runs a single inference worker.

Configures root logging, loads the Config, mints a unique worker-{hex} instance id, and builds the binary (decode_responses=False) async Redis client the service requires for msgpack stream payloads. It then constructs an InferenceService, installs SIGINT/SIGTERM handlers that trigger StargazerService.shutdown(), and drives the service through boot() (which calls InferenceService.on_start()) and InferenceService.run(), which blocks until the stop event fires. The Redis client is always closed in the finally block.

Called as the module’s __main__ body via asyncio.run(main()) when launched standalone (python inference_main.py, e.g. from the systemd unit), and awaited directly by tests/core/migration/test_service_entrypoints.py.

Return type:

None
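
The boot-and-run sequence in main() follows a common asyncio shape. This is a minimal sketch, assuming FakeRedis and FakeService as hypothetical stand-ins for the real Redis client and InferenceService. It mints a worker-{hex} id (8 hex characters, matching the "worker-ab12cd34" example), installs SIGINT/SIGTERM handlers with loop.add_signal_handler() that schedule the async shutdown, and closes the client in finally. Unlike the real main(), it returns the id and the client's closed flag for inspection, and the stop_after parameter is a hypothetical hook that stands in for a real signal.

```python
from __future__ import annotations

import asyncio
import logging
import signal
import uuid


class FakeRedis:
    """Hypothetical stand-in for the async Redis client; only closing is modelled."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


class FakeService:
    """Hypothetical stand-in with StargazerService-like boot()/run()/shutdown()."""

    def __init__(self, instance_id: str) -> None:
        self.instance_id = instance_id
        self._stop_event = asyncio.Event()

    async def boot(self) -> None:
        pass  # the real boot() pings Redis and then calls on_start()

    async def run(self) -> None:
        await self._stop_event.wait()

    async def shutdown(self) -> None:
        self._stop_event.set()


async def main(stop_after: float | None = None) -> tuple[str, bool]:
    logging.basicConfig(level=logging.INFO)
    instance_id = f"worker-{uuid.uuid4().hex[:8]}"  # e.g. "worker-ab12cd34"
    redis_client = FakeRedis()  # real code: async client with decode_responses=False
    service = FakeService(instance_id)
    loop = asyncio.get_running_loop()
    shutdown_tasks: set[asyncio.Task[None]] = set()

    def request_shutdown() -> None:
        # Signal handlers are plain callables, so schedule the async shutdown
        # and keep a reference to the task until it finishes.
        task = loop.create_task(service.shutdown())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    installed: list[signal.Signals] = []
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, request_shutdown)
            installed.append(sig)
        except (NotImplementedError, RuntimeError):
            pass  # e.g. Windows event loops, or not running in the main thread
    try:
        await service.boot()
        if stop_after is not None:
            loop.call_later(stop_after, request_shutdown)  # demo hook instead of a signal
        await service.run()
    finally:
        for sig in installed:
            loop.remove_signal_handler(sig)
        await redis_client.aclose()  # always release the client
    return instance_id, redis_client.closed
```

Launched standalone, the real module simply runs asyncio.run(main()); in this sketch that call would block until SIGINT or SIGTERM arrives.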