Source code for tools.scheduled_prompt

"""Cron-based scheduled prompt tools (v3)

Schedule prompts to be executed at specified times or cron
intervals. Prompts are executed directly via the bot runner's
message pipeline. Persists prompt metadata in Redis.

Module-level ``tick_scheduled_prompts`` and
``cleanup_expired_prompts`` are used by background_tasks.py
and accept a ``redis`` client directly.
"""

from __future__ import annotations

import asyncio
import json
import logging
import re
import uuid
from datetime import datetime, timedelta
from typing import (
    Any, Dict, List, Optional, TYPE_CHECKING,
)

from croniter import croniter

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

_bot_runner: Any = None


def set_bot_runner(runner: Any) -> None:
    """Register the bot runner used for direct prompt execution.

    The reference is kept in the module-level ``_bot_runner`` global.
    Intended to be called once at startup from ``main.py``.

    Args:
        runner (Any): The :class:`BotRunner` instance to store.
    """
    # Equivalent to a ``global`` statement followed by an assignment.
    globals()["_bot_runner"] = runner
_PREFIX = "stargazer:scheduled_prompts:"
_INDEX = "stargazer:scheduled_prompts:index"

# Statuses under which a stored prompt is still allowed to fire.
_LIVE_STATUSES = ("active", "scheduled")

# ---------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------


def _pkey(prompt_id: str) -> str:
    """Build the Redis key under which one prompt record is stored.

    Args:
        prompt_id (str): The prompt id value.

    Returns:
        str: ``_PREFIX`` followed by ``prompt_id``.
    """
    return f"{_PREFIX}{prompt_id}"


async def _save(r, pd: Dict[str, Any]) -> None:
    """Persist a prompt record and register its id in the index set.

    Args:
        r: Async Redis client (``set`` and ``sadd`` are awaited).
        pd (Dict[str, Any]): Prompt record; must contain ``"id"``.
    """
    pid = pd["id"]
    await r.set(_pkey(pid), json.dumps(pd))
    await r.sadd(_INDEX, pid)


async def _load(r, pid: str) -> Optional[Dict[str, Any]]:
    """Fetch and decode one prompt record.

    Args:
        r: Async Redis client (``get`` is awaited).
        pid (str): The prompt id value.

    Returns:
        Optional[Dict[str, Any]]: The decoded record, or ``None`` when
        nothing is stored under the key.
    """
    raw = await r.get(_pkey(pid))
    return json.loads(raw) if raw else None


async def _delete(r, pid: str) -> None:
    """Remove a prompt record and drop its id from the index set.

    Args:
        r: Async Redis client (``delete`` and ``srem`` are awaited).
        pid (str): The prompt id value.
    """
    await r.delete(_pkey(pid))
    await r.srem(_INDEX, pid)


async def _list_all(r) -> List[Dict[str, Any]]:
    """Load every prompt record whose id is present in the index set.

    Ids in the index that no longer have a stored record are skipped.

    Args:
        r: Async Redis client (``smembers`` is awaited).

    Returns:
        List[Dict[str, Any]]: The decoded records, in index iteration
        order.
    """
    ids = await r.smembers(_INDEX)
    out: list[dict] = []
    for pid in ids:
        pd = await _load(r, pid)
        if pd:
            out.append(pd)
    return out


async def _execute_prompt(
    platform: str,
    channel_id: str,
    prompt: str,
    prompt_id: Optional[str] = None,
) -> str:
    """Execute a scheduled prompt directly via the bot runner's
    message pipeline, bypassing any HTTP webhook round-trip.

    Failures are reported through the return value; exceptions raised
    by the runner are caught and logged.

    Args:
        platform (str): Platform name passed to the runner.
        channel_id (str): Target channel passed to the runner.
        prompt (str): The prompt text to execute.
        prompt_id (Optional[str]): Id of the stored prompt, if any
            (used for logging and included in the event payload).

    Returns:
        str: ``"Prompt sent successfully"`` on success, otherwise a
        string starting with ``"Error: "``.
    """
    if _bot_runner is None:
        logger.error(
            "Cannot execute scheduled prompt %s: "
            "bot runner not set",
            prompt_id,
        )
        return "Error: bot runner not available"
    try:
        event_data = {
            "type": "scheduled_prompt_execution",
            "prompt": prompt,
            "prompt_id": prompt_id,
        }
        await _bot_runner.handle_webhook(
            platform, channel_id, event_data,
        )
        return "Prompt sent successfully"
    except Exception as e:
        logger.error(
            "Error executing scheduled prompt %s: %s",
            prompt_id, e,
        )
        return f"Error: {e}"


def _parse_schedule_time(
    schedule_time: str,
) -> Optional[datetime]:
    """Parse a user-supplied schedule time into a naive local datetime.

    Accepted forms, tried in this order:

    * ``+H:M``, ``+2h30m`` / ``+45min``, ``+30`` (minutes) - offset
      from now.
    * ``in 2 hours``, ``in 10 min``, ``in 15`` (minutes) - offset from
      now.
    * ``HH:MM`` or ``HH:MM:SS`` - today at that time, or tomorrow if
      that time has already passed.
    * ISO 8601 (a space may replace ``T``; a trailing ``Z`` means
      UTC). Aware values are converted to local time and made naive.
    * ``YYYY-MM-DD HH:MM[:SS]``.

    Args:
        schedule_time (str): The schedule time value.

    Returns:
        Optional[datetime]: The resolved time, or ``None`` when the
        input is empty or matches no supported format (including an
        out-of-range time of day such as ``25:99``).
    """
    s = (schedule_time or "").strip()
    if not s:
        return None
    now = datetime.now()

    if s.startswith("+"):
        p = s[1:].strip()
        if ":" in p:
            parts = p.split(":", 1)
            try:
                h = int(parts[0].strip() or 0)
                m = int(parts[1].strip() or 0)
                return now + timedelta(hours=h, minutes=m)
            except (ValueError, OverflowError):
                # Not a usable H:M pair; try the remaining formats.
                pass
        m = re.match(
            r"^(?:(\d+)\s*h(?:ours?)?)?"
            r"\s*(?:(\d+)\s*m(?:in(?:utes?)?)?)?$",
            p,
            re.IGNORECASE,
        )
        if m and (m.group(1) or m.group(2)):
            return now + timedelta(
                hours=int(m.group(1) or 0),
                minutes=int(m.group(2) or 0),
            )
        if p.isdigit():
            return now + timedelta(minutes=int(p))

    lower = s.lower()
    if lower.startswith("in "):
        hours = minutes = 0
        mh = re.search(r"(\d+)\s*h(?:ours?|rs?)?", lower)
        if mh:
            hours = int(mh.group(1))
        mm = re.search(r"(\d+)\s*m(?:in(?:utes?)?)?", lower)
        if mm:
            minutes = int(mm.group(1))
        if not hours and not minutes:
            # A bare number ("in 15") is taken as minutes.
            mn = re.search(r"in\s+(\d+)\s*$", lower)
            if mn:
                minutes = int(mn.group(1))
        if hours or minutes:
            return now + timedelta(hours=hours, minutes=minutes)

    if re.match(r"^\d{2}:\d{2}(?::\d{2})?$", s):
        fmt = "%H:%M:%S" if s.count(":") == 2 else "%H:%M"
        try:
            pt = datetime.strptime(s, fmt).time()
        except ValueError:
            # Shape matched but the values are out of range (e.g.
            # "25:99"): report "unparseable" like every other bad
            # input instead of leaking the ValueError to the caller.
            return None
        dt = datetime.combine(now.date(), pt)
        if dt < now:
            dt += timedelta(days=1)
        return dt

    iso = s.replace(" ", "T")
    if iso.endswith("Z"):
        iso = iso[:-1] + "+00:00"
    try:
        dt = datetime.fromisoformat(iso)
        if dt.tzinfo is not None:
            dt = dt.astimezone().replace(tzinfo=None)
        return dt
    except (ValueError, OverflowError, OSError):
        pass

    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            continue
    return None


def _validate_cron(expr: str) -> bool:
    """Check whether ``croniter`` accepts a cron expression.

    Args:
        expr (str): The expr value.

    Returns:
        bool: ``True`` if ``croniter(expr)`` can be constructed,
        ``False`` if it raises.
    """
    try:
        croniter(expr)
        return True
    except Exception:
        return False


# ---------------------------------------------------------------
# Background asyncio task runners
# ---------------------------------------------------------------


async def _run_cron(
    r,
    pid: str,
    platform: str,
    cid: str,
    prompt: str,
    cron_expr: str,
) -> None:
    """Run a cron-scheduled prompt in a long-lived asyncio task.

    The loop ends when the stored record disappears or its status is
    no longer one of ``_LIVE_STATUSES``. Any exception is logged and
    ends the task.

    Args:
        r: Async Redis client.
        pid (str): Id of the stored prompt record.
        platform (str): Platform name passed to ``_execute_prompt``.
        cid (str): Channel id passed to ``_execute_prompt``.
        prompt (str): The prompt text to execute.
        cron_expr (str): Cron expression driving the schedule.
    """
    try:
        cron = croniter(cron_expr, datetime.now())
        while True:
            pd = await _load(r, pid)
            if not pd or pd.get("status") not in _LIVE_STATUSES:
                break
            nxt = cron.get_next(datetime)
            wait = (nxt - datetime.now()).total_seconds()
            if wait > 0:
                await asyncio.sleep(wait)
            # Re-read after sleeping: the record may have been
            # cancelled, deleted, or already fired in the meantime.
            pd = await _load(r, pid)
            if not pd or pd.get("status") not in _LIVE_STATUSES:
                break
            last = pd.get("last_execution")
            if last and datetime.fromisoformat(last) >= nxt:
                # Something else already recorded a run for this
                # slot; do not fire it a second time.
                continue
            # Record the run before awaiting the (possibly slow)
            # execution so a concurrent due-check does not see this
            # slot as still pending.
            pd["last_execution"] = datetime.now().isoformat()
            await _save(r, pd)
            await _execute_prompt(platform, cid, prompt, pid)
    except Exception as e:
        logger.error(
            "Error in cron scheduler for %s: %s", pid, e,
        )


async def _run_one_time(
    r,
    pid: str,
    platform: str,
    cid: str,
    prompt: str,
    sched: datetime,
) -> None:
    """Run a one-time scheduled prompt after sleeping until due.

    The prompt fires only if its stored status is still
    ``"scheduled"`` when the task wakes up, so a prompt that was
    cancelled or already marked ``"executed"`` is not sent again.

    Args:
        r: Async Redis client.
        pid (str): Id of the stored prompt record.
        platform (str): Platform name passed to ``_execute_prompt``.
        cid (str): Channel id passed to ``_execute_prompt``.
        prompt (str): The prompt text to execute.
        sched (datetime): Naive local time at which to fire.
    """
    try:
        wait = (sched - datetime.now()).total_seconds()
        if wait > 0:
            await asyncio.sleep(wait)
        pd = await _load(r, pid)
        if not pd or pd.get("status") != "scheduled":
            return
        # Claim the prompt before awaiting the execution so that a
        # concurrent due-check cannot pick it up as well.
        pd["status"] = "executed"
        pd["executed_at"] = datetime.now().isoformat()
        await _save(r, pd)
        await _execute_prompt(platform, cid, prompt, pid)
    except Exception as e:
        logger.error(
            "Error executing prompt %s: %s", pid, e,
        )


# ---------------------------------------------------------------
# Module-level functions (background_tasks.py)
# ---------------------------------------------------------------
# Strong references to in-flight prompt executions started by the
# tick. The event loop itself only keeps weak references to tasks, so
# an unreferenced task can be garbage-collected before it finishes.
_TICK_TASKS: set = set()


def _spawn_tracked(coro) -> None:
    """Start *coro* as a task and hold a reference until it is done."""
    task = asyncio.create_task(coro)
    _TICK_TASKS.add(task)
    task.add_done_callback(_TICK_TASKS.discard)


def _one_time_is_due(pd: Dict[str, Any], now: datetime) -> bool:
    """Return True if the record's ``schedule_time`` is at or before *now*."""
    st = pd.get("schedule_time")
    if not st:
        return False
    return datetime.fromisoformat(st) <= now


def _cron_is_due(pd: Dict[str, Any], now: datetime) -> bool:
    """Return True if a cron slot has elapsed that has not been run yet."""
    ce = pd.get("cron_expression")
    if not ce:
        return False
    prev = croniter(ce, now).get_prev(datetime)
    last = pd.get("last_execution")
    if last:
        return datetime.fromisoformat(last) < prev
    # Never run before: only fire for slots that fall at or after the
    # record's creation time.
    ca = pd.get("created_at")
    cdt = datetime.fromisoformat(ca) if ca else now
    return prev >= cdt


async def tick_scheduled_prompts(redis) -> None:
    """Check for due prompts and fire them.

    Every stored record with status ``"scheduled"`` or ``"active"`` is
    examined. Due one-time prompts are started and marked
    ``"executed"``; cron prompts with an unprocessed elapsed slot are
    started and get ``last_execution`` set to the tick time. Errors on
    one record are logged and do not stop processing of the others.

    Args:
        redis: Async Redis client holding the prompt records.
    """
    try:
        now = datetime.now()
        for pd in await _list_all(redis):
            status = pd.get("status")
            if status not in ("scheduled", "active"):
                continue
            plat = pd.get("platform") or "discord"
            # ``.get`` so a record without a type is skipped instead
            # of aborting the tick for every remaining prompt.
            kind = pd.get("type")
            pid = pd.get("id")

            if kind == "one_time" and status == "scheduled":
                try:
                    if not _one_time_is_due(pd, now):
                        continue
                    _spawn_tracked(
                        _execute_prompt(
                            plat,
                            pd["channel_id"],
                            pd["prompt"],
                            pd["id"],
                        )
                    )
                    pd["status"] = "executed"
                    pd["executed_at"] = now.isoformat()
                    await _save(redis, pd)
                except Exception as e:
                    logger.error(
                        "Schedule parse error for "
                        "%s: %s",
                        pid, e,
                    )
            elif kind == "cron":
                try:
                    if not _cron_is_due(pd, now):
                        continue
                    _spawn_tracked(
                        _execute_prompt(
                            plat,
                            pd["channel_id"],
                            pd["prompt"],
                            pd["id"],
                        )
                    )
                    pd["last_execution"] = now.isoformat()
                    await _save(redis, pd)
                except Exception as e:
                    logger.error(
                        "Cron error for %s: %s",
                        pid, e,
                    )
    except Exception as e:
        logger.error(
            "Scheduled prompt tick error: %s", e,
        )
async def cleanup_expired_prompts(
    redis,
    *,
    days_to_keep: int = 30,
) -> str:
    """Remove old executed/cancelled prompts.

    A record is deleted when its status is ``"executed"`` or
    ``"cancelled"`` and its ``created_at`` timestamp is older than
    ``days_to_keep`` days. Records without ``created_at`` are left
    alone; an unparseable timestamp is logged as a warning.

    Args:
        redis: Async Redis client holding the prompt records.
        days_to_keep (int): Age threshold in days.

    Returns:
        str: JSON text - a success summary with the number of removed
        records, or an ``error`` object if the cleanup itself failed.
    """
    finished_states = ("executed", "cancelled")
    try:
        threshold = datetime.now() - timedelta(days=days_to_keep)
        removed = 0
        for entry in await _list_all(redis):
            created = entry.get("created_at")
            if entry.get("status") not in finished_states or not created:
                continue
            try:
                is_expired = datetime.fromisoformat(created) < threshold
                if is_expired:
                    await _delete(redis, entry["id"])
                    removed += 1
            except Exception as e:
                logger.warning(
                    "Parse error for prompt %s: %s",
                    entry["id"], e,
                )
        summary = {
            "success": True,
            "message": f"Cleaned up {removed} expired prompts",
            "days_kept": days_to_keep,
        }
        return json.dumps(summary)
    except Exception as e:
        logger.error("Prompt cleanup error: %s", e)
        failure = {"error": f"Failed to cleanup prompts: {e}"}
        return json.dumps(failure)
# ---------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------

# Strong references to the background runner tasks started by
# ``_schedule_prompt``. The event loop only keeps weak references to
# tasks, so an unreferenced task can be garbage-collected before it
# finishes.
_SCHEDULER_TASKS: set = set()


def _spawn_background(coro) -> None:
    """Start *coro* as a task and hold a reference until it is done."""
    task = asyncio.create_task(coro)
    _SCHEDULER_TASKS.add(task)
    task.add_done_callback(_SCHEDULER_TASKS.discard)


async def _schedule_prompt(
    prompt: str,
    schedule_time: Optional[str] = None,
    cron_expression: Optional[str] = None,
    description: Optional[str] = None,
    ctx: ToolContext | None = None,
) -> str:
    """Store a new one-time or cron prompt and start its runner task.

    Exactly one of ``schedule_time`` and ``cron_expression`` must be
    given. The record is saved to Redis under a fresh UUID and a
    background task (``_run_one_time`` or ``_run_cron``) is started.

    Args:
        prompt (str): The prompt value.
        schedule_time (Optional[str]): One-time execution time, parsed
            by ``_parse_schedule_time``.
        cron_expression (Optional[str]): Cron expression for a
            recurring prompt.
        description (Optional[str]): Human-readable description.
        ctx (ToolContext | None): Tool execution context; ``redis``,
            ``channel_id`` and ``platform`` are read from it.

    Returns:
        str: JSON text - ``success``/``message``/``prompt_id`` on
        success, otherwise an ``error`` object.
    """
    if not prompt:
        return json.dumps({
            "error": "prompt is required",
        })
    if not schedule_time and not cron_expression:
        return json.dumps({
            "error": (
                "Either schedule_time or "
                "cron_expression must be provided"
            ),
        })
    if schedule_time and cron_expression:
        return json.dumps({
            "error": (
                "Cannot specify both schedule_time "
                "and cron_expression"
            ),
        })

    r = getattr(ctx, "redis", None)
    if r is None:
        return json.dumps({
            "error": "Redis not available",
        })

    # Only touch ctx attributes after the check above: ctx defaults to
    # None, and a missing context is reported as "Redis not available"
    # rather than raising AttributeError.
    cid = ctx.channel_id
    plat = ctx.platform

    pid = str(uuid.uuid4())
    try:
        if schedule_time:
            dt = _parse_schedule_time(schedule_time)
            if not dt:
                return json.dumps({
                    "error": (
                        "Invalid schedule_time format: "
                        + schedule_time
                    ),
                })
            pd = {
                "id": pid,
                "platform": plat,
                "channel_id": cid,
                "prompt": prompt,
                "type": "one_time",
                "schedule_time": dt.isoformat(),
                "description": description or "",
                "created_at": datetime.now().isoformat(),
                "status": "scheduled",
            }
            await _save(r, pd)
            _spawn_background(
                _run_one_time(r, pid, plat, cid, prompt, dt)
            )
            return json.dumps({
                "success": True,
                "message": (
                    "Prompt scheduled for "
                    + dt.strftime("%Y-%m-%d %H:%M:%S")
                ),
                "prompt_id": pid,
            })

        if not _validate_cron(cron_expression):
            return json.dumps({
                "error": (
                    "Invalid cron expression: "
                    + cron_expression
                ),
            })
        pd = {
            "id": pid,
            "platform": plat,
            "channel_id": cid,
            "prompt": prompt,
            "type": "cron",
            "cron_expression": cron_expression,
            "description": description or "",
            "created_at": datetime.now().isoformat(),
            "status": "active",
            "last_execution": None,
        }
        await _save(r, pd)
        _spawn_background(
            _run_cron(r, pid, plat, cid, prompt, cron_expression)
        )
        return json.dumps({
            "success": True,
            "message": (
                "Cron prompt scheduled with "
                "expression: " + cron_expression
            ),
            "prompt_id": pid,
        })
    except Exception as e:
        logger.error("Error scheduling prompt: %s", e)
        return json.dumps({
            "error": f"Failed to schedule prompt: {e}",
        })


async def _list_scheduled_prompts(
    ctx: ToolContext | None = None,
) -> str:
    """Return every stored prompt record as a JSON listing.

    Fields missing from a record are reported as ``null`` (or ``""``
    for ``description``) so one incomplete record does not fail the
    whole listing.

    Args:
        ctx (ToolContext | None): Tool execution context; ``redis`` is
            read from it.

    Returns:
        str: JSON text - ``success``/``count``/``prompts`` on success,
        otherwise an ``error`` object.
    """
    r = getattr(ctx, "redis", None)
    if r is None:
        return json.dumps({
            "error": "Redis not available",
        })
    try:
        items = [
            {
                "id": pd.get("id"),
                "channel_id": pd.get("channel_id"),
                "type": pd.get("type"),
                "schedule_time": pd.get("schedule_time"),
                "cron_expression": pd.get("cron_expression"),
                "description": pd.get("description", ""),
                "status": pd.get("status"),
                "created_at": pd.get("created_at"),
                "last_execution": pd.get("last_execution"),
                "executed_at": pd.get("executed_at"),
            }
            for pd in await _list_all(r)
        ]
        return json.dumps({
            "success": True,
            "count": len(items),
            "prompts": items,
        }, indent=2)
    except Exception as e:
        logger.error(
            "Error listing scheduled prompts: %s", e,
        )
        return json.dumps({
            "error": f"Failed to list prompts: {e}",
        })


async def _cancel_scheduled_prompt(
    prompt_id: str,
    ctx: ToolContext | None = None,
) -> str:
    """Mark a stored prompt as cancelled.

    The record is kept in Redis with ``status`` set to ``"cancelled"``
    and a ``cancelled_at`` timestamp.

    Args:
        prompt_id (str): The prompt id value.
        ctx (ToolContext | None): Tool execution context; ``redis`` is
            read from it.

    Returns:
        str: JSON text - ``success``/``message``/``prompt_id`` on
        success, otherwise an ``error`` object.
    """
    r = getattr(ctx, "redis", None)
    if r is None:
        return json.dumps({
            "error": "Redis not available",
        })
    try:
        pd = await _load(r, prompt_id)
        if not pd:
            return json.dumps({
                "error": (
                    "Scheduled prompt with ID "
                    f"'{prompt_id}' not found"
                ),
            })
        pd["status"] = "cancelled"
        pd["cancelled_at"] = datetime.now().isoformat()
        await _save(r, pd)
        return json.dumps({
            "success": True,
            "message": (
                f"Scheduled prompt '{prompt_id}' "
                "has been cancelled"
            ),
            "prompt_id": prompt_id,
        })
    except Exception as e:
        logger.error(
            "Error cancelling prompt: %s", e,
        )
        return json.dumps({
            "error": f"Failed to cancel prompt: {e}",
        })


async def _send_immediate_prompt(
    prompt: str,
    channel_id: str = "",
    ctx: ToolContext | None = None,
) -> str:
    """Execute a prompt immediately (no scheduling).

    Args:
        prompt (str): The prompt text to execute.
        channel_id (str): Target channel; falls back to
            ``ctx.channel_id`` when empty.
        ctx (ToolContext | None): Tool execution context;
            ``platform`` (and possibly ``channel_id``) is read from it.

    Returns:
        str: JSON text - ``success``/``message``/``channel_id`` when
        the prompt was sent, otherwise an ``error`` object.
    """
    if not prompt:
        return json.dumps({
            "error": "prompt is required",
        })
    if ctx is None:
        # ctx defaults to None; without it the platform is unknown.
        return json.dumps({
            "error": "Tool context not available",
        })
    cid = channel_id or ctx.channel_id
    plat = ctx.platform
    try:
        result = await _execute_prompt(plat, cid, prompt)
        if result.startswith("Error:"):
            # _execute_prompt reports failures through its return
            # value; do not present them to the caller as a success.
            return json.dumps({
                "error": result,
                "channel_id": cid,
            })
        return json.dumps({
            "success": True,
            "message": result,
            "channel_id": cid,
        })
    except Exception as e:
        logger.error(
            "Error sending immediate prompt: %s", e,
        )
        return json.dumps({
            "error": f"Failed to send prompt: {e}",
        })


async def _cleanup_expired_tool(
    days_to_keep: int = 30,
    ctx: ToolContext | None = None,
) -> str:
    """Tool wrapper around ``cleanup_expired_prompts``.

    Args:
        days_to_keep (int): The days to keep value.
        ctx (ToolContext | None): Tool execution context; ``redis`` is
            read from it.

    Returns:
        str: JSON text returned by ``cleanup_expired_prompts``, or an
        ``error`` object when no Redis client is available.
    """
    r = getattr(ctx, "redis", None)
    if r is None:
        return json.dumps({
            "error": "Redis not available",
        })
    return await cleanup_expired_prompts(
        r, days_to_keep=days_to_keep,
    )


# ---------------------------------------------------------------
# Multi-tool registration
# ---------------------------------------------------------------

TOOLS = [
    {
        "name": "schedule_prompt",
        "description": (
            "Schedule a prompt to be sent to the "
            "LLM at a specified time or on a cron "
            "schedule. Supports relative (+30, +2:15), "
            "absolute (2025-09-19 14:30), time-of-day "
            "(14:30), and cron expressions "
            "(0 */2 * * *)."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "prompt": {
                    "type": "string",
                    "description": (
                        "The prompt/message to send."
                    ),
                },
                "schedule_time": {
                    "type": "string",
                    "description": (
                        "One-time execution time "
                        "(see supported formats)."
                    ),
                },
                "cron_expression": {
                    "type": "string",
                    "description": (
                        "Cron expression for "
                        "recurring prompts."
                    ),
                },
                "description": {
                    "type": "string",
                    "description": (
                        "Optional description of the "
                        "scheduled prompt."
                    ),
                },
            },
            "required": ["prompt"],
        },
        "handler": _schedule_prompt,
    },
    {
        "name": "list_scheduled_prompts",
        "description": (
            "List all currently scheduled prompts "
            "and their status."
        ),
        "parameters": {
            "type": "object",
            "properties": {},
        },
        "handler": _list_scheduled_prompts,
    },
    {
        "name": "cancel_scheduled_prompt",
        "description": (
            "Cancel a scheduled prompt by its ID."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "prompt_id": {
                    "type": "string",
                    "description": (
                        "The ID of the prompt "
                        "to cancel."
                    ),
                },
            },
            "required": ["prompt_id"],
        },
        "handler": _cancel_scheduled_prompt,
    },
    {
        "name": "send_immediate_prompt",
        "description": (
            "Send a prompt immediately to the LLM "
            "for execution (no scheduling)."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "prompt": {
                    "type": "string",
                    "description": (
                        "The prompt/message to send."
                    ),
                },
                "channel_id": {
                    "type": "string",
                    "description": (
                        "Target channel "
                        "(defaults to current)."
                    ),
                },
            },
            "required": ["prompt"],
        },
        "handler": _send_immediate_prompt,
    },
    {
        "name": "cleanup_expired_prompts",
        "description": (
            "Clean up old executed and cancelled "
            "prompts from Redis storage."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "days_to_keep": {
                    "type": "integer",
                    "description": (
                        "Number of days to keep "
                        "prompts before cleanup "
                        "(default: 30)."
                    ),
                },
            },
        },
        "handler": _cleanup_expired_tool,
    },
]