Source code for task_manager

"""Fire-and-forget task manager for tool execution.

Wraps tool handler coroutines with a configurable timeout.  If a tool
completes within the timeout its result is returned inline.  Otherwise
the coroutine continues as a background :class:`asyncio.Task` and a
JSON envelope containing a task ID is returned so the LLM can poll for
results later via the ``check_task`` tool.

Background tasks may still hold the :class:`~tool_context.ToolContext`
injected into the tool and mutate it after the enclosing tool-calling
loop has moved on. Prefer ``no_background`` on tools that perform
persistent ``ctx`` side effects the model must observe in-order.

Output redirect
~~~~~~~~~~~~~~~
Any backgrounded task can have its result automatically delivered to a
channel on any platform when it finishes.  Call
:meth:`TaskManager.set_output_redirect` (or use the ``redirect_task``
tool) to configure this.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from platforms.base import PlatformAdapter

logger = logging.getLogger(__name__)

from message_utils import split_message

REDIS_KEY_PREFIX = "task_result:"
REDIS_TTL_SECONDS = 86400  # 24 hours

# Persist full tool result in Redis only up to this size; larger results stay
# in-memory on the TaskRecord (check_task on the same process still works).
REDIS_PERSIST_RESULT_INLINE_MAX = 512 * 1024
# Build JSON off the event loop when the persisted blob would be heavy.
REDIS_PERSIST_JSON_OFFLOAD_MIN = 256 * 1024

DEFAULT_REDIRECT_MAX_CHARS = 9000
MAX_REDIRECT_MESSAGES = 5

# In-memory eviction for completed/failed records (T3-7).
MAX_COMPLETED_TASKS = 500
# 0 = TTL eviction disabled; set to e.g. 86400 to drop stale completed entries.
COMPLETED_TASK_MEMORY_TTL_SECONDS = 0


[docs] class TaskStatus(str, Enum): """Lifecycle states for a tracked background tool task. A string-valued enum (so members serialize directly into JSON envelopes and Redis fields as their plain values) covering the three terminal-or-not states a task can be in: ``RUNNING`` while the coroutine is in flight, ``COMPLETED`` on success, and ``FAILED`` on cancellation or exception. Used throughout :class:`TaskManager` and persisted into the ``sg:task:{task_id}`` Redis hash and the legacy JSON envelopes that the ``check_task`` tool reads back. """ RUNNING = "running" COMPLETED = "completed" FAILED = "failed"
[docs] @dataclass class TaskRecord: """In-memory bookkeeping for one tracked background tool task. Captures everything :class:`TaskManager` needs to track, persist, and deliver a backgrounded tool invocation: identity and origin (``task_id``, ``tool_name``, ``user_id``, ``channel_id``, ``platform``), the live :class:`asyncio.Task` handle, the current :class:`TaskStatus`, and the captured ``result`` or ``error`` once it finishes. The trailing output-redirect fields hold an optional delivery target — channel, platform, resolved :class:`~platforms.base.PlatformAdapter`, and a character cap — so a finished task's result can be pushed to a chat channel. Created by :meth:`TaskManager.execute` and mutated in place by :meth:`TaskManager._on_task_done`. """ task_id: str tool_name: str status: TaskStatus created_at: float = field(default_factory=time.time) result: str | None = None error: str | None = None user_id: str = "" channel_id: str = "" platform: str = "" asyncio_task: asyncio.Task | None = field( default=None, repr=False, ) # -- Output redirect ----------------------------------------------- redirect_channel_id: str = "" """Channel to deliver the result to when the task finishes.""" redirect_platform: str = "" """Platform name for the redirect target.""" redirect_adapter: Any = field(default=None, repr=False) """Resolved :class:`PlatformAdapter` for delivery.""" redirect_max_chars: int = 0 """Max characters of output body to deliver (0 = use default)."""
[docs] class TaskManager: """Manage fire-and-forget tool execution with timeout. Parameters ---------- timeout: Seconds to wait for a tool to complete before backgrounding it. Defaults to ``10.0``. redis: Optional async Redis client for persisting completed results. """
[docs] def __init__( self, timeout: float = 10.0, redis: Any = None, ) -> None: """Configure the timeout budget and Redis handle for task tracking. Stores the inline-vs-background ``timeout`` threshold and the optional async Redis client used to persist task state, and initializes the empty ``self._tasks`` registry that maps each task ID to its :class:`TaskRecord`. Performs no I/O. Constructed once per worker by the inference and web services (``inference_main`` and ``web_main``, both with ``timeout=10.0`` and the shared Redis client) and exposed to tools through the injected :class:`~tool_context.ToolContext`. Args: timeout (float): Seconds to wait for a tool coroutine to finish before backgrounding it; defaults to ``10.0``. redis (Any): Optional async Redis client for persisting running and completed results; when ``None``, tasks are tracked in memory only. """ self.timeout = timeout self.redis = redis self._tasks: dict[str, TaskRecord] = {}
# ------------------------------------------------------------------ # Core execution # ------------------------------------------------------------------
[docs] async def execute( self, coro: Any, tool_name: str = "", user_id: str = "", channel_id: str = "", platform: str = "", ) -> str: """Run *coro* with a timeout; background it if it takes too long. Returns the tool result string directly when the coroutine finishes within :attr:`timeout`, or a JSON envelope with a ``task_id`` when it does not. """ task = asyncio.create_task(coro) t0 = time.monotonic() done, _ = await asyncio.wait({task}, timeout=self.timeout) if done: duration_ms = (time.monotonic() - t0) * 1000 try: from observability import publish_debug_event asyncio.create_task( publish_debug_event( "task_execution", "task_manager", channel_id=channel_id, platform=platform, phase="inline", status="ok", duration_ms=duration_ms, preview=f"tool={tool_name}", payload={"tool_name": tool_name, "user_id": user_id}, ), name="obs_task_execution", ) except Exception: pass return str(task.result()) task_id = uuid.uuid4().hex[:12] record = TaskRecord( task_id=task_id, tool_name=tool_name, status=TaskStatus.RUNNING, user_id=user_id, channel_id=channel_id, platform=platform, asyncio_task=task, ) self._tasks[task_id] = record task.add_done_callback( lambda t: self._on_task_done(task_id, t), ) logger.info( "Tool '%s' backgrounded as task %s", tool_name, task_id, ) if self.redis is not None: asyncio.create_task(self._persist_running_task(record)) try: from observability import publish_debug_event asyncio.create_task( publish_debug_event( "task_execution", "task_manager", channel_id=channel_id, platform=platform, phase="backgrounded", status="backgrounded", preview=f"tool={tool_name} task_id={task_id}", payload={ "tool_name": tool_name, "task_id": task_id, "backgrounded": True, "user_id": user_id, }, ), name="obs_task_execution", ) except Exception: pass return json.dumps( { "task_id": task_id, "tool_name": tool_name, "status": TaskStatus.RUNNING.value, "message": ( f"Tool '{tool_name}' is running in the background. " f"Use check_task with task_id '{task_id}' to get " f"the result." ), } )
async def _persist_running_task(self, record: TaskRecord) -> None: """Persist a freshly backgrounded task's metadata to Redis. Writes the identity and origin fields (task id, tool name, status, creation time, user/channel/platform) of a still-running task into the ``sg:task:{task_id}`` Redis hash via ``hset`` and sets a ``REDIS_TTL_SECONDS`` (24 h) expiry, so another process can answer ``check_task`` for a task it did not start. A ``None`` Redis client makes this a no-op, and any Redis error is caught and logged as a warning rather than propagated. Scheduled fire-and-forget by :meth:`execute` when a tool is backgrounded. Args: record (TaskRecord): The running task whose metadata is persisted. """ if self.redis is None: return try: mapping = { "task_id": record.task_id, "tool_name": record.tool_name, "status": record.status.value, "created_at": str(record.created_at), "user_id": record.user_id, "channel_id": record.channel_id, "platform": record.platform, } task_key = f"sg:task:{record.task_id}" await self.redis.hset(task_key, mapping=mapping) await self.redis.expire(task_key, REDIS_TTL_SECONDS) except Exception as e: logger.warning("Failed to persist running task %s to Redis: %s", record.task_id, e) def _redis_payload_allowed(self, cached: Any, user_id: str | None) -> str | None: """Decode a legacy JSON task envelope and return its result if accessible. Handles the older ``task_result:{task_id}`` string format: decodes the cached bytes/str, parses the JSON, and enforces ownership — when ``user_id`` is supplied and does not match the record's owner it returns a ``not found`` error envelope rather than leaking another user's result. For a completed task it returns the stored result (or the whole envelope when the body was omitted as too large), and for a failed task a compact error envelope. Returns ``None`` for running tasks or any parse error so callers fall through to other lookups. Pure with respect to state; called by :meth:`get_result` and :meth:`await_result` on the legacy fallback path. Args: cached (Any): The raw cached value (``bytes`` or ``str``) holding a JSON task envelope. user_id (str | None): If set, only the owning user may read the result; mismatches yield a ``not found`` envelope. Returns: str | None: A result or error string when the envelope is terminal and accessible, otherwise ``None``. """ try: s = cached.decode() if isinstance(cached, bytes) else cached data = json.loads(s) rec_user_id = data.get("user_id", "") if user_id is not None and rec_user_id != user_id: return json.dumps({"error": f"Task '{data.get('task_id')}' not found."}) status = data.get("status") if status == TaskStatus.COMPLETED.value: if data.get("result_omitted"): return json.dumps(data) return data.get("result", "") elif status == TaskStatus.FAILED.value: return json.dumps({ "task_id": data.get("task_id"), "status": TaskStatus.FAILED.value, "error": data.get("error"), }) except Exception: pass return None # ------------------------------------------------------------------ # Result retrieval # ------------------------------------------------------------------
[docs] async def get_result( self, task_id: str, user_id: str | None = None, ) -> str: """Return the result for *task_id*, or a status update. If *user_id* is set, only tasks owned by that user are returned. """ record = self._tasks.get(task_id) if record is not None: if user_id is not None and record.user_id != user_id: return json.dumps( { "error": f"Task '{task_id}' not found.", } ) if record.status == TaskStatus.COMPLETED: return record.result or "" if record.status == TaskStatus.FAILED: return json.dumps( { "task_id": task_id, "status": TaskStatus.FAILED.value, "error": record.error, } ) return json.dumps( { "task_id": task_id, "tool_name": record.tool_name, "status": TaskStatus.RUNNING.value, "elapsed_seconds": round( time.time() - record.created_at, 1, ), } ) # Check Redis for the new global Redis Hash format if self.redis is not None: try: task_key = f"sg:task:{task_id}" data = await self.redis.hgetall(task_key) if data: decoded = { k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v for k, v in data.items() } rec_user_id = decoded.get("user_id", "") if user_id is not None and rec_user_id != user_id: return json.dumps({"error": f"Task '{task_id}' not found."}) status = decoded.get("status") if status == TaskStatus.COMPLETED.value: return decoded.get("result", "") if status == TaskStatus.FAILED.value: return json.dumps({ "task_id": task_id, "status": TaskStatus.FAILED.value, "error": decoded.get("error"), }) created_at = float(decoded.get("created_at", time.time())) return json.dumps({ "task_id": task_id, "tool_name": decoded.get("tool_name", ""), "status": TaskStatus.RUNNING.value, "elapsed_seconds": round(time.time() - created_at, 1), }) except Exception as e: logger.warning("Redis lookup for task %s hash failed: %s", task_id, e) # Fallback to legacy string format if still present try: cached = await self.redis.get( f"{REDIS_KEY_PREFIX}{task_id}", ) if cached: allowed = self._redis_payload_allowed(cached, user_id) if allowed is not None: return allowed if user_id is not None: return json.dumps( { "error": f"Task '{task_id}' not found.", } ) except Exception: logger.warning( "Redis lookup failed for task %s", task_id, exc_info=True, ) return json.dumps( { "error": f"Task '{task_id}' not found.", } )
[docs] async def await_result( self, task_id: str, timeout: float = 300.0, user_id: str | None = None, ) -> str: """Block until *task_id* completes and return its result. Unlike :meth:`get_result` which returns immediately with a status update, this method **awaits** the underlying :class:`asyncio.Task` so the caller's coroutine is suspended until the work finishes. If *user_id* is set, only tasks owned by that user are awaited or returned (same semantics as :meth:`get_result`). Parameters ---------- timeout: Maximum seconds to wait. Defaults to ``300`` (5 minutes). If exceeded, a timeout error JSON envelope is returned. """ record = self._tasks.get(task_id) # --- Already finished (in-memory) -------------------------------- if record is not None: if user_id is not None and record.user_id != user_id: return json.dumps( { "error": f"Task '{task_id}' not found.", } ) if record.status == TaskStatus.COMPLETED: return record.result or "" if record.status == TaskStatus.FAILED: return json.dumps( { "task_id": task_id, "status": TaskStatus.FAILED.value, "error": record.error, } ) # --- Still running: await the asyncio.Task -------------------- atask = record.asyncio_task if atask is not None: try: await asyncio.wait_for( asyncio.shield(atask), timeout=timeout, ) except asyncio.TimeoutError: return json.dumps( { "task_id": task_id, "status": "timeout", "error": ( f"Task '{task_id}' did not complete within " f"{timeout}s. It is still running in the " f"background — use check_task to poll later." ), } ) except asyncio.CancelledError: pass # fall through to the status re-check below except Exception: pass # task raised; result captured by _on_task_done # Re-read status — _on_task_done has updated the record. if record.status == TaskStatus.COMPLETED: return record.result or "" if record.status == TaskStatus.FAILED: return json.dumps( { "task_id": task_id, "status": TaskStatus.FAILED.value, "error": record.error, } ) # asyncio_task was already None (finished between our checks) return await self.get_result(task_id, user_id=user_id) # --- Not in memory: check Redis ---------------------------------- if self.redis is not None: try: task_key = f"sg:task:{task_id}" data = await self.redis.hgetall(task_key) if data: decoded = { k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v for k, v in data.items() } rec_user_id = decoded.get("user_id", "") if user_id is not None and rec_user_id != user_id: return json.dumps({"error": f"Task '{task_id}' not found."}) status = decoded.get("status") if status == TaskStatus.COMPLETED.value: return decoded.get("result", "") if status == TaskStatus.FAILED.value: return json.dumps({ "task_id": task_id, "status": TaskStatus.FAILED.value, "error": decoded.get("error"), }) # If still running, subscribe to sg:task:done:{task_id} channel = f"sg:task:done:{task_id}" pubsub = self.redis.pubsub() try: await pubsub.subscribe(channel) t0 = time.monotonic() while time.monotonic() - t0 < timeout: try: msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) if msg is not None: res_data = await self.redis.hgetall(task_key) if res_data: res_decoded = { k.decode() if isinstance(k, bytes) else k: v.decode() if isinstance(v, bytes) else v for k, v in res_data.items() } res_status = res_decoded.get("status") if res_status == TaskStatus.COMPLETED.value: return res_decoded.get("result", "") if res_status == TaskStatus.FAILED.value: return json.dumps({ "task_id": task_id, "status": TaskStatus.FAILED.value, "error": res_decoded.get("error"), }) break await asyncio.sleep(0.05) except asyncio.CancelledError: break except Exception as ex: return json.dumps({"error": f"Error awaiting task result: {ex}"}) else: return json.dumps({ "task_id": task_id, "status": "timeout", "error": ( f"Task '{task_id}' did not complete within " f"{timeout}s." ), }) finally: try: await pubsub.unsubscribe(channel) await pubsub.aclose() except Exception: pass except Exception as e: logger.warning("Redis pubsub await for task %s failed: %s", task_id, e) # Fallback to legacy string format if still present try: cached = await self.redis.get( f"{REDIS_KEY_PREFIX}{task_id}", ) if cached: allowed = self._redis_payload_allowed(cached, user_id) if allowed is not None: return allowed except Exception: logger.warning( "Redis lookup failed for task %s", task_id, exc_info=True, ) return json.dumps( { "error": f"Task '{task_id}' not found.", } )
[docs] async def list_tasks(self, user_id: str | None = None) -> str: """Return a JSON summary of tracked tasks. If *user_id* is provided, only tasks belonging to that user are returned. Pass ``None`` to list all tasks. """ now = time.time() tasks = [] for rec in self._tasks.values(): if user_id is not None and rec.user_id != user_id: continue entry: dict[str, Any] = { "task_id": rec.task_id, "tool_name": rec.tool_name, "status": rec.status.value, "user_id": rec.user_id, "elapsed_seconds": round( now - rec.created_at, 1, ), "created_at": rec.created_at, } if rec.status == TaskStatus.FAILED: entry["error"] = rec.error tasks.append(entry) return json.dumps( { "tasks": tasks, "count": len(tasks), } )
# ------------------------------------------------------------------ # Output redirect # ------------------------------------------------------------------
[docs] def set_output_redirect( self, task_id: str, channel_id: str, platform: str, adapter: "PlatformAdapter", max_chars: int = 0, ) -> str | None: """Configure a task to deliver its result to *channel_id* on finish. Returns an error string if the task is not found or already finished, otherwise ``None``. """ record = self._tasks.get(task_id) if record is None: return f"Task '{task_id}' not found." record.redirect_channel_id = channel_id record.redirect_platform = platform record.redirect_adapter = adapter record.redirect_max_chars = max_chars # If the task already finished before the redirect was set, # deliver immediately. if record.status in (TaskStatus.COMPLETED, TaskStatus.FAILED): asyncio.create_task(self._deliver_output(record)) return None
# ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _on_task_done( self, task_id: str, task: asyncio.Task, ) -> None: """Finalize a backgrounded task's record when its coroutine settles. Registered as the ``asyncio`` done-callback on the backgrounded task by :meth:`execute`. Clears the live task handle, then transitions the :class:`TaskRecord` to ``FAILED`` (on cancellation or exception, capturing the error string) or ``COMPLETED`` (capturing ``str(result)``), logging at the appropriate level. It emits a ``task_execution`` debug event via ``observability.publish_debug_event``, schedules :meth:`_persist_result` when Redis is configured, schedules :meth:`_deliver_output` when an output redirect was set, and finally calls :meth:`_prune_completed_memory_tasks` to bound in-memory growth. Side effects on Redis, observability, and chat delivery all run as fire-and-forget tasks so the callback itself stays non-blocking. Args: task_id (str): Identifier of the finished task, used to look up its record. task (asyncio.Task): The settled task whose cancellation/exception/ result determines the final status. """ record = self._tasks.get(task_id) if record is None: return record.asyncio_task = None if task.cancelled(): record.status = TaskStatus.FAILED record.error = "Task was cancelled." logger.warning("Task %s was cancelled", task_id) elif (exc := task.exception()) is not None: record.status = TaskStatus.FAILED record.error = f"{type(exc).__name__}: {exc}" logger.error( "Task %s failed: %s", task_id, exc, exc_info=exc, ) else: record.status = TaskStatus.COMPLETED record.result = str(task.result()) logger.info("Task %s completed", task_id) # Emit task_execution telemetry for completed/failed outcomes try: from observability import publish_debug_event _phase = "completed" if record.status == TaskStatus.COMPLETED else "failed" _status = "ok" if record.status == TaskStatus.COMPLETED else "error" _duration = (time.time() - record.created_at) * 1000 _payload: dict = { "tool_name": record.tool_name, "task_id": task_id, "user_id": record.user_id, } if record.status == TaskStatus.FAILED and record.error: _payload["error_type"] = ( record.error.split(":")[0] if ":" in record.error else record.error ) _payload["error_message"] = record.error[:500] asyncio.create_task( publish_debug_event( "task_execution", "task_manager", channel_id=record.channel_id, platform=record.platform, phase=_phase, status=_status, duration_ms=_duration, preview=f"tool={record.tool_name} task_id={task_id}", payload=_payload, ), name="obs_task_execution", ) except Exception: pass # Persist to Redis asynchronously if self.redis is not None: asyncio.create_task(self._persist_result(record)) # Deliver output to redirect channel if configured if record.redirect_adapter is not None and record.redirect_channel_id: asyncio.create_task(self._deliver_output(record)) self._prune_completed_memory_tasks() def _prune_completed_memory_tasks(self) -> None: """Bound the in-memory registry by evicting old terminal task records. Keeps ``self._tasks`` from growing without limit: when ``COMPLETED_TASK_MEMORY_TTL_SECONDS`` is enabled it first drops any completed or failed record older than that TTL, then, regardless, evicts the oldest terminal records (by creation time) beyond ``MAX_COMPLETED_TASKS``. Running tasks are never pruned, and Redis copies (if persisted) are unaffected — only the in-process cache shrinks. Called by :meth:`_on_task_done` after each task settles. """ now = time.time() if COMPLETED_TASK_MEMORY_TTL_SECONDS > 0: ttl = COMPLETED_TASK_MEMORY_TTL_SECONDS for tid in [ t for t, rec in self._tasks.items() if rec.status in (TaskStatus.COMPLETED, TaskStatus.FAILED) and (now - rec.created_at) > ttl ]: del self._tasks[tid] terminal = [ (rec.created_at, tid) for tid, rec in self._tasks.items() if rec.status in (TaskStatus.COMPLETED, TaskStatus.FAILED) ] terminal.sort(key=lambda x: x[0]) excess = len(terminal) - MAX_COMPLETED_TASKS if excess > 0: for _, tid in terminal[:excess]: self._tasks.pop(tid, None) async def _persist_result(self, record: TaskRecord) -> None: """Persist a settled task's result to Redis and wake any waiters. Writes the terminal outcome in two formats within a single Redis pipeline: the globally coherent ``sg:task:{task_id}`` hash (status plus result or error), a ``publish`` to ``sg:task:done:{task_id}`` that unblocks :meth:`await_result` subscribers on other processes, and the legacy ``task_result:{task_id}`` JSON string value set with a ``REDIS_TTL_SECONDS`` expiry. Oversized completed bodies (above ``REDIS_PERSIST_RESULT_INLINE_MAX``) are stored with the result omitted and a note that the full body is only available via ``check_task`` on the owning process, and heavy-but-allowed payloads have their JSON serialized off the event loop via :func:`asyncio.to_thread`. Any failure is caught and logged as a warning. Scheduled fire-and-forget by :meth:`_on_task_done`. Args: record (TaskRecord): The completed or failed task to persist. """ try: # 1. New globally coherent Redis Hash format: sg:task:{task_id} task_key = f"sg:task:{record.task_id}" mapping = { "status": record.status.value, } if record.result is not None: mapping["result"] = record.result if record.error is not None: mapping["error"] = record.error pipe = self.redis.pipeline() pipe.hset(task_key, mapping=mapping) pipe.publish(f"sg:task:done:{record.task_id}", "1") # 2. Legacy JSON string value key: task_result:{task_id} if record.status == TaskStatus.COMPLETED: body = record.result or "" if len(body) > REDIS_PERSIST_RESULT_INLINE_MAX: payload = { "task_id": record.task_id, "status": record.status.value, "user_id": record.user_id, "result": None, "result_omitted": True, "result_note": ( "Result too large for Redis; full body is still " "available via check_task on this bot process." ), } value = json.dumps(payload) elif len(body) > REDIS_PERSIST_JSON_OFFLOAD_MIN: value = await asyncio.to_thread( json.dumps, { "task_id": record.task_id, "status": record.status.value, "user_id": record.user_id, "result": body, }, ) else: value = json.dumps( { "task_id": record.task_id, "status": record.status.value, "user_id": record.user_id, "result": body, } ) else: err = record.error or "" if len(err) > REDIS_PERSIST_JSON_OFFLOAD_MIN: value = await asyncio.to_thread( json.dumps, { "task_id": record.task_id, "status": record.status.value, "user_id": record.user_id, "error": err, }, ) else: value = json.dumps( { "task_id": record.task_id, "status": record.status.value, "user_id": record.user_id, "error": err, } ) pipe.set( f"{REDIS_KEY_PREFIX}{record.task_id}", value, ex=REDIS_TTL_SECONDS, ) await pipe.execute() except Exception: logger.warning( "Failed to persist task %s to Redis", record.task_id, exc_info=True, ) async def _deliver_output(self, record: TaskRecord) -> None: """Send the task result to the configured redirect channel. Splits across up to :data:`MAX_REDIRECT_MESSAGES` messages and only truncates when that budget is exhausted. """ adapter = record.redirect_adapter channel_id = record.redirect_channel_id if adapter is None or not channel_id: return elapsed = round(time.time() - record.created_at, 1) tool = record.tool_name or "unknown" max_chars = record.redirect_max_chars or DEFAULT_REDIRECT_MAX_CHARS if record.status == TaskStatus.COMPLETED: body = record.result or "(empty result)" truncated = len(body) > max_chars if truncated: body = body[:max_chars] header = f"**`{tool}`** completed in {elapsed}s\n" suffix = "\n…[truncated]" if truncated else "" text = f"{header}```\n{body}{suffix}\n```" elif record.status == TaskStatus.FAILED: err = record.error or "unknown error" text = f"**`{tool}`** failed after {elapsed}s\n" f"```\n{err}\n```" else: return chunks = split_message(text, max_length=1950) try: for chunk in chunks[:MAX_REDIRECT_MESSAGES]: await adapter.send(channel_id, chunk) logger.info( "Delivered task %s output (%d msg) to %s:%s", record.task_id, min(len(chunks), MAX_REDIRECT_MESSAGES), record.redirect_platform, channel_id, ) # Emit redirect_delivered event (fire-and-forget) try: from observability import publish_debug_event as _pde asyncio.create_task( _pde( "task_execution", "task_manager", phase="redirect_delivered", status="ok", channel_id=channel_id, platform=record.redirect_platform or "", preview=( f"tool={tool} chunks={min(len(chunks), MAX_REDIRECT_MESSAGES)} " f"elapsed={elapsed}s" ), payload={ "redirect_channel_id": channel_id, "redirect_platform": record.redirect_platform or "", "chunks_sent": min(len(chunks), MAX_REDIRECT_MESSAGES), "task_id": record.task_id, "tool_name": tool, }, ) ) except Exception: pass except Exception: logger.exception( "Failed to deliver task %s output to %s:%s", record.task_id, record.redirect_platform, channel_id, )