Appearance
PocketFlow 应用案例(Application Cases)
学习指南:本章精选了 PocketFlow 的 12 个应用案例,从入门到进阶,覆盖聊天、RAG、智能体、批处理、并行等常见模式。每个案例都包含 Flow 架构图、核心代码和学习要点。
🎯PocketFlow 应用案例全景
最基础的 ChatBot —— 维护对话历史,调用 LLM 生成回复,支持多轮对话。
多步骤写作流程:先列大纲,再分章节撰写,最后统一润色风格。
经典 RAG 流程:离线构建向量索引,在线检索相关文档片段并增强 LLM 生成。
能够调用搜索工具的研究智能体 —— 理解问题、搜索网络、整合答案。
Taboo 猜词游戏 —— 两个 AsyncNode 智能体通过消息队列异步通信,asyncio.gather 并发运行。
批量评估简历 —— 并行处理每份简历,最后汇总排名。
使用 AsyncParallelBatchNode 并行处理多张图片,实现 8x 加速。
让 LLM 输出严格 JSON 格式 —— 生成、解析校验、格式不对则自动重新生成。
实现 Chain-of-Thought 推理 —— 分步思考,逐步求解复杂问题。
通过 Model Context Protocol 集成外部工具,构建具备丰富工具使用能力的智能体。
将领域知识模块化为 Markdown 技能文件,智能体根据用户请求动态选择技能并注入 prompt。
人类设计 + AI 实现的高效协作范式 —— 8 步流程从需求到可靠系统的完整工程实践。
0. 案例地图:选择你的学习路径
不同背景的读者可以选择不同的入门路径:
零基础入门
想做智能体
关注性能
输出质量
1. 聊天机器人(ChatBot)
难度:入门 | 模式:链式 + 循环 | 关键词:对话历史、多轮对话
1.1 架构
GetInput → CallLLM → SendReply
↑ |
└─── "continue" ──────┘1.2 核心思路
聊天机器人是最基础的 LLM 应用,它的 Flow 只有三个节点:
- GetInput:从用户获取输入,将其追加到
shared["history"] - CallLLM:拼接对话历史为 prompt,调用 LLM API
- SendReply:输出回复,
post()返回"continue"跳回 GetInput
1.3 关键代码
下面是完整的聊天机器人实现。注意 SendReply 的 post() 返回 "continue" 跳回 GetInput,形成对话循环:
python
from pocketflow import Node, Flow
class GetInput(Node):
def prep(self, shared):
return shared.get("history", [])
def exec(self, history):
user_input = input("You: ")
return user_input
def post(self, shared, prep_res, exec_res):
if exec_res.lower() == "quit":
return "end"
shared.setdefault("history", []).append(
{"role": "user", "content": exec_res}
)
return "default"
class CallLLM(Node):
def prep(self, shared):
return shared["history"]
def exec(self, history):
# 调用你的 LLM API
response = call_llm(history)
return response
def post(self, shared, prep_res, exec_res):
shared["history"].append(
{"role": "assistant", "content": exec_res}
)
shared["last_reply"] = exec_res
class SendReply(Node):
def prep(self, shared):
return shared["last_reply"]
def exec(self, reply):
print(f"AI: {reply}")
return reply
def post(self, shared, prep_res, exec_res):
return "continue"
# 构建 Flow
get_input = GetInput()
call_llm = CallLLM()
send_reply = SendReply()
get_input >> call_llm >> send_reply
send_reply - "continue" >> get_input
flow = Flow(start=get_input)
flow.run({})学习要点
- 循环模式:
send_reply - "continue" >> get_input实现多轮对话 - shared 通信:对话历史存在
shared["history"]中 - 退出条件:
get_input的post()返回"end"时无后继节点,Flow 结束
2. 写作工作流(Writing Workflow)
难度:入门 | 模式:链式 | 关键词:多步骤生成、内容流水线
2.1 架构
写作工作流的架构非常简洁 —— 三个节点依次处理,从大纲到草稿再到润色:
Outline → WriteDraft → Polish2.2 核心代码
每个节点对应写作流程的一个阶段,通过 shared 字典传递中间结果(大纲 → 草稿 → 成稿):
python
from pocketflow import Node, Flow
class OutlineNode(Node):
"""第一步:生成文章大纲"""
def prep(self, shared):
return shared["topic"] # 从 shared 读取主题
def exec(self, topic):
prompt = f"为主题'{topic}'列出文章大纲(3-5 个章节)"
return call_llm(prompt) # 调用 LLM 生成大纲
def post(self, shared, prep_res, exec_res):
shared["outline"] = exec_res # 写入 shared,供下一个节点读取
class WriteDraftNode(Node):
"""第二步:根据大纲撰写草稿"""
def prep(self, shared):
return shared["outline"] # 读取上一步的大纲
def exec(self, outline):
prompt = f"根据以下大纲撰写完整文章:\n{outline}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
shared["draft"] = exec_res # 写入草稿
class PolishNode(Node):
"""第三步:润色成稿"""
def prep(self, shared):
return shared["draft"] # 读取草稿
def exec(self, draft):
prompt = f"润色以下文章,使语言更流畅:\n{draft}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
shared["final_article"] = exec_res # 最终成品
# 实例化节点并用 >> 连接成链
outline = OutlineNode()
write_draft = WriteDraftNode()
polish = PolishNode()
outline >> write_draft >> polish # 链式连接
flow = Flow(start=outline)
flow.run({"topic": "PocketFlow 入门指南"})学习要点
- 最简链式模式:三个节点顺序执行,每个节点做一件事
- 任务分解原则:复杂任务拆分为小步骤,每步用一个 Node 处理
- 不要过度拆分:太细的拆分会导致节点间上下文不连贯
3. RAG 检索增强生成
难度:入门 | 模式:链式 + BatchNode | 关键词:向量检索、知识库
3.1 架构
离线索引:Chunk → Embed → Index
在线查询:Retrieve → Generate3.2 核心思路
RAG 分两个阶段:
离线阶段(构建索引):
- Chunk:将文档切分成小片段
- Embed:使用 BatchNode 批量计算向量
- Index:存入向量数据库
在线阶段(回答问题):
- Retrieve:根据问题检索相关片段
- Generate:将检索到的 context 和 question 拼接,调用 LLM
3.3 关键代码
离线阶段将文档切分、向量化、建索引;在线阶段根据问题检索相关片段并生成答案。注意 EmbedBatch 使用 BatchNode 批量处理每个 chunk:
python
from pocketflow import Node, BatchNode, Flow
# ===== 离线阶段 =====
class ChunkNode(Node):
def prep(self, shared):
return shared["documents"]
def exec(self, docs):
chunks = []
for doc in docs:
chunks.extend(split_text(doc, chunk_size=500))
return chunks
def post(self, shared, prep_res, exec_res):
shared["chunks"] = exec_res
class EmbedBatch(BatchNode):
"""使用 BatchNode 批量处理每个 chunk"""
def prep(self, shared):
return shared["chunks"] # 返回列表
def exec(self, chunk):
# 每个 chunk 独立计算 embedding
return compute_embedding(chunk)
def post(self, shared, prep_res, exec_res):
shared["embeddings"] = exec_res # 所有结果的列表
class IndexNode(Node):
"""将 chunk 与 embedding 配对存入索引"""
def prep(self, shared):
return {
"chunks": shared["chunks"],
"embeddings": shared["embeddings"],
}
def exec(self, data):
# 构建索引(实际场景使用向量数据库)
index = list(zip(data["chunks"], data["embeddings"]))
return index
def post(self, shared, prep_res, exec_res):
shared["index"] = exec_res
# ===== 在线阶段 =====
class RetrieveNode(Node):
def prep(self, shared):
return shared["question"]
def exec(self, question):
q_embedding = compute_embedding(question)
top_k = vector_search(q_embedding, k=3)
return top_k
def post(self, shared, prep_res, exec_res):
shared["context"] = "\n".join(exec_res)
class GenerateNode(Node):
def prep(self, shared):
return {
"context": shared["context"],
"question": shared["question"]
}
def exec(self, data):
prompt = f"""基于以下信息回答问题:
{data['context']}
问题:{data['question']}"""
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
shared["answer"] = exec_res
print(f"Answer: {exec_res}")
# 构建离线 Flow
chunk = ChunkNode()
embed = EmbedBatch()
index = IndexNode()
chunk >> embed >> index
offline_flow = Flow(start=chunk)
# 构建在线 Flow
retrieve = RetrieveNode()
generate = GenerateNode()
retrieve >> generate
online_flow = Flow(start=retrieve)学习要点
- BatchNode:
EmbedBatch的prep()返回列表,exec()对每个 chunk 独立执行 - 两条 Flow:离线索引和在线查询是独立的 Flow,共享同一个向量数据库
- 三节点离线流水线:Chunk → Embed → Index 完整覆盖索引构建过程
4. 搜索智能体
难度:中级 | 模式:循环 + 条件分支 | 关键词:工具调用、自主决策
4.1 架构
搜索智能体的核心是 Think → Act → Observe 循环 —— LLM 判断信息是否充足,不够则继续搜索:
Think → Search
↑ |
| "need_more"
└────────┘
|
"enough"
↓
Synthesize4.2 核心代码
三个节点分工明确:ThinkNode 决策、SearchNode 执行搜索、SynthesizeNode 整合答案。注意 post() 中通过返回不同的 action 字符串来控制 Flow 走向:
python
class ThinkNode(Node):
def prep(self, shared):
return {
"question": shared["question"],
"search_results": shared.get("search_results", [])
}
def exec(self, data):
prompt = f"""问题:{data['question']}
已有信息:{data['search_results']}
请决定:还需要搜索什么?输出搜索关键词,或输出 ENOUGH 表示信息充分。"""
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
if "ENOUGH" in exec_res:
return "enough"
shared["search_query"] = exec_res
return "need_more"
class SearchNode(Node):
def prep(self, shared):
return shared["search_query"]
def exec(self, query):
return web_search(query) # 调用搜索 API
def post(self, shared, prep_res, exec_res):
shared.setdefault("search_results", []).extend(exec_res)
class SynthesizeNode(Node):
def prep(self, shared):
return {
"question": shared["question"],
"results": shared["search_results"]
}
def exec(self, data):
prompt = f"基于以下搜索结果回答问题...\n{data}"
return call_llm(prompt)
# 构建 Flow
think = ThinkNode()
search = SearchNode()
synthesize = SynthesizeNode()
think - "need_more" >> search # 信息不足则继续搜索
think - "enough" >> synthesize # 信息充分则生成
search >> think # 搜索后回到思考
flow = Flow(start=think)
flow.run({"question": "PocketFlow 和 LangChain 有什么区别?"})4.3 智能体 设计最佳实践
构建高性能 智能体 时,以下原则至关重要:
上下文管理:向 LLM 提供相关且精简的上下文,而非全量信息。LLM 在处理过长文本时容易“迷失在中间”(lost in the middle),应该用 RAG 检索相关片段而非直接塞入完整历史。
动作空间设计:每个 action 应结构清晰、含义明确,避免语义重叠。例如,不要同时定义 search_database 和 query_csv —— 应该合并为统一的 search_data(source, query) 接口。
渐进式信息展示:不要一次性展示全部信息。先给 LLM 概览(目录、摘要),让它选择要深入的部分。每次最多输入约 500 行内容。
参数化动作:让 action 通过参数灵活化。例如 search_data(query="...", source="db") 比固定的 search_database 和 search_csv 更灵活通用。
错误恢复:支持回退操作,让 智能体 能撤销失败的步骤。部分回退比完全重启更高效。
学习要点
- 智能体 核心模式:Think → Act → Observe 的循环
- 自主决策:LLM 在
exec()中判断是否需要更多信息 - 工具调用:
SearchNode.exec()调用外部搜索 API
5. 多智能体协作
难度:中级 | 模式:AsyncNode + 消息队列 + 并发 | 关键词:异步通信、协作博弈
5.1 场景
Taboo 猜词游戏:一个 智能体 描述词语(不能说出关键词),另一个 智能体 猜词。两个 智能体 通过异步消息队列通信,使用 asyncio.gather() 并发运行。
5.2 架构
两个智能体各自运行独立的 AsyncFlow,通过异步消息队列交换信息,由 asyncio.gather() 并发驱动:
Hinter智能体 ←──── asyncio.Queue ────→ Guesser智能体
↻ "continue" ↻ "continue"
└───── asyncio.gather() ──────────┘5.3 核心代码
两个智能体各自继承 AsyncNode,使用 async/await 三阶段。注意 prep_async 中通过 await queue.get() 等待对方消息,post_async 中通过 await queue.put() 发送消息:
python
import asyncio
from pocketflow import AsyncNode, AsyncFlow
class Hinter智能体(AsyncNode):
"""提示者:描述目标词,不能使用禁忌词"""
async def prep_async(self, shared):
msg = await shared["hinter_queue"].get() # 等待消息
return {
"msg": msg,
"word": shared["word"],
"taboo_words": shared["taboo_words"],
}
async def exec_async(self, data):
if data["msg"] == "start":
prompt = f"请描述'{data['word']}',不能使用:{data['taboo_words']}"
else:
prompt = f"对方猜的是'{data['msg']}',不对。换个方式描述'{data['word']}',不能使用:{data['taboo_words']}"
return await async_call_llm(prompt)
async def post_async(self, shared, prep_res, exec_res):
await shared["guesser_queue"].put(exec_res) # 发送提示给猜测者
if shared.get("game_over"):
return "end"
return "continue"
class Guesser智能体(AsyncNode):
"""猜测者:根据提示猜词"""
async def prep_async(self, shared):
hint = await shared["guesser_queue"].get() # 等待提示
return hint
async def exec_async(self, hint):
prompt = f"根据以下描述猜一个词:{hint}"
return await async_call_llm(prompt)
async def post_async(self, shared, prep_res, exec_res):
if exec_res.strip() == shared["word"]:
shared["game_over"] = True
print(f"猜对了!答案是「{shared['word']}」")
return "end"
shared["round"] = shared.get("round", 0) + 1
if shared["round"] >= 5:
shared["game_over"] = True
print(f"超过 5 轮,游戏结束。答案是「{shared['word']}」")
return "end"
await shared["hinter_queue"].put(exec_res) # 告诉提示者猜错了
return "continue"
# 每个 智能体 自循环
hinter = Hinter智能体()
hinter - "continue" >> hinter
hinter_flow = AsyncFlow(start=hinter)
guesser = Guesser智能体()
guesser - "continue" >> guesser
guesser_flow = AsyncFlow(start=guesser)
# 两个 智能体 并发运行
async def main():
shared = {
"word": "大熊猫",
"taboo_words": ["熊猫", "国宝", "黑白"],
"hinter_queue": asyncio.Queue(),
"guesser_queue": asyncio.Queue(),
}
shared["hinter_queue"].put_nowait("start") # 启动信号
await asyncio.gather(
hinter_flow.run_async(shared),
guesser_flow.run_async(shared),
)
asyncio.run(main())学习要点
- AsyncNode:使用
prep_async/exec_async/post_async异步三阶段 - 消息队列:
asyncio.Queue实现 智能体 间的异步通信 - 并发执行:
asyncio.gather()让两个 智能体 同时运行,通过队列协调 - 自循环:
agent - "continue" >> agent实现 智能体 的持续运行循环 - AsyncFlow:AsyncNode 必须包裹在 AsyncFlow 中,不能用普通 Flow
6. Map-Reduce 批处理
难度:入门 | 模式:BatchNode | 关键词:批量评估、数据聚合
6.1 架构
Map-Reduce 是最常见的批处理模式 —— 把一批数据"拆开"分别处理,再"合并"结果:
┌─ exec(item1) ─┐
├─ exec(item2) ─┤
prep ─────├─ exec(item3) ─┤───── post
(列表) ├─ ... ─┤ (结果列表)
└─ exec(itemN) ─┘6.2 核心思路
BatchNode 的核心约定:prep() 返回一个列表,框架自动对列表中的每个元素调用 exec(),最后把所有结果收集为列表传给 post()。你只需要写"处理单个元素"的逻辑,批量调度由框架完成。
6.3 关键代码
下面以"批量评估简历"为例,展示 BatchNode 的三阶段如何配合:
python
from pocketflow import BatchNode, Flow
class EvalResume(BatchNode):
"""批量评估简历 —— 继承 BatchNode 即可获得批处理能力"""
def prep(self, shared):
# prep 返回列表,框架会逐个取出交给 exec
return shared["resumes"]
def exec(self, resume):
# 这里只处理"单份简历",框架负责循环
prompt = f"请为以下简历评分(1-10):\n{resume}"
score = call_llm(prompt)
return {"resume": resume, "score": int(score)}
def post(self, shared, prep_res, exec_res):
# exec_res 是所有评分结果的列表,与 prep 返回的顺序一致
shared["scores"] = sorted(
exec_res, key=lambda x: x["score"], reverse=True
)
print(f"Top 3: {shared['scores'][:3]}")
# 构建并运行 Flow
eval_node = EvalResume()
flow = Flow(start=eval_node)
flow.run({"resumes": ["简历A...", "简历B...", "简历C..."]})学习要点
- 三阶段约定:
prep()返回列表 →exec()对每个元素独立执行 →post()收到结果列表 - 自动重试:每个元素的
exec()都有独立的重试机制,单个失败不影响其他元素 - 代码极简:你只需要写处理单个元素的逻辑,代码量与处理单条数据几乎相同
从同步到异步:真实业务的必然选择
上面的 BatchNode 使用同步 exec(),适合教学演示。但在真实业务中,LLM API 调用、Web 搜索、数据库查询都是 I/O 密集型网络请求 —— 每次要等几百毫秒到几秒。同步逐个处理 8 个请求,总耗时 = 8 × 单次;异步并行则可以同时发出,总耗时 ≈ 单次。
PocketFlow 提供三种批处理模式:
| 模式 | 类 | 执行方式 | 适用场景 |
|---|---|---|---|
| 同步逐个 | BatchNode | for item: exec(item) | 教学演示、CPU 密集、需严格顺序 |
| 异步逐个 | AsyncBatchNode | for item: await exec(item) | API 有速率限制、需要顺序但不阻塞事件循环 |
| 异步并行 | AsyncParallelBatchNode | asyncio.gather(*all) | 最常用 —— I/O 密集场景,N 倍加速 |
本教程的案例使用同步模拟函数降低入门门槛。接入真实 API 时,只需将 Node 换成 AsyncNode、exec() 换成 async exec_async(),逻辑完全不变。
7. 并行处理(8x 加速)
难度:中级 | 模式:AsyncParallelBatchNode | 关键词:并发、I/O 密集
7.1 场景
你需要对 8 篇文章分别调用 LLM 生成摘要。同步 BatchNode 每篇等 2 秒,总计 16 秒;用 AsyncParallelBatchNode 并行执行,8 篇同时请求,总计约 2 秒 —— 8 倍加速。
7.2 架构
AsyncParallelBatchNode 的执行模型:prep_async 返回列表后,所有 exec_async 通过 asyncio.gather() 同时发出,最后 post_async 收到有序的结果列表:
┌─ exec_async(item1) ─┐
├─ exec_async(item2) ─┤
prep_async ├─ exec_async(item3) ─┤ post_async
(列表) ├─ ... ─┤ (结果列表)
└─ exec_async(item8) ─┘
asyncio.gather()7.3 核心代码
只需继承 AsyncParallelBatchNode,将 exec 改为 async exec_async,框架自动用 asyncio.gather() 并行执行所有元素:
python
import asyncio
from pocketflow import AsyncParallelBatchNode, AsyncFlow
class ParallelProcess(AsyncParallelBatchNode):
async def prep_async(self, shared):
return shared["items"] # 返回待处理列表
async def exec_async(self, item):
# 每个 item 并发执行,不互相等待
result = await async_api_call(item)
return result
async def post_async(self, shared, prep_res, exec_res):
shared["results"] = exec_res # 结果列表,顺序与输入一致7.4 对比:同步 vs 异步并行
下面的对比展示了从 BatchNode 到 AsyncParallelBatchNode 的迁移有多简单 —— 只需改继承类和加 async/await:
python
# 同步方式(案例 6)—— 逐个执行,总耗时 = N × 单次
class SyncProcess(BatchNode):
def exec(self, item):
return call_api(item) # 阻塞等待
# 异步并行(本案例)—— 全部同时发出,总耗时 ≈ 单次
class ParallelProcess(AsyncParallelBatchNode):
async def exec_async(self, item):
return await async_api_call(item) # 非阻塞学习要点
- 为什么用异步:真实 LLM API 都是网络 I/O,天然适合 async/await,同步会白白浪费等待时间
- 接口对称:BatchNode → AsyncParallelBatchNode,
exec()→async exec_async(),其余逻辑不变 - 结果有序:
asyncio.gather()返回的结果列表与输入顺序一致 - AsyncFlow:异步节点必须包裹在
AsyncFlow中运行,不能用普通Flow - 何时不用并行:API 有速率限制时,改用
AsyncBatchNode(顺序异步),避免被限流
8. 结构化输出(Structured Output)
难度:中级 | 模式:循环 + 重试 + 校验 | 关键词:JSON 解析、格式验证、可靠输出
8.1 架构
通过"生成 → 校验 → 检查"三步循环,确保 LLM 输出符合预期的 JSON 格式:
text
┌────────── "retry" ─────────┐
│ │
↓ │
Generate ──→ Validate ──→ Check
│
│ "done"
↓
Output8.2 核心思路
LLM 的输出是自由文本,但下游系统往往需要结构化数据(JSON、表格、特定格式)。核心挑战是:LLM 可能输出格式不对的内容。解决方案:生成 → 解析校验 → 不对就重来。
8.3 关键代码
实现分为四个节点:GenerateJSON 生成、ValidateJSON 解析校验(带 max_retries)、CheckResult 决策重试或通过、Output 输出。注意双层重试机制 —— 节点内重试解析,Flow 层重试生成:
python
import json
import re
from pocketflow import Node, Flow
class GenerateJSON(Node):
def prep(self, shared):
return shared["task"]
def exec(self, task):
prompt = f"""请为以下任务生成严格的 JSON 格式结果:
{task}
输出格式:{{"name": "...", "score": 0-100, "reason": "..."}}
只输出 JSON,不要其他文字。"""
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
shared["raw_output"] = exec_res
class ValidateJSON(Node):
"""解析并校验 JSON 格式,利用 max_retries 自动重试解析"""
def prep(self, shared):
return shared["raw_output"]
def exec(self, raw):
# 提取 JSON 部分
match = re.search(r'\{.*\}', raw, re.DOTALL)
if not match:
raise ValueError("输出中未找到 JSON")
data = json.loads(match.group())
# 校验必需字段和类型
assert "name" in data, "缺少 name 字段"
assert "score" in data, "缺少 score 字段"
assert isinstance(data["score"], (int, float)), "score 必须是数字"
assert 0 <= data["score"] <= 100, "score 必须在 0-100 之间"
return data
def exec_fallback(self, prep_res, exc):
# 解析失败,返回 None 触发重新生成
print(f"解析失败:{exc}")
return None
def post(self, shared, prep_res, exec_res):
shared["result"] = exec_res # 写入解析结果(成功为 dict,失败为 None)
class CheckResult(Node):
def prep(self, shared):
return shared.get("result")
def post(self, shared, prep_res, exec_res):
if shared.get("result") is None:
shared["retry_count"] = shared.get("retry_count", 0) + 1
if shared["retry_count"] >= 3:
return "give_up"
return "retry" # 解析失败,让 LLM 重新生成
return "done"
# 构建 Flow
generate = GenerateJSON()
validate = ValidateJSON(max_retries=2) # 解析本身可重试 2 次
check = CheckResult()
output = Node() # 占位输出节点
generate >> validate >> check
check - "retry" >> generate # 格式不对,重新生成
check - "done" >> output # 格式正确,输出结果
check - "give_up" >> output # 多次失败,放弃
flow = Flow(start=generate)
flow.run({"task": "评估候选人张三的 Python 编程能力"})学习要点
- 双层重试:
ValidateJSON(max_retries=2)在节点内重试解析,check - "retry" >> generate在 Flow 层重试生成 - exec_fallback:解析失败时不抛异常,而是返回
None让后续节点决策 - 防御性解析:用正则提取 JSON、逐字段校验,应对 LLM 输出的不确定性
- 退出条件:设置最大重试次数,避免无限循环
9. 思维链推理(Chain-of-Thought)
难度:进阶 | 模式:循环 + 自检 | 关键词:分步推理、自我验证、复杂问题求解
9.1 架构
分步推理 + 自我验证的循环架构 —— 每推一步就校验,错了则回退重推:
text
┌──── "error" ────┐
│ │
│ ┌─ "continue" ─┤
│ │ │
↓ ↓ │
StepReason ──→ Verify
│
│ "ok"
↓
ConcludeVerify 有三条出路:步骤有误时
"error"回退重推;步骤正确但未完成时"continue"继续下一步;全部完成时"ok"进入 Conclude 输出答案。
9.2 核心思路
复杂问题(数学、逻辑、多步规划)直接让 LLM 一步回答容易出错。解决方案:分步推理,每步验证。
- StepReason:每次只推理一步,追加到推理链
- Verify:检查最新一步是否正确,不正确则回退重推
- Conclude:推理完成后,整合所有步骤给出最终答案
9.3 关键代码
三个节点各司其职:StepReason 推理一步、Verify 校验正确性、Conclude 整合答案。shared["steps"] 列表记录完整推理链,验证失败时 pop() 回退:
python
from pocketflow import Node, Flow
class StepReason(Node):
def prep(self, shared):
return {
"question": shared["question"],
"steps": shared.get("steps", []),
}
def exec(self, data):
prompt = f"""问题:{data['question']}
已有推理步骤:{data['steps']}
请继续推理下一步,输出格式:
STEP: [推理过程]
ANSWER: [如果已得出最终答案]"""
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
shared.setdefault("steps", []).append(exec_res)
shared["latest_step"] = exec_res
class Verify(Node):
def prep(self, shared):
return {
"steps": shared["steps"],
"latest_step": shared["latest_step"],
}
def exec(self, data):
prompt = f"请验证以下推理是否正确:\n{data['steps']}\n如果有错误请指出。"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
if "错误" in exec_res:
shared["steps"].pop() # 移除错误的步骤
return "error" # 回退重推
if "ANSWER" in shared.get("latest_step", ""):
return "ok" # 已得出答案
return "continue" # 正确但未完成
class Conclude(Node):
def prep(self, shared):
return shared["steps"]
def exec(self, steps):
prompt = f"基于以下推理步骤,给出最终答案:\n{steps}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
shared["answer"] = exec_res
# 构建 Flow
step_reason = StepReason()
verify = Verify()
conclude = Conclude()
step_reason >> verify
verify - "error" >> step_reason # 发现错误,重推
verify - "continue" >> step_reason # 继续推理下一步
verify - "ok" >> conclude # 验证通过,输出
flow = Flow(start=step_reason)
flow.run({"question": "一个水池有 A、B 两个进水管,A 管 4 小时注满,B 管 6 小时注满,同时开两管几小时注满?"})学习要点
- 分步推理:每次只推理一步,降低单步出错概率
- 自我验证:Verify 节点检查推理正确性,错误则回退
- 步骤管理:
shared["steps"]列表记录完整推理链,验证失败时pop()回退 - 与结构化输出的区别:结构化输出校验格式,思维链校验逻辑
10. MCP 工具集成
难度:进阶 | 模式:智能体 + 工具 | 关键词:MCP 协议、标准化工具调用、扩展能力
10.1 架构
智能体在每轮循环中选择工具、执行调用、反思结果,直到任务完成:
SelectTool → ExecuteTool → Reflect
↑ │
└──── "continue" ─────────┘
│ "done"
↓
Output10.2 核心思路
Model Context Protocol (MCP) 是一种标准化的工具调用协议 —— 让 LLM 能以统一的方式调用各种外部工具(搜索、数据库、文件系统等)。PocketFlow 通过 Node 的 exec() 方法自然地集成 MCP 工具。
入门推荐:MCP Lite Dev 教程 提供了详细的 MCP 协议学习指南和最佳实践。
10.3 关键代码
三个节点组成典型的智能体循环:SelectTool 让 LLM 选择工具、ExecuteTool 通过 MCP 协议调用、Reflect 判断任务是否完成:
python
from pocketflow import Node, Flow
class SelectTool(Node):
"""让 LLM 从可用工具中选择最合适的"""
def prep(self, shared):
return {
"task": shared["task"],
"results": shared.get("results", []),
}
def exec(self, data):
available_tools = get_mcp_tools() # 获取 MCP 工具列表
prompt = f"任务:{data['task']}\n已有结果:{data['results']}\n可用工具:{available_tools}\n请选择工具并指定参数。"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
shared["tool_call"] = exec_res
class ExecuteTool(Node):
"""通过 MCP 协议调用选中的工具"""
def prep(self, shared):
return shared["tool_call"]
def exec(self, tool_call):
return mcp_execute(tool_call) # MCP 标准调用
def post(self, shared, prep_res, exec_res):
shared.setdefault("results", []).append(exec_res)
class Reflect(Node):
"""判断任务是否完成"""
def prep(self, shared):
return {
"task": shared["task"],
"results": shared["results"],
}
def exec(self, data):
prompt = f"任务:{data['task']}\n已获得:{data['results']}\n任务完成了吗?输出 DONE 或 CONTINUE。"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
if "DONE" in exec_res:
shared["answer"] = exec_res
return "done"
return "continue"
# 构建 Flow
select_tool = SelectTool()
execute_tool = ExecuteTool()
reflect = Reflect()
output = Node() # 占位输出节点
select_tool >> execute_tool >> reflect
reflect - "continue" >> select_tool # 还需要更多工具
reflect - "done" >> output # 任务完成
flow = Flow(start=select_tool)
flow.run({"task": "查询北京今天的天气并生成播报文案"})学习要点
- MCP 是协议,不是工具:它定义了“如何调用工具”的标准,具体有哪些工具由你的 MCP 服务器决定
- 与搜索智能体 的区别:搜索智能体 只有一个工具(搜索),MCP 智能体 可以选择多种工具
- Reflect 节点:智能体 每次使用工具后反思是否已完成任务,避免不必要的额外调用
get_mcp_tools()和mcp_execute():这两个是你的工具函数(参见原理篇 §6),具体实现取决于你连接的 MCP 服务器
11. 智能体技能(技能路由)
难度:中级 | 模式:链式 + 条件路由 | 关键词:技能文件、动态 Prompt、模块化知识
智能体技能 是一种将领域知识模块化为独立文件的模式。智能体 根据用户请求动态选择技能,将技能指令注入 LLM prompt,实现“一个 智能体,多种能力”。
核心思路:技能 = Markdown 文件,选择技能 = 路由节点,执行技能 = Prompt 注入。
11.1 问题场景
你有一个通用 智能体,但需要处理多种不同类型的任务 —— 写摘要、列清单、做评审。如果为每种任务写一个独立的 Node 和 Flow,代码会迅速膨胀。
智能体技能 的解法:把每种任务的指令写成一个 Markdown 文件(技能),智能体 在运行时根据用户输入动态选择并加载。
11.2 架构设计
text
用户请求
│
↓
┌────────────┐ ┌───────────┐
│Select Skill│──→│Apply Skill│
│ 选择技能 │ │ 执行任务 │
└────────────┘ └───────────┘
│ │
│ 读取 skills/ │ 技能指令 + 用户请求
│ 目录下的 .md │ 拼入 LLM prompt
↓ ↓
skills/ LLM 输出
├── executive_brief.md
├── checklist_writer.md
└── code_reviewer.md两个节点,职责清晰:
- SelectSkill:列出所有可用技能,根据用户意图选择最匹配的一个
- ApplySkill:读取选中技能的指令,拼入 prompt,调用 LLM 执行
11.3 技能文件示例
技能文件就是普通的 Markdown,包含指令和规则:
markdown
<!-- skills/executive_brief.md -->
# 执行摘要技能
你正在为高管撰写摘要。
## 规则
- 保持简洁,面向决策
- 以 3 个要点开头
- 包含风险和建议的下一步行动
- 避免实现细节另一个技能文件专注于将任务转化为可执行的清单:
markdown
<!-- skills/checklist_writer.md -->
# 清单编写技能
将请求转换为清晰、可执行的清单。
## 规则
- 使用编号步骤
- 每步简短且可验证
- 标注依赖和阻塞项
- 以"完成标准"结尾11.4 核心代码
SelectSkill 扫描技能目录并让 LLM 匹配最合适的技能,ApplySkill 将技能指令注入 prompt 执行任务:
python
from pathlib import Path
from pocketflow import Node, Flow
def load_skills(skills_dir: str) -> dict:
"""从目录加载所有 .md 技能文件"""
skills = {}
for md_file in sorted(Path(skills_dir).glob("*.md")):
skills[md_file.stem] = md_file.read_text(encoding="utf-8")
return skills
class SelectSkill(Node):
"""根据用户任务选择最匹配的技能"""
def prep(self, shared):
return {
"task": shared["task"],
"skills": load_skills(shared.get("skills_dir", "./skills")),
}
def exec(self, data):
skill_names = list(data["skills"].keys())
# 让 LLM 根据任务描述选择最匹配的技能
prompt = f"任务:{data['task']}\n可用技能:{skill_names}\n请返回最匹配的技能名。"
selected = call_llm(prompt) # 返回技能名字符串
return selected, data["skills"].get(selected, "")
def post(self, shared, prep_res, exec_res):
skill_name, skill_content = exec_res
shared["selected_skill"] = skill_name
shared["skill_content"] = skill_content
class ApplySkill(Node):
"""将选中的技能注入 prompt 并执行任务"""
def prep(self, shared):
return {
"task": shared["task"],
"skill_name": shared["selected_skill"],
"skill_content": shared["skill_content"],
}
def exec(self, data):
prompt = f"""你正在执行一个智能体技能。
技能名:{data['skill_name']}
技能指令:
---
{data['skill_content']}
---
用户任务:{data['task']}
请严格按照技能指令完成任务。"""
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
shared["result"] = exec_res
# 构建 Flow
select = SelectSkill()
apply = ApplySkill()
select >> apply
flow = Flow(start=select)
# 运行
shared = {"task": "总结 PocketFlow 的核心优势,给技术 VP 汇报用"}
flow.run(shared)
print(shared["result"])11.5 为什么这个模式有价值?
| 传统做法 | 智能体技能 |
|---|---|
| 每种任务写一个 Node 类 | 一个通用 Flow,技能文件即插即用 |
| 新增任务 = 改代码 | 新增任务 = 加一个 .md 文件 |
| 指令硬编码在 Python 中 | 指令与代码分离,非开发者也能维护 |
| 测试需要跑整个 Flow | 技能文件可以独立 review 和迭代 |
学习要点
- 技能文件是纯 Markdown,不是代码 —— 产品经理、运营人员都能编写和维护
- SelectSkill 本身可以用 LLM 做路由(语义匹配),也可以用关键词规则(确定性路由)
- 这个模式可以和 智能体 循环组合:智能体 在每轮决策中选择不同的 Skill 来执行
- 详见 原理篇 §6:工具函数层 了解 PocketFlow 的工具函数体系
12. 智能体编程(Agentic Coding)
难度:进阶 | 模式:人类设计 + 代理实现 | 关键词:系统设计、数据契约、可靠性
智能体编程是一种高效协作范式:人类负责系统设计,AI 负责实现。PocketFlow 的核心抽象(Node / Flow / Shared Store)让这种协作更自然。
重要提醒
如果你正在用 AI 构建 LLM 系统,请务必牢记三件事: 1)从小而简单的方案开始;2)先写高层设计文档(例如 docs/design.md)再写代码;3)频繁向人类确认与复盘。
12.1 分工原则(谁负责什么)
| 步骤 | 人类参与度 | AI 参与度 | 关键目标 |
|---|---|---|---|
| 1. 需求澄清 | 高 | 低 | 明确用户问题与价值边界 |
| 2. Flow 设计 | 中 | 中 | 用节点描述高层流程 |
| 3. Utilities | 中 | 中 | 列出外部能力/接口 |
| 4. Data 设计 | 低 | 高 | 设计 shared 数据契约 |
| 5. Node 设计 | 低 | 高 | 明确每个节点读写 |
| 6. 实现 | 低 | 高 | 让 智能体 写代码 |
| 7. 优化 | 中 | 中 | 调整拆分与提示词 |
| 8. 可靠性 | 低 | 高 | 补测试、补校验 |
12.2 8 步流程(写在设计文档里)
Requirements:明确需求,判断是否适合用 AI 解决
- 适合:重复性、规则清晰的任务(填表、邮件回复)
- 适合:输入明确的创作任务(生成文案、写 SQL)
- 不适合:高度模糊且需复杂决策的问题(商业战略、公司治理)
- 以用户为中心:先写“用户问题”,再写“功能清单”
- 复杂度与价值平衡:优先交付高价值、低复杂度能力
Flow:用节点描述系统如何协作
- 识别设计模式:
- 每个节点写一句话职责
- 如果是 Map-Reduce:说明“拆分”和“聚合”
- 如果是 智能体:说明“上下文”和“行动空间”
- 如果是 RAG:说明“离线索引”和“在线检索”
- 画流程图(示例):
mermaidflowchart LR start[Start] --> batch[Batch] batch --> check[Check] check -->|OK| process check -->|Error| fix[Fix] fix --> check subgraph process[Process] step1[Step 1] --> step2[Step 2] end process --> endNode[End]TIP
如果人类无法画出 Flow,AI 就无法自动化。建议先手动解几条样例,建立直觉。
Utilities:识别并实现外部工具(系统的“身体”)
- 读取输入:拉取消息、读文件、查数据库
- 写入输出:发送通知、生成报告
- 调用外部工具:搜索、API、数据库、LLM
- 注意:LLM 内部任务(总结、分析)不是 Utility

- 为每个 Utility 写一个小测试并记录输入输出
- 示例记录:
name:get_embedding(utils/get_embedding.py)input:stroutput: 3072 维向量necessity: 第二个节点需要 embedding
Data:设计 shared 数据契约
- 小系统用内存字典;大系统可接数据库
- 避免重复:引用或外键优先
pythonshared = { "user": { "id": "user123", "context": { "weather": {"temp": 72, "condition": "sunny"}, "location": "San Francisco" } }, "results": {} }Node:写清每个节点读写与工具依赖
type:Regular / Batch / Asyncprep:读 shared 的什么字段exec:调用哪个 Utility(不写异常处理)post:写回 shared 的什么字段
Implementation:开始让 智能体 写代码
- Keep it simple:不要一上来就追求复杂特性
- Fail fast:用 Node 的重试/回退机制快速暴露薄弱环节
- 添加足够日志,方便调试
Optimization:再迭代
- 先用直觉做快评估
- 回到步骤 3–6 重新拆分与优化
- 提示词更清晰、加入示例减少歧义
Reliability:补齐稳定性
exec内增加结果校验- 适当提升
max_retries和wait - 加入“自评估节点”对结果做二次检查
12.3 最小设计文档模板(节选)
每个项目都应该先写一份设计文档,涵盖需求、Flow、工具、数据和节点五个部分:
markdown
# Design Doc: 项目名
## Requirements
- 用户要解决的具体问题:
- 成功标准:
## Flow Design
- 节点列表与一句话说明
```mermaid
flowchart TD
A[Node A] --> B[Node B]
B --> C[Node C]
```
## Utilities
- call_llm(prompt: str) -> str
- search_web(query: str) -> list
## Data (shared)
shared = {
"input": ..., # 原始输入
"context": ...,
"answer": ...
}
## Node Design
- Node A: prep 读 input,exec 调用工具,post 写 context
- Node B: ...12.4 示例工程结构
一个典型的 PocketFlow 项目按职责分文件 —— nodes 定义节点、flow 组装图、utils 放工具函数:
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── requirements.txt
└── docs/
└── design.md12.5 Utilities 实战代码(可直接运行)
第一个工具函数封装了 OpenAI API 调用,通过环境变量配置密钥和模型:
python
# utils/call_llm.py —— LLM 调用工具
import os
from openai import OpenAI
def call_llm(prompt: str) -> str:
"""调用 LLM 并返回文本回复"""
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
r = client.chat.completions.create(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"), # 通过环境变量切换模型
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
# 独立运行时可快速测试
if __name__ == "__main__":
print(call_llm("用一句话解释 PocketFlow"))第二个工具函数负责网络搜索,封装了外部搜索 API 的调用细节:
python
# utils/search_web.py
import requests
def search_web(query: str) -> list[str]:
# 伪实现:请替换为你的搜索 API
url = "https://api.example.com/search"
r = requests.get(url, params={"q": query}, timeout=10)
r.raise_for_status()
data = r.json()
return [item["snippet"] for item in data.get("items", [])]
if __name__ == "__main__":
print(search_web("PocketFlow agentic coding"))12.6 Node/Flow/Main 实战代码
nodes.py 定义了三个节点 —— DecideAction 负责决策、Search 调用搜索工具、Answer 生成最终回复:
python
# nodes.py —— 节点定义(每个节点遵循 prep/exec/post 三阶段)
from pocketflow import Node
from utils.call_llm import call_llm
from utils.search_web import search_web
class DecideAction(Node):
"""决策节点:让 LLM 判断是继续搜索还是直接回答"""
def prep(self, shared):
return shared["question"], shared.get("context", [])
def exec(self, data):
question, context = data
prompt = f"""问题:{question}
已有信息:{context}
请决定下一步:SEARCH 或 ANSWER。"""
return call_llm(prompt) # LLM 输出包含 SEARCH 或 ANSWER
def post(self, shared, prep_res, exec_res):
if "SEARCH" in exec_res:
return "search" # 路由到 Search 节点
return "answer" # 路由到 Answer 节点
class Search(Node):
"""搜索节点:调用外部搜索 API"""
def prep(self, shared):
return shared["question"]
def exec(self, question):
return search_web(question) # 调用工具函数
def post(self, shared, prep_res, exec_res):
shared.setdefault("context", []).extend(exec_res) # 追加搜索结果
class Answer(Node):
def prep(self, shared):
return shared["question"], shared.get("context", [])
def exec(self, data):
question, context = data
prompt = f"""请基于以下信息回答问题:
{context}
问题:{question}"""
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
shared["answer"] = exec_res有了 Node 定义后,接下来在 flow.py 中把它们连接成 Flow 图:
python
# flow.py
from pocketflow import Flow
from nodes import DecideAction, Search, Answer
def create_agent_flow():
decide = DecideAction()
search = Search()
answer = Answer()
decide - "search" >> search
decide - "answer" >> answer
search >> decide # 搜索后回到判断
return Flow(start=decide)最后是 main.py 入口文件,初始化 shared 数据并启动 Flow:
python
# main.py
from flow import create_agent_flow
def main():
shared = {"question": "PocketFlow 有哪些设计模式?"}
flow = create_agent_flow()
flow.run(shared)
print("Answer:", shared.get("answer"))
if __name__ == "__main__":
main()12.7 可靠性增强(重试 + 回退)
通过 max_retries 和 exec_fallback 两个机制,让节点在失败时自动重试,彻底失败时优雅降级:
python
from pocketflow import Node
from utils.call_llm import call_llm
class SafeAnswer(Node):
def exec(self, question):
if not question:
raise ValueError("empty question") # 主动抛异常触发重试
return call_llm(question)
def exec_fallback(self, prep_res, exc):
# 重试耗尽后执行此方法,返回兜底回复而非抛异常
return "抱歉,当前无法生成答案,请稍后再试。"
# max_retries=3: 最多重试 3 次;wait=2: 每次重试间隔 2 秒
safe_answer = SafeAnswer(max_retries=3, wait=2)12.8 最小可运行测试
为关键节点编写单元测试,验证 node.run() 返回预期的 action 字符串:
python
# tests/test_nodes.py
from nodes import DecideAction
def test_decide_action_returns_string():
node = DecideAction()
shared = {"question": "What is PocketFlow?"}
action = node.run(shared)
assert action in ["search", "answer", "default"]学习要点
- 智能体编程的核心不是“让 AI 写代码”,而是让 AI 严格按设计实现
- 设计文档越清晰,Flow 的可维护性与稳定性越高
- 可靠性靠“检查 + 重试 + 评估节点”来补齐
配套示例代码
本教程的所有案例都已整理为完整可运行的 Python 脚本(默认使用模拟 LLM,无需 API 密钥),存放在 examples/ 文件夹中。环境配置方法与原理篇相同,详见 原理篇 §1.1 环境配置。
| 文件 | 案例 | 核心模式 |
|---|---|---|
01_chatbot.py | 1. 聊天机器人 | 链式 + 循环 |
02_writing_workflow.py | 2. 写作工作流 | 链式 |
03_rag.py | 3. RAG 检索增强 | 链式 + BatchNode |
04_search_agent.py | 4. 搜索智能体 | 循环 + 条件分支 |
05_multi_agent.py | 5. 多智能体协作 | AsyncNode + 消息队列 |
06_map_reduce.py | 6. Map-Reduce | BatchNode |
07_parallel_processing.py | 7. 并行处理 | AsyncParallelBatchNode |
08_structured_output.py | 8. 结构化输出 | 循环 + 重试 + 校验 |
09_chain_of_thought.py | 9. 思维链推理 | 循环 + 自检 |
10_mcp_tool.py | 10. MCP 工具集成 | 智能体 + 工具 |
11_agent_skills.py | 11. 智能体技能 | 链式 + 条件路由 |
12_agentic_coding/ | 12. 智能体编程 | 完整项目模板 |
下一步
- 回顾 PocketFlow 原理入门 巩固核心概念
- 访问 PocketFlow GitHub 查看完整 cookbook
