"""Per-process transparent Tor proxy via cgroup v2 + iptables NAT.
Moves a PID into a dedicated cgroup v2 slice and adds OUTPUT nat rules that
REDIRECT TCP and DNS (UDP/53) to Tor's TransPort and DNSPort. Linux iptables
cannot match by PID directly; cgroup path matching is used instead.
Requires: Tor with TransPort/DNSPort (see /etc/tor/torrc), cgroup v2 mounted
at /sys/fs/cgroup, iptables xt_cgroup match ( cgroup directory must exist before
adding rules). Transparent TCP only except DNS; other UDP is not routed via Tor.
All tools require the UNSANDBOXED_EXEC privilege.
"""
from __future__ import annotations
import asyncio
import ipaddress
import jsonutil as json
import logging
import re
from pathlib import Path
from typing import Any
from tools._stargazer_pid_cgroup import (
CGROUP_MOUNT,
LEGACY_TOR_CGROUP_PARENT,
STARGAZER_PID_NET_PARENT,
assign_pid_cgroup,
cgroup_parent_path,
cgroup_rel_path,
release_pid_cgroup,
)
logger = logging.getLogger(__name__)
TORRC = Path("/etc/tor/torrc")
NAT_CHAIN = "STARGAZER_TOR_TRANSPROXY"
STATE_DIR = Path("/var/lib/stargazer")
STATE_FILE = STATE_DIR / "tor_transproxy_state.json"
_DEFAULT_TRANS_PORT = 9040
_DEFAULT_DNS_PORT = 5353
# ---------------------------------------------------------------------------
# Privilege + subprocess (match tor_tools / firewall_tools)
# ---------------------------------------------------------------------------
async def _check_priv(ctx: Any) -> str | None:
"""Gate a tool call on the caller holding the ``UNSANDBOXED_EXEC`` privilege.
Every handler in this module runs privileged iptables/cgroup mutations, so
this guard is invoked first to reject callers lacking elevated rights. It
pulls ``redis``, ``config`` and ``user_id`` off the :class:`ToolContext` and
delegates the actual check to ``tools.alter_privileges.has_privilege``,
looking up the privilege flag in the ``PRIVILEGES`` mapping (so it reads the
privilege store backing that helper, typically Redis).
Called by every handler in this module (``_tor_transproxy_status``,
``_tor_transproxy_enable`` and ``_tor_transproxy_disable``) immediately after
the ``ctx is None`` check; mirrors the same pattern used in
``tor_tools`` / ``firewall_tools``.
Args:
ctx: The tool execution context (``ToolContext``), expected to expose
``redis``, ``config`` and ``user_id`` attributes.
Returns:
``None`` when the caller is authorized. Otherwise a JSON-encoded error
string (``{"success": False, "error": ...}``) describing either the
missing privilege or that the privilege subsystem is unavailable, which
the handler returns verbatim to the caller.
"""
try:
from tools.alter_privileges import PRIVILEGES, has_privilege
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
user_id = getattr(ctx, "user_id", "") or ""
if not await has_privilege(
redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config
):
return json.dumps(
{
"success": False,
"error": (
"The user does not have the UNSANDBOXED_EXEC privilege. "
"Ask an admin to grant it with the alter_privileges tool."
),
}
)
except ImportError:
return json.dumps({"success": False, "error": "Privilege system unavailable."})
return None
async def _run_cmd(
cmd: list[str], timeout: int = 30, stdin_data: str = ""
) -> tuple[bool, str, str]:
"""Run an external command asynchronously and capture its result.
Thin non-blocking wrapper around ``asyncio.create_subprocess_exec`` used for
all shell-outs in this module (``iptables``, ``iptables-save``, ``ss``).
Optionally feeds ``stdin_data`` to the process and enforces a wall-clock
timeout. Failures are normalized into the return tuple rather than raised so
callers can branch on success without try/except.
This is the only function that spawns subprocesses here; it logs unexpected
exceptions via the module ``logger`` (with traceback) before returning them.
Called by ``_iptables_chain_exists``, ``_ensure_nat_chain_and_jump``,
``_iptables_append``, ``_iptables_delete``, ``_iptables_delete_rules_matching_pid``,
``_remove_output_jump_if_unused`` and ``_tor_transproxy_status``.
Args:
cmd: The command and its arguments as an argv list (no shell).
timeout: Maximum seconds to wait for completion before giving up.
stdin_data: Optional text written to the process's stdin; when empty no
stdin pipe is attached.
Returns:
A ``(ok, stdout, stderr)`` tuple where ``ok`` is ``True`` only when the
process exited 0, and ``stdout``/``stderr`` are decoded, stripped text.
On timeout, missing binary, or any other error, ``ok`` is ``False`` and
``stderr`` carries a human-readable explanation.
"""
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE if stdin_data else None,
)
stdin_bytes = stdin_data.encode() if stdin_data else None
stdout_b, stderr_b = await asyncio.wait_for(
proc.communicate(input=stdin_bytes), timeout=timeout
)
return (
proc.returncode == 0,
stdout_b.decode(errors="replace").strip(),
stderr_b.decode(errors="replace").strip(),
)
except asyncio.TimeoutError:
return False, "", f"Command timed out after {timeout}s."
except FileNotFoundError as exc:
return False, "", f"Command not found: {exc}"
except Exception as exc:
logger.error("_run_cmd %s: %s", cmd, exc, exc_info=True)
return False, "", str(exc)
# ---------------------------------------------------------------------------
# torrc + state
# ---------------------------------------------------------------------------
def _extract_port_from_tor_directive(line: str, directive: str) -> int | None:
"""Pull the numeric port out of a single torrc ``TransPort``/``DNSPort`` line.
Strips the leading directive keyword, then scans the remaining tokens,
skipping Tor option flags (e.g. ``IsolateClientAddr``, ``IPv6Traffic``) and
stopping at an inline ``#`` comment. Accepts either a bare port number or an
``addr:port`` form, returning the trailing port component.
Pure string parsing with no side effects. Called only by
``parse_tor_trans_dns_ports`` once it has matched a relevant directive line.
Args:
line: A raw (already stripped is fine) torrc configuration line.
directive: The directive keyword to remove from the front of ``line``
(``"TransPort"`` or ``"DNSPort"``).
Returns:
The parsed port as an ``int``, or ``None`` if no numeric port token is
present (e.g. a unix socket or flags-only directive).
"""
rest = re.sub(rf"^{re.escape(directive)}\s+", "", line.strip(), flags=re.IGNORECASE)
for tok in rest.split():
if tok.startswith("#"):
break
lower = tok.lower()
if lower in frozenset(
"nofetchisolateclientaddr isolateclientaddr isolatesessionprotocol "
"ipv6traffic preferipv6 autoipv6".split()
):
continue
if ":" in tok:
try:
return int(tok.rsplit(":", 1)[-1])
except ValueError:
continue
if tok.isdigit():
return int(tok)
return None
[docs]
def parse_tor_trans_dns_ports() -> tuple[int, int]:
"""Resolve the Tor TransPort and DNSPort to redirect to from torrc.
The transparent proxy must REDIRECT a process's TCP to Tor's ``TransPort`` and
its DNS to Tor's ``DNSPort``, so this reads the live ``/etc/tor/torrc`` (the
module-level ``TORRC`` path) to discover the ports the local Tor instance is
actually listening on. It scans non-comment lines for ``TransPort`` and
``DNSPort`` directives (case-insensitive) and extracts the numeric port from
each via ``_extract_port_from_tor_directive``, so flag-only or ``addr:port``
forms are handled. When torrc is missing or a directive is absent, it falls back
to the module defaults (``_DEFAULT_TRANS_PORT`` 9040, ``_DEFAULT_DNS_PORT`` 5353).
It only reads the torrc file and has no other side effects. It is called by
``_tor_transproxy_status`` (to report the configured ports) and by
``_tor_transproxy_enable`` (to build the per-PID REDIRECT rule specs).
Returns:
A ``(trans_port, dns_port)`` tuple of the resolved ports.
"""
trans, dns = _DEFAULT_TRANS_PORT, _DEFAULT_DNS_PORT
if not TORRC.exists():
return trans, dns
for raw in TORRC.read_text().splitlines():
s = raw.strip()
if not s or s.startswith("#"):
continue
if re.match(r"TransPort\b", s, re.I):
p = _extract_port_from_tor_directive(s, "TransPort")
if p is not None:
trans = p
elif re.match(r"DNSPort\b", s, re.I):
p = _extract_port_from_tor_directive(s, "DNSPort")
if p is not None:
dns = p
return trans, dns
def _load_state() -> dict[str, Any]:
"""Load the on-disk transproxy state mapping managed PIDs to their rules.
Reads and JSON-decodes ``STATE_FILE`` (``/var/lib/stargazer/tor_transproxy_state.json``),
which records each managed PID's cgroup path, excluded nets, ports and the
exact iptables rule specs that were installed. Tolerates a missing or
corrupt file by logging a warning and returning a fresh empty structure.
Called by ``_remove_output_jump_if_unused``, ``_tor_transproxy_status``,
``_tor_transproxy_enable`` and ``_tor_transproxy_disable`` at the start of
their state-aware logic.
Returns:
The decoded state ``dict``; always contains a ``"managed"`` key (an empty
dict when the file is absent or unreadable).
"""
if not STATE_FILE.is_file():
return {"managed": {}}
try:
return json.loads(STATE_FILE.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError) as e:
logger.warning("tor transproxy state read failed: %s", e)
return {"managed": {}}
def _save_state(state: dict[str, Any]) -> None:
"""Persist the transproxy state dict to disk as pretty-printed JSON.
Ensures ``STATE_DIR`` (``/var/lib/stargazer``) exists, then writes ``state``
to ``STATE_FILE``, overwriting any previous contents. This is the durable
record of which PIDs are managed and the iptables rules installed for them,
consulted across separate tool invocations.
Called by ``_tor_transproxy_enable`` after rules are installed and by
``_tor_transproxy_disable`` after rules/cgroups are torn down.
Args:
state: The state mapping (typically with a ``"managed"`` key) to write.
"""
STATE_DIR.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(json.dumps(state, indent=2), encoding="utf-8")
def _validate_cidrs(nets: list[str]) -> str | None:
"""Validate that every entry in ``nets`` parses as an IP network.
Uses ``ipaddress.ip_network(..., strict=False)`` so host bits are tolerated,
guarding the exclude-nets input before it is fed into iptables ``-d`` rule
specs. Pure validation with no side effects.
Called by ``_tor_transproxy_enable`` to sanity-check the ``exclude_nets``
argument supplied to the tool.
Args:
nets: Candidate CIDR/IP-network strings to validate.
Returns:
``None`` if all entries are valid, otherwise an error message string
naming the first offending value.
"""
for n in nets:
try:
ipaddress.ip_network(n, strict=False)
except ValueError:
return f"Invalid CIDR: {n!r}"
return None
# ---------------------------------------------------------------------------
# iptables + cgroup helpers
# ---------------------------------------------------------------------------
async def _iptables_chain_exists() -> bool:
"""Report whether the module's dedicated nat chain already exists.
Runs ``iptables -t nat -S STARGAZER_TOR_TRANSPROXY`` via ``_run_cmd``; the
command succeeds only when the custom chain is present, so its exit status
doubles as the existence check.
Called by ``_ensure_nat_chain_and_jump`` (to decide whether to create the
chain), ``_tor_transproxy_status`` (to report chain state) and
``_tor_transproxy_disable`` (to decide whether to scan for orphaned rules).
Returns:
``True`` if the ``STARGAZER_TOR_TRANSPROXY`` nat chain exists, else
``False``.
"""
ok, _, _ = await _run_cmd(["iptables", "-t", "nat", "-S", NAT_CHAIN], timeout=10)
return ok
async def _ensure_nat_chain_and_jump() -> tuple[bool, str]:
"""Idempotently create the nat chain and the OUTPUT jump into it.
Ensures two pieces of iptables state exist: the custom
``STARGAZER_TOR_TRANSPROXY`` chain (created with ``-N`` only if
``_iptables_chain_exists`` reports it missing) and an ``OUTPUT -j`` jump into
that chain (added with ``-A`` only if a ``-C`` check shows it is absent).
This is the prerequisite plumbing that lets per-PID REDIRECT rules take
effect on locally generated traffic.
Runs ``iptables`` through ``_run_cmd``. Called by ``_tor_transproxy_enable``
before it appends the per-PID rule specs.
Returns:
A ``(ok, error)`` tuple: ``(True, "")`` when both the chain and the jump
are present, or ``(False, message)`` if creating the chain or adding the
OUTPUT jump failed.
"""
if not await _iptables_chain_exists():
ok, _, err = await _run_cmd(
["iptables", "-t", "nat", "-N", NAT_CHAIN], timeout=10
)
if not ok:
return False, err or "Failed to create nat chain."
ok, _, _ = await _run_cmd(
["iptables", "-t", "nat", "-C", "OUTPUT", "-j", NAT_CHAIN],
timeout=10,
)
if ok:
return True, ""
ok, _, err = await _run_cmd(
["iptables", "-t", "nat", "-A", "OUTPUT", "-j", NAT_CHAIN],
timeout=10,
)
if not ok:
return False, err or "Failed to add OUTPUT jump."
return True, ""
async def _iptables_append(spec: list[str]) -> tuple[bool, str]:
"""Append one rule spec to the module's nat chain.
Prefixes ``spec`` with ``iptables -t nat -A STARGAZER_TOR_TRANSPROXY`` and
runs it via ``_run_cmd``, installing a single REDIRECT/RETURN rule produced
by ``_build_rule_specs``.
Called by ``_tor_transproxy_enable`` in a loop over the built rule specs.
Args:
spec: The argv tail (rule match + target) to append to the chain.
Returns:
A ``(ok, error)`` tuple; ``error`` is ``""`` on success or the iptables
error text on failure.
"""
cmd = ["iptables", "-t", "nat", "-A", NAT_CHAIN, *spec]
ok, _, err = await _run_cmd(cmd, timeout=10)
return ok, err or ""
async def _iptables_delete(spec: list[str]) -> tuple[bool, str]:
"""Delete one rule spec from the module's nat chain.
Prefixes ``spec`` with ``iptables -t nat -D STARGAZER_TOR_TRANSPROXY`` and
runs it via ``_run_cmd``. Used both for normal teardown and for rolling back
a partially applied enable.
Called by ``_tor_transproxy_enable`` (rollback of already-added rules on
error), ``_iptables_delete_rules_matching_pid`` (per orphaned rule), and
``_tor_transproxy_disable`` (deleting each saved rule spec).
Args:
spec: The argv tail of the rule to delete (must match an installed
rule exactly).
Returns:
A ``(ok, error)`` tuple; ``(True, "")`` on success, or ``(False, message)``
with the iptables error text on failure.
"""
cmd = ["iptables", "-t", "nat", "-D", NAT_CHAIN, *spec]
ok, _, err = await _run_cmd(cmd, timeout=10)
if not ok:
return False, err or ""
return True, ""
async def _iptables_delete_rules_matching_pid(pid: int) -> list[str]:
"""Best-effort sweep that deletes any orphaned nat rules tagged for one PID.
Each rule this module installs carries a comment of the form ``stg-tor-{pid}-*``
(see ``_build_rule_specs``), so when a PID is being disabled but no longer
appears in saved state, this scans the live chain and removes every rule still
bearing that PID's tag. This is the recovery path for rules that leaked out of
sync with the persisted state file.
It dumps the chain with ``iptables -t nat -S STARGAZER_TOR_TRANSPROXY`` via
``_run_cmd``, then deletes each matching rule through ``_iptables_delete``;
failures are collected rather than raised. It is called only by
``_tor_transproxy_disable`` (in the branch where the PID is absent from state
but the nat chain exists). A like-named helper in ``tools/pid_vpn_route_tools.py``
is a separate function, not a caller of this one.
Args:
pid: The Linux process ID whose tagged rules should be removed.
Returns:
A list of error strings, one per delete that failed (or a single entry if
the chain dump itself failed); empty when every matching rule was removed
cleanly.
"""
errors: list[str] = []
ok, out, err = await _run_cmd(
["iptables", "-t", "nat", "-S", NAT_CHAIN], timeout=10
)
if not ok:
return [err or "iptables -S chain failed (chain may not exist)."]
prefix = f"-A {NAT_CHAIN} "
needle = f"stg-tor-{pid}-"
for line in out.splitlines():
if not line.startswith(prefix) or needle not in line:
continue
spec = line[len(prefix) :].strip().split()
ok_d, e_d = await _iptables_delete(spec)
if not ok_d:
errors.append(e_d or "delete failed")
return errors
async def _remove_output_jump_if_unused() -> None:
"""Remove the OUTPUT->chain jump once no PIDs remain managed.
After the last managed PID is disabled, the ``OUTPUT -j STARGAZER_TOR_TRANSPROXY``
jump serves no purpose, so this tears it down to leave the host's nat table
clean. It reads current state via ``_load_state`` and bails out early if any
PIDs are still managed; otherwise it checks for the jump (``-C``) and deletes
it (``-D``) through ``_run_cmd``. The empty chain itself is left in place.
Called by ``_tor_transproxy_disable`` at the end of teardown.
Returns:
``None``. Side effects only; failures to delete the jump are ignored.
"""
state = _load_state()
if state.get("managed"):
return
ok, _, _ = await _run_cmd(
["iptables", "-t", "nat", "-C", "OUTPUT", "-j", NAT_CHAIN],
timeout=10,
)
if ok:
await _run_cmd(
["iptables", "-t", "nat", "-D", "OUTPUT", "-j", NAT_CHAIN], timeout=10
)
def _build_rule_specs(
pid: int,
cgroup_path: str,
trans_port: int,
dns_port: int,
exclude_nets: list[str],
) -> list[list[str]]:
"""Build the ordered iptables nat rule specs that route one PID through Tor.
Produces the exact per-PID rule set the enable path installs (and that disable
later deletes), all matched to the process by its cgroup v2 ``--path`` since
iptables cannot match by PID directly. The ``exclude_nets`` entries come first
as ``RETURN`` rules so excluded destinations bypass Tor, followed by a TCP
``REDIRECT`` to ``trans_port`` and a UDP/53 ``REDIRECT`` to ``dns_port``. Every
rule carries a unique ``stg-tor-{pid}-*`` comment so it can be identified and
cleaned up later, including the orphan sweep in
``_iptables_delete_rules_matching_pid``.
This is a pure builder: it only constructs argv lists (validating each net via
``ipaddress.ip_network``) and performs no I/O. It is called by
``_tor_transproxy_enable``, which stores the returned specs in state and feeds
each to ``_iptables_append``.
Args:
pid: The Linux process ID being proxied; embedded into each rule's comment.
cgroup_path: The relative cgroup v2 path used as the iptables ``cgroup``
match for the process.
trans_port: Tor's TransPort; TCP traffic is redirected here.
dns_port: Tor's DNSPort; UDP/53 traffic is redirected here.
exclude_nets: Destination CIDRs that should bypass Tor (emitted as leading
``RETURN`` rules).
Returns:
A list of argv tails, each an inner list suitable as the rule portion of an
``iptables -t nat -A``/``-D STARGAZER_TOR_TRANSPROXY`` command.
"""
specs: list[list[str]] = []
for i, net in enumerate(exclude_nets):
specs.append(
[
"-d",
str(ipaddress.ip_network(net, strict=False)),
"-m",
"cgroup",
"--path",
cgroup_path,
"-m",
"comment",
"--comment",
f"stg-tor-{pid}-ex-{i}",
"-j",
"RETURN",
]
)
specs.append(
[
"-p",
"tcp",
"-m",
"cgroup",
"--path",
cgroup_path,
"-m",
"comment",
"--comment",
f"stg-tor-{pid}-tcp",
"-j",
"REDIRECT",
"--to-ports",
str(trans_port),
]
)
specs.append(
[
"-p",
"udp",
"--dport",
"53",
"-m",
"cgroup",
"--path",
cgroup_path,
"-m",
"comment",
"--comment",
f"stg-tor-{pid}-dns",
"-j",
"REDIRECT",
"--to-ports",
str(dns_port),
]
)
return specs
async def _assign_pid_cgroup(pid: int) -> tuple[bool, str]:
"""Move ``pid`` into its dedicated cgroup v2 leaf (async shim).
Async wrapper that delegates straight to the synchronous
``tools._stargazer_pid_cgroup.assign_pid_cgroup``, which creates the
``stargazer-pid-net/<pid>`` leaf and writes the PID into its
``cgroup.procs``. Existing for call-site symmetry with the async iptables
helpers; it performs no awaiting itself.
Called by ``_tor_transproxy_enable`` before installing the per-PID nat rules.
Args:
pid: The Linux process ID to classify into the cgroup leaf.
Returns:
The ``(ok, error)`` tuple returned by ``assign_pid_cgroup``.
"""
return assign_pid_cgroup(pid)
async def _release_pid_cgroup(pid: int) -> tuple[bool, str]:
"""Move ``pid`` back to the parent slice and drop its empty leaf (async shim).
Async wrapper delegating to ``tools._stargazer_pid_cgroup.release_pid_cgroup``,
which writes the PID into the parent cgroup's ``cgroup.procs`` and removes the
now-empty ``stargazer-pid-net/<pid>`` leaf (best-effort). No awaiting occurs
here; the shim exists to match the async helper style.
Called by ``_tor_transproxy_enable`` (rollback path) and
``_tor_transproxy_disable`` (normal teardown).
Args:
pid: The Linux process ID to return to the parent cgroup.
Returns:
The ``(ok, error)`` tuple returned by ``release_pid_cgroup``.
"""
return release_pid_cgroup(pid)
# ---------------------------------------------------------------------------
# Handlers
# ---------------------------------------------------------------------------
async def _tor_transproxy_status(ctx: Any = None) -> str:
"""Report the current Tor transparent-proxy configuration and runtime state.
Implements the ``tor_transproxy_status`` tool. Gathers a read-only snapshot:
the TransPort/DNSPort parsed from torrc, whether cgroup v2 is mounted,
whether iptables can match by cgroup (probed by briefly adding and removing a
temporary ``OUTPUT`` rule against a throwaway probe cgroup directory), whether
the nat chain exists, which Tor listeners are bound on the relevant ports, the
nat-table lines mentioning this module's chain or cgroup parents, and the set
of managed PIDs from saved state.
Reads torrc via ``parse_tor_trans_dns_ports``; shells out through ``_run_cmd``
to ``iptables``, ``iptables-save`` and ``ss``; uses ``_iptables_chain_exists``
and ``_load_state``; and creates/removes a probe directory under
``CGROUP_MOUNT``. After authorizing the caller via ``_check_priv``, it makes no
persistent changes. This handler is invoked dynamically by the tool dispatcher
(registered in ``TOOLS`` and called from ``tools/__init__.py`` as
``tool_def.handler(..., ctx=ctx)``); no direct internal callers exist.
Args:
ctx: The tool execution context (``ToolContext``); ``None`` only when
invoked without a context, which is rejected.
Returns:
A JSON string. On success, a status report with the keys described above;
on failure (missing context or privilege), a ``{"success": False,
"error": ...}`` object.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx)
if auth_err:
return auth_err
trans_port, dns_port = parse_tor_trans_dns_ports()
cgroup_ok = (
CGROUP_MOUNT.is_dir() and (CGROUP_MOUNT / "cgroup.controllers").is_file()
)
chain_ok = await _iptables_chain_exists()
ok_cgroup_match = False
cgroup_match_err = ""
if cgroup_ok:
test_leaf = CGROUP_MOUNT / "stargazer-iptables-probe"
try:
test_leaf.mkdir(exist_ok=True)
ok_probe, _, e2 = await _run_cmd(
[
"iptables",
"-t",
"nat",
"-A",
"OUTPUT",
"-m",
"cgroup",
"--path",
"stargazer-iptables-probe",
"-j",
"RETURN",
],
timeout=10,
)
if ok_probe:
ok_cgroup_match = True
await _run_cmd(
[
"iptables",
"-t",
"nat",
"-D",
"OUTPUT",
"-m",
"cgroup",
"--path",
"stargazer-iptables-probe",
"-j",
"RETURN",
],
timeout=10,
)
else:
cgroup_match_err = e2 or "iptables cgroup match failed"
except OSError:
cgroup_match_err = "Could not create probe cgroup directory"
finally:
try:
test_leaf.rmdir()
except OSError:
pass
ok_save, save_out, _ = await _run_cmd(["iptables-save", "-t", "nat"], timeout=15)
chain_dump = ""
if ok_save:
chain_dump = "\n".join(
ln
for ln in save_out.splitlines()
if NAT_CHAIN in ln
or f"{STARGAZER_PID_NET_PARENT}/" in ln
or f"{LEGACY_TOR_CGROUP_PARENT}/" in ln
)
ok_ss, ss_out, _ = await _run_cmd(["ss", "-lnptuH"], timeout=10)
listeners = []
if ok_ss:
for line in ss_out.splitlines():
if f":{trans_port}" in line or f":{dns_port}" in line:
listeners.append(line.strip())
state = _load_state()
managed = state.get("managed", {})
legacy_paths = any(
LEGACY_TOR_CGROUP_PARENT in str(v.get("cgroup_rel_path", ""))
for v in managed.values()
)
return json.dumps(
{
"success": True,
"torrc_trans_port": trans_port,
"torrc_dns_port": dns_port,
"cgroup_parent": STARGAZER_PID_NET_PARENT,
"legacy_tor_cgroup_paths_in_state": legacy_paths,
"cgroup_v2_mounted": cgroup_ok,
"iptables_cgroup_match_ok": ok_cgroup_match,
"iptables_cgroup_match_error": cgroup_match_err or None,
"nat_chain_exists": chain_ok,
"tor_listeners": listeners,
"managed_pids": list(managed.keys()),
"managed_detail": managed,
"nat_chain_matching_lines": chain_dump or "(none)",
"note": (
"UDP other than DNS/53 is not sent through Tor. "
"These tools edit iptables nat directly (not UFW). "
f"Per-PID cgroup leaf is {STARGAZER_PID_NET_PARENT}/<pid> (shared with pid_vpn_route_*). "
"If cgroup_rel_path still shows the old tor-proxy name, disable and re-enable to migrate."
),
},
indent=2,
)
async def _tor_transproxy_enable(
pid: int,
exclude_nets: list[str] | None = None,
ctx: Any = None,
) -> str:
"""Route one process's traffic through Tor via cgroup classification + NAT.
Implements the ``tor_transproxy_enable`` tool. Moves ``pid`` into a dedicated
cgroup v2 leaf and installs iptables nat rules that REDIRECT the process's TCP
to Tor's TransPort and its UDP/53 DNS to Tor's DNSPort, with optional
``exclude_nets`` matched first and sent to RETURN (bypassing Tor). The
operation is transactional: if any rule fails to install, already-added rules
are rolled back and the cgroup assignment is released before returning an
error. On success the PID, its cgroup path, ports, exclude-nets and exact rule
specs are recorded in persisted state.
After ``_check_priv`` authorization, it validates the PID exists
(``/proc/{pid}``) and the CIDRs via ``_validate_cidrs``, refuses to double-
manage a PID already in ``_load_state``, derives the cgroup path via
``cgroup_rel_path`` and ports via ``parse_tor_trans_dns_ports``, builds rules
with ``_build_rule_specs``, ensures the cgroup parent dir exists, classifies
the PID with ``_assign_pid_cgroup``, plumbs the chain/jump with
``_ensure_nat_chain_and_jump``, appends each rule with ``_iptables_append``
(rolling back via ``_iptables_delete`` / ``_release_pid_cgroup`` on failure),
and persists via ``_save_state``. This handler is dispatched dynamically from
the ``TOOLS`` registry (``tools/__init__.py`` calls ``tool_def.handler(...,
ctx=ctx)``); no direct internal callers exist.
Args:
pid: The Linux process ID to transparently proxy through Tor; must be a
positive integer naming a live process.
exclude_nets: Optional list of IPv4 CIDRs whose destination traffic
bypasses Tor (RETURN). Defaults to ``["127.0.0.0/8"]`` when ``None``.
ctx: The tool execution context (``ToolContext``); ``None`` is rejected.
Returns:
A JSON string: on success, the PID, cgroup path, ports, exclude-nets and a
summary message; on failure, a ``{"success": False, "error": ...}`` object
(bad/duplicate PID, invalid CIDR, missing cgroup mount, or an iptables/
cgroup error after rollback).
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx)
if auth_err:
return auth_err
if pid < 1:
return json.dumps(
{"success": False, "error": "pid must be a positive integer."}
)
if not Path(f"/proc/{pid}").is_dir():
return json.dumps({"success": False, "error": f"Process {pid} does not exist."})
nets = list(exclude_nets) if exclude_nets is not None else ["127.0.0.0/8"]
bad = _validate_cidrs(nets)
if bad:
return json.dumps({"success": False, "error": bad})
state = _load_state()
managed = state.setdefault("managed", {})
if str(pid) in managed:
return json.dumps(
{
"success": False,
"error": f"PID {pid} is already managed. Use tor_transproxy_disable first.",
}
)
if not CGROUP_MOUNT.is_dir():
return json.dumps({"success": False, "error": "/sys/fs/cgroup not available."})
cg_rel = cgroup_rel_path(pid)
trans_port, dns_port = parse_tor_trans_dns_ports()
specs = _build_rule_specs(pid, cg_rel, trans_port, dns_port, nets)
cgroup_parent_path().mkdir(parents=True, exist_ok=True)
ok_cg, e_cg = await _assign_pid_cgroup(pid)
if not ok_cg:
return json.dumps({"success": False, "error": e_cg})
added: list[list[str]] = []
try:
ok_j, e_j = await _ensure_nat_chain_and_jump()
if not ok_j:
raise RuntimeError(e_j)
for spec in specs:
ok_a, e_a = await _iptables_append(spec)
if not ok_a:
raise RuntimeError(e_a or "iptables -A failed")
added.append(spec)
except Exception as exc:
while added:
spec = added.pop()
await _iptables_delete(spec)
await _release_pid_cgroup(pid)
return json.dumps({"success": False, "error": str(exc)})
managed[str(pid)] = {
"cgroup_rel_path": cg_rel,
"exclude_nets": nets,
"trans_port": trans_port,
"dns_port": dns_port,
"rule_specs": specs,
}
_save_state(state)
return json.dumps(
{
"success": True,
"pid": pid,
"cgroup_rel_path": cg_rel,
"trans_port": trans_port,
"dns_port": dns_port,
"exclude_nets": nets,
"message": (
f"PID {pid} moved to cgroup {cg_rel}; NAT redirects TCP -> {trans_port}, "
f"DNS/udp53 -> {dns_port}."
),
},
indent=2,
)
async def _tor_transproxy_disable(pid: int, ctx: Any = None) -> str:
"""Tear down a PID's Tor transparent-proxy rules and release its cgroup.
Implements the ``tor_transproxy_disable`` tool. Removes the nat rules that
redirected the process through Tor, moves the task back to the parent cgroup
slice, drops the now-empty leaf, and removes the global OUTPUT jump if no PIDs
remain managed. When the PID is present in saved state its exact recorded rule
specs are deleted (in reverse); if it is absent but the chain exists, a
best-effort comment-based scan removes any orphaned rules tagged for this PID.
After ``_check_priv`` authorization it loads state via ``_load_state``, deletes
rules with ``_iptables_delete`` (or scans via ``_iptables_delete_rules_matching_pid``),
returns the task with ``_release_pid_cgroup``, persists via ``_save_state``,
and finally calls ``_remove_output_jump_if_unused``. This handler is dispatched
dynamically from the ``TOOLS`` registry (``tools/__init__.py`` calls
``tool_def.handler(..., ctx=ctx)``); no direct internal callers exist.
Args:
pid: The Linux process ID previously enabled; must be a positive integer.
It need not still be alive — cleanup is attempted regardless.
ctx: The tool execution context (``ToolContext``); ``None`` is rejected.
Returns:
A JSON string. On success, a confirmation message plus any non-fatal
``warnings`` (e.g. cgroup-release issues). On failure, a
``{"success": False, "error": ...}`` object — for a managed PID when rule
deletion fails (with a hint to inspect the chain), or for an unmanaged PID
when the iptables cleanup itself fails.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx)
if auth_err:
return auth_err
if pid < 1:
return json.dumps(
{"success": False, "error": "pid must be a positive integer."}
)
state = _load_state()
managed = state.setdefault("managed", {})
key = str(pid)
entry = managed.get(key)
errors: list[str] = []
if entry:
for spec in reversed(entry.get("rule_specs", [])):
ok_d, err_d = await _iptables_delete(spec)
if not ok_d and err_d:
errors.append(err_d)
if errors:
return json.dumps(
{
"success": False,
"pid": pid,
"error": "; ".join(errors),
"hint": "Rules may be out of sync; inspect iptables -t nat -S STARGAZER_TOR_TRANSPROXY.",
}
)
managed.pop(key, None)
elif await _iptables_chain_exists():
scan_errs = await _iptables_delete_rules_matching_pid(pid)
if scan_errs:
errors.extend(scan_errs)
ok_rel, e_rel = await _release_pid_cgroup(pid)
if not ok_rel:
errors.append(e_rel)
_save_state(state)
await _remove_output_jump_if_unused()
if entry is None and errors:
return json.dumps(
{
"success": False,
"pid": pid,
"error": (
f"PID {pid} was not in managed state; iptables cleanup failed. "
f"Details: {'; '.join(errors)}"
),
}
)
return json.dumps(
{
"success": True,
"pid": pid,
"warnings": errors or None,
"message": f"Removed Tor transparent proxy rules for PID {pid} and released cgroup.",
},
indent=2,
)
# ---------------------------------------------------------------------------
# TOOLS
# ---------------------------------------------------------------------------
TOOLS = [
{
"name": "tor_transproxy_status",
"description": (
"Report Tor transparent-proxy ports from torrc, listener status, cgroup v2 / "
"iptables cgroup match probe, STARGAZER Tor NAT chain snapshot, and managed PIDs. "
"Requires UNSANDBOXED_EXEC."
),
"parameters": {"type": "object", "properties": {}},
"handler": _tor_transproxy_status,
},
{
"name": "tor_transproxy_enable",
"description": (
"Enable Tor transparent proxy for one process: move PID into cgroup v2 "
f"{STARGAZER_PID_NET_PARENT}/<pid> and append iptables nat rules redirecting TCP to Tor "
"TransPort and UDP/53 to DNSPort. exclude_nets lists destinations that bypass Tor "
"(RETURN); default is 127.0.0.0/8 only. Requires root-equivalent UNSANDBOXED_EXEC."
),
"parameters": {
"type": "object",
"properties": {
"pid": {"type": "integer", "description": "Linux process ID to proxy."},
"exclude_nets": {
"type": "array",
"items": {"type": "string"},
"description": (
"Optional list of IPv4 CIDRs to exclude from redirection (RETURN). "
'Default: ["127.0.0.0/8"].'
),
},
},
"required": ["pid"],
},
"handler": _tor_transproxy_enable,
},
{
"name": "tor_transproxy_disable",
"description": (
"Disable Tor transparent proxy for a PID: delete matching nat rules, move the "
"task back to the parent cgroup, and remove the leaf cgroup if empty. "
"Requires UNSANDBOXED_EXEC."
),
"parameters": {
"type": "object",
"properties": {
"pid": {
"type": "integer",
"description": "Process ID previously enabled.",
},
},
"required": ["pid"],
},
"handler": _tor_transproxy_disable,
},
]