"""Watchdog-renewed distributed lock for per-channel serialization.

:class:`DistributedLock` implements a token-based Redis lock on a
caller-supplied key (for example ``sg:lock:channel:{id}``) that is
released and renewed via Lua scripts, so only the true owner can act.
A background watchdog auto-renews the lease for long-running work and
can abort the holder if the lease is lost; the inference worker relies
on this to guarantee that messages for one channel are processed
serially without corrupting NCM/history state.
:class:`FencedKGWriter` wraps a database client so that writes are
refused once its lock is no longer held.
"""
import uuid
import logging
import asyncio
from typing import Any, Optional
# Module-wide logger; every lock diagnostic is emitted on this channel.
logger = logging.getLogger("stargazer.lock")

# Compare-and-delete: DEL KEYS[1] only while it still stores the caller's
# token (ARGV[1]); otherwise return 0 so a lock that has since been taken
# by another holder is never removed.
RELEASE_SCRIPT = (
    "\n"
    'if redis.call("get", KEYS[1]) == ARGV[1] then\n'
    'return redis.call("del", KEYS[1])\n'
    "else\n"
    "return 0\n"
    "end\n"
)

# Compare-and-expire: reset the TTL of KEYS[1] to ARGV[2] seconds only
# while it still stores the caller's token (ARGV[1]); otherwise return 0
# so the watchdog can detect that ownership was lost.
RENEW_SCRIPT = (
    "\n"
    'if redis.call("get", KEYS[1]) == ARGV[1] then\n'
    'return redis.call("expire", KEYS[1], tonumber(ARGV[2]))\n'
    "else\n"
    "return 0\n"
    "end\n"
)
class DistributedLock:
    """Token-fenced Redis lock with optional watchdog lease renewal.

    Implements a single-key mutual-exclusion lock over Redis using
    ``SET NX EX`` to claim ownership and a per-instance UUID4 fencing token,
    so only the true owner can renew or release the key (via the
    :data:`RENEW_SCRIPT` and :data:`RELEASE_SCRIPT` Lua scripts). When
    ``auto_renew`` is enabled, a background watchdog keeps the lease alive
    for long-running work. With ``abort_on_loss`` it cancels the owning
    asyncio task once the lease is lost, so a stale holder cannot corrupt
    shared state. The lease counts as lost when another holder took the
    key, when the renewal budget ran out, or when renewals kept failing
    until the lease's TTL had certainly elapsed.

    Acquisition is re-entrant within one instance, supports blocking polling,
    and works as an async context manager. When no Redis client is supplied,
    it degrades to a purely local lock.

    Instantiated across the codebase for per-channel and single-flight
    serialization, including ``core.stream_consumer`` and
    ``core.outbound_consumer`` (message ordering), ``ncm_variant_cache``,
    ``ops_planner``, ``star_self_mirror``, ``sword.monitor``, and
    ``plugins.swarm_engine.state_manager``.
    """

    def __init__(
        self,
        redis: Any,
        key: str,
        ttl: int = 30,
        auto_renew: bool = False,
        max_renewals: int = 5,
        abort_on_loss: bool = True
    ):
        """Initialize a token-based Redis lock for a single key.

        Generates a unique fencing token (a UUID4) that identifies this lock
        instance as the legitimate owner. Also configures the lease TTL, the
        watchdog auto-renewal policy, and abort-on-loss behavior. No Redis
        I/O happens here; the key is only claimed later in :meth:`acquire`.

        Instantiated throughout the codebase to serialize work per logical
        resource. Examples are ``core.stream_consumer`` (key
        ``sg:lock:channel:{channel_id}``) and ``core.outbound_consumer`` (key
        ``sg:lock:outbound:{platform}:{channel_id}``) for per-channel message
        ordering. ``ncm_variant_cache``, ``ops_planner``,
        ``star_self_mirror``, ``sword.monitor``, and
        ``plugins.swarm_engine.state_manager`` use it for their single-flight
        generation locks.

        Args:
            redis: An async Redis client, or ``None`` for a degraded
                local-only fallback in which acquisition always succeeds
                without touching Redis.
            key: The Redis key the lock occupies (e.g.
                ``sg:lock:channel:{channel_id}``).
            ttl: Lease time-to-live in seconds. It is applied to the key on
                ``SET NX EX`` and on each watchdog renewal.
            auto_renew: When ``True``, a background watchdog task is started
                on acquire to keep the lease alive for long-running work.
            max_renewals: Maximum number of watchdog renewals before the
                lease is voluntarily given up. ``<= 0`` means unlimited
                (used by ``stream_consumer`` so long-running turns are not
                cut off by the renewal budget).
            abort_on_loss: When ``True``, the watchdog cancels the owning
                asyncio task if the lease is lost or the renewal limit is
                reached, preventing a stale holder from corrupting state.
        """
        self._redis = redis
        self._key = key
        self._ttl = ttl
        self._auto_renew = auto_renew
        self._max_renewals = max_renewals
        self._abort_on_loss = abort_on_loss
        self._token = str(uuid.uuid4())
        self._acquired = False
        # Watchdog & Re-entrancy states
        self._acquisition_count = 0
        self._watchdog_task: Optional[asyncio.Task] = None
        self._owner_task: Optional[asyncio.Task] = None
        self._renewals = 0
        self._released_count = 0
        # Event-loop time (``loop.time()``) by which the key has certainly
        # expired server-side unless renewed. It is measured from just before
        # the SET/EXPIRE command was sent, so it errs on the early side.
        self._lease_expires_at: Optional[float] = None

    def keys_released_count(self) -> int:
        """Return how many Redis releases this lock has performed.

        Reports ``self._released_count``, which :meth:`release` increments
        each time the compare-and-delete :data:`RELEASE_SCRIPT` completes
        against Redis without raising. Re-entrant and local releases do not
        count. The script itself only deletes the key while it still holds
        this instance's token.

        Primarily an observability hook for tests asserting that a re-entrant
        hold does not delete the key prematurely; read by
        ``tests/adversarial/test_lock_watchdog.py``.

        Returns:
            int: The number of Redis release-script evaluations performed by
            this instance.
        """
        return self._released_count

    async def acquire(self, raise_contention_log: bool = True) -> bool:
        """Attempt to claim the lock once, without blocking.

        Acquisition is re-entrant: if this instance already holds the lock,
        the acquisition count is incremented and the call succeeds
        immediately without contacting Redis.

        On a fresh acquire it sends ``SET key token NX EX ttl``. On success it
        records the owning asyncio task (for abort-on-loss) and a
        conservative lease deadline. When ``auto_renew`` is set, it also
        schedules :meth:`_run_watchdog` as a background task.

        When ``self._redis`` is falsy (e.g. ``None``), it falls back to a
        purely local acquire. Redis errors are caught and logged, leaving
        the lock unacquired.

        Called by the non-blocking single-flight paths in
        ``ncm_variant_cache`` (variant and embedding generation locks),
        ``ops_planner``, ``star_self_mirror``, and ``sword.monitor``, each of
        which skips its work when acquisition fails. It is also the primitive
        that :meth:`acquire_blocking` polls and that :meth:`__aenter__` calls.

        Args:
            raise_contention_log: When ``True``, log lost contention at INFO
                ("skipping execution"). When ``False``, log at DEBUG
                ("retrying") so blocking pollers stay quiet.

        Returns:
            bool: ``True`` if the lock is now held by this instance
            (including re-entrant and local-fallback cases). ``False`` on
            contention or a Redis error.
        """
        # Re-entrant acquisition check
        if self._acquired:
            self._acquisition_count += 1
            logger.debug("Re-entered lock: %s (count: %d)", self._key, self._acquisition_count)
            return True
        if not self._redis:
            # Handle local fallback when Redis is not available
            self._acquired = True
            self._acquisition_count = 1
            return True
        try:
            # Read the clock *before* sending SET: Redis cannot start the TTL
            # earlier than this, so ``started + ttl`` never overestimates how
            # long the lease lasts.
            started = asyncio.get_running_loop().time()
            result = await self._redis.set(self._key, self._token, nx=True, ex=self._ttl)
            # redis.set returns True (or b"OK") on success, None or False on failure
            self._acquired = result is not None and result is not False
            if self._acquired:
                self._acquisition_count = 1
                self._renewals = 0
                self._lease_expires_at = started + self._ttl
                self._owner_task = asyncio.current_task()
                logger.debug("Acquired lock: %s", self._key)
                # Start watchdog lease renewal if enabled
                if self._auto_renew:
                    logger.debug(
                        "Starting watchdog lease auto-renewal for key: %s (TTL: %d, max renewals: %d)",
                        self._key, self._ttl, self._max_renewals,
                    )
                    self._watchdog_task = asyncio.create_task(self._run_watchdog())
            elif raise_contention_log:
                logger.info("Lock contention for key: %s, skipping execution", self._key)
            else:
                logger.debug("Lock contention for key: %s, retrying...", self._key)
        except Exception as e:
            logger.warning("Failed to acquire Redis lock for key %s: %s", self._key, e)
            self._acquired = False
        return self._acquired

    async def release(self) -> None:
        """Release the lock, honoring re-entrancy and stopping the watchdog.

        No-ops when the lock is not held, including after the watchdog has
        declared the lease lost. For a re-entrant hold (acquisition count
        greater than one), this only decrements the count and returns
        without touching Redis, so the key is freed only on the outermost
        release. On the final release it cancels any running watchdog task,
        clears owner and lease state, and deletes the key.

        Deletion goes through :data:`RELEASE_SCRIPT`, a compare-and-delete
        keyed on the fencing token. The key is removed only if this instance
        still owns it, so a lock re-taken by another holder is never
        clobbered. Each completed evaluation is tallied in
        ``self._released_count`` (see :meth:`keys_released_count`).

        When ``self._redis`` is falsy it simply clears local state. Redis
        errors are caught and logged, and ``self._acquired`` is cleared in
        all cases.

        Called from the ``finally`` blocks that pair with acquisition in
        ``core.stream_consumer`` and ``core.outbound_consumer`` (per-channel
        serialization), and in ``ncm_variant_cache``, ``ops_planner``,
        ``star_self_mirror``, and ``sword.monitor``. It is also invoked by
        :meth:`__aexit__` for ``async with`` usage.
        """
        if not self._acquired:
            return
        # Re-entrant release check
        if self._acquisition_count > 1:
            self._acquisition_count -= 1
            logger.debug("Released re-entered lock: %s (remaining: %d)", self._key, self._acquisition_count)
            return
        # Normal release
        self._acquisition_count = 0
        self._owner_task = None
        self._lease_expires_at = None
        # Stop the watchdog first so it cannot renew (or abort us) mid-release.
        if self._watchdog_task is not None:
            self._watchdog_task.cancel()
            self._watchdog_task = None
            logger.debug("Terminated watchdog lease auto-renewal for key: %s", self._key)
        if not self._redis:
            self._acquired = False
            return
        try:
            await self._redis.eval(RELEASE_SCRIPT, 1, self._key, self._token)
            self._released_count += 1
            logger.debug("Released lock: %s", self._key)
        except Exception as e:
            logger.warning("Failed to release Redis lock for key %s: %s", self._key, e)
        finally:
            self._acquired = False

    def _give_up_lease(self, abort_message: str) -> None:
        """Mark the lock as no longer held and, if configured, cancel the owner task."""
        self._acquired = False
        # getattr keeps instances built via ``__new__`` (as some tests do) working.
        if getattr(self, "_abort_on_loss", True) and self._owner_task:
            logger.error(abort_message)
            self._owner_task.cancel()

    async def _run_watchdog(self) -> None:
        """Renew the lease in the background, aborting the owner if it is lost.

        The loop runs while the lock is held and the ``max_renewals`` budget
        is not exhausted. Each iteration sleeps for roughly a third of the
        TTL, then evaluates :data:`RENEW_SCRIPT` (a token-checked ``EXPIRE``)
        to extend the key's lease.

        The lease is given up, by clearing ``self._acquired`` and, when
        ``abort_on_loss`` is set, cancelling the recorded owner task, in
        three situations:

        * a renewal reports that the token no longer owns the key;
        * renewals keep raising until the conservative lease deadline has
          passed, so the key has certainly expired and another client may
          hold it;
        * the renewal limit is reached.

        A failed renewal before the deadline is logged and retried no later
        than the deadline. :class:`asyncio.CancelledError`, raised when
        :meth:`release` cancels this task, ends the loop quietly.

        Started only by :meth:`acquire` as a background task when
        ``auto_renew`` is enabled. It is also driven directly by
        ``tests/core/test_context_assembly_hardening.py``.
        """
        loop = asyncio.get_running_loop()
        sleep_time = max(0.01, self._ttl / 3.0)
        lease_expires_at = getattr(self, "_lease_expires_at", None)
        if lease_expires_at is None:
            lease_expires_at = loop.time() + self._ttl
        delay = sleep_time
        try:
            while self._acquired and (
                self._max_renewals <= 0 or self._renewals < self._max_renewals
            ):
                await asyncio.sleep(delay)
                if not self._acquired:
                    break
                sent_at = loop.time()
                try:
                    renewed = await self._redis.eval(
                        RENEW_SCRIPT,
                        1,
                        self._key,
                        self._token,
                        str(self._ttl),
                    )
                except Exception as e:
                    logger.warning("Lock watchdog failed to renew key %s: %s", self._key, e)
                    remaining = lease_expires_at - loop.time()
                    if remaining <= 0:
                        # The last successful SET/EXPIRE is more than one TTL
                        # old: the key has expired server-side, so this holder
                        # must not keep acting as the owner.
                        logger.warning(
                            "Lock watchdog could not renew key %s before its lease expired — treating lease as lost",
                            self._key,
                        )
                        self._give_up_lease("Lock lease expired, aborting owner task")
                        break
                    # Retry no later than the moment the current lease lapses.
                    delay = min(sleep_time, remaining)
                    continue
                if not renewed:
                    logger.warning(
                        "Lock watchdog lost ownership of key %s — stopping renewal",
                        self._key,
                    )
                    self._give_up_lease("Lock stolen, aborting owner task")
                    break
                lease_expires_at = sent_at + self._ttl
                self._lease_expires_at = lease_expires_at
                delay = sleep_time
                self._renewals += 1
                logger.debug("Lock watchdog renewed key: %s (renewal %d/%d)",
                             self._key, self._renewals, self._max_renewals)
            if self._max_renewals > 0 and self._renewals >= self._max_renewals:
                logger.warning("Lock watchdog reached maximum renewal limit (%d) for key: %s",
                               self._max_renewals, self._key)
                self._give_up_lease("Lock max renewals reached, aborting owner task")
        except asyncio.CancelledError:
            pass

    async def acquire_blocking(self, poll_interval: float = 0.1, timeout: Optional[float] = None) -> bool:
        """Acquire the lock, polling until it is free or the timeout elapses.

        Repeatedly calls :meth:`acquire` with contention logging suppressed,
        so only the first wait is announced. It sleeps up to
        ``poll_interval`` seconds between attempts, until ownership is won or
        ``timeout`` is exceeded. The sleep is shortened so that a final
        attempt happens at the timeout instead of overshooting it.

        Used where a caller must process strictly in order rather than skip
        on contention, namely the per-channel ``core.stream_consumer`` and
        ``core.outbound_consumer`` loops. Also exercised by
        ``tests/core/test_concurrent_consumers.py`` and
        ``tests/core/test_distributed_lock.py``.

        Args:
            poll_interval: Maximum seconds to wait between acquisition
                attempts.
            timeout: Maximum seconds to keep polling, or ``None`` to wait
                forever.

        Returns:
            bool: ``True`` once the lock is held, ``False`` if the timeout
            elapsed first.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        first = True
        while True:
            if await self.acquire(raise_contention_log=False):
                return True
            if first:
                logger.info("Lock contention for key: %s, waiting for release...", self._key)
                first = False
            delay = poll_interval
            if timeout is not None:
                remaining = timeout - (loop.time() - start_time)
                if remaining <= 0:
                    logger.warning("Lock acquisition timed out for key: %s", self._key)
                    return False
                delay = min(poll_interval, remaining)
            await asyncio.sleep(delay)

    async def __aenter__(self) -> "DistributedLock":
        """Enter the lock as an async context manager.

        Performs a single non-blocking :meth:`acquire` and returns the lock.
        This does not block on contention, so ``async with`` users should
        treat the lock as best-effort unless they check ``self._acquired``.
        ``plugins.swarm_engine.state_manager`` uses this form for its
        ``sg:lock:swarm:{slug}`` lock.

        Returns:
            DistributedLock: This instance, for binding in the ``as`` clause.
        """
        await self.acquire()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """Exit the async context manager by releasing the lock.

        Calls :meth:`release` to free the key (or decrement the re-entrant
        count) whether or not the ``async with`` body raised. It does not
        suppress exceptions.

        Args:
            exc_type: Exception type if the body raised, else ``None``.
            exc_val: Exception instance if the body raised, else ``None``.
            exc_tb: Traceback if the body raised, else ``None``.

        Returns:
            bool: Always ``False`` so any in-flight exception propagates.
        """
        await self.release()
        return False
class FencedKGWriter:
    """Gate database writes on a :class:`DistributedLock` still being held.

    Couples a database client (for instance a FalkorDB/knowledge-graph
    client) with a lock and forwards a write only while that lock reports
    itself as acquired. A holder whose lease the lock's watchdog has
    declared lost is refused instead of racing the newer owner. The lock's
    token is exposed through :attr:`token` so a store can record it next to
    each write.

    The check is purely client-side: it reads the lock's local
    ``_acquired`` flag and does not itself contact Redis.

    No application call sites were found; the wrapper is currently exercised
    only by ``tests/core/migration/test_distributed_lock_hardened.py``.
    """

    def __init__(self, db_client: Any, lock: "DistributedLock"):
        """Store the database client and the lock that gates its writes.

        Performs no I/O.

        Args:
            db_client: Database client exposing an async ``query`` method
                (e.g. a FalkorDB/knowledge-graph client).
            lock: Lock that must currently be held for :meth:`write` to
                proceed.
        """
        self._db = db_client
        self._lock = lock

    @property
    def token(self) -> str:
        """The bound lock's UUID4 token, for stores that record it with writes.

        Returns:
            str: The fencing token of the wrapped lock.
        """
        bound_lock = self._lock
        return bound_lock._token

    async def write(self, query: str, **kwargs) -> Any:
        """Run ``query`` on the database only while the lock is still held.

        The guard prevents a holder whose lease was lost (e.g. flagged by
        the lock watchdog) from mutating the knowledge graph alongside a
        newer owner. When the guard passes, the call is forwarded verbatim
        to ``self._db.query(query, **kwargs)``. No internal callers were
        found in application code; usage is currently exercised only by
        ``tests/core/migration/test_distributed_lock_hardened.py``.

        Args:
            query: Statement to execute against the database.
            **kwargs: Extra parameters passed through unchanged to the
                client's ``query`` method.

        Returns:
            Any: Whatever ``db_client.query`` returns.

        Raises:
            RuntimeError: If the bound lock is not currently held.
        """
        if self._lock._acquired:
            return await self._db.query(query, **kwargs)
        raise RuntimeError("Fencing token validation failed: Lock not held")