# Source code for background_agents.channel_summarizer

"""Channel summarizer — periodically summarises recent channel activity.

Scans Redis for the 10 most-recently-used ``channel_msgs:*`` sorted sets,
fetches recent messages, generates a per-channel summary via an LLM call
(gemini-3-flash-preview), and stores the result in Redis for retrieval by
``channel_summary_tools`` and ``cross_channel_query``.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import time
from datetime import datetime, timezone
from typing import Any

# Module-level logger; all diagnostics in this file go through it.
logger = logging.getLogger(__name__)

# Redis key prefix for per-channel summaries; the channel id is appended to it
# when a summary is stored.
# NOTE(review): the key name says "last1k" while the default window below is
# 300 messages — presumably historical; other readers of this key are not
# visible here, so confirm before renaming.
_SUMMARY_KEY_PREFIX = "stargazer:last1k_summary:"
# Redis key holding the single cross-channel meta-summary.
_META_KEY = "stargazer:mru_summary"
# Expiry (seconds) applied to both the per-channel and the meta-summary keys.
_SUMMARY_TTL = 86400 * 7  # 7 days
# Default number of recent messages pulled per channel for one summary.
_MESSAGES_PER_CHANNEL = 300
# Default cap on how many active channels a single run processes.
_MAX_CHANNELS = 10
# Model name passed to the dedicated summarisation client.
_SUMMARY_MODEL = "gemini-3-flash-preview"

# System prompt for the per-channel summary call. It names the JSON keys the
# model must return and describes the transcript line layout; that layout has
# to stay in sync with the lines rendered in summarise_channel().
_CHANNEL_SYSTEM_PROMPT = (
    "You are a concise channel analyst. Given a transcript of recent "
    "messages from a chat channel, produce a structured JSON summary with "
    "the following keys:\n"
    '  "overview": a 2-3 sentence summary of the channel\'s recent activity,\n'
    '  "key_topics": a list of the main discussion topics,\n'
    '  "active_users": a list of the most active participants,\n'
    '  "notable_discussions": a list of noteworthy exchanges or decisions,\n'
    '  "mood": a short description of the overall tone/mood.\n\n'
    "The transcript uses the following format per line:\n"
    "  [ISO_TIMESTAMP] DisplayName (UserID) [Message ID: ID] [Replying to: ID] : message text\n"
    "Where:\n"
    "  - ISO_TIMESTAMP is the UTC time the message was sent\n"
    "  - DisplayName is the user's display name\n"
    "  - UserID is the platform-specific user identifier\n"
    "  - Message ID is the platform-specific message identifier\n"
    "  - [Replying to: ID] is only present if the message is a reply\n\n"
    "Respond ONLY with valid JSON, no markdown fences or extra text."
)

# System prompt for the cross-channel meta-summary call made at the end of
# summarise_all_active(); its input is the concatenated per-channel overviews
# and topics, not raw messages.
_META_SYSTEM_PROMPT = (
    "You are a concise cross-channel analyst. Given per-channel summaries, "
    "create a brief JSON meta-summary with keys:\n"
    '  "overview": 2-3 sentences on overall server activity,\n'
    '  "key_themes": common themes across channels,\n'
    '  "most_active_channels": which channels are busiest,\n'
    '  "notable_discussions": highlights worth knowing about.\n\n'
    "Respond ONLY with valid JSON, no markdown fences or extra text."
)


# ------------------------------------------------------------------
# Core logic
# ------------------------------------------------------------------


# The event loop keeps only weak references to tasks, so a fire-and-forget
# task with no other referent can be garbage-collected before it completes.
# Tasks are parked here until their done-callback removes them.
_DEBUG_EVENT_TASKS: set = set()


def _spawn_debug_event(task_name: str, /, *args: Any, **kwargs: Any) -> None:
    """Schedule ``observability.publish_debug_event`` as a retained background task."""
    # Imported lazily, exactly where the event is emitted.
    from observability import publish_debug_event

    task = asyncio.create_task(
        publish_debug_event(*args, **kwargs),
        name=task_name,
    )
    _DEBUG_EVENT_TASKS.add(task)
    task.add_done_callback(_DEBUG_EVENT_TASKS.discard)


def _as_text(value: Any, default: str = "") -> str:
    """Coerce a Redis hash value (``None``, ``bytes`` or ``str``) to ``str``."""
    if value is None:
        return default
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value)


def _format_transcript_line(message: dict[str, Any]) -> str | None:
    """Render one message in the canonical transcript format, or ``None`` to skip it."""
    # Rows come from HMGET, so a missing hash field is present in the dict
    # with value None; ``dict.get`` defaults never apply and every field must
    # be coerced explicitly.
    text = _as_text(message.get("text"))
    if not text.strip():
        return None

    try:
        sent_at = datetime.fromtimestamp(
            float(_as_text(message.get("timestamp"), "0")),
            tz=timezone.utc,
        )
    except (ValueError, OverflowError, OSError):
        # Unparseable timestamp: fall back to the same epoch value the
        # missing-field default (0) produces rather than dropping the message.
        sent_at = datetime.fromtimestamp(0, tz=timezone.utc)

    user_name = _as_text(message.get("user_name"), "?")
    user_id = _as_text(message.get("user_id"), "?")
    message_id = _as_text(message.get("message_id"))
    reply_to_id = _as_text(message.get("reply_to_id"))

    parts = [
        f"[{sent_at.isoformat()}] {user_name} ({user_id})"
        f" [Message ID: {message_id}]"
    ]
    if reply_to_id:
        parts.append(f" [Replying to: {reply_to_id}]")
    parts.append(f" : {text}")
    return "".join(parts)


async def summarise_channel(
    channel_id: str,
    platform: str,
    redis: Any,
    openrouter: Any,
    messages_limit: int = _MESSAGES_PER_CHANNEL,
) -> dict | None:
    """Summarise one channel's recent messages and cache the result in Redis.

    Loads messages via :func:`_fetch_channel_messages`, renders them into the
    transcript format described in ``_CHANNEL_SYSTEM_PROMPT``, keeps the last
    120k characters, and calls ``openrouter.chat`` with no tools. The reply
    is parsed by :func:`_parse_json_response`; when it is not a JSON object
    the raw text is kept as ``{"overview": <raw>, "raw": True}``. The summary
    is stamped with ``channel_id``, ``platform``, ``message_count`` and
    ``generated_at`` and written to ``{_SUMMARY_KEY_PREFIX}{channel_id}`` with
    ``_SUMMARY_TTL`` expiry. A debug event is published in the background on
    both LLM failure and success.

    Messages with missing or malformed fields are tolerated: blank or absent
    text skips the line, and an absent or unparseable timestamp renders as the
    Unix epoch, so one bad row cannot fail the whole channel.

    Args:
        channel_id (str): Platform-specific channel identifier to summarise.
        platform (str): Platform the channel belongs to (e.g. ``"discord"``).
        redis (Any): Async Redis client used to read messages and store the
            summary.
        openrouter (Any): Chat client exposing ``await chat(messages,
            tool_names=[])``.
        messages_limit (int): Maximum recent messages to pull into the
            transcript; defaults to ``_MESSAGES_PER_CHANNEL``.

    Returns:
        dict | None: The summary dict (returned even if the Redis write
        fails), or ``None`` when the channel has no usable messages or the
        LLM call fails.
    """
    messages = await _fetch_channel_messages(
        redis, platform, channel_id, messages_limit,
    )
    if not messages:
        return None

    lines = [
        line
        for line in map(_format_transcript_line, messages)
        if line is not None
    ]
    if not lines:
        return None

    transcript = "\n".join(lines)
    # Cap at ~120k chars to stay within context limits; keep the newest end.
    if len(transcript) > 120_000:
        transcript = transcript[-120_000:]

    llm_messages = [
        {"role": "system", "content": _CHANNEL_SYSTEM_PROMPT},
        {"role": "user", "content": transcript},
    ]

    t0 = time.monotonic()
    try:
        raw_response = await openrouter.chat(llm_messages, tool_names=[])
    except Exception as e:
        logger.error(
            "Channel summarisation LLM call failed for %s: %s",
            channel_id, e,
        )
        _spawn_debug_event(
            "obs_chan_sum_err",
            "channel_summary",
            "channel_summarizer",
            channel_id=channel_id,
            platform=platform,
            status="error",
            duration_ms=(time.monotonic() - t0) * 1000,
            preview=f"messages={len(messages)} error={str(e)[:100]}",
            payload={
                "channel_id": channel_id,
                "platform": platform,
                "message_count": len(messages),
                "error": str(e),
            },
        )
        return None

    summary = _parse_json_response(raw_response)
    if summary is None:
        # Not a JSON object: store the raw text as a fallback.
        summary = {"overview": raw_response, "raw": True}

    _spawn_debug_event(
        "obs_chan_sum_ok",
        "channel_summary",
        "channel_summarizer",
        channel_id=channel_id,
        platform=platform,
        status="ok",
        duration_ms=(time.monotonic() - t0) * 1000,
        llm_output=raw_response,
        preview=f"messages={len(messages)} overview='{str(summary.get('overview', ''))[:100]}'",
        payload={
            "channel_id": channel_id,
            "platform": platform,
            "message_count": len(messages),
            "overview": summary.get("overview", ""),
            "key_topics": summary.get("key_topics", []),
            "active_users": summary.get("active_users", []),
        },
    )

    summary["channel_id"] = channel_id
    summary["platform"] = platform
    summary["message_count"] = len(messages)
    summary["generated_at"] = time.time()

    key = f"{_SUMMARY_KEY_PREFIX}{channel_id}"
    try:
        await redis.set(
            key,
            json.dumps(summary, default=str),
            ex=_SUMMARY_TTL,
        )
    except Exception:
        # Best-effort cache write: the caller still gets the summary, but a
        # lost write wastes an LLM call, so it is surfaced above debug level.
        logger.warning("Failed to store channel summary", exc_info=True)
    else:
        logger.info(
            "Stored summary for channel %s (%d messages)",
            channel_id, len(messages),
        )

    return summary
def _overview_text(summary: dict) -> str:
    """Return the summary's ``overview`` as a string, whatever the LLM produced."""
    overview = summary.get("overview", "")
    if isinstance(overview, str):
        return overview
    return "" if overview is None else str(overview)


def _topics_text(summary: dict) -> str:
    """Return the summary's ``key_topics`` as a comma-separated string."""
    topics = summary.get("key_topics") or []
    # The value is LLM-produced JSON: it may be a bare string or hold
    # non-string items, either of which would break a plain ``", ".join``.
    if isinstance(topics, (str, bytes)) or not isinstance(topics, (list, tuple)):
        topics = [topics]
    return ", ".join(str(topic) for topic in topics)


async def summarise_all_active(
    redis: Any,
    max_channels: int = _MAX_CHANNELS,
) -> dict[str, Any]:
    """Summarise the most recently active channels and build a meta-summary.

    Builds a dedicated ``OpenRouterClient`` on ``_SUMMARY_MODEL`` with an
    empty tool registry, discovers channels through
    ``message_cache.get_active_channels``, and runs :func:`summarise_channel`
    over them four at a time under :func:`asyncio.gather`. Channels whose
    call raised are retried once, in the same batch size. If at least one
    channel produced a summary, the per-channel overviews and topics are sent
    through the same client with ``_META_SYSTEM_PROMPT`` and the parsed result
    is stored at ``_META_KEY`` with ``_SUMMARY_TTL`` expiry. The client is
    always closed before returning.

    Overview and topic values come from model output, so they are coerced to
    text before use; a malformed summary cannot abort the run.

    Args:
        redis (Any): Async Redis client for channel discovery and storage.
        max_channels (int): Cap on how many active channels to process;
            defaults to ``_MAX_CHANNELS``.

    Returns:
        dict[str, Any]: ``channels_processed`` (int) and ``summaries`` (a
        mapping of channel id to the overview truncated to 200 characters).
    """
    # Build a dedicated client using gemini-3-flash-preview
    from openrouter_client import OpenRouterClient
    from tools import ToolRegistry
    from config import Config

    cfg = Config.load()
    summary_client = OpenRouterClient(
        api_key=cfg.api_key,
        model=_SUMMARY_MODEL,
        base_url=cfg.llm_base_url,
        tool_registry=ToolRegistry(),  # no tools
        max_tool_rounds=1,
        top_p=cfg.top_p,
        http_connect_timeout=cfg.openrouter_http_connect_timeout_seconds,
        http_read_timeout=cfg.openrouter_http_read_timeout_seconds,
        http_write_timeout=cfg.openrouter_http_write_timeout_seconds,
        http_pool_timeout=cfg.openrouter_http_pool_timeout_seconds,
    )

    _BATCH_SIZE = 4

    async def _run_pass(
        pairs: list[tuple[str, str]],
        is_retry: bool,
    ) -> tuple[dict[str, dict], list[tuple[str, str]]]:
        """Summarise ``(platform, channel_id)`` pairs in batches; return (ok, failed)."""
        succeeded: dict[str, dict] = {}
        failed: list[tuple[str, str]] = []
        for start in range(0, len(pairs), _BATCH_SIZE):
            batch = pairs[start : start + _BATCH_SIZE]
            results = await asyncio.gather(
                *(
                    summarise_channel(cid, plat, redis, summary_client)
                    for plat, cid in batch
                ),
                return_exceptions=True,
            )
            for (plat, cid), result in zip(batch, results):
                # BaseException, not Exception: gather can also hand back a
                # CancelledError, which must not be stored as a summary.
                if isinstance(result, BaseException):
                    if is_retry:
                        logger.error(
                            "Channel summarisation retry also failed for %s:%s: %s",
                            plat, cid, result,
                        )
                    else:
                        logger.warning(
                            "Channel summarisation failed for %s:%s (will retry): %s",
                            plat, cid, result,
                        )
                    failed.append((plat, cid))
                elif isinstance(result, dict) and result:
                    succeeded[cid] = result
        return succeeded, failed

    try:
        from message_cache import get_active_channels

        channels = await get_active_channels(redis, max_channels)
        if not channels:
            logger.info("No active channels found for summarisation")
            return {"channels_processed": 0, "summaries": {}}

        logger.info(
            "Channel summarisation starting for %d channel(s)",
            len(channels),
        )

        summaries, failed = await _run_pass(list(channels), is_retry=False)

        # Retry failed channels once
        if failed:
            logger.info("Retrying %d failed channel summarisation(s)", len(failed))
            recovered, _ = await _run_pass(failed, is_retry=True)
            summaries.update(recovered)

        # Build a meta-summary across all channels
        if summaries:
            combined = "\n\n".join(
                f"Channel {cid} ({s.get('platform', '?')}): "
                f"{_overview_text(s)} Topics: {_topics_text(s)}"
                for cid, s in summaries.items()
            )
            meta_msgs = [
                {"role": "system", "content": _META_SYSTEM_PROMPT},
                {"role": "user", "content": combined[:60_000]},
            ]
            try:
                meta_raw = await summary_client.chat(meta_msgs, tool_names=[])
                meta = _parse_json_response(meta_raw)
                if meta is None:
                    meta = {"overview": meta_raw}
                meta["generated_at"] = time.time()
                meta["channels_included"] = list(summaries.keys())
                await redis.set(
                    _META_KEY,
                    json.dumps(meta, default=str),
                    ex=_SUMMARY_TTL,
                )
                logger.info(
                    "Stored meta-summary across %d channels",
                    len(summaries),
                )
            except Exception:
                # Best-effort: the per-channel summaries are already stored,
                # so a meta-summary failure is reported but not raised.
                logger.warning("Meta-summary failed", exc_info=True)

        logger.info(
            "Channel summarisation complete: %d/%d channels processed",
            len(summaries), len(channels),
        )
        return {
            "channels_processed": len(summaries),
            "summaries": {
                cid: _overview_text(s)[:200]
                for cid, s in summaries.items()
            },
        }
    finally:
        await summary_client.close()
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------


def _parse_json_response(text: str) -> dict | None:
    """Best-effort parse of a JSON object out of a raw LLM response.

    Tolerates the usual ways a model wraps its JSON: surrounding whitespace,
    a Markdown code fence (multi-line or squeezed onto one line, with or
    without a language tag), and prose before or after the object. Pure and
    side-effect free.

    Args:
        text (str): The raw model output to parse.

    Returns:
        dict | None: The decoded object, or ``None`` when ``text`` is not a
        string, contains no parseable JSON, or decodes to something other
        than a JSON object.
    """
    if not isinstance(text, str):
        return None

    candidate = text.strip()

    # Strip markdown fences if present
    if candidate.startswith("```"):
        _fence_line, newline, remainder = candidate.partition("\n")
        # Multi-line fence: drop the whole opening line (it may carry a
        # language tag). Single-line fence: drop only the three backticks so
        # the first character of the payload is preserved.
        candidate = remainder if newline else candidate[3:]
    if candidate.endswith("```"):
        candidate = candidate[:-3]
    candidate = candidate.strip()

    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span to survive a leading
        # "Here is the summary:" or text after the closing fence.
        start = candidate.find("{")
        end = candidate.rfind("}")
        if not 0 <= start < end:
            return None
        try:
            parsed = json.loads(candidate[start : end + 1])
        except json.JSONDecodeError:
            return None

    return parsed if isinstance(parsed, dict) else None


async def _fetch_channel_messages(
    redis: Any,
    platform: str,
    channel_id: str,
    limit: int,
) -> list[dict[str, Any]]:
    """Read a channel's recent messages from Redis in chronological order.

    Reads up to ``limit`` of the newest member keys from the
    ``channel_msgs:{platform}:{channel_id}`` sorted set with ``ZREVRANGE``,
    pipelines one ``HMGET`` per key for the fields listed below, drops rows
    where every field is missing, and reverses the result so the oldest
    message comes first. Any error from Redis is logged at debug level and
    yields an empty list instead of raising.

    A field absent from a message hash is present in the returned dict with
    value ``None``, so callers must not rely on ``dict.get`` defaults.

    Args:
        redis (Any): Async Redis client used for the ZSET and hash reads.
        platform (str): Platform the channel belongs to.
        channel_id (str): Platform-specific channel identifier.
        limit (int): Maximum number of recent messages to fetch; values
            ``<= 0`` return an empty list.

    Returns:
        list[dict[str, Any]]: Per-message field dicts, oldest first; empty if
        there are no messages or a Redis error occurs.
    """
    if limit <= 0:
        # ``limit - 1`` would become a negative stop index, which ZREVRANGE
        # interprets as counting from the end, i.e. "return (nearly) all".
        return []

    zset_key = f"channel_msgs:{platform}:{channel_id}"
    fields = (
        "user_name", "text", "timestamp",
        "user_id", "message_id", "reply_to_id",
    )
    try:
        # Get the most recent message keys
        msg_keys: list[str] = await redis.zrevrange(zset_key, 0, limit - 1)
        if not msg_keys:
            return []

        # Pipeline HMGET for the fields we need
        pipe = redis.pipeline()
        for key in msg_keys:
            pipe.hmget(key, *fields)
        results = await pipe.execute()

        messages: list[dict[str, Any]] = [
            dict(zip(fields, values))
            for values in results
            if values and any(v is not None for v in values)
        ]

        # Reverse to chronological order (oldest first)
        messages.reverse()
        return messages
    except Exception:
        logger.debug(
            "Failed to fetch messages for channel %s:%s",
            platform, channel_id, exc_info=True,
        )
        return []


# _get_active_channels moved to message_cache.get_active_channels