core.state_machine module
Per-operation state machine with crash-safe checkpoints.
OperationStateMachine tracks each trace through a validated
lifecycle — RECEIVED -> PRE_INFERENCE_GATHER -> INFERRING
-> TOOL_EXECUTING / POSTPROCESSING -> DELIVERING ->
COMPLETED (plus ERRORED / RETRYING / RECLAIMED) — and
persists compressed, schema-versioned checkpoints so a reclaimed
message can resume without redoing expensive gather/inference work.
- class core.state_machine.OperationStateMachine(redis)
Bases: object
- classmethod is_valid_transition(current, target)
Report whether a move from current to target is legal.
Consults the class-level _VALID_PREDECESSORS adjacency map, which encodes the operation lifecycle, and returns True only when current appears in target's allowed-predecessor list. Used to guard against out-of-order or skipped lifecycle steps.
This is a pure lookup with no side effects. Within the repo it is exercised only by tests/core/migration/test_state_machine.py; no production caller invokes it directly. Note that neither transition() nor write_checkpoint_and_transition() consults this guard, and the Lua script in scripts/state_transition.lua deliberately skips re-validation and just HSETs the new state for atomicity, so callers needing the lifecycle check must call this method themselves.
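A minimal sketch of the kind of adjacency-map lookup is_valid_transition() performs, assuming a plain dict from target state to a tuple of allowed predecessor states. The entries are hypothetical and reconstructed only from the main path in the module summary. The real _VALID_PREDECESSORS contents are not shown here, and ERRORED, RETRYING, and RECLAIMED are left out.

```python
# HYPOTHETICAL predecessor map: target state -> states allowed to precede it.
# Entries are inferred from the module summary, not copied from core.state_machine.
_VALID_PREDECESSORS: dict[str, tuple[str, ...]] = {
    "PRE_INFERENCE_GATHER": ("RECEIVED",),
    "INFERRING": ("PRE_INFERENCE_GATHER", "TOOL_EXECUTING"),
    "TOOL_EXECUTING": ("INFERRING",),
    "POSTPROCESSING": ("INFERRING",),
    "DELIVERING": ("POSTPROCESSING",),
    "COMPLETED": ("DELIVERING",),
}


def is_valid_transition(current: str, target: str) -> bool:
    """Return True only if `current` is a listed predecessor of `target`."""
    # Unknown targets have no predecessors, so every move into them is rejected.
    return current in _VALID_PREDECESSORS.get(target, ())


# The write paths do not call this guard, so a caller that wants the check
# has to run it before writing the new state.
print(is_valid_transition("RECEIVED", "PRE_INFERENCE_GATHER"))  # True
print(is_valid_transition("RECEIVED", "COMPLETED"))             # False
```

Keying the map by target rather than by source keeps the check a single dictionary lookup plus a membership test on a short tuple.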
- __init__(redis)
Bind the state machine to a Redis client for trace updates.
Stores redis on self._redis for later use by transition(), which mutates the sg:trace:{trace_id} hash.
Instances are wired into core.stream_consumer.InboundStreamConsumer, which receives the state machine through its state_machine constructor argument (stored as self._state_machine) and calls transition() as messages are claimed (CLAIMED), completed (COMPLETED), or errored (ERRORED). No call site that constructs OperationStateMachine was found outside the tests.
- Parameters:
redis – An async Redis client (redis.asyncio-style) supporting exists and hset.
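The wiring described above can be illustrated with a small consumer. ConsumerSketch, RecordingStateMachine, and the process callback are invented for this example and are not the real InboundStreamConsumer API. The sketch only shows where the CLAIMED, COMPLETED, and ERRORED calls to transition() would sit around a unit of work. It also assumes the consumer re-raises the error after recording ERRORED.

```python
import asyncio
from typing import Awaitable, Callable


class RecordingStateMachine:
    """Hypothetical stub with the same transition() call shape; records calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str]] = []

    async def transition(self, trace_id: str, target_state: str, metadata=None) -> bool:
        self.calls.append((trace_id, target_state))
        return True


class ConsumerSketch:
    """Hypothetical consumer; not the real InboundStreamConsumer."""

    def __init__(self, state_machine, process: Callable[[str], Awaitable[None]]) -> None:
        # Mirrors the documented wiring: injected as state_machine, kept as _state_machine.
        self._state_machine = state_machine
        self._process = process

    async def handle(self, trace_id: str) -> None:
        await self._state_machine.transition(trace_id, "CLAIMED")
        try:
            await self._process(trace_id)
        except Exception:
            await self._state_machine.transition(trace_id, "ERRORED")
            raise  # this sketch propagates the failure after stamping ERRORED
        await self._state_machine.transition(trace_id, "COMPLETED")


async def main() -> list[tuple[str, str]]:
    async def ok(trace_id: str) -> None:
        return None

    async def fail(trace_id: str) -> None:
        raise RuntimeError("boom")

    sm = RecordingStateMachine()
    await ConsumerSketch(sm, ok).handle("t1")
    try:
        await ConsumerSketch(sm, fail).handle("t2")
    except RuntimeError:
        pass
    return sm.calls


print(asyncio.run(main()))
# [('t1', 'CLAIMED'), ('t1', 'COMPLETED'), ('t2', 'CLAIMED'), ('t2', 'ERRORED')]
```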
- async transition(trace_id, target_state, metadata=None)
Move a trace to target_state, recording it for observability.
Advances the lifecycle stamp on a trace by writing state (plus any extra metadata fields) into the trace hash. This lets operators and the web dashboard see where each operation is in the RECEIVED -> … -> COMPLETED pipeline. It first checks that the trace still exists and skips the write if it does not, which guards against resurrecting a trace whose hash has already expired or been reaped.
Touches Redis: an exists probe followed by an hset on sg:trace:{trace_id} (on the client passed at construction). It does not consult is_valid_transition() and writes whatever target_state it is given. Called by core.stream_consumer.InboundStreamConsumer (which holds the state machine as self._state_machine) as messages are claimed (CLAIMED), completed (COMPLETED), and errored (ERRORED).
- Parameters:
trace_id – Identifier of the operation; selects the sg:trace:{trace_id} hash.
target_state – Lifecycle state to write into the trace hash.
metadata – Optional extra fields written into the same hash alongside state (default None).
- Returns:
True if the trace existed and was updated, False if no trace hash was found (the write is skipped).
- Return type:
bool
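The following is a minimal sketch of the exists-then-hset behavior of transition(). It is written as a free function rather than a method, and it runs against FakeAsyncRedis, a hypothetical in-memory stand-in that is not part of the module. The sketch assumes the lifecycle stamp is stored in a field named state and that metadata entries become extra fields in the same hash. The worker field in the usage lines is invented.

```python
import asyncio


class FakeAsyncRedis:
    """Hypothetical in-memory stand-in for the two commands transition() uses."""

    def __init__(self) -> None:
        self.hashes: dict[str, dict[str, str]] = {}

    async def exists(self, *names: str) -> int:
        # Like Redis EXISTS: count how many of the given keys are present.
        return sum(1 for name in names if name in self.hashes)

    async def hset(self, name: str, mapping: dict[str, str]) -> int:
        # Like Redis HSET: create the hash if needed, return newly added fields.
        h = self.hashes.setdefault(name, {})
        added = sum(1 for field in mapping if field not in h)
        h.update(mapping)
        return added


async def transition(redis, trace_id: str, target_state: str, metadata=None) -> bool:
    key = f"sg:trace:{trace_id}"
    # Skip missing traces so an expired or reaped hash is not resurrected.
    if not await redis.exists(key):
        return False
    # No lifecycle check here: whatever target_state is given gets written.
    await redis.hset(key, mapping={"state": target_state, **(metadata or {})})
    return True


async def demo():
    r = FakeAsyncRedis()
    r.hashes["sg:trace:t1"] = {"state": "RECEIVED"}
    updated = await transition(r, "t1", "PRE_INFERENCE_GATHER", {"worker": "w-1"})
    skipped = await transition(r, "gone", "COMPLETED")
    return updated, skipped, r.hashes


updated, skipped, hashes = asyncio.run(demo())
print(updated, skipped, hashes["sg:trace:t1"]["state"])  # True False PRE_INFERENCE_GATHER
```

In this sketch the probe and the write are two separate commands. A hash that expires between them would therefore be recreated by HSET with no TTL. The description above does not say whether the real method closes that window.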
- async core.state_machine.write_checkpoint_and_transition(redis, trace_id, gather_output, target_state, script_sha)
Atomically write checkpoint AND transition state in one network hop.
Persist the gather-phase output as a compressed, schema-versioned checkpoint and advance the trace to target_state in a single pipelined round trip. A later reclaim can then resume from the checkpoint instead of redoing expensive pre-inference gather work. The payload is JSON-encoded, then zlib-compressed (level 6) before storage.
Builds a Redis pipeline with three steps. It hsets the checkpoint hash sg:checkpoint:{trace_id} (schema version, timestamp, compressed payload, and original/compressed sizes). It sets that key's TTL to ABSOLUTE_FAILSAFE_TTL seconds. It runs the caller-supplied Lua CAS script via evalsha(script_sha, ...) against sg:trace:{trace_id} to perform the validated state transition. Logs a checkpoint_saved line with the achieved compression ratio. Within the repo it is exercised only by tests/core/migration/test_state_machine.py. No production caller was found, so script_sha (the SHA of a pre-loaded transition script) is expected to be supplied by the inference worker's checkpoint flow.
- Parameters:
redis – An async Redis client whose pipeline() supports hset, expire, and evalsha.
trace_id (str) – Identifier of the operation; selects both the sg:checkpoint:{trace_id} and sg:trace:{trace_id} keys.
gather_output (dict) – JSON-serializable dict of pre-inference gather results to checkpoint.
target_state (str) – Lifecycle state to transition the trace into (passed as the sole argument to the Lua CAS script).
script_sha (str) – SHA1 of a server-side Lua script (loaded via SCRIPT LOAD) that performs the validated state transition on sg:trace:{trace_id}.
- Returns:
The pipeline's results list, one entry per queued command (hset, expire, evalsha).
- Return type:
list
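The checkpoint write described above can be sketched as follows. The sketch assumes a redis.asyncio client, where commands called on a pipeline are buffered and sent together by execute(). CHECKPOINT_SCHEMA_VERSION and ABSOLUTE_FAILSAFE_TTL are given placeholder values here. Only payload_compressed is a documented field name; schema_version, created_at, original_size, and compressed_size are guesses. The exact form of the logged ratio (original size divided by compressed size) is also an assumption.

```python
import json
import logging
import time
import zlib

logger = logging.getLogger(__name__)

# Placeholder values; the real constants are defined in core.state_machine.
CHECKPOINT_SCHEMA_VERSION = 1
ABSOLUTE_FAILSAFE_TTL = 3600  # seconds


def build_checkpoint_fields(gather_output: dict) -> dict:
    """JSON-encode, zlib-compress (level 6), and describe the payload."""
    raw = json.dumps(gather_output).encode("utf-8")
    compressed = zlib.compress(raw, 6)
    # Only payload_compressed is a documented field name; the rest are guesses.
    return {
        "schema_version": CHECKPOINT_SCHEMA_VERSION,
        "created_at": time.time(),
        "payload_compressed": compressed,
        "original_size": len(raw),
        "compressed_size": len(compressed),
    }


async def write_checkpoint_and_transition(redis, trace_id: str, gather_output: dict,
                                          target_state: str, script_sha: str) -> list:
    fields = build_checkpoint_fields(gather_output)
    checkpoint_key = f"sg:checkpoint:{trace_id}"
    pipe = redis.pipeline()
    # On a pipeline these calls only buffer commands; nothing is sent yet.
    pipe.hset(checkpoint_key, mapping=fields)
    pipe.expire(checkpoint_key, ABSOLUTE_FAILSAFE_TTL)
    # EVALSHA with one key (the trace hash) and one argument (the target state).
    pipe.evalsha(script_sha, 1, f"sg:trace:{trace_id}", target_state)
    results = await pipe.execute()  # one round trip for all three commands
    ratio = fields["original_size"] / fields["compressed_size"]
    logger.info("checkpoint_saved trace_id=%s ratio=%.2f", trace_id, ratio)
    return results
```

Setting the TTL in the same batch as the write means the checkpoint key is never left without an expiry, provided the batch executes.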
- async core.state_machine.recover_checkpoint(redis, trace_id)
Load and decompress a previously saved gather checkpoint, if usable.
Read the checkpoint written by write_checkpoint_and_transition(), validate its schema version, and return the original gather output so a reclaimed operation can resume without redoing the gather phase. Returns None when no checkpoint exists or when the stored checkpoint predates the current CHECKPOINT_SCHEMA_VERSION (forcing a fresh regather).
Calls redis.hgetall on sg:checkpoint:{trace_id} and normalizes any byte-keyed fields to str. On a current-schema hit, it zlib-decompresses the payload_compressed field and json.loads it back into a dict. A stale schema logs checkpoint_schema_stale and yields None; a successful recovery logs checkpoint_recovered with the checkpoint age. Within the repo it is exercised only by tests/core/migration/test_state_machine.py. No production caller was found, so it is expected to be driven by the inference worker's reclaim path.
- Parameters:
redis – An async Redis client supporting hgetall.
trace_id (str) – Identifier of the operation whose checkpoint to load (selects sg:checkpoint:{trace_id}).
- Returns:
The recovered gather output dict, or None if the checkpoint is missing or its schema is older than the current version.
- Return type:
dict | None
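The recovery path described above can be sketched under a few assumptions. CHECKPOINT_SCHEMA_VERSION has a placeholder value, and the field names schema_version and created_at are guesses; only payload_compressed is documented. The client is also assumed to return raw bytes (no decode_responses=True), so the compressed payload comes back intact. Only versions older than the current one are treated as stale, matching "predates" in the description.

```python
import json
import logging
import time
import zlib

logger = logging.getLogger(__name__)

CHECKPOINT_SCHEMA_VERSION = 1  # placeholder; the real constant lives in core.state_machine


def _text(value) -> str:
    # Clients created without decode_responses=True return bytes keys and values.
    return value.decode("utf-8") if isinstance(value, bytes) else str(value)


async def recover_checkpoint(redis, trace_id: str):
    raw = await redis.hgetall(f"sg:checkpoint:{trace_id}")
    if not raw:
        return None  # HGETALL on a missing key returns an empty mapping
    # Normalize byte keys to str; values stay as returned (payload must be bytes).
    fields = {_text(k): v for k, v in raw.items()}
    version = int(_text(fields["schema_version"]))
    if version < CHECKPOINT_SCHEMA_VERSION:
        logger.info("checkpoint_schema_stale trace_id=%s version=%d", trace_id, version)
        return None  # stale layout: the caller should regather
    payload = zlib.decompress(fields["payload_compressed"])
    age = time.time() - float(_text(fields["created_at"]))
    logger.info("checkpoint_recovered trace_id=%s age_s=%.1f", trace_id, age)
    return json.loads(payload)
```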