Skip to content

3. 通信机制:Shared Store 与 Params

PocketFlow 提供两种节点间通信方式,各有分工。

3.1 Shared Store —— 全局共享字典

Shared Store:所有节点通过共享字典读写数据

节点之间不直接通信,而是通过一个共享的 shared 字典来传递数据。

python
shared = {}

# Node A 的 post() 写入数据
shared["question"] = "什么是 PocketFlow?"

# Node B 的 prep() 读取数据
question = shared["question"]

设计哲学

为什么不让节点直接传参?因为 PocketFlow 追求最小抽象

  • shared 是一个普通 Python 字典,没有任何封装
  • 所有节点都能读写,足够灵活
  • 你完全清楚数据从哪来、到哪去,没有"黑魔法"

3.2 Params —— 局部参数传递

除了 shared 这个"全局共享"机制,PocketFlow 还提供了一套局部参数机制 —— params

python
# 设置节点参数
node.set_params({"filename": "doc1.txt"})

# 在节点中读取参数
class ProcessFile(Node):
    def prep(self, shared):
        filename = self.params["filename"]  # 读取局部参数
        return shared["files"][filename]

Params 的值由父 Flow 传入:当 Flow 执行子节点时,会自动调用 node.set_params(params) 将参数传递给每个节点。这在 BatchFlow(见第 5.5 节)中尤为关键 —— BatchFlow 每次迭代会将不同的 params 传递给子节点,让同一条 Flow 用不同参数运行多次。

Shared 与 Params 的类比

可以用编程中的内存模型来理解这两种通信方式:

  • Shared Store堆(Heap)—— 所有函数(节点)共享同一块内存,任意读写
  • Params栈(Stack)—— 由调用者(父 Flow)赋值,节点内只读
特性Shared StoreParams
作用域全局,所有节点共享局部,由父 Flow 传入
用途存储数据、结果、大对象传递任务标识(文件名、ID)
可变性节点可读写节点内只读
典型场景对话历史、检索结果BatchFlow 的迭代参数

4. 六大设计模式

掌握了 Node 和 Flow,你就可以构建 LLM 应用中几乎所有的主流模式。它们不是框架提供的"功能类",而是 Node + Flow 自然组合出的图拓扑

模式图形态关键技巧对应案例
链式调用A → B → C 顺序执行a >> b >> c写作工作流
条件分支根据结果走不同路径node - "action" >> target搜索智能体
循环/重试不满意则重做post() 返回 action 指回前序节点聊天机器人
嵌套子流程Flow 中包含子 FlowFlow 继承自 BaseNode多智能体协作
批量处理列表中的每个元素独立处理BatchNode / BatchFlowMap-Reduce
并行执行多个任务同时运行AsyncParallelBatchNode并行处理

每种模式的代码骨架:

链式调用:节点按顺序依次执行

python
# 链式:顺序执行
a >> b >> c
flow = Flow(start=a)

条件分支:根据 action 走不同路径

python
# 条件分支:根据 action 走不同路径
check - "yes" >> handle_yes
check - "no"  >> handle_no
flow = Flow(start=check)

循环/重试:不满足条件则持续推理

python
# 循环:post() 返回 action 指回前序节点
step >> verify
verify - "retry" >> step        # 不满意则重做
verify - "done"  >> output      # 满意则输出

嵌套子流程:Flow 作为可复用的步骤

python
# 嵌套子流程:Flow 本身也是 BaseNode
sub_flow = Flow(start=sub_a)
main_a >> sub_flow >> main_b    # Flow 当节点用

批量处理:对列表中每个元素重复执行

python
# 批量处理:BatchNode 自动遍历列表
class ProcessAll(BatchNode):
    def prep(self, shared): return shared["items"]  # 返回列表
    def exec(self, item): return process(item)       # 逐一执行

并行执行:多个任务并发处理

python
# 并行执行:asyncio.gather 并发
class ParallelProcess(AsyncParallelBatchNode):
    async def prep_async(self, shared): return shared["items"]
    async def exec_async(self, item): return await async_process(item)

Easy-Pocket —— 从零掌握 PocketFlow