"""Per-process transparent Tor proxy via cgroup v2 + iptables NAT.
Moves a PID into a dedicated cgroup v2 slice and adds OUTPUT nat rules that
REDIRECT TCP and DNS (UDP/53) to Tor's TransPort and DNSPort. Linux iptables
cannot match by PID directly; cgroup path matching is used instead.
Requires: Tor with TransPort/DNSPort (see /etc/tor/torrc), cgroup v2 mounted
at /sys/fs/cgroup, iptables xt_cgroup match (cgroup directory must exist before
adding rules). Transparent TCP only except DNS; other UDP is not routed via Tor.
All tools require the UNSANDBOXED_EXEC privilege.
"""
from __future__ import annotations
import asyncio
import ipaddress
import json
import logging
import re
from pathlib import Path
from typing import Any
from tools._stargazer_pid_cgroup import (
CGROUP_MOUNT,
LEGACY_TOR_CGROUP_PARENT,
STARGAZER_PID_NET_PARENT,
assign_pid_cgroup,
cgroup_parent_path,
cgroup_rel_path,
release_pid_cgroup,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Tor configuration file that TransPort / DNSPort directives are read from.
TORRC = Path("/etc/tor/torrc")
# Dedicated nat-table chain that holds the per-PID RETURN/REDIRECT rules.
NAT_CHAIN = "STARGAZER_TOR_TRANSPROXY"
# On-disk JSON record of managed PIDs and the rule specs installed for each.
STATE_DIR = Path("/var/lib/stargazer")
STATE_FILE = STATE_DIR / "tor_transproxy_state.json"
# Ports used when torrc is missing or does not declare the directive.
_DEFAULT_TRANS_PORT = 9040
_DEFAULT_DNS_PORT = 5353
# ---------------------------------------------------------------------------
# Privilege + subprocess (match tor_tools / firewall_tools)
# ---------------------------------------------------------------------------
async def _check_priv(ctx: Any) -> str | None:
try:
from tools.alter_privileges import PRIVILEGES, has_privilege
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
user_id = getattr(ctx, "user_id", "") or ""
if not await has_privilege(redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config):
return json.dumps({
"success": False,
"error": (
"The user does not have the UNSANDBOXED_EXEC privilege. "
"Ask an admin to grant it with the alter_privileges tool."
),
})
except ImportError:
return json.dumps({"success": False, "error": "Privilege system unavailable."})
return None
async def _run_cmd(cmd: list[str], timeout: int = 30, stdin_data: str = "") -> tuple[bool, str, str]:
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE if stdin_data else None,
)
stdin_bytes = stdin_data.encode() if stdin_data else None
stdout_b, stderr_b = await asyncio.wait_for(
proc.communicate(input=stdin_bytes), timeout=timeout
)
return (
proc.returncode == 0,
stdout_b.decode(errors="replace").strip(),
stderr_b.decode(errors="replace").strip(),
)
except asyncio.TimeoutError:
return False, "", f"Command timed out after {timeout}s."
except FileNotFoundError as exc:
return False, "", f"Command not found: {exc}"
except Exception as exc:
logger.error("_run_cmd %s: %s", cmd, exc, exc_info=True)
return False, "", str(exc)
# ---------------------------------------------------------------------------
# torrc + state
# ---------------------------------------------------------------------------
def _extract_port_from_tor_directive(line: str, directive: str) -> int | None:
rest = re.sub(rf"^{re.escape(directive)}\s+", "", line.strip(), flags=re.IGNORECASE)
for tok in rest.split():
if tok.startswith("#"):
break
lower = tok.lower()
if lower in frozenset(
"nofetchisolateclientaddr isolateclientaddr isolatesessionprotocol "
"ipv6traffic preferipv6 autoipv6".split()
):
continue
if ":" in tok:
try:
return int(tok.rsplit(":", 1)[-1])
except ValueError:
continue
if tok.isdigit():
return int(tok)
return None
# torrc port discovery (public helper)
def parse_tor_trans_dns_ports() -> tuple[int, int]:
    """Read TransPort and DNSPort from torrc; fall back to the module defaults.

    Returns:
        ``(trans_port, dns_port)``. Each value comes from the last matching
        directive line in ``TORRC`` that yields a port; when the file is
        missing or unreadable, or a directive is absent, the corresponding
        ``_DEFAULT_TRANS_PORT`` / ``_DEFAULT_DNS_PORT`` is returned instead.
    """
    trans, dns = _DEFAULT_TRANS_PORT, _DEFAULT_DNS_PORT
    try:
        # Read directly instead of exists()-then-read: the file can vanish or
        # be unreadable in between, and stray bytes must not abort parsing.
        text = TORRC.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        return trans, dns
    except OSError as exc:
        logger.warning("torrc read failed, using default ports: %s", exc)
        return trans, dns
    for raw in text.splitlines():
        s = raw.strip()
        if not s or s.startswith("#"):
            continue
        if re.match(r"TransPort\b", s, re.I):
            p = _extract_port_from_tor_directive(s, "TransPort")
            if p is not None:
                trans = p
        elif re.match(r"DNSPort\b", s, re.I):
            p = _extract_port_from_tor_directive(s, "DNSPort")
            if p is not None:
                dns = p
    return trans, dns
def _load_state() -> dict[str, Any]:
    """Load the persisted transproxy state, tolerating a missing or bad file.

    Returns:
        A dict that always has a dict-valued ``"managed"`` key. A missing
        file, unreadable file, undecodable bytes, invalid JSON, or JSON whose
        top level is not an object all yield a fresh ``{"managed": {}}``
        (with a warning logged for everything except a missing file).
    """
    if not STATE_FILE.is_file():
        return {"managed": {}}
    try:
        data = json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except (ValueError, OSError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        logger.warning("tor transproxy state read failed: %s", e)
        return {"managed": {}}
    if not isinstance(data, dict):
        logger.warning(
            "tor transproxy state has unexpected top-level type %s; ignoring it",
            type(data).__name__,
        )
        return {"managed": {}}
    if not isinstance(data.get("managed"), dict):
        # Callers index and iterate "managed" as a mapping.
        if "managed" in data:
            logger.warning("tor transproxy state 'managed' is not an object; resetting it")
        data["managed"] = {}
    return data
def _save_state(state: dict[str, Any]) -> None:
    """Persist *state* as indented JSON to ``STATE_FILE``.

    The document is written to a sibling temporary file and then renamed over
    the target, so an interrupted write cannot leave a truncated state file.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            written or renamed.
        TypeError: If *state* is not JSON-serialisable.
    """
    payload = json.dumps(state, indent=2)
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    tmp_path = STATE_FILE.with_name(STATE_FILE.name + ".tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(STATE_FILE)
    except OSError:
        # Best-effort cleanup of the partial temp file; the original error is
        # what the caller needs to see.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
def _validate_cidrs(nets: list[str]) -> str | None:
for n in nets:
try:
ipaddress.ip_network(n, strict=False)
except ValueError:
return f"Invalid CIDR: {n!r}"
return None
# ---------------------------------------------------------------------------
# iptables + cgroup helpers
# ---------------------------------------------------------------------------
async def _iptables_chain_exists() -> bool:
    """Return True when listing ``NAT_CHAIN`` in the nat table succeeds."""
    listing = await _run_cmd(["iptables", "-t", "nat", "-S", NAT_CHAIN], timeout=10)
    return listing[0]
async def _ensure_nat_chain_and_jump() -> tuple[bool, str]:
    """Create ``NAT_CHAIN`` if needed and make nat OUTPUT jump to it.

    Returns:
        ``(True, "")`` when the chain exists and the OUTPUT jump is present
        (already or newly appended); otherwise ``(False, message)``.
    """
    iptables_nat = ["iptables", "-t", "nat"]
    jump_rule = ["OUTPUT", "-j", NAT_CHAIN]

    chain_present = await _iptables_chain_exists()
    if not chain_present:
        created, _, create_err = await _run_cmd([*iptables_nat, "-N", NAT_CHAIN], timeout=10)
        if not created:
            return False, create_err or "Failed to create nat chain."

    # -C succeeds only when an identical jump rule is already installed.
    jump_present, _, _ = await _run_cmd([*iptables_nat, "-C", *jump_rule], timeout=10)
    if jump_present:
        return True, ""

    appended, _, append_err = await _run_cmd([*iptables_nat, "-A", *jump_rule], timeout=10)
    if appended:
        return True, ""
    return False, append_err or "Failed to add OUTPUT jump."
async def _iptables_append(spec: list[str]) -> tuple[bool, str]:
    """Append one rule (argv tail *spec*) to ``NAT_CHAIN``.

    Returns:
        ``(ok, stderr)``; ``stderr`` is ``""`` when the command printed none.
    """
    succeeded, _, stderr_text = await _run_cmd(
        ["iptables", "-t", "nat", "-A", NAT_CHAIN, *spec], timeout=10
    )
    return succeeded, stderr_text or ""
async def _iptables_delete(spec: list[str]) -> tuple[bool, str]:
    """Delete one rule (argv tail *spec*) from ``NAT_CHAIN``.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, stderr)``.
    """
    succeeded, _, stderr_text = await _run_cmd(
        ["iptables", "-t", "nat", "-D", NAT_CHAIN, *spec], timeout=10
    )
    return (True, "") if succeeded else (False, stderr_text or "")
async def _iptables_delete_rules_matching_pid(pid: int) -> list[str]:
    """Best-effort removal of ``NAT_CHAIN`` rules tagged ``stg-tor-{pid}-``.

    Lists the chain with ``iptables -S``, selects the ``-A`` lines that
    contain the PID tag, and deletes each one by replaying its rule spec.

    Returns:
        Error messages collected along the way; an empty list means every
        matching rule was deleted (or none matched).
    """
    listed, listing, list_err = await _run_cmd(
        ["iptables", "-t", "nat", "-S", NAT_CHAIN], timeout=10
    )
    if not listed:
        return [list_err or "iptables -S chain failed (chain may not exist)."]

    rule_prefix = f"-A {NAT_CHAIN} "
    tag = f"stg-tor-{pid}-"
    tagged_rules = [
        row for row in listing.splitlines() if row.startswith(rule_prefix) and tag in row
    ]

    failures: list[str] = []
    for row in tagged_rules:
        # Everything after "-A CHAIN " is the argv tail accepted by -D.
        removed, reason = await _iptables_delete(row[len(rule_prefix) :].strip().split())
        if not removed:
            failures.append(reason or "delete failed")
    return failures
async def _remove_output_jump_if_unused() -> None:
    """Drop the nat OUTPUT jump to ``NAT_CHAIN`` once no PID is managed.

    Does nothing while the persisted state still lists managed PIDs, or when
    the jump rule is not installed.
    """
    if _load_state().get("managed"):
        return
    iptables_nat = ["iptables", "-t", "nat"]
    jump_rule = ["OUTPUT", "-j", NAT_CHAIN]
    jump_present, _, _ = await _run_cmd([*iptables_nat, "-C", *jump_rule], timeout=10)
    if not jump_present:
        return
    await _run_cmd([*iptables_nat, "-D", *jump_rule], timeout=10)
def _build_rule_specs(
pid: int,
cgroup_path: str,
trans_port: int,
dns_port: int,
exclude_nets: list[str],
) -> list[list[str]]:
"""Each inner list is argv tail for iptables -A/-D CHAIN."""
specs: list[list[str]] = []
for i, net in enumerate(exclude_nets):
specs.append(
[
"-d",
str(ipaddress.ip_network(net, strict=False)),
"-m",
"cgroup",
"--path",
cgroup_path,
"-m",
"comment",
"--comment",
f"stg-tor-{pid}-ex-{i}",
"-j",
"RETURN",
]
)
specs.append(
[
"-p",
"tcp",
"-m",
"cgroup",
"--path",
cgroup_path,
"-m",
"comment",
"--comment",
f"stg-tor-{pid}-tcp",
"-j",
"REDIRECT",
"--to-ports",
str(trans_port),
]
)
specs.append(
[
"-p",
"udp",
"--dport",
"53",
"-m",
"cgroup",
"--path",
cgroup_path,
"-m",
"comment",
"--comment",
f"stg-tor-{pid}-dns",
"-j",
"REDIRECT",
"--to-ports",
str(dns_port),
]
)
return specs
async def _assign_pid_cgroup(pid: int) -> tuple[bool, str]:
    """Awaitable wrapper that forwards to the imported ``assign_pid_cgroup``."""
    outcome = assign_pid_cgroup(pid)
    return outcome
async def _release_pid_cgroup(pid: int) -> tuple[bool, str]:
    """Awaitable wrapper that forwards to the imported ``release_pid_cgroup``."""
    outcome = release_pid_cgroup(pid)
    return outcome
# ---------------------------------------------------------------------------
# Handlers
# ---------------------------------------------------------------------------
async def _tor_transproxy_status(ctx: Any = None) -> str:
    """Report the current transparent-proxy setup as a JSON string.

    After the privilege check this gathers: the torrc ports, whether cgroup v2
    appears mounted, whether ``NAT_CHAIN`` exists, the result of a temporary
    probe rule using the iptables cgroup match, the nat-table lines that
    mention this module's chain or cgroup parents, ``ss`` listener lines that
    contain either port, and the persisted managed-PID state.

    Returns:
        A JSON document; ``success`` is false only when *ctx* is missing or
        the privilege check fails.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    denied = await _check_priv(ctx)
    if denied:
        return denied

    trans_port, dns_port = parse_tor_trans_dns_ports()
    cgroup_ok = CGROUP_MOUNT.is_dir() and (CGROUP_MOUNT / "cgroup.controllers").is_file()
    chain_ok = await _iptables_chain_exists()

    # Probe: add and immediately remove a throwaway cgroup-match rule in nat
    # OUTPUT. The match needs an existing cgroup directory, so create one.
    match_ok = False
    match_err = ""
    if cgroup_ok:
        probe_name = "stargazer-iptables-probe"
        probe_dir = CGROUP_MOUNT / probe_name
        probe_rule = ["OUTPUT", "-m", "cgroup", "--path", probe_name, "-j", "RETURN"]
        try:
            probe_dir.mkdir(exist_ok=True)
            added, _, add_err = await _run_cmd(
                ["iptables", "-t", "nat", "-A", *probe_rule], timeout=10
            )
            if not added:
                match_err = add_err or "iptables cgroup match failed"
            else:
                match_ok = True
                await _run_cmd(["iptables", "-t", "nat", "-D", *probe_rule], timeout=10)
        except OSError:
            match_err = "Could not create probe cgroup directory"
        finally:
            try:
                probe_dir.rmdir()
            except OSError:
                pass

    # Keep only nat-table lines that reference our chain or cgroup parents.
    saved, nat_dump, _ = await _run_cmd(["iptables-save", "-t", "nat"], timeout=15)
    chain_dump = ""
    if saved:
        markers = (
            NAT_CHAIN,
            f"{STARGAZER_PID_NET_PARENT}/",
            f"{LEGACY_TOR_CGROUP_PARENT}/",
        )
        chain_dump = "\n".join(
            row for row in nat_dump.splitlines() if any(marker in row for marker in markers)
        )

    # Listening sockets whose line mentions either configured port.
    ss_ok, ss_out, _ = await _run_cmd(["ss", "-lnptuH"], timeout=10)
    listeners = []
    if ss_ok:
        port_tags = (f":{trans_port}", f":{dns_port}")
        listeners = [
            row.strip() for row in ss_out.splitlines() if any(tag in row for tag in port_tags)
        ]

    managed = _load_state().get("managed", {})
    legacy_paths = any(
        LEGACY_TOR_CGROUP_PARENT in str(record.get("cgroup_rel_path", ""))
        for record in managed.values()
    )

    report = {
        "success": True,
        "torrc_trans_port": trans_port,
        "torrc_dns_port": dns_port,
        "cgroup_parent": STARGAZER_PID_NET_PARENT,
        "legacy_tor_cgroup_paths_in_state": legacy_paths,
        "cgroup_v2_mounted": cgroup_ok,
        "iptables_cgroup_match_ok": match_ok,
        "iptables_cgroup_match_error": match_err or None,
        "nat_chain_exists": chain_ok,
        "tor_listeners": listeners,
        "managed_pids": list(managed.keys()),
        "managed_detail": managed,
        "nat_chain_matching_lines": chain_dump or "(none)",
        "note": (
            "UDP other than DNS/53 is not sent through Tor. "
            "These tools edit iptables nat directly (not UFW). "
            f"Per-PID cgroup leaf is {STARGAZER_PID_NET_PARENT}/<pid> (shared with pid_vpn_route_*). "
            "If cgroup_rel_path still shows the old tor-proxy name, disable and re-enable to migrate."
        ),
    }
    return json.dumps(report, indent=2)
async def _tor_transproxy_enable(
    pid: int,
    exclude_nets: list[str] | None = None,
    ctx: Any = None,
) -> str:
    """Route one process through Tor's transparent proxy.

    Moves *pid* into its per-PID cgroup, ensures ``NAT_CHAIN`` and the OUTPUT
    jump exist, appends the RETURN/REDIRECT rules, and records the result in
    the state file. If any step after the cgroup move fails, including
    persisting the state, the rules added so far are deleted and the PID is
    released from the cgroup again.

    Args:
        pid: Process ID to proxy; must be positive and present under /proc.
        exclude_nets: Destination CIDRs that bypass redirection. ``None``
            means ``["127.0.0.0/8"]``.
        ctx: Tool context used for the privilege check.

    Returns:
        A JSON document with ``success`` and either the applied settings or
        an ``error`` message.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err
    if pid < 1:
        return json.dumps({"success": False, "error": "pid must be a positive integer."})
    if not Path(f"/proc/{pid}").is_dir():
        return json.dumps({"success": False, "error": f"Process {pid} does not exist."})
    nets = list(exclude_nets) if exclude_nets is not None else ["127.0.0.0/8"]
    bad = _validate_cidrs(nets)
    if bad:
        return json.dumps({"success": False, "error": bad})
    state = _load_state()
    managed = state.setdefault("managed", {})
    if str(pid) in managed:
        return json.dumps({
            "success": False,
            "error": f"PID {pid} is already managed. Use tor_transproxy_disable first.",
        })
    if not CGROUP_MOUNT.is_dir():
        return json.dumps({"success": False, "error": "/sys/fs/cgroup not available."})
    cg_rel = cgroup_rel_path(pid)
    trans_port, dns_port = parse_tor_trans_dns_ports()
    specs = _build_rule_specs(pid, cg_rel, trans_port, dns_port, nets)
    try:
        cgroup_parent_path().mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        # Nothing has been changed yet, so report instead of raising.
        return json.dumps({
            "success": False,
            "error": f"Could not create cgroup parent directory: {exc}",
        })
    ok_cg, e_cg = await _assign_pid_cgroup(pid)
    if not ok_cg:
        return json.dumps({"success": False, "error": e_cg})
    added: list[list[str]] = []
    try:
        ok_j, e_j = await _ensure_nat_chain_and_jump()
        if not ok_j:
            raise RuntimeError(e_j)
        for spec in specs:
            ok_a, e_a = await _iptables_append(spec)
            if not ok_a:
                raise RuntimeError(e_a or "iptables -A failed")
            added.append(spec)
        managed[str(pid)] = {
            "cgroup_rel_path": cg_rel,
            "exclude_nets": nets,
            "trans_port": trans_port,
            "dns_port": dns_port,
            "rule_specs": specs,
        }
        # Saving inside the guarded region: live rules without a state record
        # would be invisible to later status/disable calls.
        _save_state(state)
    except Exception as exc:
        # Roll back in reverse order of installation.
        while added:
            spec = added.pop()
            await _iptables_delete(spec)
        await _release_pid_cgroup(pid)
        return json.dumps({"success": False, "error": str(exc)})
    return json.dumps({
        "success": True,
        "pid": pid,
        "cgroup_rel_path": cg_rel,
        "trans_port": trans_port,
        "dns_port": dns_port,
        "exclude_nets": nets,
        "message": (
            f"PID {pid} moved to cgroup {cg_rel}; NAT redirects TCP -> {trans_port}, "
            f"DNS/udp53 -> {dns_port}."
        ),
    }, indent=2)
async def _tor_transproxy_disable(pid: int, ctx: Any = None) -> str:
    """Remove the transparent-proxy rules for *pid* and release its cgroup.

    When the PID has a state record, its recorded rule specs are deleted in
    reverse order. A rule whose deletion fails is re-checked with
    ``iptables -C``: if the check also fails, the rule is treated as already
    gone (for example after a reboot or a manual flush) and only produces a
    warning. If the rule is still installed, the call fails and the state
    record is kept. Without a state record, rules are located by their
    ``stg-tor-{pid}-`` comment instead.

    Returns:
        A JSON document with ``success``, ``pid`` and either ``warnings`` and
        a ``message`` or an ``error``.
    """
    if ctx is None:
        return json.dumps({"success": False, "error": "No context."})
    auth_err = await _check_priv(ctx)
    if auth_err:
        return auth_err
    if pid < 1:
        return json.dumps({"success": False, "error": "pid must be a positive integer."})
    state = _load_state()
    managed = state.setdefault("managed", {})
    key = str(pid)
    entry = managed.get(key)
    errors: list[str] = []
    if entry is not None:
        recorded = entry.get("rule_specs") if isinstance(entry, dict) else None
        still_installed: list[str] = []
        for spec in reversed(recorded or []):
            ok_d, err_d = await _iptables_delete(spec)
            if ok_d:
                continue
            # Distinguish "could not delete" from "nothing left to delete";
            # otherwise a vanished rule would block disable (and therefore
            # re-enable) for this PID forever.
            present, _, _ = await _run_cmd(
                ["iptables", "-t", "nat", "-C", NAT_CHAIN, *spec], timeout=10
            )
            if present:
                still_installed.append(err_d or "iptables -D failed")
            else:
                errors.append(
                    "Rule already absent, skipped: " + " ".join(str(part) for part in spec)
                )
        if still_installed:
            return json.dumps({
                "success": False,
                "pid": pid,
                "error": "; ".join(still_installed),
                "hint": "Rules may be out of sync; inspect iptables -t nat -S STARGAZER_TOR_TRANSPROXY.",
            })
        managed.pop(key, None)
    elif await _iptables_chain_exists():
        scan_errs = await _iptables_delete_rules_matching_pid(pid)
        if scan_errs:
            errors.extend(scan_errs)
    ok_rel, e_rel = await _release_pid_cgroup(pid)
    if not ok_rel:
        errors.append(e_rel)
    _save_state(state)
    await _remove_output_jump_if_unused()
    if entry is None and errors:
        return json.dumps({
            "success": False,
            "pid": pid,
            "error": (
                f"PID {pid} was not in managed state; iptables cleanup failed. "
                f"Details: {'; '.join(errors)}"
            ),
        })
    return json.dumps({
        "success": True,
        "pid": pid,
        "warnings": errors or None,
        "message": f"Removed Tor transparent proxy rules for PID {pid} and released cgroup.",
    }, indent=2)
# ---------------------------------------------------------------------------
# TOOLS
# ---------------------------------------------------------------------------
# Tool descriptors exported by this module. Each entry pairs a tool name and
# human-readable description with a JSON-schema style "parameters" object and
# the async handler defined above that implements it.
TOOLS = [
    {
        "name": "tor_transproxy_status",
        "description": (
            "Report Tor transparent-proxy ports from torrc, listener status, cgroup v2 / "
            "iptables cgroup match probe, STARGAZER Tor NAT chain snapshot, and managed PIDs. "
            "Requires UNSANDBOXED_EXEC."
        ),
        # Takes no arguments.
        "parameters": {"type": "object", "properties": {}},
        "handler": _tor_transproxy_status,
    },
    {
        "name": "tor_transproxy_enable",
        "description": (
            "Enable Tor transparent proxy for one process: move PID into cgroup v2 "
            f"{STARGAZER_PID_NET_PARENT}/<pid> and append iptables nat rules redirecting TCP to Tor "
            "TransPort and UDP/53 to DNSPort. exclude_nets lists destinations that bypass Tor "
            "(RETURN); default is 127.0.0.0/8 only. Requires root-equivalent UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "pid": {"type": "integer", "description": "Linux process ID to proxy."},
                "exclude_nets": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": (
                        "Optional list of IPv4 CIDRs to exclude from redirection (RETURN). "
                        "Default: [\"127.0.0.0/8\"]."
                    ),
                },
            },
            "required": ["pid"],
        },
        "handler": _tor_transproxy_enable,
    },
    {
        "name": "tor_transproxy_disable",
        "description": (
            "Disable Tor transparent proxy for a PID: delete matching nat rules, move the "
            "task back to the parent cgroup, and remove the leaf cgroup if empty. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "pid": {"type": "integer", "description": "Process ID previously enabled."},
            },
            "required": ["pid"],
        },
        "handler": _tor_transproxy_disable,
    },
]