inference_main
Inference service entry point — the LLM message-processing worker.
The Inference service runs InferenceService, a StargazerService that does the heavy work the monolith used to do, but as a horizontally scalable, stateless worker:

- Consumes the sg:stream:inbound Redis stream via InboundStreamConsumer (consumer group sg:workers), which acquires a per-channel DistributedLock so messages for one channel are processed serially.
- Reconstructs the IncomingMessage from the envelope (_incoming_message_from_envelope(), the inverse of the gateway's codec) and hands it to message_processor.processor.MessageProcessor.handle_message().
- Supplies a ProxyPlatformAdapter in place of a real platform client: every send/send_file/reaction the processor performs is published to sg:stream:outbound:{platform} for the Gateway to dispatch. Tools needing raw SDK access are delegated back to the Gateway over the bus.
- Wires up the processor's collaborators: ConversationManager, MessageCache, KnowledgeGraphManager, RAGAutoSearchManager, OpenRouterClient, ToolRegistry, the SWORD system, and a leader-gated StatusManager for presence updates.
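The reconstruction step only works if the worker's decoder is the exact inverse of the gateway's encoder. A minimal sketch of that round-trip property, assuming a hypothetical, trimmed-down IncomingMessage and hypothetical encode_envelope()/incoming_message_from_envelope() helpers; it uses JSON to stay within the standard library, whereas the real streams carry msgpack payloads, and the "kind"/"data" envelope layout is invented for illustration.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class IncomingMessage:
    """Hypothetical, trimmed-down message; the real class has more fields."""

    platform: str
    channel_id: str
    author_id: str
    content: str
    attachment_urls: list[str] = field(default_factory=list)


def encode_envelope(msg: IncomingMessage) -> bytes:
    """Gateway side (sketch): serialize a message into a stream payload.

    JSON is used only to avoid third-party dependencies; the real
    service uses msgpack.
    """
    envelope: dict[str, Any] = {"kind": "incoming_message", "data": asdict(msg)}
    return json.dumps(envelope).encode("utf-8")


def incoming_message_from_envelope(payload: bytes) -> IncomingMessage:
    """Worker side (sketch): the inverse of encode_envelope()."""
    envelope = json.loads(payload.decode("utf-8"))
    if envelope.get("kind") != "incoming_message":
        raise ValueError(f"unexpected envelope kind: {envelope.get('kind')!r}")
    return IncomingMessage(**envelope["data"])
```

Keeping both directions next to each other (or generated from one schema) is what makes "decode(encode(m)) == m" easy to test on every change to the message shape.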
Launched standalone (python inference_main.py) by scripts/systemd/stargazer-inference.service. To scale, run multiple instances; they share the sg:workers consumer group.
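Because several instances pull from the same consumer group, ordering within a channel relies on the per-channel lock rather than on the stream itself. A minimal single-process sketch of that invariant, assuming a hypothetical ChannelSerializer that keeps one asyncio.Lock per channel id; this swaps the Redis-backed DistributedLock (which coordinates across processes) for an in-memory lock, so it only illustrates the behaviour: same channel runs serially, different channels run concurrently.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable


class ChannelSerializer:
    """Hypothetical in-process stand-in for a per-channel distributed lock."""

    def __init__(self) -> None:
        # One lock per channel id, created lazily on first use.
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def run(self, channel_id: str, handler: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        # Same channel id -> same lock -> handlers for that channel run one at a time.
        async with self._locks[channel_id]:
            return await handler(*args)


async def demo() -> list[str]:
    serializer = ChannelSerializer()
    log: list[str] = []

    async def handle(channel: str, n: int) -> None:
        log.append(f"start {channel}:{n}")
        await asyncio.sleep(0.01)  # simulate LLM work
        log.append(f"end {channel}:{n}")

    await asyncio.gather(
        serializer.run("A", handle, "A", 1),
        serializer.run("A", handle, "A", 2),
        serializer.run("B", handle, "B", 1),
    )
    return log
```

In the demo, the two channel "A" messages never overlap, while the channel "B" message starts before the first "A" message finishes.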
- class inference_main.InferenceService(config, redis_client, instance_id, use_health_server=True)
Bases: StargazerService

Stateless LLM message-processing worker for the inference service.

A StargazerService subclass that consumes the sg:stream:inbound Redis stream, reconstructs each IncomingMessage from its envelope, and hands it to a MessageProcessor. Because the worker holds no real platform SDK, every outbound action is published via a ProxyPlatformAdapter over a RedisEventBus for the Gateway to dispatch. Multiple instances scale horizontally by sharing the sg:workers consumer group; presence cycling is leader-gated through _status_leader_guard(). See the module docstring for the full collaborator wiring.

- __init__(config, redis_client, instance_id, use_health_server=True)
Construct an inference worker, registering it as the "inference" service.

Stores the loaded Config and pre-declares the worker's collaborators (consumer, processor, event bus, scheduler, status manager) as None so they can be wired up later in on_start(). The actual Redis connection, tool/model/processor initialization, and stream consumer are all set up in on_start(); the constructor only initializes bookkeeping state and the internal stop event used by run() and on_stop().

Delegates to StargazerService.__init__() with service_name="inference" and redis_required=True, which sets up the shared service lifecycle and, when use_health_server is set, the health server. Called from main() with a freshly built async Redis client, and directly from the migration tests in tests/core/migration.

Parameters:
config (Config) – The loaded application configuration supplying API keys, model settings, Redis connection details, and tool permissions.
redis_client (Any) – A pre-built async Redis client (with response decoding disabled) handed to the base service; further per-component clients are derived in on_start().
instance_id (str) – Unique worker identifier (e.g. "worker-ab12cd34") used as the consumer name in the sg:workers group and as the status-leader lease value.
use_health_server (bool) – When True, the base service stands up its HTTP health endpoint; tests pass False to skip it.
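Using instance_id as the status-leader lease value means exactly one worker at a time may cycle presence, and a crashed leader is replaced once its lease expires. A minimal sketch of such a lease, assuming Redis-style "SET key value NX PX ttl" semantics emulated by a hypothetical in-memory FakeLeaseStore with an injectable clock; the key name, TTL, and is_status_leader() helper are illustrative and are not the actual _status_leader_guard().

```python
import time
from typing import Callable, Optional


class FakeLeaseStore:
    """Hypothetical in-memory stand-in for Redis-style NX/PX leases."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: dict[str, tuple[str, float]] = {}  # key -> (holder, expires_at)

    def get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None or entry[1] <= self._clock():
            return None  # missing or expired
        return entry[0]

    def set_nx_px(self, key: str, value: str, ttl_ms: int) -> bool:
        # Succeeds only if nobody holds an unexpired lease.
        if self.get(key) is not None:
            return False
        self._data[key] = (value, self._clock() + ttl_ms / 1000)
        return True

    def refresh_if_owner(self, key: str, value: str, ttl_ms: int) -> bool:
        # Compare-and-extend; against real Redis this is typically a Lua script so it stays atomic.
        if self.get(key) != value:
            return False
        self._data[key] = (value, self._clock() + ttl_ms / 1000)
        return True


STATUS_LEADER_KEY = "sg:status:leader"  # hypothetical key name


def is_status_leader(store: FakeLeaseStore, instance_id: str, ttl_ms: int = 30_000) -> bool:
    """Renew the lease if this worker holds it, otherwise try to acquire it."""
    return store.refresh_if_owner(STATUS_LEADER_KEY, instance_id, ttl_ms) or store.set_nx_px(
        STATUS_LEADER_KEY, instance_id, ttl_ms
    )
```

Each worker would call a check like this before every presence update; followers simply skip the update.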
- get_adapter(platform_name)
Return a worker-side proxy adapter for a platform.

Since the inference worker holds no real platform SDK client, every adapter is a ProxyPlatformAdapter bound to this service's event_bus; its send/send_file/reaction calls are published to sg:stream:outbound:{platform} for the Gateway to dispatch against the live client. A new adapter instance is created per call (adapters are cheap, stateless wrappers over the shared bus).

Called by on_start() to supply the "discord" adapter the StatusManager uses to publish presence actions, and by the migration tests in tests/core/migration. (The other service entry points provide a method of the same name, so collaborators such as the status manager and heartbeat tasks can request an adapter uniformly.)

Parameters:
platform_name (str) – The platform identifier (e.g. "discord") selecting the outbound stream the returned adapter publishes to.

Returns:
A fresh proxy adapter bound to self.event_bus and platform_name.

Return type:
ProxyPlatformAdapter
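The proxy pattern reduces each platform call to a message on the platform's outbound stream. A minimal sketch, assuming a hypothetical RecordingBus with an async publish(stream, payload) method in place of RedisEventBus, and a hypothetical payload shape ("action", "channel_id", ...); the real ProxyPlatformAdapter and bus APIs may differ.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RecordingBus:
    """Hypothetical stand-in for RedisEventBus: records (stream, payload) pairs."""

    published: list[tuple[str, dict[str, Any]]] = field(default_factory=list)

    async def publish(self, stream: str, payload: dict[str, Any]) -> None:
        self.published.append((stream, payload))


class ProxyAdapterSketch:
    """Turns platform calls into outbound-stream actions for the Gateway."""

    def __init__(self, bus: RecordingBus, platform: str) -> None:
        self._bus = bus
        self._stream = f"sg:stream:outbound:{platform}"

    async def send(self, channel_id: str, text: str) -> None:
        await self._bus.publish(self._stream, {"action": "send", "channel_id": channel_id, "text": text})

    async def add_reaction(self, channel_id: str, message_id: str, emoji: str) -> None:
        await self._bus.publish(
            self._stream,
            {"action": "reaction", "channel_id": channel_id, "message_id": message_id, "emoji": emoji},
        )


def get_adapter(bus: RecordingBus, platform_name: str) -> ProxyAdapterSketch:
    # A fresh, cheap wrapper per call; all shared state lives in the bus.
    return ProxyAdapterSketch(bus, platform_name)


async def demo() -> list[tuple[str, dict[str, Any]]]:
    bus = RecordingBus()
    await get_adapter(bus, "discord").send("c1", "hi")
    await get_adapter(bus, "discord").add_reaction("c1", "m1", "thumbsup")
    return bus.published
```

Since the adapter holds no state beyond the bus reference and stream name, creating a new one per call costs almost nothing and avoids any per-adapter caching concerns.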
- async on_start()
Initialize models, tools, and the message processor for the worker.

The heavy startup phase, invoked by StargazerService.boot() after the Redis ping. It builds the full processing stack: it discovers and loads tools into a ToolRegistry, constructs the PromptRenderer, connects the LLM via OpenRouterClient, and stands up the Redis-backed managers (MessageCache, ConversationManager, KnowledgeGraphManager, the SWORD system, the RAG, web-search, threadweave, and task managers, and the embedding queue). It then wires a RedisEventBus and a leader-gated StatusManager (whose presence updates publish through the discord ProxyPlatformAdapter), assembles the MessageProcessor, and finally starts the InboundStreamConsumer (bound to _process_message()), the heartbeat BackgroundScheduler, and the config-update pub/sub listener (_listen_config_updates()).

Side effects are extensive: it opens Redis connections, registers the observability Redis client, ensures the bus streams exist, and launches several long-lived background tasks. Called only by the base service's boot sequence (no direct callers); the migration tests in tests/core/migration also drive it through boot.

Return type:
None
- async run()
Block the service's main loop until shutdown is requested.

The worker does no polling of its own here: message processing is driven by the InboundStreamConsumer and the background tasks launched in on_start(), so this method simply awaits the internal _stop_event, which on_stop() sets during shutdown. Implements the abstract run declared on StargazerService; called by main() after boot and awaited directly by tests/core/migration/test_service_entrypoints.py.

Return type:
None
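The run()/on_stop() handshake amounts to parking the main coroutine on an asyncio.Event while background tasks do the work. A minimal sketch, assuming a hypothetical MiniWorker with a dummy consumer loop standing in for the InboundStreamConsumer; it is not the real StargazerService base class.

```python
import asyncio
from typing import Optional


class MiniWorker:
    """Hypothetical reduction of the run()/on_stop() handshake."""

    def __init__(self) -> None:
        self._stop_event = asyncio.Event()
        self._consumer_task: Optional[asyncio.Task[None]] = None
        self.processed = 0

    async def _consume_forever(self) -> None:
        # Stand-in for the stream consumer launched in on_start().
        while True:
            self.processed += 1
            await asyncio.sleep(0.001)

    async def on_start(self) -> None:
        self._consumer_task = asyncio.create_task(self._consume_forever())

    async def run(self) -> None:
        # No polling here: just wait until shutdown is requested.
        await self._stop_event.wait()

    async def on_stop(self) -> None:
        self._stop_event.set()  # releases run()
        if self._consumer_task is not None:
            self._consumer_task.cancel()
            try:
                await self._consumer_task
            except asyncio.CancelledError:
                pass


async def demo() -> int:
    worker = MiniWorker()
    await worker.on_start()
    runner = asyncio.create_task(worker.run())
    await asyncio.sleep(0.02)  # let the background consumer do some work
    await worker.on_stop()
    await runner  # returns promptly because the event is now set
    return worker.processed
```

Waiting on an event rather than sleeping in a loop means run() consumes no CPU and returns immediately when shutdown begins.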
- async on_stop()
Tear down the worker's background tasks and managers on shutdown.

Implements the base service's on_stop hook, invoked from StargazerService.shutdown(). It sets _stop_event to release run(), cancels and awaits the config-update listener task, and then stops the live collaborators in turn: the StatusManager, the InboundStreamConsumer, the heartbeat BackgroundScheduler, and the EmbeddingBatchQueue. Each shutdown step is guarded so a partially started worker tears down cleanly. Called only via the base service's shutdown path (no direct callers).

Return type:
None
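"Guarded" teardown can be read as two rules: skip collaborators that were never started (still None), and keep going if one of them fails to stop. A minimal sketch of that reading, assuming a hypothetical Stoppable collaborator with an async stop() and a hypothetical guarded_teardown() helper; the real on_stop() may guard each step differently.

```python
import asyncio
import logging
from typing import Any, Optional

log = logging.getLogger("inference_sketch")


class Stoppable:
    """Hypothetical collaborator with an async stop()."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name, self.fail, self.stopped = name, fail, False

    async def stop(self) -> None:
        if self.fail:
            raise RuntimeError(f"{self.name} failed to stop")
        self.stopped = True


async def guarded_teardown(
    stop_event: asyncio.Event,
    listener_task: Optional[asyncio.Task[Any]],
    collaborators: list[Optional[Stoppable]],
) -> None:
    stop_event.set()  # release run() first
    if listener_task is not None:
        listener_task.cancel()
        try:
            await listener_task
        except asyncio.CancelledError:
            pass
    for component in collaborators:
        if component is None:
            continue  # never started: skip instead of raising AttributeError
        try:
            await component.stop()
        except Exception:
            # Log and carry on so later collaborators still get stopped.
            log.exception("error stopping %s", component.name)


async def demo() -> tuple[bool, list[bool]]:
    stop_event = asyncio.Event()
    listener = asyncio.create_task(asyncio.sleep(3600))  # stand-in for the config listener
    status = Stoppable("status_manager")
    consumer = Stoppable("consumer", fail=True)
    queue = Stoppable("embedding_queue")
    # The scheduler slot is None, as if startup failed before it was created.
    await guarded_teardown(stop_event, listener, [status, consumer, None, queue])
    return stop_event.is_set(), [status.stopped, consumer.stopped, queue.stopped]
```

In the demo the failing consumer is logged, the missing scheduler is skipped, and the embedding queue after it is still stopped.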
- async inference_main.main()
Process entry point that boots and runs a single inference worker.

Configures root logging, loads the Config, mints a unique worker-{hex} instance id, and builds the binary (decode_responses=False) async Redis client the service requires for msgpack stream payloads. It then constructs an InferenceService, installs SIGINT/SIGTERM handlers that trigger StargazerService.shutdown(), and drives the service through boot() (which calls InferenceService.on_start()) and InferenceService.run(), which blocks until the stop event fires. The Redis client is always closed in the finally block.

Called as the module's __main__ body via asyncio.run(main()) when launched standalone (python inference_main.py, e.g. from the systemd unit), and awaited directly by tests/core/migration/test_service_entrypoints.py.

Return type:
None
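The entry-point sequence (mint id, install signal handlers, boot, run, always close the client) can be sketched with stand-ins. This assumes a hypothetical StubService exposing boot()/run()/shutdown() and a hypothetical FakeRedisClient with an async close(); the 8-hex-character id length is inferred from the "worker-ab12cd34" example. loop.add_signal_handler() is Unix-only and must run on the main thread, so the sketch tolerates its absence.

```python
import asyncio
import signal
import uuid


def new_instance_id() -> str:
    # "worker-{hex}" shape, e.g. "worker-ab12cd34" (8 hex chars assumed).
    return f"worker-{uuid.uuid4().hex[:8]}"


class FakeRedisClient:
    """Hypothetical stand-in for the binary async Redis client."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class StubService:
    """Hypothetical service exposing boot/run/shutdown like StargazerService."""

    def __init__(self, redis_client: FakeRedisClient, instance_id: str) -> None:
        self.redis_client = redis_client
        self.instance_id = instance_id
        self._stop_event = asyncio.Event()

    async def boot(self) -> None:
        pass  # real service: ping Redis, then on_start()

    async def run(self) -> None:
        await self._stop_event.wait()

    async def shutdown(self) -> None:
        self._stop_event.set()


async def run_worker(service: StubService) -> None:
    loop = asyncio.get_running_loop()
    installed = []
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Signal callbacks must be plain functions, so schedule the coroutine as a task.
            loop.add_signal_handler(sig, lambda: loop.create_task(service.shutdown()))
            installed.append(sig)
        except (NotImplementedError, RuntimeError):
            pass  # e.g. Windows event loops, or not running on the main thread
    try:
        await service.boot()
        await service.run()  # blocks until shutdown() sets the stop event
    finally:
        for sig in installed:
            loop.remove_signal_handler(sig)
        await service.redis_client.close()  # always release the connection


async def main() -> None:
    service = StubService(FakeRedisClient(), new_instance_id())
    await run_worker(service)
```

Launched standalone, this would be driven by asyncio.run(main()) and stopped with SIGINT or SIGTERM; the finally block guarantees the client is closed whether run() returns normally or raises.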