线程池原理与使用
外观
线程池原理与使用[编辑 | 编辑源代码]
线程池(Thread Pool)是Java并发编程中管理线程生命周期的核心工具,通过复用已创建的线程减少资源消耗,提高系统响应速度。本文将深入解析其工作原理、参数配置及最佳实践。
核心概念[编辑 | 编辑源代码]
线程池解决两个关键问题:
- 线程生命周期开销:避免频繁创建/销毁线程
- 资源耗尽风险:防止无限制创建线程导致OOM
Java通过java.util.concurrent.ExecutorService
接口及其实现类提供线程池支持。
线程池架构[编辑 | 编辑源代码]
核心参数解析[编辑 | 编辑源代码]
线程池通过ThreadPoolExecutor
的7个参数控制行为:
参数 | 类型 | 说明 |
---|---|---|
corePoolSize | int | 核心线程数(常驻线程) |
maximumPoolSize | int | 最大线程数(含核心线程) |
keepAliveTime | long | 非核心线程空闲存活时间(纳秒) |
unit | TimeUnit | 存活时间单位 |
workQueue | BlockingQueue<Runnable> | 任务队列 |
threadFactory | ThreadFactory | 线程创建工厂 |
handler | RejectedExecutionHandler | 拒绝策略 |
工作流程[编辑 | 编辑源代码]
数学表达线程池状态转换:
四种常用线程池[编辑 | 编辑源代码]
Java通过Executors
工厂类提供预配置线程池:
类型 | 实现方式 | 特点 | 适用场景 |
---|---|---|---|
FixedThreadPool | new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) |
固定大小队列无界 | CPU密集型任务 |
CachedThreadPool | new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()) |
自动扩容线程 | 短生命周期任务 |
SingleThreadExecutor | new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) |
单线程顺序执行 | 需要顺序执行的任务 |
ScheduledThreadPool | new ScheduledThreadPoolExecutor(corePoolSize) |
支持定时/周期任务 | 定时任务调度 |
代码示例[编辑 | 编辑源代码]
基础使用[编辑 | 编辑源代码]
// 创建固定大小线程池
ExecutorService pool = Executors.newFixedThreadPool(4);
// 提交任务
Future<String> future = pool.submit(() -> {
Thread.sleep(1000);
return "Task completed";
});
// 获取结果
System.out.println(future.get()); // 输出: Task completed
// 关闭线程池
pool.shutdown();
自定义线程池[编辑 | 编辑源代码]
ThreadPoolExecutor customPool = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60, // 空闲时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10), // 有界队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 监控示例
System.out.println("Active threads: " + customPool.getActiveCount());
System.out.println("Queue size: " + customPool.getQueue().size());
拒绝策略[编辑 | 编辑源代码]
当线程池和队列都饱和时触发拒绝策略:
策略类 | 行为 | 类比 |
---|---|---|
AbortPolicy | 抛出RejectedExecutionException | 默认策略 |
CallerRunsPolicy | 由调用者线程执行任务 | 同步降级 |
DiscardPolicy | 静默丢弃任务 | 空实现 |
DiscardOldestPolicy | 丢弃队列最老任务 | 队列淘汰 |
实际案例[编辑 | 编辑源代码]
Web服务器请求处理[编辑 | 编辑源代码]
// 模拟HTTP服务器
public class WebServer {
private static final ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
8, 32, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.AbortPolicy());
public void handleRequest(HttpRequest request) {
workerPool.execute(() -> {
// 处理请求逻辑
processRequest(request);
});
}
}
批量数据处理[编辑 | 编辑源代码]
// 并行处理百万级数据
List<Data> dataList = getHugeDataList(); // 获取数据
ExecutorService batchPool = Executors.newWorkStealingPool(); // JDK8+工作窃取池
List<CompletableFuture<Void>> futures = dataList.stream()
.map(data -> CompletableFuture.runAsync(() ->
processData(data), batchPool))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
最佳实践[编辑 | 编辑源代码]
- 队列选择:
- CPU密集型:使用有界队列(如ArrayBlockingQueue)
- IO密集型:使用无界队列(如LinkedBlockingQueue)
- 线程数设置:
- CPU密集型:
Nthreads = Ncores + 1
- IO密集型:
Nthreads = Ncores × (1 + WT/ST)
(WT:等待时间,ST:服务时间)
- CPU密集型:
- 关闭规范:
pool.shutdown(); // 温和关闭
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 强制关闭
}
常见问题[编辑 | 编辑源代码]
页面模块:Message box/ambox.css没有内容。
以下为典型错误用法 |
- 内存泄漏:未关闭线程池导致JVM无法退出
- 队列堆积:任务生产速度 > 消费速度导致OOM
- 死锁风险:线程池任务又提交新任务到同一池
性能监控[编辑 | 编辑源代码]
通过扩展ThreadPoolExecutor
实现监控:
class MonitorableThreadPool extends ThreadPoolExecutor {
protected void beforeExecute(Thread t, Runnable r) {
System.out.printf("Task start - Active: %d, Completed: %d%n",
getActiveCount(), getCompletedTaskCount());
}
}
进阶知识[编辑 | 编辑源代码]
- 工作窃取算法:
ForkJoinPool
的实现原理 - 线程池预热:提前启动核心线程
prestartAllCoreThreads()
- 动态调参:运行时修改
setCorePoolSize()