"""Channel summarizer — periodically summarises recent channel activity.
Scans Redis for the 10 most-recently-used ``channel_msgs:*`` sorted sets,
fetches recent messages, generates a per-channel summary via an LLM call
(gemini-3-flash-preview), and stores the result in Redis for retrieval by
``channel_summary_tools`` and ``cross_channel_query``.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from datetime import datetime, timezone
from typing import Any
logger = logging.getLogger(__name__)

# Redis key prefix for per-channel summaries; the channel id is appended.
_SUMMARY_KEY_PREFIX = "stargazer:last1k_summary:"
# Redis key holding the single cross-channel meta-summary.
_META_KEY = "stargazer:mru_summary"
# Lifetime of stored summaries in Redis.
_SUMMARY_TTL = 86400 * 7  # 7 days
# Maximum number of recent messages fetched per channel.
_MESSAGES_PER_CHANNEL = 300
# Maximum number of most-recently-active channels summarised per run.
_MAX_CHANNELS = 10
# Model used for both per-channel and meta summaries.
_SUMMARY_MODEL = "gemini-3-flash-preview"

# System prompt for the per-channel summary call. The transcript format
# documented here must match what summarise_channel() emits; the model is
# instructed to reply with bare JSON (no fences), though
# _parse_json_response() still strips fences defensively.
_CHANNEL_SYSTEM_PROMPT = (
    "You are a concise channel analyst. Given a transcript of recent "
    "messages from a chat channel, produce a structured JSON summary with "
    "the following keys:\n"
    ' "overview": a 2-3 sentence summary of the channel\'s recent activity,\n'
    ' "key_topics": a list of the main discussion topics,\n'
    ' "active_users": a list of the most active participants,\n'
    ' "notable_discussions": a list of noteworthy exchanges or decisions,\n'
    ' "mood": a short description of the overall tone/mood.\n\n'
    "The transcript uses the following format per line:\n"
    " [ISO_TIMESTAMP] DisplayName (UserID) [Message ID: ID] [Replying to: ID] : message text\n"
    "Where:\n"
    " - ISO_TIMESTAMP is the UTC time the message was sent\n"
    " - DisplayName is the user's display name\n"
    " - UserID is the platform-specific user identifier\n"
    " - Message ID is the platform-specific message identifier\n"
    " - [Replying to: ID] is only present if the message is a reply\n\n"
    "Respond ONLY with valid JSON, no markdown fences or extra text."
)

# System prompt for the cross-channel meta-summary built from the
# individual channel summaries in summarise_all_active().
_META_SYSTEM_PROMPT = (
    "You are a concise cross-channel analyst. Given per-channel summaries, "
    "create a brief JSON meta-summary with keys:\n"
    ' "overview": 2-3 sentences on overall server activity,\n'
    ' "key_themes": common themes across channels,\n'
    ' "most_active_channels": which channels are busiest,\n'
    ' "notable_discussions": highlights worth knowing about.\n\n'
    "Respond ONLY with valid JSON, no markdown fences or extra text."
)
# ------------------------------------------------------------------
# Core logic
# ------------------------------------------------------------------
async def summarise_channel(
    channel_id: str,
    platform: str,
    redis: Any,
    openrouter: Any,
    messages_limit: int = _MESSAGES_PER_CHANNEL,
) -> dict | None:
    """Summarise recent messages for a single channel.

    Fetches up to ``messages_limit`` recent messages, renders them into the
    transcript format documented in ``_CHANNEL_SYSTEM_PROMPT``, asks the LLM
    for a JSON summary, and stores the result under
    ``_SUMMARY_KEY_PREFIX + channel_id`` with a ``_SUMMARY_TTL`` expiry.

    Args:
        channel_id: Platform-specific channel identifier.
        platform: Platform name (used to locate the message ZSET).
        redis: Async Redis client (``set`` is used for storage).
        openrouter: LLM client exposing ``chat(messages, tool_names=...)``.
        messages_limit: Maximum number of messages to fetch.

    Returns:
        The summary dict (parsed JSON plus bookkeeping fields), or ``None``
        when there is nothing to summarise or the LLM call fails.
    """
    messages = await _fetch_channel_messages(
        redis, platform, channel_id, messages_limit,
    )
    if not messages:
        return None

    # Build a transcript using the canonical bot message format.
    lines: list[str] = []
    for m in messages:
        # HMGET yields None for fields absent from the hash, and
        # ``m.get(key, default)`` returns that None (key present) rather
        # than the default — coalesce every value before use.
        text = m.get("text") or ""
        if not text.strip():
            continue
        ts = m.get("timestamp") or 0
        try:
            ts_val = float(ts)
        except (TypeError, ValueError):
            # Malformed timestamp: fall back to the epoch rather than
            # aborting the whole channel.
            ts_val = 0.0
        ts_str = datetime.fromtimestamp(ts_val, tz=timezone.utc).isoformat()
        user_name = m.get("user_name") or "?"
        user_id = m.get("user_id") or "?"
        message_id = m.get("message_id") or ""
        reply_to_id = m.get("reply_to_id") or ""
        formatted = (
            f"[{ts_str}] {user_name} ({user_id})"
            f" [Message ID: {message_id}]"
        )
        if reply_to_id:
            formatted += f" [Replying to: {reply_to_id}]"
        formatted += f" : {text}"
        lines.append(formatted)
    if not lines:
        return None

    transcript = "\n".join(lines)
    # Cap at ~120k chars, keeping the most recent tail, to stay within
    # the model's context limits.
    if len(transcript) > 120_000:
        transcript = transcript[-120_000:]

    llm_messages = [
        {"role": "system", "content": _CHANNEL_SYSTEM_PROMPT},
        {"role": "user", "content": transcript},
    ]
    try:
        raw_response = await openrouter.chat(llm_messages, tool_names=[])
    except Exception as e:
        logger.error(
            "Channel summarisation LLM call failed for %s: %s",
            channel_id, e,
        )
        return None

    # Parse the JSON response; on failure keep the raw text so the
    # result is still usable downstream.
    summary = _parse_json_response(raw_response)
    if summary is None:
        summary = {"overview": raw_response, "raw": True}
    summary["channel_id"] = channel_id
    summary["platform"] = platform
    summary["message_count"] = len(messages)
    summary["generated_at"] = time.time()

    # Persist in Redis. Storage failure is non-fatal: the summary is
    # still returned to the caller.
    key = f"{_SUMMARY_KEY_PREFIX}{channel_id}"
    try:
        await redis.set(
            key, json.dumps(summary, default=str), ex=_SUMMARY_TTL,
        )
        logger.info(
            "Stored summary for channel %s (%d messages)",
            channel_id, len(messages),
        )
    except Exception:
        logger.debug("Failed to store channel summary", exc_info=True)
    return summary
async def summarise_all_active(
    redis: Any,
    max_channels: int = _MAX_CHANNELS,
) -> dict[str, Any]:
    """Summarise the most recently active channels.

    Finds channels via ``message_cache.get_active_channels``, summarises
    each in small concurrent batches (with one retry pass for failures),
    then builds and stores a cross-channel meta-summary under ``_META_KEY``.

    Args:
        redis: Async Redis client shared with the rest of the bot.
        max_channels: Upper bound on the number of channels to process.

    Returns:
        Dict with ``channels_processed`` (int) and ``summaries`` (mapping
        of channel id to a truncated overview string).
    """
    # Build a dedicated client on the summary model. Imported lazily to
    # avoid import cycles; no tools are registered because summarisation
    # is a single plain completion.
    from openrouter_client import OpenRouterClient
    from tools import ToolRegistry
    from config import Config

    cfg = Config.load()
    summary_client = OpenRouterClient(
        api_key=cfg.api_key,
        model=_SUMMARY_MODEL,
        base_url=cfg.llm_base_url,
        tool_registry=ToolRegistry(),  # no tools
        max_tool_rounds=1,
    )
    try:
        from message_cache import get_active_channels

        channels = await get_active_channels(redis, max_channels)
        if not channels:
            logger.info("No active channels found for summarisation")
            return {"channels_processed": 0, "summaries": {}}
        logger.info(
            "Channel summarisation starting for %d channel(s)",
            len(channels),
        )

        # NOTE(review): summaries are keyed by channel id only, so two
        # channels with the same id on different platforms would collide —
        # confirm ids are globally unique across platforms.
        summaries: dict[str, dict] = {}
        failed: list[tuple[str, str]] = []  # (platform, channel_id)

        # Process in small batches to bound concurrent LLM calls.
        _BATCH_SIZE = 4
        for i in range(0, len(channels), _BATCH_SIZE):
            batch = channels[i : i + _BATCH_SIZE]
            results = await asyncio.gather(
                *(
                    summarise_channel(cid, plat, redis, summary_client)
                    for plat, cid in batch
                ),
                return_exceptions=True,
            )
            for (plat, cid), result in zip(batch, results):
                if isinstance(result, Exception):
                    logger.warning(
                        "Channel summarisation failed for %s:%s (will retry): %s",
                        plat, cid, result,
                    )
                    failed.append((plat, cid))
                elif result:
                    summaries[cid] = result

        # Retry failed channels once.
        if failed:
            logger.info("Retrying %d failed channel summarisation(s)", len(failed))
            retry_results = await asyncio.gather(
                *(
                    summarise_channel(cid, plat, redis, summary_client)
                    for plat, cid in failed
                ),
                return_exceptions=True,
            )
            for (plat, cid), result in zip(failed, retry_results):
                if isinstance(result, Exception):
                    logger.error(
                        "Channel summarisation retry also failed for %s:%s: %s",
                        plat, cid, result,
                    )
                elif result:
                    summaries[cid] = result

        # Build a meta-summary across all channels.
        if summaries:
            combined_parts: list[str] = []
            for cid, s in summaries.items():
                overview = s.get("overview", "")
                # The LLM occasionally returns key_topics as null, a bare
                # string, or a list of non-strings; normalise defensively
                # so one malformed summary cannot abort the whole run
                # (the original join raised TypeError here).
                raw_topics = s.get("key_topics") or []
                if isinstance(raw_topics, str):
                    topics = raw_topics
                else:
                    topics = ", ".join(str(t) for t in raw_topics)
                combined_parts.append(
                    f"Channel {cid} ({s.get('platform', '?')}): "
                    f"{overview} Topics: {topics}"
                )
            combined = "\n\n".join(combined_parts)
            meta_msgs = [
                {"role": "system", "content": _META_SYSTEM_PROMPT},
                {"role": "user", "content": combined[:60_000]},
            ]
            try:
                meta_raw = await summary_client.chat(meta_msgs, tool_names=[])
                meta = _parse_json_response(meta_raw)
                if meta is None:
                    # Fall back to the raw text so something is stored.
                    meta = {"overview": meta_raw}
                meta["generated_at"] = time.time()
                meta["channels_included"] = list(summaries.keys())
                await redis.set(
                    _META_KEY,
                    json.dumps(meta, default=str),
                    ex=_SUMMARY_TTL,
                )
                logger.info(
                    "Stored meta-summary across %d channels",
                    len(summaries),
                )
            except Exception:
                # Meta-summary is best-effort; per-channel results stand.
                logger.debug("Meta-summary failed", exc_info=True)

        logger.info(
            "Channel summarisation complete: %d/%d channels processed",
            len(summaries), len(channels),
        )
        return {
            "channels_processed": len(summaries),
            "summaries": {
                cid: s.get("overview", "")[:200]
                for cid, s in summaries.items()
            },
        }
    finally:
        # Always release the dedicated client's connections.
        await summary_client.close()
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _parse_json_response(text: str) -> dict | None:
"""Try to parse a JSON object from an LLM response."""
text = text.strip()
# Strip markdown fences if present
if text.startswith("```"):
first_nl = text.index("\n") if "\n" in text else 3
text = text[first_nl + 1:]
if text.endswith("```"):
text = text[:-3].strip()
try:
result = json.loads(text)
if isinstance(result, dict):
return result
except json.JSONDecodeError:
pass
return None
async def _fetch_channel_messages(
redis: Any,
platform: str,
channel_id: str,
limit: int,
) -> list[dict[str, Any]]:
"""Fetch recent messages from the channel_msgs ZSET."""
zset_key = f"channel_msgs:{platform}:{channel_id}"
try:
# Get the most recent message keys
msg_keys: list[str] = await redis.zrevrange(zset_key, 0, limit - 1)
if not msg_keys:
return []
# Pipeline HMGET for the fields we need
fields = (
"user_name", "text", "timestamp", "user_id",
"message_id", "reply_to_id",
)
pipe = redis.pipeline()
for key in msg_keys:
pipe.hmget(key, *fields)
results = await pipe.execute()
messages: list[dict[str, Any]] = []
for values in results:
if not values or all(v is None for v in values):
continue
mapping = dict(zip(fields, values))
messages.append(mapping)
# Reverse to chronological order (oldest first)
messages.reverse()
return messages
except Exception:
logger.debug(
"Failed to fetch messages for channel %s:%s",
platform, channel_id, exc_info=True,
)
return []
# _get_active_channels moved to message_cache.get_active_channels