Source code for tools.await_task

"""Synchronously block until a background task completes.

Unlike ``check_task`` which returns immediately with a status snapshot,
``await_task`` **suspends the entire inference cycle** until the task
finishes.  From the LLM's perspective no time passes — the result
materialises in the very next thought block.
"""

from __future__ import annotations

import json
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

TOOL_NO_BACKGROUND = True
TOOL_ALLOW_REPEAT = True
TOOL_NAME = "await_task"
TOOL_DESCRIPTION = (
    "Block until a background task completes and return its result "
    "inline. Unlike check_task (which polls), this tool suspends the "
    "current inference cycle so the result appears instantly in your "
    "next thought. Use this when you need the output before you can "
    "continue reasoning.  Accepts an optional timeout in seconds "
    "(default 300, max 300)."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "task_id": {
            "type": "string",
            "description": (
                "The task_id returned by a backgrounded tool call."
            ),
        },
        "timeout": {
            "type": "number",
            "description": (
                "Maximum seconds to wait (default 300, max 300). "
                "If the task hasn't finished by then, a timeout "
                "error is returned and the task keeps running."
            ),
        },
    },
    "required": ["task_id"],
}

MAX_TIMEOUT = 300.0


[docs] async def run( task_id: str | None = None, timeout: float = 300.0, ctx: "ToolContext | None" = None, **_kwargs, ) -> str: """Execute this tool and return the result. Args: task_id (str | None): Background task identifier. timeout (float): Maximum wait time in seconds. ctx ('ToolContext | None'): Tool execution context providing access to bot internals. Returns: str: Result string. """ if ctx is None or ctx.task_manager is None: return json.dumps({"error": "Task manager not available."}) if not task_id: return json.dumps({"error": "task_id is required."}) # Clamp timeout try: timeout = float(timeout) except (TypeError, ValueError): timeout = MAX_TIMEOUT if timeout <= 0: timeout = MAX_TIMEOUT if timeout > MAX_TIMEOUT: timeout = MAX_TIMEOUT return await ctx.task_manager.await_result(task_id, timeout=timeout)