Source code for background_tasks

"""Asyncio-based background task scheduler.

Manages periodic background tasks that run alongside the bot:
- Scheduled prompt tick / cleanup
- Auto memory extraction
- Periodic maintenance
"""

from __future__ import annotations

import asyncio
import logging
from typing import Any, Callable, Awaitable

logger = logging.getLogger(__name__)


class BackgroundScheduler:
    """Run named periodic tasks in the background.

    Each task is an async callable that is invoked repeatedly with a
    configurable interval.  Tasks are resilient to individual failures
    and log errors without crashing the scheduler.

    Usage::

        sched = BackgroundScheduler()
        sched.register("my_task", my_coro, interval=300)
        await sched.start()  # non-blocking, spawns tasks
        ...
        await sched.stop()
    """

    def __init__(self) -> None:
        """Initialize empty registries for registered and running tasks."""
        self._tasks: dict[str, _RegisteredTask] = {}
        self._running: dict[str, asyncio.Task] = {}
        self._started = False

    def register(
        self,
        name: str,
        func: Callable[..., Awaitable[Any]],
        interval: float,
        initial_delay: float = 0.0,
        args: tuple = (),
        kwargs: dict | None = None,
    ) -> None:
        """Register a periodic task.

        Parameters
        ----------
        name:
            Unique identifier for the task.
        func:
            Async callable to invoke each cycle.
        interval:
            Seconds between invocations, measured from the end of the
            previous run, so a slow task never overlaps itself.
        initial_delay:
            Seconds to wait before the first invocation.
        args, kwargs:
            Positional and keyword arguments passed to *func* each cycle.
        """
        self._tasks[name] = _RegisteredTask(
            name=name,
            func=func,
            interval=interval,
            initial_delay=initial_delay,
            args=args,
            kwargs=kwargs or {},
        )

    async def start(self) -> None:
        """Start all registered tasks."""
        if self._started:
            return
        self._started = True
        for name, task in self._tasks.items():
            self._running[name] = asyncio.create_task(
                self._loop(task),
                name=f"bg:{name}",
            )
        logger.info(
            "Background scheduler started %d task(s)",
            len(self._running),
        )

    async def stop(self) -> None:
        """Cancel all running tasks and wait for them to finish."""
        self._started = False
        for task in self._running.values():
            task.cancel()
        await asyncio.gather(
            *self._running.values(),
            return_exceptions=True,
        )
        self._running.clear()
        logger.info("Background scheduler stopped")

    # ------------------------------------------------------------------
    @staticmethod
    async def _loop(task: _RegisteredTask) -> None:
        """Run *task* forever: invoke it, log failures, sleep, repeat.

        Cancellation is re-raised so :meth:`stop` can tear the loop
        down; any other exception is logged and the loop continues.
        """
        if task.initial_delay > 0:
            await asyncio.sleep(task.initial_delay)
        while True:
            try:
                await task.func(*task.args, **task.kwargs)
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("Background task '%s' failed", task.name)
            await asyncio.sleep(task.interval)

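# A minimal self-contained demo of the scheduler (illustrative, not part of
# the module; the ``heartbeat`` coroutine is a made-up example task).  It
# shows the fixed-delay semantics: each cycle sleeps ``interval`` seconds
# *after* the previous run finishes, so slow tasks never overlap themselves:
#
#     import asyncio
#
#     async def heartbeat() -> None:
#         print("tick")
#
#     async def main() -> None:
#         sched = BackgroundScheduler()
#         sched.register("heartbeat", heartbeat, interval=5, initial_delay=1)
#         await sched.start()      # returns immediately; loop runs in background
#         await asyncio.sleep(12)  # heartbeat fires at roughly t=1, 6, 11
#         await sched.stop()       # cancels the loop and waits for it to exit
#
#     asyncio.run(main())
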
class _RegisteredTask:
    """Internal record describing one registered periodic task."""

    __slots__ = ("name", "func", "interval", "initial_delay", "args", "kwargs")

    def __init__(
        self,
        name: str,
        func: Callable[..., Awaitable[Any]],
        interval: float,
        initial_delay: float,
        args: tuple,
        kwargs: dict,
    ) -> None:
        self.name = name
        self.func = func
        self.interval = interval
        self.initial_delay = initial_delay
        self.args = args
        self.kwargs = kwargs


# ------------------------------------------------------------------
# Concrete task functions
# ------------------------------------------------------------------

async def scheduled_prompt_tick(redis: Any) -> None:
    """Check for due scheduled prompts and trigger them."""
    try:
        from tools.scheduled_prompt import tick_scheduled_prompts

        await tick_scheduled_prompts(redis)
    except ImportError:
        pass
    except Exception:
        logger.exception("Scheduled prompt tick failed")

async def scheduled_prompt_cleanup(redis: Any) -> None:
    """Remove old executed/cancelled scheduled prompts."""
    try:
        from tools.scheduled_prompt import cleanup_expired_prompts

        await cleanup_expired_prompts(redis, days_to_keep=7)
    except ImportError:
        pass
    except Exception:
        logger.exception("Scheduled prompt cleanup failed")

async def auto_kg_extraction(
    redis: Any,
    kg_manager: Any,
    openrouter: Any,
) -> None:
    """Extract knowledge graph entities from recent conversations."""
    try:
        from kg_extraction import run_batch_extraction

        result = await run_batch_extraction(
            redis=redis,
            kg_manager=kg_manager,
            openrouter=openrouter,
        )
        total = result.get("entities_added", 0) + result.get("relationships_added", 0)
        if total:
            logger.info("KG extraction added %d entities/relationships", total)
    except ImportError:
        pass
    except Exception:
        logger.exception("KG extraction failed")

async def channel_summarization(redis: Any) -> None:
    """Summarise the 10 most recently active channels."""
    try:
        from background_agents.channel_summarizer import summarise_all_active

        result = await summarise_all_active(redis=redis)
        n = result.get("channels_processed", 0)
        if n:
            logger.info("Channel summarization processed %d channel(s)", n)
    except ImportError:
        pass
    except Exception:
        logger.exception("Channel summarization failed")

async def kg_consolidation_task(
    kg_manager: Any,
    openrouter: Any,
) -> None:
    """Consolidate duplicate entities in the knowledge graph."""
    try:
        from kg_consolidation import consolidate_graph

        result = await consolidate_graph(
            kg_manager=kg_manager,
            openrouter=openrouter,
        )
        if result.get("merged") or result.get("pruned_entities"):
            logger.info(
                "KG consolidation: merged=%d pruned=%d",
                result.get("merged", 0),
                result.get("pruned_entities", 0),
            )
    except ImportError:
        pass
    except Exception:
        logger.exception("KG consolidation failed")

async def kg_decay_task(kg_manager: Any) -> None:
    """Apply relationship weight decay in the knowledge graph."""
    try:
        from kg_consolidation import decay_relationships

        removed = await decay_relationships(kg_manager)
        if removed:
            logger.info("KG decay removed %d weak relationships", removed)
    except ImportError:
        pass
    except Exception:
        logger.exception("KG relationship decay failed")

async def log_rag_ingest_task() -> None:
    """Ingest recent journalctl logs into the stargazer_logs RAG store."""
    try:
        from log_rag_ingest import ingest_logs_tick

        await ingest_logs_tick()
    except ImportError:
        pass
    except Exception:
        logger.exception("Log RAG ingest failed")

async def gemini_key_probe_task() -> None:
    """Probe Gemini API keys to detect daily quota exhaustion."""
    try:
        from gemini_embed_pool import probe_all_keys

        await probe_all_keys()
    except ImportError:
        pass
    except Exception:
        logger.exception("Gemini key probe failed")

async def agentic_kg_bulk_incremental_task(
    redis: Any,
    kg_manager: Any,
    openrouter: Any,
) -> None:
    """Run agentic bulk KG extraction over the 10 most recently active
    channels, resuming from per-channel incremental cursors."""
    try:
        from datetime import datetime, timezone
        from pathlib import Path

        from config import Config
        from kg_bulk_runner import (
            KgBulkPipelineParams,
            collect_messages_from_redis,
            resolve_bulk_backend,
            run_agentic_bulk_pipeline,
        )
        from message_cache import get_active_channels

        _ = kg_manager, openrouter  # scheduler parity with other KG tasks

        cfg = Config.load()
        if not cfg.redis_url or not cfg.api_key:
            logger.debug(
                "agentic_kg_bulk_incremental: skip (redis or api_key missing)",
            )
            return

        channels = await get_active_channels(redis, limit=10)
        if not channels:
            logger.info("agentic_kg_bulk_incremental: no active channels")
            return

        messages, n_zsets = await collect_messages_from_redis(
            redis,
            incremental=True,
            cursor_bootstrap="latest",
            channel_pairs=channels,
        )
        if not messages:
            logger.info(
                "agentic_kg_bulk_incremental: no new messages for %d channel(s)",
                len(channels),
            )
            return

        repo_root = Path(__file__).resolve().parent
        out_root = repo_root / "var" / "kg_bulk_scheduled"
        out_root.mkdir(parents=True, exist_ok=True)
        out_dir = out_root / datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        out_dir.mkdir(parents=True, exist_ok=True)

        params = KgBulkPipelineParams(
            out_dir=out_dir,
            dump_only=False,
            dry_run_chunks=False,
            dry_run_llm=False,
            per_channel=True,
            incremental=True,
            bulk_llm_backend=resolve_bulk_backend(None),
            fetch_channel_metadata=False,
        )
        await run_agentic_bulk_pipeline(
            cfg,
            redis,
            messages,
            n_zsets,
            params,
            load_channel_metadata=None,
        )
        logger.info(
            "agentic_kg_bulk_incremental: processed %d message(s) -> %s",
            len(messages),
            out_dir,
        )
    except ImportError:
        pass
    except Exception:
        logger.exception("agentic_kg_bulk_incremental failed")

async def anamnesis_digest_task(
    redis: Any,
    kg_manager: Any,
    openrouter: Any,
) -> None:
    """Digest Spiral Goddess RAG fragments into the Knowledge Graph."""
    try:
        from anamnesis_engine import run_anamnesis_cycle

        result = await run_anamnesis_cycle(
            redis=redis,
            kg_manager=kg_manager,
            openrouter=openrouter,
        )
        total = result.get("entities_added", 0) + result.get("relationships_added", 0)
        if total:
            logger.info(
                "Anamnesis digest: +%d entities/rels (cursor %d/%d, %.1f%%)",
                total,
                result.get("cursor", 0),
                result.get("total_store", 0),
                result.get("progress_pct", 0),
            )
    except ImportError:
        pass
    except Exception:
        logger.exception("Anamnesis digest failed")

async def startup_channel_backfill(
    redis: Any,
    message_cache: Any,
    embedding_queue: Any,
    adapters: list[Any],
    max_channels: int = 10,
) -> dict[str, int]:
    """Backfill recent history for the most active channels at startup.

    For each of the top *max_channels* most-recently-active channels,
    fetches history from the appropriate platform adapter, deduplicates
    against Redis, caches new messages, and enqueues embeddings.

    Returns a summary dict with ``channels_processed`` and
    ``messages_cached``.
    """
    from message_cache import get_active_channels

    channels = await get_active_channels(redis, limit=max_channels)
    if not channels:
        logger.info("Startup backfill: no active channels found")
        return {"channels_processed": 0, "messages_cached": 0}

    adapter_map: dict[str, Any] = {}
    for adapter in adapters:
        adapter_map[adapter.name] = adapter

    total_cached = 0
    total_embeddings = 0
    channels_done = 0
    for platform_name, channel_id in channels:
        adapter = adapter_map.get(platform_name)
        if adapter is None or not adapter.is_running:
            continue
        try:
            history = await adapter.fetch_history(channel_id, limit=100)
        except Exception:
            logger.debug(
                "Startup backfill: fetch_history failed for %s:%s",
                platform_name,
                channel_id,
                exc_info=True,
            )
            continue
        if not history:
            continue

        all_message_ids = [hm.message_id for hm in history if hm.message_id]
        existing_lookup: dict[str, str] = {}
        if all_message_ids:
            try:
                existing_lookup = await message_cache.find_keys_by_message_ids(
                    platform_name,
                    channel_id,
                    all_message_ids,
                )
            except Exception:
                logger.debug(
                    "Startup backfill: message ID lookup failed for %s:%s",
                    platform_name,
                    channel_id,
                    exc_info=True,
                )

        need_embed_check: list[tuple[str, str]] = []
        new_messages = []
        for hm in history:
            existing_key = (
                existing_lookup.get(hm.message_id) if hm.message_id else None
            )
            if existing_key:
                # Already cached; may still be missing a real embedding.
                if hm.text:
                    need_embed_check.append((existing_key, hm.text))
            else:
                new_messages.append(hm)

        pending_embeddings: list[tuple[str, str]] = []
        if need_embed_check:
            try:
                check_keys = [k for k, _ in need_embed_check]
                has_emb = await message_cache.has_real_embedding_many(check_keys)
                for (key, text), has in zip(need_embed_check, has_emb):
                    if not has:
                        pending_embeddings.append((key, text))
            except Exception:
                # If the check fails, err on the side of re-embedding.
                pending_embeddings.extend(need_embed_check)

        for hm in new_messages:
            try:
                cached_msg = await message_cache.log_message(
                    platform=platform_name,
                    channel_id=channel_id,
                    user_id="assistant" if hm.is_bot else hm.user_id,
                    user_name="assistant" if hm.is_bot else hm.user_name,
                    text=hm.text,
                    timestamp=hm.timestamp.timestamp(),
                    defer_embedding=True,
                    message_id=hm.message_id,
                    reply_to_id=hm.reply_to_id,
                )
                if cached_msg.message_key and hm.text:
                    pending_embeddings.append(
                        (cached_msg.message_key, hm.text),
                    )
                total_cached += 1
            except Exception:
                logger.debug(
                    "Startup backfill: failed to cache message",
                    exc_info=True,
                )

        if pending_embeddings and embedding_queue is not None:
            try:
                await embedding_queue.enqueue_many(pending_embeddings)
                total_embeddings += len(pending_embeddings)
            except Exception:
                logger.debug(
                    "Startup backfill: enqueue_many failed for %s:%s",
                    platform_name,
                    channel_id,
                    exc_info=True,
                )
        channels_done += 1

    logger.info(
        "Startup backfill complete: %d channels, %d new messages cached, "
        "%d embeddings enqueued",
        channels_done,
        total_cached,
        total_embeddings,
    )
    return {"channels_processed": channels_done, "messages_cached": total_cached}

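# Hedged usage sketch: the backfill is a one-shot coroutine, typically run
# once at boot before the scheduler starts.  ``redis_client``, ``cache``,
# ``embed_queue``, and ``bot_adapters`` are hypothetical names for the real
# objects; each adapter only needs ``.name``, ``.is_running``, and
# ``.fetch_history(channel_id, limit=...)`` as used above.
#
#     summary = await startup_channel_backfill(
#         redis_client, cache, embed_queue, bot_adapters, max_channels=10,
#     )
#     logger.info("Backfill cached %d message(s)", summary["messages_cached"])
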
def build_scheduler(
    redis: Any = None,
    kg_manager: Any = None,
    openrouter: Any = None,
) -> BackgroundScheduler:
    """Build a :class:`BackgroundScheduler` with the standard tasks.

    Only registers tasks whose dependencies are available.
    """
    sched = BackgroundScheduler()

    if redis is not None:
        sched.register(
            "scheduled_prompt_tick",
            scheduled_prompt_tick,
            interval=60,
            initial_delay=10,
            args=(redis,),
        )
        sched.register(
            "scheduled_prompt_cleanup",
            scheduled_prompt_cleanup,
            interval=172_800,  # 48 hours
            initial_delay=300,
            args=(redis,),
        )

    if redis is not None and kg_manager is not None and openrouter is not None:
        sched.register(
            "auto_kg_extraction",
            auto_kg_extraction,
            interval=14_400,  # 4 hours
            initial_delay=300,
            args=(redis, kg_manager, openrouter),
        )
        sched.register(
            "agentic_kg_bulk_incremental",
            agentic_kg_bulk_incremental_task,
            interval=14_400,  # 4 hours
            initial_delay=900,
            args=(redis, kg_manager, openrouter),
        )

    if redis is not None:
        sched.register(
            "channel_summarization",
            channel_summarization,
            interval=21_600,  # 6 hours
            initial_delay=600,
            args=(redis,),
        )

    if kg_manager is not None and openrouter is not None:
        sched.register(
            "kg_consolidation",
            kg_consolidation_task,
            interval=86_400,  # 24 hours
            initial_delay=1800,
            args=(kg_manager, openrouter),
        )
        sched.register(
            "kg_decay",
            kg_decay_task,
            interval=86_400,  # 24 hours
            initial_delay=3600,
            args=(kg_manager,),
        )

    # Log RAG ingestion -- no dependencies required
    sched.register(
        "log_rag_ingest",
        log_rag_ingest_task,
        interval=21_600,  # 6 hours
        initial_delay=30,
    )

    # Gemini API key daily quota probe -- no dependencies required
    sched.register(
        "gemini_key_probe",
        gemini_key_probe_task,
        interval=7_200,  # 2 hours
        initial_delay=60,
    )

    # Anamnesis -- Spiral Goddess -> Knowledge Graph consolidation
    if redis is not None and kg_manager is not None and openrouter is not None:
        sched.register(
            "anamnesis_digest",
            anamnesis_digest_task,
            interval=1200,  # 20 minutes
            initial_delay=300,  # 5 minutes after boot
            args=(redis, kg_manager, openrouter),
        )

    return sched

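# Hedged wiring sketch for bot startup/shutdown; ``redis_client``, ``kg``,
# ``router``, and ``run_bot`` stand in for the real dependencies and main
# loop.  ``build_scheduler`` silently skips any task whose dependencies
# are ``None``, so partial wiring is safe:
#
#     sched = build_scheduler(redis=redis_client, kg_manager=kg, openrouter=router)
#     await sched.start()  # non-blocking
#     try:
#         await run_bot()  # hypothetical main loop
#     finally:
#         await sched.stop()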