"""Wire a backgrounded task's output to any channel on any platform.
When the task finishes (success or failure), its result is automatically
delivered to the target channel — no polling required.
Works cross-platform: specify a ``platform`` name to target a different
platform than the one the command was issued from.
"""
from __future__ import annotations
import jsonutil as json
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
TOOL_NAME = "redirect_task"
TOOL_NO_BACKGROUND = True
TOOL_DESCRIPTION = (
"Redirect a backgrounded task's output to a channel. When the task "
"finishes, its result is automatically sent to the specified channel "
"on the specified platform — no need to poll with check_task. "
"Defaults to the current channel and platform if omitted. "
"Works cross-platform (e.g. redirect a task started from Discord "
"into a Matrix room, or vice versa)."
)
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "The task_id returned by a backgrounded tool call.",
},
"channel_id": {
"type": "string",
"description": (
"Target channel/room ID for the output. "
"Defaults to the current channel."
),
},
"platform": {
"type": "string",
"description": (
"Target platform name (e.g. 'discord', 'matrix'). "
"Defaults to the current platform."
),
},
"max_chars": {
"type": "integer",
"description": (
"Maximum characters of output to deliver. "
"Defaults to ~9000 (5 messages). Set lower for "
"concise output."
),
},
},
"required": ["task_id"],
}
def _resolve_adapter(ctx: "ToolContext", platform_name: str):
"""Find the running platform adapter whose name matches *platform_name*.
Resolves a target platform (e.g. ``"discord"`` or ``"matrix"``) to the
live ``ProxyPlatformAdapter`` that can actually deliver output there, which
is what lets ``redirect_task`` route a task's result cross-platform. It
scans ``ctx.all_adapters`` and returns the first adapter whose ``name``
attribute equals *platform_name*, or ``None`` when none match. Pure lookup:
no Redis, network, or other side effects.
Called by :func:`run` to locate the destination adapter before registering
the output redirect; no other in-repo callers.
Args:
ctx: Tool execution context whose ``all_adapters`` holds the running
platform adapters.
platform_name: Name of the target platform to match against each
adapter's ``name``.
Returns:
The matching adapter object, or ``None`` if no running adapter has that
name.
"""
adapters = getattr(ctx, "all_adapters", None) or []
for adapter in adapters:
if getattr(adapter, "name", "") == platform_name:
return adapter
return None
[docs]
async def run(
task_id: str,
channel_id: str = "",
platform: str = "",
max_chars: int = 0,
ctx: "ToolContext | None" = None,
) -> str:
"""Wire a backgrounded task's eventual output to a target channel/platform.
This is the entry point invoked by the tool loader (the module-level
``run`` callable discovered via ``getattr(module, "run")`` in
``tool_loader.py``) when the model calls the ``redirect_task`` tool. It
resolves the destination channel and platform, looks up the matching
platform adapter, and registers an output redirect on the task manager so
that when the named background task finishes (success or failure) its
result is delivered to that channel without any polling.
The target channel and platform default to the originating ones taken from
``ctx.channel_id`` and ``ctx.platform``. The destination adapter is found
by calling :func:`_resolve_adapter`, which scans ``ctx.all_adapters`` for a
``ProxyPlatformAdapter`` whose ``name`` matches *target_platform*; because
any running adapter may be selected, the redirect works cross-platform
(e.g. a task started from Discord can be delivered into a Matrix room). The
actual wiring is performed by ``ctx.task_manager.set_output_redirect``,
which records the channel, platform, adapter, and ``max_chars`` cap against
the task; the eventual delivery is driven by the task manager / event-bus
machinery, not by this function. No Redis keys, knowledge-graph entries,
LLM calls, or HTTP requests are made here directly.
Args:
task_id: The identifier returned by an earlier backgrounded tool call
whose output should be redirected. Required; an empty value yields
a failure response.
channel_id: Target channel/room ID for delivery. Defaults to the
current channel (``ctx.channel_id``) when empty.
platform: Target platform name (e.g. ``"discord"``, ``"matrix"``).
Defaults to the current platform (``ctx.platform``) when empty.
max_chars: Maximum characters of output to deliver. ``0`` defers to the
task manager's default cap (about 9000, roughly five messages).
ctx: The tool execution context, providing ``task_manager``,
``channel_id``, ``platform``, and ``all_adapters``. Required for the
tool to operate.
Returns:
str: A JSON document. On success it includes ``success: true`` plus the
resolved ``task_id``, ``redirect_channel``, ``redirect_platform``,
``max_chars``, and a human-readable ``message``. On failure it includes
``success: false`` and an ``error`` describing the missing task
manager, missing ``task_id``, missing target channel, unresolved
platform adapter (with the list of available adapter names), or an
error string propagated from ``set_output_redirect``.
No internal callers were found; this function is dispatched dynamically by
the tool-loading machinery rather than invoked by name elsewhere.
"""
if ctx is None or ctx.task_manager is None:
return json.dumps({"success": False, "error": "Task manager not available."})
if not task_id:
return json.dumps({"success": False, "error": "task_id is required."})
target_channel = channel_id or ctx.channel_id
target_platform = platform or ctx.platform
if not target_channel:
return json.dumps({"success": False, "error": "No target channel specified."})
adapter = _resolve_adapter(ctx, target_platform)
if adapter is None:
available = [
getattr(a, "name", "?") for a in (getattr(ctx, "all_adapters", None) or [])
]
return json.dumps(
{
"success": False,
"error": (
f"No adapter found for platform '{target_platform}'. "
f"Available: {available}"
),
}
)
err = ctx.task_manager.set_output_redirect(
task_id,
target_channel,
target_platform,
adapter,
max_chars=max_chars,
)
if err:
return json.dumps({"success": False, "error": err})
return json.dumps(
{
"success": True,
"task_id": task_id,
"redirect_channel": target_channel,
"redirect_platform": target_platform,
"max_chars": max_chars or "default (~9000)",
"message": (
f"Task '{task_id}' output will be delivered to "
f"{target_platform}:{target_channel} when it finishes."
),
}
)