Source code for tools.file_ops

"""File operations: create, read, edit, delete files on disk."""

from __future__ import annotations

import aiofiles
import asyncio
from datetime import datetime, timezone
import json
import logging
import os
from pathlib import Path
import shutil

logger = logging.getLogger(__name__)

# Directories this tool is allowed to touch, keyed by the leading path
# component that selects them. _pick_base() matches a request path against
# these keys and _resolve() rejects anything that ends up outside the
# selected directory.
ALLOWED_BASES: dict[str, Path] = {
    "large_files": Path("/home/star/large_files"),
    "git": Path("/home/star/git"),
}
# Key into ALLOWED_BASES used when a path does not start with any known prefix.
DEFAULT_BASE = "large_files"

# Tool name and human-readable description.
# NOTE(review): presumably read by whatever registers this tool; the consumer
# is not visible in this file.
TOOL_NAME = "file_ops"
TOOL_DESCRIPTION = (
    "Manage files on the server. Actions: 'create', 'read', "
    "'edit', 'delete', 'list', 'rmtree'. Paths starting with 'git/' "
    "access /home/star/git; all other paths access "
    "/home/star/large_files. 'rmtree' recursively removes a directory."
)
# Argument schema for run(), laid out in JSON-Schema shape. Only "action" and
# "file_path" are required; the remaining properties are optional and apply
# to the action named in their description.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "action": {
            "type": "string",
            # Keep in sync with the actions dispatched by run().
            "enum": ["create", "read", "edit", "delete", "list", "rmtree"],
            "description": "The file operation to perform.",
        },
        "file_path": {
            "type": "string",
            "description": (
                "Path relative to an allowed base directory. "
                "Prefix with 'git/' for /home/star/git, otherwise "
                "defaults to /home/star/large_files."
            ),
        },
        # Used by 'create'; a missing value produces an empty file.
        "contents": {
            "type": "string",
            "description": (
                "File contents (create only, optional)."
            ),
        },
        # 'edit' requires both old_string and new_string.
        "old_string": {
            "type": "string",
            "description": (
                "String to search for (edit only)."
            ),
        },
        "new_string": {
            "type": "string",
            "description": (
                "Replacement string (edit only)."
            ),
        },
        # When false, 'edit' replaces only the first occurrence.
        "replace_all": {
            "type": "boolean",
            "description": (
                "Replace all occurrences (edit only, "
                "default false)."
            ),
        },
    },
    "required": ["action", "file_path"],
}


# ------------------------------------------------------------------
# Path validation
# ------------------------------------------------------------------

def _pick_base(file_path: str) -> tuple[Path, str]:
    """Split *file_path* into an allowed base directory and the path below it.

    Leading slashes are ignored. When the first path component equals a key
    of ``ALLOWED_BASES`` that component is consumed and the matching
    directory is returned together with the rest of the path (``"."`` when
    nothing is left). Any other path is returned unchanged, minus leading
    slashes, under the default base.
    """
    relative = file_path.lstrip("/")
    for name, root in ALLOWED_BASES.items():
        # Guard clause: skip bases whose name is not the leading component.
        if relative != name and not relative.startswith(name + "/"):
            continue
        tail = relative[len(name):].lstrip("/")
        return root, (tail if tail else ".")
    return ALLOWED_BASES[DEFAULT_BASE], relative


def _resolve(file_path: str) -> tuple[Path | None, str | None]:
    """Resolve *file_path* safely within an allowed base directory.

    The base directory is chosen by ``_pick_base`` and created if it does
    not exist yet. The candidate path is then fully resolved (``..``
    segments and symlinks included) and must still be the base directory
    itself or a descendant of it.

    Returns ``(resolved, None)`` on success or
    ``(None, error_message)`` on failure.
    """
    try:
        base_dir, remainder = _pick_base(file_path)
        base = base_dir.resolve()
        base.mkdir(parents=True, exist_ok=True)
        resolved = (base / remainder).resolve()
    except Exception as exc:
        return None, f"Invalid path: {exc}"

    # Compare path components, not string prefixes: a plain
    # ``str.startswith`` check would accept a sibling directory such as
    # ``/home/star/large_files_evil`` for the base ``/home/star/large_files``.
    try:
        resolved.relative_to(base)
    except ValueError:
        return None, (
            f"Path traversal detected: '{file_path}' "
            f"escapes the base directory."
        )
    return resolved, None


# ------------------------------------------------------------------
# Entry point
# ------------------------------------------------------------------

async def run(
    action: str,
    file_path: str,
    contents: str | None = None,
    old_string: str | None = None,
    new_string: str | None = None,
    replace_all: bool = False,
    **_kwargs,
) -> str:
    """Dispatch a file operation and return its JSON-encoded result.

    The selected handler does blocking filesystem work, so it is executed
    in a worker thread through ``asyncio.to_thread``.

    Args:
        action: One of ``create``, ``read``, ``edit``, ``delete``, ``list``
            or ``rmtree``.
        file_path: Path to operate on, relative to an allowed base directory.
        contents: Text for a new file (``create`` only).
        old_string: Text to search for (``edit`` only).
        new_string: Replacement text (``edit`` only).
        replace_all: Replace every occurrence instead of the first
            (``edit`` only).
        **_kwargs: Extra keyword arguments are accepted and ignored.

    Returns:
        The handler's JSON string, or a JSON error payload when *action* is
        not recognised.
    """
    # Pick the handler together with its positional arguments.
    if action == "create":
        job = (_create, file_path, contents)
    elif action == "read":
        job = (_read, file_path)
    elif action == "edit":
        job = (_edit, file_path, old_string, new_string, replace_all)
    elif action == "delete":
        job = (_delete, file_path)
    elif action == "list":
        job = (_list, file_path)
    elif action == "rmtree":
        job = (_rmtree, file_path)
    else:
        return json.dumps({
            "status": "error",
            "error": f"Unknown action: '{action}'",
        })

    handler, *handler_args = job
    return await asyncio.to_thread(handler, *handler_args)
# ------------------------------------------------------------------
# Action implementations
# ------------------------------------------------------------------

def _error(message: str) -> str:
    """Serialize *message* as the tool's JSON error payload."""
    return json.dumps({"status": "error", "error": message})


def _create(file_path: str, contents: str | None) -> str:
    """Create a new text file, refusing to overwrite an existing one.

    Missing parent directories are created. ``None`` contents produce an
    empty file.

    Args:
        file_path: Path relative to an allowed base directory.
        contents: Text to write (UTF-8), or ``None`` for an empty file.

    Returns:
        JSON string with the resolved path and size on success, or an
        error payload.
    """
    path, err = _resolve(file_path)
    if err:
        return _error(err)

    if path.exists() and path.is_file():
        return _error(
            f"File already exists: {file_path}. "
            f"Use edit action to modify."
        )

    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(contents or "", encoding="utf-8")
        size = path.stat().st_size
    except PermissionError:
        return _error(f"Permission denied: {file_path}")
    except Exception as exc:
        return _error(f"Failed to create file: {exc}")

    logger.info("Created file: %s (%d bytes)", file_path, size)
    return json.dumps({
        "status": "success",
        "message": "File created successfully.",
        "file_path": str(path),
        "size": size,
    })


def _read(file_path: str) -> str:
    """Read a UTF-8 text file and return its contents.

    Args:
        file_path: Path relative to an allowed base directory.

    Returns:
        JSON string with ``contents`` and ``size`` on success, or an error
        payload when the path is missing, is not a regular file, is not
        valid UTF-8, or cannot be read.
    """
    path, err = _resolve(file_path)
    if err:
        return _error(err)

    if not path.exists():
        return _error(f"File does not exist: {file_path}")
    if not path.is_file():
        return _error(f"Path is not a file: {file_path}")

    try:
        text = path.read_text(encoding="utf-8")
        size = path.stat().st_size
    except UnicodeDecodeError:
        return _error(f"File is not a text file (binary): {file_path}")
    except PermissionError:
        # Report I/O failures as JSON like the other actions do, instead of
        # letting them propagate out of run().
        return _error(f"Permission denied: {file_path}")
    except Exception as exc:
        return _error(f"Failed to read file: {exc}")

    logger.info("Read file: %s (%d bytes)", file_path, size)
    return json.dumps({
        "status": "success",
        "message": "File read successfully.",
        "file_path": str(path),
        "contents": text,
        "size": size,
    })


def _edit(
    file_path: str,
    old_string: str | None,
    new_string: str | None,
    replace_all: bool,
) -> str:
    """Replace occurrences of *old_string* with *new_string* in a text file.

    Args:
        file_path: Path relative to an allowed base directory.
        old_string: Non-empty text to search for.
        new_string: Replacement text (may be empty to delete the match).
        replace_all: Replace every occurrence when true, otherwise only the
            first one.

    Returns:
        JSON string with the number of replacements on success, or an error
        payload.
    """
    if old_string is None or new_string is None:
        return _error("old_string and new_string are required for edit.")
    if old_string == "":
        # An empty needle "matches" at every position: str.count reports
        # len(text) + 1 hits and str.replace would splice new_string in at
        # the start (or between every character with replace_all).
        return _error("old_string must not be empty.")

    path, err = _resolve(file_path)
    if err:
        return _error(err)

    if not path.exists():
        return _error(
            f"File does not exist: {file_path}. "
            f"Use create action for new files."
        )
    if not path.is_file():
        return _error(f"Path is not a file: {file_path}")

    try:
        text = path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        return _error(f"File is not a text file: {file_path}")
    except PermissionError:
        return _error(f"Permission denied: {file_path}")
    except Exception as exc:
        return _error(f"Failed to edit file: {exc}")

    count = text.count(old_string)
    if count == 0:
        return _error(f"Search string not found in {file_path}.")

    if replace_all:
        new_text = text.replace(old_string, new_string)
        replacements = count
    else:
        new_text = text.replace(old_string, new_string, 1)
        replacements = 1

    try:
        path.write_text(new_text, encoding="utf-8")
    except PermissionError:
        return _error(f"Permission denied: {file_path}")
    except Exception as exc:
        return _error(f"Failed to edit file: {exc}")

    logger.info(
        "Edited file: %s (%d replacements)",
        file_path, replacements,
    )
    return json.dumps({
        "status": "success",
        "message": "File edited successfully.",
        "file_path": str(path),
        "replacements": replacements,
    })


def _list(file_path: str) -> str:
    """List the entries of a directory.

    Directories are listed first, then files, each group sorted
    case-insensitively by name. Sizes are reported for regular files only;
    modification times are ISO-8601 strings in UTC.

    Args:
        file_path: Directory path relative to an allowed base directory.

    Returns:
        JSON string with ``directory`` (relative to its base),
        ``entry_count`` and ``entries`` on success, or an error payload.
    """
    path, err = _resolve(file_path)
    if err:
        return _error(err)

    if not path.exists():
        return _error(f"Directory does not exist: {file_path}")
    if not path.is_dir():
        return _error(f"Path is not a directory: {file_path}")

    try:
        entries = []
        for entry in path.iterdir():
            try:
                stat = entry.stat()
            except OSError:
                # stat() follows symlinks, so a dangling link would abort
                # the whole listing; describe the link itself instead.
                stat = entry.lstat()
            entries.append({
                "name": entry.name,
                "type": "dir" if entry.is_dir() else "file",
                "size": stat.st_size if entry.is_file() else 0,
                "modified": datetime.fromtimestamp(
                    stat.st_mtime, tz=timezone.utc
                ).isoformat(),
            })
        entries.sort(
            key=lambda e: (0 if e["type"] == "dir" else 1, e["name"].lower())
        )
        base_dir, _ = _pick_base(file_path)
        relative = str(path.relative_to(base_dir.resolve()))
    except PermissionError:
        return _error(f"Permission denied: {file_path}")
    except Exception as exc:
        return _error(f"Failed to list directory: {exc}")

    return json.dumps({
        "status": "success",
        "directory": relative,
        "entry_count": len(entries),
        "entries": entries,
    })


def _rmtree(file_path: str) -> str:
    """Recursively remove a directory. Refuses to delete a base directory.

    Args:
        file_path: Directory path relative to an allowed base directory.

    Returns:
        JSON string with the removed path on success, or an error payload.
    """
    path, err = _resolve(file_path)
    if err:
        return _error(err)

    resolved_bases = {b.resolve() for b in ALLOWED_BASES.values()}
    if path in resolved_bases:
        return _error(
            "Refusing to remove a base directory. "
            "Specify a subdirectory instead."
        )

    if not path.exists():
        return _error(f"Path does not exist: {file_path}")
    if not path.is_dir():
        return _error(
            f"Path is not a directory: {file_path}. "
            f"Use the delete action for single files."
        )

    try:
        shutil.rmtree(path)
    except PermissionError:
        return _error(f"Permission denied: {file_path}")
    except Exception as exc:
        return _error(f"Failed to remove directory: {exc}")

    logger.info("Removed directory tree: %s", file_path)
    return json.dumps({
        "status": "success",
        "message": "Directory removed successfully.",
        "file_path": str(path),
    })


def _delete(file_path: str) -> str:
    """Delete a single regular file.

    Args:
        file_path: Path relative to an allowed base directory.

    Returns:
        JSON string with the deleted path on success, or an error payload
        when the path is missing, is not a regular file, or cannot be
        removed.
    """
    path, err = _resolve(file_path)
    if err:
        return _error(err)

    if not path.exists():
        return _error(f"File does not exist: {file_path}")
    if not path.is_file():
        return _error(f"Path is not a file: {file_path}")

    try:
        path.unlink()
    except PermissionError:
        return _error(f"Permission denied: {file_path}")
    except Exception as exc:
        return _error(f"Failed to delete file: {exc}")

    logger.info("Deleted file: %s", file_path)
    return json.dumps({
        "status": "success",
        "message": "File deleted successfully.",
        "file_path": str(path),
    })