"""Search or fetch recent messages from a user's DM history."""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Identifier under which this tool is exposed.
TOOL_NAME = "search_dm_history"

# Human-readable summary of what the tool does and when to use each mode.
TOOL_DESCRIPTION = (
    "Search or fetch recent messages from the direct message history with "
    "the user you are currently talking to, OR with a specific user if "
    "target_user_id is provided. "
    "Use mode 'search' with a query for semantic search, or mode 'recent' "
    "to retrieve the most recent DM messages. "
    "Highly useful for webhooks or cron jobs that need to check DMs."
)

# JSON-Schema style description of the arguments accepted by ``run``.
# Only ``mode`` is mandatory; the remaining properties are optional.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        # Selects between semantic search and a plain "latest messages" fetch.
        "mode": {
            "type": "string",
            "enum": ["search", "recent"],
            "description": "search = semantic query; recent = latest messages",
        },
        # Only meaningful together with mode == "search".
        "query": {
            "type": "string",
            "description": "Search query text (required when mode is 'search')",
        },
        "limit": {
            "type": "integer",
            "description": "Maximum number of results to return (default 10, max 50)",
        },
        # Lets the caller inspect another user's DM history.
        "target_user_id": {
            "type": "string",
            "description": "Optional user ID to fetch DMs for. If omitted, uses the current user.",
        },
    },
    "required": ["mode"],
}
def _format_timestamp(ts: float) -> str:
    """Render a POSIX timestamp as a ``YYYY-MM-DD HH:MM:SS UTC`` string.

    Args:
        ts (float): Seconds since the Unix epoch.

    Returns:
        str: The same instant expressed in UTC, for example
            ``"1970-01-01 00:00:00 UTC"`` for ``ts == 0``.
    """
    # Passing an explicit tz keeps the output independent of the host's
    # local timezone.
    moment = datetime.fromtimestamp(ts, tz=timezone.utc)
    return f"{moment:%Y-%m-%d %H:%M:%S} UTC"
# --- Tool entry point ---
def _clamp_limit(limit: object, default: int = 10, maximum: int = 50) -> int:
    """Coerce *limit* to an integer in the range ``[1, maximum]``.

    A caller that ignores the declared integer type (for example by passing
    ``None``, a numeric string or a float) gets a usable value instead of a
    ``TypeError``; anything that cannot be converted falls back to *default*.
    """
    try:
        value = int(limit)
    except (TypeError, ValueError, OverflowError):
        value = default
    return max(1, min(value, maximum))


async def run(
    mode: str = "recent",
    query: str = "",
    limit: int = 10,
    target_user_id: str = "",
    *,
    # Quoted: ToolContext is imported only under TYPE_CHECKING.
    ctx: "ToolContext | None" = None,
) -> str:
    """Search or list DM messages for a user and return the result as JSON.

    Args:
        mode (str): ``"search"`` runs ``search_messages`` on the message
            cache with ``query``; any other value fetches the most recent
            messages via ``get_recent_for_user``.
        query (str): Search text. Must be non-empty when ``mode`` is
            ``"search"``; ignored otherwise.
        limit (int): Maximum number of results. Clamped to ``1..50``; a value
            that cannot be converted to an integer falls back to 10.
        target_user_id (str): User whose DMs are queried. When empty, the
            user from ``ctx`` is used. When it differs from ``ctx.user_id``,
            the calling user must hold the ``READ_DM`` privilege.
        ctx (ToolContext | None): Execution context supplying
            ``message_cache``, ``user_id``, ``platform``, ``redis`` and
            ``config``.

    Returns:
        str: A JSON document. On success it contains ``mode``, ``user_id``,
            ``num_results`` and ``results`` (plus ``query`` in search mode);
            each result carries ``text``, ``user_name`` and a formatted
            ``timestamp`` (plus ``similarity`` in search mode). On failure it
            contains an ``error`` message.
    """
    if ctx is None or ctx.message_cache is None:
        return json.dumps({"error": "Message cache not available"})

    # Reading somebody else's DM history is gated behind an explicit privilege.
    if target_user_id and target_user_id != ctx.user_id:
        # Imported lazily, as in the original code, so the module is only
        # needed when a cross-user lookup is actually requested.
        from tools.alter_privileges import has_privilege, PRIVILEGES

        allowed = await has_privilege(
            ctx.redis, ctx.user_id, PRIVILEGES["READ_DM"], ctx.config
        )
        if not allowed:
            return json.dumps({
                "error": "This user does not have the READ_DM privilege required to query another user's DM history."
            })

    user_id = target_user_id or ctx.user_id
    if not user_id:
        return json.dumps({"error": "No user context available and no target_user_id provided"})

    # Previously a None / non-numeric limit raised TypeError here, outside the
    # try block, and escaped the tool as an exception instead of a JSON reply.
    limit = _clamp_limit(limit)
    cache = ctx.message_cache
    platform = ctx.platform

    try:
        if mode == "search":
            if not query:
                return json.dumps({"error": "query is required when mode is 'search'"})

            hits = await cache.search_messages(
                query=query,
                limit=limit,
                # An empty platform is passed on as None in search mode.
                platform=platform or None,
                user_id=user_id,
            )
            # default=str stringifies any value json cannot encode natively.
            return json.dumps({
                "mode": "search",
                "query": query,
                "user_id": user_id,
                "num_results": len(hits),
                "results": [
                    {
                        "text": hit["text"],
                        "user_name": hit["user_name"],
                        "timestamp": _format_timestamp(hit["timestamp"]),
                        "similarity": hit["similarity"],
                    }
                    for hit in hits
                ],
            }, default=str)

        # Every mode other than "search" is served as a "recent" listing.
        messages = await cache.get_recent_for_user(
            platform=platform,
            user_id=user_id,
            limit=limit,
        )
        return json.dumps({
            "mode": "recent",
            "user_id": user_id,
            "num_results": len(messages),
            "results": [
                {
                    "text": msg.text,
                    "user_name": msg.user_name,
                    "timestamp": _format_timestamp(msg.timestamp),
                }
                for msg in messages
            ],
        }, default=str)
    except Exception as exc:
        # Tool boundary: report the failure to the caller as JSON and keep
        # the traceback in the log rather than letting the exception escape.
        logger.exception("DM history search failed: %s", exc)
        return json.dumps({
            "error": str(exc),
            "num_results": 0,
            "results": [],
        })