"""Asyncio-based background task scheduler.
Manages periodic background tasks that run alongside the bot:
- Scheduled prompt tick / cleanup
- Auto memory extraction
- Periodic maintenance
"""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any
logger = logging.getLogger(__name__)
class BackgroundScheduler:
"""Run named periodic tasks in the background.
Each task is an async callable that is invoked repeatedly with a
configurable interval. Tasks are resilient to individual failures
and log errors without crashing the scheduler.
Usage::
sched = BackgroundScheduler()
sched.register("my_task", my_coro, interval=300)
await sched.start() # non-blocking, spawns tasks
...
await sched.stop()
"""
    def __init__(self) -> None:
        """Create a scheduler with no registered or running tasks."""
self._tasks: dict[str, _RegisteredTask] = {}
self._running: dict[str, asyncio.Task] = {}
self._started = False
def register(
self,
name: str,
func: Callable[..., Awaitable[Any]],
interval: float,
initial_delay: float = 0.0,
args: tuple = (),
kwargs: dict | None = None,
) -> None:
"""Register a periodic task.
Parameters
----------
name:
Unique identifier for the task.
func:
Async callable to invoke each cycle.
interval:
Seconds between invocations (measured from end of previous run).
initial_delay:
Seconds to wait before the first invocation.
"""
self._tasks[name] = _RegisteredTask(
name=name,
func=func,
interval=interval,
initial_delay=initial_delay,
args=args,
kwargs=kwargs or {},
)
async def start(self) -> None:
"""Start all registered tasks."""
if self._started:
return
self._started = True
for name, task in self._tasks.items():
self._running[name] = asyncio.create_task(
self._loop(task), name=f"bg:{name}",
)
logger.info(
"Background scheduler started %d task(s)", len(self._running),
)
async def stop(self) -> None:
"""Cancel all running tasks and wait for them to finish."""
self._started = False
        for t in self._running.values():
            t.cancel()
        await asyncio.gather(*self._running.values(), return_exceptions=True)
self._running.clear()
logger.info("Background scheduler stopped")
# ------------------------------------------------------------------
@staticmethod
async def _loop(task: _RegisteredTask) -> None:
"""Internal helper: loop.
Args:
task (_RegisteredTask): The task value.
"""
if task.initial_delay > 0:
await asyncio.sleep(task.initial_delay)
while True:
try:
await task.func(*task.args, **task.kwargs)
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Background task '%s' failed", task.name)
await asyncio.sleep(task.interval)
class _RegisteredTask:
    """Internal record for one registered periodic task.

    Attributes:
        name: Unique task identifier.
        func: Async callable invoked each cycle.
        interval: Seconds between invocations.
        initial_delay: Seconds to wait before the first invocation.
        args: Positional arguments passed to ``func``.
        kwargs: Keyword arguments passed to ``func``.
    """
__slots__ = ("name", "func", "interval", "initial_delay", "args", "kwargs")
def __init__(
self,
name: str,
func: Callable[..., Awaitable[Any]],
interval: float,
initial_delay: float,
args: tuple,
kwargs: dict,
) -> None:
"""Initialize the instance.
Args:
name (str): Human-readable name.
func (Callable[..., Awaitable[Any]]): The func value.
interval (float): The interval value.
initial_delay (float): The initial delay value.
args (tuple): The args value.
kwargs (dict): Additional keyword arguments.
"""
self.name = name
self.func = func
self.interval = interval
self.initial_delay = initial_delay
self.args = args
self.kwargs = kwargs
# ------------------------------------------------------------------
# Concrete task functions
# ------------------------------------------------------------------
async def scheduled_prompt_tick(redis: Any) -> None:
"""Check for due scheduled prompts and trigger them."""
try:
from tools.scheduled_prompt import tick_scheduled_prompts
await tick_scheduled_prompts(redis)
except ImportError:
pass
except Exception:
logger.exception("Scheduled prompt tick failed")
async def scheduled_prompt_cleanup(redis: Any) -> None:
"""Remove old executed/cancelled scheduled prompts."""
try:
from tools.scheduled_prompt import cleanup_expired_prompts
await cleanup_expired_prompts(redis, days_to_keep=7)
except ImportError:
pass
except Exception:
logger.exception("Scheduled prompt cleanup failed")
async def channel_summarization(redis: Any) -> None:
"""Summarise the 10 most recently active channels."""
try:
from background_agents.channel_summarizer import summarise_all_active
result = await summarise_all_active(redis=redis)
n = result.get("channels_processed", 0)
if n:
logger.info("Channel summarization processed %d channel(s)", n)
except ImportError:
pass
except Exception:
logger.exception("Channel summarization failed")
async def kg_consolidation_task(
kg_manager: Any,
openrouter: Any,
) -> None:
"""Consolidate duplicate entities in the knowledge graph."""
try:
from kg_consolidation import consolidate_graph
result = await consolidate_graph(
kg_manager=kg_manager,
openrouter=openrouter,
)
if result.get("merged") or result.get("pruned_entities"):
logger.info(
"KG consolidation: merged=%d pruned=%d",
result.get("merged", 0), result.get("pruned_entities", 0),
)
except ImportError:
pass
except Exception:
logger.exception("KG consolidation failed")
async def kg_decay_task(kg_manager: Any) -> None:
"""Apply relationship weight decay in the knowledge graph."""
try:
from kg_consolidation import decay_relationships
removed = await decay_relationships(kg_manager)
if removed:
logger.info("KG decay removed %d weak relationships", removed)
except ImportError:
pass
except Exception:
logger.exception("KG relationship decay failed")
async def log_rag_ingest_task() -> None:
"""Ingest recent journalctl logs into the stargazer_logs RAG store."""
try:
from log_rag_ingest import ingest_logs_tick
await ingest_logs_tick()
except ImportError:
pass
except Exception:
logger.exception("Log RAG ingest failed")
async def gemini_key_probe_task() -> None:
"""Probe Gemini API keys to detect daily quota exhaustion."""
try:
from gemini_embed_pool import probe_all_keys
await probe_all_keys()
except ImportError:
pass
except Exception:
logger.exception("Gemini key probe failed")
async def agentic_kg_bulk_incremental_task(
redis: Any,
kg_manager: Any,
openrouter: Any,
) -> None:
"""Agentic bulk KG extraction for the 10 MRU channels, incremental cursors."""
try:
from datetime import datetime, timezone
from pathlib import Path
from config import Config
from kg_bulk_runner import (
KgBulkPipelineParams,
collect_messages_from_redis,
resolve_bulk_backend,
run_agentic_bulk_pipeline,
)
from message_cache import get_active_channels
_ = kg_manager, openrouter # scheduler parity with other KG tasks
cfg = Config.load()
if not cfg.redis_url or not cfg.api_key:
logger.debug(
"agentic_kg_bulk_incremental: skip (redis or api_key missing)",
)
return
channels = await get_active_channels(redis, limit=10)
if not channels:
logger.info("agentic_kg_bulk_incremental: no active channels")
return
messages, n_zsets = await collect_messages_from_redis(
redis,
incremental=True,
cursor_bootstrap="latest",
channel_pairs=channels,
)
if not messages:
logger.info(
"agentic_kg_bulk_incremental: no new messages for %d channel(s)",
len(channels),
)
return
repo_root = Path(__file__).resolve().parent
out_root = repo_root / "var" / "kg_bulk_scheduled"
out_root.mkdir(parents=True, exist_ok=True)
out_dir = out_root / datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
out_dir.mkdir(parents=True, exist_ok=True)
params = KgBulkPipelineParams(
out_dir=out_dir,
dump_only=False,
dry_run_chunks=False,
dry_run_llm=False,
per_channel=True,
incremental=True,
bulk_llm_backend=resolve_bulk_backend(None),
fetch_channel_metadata=False,
)
await run_agentic_bulk_pipeline(
cfg, redis, messages, n_zsets, params,
load_channel_metadata=None,
)
logger.info(
"agentic_kg_bulk_incremental: processed %d message(s) -> %s",
len(messages),
out_dir,
)
except ImportError:
pass
except Exception:
logger.exception("agentic_kg_bulk_incremental failed")
async def anamnesis_digest_task(
redis: Any,
kg_manager: Any,
openrouter: Any,
) -> None:
"""Digest Spiral Goddess RAG fragments into the Knowledge Graph."""
try:
from anamnesis_engine import run_anamnesis_cycle
result = await run_anamnesis_cycle(
redis=redis,
kg_manager=kg_manager,
openrouter=openrouter,
)
total = result.get("entities_added", 0) + result.get("relationships_added", 0)
if total:
logger.info(
"Anamnesis digest: +%d entities/rels (cursor %d/%d, %.1f%%)",
total,
result.get("cursor", 0),
result.get("total_store", 0),
result.get("progress_pct", 0),
)
except ImportError:
pass
except Exception:
logger.exception("Anamnesis digest failed")
async def startup_channel_backfill(
redis: Any,
message_cache: Any,
embedding_queue: Any,
adapters: list[Any],
max_channels: int = 10,
) -> dict[str, int]:
"""Backfill recent history for the most active channels at startup.
For each of the top *max_channels* most-recently-active channels,
fetches history from the appropriate platform adapter, deduplicates
against Redis, caches new messages, and enqueues embeddings.
Returns a summary dict with ``channels_processed`` and
``messages_cached``.
"""
from message_cache import get_active_channels
channels = await get_active_channels(redis, limit=max_channels)
if not channels:
logger.info("Startup backfill: no active channels found")
return {"channels_processed": 0, "messages_cached": 0}
    adapter_map: dict[str, Any] = {adapter.name: adapter for adapter in adapters}
    total_cached = 0
    total_enqueued = 0
    channels_done = 0
for platform_name, channel_id in channels:
adapter = adapter_map.get(platform_name)
if adapter is None or not adapter.is_running:
continue
try:
history = await adapter.fetch_history(channel_id, limit=100)
except Exception:
logger.debug(
"Startup backfill: fetch_history failed for %s:%s",
platform_name, channel_id, exc_info=True,
)
continue
if not history:
continue
all_message_ids = [hm.message_id for hm in history if hm.message_id]
existing_lookup: dict[str, str] = {}
if all_message_ids:
try:
existing_lookup = (
await message_cache.find_keys_by_message_ids(
platform_name, channel_id, all_message_ids,
)
)
except Exception:
logger.debug(
"Startup backfill: message ID lookup failed for %s:%s",
platform_name, channel_id, exc_info=True,
)
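        # Partition the fetched history: messages already cached in Redis may
        # still need a real embedding, while the rest are new and get cached.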
need_embed_check: list[tuple[str, str]] = []
new_messages = []
for hm in history:
existing_key = (
existing_lookup.get(hm.message_id) if hm.message_id else None
)
if existing_key:
if hm.text:
need_embed_check.append((existing_key, hm.text))
else:
new_messages.append(hm)
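        # Among already-cached messages, enqueue embeddings only for those
        # that lack a real one; if the batch check fails, enqueue them all.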
pending_embeddings: list[tuple[str, str]] = []
if need_embed_check:
try:
check_keys = [k for k, _ in need_embed_check]
has_emb = await message_cache.has_real_embedding_many(
check_keys,
)
for (key, text), has in zip(need_embed_check, has_emb):
if not has:
pending_embeddings.append((key, text))
except Exception:
pending_embeddings.extend(need_embed_check)
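        # Cache genuinely new messages with embeddings deferred, collecting
        # message keys so all embeddings are enqueued in one batch below.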
for hm in new_messages:
try:
cached_msg = await message_cache.log_message(
platform=platform_name,
channel_id=channel_id,
user_id="assistant" if hm.is_bot else hm.user_id,
user_name="assistant" if hm.is_bot else hm.user_name,
text=hm.text,
timestamp=hm.timestamp.timestamp(),
defer_embedding=True,
message_id=hm.message_id,
reply_to_id=hm.reply_to_id,
)
if cached_msg.message_key and hm.text:
pending_embeddings.append(
(cached_msg.message_key, hm.text),
)
total_cached += 1
except Exception:
logger.debug(
"Startup backfill: failed to cache message",
exc_info=True,
)
        if pending_embeddings and embedding_queue is not None:
            try:
                await embedding_queue.enqueue_many(pending_embeddings)
                total_enqueued += len(pending_embeddings)
            except Exception:
                logger.debug(
                    "Startup backfill: enqueue_many failed for %s:%s",
                    platform_name, channel_id, exc_info=True,
                )
channels_done += 1
    logger.info(
        "Startup backfill complete: %d channels, %d new messages cached, "
        "%d embeddings enqueued",
        channels_done, total_cached, total_enqueued,
    )
return {"channels_processed": channels_done, "messages_cached": total_cached}
def build_scheduler(
redis: Any = None,
kg_manager: Any = None,
openrouter: Any = None,
) -> BackgroundScheduler:
"""Build a :class:`BackgroundScheduler` with the standard tasks.
Only registers tasks whose dependencies are available.
"""
sched = BackgroundScheduler()
if redis is not None:
sched.register(
"scheduled_prompt_tick",
scheduled_prompt_tick,
interval=60,
initial_delay=10,
args=(redis,),
)
sched.register(
"scheduled_prompt_cleanup",
scheduled_prompt_cleanup,
interval=172_800, # 48 hours
initial_delay=300,
args=(redis,),
)
if redis is not None and kg_manager is not None and openrouter is not None:
sched.register(
"auto_kg_extraction",
auto_kg_extraction,
interval=14_400, # 4 hours
initial_delay=300,
args=(redis, kg_manager, openrouter),
)
sched.register(
"agentic_kg_bulk_incremental",
agentic_kg_bulk_incremental_task,
interval=14_400, # 4 hours
initial_delay=900,
args=(redis, kg_manager, openrouter),
)
if redis is not None:
sched.register(
"channel_summarization",
channel_summarization,
interval=21_600, # 6 hours
initial_delay=600,
args=(redis,),
)
if kg_manager is not None and openrouter is not None:
sched.register(
"kg_consolidation",
kg_consolidation_task,
interval=86_400, # 24 hours
initial_delay=1800,
args=(kg_manager, openrouter),
)
sched.register(
"kg_decay",
kg_decay_task,
interval=86_400, # 24 hours
initial_delay=3600,
args=(kg_manager,),
)
# Log RAG ingestion -- no dependencies required
sched.register(
"log_rag_ingest",
log_rag_ingest_task,
interval=21_600, # 6 hours
initial_delay=30,
)
# Gemini API key daily quota probe -- no dependencies required
sched.register(
"gemini_key_probe",
gemini_key_probe_task,
interval=7_200, # 2 hours
initial_delay=60,
)
# Anamnesis -- Spiral Goddess -> Knowledge Graph consolidation
if redis is not None and kg_manager is not None and openrouter is not None:
sched.register(
"anamnesis_digest",
anamnesis_digest_task,
interval=1200, # 20 minutes
initial_delay=300, # 5 minutes after boot
args=(redis, kg_manager, openrouter),
)
return sched
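
# A minimal smoke-test sketch (stdlib only; the heartbeat coroutine is a
# stand-in, not a task from this codebase): register one fast task, let it
# tick a few times, then shut down cleanly.
if __name__ == "__main__":
    async def _demo() -> None:
        async def heartbeat() -> None:
            logger.info("tick")

        sched = BackgroundScheduler()
        sched.register("heartbeat", heartbeat, interval=1.0)
        await sched.start()
        await asyncio.sleep(3.5)
        await sched.stop()

    logging.basicConfig(level=logging.INFO)
    asyncio.run(_demo())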