# Source code for tools.headless_browser

"""Headless Firefox browser automation via Playwright.

Provides navigation, interaction, content extraction, waiting,
screenshots, JavaScript execution, cookie management, network
interception, session persistence, download handling, and console/dialog
tools -- all exposed through the v3 multi-tool format.
"""

import asyncio
import fnmatch
import json
import logging
import os
import re
import shutil
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

try:
    from playwright.async_api import (
        async_playwright,
        Browser,
        BrowserContext,
        Page,
        Playwright,
        TimeoutError as PlaywrightTimeout,
        Error as PlaywrightError,
    )
    PLAYWRIGHT_AVAILABLE = True
except ImportError as e:
    logging.error(f"Playwright not installed: {e}")
    PLAYWRIGHT_AVAILABLE = False
    async_playwright = None
    Browser = None
    BrowserContext = None
    Page = None
    Playwright = None
    PlaywrightTimeout = Exception
    PlaywrightError = Exception

try:
    import html2text
    HTML2TEXT_AVAILABLE = True
except ImportError:
    HTML2TEXT_AVAILABLE = False
    html2text = None

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Root directory for browser output; overridable through BROWSER_OUTPUT_DIR.
# NOTE: the directory is created as a side effect of importing this module.
OUTPUT_DIR = Path(os.environ.get("BROWSER_OUTPUT_DIR", "/home/star/large_files/browser_outputs"))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Parent directory of the per-session user-data directories used by
# persistent contexts; overridable through BROWSER_SESSIONS_DIR.
# Also created at import time.
SESSIONS_DIR = Path(os.environ.get("BROWSER_SESSIONS_DIR", "/home/star/large_files/browser_sessions"))
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)

# Where the page "download" handler saves files (created at import time).
DOWNLOADS_DIR = OUTPUT_DIR / "downloads"
DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)

# Default Playwright timeout handed to page operations, in milliseconds.
DEFAULT_TIMEOUT = 30000
# Maximum number of characters _truncate_content keeps by default.
MAX_CONTENT_LENGTH = 1000000
# Maximum console entries kept per context; the oldest entry is dropped first.
MAX_CONSOLE_LOGS = 1000
# Maximum captured responses kept in total; the oldest entry is dropped first.
MAX_INTERCEPTED_RESPONSES = 100
# Response bodies larger than this many bytes are replaced by a placeholder.
MAX_RESPONSE_BODY_SIZE = 10 * 1024 * 1024
# JSON bodies of at least this many bytes are parsed in a worker thread.
_JSON_PARSE_OFFLOAD_THRESHOLD = 64 * 1024


async def _json_loads_network_body(body_bytes: bytes) -> Any:
    """Decode a captured HTTP body as UTF-8 and parse it as JSON.

    Bodies of at least ``_JSON_PARSE_OFFLOAD_THRESHOLD`` bytes are parsed in a
    worker thread through ``asyncio.to_thread``; smaller ones are parsed
    inline.

    Args:
        body_bytes (bytes): Raw response body.

    Returns:
        Any: The parsed JSON value.
    """
    decoded = body_bytes.decode("utf-8")
    parse_inline = len(body_bytes) < _JSON_PARSE_OFFLOAD_THRESHOLD
    if parse_inline:
        return json.loads(decoded)
    return await asyncio.to_thread(json.loads, decoded)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _json_response(status: str, message: str = "", data: Optional[Dict] = None, error: Optional[str] = None) -> str:
    """Internal helper: json response.

        Args:
            status (str): The status value.
            message (str): The message value.
            data (Optional[Dict]): Input data payload.
            error (Optional[str]): The error value.

        Returns:
            str: Result string.
        """
    response = {"status": status}
    if message:
        response["message"] = message
    if data:
        response["data"] = data
    if error:
        response["error"] = error
    return json.dumps(response, indent=2, default=str)


def _validate_url(url: str) -> tuple:
    """Internal helper: validate url.

        Args:
            url (str): URL string.

        Returns:
            tuple: The result.
        """
    try:
        parsed = urlparse(url)
        if parsed.scheme not in ["http", "https"]:
            return False, f"Invalid URL scheme: {parsed.scheme}. Only http and https are allowed."
        if not parsed.netloc:
            return False, "Invalid URL: missing domain"
        blocked_hosts = ["localhost", "127.0.0.1", "0.0.0.0", "::1"]
        if parsed.hostname and parsed.hostname.lower() in blocked_hosts:
            return False, f"Blocked URL: {parsed.hostname} is not allowed"
        return True, ""
    except Exception as e:
        return False, f"URL validation error: {str(e)}"


def _truncate_content(content: str, max_length: int = MAX_CONTENT_LENGTH) -> str:
    """Cut ``content`` down to ``max_length`` characters and flag the cut.

    Content that already fits is returned unchanged. Otherwise the first
    ``max_length`` characters are kept and a truncation notice is appended,
    so the result is longer than ``max_length`` by the length of the notice.

    Args:
        content (str): Text to limit.
        max_length (int): Number of characters to keep.

    Returns:
        str: The original or the shortened, annotated text.
    """
    fits = len(content) <= max_length
    if fits:
        return content
    kept = content[:max_length]
    return "".join((kept, "\n\n[Content truncated - exceeded maximum length]"))


# ---------------------------------------------------------------------------
# Browser manager
# ---------------------------------------------------------------------------

[docs] class HeadlessBrowserManager: """HeadlessBrowserManager. Attributes: default_context_id: The default context id. is_initialized: The is initialized. last_used: The last used. usage_count: The usage count. error_count: The error count. max_errors_before_restart: The max errors before restart. idle_timeout: The idle timeout. _lock: The lock. default_viewport: The default viewport. default_user_agent: The default user agent. """
[docs] def __init__(self): """Initialize the instance. """ self.playwright: Optional[Any] = None self.browser: Optional[Any] = None self.contexts: Dict[str, Any] = {} self.pages: Dict[str, Any] = {} self.default_context_id = "default" self.is_initialized = False self.last_used = None self.usage_count = 0 self.error_count = 0 self.max_errors_before_restart = 5 self.idle_timeout = 300 self._lock = asyncio.Lock() self.default_viewport = {"width": 1920, "height": 1080} self.default_user_agent = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) " "Gecko/20100101 Firefox/120.0" ) self.intercepted_responses: Dict[str, Dict] = {} self.response_patterns: Dict[str, List[str]] = {} self.console_logs: Dict[str, List[Dict]] = {} self.downloads: Dict[str, Dict] = {} self.pending_downloads: Dict[str, asyncio.Future] = {} self.persistent_contexts: Dict[str, str] = {} self.dialog_handlers: Dict[str, str] = {} self.pending_dialogs: Dict[str, Dict] = {}
[docs] async def initialize(self) -> bool: """Initialize. Returns: bool: True on success, False otherwise. """ if not PLAYWRIGHT_AVAILABLE: logger.error("Playwright is not installed.") return False async with self._lock: if self.is_initialized and self.browser: return True logger.info("Initializing headless Firefox browser...") try: self.playwright = await async_playwright().start() self.browser = await self.playwright.firefox.launch( headless=True, firefox_user_prefs={ "dom.webnotifications.enabled": False, "media.autoplay.default": 5, "permissions.default.desktop-notification": 2, }, ) await self._create_context(self.default_context_id) self.is_initialized = True self.last_used = time.time() self.error_count = 0 logger.info("Headless Firefox browser initialized successfully") return True except Exception as e: logger.error(f"Failed to initialize browser: {e}") await self.cleanup() return False
async def _create_context(self, context_id: str, **options): """Internal helper: create context. Args: context_id (str): The context id value. **options: Additional keyword arguments. """ if not self.browser: raise RuntimeError("Browser not initialized") context_options = { "viewport": options.get("viewport", self.default_viewport), "user_agent": options.get("user_agent", self.default_user_agent), "accept_downloads": True, "ignore_https_errors": options.get("ignore_https_errors", False), } if "geolocation" in options: context_options["geolocation"] = options["geolocation"] context_options["permissions"] = ["geolocation"] if "device" in options: device = self.playwright.devices.get(options["device"]) if device: context_options.update(device) context = await self.browser.new_context(**context_options) self.contexts[context_id] = context self.console_logs[context_id] = [] self.response_patterns[context_id] = [] self.dialog_handlers[context_id] = "accept" page = await context.new_page() self.pages[context_id] = page await self._setup_page_handlers(page, context_id) return context async def _create_persistent_context(self, session_name: str, context_id: str, **options): """Internal helper: create persistent context. Args: session_name (str): The session name value. context_id (str): The context id value. **options: Additional keyword arguments. 
""" if not self.playwright: raise RuntimeError("Playwright not initialized") safe_session_name = re.sub(r"[^\w\-]", "_", session_name) user_data_dir = SESSIONS_DIR / safe_session_name user_data_dir.mkdir(parents=True, exist_ok=True) context_options = { "viewport": options.get("viewport", self.default_viewport), "user_agent": options.get("user_agent", self.default_user_agent), "accept_downloads": True, "ignore_https_errors": options.get("ignore_https_errors", False), } if "geolocation" in options: context_options["geolocation"] = options["geolocation"] context_options["permissions"] = ["geolocation"] context = await self.playwright.firefox.launch_persistent_context( str(user_data_dir), headless=True, **context_options ) self.contexts[context_id] = context self.persistent_contexts[context_id] = safe_session_name self.console_logs[context_id] = [] self.response_patterns[context_id] = [] self.dialog_handlers[context_id] = "accept" pages = context.pages page = pages[0] if pages else await context.new_page() self.pages[context_id] = page await self._setup_page_handlers(page, context_id) return context async def _setup_page_handlers(self, page, context_id: str): """Internal helper: setup page handlers. Args: page: The page value. context_id (str): The context id value. """ async def on_response(response): """On response. Args: response: The response value. 
""" try: patterns = self.response_patterns.get(context_id, []) if not patterns: return url = response.url if not any(fnmatch.fnmatch(url, p) for p in patterns): return if len(self.intercepted_responses) >= MAX_INTERCEPTED_RESPONSES: oldest_id = next(iter(self.intercepted_responses)) del self.intercepted_responses[oldest_id] response_id = f"resp_{uuid.uuid4().hex[:12]}" body = None content_type = response.headers.get("content-type", "") try: body_bytes = await response.body() if len(body_bytes) <= MAX_RESPONSE_BODY_SIZE: if "application/json" in content_type: body = await _json_loads_network_body(body_bytes) elif "text/" in content_type: body = body_bytes.decode("utf-8") else: body = f"[Binary data: {len(body_bytes)} bytes]" else: body = f"[Response too large: {len(body_bytes)} bytes]" except Exception as e: body = f"[Could not read body: {str(e)}]" self.intercepted_responses[response_id] = { "response_id": response_id, "context_id": context_id, "url": url, "status": response.status, "status_text": response.status_text, "headers": dict(response.headers), "body": body, "method": response.request.method, "timestamp": datetime.now().isoformat(), } except Exception as e: logger.warning(f"Error in response handler: {e}") page.on("response", on_response) def on_console(msg): """On console. Args: msg: Incoming message object. """ try: logs = self.console_logs.get(context_id, []) if len(logs) >= MAX_CONSOLE_LOGS: logs.pop(0) log_entry = {"type": msg.type, "text": msg.text, "timestamp": datetime.now().isoformat()} try: location = msg.location if location: log_entry["url"] = location.get("url", "") log_entry["line"] = location.get("lineNumber", 0) except Exception: pass logs.append(log_entry) self.console_logs[context_id] = logs except Exception as e: logger.warning(f"Error in console handler: {e}") page.on("console", on_console) async def on_download(download): """On download. Args: download: The download value. 
""" try: download_id = f"dl_{uuid.uuid4().hex[:12]}" suggested_filename = download.suggested_filename safe_filename = re.sub(r"[^\w\-\.]", "_", suggested_filename) save_path = DOWNLOADS_DIR / f"{download_id}_{safe_filename}" download_info = { "download_id": download_id, "context_id": context_id, "url": download.url, "suggested_filename": suggested_filename, "path": str(save_path), "state": "pending", "timestamp": datetime.now().isoformat(), } self.downloads[download_id] = download_info try: await download.save_as(str(save_path)) download_info["state"] = "completed" download_info["size"] = save_path.stat().st_size if save_path.exists() else 0 except Exception as e: download_info["state"] = "failed" download_info["error"] = str(e) if context_id in self.pending_downloads: future = self.pending_downloads.pop(context_id) if not future.done(): future.set_result(download_info) except Exception as e: logger.warning(f"Error in download handler: {e}") page.on("download", on_download) async def on_dialog(dialog): """On dialog. Args: dialog: The dialog value. """ try: dialog_info = { "type": dialog.type, "message": dialog.message, "default_value": dialog.default_value, "timestamp": datetime.now().isoformat(), } self.pending_dialogs[context_id] = dialog_info action = self.dialog_handlers.get(context_id, "accept") if action == "dismiss": await dialog.dismiss() dialog_info["handled"] = "dismissed" else: await dialog.accept() dialog_info["handled"] = "accepted" if action == "accept" else "auto-accepted (manual mode)" except Exception as e: logger.warning(f"Error in dialog handler: {e}") page.on("dialog", on_dialog)
[docs] async def ensure_ready(self) -> bool: """Ensure ready. Returns: bool: True on success, False otherwise. """ current_time = time.time() if self.error_count >= self.max_errors_before_restart: await self.restart() return self.is_initialized if not self.is_initialized: return await self.initialize() try: if self.browser and self.browser.is_connected(): self.last_used = current_time return True await self.restart() return self.is_initialized except Exception: self.error_count += 1 await self.restart() return self.is_initialized
[docs] async def restart(self) -> bool: """Restart. Returns: bool: True on success, False otherwise. """ await self.cleanup() return await self.initialize()
[docs] async def cleanup(self): """Cleanup. """ async with self._lock: try: for page in self.pages.values(): try: await page.close() except Exception: pass self.pages.clear() for context in self.contexts.values(): try: await context.close() except Exception: pass self.contexts.clear() if self.browser: try: await self.browser.close() except Exception: pass self.browser = None if self.playwright: try: await self.playwright.stop() except Exception: pass self.playwright = None self.is_initialized = False except Exception as e: logger.error(f"Error during browser cleanup: {e}")
[docs] async def get_page(self, context_id: str = None): """Retrieve the page. Args: context_id (str): The context id value. """ context_id = context_id or self.default_context_id if not await self.ensure_ready(): raise RuntimeError("Browser not available") if context_id not in self.pages: if context_id not in self.contexts: await self._create_context(context_id) else: page = await self.contexts[context_id].new_page() self.pages[context_id] = page self.usage_count += 1 self.last_used = time.time() return self.pages[context_id]
[docs] async def get_context(self, context_id: str = None): """Retrieve the context. Args: context_id (str): The context id value. """ context_id = context_id or self.default_context_id if not await self.ensure_ready(): raise RuntimeError("Browser not available") if context_id not in self.contexts: await self._create_context(context_id) return self.contexts[context_id]
[docs] async def create_new_context(self, context_id: str = None, **options) -> str: """Create a new new context. Args: context_id (str): The context id value. **options: Additional keyword arguments. Returns: str: Result string. """ if not await self.ensure_ready(): raise RuntimeError("Browser not available") context_id = context_id or f"context_{uuid.uuid4().hex[:8]}" await self._create_context(context_id, **options) return context_id
[docs] async def close_context(self, context_id: str): """Close context. Args: context_id (str): The context id value. """ if context_id in self.pages: try: await self.pages[context_id].close() except Exception: pass del self.pages[context_id] if context_id in self.contexts: try: await self.contexts[context_id].close() except Exception: pass del self.contexts[context_id]
[docs] def record_error(self): """Record error. """ self.error_count += 1
# Module-wide manager instance used by the tool handler functions below.
_bm = HeadlessBrowserManager()

# ===================================================================
# Tool handler functions
# ===================================================================

# Navigation --------------------------------------------------------
async def browser_navigate(url: str, wait_until: str = "load", timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Navigate a context's page to ``url``.

    Args:
        url (str): Target URL; must pass ``_validate_url``.
        wait_until (str): One of "load", "domcontentloaded", "networkidle"
            or "commit".
        timeout (int): Navigation timeout in milliseconds.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with the final URL, title, HTTP status and ``ok``
        flag on success, or an error description. Every failure raised
        during navigation is recorded on the browser manager.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    url_ok, url_problem = _validate_url(url)
    if not url_ok:
        return _json_response("error", error=url_problem)

    valid_wait_states = ["load", "domcontentloaded", "networkidle", "commit"]
    if wait_until not in valid_wait_states:
        return _json_response("error", error=f"Invalid wait_until value. Must be one of: {valid_wait_states}")

    try:
        page = await _bm.get_page(context_id)
        response = await page.goto(url, wait_until=wait_until, timeout=timeout)
        details = {"url": page.url, "title": await page.title()}
        if response:
            details["status"] = response.status
            details["ok"] = response.ok
        else:
            details["status"] = None
            details["ok"] = None
        return _json_response("success", message="Navigation complete", data=details)
    except PlaywrightTimeout:
        failure = f"Navigation timed out after {timeout}ms"
    except PlaywrightError as exc:
        failure = f"Navigation error: {str(exc)}"
    except Exception as exc:
        failure = f"Unexpected error: {str(exc)}"
    # All three failure kinds count toward the manager's restart threshold.
    _bm.record_error()
    return _json_response("error", error=failure)
async def browser_go_back(timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Go one step back in the page's history.

    Args:
        timeout (int): Timeout in milliseconds, passed to Playwright.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with the resulting URL, title and HTTP status
        (None when Playwright returns no response), or an error description.
        Timeouts are not recorded as manager errors; other failures are.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    try:
        page = await _bm.get_page(context_id)
        response = await page.go_back(timeout=timeout)
        current_url = page.url
        current_title = await page.title()
        http_status = response.status if response else None
        details = {"url": current_url, "title": current_title, "status": http_status}
        return _json_response("success", message="Navigated back", data=details)
    except PlaywrightTimeout:
        return _json_response("error", error="Back navigation timed out")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Error navigating back: {str(exc)}")
async def browser_go_forward(timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Go one step forward in the page's history.

    Args:
        timeout (int): Timeout in milliseconds, passed to Playwright.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with the resulting URL, title and HTTP status
        (None when Playwright returns no response), or an error description.
        Timeouts are not recorded as manager errors; other failures are.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    try:
        page = await _bm.get_page(context_id)
        response = await page.go_forward(timeout=timeout)
        current_url = page.url
        current_title = await page.title()
        http_status = response.status if response else None
        details = {"url": current_url, "title": current_title, "status": http_status}
        return _json_response("success", message="Navigated forward", data=details)
    except PlaywrightTimeout:
        return _json_response("error", error="Forward navigation timed out")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Error navigating forward: {str(exc)}")
async def browser_reload(wait_until: str = "load", timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Reload the current page of a context.

    ``wait_until`` is validated up front, as in ``browser_navigate``, so a
    bad value is reported as a caller error instead of being counted as a
    browser failure.

    Args:
        wait_until (str): One of "load", "domcontentloaded", "networkidle"
            or "commit".
        timeout (int): Reload timeout in milliseconds.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with the URL, title and HTTP status (None when
        Playwright returns no response), or an error description.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    valid_wait_states = ["load", "domcontentloaded", "networkidle", "commit"]
    if wait_until not in valid_wait_states:
        return _json_response("error", error=f"Invalid wait_until value. Must be one of: {valid_wait_states}")
    try:
        page = await _bm.get_page(context_id)
        response = await page.reload(wait_until=wait_until, timeout=timeout)
        return _json_response("success", message="Page reloaded", data={
            "url": page.url,
            "title": await page.title(),
            "status": response.status if response else None,
        })
    except PlaywrightTimeout:
        return _json_response("error", error="Page reload timed out")
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Error reloading page: {str(e)}")
async def browser_close_context(context_id: str) -> str:
    """Close a non-default browser context.

    Args:
        context_id (str): Id of the context to close. The manager's default
            context is refused.

    Returns:
        str: JSON envelope confirming the close, or an error description.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    targets_default = context_id == _bm.default_context_id
    if targets_default:
        return _json_response("error", error="Cannot close the default context")
    try:
        await _bm.close_context(context_id)
        confirmation = f"Context '{context_id}' closed"
        return _json_response("success", message=confirmation)
    except Exception as exc:
        return _json_response("error", error=f"Error closing context: {str(exc)}")
# Interaction -------------------------------------------------------
async def browser_click(selector: str, button: str = "left", click_count: int = 1, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Click the element matching ``selector``.

    Args:
        selector (str): Selector of the element to click.
        button (str): "left", "right" or "middle".
        click_count (int): Number of clicks, passed to Playwright.
        timeout (int): Timeout in milliseconds.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope echoing selector, button and click count, or an
        error description. Timeouts are not recorded as manager errors.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    allowed_buttons = ("left", "right", "middle")
    if button not in allowed_buttons:
        return _json_response("error", error="Invalid button. Must be left, right, or middle")
    try:
        target_page = await _bm.get_page(context_id)
        await target_page.click(selector, button=button, click_count=click_count, timeout=timeout)
        summary = {"selector": selector, "button": button, "click_count": click_count}
        return _json_response("success", message=f"Clicked element: {selector}", data=summary)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Element not found within {timeout}ms: {selector}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Click error: {str(exc)}")
async def browser_type(selector: str, text: str, delay: int = 0, clear_first: bool = False, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Type ``text`` into the element matching ``selector``.

    Args:
        selector (str): Selector of the input element.
        text (str): Text to type.
        delay (int): Per-key delay, passed to Playwright's ``type``.
        clear_first (bool): When true, the element is filled with an empty
            string before typing.
        timeout (int): Timeout in milliseconds.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with the selector, text length and delay, or an
        error description. Timeouts are not recorded as manager errors.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    try:
        target_page = await _bm.get_page(context_id)
        if clear_first:
            await target_page.fill(selector, "", timeout=timeout)
        await target_page.type(selector, text, delay=delay, timeout=timeout)
        summary = {"selector": selector, "text_length": len(text), "delay": delay}
        return _json_response("success", message=f"Typed text into: {selector}", data=summary)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Element not found within {timeout}ms: {selector}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Type error: {str(exc)}")
async def browser_fill(selector: str, value: str, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Set the value of the element matching ``selector``.

    Args:
        selector (str): Selector of the input element.
        value (str): Value to fill in.
        timeout (int): Timeout in milliseconds.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with the selector and value length, or an error
        description. Timeouts are not recorded as manager errors.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    try:
        target_page = await _bm.get_page(context_id)
        await target_page.fill(selector, value, timeout=timeout)
        summary = {"selector": selector, "value_length": len(value)}
        return _json_response("success", message=f"Filled element: {selector}", data=summary)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Element not found within {timeout}ms: {selector}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Fill error: {str(exc)}")
async def browser_fill_form(fields: str, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Fill several form fields in one call.

    Each field is attempted independently, so one failing selector does not
    stop the others.

    Args:
        fields (str): JSON object mapping selectors to values; an already
            parsed dict is accepted too. Numeric values are converted to
            their string form before filling.
        timeout (int): Per-field timeout in milliseconds.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with status "success" when every field was
        filled, "partial" when at least one field failed (the data lists
        both filled fields and errors), or "error" for invalid input or a
        failure outside the per-field loop.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    try:
        parsed_fields = json.loads(fields) if isinstance(fields, str) else fields
    except (json.JSONDecodeError, TypeError):
        return _json_response("error", error="fields must be a JSON object mapping selectors to values")
    if not parsed_fields or not isinstance(parsed_fields, dict):
        return _json_response("error", error="Fields must be a non-empty dictionary")
    try:
        page = await _bm.get_page(context_id)
        results, errors = [], []
        for sel, val in parsed_fields.items():
            # JSON numbers arrive as int/float, but fill() takes a string.
            # bool is excluded: it is an int subclass with no obvious text.
            if isinstance(val, (int, float)) and not isinstance(val, bool):
                val = str(val)
            try:
                await page.fill(sel, val, timeout=timeout)
                results.append({"selector": sel, "status": "filled"})
            except Exception as e:
                errors.append({"selector": sel, "error": str(e)})
        if errors:
            return _json_response(
                "partial",
                message=f"Filled {len(results)} fields with {len(errors)} errors",
                data={"filled": results, "errors": errors},
            )
        return _json_response("success", message=f"Filled {len(results)} form fields", data={"filled": results})
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Form fill error: {str(e)}")
async def browser_select_option(selector: str, value: str = None, label: str = None, index: int = None, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Select an option of a ``<select>`` element by value, label or index.

    Args:
        selector (str): Selector of the select element.
        value (str): Option value to select.
        label (str): Option label to select.
        index (int): Option index to select.
        timeout (int): Timeout in milliseconds.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with the selector and the values Playwright
        reports as selected, or an error description. At least one of
        ``value``, ``label`` and ``index`` must be given.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    criteria = (("value", value), ("label", label), ("index", index))
    # Only the criteria the caller actually supplied are forwarded.
    chosen = {name: given for name, given in criteria if given is not None}
    if not chosen:
        return _json_response("error", error="Must provide at least one of: value, label, or index")
    try:
        target_page = await _bm.get_page(context_id)
        selected = await target_page.select_option(selector, timeout=timeout, **chosen)
        summary = {"selector": selector, "selected_values": selected}
        return _json_response("success", message=f"Selected option in: {selector}", data=summary)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Element not found within {timeout}ms: {selector}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Select error: {str(exc)}")
async def browser_press_key(key: str, selector: str = None, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Press a keyboard key, optionally on a specific element.

    Args:
        key (str): Key name passed to Playwright.
        selector (str): When given, the key is pressed on the matching
            element; otherwise it is sent through the page keyboard.
        timeout (int): Timeout in milliseconds; only used with ``selector``.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope echoing key and selector, or an error
        description. Timeouts are not recorded as manager errors.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    try:
        target_page = await _bm.get_page(context_id)
        if not selector:
            await target_page.keyboard.press(key)
        else:
            await target_page.press(selector, key, timeout=timeout)
        summary = {"key": key, "selector": selector}
        return _json_response("success", message=f"Pressed key: {key}", data=summary)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Element not found within {timeout}ms: {selector}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Key press error: {str(exc)}")
async def browser_hover(selector: str, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Move the pointer over the element matching ``selector``.

    Args:
        selector (str): Selector of the element to hover.
        timeout (int): Timeout in milliseconds.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope echoing the selector, or an error description.
        Timeouts are not recorded as manager errors.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    try:
        target_page = await _bm.get_page(context_id)
        await target_page.hover(selector, timeout=timeout)
        summary = {"selector": selector}
        return _json_response("success", message=f"Hovering over: {selector}", data=summary)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Element not found within {timeout}ms: {selector}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Hover error: {str(exc)}")
async def browser_scroll(direction: str = "down", amount: int = 500, selector: str = None, context_id: str = None) -> str:
    """Scroll the window, or one element, by a number of pixels.

    The selector and offsets are passed to the page as evaluate arguments
    rather than spliced into the script text, so selectors containing
    quotes (for example ``[data-id='x']``) work and cannot alter the script.

    Args:
        direction (str): "up", "down", "left" or "right".
        amount (int): Distance in pixels.
        selector (str): When given, the first element matching this CSS
            selector is scrolled instead of the window; nothing happens if
            no element matches.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope echoing direction, amount and selector, or an
        error description.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    valid_directions = ["up", "down", "left", "right"]
    if direction not in valid_directions:
        return _json_response("error", error=f"Invalid direction. Must be one of: {valid_directions}")
    try:
        page = await _bm.get_page(context_id)
        dx, dy = 0, 0
        if direction == "down":
            dy = amount
        elif direction == "up":
            dy = -amount
        elif direction == "right":
            dx = amount
        elif direction == "left":
            dx = -amount
        if selector:
            await page.evaluate(
                "([sel, dx, dy]) => {"
                " const el = document.querySelector(sel);"
                " if (el) el.scrollBy(dx, dy);"
                " }",
                [selector, dx, dy],
            )
        else:
            await page.evaluate("([dx, dy]) => window.scrollBy(dx, dy)", [dx, dy])
        return _json_response("success", message=f"Scrolled {direction} by {amount}px", data={
            "direction": direction,
            "amount": amount,
            "selector": selector,
        })
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Scroll error: {str(e)}")
# Content Extraction ------------------------------------------------
async def browser_get_content(format: str = "text", selector: str = None, context_id: str = None) -> str:
    """Return page or element content as HTML, plain text or Markdown.

    For Markdown the HTML of the target (the element's inner HTML when a
    selector is given, otherwise the whole page) is converted with
    html2text, so headings, links and lists survive the conversion.

    Args:
        format (str): "html", "text" or "markdown".
        selector (str): When given, only the first matching element is
            read; otherwise the whole page (``body`` for text).
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with the content (truncated by
        ``_truncate_content``), its length, the page URL and title, or an
        error description.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    valid_formats = ["html", "text", "markdown"]
    if format not in valid_formats:
        return _json_response("error", error=f"Invalid format. Must be one of: {valid_formats}")
    # Fail before touching the browser when the converter is unavailable.
    if format == "markdown" and not HTML2TEXT_AVAILABLE:
        return _json_response("error", error="html2text not installed for markdown conversion")
    # Markdown is produced from markup, so it needs HTML just like "html".
    needs_markup = format in ("html", "markdown")
    try:
        page = await _bm.get_page(context_id)
        if selector:
            element = await page.query_selector(selector)
            if not element:
                return _json_response("error", error=f"Element not found: {selector}")
            content = await element.inner_html() if needs_markup else await element.inner_text()
        else:
            content = await page.content() if needs_markup else await page.inner_text("body")
        if format == "markdown":
            h = html2text.HTML2Text()
            h.ignore_links = False
            h.ignore_images = False
            h.body_width = 0
            content = h.handle(content)
            # Collapse runs of three or more blank-ish lines into one blank line.
            content = re.sub(r"\n\s*\n\s*\n+", "\n\n", content).strip()
        content = _truncate_content(content)
        return _json_response("success", message=f"Retrieved {format} content", data={
            "format": format,
            "content": content,
            "length": len(content),
            "url": page.url,
            "title": await page.title(),
        })
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Content extraction error: {str(e)}")
async def browser_get_text(selector: str, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Return the inner text of the element matching ``selector``.

    Args:
        selector (str): Selector of the element to read.
        timeout (int): Timeout in milliseconds.
        context_id (str): Context to use; the default context when omitted.

    Returns:
        str: JSON envelope with the selector and its text, or an error
        description. Timeouts are not recorded as manager errors.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")
    try:
        target_page = await _bm.get_page(context_id)
        element_text = await target_page.inner_text(selector, timeout=timeout)
        summary = {"selector": selector, "text": element_text}
        return _json_response("success", message="Retrieved element text", data=summary)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Element not found within {timeout}ms: {selector}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Get text error: {str(exc)}")
async def browser_get_attribute(selector: str, attribute: str, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Read one attribute from the element matching ``selector``.

    Args:
        selector: Selector of the element to inspect.
        attribute: Name of the attribute to read.
        timeout: Timeout in milliseconds, forwarded to ``page.get_attribute``.
        context_id: Browser context whose page is queried.

    Returns:
        JSON response string; on success ``data`` carries ``selector``,
        ``attribute`` and ``value``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        active_page = await _bm.get_page(context_id)
        attr_value = await active_page.get_attribute(selector, attribute, timeout=timeout)
        payload = {
            "selector": selector,
            "attribute": attribute,
            "value": attr_value,
        }
        return _json_response("success", message="Retrieved attribute value", data=payload)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Element not found within {timeout}ms: {selector}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Get attribute error: {str(exc)}")
async def browser_get_page_info(context_id: str = None) -> str:
    """Report the URL, title and viewport size of the current page.

    Args:
        context_id: Browser context whose page is inspected.

    Returns:
        JSON response string; on success ``data`` carries ``url``, ``title``
        and ``viewport``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        active_page = await _bm.get_page(context_id)
        # Read in the same order the fields are reported: url, title, viewport.
        current_url = active_page.url
        current_title = await active_page.title()
        info = {
            "url": current_url,
            "title": current_title,
            "viewport": active_page.viewport_size,
        }
        return _json_response("success", message="Retrieved page info", data=info)
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Get page info error: {str(exc)}")
async def browser_query_selector_all(selector: str, attributes: str = None, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """List the elements matching ``selector`` with their text and chosen attributes.

    Args:
        selector: Selector to match.
        attributes: Attribute names to read from each element. Accepts a JSON
            array of strings, a single JSON string, an already-parsed list,
            or a plain comma-separated string (``"href,title"``).
        timeout: Accepted for signature compatibility; not used by this
            function (``page.query_selector_all`` does not wait).
        context_id: Browser context whose page is queried.

    Returns:
        JSON response string. ``data.count`` is the total number of matches;
        ``data.elements`` holds at most the first 100 of them, each with
        ``index``, ``text`` and the requested attributes.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    max_elements = 100  # cap on how many matches are inspected and returned

    try:
        attr_names = []
        if attributes:
            parsed_attrs = attributes
            if isinstance(attributes, str):
                try:
                    parsed_attrs = json.loads(attributes)
                except json.JSONDecodeError:
                    # Not JSON: treat as comma-separated attribute names.
                    parsed_attrs = [part.strip() for part in attributes.split(",")]
            if isinstance(parsed_attrs, str):
                # A bare JSON string must not be iterated character by character.
                parsed_attrs = [parsed_attrs]
            if isinstance(parsed_attrs, (list, tuple, dict)):
                attr_names = [
                    name for name in parsed_attrs
                    if isinstance(name, str) and name and name != "text"
                ]

        page = await _bm.get_page(context_id)
        elements = await page.query_selector_all(selector)
        total = len(elements)

        results = []
        # Only inspect the elements that will actually be returned.
        for i, element in enumerate(elements[:max_elements]):
            item = {"index": i}
            try:
                item["text"] = await element.inner_text()
            except Exception:
                # Best effort: a detached or non-renderable element yields empty text.
                item["text"] = ""
            for attr in attr_names:
                try:
                    item[attr] = await element.get_attribute(attr)
                except Exception:
                    item[attr] = None
            results.append(item)

        return _json_response("success", message=f"Found {total} elements", data={
            "selector": selector,
            "count": total,
            "elements": results,
        })
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Query selector all error: {str(e)}")
# Waiting -----------------------------------------------------------
async def browser_wait_for_selector(selector: str, state: str = "visible", timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Wait until the element matching ``selector`` reaches the given state.

    Args:
        selector: Selector to wait for.
        state: Target state: ``visible``, ``hidden``, ``attached`` or ``detached``.
        timeout: Timeout in milliseconds, forwarded to ``page.wait_for_selector``.
        context_id: Browser context whose page is observed.

    Returns:
        JSON response string; on success ``data`` carries ``selector`` and ``state``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    valid_states = ["visible", "hidden", "attached", "detached"]
    if state not in valid_states:
        return _json_response("error", error=f"Invalid state. Must be one of: {valid_states}")

    try:
        active_page = await _bm.get_page(context_id)
        await active_page.wait_for_selector(selector, state=state, timeout=timeout)
        outcome = {"selector": selector, "state": state}
        return _json_response("success", message=f"Element reached state: {state}", data=outcome)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Timeout waiting for {selector} to be {state}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Wait error: {str(exc)}")
async def browser_wait_for_load_state(state: str = "load", timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Wait until the page reaches the requested load state.

    Args:
        state: One of ``load``, ``domcontentloaded`` or ``networkidle``.
        timeout: Timeout in milliseconds, forwarded to ``page.wait_for_load_state``.
        context_id: Browser context whose page is observed.

    Returns:
        JSON response string; on success ``data`` carries ``state`` and the page ``url``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    valid_states = ["load", "domcontentloaded", "networkidle"]
    if state not in valid_states:
        return _json_response("error", error=f"Invalid state. Must be one of: {valid_states}")

    try:
        active_page = await _bm.get_page(context_id)
        await active_page.wait_for_load_state(state, timeout=timeout)
        outcome = {"state": state, "url": active_page.url}
        return _json_response("success", message=f"Page reached load state: {state}", data=outcome)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Timeout waiting for {state} state")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Wait error: {str(exc)}")
async def browser_wait_for_url(url_pattern: str, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Wait until the page URL matches ``url_pattern``.

    A pattern that starts with ``^`` or ends with ``$`` is compiled as a
    regular expression; anything else is wrapped as the glob ``**<pattern>**``.

    Args:
        url_pattern: Regex (anchored with ``^``/``$``) or URL fragment.
        timeout: Timeout in milliseconds, forwarded to ``page.wait_for_url``.
        context_id: Browser context whose page is observed.

    Returns:
        JSON response string; on success ``data`` carries ``pattern`` and the
        page ``url``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        active_page = await _bm.get_page(context_id)
        is_regex = url_pattern.startswith("^") or url_pattern.endswith("$")
        if is_regex:
            matcher = re.compile(url_pattern)
        else:
            matcher = f"**{url_pattern}**"
        await active_page.wait_for_url(matcher, timeout=timeout)
        outcome = {"pattern": url_pattern, "url": active_page.url}
        return _json_response("success", message="URL matched pattern", data=outcome)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Timeout waiting for URL to match: {url_pattern}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Wait error: {str(exc)}")
async def browser_wait(milliseconds: int, context_id: str = None) -> str:
    """Pause for a fixed amount of time using ``asyncio.sleep``.

    Args:
        milliseconds: Duration to sleep; must be within 0..60000.
        context_id: Accepted for signature uniformity; not used here.

    Returns:
        JSON response string; on success ``data`` carries ``milliseconds``.
    """
    out_of_range = milliseconds < 0 or milliseconds > 60000
    if out_of_range:
        return _json_response("error", error="Milliseconds must be between 0 and 60000")

    try:
        delay_seconds = milliseconds / 1000
        await asyncio.sleep(delay_seconds)
        return _json_response(
            "success",
            message=f"Waited {milliseconds}ms",
            data={"milliseconds": milliseconds},
        )
    except Exception as exc:
        return _json_response("error", error=f"Wait error: {str(exc)}")
# Screenshots & PDF -------------------------------------------------
async def browser_screenshot(full_page: bool = True, selector: str = None, filename: str = None, context_id: str = None) -> str:
    """Save a PNG screenshot of the page or of a single element into ``OUTPUT_DIR``.

    Args:
        full_page: Capture the full scrollable page (ignored when ``selector``
            is given, since the element itself is captured).
        selector: Optional selector of a single element to capture.
        filename: Base name for the file. Characters other than word
            characters, ``-`` and ``.`` are replaced with ``_`` so the file
            always lands inside ``OUTPUT_DIR``; a trailing ``.png`` is not
            duplicated. Defaults to a timestamped name.
        context_id: Browser context whose page is captured.

    Returns:
        JSON response string; on success ``data`` carries ``path``,
        ``full_page``, ``selector`` and the page ``url``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        if filename:
            # Same sanitisation as browser_save_download: path separators become
            # "_" so a caller cannot write outside OUTPUT_DIR.
            base_name = re.sub(r"[^\w\-\.]", "_", str(filename))
            if base_name.lower().endswith(".png"):
                base_name = base_name[:-4]
            base_name = base_name.strip(".")
            if not base_name:
                return _json_response("error", error=f"Invalid filename: {filename}")
        else:
            base_name = f"screenshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        filepath = OUTPUT_DIR / f"{base_name}.png"

        page = await _bm.get_page(context_id)
        if selector:
            element = await page.query_selector(selector)
            if not element:
                return _json_response("error", error=f"Element not found: {selector}")
            await element.screenshot(path=str(filepath))
        else:
            await page.screenshot(path=str(filepath), full_page=full_page)

        return _json_response("success", message="Screenshot saved", data={
            "path": str(filepath),
            "full_page": full_page,
            "selector": selector,
            "url": page.url,
        })
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Screenshot error: {str(e)}")
async def browser_pdf(filename: str = None, format: str = "A4", landscape: bool = False, print_background: bool = True, context_id: str = None) -> str:
    """Render the current page to a PDF file inside ``OUTPUT_DIR``.

    NOTE(review): Playwright documents ``page.pdf`` as Chromium-only while this
    module describes itself as Firefox-based; if so, the call fails and is
    reported as a "PDF generation error" -- confirm against the launched browser.

    Args:
        filename: Base name for the file. Characters other than word
            characters, ``-`` and ``.`` are replaced with ``_`` so the file
            always lands inside ``OUTPUT_DIR``; a trailing ``.pdf`` is not
            duplicated. Defaults to a timestamped name.
        format: Paper format: A4, Letter, Legal, Tabloid, A3 or A5.
        landscape: Use landscape orientation.
        print_background: Include background graphics.
        context_id: Browser context whose page is printed.

    Returns:
        JSON response string; on success ``data`` carries ``path``,
        ``format``, ``landscape`` and the page ``url``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    valid_formats = ["A4", "Letter", "Legal", "Tabloid", "A3", "A5"]
    if format not in valid_formats:
        return _json_response("error", error=f"Invalid format. Must be one of: {valid_formats}")

    try:
        if filename:
            # Same sanitisation as browser_save_download: path separators become
            # "_" so a caller cannot write outside OUTPUT_DIR.
            base_name = re.sub(r"[^\w\-\.]", "_", str(filename))
            if base_name.lower().endswith(".pdf"):
                base_name = base_name[:-4]
            base_name = base_name.strip(".")
            if not base_name:
                return _json_response("error", error=f"Invalid filename: {filename}")
        else:
            base_name = f"page_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        filepath = OUTPUT_DIR / f"{base_name}.pdf"

        page = await _bm.get_page(context_id)
        await page.pdf(
            path=str(filepath),
            format=format,
            landscape=landscape,
            print_background=print_background,
        )
        return _json_response("success", message="PDF saved", data={
            "path": str(filepath),
            "format": format,
            "landscape": landscape,
            "url": page.url,
        })
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"PDF generation error: {str(e)}")
# JavaScript --------------------------------------------------------
async def browser_evaluate(expression: str, context_id: str = None) -> str:
    """Evaluate a JavaScript expression in the page and return its value.

    The value is serialised once with ``json.dumps(..., default=str)`` purely
    to measure its size; results longer than ``MAX_CONTENT_LENGTH`` characters
    are replaced by a placeholder plus the measured length.

    Args:
        expression: JavaScript passed to ``page.evaluate``.
        context_id: Browser context whose page runs the expression.

    Returns:
        JSON response string; on success ``data`` carries ``result`` and
        ``result_type`` (and ``result_length`` when truncated).
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        active_page = await _bm.get_page(context_id)
        value = await active_page.evaluate(expression)
        serialized_size = len(json.dumps(value, default=str))
        type_name = type(value).__name__

        if serialized_size <= MAX_CONTENT_LENGTH:
            return _json_response("success", message="JavaScript evaluated", data={
                "result": value,
                "result_type": type_name,
            })

        return _json_response("success", message="JavaScript evaluated (result truncated)", data={
            "result": "[Result too large - truncated]",
            "result_type": type_name,
            "result_length": serialized_size,
        })
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"JavaScript evaluation error: {str(exc)}")
async def browser_execute(script: str, context_id: str = None) -> str:
    """Run a JavaScript snippet in the page, discarding its return value.

    Args:
        script: JavaScript passed to ``page.evaluate``.
        context_id: Browser context whose page runs the script.

    Returns:
        JSON response string; on success ``data`` carries ``script_length``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        active_page = await _bm.get_page(context_id)
        await active_page.evaluate(script)
        summary = {"script_length": len(script)}
        return _json_response("success", message="JavaScript executed", data=summary)
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"JavaScript execution error: {str(exc)}")
# Cookies -----------------------------------------------------------
async def browser_get_cookies(urls: str = None, context_id: str = None) -> str:
    """List cookie metadata for a browser context, optionally filtered by URL.

    Cookie values are never returned; only their length is reported.

    Args:
        urls: Optional URL filter. Accepts a JSON array of URLs, a single JSON
            string, an already-parsed list, or a plain URL string. An empty
            value means "all cookies".
        context_id: Browser context to read cookies from.

    Returns:
        JSON response string; on success ``data`` carries the sanitized
        ``cookies`` list and their ``count``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        url_filter = None
        if urls:
            url_filter = urls
            if isinstance(urls, str):
                try:
                    url_filter = json.loads(urls)
                except json.JSONDecodeError:
                    # A bare URL is not valid JSON; use it as the filter rather
                    # than silently dropping it and returning every cookie.
                    url_filter = urls
            if isinstance(url_filter, str):
                url_filter = [url_filter]
            is_url_list = isinstance(url_filter, (list, tuple)) and all(
                isinstance(u, str) for u in url_filter
            )
            if not is_url_list:
                return _json_response("error", error="urls must be a URL or a JSON array of URL strings")
            url_filter = list(url_filter)

        context = await _bm.get_context(context_id)
        cookies = await context.cookies(url_filter) if url_filter else await context.cookies()

        # Expose metadata only; the value itself is reduced to its length.
        sanitized = [{
            "name": c.get("name"),
            "domain": c.get("domain"),
            "path": c.get("path"),
            "expires": c.get("expires"),
            "httpOnly": c.get("httpOnly"),
            "secure": c.get("secure"),
            "sameSite": c.get("sameSite"),
            "value_length": len(c.get("value", "")),
        } for c in cookies]

        return _json_response("success", message=f"Retrieved {len(cookies)} cookies", data={
            "cookies": sanitized,
            "count": len(cookies),
        })
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Get cookies error: {str(e)}")
async def browser_set_cookies(cookies: str, context_id: str = None) -> str:
    """Add cookies to a browser context.

    Args:
        cookies: JSON array (or already-parsed list) of cookie objects. Each
            object needs ``name`` and ``value`` plus either ``url`` or ``domain``.
        context_id: Browser context that receives the cookies.

    Returns:
        JSON response string; on success ``data`` carries ``count`` and the
        cookie ``names``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        parsed = json.loads(cookies) if isinstance(cookies, str) else cookies
    except (json.JSONDecodeError, TypeError):
        return _json_response("error", error="cookies must be a JSON array of cookie objects")

    if not parsed or not isinstance(parsed, list):
        return _json_response("error", error="Cookies must be a non-empty list")

    for cookie in parsed:
        # Guard the key checks: a number would raise TypeError on `in`, and a
        # string would be substring-matched instead of key-matched.
        if not isinstance(cookie, dict):
            return _json_response("error", error="Each cookie must be a JSON object")
        if "name" not in cookie or "value" not in cookie:
            return _json_response("error", error="Each cookie must have 'name' and 'value'")
        if "url" not in cookie and "domain" not in cookie:
            return _json_response("error", error="Each cookie must have 'domain' or 'url'")

    try:
        context = await _bm.get_context(context_id)
        await context.add_cookies(parsed)
        return _json_response("success", message=f"Set {len(parsed)} cookies", data={
            "count": len(parsed),
            "names": [c["name"] for c in parsed],
        })
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Set cookies error: {str(e)}")
async def browser_clear_cookies(context_id: str = None) -> str:
    """Remove every cookie from a browser context.

    Args:
        context_id: Browser context whose cookies are cleared.

    Returns:
        JSON response string reporting success or the error encountered.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        target_context = await _bm.get_context(context_id)
        await target_context.clear_cookies()
        return _json_response("success", message="All cookies cleared")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Clear cookies error: {str(exc)}")
# Advanced ----------------------------------------------------------
async def browser_emulate_device(device_name: str, context_id: str = None) -> str:
    """Create a browser context that emulates a named Playwright device.

    Args:
        device_name: Key into ``_bm.playwright.devices``.
        context_id: ID for the new context; defaults to
            ``device_<name>`` (lower-cased, spaces replaced by ``_``).

    Returns:
        JSON response string; on success ``data`` carries ``context_id``,
        ``device``, ``viewport`` and a ``user_agent`` preview (at most 100
        characters, followed by ``...`` only when it was cut).
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        if not await _bm.ensure_ready():
            return _json_response("error", error="Browser not available")

        device = _bm.playwright.devices.get(device_name)
        if not device:
            # Show a sample of valid names to help the caller correct the request.
            available = list(_bm.playwright.devices.keys())[:20]
            return _json_response("error", error=f"Unknown device: {device_name}. Some available: {available}")

        new_ctx_id = context_id or f"device_{device_name.replace(' ', '_').lower()}"
        await _bm.create_new_context(new_ctx_id, device=device_name)

        user_agent = device.get("user_agent") or ""
        # Only mark the preview as truncated when something was actually cut off.
        ua_preview = user_agent[:100] + "..." if len(user_agent) > 100 else user_agent

        return _json_response("success", message=f"Device emulation enabled: {device_name}", data={
            "context_id": new_ctx_id,
            "device": device_name,
            "viewport": device.get("viewport"),
            "user_agent": ua_preview,
        })
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Device emulation error: {str(e)}")
async def browser_set_geolocation(latitude: float, longitude: float, accuracy: float = 100, context_id: str = None) -> str:
    """Create a browser context with a fixed geolocation.

    Args:
        latitude: Latitude in degrees, -90..90.
        longitude: Longitude in degrees, -180..180.
        accuracy: Non-negative accuracy value passed through to the context.
        context_id: ID for the new context; defaults to
            ``geo_<lat>_<lon>`` with two decimals each.

    Returns:
        JSON response string; on success ``data`` carries ``context_id``,
        ``latitude``, ``longitude`` and ``accuracy``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    if not (-90 <= latitude <= 90):
        return _json_response("error", error="Latitude must be between -90 and 90")
    if not (-180 <= longitude <= 180):
        return _json_response("error", error="Longitude must be between -180 and 180")
    # Validate up front like the coordinates, instead of failing inside Playwright.
    if not (accuracy >= 0):
        return _json_response("error", error="Accuracy must be non-negative")

    try:
        if not await _bm.ensure_ready():
            return _json_response("error", error="Browser not available")

        new_ctx_id = context_id or f"geo_{latitude:.2f}_{longitude:.2f}"
        await _bm.create_new_context(
            new_ctx_id,
            geolocation={"latitude": latitude, "longitude": longitude, "accuracy": accuracy},
        )
        return _json_response("success", message="Geolocation set", data={
            "context_id": new_ctx_id,
            "latitude": latitude,
            "longitude": longitude,
            "accuracy": accuracy,
        })
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Geolocation error: {str(e)}")
async def browser_set_viewport(width: int, height: int, context_id: str = None) -> str:
    """Resize the page viewport.

    Args:
        width: Viewport width, 100..4000.
        height: Viewport height, 100..4000.
        context_id: Browser context whose page is resized.

    Returns:
        JSON response string; on success ``data`` echoes ``width`` and ``height``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    width_ok = 100 <= width <= 4000
    if not width_ok:
        return _json_response("error", error="Width must be between 100 and 4000")
    height_ok = 100 <= height <= 4000
    if not height_ok:
        return _json_response("error", error="Height must be between 100 and 4000")

    try:
        active_page = await _bm.get_page(context_id)
        dimensions = {"width": width, "height": height}
        await active_page.set_viewport_size(dimensions)
        return _json_response("success", message="Viewport set", data={"width": width, "height": height})
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Set viewport error: {str(exc)}")
async def browser_block_resources(resource_types: str, context_id: str = None) -> str:
    """Abort requests for the given resource types on the page.

    Installs a ``**/*`` route on the page that aborts any request whose
    ``resource_type`` is in the blocked set and lets everything else continue.

    Args:
        resource_types: JSON array (or already-parsed list) of resource type
            names. Valid names: image, stylesheet, font, script, media, websocket.
        context_id: Browser context whose page gets the route.

    Returns:
        JSON response string; on success ``data`` carries ``blocked_types``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        parsed_types = json.loads(resource_types) if isinstance(resource_types, str) else resource_types
    except (json.JSONDecodeError, TypeError):
        return _json_response("error", error="resource_types must be a JSON array of strings")

    # A bare string would be iterated per character and a number would raise
    # outside any handler, so insist on a list of strings before going further.
    if not isinstance(parsed_types, list) or not all(isinstance(t, str) for t in parsed_types):
        return _json_response("error", error="resource_types must be a JSON array of strings")

    valid_types = ["image", "stylesheet", "font", "script", "media", "websocket"]
    invalid = [t for t in parsed_types if t not in valid_types]
    if invalid:
        return _json_response("error", error=f"Invalid resource types: {invalid}. Valid: {valid_types}")

    blocked = frozenset(parsed_types)

    try:
        page = await _bm.get_page(context_id)

        async def route_handler(route):
            """Abort the request if its resource type is blocked, else let it through."""
            if route.request.resource_type in blocked:
                await route.abort()
            else:
                await route.continue_()

        await page.route("**/*", route_handler)
        return _json_response("success", message="Resource blocking enabled", data={"blocked_types": parsed_types})
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Block resources error: {str(e)}")
async def browser_get_status() -> str:
    """Report the browser manager's current state.

    Returns:
        JSON response string whose ``data`` holds availability, initialisation
        and connection flags, usage/error counters, the active context and
        page IDs, and the ``last_used`` value.
    """
    status = {}
    status["playwright_available"] = PLAYWRIGHT_AVAILABLE
    status["is_initialized"] = _bm.is_initialized
    if _bm.browser:
        status["browser_connected"] = _bm.browser.is_connected()
    else:
        # No browser object means there is nothing to be connected to.
        status["browser_connected"] = False
    status["usage_count"] = _bm.usage_count
    status["error_count"] = _bm.error_count
    status["active_contexts"] = list(_bm.contexts.keys())
    status["active_pages"] = list(_bm.pages.keys())
    status["last_used"] = _bm.last_used
    return _json_response("success", message="Browser status retrieved", data=status)
async def browser_restart() -> str:
    """Restart the browser through the browser manager.

    Returns:
        JSON response string: success when ``_bm.restart()`` returns a truthy
        value, otherwise an error.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        restarted = await _bm.restart()
        if not restarted:
            return _json_response("error", error="Failed to restart browser")
        return _json_response("success", message="Browser restarted successfully")
    except Exception as exc:
        return _json_response("error", error=f"Restart error: {str(exc)}")
async def browser_new_context(context_id: str = None, incognito: bool = True) -> str:
    """Create a new browser context.

    Args:
        context_id: Requested ID, forwarded to ``_bm.create_new_context``.
        incognito: Only echoed back in the response; it is not passed to the
            browser manager by this function.

    Returns:
        JSON response string; on success ``data`` carries the ``context_id``
        returned by the manager and the ``incognito`` flag.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        created_id = await _bm.create_new_context(context_id)
        details = {"context_id": created_id, "incognito": incognito}
        return _json_response("success", message="New context created", data=details)
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Create context error: {str(exc)}")
# Network Interception ----------------------------------------------
async def browser_enable_response_interception(url_patterns: str, context_id: str = None) -> str:
    """Register URL patterns whose responses should be captured for a context.

    New patterns are appended to the context's entry in
    ``_bm.response_patterns``; patterns already present are not duplicated.

    Args:
        url_patterns: JSON array (or already-parsed list) of URL patterns.
        context_id: Browser context to configure; defaults to
            ``_bm.default_context_id``.

    Returns:
        JSON response string; on success ``data`` carries the full
        ``patterns`` list and the ``context_id``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        if isinstance(url_patterns, str):
            parsed_patterns = json.loads(url_patterns)
        else:
            parsed_patterns = url_patterns
    except (json.JSONDecodeError, TypeError):
        return _json_response("error", error="url_patterns must be a JSON array of strings")

    if not parsed_patterns or not isinstance(parsed_patterns, list):
        return _json_response("error", error="url_patterns must be a non-empty list")

    try:
        # Called for its side effect only; the returned page is not used here.
        await _bm.get_page(context_id)
        ctx_id = context_id or _bm.default_context_id

        registered = _bm.response_patterns.get(ctx_id, [])
        for candidate in parsed_patterns:
            if candidate in registered:
                continue
            registered.append(candidate)
        _bm.response_patterns[ctx_id] = registered

        return _json_response("success", message="Response interception enabled", data={
            "patterns": registered,
            "context_id": ctx_id,
        })
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Enable interception error: {str(exc)}")
async def browser_wait_for_response(url_pattern: str, timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Wait for a network response whose URL matches a glob pattern.

    The body is decoded by content type: JSON is parsed, ``text/*`` is decoded
    as UTF-8, anything else is summarised as a byte count. Bodies larger than
    ``MAX_RESPONSE_BODY_SIZE`` are replaced by a size note.

    Args:
        url_pattern: ``fnmatch``-style pattern matched against response URLs.
        timeout: Timeout in milliseconds, forwarded to ``page.expect_response``.
        context_id: Browser context whose page is observed.

    Returns:
        JSON response string; on success ``data`` carries ``url``, ``status``,
        ``status_text``, ``headers``, ``body`` and the request ``method``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        active_page = await _bm.get_page(context_id)

        def url_matches(resp):
            return fnmatch.fnmatch(resp.url, url_pattern)

        async with active_page.expect_response(url_matches, timeout=timeout) as response_info:
            pass
        response = await response_info.value

        body = None
        content_type = response.headers.get("content-type", "")
        try:
            raw = await response.body()
            size = len(raw)
            if size > MAX_RESPONSE_BODY_SIZE:
                body = f"[Response too large: {size} bytes]"
            elif "application/json" in content_type:
                body = await _json_loads_network_body(raw)
            elif "text/" in content_type:
                body = raw.decode("utf-8")
            else:
                body = f"[Binary data: {size} bytes]"
        except Exception as read_error:
            # Body retrieval is best effort; the metadata is still returned.
            body = f"[Could not read body: {str(read_error)}]"

        captured = {
            "url": response.url,
            "status": response.status,
            "status_text": response.status_text,
            "headers": dict(response.headers),
            "body": body,
            "method": response.request.method,
        }
        return _json_response("success", message="Response captured", data=captured)
    except PlaywrightTimeout:
        return _json_response("error", error=f"Timeout waiting for response matching: {url_pattern}")
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Wait for response error: {str(exc)}")
async def browser_get_intercepted_responses(context_id: str = None, url_filter: str = None) -> str:
    """Summarise the responses stored in ``_bm.intercepted_responses``.

    Args:
        context_id: When given, keep only responses recorded for this context.
        url_filter: When given, keep only responses whose URL matches this
            ``fnmatch``-style pattern.

    Returns:
        JSON response string; ``data`` carries ``count`` and a ``responses``
        list of summaries (ID, URL, status, method, timestamp, ``has_body``).
    """
    try:
        summaries = []
        for record in list(_bm.intercepted_responses.values()):
            if context_id and record.get("context_id") != context_id:
                continue
            if url_filter and not fnmatch.fnmatch(record.get("url", ""), url_filter):
                continue
            summaries.append({
                "response_id": record["response_id"],
                "url": record["url"],
                "status": record["status"],
                "method": record["method"],
                "timestamp": record["timestamp"],
                "has_body": record.get("body") is not None,
            })

        return _json_response("success", message=f"Found {len(summaries)} intercepted responses", data={
            "count": len(summaries),
            "responses": summaries,
        })
    except Exception as exc:
        return _json_response("error", error=f"Get intercepted responses error: {str(exc)}")
async def browser_get_response_body(response_id: str) -> str:
    """Return the full stored record of one intercepted response.

    Args:
        response_id: Key into ``_bm.intercepted_responses``.

    Returns:
        JSON response string; on success ``data`` is the stored record,
        otherwise an error naming the unknown ID.
    """
    try:
        store = _bm.intercepted_responses
        if response_id in store:
            record = store[response_id]
            return _json_response("success", message="Response body retrieved", data=record)
        return _json_response("error", error=f"Response not found: {response_id}")
    except Exception as exc:
        return _json_response("error", error=f"Get response body error: {str(exc)}")
async def browser_clear_intercepted_responses(context_id: str = None) -> str:
    """Delete stored intercepted responses, for one context or for all.

    Args:
        context_id: When given, only responses recorded for this context are
            removed; otherwise the whole store is cleared.

    Returns:
        JSON response string whose message states how many entries were removed.
    """
    try:
        store = _bm.intercepted_responses
        if not context_id:
            count = len(store)
            store.clear()
        else:
            # Collect the IDs first so the dict is not mutated while iterating.
            doomed_ids = []
            for rid, record in store.items():
                if record.get("context_id") == context_id:
                    doomed_ids.append(rid)
            for rid in doomed_ids:
                del store[rid]
            count = len(doomed_ids)
        return _json_response("success", message=f"Cleared {count} intercepted responses")
    except Exception as exc:
        return _json_response("error", error=f"Clear responses error: {str(exc)}")
# Session Persistence -----------------------------------------------
async def browser_create_persistent_context(session_name: str, context_id: str = None) -> str:
    """Create a browser context backed by a named on-disk session.

    Args:
        session_name: Name of the session. The reported name and path use a
            sanitized form (characters other than word characters and ``-``
            become ``_``).
        context_id: ID for the context; defaults to ``session_name``.

    Returns:
        JSON response string; on success ``data`` carries ``context_id``,
        ``session_name`` and ``session_path``.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    name_is_valid = bool(session_name) and isinstance(session_name, str)
    if not name_is_valid:
        return _json_response("error", error="session_name must be a non-empty string")

    try:
        ctx_id = context_id or session_name
        if ctx_id in _bm.contexts:
            return _json_response("error", error=f"Context '{ctx_id}' already exists. Close it first.")

        # Start Playwright on demand if the manager has not been initialised yet.
        if not _bm.playwright:
            await _bm.initialize()

        await _bm._create_persistent_context(session_name, ctx_id)

        safe_session_name = re.sub(r"[^\w\-]", "_", session_name)
        session_path = SESSIONS_DIR / safe_session_name
        return _json_response("success", message="Persistent context created", data={
            "context_id": ctx_id,
            "session_name": safe_session_name,
            "session_path": str(session_path),
        })
    except Exception as exc:
        _bm.record_error()
        return _json_response("error", error=f"Create persistent context error: {str(exc)}")
async def browser_list_sessions() -> str:
    """List the session directories under ``SESSIONS_DIR`` with their disk usage.

    The recursive size scan runs in a worker thread so it does not block the
    event loop, and files that disappear mid-scan are skipped instead of
    failing the whole listing.

    Returns:
        JSON response string; ``data`` carries ``count`` and a ``sessions``
        list with ``session_name``, ``path``, ``size_bytes``, ``size_mb`` and
        the ``active_context`` currently using the session (or ``None``).
    """

    def _scan_session_dirs():
        """Return ``(directory, total_bytes)`` for each session directory."""
        found = []
        if not SESSIONS_DIR.exists():
            return found
        for session_dir in SESSIONS_DIR.iterdir():
            if not session_dir.is_dir():
                continue
            total = 0
            try:
                for entry in session_dir.rglob("*"):
                    try:
                        if entry.is_file():
                            total += entry.stat().st_size
                    except OSError:
                        # File removed or unreadable while scanning; skip it.
                        continue
            except OSError:
                # The tree changed underneath the walk; report the partial size.
                pass
            found.append((session_dir, total))
        return found

    try:
        scanned = await asyncio.to_thread(_scan_session_dirs)

        sessions = []
        for session_dir, size in scanned:
            # First context (if any) that is bound to this session directory.
            active_context = next(
                (ctx_id for ctx_id, sess_name in _bm.persistent_contexts.items()
                 if sess_name == session_dir.name),
                None,
            )
            sessions.append({
                "session_name": session_dir.name,
                "path": str(session_dir),
                "size_bytes": size,
                "size_mb": round(size / (1024 * 1024), 2),
                "active_context": active_context,
            })

        return _json_response("success", message=f"Found {len(sessions)} sessions", data={
            "count": len(sessions),
            "sessions": sessions,
        })
    except Exception as e:
        return _json_response("error", error=f"List sessions error: {str(e)}")
async def browser_delete_session(session_name: str) -> str:
    """Delete a stored session directory that is not in use.

    Args:
        session_name: Name of the session; sanitized (characters other than
            word characters and ``-`` become ``_``) before being resolved
            under ``SESSIONS_DIR``.

    Returns:
        JSON response string; on success ``data`` carries the sanitized
        ``session_name`` and the removed ``path``.
    """
    if not session_name:
        return _json_response("error", error="session_name is required")

    try:
        safe_session_name = re.sub(r"[^\w\-]", "_", session_name)
        target_dir = SESSIONS_DIR / safe_session_name
        if not target_dir.exists():
            return _json_response("error", error=f"Session not found: {session_name}")

        # Refuse to delete a session that an open context is still using.
        for ctx_id, bound_session in _bm.persistent_contexts.items():
            if bound_session != safe_session_name:
                continue
            return _json_response("error", error=f"Session is in use by context '{ctx_id}'. Close it first.")

        # Removal can be slow for large profiles, so keep it off the event loop.
        await asyncio.to_thread(shutil.rmtree, target_dir)
        removed = {"session_name": safe_session_name, "path": str(target_dir)}
        return _json_response("success", message=f"Session '{session_name}' deleted", data=removed)
    except Exception as exc:
        return _json_response("error", error=f"Delete session error: {str(exc)}")
# Downloads ---------------------------------------------------------
async def browser_wait_for_download(timeout: int = DEFAULT_TIMEOUT, context_id: str = None) -> str:
    """Wait for the next download in a context to be reported.

    A future is stored in ``_bm.pending_downloads`` under the context ID and
    awaited; whatever resolves it supplies the download info.
    NOTE(review): the code that resolves the future lives outside this
    function -- presumably the manager's download handler; confirm.

    Args:
        timeout: Timeout in milliseconds (converted to seconds for
            ``asyncio.wait_for``).
        context_id: Browser context to watch; defaults to
            ``_bm.default_context_id``.

    Returns:
        JSON response string; on success ``data`` is the download info the
        future was resolved with.
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    try:
        # Called for its side effect only; the returned page is not used here.
        await _bm.get_page(context_id)
        ctx_id = context_id or _bm.default_context_id

        # get_running_loop() is the supported way to reach the loop from a coroutine.
        future = asyncio.get_running_loop().create_future()
        _bm.pending_downloads[ctx_id] = future

        try:
            download_info = await asyncio.wait_for(future, timeout=timeout / 1000)
            return _json_response("success", message="Download completed", data=download_info)
        except asyncio.TimeoutError:
            return _json_response("error", error=f"Timeout waiting for download after {timeout}ms")
        finally:
            # Always drop our own future (success, timeout, error or cancellation)
            # so a finished one is never left registered. Leave the entry alone
            # if a newer waiter has replaced it.
            if _bm.pending_downloads.get(ctx_id) is future:
                _bm.pending_downloads.pop(ctx_id, None)
    except Exception as e:
        _bm.record_error()
        return _json_response("error", error=f"Wait for download error: {str(e)}")
async def browser_get_downloads(context_id: str = None, state: str = None) -> str:
    """List the downloads recorded in ``_bm.downloads``.

    Args:
        context_id: When given, keep only downloads recorded for this context.
        state: When given, keep only downloads whose ``state`` equals this value.

    Returns:
        JSON response string; ``data`` carries ``count`` and the matching
        ``downloads`` records.
    """
    try:
        matching = []
        for record in list(_bm.downloads.values()):
            if context_id and record.get("context_id") != context_id:
                continue
            if state and record.get("state") != state:
                continue
            matching.append(record)

        return _json_response("success", message=f"Found {len(matching)} downloads", data={
            "count": len(matching),
            "downloads": matching,
        })
    except Exception as exc:
        return _json_response("error", error=f"Get downloads error: {str(exc)}")
async def browser_save_download(download_id: str, filename: str = None) -> str:
    """Return a download's record, optionally renaming its file first.

    Args:
        download_id: Key into ``_bm.downloads``.
        filename: Optional new file name. Characters other than word
            characters, ``-`` and ``.`` are replaced with ``_``; the file is
            then renamed inside ``DOWNLOADS_DIR``. Names that reduce to
            nothing but dots (``""``, ``.``, ``..``) are rejected.

    Returns:
        JSON response string; on success ``data`` is the download record
        (with ``path`` and ``renamed_to`` updated when a rename happened).
    """
    try:
        if download_id not in _bm.downloads:
            return _json_response("error", error=f"Download not found: {download_id}")

        download_info = _bm.downloads[download_id]

        if filename:
            safe_filename = re.sub(r"[^\w\-\.]", "_", filename)
            # "." and ".." survive the substitution and would point at
            # DOWNLOADS_DIR itself or its parent, so refuse them.
            if not safe_filename.strip("."):
                return _json_response("error", error=f"Invalid filename: {filename}")

            current_path = download_info.get("path")
            if not current_path or not Path(current_path).exists():
                # A rename was requested but there is nothing to rename; say so
                # instead of reporting success with a stale path.
                return _json_response("error", error=f"Downloaded file not found on disk: {current_path}")

            old_path = Path(current_path)
            new_path = DOWNLOADS_DIR / safe_filename
            if new_path != old_path:
                old_path.rename(new_path)
            download_info["path"] = str(new_path)
            download_info["renamed_to"] = safe_filename

        return _json_response("success", message="Download info retrieved", data=download_info)
    except Exception as e:
        return _json_response("error", error=f"Save download error: {str(e)}")
# Console & Dialog --------------------------------------------------
async def browser_get_console_logs(context_id: str = None, level: str = None, limit: int = 100) -> str:
    """Return the most recent console log entries captured for a context.

    Args:
        context_id: Browser context to read; defaults to
            ``_bm.default_context_id``.
        level: Optional filter on the entry ``type``; one of log, warn,
            error, info, debug, warning.
        limit: Maximum number of entries to return (the newest ones). Zero or
            a negative value returns no entries.

    Returns:
        JSON response string; ``data`` carries ``count`` and the ``logs`` list.
    """
    try:
        ctx_id = context_id or _bm.default_context_id
        logs = _bm.console_logs.get(ctx_id, [])

        if level:
            valid_levels = ["log", "warn", "error", "info", "debug", "warning"]
            if level not in valid_levels:
                return _json_response("error", error=f"Invalid level. Must be one of: {valid_levels}")
            logs = [log for log in logs if log.get("type") == level]

        # logs[-0:] would return everything and a negative limit would slice
        # from the front, so handle non-positive limits explicitly.
        logs = logs[-limit:] if limit > 0 else []

        return _json_response("success", message=f"Retrieved {len(logs)} console logs", data={
            "count": len(logs),
            "logs": logs,
        })
    except Exception as e:
        return _json_response("error", error=f"Get console logs error: {str(e)}")
async def browser_clear_console_logs(context_id: str = None) -> str:
    """Discard the console log entries captured for a context.

    Args:
        context_id: Browser context to clear; defaults to
            ``_bm.default_context_id``.

    Returns:
        JSON response string whose message states how many entries were cleared.
    """
    try:
        ctx_id = context_id or _bm.default_context_id
        previous_entries = _bm.console_logs.get(ctx_id, [])
        count = len(previous_entries)
        # The entry is replaced with a fresh list rather than emptied in place.
        _bm.console_logs[ctx_id] = []
        return _json_response("success", message=f"Cleared {count} console logs")
    except Exception as exc:
        return _json_response("error", error=f"Clear console logs error: {str(exc)}")
async def browser_handle_dialog(action: str, prompt_text: str = None, context_id: str = None) -> str:
    """Choose how dialogs are handled for a context.

    The chosen action is stored in ``_bm.dialog_handlers`` under the context ID.

    Args:
        action: One of ``accept``, ``dismiss`` or ``manual``.
        prompt_text: Echoed back in the response only.
            NOTE(review): this function does not store it anywhere, so it
            cannot influence how a prompt is answered from here -- confirm
            whether the manager is expected to receive it.
        context_id: Browser context to configure; defaults to
            ``_bm.default_context_id``.

    Returns:
        JSON response string; on success ``data`` carries ``action``,
        ``prompt_text`` and ``last_dialog`` (the context's entry in
        ``_bm.pending_dialogs``, if any).
    """
    if not PLAYWRIGHT_AVAILABLE:
        return _json_response("error", error="Playwright not installed")

    valid_actions = ["accept", "dismiss", "manual"]
    if action not in valid_actions:
        return _json_response("error", error=f"Invalid action. Must be one of: {valid_actions}")

    try:
        ctx_id = context_id or _bm.default_context_id
        _bm.dialog_handlers[ctx_id] = action
        summary = {
            "action": action,
            "prompt_text": prompt_text,
            "last_dialog": _bm.pending_dialogs.get(ctx_id),
        }
        return _json_response("success", message=f"Dialog handler set to: {action}", data=summary)
    except Exception as exc:
        return _json_response("error", error=f"Handle dialog error: {str(exc)}")
async def browser_get_pending_dialog(context_id: str = None) -> str:
    """Report the dialog information recorded for a browser context.

    Args:
        context_id (str): Context to look up. Falls back to
            ``_bm.default_context_id`` when omitted.

    Returns:
        str: ``_json_response`` result. ``data`` is the entry stored in
        ``_bm.pending_dialogs`` for the context, or ``None`` (with the message
        "No dialog has appeared") when the entry is missing or falsy.
    """
    try:
        target = context_id or _bm.default_context_id
        info = _bm.pending_dialogs.get(target)
        if not info:
            return _json_response("success", message="No dialog has appeared", data=None)
        return _json_response("success", message="Dialog info retrieved", data=info)
    except Exception as exc:
        return _json_response("error", error=f"Get pending dialog error: {str(exc)}")
# ===================================================================
# v3 multi-tool registration
# ===================================================================

def _p(props: dict, required: Optional[list] = None) -> Dict[str, Any]:
    """Build an object-typed parameters schema.

    Args:
        props (dict): Mapping of parameter name to its property schema.
        required (list): Names of mandatory parameters. The ``required`` key
            is omitted from the result when this is empty or ``None``.

    Returns:
        dict: ``{"type": "object", "properties": ...}`` plus ``required``
        when given.
    """
    # Copy every property schema: the shared _S/_I/_N/_B fragments are passed
    # in by reference for many parameters, and without the copy an in-place
    # edit of one property would show up in every tool that uses the fragment.
    schema = {
        "type": "object",
        "properties": {name: dict(spec) for name, spec in props.items()},
    }
    if required:
        schema["required"] = list(required)
    return schema


# Reusable property-schema fragments: string, integer, number, boolean.
_S = {"type": "string"}
_I = {"type": "integer"}
_N = {"type": "number"}
_B = {"type": "boolean"}

# One entry per exposed tool: name, description, parameters schema, handler.
TOOLS = [
    # --- Navigation ---
    {
        "name": "browser_navigate",
        "description": "Navigate the headless Firefox browser to a URL.",
        "parameters": _p(
            {
                "url": {**_S, "description": "URL to navigate to."},
                "wait_until": {**_S, "description": "When to consider navigation complete (load|domcontentloaded|networkidle|commit)."},
                "timeout": {**_I, "description": "Timeout in ms (default 30000)."},
                "context_id": {**_S, "description": "Browser context ID (optional)."},
            },
            ["url"],
        ),
        "handler": browser_navigate,
    },
    {
        "name": "browser_go_back",
        "description": "Navigate back in browser history.",
        "parameters": _p({"timeout": {**_I, "description": "Timeout in ms."}, "context_id": _S}),
        "handler": browser_go_back,
    },
    {
        "name": "browser_go_forward",
        "description": "Navigate forward in browser history.",
        "parameters": _p({"timeout": {**_I, "description": "Timeout in ms."}, "context_id": _S}),
        "handler": browser_go_forward,
    },
    {
        "name": "browser_reload",
        "description": "Reload the current page.",
        "parameters": _p({"wait_until": _S, "timeout": _I, "context_id": _S}),
        "handler": browser_reload,
    },
    {
        "name": "browser_close_context",
        "description": "Close a browser context (window).",
        "parameters": _p(
            {"context_id": {**_S, "description": "ID of the context to close."}},
            ["context_id"],
        ),
        "handler": browser_close_context,
    },

    # --- Interaction ---
    {
        "name": "browser_click",
        "description": "Click on an element on the page.",
        "parameters": _p(
            {
                "selector": {**_S, "description": "Element selector (CSS, XPath, text=...)."},
                "button": _S,
                "click_count": _I,
                "timeout": _I,
                "context_id": _S,
            },
            ["selector"],
        ),
        "handler": browser_click,
    },
    {
        "name": "browser_type",
        "description": "Type text into an input element with optional delay.",
        "parameters": _p(
            {
                "selector": _S,
                "text": _S,
                "delay": _I,
                "clear_first": _B,
                "timeout": _I,
                "context_id": _S,
            },
            ["selector", "text"],
        ),
        "handler": browser_type,
    },
    {
        "name": "browser_fill",
        "description": "Fill an input field instantly (no typing simulation).",
        "parameters": _p(
            {"selector": _S, "value": _S, "timeout": _I, "context_id": _S},
            ["selector", "value"],
        ),
        "handler": browser_fill,
    },
    {
        "name": "browser_fill_form",
        "description": "Fill multiple form fields at once.",
        "parameters": _p(
            {
                "fields": {**_S, "description": "JSON object mapping selectors to values."},
                "timeout": _I,
                "context_id": _S,
            },
            ["fields"],
        ),
        "handler": browser_fill_form,
    },
    {
        "name": "browser_select_option",
        "description": "Select an option from a dropdown/select element.",
        "parameters": _p(
            {
                "selector": _S,
                "value": _S,
                "label": _S,
                "index": _I,
                "timeout": _I,
                "context_id": _S,
            },
            ["selector"],
        ),
        "handler": browser_select_option,
    },
    {
        "name": "browser_press_key",
        "description": "Press a keyboard key (Enter, Tab, Escape, etc.).",
        "parameters": _p(
            {
                "key": {**_S, "description": "Key to press (e.g. Enter, Tab, Control+a)."},
                "selector": _S,
                "timeout": _I,
                "context_id": _S,
            },
            ["key"],
        ),
        "handler": browser_press_key,
    },
    {
        "name": "browser_hover",
        "description": "Hover over an element.",
        "parameters": _p({"selector": _S, "timeout": _I, "context_id": _S}, ["selector"]),
        "handler": browser_hover,
    },
    {
        "name": "browser_scroll",
        "description": "Scroll the page or an element.",
        "parameters": _p(
            {
                "direction": {**_S, "description": "up|down|left|right."},
                "amount": _I,
                "selector": _S,
                "context_id": _S,
            }
        ),
        "handler": browser_scroll,
    },

    # --- Content Extraction ---
    {
        "name": "browser_get_content",
        "description": "Get page content in HTML, text, or markdown format.",
        "parameters": _p(
            {
                "format": {**_S, "description": "html|text|markdown."},
                "selector": _S,
                "context_id": _S,
            }
        ),
        "handler": browser_get_content,
    },
    {
        "name": "browser_get_text",
        "description": "Get the text content of an element.",
        "parameters": _p({"selector": _S, "timeout": _I, "context_id": _S}, ["selector"]),
        "handler": browser_get_text,
    },
    {
        "name": "browser_get_attribute",
        "description": "Get an attribute value from an element.",
        "parameters": _p(
            {
                "selector": _S,
                "attribute": {**_S, "description": "Attribute name (href, src, class, etc.)."},
                "timeout": _I,
                "context_id": _S,
            },
            ["selector", "attribute"],
        ),
        "handler": browser_get_attribute,
    },
    {
        "name": "browser_get_page_info",
        "description": "Get current page URL, title, and viewport info.",
        "parameters": _p({"context_id": _S}),
        "handler": browser_get_page_info,
    },
    {
        "name": "browser_query_selector_all",
        "description": "Find all elements matching a selector.",
        "parameters": _p(
            {
                "selector": _S,
                "attributes": {**_S, "description": "JSON array of attribute names to retrieve."},
                "timeout": _I,
                "context_id": _S,
            },
            ["selector"],
        ),
        "handler": browser_query_selector_all,
    },

    # --- Waiting ---
    {
        "name": "browser_wait_for_selector",
        "description": "Wait for an element to appear, disappear, or change state.",
        "parameters": _p(
            {
                "selector": _S,
                "state": {**_S, "description": "visible|hidden|attached|detached."},
                "timeout": _I,
                "context_id": _S,
            },
            ["selector"],
        ),
        "handler": browser_wait_for_selector,
    },
    {
        "name": "browser_wait_for_load_state",
        "description": "Wait for page load state (load, domcontentloaded, networkidle).",
        "parameters": _p({"state": _S, "timeout": _I, "context_id": _S}),
        "handler": browser_wait_for_load_state,
    },
    {
        "name": "browser_wait_for_url",
        "description": "Wait for the URL to match a pattern.",
        "parameters": _p({"url_pattern": _S, "timeout": _I, "context_id": _S}, ["url_pattern"]),
        "handler": browser_wait_for_url,
    },
    {
        "name": "browser_wait",
        "description": "Wait for a specified number of milliseconds.",
        "parameters": _p(
            {
                "milliseconds": {**_I, "description": "Time to wait (0-60000)."},
                "context_id": _S,
            },
            ["milliseconds"],
        ),
        "handler": browser_wait,
    },

    # --- Screenshots & PDF ---
    {
        "name": "browser_screenshot",
        "description": "Take a screenshot of the page or element.",
        "parameters": _p({"full_page": _B, "selector": _S, "filename": _S, "context_id": _S}),
        "handler": browser_screenshot,
    },
    {
        "name": "browser_pdf",
        "description": "Generate a PDF of the current page.",
        "parameters": _p(
            {
                "filename": _S,
                "format": {**_S, "description": "A4|Letter|Legal|Tabloid|A3|A5."},
                "landscape": _B,
                "print_background": _B,
                "context_id": _S,
            }
        ),
        "handler": browser_pdf,
    },

    # --- JavaScript ---
    {
        "name": "browser_evaluate",
        "description": "Evaluate JavaScript and return the result.",
        "parameters": _p({"expression": _S, "context_id": _S}, ["expression"]),
        "handler": browser_evaluate,
    },
    {
        "name": "browser_execute",
        "description": "Execute JavaScript code on the page.",
        "parameters": _p({"script": _S, "context_id": _S}, ["script"]),
        "handler": browser_execute,
    },

    # --- Cookies ---
    {
        "name": "browser_get_cookies",
        "description": "Get browser cookies.",
        "parameters": _p(
            {
                "urls": {**_S, "description": "JSON array of URLs to filter cookies."},
                "context_id": _S,
            }
        ),
        "handler": browser_get_cookies,
    },
    {
        "name": "browser_set_cookies",
        "description": "Set browser cookies.",
        "parameters": _p(
            {
                "cookies": {**_S, "description": "JSON array of cookie objects."},
                "context_id": _S,
            },
            ["cookies"],
        ),
        "handler": browser_set_cookies,
    },
    {
        "name": "browser_clear_cookies",
        "description": "Clear all browser cookies.",
        "parameters": _p({"context_id": _S}),
        "handler": browser_clear_cookies,
    },

    # --- Advanced ---
    {
        "name": "browser_emulate_device",
        "description": "Emulate a mobile device or tablet.",
        "parameters": _p(
            {
                "device_name": {**_S, "description": "Device name (e.g. iPhone 12, Pixel 5)."},
                "context_id": _S,
            },
            ["device_name"],
        ),
        "handler": browser_emulate_device,
    },
    {
        "name": "browser_set_geolocation",
        "description": "Set the browser's geolocation.",
        "parameters": _p(
            {"latitude": _N, "longitude": _N, "accuracy": _N, "context_id": _S},
            ["latitude", "longitude"],
        ),
        "handler": browser_set_geolocation,
    },
    {
        "name": "browser_set_viewport",
        "description": "Set the browser viewport size.",
        "parameters": _p({"width": _I, "height": _I, "context_id": _S}, ["width", "height"]),
        "handler": browser_set_viewport,
    },
    {
        "name": "browser_block_resources",
        "description": "Block images, CSS, fonts, or other resources.",
        "parameters": _p(
            {
                "resource_types": {**_S, "description": "JSON array of types: image|stylesheet|font|script|media|websocket."},
                "context_id": _S,
            },
            ["resource_types"],
        ),
        "handler": browser_block_resources,
    },
    {
        "name": "browser_new_context",
        "description": "Create a new browser context (like incognito window).",
        "parameters": _p({"context_id": _S, "incognito": _B}),
        "handler": browser_new_context,
    },

    # --- Status & Management ---
    {
        "name": "browser_get_status",
        "description": "Get the browser manager status.",
        "parameters": _p({}),
        "handler": browser_get_status,
    },
    {
        "name": "browser_restart",
        "description": "Restart the browser instance.",
        "parameters": _p({}),
        "handler": browser_restart,
    },

    # --- Network Interception ---
    {
        "name": "browser_enable_response_interception",
        "description": "Enable interception of network responses matching URL patterns.",
        "parameters": _p(
            {
                "url_patterns": {**_S, "description": "JSON array of glob patterns."},
                "context_id": _S,
            },
            ["url_patterns"],
        ),
        "handler": browser_enable_response_interception,
    },
    {
        "name": "browser_wait_for_response",
        "description": "Wait for a specific network response and capture its body.",
        "parameters": _p({"url_pattern": _S, "timeout": _I, "context_id": _S}, ["url_pattern"]),
        "handler": browser_wait_for_response,
    },
    {
        "name": "browser_get_intercepted_responses",
        "description": "Get all captured network responses.",
        "parameters": _p({"context_id": _S, "url_filter": _S}),
        "handler": browser_get_intercepted_responses,
    },
    {
        "name": "browser_get_response_body",
        "description": "Get the full body of an intercepted response.",
        "parameters": _p({"response_id": _S}, ["response_id"]),
        "handler": browser_get_response_body,
    },
    {
        "name": "browser_clear_intercepted_responses",
        "description": "Clear stored intercepted responses.",
        "parameters": _p({"context_id": _S}),
        "handler": browser_clear_intercepted_responses,
    },

    # --- Session Persistence ---
    {
        "name": "browser_create_persistent_context",
        "description": "Create a browser context with persistent session storage.",
        "parameters": _p({"session_name": _S, "context_id": _S}, ["session_name"]),
        "handler": browser_create_persistent_context,
    },
    {
        "name": "browser_list_sessions",
        "description": "List all saved browser sessions.",
        "parameters": _p({}),
        "handler": browser_list_sessions,
    },
    {
        "name": "browser_delete_session",
        "description": "Delete a saved browser session.",
        "parameters": _p({"session_name": _S}, ["session_name"]),
        "handler": browser_delete_session,
    },

    # --- Downloads ---
    {
        "name": "browser_wait_for_download",
        "description": "Wait for a file download to complete.",
        "parameters": _p({"timeout": _I, "context_id": _S}),
        "handler": browser_wait_for_download,
    },
    {
        "name": "browser_get_downloads",
        "description": "Get list of all captured downloads.",
        "parameters": _p(
            {
                "context_id": _S,
                "state": {**_S, "description": "Filter: completed|pending|failed."},
            }
        ),
        "handler": browser_get_downloads,
    },
    {
        "name": "browser_save_download",
        "description": "Get download info or rename a downloaded file.",
        "parameters": _p({"download_id": _S, "filename": _S}, ["download_id"]),
        "handler": browser_save_download,
    },

    # --- Console & Dialog ---
    {
        "name": "browser_get_console_logs",
        "description": "Get captured console log messages from the browser.",
        "parameters": _p(
            {
                "context_id": _S,
                "level": {**_S, "description": "Filter: log|warn|error|info|debug."},
                "limit": _I,
            }
        ),
        "handler": browser_get_console_logs,
    },
    {
        "name": "browser_clear_console_logs",
        "description": "Clear captured console logs.",
        "parameters": _p({"context_id": _S}),
        "handler": browser_clear_console_logs,
    },
    {
        "name": "browser_handle_dialog",
        "description": "Configure how to handle JavaScript alerts, confirms, and prompts.",
        "parameters": _p(
            {
                "action": {**_S, "description": "accept|dismiss|manual."},
                "prompt_text": _S,
                "context_id": _S,
            },
            ["action"],
        ),
        "handler": browser_handle_dialog,
    },
    {
        "name": "browser_get_pending_dialog",
        "description": "Get information about the last JavaScript dialog.",
        "parameters": _p({"context_id": _S}),
        "handler": browser_get_pending_dialog,
    },
]