Source code for background_agents.channel_heartbeat

"""Per-channel staggered heartbeat: Redis next_at + short polling tick."""

from __future__ import annotations

import asyncio
import logging
import random
import time
from typing import Any

# Module-level logger for this background agent.
logger = logging.getLogger(__name__)

# Prefix of the per-channel Redis keys built by ``_next_key``; the full key is
# ``{prefix}{platform}:{channel_id}``.
_NEXT_PREFIX = "stargazer:channel_heartbeat:next:"
# Delay in seconds applied to a channel's next run when its heartbeat reports failure.
_FAILURE_BACKOFF_S = 300.0


def _next_key(platform: str, channel_id: str) -> str:
    """Return the Redis key that stores one channel's next heartbeat time.

    The key is the module-level ``_NEXT_PREFIX`` followed by
    ``{platform}:{channel_id}``, so every channel owns its own string key.
    Used by :func:`get_next_run` and :func:`set_next_run` to address the
    per-channel ``next_at`` value.

    Args:
        platform (str): Platform identifier (e.g. ``"discord"``) the channel lives on.
        channel_id (str): Platform-specific channel identifier.

    Returns:
        str: The fully qualified Redis key for this channel's next-run timestamp.
    """
    template = "{prefix}{platform}:{channel_id}"
    return template.format(
        prefix=_NEXT_PREFIX,
        platform=platform,
        channel_id=channel_id,
    )


async def get_next_run(redis: Any, platform: str, channel_id: str) -> float | None:
    """Read a channel's next scheduled heartbeat timestamp from Redis.

    Performs a single ``GET`` on the key produced by :func:`_next_key` and
    parses the stored epoch-seconds value. ``None`` is returned whenever no
    usable timestamp is available, which :func:`_tick` treats as "schedule a
    fresh randomized first run".

    A value is considered unusable when the key is missing or empty, when a
    bytes payload cannot be decoded, when the text is not a float, or when it
    parses to a non-finite float (``nan`` / ``inf``). The last case matters
    because ``nan`` and ``+inf`` never compare as ``<= now``; with no TTL on the
    key, such a value would otherwise keep the channel from ever becoming due.

    Args:
        redis (Any): Async Redis client (from ``bot_runner.message_cache.redis_client``).
        platform (str): Platform identifier the channel belongs to.
        channel_id (str): Platform-specific channel identifier.

    Returns:
        float | None: The next-run epoch timestamp in seconds, or ``None`` if
        the key is absent or its stored value is not a finite float.
    """
    import math  # local import: ``math`` is not among this module's top-level imports

    raw = await redis.get(_next_key(platform, channel_id))
    if not raw:
        return None
    try:
        # Decode inside the ``try``: UnicodeDecodeError subclasses ValueError, so
        # undecodable bytes are handled like any other unparsable value instead
        # of propagating and aborting the caller's whole polling pass.
        text = raw.decode() if isinstance(raw, bytes) else str(raw)
        value = float(text)
    except ValueError:
        return None
    # float() happily parses "nan" and "inf"; reject them so the caller reseeds.
    if not math.isfinite(value):
        return None
    return value


async def set_next_run(redis: Any, platform: str, channel_id: str, ts: float) -> None:
    """Write a channel's next heartbeat due-time to Redis.

    Formats ``ts`` as fixed-point text with six decimal places and issues one
    ``SET`` on the key from :func:`_next_key`, replacing any earlier value. No
    expiry is set on the key. :func:`_tick` and its nested worker call this for
    every scheduling decision (initial spread, skipped channel, normal interval,
    failure backoff).

    Args:
        redis (Any): Async Redis client used to persist the timestamp.
        platform (str): Platform identifier the channel belongs to.
        channel_id (str): Platform-specific channel identifier.
        ts (float): Absolute epoch-seconds time at which the channel becomes due.

    Returns:
        None
    """
    key = _next_key(platform, channel_id)
    encoded = format(ts, ".6f")
    await redis.set(key, encoded)


async def channel_heartbeat_loop(bot_runner: Any) -> None:
    """Drive staggered per-channel heartbeats until cancelled.

    Entry point of the channel-heartbeat background agent. It resolves the
    async Redis client from ``bot_runner.message_cache.redis_client`` and
    returns immediately (after a warning) when there is none. Otherwise it
    builds one shared :class:`openrouter_client.OpenRouterClient` using
    ``cfg.channel_heartbeat_model`` with no tool registry and zero tool rounds,
    then loops forever: each iteration re-reads ``bot_runner.cfg``, runs
    :func:`_tick` if ``channel_heartbeat_enabled`` is set, and sleeps
    ``channel_heartbeat_tick_s`` seconds.

    Because the config is re-read on every pass, toggling the enabled flag or
    changing the tick length takes effect without a restart. Exceptions raised
    by a tick are logged and swallowed; :class:`asyncio.CancelledError` is
    re-raised. The shared client is closed in a ``finally`` when the loop ends.

    Started by the inference service's ``channel_heartbeat_task``
    (``background_tasks.py``).

    Args:
        bot_runner (Any): Runtime context exposing ``message_cache`` (for
            Redis) and ``cfg``; it is also handed to :func:`_tick`.

    Returns:
        None
    """
    cache = bot_runner.message_cache
    redis = None if cache is None else cache.redis_client
    if redis is None:
        logger.warning("Channel heartbeat: no Redis — loop exiting")
        return

    from openrouter_client import OpenRouterClient

    startup_cfg = bot_runner.cfg
    client_kwargs = {
        "api_key": startup_cfg.api_key,
        "model": startup_cfg.channel_heartbeat_model,
        "base_url": startup_cfg.llm_base_url,
        # Heartbeats never call tools: no registry and zero tool rounds.
        "tool_registry": None,
        "max_tool_rounds": 0,
        "top_p": startup_cfg.top_p,
        "http_connect_timeout": startup_cfg.openrouter_http_connect_timeout_seconds,
        "http_read_timeout": startup_cfg.openrouter_http_read_timeout_seconds,
        "http_write_timeout": startup_cfg.openrouter_http_write_timeout_seconds,
        "http_pool_timeout": startup_cfg.openrouter_http_pool_timeout_seconds,
    }
    heartbeat_client = OpenRouterClient(**client_kwargs)

    try:
        while True:
            # Re-read the config each pass so live changes are picked up.
            live_cfg = bot_runner.cfg
            if live_cfg.channel_heartbeat_enabled:
                try:
                    await _tick(bot_runner, redis, heartbeat_client)
                except asyncio.CancelledError:
                    raise
                except Exception:
                    logger.exception("Channel heartbeat tick failed")
            # Same sleep whether the pass ran, failed, or was skipped as disabled.
            await asyncio.sleep(live_cfg.channel_heartbeat_tick_s)
    finally:
        await heartbeat_client.close()


async def _tick(bot_runner: Any, redis: Any, hb_client: Any) -> None:
    """Run one polling pass: find due channels and beat them concurrently.

    Fetches up to ``cfg.channel_heartbeat_max_channels`` active channels via
    :func:`message_cache.get_active_channels` and reads each channel's
    ``next_at`` through :func:`get_next_run`:

    * no usable timestamp: seed a random first run within ``[0, max_s]``
      seconds from now (staggers initial fires) and skip the channel;
    * not yet due: leave it alone;
    * due but flagged by
      :func:`message_processor.channel_heartbeat.is_channel_heartbeat_disabled`:
      push it forward by a random ``[min_s, max_s]`` interval without beating;
    * otherwise: queue it for a heartbeat.

    Queued channels are processed by the nested ``_one`` worker under an
    :class:`asyncio.Semaphore` sized by ``cfg.channel_heartbeat_concurrency``
    (at least 1). Workers are gathered with ``return_exceptions=True`` so this
    pass only returns once every worker has finished; with a plain gather the
    first failure would return control to the caller while sibling workers were
    still running, letting the next pass start overlapping beats. Worker
    failures are logged per channel here rather than raised.

    Returns early when the feature is disabled or no channels are due. Called
    once per loop iteration by :func:`channel_heartbeat_loop`.

    Args:
        bot_runner (Any): Runtime context exposing ``cfg``, ``get_adapter``, and
            ``processor`` used to resolve adapters and execute heartbeats.
        redis (Any): Async Redis client used for channel discovery and scheduling.
        hb_client (Any): Shared LLM client passed through to each per-channel
            heartbeat run.

    Returns:
        None
    """
    cfg = bot_runner.cfg
    if not cfg.channel_heartbeat_enabled:
        return

    from message_cache import get_active_channels
    from message_processor.channel_heartbeat import is_channel_heartbeat_disabled

    channels = await get_active_channels(redis, cfg.channel_heartbeat_max_channels)
    now = time.time()
    min_s = cfg.channel_heartbeat_interval_min_s
    max_s = cfg.channel_heartbeat_interval_max_s
    if max_s < min_s:
        # Tolerate an inverted config range by collapsing it to ``min_s``.
        max_s = min_s

    due: list[tuple[str, str]] = []
    for platform, channel_id in channels:
        nxt = await get_next_run(redis, platform, channel_id)
        if nxt is None:
            # Never scheduled (or unreadable): spread the first run out.
            spread = random.uniform(0.0, max_s)
            await set_next_run(redis, platform, channel_id, now + spread)
            continue
        if nxt > now:
            continue
        if await is_channel_heartbeat_disabled(redis, platform, channel_id):
            await set_next_run(
                redis,
                platform,
                channel_id,
                now + random.uniform(min_s, max_s),
            )
            continue
        due.append((platform, channel_id))

    if not due:
        return

    sem = asyncio.Semaphore(max(1, cfg.channel_heartbeat_concurrency))

    async def _one(platform: str, channel_id: str) -> None:
        """Run one due channel's heartbeat and reschedule its next run.

        Holds the enclosing semaphore for the duration. If the platform adapter
        is missing or not running, the beat is skipped and the channel is
        rescheduled within the normal randomized interval. Otherwise
        ``bot_runner.processor.run_channel_heartbeat_once`` is awaited with the
        shared ``hb_client``; a truthy result reschedules with the normal
        interval, while a falsy result or a raised exception reschedules with
        ``_FAILURE_BACKOFF_S``. Cancellation is never swallowed.

        Args:
            platform (str): Platform identifier for the due channel.
            channel_id (str): Platform-specific channel identifier to beat.

        Returns:
            None
        """
        async with sem:
            adapter = bot_runner.get_adapter(platform)
            if adapter is None or not getattr(adapter, "is_running", False):
                await set_next_run(
                    redis,
                    platform,
                    channel_id,
                    time.time() + random.uniform(min_s, max_s),
                )
                return

            try:
                ok = await bot_runner.processor.run_channel_heartbeat_once(
                    adapter,
                    channel_id,
                    hb_client,
                )
            except asyncio.CancelledError:
                raise
            except Exception:
                # Without this, a raising beat would leave ``next_at`` in the
                # past and the channel would be retried on every tick with no
                # backoff. Treat it like a reported failure instead.
                logger.exception(
                    "Channel heartbeat raised for %s:%s", platform, channel_id
                )
                ok = False

            # Measure the delay from completion time, not from dispatch time.
            delay = random.uniform(min_s, max_s) if ok else _FAILURE_BACKOFF_S
            await set_next_run(redis, platform, channel_id, time.time() + delay)

    outcomes = await asyncio.gather(
        *(_one(p, c) for p, c in due),
        return_exceptions=True,
    )
    for (platform, channel_id), outcome in zip(due, outcomes):
        if isinstance(outcome, Exception):
            # Typically a Redis error while rescheduling; the channel stays due
            # and is picked up again on a later pass.
            logger.error(
                "Channel heartbeat worker failed for %s:%s",
                platform,
                channel_id,
                exc_info=outcome,
            )
        elif isinstance(outcome, BaseException):
            # Cancellation and other non-Exception signals must not be swallowed.
            raise outcome