"""Read Python tool source code from the tools/ directory."""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING
import aiofiles
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
TOOL_NAME = "read_tool_code"
TOOL_DESCRIPTION = (
"Read the source code of a Python tool file from the tools/ directory. "
"Useful for examining existing tool implementations or debugging. "
"Requires READ_TOOL_CODE privilege."
)
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "Name of the tool file to read (e.g. 'shell_tool.py').",
},
"max_lines": {
"type": "integer",
"description": "Max number of lines to return (default: all).",
},
"start_line": {
"type": "integer",
"description": "Starting line number (1-indexed).",
},
"end_line": {
"type": "integer",
"description": "Ending line number (1-indexed).",
},
},
"required": ["filename"],
}
TOOLS_ROOT = Path(__file__).resolve().parent
async def _check_read_tool_code(ctx: ToolContext | None) -> str | None:
"""Authorize reading tool source code, returning error JSON or None.
Gatekeeper for the ``read_tool_code`` tool. Reading tool implementations
can expose security-sensitive logic, so it is reserved for callers holding
the ``READ_TOOL_CODE`` privilege; this is the choke point that enforces
that before any file is opened.
It reads ``redis``, ``config`` and ``user_id`` off the tool context and
delegates to ``tools.alter_privileges.has_privilege``, which consults the
Redis-backed privilege store. A denied attempt is logged at warning level
via the module ``logger`` (tagged ``SECURITY``). A missing context, a
failed check, or an unavailable privilege subsystem each yield a JSON error
payload; a successful check returns ``None``. Called by :func:`run` before
it touches the filesystem.
Args:
ctx: Tool execution context carrying ``redis``, ``config`` and
``user_id``; ``None`` is treated as an authorization failure.
Returns:
str | None: A JSON error string when the caller is unauthorized or the
privilege system is unavailable, or ``None`` when reading is allowed.
"""
if ctx is None:
return json.dumps(
{
"success": False,
"error": (
"Authorization failed: tool context is required to verify "
"privileges for read_tool_code."
),
}
)
try:
from tools.alter_privileges import has_privilege, PRIVILEGES
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
user_id = getattr(ctx, "user_id", "") or ""
if not await has_privilege(
redis,
user_id,
PRIVILEGES["READ_TOOL_CODE"],
config,
):
logger.warning(
"SECURITY: User %s attempted read_tool_code without READ_TOOL_CODE",
user_id,
)
return json.dumps(
{
"success": False,
"error": (
"You do not have the READ_TOOL_CODE privilege. "
"Ask an admin to grant it with the alter_privileges tool."
),
}
)
except ImportError:
return json.dumps(
{
"success": False,
"error": "Privilege system unavailable.",
}
)
return None
[docs]
async def run(
filename: str,
max_lines: int = None,
start_line: int = None,
end_line: int = None,
ctx: ToolContext | None = None,
**_kwargs,
) -> str:
"""Read the source of a Python tool file from the ``tools/`` directory.
Entry point for the ``read_tool_code`` tool, used to inspect or debug
existing tool implementations. It returns the full file or a bounded slice,
after a privilege check and several safety checks that keep reads confined
to ``tools/`` and to ``.py`` files.
It first calls :func:`_check_read_tool_code`, short-circuiting with that
helper's error JSON when the caller lacks ``READ_TOOL_CODE``. It then
rejects subdirectory components and path traversal (resolving against
``TOOLS_ROOT`` and requiring the result to stay inside it), checks
existence and file-ness via ``filepath.exists`` / ``filepath.is_file``
offloaded with ``asyncio.to_thread``, and requires a ``.py`` extension. The
file is read asynchronously with ``aiofiles``; a ``start_line``/``end_line``
window or a ``max_lines`` head is applied, and the result is serialized
with ``jsonutil`` (imported as ``json``). Errors (permission, non-UTF-8,
anything else) are logged via the module ``logger`` and returned as JSON
rather than raised. This module-level ``run`` is discovered and dispatched
by the tool runtime via ``tool_loader.py`` (``getattr(module, "run")``); no
other in-repo callers.
Args:
filename (str): Bare filename (no path) of a ``.py`` file under
``tools/`` to read.
max_lines (int): When set and no line range is given, return only the
first this-many lines.
start_line (int): 1-indexed first line of a range to return (with
``end_line``).
end_line (int): 1-indexed last line of a range to return (with
``start_line``).
ctx (ToolContext | None): Tool context used for the privilege check.
Returns:
str: A JSON document with the file ``content``, ``total_lines`` and
``returned_lines`` on success, or a ``{"success": False, "error":
...}`` payload for an authorization failure, an invalid/missing/non-Python
file, an out-of-range line selection, or a read error.
"""
auth_err = await _check_read_tool_code(ctx)
if auth_err:
return auth_err
if not filename or not filename.strip():
return json.dumps({"success": False, "error": "Filename is required"})
filename = filename.strip()
if os.path.basename(filename) != filename:
return json.dumps(
{"success": False, "error": "Invalid filename: subdirectories not allowed"}
)
filepath = (TOOLS_ROOT / filename).resolve()
if not filepath.is_relative_to(TOOLS_ROOT):
return json.dumps(
{"success": False, "error": "Invalid filename: path traversal not allowed"}
)
if not await asyncio.to_thread(filepath.exists):
return json.dumps({"success": False, "error": f"File not found: {filename}"})
if not await asyncio.to_thread(filepath.is_file):
return json.dumps(
{"success": False, "error": f"Path is not a file: {filename}"}
)
if not filename.endswith(".py"):
return json.dumps(
{
"success": False,
"error": f"Only Python files (.py) are allowed: {filename}",
}
)
try:
async with aiofiles.open(str(filepath), "r", encoding="utf-8") as f:
content = await f.read()
lines = content.splitlines(keepends=True)
total_lines = len(lines)
if start_line is not None or end_line is not None:
start_idx = (start_line - 1) if start_line is not None else 0
end_idx = end_line if end_line is not None else total_lines
if start_idx < 0 or start_idx >= total_lines:
return json.dumps(
{
"success": False,
"error": f"Invalid start_line: {start_line}. File has {total_lines} lines.",
}
)
if end_idx < 1 or end_idx > total_lines:
return json.dumps(
{
"success": False,
"error": f"Invalid end_line: {end_line}. File has {total_lines} lines.",
}
)
if start_idx >= end_idx:
return json.dumps(
{
"success": False,
"error": f"start_line ({start_line}) must be less than end_line ({end_line})",
}
)
lines = lines[start_idx:end_idx]
elif max_lines is not None:
lines = lines[:max_lines]
return json.dumps(
{
"success": True,
"filename": filename,
"content": "".join(lines),
"total_lines": total_lines,
"returned_lines": len(lines),
}
)
except PermissionError:
return json.dumps(
{"success": False, "error": f"Permission denied reading file: {filename}"}
)
except UnicodeDecodeError:
return json.dumps(
{"success": False, "error": f"File encoding error (not UTF-8): {filename}"}
)
except Exception as e:
logger.error("Error reading tool file %s: %s", filepath, e)
return json.dumps(
{"success": False, "error": f"Unexpected error reading file: {e}"}
)