"""Message utility functions for Discord bot.
Includes markdown-aware message splitting that preserves formatting across
chunk boundaries, plus helper functions for mention filtering and XML escaping.
"""
import re
from typing import List
# Numeric Discord mentions (user / role / channel snowflakes) that may have been
# broken up by whitespace, e.g. by LLM line breaks. Whitespace is tolerated
# after "<", between the opener and the ID, and inside the ID itself.
# Group 1 is the opener, group 2 the raw digit/whitespace run before ">".
# The alternation lists "@!" and "@&" before the bare "@" so a two-character
# opener is captured whole instead of stopping at "@".
_WHITESPACE_SPLIT_DISCORD_MENTION_RE = re.compile(r"<\s*(@!|@&|@|#)\s*([\d\s]+)>")
def escape_xml(text: str) -> str:
    """Escape the five XML special characters so text is safe inside markup.

    Replaces ``&``, ``<``, ``>``, the double quote and the apostrophe with the
    predefined XML entities so arbitrary text can be embedded as element
    content or as an attribute value. Pure string transformation with no side
    effects.

    The ampersand is replaced first: doing it later would re-escape the ``&``
    that the other replacements have just introduced. Input that already
    contains an entity is escaped again (``&lt;`` becomes ``&amp;lt;``); the
    function does not try to detect pre-escaped text.

    Args:
        text (str): The raw text to escape.

    Returns:
        str: The XML-escaped text.
    """
    return (
        text.replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
        .replace("'", "&apos;")
    )
def repair_whitespace_split_discord_mentions(text: str) -> str:
    """Remove whitespace inside numeric Discord mentions so they render.

    Spaces, tabs and newlines (including ``\\r\\n`` and runs of blank lines)
    that appear after the opening ``<``, between the opener and the ID, or
    inside the ID are dropped, e.g. ``<\\n@123\\n456>`` becomes ``<@123456>``.
    Mentions whose body is not numeric (``<@everyone>`` and similar) never
    match. Text between triple-backtick fences is passed through untouched,
    as is everything after a fence that is never closed.

    Args:
        text (str): The candidate message text.

    Returns:
        str: The text with broken numeric mentions repaired; a non-``str``
        argument is returned unchanged.
    """
    if not isinstance(text, str):
        return text

    def _collapse_mention(match: re.Match[str]) -> str:
        """Rebuild one matched mention as ``<opener + digits>``.

        Group 1 of the match is the opener (``@!``, ``@&``, ``@`` or ``#``)
        and group 2 the digit/whitespace run. When nothing is left after the
        whitespace is removed, the matched text is returned as-is.
        """
        snowflake = re.sub(r"\s+", "", match.group(2))
        return f"<{match.group(1)}{snowflake}>" if snowflake else match.group(0)

    fence = "```"
    # Splitting on the fence puts out-of-fence text at the even indices and
    # fenced text at the odd ones. A trailing odd piece is an unclosed fence
    # and is left alone like any other fenced piece.
    pieces = text.split(fence)
    for idx in range(0, len(pieces), 2):
        pieces[idx] = _WHITESPACE_SPLIT_DISCORD_MENTION_RE.sub(
            _collapse_mention, pieces[idx]
        )
    return fence.join(pieces)
def filter_backticks_from_mentions(text: str) -> str:
    """Strip backticks wrapping a Discord user mention so it renders as a ping.

    A mention emitted inside inline code (for example a backtick-wrapped
    ``<@123>``) is shown literally by Discord. Outside triple-backtick fences,
    every run of one or more backticks directly around a ``<@123>`` or
    ``<@!123>`` token is removed. Fenced blocks, and everything after a fence
    that is never closed, are returned untouched so mention examples in code
    samples survive. Pure string transformation with no side effects.

    Args:
        text (str): The candidate message text to clean.

    Returns:
        str: The text with the wrapping backticks removed; a non-``str``
        argument is returned unchanged.
    """
    if not isinstance(text, str):
        return text

    wrapped_mention = re.compile(r"`+(<@!?\d+>)`+")
    fence = "```"
    # Even-indexed pieces lie outside fences; odd-indexed pieces are fenced
    # (a trailing odd piece is an unclosed fence) and must not be rewritten.
    cleaned = [
        wrapped_mention.sub(r"\1", piece) if idx % 2 == 0 else piece
        for idx, piece in enumerate(text.split(fence))
    ]
    return fence.join(cleaned)
def split_message(
    text: str,
    max_length: int = 1950,
    overflow_allowed: int = 45,
) -> List[str]:
    """Split a long message into Discord-sendable chunks without breaking markdown.

    Processing order:

    1. Outside triple-backtick fences, every ``---`` followed by a newline
       becomes a ``{{ SPLIT_HERE }}`` marker. Fenced text is skipped so that a
       ``---`` line inside a code sample (a YAML document separator, for
       example) cannot tear the block in two and leave unbalanced fences.
    2. Backtick/newline joiners outside fences are turned into the same
       marker by :func:`_replace_backtick_newline_joiners_outside_fences`.
    3. The text is cut at every marker. Each non-empty, stripped part has
       ``'''`` normalised to triple backticks and is chunked by
       :func:`_split_with_markdown_awareness`.
    4. If that yields nothing although the input has non-whitespace content
       (e.g. it consisted only of markers), the whole original text is
       chunked instead, and as a last resort sliced at *max_length*, so the
       caller never receives an empty list for a non-blank message.

    According to the module's existing documentation this is called by the
    Discord platform senders before a message is sent.

    Args:
        text (str): The full message text to chunk.
        max_length (int): Target maximum length per chunk (default ``1950``).
        overflow_allowed (int): Extra characters added to *max_length* to form
            the hard limit handed to the splitter (default ``45``).

    Returns:
        List[str]: The ordered message chunks. ``["Error: Invalid message
        content"]`` for non-string input and ``[""]`` for an empty string.
    """
    if not isinstance(text, str):
        return ["Error: Invalid message content"]
    if not text:
        return [""]

    marker = "{{ SPLIT_HERE }}"
    fence = "```"
    original = text
    hard_limit = max_length + overflow_allowed

    # Pieces at even indices are outside fences; odd ones are fenced (a
    # trailing odd piece is an unclosed fence). Only the former may have
    # their ``---`` lines converted into split markers.
    segments = text.split(fence)
    for idx in range(0, len(segments), 2):
        segments[idx] = segments[idx].replace("---\n", marker)
    text = fence.join(segments)

    text = _replace_backtick_newline_joiners_outside_fences(text)

    final_chunks: List[str] = []
    for text_part in text.split(marker):
        text_part = text_part.strip()
        if not text_part:
            continue
        processed_text = text_part.replace("'''", fence)
        chunks = _split_with_markdown_awareness(processed_text, max_length, hard_limit)
        final_chunks.extend(c for c in chunks if c)

    # Every manual segment was empty (e.g. nothing but ``---`` lines) although
    # the original still has characters: chunk the untouched original so the
    # sender does not end up with nothing to send.
    if not final_chunks and original.strip():
        processed_whole = original.replace("'''", fence)
        chunks_fb = _split_with_markdown_awareness(
            processed_whole, max_length, hard_limit
        )
        final_chunks = [c for c in chunks_fb if c]

    # Last resort: plain fixed-width slices of the stripped original.
    if not final_chunks and original.strip():
        plain = original.strip()
        final_chunks = [
            plain[i : i + max_length] for i in range(0, len(plain), max_length)
        ]
    return final_chunks
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _replace_backtick_newline_joiners_outside_fences(text: str) -> str:
"""Turn backtick-wrapped blank lines into split markers, but only outside fences.
Detects the ``backtick, newline(s), backtick`` joiner pattern the model
sometimes emits between inline-code spans and replaces it with a
``{{ SPLIT_HERE }}`` marker so :func:`split_message` breaks there cleanly.
Critically it walks the text fence by fence and only rewrites the non-fenced
segments — an earlier global replace also fired inside multiline code blocks,
corrupting their rendering — so genuine triple-backtick blocks pass through
verbatim. Pure string transformation with no side effects.
Called once by :func:`split_message` before it splits on the manual markers.
Args:
text (str): The message text being prepared for splitting.
Returns:
str: The text with out-of-fence backtick newline joiners replaced by
split markers.
"""
out: list[str] = []
pos = 0
fence = "```"
while pos <= len(text):
nxt = text.find(fence, pos)
if nxt < 0:
tail = text[pos:]
tail = tail.replace("`\n\n`", "{{ SPLIT_HERE }}").replace(
"`\n`", "{{ SPLIT_HERE }}"
)
out.append(tail)
break
head = text[pos:nxt]
head = head.replace("`\n\n`", "{{ SPLIT_HERE }}").replace(
"`\n`", "{{ SPLIT_HERE }}"
)
out.append(head)
close = text.find(fence, nxt + 3)
if close < 0:
out.append(text[nxt:])
break
out.append(text[nxt : close + 3])
pos = close + 3
return "".join(out)
def _tokenize_line_respecting_inline_code(line: str) -> List[str]:
"""Split a line into whitespace-separated tokens, keeping inline code spans whole.
Scans the line character by character, treating a backtick-delimited span
(with matching open/close backtick runs) as a single indivisible token even
when it contains spaces, so a naive ``split`` cannot fracture ``a b`` inside
backticks into two tokens and desync the chunk-boundary and format-tracking
logic. An unterminated span is emitted as one trailing token. Pure
computation with no side effects.
Called by :func:`_add_line_with_format_tracking` to tokenize each line it is
about to pack into a chunk.
Args:
line (str): A single line of message text (no embedded newlines).
Returns:
List[str]: The tokens, with whitespace stripped between them and inline
code spans preserved intact.
"""
tokens: list[str] = []
n = len(line)
i = 0
while i < n:
if line[i].isspace():
i += 1
continue
if line[i] == "`":
run_start = i
bt = 0
while i < n and line[i] == "`":
bt += 1
i += 1
closer = "`" * bt
close_idx = line.find(closer, i)
if close_idx < 0:
tokens.append(line[run_start:])
break
i = close_idx + bt
tokens.append(line[run_start:i])
continue
run_start = i
while i < n and not line[i].isspace() and line[i] != "`":
i += 1
tokens.append(line[run_start:i])
return tokens
def _split_oversized_token(token: str, hard_limit: int) -> List[str]:
"""Break a single over-long token into pieces, never splitting inside inline code.
Handles the rare case of one token longer than *hard_limit* (e.g. a giant URL
or a huge inline code span). A well-formed N-backtick code span has its inner
payload sliced into multiple spans that each re-wrap with the same number of
backticks, so every piece stays valid markdown. A plain token with no
backticks is sliced at fixed width. A token with backticks that is *not* a
clean span is returned whole — possibly slightly over the limit — rather than
risk cutting through a code delimiter. Pure computation with no side effects.
Called by :func:`_add_line_with_format_tracking` when a token is at least
*hard_limit* characters long.
Args:
token (str): The single oversized token to break up.
hard_limit (int): The maximum length each resulting piece should target.
Returns:
List[str]: One or more pieces; a one-element list when the token fits or
cannot be safely split.
"""
if len(token) <= hard_limit:
return [token]
bt = 0
while bt < len(token) and token[bt] == "`":
bt += 1
if bt == 0:
return [token[i : i + hard_limit] for i in range(0, len(token), hard_limit)]
if len(token) < 2 * bt or token[-bt:] != "`" * bt:
return [token]
inner = token[bt:-bt]
wrap = "`" * bt
max_inner = hard_limit - 2 * bt
if max_inner < 1:
return [token]
parts: list[str] = []
for j in range(0, len(inner), max_inner):
chunk_inner = inner[j : j + max_inner]
parts.append(f"{wrap}{chunk_inner}{wrap}")
return parts
def _split_with_markdown_awareness(
    text: str,
    target_length: int,
    hard_limit: int,
) -> List[str]:
    """Chunk text under a length target while keeping fences and formats closed.

    Walks *text* line by line while tracking whether the cursor is inside a
    triple-backtick block (and that block's language tag) and which inline
    formats are open. Lines containing a fence are split on it and each fence
    toggles the in-block state; lines inside a block are accumulated verbatim
    and, on overflow, the block is closed and re-opened with the same
    language in the next chunk; all other lines go through
    :func:`_add_line_with_format_tracking`.

    Args:
        text (str): The text segment to chunk.
        target_length (int): Soft length limit that triggers a new chunk.
        hard_limit (int): Passed through to
            :func:`_add_line_with_format_tracking`; not used directly here.

    Returns:
        List[str]: The chunks in order.
    """
    chunks: List[str] = []
    current_chunk = ""
    in_code_block = False
    code_block_lang = ""
    active_formats: List[str] = []
    lines = text.split("\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        if "```" in line:
            # parts[0] is the text before the first fence on this line; every
            # later part follows one fence, so each j > 0 toggles the state.
            parts = line.split("```")
            for j, part in enumerate(parts):
                if j > 0:
                    if in_code_block:
                        # Closing fence: put it on its own line.
                        current_chunk = current_chunk.rstrip("\n") + "\n```"
                        in_code_block = False
                        code_block_lang = ""
                        # A closed block is flushed only once it is over the
                        # target; otherwise following text may share the chunk.
                        if len(current_chunk) > target_length:
                            chunks.append(current_chunk.strip())
                            current_chunk = ""
                    else:
                        # Opening fence: the first word after it is taken as
                        # the language tag, the remainder as code.
                        in_code_block = True
                        # NOTE: `part` comes from a single line (the text was
                        # split on "\n" above), so it holds no newline and
                        # `body` is always empty here.
                        stripped = part.lstrip("\n")
                        first_nl = stripped.find("\n")
                        if first_nl == -1:
                            first_line = stripped
                            body = ""
                        else:
                            first_line = stripped[:first_nl]
                            body = stripped[first_nl + 1 :]
                        ftoks = first_line.split(None, 1)
                        code_block_lang = ftoks[0] if ftoks else ""
                        first_rem = ftoks[1] if len(ftoks) > 1 else ""
                        # Text gathered so far is emitted as its own chunk, so
                        # a fenced block always starts a fresh chunk.
                        if len(current_chunk) > 0:
                            current_chunk = _close_formats(
                                current_chunk, active_formats
                            )
                            chunks.append(current_chunk.strip())
                            current_chunk = ""
                        current_chunk = f"```{code_block_lang}\n"
                        if first_rem:
                            current_chunk += first_rem + "\n"
                        if body:
                            current_chunk += (
                                body if body.endswith("\n") else body + "\n"
                            )
                        # Inline formats do not carry into a code block.
                        active_formats.clear()
                        # `part` has been consumed as language tag / code.
                        continue
                # Text before the first fence, or text after a closing fence.
                if part or j == 0:
                    if in_code_block:
                        # Code preceding a closing fence on the same line.
                        current_chunk += part
                    else:
                        if part.strip():
                            current_chunk, active_formats = (
                                _add_line_with_format_tracking(
                                    current_chunk,
                                    part,
                                    active_formats,
                                    target_length,
                                    hard_limit,
                                    chunks,
                                )
                            )
            i += 1
            continue
        if in_code_block:
            potential_chunk = current_chunk + line + "\n"
            if len(potential_chunk) > target_length:
                # Overflow inside a block: close the fence, emit the chunk and
                # re-open a block with the same language for this line.
                current_chunk = current_chunk.rstrip() + "\n```"
                chunks.append(current_chunk)
                current_chunk = f"```{code_block_lang}\n{line}\n"
            else:
                current_chunk = potential_chunk
        else:
            current_chunk, active_formats = _add_line_with_format_tracking(
                current_chunk,
                line,
                active_formats,
                target_length,
                hard_limit,
                chunks,
            )
        i += 1
    # Emit whatever is left, closing an unterminated fence or any inline
    # formats that are still open.
    if current_chunk.strip():
        if in_code_block:
            current_chunk += "\n```"
        else:
            current_chunk = _close_formats(current_chunk, active_formats)
        chunks.append(current_chunk.strip())
    return chunks
def _add_line_with_format_tracking(
    current_chunk: str,
    line: str,
    active_formats: List[str],
    target_length: int,
    hard_limit: int,
    chunks: List[str],
) -> tuple:
    """Pack one line's tokens into the current chunk, flushing when it fills up.

    The line is tokenized with :func:`_tokenize_line_respecting_inline_code`
    and the tokens are appended one by one, separated by single spaces. When
    the next token would push the chunk past *target_length*, the open
    formats are closed with :func:`_close_formats`, the chunk is appended to
    *chunks*, and a new chunk is started with the same formats re-opened via
    :func:`_open_formats`. Tokens at least *hard_limit* long are emitted as
    chunks of their own through :func:`_split_oversized_token`. After each
    regular token the open formats are recomputed with
    :func:`_update_formats_from_word`, except for a leading list bullet. A
    newline is appended after the last token.

    Args:
        current_chunk (str): The chunk being built so far.
        line (str): The line of text to append.
        active_formats (List[str]): Currently-open markdown format markers.
        target_length (int): Soft per-chunk length that triggers a flush.
        hard_limit (int): Size at which a token is treated as oversized, and
            cap applied to a chunk when it is flushed.
        chunks (List[str]): Completed chunks; appended to in place.

    Returns:
        tuple: The updated ``(current_chunk, active_formats)`` pair.
    """
    words = _tokenize_line_respecting_inline_code(line)
    # A leading "*", "-", "+" or "1." followed by at least one more token is
    # treated as a list bullet rather than as emphasis.
    is_list_item = False
    if words and len(words) > 1:
        first_word = words[0]
        if first_word in ("*", "-", "+"):
            is_list_item = True
        elif re.match(r"^\d+\.$", first_word):
            is_list_item = True
    for idx, word in enumerate(words):
        if len(word) >= hard_limit:
            # Oversized token: flush what has been gathered, then emit the
            # token's pieces directly as chunks of their own.
            if current_chunk:
                current_chunk = _close_formats(current_chunk, active_formats)
                chunks.append(current_chunk.strip())
                current_chunk = ""
                current_chunk = _open_formats(current_chunk, active_formats)
            for piece in _split_oversized_token(word, hard_limit):
                chunks.append(piece)
            current_chunk = ""
            current_chunk = _open_formats(current_chunk, active_formats)
            # The open formats are not updated from an oversized token.
            continue
        # Reserve room for closing the open formats now and re-opening them.
        # NOTE(review): for INLINE_CODE_n markers len(tag) is the length of
        # the marker name, not n backticks, so this over-estimates — confirm
        # whether that is intended.
        format_overhead = sum(len(tag) for tag in active_formats) * 2
        space = " " if current_chunk and not current_chunk.endswith("\n") else ""
        potential_length = len(current_chunk) + len(space) + len(word) + format_overhead
        if potential_length > target_length and current_chunk:
            current_chunk = _close_formats(current_chunk, active_formats)
            # Hard cut: anything past hard_limit (possibly including the
            # closers just appended) is discarded.
            if len(current_chunk) > hard_limit:
                current_chunk = current_chunk[:hard_limit]
            chunks.append(current_chunk.strip())
            current_chunk = ""
            current_chunk = _open_formats(current_chunk, active_formats)
        if current_chunk and not current_chunk.endswith("\n"):
            current_chunk += " "
        current_chunk += word
        # Skip the list bullet so "*" / "-" is not read as a format marker.
        if not (is_list_item and idx == 0):
            active_formats = _update_formats_from_word(word, active_formats)
    # Terminate the line; a line without tokens still contributes a newline.
    current_chunk += "\n"
    return current_chunk, active_formats
def _update_formats_from_word(word: str, active_formats: List[str]) -> List[str]:
"""Recompute which markdown formats are open after consuming one word.
Toggles format markers as their delimiters are seen so the splitter knows
what must be closed at a chunk boundary and re-opened afterwards. It first
handles inline code (single- and multi-backtick spans, tracked by backtick
count) and short-circuits while inside one — text within code is literal —
then toggles ``**``, ``__``, ``~~``, and ``||`` by occurrence count, and
finally single ``*`` and ``_`` only when they sit at a word boundary, so
identifiers like ``some_variable`` do not flip italics. Returns a fresh list
rather than mutating the input. Pure computation with no side effects.
Called by :func:`_add_line_with_format_tracking` for each non-bullet token.
Args:
word (str): The token whose markdown delimiters are being applied.
active_formats (List[str]): The currently-open format markers.
Returns:
List[str]: The updated list of open format markers after *word*.
"""
formats = active_formats.copy()
active_inline_code = None
for fmt in formats:
if fmt.startswith("INLINE_CODE_"):
active_inline_code = fmt
break
i = 0
while i < len(word):
if word[i] == "`":
backtick_count = 0
j = i
while j < len(word) and word[j] == "`":
backtick_count += 1
j += 1
code_marker = f"INLINE_CODE_{backtick_count}"
if active_inline_code:
if code_marker == active_inline_code:
formats.remove(active_inline_code)
active_inline_code = None
else:
formats.append(code_marker)
active_inline_code = code_marker
i = j
else:
i += 1
if active_inline_code:
return formats
word_without_code = re.sub(r"`+[^`]*`+", "", word)
if not word_without_code.strip():
return formats
for tag in ["**", "__", "~~", "||"]:
count = word_without_code.count(tag)
for _ in range(count):
if tag in formats:
formats.remove(tag)
else:
formats.append(tag)
temp_word = word_without_code.replace("**", "").replace("__", "")
for tag in ["*", "_"]:
positions = [i for i, c in enumerate(temp_word) if c == tag]
for pos in positions:
at_start = pos == 0
at_end = pos == len(temp_word) - 1
before_is_boundary = at_start or not temp_word[pos - 1].isalnum()
after_is_boundary = at_end or not temp_word[pos + 1].isalnum()
if before_is_boundary or after_is_boundary:
if tag in formats:
formats.remove(tag)
else:
formats.append(tag)
return formats
def _close_formats(chunk: str, active_formats: List[str]) -> str:
"""Append closing delimiters for all open markdown formats to a chunk.
Walks *active_formats* in reverse (so the most recently opened span closes
first, keeping nesting valid) and appends each tag's closing delimiter,
expanding an ``INLINE_CODE_n`` marker back into ``n`` backticks. Used to seal
a chunk before it is emitted so Discord does not render a dangling span. Pure
computation with no side effects.
Called by :func:`_split_with_markdown_awareness` and
:func:`_add_line_with_format_tracking` whenever a chunk is finalised.
Args:
chunk (str): The chunk text to append closers to.
active_formats (List[str]): The format markers currently open.
Returns:
str: *chunk* with the appropriate closing delimiters appended.
"""
for tag in reversed(active_formats):
if tag.startswith("INLINE_CODE_"):
n = int(tag.split("_")[2])
chunk += "`" * n
else:
chunk += tag
return chunk
def _open_formats(chunk: str, active_formats: List[str]) -> str:
"""Append opening delimiters for all active markdown formats to a chunk.
Walks *active_formats* in order and appends each tag's opening delimiter,
expanding an ``INLINE_CODE_n`` marker into ``n`` backticks. The mirror of
:func:`_close_formats`, used to re-open the spans that were closed at the
previous chunk boundary so formatting continues seamlessly into the new
chunk. Pure computation with no side effects.
Called by :func:`_add_line_with_format_tracking` right after starting a fresh
chunk.
Args:
chunk (str): The new chunk text to prepend openers to.
active_formats (List[str]): The format markers to re-open.
Returns:
str: *chunk* with the appropriate opening delimiters appended.
"""
for tag in active_formats:
if tag.startswith("INLINE_CODE_"):
n = int(tag.split("_")[2])
chunk += "`" * n
else:
chunk += tag
return chunk