kg_bulk_runner
Shared bulk agentic KG extraction: Redis message collection, chunking, and the LLM run.
Used by scripts.kg_bulk_dump_and_extract and the scheduled incremental
task in background_tasks.
- kg_bulk_runner.cursor_field(platform, channel_id)
Build the per-channel field name used inside the incremental cursor hash.
Composes the {platform}:{channel_id} key under which the last-processed timestamp for a channel is stored in KG_AGENTIC_BULK_LAST_TS_HASH. Note that channel_id may itself contain colons, so this is a left-anchored composition rather than a reversible encoding.
This is called by cursor_hget(), cursor_hset(), and bootstrap_latest_cursor_no_extract() to address the cursor hash, and is exercised directly by tests/test_kg_bulk_cursors.py.
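For illustration, a minimal sketch of the composition described above, assuming it is a plain f-string join (the real body may differ). channel_id is appended verbatim, colons included:

```python
def cursor_field(platform: str, channel_id: str) -> str:
    """Sketch: compose the "{platform}:{channel_id}" field for the cursor hash."""
    # No escaping is applied, so colons inside channel_id survive unchanged.
    return f"{platform}:{channel_id}"


field = cursor_field("discord", "123")  # "discord:123"
```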
- kg_bulk_runner.redis_ssl_kwargs_for_bulk(cfg, *, redis_no_verify)
Build Redis SSL connection kwargs for the bulk KG pipeline.
Starts from the project-wide TLS settings produced by Config.redis_ssl_kwargs() and, only when redis_no_verify is set and the URL uses the rediss:// scheme, downgrades certificate verification so that self-signed or hostname-mismatched broker certificates are accepted. This is intended for managed or ephemeral bulk runs, not the long-lived services.
The returned mapping is fed to message_cache.MessageCache so its Redis client can connect. This is called from run_agentic_bulk_pipeline() (building the cache2 client) and from the standalone scripts/kg_bulk_dump_and_extract.py entry point.
- Parameters:
cfg (Config) – Loaded config.Config providing redis_url and base SSL kwargs.
redis_no_verify (bool) – When true, disable certificate and hostname verification for rediss:// URLs (sets ssl_cert_reqs=CERT_NONE and ssl_check_hostname=False).
- Returns:
SSL keyword arguments suitable for a redis-py async client.
- Return type:
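A minimal sketch of the downgrade logic, assuming the base kwargs are already a plain dict. The name redis_ssl_kwargs_for_bulk_sketch and its (redis_url, base_kwargs) signature are hypothetical stand-ins for the Config-based original:

```python
import ssl
from typing import Any


def redis_ssl_kwargs_for_bulk_sketch(
    redis_url: str,
    base_kwargs: dict[str, Any],
    *,
    redis_no_verify: bool,
) -> dict[str, Any]:
    """Hypothetical stand-in for redis_ssl_kwargs_for_bulk()."""
    kwargs = dict(base_kwargs)  # copy so the shared base settings are never mutated
    if redis_no_verify and redis_url.startswith("rediss://"):
        # Accept self-signed or hostname-mismatched broker certificates.
        kwargs["ssl_cert_reqs"] = ssl.CERT_NONE
        kwargs["ssl_check_hostname"] = False
    return kwargs
```

redis-py's asyncio Redis client accepts ssl_cert_reqs and ssl_check_hostname as connection keyword arguments, which is why a flat mapping like this can be passed straight through to the client.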
- kg_bulk_runner.format_llm_style_line(m)
Render one message dict as a single human/LLM-readable transcript line.
Produces a deterministic line carrying the UTC timestamp, platform/channel, speaker name and id, message id, optional reply-to id, and text, for example:
[<iso>] [platform:discord channel:123] Alice (42) [Message ID: 9] : hi
Missing fields degrade gracefully (empty strings, "?" for an unknown name) and an unparseable timestamp falls back to the Unix epoch. These lines are what the bulk extractor feeds to the LLM and what the token packer measures.
This is a pure formatter with no I/O. It is called from run_agentic_bulk_pipeline() to populate the llm_style_line field of each messages.jsonl record and to build the flat lines list (and the per-channel grouped lines) that chunk_message_lines() packs.
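The line layout can be approximated as follows. This is a hedged sketch: the message-dict keys and the "[Reply to: ...]" marker are assumptions, since only the example output above is documented.

```python
from datetime import datetime, timezone
from typing import Any


def format_llm_style_line_sketch(m: dict[str, Any]) -> str:
    """Hypothetical re-creation of format_llm_style_line().

    Assumed keys: ts, platform, channel_id, user_name, user_id,
    message_id, reply_to_id, text. The real field names may differ.
    """
    try:
        ts = datetime.fromtimestamp(float(m.get("ts")), tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError):
        # Unparseable or missing timestamp: fall back to the Unix epoch.
        ts = datetime.fromtimestamp(0, tz=timezone.utc)
    platform = m.get("platform") or ""
    channel = m.get("channel_id") or ""
    name = m.get("user_name") or "?"  # unknown speaker name
    uid = m.get("user_id") or ""
    mid = m.get("message_id") or ""
    reply = m.get("reply_to_id")
    reply_part = f" [Reply to: {reply}]" if reply else ""  # assumed wording
    text = m.get("text") or ""
    return (
        f"[{ts.isoformat()}] [platform:{platform} channel:{channel}] "
        f"{name} ({uid}) [Message ID: {mid}]{reply_part} : {text}"
    )
```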
- async kg_bulk_runner.scan_channel_zset_keys(redis)
Scan Redis for all per-channel message-index sorted-set keys.
Iterates the keyspace with SCAN MATCH channel_msgs:* COUNT 500, decodes any bytes keys, and returns them sorted for deterministic processing order. Each matched key is a sorted set mapping message keys to timestamp scores for one (platform, channel).
This is called by collect_messages_from_redis() when no explicit channel_pairs filter is supplied, to discover every channel to ingest.
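A sketch of the scan using redis-py's asyncio client, whose scan_iter() drives the SCAN cursor loop. It accepts any object exposing scan_iter(match=..., count=...); the _sketch suffix marks it as an illustration, not the module's code:

```python
from typing import Any


async def scan_channel_zset_keys_sketch(redis: Any) -> list[str]:
    """Collect every channel_msgs:* key, decoded and sorted."""
    keys: list[str] = []
    # Equivalent to repeated SCAN <cursor> MATCH channel_msgs:* COUNT 500 calls.
    async for key in redis.scan_iter(match="channel_msgs:*", count=500):
        # Clients created without decode_responses=True yield bytes.
        keys.append(key.decode() if isinstance(key, bytes) else key)
    # Sorting gives a deterministic channel processing order across runs.
    return sorted(keys)
```

Redis documents that SCAN may return the same key more than once during a full iteration; this sketch only sorts, and sorted(set(keys)) is a cheap safeguard if duplicates matter.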
- async kg_bulk_runner.cursor_hget(redis, platform, channel_id)
Read the last-processed timestamp cursor for one channel.
Performs HGET on KG_AGENTIC_BULK_LAST_TS_HASH using the field from cursor_field(), decoding bytes and coercing to float. Returns None when no cursor exists yet or the stored value cannot be parsed, which the caller treats as “process the full backlog”.
This is called from collect_messages_from_redis() during incremental runs to determine the exclusive lower-bound timestamp for each channel.
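A sketch of the read-and-coerce logic, assuming a redis.asyncio-style client. The hash name below is a placeholder, because the real value of KG_AGENTIC_BULK_LAST_TS_HASH is not shown here:

```python
from typing import Any, Optional

# Hypothetical value: the real KG_AGENTIC_BULK_LAST_TS_HASH name is not documented here.
KG_AGENTIC_BULK_LAST_TS_HASH = "kg_agentic_bulk_last_ts"


async def cursor_hget_sketch(redis: Any, platform: str, channel_id: str) -> Optional[float]:
    """Return the stored cursor as a float, or None if missing or unparseable."""
    raw = await redis.hget(KG_AGENTIC_BULK_LAST_TS_HASH, f"{platform}:{channel_id}")
    if raw is None:
        return None  # no cursor yet: the caller reads the full backlog
    try:
        text = raw.decode() if isinstance(raw, bytes) else str(raw)
        return float(text)
    except ValueError:  # also covers UnicodeDecodeError
        return None  # a corrupt value is treated like a missing cursor
```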
- async kg_bulk_runner.cursor_hset(redis, platform, channel_id, ts)
Persist the last-processed timestamp cursor for one channel.
Writes str(ts) via HSET into KG_AGENTIC_BULK_LAST_TS_HASH under the cursor_field() field, advancing the incremental watermark so future runs skip already-extracted messages.
This is called from run_agentic_bulk_pipeline() after a chunk extracts cleanly (no errors and no parse error), once per channel using that chunk’s per-channel maximum timestamp.
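The advance-only-on-success rule can be sketched as a small helper. advance_cursors_after_chunk, its errors/parse_error arguments, and the hash name are hypothetical; the cursor_map shape matches the per-channel max-timestamp map that chunk_message_lines() returns:

```python
from typing import Any

# Hypothetical value: the real KG_AGENTIC_BULK_LAST_TS_HASH name is not documented here.
KG_AGENTIC_BULK_LAST_TS_HASH = "kg_agentic_bulk_last_ts"


async def advance_cursors_after_chunk(
    redis: Any,
    cursor_map: dict[tuple[str, str], float],
    *,
    errors: list[str],
    parse_error: bool,
) -> int:
    """HSET each channel's max timestamp, but only for a clean chunk.

    Returns the number of cursors written (0 when the chunk had problems).
    """
    if errors or parse_error:
        return 0  # keep the old watermark for every channel in this chunk
    for (platform, channel_id), max_ts in cursor_map.items():
        # Same layout as cursor_hset(): field "{platform}:{channel_id}", value str(ts).
        await redis.hset(KG_AGENTIC_BULK_LAST_TS_HASH, f"{platform}:{channel_id}", str(max_ts))
    return len(cursor_map)
```

Leaving the watermark untouched on failure means a later incremental run reads those messages again, at the cost of possibly re-extracting parts of the chunk that did succeed.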
- async kg_bulk_runner.bootstrap_latest_cursor_no_extract(redis, zset_key, platform, channel_id)
Seed a missing incremental cursor to the channel’s current high-water mark.
Implements the cursor_bootstrap="latest" behavior: when a channel has no stored cursor yet, this sets it to the newest message timestamp currently in the zset, so the first incremental run skips the entire historical backlog and only extracts messages that arrive afterward. If a cursor already exists, it is a no-op, preserving any prior watermark.
It reads the top zset score with ZREVRANGE on the channel_msgs:{platform}:{channel_id} key, then writes the cursor with HSET into KG_AGENTIC_BULK_LAST_TS_HASH under the cursor_field() field, and logs the bootstrap. It is called by collect_messages_from_redis() for each channel when an incremental run requests cursor_bootstrap="latest".
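A sketch of the bootstrap, assuming a redis.asyncio client, where zrevrange(..., withscores=True) returns (member, score) pairs. The hash name is a placeholder, and the handling of an empty zset is an assumption:

```python
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)

# Hypothetical value: the real KG_AGENTIC_BULK_LAST_TS_HASH name is not documented here.
KG_AGENTIC_BULK_LAST_TS_HASH = "kg_agentic_bulk_last_ts"


async def bootstrap_latest_cursor_sketch(
    redis: Any, zset_key: str, platform: str, channel_id: str
) -> Optional[float]:
    """Seed a missing cursor to the zset's newest score; return the seeded value."""
    field = f"{platform}:{channel_id}"
    if await redis.hget(KG_AGENTIC_BULK_LAST_TS_HASH, field) is not None:
        return None  # a cursor already exists: keep the prior watermark
    # ZREVRANGE key 0 0 WITHSCORES -> [(member, score)] for the newest message.
    top = await redis.zrevrange(zset_key, 0, 0, withscores=True)
    if not top:
        return None  # assumption: an empty channel is left without a cursor
    _member, score = top[0]
    await redis.hset(KG_AGENTIC_BULK_LAST_TS_HASH, field, str(score))
    logger.info("bootstrapped cursor %s -> %s", field, score)
    return float(score)
```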
- async kg_bulk_runner.fetch_messages_for_zset(redis, zset_key)
Fetch and hydrate every message recorded in one channel’s zset.
Pages through the entire sorted set in _ZSET_BATCH-sized ZRANGE slices, gathering the member message keys and hydrating each into a full message dict via _hmget_message_dicts(). Returns the union across all pages (ordering is not guaranteed here; the top-level caller re-sorts).
This is called by fetch_messages_for_zset_after() when there is no lower-bound timestamp, and directly by collect_messages_from_redis() for non-incremental (full backlog) channels.
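A sketch of the paging loop. Here hydrate is a hypothetical stand-in for the private _hmget_message_dicts() helper, and the _ZSET_BATCH value is a placeholder:

```python
from typing import Any, Awaitable, Callable

_ZSET_BATCH = 500  # placeholder page size; the real constant's value is not documented

Hydrate = Callable[[Any, list[str]], Awaitable[list[dict]]]


async def fetch_messages_for_zset_sketch(
    redis: Any, zset_key: str, hydrate: Hydrate, *, batch: int = _ZSET_BATCH
) -> list[dict]:
    """Page through a whole zset with ZRANGE and hydrate each page."""
    out: list[dict] = []
    start = 0
    while True:
        # ZRANGE key start stop (stop is inclusive).
        members = await redis.zrange(zset_key, start, start + batch - 1)
        if not members:
            break
        keys = [k.decode() if isinstance(k, bytes) else k for k in members]
        out.extend(await hydrate(redis, keys))
        if len(members) < batch:
            break  # short page: reached the end of the set
        start += batch
    return out
```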
- async kg_bulk_runner.fetch_messages_for_zset_after(redis, zset_key, after_ts_exclusive)
Fetch and hydrate channel messages newer than a cursor timestamp.
When after_ts_exclusive is None, this delegates to fetch_messages_for_zset() (full backlog). Otherwise it pages through the zset with ZRANGEBYSCORE using the exclusive lower bound "(ts" up to "+inf" in _ZSET_BATCH-sized windows, hydrating each page through _hmget_message_dicts(). This is the incremental read path that honors the per-channel cursor.
This is called by collect_messages_from_redis() for incremental runs on channels that already have a stored cursor.
- Parameters:
- Returns:
Hydrated message dicts strictly newer than the bound.
- Return type:
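A sketch of the incremental read, using redis-py's zrangebyscore(..., start=..., num=...) (the LIMIT clause) for paging. Offset-based paging, the hydrate callable, and the batch size are assumptions; only the exclusive "(ts" lower bound and the "+inf" upper bound come from the description above:

```python
from typing import Any, Awaitable, Callable, Optional

_ZSET_BATCH = 500  # placeholder page size; the real constant's value is not documented

Hydrate = Callable[[Any, list[str]], Awaitable[list[dict]]]


async def fetch_messages_after_sketch(
    redis: Any,
    zset_key: str,
    after_ts_exclusive: Optional[float],
    hydrate: Hydrate,
    *,
    batch: int = _ZSET_BATCH,
) -> list[dict]:
    """Hydrate members scored strictly above after_ts_exclusive."""
    # "(" makes the min bound exclusive. The documented function delegates to
    # fetch_messages_for_zset() for None; "-inf" keeps this sketch standalone.
    lo = "-inf" if after_ts_exclusive is None else f"({after_ts_exclusive}"
    out: list[dict] = []
    offset = 0
    while True:
        # ZRANGEBYSCORE key lo +inf LIMIT offset batch
        members = await redis.zrangebyscore(zset_key, lo, "+inf", start=offset, num=batch)
        if not members:
            break
        keys = [m.decode() if isinstance(m, bytes) else m for m in members]
        out.extend(await hydrate(redis, keys))
        if len(members) < batch:
            break
        offset += batch
    return out
```

LIMIT offsets make Redis skip earlier matches on every page and can shift if the zset changes mid-read; that trade-off belongs to this sketch, not necessarily to the module.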
- async kg_bulk_runner.collect_messages_from_redis(redis, *, incremental=False, cursor_bootstrap='full', channel_pairs=None)
Load messages from Redis channel zsets.
channel_pairs: if set, only these (platform, channel_id) zsets are read; otherwise all channel_msgs:* keys are read.
When incremental is true, uses KG_AGENTIC_BULK_LAST_TS_HASH per channel as an exclusive lower bound. With the default cursor_bootstrap="full", a channel without a cursor is read in full; with cursor_bootstrap="latest", missing cursors are seeded to the current zset max without returning backlog messages.
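The per-channel decision described above can be summarized as a pure function. plan_channel_read is hypothetical and reflects one reading of this page; each return value names the documented helper it would route to:

```python
from typing import Literal, Optional

ReadMode = Literal["full", "after", "bootstrap"]


def plan_channel_read(
    incremental: bool, cursor_bootstrap: str, cursor: Optional[float]
) -> ReadMode:
    """Hypothetical: which read path one channel takes in collect_messages_from_redis()."""
    if not incremental:
        return "full"  # fetch_messages_for_zset(): whole backlog
    if cursor is not None:
        return "after"  # fetch_messages_for_zset_after(cursor)
    if cursor_bootstrap == "latest":
        return "bootstrap"  # bootstrap_latest_cursor_no_extract(); no messages returned
    return "full"  # cursor_bootstrap="full": first run reads everything
```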
- async kg_bulk_runner.token_count_conversation(counter, conversation_text, *, channel_scope, reserve, config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)
Estimate the input-token cost of a candidate conversation chunk.
Reconstructs the full extraction prompt for conversation_text via kg_agentic_extraction.messages_for_agentic_token_estimate() (including channel/speaker pairs, the speaker-KG prefetch placeholder, and channel metadata), then asks the bulk LLM client to count its input tokens. If the client cannot count, it falls back to a rough len(text) // 3 heuristic. The configured reserve (headroom for tool rounds and output) is always added.
This makes a network/SDK call through counter.count_input_tokens and is called from the binary-search inner loop of chunk_message_lines() to decide how many lines fit within a chunk’s token budget.
- Parameters:
counter (KgBulkLlmClient) – Bulk LLM client used purely for token counting.
conversation_text (str) – The joined transcript lines for the candidate chunk.
channel_scope (str) – Scope label for the prompt (a channel id or CROSS_CHANNEL_SCOPE).
reserve (int) – Token headroom always added to the estimate.
config (Config | None) – Optional config.Config influencing prompt assembly.
chunk_channel_pairs (list[tuple[str, str]] | None) – Channels represented in the chunk.
chunk_speaker_pairs (list[tuple[str, str]] | None) – Speakers represented in the chunk.
speaker_kg_prefetch (str) – Placeholder or real speaker-KG prefetch text to size.
channel_metadata (dict[str, dict[str, str]] | None) – Per-channel metadata subset for the chunk.
- Returns:
Estimated total input tokens including reserve.
- Return type:
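The count-then-fallback estimate can be sketched as below. It takes already-assembled prompt text instead of rebuilding the prompt, and it assumes count_input_tokens is an async method that returns None or raises when the backend cannot count; the real contract may differ:

```python
from typing import Any, Optional


async def estimate_tokens_sketch(counter: Any, prompt_text: str, *, reserve: int) -> int:
    """Hypothetical: backend token count (or len // 3) plus the fixed reserve."""
    counted: Optional[int]
    try:
        counted = await counter.count_input_tokens(prompt_text)
    except Exception:
        counted = None  # backend cannot count this prompt
    if counted is None:
        counted = len(prompt_text) // 3  # rough characters-per-token heuristic
    return counted + reserve  # headroom for tool rounds and output
```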
- async kg_bulk_runner.chunk_message_lines(lines, line_channel_keys, line_speaker_keys, line_timestamps, counter, *, max_tokens, channel_scope, reserve, config=None, speaker_prefetch_placeholder='', channel_metadata=None)
Greedily pack transcript lines into token-budget extraction chunks.
Walks the line list in order and, for each chunk, binary-searches the largest contiguous run of lines whose reconstructed extraction prompt still fits under max_tokens. Because lines stay contiguous and in timestamp order, each chunk carries clean per-channel time boundaries, which lets the incremental pipeline advance cursors safely only after a chunk extracts without error. Always emits at least one line per chunk so an oversized single line cannot stall the loop.
Each candidate slice is priced by token_count_conversation() (which calls the bulk LLM client’s token counter), summarized via _unique_channel_pairs() and _unique_speaker_pairs(), and narrowed with _subset_channel_metadata(); the chosen slice also gets a per-channel max-timestamp map from _max_ts_per_channel() for cursor advancement.
This is called from run_agentic_bulk_pipeline() (once for the cross-channel path, once per channel for the per-channel path) and is exercised by tests/test_kg_bulk_cursors.py.
- Parameters:
lines (list[str]) – Formatted transcript lines to pack, in chronological order.
line_channel_keys (list[tuple[str, str]]) – Per-line (platform, channel_id) keys, aligned with lines.
line_speaker_keys (list[tuple[str, str]]) – Per-line (user_id, user_name) keys, aligned with lines.
line_timestamps (list[float]) – Per-line epoch-second timestamps, aligned with lines.
counter (KgBulkLlmClient) – Bulk LLM client used only for token counting.
max_tokens (int) – Maximum estimated input tokens allowed per chunk.
channel_scope (str) – Scope label passed into prompt assembly (a channel id or CROSS_CHANNEL_SCOPE).
reserve (int) – Token headroom added to every estimate.
config (Config | None) – Optional config.Config influencing prompt assembly.
speaker_prefetch_placeholder (str) – Filler text sizing the speaker-KG prefetch section during estimation.
channel_metadata (dict[str, dict[str, str]] | None) – Full per-channel metadata map, subset per candidate.
- Returns:
A list of chunk tuples, each (lines, channel_pairs, speaker_pairs, ts_lo, ts_hi, cursor_map): the chunk’s lines, its distinct channel and speaker pairs, its low/high timestamps, and the per-channel max-timestamp cursor map.
- Return type:
list[tuple[list[str], list[tuple[str, str]], list[tuple[str, str]], float, float, dict[tuple[str, str], float]]]
- Raises:
ValueError – If line_channel_keys, line_speaker_keys, or line_timestamps do not all match the length of lines.
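The per-chunk binary search can be sketched synchronously with a pluggable cost function standing in for token_count_conversation(). pack_lines_sketch is hypothetical, omits the channel/speaker/timestamp bookkeeping, and assumes the cost never decreases as lines are added:

```python
from typing import Callable


def pack_lines_sketch(
    lines: list[str], cost: Callable[[list[str]], int], max_tokens: int
) -> list[list[str]]:
    """Greedy contiguous packing; each chunk is the largest prefix that fits."""
    chunks: list[list[str]] = []
    i, n = 0, len(lines)
    while i < n:
        # Largest k in [1, n - i] with cost(lines[i:i+k]) <= max_tokens.
        # k = 1 is accepted unconditionally so an oversized line cannot stall the loop.
        lo, hi = 1, n - i
        while lo < hi:
            mid = (lo + hi + 1) // 2  # round up so the loop always shrinks
            if cost(lines[i:i + mid]) <= max_tokens:
                lo = mid
            else:
                hi = mid - 1
        chunks.append(lines[i:i + lo])
        i += lo
    return chunks
```

Binary search needs only O(log n) estimates per chunk instead of one per added line, which matters when every estimate is a token-counting call to the LLM client.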
- class kg_bulk_runner.KgBulkPipelineParams(out_dir, dump_only=False, dry_run_chunks=False, dry_run_llm=False, chunk_tokens=250000, token_reserve=100000, max_messages=0, chunks_max=0, per_channel=False, resume_from_chunk=0, max_tool_rounds=48, bulk_llm_backend='gemini', fetch_channel_metadata=False, channel_metadata_ttl_days=7.0, discord_platform_type='discord', prefetch_speaker_kg=False, prefetch_max_speakers=8, prefetch_hits_per_speaker=3, prefetch_max_chars=400000, prefetch_min_score=0.0, redis_no_verify=True, incremental=False)
Bases: object
Bundle of tuning knobs for one bulk agentic KG extraction run.
Carries everything run_agentic_bulk_pipeline() needs that is not runtime state (config, redis, messages): the output directory, dry-run and dump-only switches, token budgets (chunk size and reserve), chunking mode (per-channel vs cross-channel), resume/limit bounds, LLM backend selection, channel-metadata fetching, speaker-KG prefetch sizing, and the incremental cursor flag. Instances are constructed by scripts/kg_bulk_dump_and_extract.py (from parsed CLI args) and by background_tasks.py (for the scheduled incremental run), then passed straight into run_agentic_bulk_pipeline().
- Parameters:
out_dir (Path)
dump_only (bool)
dry_run_chunks (bool)
dry_run_llm (bool)
chunk_tokens (int)
token_reserve (int)
max_messages (int)
chunks_max (int)
per_channel (bool)
resume_from_chunk (int)
max_tool_rounds (int)
bulk_llm_backend (str)
fetch_channel_metadata (bool)
channel_metadata_ttl_days (float)
discord_platform_type (str)
prefetch_speaker_kg (bool)
prefetch_max_speakers (int)
prefetch_hits_per_speaker (int)
prefetch_max_chars (int)
prefetch_min_score (float)
redis_no_verify (bool)
incremental (bool)
- async kg_bulk_runner.run_agentic_bulk_pipeline(cfg, redis, messages, n_zsets_scanned, params, *, load_channel_metadata=None)
Chunk messages (optionally writing disk artifacts), run agentic KG extraction, and update cursors.
load_channel_metadata: optional async callable returning a Discord (or other) channel metadata dict; it is only invoked when params.fetch_channel_metadata is true.
After each successful extraction chunk, incremental cursors are updated per channel when params.incremental is true.
- kg_bulk_runner.resolve_bulk_backend(cli_value)
Resolve which LLM backend the bulk pipeline should use.
Applies the precedence: CLI argument, then the KG_BULK_LLM_BACKEND environment variable, then the "gemini" default, normalizing the result to lowercase. The chosen value selects between the Gemini pool and OpenRouter clients in run_agentic_bulk_pipeline().
This reads os.environ and is called by scripts/kg_bulk_dump_and_extract.py (passing the parsed --bulk-llm-backend arg) and by background_tasks.py (passing None, so it falls back to env/default for the scheduled incremental run).
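A minimal sketch of the precedence rule; the _sketch suffix marks it as an illustration, and treating an empty string the same as an unset value is an assumption:

```python
import os
from typing import Optional


def resolve_bulk_backend_sketch(cli_value: Optional[str]) -> str:
    """CLI value, then KG_BULK_LLM_BACKEND, then "gemini"; always lowercased."""
    # `or` also skips empty strings (assumption: "" means "not set").
    value = cli_value or os.environ.get("KG_BULK_LLM_BACKEND") or "gemini"
    return value.lower()
```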