core.state_machine module

Per-operation state machine with crash-safe checkpoints.

OperationStateMachine tracks each trace through a validated lifecycle — RECEIVED -> PRE_INFERENCE_GATHER -> INFERRING -> TOOL_EXECUTING / POSTPROCESSING -> DELIVERING -> COMPLETED (plus ERRORED / RETRYING / RECLAIMED) — and persists compressed, schema-versioned checkpoints so a reclaimed message can resume without redoing expensive gather/inference work.

class core.state_machine.OperationStateMachine(redis)

Bases: object

classmethod is_valid_transition(current, target)

Report whether a move from current to target is legal.

Consults the class-level _VALID_PREDECESSORS adjacency map, which encodes the operation lifecycle, and returns True only when current appears in target’s allowed-predecessor list. Used to guard against out-of-order or skipped lifecycle steps.

This is a pure lookup with no side effects. Within the repo it is exercised only by tests/core/migration/test_state_machine.py; no production caller invokes it directly. Note that neither transition() nor write_checkpoint_and_transition() consults this guard, and the Lua script in scripts/state_transition.lua deliberately skips re-validation and just HSETs the new state for atomicity, so callers needing the lifecycle check must call this method themselves.

Parameters:
  • current (str) – The state the trace is currently in (e.g. "CLAIMED").

  • target (str) – The state being transitioned to (e.g. "INFERRING").

Return type:

bool

Returns:

True if current is a valid predecessor of target, otherwise False (including when target is unknown).
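A minimal sketch of the lookup, assuming _VALID_PREDECESSORS maps each target state to a list of states allowed to precede it. The map contents below are hypothetical and only loosely follow the lifecycle in the module overview; the class name SketchStateMachine is illustrative, not part of core.state_machine.

```python
from typing import ClassVar, Dict, List


class SketchStateMachine:
    # Hypothetical adjacency map: target state -> states allowed to precede it.
    # The real _VALID_PREDECESSORS in core.state_machine may list other states.
    _VALID_PREDECESSORS: ClassVar[Dict[str, List[str]]] = {
        "PRE_INFERENCE_GATHER": ["RECEIVED"],
        "INFERRING": ["PRE_INFERENCE_GATHER", "TOOL_EXECUTING"],
        "TOOL_EXECUTING": ["INFERRING"],
        "POSTPROCESSING": ["INFERRING"],
        "DELIVERING": ["POSTPROCESSING"],
        "COMPLETED": ["DELIVERING"],
    }

    @classmethod
    def is_valid_transition(cls, current: str, target: str) -> bool:
        # Pure lookup: an unknown target has no entry, so .get() yields [] and
        # every move into it is rejected.
        return current in cls._VALID_PREDECESSORS.get(target, [])


print(SketchStateMachine.is_valid_transition("DELIVERING", "COMPLETED"))    # True
print(SketchStateMachine.is_valid_transition("RECEIVED", "COMPLETED"))      # False (skipped steps)
print(SketchStateMachine.is_valid_transition("RECEIVED", "NO_SUCH_STATE"))  # False (unknown target)
```

Keying the map by target rather than by source keeps the check a single dictionary lookup plus a list membership test.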

__init__(redis)

Bind the state machine to a Redis client for trace updates.

Stores redis on self._redis for later use by transition(), which mutates the sg:trace:{trace_id} hash.

Instances are wired into core.stream_consumer.InboundStreamConsumer, which receives the state machine through its state_machine constructor argument (stored as self._state_machine) and calls transition() as messages are claimed (CLAIMED), completed (COMPLETED), or errored (ERRORED). Outside the tests, no call site that constructs OperationStateMachine itself was found.

Parameters:

redis – An async Redis client (redis.asyncio-style) supporting exists and hset.
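The consumer-side wiring described above can be sketched as follows. This is an illustration under assumptions, not the code of core.stream_consumer.InboundStreamConsumer: SketchConsumer, handle_message, RecordingStateMachine, and the process callback are all hypothetical. Only the constructor injection, the self._state_machine attribute, and the CLAIMED / COMPLETED / ERRORED calls to transition() come from the text.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple


class RecordingStateMachine:
    """Hypothetical stand-in for OperationStateMachine that records each call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, Optional[dict]]] = []

    async def transition(self, trace_id: str, target_state: str,
                         metadata: Optional[dict] = None) -> bool:
        self.calls.append((trace_id, target_state, metadata))
        return True


class SketchConsumer:
    """Hypothetical consumer; not the real InboundStreamConsumer."""

    def __init__(self, state_machine) -> None:
        # As documented: injected via the constructor, kept on self._state_machine.
        self._state_machine = state_machine

    async def handle_message(self, trace_id: str, msg_id: str,
                             process: Callable[[], Awaitable[None]]) -> bool:
        await self._state_machine.transition(trace_id, "CLAIMED", {"msg_id": msg_id})
        try:
            await process()
        except Exception as exc:
            # Record the failure on the trace; retry/dead-letter policy is out of scope.
            await self._state_machine.transition(trace_id, "ERRORED", {"error": str(exc)})
            return False
        await self._state_machine.transition(trace_id, "COMPLETED")
        return True


async def main() -> List[Tuple[str, str, Optional[dict]]]:
    sm = RecordingStateMachine()
    consumer = SketchConsumer(sm)

    async def succeed() -> None:
        pass

    async def fail() -> None:
        raise RuntimeError("model timeout")

    await consumer.handle_message("t1", "1-0", succeed)
    await consumer.handle_message("t2", "2-0", fail)
    return sm.calls


calls = asyncio.run(main())
```

Injecting the state machine keeps the consumer testable: a recording stub like the one above can replace the Redis-backed instance without touching the consumer logic.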

async transition(trace_id, target_state, metadata=None)

Move a trace to target_state, recording it for observability.

Advances the lifecycle stamp on a trace by writing state (plus any extra metadata fields) into the trace hash, so operators and the web dashboard can see where each operation is in the RECEIVED -> … -> COMPLETED pipeline. It first checks that the trace still exists and skips the write if it does not, which guards against resurrecting a trace whose hash has already expired or been reaped.

Touches Redis: an exists probe followed by an hset on sg:trace:{trace_id} (on the client passed at construction). Like write_checkpoint_and_transition(), whose Lua script skips re-validation, this does not consult is_valid_transition(); it writes whatever target_state it is given. Called by core.stream_consumer.InboundStreamConsumer (which holds the state machine as self._state_machine) as messages are claimed (CLAIMED), completed (COMPLETED), and errored (ERRORED).

Parameters:
  • trace_id (str) – Identifier selecting the sg:trace:{trace_id} hash.

  • target_state (str) – New lifecycle state to record (e.g. "CLAIMED").

  • metadata (dict | None) – Optional extra fields to merge into the trace hash alongside state (e.g. {"msg_id": ...}).

Returns:

True if the trace existed and was updated, False if no trace hash was found (the write is skipped).

Return type:

bool
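A minimal sketch of the exists-then-hset flow, plus a caller-side guard, since transition() performs no lifecycle check of its own. FakeAsyncRedis is a hypothetical in-memory stand-in for a redis.asyncio client; guarded_transition and the allowed-move table are also hypothetical. Writing "state" after the metadata, so metadata cannot overwrite it, is an assumption about ordering rather than documented behavior.

```python
import asyncio
from typing import Callable, Dict, Optional


class FakeAsyncRedis:
    """Hypothetical in-memory stand-in for the async exists/hset calls."""

    def __init__(self) -> None:
        self.hashes: Dict[str, Dict[str, str]] = {}

    async def exists(self, *keys: str) -> int:
        # Like Redis EXISTS: the number of given keys that exist.
        return sum(1 for k in keys if k in self.hashes)

    async def hset(self, name: str, mapping: Dict[str, str]) -> int:
        h = self.hashes.setdefault(name, {})
        added = sum(1 for k in mapping if k not in h)
        h.update(mapping)
        return added


async def transition(redis, trace_id: str, target_state: str,
                     metadata: Optional[dict] = None) -> bool:
    key = f"sg:trace:{trace_id}"
    # Existence probe: never recreate a trace hash that has expired or been reaped.
    if not await redis.exists(key):
        return False
    # Assumption: "state" is merged last so metadata cannot overwrite it.
    await redis.hset(key, mapping={**(metadata or {}), "state": target_state})
    return True


async def guarded_transition(redis, trace_id: str, current: str, target: str,
                             is_valid: Callable[[str, str], bool],
                             metadata: Optional[dict] = None) -> bool:
    # transition() itself performs no lifecycle check, so the caller does it here.
    if not is_valid(current, target):
        raise ValueError(f"illegal transition {current} -> {target}")
    return await transition(redis, trace_id, target, metadata)


async def main():
    r = FakeAsyncRedis()
    r.hashes["sg:trace:t1"] = {"state": "RECEIVED"}
    updated = await transition(r, "t1", "CLAIMED", {"msg_id": "1-0"})
    missing = await transition(r, "gone", "CLAIMED")
    allowed = {("CLAIMED", "INFERRING")}  # hypothetical legal-move table
    guarded = await guarded_transition(r, "t1", "CLAIMED", "INFERRING",
                                       lambda cur, tgt: (cur, tgt) in allowed)
    try:
        await guarded_transition(r, "t1", "INFERRING", "COMPLETED",
                                 lambda cur, tgt: (cur, tgt) in allowed)
        rejected = False
    except ValueError:
        rejected = True
    return r, updated, missing, guarded, rejected


r, updated, missing, guarded, rejected = asyncio.run(main())
```

Note that the exists/hset pair is two separate round trips, so a trace that expires between them could still be recreated; the sketch does not close that window.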

async core.state_machine.write_checkpoint_and_transition(redis, trace_id, gather_output, target_state, script_sha)

Atomically write checkpoint AND transition state in one network hop.

Persist the gather-phase output as a compressed, schema-versioned checkpoint and advance the trace to target_state in a single pipelined round trip, so a later reclaim can resume from the checkpoint instead of redoing expensive pre-inference gather work. The payload is JSON-encoded then zlib-compressed (level 6) before storage.
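The encoding step can be sketched as a JSON-then-zlib round trip at level 6. The helper names encode_gather_output and decode_gather_output are hypothetical, and default json.dumps settings with UTF-8 are assumed, since the text does not specify separators or key ordering.

```python
import json
import zlib


def encode_gather_output(gather_output: dict) -> bytes:
    # JSON-encode to UTF-8 bytes, then zlib-compress at level 6.
    raw = json.dumps(gather_output).encode("utf-8")
    return zlib.compress(raw, level=6)


def decode_gather_output(blob: bytes) -> dict:
    # Inverse of encode_gather_output: decompress, then parse the JSON bytes.
    return json.loads(zlib.decompress(blob))


sample = {"documents": ["lorem ipsum dolor sit amet"] * 50, "query": "status?"}
original_size = len(json.dumps(sample).encode("utf-8"))
blob = encode_gather_output(sample)
print(original_size, len(blob), round(len(blob) / original_size, 3))
```

Level 6 is zlib's usual default trade-off between speed and ratio; repetitive gather output such as the sample above compresses well, while already-dense data gains little.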

Builds a Redis pipeline that hsets the checkpoint hash sg:checkpoint:{trace_id} (schema version, timestamp, compressed payload, and original/compressed sizes), sets that key’s TTL to ABSOLUTE_FAILSAFE_TTL seconds, and runs the caller-supplied Lua CAS script via evalsha(script_sha, ...) against sg:trace:{trace_id} to perform the validated state transition. Logs a checkpoint_saved line with the achieved compression ratio. Within the repo it is exercised only by tests/core/migration/test_state_machine.py; no production caller was found, so script_sha (the SHA of a pre-loaded transition script) is expected to be supplied by the inference worker’s checkpoint flow.

Parameters:
  • redis – An async Redis client whose pipeline() supports hset, expire, and evalsha.

  • trace_id (str) – Identifier of the operation; selects both the sg:checkpoint:{trace_id} and sg:trace:{trace_id} keys.

  • gather_output (dict) – JSON-serializable dict of pre-inference gather results to checkpoint.

  • target_state (str) – Lifecycle state to transition the trace into (passed as the sole argument to the Lua CAS script).

  • script_sha (str) – SHA1 of a server-side Lua script (loaded via SCRIPT LOAD) that performs the validated state transition on sg:trace:{trace_id}.

Returns:

The pipeline’s results list, one entry per queued command (hset, expire, evalsha).

Return type:

list
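A sketch of the pipelined write, assuming a redis.asyncio-style client whose pipeline() queues commands synchronously and sends them on await execute(). In redis-py, pipeline() defaults to transaction=True, which wraps the queued commands in MULTI/EXEC. The constant values, every checkpoint field name except payload_compressed, the placeholder SHA "0123abcd", and the FakeRedis/FakePipeline recorders are hypothetical.

```python
import asyncio
import json
import time
import zlib
from typing import Any, List, Tuple

# Hypothetical values; the real constants are defined in core.state_machine.
CHECKPOINT_SCHEMA_VERSION = 2
ABSOLUTE_FAILSAFE_TTL = 6 * 3600


async def write_checkpoint_and_transition(redis, trace_id: str, gather_output: dict,
                                          target_state: str, script_sha: str) -> list:
    raw = json.dumps(gather_output).encode("utf-8")
    compressed = zlib.compress(raw, level=6)
    ckpt_key = f"sg:checkpoint:{trace_id}"
    # With redis-py, pipeline() defaults to transaction=True (MULTI/EXEC).
    pipe = redis.pipeline()
    # Field names other than payload_compressed are assumptions.
    pipe.hset(ckpt_key, mapping={
        "schema_version": CHECKPOINT_SCHEMA_VERSION,
        "saved_at": time.time(),
        "payload_compressed": compressed,
        "original_size": len(raw),
        "compressed_size": len(compressed),
    })
    pipe.expire(ckpt_key, ABSOLUTE_FAILSAFE_TTL)
    # One key (the trace hash) plus one argument (the target state) for the script.
    pipe.evalsha(script_sha, 1, f"sg:trace:{trace_id}", target_state)
    return await pipe.execute()  # one round trip, one result per queued command


class FakePipeline:
    """Hypothetical recorder mimicking the queue-then-execute pipeline shape."""

    def __init__(self) -> None:
        self.commands: List[Tuple[Any, ...]] = []

    def hset(self, name, mapping):
        self.commands.append(("hset", name, mapping))
        return self

    def expire(self, name, seconds):
        self.commands.append(("expire", name, seconds))
        return self

    def evalsha(self, sha, numkeys, *keys_and_args):
        self.commands.append(("evalsha", sha, numkeys, *keys_and_args))
        return self

    async def execute(self) -> list:
        # Placeholder replies; a real server returns per-command results.
        return [f"reply:{cmd[0]}" for cmd in self.commands]


class FakeRedis:
    def __init__(self) -> None:
        self.last_pipeline = FakePipeline()

    def pipeline(self) -> FakePipeline:
        self.last_pipeline = FakePipeline()
        return self.last_pipeline


fake = FakeRedis()
results = asyncio.run(write_checkpoint_and_transition(
    fake, "t1", {"docs": ["a", "b"]}, "INFERRING", "0123abcd"))
```

Passing the trace key through the KEYS slot (numkeys=1) rather than building it inside the script keeps the script compatible with Redis Cluster key routing.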

async core.state_machine.recover_checkpoint(redis, trace_id)

Load and decompress a previously saved gather checkpoint, if usable.

Read the checkpoint written by write_checkpoint_and_transition(), validate its schema version, and return the original gather output so a reclaimed operation can resume without redoing the gather phase. Returns None when no checkpoint exists or when the stored checkpoint predates the current CHECKPOINT_SCHEMA_VERSION (forcing a fresh regather).

Calls redis.hgetall on sg:checkpoint:{trace_id}, normalizes any byte-keyed fields to str, and on a current-schema hit zlib-decompresses the payload_compressed field and json.loads it back into a dict. A stale schema logs checkpoint_schema_stale and yields None; a successful recovery logs checkpoint_recovered with the checkpoint age. Within the repo it is exercised only by tests/core/migration/test_state_machine.py; no production caller was found, so it is expected to be driven by the inference worker’s reclaim path.

Parameters:
  • redis – An async Redis client supporting hgetall.

  • trace_id (str) – Identifier of the operation whose checkpoint to load (selects sg:checkpoint:{trace_id}).

Returns:

The recovered gather output dict, or None if the checkpoint is missing or its schema is older than the current version.

Return type:

dict | None
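A sketch of the recovery path, assuming the checkpoint hash stores a numeric schema_version field and that payload_compressed comes back as raw bytes (a client created with decode_responses=True would try to UTF-8-decode the compressed payload and fail). The schema_version field name, the constant's value, the _to_str helper, and FakeHashRedis are hypothetical; the checkpoint_schema_stale and checkpoint_recovered log lines are omitted.

```python
import asyncio
import json
import zlib
from typing import Dict, Optional

CHECKPOINT_SCHEMA_VERSION = 2  # hypothetical value


def _to_str(value) -> str:
    # Clients without decode_responses=True return bytes; normalize to str.
    return value.decode("utf-8") if isinstance(value, bytes) else value


async def recover_checkpoint(redis, trace_id: str) -> Optional[dict]:
    fields = await redis.hgetall(f"sg:checkpoint:{trace_id}")
    if not fields:
        return None  # no checkpoint: HGETALL on a missing key is empty
    data = {_to_str(k): v for k, v in fields.items()}
    if int(_to_str(data.get("schema_version", "0"))) < CHECKPOINT_SCHEMA_VERSION:
        return None  # stale schema: force a fresh regather
    # Assumes the payload is raw compressed bytes, not decoded text.
    return json.loads(zlib.decompress(data["payload_compressed"]))


class FakeHashRedis:
    """Hypothetical stand-in returning byte-keyed hashes like a raw redis client."""

    def __init__(self, store: Dict[str, Dict[bytes, bytes]]) -> None:
        self.store = store

    async def hgetall(self, name: str) -> Dict[bytes, bytes]:
        return dict(self.store.get(name, {}))


blob = zlib.compress(json.dumps({"docs": ["a", "b"]}).encode("utf-8"), level=6)
fake = FakeHashRedis({
    "sg:checkpoint:fresh": {b"schema_version": b"2", b"payload_compressed": blob},
    "sg:checkpoint:stale": {b"schema_version": b"1", b"payload_compressed": blob},
})
fresh = asyncio.run(recover_checkpoint(fake, "fresh"))
stale = asyncio.run(recover_checkpoint(fake, "stale"))
missing = asyncio.run(recover_checkpoint(fake, "missing"))
```

Treating an old schema exactly like a missing checkpoint keeps the caller's logic to a single branch: any None means "regather".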