Source code for agents_main

"""Agents service entry point — scheduled background work and agents.

Runs :class:`AgentsService`, a :class:`~core.service_base.StargazerService`
dedicated to recurring/background jobs so they never compete with the
latency-sensitive Inference workers. It owns the cluster-singleton
:class:`~background_tasks.TaskSupervisor` (leader-elected via a Redis lock,
so exactly one instance runs each cron job) and schedules tasks through a
:class:`~background_tasks.BackgroundScheduler`, including:

* Knowledge-graph maintenance: agentic bulk incremental extraction,
  channel summarization, and — only when the ``legacy_kg_extraction``
  config flag is set — auto KG extraction.
* Limbic/NCM upkeep: maintenance, dedup, anchoring, sunset, and the
  journal stream.
* Scheduled prompt ticks/cleanup and Gemini key probing.

It also boots optional background agents (:class:`DyadicEvaluator`,
:class:`DesireCheckAgent`, :class:`GameArtAgent`, :class:`GameTurnAgent`),
the SWORD subsystem (:func:`sword.factory.build_sword_system`), and the
StarWiki service, and persists embeddings asynchronously via
:class:`EmbeddingBatchQueue`.

Launched standalone (``python agents_main.py``) by
``scripts/systemd/stargazer-agents.service``.
"""

import asyncio
import logging
from typing import Any
from core.service_base import StargazerService
from config import Config
from message_cache import MessageCache
from knowledge_graph import KnowledgeGraphManager
from openrouter_client import OpenRouterClient
from core.event_bus import RedisEventBus
from core.proxy_adapter import ProxyPlatformAdapter
from embedding_queue import EmbeddingBatchQueue
from starwiki.service import StarwikiService
from background_agents.dyadic_evaluator import DyadicEvaluator
from background_agents.desire_check_agent import DesireCheckAgent
from background_agents.game_art_agent import GameArtAgent
from background_agents.game_turn_agent import GameTurnAgent
from background_tasks import (
    BackgroundScheduler,
    TaskSupervisor,
    scheduled_prompt_tick,
    scheduled_prompt_cleanup,
    auto_kg_extraction,
    channel_summarization,
    log_rag_ingest_task,
    gemini_key_probe_task,
    agentic_kg_bulk_incremental_task,
    limbic_maintenance_task,
    limbic_dedup_task,
    limbic_org_task,
    journal_stream_task,
    limbic_anchoring_task,
    limbic_sunset_task,
    anamnesis_digest_task,
    channel_heartbeat_task,
    starwiki_lint_task,
)
from sword.factory import build_sword_system

# Module-level logger shared by AgentsService and main(); records are emitted
# under the fixed "agents_service" logger name rather than ``__name__``.
logger = logging.getLogger("agents_service")

class AgentsService(StargazerService):
    """Background-work microservice running scheduled jobs and agents.

    Concrete :class:`~core.service_base.StargazerService` for the agents
    tier. It owns a :class:`~background_tasks.TaskSupervisor` and a
    :class:`~background_tasks.BackgroundScheduler`, plus the optional
    background agents (:class:`DyadicEvaluator`, :class:`DesireCheckAgent`,
    :class:`GameArtAgent`, :class:`GameTurnAgent`), the SWORD subsystem and
    StarWiki.

    Lifecycle hooks implemented here are :meth:`on_start`, :meth:`run` and
    :meth:`on_stop`; :func:`main` drives them through the base class's
    ``boot``/``shutdown``.
    """

    def __init__(self, config: Config, redis_client: Any, instance_id: str):
        """Initialize the service and declare its component slots.

        Every heavyweight collaborator is set to ``None`` here and built
        later in :meth:`on_start`; only the
        :class:`~background_tasks.BackgroundScheduler` is constructed
        eagerly.

        Args:
            config (Config): Loaded application configuration; stored as
                ``self.cfg``.
            redis_client (Any): Redis client handed to the base class
                (``redis_required=True``).
            instance_id (str): Unique id for this process; later used as the
                event-bus and supervisor node id.
        """
        super().__init__(
            service_name="agents",
            instance_id=instance_id,
            redis_client=redis_client,
            redis_required=True,
        )
        self.cfg = config

        # Collaborators built in on_start().
        self.message_cache = None
        self.kg_manager = None
        self.openrouter = None
        self.scheduler = BackgroundScheduler()
        self.supervisor = None
        self.embedding_queue = None
        self.starwiki_service = None
        self.event_bus = None

        # Optional background agents, started in on_start().
        self.dyadic_evaluator = None
        self.desire_check_agent = None
        self.game_art_agent = None
        self.game_turn_agent = None

        # Strong references to fire-and-forget asyncio tasks (currently the
        # SWORD bootstrap). The event loop only keeps weak references, so a
        # task that nobody holds can be garbage-collected mid-execution.
        self._background_agent_tasks = []

    def get_adapter(self, platform_name: str) -> ProxyPlatformAdapter:
        """Return a proxy platform adapter bound to this service's event bus.

        A fresh :class:`~core.proxy_adapter.ProxyPlatformAdapter` is built on
        every call (no caching). ``self.event_bus`` is only assigned in
        :meth:`on_start`, so calling this earlier yields an adapter over a
        ``None`` bus.

        Args:
            platform_name (str): Logical platform key (e.g. ``"discord"``)
                passed through to the adapter.

        Returns:
            ProxyPlatformAdapter: Adapter over ``self.event_bus`` for
            ``platform_name``.
        """
        return ProxyPlatformAdapter(self.event_bus, platform_name)

    async def on_start(self) -> None:
        """Build collaborators and register all scheduled and supervised work.

        Steps, in order:

        1. Construct the OpenRouter client, message cache, KG manager and
           embedding queue (started immediately), the optional StarWiki
           service, the event bus and the task supervisor.
        2. Register the supervised workers and the periodic scheduler jobs.
        3. Start the supervisor and the scheduler.
        4. Build the SWORD system and, if available, bootstrap it in a
           background task.
        5. Start the optional background agents.

        Raises:
            Exception: Failures from collaborator construction or from the
                awaited ``start``/``ensure_ready``/``ensure_streams`` calls
                propagate to the caller. Per-agent startup errors and SWORD
                bootstrap errors are logged and swallowed instead.
        """
        redis_client = self.redis

        # Pick the connection kwargs matching how Redis is addressed.
        if self.cfg.redis_sentinels:
            ssl_kw = self.cfg.redis_ssl_kwargs()
        elif self.cfg.redis_url:
            ssl_kw = self.cfg.redis_connection_kwargs_for_url(self.cfg.redis_url)
        else:
            ssl_kw = {}

        self.openrouter = OpenRouterClient(
            api_key=self.cfg.openrouter_api_key,
            model=self.cfg.model,
            temperature=self.cfg.temperature,
            max_tokens=self.cfg.max_tokens,
            top_p=self.cfg.top_p,
            base_url=self.cfg.llm_base_url,
            gemini_api_key=self.cfg.gemini_api_key,
        )
        self.message_cache = MessageCache(
            redis_url=self.cfg.redis_url,
            openrouter_client=self.openrouter,
            embedding_model=self.cfg.embedding_model,
            ssl_kwargs=ssl_kw,
            redis_sentinels=self.cfg.redis_sentinels,
            redis_sentinel_master=self.cfg.redis_sentinel_master,
            resilience_kwargs=self.cfg.redis_resilience_kwargs(),
        )

        # Imported lazily, as in the original module.
        from observability import set_observability_redis

        set_observability_redis(self.message_cache.redis_client)

        self.kg_manager = KnowledgeGraphManager(
            redis_client=redis_client,
            openrouter=self.openrouter,
            embedding_model=self.cfg.embedding_model,
        )
        self.embedding_queue = EmbeddingBatchQueue(
            openrouter=self.openrouter,
            redis=self.message_cache.redis_raw_client,
            model=self.cfg.embedding_model,
            flush_interval=self.cfg.embedding_flush_interval,
            max_batch_size=self.cfg.embedding_batch_size,
        )
        await self.embedding_queue.start()

        if getattr(self.cfg, "starwiki_enabled", False):
            self.starwiki_service = StarwikiService(
                self.cfg,
                self.message_cache.redis_client,
                self.openrouter,
            )
            await self.starwiki_service.ensure_ready()

        self.event_bus = RedisEventBus(
            redis=self.message_cache.redis_raw_client,
            node_role="worker",
            node_id=self.instance_id,
            stream_maxlen=getattr(self.cfg, "redis_stream_maxlen", 100_000),
        )
        await self.event_bus.ensure_streams()

        self.supervisor = TaskSupervisor(redis=redis_client, node_id=self.instance_id)

        self._register_supervised_tasks()
        self._register_scheduled_tasks(redis_client)

        await self.supervisor.start()
        await self.scheduler.start()

        self._spawn_sword_bootstrap(redis_client)
        await self._start_background_agents()

        logger.info("AgentsService started successfully.")

    def _register_supervised_tasks(self) -> None:
        """Register the long-running workers owned by the supervisor."""
        self.supervisor.register_task(
            "limbic_dedup_worker", lambda: limbic_dedup_task(self)
        )
        self.supervisor.register_task(
            "limbic_org_worker", lambda: limbic_org_task(self)
        )

    def _register_scheduled_tasks(self, redis_client: Any) -> None:
        """Register every periodic job on the scheduler.

        ``interval`` and ``initial_delay`` are passed straight to
        ``BackgroundScheduler.register``; presumably seconds — confirm
        against the scheduler implementation.

        Args:
            redis_client (Any): Redis client forwarded to the jobs that take
                one as their first argument.
        """
        # NOTE(review): ``channel_heartbeat_task`` is imported at module level
        # but never registered here — confirm whether that is intentional.
        register = self.scheduler.register

        register(
            "scheduled_prompt_tick",
            scheduled_prompt_tick,
            interval=60,
            initial_delay=10,
            args=(redis_client,),
        )
        register(
            "scheduled_prompt_cleanup",
            scheduled_prompt_cleanup,
            interval=172800,
            initial_delay=300,
            args=(redis_client,),
        )
        # Legacy extractor is opt-in via config.
        if getattr(self.cfg, "legacy_kg_extraction", False):
            register(
                "auto_kg_extraction",
                auto_kg_extraction,
                interval=14400,
                initial_delay=300,
                args=(redis_client, self.kg_manager, self.openrouter, self.cfg),
            )
        register(
            "channel_summarization",
            channel_summarization,
            interval=21600,
            initial_delay=600,
            args=(redis_client,),
        )
        if getattr(self.cfg, "background_scheduler_log_rag_ingest_enabled", False):
            register(
                "log_rag_ingest",
                log_rag_ingest_task,
                interval=21600,
                initial_delay=30,
            )
        register(
            "gemini_key_probe",
            gemini_key_probe_task,
            interval=7200,
            initial_delay=60,
        )
        register(
            "agentic_kg_bulk_incremental",
            agentic_kg_bulk_incremental_task,
            interval=14400,
            initial_delay=900,
            args=(redis_client, self.kg_manager, self.openrouter),
        )
        register(
            "limbic_maintenance",
            limbic_maintenance_task,
            interval=604800,
            initial_delay=7200,
            args=(self,),
        )
        register(
            "journal_stream",
            journal_stream_task,
            interval=10,
            initial_delay=5,
            args=(redis_client,),
        )
        register(
            "limbic_anchoring_worker",
            limbic_anchoring_task,
            interval=60,
            initial_delay=15,
            args=(self,),
        )
        register(
            "limbic_sunset_processor",
            limbic_sunset_task,
            interval=3600,
            initial_delay=3600,
            args=(self,),
        )
        register(
            "anamnesis_digest",
            anamnesis_digest_task,
            interval=1200,
            initial_delay=300,
            args=(redis_client, self.kg_manager, self.openrouter, self.cfg),
        )
        if getattr(self.cfg, "starwiki_enabled", False):
            register(
                "starwiki_lint",
                starwiki_lint_task,
                interval=3600,
                initial_delay=600,
                args=(self,),
            )

    def _spawn_sword_bootstrap(self, redis_client: Any) -> None:
        """Build the SWORD system and bootstrap it in a background task.

        Does nothing beyond the factory call when
        :func:`sword.factory.build_sword_system` returns ``None`` for either
        the monitor or the graph manager. Otherwise a task named
        ``sword_bootstrap_background`` is created and a reference to it is
        kept in ``self._background_agent_tasks`` so it cannot be
        garbage-collected while running and can be cancelled in
        :meth:`on_stop`.

        Args:
            redis_client (Any): Redis client forwarded to the SWORD factory.
        """
        sword_monitor, sword_graph_manager = build_sword_system(
            cfg=self.cfg,
            redis_client=redis_client,
            openrouter=self.openrouter,
            kg_manager=self.kg_manager,
        )
        if sword_monitor is None or sword_graph_manager is None:
            return

        async def _bootstrap_sword() -> None:
            """Ensure SWORD indexes, load Origins, then attach the monitor.

            Any ``Exception`` is logged at warning level and swallowed, in
            which case the monitor is left unattached.
            """
            try:
                logger.info("Verifying SWORD subgraph indexes (background)...")
                await sword_graph_manager.ensure_sword_indexes()
                logger.info("SWORD subgraph indexes verified")

                from sword.origin_parser import OriginParser

                parser = OriginParser(sword_graph_manager)
                system_prompt_path = "system_prompt.j2"
                logger.info("Parsing and loading SWORD Origin fragments...")
                await parser.parse_and_load(system_prompt_path)
                logger.info("SWORD Origin fragments parsed and loaded successfully")

                # Attach the monitor only AFTER indexes and origins are ready
                self.kg_manager._sword_monitor = sword_monitor
            except Exception:
                logger.warning(
                    "Could not create SWORD indexes or load Origins (background)",
                    exc_info=True,
                )

        bootstrap_task = asyncio.create_task(
            _bootstrap_sword(), name="sword_bootstrap_background"
        )
        self._background_agent_tasks.append(bootstrap_task)

    async def _start_background_agents(self) -> None:
        """Start the optional background agents, isolating their failures.

        Each agent is constructed and started inside its own ``try`` block so
        that one failing agent is logged and skipped without aborting boot.
        The agent attribute is assigned before ``start()`` is awaited, so an
        agent whose start failed is still visible to :meth:`on_stop`.
        """
        # message_cache is assigned unconditionally earlier in on_start(); the
        # guard is retained from the original code.
        if self.message_cache is None:
            return

        try:
            self.dyadic_evaluator = DyadicEvaluator(
                redis_client=self.message_cache.redis_client,
                cache_redis=self.message_cache.redis_client,
            )
            await self.dyadic_evaluator.start()
            logger.info("Dyadic evaluator background agent started")
        except Exception:
            logger.warning("Dyadic evaluator startup failed", exc_info=True)

        try:
            self.desire_check_agent = DesireCheckAgent(
                redis_client=self.message_cache.redis_client,
            )
            await self.desire_check_agent.start()
            logger.info("Desire check agent started")
        except Exception:
            logger.warning("Desire check agent startup failed", exc_info=True)

        try:
            art_adapter = self.get_adapter("discord")
            self.game_art_agent = GameArtAgent(
                redis_client=self.message_cache.redis_client,
                platform_adapter=art_adapter,
                config=self.cfg,
            )
            await self.game_art_agent.start()
            logger.info("Game art agent started")
        except Exception:
            logger.warning("Game art agent startup failed", exc_info=True)

        try:
            turn_adapter = self.get_adapter("discord")
            self.game_turn_agent = GameTurnAgent(
                discord_platform=turn_adapter,
            )
            await self.game_turn_agent.start(
                self.redis,
            )
            logger.info("Game turn agent started")
        except Exception:
            logger.warning("Game turn agent startup failed", exc_info=True)

    async def run(self) -> None:
        """Block until cancelled so background work keeps running.

        Awaits a never-set :class:`asyncio.Event`; the
        :class:`asyncio.CancelledError` raised on cancellation is swallowed
        so the coroutine returns normally.
        """
        try:
            logger.info("AgentsService running... Press Ctrl+C to stop.")
            await asyncio.Event().wait()
        except asyncio.CancelledError:
            pass

    async def on_stop(self) -> None:
        """Tear down everything stood up in :meth:`on_start`, best-effort.

        Order: scheduler, supervisor, outstanding background tasks (the SWORD
        bootstrap), game turn agent, game art agent, dyadic evaluator, desire
        check agent, embedding queue, OpenRouter client. Components that are
        falsy (never built) are skipped. Every step is individually guarded:
        a failure is logged at warning level and teardown continues, so an
        error early in the sequence can no longer prevent later components
        from being released.
        """
        await self._stop_component(self.scheduler, "stop", "Scheduler stop failed")
        await self._stop_component(
            self.supervisor, "shutdown", "Supervisor shutdown failed"
        )

        # Cancel fire-and-forget tasks before the clients they use are closed.
        await self._cancel_background_tasks()

        await self._stop_component(
            self.game_turn_agent, "stop", "Game turn agent stop failed"
        )
        await self._stop_component(
            self.game_art_agent, "stop", "Game art agent stop failed"
        )
        await self._stop_component(
            self.dyadic_evaluator, "stop", "Dyadic evaluator stop failed"
        )
        await self._stop_component(
            self.desire_check_agent, "stop", "Desire check agent stop failed"
        )
        await self._stop_component(
            self.embedding_queue, "stop", "Embedding queue stop failed"
        )
        await self._stop_component(
            self.openrouter, "close", "OpenRouter client close failed"
        )
        logger.info("AgentsService stopped gracefully.")

    @staticmethod
    async def _stop_component(
        component: Any, method_name: str, failure_message: str
    ) -> None:
        """Await ``component.<method_name>()``, logging instead of raising.

        Args:
            component (Any): Object to stop; skipped when falsy.
            method_name (str): Name of the async, zero-argument stop method.
            failure_message (str): Warning logged (with traceback) on failure.
        """
        if not component:
            return
        try:
            await getattr(component, method_name)()
        except Exception:
            logger.warning(failure_message, exc_info=True)

    async def _cancel_background_tasks(self) -> None:
        """Cancel and await tasks tracked in ``_background_agent_tasks``."""
        pending = [task for task in self._background_agent_tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            # return_exceptions=True collects each task's CancelledError
            # instead of re-raising it here.
            await asyncio.gather(*pending, return_exceptions=True)
        self._background_agent_tasks.clear()
async def main() -> None:
    """Configure, boot and run the agents service as a standalone process.

    Sets up basic logging, loads :class:`~config.Config`, mints an
    ``agents-<hex>`` instance id, builds an async Redis client and constructs
    an :class:`AgentsService`. SIGINT/SIGTERM handlers schedule
    ``service.shutdown()`` on the running loop. The service is then booted
    and run, and the Redis client is closed in a ``finally`` block whether or
    not boot/run succeeded.

    Raises:
        Exception: Anything raised by ``service.boot()`` or ``service.run()``
            propagates after the Redis client has been closed.
    """
    import signal
    import uuid

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )

    cfg = Config.load()
    instance_id = f"agents-{uuid.uuid4().hex[:8]}"
    redis_client = cfg.build_async_redis_client(decode_responses=False)
    service = AgentsService(cfg, redis_client, instance_id)

    # The event loop only holds weak references to tasks; keep the shutdown
    # task(s) alive here until they finish so a graceful shutdown cannot be
    # garbage-collected part-way through.
    shutdown_tasks = set()

    def _request_shutdown() -> None:
        """Signal callback: schedule a graceful shutdown on the loop."""
        task = asyncio.create_task(service.shutdown())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _request_shutdown)

    try:
        await service.boot()
        await service.run()
    finally:
        if redis_client:
            await redis_client.aclose()
if __name__ == "__main__":
    # Script entry point: run the async main() on a fresh event loop.
    asyncio.run(main())