Source code for tools.redirect_task

"""Wire a backgrounded task's output to any channel on any platform.

When the task finishes (success or failure), its result is automatically
delivered to the target channel — no polling required.

Works cross-platform: specify a ``platform`` name to target a different
platform than the one the command was issued from.
"""

from __future__ import annotations

import jsonutil as json
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

TOOL_NAME = "redirect_task"
TOOL_NO_BACKGROUND = True
TOOL_DESCRIPTION = (
    "Redirect a backgrounded task's output to a channel. When the task "
    "finishes, its result is automatically sent to the specified channel "
    "on the specified platform — no need to poll with check_task. "
    "Defaults to the current channel and platform if omitted. "
    "Works cross-platform (e.g. redirect a task started from Discord "
    "into a Matrix room, or vice versa)."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "task_id": {
            "type": "string",
            "description": "The task_id returned by a backgrounded tool call.",
        },
        "channel_id": {
            "type": "string",
            "description": (
                "Target channel/room ID for the output. "
                "Defaults to the current channel."
            ),
        },
        "platform": {
            "type": "string",
            "description": (
                "Target platform name (e.g. 'discord', 'matrix'). "
                "Defaults to the current platform."
            ),
        },
        "max_chars": {
            "type": "integer",
            "description": (
                "Maximum characters of output to deliver. "
                "Defaults to ~9000 (5 messages). Set lower for "
                "concise output."
            ),
        },
    },
    "required": ["task_id"],
}


def _resolve_adapter(ctx: "ToolContext", platform_name: str):
    """Find the running platform adapter whose name matches *platform_name*.

    Resolves a target platform (e.g. ``"discord"`` or ``"matrix"``) to the
    live ``ProxyPlatformAdapter`` that can actually deliver output there, which
    is what lets ``redirect_task`` route a task's result cross-platform. It
    scans ``ctx.all_adapters`` and returns the first adapter whose ``name``
    attribute equals *platform_name*, or ``None`` when none match. Pure lookup:
    no Redis, network, or other side effects.

    Called by :func:`run` to locate the destination adapter before registering
    the output redirect; no other in-repo callers.

    Args:
        ctx: Tool execution context whose ``all_adapters`` holds the running
            platform adapters.
        platform_name: Name of the target platform to match against each
            adapter's ``name``.

    Returns:
        The matching adapter object, or ``None`` if no running adapter has that
        name.
    """
    adapters = getattr(ctx, "all_adapters", None) or []
    for adapter in adapters:
        if getattr(adapter, "name", "") == platform_name:
            return adapter
    return None


[docs] async def run( task_id: str, channel_id: str = "", platform: str = "", max_chars: int = 0, ctx: "ToolContext | None" = None, ) -> str: """Wire a backgrounded task's eventual output to a target channel/platform. This is the entry point invoked by the tool loader (the module-level ``run`` callable discovered via ``getattr(module, "run")`` in ``tool_loader.py``) when the model calls the ``redirect_task`` tool. It resolves the destination channel and platform, looks up the matching platform adapter, and registers an output redirect on the task manager so that when the named background task finishes (success or failure) its result is delivered to that channel without any polling. The target channel and platform default to the originating ones taken from ``ctx.channel_id`` and ``ctx.platform``. The destination adapter is found by calling :func:`_resolve_adapter`, which scans ``ctx.all_adapters`` for a ``ProxyPlatformAdapter`` whose ``name`` matches *target_platform*; because any running adapter may be selected, the redirect works cross-platform (e.g. a task started from Discord can be delivered into a Matrix room). The actual wiring is performed by ``ctx.task_manager.set_output_redirect``, which records the channel, platform, adapter, and ``max_chars`` cap against the task; the eventual delivery is driven by the task manager / event-bus machinery, not by this function. No Redis keys, knowledge-graph entries, LLM calls, or HTTP requests are made here directly. Args: task_id: The identifier returned by an earlier backgrounded tool call whose output should be redirected. Required; an empty value yields a failure response. channel_id: Target channel/room ID for delivery. Defaults to the current channel (``ctx.channel_id``) when empty. platform: Target platform name (e.g. ``"discord"``, ``"matrix"``). Defaults to the current platform (``ctx.platform``) when empty. max_chars: Maximum characters of output to deliver. ``0`` defers to the task manager's default cap (about 9000, roughly five messages). ctx: The tool execution context, providing ``task_manager``, ``channel_id``, ``platform``, and ``all_adapters``. Required for the tool to operate. Returns: str: A JSON document. On success it includes ``success: true`` plus the resolved ``task_id``, ``redirect_channel``, ``redirect_platform``, ``max_chars``, and a human-readable ``message``. On failure it includes ``success: false`` and an ``error`` describing the missing task manager, missing ``task_id``, missing target channel, unresolved platform adapter (with the list of available adapter names), or an error string propagated from ``set_output_redirect``. No internal callers were found; this function is dispatched dynamically by the tool-loading machinery rather than invoked by name elsewhere. """ if ctx is None or ctx.task_manager is None: return json.dumps({"success": False, "error": "Task manager not available."}) if not task_id: return json.dumps({"success": False, "error": "task_id is required."}) target_channel = channel_id or ctx.channel_id target_platform = platform or ctx.platform if not target_channel: return json.dumps({"success": False, "error": "No target channel specified."}) adapter = _resolve_adapter(ctx, target_platform) if adapter is None: available = [ getattr(a, "name", "?") for a in (getattr(ctx, "all_adapters", None) or []) ] return json.dumps( { "success": False, "error": ( f"No adapter found for platform '{target_platform}'. " f"Available: {available}" ), } ) err = ctx.task_manager.set_output_redirect( task_id, target_channel, target_platform, adapter, max_chars=max_chars, ) if err: return json.dumps({"success": False, "error": err}) return json.dumps( { "success": True, "task_id": task_id, "redirect_channel": target_channel, "redirect_platform": target_platform, "max_chars": max_chars or "default (~9000)", "message": ( f"Task '{task_id}' output will be delivered to " f"{target_platform}:{target_channel} when it finishes." ), } )