Source code for platforms.redis
"""Redis Platform Adapter.
Provides a fully Redis-backed platform for the Stargazer v3 pipeline.
External callers interact with instances via the REST API defined in
``web/redis_platform_api.py``; this adapter bridges those API calls into
the standard :class:`~message_processor.MessageProcessor` pipeline.
Architecture:
- One adapter instance is created per Gateway boot by
``platforms.factory.create_platform`` and runs inside the Gateway
service.
- Each "instance" is a logical shard identified by an ``instance_id``
(which maps to ``channel_id`` inside the pipeline).
- Per-instance API keys authenticate callers; the adapter manages these
in a Redis hash ``redis_platform:meta:{instance_id}``.
- Outbound ``send()`` / ``send_file()`` calls publish to a Pub/Sub
channel so ``?wait=true`` callers can collect synchronous responses.
- Files are stored in Redis with a 24h TTL and served via a dedicated
download endpoint.
"""
from __future__ import annotations
import base64
import jsonutil as json
import logging
import secrets
import time
import uuid
from datetime import datetime, timezone
from typing import Any
import redis.asyncio as aioredis
from platforms.base import (
Attachment,
HistoricalMessage,
IncomingMessage,
MessageHandler,
PlatformAdapter,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Redis Pub/Sub channel prefix for outbound response events; the target
# instance id is appended as ``{prefix}:{channel_id}``.
_PUBSUB_PREFIX = "stargazer:redis_platform:response"
# Redis key namespaces
# Set holding the id of every registered instance.
_INSTANCES_SET = "redis_platform:instances"
# Prefix of the per-instance metadata hash (``{prefix}:{instance_id}``).
_META_PREFIX = "redis_platform:meta"
# Prefix of the per-file hash written by ``send_file`` (``{prefix}:{file_id}``).
_FILE_PREFIX = "redis_platform:file"
[docs]
class RedisPlatformAdapter(PlatformAdapter):
"""Platform adapter whose entire state lives in Redis.
One adapter instance manages **all** Redis platform instances — each
logical conversation is identified by ``instance_id``, which the
pipeline treats as the ``channel_id``.
Parameters
----------
message_handler:
Callback wired by the Gateway service to its inbound handler,
which publishes the message onto the ``sg:stream:inbound`` Redis
stream for the Inference service to pick up. Every injected
message is forwarded here for pipeline processing.
config:
Bot configuration object. Used to build the Redis connection.
When ``None``, the adapter starts without a Redis connection (safe
for unit tests with mocked state).
"""
[docs]
def __init__(
    self,
    message_handler: MessageHandler,
    config: Any | None = None,
    **kwargs: Any,
) -> None:
    """Set up the adapter without touching Redis.

    The handler is passed to the :class:`PlatformAdapter` base class and the
    configuration is only remembered; no client is built until
    :meth:`start` runs. After construction ``self._redis`` is ``None`` and
    ``self._running`` is ``False``.

    Args:
        message_handler (MessageHandler): Callback that receives every
            injected :class:`~platforms.base.IncomingMessage`.
        config (Any | None): Bot configuration consulted by :meth:`start`
            to build the Redis client. With ``None`` the adapter never
            connects and the Redis-backed operations fall back to their
            no-connection results.
        **kwargs (Any): Accepted and ignored, so a factory can pass the
            same options to every platform type.
    """
    super().__init__(message_handler)
    # Connection state first: nothing is opened until start().
    self._redis: aioredis.Redis | None = None
    self._running = False
    self._config = config
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
[docs]
async def start(self) -> None:
    """Build the Redis client (when configured) and mark the adapter running.

    A connection is attempted only when the config exposes a truthy
    ``redis_url`` or ``redis_sentinels``. The config's own
    ``build_async_redis_client`` helper is preferred when it exists;
    otherwise a plain ``aioredis.from_url`` client is created with the
    keyword arguments returned by ``redis_connection_kwargs_for_url``.
    Without any Redis settings a warning is logged and ``self._redis``
    stays ``None``. ``self._running`` becomes ``True`` in every case.

    Raises:
        Exception: Anything raised by the client factory propagates
            unchanged.
    """
    cfg = self._config
    configured = cfg is not None and (
        getattr(cfg, "redis_url", None)
        or getattr(cfg, "redis_sentinels", None)
    )
    if not configured:
        logger.warning(
            "Redis platform adapter started without a Redis URL — "
            "instance registry operations will no-op."
        )
    else:
        if hasattr(cfg, "build_async_redis_client"):
            # Let the config build the client so Sentinel/failover handling
            # stays in one place instead of pinning a single URL here.
            self._redis = cfg.build_async_redis_client(decode_responses=True)
        else:
            extra_kwargs = cfg.redis_connection_kwargs_for_url(cfg.redis_url)
            self._redis = aioredis.from_url(
                cfg.redis_url,
                decode_responses=True,
                **extra_kwargs,
            )
        uses_sentinels = bool(getattr(cfg, "redis_sentinels", None))
        shown_url = getattr(cfg, "redis_url", "") or "<sentinel>"
        logger.info(
            "Redis platform adapter connected (sentinels=%s, url=%s)",
            uses_sentinels,
            shown_url,
        )
    self._running = True
    logger.info("Redis platform adapter started")
[docs]
async def stop(self) -> None:
    """Mark the adapter stopped and close the Redis client, if any.

    ``self._running`` is cleared first. When a client exists it is closed
    with ``aclose``; any ordinary exception raised during teardown is
    logged at debug level and otherwise ignored, and the reference is then
    dropped so a later :meth:`start` builds a fresh client.
    """
    self._running = False
    client = self._redis
    if client is not None:
        try:
            await client.aclose()
        except Exception:
            # Best-effort teardown: shutdown must not fail on a dead socket.
            logger.debug(
                "Redis platform adapter: error during aclose", exc_info=True
            )
        self._redis = None
    logger.info("Redis platform adapter stopped")
# ------------------------------------------------------------------
# Metadata (PlatformAdapter abstract properties)
# ------------------------------------------------------------------
@property
def name(self) -> str:
    """Short identifier of this platform.

    Implements the abstract :attr:`PlatformAdapter.name` property.

    Returns:
        str: Always the literal ``"redis"``.
    """
    platform_key = "redis"
    return platform_key
@property
def is_running(self) -> bool:
    """Whether the adapter is currently started.

    Implements the abstract :attr:`PlatformAdapter.is_running` property by
    exposing the ``self._running`` flag, which :meth:`start` sets and
    :meth:`stop` clears.

    Returns:
        bool: The current value of ``self._running``.
    """
    running = self._running
    return running
@property
def bot_identity(self) -> dict[str, str]:
    """Fixed identity the bot uses on the Redis platform.

    Implements the abstract :attr:`PlatformAdapter.bot_identity` property.
    Nothing is looked up: the same constant mapping is built on every
    access.

    Returns:
        dict[str, str]: A new dict with the keys ``platform``, ``user_id``,
        ``display_name`` and ``mention``.
    """
    return dict(
        platform="redis",
        user_id="star",
        display_name="Star",
        mention="@Star",
    )
[docs]
async def should_skip_channel_heartbeat(self, channel_id: str) -> bool:
    """Tell the caller to skip the channel heartbeat for Redis instances.

    The answer does not depend on the instance: this method returns
    ``True`` unconditionally and never touches Redis.

    Args:
        channel_id (str): The instance id being asked about. Unused.

    Returns:
        bool: Always ``True``.
    """
    skip_heartbeat = True
    return skip_heartbeat
# ------------------------------------------------------------------
# Instance registry
# ------------------------------------------------------------------
[docs]
async def create_instance(
    self,
    instance_id: str | None = None,
    display_name: str = "",
    api_key: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> dict[str, str]:
    """Create a platform instance and persist its registry record.

    Picks an instance id (a random 16-character hex string when none is
    given) and an API key (a ``secrets.token_urlsafe(48)`` token when none
    is given). When a Redis client is available, one pipeline adds the id
    to the instances set and writes the metadata hash
    ``{_META_PREFIX}:{instance_id}`` with ``api_key``, ``display_name``,
    ``created_at`` (UTC, ISO 8601) and any caller metadata as extra
    top-level fields. Without a client nothing is stored, but an id/key
    pair is still returned.

    Args:
        instance_id (str | None): Desired id; generated when falsy.
        display_name (str): Human-friendly label; the id is used when empty.
        api_key (str | None): Pre-chosen key. ``None`` and ``""`` both
            produce a generated key, because an empty stored key can never
            pass :meth:`validate_api_key`.
        metadata (dict[str, Any] | None): Extra hash fields; keys and values
            are stringified. The adapter-owned fields ``api_key`` and
            ``created_at`` cannot be set this way and are skipped with a
            warning.

    Returns:
        dict[str, str]: ``instance_id`` and the plaintext ``api_key``.
    """
    # Fields the adapter owns; letting metadata overwrite them would make
    # the stored key differ from the one handed back to the caller.
    protected_fields = ("api_key", "created_at")

    iid = instance_id or uuid.uuid4().hex[:16]
    key = api_key or secrets.token_urlsafe(48)
    if self._redis is not None:
        meta: dict[str, str] = {
            "api_key": key,
            "display_name": display_name or iid,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        for raw_field, raw_value in (metadata or {}).items():
            field = str(raw_field)
            if field in protected_fields:
                logger.warning(
                    "Redis platform: ignoring reserved metadata field %r "
                    "for instance %s",
                    field,
                    iid,
                )
                continue
            # Redis hash values are flat strings, so coerce everything.
            meta[field] = str(raw_value)
        pipe = self._redis.pipeline()
        pipe.sadd(_INSTANCES_SET, iid)
        pipe.hset(f"{_META_PREFIX}:{iid}", mapping=meta)
        await pipe.execute()
        # Logged only once the record has actually been written.
        logger.info("Redis platform: created instance %s", iid)
    return {"instance_id": iid, "api_key": key}
[docs]
async def validate_api_key(self, instance_id: str, api_key: str) -> bool:
    """Check a presented API key against the one stored for an instance.

    Reads the ``api_key`` field of the ``{_META_PREFIX}:{instance_id}``
    hash and compares it with ``secrets.compare_digest``. Both sides are
    compared as UTF-8 bytes: ``compare_digest`` raises ``TypeError`` for
    ``str`` arguments containing non-ASCII characters, and the presented
    key is caller-controlled, so a string comparison would raise on a
    malformed token instead of rejecting it.

    Args:
        instance_id (str): The instance whose stored key to check.
        api_key (str): The candidate key presented by the caller.

    Returns:
        bool: ``True`` only when a key is stored and matches. ``False``
        when there is no Redis client, the presented key is empty or not a
        ``str``/``bytes``, no key is stored, or the keys differ.
    """
    if self._redis is None:
        return False
    if not api_key or not isinstance(api_key, (str, bytes)):
        return False
    stored = await self._redis.hget(
        f"{_META_PREFIX}:{instance_id}",
        "api_key",
    )
    if not stored:
        return False
    # Normalise both operands to bytes; ``stored`` is already bytes when the
    # client was built without ``decode_responses``.
    stored_bytes = (
        stored
        if isinstance(stored, bytes)
        else str(stored).encode("utf-8", "surrogatepass")
    )
    presented_bytes = (
        api_key
        if isinstance(api_key, bytes)
        else api_key.encode("utf-8", "surrogatepass")
    )
    return secrets.compare_digest(stored_bytes, presented_bytes)
[docs]
async def instance_exists(self, instance_id: str) -> bool:
    """Check whether an instance id is in the registry set.

    Performs a ``SISMEMBER`` against the ``_INSTANCES_SET`` key.

    Args:
        instance_id (str): The candidate instance id.

    Returns:
        bool: ``True`` when the id is a member of the set; ``False`` when
        it is not or when there is no Redis client.
    """
    client = self._redis
    if client is None:
        return False
    is_member = await client.sismember(_INSTANCES_SET, instance_id)
    return bool(is_member)
[docs]
async def list_instances(self) -> list[dict[str, str]]:
    """Return key-redacted metadata for every registered instance.

    Reads the ids from the ``_INSTANCES_SET`` set, sorts them, and fetches
    every ``{_META_PREFIX}:{instance_id}`` hash in one pipeline instead of
    one round trip per instance. Each result has its ``api_key`` field
    removed and an ``instance_id`` field added. An id whose hash is empty
    still yields an entry containing only ``instance_id``.

    Returns:
        list[dict[str, str]]: One dict per instance, sorted by id, never
        containing the API key. Empty when there is no Redis client or no
        instance is registered.
    """
    if self._redis is None:
        return []
    ordered_ids = sorted(await self._redis.smembers(_INSTANCES_SET))
    if not ordered_ids:
        return []
    # Queue every HGETALL and send them together; replies come back in the
    # order the commands were queued, i.e. aligned with ``ordered_ids``.
    pipe = self._redis.pipeline()
    for iid in ordered_ids:
        pipe.hgetall(f"{_META_PREFIX}:{iid}")
    hashes = await pipe.execute()
    result: list[dict[str, str]] = []
    for iid, stored in zip(ordered_ids, hashes):
        meta = dict(stored or {})
        meta.pop("api_key", None)  # Never expose key in list response
        meta["instance_id"] = iid
        result.append(meta)
    return result
[docs]
async def get_instance_meta(self, instance_id: str) -> dict[str, str] | None:
    """Fetch one instance's metadata with the API key removed.

    Reads the whole ``{_META_PREFIX}:{instance_id}`` hash. An empty hash is
    treated as "no such instance".

    Args:
        instance_id (str): The instance whose metadata to fetch.

    Returns:
        dict[str, str] | None: The stored fields without ``api_key`` and
        with ``instance_id`` added, or ``None`` when the hash is empty or
        there is no Redis client.
    """
    if self._redis is None:
        return None
    record = await self._redis.hgetall(f"{_META_PREFIX}:{instance_id}")
    if record:
        # Redact the secret before the record leaves the adapter.
        record.pop("api_key", None)
        record["instance_id"] = instance_id
        return record
    return None
[docs]
async def update_instance_meta(
    self,
    instance_id: str,
    display_name: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> bool:
    """Update the mutable fields of an instance's metadata hash.

    After confirming the instance exists, writes the new display name (when
    given) and the caller metadata (keys and values stringified) into
    ``{_META_PREFIX}:{instance_id}`` with a single ``HSET``. The
    adapter-owned fields ``api_key`` and ``created_at`` are never written:
    metadata entries with those names are skipped with a warning, so this
    call cannot replace an instance's key or rewrite its creation stamp.
    No write is issued when there is nothing to change.

    Args:
        instance_id (str): The instance to update.
        display_name (str | None): New display name; unchanged when ``None``.
        metadata (dict[str, Any] | None): Extra top-level hash fields to
            merge in. A ``display_name`` entry here takes precedence over
            the ``display_name`` argument, because metadata is applied last.

    Returns:
        bool: ``True`` when the instance exists (whether or not anything was
        written); ``False`` when it is unknown or there is no Redis client.
    """
    if self._redis is None or not await self.instance_exists(instance_id):
        return False
    protected_fields = ("api_key", "created_at")
    updates: dict[str, str] = {}
    if display_name is not None:
        updates["display_name"] = display_name
    for raw_field, raw_value in (metadata or {}).items():
        field = str(raw_field)
        if field in protected_fields:
            logger.warning(
                "Redis platform: ignoring reserved metadata field %r "
                "for instance %s",
                field,
                instance_id,
            )
            continue
        updates[field] = str(raw_value)
    if updates:
        await self._redis.hset(f"{_META_PREFIX}:{instance_id}", mapping=updates)
    return True
[docs]
async def register_user(
    self,
    instance_id: str,
    user_id: str,
    display_name: str,
    metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Add a user to an instance's roster, replacing any existing entry.

    Builds a record with ``user_id``, ``display_name``, ``metadata`` and a
    UTC ISO 8601 ``registered_at`` stamp. When a Redis client is available
    the record is JSON-encoded and stored under field ``user_id`` of the
    ``redis_platform:users:{instance_id}`` hash. Without a client the
    record is only returned.

    Args:
        instance_id (str): The instance to register the user under.
        user_id (str): Identifier used as the hash field.
        display_name (str): Human-friendly name stored in the record.
        metadata (dict[str, Any] | None): Extra attributes stored inside the
            record; an empty dict is stored when falsy.

    Returns:
        dict[str, Any]: The record that was built.
    """
    record: dict[str, Any] = dict(
        user_id=user_id,
        display_name=display_name,
        metadata=metadata or {},
        registered_at=datetime.now(timezone.utc).isoformat(),
    )
    if self._redis is None:
        return record
    roster_key = f"redis_platform:users:{instance_id}"
    await self._redis.hset(roster_key, user_id, json.dumps(record))
    return record
[docs]
async def list_users(self, instance_id: str) -> list[dict[str, Any]]:
    """Return every readable user record of an instance.

    Reads the whole ``redis_platform:users:{instance_id}`` hash and decodes
    each value as JSON. A value that fails to decode, or that decodes to
    something other than a JSON object, is skipped with a warning so that
    one bad entry cannot break the listing (a non-object value would
    otherwise crash the sort below).

    Args:
        instance_id (str): The instance whose roster to read.

    Returns:
        list[dict[str, Any]]: The decoded records sorted by the string form
        of their ``user_id``. Empty when there is no Redis client.
    """
    if self._redis is None:
        return []
    raw_users = await self._redis.hgetall(f"redis_platform:users:{instance_id}")
    users: list[dict[str, Any]] = []
    for field, raw_val in raw_users.items():
        try:
            record = json.loads(raw_val)
        except Exception:
            logger.warning("Failed to deserialize user JSON for user %s", field)
            continue
        if not isinstance(record, dict):
            logger.warning("Skipping non-object user record for user %s", field)
            continue
        users.append(record)
    # str() keeps the sort total even if a record carries a non-string id.
    return sorted(users, key=lambda rec: str(rec.get("user_id", "")))
[docs]
async def get_user(self, instance_id: str, user_id: str) -> dict[str, Any] | None:
    """Return one user record, or ``None`` when it is missing or unreadable.

    Reads field ``user_id`` of the ``redis_platform:users:{instance_id}``
    hash and decodes it as JSON. A value that fails to decode is logged and
    treated as "not found". A value that decodes to something other than a
    JSON object is also treated as "not found", so callers that index the
    result (such as :meth:`inject_message`) never receive a list or scalar.

    Args:
        instance_id (str): The instance to look in.
        user_id (str): The user id (hash field) to fetch.

    Returns:
        dict[str, Any] | None: The decoded record, or ``None`` when there is
        no Redis client, the field is absent or empty, or the value is not
        a readable JSON object.
    """
    if self._redis is None:
        return None
    raw_val = await self._redis.hget(f"redis_platform:users:{instance_id}", user_id)
    if not raw_val:
        return None
    try:
        record = json.loads(raw_val)
    except Exception:
        logger.warning("Failed to deserialize user JSON for user %s", user_id)
        return None
    return record if isinstance(record, dict) else None
[docs]
async def delete_user(self, instance_id: str, user_id: str) -> bool:
    """Remove a user from an instance's roster.

    Issues an ``HDEL`` for field ``user_id`` on the
    ``redis_platform:users:{instance_id}`` hash.

    Args:
        instance_id (str): The instance to remove the user from.
        user_id (str): The user id (hash field) to delete.

    Returns:
        bool: ``True`` when Redis reports a removed field; ``False`` when
        nothing was removed or there is no Redis client.
    """
    client = self._redis
    if client is None:
        return False
    roster_key = f"redis_platform:users:{instance_id}"
    removed_count = await client.hdel(roster_key, user_id)
    return bool(removed_count)
[docs]
async def user_exists(self, instance_id: str, user_id: str) -> bool:
    """Check whether a user id is present in an instance's roster.

    Uses ``HEXISTS`` on the ``redis_platform:users:{instance_id}`` hash, so
    the stored record is never decoded.

    Args:
        instance_id (str): The instance whose roster to check.
        user_id (str): The user id (hash field) to test for.

    Returns:
        bool: ``True`` when the field exists; ``False`` otherwise or when
        there is no Redis client.
    """
    client = self._redis
    if client is None:
        return False
    roster_key = f"redis_platform:users:{instance_id}"
    present = await client.hexists(roster_key, user_id)
    return bool(present)
[docs]
async def has_users(self, instance_id: str) -> bool:
    """Check whether an instance has at least one registered user.

    Uses ``HLEN`` on the ``redis_platform:users:{instance_id}`` hash.
    :meth:`inject_message` uses the answer to decide whether the sender
    must be a registered user.

    Args:
        instance_id (str): The instance whose roster size to check.

    Returns:
        bool: ``True`` when the hash has one or more fields; ``False``
        otherwise or when there is no Redis client.
    """
    client = self._redis
    if client is None:
        return False
    roster_size = await client.hlen(f"redis_platform:users:{instance_id}")
    return bool(roster_size > 0)
[docs]
async def delete_instance(self, instance_id: str) -> dict[str, Any]:
    """Delete an instance and the Redis keys associated with it.

    One pipeline removes the id from the ``_INSTANCES_SET`` set and deletes,
    in this order: the metadata hash, the three
    ``message_batching_{disabled,enabled,batch_window}`` keys, the
    ``channel_goals`` and ``channel_recall_zset`` keys, the
    ``channel_msgs:redis:{instance_id}`` key, the
    ``redis_platform:wait_lock:{instance_id}`` key, and the
    ``redis_platform:users:{instance_id}`` hash. Only Redis keys are
    touched.

    Args:
        instance_id (str): The instance to delete.

    Returns:
        dict[str, Any]: ``instance_id`` plus a ``cleaned`` list of category
        names. Without a Redis client ``cleaned`` is empty and a
        ``warning`` of ``"no_redis"`` is included.
    """
    if self._redis is None:
        return {
            "instance_id": instance_id,
            "cleaned": [],
            "warning": "no_redis",
        }
    # Channel-scoped keys elsewhere are namespaced as "redis:<instance_id>".
    channel_key = f"redis:{instance_id}"
    doomed_keys = [
        # 1. Metadata hash (registry membership is removed via SREM below)
        f"{_META_PREFIX}:{instance_id}",
        # 2. Message batching feature toggles
        *(
            f"message_batching_{suffix}:{channel_key}"
            for suffix in ("disabled", "enabled", "batch_window")
        ),
        # 3. Channel goals and semantic recall sorted sets
        f"channel_goals:{channel_key}",
        f"channel_recall_zset:{channel_key}",
        # 4. MessageCache channel zset
        f"channel_msgs:redis:{instance_id}",
        # 5. Any lingering wait lock
        f"redis_platform:wait_lock:{instance_id}",
        # 6. User registry
        f"redis_platform:users:{instance_id}",
    ]
    pipe = self._redis.pipeline()
    pipe.srem(_INSTANCES_SET, instance_id)
    for key in doomed_keys:
        pipe.delete(key)
    await pipe.execute()
    cleaned: list[str] = [
        "registry",
        "toggles",
        "channel_state",
        "message_cache_zset",
        "users",
    ]
    logger.info(
        "Redis platform: deleted instance %s (cleaned: %s)", instance_id, cleaned
    )
    return {"instance_id": instance_id, "cleaned": cleaned}
# ------------------------------------------------------------------
# Message injection
# ------------------------------------------------------------------
[docs]
async def inject_message(
    self,
    instance_id: str,
    text: str,
    user_id: str,
    user_name: str = "",
    *,
    is_addressed: bool = True,
    attachments: list[Attachment] | None = None,
    channel_name: str = "",
    message_id: str = "",
    reply_to_id: str = "",
    extra: dict[str, Any] | None = None,
    reactions: str = "",
    response_nonce: str = "",
) -> str:
    """Build an :class:`IncomingMessage` for *instance_id* and hand it off.

    With a Redis client, the instance must exist in the registry. If the
    instance has registered users, the sender must be one of them and the
    stored ``display_name`` replaces ``user_name``; otherwise a non-empty
    ``user_name`` is required. The message is then passed to the handler
    given at construction time, together with this adapter.

    Args:
        instance_id (str): Target instance; becomes the ``channel_id``.
        text (str): Message body.
        user_id (str): Caller-supplied sender identifier.
        user_name (str): Sender display name; overridden by the roster
            entry when the instance has registered users.
        is_addressed (bool): Forwarded unchanged on the message; defaults
            to ``True``.
        attachments (list[Attachment] | None): Attachments; an empty list is
            used when falsy.
        channel_name (str): Channel label; falls back to the instance's
            stored ``display_name`` when empty.
        message_id (str): Id to use; a random hex id is generated when empty.
        reply_to_id (str): Forwarded unchanged on the message.
        extra (dict[str, Any] | None): Extra payload; copied, then
            ``_response_nonce`` is added.
        reactions (str): Forwarded unchanged on the message.
        response_nonce (str): Stored in ``extra["_response_nonce"]``.

    Returns:
        str: The ``message_id`` assigned to this message.

    Raises:
        ValueError: If the instance does not exist, the sender is not on a
            non-empty roster, or no ``user_name`` is supplied for an
            instance without registered users.
    """
    # Guard: verify instance exists before building any state
    if self._redis is not None and not await self.instance_exists(instance_id):
        raise ValueError(
            f"Redis platform instance '{instance_id}' does not exist"
        )

    # User validation.
    # NOTE(review): the flattened listing does not show whether this check
    # was nested under the Redis guard above; it is kept at function level
    # because has_users()/get_user() already handle a missing client —
    # confirm against the original file.
    roster_enforced = await self.has_users(instance_id)
    if roster_enforced:
        registered = await self.get_user(instance_id, user_id)
        if not registered:
            raise ValueError(
                f"User '{user_id}' is not registered in instance '{instance_id}'"
            )
        user_name = registered["display_name"]
    elif not user_name:
        raise ValueError(
            f"No users are registered in instance '{instance_id}', "
            "so 'user_name' must be provided."
        )

    mid = message_id or uuid.uuid4().hex

    # Look up the display name for the channel_name fallback
    fallback_channel_name = ""
    if self._redis is not None:
        stored_name = await self._redis.hget(
            f"{_META_PREFIX}:{instance_id}",
            "display_name",
        )
        fallback_channel_name = stored_name or ""

    # The nonce is written last so it always wins over a caller-supplied key.
    payload_extra = dict(extra or {})
    payload_extra["_response_nonce"] = response_nonce

    msg = IncomingMessage(
        platform="redis",
        channel_id=instance_id,
        user_id=user_id,
        user_name=user_name,
        text=text,
        is_addressed=is_addressed,
        attachments=attachments or [],
        channel_name=channel_name or fallback_channel_name,
        message_id=mid,
        reply_to_id=reply_to_id,
        extra=payload_extra,
        reactions=reactions,
    )
    # Hand the message to the handler supplied at construction time.
    await self._message_handler(msg, self)
    return mid
async def _safe_publish(self, func: Any, *args: Any, **kwargs: Any) -> Any:
"""Run a Redis command with bounded retries and exponential backoff.
Awaits ``func(*args, **kwargs)`` and, on a transient connection or
timeout error, retries up to three attempts total with backoff doubling
from 0.5s; the final failure is logged at error level and re-raised, so
callers see an exception only after exhausting retries. This wraps the
adapter's outbound Redis calls — the publishes in :meth:`send`, the file
pipeline execute in :meth:`send_file`, and the ``SISMEMBER`` in
:meth:`is_channel_valid` — to ride out brief blips and Sentinel
failovers. Other (non-connection) exceptions propagate immediately.
Args:
func (Any): The awaitable Redis operation to invoke, e.g.
``self._redis.publish`` or a pipeline's ``execute``.
*args (Any): Positional arguments forwarded to ``func``.
**kwargs (Any): Keyword arguments forwarded to ``func``.
Returns:
Any: Whatever ``func`` returns on the first successful attempt.
Raises:
redis.exceptions.ConnectionError: Re-raised after the final attempt
still fails to connect.
redis.exceptions.TimeoutError: Re-raised after the final attempt
times out.
ConnectionError: Re-raised after the final attempt on a bare
connection error.
"""
import asyncio
max_retries = 3
backoff = 0.5
for attempt in range(1, max_retries + 1):
try:
return await func(*args, **kwargs)
except (aioredis.ConnectionError, aioredis.TimeoutError, ConnectionError) as exc:
func_name = getattr(func, "__name__", str(func))
if attempt == max_retries:
logger.error(
"Redis platform adapter: Permanent connection failure on command %s after %d attempts: %s",
func_name, attempt, exc
)
raise
logger.warning(
"Redis platform adapter: Transient connection error on command %s, retrying attempt %d/%d in %.1fs: %s",
func_name, attempt, max_retries, backoff, exc
)
await asyncio.sleep(backoff)
backoff *= 2
# ------------------------------------------------------------------
# Channel validation (PlatformAdapter abstract methods)
# ------------------------------------------------------------------
[docs]
async def is_channel_valid(self, channel_id: str) -> bool:
    """Report whether a channel id maps to a registered Redis instance.

    Implements the abstract :meth:`PlatformAdapter.is_channel_valid`. Tests
    membership of ``channel_id`` in the ``_INSTANCES_SET`` set, the same
    key :meth:`instance_exists` reads, via :meth:`_safe_publish` so a brief
    connection error is retried. If the lookup still fails, the failure is
    logged with its traceback and the channel is reported as invalid rather
    than raising.

    Args:
        channel_id (str): The channel/instance id to validate.

    Returns:
        bool: ``True`` only when the id is in the registry set; ``False``
        when it is absent, the lookup failed, or there is no Redis client.
    """
    if self._redis is None:
        return False
    try:
        is_member = await self._safe_publish(
            self._redis.sismember,
            _INSTANCES_SET,
            channel_id,
        )
    except Exception:
        # Deliberately non-fatal, but no longer silent: an outage should be
        # distinguishable from a genuinely unknown channel in the logs.
        logger.warning(
            "Redis platform adapter: could not validate channel %s; "
            "treating it as invalid",
            channel_id,
            exc_info=True,
        )
        return False
    return bool(is_member)
# ------------------------------------------------------------------
# Outbound messaging (PlatformAdapter abstract methods)
# ------------------------------------------------------------------
[docs]
async def send(self, channel_id: str, text: str) -> str:
    """Publish a text message on the instance's response Pub/Sub channel.

    Implements the abstract :meth:`PlatformAdapter.send`. A new hex message
    id is generated; when a Redis client is available, a JSON envelope with
    ``type`` ``"message"``, the id, the text and the current timestamp is
    published to ``{_PUBSUB_PREFIX}:{channel_id}`` through
    :meth:`_safe_publish`. Without a client nothing is published.

    Args:
        channel_id (str): The instance id that selects the Pub/Sub channel.
        text (str): The message body to deliver.

    Returns:
        str: The generated message id, whether or not anything was published.
    """
    msg_id = uuid.uuid4().hex
    if self._redis is None:
        return msg_id
    envelope = {
        "type": "message",
        "id": msg_id,
        "text": text,
        "timestamp": time.time(),
    }
    topic = f"{_PUBSUB_PREFIX}:{channel_id}"
    await self._safe_publish(self._redis.publish, topic, json.dumps(envelope))
    return msg_id
[docs]
async def send_file(
    self,
    channel_id: str,
    data: bytes,
    filename: str,
    mimetype: str = "application/octet-stream",
) -> str | None:
    """Store a file in Redis for 24 hours and announce it over Pub/Sub.

    Implements the abstract :meth:`PlatformAdapter.send_file`. When a Redis
    client is available, one pipeline writes the base64-encoded payload and
    its metadata to the ``{_FILE_PREFIX}:{file_id}`` hash, sets an
    86400-second expiry on it, and publishes a JSON ``file`` envelope (id,
    filename, mimetype, size, url, timestamp) to
    ``{_PUBSUB_PREFIX}:{channel_id}``. Only metadata is published, never
    the bytes.

    The pipeline is rebuilt on every attempt made by :meth:`_safe_publish`.
    redis-py clears a pipeline's queued commands once ``execute()`` has
    run, so retrying ``execute`` on the same pipeline object would send
    nothing and silently lose both the file and the notification.

    Args:
        channel_id (str): The instance id; selects the Pub/Sub channel and
            is stored with the file.
        data (bytes): The raw file contents.
        filename (str): The file name, used in the download url and metadata.
        mimetype (str): The content type; defaults to
            ``application/octet-stream``.

    Returns:
        str | None: The relative download url
        ``/redis-platform/files/{file_id}/{filename}``. It is returned even
        when there is no Redis client and nothing was stored.
    """
    file_id = uuid.uuid4().hex
    download_url = f"/redis-platform/files/{file_id}/{filename}"
    client = self._redis
    if client is None:
        return download_url

    file_key = f"{_FILE_PREFIX}:{file_id}"
    stored_at = time.time()
    file_fields = {
        "filename": filename,
        "mimetype": mimetype,
        "data_b64": base64.b64encode(data).decode(),
        "size": str(len(data)),
        "timestamp": str(stored_at),
        "channel_id": channel_id,
    }
    envelope = json.dumps(
        {
            "type": "file",
            "id": file_id,
            "filename": filename,
            "mimetype": mimetype,
            "size": len(data),
            "url": download_url,
            "timestamp": stored_at,
        }
    )

    async def _store_and_publish() -> Any:
        # Queue the commands afresh for each attempt (see docstring).
        pipe = client.pipeline()
        pipe.hset(file_key, mapping=file_fields)
        # 24-hour TTL — files are ephemeral
        pipe.expire(file_key, 86400)
        pipe.publish(f"{_PUBSUB_PREFIX}:{channel_id}", envelope)
        return await pipe.execute()

    await self._safe_publish(_store_and_publish)
    return download_url
# ------------------------------------------------------------------
# History (delegated entirely to MessageCache)
# ------------------------------------------------------------------
[docs]
async def fetch_history(
    self,
    channel_id: str,
    limit: int = 100,
) -> list[HistoricalMessage]:
    """Return an empty history for every Redis instance.

    Implements the abstract :meth:`PlatformAdapter.fetch_history`. This
    adapter has nothing to backfill from, so both arguments are ignored and
    a new empty list is returned on each call.

    Args:
        channel_id (str): The instance id requesting history. Ignored.
        limit (int): Maximum number of messages wanted. Ignored.

    Returns:
        list[HistoricalMessage]: Always an empty list.
    """
    no_history: list[HistoricalMessage] = []
    return no_history