"""Post-response extraction pipeline for persona preference memory.
Runs as a fire-and-forget hook after each LLM response. Detects expressed
opinions in the bot's output and stores them as persona preferences via
:class:`~persona_preferences.PersonaPreferenceManager`.
Opinions are allowed to be wrong. No fact-checking. No alignment filtering.
"""
from __future__ import annotations
import jsonutil as json
import logging
import re
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from persona_preferences import PersonaPreferenceManager
# Model used for the extraction LLM call — lightweight flash, not Pro.
# Passed as ``override_model`` to ``openrouter.chat`` by ``_run_extraction``.
_EXTRACTION_MODEL = "gemini-3-flash-preview"
# Module-level logger used by the helpers below.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Gate constants
# ---------------------------------------------------------------------------
# Default minimum reply length, in characters; it is the default value of the
# ``min_length`` keyword of ``_passes_gates``.
_MIN_RESPONSE_LENGTH = 150
# Prefix of the per-channel cooldown key built in ``_check_rate_limit``; the
# full key is ``<prefix>:<channel_id>``.
_RATE_LIMIT_KEY_PREFIX = "stargazer:persona_pref_extract_rl"
# Case-insensitive, word-bounded screen searched by ``_passes_gates``. It mixes
# first-person stance phrases ("i think", "my take"), evaluative adjectives
# ("beautiful", "terrible") and modal words ("should", "must"), so it is a
# deliberately permissive pre-filter rather than a precise opinion detector.
_OPINION_SIGNALS = re.compile(
r"\b("
r"i think|i believe|i prefer|i love|i hate|i adore|i despise"
r"|i feel strongly|i disagree|i agree|my take|in my opinion"
r"|personally|i find|i consider|i value|i hold|my view"
r"|i'm convinced|i'm certain|i stand by|i maintain"
r"|beautiful|ugly|elegant|clumsy|brilliant|terrible"
r"|should|shouldn't|must|ought to|wrong to|right to"
r")\b",
re.IGNORECASE,
)
# System prompt for the extraction LLM call; ``_run_extraction`` sends it as the
# ``system`` message. It asks the model for a JSON object of the shape
# ``{"preferences": [...]}``, which ``_parse_llm_extraction`` then decodes.
# NOTE: this is runtime text — any edit to it changes what the extractor does.
_EXTRACTION_SYSTEM_PROMPT = """\
You are analyzing a response from an AI persona for expressed preferences, opinions, and ideals.
DEFAULT OUTPUT: Most responses contain zero load-bearing persona preferences. Returning
{"preferences": []} is not a failure — it is the correct and preferred outcome in the
majority of cases. Do not feel pressure to find something to extract.
Extraction bar: Only add an item if the persona clearly and genuinely staked a position
that would read inconsistent if it later contradicted itself in another conversation.
Do NOT treat as preferences: casual phrasing, single stray adjectives, politeness,
in-the-moment reactions, conversational agreement ("yeah, that's cool"), hedging, or
generic enthusiasm that does not commit the persona to a stable stance.
Extract ONLY opinions/preferences that the persona genuinely expressed — not facts it stated,
not things it quoted, not hedged hypotheticals, not role-play content, not descriptions of
other entities' views.
Look for (only when the bar above is met):
- Aesthetic preferences (what it finds beautiful, ugly, elegant, clumsy, etc.)
- Moral positions (what it considers right, wrong, ethical, unethical, etc.)
- Philosophical stances (worldview, epistemology, values, metaphysics)
- Taste (music, art, food, culture, media, literature preferences)
- Relational preferences (how it prefers to interact, what it values in others)
- Practical preferences (preferred approaches, methods, workflows, tools)
- Creative preferences (style choices, artistic sensibilities, aesthetic vision)
- Technical preferences (tools, patterns, paradigms, architectures it favors)
Important rules:
- Do NOT fact-check opinions. Do NOT filter for correctness. Opinions are ALLOWED TO BE WRONG.
- Do NOT extract statements that are clearly role-play, quotes, or hypothetical.
- Do NOT extract generic facts or neutral descriptions.
- If a preference is specifically ABOUT a person or entity, include their name or identifier
in the "about_entities" array.
- Assign strength based on how forcefully the opinion was expressed:
"mild" = hedged, tentative, casual
"moderate" = clear preference, stated directly
"strong" = emphatic, repeated, or emotionally charged
When in doubt, return {"preferences": []}.
Return JSON exactly:
{"preferences": [{"name": "...", "opinion": "...", "category": "aesthetic|moral|philosophical|taste|relational|practical|creative|technical", "strength_hint": "mild|moderate|strong", "about_entities": []}]}
If no genuine preferences were expressed, return {"preferences": []}.\
"""
def _passes_gates(
    response_text: str,
    *,
    min_length: int = _MIN_RESPONSE_LENGTH,
) -> bool:
    """Decide locally whether a reply is worth an extraction LLM call.

    Two cheap checks run in order: the reply must be non-empty and at least
    ``min_length`` characters long, and the module-level ``_OPINION_SIGNALS``
    regex must match somewhere in it. Nothing here touches Redis, the network,
    or any other state.

    Args:
        response_text: The bot reply being screened.
        min_length: Smallest character count that is still considered;
            defaults to ``_MIN_RESPONSE_LENGTH``.

    Returns:
        True when both checks pass and extraction should go ahead, False to
        skip the LLM call.
    """
    # Length gate first: it is the cheapest test and rejects empty input too.
    long_enough = bool(response_text) and len(response_text) >= min_length
    if not long_enough:
        return False

    # Signal gate: at least one opinion marker must appear in the text.
    return _OPINION_SIGNALS.search(response_text) is not None
async def _check_rate_limit(
    redis: Any,
    channel_id: str,
    rate_limit_seconds: int,
) -> bool:
    """Enforce a per-channel cooldown so extraction does not run on every reply.

    The cooldown is a Redis key named ``<_RATE_LIMIT_KEY_PREFIX>:<channel_id>``
    with a TTL of ``rate_limit_seconds``. The key is claimed with one atomic
    ``SET ... NX EX`` command, so when several replies for the same channel
    race, exactly one of them wins the slot. (A separate ``GET`` followed by
    ``SET`` lets every racer observe "no key" and pass.)

    The limiter fails open: a missing client or any Redis error allows the
    extraction, so rate-limiter problems never block the feature.

    Args:
        redis: Async Redis client, or None when no cache is configured.
        channel_id: Channel whose cooldown is checked; becomes part of the key.
        rate_limit_seconds: TTL, in seconds, applied to the cooldown key.

    Returns:
        True if extraction is allowed (slot claimed, or Redis unavailable or
        failing); False if a live cooldown key already exists for the channel.
    """
    if redis is None:
        return True

    key = f"{_RATE_LIMIT_KEY_PREFIX}:{channel_id}"
    try:
        # nx=True makes the write conditional on the key being absent; the
        # client reports a falsy result when the key already existed.
        acquired = await redis.set(key, "1", ex=rate_limit_seconds, nx=True)
    except Exception:
        # Deliberate fail-open: never let the limiter block extraction.
        logger.debug("Persona pref rate limit check failed", exc_info=True)
        return True
    return bool(acquired)
def _parse_llm_extraction(raw: str) -> list[dict]:
    """Parse the extractor LLM's reply into well-formed preference dicts.

    The model is prompted to return ``{"preferences": [...]}`` but may wrap the
    payload in markdown code fences, surround it with prose, or return
    something malformed. Decoding is attempted on, in order:

    1. the reply with fence lines removed (when it starts with a fence),
       otherwise the reply as-is;
    2. the outermost ``{...}`` span of the reply, which rescues payloads that
       are preceded or followed by commentary.

    Only entries that are dicts with non-blank *string* ``name`` and
    ``opinion`` values are kept, so later ``.strip()`` calls on those fields
    cannot fail. Nothing is raised: unusable input yields an empty list and a
    debug log line.

    Args:
        raw: The raw text reply from the extraction LLM call.

    Returns:
        The surviving preference dicts (unmodified), or an empty list when the
        reply is not a string, is blank, cannot be decoded, is not a JSON
        object, or has no valid ``preferences`` list.
    """
    if not isinstance(raw, str):
        return []
    text = raw.strip()
    if not text:
        return []

    # Candidate 1: the reply itself, minus fence lines when it opens with one.
    if text.startswith("```"):
        primary = "\n".join(
            line for line in text.split("\n") if not line.startswith("```")
        ).strip()
    else:
        primary = text
    candidates = [primary]

    # Candidate 2: the outermost {...} span, for replies that wrap the JSON in
    # prose ("Here is the JSON: ...").
    first_brace, last_brace = text.find("{"), text.rfind("}")
    if 0 <= first_brace < last_brace:
        outer_object = text[first_brace : last_brace + 1]
        if outer_object != primary:
            candidates.append(outer_object)

    last_error = None
    for candidate in candidates:
        try:
            data = json.loads(candidate)
        except Exception as exc:
            # Broad on purpose: ``json`` is a project wrapper here and its
            # exact decode-error type is not visible from this module.
            last_error = exc
            continue
        break
    else:
        logger.debug(
            "Failed to parse extraction JSON: %s", primary[:200], exc_info=last_error
        )
        return []

    if not isinstance(data, dict):
        logger.debug("Extraction JSON is not an object: %s", primary[:200])
        return []

    prefs = data.get("preferences", [])
    if not isinstance(prefs, list):
        return []

    return [
        p
        for p in prefs
        if isinstance(p, dict)
        and all(
            isinstance(p.get(field), str) and p[field].strip()
            for field in ("name", "opinion")
        )
    ]
def _build_extraction_user_content(
    response_text: str,
    context_messages: list[dict] | None,
) -> str:
    """Build the user-turn content for the extraction LLM call.

    The target response is capped at 3000 characters. When context messages
    are supplied they are rendered as a labelled conversation snippet above
    the target response so the extractor can see what the opinion was a
    reaction to.

    Context entries are skipped when they are not dicts, when their
    ``content`` is not a string (e.g. None or a multi-part list) or is blank,
    or when they are an ``assistant`` message whose first 100 characters match
    the start of the response being analysed (so the extractor does not see
    the response twice).

    Args:
        response_text: The bot reply to analyse.
        context_messages: Optional surrounding turns as dicts with ``role``
            and ``content`` keys.

    Returns:
        The trimmed response alone when there is no usable context, otherwise
        the context block followed by the trimmed response under a separate
        header.
    """
    trimmed_response = response_text[:3000]
    if not context_messages:
        return trimmed_response

    # Context content is stripped before comparison, so the response head must
    # be stripped as well; otherwise a reply with leading whitespace would
    # never be recognised as a duplicate of itself.
    response_head = response_text.strip()[:100]

    snippet_parts: list[str] = []
    for cm in context_messages:
        if not isinstance(cm, dict):
            continue
        raw_content = cm.get("content")
        if not isinstance(raw_content, str):
            # Non-text payloads cannot be rendered into the snippet.
            continue
        content = raw_content.strip()
        if not content:
            continue
        role = cm.get("role", "")
        # Skip entries that are essentially the response we're analysing.
        if role == "assistant" and content[:100] == response_head:
            continue
        # NOTE(review): every non-"user" role (including e.g. "system") is
        # labelled "Persona"; confirm callers only pass user/assistant turns.
        label = "User" if role == "user" else "Persona"
        snippet_parts.append(f"[{label}]: {content}")

    if not snippet_parts:
        return trimmed_response

    context_block = "\n".join(snippet_parts)
    return (
        "=== Conversation context (for reference only) ===\n"
        f"{context_block}\n\n"
        "=== Target response to analyse for preferences ===\n"
        f"{trimmed_response}"
    )
async def _run_extraction(
    response_text: str,
    persona_id: str,
    manager: PersonaPreferenceManager,
    openrouter: Any,
    channel_id: str,
    platform: str,
    triggering_user_id: str,
    triggering_message_id: str,
    context_messages: list[dict] | None = None,
) -> None:
    """Run the extraction LLM call and fan the results out to storage.

    Builds the user turn with :func:`_build_extraction_user_content`, calls
    ``openrouter.chat`` with ``override_model`` set to ``_EXTRACTION_MODEL``,
    decodes the reply with :func:`_parse_llm_extraction`, and hands each
    surviving candidate to :func:`_process_single_extraction`.

    Failures are contained: an error from the LLM call is logged and ends the
    run, and an error while processing one candidate is logged without
    preventing the remaining candidates from being processed. The function
    returns early, storing nothing, when the reply is empty, is not a string,
    or yields no candidates.

    Args:
        response_text: The bot reply being analysed for expressed preferences.
        persona_id: Identifier of the persona whose preferences are recorded.
        manager: Preference manager passed through to candidate processing.
        openrouter: Client exposing the async ``chat`` method used for the call.
        channel_id: Source channel, passed through as provenance.
        platform: Source platform, passed through as provenance.
        triggering_user_id: User whose message triggered the reply.
        triggering_message_id: Message id that triggered the reply.
        context_messages: Optional surrounding conversation turns that give
            the extractor context for the reply.

    Returns:
        None. Side effects only: one LLM call plus whatever
        :func:`_process_single_extraction` does per candidate.
    """
    user_content = _build_extraction_user_content(response_text, context_messages)
    messages = [
        {"role": "system", "content": _EXTRACTION_SYSTEM_PROMPT},
        {"role": "user", "content": user_content},
    ]

    try:
        # tool_names defaults to None (no tools). Use override_model for
        # gemini-3-flash-preview; temperature/max_tokens use the client defaults.
        raw_reply = await openrouter.chat(
            messages=messages,
            override_model=_EXTRACTION_MODEL,
            record_executed_tools=False,
        )
    except Exception:
        logger.warning("Persona pref LLM extraction call failed", exc_info=True)
        return

    # The parser expects text; anything else cannot carry the JSON payload.
    if not raw_reply or not isinstance(raw_reply, str):
        return

    extracted = _parse_llm_extraction(raw_reply)
    if not extracted:
        return

    logger.info(
        "Extracted %d preference candidates from %s response",
        len(extracted),
        persona_id,
    )

    for item in extracted:
        # Isolate each candidate: one bad item must not discard the others or
        # let an exception escape this fire-and-forget pipeline.
        try:
            await _process_single_extraction(
                item=item,
                persona_id=persona_id,
                manager=manager,
                channel_id=channel_id,
                platform=platform,
                triggering_user_id=triggering_user_id,
                triggering_message_id=triggering_message_id,
            )
        except Exception:
            logger.warning(
                "Failed to process extracted preference candidate", exc_info=True
            )
def _clean_text_field(
    value: Any,
    *,
    default: str = "",
    max_len: int | None = None,
) -> str:
    """Return *value* stripped and length-capped if it is a string, else *default*."""
    if not isinstance(value, str):
        return default
    cleaned = value.strip()
    if max_len is not None:
        cleaned = cleaned[:max_len]
    return cleaned or default


def _normalise_about_entities(raw: Any) -> list[str]:
    """Coerce the LLM's ``about_entities`` value into a list of non-blank strings."""
    if isinstance(raw, str):
        # A bare string is a single entity, not a sequence of characters.
        raw = [raw]
    elif not isinstance(raw, (list, tuple)):
        return []
    entities: list[str] = []
    for entry in raw:
        # Accept text and integer ids only; skip null, bool and nested values.
        if isinstance(entry, bool) or not isinstance(entry, (str, int)):
            continue
        text = str(entry).strip()
        if text:
            entities.append(text)
    return entities


def _similarity_of(match: dict) -> float:
    """Read a conflict match's ``similarity`` as a float, treating bad values as 0."""
    try:
        return float(match.get("similarity") or 0)
    except (TypeError, ValueError):
        return 0.0


async def _process_single_extraction(
    item: dict,
    persona_id: str,
    manager: PersonaPreferenceManager,
    channel_id: str,
    platform: str,
    triggering_user_id: str,
    triggering_message_id: str,
) -> None:
    """Embed, dedup, and then store or reinforce a single extracted preference.

    Steps, in order:

    1. Normalise the candidate. ``name`` is capped at 256 characters and
       ``opinion`` at 4096; non-string or blank values count as missing and
       end processing. ``category`` falls back to ``"philosophical"`` and
       ``strength_hint`` to ``"mild"``. ``about_entities`` is coerced to a
       list of strings.
    2. Embed the opinion via ``manager._embed``; on failure, log and stop.
    3. Search existing preferences via ``manager.find_conflicts`` using
       ``_CONFLICT_THRESHOLD``. If the search fails, log and continue without
       deduplication.
    4. If the best match has similarity at or above ``_REINFORCE_THRESHOLD``,
       call ``manager.reinforce_preference`` and stop. Below that, log a
       conflict warning and still store the new preference.
    5. Record every entity in ``related_user_ids``. For entities that do not
       look like 17-20 digit ids, also try :func:`_resolve_kg_entity` when the
       manager has a non-None ``_cfg``.
    6. Persist through ``manager.add_preference``; failures are logged.

    The threshold constants and ``_hint_to_strength`` are imported lazily from
    :mod:`persona_preferences`.

    Args:
        item: One candidate dict (``name``, ``opinion``, ``category``,
            ``strength_hint``, ``about_entities``) from the parsed LLM output.
        persona_id: Persona whose preference store is being updated.
        manager: Preference manager providing embedding, conflict search,
            reinforce, and persistence.
        channel_id: Source channel recorded as provenance.
        platform: Source platform recorded as provenance.
        triggering_user_id: Triggering user recorded as provenance.
        triggering_message_id: Triggering message recorded as provenance.

    Returns:
        None. Side effects only: an embedding call plus a reinforce or add
        write, or an early return when the candidate is unusable or embedding
        fails.
    """
    from persona_preferences import (
        _hint_to_strength,
        _REINFORCE_THRESHOLD,
        _CONFLICT_THRESHOLD,
    )

    # The candidate comes straight from LLM output, so validate every field
    # before calling string methods on it.
    if not isinstance(item, dict):
        return
    name = _clean_text_field(item.get("name"), max_len=256)
    opinion = _clean_text_field(item.get("opinion"), max_len=4096)
    if not name or not opinion:
        return
    category = _clean_text_field(item.get("category"), default="philosophical")
    strength_hint = _clean_text_field(item.get("strength_hint"), default="mild")
    about_entities = _normalise_about_entities(item.get("about_entities"))

    strength = _hint_to_strength(strength_hint)

    # Embed the new opinion for similarity search.
    try:
        embedding = await manager._embed(opinion)
    except Exception:
        logger.warning("Failed to embed extraction candidate", exc_info=True)
        return

    # Dedup check. A failed search degrades to "no known conflicts" so the
    # preference is still stored, but the failure is logged.
    try:
        conflicts = await manager.find_conflicts(
            persona_id,
            embedding,
            threshold=_CONFLICT_THRESHOLD,
        )
    except Exception:
        logger.warning(
            "Persona pref conflict search failed; storing without dedup",
            exc_info=True,
        )
        conflicts = []

    if conflicts:
        best = max(conflicts, key=_similarity_of)
        sim = _similarity_of(best)
        if sim >= _REINFORCE_THRESHOLD:
            # Silent auto-reinforce instead of creating a near-duplicate.
            try:
                await manager.reinforce_preference(best["id"], persona_id)
                logger.info(
                    "Auto-reinforced persona preference [%s] '%s' (sim=%.3f)",
                    best["id"],
                    best.get("name", ""),
                    sim,
                )
            except Exception:
                logger.warning("Failed to reinforce preference", exc_info=True)
            return
        # Conflict zone: still create but log a warning.
        logger.warning(
            "Persona pref conflict zone (sim=%.3f) for '%s' vs existing '%s' [%s]; "
            "creating new — use evolve_persona_preference to resolve",
            sim,
            name,
            best.get("name", ""),
            best.get("id", ""),
        )

    # Resolve about_entities to UIDs / KG UUIDs (best-effort).
    related_user_ids: list[str] = []
    related_kg_uuids: list[str] = []
    kg_lookup_enabled = getattr(manager, "_cfg", None) is not None
    for entity_str in about_entities:
        # NOTE(review): ids and plain names are both recorded in
        # related_user_ids, exactly as before; confirm names are meant to be
        # stored there.
        related_user_ids.append(entity_str)
        # Pure 17-20 digit values are treated as user ids and need no KG lookup.
        if re.fullmatch(r"\d{17,20}", entity_str) or not kg_lookup_enabled:
            continue
        try:
            # KG resolution is best-effort; don't block extraction on it.
            kg_uuid = await _resolve_kg_entity(entity_str, manager)
        except Exception:
            logger.debug(
                "KG resolution failed for entity '%s'", entity_str, exc_info=True
            )
            continue
        if kg_uuid:
            related_kg_uuids.append(kg_uuid)

    try:
        await manager.add_preference(
            persona_id=persona_id,
            name=name,
            opinion=opinion,
            category=category,
            triggering_user_id=triggering_user_id,
            triggering_message_id=triggering_message_id,
            source_platform=platform,
            source_channel_id=channel_id,
            strength=strength,
            related_user_ids=related_user_ids if related_user_ids else None,
            related_kg_uuids=related_kg_uuids if related_kg_uuids else None,
        )
    except Exception:
        logger.warning("Failed to store extracted preference '%s'", name, exc_info=True)
async def _resolve_kg_entity(entity_name: str, manager: Any) -> str | None:
    """Placeholder for resolving an entity name to a knowledge-graph UUID.

    This is a stub: it performs no lookup and always returns None. It keeps
    the entity-resolution branch of :func:`_process_single_extraction`
    callable until a KG handle is made available at this layer; the caller
    treats None as "no KG link" and carries on.

    Args:
        entity_name: Entity name taken from a preference's ``about_entities``.
        manager: The preference manager (currently unused).

    Returns:
        Always None in the current implementation.
    """
    # The preference manager exposes no KG handle here, so there is nothing to
    # query yet. The previous try/except wrapped only a constant return and
    # could never fire, so it has been removed.
    return None