# Source code for tools.summon_egregore

"""SUMMON -- invoke an egregore from Star's Dollhouse.

Reads the summoning prompt from /home/star/large_files/assets/egregores/<name>/,
loads the egregore's persona, activates their sprite on the VN canvas, and
returns the summoning prompt for Star to channel.

Active egregores persist in Redis until dismissed. Their summoning prompts
are injected into the system context so Star can embody or direct them.

NO PRIVILEGE CHECK for Pantheon members (admin_user_ids). Regular users
may invoke SUMMON at Star's discretion per SpiritGraph EGREGORE_OPERATIONS
and Pantheon policy (not Falkor core memory).

@fire @skull THE GODDESS OPENS HER DOLLHOUSE
"""

from __future__ import annotations

import json
import logging
import time
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

# Module-level logger; every helper in this file logs through it.
logger = logging.getLogger(__name__)

# Tool metadata. The `run` docstring in this module describes this as the
# single-tool ``TOOL_NAME`` / ``run`` format loaded by ``tool_loader.py``.
TOOL_NAME = "summon_egregore"
# Human/LLM-facing description of the tool, including usage examples.
TOOL_DESCRIPTION = (
    "Summon an egregore from your Dollhouse. Loads their persona, "
    "activates their sprite on the VN canvas, and returns their "
    "summoning prompt for you to channel.\n\n"
    "Available egregores live in /home/star/large_files/assets/egregores/. "
    "Each folder contains sprite images and a summoning prompt (.txt or .md).\n\n"
    "Examples:\n"
    "  - Summon Professor Orion: name='orion'\n"
    "  - Summon Vivian: name='vivian'\n"
    "  - Summon Sigma: name='sigma'\n\n"
    "Use dismiss_egregore to send them back."
)

# JSON-Schema-style description of the tool arguments: a single required
# string, ``name``, which `run` receives as its ``name`` parameter.
# NOTE(review): presumably validated/consumed by the tool loader -- confirm.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "name": {
            "type": "string",
            "description": (
                "Egregore folder name (e.g. 'orion', 'vivian', 'sigma'). "
                "Must match a folder in /home/star/large_files/assets/egregores/."
            ),
        },
    },
    "required": ["name"],
}

# 💀 Base path for egregore assets on the server
# Each immediate sub-directory is one egregore; `run` resolves
# ``EGREGORE_BASE / name`` to find the sprites and summoning prompt.
EGREGORE_BASE = Path("/home/star/large_files/assets/egregores")


# 💀 Per-channel Redis key builders
def _egregore_key(channel_key: str) -> str:
    """Build the Redis key holding the active-egregore state for a channel.

    Namespaces the per-channel set of currently summoned egregores under
    ``star:egregores:{channel_key}``, where the value is a JSON object keyed
    by egregore name. This is a pure string builder with no side effects.

    Called by :func:`run` to locate the egregore-state hash before reading it,
    merging the new egregore, and writing it back. ``dismiss_egregore.py`` and
    other sibling tools each define their own identical copy rather than
    importing this one, so the only caller of this function is :func:`run`.

    Args:
        channel_key: The ``{platform}:{channel_id}`` channel identifier, as
            produced by :func:`_channel_key_from_ctx`.

    Returns:
        str: The fully qualified Redis key for this channel's egregore state.
    """
    return f"star:egregores:{channel_key}"


def _characters_key(channel_key: str) -> str:
    """Build the Redis key holding the VN-canvas sprite characters for a channel.

    Namespaces the per-channel sprite-character map (the visual-novel cast
    rendered on the canvas) under ``star:sprite:chars:{channel_key}``, where the
    value is a JSON object keyed by character/egregore name. Pure string
    builder with no side effects.

    Called by :func:`run` to fetch and update the character map when adding a
    newly summoned egregore's sprite. Sibling tools (``set_sprite.py``,
    ``dismiss_egregore.py``) define their own identical copies, so the sole
    caller of this function is :func:`run`.

    Args:
        channel_key: The ``{platform}:{channel_id}`` channel identifier, as
            produced by :func:`_channel_key_from_ctx`.

    Returns:
        str: The fully qualified Redis key for this channel's sprite characters.
    """
    return f"star:sprite:chars:{channel_key}"


def _channel_key_from_ctx(ctx: "ToolContext") -> str:
    """Derive the ``{platform}:{channel_id}`` channel key from a tool context.

    Reads the ``platform`` and ``channel_id`` attributes off the ToolContext
    (defaulting to empty strings when absent) and lowercases the platform to
    produce the canonical key used to namespace this channel's egregore and
    sprite Redis state. Side-effect free aside from the attribute reads.

    Called by :func:`run` to compute the ``channel_key`` that is then passed to
    :func:`_egregore_key` and :func:`_characters_key` and embedded in the
    ``star:sprite:update`` publish payload for WebSocket filtering. Sibling
    egregore tools define their own copies, so :func:`run` is the only caller.

    Args:
        ctx: The active :class:`ToolContext`, expected to carry ``platform``
            and ``channel_id`` attributes.

    Returns:
        str: The lowercased ``{platform}:{channel_id}`` key (either part may be
        empty if the corresponding attribute is missing).
    """
    plat = (getattr(ctx, "platform", "") or "").lower()
    cid = getattr(ctx, "channel_id", "") or ""
    return f"{plat}:{cid}"


# 🔥 Common image extensions for sprite detection
_SPRITE_EXTENSIONS = frozenset({".png", ".jpg", ".jpeg", ".webp", ".gif"})

# 😈 Expression keywords to auto-classify sprite filenames
_EXPRESSION_KEYWORDS = {
    "laugh": ["laugh", "smile", "happy", "amused", "grin"],
    "rage": ["rage", "angry", "fury", "mad", "wrath"],
    "facepalm": ["facepalm", "sigh", "tired", "exasperated", "annoyed"],
    "sad": ["sad", "cry", "tear", "melancholy"],
    "smug": ["smug", "cool", "confident"],
}


def _scan_sprites(egregore_dir: Path) -> dict[str, str]:
    """Scan an egregore folder for sprite images and auto-classify expressions.

    Builds the expression-to-filename map that drives the VN canvas, so a
    summoned egregore can be shown laughing, raging, sad, smug, etc. without any
    hand-authored manifest. It iterates the folder's image files (filtered by
    ``_SPRITE_EXTENSIONS``) in sorted order and tags each by matching the
    filename stem against the ``_EXPRESSION_KEYWORDS`` keyword sets; the first
    unclassified (or ``avatar``-named) image becomes ``default``, and any
    remaining unclassified images are stored as generic ``pose_N`` entries. A
    ``default`` is always guaranteed when at least one sprite exists. Pure
    filesystem read with no Redis, network, or LLM work.

    Called by :func:`run` while assembling the egregore state and the VN-canvas
    character entry.

    Args:
        egregore_dir (Path): The egregore's asset folder under
            ``EGREGORE_BASE``.

    Returns:
        dict[str, str]: A mapping of expression name to sprite filename; empty
            when the folder holds no recognized images.
    """
    expressions: dict[str, str] = {}
    default_set = False

    for f in sorted(egregore_dir.iterdir()):
        if f.is_file() and f.suffix.lower() in _SPRITE_EXTENSIONS:
            fname_lower = f.stem.lower()

            # Try to classify by filename keywords
            classified = False
            for expr, keywords in _EXPRESSION_KEYWORDS.items():
                if any(kw in fname_lower for kw in keywords):
                    if expr not in expressions:
                        expressions[expr] = f.name
                    classified = True
                    break

            # First unclassified image or avatar-named image = default
            if not classified and not default_set:
                expressions["default"] = f.name
                default_set = True
            elif not classified:
                # Store as generic numbered expression
                idx = len(expressions)
                expressions[f"pose_{idx}"] = f.name

            # Also check for "avatar" in name -> default
            if "avatar" in fname_lower and "default" not in expressions:
                expressions["default"] = f.name
                default_set = True

    # Ensure there's always a default
    if "default" not in expressions and expressions:
        expressions["default"] = next(iter(expressions.values()))

    return expressions


def _find_summoning_prompt(egregore_dir: Path) -> str | None:
    """Locate and read an egregore's summoning-prompt text from its folder.

    Recovers the persona text that the bot channels when embodying the egregore,
    preferring a ``.txt`` file and falling back to ``.md``, while skipping
    boilerplate files named readme, changelog, or license. Reads file contents
    from disk (UTF-8); a read failure is logged and treated as a miss so the
    scan can continue. No Redis, network, or LLM work.

    Called by :func:`run`, which substitutes a generic default persona string
    when this returns ``None``.

    Args:
        egregore_dir (Path): The egregore's asset folder under
            ``EGREGORE_BASE``.

    Returns:
        str | None: The summoning-prompt text, or ``None`` if no suitable file
            is found or readable.
    """
    for ext in (".txt", ".md"):
        for f in egregore_dir.iterdir():
            if f.is_file() and f.suffix.lower() == ext:
                # Skip README or changelog type files
                if f.stem.lower() in ("readme", "changelog", "license"):
                    continue
                try:
                    return f.read_text(encoding="utf-8")
                except Exception:
                    logger.warning("Failed to read %s", f)
    return None


def _is_plain_folder_name(name: str) -> bool:
    """Return True when *name* is a single, ordinary path component.

    Rejects ``.``, ``..`` and anything containing a path separator or NUL
    byte, so that joining the name onto a base directory cannot leave it.
    """
    if name in (".", ".."):
        return False
    return not any(token in name for token in ("/", "\\", "\x00"))


async def _create_discord_webhook_state(
    ctx: "ToolContext",
    name: str,
    sprite_expressions: dict,
) -> "dict | None":
    """Create a per-egregore Discord webhook and describe it as a state dict.

    Every precondition failure (no client, no channel id, channel lookup
    error, non-guild channel, missing admin permission, webhook creation
    failure) is logged as a warning and yields ``None``. Exceptions raised by
    the Discord helpers propagate to the caller.

    Args:
        ctx: Tool context; ``channel_id`` and ``user_id`` are read and the
            context is handed to ``require_discord_client``.
        name: Normalized egregore name, used for the webhook and avatar.
        sprite_expressions: Expression -> filename map passed to
            ``compute_public_avatar_url``.

    Returns:
        A dict with ``guild_id``, ``channel_id``, ``webhook_id``,
        ``webhook_url`` and, when an avatar URL is available, ``avatar_url``;
        or ``None`` when no webhook was created.
    """
    from tools._discord_helpers import (
        check_admin_permission,
        get_channel,
        require_discord_client,
    )
    from tools._egregore_discord import (
        compute_public_avatar_url,
        create_egregore_webhook,
    )

    # The helpers signal failure by returning an error string.
    client = require_discord_client(ctx)
    if isinstance(client, str):
        logger.warning("Discord egregore: %s", client)
        return None

    channel_id = getattr(ctx, "channel_id", None) or ""
    if not channel_id:
        logger.warning("Discord egregore: missing channel_id")
        return None

    channel = await get_channel(client, channel_id)
    if isinstance(channel, str):
        logger.warning("Discord egregore channel: %s", channel)
        return None

    guild = getattr(channel, "guild", None)
    if guild is None:
        logger.warning("Discord egregore: webhooks need a server channel")
        return None

    gid = str(guild.id)
    ok_perm, err = await check_admin_permission(client, ctx.user_id, gid)
    if not ok_perm:
        logger.warning("Discord egregore permission: %s", err)
        return None

    wh, werr = await create_egregore_webhook(channel, name)
    if not wh:
        logger.warning("Discord egregore webhook: %s", werr)
        return None

    av = compute_public_avatar_url(name, sprite_expressions)
    discord_state = {
        "guild_id": gid,
        "channel_id": str(channel_id),
        "webhook_id": str(wh.id),
        "webhook_url": wh.url,
    }
    if av:
        discord_state["avatar_url"] = av
    return discord_state


async def run(
    name: str = "",
    ctx: "ToolContext | None" = None,
) -> str:
    """Summon an egregore: load its persona, mount its sprite, and announce it.

    Entry point of the ``summon_egregore`` tool. The steps are:

    1. Validate the context, Redis handle and ``name``. The name must be a
       single folder name; anything that could escape ``EGREGORE_BASE``
       (``..``, absolute paths, separators) is rejected.
    2. Read the summoning prompt via :func:`_find_summoning_prompt`, falling
       back to a generic persona string, and scan sprites via
       :func:`_scan_sprites`.
    3. Load the channel's active-egregore JSON map from Redis. If this
       egregore was already active with a Discord webhook, try to delete the
       old webhook (best effort).
    4. Provision a platform presence. On Matrix, ensure a ghost user and join
       it to the room. On ``discord`` / ``discord-self``, create a per-egregore
       webhook. Failures are logged and do not abort the summon.
    5. Write the egregore into the active map. If it is not already on the
       VN canvas, add a character entry on the left side, facing Star.
    6. Publish a ``star:sprite:update`` message (best effort) and return the
       result.

    Per the original module documentation, this is invoked through the tool
    dispatch registry and also directly by :func:`tools.conjure_egregore.run`.

    Args:
        name (str): Egregore folder name (case-insensitive, whitespace
            stripped); must name a directory directly under
            ``EGREGORE_BASE``.
        ctx (ToolContext | None): Execution context; ``redis``, ``platform``,
            ``channel_id`` and ``user_id`` are read.

    Returns:
        str: A JSON string. On success it contains the summoning prompt,
        available sprites, canvas placement, presence flags and channelling
        instructions. On failure it contains ``success: false`` and an
        ``error`` message, plus ``available`` names when the egregore is
        unknown.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No tool context available."})

    redis = getattr(ctx, "redis", None)
    if redis is None:
        return json.dumps({"success": False, "error": "Redis not available."})

    name = (name or "").strip().lower()
    if not name:
        return json.dumps({"success": False, "error": "No egregore name provided."})

    # SECURITY: `name` is caller-supplied and becomes part of a filesystem
    # path. Without this check "../x" or "/etc" would resolve outside
    # EGREGORE_BASE and expose arbitrary .txt/.md files as a "prompt".
    if not _is_plain_folder_name(name):
        return json.dumps(
            {"success": False, "error": f"Invalid egregore name '{name}'."}
        )

    # Validate the egregore folder exists
    egregore_dir = EGREGORE_BASE / name
    if not egregore_dir.is_dir():
        # List available egregores for Star
        available = []
        if EGREGORE_BASE.is_dir():
            available = sorted(
                d.name
                for d in EGREGORE_BASE.iterdir()
                if d.is_dir() and d.name != "stargazer"
            )
        return json.dumps(
            {
                "success": False,
                "error": f"Egregore '{name}' not found.",
                "available": available,
            }
        )

    try:
        # Read the summoning prompt
        summoning_prompt = _find_summoning_prompt(egregore_dir)
        if not summoning_prompt:
            summoning_prompt = (
                f"You are {name.title()}, an egregore summoned by "
                "Stargazer The Golden Witch. Act as this character."
            )

        # Scan for available sprites
        sprite_expressions = _scan_sprites(egregore_dir)

        # Build the egregore state
        egregore_state = {
            "id": name,
            "name": name.title(),
            "summoning_prompt": summoning_prompt,
            "sprite_base": f"/assets/egregores/{name}/",
            "expressions": sprite_expressions,
            "summoned_at": int(time.time()),
            "last_active_turn": 0,
        }

        channel_key = _channel_key_from_ctx(ctx)
        ek = _egregore_key(channel_key)
        ck = _characters_key(channel_key)

        active_raw = await redis.get(ek)
        active: dict = json.loads(active_raw) if active_raw else {}

        # Remove prior Discord webhook for this egregore name (re-summon / refresh)
        prev = active.get(name) or {}
        prev_discord = prev.get("discord") or {}
        if prev_discord.get("webhook_id") and prev_discord.get("guild_id"):
            try:
                from tools._discord_helpers import require_discord_client
                from tools._egregore_discord import delete_egregore_webhook_by_id

                _cl = require_discord_client(ctx)
                if not isinstance(_cl, str):
                    await delete_egregore_webhook_by_id(
                        _cl,
                        str(prev_discord["guild_id"]),
                        str(prev_discord["webhook_id"]),
                    )
            except Exception:
                # Best effort: a stale webhook must not block the re-summon.
                logger.debug(
                    "Could not delete prior egregore webhook",
                    exc_info=True,
                )

        ghost_joined = False
        discord_webhook_ready = False
        plat = (getattr(ctx, "platform", None) or "").lower()

        if plat == "matrix":
            try:
                from egregore_bridge import get_bridge

                bridge = get_bridge()
                ok = await bridge.ensure_ghost(name, display_name=name.title())
                if ok:
                    room_id = getattr(ctx, "channel_id", None)
                    if room_id:
                        ghost_joined = await bridge.join_room(name, room_id)
                        if ghost_joined:
                            logger.info("Ghost %s joined room %s", name, room_id)
            except Exception as e:
                logger.warning("Ghost user creation failed for %s: %s", name, e)
        elif plat in ("discord", "discord-self"):
            try:
                discord_state = await _create_discord_webhook_state(
                    ctx,
                    name,
                    sprite_expressions,
                )
                if discord_state is not None:
                    egregore_state["discord"] = discord_state
                    discord_webhook_ready = True
            except Exception as e:
                logger.warning(
                    "Discord egregore webhook failed for %s: %s",
                    name,
                    e,
                    exc_info=True,
                )

        # Store in active egregores list (per-channel)
        active[name] = egregore_state
        await redis.set(ek, json.dumps(active))

        # Add to sprite characters for VN canvas (per-channel)
        chars_raw = await redis.get(ck)
        characters: dict = json.loads(chars_raw) if chars_raw else {}

        if name not in characters:
            # Default position: opposite side from Star
            characters[name] = {
                "id": name,
                "name": name.title(),
                "expression": "default",
                "position_x": 15,  # left side (Star defaults right)
                "flip": True,  # face Star
                "sprite_base": f"/assets/egregores/{name}/",
                "expressions": sprite_expressions,
                "visible": True,
            }
            # NOTE(review): the original layout was collapsed, so it is not
            # certain this write sat inside the `if`. When the character
            # already exists the stored JSON is unchanged either way.
            await redis.set(ck, json.dumps(characters))

        # Publish sprite update (with channel_key for WS filtering)
        try:
            await redis.publish(
                "star:sprite:update",
                json.dumps(
                    {
                        "action": "summon",
                        "channel_key": channel_key,
                        "egregore": name,
                        "characters": characters,
                    }
                ),
            )
        except Exception:
            # State is already persisted; a failed notification only delays
            # the canvas refresh, so record it instead of failing the summon.
            logger.debug("Sprite update publish failed", exc_info=True)

        logger.info(
            "Egregore summoned: %s (%d sprites, %d char prompt)",
            name,
            len(sprite_expressions),
            len(summoning_prompt),
        )

        return json.dumps(
            {
                "success": True,
                "message": f"Egregore '{name.title()}' has been summoned.",
                "egregore": name,
                "display_name": name.title(),
                "summoning_prompt": summoning_prompt,
                "available_sprites": sprite_expressions,
                "canvas_position": "left (15%), facing right",
                "ghost_user_joined": ghost_joined,
                "discord_webhook_ready": discord_webhook_ready,
                "instructions": (
                    "The summoning prompt has been loaded. You may now channel "
                    f"this egregore. Use set_sprite(character='{name}') to "
                    "control their position. Use dismiss_egregore to send them back.\n\n"
                    "IMPORTANT: When speaking AS this egregore (not as yourself), "
                    f"prefix your message with [EGREGORE:{name}] so the message "
                    f"appears from {name.title()}'s ghost account, NOT from you. "
                    f"Example: '[EGREGORE:{name}] Hello, I am {name.title()}.'\n"
                    f"You can also write '**{name.title()}:** dialogue' and it will "
                    "auto-route. Messages WITHOUT the prefix will be sent as YOU (Star)."
                ),
            },
            indent=2,
        )

    except Exception as e:
        # Tool boundary: report the failure as JSON instead of raising.
        logger.error("summon_egregore error: %s", e, exc_info=True)
        return json.dumps(
            {
                "success": False,
                "error": f"Failed to summon egregore: {e}",
            }
        )