# Source code for background_tasks

"""Asyncio-based background task scheduler.

Manages periodic background tasks that run alongside the bot:
- Scheduled prompt tick / cleanup
- Auto memory extraction
- Periodic maintenance

When ``Config.background_scheduler_chat_llm_enabled`` is false, periodic
tasks that call chat/completions are not registered (scheduled prompt tick,
auto KG extraction, agentic KG bulk incremental, channel summarization,
channel heartbeat, KG consolidation, anamnesis).  Cleanup, KG decay, and
Gemini key probe still run.  Log RAG embed ingest runs only when
``Config.background_scheduler_log_rag_ingest_enabled`` is true (default off).
"""

from __future__ import annotations

import asyncio
import logging
import os
from typing import Any, Callable, Awaitable

logger = logging.getLogger(__name__)


# Import-time registry of periodic task declarations. The ``background_task``
# decorator appends one dict per decorated coroutine, with the keys ``name``,
# ``func``, ``interval``, ``initial_delay``, the ``requires_*`` flags, and
# ``opt_in_config_flag``. Nothing in this list is executed here; it is only
# metadata (presumably consumed by ``build_scheduler`` -- defined outside this
# view, confirm there).
_BACKGROUND_TASKS_REGISTRY: list[dict[str, Any]] = []


def background_task(
    interval: float,
    initial_delay: float = 0.0,
    *,
    name: str | None = None,
    requires_redis: bool = False,
    requires_kg: bool = False,
    requires_openrouter: bool = False,
    requires_bot_runner: bool = False,
    requires_chat_llm: bool = False,
    requires_config: bool = False,
    opt_in_config_flag: str | None = None,
) -> Callable[[Callable[..., Awaitable[Any]]], Callable[..., Awaitable[Any]]]:
    """Declare a coroutine as a periodic background task.

    The decorator neither wraps nor schedules the coroutine. It only appends
    a metadata record (schedule plus dependency requirements) to the
    module-level ``_BACKGROUND_TASKS_REGISTRY`` when the decorated function
    is defined, and logs the registration at debug level.

    Args:
        interval: Seconds between successive runs of the task.
        initial_delay: Seconds to wait before the first run.
        name: Explicit registry name; the coroutine's ``__name__`` is used
            when this is ``None`` or empty.
        requires_redis: Task needs a Redis client argument.
        requires_kg: Task needs the knowledge-graph manager.
        requires_openrouter: Task needs the OpenRouter client.
        requires_bot_runner: Task needs the bot-runner/service object.
        requires_chat_llm: Task is gated on the chat-LLM scheduler flag.
        requires_config: Task should be passed the config object.
        opt_in_config_flag: Name of a config attribute that must be truthy
            for the task to be scheduled, or ``None`` for always-on.

    Returns:
        A decorator that records the metadata and returns the coroutine
        function unchanged.
    """

    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        """Record ``func`` in the registry and hand it back untouched."""
        label = name if name else func.__name__
        # Key order mirrors the declaration order: identity first, then the
        # schedule, then the dependency flags.
        record: dict[str, Any] = {"name": label, "func": func}
        record.update(
            interval=interval,
            initial_delay=initial_delay,
            requires_redis=requires_redis,
            requires_kg=requires_kg,
            requires_openrouter=requires_openrouter,
            requires_bot_runner=requires_bot_runner,
            requires_chat_llm=requires_chat_llm,
            requires_config=requires_config,
            opt_in_config_flag=opt_in_config_flag,
        )
        _BACKGROUND_TASKS_REGISTRY.append(record)
        logger.debug(
            "Registered background task %s with interval %s.", label, interval
        )
        return func

    return decorator
def _decode_redis_text(raw: Any) -> str: """Coerce a raw Redis field value into a clean ``str``. Normalises the heterogeneous values returned by ``hget``/pipeline reads: ``None`` becomes the empty string, ``bytes`` are UTF-8 decoded with ``errors="replace"``, and anything else is stringified. Called by :func:`startup_channel_backfill` when reading cached message ``text`` fields back from Redis to compare against freshly fetched history. Args: raw (Any): Value from a Redis read (``None``, ``bytes``, or other). Returns: str: The decoded text, never ``None``. """ if raw is None: return "" if isinstance(raw, bytes): return raw.decode("utf-8", errors="replace") return str(raw)
class BackgroundScheduler:
    """Run named periodic tasks in the background.

    Each task is an async callable that is invoked repeatedly with a
    configurable interval.  Tasks are resilient to individual failures and
    log errors without crashing the scheduler.

    Usage::

        sched = BackgroundScheduler()
        sched.register("my_task", my_coro, interval=300)
        await sched.start()   # non-blocking, spawns tasks
        ...
        await sched.stop()
    """

    # Strong references to in-flight fire-and-forget observability publishes.
    # The event loop only holds weak references to tasks, so an unreferenced
    # task may be garbage-collected before it finishes.
    _pending_events: set[asyncio.Task] = set()

    def __init__(self) -> None:
        """Create an empty scheduler: nothing registered, nothing running."""
        self._tasks: dict[str, _RegisteredTask] = {}
        self._running: dict[str, asyncio.Task] = {}
        self._started = False

    def register(
        self,
        name: str,
        func: Callable[..., Awaitable[Any]],
        interval: float,
        initial_delay: float = 0.0,
        args: tuple = (),
        kwargs: dict | None = None,
    ) -> None:
        """Register a periodic task.

        Registering under an existing name replaces the earlier entry.
        Entries added after :meth:`start` are stored but not spawned by this
        method.

        Args:
            name: Unique identifier for the task.
            func: Async callable to invoke each cycle.
            interval: Seconds between invocations (measured from the end of
                the previous run).
            initial_delay: Seconds to wait before the first invocation.
            args: Positional arguments passed to ``func`` on every run.
            kwargs: Keyword arguments passed to ``func`` on every run.
        """
        self._tasks[name] = _RegisteredTask(
            name=name,
            func=func,
            interval=interval,
            initial_delay=initial_delay,
            args=args,
            kwargs=kwargs or {},
        )

    async def start(self) -> None:
        """Spawn one loop per registered task (idempotent).

        Creates an asyncio task named ``bg:<name>`` for each registered entry,
        each driven by :meth:`_loop`.  Returns without waiting for any run; a
        second call while started is a no-op.
        """
        if self._started:
            return
        self._started = True
        for name, task in self._tasks.items():
            self._running[name] = asyncio.create_task(
                self._loop(task),
                name=f"bg:{name}",
            )
        logger.info(
            "Background scheduler started %d task(s)",
            len(self._running),
        )

    async def stop(self) -> None:
        """Cancel every running task loop and wait for them to settle.

        Clears the started flag, cancels all live loops, gathers them with
        ``return_exceptions=True`` (so cancellation errors are swallowed),
        and empties the running map so :meth:`start` can be called again.
        """
        self._started = False
        handles = list(self._running.values())
        for handle in handles:
            handle.cancel()
        await asyncio.gather(*handles, return_exceptions=True)
        self._running.clear()
        logger.info("Background scheduler stopped")

    # ------------------------------------------------------------------

    @staticmethod
    async def _loop(task: _RegisteredTask) -> None:
        """Drive one registered task forever on its configured interval.

        Honours the initial delay, then repeatedly awaits the task coroutine
        and sleeps ``task.interval`` seconds between runs.  A failing run is
        logged and reported but never ends the loop; ``CancelledError``
        propagates so :meth:`stop` can shut the loop down.

        Args:
            task: Registration record providing ``name``, ``func``, ``args``,
                ``kwargs``, ``interval`` and ``initial_delay``.
        """
        import time  # hoisted: previously re-imported on every iteration

        if task.initial_delay > 0:
            await asyncio.sleep(task.initial_delay)

        while True:
            t0 = time.monotonic()
            status = "ok"
            error_msg = ""
            try:
                await task.func(*task.args, **task.kwargs)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                status = "error"
                error_msg = str(e)
                logger.exception("Background task '%s' failed", task.name)

            BackgroundScheduler._publish_run_event(
                task.name,
                status,
                error_msg,
                (time.monotonic() - t0) * 1000,
            )
            await asyncio.sleep(task.interval)

    @staticmethod
    def _publish_run_event(
        task_name: str,
        status: str,
        error_msg: str,
        duration_ms: float,
    ) -> None:
        """Schedule a fire-and-forget ``scheduler_task_run`` debug event.

        Reporting is best-effort: if ``observability`` cannot be imported the
        event is skipped, so an unavailable reporting layer cannot terminate
        the task loop.

        Args:
            task_name: Name of the task that just ran.
            status: ``"ok"`` or ``"error"``.
            error_msg: Stringified exception, or ``""`` on success.
            duration_ms: Wall-clock duration of the run in milliseconds.
        """
        try:
            from observability import publish_debug_event
        except ImportError:
            # The run itself already succeeded or was logged; skip reporting.
            return

        event_task = asyncio.create_task(
            publish_debug_event(
                "scheduler_task_run",
                "scheduler",
                status=status,
                duration_ms=duration_ms,
                preview=f"task={task_name} status={status}",
                payload={"task": task_name, "error": error_msg},
            ),
            name=f"obs_scheduler_{task_name}",
        )
        # Keep the task alive until it completes, then drop the reference.
        pending = BackgroundScheduler._pending_events
        pending.add(event_task)
        event_task.add_done_callback(pending.discard)
class _RegisteredTask: """RegisteredTask. Attributes: name: The name. func: The func. interval: The interval. initial_delay: The initial delay. args: The args. kwargs: The kwargs. """ __slots__ = ("name", "func", "interval", "initial_delay", "args", "kwargs") def __init__( self, name: str, func: Callable[..., Awaitable[Any]], interval: float, initial_delay: float, args: tuple, kwargs: dict, ) -> None: """Initialize the instance. Args: name (str): Human-readable name. func (Callable[..., Awaitable[Any]]): The func value. interval (float): The interval value. initial_delay (float): The initial delay value. args (tuple): The args value. kwargs (dict): Additional keyword arguments. """ self.name = name self.func = func self.interval = interval self.initial_delay = initial_delay self.args = args self.kwargs = kwargs
class TaskSupervisor:
    """Leader-elected supervisor for long-lived async worker loops.

    Unlike :class:`BackgroundScheduler` (interval-based tasks), this
    supervises persistent worker coroutines that should run on exactly one
    node at a time.  With a Redis client it contends for a lease on the
    leader key and only spawns the registered workers while it holds that
    lease; without Redis it runs standalone as the permanent leader.  Each
    supervised worker is restarted with exponential backoff if it crashes.

    Interactions: Redis ``SET NX EX`` / ``GET`` / ``EVAL`` on the leader key,
    asyncio tasks for the heartbeat and each worker, and the
    ``observability`` singleton for ``task_started`` / ``task_crash``
    counters.
    """

    # Lifetime of the Redis lease, and how often the heartbeat renews it.
    _LEASE_TTL_SECONDS = 30
    _HEARTBEAT_INTERVAL_SECONDS = 10

    # Compare-and-extend: only refresh the TTL if this node still owns the key.
    _RENEW_LEASE_SCRIPT = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('expire', KEYS[1], tonumber(ARGV[2]))
    else
        return 0
    end
    """

    def __init__(self, redis: Any = None, node_id: str = "") -> None:
        """Initialise the supervisor and its leader-election state.

        The election key is read from the ``SG_LEADER_KEY`` environment
        variable, defaulting to ``sg:leader:supervisor``.

        Args:
            redis: Async Redis client used for lease operations, or ``None``
                to run standalone as the permanent leader.
            node_id: Identity of this process for the lease; falls back to
                ``"standalone"`` when empty.
        """
        self._tasks: dict[str, asyncio.Task] = {}
        self._factories: dict[str, Callable[[], Awaitable[None]]] = {}
        self._is_running = False
        self._redis = redis
        self._node_id = node_id or "standalone"
        self._leader_key = os.environ.get("SG_LEADER_KEY", "sg:leader:supervisor")
        self._heartbeat_task: asyncio.Task | None = None
        self._is_leader = False

    def register_task(
        self, name: str, task_factory: Callable[[], Awaitable[None]]
    ) -> None:
        """Register a worker factory under a name for later supervision.

        Registering does not start anything; the factory is invoked (and
        re-invoked after a crash) once this node holds leadership.

        Args:
            name: Unique key identifying the worker.
            task_factory: Zero-argument coroutine factory that runs the
                worker body for one invocation.
        """
        self._factories[name] = task_factory

    async def start(self) -> None:
        """Start supervision, electing a leader when Redis is available.

        With Redis, launches the :meth:`_leader_heartbeat` loop, which spawns
        the workers once this node wins the lease.  Without Redis, assumes
        leadership immediately and starts all workers.  Calling ``start``
        again while a heartbeat is alive does not create a second one.
        """
        self._is_running = True
        if self._redis is not None:
            # Re-creating the heartbeat would orphan the previous task: it
            # would never be cancelled by shutdown() and would keep contending.
            if self._heartbeat_task is None or self._heartbeat_task.done():
                self._heartbeat_task = asyncio.create_task(self._leader_heartbeat())
        else:
            # Standalone fallback: start directly as leader
            self._is_leader = True
            await self._start_all_tasks()

    async def _start_all_tasks(self) -> None:
        """Spawn a supervised task for every factory without a live task."""
        for name in self._factories:
            if name not in self._tasks or self._tasks[name].done():
                self._spawn_task(name)

    async def _stop_all_tasks(self) -> None:
        """Cancel and await every supervised task, then clear the registry.

        Used both when leadership is lost and from :meth:`shutdown`.
        """
        for task in list(self._tasks.values()):
            task.cancel()
        if self._tasks:
            await asyncio.gather(*self._tasks.values(), return_exceptions=True)
        self._tasks.clear()
        logger.info("Supervisor stopped all tasks on losing leadership")

    @staticmethod
    def _decode_lease_owner(raw: Any) -> str:
        """Return the lease owner stored in Redis as ``str`` (bytes decoded)."""
        return raw.decode() if isinstance(raw, bytes) else str(raw)

    async def _is_current_leader(self) -> bool:
        """Report whether this node currently holds the lease in Redis.

        Returns:
            ``True`` when running standalone or when the leader key holds
            this node's id; ``False`` on a mismatch, a missing key, or any
            read/decode error.
        """
        if self._redis is None:
            return True
        try:
            val = await self._redis.get(self._leader_key)
            if val is not None:
                return self._decode_lease_owner(val) == self._node_id
        except Exception:
            # Fail closed, but leave a trace instead of swallowing silently.
            logger.debug(
                "Leader lease read failed; reporting non-leader", exc_info=True
            )
        return False

    async def verify_leadership_before_destructive(self) -> None:
        """Re-assert leadership immediately before a destructive action.

        Guards against split-brain: passes only when this node carries the
        in-memory leader flag and, if Redis is configured, the leader key
        still holds this node's id at call time.

        Raises:
            RuntimeError: If this node is not flagged leader, the lease has
                expired in Redis, or the lease is owned by another node.
        """
        if not self._is_leader:
            raise RuntimeError("Pre-flight lease check failed: not the current leader flag")
        if self._redis is not None:
            val = await self._redis.get(self._leader_key)
            if val is None:
                raise RuntimeError("Pre-flight lease check failed: lease expired in Redis")
            val_str = self._decode_lease_owner(val)
            if val_str != self._node_id:
                raise RuntimeError(f"Pre-flight lease check failed: lost lease to {val_str}")

    async def _acquire_or_renew_lease(self) -> bool:
        """Try to take the lease, or extend it if this node already owns it.

        Returns:
            ``True`` when this node holds the lease after the call.
        """
        key = self._leader_key
        ttl = self._LEASE_TTL_SECONDS
        acquired = await self._redis.set(key, self._node_id, nx=True, ex=ttl)
        # Clients report success as True or b"OK"; None/False means the key
        # already exists.
        if acquired is not None and acquired is not False:
            return True
        current_leader = await self._redis.get(key)
        if current_leader is None:
            return False
        if self._decode_lease_owner(current_leader) != self._node_id:
            return False
        renewed = await self._redis.eval(
            self._RENEW_LEASE_SCRIPT, 1, key, self._node_id, str(ttl)
        )
        return bool(renewed)

    async def _leader_heartbeat(self) -> None:
        """Continuously contend for and renew the distributed leader lease.

        Every heartbeat interval, while running, the node tries to acquire or
        renew the lease.  Gaining it starts the supervised workers; observing
        that it is gone stops them.  Heartbeat errors are logged and retried.
        If the lease cannot be confirmed for a full TTL, the node also steps
        down, because by then the key may have expired and been taken by
        another node.
        """
        import time

        ttl = self._LEASE_TTL_SECONDS
        # Monotonic time at which ownership was last positively confirmed.
        last_confirmed = time.monotonic()
        while self._is_running:
            # Stamp before the round-trip so the estimate of the remaining
            # lease time errs on the conservative side.
            attempt_started = time.monotonic()
            try:
                if await self._acquire_or_renew_lease():
                    last_confirmed = attempt_started
                    if not self._is_leader:
                        logger.info("Node '%s' elected as supervisor leader", self._node_id)
                        self._is_leader = True
                        await self._start_all_tasks()
                elif self._is_leader:
                    logger.warning("Node '%s' lost supervisor leadership", self._node_id)
                    self._is_leader = False
                    await self._stop_all_tasks()
            except Exception as e:
                logger.warning("Supervisor leader heartbeat failed: %s", e)
                if self._is_leader and time.monotonic() - last_confirmed >= ttl:
                    logger.warning(
                        "Node '%s' could not confirm its lease for %ds; stepping down",
                        self._node_id,
                        ttl,
                    )
                    self._is_leader = False
                    await self._stop_all_tasks()

            await asyncio.sleep(self._HEARTBEAT_INTERVAL_SECONDS)

    def _spawn_task(self, name: str) -> None:
        """Create the auto-restarting task for one registered factory.

        No-op when the supervisor is not running.

        Args:
            name: Registry key of the worker factory to spawn.
        """
        if not self._is_running:
            return
        factory = self._factories[name]
        task = asyncio.create_task(self._run_with_retry(name, factory))
        self._tasks[name] = task
        logger.info("Supervisor spawned task: %s", name)

    async def _run_with_retry(
        self, name: str, factory: Callable[[], Awaitable[None]]
    ) -> None:
        """Run a worker factory repeatedly, backing off after crashes.

        A clean return resets the backoff and pauses 5s before the next
        cycle.  A crash is counted, logged, and retried after a backoff that
        doubles up to 60s.  ``CancelledError`` ends the loop.

        Args:
            name: Registry key of the worker, used in logs and counter tags.
            factory: Zero-argument coroutine factory for one worker run.
        """
        from observability import observability

        backoff = 1.0
        max_backoff = 60.0

        while self._is_running:
            try:
                observability.increment("task_started", {"task_name": name})
                await factory()
                # If worker returns cleanly, reset backoff and sleep before next iteration
                backoff = 1.0
                await asyncio.sleep(5)
            except asyncio.CancelledError:
                logger.info("Task %s cancelled by supervisor", name)
                break
            except Exception as e:
                observability.increment(
                    "task_crash", {"task_name": name, "error": e.__class__.__name__}
                )
                logger.error(
                    "Supervised task %s crashed! Exception: %s",
                    name,
                    str(e),
                    exc_info=True,
                )
                # Dynamic recovery and exponential backoff
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, max_backoff)
                logger.info(
                    "Supervisor respawning task %s (backoff: %ds)", name, backoff
                )

    async def shutdown(self) -> None:
        """Stop the heartbeat and all supervised tasks for a clean exit.

        Clears the running flag, cancels and awaits the heartbeat task if
        present, then cancels and awaits every worker.  The Redis lease is
        not deleted here; it is left to expire through its TTL.
        """
        self._is_running = False
        if self._heartbeat_task is not None:
            self._heartbeat_task.cancel()
            await asyncio.gather(self._heartbeat_task, return_exceptions=True)
            self._heartbeat_task = None
        await self._stop_all_tasks()
        self._is_leader = False
        logger.info("Supervisor shut down completely")
# ------------------------------------------------------------------
# Concrete task functions
# ------------------------------------------------------------------
@background_task(
    interval=60,
    initial_delay=10,
    requires_redis=True,
    requires_chat_llm=True,
)
async def scheduled_prompt_tick(redis: Any) -> None:
    """Fire any user-scheduled prompts that have come due (every 60s).

    Delegates to ``tools.scheduled_prompt.tick_scheduled_prompts(redis)``.
    The task is best-effort: an ``ImportError`` skips the run (recorded at
    debug level so a broken import stays diagnosable), and any other failure
    is logged with a traceback instead of being raised.

    Args:
        redis: Async Redis client passed by the scheduler.
    """
    try:
        from tools.scheduled_prompt import tick_scheduled_prompts

        await tick_scheduled_prompts(redis)
    except ImportError:
        # Optional dependency: stay quiet at normal log levels, but do not
        # discard the reason entirely.
        logger.debug(
            "Scheduled prompt tick skipped: import failed", exc_info=True
        )
    except Exception:
        logger.exception("Scheduled prompt tick failed")
@background_task(
    interval=172_800,  # 48 hours
    initial_delay=300,
    requires_redis=True,
)
async def scheduled_prompt_cleanup(redis: Any) -> None:
    """Garbage-collect stale scheduled prompts (every 48h).

    Delegates to
    ``tools.scheduled_prompt.cleanup_expired_prompts(redis, days_to_keep=7)``.
    The task is best-effort: an ``ImportError`` skips the run (recorded at
    debug level so a broken import stays diagnosable), and any other failure
    is logged with a traceback instead of being raised.

    Args:
        redis: Async Redis client passed by the scheduler.
    """
    try:
        from tools.scheduled_prompt import cleanup_expired_prompts

        await cleanup_expired_prompts(redis, days_to_keep=7)
    except ImportError:
        # Optional dependency: stay quiet at normal log levels, but do not
        # discard the reason entirely.
        logger.debug(
            "Scheduled prompt cleanup skipped: import failed", exc_info=True
        )
    except Exception:
        logger.exception("Scheduled prompt cleanup failed")
@background_task(
    interval=14_400,  # 4 hours
    initial_delay=300,
    requires_redis=True,
    requires_kg=True,
    requires_openrouter=True,
    requires_config=True,
    requires_chat_llm=True,
    opt_in_config_flag="legacy_kg_extraction",
)
async def auto_kg_extraction(
    redis: Any,
    kg_manager: Any,
    openrouter: Any,
    config: Any = None,
) -> None:
    """Mine recent conversations for knowledge-graph entities (every 4h).

    Delegates to ``kg_extraction.run_batch_extraction`` and logs how many
    entities/relationships were added when the count is non-zero.  The
    extraction model can be overridden through ``config.kg_extraction_model``.
    The task is best-effort: an ``ImportError`` skips the run (recorded at
    debug level so a broken import stays diagnosable), and any other failure
    is logged with a traceback instead of being raised.

    Args:
        redis: Async Redis client forwarded to the extraction pass.
        kg_manager: Knowledge-graph manager forwarded to the extraction pass.
        openrouter: OpenRouter client forwarded to the extraction pass.
        config: Optional config object; only ``kg_extraction_model`` is read.
    """
    try:
        from kg_extraction import run_batch_extraction

        # getattr keeps this tolerant of config objects lacking the attribute.
        extraction_model = (
            getattr(config, "kg_extraction_model", None)
            if config is not None
            else None
        )
        result = await run_batch_extraction(
            redis=redis,
            kg_manager=kg_manager,
            openrouter=openrouter,
            override_model=extraction_model,
        )
        total = result.get("entities_added", 0) + result.get("relationships_added", 0)
        if total:
            logger.info("KG extraction added %d entities/relationships", total)
    except ImportError:
        # Optional dependency: stay quiet at normal log levels, but do not
        # discard the reason entirely.
        logger.debug("KG extraction skipped: import failed", exc_info=True)
    except Exception:
        logger.exception("KG extraction failed")
@background_task(
    interval=21_600,  # 6 hours
    initial_delay=600,
    requires_redis=True,
    requires_chat_llm=True,
)
async def channel_summarization(redis: Any) -> None:
    """Refresh rolling summaries for active channels (every 6h).

    Delegates to
    ``background_agents.channel_summarizer.summarise_all_active(redis=redis)``
    and logs the number of channels processed when it is non-zero.  The task
    is best-effort: an ``ImportError`` skips the run (recorded at debug level
    so a broken import stays diagnosable), and any other failure is logged
    with a traceback instead of being raised.

    Args:
        redis: Async Redis client passed by the scheduler.
    """
    try:
        from background_agents.channel_summarizer import summarise_all_active

        result = await summarise_all_active(redis=redis)
        n = result.get("channels_processed", 0)
        if n:
            logger.info("Channel summarization processed %d channel(s)", n)
    except ImportError:
        # Optional dependency: stay quiet at normal log levels, but do not
        # discard the reason entirely.
        logger.debug(
            "Channel summarization skipped: import failed", exc_info=True
        )
    except Exception:
        logger.exception("Channel summarization failed")
@background_task(
    interval=86_400,  # 24 hours
    initial_delay=1800,
    requires_kg=True,
    requires_openrouter=True,
    requires_chat_llm=True,
)
async def kg_consolidation_task(
    kg_manager: Any,
    openrouter: Any,
) -> None:
    """Merge and prune duplicate knowledge-graph entities (every 24h).

    Delegates to ``kg_consolidation.consolidate_graph`` and reports the
    outcome as a fire-and-forget ``kg_consolidation`` debug event carrying
    the merged/pruned counts, or the error text on failure.  The task is
    best-effort: an ``ImportError`` skips the run (recorded at debug level),
    other failures are logged with a traceback, and an unavailable
    ``observability`` module never turns a handled failure into a raised one.

    Args:
        kg_manager: Knowledge-graph manager forwarded to the consolidation.
        openrouter: OpenRouter client forwarded to the consolidation.
    """
    import time

    t0 = time.monotonic()

    def _publish(
        status: str,
        preview: str,
        payload: dict[str, Any],
        task_name: str,
    ) -> None:
        """Schedule the debug event; skip quietly if observability is absent."""
        try:
            from observability import publish_debug_event
        except ImportError:
            logger.debug(
                "KG consolidation event not published: observability unavailable",
                exc_info=True,
            )
            return
        asyncio.create_task(
            publish_debug_event(
                "kg_consolidation",
                "background_tasks",
                status=status,
                duration_ms=(time.monotonic() - t0) * 1000,
                preview=preview,
                payload=payload,
            ),
            name=task_name,
        )

    try:
        from kg_consolidation import consolidate_graph

        result = await consolidate_graph(
            kg_manager=kg_manager,
            openrouter=openrouter,
        )
        merged = result.get("merged", 0)
        pruned = result.get("pruned_entities", 0)
        if merged or pruned:
            logger.info(
                "KG consolidation: merged=%d pruned=%d",
                merged,
                pruned,
            )
        _publish(
            "ok",
            f"merged={merged} pruned={pruned}",
            {"merged": merged, "pruned_entities": pruned},
            "obs_kg_consolidation",
        )
    except ImportError:
        # Optional dependency: stay quiet at normal log levels, but do not
        # discard the reason entirely.
        logger.debug("KG consolidation skipped: import failed", exc_info=True)
    except Exception as e:
        logger.exception("KG consolidation failed")
        # Guarded helper: a missing observability module must not escape from
        # inside this handler.
        _publish(
            "error",
            f"error={str(e)[:100]}",
            {"error": str(e)},
            "obs_kg_consolidation_err",
        )
@background_task(
    interval=86_400,  # 24 hours
    initial_delay=3600,
    requires_kg=True,
)
async def kg_decay_task(kg_manager: Any) -> None:
    """Run one relationship-decay pass over the knowledge graph (every 24h).

    Awaits ``kg_consolidation.decay_relationships(kg_manager)``, logs the
    number of removed relationships when it is non-zero, and schedules a
    fire-and-forget ``observability.publish_debug_event`` on the ``kg_decay``
    topic.  Unlike :func:`kg_consolidation_task` it is not given an LLM
    client.

    An ``ImportError`` raised anywhere in the main path is ignored.  Any other
    exception is logged with its traceback and reported as an ``error`` event.

    Registered through :func:`background_task`; per the module's conventions
    it is dispatched by :func:`build_scheduler` and is also registered
    explicitly by the consolidation service under ``"kg_decay"``.

    Args:
        kg_manager (Any): Knowledge-graph manager whose relationships decay.
    """
    import time

    started = time.monotonic()

    def _report(
        status: str,
        preview: str,
        payload: dict[str, Any],
        task_name: str,
    ) -> None:
        """Schedule a detached ``kg_decay`` debug event."""
        from observability import publish_debug_event

        asyncio.create_task(
            publish_debug_event(
                "kg_decay",
                "background_tasks",
                status=status,
                duration_ms=(time.monotonic() - started) * 1000,
                preview=preview,
                payload=payload,
            ),
            name=task_name,
        )

    try:
        from kg_consolidation import decay_relationships

        removed = await decay_relationships(kg_manager)
        if removed:
            logger.info("KG decay removed %d weak relationships", removed)
        _report(
            "ok",
            f"removed={removed}",
            {"removed": removed},
            "obs_kg_decay",
        )
    except ImportError:
        # Decay (or observability) module unavailable: nothing to do.
        pass
    except Exception as exc:
        logger.exception("KG relationship decay failed")
        reason = str(exc)
        _report(
            "error",
            f"error={reason[:100]}",
            {"error": reason},
            "obs_kg_decay_err",
        )
@background_task(
    interval=10,  # restart 10s after disconnect/crash
    initial_delay=5,
    name="journal_stream",
    requires_redis=True,
)
async def journal_stream_task(redis: Any) -> None:
    """Tail the systemd journal and republish each line to Redis Pub/Sub.

    Spawns ``journalctl -u stargazer -f -n 50 --output=json`` and forwards
    every stdout line (decoded as UTF-8 and stripped) to the
    ``stargazer:journal`` channel via ``redis.publish``.  Returns immediately
    when ``journalctl`` is not installed or the process has no stdout stream,
    and returns normally when the stream reaches EOF; the decorator interval
    is the restart cadence after any exit.

    On exit or cancellation the ``finally`` block terminates the subprocess,
    waits up to two seconds, and escalates to ``kill`` so no orphaned
    ``journalctl`` is left behind.  Cancellation is re-raised after cleanup.

    Registered through :func:`background_task`; per the module's conventions
    it is dispatched by :func:`build_scheduler` and also re-registered
    explicitly by the agents service with the Redis client as its argument.

    Args:
        redis (Any): Async Redis client used for the Pub/Sub publish.
    """
    proc = None

    async def _kill_and_reap() -> None:
        """Force-kill the subprocess and wait for it, ignoring failures."""
        try:
            proc.kill()
            await asyncio.shield(proc.wait())
        except Exception:
            pass

    try:
        try:
            proc = await asyncio.create_subprocess_exec(
                "journalctl",
                "-u",
                "stargazer",
                "-f",
                "-n",
                "50",
                "--output=json",
                stdout=asyncio.subprocess.PIPE,
                # stderr is never read by this task.  A PIPE that nobody
                # drains can fill up and block the child, so discard it.
                stderr=asyncio.subprocess.DEVNULL,
            )
        except FileNotFoundError:
            # journalctl is not installed on this host.
            return

        if not proc.stdout:
            return

        while True:
            line = await proc.stdout.readline()
            if not line:
                # EOF: journalctl exited or closed its stdout.
                break
            try:
                await redis.publish(
                    "stargazer:journal",
                    line.decode("utf-8").strip(),
                )
            except Exception:
                # Best-effort: a failed publish (or an undecodable line) drops
                # that one line instead of ending the stream.
                # NOTE(review): presumably left unlogged because this
                # process's own log output may land in the journal being
                # tailed, which would feed back into this loop - confirm.
                pass
    finally:
        if proc is not None and proc.returncode is None:
            try:
                proc.terminate()
                await asyncio.shield(
                    asyncio.wait_for(proc.wait(), timeout=2.0)
                )
            except asyncio.CancelledError:
                # Cancelled while waiting for a graceful exit: make sure the
                # child is gone, then let the cancellation propagate.
                await _kill_and_reap()
                raise
            except Exception:
                # Timed out (or terminate/wait failed): escalate to SIGKILL.
                await _kill_and_reap()
@background_task(
    interval=21_600,  # 6 hours
    initial_delay=30,
    name="log_rag_ingest",
    opt_in_config_flag="background_scheduler_log_rag_ingest_enabled",
)
async def log_rag_ingest_task() -> None:
    """Run one log-RAG ingest tick (every 6h, opt-in).

    Thin wrapper that awaits ``log_rag_ingest.ingest_logs_tick()``.  The task
    is only wired up when the
    ``background_scheduler_log_rag_ingest_enabled`` config flag is set (the
    module docstring notes it defaults to off).

    An ``ImportError`` raised anywhere in the tick is ignored; any other
    exception is logged with its traceback and swallowed so the scheduler
    keeps running.

    Takes no arguments.
    """
    try:
        from log_rag_ingest import ingest_logs_tick as run_ingest_tick

        await run_ingest_tick()
    except ImportError:
        # Ingest module (or one of its imports) is unavailable: skip this run.
        return
    except Exception:
        logger.exception("Log RAG ingest failed")
@background_task(
    interval=7_200,  # 2 hours
    initial_delay=60,
    name="gemini_key_probe",
)
async def gemini_key_probe_task() -> None:
    """Probe every key in the shared Gemini embedding pool (every 2h).

    Thin wrapper that awaits ``gemini_embed_pool.probe_all_keys()``; per the
    original documentation this lets the pool detect quota-exhausted keys and
    route around them.

    An ``ImportError`` raised anywhere in the probe is ignored; any other
    exception is logged with its traceback and swallowed so the scheduler
    keeps running.

    Takes no arguments.
    """
    try:
        from gemini_embed_pool import probe_all_keys as run_key_probe

        await run_key_probe()
    except ImportError:
        # Pool module (or one of its imports) is unavailable: skip this run.
        return
    except Exception:
        logger.exception("Gemini key probe failed")
@background_task(
    interval=14_400,  # 4 hours
    initial_delay=900,
    name="agentic_kg_bulk_incremental",
    requires_redis=True,
    requires_kg=True,
    requires_openrouter=True,
    requires_chat_llm=True,
)
async def agentic_kg_bulk_incremental_task(
    redis: Any,
    kg_manager: Any,
    openrouter: Any,
) -> None:
    """Run incremental agentic bulk KG extraction over active channels (every 4h).

    Steps, each of which may short-circuit with a debug-observability event
    on the ``kg_bulk_incremental`` topic:

    1. Load ``Config``; skip when ``redis_url`` or ``api_key`` is unset.
    2. Fetch up to 10 active channels via ``message_cache.get_active_channels``;
       skip when there are none.
    3. Collect new messages with
       ``kg_bulk_runner.collect_messages_from_redis(incremental=True, ...)``;
       report ``no_new_messages`` when nothing is returned.
    4. Create ``var/kg_bulk_scheduled/<UTC timestamp>`` next to this module and
       await ``kg_bulk_runner.run_agentic_bulk_pipeline`` with it as the
       output directory.

    An ``ImportError`` is ignored.  Any other exception is logged with its
    traceback and reported as an ``error`` event that, like the other events
    of this task, carries the elapsed ``duration_ms``.

    Registered through :func:`background_task`; per the module's conventions
    it is dispatched by :func:`build_scheduler` and also re-registered
    explicitly by the agents service.

    Args:
        redis (Any): Async Redis client (message and cursor source).
        kg_manager (Any): Knowledge-graph manager (accepted for parity, unused).
        openrouter (Any): OpenRouter client (accepted for parity, unused).
    """
    import time

    # Taken before the ``try`` so the error handler can always report a
    # duration, even if a failure happens during the lazy imports below.
    t0 = time.monotonic()

    try:
        from datetime import datetime, timezone
        from pathlib import Path

        from config import Config
        from kg_bulk_runner import (
            KgBulkPipelineParams,
            collect_messages_from_redis,
            resolve_bulk_backend,
            run_agentic_bulk_pipeline,
        )
        from message_cache import get_active_channels

        _ = kg_manager, openrouter  # scheduler parity with other KG tasks

        def _emit(
            status: str,
            msg_count: int = 0,
            n_zsets_val: int = 0,
            channels_proc: int = 0,
            out_dir_val: str = "",
            error_str: str = "",
        ) -> None:
            """Publish a ``kg_bulk_incremental`` debug-observability event.

            Schedules ``observability.publish_debug_event`` as a detached
            asyncio task, computing ``duration_ms`` from the enclosing task's
            start time.

            Args:
                status (str): Outcome marker (``"skipped"``,
                    ``"no_new_messages"`` or ``"ok"`` in this task).
                msg_count (int): Number of messages processed this run.
                n_zsets_val (int): Second value returned by
                    ``collect_messages_from_redis``.
                channels_proc (int): Number of channels considered.
                out_dir_val (str): Output directory of this run's dump.
                error_str (str): Optional error string for the payload.
            """
            from observability import publish_debug_event

            asyncio.create_task(
                publish_debug_event(
                    "kg_bulk_incremental",
                    "background_tasks",
                    phase="pipeline",
                    status=status,
                    duration_ms=(time.monotonic() - t0) * 1000,
                    preview=f"channels={channels_proc} messages={msg_count} out_dir={out_dir_val}",
                    payload={
                        "channels_processed": channels_proc,
                        "messages_processed": msg_count,
                        "n_zsets": n_zsets_val,
                        "out_dir": str(out_dir_val),
                        "error": error_str,
                    },
                ),
                name="obs_kg_bulk_incremental",
            )

        cfg = Config.load()
        if not cfg.redis_url or not cfg.api_key:
            logger.debug(
                "agentic_kg_bulk_incremental: skip (redis or api_key missing)",
            )
            _emit("skipped")
            return

        channels = await get_active_channels(redis, limit=10)
        if not channels:
            logger.info("agentic_kg_bulk_incremental: no active channels")
            _emit("skipped")
            return

        messages, n_zsets = await collect_messages_from_redis(
            redis,
            incremental=True,
            cursor_bootstrap="latest",
            channel_pairs=channels,
        )
        if not messages:
            logger.info(
                "agentic_kg_bulk_incremental: no new messages for %d channel(s)",
                len(channels),
            )
            _emit("no_new_messages", channels_proc=len(channels))
            return

        # One timestamped dump directory per run, next to this module.
        repo_root = Path(__file__).resolve().parent
        out_root = repo_root / "var" / "kg_bulk_scheduled"
        out_dir = out_root / datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        # ``parents=True`` also creates ``out_root`` when it is missing.
        out_dir.mkdir(parents=True, exist_ok=True)

        params = KgBulkPipelineParams(
            out_dir=out_dir,
            dump_only=False,
            dry_run_chunks=False,
            dry_run_llm=False,
            per_channel=True,
            incremental=True,
            bulk_llm_backend=resolve_bulk_backend(None),
            fetch_channel_metadata=False,
        )
        await run_agentic_bulk_pipeline(
            cfg,
            redis,
            messages,
            n_zsets,
            params,
            load_channel_metadata=None,
        )
        logger.info(
            "agentic_kg_bulk_incremental: processed %d message(s) -> %s",
            len(messages),
            out_dir,
        )
        _emit(
            "ok",
            msg_count=len(messages),
            n_zsets_val=n_zsets,
            channels_proc=len(channels),
            out_dir_val=str(out_dir),
        )
    except ImportError:
        # A required project module is unavailable: skip this run.
        pass
    except Exception as e:
        logger.exception("agentic_kg_bulk_incremental failed")
        from observability import publish_debug_event

        asyncio.create_task(
            publish_debug_event(
                "kg_bulk_incremental",
                "background_tasks",
                phase="pipeline",
                status="error",
                duration_ms=(time.monotonic() - t0) * 1000,
                preview=f"error={str(e)[:100]}",
                payload={"error": str(e)},
            ),
            name="obs_kg_bulk_incremental_err",
        )
@background_task(
    interval=1200,  # 20 minutes
    initial_delay=300,  # 5 minutes after boot
    name="anamnesis_digest",
    requires_redis=True,
    requires_kg=True,
    requires_openrouter=True,
    requires_chat_llm=True,
    requires_config=True,
)
async def anamnesis_digest_task(
    redis: Any,
    kg_manager: Any,
    openrouter: Any,
    config: Any,
) -> None:
    """Run one anamnesis cycle and log its progress (every 20 minutes).

    Awaits ``anamnesis_engine.run_anamnesis_cycle`` with all four
    dependencies.  Per the original documentation, a cycle advances a cursor
    through the Spiral Goddess RAG corpus and folds fresh fragments into the
    knowledge graph.  When the cycle reports any added entities or
    relationships, the total and the cursor progress fields are logged at
    info level.

    An ``ImportError`` raised anywhere in the cycle is ignored; any other
    exception is logged with its traceback and swallowed.

    Registered through :func:`background_task`; per the module's conventions
    it is dispatched by :func:`build_scheduler` and also re-registered
    explicitly by the agents service.

    Args:
        redis (Any): Async Redis client handed to the cycle.
        kg_manager (Any): Knowledge-graph manager handed to the cycle.
        openrouter (Any): OpenRouter client handed to the cycle.
        config (Any): Config object handed to the cycle.
    """
    try:
        from anamnesis_engine import run_anamnesis_cycle

        outcome = await run_anamnesis_cycle(
            redis=redis,
            kg_manager=kg_manager,
            openrouter=openrouter,
            config=config,
        )
        entities_added = outcome.get("entities_added", 0)
        relationships_added = outcome.get("relationships_added", 0)
        added = entities_added + relationships_added
        if not added:
            # Nothing new was absorbed this cycle: stay quiet.
            return
        logger.info(
            "Anamnesis digest: +%d entities/rels (cursor %d/%d, %.1f%%)",
            added,
            outcome.get("cursor", 0),
            outcome.get("total_store", 0),
            outcome.get("progress_pct", 0),
        )
    except ImportError:
        return
    except Exception:
        logger.exception("Anamnesis digest failed")
async def startup_channel_backfill(
    redis: Any,
    message_cache: Any,
    embedding_queue: Any,
    adapters: list[Any],
    max_channels: int = 10,
) -> dict[str, int]:
    """Backfill recent history for the most active channels at startup.

    For each of the top *max_channels* most-recently-active channels, fetches
    history from the appropriate platform adapter, deduplicates against
    Redis, caches new messages, and enqueues embeddings.

    Every per-channel step is best-effort: a failure is logged at debug level
    and the backfill moves on, so one broken channel or adapter cannot abort
    the whole pass.

    Args:
        redis (Any): Async Redis client used to look up active channels.
        message_cache (Any): Message cache used to record channel metadata,
            look up already-cached messages, and log new ones.
        embedding_queue (Any): Queue receiving ``(message_key, text)`` pairs
            to embed; ``None`` disables enqueueing.
        adapters (list[Any]): Platform adapters to fetch history from.
        max_channels (int): Maximum number of channels to backfill.

    Returns:
        dict[str, int]: Summary with ``channels_processed`` and
        ``messages_cached``.
    """
    # 0. Give adapters that are still mid-connect a brief window to become
    # ready before we catalog channels. Platform adapters (notably Discord,
    # whose gateway READY can land just after start() returns) report
    # is_running=False until fully connected; without this wait the backfill
    # would silently skip them. Bounded so a permanently-down platform cannot
    # stall the backfill indefinitely.
    if adapters:
        loop = asyncio.get_running_loop()
        readiness_deadline = loop.time() + 30.0
        while loop.time() < readiness_deadline and any(
            not getattr(a, "is_running", False) for a in adapters
        ):
            await asyncio.sleep(0.5)
        not_ready = [
            getattr(a, "name", "?")
            for a in adapters
            if not getattr(a, "is_running", False)
        ]
        if not_ready:
            logger.info(
                "Startup backfill: proceeding without fully-ready adapters %s "
                "(they will backfill on the next pass once connected)",
                not_ready,
            )

    # 1. Catalog online channels and their metadata from active running adapters
    adapter_channels: dict[tuple[str, str], dict[str, Any]] = {}
    for adapter in adapters:
        if not getattr(adapter, "is_running", False):
            continue
        try:
            servers = await adapter.list_servers_and_channels()
            platform_name = adapter.name
            for server in servers:
                server_id = server.get("server_id", "")
                server_name = server.get("server_name", "")
                member_count = server.get("member_count")
                nested_channels = server.get("channels")
                if nested_channels:
                    # Discord-style nested channels
                    for ch in nested_channels:
                        ch_id = ch.get("channel_id")
                        if ch_id:
                            ch_type = ch.get("type", "text")
                            if ch_type == "text":
                                adapter_channels[(platform_name, ch_id)] = {
                                    "channel_name": ch.get("channel_name", ""),
                                    "guild_id": server_id,
                                    "guild_name": server_name,
                                    "is_dm": False,
                                }
                else:
                    # Matrix-style flat room structure
                    if server_id:
                        is_dm = (
                            platform_name == "matrix"
                            and member_count is not None
                            and member_count <= 2
                        )
                        adapter_channels[(platform_name, server_id)] = {
                            "channel_name": server_name or "",
                            "guild_id": "",
                            "guild_name": "",
                            "is_dm": is_dm,
                        }
        except Exception:
            # getattr: the failure above may itself have been a missing
            # ``name`` attribute, so do not re-raise from the handler.
            logger.exception(
                "Startup backfill: failed to catalog channels for %s",
                getattr(adapter, "name", "?"),
            )

    from message_cache import get_active_channels

    channels = await get_active_channels(redis, limit=max_channels)
    if not channels:
        logger.info("Startup backfill: no active channels found in Redis, performing platform fallback discovery")
        channels = list(adapter_channels.keys())[:max_channels]
    if not channels:
        logger.info("Startup backfill: no channels discovered from Redis or platform adapters")
        return {"channels_processed": 0, "messages_cached": 0}

    adapter_map: dict[str, Any] = {adapter.name: adapter for adapter in adapters}

    total_cached = 0
    channels_done = 0
    total_embeddings_enqueued = 0

    for platform_name, channel_id in channels:
        adapter = adapter_map.get(platform_name)
        # Same tolerant readiness check as the catalog phase above, so an
        # adapter without ``is_running`` is skipped instead of raising.
        if adapter is None or not getattr(adapter, "is_running", False):
            continue

        # Record channel metadata to Redis to prevent fail-closed recall failures
        meta = adapter_channels.get((platform_name, channel_id))
        if meta:
            try:
                await message_cache.record_channel_metadata(
                    platform=platform_name,
                    channel_id=channel_id,
                    channel_name=meta["channel_name"],
                    guild_id=meta["guild_id"],
                    guild_name=meta["guild_name"],
                    is_dm=meta["is_dm"],
                )
            except Exception:
                logger.debug(
                    "Startup backfill: failed to record channel metadata for %s:%s",
                    platform_name,
                    channel_id,
                    exc_info=True,
                )

        # Resolve the bot's own user id (when the adapter exposes an
        # identity) so its messages are tagged as assistant output below.
        bot_uid = ""
        if hasattr(adapter, "bot_identity"):
            from message_processor.user_message_format import resolve_bot_speaker

            _, bot_uid = resolve_bot_speaker(adapter, None)

        try:
            history = await adapter.fetch_history(channel_id, limit=100)
        except Exception:
            logger.debug(
                "Startup backfill: fetch_history failed for %s:%s",
                platform_name,
                channel_id,
                exc_info=True,
            )
            continue

        if not history:
            continue

        # Map platform message ids to the Redis keys of already-cached copies.
        all_message_ids = [hm.message_id for hm in history if hm.message_id]
        existing_lookup: dict[str, str] = {}
        existing_text_by_key: dict[str, str] = {}
        if all_message_ids:
            try:
                existing_lookup = await message_cache.find_keys_by_message_ids(
                    platform_name,
                    channel_id,
                    all_message_ids,
                )
            except Exception:
                logger.debug(
                    "Startup backfill: message ID lookup failed for %s:%s",
                    platform_name,
                    channel_id,
                    exc_info=True,
                )

        if existing_lookup:
            try:
                # One pipelined round-trip for the stored text of every
                # already-cached message (keys de-duplicated, order kept).
                keys = list(dict.fromkeys(existing_lookup.values()))
                pipe = message_cache.redis_client.pipeline()
                for key in keys:
                    pipe.hget(key, "text")
                values = await pipe.execute()
                existing_text_by_key = {
                    key: _decode_redis_text(value)
                    for key, value in zip(keys, values)
                }
            except Exception:
                logger.debug(
                    "Startup backfill: text lookup failed for %s:%s",
                    platform_name,
                    channel_id,
                    exc_info=True,
                )

        need_embed_check: list[tuple[str, str]] = []
        new_messages = []
        pending_embeddings: list[tuple[str, str]] = []

        for hm in history:
            existing_key = existing_lookup.get(hm.message_id) if hm.message_id else None
            if existing_key:
                # Already cached: repair the stored text when the platform
                # now reports something different, and re-embed it.
                if hm.text != existing_text_by_key.get(existing_key, ""):
                    try:
                        await message_cache.redis_client.hset(
                            existing_key,
                            "text",
                            hm.text,
                        )
                        if hm.text:
                            pending_embeddings.append((existing_key, hm.text))
                    except Exception:
                        logger.debug(
                            "Startup backfill: failed to repair rich text "
                            "for message %s",
                            hm.message_id,
                            exc_info=True,
                        )
                if hm.text:
                    need_embed_check.append((existing_key, hm.text))
            else:
                new_messages.append(hm)

        if need_embed_check:
            try:
                check_keys = [k for k, _ in need_embed_check]
                has_emb = await message_cache.has_real_embedding_many(
                    check_keys,
                )
                for (key, text), has in zip(need_embed_check, has_emb):
                    if not has:
                        pending_embeddings.append((key, text))
            except Exception:
                # Cannot tell which messages have embeddings: queue them all.
                logger.debug(
                    "Startup backfill: embedding check failed for %s:%s",
                    platform_name,
                    channel_id,
                    exc_info=True,
                )
                pending_embeddings.extend(need_embed_check)

        for hm in new_messages:
            try:
                cached_msg = await message_cache.log_message(
                    platform=platform_name,
                    channel_id=channel_id,
                    user_id=hm.user_id,
                    user_name=hm.user_name,
                    text=hm.text,
                    timestamp=hm.timestamp.timestamp(),
                    defer_embedding=True,
                    message_id=hm.message_id,
                    reply_to_id=hm.reply_to_id,
                    kind="assistant_out" if (hm.user_id == bot_uid if bot_uid else hm.is_bot) else "user_in",
                )
                if cached_msg.message_key and hm.text:
                    pending_embeddings.append(
                        (cached_msg.message_key, hm.text),
                    )
                total_cached += 1
            except Exception:
                logger.debug(
                    "Startup backfill: failed to cache message",
                    exc_info=True,
                )

        # A repaired message can be queued by both the repair step and the
        # embedding check; drop exact duplicates while preserving order so
        # each (key, text) pair is embedded, and counted, only once.
        pending_embeddings = list(dict.fromkeys(pending_embeddings))

        if pending_embeddings and embedding_queue is not None:
            try:
                await embedding_queue.enqueue_many(pending_embeddings)
                total_embeddings_enqueued += len(pending_embeddings)
            except Exception:
                logger.debug(
                    "Startup backfill: enqueue_many failed for %s:%s",
                    platform_name,
                    channel_id,
                    exc_info=True,
                )

        channels_done += 1

    logger.info(
        "Startup backfill complete: %d channels, %d new messages cached, "
        "%d embeddings enqueued",
        channels_done,
        total_cached,
        total_embeddings_enqueued,
    )
    return {"channels_processed": channels_done, "messages_cached": total_cached}
@background_task(
    interval=86_400.0,
    initial_delay=120,
    name="channel_heartbeat",
    requires_bot_runner=True,
    requires_chat_llm=True,
)
async def channel_heartbeat_task(bot_runner: Any) -> None:
    """Start the per-channel heartbeat loop for *bot_runner*.

    Awaits ``background_agents.channel_heartbeat.channel_heartbeat_loop`` and
    returns without doing anything when that module cannot be imported.
    Exceptions raised by the loop itself are not caught here.  Per the
    original documentation the loop paces its own per-channel timing with the
    cheap "flash" model, so the decorator interval is only a crash-restart
    cadence.

    Registered through :func:`background_task`; per the module's conventions
    it is dispatched by :func:`build_scheduler` and registered by the
    inference service.

    Args:
        bot_runner (Any): Service/runner object the heartbeat loop operates on.
    """
    try:
        from background_agents.channel_heartbeat import (
            channel_heartbeat_loop as run_heartbeat,
        )
    except ImportError:
        # Heartbeat agent is not installed: nothing to run.
        return
    else:
        await run_heartbeat(bot_runner)
@background_task(
    interval=3600,  # Dynamic interval overridden in build_scheduler
    initial_delay=600,
    name="starwiki_lint",
    requires_bot_runner=True,
    opt_in_config_flag="starwiki_enabled",
)
async def starwiki_lint_task(bot_runner: Any) -> None:
    """Run the scheduled StarWiki lint when a wiki service is attached.

    Reads ``bot_runner.starwiki_service`` and, when it is present, awaits
    ``starwiki.bookkeeper.scheduled_lint_public_and_opt_in`` with it.  The
    bookkeeper module is only imported once a service has been found.  The
    task is gated on the ``starwiki_enabled`` opt-in flag.

    Registered through :func:`background_task`; per the module's conventions
    its interval is overridden in :func:`build_scheduler` and it is also
    re-registered explicitly by the agents service.

    Args:
        bot_runner (Any): Service/runner object exposing ``starwiki_service``.
    """
    wiki_service = getattr(bot_runner, "starwiki_service", None)
    if wiki_service is not None:
        from starwiki.bookkeeper import scheduled_lint_public_and_opt_in

        await scheduled_lint_public_and_opt_in(wiki_service)
@background_task(
    interval=60,  # Resilience restart interval if loop crashes
    initial_delay=15,
    name="limbic_anchoring_worker",
    requires_bot_runner=True,
    requires_chat_llm=True,
)
async def limbic_anchoring_task(bot_runner: Any) -> None:
    """Run the persistent real-time Limbic knowledge-anchoring loop.

    Builds an ``AnchoringWorker`` wired to the message-cache Redis client, the
    ``kg_manager``, the embedding queue and a local ``extraction_llm_call``
    closure, then awaits ``worker.anchoring_loop()``.  Returns immediately
    when ``cfg.ka_enabled`` is false.  The decorator interval is only a
    crash-restart cadence; the worker runs its own internal loop.

    Registered through :func:`background_task`; per the module's conventions
    it is dispatched by :func:`build_scheduler` and also re-registered
    explicitly by the agents service.

    Args:
        bot_runner (Any): Service/runner exposing ``cfg``, ``message_cache``,
            ``kg_manager``, and ``embedding_queue``.
    """
    from knowledge_anchoring.worker import AnchoringWorker

    cfg = bot_runner.cfg
    if not cfg.ka_enabled:
        return

    # F9-1: Channel discovery is 100% dynamic via Redis SCAN in _discover_channels().
    # The initial channel list is intentionally empty; the first _tick() call immediately
    # discovers active channels from the ZSET namespace without any static configuration.
    channels: list[tuple[str, str]] = []

    async def extraction_llm_call(
        prompt: str, response_format: dict | None = None
    ) -> str:
        """Call the extraction proxy's chat/completions endpoint once.

        Posts a single-user-message request to
        ``{cfg.llm_base_url}/chat/completions`` with
        ``cfg.kg_extraction_model``, a low temperature and an optional
        ``response_format``.  HTTP 429 is a soft failure that returns an
        empty string; everything else that goes wrong is logged and raised
        so the caller can retry.

        Args:
            prompt (str): User prompt sent as the sole chat message.
            response_format (dict | None): Optional structured-output spec.

        Returns:
            str: The model's content, or an empty string on a 429 rate limit.

        Raises:
            ValueError: If the proxy returns 200 with empty, missing or null
                content (including an empty or null ``choices`` list).
            Exception: Re-raises any underlying HTTP/transport failure.
        """
        import httpx

        url = f"{cfg.llm_base_url.rstrip('/')}/chat/completions"
        logger.info(
            "Knowledge Anchoring: starting extraction call with model=%s",
            cfg.kg_extraction_model,
        )
        payload = {
            "model": cfg.kg_extraction_model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.1,  # Lower temperature for extraction stability
            "max_tokens": 4096,
        }
        if response_format:
            payload["response_format"] = response_format

        async with httpx.AsyncClient(
            timeout=httpx.Timeout(cfg.ka_extraction_http_timeout_seconds, connect=10.0)
        ) as client:
            try:
                resp = await client.post(
                    url,
                    json=payload,
                    headers={"Content-Type": "application/json"},
                )
                if resp.status_code == 429:
                    logger.warning("Extraction proxy rate-limited (429)")
                    return ""
                resp.raise_for_status()
                data = resp.json()

                # Normalise every level with ``or`` so that JSON nulls and an
                # empty ``choices`` list collapse to "" and reach the
                # empty-content check below, instead of raising an unrelated
                # TypeError/IndexError while slicing or indexing.
                choices = data.get("choices") or [{}]
                message = (choices[0] or {}).get("message") or {}
                content = message.get("content") or ""

                # Log metadata for visibility
                resp_model = data.get("model", "unknown")
                usage = data.get("usage", {})
                logger.info(
                    "Extraction proxy success: model=%s, usage=%s, content=%r...",
                    resp_model,
                    usage,
                    content[:100],
                )

                if not content:
                    logger.warning(
                        "Extraction proxy returned 200 OK but empty content. Body: %s",
                        data,
                    )
                    raise ValueError("Empty content from extraction proxy")

                return content
            except Exception as e:
                logger.warning("Extraction proxy call failed: %s", e)
                raise

    worker = AnchoringWorker(
        config=cfg,
        redis=bot_runner.message_cache.redis_client,
        kg_manager=bot_runner.kg_manager,
        llm_call=extraction_llm_call,
        channels=channels,
        message_cache=bot_runner.message_cache,
        embedding_queue=bot_runner.embedding_queue,
    )
    await worker.anchoring_loop()
@background_task(
    interval=3600,  # Resilience restart interval if loop crashes
    initial_delay=3600,
    name="limbic_sunset_processor",
    requires_bot_runner=True,
    requires_chat_llm=True,
)
async def limbic_sunset_task(bot_runner: Any) -> None:
    """Run the Limbic "sunset" (epoch synthesis) loop for *bot_runner*.

    Awaits ``knowledge_anchoring.sunset_processor.sunset_loop`` wired to the
    message cache (its Redis client and raw handle), the ``kg_manager``, the
    config, and an LLM adapter that forwards prompts to
    ``bot_runner.openrouter.chat``.  Returns immediately when
    ``cfg.ka_enabled`` is false.  Per the original documentation the loop
    paces its own daily cycle, so the decorator interval is only a
    crash-restart cadence.

    Registered through :func:`background_task`; per the module's conventions
    it is dispatched by :func:`build_scheduler` and also re-registered
    explicitly by the agents service.

    Args:
        bot_runner (Any): Service/runner exposing ``cfg``, ``openrouter``,
            ``message_cache``, and ``kg_manager``.
    """
    from knowledge_anchoring.sunset_processor import sunset_loop

    cfg = bot_runner.cfg
    if not cfg.ka_enabled:
        return

    async def sunset_llm_call(prompt: str, response_format: dict | None = None) -> str:
        """Forward *prompt* to ``openrouter.chat`` using the extraction model.

        Args:
            prompt (str): User prompt sent as the sole chat message.
            response_format (dict | None): Optional structured-output spec.

        Returns:
            str: Whatever ``bot_runner.openrouter.chat`` returns.
        """
        chat_messages = [{"role": "user", "content": prompt}]
        reply = await bot_runner.openrouter.chat(
            messages=chat_messages,
            override_model=cfg.kg_extraction_model,
            response_format=response_format,
        )
        return reply

    cache = bot_runner.message_cache
    await sunset_loop(
        redis=cache.redis_client,
        redis_raw=cache._redis_raw,
        kg_manager=bot_runner.kg_manager,
        llm_call=sunset_llm_call,
        config=cfg,
        message_cache=cache,
    )
async def limbic_dedup_task(bot_runner: Any) -> None:
    """Run the persistent Limbic knowledge-graph deduplication loop.

    Awaits ``knowledge_anchoring.dedup_worker.dedup_loop`` with the runner's
    ``kg_manager``, its config, and an LLM adapter that forwards prompts to
    ``bot_runner.openrouter.chat``.  Returns immediately when the config's
    ``ka_dedup_enabled`` is false (it is treated as true when absent).

    This function carries no :func:`background_task` decorator; per the
    original documentation it is supervised by :class:`TaskSupervisor`
    (leader-elected) and registered by the agents service as
    ``"limbic_dedup_worker"``.

    Args:
        bot_runner (Any): Service/runner exposing ``cfg``, ``openrouter``, and
            ``kg_manager``.
    """
    from knowledge_anchoring.dedup_worker import dedup_loop

    cfg = bot_runner.cfg
    dedup_enabled = getattr(cfg, "ka_dedup_enabled", True)
    if not dedup_enabled:
        return

    async def dedup_llm_call(prompt: str, response_format: dict | None = None) -> str:
        """Forward *prompt* to ``openrouter.chat`` using the extraction model.

        Args:
            prompt (str): User prompt sent as the sole chat message.
            response_format (dict | None): Optional structured-output spec.

        Returns:
            str: Whatever ``bot_runner.openrouter.chat`` returns.
        """
        chat_messages = [{"role": "user", "content": prompt}]
        reply = await bot_runner.openrouter.chat(
            messages=chat_messages,
            override_model=cfg.kg_extraction_model,
            response_format=response_format,
        )
        return reply

    await dedup_loop(
        kg_manager=bot_runner.kg_manager,
        llm_call=dedup_llm_call,
        config=cfg,
    )
async def limbic_org_task(bot_runner: Any) -> None:
    """Run the persistent Limbic knowledge-graph organization loop.

    Long-lived worker that drives the knowledge-anchoring organization loop,
    restructuring high-degree nodes into hierarchies.  Returns immediately
    when ``cfg.ka_org_enabled`` is false (a missing attribute counts as
    enabled).  Supervised by :class:`TaskSupervisor` (leader-elected) rather
    than the interval scheduler.

    Interactions: reads ``bot_runner.cfg``; defines an ``org_llm_call``
    closure that forwards prompts to ``bot_runner.openrouter.chat`` with
    ``override_model=cfg.kg_extraction_model``; then runs
    ``knowledge_anchoring.org_worker.org_loop`` with
    ``bot_runner.kg_manager`` as the graph backend.

    Args:
        bot_runner (Any): Service/runner exposing ``cfg``, ``openrouter``,
            and ``kg_manager``.

    Returns:
        None: Either immediately (feature disabled) or whenever ``org_loop``
        itself returns.
    """
    logger.info("limbic_org_task: starting persistent Limbic knowledge-graph organization task supervisor")

    cfg = bot_runner.cfg
    if not getattr(cfg, "ka_org_enabled", True):
        logger.info("limbic_org_task: ka_org_enabled is false. Supervisor will exit.")
        return

    # Import only once the feature is known to be enabled, so a disabled
    # node never loads (or fails on) the organization worker module.
    from knowledge_anchoring.org_worker import org_loop

    # ``%s`` rather than ``%d``: the config values may not be ints, and a
    # formatting failure would drop the log line.
    # NOTE(review): the 600 / 1 fallbacks are only what gets logged when the
    # attributes are missing; confirm they match org_loop's own defaults.
    logger.info(
        "limbic_org_task: starting persistent org_loop (interval_seconds=%s, hub_degree_threshold=%s)",
        getattr(cfg, "ka_org_interval_seconds", 600),
        getattr(cfg, "ka_org_hub_degree_threshold", 1),
    )

    async def org_llm_call(
        prompt: str,
        response_format: dict | None = None,
    ) -> str:
        """Adapt a plain-string prompt to ``openrouter.chat`` for organization judging.

        Closure over the enclosing task's ``bot_runner`` and ``cfg`` that
        wraps the prompt as a single user message and forwards it to the
        OpenRouter client with ``override_model=cfg.kg_extraction_model``.

        Args:
            prompt (str): User prompt sent as the sole chat message.
            response_format (dict | None): Optional structured-output spec.

        Returns:
            str: The model's response content.
        """
        logger.info(
            "limbic_org_task: invoking OpenRouter chat for organization judge (model=%s)",
            cfg.kg_extraction_model,
        )
        return await bot_runner.openrouter.chat(
            messages=[{"role": "user", "content": prompt}],
            override_model=cfg.kg_extraction_model,
            response_format=response_format,
        )

    await org_loop(
        kg_manager=bot_runner.kg_manager,
        llm_call=org_llm_call,
        config=cfg,
    )
@background_task(
    interval=604_800,  # 1 week
    initial_delay=7200,  # 2 hours
    name="limbic_maintenance",
    requires_bot_runner=True,
)
async def limbic_maintenance_task(bot_runner: Any) -> None:
    """Weekly maintenance for Limbic Anchoring: garbage-collect orphans.

    F9-4 fix: GC does not require LLM.  It runs a pure Cypher query on
    FalkorDB.  This task is therefore NOT gated on chat_llm; it will run
    even when LLM-based anchoring tasks are disabled by the operator.

    F9-3 fix: Passes ``kg_manager.query`` as the ``graph_query`` callable,
    which is the standard convention for all KA module functions.

    The task is a no-op when ``bot_runner`` is falsy, when
    ``bot_runner.cfg.ka_enabled`` is falsy, or when the runner has no
    knowledge-graph manager.  The scheduler only guarantees that
    ``bot_runner`` itself is present, not its ``kg_manager``.

    Args:
        bot_runner (Any): Service/runner exposing ``cfg`` and ``kg_manager``.

    Returns:
        None
    """
    if not bot_runner or not bot_runner.cfg.ka_enabled:
        return

    kg_manager = getattr(bot_runner, "kg_manager", None)
    if kg_manager is None:
        # Skip with a log line rather than raising AttributeError on
        # ``None.query`` every week.
        logger.warning(
            "limbic_maintenance: kg_manager unavailable; skipping orphan garbage collection",
        )
        return

    # Import only when there is work to do, so disabled deployments never
    # load the GC module.
    from knowledge_anchoring.garbage_collect_orphans import garbage_collect_orphans

    # F9-3: kg_manager.query is the correct callable — signature: (cypher, params={})
    await garbage_collect_orphans(kg_manager.query, bot_runner.cfg)
def build_scheduler(
    redis: Any = None,
    kg_manager: Any = None,
    openrouter: Any = None,
    bot_runner: Any = None,
    *,
    config: Any | None = None,
) -> BackgroundScheduler:
    """Assemble a :class:`BackgroundScheduler` from the task registry.

    Walks ``_BACKGROUND_TASKS_REGISTRY`` and registers every task whose
    declared dependencies are available.  A task is skipped when:

    - a dependency it requires (Redis, KG manager, OpenRouter, bot runner)
      is ``None``;
    - it requires chat-LLM and
      ``config.background_scheduler_chat_llm_enabled`` is false;
    - it names an opt-in config flag that is not truthy on ``config`` (or,
      when ``config`` is ``None``, on ``bot_runner.cfg``).

    The ``starwiki_lint`` task's interval is overridden from
    ``config.starwiki_lint_interval_minutes`` (default 60), with a floor of
    300 seconds.

    Args:
        redis (Any): Redis client, or ``None`` if unavailable.
        kg_manager (Any): Knowledge-graph manager, or ``None``.
        openrouter (Any): OpenRouter client, or ``None``.
        bot_runner (Any): Bot runner/service object, or ``None``.
        config (Any | None): Config object used for chat-LLM gating, opt-in
            flags, and the lint interval override.

    Returns:
        BackgroundScheduler: Scheduler with all eligible tasks registered.
    """
    sched = BackgroundScheduler()

    # With no config object the chat-LLM tasks stay enabled.
    chat_llm_enabled = config is None or bool(
        getattr(config, "background_scheduler_chat_llm_enabled", True),
    )
    if not chat_llm_enabled:
        logger.info(
            "Background scheduler: chat/completion LLM tasks disabled "
            "(scheduled_prompt_tick, auto_kg_extraction, "
            "agentic_kg_bulk_incremental, channel_summarization, "
            "channel_heartbeat, kg_consolidation, anamnesis_digest)",
        )

    # (registry key, dependency object); the order here is also the order in
    # which dependencies are passed positionally to the task coroutine.
    dependencies = (
        ("requires_redis", redis),
        ("requires_kg", kg_manager),
        ("requires_openrouter", openrouter),
        ("requires_bot_runner", bot_runner),
    )

    registered_names = []
    for spec in _BACKGROUND_TASKS_REGISTRY:
        task_name = spec["name"]
        coro_func = spec["func"]
        period = spec["interval"]
        first_delay = spec["initial_delay"]

        # Skip tasks with an unavailable required dependency.
        if any(spec[key] and dep is None for key, dep in dependencies):
            continue

        # Skip chat/completions tasks when the operator turned them off.
        if spec["requires_chat_llm"] and not chat_llm_enabled:
            continue

        # Opt-in tasks need their flag truthy; ``config`` takes precedence
        # over ``bot_runner.cfg`` as the place to look it up.
        flag_name = spec["opt_in_config_flag"]
        if flag_name is not None:
            if config is not None:
                flag_source = config
            elif bot_runner is not None and hasattr(bot_runner, "cfg"):
                flag_source = bot_runner.cfg
            else:
                flag_source = None
            if flag_source is None or not getattr(flag_source, flag_name, False):
                continue

        # Custom logic for dynamic interval override (e.g. starwiki_lint).
        if task_name == "starwiki_lint" and config is not None:
            lint_minutes = float(
                getattr(config, "starwiki_lint_interval_minutes", 60),
            )
            period = max(300.0, lint_minutes * 60.0)

        # Positional args: each required dependency, then config if asked for.
        call_args = [dep for key, dep in dependencies if spec[key]]
        if spec["requires_config"]:
            call_args.append(config)

        sched.register(
            task_name,
            coro_func,
            interval=period,
            initial_delay=first_delay,
            args=tuple(call_args),
        )
        registered_names.append(task_name)

    logger.info(
        "Successfully scheduled %d dynamic background tasks: %s.",
        len(registered_names),
        ", ".join(registered_names),
    )
    return sched