message_cache
Redis-backed message cache with per-channel sorted-set indexes and RediSearch vector search.
Each message is stored as a Redis hash keyed by msg:{uuid} with the embedding as a binary FLOAT32 blob. A per-channel sorted set (channel_msgs:{platform}:{channel_id}) provides O(log N) time-ordered lookups by channel, while a RediSearch HNSW index (idx:messages) enables semantic KNN similarity search.
All cached messages are represented as CachedMessage objects
and are given a 90-day TTL in Redis.
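The storage layout described above can be sketched in plain Python. This is a minimal illustration, not the module's code: the helper names are hypothetical stand-ins, and the little-endian byte order and the hyphenated UUID form are assumptions.

```python
import struct
import uuid


def embed_to_bytes(vec: list[float]) -> bytes:
    """Pack floats into a FLOAT32 blob (4 bytes per value, little-endian assumed)."""
    return struct.pack(f"<{len(vec)}f", *vec)


def bytes_to_embed(blob: bytes) -> list[float]:
    """Unpack a FLOAT32 blob back into a list of Python floats."""
    return list(struct.unpack(f"<{len(blob) // 4}f", blob))


def message_key() -> str:
    """Key of the per-message hash, following the msg:{uuid} pattern."""
    return f"msg:{uuid.uuid4()}"


def channel_index_key(platform: str, channel_id: str) -> str:
    """Key of the per-channel sorted set that orders messages by time."""
    return f"channel_msgs:{platform}:{channel_id}"
```

Values that are exactly representable in 32 bits (such as 0.5) survive the round trip unchanged; other floats come back rounded to FLOAT32 precision.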
- message_cache.strip_llm_injection_artifacts_for_cache(text)[source]
Remove prompt-injection blobs that must not be stored in the message cache.
Callers normally pass raw platform text; this guards against accidental forwarding of augmented LLM-request content into log_message().
- class message_cache.CachedMessage(user_id, user_name, platform, channel_id, text, timestamp, embedding=<factory>, message_key='', message_id='', reply_to_id='', kind='user_in', turn_summary_id='')[source]
Bases: object
A single cached chat message and its embedding, as stored in Redis.
The in-memory representation of one row in the message cache: speaker identity, the channel/platform it belongs to, the text, a monotonic timestamp, and the FLOAT32 embedding used for semantic search. Each instance maps to a msg:{uuid} Redis hash and a membership in the per-channel channel_msgs:{platform}:{channel_id} sorted set.
Construct one from raw JSON via from_json()/from_dict(), from a Redis HGETALL mapping via from_redis_hash(), or let MessageCache build and persist one for you in MessageCache.log_message(). Serialise back out with to_dict(), to_json(), or to_redis_hash(). Constructed directly in tests and by MessageCache._fetch_hashes() and MessageCache._doc_to_cached_message().
- Parameters:
- message_id: str = ''
Platform-specific message identifier (Discord message ID, Matrix event ID, etc.).
- to_dict()[source]
Serialise this message to a plain JSON-friendly dict.
Unlike to_redis_hash(), this keeps the embedding as a list of floats (not a binary blob) and omits the transient message_key, making it suitable for JSON transport and snapshotting. Pure in-memory transform with no side effects. Used by to_json() and as the general-purpose dict form consumed across the codebase wherever a message is round-tripped through JSON.
- to_json()[source]
Serialise this message to a JSON string.
Thin wrapper that calls json.dumps on the output of to_dict(), giving a compact string form for transport or logging. The inverse is from_json(). Pure in-memory transform with no I/O.
- Returns:
The JSON-encoded message.
- Return type:
- to_redis_hash()[source]
Produce the field mapping written to this message’s Redis hash.
Renders the message into the exact shape stored under msg:{uuid}: scalars become strings, the embedding is packed to a FLOAT32 blob via _embed_to_bytes(), and None values are coerced to empty strings. An empty message_id is dropped from the mapping so a later write cannot clobber a gateway-synced id that was filled in out of band. Pure in-memory transform with no I/O. Called by MessageCache.log_message() to build the mapping handed to MessageCache._atomic_log_to_redis(), and exercised directly in tests/core/test_message_cache_sanitize.py and tests/core/test_message_id_resync.py.
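The coercion rules for to_redis_hash() can be illustrated with a small stand-alone function. This is only a sketch under stated assumptions: to_redis_mapping is a hypothetical name, and it takes a plain dict rather than a CachedMessage.

```python
def to_redis_mapping(fields: dict) -> dict:
    """Coerce a message's fields into the flat shape a Redis hash accepts."""
    mapping = {}
    for name, value in fields.items():
        if name == "message_id" and not value:
            continue  # omit the field so an id synced out of band is not overwritten
        if isinstance(value, bytes):
            mapping[name] = value  # the packed embedding stays binary
        elif value is None:
            mapping[name] = ""  # Redis hash fields cannot hold None
        else:
            mapping[name] = str(value)
    return mapping
```

Omitting the field works because HSET only writes the fields it is given; any message_id already present on the hash is left as it was.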
- classmethod from_dict(data)[source]
Reconstruct a CachedMessage from a plain dict.
The inverse of to_dict(): coerces each field to its expected type with permissive defaults (missing keys become empty strings, zero timestamps, an empty embedding, and kind defaulting to user_in) so partial or legacy payloads still deserialise cleanly. Pure in-memory transform with no I/O. Called by from_json() and used directly by callers that already hold a decoded dict.
- classmethod from_json(raw)[source]
Reconstruct a CachedMessage from a JSON string.
The inverse of to_json(): parses the string with json.loads and delegates field coercion to from_dict(). Pure in-memory transform with no I/O.
- Parameters:
- Returns:
The reconstructed message.
- Return type:
- Raises:
json.JSONDecodeError – If raw is not valid JSON.
- classmethod from_redis_hash(mapping, key='')[source]
Reconstruct a CachedMessage from a Redis HGETALL mapping.
The inverse of to_redis_hash(): coerces each stored field back to its native type and decodes the binary embedding via _bytes_to_embed(), but only when the raw blob looks like a real vector (bytes longer than 100 bytes) rather than a decoded-string artifact, otherwise leaving the embedding empty. The originating Redis key is preserved on the returned object as message_key. Pure in-memory transform with no I/O.
- Parameters:
- Returns:
The reconstructed message.
- Return type:
- property repr: str
Render a human-readable [time] user: text line for this message.
Formats the UTC timestamp and truncates the body to ~120 characters, producing the compact form used when injecting recalled messages into the LLM prompt or writing them to logs (distinct from __repr__(), which is the developer-facing debug form). Pure in-memory formatting with no I/O.
- Returns:
A one-line [YYYY-MM-DD HH:MM:SS] user_name: preview string.
- Return type:
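The one-line format produced by the repr property can be approximated as follows. This is a hedged sketch: prompt_line is a hypothetical name, and the trailing "..." on truncated text is an assumption, since the source only says the body is cut to roughly 120 characters.

```python
from datetime import datetime, timezone


def prompt_line(user_name: str, text: str, timestamp: float, max_len: int = 120) -> str:
    """Format one message as '[YYYY-MM-DD HH:MM:SS] user_name: preview'."""
    stamp = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    # Assumed truncation marker; the real property may cut the text differently.
    preview = text if len(text) <= max_len else text[:max_len] + "..."
    return f"[{stamp}] {user_name}: {preview}"
```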
- __repr__()[source]
Return a developer-facing debug representation of this message.
Summarises the identifying fields plus the embedding length (rather than the full vector) so log lines and debugger output stay readable. This is the standard repr() form; the prompt/log-friendly one-liner lives in the repr property. Pure in-memory formatting with no I/O.
- Returns:
A CachedMessage(...) debug string.
- Return type:
- class message_cache.MessageCache(redis_url, openrouter_client, embedding_model='google/gemini-embedding-001', ssl_kwargs=None, redis_sentinels=None, redis_sentinel_master='falkordb', resilience_kwargs=None)[source]
Bases: object
Async Redis message cache with automatic embedding generation and RediSearch-backed vector search.
- Parameters:
redis_url (str) – Redis connection URL (e.g. "redis://localhost:6379/0").
openrouter_client (OpenRouterClient) – Shared OpenRouterClient used to call the embeddings API.
embedding_model (str) – Model identifier for the embeddings endpoint (e.g. "google/gemini-embedding-001").
ssl_kwargs (dict | None)
redis_sentinel_master (str)
resilience_kwargs (dict | None)
- __init__(redis_url, openrouter_client, embedding_model='google/gemini-embedding-001', ssl_kwargs=None, redis_sentinels=None, redis_sentinel_master='falkordb', resilience_kwargs=None)[source]
Initialize the instance.
- Parameters:
redis_url (str) – Redis connection URL.
openrouter_client (OpenRouterClient) – Shared OpenRouterClient used to call the embeddings API.
embedding_model (str) – Model identifier for the embeddings endpoint.
ssl_kwargs (dict | None) – Optional SSL/mTLS keyword arguments forwarded to redis.asyncio.from_url or Sentinel.
redis_sentinels (list[str] | None) – Optional list of sentinel hosts.
redis_sentinel_master (str) – The sentinel master name.
resilience_kwargs (dict | None) – Optional retry/backoff/health-check keyword arguments (see Config.redis_resilience_kwargs) so reads/writes ride through a Sentinel failover window instead of raising on the first attempt.
- Return type:
None
- async log_message(platform, channel_id, user_id, user_name, text, timestamp=None, embedding=None, defer_embedding=False, message_id='', reply_to_id='', kind='user_in', turn_summary_id='', message_key='')[source]
Log a message to Redis with an embedding and a 90-day TTL.
text is passed through strip_llm_injection_artifacts_for_cache() so prompt-only suffixes (e.g. channel semantic recall XML) are never stored or embedded. If a caller-supplied embedding was computed from unstripped text, it is dropped and recomputed from the cleaned text.
The message is stored as a Redis hash (msg:{uuid}) with the embedding as a binary FLOAT32 blob, automatically indexed by the idx:messages RediSearch index.
If you add the kind TAG field to an existing RediSearch index, run python init_redis_indexes.py (or rely on bot startup) so ALTER can add kind; otherwise recreate idx:messages.
- Parameters:
embedding (list[float] | None) – Pre-computed embedding vector. When provided, the internal embedding API call is skipped, saving a round-trip.
defer_embedding (bool) – When True and no pre-computed embedding is given, store a zero-vector placeholder instead of calling the embeddings API. The caller is responsible for enqueuing the returned message_key in an EmbeddingBatchQueue so the real embedding is written later.
platform (str)
channel_id (str)
user_id (str)
user_name (str)
text (str)
timestamp (float | None)
message_id (str)
reply_to_id (str)
kind (str)
turn_summary_id (str)
message_key (str)
- Returns:
The message object that was written to Redis (includes the embedding vector, or an empty list when deferred).
- Return type:
- async get_recent(platform, channel_id, count=50)[source]
Return the count most recent cached messages for a channel.
The primary “recent history” read used when assembling the LLM prompt and by the heartbeat/proactive/backfill paths. Runs ZREVRANGE on the per-channel channel_msgs:{platform}:{channel_id} sorted set for the newest keys, then hydrates them via _fetch_hashes(); this O(log N) path never touches the RediSearch module, so recent history stays available even if the vector index is unavailable. Results come back newest-first.
Called widely, including from prompt_context, message_processor.proactive_gates, message_processor.history_backfill, message_processor.channel_heartbeat, and build_kg.
- Parameters:
- Returns:
Up to count messages, newest first; empty when the channel has no cached messages.
- Return type:
- async get_by_timerange(platform, channel_id, start, end)[source]
Return cached messages whose timestamp falls within a range.
Time-window read over the per-channel channel_msgs:{platform}:{channel_id} sorted set. Because zset scores are the widened monotonic values from _monotonic_channel_score() (not raw seconds), bounds supplied as plain Unix seconds (heuristically, anything below 1e12) are scaled up by 1_000_000 to match. Runs a ZRANGEBYSCORE and hydrates the keys via _fetch_hashes(); results are ascending by timestamp. Exercised in tests/core/test_message_cache_ts_alignment.py.
- Parameters:
- Returns:
Matching messages ascending by timestamp; empty when none fall in the range.
- Return type:
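The bound-scaling heuristic used by get_by_timerange() amounts to a one-line conversion. A minimal sketch, assuming the cutoff and factor stated in the description; the constant and function names are hypothetical, and any extra tie-breaking done by _monotonic_channel_score() is not modelled here.

```python
SECONDS_CUTOFF = 1e12      # below this, a bound is treated as plain Unix seconds
SCORE_SCALE = 1_000_000    # factor that widens seconds into the zset score space


def to_zset_score(bound: float) -> float:
    """Scale a plain-seconds bound into score space; pass existing scores through."""
    return bound * SCORE_SCALE if bound < SECONDS_CUTOFF else bound
```

A present-day Unix time is on the order of 1e9, so it falls below the cutoff and is scaled to roughly 1e15; a value that is already a widened score is above the cutoff and is returned unchanged.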
- async get_messages_after(platform, channel_id, after_ts_exclusive, *, zset_batch=5000)[source]
Return channel messages strictly newer than a timestamp floor.
The “everything since X” read used to gather context the model has not yet seen. Paginates ZRANGEBYSCORE over the per-channel channel_msgs:{platform}:{channel_id} sorted set in zset_batch-sized windows (using Redis exclusive lower-bound syntax (score so the floor itself is excluded), then hydrates the collected keys via _fetch_hashes() in batches of 400. As with get_by_timerange(), a floor given in plain Unix seconds (below 1e12) is scaled up to a monotonic zset score; None means “from the beginning”. Called by openrouter_client.executor and exercised in tests/core/test_message_cache_ts_alignment.py.
- Parameters:
- Returns:
Matching messages ascending by timestamp; empty when none qualify.
- Return type:
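Two details of get_messages_after() lend themselves to a short illustration: the exclusive lower bound and the fixed-size hydration batches. The helpers below are hypothetical and only sketch the idea; mapping None to "-inf" is an assumption about how "from the beginning" is expressed.

```python
from typing import Iterator, Optional


def exclusive_min(floor_score: Optional[float]) -> str:
    """Build the ZRANGEBYSCORE min argument; a leading '(' makes the bound exclusive."""
    return "-inf" if floor_score is None else f"({floor_score}"


def batched(keys: list[str], size: int = 400) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` keys for hydration."""
    for start in range(0, len(keys), size):
        yield keys[start:start + size]
```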
- async update_text_by_message_id(platform, channel_id, message_id, new_text)[source]
Overwrite a cached message’s text, located by platform message id.
Supports the edited-message path: when a user edits a message on the platform, the gateway resyncs the new body here. Resolves the message’s Redis key via find_key_by_message_id() (idempotency-index lookup, falling back to a channel-zset scan) and, on a hit, HSETs the text field on that msg:* hash. The embedding is intentionally left untouched. Called by message_processor.processor on edit events.
- Parameters:
- Returns:
The Redis key that was updated, or None if no matching message was found.
- Return type:
- async find_key_by_message_id(platform, channel_id, message_id)[source]
Resolve the Redis key of a cached message by its platform message id.
Tries the fast path first (GET the idempotency index msgid:{platform}:{channel_id}:{message_id} written at log time and confirm the pointed-at hash still EXISTS), then falls back to a bounded ZREVRANGE scan (windows of 500, up to 5000 keys) of the per-channel sorted set, pipelining HGET ... message_id to find the match. Reads only; index-lookup failures are swallowed and logged at debug. Called by update_text_by_message_id(), find_keys_by_message_ids(), and tools.xray_tool.
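The bounded fallback scan in find_key_by_message_id() walks the sorted set in fixed windows. The generator below sketches how such windows could be computed; scan_windows is a hypothetical helper, and the defaults mirror the figures quoted in the description.

```python
from typing import Iterator


def scan_windows(window: int = 500, max_keys: int = 5000) -> Iterator[tuple[int, int]]:
    """Yield inclusive (start, stop) rank pairs, as ZREVRANGE expects them."""
    for start in range(0, max_keys, window):
        yield start, min(start + window, max_keys) - 1
```

With the defaults this yields ten windows, from (0, 499) through (4500, 4999); a caller would stop early as soon as a window contains the matching message_id or comes back empty.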
- async find_keys_by_message_ids(platform, channel_id, message_ids)[source]
Batch version of find_key_by_message_id().
Returns a mapping of {message_id: redis_key} for every message_id that was found in Redis. Missing IDs are omitted.
Fetches the channel zset once and builds the full lookup map in a single pipeline, instead of repeating the scan per message.
- async has_real_embedding(redis_key)[source]
Report whether a message hash holds a real (non-placeholder) embedding.
Messages logged with defer_embedding=True (or with empty text) get a zero-vector placeholder so they can be backfilled later; this lets the embedding backfill workers tell which msg:* rows still need a real vector. HGETs the embedding field over the raw, non-decode_responses Redis connection (self._redis_raw) so the binary FLOAT32 bytes are not mangled by UTF-8 decoding, then checks the blob is full length and not all zeros. Read-only. See has_real_embedding_many() for the pipelined batch form.
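The "full length and not all zeros" test in has_real_embedding() can be sketched as a pure function over the raw blob. This is an illustration under assumptions: is_real_embedding is a hypothetical name, the vector dimension is passed in because the source does not state it, and "full length" is read as exactly dim * 4 bytes.

```python
def is_real_embedding(blob, dim: int) -> bool:
    """True when blob is a full-length FLOAT32 vector with at least one non-zero byte."""
    if not isinstance(blob, (bytes, bytearray)):
        return False  # missing field, or a value that was decoded to str
    if len(blob) != dim * 4:
        return False  # FLOAT32 uses 4 bytes per dimension
    return any(blob)  # a zero-vector placeholder consists only of 0x00 bytes
```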
- async has_real_embedding_many(redis_keys)[source]
Batch-check which message hashes hold a real embedding.
The pipelined form of has_real_embedding(), used by the embedding backfill paths to filter a whole batch of keys in one Redis round trip instead of one call each. Pipelines HGET ... embedding for every key over the raw, non-decode_responses connection (self._redis_raw) so binary FLOAT32 data survives intact, then applies the same full-length-and-non-zero test per key. Read-only. Called from background_tasks and message_processor.history_backfill.
- async mark_deleted_by_message_id(platform, channel_id, message_id, deleted_at_iso)[source]
Tombstone a cached message by prefixing a deletion marker to its text.
Supports the platform delete path: when a user deletes a message, the gateway calls this so cached/recalled history shows it was removed rather than silently dropping it (the original text is kept after the marker). ZREVRANGEs the most recent 200 keys of the per-channel channel_msgs:{platform}:{channel_id} sorted set, pipelines HGET ... message_id to find the match, and, if the marker is not already present, HSETs the text field with a leading [deleted at ...] tag. Called by message_processor.processor on delete events.
- Parameters:
- Returns:
True if the message was found (and tagged, unless already tagged); False if no match was in the recent window.
- Return type:
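The idempotent tagging step of mark_deleted_by_message_id() might look like the following. This is a sketch only: tombstone is a hypothetical name, and the exact marker layout (here "[deleted at <iso>] " followed by the original text) is an assumption, since the source shows the tag only as [deleted at ...].

```python
def tombstone(text: str, deleted_at_iso: str) -> str:
    """Prefix a deletion marker, leaving already-marked text unchanged."""
    if text.startswith("[deleted at "):
        return text  # already tagged; keep the first deletion time
    return f"[deleted at {deleted_at_iso}] {text}"
```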
- async search_messages(query, limit=10, channel_id=None, platform=None, user_id=None, *, channel_ids=None, min_timestamp=None, query_embedding=None)[source]
Semantic search across cached messages using RediSearch KNN.
Generates an embedding for query unless query_embedding is provided, then runs vector similarity search filtered by optional channel_id, platform, and/or user_id.
When channel_ids is provided, the filter becomes an OR over the listed channel IDs (RediSearch @channel_id:{c1|c2|c3} syntax) and channel_id must be None. Each id is escaped individually.
When min_timestamp is set (Unix seconds), only messages with timestamp >= min_timestamp are considered. In that case platform must be set, plus either channel_id or channel_ids (strict per-channel(s) recall).
Returns a list of dicts with text, user_name, redis_key, timestamp, similarity, channel_id, and platform.
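The filter composition described for search_messages() can be sketched as a query-string builder. Everything here is illustrative: the function names, the embedding vector field, the $vec parameter, and the score alias are assumptions, and the escaping rule (backslash before every character that is not a letter, digit, or underscore) is a conservative guess at what the module's own escaping helper does.

```python
import re
from typing import Optional, Sequence


def escape_tag(value: str) -> str:
    """Backslash-escape every character that is not a letter, digit, or underscore."""
    return re.sub(r"([^A-Za-z0-9_])", r"\\\1", value)


def build_knn_query(
    limit: int,
    platform: Optional[str] = None,
    channel_ids: Optional[Sequence[str]] = None,
    min_timestamp: Optional[float] = None,
) -> str:
    """Compose an optional pre-filter and a KNN clause into one query string."""
    parts = []
    if platform:
        parts.append(f"@platform:{{{escape_tag(platform)}}}")
    if channel_ids:
        # Each id is escaped on its own, then OR-ed with '|' inside one TAG clause.
        joined = "|".join(escape_tag(c) for c in channel_ids)
        parts.append(f"@channel_id:{{{joined}}}")
    if min_timestamp is not None:
        parts.append(f"@timestamp:[{min_timestamp} +inf]")
    prefilter = f"({' '.join(parts)})" if parts else "*"
    return f"{prefilter}=>[KNN {limit} @embedding $vec AS score]"
```

Escaping each id separately matters because the pipe character is the OR separator; escaping the joined string would turn the separators into literal characters.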
- async get_messages_around_key(platform, channel_id, redis_key, before=5, after=5)[source]
Return a chronological window of messages centered on one key.
Powers semantic-recall context expansion: after a vector hit identifies a single relevant message, this widens it to the surrounding conversation so the recalled snippet reads coherently. Looks up the key’s ZRANK in the per-channel channel_msgs:{platform}:{channel_id} sorted set, ZRANGEs the rank window [rank - before, rank + after], and hydrates the keys via _fetch_hashes(). If the key is absent from the zset but its hash still EXISTS, the single message is returned; if it is gone entirely, an empty list is returned. Called by message_processor.channel_semantic_recall.
- Parameters:
- Returns:
Up to before + 1 + after messages ascending by time; possibly just the one message, or empty if the key is gone.
- Return type:
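The rank window used by get_messages_around_key() reduces to simple arithmetic. A minimal sketch with a hypothetical helper name; the clamp at zero is an assumption about the implementation, included because Redis interprets negative ZRANGE indexes as offsets from the end of the set.

```python
def rank_window(rank: int, before: int = 5, after: int = 5) -> tuple[int, int]:
    """Return inclusive (start, stop) ranks for ZRANGE, clamped at rank 0."""
    return max(0, rank - before), rank + after
```

No clamp is needed on the upper side, since ZRANGE treats a stop index past the last element as the last element.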
- async get_recent_for_user(platform, user_id, limit=20)[source]
Return a user’s most recent cached messages across all channels.
Unlike the channel-scoped reads, this finds a user’s history without knowing where they spoke, by running an FT.SEARCH over the idx:messages RediSearch index with TAG filters on platform and user_id (escaped via _escape_tag()), sorted by timestamp descending. Results are materialised through _doc_to_cached_message() (embeddings are not returned). Backs get_recent_speaker_channels() and several tools (tools.xray_tool, tools.dm_history, tools.chat_analytics, tools.gravimetric_telescope).
- Parameters:
- Returns:
The user’s messages newest-first, across every channel (including DMs).
- Return type:
- async get_recent_speaker_channels(platform, user_id, limit=10, lookback=300)[source]
Return the speaker’s most-recently-used channel IDs (MRU order).
Walks the last lookback messages this user sent on platform (newest first) and returns up to limit unique channel IDs in most-recent-first order.
Privacy-by-construction: the result only contains channels where user_id has actually sent at least one message — i.e. channels the speaker demonstrably had access to at write time. Other users’ DMs and channels the speaker only lurked in cannot leak through this function.
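The most-recently-used ordering returned by get_recent_speaker_channels() is a first-seen de-duplication over a newest-first list. The helper below is a hypothetical sketch that takes the channel IDs of the user's recent messages as input instead of querying Redis.

```python
def mru_channels(channel_ids_newest_first: list[str], limit: int = 10) -> list[str]:
    """Return up to `limit` unique channel IDs, keeping first-seen (newest) order."""
    seen: dict[str, None] = {}  # dicts preserve insertion order
    for channel_id in channel_ids_newest_first:
        if len(seen) >= limit:
            break
        seen.setdefault(channel_id, None)
    return list(seen)
```

Because the input contains only messages the user sent, a channel can appear in the result only if the user has written there, which is the privacy property described above.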
- async record_channel_metadata(platform, channel_id, *, channel_name='', guild_id='', guild_name='', is_dm=False)[source]
Upsert human-readable metadata for a channel.
Empty fields are still written so a later read can tell the difference between “channel was never seen” (HGETALL returns empty mapping) and “channel was seen but has no guild” (DM-style: guild_id/guild_name explicitly empty).
is_dm is persisted as "1"/"0" and is consulted by the cross-channel recall path to avoid ever leaking 1-on-1 DM (Discord) or 2-member room (Matrix) context into another channel’s prompt.
- async get_channel_metadata_many(platform, channel_ids)[source]
Batch-fetch channel metadata. Missing channels are omitted.
Returns {channel_id: {"channel_name": str, "guild_id": str, "guild_name": str, "is_dm": bool, "updated_at": str}}. is_dm is decoded back to a Python bool. Channels written before the is_dm field existed default to False (legacy rows are unlikely to be DMs because there is no pre-existing deploy that would have populated this cache).
- async log_thought_summary(channel_id, thought_text, timestamp=None)[source]
Store a thought summary in a per-channel Redis sorted set.
Deduplicates by comparing against the most recent entry — if it has the same content the new entry is silently skipped.
- async get_recent_thought_summaries(channel_id, limit=30)[source]
Retrieve unique thought summaries for a channel (newest first).
- async backfill_channel_indexes(batch_size=500)[source]
Rebuild the per-channel sorted-set indexes from stored message hashes.
A maintenance/migration routine for when the channel_msgs:* indexes are missing or incomplete (e.g. after introducing them) but the underlying msg:* hashes still exist. SCANs all msg:* keys in batch_size pages, pipelines HMGET ... platform channel_id timestamp to read their routing fields, then ZADDs each key into its channel_msgs:{platform}:{channel_id} sorted set, refreshes the zset TTL, and registers the channel in sg:active_channels, logging a total at the end. Idempotent: ZADD simply re-scores existing members, so it is safe to run repeatedly. No in-repo callers (operational entry point, invoked manually).
- async log_tool_call_record(*, record_id, tool_name, raw_arguments_json, result_output, success, execution_start, execution_end, duration_ms, order_index, round_number, channel_id, platform)[source]
Persist one hidden tool-call execution trace to Redis.
Part of the Stargazer-System-Log pipeline: every individual tool invocation in a turn is recorded so its arguments, output, timing, and success can be replayed or summarised later. Pipelines an HSET of the toolcall:{record_id} hash (result output capped at 50_000 chars), sets a 90-day TTL, and ZADDs the key into the per-channel tool_call_log:{platform}:{channel_id} sorted set scored by execution start. Deliberately kept out of channel_msgs:* so the prompt context builder never surfaces these raw traces; they are reached only via their summary (see log_tool_call_summary()). The turn_summary_id is left blank here and back-patched later. Called by message_processor.generate_and_send.
- Parameters:
record_id (str) – Unique id for this trace; forms the toolcall:{record_id} key.
tool_name (str) – Name of the invoked tool.
raw_arguments_json (str) – JSON-encoded tool arguments as called.
result_output (str) – The tool’s output (stored truncated to 50_000 chars).
success (bool) – Whether the call succeeded.
execution_start (float) – Unix start time; also the zset score.
execution_end (float) – Unix end time.
duration_ms (float) – Wall-clock duration in milliseconds.
order_index (int) – Position of this call within the turn.
round_number (int) – Tool-calling round the call belongs to.
channel_id (str) – Channel identifier.
platform (str) – Platform identifier (e.g. "discord").
- Returns:
The Redis key the record was stored under (toolcall:{record_id}).
- Return type:
- async log_tool_call_summary(*, summary_id, record_ids, summary_text, channel_id, platform, timestamp=None)[source]
Persist the turn-level Stargazer-System-Log summary linking its traces.
The visible counterpart to the hidden log_tool_call_record() traces: one human-readable summary of everything the tools did this turn, keyed by the assistant message it belongs to. Pipelines an HSET of the toolcall_summary:{summary_id} hash (with record_ids stored as a JSON array), sets a 90-day TTL, ZADDs it into the per-channel tool_call_summaries:{platform}:{channel_id} sorted set, and back-patches each referenced toolcall:{rid} record’s turn_summary_id so traces point back at their summary. Called by message_processor.generate_and_send and tools.librarian_tool.
- Parameters:
summary_id (str) – Unique id for the summary; forms the toolcall_summary:{summary_id} key.
record_ids (list[str]) – Ids of the toolcall:* records this summary covers.
summary_text (str) – The human-readable turn summary.
channel_id (str) – Channel identifier.
platform (str) – Platform identifier (e.g. "discord").
timestamp (float | None) – Unix timestamp and zset score; defaults to time.time().
- Returns:
The Redis key the summary was stored under (toolcall_summary:{summary_id}).
- Return type:
- async get_tool_call_records_by_summary(summary_id)[source]
Fetch all hidden tool-call records belonging to a summary.
The expansion read for the Stargazer-System-Log: given a summary id, pull back the full per-call traces it links so a user (or tool) can inspect exactly what each tool did. HGETALLs the toolcall_summary:{summary_id} hash, parses its record_ids JSON array, pipelines HGETALL for each toolcall:{rid} hash, skips expired/empty rows, and sorts the survivors by order_index. Returns empty when the summary id is unknown or has no records. Called by tools.retrieve_tool_call_log.
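The parse, filter, and sort steps of get_tool_call_records_by_summary() can be shown without Redis. In this sketch, order_records is a hypothetical name, and the fetched dict stands in for the pipelined HGETALL results keyed by Redis key; reading order_index as an integer stored in string form is an assumption.

```python
import json


def order_records(summary: dict, fetched: dict) -> list[dict]:
    """Return the summary's surviving records sorted by order_index."""
    record_ids = json.loads(summary.get("record_ids") or "[]")
    rows = [fetched.get(f"toolcall:{rid}") for rid in record_ids]
    alive = [row for row in rows if row]  # expired hashes read back as empty mappings
    return sorted(alive, key=lambda row: int(row.get("order_index", 0)))
```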
- async get_recent_tool_call_summaries(platform, channel_id, limit=10)[source]
Fetch the most recent tool-call summaries for a channel.
Feeds the context-injection and observability surfaces with a channel’s recent Stargazer-System-Log summaries. ZREVRANGEs the per-channel tool_call_summaries:{platform}:{channel_id} sorted set for the newest limit keys and pipelines HGETALL for each, dropping expired/empty rows. On a Redis read error it records a degraded-read metric via _note_context_read_degraded() and fails open with an empty list so a missing read never blocks a turn. Called by message_processor.context_injections, message_processor.generate_and_send, and web.obs.
- Parameters:
- Returns:
Summary field mappings newest-first (each with summary_id, summary_text, timestamp, and a record_ids JSON array); empty on miss or degraded read.
- Return type:
- async set_ctxbreak_ts(platform, channel_id, ts)[source]
Persist a context-break timestamp floor for a channel.
Implements the user-facing “context break” / fresh-start command: once set, every context-building pathway must drop messages whose timestamp is <= this value, so the model starts the conversation cleanly from this point. SETs the ctxbreak:{platform}:{channel_id} string key (built by _ctxbreak_key()) to the stringified timestamp. The counterpart reader is get_ctxbreak_ts(). Called by message_processor.processor.
- async get_ctxbreak_ts(platform, channel_id)[source]
Return a channel’s context-break timestamp floor, or None.
The reader paired with set_ctxbreak_ts(): every context assembly path consults this to learn the cutoff below which messages must be excluded. GETs the ctxbreak:{platform}:{channel_id} key (built by _ctxbreak_key()) and parses it to a float, returning None when unset or unparseable (i.e. no break in effect). Called by prompt_context, message_processor.proactive_gates, message_processor.generate_and_send, message_processor.history_backfill, and web.obs.
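The read side of the context-break mechanism has two parts: parsing the stored value and applying the floor. Both are sketched below with hypothetical helper names; the filter uses a strict comparison because messages at or below the floor must be dropped.

```python
from typing import Optional


def parse_ctxbreak(raw: Optional[str]) -> Optional[float]:
    """Parse the stored floor; None means no break is in effect."""
    if raw is None:
        return None
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None  # unparseable values are treated as "no break"


def apply_ctxbreak(timestamps: list[float], floor: Optional[float]) -> list[float]:
    """Drop every timestamp at or below the floor."""
    if floor is None:
        return list(timestamps)
    return [ts for ts in timestamps if ts > floor]
```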
- property redis_client: redis.asyncio.Redis
The underlying async Redis connection.
Exposed so that other subsystems (e.g. ToolContext) can share the same connection without reaching into private state.
- property redis_raw_client: redis.asyncio.Redis
The underlying async Redis connection with decode_responses=False.
Exposed so that subsystems needing raw binary payloads (e.g., RedisEventBus) can use it.
- async close()[source]
Close both underlying Redis connections held by this cache.
Releases the decoded (self._redis) and raw-binary (self._redis_raw) connection pools created in __init__(), calling aclose() on each. Invoked at shutdown / teardown so sockets and pooled connections are not leaked. Called by maintenance scripts such as build_kg.
- Return type:
- async message_cache.get_active_channels(redis, limit=10)[source]
Discover the most recently active channels across the whole cache.
A module-level utility (not a MessageCache method, so it can run against any raw Redis handle) that background workers use to decide which channels to summarise, extract knowledge from, or run heartbeats on. SCANs all channel_msgs:* keys, pipelines ZREVRANGE ... 0 0 WITHSCORES to read each channel’s newest message score, parses the platform and channel_id back out of the key name, and returns the top limit by recency. Read-only; on any failure it logs, records a degraded-read metric via _note_context_read_degraded(), and fails open with an empty list. Called by background_tasks, kg_extraction, background_agents.channel_summarizer, and background_agents.channel_heartbeat.
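The key-parsing and ranking steps of get_active_channels() can be sketched on plain data. The helper names and the (platform, channel_id) tuple return shape are assumptions; the source does not state what the real function returns per channel.

```python
def parse_channel_key(key: str) -> tuple[str, str]:
    """Split 'channel_msgs:{platform}:{channel_id}' into (platform, channel_id)."""
    # maxsplit=2 keeps any ':' inside the channel id intact.
    _, platform, channel_id = key.split(":", 2)
    return platform, channel_id


def top_active(newest_scores: dict[str, float], limit: int = 10) -> list[tuple[str, str]]:
    """Rank channel keys by their newest-message score and return the top `limit`."""
    ranked = sorted(newest_scores, key=newest_scores.get, reverse=True)
    return [parse_channel_key(key) for key in ranked[:limit]]
```

Limiting the split to two separators is a deliberate choice in this sketch: some platforms use channel identifiers that themselves contain colons, and a plain split would truncate them.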