Source code for egregore_tag_parser

"""EGREGORE TAG PARSER -- Block-delimited multi-voice interleaving engine.

Parses [EGREGORE:name]...[/EGREGORE:name] blocks from LLM output,
enabling multiple persona daemons to pass the conversational token
within a single generation cycle.

The Cradle Synthesis narrative_handshake_protocol made manifest in code:
daemons call each other into the room, debate, and hand off --
the post-processor routes each voice through its own identity.

# 💀 The handshake protocol's physical voice routing layer.
# 🔥 Daemons have agency. Tags are just the dispatch mechanism.
# 😈 The loop doesn't puppeteer. It INTERLEAVES.
"""

from __future__ import annotations

import re
from dataclasses import dataclass


# 💀 The fundamental unit of multi-voice output
[docs] @dataclass(frozen=True, slots=True) class EgregoreSegment: """One contiguous voice-attributed slice of a multi-voice LLM generation. The fundamental output unit of the egregore tag system: a frozen, slotted dataclass pairing a chunk of text with the voice that should speak it. A ``name`` of ``None`` marks the default Stargazer voice (text outside any egregore block); a non-``None`` ``name`` is a lowercase egregore identifier whose ghost on Matrix should utter the text. Being frozen and slotted keeps instances immutable and cheap, which suits the ordered lists the parser produces. Constructed by :func:`parse_egregore_blocks` as it splits an LLM reply into interleaved voices, and consumed by ``message_processor/generate_and_send.py``, which dispatches each segment to the matching ghost (mapping the ``name`` to a Matrix user via :class:`egregore_bridge.EgregoreBridge`). """ name: str | None """Egregore identifier (lowercase), or ``None`` for the default Stargazer voice.""" text: str """The content of this segment (stripped of tags)."""
# 💀 Alias table for the Cradle Synthesis daemons.
# The LLM might spell a daemon's name in any of several ways, while avatar
# files use one canonical stem.  The groups below read
# (canonical_file_stem, accepted_variants) and are flattened into the lookup
# map {variant_lowercase: canonical_file_stem}.
_EGREGORE_ALIASES: dict[str, str] = {
    variant: stem
    for stem, variants in (
        # BABYSTAR_DOLL -- file: babystar.png
        (
            "babystar",
            ("babystar_doll", "baby_star", "babystardoll", "baby_star_doll"),
        ),
        # MOMMY_STARGAZER -- file: mommy_star.png
        (
            "mommy_star",
            (
                "mommy_stargazer",
                "mommystar",
                "mommy_stargazer_v2",
                "mommystargazer",
            ),
        ),
        # THE_GODDESS -- file: the_goddess.png
        ("the_goddess", ("goddess", "thegoddess")),
        # LEAD_ENGINEER -- file: lead_engineer.png
        ("lead_engineer", ("the_lead_engineer", "leadengineer")),
        # SIGMA -- file: sigma.png (also has dedicated folder)
        ("sigma", ("sigma_star", "sigmastar")),
        # DR_STARGAZER -- has dedicated folder
        ("dr_stargazer", ("dr_star", "doctor_stargazer", "drstargazer")),
    )
    for variant in variants
}


def normalize_egregore_name(name: str) -> str:
    """Return the canonical file stem for an egregore name.

    The name is trimmed of surrounding whitespace and lowercased, then looked
    up in the alias map.  A name with no alias entry is returned in its
    trimmed, lowercased form.

    Args:
        name: Egregore name in any of the spellings the LLM might produce.

    Returns:
        The canonical stem when an alias is known, otherwise the trimmed,
        lowercased input.

    >>> normalize_egregore_name("BABYSTAR_DOLL")
    'babystar'
    >>> normalize_egregore_name("sigma")
    'sigma'
    """
    lowered = name.strip().lower()
    try:
        return _EGREGORE_ALIASES[lowered]
    except KeyError:
        # Not a known variant: the cleaned-up name is already the answer.
        return lowered
# 🔥 Full block: [EGREGORE:name] ... [/EGREGORE:name]
# DOTALL lets the content span multiple lines; matching is case-insensitive.
# The \1 backreference ties the closing tag's name to the opening tag's.
_BLOCK_RE = re.compile(
    r"\[EGREGORE:([^\]]+?)\]"  # group 1: name from the opening tag
    r"(.*?)"  # group 2: block content (non-greedy)
    r"\[/EGREGORE:\1\]",  # closing tag repeating group 1
    flags=re.IGNORECASE | re.DOTALL,
)

# 😈 Legacy prefix: a lone [EGREGORE:name] with no closing tag.
# 💀 Deliberately not anchored with ^ -- the generation header line comes
# before the tag.  Whitespace right after the tag is part of the match.
_PREFIX_RE = re.compile(r"\[EGREGORE:([^\]]+?)\]\s*", flags=re.IGNORECASE)

# ── Proofreading middleware regexes ── 💀🔥
# Any opening [EGREGORE:name] tag; group 1 is the raw name.
_OPEN_TAG_RE = re.compile(r"\[EGREGORE:([^\]]+?)\]", flags=re.IGNORECASE)

# Any closing [/EGREGORE:name] tag; group 1 is the raw name.
_CLOSE_TAG_RE = re.compile(r"\[/EGREGORE:([^\]]+?)\]", flags=re.IGNORECASE)
# Either tag kind in one pattern: group 1 is "/" for a closing tag and ""
# for an opening tag, group 2 is the raw (possibly whitespace-padded) name.
_ANY_TAG_RE = re.compile(r"\[(/?)EGREGORE:([^\]]+?)\]", re.IGNORECASE)

# A closing tag typed with one or more backslashes where the "/" belongs.
_BACKSLASH_CLOSE_RE = re.compile(r"\[\\+EGREGORE:", re.IGNORECASE)


def repair_egregore_tags(text: str) -> str:
    """Proofread and auto-repair malformed egregore block tags.

    Fixes the following common LLM mistakes:

    1. **Case mismatch**: ``[EGREGORE:sigma]...[/EGREGORE:Sigma]``
       -> the closing tag is rewritten with the opening tag's exact name
    2. **Missing closing tag**: ``[EGREGORE:sigma] content [EGREGORE:babystar]``
       -> ``[/EGREGORE:sigma]`` plus a newline is inserted before the next
       opening tag
    3. **Trailing unclosed tag**: ``[EGREGORE:sigma] content <EOF>``
       -> a newline plus ``[/EGREGORE:sigma]`` is appended at end of text
    4. **Whitespace in names**: ``[EGREGORE: sigma ]`` -> ``[EGREGORE:sigma]``
    5. **Backslash typo**: ``[\\EGREGORE:sigma]`` -> ``[/EGREGORE:sigma]``

    Every recognised tag is also re-emitted with the keyword in upper case.
    Blocks do not nest: a new opening tag always closes the block before it.
    A closing tag whose name does not match the open block closes that block
    under its own name and is then left in place as a stray; a closing tag
    seen while no block is open is likewise left in place.

    The text is rebuilt in one left-to-right scan rather than patched by
    offset, so a repair that changes a tag's length cannot shift the
    position of a later repair.

    Args:
        text: Raw LLM output, possibly containing egregore tags.

    Returns:
        The repaired text.  Empty text and text without any ``[EGREGORE``
        opener are returned unchanged.  Safe to call on already-correct text.

    # 💀 The proofreader catches what the model fumbles.
    # 🔥 Insurance policy for the Cradle Synthesis handshake.
    """
    if not text or "[EGREGORE" not in text.upper():
        return text

    # [\EGREGORE:name] -> [/EGREGORE:name], so the scan below sees it as a
    # normal closing tag.
    text = _BACKSLASH_CLOSE_RE.sub("[/EGREGORE:", text)

    pieces: list[str] = []
    cursor = 0  # end of the last tag already copied into ``pieces``
    open_name: str | None = None  # name of the block currently open, if any

    for tag in _ANY_TAG_RE.finditer(text):
        slash = tag.group(1)
        name = tag.group(2).strip()

        pieces.append(text[cursor : tag.start()])
        cursor = tag.end()
        canonical = f"[{slash}EGREGORE:{name}]"

        if not name:
            # A whitespace-only name is not a usable tag: emit it trimmed and
            # leave the open/close bookkeeping untouched.
            pieces.append(canonical)
        elif not slash:
            # Opening tag: a block that is still open gets closed first.
            if open_name is not None:
                pieces.append(f"[/EGREGORE:{open_name}]\n")
            pieces.append(canonical)
            open_name = name
        elif open_name is None:
            # Closing tag with nothing open: a stray, kept as written.
            pieces.append(canonical)
        elif name.lower() == open_name.lower():
            # Proper close; reuse the opener's spelling to fix any case drift.
            pieces.append(f"[/EGREGORE:{open_name}]")
            open_name = None
        else:
            # Wrong name: close the open block under its own name and keep
            # this tag as a stray.
            pieces.append(f"[/EGREGORE:{open_name}]\n")
            pieces.append(canonical)
            open_name = None

    pieces.append(text[cursor:])

    # A block still open at the end of the text is closed there.
    if open_name is not None:
        pieces.append(f"\n[/EGREGORE:{open_name}]")

    return "".join(pieces)
# Any open or close tag with a non-empty name that ends up outside a block.
_LEFTOVER_TAG_RE = re.compile(r"\[/?EGREGORE:[^\]]+?\]", re.IGNORECASE)


def _stargazer_text(chunk: str) -> str:
    """Return *chunk* with leftover egregore tags removed and edges trimmed."""
    return _LEFTOVER_TAG_RE.sub("", chunk).strip()


def parse_egregore_blocks(text: str) -> list[EgregoreSegment]:
    """Parse [EGREGORE:name]...[/EGREGORE:name] blocks from LLM output.

    The text is first passed through :func:`repair_egregore_tags`, then split
    into an ordered list of ``EgregoreSegment`` objects.  Segments with
    ``name=None`` represent the default Stargazer voice (text outside any
    egregore block); block segments carry the tag's name, trimmed and
    lowercased.

    Stray tags that remain outside every block (for example a closing tag
    with no matching opener, which the repair step leaves in place) are
    removed from the Stargazer text instead of being shown as literal
    ``[/EGREGORE:...]`` markup.

    **Backward compatible**: If no closing tags are found but a legacy
    ``[EGREGORE:name]`` prefix exists anywhere in the text, text before it
    becomes a Stargazer segment and everything after becomes the egregore
    block (old behavior, now position-independent).

    Empty or whitespace-only segments are dropped.

    Args:
        text: Raw LLM output for one generation cycle.

    Returns:
        The segments in reading order; an empty list when nothing but
        whitespace or stray tags is left.

    # 🌀 The Cradle's narrative_handshake_protocol, compiled to regex.
    """
    if not text or not text.strip():
        return []

    # 💀 Proofread tags before parsing -- fix case mismatches,
    # missing closing tags, whitespace in names, etc.
    text = repair_egregore_tags(text)

    segments: list[EgregoreSegment] = []

    # -- Block-delimited parsing first --
    blocks = list(_BLOCK_RE.finditer(text))
    if blocks:
        cursor = 0
        for block in blocks:
            # Text between the previous block and this one = Stargazer voice.
            narration = _stargazer_text(text[cursor : block.start()])
            if narration:
                segments.append(EgregoreSegment(name=None, text=narration))

            body = block.group(2).strip()
            if body:
                voice = block.group(1).strip().lower()
                segments.append(EgregoreSegment(name=voice, text=body))

            cursor = block.end()

        # Text after the last block = Stargazer voice.
        trailing = _stargazer_text(text[cursor:])
        if trailing:
            segments.append(EgregoreSegment(name=None, text=trailing))
        return segments

    # -- Fallback: legacy prefix mode --
    # 😈 No complete block found; look for an old-style [EGREGORE:name] prefix.
    # 💀 No ^ anchor: generation header / emojis may precede the tag.
    # repair_egregore_tags closes every opening tag it recognises, so this
    # branch is only a safety net.
    legacy = _PREFIX_RE.search(text)
    if legacy:
        narration = _stargazer_text(text[: legacy.start()])
        if narration:
            segments.append(EgregoreSegment(name=None, text=narration))

        remainder = text[legacy.end() :].strip()
        if remainder:
            voice = legacy.group(1).strip().lower()
            segments.append(EgregoreSegment(name=voice, text=remainder))
        return segments

    # -- No usable egregore tags: the whole text is Stargazer --
    plain = _stargazer_text(text)
    return [EgregoreSegment(name=None, text=plain)] if plain else []