"""Per-channel staggered heartbeat: Redis next_at + short polling tick."""
from __future__ import annotations
import asyncio
import logging
import math
import random
import time
from typing import Any
logger = logging.getLogger(__name__)
_NEXT_PREFIX = "stargazer:channel_heartbeat:next:"
_FAILURE_BACKOFF_S = 300.0
def _next_key(platform: str, channel_id: str) -> str:
    """Return the Redis key that stores one channel's next heartbeat time.

    The key is ``_NEXT_PREFIX`` followed by ``{platform}:{channel_id}``, giving
    every channel its own string key. Used by :func:`get_next_run` and
    :func:`set_next_run`.

    Args:
        platform (str): Platform identifier (e.g. ``"discord"``).
        channel_id (str): Platform-specific channel identifier.

    Returns:
        str: Fully qualified Redis key for the channel's next-run timestamp.
    """
    # str.format applies the same format() conversion an f-string would.
    return "{}{}:{}".format(_NEXT_PREFIX, platform, channel_id)
async def get_next_run(redis: Any, platform: str, channel_id: str) -> float | None:
    """Read a channel's next scheduled heartbeat timestamp from Redis.

    Performs a single ``GET`` on the key built by :func:`_next_key`. Bytes
    responses are decoded to ``str`` before being parsed as a float.

    Returns ``None`` in three cases, and :func:`_tick` treats each of them as
    "schedule a fresh randomized first run":

    * the key is missing or empty;
    * the stored value cannot be decoded or parsed as a float;
    * the stored value parses to a non-finite float (``nan``/``inf``).

    Args:
        redis (Any): Async Redis client exposing ``get``.
        platform (str): Platform identifier the channel belongs to.
        channel_id (str): Platform-specific channel identifier.

    Returns:
        float | None: The next-run epoch timestamp in seconds, or ``None`` if
        the value is absent or unusable.
    """
    raw = await redis.get(_next_key(platform, channel_id))
    if not raw:
        return None
    try:
        # Decode inside the try: UnicodeDecodeError subclasses ValueError, so
        # corrupt bytes are handled like any other unparsable value.
        text = raw.decode() if isinstance(raw, bytes) else str(raw)
        value = float(text)
    except ValueError:
        return None
    # "nan" and "inf" parse fine but never compare <= now, so the channel
    # would never become due and never be rescheduled. Treat them as unset.
    if not math.isfinite(value):
        return None
    return value
async def set_next_run(redis: Any, platform: str, channel_id: str, ts: float) -> None:
    """Store the time at which a channel's next heartbeat becomes due.

    Writes ``ts`` with six decimal places under the key from
    :func:`_next_key`, overwriting any previous value. No expiry is set. The
    value is rewritten on every scheduling decision made by :func:`_tick`.

    Args:
        redis (Any): Async Redis client exposing ``set``.
        platform (str): Platform identifier the channel belongs to.
        channel_id (str): Platform-specific channel identifier.
        ts (float): Absolute epoch-seconds time at which the channel is due.

    Returns:
        None
    """
    key = _next_key(platform, channel_id)
    encoded = "{:.6f}".format(ts)
    await redis.set(key, encoded)
async def channel_heartbeat_loop(bot_runner: Any) -> None:
    """Drive staggered per-channel heartbeats until the task is cancelled.

    Resolves the async Redis client from ``bot_runner.message_cache``. If no
    cache or client is available, it logs a warning and returns immediately.
    Otherwise it builds one shared ``OpenRouterClient``, using
    ``cfg.channel_heartbeat_model`` with no tool registry and zero tool
    rounds, and then loops forever:

    * ``bot_runner.cfg`` is re-read at the top of each pass, so changes to
      ``channel_heartbeat_enabled`` and ``channel_heartbeat_tick_s`` apply
      live.
    * When enabled, one :func:`_tick` pass runs. Ordinary exceptions from it
      are logged and swallowed, while ``asyncio.CancelledError`` propagates.
    * The loop then sleeps ``channel_heartbeat_tick_s`` seconds.

    The shared client is closed in a ``finally`` when the loop exits.

    Args:
        bot_runner (Any): Runtime context exposing ``message_cache``, ``cfg``,
            and whatever :func:`_tick` needs (``get_adapter``, ``processor``).

    Returns:
        None
    """
    cache = bot_runner.message_cache
    redis = None if cache is None else cache.redis_client
    if redis is None:
        logger.warning("Channel heartbeat: no Redis — loop exiting")
        return

    from openrouter_client import OpenRouterClient

    startup_cfg = bot_runner.cfg
    # No tool registry and zero tool rounds: heartbeat calls can never use tools.
    hb_client = OpenRouterClient(
        api_key=startup_cfg.api_key,
        model=startup_cfg.channel_heartbeat_model,
        base_url=startup_cfg.llm_base_url,
        tool_registry=None,
        max_tool_rounds=0,
        top_p=startup_cfg.top_p,
        http_connect_timeout=startup_cfg.openrouter_http_connect_timeout_seconds,
        http_read_timeout=startup_cfg.openrouter_http_read_timeout_seconds,
        http_write_timeout=startup_cfg.openrouter_http_write_timeout_seconds,
        http_pool_timeout=startup_cfg.openrouter_http_pool_timeout_seconds,
    )
    try:
        while True:
            # Fresh config each pass so toggles and tick changes apply live.
            cfg = bot_runner.cfg
            if cfg.channel_heartbeat_enabled:
                try:
                    await _tick(bot_runner, redis, hb_client)
                except asyncio.CancelledError:
                    raise
                except Exception:
                    logger.exception("Channel heartbeat tick failed")
            await asyncio.sleep(cfg.channel_heartbeat_tick_s)
    finally:
        await hb_client.close()
async def _tick(bot_runner: Any, redis: Any, hb_client: Any) -> None:
    """Run one polling pass: find due channels and beat them concurrently.

    Fetches up to ``cfg.channel_heartbeat_max_channels`` active channels and
    reads each one's ``next_at`` via :func:`get_next_run`:

    * Never-scheduled channels are seeded at ``now + uniform(0, max_s)`` so
      their first runs are staggered. They are not beaten this pass.
    * Due channels that ``is_channel_heartbeat_disabled`` reports as disabled
      are pushed forward by ``uniform(min_s, max_s)`` without being beaten.
    * All other due channels are handed to the nested ``_one`` worker under an
      :class:`asyncio.Semaphore` of size ``cfg.channel_heartbeat_concurrency``
      (at least 1).

    If ``cfg.channel_heartbeat_interval_max_s`` is below ``..._min_s``, it is
    raised to ``min_s``. The function returns early when the feature is
    disabled or nothing is due.

    Failures are isolated per channel. An exception from one heartbeat is
    logged and treated as a failed beat, so it still gets the
    ``_FAILURE_BACKOFF_S`` reschedule. Any remaining per-channel errors are
    collected by ``gather(return_exceptions=True)`` and logged rather than
    abandoning the other in-flight channels.

    Args:
        bot_runner (Any): Runtime context exposing ``cfg``, ``get_adapter``,
            and ``processor``.
        redis (Any): Async Redis client used for discovery and scheduling.
        hb_client (Any): Shared LLM client passed through to each heartbeat.

    Returns:
        None
    """
    cfg = bot_runner.cfg
    if not cfg.channel_heartbeat_enabled:
        return
    from message_cache import get_active_channels
    from message_processor.channel_heartbeat import is_channel_heartbeat_disabled

    channels = await get_active_channels(redis, cfg.channel_heartbeat_max_channels)
    now = time.time()
    min_s = cfg.channel_heartbeat_interval_min_s
    max_s = cfg.channel_heartbeat_interval_max_s
    if max_s < min_s:
        # Misconfigured range: collapse to a fixed interval of min_s.
        max_s = min_s

    due: list[tuple[str, str]] = []
    for platform, channel_id in channels:
        nxt = await get_next_run(redis, platform, channel_id)
        if nxt is None:
            # First sighting: spread initial runs over [0, max_s] so channels
            # do not all fire on the same tick.
            await set_next_run(
                redis, platform, channel_id, now + random.uniform(0.0, max_s)
            )
            continue
        if nxt <= now:
            if await is_channel_heartbeat_disabled(redis, platform, channel_id):
                await set_next_run(
                    redis,
                    platform,
                    channel_id,
                    now + random.uniform(min_s, max_s),
                )
                continue
            due.append((platform, channel_id))
    if not due:
        return

    sem = asyncio.Semaphore(max(1, cfg.channel_heartbeat_concurrency))

    async def _one(platform: str, channel_id: str) -> None:
        """Beat one due channel and persist its next run time.

        Holds the enclosing semaphore for the whole operation. If the platform
        adapter is missing or not running, the beat is skipped and the channel
        is rescheduled within the normal ``[min_s, max_s]`` interval.
        Otherwise ``bot_runner.processor.run_channel_heartbeat_once`` is
        awaited with the shared ``hb_client``. What happens next depends on
        the outcome:

        * A truthy result reschedules within the normal interval.
        * A falsy result, or an exception (which is logged), reschedules after
          ``_FAILURE_BACKOFF_S`` so a failing channel is not retried every
          tick.

        Args:
            platform (str): Platform identifier for the due channel.
            channel_id (str): Platform-specific channel identifier to beat.

        Returns:
            None
        """
        async with sem:
            adapter = bot_runner.get_adapter(platform)
            if adapter is None or not getattr(adapter, "is_running", False):
                await set_next_run(
                    redis,
                    platform,
                    channel_id,
                    time.time() + random.uniform(min_s, max_s),
                )
                return
            try:
                ok = await bot_runner.processor.run_channel_heartbeat_once(
                    adapter,
                    channel_id,
                    hb_client,
                )
            except Exception:
                # Without this, the channel would stay due and be retried on
                # every tick instead of backing off.
                logger.exception(
                    "Channel heartbeat failed for %s:%s", platform, channel_id
                )
                ok = False
            delay = random.uniform(min_s, max_s) if ok else _FAILURE_BACKOFF_S
            await set_next_run(redis, platform, channel_id, time.time() + delay)

    # return_exceptions=True: one failing channel must not surface early while
    # its siblings keep running unsupervised into the next tick.
    results = await asyncio.gather(
        *(_one(p, c) for p, c in due), return_exceptions=True
    )
    for (p, c), result in zip(due, results):
        if isinstance(result, BaseException):
            logger.error(
                "Channel heartbeat scheduling failed for %s:%s",
                p,
                c,
                exc_info=result,
            )