tyndale-ai-service/app/memory/conversation.py

123 lines
3.3 KiB
Python

"""Conversation history management with in-memory storage."""
import logging
import time
from dataclasses import dataclass, asdict, field
from typing import Annotated
from fastapi import Depends
from app.config import settings
logger = logging.getLogger(__name__)
# Cap on the number of messages kept per conversation. Each stored turn adds
# two messages (user + assistant); store_turn keeps only the most recent
# MAX_HISTORY_MESSAGES of them.
MAX_HISTORY_MESSAGES = 20
@dataclass
class Message:
"""A single message in conversation history."""
role: str # "user" or "assistant"
content: str
sources: list[dict] | None = None
@dataclass
class ConversationData:
    """Messages of one conversation plus the timestamp used for TTL checks.

    Attributes:
        messages: Messages held for the conversation. Defaults to a new
            empty list for each instance.
        last_updated: ``time.time()`` value recorded for the conversation.
            Defaults to the moment the instance is created.
    """

    messages: list[Message] = field(default_factory=list)
    last_updated: float = field(default_factory=time.time)
# Module-level storage for conversations, keyed by conversation id.
# Because it is a module global, every ConversationMemory instance reads and
# writes this same dict.
_conversations: dict[str, ConversationData] = {}
class ConversationMemory:
    """Manages conversation history in memory.

    All instances read and write the module-level ``_conversations`` dict, so
    history stored through one instance is visible through any other.

    Expiry is lazy: a conversation older than ``ttl`` is removed only when
    ``get_history`` (or ``store_turn``, which calls it) is invoked for that
    conversation id. Conversations that are never touched again stay in the
    dict.
    """

    def __init__(self, ttl: int):
        """Initialize conversation memory.

        Args:
            ttl: Time-to-live in seconds for conversations
        """
        self.ttl = ttl

    async def get_history(self, conversation_id: str) -> list[Message]:
        """Get conversation history.

        Args:
            conversation_id: Conversation identifier

        Returns:
            List of messages in chronological order, or empty list if
            expired/not found. The list is a fresh copy, so the caller may
            keep or modify it without affecting the stored history (the
            Message objects themselves are shared, not copied).
        """
        data = _conversations.get(conversation_id)
        if data is None:
            return []

        # Expired entries are dropped on access and reported as "no history".
        if time.time() - data.last_updated > self.ttl:
            _conversations.pop(conversation_id, None)
            return []

        # Return a copy: handing out the stored list would let a later
        # store_turn() silently change a history the caller already holds.
        return list(data.messages)

    async def store_turn(
        self,
        conversation_id: str,
        user_message: str,
        assistant_message: str,
        sources: list[dict] | None = None,
    ) -> None:
        """Store a conversation turn (user message + assistant response).

        The stored history is capped at MAX_HISTORY_MESSAGES; when the cap is
        exceeded the oldest messages are discarded. Storing a turn also
        refreshes the conversation's timestamp, restarting its TTL.

        Args:
            conversation_id: Conversation identifier
            user_message: User's message
            assistant_message: Assistant's response
            sources: Optional source references used in response; attached to
                the assistant message only
        """
        # get_history applies the TTL check, so an expired conversation
        # starts over from an empty history. The result is a private copy.
        history = await self.get_history(conversation_id)

        history.append(Message(role="user", content=user_message))
        history.append(
            Message(role="assistant", content=assistant_message, sources=sources)
        )

        # Trim to max size (keep most recent).
        if len(history) > MAX_HISTORY_MESSAGES:
            history = history[-MAX_HISTORY_MESSAGES:]

        _conversations[conversation_id] = ConversationData(
            messages=history,
            last_updated=time.time(),
        )
        # Lazy %-formatting: the message is only built if DEBUG is enabled.
        logger.debug("Stored conversation turn for %s", conversation_id)

    async def clear(self, conversation_id: str) -> bool:
        """Clear conversation history.

        Args:
            conversation_id: Conversation identifier

        Returns:
            Always True. Clearing a conversation that does not exist is
            treated as success.
        """
        _conversations.pop(conversation_id, None)
        return True
def get_conversation_memory() -> ConversationMemory:
    """Build a ConversationMemory configured from the application settings.

    Returns:
        A new ConversationMemory whose TTL is ``settings.conversation_ttl``.
    """
    configured_ttl = settings.conversation_ttl
    return ConversationMemory(ttl=configured_ttl)
# Annotated alias for use as a parameter type: it pairs ConversationMemory
# with Depends(get_conversation_memory) so FastAPI can supply the instance.
ConversationMemoryDependency = Annotated[ConversationMemory, Depends(get_conversation_memory)]