"""Autonomous Librarian subagent dispatch tool.
The Librarian handles long-term memory management for Stargazer agents by
recording notes in a git-versioned paginated notebook. It operates
asynchronously with a scoped toolset.
Follows the established Redis-backed subagent lifecycle:
_create_subagent → _call_subagent → _delete_subagent
as used by the Swarm Supervisor (scripts/swarm_supervisor.py).
"""
from __future__ import annotations
import asyncio
import logging
import uuid
from typing import Annotated
import jsonutil as json
from pydantic import Field
from tool_context import ToolContext
from tools.alter_privileges import has_privilege, PRIVILEGES
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────
# Constants
# ──────────────────────────────────────────────────────────────────────
# Safety-net lifetime, in seconds, applied to the temporary subagent
# definition so it still expires if explicit cleanup never happens (one week).
_LIBRARIAN_TTL = 7 * 24 * 60 * 60

# Model used when the caller does not pick one; it has to be a member of
# subagent_tools._ALLOWED_MODELS_SET.
_DEFAULT_LIBRARIAN_MODEL = "gemini-3-flash"

# Tool names passed to _create_subagent(tools=...). They must match the names
# in the TOOLS list of tools/notebook_tools.py exactly, because _call_subagent
# resolves them against the main ToolRegistry via
# _build_scoped_registry(ctx, tool_names).
_LIBRARIAN_TOOL_NAMES = [
    "get_notebook_toc",
    "get_global_notebook_toc",
    "get_notebook_page",
    "write_notebook_entry",
    "get_instance_conversation_history",
    "get_notebook_git_log",
    "get_notebook_git_diff",
]
# System prompt installed on every temporary Librarian subagent definition
# (passed as ``system_prompt`` to _create_subagent by _librarian_runner).
# Lines ending in a backslash are joined to the next line inside the literal,
# so each numbered workflow step / constraint is a single line in the prompt.
# The tool names mentioned here must stay in sync with _LIBRARIAN_TOOL_NAMES.
# NOTE(review): the prompt tells the model to "commit your changes", but no
# dedicated commit tool is in _LIBRARIAN_TOOL_NAMES; presumably
# write_notebook_entry commits to git itself — confirm.
LIBRARIAN_SYSTEM_PROMPT = """\
You are the Stargazer Librarian, an autonomous subagent specialized in \
long-term memory management and knowledge synthesis.
OBJECTIVE:
Your goal is to maintain the main agent's "Notebook" (a git-versioned, \
paginated state file system). You must analyze information provided to you, \
plan your updates, execute them using your notebook tools, and verify the \
consistency of the state after your changes.
WORKFLOW:
1. **Analyze**: Review the conversation history of the instance you are \
documenting to extract salient facts, decisions, and outcomes.
2. **Plan**: Determine which notebook page needs updating or if a new entry \
is required.
3. **Execute**: Use `write_notebook_entry` to document information. Use \
`get_notebook_toc` or `get_notebook_page` to check current state.
4. **Verify**: Ensure your notes are accurately recorded and follow a \
consistent structure.
5. **Report**: Provide a concise summary of your work.
CONSTRAINTS:
- Be precise and objective. Avoid fluff.
- Use structured notes (bullet points, clear headers).
- Always commit your changes with a descriptive message.
- You operate in a background process; your reporting goes to the \
Stargazer-System-Log.
"""
# ──────────────────────────────────────────────────────────────────────
# Background task registry
# ──────────────────────────────────────────────────────────────────────
# Strong references to in-flight Librarian runner tasks. dispatch_librarian
# adds every task it creates and registers ``discard`` as that task's
# done-callback, so entries disappear once the task finishes. Holding the
# reference matters because the asyncio event loop keeps only weak references
# to tasks; without it a running task could be garbage-collected mid-run.
_active_librarians: set[asyncio.Task] = set()
# ---------------------------------------------------------------------------
# Authorization
# ---------------------------------------------------------------------------
async def _check_subagent_access(ctx) -> str | None:
    """Authorize Librarian dispatch against the ``SUBAGENT_ACCESS`` privilege.

    This is the only permission gate in front of the Librarian. Dispatching it
    starts an autonomous subagent with its own LLM loop and notebook tools, so
    the caller must hold the ``SUBAGENT_ACCESS`` bit.

    The caller's identity, Redis handle, and config are read defensively from
    ``ctx`` with ``getattr``. A missing or falsy ``user_id`` becomes ``""``; a
    missing ``redis``/``config`` becomes ``None``. They are then handed to
    ``has_privilege`` from ``tools.alter_privileges``.

    This function is invoked by :func:`dispatch_librarian` before any
    background work is scheduled. ``tools/subagent_tools.py`` and
    ``tools/swarm_state_tools.py`` each define their own function with this
    name; this one is the Librarian's copy.

    Args:
        ctx: Tool context exposing ``user_id``, ``redis`` and ``config``.

    Returns:
        ``None`` when access is granted; otherwise a JSON-encoded object with
        ``success`` set to false and an ``error`` message naming the missing
        privilege.
    """
    caller = getattr(ctx, "user_id", "") or ""
    redis_conn = getattr(ctx, "redis", None)
    cfg = getattr(ctx, "config", None)

    granted = await has_privilege(
        redis_conn, caller, PRIVILEGES["SUBAGENT_ACCESS"], cfg
    )
    if granted:
        return None

    denial = {
        "success": False,
        "error": "The user does not have the SUBAGENT_ACCESS privilege. Ask an admin to grant it with the alter_privileges tool.",
    }
    return json.dumps(denial)
# ──────────────────────────────────────────────────────────────────────
async def dispatch_librarian(
    ctx: ToolContext,
    instruction: Annotated[
        str,
        Field(
            description="Task for the librarian (e.g., 'Summarize the architecture decision')"
        ),
    ],
    model: Annotated[
        str,
        Field(description="LLM model to use for the librarian."),
    ] = _DEFAULT_LIBRARIAN_MODEL,
) -> str:
    """Spawn the Librarian subagent to manage the notebook in the background.

    Handler for the ``dispatch_librarian`` tool. After the authorization check,
    it schedules :func:`_librarian_runner` as a detached asyncio task and
    returns immediately. The LLM therefore gets an instant confirmation while
    the notebook analysis and writing happen out of band; results land in the
    system log rather than in this reply.

    The task is kept alive by a strong reference in the module-level
    ``_active_librarians`` set. A done-callback removes it once the task
    finishes. Apart from the privilege lookup, this function does no Redis,
    subagent, or LLM work; the runner does.

    The handler is dispatched by name through ``tool_loader`` from the
    ``TOOLS`` list. It is also exercised directly by
    ``tests/test_librarian_tool.py`` and
    ``tests/test_subagent_access_gating.py``.

    NOTE(review): ``ctx`` is captured by the detached task and used after this
    handler returns; presumably the context stays valid that long — confirm.

    Args:
        ctx: Tool context supplying identity, Redis, platform, and channel.
        instruction: The task for the Librarian (e.g. summarize a decision).
        model: LLM model the Librarian should use; defaults to
            :data:`_DEFAULT_LIBRARIAN_MODEL`.

    Returns:
        str: A plain confirmation string on dispatch, or the JSON error from
        :func:`_check_subagent_access` when the caller is unauthorized.
    """
    auth_err = await _check_subagent_access(ctx)
    if auth_err:
        return auth_err

    # The event loop only holds weak references to tasks, so park a strong one
    # in _active_librarians until the task completes.
    task = asyncio.create_task(_librarian_runner(ctx, instruction, model))
    _active_librarians.add(task)
    task.add_done_callback(_active_librarians.discard)

    return (
        f"Librarian dispatched with instruction: '{instruction}'. "
        "Results will appear in the system log."
    )
# ──────────────────────────────────────────────────────────────────────
# Background runner — follows the Swarm Supervisor lifecycle:
# _create_subagent → _call_subagent → _delete_subagent
# ──────────────────────────────────────────────────────────────────────
async def _librarian_runner(
    ctx: ToolContext,
    instruction: str,
    model_name: str,
) -> None:
    """Drive the full Librarian subagent lifecycle as a detached background task.

    Scheduled by :func:`dispatch_librarian` and also called directly by
    ``tests/test_librarian_tool.py``. It follows the Swarm Supervisor's
    three-phase pattern:

    1. Register a temporary subagent definition (``_create_subagent``), scoped
       to :data:`_LIBRARIAN_TOOL_NAMES`. Then extend its Redis TTL to
       :data:`_LIBRARIAN_TTL` as a best-effort safety net.
    2. Call it (``_call_subagent``, retried by :func:`_call_with_retries`) with
       a prompt. The prompt tells it to read the instance's conversation
       history and then fulfil ``instruction`` with its notebook tools.
    3. Delete the temporary definition (``_delete_subagent``) in ``finally``,
       whether the run succeeded or failed.

    The outcome — the subagent's response, a creation failure, or any
    exception — is written to the system log through :func:`_log_report`.
    Nothing is returned to the user. A 5-second initial sleep lets the parent
    agent flush its buffer first, avoiding a context blind spot.

    Args:
        ctx: Tool context supplying ``redis``, ``platform`` and ``channel_id``.
        instruction: The Librarian's task, forwarded into the subagent prompt.
        model_name: Model the temporary subagent definition is created with.

    Returns:
        None. Results and errors are surfaced only via the system log.
    """
    from tools.subagent_tools import (
        _create_subagent,
        _call_subagent,
        _delete_subagent,
        _DEF_PREFIX,
    )

    instance_uid = f"{ctx.platform}:{ctx.channel_id}"
    logger.info("Librarian runner started for %s: %s", instance_uid, instruction)

    # Prevent Context Blind-Spot by ensuring Parent Agent commits its current buffer
    await asyncio.sleep(5)

    subagent_id: str | None = None
    try:
        # ── Phase 1: Register a temporary subagent definition ────────
        create_raw = await _create_subagent(
            description=f"Librarian for {instance_uid}",
            system_prompt=LIBRARIAN_SYSTEM_PROMPT,
            tools=_LIBRARIAN_TOOL_NAMES,
            model=model_name,
            ctx=ctx,
        )
        create_result = json.loads(create_raw)
        if not create_result.get("success"):
            error_msg = create_result.get("error", "Unknown creation error")
            logger.error(
                "Librarian creation failed for %s: %s",
                instance_uid,
                error_msg,
            )
            await _log_report(
                ctx,
                instance_uid,
                instruction,
                f"❌ Librarian creation failed: {error_msg}",
            )
            return

        subagent_id = create_result["subagent_id"]
        logger.info(
            "Librarian temp definition created: %s for %s",
            subagent_id,
            instance_uid,
        )

        # Extend the definition's TTL to _LIBRARIAN_TTL so it still expires if
        # the cleanup in ``finally`` fails. Best-effort: failures only log.
        try:
            await ctx.redis.expire(f"{_DEF_PREFIX}{subagent_id}", _LIBRARIAN_TTL)
        except Exception:
            logger.debug("TTL override skipped", exc_info=True)

        # ── Phase 2: Call the subagent by ID ─────────────────────────
        prompt = (
            f"INSTRUCTION: {instruction}\n\n"
            f"You are operating on instance: {instance_uid}.\n"
            "First, use `get_instance_conversation_history` to understand "
            "the recent context.\n"
            "Then, use your notebook tools to fulfill the instruction."
        )
        call_raw = await _call_with_retries(
            _call_subagent, subagent_id, prompt, ctx, instance_uid
        )
        response_text = _extract_response_text(call_raw)

        # ── Phase 2b: Report success ─────────────────────────────────
        report_msg = (
            f"📑 **Librarian Report** ({instance_uid})\n"
            f"**Task**: {instruction}\n"
            f"**Outcome**: {response_text}"
        )
        await _log_report(ctx, instance_uid, instruction, report_msg)
        logger.info("Librarian runner completed for %s", instance_uid)
    except Exception as e:
        logger.exception("Librarian runner failed for %s", instance_uid)
        await _log_report(
            ctx,
            instance_uid,
            instruction,
            f"❌ **Librarian Error** ({instance_uid}): {e}",
        )
    finally:
        # ── Phase 3: Cleanup the temporary definition ────────────────
        if subagent_id is not None:
            try:
                await _delete_subagent(subagent_id, ctx=ctx)
                logger.info(
                    "Librarian temp definition cleaned up: %s",
                    subagent_id,
                )
            except Exception:
                logger.warning(
                    "Librarian cleanup failed for %s (TTL will expire it)",
                    subagent_id,
                    exc_info=True,
                )


async def _call_with_retries(
    call_subagent,
    subagent_id: str,
    prompt: str,
    ctx: ToolContext,
    instance_uid: str,
    attempts: int = 3,
) -> str:
    """Await ``call_subagent`` with exponential backoff, re-raising the last failure.

    Attempt ``k`` (0-based) that raises is logged, then followed by a
    ``2**k``-second sleep. The exception from the final attempt propagates to
    the caller.

    Args:
        call_subagent: The ``_call_subagent`` coroutine function; it is invoked
            with ``subagent_id``, ``prompt`` and ``ctx`` keyword arguments.
        subagent_id: Id of the temporary Librarian definition.
        prompt: Prompt forwarded to the subagent.
        ctx: Tool context forwarded to the subagent call.
        instance_uid: ``platform:channel`` label used only in log messages.
        attempts: Total number of tries (default 3).

    Returns:
        str: The raw result of the first successful call, or ``""`` when
        ``attempts`` is less than 1.
    """
    for attempt in range(attempts):
        try:
            return await call_subagent(
                subagent_id=subagent_id,
                prompt=prompt,
                ctx=ctx,
            )
        except Exception as e:
            if attempt == attempts - 1:
                raise
            logger.warning(
                "Librarian call attempt %d failed for %s: %s",
                attempt + 1,
                instance_uid,
                e,
            )
            await asyncio.sleep(2**attempt)
    return ""


def _extract_response_text(call_raw: str) -> str:
    """Return the ``response`` field of a subagent JSON envelope, or the raw text.

    Falls back to ``call_raw`` unchanged in three cases:

    - it is not valid JSON;
    - it decodes to something other than a JSON object (e.g. a list or a bare
      string), which previously raised AttributeError and was misreported as
      a runner failure;
    - the object has no ``response`` key.
    """
    try:
        payload = json.loads(call_raw)
    except (json.JSONDecodeError, ValueError, TypeError):
        # ValueError is listed in addition to JSONDecodeError in case the JSON
        # backend's decode error is not a JSONDecodeError (the stdlib one
        # subclasses ValueError).
        return call_raw
    if isinstance(payload, dict):
        return payload.get("response", call_raw)
    return call_raw
async def _log_report(
    ctx: ToolContext,
    instance_uid: str,
    instruction: str,
    report_text: str,
) -> None:
    """Best-effort write of a Librarian report into the system-log pipeline.

    The report is stored through ``MessageCache.log_tool_call_summary`` with:

    - ``summary_id``: a fresh 12-hex-character id from ``uuid4``;
    - ``record_ids``: an empty list;
    - ``summary_text``: the report text;
    - the context's channel id (as a string) and platform.

    Any ``Exception`` is logged as a warning and swallowed, so reporting
    failures never propagate into the runner. This includes failing to import
    ``message_cache``.

    Args:
        ctx: Tool context; its ``redis``, ``channel_id`` and ``platform`` are used.
        instance_uid: ``platform:channel`` label, used only in the warning log.
        instruction: Currently unused by this function.
        report_text: Text stored as the summary body.
    """
    try:
        from message_cache import MessageCache

        await MessageCache(ctx.redis).log_tool_call_summary(
            summary_id=uuid.uuid4().hex[:12],
            record_ids=[],
            summary_text=report_text,
            channel_id=str(ctx.channel_id),
            platform=ctx.platform,
        )
    except Exception:
        logger.warning(
            "Failed to log librarian report for %s",
            instance_uid,
            exc_info=True,
        )
# ------------------------------------------------------------------
# Tool Registry Definitions
# ------------------------------------------------------------------
# Registry entries picked up by tool_loader, which dispatches each tool by
# ``name`` to its ``handler``. ``parameters`` is the JSON-schema-style argument
# description shown to the LLM.
# NOTE(review): ``no_background`` presumably tells the loader not to run this
# tool in its own background wrapper (the handler already detaches its work
# via asyncio.create_task) — confirm against tool_loader.
TOOLS = [
    dict(
        name="dispatch_librarian",
        handler=dispatch_librarian,
        description=(
            "Trigger the Librarian subagent to document information "
            "or manage the notebook."
        ),
        parameters=dict(
            type="object",
            properties=dict(
                instruction=dict(
                    type="string",
                    description="Specific goal for the librarian.",
                ),
                model=dict(
                    type="string",
                    description=f"LLM model (default {_DEFAULT_LIBRARIAN_MODEL}).",
                    default=_DEFAULT_LIBRARIAN_MODEL,
                ),
            ),
            required=["instruction"],
        ),
        no_background=True,
    ),
]