message_cache

Redis-backed message cache with per-channel sorted-set indexes and RediSearch vector search.

Each message is stored as a Redis hash keyed by msg:{uuid}, with the embedding held as a binary FLOAT32 blob. A per-channel sorted set (channel_msgs:{platform}:{channel_id}) provides O(log N) time-ordered lookups by channel, while a RediSearch HNSW index (idx:messages) enables semantic KNN similarity search.
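
The storage layout described above can be sketched with the standard library alone. The helper names (message_key, channel_index_key, pack_embedding, unpack_embedding) and the little-endian byte order are assumptions made for illustration; they are not part of this module's documented API.

```python
import struct
import uuid


def message_key() -> str:
    """Build a hash key of the form msg:{uuid}."""
    return f"msg:{uuid.uuid4()}"


def channel_index_key(platform: str, channel_id: str) -> str:
    """Build the per-channel sorted-set key."""
    return f"channel_msgs:{platform}:{channel_id}"


def pack_embedding(vector: list[float]) -> bytes:
    """Pack floats into a FLOAT32 blob (little-endian is an assumption)."""
    return struct.pack(f"<{len(vector)}f", *vector)


def unpack_embedding(blob: bytes) -> list[float]:
    """Inverse of pack_embedding: each FLOAT32 value takes 4 bytes."""
    return list(struct.unpack(f"<{len(blob) // 4}f", blob))


blob = pack_embedding([0.5, -1.0, 2.0])
print(len(blob))                             # 12 (3 values x 4 bytes)
print(unpack_embedding(blob))                # [0.5, -1.0, 2.0]
print(channel_index_key("discord", "1234"))  # channel_msgs:discord:1234
```

The values in the example are exactly representable in 32 bits; arbitrary floats lose some precision when packed as FLOAT32.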

All cached messages are represented as CachedMessage objects and are given a 45-day TTL in Redis.

class message_cache.CachedMessage(user_id, user_name, platform, channel_id, text, timestamp, embedding=<factory>, message_key='', message_id='', reply_to_id='')[source]

Bases: object

A single cached message stored in Redis.

Construct from raw JSON via from_json(), from a dict via from_dict(), or let MessageCache build one for you.

Parameters:
user_id: str
user_name: str
platform: str
channel_id: str
text: str
timestamp: float
embedding: list[float]
message_key: str = ''
message_id: str = ''

Platform-specific message identifier (Discord message ID, Matrix event ID, etc.).

reply_to_id: str = ''

Platform-specific ID of the message this one replies to, if any.

to_dict()[source]

Return the message as a plain dictionary.

Returns:

Dictionary representation of the message.

Return type:

dict[str, Any]

to_json()[source]

Serialize the message to a JSON string.

Returns:

JSON representation of the message.

Return type:

str

to_redis_hash()[source]

Produce the mapping written to a Redis hash key.

Return type:

dict[str, str | bytes | float]

classmethod from_dict(data)[source]

Construct a CachedMessage from a dictionary.

Parameters:

data (dict[str, Any]) – Dictionary of message fields.

Returns:

The constructed message.

Return type:

CachedMessage

classmethod from_json(raw)[source]

Construct a CachedMessage from a JSON string.

Parameters:

raw (str) – JSON-encoded message.

Returns:

The constructed message.

Return type:

CachedMessage
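
The round trip between the serialization helpers above can be illustrated with a simplified stand-in. SketchMessage below is hypothetical: it mirrors only some of the documented fields and is not the real CachedMessage implementation.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class SketchMessage:
    """Hypothetical stand-in mirroring a few CachedMessage fields."""

    user_id: str
    user_name: str
    platform: str
    channel_id: str
    text: str
    timestamp: float
    embedding: list[float] = field(default_factory=list)
    message_id: str = ""

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)

    def to_json(self) -> str:
        return json.dumps(self.to_dict())

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "SketchMessage":
        return cls(**data)

    @classmethod
    def from_json(cls, raw: str) -> "SketchMessage":
        return cls.from_dict(json.loads(raw))


original = SketchMessage("42", "ada", "discord", "1234", "hello", 1700000000.0)
restored = SketchMessage.from_json(original.to_json())
print(restored == original)  # True
```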

classmethod from_redis_hash(mapping, key='')[source]

Construct from a Redis HGETALL result.

Return type:

CachedMessage

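Parameters:
  • mapping – Field mapping returned by HGETALL for the message hash.

  • key (str) – Redis key the hash was read from.
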
property repr: str

Human-readable one-liner suitable for prompt injection or logs.

__repr__()[source]

Return the string representation of the message.

Returns:

The representation string.

Return type:

str

class message_cache.MessageCache(redis_url, openrouter_client, embedding_model='google/gemini-embedding-001', ssl_kwargs=None)[source]

Bases: object

Async Redis message cache with automatic embedding generation and RediSearch-backed vector search.

Parameters:
  • redis_url (str) – Redis connection URL (e.g. "redis://localhost:6379/0").

  • openrouter_client (OpenRouterClient) – Shared OpenRouterClient used to call the embeddings API.

  • embedding_model (str) – Model identifier for the embeddings endpoint (e.g. "google/gemini-embedding-001").

  • ssl_kwargs (dict | None)

__init__(redis_url, openrouter_client, embedding_model='google/gemini-embedding-001', ssl_kwargs=None)[source]

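Initialize the cache with its Redis URL, OpenRouter client, and embedding model.

Parameters:
  • redis_url (str) – Redis connection URL (e.g. "redis://localhost:6379/0").

  • openrouter_client (OpenRouterClient) – Shared OpenRouterClient used to call the embeddings API.

  • embedding_model (str) – Model identifier for the embeddings endpoint (e.g. "google/gemini-embedding-001").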

  • ssl_kwargs (dict | None) – Optional SSL/mTLS keyword arguments forwarded to redis.asyncio.from_url.

Return type:

None

async log_message(platform, channel_id, user_id, user_name, text, timestamp=None, embedding=None, defer_embedding=False, message_id='', reply_to_id='')[source]

Log a message to Redis with an embedding and a 45-day TTL.

The message is stored as a Redis hash (msg:{uuid}) with the embedding as a binary FLOAT32 blob, automatically indexed by the idx:messages RediSearch index.

Parameters:
  • embedding (list[float] | None) – Pre-computed embedding vector. When provided the internal embedding API call is skipped, saving a round-trip.

  • defer_embedding (bool) – When True and no pre-computed embedding is given, store a zero-vector placeholder instead of calling the embeddings API. The caller is responsible for enqueuing the returned message_key in an EmbeddingBatchQueue so the real embedding is written later.

  • platform (str)

  • channel_id (str)

  • user_id (str)

  • user_name (str)

  • text (str)

  • timestamp (float | None)

  • message_id (str)

  • reply_to_id (str)

Returns:

The message object that was written to Redis (includes the embedding vector, or an empty list when deferred).

Return type:

CachedMessage

async get_recent(platform, channel_id, count=50)[source]

Return the count most recent cached messages (newest first).

Uses the per-channel sorted set, so the index lookup costs O(log N + count) and does not depend on the RediSearch module.

Return type:

list[CachedMessage]

Parameters:
  • platform (str)

  • channel_id (str)

  • count (int)

async get_by_timerange(platform, channel_id, start, end)[source]

Return cached messages within a Unix-timestamp range (inclusive).

Uses the per-channel sorted set with ZRANGEBYSCORE, which costs O(log N + M) for M returned entries. Results are ordered ascending by timestamp.

Return type:

list[CachedMessage]

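Parameters:
  • platform (str)

  • channel_id (str)

  • start – Lower bound of the Unix-timestamp range (inclusive).

  • end – Upper bound of the Unix-timestamp range (inclusive).
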
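
The sorted-set lookups behind get_recent() and get_by_timerange() can be mimicked in memory with a sorted list and binary search. This is only a sketch of the semantics (newest first, inclusive range); the data and function names are made up, and no Redis server is involved.

```python
from bisect import bisect_left, bisect_right

# (score, member) pairs kept sorted by score, like a Redis sorted set.
index = [
    (100.0, "msg:a"),
    (200.0, "msg:b"),
    (300.0, "msg:c"),
    (400.0, "msg:d"),
]
scores = [score for score, _ in index]


def keys_in_range(start: float, end: float) -> list[str]:
    """Members with start <= score <= end, ascending by score."""
    lo = bisect_left(scores, start)   # first position with score >= start
    hi = bisect_right(scores, end)    # first position with score > end
    return [member for _, member in index[lo:hi]]


def most_recent(count: int) -> list[str]:
    """The count highest-scored members, newest first."""
    return [member for _, member in index[::-1][:count]]


print(keys_in_range(200.0, 300.0))  # ['msg:b', 'msg:c']
print(most_recent(2))               # ['msg:d', 'msg:c']
```
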
async get_messages_after(platform, channel_id, after_ts_exclusive, *, zset_batch=5000)[source]

Return messages whose ZSET score is strictly greater than after_ts_exclusive.

When after_ts_exclusive is None, returns all messages in the channel ZSET (ascending by time). Uses Redis exclusive-interval syntax, "(score", on the lower bound.

Return type:

list[CachedMessage]

Parameters:
  • platform (str)

  • channel_id (str)

  • after_ts_exclusive (float | None)

  • zset_batch (int)
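
The exclusive lower bound mentioned above follows the Redis score-range convention: a leading "(" excludes the endpoint, and "-inf" means no lower limit. A minimal sketch of how such a bound could be formatted, with lower_bound as a hypothetical helper name:

```python
from typing import Optional


def lower_bound(after_ts_exclusive: Optional[float]) -> str:
    """Format the minimum score for a strictly-greater-than range query."""
    if after_ts_exclusive is None:
        return "-inf"  # no lower limit: the whole sorted set
    # A leading "(" tells Redis to exclude the endpoint itself.
    return f"({after_ts_exclusive!r}"


print(lower_bound(None))          # -inf
print(lower_bound(1700000000.5))  # (1700000000.5
```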

async update_text_by_message_id(platform, channel_id, message_id, new_text)[source]

Update the text field of a cached message identified by its platform message ID.

Scans the most recent entries in the channel ZSET to find the matching hash. Returns the Redis key if the message was found and updated, None otherwise.

Return type:

str | None

Parameters:
  • platform (str)

  • channel_id (str)

  • message_id (str)

  • new_text (str)

async find_key_by_message_id(platform, channel_id, message_id)[source]

Find the Redis key for a cached message by its platform message ID.

Returns the key string (e.g. "msg:abc-123") or None.

Return type:

str | None

Parameters:
  • platform (str)

  • channel_id (str)

  • message_id (str)

async find_keys_by_message_ids(platform, channel_id, message_ids)[source]

Batch version of find_key_by_message_id().

Returns a mapping of {message_id: redis_key} for every message_id that was found in Redis. Missing IDs are omitted.

Fetches the channel zset once and builds the full lookup map in a single pipeline, instead of repeating the scan per message.

Return type:

dict[str, str]

Parameters:
  • platform (str)

  • channel_id (str)

  • message_ids – Platform message IDs to look up.

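
The batch lookup can be pictured as building one reverse map and reading every requested ID from it. The sketch below assumes the (redis_key, message_id) pairs have already been fetched; the data and the function name are illustrative only.

```python
def find_keys(entries: list[tuple[str, str]], wanted: list[str]) -> dict[str, str]:
    """Map each wanted message_id to its Redis key; missing IDs are omitted."""
    # Build the reverse map once instead of rescanning per message_id.
    by_message_id = {message_id: key for key, message_id in entries if message_id}
    return {mid: by_message_id[mid] for mid in wanted if mid in by_message_id}


entries = [("msg:a", "111"), ("msg:b", "222"), ("msg:c", "")]
print(find_keys(entries, ["222", "999", "111"]))
# {'222': 'msg:b', '111': 'msg:a'}
```
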
async has_real_embedding(redis_key)[source]

Return True if redis_key has a non-zero embedding blob.

A zero-vector placeholder (all zeros) is treated as missing. Uses the raw (non-decoded) Redis connection to avoid UTF-8 corruption of binary float32 data.

Return type:

bool

Parameters:

redis_key (str)

async has_real_embedding_many(redis_keys)[source]

Pipelined version of has_real_embedding().

Returns a list of booleans, one per key, indicating whether each key has a non-zero embedding. Uses the raw (non-decoded) Redis connection to correctly handle binary data.

Return type:

list[bool]

Parameters:

redis_keys (list[str])
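
The zero-vector check can be sketched on raw bytes without any Redis connection. The function names and the 4-value dimension below are assumptions for illustration; the real embedding size depends on the model.

```python
import struct
from typing import Optional

DIM = 4  # illustrative dimension only


def is_real_embedding(blob: Optional[bytes]) -> bool:
    """True when the blob exists and contains at least one non-zero byte."""
    return bool(blob) and any(blob)


def are_real_embeddings(blobs: list[Optional[bytes]]) -> list[bool]:
    """One boolean per blob, in input order."""
    return [is_real_embedding(blob) for blob in blobs]


placeholder = struct.pack(f"<{DIM}f", *([0.0] * DIM))  # all-zero bytes
real = struct.pack(f"<{DIM}f", 0.0, 0.25, 0.0, -1.0)
print(are_real_embeddings([placeholder, real, None]))  # [False, True, False]
```

Working on bytes rather than decoded strings matters here: a FLOAT32 blob is arbitrary binary data and is generally not valid UTF-8.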

async mark_deleted_by_message_id(platform, channel_id, message_id, deleted_at_iso)[source]

Prepend a [deleted at TIMESTAMP] marker to a cached message’s text.

The original text is preserved. Returns True if the message was found and updated.

Return type:

bool

Parameters:
  • platform (str)

  • channel_id (str)

  • message_id (str)

  • deleted_at_iso (str)
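
A minimal sketch of the marker format, assuming a single space separates the marker from the preserved text (the exact separator is not stated above). mark_deleted is a hypothetical helper, not the method itself.

```python
from datetime import datetime, timezone


def mark_deleted(text: str, deleted_at_iso: str) -> str:
    """Prepend a deletion marker while keeping the original text."""
    return f"[deleted at {deleted_at_iso}] {text}"


# An ISO 8601 timestamp could be produced like this:
now_iso = datetime.now(timezone.utc).isoformat()

print(mark_deleted("hello", "2024-01-01T00:00:00+00:00"))
# [deleted at 2024-01-01T00:00:00+00:00] hello
```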

async search_messages(query, limit=10, channel_id=None, platform=None, user_id=None)[source]

Semantic search across cached messages using RediSearch KNN.

Generates an embedding for query, then runs a vector similarity search filtered by optional channel_id, platform, and/or user_id.

Returns a list of dicts with text, user_name, timestamp, score, channel_id, and platform.

Return type:

list[dict[str, Any]]

Parameters:
  • query (str)

  • limit (int)

  • channel_id (str | None)

  • platform (str | None)

  • user_id (str | None)
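
A filtered KNN query of this kind is usually written in RediSearch as a pre-filter followed by a KNN clause. The sketch below only builds the query string. The field name embedding, the parameter name $vec, the alias score, and the use of TAG filters are all assumptions; the actual index schema is not shown in this reference.

```python
import re
from typing import Optional

# Conservatively escape every character outside [A-Za-z0-9_] in a TAG value.
_TAG_SPECIAL = re.compile(r"([^A-Za-z0-9_])")


def escape_tag(value: str) -> str:
    return _TAG_SPECIAL.sub(r"\\\1", value)


def build_knn_query(
    limit: int,
    channel_id: Optional[str] = None,
    platform: Optional[str] = None,
    user_id: Optional[str] = None,
) -> str:
    """Build '<prefilter>=>[KNN <limit> @embedding $vec AS score]'."""
    filters = []
    for field_name, value in (
        ("channel_id", channel_id),
        ("platform", platform),
        ("user_id", user_id),
    ):
        if value is not None:
            filters.append(f"@{field_name}:{{{escape_tag(value)}}}")
    # "*" matches every document when no filter is given.
    prefilter = ("(" + " ".join(filters) + ")") if filters else "*"
    return f"{prefilter}=>[KNN {limit} @embedding $vec AS score]"


print(build_knn_query(10))
# *=>[KNN 10 @embedding $vec AS score]
print(build_knn_query(5, channel_id="1234", platform="discord"))
# (@channel_id:{1234} @platform:{discord})=>[KNN 5 @embedding $vec AS score]
```

The query vector itself would be passed separately as a binary query parameter rather than interpolated into the string.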

async get_recent_for_user(platform, user_id, limit=20)[source]

Return the most recent cached messages sent by user_id.

Uses a RediSearch FT.SEARCH query filtered by platform and user_id, sorted by timestamp descending. This works across all channels (including DMs) without needing to know the channel ID.

Return type:

list[CachedMessage]

Parameters:
  • platform (str)

  • user_id (str)

  • limit (int)

async log_thought_summary(channel_id, thought_text, timestamp=None)[source]

Store a thought summary in a per-channel Redis sorted set.

Deduplicates by comparing against the most recent entry — if it has the same content the new entry is silently skipped.

Parameters:
  • channel_id (str) – Platform-agnostic channel identifier.

  • thought_text (str) – The raw text extracted from <thought> tags.

  • timestamp (float | None) – Unix timestamp; defaults to time.time().

Return type:

None
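
The deduplication rule compares only against the most recent entry, so a repeated thought is stored again once something else has been logged in between. An in-memory sketch of that rule, in which the summaries store is made up and a boolean return is added purely for illustration (the documented method returns None):

```python
import time
from typing import Optional

# channel_id -> list of (timestamp, content), appended in time order.
summaries: dict[str, list[tuple[float, str]]] = {}


def log_thought(
    channel_id: str, thought_text: str, timestamp: Optional[float] = None
) -> bool:
    """Store the thought unless the newest entry has identical content."""
    entries = summaries.setdefault(channel_id, [])
    if entries and entries[-1][1] == thought_text:
        return False  # same as the latest entry: skip
    entries.append((time.time() if timestamp is None else timestamp, thought_text))
    return True


results = [
    log_thought("chan", "a", 1.0),
    log_thought("chan", "a", 2.0),  # duplicate of the newest entry
    log_thought("chan", "b", 3.0),
    log_thought("chan", "a", 4.0),  # stored: newest entry is "b"
]
print(results)  # [True, False, True, True]
```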

async get_recent_thought_summaries(channel_id, limit=30)[source]

Retrieve unique thought summaries for a channel (newest first).

Parameters:
  • channel_id (str) – Platform-agnostic channel identifier.

  • limit (int) – Maximum number of unique summaries to return.

Returns:

Each dict has content (str) and timestamp (float).

Return type:

list[dict[str, Any]]

async backfill_channel_indexes(batch_size=500)[source]

Populate per-channel ZSETs from existing msg:* hashes.

Scans all msg:* keys and inserts them into the appropriate channel_msgs:{platform}:{channel_id} sorted set. Safe to run multiple times – ZADD is idempotent for existing members.

Returns the number of messages indexed.

Return type:

int

Parameters:

batch_size (int)
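
Why a repeated backfill is harmless can be shown with plain dictionaries standing in for Redis: adding an existing member with the same score leaves the sorted set unchanged. The hash field names used below (platform, channel_id, timestamp) follow the CachedMessage fields, but the function and data are illustrative assumptions.

```python
def backfill(
    hashes: dict[str, dict[str, str]],
    zsets: dict[str, dict[str, float]],
) -> int:
    """Insert every message key into its channel index; return the count."""
    indexed = 0
    for key, fields in hashes.items():
        zset_key = f"channel_msgs:{fields['platform']}:{fields['channel_id']}"
        # Like ZADD: re-adding an existing member just sets the same score.
        zsets.setdefault(zset_key, {})[key] = float(fields["timestamp"])
        indexed += 1
    return indexed


hashes = {
    "msg:a": {"platform": "discord", "channel_id": "1", "timestamp": "100.0"},
    "msg:b": {"platform": "discord", "channel_id": "1", "timestamp": "200.0"},
    "msg:c": {"platform": "matrix", "channel_id": "2", "timestamp": "150.0"},
}
zsets: dict[str, dict[str, float]] = {}
first = backfill(hashes, zsets)
snapshot = {name: dict(members) for name, members in zsets.items()}
second = backfill(hashes, zsets)
print(first, second, zsets == snapshot)  # 3 3 True
```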

property redis_client: redis.asyncio.Redis

The underlying async Redis connection.

Exposed so that other subsystems (e.g. ToolContext) can share the same connection without reaching into private state.

async close()[source]

Gracefully close the Redis connections.

Return type:

None

async message_cache.get_active_channels(redis, limit=10)[source]

Find the most recently active channels by checking ZSET scores.

Scans channel_msgs:* keys and returns the limit channels with the highest max score (most recent message timestamp).

Returns a list of (platform, channel_id) tuples.

Return type:

list[tuple[str, str]]

Parameters: