29. CUDA Stream Advanced Scheduling Practice | 1D-03. CUDA Stream Advanced Scheduling - 计算练习
难度: Hard | 标签: CUDA, Stream, Event, Graph | 目标人群: 想把推理和训练流程调得更细的学习者
🚀 云端运行环境
本章节的实战代码可以点击以下链接在免费 GPU 算力平台上直接运行:
本练习配套理论文档:29_CUDA_Stream_Advanced_Scheduling.md
建议先阅读理论文档,再来完成下面三件事。Notebook 负责把调度、同步和回放的收益真正算出来。
学习目标
- 理解 Stream / Event / Graph 的职责分工
- 学会判断什么时候并行调度真的有重叠收益
- 学会判断 CUDA Graph 在什么场景值得用
练习目标
- 先做一个 Stream 优先级的小实验,看高 / 低优先级任务的调度差异
- 再做 Event 同步,观察不同 Stream 之间的依赖是怎么被串起来的
- 最后尝试 CUDA Graph 捕获和回放,比较捕获与回放的耗时差异
python
from dataclasses import dataclass
from typing import List, Dict, Tuple
@dataclass
class TaskStage:
name: str
duration_us: float
stream: str
depends_on: Tuple[str, ...] = ()
def sequential_time_us(stages: List[TaskStage]) -> float:
"""假设所有阶段串行执行的总时间。"""
return sum(stage.duration_us for stage in stages)
def pipelined_time_us(stages: List[TaskStage]) -> float:
"""非常粗略地估算流水线调度后的总时间。
这里我们把同一条 Stream 的阶段视为顺序执行,
不同 Stream 若无依赖且属于不同职责,可以产生重叠。
"""
stream_totals: Dict[str, float] = {}
for stage in stages:
stream_totals.setdefault(stage.stream, 0.0)
stream_totals[stage.stream] += stage.duration_us
return max(stream_totals.values()) if stream_totals else 0.0
def overlap_ratio(sequential_us: float, pipelined_us: float) -> float:
if sequential_us == 0:
return 0.0
return 1 - pipelined_us / sequential_usPart 1: Stream 调度与并行收益
练习 1.1: 比较串行和流水线时间
先看同一批任务在串行执行和按职责拆 Stream 后,时间差有多大。
python
def test_stream_pipeline_basic():
stages = [
TaskStage('H2D', 12, 'copy'),
TaskStage('Kernel', 40, 'compute'),
TaskStage('D2H', 10, 'copy'),
TaskStage('Post', 8, 'post'),
]
seq = sequential_time_us(stages)
pipe = pipelined_time_us(stages)
assert seq == 70, seq
assert pipe == 40, pipe
assert 0 < overlap_ratio(seq, pipe) < 1
print('✅ Stream 流水线测试通过')
test_stream_pipeline_basic()
stages = [
TaskStage('H2D', 12, 'copy'),
TaskStage('Kernel', 40, 'compute'),
TaskStage('D2H', 10, 'copy'),
TaskStage('Post', 8, 'post'),
]
print('串行时间:', sequential_time_us(stages), 'us')
print('流水线时间:', pipelined_time_us(stages), 'us')
print('重叠收益:', f"{overlap_ratio(sequential_time_us(stages), pipelined_time_us(stages)):.1%}")练习 1.2: 判断什么时候值得拆 Stream
不是所有任务都值得拆成很多 Stream。你要先看职责是否独立,再看是否真的能重叠。
python
def should_split_stream(copy_us: float, compute_us: float, post_us: float, threshold: float = 0.2) -> bool:
"""如果搬运和后处理占比足够大,拆 Stream 往往更值得。"""
total = copy_us + compute_us + post_us
if total == 0:
return False
overhead = (copy_us + post_us) / total
return overhead >= threshold
assert should_split_stream(20, 40, 15) is True
assert should_split_stream(2, 60, 1) is False
print('✅ Stream 拆分判断函数测试通过')Part 2: Event 同步与依赖图
练习 2.1: 用 Event 描述跨 Stream 依赖
Event 的关键作用不是“阻塞全部”,而是只同步真正有依赖关系的阶段。
python
def build_dependency_edges(stages: List[TaskStage]) -> List[Tuple[str, str]]:
edges = []
for stage in stages:
for dep in stage.depends_on:
edges.append((dep, stage.name))
return edges
def has_cross_stream_dependency(stages: List[TaskStage]) -> bool:
name_to_stage = {s.name: s for s in stages}
for stage in stages:
for dep in stage.depends_on:
if name_to_stage[dep].stream != stage.stream:
return True
return Falsepython
def test_event_dependencies():
stages = [
TaskStage('H2D', 12, 'copy'),
TaskStage('Kernel', 40, 'compute', depends_on=('H2D',)),
TaskStage('D2H', 10, 'copy', depends_on=('Kernel',)),
TaskStage('Post', 8, 'post', depends_on=('D2H',)),
]
edges = build_dependency_edges(stages)
assert ('H2D', 'Kernel') in edges
assert ('Kernel', 'D2H') in edges
assert ('D2H', 'Post') in edges
assert has_cross_stream_dependency(stages) is True
print('✅ Event 依赖测试通过')
test_event_dependencies()练习 2.2: 判断 Event 是否过密
Event 应该只放在真正有依赖的地方;如果同步点过多,流水线收益会被吃掉。
python
def event_density(num_events: int, num_stages: int) -> float:
return num_events / num_stages if num_stages else 0.0
assert abs(event_density(3, 4) - 0.75) < 1e-9
assert event_density(0, 4) == 0.0
print('✅ Event 密度测试通过')Part 3: CUDA Graph 适用性判断
练习 3.1: 判断是否适合 Graph
Graph 适合路径稳定的任务,不适合形状频繁变化的任务。
python
def graph_suitability(is_fixed_shape: bool, is_repeated_path: bool, has_many_branches: bool) -> bool:
"""非常简化的 CUDA Graph 适用性判断。"""
return is_fixed_shape and is_repeated_path and not has_many_branches
assert graph_suitability(True, True, False) is True
assert graph_suitability(True, False, False) is False
assert graph_suitability(False, True, False) is False
assert graph_suitability(True, True, True) is False
print('✅ Graph 适用性测试通过')python
cases = [
('离线批处理推理', True, True, False),
('变长在线推理', False, True, False),
('动态分支很多的控制流', True, True, True),
]
for name, fixed_shape, repeated_path, many_branches in cases:
print(f'{name:<18s}:', '适合 Graph' if graph_suitability(fixed_shape, repeated_path, many_branches) else '不太适合 Graph')Part 4: 典型流水线总结
练习 4.1: 选择职责划分
把任务按 H2D / Kernel / D2H + Post 拆开,通常比把所有任务都放进一条 Stream 更容易形成重叠。
python
def recommended_pipeline(copy_us: float, compute_us: float, post_us: float) -> Dict[str, str]:
if not should_split_stream(copy_us, compute_us, post_us):
return {'decision': 'single_stream', 'reason': '重叠收益有限'}
return {
'decision': 'split_streams',
'reason': '搬运 / 计算 / 收尾职责可分离,适合用 Event 串依赖',
}
print(recommended_pipeline(12, 40, 10))
print(recommended_pipeline(2, 60, 1))🛑 STOP HERE 🛑
先确保你能独立完成上面的练习,再看参考答案。
参考代码与解析
下面给出一版更精简的参考实现,帮助你对照自己写的版本。
python
# ==================== 参考答案 ====================
def sequential_time_us_ref(stages: List[TaskStage]) -> float:
return sum(stage.duration_us for stage in stages)
def graph_suitability_ref(is_fixed_shape: bool, is_repeated_path: bool, has_many_branches: bool) -> bool:
return is_fixed_shape and is_repeated_path and not has_many_branches
print('参考结论:')
print('- Stream 负责并行调度')
print('- Event 负责局部同步')
print('- Graph 负责稳定路径回放')解析
1. Stream 调度
- 只有职责可分离、依赖关系清楚时,拆 Stream 才有意义
- 并行收益来自重叠,不来自“Stream 数量多”本身
2. Event 同步
- Event 用来连接不同 Stream 的依赖
- 不是越多越好,太密会压缩重叠空间
3. CUDA Graph
- Graph 适合固定路径、重复执行的场景
- 动态 shape 和复杂分支会让 Graph 的收益下降
关联阅读
这一页和 1D-01 的异构调度、1D-02 的 CUDA / Triton 编程模型是连着的,建议一起看:
1D-01CPU/GPU 异构调度1D-02CUDA/Triton 编程模型1D-04动态 Shape 处理
配合练习
这页的练习可以围绕下面三个目标展开:
- 先做一个 Stream 优先级的小实验,看高 / 低优先级任务的调度差异
- 再做 Event 同步,观察不同 Stream 之间的依赖是怎么被串起来的
- 最后尝试 CUDA Graph 捕获和回放,比较捕获与回放的耗时差异
