core.distributed_lock module
Watchdog-renewed distributed lock for per-channel serialization.
DistributedLock implements a token-based Redis lock (e.g.
sg:lock:channel:{id}) that is released and renewed via Lua scripts, so
only the true owner can act. A background watchdog auto-renews the
lease for long-running work and can abort the holder if the lease is
lost, which the inference worker uses to guarantee that messages for
one channel are processed serially without corrupting NCM/history
state.
- class core.distributed_lock.DistributedLock(redis, key, ttl=30, auto_renew=False, max_renewals=5, abort_on_loss=True)
Bases: object
Token-fenced Redis lock with optional watchdog lease renewal.
Implements a single-key mutual-exclusion lock over Redis. It uses SET NX EX to claim ownership and a per-instance UUID4 fencing token so that only the true owner can renew or release the key (both done via the RELEASE_SCRIPT and RENEW_SCRIPT Lua scripts). When auto_renew is enabled, a background watchdog keeps the lease alive for long-running work. When abort_on_loss is also set, the watchdog cancels the owning asyncio task if the lease is lost, so a stale holder cannot corrupt shared state. Acquisition is re-entrant within one instance, supports blocking polling, and works as an async context manager. When no Redis client is supplied, the lock degrades to a purely local lock.
Instantiated across the codebase for per-channel and single-flight serialization, including core.stream_consumer and core.outbound_consumer (message ordering), ncm_variant_cache, ops_planner, star_self_mirror, sword.monitor, and plugins.swarm_engine.state_manager.
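The token-checked release and renewal pattern named above can be sketched as follows. The two script bodies are hypothetical reconstructions of the usual compare-and-delete and compare-and-expire idiom, not the module's actual RELEASE_SCRIPT and RENEW_SCRIPT text. The two Python functions are an illustrative in-memory model of what such scripts do. With the redis-py asyncio client, a script like this would be run as await redis.eval(RELEASE_SCRIPT, 1, key, token).

```python
"""Token-checked Lua scripts plus an in-memory model of their semantics."""

# Hypothetical reconstruction: delete the key only if it still holds our token.
RELEASE_SCRIPT = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
end
return 0
"""

# Hypothetical reconstruction: refresh the TTL only if we still own the key.
RENEW_SCRIPT = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("EXPIRE", KEYS[1], ARGV[2])
end
return 0
"""


def release_model(store: dict, key: str, token: str) -> int:
    """Mirror RELEASE_SCRIPT: return 1 if the key was deleted, else 0."""
    if store.get(key) == token:
        del store[key]
        return 1
    return 0  # key missing or re-taken by another holder: leave it alone


def renew_model(store: dict, ttls: dict, key: str, token: str, ttl: int) -> int:
    """Mirror RENEW_SCRIPT: return 1 if the TTL was refreshed, else 0."""
    if store.get(key) == token:
        ttls[key] = ttl
        return 1
    return 0  # lease lost: a watchdog should treat this as a lost lock
```

Because GET and DEL (or EXPIRE) run inside one script, no other client can take the key between the ownership check and the action.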
- __init__(redis, key, ttl=30, auto_renew=False, max_renewals=5, abort_on_loss=True)
Initialize a token-based Redis lock for a single key.
Generates a unique fencing token (a UUID4) that identifies this lock instance as the legitimate owner. It also configures the lease TTL, the watchdog auto-renewal policy, and the abort-on-loss behavior. No Redis I/O happens here. The lock only keeps a reference to the supplied async Redis client, and the key is claimed later, in acquire().
It is instantiated throughout the codebase to serialize work per logical resource. Examples are core.stream_consumer (key sg:lock:channel:{channel_id}) and core.outbound_consumer (key sg:lock:outbound:{platform}:{channel_id}) for per-channel message ordering. ncm_variant_cache, ops_planner, star_self_mirror, sword.monitor, and plugins.swarm_engine.state_manager also use it for their respective single-flight generation locks.
- Parameters:
redis (Any) – An async Redis client, or None for a degraded local-only fallback in which acquisition always succeeds without touching Redis.
key (str) – The Redis key the lock occupies (e.g. sg:lock:channel:{channel_id}).
ttl (int) – Lease time-to-live in seconds, applied to the key on SET NX EX and on each watchdog renewal.
auto_renew (bool) – When True, a background watchdog task is started on acquire to keep the lease alive for long-running work.
max_renewals (int) – Maximum number of watchdog renewals before the lease is voluntarily given up. A value <= 0 means unlimited; stream_consumer uses this so long-running turns never expire.
abort_on_loss (bool) – When True, the watchdog cancels the owning asyncio task if the lease is lost or the renewal limit is reached, preventing a stale holder from corrupting state.
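The interplay of ttl, max_renewals, and abort_on_loss can be illustrated with a standalone watchdog loop. This is a sketch under stated assumptions, not _run_watchdog() itself. The renew callable stands in for evaluating RENEW_SCRIPT. The ttl / 3 renewal cadence and the "lost"/"limit" outcome strings are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable


async def run_watchdog(
    renew: Callable[[], Awaitable[bool]],
    owner: "asyncio.Task",
    ttl: float,
    max_renewals: int = 5,
    abort_on_loss: bool = True,
) -> str:
    """Renew a lease until it is lost or the renewal limit is reached.

    renew() must return True only when the token-checked renewal succeeded.
    The ttl / 3 cadence is an assumption chosen to renew well before expiry.
    """
    renewals = 0
    while True:
        await asyncio.sleep(ttl / 3)
        # max_renewals <= 0 means "renew forever" (the stream_consumer mode).
        if max_renewals > 0 and renewals >= max_renewals:
            outcome = "limit"
            break
        if not await renew():
            outcome = "lost"  # another holder owns the key now
            break
        renewals += 1
    if abort_on_loss and not owner.done():
        owner.cancel()  # stop the stale holder before it touches shared state
    return outcome
```

Cancelling the owner, rather than only logging, turns a lost lease into a CancelledError inside the work itself. This is the "abort the holder" behavior described above.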
- keys_released_count()
Return how many times this lock physically deleted its Redis key.
Reports self._released_count. release() increments this counter only when a compare-and-delete against Redis actually removes the key, and not on re-entrant or local releases. It is primarily an observability hook for tests asserting that a re-entrant hold does not delete the key prematurely, and is read by tests/adversarial/test_lock_watchdog.py.
- Returns:
The number of physical key deletions performed by this instance.
- Return type:
int
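The counting rule can be checked with a synchronous, dict-backed model. ReentrantLockModel is a hypothetical stand-in that mirrors only the re-entrancy and released-count behavior described here. It has no TTLs, watchdog, or Redis. It follows the spirit of a test that reads keys_released_count().

```python
import uuid


class ReentrantLockModel:
    """Hypothetical model of DistributedLock's re-entrancy and release tally."""

    def __init__(self, store: dict, key: str) -> None:
        self.store, self.key = store, key
        self.token = str(uuid.uuid4())  # per-instance fencing token
        self._count = 0  # re-entrant acquisition depth
        self._released_count = 0  # physical deletions only

    def acquire(self) -> bool:
        if self._count:  # already held by us: no store access
            self._count += 1
            return True
        if self.key in self.store:  # emulates SET NX failing
            return False
        self.store[self.key] = self.token
        self._count = 1
        return True

    def release(self) -> None:
        if not self._count:
            return
        self._count -= 1
        if self._count:  # an outer hold is still active: keep the key
            return
        if self.store.get(self.key) == self.token:  # compare-and-delete
            del self.store[self.key]
            self._released_count += 1

    def keys_released_count(self) -> int:
        return self._released_count
```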
- async acquire(raise_contention_log=True)
Attempt to claim the lock once, without blocking.
Tries to atomically take ownership of the key. Acquisition is re-entrant: if this instance already holds the lock, the internal acquisition count is incremented and the call succeeds immediately without contacting Redis.
On a fresh acquire, it interacts with Redis via SET key token NX EX ttl, which claims the key only when it is unheld. On success it records the owning asyncio task for later abort-on-loss. When auto_renew is set, it also schedules _run_watchdog() as a background task that renews the lease. When self._redis is None it falls back to a purely local acquire. Redis errors are caught and logged, leaving the lock unacquired.
Called by the non-blocking single-flight paths in ncm_variant_cache (variant and embedding generation locks), ops_planner, star_self_mirror, and sword.monitor. Each of these skips its work when acquisition fails. It is also the primitive that acquire_blocking() polls and that __aenter__() calls.
- Parameters:
raise_contention_log (bool) – When True, log lost contention at INFO (“skipping execution”); when False, log at DEBUG (“retrying”) so blocking pollers stay quiet.
- Returns:
True if the lock is now held by this instance (including the re-entrant and local-fallback cases), False on contention or a Redis error.
- Return type:
bool
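The acquire() flow can be sketched minimally, assuming a redis-py-style asyncio client whose set(name, value, ex=..., nx=...) returns a truthy value on success and None when NX blocks the write. FakeRedis and AcquireSketch are hypothetical stand-ins, and starting the watchdog is omitted for brevity.

```python
import asyncio
import logging
import uuid
from typing import Any, Optional

log = logging.getLogger("acquire_sketch")


class FakeRedis:
    """In-memory stand-in supporting only SET with NX (TTL is ignored)."""

    def __init__(self) -> None:
        self.data: dict = {}

    async def set(self, name: str, value: str, ex: Optional[int] = None, nx: bool = False):
        if nx and name in self.data:
            return None  # key already held: NX refuses the write
        self.data[name] = value
        return True


class AcquireSketch:
    def __init__(self, redis: Any, key: str, ttl: int = 30) -> None:
        self._redis, self.key, self.ttl = redis, key, ttl
        self.token = str(uuid.uuid4())
        self._count = 0  # re-entrant acquisition depth
        self._owner: Optional[asyncio.Task] = None

    async def acquire(self, raise_contention_log: bool = True) -> bool:
        if self._count:  # re-entrant hold: no Redis round trip
            self._count += 1
            return True
        if self._redis is None:  # degraded local-only mode always succeeds
            self._count = 1
            return True
        try:
            ok = await self._redis.set(self.key, self.token, nx=True, ex=self.ttl)
        except Exception as exc:  # Redis errors leave the lock unacquired
            log.warning("acquire of %s failed: %s", self.key, exc)
            return False
        if not ok:
            level = logging.INFO if raise_contention_log else logging.DEBUG
            log.log(level, "lock %s is held elsewhere", self.key)
            return False
        self._count = 1
        self._owner = asyncio.current_task()  # target for abort-on-loss
        return True
```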
- async release()
Release the lock, honoring re-entrancy and stopping the watchdog.
This is a no-op when the lock is not held. For a re-entrant hold (acquisition count greater than one), it only decrements the count and returns without touching Redis, so the key is freed only on the outermost release. On the final release it cancels any running watchdog task, clears owner state, and deletes the key.
It interacts with Redis by evaluating RELEASE_SCRIPT, a compare-and-delete Lua script keyed on the fencing token. The key is therefore removed only if this instance still owns it, and a lock that was stolen and re-taken by another holder is never clobbered. Each physical release is tallied in self._released_count (exposed via keys_released_count()). When self._redis is None it simply clears local state. Redis errors are caught and logged, and self._acquired is cleared in all cases.
Called from the finally blocks that pair with acquisition in core.stream_consumer and core.outbound_consumer (per-channel serialization), and in ncm_variant_cache, ops_planner, star_self_mirror, and sword.monitor. It is also invoked by __aexit__() for async with usage.
- Return type:
None
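The release path can be sketched as follows. FakeRedis is a hypothetical stand-in whose eval() hard-codes compare-and-delete semantics rather than executing Lua. The script text is a hypothetical reconstruction. ReleaseSketch mirrors only the steps described above: re-entrant decrement, watchdog cancellation, token-checked delete, and the release tally.

```python
import asyncio
import logging
import uuid
from typing import Any, Optional

log = logging.getLogger("release_sketch")

# Hypothetical compare-and-delete script; the fake client below ignores its text.
RELEASE_SCRIPT = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
end
return 0
"""


class FakeRedis:
    """In-memory stand-in whose eval() hard-codes compare-and-delete."""

    def __init__(self) -> None:
        self.data: dict = {}

    async def eval(self, script: str, numkeys: int, *keys_and_args: str) -> int:
        key, token = keys_and_args[0], keys_and_args[1]
        if self.data.get(key) == token:
            del self.data[key]
            return 1
        return 0


class ReleaseSketch:
    def __init__(self, redis: Any, key: str) -> None:
        self._redis, self.key = redis, key
        self.token = str(uuid.uuid4())
        self._count = 0  # re-entrant acquisition depth
        self._watchdog: Optional[asyncio.Task] = None
        self._released_count = 0

    async def release(self) -> None:
        if self._count == 0:
            return  # not held: no-op
        if self._count > 1:
            self._count -= 1  # inner release: keep the key
            return
        if self._watchdog is not None:
            self._watchdog.cancel()  # stop renewing before giving up the key
            self._watchdog = None
        try:
            if self._redis is not None:
                deleted = await self._redis.eval(RELEASE_SCRIPT, 1, self.key, self.token)
                if deleted:
                    self._released_count += 1  # count physical deletions only
        except Exception as exc:
            log.warning("release of %s failed: %s", self.key, exc)
        finally:
            self._count = 0  # ownership is cleared in all cases
```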
- async acquire_blocking(poll_interval=0.1, timeout=None)
Acquire the lock, polling until it is free or the timeout elapses.
Repeatedly calls acquire() with contention logging suppressed, so only the first wait is announced. It sleeps poll_interval seconds between attempts until ownership is won or the optional timeout is exceeded. It is used where a caller must process strictly in order rather than skip on contention, namely the per-channel core.stream_consumer and core.outbound_consumer loops. It is also exercised by tests/core/test_concurrent_consumers.py and tests/core/test_distributed_lock.py.
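The polling loop can be sketched as a free function over any acquire-like coroutine. Several details are assumptions, not the method's confirmed contract: the signature, the return value (True on success, False on timeout), and emitting a single INFO line on the first failed attempt as one way to realize "only the first wait is announced".

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

log = logging.getLogger("acquire_blocking_sketch")


async def acquire_blocking(
    acquire: Callable[..., Awaitable[bool]],
    poll_interval: float = 0.1,
    timeout: Optional[float] = None,
) -> bool:
    """Poll acquire() until it succeeds or timeout seconds have elapsed."""
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout
    announced = False
    while True:
        # Retries pass raise_contention_log=False so each miss logs at DEBUG.
        if await acquire(raise_contention_log=False):
            return True
        if not announced:
            log.info("lock busy; waiting")  # announce only the first wait
            announced = True
        if deadline is not None and loop.time() >= deadline:
            return False
        await asyncio.sleep(poll_interval)
```

With timeout=None the loop waits indefinitely. That suits an ordered per-channel consumer that must not skip a message.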
- async __aenter__()
Enter the lock as an async context manager.
Performs a single non-blocking acquire() and returns the lock. This does not block on contention. Callers using async with should therefore treat the lock as best-effort unless they check self._acquired. plugins.swarm_engine.state_manager uses this form for its sg:lock:swarm:{slug} lock.
- Returns:
This instance, for binding in the as clause.
- Return type:
DistributedLock
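Because async with on this lock is best-effort, a caller that must never run its body without ownership could wrap acquire() and release() explicitly. strictly_held, LockContended, and StubLock are hypothetical helpers for illustration. StubLock only mimics the non-blocking acquire()/release() coroutines described above.

```python
import asyncio
import contextlib
from typing import Any, AsyncIterator


class LockContended(RuntimeError):
    """Hypothetical error raised when the lock is held elsewhere."""


@contextlib.asynccontextmanager
async def strictly_held(lock: Any) -> AsyncIterator[Any]:
    """Run the body only if lock.acquire() succeeds; otherwise raise."""
    if not await lock.acquire():
        raise LockContended(getattr(lock, "key", "<unknown key>"))
    try:
        yield lock
    finally:
        await lock.release()  # always hand the key back, even on error


class StubLock:
    """Non-blocking stand-in: keys currently held live in a shared set."""

    def __init__(self, held: set, key: str) -> None:
        self.held, self.key = held, key
        self._acquired = False

    async def acquire(self) -> bool:
        if self.key in self.held:
            return False
        self.held.add(self.key)
        self._acquired = True
        return True

    async def release(self) -> None:
        if self._acquired:
            self.held.discard(self.key)
            self._acquired = False
```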
- class core.distributed_lock.FencedKGWriter(db_client, lock)
Bases: object
Database-write wrapper that refuses to write unless its lock is held.
Pairs a database client (e.g. a FalkorDB/knowledge-graph client) with a DistributedLock and gates every write() on that lock still being acquired. A holder whose lease was stolen (as detected by the lock watchdog) therefore cannot race a newer owner and corrupt graph state. The lock’s fencing token is exposed via token for stores that record it alongside the write.
No application call sites were found; the wrapper is currently exercised only by tests/core/migration/test_distributed_lock_hardened.py.
- Parameters:
db_client (Any)
lock (DistributedLock)
- __init__(db_client, lock)
Bind a database client to a lock for fenced writes.
Stores the underlying database client and the DistributedLock whose ownership gates every write performed through this wrapper. No I/O occurs here.
- Parameters:
db_client (Any) – A database client exposing an async query method (e.g. a FalkorDB/knowledge-graph client).
lock (DistributedLock) – The lock whose held/owned state must be valid before writes are allowed.
- property token: str
Return the fencing token of the bound lock.
Exposes the underlying lock’s UUID4 token so callers (or a downstream store) can attach it to writes for fencing checks.
- Returns:
The bound lock’s fencing token.
- Return type:
str
- async write(query, **kwargs)
Execute a database write only while the fencing lock is held.
Verifies that the bound lock is still acquired before forwarding the query. It refuses the write outright if ownership has been lost (e.g. the watchdog detected a stolen lease). This guards knowledge-graph mutations against a stale holder racing a newer owner.
When the gate passes, it delegates the actual write to self._db.query(query, **kwargs) on the wrapped database client. No internal callers were found in application code; usage is currently exercised only by tests/core/migration/test_distributed_lock_hardened.py.
- Parameters:
query (str) – The query/statement to execute against the database.
**kwargs – Additional parameters forwarded verbatim to the client’s query method.
- Returns:
Whatever the underlying db_client.query call returns.
- Return type:
Any
- Raises:
RuntimeError – If the bound lock is not currently held.
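The gate can be sketched with stand-ins. FencedWriterSketch, StubLock, and FakeDB are hypothetical. The stub lock exposes key, token, and _acquired as described for DistributedLock, and the Cypher strings are illustrative only. In this sketch the check is local, so a lease lost between the check and the query is not caught by the gate itself. Recording the token alongside the write lets a store notice such cases. Because a UUID4 token is random rather than monotonically increasing, such a store can compare it only for equality, not for ordering.

```python
import asyncio
import uuid
from typing import Any


class StubLock:
    """Hypothetical lock stand-in exposing the attributes the gate reads."""

    def __init__(self, key: str) -> None:
        self.key = key
        self.token = str(uuid.uuid4())
        self._acquired = True


class FakeDB:
    """Records every query instead of talking to a real graph database."""

    def __init__(self) -> None:
        self.calls: list = []

    async def query(self, query: str, **params: Any) -> str:
        self.calls.append((query, params))
        return "ok"


class FencedWriterSketch:
    def __init__(self, db_client: Any, lock: Any) -> None:
        self._db = db_client
        self._lock = lock

    @property
    def token(self) -> str:
        return self._lock.token  # UUID4 fencing token of the bound lock

    async def write(self, query: str, **kwargs: Any) -> Any:
        # Refuse outright once the lock no longer reports ownership.
        if not self._lock._acquired:
            raise RuntimeError(f"lock {self._lock.key} not held; write refused")
        return await self._db.query(query, **kwargs)
```

A caller can pass the token as an ordinary query parameter, for example writer.write("MERGE (n:Doc {id: $id}) SET n.lock_token = $token", id=1, token=writer.token).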