Source code for tools.librarian_tool

"""Autonomous Librarian subagent dispatch tool.

The Librarian handles long-term memory management for Stargazer agents by
recording notes in a git-versioned paginated notebook. It operates
asynchronously with a scoped toolset.

Follows the established Redis-backed subagent lifecycle:
    _create_subagent → _call_subagent → _delete_subagent
as used by the Swarm Supervisor (scripts/swarm_supervisor.py).
"""

from __future__ import annotations

import asyncio
import logging
import uuid
from typing import Annotated

import jsonutil as json
from pydantic import Field

from tool_context import ToolContext
from tools.alter_privileges import has_privilege, PRIVILEGES

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ──────────────────────────────────────────────────────────────────────
# Constants
# ──────────────────────────────────────────────────────────────────────

# TTL in seconds (86400 s/day * 7 = 7 days) that _librarian_runner applies to
# the temporary subagent definition key via ``ctx.redis.expire``. It is a
# safety net: the runner deletes the definition itself in its ``finally``
# block, and the TTL only matters when that cleanup fails.
_LIBRARIAN_TTL = 86400 * 7  # 7-day safety-net TTL for temp definitions

# Default model — must be in subagent_tools._ALLOWED_MODELS_SET.
# Used as the default of dispatch_librarian's ``model`` parameter and
# advertised in the TOOLS schema.
_DEFAULT_LIBRARIAN_MODEL = "gemini-3-flash"

# Tool names handed to ``_create_subagent(tools=...)`` when the temporary
# Librarian definition is registered; this is the Librarian's whole toolset.
# Exact tool names as registered in tools/notebook_tools.py TOOLS list.
# _call_subagent uses _build_scoped_registry(ctx, tool_names) to pull
# these from the main ToolRegistry, so they must be registered there.
_LIBRARIAN_TOOL_NAMES = [
    "get_notebook_toc",
    "get_global_notebook_toc",
    "get_notebook_page",
    "write_notebook_entry",
    "get_instance_conversation_history",
    "get_notebook_git_log",
    "get_notebook_git_diff",
]

# System prompt for the temporary Librarian subagent; _librarian_runner passes
# it as ``system_prompt=`` to ``_create_subagent``. A backslash at the end of
# a line inside the literal is a line continuation, so each wrapped paragraph
# reaches the model as one line.
# NOTE(review): the prompt asks the model to "commit" its changes, yet no
# commit tool is listed in _LIBRARIAN_TOOL_NAMES — presumably
# ``write_notebook_entry`` commits on write; confirm against notebook_tools.
LIBRARIAN_SYSTEM_PROMPT = """\
You are the Stargazer Librarian, an autonomous subagent specialized in \
long-term memory management and knowledge synthesis.

OBJECTIVE:
Your goal is to maintain the main agent's "Notebook" (a git-versioned, \
paginated state file system). You must analyze information provided to you, \
plan your updates, execute them using your notebook tools, and verify the \
consistency of the state after your changes.

WORKFLOW:
1.  **Analyze**: Review the conversation history of the instance you are \
documenting to extract salient facts, decisions, and outcomes.
2.  **Plan**: Determine which notebook page needs updating or if a new entry \
is required.
3.  **Execute**: Use `write_notebook_entry` to document information. Use \
`get_notebook_toc` or `get_notebook_page` to check current state.
4.  **Verify**: Ensure your notes are accurately recorded and follow a \
consistent structure.
5.  **Report**: Provide a concise summary of your work.

CONSTRAINTS:
- Be precise and objective. Avoid fluff.
- Use structured notes (bullet points, clear headers).
- Always commit your changes with a descriptive message.
- You operate in a background process; your reporting goes to the \
Stargazer-System-Log.
"""


# ──────────────────────────────────────────────────────────────────────
# Dispatch entry point
# ──────────────────────────────────────────────────────────────────────

# Strong references to in-flight Librarian tasks. dispatch_librarian adds each
# task it creates here and registers ``_active_librarians.discard`` as the
# task's done-callback, so an entry is dropped as soon as its task finishes.
# The event loop itself only keeps weak references to tasks, so without this
# set a detached task could be garbage-collected before it completes.
_active_librarians: set[asyncio.Task] = set()


# ---------------------------------------------------------------------------
# Authorization
# ---------------------------------------------------------------------------


async def _check_subagent_access(ctx) -> str | None:
    """Authorize a Librarian dispatch against the ``SUBAGENT_ACCESS`` privilege.

    Reads ``user_id``, ``redis`` and ``config`` off ``ctx`` with ``getattr``
    (a missing or falsy ``user_id`` becomes ``""``; missing ``redis`` /
    ``config`` become ``None``) and hands them to ``has_privilege`` together
    with ``PRIVILEGES["SUBAGENT_ACCESS"]``. :func:`dispatch_librarian` calls
    this before it schedules any background work.

    Args:
        ctx: The tool context; only ``user_id``, ``redis`` and ``config`` are
            consulted, and each may be absent.

    Returns:
        str | None: ``None`` when the privilege check passes; otherwise a JSON
        string of the form ``{"success": false, "error": ...}`` describing the
        missing privilege.
    """
    caller_id = getattr(ctx, "user_id", "") or ""
    redis_client = getattr(ctx, "redis", None)
    cfg = getattr(ctx, "config", None)

    required_privilege = PRIVILEGES["SUBAGENT_ACCESS"]
    if await has_privilege(redis_client, caller_id, required_privilege, cfg):
        # Authorized: nothing to report back to the caller.
        return None

    denial = {
        "success": False,
        "error": "The user does not have the SUBAGENT_ACCESS privilege. Ask an admin to grant it with the alter_privileges tool.",
    }
    return json.dumps(denial)


# ──────────────────────────────────────────────────────────────────────


async def dispatch_librarian(
    ctx: ToolContext,
    instruction: Annotated[
        str,
        Field(
            description="Task for the librarian (e.g., 'Summarize the architecture decision')"
        ),
    ],
    model: Annotated[
        str,
        Field(description="LLM model to use for the librarian."),
    ] = _DEFAULT_LIBRARIAN_MODEL,
) -> str:
    """Schedule a background Librarian run and confirm the dispatch right away.

    Handler for the ``dispatch_librarian`` tool. The caller is first checked
    with :func:`_check_subagent_access`; if that passes, :func:`_librarian_runner`
    is started as a detached ``asyncio`` task and this function returns
    without waiting for it. The runner reports its outcome through the system
    log, not through this function's return value.

    The created task is stored in the module-level ``_active_librarians`` set
    and removed again by a done-callback, so a strong reference exists for as
    long as the task is running.

    Args:
        ctx: The tool context; forwarded unchanged to the access check and to
            the runner.
        instruction: The task for the Librarian (e.g. summarize a decision).
        model: The LLM model the Librarian should use; defaults to
            :data:`_DEFAULT_LIBRARIAN_MODEL`.

    Returns:
        str: A plain-text confirmation once the task is scheduled, or the JSON
        error string produced by :func:`_check_subagent_access` when the
        caller is not authorized.
    """
    denial = await _check_subagent_access(ctx)
    if denial:
        return denial

    # Fire-and-forget: keep a strong reference until the task completes.
    runner_task = asyncio.create_task(_librarian_runner(ctx, instruction, model))
    _active_librarians.add(runner_task)
    runner_task.add_done_callback(_active_librarians.discard)

    confirmation = (
        f"Librarian dispatched with instruction: '{instruction}'. "
        "Results will appear in the system log."
    )
    return confirmation
# ──────────────────────────────────────────────────────────────────────
# Background runner — follows the Swarm Supervisor lifecycle:
#   _create_subagent → _call_subagent → _delete_subagent
# ──────────────────────────────────────────────────────────────────────


def _extract_response_text(call_raw):
    """Return the reply text carried by a ``_call_subagent`` result.

    The result is expected to be a JSON object with a ``"response"`` key.
    Anything else — text that is not JSON, a non-string value, a JSON object
    without ``"response"``, or valid JSON that is not an object (a bare
    string, number, list, ...) — falls back to ``call_raw`` unchanged so the
    report still shows whatever the subagent produced.

    Args:
        call_raw: The raw value returned by ``_call_subagent``.

    Returns:
        The ``"response"`` value when present, otherwise ``call_raw``.
    """
    try:
        envelope = json.loads(call_raw)
    except (json.JSONDecodeError, TypeError):
        return call_raw
    # json.loads may legitimately return str/int/list/None; only a dict has
    # ``.get``, so guard instead of letting AttributeError escape.
    if isinstance(envelope, dict):
        return envelope.get("response", call_raw)
    return call_raw


async def _librarian_runner(
    ctx: ToolContext,
    instruction: str,
    model_name: str,
) -> None:
    """Run one Librarian subagent from creation to cleanup.

    Scheduled as a detached task by :func:`dispatch_librarian`. The steps are:

    1. Sleep 5 seconds so the parent agent can commit its current buffer.
    2. Register a temporary subagent definition with ``_create_subagent``
       (system prompt :data:`LIBRARIAN_SYSTEM_PROMPT`, tools
       :data:`_LIBRARIAN_TOOL_NAMES`, model ``model_name``) and, best-effort,
       set its Redis TTL to :data:`_LIBRARIAN_TTL`.
    3. Call the subagent with a prompt built from ``instruction`` and the
       instance id ``"{ctx.platform}:{ctx.channel_id}"``; the call is tried up
       to three times, sleeping 1 s and then 2 s between attempts, and the
       third failure is re-raised.
    4. Report the outcome via :func:`_log_report`.
    5. In ``finally``, delete the temporary definition if one was created.

    No exception derived from ``Exception`` propagates out of this coroutine:
    failures are logged and reported through :func:`_log_report` instead.

    Args:
        ctx: The tool context; ``platform``, ``channel_id`` and ``redis`` are
            read here, and the context is passed on to the subagent helpers.
        instruction: The Librarian's task, embedded in the subagent prompt
            and in the report.
        model_name: The LLM model the temporary subagent is created with.

    Returns:
        None. Results and errors are surfaced through logging and
        :func:`_log_report`.
    """
    # Imported at call time rather than module import time — presumably to
    # avoid a circular import with tools.subagent_tools; confirm before moving.
    from tools.subagent_tools import (
        _create_subagent,
        _call_subagent,
        _delete_subagent,
        _DEF_PREFIX,
    )

    instance_uid = f"{ctx.platform}:{ctx.channel_id}"
    logger.info("Librarian runner started for %s: %s", instance_uid, instruction)

    # Prevent Context Blind-Spot by ensuring Parent Agent commits its current buffer
    await asyncio.sleep(5)

    # Stays None until Phase 1 succeeds, so ``finally`` knows whether there is
    # anything to clean up.
    subagent_id: str | None = None
    try:
        # ── Phase 1: Register a temporary subagent definition ────────
        create_raw = await _create_subagent(
            description=f"Librarian for {instance_uid}",
            system_prompt=LIBRARIAN_SYSTEM_PROMPT,
            tools=_LIBRARIAN_TOOL_NAMES,
            model=model_name,
            ctx=ctx,
        )
        create_result = json.loads(create_raw)

        if not create_result.get("success"):
            error_msg = create_result.get("error", "Unknown creation error")
            logger.error(
                "Librarian creation failed for %s: %s",
                instance_uid,
                error_msg,
            )
            await _log_report(
                ctx,
                instance_uid,
                instruction,
                f"❌ Librarian creation failed: {error_msg}",
            )
            return

        subagent_id = create_result["subagent_id"]
        logger.info(
            "Librarian temp definition created: %s for %s",
            subagent_id,
            instance_uid,
        )

        # Override TTL to 7 days (safety net if cleanup fails). Deliberately
        # best-effort: a failed EXPIRE must not abort the run.
        try:
            r = ctx.redis
            await r.expire(f"{_DEF_PREFIX}{subagent_id}", _LIBRARIAN_TTL)
        except Exception:
            logger.debug("TTL override skipped", exc_info=True)

        # ── Phase 2: Call the subagent by ID ─────────────────────────
        prompt = (
            f"INSTRUCTION: {instruction}\n\n"
            f"You are operating on instance: {instance_uid}.\n"
            f"First, use `get_instance_conversation_history` to understand "
            f"the recent context.\n"
            f"Then, use your notebook tools to fulfill the instruction."
        )

        call_raw = ""
        for attempt in range(3):
            try:
                call_raw = await _call_subagent(
                    subagent_id=subagent_id,
                    prompt=prompt,
                    ctx=ctx,
                )
                break
            except Exception as e:
                if attempt == 2:
                    # Out of retries: let the outer handler report it.
                    raise
                logger.warning(
                    "Librarian call attempt %d failed for %s: %s",
                    attempt + 1,
                    instance_uid,
                    e,
                )
                # Exponential backoff: 1 s after the first failure, 2 s after
                # the second.
                await asyncio.sleep(2**attempt)

        # Extract the LLM response text from the JSON envelope.
        # NOTE(review): an envelope without "response" (e.g. a failure
        # envelope, if _call_subagent reports errors that way) is still
        # reported below as a normal outcome carrying the raw text — confirm
        # whether that is intended.
        response_text = _extract_response_text(call_raw)

        # ── Phase 2b: Report success ─────────────────────────────────
        report_msg = (
            f"📑 **Librarian Report** ({instance_uid})\n"
            f"**Task**: {instruction}\n"
            f"**Outcome**: {response_text}"
        )
        await _log_report(ctx, instance_uid, instruction, report_msg)
        logger.info("Librarian runner completed for %s", instance_uid)

    except Exception as e:
        logger.exception("Librarian runner failed for %s", instance_uid)
        await _log_report(
            ctx,
            instance_uid,
            instruction,
            f"❌ **Librarian Error** ({instance_uid}): {e}",
        )

    finally:
        # ── Phase 3: Cleanup the temporary definition ────────────────
        if subagent_id is not None:
            try:
                await _delete_subagent(subagent_id, ctx=ctx)
                logger.info(
                    "Librarian temp definition cleaned up: %s",
                    subagent_id,
                )
            except Exception:
                logger.warning(
                    "Librarian cleanup failed for %s (TTL will expire it)",
                    subagent_id,
                    exc_info=True,
                )


async def _log_report(
    ctx: ToolContext,
    instance_uid: str,
    instruction: str,
    report_text: str,
) -> None:
    """Log a librarian report to the Stargazer-System-Log pipeline.

    Builds a ``MessageCache`` on ``ctx.redis`` and records ``report_text``
    through ``MessageCache.log_tool_call_summary`` using its keyword
    signature: summary_id, record_ids, summary_text, channel_id, platform.
    The summary id is the first 12 hex characters of a fresh UUID4 and
    ``record_ids`` is always empty.

    This is best-effort: any exception, including a failure to import
    ``message_cache``, is logged as a warning and swallowed so that reporting
    can never break the runner.

    Args:
        ctx: The tool context; ``redis``, ``channel_id`` and ``platform`` are
            read.
        instance_uid: Instance identifier, used only in the warning message
            emitted when logging fails.
        instruction: Accepted for call-site symmetry; not used by this
            function.
        report_text: The text stored as the summary.

    Returns:
        None.
    """
    try:
        from message_cache import MessageCache

        cache = MessageCache(ctx.redis)
        await cache.log_tool_call_summary(
            summary_id=uuid.uuid4().hex[:12],
            record_ids=[],
            summary_text=report_text,
            channel_id=str(ctx.channel_id),
            platform=ctx.platform,
        )
    except Exception:
        logger.warning(
            "Failed to log librarian report for %s",
            instance_uid,
            exc_info=True,
        )


# ------------------------------------------------------------------
# Tool Registry Definitions
# ------------------------------------------------------------------

# Registry entry exposing dispatch_librarian as a tool. Only ``instruction``
# is required; ``model`` falls back to _DEFAULT_LIBRARIAN_MODEL.
TOOLS = [
    {
        "name": "dispatch_librarian",
        "handler": dispatch_librarian,
        "description": (
            "Trigger the Librarian subagent to document information "
            "or manage the notebook."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "instruction": {
                    "type": "string",
                    "description": "Specific goal for the librarian.",
                },
                "model": {
                    "type": "string",
                    "description": (f"LLM model (default {_DEFAULT_LIBRARIAN_MODEL})."),
                    "default": _DEFAULT_LIBRARIAN_MODEL,
                },
            },
            "required": ["instruction"],
        },
        # NOTE(review): presumably tells the tool loader not to background
        # this tool, since the handler already returns immediately — confirm
        # against tool_loader.
        "no_background": True,
    },
]