platforms.redis module
Redis Platform Adapter.
Provides a fully Redis-backed platform for the Stargazer v3 pipeline.
External callers interact with instances via the REST API defined in
web/redis_platform_api.py; this adapter bridges those API calls into
the standard MessageProcessor pipeline.
Architecture:
One adapter instance is created per Gateway boot by
platforms.factory.create_platformand runs inside the Gateway service.Each “instance” is a logical shard identified by an
instance_id(which maps tochannel_idinside the pipeline).Per-instance API keys authenticate callers; the adapter manages these in a Redis hash
redis_platform:meta:{instance_id}.Outbound
send()/send_file()calls publish to a Pub/Sub channel so?wait=truecallers can collect synchronous responses.Files are stored in Redis with a 24h TTL and served via a dedicated download endpoint.
- class platforms.redis.RedisPlatformAdapter(message_handler, config=None, **kwargs)[source]
Bases:
PlatformAdapterPlatform adapter whose entire state lives in Redis.
One adapter instance manages all Redis platform instances — each logical conversation is identified by
instance_id, which the pipeline treats as thechannel_id.- Parameters:
message_handler (
Callable[[IncomingMessage,PlatformAdapter],Awaitable[None]]) – Callback wired by the Gateway service to its inbound handler, which publishes the message onto thesg:stream:inboundRedis stream for the Inference service to pick up. Every injected message is forwarded here for pipeline processing.config (
Any|None) – Bot configuration object. Used to build the Redis connection. WhenNone, the adapter starts without a Redis connection (safe for unit tests with mocked state).kwargs (Any)
- __init__(message_handler, config=None, **kwargs)[source]
Construct the Redis-backed platform adapter.
Stores the supplied
message_handler(via thePlatformAdapterbase) andconfigwithout opening any connection; the actual Redis client is created lazily instart(). Until thenself._redisisNoneandself._runningisFalse. Extra keyword arguments are accepted and ignored soplatforms.factory.create_platformcan pass adapter options uniformly across platform types.- Parameters:
message_handler (
Callable[[IncomingMessage,PlatformAdapter],Awaitable[None]]) – Async callback wired by the Gateway service that forwards each injectedIncomingMessageonto thesg:stream:inboundRedis stream for the Inference service.config (
Any|None) – Bot configuration used to build the Redis connection instart(). WhenNone, the adapter runs without Redis and registry operations no-op (used by tests).**kwargs (
Any) – Additional adapter options, accepted and ignored for factory call-site uniformity.
- Return type:
None
- async start()[source]
Connect to Redis and mark the adapter as running.
Lazily builds the async Redis client the rest of this adapter relies on, preferring the config’s
build_async_redis_client(Sentinel-aware, failover-resilient, tracks master re-election) and falling back to a bareaioredis.from_urlwith the config’s SSL kwargs when that helper is absent. When no Redis URL or Sentinel set is configured the connection is skipped and a warning is logged, leaving every registry operation to no-op; in all casesself._runningis flipped toTrue. Invoked once per Gateway boot afterplatforms.factory.create_platformbuilds this adapter, as part of platform lifecycle startup.- Raises:
redis.exceptions.RedisError – Propagated if the underlying client construction surfaces a connection error eagerly.
- Return type:
- async stop()[source]
Gracefully disconnect and release Redis resources.
Clears
self._runningand, if a client was opened instart(), closes it viaaclose(swallowing and debug-logging any teardown error) before dropping the reference so a later restart reconnects cleanly. Invoked during platform lifecycle shutdown, the mirror ofstart(), when the Gateway service is stopping.- Return type:
- property name: str
Return the platform’s short identifier.
Implements the abstract
PlatformAdapter.nameproperty. The constant"redis"is used as the adapter’s key when the pipeline builds itsadapters_by_namemaps inmessage_processorandgateway_main, and to branch platform-specific behaviour elsewhere.- Returns:
The literal
"redis".- Return type:
- property is_running: bool
Report whether the adapter has been started and not stopped.
Implements the abstract
PlatformAdapter.is_runningproperty. Reflects theself._runningflag set instart()and cleared instop(). Read bybackground_tasks(which waits for adapters to come up before backfill),gateway_main,web/deps,web/platforms_api, andweb/bot_adminfor status reporting.- Returns:
Truewhile the adapter is running,Falseotherwise.- Return type:
- property bot_identity: dict[str, str]
Describe this bot’s identity on the Redis platform.
Implements the abstract
PlatformAdapter.bot_identityproperty. Unlike Discord or Matrix, the Redis platform has no live account to query, so a fixed identity for “Star” is returned. Read across adapters byprompt_context(which gathersbot_identityfor every adapter),message_processor.user_message_format,message_processor.history_backfill, andbackground_tasksto label and attribute the bot’s own messages.
- async should_skip_channel_heartbeat(channel_id)[source]
Always skip the proactive channel heartbeat for Redis instances.
Redis platform instances are driven entirely by explicit API calls (
inject_message), so the bot never needs to wake them with unsolicited background heartbeat messages the way it does for live chat channels; this unconditionally returnsTrue. Consulted bymessage_processor.channel_heartbeatbefore it would otherwise emit a heartbeat, and reached for the Redis platform throughcore.outbound_consumer(the in-Gateway RPC worker that resolves the inference worker’score.proxy_adapter.ProxyPlatformAdaptercalls).
- async create_instance(instance_id=None, display_name='', api_key=None, metadata=None)[source]
Create a new Redis platform instance and persist its registry record.
Mints an
instance_id(random 16-hex when not supplied) and an API key (asecrets.token_urlsafetoken when not supplied), then writes both — plus the display name, a UTCcreated_atstamp, and any flattened caller metadata — into Redis: the id is added to theredis_platform:instancesset and the metadata into theredis_platform:meta:{instance_id}hash, in a single pipeline. With no Redis connection the call still returns a fresh id/key pair but persists nothing. The plaintext key is returned only here and never retrievable afterward, since registry reads redact it. Invoked by the create-instance route inweb/redis_platform_api.py.- Parameters:
instance_id (
str|None) – Desired instance id; a random hex id is generated whenNone.display_name (
str) – Human-friendly label; defaults to the id itself when empty.api_key (
str|None) – Pre-chosen key; a secure token is generated whenNone.metadata (
dict[str,Any] |None) – Extra fields stored as top-level hash fields; keys and values are stringified, so callers should supply string-coercible values.
- Returns:
A mapping with
instance_idand the one-time plaintextapi_key.- Return type:
- async validate_api_key(instance_id, api_key)[source]
Validate a per-instance API key using constant-time comparison.
Reads the stored key from the
redis_platform:meta:{instance_id}hash and compares it against the presented key withsecrets.compare_digestto avoid leaking timing information. ReturnsFalserather than raising when there is no Redis connection, the instance does not exist, or no key is stored, so callers can map every failure to a uniform 401/403. Invoked by the bearer-auth dependency inweb/redis_platform_api.pythat guards the per-instance routes.
- async instance_exists(instance_id)[source]
Report whether an instance is registered.
Checks membership of
instance_idin theredis_platform:instancesset, returningFalsewhen there is no Redis connection. Used as a precondition guard before instance-scoped work: byinject_message()andupdate_instance_meta()here, and by the user and inject routes inweb/redis_platform_api.pythat 404 on a missing instance.
- async list_instances()[source]
Return redacted metadata for every registered instance.
Reads the
redis_platform:instancesset, then fetches eachredis_platform:meta:{instance_id}hash in id-sorted order, stripping theapi_keyfield and stamping in theinstance_idso the result is safe to expose. Returns an empty list when there is no Redis connection. Invoked by the list-instances route inweb/redis_platform_api.py.
- async get_instance_meta(instance_id)[source]
Return one instance’s redacted metadata, or
Noneif absent.Reads the
redis_platform:meta:{instance_id}hash, drops theapi_keyfield, and stamps ininstance_idbefore returning. YieldsNonewhen there is no Redis connection or the hash is empty (i.e. the instance is unknown). Invoked by the get-instance and related routes inweb/redis_platform_api.py.
- async update_instance_meta(instance_id, display_name=None, metadata=None)[source]
Update mutable fields on an instance’s metadata hash.
Verifies the instance exists, then HSETs the changed fields into
redis_platform:meta:{instance_id}— the display name when supplied and any caller metadata, with keys and values stringified — while leaving the API key and creation stamp untouched. A no-op write is skipped when nothing changed. ReturnsFalsewithout writing when there is no Redis connection or the instance is unknown. Invoked by the update-instance route inweb/redis_platform_api.py.- Parameters:
- Returns:
Truewhen the instance exists (whether or not any field actually changed);Falsewhen it is missing or Redis is absent.- Return type:
- async register_user(instance_id, user_id, display_name, metadata=None)[source]
Register (or overwrite) a user in an instance’s user registry.
Builds a user record — id, display name, arbitrary metadata, and a UTC
registered_atstamp — and HSETs it as a JSON value under the fielduser_idin theredis_platform:users:{instance_id}hash, replacing any prior entry for that user. With no Redis connection the record is built and returned but not persisted. Once any user is registered the instance becomes a closed roster thatinject_message()enforces. Invoked by the register-user route inweb/redis_platform_api.py.- Parameters:
instance_id (
str) – The instance to register the user under.user_id (
str) – Stable identifier used as the hash field and for pipeline attribution.display_name (
str) – Human-friendly name surfaced in history and the system prompt.metadata (
dict[str,Any] |None) – Optional extra attributes stored verbatim inside the JSON record.
- Returns:
The stored user record (id, display name, metadata, and
registered_at).- Return type:
- async list_users(instance_id)[source]
Return every registered user for an instance.
Reads the whole
redis_platform:users:{instance_id}hash and decodes each JSON value into a record, skipping (and warning about) any value that fails to deserialize so one corrupt entry cannot break the listing. Returns an empty list when there is no Redis connection. Invoked by the list-users route inweb/redis_platform_api.py.
- async get_user(instance_id, user_id)[source]
Return one registered user record, or
Noneif absent.HGETs the
user_idfield from theredis_platform:users:{instance_id}hash and JSON-decodes it, treating a missing field or any decode failure as “not found” by returningNone. Also returnsNonewhen there is no Redis connection. Called by the get-user route inweb/redis_platform_api.pyand byinject_message()here to resolve the canonical display name for a roster-enforced instance.
- async delete_user(instance_id, user_id)[source]
Remove a user from an instance’s roster.
HDELs the
user_idfield from theredis_platform:users:{instance_id}hash and reports whether anything was actually removed. ReturnsFalsewithout touching Redis when there is no connection. Invoked by the delete-user route inweb/redis_platform_api.py.
- async user_exists(instance_id, user_id)[source]
Report whether a specific user is on an instance’s roster.
Uses an
HEXISTSon theredis_platform:users:{instance_id}hash so it can answer without deserializing the record, returningFalsewhen there is no Redis connection. No in-repo callers were found, so this is a convenience predicate available to API or test code rather than part of the live request path.
- async has_users(instance_id)[source]
Report whether an instance has a non-empty user roster.
Uses an
HLENon theredis_platform:users:{instance_id}hash to decide whether the instance enforces a closed roster, returningFalsewhen there is no Redis connection. Called byinject_message()here: whenTrueit requires the sender to be a registered user and pulls their canonical display name; whenFalseit accepts any sender that supplies a name.
- async delete_instance(instance_id)[source]
Delete an instance and purge its associated Redis state.
In a single pipeline this removes the registry membership and
redis_platform:meta:{instance_id}hash, the per-channel message- batching toggles (message_batching_{disabled,enabled,batch_window}), thechannel_goalsandchannel_recall_zsetchannel state, the MessageCachechannel_msgs:redis:{instance_id}sorted set, any lingeringredis_platform:wait_lockentry, and theredis_platform:usersroster hash. Only Redis-backed state is touched here; any per-instance state elsewhere in the pipeline is the caller’s responsibility. With no Redis connection it cleans nothing and flags ano_rediswarning. Invoked by the delete-instance route inweb/redis_platform_api.py.
- async inject_message(instance_id, text, user_id, user_name='', *, is_addressed=True, attachments=None, channel_name='', message_id='', reply_to_id='', extra=None, reactions='', response_nonce='')[source]
Inject a message into the pipeline for instance_id.
- Parameters:
instance_id (
str) – Must already exist in the instance registry.text (
str) – Message body.user_id (
str) – Caller-supplied sender identifier (used for KG, threadweave, privilege checks).user_name (
str) – Display name shown in conversation history and system prompt.is_addressed (
bool) – Defaults toTrue— must beTruefor the message to bypass the proactive triage gate and guarantee a response.response_nonce (
str) – Forwarded inmsg.extra["_response_nonce"]so the turn- complete signal can be correlated with a specific request.attachments (list[Attachment] | None)
channel_name (str)
message_id (str)
reply_to_id (str)
reactions (str)
- Returns:
The
message_idassigned to this message.- Return type:
- async is_channel_valid(channel_id)[source]
Report whether a channel id maps to a live Redis instance.
Implements the abstract
PlatformAdapter.is_channel_valid()check used to drop work for stale or unknown channels. Tests membership ofchannel_id(the instance id) in theredis_platform:instancesset, routing the call through_safe_publish()so a transient Redis blip retries rather than spuriously invalidating the channel; any error after retries, or no Redis connection, yieldsFalse. For the Redis platform this is reached throughcore.outbound_consumer(the in-Gateway RPC worker) on behalf of the inference worker’score.proxy_adapter.ProxyPlatformAdapter.
- async send(channel_id, text)[source]
Publish a text message to the instance’s response Pub/Sub channel.
Implements the abstract
PlatformAdapter.send(). Mints a message id and, when connected, publishes a JSONmessageenvelope (id, text, timestamp) to thestargazer:redis_platform:response:{channel_id}Pub/Sub channel through_safe_publish(), so any?wait=truecaller subscribed inweb/redis_platform_api.pycollects it synchronously. With no Redis connection the id is still returned but nothing is published. For the Redis platform this is dispatched bycore.outbound_consumer(the in-Gateway RPC worker) when the pipeline’sgenerate_and_sendandcommand_routeremit text output — tool status lines, final LLM responses, errors, and command results.
- async send_file(channel_id, data, filename, mimetype='application/octet-stream')[source]
Store a file in Redis with a 24h TTL and publish retrieval metadata.
Implements the abstract
PlatformAdapter.send_file(). When connected, it HSETs the base64-encoded payload plus filename, mimetype, size, timestamp, and channel id intoredis_platform:file:{file_id}, sets a 24-hour expiry, and publishes a lightweight JSONfileenvelope (id, filename, mimetype, size, download url, timestamp) to thestargazer:redis_platform:response:{channel_id}Pub/Sub channel — all in one pipeline run through_safe_publish(). Only metadata, never the bytes, crosses Pub/Sub; the file itself is fetched later viaGET /redis-platform/files/{file_id}/{filename}inweb/redis_platform_api.py. For the Redis platform this is dispatched bycore.outbound_consumerwhen the pipeline sends an attachment. The download url is always returned, even when no Redis connection means nothing was stored.- Parameters:
- Returns:
The relative download url for the stored file.
- Return type:
- async fetch_history(channel_id, limit=100)[source]
Return no backfill history — Redis instances have no external source.
Implements the abstract
PlatformAdapter.fetch_history(). Unlike Discord or Matrix, a Redis instance has no upstream account to backfill from: every message it has ever seen was written by the pipeline and already lives in the MessageCache, which serves history directly toweb/redis_platform_api.py. So this unconditionally returns an empty list, telling the backfill path there is nothing to import. Reached for the Redis platform throughcore.outbound_consumer(the in-Gateway RPC worker) on behalf of callers likebackground_tasksandmessage_processor.history_backfill.- Parameters:
- Returns:
Always an empty list.
- Return type: