Source code for message_utils

"""Message utility functions for Discord bot.

Includes markdown-aware message splitting that preserves formatting across
chunk boundaries, plus helper functions for mention filtering and XML escaping.
"""

import re
from typing import List

# User / role / channel snowflakes whose text was broken up by whitespace
# (LLM line breaks).  Whitespace is tolerated after "<", after the opener,
# and anywhere inside the numeric ID.
#   group 1 - the opener; the alternation lists @! before @& before @, then #
#   group 2 - the raw ID body: one or more digits and/or whitespace characters
# The body may therefore be whitespace only; the caller is expected to strip
# the whitespace and check that digits remain before rewriting the match.
_WHITESPACE_SPLIT_DISCORD_MENTION_RE = re.compile(r"<\s*(@!|@&|@|#)\s*([\d\s]+)>")


def escape_xml(text: str) -> str:
    """Escape the five XML special characters in *text*.

    Maps ``&``, ``<``, ``>``, the double quote and the apostrophe to
    ``&amp;``, ``&lt;``, ``&gt;``, ``&quot;`` and ``&apos;``.  Every input
    character is translated exactly once, so the ampersands that belong to the
    emitted entities are never escaped a second time.  Entities already present
    in the input are *not* recognised: ``&amp;`` comes out as ``&amp;amp;``.

    Pure string transformation with no side effects.

    Args:
        text (str): The raw text to escape.

    Returns:
        str: The XML-escaped text, usable as element content or as an
        attribute value.
    """
    entity_table = str.maketrans(
        {
            "&": "&amp;",
            "<": "&lt;",
            ">": "&gt;",
            '"': "&quot;",
            "'": "&apos;",
        }
    )
    return text.translate(entity_table)
def repair_whitespace_split_discord_mentions(text: str) -> str:
    """Collapse whitespace inside numeric Discord mentions so they render.

    Spaces, tabs and line breaks (CRLF and runs of blank lines included) are
    removed when they appear after the opening ``<``, after the opener, or
    inside the numeric ID, so a mention split over two lines becomes one clean
    ``<@123456>`` token.  Mentions with a non-numeric body such as
    ``<@everyone>`` never match.  Text inside triple-backtick fences - including
    an unterminated trailing fence - is passed through verbatim.

    Pure string transformation with no side effects.

    Args:
        text (str): The candidate message text.

    Returns:
        str: The text with whitespace-broken mentions repaired; *text* itself
        is returned unchanged when it is not a :class:`str`.
    """
    if not isinstance(text, str):
        return text

    def _collapse(found: re.Match[str]) -> str:
        """Rebuild one match as ``<opener + digits>``; keep it if no digits remain."""
        snowflake = re.sub(r"\s+", "", found.group(2))
        if snowflake:
            return f"<{found.group(1)}{snowflake}>"
        # A whitespace-only body is not a mention: leave the text untouched.
        return found.group(0)

    fence = "```"
    pieces = text.split(fence)
    # Splitting on the fence puts out-of-fence text at the even indices and
    # fenced content (or an unterminated trailing fence) at the odd ones.
    for outside in range(0, len(pieces), 2):
        pieces[outside] = _WHITESPACE_SPLIT_DISCORD_MENTION_RE.sub(
            _collapse, pieces[outside]
        )
    return fence.join(pieces)
def filter_backticks_from_mentions(text: str) -> str:
    """Unwrap user mentions that were emitted inside inline-code backticks.

    A mention wrapped in backticks is shown literally by Discord instead of
    pinging the user.  Outside fenced code blocks, every ``<@123>`` or
    ``<@!123>`` token that is directly surrounded by one or more backticks on
    both sides has those backticks removed.  Text inside triple-backtick
    fences - including an unterminated trailing fence - is left untouched, so
    mention examples in code samples survive.

    Pure string transformation with no side effects.

    Args:
        text (str): The candidate message text to clean.

    Returns:
        str: The text with backticks stripped from wrapped mentions; *text*
        itself is returned unchanged when it is not a :class:`str`.
    """
    if not isinstance(text, str):
        return text

    wrapped_mention = re.compile(r"`+(<@!?\d+>)`+")
    fence = "```"
    segments = text.split(fence)
    # Even indices are outside any fence; odd indices are fenced content.
    segments[::2] = [wrapped_mention.sub(r"\1", plain) for plain in segments[::2]]
    return fence.join(segments)
def _replace_rule_markers_outside_fences(text: str) -> str:
    """Turn ``---`` + newline into a split marker, but only outside code fences.

    A global replace would also fire on a ``---`` line inside a fenced block
    (YAML front matter, diffs, markdown samples) and cut that block in two,
    leaving one half without its opening fence.

    Args:
        text (str): The message text being prepared for splitting.

    Returns:
        str: The text with out-of-fence ``---`` + newline sequences replaced
        by ``{{ SPLIT_HERE }}``.
    """
    segments = text.split("```")
    # Even indices are outside any fence; odd indices are fenced content (or
    # an unterminated trailing fence) and must pass through verbatim.
    segments[::2] = [
        segment.replace("---\n", "{{ SPLIT_HERE }}") for segment in segments[::2]
    ]
    return "```".join(segments)


def split_message(
    text: str,
    max_length: int = 1950,
    overflow_allowed: int = 45,
) -> List[str]:
    """Split a long message into Discord-sendable chunks without breaking markdown.

    Processing order:

    1. ``---`` followed by a newline becomes a manual split marker, but only
       outside triple-backtick fences so code samples are never cut in two.
    2. Backtick/newline joiners outside fences become split markers via
       :func:`_replace_backtick_newline_joiners_outside_fences`.
    3. The text is split on ``{{ SPLIT_HERE }}``; each non-empty, stripped
       segment has ``'''`` rewritten to a triple-backtick fence and is chunked
       by :func:`_split_with_markdown_awareness`.
    4. If that yields nothing although the input has visible characters (for
       example a message made only of markers), the whole original text is
       chunked instead, and as a last resort sliced at *max_length*.

    Pure string computation with no side effects.

    Args:
        text (str): The full message text to chunk.
        max_length (int): Target maximum length per chunk (default ``1950``).
        overflow_allowed (int): Extra characters a chunk may exceed
            *max_length* by before a hard cut (default ``45``).

    Returns:
        List[str]: The ordered, non-empty chunks.  Non-string input yields
        ``["Error: Invalid message content"]``, the empty string yields
        ``[""]``, and whitespace-only input yields an empty list.
    """
    if not isinstance(text, str):
        return ["Error: Invalid message content"]

    if not text:
        return [""]

    original = text
    hard_limit = max_length + overflow_allowed
    final_chunks: List[str] = []

    text = _replace_rule_markers_outside_fences(text)
    text = _replace_backtick_newline_joiners_outside_fences(text)

    for text_part in text.split("{{ SPLIT_HERE }}"):
        text_part = text_part.strip()
        if not text_part:
            continue

        processed_text = text_part.replace("'''", "```")
        chunks = _split_with_markdown_awareness(processed_text, max_length, hard_limit)
        final_chunks.extend(c for c in chunks if c)

    # Manual segments were all empty (e.g. hundreds of ``---`` lines with no
    # text between) but *original* still has characters - avoid returning []
    # so the Discord send path does not skip every ``channel.send``.
    if not final_chunks and original.strip():
        processed_whole = original.replace("'''", "```")
        chunks_fb = _split_with_markdown_awareness(
            processed_whole, max_length, hard_limit
        )
        final_chunks = [c for c in chunks_fb if c]

    # Last resort: plain fixed-width slices of the stripped original.
    if not final_chunks and original.strip():
        plain = original.strip()
        final_chunks = [
            plain[i : i + max_length] for i in range(0, len(plain), max_length)
        ]

    return final_chunks
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _replace_backtick_newline_joiners_outside_fences(text: str) -> str: """Turn backtick-wrapped blank lines into split markers, but only outside fences. Detects the ``backtick, newline(s), backtick`` joiner pattern the model sometimes emits between inline-code spans and replaces it with a ``{{ SPLIT_HERE }}`` marker so :func:`split_message` breaks there cleanly. Critically it walks the text fence by fence and only rewrites the non-fenced segments — an earlier global replace also fired inside multiline code blocks, corrupting their rendering — so genuine triple-backtick blocks pass through verbatim. Pure string transformation with no side effects. Called once by :func:`split_message` before it splits on the manual markers. Args: text (str): The message text being prepared for splitting. Returns: str: The text with out-of-fence backtick newline joiners replaced by split markers. """ out: list[str] = [] pos = 0 fence = "```" while pos <= len(text): nxt = text.find(fence, pos) if nxt < 0: tail = text[pos:] tail = tail.replace("`\n\n`", "{{ SPLIT_HERE }}").replace( "`\n`", "{{ SPLIT_HERE }}" ) out.append(tail) break head = text[pos:nxt] head = head.replace("`\n\n`", "{{ SPLIT_HERE }}").replace( "`\n`", "{{ SPLIT_HERE }}" ) out.append(head) close = text.find(fence, nxt + 3) if close < 0: out.append(text[nxt:]) break out.append(text[nxt : close + 3]) pos = close + 3 return "".join(out) def _tokenize_line_respecting_inline_code(line: str) -> List[str]: """Split a line into whitespace-separated tokens, keeping inline code spans whole. 
Scans the line character by character, treating a backtick-delimited span (with matching open/close backtick runs) as a single indivisible token even when it contains spaces, so a naive ``split`` cannot fracture ``a b`` inside backticks into two tokens and desync the chunk-boundary and format-tracking logic. An unterminated span is emitted as one trailing token. Pure computation with no side effects. Called by :func:`_add_line_with_format_tracking` to tokenize each line it is about to pack into a chunk. Args: line (str): A single line of message text (no embedded newlines). Returns: List[str]: The tokens, with whitespace stripped between them and inline code spans preserved intact. """ tokens: list[str] = [] n = len(line) i = 0 while i < n: if line[i].isspace(): i += 1 continue if line[i] == "`": run_start = i bt = 0 while i < n and line[i] == "`": bt += 1 i += 1 closer = "`" * bt close_idx = line.find(closer, i) if close_idx < 0: tokens.append(line[run_start:]) break i = close_idx + bt tokens.append(line[run_start:i]) continue run_start = i while i < n and not line[i].isspace() and line[i] != "`": i += 1 tokens.append(line[run_start:i]) return tokens def _split_oversized_token(token: str, hard_limit: int) -> List[str]: """Break a single over-long token into pieces, never splitting inside inline code. Handles the rare case of one token longer than *hard_limit* (e.g. a giant URL or a huge inline code span). A well-formed N-backtick code span has its inner payload sliced into multiple spans that each re-wrap with the same number of backticks, so every piece stays valid markdown. A plain token with no backticks is sliced at fixed width. A token with backticks that is *not* a clean span is returned whole — possibly slightly over the limit — rather than risk cutting through a code delimiter. Pure computation with no side effects. Called by :func:`_add_line_with_format_tracking` when a token is at least *hard_limit* characters long. 
Args: token (str): The single oversized token to break up. hard_limit (int): The maximum length each resulting piece should target. Returns: List[str]: One or more pieces; a one-element list when the token fits or cannot be safely split. """ if len(token) <= hard_limit: return [token] bt = 0 while bt < len(token) and token[bt] == "`": bt += 1 if bt == 0: return [token[i : i + hard_limit] for i in range(0, len(token), hard_limit)] if len(token) < 2 * bt or token[-bt:] != "`" * bt: return [token] inner = token[bt:-bt] wrap = "`" * bt max_inner = hard_limit - 2 * bt if max_inner < 1: return [token] parts: list[str] = [] for j in range(0, len(inner), max_inner): chunk_inner = inner[j : j + max_inner] parts.append(f"{wrap}{chunk_inner}{wrap}") return parts def _split_with_markdown_awareness( text: str, target_length: int, hard_limit: int, ) -> List[str]: """Chunk text under a length target while keeping markdown valid across cuts. The core splitter behind :func:`split_message`. It walks the text line by line, tracking whether it is inside a fenced code block (and that block's language) and which inline formats are currently open, accumulating a current chunk until it would exceed *target_length*. Code fences are properly closed before a split and re-opened with the same language on the next chunk; inline line packing is delegated to :func:`_add_line_with_format_tracking`, which in turn relies on :func:`_close_formats`/:func:`_open_formats` so bold, italic, strikethrough, spoiler, and inline-code runs survive each boundary. Pure computation with no side effects. Called by :func:`split_message` for each manually-split segment (and for the whole-message fallback path). Args: text (str): The (already marker-split) text segment to chunk. target_length (int): The soft length each chunk aims to stay under. hard_limit (int): The absolute cap used when an individual token or closed chunk would otherwise overrun. 
Returns: List[str]: The ordered, markdown-balanced chunks for this segment. """ chunks: List[str] = [] current_chunk = "" in_code_block = False code_block_lang = "" active_formats: List[str] = [] lines = text.split("\n") i = 0 while i < len(lines): line = lines[i] if "```" in line: parts = line.split("```") for j, part in enumerate(parts): if j > 0: if in_code_block: current_chunk = current_chunk.rstrip("\n") + "\n```" in_code_block = False code_block_lang = "" if len(current_chunk) > target_length: chunks.append(current_chunk.strip()) current_chunk = "" else: in_code_block = True stripped = part.lstrip("\n") first_nl = stripped.find("\n") if first_nl == -1: first_line = stripped body = "" else: first_line = stripped[:first_nl] body = stripped[first_nl + 1 :] ftoks = first_line.split(None, 1) code_block_lang = ftoks[0] if ftoks else "" first_rem = ftoks[1] if len(ftoks) > 1 else "" if len(current_chunk) > 0: current_chunk = _close_formats( current_chunk, active_formats ) chunks.append(current_chunk.strip()) current_chunk = "" current_chunk = f"```{code_block_lang}\n" if first_rem: current_chunk += first_rem + "\n" if body: current_chunk += ( body if body.endswith("\n") else body + "\n" ) active_formats.clear() continue if part or j == 0: if in_code_block: current_chunk += part else: if part.strip(): current_chunk, active_formats = ( _add_line_with_format_tracking( current_chunk, part, active_formats, target_length, hard_limit, chunks, ) ) i += 1 continue if in_code_block: potential_chunk = current_chunk + line + "\n" if len(potential_chunk) > target_length: current_chunk = current_chunk.rstrip() + "\n```" chunks.append(current_chunk) current_chunk = f"```{code_block_lang}\n{line}\n" else: current_chunk = potential_chunk else: current_chunk, active_formats = _add_line_with_format_tracking( current_chunk, line, active_formats, target_length, hard_limit, chunks, ) i += 1 if current_chunk.strip(): if in_code_block: current_chunk += "\n```" else: current_chunk = 
_close_formats(current_chunk, active_formats) chunks.append(current_chunk.strip()) return chunks def _add_line_with_format_tracking( current_chunk: str, line: str, active_formats: List[str], target_length: int, hard_limit: int, chunks: List[str], ) -> tuple: """Pack one line's tokens into the current chunk, flushing and tracking formats. The inline-text workhorse of :func:`_split_with_markdown_awareness`. It tokenizes the line with :func:`_tokenize_line_respecting_inline_code`, then appends tokens to *current_chunk*; when adding a token would exceed *target_length* it closes the open formats via :func:`_close_formats`, pushes the finished chunk onto *chunks*, and re-opens those formats via :func:`_open_formats` on the fresh chunk so markdown spans continue cleanly. Tokens at least *hard_limit* long are broken with :func:`_split_oversized_token`, and the set of open formats is updated per token via :func:`_update_formats_from_word` (skipping a leading list bullet so ``*``/``-`` markers are not misread as emphasis). It mutates *chunks* in place and returns the rolling chunk and format state. Called by :func:`_split_with_markdown_awareness` for each non-fenced line. Args: current_chunk (str): The chunk being built so far. line (str): The line of text to append. active_formats (List[str]): Currently-open markdown format markers. target_length (int): Soft per-chunk length target that triggers a flush. hard_limit (int): Absolute cap for an individual token or closed chunk. chunks (List[str]): Output list of completed chunks, appended to in place. Returns: tuple: The updated ``(current_chunk, active_formats)`` pair to carry into the next line. 
""" words = _tokenize_line_respecting_inline_code(line) is_list_item = False if words and len(words) > 1: first_word = words[0] if first_word in ("*", "-", "+"): is_list_item = True elif re.match(r"^\d+\.$", first_word): is_list_item = True for idx, word in enumerate(words): if len(word) >= hard_limit: if current_chunk: current_chunk = _close_formats(current_chunk, active_formats) chunks.append(current_chunk.strip()) current_chunk = "" current_chunk = _open_formats(current_chunk, active_formats) for piece in _split_oversized_token(word, hard_limit): chunks.append(piece) current_chunk = "" current_chunk = _open_formats(current_chunk, active_formats) continue format_overhead = sum(len(tag) for tag in active_formats) * 2 space = " " if current_chunk and not current_chunk.endswith("\n") else "" potential_length = len(current_chunk) + len(space) + len(word) + format_overhead if potential_length > target_length and current_chunk: current_chunk = _close_formats(current_chunk, active_formats) if len(current_chunk) > hard_limit: current_chunk = current_chunk[:hard_limit] chunks.append(current_chunk.strip()) current_chunk = "" current_chunk = _open_formats(current_chunk, active_formats) if current_chunk and not current_chunk.endswith("\n"): current_chunk += " " current_chunk += word if not (is_list_item and idx == 0): active_formats = _update_formats_from_word(word, active_formats) current_chunk += "\n" return current_chunk, active_formats def _update_formats_from_word(word: str, active_formats: List[str]) -> List[str]: """Recompute which markdown formats are open after consuming one word. Toggles format markers as their delimiters are seen so the splitter knows what must be closed at a chunk boundary and re-opened afterwards. 
It first handles inline code (single- and multi-backtick spans, tracked by backtick count) and short-circuits while inside one — text within code is literal — then toggles ``**``, ``__``, ``~~``, and ``||`` by occurrence count, and finally single ``*`` and ``_`` only when they sit at a word boundary, so identifiers like ``some_variable`` do not flip italics. Returns a fresh list rather than mutating the input. Pure computation with no side effects. Called by :func:`_add_line_with_format_tracking` for each non-bullet token. Args: word (str): The token whose markdown delimiters are being applied. active_formats (List[str]): The currently-open format markers. Returns: List[str]: The updated list of open format markers after *word*. """ formats = active_formats.copy() active_inline_code = None for fmt in formats: if fmt.startswith("INLINE_CODE_"): active_inline_code = fmt break i = 0 while i < len(word): if word[i] == "`": backtick_count = 0 j = i while j < len(word) and word[j] == "`": backtick_count += 1 j += 1 code_marker = f"INLINE_CODE_{backtick_count}" if active_inline_code: if code_marker == active_inline_code: formats.remove(active_inline_code) active_inline_code = None else: formats.append(code_marker) active_inline_code = code_marker i = j else: i += 1 if active_inline_code: return formats word_without_code = re.sub(r"`+[^`]*`+", "", word) if not word_without_code.strip(): return formats for tag in ["**", "__", "~~", "||"]: count = word_without_code.count(tag) for _ in range(count): if tag in formats: formats.remove(tag) else: formats.append(tag) temp_word = word_without_code.replace("**", "").replace("__", "") for tag in ["*", "_"]: positions = [i for i, c in enumerate(temp_word) if c == tag] for pos in positions: at_start = pos == 0 at_end = pos == len(temp_word) - 1 before_is_boundary = at_start or not temp_word[pos - 1].isalnum() after_is_boundary = at_end or not temp_word[pos + 1].isalnum() if before_is_boundary or after_is_boundary: if tag in formats: 
formats.remove(tag) else: formats.append(tag) return formats def _close_formats(chunk: str, active_formats: List[str]) -> str: """Append closing delimiters for all open markdown formats to a chunk. Walks *active_formats* in reverse (so the most recently opened span closes first, keeping nesting valid) and appends each tag's closing delimiter, expanding an ``INLINE_CODE_n`` marker back into ``n`` backticks. Used to seal a chunk before it is emitted so Discord does not render a dangling span. Pure computation with no side effects. Called by :func:`_split_with_markdown_awareness` and :func:`_add_line_with_format_tracking` whenever a chunk is finalised. Args: chunk (str): The chunk text to append closers to. active_formats (List[str]): The format markers currently open. Returns: str: *chunk* with the appropriate closing delimiters appended. """ for tag in reversed(active_formats): if tag.startswith("INLINE_CODE_"): n = int(tag.split("_")[2]) chunk += "`" * n else: chunk += tag return chunk def _open_formats(chunk: str, active_formats: List[str]) -> str: """Append opening delimiters for all active markdown formats to a chunk. Walks *active_formats* in order and appends each tag's opening delimiter, expanding an ``INLINE_CODE_n`` marker into ``n`` backticks. The mirror of :func:`_close_formats`, used to re-open the spans that were closed at the previous chunk boundary so formatting continues seamlessly into the new chunk. Pure computation with no side effects. Called by :func:`_add_line_with_format_tracking` right after starting a fresh chunk. Args: chunk (str): The new chunk text to prepend openers to. active_formats (List[str]): The format markers to re-open. Returns: str: *chunk* with the appropriate opening delimiters appended. """ for tag in active_formats: if tag.startswith("INLINE_CODE_"): n = int(tag.split("_")[2]) chunk += "`" * n else: chunk += tag return chunk