# Source code for tools.tor_transproxy_tools

"""Per-process transparent Tor proxy via cgroup v2 + iptables NAT.

Moves a PID into a dedicated cgroup v2 slice and adds OUTPUT nat rules that
REDIRECT TCP and DNS (UDP/53) to Tor's TransPort and DNSPort. Linux iptables
cannot match by PID directly; cgroup path matching is used instead.

Requires: Tor with TransPort/DNSPort (see /etc/tor/torrc), cgroup v2 mounted
at /sys/fs/cgroup, and the iptables xt_cgroup match (the cgroup directory must
exist before adding rules). Only TCP and DNS (UDP/53) are redirected to Tor;
other UDP is not routed via Tor.

All tools require the UNSANDBOXED_EXEC privilege.
"""

from __future__ import annotations

import asyncio
import ipaddress
import json
import logging
import re
from pathlib import Path
from typing import Any

from tools._stargazer_pid_cgroup import (
    CGROUP_MOUNT,
    LEGACY_TOR_CGROUP_PARENT,
    STARGAZER_PID_NET_PARENT,
    assign_pid_cgroup,
    cgroup_parent_path,
    cgroup_rel_path,
    release_pid_cgroup,
)

logger = logging.getLogger(__name__)

TORRC = Path("/etc/tor/torrc")
NAT_CHAIN = "STARGAZER_TOR_TRANSPROXY"
STATE_DIR = Path("/var/lib/stargazer")
STATE_FILE = STATE_DIR / "tor_transproxy_state.json"

_DEFAULT_TRANS_PORT = 9040
_DEFAULT_DNS_PORT = 5353


# ---------------------------------------------------------------------------
# Privilege + subprocess (match tor_tools / firewall_tools)
# ---------------------------------------------------------------------------

async def _check_priv(ctx: Any) -> str | None:
    """Check that the calling user holds the UNSANDBOXED_EXEC privilege.

    Reads ``redis``, ``config`` and ``user_id`` from *ctx* (missing
    attributes default to ``None`` / ``""``) and asks
    ``tools.alter_privileges.has_privilege``.

    Returns:
        ``None`` when the privilege is held; otherwise a JSON error string
        that a handler can return to its caller unchanged. A JSON error is
        also returned when the privilege module cannot be imported.
    """
    try:
        # Imported lazily so a missing privilege module yields a JSON error
        # instead of breaking import of this file.
        from tools.alter_privileges import PRIVILEGES, has_privilege

        store = getattr(ctx, "redis", None)
        settings = getattr(ctx, "config", None)
        caller = getattr(ctx, "user_id", "") or ""
        granted = await has_privilege(
            store, caller, PRIVILEGES["UNSANDBOXED_EXEC"], settings
        )
        if granted:
            return None
        denial = (
            "The user does not have the UNSANDBOXED_EXEC privilege. "
            "Ask an admin to grant it with the alter_privileges tool."
        )
        return json.dumps({"success": False, "error": denial})
    except ImportError:
        return json.dumps({"success": False, "error": "Privilege system unavailable."})


async def _run_cmd(cmd: list[str], timeout: float = 30, stdin_data: str = "") -> tuple[bool, str, str]:
    """Run *cmd* without a shell and capture its output.

    Args:
        cmd: Program and arguments, passed straight to exec (no shell).
        timeout: Seconds to wait for the command to finish.
        stdin_data: Text fed to the command's stdin. When empty, no stdin
            pipe is created.

    Returns:
        ``(ok, stdout, stderr)`` where ``ok`` is True only for exit status 0
        and both streams are decoded (undecodable bytes replaced) and
        stripped. Failures to start, timeouts and unexpected errors are
        reported as ``(False, "", message)`` rather than raised.
    """
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.PIPE if stdin_data else None,
        )
        stdin_bytes = stdin_data.encode() if stdin_data else None
        stdout_b, stderr_b = await asyncio.wait_for(
            proc.communicate(input=stdin_bytes), timeout=timeout
        )
        return (
            proc.returncode == 0,
            stdout_b.decode(errors="replace").strip(),
            stderr_b.decode(errors="replace").strip(),
        )
    except asyncio.TimeoutError:
        # wait_for() only cancels communicate(); the child itself keeps
        # running unless it is killed and reaped here.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the timeout and the kill
            await proc.wait()
        return False, "", f"Command timed out after {timeout}s."
    except FileNotFoundError as exc:
        return False, "", f"Command not found: {exc}"
    except Exception as exc:
        logger.error("_run_cmd %s: %s", cmd, exc, exc_info=True)
        return False, "", str(exc)


# ---------------------------------------------------------------------------
# torrc + state
# ---------------------------------------------------------------------------

def _extract_port_from_tor_directive(line: str, directive: str) -> int | None:
    """Pull the port number out of one torrc directive line.

    The leading *directive* keyword (matched case-insensitively) is removed
    and the remaining whitespace-separated tokens are scanned in order:

    * a token starting with ``#`` ends the scan (trailing comment);
    * known flag words are skipped;
    * ``something:port`` yields the integer after the last colon, or is
      skipped if that part is not an integer;
    * a bare all-digit token is the port.

    Returns:
        The first port found, or ``None`` if the line names none.
    """
    flag_words = frozenset(
        "nofetchisolateclientaddr isolateclientaddr isolatesessionprotocol "
        "ipv6traffic preferipv6 autoipv6".split()
    )
    keyword_prefix = rf"^{re.escape(directive)}\s+"
    remainder = re.sub(keyword_prefix, "", line.strip(), flags=re.IGNORECASE)

    for token in remainder.split():
        if token.startswith("#"):
            return None
        if token.lower() in flag_words:
            continue
        if ":" in token:
            _, _, tail = token.rpartition(":")
            try:
                return int(tail)
            except ValueError:
                continue
        if token.isdigit():
            return int(token)
    return None


def parse_tor_trans_dns_ports() -> tuple[int, int]:
    """Read TransPort and DNSPort from torrc; fall back to Tor defaults.

    Blank lines and lines starting with ``#`` are ignored. When a directive
    appears more than once, the last line with a parsable port wins.

    Returns:
        ``(trans_port, dns_port)``. A value stays at the module default when
        torrc is missing or unreadable, or when no matching line yields a
        port.
    """
    trans, dns = _DEFAULT_TRANS_PORT, _DEFAULT_DNS_PORT
    try:
        # errors="replace": stray non-UTF-8 bytes in a comment must not stop
        # the ports from being read.
        text = TORRC.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        return trans, dns
    except OSError as exc:
        # Present but unreadable (permissions, is a directory, ...): report
        # it and use the defaults rather than crashing the calling tool.
        logger.warning("Could not read %s: %s", TORRC, exc)
        return trans, dns

    for raw in text.splitlines():
        s = raw.strip()
        if not s or s.startswith("#"):
            continue
        if re.match(r"TransPort\b", s, re.I):
            p = _extract_port_from_tor_directive(s, "TransPort")
            if p is not None:
                trans = p
        elif re.match(r"DNSPort\b", s, re.I):
            p = _extract_port_from_tor_directive(s, "DNSPort")
            if p is not None:
                dns = p
    return trans, dns
def _load_state() -> dict[str, Any]:
    """Load the persisted managed-PID state.

    Returns:
        The parsed state mapping, or a fresh ``{"managed": {}}`` when the
        state file is missing, unreadable, not valid JSON, or does not hold
        a mapping (with a mapping under ``"managed"`` if that key exists).
    """
    if not STATE_FILE.is_file():
        return {"managed": {}}
    try:
        state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except (ValueError, OSError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("tor transproxy state read failed: %s", e)
        return {"managed": {}}
    # Callers use .get()/.setdefault() on the result, so anything other than
    # a mapping would crash them later.
    if not isinstance(state, dict) or not isinstance(state.get("managed", {}), dict):
        logger.warning("tor transproxy state has unexpected shape; ignoring it")
        return {"managed": {}}
    return state


def _save_state(state: dict[str, Any]) -> None:
    """Write *state* to the state file as indented JSON, creating the directory."""
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps(state, indent=2), encoding="utf-8")


def _validate_cidrs(nets: list[str]) -> str | None:
    """Return an error message for the first unparsable network, else ``None``."""
    for n in nets:
        try:
            ipaddress.ip_network(n, strict=False)
        except ValueError:
            return f"Invalid CIDR: {n!r}"
    return None


# ---------------------------------------------------------------------------
# iptables + cgroup helpers
# ---------------------------------------------------------------------------

async def _iptables_chain_exists() -> bool:
    """Return True when listing the nat chain with ``iptables -S`` succeeds."""
    ok, _, _ = await _run_cmd(["iptables", "-t", "nat", "-S", NAT_CHAIN], timeout=10)
    return ok


async def _ensure_nat_chain_and_jump() -> tuple[bool, str]:
    """Create the nat chain if needed and make sure OUTPUT jumps to it.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, error_message)``.
    """
    if not await _iptables_chain_exists():
        ok, _, err = await _run_cmd(["iptables", "-t", "nat", "-N", NAT_CHAIN], timeout=10)
        if not ok:
            return False, err or "Failed to create nat chain."
    # -C checks for the jump so that it is not appended twice.
    ok, _, _ = await _run_cmd(
        ["iptables", "-t", "nat", "-C", "OUTPUT", "-j", NAT_CHAIN],
        timeout=10,
    )
    if ok:
        return True, ""
    ok, _, err = await _run_cmd(
        ["iptables", "-t", "nat", "-A", "OUTPUT", "-j", NAT_CHAIN],
        timeout=10,
    )
    if not ok:
        return False, err or "Failed to add OUTPUT jump."
    return True, ""


async def _iptables_append(spec: list[str]) -> tuple[bool, str]:
    """Append one rule (argv tail *spec*) to the nat chain; return ``(ok, stderr)``."""
    cmd = ["iptables", "-t", "nat", "-A", NAT_CHAIN, *spec]
    ok, _, err = await _run_cmd(cmd, timeout=10)
    return ok, err or ""


async def _iptables_delete(spec: list[str]) -> tuple[bool, str]:
    """Delete one rule (argv tail *spec*) from the nat chain.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, stderr)``; stderr may
        be empty.
    """
    cmd = ["iptables", "-t", "nat", "-D", NAT_CHAIN, *spec]
    ok, _, err = await _run_cmd(cmd, timeout=10)
    if not ok:
        return False, err or ""
    return True, ""


async def _iptables_delete_rules_matching_pid(pid: int) -> list[str]:
    """Best-effort: remove nat chain rules whose comment contains stg-tor-{pid}-.

    Returns:
        A list of error messages; empty when every matching rule was deleted
        (or none matched).
    """
    import shlex

    errors: list[str] = []
    ok, out, err = await _run_cmd(["iptables", "-t", "nat", "-S", NAT_CHAIN], timeout=10)
    if not ok:
        return [err or "iptables -S chain failed (chain may not exist)."]
    prefix = f"-A {NAT_CHAIN} "
    needle = f"stg-tor-{pid}-"
    for line in out.splitlines():
        if not line.startswith(prefix) or needle not in line:
            continue
        # iptables -S double-quotes values containing characters such as "/"
        # (e.g. the cgroup --path). A plain str.split() would keep the quote
        # characters in the argument and the -D below would never match.
        try:
            spec = shlex.split(line[len(prefix):].strip())
        except ValueError as exc:
            errors.append(f"could not parse rule {line!r}: {exc}")
            continue
        ok_d, e_d = await _iptables_delete(spec)
        if not ok_d:
            errors.append(e_d or "delete failed")
    return errors


async def _remove_output_jump_if_unused() -> None:
    """Remove the OUTPUT jump to the nat chain once no PID is managed any more."""
    state = _load_state()
    if state.get("managed"):
        return
    ok, _, _ = await _run_cmd(
        ["iptables", "-t", "nat", "-C", "OUTPUT", "-j", NAT_CHAIN],
        timeout=10,
    )
    if ok:
        await _run_cmd(["iptables", "-t", "nat", "-D", "OUTPUT", "-j", NAT_CHAIN], timeout=10)


def _build_rule_specs(
    pid: int,
    cgroup_path: str,
    trans_port: int,
    dns_port: int,
    exclude_nets: list[str],
) -> list[list[str]]:
    """Each inner list is argv tail for iptables -A/-D CHAIN.

    Order: one RETURN rule per excluded network, then the TCP redirect to
    *trans_port*, then the UDP/53 redirect to *dns_port*. Every rule matches
    on *cgroup_path* and carries a ``stg-tor-{pid}-...`` comment.
    """
    specs: list[list[str]] = []
    for i, net in enumerate(exclude_nets):
        specs.append(
            [
                "-d", str(ipaddress.ip_network(net, strict=False)),
                "-m", "cgroup", "--path", cgroup_path,
                "-m", "comment", "--comment", f"stg-tor-{pid}-ex-{i}",
                "-j", "RETURN",
            ]
        )
    specs.append(
        [
            "-p", "tcp",
            "-m", "cgroup", "--path", cgroup_path,
            "-m", "comment", "--comment", f"stg-tor-{pid}-tcp",
            "-j", "REDIRECT", "--to-ports", str(trans_port),
        ]
    )
    specs.append(
        [
            "-p", "udp", "--dport", "53",
            "-m", "cgroup", "--path", cgroup_path,
            "-m", "comment", "--comment", f"stg-tor-{pid}-dns",
            "-j", "REDIRECT", "--to-ports", str(dns_port),
        ]
    )
    return specs


async def _assign_pid_cgroup(pid: int) -> tuple[bool, str]:
    """Async wrapper around the shared ``assign_pid_cgroup`` helper."""
    return assign_pid_cgroup(pid)


async def _release_pid_cgroup(pid: int) -> tuple[bool, str]:
    """Async wrapper around the shared ``release_pid_cgroup`` helper."""
    return release_pid_cgroup(pid)


# ---------------------------------------------------------------------------
# Handlers
# ---------------------------------------------------------------------------

async def _tor_transproxy_status(ctx: Any = None) -> str:
    """Report torrc ports, listeners, cgroup/iptables capability and managed PIDs.

    Returns:
        A JSON string. On success it carries the fields built at the end of
        this function; without context or privilege it is a JSON error.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err

    trans_port, dns_port = parse_tor_trans_dns_ports()
    cgroup_ok = CGROUP_MOUNT.is_dir() and (CGROUP_MOUNT / "cgroup.controllers").is_file()
    chain_ok = await _iptables_chain_exists()

    ok_cgroup_match = False
    cgroup_match_err = ""
    if cgroup_ok:
        # Probe: add and immediately remove a rule that uses the cgroup
        # match against a temporary cgroup directory.
        test_leaf = CGROUP_MOUNT / "stargazer-iptables-probe"
        try:
            test_leaf.mkdir(exist_ok=True)
            ok_probe, _, e2 = await _run_cmd(
                [
                    "iptables", "-t", "nat", "-A", "OUTPUT",
                    "-m", "cgroup", "--path", "stargazer-iptables-probe",
                    "-j", "RETURN",
                ],
                timeout=10,
            )
            if ok_probe:
                ok_cgroup_match = True
                await _run_cmd(
                    [
                        "iptables", "-t", "nat", "-D", "OUTPUT",
                        "-m", "cgroup", "--path", "stargazer-iptables-probe",
                        "-j", "RETURN",
                    ],
                    timeout=10,
                )
            else:
                cgroup_match_err = e2 or "iptables cgroup match failed"
        except OSError:
            cgroup_match_err = "Could not create probe cgroup directory"
        finally:
            try:
                test_leaf.rmdir()
            except OSError:
                pass

    ok_save, save_out, _ = await _run_cmd(["iptables-save", "-t", "nat"], timeout=15)
    chain_dump = ""
    if ok_save:
        chain_dump = "\n".join(
            ln for ln in save_out.splitlines()
            if NAT_CHAIN in ln
            or f"{STARGAZER_PID_NET_PARENT}/" in ln
            or f"{LEGACY_TOR_CGROUP_PARENT}/" in ln
        )

    ok_ss, ss_out, _ = await _run_cmd(
        ["ss", "-lnptuH"], timeout=10
    )
    listeners = []
    if ok_ss:
        # Require the port to end at a non-digit so that e.g. ":9040" does
        # not also match a listener on ":90401".
        port_re = re.compile(rf":(?:{trans_port}|{dns_port})(?!\d)")
        listeners = [line.strip() for line in ss_out.splitlines() if port_re.search(line)]

    state = _load_state()
    managed = state.get("managed", {})
    legacy_paths = any(
        isinstance(v, dict)
        and LEGACY_TOR_CGROUP_PARENT in str(v.get("cgroup_rel_path", ""))
        for v in managed.values()
    )

    return json.dumps({
        "success": True,
        "torrc_trans_port": trans_port,
        "torrc_dns_port": dns_port,
        "cgroup_parent": STARGAZER_PID_NET_PARENT,
        "legacy_tor_cgroup_paths_in_state": legacy_paths,
        "cgroup_v2_mounted": cgroup_ok,
        "iptables_cgroup_match_ok": ok_cgroup_match,
        "iptables_cgroup_match_error": cgroup_match_err or None,
        "nat_chain_exists": chain_ok,
        "tor_listeners": listeners,
        "managed_pids": list(managed.keys()),
        "managed_detail": managed,
        "nat_chain_matching_lines": chain_dump or "(none)",
        "note": (
            "UDP other than DNS/53 is not sent through Tor. "
            "These tools edit iptables nat directly (not UFW). "
            f"Per-PID cgroup leaf is {STARGAZER_PID_NET_PARENT}/<pid> (shared with pid_vpn_route_*). "
            "If cgroup_rel_path still shows the old tor-proxy name, disable and re-enable to migrate."
        ),
    }, indent=2)


async def _tor_transproxy_enable(
    pid: int,
    exclude_nets: list[str] | None = None,
    ctx: Any = None,
) -> str:
    """Move *pid* into its cgroup and add the nat rules redirecting it to Tor.

    Args:
        pid: Process ID to proxy; must exist under ``/proc``.
        exclude_nets: Destination networks that get a RETURN rule ahead of
            the redirects. ``None`` means ``["127.0.0.0/8"]``.
        ctx: Tool context used for the privilege check.

    Returns:
        A JSON string describing success or the error. If any step after the
        cgroup assignment fails, the rules added so far are deleted and the
        PID is released from the cgroup again.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err
    if pid < 1:
        return json.dumps({"success": False, "error": "pid must be a positive integer."})
    if not Path(f"/proc/{pid}").is_dir():
        return json.dumps({"success": False, "error": f"Process {pid} does not exist."})

    nets = list(exclude_nets) if exclude_nets is not None else ["127.0.0.0/8"]
    bad = _validate_cidrs(nets)
    if bad:
        return json.dumps({"success": False, "error": bad})

    state = _load_state()
    managed = state.setdefault("managed", {})
    if str(pid) in managed:
        return json.dumps({
            "success": False,
            "error": f"PID {pid} is already managed. Use tor_transproxy_disable first.",
        })

    if not CGROUP_MOUNT.is_dir():
        return json.dumps({"success": False, "error": "/sys/fs/cgroup not available."})

    cg_rel = cgroup_rel_path(pid)
    trans_port, dns_port = parse_tor_trans_dns_ports()
    specs = _build_rule_specs(pid, cg_rel, trans_port, dns_port, nets)

    try:
        cgroup_parent_path().mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        return json.dumps({
            "success": False,
            "error": f"Could not create cgroup parent directory: {exc}",
        })
    ok_cg, e_cg = await _assign_pid_cgroup(pid)
    if not ok_cg:
        return json.dumps({"success": False, "error": e_cg})

    added: list[list[str]] = []
    try:
        ok_j, e_j = await _ensure_nat_chain_and_jump()
        if not ok_j:
            raise RuntimeError(e_j)
        for spec in specs:
            ok_a, e_a = await _iptables_append(spec)
            if not ok_a:
                raise RuntimeError(e_a or "iptables -A failed")
            added.append(spec)
        managed[str(pid)] = {
            "cgroup_rel_path": cg_rel,
            "exclude_nets": nets,
            "trans_port": trans_port,
            "dns_port": dns_port,
            "rule_specs": specs,
        }
        # Saved inside the try so that a failed write rolls the rules back
        # instead of leaving them active but unrecorded.
        _save_state(state)
    except Exception as exc:
        # Roll back in reverse order of insertion.
        while added:
            spec = added.pop()
            await _iptables_delete(spec)
        await _release_pid_cgroup(pid)
        return json.dumps({"success": False, "error": str(exc)})

    return json.dumps({
        "success": True,
        "pid": pid,
        "cgroup_rel_path": cg_rel,
        "trans_port": trans_port,
        "dns_port": dns_port,
        "exclude_nets": nets,
        "message": (
            f"PID {pid} moved to cgroup {cg_rel}; NAT redirects TCP -> {trans_port}, "
            f"DNS/udp53 -> {dns_port}."
        ),
    }, indent=2)


async def _tor_transproxy_disable(pid: int, ctx: Any = None) -> str:
    """Remove the nat rules for *pid* and release it from its cgroup.

    When *pid* is recorded in the state file, its stored rule specs are
    deleted in reverse order; any failure aborts before the state is
    touched. When it is not recorded but the chain exists, rules are found
    by their ``stg-tor-{pid}-`` comment instead.

    Returns:
        A JSON string describing success (with optional warnings) or the
        error.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err
    if pid < 1:
        return json.dumps({"success": False, "error": "pid must be a positive integer."})

    state = _load_state()
    managed = state.setdefault("managed", {})
    key = str(pid)
    entry = managed.get(key)
    errors: list[str] = []

    if entry:
        for spec in reversed(entry.get("rule_specs", [])):
            ok_d, err_d = await _iptables_delete(spec)
            if not ok_d:
                # A failed delete counts even when iptables printed nothing.
                errors.append(err_d or "iptables -D failed")
        if errors:
            return json.dumps({
                "success": False,
                "pid": pid,
                "error": "; ".join(errors),
                "hint": "Rules may be out of sync; inspect iptables -t nat -S STARGAZER_TOR_TRANSPROXY.",
            })
        managed.pop(key, None)
    elif await _iptables_chain_exists():
        scan_errs = await _iptables_delete_rules_matching_pid(pid)
        if scan_errs:
            errors.extend(scan_errs)

    ok_rel, e_rel = await _release_pid_cgroup(pid)
    if not ok_rel:
        errors.append(e_rel)

    _save_state(state)
    await _remove_output_jump_if_unused()

    if entry is None and errors:
        return json.dumps({
            "success": False,
            "pid": pid,
            "error": (
                f"PID {pid} was not in managed state; iptables cleanup failed. "
                f"Details: {'; '.join(errors)}"
            ),
        })

    return json.dumps({
        "success": True,
        "pid": pid,
        "warnings": errors or None,
        "message": f"Removed Tor transparent proxy rules for PID {pid} and released cgroup.",
    }, indent=2)


# ---------------------------------------------------------------------------
# TOOLS
# ---------------------------------------------------------------------------

# Tool registry: name, description, JSON-schema parameters and handler.
TOOLS = [
    {
        "name": "tor_transproxy_status",
        "description": (
            "Report Tor transparent-proxy ports from torrc, listener status, cgroup v2 / "
            "iptables cgroup match probe, STARGAZER Tor NAT chain snapshot, and managed PIDs. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {"type": "object", "properties": {}},
        "handler": _tor_transproxy_status,
    },
    {
        "name": "tor_transproxy_enable",
        "description": (
            "Enable Tor transparent proxy for one process: move PID into cgroup v2 "
            f"{STARGAZER_PID_NET_PARENT}/<pid> and append iptables nat rules redirecting TCP to Tor "
            "TransPort and UDP/53 to DNSPort. exclude_nets lists destinations that bypass Tor "
            "(RETURN); default is 127.0.0.0/8 only. Requires root-equivalent UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "pid": {"type": "integer", "description": "Linux process ID to proxy."},
                "exclude_nets": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": (
                        "Optional list of IPv4 CIDRs to exclude from redirection (RETURN). "
                        "Default: [\"127.0.0.0/8\"]."
                    ),
                },
            },
            "required": ["pid"],
        },
        "handler": _tor_transproxy_enable,
    },
    {
        "name": "tor_transproxy_disable",
        "description": (
            "Disable Tor transparent proxy for a PID: delete matching nat rules, move the "
            "task back to the parent cgroup, and remove the leaf cgroup if empty. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "pid": {"type": "integer", "description": "Process ID previously enabled."},
            },
            "required": ["pid"],
        },
        "handler": _tor_transproxy_disable,
    },
]