线程池学习
线程池概念
使用线程池的优势
线程池是一种基于池化管理线程的工具,经常出现在多线程服务器中。既避免处理任务时创建销毁线程的开销,也避免了线程数据膨胀导致过分调度的问题。使用线程池的好处有:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可执行。
- 提高线程的可管理性:线程是稀缺资源,无限制的创建既会销毁系统资源,也会导致调度失衡,减低系统稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:基于线程池提供的钩子函数,可以扩展线程池的行为。
线程池的设计与实现
继承关系及类作用介绍
Executor
Executor
接口提供了一种思想:将任务提交和任务执行分离。用户无需关注线程的创建、调度和任务执行过程,只需要将表示任务的 Runnable
对象提交到执行器 Executor
中,由 Executor
完成线程的调配和任务的执行部分。
ExecutorService
ExecutorService
接口增加了一些能力:
- 扩展执行任务的能力:补充可以为一个或一批异步任务生成
Future
的方法。 - 提供了管理线程池的方法:停止线程池、查看任务数量等。
AbstractExecutorService
AbstractExecutorService
是上层抽象类,将任务执行的流程串联起来,保证下层的实现只需要关注一个执行任务的方法即可。
ThreadPoolExecutor
ThreadPoolExecutor
一方面维护自身的生命周期,另一方面管理线程和任务。
线程池在内部构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分为两部分:任务管理和线程管理。
任务管理充当生产者角色,当任务提交后,线程会判断该任务的后续流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。
线程管理部分是消费者,它们被维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则继续获取新的任务去执行,最终当线程获取不到任务时就会被回收。
线程池生命周期管理
线程池内部使用一个变量来维护两个值:运行状态(runState)和线程数量(workCount):
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
高 3 位保存 runState,低 29 位保存 workCount,用一个变量存储两个值,可以避免使用锁维护两者的一致。
线程池的运行状态有 5 种,分别是:
状态 | 含义 |
---|---|
RUNNING | 能接收新任务,也能处理排队任务 |
SHUTWODN | 关闭状态,不接受新任务,但处理排队任务 |
STOP | 不接受新任务,不处理排队任务,并中断正在进行的任务 |
TIDYING | 所有任务都已经终止,workerCount 为零,并调用terminated 钩子方法 |
TERMINATED | terminated() 方法执行完成后进入该状态 |
其生命周期转换关系如下:
任务执行机制
所有任务的调度都是由 execute
方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行流程如下:
- 首先检测线程池运行状态,如果不是
RUNNING
,则直接拒绝,线程池要保证在RUNNING
状态下执行任务; - 如果
workerCount < corePoolSize
,则创建并启动一个线程来执行新提交的任务; - 如果
workerCount >= corePoolSize
,且线程池内的阻塞队列未满,则将该任务添加到阻塞队列中; - 如果
workCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满,则创建一个新线程来执行新提交的任务; - 如果
workerCount >= maximumPoolSize
,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是直抛出异常。
其执行流程如图所示:
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数达到 maximumPoolSize
时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
拒绝策略是一个接口,设计如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
用户可以通过实现这个接口定制拒绝策略,也可以选择 JDK 提供的四种拒绝策略:
AbortPolicy
:抛出RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
DiscardPolicy
:什么也不做,直接忽略public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
DiscardOldestPolicy
:丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
CallerRunsPolicy
:直接由提交任务者执行这个任务public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
线程池的默认拒绝策略是:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
Worker 线程管理
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内部工作线程 Worker,部分代码如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
// worker 持有的线程
final Thread thread;
// 初始化的任务
Runnable firstTask;
public void run() {
runWorker(this);
}
}
Worker 线程实现了Runnable
接口,并持有一个线程 thread
,一个初始化任务 firstTask
。thread
是在调用构造方式时通过 ThreadFactory
来创建的线程,可以用来执行任务;firstTask
用来保存传入的第一个任务。
线程池需要管理线程的生命周期,需要在线程长时间不允许的时候进行回收。线程池使用 HashSet
持有线程的引用,这样可以通过添加引用、移除引用的操作来控制线程的生命周期。
Worker 通过继承 AQS,使用 AQS 来实现独占锁这个功能。
lock
方法一旦获取了独占锁,表示当前线程正在执行任务中。- 如果正在执行任务,则不应该中断线程,如果该线程现在不是独占锁状态,也就是空闲状态,说明它没有在处理任务,这时可以对该线程进行中断。
- 线程池在执行
shutdown
方法或tryTerminate
方法时会调用interruptIdleWorkers
方法来中断空闲线程,interruptIdleWorkers
方法会使用tryLock
方法来判断线程池中的线程是否是空闲状态,如果线程是空闲状态则可以安全回收。
增加线程是通过 addWorker
方法完成的。addWorker
方法有两个参数:firstTask
、core
。firstTask
参数用于指定新增的线程执行的第一个任务,该参数可以为空;core
参数为 true
时表示新增线程时会判断当前活动的线程数是否少于 corePoolSize
,false
表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize
,其增加流程如下:
Worker 类中的 run()
方法调用了 runWorker()
方法来执行任务,其执行过程如下:
- 循环通过
getTask()
方法获取任务; - 如果线程池正在停止,那么保证当前线程是中断状态,否则保证当前线程不是中断状态;
- 执行任务。
- 获取不到任务时,执行
processWorkerExit()
方法主动销毁线程。
线程池中线程的销毁依赖 JVM 自动的回收,线程池做的工作就是根据当前线程池状维护一定数量的线程引用,防止这部分线程被 JVM 回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker 被创建出来后,就会不断循环获取任务执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当 Worker 无法获取到任务时,循环会结束,Worker 会主动消除自身在线程池内的引用。
final void runWorker(Worker w) {
try {
while (task != null || (task = getTask()) != null) {
// 执行任务
}
} finally {
// 获取不到任务时,主动回收自己
processWorkerExit(w, completedAbruptly);
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 将线程引用移除线程池
workers.remove(w);
} finally {
mainLock.unlock();
}
}
创建和停止线程池
线程池的构造器的参数
参数名 | 类型 | 含义 |
---|---|---|
corePoolSize | int | 核心线程数 |
maximumPoolSize | int | 最大线程数 |
keepAliveTime | long | 保持存活时间 |
workQueue | BlockingQueue | 任务存储队列 |
threadFactory | ThreadFactory | 使用 threadFactory 创建新的线程 |
handler | RejectedExecutionHandler | 拒绝策略 |
corePoolSize
:指的是核心线程数。maxPoolSize
:指的是最大线程数。- 如果线程数小于
corePoolSize
,创建一个新的线程来运行新任务; - 如果线程数等于或大于
corePoolSize
但小于maximumPoolSize
,则将任务添加到任务队列中; - 如果队列已满,且线程数小于
maximumPoolSize
,则创建新线程; - 如果队列已满,且线程数达到
maximumPoolSize
,则调用handler
执行拒绝策略。
- 如果线程数小于
keepAliveTime
:如果线程池当前的线程数多于corePoolSize
,那么多余的线程闲置超过指定时间会被终止。threadFactory
:创建线程的工厂。默认线程池创建的线程的属性为:
- 属于同一个
ThreadGroup
- 线程池和线程的名称按序号递增
- 不是 daemon 线程
- 线程的优先级为
NORM_PRIORITY
private final String namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
- 属于同一个
workQueue
:任务队列- 直接交接:
SynchronousQueue
- 无界队列:
LinkedBlockingQueue
- 有界队列:
ArrayBlockingQueue
- 直接交接:
Executors创建线程池
方法名 | 功能 | 缺点 |
---|---|---|
newFixedThreadPool | 创建固定大小的线程池 | 容易造成大量内存占用,导致 OOM |
newSingleThreadExecutor | 创建只有一个线程的线程池 | 当请求堆积的时候,占用大量内存 |
newCachedThreadPool | 创建一个不设线程上限的线程池,任何任务都将立即执行 | 创建数量非常多的线程,导致 OOM |
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory){ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
线程池的线程数量如何设定
- CPU 密集型(加密、计算 hash):最佳线程数为CPU 核心数的 1-2 倍左右;
- 耗时 IO 型(读写数据库、文件、网络):最大线程数一般会大于 CPU 核心数很多倍。
- 线程数 = CPU 核心数 * (1 + 平均等待时间 / 平均工作时间)
停止线程池的方法
shutdown
- 将线程池的状态置为
SHUTDOWN
- 调用此方法后,不允许继续提交任务,即调用指定的拒绝策略拒绝任务
- 所有在调用此方法前提交的任务都会被执行
- 所有任务被执行完毕,
ExecutorService
才会真正关闭
- 将线程池的状态置为
shutdownNow
- 将线程池的状态置为
STOP
- 使用中断操作尝试停止运行中的任务
- 返回未尚未执行的任务
- 将线程池的状态置为
isShutdown
:线程池是否关闭isTerminated
:判断线程池关闭后所有的任务是否都执行完了awaitTermination
:阻塞,直到出现以下情况shutdown
调用后所有任务执行完成- 超时返回
- 当前线程中断
线程池钩子beforeExecute
实现一个可以暂停的线程池
public class PauseableThreadPool extends ThreadPoolExecutor {
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
private boolean isPaused;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
@Override
protected void beforeExecute(Thread t, Runnable r) {
lock.lock();
try {
while (isPaused) {
condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
private void resume() {
lock.lock();
try {
isPaused = false;
condition.signalAll();
} finally {
lock.unlock();
}
}
}