Skip to content

5. 深入源码:100 行的全部秘密

🔬PocketFlow 核心源码解剖
pocketflow/__init__.pyLines 3-15
class BaseNode:
    def __init__(self):
        self.params, self.successors = {}, {}

    def next(self, node, action="default"):
        self.successors[action] = node
        return node

    # 三阶段执行模型
    def prep(self, shared): pass
    def exec(self, prep_res): pass
    def post(self, shared, prep_res, exec_res): pass

    def _run(self, shared):
        p = self.prep(shared)
        e = self._exec(p)
        return self.post(shared, p, e)

    # 操作符重载:node1 >> node2
    def __rshift__(self, other):
        return self.next(other)

    # 条件转移:node1 - "action" >> node2
    def __sub__(self, action):
        return _ConditionalTransition(self, action)
BaseNode — 万物之基
角色定义节点的基础结构:参数管理、后继连接、三阶段执行
1params 字典存储节点参数,successors 字典存储后继节点映射
2next() 方法将两个节点通过 action 字符串连接,构成有向图的边
3prep → exec → post 三阶段是核心执行模型:准备数据、执行逻辑、后处理
4>> 操作符让你可以写 nodeA >> nodeB 来连接节点
5- 操作符支持条件分支:nodeA - "yes" >> nodeB
🎯类比:BaseNode 就像乐高积木的基础底板,定义了所有积木块的通用接口。
继承关系
BaseNode
├─ Node
│  ├─ BatchNode
│  └─ AsyncNode
└─ Flow
   └─ BatchFlow / AsyncFlow

PocketFlow 的 100 行源码包含 12 个类,构成以下层次结构:

BaseNode                                       ← 万物之基(params / successors / 三阶段 / 操作符重载)
├── Node(BaseNode)                             ← 可重试的执行单元(max_retries / wait / exec_fallback)
│   ├── BatchNode(Node)                        ← 批量处理节点(对列表逐一执行 exec)
│   └── AsyncNode(Node)                        ← 异步节点(async 版三阶段)
│       ├── AsyncBatchNode(AsyncNode, BatchNode)        ← 顺序异步批处理 ◆
│       └── AsyncParallelBatchNode(AsyncNode, BatchNode)← 并行异步批处理 ◆
├── Flow(BaseNode)                             ← 图执行引擎(遍历有向图)
│   ├── BatchFlow(Flow)                        ← 批量运行整条 Flow(不同 params)
│   └── AsyncFlow(Flow, AsyncNode)             ← 异步图执行引擎(混合同步/异步节点)◆
│       ├── AsyncBatchFlow(AsyncFlow, BatchFlow)        ← 顺序异步批量 Flow ◆
│       └── AsyncParallelBatchFlow(AsyncFlow, BatchFlow)← 并行异步批量 Flow ◆
└── _ConditionalTransition                     ← 辅助类(实现 - "action" >> 语法)

◆ 菱形继承(Diamond Inheritance)

标记 ◆ 的类同时继承两个父类,形成菱形继承。例如 AsyncBatchNode(AsyncNode, BatchNode) —— 从 AsyncNode 获得 async 能力,从 BatchNode 获得批量循环,Python 的方法解析顺序(Method Resolution Order, MRO)确保 _exec() 等方法的调用顺序正确。这种"能力叠加"设计让 12 个类覆盖了所有同步/异步 × 单个/批量 × 顺序/并行的组合。

下面逐一解读关键类。

5.1 BaseNode —— 万物之基

BaseNode 是所有节点和流程的基类,它定义了:

  • params 字典:存储节点的配置参数(由父 Flow 传入)
  • successors 字典:存储 action → 后继节点 的映射
  • 三阶段方法prep()exec()post()
  • 操作符重载>>(默认连接)和 -(条件连接)

5.2 Node —— 可重试的执行单元

Node 继承 BaseNode,增加了重试机制

python
class Node(BaseNode):
    def __init__(self, max_retries=1, wait=0):
        self.max_retries = max_retries
        self.wait = wait

    def exec_fallback(self, prep_res, exc):
        raise exc  # 默认:重新抛出异常

    def _exec(self, prep_res):
        for self.cur_retry in range(self.max_retries):
            try:
                return self.exec(prep_res)
            except Exception as e:
                if self.cur_retry == self.max_retries - 1:
                    return self.exec_fallback(prep_res, e)
                if self.wait > 0:
                    time.sleep(self.wait)

实用场景

当调用 LLM API 时,网络波动、限流等问题很常见。设置 max_retries=3, wait=2 就能最多尝试 3 次(首次 + 2 次重试),每次间隔 2 秒。所有重试耗尽后,exec_fallback(prep_res, exc) 会被调用 —— 你可以覆写它来返回降级结果而非抛出异常。

注意源码中 for self.cur_retry in range(self.max_retries) —— 重试计数器 self.cur_retry 是节点实例属性,你可以在 exec() 中读取它(self.cur_retry)来判断当前是第几次重试,从而在不同重试轮次采用不同策略。

5.3 Flow —— 图执行引擎

Flow 本身也是 BaseNode 的子类 —— 这意味着 Flow 可以嵌套在其他 Flow 中,这是构建复杂应用的关键。

python
class Flow(BaseNode):
    def _orch(self, shared, params=None):
        curr = copy.copy(self.start_node)
        p = params or {**self.params}
        while curr:
            curr.set_params(p)          # 将 params 传给每个子节点
            last_action = curr._run(shared)
            curr = copy.copy(self.get_next_node(curr, last_action))
        return last_action

三个关键细节:

1. copy.copy —— 防止状态泄漏

注意 _orch 中的 copy.copy(self.start_node)copy.copy(self.get_next_node(...))。每次执行都对节点做浅拷贝,这意味着同一条 Flow 可以安全地多次运行,每次运行使用独立的节点副本,不会因上一次运行残留的 paramscur_retry 等内部状态而互相干扰。

2. Flow 自身也有 prep/post 生命周期

Flow 继承自 BaseNode,因此它的 _run() 实际上是 prep → _orch → post。大多数时候你不需要覆写 Flow 的 prep/post,但 BatchFlow 正是利用了这一点 —— 它覆写 prep() 返回 params 列表,再在 _run() 中逐一迭代(见 §5.5)。

3. Flow.post 默认透传 last_action

源码中 Flow.post() 被覆写为 return exec_res,即把最后一个节点的 action 原样返回。这使得嵌套 Flow 可以参与父 Flow 的路由 —— 子 Flow 内部最后一个节点返回的 action 会传递给父 Flow,决定父 Flow 的下一跳。

注意 curr.set_params(p) —— Flow 在执行每个子节点前,都会将自身的 params 传递给子节点。这就是第 3.2 节 Params 机制的运作方式。

get_next_node 的优雅终止

post() 返回的 action 如果在 successors 中找不到匹配,Flow 不会崩溃 —— 它会发出一条 warning 并正常结束。这意味着:

  • 返回 None(即 post() 没有显式 return)→ 如果没有 "default" 后继,Flow 结束
  • 返回一个未注册的 action → Flow 发出警告并结束
  • 这是有意为之的设计,让流程终止变得简单自然
python
# 子 Flow 作为一个 Node 参与到父 Flow 中
sub_flow = Flow(start=sub_start_node)
main_start >> sub_flow >> main_end
main_flow = Flow(start=main_start)

5.4 BatchNode —— 批量处理

BatchNode 只覆写了一个方法,就实现了批处理:

python
class BatchNode(Node):
    def _exec(self, items):
        return [super()._exec(i) for i in (items or [])]
  • prep() 返回一个列表
  • exec() 对列表中每个元素独立执行
  • 每个元素独享 Node 的重试机制

5.5 BatchFlow —— 批量运行整条 Flow

BatchFlow 与 BatchNode 解决不同的问题:BatchNode 在一个节点内批量处理列表元素,而 BatchFlow 用不同的 params 多次运行整条 Flow

python
class BatchFlow(Flow):
    def _run(self, shared):
        pr = self.prep(shared) or []        # prep 返回 params 字典列表
        for bp in pr:
            self._orch(shared, {**self.params, **bp})  # 合并父级 params 与当前批次 params
        return self.post(shared, pr, None)

使用方式

python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        filenames = list(shared["files"].keys())
        return [{"filename": fn} for fn in filenames]  # 每次迭代一个文件

class SummarizeFile(Node):
    def prep(self, shared):
        filename = self.params["filename"]      # 通过 params 获取当前文件名
        return shared["files"][filename]

    def exec(self, content):
        return call_llm(f"Summarize: {content}")

    def post(self, shared, prep_res, exec_res):
        shared["summaries"][self.params["filename"]] = exec_res

# 构建 Flow
summarize = SummarizeFile()
inner_flow = Flow(start=summarize)
batch_flow = SummarizeAllFiles(start=inner_flow)
batch_flow.run(shared)

BatchNode vs BatchFlow

  • BatchNode:一个节点,批量执行 exec() → 适合"对列表每项做同一个操作"
  • BatchFlow:一条完整 Flow,用不同参数多次运行 → 适合"对多个任务运行同一条流水线"

5.6 AsyncNode 与异步家族

异步模式:等待 I/O 操作完成后继续执行

为什么需要 async?

本教程的案例使用同步模拟函数来降低门槛,但真实业务几乎都是异步的。LLM API 调用(OpenAI、Claude)、Web 搜索、数据库查询都是网络 I/O —— 每次请求要等几百毫秒到几秒。同步代码在等待期间阻塞整个线程;async/await 让 Python 在等待 I/O 时去处理其他任务。

生产环境中,你几乎总是用 AsyncNode + AsyncFlow 替代 Node + Flow。接口完全对称,只需给方法加上 async 前缀和 _async 后缀,逻辑代码不变。

AsyncNode 提供三阶段执行的 async/await 版本:

python
class AsyncNode(Node):
    async def prep_async(self, shared): pass
    async def exec_async(self, prep_res): pass
    async def exec_fallback_async(self, prep_res, exc): raise exc
    async def post_async(self, shared, prep_res, exec_res): pass

    def _run(self, shared):
        raise RuntimeError("Use run_async.")  # 不能在同步 Flow 中使用!

关键约束

  • AsyncNode 的 _run()直接抛出 RuntimeError —— 它不能放在普通 Flow
  • AsyncNode 必须包裹在 AsyncFlow 中运行
  • 反过来,AsyncFlow 可以包含普通同步 Node(它会自动判断用 _run 还是 _run_async

AsyncFlow 是如何做到"混合同步/异步节点"的?关键在 _orch_async 的一行 isinstance 检测:

python
# AsyncFlow._orch_async 核心逻辑(简化)
while curr:
    if isinstance(curr, AsyncNode):
        last_action = await curr._run_async(shared)   # 异步节点
    else:
        last_action = curr._run(shared)                # 普通同步节点
    curr = self.get_next_node(curr, last_action)

这意味着你可以在一条 AsyncFlow 中自由混搭同步和异步节点 —— AsyncFlow 会自动为每个节点选择正确的调用方式。

基于 AsyncNode,PocketFlow 提供了一系列异步组合类:

继承关系行为
AsyncNodeNode异步三阶段执行
AsyncBatchNodeAsyncNode + BatchNode异步逐个处理列表元素(for + await
AsyncParallelBatchNodeAsyncNode + BatchNode异步并行处理列表元素(asyncio.gather
AsyncFlowFlow + AsyncNode异步图引擎,支持混合同步/异步节点
AsyncBatchFlowAsyncFlow + BatchFlow异步逐次运行多组 params
AsyncParallelBatchFlowAsyncFlow + BatchFlow异步并行运行多组 params(asyncio.gather

其中 AsyncParallelBatchNode 是最常用的 —— 在 I/O 密集场景(多个 API 调用、多文件处理)下,asyncio.gather() 可以获得数倍的性能提升

python
class AsyncParallelBatchNode(AsyncNode, BatchNode):
    async def _exec(self, items):
        return await asyncio.gather(
            *(super()._exec(i) for i in items)
        )

Easy-Pocket —— 从零掌握 PocketFlow