kg_bulk_runner

Shared bulk agentic KG extraction: Redis message collection, chunking, and LLM runs.

Used by scripts.kg_bulk_dump_and_extract and the scheduled incremental task in background_tasks.

kg_bulk_runner.cursor_field(platform, channel_id)[source]
Return type:

str

Parameters:
  • platform (str)

  • channel_id (str)
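
The cursor field is a single hash field per (platform, channel) pair. A minimal sketch, assuming a simple `platform:channel_id` layout (the actual delimiter and format in `kg_bulk_runner` are not documented here):

```python
def cursor_field(platform: str, channel_id: str) -> str:
    # Hypothetical layout: the real module may join these differently.
    return f"{platform}:{channel_id}"

print(cursor_field("discord", "123456789"))  # → discord:123456789
```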

kg_bulk_runner.redis_ssl_kwargs_for_bulk(cfg, *, redis_no_verify)[source]
Return type:

dict[str, Any]

Parameters:
  • cfg

  • redis_no_verify (bool)

kg_bulk_runner.format_llm_style_line(m)[source]
Return type:

str

Parameters:

m (dict[str, Any])
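
A hedged sketch of what an LLM-style transcript line could look like; the message dict keys (`ts`, `author`, `content`) are assumptions, not the module's actual schema:

```python
from typing import Any

def format_llm_style_line(m: dict[str, Any]) -> str:
    # Assumed keys: 'ts', 'author', 'content'; the real schema may differ.
    ts = m.get("ts", "?")
    author = m.get("author", "unknown")
    content = m.get("content", "")
    return f"[{ts}] {author}: {content}"

line = format_llm_style_line({"ts": 1700000000.0, "author": "alice", "content": "hi"})
print(line)  # → [1700000000.0] alice: hi
```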

async kg_bulk_runner.scan_channel_zset_keys(redis)[source]
Return type:

list[str]

Parameters:

redis (Any)

async kg_bulk_runner.cursor_hget(redis, platform, channel_id)[source]
Return type:

float | None

Parameters:
  • redis (Any)

  • platform (str)

  • channel_id (str)

async kg_bulk_runner.cursor_hset(redis, platform, channel_id, ts)[source]
Return type:

None

Parameters:
  • redis (Any)

  • platform (str)

  • channel_id (str)

  • ts (float)

async kg_bulk_runner.bootstrap_latest_cursor_no_extract(redis, zset_key, platform, channel_id)[source]

If no cursor exists, set it to the current max zset score and skip the backlog.

Return type:

None

Parameters:
  • redis (Any)

  • zset_key (str)

  • platform (str)

  • channel_id (str)
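
The bootstrap semantics can be sketched over plain dicts standing in for the Redis zset (`{member: score}`) and cursor hash (`{field: ts}`); the names and shapes here are illustrative, not the module's internals:

```python
def bootstrap_latest_cursor(zset: dict[str, float],
                            cursors: dict[str, float],
                            field: str) -> None:
    """If no cursor exists for `field`, seed it with the max zset score."""
    if field in cursors:
        return  # cursor already set: nothing to do
    if not zset:
        return  # empty channel: nothing to seed from
    cursors[field] = max(zset.values())  # skip backlog: start at latest message

zset = {"m1": 100.0, "m2": 250.0, "m3": 175.0}
cursors: dict[str, float] = {}
bootstrap_latest_cursor(zset, cursors, "discord:c1")
print(cursors)  # → {'discord:c1': 250.0}
```

A second call with an existing cursor is a no-op, which is what lets the incremental task run this safely on every sweep.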

async kg_bulk_runner.fetch_messages_for_zset(redis, zset_key)[source]
Return type:

list[dict[str, Any]]

Parameters:
  • redis (Any)

  • zset_key (str)

async kg_bulk_runner.fetch_messages_for_zset_after(redis, zset_key, after_ts_exclusive)[source]
Return type:

list[dict[str, Any]]

Parameters:
  • redis (Any)

  • zset_key (str)

  • after_ts_exclusive (float | None)

async kg_bulk_runner.collect_messages_from_redis(redis, *, incremental=False, cursor_bootstrap='full', channel_pairs=None)[source]

Load messages from Redis channel zsets.

channel_pairs: if set, only these (platform, channel_id) zsets are read; otherwise all channel_msgs:* keys are scanned.

When incremental is true, uses the KG_AGENTIC_BULK_LAST_TS_HASH cursor per channel as an exclusive lower bound. cursor_bootstrap='latest' seeds missing cursors to the current zset max without returning backlog messages.

Return type:

tuple[list[dict[str, Any]], int]

Parameters:
  • redis (Any)

  • incremental (bool)

  • cursor_bootstrap (str)

  • channel_pairs (list[tuple[str, str]] | None)

async kg_bulk_runner.token_count_conversation(counter, conversation_text, *, channel_scope, reserve, config=None, chunk_channel_pairs=None, chunk_speaker_pairs=None, speaker_kg_prefetch='', channel_metadata=None)[source]
Return type:

int

Parameters:
  • counter

  • conversation_text (str)

  • channel_scope

  • reserve (int)

  • config

  • chunk_channel_pairs

  • chunk_speaker_pairs

  • speaker_kg_prefetch (str)

  • channel_metadata

async kg_bulk_runner.chunk_message_lines(lines, line_channel_keys, line_speaker_keys, line_timestamps, counter, *, max_tokens, channel_scope, reserve, config=None, speaker_prefetch_placeholder='', channel_metadata=None)[source]

Pack lines into token-budget chunks with per-chunk time ranges and cursor hints.

Return type:

list[tuple[list[str], list[tuple[str, str]], list[tuple[str, str]], float, float, dict[tuple[str, str], float]]]

Parameters:
  • lines (list[str])

  • line_channel_keys (list[tuple[str, str]])

  • line_speaker_keys (list[tuple[str, str]])

  • line_timestamps (list[float])

  • counter

  • max_tokens (int)

  • channel_scope

  • reserve (int)

  • config

  • speaker_prefetch_placeholder (str)

  • channel_metadata

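
The packing step can be sketched as a greedy fill against a budget of max_tokens minus reserve. Whitespace token counting stands in for the real counter here, and the per-chunk time and cursor hints from the return tuple are omitted for brevity:

```python
from typing import Callable

def pack_lines(lines: list[str], max_tokens: int, reserve: int,
               count_tokens: Callable[[str], int] = lambda s: len(s.split()),
               ) -> list[list[str]]:
    """Greedily pack lines into chunks whose token count stays within
    max_tokens - reserve (the reserve leaves room for prompt scaffolding)."""
    budget = max_tokens - reserve
    chunks: list[list[str]] = []
    current: list[str] = []
    used = 0
    for line in lines:
        n = count_tokens(line)
        if current and used + n > budget:
            chunks.append(current)  # flush the full chunk, start a new one
            current, used = [], 0
        current.append(line)
        used += n
    if current:
        chunks.append(current)
    return chunks

print(pack_lines(["a b", "c d", "e f"], max_tokens=3, reserve=0))
# → [['a b'], ['c d'], ['e f']]
```
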
class kg_bulk_runner.KgBulkPipelineParams(out_dir, dump_only=False, dry_run_chunks=False, dry_run_llm=False, chunk_tokens=250000, token_reserve=100000, max_messages=0, chunks_max=0, per_channel=False, resume_from_chunk=0, max_tool_rounds=48, bulk_llm_backend='gemini', fetch_channel_metadata=False, channel_metadata_ttl_days=7.0, discord_platform_type='discord', prefetch_speaker_kg=False, prefetch_max_speakers=8, prefetch_hits_per_speaker=3, prefetch_max_chars=400000, prefetch_min_score=0.0, redis_no_verify=True, incremental=False)[source]

Bases: object

Parameters:
  • out_dir (Path)

  • dump_only (bool)

  • dry_run_chunks (bool)

  • dry_run_llm (bool)

  • chunk_tokens (int)

  • token_reserve (int)

  • max_messages (int)

  • chunks_max (int)

  • per_channel (bool)

  • resume_from_chunk (int)

  • max_tool_rounds (int)

  • bulk_llm_backend (str)

  • fetch_channel_metadata (bool)

  • channel_metadata_ttl_days (float)

  • discord_platform_type (str)

  • prefetch_speaker_kg (bool)

  • prefetch_max_speakers (int)

  • prefetch_hits_per_speaker (int)

  • prefetch_max_chars (int)

  • prefetch_min_score (float)

  • redis_no_verify (bool)

  • incremental (bool)

out_dir: Path
dump_only: bool = False
dry_run_chunks: bool = False
dry_run_llm: bool = False
chunk_tokens: int = 250000
token_reserve: int = 100000
max_messages: int = 0
chunks_max: int = 0
per_channel: bool = False
resume_from_chunk: int = 0
max_tool_rounds: int = 48
bulk_llm_backend: str = 'gemini'
fetch_channel_metadata: bool = False
channel_metadata_ttl_days: float = 7.0
discord_platform_type: str = 'discord'
prefetch_speaker_kg: bool = False
prefetch_max_speakers: int = 8
prefetch_hits_per_speaker: int = 3
prefetch_max_chars: int = 400000
prefetch_min_score: float = 0.0
redis_no_verify: bool = True
incremental: bool = False
async kg_bulk_runner.run_agentic_bulk_pipeline(cfg, redis, messages, n_zsets_scanned, params, *, load_channel_metadata=None)[source]

Chunk the messages (optionally writing disk artifacts), run agentic KG extraction, and update cursors.

load_channel_metadata: optional async callable returning a Discord (or other) channel metadata dict; only invoked when params.fetch_channel_metadata is true.

After each successful extraction chunk, updates incremental cursors per channel when params.incremental is true.

Return type:

None

Parameters:
  • cfg

  • redis (Any)

  • messages (list[dict[str, Any]])

  • n_zsets_scanned (int)

  • params (KgBulkPipelineParams)

  • load_channel_metadata

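
The per-chunk cursor update described above can be sketched over plain dicts, using the `dict[tuple[str, str], float]` per-channel max-timestamp hint that chunk_message_lines attaches to each chunk. The monotonic "never move backwards" guard is an assumption about the intent:

```python
def update_cursors_after_chunk(cursors: dict[tuple[str, str], float],
                               chunk_max_ts: dict[tuple[str, str], float]) -> None:
    # Advance each (platform, channel_id) cursor to the newest timestamp
    # seen in the just-extracted chunk; never move a cursor backwards.
    for pair, ts in chunk_max_ts.items():
        if ts > cursors.get(pair, float("-inf")):
            cursors[pair] = ts

cursors = {("discord", "c1"): 100.0}
update_cursors_after_chunk(cursors, {("discord", "c1"): 150.0, ("discord", "c2"): 90.0})
print(cursors)  # → {('discord', 'c1'): 150.0, ('discord', 'c2'): 90.0}
```

Updating only after a successful extraction means a failed chunk is retried from the same cursor on the next run.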
kg_bulk_runner.resolve_bulk_backend(cli_value)[source]
Return type:

str

Parameters:

cli_value (str | None)