Skip to content
Maozy's Blog
Go back

[PocketAgent 实战] Day2 多轮记忆与滑动窗口

核心痛点:告别“七秒记忆”的智能体

我们的最终目标是交付一个开箱即用的移动端 AI Agent SDK。如果端侧的智能体像金鱼一样只有 7 秒钟的记忆,上一秒告诉它名字,下一秒它就全忘了,那这就是个玩具,根本无法嵌入复杂的业务流。

认知重塑:云端的大模型(LLM)本身是绝对无状态(Stateless)的。

要让端侧 SDK 表现出“对答如流且知根知底”的智能,压力全部给到了后端的这条“供水管”。

后端必须扮演一个“端茶倒水兼记事本”的角色:每次接收到客户端的新请求,都要把“系统人设 + 历史聊天记录 + 最新问题”打包成一个完整的数组,喂给大模型。


一、会话隔离:基于 Session ID 的并发基础

在真实的端侧场景中,会有成千上万的用户同时调用 SDK。后端如果用一个全局变量存对话,必然会发生“串线”灾难。

为了支撑多用户并发,在 API 的入参中强制引入了 session_id

# 简易内存池结构:{ "session_123": [{"role": "user", "content": "..."}] }
SESSION_MEMORY: Dict[str, List[Dict[str, str]]] = {}

# 每次请求先提取当前会话的记忆指针
if session_id not in SESSION_MEMORY:
    SESSION_MEMORY[session_id] = [{"role": "system", "content": "..."}]
history = SESSION_MEMORY[session_id]

架构思考:目前为了快速验证 MVP(最小可行性产品),直接使用了 Python 的内存 Dict。

后续生产环境,这里会无缝替换为 Redis(存短期高频记忆)或 MySQL/向量数据库(存长期记忆)。

二、架构核心:滑动窗口 (Sliding Window) 解决 Token 爆炸

随着端侧用户聊的轮数越来越多,如果把几百轮历史记录全发给大模型,会引发两个致命问题:

  1. 超出大模型的最大 Context Window 上下文限制,接口直接报错。
  2. 极其高昂的 Token 费用账单(因为前面的废话每次都要被重复计费)。

所以在流式生成器中引入了经典的滑动窗口机制:

# 假设最多保留最近的 10 条消息(加上第1条系统设定,总共11条)
if len(history) > 11:
    # 数组切片:永远保留 index=0 的 System Prompt,然后取最后的 10 条最新对话
    SESSION_MEMORY[session_id] = [history[0]] + history[-10:]
    history = SESSION_MEMORY[session_id] 

这几行Python切片代码,确保了传给云端大脑的上下文永远是最精华、最相关的部分,从架构层面卡死了成本上限。

三、Python 生成器 (Generator) 的降维打击:流式后处理

这是今天实战中最惊艳的语言特性体验。

在大模型流式输出完毕后,需要把大模型刚才说的那一大段话,也塞进 history 数组里存起来。

如果是传统的 Java Web 开发思路,普通函数一旦 return 将流响应发给前端,函数就销毁了。想做后处理持久化,往往还得额外开个异步线程。而在基于 yield 的流式生成器里,一切浑然天成:

full_ai_answer = "" 
async for line in response.aiter_lines():
    # ... 解析 chunk ...
    if chunk:
        full_ai_answer += chunk # 1. 悄悄拼接,毫秒级操作,不阻塞网络推流
        yield chunk             # 2. 实时推给客户端 SDK

# 3. 核心机制:当大模型发来 [DONE] 导致循环结束后,生成器并没有死亡!
# 代码会继续往下执行,此时从容不迫地进行状态聚合与持久化。
history.append({"role": "assistant", "content": full_ai_answer})

底层逻辑: yield 在循环中扮演了“边推流边冻结”的桥梁。而当循环彻底结束后,函数的收尾工作完全可以在同一个上下文中同步完成。这种“边流式响应,边聚合状态,响应结束后统一持久化”的模式,是开发高并发 AI Agent 的绝佳工具。

四、配套代码

👉 点击查看完整 `v0.4.0` 源码
  import json
  import httpx
  from fastapi import FastAPI, HTTPException
  from fastapi.responses import StreamingResponse  # 核心:导入 FastAPI 的流式响应组件
  from pydantic import BaseModel, Field
  from typing import Dict, List  # 引入类型提示

  app = FastAPI(
      title="PocketAgent Core API",
      description="支持极速流式输出的 AI Agent 调度网关",
      version="0.4.0"
  )

  class ChatRequest(BaseModel):
      query: str = Field(..., description="用户的提问")
      # 新增 session_id,客户端如果不传,默认为 default_session
      session_id: str = Field(default="default_session", description="会话ID")


  # --- 核心新增:简易内存数据库 ---
  # 结构:{ "session_123": [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}] }
  SESSION_MEMORY: Dict[str, List[Dict[str, str]]] = {}

  # --- 智谱大模型配置区 ---
  API_KEY = "xxx"
  BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
  MODEL_NAME = "glm-4-flash"


  # --- 核心改造:带记忆和滑动窗口的流式生成器 ---
  async def stream_llm_reply(query: str, session_id: str):
      # 1. 提取或初始化当前会话的记忆
      if session_id not in SESSION_MEMORY:
          SESSION_MEMORY[session_id] = [
              {"role": "system", "content": "你是一个名为 PocketAgent 的智能体。回答要简明扼要,像个极客。"}
          ]

      # 获取当前用户的历史记录指针
      history = SESSION_MEMORY[session_id]

      # 2. 将最新的问题追加到历史中
      history.append({"role": "user", "content": query})

      # 3. 【滑动窗口机制】控制记忆长度,防止 Token 爆炸
      # 假设我们最多保留最近的 10 条消息(加上第1条系统设定,总共11条)
      if len(history) > 11:
          # 切片魔法:保留第 0 个(系统设定),然后拼上最后 10 个(最新对话)
          SESSION_MEMORY[session_id] = [history[0]] + history[-10:]
          history = SESSION_MEMORY[session_id]  # 更新指针

      payload = {
          "model": MODEL_NAME,
          "messages": history,  # 这一次,我们把整个历史数组传进去!
          "temperature": 0.7,
          "stream": True
      }

      headers = {
          "Authorization": f"Bearer {API_KEY}",
          "Content-Type": "application/json"
      }

      # 用于在后端悄悄拼装大模型的完整回答,稍后存入记忆
      full_ai_answer = ""

      async with httpx.AsyncClient() as client:
          async with client.stream("POST", BASE_URL, json=payload, headers=headers, timeout=30.0) as response:
              response.raise_for_status()

              async for line in response.aiter_lines():
                  if line.startswith("data: "):
                      data_str = line[6:]
                      if data_str.strip() == "[DONE]":
                          break

                      try:
                          data_json = json.loads(data_str)
                          chunk = data_json["choices"][0]["delta"].get("content", "")
                          if chunk:
                              full_ai_answer += chunk  # 悄悄拼接
                              yield chunk  # 实时推给客户端 SDK
                      except Exception as e:
                          pass

      # 4. 重点:当流式输出彻底结束后,把大模型的完整回答追加到记忆中!
      # 这样下一轮对话时,大模型就知道自己刚才说过什么了。
      history.append({"role": "assistant", "content": full_ai_answer})
      print(f"[记忆快照] Session: {session_id}, 当前记忆轮数: {len(history)}")


  # --- API 路由出口改造 ---
  # 注意:流式接口不需要 response_model 了,因为返回的是持续的数据流,不是固定的 JSON
  @app.post("/api/v1/chat/stream")
  async def chat_stream(request: ChatRequest):
      try:
          # 把 session_id 也传进去
          return StreamingResponse(
              stream_llm_reply(request.query, request.session_id),
              media_type="text/event-stream"
          )
      except Exception as e:
          raise HTTPException(status_code=500, detail=str(e))

Share this post on:

Previous Post
[Jetpack Compose] Compose 的三大渲染阶段
Next Post
[PocketAgent 实战] Day1 Python SSE 与网络背压机制