"""File operations: create, read, edit, delete files on disk."""
from __future__ import annotations
import aiofiles
import asyncio
from datetime import datetime, timezone
import json
import logging
import os
from pathlib import Path
import shutil
logger = logging.getLogger(__name__)
# Directories the tool is allowed to touch, keyed by the path prefix that
# selects them (see _pick_base).
ALLOWED_BASES: dict[str, Path] = dict(
    large_files=Path("/home/star/large_files"),
    git=Path("/home/star/git"),
)

# Key into ALLOWED_BASES used when a path carries no recognised prefix.
DEFAULT_BASE = "large_files"

# Tool identity and human-readable summary.
TOOL_NAME = "file_ops"
TOOL_DESCRIPTION = (
    "Manage files on the server. "
    "Actions: 'create', 'read', 'edit', 'delete', 'list', 'rmtree'. "
    "Paths starting with 'git/' access /home/star/git; "
    "all other paths access /home/star/large_files. "
    "'rmtree' recursively removes a directory."
)

# JSON-Schema style description of the arguments accepted by run().
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "action": {
            "type": "string",
            "enum": ["create", "read", "edit", "delete", "list", "rmtree"],
            "description": "The file operation to perform.",
        },
        "file_path": {
            "type": "string",
            "description": (
                "Path relative to an allowed base directory. "
                "Prefix with 'git/' for /home/star/git, "
                "otherwise defaults to /home/star/large_files."
            ),
        },
        "contents": {
            "type": "string",
            "description": "File contents (create only, optional).",
        },
        "old_string": {
            "type": "string",
            "description": "String to search for (edit only).",
        },
        "new_string": {
            "type": "string",
            "description": "Replacement string (edit only).",
        },
        "replace_all": {
            "type": "boolean",
            "description": "Replace all occurrences (edit only, default false).",
        },
    },
    "required": ["action", "file_path"],
}
# ------------------------------------------------------------------
# Path validation
# ------------------------------------------------------------------
def _pick_base(file_path: str) -> tuple[Path, str]:
    """Choose the base directory that *file_path* addresses.

    Leading slashes are dropped first. If what remains equals a key of
    ``ALLOWED_BASES`` or starts with ``"<key>/"``, that base is selected and
    the key is removed from the path; otherwise the default base is used and
    the path is kept whole.

    Args:
        file_path: Caller-supplied path.

    Returns:
        ``(base_path, relative_remainder)``. The remainder is ``"."`` when a
        prefix matched and nothing follows it.
    """
    relative = file_path.lstrip("/")
    for name, root in ALLOWED_BASES.items():
        if relative != name and not relative.startswith(f"{name}/"):
            continue
        tail = relative[len(name):].lstrip("/")
        return root, (tail if tail else ".")
    return ALLOWED_BASES[DEFAULT_BASE], relative
def _resolve(file_path: str) -> tuple[Path | None, str | None]:
    """Resolve *file_path* to an absolute path inside an allowed base directory.

    The base is chosen by ``_pick_base`` and created on disk if it is missing.
    The remainder is then joined to it and fully resolved (``..`` segments and
    symlinks included) before the containment check, so the check sees the
    real target.

    Args:
        file_path: Caller-supplied path, optionally prefixed with a base name.

    Returns:
        ``(resolved, None)`` on success or ``(None, error_message)`` on
        failure. Failures are reported through the return value, not raised.
    """
    try:
        base_dir, remainder = _pick_base(file_path)
        base = base_dir.resolve()
        base.mkdir(parents=True, exist_ok=True)
        resolved = (base / remainder).resolve()
        # Compare whole path components, not string prefixes: a plain
        # ``startswith`` would accept a sibling such as ``/home/star/git_old``
        # for the base ``/home/star/git`` (reachable via ``git/../git_old``).
        if not resolved.is_relative_to(base):
            return None, (
                f"Path traversal detected: '{file_path}' "
                "escapes the base directory."
            )
        return resolved, None
    except Exception as exc:
        return None, f"Invalid path: {exc}"
# ------------------------------------------------------------------
# Entry point
# ------------------------------------------------------------------
async def run(
    action: str,
    file_path: str,
    contents: str | None = None,
    old_string: str | None = None,
    new_string: str | None = None,
    replace_all: bool = False,
    **_kwargs,
) -> str:
    """Dispatch a file operation to its blocking helper on a worker thread.

    Args:
        action: One of ``create``, ``read``, ``edit``, ``delete``, ``list``
            or ``rmtree``.
        file_path: Path the operation targets; passed to every helper.
        contents: Initial file contents (``create`` only).
        old_string: Text to search for (``edit`` only).
        new_string: Replacement text (``edit`` only).
        replace_all: Replace every occurrence instead of the first
            (``edit`` only).
        **_kwargs: Extra keyword arguments are accepted and ignored.

    Returns:
        The JSON string produced by the selected helper, or a JSON error
        object when *action* is not recognised.
    """
    # Ordered (name, thunk) pairs. The thunks look up their helper only when
    # invoked, so just the selected action's helper is ever touched.
    dispatch = (
        ("create", lambda: _create(file_path, contents)),
        ("read", lambda: _read(file_path)),
        ("edit", lambda: _edit(file_path, old_string, new_string, replace_all)),
        ("delete", lambda: _delete(file_path)),
        ("list", lambda: _list(file_path)),
        ("rmtree", lambda: _rmtree(file_path)),
    )
    for name, thunk in dispatch:
        if action == name:
            return await asyncio.to_thread(thunk)

    failure = {"status": "error", "error": f"Unknown action: '{action}'"}
    return json.dumps(failure)
# ------------------------------------------------------------------
# Action implementations
# ------------------------------------------------------------------
def _create(file_path: str, contents: str | None) -> str:
    """Create a new UTF-8 text file, refusing to overwrite an existing file.

    Missing parent directories are created first. A ``None`` or empty
    *contents* produces an empty file.

    Args:
        file_path: Target path, resolved through ``_resolve``.
        contents: Text to write, or ``None`` for an empty file.

    Returns:
        JSON string: a success object with the resolved ``file_path`` and
        ``size`` in bytes, or an error object with a message.
    """
    target, problem = _resolve(file_path)
    if problem:
        return json.dumps({"status": "error", "error": problem})

    if target.exists() and target.is_file():
        already_there = (
            f"File already exists: {file_path}. "
            "Use edit action to modify."
        )
        return json.dumps({"status": "error", "error": already_there})

    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(contents or "", encoding="utf-8")
        written = target.stat().st_size
        logger.info("Created file: %s (%d bytes)", file_path, written)
        outcome = {
            "status": "success",
            "message": "File created successfully.",
            "file_path": str(target),
            "size": written,
        }
        return json.dumps(outcome)
    except Exception as exc:
        # Permission problems get their own message; everything else is
        # reported with the exception text.
        if isinstance(exc, PermissionError):
            reason = f"Permission denied: {file_path}"
        else:
            reason = f"Failed to create file: {exc}"
        return json.dumps({"status": "error", "error": reason})
def _read(file_path: str) -> str:
    """Read a UTF-8 text file and return its contents.

    Args:
        file_path: Path of the file to read, resolved through ``_resolve``.

    Returns:
        JSON string: a success object with the resolved ``file_path``, the
        ``contents`` and the ``size`` in bytes, or an error object with a
        message. I/O failures are reported as error objects, not raised.
    """
    path, err = _resolve(file_path)
    if err:
        return json.dumps({"status": "error", "error": err})
    if not path.exists():
        return json.dumps({
            "status": "error",
            "error": f"File does not exist: {file_path}",
        })
    if not path.is_file():
        return json.dumps({
            "status": "error",
            "error": f"Path is not a file: {file_path}",
        })
    try:
        text = path.read_text(encoding="utf-8")
        size = path.stat().st_size
    except UnicodeDecodeError:
        return json.dumps({
            "status": "error",
            "error": (
                f"File is not a text file (binary): {file_path}"
            ),
        })
    except PermissionError:
        # Report like the other actions do instead of letting it propagate.
        return json.dumps({
            "status": "error",
            "error": f"Permission denied: {file_path}",
        })
    except Exception as exc:
        # E.g. the file disappeared between the checks above and the read.
        return json.dumps({
            "status": "error",
            "error": f"Failed to read file: {exc}",
        })
    logger.info("Read file: %s (%d bytes)", file_path, size)
    return json.dumps({
        "status": "success",
        "message": "File read successfully.",
        "file_path": str(path),
        "contents": text,
        "size": size,
    })
def _edit(
    file_path: str,
    old_string: str | None,
    new_string: str | None,
    replace_all: bool,
) -> str:
    """Replace occurrences of *old_string* with *new_string* in a text file.

    Args:
        file_path: Path of the file to edit, resolved through ``_resolve``.
        old_string: Text to search for. Required. May be empty only when the
            file itself is empty.
        new_string: Replacement text. Required (may be empty).
        replace_all: Replace every occurrence when true, otherwise only the
            first one.

    Returns:
        JSON string: a success object with the resolved ``file_path`` and the
        number of ``replacements``, or an error object with a message. I/O
        failures are reported as error objects, not raised.
    """
    if old_string is None or new_string is None:
        return json.dumps({
            "status": "error",
            "error": (
                "old_string and new_string are required for edit."
            ),
        })
    path, err = _resolve(file_path)
    if err:
        return json.dumps({"status": "error", "error": err})
    if not path.exists():
        return json.dumps({
            "status": "error",
            "error": (
                f"File does not exist: {file_path}. "
                "Use create action for new files."
            ),
        })
    if not path.is_file():
        return json.dumps({
            "status": "error",
            "error": f"Path is not a file: {file_path}",
        })
    try:
        text = path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        return json.dumps({
            "status": "error",
            "error": f"File is not a text file: {file_path}",
        })
    except PermissionError:
        # Same reporting as the write path below.
        return json.dumps({
            "status": "error",
            "error": f"Permission denied: {file_path}",
        })
    except Exception as exc:
        return json.dumps({
            "status": "error",
            "error": f"Failed to edit file: {exc}",
        })
    # ``str.replace("", x)`` inserts x before every character, which would
    # silently mangle the file. An empty search string is only meaningful
    # for filling an empty file.
    if not old_string and text:
        return json.dumps({
            "status": "error",
            "error": (
                "old_string must not be empty unless the file is empty."
            ),
        })
    count = text.count(old_string)
    if count == 0:
        return json.dumps({
            "status": "error",
            "error": f"Search string not found in {file_path}.",
        })
    if replace_all:
        new_text = text.replace(old_string, new_string)
        replacements = count
    else:
        new_text = text.replace(old_string, new_string, 1)
        replacements = 1
    try:
        path.write_text(new_text, encoding="utf-8")
        logger.info(
            "Edited file: %s (%d replacements)", file_path, replacements,
        )
        return json.dumps({
            "status": "success",
            "message": "File edited successfully.",
            "file_path": str(path),
            "replacements": replacements,
        })
    except PermissionError:
        return json.dumps({
            "status": "error",
            "error": f"Permission denied: {file_path}",
        })
    except Exception as exc:
        return json.dumps({
            "status": "error",
            "error": f"Failed to edit file: {exc}",
        })
def _list(file_path: str) -> str:
    """List the immediate entries of a directory.

    Args:
        file_path: Directory path, resolved through ``_resolve``.

    Returns:
        JSON string. On success it carries ``directory`` (path relative to
        the base), ``entry_count`` and ``entries``. Each entry has ``name``,
        ``type`` (``"dir"`` or ``"file"``), ``size`` (0 for anything that is
        not a regular file) and ``modified`` (ISO-8601, UTC). Entries are
        sorted directories first, then case-insensitively by name. On
        failure it carries an error object with a message.
    """
    path, err = _resolve(file_path)
    if err:
        return json.dumps({"status": "error", "error": err})
    if not path.exists():
        return json.dumps({
            "status": "error",
            "error": f"Directory does not exist: {file_path}",
        })
    if not path.is_dir():
        return json.dumps({
            "status": "error",
            "error": f"Path is not a directory: {file_path}",
        })
    try:
        entries = []
        for entry in path.iterdir():
            try:
                info = entry.stat()
            except OSError:
                # ``stat`` follows symlinks, so a dangling link would abort
                # the whole listing. Describe the link itself instead.
                info = entry.lstat()
            entries.append({
                "name": entry.name,
                "type": "dir" if entry.is_dir() else "file",
                "size": info.st_size if entry.is_file() else 0,
                "modified": datetime.fromtimestamp(
                    info.st_mtime, tz=timezone.utc
                ).isoformat(),
            })
        entries.sort(
            key=lambda e: (0 if e["type"] == "dir" else 1, e["name"].lower())
        )
        base_dir, _ = _pick_base(file_path)
        relative = str(path.relative_to(base_dir.resolve()))
        return json.dumps({
            "status": "success",
            "directory": relative,
            "entry_count": len(entries),
            "entries": entries,
        })
    except PermissionError:
        return json.dumps({
            "status": "error",
            "error": f"Permission denied: {file_path}",
        })
    except Exception as exc:
        return json.dumps({
            "status": "error",
            "error": f"Failed to list directory: {exc}",
        })
def _rmtree(file_path: str) -> str:
    """Recursively remove a directory, never one of the base directories.

    Args:
        file_path: Directory path, resolved through ``_resolve``.

    Returns:
        JSON string: a success object with the resolved ``file_path``, or an
        error object with a message.
    """
    target, problem = _resolve(file_path)
    if problem:
        return json.dumps({"status": "error", "error": problem})

    # The configured roots themselves are off limits.
    protected = {root.resolve() for root in ALLOWED_BASES.values()}
    if target in protected:
        refusal = (
            "Refusing to remove a base directory. "
            "Specify a subdirectory instead."
        )
        return json.dumps({"status": "error", "error": refusal})

    if not target.exists():
        missing = f"Path does not exist: {file_path}"
        return json.dumps({"status": "error", "error": missing})

    if not target.is_dir():
        not_a_dir = (
            f"Path is not a directory: {file_path}. "
            "Use the delete action for single files."
        )
        return json.dumps({"status": "error", "error": not_a_dir})

    try:
        shutil.rmtree(target)
        logger.info("Removed directory tree: %s", file_path)
        outcome = {
            "status": "success",
            "message": "Directory removed successfully.",
            "file_path": str(target),
        }
        return json.dumps(outcome)
    except Exception as exc:
        # Permission problems get their own message; everything else is
        # reported with the exception text.
        if isinstance(exc, PermissionError):
            reason = f"Permission denied: {file_path}"
        else:
            reason = f"Failed to remove directory: {exc}"
        return json.dumps({"status": "error", "error": reason})
def _delete(file_path: str) -> str:
    """Delete a single regular file.

    Args:
        file_path: Path of the file to delete, resolved through ``_resolve``.

    Returns:
        JSON string: a success object with the resolved ``file_path``, or an
        error object with a message. Directories are rejected.
    """

    def failure(message: str) -> str:
        # Every error path shares this payload shape.
        return json.dumps({"status": "error", "error": message})

    target, problem = _resolve(file_path)
    if problem:
        return failure(problem)
    if not target.exists():
        return failure(f"File does not exist: {file_path}")
    if not target.is_file():
        return failure(f"Path is not a file: {file_path}")

    try:
        target.unlink()
        logger.info("Deleted file: %s", file_path)
        outcome = {
            "status": "success",
            "message": "File deleted successfully.",
            "file_path": str(target),
        }
        return json.dumps(outcome)
    except PermissionError:
        return failure(f"Permission denied: {file_path}")
    except Exception as exc:
        return failure(f"Failed to delete file: {exc}")