"""Agents service entry point — scheduled background work and agents.
Runs :class:`AgentsService`, a :class:`~core.service_base.StargazerService`
dedicated to recurring/background jobs so they never compete with the
latency-sensitive Inference workers. It owns the cluster-singleton
:class:`~background_tasks.TaskSupervisor` (leader-elected via a Redis lock,
so exactly one instance runs each cron job) and schedules tasks through a
:class:`~background_tasks.BackgroundScheduler`, including:
* Knowledge-graph maintenance: auto KG extraction, agentic bulk
incremental extraction, and channel summarization.
* Limbic/NCM upkeep: maintenance, dedup, the org worker, anchoring, sunset,
and the journal stream.
* Scheduled prompt ticks/cleanup, anamnesis digests, and Gemini key probing.
It also boots optional background agents (:class:`DyadicEvaluator`,
:class:`DesireCheckAgent`, :class:`GameArtAgent`, :class:`GameTurnAgent`),
the SWORD subsystem (:func:`sword.factory.build_sword_system`), and, when
enabled, the StarWiki service, and persists embeddings asynchronously via
:class:`EmbeddingBatchQueue`.
Launched standalone (``python agents_main.py``) by
``scripts/systemd/stargazer-agents.service``.
"""
import asyncio
import logging
from typing import Any
from core.service_base import StargazerService
from config import Config
from message_cache import MessageCache
from knowledge_graph import KnowledgeGraphManager
from openrouter_client import OpenRouterClient
from core.event_bus import RedisEventBus
from core.proxy_adapter import ProxyPlatformAdapter
from embedding_queue import EmbeddingBatchQueue
from starwiki.service import StarwikiService
from background_agents.dyadic_evaluator import DyadicEvaluator
from background_agents.desire_check_agent import DesireCheckAgent
from background_agents.game_art_agent import GameArtAgent
from background_agents.game_turn_agent import GameTurnAgent
from background_tasks import (
BackgroundScheduler,
TaskSupervisor,
scheduled_prompt_tick,
scheduled_prompt_cleanup,
auto_kg_extraction,
channel_summarization,
log_rag_ingest_task,
gemini_key_probe_task,
agentic_kg_bulk_incremental_task,
limbic_maintenance_task,
limbic_dedup_task,
limbic_org_task,
journal_stream_task,
limbic_anchoring_task,
limbic_sunset_task,
anamnesis_digest_task,
channel_heartbeat_task,
starwiki_lint_task,
)
from sword.factory import build_sword_system
# Module logger; the explicit name (rather than ``__name__``) is kept so any
# existing logging configuration keyed on "agents_service" keeps working.
logger = logging.getLogger("agents_service")
class AgentsService(StargazerService):
    """Background-work microservice running scheduled jobs and agents.

    Concrete :class:`~core.service_base.StargazerService` for the agents tier.
    It isolates recurring/LLM-heavy work (KG extraction, limbic upkeep,
    journaling, key probing, channel summarization) from the latency-sensitive
    inference workers by owning a :class:`~background_tasks.TaskSupervisor`
    and a :class:`~background_tasks.BackgroundScheduler`, plus optional
    background agents (dyadic evaluator, desire check, game art/turn agents),
    the SWORD subsystem and StarWiki.

    Instances are constructed by :func:`main` (and by the migration tests under
    ``tests/core/migration/``); the base class drives the lifecycle by calling
    :meth:`on_start`, :meth:`run` and :meth:`on_stop` from ``boot``/``shutdown``.
    """

    def __init__(self, config: Config, redis_client: Any, instance_id: str):
        """Initialize the agents service and declare its component slots.

        Wires up the base :class:`~core.service_base.StargazerService` with
        ``service_name="agents"`` and a required Redis client, then sets every
        heavyweight collaborator (message cache, KG manager, OpenRouter client,
        supervisor, embedding queue, StarWiki service, event bus, background
        agents) to ``None`` so they can be built in :meth:`on_start`. The
        :class:`~background_tasks.BackgroundScheduler` is the only collaborator
        constructed eagerly here.

        Interactions: calls ``super().__init__`` on
        :class:`~core.service_base.StargazerService` and constructs a
        :class:`~background_tasks.BackgroundScheduler`. No collaborator that
        talks to Redis or the network is built here.

        Args:
            config (Config): Loaded application configuration supplying Redis,
                OpenRouter/Gemini, model and feature-flag settings; stored as
                ``self.cfg``.
            redis_client (Any): Async Redis client handed to the base class;
                used (via ``self.redis``) for the supervisor and scheduled
                tasks.
            instance_id (str): Unique id for this process (e.g.
                ``agents-<hex>``) used for service registration, the event-bus
                node id and the supervisor's node id.
        """
        super().__init__(
            service_name="agents",
            instance_id=instance_id,
            redis_client=redis_client,
            redis_required=True,
        )
        self.cfg = config
        self.message_cache = None
        self.kg_manager = None
        self.openrouter = None
        self.scheduler = BackgroundScheduler()
        self.supervisor = None
        self.embedding_queue = None
        self.starwiki_service = None
        self.event_bus = None
        self.dyadic_evaluator = None
        self.desire_check_agent = None
        self.game_art_agent = None
        self.game_turn_agent = None
        # Strong references to fire-and-forget asyncio tasks spawned during
        # on_start (currently the SWORD bootstrap). The event loop only keeps
        # weak references to tasks, so without this list they could be
        # garbage-collected mid-run; on_stop cancels whatever is still pending.
        self._background_agent_tasks = []

    def get_adapter(self, platform_name: str) -> ProxyPlatformAdapter:
        """Return a proxy platform adapter that sends via the event bus.

        Builds a :class:`~core.proxy_adapter.ProxyPlatformAdapter` bound to this
        service's :class:`~core.event_bus.RedisEventBus` so background agents can
        emit outbound platform actions without owning a real platform
        connection.

        Interactions: constructs a fresh adapter over ``self.event_bus`` on
        every call (no caching). The bus is created in :meth:`on_start`;
        calling this earlier yields an adapter over a ``None`` bus.

        Args:
            platform_name (str): Logical platform key (e.g. ``"discord"``) the
                adapter is created for.

        Returns:
            ProxyPlatformAdapter: Event-bus-backed adapter for ``platform_name``.
        """
        return ProxyPlatformAdapter(self.event_bus, platform_name)

    async def on_start(self) -> None:
        """Build collaborators and register all scheduled and supervised work.

        Implements the ``on_start`` hook of
        :class:`~core.service_base.StargazerService`. In order it:

        1. Resolves Redis SSL/connection kwargs from :class:`~config.Config`.
        2. Constructs the :class:`~openrouter_client.OpenRouterClient`,
           :class:`~message_cache.MessageCache` (and points observability at
           its Redis client), :class:`~knowledge_graph.KnowledgeGraphManager`
           and a started :class:`~embedding_queue.EmbeddingBatchQueue`.
        3. When ``starwiki_enabled`` is set, builds :class:`StarwikiService`
           and awaits ``ensure_ready()``.
        4. Creates the :class:`~core.event_bus.RedisEventBus` and awaits
           ``ensure_streams()``.
        5. Creates the :class:`~background_tasks.TaskSupervisor`, registers
           supervised workers and periodic jobs (see :meth:`_register_tasks`),
           then starts the supervisor and the scheduler.
        6. Builds SWORD and schedules its bootstrap as a tracked background
           task (see :meth:`_start_sword_bootstrap`).
        7. Starts the optional background agents
           (see :meth:`_start_background_agents`).

        Called by ``StargazerService.boot`` after the Redis connection is
        verified, and by the migration tests.

        Raises:
            Exception: Propagates failures from core collaborator construction,
                ``build_sword_system`` or the awaited ``start`` /
                ``ensure_streams`` / ``ensure_ready`` calls (aborting boot);
                per-agent and SWORD-bootstrap errors are logged instead.
        """
        redis_client = self.redis
        if self.cfg.redis_sentinels:
            ssl_kw = self.cfg.redis_ssl_kwargs()
        elif self.cfg.redis_url:
            ssl_kw = self.cfg.redis_connection_kwargs_for_url(self.cfg.redis_url)
        else:
            ssl_kw = {}

        self.openrouter = OpenRouterClient(
            api_key=self.cfg.openrouter_api_key,
            model=self.cfg.model,
            temperature=self.cfg.temperature,
            max_tokens=self.cfg.max_tokens,
            top_p=self.cfg.top_p,
            base_url=self.cfg.llm_base_url,
            gemini_api_key=self.cfg.gemini_api_key,
        )
        self.message_cache = MessageCache(
            redis_url=self.cfg.redis_url,
            openrouter_client=self.openrouter,
            embedding_model=self.cfg.embedding_model,
            ssl_kwargs=ssl_kw,
            redis_sentinels=self.cfg.redis_sentinels,
            redis_sentinel_master=self.cfg.redis_sentinel_master,
            resilience_kwargs=self.cfg.redis_resilience_kwargs(),
        )
        from observability import set_observability_redis

        set_observability_redis(self.message_cache.redis_client)

        self.kg_manager = KnowledgeGraphManager(
            redis_client=redis_client,
            openrouter=self.openrouter,
            embedding_model=self.cfg.embedding_model,
        )
        self.embedding_queue = EmbeddingBatchQueue(
            openrouter=self.openrouter,
            redis=self.message_cache.redis_raw_client,
            model=self.cfg.embedding_model,
            flush_interval=self.cfg.embedding_flush_interval,
            max_batch_size=self.cfg.embedding_batch_size,
        )
        await self.embedding_queue.start()

        if getattr(self.cfg, "starwiki_enabled", False):
            self.starwiki_service = StarwikiService(
                self.cfg,
                self.message_cache.redis_client,
                self.openrouter,
            )
            await self.starwiki_service.ensure_ready()

        self.event_bus = RedisEventBus(
            redis=self.message_cache.redis_raw_client,
            node_role="worker",
            node_id=self.instance_id,
            stream_maxlen=getattr(self.cfg, "redis_stream_maxlen", 100_000),
        )
        await self.event_bus.ensure_streams()

        self.supervisor = TaskSupervisor(redis=redis_client, node_id=self.instance_id)
        self._register_tasks(redis_client)
        await self.supervisor.start()
        await self.scheduler.start()

        self._start_sword_bootstrap(redis_client)

        if self.message_cache is not None:
            await self._start_background_agents()
        logger.info("AgentsService started successfully.")

    def _register_tasks(self, redis_client: Any) -> None:
        """Register supervised workers and periodic jobs without starting them.

        Registers ``limbic_dedup_worker`` and ``limbic_org_worker`` on the
        supervisor, then the periodic jobs on the scheduler. Feature flags on
        ``self.cfg`` gate the legacy auto-KG extraction, log-RAG ingest and
        StarWiki lint jobs.

        Args:
            redis_client (Any): Service Redis client forwarded to the tasks
                that take it as a positional argument.
        """
        # Supervised workers, run under the TaskSupervisor.
        self.supervisor.register_task(
            "limbic_dedup_worker", lambda: limbic_dedup_task(self)
        )
        self.supervisor.register_task(
            "limbic_org_worker", lambda: limbic_org_task(self)
        )

        # Periodic jobs. interval/initial_delay are presumably seconds
        # (e.g. 604800 == 7 days); confirm against BackgroundScheduler.
        sched = self.scheduler
        sched.register(
            "scheduled_prompt_tick", scheduled_prompt_tick,
            interval=60, initial_delay=10, args=(redis_client,),
        )
        sched.register(
            "scheduled_prompt_cleanup", scheduled_prompt_cleanup,
            interval=172800, initial_delay=300, args=(redis_client,),
        )
        if getattr(self.cfg, "legacy_kg_extraction", False):
            sched.register(
                "auto_kg_extraction", auto_kg_extraction,
                interval=14400, initial_delay=300,
                args=(redis_client, self.kg_manager, self.openrouter, self.cfg),
            )
        sched.register(
            "channel_summarization", channel_summarization,
            interval=21600, initial_delay=600, args=(redis_client,),
        )
        if getattr(self.cfg, "background_scheduler_log_rag_ingest_enabled", False):
            sched.register(
                "log_rag_ingest", log_rag_ingest_task,
                interval=21600, initial_delay=30,
            )
        sched.register(
            "gemini_key_probe", gemini_key_probe_task,
            interval=7200, initial_delay=60,
        )
        sched.register(
            "agentic_kg_bulk_incremental", agentic_kg_bulk_incremental_task,
            interval=14400, initial_delay=900,
            args=(redis_client, self.kg_manager, self.openrouter),
        )
        sched.register(
            "limbic_maintenance", limbic_maintenance_task,
            interval=604800, initial_delay=7200, args=(self,),
        )
        sched.register(
            "journal_stream", journal_stream_task,
            interval=10, initial_delay=5, args=(redis_client,),
        )
        sched.register(
            "limbic_anchoring_worker", limbic_anchoring_task,
            interval=60, initial_delay=15, args=(self,),
        )
        sched.register(
            "limbic_sunset_processor", limbic_sunset_task,
            interval=3600, initial_delay=3600, args=(self,),
        )
        sched.register(
            "anamnesis_digest", anamnesis_digest_task,
            interval=1200, initial_delay=300,
            args=(redis_client, self.kg_manager, self.openrouter, self.cfg),
        )
        if getattr(self.cfg, "starwiki_enabled", False):
            sched.register(
                "starwiki_lint", starwiki_lint_task,
                interval=3600, initial_delay=600, args=(self,),
            )

    def _start_sword_bootstrap(self, redis_client: Any) -> None:
        """Build the SWORD system and prepare it in a tracked background task.

        Calls :func:`sword.factory.build_sword_system`. When it returns a
        non-``None`` monitor and graph manager, schedules ``_bootstrap_sword``
        as an asyncio task and keeps a reference to it in
        ``self._background_agent_tasks`` so it cannot be garbage-collected and
        can be cancelled by :meth:`on_stop`.

        Args:
            redis_client (Any): Service Redis client passed to the SWORD
                factory.
        """
        sword_monitor, sword_graph_manager = build_sword_system(
            cfg=self.cfg,
            redis_client=redis_client,
            openrouter=self.openrouter,
            kg_manager=self.kg_manager,
        )
        if sword_monitor is None or sword_graph_manager is None:
            return

        async def _bootstrap_sword():
            """Verify SWORD indexes, load Origins, then attach the monitor.

            Runs off the critical boot path. Awaits
            ``sword_graph_manager.ensure_sword_indexes()``, then loads Origin
            fragments from ``system_prompt.j2`` via
            :class:`sword.origin_parser.OriginParser`; only after both succeed
            is ``self.kg_manager._sword_monitor`` set. Any ``Exception`` is
            logged at warning level and leaves the monitor unattached.
            Cancellation (``CancelledError``) is not caught and propagates.
            """
            try:
                logger.info("Verifying SWORD subgraph indexes (background)...")
                await sword_graph_manager.ensure_sword_indexes()
                logger.info("SWORD subgraph indexes verified")
                from sword.origin_parser import OriginParser

                parser = OriginParser(sword_graph_manager)
                system_prompt_path = "system_prompt.j2"
                logger.info("Parsing and loading SWORD Origin fragments...")
                await parser.parse_and_load(system_prompt_path)
                logger.info("SWORD Origin fragments parsed and loaded successfully")
                # Attach the monitor only AFTER indexes and origins are ready.
                self.kg_manager._sword_monitor = sword_monitor
            except Exception:
                logger.warning("Could not create SWORD indexes or load Origins (background)", exc_info=True)

        task = asyncio.create_task(_bootstrap_sword(), name="sword_bootstrap_background")
        self._background_agent_tasks.append(task)

    async def _start_background_agents(self) -> None:
        """Start the optional background agents, isolating each failure.

        Starts :class:`DyadicEvaluator`, :class:`DesireCheckAgent`,
        :class:`GameArtAgent` and :class:`GameTurnAgent` in that order. The
        game agents use the ``"discord"`` adapter from :meth:`get_adapter`.
        Each agent is wrapped in its own try/except, so one failing agent is
        logged and skipped without aborting boot.
        """
        try:
            self.dyadic_evaluator = DyadicEvaluator(
                redis_client=self.message_cache.redis_client,
                cache_redis=self.message_cache.redis_client,
            )
            await self.dyadic_evaluator.start()
            logger.info("Dyadic evaluator background agent started")
        except Exception:
            logger.warning("Dyadic evaluator startup failed", exc_info=True)

        try:
            self.desire_check_agent = DesireCheckAgent(
                redis_client=self.message_cache.redis_client,
            )
            await self.desire_check_agent.start()
            logger.info("Desire check agent started")
        except Exception:
            logger.warning("Desire check agent startup failed", exc_info=True)

        try:
            discord_adapter = self.get_adapter("discord")
            self.game_art_agent = GameArtAgent(
                redis_client=self.message_cache.redis_client,
                platform_adapter=discord_adapter,
                config=self.cfg,
            )
            await self.game_art_agent.start()
            logger.info("Game art agent started")
        except Exception:
            logger.warning("Game art agent startup failed", exc_info=True)

        try:
            discord_adapter = self.get_adapter("discord")
            self.game_turn_agent = GameTurnAgent(
                discord_platform=discord_adapter,
            )
            await self.game_turn_agent.start(
                self.redis,
            )
            logger.info("Game turn agent started")
        except Exception:
            logger.warning("Game turn agent startup failed", exc_info=True)

    async def run(self) -> None:
        """Block until cancelled so scheduled jobs and agents keep running.

        Implements the ``run`` loop of
        :class:`~core.service_base.StargazerService`. All work is driven by the
        supervisor, scheduler and background agents started in
        :meth:`on_start`, so this only awaits a never-set
        :class:`asyncio.Event`. A :class:`asyncio.CancelledError` is swallowed
        so the method returns normally when cancelled.
        """
        try:
            logger.info("AgentsService running... Press Ctrl+C to stop.")
            await asyncio.Event().wait()
        except asyncio.CancelledError:
            pass

    async def _cancel_background_tasks(self) -> None:
        """Cancel and reap still-pending tasks tracked from :meth:`on_start`.

        Uses ``asyncio.gather(..., return_exceptions=True)`` so cancellation or
        errors from the tasks are collected rather than raised, then clears
        the tracking list.
        """
        pending = [task for task in self._background_agent_tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        self._background_agent_tasks.clear()

    async def on_stop(self) -> None:
        """Tear down the scheduler, supervisor, agents and clients in order.

        Implements the ``on_stop`` cleanup of
        :class:`~core.service_base.StargazerService`, releasing everything
        stood up in :meth:`on_start`. Order: scheduler, supervisor, tracked
        background tasks (e.g. SWORD bootstrap), game turn agent, game art
        agent, dyadic evaluator, desire check agent, embedding queue, and
        finally the OpenRouter client.

        Every step is best-effort. A failure is logged at warning level and
        teardown continues, so an error stopping the scheduler cannot prevent
        ``supervisor.shutdown()`` from running. Components are skipped only
        when they are ``None`` (never built), so partial-boot states shut down
        safely. ``is not None`` is used rather than truthiness, so objects
        that evaluate falsy are still stopped.
        """
        if self.scheduler is not None:
            try:
                await self.scheduler.stop()
            except Exception:
                logger.warning("Scheduler stop failed", exc_info=True)
        if self.supervisor is not None:
            try:
                await self.supervisor.shutdown()
            except Exception:
                logger.warning("Supervisor shutdown failed", exc_info=True)

        await self._cancel_background_tasks()

        stoppables = (
            (self.game_turn_agent, "Game turn agent stop failed"),
            (self.game_art_agent, "Game art agent stop failed"),
            (self.dyadic_evaluator, "Dyadic evaluator stop failed"),
            (self.desire_check_agent, "Desire check agent stop failed"),
            (self.embedding_queue, "Embedding queue stop failed"),
        )
        for component, failure_message in stoppables:
            if component is None:
                continue
            try:
                await component.stop()
            except Exception:
                logger.warning(failure_message, exc_info=True)

        if self.openrouter is not None:
            try:
                await self.openrouter.close()
            except Exception:
                logger.warning("OpenRouter client close failed", exc_info=True)
        logger.info("AgentsService stopped gracefully.")
async def main() -> None:
    """Configure, boot and run the agents service as a standalone process.

    Process entry point. It:

    * sets up basic logging and loads :class:`~config.Config`;
    * mints an ``agents-<hex>`` instance id;
    * builds an async Redis client with ``decode_responses=False``;
    * constructs an :class:`AgentsService`;
    * installs SIGINT/SIGTERM handlers that schedule ``service.shutdown()``;
    * awaits ``service.boot()`` and then ``service.run()``;
    * closes the Redis client in a ``finally`` block.

    Shutdown handling: the shutdown task is kept in a local list, because the
    event loop only holds weak references to tasks. A signal received while a
    shutdown is still in flight is ignored, so repeated Ctrl+C presses cannot
    start concurrent teardowns.
    """
    import uuid
    import signal

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
    cfg = Config.load()
    instance_id = f"agents-{uuid.uuid4().hex[:8]}"
    redis_client = cfg.build_async_redis_client(decode_responses=False)
    service = AgentsService(cfg, redis_client, instance_id)

    # Strong references to shutdown tasks so they are not garbage-collected
    # before service.shutdown() completes.
    shutdown_tasks = []

    def _request_shutdown() -> None:
        """Schedule ``service.shutdown()`` unless one is already running."""
        if shutdown_tasks and not shutdown_tasks[-1].done():
            return
        shutdown_tasks.append(asyncio.create_task(service.shutdown()))

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _request_shutdown)
    try:
        await service.boot()
        await service.run()
    finally:
        if redis_client is not None:
            await redis_client.aclose()


if __name__ == "__main__":
    asyncio.run(main())