FastAPI開發AI應用教程:新增用戶歷史消息
本文將深入介紹如何在 FastAPI AI 聊天應用中實現用戶歷史消息功能,當用戶切換助手,刷新頁面時,都可以保留當前會話歷史消息。
圖片
本項目已經開源至 Github,項目地址:https://github.com/wayn111/fastapi-ai-chat-demo
溫馨提示:本文全文約一萬字,看完約需 15 分鐘。
文章概述
重點講解每個助手區分 sessionid、獲取歷史消息接口以及發送消息時攜帶上下文信息的核心技術實現。通過本教程,你將掌握構建智能聊天應用中消息持久化和上下文管理的關鍵技術。
核心功能
- 多助手會話隔離:每個 AI 助手(智能助手、AI 老師、編程專家)都有獨立的會話歷史
- 智能會話管理:自動生成和管理 sessionid,確保會話的唯一性和持久性
- 歷史消息加載:快速加載和展示用戶的歷史對話記錄
- 上下文傳遞:發送消息時自動攜帶歷史上下文,保持對話連貫性
- 數據持久化:支持 Redis 和內存兩種存儲方式
技術棧
- 后端框架:FastAPI(高性能異步 Web 框架)
- 數據存儲:Redis(主要)+ 內存存儲(備用)
- 前端技術:原生 JavaScript + HTML5 + CSS3
- 數據格式:JSON(消息序列化和傳輸)
- 會話管理:UUID + 時間戳(會話 ID 生成)
核心架構設計
??? 數據模型設計
在實現歷史消息功能之前,我們需要設計合理的數據模型來存儲和管理消息數據:
@dataclass
class AIMessage:
"""AI消息數據類"""
role: str
content: str
timestamp: float
image_data: Optional[str] = None # Base64編碼的圖片數據
image_type: Optional[str] = None # 圖片類型 (jpeg, png, gif)這個數據類定義了消息的基本結構,包含角色、內容、時間戳和可選的圖片數據字段。
?? 會話 ID 管理策略
會話 ID 是整個歷史消息系統的核心,我們采用了前端生成、后端接收的管理策略:
前端會話 ID 生成邏輯:
// 前端生成會話ID的核心邏輯
if (sessionId) {
// 復用已存在的會話ID
currentSessionId = sessionId;
} else {
// 生成新的會話ID:時間戳 + 隨機數
const timestamp = Date.now();
const randomNum = Math.floor(Math.random() * 10000);
sessionId = `session_${timestamp}_${randomNum}`;
currentSessionId = sessionId;
localStorage.setItem(sessionKey, sessionId);
}后端鍵名管理:
def get_conversation_key(user_id: str, session_id: str) -> str:
"""獲取對話在Redis中的鍵名"""
return f"conversation:{user_id}:{session_id}"
def get_user_sessions_key(user_id: str) -> str:
"""獲取用戶會話列表在Redis中的鍵名"""
return f"user_sessions:{user_id}"前端生成唯一的會話 ID 并傳遞給后端,后端使用這個 ID 構建 Redis 鍵名來存儲對話數據。
核心功能實現
?? 功能一:每個助手區分 sessionid
前端實現:智能會話管理
在前端,我們為每個助手類型維護獨立的 sessionid,實現真正的會話隔離:
/**
* 選擇智能助手類型
* @param {string} assistantType - 助手類型
*/
function selectAssistant(assistantType) {
// 更新當前助手類型
currentAssistantType = assistantType;
// 移除所有助手項的active類
document.querySelectorAll('.assistant-item').forEach(item => {
item.classList.remove('active');
});
// 為當前選中的助手添加active類
event.target.closest('.assistant-item').classList.add('active');
// 更新所有現有的assistant消息頭像
updateAssistantAvatars(assistantType);
// 從全局配置中獲取角色信息
const roleConfig = aiRolesConfig[assistantType];
if (!roleConfig) {
console.error('未找到角色配置:', assistantType);
return;
}
// 更新選中模型信息顯示
updateSelectedModelInfo(assistantType);
// 切換助手時處理sessionId
const sessionKey = `${assistantType}_sessionId`;
let sessionId = localStorage.getItem(sessionKey);
if (sessionId) {
// 如果該助手已有sessionId,使用之前的
currentSessionId = sessionId;
} else {
// 如果沒有sessionId,生成新的
const timestamp = Date.now();
const randomNum = Math.floor(Math.random() * 10000);
sessionId = `session_${timestamp}_${randomNum}`;
currentSessionId = sessionId;
localStorage.setItem(sessionKey, sessionId);
}
// 根據當前助手的sessionId重新調用history接口
loadAssistantHistory(assistantType);
}這個函數負責切換助手時的會話管理,為每個助手類型維護獨立的 sessionId,并從 localStorage 中獲取或生成新的會話 ID。
后端實現:接收會話 ID 并管理數據
后端接收前端傳來的會話 ID,通過 Redis 實現會話數據的持久化存儲:
async def save_message_to_redis(user_id: str, session_id: str, message: ChatMessage):
"""將消息保存到Redis或內存"""
try:
message_data = {
"role": message.role,
"content": message.content,
"timestamp": message.timestamp,
"image_data": getattr(message, 'image_data', None),
"image_type": getattr(message, 'image_type', None)
}
if REDIS_AVAILABLE and redis_client:
# Redis存儲:高性能,支持數據過期
conversation_key = get_conversation_key(user_id, session_id)
redis_client.lpush(conversation_key, json.dumps(message_data))
redis_client.ltrim(conversation_key, 0, 19) # 只保留最近20條消息
redis_client.expire(conversation_key, 86400 * 7) # 7天過期
# 更新會話信息
sessions_key = get_user_sessions_key(user_id)
session_info = {
"session_id": session_id,
"last_message": message.content[:50] + "..."if len(message.content) > 50else message.content,
"last_timestamp": message.timestamp
}
redis_client.hset(sessions_key, session_id, json.dumps(session_info))
redis_client.expire(sessions_key, 86400 * 30) # 30天過期
logger.info(f"消息已保存到Redis - 用戶: {user_id}, 會話: {session_id[:8]}..., 角色: {message.role}")
else:
# 內存存儲:備用方案
if user_id notin MEMORY_STORAGE["conversations"]:
MEMORY_STORAGE["conversations"][user_id] = {}
if session_id notin MEMORY_STORAGE["conversations"][user_id]:
MEMORY_STORAGE["conversations"][user_id][session_id] = []
MEMORY_STORAGE["conversations"][user_id][session_id].append(message_data)
# 限制內存中的消息數量
if len(MEMORY_STORAGE["conversations"][user_id][session_id]) > 20:
MEMORY_STORAGE["conversations"][user_id][session_id] = \
MEMORY_STORAGE["conversations"][user_id][session_id][-20:]
logger.info(f"消息已保存到內存 - 用戶: {user_id}, 會話: {session_id[:8]}..., 角色: {message.role}")
except Exception as e:
logger.error(f"保存消息失敗 - 用戶: {user_id}, 會話: {session_id[:8]}..., 錯誤: {e}")
raise這個函數將消息保存到 Redis 或內存中,支持雙重存儲策略,并設置了消息數量限制和過期時間。
?? 功能二:獲取歷史消息接口
后端 API 設計
我們設計了一個高效的歷史消息獲取接口:
@app.get("/chat/history")
asyncdef get_chat_history(
user_id: str = Query(..., descriptinotallow="用戶ID"),
session_id: str = Query(..., descriptinotallow="會話ID")
):
"""獲取聊天歷史"""
logger.info(f"獲取聊天歷史 - 用戶: {user_id}, 會話: {session_id[:8]}...")
try:
history = await get_conversation_history(user_id, session_id)
logger.info(f"聊天歷史獲取成功 - 用戶: {user_id}, 會話: {session_id[:8]}..., 消息數: {len(history)}")
return {
"session_id": session_id,
"messages": history,
"total": len(history)
}
except Exception as e:
logger.error(f"獲取聊天歷史失敗 - 用戶: {user_id}, 會話: {session_id[:8]}..., 錯誤: {e}")
raise HTTPException(status_code=500, detail="獲取聊天歷史失敗")
asyncdef get_conversation_history(user_id: str, session_id: str) -> List[Dict[str, Any]]:
"""從Redis或內存獲取對話歷史"""
try:
if REDIS_AVAILABLE and redis_client:
# 從Redis獲取
conversation_key = get_conversation_key(user_id, session_id)
messages = redis_client.lrange(conversation_key, 0, -1)
# 反轉消息順序(Redis中是倒序存儲的)
messages.reverse()
history = [json.loads(msg) for msg in messages]
logger.info(f"從Redis獲取對話歷史 - 用戶: {user_id}, 會話: {session_id[:8]}..., 消息數量: {len(history)}")
return history
else:
# 從內存獲取
if (user_id in MEMORY_STORAGE["conversations"] and
session_id in MEMORY_STORAGE["conversations"][user_id]):
history = MEMORY_STORAGE["conversations"][user_id][session_id]
logger.info(f"從內存獲取對話歷史 - 用戶: {user_id}, 會話: {session_id[:8]}..., 消息數量: {len(history)}")
return history
else:
logger.info(f"未找到對話歷史 - 用戶: {user_id}, 會話: {session_id[:8]}...")
return []
except Exception as e:
logger.error(f"獲取對話歷史失敗 - 用戶: {user_id}, 會話: {session_id[:8]}..., 錯誤: {e}")
return []前端歷史消息加載
前端通過異步請求加載和渲染歷史消息:
/**
* 加載指定助手的歷史消息
* @param {string} assistantType - 助手類型
*/
asyncfunction loadAssistantHistory(assistantType) {
try {
// 獲取該助手的sessionId
const sessionId = localStorage.getItem(`${assistantType}_sessionId`);
if (!sessionId) {
// 如果沒有sessionId,顯示歡迎消息
showWelcomeMessage(assistantType);
return;
}
// 更新當前會話ID
currentSessionId = sessionId;
// 清空當前聊天消息
const chatMessages = document.getElementById('chatMessages');
chatMessages.innerHTML = '';
// 顯示加載提示
const loadingMessage = document.createElement('div');
loadingMessage.className = 'message assistant';
loadingMessage.innerHTML = `
<div class="message-avatar">??</div>
<div class="message-content-wrapper">
正在加載歷史消息...
</div>
`;
chatMessages.appendChild(loadingMessage);
// 從后端獲取歷史消息
const response = await fetch(`/chat/history?session_id=${sessionId}&user_id=${userId}`);
if (response.ok) {
const data = await response.json();
// 清空加載提示
chatMessages.innerHTML = '';
// 渲染歷史消息
if (data.messages && data.messages.length > 0) {
data.messages.forEach(message => {
renderHistoryMessage(message);
});
console.log(`加載了 ${data.messages.length}條歷史消息`);
} else {
// 如果沒有歷史消息,顯示歡迎消息
showWelcomeMessage(assistantType);
}
// 滾動到底部
scrollToBottom();
} else {
console.error('加載歷史消息失敗:', response.statusText);
showWelcomeMessage(assistantType);
}
} catch (error) {
console.error('加載助手歷史失敗:', error);
showWelcomeMessage(assistantType);
}
}
/**
* 渲染歷史消息
* @param {Object} message - 消息對象
*/
function renderHistoryMessage(message) {
const chatMessages = document.getElementById('chatMessages');
const messageDiv = document.createElement('div');
messageDiv.className = `message ${message.role}`;
// 創建頭像
const avatarDiv = document.createElement('div');
avatarDiv.className = 'message-avatar';
// 如果是assistant消息,設置助手圖標
if (message.role === 'assistant') {
const icon = getAssistantIcon(currentAssistantType);
avatarDiv.setAttribute('data-icon', icon);
}
const contentDiv = document.createElement('div');
contentDiv.className = 'message-content-wrapper';
// 處理消息內容
if (message.role === 'assistant') {
// 對于AI回復,使用Markdown渲染
renderMarkdownContent(message.content, contentDiv);
} else {
// 對于用戶消息,檢查是否包含圖片
if (message.image_data) {
// 創建圖片元素
const imageDiv = document.createElement('div');
imageDiv.className = 'message-image';
const img = document.createElement('img');
img.src = `data:${message.image_type};base64,${message.image_data}`;
img.alt = '用戶上傳的圖片';
img.style.maxWidth = '300px';
img.style.borderRadius = '8px';
imageDiv.appendChild(img);
contentDiv.appendChild(imageDiv);
}
// 添加文本內容
if (message.content && message.content.trim()) {
const textDiv = document.createElement('div');
textDiv.textContent = message.content;
contentDiv.appendChild(textDiv);
}
}
messageDiv.appendChild(avatarDiv);
messageDiv.appendChild(contentDiv);
chatMessages.appendChild(messageDiv);
}這個函數從后端獲取指定助手的歷史消息,并在前端進行渲染顯示,支持文本和圖片消息的完整展示。
?? 功能三:發送消息時攜帶上下文信息
后端流式對話實現
發送消息時,我們需要獲取歷史上下文并傳遞給 AI 模型:
1. 流式聊天接口
@app.post("/chat/stream")
asyncdef chat_stream(request: ChatRequest):
"""流式聊天接口"""
# 設置默認值
role = "assistant"
provider = request.provider
model = getattr(request, 'model', None)
logger.info(f"流式聊天請求 - 用戶: {request.user_id}, 會話: {request.session_id[:8]}..., 角色: {role}, 消息長度: {len(request.message)}, 提供商: {provider}")
if role notin AI_ROLES:
logger.warning(f"不支持的AI角色: {role}")
raise HTTPException(status_code=400, detail="不支持的AI角色")
return StreamingResponse(
generate_streaming_response(request.user_id, request.session_id, request.message, role, provider, model, request.image_data, request.image_type),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*"
}
)這個接口是流式聊天的入口點:
- 接收前端發送的 ChatRequest 對象,包含用戶 ID、會話 ID、消息內容等
- 設置默認的 AI 角色為 "assistant",從請求中獲取 AI 提供商和模型信息
- 驗證 AI 角色是否在支持的角色列表中
- 返回 StreamingResponse 對象,設置 SSE(Server-Sent Events)相關的響應頭
- 調用 generate_streaming_response 函數處理具體的流式響應邏輯
2. 流式響應生成函數
async def generate_streaming_response(user_id: str, session_id: str, user_message: str, role: str = "assistant", provider: Optional[str] = None, model: Optional[str] = None, image_data: Optional[str] = None, image_type: Optional[str] = None):
"""生成流式響應"""
logger.info(f"開始流式響應 - 用戶: {user_id}, 會話: {session_id[:8]}..., 角色: {role}, 消息長度: {len(user_message)}, 提供商: {provider}")
try:
# 1. 保存用戶消息到Redis
from ai_providers.base import AIMessage
user_msg = AIMessage(
role="user",
cnotallow=user_message,
timestamp=time.time(),
image_data=image_data,
image_type=image_type
)
await save_message_to_redis(user_id, session_id, user_msg)
# 2. 獲取對話歷史記錄
history = await get_conversation_history(user_id, session_id)
# 3. 構建系統提示詞
system_prompt = AI_ROLES.get(role, AI_ROLES["assistant"])["prompt"]
# 4. 構建AI消息對象列表
ai_messages = []
# 5. 添加歷史消息(限制數量避免上下文過長)
recent_messages = history[-config.MAX_HISTORY_MESSAGES:] if len(history) > config.MAX_HISTORY_MESSAGES else history
for msg in recent_messages:
if msg["role"] in ["user", "assistant"]:
ai_messages.append(AIMessage(
role=msg["role"],
cnotallow=msg["content"],
timestamp=msg.get("timestamp", time.time()),
image_data=msg.get("image_data"),
image_type=msg.get("image_type")
))
# 6. 調用AI提供商的流式API
logger.info(f"調用AI流式API - 消息數: {len(ai_messages)}, 提供商: {provider or '默認'}, 模型: {model or '默認'}")
full_response = ""
content_only_response = ""# 只保存 type: 'content' 的內容
chunk_count = 0
# 7. 處理流式響應
asyncfor chunk in ai_manager.generate_streaming_response(
messages=ai_messages,
provider=provider,
model=model,
system_prompt=system_prompt
):
if chunk:
full_response += chunk
chunk_count += 1
# 8. 解析chunk數據,過濾出純文本內容
try:
if chunk.startswith("data: "):
json_str = chunk[6:].strip() # 移除 "data: " 前綴
if json_str:
chunk_data = json.loads(json_str)
# 只累積 type 為 'content' 的內容用于保存到Redis
if chunk_data.get('type') == 'content'and'content'in chunk_data:
content_only_response += chunk_data['content']
except (json.JSONDecodeError, KeyError) as e:
# 如果解析失敗,按原來的方式處理(向后兼容)
logger.debug(f"解析chunk數據失敗,使用原始內容: {e}")
content_only_response += chunk
# 9. 實時推送數據到前端
yield chunk
logger.info(f"流式響應完成 - 用戶: {user_id}, 會話: {session_id[:8]}..., 塊數: {chunk_count}, 總長度: {len(full_response)}, 內容長度: {len(content_only_response)}")
# 10. 保存AI響應到Redis(只保存純文本內容)
ai_msg = ChatMessage(
role="assistant",
cnotallow=content_only_response, # 使用過濾后的內容
timestamp=time.time()
)
await save_message_to_redis(user_id, session_id, ai_msg)
# 11. 發送結束信號
yieldf"data: {json.dumps({'type': 'end', 'session_id': session_id})}\n\n"
except Exception as e:
logger.error(f"流式響應錯誤 - 用戶: {user_id}, 會話: {session_id[:8]}..., 錯誤: {e}")
error_msg = f"抱歉,服務出現錯誤:{str(e)}"
yieldf"data: {json.dumps({'content': error_msg, 'type': 'error'})}\n\n"這個函數是流式響應的核心實現,主要包含以下步驟:
- 保存用戶消息:將用戶發送的消息(包括文本和圖片)保存到 Redis 中
- 獲取歷史記錄:根據用戶 ID 和會話 ID 從 Redis 中獲取完整的對話歷史
- 構建系統提示:根據 AI 角色獲取對應的系統提示詞
- 構建消息列表:將歷史消息轉換為 AI 模型需要的格式
- 限制歷史長度:只取最近的 N 條消息,避免上下文過長影響性能
- 調用 AI API:使用 AI 管理器調用指定提供商的流式 API
- 處理流式數據:逐塊接收 AI 響應,實時推送給前端
- 數據過濾:從流式數據中提取純文本內容,用于保存到數據庫
- 實時推送:使用 yield 將數據塊實時發送給前端
- 保存 AI 響應:將完整的 AI 回復保存到 Redis 中
- 發送結束信號:通知前端流式響應已完成
通過這種設計,實現了帶有完整上下文的流式對話功能,用戶可以看到 AI 的實時回復,同時所有對話記錄都會被持久化保存。
總結
本教程通過前端會話 ID 管理、后端歷史消息接口和流式對話上下文傳遞三個核心技術,實現了支持多助手切換和歷史記錄持久化的 AI 聊天應用。


























