background_tasks
Asyncio-based background task scheduler.
Manages periodic background tasks that run alongside the bot: - Scheduled prompt tick / cleanup - Auto memory extraction - Periodic maintenance
When Config.background_scheduler_chat_llm_enabled is false, periodic
tasks that call chat/completions are not registered (scheduled prompt tick,
auto KG extraction, agentic KG bulk incremental, channel summarization,
channel heartbeat, KG consolidation, anamnesis). Cleanup, KG decay, and
Gemini key probe still run. Log RAG embed ingest runs only when
Config.background_scheduler_log_rag_ingest_enabled is true (default off).
- background_tasks.background_task(interval, initial_delay=0.0, *, name=None, requires_redis=False, requires_kg=False, requires_openrouter=False, requires_bot_runner=False, requires_chat_llm=False, requires_config=False, opt_in_config_flag=None)[source]
Decorator that registers a coroutine as a periodic background task.
Captures the task’s scheduling metadata and dependency requirements at import time so that
build_scheduler()can later decide whether to wire the task up (based on which of Redis, the knowledge graph, OpenRouter, the bot runner, chat-LLM gating, and any opt-in config flag are available). The decorator itself does not run or wrap the coroutine; it only appends a metadata record.Interactions: its inner
decoratorclosure appends a dict to the module-level_BACKGROUND_TASKS_REGISTRYand logs the registration at debug level. No I/O, network, or Redis access happens here.Called by every
@background_task(...)application on the concrete task coroutines defined later in this module, and by the scheduler decorator tests intests/test_scheduler_decorator.py.- Parameters:
interval (
float) – Seconds between successive runs of the task.initial_delay (
float) – Seconds to wait before the first run.name (
str|None) – Explicit registry name; defaults to the coroutine’s__name__when omitted.requires_redis (
bool) – Whether the task needs a Redis client argument.requires_kg (
bool) – Whether the task needs the knowledge-graph manager.requires_openrouter (
bool) – Whether the task needs the OpenRouter client.requires_bot_runner (
bool) – Whether the task needs the bot-runner/service object.requires_chat_llm (
bool) – Whether the task is gated on the chat-LLM scheduler flag.requires_config (
bool) – Whether the task should be passed the config object.opt_in_config_flag (
str|None) – Name of a config attribute that must be truthy for the task to be scheduled, orNonefor always-on.
- Returns:
A decorator that records metadata for, and returns unchanged, the wrapped task coroutine.
- Return type:
Callable[[Callable[...,Awaitable[Any]]],Callable[...,Awaitable[Any]]]
- class background_tasks.BackgroundScheduler[source]
Bases:
objectRun named periodic tasks in the background.
Each task is an async callable that is invoked repeatedly with a configurable interval. Tasks are resilient to individual failures and log errors without crashing the scheduler.
Usage:
sched = BackgroundScheduler() sched.register("my_task", my_coro, interval=300) await sched.start() # non-blocking, spawns tasks ... await sched.stop()
- __init__()[source]
Initialize an empty scheduler with no registered or running tasks.
Sets up the in-memory registry of declared tasks and the map of live asyncio tasks, and marks the scheduler as not yet started. Pure in-process state; performs no I/O.
Called by the service entrypoints that own a scheduler instance (
agents_main.py,inference_main.py,consolidation_main.py), bybuild_scheduler(), and by the scheduler tests.- Return type:
None
- register(name, func, interval, initial_delay=0.0, args=(), kwargs=None)[source]
Register a periodic task.
- Parameters:
name (
str) – Unique identifier for the task.func (
Callable[...,Awaitable[Any]]) – Async callable to invoke each cycle.interval (
float) – Seconds between invocations (measured from end of previous run).initial_delay (
float) – Seconds to wait before the first invocation.args (tuple)
kwargs (dict | None)
- Return type:
- async start()[source]
Spawn one supervised loop per registered task (idempotent).
Marks the scheduler started and creates a long-lived asyncio task for each registered entry, each driven by
_loop()to run the task on its configured interval. Returns immediately without blocking; a second call is a no-op once started.Interactions: calls
asyncio.create_task(self._loop(task))for every entry inself._tasks, stores the handles inself._running, and logs the count started.Called by the agents, inference, and consolidation service startup flows (
await self.scheduler.start()) and by the observability/scheduler tests.- Return type:
- async stop()[source]
Cancel every running task loop and wait for them to settle.
Clears the started flag, cancels all live asyncio tasks, gathers them (swallowing the resulting exceptions), and empties the running map so the scheduler can be cleanly restarted or torn down.
Interactions: calls
cancel()on each handle inself._running, awaits them withasyncio.gather(..., return_exceptions=True), clears the dict, and logs the stop.Called by the agents, inference, and consolidation service shutdown paths (
await self.scheduler.stop()).- Return type:
- class background_tasks.TaskSupervisor(redis=None, node_id='')[source]
Bases:
objectLeader-elected supervisor for long-lived async worker loops.
Unlike
BackgroundScheduler(interval-based, every-node tasks), this supervises persistent worker coroutines that should run on exactly one node at a time. When given a Redis client it contends for a distributed leader lease and only spawns the registered workers on the elected leader, stopping them again on leadership loss; without Redis it runs standalone as the permanent leader. Each supervised worker is restarted with exponential backoff if it crashes.Interactions: uses Redis
SET NX EX/GET/EVALon the leader key for lease contention,asynciotasks for the heartbeat and each worker, and theobservabilitysingleton fortask_started/task_crashcounters.Constructed by the agents service in
agents_main.py(to supervise the Limbic dedup worker) and exercised by the leader-election and boot tests.- Parameters:
redis (Any)
node_id (str)
- __init__(redis=None, node_id='')[source]
Initialize the supervisor and its leader-election state.
Sets up empty task/factory registries and records the Redis client and node identity used for distributed leader election. The election key is read from the
SG_LEADER_KEYenvironment variable, defaulting tosg:leader:supervisor. WhenredisisNonethe supervisor runs standalone and treats itself as the permanent leader.Interactions reads
os.environ["SG_LEADER_KEY"]for the Redis lock key; storesredisfor laterget/set/evallease calls in_leader_heartbeat().Called by the agents service in
agents_main.py, which constructsTaskSupervisor(redis=redis_client, node_id=self.instance_id)during startup, and by the supervisor unit/integration tests.
- register_task(name, task_factory)[source]
Register a worker factory under a name for later supervision.
Records a zero-arg coroutine factory keyed by name; the supervisor will invoke it (and re-invoke it on crash) once this node holds leadership. Registering does not start anything — it only populates the factory map.
Interactions: writes
self._factories[name]. The matching worker is later spawned by_spawn_task()via_run_with_retry().Called by the agents service in
agents_main.py(registeringlimbic_dedup_worker) and by the leader-election and boot tests.
- async start()[source]
Start supervision, electing a leader when Redis is available.
Marks the supervisor running. When a Redis client is configured it launches the
_leader_heartbeat()background loop, which only spawns the registered workers once this node wins the lease. Without Redis it immediately assumes leadership and starts all tasks directly.Interactions sets
self._is_runningand either creates the_leader_heartbeatasyncio task or calls_start_all_tasks().Called by the agents service
startupflow inagents_main.py(await self.supervisor.start()) and by the boot/leader-election tests.- Return type:
- async verify_leadership_before_destructive()[source]
Re-assert leadership immediately before a destructive action.
Guards against split-brain by failing loudly unless this node both holds the in-memory leader flag and still owns the lease in Redis at call time. Intended to be called right before a worker performs an irreversible or cluster-wide operation, so a stale or lost lease aborts the action.
Interactions: reads
self._is_leaderand issuesredis.get(self._leader_key), comparing the decoded value againstself._node_id.No internal callers in this module; invoked by worker code ahead of destructive work and asserted by the leader-key config test in
tests/core/migration/test_leader_key_config.py.- Raises:
RuntimeError – If this node is not flagged leader, the lease has expired in Redis, or the lease is now owned by another node.
- Return type:
- async shutdown()[source]
Stop the heartbeat and all supervised tasks for a clean exit.
Clears the running flag (so loops exit), cancels and awaits the leader heartbeat task if present, then cancels and awaits every supervised worker. Leaves the supervisor in a non-leader, fully stopped state.
Interactions cancels
self._heartbeat_taskand gathers it, then calls_stop_all_tasks(); resetsself._is_leader. (Note: the Redis lease is left to expire via its TTL rather than being deleted here.)Called by the agents service
shutdownpath inagents_main.py(await self.supervisor.shutdown()) and by the supervisor tests.- Return type:
- async background_tasks.scheduled_prompt_tick(redis)[source]
Fire any user-scheduled prompts that have come due (every 60s).
Periodic task that delegates to the scheduled-prompt tool to find prompts whose fire time has passed and dispatch them, so reminders and recurring prompts actually go off. A missing optional dependency is silently ignored and any other failure is logged without crashing the scheduler.
Interactions: imports and awaits
tools.scheduled_prompt.tick_scheduled_prompts(redis), which reads and updates the scheduled-prompt state in Redis and enqueues the triggered prompts.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.pywithargs=(redis_client,).
- async background_tasks.scheduled_prompt_cleanup(redis)[source]
Garbage-collect stale executed/cancelled scheduled prompts (every 48h).
Periodic housekeeping task that prunes scheduled-prompt records older than a week so the store does not grow without bound. Unlike
scheduled_prompt_tick(), it does not invoke the chat LLM. Import errors are ignored and other failures are logged.Interactions: imports and awaits
tools.scheduled_prompt.cleanup_expired_prompts(redis, days_to_keep=7), which deletes expired prompt entries from Redis.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.pywithargs=(redis_client,).
- async background_tasks.auto_kg_extraction(redis, kg_manager, openrouter, config=None)[source]
Mine recent conversations for knowledge-graph entities (every 4h).
Periodic task that runs a batch extraction pass over recent messages, adding entities and relationships to the knowledge graph so the bot’s long-term memory keeps up with new conversations. The extraction model can be overridden via config. Import errors are ignored; other failures are logged.
Interactions: imports and awaits
kg_extraction.run_batch_extraction, passing the Redis client (message source), thekg_manager(FalkorDB graph backend it writes into), the OpenRouter client (LLM that performs extraction), and an optionalkg_extraction_modeloverride read fromconfig. Logs a count when entities/relationships are added.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.pywithargs=(redis_client, self.kg_manager, self.openrouter, self.cfg).
- async background_tasks.channel_summarization(redis)[source]
Refresh rolling summaries for the most active channels (every 6h).
Periodic task that delegates to the channel-summarizer background agent to (re)summarise the top recently-active channels, keeping per-channel summaries used elsewhere in retrieval and context assembly current. Import errors are ignored; other failures are logged.
Interactions: imports and awaits
background_agents.channel_summarizer.summarise_all_active(redis=redis), which reads channel activity/history from Redis and calls the chat LLM to produce summaries it persists back. Logs the number of channels processed.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.pywithargs=(redis_client,).
- async background_tasks.kg_consolidation_task(kg_manager, openrouter)[source]
Merge and prune duplicate knowledge-graph entities (every 24h).
Periodic task that runs the KG consolidation pass: detecting and merging duplicate entities and pruning low-value ones so the graph stays coherent over time. It records its outcome (or failure) as a debug-observability event. Import errors are ignored; other failures are logged and reported as an error event.
Interactions: imports and awaits
kg_consolidation.consolidate_graph(kg_manager=..., openrouter=...), which reads and mutates the FalkorDB knowledge graph and uses the LLM to judge merges; then schedules a fire-and-forgetobservability.publish_debug_event(topickg_consolidation) with the merged/pruned counts.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); the consolidation service inconsolidation_main.pyalso registers it explicitly under"kg_consolidation".
- async background_tasks.kg_decay_task(kg_manager)[source]
Decay knowledge-graph relationship weights and prune weak edges (every 24h).
Periodic task that ages out stale connections by decaying relationship weights and removing edges that fall below threshold, so the graph reflects recency. Unlike
kg_consolidation_task(), it is a pure graph operation and does not call the LLM. It records an outcome (or error) debug-observability event. Import errors are ignored; other failures are logged.Interactions: imports and awaits
kg_consolidation.decay_relationships(kg_manager), which mutates the FalkorDB knowledge graph; then schedules a fire-and-forgetobservability.publish_debug_event(topickg_decay) with the number of relationships removed.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); the consolidation service inconsolidation_main.pyalso registers it explicitly under"kg_decay".
- async background_tasks.journal_stream_task(redis)[source]
Tail the systemd journal and republish lines to Redis Pub/Sub.
Long-running task that spawns
journalctl -u stargazer -fand forwards each JSON log line to thestargazer:journalRedis channel, powering the live log feed in the web debug console. Ifjournalctlis unavailable it returns immediately; the scheduler restarts the task ~10s after any exit or crash. Thefinallyblock terminates (then kills) the subprocess on cancellation or exit so no orphanedjournalctlis left behind.Interactions: launches a
journalctlsubprocess viaasyncio.create_subprocess_exec, reads its stdout line by line, and callsredis.publish("stargazer:journal", ...)for each line.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.pywithargs=(redis_client,)and exercised bytests/test_background_tasks_shutdown.py.
- async background_tasks.log_rag_ingest_task()[source]
Ingest recent journal logs into the log RAG store (every 6h, opt-in).
Periodic task that pulls recent
journalctloutput and embeds it into thestargazer_logsretrieval store so logs become searchable for diagnostics. Gated behind thebackground_scheduler_log_rag_ingest_enabledopt-in flag (off by default). Import errors are ignored; other failures are logged.Interactions: imports and awaits
log_rag_ingest.ingest_logs_tick(), which reads logs and writes embeddings into the log RAG store.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.py(only when the opt-in flag is set). Takes no arguments.- Return type:
- async background_tasks.gemini_key_probe_task()[source]
Probe the shared Gemini API keys for quota exhaustion (every 2h).
Periodic task that pings every key in the shared Gemini embedding pool so keys that have hit their daily quota are detected and marked, letting the embedding pool route around exhausted keys instead of failing live requests. Import errors are ignored; other failures are logged.
Interactions: imports and awaits
gemini_embed_pool.probe_all_keys(), which issues lightweight probe calls per key and updates the pool’s per-key availability/quota state.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.py. Takes no arguments.- Return type:
- async background_tasks.agentic_kg_bulk_incremental_task(redis, kg_manager, openrouter)[source]
Run incremental agentic bulk KG extraction over active channels (every 4h).
Periodic task that drives the heavyweight agentic bulk-extraction pipeline against the most-recently-used channels, using per-channel incremental cursors so only messages newer than the last run are processed. Each run dumps its working set under
var/kg_bulk_scheduled/<timestamp>and feeds the agentic pipeline that grows the knowledge graph. It short-circuits when Redis or the API key is unconfigured, when there are no active channels, or when no new messages exist, emitting a debug-observability event at each milestone. Import errors are ignored; other failures are logged and reported as an error event.Interactions: loads
Config, callsmessage_cache.get_active_channelsandkg_bulk_runner.collect_messages_from_redis(reading messages and cursor state from Redis), writes a dump directory to the filesystem, then awaitskg_bulk_runner.run_agentic_bulk_pipeline, which uses the LLM and mutates the knowledge graph. Progress is reported through the nested_emit()closure (topickg_bulk_incremental). The passedkg_managerandopenrouterare accepted only for scheduler parity with the other KG tasks and are not used directly here.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.pywithargs=(redis_client, self.kg_manager, self.openrouter).
- async background_tasks.anamnesis_digest_task(redis, kg_manager, openrouter, config)[source]
Digest Spiral Goddess RAG fragments into the knowledge graph (every 20m).
Periodic task that runs one anamnesis cycle: advancing a cursor through the Spiral Goddess RAG corpus and folding fresh fragments into the knowledge graph as entities and relationships, so curated lore is progressively absorbed into long-term memory. Import errors are ignored; other failures are logged. Logs cursor progress when anything is added.
Interactions: imports and awaits
anamnesis_engine.run_anamnesis_cycle(redis=..., kg_manager=..., openrouter=..., config=...), which reads the RAG fragment store (Redis), calls the LLM via the OpenRouter client to extract structure, and writes results into the FalkorDB knowledge graph while persisting its cursor.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.pywithargs=(redis_client, self.kg_manager, self.openrouter, self.cfg).
- async background_tasks.startup_channel_backfill(redis, message_cache, embedding_queue, adapters, max_channels=10)[source]
Backfill recent history for the most active channels at startup.
For each of the top max_channels most-recently-active channels, fetches history from the appropriate platform adapter, deduplicates against Redis, caches new messages, and enqueues embeddings.
Returns a summary dict with
channels_processedandmessages_cached.
- async background_tasks.channel_heartbeat_task(bot_runner)[source]
Run the staggered per-channel heartbeat loop (flash model).
Periodic task that starts the channel-heartbeat background agent, which walks the most-recently-used channels on a stagger and uses the cheap “flash” model to keep the bot’s presence/awareness warm across channels. It no-ops if the heartbeat module is unavailable. The decorator interval is a crash-restart cadence; the loop itself paces its own per-channel timing.
Interactions: imports and awaits
background_agents.channel_heartbeat.channel_heartbeat_loop(bot_runner), passing the runner so the loop can reach Redis, adapters, and the LLM.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); imported and registered by the inference service ininference_main.py.
- async background_tasks.starwiki_lint_task(bot_runner)[source]
Run the scheduled StarWiki lint over public and opt-in wikis.
Periodic task that delegates to the StarWiki bookkeeper to lint public and opt-in wiki content. It no-ops when no
starwiki_serviceis attached to the runner. Gated on thestarwiki_enabledopt-in flag, and its interval is dynamically overridden inbuild_scheduler()fromstarwiki_lint_interval_minutes.Interactions reads
bot_runner.starwiki_serviceand callsstarwiki.bookkeeper.scheduled_lint_public_and_opt_in(svc).Called by the scheduler loop; registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRY(and explicitly re-registered inagents_main.pywithargs=(self,)).
- async background_tasks.limbic_anchoring_task(bot_runner)[source]
Run the persistent real-time Limbic knowledge-anchoring loop.
Long-lived worker that drives the knowledge-anchoring system: it continuously discovers active channels and extracts/anchors knowledge from new messages into the graph in near real time. No-ops when
ka_enabledis false. Channel discovery is fully dynamic (via Redis SCAN inside the worker), so the initial channel list is intentionally empty. The decorator interval is only a crash-restart cadence; the worker runs its own internal loop.Interactions: reads
bot_runner.cfgand constructs anAnchoringWorkerwired to the message-cache Redis client, thekg_manager(FalkorDB graph), the embedding queue, and a localextraction_llm_callclosure that posts to the keyless Gemini proxy; then awaitsworker.anchoring_loop().Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.pywithargs=(self,).
- async background_tasks.limbic_sunset_task(bot_runner)[source]
Run the daily Limbic sunset (epoch synthesis) loop.
Long-lived worker that drives the knowledge-anchoring “sunset” pass: periodically synthesising the day’s anchored knowledge into a consolidated epoch in the graph. No-ops when
ka_enabledis false. The decorator interval is a crash-restart cadence; the loop paces its own daily cycle.Interactions: reads
bot_runner.cfgand awaitsknowledge_anchoring.sunset_processor.sunset_loopwired to the message-cache Redis client (and its raw handle), thekg_manager(FalkorDB graph), the message cache, and asunset_llm_callclosure that delegates tobot_runner.openrouter.chatwithoverride_model=cfg.kg_extraction_model.Registered via the
background_task()decorator into_BACKGROUND_TASKS_REGISTRYand dispatched bybuild_scheduler(); also re-registered explicitly inagents_main.pywithargs=(self,).
- async background_tasks.limbic_dedup_task(bot_runner)[source]
Run the persistent Limbic knowledge-graph deduplication loop.
Long-lived worker that drives the knowledge-anchoring dedup loop, merging duplicate KG entities/relationships via the LLM. No-ops when
ka_dedup_enabledis false. Unlike the decorated periodic tasks, this is supervised byTaskSupervisor(leader-elected) rather than the interval scheduler, so only one node runs it at a time.Interactions reads
bot_runner.cfg; defines adedup_llm_callclosure that wraps prompts and callsbot_runner.openrouter.chatwithoverride_model=cfg.kg_extraction_model; then runsknowledge_anchoring.dedup_worker.dedup_loopwithbot_runner.kg_manageras the graph backend.Called by the agents service in
agents_main.pyviaself.supervisor.register_task("limbic_dedup_worker", lambda: limbic_dedup_task(self))(and the analogous dedup-registration test).
- async background_tasks.limbic_org_task(bot_runner)[source]
Run the persistent Limbic knowledge-graph organization loop.
Long-lived worker that drives the knowledge-anchoring organization loop, restructuring high degree nodes into hierarchies. No-ops when
ka_org_enabledis false. Supervised byTaskSupervisor(leader-elected) rather than the interval scheduler.
- async background_tasks.limbic_maintenance_task(bot_runner)[source]
Weekly maintenance for Limbic Anchoring.
F9-4 fix: GC does not require LLM. It runs a pure Cypher query on FalkorDB. This task is therefore NOT gated on chat_llm; it will run even when LLM-based anchoring tasks are disabled by the operator.
F9-3 fix: Passes kg_manager.query as the graph_query callable, which is the standard convention for all KA module functions. This is now documented.
- background_tasks.build_scheduler(redis=None, kg_manager=None, openrouter=None, bot_runner=None, *, config=None)[source]
Build a
BackgroundSchedulerwith the standard tasks.Only registers tasks whose dependencies are available. When
config.background_scheduler_chat_llm_enabledis false, skips periodic tasks that invoke chat/completions (see module docstring). Log RAG ingest is registered only whenconfig.background_scheduler_log_rag_ingest_enabledis true.