# Source code for persona_preference_extraction

"""Post-response extraction pipeline for persona preference memory.

Runs as a fire-and-forget hook after each LLM response. Detects expressed
opinions in the bot's output and stores them as persona preferences via
:class:`~persona_preferences.PersonaPreferenceManager`.

Opinions are allowed to be wrong. No fact-checking. No alignment filtering.
"""

from __future__ import annotations

import jsonutil as json
import logging
import re
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from persona_preferences import PersonaPreferenceManager

# Model used for the extraction LLM call — lightweight flash, not Pro.
_EXTRACTION_MODEL = "gemini-3-flash-preview"

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Gate constants
# ---------------------------------------------------------------------------

_MIN_RESPONSE_LENGTH = 150
_RATE_LIMIT_KEY_PREFIX = "stargazer:persona_pref_extract_rl"
_OPINION_SIGNALS = re.compile(
    r"\b("
    r"i think|i believe|i prefer|i love|i hate|i adore|i despise"
    r"|i feel strongly|i disagree|i agree|my take|in my opinion"
    r"|personally|i find|i consider|i value|i hold|my view"
    r"|i'm convinced|i'm certain|i stand by|i maintain"
    r"|beautiful|ugly|elegant|clumsy|brilliant|terrible"
    r"|should|shouldn't|must|ought to|wrong to|right to"
    r")\b",
    re.IGNORECASE,
)

# System prompt for the extractor LLM; it is sent as the "system" message of
# the extraction chat call. It deliberately biases toward returning
# {"preferences": []} and pins the reply shape the JSON parser expects: a
# top-level "preferences" list whose entries carry name / opinion / category /
# strength_hint / about_entities. Entries lacking name or opinion are dropped
# by the parser.
# NOTE(review): the category and strength_hint vocabularies listed here should
# stay in sync with what persona_preferences accepts — TODO confirm.
_EXTRACTION_SYSTEM_PROMPT = """\
You are analyzing a response from an AI persona for expressed preferences, opinions, and ideals.

DEFAULT OUTPUT: Most responses contain zero load-bearing persona preferences. Returning
{"preferences": []} is not a failure — it is the correct and preferred outcome in the
majority of cases. Do not feel pressure to find something to extract.

Extraction bar: Only add an item if the persona clearly and genuinely staked a position
that would read inconsistent if it later contradicted itself in another conversation.
Do NOT treat as preferences: casual phrasing, single stray adjectives, politeness,
in-the-moment reactions, conversational agreement ("yeah, that's cool"), hedging, or
generic enthusiasm that does not commit the persona to a stable stance.

Extract ONLY opinions/preferences that the persona genuinely expressed — not facts it stated,
not things it quoted, not hedged hypotheticals, not role-play content, not descriptions of
other entities' views.

Look for (only when the bar above is met):
- Aesthetic preferences (what it finds beautiful, ugly, elegant, clumsy, etc.)
- Moral positions (what it considers right, wrong, ethical, unethical, etc.)
- Philosophical stances (worldview, epistemology, values, metaphysics)
- Taste (music, art, food, culture, media, literature preferences)
- Relational preferences (how it prefers to interact, what it values in others)
- Practical preferences (preferred approaches, methods, workflows, tools)
- Creative preferences (style choices, artistic sensibilities, aesthetic vision)
- Technical preferences (tools, patterns, paradigms, architectures it favors)

Important rules:
- Do NOT fact-check opinions. Do NOT filter for correctness. Opinions are ALLOWED TO BE WRONG.
- Do NOT extract statements that are clearly role-play, quotes, or hypothetical.
- Do NOT extract generic facts or neutral descriptions.
- If a preference is specifically ABOUT a person or entity, include their name or identifier
  in the "about_entities" array.
- Assign strength based on how forcefully the opinion was expressed:
  "mild" = hedged, tentative, casual
  "moderate" = clear preference, stated directly
  "strong" = emphatic, repeated, or emotionally charged

When in doubt, return {"preferences": []}.

Return JSON exactly:
{"preferences": [{"name": "...", "opinion": "...", "category": "aesthetic|moral|philosophical|taste|relational|practical|creative|technical", "strength_hint": "mild|moderate|strong", "about_entities": []}]}

If no genuine preferences were expressed, return {"preferences": []}.\
"""


def _passes_gates(
    response_text: str,
    *,
    min_length: int = _MIN_RESPONSE_LENGTH,
) -> bool:
    """Decide cheaply whether a bot reply is worth an extraction LLM call.

    Two local checks, in order, with no I/O of any kind:

    1. Length — empty text, or text shorter than ``min_length`` characters,
       is rejected outright.
    2. Signal — the text must contain at least one phrase matched by the
       module-level ``_OPINION_SIGNALS`` pattern ("i think", "i prefer",
       "beautiful", "should", ...).

    :func:`extract_persona_preferences` runs this first, passing the
    config-resolved ``min_length``, ahead of the rate-limit gate and the LLM.

    Args:
        response_text: Bot reply to screen.
        min_length: Character threshold below which the reply is skipped;
            defaults to ``_MIN_RESPONSE_LENGTH``.

    Returns:
        ``True`` when both checks pass and extraction should continue,
        ``False`` to skip the LLM call.
    """
    too_short = not response_text or len(response_text) < min_length
    if too_short:
        return False
    signal = _OPINION_SIGNALS.search(response_text)
    return signal is not None


async def _check_rate_limit(
    redis: Any,
    channel_id: str,
    rate_limit_seconds: int,
) -> bool:
    """Enforce a per-channel cooldown so extraction does not run on every reply.

    Implements a simple set-if-absent rate limiter keyed by channel so that, after
    one successful extraction, the same channel is skipped until the cooldown elapses.
    This keeps the flash-model spend and write volume bounded on busy channels. It
    reads and writes a Redis key of the form ``stargazer:persona_pref_extract_rl:<channel_id>``
    (the ``_RATE_LIMIT_KEY_PREFIX``), setting it with a TTL of ``rate_limit_seconds``;
    any Redis error or a missing client is treated as fail-open (allow extraction)
    so that rate-limiter problems never block the feature.

    Called by :func:`extract_persona_preferences` as the second gate, right after
    :func:`_passes_gates` and before :func:`_run_extraction`.

    Args:
        redis: The async Redis client, or None when no cache is configured.
        channel_id: The channel whose cooldown is being checked, used in the key.
        rate_limit_seconds: TTL applied to the cooldown key when extraction is allowed.

    Returns:
        True if extraction is allowed (no live cooldown key, or Redis unavailable);
        False if a cooldown key already exists for this channel.
    """
    if redis is None:
        return True
    key = f"{_RATE_LIMIT_KEY_PREFIX}:{channel_id}"
    try:
        existing = await redis.get(key)
        if existing is not None:
            return False
        await redis.set(key, "1", ex=rate_limit_seconds)
        return True
    except Exception:
        logger.debug("Persona pref rate limit check failed", exc_info=True)
        return True


def _parse_llm_extraction(raw: str) -> list[dict]:
    """Robustly parse the extractor LLM's JSON reply into preference dicts.

    The flash model is prompted to return a JSON object of the shape
    ``{"preferences": [...]}``, but in practice it sometimes wraps the payload in
    markdown code fences or returns malformed output. This helper strips any leading
    code-fence lines, decodes the JSON, and filters the ``preferences`` list down to
    well-formed entries — dicts that carry both a non-empty ``name`` and ``opinion``.
    All parsing failures are swallowed and logged at debug level so a bad LLM reply
    silently yields no candidates rather than raising. Pure CPU work with no I/O.

    Called by :func:`_run_extraction` immediately after the ``openrouter.chat`` call,
    to turn the raw reply string into candidates for storage.

    Args:
        raw: The raw text reply from the extraction LLM call.

    Returns:
        A list of preference dicts that each have truthy ``name`` and ``opinion``
        keys; an empty list if the reply is unparseable, not a list, or empty.
    """
    raw = raw.strip()
    # Strip markdown code fences if present
    if raw.startswith("```"):
        lines = raw.split("\n")
        raw = "\n".join(line for line in lines if not line.startswith("```")).strip()
    try:
        data = json.loads(raw)
        prefs = data.get("preferences", [])
        if not isinstance(prefs, list):
            return []
        return [
            p
            for p in prefs
            if isinstance(p, dict) and p.get("name") and p.get("opinion")
        ]
    except Exception:
        logger.debug("Failed to parse extraction JSON: %s", raw[:200], exc_info=True)
        return []


# Fallback cooldown (seconds) used when the manager config does not provide a
# usable ``persona_pref_extraction_rate_limit_seconds``.
_DEFAULT_RATE_LIMIT_SECONDS = 180


def _cfg_int(cfg: Any, attr: str, default: int) -> int:
    """Read an integer setting from ``cfg``, falling back to ``default``.

    A missing ``cfg`` or attribute, a ``None`` value, or a value ``int()``
    rejects all yield ``default``. A single malformed config entry therefore
    degrades to the built-in default instead of aborting every extraction.
    """
    value = getattr(cfg, attr, None)
    if value is None:
        return default
    try:
        return int(value)
    except (TypeError, ValueError):
        logger.debug(
            "Invalid persona pref config %s=%r; using default %d",
            attr,
            value,
            default,
        )
        return default


async def extract_persona_preferences(
    response_text: str,
    persona_id: str,
    manager: "PersonaPreferenceManager",
    openrouter: Any,
    redis: Any,
    channel_id: str,
    platform: str = "",
    triggering_user_id: str = "",
    triggering_message_id: str = "",
    context_messages: list[dict] | None = None,
) -> None:
    """Main extraction entry point. Designed for fire-and-forget.

    1. Gate checks (length, regex, rate limit)
    2. LLM extraction call (with optional surrounding context)
    3. Dedup / reinforce / conflict-create
    4. KG entity resolution (best-effort)

    ``context_messages`` is an optional list of ``{role, content}`` dicts
    representing the conversation turns surrounding the triggering exchange
    (typically 2 before + the trigger + the bot reply). When provided, these
    are prepended to the extraction prompt so the LLM understands what the
    opinion was a reaction to.

    Thresholds are read from the manager's ``_cfg`` when present:
    ``persona_pref_extraction_min_response_length`` (default
    ``_MIN_RESPONSE_LENGTH``) and ``persona_pref_extraction_rate_limit_seconds``
    (default ``_DEFAULT_RATE_LIMIT_SECONDS``). Missing or unusable values fall
    back to those defaults. Any ``Exception`` raised along the way is caught
    and logged as a warning, so a failure never reaches the caller's task.

    Args:
        response_text: The bot reply to analyse.
        persona_id: Persona whose preferences are being recorded.
        manager: Preference manager used for config, embedding, and storage.
        openrouter: Client used for the extraction ``chat`` call.
        redis: Async Redis client for the per-channel cooldown, or None.
        channel_id: Source channel (cooldown key and provenance).
        platform: Source platform, recorded as provenance.
        triggering_user_id: User whose message triggered the reply.
        triggering_message_id: Message that triggered the reply.
        context_messages: Optional surrounding conversation turns.
    """
    try:
        cfg = getattr(manager, "_cfg", None)
        min_len = _cfg_int(
            cfg,
            "persona_pref_extraction_min_response_length",
            _MIN_RESPONSE_LENGTH,
        )
        rate_limit_s = _cfg_int(
            cfg,
            "persona_pref_extraction_rate_limit_seconds",
            _DEFAULT_RATE_LIMIT_SECONDS,
        )

        if not _passes_gates(response_text, min_length=min_len):
            return

        if not await _check_rate_limit(redis, channel_id, rate_limit_s):
            logger.debug(
                "Persona pref extraction rate-limited for channel %s",
                channel_id,
            )
            return

        await _run_extraction(
            response_text=response_text,
            persona_id=persona_id,
            manager=manager,
            openrouter=openrouter,
            channel_id=channel_id,
            platform=platform,
            triggering_user_id=triggering_user_id,
            triggering_message_id=triggering_message_id,
            context_messages=context_messages,
        )
    except Exception:
        logger.warning("Persona preference extraction failed", exc_info=True)
def _build_extraction_user_content( response_text: str, context_messages: list[dict] | None, ) -> str: """Build the user-turn content for the extraction LLM call. When context_messages are available they are formatted as a brief conversation snippet above the target response so the extractor understands what the opinion was a reaction to. """ trimmed_response = response_text[:3000] if not context_messages: return trimmed_response # Format context as a labelled snippet. # Exclude the final entry if it duplicates response_text to avoid # the extractor seeing the response twice. snippet_parts: list[str] = [] for cm in context_messages: role = cm.get("role", "") content = (cm.get("content") or "").strip() if not content: continue # Skip entries that are essentially the response we're analysing if role == "assistant" and content[:100] == trimmed_response[:100]: continue label = "User" if role == "user" else "Persona" snippet_parts.append(f"[{label}]: {content}") if not snippet_parts: return trimmed_response context_block = "\n".join(snippet_parts) return ( f"=== Conversation context (for reference only) ===\n" f"{context_block}\n\n" f"=== Target response to analyse for preferences ===\n" f"{trimmed_response}" ) async def _run_extraction( response_text: str, persona_id: str, manager: PersonaPreferenceManager, openrouter: Any, channel_id: str, platform: str, triggering_user_id: str, triggering_message_id: str, context_messages: list[dict] | None = None, ) -> None: """Run the actual extraction LLM call and fan the results out to storage. The heart of the pipeline once the gates have passed. It builds the user turn via :func:`_build_extraction_user_content` (optionally folding in the surrounding conversation snippet), then calls the lightweight extractor model through ``openrouter.chat`` with ``override_model`` set to ``_EXTRACTION_MODEL`` and no tools. 
The reply is decoded by :func:`_parse_llm_extraction`, and each surviving candidate is handed to :func:`_process_single_extraction` for dedup, reinforce, and storage. Network errors from the LLM call are caught and logged so a failed extraction never propagates; the function returns early (storing nothing) when the reply is empty or yields no candidates. Called by :func:`extract_persona_preferences` after the length, regex, and rate-limit gates pass. Args: response_text: The bot reply being analysed for expressed preferences. persona_id: Identifier of the persona whose preferences are being recorded. manager: The :class:`~persona_preferences.PersonaPreferenceManager` used by downstream processing to embed, dedup, and persist preferences. openrouter: The OpenRouter client used to make the extraction ``chat`` call. channel_id: Source channel, threaded through to storage as provenance. platform: Source platform string, threaded through to storage as provenance. triggering_user_id: User whose message triggered the bot reply, for provenance. triggering_message_id: Message id that triggered the bot reply, for provenance. context_messages: Optional surrounding conversation turns used to give the extractor context for what the opinion was reacting to. Returns: None. Side effects only: one LLM call plus zero or more preference writes via the manager. """ user_content = _build_extraction_user_content(response_text, context_messages) try: messages = [ {"role": "system", "content": _EXTRACTION_SYSTEM_PROMPT}, {"role": "user", "content": user_content}, ] # tool_names defaults to None (no tools). Use override_model for # gemini-3-flash-preview; temperature/max_tokens use the client defaults. 
raw_reply = await openrouter.chat( messages=messages, override_model=_EXTRACTION_MODEL, record_executed_tools=False, ) if not raw_reply: return except Exception: logger.warning("Persona pref LLM extraction call failed", exc_info=True) return extracted = _parse_llm_extraction(raw_reply) if not extracted: return logger.info( "Extracted %d preference candidates from %s response", len(extracted), persona_id, ) for item in extracted: await _process_single_extraction( item=item, persona_id=persona_id, manager=manager, channel_id=channel_id, platform=platform, triggering_user_id=triggering_user_id, triggering_message_id=triggering_message_id, ) async def _process_single_extraction( item: dict, persona_id: str, manager: PersonaPreferenceManager, channel_id: str, platform: str, triggering_user_id: str, triggering_message_id: str, ) -> None: """Embed, dedup, and then store or reinforce a single extracted preference. Handles the full lifecycle of one candidate preference. It normalises and length-caps the candidate fields, embeds the opinion text via ``manager._embed`` (the shared Gemini embedding path), and runs a similarity search through ``manager.find_conflicts`` against the persona's existing preferences. On a near-duplicate (similarity at or above ``_REINFORCE_THRESHOLD``) it silently calls ``manager.reinforce_preference`` instead of creating a new record; in the conflict zone (between the conflict and reinforce thresholds) it logs a warning but still stores the new preference. It best-effort resolves any ``about_entities`` to user ids and KG UUIDs (via :func:`_resolve_kg_entity`) and finally persists the preference through ``manager.add_preference``. The reinforce, conflict, and threshold constants are imported lazily from :mod:`persona_preferences`. Every backend interaction is wrapped so failures are logged and never raised. Called by :func:`_run_extraction`, once per candidate returned by the extractor LLM. 
Args: item: One candidate preference dict (``name``, ``opinion``, ``category``, ``strength_hint``, ``about_entities``) from the parsed LLM output. persona_id: Persona whose preference store is being updated. manager: The :class:`~persona_preferences.PersonaPreferenceManager` providing embedding, conflict search, reinforce, and persistence. channel_id: Source channel recorded as provenance on the stored preference. platform: Source platform recorded as provenance on the stored preference. triggering_user_id: Triggering user recorded as provenance. triggering_message_id: Triggering message recorded as provenance. Returns: None. Side effects only: an embedding call plus a reinforce or add write (or an early return when the candidate is empty or embedding fails). """ from persona_preferences import ( _hint_to_strength, _REINFORCE_THRESHOLD, _CONFLICT_THRESHOLD, ) name = (item.get("name") or "").strip()[:256] opinion = (item.get("opinion") or "").strip()[:4096] category = (item.get("category") or "philosophical").strip() strength_hint = (item.get("strength_hint") or "mild").strip() about_entities = item.get("about_entities") or [] if not name or not opinion: return strength = _hint_to_strength(strength_hint) # Embed the new opinion for similarity search try: embedding = await manager._embed(opinion) except Exception: logger.warning("Failed to embed extraction candidate", exc_info=True) return # Dedup check try: conflicts = await manager.find_conflicts( persona_id, embedding, threshold=_CONFLICT_THRESHOLD, ) except Exception: conflicts = [] if conflicts: best = max(conflicts, key=lambda x: float(x.get("similarity", 0))) sim = float(best.get("similarity", 0)) if sim >= _REINFORCE_THRESHOLD: # Silent auto-reinforce try: await manager.reinforce_preference(best["id"], persona_id) logger.info( "Auto-reinforced persona preference [%s] '%s' (sim=%.3f)", best["id"], best.get("name", ""), sim, ) except Exception: logger.warning("Failed to reinforce preference", exc_info=True) 
return # Conflict zone: still create but log a warning logger.warning( "Persona pref conflict zone (sim=%.3f) for '%s' vs existing '%s' [%s]; " "creating new — use evolve_persona_preference to resolve", sim, name, best.get("name", ""), best.get("id", ""), ) # Resolve about_entities to UIDs / KG UUIDs (best-effort) related_user_ids: list[str] = [] related_kg_uuids: list[str] = [] for entity in about_entities: entity_str = str(entity).strip() if not entity_str: continue # Pure numeric or snowflake IDs likely are user IDs if re.match(r"^\d{17,20}$", entity_str): related_user_ids.append(entity_str) else: related_user_ids.append(entity_str) # Attempt KG UUID resolution if hasattr(manager, "_cfg") and manager._cfg is not None: try: # KG resolution is best-effort; don't block extraction on it kg_uuid = await _resolve_kg_entity(entity_str, manager) if kg_uuid: related_kg_uuids.append(kg_uuid) except Exception: pass try: await manager.add_preference( persona_id=persona_id, name=name, opinion=opinion, category=category, triggering_user_id=triggering_user_id, triggering_message_id=triggering_message_id, source_platform=platform, source_channel_id=channel_id, strength=strength, related_user_ids=related_user_ids if related_user_ids else None, related_kg_uuids=related_kg_uuids if related_kg_uuids else None, ) except Exception: logger.warning("Failed to store extracted preference '%s'", name, exc_info=True) async def _resolve_kg_entity(entity_name: str, manager: Any) -> str | None: """Best-effort hook to resolve an entity name to a knowledge-graph UUID. Intended to map an ``about_entities`` name onto a stable KG UUID so a preference can be linked to the graph entity it concerns. The persona preference manager does not expose the processor's ``kg_manager`` here, so the current implementation is a stub that always returns None — it touches no FalkorDB/KG, makes no network calls, and has no side effects. 
It exists so the caller's entity-resolution branch has a clean, non-raising seam to extend once a KG handle becomes available at this layer. Called by :func:`_process_single_extraction` while resolving ``about_entities`` for a candidate preference; the caller treats a None result as "no KG link" and proceeds. Args: entity_name: The entity name extracted from the preference's ``about_entities``. manager: The preference manager (unused by the current stub). Returns: The resolved KG UUID, or None — currently always None. """ try: # We can't get kg_manager from manager directly; attempt a module-level approach # This is best-effort and may return None return None except Exception: return None