"""Fire-and-forget task manager for tool execution.

Wraps tool handler coroutines with a configurable timeout. If a tool
completes within the timeout its result is returned inline. Otherwise
the coroutine continues as a background :class:`asyncio.Task` and a
JSON envelope containing a task ID is returned so the LLM can poll for
results later via the ``check_task`` tool.

Background tasks may still hold the :class:`~tool_context.ToolContext`
injected into the tool and mutate it after the enclosing tool-calling
loop has moved on. Prefer ``no_background`` on tools that perform
persistent ``ctx`` side effects the model must observe in-order.

Output redirect
~~~~~~~~~~~~~~~

Any backgrounded task can have its result automatically delivered to a
channel on any platform when it finishes. Call
:meth:`TaskManager.set_output_redirect` (or use the ``redirect_task``
tool) to configure this.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from platforms.base import PlatformAdapter
logger = logging.getLogger(__name__)
from message_utils import split_message
# Key prefix of the legacy per-task JSON string value
# (``task_result:{task_id}``) written by ``_persist_result``.
REDIS_KEY_PREFIX = "task_result:"
# Expiry, in seconds, applied to the task keys persisted in Redis.
REDIS_TTL_SECONDS = 86400  # 24 hours
# Threshold, in characters of the result string, above which
# ``_persist_result`` omits the body from the legacy JSON envelope; larger
# results stay in-memory on the TaskRecord (check_task on the same process
# still works).
REDIS_PERSIST_RESULT_INLINE_MAX = 512 * 1024
# Build JSON off the event loop (``asyncio.to_thread``) when the persisted
# blob would be heavy; also measured in characters.
REDIS_PERSIST_JSON_OFFLOAD_MIN = 256 * 1024
# Output-body character cap used by ``_deliver_output`` when the record's
# ``redirect_max_chars`` is 0.
DEFAULT_REDIRECT_MAX_CHARS = 9000
# Upper bound on the number of message chunks sent per redirected result.
MAX_REDIRECT_MESSAGES = 5
# In-memory eviction for completed/failed records (T3-7): at most this many
# terminal records are kept in ``TaskManager._tasks``.
MAX_COMPLETED_TASKS = 500
# 0 = TTL eviction disabled; set to e.g. 86400 to drop stale completed entries.
COMPLETED_TASK_MEMORY_TTL_SECONDS = 0
[docs]
class TaskStatus(str, Enum):
    """State of a tool task tracked by :class:`TaskManager`.

    The ``str`` mix-in makes every member a plain string, so a member
    compares equal to its lowercase value.  The rest of the module writes
    ``.value`` into the JSON envelopes handed back to callers and into the
    ``status`` field persisted in Redis.
    """

    # The backgrounded coroutine has not settled yet.
    RUNNING = "running"

    # The coroutine returned; its stringified result is kept on the record.
    COMPLETED = "completed"

    # The coroutine raised or was cancelled; the record carries an error
    # string instead of a result.
    FAILED = "failed"
[docs]
@dataclass
class TaskRecord:
    """Mutable bookkeeping entry for one backgrounded tool invocation.

    :meth:`TaskManager.execute` creates a record when a tool outlives the
    inline timeout, and :meth:`TaskManager._on_task_done` updates it in place
    once the coroutine settles.  The fields fall into three groups: identity
    and origin, outcome, and the optional output-redirect target.
    """

    # -- Identity and origin ---------------------------------------------
    task_id: str
    tool_name: str
    status: TaskStatus
    # Wall-clock creation time (``time.time()``); used for elapsed-time
    # reporting and for in-memory eviction ordering.
    created_at: float = field(default_factory=time.time)

    # -- Outcome ----------------------------------------------------------
    # Stringified return value once the task completed successfully.
    result: str | None = None
    # Human-readable failure description once the task failed.
    error: str | None = None

    # -- Owner / origin of the request -------------------------------------
    user_id: str = ""
    channel_id: str = ""
    platform: str = ""

    # Live handle while the coroutine is in flight; cleared when it settles.
    # Kept out of ``repr`` output.
    asyncio_task: asyncio.Task | None = field(default=None, repr=False)

    # -- Output redirect ---------------------------------------------------
    # Channel to deliver the result to when the task finishes.
    redirect_channel_id: str = ""
    # Platform name for the redirect target.
    redirect_platform: str = ""
    # Resolved :class:`PlatformAdapter` for delivery; kept out of ``repr``.
    redirect_adapter: Any = field(default=None, repr=False)
    # Max characters of output body to deliver (0 = use default).
    redirect_max_chars: int = 0
[docs]
class TaskManager:
"""Manage fire-and-forget tool execution with timeout.
Parameters
----------
timeout:
Seconds to wait for a tool to complete before backgrounding
it. Defaults to ``10.0``.
redis:
Optional async Redis client for persisting completed results.
"""
[docs]
def __init__(
    self,
    timeout: float = 10.0,
    redis: Any = None,
) -> None:
    """Set up an empty task registry; performs no I/O.

    Args:
        timeout (float): Seconds :meth:`execute` waits for a tool coroutine
            before backgrounding it; defaults to ``10.0``.
        redis (Any): Optional async Redis client used to persist task state.
            With ``None`` tasks are tracked in memory only.
    """
    # task_id -> record for every task backgrounded by this instance.
    self._tasks: dict[str, TaskRecord] = {}
    self.redis = redis
    self.timeout = timeout
# ------------------------------------------------------------------
# Core execution
# ------------------------------------------------------------------
[docs]
async def execute(
    self,
    coro: Any,
    tool_name: str = "",
    user_id: str = "",
    channel_id: str = "",
    platform: str = "",
) -> str:
    """Run *coro*, answering inline when it is fast and backgrounding it otherwise.

    The coroutine is wrapped in an :class:`asyncio.Task` and given
    :attr:`timeout` seconds.  If it settles in time its result is returned
    as ``str(result)`` (an exception raised by the tool propagates to the
    caller).  Otherwise the task keeps running, a :class:`TaskRecord` is
    registered under a fresh 12-hex-character id, and a JSON envelope with
    that ``task_id`` is returned so the result can be fetched later.

    Args:
        coro: The tool coroutine to run.
        tool_name: Tool name used in logs, telemetry and the envelope.
        user_id: Owner recorded on the task.
        channel_id: Originating channel recorded on the task.
        platform: Originating platform recorded on the task.

    Returns:
        The tool result string, or a JSON envelope describing the
        backgrounded task.
    """

    def emit_debug_event(**fields: Any) -> None:
        # Telemetry is strictly best-effort: a missing ``observability``
        # module or a scheduling failure must never affect tool execution.
        try:
            from observability import publish_debug_event

            asyncio.create_task(
                publish_debug_event(
                    "task_execution",
                    "task_manager",
                    channel_id=channel_id,
                    platform=platform,
                    **fields,
                ),
                name="obs_task_execution",
            )
        except Exception:
            pass

    tool_task = asyncio.create_task(coro)
    started = time.monotonic()
    finished, _ = await asyncio.wait({tool_task}, timeout=self.timeout)

    # -- Fast path: the tool settled within the inline budget -------------
    if finished:
        emit_debug_event(
            phase="inline",
            status="ok",
            duration_ms=(time.monotonic() - started) * 1000,
            preview=f"tool={tool_name}",
            payload={"tool_name": tool_name, "user_id": user_id},
        )
        return str(tool_task.result())

    # -- Slow path: keep the task alive and track it -----------------------
    task_id = uuid.uuid4().hex[:12]
    record = TaskRecord(
        task_id=task_id,
        tool_name=tool_name,
        status=TaskStatus.RUNNING,
        user_id=user_id,
        channel_id=channel_id,
        platform=platform,
        asyncio_task=tool_task,
    )
    self._tasks[task_id] = record
    tool_task.add_done_callback(lambda settled: self._on_task_done(task_id, settled))
    logger.info("Tool '%s' backgrounded as task %s", tool_name, task_id)

    if self.redis is not None:
        asyncio.create_task(self._persist_running_task(record))

    emit_debug_event(
        phase="backgrounded",
        status="backgrounded",
        preview=f"tool={tool_name} task_id={task_id}",
        payload={
            "tool_name": tool_name,
            "task_id": task_id,
            "backgrounded": True,
            "user_id": user_id,
        },
    )

    notice = (
        f"Tool '{tool_name}' is running in the background. "
        f"Use check_task with task_id '{task_id}' to get "
        f"the result."
    )
    envelope = {
        "task_id": task_id,
        "tool_name": tool_name,
        "status": TaskStatus.RUNNING.value,
        "message": notice,
    }
    return json.dumps(envelope)
async def _persist_running_task(self, record: TaskRecord) -> None:
    """Write a backgrounded task's metadata to the ``sg:task:{task_id}`` hash.

    :meth:`get_result` and :meth:`await_result` read this hash when a task
    id is not in the local registry.  The identity fields, the status and
    the ``REDIS_TTL_SECONDS`` expiry are sent in a single pipeline so the
    key can never be left behind without a TTL.  The status is written with
    ``HSETNX``: this coroutine runs fire-and-forget, so it may land after
    :meth:`_persist_result` has already stored a terminal status, which
    must not be reset to the value observed here.

    A ``None`` Redis client makes this a no-op; Redis errors are logged as
    a warning and never propagated.

    Args:
        record (TaskRecord): The task whose metadata is persisted.
    """
    if self.redis is None:
        return
    task_key = f"sg:task:{record.task_id}"
    identity = {
        "task_id": record.task_id,
        "tool_name": record.tool_name,
        "created_at": str(record.created_at),
        "user_id": record.user_id,
        "channel_id": record.channel_id,
        "platform": record.platform,
    }
    try:
        pipe = self.redis.pipeline()
        pipe.hset(task_key, mapping=identity)
        # Never overwrite a status that a finished task already persisted.
        pipe.hsetnx(task_key, "status", record.status.value)
        pipe.expire(task_key, REDIS_TTL_SECONDS)
        await pipe.execute()
    except Exception as e:
        logger.warning("Failed to persist running task %s to Redis: %s", record.task_id, e)
def _redis_payload_allowed(self, cached: Any, user_id: str | None) -> str | None:
    """Turn a legacy ``task_result:{task_id}`` JSON envelope into a reply.

    Args:
        cached (Any): Raw cached value (``bytes`` or ``str``) containing the
            JSON envelope.
        user_id (str | None): When set, the envelope's ``user_id`` must
            match; otherwise a ``not found`` error envelope is returned so
            another user's result is not disclosed.

    Returns:
        str | None: For a completed task the stored result (or the whole
        envelope when ``result_omitted`` is set); for a failed task a compact
        error envelope; ``None`` for any other status or when the payload
        cannot be decoded or parsed.
    """
    try:
        raw = cached.decode() if isinstance(cached, bytes) else cached
        envelope = json.loads(raw)

        # Ownership gate comes first so nothing about the task leaks.
        owner = envelope.get("user_id", "")
        if user_id is not None and owner != user_id:
            return json.dumps({"error": f"Task '{envelope.get('task_id')}' not found."})

        state = envelope.get("status")
        if state == TaskStatus.FAILED.value:
            failure = {
                "task_id": envelope.get("task_id"),
                "status": TaskStatus.FAILED.value,
                "error": envelope.get("error"),
            }
            return json.dumps(failure)
        if state == TaskStatus.COMPLETED.value:
            if envelope.get("result_omitted"):
                # Body was too large to persist; hand back the explanatory envelope.
                return json.dumps(envelope)
            return envelope.get("result", "")
    except Exception:
        # Malformed payloads are treated as "nothing usable here".
        pass
    return None
# ------------------------------------------------------------------
# Result retrieval
# ------------------------------------------------------------------
[docs]
async def get_result(
    self,
    task_id: str,
    user_id: str | None = None,
) -> str:
    """Report on *task_id* without waiting for it.

    Lookup order: the in-process registry, then the ``sg:task:{task_id}``
    Redis hash, then the legacy ``task_result:{task_id}`` JSON value.  A
    completed task yields its result string, a failed task a JSON error
    envelope, and a running task a JSON status envelope with the elapsed
    seconds.  Redis errors are logged and treated as a miss.

    If *user_id* is set, only tasks owned by that user are returned; any
    other task is reported as not found.
    """
    not_found = json.dumps({"error": f"Task '{task_id}' not found."})

    def failed_envelope(error: Any) -> str:
        return json.dumps(
            {
                "task_id": task_id,
                "status": TaskStatus.FAILED.value,
                "error": error,
            }
        )

    def running_envelope(tool_name: Any, created_at: float) -> str:
        return json.dumps(
            {
                "task_id": task_id,
                "tool_name": tool_name,
                "status": TaskStatus.RUNNING.value,
                "elapsed_seconds": round(time.time() - created_at, 1),
            }
        )

    # 1. In-process registry.
    local = self._tasks.get(task_id)
    if local is not None:
        if user_id is not None and local.user_id != user_id:
            return not_found
        if local.status == TaskStatus.COMPLETED:
            return local.result or ""
        if local.status == TaskStatus.FAILED:
            return failed_envelope(local.error)
        return running_envelope(local.tool_name, local.created_at)

    if self.redis is None:
        return not_found

    # 2. Redis hash written by _persist_running_task / _persist_result.
    try:
        raw = await self.redis.hgetall(f"sg:task:{task_id}")
        if raw:
            fields = {
                (k.decode() if isinstance(k, bytes) else k): (
                    v.decode() if isinstance(v, bytes) else v
                )
                for k, v in raw.items()
            }
            if user_id is not None and fields.get("user_id", "") != user_id:
                return not_found
            state = fields.get("status")
            if state == TaskStatus.COMPLETED.value:
                return fields.get("result", "")
            if state == TaskStatus.FAILED.value:
                return failed_envelope(fields.get("error"))
            return running_envelope(
                fields.get("tool_name", ""),
                float(fields.get("created_at", time.time())),
            )
    except Exception as e:
        logger.warning("Redis lookup for task %s hash failed: %s", task_id, e)

    # 3. Legacy JSON string value, if one is still around.
    try:
        cached = await self.redis.get(f"{REDIS_KEY_PREFIX}{task_id}")
        if cached:
            allowed = self._redis_payload_allowed(cached, user_id)
            if allowed is not None:
                return allowed
    except Exception:
        logger.warning(
            "Redis lookup failed for task %s",
            task_id,
            exc_info=True,
        )

    return not_found
[docs]
async def await_result(
    self,
    task_id: str,
    timeout: float = 300.0,
    user_id: str | None = None,
) -> str:
    """Block until *task_id* completes and return its result.

    Unlike :meth:`get_result`, which returns immediately with a status
    update, this method suspends the caller until the work finishes:

    * a task in the local registry is awaited through
      :func:`asyncio.shield`, so a timeout here never cancels the tool;
    * a task known only through the ``sg:task:{task_id}`` Redis hash is
      awaited by subscribing to ``sg:task:done:{task_id}``;
    * otherwise the legacy ``task_result:{task_id}`` value is consulted.

    If *user_id* is set, only tasks owned by that user are awaited or
    returned (same semantics as :meth:`get_result`).

    Cancellation of the *caller* propagates as
    :class:`asyncio.CancelledError`; only cancellation of the awaited tool
    task itself is reported as a failed task.

    Parameters
    ----------
    task_id:
        Identifier returned by :meth:`execute`.
    timeout:
        Maximum seconds to wait.  Defaults to ``300`` (5 minutes).
        If exceeded, a timeout error JSON envelope is returned.
    user_id:
        Optional owner filter.

    Returns
    -------
    str
        The result string, or a JSON envelope describing a failure,
        a timeout, or an unknown task.
    """
    not_found = json.dumps({"error": f"Task '{task_id}' not found."})

    def failed_envelope(error: Any) -> str:
        return json.dumps(
            {
                "task_id": task_id,
                "status": TaskStatus.FAILED.value,
                "error": error,
            }
        )

    def decode_hash(raw: Any) -> dict:
        # Redis may hand back bytes or str depending on client configuration.
        if not raw:
            return {}
        return {
            (k.decode() if isinstance(k, bytes) else k): (
                v.decode() if isinstance(v, bytes) else v
            )
            for k, v in raw.items()
        }

    def terminal_from_hash(fields: dict) -> str | None:
        # Reply for a settled task, or None while it is still running.
        state = fields.get("status")
        if state == TaskStatus.COMPLETED.value:
            return fields.get("result", "")
        if state == TaskStatus.FAILED.value:
            return failed_envelope(fields.get("error"))
        return None

    record = self._tasks.get(task_id)

    # --- Known to this process ------------------------------------------
    if record is not None:
        if user_id is not None and record.user_id != user_id:
            return not_found

        def terminal_from_record() -> str | None:
            if record.status == TaskStatus.COMPLETED:
                return record.result or ""
            if record.status == TaskStatus.FAILED:
                return failed_envelope(record.error)
            return None

        outcome = terminal_from_record()
        if outcome is not None:
            return outcome

        atask = record.asyncio_task
        if atask is not None:
            try:
                await asyncio.wait_for(asyncio.shield(atask), timeout=timeout)
            except asyncio.TimeoutError:
                # A tool that itself raised TimeoutError is *done*; only an
                # unfinished task means our wait budget ran out.
                if not atask.done():
                    return json.dumps(
                        {
                            "task_id": task_id,
                            "status": "timeout",
                            "error": (
                                f"Task '{task_id}' did not complete within "
                                f"{timeout}s. It is still running in the "
                                f"background — use check_task to poll later."
                            ),
                        }
                    )
            except asyncio.CancelledError:
                # Swallow only the tool task's own cancellation (recorded as
                # FAILED by _on_task_done); the caller's must propagate.
                if not atask.cancelled():
                    raise
            except Exception:
                pass  # task raised; result captured by _on_task_done

            # Re-read status — _on_task_done has updated the record.
            outcome = terminal_from_record()
            if outcome is not None:
                return outcome

        # asyncio_task was already None (finished between our checks)
        return await self.get_result(task_id, user_id=user_id)

    # --- Not in memory: check Redis ---------------------------------------
    if self.redis is not None:
        try:
            task_key = f"sg:task:{task_id}"
            fields = decode_hash(await self.redis.hgetall(task_key))
            if fields:
                if user_id is not None and fields.get("user_id", "") != user_id:
                    return not_found
                outcome = terminal_from_hash(fields)
                if outcome is not None:
                    return outcome

                # Still running elsewhere: wait for the completion signal.
                channel = f"sg:task:done:{task_id}"
                pubsub = self.redis.pubsub()
                try:
                    await pubsub.subscribe(channel)

                    # The task may have finished between the first read and
                    # the subscription, in which case its notification was
                    # already published; re-check once we are listening.
                    outcome = terminal_from_hash(
                        decode_hash(await self.redis.hgetall(task_key))
                    )
                    if outcome is not None:
                        return outcome

                    deadline = time.monotonic() + timeout
                    while (remaining := deadline - time.monotonic()) > 0:
                        try:
                            msg = await pubsub.get_message(
                                ignore_subscribe_messages=True,
                                timeout=min(1.0, remaining),
                            )
                            if msg is not None:
                                outcome = terminal_from_hash(
                                    decode_hash(await self.redis.hgetall(task_key))
                                )
                                if outcome is not None:
                                    return outcome
                                # Notified but nothing terminal in the hash:
                                # fall through to the legacy lookup below.
                                break
                            await asyncio.sleep(0.05)
                        except Exception as ex:
                            return json.dumps({"error": f"Error awaiting task result: {ex}"})
                    else:
                        return json.dumps(
                            {
                                "task_id": task_id,
                                "status": "timeout",
                                "error": (
                                    f"Task '{task_id}' did not complete within "
                                    f"{timeout}s."
                                ),
                            }
                        )
                finally:
                    # Release the subscription even if unsubscribe fails.
                    try:
                        await pubsub.unsubscribe(channel)
                    except Exception:
                        pass
                    try:
                        await pubsub.aclose()
                    except Exception:
                        pass
        except Exception as e:
            logger.warning("Redis pubsub await for task %s failed: %s", task_id, e)

        # Fallback to legacy string format if still present
        try:
            cached = await self.redis.get(f"{REDIS_KEY_PREFIX}{task_id}")
            if cached:
                allowed = self._redis_payload_allowed(cached, user_id)
                if allowed is not None:
                    return allowed
        except Exception:
            logger.warning(
                "Redis lookup failed for task %s",
                task_id,
                exc_info=True,
            )

    return not_found
[docs]
async def list_tasks(self, user_id: str | None = None) -> str:
    """Summarize the tasks held in the in-process registry as JSON.

    Args:
        user_id (str | None): When given, only that user's tasks are listed;
            ``None`` lists every tracked task.

    Returns:
        str: JSON object with a ``tasks`` list (id, tool, status, owner,
        elapsed seconds, creation time, plus ``error`` for failed tasks) and
        its ``count``.
    """
    now = time.time()

    def summarize(rec: TaskRecord) -> dict[str, Any]:
        summary: dict[str, Any] = {
            "task_id": rec.task_id,
            "tool_name": rec.tool_name,
            "status": rec.status.value,
            "user_id": rec.user_id,
            "elapsed_seconds": round(now - rec.created_at, 1),
            "created_at": rec.created_at,
        }
        if rec.status == TaskStatus.FAILED:
            summary["error"] = rec.error
        return summary

    visible = [
        summarize(rec)
        for rec in self._tasks.values()
        if user_id is None or rec.user_id == user_id
    ]
    return json.dumps({"tasks": visible, "count": len(visible)})
# ------------------------------------------------------------------
# Output redirect
# ------------------------------------------------------------------
[docs]
def set_output_redirect(
    self,
    task_id: str,
    channel_id: str,
    platform: str,
    adapter: "PlatformAdapter",
    max_chars: int = 0,
) -> str | None:
    """Point a tracked task's output at *channel_id*.

    The redirect target is stored on the task's record.  A task that is
    still running delivers when it settles; a task that has already
    completed or failed has its delivery scheduled right away.

    Args:
        task_id: Id of a task in the in-process registry.
        channel_id: Destination channel.
        platform: Destination platform name.
        adapter: Adapter used to send the messages.
        max_chars: Cap on the delivered output body (0 = use the default).

    Returns:
        An error string when *task_id* is not tracked by this instance,
        otherwise ``None``.
    """
    record = self._tasks.get(task_id)
    if record is None:
        return f"Task '{task_id}' not found."

    record.redirect_adapter = adapter
    record.redirect_platform = platform
    record.redirect_channel_id = channel_id
    record.redirect_max_chars = max_chars

    # The done-callback has already fired for a settled task, so nothing
    # else would trigger delivery; schedule it here instead.
    already_settled = record.status in (TaskStatus.COMPLETED, TaskStatus.FAILED)
    if already_settled:
        asyncio.create_task(self._deliver_output(record))
    return None
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _on_task_done(
    self,
    task_id: str,
    task: asyncio.Task,
) -> None:
    """Done-callback that finalizes the record of a backgrounded task.

    Registered by :meth:`execute`.  It drops the live task handle, marks the
    record ``FAILED`` (cancellation or exception, with an error string) or
    ``COMPLETED`` (with ``str(result)``), and then schedules the follow-up
    work as separate tasks so the callback itself never blocks: a
    ``task_execution`` debug event, :meth:`_persist_result` when Redis is
    configured, and :meth:`_deliver_output` when a redirect target is set.
    Finally the in-memory registry is pruned.

    Args:
        task_id (str): Id under which the record is registered; unknown ids
            are ignored.
        task (asyncio.Task): The settled task.
    """
    record = self._tasks.get(task_id)
    if record is None:
        return
    record.asyncio_task = None

    # cancelled() has to be tested first: exception() raises on a cancelled task.
    if task.cancelled():
        record.status = TaskStatus.FAILED
        record.error = "Task was cancelled."
        logger.warning("Task %s was cancelled", task_id)
    else:
        exc = task.exception()
        if exc is None:
            record.status = TaskStatus.COMPLETED
            record.result = str(task.result())
            logger.info("Task %s completed", task_id)
        else:
            record.status = TaskStatus.FAILED
            record.error = f"{type(exc).__name__}: {exc}"
            logger.error(
                "Task %s failed: %s",
                task_id,
                exc,
                exc_info=exc,
            )

    # Best-effort telemetry for the final outcome.
    try:
        from observability import publish_debug_event

        succeeded = record.status == TaskStatus.COMPLETED
        event_payload: dict = {
            "tool_name": record.tool_name,
            "task_id": task_id,
            "user_id": record.user_id,
        }
        if record.status == TaskStatus.FAILED and record.error:
            # Text before the first colon (the exception class name for
            # raised errors), or the whole message when there is no colon.
            event_payload["error_type"] = record.error.partition(":")[0]
            event_payload["error_message"] = record.error[:500]
        asyncio.create_task(
            publish_debug_event(
                "task_execution",
                "task_manager",
                channel_id=record.channel_id,
                platform=record.platform,
                phase="completed" if succeeded else "failed",
                status="ok" if succeeded else "error",
                duration_ms=(time.time() - record.created_at) * 1000,
                preview=f"tool={record.tool_name} task_id={task_id}",
                payload=event_payload,
            ),
            name="obs_task_execution",
        )
    except Exception:
        pass

    if self.redis is not None:
        asyncio.create_task(self._persist_result(record))

    wants_redirect = record.redirect_adapter is not None and record.redirect_channel_id
    if wants_redirect:
        asyncio.create_task(self._deliver_output(record))

    self._prune_completed_memory_tasks()
def _prune_completed_memory_tasks(self) -> None:
    """Evict old terminal records so ``self._tasks`` stays bounded.

    Two passes, both limited to completed/failed records (running tasks are
    never touched):

    1. when ``COMPLETED_TASK_MEMORY_TTL_SECONDS`` is positive, drop records
       created longer ago than that TTL;
    2. if more than ``MAX_COMPLETED_TASKS`` terminal records remain, drop
       the ones with the oldest creation time until the cap is met.

    Only the in-process registry is modified.
    """
    settled_states = (TaskStatus.COMPLETED, TaskStatus.FAILED)
    now = time.time()
    ttl = COMPLETED_TASK_MEMORY_TTL_SECONDS

    if ttl > 0:
        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        expired = [
            tid
            for tid, rec in self._tasks.items()
            if rec.status in settled_states and (now - rec.created_at) > ttl
        ]
        for tid in expired:
            del self._tasks[tid]

    settled_ids = [
        tid for tid, rec in self._tasks.items() if rec.status in settled_states
    ]
    overflow = len(settled_ids) - MAX_COMPLETED_TASKS
    if overflow > 0:
        # Stable sort: records with equal timestamps keep registry order.
        settled_ids.sort(key=lambda tid: self._tasks[tid].created_at)
        for tid in settled_ids[:overflow]:
            self._tasks.pop(tid, None)
async def _persist_result(self, record: TaskRecord) -> None:
    """Persist a settled task's outcome to Redis and wake remote waiters.

    One pipeline carries four commands:

    * ``HSET sg:task:{task_id}`` with the identity fields, the status and
      the result or error.  The identity fields are rewritten here so the
      hash stays usable for owner checks even if the earlier running-state
      write was lost or has expired;
    * ``EXPIRE`` on that hash (``REDIS_TTL_SECONDS``), so a hash created by
      this call can never linger without a TTL;
    * ``SET task_result:{task_id}`` — the legacy JSON envelope, same TTL;
    * ``PUBLISH sg:task:done:{task_id}`` — queued last so subscribers in
      :meth:`await_result` find the data in place when they are woken.

    A completed body longer than ``REDIS_PERSIST_RESULT_INLINE_MAX`` is not
    stored at all: both formats carry the ``result_omitted`` envelope, whose
    note explains the full body is only available on the owning process.
    Bodies above ``REDIS_PERSIST_JSON_OFFLOAD_MIN`` are serialized via
    :func:`asyncio.to_thread`.  A ``None`` Redis client makes this a no-op;
    failures are logged as a warning and never propagated.

    Args:
        record (TaskRecord): The completed or failed task to persist.
    """
    if self.redis is None:
        return
    try:
        status_value = record.status.value

        # -- Legacy JSON envelope ------------------------------------------
        envelope: dict[str, Any] = {
            "task_id": record.task_id,
            "status": status_value,
            "user_id": record.user_id,
        }
        omitted = False
        if record.status == TaskStatus.COMPLETED:
            body = record.result or ""
            omitted = len(body) > REDIS_PERSIST_RESULT_INLINE_MAX
            if omitted:
                envelope["result"] = None
                envelope["result_omitted"] = True
                envelope["result_note"] = (
                    "Result too large for Redis; full body is still "
                    "available via check_task on this bot process."
                )
                heavy = False
            else:
                envelope["result"] = body
                heavy = len(body) > REDIS_PERSIST_JSON_OFFLOAD_MIN
        else:
            err = record.error or ""
            envelope["error"] = err
            heavy = len(err) > REDIS_PERSIST_JSON_OFFLOAD_MIN

        if heavy:
            # Keep a large serialization off the event loop.
            value = await asyncio.to_thread(json.dumps, envelope)
        else:
            value = json.dumps(envelope)

        # -- Hash format ----------------------------------------------------
        task_key = f"sg:task:{record.task_id}"
        mapping = {
            "task_id": record.task_id,
            "tool_name": record.tool_name,
            "status": status_value,
            "created_at": str(record.created_at),
            "user_id": record.user_id,
            "channel_id": record.channel_id,
            "platform": record.platform,
        }
        if omitted:
            # Respect the size cap here too: readers of the hash receive the
            # same explanatory envelope the legacy key provides.
            mapping["result"] = value
            mapping["result_omitted"] = "1"
        elif record.result is not None:
            mapping["result"] = record.result
        if record.error is not None:
            mapping["error"] = record.error

        pipe = self.redis.pipeline()
        pipe.hset(task_key, mapping=mapping)
        pipe.expire(task_key, REDIS_TTL_SECONDS)
        pipe.set(
            f"{REDIS_KEY_PREFIX}{record.task_id}",
            value,
            ex=REDIS_TTL_SECONDS,
        )
        pipe.publish(f"sg:task:done:{record.task_id}", "1")
        await pipe.execute()
    except Exception:
        logger.warning(
            "Failed to persist task %s to Redis",
            record.task_id,
            exc_info=True,
        )
async def _deliver_output(self, record: TaskRecord) -> None:
    """Post a settled task's outcome to its redirect channel.

    Does nothing unless the record has both a redirect adapter and a
    redirect channel and is in a terminal state.  A completed result is cut
    to ``redirect_max_chars`` (or ``DEFAULT_REDIRECT_MAX_CHARS`` when that
    is 0) and wrapped in a code fence; a failure posts the error text.  The
    message is split with ``split_message`` and at most
    :data:`MAX_REDIRECT_MESSAGES` chunks are sent.  Send failures are logged
    and never propagated.

    Args:
        record (TaskRecord): The settled task whose output is delivered.
    """
    adapter, channel_id = record.redirect_adapter, record.redirect_channel_id
    if adapter is None or not channel_id:
        return

    elapsed = round(time.time() - record.created_at, 1)
    tool = record.tool_name or "unknown"

    if record.status == TaskStatus.FAILED:
        err = record.error or "unknown error"
        text = f"**`{tool}`** failed after {elapsed}s\n" f"```\n{err}\n```"
    elif record.status == TaskStatus.COMPLETED:
        limit = record.redirect_max_chars or DEFAULT_REDIRECT_MAX_CHARS
        body = record.result or "(empty result)"
        suffix = ""
        if len(body) > limit:
            body = body[:limit]
            suffix = "\n…[truncated]"
        header = f"**`{tool}`** completed in {elapsed}s\n"
        text = f"{header}```\n{body}{suffix}\n```"
    else:
        # Still running: nothing to deliver yet.
        return

    chunks = split_message(text, max_length=1950)
    sent_count = min(len(chunks), MAX_REDIRECT_MESSAGES)
    platform_name = record.redirect_platform
    try:
        for piece in chunks[:MAX_REDIRECT_MESSAGES]:
            await adapter.send(channel_id, piece)
        logger.info(
            "Delivered task %s output (%d msg) to %s:%s",
            record.task_id,
            sent_count,
            platform_name,
            channel_id,
        )

        # Best-effort ``redirect_delivered`` telemetry.
        try:
            from observability import publish_debug_event as _pde

            asyncio.create_task(
                _pde(
                    "task_execution",
                    "task_manager",
                    phase="redirect_delivered",
                    status="ok",
                    channel_id=channel_id,
                    platform=platform_name or "",
                    preview=(
                        f"tool={tool} chunks={sent_count} "
                        f"elapsed={elapsed}s"
                    ),
                    payload={
                        "redirect_channel_id": channel_id,
                        "redirect_platform": platform_name or "",
                        "chunks_sent": sent_count,
                        "task_id": record.task_id,
                        "tool_name": tool,
                    },
                )
            )
        except Exception:
            pass
    except Exception:
        logger.exception(
            "Failed to deliver task %s output to %s:%s",
            record.task_id,
            platform_name,
            channel_id,
        )