Skip to content

并發、异步與多线程

💡 學習指南:并發編程是很多後端工程师的"阿喀琉斯之踵"——面試被問倒、线上出 Bug、性能調優没思路。本章節會围绕一个核心問题展開:当10万个用户同時請求你的服務,你的代碼會崩吗?

在開始之前,建议你先补兩塊"基础砖":

  • CPU、內存、I/O 是什么:如果不清楚這些基础概念,可以先回顧操作系统的基本知識。
  • 什么是阻塞/非阻塞:如果還不熟悉同步/异步的概念,可以先通過實际編程體验感受一下。

0. 引言:為什么你的服務一到高峰期就"卡死"?

Process / Thread / Coroutine Comparison

Memory Usage
400
MB
Context Switches
0
Completed Tasks
0
Elapsed Time
0
ms
CPU 1
CPU 2
CPU 3
CPU 4
Task Queue
Task 1
Task 2
Task 3
Task 4
Task 5
Task 6
Task 7
Task 8
Task 9
Task 10
Task 11
Task 12
Task 13
Task 14
Task 15
Task 16

很多人在實际開發中都會遇到類似的情况:

  • 本地測試時服務響應飛快,一上线就"卡成 PPT";
  • 明明買了很高的服務器配置,CPU 占用率却總是上不去;
  • 一到促銷高峰期,服務就"雪崩",不得不降级或熔断。

直觉上,我们會以為是:"服務器不够強"。 但大多數時候,問题并不在于硬件"不够快",而在于我们没有設計好并發模型

核心矛盾

  • 如果不并發處理:用户請求排队等待,體验极差;
  • 如果亂用多线程:鎖竞爭、上下文切换開銷,性能反而下降。

面對這些挑戰,單纯依靠"加機器"已經捉襟见肘。我们需要一套系统的并發設計方法,在高并發場景下既保證性能,又确保穩定性。這正是本章節試图解决的問题。


1. 核心概念:進程、线程、協程,到底啥區別?

1.1 一个餐厅的比喻

想象你開了一家餐厅,要同時服務很多顧客:

概念餐厅比喻技術含義
進程 (Process)独立的餐厅分店擁有独立的內存空間、资源分配,是操作系统资源分配的基本單位。一个進程崩溃不會影響其他進程。
线程 (Thread)分店內的厨师是 CPU 調度的基本單位,共享進程內的內存空間。同一進程內的线程可以共享數據,但一个线程崩溃可能導致整个進程崩溃。
協程 (Coroutine)厨师的"分身術"用户態的輕量级线程,由程序自己調度而非操作系统。切换開銷极小,可以創建數百万个。

1.2 深入對比:三者的本质差异

Process Memory Isolation Demo

System Memory

進程:资源隔離的"集装箱"

核心特點

  • 隔離性強:每个進程有独立的虚擬地址空間
  • 開銷大:創建/切换需要操作系统介入,耗時约 1-10ms
  • 通信複雜:進程間通信(IPC)需要特殊機制(管道、消息队列、共享內存等)

適用場景

  • 需要強隔離的服務(如浏览器標簽頁、沙箱程序)
  • 多語言混合部署的服務
  • 需要独立重启/升级的服務單元

线程:共享內存的"輕骑兵"

Thread Scheduling Demo

Timeline
0ms
100ms
200ms
300ms
400ms
500ms
0
Completed Threads
0
Context Switches
0ms
Avg Wait Time
0
Throughput (threads/s)
Current Scheduling Algorithm: Round Robin (Time Slice)

Each thread takes turns executing for a time slice. When the slice expires, it switches to the next thread. Good responsiveness, suitable for interactive systems.

核心特點

  • 共享內存:同一進程內的线程共享代碼段、數據段、堆
  • 独立栈空間:每个线程有自己的栈(通常 1MB 左右)
  • 切换較快:线程切换约 1-10μs,比進程快 1000 倍
  • 需要同步:共享數據需要加鎖保護

適用場景

  • CPU 密集型任務(計算、图像處理)
  • 需要共享大量數據的并發任務
  • 對延遲敏感的後台任務

協程:用户態的"绿色线程"

Coroutine Lightweight Comparison Demo

1000 coroutines
Thread Model
Memory Usage
1000 MB
Creation Time
100 ms
Context Switch
~1-10 us
VS
Coroutine Model
Memory Usage
2000 MB
Creation Time
10 ms
Context Switch
~100 ns
Saves -100% Memory

核心特點

  • 用户態調度:由程序/運行時庫調度,不經過操作系统
  • 极輕量级:協程栈通常只有几 KB,可創建數百万个
  • 切换极快:協程切换约 100ns,比线程快 100 倍
  • 非抢占式:協程主動讓出 CPU(協作式多任務)

適用場景

  • I/O 密集型高并發服務(Web 服務器、網關)
  • 需要維持大量長連接的場景(IM、游戏服務器)
  • 流式數據處理、流水线作業

2. 案例分析:某電商大促的"并發之痛"

2.1 血泪教训:從"單機"到"分布式"的演進

讓我们看一个真實的電商系统演進故事:

階段一:單機時代(日活 1000)

python
# 简單的 Flask 應用
from flask import Flask

app = Flask(__name__)

@app.route('/order')
def create_order():
    # 查询庫存
    stock = db.query("SELECT stock FROM products WHERE id=1")
    if stock > 0:
        # 扣减庫存
        db.execute("UPDATE products SET stock = stock - 1 WHERE id=1")
        # 創建订單
        db.execute("INSERT INTO orders ...")
        return "Order created!"
    return "Out of stock!"

# 启動:flask run

問题

  • 單進程單线程,一次只能處理一个請求
  • 庫存扣减没有加鎖,并發時會出現超卖
  • 數據庫連接數有限,連接池很快被耗尽

階段二:多進程時代(日活 1万)

python
# 使用 Gunicorn 多進程部署
gunicorn -w 4 -k sync app:app

# 4个 worker 進程,每个進程独立處理請求

新問题

  • 4 个進程同時查庫存,都看到 stock=1,都扣减成功,超卖 3 个!
  • 需要引入分布式鎖
python
import redis

# 使用 Redis 分布式鎖
lock = redis_client.lock("stock_lock", timeout=10)
if lock.acquire():
    try:
        stock = db.query("SELECT stock FROM products WHERE id=1")
        if stock > 0:
            db.execute("UPDATE products SET stock = stock - 1 WHERE id=1")
    finally:
        lock.release()

階段三:協程時代(日活 10万)

python
# 使用 FastAPI + asyncio
from fastapi import FastAPI
import asyncio

app = FastAPI()

async def check_stock(product_id: int) -> int:
    # 异步查询數據庫,不阻塞
    result = await db.fetch_one(
        "SELECT stock FROM products WHERE id = :id",
        {"id": product_id}
    )
    return result["stock"]

@app.get("/order")
async def create_order(product_id: int):
    # 并發檢查庫存和用户信息
    stock_task = check_stock(product_id)
    user_task = get_user_info(request.user_id)

    stock, user = await asyncio.gather(stock_task, user_task)

    if stock > 0:
        # 异步扣减庫存
        await db.execute(
            "UPDATE products SET stock = stock - 1 WHERE id = :id",
            {"id": product_id}
        )
        return {"status": "success"}

    return {"status": "out_of_stock"}

# 启動:uvicorn main:app --workers 4
# 每个 worker 內可以處理數千个并發協程

優勢

  • 單线程內可處理數千并發連接
  • I/O 操作時主動讓出 CPU,不阻塞其他請求
  • 內存占用极低,適合高并發長連接場景

2.2 并發模型演進對比表

階段并發模型支撑日活核心問题解决方案
單體單進程單线程1K无法并發處理引入多進程
多進程多進程同步10K數據竞爭、超卖分布式鎖
多线程多线程+鎖50K上下文切换開銷、死鎖线程池、无鎖队列
協程异步 I/O100K+代碼複雜度、調試困難框架封装、鏈路追蹤
混合多進程+協程1000K+架構複雜度服務治理、弹性伸缩

3. 原理深入:各種并發模型的工作原理

3.1 進程模型:隔離性與通信

內存隔離機制

Process Memory Isolation Demo

System Memory

每个進程擁有独立的虚擬地址空間:

進程 A 的虚擬內存          進程 B 的虚擬內存
+----------------+        +----------------+
|  內核空間      |        |  內核空間      |  <-- 共享(只讀)
|  (共享)        |        |  (共享)        |
+----------------+        +----------------+
|  栈空間        |        |  栈空間        |  <-- 独立
|  (向下增長)    |        |  (向下增長)    |
+----------------+        +----------------+
|  堆空間        |        |  堆空間        |  <-- 独立
|  (向上增長)    |        |  (向上增長)    |
+----------------+        +----------------+
|  數據段        |        |  數據段        |  <-- 独立
|  (.bss/.data)  |        |  (.bss/.data)  |
+----------------+        +----------------+
|  代碼段        |        |  代碼段        |  <-- 独立
|  (.text)       |        |  (.text)       |
+----------------+        +----------------+

進程間通信(IPC)方式

方式原理速度適用場景
管道 (Pipe)內核緩衝區,單向流中等父子進程間通信
消息队列內核消息鏈表中等异步消息傳遞
共享內存同一塊物理內存映射最快大量數據共享
信号量內核計數器-同步與互斥
Socket網絡協议栈較慢跨機器通信
信号 (Signal)軟中断-事件通知

3.2 线程模型:調度與同步

线程調度原理

Thread Scheduling Demo

Timeline
0ms
100ms
200ms
300ms
400ms
500ms
0
Completed Threads
0
Context Switches
0ms
Avg Wait Time
0
Throughput (threads/s)
Current Scheduling Algorithm: Round Robin (Time Slice)

Each thread takes turns executing for a time slice. When the slice expires, it switches to the next thread. Good responsiveness, suitable for interactive systems.

操作系统线程調度器的基本工作:

就绪队列                    運行中                    等待队列
+--------+                +--------+               +--------+
| 线程 B |  <-- 時間片到   | 线程 A |  <-- I/O請求  | 线程 C |
| 线程 D |                | (運行) |               | 线程 E |
| 线程 F |                +--------+               | (阻塞) |
+--------+                                         +--------+
    |                                                  |
    v                                                  v
調度器根據優先级選择下一个運行            I/O完成時移回就绪队列

常见线程同步機制

機制原理優點缺點
互斥鎖 (Mutex)二元狀態,独占访問實現简單竞爭激烈時性能差
讀写鎖 (RWLock)讀共享,写独占讀多写少場景效率高實現複雜,有写饥餓風險
自旋鎖 (Spinlock)忙等待,不釋放 CPU等待時間短時效率高等待時間長時浪費 CPU
條件變量等待特定條件满足避免忙等待需要配合鎖使用
信号量 (Semaphore)計數器控制访問數量可控制并發數使用不当易出錯
原子操作CPU 指令级原子性无鎖,性能最高只能操作简單數據類型
无鎖队列CAS 操作實現高并發下性能優异實現複雜,ABA 問题

3.3 協程模型:用户態調度

Coroutine Lightweight Comparison Demo

1000 coroutines
Thread Model
Memory Usage
1000 MB
Creation Time
100 ms
Context Switch
~1-10 us
VS
Coroutine Model
Memory Usage
2000 MB
Creation Time
10 ms
Context Switch
~100 ns
Saves -100% Memory

協程的核心優勢

傳统多线程                vs              協程模型

+------------+                       +------------+
|  线程 1    |                       |  事件循環   |
| (1MB栈)   |                       |  (調度器)   |
+------------+                       +------------+
     |                                     |
     v                                     v
+------------+                       +------------+
|  线程 2    |                       |  協程 A    |
| (1MB栈)   |                       | (几KB栈)   |
+------------+                       +------------+
     |                                     |
     v                                     v
+------------+                       +------------+
|  线程 3    |                       |  協程 B    |
| (1MB栈)   |                       | (几KB栈)   |
+------------+                       +------------+

開銷:N MB                           開銷:N KB
創建:~10μs                         創建:~100ns
切换:~1μs                          切换:~100ns

async/await 的工作機制

async/await Mechanism Demo

Python asyncio Example
import asyncio

async def fetch_data(url):
    # await suspends, yields CPU
    response = await aiohttp.get(url)
    # Continue after I/O completes
    return response.json()

async def main():
    # Concurrent execution
    tasks = [fetch_data(url) for url in urls]
    results = await asyncio.gather(*tasks)
Execution Timeline
0ms
50ms
100ms
150ms
200ms
Event Loop
Scheduling
Task 1
Exec
I/O
Exec
I/O
Exec
I/O
Task 2
Exec
I/O
Exec
I/O
Exec
I/O
Task 3
Exec
I/O
Exec
I/O
Exec
I/O
Task 4
Exec
I/O
Exec
I/O
Exec
I/O
Task 5
Exec
I/O
Exec
I/O
Exec
I/O
Concurrent Tasks
5
Total Time
100ms
I/O Wait Time
60ms
CPU Utilization
40%
python
import asyncio

async def fetch_data(url):
    # 遇到 await,協程挂起,讓出 CPU
    response = await aiohttp.get(url)
    # I/O 完成後,事件循環唤醒協程,從這裡继續執行
    return response.json()

async def main():
    # 創建 3 个協程任務
    tasks = [
        fetch_data("https://api1.example.com"),
        fetch_data("https://api2.example.com"),
        fetch_data("https://api3.example.com")
    ]
    # 并發執行,總耗時 ≈ 最慢的那个請求
    results = await asyncio.gather(*tasks)
    return results

# 启動事件循環
asyncio.run(main())

執行流程

時間线 -------------------------------------------------------------------->

協程 A: [準備請求]--[await 挂起]=======[收到響應]--[處理數據]
                     |
協程 B:              [準備請求]--[await 挂起]=======[收到響應]--[處理數據]
                                  |
協程 C:                           [準備請求]--[await 挂起]=======[收到響應]
                                               |

                                         所有 I/O 完成

說明:[ ] 表示 CPU 執行, === 表示 I/O 等待, | 表示協程切换

3.4 事件循環:協程的"心脏"

Event Loop Demo

Call Stack
Stack Empty
Event Loop
Check
1Execute synchronous code in the call stack
2Execute all microtasks
3Render UI (if needed)
4Execute macrotask
Task Queue
Microtask Queue
Queue Empty
Macrotask Queue
Queue Empty

事件循環是協程調度的核心機制:

python
import selectors
import heapq

class EventLoop:
    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.ready = []  # 就绪队列
        self.scheduled = []  # 定時任務队列
        self.current = None

    def run(self):
        while True:
            # 1. 處理定時任務
            now = time.time()
            while self.scheduled and self.scheduled[0][0] <= now:
                _, callback = heapq.heappop(self.scheduled)
                self.ready.append(callback)

            # 2. 等待 I/O 事件
            timeout = 0 if self.ready else 0.1
            events = self.selector.select(timeout)

            for key, mask in events:
                callback = key.data
                self.ready.append(callback)

            # 3. 執行就绪的回調
            while self.ready:
                callback = self.ready.popleft()
                callback()

3.5 并發 vs 并行:不是一回事

Concurrency vs Parallelism Demo

CPU Core (Single Core)
CPU 1
Idle
CPU 2
Idle
CPU 3
Idle
CPU 4
Idle
Task Execution
Task 1
40ms
Task 2
30ms
Task 3
50ms
Task 4
35ms
Concurrency vs Parallelism
🔄
Concurrency
Multiple tasks alternate execution, progressing simultaneously at a macro level
Examples: Single-core CPU multi-threading, coroutine scheduling, async I/O
Parallelism
Multiple tasks execute truly simultaneously
Examples: Multi-core CPU computing, GPU parallel computing, distributed processing
What Conditions Are Needed?
Concurrency: A single-core CPU is sufficient
Parallelism: Requires multi-core CPU or multiple machines
概念英文含義比喻需要條件
并發Concurrency多个任務交替執行,宏观上同時推進一个人輪流做多个菜單核 CPU 即可
并行Parallelism多个任務真正同時執行多个人同時做不同的菜多核 CPU 或多機

图示說明

單核 CPU - 并發(Concurrent)
時間 →  1    2    3    4    5    6    7    8
任務 A: [執行][執行]      [執行][執行]
任務 B:      [執行][執行]      [執行][執行]

兩个任務交替執行,宏观上"同時"推進

========================================

多核 CPU - 并行(Parallel)
時間 →  1    2    3    4    5    6    7    8
核心 1: [任務A][任務A][任務A][任務A]
核心 2: [任務B][任務B][任務B][任務B]

兩个任務真正"同時"執行

========================================

現實中往往是:并發 + 并行
時間 →  1    2    3    4    5    6    7    8
核心 1: [A1][A1][B1][B1][C1][C1][D1][D1]
核心 2: [A2][A2][B2][B2][C2][C2][D2][D2]

多个任務先并發調度到不同核心,再在核心上并行執行

4. 實戰:Go 協程與绿色线程

4.1 Go 的并發哲學

Go Goroutine & GMP Scheduling Demo

Global Queue (G)3
G1
G2
G3
P (Processors) - 4 Total
P0Running
Local Queue
G4
G5
G6
Bound to M0
P1Idle
Local Queue
G7
G8
G9
P2Idle
Local Queue
G10
G11
G12
P3Idle
Local Queue
-
M (Machine Threads) - 4 Total
M0Running
M1Sleeping
M2Sleeping
M3Sleeping

Go 語言的并發設計哲學:不要通過共享內存來通信,而要通過通信來共享內存

go
package main

import (
    "fmt"
    "time"
)

// 生產者
func producer(ch chan<- int, id int) {
    for i := 0; i < 5; i++ {
        fmt.Printf("Producer %d sending: %d\n", id, i)
        ch <- i  // 發送數據到 channel
        time.Sleep(100 * time.Millisecond)
    }
}

// 消費者
func consumer(ch <-chan int, id int) {
    for val := range ch {  // 從 channel 接收數據
        fmt.Printf("Consumer %d received: %d\n", id, val)
    }
}

func main() {
    // 創建带緩衝的 channel
    ch := make(chan int, 10)

    // 启動 2 个生產者 goroutine
    for i := 0; i < 2; i++ {
        go producer(ch, i)
    }

    // 启動 2 个消費者 goroutine
    for i := 0; i < 2; i++ {
        go consumer(ch, i)
    }

    // 等待一段時間
    time.Sleep(3 * time.Second)
    close(ch)
}

4.2 Goroutine 調度器:GMP 模型

Go 的調度器采用了 GMP 模型:

組件含義作用
G (Goroutine)協程待執行的任務,輕量级(2KB 栈,可動態伸缩)
M (Machine)系统线程實际執行 G 的載體,與內核线程 1:1 對應
P (Processor)邏輯處理器調度上下文,包含可運行的 G 队列,數量默認等于 CPU 核心數

調度流程

全局队列
+----------------+
|  G1  |  G2  |  G3  |
+----------------+

P0 的本地队列       P1 的本地队列       P2 的本地队列       P3 的本地队列
+----------+       +----------+       +----------+       +----------+
| G4 | G5  |       | G6 | G7  |       | G8 | G9  |       | G10| G11 |
+----------+       +----------+       +----------+       +----------+
    |                     |                     |                     |
    v                     v                     v                     v
+----------+       +----------+       +----------+       +----------+
|    M0    |       |    M1    |       |    M2    |       |    M3    |
| (OS线程) |       | (OS线程) |       | (OS线程) |       | (OS线程) |
+----------+       +----------+       +----------+       +----------+

調度策略:
1. 每个 P 維護一个本地 G 队列,减少鎖竞爭
2. P 從本地队列取 G 交给 M 執行
3. 本地队列空時,從其他 P"偷"一半的 G(Work Stealing)
4. 全局队列作為兜底,每隔一段時間檢查一次

5. 實戰代碼模板

5.1 Python asyncio 高并發模板

python
import asyncio
import aiohttp
from typing import List, Dict
import time

class AsyncHTTPClient:
    """基于 asyncio 的高性能 HTTP 客户端"""

    def __init__(self, max_connections: int = 100, timeout: int = 30):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # 限制并發連接數,防止把對方服務打挂
        connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=10,  # 對單个域名的連接限制
            enable_cleanup_closed=True,
            force_close=True,
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
        )

    async def fetch(self, url: str, method: str = 'GET', **kwargs) -> Dict:
        """發送單个請求"""
        try:
            async with self.session.request(method, url, **kwargs) as response:
                return {
                    'url': url,
                    'status': response.status,
                    'data': await response.text(),
                    'error': None
                }
        except asyncio.TimeoutError:
            return {'url': url, 'status': None, 'data': None, 'error': 'Timeout'}
        except Exception as e:
            return {'url': url, 'status': None, 'data': None, 'error': str(e)}

    async def fetch_many(self, urls: List[str], concurrency: int = 10) -> List[Dict]:
        """并發獲取多个 URL,限制并發數"""
        semaphore = asyncio.Semaphore(concurrency)

        async def fetch_with_limit(url):
            async with semaphore:
                return await self.fetch(url)

        # 并發執行所有請求
        tasks = [fetch_with_limit(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def close(self):
        await self.session.close()


# 使用示例
async def main():
    client = AsyncHTTPClient(max_connections=50)

    # 要抓取的 URL 列表
    urls = [
        "https://api.github.com/users/github",
        "https://api.github.com/users/google",
        "https://api.github.com/users/microsoft",
        # ... 更多 URL
    ] * 10  # 模擬 300 个請求

    start = time.time()
    results = await client.fetch_many(urls, concurrency=20)
    elapsed = time.time() - start

    # 统計結果
    success = sum(1 for r in results if r.get('status') == 200)
    failed = len(results) - success

    print(f"總請求數: {len(results)}")
    print(f"成功: {success}, 失敗: {failed}")
    print(f"耗時: {elapsed:.2f}s")
    print(f"QPS: {len(results)/elapsed:.1f}")

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())

5.2 Go 高并發服務模板

go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"runtime"
	"time"

	"golang.org/x/sync/errgroup"
)

// Request/Response 結構
type OrderRequest struct {
	UserID    int64   `json:"user_id"`
	ProductID int64   `json:"product_id"`
	Quantity  int     `json:"quantity"`
	Price     float64 `json:"price"`
}

type OrderResponse struct {
	OrderID   int64   `json:"order_id"`
	Status    string  `json:"status"`
	Total     float64 `json:"total"`
	CreatedAt string  `json:"created_at"`
}

// 模擬數據庫操作
type Database struct {
	orders map[int64]*OrderResponse
	mutex  chan struct{}
}

func NewDatabase() *Database {
	db := &Database{
		orders: make(map[int64]*OrderResponse),
		mutex:  make(chan struct{}, 1), // 模擬互斥鎖
	}
	return db
}

func (db *Database) CreateOrder(ctx context.Context, req *OrderRequest) (*OrderResponse, error) {
	// 獲取鎖
	select {
	case db.mutex <- struct{}{}:
		defer func() { <-db.mutex }()
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// 模擬數據庫操作延遲
	select {
	case <-time.After(50 * time.Millisecond):
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	order := &OrderResponse{
		OrderID:   time.Now().UnixNano(),
		Status:    "created",
		Total:     req.Price * float64(req.Quantity),
		CreatedAt: time.Now().Format(time.RFC3339),
	}
	db.orders[order.OrderID] = order
	return order, nil
}

// HTTP 處理器
type Handler struct {
	db *Database
}

func NewHandler(db *Database) *Handler {
	return &Handler{db: db}
}

func (h *Handler) CreateOrder(w http.ResponseWriter, r *http.Request) {
	// 設置請求超時
	ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
	defer cancel()

	var req OrderRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	order, err := h.db.CreateOrder(ctx, &req)
	if err != nil {
		if err == context.DeadlineExceeded {
			http.Error(w, "Request timeout", http.StatusGatewayTimeout)
			return
		}
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(order)
}

func (h *Handler) Health(w http.ResponseWriter, r *http.Request) {
	info := map[string]interface{}{
		"status":    "ok",
		"goroutine": runtime.NumGoroutine(),
		"cpu":       runtime.NumCPU(),
		"version":   runtime.Version(),
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(info)
}

// 批量處理示例
func BatchProcess(ctx context.Context, items []int) ([]int, error) {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(10) // 限制并發數為 10

	results := make([]int, len(items))

	for i, item := range items {
		i, item := i, item // 避免閉包陷阱
		g.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				// 模擬處理
				time.Sleep(100 * time.Millisecond)
				results[i] = item * 2
				return nil
			}
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

func main() {
	// 初始化數據庫
	db := NewDatabase()

	// 創建處理器
	handler := NewHandler(db)

	// 設置路由
	mux := http.NewServeMux()
	mux.HandleFunc("/order", handler.CreateOrder)
	mux.HandleFunc("/health", handler.Health)

	// 創建服務器
	server := &http.Server{
		Addr:         ":8080",
		Handler:      mux,
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 10 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	fmt.Println("Server starting on :8080")
	fmt.Printf("Go version: %s\n", runtime.Version())
	fmt.Printf("CPU cores: %d\n", runtime.NumCPU())

	if err := server.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}

6. 總結對照表

6.1 核心概念對比

特性進程线程協程
調度者操作系统操作系统用户程序/運行時
切换開銷~1-10ms~1-10μs~100ns
內存占用~10MB+~1MB~2KB
通信方式IPC共享內存共享內存/Channel
同步需求不需要需要鎖需要鎖/協作式
崩溃影響僅本進程整个進程可控制
適用場景強隔離、多租户CPU 密集型I/O 密集型
典型語言所有語言所有語言Go、Python、JS、Rust

6.2 并發模型選型指南

場景推荐模型理由
Web 服務網關協程 + 异步 I/O高并發連接,低內存占用
實時通信服務協程 + 長連接維持大量 WebSocket 連接
數據處理管道多進程 + 協程利用多核,I/O 不阻塞
科學計算多线程/多進程CPU 密集型,需要并行計算
微服務架構多進程 + 協程服務間隔離,內部高并發
嵌入式系统協程/單线程资源受限,确定性調度

6.3 名词對照表

英文術語中文對照解釋
Process進程操作系统资源分配的基本單位,擁有独立的內存空間
Thread线程CPU 調度的基本單位,共享進程內存空間
Coroutine協程用户態輕量级线程,由程序自主調度
Concurrency并發多个任務交替執行,宏观上同時推進
Parallelism并行多个任務真正同時執行,需要多核支持
Context Switch上下文切换CPU 從一个任務切换到另一个任務的過程
Blocking I/O阻塞 I/O發起 I/O 請求後等待完成,期間线程挂起
Non-blocking I/O非阻塞 I/O發起 I/O 請求後立即返回,不等待結果
Async I/O异步 I/OI/O 完成時通過回調或通知機制告知調用者
Event Loop事件循環協程調度機制,持續監听事件并分發處理
GoroutineGo 協程Go 語言的輕量级线程實現
Channel通道Go 語言中協程間通信的機制
Mutex互斥鎖用于保護共享资源的同步原語
Semaphore信号量控制同時访問某资源的线程數量
Deadlock死鎖多个线程互相等待對方釋放资源,導致永久阻塞
Race Condition竞態條件多个线程同時访問共享數據,導致結果不确定
Thread Pool线程池预先創建一組线程,複用以减少創建銷毁開銷
Work Stealing工作窃取空閒线程從忙碌线程的队列中"偷"任務執行
Zero-copy零拷贝數據在內核態和用户態之間傳輸時不經過 CPU 拷贝
C10K ProblemC10K 問题單機同時處理 1 万个連接的挑戰
C10M ProblemC10M 問题單機同時處理 1000 万个連接的终极挑戰

7. 写在最後

7.1 并發編程的黃金法则

  1. 不要過早優化:先讓代碼正确運行,再考虑性能優化
  2. 避免共享狀態:"不要通過共享內存來通信,而要通過通信來共享內存"
  3. 讓錯误尽早暴露:并發 Bug 往往難以複現,要在測試階段尽可能暴露
  4. 限制并發數:无限并發等于没有保護,要用信号量或連接池限制
  5. 監控和可观測:并發系统必须有完善的監控,才能快速定位問题

7.2 學習路线图

階段 1: 基础理解
    ├── 理解進程/线程的基本概念
    ├── 學習同步原語(鎖、信号量、條件變量)
    └── 編写简單的多线程程序

階段 2: 深入原理
    ├── 理解內存模型和可见性
    ├── 學習无鎖編程和原子操作
    ├── 理解线程池和工作窃取
    └── 分析死鎖和竞態條件

階段 3: 高级應用
    ├── 掌握協程和异步編程
    ├── 學習 Go/Python/Rust 的并發模型
    ├── 理解分布式系统中的并發
    └── 性能調優和容量規划

階段 4: 專家水平
    ├── 設計高并發系统架構
    ├── 解决複雜的并發 Bug
    ├── 開發并發編程框架
    └── 分享和傳播并發知識

希望這篇指南能帮助你建立起對并發編程的系统認知。記住,并發不是目的,而是手段——真正的目標是構建高性能、高可用的服務。理解原理、選對模型、写好代碼,你就能在并發這條路上越走越遠。