# Source code for background_agents.channel_summarizer

"""Channel summarizer — periodically summarises recent channel activity.

Scans Redis for the 10 most-recently-used ``channel_msgs:*`` sorted sets,
fetches recent messages, generates a per-channel summary via an LLM call
(gemini-3-flash-preview), and stores the result in Redis for retrieval by
``channel_summary_tools`` and ``cross_channel_query``.
"""

from __future__ import annotations

import asyncio
import json
import logging
import time
from datetime import datetime, timezone
from typing import Any

logger = logging.getLogger(__name__)

# Redis key prefix for per-channel summaries; full key is prefix + channel_id.
_SUMMARY_KEY_PREFIX = "stargazer:last1k_summary:"
# Redis key holding the single cross-channel meta-summary.
_META_KEY = "stargazer:mru_summary"
_SUMMARY_TTL = 86400 * 7  # 7 days
# How many recent messages to fetch from each channel's ZSET.
_MESSAGES_PER_CHANNEL = 300
# Maximum number of recently-active channels summarised per run.
_MAX_CHANNELS = 10
# Model used by the dedicated summarisation client.
_SUMMARY_MODEL = "gemini-3-flash-preview"

# System prompt for the per-channel summary call.  The model must reply
# with bare JSON (no markdown fences) containing the listed keys; the
# transcript line format described here must match what
# summarise_channel() renders.
_CHANNEL_SYSTEM_PROMPT = (
    "You are a concise channel analyst. Given a transcript of recent "
    "messages from a chat channel, produce a structured JSON summary with "
    "the following keys:\n"
    '  "overview": a 2-3 sentence summary of the channel\'s recent activity,\n'
    '  "key_topics": a list of the main discussion topics,\n'
    '  "active_users": a list of the most active participants,\n'
    '  "notable_discussions": a list of noteworthy exchanges or decisions,\n'
    '  "mood": a short description of the overall tone/mood.\n\n'
    "The transcript uses the following format per line:\n"
    "  [ISO_TIMESTAMP] DisplayName (UserID) [Message ID: ID] [Replying to: ID] : message text\n"
    "Where:\n"
    "  - ISO_TIMESTAMP is the UTC time the message was sent\n"
    "  - DisplayName is the user's display name\n"
    "  - UserID is the platform-specific user identifier\n"
    "  - Message ID is the platform-specific message identifier\n"
    "  - [Replying to: ID] is only present if the message is a reply\n\n"
    "Respond ONLY with valid JSON, no markdown fences or extra text."
)

# System prompt for the cross-channel meta-summary built from the
# per-channel summaries; same bare-JSON reply contract as above.
_META_SYSTEM_PROMPT = (
    "You are a concise cross-channel analyst. Given per-channel summaries, "
    "create a brief JSON meta-summary with keys:\n"
    '  "overview": 2-3 sentences on overall server activity,\n'
    '  "key_themes": common themes across channels,\n'
    '  "most_active_channels": which channels are busiest,\n'
    '  "notable_discussions": highlights worth knowing about.\n\n'
    "Respond ONLY with valid JSON, no markdown fences or extra text."
)


# ------------------------------------------------------------------
# Core logic
# ------------------------------------------------------------------

async def summarise_channel(
    channel_id: str,
    platform: str,
    redis: Any,
    openrouter: Any,
    messages_limit: int = _MESSAGES_PER_CHANNEL,
) -> dict | None:
    """Summarise recent messages for a single channel.

    Fetches up to ``messages_limit`` recent messages from Redis, renders
    them as a transcript in the canonical bot format, asks the LLM for a
    structured JSON summary, and stores the result in Redis under
    ``_SUMMARY_KEY_PREFIX + channel_id`` with a TTL of ``_SUMMARY_TTL``.

    Args:
        channel_id: Platform-specific channel identifier.
        platform: Platform name used in the ``channel_msgs`` key namespace.
        redis: Async Redis client.
        openrouter: LLM client exposing ``await chat(messages, tool_names=...)``.
        messages_limit: Maximum number of recent messages to fetch.

    Returns:
        The summary dict (parsed JSON, or a raw-text fallback carrying
        ``"raw": True``), or ``None`` when there is nothing to summarise
        or the LLM call fails.
    """
    messages = await _fetch_channel_messages(
        redis, platform, channel_id, messages_limit,
    )
    if not messages:
        return None

    # Build a transcript using the canonical bot message format.
    lines: list[str] = []
    for m in messages:
        # HMGET can return None for individual missing fields, so
        # coalesce with ``or`` before use — ``m.get(key, default)``
        # still yields None when the key is present with a None value,
        # which previously crashed float()/str.strip() below.
        ts = m.get("timestamp") or 0
        dt = datetime.fromtimestamp(float(ts), tz=timezone.utc)
        ts_str = dt.isoformat()
        user_name = m.get("user_name") or "?"
        user_id = m.get("user_id") or "?"
        text = m.get("text") or ""
        message_id = m.get("message_id") or ""
        reply_to_id = m.get("reply_to_id") or ""
        if not text.strip():
            continue
        formatted = (
            f"[{ts_str}] {user_name} ({user_id})"
            f" [Message ID: {message_id}]"
        )
        if reply_to_id:
            formatted += f" [Replying to: {reply_to_id}]"
        formatted += f" : {text}"
        lines.append(formatted)

    if not lines:
        return None

    transcript = "\n".join(lines)
    # Cap at ~120k chars to stay within context limits; keep the most
    # recent tail of the transcript.
    if len(transcript) > 120_000:
        transcript = transcript[-120_000:]

    llm_messages = [
        {"role": "system", "content": _CHANNEL_SYSTEM_PROMPT},
        {"role": "user", "content": transcript},
    ]
    try:
        raw_response = await openrouter.chat(llm_messages, tool_names=[])
    except Exception as e:
        logger.error(
            "Channel summarisation LLM call failed for %s: %s",
            channel_id,
            e,
        )
        return None

    # Parse the JSON response; fall back to storing the raw text so the
    # run still produces a retrievable summary.
    summary = _parse_json_response(raw_response)
    if summary is None:
        summary = {"overview": raw_response, "raw": True}

    summary["channel_id"] = channel_id
    summary["platform"] = platform
    summary["message_count"] = len(messages)
    summary["generated_at"] = time.time()

    # Store in Redis (best-effort: a storage failure still returns the
    # in-memory summary to the caller).
    key = f"{_SUMMARY_KEY_PREFIX}{channel_id}"
    try:
        await redis.set(
            key,
            json.dumps(summary, default=str),
            ex=_SUMMARY_TTL,
        )
        logger.info(
            "Stored summary for channel %s (%d messages)",
            channel_id,
            len(messages),
        )
    except Exception:
        logger.debug("Failed to store channel summary", exc_info=True)

    return summary
async def _gather_summaries(
    pairs: list[tuple[str, str]],
    redis: Any,
    client: Any,
) -> list[tuple[tuple[str, str], Any]]:
    """Run ``summarise_channel`` concurrently for (platform, channel_id) pairs.

    Returns ``[((platform, channel_id), result), ...]`` where ``result``
    is a summary dict, ``None``, or a raised exception (the gather uses
    ``return_exceptions=True`` so one failure cannot sink the batch).
    """
    results = await asyncio.gather(
        *(
            summarise_channel(cid, plat, redis, client)
            for plat, cid in pairs
        ),
        return_exceptions=True,
    )
    return list(zip(pairs, results))


async def summarise_all_active(
    redis: Any,
    max_channels: int = _MAX_CHANNELS,
) -> dict[str, Any]:
    """Summarise the most recently active channels.

    Finds channels by scanning ``channel_msgs:*`` sorted sets (via
    ``message_cache.get_active_channels``), summarises each in batches,
    retries failures once, then builds a cross-channel meta-summary.

    Args:
        redis: Async Redis client.
        max_channels: Upper bound on channels to process this run.

    Returns:
        A dict with ``channels_processed`` (count) and ``summaries``
        (channel_id -> first 200 chars of that channel's overview).
    """
    # Build a dedicated client using gemini-3-flash-preview; imported
    # lazily to avoid import cycles at module load.
    from openrouter_client import OpenRouterClient
    from tools import ToolRegistry
    from config import Config

    cfg = Config.load()
    summary_client = OpenRouterClient(
        api_key=cfg.api_key,
        model=_SUMMARY_MODEL,
        base_url=cfg.llm_base_url,
        tool_registry=ToolRegistry(),  # no tools
        max_tool_rounds=1,
    )
    try:
        from message_cache import get_active_channels

        channels = await get_active_channels(redis, max_channels)
        if not channels:
            logger.info("No active channels found for summarisation")
            return {"channels_processed": 0, "summaries": {}}

        logger.info(
            "Channel summarisation starting for %d channel(s)",
            len(channels),
        )

        summaries: dict[str, dict] = {}
        failed: list[tuple[str, str]] = []  # (platform, channel_id)

        # First pass: bounded concurrency in batches of 4.
        _BATCH_SIZE = 4
        for i in range(0, len(channels), _BATCH_SIZE):
            batch = channels[i : i + _BATCH_SIZE]
            for (plat, cid), result in await _gather_summaries(
                batch, redis, summary_client
            ):
                if isinstance(result, Exception):
                    logger.warning(
                        "Channel summarisation failed for %s:%s (will retry): %s",
                        plat,
                        cid,
                        result,
                    )
                    failed.append((plat, cid))
                elif result:
                    summaries[cid] = result

        # Retry failed channels once.
        if failed:
            logger.info("Retrying %d failed channel summarisation(s)", len(failed))
            for (plat, cid), result in await _gather_summaries(
                failed, redis, summary_client
            ):
                if isinstance(result, Exception):
                    logger.error(
                        "Channel summarisation retry also failed for %s:%s: %s",
                        plat,
                        cid,
                        result,
                    )
                elif result:
                    summaries[cid] = result

        # Build a meta-summary across all channels (best-effort).
        if summaries:
            combined_parts: list[str] = []
            for cid, s in summaries.items():
                overview = s.get("overview", "")
                topics = ", ".join(s.get("key_topics", []))
                combined_parts.append(
                    f"Channel {cid} ({s.get('platform', '?')}): "
                    f"{overview} Topics: {topics}"
                )
            combined = "\n\n".join(combined_parts)
            meta_msgs = [
                {"role": "system", "content": _META_SYSTEM_PROMPT},
                {"role": "user", "content": combined[:60_000]},
            ]
            try:
                meta_raw = await summary_client.chat(meta_msgs, tool_names=[])
                meta = _parse_json_response(meta_raw)
                if meta is None:
                    meta = {"overview": meta_raw}
                meta["generated_at"] = time.time()
                meta["channels_included"] = list(summaries.keys())
                await redis.set(
                    _META_KEY,
                    json.dumps(meta, default=str),
                    ex=_SUMMARY_TTL,
                )
                logger.info(
                    "Stored meta-summary across %d channels",
                    len(summaries),
                )
            except Exception:
                logger.debug("Meta-summary failed", exc_info=True)

        logger.info(
            "Channel summarisation complete: %d/%d channels processed",
            len(summaries),
            len(channels),
        )
        return {
            "channels_processed": len(summaries),
            "summaries": {
                cid: s.get("overview", "")[:200]
                for cid, s in summaries.items()
            },
        }
    finally:
        # Always release the dedicated client's resources.
        await summary_client.close()
# ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _parse_json_response(text: str) -> dict | None: """Try to parse a JSON object from an LLM response.""" text = text.strip() # Strip markdown fences if present if text.startswith("```"): first_nl = text.index("\n") if "\n" in text else 3 text = text[first_nl + 1:] if text.endswith("```"): text = text[:-3].strip() try: result = json.loads(text) if isinstance(result, dict): return result except json.JSONDecodeError: pass return None async def _fetch_channel_messages( redis: Any, platform: str, channel_id: str, limit: int, ) -> list[dict[str, Any]]: """Fetch recent messages from the channel_msgs ZSET.""" zset_key = f"channel_msgs:{platform}:{channel_id}" try: # Get the most recent message keys msg_keys: list[str] = await redis.zrevrange(zset_key, 0, limit - 1) if not msg_keys: return [] # Pipeline HMGET for the fields we need fields = ( "user_name", "text", "timestamp", "user_id", "message_id", "reply_to_id", ) pipe = redis.pipeline() for key in msg_keys: pipe.hmget(key, *fields) results = await pipe.execute() messages: list[dict[str, Any]] = [] for values in results: if not values or all(v is None for v in values): continue mapping = dict(zip(fields, values)) messages.append(mapping) # Reverse to chronological order (oldest first) messages.reverse() return messages except Exception: logger.debug( "Failed to fetch messages for channel %s:%s", platform, channel_id, exc_info=True, ) return [] # _get_active_channels moved to message_cache.get_active_channels