core.tools_consumer module

Tool-execution stream consumer for the dedicated tools service.

ToolExecConsumer mirrors core.stream_consumer.InboundStreamConsumer: it performs a blocking XREADGROUP on sg:stream:tools under the sg:tools consumer group, runs a periodic autoclaim sweep, and sends messages to the dead-letter queue (DLQ) on repeated failure. It differs in two deliberate ways:

  • No per-channel distributed lock. Tool execution must run fully in parallel and load-balance across tools instances; user-visible message ordering is already enforced downstream by the gateway’s per-channel outbound lock.

  • The injected process_fn is expected to always reply (it pushes the result, or an error envelope, onto the caller’s reply stream) and to swallow tool-level exceptions. Only infrastructure failures propagate to the DLQ.
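The contract in the second point can be illustrated with a minimal sketch of a conforming process_fn. The message fields (reply_stream, tool, args), the envelope shape, and the push_reply and tools arguments are all hypothetical, since this page does not document the message schema or the reply transport; only the "always reply, swallow tool-level exceptions" behaviour comes from the description above.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical types: the real message schema and reply transport are not
# documented in this section.
ReplyFn = Callable[[str, dict[str, Any]], Awaitable[None]]
ToolFn = Callable[[dict[str, Any]], Awaitable[Any]]


def make_process_fn(tools: dict[str, ToolFn], push_reply: ReplyFn):
    """Build a process_fn that always replies and never raises on tool errors."""

    async def process_fn(message: dict[str, Any]) -> None:
        reply_stream = message["reply_stream"]  # assumed field name
        try:
            tool = tools[message["tool"]]  # assumed field name
            result = await tool(message.get("args", {}))
            envelope = {"ok": True, "result": result}
        except Exception as exc:
            # Tool-level failure (including an unknown tool name):
            # report it to the caller instead of raising.
            envelope = {"ok": False, "error": f"{type(exc).__name__}: {exc}"}
        # A failure while pushing the reply (for example, Redis being
        # unreachable) is an infrastructure error and is left to propagate.
        await push_reply(reply_stream, envelope)

    return process_fn
```

With this shape, a tool that raises still produces exactly one reply for the caller, and the consumer sees an exception only if delivering the reply itself fails.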

class core.tools_consumer.ToolExecConsumer(redis, consumer_name, process_fn, autoclaim_interval=30.0, autoclaim_min_idle=60000)

Bases: object

Consumes tool-execution requests and dispatches them to process_fn.

Parameters:
  • redis (Redis)

  • consumer_name (str)

  • process_fn (Callable[[dict[str, Any]], Awaitable[None]])

  • autoclaim_interval (float)

  • autoclaim_min_idle (int)

async start()
Return type:

None

async stop()
Return type:

None
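
One way to drive the start()/stop() pair is sketched below. The helper run_until_shutdown and the shutdown event are hypothetical. This page does not say whether start() returns once the consumer is running or blocks until stop() is called, so the sketch runs start() in a task and should behave correctly in either case.

```python
import asyncio


async def run_until_shutdown(consumer, shutdown: asyncio.Event) -> None:
    """Run any object exposing async start()/stop() until shutdown is set."""
    # start() is wrapped in a task because it may or may not block.
    start_task = asyncio.create_task(consumer.start())
    try:
        await shutdown.wait()
    finally:
        await consumer.stop()
        # Surface any exception raised inside start().
        await start_task

# Assumed wiring, following the constructor signature documented above:
#   consumer = ToolExecConsumer(redis, "tools-1", process_fn)
#   await run_until_shutdown(consumer, shutdown_event)
```

The consumer name "tools-1" is only a placeholder; in a load-balanced deployment each tools instance would presumably need its own name within the sg:tools group.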