message_cache

Redis-backed message cache with per-channel sorted-set indexes and RediSearch vector search.

Each message is stored as a Redis hash keyed by msg:{uuid} with the embedding as a binary FLOAT32 blob. A per-channel sorted set (channel_msgs:{platform}:{channel_id}) provides O(log N) time-ordered lookups by channel, while a RediSearch HNSW index (idx:messages) enables semantic KNN similarity search.

All cached messages are represented as CachedMessage objects and are given a 90-day TTL in Redis.

message_cache.strip_llm_injection_artifacts_for_cache(text)[source]

Remove prompt-injection blobs that must not be stored in the message cache.

Callers normally pass raw platform text; this guards against accidental forwarding of augmented LLM-request content into log_message().

Return type:

str

Parameters:

text (str)

class message_cache.CachedMessage(user_id, user_name, platform, channel_id, text, timestamp, embedding=<factory>, message_key='', message_id='', reply_to_id='', kind='user_in', turn_summary_id='')[source]

Bases: object

A single cached chat message and its embedding, as stored in Redis.

The in-memory representation of one row in the message cache: speaker identity, the channel/platform it belongs to, the text, a monotonic timestamp, and the FLOAT32 embedding used for semantic search. Each instance maps to a msg:{uuid} Redis hash and a membership in the per-channel channel_msgs:{platform}:{channel_id} sorted set.

Construct one from raw JSON via from_json() / from_dict(), from a Redis HGETALL mapping via from_redis_hash(), or let MessageCache build and persist one for you in MessageCache.log_message(). Serialise back out with to_dict(), to_json(), or to_redis_hash(). Constructed directly in tests and by MessageCache._fetch_hashes() and MessageCache._doc_to_cached_message().

Parameters:
user_id: str
user_name: str
platform: str
channel_id: str
text: str
timestamp: float
embedding: list[float]
message_key: str = ''
message_id: str = ''

Platform-specific message identifier (Discord message ID, Matrix event ID, etc.).

reply_to_id: str = ''

Platform-specific ID of the message this one replies to, if any.

kind: str = 'user_in'

user_in for messages from humans, assistant_out for bot replies.

turn_summary_id: str = ''

Link to the Stargazer-System-Log summary for this assistant turn.

to_dict()[source]

Serialise this message to a plain JSON-friendly dict.

Unlike to_redis_hash(), this keeps the embedding as a list of floats (not a binary blob) and omits the transient message_key, making it suitable for JSON transport and snapshotting. Pure in-memory transform with no side effects. Used by to_json() and as the general-purpose dict form consumed across the codebase wherever a message is round-tripped through JSON.

Returns:

Field/value mapping with embedding as a list of floats.

Return type:

dict[str, Any]

to_json()[source]

Serialise this message to a JSON string.

Thin wrapper that json.dumps the output of to_dict(), giving a compact string form for transport or logging. The inverse is from_json(). Pure in-memory transform with no I/O.

Returns:

The JSON-encoded message.

Return type:

str

to_redis_hash()[source]

Produce the field mapping written to this message’s Redis hash.

Renders the message into the exact shape stored under msg:{uuid}: scalars become strings, the embedding is packed to a FLOAT32 blob via _embed_to_bytes(), and None values are coerced to empty strings. An empty message_id is dropped from the mapping so a later write cannot clobber a gateway-synced id that was filled in out of band. Pure in-memory transform with no I/O. Called by MessageCache.log_message() to build the mapping handed to MessageCache._atomic_log_to_redis(), and exercised directly in tests/core/test_message_cache_sanitize.py and tests/core/test_message_id_resync.py.

Returns:

The hash field mapping, with the embedding as bytes and message_id omitted when empty.

Return type:

dict[str, str | bytes | float]
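
The shape described for to_redis_hash() can be illustrated with a minimal sketch. The helper names below are hypothetical stand-ins (the real _embed_to_bytes() is not shown in this reference), little-endian byte order is an assumption, and every non-embedding scalar is simply stringified here.

```python
import struct


def embed_to_bytes(vec: list[float]) -> bytes:
    """Pack floats as little-endian FLOAT32 (hypothetical stand-in for _embed_to_bytes())."""
    return struct.pack(f"<{len(vec)}f", *vec)


def bytes_to_embed(blob: bytes) -> list[float]:
    """Inverse of embed_to_bytes(); ignores any trailing partial float."""
    count = len(blob) // 4
    return list(struct.unpack(f"<{count}f", blob[: count * 4]))


def to_redis_hash_sketch(fields: dict) -> dict:
    """Render a field mapping into the stored shape described above."""
    out: dict = {}
    for name, value in fields.items():
        if name == "message_id" and not value:
            continue  # omitted so a later write cannot clobber a synced id
        if name == "embedding":
            out[name] = embed_to_bytes(value)
        elif value is None:
            out[name] = ""  # None is coerced to an empty string
        else:
            out[name] = str(value)
    return out
```

Each FLOAT32 occupies four bytes, so a vector of N floats produces a blob of 4 * N bytes.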

classmethod from_dict(data)[source]

Reconstruct a CachedMessage from a plain dict.

The inverse of to_dict(): coerces each field to its expected type with permissive defaults (missing keys become empty strings, zero timestamps, an empty embedding, and kind defaulting to user_in) so partial or legacy payloads still deserialise cleanly. Pure in-memory transform with no I/O. Called by from_json() and used directly by callers that already hold a decoded dict.

Parameters:

data (dict[str, Any]) – A dict produced by to_dict() (or a close equivalent).

Returns:

The reconstructed message.

Return type:

CachedMessage

classmethod from_json(raw)[source]

Reconstruct a CachedMessage from a JSON string.

The inverse of to_json(): parses the string with json.loads and delegates field coercion to from_dict(). Pure in-memory transform with no I/O.

Parameters:

raw (str) – A JSON string produced by to_json().

Returns:

The reconstructed message.

Return type:

CachedMessage

Raises:

json.JSONDecodeError – If raw is not valid JSON.

classmethod from_redis_hash(mapping, key='')[source]

Reconstruct a CachedMessage from a Redis HGETALL mapping.

The inverse of to_redis_hash(): coerces each stored field back to its native type and decodes the binary embedding via _bytes_to_embed(), but only when the raw blob looks like a real vector (bytes longer than 100 bytes) rather than a decoded-string artifact, otherwise leaving the embedding empty. The originating Redis key is preserved on the returned object as message_key. Pure in-memory transform with no I/O.

Parameters:
  • mapping (dict[str, Any]) – The raw field/value mapping returned by an HGETALL on a msg:{uuid} hash.

  • key (str) – The Redis key the hash was read from, stored on the result as message_key.

Returns:

The reconstructed message.

Return type:

CachedMessage
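
The length guard on the embedding blob might look roughly like the following. This is a sketch under assumptions: decode_embedding is a hypothetical name, the 100-byte threshold comes from the description above, and little-endian FLOAT32 is assumed.

```python
import struct

MIN_VECTOR_BYTES = 100  # blobs at or below this size are treated as artifacts


def decode_embedding(raw: object) -> list[float]:
    """Decode a FLOAT32 blob, or return [] when it does not look like a vector."""
    if not isinstance(raw, (bytes, bytearray)) or len(raw) <= MIN_VECTOR_BYTES:
        return []  # a decoded string or a short blob leaves the embedding empty
    count = len(raw) // 4
    return list(struct.unpack(f"<{count}f", bytes(raw[: count * 4])))
```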

property repr: str

Render a human-readable [time] user: text line for this message.

Formats the UTC timestamp and truncates the body to ~120 characters, producing the compact form used when injecting recalled messages into the LLM prompt or writing them to logs (distinct from __repr__(), which is the developer-facing debug form). Pure in-memory formatting with no I/O.

Returns:

A one-line [YYYY-MM-DD HH:MM:SS] user_name: preview string.

Return type:

str
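
A rough sketch of the one-line format follows. The function name preview_line and the trailing "..." on truncated text are assumptions; only the [YYYY-MM-DD HH:MM:SS] user_name: preview layout and the ~120-character limit come from the description above.

```python
from datetime import datetime, timezone


def preview_line(user_name: str, text: str, timestamp: float, max_chars: int = 120) -> str:
    """Format a Unix timestamp in UTC and truncate the body to max_chars."""
    stamp = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    # The ellipsis suffix is an assumption; the source only says "truncates".
    body = text if len(text) <= max_chars else text[:max_chars] + "..."
    return f"[{stamp}] {user_name}: {body}"
```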

__repr__()[source]

Return a developer-facing debug representation of this message.

Summarises the identifying fields plus the embedding length (rather than the full vector) so log lines and debugger output stay readable. This is the standard repr() form; the prompt/log-friendly one-liner lives in the repr property. Pure in-memory formatting with no I/O.

Returns:

A CachedMessage(...) debug string.

Return type:

str

class message_cache.MessageCache(redis_url, openrouter_client, embedding_model='google/gemini-embedding-001', ssl_kwargs=None, redis_sentinels=None, redis_sentinel_master='falkordb', resilience_kwargs=None)[source]

Bases: object

Async Redis message cache with automatic embedding generation and RediSearch-backed vector search.

Parameters:
  • redis_url (str) – Redis connection URL (e.g. "redis://localhost:6379/0").

  • openrouter_client (OpenRouterClient) – Shared OpenRouterClient used to call the embeddings API.

  • embedding_model (str) – Model identifier for the embeddings endpoint (e.g. "google/gemini-embedding-001").

  • ssl_kwargs (dict | None)

  • redis_sentinels (list[str] | None)

  • redis_sentinel_master (str)

  • resilience_kwargs (dict | None)

__init__(redis_url, openrouter_client, embedding_model='google/gemini-embedding-001', ssl_kwargs=None, redis_sentinels=None, redis_sentinel_master='falkordb', resilience_kwargs=None)[source]

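Initialize the cache and create its Redis connections.

Parameters:
  • redis_url (str) – Redis connection URL (e.g. "redis://localhost:6379/0").

  • openrouter_client (OpenRouterClient) – Shared OpenRouterClient used to call the embeddings API.

  • embedding_model (str) – Model identifier for the embeddings endpoint (e.g. "google/gemini-embedding-001").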

  • ssl_kwargs (dict | None) – Optional SSL/mTLS keyword arguments forwarded to redis.asyncio.from_url or Sentinel.

  • redis_sentinels (list[str] | None) – Optional list of sentinel hosts.

  • redis_sentinel_master (str) – The sentinel master name.

  • resilience_kwargs (dict | None) – Optional retry/backoff/health-check keyword arguments (see Config.redis_resilience_kwargs) so reads/writes ride through a Sentinel failover window instead of raising on the first attempt.

Return type:

None

async log_message(platform, channel_id, user_id, user_name, text, timestamp=None, embedding=None, defer_embedding=False, message_id='', reply_to_id='', kind='user_in', turn_summary_id='', message_key='')[source]

Log a message to Redis with an embedding and a 90-day TTL.

text is passed through strip_llm_injection_artifacts_for_cache() so prompt-only suffixes (e.g. channel semantic recall XML) are never stored or embedded. If a caller-supplied embedding was computed from unstripped text, it is dropped and recomputed from the cleaned text.

The message is stored as a Redis hash (msg:{uuid}) with the embedding as a binary FLOAT32 blob, automatically indexed by the idx:messages RediSearch index.

If you add the kind TAG field to an existing RediSearch index, run python init_redis_indexes.py (or rely on bot startup) so ALTER can add kind; otherwise recreate idx:messages.

Parameters:
  • embedding (list[float] | None) – Pre-computed embedding vector. When provided the internal embedding API call is skipped, saving a round-trip.

  • defer_embedding (bool) – When True and no pre-computed embedding is given, store a zero-vector placeholder instead of calling the embeddings API. The caller is responsible for enqueuing the returned message_key in an EmbeddingBatchQueue so the real embedding is written later.

  • platform (str)

  • channel_id (str)

  • user_id (str)

  • user_name (str)

  • text (str)

  • timestamp (float | None)

  • message_id (str)

  • reply_to_id (str)

  • kind (str)

  • turn_summary_id (str)

  • message_key (str)

Returns:

The message object that was written to Redis (includes the embedding vector, or an empty list when deferred).

Return type:

CachedMessage

async get_recent(platform, channel_id, count=50)[source]

Return the count most recent cached messages for a channel.

The primary “recent history” read used when assembling the LLM prompt and by the heartbeat/proactive/backfill paths. ZREVRANGEs the per-channel channel_msgs:{platform}:{channel_id} sorted set for the newest keys, then hydrates them via _fetch_hashes(); this O(log N) path never touches the RediSearch module, so recent history stays available even if the vector index is unavailable. Results come back newest-first.

Called widely, including from prompt_context, message_processor.proactive_gates, message_processor.history_backfill, message_processor.channel_heartbeat, and build_kg.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

  • count (int) – Maximum number of messages to return.

Returns:

Up to count messages, newest first; empty when the channel has no cached messages.

Return type:

list[CachedMessage]

async get_by_timerange(platform, channel_id, start, end)[source]

Return cached messages whose timestamp falls within a range.

Time-window read over the per-channel channel_msgs:{platform}:{channel_id} sorted set. Because zset scores are the widened monotonic values from _monotonic_channel_score() (not raw seconds), bounds supplied as plain Unix seconds (heuristically, anything below 1e12) are scaled up by 1_000_000 to match. Runs a ZRANGEBYSCORE and hydrates the keys via _fetch_hashes(); results are ascending by timestamp. Exercised in tests/core/test_message_cache_ts_alignment.py.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

  • start (float) – Inclusive lower bound, Unix seconds (or a pre-scaled zset score).

  • end (float) – Inclusive upper bound, Unix seconds (or a pre-scaled zset score).

Returns:

Matching messages ascending by timestamp; empty when none fall in the range.

Return type:

list[CachedMessage]
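
The bound-scaling heuristic can be sketched as below. to_zset_score is a hypothetical name; the 1e12 threshold and the 1_000_000 factor are taken from the description above, and the real _monotonic_channel_score() is not reproduced here.

```python
SCALE = 1_000_000        # factor between Unix seconds and zset scores
SECONDS_CEILING = 1e12   # anything below this is assumed to be plain seconds


def to_zset_score(bound: float) -> float:
    """Scale a plain Unix-seconds bound up to the zset score range."""
    return bound * SCALE if bound < SECONDS_CEILING else bound
```

A bound that is already in the widened range passes through unchanged, so callers can supply either form.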

async get_messages_after(platform, channel_id, after_ts_exclusive, *, zset_batch=5000)[source]

Return channel messages strictly newer than a timestamp floor.

The “everything since X” read used to gather context the model has not yet seen. Paginates ZRANGEBYSCORE over the per-channel channel_msgs:{platform}:{channel_id} sorted set in zset_batch-sized windows, using the Redis exclusive lower-bound syntax (a "(" prefix on the score) so the floor itself is excluded, then hydrates the collected keys via _fetch_hashes() in batches of 400. As with get_by_timerange(), a floor given in plain Unix seconds (below 1e12) is scaled up to a monotonic zset score; None means “from the beginning”. Called by openrouter_client.executor and exercised in tests/core/test_message_cache_ts_alignment.py.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

  • after_ts_exclusive (float | None) – Exclusive lower bound in Unix seconds (or a pre-scaled score); None returns all messages.

  • zset_batch (int) – Page size for each ZRANGEBYSCORE scan.

Returns:

Matching messages ascending by timestamp; empty when none qualify.

Return type:

list[CachedMessage]
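
Two pieces of this read lend themselves to a short sketch: building the exclusive lower bound and splitting the collected keys into hydration batches. Both helper names are hypothetical, and mapping None to "-inf" is an assumption about how "from the beginning" is expressed.

```python
from typing import Iterator, Optional


def exclusive_min(after_ts: Optional[float]) -> str:
    """Build a ZRANGEBYSCORE min argument that excludes the floor itself."""
    if after_ts is None:
        return "-inf"  # assumed spelling of "from the beginning"
    score = after_ts * 1_000_000 if after_ts < 1e12 else after_ts
    return f"({score!r}"  # a leading "(" makes the bound exclusive in Redis


def chunked(keys: list[str], size: int = 400) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` keys."""
    for start in range(0, len(keys), size):
        yield keys[start:start + size]
```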

async update_text_by_message_id(platform, channel_id, message_id, new_text)[source]

Overwrite a cached message’s text, located by platform message id.

Supports the edited-message path: when a user edits a message on the platform, the gateway resyncs the new body here. Resolves the message’s Redis key via find_key_by_message_id() (idempotency-index lookup, falling back to a channel-zset scan) and, on a hit, HSETs the text field on that msg:* hash. The embedding is intentionally left untouched. Called by message_processor.processor on edit events.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

  • message_id (str) – Platform-specific id of the edited message.

  • new_text (str) – The replacement text to store.

Returns:

The Redis key that was updated, or None if no matching message was found.

Return type:

str | None

async find_key_by_message_id(platform, channel_id, message_id)[source]

Resolve the Redis key of a cached message by its platform message id.

Tries the fast path first — GET the idempotency index msgid:{platform}:{channel_id}:{message_id} written at log time and confirm the pointed-at hash still EXISTS — then falls back to a bounded ZREVRANGE scan (windows of 500, up to 5000 keys) of the per-channel sorted set, pipelining HGET ... message_id to find the match. Reads only; index-lookup failures are swallowed and logged at debug. Called by update_text_by_message_id(), find_keys_by_message_ids(), and tools.xray_tool.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

  • message_id (str) – Platform-specific message id to resolve.

Returns:

The matching Redis key (e.g. "msg:abc-123"), or None if not found.

Return type:

str | None
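
As a rough illustration of the two lookup stages, the sketch below builds the idempotency index key and the index windows for the bounded fallback scan. Both function names are hypothetical; the key layout, window size, and key limit are taken from the description above.

```python
def msgid_index_key(platform: str, channel_id: str, message_id: str) -> str:
    """Key of the idempotency index entry consulted on the fast path."""
    return f"msgid:{platform}:{channel_id}:{message_id}"


def scan_windows(window: int = 500, max_keys: int = 5000) -> list[tuple[int, int]]:
    """Inclusive (start, stop) index pairs, as ZREVRANGE expects them."""
    return [
        (start, min(start + window, max_keys) - 1)
        for start in range(0, max_keys, window)
    ]
```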

async find_keys_by_message_ids(platform, channel_id, message_ids)[source]

Batch version of find_key_by_message_id().

Returns a mapping of {message_id: redis_key} for every message_id that was found in Redis. Missing IDs are omitted.

Fetches the channel zset once and builds the full lookup map in a single pipeline, instead of repeating the scan per message.

Return type:

dict[str, str]

async has_real_embedding(redis_key)[source]

Report whether a message hash holds a real (non-placeholder) embedding.

Messages logged with defer_embedding=True (or with empty text) get a zero-vector placeholder so they can be backfilled later; this lets the embedding backfill workers tell which msg:* rows still need a real vector. HGETs the embedding field over the raw, non-decode_responses Redis connection (self._redis_raw) so the binary FLOAT32 bytes are not mangled by UTF-8 decoding, then checks the blob is full length and not all zeros. Read-only. See has_real_embedding_many() for the pipelined batch form.

Parameters:

redis_key (str) – The msg:{uuid} key to inspect.

Returns:

True if the stored embedding is full-length and non-zero; False if missing, too short, or an all-zero placeholder.

Return type:

bool
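
The full-length-and-non-zero test could be sketched like this. is_real_embedding is a hypothetical name, and the vector dimension is passed in as a parameter because this reference does not state it.

```python
from typing import Optional


def is_real_embedding(blob: Optional[bytes], dim: int) -> bool:
    """True when the blob holds at least `dim` FLOAT32 values and is not all zeros."""
    expected = dim * 4  # four bytes per FLOAT32
    if blob is None or len(blob) < expected:
        return False  # missing or too short
    return any(blob)  # every byte zero means a placeholder vector
```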

async has_real_embedding_many(redis_keys)[source]

Batch-check which message hashes hold a real embedding.

The pipelined form of has_real_embedding(), used by the embedding backfill paths to filter a whole batch of keys in one Redis round trip instead of one call each. Pipelines HGET ... embedding for every key over the raw, non-decode_responses connection (self._redis_raw) so binary FLOAT32 data survives intact, then applies the same full-length-and-non-zero test per key. Read-only. Called from background_tasks and message_processor.history_backfill.

Parameters:

redis_keys (list[str]) – The msg:{uuid} keys to inspect.

Returns:

One flag per input key (positionally aligned), True where the embedding is real and False where it is missing, short, or a zero placeholder.

Return type:

list[bool]

async mark_deleted_by_message_id(platform, channel_id, message_id, deleted_at_iso)[source]

Tombstone a cached message by prefixing a deletion marker to its text.

Supports the platform delete path: when a user deletes a message, the gateway calls this so cached/recalled history shows it was removed rather than silently dropping it (the original text is kept after the marker). ZREVRANGEs the most recent 200 keys of the per-channel channel_msgs:{platform}:{channel_id} sorted set, pipelines HGET ... message_id to find the match, and — if the marker is not already present — HSETs the text field with a leading [deleted at ...] tag. Called by message_processor.processor on delete events.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

  • message_id (str) – Platform-specific id of the deleted message.

  • deleted_at_iso (str) – ISO-8601 deletion timestamp embedded in the marker.

Returns:

True if the message was found (and tagged, unless already tagged); False if no match was in the recent window.

Return type:

bool
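
The idempotent tagging step might be sketched as follows. The function name and the single space between the marker and the original text are assumptions; the [deleted at ...] prefix comes from the description above.

```python
DELETED_PREFIX = "[deleted at "


def tombstone(text: str, deleted_at_iso: str) -> str:
    """Prefix a deletion marker unless the text already carries one."""
    if text.startswith(DELETED_PREFIX):
        return text  # already tagged: leave the stored text unchanged
    return f"{DELETED_PREFIX}{deleted_at_iso}] {text}"
```

Applying the function twice yields the same result as applying it once, which matches the "if the marker is not already present" condition.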

async search_messages(query, limit=10, channel_id=None, platform=None, user_id=None, *, channel_ids=None, min_timestamp=None, query_embedding=None)[source]

Semantic search across cached messages using RediSearch KNN.

Generates an embedding for query unless query_embedding is provided, then runs vector similarity search filtered by optional channel_id, platform, and/or user_id.

When channel_ids is provided, the filter becomes an OR over the listed channel IDs (RediSearch @channel_id:{c1|c2|c3} syntax) and channel_id must be None. Each id is escaped individually.

When min_timestamp is set (Unix seconds), only messages with timestamp >= min_timestamp are considered. In that case platform must be set, plus either channel_id or channel_ids (strict per-channel(s) recall).

Returns a list of dicts with text, user_name, redis_key, timestamp, similarity, channel_id, and platform.

Return type:

list[dict[str, Any]]
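
The filter rules above can be sketched as a query-string builder. Everything here is illustrative: escape_tag stands in for the module's own escaping helper, the index field names (platform, channel_id, user_id, timestamp, embedding) are assumed to match the hash fields, and the $vec parameter and vector_score alias are placeholders.

```python
import re
from typing import Optional, Sequence


def escape_tag(value: str) -> str:
    """Backslash-escape every character that is not a letter, digit, or underscore."""
    return re.sub(r"([^A-Za-z0-9_])", r"\\\1", value)


def build_filter(
    platform: Optional[str] = None,
    channel_id: Optional[str] = None,
    user_id: Optional[str] = None,
    channel_ids: Optional[Sequence[str]] = None,
    min_timestamp: Optional[float] = None,
) -> str:
    """Build the pre-filter expression for a KNN query."""
    if channel_id is not None and channel_ids is not None:
        raise ValueError("pass channel_id or channel_ids, not both")
    if min_timestamp is not None and (
        platform is None or (channel_id is None and not channel_ids)
    ):
        raise ValueError("min_timestamp requires platform and a channel scope")
    parts = []
    if platform is not None:
        parts.append(f"@platform:{{{escape_tag(platform)}}}")
    if channel_ids:
        joined = "|".join(escape_tag(c) for c in channel_ids)  # OR over ids
        parts.append(f"@channel_id:{{{joined}}}")
    elif channel_id is not None:
        parts.append(f"@channel_id:{{{escape_tag(channel_id)}}}")
    if user_id is not None:
        parts.append(f"@user_id:{{{escape_tag(user_id)}}}")
    if min_timestamp is not None:
        parts.append(f"@timestamp:[{min_timestamp} +inf]")  # inclusive lower bound
    return " ".join(parts) if parts else "*"


def knn_query(filter_expr: str, limit: int = 10) -> str:
    """Wrap a filter in RediSearch KNN syntax."""
    prefix = "*" if filter_expr == "*" else f"({filter_expr})"
    return f"{prefix}=>[KNN {limit} @embedding $vec AS vector_score]"
```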

async get_messages_around_key(platform, channel_id, redis_key, before=5, after=5)[source]

Return a chronological window of messages centered on one key.

Powers semantic-recall context expansion: after a vector hit identifies a single relevant message, this widens it to the surrounding conversation so the recalled snippet reads coherently. Looks up the key’s ZRANK in the per-channel channel_msgs:{platform}:{channel_id} sorted set, ZRANGEs the rank window [rank - before, rank + after], and hydrates the keys via _fetch_hashes(). If the key is absent from the zset but its hash still EXISTS, the single message is returned; if it is gone entirely, an empty list is returned. Called by message_processor.channel_semantic_recall.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

  • redis_key (str) – The msg:{uuid} key to center the window on.

  • before (int) – Number of preceding messages to include.

  • after (int) – Number of following messages to include.

Returns:

Up to before + 1 + after messages ascending by time; possibly just the one message, or empty if the key is gone.

Return type:

list[CachedMessage]
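
The rank window can be computed as in the sketch below. rank_window is a hypothetical helper; clamping the start at zero is an assumption about the implementation, but it matters because a negative ZRANGE index counts from the end of the sorted set.

```python
def rank_window(rank: int, before: int = 5, after: int = 5) -> tuple[int, int]:
    """Inclusive (start, stop) ranks for a ZRANGE centered on `rank`."""
    start = max(0, rank - before)  # never let the start go negative
    stop = rank + after            # ZRANGE tolerates a stop past the last rank
    return start, stop
```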

async get_recent_for_user(platform, user_id, limit=20)[source]

Return a user’s most recent cached messages across all channels.

Unlike the channel-scoped reads, this finds a user’s history without knowing where they spoke, by running an FT.SEARCH over the idx:messages RediSearch index with TAG filters on platform and user_id (escaped via _escape_tag()), sorted by timestamp descending. Results are materialised through _doc_to_cached_message() (embeddings are not returned). Backs get_recent_speaker_channels() and several tools (tools.xray_tool, tools.dm_history, tools.chat_analytics, tools.gravimetric_telescope).

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • user_id (str) – The speaker’s platform user id.

  • limit (int) – Maximum number of messages to return.

Returns:

The user’s messages newest-first, across every channel (including DMs).

Return type:

list[CachedMessage]

async get_recent_speaker_channels(platform, user_id, limit=10, lookback=300)[source]

Return the speaker’s most-recently-used channel IDs (MRU order).

Walks the last lookback messages this user sent on platform (newest first) and returns up to limit unique channel IDs in most-recent-first order.

Privacy-by-construction: the result only contains channels where user_id has actually sent at least one message — i.e. channels the speaker demonstrably had access to at write time. Other users’ DMs and channels the speaker only lurked in cannot leak through this function.

Return type:

list[str]
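
The most-recently-used ordering can be sketched as an order-preserving de-duplication over the user's messages. mru_channels is a hypothetical name, and the input is assumed to be the channel IDs of the lookback window, newest first.

```python
from typing import Iterable


def mru_channels(channel_ids_newest_first: Iterable[str], limit: int = 10) -> list[str]:
    """Return up to `limit` unique channel IDs, keeping first-seen (newest) order."""
    seen: dict[str, None] = {}  # dicts preserve insertion order
    for channel_id in channel_ids_newest_first:
        if len(seen) >= limit:
            break
        seen.setdefault(channel_id, None)
    return list(seen)
```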

async record_channel_metadata(platform, channel_id, *, channel_name='', guild_id='', guild_name='', is_dm=False)[source]

Upsert human-readable metadata for a channel.

Empty fields are still written so a later read can tell the difference between “channel was never seen” (HGETALL returns empty mapping) and “channel was seen but has no guild” (DM-style: guild_id/guild_name explicitly empty).

is_dm is persisted as "1" / "0" and is consulted by the cross-channel recall path to avoid ever leaking 1-on-1 DM (Discord) or 2-member room (Matrix) context into another channel’s prompt.

Return type:

None

Parameters:
  • platform (str)

  • channel_id (str)

  • channel_name (str)

  • guild_id (str)

  • guild_name (str)

  • is_dm (bool)

async get_channel_metadata_many(platform, channel_ids)[source]

Batch-fetch channel metadata. Missing channels are omitted.

Returns {channel_id: {"channel_name": str, "guild_id": str, "guild_name": str, "is_dm": bool, "updated_at": str}}.

is_dm is decoded back to a Python bool. Channels written before the is_dm field existed default to False (legacy rows are unlikely to be DMs because there is no pre-existing deploy that would have populated this cache).

Return type:

dict[str, dict[str, Any]]

async log_thought_summary(channel_id, thought_text, timestamp=None)[source]

Store a thought summary in a per-channel Redis sorted set.

Deduplicates by comparing against the most recent entry — if it has the same content the new entry is silently skipped.

Parameters:
  • channel_id (str) – Platform-agnostic channel identifier.

  • thought_text (str) – The raw text extracted from <thought> tags.

  • timestamp (float | None) – Unix timestamp; defaults to time.time().

Return type:

None

async get_recent_thought_summaries(channel_id, limit=30)[source]

Retrieve unique thought summaries for a channel (newest first).

Parameters:
  • channel_id (str) – Platform-agnostic channel identifier.

  • limit (int) – Maximum number of unique summaries to return.

Returns:

Each dict has content (str) and timestamp (float).

Return type:

list[dict[str, Any]]

async backfill_channel_indexes(batch_size=500)[source]

Rebuild the per-channel sorted-set indexes from stored message hashes.

A maintenance/migration routine for when the channel_msgs:* indexes are missing or incomplete (e.g. after introducing them) but the underlying msg:* hashes still exist. SCANs all msg:* keys in batch_size pages, pipelines HMGET ... platform channel_id timestamp to read their routing fields, then ZADDs each key into its channel_msgs:{platform}:{channel_id} sorted set, refreshes the zset TTL, and registers the channel in sg:active_channels — logging a total at the end. Idempotent: ZADD simply re-scores existing members, so it is safe to run repeatedly. No in-repo callers (operational entry point, invoked manually).

Parameters:

batch_size (int) – Number of keys to scan and process per page.

Returns:

The number of messages (re)indexed.

Return type:

int

async log_tool_call_record(*, record_id, tool_name, raw_arguments_json, result_output, success, execution_start, execution_end, duration_ms, order_index, round_number, channel_id, platform)[source]

Persist one hidden tool-call execution trace to Redis.

Part of the Stargazer-System-Log pipeline: every individual tool invocation in a turn is recorded so its arguments, output, timing, and success can be replayed or summarised later. Pipelines an HSET of the toolcall:{record_id} hash (result output capped at 50_000 chars), sets a 90-day TTL, and ZADDs the key into the per-channel tool_call_log:{platform}:{channel_id} sorted set scored by execution start. Deliberately kept out of channel_msgs:* so the prompt context builder never surfaces these raw traces; they are reached only via their summary (see log_tool_call_summary()). The turn_summary_id is left blank here and back-patched later. Called by message_processor.generate_and_send.

Parameters:
  • record_id (str) – Unique id for this trace; forms the toolcall:{record_id} key.

  • tool_name (str) – Name of the invoked tool.

  • raw_arguments_json (str) – JSON-encoded tool arguments as called.

  • result_output (str) – The tool’s output (stored truncated to 50_000 chars).

  • success (bool) – Whether the call succeeded.

  • execution_start (float) – Unix start time; also the zset score.

  • execution_end (float) – Unix end time.

  • duration_ms (float) – Wall-clock duration in milliseconds.

  • order_index (int) – Position of this call within the turn.

  • round_number (int) – Tool-calling round the call belongs to.

  • channel_id (str) – Channel identifier.

  • platform (str) – Platform identifier (e.g. "discord").

Returns:

The Redis key the record was stored under (toolcall:{record_id}).

Return type:

str

async log_tool_call_summary(*, summary_id, record_ids, summary_text, channel_id, platform, timestamp=None)[source]

Persist the turn-level Stargazer-System-Log summary linking its traces.

The visible counterpart to the hidden log_tool_call_record() traces: one human-readable summary of everything the tools did this turn, keyed by the assistant message it belongs to. Pipelines an HSET of the toolcall_summary:{summary_id} hash (with record_ids stored as a JSON array), sets a 90-day TTL, ZADDs it into the per-channel tool_call_summaries:{platform}:{channel_id} sorted set, and back-patches each referenced toolcall:{rid} record’s turn_summary_id so traces point back at their summary. Called by message_processor.generate_and_send and tools.librarian_tool.

Parameters:
  • summary_id (str) – Unique id for the summary; forms the toolcall_summary:{summary_id} key.

  • record_ids (list[str]) – Ids of the toolcall:* records this summary covers.

  • summary_text (str) – The human-readable turn summary.

  • channel_id (str) – Channel identifier.

  • platform (str) – Platform identifier (e.g. "discord").

  • timestamp (float | None) – Unix timestamp and zset score; defaults to time.time().

Returns:

The Redis key the summary was stored under (toolcall_summary:{summary_id}).

Return type:

str

async get_tool_call_records_by_summary(summary_id)[source]

Fetch all hidden tool-call records belonging to a summary.

The expansion read for the Stargazer-System-Log: given a summary id, pull back the full per-call traces it links so a user (or tool) can inspect exactly what each tool did. HGETALLs the toolcall_summary:{summary_id} hash, parses its record_ids JSON array, pipelines HGETALL for each toolcall:{rid} hash, skips expired/empty rows, and sorts the survivors by order_index. Returns empty when the summary id is unknown or has no records. Called by tools.retrieve_tool_call_log.

Parameters:

summary_id (str) – The summary id whose linked records to fetch.

Returns:

The record field mappings ordered by order_index; empty if the summary or its records are missing.

Return type:

list[dict[str, str]]
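
The parse, skip, and sort steps can be sketched without Redis by treating the fetched hashes as plain dicts. order_records is a hypothetical name, record_hashes stands in for the pipelined HGETALL results keyed by record id, and returning an empty list on malformed JSON is an assumption.

```python
import json


def order_records(summary_hash: dict, record_hashes: dict) -> list[dict]:
    """Resolve a summary's record_ids and sort the surviving records by order_index."""
    try:
        record_ids = json.loads(summary_hash.get("record_ids", "[]"))
    except json.JSONDecodeError:
        return []  # assumed behaviour for a malformed record_ids field
    # Expired records come back as empty mappings and are skipped.
    rows = [record_hashes[rid] for rid in record_ids if record_hashes.get(rid)]
    return sorted(rows, key=lambda row: int(row.get("order_index", 0)))
```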

async get_recent_tool_call_summaries(platform, channel_id, limit=10)[source]

Fetch the most recent tool-call summaries for a channel.

Feeds the context-injection and observability surfaces with a channel’s recent Stargazer-System-Log summaries. ZREVRANGEs the per-channel tool_call_summaries:{platform}:{channel_id} sorted set for the newest limit keys and pipelines HGETALL for each, dropping expired/empty rows. On a Redis read error it records a degraded-read metric via _note_context_read_degraded() and fails open with an empty list so a missing read never blocks a turn. Called by message_processor.context_injections, message_processor.generate_and_send, and web.obs.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

  • limit (int) – Maximum number of summaries to return.

Returns:

Summary field mappings newest-first (each with summary_id, summary_text, timestamp, and a record_ids JSON array); empty on miss or degraded read.

Return type:

list[dict[str, str]]

async set_ctxbreak_ts(platform, channel_id, ts)[source]

Persist a context-break timestamp floor for a channel.

Implements the user-facing “context break” / fresh-start command: once set, every context-building pathway must drop messages whose timestamp is <= this value, so the model starts the conversation cleanly from this point. SETs the ctxbreak:{platform}:{channel_id} string key (built by _ctxbreak_key()) to the stringified timestamp. The counterpart reader is get_ctxbreak_ts(). Called by message_processor.processor.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

  • ts (float) – The Unix-timestamp floor; messages at or before it are excluded from context.

Return type:

None

async get_ctxbreak_ts(platform, channel_id)[source]

Return a channel’s context-break timestamp floor, or None.

The reader paired with set_ctxbreak_ts(): every context assembly path consults this to learn the cutoff below which messages must be excluded. GETs the ctxbreak:{platform}:{channel_id} key (built by _ctxbreak_key()) and parses it to a float, returning None when unset or unparseable (i.e. no break in effect). Called by prompt_context, message_processor.proactive_gates, message_processor.generate_and_send, message_processor.history_backfill, and web.obs.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord").

  • channel_id (str) – Channel identifier.

Returns:

The context-break floor in Unix seconds, or None if none is set.

Return type:

float | None
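
A minimal sketch of the read side and of how a caller might apply the floor is shown below. Both function names are hypothetical; the rule that messages at or before the floor are dropped comes from set_ctxbreak_ts().

```python
from typing import Optional


def parse_ctxbreak(raw: Optional[str]) -> Optional[float]:
    """Parse the stored string, returning None when unset or unparseable."""
    if raw is None:
        return None
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def apply_ctxbreak(timestamps: list[float], floor: Optional[float]) -> list[float]:
    """Keep only timestamps strictly newer than the floor (all when no floor is set)."""
    if floor is None:
        return list(timestamps)
    return [ts for ts in timestamps if ts > floor]
```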

property redis_client: redis.asyncio.Redis

The underlying async Redis connection.

Exposed so that other subsystems (e.g. ToolContext) can share the same connection without reaching into private state.

property redis_raw_client: redis.asyncio.Redis

The underlying async Redis connection with decode_responses=False.

Exposed so that subsystems needing raw binary payloads (e.g., RedisEventBus) can use it.

async close()[source]

Close both underlying Redis connections held by this cache.

Releases the decoded (self._redis) and raw-binary (self._redis_raw) connection pools created in __init__(), calling aclose() on each. Invoked at shutdown / teardown so sockets and pooled connections are not leaked. Called by maintenance scripts such as build_kg.

Return type:

None

async message_cache.get_active_channels(redis, limit=10)[source]

Discover the most recently active channels across the whole cache.

A module-level utility (not a MessageCache method, so it can run against any raw Redis handle) that background workers use to decide which channels to summarise, extract knowledge from, or run heartbeats on. SCANs all channel_msgs:* keys, pipelines ZREVRANGE ... 0 0 WITHSCORES to read each channel’s newest message score, parses the platform and channel_id back out of the key name, and returns the top limit by recency. Read-only; on any failure it logs, records a degraded-read metric via _note_context_read_degraded(), and fails open with an empty list. Called by background_tasks, kg_extraction, background_agents.channel_summarizer, and background_agents.channel_heartbeat.

Parameters:
  • redis (Any) – An async Redis client (decoded or raw) to scan.

  • limit (int) – Maximum number of channels to return.

Returns:

Up to limit (platform, channel_id) tuples, most-recently-active first; empty on error or when no channels exist.

Return type:

list[tuple[str, str]]
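
The key-parsing and ranking steps can be sketched without a Redis connection. Both helpers are hypothetical, and newest_scores stands in for the result of the pipelined ZREVRANGE ... 0 0 WITHSCORES reads. Splitting only on the first two colons is an assumption that keeps channel IDs containing colons (for example Matrix room IDs) intact.

```python
from typing import Optional, Union


def parse_channel_key(key: Union[str, bytes]) -> Optional[tuple[str, str]]:
    """Split channel_msgs:{platform}:{channel_id} into (platform, channel_id)."""
    if isinstance(key, bytes):
        key = key.decode()  # raw clients return bytes keys
    prefix, sep, rest = key.partition(":")
    if prefix != "channel_msgs" or not sep:
        return None
    platform, sep, channel_id = rest.partition(":")
    if not sep or not platform or not channel_id:
        return None
    return platform, channel_id


def top_active(newest_scores: dict, limit: int = 10) -> list[tuple[str, str]]:
    """Rank channels by their newest message score, most recent first."""
    ranked = sorted(newest_scores.items(), key=lambda item: item[1], reverse=True)
    out: list[tuple[str, str]] = []
    for key, _score in ranked:
        if len(out) >= limit:
            break
        parsed = parse_channel_key(key)
        if parsed is not None:
            out.append(parsed)
    return out
```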