message_cache
Redis-backed message cache with per-channel sorted-set indexes and RediSearch vector search.
Each message is stored as a Redis hash keyed by msg:{uuid} with
the embedding as a binary FLOAT32 blob. A per-channel sorted set
(channel_msgs:{platform}:{channel_id}) provides O(log N) time-
ordered lookups by channel, while a RediSearch HNSW index
(idx:messages) enables semantic KNN similarity search.
All cached messages are represented as CachedMessage objects
and are given a 45-day TTL in Redis.
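The storage layout described above can be sketched with the standard library alone. This is a minimal illustration, not the module's code: the helper names are hypothetical, and the little-endian byte order for the FLOAT32 blob is an assumption.

```python
import struct
import uuid

# 45-day TTL from the description above, expressed in seconds.
TTL_SECONDS = 45 * 24 * 60 * 60


def message_key() -> str:
    """Hash key for one message: msg:{uuid}."""
    return f"msg:{uuid.uuid4()}"


def channel_index_key(platform: str, channel_id: str) -> str:
    """Sorted-set key: channel_msgs:{platform}:{channel_id}."""
    return f"channel_msgs:{platform}:{channel_id}"


def pack_embedding(vector: list) -> bytes:
    """Serialize a vector as FLOAT32, 4 bytes per dimension.

    Assumption: little-endian byte order.
    """
    return struct.pack(f"<{len(vector)}f", *vector)


def unpack_embedding(blob: bytes) -> list:
    """Inverse of pack_embedding."""
    return list(struct.unpack(f"<{len(blob) // 4}f", blob))
```

A blob packed this way takes four bytes per dimension, so its length is a quick sanity check on the vector size.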
- class message_cache.CachedMessage(user_id, user_name, platform, channel_id, text, timestamp, embedding=<factory>, message_key='', message_id='', reply_to_id='')[source]
Bases: object
A single cached message stored in Redis.
Construct from raw JSON via from_json()/from_dict(), or let MessageCache build one for you.
- Parameters:
- message_id: str = ''
Platform-specific message identifier (Discord message ID, Matrix event ID, etc.).
- classmethod from_dict(data)[source]
Construct a CachedMessage from a dict.
- Parameters:
- Returns:
The constructed CachedMessage.
- Return type:
- classmethod from_json(raw)[source]
Construct a CachedMessage from a raw JSON string.
- Parameters:
raw (str) – The raw JSON string.
- Returns:
The constructed CachedMessage.
- Return type:
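To illustrate how the two constructors relate, here is a stand-in dataclass that mirrors the CachedMessage signature. It is not the real class: the name CachedMessageSketch, the float type for timestamp, and the choice to ignore unknown keys are all assumptions.

```python
import json
from dataclasses import dataclass, field, fields


@dataclass
class CachedMessageSketch:
    """Hypothetical stand-in mirroring the CachedMessage signature."""

    user_id: str
    user_name: str
    platform: str
    channel_id: str
    text: str
    timestamp: float  # assumption: Unix timestamp as a float
    embedding: list = field(default_factory=list)
    message_key: str = ""
    message_id: str = ""
    reply_to_id: str = ""

    @classmethod
    def from_dict(cls, data: dict) -> "CachedMessageSketch":
        # Assumption: keys that are not fields are dropped, not rejected.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})

    @classmethod
    def from_json(cls, raw: str) -> "CachedMessageSketch":
        # from_json is a thin wrapper: parse, then delegate to from_dict.
        return cls.from_dict(json.loads(raw))
```

Optional fields such as embedding and message_id fall back to their defaults when the input omits them.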
- class message_cache.MessageCache(redis_url, openrouter_client, embedding_model='google/gemini-embedding-001', ssl_kwargs=None)[source]
Bases: object
Async Redis message cache with automatic embedding generation and RediSearch-backed vector search.
- Parameters:
redis_url (str) – Redis connection URL (e.g. "redis://localhost:6379/0").
openrouter_client (OpenRouterClient) – Shared OpenRouterClient used to call the embeddings API.
embedding_model (str) – Model identifier for the embeddings endpoint (e.g. "google/gemini-embedding-001").
ssl_kwargs (dict | None)
- __init__(redis_url, openrouter_client, embedding_model='google/gemini-embedding-001', ssl_kwargs=None)[source]
Initialize the instance.
- Parameters:
redis_url (str) – Redis connection URL.
openrouter_client (OpenRouterClient) – Shared client used to call the embeddings API.
embedding_model (str) – Model identifier for the embeddings endpoint.
ssl_kwargs (dict | None) – Optional SSL/mTLS keyword arguments forwarded to redis.asyncio.from_url.
- Return type:
None
- async log_message(platform, channel_id, user_id, user_name, text, timestamp=None, embedding=None, defer_embedding=False, message_id='', reply_to_id='')[source]
Log a message to Redis with an embedding and a 45-day TTL.
The message is stored as a Redis hash (msg:{uuid}) with the embedding as a binary FLOAT32 blob, automatically indexed by the idx:messages RediSearch index.
- Parameters:
embedding (list[float] | None) – Pre-computed embedding vector. When provided the internal embedding API call is skipped, saving a round-trip.
defer_embedding (bool) – When True and no pre-computed embedding is given, store a zero-vector placeholder instead of calling the embeddings API. The caller is responsible for enqueuing the returned message_key in an EmbeddingBatchQueue so the real embedding is written later.
platform (str)
channel_id (str)
user_id (str)
user_name (str)
text (str)
timestamp (float | None)
message_id (str)
reply_to_id (str)
- Returns:
The message object that was written to Redis (includes the embedding vector, or an empty list when deferred).
- Return type:
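The interaction between embedding and defer_embedding can be summarized as a small decision function. This is a sketch of the documented behavior only: choose_embedding and the dim argument are hypothetical, and the actual vector dimension depends on the embedding model.

```python
from typing import List, Optional, Tuple


def choose_embedding(
    embedding: Optional[List[float]],
    defer_embedding: bool,
    dim: int,
) -> Tuple[Optional[List[float]], bool]:
    """Return (vector_to_store, needs_api_call).

    dim is a hypothetical parameter for the placeholder length.
    """
    if embedding is not None:
        # A pre-computed vector always wins; no API round-trip.
        return list(embedding), False
    if defer_embedding:
        # Zero-vector placeholder; the real embedding is written later.
        return [0.0] * dim, False
    # Neither given nor deferred: the embeddings API must be called.
    return None, True
```

Note that defer_embedding only matters when no pre-computed vector is passed.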
- async get_recent(platform, channel_id, count=50)[source]
Return the count most recent cached messages (newest first).
Uses the per-channel sorted set for O(log N) lookups without depending on the RediSearch module.
- Return type:
- Parameters:
- async get_by_timerange(platform, channel_id, start, end)[source]
Return cached messages within a Unix-timestamp range (inclusive).
Uses the per-channel sorted set with ZRANGEBYSCORE for O(log N) range lookups. Results are ordered ascending by timestamp.
- async get_messages_after(platform, channel_id, after_ts_exclusive, *, zset_batch=5000)[source]
Messages with zset score strictly greater than after_ts_exclusive.
When after_ts_exclusive is None, returns all messages in the channel zset (ascending by time). Uses Redis exclusive-interval syntax "(score" on the lower bound.
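The exclusive lower bound can be illustrated without a Redis server. The sketch below builds the min/max arguments that a ZRANGEBYSCORE call would receive and pairs them with an in-memory equivalent; both function names are hypothetical, and the assumption that zset members are message keys scored by timestamp is inferred from the descriptions above.

```python
from typing import List, Optional, Tuple


def zrange_bounds_after(after_ts_exclusive: Optional[float]) -> Tuple[str, str]:
    """Build (min, max) score bounds for ZRANGEBYSCORE."""
    if after_ts_exclusive is None:
        # No lower bound: the whole zset, ascending by score.
        return "-inf", "+inf"
    # A leading "(" makes the bound exclusive in Redis range syntax.
    return f"({after_ts_exclusive!r}", "+inf"


def messages_after(
    scored: List[Tuple[float, str]],
    after_ts_exclusive: Optional[float],
) -> List[str]:
    """In-memory equivalent over (score, key) pairs, ascending by score."""
    ordered = sorted(scored)
    if after_ts_exclusive is None:
        return [key for _, key in ordered]
    return [key for score, key in ordered if score > after_ts_exclusive]
```

A message whose score equals after_ts_exclusive is excluded, which lets a caller resume from the last timestamp it has already seen without receiving that message again.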
- async update_text_by_message_id(platform, channel_id, message_id, new_text)[source]
Update the text field of a cached message identified by its platform message ID.
Scans the most recent entries in the channel ZSET to find the matching hash. Returns the Redis key if the message was found and updated, None otherwise.
- async find_key_by_message_id(platform, channel_id, message_id)[source]
Find the Redis key for a cached message by its platform message ID.
Returns the key string (e.g. "msg:abc-123") or None.
- async find_keys_by_message_ids(platform, channel_id, message_ids)[source]
Batch version of find_key_by_message_id().
Returns a mapping of {message_id: redis_key} for every message_id that was found in Redis. Missing IDs are omitted.
Fetches the channel zset once and builds the full lookup map in a single pipeline, instead of repeating the scan per message.
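The map-building step of the batch lookup can be sketched as a single pass over (redis_key, message_id) pairs. This assumes the pairs have already been read from the channel's hashes; build_lookup is a hypothetical helper, and keeping the first match for a duplicated ID is an assumption.

```python
from typing import Dict, Iterable, Tuple


def build_lookup(
    entries: Iterable[Tuple[str, str]],
    wanted: Iterable[str],
) -> Dict[str, str]:
    """Map each wanted message_id to its Redis key in one pass.

    entries holds (redis_key, message_id) pairs. IDs that never
    appear are simply absent from the result.
    """
    wanted_set = set(wanted)
    found: Dict[str, str] = {}
    for redis_key, message_id in entries:
        # Assumption: the first occurrence wins if an ID repeats.
        if message_id in wanted_set and message_id not in found:
            found[message_id] = redis_key
    return found
```

The cost is one scan of the channel regardless of how many IDs are requested, which is the advantage over calling the single-ID lookup in a loop.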
- async has_real_embedding(redis_key)[source]
Return True if redis_key has a non-zero embedding blob.
A zero-vector placeholder (all zeros) is treated as missing. Uses the raw (non-decoded) Redis connection to avoid UTF-8 corruption of binary float32 data.
- async has_real_embedding_many(redis_keys)[source]
Pipelined version of has_real_embedding().
Returns a list of booleans, one per key, indicating whether each key has a non-zero embedding. Uses the raw (non-decoded) Redis connection to correctly handle binary data.
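The zero-vector check itself is easy to show on raw bytes. The sketch below is one plausible way to implement it, not the module's code: is_real_embedding is a hypothetical name, and little-endian FLOAT32 is assumed.

```python
import struct
from typing import Optional


def is_real_embedding(blob: Optional[bytes]) -> bool:
    """True if blob decodes to a FLOAT32 vector with a non-zero entry."""
    # Missing, empty, or truncated blobs count as "no embedding".
    if not blob or len(blob) % 4 != 0:
        return False
    values = struct.unpack(f"<{len(blob) // 4}f", blob)
    # A placeholder is all zeros; any other value means real data.
    return any(v != 0.0 for v in values)
```

The blob must be handled as bytes throughout. Float32 data is generally not valid UTF-8, so reading it through a connection that decodes responses to str would fail or corrupt the values.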
- async mark_deleted_by_message_id(platform, channel_id, message_id, deleted_at_iso)[source]
Prepend a [deleted at TIMESTAMP] marker to a cached message’s text.
The original text is preserved. Returns True if the message was found and updated.
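The text transformation can be sketched as a pure function. The single-space separator between marker and text is an assumption, as is the helper name mark_deleted; only the marker format comes from the description above.

```python
def mark_deleted(text: str, deleted_at_iso: str) -> str:
    """Prepend a deletion marker while keeping the original text."""
    marker = f"[deleted at {deleted_at_iso}]"
    # Assumption: marker and original text are joined by one space.
    return f"{marker} {text}"
```

Because the original text is kept after the marker, later reads of the cache still show what was said before the deletion.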
- async search_messages(query, limit=10, channel_id=None, platform=None, user_id=None)[source]
Semantic search across cached messages using RediSearch KNN.
Generates an embedding for query, then runs a vector similarity search filtered by optional channel_id, platform, and/or user_id.
Returns a list of dicts with text, user_name, timestamp, score, channel_id, and platform.
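A filtered KNN search of this kind is usually expressed as a RediSearch hybrid query: a pre-filter, then a KNN clause. The sketch below builds such a query string. It assumes platform, channel_id, and user_id are indexed as TAG fields and that the vector field is named embedding; neither is confirmed by this page, and both helper names are hypothetical.

```python
import re
from typing import Optional

# Anything other than letters, digits, and underscore is escaped,
# since punctuation is special inside a RediSearch tag filter.
_TAG_SPECIAL = re.compile(r"([^A-Za-z0-9_])")


def escape_tag(value: str) -> str:
    """Backslash-escape punctuation and whitespace in a tag value."""
    return _TAG_SPECIAL.sub(r"\\\1", value)


def build_knn_query(
    limit: int,
    channel_id: Optional[str] = None,
    platform: Optional[str] = None,
    user_id: Optional[str] = None,
) -> str:
    """Build '<prefilter>=>[KNN <limit> @embedding $vec AS score]'."""
    filters = []
    for field_name, value in (
        ("platform", platform),
        ("channel_id", channel_id),
        ("user_id", user_id),
    ):
        if value is not None:
            filters.append(f"@{field_name}:{{{escape_tag(value)}}}")
    # "*" means no pre-filter: search the whole index.
    prefilter = "(" + " ".join(filters) + ")" if filters else "*"
    return f"{prefilter}=>[KNN {limit} @embedding $vec AS score]"
```

The $vec placeholder is bound to the query embedding's FLOAT32 bytes as a query parameter, and the KNN clause requires query dialect 2 or later.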
- async get_recent_for_user(platform, user_id, limit=20)[source]
Return the most recent cached messages sent by user_id.
Uses a RediSearch FT.SEARCH query filtered by platform and user_id, sorted by timestamp descending. This works across all channels (including DMs) without needing to know the channel ID.
- Return type:
- Parameters:
- async log_thought_summary(channel_id, thought_text, timestamp=None)[source]
Store a thought summary in a per-channel Redis sorted set.
Deduplicates by comparing against the most recent entry — if it has the same content the new entry is silently skipped.
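The deduplication rule compares only against the latest entry, which the following in-memory sketch makes concrete. The list stands in for the per-channel sorted set, and append_if_new is a hypothetical helper.

```python
from typing import List, Tuple


def append_if_new(
    entries: List[Tuple[float, str]],
    thought_text: str,
    timestamp: float,
) -> bool:
    """Append (timestamp, text) unless it repeats the newest entry.

    entries is ordered oldest to newest. Returns True if stored.
    """
    if entries and entries[-1][1] == thought_text:
        # Same content as the most recent entry: skip silently.
        return False
    entries.append((timestamp, thought_text))
    return True
```

Only consecutive repeats are dropped under this rule. The same text can be stored again once a different summary has been logged in between.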
- async get_recent_thought_summaries(channel_id, limit=30)[source]
Retrieve unique thought summaries for a channel (newest first).
- async backfill_channel_indexes(batch_size=500)[source]
Populate per-channel ZSETs from existing msg:* hashes.
Scans all msg:* keys and inserts them into the appropriate channel_msgs:{platform}:{channel_id} sorted set. Safe to run multiple times – ZADD is idempotent for existing members.
Returns the number of messages indexed.
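The grouping step of a backfill can be sketched without Redis. The function below turns already-fetched hashes into one member-to-score mapping per sorted set. It assumes the zset member is the message's Redis key and the score is its timestamp; group_for_backfill is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, Mapping


def group_for_backfill(
    hashes: Mapping[str, Mapping[str, str]],
) -> Dict[str, Dict[str, float]]:
    """Group msg:* hashes into {zset_key: {redis_key: timestamp}}."""
    per_channel: Dict[str, Dict[str, float]] = defaultdict(dict)
    for redis_key, fields in hashes.items():
        zset_key = f"channel_msgs:{fields['platform']}:{fields['channel_id']}"
        # Assumption: member is the hash key, score is the timestamp.
        per_channel[zset_key][redis_key] = float(fields["timestamp"])
    return dict(per_channel)
```

Each inner dict has the member-to-score shape that a ZADD call takes. Re-adding a member with an unchanged score leaves the set as it was, which is why repeated runs are safe.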
- property redis_client: redis.asyncio.Redis
The underlying async Redis connection.
Exposed so that other subsystems (e.g. ToolContext) can share the same connection without reaching into private state.