Python 进程池
外观
Python进程池是Python并发编程中用于管理和复用多个进程的高效工具,通过multiprocessing模块的Pool
类实现。它允许开发者将任务分配给一组预创建的进程,避免频繁创建/销毁进程的开销,特别适合处理CPU密集型任务。
核心概念[编辑 | 编辑源代码]
进程池的核心机制是“任务队列+工作进程”模型:
- 主进程将任务放入队列
- 工作进程从队列获取任务并执行
- 结果通过队列或共享内存返回
基本用法[编辑 | 编辑源代码]
创建进程池[编辑 | 编辑源代码]
使用multiprocessing.Pool
初始化:
from multiprocessing import Pool
# 创建包含4个工作进程的池
pool = Pool(processes=4)
任务提交方法[编辑 | 编辑源代码]
方法 | 描述 | 返回类型 |
---|---|---|
apply() |
同步阻塞式提交 | 直接返回结果 |
apply_async() |
异步非阻塞提交 | AsyncResult 对象
|
map() |
批量同步处理 | 结果列表 |
map_async() |
批量异步处理 | AsyncResult 对象
|
示例:计算平方[编辑 | 编辑源代码]
同步方式:
def square(x):
return x * x
if __name__ == '__main__':
with Pool(4) as p:
results = p.map(square, range(10))
print(results) # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
异步方式:
if __name__ == '__main__':
with Pool(4) as p:
async_result = p.map_async(square, range(10))
print("继续执行其他操作...")
results = async_result.get() # 阻塞获取结果
print(results)
高级特性[编辑 | 编辑源代码]
回调函数[编辑 | 编辑源代码]
可为异步任务添加完成回调:
def callback(result):
print(f"任务完成,结果为: {result}")
if __name__ == '__main__':
with Pool(2) as p:
p.apply_async(square, (5,), callback=callback)
p.close()
p.join()
错误处理[编辑 | 编辑源代码]
通过error_callback
捕获异常:
def error_handler(e):
print(f"发生错误: {str(e)}")
def risky_operation(x):
if x == 3:
raise ValueError("特殊值错误")
return x * x
if __name__ == '__main__':
with Pool(2) as p:
p.apply_async(risky_operation, (3,),
error_callback=error_handler)
p.close()
p.join()
性能优化[编辑 | 编辑源代码]
进程数选择[编辑 | 编辑源代码]
最优进程数通常为CPU核心数,可通过以下公式估算: 其中:
- :等待时间
- :计算时间
数据分块[编辑 | 编辑源代码]
处理大数据集时合理设置chunksize
:
# 将10000个任务分成每块100个
pool.map(func, range(10000), chunksize=100)
实际应用案例[编辑 | 编辑源代码]
图像批量处理[编辑 | 编辑源代码]
from PIL import Image
import os
def process_image(filename):
with Image.open(filename) as img:
img = img.filter(ImageFilter.GaussianBlur(2))
img.save(f"processed_{filename}")
if __name__ == '__main__':
image_files = ['a.jpg', 'b.jpg', 'c.jpg']
with Pool() as p:
p.map(process_image, image_files)
科学计算加速[编辑 | 编辑源代码]
并行计算的蒙特卡洛近似:
import random
def monte_carlo(n):
inside = 0
for _ in range(n):
x, y = random.random(), random.random()
if x**2 + y**2 <= 1:
inside += 1
return 4 * inside / n
if __name__ == '__main__':
with Pool(4) as p:
results = p.map(monte_carlo, [10**6]*4)
pi_approx = sum(results)/len(results)
print(f"π ≈ {pi_approx}")
常见问题[编辑 | 编辑源代码]
进程间通信[编辑 | 编辑源代码]
- 使用
Queue
、Pipe
或共享内存 - 避免传递不可pickle的对象
Windows系统限制[编辑 | 编辑源代码]
在Windows下必须保护入口代码:
if __name__ == '__main__':
# 进程池代码
最佳实践[编辑 | 编辑源代码]
- 始终使用
with
语句自动管理资源 - 避免在子进程中修改全局状态
- 对于IO密集型任务考虑线程池替代
- 监控进程状态:
pool._processes
、pool._taskqueue