"""Channel summarizer — periodically summarises recent channel activity.
Scans Redis for the (by default) 10 most recently active ``channel_msgs:*``
sorted sets, fetches recent messages from each, generates a per-channel
summary via an LLM call (gemini-3-flash-preview), rolls the per-channel
summaries up into a cross-channel meta-summary, and stores the results in
Redis for retrieval by ``channel_summary_tools`` and ``cross_channel_query``.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import time
from datetime import datetime, timezone
from typing import Any
logger = logging.getLogger(__name__)
# Redis key prefix for per-channel summaries; the full key is
# f"{_SUMMARY_KEY_PREFIX}{channel_id}" with no platform component.
# NOTE(review): "last1k" in the key name does not match
# _MESSAGES_PER_CHANNEL (300). The key is presumably read under this exact
# name by channel_summary_tools / cross_channel_query, so do not rename it
# without updating those readers.
_SUMMARY_KEY_PREFIX = "stargazer:last1k_summary:"
# Redis key holding the cross-channel meta-summary.
_META_KEY = "stargazer:mru_summary"
# Expiry for both summary keys, in seconds (passed as ``ex=`` to SET).
_SUMMARY_TTL = 86400 * 7  # 7 days
# Default number of newest messages pulled into each channel transcript.
_MESSAGES_PER_CHANNEL = 300
# Default cap on how many active channels one summarisation pass handles.
_MAX_CHANNELS = 10
# Model used by the dedicated OpenRouter client built in summarise_all_active.
_SUMMARY_MODEL = "gemini-3-flash-preview"
# System prompt for per-channel summaries. The transcript format described
# here must stay in sync with the line format built in summarise_channel.
_CHANNEL_SYSTEM_PROMPT = (
    "You are a concise channel analyst. Given a transcript of recent "
    "messages from a chat channel, produce a structured JSON summary with "
    "the following keys:\n"
    ' "overview": a 2-3 sentence summary of the channel\'s recent activity,\n'
    ' "key_topics": a list of the main discussion topics,\n'
    ' "active_users": a list of the most active participants,\n'
    ' "notable_discussions": a list of noteworthy exchanges or decisions,\n'
    ' "mood": a short description of the overall tone/mood.\n\n'
    "The transcript uses the following format per line:\n"
    " [ISO_TIMESTAMP] DisplayName (UserID) [Message ID: ID] [Replying to: ID] : message text\n"
    "Where:\n"
    " - ISO_TIMESTAMP is the UTC time the message was sent\n"
    " - DisplayName is the user's display name\n"
    " - UserID is the platform-specific user identifier\n"
    " - Message ID is the platform-specific message identifier\n"
    " - [Replying to: ID] is only present if the message is a reply\n\n"
    "Respond ONLY with valid JSON, no markdown fences or extra text."
)
# System prompt for the cross-channel meta-summary. Its user input is built
# in summarise_all_active from each channel's overview and key topics.
_META_SYSTEM_PROMPT = (
    "You are a concise cross-channel analyst. Given per-channel summaries, "
    "create a brief JSON meta-summary with keys:\n"
    ' "overview": 2-3 sentences on overall server activity,\n'
    ' "key_themes": common themes across channels,\n'
    ' "most_active_channels": which channels are busiest,\n'
    ' "notable_discussions": highlights worth knowing about.\n\n'
    "Respond ONLY with valid JSON, no markdown fences or extra text."
)
# ------------------------------------------------------------------
# Core logic
# ------------------------------------------------------------------
[docs]
# Maximum transcript size sent to the LLM, in characters (context budget).
_TRANSCRIPT_MAX_CHARS = 120_000

# Strong references to in-flight fire-and-forget debug-event tasks. The event
# loop keeps only weak references to tasks, so an unreferenced task can be
# garbage-collected before it finishes; each task removes itself when done.
_background_tasks: set[asyncio.Task] = set()


def _spawn_debug_event(task_name: str, **event_fields: Any) -> None:
    """Publish a ``channel_summary`` debug event as a tracked background task.

    Observability is diagnostic only. If the ``observability`` import or task
    scheduling fails, the error is logged at debug level and swallowed, so
    telemetry can never prevent a summary from being produced or stored.

    Args:
        task_name (str): Name given to the asyncio task.
        **event_fields: Keyword arguments forwarded to
            ``observability.publish_debug_event``.
    """
    try:
        from observability import publish_debug_event

        task = asyncio.create_task(
            publish_debug_event(
                "channel_summary",
                "channel_summarizer",
                **event_fields,
            ),
            name=task_name,
        )
    except Exception:
        logger.debug("Failed to schedule %s debug event", task_name, exc_info=True)
        return
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


def _build_transcript(messages: list[dict[str, Any]]) -> str:
    """Render cached message dicts into the canonical transcript text.

    Rows whose text is empty or whitespace-only are skipped. Fields come from
    Redis HMGET, which yields ``None`` for absent hash fields. Because every
    key is present in the mapping, ``dict.get`` defaults would never apply,
    so values are coalesced with ``or`` instead. A missing or unparsable
    timestamp is rendered as ``?`` rather than raising.

    When the result exceeds ``_TRANSCRIPT_MAX_CHARS``, only the newest text is
    kept, and the cut is moved forward to the next line start so the
    transcript does not begin with a partial message.

    Args:
        messages (list[dict[str, Any]]): Messages in chronological order.

    Returns:
        str: Newline-joined transcript, or ``""`` if no message has text.
    """
    lines: list[str] = []
    for m in messages:
        text = m.get("text") or ""
        if not text.strip():
            continue
        try:
            ts_str = datetime.fromtimestamp(
                float(m.get("timestamp")), tz=timezone.utc
            ).isoformat()
        except (TypeError, ValueError, OverflowError, OSError):
            # Missing (None) or malformed timestamp: keep the message.
            ts_str = "?"
        user_name = m.get("user_name") or "?"
        user_id = m.get("user_id") or "?"
        message_id = m.get("message_id") or ""
        reply_to_id = m.get("reply_to_id") or ""
        formatted = f"[{ts_str}] {user_name} ({user_id})" f" [Message ID: {message_id}]"
        if reply_to_id:
            formatted += f" [Replying to: {reply_to_id}]"
        formatted += f" : {text}"
        lines.append(formatted)

    transcript = "\n".join(lines)
    if len(transcript) > _TRANSCRIPT_MAX_CHARS:
        cut = len(transcript) - _TRANSCRIPT_MAX_CHARS
        # Only realign when the cut lands mid-line; a single over-long line
        # (no later newline) is kept as a raw tail.
        if transcript[cut - 1] != "\n":
            next_nl = transcript.find("\n", cut)
            if next_nl != -1:
                cut = next_nl + 1
        transcript = transcript[cut:]
    return transcript


async def summarise_channel(
    channel_id: str,
    platform: str,
    redis: Any,
    openrouter: Any,
    messages_limit: int = _MESSAGES_PER_CHANNEL,
) -> dict | None:
    """Summarise one channel's recent messages and cache the result in Redis.

    Builds a transcript from the channel's recent history and asks the LLM for
    a structured JSON summary (overview, key topics, active users, notable
    discussions, mood). The summary is enriched with metadata and stored under
    a per-channel key for later retrieval by the summary tools and
    cross-channel query. This is the per-channel unit of work behind
    :func:`summarise_all_active`, its only caller.

    Processing steps:

    * Loads messages via :func:`_fetch_channel_messages` (the
      ``channel_msgs:{platform}:{channel_id}`` ZSET).
    * Renders them with :func:`_build_transcript`. Empty rows are skipped and
      missing fields fall back to placeholders instead of raising. The newest
      ~120k characters are kept, cut on a line boundary.
    * Calls ``openrouter.chat`` with ``_CHANNEL_SYSTEM_PROMPT`` and no tools.
    * Parses the reply with :func:`_parse_json_response`, falling back to the
      raw text when parsing fails.
    * Stamps the summary with ``channel_id``, ``platform``, ``message_count``
      and ``generated_at``.
    * Writes it to ``{_SUMMARY_KEY_PREFIX}{channel_id}`` with a 7-day TTL.

    A debug event is published as a tracked background task on both success
    and LLM failure. A telemetry failure never blocks storing the summary.

    Args:
        channel_id (str): Platform-specific channel identifier to summarise.
        platform (str): Platform the channel belongs to (e.g. ``"discord"``).
        redis (Any): Async Redis client used to read messages and store the
            summary.
        openrouter (Any): Chat client exposing ``await chat(messages,
            tool_names=[])``.
        messages_limit (int): Maximum recent messages to pull into the
            transcript; defaults to ``_MESSAGES_PER_CHANNEL``.

    Returns:
        dict | None: The summary dict (also stored in Redis unless the write
        failed), or ``None`` when the channel has no usable messages or the
        LLM call fails.
    """
    messages = await _fetch_channel_messages(
        redis,
        platform,
        channel_id,
        messages_limit,
    )
    if not messages:
        return None

    transcript = _build_transcript(messages)
    if not transcript:
        return None

    llm_messages = [
        {"role": "system", "content": _CHANNEL_SYSTEM_PROMPT},
        {"role": "user", "content": transcript},
    ]
    t0 = time.monotonic()
    try:
        raw_response = await openrouter.chat(llm_messages, tool_names=[])
    except Exception as e:
        logger.error(
            "Channel summarisation LLM call failed for %s: %s",
            channel_id,
            e,
        )
        _spawn_debug_event(
            "obs_chan_sum_err",
            channel_id=channel_id,
            platform=platform,
            status="error",
            duration_ms=(time.monotonic() - t0) * 1000,
            preview=f"messages={len(messages)} error={str(e)[:100]}",
            payload={
                "channel_id": channel_id,
                "platform": platform,
                "message_count": len(messages),
                "error": str(e),
            },
        )
        return None

    summary = _parse_json_response(raw_response)
    if summary is None:
        # Keep the raw text so consumers still get something readable.
        summary = {"overview": raw_response, "raw": True}

    _spawn_debug_event(
        "obs_chan_sum_ok",
        channel_id=channel_id,
        platform=platform,
        status="ok",
        duration_ms=(time.monotonic() - t0) * 1000,
        llm_output=raw_response,
        preview=f"messages={len(messages)} overview='{str(summary.get('overview', ''))[:100]}'",
        payload={
            "channel_id": channel_id,
            "platform": platform,
            "message_count": len(messages),
            "overview": summary.get("overview", ""),
            "key_topics": summary.get("key_topics", []),
            "active_users": summary.get("active_users", []),
        },
    )

    summary["channel_id"] = channel_id
    summary["platform"] = platform
    summary["message_count"] = len(messages)
    summary["generated_at"] = time.time()

    key = f"{_SUMMARY_KEY_PREFIX}{channel_id}"
    try:
        await redis.set(
            key,
            json.dumps(summary, default=str),
            ex=_SUMMARY_TTL,
        )
        logger.info(
            "Stored summary for channel %s (%d messages)",
            channel_id,
            len(messages),
        )
    except Exception:
        logger.debug("Failed to store channel summary", exc_info=True)
    return summary
[docs]
# Number of channels summarised concurrently per asyncio.gather batch.
_SUMMARY_BATCH_SIZE = 4


def _as_text(value: Any) -> str:
    """Coerce an LLM-supplied summary field to display text.

    Model output is untrusted in shape, so this never raises:

    * ``None`` becomes ``""``.
    * Strings pass through unchanged. A string is not split into characters
      as ``", ".join`` would do.
    * Lists and tuples, possibly of non-strings, are comma-joined.
    * Anything else is converted with ``str()``.

    Args:
        value (Any): Raw field value from a summary dict.

    Returns:
        str: Printable text for the value.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple)):
        return ", ".join(_as_text(v) for v in value if v is not None)
    return str(value)


async def _summarise_pass(
    channels: list[tuple[str, str]],
    redis: Any,
    client: Any,
    *,
    is_retry: bool = False,
) -> tuple[dict[str, dict], list[tuple[str, str]]]:
    """Run :func:`summarise_channel` over channels in bounded batches.

    Args:
        channels (list[tuple[str, str]]): ``(platform, channel_id)`` pairs.
        redis (Any): Async Redis client passed through to each call.
        client (Any): Chat client passed through to each call.
        is_retry (bool): Selects the log wording and level for failures.

    Returns:
        tuple[dict[str, dict], list[tuple[str, str]]]: Summaries keyed by
        channel id, plus the channels whose call raised. Channels that
        returned ``None`` are in neither.
    """
    summaries: dict[str, dict] = {}
    failed: list[tuple[str, str]] = []
    for start in range(0, len(channels), _SUMMARY_BATCH_SIZE):
        batch = channels[start : start + _SUMMARY_BATCH_SIZE]
        results = await asyncio.gather(
            *(summarise_channel(cid, plat, redis, client) for plat, cid in batch),
            return_exceptions=True,
        )
        for (plat, cid), result in zip(batch, results):
            # BaseException so a child's CancelledError is never mistaken
            # for a (truthy) summary.
            if isinstance(result, BaseException):
                if is_retry:
                    logger.error(
                        "Channel summarisation retry also failed for %s:%s: %s",
                        plat,
                        cid,
                        result,
                    )
                else:
                    logger.warning(
                        "Channel summarisation failed for %s:%s (will retry): %s",
                        plat,
                        cid,
                        result,
                    )
                failed.append((plat, cid))
            elif result:
                summaries[cid] = result
    return summaries, failed


async def _store_meta_summary(
    summaries: dict[str, dict],
    redis: Any,
    client: Any,
) -> None:
    """Build, generate and cache the cross-channel meta-summary (best-effort).

    Each channel's overview and topics are combined and truncated to 60k
    characters. The result is sent with ``_META_SYSTEM_PROMPT``, and the
    parsed reply is stored at ``_META_KEY`` with ``_SUMMARY_TTL``. If parsing
    fails, the raw reply is stored under ``overview`` instead. Any failure is
    logged as a warning and never propagates.
    """
    try:
        combined = "\n\n".join(
            f"Channel {cid} ({s.get('platform', '?')}): "
            f"{_as_text(s.get('overview'))} Topics: {_as_text(s.get('key_topics'))}"
            for cid, s in summaries.items()
        )
        meta_msgs = [
            {"role": "system", "content": _META_SYSTEM_PROMPT},
            {"role": "user", "content": combined[:60_000]},
        ]
        meta_raw = await client.chat(meta_msgs, tool_names=[])
        meta = _parse_json_response(meta_raw)
        if meta is None:
            meta = {"overview": meta_raw}
        meta["generated_at"] = time.time()
        meta["channels_included"] = list(summaries.keys())
        await redis.set(
            _META_KEY,
            json.dumps(meta, default=str),
            ex=_SUMMARY_TTL,
        )
        logger.info(
            "Stored meta-summary across %d channels",
            len(summaries),
        )
    except Exception:
        logger.warning("Meta-summary failed", exc_info=True)


async def summarise_all_active(
    redis: Any,
    max_channels: int = _MAX_CHANNELS,
) -> dict[str, Any]:
    """Summarise the most recently active channels and build a meta-summary.

    This is the top-level orchestration entry point. It discovers the active
    channels and summarises them concurrently in small batches. Channels
    whose summarisation raised are retried once, also in batches. The
    per-channel overviews are then rolled up into a single cross-channel
    meta-summary cached in Redis.

    A dedicated :class:`openrouter_client.OpenRouterClient` is built on
    ``_SUMMARY_MODEL`` (no tools) from ``Config.load()`` and closed in a
    ``finally``. Channels are found via
    :func:`message_cache.get_active_channels`. Summary fields come from the
    LLM and are coerced to text with :func:`_as_text`. A malformed
    ``overview`` or ``key_topics`` therefore cannot abort the run after
    per-channel summaries have already been stored.

    Args:
        redis (Any): Async Redis client for channel discovery and storage.
        max_channels (int): Cap on how many active channels to process;
            defaults to ``_MAX_CHANNELS``.

    Returns:
        dict[str, Any]: A result dict with ``channels_processed`` (int) and
        ``summaries`` (a mapping of channel id to an overview string truncated
        to 200 characters).
    """
    # Build a dedicated client using gemini-3-flash-preview
    from openrouter_client import OpenRouterClient
    from tools import ToolRegistry
    from config import Config

    cfg = Config.load()
    summary_client = OpenRouterClient(
        api_key=cfg.api_key,
        model=_SUMMARY_MODEL,
        base_url=cfg.llm_base_url,
        tool_registry=ToolRegistry(),  # no tools
        max_tool_rounds=1,
        top_p=cfg.top_p,
        http_connect_timeout=cfg.openrouter_http_connect_timeout_seconds,
        http_read_timeout=cfg.openrouter_http_read_timeout_seconds,
        http_write_timeout=cfg.openrouter_http_write_timeout_seconds,
        http_pool_timeout=cfg.openrouter_http_pool_timeout_seconds,
    )
    try:
        from message_cache import get_active_channels

        channels = await get_active_channels(redis, max_channels)
        if not channels:
            logger.info("No active channels found for summarisation")
            return {"channels_processed": 0, "summaries": {}}
        logger.info(
            "Channel summarisation starting for %d channel(s)",
            len(channels),
        )

        summaries, failed = await _summarise_pass(channels, redis, summary_client)

        # Retry failed channels once
        if failed:
            logger.info("Retrying %d failed channel summarisation(s)", len(failed))
            retried, _ = await _summarise_pass(
                failed, redis, summary_client, is_retry=True
            )
            summaries.update(retried)

        # Build a meta-summary across all channels
        if summaries:
            await _store_meta_summary(summaries, redis, summary_client)

        logger.info(
            "Channel summarisation complete: %d/%d channels processed",
            len(summaries),
            len(channels),
        )
        return {
            "channels_processed": len(summaries),
            "summaries": {
                cid: _as_text(s.get("overview"))[:200]
                for cid, s in summaries.items()
            },
        }
    finally:
        await summary_client.close()
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _parse_json_response(text: str) -> dict | None:
    """Best-effort parse of a JSON object out of a raw LLM response.

    Tolerates the common ways a model wraps its JSON, so callers can request
    "JSON only" and still cope when the model adds extra formatting:

    * Surrounding whitespace.
    * Markdown code fences. An opening fence may carry an info string such as
      ````` ```json ````` on its own line, or the JSON may start right after
      the backticks.
    * Chatty prose around the object. If the direct parse fails, the
      outermost ``{...}`` span of the reply is tried once.

    Returns ``None`` when nothing parses, when the decoded value is not a
    JSON object (e.g. a top-level array), or when *text* is not a string.
    Callers then fall back to storing the raw text. The function is pure and
    side-effect free. Called by :func:`summarise_channel` and
    :func:`summarise_all_active`.

    Args:
        text (str): The raw model output to parse.

    Returns:
        dict | None: The decoded object, or ``None`` if no JSON object could
        be extracted.
    """
    if not isinstance(text, str):
        return None
    raw = text.strip()
    candidate = raw
    if candidate.startswith("```"):
        candidate = candidate[3:]
        first_line, sep, rest = candidate.partition("\n")
        # Drop the fence's info-string line ("json", ""), but not a first
        # line that already holds the start of the JSON value.
        if sep and not first_line.lstrip().startswith(("{", "[")):
            candidate = rest
        candidate = candidate.strip()
        if candidate.endswith("```"):
            candidate = candidate[:-3].strip()
    try:
        result = json.loads(candidate)
    except (json.JSONDecodeError, ValueError):
        start, end = raw.find("{"), raw.rfind("}")
        if start == -1 or end <= start:
            return None
        try:
            result = json.loads(raw[start : end + 1])
        except (json.JSONDecodeError, ValueError):
            return None
    return result if isinstance(result, dict) else None
async def _fetch_channel_messages(
    redis: Any,
    platform: str,
    channel_id: str,
    limit: int,
) -> list[dict[str, Any]]:
    """Read a channel's recent messages from Redis in chronological order.

    Pulls up to ``limit`` of the newest messages for a channel and returns them
    oldest-first so the caller can lay out a readable transcript. A
    non-positive ``limit`` returns an empty list. Any Redis error is logged at
    debug level and yields an empty list rather than raising, so a single bad
    channel cannot abort a summarisation batch.

    Reads the newest member keys from the ``channel_msgs:{platform}:{channel_id}``
    ZSET via ``ZREVRANGE``. It then pipelines an ``HMGET`` per key to fetch
    the ``user_name``, ``text``, ``timestamp``, ``user_id``, ``message_id`` and
    ``reply_to_id`` fields. Rows where every field is missing are skipped, and
    the result is reversed into chronological order. Individual fields may
    still be ``None`` (HMGET returns nil for absent fields). Called by
    :func:`summarise_channel`, its only caller in this module.

    Args:
        redis (Any): Async Redis client used for the ZSET and hash reads.
        platform (str): Platform the channel belongs to.
        channel_id (str): Platform-specific channel identifier.
        limit (int): Maximum number of recent messages to fetch.

    Returns:
        list[dict[str, Any]]: Per-message field dicts in chronological order
        (oldest first). Empty if there are no messages, ``limit <= 0``, or a
        Redis error occurs.
    """
    # ZREVRANGE's stop index is inclusive and negative values count from the
    # end, so limit <= 0 would make ``limit - 1`` fetch (nearly) the whole set.
    if limit <= 0:
        return []
    zset_key = f"channel_msgs:{platform}:{channel_id}"
    try:
        # Get the most recent message keys
        msg_keys: list[str] = await redis.zrevrange(zset_key, 0, limit - 1)
        if not msg_keys:
            return []
        # Pipeline HMGET for the fields we need
        fields = (
            "user_name",
            "text",
            "timestamp",
            "user_id",
            "message_id",
            "reply_to_id",
        )
        pipe = redis.pipeline()
        for key in msg_keys:
            pipe.hmget(key, *fields)
        results = await pipe.execute()
        messages: list[dict[str, Any]] = [
            dict(zip(fields, values))
            for values in results
            if values and any(v is not None for v in values)
        ]
        # Reverse to chronological order (oldest first)
        messages.reverse()
        return messages
    except Exception:
        logger.debug(
            "Failed to fetch messages for channel %s:%s",
            platform,
            channel_id,
            exc_info=True,
        )
        return []
# _get_active_channels moved to message_cache.get_active_channels