跳转到内容

Python 进程池

来自代码酷


Python进程池Python并发编程中用于管理和复用多个进程的高效工具,通过multiprocessing模块的Pool类实现。它允许开发者将任务分配给一组预创建的进程,避免频繁创建/销毁进程的开销,特别适合处理CPU密集型任务

核心概念[编辑 | 编辑源代码]

进程池的核心机制是“任务队列+工作进程”模型:

  • 主进程将任务放入队列
  • 工作进程从队列获取任务并执行
  • 结果通过队列或共享内存返回

graph LR A[主进程] -->|提交任务| B[任务队列] B --> C[进程1] B --> D[进程2] B --> E[...] C --> F[结果队列] D --> F E --> F F --> A

基本用法[编辑 | 编辑源代码]

创建进程池[编辑 | 编辑源代码]

使用multiprocessing.Pool初始化:

from multiprocessing import Pool

# 创建包含4个工作进程的池
pool = Pool(processes=4)

任务提交方法[编辑 | 编辑源代码]

常用方法对比
方法 描述 返回类型
apply() 同步阻塞式提交 直接返回结果
apply_async() 异步非阻塞提交 AsyncResult对象
map() 批量同步处理 结果列表
map_async() 批量异步处理 AsyncResult对象

示例:计算平方[编辑 | 编辑源代码]

同步方式:

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as p:
        results = p.map(square, range(10))
    print(results)  # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

异步方式:

if __name__ == '__main__':
    with Pool(4) as p:
        async_result = p.map_async(square, range(10))
        print("继续执行其他操作...")
        results = async_result.get()  # 阻塞获取结果
    print(results)

高级特性[编辑 | 编辑源代码]

回调函数[编辑 | 编辑源代码]

可为异步任务添加完成回调:

def callback(result):
    print(f"任务完成,结果为: {result}")

if __name__ == '__main__':
    with Pool(2) as p:
        p.apply_async(square, (5,), callback=callback)
        p.close()
        p.join()

错误处理[编辑 | 编辑源代码]

通过error_callback捕获异常:

def error_handler(e):
    print(f"发生错误: {str(e)}")

def risky_operation(x):
    if x == 3:
        raise ValueError("特殊值错误")
    return x * x

if __name__ == '__main__':
    with Pool(2) as p:
        p.apply_async(risky_operation, (3,), 
                     error_callback=error_handler)
        p.close()
        p.join()

性能优化[编辑 | 编辑源代码]

进程数选择[编辑 | 编辑源代码]

最优进程数通常为CPU核心数,可通过以下公式估算: Noptimal=Ncores×(1+WC) 其中:

  • W:等待时间
  • C:计算时间

数据分块[编辑 | 编辑源代码]

处理大数据集时合理设置chunksize

# 将10000个任务分成每块100个
pool.map(func, range(10000), chunksize=100)

实际应用案例[编辑 | 编辑源代码]

图像批量处理[编辑 | 编辑源代码]

from PIL import Image
import os

def process_image(filename):
    with Image.open(filename) as img:
        img = img.filter(ImageFilter.GaussianBlur(2))
        img.save(f"processed_{filename}")

if __name__ == '__main__':
    image_files = ['a.jpg', 'b.jpg', 'c.jpg']
    with Pool() as p:
        p.map(process_image, image_files)

科学计算加速[编辑 | 编辑源代码]

并行计算π的蒙特卡洛近似:

import random

def monte_carlo(n):
    inside = 0
    for _ in range(n):
        x, y = random.random(), random.random()
        if x**2 + y**2 <= 1:
            inside += 1
    return 4 * inside / n

if __name__ == '__main__':
    with Pool(4) as p:
        results = p.map(monte_carlo, [10**6]*4)
    pi_approx = sum(results)/len(results)
    print(f"π ≈ {pi_approx}")

常见问题[编辑 | 编辑源代码]

进程间通信[编辑 | 编辑源代码]

  • 使用QueuePipe或共享内存
  • 避免传递不可pickle的对象

Windows系统限制[编辑 | 编辑源代码]

在Windows下必须保护入口代码:

if __name__ == '__main__':
    # 进程池代码

最佳实践[编辑 | 编辑源代码]

  1. 始终使用with语句自动管理资源
  2. 避免在子进程中修改全局状态
  3. 对于IO密集型任务考虑线程池替代
  4. 监控进程状态:pool._processespool._taskqueue

参见[编辑 | 编辑源代码]