Chapter 06

并行编程

用 parallelize、vectorize、tile 三件套,全面榨干 CPU 的多核与缓存潜力

Mojo 并行化的三层优化

为什么需要三层?

现代 CPU 的性能来自三个维度的并行:多核并行(多个物理核心同时工作)、SIMD 并行(一个核心同时处理多个数据元素)、缓存层次(L1/L2/L3,访问越快、容量越小)。单纯提升某一层的利用率不够,需要三层协同,才能接近硬件理论峰值。

CPU 性能三层模型: 第一层:多核 (parallelize) ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ Core 0 │ │ Core 1 │ │ Core 2 │ │ Core 3 │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ ↑ ↑ 第二层:SIMD (vectorize) 每个核心 → 向量执行单元 → 8x/16x 数据吞吐 ↑ 第三层:缓存 (tile) L1 Cache: 32KB, 4 cycles L2 Cache: 256KB, 12 cycles L3 Cache: 6MB+, 30-40 cycles RAM: GBs, 100+ cycles

parallelize():自动并行化循环

基本用法

from algorithm import parallelize
from sys.info import num_physical_cores

fn parallel_square(data: List[Float64], inout result: List[Float64]):
    let n = len(data)

    @parameter
    fn compute_chunk(i: Int):
        # 每个工作项处理一个元素
        result[i] = data[i] * data[i]

    # 自动分配到所有可用 CPU 核心
    parallelize[compute_chunk](n, num_physical_cores())

fn main():
    print("CPU 核心数:", num_physical_cores())

    var data = List[Float64]()
    for i in range(1_000_000):
        data.append(Float64(i))

    var result = List[Float64](len(data))
    parallel_square(data, result)
    print("完成并行平方计算,首个元素:", result[10])  # 100.0

parallelize 的参数说明

func[sw](i)
工作函数,接受一个整数索引 i,表示要处理的工作项编号。Mojo 自动将 0...n-1 的工作项分配给各核心。
num_work_items
总工作项数量(通常是数组长度)。
num_workers
并行工作线程数,通常设为 num_physical_cores()。超过物理核心数不会带来额外收益,反而增加调度开销。

vectorize():向量化循环

vectorize 与 parallelize 的协同

vectorize 在单核内利用 SIMD 并行(第5章的内容)。将 vectorize 嵌套在 parallelize 内部,可以同时利用多核和 SIMD,是 Mojo 最高效的计算模式。

from algorithm import parallelize, vectorize
from sys.info import simdwidthof, num_physical_cores
from math import sqrt

alias SIMD_W = simdwidthof[DType.float32]()

fn vector_sqrt(
    data: UnsafePointer[Float32],
    inout result: UnsafePointer[Float32],
    n: Int
):
    # 外层:多核并行(按行划分)
    @parameter
    fn parallel_chunk(chunk_idx: Int):
        let chunk_size = n // num_physical_cores()
        let start = chunk_idx * chunk_size
        let end = min(start + chunk_size, n)

        # 内层:SIMD 向量化(每次处理 SIMD_W 个元素)
        @parameter
        fn simd_sqrt[sw: Int](i: Int):
            var v = SIMD[DType.float32, sw].load(data + start + i)
            sqrt(v).store(result + start + i)

        vectorize[simd_sqrt, SIMD_W](end - start)

    parallelize[parallel_chunk](num_physical_cores())

tile():内存访问优化

缓存友好性的重要性

矩阵乘法的朴素实现按行访问 A、按列访问 B。访问 B 的一列需要跨越整个矩阵内存,极易造成缓存未命中(cache miss),大量时间浪费在等待内存上。tile() 将矩阵分成小块(tile),确保每个 tile 都能完整放入 L1/L2 缓存,大幅减少内存延迟。

Tiling 优化示意: 未 tiling(缓存不友好): A[i,k] 顺序访问(好) B[k,j] 列访问(坏!跨行跳跃,缓存不命中) Tiling(缓存友好): 将 C、A、B 各切成 T×T 的小块 每个块 = T*T * 4 bytes ≈ 32×32×4 = 4KB (适合 L1) for i_block in range(0, M, T): for j_block in range(0, N, T): for k_block in range(0, K, T): # 处理 T×T 的子块,全程在 L1 缓存中
from algorithm import tile, vectorize

alias TILE_SIZE = 32   # 32*32*4 = 4KB,适合大多数 CPU 的 L1 缓存
alias SIMD_W = simdwidthof[DType.float32]()

fn matmul_tiled(
    C: UnsafePointer[Float32],
    A: UnsafePointer[Float32],
    B: UnsafePointer[Float32],
    M: Int, N: Int, K: Int
):
    @parameter
    fn calc_tile[tile_j: Int, tile_k: Int](jo: Int, ko: Int):
        for i in range(M):
            for k in range(ko, ko + tile_k):
                @parameter
                fn dot[sw: Int](j: Int):
                    var a = SIMD[DType.float32, sw](A[i * K + k])
                    var b = SIMD[DType.float32, sw].load(B + k * N + jo + j)
                    var c = SIMD[DType.float32, sw].load(C + i * N + jo + j)
                    (c + a * b).store(C + i * N + jo + j)
                vectorize[dot, SIMD_W](tile_j)

    tile[calc_tile, TILE_SIZE, TILE_SIZE](N, K)

实战:并行图像处理(像素滤波加速)

from algorithm import parallelize, vectorize
from sys.info import simdwidthof, num_physical_cores
from math import min, max

alias SIMD_W = simdwidthof[DType.uint8]()  # 图像用 UInt8

fn brightness_adjust(
    src: UnsafePointer[UInt8],
    inout dst: UnsafePointer[UInt8],
    width: Int,
    height: Int,
    delta: Int8    # 亮度调整量 (-128 ~ 127)
):
    """并行 + SIMD 调整 RGB 图像亮度。"""
    let total_pixels = width * height * 3  # RGB 每像素 3 字节

    @parameter
    fn process_row(row: Int):
        let row_start = row * width * 3
        var d = SIMD[DType.int16, SIMD_W](Int16(delta))

        @parameter
        fn adjust_pixels[sw: Int](i: Int):
            # 加载 sw 个像素字节
            var pixels = SIMD[DType.uint8, sw].load(src + row_start + i)
            # 转为 Int16 防止溢出,加上 delta,截断到 0-255
            var p16 = pixels.cast[DType.int16]() + d.cast[DType.int16]()
            var clamped = p16.min(255).max(0).cast[DType.uint8]()
            clamped.store(dst + row_start + i)

        vectorize[adjust_pixels, SIMD_W](width * 3)

    # 每行分配一个工作项,多核并行处理所有行
    parallelize[process_row](height, num_physical_cores())

# 性能参考(1920x1080 图像,8核 CPU):
# 纯 Python 循环:~1200ms
# PIL/Pillow:~15ms(底层 C 实现)
# Mojo(vectorize only):~3ms
# Mojo(parallelize + vectorize):~0.5ms(约 2400 倍 vs 纯 Python)

多核利用率分析

Amdahl 定律:并行化的边界

并不是所有代码都能从并行化中受益相同程度。Amdahl 定律指出:程序中只有可并行化的部分才能从多核加速,串行部分(如数据读取、结果汇总)决定了并行加速的上限。

何时不应该用 parallelize

问题太小:工作项数量不足以摊销线程创建/调度开销(通常 < 10,000 元素)。
有数据依赖:循环迭代间有依赖(如累加),直接并行会导致结果错误,需要使用 atomic 操作或分治合并。
内存带宽瓶颈:计算量少但数据量极大,多核反而会争夺内存带宽,不如单核 + 向量化。

本章小结

三层优化parallelize(多核)、vectorize(SIMD)、tile(缓存),三者组合可以逼近 CPU 理论峰值性能。
parallelize:将循环工作项分配给多个 CPU 核心,线性扩展到核心数。
vectorize:将循环用 SIMD 向量化,每个核心可获得 8-16x 吞吐量提升。
tile:分块访问,保证工作数据集能放入 L1/L2 缓存,减少内存延迟。
实战:parallelize + vectorize 组合处理 1080p 图像亮度调整,比纯 Python 快 ~2400 倍。