Source code for tools.read_tool_code

"""Read Python tool source code from the tools/ directory."""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING

import aiofiles

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

TOOL_NAME = "read_tool_code"
TOOL_DESCRIPTION = (
    "Read the source code of a Python tool file from the tools/ directory. "
    "Useful for examining existing tool implementations or debugging. "
    "Requires READ_TOOL_CODE privilege."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "filename": {
            "type": "string",
            "description": "Name of the tool file to read (e.g. 'shell_tool.py').",
        },
        "max_lines": {
            "type": "integer",
            "description": "Max number of lines to return (default: all).",
        },
        "start_line": {
            "type": "integer",
            "description": "Starting line number (1-indexed).",
        },
        "end_line": {
            "type": "integer",
            "description": "Ending line number (1-indexed).",
        },
    },
    "required": ["filename"],
}

TOOLS_ROOT = Path(__file__).resolve().parent


async def _check_read_tool_code(ctx: ToolContext | None) -> str | None:
    """Authorize reading tool source code, returning error JSON or None.

    Gatekeeper for the ``read_tool_code`` tool. Reading tool implementations
    can expose security-sensitive logic, so it is reserved for callers holding
    the ``READ_TOOL_CODE`` privilege; this is the choke point that enforces
    that before any file is opened.

    It reads ``redis``, ``config`` and ``user_id`` off the tool context and
    delegates to ``tools.alter_privileges.has_privilege``, which consults the
    Redis-backed privilege store. A denied attempt is logged at warning level
    via the module ``logger`` (tagged ``SECURITY``). A missing context, a
    failed check, or an unavailable privilege subsystem each yield a JSON error
    payload; a successful check returns ``None``. Called by :func:`run` before
    it touches the filesystem.

    Args:
        ctx: Tool execution context carrying ``redis``, ``config`` and
            ``user_id``; ``None`` is treated as an authorization failure.

    Returns:
        str | None: A JSON error string when the caller is unauthorized or the
        privilege system is unavailable, or ``None`` when reading is allowed.
    """
    if ctx is None:
        return json.dumps(
            {
                "success": False,
                "error": (
                    "Authorization failed: tool context is required to verify "
                    "privileges for read_tool_code."
                ),
            }
        )
    try:
        from tools.alter_privileges import has_privilege, PRIVILEGES

        redis = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""
        if not await has_privilege(
            redis,
            user_id,
            PRIVILEGES["READ_TOOL_CODE"],
            config,
        ):
            logger.warning(
                "SECURITY: User %s attempted read_tool_code without READ_TOOL_CODE",
                user_id,
            )
            return json.dumps(
                {
                    "success": False,
                    "error": (
                        "You do not have the READ_TOOL_CODE privilege. "
                        "Ask an admin to grant it with the alter_privileges tool."
                    ),
                }
            )
    except ImportError:
        return json.dumps(
            {
                "success": False,
                "error": "Privilege system unavailable.",
            }
        )
    return None


[docs] async def run( filename: str, max_lines: int = None, start_line: int = None, end_line: int = None, ctx: ToolContext | None = None, **_kwargs, ) -> str: """Read the source of a Python tool file from the ``tools/`` directory. Entry point for the ``read_tool_code`` tool, used to inspect or debug existing tool implementations. It returns the full file or a bounded slice, after a privilege check and several safety checks that keep reads confined to ``tools/`` and to ``.py`` files. It first calls :func:`_check_read_tool_code`, short-circuiting with that helper's error JSON when the caller lacks ``READ_TOOL_CODE``. It then rejects subdirectory components and path traversal (resolving against ``TOOLS_ROOT`` and requiring the result to stay inside it), checks existence and file-ness via ``filepath.exists`` / ``filepath.is_file`` offloaded with ``asyncio.to_thread``, and requires a ``.py`` extension. The file is read asynchronously with ``aiofiles``; a ``start_line``/``end_line`` window or a ``max_lines`` head is applied, and the result is serialized with ``jsonutil`` (imported as ``json``). Errors (permission, non-UTF-8, anything else) are logged via the module ``logger`` and returned as JSON rather than raised. This module-level ``run`` is discovered and dispatched by the tool runtime via ``tool_loader.py`` (``getattr(module, "run")``); no other in-repo callers. Args: filename (str): Bare filename (no path) of a ``.py`` file under ``tools/`` to read. max_lines (int): When set and no line range is given, return only the first this-many lines. start_line (int): 1-indexed first line of a range to return (with ``end_line``). end_line (int): 1-indexed last line of a range to return (with ``start_line``). ctx (ToolContext | None): Tool context used for the privilege check. Returns: str: A JSON document with the file ``content``, ``total_lines`` and ``returned_lines`` on success, or a ``{"success": False, "error": ...}`` payload for an authorization failure, an invalid/missing/non-Python file, an out-of-range line selection, or a read error. """ auth_err = await _check_read_tool_code(ctx) if auth_err: return auth_err if not filename or not filename.strip(): return json.dumps({"success": False, "error": "Filename is required"}) filename = filename.strip() if os.path.basename(filename) != filename: return json.dumps( {"success": False, "error": "Invalid filename: subdirectories not allowed"} ) filepath = (TOOLS_ROOT / filename).resolve() if not filepath.is_relative_to(TOOLS_ROOT): return json.dumps( {"success": False, "error": "Invalid filename: path traversal not allowed"} ) if not await asyncio.to_thread(filepath.exists): return json.dumps({"success": False, "error": f"File not found: {filename}"}) if not await asyncio.to_thread(filepath.is_file): return json.dumps( {"success": False, "error": f"Path is not a file: {filename}"} ) if not filename.endswith(".py"): return json.dumps( { "success": False, "error": f"Only Python files (.py) are allowed: {filename}", } ) try: async with aiofiles.open(str(filepath), "r", encoding="utf-8") as f: content = await f.read() lines = content.splitlines(keepends=True) total_lines = len(lines) if start_line is not None or end_line is not None: start_idx = (start_line - 1) if start_line is not None else 0 end_idx = end_line if end_line is not None else total_lines if start_idx < 0 or start_idx >= total_lines: return json.dumps( { "success": False, "error": f"Invalid start_line: {start_line}. File has {total_lines} lines.", } ) if end_idx < 1 or end_idx > total_lines: return json.dumps( { "success": False, "error": f"Invalid end_line: {end_line}. File has {total_lines} lines.", } ) if start_idx >= end_idx: return json.dumps( { "success": False, "error": f"start_line ({start_line}) must be less than end_line ({end_line})", } ) lines = lines[start_idx:end_idx] elif max_lines is not None: lines = lines[:max_lines] return json.dumps( { "success": True, "filename": filename, "content": "".join(lines), "total_lines": total_lines, "returned_lines": len(lines), } ) except PermissionError: return json.dumps( {"success": False, "error": f"Permission denied reading file: {filename}"} ) except UnicodeDecodeError: return json.dumps( {"success": False, "error": f"File encoding error (not UTF-8): {filename}"} ) except Exception as e: logger.error("Error reading tool file %s: %s", filepath, e) return json.dumps( {"success": False, "error": f"Unexpected error reading file: {e}"} )