Skip to content
Maozy's Blog
Go back

[PocketAgent 实战] Day1 Python SSE 与网络背压机制

PocketAgent 是我的从零开始的 AI Agent 项目,核心目标是打造一个开箱即用的移动端 AI Agent SDK。

核心痛点与架构选型

为了让客户端开发人员集成 SDK 后,能为用户提供极其丝滑的“打字机”体验,必须在后端为其铺设一条稳定的通信“水管”。

大模型 API(如 DeepSeek、智谱等)的响应通常伴随高延迟。如果配套的后端服务使用传统的 HTTP 同步阻塞请求,不仅服务端并发极差,客户端调用 SDK 后的体验(死等好几秒才出全量结果)也是无法接受的。

解决方案:基于 FastAPI 的全异步非阻塞架构 + Server-Sent Events (SSE) 流式传输。

对比 Java后端&Android 技术栈:


一、async with:优雅的网络 I/O 与资源管理

在调用大模型时,建立的是底层的 TCP Socket 连接。Python 提供了 async with 语法糖来处理异步网络请求。

async with httpx.AsyncClient() as client:
    async with client.stream("POST", url, json=payload, headers=headers) as response:
        # 处理持续的网络流

机制拆解:

二、yield

要实现端侧 SSE 的“流水席”效果,普通的 return 无法满足需求。必须引入 Python的生成器 (Generator) 与 yield。

async for line in response.aiter_lines():
    if line.startswith("data: "):
        chunk = parse_json(line) # 解析出最新的一个词
        if chunk:
            yield chunk  # 将词发射出去,并在此处冻结状态

return vs yield:

带有 yield 的函数不再是普通函数,它变成了一个源源不断为端侧 SDK 产出数据的流水线节点

三、StreamingResponse 与 HTTP Chunk

后端的生成器开始“吐字”后,需要通过 FastAPI 的 StreamingResponse 桥接,推给集成 SDK 的手机客户端。

return StreamingResponse(
    stream_llm_reply(request.query), 
    media_type="text/event-stream"
)

协议层面的改变:

四、网络背压 (Backpressure)

如果大模型吐字极快,而移动端所处网络极差(接收极慢),后端的内存会被撑爆吗?

答案是不会。因为这是一套完美的拉取模型 (Pull Model),其齿轮紧密咬合:

  1. 发送受阻:移动端网络差导致 TCP 接收窗口打满,Uvicorn 服务器在向端侧 SDK 发送 HTTP Chunk 时遭遇底层阻塞。
  2. 停止索要:外层 StreamingResponse 发送不出去,便会在原地等待,不会开启下一轮循环去向生成器索要数据。
  3. 休眠保内存:生成器迟迟等不到外部的唤醒信号,便会一直停留在 yield 的冻结状态。自然也不会去网卡缓冲区拉取大模型的新数据。

核心传动链:

SDK 端侧接收速率 → 决定后端下发速率 → 决定 StreamingResponse 的循环频率 → 决定 yield 的唤醒频率 → 决定底层 aiter_lines() 去 Socket 读取数据的频率。

通过这种“消费者拉动生产者”的机制,支撑 SDK 的底层服务在不额外消耗内存的情况下,完美适配了复杂多变的移动端网络环境。

五、问题整理

疑问 1:response.aiter_lines() 是实时在变化的吗?

绝对是实时的。 :“大模型不可能一开始就把全量结果直接给返回了”。

在传统的代码里,如果写 lines = file.readlines(),那是把整个静态文件一口气全读进内存,这是一个死数据。

但是,这里的 response.aiter_lines() 面对的不是一个硬盘里的死文件,而是一个活着且敞开的 TCP 网络套接字(Socket)缓冲区

所以,这是一条动态的、流动的、不知何时结束的长河

疑问 2:yield 会去判断数据是不是被外层消费了吗?

正确。这也是 Python 生成器设计的绝妙之处。

如果大模型算得极其快,一秒钟发过来一万个字,而外层的手机端网络极差,一秒钟只能接收十个字,那服务器内存会不会被撑爆?

答案是:不会,因为这是一套极度优雅的“消费者拉动生产者”的模型。

StreamingResponse(外层消费者)和 yield(内层生产者)的对话过程:

  1. StreamingResponse 发起索取: StreamingResponse 底层其实就是一个大 while 循环。它对生成器说:“喂,执行你的代码,给我下一块数据!”
  2. 生成器开始干活: 你的 stream_llm_reply 开始运行,去网线上抓数据,抓到后执行到 yield chunk
  3. 数据交接与冻结: yield 把这个 chunk 递给 StreamingResponse,然后生成器就像被按了暂停键一样,死死停在 yield 这一行。注意,在 StreamingResponse 下一次来要数据之前,它绝对不会去网线上抓下一行!
  4. 外层消费: StreamingResponse 拿着这个 chunk,通过 Uvicorn 服务器把它打包发送给手机端。
  5. 网络背压控制: 如果手机端网络差,TCP 窗口满了,Uvicorn 发送这个 chunk 就会受阻,就会稍微卡一会儿。
  6. 再次唤醒: 等手机端成功接收了这个 chunk,Uvicorn 终于发送完毕了。此时 StreamingResponse 才会开启新一轮循环,再次对生成器说:“好了,刚才那块消费完了,把我唤醒吧,去拿下一块!”
  7. 生成器复活: 代码从 yield 后面继续执行,进入下一次 for 循环,再去 aiter_lines() 探头看网线里有没有新数据。

手机端接收速度 -> 决定了 StreamingResponse 发送速度 -> 决定了 yield 被唤醒的频率 -> 决定了 aiter_lines 去套接字读取数据的频率。

如果最终手机端直接断网杀后台了,TCP 连接断开,StreamingResponse 会抛出异常并终止。你的生成器再也不会被唤醒,它占用的内存和那个连向大模型的 httpx 连接,就会顺着 async with 的大括号结束,被彻底销毁。干干净净,没有一丝内存泄漏。

疑问3: response.aiter_lines() 什么时候终止

虽然形容它是“不知何时结束的长河”,那是因为在它流动的过程中,无法提前预知它的总长度(没有 Content-Length)。

但是,这条河最终一定会迎来一个明确的“入海口”而结束。

在正常的网络通信和 SSE(Server-Sent Events)协议下,response.aiter_lines() 的结束可以从三个层面来彻底拆解:

1. 业务协议层(大模型的显式告别)

这是最直观的一层。对于兼容 OpenAI 标准的 API(比如咱们用的智谱或 DeepSeek),当大模型把最后半句废话也生成完之后,它会在网络流的最后,单独发送一条极其特殊的数据。

就像之前的代码里写的:

if data_str.strip() == "[DONE]":
    break

服务器发来 data: [DONE],就相当于在业务层面明确告诉你:“我说完了”。代码里执行 break,就主动跳出了 async for 循环,这条长河在代码逻辑上就结束了。

2. HTTP 协议层(底层 Chunk 的终结)

假设代码里没有写 if "[DONE]": break,这个循环会自动结束吗?答案是:会的。

在流式输出时,HTTP 响应头里有一个关键配置:Transfer-Encoding: chunked(分块传输)。HTTP 协议规定了这种分块传输的结束方式: 当服务器(大模型端)发完所有真实数据后,它会向网线里发送一个长度为 0 的空数据块(通常是 0\r\n\r\n

httpx 底层的 HTTP 解析器一旦读到了这个长度为 0 的块,它就懂了:“哦,完整的 HTTP Body 已经接收完毕了。”

3. 语言机制层(Python 迭代器的死亡)

httpx 发现 HTTP 报文彻底结束,或者底层的 TCP Socket 被服务器正常关闭(收到了 FIN 包)时,它底层是如何通知 async for 循环的呢?

这涉及 Python 迭代器的核心机制: 当 aiter_lines() 底层再也读不出哪怕一滴数据(遇到了 EOF,End Of File)时,它会在内部默默抛出一个内置的异常:StopAsyncIteration

async for 循环就是专门用来捕捉这个异常的。 一旦捕捉到 StopAsyncIterationasync for 循环就会非常平滑地自动退出,不报错,不崩溃,就像什么都没发生过一样,悄悄结束了它的使命。

补充:如果发生“非正常”结束?

上面说的都是大模型正常吐完数据的情况。但因为这个项目是为移动端 SDK 写配套服务,就必须考虑移动端极其恶劣的网络环境。

如果这条长河遭遇了“山洪”或“断流”,循环会怎么结束?

  1. 手机端强杀 App / 断网:TCP 连接断开。我们的外层 StreamingResponse 发送失败报错退出,内层的 async for 循环会连带着被销毁,根本等不到结束。
  2. 大模型服务器宕机:如果在吐字吐到一半时,大模型服务器直接挂了。httpxaiter_lines() 会等不到后续的 TCP 包,一旦超过了设置的 timeout=30.0,底层就会抛出 httpx.ReadTimeout 异常,强行炸毁这个循环。

总结: 正常情况下,response.aiter_lines() 就像一个在传送带尽头拿快递的人。当大模型放上最后一个标记着 [DONE] 的箱子,或者传送带直接停止运转(底层的 HTTP/TCP 发出了结束信号,Python 抛出 StopAsyncIteration),这个人就知道活儿干完了,下班回家。

六、配套代码

👉 点击查看完整 `v0.3.0` 源码
import json
import httpx
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse # 核心:导入 FastAPI 的流式响应组件
from pydantic import BaseModel, Field

app = FastAPI(
    title="PocketAgent Core API",
    description="支持流式输出的 AI Agent",
    version="0.3.0"
)

class ChatRequest(BaseModel):
    query: str = Field(..., description="用户提问")

# --- 智谱大模型配置区 ---
API_KEY = "API_KEY" 
BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
MODEL_NAME = "glm-4-flash"

# --- 核心改造:这是一个 Python 异步生成器 (Generator) ---
async def stream_llm_reply(query: str):
    payload = {
        "model": MODEL_NAME,
        "messages": [
            {"role": "system", "content": "你是一个名为 PocketAgent 的智能体。你的回答需要简明扼要,像个极客。"},
            {"role": "user", "content": query}
        ],
        "temperature": 0.7,
        "stream": True  # 核心开关 1:告诉大模型,开启流式传输!
    }
    
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    
    async with httpx.AsyncClient() as client:
        # 核心开关 2:使用 client.stream 而不是 client.post
        # 这意味着不会等待整个请求结束,而是建立连接后就开始接管底层的字节流
        async with client.stream("POST", BASE_URL, json=payload, headers=headers, timeout=30.0) as response:
            response.raise_for_status()
            
            # 核心难点:解析 SSE 协议报文
            # SSE 协议规定,服务器推过来的每一块数据,都以 "data: " 开头
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    # 把 "data: " 这 6 个字符切掉,剩下的就是纯 JSON 字符串
                    data_str = line[6:] 
                    
                    # 协议约定:大模型吐完所有的字后,最后会发一个 "[DONE]" 标记
                    if data_str.strip() == "[DONE]":
                        break
                    
                    try:
                        data_json = json.loads(data_str)
                        # 注意这里的结构变了,流式返回时,字段名是 'delta' 而不是 'message'
                        chunk = data_json["choices"][0]["delta"].get("content", "")
                        if chunk:
                            # 核心开关 3:yield (产出)
                            # 它不像 return 会终止函数。它会把这几个字先“发射”出去,
                            # 然后函数暂停在这里,等外面的请求拿走了字,它再继续循环去接大模型的下一块数据。
                            yield chunk
                    except Exception as e:
                        print(f"解析块出现异常,忽略: {e}")
                        pass

# --- API 路由出口改造 ---
# 注意:流式接口不需要 response_model 了,因为返回的是持续的数据流,不是固定的 JSON
@app.post("/api/v1/chat/stream")
async def chat_stream(request: ChatRequest):
    try:
        # 将生成器丢给 StreamingResponse,并指定媒体类型为文本流
        return StreamingResponse(
            stream_llm_reply(request.query), 
            media_type="text/event-stream"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Share this post on:

Previous Post
[PocketAgent 实战] Day2 多轮记忆与滑动窗口
Next Post
[Jetpack Compose] 副作用(Side Effects)和 LaunchedEffect