"""Upload a file to a Discord channel, thread, or DM.
Handles all known Discord upload failure states including DM
resolution, file size limits (per guild boost tier), archived/locked
threads, forum/voice channels, rate limits, and transient network
errors.
"""
from __future__ import annotations

import asyncio
import io
import logging
import math
import os
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext
logger = logging.getLogger(__name__)

# Tool name, human-readable description, and JSON-schema-style parameter
# declaration for this module's ``run`` entry point.
TOOL_NAME = "discord_upload_file"

TOOL_DESCRIPTION = (
    "Uploads a file to a Discord channel, thread, or DM. "
    "Can create a file from text content or upload an existing file from disk. "
    "For DMs, you may pass a user_id instead of (or in addition to) the channel_id."
)

TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "channel_id": {
            "type": "string",
            "description": (
                "The channel or thread ID to upload to. For DMs you can "
                "also pass the DM channel ID."
            ),
        },
        "user_id": {
            "type": "string",
            "description": (
                "Optional Discord user ID. "
                "When provided: if channel_id resolves to a valid channel, "
                "user_id is ignored; "
                "otherwise a DM is opened to this user and the file is "
                "uploaded there. "
                "You may omit channel_id entirely and provide only user_id "
                "to send a file via DM."
            ),
        },
        "content": {
            "type": "string",
            "description": (
                "String content to upload as a file. Requires 'filename' to be set."
            ),
        },
        "filename": {
            "type": "string",
            "description": "Name for the file when uploading from 'content'.",
        },
        "filepath": {
            "type": "string",
            "description": "Local path to an existing file to upload.",
        },
    },
    # Nothing is unconditionally required; ``run`` validates the combinations.
    "required": [],
}

# Only files inside this directory may be uploaded via ``filepath``; it can be
# overridden with the ALLOWED_UPLOAD_DIR environment variable.
ALLOWED_UPLOAD_DIR = os.environ.get("ALLOWED_UPLOAD_DIR", "/home/star/large_files")

# Upload ceilings in bytes, keyed by guild premium (boost) tier:
# tier 0 (no boost) and tier 1 -> 25 MB, tier 2 -> 50 MB, tier 3 -> 100 MB.
_UPLOAD_LIMITS = {
    tier: megabytes * 1024 * 1024
    for tier, megabytes in ((0, 25), (1, 25), (2, 50), (3, 100))
}

# Ceiling used for DMs (no guild): 25 MB.
_DM_UPLOAD_LIMIT = 25 * 1024 * 1024

# Maximum retries for transient failures.
_MAX_RETRIES = 1
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _get_upload_limit(channel) -> int:
    """Look up the attachment size ceiling, in bytes, that applies to *channel*.

    The ceiling is keyed on the premium (boost) tier of the guild that owns
    the channel: ``channel.guild.premium_tier`` is mapped through
    :data:`_UPLOAD_LIMITS`. When the channel has no ``guild`` attribute, or
    the attribute is ``None`` (as for a DM), or the tier is not a key of the
    table, :data:`_DM_UPLOAD_LIMIT` is used instead. No I/O is performed.

    Used by :func:`run` to size an upload before sending, and by
    :func:`_send_file_with_retry` to word the error for an HTTP 413 reply.

    Args:
        channel: A resolved Discord channel, thread, or DM object.

    Returns:
        int: Maximum upload size in bytes for the channel.
    """
    owning_guild = getattr(channel, "guild", None)
    if owning_guild is not None:
        return _UPLOAD_LIMITS.get(owning_guild.premium_tier, _DM_UPLOAD_LIMIT)
    return _DM_UPLOAD_LIMIT
def _friendly_size(size_bytes: int) -> str:
    """Render a byte count as a compact, human-readable size.

    Values below 1024 are shown as whole bytes; values below 1024 * 1024 are
    shown in KB and everything else in MB, both with one decimal place. The
    result is only used in user-facing messages (by :func:`run` and
    :func:`_send_file_with_retry`). Pure function, no I/O.

    Args:
        size_bytes: A size in bytes.

    Returns:
        str: ``"N B"``, ``"N.N KB"``, or ``"N.N MB"``.
    """
    kib = 1024
    mib = kib * kib
    if size_bytes < kib:
        return f"{size_bytes} B"
    if size_bytes < mib:
        return f"{size_bytes / kib:.1f} KB"
    return f"{size_bytes / mib:.1f} MB"
def _check_channel_sendable(channel) -> str | None:
    """Decide whether a file can be posted directly to *channel*.

    Forum, voice/stage and category channels are rejected, as are locked
    threads and any object without a ``send`` attribute. An archived (but not
    locked) thread is deliberately accepted here: the caller tries to
    unarchive it before sending.

    Args:
        channel: A resolved Discord channel, thread, or DM object.

    Returns:
        str | None: ``None`` when the channel is acceptable, otherwise the
        error message to return to the user.
    """
    import discord

    # Channel kinds that never accept a direct upload, in the order they are
    # checked, each paired with its rejection message.
    rejected_kinds = (
        (
            discord.ForumChannel,
            "Error: Cannot upload directly to a forum channel. "
            "Please specify a thread (post) within the forum.",
        ),
        (
            (discord.VoiceChannel, discord.StageChannel),
            "Error: Cannot upload files to a voice or stage channel.",
        ),
        (
            discord.CategoryChannel,
            "Error: Cannot upload files to a category channel. "
            "Please specify a text channel within the category.",
        ),
    )
    for kinds, rejection in rejected_kinds:
        if isinstance(channel, kinds):
            return rejection

    # Locked threads are a hard stop; archived ones are left to the caller.
    if isinstance(channel, discord.Thread) and channel.locked:
        return (
            f"Error: Thread '{channel.name}' is locked. "
            f"Files cannot be uploaded to locked threads."
        )

    # Final generic check.
    if hasattr(channel, "send"):
        return None
    return f"Error: Channel '{channel.id}' does not support sending messages."
async def _resolve_channel(client, channel_id, user_id):
    """Turn *channel_id* and/or *user_id* into a channel object to send to.

    ``channel_id`` is tried first through ``get_messageable_channel`` (the
    DM-aware helper, so an uncached DM snowflake still resolves instead of
    failing on a channel fetch). If that yields an error string, or no
    ``channel_id`` was given, a DM is opened to ``user_id`` through
    ``resolve_dm_channel`` when one was supplied.

    Args:
        client: The Discord client passed through to the helpers.
        channel_id: Channel, thread, or DM channel ID; may be empty/``None``.
        user_id: User ID used as the DM fallback; may be empty/``None``.

    Returns:
        The resolved channel object, or a ``str`` error message. When both
        lookups fail, the DM lookup's error is the one returned.
    """
    from tools._discord_helpers import (
        get_messageable_channel,
        resolve_dm_channel,
    )

    channel_failure = None
    if channel_id:
        looked_up = await get_messageable_channel(client, channel_id)
        if not isinstance(looked_up, str):
            return looked_up
        # Remember the failure; it is only reported when there is no DM fallback.
        channel_failure = looked_up

    if user_id:
        # Success (a channel) and failure (an error string) are both handed
        # straight back: the DM error is the more relevant one to report.
        return await resolve_dm_channel(client, user_id)

    if channel_failure:
        return channel_failure
    return "Error: You must provide either 'channel_id' or 'user_id'."
async def _send_file_with_retry(channel, discord_file, retries=_MAX_RETRIES):
    """Send *discord_file* to *channel*, retrying transient failures.

    One initial attempt plus up to *retries* further attempts are made. A
    retry happens after a rate limit (HTTP 429), after an HTTP error that is
    not specifically recognised below, and after an ``aiohttp`` client error
    or timeout. Before every retry the file is rewound with
    :func:`_rewind_for_retry`; if it cannot be rewound (for example because
    its stream has already been closed) no retry is attempted and the error
    is reported instead of re-sending a consumed file.

    Recognised failures are reported immediately without retrying: HTTP 413
    (too large), error code 50006 (empty message), error code 50007 (cannot
    DM this user) and any other ``Forbidden`` response (missing permission).

    Args:
        channel: Object exposing an awaitable ``send(file=...)``.
        discord_file: The ``discord.File`` to upload.
        retries: Number of additional attempts after the first one.

    Returns:
        ``None`` on success, otherwise a ``str`` error message.
    """
    import aiohttp
    import discord

    last_error = None
    for attempt in range(1 + retries):
        attempts_remain = attempt < retries
        try:
            await channel.send(file=discord_file)
            return None  # success
        except discord.errors.HTTPException as exc:
            if exc.status == 429:
                retry_after = getattr(exc, "retry_after", 5.0) or 5.0
                target = getattr(channel, "name", channel.id)
                if not attempts_remain:
                    logger.warning(
                        "Rate-limited uploading to %s; no retries left",
                        target,
                    )
                    return (
                        f"Error: Rate-limited by Discord (retry after "
                        f"{retry_after:.0f}s)."
                    )
                if not _rewind_for_retry(discord_file):
                    # The file cannot be re-sent, so waiting here would only
                    # delay the error; let the caller retry with a fresh file.
                    logger.warning(
                        "Rate-limited uploading to %s; file cannot be re-sent",
                        target,
                    )
                    return (
                        f"Error: Rate-limited by Discord. Please "
                        f"wait {retry_after:.0f}s and try again."
                    )
                logger.warning(
                    "Rate-limited uploading to %s, retrying in %.1fs",
                    target,
                    retry_after,
                )
                await asyncio.sleep(retry_after)
                continue
            if exc.status == 413:
                limit = _get_upload_limit(channel)
                return (
                    f"Error: File exceeds Discord's upload limit "
                    f"({_friendly_size(limit)}) for this server."
                )
            if exc.code == 50006:
                return (
                    "Error: Discord rejected the upload — the "
                    "message was considered empty. Ensure the file "
                    "has content."
                )
            if exc.code == 50007:
                return (
                    "Error: Cannot send a DM to this user. "
                    "They may have DMs disabled or have blocked "
                    "the bot."
                )
            # Forbidden is a subclass of HTTPException, so a separate
            # ``except Forbidden`` clause placed after this one could never
            # run. Checking it here (after the more specific 50007 case)
            # keeps permission errors from being retried and misreported.
            if isinstance(exc, discord.errors.Forbidden):
                ch_name = getattr(channel, "name", channel.id)
                return (
                    f"Error: I don't have permission to upload files "
                    f"to channel '{ch_name}'."
                )
            last_error = exc
            logger.warning(
                "HTTPException %s/%s uploading file: %s",
                exc.status,
                exc.code,
                exc,
            )
            retry_delay = 1.0
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            logger.warning(
                "Transient network error uploading file: %s",
                exc,
            )
            last_error = exc
            retry_delay = 2.0

        # Only reached for failures that are worth another attempt.
        if not (attempts_remain and _rewind_for_retry(discord_file)):
            break
        await asyncio.sleep(retry_delay)

    if last_error is not None:
        return f"Error: Upload failed after retries: {last_error}"
    return "Error: Upload failed for an unknown reason."


def _rewind_for_retry(discord_file) -> bool:
    """Rewind *discord_file* so it can be sent again; report whether it worked.

    A file whose stream was read by a failed attempt would otherwise be
    re-sent from wherever the stream was left, or from a closed stream.

    NOTE(review): relies on ``discord.File`` exposing ``fp`` and ``reset()``
    (which seeks back to the starting position) — confirm against the
    installed discord library version.
    """
    stream = getattr(discord_file, "fp", None)
    if stream is None or getattr(stream, "closed", False):
        return False
    try:
        discord_file.reset()
    except (AttributeError, OSError, ValueError):
        return False
    return True
# ------------------------------------------------------------------
# Entry point
# ------------------------------------------------------------------
async def run(
    channel_id: str | None = None,
    user_id: str | None = None,
    content: str | None = None,
    filename: str | None = None,
    filepath: str | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Upload text content or a local file to a Discord channel, thread, or DM.

    Exactly one source is used: ``content`` together with ``filename`` (which
    takes precedence when both sources are given), or ``filepath``. The
    arguments and the local file are validated *before* any Discord call, so
    a request that can never succeed does not resolve channels or unarchive a
    thread as a side effect.

    After validation the destination is resolved from ``channel_id`` and/or
    ``user_id``, checked for being sendable, and an archived thread is
    unarchived. Text that exceeds the destination's upload limit is split
    into several files by :func:`_upload_split_content`; a local file that
    exceeds the limit is rejected.

    Args:
        channel_id: ID of the channel, thread, or DM channel to upload to.
        user_id: User ID to open a DM with when ``channel_id`` is missing or
            does not resolve.
        content: Text to upload as a file; requires ``filename``.
        filename: Name of the file created from ``content``.
        filepath: Path of an existing local file to upload; it must resolve
            to a location inside :data:`ALLOWED_UPLOAD_DIR`.
        ctx: Tool execution context, passed to ``require_discord_client``.

    Returns:
        str: A success message, or a message starting with ``"Error"``.
    """
    # ----------------------------------------------------------
    # 1. Validate the request without touching Discord
    # ----------------------------------------------------------
    source_error = _validate_upload_source(content, filename, filepath)
    if source_error is not None:
        return source_error

    from_content = content is not None and bool(filename)
    data = b""
    resolved = ""
    file_size = 0
    if from_content:
        try:
            data = content.encode("utf-8", errors="replace")
        except (AttributeError, TypeError, UnicodeError) as exc:
            return f"Error encoding content: {exc}"
        if not data:
            return "Error: Cannot upload an empty file."
    else:
        # All filesystem checks run in one worker-thread hop.
        inspected = await asyncio.to_thread(_inspect_local_file, filepath)
        if isinstance(inspected, str):
            return inspected
        resolved, file_size = inspected

    import discord
    from tools._discord_helpers import require_discord_client

    client = require_discord_client(ctx)
    if isinstance(client, str):
        return client

    # ----------------------------------------------------------
    # 2. Resolve and validate the destination
    # ----------------------------------------------------------
    channel = await _resolve_channel(client, channel_id, user_id)
    if isinstance(channel, str):
        return channel

    send_check = _check_channel_sendable(channel)
    if send_check is not None:
        return send_check

    # ----------------------------------------------------------
    # 3. Determine the upload limit; reject oversized local files
    #    before modifying the thread's archived state
    # ----------------------------------------------------------
    max_bytes = _get_upload_limit(channel)
    ch_name = getattr(channel, "name", None) or (
        f"DM ({getattr(channel, 'recipient', 'unknown')})"
    )
    if not from_content and file_size > max_bytes:
        scope = "server" if getattr(channel, "guild", None) else "DM"
        return (
            f"Error: File '{os.path.basename(resolved)}' is "
            f"{_friendly_size(file_size)}, which exceeds the "
            f"upload limit of {_friendly_size(max_bytes)} for "
            f"this {scope}."
        )

    # ----------------------------------------------------------
    # 4. Unarchive thread if needed
    # ----------------------------------------------------------
    if isinstance(channel, discord.Thread) and channel.archived:
        try:
            await channel.edit(archived=False)
            logger.info(
                "Unarchived thread '%s' for file upload",
                channel.name,
            )
        except discord.errors.Forbidden:
            return (
                f"Error: Thread '{channel.name}' is archived and "
                f"I don't have permission to unarchive it."
            )
        except Exception as exc:
            # Deliberately broad: any failure here is reported to the
            # caller as text rather than raised out of the tool.
            return f"Error: Failed to unarchive thread '{channel.name}': {exc}"

    # ----------------------------------------------------------
    # 5. Upload from content + filename
    # ----------------------------------------------------------
    if from_content:
        # Auto-split if content exceeds the limit.
        if len(data) > max_bytes:
            return await _upload_split_content(
                channel,
                content,
                filename,
                max_bytes,
            )
        upload = discord.File(io.BytesIO(data), filename=filename)
        err = await _send_file_with_retry(channel, upload)
        if err:
            return err
        return f"Successfully uploaded file '{filename}' to channel '{ch_name}'."

    # ----------------------------------------------------------
    # 6. Upload from filepath
    # ----------------------------------------------------------
    try:
        upload = discord.File(resolved)
    except OSError as exc:
        # The file may have vanished or become unreadable since the checks.
        return f"Error: Could not read file '{filepath}': {exc}"
    try:
        err = await _send_file_with_retry(channel, upload)
    finally:
        # Release the file handle whether or not the send succeeded.
        upload.close()
    if err:
        return err
    return f"Successfully uploaded file from '{filepath}' to channel '{ch_name}'."


def _validate_upload_source(content, filename, filepath) -> str | None:
    """Return an error message for an unusable argument combination, else None."""
    if content is not None and filename:
        return None
    if content is not None:
        return (
            "Error: 'filename' is required when uploading from "
            "'content'. Please provide a filename (e.g., "
            "'output.txt')."
        )
    if filename and not filepath:
        return (
            "Error: 'content' is required when 'filename' is "
            "provided without 'filepath'."
        )
    if not filepath:
        return "Error: Provide either 'content' and 'filename', or a 'filepath'."
    return None


def _inspect_local_file(filepath: str) -> tuple[str, int] | str:
    """Check *filepath* is an uploadable file; return (real path, size) or an error."""
    resolved = os.path.realpath(os.path.expanduser(filepath))
    allowed = os.path.realpath(ALLOWED_UPLOAD_DIR)
    # Compare symlink-resolved paths so the allowed directory cannot be
    # escaped through ``..`` segments or links.
    if resolved != allowed and not resolved.startswith(allowed + os.sep):
        return (
            f"Error: Access denied. Files can only be "
            f"uploaded from within {ALLOWED_UPLOAD_DIR}"
        )
    if not os.path.exists(resolved):
        return f"Error: File not found at path: {filepath}"
    if not os.path.isfile(resolved):
        return f"Error: Path is not a file: {filepath}"
    if not os.access(resolved, os.R_OK):
        return f"Error: Permission denied reading file: {filepath}"
    try:
        size = os.path.getsize(resolved)
    except OSError as exc:
        return f"Error: Could not read file '{filepath}': {exc}"
    if size == 0:
        return f"Error: File is empty: {filepath}"
    return resolved, size
# ------------------------------------------------------------------
# Split-upload for oversized text content
# ------------------------------------------------------------------
async def _upload_split_content(
    channel,
    content: str,
    filename: str,
    max_bytes: int,
) -> str:
    """Split oversized text *content* into several files and upload each part.

    Fallback used by :func:`run` when text supplied via ``content`` is larger
    than the destination's upload limit. The text is encoded as UTF-8 and cut
    into pieces of at most ``max_bytes - 1024`` bytes (a 1 KB margin for
    metadata overhead; the margin is dropped when it would leave less than
    1 KB per part). Cuts are moved back to the nearest character boundary so
    no multi-byte character is torn across two parts and every part is valid
    UTF-8 on its own. Parts are named ``name_part1.ext``, ``name_part2.ext``,
    ... and sent in order through :func:`_send_file_with_retry`; the first
    failure stops the upload.

    Args:
        channel: The resolved, send-capable Discord channel/thread/DM.
        content: The text to upload.
        filename: Base filename; ``_partN`` is inserted before the extension
            when more than one part is produced.
        max_bytes: The destination's upload ceiling in bytes.

    Returns:
        str: A success summary naming the channel and part count, or the
        failing part's error message (annotated with how many parts were
        uploaded before it, when any were).
    """
    if max_bytes < 1:
        # A non-positive limit would make the part size zero or negative.
        return "Error: Invalid upload limit; cannot split content."

    data = content.encode("utf-8", errors="replace")
    if not data:
        return "Error: Cannot upload an empty file."

    # Leave a 1 KB margin for metadata overhead.
    effective_limit = max_bytes - 1024
    if effective_limit < 1024:
        effective_limit = max_bytes

    # Only offsets are precomputed, so a single chunk is copied at a time.
    bounds = _utf8_chunk_bounds(data, effective_limit)
    num_parts = len(bounds)

    import discord

    ch_name = getattr(channel, "name", None) or (
        f"DM ({getattr(channel, 'recipient', 'unknown')})"
    )
    base, ext = os.path.splitext(filename)

    for part_no, (start, end) in enumerate(bounds, start=1):
        part_name = f"{base}_part{part_no}{ext}" if num_parts > 1 else filename
        part = discord.File(io.BytesIO(data[start:end]), filename=part_name)
        err = await _send_file_with_retry(channel, part)
        if err:
            uploaded = part_no - 1
            # Report partial success if some parts uploaded.
            if uploaded > 0:
                return (
                    f"{err} — {uploaded}/{num_parts} parts were "
                    f"uploaded before the error."
                )
            return err

    return (
        f"Successfully uploaded '{filename}' to channel "
        f"'{ch_name}' (split into {num_parts} parts due to "
        f"size limit)."
    )


def _utf8_chunk_bounds(data: bytes, limit: int) -> list[tuple[int, int]]:
    """Return (start, end) offsets cutting UTF-8 *data* into parts of <= *limit* bytes.

    A cut that would land on a continuation byte (``0b10xxxxxx``) is moved
    back to the start of that character. If *limit* is too small to hold the
    character at all, the raw cut is kept so the loop always advances.
    *limit* must be at least 1.
    """
    bounds = []
    total = len(data)
    start = 0
    while start < total:
        end = min(start + limit, total)
        if end < total:
            cut = end
            while cut > start and (data[cut] & 0xC0) == 0x80:
                cut -= 1
            if cut > start:
                end = cut
        bounds.append((start, end))
        start = end
    return bounds