kg_bulk_runner

Shared bulk agentic KG extraction: collecting messages from Redis, chunking them, and running the LLM.

Used by scripts.kg_bulk_dump_and_extract and the scheduled incremental task in background_tasks.

kg_bulk_runner.cursor_field(platform, channel_id)[source]

Build the per-channel field name used inside the incremental cursor hash.

Composes the {platform}:{channel_id} key under which the last-processed timestamp for a channel is stored in KG_AGENTIC_BULK_LAST_TS_HASH. Note that channel_id may itself contain colons, so this is a left-anchored composition rather than a reversible encoding.

This is called by cursor_hget(), cursor_hset(), and bootstrap_latest_cursor_no_extract() to address the cursor hash, and is exercised directly by tests/test_kg_bulk_cursors.py.

Parameters:
  • platform (str) – Platform identifier (e.g. "discord", "discord-self").

  • channel_id (str) – Channel identifier within that platform.

Returns:

The "{platform}:{channel_id}" hash field name.

Return type:

str
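
The composition described above can be sketched in a few lines. The function names here are illustrative and not part of kg_bulk_runner; the split helper additionally assumes that platform never contains a colon, which is what makes splitting on the first colon safe even when channel_id has colons of its own.

```python
def sketch_cursor_field(platform: str, channel_id: str) -> str:
    """Compose the hash field name: platform, a colon, then channel_id."""
    return f"{platform}:{channel_id}"


def sketch_split_cursor_field(field: str) -> tuple[str, str]:
    """Split on the FIRST colon only, so colons inside channel_id survive.

    Assumption: platform itself never contains a colon.
    """
    platform, _, channel_id = field.partition(":")
    return platform, channel_id


print(sketch_cursor_field("discord", "123"))
print(sketch_split_cursor_field("discord-self:guild:42"))
```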

kg_bulk_runner.redis_ssl_kwargs_for_bulk(cfg, *, redis_no_verify)[source]

Build Redis SSL connection kwargs for the bulk KG pipeline.

Starts from the project-wide TLS settings produced by Config.redis_ssl_kwargs() and, only when redis_no_verify is set and the URL uses the rediss:// scheme, downgrades certificate verification so that self-signed or hostname-mismatched broker certificates are accepted. This is intended for managed/ephemeral bulk runs, not the long-lived services.

The returned mapping is fed to message_cache.MessageCache so its Redis client can connect. This is called from run_agentic_bulk_pipeline() (building the cache2 client) and from the standalone scripts/kg_bulk_dump_and_extract.py entry point.

Parameters:
  • cfg (Config) – Loaded config.Config providing redis_url and base SSL kwargs.

  • redis_no_verify (bool) – When true, disable cert/hostname verification for rediss:// URLs (sets ssl_cert_reqs=CERT_NONE and ssl_check_hostname=False).

Returns:

SSL keyword arguments suitable for a redis-py async client.

Return type:

dict[str, Any]
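
A minimal sketch of the downgrade rule described above, assuming the base kwargs and the Redis URL are passed in directly rather than read from Config (the function name and signature are illustrative). Verification is relaxed only when both conditions hold; otherwise the base settings pass through unchanged.

```python
import ssl
from typing import Any


def sketch_bulk_ssl_kwargs(
    base_kwargs: dict[str, Any], redis_url: str, *, redis_no_verify: bool
) -> dict[str, Any]:
    """Copy the base TLS kwargs and relax verification only for rediss:// URLs."""
    kwargs = dict(base_kwargs)  # copy so the shared settings are never mutated
    if redis_no_verify and redis_url.startswith("rediss://"):
        kwargs["ssl_cert_reqs"] = ssl.CERT_NONE   # accept self-signed certificates
        kwargs["ssl_check_hostname"] = False      # accept hostname mismatches
    return kwargs


base = {"ssl_cert_reqs": ssl.CERT_REQUIRED}
print(sketch_bulk_ssl_kwargs(base, "rediss://broker:6380", redis_no_verify=True))
```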

kg_bulk_runner.format_llm_style_line(m)[source]

Render one message dict as a single human/LLM-readable transcript line.

Produces a deterministic line carrying the UTC timestamp, platform/channel, speaker name and id, message id, optional reply-to id, and text, e.g. [<iso>] [platform:discord channel:123] Alice (42) [Message ID: 9] : hi. Missing fields degrade gracefully (empty strings, "?" for an unknown name) and an unparseable timestamp falls back to the Unix epoch. These lines are what the bulk extractor feeds to the LLM and what the token packer measures.

This is a pure formatter with no I/O. It is called from run_agentic_bulk_pipeline() to populate the llm_style_line field of each messages.jsonl record and to build the flat lines list (and the per-channel grouped lines) that chunk_message_lines() packs.

Parameters:

m (dict[str, Any]) – A message mapping using the fields in _HASH_MSG_FIELDS (timestamp, platform, channel_id, user_id, user_name, message_id, reply_to_id, text).

Returns:

The formatted single-line representation of the message.

Return type:

str
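
The line layout above can be approximated as follows. This is a sketch under assumptions, not the module's formatter: it assumes the timestamp field holds epoch seconds, and the "[Reply to: ...]" marker is a guessed rendering of the optional reply-to id, since the example line in the description does not show one.

```python
from datetime import datetime, timezone
from typing import Any


def sketch_format_line(m: dict[str, Any]) -> str:
    """Render one message dict as a single transcript line."""
    try:
        # Assumption: timestamps are epoch seconds.
        stamp = datetime.fromtimestamp(float(m.get("timestamp")), tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError):
        # Unparseable timestamps fall back to the Unix epoch.
        stamp = datetime.fromtimestamp(0, tz=timezone.utc)
    name = m.get("user_name") or "?"
    reply = m.get("reply_to_id") or ""
    # Hypothetical wording for the optional reply marker.
    reply_part = f" [Reply to: {reply}]" if reply else ""
    return (
        f"[{stamp.isoformat()}] "
        f"[platform:{m.get('platform', '')} channel:{m.get('channel_id', '')}] "
        f"{name} ({m.get('user_id', '')}) "
        f"[Message ID: {m.get('message_id', '')}]{reply_part} : {m.get('text', '')}"
    )


example = {
    "timestamp": 0, "platform": "discord", "channel_id": "123",
    "user_id": "42", "user_name": "Alice", "message_id": "9", "text": "hi",
}
print(sketch_format_line(example))
```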

async kg_bulk_runner.scan_channel_zset_keys(redis)[source]

Scan Redis for all per-channel message-index sorted-set keys.

Iterates the keyspace with SCAN MATCH channel_msgs:* COUNT 500, decodes any bytes keys, and returns them sorted for deterministic processing order. Each matched key is a sorted set mapping message keys to timestamp scores for one (platform, channel).

This is called by collect_messages_from_redis() when no explicit channel_pairs filter is supplied, to discover every channel to ingest.

Parameters:

redis (Any) – Async redis-py client.

Returns:

Sorted list of channel_msgs:{platform}:{channel_id} keys.

Return type:

list[str]
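
One way to express this scan with redis-py's async client is scan_iter, which wraps the SCAN cursor loop. Whether the module uses scan_iter or drives SCAN by hand is not stated, so treat this as an illustrative equivalent; FakeRedis is a stand-in so the sketch runs without a server.

```python
import asyncio
from typing import Any


async def sketch_scan_channel_keys(redis: Any) -> list[str]:
    """Collect channel_msgs:* keys, decode bytes, and sort for a stable order."""
    keys: list[str] = []
    async for raw in redis.scan_iter(match="channel_msgs:*", count=500):
        keys.append(raw.decode() if isinstance(raw, bytes) else raw)
    return sorted(keys)


class FakeRedis:
    """In-memory stand-in exposing only the async scan_iter used above."""

    async def scan_iter(self, match: str = "*", count: int = 10):
        for key in (b"channel_msgs:discord:2", "channel_msgs:discord:1"):
            yield key


print(asyncio.run(sketch_scan_channel_keys(FakeRedis())))
```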

async kg_bulk_runner.cursor_hget(redis, platform, channel_id)[source]

Read the last-processed timestamp cursor for one channel.

Performs HGET on KG_AGENTIC_BULK_LAST_TS_HASH using the field from cursor_field(), decoding bytes and coercing to float. Returns None when no cursor exists yet or the stored value cannot be parsed, which the caller treats as “process the full backlog”.

This is called from collect_messages_from_redis() during incremental runs to determine the exclusive lower-bound timestamp for each channel.

Parameters:
  • redis (Any) – Async redis-py client.

  • platform (str) – Platform identifier for the channel.

  • channel_id (str) – Channel identifier.

Returns:

The stored cursor timestamp, or None if unset/unparsable.

Return type:

float | None
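
The read path above amounts to an HGET followed by tolerant parsing. The sketch below assumes a placeholder hash name (the real one is whatever KG_AGENTIC_BULK_LAST_TS_HASH holds) and uses an in-memory stand-in for the Redis client.

```python
import asyncio
from typing import Any, Optional

# Placeholder name; the real key is the value of KG_AGENTIC_BULK_LAST_TS_HASH.
CURSOR_HASH = "example:kg_bulk_last_ts"


async def sketch_cursor_hget(redis: Any, platform: str, channel_id: str) -> Optional[float]:
    """Return the stored cursor as a float, or None if unset or unparsable."""
    raw = await redis.hget(CURSOR_HASH, f"{platform}:{channel_id}")
    if raw is None:
        return None
    try:
        text = raw.decode() if isinstance(raw, bytes) else raw
        return float(text)
    except (TypeError, ValueError):  # UnicodeDecodeError is a ValueError subclass
        return None


class FakeRedis:
    """In-memory stand-in exposing only the async hget used above."""

    def __init__(self, fields: dict[str, Any]) -> None:
        self.fields = fields

    async def hget(self, name: str, key: str) -> Any:
        return self.fields.get(key)


fake = FakeRedis({"discord:1": b"1700000000.5", "discord:2": "not-a-number"})
print(asyncio.run(sketch_cursor_hget(fake, "discord", "1")))
```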

async kg_bulk_runner.cursor_hset(redis, platform, channel_id, ts)[source]

Persist the last-processed timestamp cursor for one channel.

Writes str(ts) via HSET into KG_AGENTIC_BULK_LAST_TS_HASH under the cursor_field() field, advancing the incremental watermark so future runs skip already-extracted messages.

This is called from run_agentic_bulk_pipeline() after a chunk extracts cleanly (no errors and no parse error), once per channel using that chunk’s per-channel maximum timestamp.

Parameters:
  • redis (Any) – Async redis-py client.

  • platform (str) – Platform identifier for the channel.

  • channel_id (str) – Channel identifier.

  • ts (float) – New cursor timestamp (epoch seconds) to store.

Return type:

None

async kg_bulk_runner.bootstrap_latest_cursor_no_extract(redis, zset_key, platform, channel_id)[source]

Seed a missing incremental cursor to the channel’s current high-water mark.

Implements the cursor_bootstrap="latest" behavior: when a channel has no stored cursor yet, this sets it to the newest message timestamp currently in the zset so the first incremental run skips the entire historical backlog and only ever extracts messages that arrive afterward. If a cursor already exists, it is a no-op, preserving any prior watermark.

It reads the top zset score with ZREVRANGE on the channel_msgs:{platform}:{channel_id} key, then writes the cursor with HSET into KG_AGENTIC_BULK_LAST_TS_HASH under the cursor_field() field, and logs the bootstrap. It is called by collect_messages_from_redis() for each channel when an incremental run requests cursor_bootstrap="latest".

Parameters:
  • redis (Any) – Async redis-py client.

  • zset_key (str) – The channel_msgs:{platform}:{channel_id} sorted-set key.

  • platform (str) – Platform identifier for the channel.

  • channel_id (str) – Channel identifier.

Return type:

None
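
The bootstrap logic can be sketched as "check, read the top score, write". The hash name below is a placeholder, FakeRedis is a stand-in for the async client, and leaving the cursor unset for an empty zset is an assumption, since the description does not cover that case.

```python
import asyncio
from typing import Any

CURSOR_HASH = "example:kg_bulk_last_ts"  # placeholder for KG_AGENTIC_BULK_LAST_TS_HASH


async def sketch_bootstrap_latest(redis: Any, zset_key: str, platform: str, channel_id: str) -> None:
    """Seed a missing cursor to the newest score in the channel zset."""
    field = f"{platform}:{channel_id}"
    if await redis.hget(CURSOR_HASH, field) is not None:
        return  # an existing watermark is preserved
    top = await redis.zrevrange(zset_key, 0, 0, withscores=True)
    if not top:
        return  # assumption: an empty zset leaves the cursor unset
    _member, score = top[0]
    await redis.hset(CURSOR_HASH, field, str(float(score)))


class FakeRedis:
    """In-memory stand-in for the three commands used above."""

    def __init__(self, zsets: dict[str, dict[str, float]]) -> None:
        self.zsets = zsets
        self.hash: dict[str, str] = {}

    async def hget(self, name: str, key: str) -> Any:
        return self.hash.get(key)

    async def hset(self, name: str, key: str, value: str) -> None:
        self.hash[key] = value

    async def zrevrange(self, name: str, start: int, end: int, withscores: bool = False):
        ranked = sorted(self.zsets.get(name, {}).items(), key=lambda kv: kv[1], reverse=True)
        return ranked[start:end + 1]


fake = FakeRedis({"channel_msgs:discord:1": {"m1": 10.0, "m2": 30.0, "m3": 20.0}})
asyncio.run(sketch_bootstrap_latest(fake, "channel_msgs:discord:1", "discord", "1"))
print(fake.hash)
```

Running the helper a second time after newer messages arrive leaves the cursor where it was, so those messages are picked up by the next incremental read.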

async kg_bulk_runner.fetch_messages_for_zset(redis, zset_key)[source]

Fetch and hydrate every message recorded in one channel’s zset.

Pages through the entire sorted set in _ZSET_BATCH-sized ZRANGE slices, gathering the member message keys and hydrating each into a full message dict via _hmget_message_dicts(). Returns the union across all pages (ordering is not guaranteed here; the top-level caller re-sorts).

This is called by fetch_messages_for_zset_after() when there is no lower-bound timestamp, and directly by collect_messages_from_redis() for non-incremental (full backlog) channels.

Parameters:
  • redis (Any) – Async redis-py client.

  • zset_key (str) – The channel_msgs:{platform}:{channel_id} sorted-set key.

Returns:

Hydrated message dicts for the whole channel.

Return type:

list[dict[str, Any]]

async kg_bulk_runner.fetch_messages_for_zset_after(redis, zset_key, after_ts_exclusive)[source]

Fetch and hydrate channel messages newer than a cursor timestamp.

When after_ts_exclusive is None, this delegates to fetch_messages_for_zset() (full backlog). Otherwise it pages through the zset with ZRANGEBYSCORE, using an exclusive lower bound (Redis's "(ts" syntax) and an upper bound of +inf, in _ZSET_BATCH-sized windows, hydrating each page through _hmget_message_dicts(). This is the incremental read path that honors the per-channel cursor.

This is called by collect_messages_from_redis() for incremental runs on channels that already have a stored cursor.

Parameters:
  • redis (Any) – Async redis-py client.

  • zset_key (str) – The channel_msgs:{platform}:{channel_id} sorted-set key.

  • after_ts_exclusive (float | None) – Exclusive lower-bound timestamp; None means no bound (return everything).

Returns:

Hydrated message dicts strictly newer than the bound.

Return type:

list[dict[str, Any]]
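
The exclusive-bound read can be sketched with redis-py's zrangebyscore, where a leading "(" on the min argument makes the bound exclusive. Paging by LIMIT offset is an assumption here (the module may page differently), the batch parameter stands in for _ZSET_BATCH, and the sketch stops at collecting member keys rather than hydrating them.

```python
import asyncio
from typing import Any


async def sketch_keys_after(
    redis: Any, zset_key: str, after_ts_exclusive: float, batch: int = 500
) -> list[str]:
    """Collect member keys whose score is strictly greater than the cursor."""
    keys: list[str] = []
    offset = 0
    while True:
        page = await redis.zrangebyscore(
            zset_key, f"({after_ts_exclusive}", "+inf", start=offset, num=batch
        )
        if not page:
            break
        keys.extend(k.decode() if isinstance(k, bytes) else k for k in page)
        if len(page) < batch:
            break  # a short page means the range is exhausted
        offset += batch
    return keys


class FakeRedis:
    """Stand-in supporting only exclusive-min ZRANGEBYSCORE with LIMIT."""

    def __init__(self, members: list[tuple[str, float]]) -> None:
        self.members = sorted(members, key=lambda kv: kv[1])

    async def zrangebyscore(self, name: str, min: str, max: str, start: int = 0, num: int = 0):
        lower = float(min[1:])  # strip the "(" exclusive marker
        hits = [key for key, score in self.members if score > lower]
        return hits[start:start + num]


fake = FakeRedis([("m1", 1.0), ("m2", 2.0), ("m3", 3.0), ("m4", 4.0)])
print(asyncio.run(sketch_keys_after(fake, "channel_msgs:discord:1", 2.0, batch=1)))
```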

async kg_bulk_runner.collect_messages_from_redis(redis, *, incremental=False, cursor_bootstrap='full', channel_pairs=None)[source]

Load messages from Redis channel zsets.

If channel_pairs is set, only the zsets for those (platform, channel_id) pairs are read; otherwise every channel_msgs:* key is scanned.

When incremental is true, each channel is read from its cursor in KG_AGENTIC_BULK_LAST_TS_HASH, which is used as an exclusive lower bound. With cursor_bootstrap="latest", missing cursors are seeded to the current zset maximum and no backlog messages are returned for those channels.

Return type:

tuple[list[dict[str, Any]], int]

async kg_bulk_runner.token_count_conversation(counter, conversation_text, *, channel_scope, reserve, config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)[source]

Estimate the input-token cost of a candidate conversation chunk.

Reconstructs the full extraction prompt for conversation_text via kg_agentic_extraction.messages_for_agentic_token_estimate() (including channel/speaker pairs, speaker-KG prefetch placeholder, and channel metadata), then asks the bulk LLM client to count its input tokens. If the client cannot count, it falls back to a rough len(text) // 3 heuristic. The configured reserve (headroom for tool rounds and output) is always added.

This makes a network/SDK call through counter.count_input_tokens and is called from the binary-search inner loop of chunk_message_lines() to decide how many lines fit within a chunk’s token budget.

Parameters:
  • counter (KgBulkLlmClient) – Bulk LLM client used purely for token counting.

  • conversation_text (str) – The joined transcript lines for the candidate chunk.

  • channel_scope (str) – Scope label for the prompt (a channel id or CROSS_CHANNEL_SCOPE).

  • reserve (int) – Token headroom always added to the estimate.

  • config (Config | None) – Optional config.Config influencing prompt assembly.

  • chunk_channel_pairs (list[tuple[str, str]] | None) – Channels represented in the chunk.

  • chunk_speaker_pairs (list[tuple[str, str]] | None) – Speakers represented in the chunk.

  • speaker_kg_prefetch (str) – Placeholder/real speaker-KG prefetch text to size.

  • channel_metadata (dict[str, dict[str, str]] | None) – Per-channel metadata subset for the chunk.

Returns:

Estimated total input tokens including reserve.

Return type:

int
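
The estimate reduces to "counted tokens, or a length heuristic, plus reserve". In this sketch the counter is modeled as a plain callable that returns None when it cannot count; that signalling convention is an assumption, as is the function name, and the real call goes through counter.count_input_tokens on a reconstructed prompt.

```python
from typing import Callable, Optional


def sketch_estimate_tokens(
    prompt_text: str, reserve: int, count_fn: Callable[[str], Optional[int]]
) -> int:
    """Return counted (or heuristic) input tokens plus the fixed reserve."""
    counted = count_fn(prompt_text)
    if counted is None:
        # Rough fallback from the description: about three characters per token.
        counted = len(prompt_text) // 3
    return counted + reserve


print(sketch_estimate_tokens("a" * 300, 100, lambda text: None))
```

Because the reserve is added unconditionally, a chunk budget of max_tokens leaves max_tokens minus reserve for the prompt itself.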

async kg_bulk_runner.chunk_message_lines(lines, line_channel_keys, line_speaker_keys, line_timestamps, counter, *, max_tokens, channel_scope, reserve, config=None, speaker_prefetch_placeholder='', channel_metadata=None)[source]

Greedily pack transcript lines into token-budget extraction chunks.

Walks the line list in order and, for each chunk, binary-searches the largest contiguous run of lines whose reconstructed extraction prompt still fits under max_tokens. Because lines stay contiguous and in timestamp order, each chunk carries clean per-channel time boundaries, which lets the incremental pipeline advance cursors safely only after a chunk extracts without error. Always emits at least one line per chunk so an oversized single line cannot stall the loop.

Each candidate slice is priced by token_count_conversation() (which calls the bulk LLM client’s token counter), summarized via _unique_channel_pairs() and _unique_speaker_pairs(), and narrowed with _subset_channel_metadata(); the chosen slice also gets a per-channel max-timestamp map from _max_ts_per_channel() for cursor advancement. This is called from run_agentic_bulk_pipeline() (once for the cross-channel path, once per channel for the per-channel path) and is exercised by tests/test_kg_bulk_cursors.py.

Parameters:
  • lines (list[str]) – Formatted transcript lines to pack, in chronological order.

  • line_channel_keys (list[tuple[str, str]]) – Per-line (platform, channel_id) keys, aligned with lines.

  • line_speaker_keys (list[tuple[str, str]]) – Per-line (user_id, user_name) keys, aligned with lines.

  • line_timestamps (list[float]) – Per-line epoch-second timestamps, aligned with lines.

  • counter (KgBulkLlmClient) – Bulk LLM client used only for token counting.

  • max_tokens (int) – Maximum estimated input tokens allowed per chunk.

  • channel_scope (str) – Scope label passed into prompt assembly (a channel id or CROSS_CHANNEL_SCOPE).

  • reserve (int) – Token headroom added to every estimate.

  • config (Config | None) – Optional config.Config influencing prompt assembly.

  • speaker_prefetch_placeholder (str) – Filler text sizing the speaker-KG prefetch section during estimation.

  • channel_metadata (dict[str, dict[str, str]] | None) – Full per-channel metadata map, subset per candidate.

Returns:

A list of chunk tuples, each (lines, channel_pairs, speaker_pairs, ts_lo, ts_hi, cursor_map): the chunk’s lines, its distinct channel and speaker pairs, its low/high timestamps, and the per-channel max-timestamp cursor map.

Return type:

list[tuple[list[str], list[tuple[str, str]], list[tuple[str, str]], float, float, dict[tuple[str, str], float]]]

Raises:

ValueError – If line_channel_keys, line_speaker_keys, or line_timestamps do not all match the length of lines.
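
The packing strategy can be illustrated with a simplified, synchronous sketch. It assumes the cost function is monotonic (adding lines never lowers the cost) and treats a cost equal to the budget as fitting; the real function prices candidates asynchronously via token_count_conversation() and also returns channel, speaker, and timestamp summaries, which are omitted here.

```python
from typing import Callable


def sketch_pack_lines(
    lines: list[str], max_tokens: int, cost: Callable[[list[str]], int]
) -> list[list[str]]:
    """Greedily pack contiguous lines, binary-searching each chunk's length."""
    chunks: list[list[str]] = []
    start = 0
    while start < len(lines):
        lo, hi = 1, len(lines) - start  # candidate run lengths
        best = 1  # always take at least one line so the loop advances
        while lo <= hi:
            mid = (lo + hi) // 2
            if cost(lines[start:start + mid]) <= max_tokens:
                best = mid      # this run fits; try a longer one
                lo = mid + 1
            else:
                hi = mid - 1    # too expensive; try a shorter one
        chunks.append(lines[start:start + best])
        start += best
    return chunks


def char_cost(chunk: list[str]) -> int:
    """Toy cost: total characters in the chunk."""
    return sum(len(line) for line in chunk)


demo = ["aaaa", "bb", "cccccc", "d", "eeeeeeeeeeee"]
print(sketch_pack_lines(demo, 6, char_cost))
```

The last line in the demo exceeds the budget on its own and still comes out as a single-line chunk, which mirrors the guarantee that an oversized line cannot stall the loop.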

class kg_bulk_runner.KgBulkPipelineParams(out_dir, dump_only=False, dry_run_chunks=False, dry_run_llm=False, chunk_tokens=250000, token_reserve=100000, max_messages=0, chunks_max=0, per_channel=False, resume_from_chunk=0, max_tool_rounds=48, bulk_llm_backend='gemini', fetch_channel_metadata=False, channel_metadata_ttl_days=7.0, discord_platform_type='discord', prefetch_speaker_kg=False, prefetch_max_speakers=8, prefetch_hits_per_speaker=3, prefetch_max_chars=400000, prefetch_min_score=0.0, redis_no_verify=True, incremental=False)[source]

Bases: object

Bundle of tuning knobs for one bulk agentic KG extraction run.

Carries everything run_agentic_bulk_pipeline() needs that is not the runtime state (config, redis, messages): the output directory, dry-run and dump-only switches, token budgets (chunk size and reserve), chunking mode (per-channel vs cross-channel), resume/limit bounds, LLM backend selection, channel-metadata fetching, speaker-KG prefetch sizing, and the incremental cursor flag. Instances are constructed by scripts/kg_bulk_dump_and_extract.py (from parsed CLI args) and by background_tasks.py (for the scheduled incremental run), then passed straight into run_agentic_bulk_pipeline().

Parameters:
  • out_dir (Path)

  • dump_only (bool)

  • dry_run_chunks (bool)

  • dry_run_llm (bool)

  • chunk_tokens (int)

  • token_reserve (int)

  • max_messages (int)

  • chunks_max (int)

  • per_channel (bool)

  • resume_from_chunk (int)

  • max_tool_rounds (int)

  • bulk_llm_backend (str)

  • fetch_channel_metadata (bool)

  • channel_metadata_ttl_days (float)

  • discord_platform_type (str)

  • prefetch_speaker_kg (bool)

  • prefetch_max_speakers (int)

  • prefetch_hits_per_speaker (int)

  • prefetch_max_chars (int)

  • prefetch_min_score (float)

  • redis_no_verify (bool)

  • incremental (bool)

out_dir: Path
dump_only: bool = False
dry_run_chunks: bool = False
dry_run_llm: bool = False
chunk_tokens: int = 250000
token_reserve: int = 100000
max_messages: int = 0
chunks_max: int = 0
per_channel: bool = False
resume_from_chunk: int = 0
max_tool_rounds: int = 48
bulk_llm_backend: str = 'gemini'
fetch_channel_metadata: bool = False
channel_metadata_ttl_days: float = 7.0
discord_platform_type: str = 'discord'
prefetch_speaker_kg: bool = False
prefetch_max_speakers: int = 8
prefetch_hits_per_speaker: int = 3
prefetch_max_chars: int = 400000
prefetch_min_score: float = 0.0
redis_no_verify: bool = True
incremental: bool = False

async kg_bulk_runner.run_agentic_bulk_pipeline(cfg, redis, messages, n_zsets_scanned, params, *, load_channel_metadata=None)[source]

Chunk messages (optionally writing disk artifacts), run agentic KG extraction, and update cursors.

load_channel_metadata is an optional async callable returning a channel metadata dict (for Discord or another platform); it is only invoked when params.fetch_channel_metadata is true.

After each successful extraction chunk, updates incremental cursors per channel when params.incremental is true.

Return type:

None

kg_bulk_runner.resolve_bulk_backend(cli_value)[source]

Resolve which LLM backend the bulk pipeline should use.

Applies the precedence CLI argument, then the KG_BULK_LLM_BACKEND environment variable, then the "gemini" default, normalizing the result to lowercase. The chosen value selects between the Gemini pool and OpenRouter clients in run_agentic_bulk_pipeline().

This reads os.environ and is called by scripts/kg_bulk_dump_and_extract.py (passing the parsed --bulk-llm-backend arg) and by background_tasks.py (passing None, so it falls back to env/default for the scheduled incremental run).

Parameters:

cli_value (str | None) – Explicit backend from the CLI, or None to defer to env/default.

Returns:

The lowercased backend name (e.g. "gemini" or "openrouter").

Return type:

str
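
The precedence rule above fits in one expression. This sketch treats an empty CLI value or empty environment variable as unset, which is an assumption; the function name is illustrative.

```python
import os
from typing import Optional


def sketch_resolve_backend(cli_value: Optional[str]) -> str:
    """Pick the CLI value, then KG_BULK_LLM_BACKEND, then "gemini"; lowercase it."""
    # Assumption: empty strings count as "not provided".
    chosen = cli_value or os.environ.get("KG_BULK_LLM_BACKEND") or "gemini"
    return chosen.lower()


print(sketch_resolve_backend("OpenRouter"))
```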