"""Wire a backgrounded task's output to any channel on any platform.
When the task finishes (success or failure), its result is automatically
delivered to the target channel — no polling required.
Works cross-platform: specify a ``platform`` name to target a different
platform than the one the command was issued from.
"""
from __future__ import annotations
import json
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
# ---------------------------------------------------------------------------
# Module-level tool metadata: name, description and argument schema.
# ---------------------------------------------------------------------------

TOOL_NAME = "redirect_task"

# NOTE(review): presumably signals that this tool itself must not be run as
# a background task -- confirm against whatever consumes this flag.
TOOL_NO_BACKGROUND = True

# Human-readable summary of what the tool does.
TOOL_DESCRIPTION = (
    "Redirect a backgrounded task's output to a channel. "
    "When the task finishes, its result is automatically sent to the "
    "specified channel on the specified platform — no need to poll "
    "with check_task. "
    "Defaults to the current channel and platform if omitted. "
    "Works cross-platform (e.g. redirect a task started from Discord into "
    "a Matrix room, or vice versa)."
)

# JSON-Schema-style description of the arguments accepted by ``run``.
# Only ``task_id`` is mandatory; the other three are optional.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "task_id": {
            "type": "string",
            "description": "The task_id returned by a backgrounded tool call.",
        },
        "channel_id": {
            "type": "string",
            "description": (
                "Target channel/room ID for the output. Defaults to "
                "the current channel."
            ),
        },
        "platform": {
            "type": "string",
            "description": (
                "Target platform name (e.g. 'discord', 'matrix'). Defaults "
                "to the current platform."
            ),
        },
        "max_chars": {
            "type": "integer",
            "description": (
                "Maximum characters of output to deliver. Defaults to "
                "~9000 (5 messages). Set lower for concise output."
            ),
        },
    },
    "required": ["task_id"],
}
def _resolve_adapter(ctx: "ToolContext", platform_name: str):
    """Look up the adapter whose ``name`` equals *platform_name*.

    The candidates are read from ``ctx.all_adapters``; a missing or falsy
    attribute is treated as an empty collection.  An adapter that has no
    ``name`` attribute is compared as if its name were the empty string.

    Args:
        ctx: Context object expected to expose ``all_adapters``.
        platform_name: Name to match exactly (case-sensitive ``==``).

    Returns:
        The first matching adapter in iteration order, or ``None`` when
        nothing matches.
    """
    candidates = getattr(ctx, "all_adapters", None) or []
    matches = (
        candidate
        for candidate in candidates
        if getattr(candidate, "name", "") == platform_name
    )
    # First hit wins; ``None`` is the "not found" result.
    return next(matches, None)
# --- redirect_task implementation ---
async def run(
    task_id: str,
    channel_id: str = "",
    platform: str = "",
    max_chars: int = 0,
    ctx: "ToolContext | None" = None,
) -> str:
    """Register an output redirect for a task and report the outcome as JSON.

    Args:
        task_id: Identifier of the task whose output should be redirected.
        channel_id: Destination channel; falls back to ``ctx.channel_id``
            when empty.
        platform: Destination platform name; falls back to ``ctx.platform``
            when empty.
        max_chars: Output size limit forwarded to the task manager.  A falsy
            value is reported back as ``"default (~9000)"``.
        ctx: Tool context supplying ``task_manager``, ``channel_id``,
            ``platform`` and (optionally) ``all_adapters``.

    Returns:
        A JSON string.  Failures have the shape
        ``{"success": false, "error": ...}``; success echoes the task id,
        the resolved channel/platform, the limit and a summary message.
    """

    def _failure(reason: str) -> str:
        # Every error path shares the same two-key payload.
        return json.dumps({"success": False, "error": reason})

    manager = None if ctx is None else ctx.task_manager
    if manager is None:
        return _failure("Task manager not available.")

    if not task_id:
        return _failure("task_id is required.")

    # Explicit arguments take precedence over the invoking context.
    target_channel = channel_id if channel_id else ctx.channel_id
    target_platform = platform if platform else ctx.platform

    if not target_channel:
        return _failure("No target channel specified.")

    adapter = _resolve_adapter(ctx, target_platform)
    if adapter is None:
        # List the adapter names so the caller can pick a valid platform.
        available = [
            getattr(entry, "name", "?")
            for entry in (getattr(ctx, "all_adapters", None) or [])
        ]
        return _failure(
            f"No adapter found for platform '{target_platform}'. "
            f"Available: {available}"
        )

    # A truthy return value from the task manager is an error message.
    problem = manager.set_output_redirect(
        task_id,
        target_channel,
        target_platform,
        adapter,
        max_chars=max_chars,
    )
    if problem:
        return _failure(problem)

    summary = (
        f"Task '{task_id}' output will be delivered to "
        f"{target_platform}:{target_channel} when it finishes."
    )
    payload = {
        "success": True,
        "task_id": task_id,
        "redirect_channel": target_channel,
        "redirect_platform": target_platform,
        "max_chars": max_chars or "default (~9000)",
        "message": summary,
    }
    return json.dumps(payload)