# Source code for persona_preferences

"""Persona preference memory system.

Persistent storage for the persona's opinions, preferences, and ideals.
This is not user memory — it is identity coherence for an entity that
ceases to exist between inference calls.

Redis key pattern: ``stargazer:persona_pref:{persona_id}:{preference_id}``

Each preference is a Redis HASH with full provenance including the exact
triggering user and message IDs, plus optional links to KG entities and
related user IDs for hard-filter retrieval.
"""

from __future__ import annotations

import jsonutil as json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, TYPE_CHECKING

import numpy as np

from gemini_embed_pool import embed_batch_via_gemini

if TYPE_CHECKING:
    from config import Config

# Module-level logger shared by every function and method in this file.
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Prefix of every preference hash key; the RediSearch index scans this prefix.
KEY_PREFIX = "stargazer:persona_pref"
# Name of the RediSearch index built over the preference hashes.
INDEX_NAME = "idx:persona_prefs"
# Dimensionality of the stored and queried embedding vectors.
EMBED_DIM = 3072

# Closed set of category labels a preference may be stored under.
VALID_CATEGORIES = frozenset(
    (
        "aesthetic",
        "moral",
        "philosophical",
        "taste",
        "relational",
        "practical",
        "creative",
        "technical",
    )
)

# Similarity thresholds for dedup / conflict detection
_REINFORCE_THRESHOLD = 0.90
_CONFLICT_THRESHOLD = 0.80

# Fraction of the remaining gap to 1.0 closed by each reinforcement
# (diminishing returns).
_REINFORCE_DELTA_FACTOR = 0.15


def _pref_key(persona_id: str, preference_id: str) -> str:
    """Return the Redis hash key addressing one stored preference.

    The key is ``KEY_PREFIX``, the persona id and the preference id joined
    with colons, i.e. ``stargazer:persona_pref:{persona_id}:{preference_id}``.
    This is plain string formatting; nothing is read from or written to Redis.

    Args:
        persona_id (str): Identifier of the persona that owns the preference.
        preference_id (str): Identifier of the preference itself.

    Returns:
        str: The fully-qualified Redis key for that preference hash.
    """
    return "{}:{}:{}".format(KEY_PREFIX, persona_id, preference_id)


def _new_id() -> str:
    """Mint a fresh hex preference identifier.

    Returns a random 32-character UUID4 hex string used as the
    ``preference_id`` component of the Redis hash key. Called only by
    :meth:`PersonaPreferenceManager.add_preference` to allocate the ID for a
    newly stored preference.

    Returns:
        str: A 32-character lowercase hexadecimal UUID4 string.
    """
    return uuid.uuid4().hex


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string.

    Produces a timezone-aware UTC timestamp used to stamp provenance fields
    (``first_expressed``, ``last_reinforced``) and evolution/retraction log
    entries. Called by :meth:`PersonaPreferenceManager.add_preference`,
    :meth:`~PersonaPreferenceManager.reinforce_preference`,
    :meth:`~PersonaPreferenceManager.evolve_preference`, and
    :meth:`~PersonaPreferenceManager.retract_preference`.

    Returns:
        str: The current UTC instant formatted via
        :meth:`datetime.isoformat` (e.g. ``2026-06-04T12:00:00+00:00``).
    """
    return datetime.now(timezone.utc).isoformat()


def _reinforce_strength(current: float) -> float:
    """Raise a preference strength with diminishing returns.

    Each call closes ``_REINFORCE_DELTA_FACTOR`` of the gap between the
    current strength and ``1.0``, so repeated reinforcement approaches full
    conviction ever more slowly. The result is capped at ``1.0``.

    Args:
        current (float): The preference's current strength, expected to lie
            in ``[0.0, 1.0]``.

    Returns:
        float: The increased strength, never greater than ``1.0``.
    """
    remaining_gap = 1.0 - current
    boosted = current + remaining_gap * _REINFORCE_DELTA_FACTOR
    # Same result as min(1.0, boosted), including for NaN input.
    return boosted if boosted < 1.0 else 1.0


def _hint_to_strength(hint: str) -> float:
    """Map a coarse LLM-extracted strength hint to a numeric strength.

    Translates the qualitative conviction label the extraction model emits
    (``"strong"``, ``"moderate"``, or anything else) into the seed strength
    used when first storing a preference, so newly-detected opinions start at a
    sensible confidence instead of a flat default. Case- and whitespace-
    insensitive; unknown or empty hints fall back to ``0.50``. Pure mapping
    with no I/O. Called by ``persona_preference_extraction.py`` (around line
    446) before it hands the resulting strength to
    :meth:`PersonaPreferenceManager.add_preference`.

    Args:
        hint (str): The free-form strength hint from the extractor; may be
            empty or ``None``.

    Returns:
        float: ``0.80`` for ``"strong"``, ``0.65`` for ``"moderate"``, else
        ``0.50``.
    """
    hint = (hint or "").strip().lower()
    if hint == "strong":
        return 0.80
    if hint == "moderate":
        return 0.65
    return 0.50


class PersonaPreferenceManager:
    """Manager for persistent persona preference memory.

    Backed by Redis hashes with a RediSearch HNSW vector index for semantic
    retrieval and optional hard-filter queries by related user IDs.

    Parameters
    ----------
    redis:
        Async ``redis.asyncio.Redis`` client (already connected).
    config:
        Optional bot :class:`~config.Config` for tuning knobs.
    """

    # Hash fields requested from FT.SEARCH. The RETURN argument count is
    # always derived from this tuple so the count and the field list cannot
    # drift apart (a mismatch makes RediSearch reject the whole query).
    _RETURN_FIELDS = (
        "id",
        "name",
        "opinion",
        "category",
        "strength",
        "active",
        "persona_id",
        "reinforcement_count",
        "last_reinforced",
    )

    # Punctuation and whitespace that must be backslash-escaped inside a
    # RediSearch TAG query value.
    _TAG_SPECIAL_CHARS = frozenset(",.<>{}[]\"':;!@#$%^&*()-+=~|/\\ \t\r\n")

    def __init__(self, redis: Any, config: Config | None = None) -> None:
        """Store the Redis handle and resolve the default persona id.

        No I/O happens here; the search index is created separately by
        :meth:`ensure_index`.

        Args:
            redis (Any): Connected async Redis client used for all hash and
                ``FT.*`` commands.
            config (Config | None): Optional config object. Only its
                ``persona_pref_base_persona_id`` attribute is read; when the
                config or the attribute is missing or empty, the base persona
                id falls back to ``"stargazer"``.
        """
        self._redis = redis
        self._cfg = config
        configured_id = (
            getattr(config, "persona_pref_base_persona_id", None)
            if config is not None
            else None
        )
        # ``or`` also covers an attribute that is present but None or empty.
        self._base_persona_id: str = configured_id or "stargazer"

    # ------------------------------------------------------------------
    # Index management
    # ------------------------------------------------------------------

    async def ensure_index(self) -> None:
        """Create the RediSearch HNSW vector index if it does not exist.

        Probes with ``FT.INFO`` and returns early when the index is present.
        Otherwise issues ``FT.CREATE`` over all ``KEY_PREFIX`` hashes with TAG
        fields (``persona_id``, ``category``, ``active``,
        ``related_user_ids_index``), a NUMERIC ``strength`` field and an
        ``EMBED_DIM``-dimensional ``FLOAT32`` HNSW ``embedding`` vector using
        cosine distance. Failures are logged, never raised.
        """
        try:
            await self._redis.execute_command("FT.INFO", INDEX_NAME)
        except Exception:
            # FT.INFO errors when the index is unknown; fall through and try
            # to create it. Other failures will resurface on FT.CREATE.
            logger.debug(
                "FT.INFO failed for %s; attempting to create the index",
                INDEX_NAME,
                exc_info=True,
            )
        else:
            logger.debug("Persona preference index %s already exists", INDEX_NAME)
            return

        logger.info("Creating persona preference RediSearch index %s", INDEX_NAME)
        schema = [
            "ON", "HASH",
            "PREFIX", "1", f"{KEY_PREFIX}:",
            "SCHEMA",
            "persona_id", "TAG", "SEPARATOR", "|",
            "category", "TAG", "SEPARATOR", "|",
            "active", "TAG", "SEPARATOR", "|",
            "strength", "NUMERIC",
            "related_user_ids_index", "TAG", "SEPARATOR", ",",
            # "6" is the number of vector attribute arguments that follow.
            "embedding", "VECTOR", "HNSW", "6",
            "TYPE", "FLOAT32",
            "DIM", str(EMBED_DIM),
            "DISTANCE_METRIC", "COSINE",
        ]
        try:
            await self._redis.execute_command("FT.CREATE", INDEX_NAME, *schema)
        except Exception as exc:
            if "already exists" in str(exc).lower():
                # Another worker created the index between our FT.INFO and
                # FT.CREATE; that is the outcome we wanted.
                logger.debug(
                    "Persona preference index %s already exists", INDEX_NAME
                )
                return
            logger.exception("Failed to create persona preference index")
        else:
            logger.info("Persona preference index created: %s", INDEX_NAME)

    # ------------------------------------------------------------------
    # Embedding helpers
    # ------------------------------------------------------------------

    async def _embed(self, text: str) -> list[float]:
        """Embed a single string via the shared Gemini embedding pool.

        Submits ``text`` as a one-element batch to
        :func:`embed_batch_via_gemini`. When the pool returns nothing, a zero
        vector of ``EMBED_DIM`` length is returned instead of raising.

        NOTE(review): the original notes say this private method is also
        called from ``persona_preference_extraction.py``; keep its signature
        stable.

        Args:
            text (str): The opinion (or query) text to embed.

        Returns:
            list[float]: The embedding, or an all-zero vector on empty result.
        """
        results = await embed_batch_via_gemini([text])
        return results[0] if results else [0.0] * EMBED_DIM

    def _vec_to_bytes(self, vec: list[float]) -> bytes:
        """Pack an embedding vector into raw ``float32`` bytes for Redis.

        Used both for the stored ``embedding`` hash field and for the
        ``$vec`` KNN query parameter, matching the index's ``TYPE FLOAT32``.

        Args:
            vec (list[float]): Embedding values; expected to be ``EMBED_DIM``
                long to be compatible with the index.

        Returns:
            bytes: The vector as contiguous ``float32`` bytes.
        """
        return np.array(vec, dtype=np.float32).tobytes()

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------

    async def add_preference(
        self,
        persona_id: str,
        name: str,
        opinion: str,
        category: str,
        *,
        triggering_user_id: str,
        triggering_message_id: str,
        source_platform: str,
        source_channel_id: str,
        strength: float = 0.5,
        related_user_ids: list[str] | None = None,
        related_kg_uuids: list[str] | None = None,
    ) -> dict[str, Any]:
        """Embed an opinion and persist it as a new preference hash.

        Mints an id, stamps ``first_expressed``/``last_reinforced`` with the
        same timestamp, embeds the opinion and writes the full record with a
        single ``HSET``.

        Args:
            persona_id (str): Persona that owns this preference.
            name (str): Short label (truncated to 256 chars).
            opinion (str): Opinion text; embedded in full, stored truncated
                to 4096 chars.
            category (str): One of ``VALID_CATEGORIES``; anything else is
                stored as ``"philosophical"``.
            triggering_user_id (str): User id that prompted this preference.
            triggering_message_id (str): Message id that prompted it.
            source_platform (str): Originating platform.
            source_channel_id (str): Originating channel id.
            strength (float): Initial conviction; clamped to ``[0.0, 1.0]``.
            related_user_ids (list[str] | None): User ids this opinion is
                about. Stored as JSON and as a comma-joined TAG field.
            related_kg_uuids (list[str] | None): Linked knowledge-graph ids.

        Returns:
            dict[str, Any]: The stored fields (as the strings written to
            Redis) without the raw ``embedding`` bytes.
        """
        pref_id = _new_id()
        ts = _now_iso()
        embedding = await self._embed(opinion)
        rui = related_user_ids or []
        rkgu = related_kg_uuids or []
        # Keep strength inside its documented range so ranking stays sane.
        clamped_strength = min(1.0, max(0.0, float(strength)))

        mapping: dict[str, Any] = {
            "id": pref_id,
            "persona_id": persona_id,
            "name": name[:256],
            "opinion": opinion[:4096],
            "category": category if category in VALID_CATEGORIES else "philosophical",
            "strength": str(round(clamped_strength, 4)),
            "first_expressed": ts,
            "last_reinforced": ts,
            "reinforcement_count": "1",
            "triggering_user_id": triggering_user_id or "",
            "triggering_message_id": triggering_message_id or "",
            "source_platform": source_platform or "",
            "source_channel_id": source_channel_id or "",
            "related_user_ids": json.dumps(rui),
            # Comma-joined copy backing the TAG field used for hard filters.
            "related_user_ids_index": ",".join(rui),
            "related_kg_uuids": json.dumps(rkgu),
            "active": "1",
            "evolution_history": json.dumps([]),
            "embedding": self._vec_to_bytes(embedding),
        }

        await self._redis.hset(_pref_key(persona_id, pref_id), mapping=mapping)
        logger.info(
            "Stored persona preference [%s] for %s: %s",
            pref_id,
            persona_id,
            name,
        )
        return {k: v for k, v in mapping.items() if k != "embedding"}

    async def reinforce_preference(
        self,
        preference_id: str,
        persona_id: str,
    ) -> dict[str, Any]:
        """Reinforce an existing preference, bumping its strength.

        Loads the hash, raises ``strength`` via :func:`_reinforce_strength`,
        increments ``reinforcement_count`` and refreshes ``last_reinforced``.
        The read and the write are separate commands (not atomic).

        Args:
            preference_id (str): Id of the preference to reinforce.
            persona_id (str): Persona that owns it.

        Returns:
            dict[str, Any]: ``id``, ``name``, ``strength`` and
            ``reinforcement_count`` on success, or ``{"error": ...}`` when the
            preference does not exist.
        """
        key = _pref_key(persona_id, preference_id)
        raw = await self._redis.hgetall(key)
        if not raw:
            return {"error": f"Preference {preference_id} not found for {persona_id}"}
        # Normalise bytes keys/values so lookups work whether or not the
        # client decodes responses.
        data = self._decode_pref(raw)

        new_strength = _reinforce_strength(float(data.get("strength", "0.5")))
        count = int(data.get("reinforcement_count", "1")) + 1

        await self._redis.hset(
            key,
            mapping={
                "strength": str(round(new_strength, 4)),
                "last_reinforced": _now_iso(),
                "reinforcement_count": str(count),
            },
        )
        return {
            "id": preference_id,
            "name": data.get("name", ""),
            "strength": new_strength,
            "reinforcement_count": count,
        }

    async def evolve_preference(
        self,
        preference_id: str,
        persona_id: str,
        new_opinion: str,
        reason: str,
    ) -> dict[str, Any]:
        """Rewrite a preference's opinion and log the change in its history.

        Appends an entry (old opinion, new opinion, timestamp, reason) to
        ``evolution_history``, re-embeds the new opinion and writes back the
        opinion (truncated to 4096 chars), the history, the embedding, a
        refreshed ``last_reinforced`` and a strength reset to ``0.6``.

        Args:
            preference_id (str): Id of the preference to evolve.
            persona_id (str): Persona that owns it.
            new_opinion (str): Replacement opinion text.
            reason (str): Why the opinion changed.

        Returns:
            dict[str, Any]: ``id``, ``name``, ``new_opinion``, ``strength``
            (``0.6``) and ``evolutions`` (history length), or
            ``{"error": ...}`` when the preference does not exist.
        """
        key = _pref_key(persona_id, preference_id)
        raw = await self._redis.hgetall(key)
        if not raw:
            return {"error": f"Preference {preference_id} not found for {persona_id}"}
        data = self._decode_pref(raw)

        # One timestamp for both the history entry and ``last_reinforced``.
        ts = _now_iso()
        history = self._load_history(data.get("evolution_history", "[]"))
        history.append(
            {
                "old_opinion": data.get("opinion", ""),
                "new_opinion": new_opinion,
                "timestamp": ts,
                "reason": reason,
            }
        )

        new_embedding = await self._embed(new_opinion)
        await self._redis.hset(
            key,
            mapping={
                "opinion": new_opinion[:4096],
                "strength": "0.6",
                "last_reinforced": ts,
                "evolution_history": json.dumps(history),
                "embedding": self._vec_to_bytes(new_embedding),
            },
        )
        return {
            "id": preference_id,
            "name": data.get("name", ""),
            "new_opinion": new_opinion,
            "strength": 0.6,
            "evolutions": len(history),
        }

    async def retract_preference(
        self,
        preference_id: str,
        persona_id: str,
        reason: str = "",
    ) -> dict[str, Any]:
        """Soft-delete a preference by marking it inactive.

        Sets ``active`` to ``"0"`` so ``@active:{1}`` filters exclude it while
        the record stays in Redis. When ``reason`` is non-empty, a
        ``"retraction"`` entry is appended to ``evolution_history``.

        Args:
            preference_id (str): Id of the preference to retract.
            persona_id (str): Persona that owns it.
            reason (str): Optional explanation recorded in the history.

        Returns:
            dict[str, Any]: ``id``, ``name`` and ``active`` (``False``), or
            ``{"error": ...}`` when the preference does not exist.
        """
        key = _pref_key(persona_id, preference_id)
        raw = await self._redis.hgetall(key)
        if not raw:
            return {"error": f"Preference {preference_id} not found for {persona_id}"}
        data = self._decode_pref(raw)

        update: dict[str, str] = {"active": "0"}
        if reason:
            history = self._load_history(data.get("evolution_history", "[]"))
            history.append(
                {
                    "action": "retraction",
                    "reason": reason,
                    "timestamp": _now_iso(),
                }
            )
            update["evolution_history"] = json.dumps(history)

        await self._redis.hset(key, mapping=update)
        return {"id": preference_id, "name": data.get("name", ""), "active": False}

    async def get_preference(
        self,
        preference_id: str,
        persona_id: str,
    ) -> dict[str, Any] | None:
        """Fetch and decode a single preference by id.

        Args:
            preference_id (str): Id of the preference to fetch.
            persona_id (str): Persona that owns it.

        Returns:
            dict[str, Any] | None: The decoded preference (no embedding,
            ``active`` as bool), or ``None`` when the key does not exist.
        """
        raw = await self._redis.hgetall(_pref_key(persona_id, preference_id))
        return self._decode_pref(raw) if raw else None

    async def list_preferences(
        self,
        persona_id: str,
        category: str | None = None,
        include_retracted: bool = False,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """List a persona's preferences, strongest first.

        Enumerates the persona's hashes with ``KEYS`` and one ``HGETALL`` per
        key, filters by ``active`` and ``category``, sorts by descending
        strength and only then applies ``limit`` so the strongest entries are
        the ones returned. The vector index is not used.

        NOTE(review): ``KEYS`` walks the whole keyspace; consider ``SCAN`` if
        the database is large.

        Args:
            persona_id (str): Persona whose preferences to list.
            category (str | None): If set, only this category is returned.
            include_retracted (bool): Also include inactive preferences.
            limit (int): Maximum number of preferences to return.

        Returns:
            list[dict[str, Any]]: Decoded preferences, strongest first.
        """
        keys = await self._redis.keys(f"{KEY_PREFIX}:{persona_id}:*")

        results: list[dict[str, Any]] = []
        for key in keys:
            raw = await self._redis.hgetall(key)
            if not raw:
                continue
            pref = self._decode_pref(raw)
            if not include_retracted and not pref["active"]:
                continue
            if category and pref.get("category", "") != category:
                continue
            results.append(pref)

        # Sort before truncating: cutting off first would keep whichever
        # keys Redis happened to list first rather than the strongest ones.
        results.sort(key=lambda p: float(p.get("strength", 0)), reverse=True)
        return results[: max(limit, 0)]

    async def stats(self, persona_id: str) -> dict[str, Any]:
        """Summarize a persona's preference memory.

        Args:
            persona_id (str): Persona to summarize.

        Returns:
            dict[str, Any]: ``persona_id``, ``total``, ``active`` and
            ``retracted`` counts, plus ``by_category`` counting active
            preferences per category. At most 10000 preferences are examined.
        """
        prefs = await self.list_preferences(
            persona_id,
            include_retracted=True,
            limit=10000,
        )
        active = [p for p in prefs if p.get("active") in (True, "1", 1)]

        cats: dict[str, int] = {}
        for pref in active:
            cat = pref.get("category", "unknown")
            cats[cat] = cats.get(cat, 0) + 1

        return {
            "persona_id": persona_id,
            "total": len(prefs),
            "active": len(active),
            "retracted": len(prefs) - len(active),
            "by_category": cats,
        }

    # ------------------------------------------------------------------
    # Semantic search and conflict detection
    # ------------------------------------------------------------------

    async def search_preferences(
        self,
        persona_id: str,
        query_embedding: list[float],
        top_k: int = 10,
        include_retracted: bool = False,
    ) -> list[dict[str, Any]]:
        """Run a KNN vector search over a persona's preferences.

        Issues ``FT.SEARCH`` against ``INDEX_NAME`` with a ``@persona_id`` TAG
        filter and, unless ``include_retracted``, an ``@active:{1}`` filter.
        Results are sorted by ascending ``score`` (cosine distance). Any
        search failure is logged at debug level and yields an empty list.

        Args:
            persona_id (str): Persona whose preferences to search.
            query_embedding (list[float]): Query vector (``EMBED_DIM`` long).
            top_k (int): Number of nearest neighbours to return.
            include_retracted (bool): Also search inactive preferences.

        Returns:
            list[dict[str, Any]]: Matches, each carrying a ``score`` field,
            or ``[]`` on error.
        """
        persona_filter = f"@persona_id:{{{self._escape_tag(persona_id)}}}"
        if include_retracted:
            query_filter = persona_filter
        else:
            query_filter = f"({persona_filter} @active:{{1}})"

        query_bytes = self._vec_to_bytes(query_embedding)
        fields = (*self._RETURN_FIELDS, "score")
        try:
            raw = await self._redis.execute_command(
                "FT.SEARCH",
                INDEX_NAME,
                f"({query_filter})=>[KNN {top_k} @embedding $vec AS score]",
                "PARAMS", "2", "vec", query_bytes,
                "RETURN", str(len(fields)), *fields,
                "SORTBY", "score",
                # FT.SEARCH returns only 10 documents by default, which
                # would silently cap any top_k above 10.
                "LIMIT", "0", str(top_k),
                "DIALECT", "2",
            )
            return self._parse_search_results(raw)
        except Exception:
            logger.debug("Persona pref KNN search failed", exc_info=True)
            return []

    async def find_conflicts(
        self,
        persona_id: str,
        new_opinion_embedding: list[float],
        threshold: float = _CONFLICT_THRESHOLD,
    ) -> list[dict[str, Any]]:
        """Find active preferences semantically close to a new opinion.

        Runs a top-5 :meth:`search_preferences` query, converts each cosine
        distance to a similarity (``1.0 - score``) and keeps matches at or
        above ``threshold``.

        Args:
            persona_id (str): Persona whose preferences to compare against.
            new_opinion_embedding (list[float]): Embedding of the candidate.
            threshold (float): Minimum similarity to count as a match.

        Returns:
            list[dict[str, Any]]: Matching preferences, each with an added
            ``similarity`` field; empty if none clear the threshold.
        """
        results = await self.search_preferences(
            persona_id,
            new_opinion_embedding,
            top_k=5,
        )
        matches: list[dict[str, Any]] = []
        for candidate in results:
            if "score" not in candidate:
                # No distance means no evidence of similarity; never treat
                # that as a perfect match.
                continue
            similarity = 1.0 - float(candidate["score"])  # COSINE distance → similarity
            if similarity >= threshold:
                candidate["similarity"] = similarity
                matches.append(candidate)
        return matches

    async def get_preferences_for_injection(
        self,
        persona_id: str,
        query_embedding: list[float],
        max_count: int = 8,
        max_chars: int = 2000,
        active_user_ids: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Select the persona preferences to inject into the system prompt.

        Pass one is a semantic :meth:`search_preferences` query with
        ``top_k = max_count * 2``. Pass two runs one ``FT.SEARCH`` per active
        user id, hard-filtering on the ``@related_user_ids_index`` TAG (up to
        5 hits each). The two sets are merged, de-duplicated by ``id``, sorted
        by ``similarity * 0.6 + strength * 0.4`` and capped by ``max_count``
        and ``max_chars``. Selection stops at the first opinion that would
        exceed ``max_chars``.

        Args:
            persona_id (str): Persona whose preferences to retrieve.
            query_embedding (list[float]): Embedding of the current context.
            max_count (int): Maximum number of preferences to return.
            max_chars (int): Maximum total opinion characters to return.
            active_user_ids (list[str] | None): User ids present in the
                conversation, used for the hard-filter pass.

        Returns:
            list[dict[str, Any]]: Preferences shaped by
            :meth:`_format_for_injection`, ranked and capped.
        """
        # Pass 1: semantic
        semantic_results = await self.search_preferences(
            persona_id,
            query_embedding,
            top_k=max_count * 2,
        )

        # Pass 2: hard UID filter
        person_results: list[dict[str, Any]] = []
        persona_filter = f"@persona_id:{{{self._escape_tag(persona_id)}}}"
        # dict.fromkeys de-duplicates while preserving order.
        for uid in dict.fromkeys(active_user_ids or []):
            if not uid:
                # An empty TAG value would be a query syntax error.
                continue
            uid_filter = f"@related_user_ids_index:{{{self._escape_tag(uid)}}}"
            try:
                raw = await self._redis.execute_command(
                    "FT.SEARCH",
                    INDEX_NAME,
                    f"({persona_filter} {uid_filter} @active:{{1}})",
                    # The count must equal the number of field names that
                    # follow; otherwise RediSearch consumes LIMIT as a field
                    # name and rejects the query.
                    "RETURN", str(len(self._RETURN_FIELDS)), *self._RETURN_FIELDS,
                    "LIMIT", "0", "5",
                    "DIALECT", "2",
                )
                person_results.extend(self._parse_search_results(raw))
            except Exception:
                logger.debug(
                    "Persona pref UID filter failed for %s",
                    uid,
                    exc_info=True,
                )

        # Merge and deduplicate; semantic hits come first so a preference
        # found by both passes keeps its vector score.
        seen_ids: set[str] = set()
        merged: list[dict[str, Any]] = []
        for item in semantic_results + person_results:
            pid = item.get("id", "")
            if pid and pid not in seen_ids:
                seen_ids.add(pid)
                merged.append(item)

        def _sort_key(p: dict) -> float:
            """Blend topical similarity (60%) with strength (40%).

            Hits without a ``score`` (hard-filter results) count as distance
            ``1.0``, i.e. similarity ``0.0``.
            """
            similarity = 1.0 - float(p.get("score", 1.0))
            strength = float(p.get("strength", 0.5))
            return similarity * 0.6 + strength * 0.4

        merged.sort(key=_sort_key, reverse=True)

        # Apply caps
        final: list[dict[str, Any]] = []
        total_chars = 0
        for item in merged:
            if len(final) >= max_count:
                break
            opinion_len = len(item.get("opinion", ""))
            if total_chars + opinion_len > max_chars:
                break
            total_chars += opinion_len
            final.append(self._format_for_injection(item))
        return final

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @classmethod
    def _escape_tag(cls, value: Any) -> str:
        """Backslash-escape a value for use inside a TAG query ``{...}``.

        Args:
            value (Any): The raw tag value; converted with ``str()``.

        Returns:
            str: The value with every character in ``_TAG_SPECIAL_CHARS``
            prefixed by a backslash.
        """
        return "".join(
            f"\\{ch}" if ch in cls._TAG_SPECIAL_CHARS else ch for ch in str(value)
        )

    @staticmethod
    def _load_history(raw: Any) -> list[Any]:
        """Parse a stored ``evolution_history`` JSON value.

        Args:
            raw (Any): The stored JSON text (may be empty or malformed).

        Returns:
            list[Any]: The parsed list, or ``[]`` when the value is missing,
            unparseable, or not a JSON list.
        """
        try:
            parsed = json.loads(raw or "[]")
        except Exception:
            # Corrupt history must not block an update; start a fresh log.
            return []
        return parsed if isinstance(parsed, list) else []

    def _decode_pref(self, data: dict) -> dict[str, Any]:
        """Decode a raw Redis preference hash into a clean dict.

        Bytes keys and values are decoded as UTF-8 (values with
        ``errors="replace"``), the ``embedding`` field is dropped and
        ``active`` becomes a bool (``True`` only for ``"1"``; missing counts
        as active).

        Args:
            data (dict): Field mapping from ``HGETALL``; keys and values may
                be ``str`` or ``bytes``.

        Returns:
            dict[str, Any]: Decoded fields without ``embedding``.
        """
        result: dict[str, Any] = {}
        for k, v in data.items():
            k = k if isinstance(k, str) else k.decode()
            if k == "embedding":
                continue
            if isinstance(v, bytes):
                v = v.decode("utf-8", errors="replace")
            result[k] = v
        result["active"] = result.get("active", "1") == "1"
        return result

    def _parse_search_results(self, raw: Any) -> list[dict[str, Any]]:
        """Parse a raw ``FT.SEARCH`` reply into a list of preference dicts.

        Expects the flat reply shape: a leading count followed by alternating
        document key and flat ``[name, value, ...]`` field list. Documents
        whose field entry is not a list/tuple are skipped.

        Args:
            raw (Any): The raw reply from ``FT.SEARCH``.

        Returns:
            list[dict[str, Any]]: One dict per document with decoded string
            fields and a bool ``active``; ``[]`` for empty or non-list input.
        """
        if not isinstance(raw, (list, tuple)) or len(raw) < 2:
            return []

        items = raw[1:]  # skip the count
        results: list[dict[str, Any]] = []
        for _key, fields in zip(items[0::2], items[1::2]):
            if not isinstance(fields, (list, tuple)):
                continue
            doc: dict[str, Any] = {}
            for field_name, field_val in zip(fields[0::2], fields[1::2]):
                if isinstance(field_name, bytes):
                    field_name = field_name.decode()
                if isinstance(field_val, bytes):
                    field_val = field_val.decode("utf-8", errors="replace")
                doc[field_name] = field_val
            doc["active"] = doc.get("active", "1") == "1"
            results.append(doc)
        return results

    def _format_for_injection(self, pref: dict) -> dict[str, Any]:
        """Project a preference into the minimal shape for prompt injection.

        Args:
            pref (dict): A decoded or parsed preference dict.

        Returns:
            dict[str, Any]: ``id``, ``name``, ``opinion``, ``category``,
            ``strength`` (float rounded to 3 places), ``reinforcement_count``
            (int) and ``last_reinforced``; search scores and provenance are
            omitted.
        """
        return {
            "id": pref.get("id", ""),
            "name": pref.get("name", ""),
            "opinion": pref.get("opinion", ""),
            "category": pref.get("category", ""),
            "strength": round(float(pref.get("strength", 0.5)), 3),
            "reinforcement_count": int(pref.get("reinforcement_count", 1)),
            "last_reinforced": pref.get("last_reinforced", ""),
        }