Source code for tools.file_ops

"""File operations: create, read, edit, delete files on disk."""

from __future__ import annotations

import asyncio
from datetime import datetime, timezone
import jsonutil as json
import logging
from pathlib import Path
import shutil

from tools.alter_privileges import has_privilege, PRIVILEGES

logger = logging.getLogger(__name__)

ALLOWED_BASES: dict[str, Path] = {
    "large_files": Path("/home/star/large_files"),
    "git": Path("/home/star/git"),
}
DEFAULT_BASE = "large_files"

TOOL_NAME = "file_ops"
TOOL_DESCRIPTION = (
    "Manage files on the server. Actions: 'create', 'read', "
    "'edit', 'delete', 'list', 'rmtree'. Paths starting with 'git/' "
    "access /home/star/git; all other paths access "
    "/home/star/large_files. 'rmtree' recursively removes a directory."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "action": {
            "type": "string",
            "enum": ["create", "read", "edit", "delete", "list", "rmtree"],
            "description": "The file operation to perform.",
        },
        "file_path": {
            "type": "string",
            "description": (
                "Path relative to an allowed base directory. "
                "Prefix with 'git/' for /home/star/git, otherwise "
                "defaults to /home/star/large_files."
            ),
        },
        "contents": {
            "type": "string",
            "description": ("File contents (create only, optional)."),
        },
        "old_string": {
            "type": "string",
            "description": ("String to search for (edit only)."),
        },
        "new_string": {
            "type": "string",
            "description": ("Replacement string (edit only)."),
        },
        "replace_all": {
            "type": "boolean",
            "description": ("Replace all occurrences (edit only, " "default false)."),
        },
    },
    "required": ["action", "file_path"],
}


# ------------------------------------------------------------------
# Path validation
# ------------------------------------------------------------------


def _pick_base(file_path: str) -> tuple[Path, str]:
    """Choose the allowed base directory for a request path and split off the rest.

    Implements the sandboxing convention for this tool: paths prefixed with a key
    in ``ALLOWED_BASES`` (currently ``git/`` to ``/home/star/git``) are routed to
    that base, and everything else falls back to ``DEFAULT_BASE``
    (``/home/star/large_files``). Confining every operation under one of these
    roots is what prevents the file tool from touching arbitrary host paths.

    This is a pure, leading-slash-tolerant helper called by :func:`_resolve`
    (to anchor and traversal-check a path before any I/O) and by :func:`_list`
    (to compute a path relative to its base for display).

    Args:
        file_path: The user-supplied path, e.g. ``"git/foo.txt"`` or
            ``"notes/a.md"``.

    Returns:
        tuple[Path, str]: ``(base_path, relative_remainder)`` where
        ``base_path`` is the chosen allowed root and ``relative_remainder`` is
        the path within it (``"."`` when the prefix itself was given).
    """
    stripped = file_path.lstrip("/")
    for prefix, base in ALLOWED_BASES.items():
        if stripped == prefix or stripped.startswith(prefix + "/"):
            remainder = stripped[len(prefix) :].lstrip("/") or "."
            return base, remainder
    return ALLOWED_BASES[DEFAULT_BASE], stripped


def _resolve(file_path: str) -> tuple[Path | None, str | None]:
    """Resolve *file_path* safely within an allowed base directory.

    Returns ``(resolved, None)`` on success or
    ``(None, error_message)`` on failure.
    """
    try:
        base_dir, remainder = _pick_base(file_path)
        base = base_dir.resolve()
        base.mkdir(parents=True, exist_ok=True)

        combined = base / remainder
        resolved = combined.resolve()
        if resolved != base and not resolved.is_relative_to(base):
            return None, (
                f"Path traversal detected: '{file_path}' "
                f"escapes the base directory."
            )
        return resolved, None
    except Exception as exc:
        return None, f"Invalid path: {exc}"


# ------------------------------------------------------------------
# Entry point
# ------------------------------------------------------------------


[docs] async def run( action: str, file_path: str, contents: str | None = None, old_string: str | None = None, new_string: str | None = None, replace_all: bool = False, **_kwargs, ) -> str: """Execute this tool and return the result. Args: action (str): The action value. file_path (str): The file path value. contents (str | None): The contents value. old_string (str | None): The old string value. new_string (str | None): The new string value. replace_all (bool): The replace all value. Returns: str: Result string. """ user_id = getattr(_kwargs.get("ctx"), "user_id", "") or "" redis = getattr(_kwargs.get("ctx"), "redis", None) config = getattr(_kwargs.get("ctx"), "config", None) if not await has_privilege(redis, user_id, PRIVILEGES["FILE_OPS_ADMIN"], config): return json.dumps( { "status": "error", "error": "You do not have the FILE_OPS_ADMIN privilege.", } ) if action == "create": return await asyncio.to_thread(_create, file_path, contents) if action == "read": return await asyncio.to_thread(_read, file_path) if action == "edit": return await asyncio.to_thread( _edit, file_path, old_string, new_string, replace_all ) if action == "delete": return await asyncio.to_thread(_delete, file_path) if action == "list": return await asyncio.to_thread(_list, file_path) if action == "rmtree": return await asyncio.to_thread(_rmtree, file_path) return json.dumps( { "status": "error", "error": f"Unknown action: '{action}'", } )
# ------------------------------------------------------------------ # Action implementations # ------------------------------------------------------------------ def _create(file_path: str, contents: str | None) -> str: """Internal helper: create. Args: file_path (str): The file path value. contents (str | None): The contents value. Returns: str: Result string. """ path, err = _resolve(file_path) if err: return json.dumps({"status": "error", "error": err}) if path.exists() and path.is_file(): return json.dumps( { "status": "error", "error": ( f"File already exists: {file_path}. " f"Use edit action to modify." ), } ) try: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(contents or "", encoding="utf-8") size = path.stat().st_size logger.info("Created file: %s (%d bytes)", file_path, size) return json.dumps( { "status": "success", "message": "File created successfully.", "file_path": str(path), "size": size, } ) except PermissionError: return json.dumps( { "status": "error", "error": f"Permission denied: {file_path}", } ) except Exception as exc: return json.dumps( { "status": "error", "error": f"Failed to create file: {exc}", } ) def _read(file_path: str) -> str: """Internal helper: read. Args: file_path (str): The file path value. Returns: str: Result string. """ path, err = _resolve(file_path) if err: return json.dumps({"status": "error", "error": err}) if not path.exists(): return json.dumps( { "status": "error", "error": f"File does not exist: {file_path}", } ) if not path.is_file(): return json.dumps( { "status": "error", "error": f"Path is not a file: {file_path}", } ) try: text = path.read_text(encoding="utf-8") except UnicodeDecodeError: return json.dumps( { "status": "error", "error": (f"File is not a text file (binary): {file_path}"), } ) size = path.stat().st_size logger.info("Read file: %s (%d bytes)", file_path, size) return json.dumps( { "status": "success", "message": "File read successfully.", "file_path": str(path), "contents": text, "size": size, } ) def _edit( file_path: str, old_string: str | None, new_string: str | None, replace_all: bool, ) -> str: """Internal helper: edit. Args: file_path (str): The file path value. old_string (str | None): The old string value. new_string (str | None): The new string value. replace_all (bool): The replace all value. Returns: str: Result string. """ if old_string is None or new_string is None: return json.dumps( { "status": "error", "error": ("old_string and new_string are required for edit."), } ) path, err = _resolve(file_path) if err: return json.dumps({"status": "error", "error": err}) if not path.exists(): return json.dumps( { "status": "error", "error": ( f"File does not exist: {file_path}. " f"Use create action for new files." ), } ) if not path.is_file(): return json.dumps( { "status": "error", "error": f"Path is not a file: {file_path}", } ) try: text = path.read_text(encoding="utf-8") except UnicodeDecodeError: return json.dumps( { "status": "error", "error": f"File is not a text file: {file_path}", } ) count = text.count(old_string) if count == 0: return json.dumps( { "status": "error", "error": f"Search string not found in {file_path}.", } ) if replace_all: new_text = text.replace(old_string, new_string) replacements = count else: new_text = text.replace(old_string, new_string, 1) replacements = 1 try: path.write_text(new_text, encoding="utf-8") logger.info( "Edited file: %s (%d replacements)", file_path, replacements, ) return json.dumps( { "status": "success", "message": "File edited successfully.", "file_path": str(path), "replacements": replacements, } ) except PermissionError: return json.dumps( { "status": "error", "error": f"Permission denied: {file_path}", } ) except Exception as exc: return json.dumps( { "status": "error", "error": f"Failed to edit file: {exc}", } ) def _list(file_path: str) -> str: """Internal helper: list. Args: file_path (str): The file path value. Returns: str: Result string. """ path, err = _resolve(file_path) if err: return json.dumps({"status": "error", "error": err}) if not path.exists(): return json.dumps( { "status": "error", "error": f"Directory does not exist: {file_path}", } ) if not path.is_dir(): return json.dumps( { "status": "error", "error": f"Path is not a directory: {file_path}", } ) try: entries = [] for entry in path.iterdir(): stat = entry.stat() entries.append( { "name": entry.name, "type": "dir" if entry.is_dir() else "file", "size": stat.st_size if entry.is_file() else 0, "modified": datetime.fromtimestamp( stat.st_mtime, tz=timezone.utc ).isoformat(), } ) entries.sort(key=lambda e: (0 if e["type"] == "dir" else 1, e["name"].lower())) base_dir, _ = _pick_base(file_path) relative = str(path.relative_to(base_dir.resolve())) return json.dumps( { "status": "success", "directory": relative, "entry_count": len(entries), "entries": entries, } ) except PermissionError: return json.dumps( { "status": "error", "error": f"Permission denied: {file_path}", } ) except Exception as exc: return json.dumps( { "status": "error", "error": f"Failed to list directory: {exc}", } ) def _rmtree(file_path: str) -> str: """Recursively delete a directory tree under an allowed base. Backs the ``rmtree`` action of the ``file_ops`` tool, performing the destructive :func:`shutil.rmtree` removal on the filesystem. As a guard, it resolves the path via :func:`_resolve` (which traversal-checks it) and then refuses to remove any of the ``ALLOWED_BASES`` roots themselves, requiring a subdirectory; it also rejects non-existent paths and plain files (pointing the caller at the ``delete`` action instead). Successful removals are logged. This is synchronous, blocking I/O dispatched by :func:`run` through :func:`asyncio.to_thread` when ``action == "rmtree"``; there are no other callers. Args: file_path: Path (relative to an allowed base) of the directory to remove. Returns: str: A JSON status string — ``status="success"`` with the resolved path on success, or ``status="error"`` with a reason (invalid/escaping path, base-directory refusal, missing path, not-a-directory, permission denied, or any other failure). """ path, err = _resolve(file_path) if err: return json.dumps({"status": "error", "error": err}) resolved_bases = {b.resolve() for b in ALLOWED_BASES.values()} if path in resolved_bases: return json.dumps( { "status": "error", "error": ( "Refusing to remove a base directory. " "Specify a subdirectory instead." ), } ) if not path.exists(): return json.dumps( { "status": "error", "error": f"Path does not exist: {file_path}", } ) if not path.is_dir(): return json.dumps( { "status": "error", "error": ( f"Path is not a directory: {file_path}. " f"Use the delete action for single files." ), } ) try: shutil.rmtree(path) logger.info("Removed directory tree: %s", file_path) return json.dumps( { "status": "success", "message": "Directory removed successfully.", "file_path": str(path), } ) except PermissionError: return json.dumps( { "status": "error", "error": f"Permission denied: {file_path}", } ) except Exception as exc: return json.dumps( { "status": "error", "error": f"Failed to remove directory: {exc}", } ) def _delete(file_path: str) -> str: """Internal helper: delete. Args: file_path (str): The file path value. Returns: str: Result string. """ path, err = _resolve(file_path) if err: return json.dumps({"status": "error", "error": err}) if not path.exists(): return json.dumps( { "status": "error", "error": f"File does not exist: {file_path}", } ) if not path.is_file(): return json.dumps( { "status": "error", "error": f"Path is not a file: {file_path}", } ) try: path.unlink() logger.info("Deleted file: %s", file_path) return json.dumps( { "status": "success", "message": "File deleted successfully.", "file_path": str(path), } ) except PermissionError: return json.dumps( { "status": "error", "error": f"Permission denied: {file_path}", } ) except Exception as exc: return json.dumps( { "status": "error", "error": f"Failed to delete file: {exc}", } )