Source code for tools.dm_history

"""Search or fetch recent messages from a user's DM history."""

from __future__ import annotations

import json
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# Tool name.
TOOL_NAME = "search_dm_history"

# Human-readable summary of what the tool does and when to use each mode.
TOOL_DESCRIPTION = (
    "Search or fetch recent messages from the direct message history with "
    "the user you are currently talking to, OR with a specific user if "
    "target_user_id is provided. "
    "Use mode 'search' with a query for semantic search, or mode 'recent' "
    "to retrieve the most recent DM messages. "
    "Highly useful for webhooks or cron jobs that need to check DMs."
)

# JSON-Schema-shaped description of the arguments accepted by ``run``.
# Only ``mode`` is required; the remaining arguments are optional.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        # Selects between the two retrieval strategies.
        "mode": {
            "type": "string",
            "enum": ["search", "recent"],
            "description": "search = semantic query; recent = latest messages",
        },
        # Only consulted in 'search' mode.
        "query": {
            "type": "string",
            "description": "Search query text (required when mode is 'search')",
        },
        # Upper bound on the number of returned messages.
        "limit": {
            "type": "integer",
            "description": "Maximum number of results to return (default 10, max 50)",
        },
        # Lets the caller inspect another user's DM history.
        "target_user_id": {
            "type": "string",
            "description": "Optional user ID to fetch DMs for. If omitted, uses the current user.",
        },
    },
    "required": ["mode"],
}


def _format_timestamp(ts: float) -> str:
    """Render a POSIX timestamp as a ``YYYY-MM-DD HH:MM:SS UTC`` string.

    Args:
        ts: POSIX timestamp, as accepted by ``datetime.fromtimestamp``.

    Returns:
        The matching date and time in UTC, truncated to whole seconds,
        e.g. ``"1970-01-01 00:00:00 UTC"``.
    """
    # Convert explicitly in UTC so the result never depends on the host's
    # local timezone.
    return datetime.fromtimestamp(ts, timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")


async def run(
    mode: str = "recent",
    query: str = "",
    limit: int = 10,
    target_user_id: str = "",
    *,
    ctx: ToolContext | None = None,
) -> str:
    """Search or fetch recent DM messages for a user and return them as JSON.

    Args:
        mode: ``"search"`` queries ``ctx.message_cache.search_messages`` with
            ``query``. Any other value fetches the latest messages through
            ``ctx.message_cache.get_recent_for_user``.
        query: Search text. Must be non-empty when ``mode`` is ``"search"``.
        limit: Maximum number of results. Clamped to the range 1-50; a value
            that cannot be converted to an integer falls back to 10.
        target_user_id: User whose history is queried. When empty,
            ``ctx.user_id`` is used. Querying a user other than
            ``ctx.user_id`` requires ``ctx.user_id`` to hold the ``READ_DM``
            privilege.
        ctx: Tool context supplying ``message_cache``, ``user_id``,
            ``platform``, ``redis`` and ``config``.

    Returns:
        A JSON-encoded object.

        * Search success: ``mode``, ``query``, ``user_id``, ``num_results``
          and ``results`` (each with ``text``, ``user_name``, ``timestamp``
          and ``similarity``).
        * Recent success: ``mode``, ``user_id``, ``num_results`` and
          ``results`` (each with ``text``, ``user_name`` and ``timestamp``).
        * Validation or privilege failure: a single ``error`` key.
        * Exception raised while querying the cache: ``error`` plus
          ``num_results`` of 0 and an empty ``results`` list.
    """
    if ctx is None or ctx.message_cache is None:
        return json.dumps({"error": "Message cache not available"})

    # Reading somebody else's DMs is gated behind an explicit privilege.
    if target_user_id and target_user_id != ctx.user_id:
        # Kept as a function-local import, as in the original
        # (presumably to avoid an import cycle -- TODO confirm).
        from tools.alter_privileges import has_privilege, PRIVILEGES

        allowed = await has_privilege(
            ctx.redis, ctx.user_id, PRIVILEGES["READ_DM"], ctx.config,
        )
        if not allowed:
            return json.dumps({
                "error": "This user does not have the READ_DM privilege required to query another user's DM history."
            })

    user_id = target_user_id or ctx.user_id
    if not user_id:
        return json.dumps({"error": "No user context available and no target_user_id provided"})

    # Tool arguments may arrive as strings, floats or null; coerce instead of
    # letting max()/min() raise a TypeError that would escape as a raw
    # exception rather than a JSON error.
    try:
        limit = int(limit)
    except (TypeError, ValueError, OverflowError):
        limit = 10
    limit = max(1, min(limit, 50))

    mc = ctx.message_cache
    platform = ctx.platform

    try:
        if mode == "search":
            if not query:
                return json.dumps({"error": "query is required when mode is 'search'"})

            results = await mc.search_messages(
                query=query,
                limit=limit,
                platform=platform or None,
                user_id=user_id,
            )
            return json.dumps({
                "mode": "search",
                "query": query,
                "user_id": user_id,
                "num_results": len(results),
                "results": [
                    {
                        "text": r["text"],
                        "user_name": r["user_name"],
                        "timestamp": _format_timestamp(r["timestamp"]),
                        "similarity": r["similarity"],
                    }
                    for r in results
                ],
            }, default=str)

        # Every mode other than "search" is served as "recent".
        messages = await mc.get_recent_for_user(
            platform=platform,
            user_id=user_id,
            limit=limit,
        )
        return json.dumps({
            "mode": "recent",
            "user_id": user_id,
            "num_results": len(messages),
            "results": [
                {
                    "text": msg.text,
                    "user_name": msg.user_name,
                    "timestamp": _format_timestamp(msg.timestamp),
                }
                for msg in messages
            ],
        }, default=str)
    except Exception as e:
        # Tool boundary: report the failure to the caller as JSON instead of
        # propagating, while keeping the traceback in the log.
        logger.exception("DM history search failed: %s", e)
        return json.dumps({
            "error": str(e),
            "num_results": 0,
            "results": [],
        })