platforms.redis module

Redis Platform Adapter.

Provides a fully Redis-backed platform for the Stargazer v3 pipeline. External callers interact with instances via the REST API defined in web/redis_platform_api.py; this adapter bridges those API calls into the standard MessageProcessor pipeline.

Architecture:

  • One adapter instance is created per Gateway boot by platforms.factory.create_platform and runs inside the Gateway service.

  • Each “instance” is a logical shard identified by an instance_id (which maps to channel_id inside the pipeline).

  • Per-instance API keys authenticate callers; the adapter manages these in a Redis hash redis_platform:meta:{instance_id}.

  • Outbound send() / send_file() calls publish to a Pub/Sub channel so ?wait=true callers can collect synchronous responses.

  • Files are stored in Redis with a 24h TTL and served via a dedicated download endpoint.

class platforms.redis.RedisPlatformAdapter(message_handler, config=None, **kwargs)

Bases: PlatformAdapter

Platform adapter whose entire state lives in Redis.

One adapter instance manages all Redis platform instances — each logical conversation is identified by instance_id, which the pipeline treats as the channel_id.

Parameters:
  • message_handler (Callable[[IncomingMessage, PlatformAdapter], Awaitable[None]]) – Callback wired by the Gateway service to its inbound handler, which publishes the message onto the sg:stream:inbound Redis stream for the Inference service to pick up. Every injected message is forwarded here for pipeline processing.

  • config (Any | None) – Bot configuration object. Used to build the Redis connection. When None, the adapter starts without a Redis connection (safe for unit tests with mocked state).

  • kwargs (Any)

__init__(message_handler, config=None, **kwargs)

Construct the Redis-backed platform adapter.

Stores the supplied message_handler (via the PlatformAdapter base) and config without opening any connection; the actual Redis client is created lazily in start(). Until then self._redis is None and self._running is False. Extra keyword arguments are accepted and ignored so platforms.factory.create_platform can pass adapter options uniformly across platform types.

Parameters:
  • message_handler (Callable[[IncomingMessage, PlatformAdapter], Awaitable[None]]) – Async callback wired by the Gateway service that forwards each injected IncomingMessage onto the sg:stream:inbound Redis stream for the Inference service.

  • config (Any | None) – Bot configuration used to build the Redis connection in start(). When None, the adapter runs without Redis and registry operations no-op (used by tests).

  • **kwargs (Any) – Additional adapter options, accepted and ignored for factory call-site uniformity.

Return type:

None

async start()

Connect to Redis and mark the adapter as running.

Lazily builds the async Redis client the rest of this adapter relies on, preferring the config’s build_async_redis_client (Sentinel-aware, failover-resilient, tracks master re-election) and falling back to a bare aioredis.from_url with the config’s SSL kwargs when that helper is absent. When no Redis URL or Sentinel set is configured the connection is skipped and a warning is logged, leaving every registry operation to no-op; in all cases self._running is flipped to True. Invoked once per Gateway boot after platforms.factory.create_platform builds this adapter, as part of platform lifecycle startup.

Raises:

redis.exceptions.RedisError – Propagated if the underlying client construction surfaces a connection error eagerly.

Return type:

None

async stop()

Gracefully disconnect and release Redis resources.

Clears self._running and, if a client was opened in start(), closes it via aclose (swallowing and debug-logging any teardown error) before dropping the reference so a later restart reconnects cleanly. Invoked during platform lifecycle shutdown, the mirror of start(), when the Gateway service is stopping.

Return type:

None
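
The teardown order described for stop() can be sketched as follows. This is a minimal illustration, not the adapter's actual code: AdapterSketch and FailingClient are hypothetical stand-ins, and only the _redis and _running attributes and the aclose call come from the description above.

```python
import asyncio
import logging

log = logging.getLogger("redis_platform_sketch")


class AdapterSketch:
    """Hypothetical stand-in holding only the two attributes stop() touches."""

    def __init__(self, client=None):
        self._redis = client
        # Pretend start() already ran whenever a client was supplied.
        self._running = client is not None

    async def stop(self):
        self._running = False
        if self._redis is not None:
            try:
                await self._redis.aclose()
            except Exception as exc:
                # Teardown errors are debug-logged and swallowed, not raised.
                log.debug("error while closing Redis client: %s", exc)
            finally:
                # Drop the reference so a later start() reconnects cleanly.
                self._redis = None


class FailingClient:
    """Test double whose aclose() always fails."""

    async def aclose(self):
        raise ConnectionError("connection already gone")
```

Calling asyncio.run(AdapterSketch(FailingClient()).stop()) completes without raising, which is the property the description relies on during Gateway shutdown.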

property name: str

Return the platform’s short identifier.

Implements the abstract PlatformAdapter.name property. The constant "redis" is used as the adapter’s key when the pipeline builds its adapters_by_name maps in message_processor and gateway_main, and to branch platform-specific behaviour elsewhere.

Returns:

The literal "redis".

Return type:

str

property is_running: bool

Report whether the adapter has been started and not stopped.

Implements the abstract PlatformAdapter.is_running property. Reflects the self._running flag set in start() and cleared in stop(). Read by background_tasks (which waits for adapters to come up before backfill), gateway_main, web/deps, web/platforms_api, and web/bot_admin for status reporting.

Returns:

True while the adapter is running, False otherwise.

Return type:

bool

property bot_identity: dict[str, str]

Describe this bot’s identity on the Redis platform.

Implements the abstract PlatformAdapter.bot_identity property. Unlike Discord or Matrix, the Redis platform has no live account to query, so a fixed identity for “Star” is returned. Read across adapters by prompt_context (which gathers bot_identity for every adapter), message_processor.user_message_format, message_processor.history_backfill, and background_tasks to label and attribute the bot’s own messages.

Returns:

A mapping with platform, user_id, display_name, and mention keys for the Redis bot identity.

Return type:

dict[str, str]

async should_skip_channel_heartbeat(channel_id)

Always skip the proactive channel heartbeat for Redis instances.

Redis platform instances are driven entirely by explicit API calls (inject_message), so the bot never needs to wake them with unsolicited background heartbeat messages the way it does for live chat channels; this unconditionally returns True. Consulted by message_processor.channel_heartbeat before it would otherwise emit a heartbeat, and reached for the Redis platform through core.outbound_consumer (the in-Gateway RPC worker that resolves the inference worker’s core.proxy_adapter.ProxyPlatformAdapter calls).

Parameters:

channel_id (str) – The instance id under consideration. Ignored — the answer is the same for every Redis instance.

Returns:

Always True.

Return type:

bool

async create_instance(instance_id=None, display_name='', api_key=None, metadata=None)

Create a new Redis platform instance and persist its registry record.

Mints an instance_id (a random 16-character hex string when not supplied) and an API key (a secrets.token_urlsafe token when not supplied), then writes the key, the display name, a UTC created_at stamp, and any flattened caller metadata into Redis in a single pipeline: the id is added to the redis_platform:instances set and the remaining fields go into the redis_platform:meta:{instance_id} hash. With no Redis connection the call still returns a fresh id/key pair but persists nothing. The plaintext key is returned only here and is never retrievable afterward, since registry reads redact it. Invoked by the create-instance route in web/redis_platform_api.py.

Parameters:
  • instance_id (str | None) – Desired instance id; a random hex id is generated when None.

  • display_name (str) – Human-friendly label; defaults to the id itself when empty.

  • api_key (str | None) – Pre-chosen key; a secure token is generated when None.

  • metadata (dict[str, Any] | None) – Extra fields stored as top-level hash fields; keys and values are stringified, so callers should supply string-coercible values.

Returns:

A mapping with instance_id and the one-time plaintext api_key.

Return type:

dict[str, str]
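
As a rough illustration of the minting and flattening steps, the sketch below builds the hash fields without touching Redis. The helper name build_instance_record, the 32-byte token size, the exact field names, and the choice to let reserved fields win over caller metadata are all assumptions of this sketch, not confirmed details of the adapter.

```python
import secrets
from datetime import datetime, timezone


def build_instance_record(instance_id=None, display_name="", api_key=None, metadata=None):
    """Return (instance_id, hash_fields) for a new instance (hypothetical helper)."""
    # token_hex(8) yields 8 random bytes rendered as 16 hex characters.
    instance_id = instance_id or secrets.token_hex(8)
    api_key = api_key or secrets.token_urlsafe(32)  # token size is assumed

    # Caller metadata is flattened into top-level fields, stringified.
    fields = {str(k): str(v) for k, v in (metadata or {}).items()}

    # Reserved fields are written last so metadata cannot clobber them
    # (a choice made for this sketch only).
    fields["api_key"] = api_key
    fields["display_name"] = display_name or instance_id
    fields["created_at"] = datetime.now(timezone.utc).isoformat()
    return instance_id, fields
```

With a live client, the two writes would then be an SADD to redis_platform:instances and an HSET of these fields into redis_platform:meta:{instance_id}, queued on one pipeline.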

async validate_api_key(instance_id, api_key)

Validate a per-instance API key using constant-time comparison.

Reads the stored key from the redis_platform:meta:{instance_id} hash and compares it against the presented key with secrets.compare_digest to avoid leaking timing information. Returns False rather than raising when there is no Redis connection, the instance does not exist, or no key is stored, so callers can map every failure to a uniform 401/403. Invoked by the bearer-auth dependency in web/redis_platform_api.py that guards the per-instance routes.

Parameters:
  • instance_id (str) – The instance whose stored key to check.

  • api_key (str) – The candidate key presented by the caller.

Returns:

True only when a key is stored and matches; False otherwise.

Return type:

bool
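
The comparison step can be sketched in isolation. The helper name keys_match is hypothetical, and encoding both sides to UTF-8 bytes is an assumption made here so that non-ASCII input cannot make secrets.compare_digest raise; the Redis read that supplies the stored key is left out.

```python
import secrets


def keys_match(stored_key, presented_key):
    """Constant-time key check; a missing or empty stored key never matches."""
    if not stored_key:
        return False
    return secrets.compare_digest(
        stored_key.encode("utf-8"),
        presented_key.encode("utf-8"),
    )
```

Returning False for a missing key, rather than raising, is what lets the caller treat every failure the same way.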

async instance_exists(instance_id)

Report whether an instance is registered.

Checks membership of instance_id in the redis_platform:instances set, returning False when there is no Redis connection. Used as a precondition guard before instance-scoped work: by inject_message() and update_instance_meta() here, and by the user and inject routes in web/redis_platform_api.py that 404 on a missing instance.

Parameters:

instance_id (str) – The candidate instance id.

Returns:

True if the id is in the registry set, False otherwise.

Return type:

bool

async list_instances()

Return redacted metadata for every registered instance.

Reads the redis_platform:instances set, then fetches each redis_platform:meta:{instance_id} hash in id-sorted order, stripping the api_key field and stamping in the instance_id so the result is safe to expose. Returns an empty list when there is no Redis connection. Invoked by the list-instances route in web/redis_platform_api.py.

Returns:

One metadata dict per instance, each carrying instance_id and never the API key, sorted by id.

Return type:

list[dict[str, str]]

async get_instance_meta(instance_id)

Return one instance’s redacted metadata, or None if absent.

Reads the redis_platform:meta:{instance_id} hash, drops the api_key field, and stamps in instance_id before returning. Yields None when there is no Redis connection or the hash is empty (i.e. the instance is unknown). Invoked by the get-instance and related routes in web/redis_platform_api.py.

Parameters:

instance_id (str) – The instance whose metadata to fetch.

Returns:

The key-redacted metadata with an instance_id field, or None when the instance has no metadata.

Return type:

dict[str, str] | None
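
The redaction step shared by list_instances() and get_instance_meta() amounts to a small dictionary transform. The sketch below works on an already-fetched hash; redact_meta is a hypothetical name, and the api_key and instance_id field names are taken from the descriptions above.

```python
def redact_meta(instance_id, raw_hash):
    """Strip the API key and stamp in the id; None when the hash is empty."""
    if not raw_hash:
        return None
    meta = {field: value for field, value in raw_hash.items() if field != "api_key"}
    meta["instance_id"] = instance_id
    return meta
```

The input mapping is left unmodified, so the same fetched hash can still be used for key validation elsewhere.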

async update_instance_meta(instance_id, display_name=None, metadata=None)

Update mutable fields on an instance’s metadata hash.

Verifies the instance exists, then HSETs the supplied fields into redis_platform:meta:{instance_id}: the display name when given, plus any caller metadata with keys and values stringified. The API key and creation stamp are left untouched, and the HSET is skipped when there is nothing to write. Returns False without writing when there is no Redis connection or the instance is unknown. Invoked by the update-instance route in web/redis_platform_api.py.

Parameters:
  • instance_id (str) – The instance to update.

  • display_name (str | None) – New display name; left unchanged when None.

  • metadata (dict[str, Any] | None) – Extra fields to merge in as top-level hash fields, stringified.

Returns:

True when the instance exists (whether or not any field actually changed); False when it is missing or Redis is absent.

Return type:

bool

async register_user(instance_id, user_id, display_name, metadata=None)

Register (or overwrite) a user in an instance’s user registry.

Builds a user record — id, display name, arbitrary metadata, and a UTC registered_at stamp — and HSETs it as a JSON value under the field user_id in the redis_platform:users:{instance_id} hash, replacing any prior entry for that user. With no Redis connection the record is built and returned but not persisted. Once any user is registered the instance becomes a closed roster that inject_message() enforces. Invoked by the register-user route in web/redis_platform_api.py.

Parameters:
  • instance_id (str) – The instance to register the user under.

  • user_id (str) – Stable identifier used as the hash field and for pipeline attribution.

  • display_name (str) – Human-friendly name surfaced in history and the system prompt.

  • metadata (dict[str, Any] | None) – Optional extra attributes stored verbatim inside the JSON record.

Returns:

The stored user record (id, display name, metadata, and registered_at).

Return type:

dict[str, Any]
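
A sketch of how such a record might be built and serialized for the roster hash follows. The helper names and the record's key names (user_id, display_name, metadata) are assumptions; only registered_at and the JSON-under-user_id layout are stated above.

```python
import json
from datetime import datetime, timezone


def build_user_record(user_id, display_name, metadata=None):
    """Assemble the user record (key names are assumed for illustration)."""
    return {
        "user_id": user_id,
        "display_name": display_name,
        "metadata": dict(metadata or {}),
        "registered_at": datetime.now(timezone.utc).isoformat(),
    }


def to_hash_entry(record):
    """Return the (field, value) pair to HSET into redis_platform:users:{instance_id}."""
    return record["user_id"], json.dumps(record)
```

Because the hash field is the user id, writing the same user twice replaces the earlier JSON value, which matches the "register or overwrite" behaviour.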

async list_users(instance_id)

Return every registered user for an instance.

Reads the whole redis_platform:users:{instance_id} hash and decodes each JSON value into a record, skipping (and warning about) any value that fails to deserialize so one corrupt entry cannot break the listing. Returns an empty list when there is no Redis connection. Invoked by the list-users route in web/redis_platform_api.py.

Parameters:

instance_id (str) – The instance whose roster to read.

Returns:

The decoded user records, sorted by user_id.

Return type:

list[dict[str, Any]]
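
The tolerant decoding loop can be sketched against a plain dict standing in for the fetched hash. The name decode_roster, the logger name, the user_id sort key, and the decision to also skip values that decode to something other than an object are assumptions of this sketch.

```python
import json
import logging

log = logging.getLogger("redis_platform_sketch")


def decode_roster(raw_hash):
    """Decode every JSON value, skipping entries that fail to deserialize."""
    users = []
    for field, value in raw_hash.items():
        try:
            record = json.loads(value)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            log.warning("skipping undecodable user entry %r", field)
            continue
        if not isinstance(record, dict):
            log.warning("skipping non-object user entry %r", field)
            continue
        users.append(record)
    return sorted(users, key=lambda user: str(user.get("user_id", "")))
```

One corrupt value costs a warning and a missing row, not a failed listing.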

async get_user(instance_id, user_id)

Return one registered user record, or None if absent.

HGETs the user_id field from the redis_platform:users:{instance_id} hash and JSON-decodes it, treating a missing field or any decode failure as “not found” by returning None. Also returns None when there is no Redis connection. Called by the get-user route in web/redis_platform_api.py and by inject_message() here to resolve the canonical display name for a roster-enforced instance.

Parameters:
  • instance_id (str) – The instance to look in.

  • user_id (str) – The user id (hash field) to fetch.

Returns:

The decoded user record, or None when it is missing or unreadable.

Return type:

dict[str, Any] | None

async delete_user(instance_id, user_id)

Remove a user from an instance’s roster.

HDELs the user_id field from the redis_platform:users:{instance_id} hash and reports whether anything was actually removed. Returns False without touching Redis when there is no connection. Invoked by the delete-user route in web/redis_platform_api.py.

Parameters:
  • instance_id (str) – The instance to remove the user from.

  • user_id (str) – The user id (hash field) to delete.

Returns:

True when a field was deleted, False when the user was already absent or Redis is unavailable.

Return type:

bool

async user_exists(instance_id, user_id)

Report whether a specific user is on an instance’s roster.

Uses an HEXISTS on the redis_platform:users:{instance_id} hash so it can answer without deserializing the record, returning False when there is no Redis connection. No in-repo callers were found, so this is a convenience predicate available to API or test code rather than part of the live request path.

Parameters:
  • instance_id (str) – The instance whose roster to check.

  • user_id (str) – The user id (hash field) to test for.

Returns:

True if the user is registered, False otherwise.

Return type:

bool

async has_users(instance_id)

Report whether an instance has a non-empty user roster.

Uses an HLEN on the redis_platform:users:{instance_id} hash to decide whether the instance enforces a closed roster, returning False when there is no Redis connection. Called by inject_message() here: when True it requires the sender to be a registered user and pulls their canonical display name; when False it accepts any sender that supplies a name.

Parameters:

instance_id (str) – The instance whose roster size to check.

Returns:

True when at least one user is registered, False otherwise.

Return type:

bool
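
The roster rule that inject_message() applies on top of has_users() can be illustrated with an in-memory roster. This is only a sketch: resolve_sender_name is a hypothetical helper, the roster is modelled as a dict of user id to record, and the exception types are assumptions, since the description above does not say how a rejected sender is reported.

```python
def resolve_sender_name(roster, user_id, user_name=""):
    """Return the display name to use for a sender under the roster rule."""
    if roster:
        # Closed roster: the sender must be registered, and the canonical
        # display name from the registry is used.
        record = roster.get(user_id)
        if record is None:
            raise PermissionError(f"user {user_id!r} is not registered")
        return record["display_name"]
    # Open instance: any sender is accepted as long as a name is supplied.
    if not user_name:
        raise ValueError("user_name is required when no roster is configured")
    return user_name
```

Note that on a closed roster the caller-supplied user_name is ignored in favour of the registered one.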

async delete_instance(instance_id)

Delete an instance and purge its associated Redis state.

In a single pipeline this removes the registry membership and redis_platform:meta:{instance_id} hash, the per-channel message-batching toggles (message_batching_{disabled,enabled,batch_window}), the channel_goals and channel_recall_zset channel state, the MessageCache channel_msgs:redis:{instance_id} sorted set, any lingering redis_platform:wait_lock entry, and the redis_platform:users roster hash. Only Redis-backed state is touched here; any per-instance state elsewhere in the pipeline is the caller’s responsibility. With no Redis connection it cleans nothing and flags a no_redis warning. Invoked by the delete-instance route in web/redis_platform_api.py.

Parameters:

instance_id (str) – The instance to delete.

Returns:

A summary with the instance_id and a cleaned list of the state categories removed; includes a warning of no_redis when there was no connection.

Return type:

dict[str, Any]

async inject_message(instance_id, text, user_id, user_name='', *, is_addressed=True, attachments=None, channel_name='', message_id='', reply_to_id='', extra=None, reactions='', response_nonce='')

Inject a message into the pipeline for instance_id.

Parameters:
  • instance_id (str) – Must already exist in the instance registry.

  • text (str) – Message body.

  • user_id (str) – Caller-supplied sender identifier (used for KG, threadweave, privilege checks).

  • user_name (str) – Display name shown in conversation history and system prompt.

  • is_addressed (bool) – Defaults to True — must be True for the message to bypass the proactive triage gate and guarantee a response.

  • response_nonce (str) – Forwarded in msg.extra["_response_nonce"] so the turn-complete signal can be correlated with a specific request.

  • attachments (list[Attachment] | None)

  • channel_name (str)

  • message_id (str)

  • reply_to_id (str)

  • extra (dict[str, Any] | None)

  • reactions (str)

Returns:

The message_id assigned to this message.

Return type:

str
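
How the nonce might be folded into the message's extra mapping is sketched below. Only the "_response_nonce" key comes from the parameter description; the helper name prepare_injection, the copy-before-merge behaviour, and generating a UUID-based message_id when none is supplied are assumptions of this sketch.

```python
import uuid


def prepare_injection(message_id="", extra=None, response_nonce=""):
    """Return (message_id, extra) ready to attach to the injected message."""
    merged = dict(extra or {})  # copy so the caller's dict is not mutated
    if response_nonce:
        merged["_response_nonce"] = response_nonce
    # Assumed fallback: mint an id when the caller did not provide one.
    return message_id or uuid.uuid4().hex, merged
```

A caller that waits for a synchronous reply would generate the nonce, pass it in, and later match it against the turn-complete signal.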

async is_channel_valid(channel_id)

Report whether a channel id maps to a live Redis instance.

Implements the abstract PlatformAdapter.is_channel_valid() check used to drop work for stale or unknown channels. Tests membership of channel_id (the instance id) in the redis_platform:instances set, routing the call through _safe_publish() so a transient Redis blip retries rather than spuriously invalidating the channel; any error after retries, or no Redis connection, yields False. For the Redis platform this is reached through core.outbound_consumer (the in-Gateway RPC worker) on behalf of the inference worker’s core.proxy_adapter.ProxyPlatformAdapter.

Parameters:

channel_id (str) – The channel/instance id to validate.

Returns:

True only when the instance is in the active set; False on absence, error, or missing connection.

Return type:

bool
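
The retry-then-fail-closed behaviour can be sketched generically. The internals of _safe_publish() are not documented here, so call_with_retries below is a hypothetical stand-in with an assumed attempt count and delay, and check is any coroutine function that performs the membership test.

```python
import asyncio


async def call_with_retries(op, attempts=3, delay=0.0):
    """Await op(), retrying on any exception up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return await op()
        except Exception:
            if attempt == attempts:
                raise  # out of retries: let the caller decide
            await asyncio.sleep(delay)


async def channel_is_valid(check):
    """True only when the check succeeds and is truthy; False on persistent error."""
    try:
        return bool(await call_with_retries(check))
    except Exception:
        return False
```

A transient failure is absorbed by the retry loop, while a persistent one is reported as an invalid channel instead of propagating.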

async send(channel_id, text)

Publish a text message to the instance’s response Pub/Sub channel.

Implements the abstract PlatformAdapter.send(). Mints a message id and, when connected, publishes a JSON message envelope (id, text, timestamp) to the stargazer:redis_platform:response:{channel_id} Pub/Sub channel through _safe_publish(), so any ?wait=true caller subscribed in web/redis_platform_api.py collects it synchronously. With no Redis connection the id is still returned but nothing is published. For the Redis platform this is dispatched by core.outbound_consumer (the in-Gateway RPC worker) when the pipeline’s generate_and_send and command_router emit text output — tool status lines, final LLM responses, errors, and command results.

Parameters:
  • channel_id (str) – The instance id, used to address the Pub/Sub channel.

  • text (str) – The message body to deliver.

Returns:

The generated message id for this outbound message.

Return type:

str
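
A sketch of the envelope and channel naming follows. The channel pattern and the three envelope fields (id, text, timestamp) come from the description above; the UUID-based id, the epoch-seconds timestamp, and the helper names are assumptions.

```python
import json
import time
import uuid


def response_channel(channel_id):
    """Pub/Sub channel that ?wait=true callers subscribe to."""
    return f"stargazer:redis_platform:response:{channel_id}"


def build_text_envelope(text):
    """Return (message_id, json_payload) for an outbound text message."""
    message_id = uuid.uuid4().hex  # id format is assumed
    payload = json.dumps(
        {"id": message_id, "text": text, "timestamp": time.time()}
    )
    return message_id, payload
```

With a live client the payload would be published to response_channel(channel_id); without one, only the id is returned.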

async send_file(channel_id, data, filename, mimetype='application/octet-stream')

Store a file in Redis with a 24h TTL and publish retrieval metadata.

Implements the abstract PlatformAdapter.send_file(). When connected, it HSETs the base64-encoded payload plus filename, mimetype, size, timestamp, and channel id into redis_platform:file:{file_id}, sets a 24-hour expiry, and publishes a lightweight JSON file envelope (id, filename, mimetype, size, download url, timestamp) to the stargazer:redis_platform:response:{channel_id} Pub/Sub channel — all in one pipeline run through _safe_publish(). Only metadata, never the bytes, crosses Pub/Sub; the file itself is fetched later via GET /redis-platform/files/{file_id}/{filename} in web/redis_platform_api.py. For the Redis platform this is dispatched by core.outbound_consumer when the pipeline sends an attachment. The download url is always returned, even when no Redis connection means nothing was stored.

Parameters:
  • channel_id (str) – The instance id used to address the Pub/Sub channel and tag the stored file.

  • data (bytes) – The raw file contents to store.

  • filename (str) – The file’s name, used in the download url and metadata.

  • mimetype (str) – The content type; defaults to application/octet-stream.

Returns:

The relative download url for the stored file.

Return type:

str | None
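
The storage side of send_file() can be sketched without Redis by building the hash fields and the download URL. The key pattern, the 24-hour TTL, and the URL shape come from the description above; the field names, the stringified size and timestamp, the UUID-based file id, and percent-encoding the filename are assumptions of this sketch.

```python
import base64
import time
import uuid
from urllib.parse import quote

FILE_TTL_SECONDS = 24 * 60 * 60  # 24h expiry applied to the stored hash


def build_file_entry(channel_id, data, filename, mimetype="application/octet-stream"):
    """Return (redis_key, hash_fields, download_url) for an outbound file."""
    file_id = uuid.uuid4().hex  # id format is assumed
    fields = {
        "data": base64.b64encode(data).decode("ascii"),
        "filename": filename,
        "mimetype": mimetype,
        "size": str(len(data)),
        "timestamp": str(time.time()),
        "channel_id": channel_id,
    }
    # Escaping the filename is a choice made for this sketch.
    url = f"/redis-platform/files/{file_id}/{quote(filename)}"
    return f"redis_platform:file:{file_id}", fields, url
```

Only the URL and the small metadata fields would go into the Pub/Sub envelope; the base64 payload stays in the hash until it expires or is downloaded.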

async fetch_history(channel_id, limit=100)

Return no backfill history — Redis instances have no external source.

Implements the abstract PlatformAdapter.fetch_history(). Unlike Discord or Matrix, a Redis instance has no upstream account to backfill from: every message it has ever seen was written by the pipeline and already lives in the MessageCache, which serves history directly to web/redis_platform_api.py. So this unconditionally returns an empty list, telling the backfill path there is nothing to import. Reached for the Redis platform through core.outbound_consumer (the in-Gateway RPC worker) on behalf of callers like background_tasks and message_processor.history_backfill.

Parameters:
  • channel_id (str) – The instance id requesting history. Ignored.

  • limit (int) – Maximum messages the caller would accept. Ignored.

Returns:

Always an empty list.

Return type:

list[HistoricalMessage]