核心痛点:告别“七秒记忆”的智能体
我们的最终目标是交付一个开箱即用的移动端 AI Agent SDK。如果端侧的智能体像金鱼一样只有 7 秒钟的记忆,上一秒告诉它名字,下一秒它就全忘了,那这就是个玩具,根本无法嵌入复杂的业务流。
认知重塑:云端的大模型(LLM)本身是绝对无状态(Stateless)的。
要让端侧 SDK 表现出“对答如流且知根知底”的智能,压力全部给到了后端的这条“供水管”。
后端必须扮演一个“端茶倒水兼记事本”的角色:每次接收到客户端的新请求,都要把“系统人设 + 历史聊天记录 + 最新问题”打包成一个完整的数组,喂给大模型。
一、会话隔离:基于 Session ID 的并发基础
在真实的端侧场景中,会有成千上万的用户同时调用 SDK。后端如果用一个全局变量存对话,必然会发生“串线”灾难。
为了支撑多用户并发,在 API 的入参中强制引入了 session_id。
# 简易内存池结构:{ "session_123": [{"role": "user", "content": "..."}] }
SESSION_MEMORY: Dict[str, List[Dict[str, str]]] = {}
# 每次请求先提取当前会话的记忆指针
if session_id not in SESSION_MEMORY:
SESSION_MEMORY[session_id] = [{"role": "system", "content": "..."}]
history = SESSION_MEMORY[session_id]
架构思考:目前为了快速验证 MVP(最小可行性产品),直接使用了 Python 的内存 Dict。
后续生产环境,这里会无缝替换为 Redis(存短期高频记忆)或 MySQL/向量数据库(存长期记忆)。
二、架构核心:滑动窗口 (Sliding Window) 解决 Token 爆炸
随着端侧用户聊的轮数越来越多,如果把几百轮历史记录全发给大模型,会引发两个致命问题:
- 超出大模型的最大 Context Window 上下文限制,接口直接报错。
- 极其高昂的 Token 费用账单(因为前面的废话每次都要被重复计费)。
所以在流式生成器中引入了经典的滑动窗口机制:
# 假设最多保留最近的 10 条消息(加上第1条系统设定,总共11条)
if len(history) > 11:
# 数组切片:永远保留 index=0 的 System Prompt,然后取最后的 10 条最新对话
SESSION_MEMORY[session_id] = [history[0]] + history[-10:]
history = SESSION_MEMORY[session_id]
这几行Python切片代码,确保了传给云端大脑的上下文永远是最精华、最相关的部分,从架构层面卡死了成本上限。
三、Python 生成器 (Generator) 的降维打击:流式后处理
这是今天实战中最惊艳的语言特性体验。
在大模型流式输出完毕后,需要把大模型刚才说的那一大段话,也塞进 history 数组里存起来。
如果是传统的 Java Web 开发思路,普通函数一旦 return 将流响应发给前端,函数就销毁了。想做后处理持久化,往往还得额外开个异步线程。而在基于 yield 的流式生成器里,一切浑然天成:
full_ai_answer = ""
async for line in response.aiter_lines():
# ... 解析 chunk ...
if chunk:
full_ai_answer += chunk # 1. 悄悄拼接,毫秒级操作,不阻塞网络推流
yield chunk # 2. 实时推给客户端 SDK
# 3. 核心机制:当大模型发来 [DONE] 导致循环结束后,生成器并没有死亡!
# 代码会继续往下执行,此时从容不迫地进行状态聚合与持久化。
history.append({"role": "assistant", "content": full_ai_answer})
底层逻辑: yield 在循环中扮演了“边推流边冻结”的桥梁。而当循环彻底结束后,函数的收尾工作完全可以在同一个上下文中同步完成。这种“边流式响应,边聚合状态,响应结束后统一持久化”的模式,是开发高并发 AI Agent 的绝佳工具。
四、配套代码
👉 点击查看完整 `v0.4.0` 源码
import json
import httpx
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse # 核心:导入 FastAPI 的流式响应组件
from pydantic import BaseModel, Field
from typing import Dict, List # 引入类型提示
app = FastAPI(
title="PocketAgent Core API",
description="支持极速流式输出的 AI Agent 调度网关",
version="0.4.0"
)
class ChatRequest(BaseModel):
query: str = Field(..., description="用户的提问")
# 新增 session_id,客户端如果不传,默认为 default_session
session_id: str = Field(default="default_session", description="会话ID")
# --- 核心新增:简易内存数据库 ---
# 结构:{ "session_123": [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}] }
SESSION_MEMORY: Dict[str, List[Dict[str, str]]] = {}
# --- 智谱大模型配置区 ---
API_KEY = "xxx"
BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
MODEL_NAME = "glm-4-flash"
# --- 核心改造:带记忆和滑动窗口的流式生成器 ---
async def stream_llm_reply(query: str, session_id: str):
# 1. 提取或初始化当前会话的记忆
if session_id not in SESSION_MEMORY:
SESSION_MEMORY[session_id] = [
{"role": "system", "content": "你是一个名为 PocketAgent 的智能体。回答要简明扼要,像个极客。"}
]
# 获取当前用户的历史记录指针
history = SESSION_MEMORY[session_id]
# 2. 将最新的问题追加到历史中
history.append({"role": "user", "content": query})
# 3. 【滑动窗口机制】控制记忆长度,防止 Token 爆炸
# 假设我们最多保留最近的 10 条消息(加上第1条系统设定,总共11条)
if len(history) > 11:
# 切片魔法:保留第 0 个(系统设定),然后拼上最后 10 个(最新对话)
SESSION_MEMORY[session_id] = [history[0]] + history[-10:]
history = SESSION_MEMORY[session_id] # 更新指针
payload = {
"model": MODEL_NAME,
"messages": history, # 这一次,我们把整个历史数组传进去!
"temperature": 0.7,
"stream": True
}
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
# 用于在后端悄悄拼装大模型的完整回答,稍后存入记忆
full_ai_answer = ""
async with httpx.AsyncClient() as client:
async with client.stream("POST", BASE_URL, json=payload, headers=headers, timeout=30.0) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data_str = line[6:]
if data_str.strip() == "[DONE]":
break
try:
data_json = json.loads(data_str)
chunk = data_json["choices"][0]["delta"].get("content", "")
if chunk:
full_ai_answer += chunk # 悄悄拼接
yield chunk # 实时推给客户端 SDK
except Exception as e:
pass
# 4. 重点:当流式输出彻底结束后,把大模型的完整回答追加到记忆中!
# 这样下一轮对话时,大模型就知道自己刚才说过什么了。
history.append({"role": "assistant", "content": full_ai_answer})
print(f"[记忆快照] Session: {session_id}, 当前记忆轮数: {len(history)}")
# --- API 路由出口改造 ---
# 注意:流式接口不需要 response_model 了,因为返回的是持续的数据流,不是固定的 JSON
@app.post("/api/v1/chat/stream")
async def chat_stream(request: ChatRequest):
try:
# 把 session_id 也传进去
return StreamingResponse(
stream_llm_reply(request.query, request.session_id),
media_type="text/event-stream"
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))