"""Persona preference memory system.
Persistent storage for the persona's opinions, preferences, and ideals.
This is not user memory — it is identity coherence for an entity that
ceases to exist between inference calls.
Redis key pattern: ``stargazer:persona_pref:{persona_id}:{preference_id}``
Each preference is a Redis HASH with full provenance including the exact
triggering user and message IDs, plus optional links to KG entities and
related user IDs for hard-filter retrieval.
"""
from __future__ import annotations
import jsonutil as json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, TYPE_CHECKING
import numpy as np
from gemini_embed_pool import embed_batch_via_gemini
if TYPE_CHECKING:
from config import Config
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
KEY_PREFIX = "stargazer:persona_pref"
INDEX_NAME = "idx:persona_prefs"
EMBED_DIM = 3072
VALID_CATEGORIES = frozenset(
{
"aesthetic",
"moral",
"philosophical",
"taste",
"relational",
"practical",
"creative",
"technical",
}
)
# Similarity thresholds for dedup / conflict detection
_REINFORCE_THRESHOLD = 0.90
_CONFLICT_THRESHOLD = 0.80
# Strength reinforcement curve constant (diminishing returns)
_REINFORCE_DELTA_FACTOR = 0.15
def _pref_key(persona_id: str, preference_id: str) -> str:
"""Build the Redis key for a single preference hash.
Composes the module-wide ``KEY_PREFIX`` with the persona and preference
identifiers into the canonical key pattern
``stargazer:persona_pref:{persona_id}:{preference_id}`` under which every
preference HASH is stored and which the RediSearch index scans by prefix.
Pure string formatting with no I/O. Called by the CRUD methods that need to
address one specific preference key:
:meth:`PersonaPreferenceManager.add_preference`,
:meth:`~PersonaPreferenceManager.reinforce_preference`,
:meth:`~PersonaPreferenceManager.evolve_preference`,
:meth:`~PersonaPreferenceManager.retract_preference`, and
:meth:`~PersonaPreferenceManager.get_preference`.
Args:
persona_id (str): Identifier of the persona that owns the preference.
preference_id (str): The 32-character hex preference id.
Returns:
str: The fully-qualified Redis hash key for that preference.
"""
return f"{KEY_PREFIX}:{persona_id}:{preference_id}"
def _new_id() -> str:
"""Mint a fresh hex preference identifier.
Returns a random 32-character UUID4 hex string used as the
``preference_id`` component of the Redis hash key. Called only by
:meth:`PersonaPreferenceManager.add_preference` to allocate the ID for a
newly stored preference.
Returns:
str: A 32-character lowercase hexadecimal UUID4 string.
"""
return uuid.uuid4().hex
def _now_iso() -> str:
"""Return the current UTC time as an ISO-8601 string.
Produces a timezone-aware UTC timestamp used to stamp provenance fields
(``first_expressed``, ``last_reinforced``) and evolution/retraction log
entries. Called by :meth:`PersonaPreferenceManager.add_preference`,
:meth:`~PersonaPreferenceManager.reinforce_preference`,
:meth:`~PersonaPreferenceManager.evolve_preference`, and
:meth:`~PersonaPreferenceManager.retract_preference`.
Returns:
str: The current UTC instant formatted via
:meth:`datetime.isoformat` (e.g. ``2026-06-04T12:00:00+00:00``).
"""
return datetime.now(timezone.utc).isoformat()
def _reinforce_strength(current: float) -> float:
"""Nudge a preference strength upward with diminishing returns.
Implements the reinforcement curve that closes a fraction
(``_REINFORCE_DELTA_FACTOR``, 0.15) of the remaining gap to 1.0 on each
reinforcement, so repeatedly re-expressed opinions asymptotically approach
full conviction without ever exceeding it. This keeps strength bounded
while modeling the idea that an already-firm belief firms up more slowly.
Pure arithmetic with no I/O. Called only by
:meth:`PersonaPreferenceManager.reinforce_preference` to compute the new
strength before it is persisted.
Args:
current (float): The preference's current strength in ``[0.0, 1.0]``.
Returns:
float: The increased strength, capped at ``1.0``.
"""
return min(1.0, current + (1.0 - current) * _REINFORCE_DELTA_FACTOR)
def _hint_to_strength(hint: str) -> float:
"""Map a coarse LLM-extracted strength hint to a numeric strength.
Translates the qualitative conviction label the extraction model emits
(``"strong"``, ``"moderate"``, or anything else) into the seed strength
used when first storing a preference, so newly-detected opinions start at a
sensible confidence instead of a flat default. Case- and whitespace-
insensitive; unknown or empty hints fall back to ``0.50``. Pure mapping
with no I/O. Called by ``persona_preference_extraction.py`` (around line
446) before it hands the resulting strength to
:meth:`PersonaPreferenceManager.add_preference`.
Args:
hint (str): The free-form strength hint from the extractor; may be
empty or ``None``.
Returns:
float: ``0.80`` for ``"strong"``, ``0.65`` for ``"moderate"``, else
``0.50``.
"""
hint = (hint or "").strip().lower()
if hint == "strong":
return 0.80
if hint == "moderate":
return 0.65
return 0.50
[docs]
class PersonaPreferenceManager:
"""Manager for persistent persona preference memory.
Backed by Redis hashes with a RediSearch HNSW vector index for semantic
retrieval and optional hard-filter queries by related user IDs.
Parameters
----------
redis:
Async ``redis.asyncio.Redis`` client (already connected).
config:
Optional bot :class:`~config.Config` for tuning knobs.
"""
[docs]
def __init__(self, redis: Any, config: Config | None = None) -> None:
"""Initialize the manager with a Redis client and optional config.
Stores the async Redis handle used by every CRUD/search method and
resolves the default base persona identifier from config (falling back
to ``"stargazer"`` when absent). No I/O is performed here — the
RediSearch index is created lazily via :meth:`ensure_index`.
The constructed manager is shared on the tool context as
``ctx.persona_pref_manager`` (declared in ``tool_context.py`` and
defaulted to ``None`` in ``inference_main.py``), letting the persona
preference tools reach it without a circular import. The resolved
``_base_persona_id`` is later read by the tool helpers in
``tools/persona_preferences.py`` (e.g. ``_resolve_persona_id`` and
``inspect_persona_preferences``) to default the persona. No direct
``PersonaPreferenceManager(...)`` construction site was found in the
repository, so the instance is wired up dynamically by the inference
worker's setup path.
Args:
redis (Any): A connected async ``redis.asyncio.Redis`` client used
for all hash and ``FT.*`` index operations.
config (Config | None): Optional bot :class:`~config.Config`
providing tuning knobs; only ``persona_pref_base_persona_id``
is read here. When ``None``, the base persona id defaults to
``"stargazer"``.
"""
self._redis = redis
self._cfg = config
self._base_persona_id: str = (
getattr(config, "persona_pref_base_persona_id", "stargazer")
if config
else "stargazer"
)
# ------------------------------------------------------------------
# Index management
# ------------------------------------------------------------------
[docs]
async def ensure_index(self) -> None:
"""Create the RediSearch HNSW vector index if it does not exist.
Idempotently provisions the ``idx:persona_prefs`` index over all
``stargazer:persona_pref:`` HASH keys so that the semantic-retrieval and
hard-filter queries used by :meth:`search_preferences` and
:meth:`get_preferences_for_injection` can run. First probes via
``FT.INFO`` and returns early if the index already exists; otherwise
issues ``FT.CREATE`` defining TAG fields (``persona_id``, ``category``,
``active``, ``related_user_ids_index``), a NUMERIC ``strength`` field,
and a 3072-dim ``FLOAT32`` HNSW ``embedding`` vector with cosine
distance. All failures are logged rather than raised, so a transient
Redis hiccup never crashes the caller. No external caller for the
persona-preference manager was found in the repo; it is expected to be
invoked during inference-worker setup or lazily before first search.
"""
try:
await self._redis.execute_command("FT.INFO", INDEX_NAME)
logger.debug("Persona preference index %s already exists", INDEX_NAME)
return
except Exception:
pass
logger.info("Creating persona preference RediSearch index %s", INDEX_NAME)
schema = [
"ON",
"HASH",
"PREFIX",
"1",
f"{KEY_PREFIX}:",
"SCHEMA",
"persona_id",
"TAG",
"SEPARATOR",
"|",
"category",
"TAG",
"SEPARATOR",
"|",
"active",
"TAG",
"SEPARATOR",
"|",
"strength",
"NUMERIC",
"related_user_ids_index",
"TAG",
"SEPARATOR",
",",
"embedding",
"VECTOR",
"HNSW",
"6",
"TYPE",
"FLOAT32",
"DIM",
str(EMBED_DIM),
"DISTANCE_METRIC",
"COSINE",
]
try:
await self._redis.execute_command(
"FT.CREATE",
INDEX_NAME,
*schema,
)
logger.info("Persona preference index created: %s", INDEX_NAME)
except Exception:
logger.exception("Failed to create persona preference index")
# ------------------------------------------------------------------
# Embedding helpers
# ------------------------------------------------------------------
async def _embed(self, text: str) -> list[float]:
"""Embed a single string via the shared Gemini embedding pool.
Wraps the repo-wide :func:`gemini_embed_pool.embed_batch_via_gemini`
helper to produce the 3072-dim vector stored alongside, and queried
against, each preference. Submits the text as a one-element batch and
falls back to a zero vector of ``EMBED_DIM`` length when the pool
returns nothing, so a failed embed degrades to a non-matching vector
rather than raising. Performs network I/O through the Gemini pool.
Called by :meth:`add_preference` (to embed a new opinion) and
:meth:`evolve_preference` (to re-embed an updated opinion); also reached
externally as ``manager._embed`` from
``persona_preference_extraction.py`` (around line 450) to embed an
opinion before conflict detection.
Args:
text (str): The opinion (or query) text to embed.
Returns:
list[float]: The embedding vector, or a zero vector of length
``EMBED_DIM`` if embedding produced no result.
"""
results = await embed_batch_via_gemini([text])
return results[0] if results else [0.0] * EMBED_DIM
def _vec_to_bytes(self, vec: list[float]) -> bytes:
"""Pack an embedding vector into raw FLOAT32 bytes for Redis.
Converts a Python float list into the little-endian ``FLOAT32`` byte
blob that the RediSearch HNSW index (created in :meth:`ensure_index`
with ``TYPE FLOAT32``) expects, both for the stored ``embedding`` hash
field and for the ``$vec`` KNN query parameter. Uses
:func:`numpy.array` with ``dtype=np.float32`` and ``.tobytes()``.
Called by :meth:`add_preference` and :meth:`evolve_preference` to
serialize the stored embedding, and by :meth:`search_preferences` to
serialize the query vector passed to ``FT.SEARCH``.
Args:
vec (list[float]): Embedding values, expected to be ``EMBED_DIM``
(3072) long for index compatibility.
Returns:
bytes: The vector serialized as contiguous ``float32`` bytes.
"""
return np.array(vec, dtype=np.float32).tobytes()
# ------------------------------------------------------------------
# CRUD
# ------------------------------------------------------------------
[docs]
async def add_preference(
self,
persona_id: str,
name: str,
opinion: str,
category: str,
*,
triggering_user_id: str,
triggering_message_id: str,
source_platform: str,
source_channel_id: str,
strength: float = 0.5,
related_user_ids: list[str] | None = None,
related_kg_uuids: list[str] | None = None,
) -> dict[str, Any]:
"""Embed an opinion and persist it as a new preference hash.
Creates a fresh preference: it mints an id via :func:`_new_id`, stamps
provenance timestamps via :func:`_now_iso`, embeds the opinion text via
:meth:`_embed`, serializes that vector with :meth:`_vec_to_bytes`, and
``HSETs`` the full record (name, opinion, validated category, strength,
triggering user/message/platform/channel, related user ids and KG
uuids, evolution history, and the embedding) under
:func:`_pref_key`. Unknown categories are coerced to
``"philosophical"``; ``name`` and ``opinion`` are truncated to 256 and
4096 chars. Touches Redis (one ``HSET``) and the Gemini embedding pool.
Called by ``persona_preference_extraction.py`` (around line 517) when
an opinion is detected, and by the ``manage_persona_preferences`` tool
in ``tools/persona_preferences.py`` (around lines 167 and 191).
Args:
persona_id (str): Persona that owns this preference.
name (str): Short human-readable label (truncated to 256 chars).
opinion (str): The opinion text that is embedded and stored
(truncated to 4096 chars).
category (str): One of ``VALID_CATEGORIES``; otherwise stored as
``"philosophical"``.
triggering_user_id (str): User id that prompted this preference.
triggering_message_id (str): Message id that prompted it.
source_platform (str): Originating platform (e.g. ``discord``).
source_channel_id (str): Originating channel id.
strength (float): Initial conviction in ``[0.0, 1.0]`` (default
``0.5``).
related_user_ids (list[str] | None): User ids this opinion is about,
stored both as JSON and as a comma-joined TAG index for
hard-filter retrieval.
related_kg_uuids (list[str] | None): Linked knowledge-graph entity
uuids.
Returns:
dict[str, Any]: The stored preference fields (the raw ``embedding``
bytes excluded) including the newly minted ``id``.
"""
pref_id = _new_id()
ts = _now_iso()
embedding = await self._embed(opinion)
rui = related_user_ids or []
rkgu = related_kg_uuids or []
mapping: dict[str, Any] = {
"id": pref_id,
"persona_id": persona_id,
"name": name[:256],
"opinion": opinion[:4096],
"category": category if category in VALID_CATEGORIES else "philosophical",
"strength": str(round(float(strength), 4)),
"first_expressed": ts,
"last_reinforced": ts,
"reinforcement_count": "1",
"triggering_user_id": triggering_user_id or "",
"triggering_message_id": triggering_message_id or "",
"source_platform": source_platform or "",
"source_channel_id": source_channel_id or "",
"related_user_ids": json.dumps(rui),
"related_user_ids_index": ",".join(rui),
"related_kg_uuids": json.dumps(rkgu),
"active": "1",
"evolution_history": json.dumps([]),
"embedding": self._vec_to_bytes(embedding),
}
key = _pref_key(persona_id, pref_id)
await self._redis.hset(key, mapping=mapping)
logger.info(
"Stored persona preference [%s] for %s: %s",
pref_id,
persona_id,
name,
)
result = {k: v for k, v in mapping.items() if k != "embedding"}
result["id"] = pref_id
return result
[docs]
async def reinforce_preference(
self,
preference_id: str,
persona_id: str,
) -> dict[str, Any]:
"""Reinforce an existing preference, bumping its strength.
Loads the preference HASH, raises its strength along the diminishing-
returns curve via :func:`_reinforce_strength`, increments
``reinforcement_count``, and refreshes ``last_reinforced`` via
:func:`_now_iso`, then writes those three fields back. Reads and writes
Redis (one ``HGETALL`` plus one ``HSET``) at the key from
:func:`_pref_key`; returns an ``error`` dict (rather than raising) when
the preference is missing. Called by
``persona_preference_extraction.py`` (around line 472) when an opinion
matches an existing one, and by the ``manage_persona_preferences`` tool
in ``tools/persona_preferences.py`` (around line 154).
Args:
preference_id (str): Id of the preference to reinforce.
persona_id (str): Persona that owns it.
Returns:
dict[str, Any]: Updated ``id``, ``name``, ``strength``, and
``reinforcement_count`` on success; an ``{"error": ...}`` dict if
the preference was not found.
"""
key = _pref_key(persona_id, preference_id)
data = await self._redis.hgetall(key)
if not data:
return {"error": f"Preference {preference_id} not found for {persona_id}"}
current_strength = float(data.get("strength", "0.5"))
new_strength = _reinforce_strength(current_strength)
count = int(data.get("reinforcement_count", "1")) + 1
await self._redis.hset(
key,
mapping={
"strength": str(round(new_strength, 4)),
"last_reinforced": _now_iso(),
"reinforcement_count": str(count),
},
)
return {
"id": preference_id,
"name": data.get("name", ""),
"strength": new_strength,
"reinforcement_count": count,
}
[docs]
async def evolve_preference(
self,
preference_id: str,
persona_id: str,
new_opinion: str,
reason: str,
) -> dict[str, Any]:
"""Rewrite a preference's opinion and log the change in its history.
Models a genuine change of mind: it appends an entry (old opinion, new
opinion, timestamp via :func:`_now_iso`, and reason) to the preference's
``evolution_history``, re-embeds the new opinion via :meth:`_embed`,
serializes that vector via :meth:`_vec_to_bytes`, and writes back the
updated opinion text (truncated to 4096 chars), refreshed
``last_reinforced``, history, and embedding. Strength is reset to
``0.6`` to reflect that an evolved belief is held with renewed but not
maximal conviction. Reads and writes Redis (one ``HGETALL`` plus one
``HSET``) at the key from :func:`_pref_key` and calls the Gemini
embedding pool; returns an ``error`` dict when the preference is
missing. Called by the ``manage_persona_preferences`` tool in
``tools/persona_preferences.py`` (around line 318).
Args:
preference_id (str): Id of the preference to evolve.
persona_id (str): Persona that owns it.
new_opinion (str): Replacement opinion text (truncated to 4096
chars and re-embedded).
reason (str): Why the opinion changed, recorded in the history log.
Returns:
dict[str, Any]: Updated ``id``, ``name``, ``new_opinion``,
``strength`` (``0.6``), and the new ``evolutions`` count; an
``{"error": ...}`` dict if the preference was not found.
"""
key = _pref_key(persona_id, preference_id)
data = await self._redis.hgetall(key)
if not data:
return {"error": f"Preference {preference_id} not found for {persona_id}"}
old_opinion = data.get("opinion", "")
history_raw = data.get("evolution_history", "[]")
try:
history: list[dict] = json.loads(history_raw)
except Exception:
history = []
history.append(
{
"old_opinion": old_opinion,
"new_opinion": new_opinion,
"timestamp": _now_iso(),
"reason": reason,
}
)
new_embedding = await self._embed(new_opinion)
await self._redis.hset(
key,
mapping={
"opinion": new_opinion[:4096],
"strength": "0.6",
"last_reinforced": _now_iso(),
"evolution_history": json.dumps(history),
"embedding": self._vec_to_bytes(new_embedding),
},
)
return {
"id": preference_id,
"name": data.get("name", ""),
"new_opinion": new_opinion,
"strength": 0.6,
"evolutions": len(history),
}
[docs]
async def retract_preference(
self,
preference_id: str,
persona_id: str,
reason: str = "",
) -> dict[str, Any]:
"""Soft-delete a preference by marking it inactive.
Retracts a preference without destroying it: it sets ``active`` to
``"0"`` so the ``@active:{1}`` filter excludes it from searches and
injection, while leaving the full record (and, when a ``reason`` is
given, a ``"retraction"`` entry appended to ``evolution_history`` with
a :func:`_now_iso` timestamp) intact for provenance. Reads and writes
Redis (one ``HGETALL`` plus one ``HSET``) at the key from
:func:`_pref_key`; returns an ``error`` dict when the preference is
missing. Called by the ``manage_persona_preferences`` tool in
``tools/persona_preferences.py`` (around line 358).
Args:
preference_id (str): Id of the preference to retract.
persona_id (str): Persona that owns it.
reason (str): Optional explanation; when non-empty it is recorded
in the evolution history.
Returns:
dict[str, Any]: ``id``, ``name``, and ``active`` (``False``) on
success; an ``{"error": ...}`` dict if the preference was not found.
"""
key = _pref_key(persona_id, preference_id)
data = await self._redis.hgetall(key)
if not data:
return {"error": f"Preference {preference_id} not found for {persona_id}"}
update: dict[str, str] = {"active": "0"}
if reason:
try:
history = json.loads(data.get("evolution_history", "[]"))
except Exception:
history = []
history.append(
{
"action": "retraction",
"reason": reason,
"timestamp": _now_iso(),
}
)
update["evolution_history"] = json.dumps(history)
await self._redis.hset(key, mapping=update)
return {"id": preference_id, "name": data.get("name", ""), "active": False}
[docs]
async def get_preference(
self,
preference_id: str,
persona_id: str,
) -> dict[str, Any] | None:
"""Fetch and decode a single preference by id.
Loads one preference HASH at the key from :func:`_pref_key` and returns
it as a plain dict via :meth:`_decode_pref` (which drops the raw
embedding bytes and coerces ``active`` to a bool), or ``None`` when no
such key exists. Performs a single Redis ``HGETALL``. No caller of this
manager method was found in the repository; it is a convenience
accessor available for tool or diagnostic use.
Args:
preference_id (str): Id of the preference to fetch.
persona_id (str): Persona that owns it.
Returns:
dict[str, Any] | None: The decoded preference, or ``None`` if not
found.
"""
key = _pref_key(persona_id, preference_id)
data = await self._redis.hgetall(key)
if not data:
return None
return self._decode_pref(data)
[docs]
async def list_preferences(
self,
persona_id: str,
category: str | None = None,
include_retracted: bool = False,
limit: int = 50,
) -> list[dict[str, Any]]:
"""List a persona's preferences, strongest first.
Enumerates every preference HASH for a persona by ``KEYS``-scanning the
``stargazer:persona_pref:{persona_id}:*`` pattern, decoding each via
:meth:`_decode_pref`, then filtering out retracted entries (unless
``include_retracted``) and non-matching categories before sorting by
descending ``strength`` and capping at ``limit``. Reads Redis (one
``KEYS`` plus one ``HGETALL`` per key) and does not use the vector
index. Called by :meth:`stats` (with ``include_retracted=True``) and by
the ``inspect_persona_preferences`` tool in
``tools/persona_preferences.py`` (around lines 240, 399, 408, and 416).
Args:
persona_id (str): Persona whose preferences to list.
category (str | None): If set, only preferences in this category are
returned.
include_retracted (bool): When ``True``, also include inactive
(retracted) preferences.
limit (int): Maximum number of preferences to return (default 50).
Returns:
list[dict[str, Any]]: Decoded preferences sorted by descending
strength.
"""
pattern = f"{KEY_PREFIX}:{persona_id}:*"
keys = await self._redis.keys(pattern)
results: list[dict[str, Any]] = []
for key in keys:
data = await self._redis.hgetall(key)
if not data:
continue
if not include_retracted and data.get("active", "1") != "1":
continue
if category and data.get("category", "") != category:
continue
results.append(self._decode_pref(data))
if len(results) >= limit:
break
results.sort(key=lambda x: float(x.get("strength", 0)), reverse=True)
return results
[docs]
async def stats(self, persona_id: str) -> dict[str, Any]:
"""Summarize a persona's preference memory.
Pulls every preference (including retracted ones) via
:meth:`list_preferences` with a large limit, then tallies active versus
retracted totals and builds a per-category histogram over the active
set. Its only I/O is the underlying ``list_preferences`` Redis scan.
Called by the ``inspect_persona_preferences`` tool in
``tools/persona_preferences.py`` (around line 396) to render the
``stats`` inspection action.
Args:
persona_id (str): Persona to summarize.
Returns:
dict[str, Any]: ``persona_id`` plus ``total``, ``active``,
``retracted`` counts and a ``by_category`` mapping of active
preferences per category.
"""
prefs = await self.list_preferences(
persona_id,
include_retracted=True,
limit=10000,
)
active = [p for p in prefs if p.get("active") in (True, "1", 1)]
retracted = [p for p in prefs if p.get("active") not in (True, "1", 1)]
cats: dict[str, int] = {}
for p in active:
cats[p.get("category", "unknown")] = (
cats.get(p.get("category", "unknown"), 0) + 1
)
return {
"persona_id": persona_id,
"total": len(prefs),
"active": len(active),
"retracted": len(retracted),
"by_category": cats,
}
# ------------------------------------------------------------------
# Semantic search and conflict detection
# ------------------------------------------------------------------
[docs]
async def search_preferences(
self,
persona_id: str,
query_embedding: list[float],
top_k: int = 10,
include_retracted: bool = False,
) -> list[dict[str, Any]]:
"""Run a KNN vector search over a persona's preferences.
Performs the core semantic retrieval: it serializes the query vector
via :meth:`_vec_to_bytes` and issues an ``FT.SEARCH`` KNN query against
the ``idx:persona_prefs`` HNSW index, constrained by a
``@persona_id`` TAG filter and (unless ``include_retracted``) an
``@active:{1}`` filter, returning the top ``top_k`` matches sorted by
ascending cosine distance (``score``). Raw results are normalized via
:meth:`_parse_search_results`; any search failure is logged and
returns an empty list rather than raising. Reads Redis via the search
index. Called by :meth:`find_conflicts` and
:meth:`get_preferences_for_injection` internally, and by the
``inspect_persona_preferences`` tool in
``tools/persona_preferences.py`` (around line 231).
Args:
persona_id (str): Persona whose preferences to search.
query_embedding (list[float]): The query vector (``EMBED_DIM`` long).
top_k (int): Number of nearest neighbors to return (default 10).
include_retracted (bool): When ``True``, also search inactive
preferences.
Returns:
list[dict[str, Any]]: Matching preferences, each carrying a
``score`` (cosine distance), or an empty list on error.
"""
active_filter = "@active:{1}" if not include_retracted else ""
persona_filter = f"@persona_id:{{{persona_id}}}"
query_filter = (
f"({persona_filter} {active_filter})" if active_filter else persona_filter
)
query_bytes = self._vec_to_bytes(query_embedding)
try:
raw = await self._redis.execute_command(
"FT.SEARCH",
INDEX_NAME,
f"({query_filter})=>[KNN {top_k} @embedding $vec AS score]",
"PARAMS",
"2",
"vec",
query_bytes,
"RETURN",
"10",
"id",
"name",
"opinion",
"category",
"strength",
"active",
"persona_id",
"score",
"reinforcement_count",
"last_reinforced",
"SORTBY",
"score",
"DIALECT",
"2",
)
return self._parse_search_results(raw)
except Exception:
logger.debug("Persona pref KNN search failed", exc_info=True)
return []
[docs]
async def find_conflicts(
self,
persona_id: str,
new_opinion_embedding: list[float],
threshold: float = _CONFLICT_THRESHOLD,
) -> list[dict[str, Any]]:
"""Find active preferences semantically close to a new opinion.
Supports the dedup/conflict decision when a candidate opinion is
detected: it runs a top-5 :meth:`search_preferences` KNN query, converts
each result's cosine distance into a similarity (``1.0 - score``), and
keeps only matches at or above ``threshold``, annotating each kept match
with its ``similarity``. The caller uses these to decide whether to
reinforce an existing preference, flag a conflict, or store a brand-new
one. Its only I/O is the underlying search-index query. Called by
``persona_preference_extraction.py`` (around line 457) and by the
``manage_persona_preferences`` tool in ``tools/persona_preferences.py``
(around line 136).
Args:
persona_id (str): Persona whose preferences to compare against.
new_opinion_embedding (list[float]): Embedding of the candidate
opinion.
threshold (float): Minimum similarity to count as a match (default
``_CONFLICT_THRESHOLD``, 0.80).
Returns:
list[dict[str, Any]]: Matching preferences, each augmented with a
``similarity`` field; empty if none clear the threshold.
"""
results = await self.search_preferences(
persona_id,
new_opinion_embedding,
top_k=5,
)
matches = []
for r in results:
score = float(r.get("score", 0.0))
similarity = 1.0 - score # COSINE distance → similarity
if similarity >= threshold:
r["similarity"] = similarity
matches.append(r)
return matches
[docs]
async def get_preferences_for_injection(
self,
persona_id: str,
query_embedding: list[float],
max_count: int = 8,
max_chars: int = 2000,
active_user_ids: list[str] | None = None,
) -> list[dict[str, Any]]:
"""Select the persona preferences to inject into the system prompt.
Performs the dual-pass retrieval that surfaces the persona's relevant
opinions for a given conversational moment. Pass one runs a
:meth:`search_preferences` KNN query (``top_k`` of ``max_count * 2``)
for topic-relevant preferences; pass two issues per-user ``FT.SEARCH``
hard-filter queries on the ``@related_user_ids_index`` TAG (with
``:`` and ``-`` escaped) to pull in opinions specifically about the
people currently in the conversation. The two result sets are merged,
deduplicated by id, sorted by the blended relevance-times-strength score
from the local :func:`_sort_key` closure, then truncated to ``max_count``
items and ``max_chars`` of opinion text and shaped via
:meth:`_format_for_injection`. Reads Redis via the search index;
per-user query failures are logged and skipped. Called by
``prompt_context.py`` (around line 3190) while assembling the runtime
prompt context.
Args:
persona_id (str): Persona whose preferences to retrieve.
query_embedding (list[float]): Embedding of the current
conversational context for the semantic pass.
max_count (int): Maximum number of preferences to inject (default 8).
max_chars (int): Maximum total opinion characters to inject (default
2000).
active_user_ids (list[str] | None): User ids present in the
conversation, used for the hard-filter pass.
Returns:
list[dict[str, Any]]: Injection-shaped preference dicts (from
:meth:`_format_for_injection`), ranked and capped.
"""
# Pass 1: semantic
semantic_results = await self.search_preferences(
persona_id,
query_embedding,
top_k=max_count * 2,
)
# Pass 2: hard UID filter
person_results: list[dict[str, Any]] = []
if active_user_ids:
for uid in active_user_ids:
uid_clean = uid.replace(":", "\\:").replace("-", "\\-")
persona_filter = f"@persona_id:{{{persona_id}}}"
uid_filter = f"@related_user_ids_index:{{{uid_clean}}}"
active_filter = "@active:{1}"
try:
raw = await self._redis.execute_command(
"FT.SEARCH",
INDEX_NAME,
f"({persona_filter} {uid_filter} {active_filter})",
"RETURN",
"10",
"id",
"name",
"opinion",
"category",
"strength",
"active",
"persona_id",
"reinforcement_count",
"last_reinforced",
"LIMIT",
"0",
"5",
"DIALECT",
"2",
)
person_results.extend(self._parse_search_results(raw))
except Exception:
logger.debug(
"Persona pref UID filter failed for %s",
uid,
exc_info=True,
)
# Merge and deduplicate
seen_ids: set[str] = set()
merged: list[dict[str, Any]] = []
for item in semantic_results + person_results:
pid = item.get("id", "")
if pid and pid not in seen_ids:
seen_ids.add(pid)
merged.append(item)
# Sort: combined relevance + strength score
def _sort_key(p: dict) -> float:
"""Compute the blended ranking score for one merged preference.
Local closure used as the ``key`` function for the in-place
``merged.sort`` inside
:meth:`get_preferences_for_injection`. It converts the RediSearch
``COSINE`` distance carried in the ``score`` field into a
similarity (``1.0 - score``; person-filter hits that lack a vector
score default to distance ``1.0`` / similarity ``0.0``) and blends
it with the preference ``strength`` so that both topical relevance
and how firmly the persona holds the opinion influence ordering.
Purely arithmetic — no Redis, embedding, or other I/O.
Args:
p (dict): A decoded preference dict, optionally carrying a
``score`` (cosine distance) and a ``strength`` field.
Returns:
float: ``similarity * 0.6 + strength * 0.4``; higher sorts
earlier under the caller's ``reverse=True`` sort.
"""
score = float(p.get("score", 1.0))
similarity = 1.0 - score
strength = float(p.get("strength", 0.5))
return similarity * 0.6 + strength * 0.4
merged.sort(key=_sort_key, reverse=True)
# Apply caps
final: list[dict[str, Any]] = []
total_chars = 0
for item in merged:
if len(final) >= max_count:
break
opinion_len = len(item.get("opinion", ""))
if total_chars + opinion_len > max_chars:
break
total_chars += opinion_len
final.append(self._format_for_injection(item))
return final
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _decode_pref(self, data: dict) -> dict[str, Any]:
"""Decode a raw Redis preference hash into a clean dict.
Normalizes one ``HGETALL`` result into plain Python: it decodes any
bytes keys and values to UTF-8 (replacing undecodable bytes), drops the
raw ``embedding`` field so callers never handle the vector blob, and
coerces the stored ``"1"``/``"0"`` ``active`` flag into a real bool.
Pure transformation with no I/O. Called by :meth:`get_preference` and
:meth:`list_preferences` to shape stored hashes for callers.
Args:
data (dict): The raw field mapping returned by ``HGETALL`` (keys and
values may be ``str`` or ``bytes``).
Returns:
dict[str, Any]: Decoded fields with ``embedding`` removed and
``active`` as a bool.
"""
result: dict[str, Any] = {}
for k, v in data.items():
k = k if isinstance(k, str) else k.decode()
if k == "embedding":
continue
if isinstance(v, bytes):
v = v.decode("utf-8", errors="replace")
result[k] = v
result["active"] = result.get("active", "1") == "1"
return result
def _parse_search_results(self, raw: Any) -> list[dict[str, Any]]:
"""Parse a raw ``FT.SEARCH`` reply into a list of preference dicts.
Walks the flat RediSearch reply (a leading count followed by alternating
key and field-array entries) and turns each document's flat
field-name/field-value list into a dict, decoding any bytes to UTF-8 and
coercing the ``active`` flag to a bool. Malformed or empty replies yield
an empty list. Pure parsing with no I/O. Called by
:meth:`search_preferences` and by the hard-filter pass inside
:meth:`get_preferences_for_injection`.
Args:
raw (Any): The raw list returned by the ``FT.SEARCH`` command.
Returns:
list[dict[str, Any]]: One dict per returned document, with decoded
string fields and a bool ``active``.
"""
if not raw or len(raw) < 1:
return []
results = []
items = raw[1:] # skip the count
for i in range(0, len(items) - 1, 2):
_key = items[i]
fields = items[i + 1]
if not isinstance(fields, (list, tuple)):
continue
d: dict[str, Any] = {}
for j in range(0, len(fields) - 1, 2):
field_name = fields[j]
field_val = fields[j + 1]
if isinstance(field_name, bytes):
field_name = field_name.decode()
if isinstance(field_val, bytes):
field_val = field_val.decode("utf-8", errors="replace")
d[field_name] = field_val
d["active"] = d.get("active", "1") == "1"
results.append(d)
return results
def _format_for_injection(self, pref: dict) -> dict[str, Any]:
"""Project a preference into the minimal shape for prompt injection.
Selects and lightly normalizes just the fields the system prompt needs
(id, name, opinion, category, plus a rounded ``strength``, integer
``reinforcement_count``, and ``last_reinforced``), dropping search
scores and provenance so the injected payload stays compact. Pure
transformation with no I/O. Called by
:meth:`get_preferences_for_injection` for each surviving preference
before it is handed back to the prompt-context assembler.
Args:
pref (dict): A decoded/parsed preference dict.
Returns:
dict[str, Any]: The injection-ready subset of fields with normalized
numeric types.
"""
return {
"id": pref.get("id", ""),
"name": pref.get("name", ""),
"opinion": pref.get("opinion", ""),
"category": pref.get("category", ""),
"strength": round(float(pref.get("strength", 0.5)), 3),
"reinforcement_count": int(pref.get("reinforcement_count", 1)),
"last_reinforced": pref.get("last_reinforced", ""),
}