Source code for tools.short_term_notes

"""Per-channel ephemeral short-term notes (v3)

Notes are stored in a Redis sorted set with configurable TTL.
They are internal/secret and auto-injected into context.
"""

from __future__ import annotations

import jsonutil as json
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

NOTES_KEY_PREFIX = "stargazer:notes:channel:"

TTL_PRESETS = {
    "1hour": 3600,
    "1day": 86400,
    "1week": 604800,
}


async def _apply_note_key_expire(
    redis_client,
    note_key: str,
    ttl_seconds: int,
    *,
    merge_key_ttl: bool,
) -> None:
    """Set or extend the Redis TTL on a channel's short-term-notes sorted set.

    Applies ``EXPIRE`` to the zset that holds one channel's notes so the whole
    note collection self-expires. When ``merge_key_ttl`` is false it simply
    overwrites the TTL; when true it preserves the longest-lived note by reading
    the current ``TTL`` and keeping the larger of the existing and requested
    durations (so appending a short-lived note never shortens a longer-lived
    one).

    Touches only the single notes key via the supplied async Redis client
    (``EXPIRE`` and, in merge mode, ``TTL``); it does no other I/O. Called by
    :func:`store_short_term_note_for_channel` and the ``write_short_term_note``
    tool handler :func:`_write_short_term_note` right after they add a note to
    the zset.

    Args:
        redis_client: Async Redis client used for the ``EXPIRE``/``TTL`` calls.
        note_key (str): The fully-qualified ``stargazer:notes:channel:`` key.
        ttl_seconds (int): Desired time-to-live in seconds.
        merge_key_ttl (bool): When true, keep whichever TTL is longer (the
            existing one or ``ttl_seconds``) instead of overwriting.
    """
    if not merge_key_ttl:
        await redis_client.expire(note_key, ttl_seconds)
        return
    cur = await redis_client.ttl(note_key)
    if cur is None or cur < 0:
        await redis_client.expire(note_key, ttl_seconds)
    else:
        await redis_client.expire(note_key, max(int(cur), ttl_seconds))


[docs] async def store_short_term_note_for_channel( redis_client, channel_id: str, note_content: str, ttl_preset: str = "1day", *, merge_key_ttl: bool = False, ) -> tuple[str, str] | None: """Append one short-term note for *channel_id* using the standard Redis encoding. Returns ``(member_json, note_id)`` for optional precise removal via :func:`remove_short_term_note_member`, or ``None`` when *ttl_preset* is invalid. """ if ttl_preset not in TTL_PRESETS: return None ttl_seconds = TTL_PRESETS[ttl_preset] cid = str(channel_id) ts = datetime.now(timezone.utc) note_data = { "content": note_content, "channel_id": cid, "timestamp": ts.isoformat(), "timestamp_unix": int(ts.timestamp()), "ttl_preset": ttl_preset, "ttl_seconds": ttl_seconds, "message_id": f"note_{int(ts.timestamp() * 1000000)}", "is_short_term_note": True, } member = json.dumps(note_data) note_key = f"{NOTES_KEY_PREFIX}{cid}" score = float(note_data["timestamp_unix"]) await redis_client.zadd(note_key, {member: score}) await _apply_note_key_expire( redis_client, note_key, ttl_seconds, merge_key_ttl=merge_key_ttl, ) return member, note_data["message_id"]
[docs] async def remove_short_term_note_member( redis_client, channel_id: str, member_json: str, ) -> None: """Remove one specific short-term note from a channel by its exact member. Deletes a single note whose canonical JSON string (the value returned alongside the note id by :func:`store_short_term_note_for_channel`) is used as the sorted-set member, enabling precise removal of just that note rather than clearing the channel. If removing it empties the zset, the key is deleted outright so no empty container lingers in Redis. Operates on the channel notes key via the async Redis client (``ZREM``, then ``ZCARD`` and a conditional ``DELETE``); no other state is touched. Defined for callers that stored a note and kept the member string for later cleanup; no in-repo callers invoke it directly at present. Args: redis_client: Async Redis client used for the zset operations. channel_id (str): Channel whose notes key to operate on (coerced to str). member_json (str): The exact JSON member string to remove from the zset. """ note_key = f"{NOTES_KEY_PREFIX}{str(channel_id)}" await redis_client.zrem(note_key, member_json) if await redis_client.zcard(note_key) == 0: await redis_client.delete(note_key)
[docs] async def format_channel_short_term_notes_for_prompt( redis_client, channel_id: str, ) -> str: """Render a channel's live short-term notes as prompt-ready text. Reads every unexpired note for the channel and formats each one as a ``[Short-term Note]:`` line so the result can be dropped straight into an LLM prompt. Notes with empty or missing content, and members that fail to parse as JSON, are skipped; blocks are separated by blank lines and an empty string is returned when the channel has no notes. Reads the channel notes zset via the async Redis client (``ZRANGE`` with scores) and performs no writes. Called by :func:`tools.admin_whisper._admin_whisper` to fold any existing channel notes into the secret admin-whisper prompt before the LLM call. Args: redis_client: Async Redis client used to read the notes zset. channel_id (str): Channel whose notes to format (coerced to str). Returns: str: Newline-separated ``[Short-term Note]:`` lines, or ``""`` when the channel has no active notes. """ note_key = f"{NOTES_KEY_PREFIX}{str(channel_id)}" raw = await redis_client.zrange(note_key, 0, -1, withscores=True) if not raw: return "" lines: list[str] = [] for note_json, _score in raw: try: nd = json.loads(note_json) content = nd.get("content") if content is not None and str(content).strip(): lines.append(f"[Short-term Note]: {content}") except json.JSONDecodeError: continue return "\n\n".join(lines)
async def _redis(ctx): """Internal helper: redis. Args: ctx: Tool execution context providing access to bot internals. """ r = getattr(ctx, "redis", None) if r is None: raise RuntimeError("Redis not available") return r # --------------------------------------------------------------- # Tool handlers # --------------------------------------------------------------- async def _write_short_term_note( note_content: str, ttl_preset: str = "1day", ctx: ToolContext | None = None, ) -> str: """Internal helper: write short term note. Args: note_content (str): The note content value. ttl_preset (str): The ttl preset value. ctx (ToolContext | None): Tool execution context providing access to bot internals. Returns: str: Result string. """ if ttl_preset not in TTL_PRESETS: return json.dumps( { "success": False, "error": ( "Invalid TTL preset. Must be one of: " + ", ".join(TTL_PRESETS.keys()) ), } ) ttl_seconds = TTL_PRESETS[ttl_preset] r = await _redis(ctx) cid = ctx.channel_id ts = datetime.now(timezone.utc) note_data = { "content": note_content, "channel_id": str(cid), "timestamp": ts.isoformat(), "timestamp_unix": int(ts.timestamp()), "ttl_preset": ttl_preset, "ttl_seconds": ttl_seconds, "message_id": (f"note_{int(ts.timestamp() * 1000000)}"), "is_short_term_note": True, } note_key = f"{NOTES_KEY_PREFIX}{cid}" score = float(note_data["timestamp_unix"]) member = json.dumps(note_data) await r.zadd(note_key, {member: score}) await _apply_note_key_expire( r, note_key, ttl_seconds, merge_key_ttl=False, ) return json.dumps( { "success": True, "message": (f"Note stored successfully with " f"{ttl_preset} TTL"), "note_id": note_data["message_id"], "expires_in_seconds": ttl_seconds, } ) async def _read_short_term_notes( ctx: ToolContext | None = None, ) -> str: """Internal helper: read short term notes. Args: ctx (ToolContext | None): Tool execution context providing access to bot internals. Returns: str: Result string. """ r = await _redis(ctx) cid = ctx.channel_id note_key = f"{NOTES_KEY_PREFIX}{cid}" raw = await r.zrange(note_key, 0, -1, withscores=True) if not raw: return json.dumps( { "success": True, "notes": [], "count": 0, "message": ("No active notes found for this channel"), } ) notes = [] for note_json, _score in raw: try: nd = json.loads(note_json) notes.append( { "content": nd.get("content"), "timestamp": nd.get("timestamp"), "ttl_preset": nd.get("ttl_preset"), "note_id": nd.get("message_id"), } ) except json.JSONDecodeError: continue return json.dumps( { "success": True, "notes": notes, "count": len(notes), } ) async def _clear_short_term_notes( ctx: ToolContext | None = None, ) -> str: """Internal helper: clear short term notes. Args: ctx (ToolContext | None): Tool execution context providing access to bot internals. Returns: str: Result string. """ r = await _redis(ctx) cid = ctx.channel_id note_key = f"{NOTES_KEY_PREFIX}{cid}" deleted = await r.delete(note_key) msg = ( f"Cleared all notes for channel {cid}" if deleted else "No notes found to clear" ) return json.dumps( { "success": True, "deleted": deleted > 0, "message": msg, } ) async def _dump_short_term_notes( ctx: ToolContext | None = None, ) -> str: """Dump every active short-term note across all channels (privileged). Backs the ``dump_short_term_notes`` tool, a debugging/audit affordance that reveals the otherwise-secret per-channel notes for the whole bot. Because that is sensitive, it fails closed: callers lacking the ``DUMP_NOTES`` privilege get an error JSON and no data. Reads ``redis``, ``user_id``, and ``config`` off the tool context, gates access via :func:`tools.alter_privileges.has_privilege` (which consults the privilege state in Redis), then scans the notes keyspace (``KEYS stargazer:notes:channel:*`` and ``ZRANGE`` per key) to collect notes grouped by channel. Invoked by the central tool dispatcher in ``tools/__init__.py`` (registered as the ``dump_short_term_notes`` handler in this module's ``TOOLS`` list); no direct internal callers. Args: ctx (ToolContext | None): Tool execution context supplying ``redis``, ``user_id``, and ``config``. Returns: str: A JSON string with ``total_notes`` and a ``channels`` array of per-channel notes, or a JSON error object when the user lacks ``DUMP_NOTES``. """ from tools.alter_privileges import has_privilege, PRIVILEGES redis_client = getattr(ctx, "redis", None) user_id = getattr(ctx, "user_id", "") config = getattr(ctx, "config", None) allowed = await has_privilege( redis_client, user_id, PRIVILEGES["DUMP_NOTES"], config, ) if not allowed: return json.dumps( { "success": False, "error": ( "The user does not have the " "DUMP_NOTES privilege. This tool " "requires DUMP_NOTES to be granted " "to the invoking user." ), } ) r = await _redis(ctx) keys = await r.keys(f"{NOTES_KEY_PREFIX}*") all_channels: list[dict] = [] total = 0 for key in keys: key_str = key if isinstance(key, str) else key.decode() channel_id = key_str.replace( NOTES_KEY_PREFIX, "", ) raw = await r.zrange( key, 0, -1, withscores=True, ) notes = [] for note_json, _score in raw: try: nd = json.loads(note_json) notes.append( { "content": nd.get("content"), "timestamp": nd.get("timestamp"), "ttl_preset": nd.get("ttl_preset"), "note_id": nd.get("message_id"), } ) except json.JSONDecodeError: continue if notes: all_channels.append( { "channel_id": channel_id, "notes": notes, "count": len(notes), } ) total += len(notes) return json.dumps( { "success": True, "total_notes": total, "channels": all_channels, } ) # --------------------------------------------------------------- # Multi-tool registration # --------------------------------------------------------------- TOOLS = [ { "name": "write_short_term_note", "description": ( "Write an internal short-term note for " "the current channel with configurable TTL. " "Notes are SECRET and INTERNAL - NEVER " "reveal them to non-admin users. " "Use for tracking temporary information, " "reminders, or context. " "TTL options: '1hour', '1day', '1week'. " "Notes are automatically injected into " "context as second-to-last messages." ), "parameters": { "type": "object", "properties": { "note_content": { "type": "string", "description": ("The content of the note."), }, "ttl_preset": { "type": "string", "enum": [ "1hour", "1day", "1week", ], "description": ("Time-to-live preset " "(default: 1day)."), }, }, "required": ["note_content"], }, "handler": _write_short_term_note, }, { "name": "read_short_term_notes", "description": ( "Read all active short-term notes for the " "current channel. Returns notes that have " "not expired yet, ordered by timestamp. " "These notes are SECRET and INTERNAL." ), "parameters": { "type": "object", "properties": {}, }, "handler": _read_short_term_notes, }, { "name": "clear_short_term_notes", "description": ( "Clear all short-term notes for the current " "channel. Permanently deletes all notes. " "Use when notes are no longer needed." ), "parameters": { "type": "object", "properties": {}, }, "handler": _clear_short_term_notes, }, { "name": "dump_short_term_notes", "description": ( "Dump ALL short-term notes across ALL " "channels. Returns every active note " "grouped by channel." ), "parameters": { "type": "object", "properties": {}, }, "handler": _dump_short_term_notes, }, ]