# Source code for core.distributed_lock

"""Watchdog-renewed distributed lock for per-channel serialization.

:class:`DistributedLock` implements a token-based Redis lock
(``sg:lock:channel:{id}``) released and renewed via Lua scripts so
only the true owner can act. A background watchdog auto-renews the
lease for long-running work and can abort the holder if the lease is
lost, which the inference worker uses to guarantee that messages for
one channel are processed serially without corrupting NCM/history
state.
"""

import uuid
import logging
import asyncio
from typing import Any, Optional

# Module-level logger; every lock lifecycle message in this file (acquire,
# contention, renewal, loss, release) is emitted under "stargazer.lock".
logger = logging.getLogger("stargazer.lock")

# Lua script for a compare-and-delete release: the key is removed only when
# the value stored under it equals the caller's fencing token, so a holder
# whose lease already lapsed cannot delete a key that now belongs to another
# owner.
#   KEYS[1] - the lock key
#   ARGV[1] - the caller's fencing token
# Yields the result of DEL when the token matches, otherwise 0.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""

# Lua script for a token-checked lease renewal: the key's expiry is reset
# only when the value stored under it still equals the caller's fencing
# token.
#   KEYS[1] - the lock key
#   ARGV[1] - the caller's fencing token
#   ARGV[2] - the new time-to-live in seconds (sent as a string, converted
#             with tonumber)
# Yields the result of EXPIRE when the token matches, otherwise 0; the
# watchdog treats a falsy result as "ownership lost".
RENEW_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("expire", KEYS[1], tonumber(ARGV[2]))
else
    return 0
end
"""

class DistributedLock:
    """Token-fenced Redis lock with optional watchdog lease renewal.

    Implements a single-key mutual-exclusion lock over Redis using
    ``SET NX EX`` to claim ownership and a per-instance UUID4 fencing token
    so only the true owner can renew or release the key (both done via the
    :data:`RELEASE_SCRIPT` and :data:`RENEW_SCRIPT` Lua scripts). When
    ``auto_renew`` is enabled a background watchdog keeps the lease alive for
    long-running work and, on ``abort_on_loss``, cancels the owning asyncio
    task if the lease is lost so a stale holder cannot corrupt shared state.

    Acquisition is re-entrant within one instance, supports blocking
    polling, and works as an async context manager; when no Redis client is
    supplied it degrades to a purely local lock.

    Instantiated across the codebase for per-channel and single-flight
    serialization, including ``core.stream_consumer`` and
    ``core.outbound_consumer`` (message ordering), ``ncm_variant_cache``,
    ``ops_planner``, ``star_self_mirror``, ``sword.monitor``, and
    ``plugins.swarm_engine.state_manager``.
    """

    def __init__(
        self,
        redis: Any,
        key: str,
        ttl: int = 30,
        auto_renew: bool = False,
        max_renewals: int = 5,
        abort_on_loss: bool = True,
    ):
        """Initialize a token-based Redis lock for a single key.

        Generates a unique fencing token (a UUID4) that identifies this
        instance as the legitimate owner and stores the lease/watchdog
        configuration. No Redis I/O happens here; the key is only claimed
        later in :meth:`acquire`.

        Args:
            redis: An async Redis client, or a falsy value (e.g. ``None``)
                for a degraded local-only fallback in which acquisition
                always succeeds without touching Redis.
            key: The Redis key the lock occupies (e.g.
                ``sg:lock:channel:{channel_id}``).
            ttl: Lease time-to-live in seconds applied on ``SET NX EX`` and
                on each watchdog renewal.
            auto_renew: When ``True``, a background watchdog task is started
                on acquire to keep the lease alive.
            max_renewals: Maximum number of watchdog renewals before the
                lease is voluntarily given up; ``<= 0`` means unlimited.
            abort_on_loss: When ``True``, the watchdog cancels the owning
                asyncio task if the lease is lost or the renewal limit is
                reached.
        """
        self._redis = redis
        self._key = key
        self._ttl = ttl
        self._auto_renew = auto_renew
        self._max_renewals = max_renewals
        self._abort_on_loss = abort_on_loss
        self._token = str(uuid.uuid4())
        self._acquired = False

        # Watchdog & re-entrancy state
        self._acquisition_count = 0
        self._watchdog_task: Optional[asyncio.Task] = None
        self._owner_task: Optional[asyncio.Task] = None
        self._renewals = 0
        self._released_count = 0

    def keys_released_count(self) -> int:
        """Return how many times this lock physically deleted its Redis key.

        Reports ``self._released_count``, which :meth:`release` increments
        only when the compare-and-delete script reports that the key was
        actually removed (never on re-entrant releases, local-fallback
        releases, or releases of a lease that was already lost).

        Returns:
            int: The number of physical key deletions performed by this
            instance.
        """
        return self._released_count

    async def acquire(self, raise_contention_log: bool = True) -> bool:
        """Attempt to claim the lock once, without blocking.

        Acquisition is re-entrant: if this instance already holds the lock,
        the internal acquisition count is incremented and the call succeeds
        without contacting Redis. Otherwise the key is claimed with
        ``SET key token NX EX ttl``; on success the current asyncio task is
        recorded as the owner (for later abort-on-loss) and, when
        ``auto_renew`` is set, :meth:`_run_watchdog` is scheduled as a
        background task. With no Redis client the lock is taken locally.
        Redis errors are caught and logged, leaving the lock unacquired.

        Args:
            raise_contention_log: When ``True``, log lost contention at INFO
                ("skipping execution"); when ``False``, log at DEBUG
                ("retrying") so blocking pollers stay quiet.

        Returns:
            bool: ``True`` if the lock is now held by this instance
            (including re-entrant and local-fallback cases), ``False`` on
            contention or a Redis error.
        """
        # Re-entrant acquisition: already held, just bump the depth counter.
        if self._acquired:
            self._acquisition_count += 1
            logger.debug("Re-entered lock: %s (count: %d)", self._key, self._acquisition_count)
            return True

        if not self._redis:
            # Local fallback when Redis is not available.
            self._acquired = True
            self._acquisition_count = 1
            return True

        try:
            result = await self._redis.set(self._key, self._token, nx=True, ex=self._ttl)
            # redis.set returns True (or b"OK") on success, None or False on failure
            self._acquired = result is not None and result is not False

            if self._acquired:
                self._acquisition_count = 1
                self._renewals = 0
                # Remember who holds the lease so the watchdog can abort it.
                self._owner_task = asyncio.current_task()
                logger.debug("Acquired lock: %s", self._key)

                # Start watchdog lease renewal if enabled
                if self._auto_renew:
                    logger.debug(
                        "Starting watchdog lease auto-renewal for key: %s (TTL: %d, max renewals: %d)",
                        self._key,
                        self._ttl,
                        self._max_renewals,
                    )
                    self._watchdog_task = asyncio.create_task(self._run_watchdog())
            else:
                if raise_contention_log:
                    logger.info("Lock contention for key: %s, skipping execution", self._key)
                else:
                    logger.debug("Lock contention for key: %s, retrying...", self._key)
        except Exception as e:
            # Deliberate best-effort: a Redis failure means "not acquired",
            # never a crash in the caller.
            logger.warning("Failed to acquire Redis lock for key %s: %s", self._key, e)
            self._acquired = False

        return self._acquired

    async def release(self) -> None:
        """Release the lock, honoring re-entrancy and stopping the watchdog.

        No-ops when the lock is not held. For a re-entrant hold (acquisition
        count greater than one) only the count is decremented, so the key is
        freed on the outermost release. On the final release any running
        watchdog task is cancelled, owner state is cleared, and
        :data:`RELEASE_SCRIPT` is evaluated so the key is deleted only if
        this instance's fencing token still owns it.

        ``self._released_count`` (see :meth:`keys_released_count`) is
        incremented only when the script reports that the key was actually
        deleted; a release that finds the key already expired or re-taken by
        another holder is not counted. With no Redis client only local state
        is cleared. Redis errors are caught and logged, and
        ``self._acquired`` is cleared in all cases.
        """
        if not self._acquired:
            return

        # Re-entrant release: unwind one level, keep the key.
        if self._acquisition_count > 1:
            self._acquisition_count -= 1
            logger.debug(
                "Released re-entered lock: %s (remaining: %d)",
                self._key,
                self._acquisition_count,
            )
            return

        # Normal (outermost) release
        self._acquisition_count = 0
        self._owner_task = None

        # Stop watchdog task if active
        if self._watchdog_task is not None:
            self._watchdog_task.cancel()
            self._watchdog_task = None
            logger.debug("Terminated watchdog lease auto-renewal for key: %s", self._key)

        if not self._redis:
            self._acquired = False
            return

        try:
            deleted = await self._redis.eval(RELEASE_SCRIPT, 1, self._key, self._token)
            if deleted:
                # Only a real compare-and-delete counts as a physical release.
                self._released_count += 1
                logger.debug("Released lock: %s", self._key)
            else:
                logger.debug(
                    "Lock %s was no longer owned at release; key left untouched",
                    self._key,
                )
        except Exception as e:
            logger.warning("Failed to release Redis lock for key %s: %s", self._key, e)
        finally:
            self._acquired = False

    def _abort_owner(self, reason: str) -> None:
        """Mark the lease as lost and, if configured, cancel the owner task.

        Args:
            reason: Message logged at ERROR level when the owner task is
                cancelled.
        """
        self._acquired = False
        # getattr keeps this usable on instances built without __init__.
        if getattr(self, "_abort_on_loss", True) and self._owner_task:
            logger.error(reason)
            self._owner_task.cancel()

    async def _run_watchdog(self) -> None:
        """Renew the lease in the background, aborting the owner if it is lost.

        Sleeps for a third of the TTL (at least 0.01s) between renewals and,
        while the lock is held and the ``max_renewals`` budget is not
        exhausted, evaluates :data:`RENEW_SCRIPT` to extend the key's lease.
        If a renewal reports that ownership was lost, or the renewal limit
        is reached, ``self._acquired`` is cleared and, when
        ``abort_on_loss`` is set, the recorded owner task is cancelled.
        Renewal errors are logged and retried on the next cycle;
        :class:`asyncio.CancelledError` (from :meth:`release` cancelling the
        task) ends the loop quietly.

        Returns:
            None.
        """
        sleep_time = max(0.01, self._ttl / 3.0)
        try:
            while self._acquired and (
                self._max_renewals <= 0 or self._renewals < self._max_renewals
            ):
                await asyncio.sleep(sleep_time)
                # The lock may have been released while we slept.
                if not self._acquired:
                    break

                try:
                    renewed = await self._redis.eval(
                        RENEW_SCRIPT,
                        1,
                        self._key,
                        self._token,
                        str(self._ttl),
                    )
                    if not renewed:
                        logger.warning(
                            "Lock watchdog lost ownership of key %s — stopping renewal",
                            self._key,
                        )
                        self._abort_owner("Lock stolen, aborting owner task")
                        break

                    self._renewals += 1
                    logger.debug(
                        "Lock watchdog renewed key: %s (renewal %d/%d)",
                        self._key,
                        self._renewals,
                        self._max_renewals,
                    )
                except Exception as e:
                    # Transient failure: keep looping and try again next cycle.
                    logger.warning("Lock watchdog failed to renew key %s: %s", self._key, e)

            if self._max_renewals > 0 and self._renewals >= self._max_renewals:
                logger.warning(
                    "Lock watchdog reached maximum renewal limit (%d) for key: %s",
                    self._max_renewals,
                    self._key,
                )
                self._abort_owner("Lock max renewals reached, aborting owner task")
        except asyncio.CancelledError:
            pass

    async def acquire_blocking(
        self, poll_interval: float = 0.1, timeout: Optional[float] = None
    ) -> bool:
        """Acquire the lock, polling until it is free or the timeout elapses.

        Repeatedly calls :meth:`acquire` with contention logging suppressed
        (only the first wait is announced at INFO), sleeping
        ``poll_interval`` seconds between attempts. At least one attempt is
        always made, even with ``timeout=0``.

        Args:
            poll_interval: Seconds to wait between acquisition attempts.
            timeout: Maximum seconds to keep polling, or ``None`` to wait
                forever.

        Returns:
            bool: ``True`` once the lock is held, ``False`` if the timeout
            elapsed first.
        """
        start_time = asyncio.get_running_loop().time()
        first = True
        while True:
            if await self.acquire(raise_contention_log=False):
                return True

            if first:
                logger.info("Lock contention for key: %s, waiting for release...", self._key)
                first = False

            if timeout is not None and (asyncio.get_running_loop().time() - start_time) >= timeout:
                logger.warning("Lock acquisition timed out for key: %s", self._key)
                return False

            await asyncio.sleep(poll_interval)

    async def __aenter__(self) -> "DistributedLock":
        """Enter the lock as an async context manager.

        Performs a single non-blocking :meth:`acquire` and returns the lock.
        This does not block or raise on contention, so callers using
        ``async with`` should treat the lock as best-effort unless they
        check ``self._acquired``.

        Returns:
            DistributedLock: This instance, for binding in the ``as`` clause.
        """
        await self.acquire()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
        """Exit the async context manager by releasing the lock.

        Calls :meth:`release` regardless of whether the ``async with`` body
        raised, and does not suppress exceptions.

        Args:
            exc_type: Exception type if the body raised, else ``None``.
            exc_val: Exception instance if the body raised, else ``None``.
            exc_tb: Traceback if the body raised, else ``None``.

        Returns:
            bool: Always ``False`` so any in-flight exception propagates.
        """
        await self.release()
        return False
class FencedKGWriter:
    """Gate database writes on a :class:`DistributedLock` still being held.

    Couples a database client (e.g. a FalkorDB/knowledge-graph client) with
    a lock and lets :meth:`write` through only while that lock reports
    itself acquired, so a holder whose lease was lost (as detected by the
    lock watchdog) cannot race a newer owner. The lock's fencing token is
    available through :attr:`token` for stores that record it with the
    write.
    """

    def __init__(self, db_client: Any, lock: DistributedLock):
        """Remember the client and the lock that guards it; performs no I/O.

        Args:
            db_client: A database client exposing an async ``query`` method.
            lock: The lock that must be held for writes to be allowed.
        """
        self._lock = lock
        self._db = db_client

    @property
    def token(self) -> str:
        """Return the fencing token of the bound lock.

        Returns:
            str: The bound lock's fencing token.
        """
        bound_lock = self._lock
        return bound_lock._token

    async def write(self, query: str, **kwargs) -> Any:
        """Run a database write, but only while the fencing lock is held.

        Args:
            query: The query/statement to execute against the database.
            **kwargs: Extra parameters passed through unchanged to the
                client's ``query`` method.

        Returns:
            Any: Whatever ``db_client.query(query, **kwargs)`` returns.

        Raises:
            RuntimeError: If the bound lock is not currently held.
        """
        lock_is_held = self._lock._acquired
        if lock_is_held:
            outcome = await self._db.query(query, **kwargs)
            return outcome
        raise RuntimeError("Fencing token validation failed: Lock not held")