Source code for tools.winrm_tools

"""WinRM and PowerShell remoting (WS-Man) tools for remote Windows management.

Uses pywinrm (HTTP/HTTPS to ports 5985/5986). Per-user named sessions, same
ergonomics as ssh_* tools. Requires UNSANDBOXED_EXEC.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import re
from typing import Any, TYPE_CHECKING

from tools._credential_profile_store import (
    delete_profile as _cred_delete,
    list_profile_names as _cred_list,
    load_profile as _cred_load,
    merge_profile as _cred_merge,
    save_profile as _cred_save,
)

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

CRED_PREFIX = "winrm"

try:
    import winrm  # type: ignore[import-untyped]

    WINRM_AVAILABLE = True
except ImportError:
    winrm = None  # type: ignore[assignment]
    WINRM_AVAILABLE = False

_winrm_sessions: dict[str, dict[str, Any]] = {}

DEFAULT_CONNECT_TIMEOUT = 30
DEFAULT_COMMAND_TIMEOUT = 120

_HOST_RE = re.compile(r"^[0-9A-Za-z._-]+$")
_VALID_TRANSPORTS = frozenset(
    {
        "ntlm",
        "credssp",
        "basic",
        "kerberos",
        "plaintext",
        "ssl",
        "certificate",
    }
)


[docs] async def check_unsandboxed_exec(ctx: Any) -> str | None: """Gate a WinRM operation behind the ``UNSANDBOXED_EXEC`` privilege. Verifies that the calling user is allowed to run arbitrary remote commands. Every WinRM tool handler calls this first and returns its result verbatim when it is non-``None``, short-circuiting the operation. Interactions: imports and calls :func:`tools.alter_privileges.has_privilege` with ``PRIVILEGES["UNSANDBOXED_EXEC"]``, reading ``ctx.redis``, ``ctx.config`` and ``ctx.user_id`` off the tool context (the privilege check itself consults the Redis-backed privilege store). On denial it logs a ``SECURITY`` warning naming the user. This helper is also imported and reused by the sibling Windows tool modules ``tools.ad_dns_tools``, ``tools.ad_gpo_tools`` and ``tools.agpm_tools``. Called by: every handler in this module (``_winrm_connect``, ``_winrm_run_ps``, ``_winrm_run_cmd``, ``_winrm_save_credentials``, etc.) and by the AD/GPO/AGPM tool handlers listed above. Args: ctx: The active :class:`~tool_context.ToolContext` (or any object exposing ``redis``, ``config`` and ``user_id`` attributes). Returns: ``None`` when the user holds the privilege (proceed). Otherwise a JSON string with ``success: False`` describing either the missing privilege or that the privilege system is unavailable. """ try: from tools.alter_privileges import has_privilege, PRIVILEGES redis = getattr(ctx, "redis", None) config = getattr(ctx, "config", None) user_id = getattr(ctx, "user_id", "") or "" if not await has_privilege( redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config ): logger.warning( "SECURITY: User %s attempted WinRM tool without UNSANDBOXED_EXEC", user_id, ) return json.dumps( { "success": False, "error": ( "The user does not have the UNSANDBOXED_EXEC privilege. " "Ask an admin to grant it with the alter_privileges tool." ), } ) except ImportError: return json.dumps( { "success": False, "error": "Privilege system unavailable.", } ) return None
def _is_owner(name: str, user_id: str) -> bool: """Return whether the session named *name* belongs to *user_id*. Enforces per-user isolation of WinRM sessions by comparing the stored ``owner_id`` against the caller, reading the module-level ``_winrm_sessions`` registry. Called by the ownership checks in ``resolve_owned_winrm_session``, ``_winrm_run_cmd``, ``_winrm_disconnect`` and other handlers before they act on a session. Args: name: Connection label to look up. user_id: Identifier of the requesting user. Returns: ``True`` if a session with that name exists and its ``owner_id`` matches, else ``False``. """ info = _winrm_sessions.get(name) return info is not None and info.get("owner_id") == user_id def _user_sessions(user_id: str) -> dict[str, dict[str, Any]]: """Return the subset of ``_winrm_sessions`` owned by *user_id*. Filters the global session registry so callers only see their own connections. Called by ``resolve_owned_winrm_session`` and ``_winrm_run_cmd`` to list a user's connection names in error messages, by ``_winrm_list_connections`` to enumerate active sessions, and indirectly when reporting "Your connections". Args: user_id: Identifier of the requesting user. Returns: A new dict mapping connection name to its session-info dict for every session whose ``owner_id`` equals *user_id*. """ return { n: info for n, info in _winrm_sessions.items() if info.get("owner_id") == user_id } def _host_ok(host: str) -> bool: """Validate a target host string against ``_HOST_RE`` before connecting. Rejects empty, overlong (>512 chars) or syntactically invalid hostnames/IPs so they are never interpolated into a WinRM endpoint URL. Called by ``_winrm_connect`` to guard the resolved host (this is a module-local check, distinct from the identically named helper in ``tools.ipmi_tools``). Args: host: Candidate hostname or IP address. Returns: ``True`` if the trimmed host is non-empty, at most 512 characters and matches the allowed character set, else ``False``. """ h = (host or "").strip() if not h or len(h) > 512: return False return bool(_HOST_RE.fullmatch(h)) def _decode_out(data: bytes) -> str: """Decode raw WinRM stdout/stderr bytes to text, never raising. Converts the ``std_out``/``std_err`` byte payloads returned by pywinrm into a UTF-8 string, replacing undecodable bytes rather than failing. Called by ``execute_winrm_ps`` and ``_winrm_run_cmd`` to normalise command output before it is placed in the JSON result. Args: data: Raw output bytes (may be empty or ``None``-ish/falsy). Returns: The decoded string, or ``""`` when *data* is empty. """ if not data: return "" return data.decode("utf-8", errors="replace")
[docs] def resolve_owned_winrm_session( connection_name: str, user_id: str ) -> tuple[Any, str | None]: """Look up a WinRM session and enforce that the caller owns it. Central ownership-and-existence gate shared by the run handlers: it confirms that ``connection_name`` is present in the registry and was opened by ``user_id`` before any command is allowed to run on it, returning a ready-to-send JSON error string (rather than raising) when it is not. Interactions: reads the module-level ``_winrm_sessions`` registry, listing the user's own connection names via :func:`_user_sessions` for a helpful error, and checks ownership with :func:`_is_owner`; no network or Redis use. Called by :func:`get_owned_winrm_session` and :func:`_winrm_run_ps` here, and reused by the sibling Windows tool modules ``tools.ad_gpo_tools``, ``tools.ad_dns_tools`` and ``tools.agpm_tools``. Args: connection_name: Label of the session to resolve. user_id: Identifier of the requesting user, used for the ownership check. Returns: tuple[Any, str | None]: ``(session, None)`` when the named session exists and is owned by the caller; otherwise ``(None, json_error_string)`` whose message distinguishes an unknown name (listing the user's connections, or noting there are none) from a connection owned by someone else. """ if connection_name not in _winrm_sessions: mine = list(_user_sessions(user_id).keys()) if mine: return None, json.dumps( { "success": False, "error": f"No connection '{connection_name}'. Your connections: {mine}", } ) return None, json.dumps( { "success": False, "error": "No active WinRM sessions. Use winrm_connect first.", } ) if not _is_owner(connection_name, user_id): return None, json.dumps({"success": False, "error": "Not your connection."}) return _winrm_sessions[connection_name]["session"], None
[docs] def get_owned_winrm_session(connection_name: str, user_id: str) -> Any | None: """Return the owned pywinrm session, discarding the error string. Thin convenience wrapper over :func:`resolve_owned_winrm_session` for callers that only want the session object and treat any failure (unknown name or wrong owner) uniformly as ``None``. Interactions: delegates entirely to :func:`resolve_owned_winrm_session` (and thus to the ``_winrm_sessions`` registry, :func:`_user_sessions` and :func:`_is_owner`); no other side effects. No callers were found in the repo, so this is a public convenience entry point retained for sibling tool modules and tests. Args: connection_name: Label of the session to resolve. user_id: Identifier of the requesting user, used for the ownership check. Returns: Any | None: The live pywinrm ``Session`` when it exists and is owned by the caller, otherwise ``None``. """ sess, err = resolve_owned_winrm_session(connection_name, user_id) if err is not None: return None return sess
[docs] async def execute_winrm_ps(session: Any, script: str, timeout: int) -> dict[str, Any]: """Run a PowerShell script on a connected WinRM session without blocking. Async wrapper around pywinrm's blocking ``run_ps``: it derives the WS-Man read/operation timeouts from ``timeout``, off-loads the synchronous remote call to a worker thread, and normalises the result (or any failure) into a plain dict so handlers never have to catch exceptions themselves. Interactions: invokes the remote Windows host's PowerShell over HTTP/HTTPS by running the nested ``_run`` closure through ``asyncio.to_thread`` under an ``asyncio.wait_for`` guard; decodes the returned ``std_out``/``std_err`` bytes with :func:`_decode_out`; logs an exception via the module ``logger`` on error. Called by :func:`_winrm_run_ps` here and by the AD/GPO/AGPM tool handlers in ``tools.ad_gpo_tools``, ``tools.ad_dns_tools`` and ``tools.agpm_tools``. Args: session: A connected pywinrm ``Session`` (typically from :func:`resolve_owned_winrm_session`). script: The PowerShell script text to execute remotely. timeout: Maximum execution time in seconds; the WS-Man and outer ``wait_for`` timeouts are derived from it with margins. Returns: dict[str, Any]: ``{"success": True, "stdout": ..., "stderr": ..., "status_code": ...}`` on success, or ``{"success": False, "error": ...}`` on a timeout or any pywinrm exception. """ op_ts = max(1, int(timeout)) read_ts = op_ts + 30 def _run() -> Any: """Synchronously run the PowerShell *script* on the pywinrm *session*. Blocking helper that first sets the WS-Man protocol's ``read_timeout_sec`` and ``operation_timeout_sec`` on the captured *session* (using the ``read_ts``/``op_ts`` computed by the enclosing :func:`execute_winrm_ps`), then calls the blocking ``session.run_ps(script)``, which performs the remote PowerShell invocation over HTTP/HTTPS to the Windows host. This closure is off-loaded to a worker thread via ``asyncio.to_thread`` by :func:`execute_winrm_ps` so the event loop is never blocked; it has no other callers. Returns: The pywinrm ``Response`` object exposing ``std_out``, ``std_err`` and ``status_code``. Raises: Exception: Any error raised by pywinrm during the remote call is propagated to :func:`execute_winrm_ps`, which catches it. """ session.protocol.read_timeout_sec = read_ts session.protocol.operation_timeout_sec = op_ts return session.run_ps(script) try: resp = await asyncio.wait_for( asyncio.to_thread(_run), timeout=float(timeout + 35) ) except asyncio.TimeoutError: return { "success": False, "error": f"PowerShell execution timed out after {timeout}s.", } except Exception as exc: logger.exception("execute_winrm_ps failed: %s", exc) return {"success": False, "error": str(exc)} out = _decode_out(resp.std_out) err = _decode_out(resp.std_err) code = getattr(resp, "status_code", 0) return { "success": True, "stdout": out, "stderr": err, "status_code": code, }
def _sync_session_connect( target: str, username: str, password: str, transport: str, server_cert_validation: str, read_timeout_sec: int, operation_timeout_sec: int, ) -> Any: """Construct a blocking pywinrm ``Session`` with the given connection settings. Thin synchronous wrapper around ``winrm.Session(...)`` that exists so the blocking constructor can be off-loaded to a thread. The protocol requires ``read_timeout_sec`` to exceed ``operation_timeout_sec``; the caller computes both. Called by ``_winrm_connect`` via ``asyncio.to_thread`` (and patched in ``tests/test_winrm_tools.py`` / ``tests/test_credential_profile_merges.py``). Args: target: Full WS-Man endpoint URL, e.g. ``http://host:5985/wsman``. username: Authentication principal (e.g. ``DOMAIN\\user``). password: Authentication password. transport: pywinrm auth transport (``ntlm``, ``credssp``, ``kerberos``, ...). server_cert_validation: ``"validate"`` or ``"ignore"`` for TLS certs. read_timeout_sec: HTTP read timeout in seconds (must exceed the op timeout). operation_timeout_sec: WS-Man operation timeout in seconds. Returns: A connected ``winrm.Session`` instance. Raises: AssertionError: If pywinrm is not importable (``winrm`` is ``None``). Exception: Any error raised by ``winrm.Session`` (propagated to the caller, which catches it). """ assert winrm is not None return winrm.Session( target, auth=(username, password), transport=transport, server_cert_validation=server_cert_validation, read_timeout_sec=read_timeout_sec, operation_timeout_sec=operation_timeout_sec, ) async def _winrm_save_credentials( host: str, username: str, password: str = "", profile: str = "default", port: int | None = None, use_ssl: bool = False, transport: str = "ntlm", server_cert_validation: str = "ignore", connect_timeout: int = DEFAULT_CONNECT_TIMEOUT, ctx: "ToolContext | None" = None, ) -> str: """Handler for the ``winrm_save_credentials`` tool: persist a reusable profile. Validates and normalises a WinRM connection bundle (host, username, password, port, SSL, transport, cert policy, timeout) and stores it encrypted under a named profile private to the calling user, so later ``winrm_connect`` calls can reference it via ``credential_profile``. Interactions: gates on :func:`check_unsandboxed_exec`; delegates persistence to ``tools._credential_profile_store.save_profile`` (imported as ``_cred_save``) with ``CRED_PREFIX = "winrm"``, which encrypts the JSON and writes it to the Redis hash ``stargazer:winrm_credentials:{user_id}`` under the lowercased profile name. The returned summary echoes non-secret fields and a ``has_password`` flag but never the password itself. Called by: the tool-dispatch layer via the ``handler`` entry of the ``winrm_save_credentials`` definition in ``TOOLS``; no direct internal callers. Args: host: Target Windows hostname or IP (required). username: Authentication principal, e.g. ``DOMAIN\\user`` (required). password: Password; required unless ``transport`` is ``kerberos``. profile: Profile name to store under (default ``"default"``). port: WinRM port; defaults to 5986 when ``use_ssl`` else 5985. use_ssl: Whether to use HTTPS WinRM. transport: pywinrm auth transport (normalised to lowercase). server_cert_validation: ``"validate"`` or ``"ignore"`` (normalised). connect_timeout: Connect timeout in seconds. ctx: Active tool context; required for auth and Redis access. Returns: A JSON string. On success ``success: True`` plus the stored profile name and the echoed non-secret connection fields; on failure ``success: False`` with an ``error`` describing the missing context, privilege, or validation problem. """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) auth_err = await check_unsandboxed_exec(ctx) if auth_err: return auth_err if not host or not host.strip(): return json.dumps({"success": False, "error": "host is required."}) if not username or not username.strip(): return json.dumps({"success": False, "error": "username is required."}) tnorm = (transport or "ntlm").strip().lower() if tnorm != "kerberos" and (password is None or str(password).strip() == ""): return json.dumps( { "success": False, "error": "password is required for this transport (kerberos may omit).", } ) p = port if port is not None else (5986 if use_ssl else 5985) data = { "host": host.strip(), "username": username.strip(), "password": str(password), "port": int(p), "use_ssl": bool(use_ssl), "transport": tnorm, "server_cert_validation": (server_cert_validation or "ignore").strip().lower(), "connect_timeout": int(connect_timeout), } extra = await _cred_save(CRED_PREFIX, profile, data, ctx) out = json.loads(extra) if out.get("success"): out["host"] = data["host"] out["username"] = data["username"] out["has_password"] = bool(str(password).strip()) out["port"] = data["port"] out["use_ssl"] = data["use_ssl"] out["transport"] = data["transport"] return json.dumps(out) async def _winrm_list_credentials(ctx: "ToolContext | None" = None) -> str: """Handler for ``winrm_list_credentials``: list the user's saved profile names. Returns the names of stored WinRM credential profiles without exposing any secrets. Gates on :func:`check_unsandboxed_exec`, then delegates to ``tools._credential_profile_store.list_profile_names`` (imported as ``_cred_list``) with ``CRED_PREFIX = "winrm"``, which reads the field names of the Redis hash ``stargazer:winrm_credentials:{user_id}``. Called by: the ``winrm_list_credentials`` tool handler in ``TOOLS``; no direct internal callers. Args: ctx: Active tool context; required for auth and Redis access. Returns: A JSON string listing the profile names, or an error object when context is missing or the privilege check fails. """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) auth_err = await check_unsandboxed_exec(ctx) if auth_err: return auth_err return await _cred_list(CRED_PREFIX, ctx) async def _winrm_delete_credentials( profile: str = "default", ctx: "ToolContext | None" = None, ) -> str: """Handler for ``winrm_delete_credentials``: remove a saved credential profile. Deletes one named WinRM profile belonging to the calling user. Gates on :func:`check_unsandboxed_exec`, then delegates to ``tools._credential_profile_store.delete_profile`` (imported as ``_cred_delete``) with ``CRED_PREFIX = "winrm"``, which removes the field from the Redis hash ``stargazer:winrm_credentials:{user_id}``. Called by: the ``winrm_delete_credentials`` tool handler in ``TOOLS``; no direct internal callers. Args: profile: Name of the profile to delete (default ``"default"``). ctx: Active tool context; required for auth and Redis access. Returns: A JSON string reporting the deletion result, or an error object when context is missing or the privilege check fails. """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) auth_err = await check_unsandboxed_exec(ctx) if auth_err: return auth_err return await _cred_delete(CRED_PREFIX, profile, ctx) async def _winrm_connect( host: str = "", username: str = "", password: str = "", connection_name: str = "default", port: int | None = None, use_ssl: bool = False, transport: str = "ntlm", server_cert_validation: str = "ignore", connect_timeout: int = DEFAULT_CONNECT_TIMEOUT, credential_profile: str = "", ctx: "ToolContext | None" = None, ) -> str: """Handler for ``winrm_connect``: open and register a named WinRM session. Resolves connection settings (optionally merging in a saved ``credential_profile``, with explicit kwargs taking precedence), validates host/username/transport/cert policy/port, then builds a pywinrm session and stores it in the module-level ``_winrm_sessions`` registry keyed by ``connection_name`` so later ``winrm_run_ps``/``winrm_run_cmd`` calls can reuse it. Interactions: gates on :func:`check_unsandboxed_exec`; when a profile is given, loads it via ``_cred_load`` (decrypting from Redis hash ``stargazer:winrm_credentials:{user_id}``) and merges with ``_cred_merge``; validates the host with :func:`_host_ok` and the transport against ``_VALID_TRANSPORTS``; constructs the session by off-loading :func:`_sync_session_connect` to a thread under an ``asyncio.wait_for`` timeout; records the session (with its ``owner_id`` set to ``ctx.user_id``) in ``_winrm_sessions`` and logs success. Refuses to overwrite an existing connection name. Called by: the ``winrm_connect`` tool handler in ``TOOLS`` and exercised directly in ``tests/test_winrm_tools.py`` and ``tests/test_credential_profile_merges.py``; no production internal callers. Args: host: Hostname or IP (optional if ``credential_profile`` supplies it). username: Auth principal (optional if a profile supplies it). password: Password; required unless ``transport`` is ``kerberos``. connection_name: Label for the registered session (default ``"default"``). port: WinRM port; defaults to 5986 with SSL else 5985. use_ssl: Whether to use HTTPS WinRM. transport: Auth transport; validated against ``_VALID_TRANSPORTS``. server_cert_validation: ``"validate"`` or ``"ignore"``. connect_timeout: Connect timeout in seconds. credential_profile: Name of a saved profile to seed settings from. ctx: Active tool context; required for auth, Redis and ownership. Returns: A JSON string. On success ``success: True`` with the connection name, target URL and transport; on failure ``success: False`` with an ``error`` describing the missing dependency/context/privilege, a validation problem, a name collision, a timeout, or the underlying connection exception. """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) if not WINRM_AVAILABLE: return json.dumps( { "success": False, "error": "pywinrm is not installed. Add: pip install pywinrm", } ) auth_err = await check_unsandboxed_exec(ctx) if auth_err: return auth_err user_id = getattr(ctx, "user_id", "") or "" if credential_profile and str(credential_profile).strip(): loaded = await _cred_load(CRED_PREFIX, credential_profile.strip(), ctx) if isinstance(loaded, str): return json.dumps({"success": False, "error": loaded}) merged = _cred_merge( loaded, { "host": host, "username": username, "password": password, "port": port, "use_ssl": use_ssl, "transport": transport, "server_cert_validation": server_cert_validation, "connect_timeout": connect_timeout, }, ) host = str(merged.get("host") or "") username = str(merged.get("username") or "") password = str(merged.get("password") or "") if merged.get("port") is not None: port = merged.get("port") # type: ignore[assignment] if "use_ssl" in merged: use_ssl = bool(merged["use_ssl"]) if merged.get("transport"): transport = str(merged["transport"]) if merged.get("server_cert_validation"): server_cert_validation = str(merged["server_cert_validation"]) if merged.get("connect_timeout") is not None: connect_timeout = int(merged["connect_timeout"]) if not _host_ok(host): return json.dumps({"success": False, "error": "Invalid or empty host."}) if not username or not username.strip(): return json.dumps({"success": False, "error": "Username is required."}) tnorm = (transport or "ntlm").strip().lower() if tnorm not in _VALID_TRANSPORTS: return json.dumps( { "success": False, "error": f"Invalid transport. Use one of: {', '.join(sorted(_VALID_TRANSPORTS))}", } ) if tnorm != "kerberos" and (password is None or str(password).strip() == ""): return json.dumps( { "success": False, "error": "password is required for this transport (kerberos may omit).", } ) scv = (server_cert_validation or "ignore").strip().lower() if scv not in ("validate", "ignore"): return json.dumps( { "success": False, "error": "server_cert_validation must be 'validate' or 'ignore'.", } ) if connection_name in _winrm_sessions: ex = _winrm_sessions[connection_name] return json.dumps( { "success": False, "error": ( f"Connection '{connection_name}' already exists " f"({ex.get('username')}@{ex.get('target')}). Disconnect first or pick another name." ), } ) use_ssl_b = bool(use_ssl) p = port if p is None: p = 5986 if use_ssl_b else 5985 if not isinstance(p, int) or p < 1 or p > 65535: return json.dumps({"success": False, "error": "port must be 1–65535."}) scheme = "https" if use_ssl_b else "http" target = f"{scheme}://{host.strip()}:{p}/wsman" # Protocol requires read_timeout_sec > operation_timeout_sec (both seconds). op_ts = max(1, int(connect_timeout)) read_ts = op_ts + 15 try: session = await asyncio.wait_for( asyncio.to_thread( _sync_session_connect, target, username.strip(), password, tnorm, scv, read_ts, op_ts, ), timeout=float(connect_timeout + 30), ) except asyncio.TimeoutError: return json.dumps( { "success": False, "error": f"Connection timed out after {connect_timeout}s.", } ) except Exception as exc: logger.exception("WinRM connect failed: %s", exc) return json.dumps({"success": False, "error": f"Connection failed: {exc}"}) _winrm_sessions[connection_name] = { "session": session, "target": target, "host": host.strip(), "port": p, "username": username.strip(), "transport": tnorm, "owner_id": user_id, } logger.info( "WinRM connected %s as '%s' by user %s", target, connection_name, user_id, ) return json.dumps( { "success": True, "message": f"Connected WinRM to {target}", "connection_name": connection_name, "target": target, "transport": tnorm, } ) async def _winrm_run_ps( script: str, connection_name: str = "default", timeout: int = DEFAULT_COMMAND_TIMEOUT, ctx: "ToolContext | None" = None, ) -> str: """Handler for ``winrm_run_ps``: run a PowerShell script on a named session. Looks up the caller's WinRM session and executes the supplied PowerShell script block remotely, returning its stdout/stderr/exit code. Interactions: gates on :func:`check_unsandboxed_exec`; resolves and ownership-checks the session via :func:`resolve_owned_winrm_session`; runs the script through :func:`execute_winrm_ps` (which off-loads the blocking pywinrm ``run_ps`` call to a thread under a timeout). Called by: the ``winrm_run_ps`` tool handler in ``TOOLS`` and ``tests/test_winrm_tools.py``; no production internal callers. Args: script: PowerShell script text to execute (required, non-empty). connection_name: Name of the session to run on (default ``"default"``). timeout: Maximum execution time in seconds. ctx: Active tool context; required for auth and ownership. Returns: A JSON string. On success ``success: True`` with the connection name, ``stdout``, ``stderr`` and ``status_code``; on failure ``success: False`` with an ``error`` (missing dependency/context/privilege, empty script, unknown/unowned connection, or an execution error/timeout). """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) if not WINRM_AVAILABLE: return json.dumps({"success": False, "error": "pywinrm is not installed."}) auth_err = await check_unsandboxed_exec(ctx) if auth_err: return auth_err user_id = getattr(ctx, "user_id", "") or "" if not script or not str(script).strip(): return json.dumps({"success": False, "error": "script is required."}) sess, sess_err = resolve_owned_winrm_session(connection_name, user_id) if sess_err is not None: return sess_err result = await execute_winrm_ps(sess, script, timeout) if not result.get("success"): return json.dumps({"success": False, "error": result.get("error", "unknown")}) return json.dumps( { "success": True, "connection_name": connection_name, "stdout": result["stdout"], "stderr": result["stderr"], "status_code": result["status_code"], } ) async def _winrm_run_cmd( command: str, arguments: list[str] | None = None, connection_name: str = "default", timeout: int = DEFAULT_COMMAND_TIMEOUT, ctx: "ToolContext | None" = None, ) -> str: """Handler for ``winrm_run_cmd``: run a CMD executable on a named session. Executes a plain Windows command (no PowerShell wrapper) with optional arguments on the caller's WinRM session, returning its stdout/stderr/exit code. Interactions: gates on :func:`check_unsandboxed_exec`; performs its own existence and ownership checks against ``_winrm_sessions`` using :func:`_user_sessions` and :func:`_is_owner` (rather than ``resolve_owned_winrm_session``); off-loads the blocking pywinrm ``run_cmd`` call to a thread under an ``asyncio.wait_for`` timeout, configuring the session protocol read/operation timeouts; decodes output via :func:`_decode_out`. Called by: the ``winrm_run_cmd`` tool handler in ``TOOLS``; no internal callers found. Args: command: Executable name to run, e.g. ``ipconfig.exe`` (required). arguments: Optional list of string arguments (defaults to empty). connection_name: Name of the session to run on (default ``"default"``). timeout: Maximum execution time in seconds. ctx: Active tool context; required for auth and ownership. Returns: A JSON string. On success ``success: True`` with the connection name, ``stdout``, ``stderr`` and ``status_code``; on failure ``success: False`` with an ``error`` (missing dependency/context/privilege, empty command, unknown/unowned connection, or an execution error/timeout). """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) if not WINRM_AVAILABLE: return json.dumps({"success": False, "error": "pywinrm is not installed."}) auth_err = await check_unsandboxed_exec(ctx) if auth_err: return auth_err user_id = getattr(ctx, "user_id", "") or "" if not command or not str(command).strip(): return json.dumps({"success": False, "error": "command is required."}) args = arguments or [] if connection_name not in _winrm_sessions: mine = list(_user_sessions(user_id).keys()) if mine: return json.dumps( { "success": False, "error": f"No connection '{connection_name}'. Your connections: {mine}", } ) return json.dumps( { "success": False, "error": "No active WinRM sessions. Use winrm_connect first.", } ) if not _is_owner(connection_name, user_id): return json.dumps({"success": False, "error": "Not your connection."}) sess = _winrm_sessions[connection_name]["session"] op_ts = max(1, int(timeout)) read_ts = op_ts + 30 def _run() -> Any: """Synchronously run the CMD *command* with *args* on the pywinrm session. Blocking helper that sets the WS-Man protocol's ``read_timeout_sec`` and ``operation_timeout_sec`` on the captured *sess* (from the ``read_ts``/``op_ts`` computed by the enclosing :func:`_winrm_run_cmd`), then calls the blocking ``sess.run_cmd(command.strip(), args)`` to execute a plain Windows command (no PowerShell wrapper) on the remote host over HTTP/HTTPS. This closure is off-loaded to a worker thread via ``asyncio.to_thread`` by :func:`_winrm_run_cmd` so the event loop is never blocked; it has no other callers. Returns: The pywinrm ``Response`` object exposing ``std_out``, ``std_err`` and ``status_code``. Raises: Exception: Any error raised by pywinrm during the remote call is propagated to :func:`_winrm_run_cmd`, which catches it. """ sess.protocol.read_timeout_sec = read_ts sess.protocol.operation_timeout_sec = op_ts return sess.run_cmd(command.strip(), args) try: resp = await asyncio.wait_for( asyncio.to_thread(_run), timeout=float(timeout + 35) ) except asyncio.TimeoutError: return json.dumps( { "success": False, "error": f"Command execution timed out after {timeout}s.", } ) except Exception as exc: logger.exception("winrm_run_cmd failed: %s", exc) return json.dumps({"success": False, "error": str(exc)}) out = _decode_out(resp.std_out) err = _decode_out(resp.std_err) code = getattr(resp, "status_code", 0) return json.dumps( { "success": True, "connection_name": connection_name, "stdout": out, "stderr": err, "status_code": code, } ) async def _winrm_disconnect( connection_name: str = "default", ctx: "ToolContext | None" = None, ) -> str: """Handler for ``winrm_disconnect``: close one named WinRM session. Removes a single session owned by the caller from the in-memory ``_winrm_sessions`` registry (pywinrm uses no persistent transport, so forgetting the entry is the disconnect). Interactions: gates on :func:`check_unsandboxed_exec`; verifies ownership with :func:`_is_owner` before deleting the registry entry. Called by: the ``winrm_disconnect`` tool handler in ``TOOLS``; no internal callers found. Args: connection_name: Name of the session to close (default ``"default"``). ctx: Active tool context; required for auth and ownership. Returns: A JSON string with ``success: True`` and a confirmation message, or ``success: False`` with an ``error`` when context/privilege is missing or the connection is unknown or not owned by the caller. """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) auth_err = await check_unsandboxed_exec(ctx) if auth_err: return auth_err user_id = getattr(ctx, "user_id", "") or "" if connection_name not in _winrm_sessions: return json.dumps( {"success": False, "error": f"No connection '{connection_name}'."} ) if not _is_owner(connection_name, user_id): return json.dumps({"success": False, "error": "Not your connection."}) del _winrm_sessions[connection_name] return json.dumps( {"success": True, "message": f"Disconnected '{connection_name}'."} ) async def _winrm_disconnect_all(ctx: "ToolContext | None" = None) -> str: """Handler for ``winrm_disconnect_all``: close every session owned by the user. Removes all of the caller's entries from the in-memory ``_winrm_sessions`` registry in one call, leaving other users' sessions untouched. Interactions: gates on :func:`check_unsandboxed_exec`; filters ``_winrm_sessions`` by ``owner_id`` and deletes each matching entry. Called by: the ``winrm_disconnect_all`` tool handler in ``TOOLS``; no internal callers found. Args: ctx: Active tool context; required for auth and to identify the owner. Returns: A JSON string with ``success: True``, the ``closed`` count and the list of ``names`` removed, or an error object when context/privilege is missing. """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) auth_err = await check_unsandboxed_exec(ctx) if auth_err: return auth_err user_id = getattr(ctx, "user_id", "") or "" names = [n for n, i in _winrm_sessions.items() if i.get("owner_id") == user_id] for n in names: del _winrm_sessions[n] return json.dumps({"success": True, "closed": len(names), "names": names}) async def _winrm_list_connections(ctx: "ToolContext | None" = None) -> str: """Handler for ``winrm_list_connections``: list the caller's active sessions. Returns connection metadata (name, target URL, host, port, username, transport) for every WinRM session the user currently owns, omitting any secrets and the live session object. Interactions: gates on :func:`check_unsandboxed_exec`; enumerates the user's sessions via :func:`_user_sessions` over the ``_winrm_sessions`` registry. Called by: the ``winrm_list_connections`` tool handler in ``TOOLS`` and ``tests/test_winrm_tools.py``; no production internal callers. Args: ctx: Active tool context; required for auth and to identify the owner. Returns: A JSON string with ``success: True``, a ``connections`` list of metadata rows and their ``count``, or an error object when context/privilege is missing. """ if ctx is None: return json.dumps({"success": False, "error": "No tool context."}) auth_err = await check_unsandboxed_exec(ctx) if auth_err: return auth_err user_id = getattr(ctx, "user_id", "") or "" rows = [] for name, info in _user_sessions(user_id).items(): rows.append( { "connection_name": name, "target": info.get("target"), "host": info.get("host"), "port": info.get("port"), "username": info.get("username"), "transport": info.get("transport"), } ) return json.dumps({"success": True, "connections": rows, "count": len(rows)}) TOOLS = [ { "name": "winrm_save_credentials", "description": ( "Save WinRM connection settings (host, user, password, port, SSL, transport) " "encrypted per-user under a named profile. Private to the caller. Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "host": { "type": "string", "description": "Windows target hostname or IP.", }, "username": { "type": "string", "description": "User principal (e.g. DOMAIN\\\\user).", }, "password": { "type": "string", "description": "Password (omit only for kerberos if pool supports it).", }, "profile": { "type": "string", "description": "Profile name (default: default).", "default": "default", }, "port": { "type": "integer", "description": "5985 or 5986; default from use_ssl.", }, "use_ssl": { "type": "boolean", "description": "Use HTTPS WinRM (5986).", "default": False, }, "transport": { "type": "string", "description": "ntlm, credssp, kerberos, etc.", "default": "ntlm", }, "server_cert_validation": { "type": "string", "description": "validate or ignore", "default": "ignore", }, "connect_timeout": { "type": "integer", "description": "Seconds", "default": 30, }, }, "required": ["host", "username"], }, "handler": _winrm_save_credentials, }, { "name": "winrm_list_credentials", "description": ( "List saved WinRM profile names for this user. Does not expose secrets. " "Requires UNSANDBOXED_EXEC." ), "parameters": {"type": "object", "properties": {}}, "handler": _winrm_list_credentials, }, { "name": "winrm_delete_credentials", "description": "Delete a saved WinRM credential profile. Requires UNSANDBOXED_EXEC.", "parameters": { "type": "object", "properties": { "profile": { "type": "string", "description": "Profile name to delete.", "default": "default", }, }, "required": ["profile"], }, "handler": _winrm_delete_credentials, }, { "name": "winrm_connect", "description": ( "Connect to a Windows host via WinRM (WS-Man, PowerShell remoting transport). " "Uses HTTP(S) on 5985/5986. Named persistent session for winrm_run_ps / winrm_run_cmd. " "Requires UNSANDBOXED_EXEC. Optionally set credential_profile to load a saved bundle; " "explicit parameters override the profile." ), "parameters": { "type": "object", "properties": { "host": { "type": "string", "description": "Hostname or IP (optional if credential_profile set).", }, "username": { "type": "string", "description": "User principal (optional if credential_profile set).", }, "password": { "type": "string", "description": "Password (omit only if using kerberos with external auth).", }, "connection_name": { "type": "string", "description": "Label for this session (default: default).", }, "credential_profile": { "type": "string", "description": "Load host/user/password from this saved profile; kwargs override.", "default": "", }, "port": { "type": "integer", "description": "5985 HTTP or 5986 HTTPS; default by use_ssl.", }, "use_ssl": { "type": "boolean", "description": "If true, use https://host:5986 (typical for encrypted WinRM).", }, "transport": { "type": "string", "description": "Auth transport: ntlm, credssp, basic, kerberos, etc. Default ntlm.", }, "server_cert_validation": { "type": "string", "description": "validate or ignore (default ignore for self-signed).", }, "connect_timeout": { "type": "integer", "description": "Connect timeout seconds (default 30).", }, }, "required": [], }, "handler": _winrm_connect, }, { "name": "winrm_run_ps", "description": ( "Run a PowerShell script block on the connected WinRM session (encoded command). " "Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "script": {"type": "string", "description": "PowerShell to execute."}, "connection_name": { "type": "string", "description": "Session name (default: default).", }, "timeout": { "type": "integer", "description": "Max seconds (default 120).", }, }, "required": ["script"], }, "handler": _winrm_run_ps, }, { "name": "winrm_run_cmd", "description": ( "Run a CMD executable with optional arguments via WinRM (no PowerShell wrapper). " "Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "command": { "type": "string", "description": "Executable name (e.g. ipconfig.exe).", }, "arguments": { "type": "array", "items": {"type": "string"}, "description": "Optional argument list.", }, "connection_name": { "type": "string", "description": "Session name (default: default).", }, "timeout": { "type": "integer", "description": "Max seconds (default 120).", }, }, "required": ["command"], }, "handler": _winrm_run_cmd, }, { "name": "winrm_disconnect", "description": "Close a named WinRM session. Requires UNSANDBOXED_EXEC.", "parameters": { "type": "object", "properties": { "connection_name": { "type": "string", "description": "Session name (default: default).", }, }, }, "handler": _winrm_disconnect, }, { "name": "winrm_disconnect_all", "description": "Close all of your WinRM sessions. Requires UNSANDBOXED_EXEC.", "parameters": {"type": "object", "properties": {}}, "handler": _winrm_disconnect_all, }, { "name": "winrm_list_connections", "description": "List your active WinRM sessions. Requires UNSANDBOXED_EXEC.", "parameters": {"type": "object", "properties": {}}, "handler": _winrm_list_connections, }, ]