core.distributed_lock module

Watchdog-renewed distributed lock for per-channel serialization.

DistributedLock implements a token-based Redis lock (for example on the key sg:lock:channel:{id}) whose release and renewal go through Lua scripts, so only the current owner can act. A background watchdog can auto-renew the lease for long-running work and abort the holder if the lease is lost. The inference worker relies on this to process messages for one channel serially, without corrupting NCM/history state.

class core.distributed_lock.DistributedLock(redis, key, ttl=30, auto_renew=False, max_renewals=5, abort_on_loss=True)[source]

Bases: object

Token-fenced Redis lock with optional watchdog lease renewal.

Implements a single-key mutual-exclusion lock over Redis. Ownership is claimed with SET NX EX, and a per-instance UUID4 fencing token ensures that only the true owner can renew or release the key (via the RENEW_SCRIPT and RELEASE_SCRIPT Lua scripts, respectively). When auto_renew is enabled, a background watchdog keeps the lease alive for long-running work; if abort_on_loss is also set, it cancels the owning asyncio task when the lease is lost, so a stale holder cannot corrupt shared state. Acquisition is re-entrant within one instance, supports blocking polling, and works as an async context manager. When no Redis client is supplied, the lock degrades to a purely local one.

Instantiated across the codebase for per-channel and single-flight serialization, including core.stream_consumer and core.outbound_consumer (message ordering), ncm_variant_cache, ops_planner, star_self_mirror, sword.monitor, and plugins.swarm_engine.state_manager.

__init__(redis, key, ttl=30, auto_renew=False, max_renewals=5, abort_on_loss=True)[source]

Initialize a token-based Redis lock for a single key.

Generates a unique fencing token (a UUID4) that identifies this lock instance as the legitimate owner and configures the lease TTL, watchdog auto-renewal policy, and abort-on-loss behavior. No Redis I/O happens here; the key is only claimed later in acquire().

The lock keeps a reference to the supplied async Redis client. It is instantiated throughout the codebase to serialize work per logical resource: core.stream_consumer (key sg:lock:channel:{channel_id}) and core.outbound_consumer (key sg:lock:outbound:{platform}:{channel_id}) use it for per-channel message ordering, while ncm_variant_cache, ops_planner, star_self_mirror, sword.monitor, and plugins.swarm_engine.state_manager use it for their respective single-flight generation locks.

Parameters:
  • redis (Any) – An async Redis client (or None for a degraded local-only fallback in which acquisition always succeeds without touching Redis).

  • key (str) – The Redis key the lock occupies (e.g. sg:lock:channel:{channel_id}).

  • ttl (int) – Lease time-to-live in seconds applied to the key on SET NX EX and on each watchdog renewal.

  • auto_renew (bool) – When True, a background watchdog task is started on acquire to keep the lease alive for long-running work.

  • max_renewals (int) – Maximum number of watchdog renewals before the lease is voluntarily given up; <= 0 means unlimited (used by stream_consumer so long-running turns never expire).

  • abort_on_loss (bool) – When True, the watchdog cancels the owning asyncio task if the lease is lost or the renewal limit is reached, preventing a stale holder from corrupting state.
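
The interplay of auto_renew, max_renewals, and abort_on_loss can be sketched as follows. This is an illustration under stated assumptions, not the module's _run_watchdog(): the function name run_watchdog, the renew callback, and the renewal interval of ttl / 3 are all hypothetical (the source does not state the interval), and the real watchdog renews the lease through RENEW_SCRIPT rather than a callback.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Optional, Tuple


async def run_watchdog(
    renew: Callable[[], Awaitable[bool]],  # hypothetical: True if the lease was extended
    owner: Optional[asyncio.Task],
    ttl: float,
    max_renewals: int = 5,
    abort_on_loss: bool = True,
) -> str:
    """Renew until the lease is lost or the renewal budget is spent."""
    renewals = 0
    while True:
        await asyncio.sleep(ttl / 3)  # assumed interval, not taken from the source
        if max_renewals > 0 and renewals >= max_renewals:
            reason = "limit"  # budget spent; <= 0 means unlimited
            break
        if not await renew():
            reason = "lost"  # the key no longer carries our token
            break
        renewals += 1
    if abort_on_loss and owner is not None:
        owner.cancel()  # stop the holder before it can touch shared state
    return reason


async def demo(outcomes: Iterable[bool], max_renewals: int) -> Tuple[str, bool]:
    answers = iter(outcomes)

    async def fake_renew() -> bool:
        return next(answers)

    async def long_work() -> None:
        await asyncio.sleep(60)

    owner = asyncio.create_task(long_work())
    reason = await run_watchdog(fake_renew, owner, ttl=0.03, max_renewals=max_renewals)
    try:
        await owner
    except asyncio.CancelledError:
        pass  # expected: the watchdog cancelled the owner
    return reason, owner.cancelled()
```

With an unlimited budget (max_renewals=0) and a renewal that fails on the third attempt, asyncio.run(demo([True, True, False], 0)) returns ("lost", True). With max_renewals=2 and renewals that always succeed, it returns ("limit", True). In both cases the owning task is cancelled, which matches the abort_on_loss description above.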

keys_released_count()[source]

Return how many times this lock physically deleted its Redis key.

Reports self._released_count, which release() increments only on a real compare-and-delete against Redis (not on re-entrant or local releases). It is primarily an observability hook for tests that assert a re-entrant hold does not delete the key prematurely, and is read by tests/adversarial/test_lock_watchdog.py.

Returns:

The number of physical key deletions performed by this instance.

Return type:

int

async acquire(raise_contention_log=True)[source]

Attempt to claim the lock once, without blocking.

Tries to atomically take ownership of the key. Acquisition is re-entrant: if this instance already holds the lock, the internal acquisition count is incremented and the call succeeds immediately without contacting Redis. On a fresh acquire, the watchdog is started when auto_renew is enabled.

Interacts with Redis via SET key token NX EX ttl to claim the key only when it is unheld; on success it records the owning asyncio task (for later abort-on-loss) and, when auto_renew is set, schedules _run_watchdog() as a background task that renews the lease. When self._redis is None it falls back to a purely local acquire. Redis errors are caught and logged, leaving the lock unacquired.

Called by the non-blocking single-flight paths in ncm_variant_cache (variant and embedding generation locks), ops_planner, star_self_mirror, and sword.monitor, each of which skips its work when acquisition fails. It is also the primitive that acquire_blocking() polls and that __aenter__() calls.

Parameters:

raise_contention_log (bool) – When True, log lost contention at INFO (“skipping execution”); when False, log at DEBUG (“retrying”) so blocking pollers stay quiet.

Returns:

True if the lock is now held by this instance (including re-entrant and local-fallback cases), False on contention or a Redis error.

Return type:

bool
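
The claim-and-count behavior of acquire() can be illustrated without a Redis server. The sketch below is not the module's code: FakeRedis is a hypothetical in-memory stand-in whose set(key, value, nx=True, ex=ttl) mimics SET key token NX EX ttl (the keyword names follow redis-py's set), SketchLock is an invented name, and watchdog start-up and Redis error handling are left out.

```python
import asyncio
import time
import uuid
from typing import Dict, List, Optional, Tuple


class FakeRedis:
    """Hypothetical in-memory stand-in for an async Redis client."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, float]] = {}  # key -> (value, expiry)

    async def set(self, key: str, value: str, nx: bool = False,
                  ex: Optional[int] = None) -> Optional[bool]:
        now = time.monotonic()
        current = self._data.get(key)
        if nx and current is not None and current[1] > now:
            return None  # key is held and unexpired: NX refuses to overwrite
        expiry = now + ex if ex is not None else float("inf")
        self._data[key] = (value, expiry)
        return True


class SketchLock:
    """Illustrative re-entrant, token-based acquire (not the real class)."""

    def __init__(self, redis: Optional[FakeRedis], key: str, ttl: int = 30) -> None:
        self._redis, self._key, self._ttl = redis, key, ttl
        self._token = str(uuid.uuid4())  # fencing token unique to this instance
        self._count = 0  # re-entrant acquisition count

    async def acquire(self) -> bool:
        if self._count > 0:  # already held: re-enter without contacting Redis
            self._count += 1
            return True
        if self._redis is None:  # degraded local-only fallback always succeeds
            self._count = 1
            return True
        won = await self._redis.set(self._key, self._token, nx=True, ex=self._ttl)
        if won:
            self._count = 1
        return bool(won)


async def demo() -> List[bool]:
    redis = FakeRedis()
    a = SketchLock(redis, "sg:lock:channel:42")
    b = SketchLock(redis, "sg:lock:channel:42")
    local = SketchLock(None, "local-only")
    return [await a.acquire(), await a.acquire(), await b.acquire(),
            await local.acquire()]
```

asyncio.run(demo()) returns [True, True, False, True]: the first instance claims the key, re-enters it, the second instance loses the contention, and the instance without a Redis client succeeds locally.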

async release()[source]

Release the lock, honoring re-entrancy and stopping the watchdog.

No-ops when the lock is not held. For a re-entrant hold (acquisition count greater than one), this only decrements the count and returns without touching Redis, so the key is only freed on the outermost release. On the final release it cancels any running watchdog task, clears owner state, and deletes the key.

Interacts with Redis by evaluating RELEASE_SCRIPT, a compare-and-delete Lua script keyed on the fencing token, so the key is removed only if this instance still owns it and a lock that has since been re-taken by another holder is never clobbered. Each physical release is tallied in self._released_count (exposed via keys_released_count()). When self._redis is None it simply clears local state. Redis errors are caught and logged, and self._acquired is cleared in all cases.

Called from the finally blocks that pair with acquisition in core.stream_consumer and core.outbound_consumer (per-channel serialization), as well as ncm_variant_cache, ops_planner, star_self_mirror, and sword.monitor; it is also invoked by __aexit__() for async with usage.

Return type:

None
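
Compare-and-delete is the usual way to release a token-based Redis lock. The text of RELEASE_SCRIPT is not shown on this page, so the Lua below is only the widely used form of the pattern and should be read as an assumption. The Python function compare_and_delete is a hypothetical mirror of it that runs against a plain dict for illustration.

```python
from typing import Dict

# Assumed shape of a compare-and-delete script; the module's RELEASE_SCRIPT may differ.
ASSUMED_RELEASE_LUA = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""


def compare_and_delete(store: Dict[str, str], key: str, token: str) -> int:
    """Python mirror of the Lua above: delete only if the stored token matches."""
    if store.get(key) == token:
        del store[key]
        return 1
    return 0


# The key has been re-taken by a newer owner after the old lease expired.
store = {"sg:lock:channel:42": "token-of-new-owner"}
stale = compare_and_delete(store, "sg:lock:channel:42", "token-of-old-owner")
fresh = compare_and_delete(store, "sg:lock:channel:42", "token-of-new-owner")
```

Here stale is 0 (the old owner's release is ignored) and fresh is 1 (the real owner deletes the key). In Redis the comparison and the delete must run as one script because a script executes atomically; a separate GET followed by DEL from the client could delete a key that changed hands in between.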

async acquire_blocking(poll_interval=0.1, timeout=None)[source]

Acquire the lock, polling until it is free or the timeout elapses.

Repeatedly calls acquire() (with contention logging suppressed so only the first wait is announced), sleeping poll_interval seconds between attempts, until ownership is won or the optional timeout is exceeded. It is used where a caller must process strictly in order rather than skip on contention, namely the per-channel core.stream_consumer and core.outbound_consumer loops. It is also exercised by tests/core/test_concurrent_consumers.py and tests/core/test_distributed_lock.py.

Parameters:
  • poll_interval (float) – Seconds to wait between acquisition attempts.

  • timeout (Optional[float]) – Maximum seconds to keep polling, or None to wait forever.

Returns:

True once the lock is held, False if the timeout elapsed first.

Return type:

bool
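
A polling loop with an optional deadline, as described above, might look like the following minimal sketch. The names poll_acquire and try_acquire are hypothetical; try_acquire stands in for a non-blocking acquire() call, and the real method's logging is omitted.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple


async def poll_acquire(
    try_acquire: Callable[[], Awaitable[bool]],  # hypothetical non-blocking attempt
    poll_interval: float = 0.1,
    timeout: Optional[float] = None,
) -> bool:
    """Retry try_acquire() until it succeeds or the timeout elapses."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if await try_acquire():
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return False  # gave up: the lock stayed contended
        await asyncio.sleep(poll_interval)


async def demo() -> Tuple[bool, int, bool]:
    attempts = 0

    async def free_on_third_try() -> bool:
        nonlocal attempts
        attempts += 1
        return attempts >= 3

    async def never_free() -> bool:
        return False

    won = await poll_acquire(free_on_third_try, poll_interval=0.01)
    timed_out = await poll_acquire(never_free, poll_interval=0.01, timeout=0.05)
    return won, attempts, timed_out
```

asyncio.run(demo()) returns (True, 3, False): the first call wins on its third attempt, and the second gives up once the 0.05 second timeout has passed. Using a monotonic clock for the deadline keeps the timeout unaffected by wall-clock adjustments.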

async __aenter__()[source]

Enter the lock as an async context manager.

Performs a single non-blocking acquire() and returns the lock. Note that this does not block on contention: the async with body runs even when acquisition failed, so callers should treat the lock as best-effort unless they check self._acquired. plugins.swarm_engine.state_manager uses this form for its sg:lock:swarm:{slug} lock.

Returns:

This instance, for binding in the as clause.

Return type:

DistributedLock
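
The best-effort caveat is easy to miss, so here is a small sketch of it. ContendedLock is a hypothetical stand-in (not DistributedLock) whose acquire() always loses the race; it only reproduces the documented shape of __aenter__ and __aexit__ to show that the body still executes.

```python
import asyncio
from typing import List


class ContendedLock:
    """Hypothetical stand-in whose acquire() always loses the race."""

    def __init__(self) -> None:
        self._acquired = False

    async def acquire(self) -> bool:
        return False  # someone else holds the key

    async def release(self) -> None:
        self._acquired = False

    async def __aenter__(self) -> "ContendedLock":
        self._acquired = await self.acquire()  # single non-blocking attempt
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        await self.release()
        return False  # never suppress exceptions


async def demo() -> List[str]:
    events: List[str] = []
    async with ContendedLock() as lock:
        events.append("body entered")  # runs even though the lock was not won
        if not lock._acquired:
            events.append("skipped: lock not held")
        else:
            events.append("did protected work")
    return events
```

asyncio.run(demo()) returns ["body entered", "skipped: lock not held"]. A caller that needs strict mutual exclusion would either perform this check inside the body or use acquire_blocking() with an explicit release instead of async with.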

async __aexit__(exc_type, exc_val, exc_tb)[source]

Exit the async context manager by releasing the lock.

Calls release() to free the key (or decrement the re-entrant count) regardless of whether the async with body raised, and does not suppress exceptions.

Parameters:
  • exc_type (Any) – Exception type if the body raised, else None.

  • exc_val (Any) – Exception instance if the body raised, else None.

  • exc_tb (Any) – Traceback if the body raised, else None.

Returns:

Always False so any in-flight exception propagates.

Return type:

bool

class core.distributed_lock.FencedKGWriter(db_client, lock)[source]

Bases: object

Database-write wrapper that refuses to write unless its lock is held.

Pairs a database client (e.g. a FalkorDB/knowledge-graph client) with a DistributedLock and gates every write() on that lock still being acquired, so a holder whose lease was stolen (detected by the lock watchdog) cannot race a newer owner and corrupt graph state. The lock’s fencing token is exposed via token for stores that record it alongside the write.

No application call sites were found; the wrapper is currently exercised only by tests/core/migration/test_distributed_lock_hardened.py.

__init__(db_client, lock)[source]

Bind a database client to a lock for fenced writes.

Stores the underlying database client and the DistributedLock whose ownership gates every write performed through this wrapper. No I/O occurs here.

Parameters:
  • db_client (Any) – A database client exposing an async query method (e.g. a FalkorDB/knowledge-graph client).

  • lock (DistributedLock) – The lock whose held/owned state must be valid before writes are allowed.

property token: str

Return the fencing token of the bound lock.

Exposes the underlying lock’s UUID4 token so callers (or a downstream store) can attach it to writes for fencing checks.

Returns:

The bound lock’s fencing token.

Return type:

str

async write(query, **kwargs)[source]

Execute a database write only while the fencing lock is held.

Verifies that the bound lock is still acquired before forwarding the query, refusing the write outright if ownership has been lost (e.g. the watchdog detected a stolen lease). This guards knowledge-graph mutations against a stale holder racing a newer owner.

Delegates the actual write to self._db.query(query, **kwargs) on the wrapped database client when the gate passes. No internal callers were found in application code; usage is currently exercised only by tests/core/migration/test_distributed_lock_hardened.py.

Parameters:
  • query (str) – The query/statement to execute against the database.

  • **kwargs – Additional parameters forwarded verbatim to the client’s query method.

Returns:

Whatever the underlying db_client.query call returns.

Return type:

Any

Raises:

RuntimeError – If the bound lock is not currently held.
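
The gate in write() can be sketched as below. All three classes are hypothetical stand-ins written for this example: StubLock exposes only an _acquired flag and a _token string (the attribute used by the real wrapper to test ownership is an assumption), StubDB records queries instead of talking to a database, and SketchFencedWriter mirrors the documented behavior rather than reproducing the source.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class StubLock:
    """Hypothetical stand-in exposing only what the gate needs."""

    def __init__(self, held: bool) -> None:
        self._acquired = held
        self._token = "example-token"  # assumed attribute name


class StubDB:
    """Hypothetical database client that records queries."""

    def __init__(self) -> None:
        self.queries: List[Tuple[str, Dict[str, Any]]] = []

    async def query(self, query: str, **kwargs: Any) -> str:
        self.queries.append((query, kwargs))
        return "ok"


class SketchFencedWriter:
    """Illustrative lock-gated writer (not the real FencedKGWriter)."""

    def __init__(self, db_client: StubDB, lock: StubLock) -> None:
        self._db = db_client
        self._lock = lock

    @property
    def token(self) -> str:
        return self._lock._token

    async def write(self, query: str, **kwargs: Any) -> Any:
        if not self._lock._acquired:  # ownership lost: refuse outright
            raise RuntimeError("lock not held; refusing write")
        return await self._db.query(query, **kwargs)


async def demo() -> Tuple[str, bool, int]:
    db = StubDB()
    ok = await SketchFencedWriter(db, StubLock(held=True)).write(
        "MERGE (n:Node {id: $id})", id=1)
    try:
        await SketchFencedWriter(db, StubLock(held=False)).write(
            "MERGE (n:Node {id: $id})", id=2)
        refused = False
    except RuntimeError:
        refused = True
    return ok, refused, len(db.queries)
```

asyncio.run(demo()) returns ("ok", True, 1): the write under a held lock reaches the client, and the write under a lost lock raises RuntimeError before any query is sent.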