"""Cron-based scheduled prompt tools (v3)
Schedule prompts to fire at specified times or on cron
intervals. Prompt metadata is persisted in Redis. When a prompt
comes due it is handed to a runner registered via
``set_bot_runner`` (``runner.handle_webhook``); that legacy hook
predates the Phase T3 microservices split and is no longer wired
up, so delivery is a no-op unless a runner is supplied.
Module-level ``tick_scheduled_prompts`` and
``cleanup_expired_prompts`` are used by background_tasks.py
and accept a ``redis`` client directly.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import re
import uuid
from datetime import datetime, timedelta
from typing import (
Any,
Dict,
List,
Optional,
TYPE_CHECKING,
)
from croniter import croniter
if TYPE_CHECKING:
from tool_context import ToolContext
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Delivery runner read by ``_execute_prompt``; stays ``None`` until
# ``set_bot_runner`` installs one.
_bot_runner: Any = None
# Ids of prompts that currently have an in-process timer task. Added and
# discarded by ``_run_cron`` / ``_run_one_time`` and consulted by
# ``hydrate_scheduled_prompts`` so a second timer is not spawned.
_running_tasks: set[str] = set()
[docs]
def set_bot_runner(runner: Any) -> None:
    """Install the runner that ``_execute_prompt`` delivers prompts through.

    The runner is stored in the module-level ``_bot_runner`` slot and is
    expected to expose an awaitable ``handle_webhook`` method. This is a
    legacy hook from before the Phase T3 split into gateway, inference,
    agents, consolidation and web services; nothing in that layout calls
    it, so the slot normally remains ``None`` and ``_execute_prompt``
    returns an error until a runner is provided.

    Args:
        runner: The object to deliver through, or ``None`` to clear it.
    """
    global _bot_runner
    _bot_runner = runner
# Key prefix for individual prompt records; ``_pkey`` appends the prompt id.
_PREFIX = "stargazer:scheduled_prompts:"
# Redis set holding the ids of all stored prompts (written by ``_save``,
# pruned by ``_delete``).
_INDEX = "stargazer:scheduled_prompts:index"
# ---------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------
def _pkey(prompt_id: str) -> str:
    """Return the Redis key that holds one scheduled prompt's JSON record.

    The key is simply ``_PREFIX`` followed by the prompt id. Keeping the
    format in one place means ``_save``, ``_load`` and ``_delete`` always
    address the same key for a given id.

    Args:
        prompt_id: The id of the scheduled prompt.

    Returns:
        str: ``_PREFIX`` concatenated with *prompt_id*.
    """
    return "{}{}".format(_PREFIX, prompt_id)
async def _save(r, pd: Dict[str, Any]) -> None:
    """Write a prompt record to Redis and make sure its id is indexed.

    The record is JSON-encoded and stored under its ``_pkey``; the id is
    then added to the ``_INDEX`` set so ``_list_all`` can enumerate it.
    This is the only write path for prompt records in this module.

    Args:
        r: An async Redis client.
        pd: The prompt record. Its ``"id"`` entry selects the key.

    Raises:
        KeyError: If *pd* has no ``"id"`` entry.
    """
    prompt_id = pd["id"]
    payload = json.dumps(pd)
    # Record first, index second: the same order the original write used.
    await r.set(_pkey(prompt_id), payload)
    await r.sadd(_INDEX, prompt_id)
async def _load(r, pid: str) -> Optional[Dict[str, Any]]:
    """Read one prompt record back from Redis.

    Issues a single ``GET`` on the prompt's ``_pkey`` and decodes the JSON
    payload. A missing (or empty) value yields ``None``, which is how
    callers detect ids that linger in the index without a backing record.

    Args:
        r: An async Redis client.
        pid: The prompt id to look up.

    Returns:
        Optional[Dict[str, Any]]: The decoded record, or ``None`` when
        nothing is stored under the key.
    """
    blob = await r.get(_pkey(pid))
    if not blob:
        return None
    return json.loads(blob)
async def _delete(r, pid: str) -> None:
    """Erase a prompt record and remove its id from the index set.

    Undoes what ``_save`` wrote: a ``DEL`` on the per-prompt key followed
    by an ``SREM`` on ``_INDEX``, after which ``_list_all`` no longer
    reports the prompt.

    Args:
        r: An async Redis client.
        pid: The id of the prompt to remove.
    """
    record_key = _pkey(pid)
    await r.delete(record_key)
    await r.srem(_INDEX, pid)
async def _list_all(r) -> List[Dict[str, Any]]:
    """Load every prompt record whose id is present in the index set.

    Reads ``_INDEX`` with ``SMEMBERS`` and fetches each record through
    ``_load`` (one ``GET`` per id). Ids whose record is missing are skipped
    silently.

    Index members are normalized to ``str`` before use, matching what
    ``hydrate_scheduled_prompts`` does. A client that does not decode
    responses returns ``bytes`` members; formatting those into the key
    would produce ``...:b'<id>'``, the lookup would miss, and every record
    would be dropped without any error.

    Args:
        r: An async Redis client.

    Returns:
        List[Dict[str, Any]]: All records that could be loaded.
    """
    records: List[Dict[str, Any]] = []
    for raw_id in await r.smembers(_INDEX):
        pid = raw_id.decode("utf-8") if isinstance(raw_id, bytes) else raw_id
        record = await _load(r, pid)
        if record:
            records.append(record)
    return records
async def _execute_prompt(
    platform: str,
    channel_id: str,
    prompt: str,
    prompt_id: Optional[str] = None,
) -> str:
    """Deliver a prompt through the registered bot runner.

    Builds a ``scheduled_prompt_execution`` event and awaits
    ``handle_webhook`` on the runner installed via ``set_bot_runner``.
    That hook is a leftover from before the Phase T3 microservices split,
    so in the current layout the runner is normally ``None`` and this
    function returns an error string without delivering anything.

    Nothing is raised: a missing runner and any exception from
    ``handle_webhook`` are both logged and reported in the return value.

    Args:
        platform: Platform key of the target conversation.
        channel_id: Destination channel id.
        prompt: The prompt text to deliver.
        prompt_id: Optional scheduled-prompt id; included in the event and
            in log messages.

    Returns:
        str: ``"Prompt sent successfully"`` on success, otherwise a string
        starting with ``"Error: "``.
    """
    runner = _bot_runner
    if runner is None:
        logger.error(
            "Cannot execute scheduled prompt %s: bot runner not set",
            prompt_id,
        )
        return "Error: bot runner not available"

    payload = {
        "type": "scheduled_prompt_execution",
        "prompt": prompt,
        "prompt_id": prompt_id,
    }
    try:
        await runner.handle_webhook(platform, channel_id, payload)
    except Exception as exc:
        logger.error(
            "Error executing scheduled prompt %s: %s",
            prompt_id,
            exc,
        )
        return f"Error: {exc}"
    return "Prompt sent successfully"
def _parse_schedule_time(
    schedule_time: str,
) -> Optional[datetime]:
    """Parse a flexible one-time schedule expression into a ``datetime``.

    Supported forms, tried in this order:

    * ``+H:M`` -- relative hours and minutes (``+2:15``).
    * ``+1h30m`` / ``+90m`` / ``+2hrs`` -- relative with unit suffixes.
    * ``+30`` -- relative minutes.
    * ``in 2 hours``, ``in 1h 30 min``, ``in 45`` -- natural-language offsets.
    * ``14:30`` / ``9:30`` / ``14:30:05`` -- time of day, rolled over to
      tomorrow when that time has already passed today.
    * ISO-8601 or ``%Y-%m-%d %H:%M[:%S]`` absolute timestamps. Timezone-aware
      inputs are converted to local time and returned naive.

    Input that looks like one of these forms but carries impossible values
    (``25:00``, an offset too large for ``datetime``) yields ``None`` rather
    than raising.

    Args:
        schedule_time: The raw schedule expression.

    Returns:
        Optional[datetime]: The resolved naive local datetime, or ``None``
        if the input is empty or cannot be interpreted.
    """
    s = (schedule_time or "").strip()
    if not s:
        return None
    now = datetime.now()

    def offset(hours: int, minutes: int) -> Optional[datetime]:
        # Absurdly large offsets overflow ``timedelta``/``datetime``; treat
        # them as unparseable instead of letting OverflowError escape.
        try:
            return now + timedelta(hours=hours, minutes=minutes)
        except OverflowError:
            return None

    if s.startswith("+"):
        p = s[1:].strip()
        # "+H:M" -- either side may be empty ("+:30", "+2:"). Digits only, so
        # a stray sign cannot turn a "relative" time into one in the past.
        hm = re.fullmatch(r"(\d*)\s*:\s*(\d*)", p)
        if hm:
            return offset(int(hm.group(1) or 0), int(hm.group(2) or 0))
        units = re.fullmatch(
            r"(?:(\d+)\s*h(?:ours?|rs?)?)?"
            r"\s*(?:(\d+)\s*m(?:in(?:ute)?s?)?)?",
            p,
            re.IGNORECASE,
        )
        if units and (units.group(1) or units.group(2)):
            return offset(
                int(units.group(1) or 0),
                int(units.group(2) or 0),
            )
        # isdecimal() (not isdigit()) so that everything accepted here is
        # also accepted by int().
        if p.isdecimal():
            return offset(0, int(p))

    lower = s.lower()
    if lower.startswith("in "):
        # The trailing lookahead makes the unit a whole word: without it
        # "in 3 months" would be read as "3 m" -> three minutes.
        mh = re.search(r"(\d+)\s*h(?:ours?|rs?)?(?![a-z])", lower)
        mm = re.search(r"(\d+)\s*m(?:in(?:ute)?s?)?(?![a-z])", lower)
        hours = int(mh.group(1)) if mh else 0
        minutes = int(mm.group(1)) if mm else 0
        if not hours and not minutes:
            # Bare number ("in 45") means minutes.
            bare = re.search(r"in\s+(\d+)\s*$", lower)
            if bare:
                minutes = int(bare.group(1))
        if hours or minutes:
            return offset(hours, minutes)

    if re.fullmatch(r"\d{1,2}:\d{2}(?::\d{2})?", s):
        fmt = "%H:%M:%S" if s.count(":") == 2 else "%H:%M"
        try:
            pt = datetime.strptime(s, fmt).time()
        except ValueError:
            # Shaped like a time of day but out of range ("25:00", "12:75").
            return None
        dt = datetime.combine(now.date(), pt)
        if dt < now:
            dt += timedelta(days=1)
        return dt

    iso = s.replace(" ", "T")
    if iso.endswith("Z"):
        iso = iso[:-1] + "+00:00"
    try:
        dt = datetime.fromisoformat(iso)
        if dt.tzinfo is not None:
            dt = dt.astimezone().replace(tzinfo=None)
        return dt
    except (ValueError, OverflowError, OSError):
        pass

    # strptime is more lenient than fromisoformat (e.g. non-padded fields).
    for fmt in (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
    ):
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            continue
    return None
def _validate_cron(expr: str) -> bool:
    """Tell whether ``croniter`` can be constructed from *expr*.

    Used as a gate before a cron prompt is stored, so a bad expression is
    rejected at scheduling time rather than failing later inside a timer
    loop. Any exception raised while constructing the ``croniter`` counts
    as "invalid".

    Args:
        expr: The candidate cron expression (e.g. ``0 */2 * * *``).

    Returns:
        bool: ``True`` when construction succeeds, ``False`` otherwise.
    """
    try:
        croniter(expr)
    except Exception:
        return False
    return True
# ---------------------------------------------------------------
# Background asyncio task runners
# ---------------------------------------------------------------
async def _run_cron(
    r,
    pid: str,
    platform: str,
    cid: str,
    prompt: str,
    cron_expr: str,
) -> None:
    """Drive one recurring cron prompt in a long-lived asyncio loop.

    Each pass computes the next fire time with ``croniter``, sleeps until
    then, re-reads the record from Redis and, if the prompt is still live,
    claims the run and delivers it through ``_execute_prompt``.

    Coordination with ``tick_scheduled_prompts`` (which fires a cron prompt
    whenever its stored ``last_execution`` is older than the latest cron
    boundary):

    * ``last_execution`` is written *before* delivery starts, so a tick that
      runs while a slow delivery is still in flight does not fire the same
      boundary a second time.
    * If ``last_execution`` is already at or past the boundary this loop was
      waiting for, the tick got there first and the boundary is skipped.

    The prompt id is registered in ``_running_tasks`` for the lifetime of
    the loop and always removed in ``finally``. Any exception ends the loop
    and is logged.

    Args:
        r: An async Redis client.
        pid: The prompt id this task owns.
        platform: Platform key for delivery.
        cid: Destination channel id.
        prompt: The prompt text to fire each cycle.
        cron_expr: The cron expression driving the schedule.
    """
    live_statuses = ("active", "scheduled")
    _running_tasks.add(pid)
    try:
        cron = croniter(cron_expr, datetime.now())
        while True:
            pd = await _load(r, pid)
            if not pd or pd.get("status") not in live_statuses:
                break
            nxt = cron.get_next(datetime)
            wait = (nxt - datetime.now()).total_seconds()
            if wait > 0:
                await asyncio.sleep(wait)

            # The record may have changed while sleeping; apply the same
            # liveness rule as at the top of the loop.
            pd = await _load(r, pid)
            if not pd or pd.get("status") not in live_statuses:
                break

            last = pd.get("last_execution")
            if last:
                try:
                    already_fired = datetime.fromisoformat(last) >= nxt
                except (TypeError, ValueError):
                    # Unreadable timestamp: cannot prove it ran, so fire.
                    already_fired = False
                if already_fired:
                    continue

            # Claim the boundary before delivering (see docstring).
            pd["last_execution"] = datetime.now().isoformat()
            await _save(r, pd)
            await _execute_prompt(
                platform,
                cid,
                prompt,
                pid,
            )
    except Exception as e:
        logger.error(
            "Error in cron scheduler for %s: %s",
            pid,
            e,
        )
    finally:
        _running_tasks.discard(pid)
async def _run_one_time(
    r,
    pid: str,
    platform: str,
    cid: str,
    prompt: str,
    sched: datetime,
) -> None:
    """Drive one one-time prompt: sleep until due, fire once, mark executed.

    After sleeping until *sched* the record is re-read. Delivery happens
    only if the prompt is still pending (status ``scheduled`` or
    ``active``). A record that was cancelled, or that
    ``tick_scheduled_prompts`` has already flipped to ``executed``, is left
    alone so the prompt is never delivered twice.

    The record is marked ``executed`` (with ``executed_at``) *before*
    delivery starts. ``_execute_prompt`` never raises and the status was
    set to ``executed`` regardless of its result, so the final state is
    unchanged; claiming first simply closes the window in which a
    concurrent tick would still see ``scheduled`` and fire again.

    The prompt id is registered in ``_running_tasks`` while the task lives
    and always removed in ``finally``. Exceptions are logged, not raised.

    Args:
        r: An async Redis client.
        pid: The prompt id this task owns.
        platform: Platform key for delivery.
        cid: Destination channel id.
        prompt: The prompt text to fire.
        sched: The naive local datetime at which to fire.
    """
    _running_tasks.add(pid)
    try:
        wait = (sched - datetime.now()).total_seconds()
        if wait > 0:
            await asyncio.sleep(wait)
        pd = await _load(r, pid)
        if not pd or pd.get("status") not in ("scheduled", "active"):
            # Deleted, cancelled, or already executed elsewhere.
            return
        pd["status"] = "executed"
        pd["executed_at"] = datetime.now().isoformat()
        await _save(r, pd)
        await _execute_prompt(
            platform,
            cid,
            prompt,
            pid,
        )
    except Exception as e:
        logger.error(
            "Error executing prompt %s: %s",
            pid,
            e,
        )
    finally:
        _running_tasks.discard(pid)
# ---------------------------------------------------------------
# Module-level functions (background_tasks.py)
# ---------------------------------------------------------------
[docs]
# Strong references to deliveries launched by the tick. The event loop only
# keeps weak references to tasks, so an otherwise unreferenced task can be
# garbage-collected before it finishes.
_tick_tasks: set[asyncio.Task] = set()


def _spawn_tick_delivery(
    platform: str,
    channel_id: str,
    prompt: str,
    prompt_id: Optional[str],
) -> None:
    """Start a background delivery and hold a reference until it completes."""
    task = asyncio.create_task(
        _execute_prompt(platform, channel_id, prompt, prompt_id)
    )
    _tick_tasks.add(task)
    task.add_done_callback(_tick_tasks.discard)


async def tick_scheduled_prompts(redis) -> None:
    """Scan all scheduled prompts and fire any that have come due.

    Polling counterpart to the per-prompt timer tasks: every record with
    status ``scheduled`` or ``active`` is examined and due ones are
    delivered in the background, so a slow send does not stall the sweep.

    * One-time prompts (status ``scheduled``) fire when ``schedule_time`` is
      at or before now, and are flipped to ``executed``.
    * Cron prompts fire when the most recent cron boundary is newer than
      ``last_execution`` or, on the first run, not older than
      ``created_at``; ``last_execution`` is then set to now.

    For each due prompt the updated record is saved *before* the delivery
    task is started. If the save fails nothing is sent and the prompt is
    retried on the next tick, instead of being sent now and again later.

    Errors for an individual record are logged and the sweep continues.
    A record missing ``type`` or ``id`` is handled the same way rather than
    aborting the whole tick.

    Args:
        redis: An async Redis client.
    """
    try:
        now = datetime.now()
        for pd in await _list_all(redis):
            status = pd.get("status")
            if status not in ("scheduled", "active"):
                continue
            plat = pd.get("platform") or "discord"
            kind = pd.get("type")
            pid = pd.get("id")
            if kind == "one_time" and status == "scheduled":
                st = pd.get("schedule_time")
                if not st:
                    continue
                try:
                    if datetime.fromisoformat(st) <= now:
                        # Read required fields first so a malformed record
                        # is not marked executed without being delivered.
                        channel_id = pd["channel_id"]
                        text = pd["prompt"]
                        pd["status"] = "executed"
                        pd["executed_at"] = now.isoformat()
                        await _save(redis, pd)
                        _spawn_tick_delivery(plat, channel_id, text, pid)
                except Exception as e:
                    logger.error(
                        "Schedule parse error for %s: %s",
                        pid,
                        e,
                    )
            elif kind == "cron":
                ce = pd.get("cron_expression")
                if not ce:
                    continue
                try:
                    prev = croniter(ce, now).get_prev(datetime)
                    last = pd.get("last_execution")
                    if last:
                        due = datetime.fromisoformat(last) < prev
                    else:
                        ca = pd.get("created_at")
                        created = datetime.fromisoformat(ca) if ca else now
                        due = prev >= created
                    if due:
                        channel_id = pd["channel_id"]
                        text = pd["prompt"]
                        pd["last_execution"] = now.isoformat()
                        await _save(redis, pd)
                        _spawn_tick_delivery(plat, channel_id, text, pid)
                except Exception as e:
                    logger.error(
                        "Cron error for %s: %s",
                        pid,
                        e,
                    )
    except Exception as e:
        logger.error(
            "Scheduled prompt tick error: %s",
            e,
        )
[docs]
async def _hydrate_one(
    redis,
    pid: str,
    active_platforms: List[str],
) -> bool:
    """Spawn a timer for one stored prompt if eligible; return whether one started."""
    pd = await _load(redis, pid)
    if not pd or pd.get("status") not in ("active", "scheduled"):
        return False
    plat = pd.get("platform")
    if plat not in active_platforms:
        logger.debug(
            "Skipping hydration for prompt %s: platform %s not active",
            pid,
            plat,
        )
        return False
    if pid in _running_tasks:
        return False

    # Resolve every required field before registering the id, so a
    # malformed record cannot leave a stale entry in ``_running_tasks``.
    channel_id = pd["channel_id"]
    prompt = pd["prompt"]
    if pd.get("type") == "cron":
        ce = pd.get("cron_expression")
        if not ce:
            return False
        timer = _run_cron(redis, pid, plat, channel_id, prompt, ce)
    else:
        st = pd.get("schedule_time")
        if not st:
            return False
        try:
            dt = datetime.fromisoformat(st)
        except (TypeError, ValueError):
            return False
        if dt <= datetime.now():
            # Already due: left for ``tick_scheduled_prompts`` to pick up.
            return False
        timer = _run_one_time(redis, pid, plat, channel_id, prompt, dt)

    _running_tasks.add(pid)
    asyncio.create_task(timer)
    return True


async def hydrate_scheduled_prompts(
    redis,
    active_platforms: List[str],
    limit: int = 500,
) -> int:
    """Re-spawn in-memory timer tasks for surviving scheduled prompts.

    The ``_run_cron`` / ``_run_one_time`` tasks exist only in process
    memory, so after a restart this walks the persisted index and starts a
    timer for each prompt that is still pending. A prompt is skipped when:

    * its record is missing or its status is not ``active``/``scheduled``;
    * its platform is not in *active_platforms*;
    * its id is already in ``_running_tasks``;
    * it is a cron prompt without an expression, or a one-time prompt whose
      ``schedule_time`` is missing, unparseable, or no longer in the future.

    Each record is processed in isolation: a malformed record is logged and
    skipped, and does not discard the count of timers already started or
    prevent later records from being hydrated.

    Args:
        redis: An async Redis client.
        active_platforms: Platform keys currently served.
        limit: Maximum number of prompts to hydrate in one pass.

    Returns:
        int: Number of prompts for which a timer task was started (``0``
        for an empty index or when the index itself cannot be read).
    """
    try:
        pids = await redis.smembers(_INDEX)
    except Exception:
        logger.exception("Failed to hydrate scheduled prompts")
        return 0
    if not pids:
        return 0

    count = 0
    for pid in pids:
        if isinstance(pid, bytes):
            pid = pid.decode("utf-8")
        if count >= limit:
            logger.warning(
                "Hydration limit reached (%d); skipping remaining.",
                limit,
            )
            break
        try:
            if await _hydrate_one(redis, pid, active_platforms):
                count += 1
        except Exception:
            logger.exception(
                "Skipping scheduled prompt %s: hydration failed",
                pid,
            )
    if count > 0:
        logger.info("Hydrated %d scheduled prompt(s)", count)
    return count
[docs]
async def cleanup_expired_prompts(
    redis,
    *,
    days_to_keep: int = 30,
) -> str:
    """Delete executed and cancelled prompts older than a retention window.

    Walks every record from ``_list_all`` and removes, via ``_delete``,
    those whose status is ``executed`` or ``cancelled`` and whose
    ``created_at`` lies more than *days_to_keep* days in the past. Records
    in any other status, or without ``created_at``, are never touched. A
    failure while handling one record (for example an unparseable
    timestamp) is logged as a warning and the sweep moves on.

    Args:
        redis: An async Redis client.
        days_to_keep: Age threshold in days. Defaults to ``30``.

    Returns:
        str: JSON text. On success
        ``{"success": True, "message": ..., "days_kept": int}``; if the
        sweep itself fails, ``{"error": ...}``.
    """
    terminal_statuses = ("executed", "cancelled")
    try:
        cutoff = datetime.now() - timedelta(days=days_to_keep)
        removed = 0
        for record in await _list_all(redis):
            if record.get("status") not in terminal_statuses:
                continue
            created_at = record.get("created_at")
            if not created_at:
                continue
            try:
                expired = datetime.fromisoformat(created_at) < cutoff
                if expired:
                    await _delete(redis, record["id"])
                    removed += 1
            except Exception as exc:
                logger.warning(
                    "Parse error for prompt %s: %s",
                    record["id"],
                    exc,
                )
        summary = {
            "success": True,
            "message": f"Cleaned up {removed} expired prompts",
            "days_kept": days_to_keep,
        }
        return json.dumps(summary)
    except Exception as exc:
        logger.error("Prompt cleanup error: %s", exc)
        return json.dumps({"error": f"Failed to cleanup prompts: {exc}"})
# ---------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------
async def _schedule_prompt(
    prompt: str,
    schedule_time: Optional[str] = None,
    cron_expression: Optional[str] = None,
    description: Optional[str] = None,
    ctx: ToolContext | None = None,
) -> str:
    """Create a one-time or recurring scheduled prompt and arm its timer.

    Handler for the ``schedule_prompt`` tool. Exactly one of *schedule_time*
    or *cron_expression* must be given. The new record is stored with
    ``_save`` and the matching timer (``_run_one_time`` or ``_run_cron``)
    is started immediately with ``asyncio.create_task``, after its id has
    been added to ``_running_tasks``.

    Every failure is reported as a JSON error string. That includes a
    missing context: ``ctx`` defaults to ``None``, so the context attributes
    are read only after the Redis lookup (which tolerates ``None``) has
    succeeded, and inside the ``try`` so a context lacking an attribute is
    reported rather than raised.

    Args:
        prompt: The prompt text to schedule (required).
        schedule_time: One-time schedule expression in any format
            ``_parse_schedule_time`` accepts. Mutually exclusive with
            *cron_expression*.
        cron_expression: Cron expression for a recurring prompt. Mutually
            exclusive with *schedule_time*.
        description: Optional human-readable label stored on the record.
        ctx: The :class:`~tool_context.ToolContext`; supplies
            ``channel_id``, ``platform`` and ``redis``.

    Returns:
        str: JSON text. On success
        ``{"success": True, "message": ..., "prompt_id": str}``; otherwise
        ``{"error": ...}``.
    """
    if not prompt:
        return json.dumps({"error": "prompt is required"})
    if not schedule_time and not cron_expression:
        return json.dumps(
            {
                "error": (
                    "Either schedule_time or cron_expression must be provided"
                ),
            }
        )
    if schedule_time and cron_expression:
        return json.dumps(
            {
                "error": "Cannot specify both schedule_time and cron_expression",
            }
        )
    r = getattr(ctx, "redis", None)
    if r is None:
        # Also covers ctx being None.
        return json.dumps({"error": "Redis not available"})

    pid = str(uuid.uuid4())
    try:
        cid = ctx.channel_id
        plat = ctx.platform
        if schedule_time:
            dt = _parse_schedule_time(schedule_time)
            if not dt:
                return json.dumps(
                    {
                        "error": "Invalid schedule_time format: " + schedule_time,
                    }
                )
            pd = {
                "id": pid,
                "platform": plat,
                "channel_id": cid,
                "prompt": prompt,
                "type": "one_time",
                "schedule_time": dt.isoformat(),
                "description": description or "",
                "created_at": datetime.now().isoformat(),
                "status": "scheduled",
            }
            await _save(r, pd)
            _running_tasks.add(pid)
            asyncio.create_task(
                _run_one_time(r, pid, plat, cid, prompt, dt)
            )
            return json.dumps(
                {
                    "success": True,
                    "message": (
                        "Prompt scheduled for " + dt.strftime("%Y-%m-%d %H:%M:%S")
                    ),
                    "prompt_id": pid,
                }
            )

        if not _validate_cron(cron_expression):
            return json.dumps(
                {
                    "error": "Invalid cron expression: " + cron_expression,
                }
            )
        pd = {
            "id": pid,
            "platform": plat,
            "channel_id": cid,
            "prompt": prompt,
            "type": "cron",
            "cron_expression": cron_expression,
            "description": description or "",
            "created_at": datetime.now().isoformat(),
            "status": "active",
            "last_execution": None,
        }
        await _save(r, pd)
        _running_tasks.add(pid)
        asyncio.create_task(
            _run_cron(r, pid, plat, cid, prompt, cron_expression)
        )
        return json.dumps(
            {
                "success": True,
                "message": (
                    "Cron prompt scheduled with expression: " + cron_expression
                ),
                "prompt_id": pid,
            }
        )
    except Exception as e:
        logger.error("Error scheduling prompt: %s", e)
        return json.dumps({"error": f"Failed to schedule prompt: {e}"})
async def _list_scheduled_prompts(
    ctx: ToolContext | None = None,
) -> str:
    """Return a JSON summary of every stored scheduled prompt.

    Handler for the ``list_scheduled_prompts`` tool. Each record from
    ``_list_all`` is reduced to its id, channel, type, schedule
    (``schedule_time`` / ``cron_expression``), description, status and
    timestamps. ``id``, ``channel_id``, ``type``, ``status`` and
    ``created_at`` are required on every record; a record lacking one of
    them makes the whole call report an error.

    Args:
        ctx: The :class:`~tool_context.ToolContext`; ``ctx.redis`` supplies
            the client.

    Returns:
        str: JSON text. On success an indented
        ``{"success": True, "count": int, "prompts": [...]}``; otherwise
        ``{"error": ...}``.
    """
    r = getattr(ctx, "redis", None)
    if r is None:
        return json.dumps({"error": "Redis not available"})

    def summarize(pd):
        # Key order is kept stable because it is visible in the output.
        return {
            "id": pd["id"],
            "channel_id": pd["channel_id"],
            "type": pd["type"],
            "schedule_time": pd.get("schedule_time"),
            "cron_expression": pd.get("cron_expression"),
            "description": pd.get("description", ""),
            "status": pd["status"],
            "created_at": pd["created_at"],
            "last_execution": pd.get("last_execution"),
            "executed_at": pd.get("executed_at"),
        }

    try:
        items = [summarize(pd) for pd in await _list_all(r)]
        listing = {
            "success": True,
            "count": len(items),
            "prompts": items,
        }
        return json.dumps(listing, indent=2)
    except Exception as exc:
        logger.error(
            "Error listing scheduled prompts: %s",
            exc,
        )
        return json.dumps({"error": f"Failed to list prompts: {exc}"})
async def _cancel_scheduled_prompt(
    prompt_id: str,
    ctx: ToolContext | None = None,
) -> str:
    """Mark a scheduled prompt as cancelled.

    Handler for the ``cancel_scheduled_prompt`` tool. The record is loaded,
    its status set to ``cancelled`` with a ``cancelled_at`` timestamp, and
    saved back. Nothing is deleted here and no timer task is stopped
    directly: ``_run_cron`` and ``_run_one_time`` notice the new status the
    next time they re-read the record.

    Args:
        prompt_id: Id of the prompt to cancel.
        ctx: The :class:`~tool_context.ToolContext`; ``ctx.redis`` supplies
            the client.

    Returns:
        str: JSON text. On success
        ``{"success": True, "message": ..., "prompt_id": str}``; when the
        id is unknown, Redis is unavailable, or an error occurs,
        ``{"error": ...}``.
    """
    r = getattr(ctx, "redis", None)
    if r is None:
        return json.dumps({"error": "Redis not available"})
    try:
        record = await _load(r, prompt_id)
        if not record:
            not_found = f"Scheduled prompt with ID '{prompt_id}' not found"
            return json.dumps({"error": not_found})

        record["status"] = "cancelled"
        record["cancelled_at"] = datetime.now().isoformat()
        await _save(r, record)

        confirmation = {
            "success": True,
            "message": f"Scheduled prompt '{prompt_id}' has been cancelled",
            "prompt_id": prompt_id,
        }
        return json.dumps(confirmation)
    except Exception as exc:
        logger.error(
            "Error cancelling prompt: %s",
            exc,
        )
        return json.dumps({"error": f"Failed to cancel prompt: {exc}"})
async def _send_immediate_prompt(
    prompt: str,
    channel_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Fire a prompt right now, bypassing scheduling and persistence.

    Handler for the ``send_immediate_prompt`` tool. The prompt goes straight
    to ``_execute_prompt``; no record is created. The explicit *channel_id*
    wins over ``ctx.channel_id``; the platform always comes from
    ``ctx.platform``.

    ``_execute_prompt`` never raises: it reports failure by returning a
    string that starts with ``"Error: "`` (for instance when no bot runner
    is installed, which is the normal state under the current microservices
    layout). That outcome is surfaced here as ``{"error": ...}`` and is not
    wrapped in a success payload. A missing context is likewise reported as
    an error instead of raising.

    Args:
        prompt: The prompt text to send (required).
        channel_id: Target channel id; falls back to ``ctx.channel_id``.
        ctx: The :class:`~tool_context.ToolContext`; supplies
            ``channel_id`` and ``platform``.

    Returns:
        str: JSON text. On success
        ``{"success": True, "message": ..., "channel_id": str}``; otherwise
        ``{"error": ...}``.
    """
    if not prompt:
        return json.dumps({"error": "prompt is required"})
    if ctx is None:
        return json.dumps(
            {"error": "Failed to send prompt: tool context not available"}
        )
    try:
        cid = channel_id or ctx.channel_id
        plat = ctx.platform
        result = await _execute_prompt(
            plat,
            cid,
            prompt,
        )
        error_prefix = "Error: "
        if result.startswith(error_prefix):
            reason = result[len(error_prefix):]
            return json.dumps({"error": f"Failed to send prompt: {reason}"})
        return json.dumps(
            {
                "success": True,
                "message": result,
                "channel_id": cid,
            }
        )
    except Exception as e:
        logger.error(
            "Error sending immediate prompt: %s",
            e,
        )
        return json.dumps({"error": f"Failed to send prompt: {e}"})
async def _cleanup_expired_tool(
    days_to_keep: int = 30,
    ctx: ToolContext | None = None,
) -> str:
    """Expose the retention sweep as the ``cleanup_expired_prompts`` tool.

    Takes the Redis client from ``ctx.redis`` and forwards to the
    module-level ``cleanup_expired_prompts``. When no client is available
    (including when *ctx* is ``None``) an error payload is returned and
    nothing is swept.

    Args:
        days_to_keep: Age threshold in days, passed through unchanged.
            Defaults to ``30``.
        ctx: The :class:`~tool_context.ToolContext`; ``ctx.redis`` supplies
            the client.

    Returns:
        str: The JSON text produced by ``cleanup_expired_prompts``, or
        ``{"error": "Redis not available"}``.
    """
    redis_client = getattr(ctx, "redis", None)
    if redis_client is not None:
        return await cleanup_expired_prompts(
            redis_client,
            days_to_keep=days_to_keep,
        )
    return json.dumps({"error": "Redis not available"})
# ---------------------------------------------------------------
# Multi-tool registration
# ---------------------------------------------------------------
# Tool registration table for this module. Each entry gives the tool's
# name, a description, a JSON-Schema style ``parameters`` object, and the
# handler coroutine defined above that implements it.
TOOLS = [
{
"name": "schedule_prompt",
"description": (
"Schedule a prompt to be sent to the "
"LLM at a specified time or on a cron "
"schedule. Supports relative (+30, +2:15), "
"absolute (2025-09-19 14:30), time-of-day "
"(14:30), and cron expressions "
"(0 */2 * * *)."
),
"parameters": {
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": ("The prompt/message to send."),
},
"schedule_time": {
"type": "string",
"description": (
"One-time execution time " "(see supported formats)."
),
},
"cron_expression": {
"type": "string",
"description": ("Cron expression for " "recurring prompts."),
},
"description": {
"type": "string",
"description": ("Optional description of the " "scheduled prompt."),
},
},
"required": ["prompt"],
},
"handler": _schedule_prompt,
},
{
"name": "list_scheduled_prompts",
"description": ("List all currently scheduled prompts " "and their status."),
"parameters": {
"type": "object",
"properties": {},
},
"handler": _list_scheduled_prompts,
},
{
"name": "cancel_scheduled_prompt",
"description": ("Cancel a scheduled prompt by its ID."),
"parameters": {
"type": "object",
"properties": {
"prompt_id": {
"type": "string",
"description": ("The ID of the prompt " "to cancel."),
},
},
"required": ["prompt_id"],
},
"handler": _cancel_scheduled_prompt,
},
{
"name": "send_immediate_prompt",
"description": (
"Send a prompt immediately to the LLM " "for execution (no scheduling)."
),
"parameters": {
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": ("The prompt/message to send."),
},
"channel_id": {
"type": "string",
"description": ("Target channel " "(defaults to current)."),
},
},
"required": ["prompt"],
},
"handler": _send_immediate_prompt,
},
{
"name": "cleanup_expired_prompts",
"description": (
"Clean up old executed and cancelled " "prompts from Redis storage."
),
"parameters": {
"type": "object",
"properties": {
"days_to_keep": {
"type": "integer",
"description": (
"Number of days to keep "
"prompts before cleanup "
"(default: 30)."
),
},
},
},
"handler": _cleanup_expired_tool,
},
]