Source code for tools.scheduled_prompt

"""Cron-based scheduled prompt tools (v3)

Schedule prompts to fire at specified times or on cron
intervals. Prompt metadata is persisted in Redis. When a prompt
comes due it is handed to a runner registered via
``set_bot_runner`` (``runner.handle_webhook``); that legacy hook
predates the Phase T3 microservices split and is no longer wired
up, so delivery is a no-op unless a runner is supplied.

Module-level ``tick_scheduled_prompts`` and
``cleanup_expired_prompts`` are used by background_tasks.py
and accept a ``redis`` client directly.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import re
import uuid
from datetime import datetime, timedelta
from typing import (
    Any,
    Dict,
    List,
    Optional,
    TYPE_CHECKING,
)

from croniter import croniter

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

_bot_runner: Any = None
_running_tasks: set[str] = set()


[docs] def set_bot_runner(runner: Any) -> None: """Store a runner reference used by ``_execute_prompt`` to deliver a prompt via ``runner.handle_webhook``. This hook predates the Phase T3 split into separate gateway, inference, agents, consolidation, and web services. Nothing in the current microservices layout calls it, so ``_bot_runner`` normally stays ``None`` and ``_execute_prompt`` short-circuits with an error until a runner is supplied. """ global _bot_runner _bot_runner = runner
_PREFIX = "stargazer:scheduled_prompts:" _INDEX = "stargazer:scheduled_prompts:index" # --------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------- def _pkey(prompt_id: str) -> str: """Build the Redis key under which one scheduled prompt is stored. Namespacing helper that prefixes a prompt id with ``_PREFIX`` (``stargazer:scheduled_prompts:``) so every persisted prompt lives under a predictable key. Centralizing the format here keeps ``_save``, ``_load`` and ``_delete`` in agreement. Called by ``_save``, ``_load`` and ``_delete`` in this module. Args: prompt_id: The UUID string identifying the scheduled prompt. Returns: str: The fully-qualified Redis key for that prompt's JSON blob. """ return f"{_PREFIX}{prompt_id}" async def _save(r, pd: Dict[str, Any]) -> None: """Persist a scheduled-prompt record to Redis and index its id. Serializes the prompt dict to JSON under its ``_pkey`` and adds the id to the ``_INDEX`` set, so the record is both directly fetchable and enumerable by ``_list_all``. Used as the single write path whenever a prompt is created or its status changes (executed, cancelled, last-execution updated). Touches Redis: a ``SET`` on the per-prompt key plus a ``SADD`` to the index set. Called by ``_schedule_prompt``, ``_run_cron``, ``_run_one_time`` and ``tick_scheduled_prompts`` in this module. Args: r: An async Redis client. pd: The scheduled-prompt record; its ``id`` field supplies the key. """ pid = pd["id"] await r.set(_pkey(pid), json.dumps(pd)) await r.sadd(_INDEX, pid) async def _load(r, pid: str) -> Optional[Dict[str, Any]]: """Fetch and deserialize one scheduled-prompt record from Redis. Reads the JSON blob stored at the prompt's ``_pkey`` and parses it back into a dict, returning ``None`` when the key is absent (e.g. the prompt was deleted but its id lingers in the index). The complementary read to ``_save``. Touches Redis with a single ``GET``. Called by ``_list_all``, ``_delete`` (indirectly), ``_run_cron``, ``_run_one_time`` and ``_cancel_scheduled_prompt`` in this module. Args: r: An async Redis client. pid: The prompt id whose record to load. Returns: Optional[Dict[str, Any]]: The decoded record, or ``None`` if no value is stored at the key. """ raw = await r.get(_pkey(pid)) return json.loads(raw) if raw else None async def _delete(r, pid: str) -> None: """Remove a scheduled-prompt record and drop its id from the index. Inverse of ``_save``: deletes the per-prompt key and removes the id from the ``_INDEX`` set so the prompt no longer appears in ``_list_all``. Used by the retention sweep to purge old executed/cancelled prompts. Touches Redis with a ``DEL`` plus an ``SREM`` on the index set. Called by ``cleanup_expired_prompts`` in this module. Args: r: An async Redis client. pid: The prompt id to delete. """ await r.delete(_pkey(pid)) await r.srem(_INDEX, pid) async def _list_all(r) -> List[Dict[str, Any]]: """Load every scheduled-prompt record currently tracked in the index. Reads the ``_INDEX`` set of prompt ids and loads each one via ``_load``, silently skipping ids whose backing key has gone missing. This is the enumeration primitive behind listing, the due-prompt tick, and the retention sweep. Touches Redis with an ``SMEMBERS`` on the index plus one ``GET`` per id (via ``_load``). Called by ``tick_scheduled_prompts``, ``cleanup_expired_prompts`` and ``_list_scheduled_prompts`` in this module. Args: r: An async Redis client. Returns: List[Dict[str, Any]]: All resolvable prompt records; ids with no value are omitted. """ ids = await r.smembers(_INDEX) out: list[dict] = [] for pid in ids: pd = await _load(r, pid) if pd: out.append(pd) return out async def _execute_prompt( platform: str, channel_id: str, prompt: str, prompt_id: Optional[str] = None, ) -> str: """Hand a due prompt to the registered bot runner for delivery. The single delivery primitive shared by every scheduling path. It wraps the prompt in a ``scheduled_prompt_execution`` event and calls ``_bot_runner.handle_webhook`` directly, bypassing any HTTP round-trip. This depends on a runner having been installed via ``set_bot_runner``. That legacy hook predates the Phase T3 microservices split and is no longer wired up in the current gateway/inference/agents/consolidation/web layout, so ``_bot_runner`` is normally ``None`` and this function short-circuits with an error -- delivery is effectively a no-op unless a runner is supplied. All exceptions are caught and returned as an error string; nothing is raised. Called by ``_run_cron``, ``_run_one_time``, ``tick_scheduled_prompts`` and ``_send_immediate_prompt`` in this module. Args: platform: Platform key for the target conversation (e.g. ``discord``). channel_id: Destination channel id. prompt: The prompt text to execute. prompt_id: Optional id of the scheduled prompt, used only for logging. Returns: str: ``"Prompt sent successfully"`` on success, or an error string when the runner is unset or ``handle_webhook`` raises. """ if _bot_runner is None: logger.error( "Cannot execute scheduled prompt %s: " "bot runner not set", prompt_id, ) return "Error: bot runner not available" try: event_data = { "type": "scheduled_prompt_execution", "prompt": prompt, "prompt_id": prompt_id, } await _bot_runner.handle_webhook( platform, channel_id, event_data, ) return "Prompt sent successfully" except Exception as e: logger.error( "Error executing scheduled prompt %s: %s", prompt_id, e, ) return f"Error: {e}" def _parse_schedule_time( schedule_time: str, ) -> Optional[datetime]: """Parse a flexible one-time schedule expression into a ``datetime``. Accepts the several human-friendly formats the ``schedule_prompt`` tool advertises and normalizes them to a single naive local ``datetime``: relative offsets (``+30``, ``+2:15``, ``+1h30m``), natural-language offsets (``in 2 hours``), a bare time-of-day (``14:30``, rolled to tomorrow if already past), and absolute ISO / ``%Y-%m-%d %H:%M[:%S]`` timestamps (timezone-aware inputs are converted to local and stripped of tzinfo). Pure and side-effect-free -- it only reads ``datetime.now`` for relative math. Called by ``_schedule_prompt`` in this module to turn user input into the ``schedule_time`` stored on a one-time prompt. Args: schedule_time: The raw schedule expression in any supported format. Returns: Optional[datetime]: The resolved naive local datetime, or ``None`` if the input is empty or matches no supported format. """ s = (schedule_time or "").strip() if not s: return None now = datetime.now() if s.startswith("+"): p = s[1:].strip() if ":" in p: parts = p.split(":", 1) try: h = int(parts[0].strip() or 0) m = int(parts[1].strip() or 0) return now + timedelta( hours=h, minutes=m, ) except Exception: pass m = re.match( r"^(?:(\d+)\s*h(?:ours?)?)?" r"\s*(?:(\d+)\s*m(?:in(?:utes?)?)?)?$", p, re.IGNORECASE, ) if m and (m.group(1) or m.group(2)): return now + timedelta( hours=int(m.group(1) or 0), minutes=int(m.group(2) or 0), ) if p.isdigit(): return now + timedelta(minutes=int(p)) lower = s.lower() if lower.startswith("in "): hours = minutes = 0 mh = re.search( r"(\d+)\s*h(?:ours?|rs?)?", lower, ) if mh: hours = int(mh.group(1)) mm = re.search( r"(\d+)\s*m(?:in(?:utes?)?)?", lower, ) if mm: minutes = int(mm.group(1)) if not hours and not minutes: mn = re.search(r"in\s+(\d+)\s*$", lower) if mn: minutes = int(mn.group(1)) if hours or minutes: return now + timedelta( hours=hours, minutes=minutes, ) if re.match(r"^\d{2}:\d{2}(?::\d{2})?$", s): fmt = "%H:%M:%S" if s.count(":") == 2 else "%H:%M" pt = datetime.strptime(s, fmt).time() dt = datetime.combine(now.date(), pt) if dt < now: dt += timedelta(days=1) return dt iso = s.replace(" ", "T") if iso.endswith("Z"): iso = iso[:-1] + "+00:00" try: dt = datetime.fromisoformat(iso) if dt.tzinfo is not None: dt = dt.astimezone().replace(tzinfo=None) return dt except Exception: pass for fmt in ( "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", ): try: return datetime.strptime(s, fmt) except ValueError: continue return None def _validate_cron(expr: str) -> bool: """Report whether a string is a cron expression ``croniter`` can parse. Cheap validation gate used before a cron-type prompt is persisted, so an unparseable expression is rejected up front rather than blowing up later inside the scheduler loop. It simply attempts to construct a ``croniter`` and treats any exception as invalid. Side-effect-free. Called by ``_schedule_prompt`` in this module. Args: expr: The candidate cron expression (e.g. ``0 */2 * * *``). Returns: bool: ``True`` if ``croniter`` accepts the expression, ``False`` otherwise. """ try: croniter(expr) return True except Exception: return False # --------------------------------------------------------------- # Background asyncio task runners # --------------------------------------------------------------- async def _run_cron( r, pid: str, platform: str, cid: str, prompt: str, cron_expr: str, ) -> None: """Drive one recurring cron prompt in a long-lived asyncio loop. Spawned as a background task to own a single cron-type prompt for its whole lifetime: it computes the next fire time with ``croniter``, sleeps until then, fires the prompt, and repeats. Re-reading the record from Redis on each pass lets a cancellation or status change made elsewhere stop the loop cleanly. Registers and de-registers its id in the module-level ``_running_tasks`` set (so re-hydration does not double-spawn it), reloads state via ``_load``, delivers through ``_execute_prompt`` each cycle, and writes the updated ``last_execution`` timestamp back via ``_save``. Exceptions abort the loop and are logged; the ``_running_tasks`` cleanup always runs in ``finally``. Called by ``_schedule_prompt`` (on creation) and ``hydrate_scheduled_prompts`` (on startup re-spawn) in this module, both via ``asyncio.create_task``. Args: r: An async Redis client. pid: The prompt id this task owns. platform: Platform key for delivery. cid: Destination channel id. prompt: The prompt text to fire each cycle. cron_expr: The cron expression driving the schedule. """ _running_tasks.add(pid) try: cron = croniter(cron_expr, datetime.now()) while True: pd = await _load(r, pid) if not pd or pd.get("status") not in ( "active", "scheduled", ): break nxt = cron.get_next(datetime) wait = (nxt - datetime.now()).total_seconds() if wait > 0: await asyncio.sleep(wait) pd = await _load(r, pid) if not pd or pd.get("status") == "cancelled": break await _execute_prompt( platform, cid, prompt, pid, ) pd = await _load(r, pid) if pd: pd["last_execution"] = datetime.now().isoformat() await _save(r, pd) except Exception as e: logger.error( "Error in cron scheduler for %s: %s", pid, e, ) finally: _running_tasks.discard(pid) async def _run_one_time( r, pid: str, platform: str, cid: str, prompt: str, sched: datetime, ) -> None: """Drive one one-time prompt: sleep until due, fire once, mark executed. Spawned as a background task that owns a single one-time prompt. It sleeps until the scheduled instant, re-checks that the prompt has not been cancelled in the meantime, delivers it once, and records the terminal ``executed`` status. After firing it does not reschedule. Registers and de-registers its id in ``_running_tasks`` (so re-hydration does not double-spawn it), reloads the record via ``_load`` to honor a late cancellation, delivers through ``_execute_prompt``, and persists the ``executed`` status and ``executed_at`` timestamp via ``_save``. Exceptions are logged; the ``_running_tasks`` cleanup always runs in ``finally``. Called by ``_schedule_prompt`` (on creation) and ``hydrate_scheduled_prompts`` (on startup re-spawn) in this module, both via ``asyncio.create_task``. Args: r: An async Redis client. pid: The prompt id this task owns. platform: Platform key for delivery. cid: Destination channel id. prompt: The prompt text to fire. sched: The naive local datetime at which to fire. """ _running_tasks.add(pid) try: wait = (sched - datetime.now()).total_seconds() if wait > 0: await asyncio.sleep(wait) pd = await _load(r, pid) if pd and pd.get("status") != "cancelled": await _execute_prompt( platform, cid, prompt, pid, ) pd["status"] = "executed" pd["executed_at"] = datetime.now().isoformat() await _save(r, pd) except Exception as e: logger.error( "Error executing prompt %s: %s", pid, e, ) finally: _running_tasks.discard(pid) # --------------------------------------------------------------- # Module-level functions (background_tasks.py) # ---------------------------------------------------------------
[docs] async def tick_scheduled_prompts(redis) -> None: """Scan all scheduled prompts and fire any that have come due. The polling sweep that complements the per-prompt asyncio timers: it iterates every active/scheduled record and dispatches the ones whose time has arrived, so prompts still fire even if their dedicated timer task was never spawned or was lost (for example across a restart before hydration). Each due delivery is launched fire-and-forget via ``asyncio.create_task`` so a slow send does not stall the sweep. For one-time prompts it compares ``schedule_time`` to now and, on firing, flips the record to ``executed``. For cron prompts it uses ``croniter`` to find the previous fire boundary and fires when that boundary is newer than the stored ``last_execution`` (or, on first run, the ``created_at`` time), updating ``last_execution`` afterward. Reads via ``_list_all`` and persists status changes via ``_save``; all errors are logged and swallowed so one bad record cannot abort the tick. Called by ``background_tasks.scheduled_prompt_tick``, which the agents service's periodic scheduler runs roughly once a minute (registered in ``agents_main.py``). Args: redis: An async Redis client. """ try: now = datetime.now() for pd in await _list_all(redis): status = pd.get("status") if status not in ("scheduled", "active"): continue plat = pd.get("platform") or "discord" if pd["type"] == "one_time" and (status == "scheduled"): st = pd.get("schedule_time") if not st: continue try: if datetime.fromisoformat(st) <= now: asyncio.create_task( _execute_prompt( plat, pd["channel_id"], pd["prompt"], pd["id"], ) ) pd["status"] = "executed" pd["executed_at"] = now.isoformat() await _save(redis, pd) except Exception as e: logger.error( "Schedule parse error for " "%s: %s", pd["id"], e, ) elif pd["type"] == "cron": ce = pd.get("cron_expression") if not ce: continue try: cron = croniter(ce, now) prev = cron.get_prev(datetime) last = pd.get("last_execution") execute = False if not last: ca = pd.get("created_at") cdt = datetime.fromisoformat(ca) if ca else now execute = prev >= cdt else: execute = datetime.fromisoformat(last) < prev if execute: asyncio.create_task( _execute_prompt( plat, pd["channel_id"], pd["prompt"], pd["id"], ) ) pd["last_execution"] = now.isoformat() await _save(redis, pd) except Exception as e: logger.error( "Cron error for %s: %s", pd["id"], e, ) except Exception as e: logger.error( "Scheduled prompt tick error: %s", e, )
[docs] async def hydrate_scheduled_prompts( redis, active_platforms: List[str], limit: int = 500, ) -> int: """Re-spawn in-memory timer tasks for surviving scheduled prompts. Recovery routine run at startup: the per-prompt ``_run_cron`` / ``_run_one_time`` asyncio tasks live only in process memory and are lost on restart, so this walks the persisted index and re-instantiates a timer for each still-pending prompt. Without it, after a restart prompts would only fire via the slower ``tick_scheduled_prompts`` polling fallback. Reads the ``_INDEX`` set and each record via ``_load``, skipping prompts that are not ``active``/``scheduled``, that target a platform not in *active_platforms*, that already have a live task in ``_running_tasks``, or (for one-time prompts) whose time has already passed. For each eligible prompt it adds the id to ``_running_tasks`` and launches ``_run_cron`` or ``_run_one_time`` via ``asyncio.create_task``. Processing stops once *limit* prompts have been hydrated. All exceptions are caught and logged. Called at startup by the agents service wiring (see callers of ``hydrate_scheduled_prompts``); also exercised directly by ``tests/test_scheduled_prompt_hydration.py``. Args: redis: An async Redis client. active_platforms: Platform keys currently served; prompts for other platforms are skipped so timers are not spawned for offline platforms. limit: Maximum number of prompts to hydrate in one pass. Defaults to ``500``. Returns: int: The number of prompts for which a timer task was re-spawned (``0`` on an empty index or on error). """ try: pids = await redis.smembers(_INDEX) if not pids: return 0 count = 0 for pid in pids: if isinstance(pid, bytes): pid = pid.decode("utf-8") if count >= limit: logger.warning( "Hydration limit reached (%d); skipping remaining.", limit, ) break pd = await _load(redis, pid) if not pd: continue status = pd.get("status") if status not in ("active", "scheduled"): continue # Verify platform is active plat = pd.get("platform") if plat not in active_platforms: logger.debug( "Skipping hydration for prompt %s: platform %s not active", pid, plat, ) continue if pid in _running_tasks: continue # Re-spawn task if pd.get("type") == "cron": ce = pd.get("cron_expression") if not ce: continue _running_tasks.add(pid) asyncio.create_task( _run_cron( redis, pid, plat, pd["channel_id"], pd["prompt"], ce, ) ) count += 1 else: st = pd.get("schedule_time") if not st: continue try: dt = datetime.fromisoformat(st) if dt > datetime.now(): _running_tasks.add(pid) asyncio.create_task( _run_one_time( redis, pid, plat, pd["channel_id"], pd["prompt"], dt, ) ) count += 1 except ValueError: continue if count > 0: logger.info("Hydrated %d scheduled prompt(s)", count) return count except Exception: logger.exception("Failed to hydrate scheduled prompts") return 0
[docs] async def cleanup_expired_prompts( redis, *, days_to_keep: int = 30, ) -> str: """Purge old executed and cancelled prompts past a retention window. Retention sweep that keeps the ``scheduled_prompts`` namespace from growing unbounded: it deletes terminal records (status ``executed`` or ``cancelled``) whose ``created_at`` is older than *days_to_keep*. Active and scheduled prompts are always left untouched, as are records with an unparseable ``created_at`` (those are logged and skipped). Reads candidates via ``_list_all`` and removes each match via ``_delete`` (which clears both the per-prompt key and its index entry). Errors are caught and reported in the returned JSON rather than raised. Called by ``background_tasks.scheduled_prompt_cleanup`` (which passes ``days_to_keep=7``) on the agents service's periodic scheduler, and by the ``cleanup_expired_prompts`` tool handler ``_cleanup_expired_tool`` in this module. Args: redis: An async Redis client. days_to_keep: Age threshold in days; terminal prompts older than this are removed. Defaults to ``30``. Returns: str: A JSON string -- on success ``{"success": True, "message": ..., "days_kept": int}``; on failure ``{"error": ...}``. """ try: cutoff = datetime.now() - timedelta( days=days_to_keep, ) cleaned = 0 for pd in await _list_all(redis): if pd.get("status") not in ( "executed", "cancelled", ): continue ca = pd.get("created_at") if not ca: continue try: if datetime.fromisoformat(ca) < cutoff: await _delete(redis, pd["id"]) cleaned += 1 except Exception as e: logger.warning( "Parse error for prompt %s: %s", pd["id"], e, ) return json.dumps( { "success": True, "message": (f"Cleaned up {cleaned} expired prompts"), "days_kept": days_to_keep, } ) except Exception as e: logger.error("Prompt cleanup error: %s", e) return json.dumps( { "error": f"Failed to cleanup prompts: {e}", } )
# --------------------------------------------------------------- # Tool handlers # --------------------------------------------------------------- async def _schedule_prompt( prompt: str, schedule_time: Optional[str] = None, cron_expression: Optional[str] = None, description: Optional[str] = None, ctx: ToolContext | None = None, ) -> str: """Create a one-time or recurring scheduled prompt and arm its timer. Handler for the ``schedule_prompt`` tool. Validates the request (exactly one of *schedule_time* or *cron_expression* must be supplied), persists a new prompt record, and immediately spawns the matching background timer so the prompt will fire without waiting for the next polling tick. Pulls the target ``channel_id`` and ``platform`` off the context and Redis off ``ctx.redis``. For a one-time prompt it resolves the time via ``_parse_schedule_time``, stores a ``one_time``/``scheduled`` record with ``_save``, and launches ``_run_one_time``; for a cron prompt it validates the expression via ``_validate_cron``, stores a ``cron``/``active`` record, and launches ``_run_cron`` -- each id is registered in ``_running_tasks`` and the task started with ``asyncio.create_task``. All failures are returned as JSON error strings rather than raised. Called by the tool dispatcher (``ToolRegistry.execute_tool``) via this module's ``TOOLS`` entry named ``schedule_prompt`` -- no direct internal callers. Args: prompt: The prompt text to schedule (required). schedule_time: A one-time schedule expression in any format ``_parse_schedule_time`` accepts. Mutually exclusive with *cron_expression*. cron_expression: A cron expression for a recurring prompt. Mutually exclusive with *schedule_time*. description: Optional human-readable label stored on the record. ctx: The :class:`~tool_context.ToolContext`; supplies ``channel_id``, ``platform`` and ``redis``. Returns: str: A JSON string -- on success ``{"success": True, "message": ..., "prompt_id": str}``; on validation or runtime failure ``{"error": ...}``. """ cid = ctx.channel_id plat = ctx.platform if not prompt: return json.dumps( { "error": "prompt is required", } ) if not schedule_time and not cron_expression: return json.dumps( { "error": ( "Either schedule_time or " "cron_expression must be provided" ), } ) if schedule_time and cron_expression: return json.dumps( { "error": ("Cannot specify both schedule_time " "and cron_expression"), } ) r = getattr(ctx, "redis", None) if r is None: return json.dumps( { "error": "Redis not available", } ) pid = str(uuid.uuid4()) try: if schedule_time: dt = _parse_schedule_time(schedule_time) if not dt: return json.dumps( { "error": ("Invalid schedule_time format: " + schedule_time), } ) pd = { "id": pid, "platform": plat, "channel_id": cid, "prompt": prompt, "type": "one_time", "schedule_time": dt.isoformat(), "description": description or "", "created_at": (datetime.now().isoformat()), "status": "scheduled", } await _save(r, pd) _running_tasks.add(pid) asyncio.create_task( _run_one_time( r, pid, plat, cid, prompt, dt, ) ) return json.dumps( { "success": True, "message": ( "Prompt scheduled for " + dt.strftime("%Y-%m-%d %H:%M:%S") ), "prompt_id": pid, } ) else: if not _validate_cron(cron_expression): return json.dumps( { "error": ("Invalid cron expression: " + cron_expression), } ) pd = { "id": pid, "platform": plat, "channel_id": cid, "prompt": prompt, "type": "cron", "cron_expression": cron_expression, "description": description or "", "created_at": (datetime.now().isoformat()), "status": "active", "last_execution": None, } await _save(r, pd) _running_tasks.add(pid) asyncio.create_task( _run_cron( r, pid, plat, cid, prompt, cron_expression, ) ) return json.dumps( { "success": True, "message": ( "Cron prompt scheduled with " "expression: " + cron_expression ), "prompt_id": pid, } ) except Exception as e: logger.error("Error scheduling prompt: %s", e) return json.dumps( { "error": (f"Failed to schedule prompt: {e}"), } ) async def _list_scheduled_prompts( ctx: ToolContext | None = None, ) -> str: """List every scheduled prompt with its schedule and status. Handler for the ``list_scheduled_prompts`` tool. Enumerates all persisted prompts and projects each into a compact summary (id, channel, type, schedule_time/cron_expression, description, status, and the various timestamps) for display back to the model or user. Reads Redis off ``ctx.redis`` and loads records via ``_list_all``. Errors are returned as JSON error strings rather than raised. Called by the tool dispatcher (``ToolRegistry.execute_tool``) via this module's ``TOOLS`` entry named ``list_scheduled_prompts`` -- no direct internal callers. Args: ctx: The :class:`~tool_context.ToolContext`; ``ctx.redis`` supplies the client. Returns: str: An indented JSON string -- on success ``{"success": True, "count": int, "prompts": [...]}``; on failure ``{"error": ...}``. """ r = getattr(ctx, "redis", None) if r is None: return json.dumps( { "error": "Redis not available", } ) try: items = [] for pd in await _list_all(r): items.append( { "id": pd["id"], "channel_id": pd["channel_id"], "type": pd["type"], "schedule_time": pd.get( "schedule_time", ), "cron_expression": pd.get( "cron_expression", ), "description": pd.get( "description", "", ), "status": pd["status"], "created_at": pd["created_at"], "last_execution": pd.get( "last_execution", ), "executed_at": pd.get("executed_at"), } ) return json.dumps( { "success": True, "count": len(items), "prompts": items, }, indent=2, ) except Exception as e: logger.error( "Error listing scheduled prompts: %s", e, ) return json.dumps( { "error": f"Failed to list prompts: {e}", } ) async def _cancel_scheduled_prompt( prompt_id: str, ctx: ToolContext | None = None, ) -> str: """Cancel a scheduled prompt by id by marking it cancelled. Handler for the ``cancel_scheduled_prompt`` tool. Looks up the record and flips its status to ``cancelled`` with a ``cancelled_at`` timestamp; it does not delete the record (the retention sweep does that later). The running timer task is not torn down directly -- instead ``_run_cron`` and ``_run_one_time`` observe the changed status on their next Redis re-read and exit on their own. Reads Redis off ``ctx.redis``, loads the record via ``_load`` and persists the status change via ``_save``. A missing id and any error are returned as JSON error strings. Called by the tool dispatcher (``ToolRegistry.execute_tool``) via this module's ``TOOLS`` entry named ``cancel_scheduled_prompt`` -- no direct internal callers. Args: prompt_id: Id of the prompt to cancel. ctx: The :class:`~tool_context.ToolContext`; ``ctx.redis`` supplies the client. Returns: str: A JSON string -- on success ``{"success": True, "message": ..., "prompt_id": str}``; when not found or on error ``{"error": ...}``. """ r = getattr(ctx, "redis", None) if r is None: return json.dumps( { "error": "Redis not available", } ) try: pd = await _load(r, prompt_id) if not pd: return json.dumps( { "error": ("Scheduled prompt with ID " f"'{prompt_id}' not found"), } ) pd["status"] = "cancelled" pd["cancelled_at"] = datetime.now().isoformat() await _save(r, pd) return json.dumps( { "success": True, "message": (f"Scheduled prompt '{prompt_id}' " "has been cancelled"), "prompt_id": prompt_id, } ) except Exception as e: logger.error( "Error cancelling prompt: %s", e, ) return json.dumps( { "error": (f"Failed to cancel prompt: {e}"), } ) async def _send_immediate_prompt( prompt: str, channel_id: str = "", ctx: ToolContext | None = None, ) -> str: """Fire a prompt right now, bypassing scheduling and persistence. Handler for the ``send_immediate_prompt`` tool. Delivers the prompt straight away via ``_execute_prompt`` without creating or storing any scheduled record, for the case where the model wants to trigger a turn now rather than later. Uses the explicit *channel_id* when given, otherwise falls back to ``ctx.channel_id``, and reads the platform off ``ctx.platform``. Because it routes through ``_execute_prompt``, delivery depends on a bot runner having been installed via ``set_bot_runner`` -- under the current microservices layout that hook is unset, so this normally returns the runner-unavailable error. Failures are returned as JSON error strings. Called by the tool dispatcher (``ToolRegistry.execute_tool``) via this module's ``TOOLS`` entry named ``send_immediate_prompt`` -- no direct internal callers. Args: prompt: The prompt text to send (required). channel_id: Target channel id; defaults to ``ctx.channel_id`` when empty. ctx: The :class:`~tool_context.ToolContext`; supplies ``channel_id`` and ``platform``. Returns: str: A JSON string -- on success ``{"success": True, "message": ..., "channel_id": str}``; on failure ``{"error": ...}``. """ cid = channel_id or ctx.channel_id plat = ctx.platform if not prompt: return json.dumps( { "error": "prompt is required", } ) try: result = await _execute_prompt( plat, cid, prompt, ) return json.dumps( { "success": True, "message": result, "channel_id": cid, } ) except Exception as e: logger.error( "Error sending immediate prompt: %s", e, ) return json.dumps( { "error": f"Failed to send prompt: {e}", } ) async def _cleanup_expired_tool( days_to_keep: int = 30, ctx: ToolContext | None = None, ) -> str: """Tool-facing wrapper around the prompt retention sweep. Handler for the ``cleanup_expired_prompts`` tool. Pulls the Redis client off the context and delegates to the module-level ``cleanup_expired_prompts``, exposing the same purge of old executed/cancelled prompts as a tool the model can invoke on demand (the background scheduler also runs it independently). Reads Redis off ``ctx.redis`` and returns a Redis-unavailable error if it is absent. Called by the tool dispatcher (``ToolRegistry.execute_tool``) via this module's ``TOOLS`` entry named ``cleanup_expired_prompts`` -- no direct internal callers. Args: days_to_keep: Age threshold in days forwarded to ``cleanup_expired_prompts``. Defaults to ``30``. ctx: The :class:`~tool_context.ToolContext`; ``ctx.redis`` supplies the client. Returns: str: The JSON result string from ``cleanup_expired_prompts``, or a ``{"error": ...}`` JSON string when Redis is unavailable. """ r = getattr(ctx, "redis", None) if r is None: return json.dumps( { "error": "Redis not available", } ) return await cleanup_expired_prompts( r, days_to_keep=days_to_keep, ) # --------------------------------------------------------------- # Multi-tool registration # --------------------------------------------------------------- TOOLS = [ { "name": "schedule_prompt", "description": ( "Schedule a prompt to be sent to the " "LLM at a specified time or on a cron " "schedule. Supports relative (+30, +2:15), " "absolute (2025-09-19 14:30), time-of-day " "(14:30), and cron expressions " "(0 */2 * * *)." ), "parameters": { "type": "object", "properties": { "prompt": { "type": "string", "description": ("The prompt/message to send."), }, "schedule_time": { "type": "string", "description": ( "One-time execution time " "(see supported formats)." ), }, "cron_expression": { "type": "string", "description": ("Cron expression for " "recurring prompts."), }, "description": { "type": "string", "description": ("Optional description of the " "scheduled prompt."), }, }, "required": ["prompt"], }, "handler": _schedule_prompt, }, { "name": "list_scheduled_prompts", "description": ("List all currently scheduled prompts " "and their status."), "parameters": { "type": "object", "properties": {}, }, "handler": _list_scheduled_prompts, }, { "name": "cancel_scheduled_prompt", "description": ("Cancel a scheduled prompt by its ID."), "parameters": { "type": "object", "properties": { "prompt_id": { "type": "string", "description": ("The ID of the prompt " "to cancel."), }, }, "required": ["prompt_id"], }, "handler": _cancel_scheduled_prompt, }, { "name": "send_immediate_prompt", "description": ( "Send a prompt immediately to the LLM " "for execution (no scheduling)." ), "parameters": { "type": "object", "properties": { "prompt": { "type": "string", "description": ("The prompt/message to send."), }, "channel_id": { "type": "string", "description": ("Target channel " "(defaults to current)."), }, }, "required": ["prompt"], }, "handler": _send_immediate_prompt, }, { "name": "cleanup_expired_prompts", "description": ( "Clean up old executed and cancelled " "prompts from Redis storage." ), "parameters": { "type": "object", "properties": { "days_to_keep": { "type": "integer", "description": ( "Number of days to keep " "prompts before cleanup " "(default: 30)." ), }, }, }, "handler": _cleanup_expired_tool, }, ]