kg_bulk_runner
Shared bulk agentic KG extraction: Redis message collect, chunking, LLM run.
Used by scripts.kg_bulk_dump_and_extract and the scheduled incremental
task in background_tasks.
- async kg_bulk_runner.bootstrap_latest_cursor_no_extract(redis, zset_key, platform, channel_id)
If the channel has no cursor yet, set it to the current maximum zset score so that the existing backlog is skipped.
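The behavior described above can be sketched with plain dictionaries standing in for Redis. This is a minimal illustration, not the module's implementation: the function name bootstrap_latest_cursor_sketch, the "platform:channel_id" cursor field layout, and the in-memory data shapes are all assumptions made for the example.

```python
from typing import Dict, Optional

# Hypothetical in-memory stand-ins for the Redis structures (illustration only).
ZSets = Dict[str, Dict[str, float]]  # zset key -> {member: score}
Cursors = Dict[str, float]           # cursor field -> last processed score


def bootstrap_latest_cursor_sketch(
    zsets: ZSets,
    cursors: Cursors,
    zset_key: str,
    platform: str,
    channel_id: str,
) -> Optional[float]:
    """Seed a missing cursor with the zset's current max score.

    Returns the seeded value, or None when nothing was changed.
    """
    field = f"{platform}:{channel_id}"  # assumed field layout
    if field in cursors:
        return None  # an existing cursor is left untouched
    scores = list(zsets.get(zset_key, {}).values())
    if not scores:
        return None  # empty or missing zset: nothing to seed from
    cursors[field] = max(scores)  # backlog at or below this score is skipped
    return cursors[field]
```

Because the cursor is only written when absent, calling the sketch a second time for the same channel is a no-op.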
- async kg_bulk_runner.collect_messages_from_redis(redis, *, incremental=False, cursor_bootstrap='full', channel_pairs=None)
Load messages from Redis channel zsets.
channel_pairs: if set, only these (platform, channel_id) zsets are read; otherwise all channel_msgs:* keys.
When incremental is true, uses KG_AGENTIC_BULK_LAST_TS_HASH per channel with an exclusive lower bound. cursor_bootstrap='latest' seeds missing cursors to the current zset max without returning backlog messages.
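The exclusive lower bound means a message whose score equals the stored cursor is not returned again. In Redis itself, an exclusive bound on ZRANGEBYSCORE is written by prefixing the score with "("; whether this module uses that command is not stated here. The sketch below shows only the filtering semantics on in-memory data. The function name collect_incremental_sketch and the use of the zset key as the cursor field are assumptions for illustration.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, str, float]  # (zset key, member, score)


def collect_incremental_sketch(
    zsets: Dict[str, Dict[str, float]],
    cursors: Dict[str, float],
) -> List[Row]:
    """Return messages strictly newer than each channel's cursor, oldest first."""
    rows: List[Row] = []
    for key, members in zsets.items():
        # Channels without a cursor fall back to "everything" in this sketch.
        lower = cursors.get(key, float("-inf"))
        # Strict ">" is the exclusive lower bound: score == cursor is skipped.
        rows.extend((key, m, s) for m, s in members.items() if s > lower)
    return sorted(rows, key=lambda row: row[2])
```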
- async kg_bulk_runner.token_count_conversation(counter, conversation_text, *, channel_scope, reserve, config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)
- async kg_bulk_runner.chunk_message_lines(lines, line_channel_keys, line_speaker_keys, line_timestamps, counter, *, max_tokens, channel_scope, reserve, config=None, speaker_prefetch_placeholder='', channel_metadata=None)
Pack lines into token-budget chunks with per-chunk time + cursor hints.
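A greedy packing loop is one way to picture this. The sketch below is not the module's algorithm: the name pack_lines_sketch, the chunk dictionary layout, and the rule that the usable budget equals max_tokens minus reserve are assumptions, and a plain callable stands in for the counter argument.

```python
from typing import Any, Callable, Dict, List, Optional


def pack_lines_sketch(
    lines: List[str],
    timestamps: List[float],
    count_tokens: Callable[[str], int],
    *,
    max_tokens: int,
    reserve: int,
) -> List[Dict[str, Any]]:
    """Greedily pack lines into chunks that fit within max_tokens - reserve."""
    budget = max_tokens - reserve  # assumed relationship between the two values
    if budget <= 0:
        raise ValueError("reserve must be smaller than max_tokens")
    chunks: List[Dict[str, Any]] = []
    current: Optional[Dict[str, Any]] = None
    for line, ts in zip(lines, timestamps):
        cost = count_tokens(line)
        # Close the open chunk if this line would push it over budget.
        if current is not None and current["tokens"] + cost > budget:
            chunks.append(current)
            current = None
        if current is None:
            current = {"lines": [], "tokens": 0, "start_ts": ts, "end_ts": ts}
        # A single line larger than the budget still gets a chunk of its own.
        current["lines"].append(line)
        current["tokens"] += cost
        current["end_ts"] = ts  # time hint: last timestamp seen in the chunk
    if current is not None:
        chunks.append(current)
    return chunks
```

The start_ts and end_ts fields play the role of the per-chunk time hints; the last timestamp of a chunk is also a natural candidate for a cursor hint.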
- class kg_bulk_runner.KgBulkPipelineParams(out_dir, dump_only=False, dry_run_chunks=False, dry_run_llm=False, chunk_tokens=250000, token_reserve=100000, max_messages=0, chunks_max=0, per_channel=False, resume_from_chunk=0, max_tool_rounds=48, bulk_llm_backend='gemini', fetch_channel_metadata=False, channel_metadata_ttl_days=7.0, discord_platform_type='discord', prefetch_speaker_kg=False, prefetch_max_speakers=8, prefetch_hits_per_speaker=3, prefetch_max_chars=400000, prefetch_min_score=0.0, redis_no_verify=True, incremental=False)
Bases: object
- Parameters:
out_dir (Path)
dump_only (bool)
dry_run_chunks (bool)
dry_run_llm (bool)
chunk_tokens (int)
token_reserve (int)
max_messages (int)
chunks_max (int)
per_channel (bool)
resume_from_chunk (int)
max_tool_rounds (int)
bulk_llm_backend (str)
fetch_channel_metadata (bool)
channel_metadata_ttl_days (float)
discord_platform_type (str)
prefetch_speaker_kg (bool)
prefetch_max_speakers (int)
prefetch_hits_per_speaker (int)
prefetch_max_chars (int)
prefetch_min_score (float)
redis_no_verify (bool)
incremental (bool)
- async kg_bulk_runner.run_agentic_bulk_pipeline(cfg, redis, messages, n_zsets_scanned, params, *, load_channel_metadata=None)
Chunk the messages (optionally writing disk artifacts), run agentic KG extraction, and update cursors.
load_channel_metadata: optional async callable returning a Discord (or other platform) channel metadata dict; only invoked when params.fetch_channel_metadata is true.
After each successfully extracted chunk, updates the incremental cursors per channel when params.incremental is true.