PocketAgent 是我的从零开始的 AI Agent 项目,核心目标是打造一个开箱即用的移动端 AI Agent SDK。
核心痛点与架构选型
为了让客户端开发人员集成 SDK 后,能为用户提供极其丝滑的“打字机”体验,必须在后端为其铺设一条稳定的通信“水管”。
大模型 API(如 DeepSeek、智谱等)的响应通常伴随高延迟。如果配套的后端服务使用传统的 HTTP 同步阻塞请求,不仅服务端并发极差,客户端调用 SDK 后的体验(死等好几秒才出全量结果)也是无法接受的。
解决方案:基于 FastAPI 的全异步非阻塞架构 + Server-Sent Events (SSE) 流式传输。
对比 Java后端&Android 技术栈:
- FastAPI + Uvicorn ≈ Spring WebFlux + Netty(全异步非阻塞 Web 框架)
- httpx ≈ 异步 OkHttp(负责非阻塞的网络 I/O)
- pydantic ≈ Lombok + Validation(严苛的数据 DTO 校验)
一、async with:优雅的网络 I/O 与资源管理
在调用大模型时,建立的是底层的 TCP Socket 连接。Python 提供了 async with 语法糖来处理异步网络请求。
async with httpx.AsyncClient() as client:
async with client.stream("POST", url, json=payload, headers=headers) as response:
# 处理持续的网络流
机制拆解:
- 资源自动回收:with 关键字等同于 Java 的 try-with-resources。无论代码块内是否抛出异常,只要跳出缩进,底层强制调用 client.close(),彻底杜绝连接未释放导致的内存泄漏。
- 非阻塞挂起: 配合 async/await(或 async for),当线程向大模型发起请求后,不会原地死等。操作系统会将当前线程挂起,让出 CPU 执行其他并发任务。直到网卡接收到大模型传回的首包数据,线程才会被唤醒继续执行。
二、yield
要实现端侧 SSE 的“流水席”效果,普通的 return 无法满足需求。必须引入 Python的生成器 (Generator) 与 yield。
async for line in response.aiter_lines():
if line.startswith("data: "):
chunk = parse_json(line) # 解析出最新的一个词
if chunk:
yield chunk # 将词发射出去,并在此处冻结状态
return vs yield:
- return (一锤子买卖): 一次性收集所有数据 -> 返回全量字符串 -> 清空函数内存栈 -> 函数生命周期彻底结束。
- yield (状态机管道):抓取到一块数据(chunk) -> 将其抛给外层调用者 -> 在此行代码按下暂停键,冻结局部变量与运行状态 -> 等待外层消费完毕并索要下一块数据 -> 从暂停处复活,继续下一轮循环抓取。
带有 yield 的函数不再是普通函数,它变成了一个源源不断为端侧 SDK 产出数据的流水线节点。
三、StreamingResponse 与 HTTP Chunk
后端的生成器开始“吐字”后,需要通过 FastAPI 的 StreamingResponse 桥接,推给集成 SDK 的手机客户端。
return StreamingResponse(
stream_llm_reply(request.query),
media_type="text/event-stream"
)
协议层面的改变:
- 不再发送传统的 Content-Length 头(因为大模型输出长度未知)。
- 强制修改为 Transfer-Encoding: chunked 和 Content-Type: text/event-stream。
- StreamingResponse 作为一个消费者,死死咬住内层的 yield 生成器。生成器吐出一个词,它就将其封装为一个 HTTP Chunk,顺着 TCP 链路推给手机端。
四、网络背压 (Backpressure)
如果大模型吐字极快,而移动端所处网络极差(接收极慢),后端的内存会被撑爆吗?
答案是不会。因为这是一套完美的拉取模型 (Pull Model),其齿轮紧密咬合:
- 发送受阻:移动端网络差导致 TCP 接收窗口打满,Uvicorn 服务器在向端侧 SDK 发送 HTTP Chunk 时遭遇底层阻塞。
- 停止索要:外层 StreamingResponse 发送不出去,便会在原地等待,不会开启下一轮循环去向生成器索要数据。
- 休眠保内存:生成器迟迟等不到外部的唤醒信号,便会一直停留在 yield 的冻结状态。自然也不会去网卡缓冲区拉取大模型的新数据。
核心传动链:
SDK 端侧接收速率 → 决定后端下发速率 → 决定 StreamingResponse 的循环频率 → 决定 yield 的唤醒频率 → 决定底层 aiter_lines() 去 Socket 读取数据的频率。
通过这种“消费者拉动生产者”的机制,支撑 SDK 的底层服务在不额外消耗内存的情况下,完美适配了复杂多变的移动端网络环境。
五、问题整理
疑问 1:response.aiter_lines() 是实时在变化的吗?
绝对是实时的。 :“大模型不可能一开始就把全量结果直接给返回了”。
在传统的代码里,如果写 lines = file.readlines(),那是把整个静态文件一口气全读进内存,这是一个死数据。
但是,这里的 response.aiter_lines() 面对的不是一个硬盘里的死文件,而是一个活着且敞开的 TCP 网络套接字(Socket)缓冲区。
- 云端的大模型是一块极其昂贵的显卡,它在吭哧吭哧地做矩阵乘法。它每算出几个词(Token),就立刻打包成一个网络数据包,顺着网线踢给我们。
- 我们 PC的操作系统网卡接收到这个包,把它塞进内存里的 TCP 接收缓冲区。
aiter_lines()就是一个伸进这个缓冲区的“探头”。- 如果缓冲区里有一行完整的数据,它就立刻抓出来。
- 如果缓冲区空了(大模型卡壳了,还没算出下一个词),
aiter_lines()不会报错,它会触发await的特性,把当前线程挂起(休眠)。 - 等大模型的新词顺着网线到达缓冲区,操作系统“叮”地一声唤醒线程,
aiter_lines()抓起新数据继续跑。
所以,这是一条动态的、流动的、不知何时结束的长河。
疑问 2:yield 会去判断数据是不是被外层消费了吗?
正确。这也是 Python 生成器设计的绝妙之处。
如果大模型算得极其快,一秒钟发过来一万个字,而外层的手机端网络极差,一秒钟只能接收十个字,那服务器内存会不会被撑爆?
答案是:不会,因为这是一套极度优雅的“消费者拉动生产者”的模型。
StreamingResponse(外层消费者)和 yield(内层生产者)的对话过程:
- StreamingResponse 发起索取:
StreamingResponse底层其实就是一个大while循环。它对生成器说:“喂,执行你的代码,给我下一块数据!” - 生成器开始干活: 你的
stream_llm_reply开始运行,去网线上抓数据,抓到后执行到yield chunk。 - 数据交接与冻结:
yield把这个chunk递给StreamingResponse,然后生成器就像被按了暂停键一样,死死停在yield这一行。注意,在StreamingResponse下一次来要数据之前,它绝对不会去网线上抓下一行! - 外层消费:
StreamingResponse拿着这个chunk,通过 Uvicorn 服务器把它打包发送给手机端。 - 网络背压控制: 如果手机端网络差,TCP 窗口满了,Uvicorn 发送这个
chunk就会受阻,就会稍微卡一会儿。 - 再次唤醒: 等手机端成功接收了这个
chunk,Uvicorn 终于发送完毕了。此时StreamingResponse才会开启新一轮循环,再次对生成器说:“好了,刚才那块消费完了,把我唤醒吧,去拿下一块!” - 生成器复活: 代码从
yield后面继续执行,进入下一次for循环,再去aiter_lines()探头看网线里有没有新数据。
手机端接收速度 -> 决定了 StreamingResponse 发送速度 -> 决定了 yield 被唤醒的频率 -> 决定了 aiter_lines 去套接字读取数据的频率。
如果最终手机端直接断网杀后台了,TCP 连接断开,StreamingResponse 会抛出异常并终止。你的生成器再也不会被唤醒,它占用的内存和那个连向大模型的 httpx 连接,就会顺着 async with 的大括号结束,被彻底销毁。干干净净,没有一丝内存泄漏。
疑问3: response.aiter_lines() 什么时候终止
虽然形容它是“不知何时结束的长河”,那是因为在它流动的过程中,无法提前预知它的总长度(没有 Content-Length)。
但是,这条河最终一定会迎来一个明确的“入海口”而结束。
在正常的网络通信和 SSE(Server-Sent Events)协议下,response.aiter_lines() 的结束可以从三个层面来彻底拆解:
1. 业务协议层(大模型的显式告别)
这是最直观的一层。对于兼容 OpenAI 标准的 API(比如咱们用的智谱或 DeepSeek),当大模型把最后半句废话也生成完之后,它会在网络流的最后,单独发送一条极其特殊的数据。
就像之前的代码里写的:
if data_str.strip() == "[DONE]":
break
服务器发来 data: [DONE],就相当于在业务层面明确告诉你:“我说完了”。代码里执行 break,就主动跳出了 async for 循环,这条长河在代码逻辑上就结束了。
2. HTTP 协议层(底层 Chunk 的终结)
假设代码里没有写 if "[DONE]": break,这个循环会自动结束吗?答案是:会的。
在流式输出时,HTTP 响应头里有一个关键配置:Transfer-Encoding: chunked(分块传输)。HTTP 协议规定了这种分块传输的结束方式: 当服务器(大模型端)发完所有真实数据后,它会向网线里发送一个长度为 0 的空数据块(通常是 0\r\n\r\n)。
httpx 底层的 HTTP 解析器一旦读到了这个长度为 0 的块,它就懂了:“哦,完整的 HTTP Body 已经接收完毕了。”
3. 语言机制层(Python 迭代器的死亡)
当 httpx 发现 HTTP 报文彻底结束,或者底层的 TCP Socket 被服务器正常关闭(收到了 FIN 包)时,它底层是如何通知 async for 循环的呢?
这涉及 Python 迭代器的核心机制: 当 aiter_lines() 底层再也读不出哪怕一滴数据(遇到了 EOF,End Of File)时,它会在内部默默抛出一个内置的异常:StopAsyncIteration。
async for 循环就是专门用来捕捉这个异常的。 一旦捕捉到 StopAsyncIteration,async for 循环就会非常平滑地自动退出,不报错,不崩溃,就像什么都没发生过一样,悄悄结束了它的使命。
补充:如果发生“非正常”结束?
上面说的都是大模型正常吐完数据的情况。但因为这个项目是为移动端 SDK 写配套服务,就必须考虑移动端极其恶劣的网络环境。
如果这条长河遭遇了“山洪”或“断流”,循环会怎么结束?
- 手机端强杀 App / 断网:TCP 连接断开。我们的外层
StreamingResponse发送失败报错退出,内层的async for循环会连带着被销毁,根本等不到结束。 - 大模型服务器宕机:如果在吐字吐到一半时,大模型服务器直接挂了。
httpx的aiter_lines()会等不到后续的 TCP 包,一旦超过了设置的timeout=30.0,底层就会抛出httpx.ReadTimeout异常,强行炸毁这个循环。
总结: 正常情况下,response.aiter_lines() 就像一个在传送带尽头拿快递的人。当大模型放上最后一个标记着 [DONE] 的箱子,或者传送带直接停止运转(底层的 HTTP/TCP 发出了结束信号,Python 抛出 StopAsyncIteration),这个人就知道活儿干完了,下班回家。
六、配套代码
👉 点击查看完整 `v0.3.0` 源码
import json
import httpx
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse # 核心:导入 FastAPI 的流式响应组件
from pydantic import BaseModel, Field
app = FastAPI(
title="PocketAgent Core API",
description="支持流式输出的 AI Agent",
version="0.3.0"
)
class ChatRequest(BaseModel):
query: str = Field(..., description="用户提问")
# --- 智谱大模型配置区 ---
API_KEY = "API_KEY"
BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
MODEL_NAME = "glm-4-flash"
# --- 核心改造:这是一个 Python 异步生成器 (Generator) ---
async def stream_llm_reply(query: str):
payload = {
"model": MODEL_NAME,
"messages": [
{"role": "system", "content": "你是一个名为 PocketAgent 的智能体。你的回答需要简明扼要,像个极客。"},
{"role": "user", "content": query}
],
"temperature": 0.7,
"stream": True # 核心开关 1:告诉大模型,开启流式传输!
}
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
async with httpx.AsyncClient() as client:
# 核心开关 2:使用 client.stream 而不是 client.post
# 这意味着不会等待整个请求结束,而是建立连接后就开始接管底层的字节流
async with client.stream("POST", BASE_URL, json=payload, headers=headers, timeout=30.0) as response:
response.raise_for_status()
# 核心难点:解析 SSE 协议报文
# SSE 协议规定,服务器推过来的每一块数据,都以 "data: " 开头
async for line in response.aiter_lines():
if line.startswith("data: "):
# 把 "data: " 这 6 个字符切掉,剩下的就是纯 JSON 字符串
data_str = line[6:]
# 协议约定:大模型吐完所有的字后,最后会发一个 "[DONE]" 标记
if data_str.strip() == "[DONE]":
break
try:
data_json = json.loads(data_str)
# 注意这里的结构变了,流式返回时,字段名是 'delta' 而不是 'message'
chunk = data_json["choices"][0]["delta"].get("content", "")
if chunk:
# 核心开关 3:yield (产出)
# 它不像 return 会终止函数。它会把这几个字先“发射”出去,
# 然后函数暂停在这里,等外面的请求拿走了字,它再继续循环去接大模型的下一块数据。
yield chunk
except Exception as e:
print(f"解析块出现异常,忽略: {e}")
pass
# --- API 路由出口改造 ---
# 注意:流式接口不需要 response_model 了,因为返回的是持续的数据流,不是固定的 JSON
@app.post("/api/v1/chat/stream")
async def chat_stream(request: ChatRequest):
try:
# 将生成器丢给 StreamingResponse,并指定媒体类型为文本流
return StreamingResponse(
stream_llm_reply(request.query),
media_type="text/event-stream"
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))