Java ThreadPool Implementation

Introduction

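In earlier articles we covered how to use the JDK's concurrency library (JUC). This article focuses on how the ThreadPool in JUC is implemented. All articles on Java concurrency are collected in the <Java Concurrency Series>.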

ThreadPool Implementation

In discussing the implementation of ThreadPoolExecutor, we focus on its execute method and on shutdown and shutdownNow. Before that, let's look at how ThreadPoolExecutor maintains its internal state.

/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
*
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

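As the comments explain, a single int holds both the run state and the current number of workers: the high 3 bits of the 32-bit int store the state, and the remaining low 29 bits store the worker count. So which states can a ThreadPoolExecutor be in?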

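  • RUNNING: accepts new tasks and processes queued tasks; this is the normal working state
  • SHUTDOWN: no longer accepts new tasks, but still processes the tasks remaining in the queue; this is the state after shutdown is called
  • STOP: accepts no new tasks, no longer processes the tasks remaining in the queue, and interrupts all in-progress tasks; this is the state after shutdownNow is called
  • TIDYING: all tasks have finished and the worker count is 0, but the terminated callback has not run yet
  • TERMINATED: the terminated callback has completed. terminated is a protected method of ThreadPoolExecutor (empty by default) that a subclass can override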
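To make the bit layout concrete, here is a small standalone sketch. The class `CtlDemo` and its `describe` method are hypothetical; the constants and the `runStateOf`/`workerCountOf`/`ctlOf` helpers are copied from the listing above, and `runStateLessThan`/`runStateAtLeast`/`isRunning` mirror the comparison helpers that sit next to them in the JDK 8 source. Because `RUNNING` is negative and the worker count never reaches 2^29, comparing a whole `ctl` value against a state constant with plain `<` or `>=` orders its run state correctly, whatever the count in the low bits.

```java
/** Hypothetical standalone copy of the ctl packing scheme used by ThreadPoolExecutor. */
final class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 low bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 0x1FFFFFFF

    // Run states live in the high 3 bits; RUNNING is negative, so numerically
    // RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack state and count

    // The count never spills into the state bits, so a whole ctl value can be compared with a state.
    static boolean runStateLessThan(int c, int s) { return c < s; }
    static boolean runStateAtLeast(int c, int s)  { return c >= s; }
    static boolean isRunning(int c)               { return c < SHUTDOWN; }

    /** Formats a ctl value as "STATE/count" for debugging. */
    static String describe(int c) {
        int rs = runStateOf(c);
        String name = rs == RUNNING ? "RUNNING"
                : rs == SHUTDOWN ? "SHUTDOWN"
                : rs == STOP ? "STOP"
                : rs == TIDYING ? "TIDYING"
                : "TERMINATED";
        return name + "/" + workerCountOf(c);
    }
}
```

For example, `ctlOf(SHUTDOWN, 3)` describes as `SHUTDOWN/3`, and `ctlOf(RUNNING, CAPACITY)` is still negative, so it still counts as running.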

With the internal data layout in mind, the logic of execute becomes much easier to follow.

/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // Read ctl and extract the worker count (low 29 bits); below corePoolSize, start a new thread
    if (workerCountOf(c) < corePoolSize) {
        // Below corePoolSize the task is not queued; it is handed to the new worker as its first task
        if (addWorker(command, true))
            // Return if the worker was created
            return;
        // Otherwise re-read the latest ctl
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        // The pool is still running and the task was queued (the queue was not full)
        int recheck = ctl.get();
        // The state was read without a lock, so check again: if the pool is no longer
        // running, remove the task from the queue and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // Otherwise, if the worker count is 0, add a worker. This can happen because the last
        // worker may have exited (see keepAliveTime) just before workQueue.offer ran
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The pool is not running, or the queue is full so offer failed: try to start a
    // non-core worker that takes the task directly as its first task
    else if (!addWorker(command, false))
        // addWorker failed: either the pool has stopped or the thread count has reached
        // maximumPoolSize; either way the task is rejected
        reject(command);
    // If addWorker succeeded, the pool is still running and the queue was full,
    // but the thread count had not yet reached maximumPoolSize
}
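  1. First, read the internal state ctl and extract the worker count (low 29 bits). If it is below corePoolSize, create a new thread; in this case the task is not queued but handed to the worker directly as its first task
    1. If the worker is created successfully, return
    2. Otherwise, re-read the latest ctl
  2. There are two ways to reach step 2: the pool may have stopped, or the thread count has already reached corePoolSize, so the cases are handled separately
    1. If the pool is still running and the task was queued (the queue was not full), there are two sub-cases
      1. Because the state was read without a lock, check it again: if the pool is no longer running, remove the task from the queue and reject it
      2. Otherwise, if the worker count is 0, add a worker. This can happen because the last worker may have exited just before workQueue.offer ran
    2. If the pool is not running, or the queue is full so the task could not be queued, the result of addWorker decides what to do
      1. If addWorker succeeds, the pool is still running and the queue is full, but the thread count had not yet reached maximumPoolSize
      2. If addWorker fails, either the pool has stopped or the thread count has reached maximumPoolSize; either way the task must be rejected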
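These rules can be observed from the outside through the public `ThreadPoolExecutor` API. The sketch below (the class `ExecuteRulesDemo` is hypothetical) assumes `corePoolSize = 1`, `maximumPoolSize = 2` and an `ArrayBlockingQueue` with capacity 1, and keeps every task blocked on a latch so that no worker can drain the queue. Under those assumptions the first task starts a core worker, the second is queued, the third starts a non-core worker because the queue is full, and the fourth is rejected by `AbortPolicy`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of the execute() rules, observed through the public API. */
final class ExecuteRulesDemo {
    /** Returns {poolSize, queuedTasks, rejectedTasks} right after four execute() calls. */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,                                  // corePoolSize = 1, maximumPoolSize = 2
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),   // bounded queue with room for one task
                new ThreadPoolExecutor.AbortPolicy()); // reject by throwing
        Runnable blocker = () -> {
            try {
                release.await();                       // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocker); // fewer than corePoolSize workers -> new core worker
            pool.execute(blocker); // core worker busy -> task is queued
            pool.execute(blocker); // queue full -> new non-core worker
            try {
                pool.execute(blocker); // queue full and maximumPoolSize reached -> rejected
            } catch (RejectedExecutionException e) {
                rejected++;
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
        } finally {
            release.countDown(); // let all blocked tasks finish
            pool.shutdown();
        }
    }
}
```

With these settings `run()` should report two workers, one queued task and one rejection.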

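The whole of execute is a direct encoding of the pool's execution rules. It relies on CAS operations rather than one big lock, which is clearly more efficient, and the CAS work happens mainly inside addWorker. So how is addWorker implemented?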

/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        // CAS retry loop
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // After shutdown or shutdownNow there is normally no need to create workers, with one exception:
        // the state is SHUTDOWN and firstTask == null (the addWorker(null, false) call in execute),
        // meaning a task was just queued but the last worker may have exited before workQueue.offer ran.
        // If the queue is also non-empty (!workQueue.isEmpty()), a worker is needed to finish the remaining tasks
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // The state check passed; now increment the worker count with CAS
        for (;;) {
            int wc = workerCountOf(c);
            // Too many threads?
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS succeeded: move on to creating the worker
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // If the run state changed, redo the state check in the outer loop
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // Here the worker count has been incremented and the state looked fine so far
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the worker (its thread comes from the ThreadFactory)
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // mainLock guards the workers set and is also held during shutdown,
            // so the final state check is done under this lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // Final state check before joining workers: rs < SHUTDOWN means RUNNING;
                // rs == SHUTDOWN && firstTask == null is the addWorker(null, false) case from execute
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Start the thread if everything went well
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // Otherwise roll back: decrement the worker count via CAS, recheck for termination,
        // and help drive the final state transition SHUTDOWN -> TIDYING -> TERMINATED,
        // which starts once, in SHUTDOWN, the queue is empty and the worker count is 0
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        // Check the pool state and worker count: if the pool is SHUTDOWN with an empty queue
        // (or STOP) and no workers remain, move to TIDYING and run the terminated() callback
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

The workflow of addWorker is as follows:

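  1. Check the state and increment the worker count in a CAS loop
  2. Once the count has been incremented, create the actual thread, then do a final state check under a lock
    1. If the state is OK, add the worker to the workers set and start its thread
    2. Otherwise, do the necessary cleanup (addWorkerFailed)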
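The inner loop of addWorker is an instance of a bounded compare-and-set retry loop: read the current value, give up if the bound has been reached, otherwise try `compareAndSet` and retry if another thread got there first. Here is a minimal sketch on a bare `AtomicInteger`; the class `BoundedCounter` is hypothetical and stands in for the worker count only (there are no run-state bits, so there is no outer `retry` loop).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical counter that mimics addWorker's lock-free increment of the worker count. */
final class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int bound;

    BoundedCounter(int bound) { this.bound = bound; }

    /** Increments the count unless it has reached the bound; never takes a lock. */
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= bound)
                return false;                 // like "wc >= maximumPoolSize": refuse
            if (count.compareAndSet(c, c + 1))
                return true;                  // like compareAndIncrementWorkerCount succeeding
            // another thread changed the count in between: re-read and retry
        }
    }

    int get() { return count.get(); }

    /** Lets several threads race to increment; returns the number of successful increments. */
    static int race(int bound, int threads, int attemptsPerThread) {
        BoundedCounter counter = new BoundedCounter(bound);
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                try { start.await(); } catch (InterruptedException e) { return; }
                for (int k = 0; k < attemptsPerThread; k++)
                    if (counter.tryIncrement()) successes.incrementAndGet();
            });
            ts[i].start();
        }
        start.countDown(); // release all threads at once to maximize contention
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return successes.get();
    }
}
```

However many threads race, exactly `bound` increments succeed, which is the property addWorker needs to never exceed corePoolSize or maximumPoolSize without a lock.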

That covers the coordination side of the thread pool, but we have not yet looked at its core, the worker thread Worker. Let's see how it is implemented.

/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

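As you can see, Worker implements a lock of its own. Why is a lock needed? It protects a running task: shutdown must not interrupt a worker that is executing a task, so before interrupting a worker, shutdown first tries to acquire this lock, and a worker holds the lock for the entire time it is executing a task.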

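Note also that the Worker lock does not start out unlocked. The constructor sets its state to -1, and because tryAcquire only succeeds with a CAS from 0 to 1, the lock behaves as if it were already held until runWorker calls unlock; interruptIfStarted likewise skips a worker whose state is negative. Why? When a worker is added to the workers set, its thread has not been started yet. If shutdown (covered in detail later) ran at that moment, the interrupt could arrive before start. An interrupt delivered before start does not set the thread's interrupt flag (the Thread.interrupt documentation only says that interrupting a thread that is not alive need not have any effect, and in JDK 8's HotSpot it is a no-op), so the interrupt would be lost. The implementation of interrupt explains why; see the references at the end of this article, for example [12].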
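The "start at -1" trick can be reproduced with a tiny AQS-based mutex. The hypothetical `WorkerLikeLock` below copies the shape of Worker's lock methods: `tryAcquire` only succeeds by CAS from 0 to 1, so while the state is still -1 neither `tryLock` nor an `interruptIfStarted`-style check (`getState() >= 0`, exposed here as the hypothetical `started()`) lets anyone in, and the first `unlock()` (as at the top of `runWorker`) moves the state to 0. It also shows that the lock is not reentrant.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical non-reentrant mutex modelled on ThreadPoolExecutor.Worker's lock. */
final class WorkerLikeLock extends AbstractQueuedSynchronizer {
    WorkerLikeLock() { setState(-1); }  // "not started yet": blocks tryLock and interrupts

    @Override protected boolean isHeldExclusively() { return getState() != 0; }

    @Override protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) { // only 0 -> 1 succeeds, so both -1 and 1 fail
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);                    // also turns the initial -1 into "unlocked"
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }
    boolean started() { return getState() >= 0; } // the check interruptIfStarted performs
}
```

A fresh lock refuses `tryLock()`; after one `unlock()` it can be taken once, and a second `tryLock()` from the same thread fails because the owner is not tracked for reentry.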

The actual work of a Worker thread happens in runWorker.

/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Unlock (state -1 -> 0); only from now on can shutdown interrupt this thread
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // If firstTask is not null, it runs without a state check first;
        // getTask checks the state and pulls tasks from the queue
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // This part is subtle. As mentioned earlier, if the pool is SHUTDOWN while firstTask is
            // about to run, firstTask (which never entered the queue) must still run normally.
            // But the thread may have been interrupted (by shutdown) just before firstTask runs, so the
            // flag must be cleared for a normal run; that is the first Thread.interrupted() call below.
            // When shutdownNow has been called the state is STOP, or it may have become STOP right after
            // we cleared the flag; if the flag is then not actually set (!wt.isInterrupted()), restore it.
            // Note the thread is not killed here: the flag is set and the task still runs, so the task
            // decides how to react to the interrupt; the pool does not hold the power of life and death
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // The state is already STOP: interrupt ourselves; this only sets the flag, the task still runs
                wt.interrupt();
            try {
                // Run the beforeExecute callback and then the target task
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Run the afterExecute callback
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // If the pool is STOP, or SHUTDOWN with an empty queue, this worker exits
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // If there are too many threads or the timed wait expired, and there is either
        // more than one thread or an empty queue, this worker exits
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // If timed (allowCoreThreadTimeOut || wc > corePoolSize), wait at most
            // keepAliveTime for a task; otherwise wait indefinitely
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // Attempt the state transition when appropriate
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {           // the state has not reached STOP yet
        if (!completedAbruptly) {              // normal exit: getTask found no task for this worker
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // minimum number of threads to keep
            if (min == 0 && ! workQueue.isEmpty()) // no minimum, but the queue is non-empty again:
                min = 1;                           // keep at least one thread as a safeguard
            if (workerCountOf(c) >= min)       // enough workers remain, so this one was surplus
                return; // replacement not needed
        }
        // Safeguard: abrupt exit or too few workers, so start a replacement worker
        addWorker(null, false);
    }
}
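The timed `poll(keepAliveTime)` in getTask is what lets surplus workers exit. The hypothetical `KeepAliveDemo` below observes it through the public API, assuming `corePoolSize = 1`, `maximumPoolSize = 2`, a 50 ms keep-alive and a `SynchronousQueue` (so a busy core worker forces a second worker). The pool grows to two workers while both are busy, shrinks back to one once they go idle, and drops to zero after `allowCoreThreadTimeOut(true)`. Because this depends on timing, it polls `getPoolSize()` with a 5-second deadline instead of sleeping for a fixed time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.IntPredicate;

/** Hypothetical demo of keep-alive based worker culling. */
final class KeepAliveDemo {
    /** Returns {sizeWhileBusy, sizeAfterIdle, sizeAfterCoreTimeout}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 50, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>()); // no buffering: offer fails while workers are busy
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        try {
            pool.execute(blocker);
            pool.execute(blocker);               // core worker busy, offer fails -> non-core worker
            int busy = pool.getPoolSize();
            release.countDown();                 // both workers go back to getTask()
            int idle = waitForSize(pool, n -> n <= 1);
            pool.allowCoreThreadTimeOut(true);   // the core worker now uses the timed poll too
            int none = waitForSize(pool, n -> n == 0);
            return new int[] {busy, idle, none};
        } finally {
            pool.shutdownNow();
        }
    }

    /** Polls getPoolSize() until the predicate holds or 5 seconds pass. */
    private static int waitForSize(ThreadPoolExecutor pool, IntPredicate done) {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        int size = pool.getPoolSize();
        while (!done.test(size) && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            size = pool.getPoolSize();
        }
        return size;
    }
}
```

Only one of the two idle workers times out at first: once the count drops to corePoolSize, `timed` becomes false for the survivor and it falls back to `take()`.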

The workflow of runWorker is:

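  1. First unlock, allowing the thread to be interrupted
  2. If the worker was created with a firstTask, run it first; after it completes, check the pool state and pull tasks from the queue
  3. If the queue has no task, the thread either waits with a timed poll (at most keepAliveTime) or waits indefinitely with take, depending on whether the worker is subject to timeout (allowCoreThreadTimeOut || workerCount > corePoolSize)
  4. Once a task is obtained, the Worker lock is acquired before the task runs, and the beforeExecute and afterExecute callbacks run before and after it; any exception thrown during execution causes the thread to be destroyed
  5. When the thread exits, the necessary cleanup is done (maintaining the worker count and so on), tryTerminate attempts the state transition when appropriate, and a replacement worker is started if needed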
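The callbacks and the "an exception kills the worker" rule from step 4 can be observed by subclassing ThreadPoolExecutor and overriding its protected hooks `beforeExecute`, `afterExecute` and `terminated`. In the hypothetical `HookedPool` below, the first task throws, so `afterExecute` sees the exception and its worker dies; the second task therefore runs on a different thread (a replacement or a newly added worker). The thread factory installs a no-op `UncaughtExceptionHandler` only to keep the dying worker from printing a stack trace. This relies on `execute`: with `submit`, the exception is captured inside the `FutureTask`, so `afterExecute` would receive `null`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical subclass that records ThreadPoolExecutor's hook invocations. */
final class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger beforeCalls = new AtomicInteger();
    final AtomicInteger afterCallsWithError = new AtomicInteger();
    final Set<Thread> workerThreads = ConcurrentHashMap.newKeySet();
    volatile boolean terminatedCalled;

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
              r -> {
                  Thread t = new Thread(r);
                  t.setUncaughtExceptionHandler((thread, ex) -> { }); // silence the dying worker
                  return t;
              });
    }

    @Override protected void beforeExecute(Thread t, Runnable r) {
        beforeCalls.incrementAndGet();
        workerThreads.add(t);       // t is the worker thread about to run r
    }

    @Override protected void afterExecute(Runnable r, Throwable t) {
        if (t != null)              // non-null only when r.run() threw
            afterCallsWithError.incrementAndGet();
    }

    @Override protected void terminated() { terminatedCalled = true; }

    /** Runs a failing task and a normal one, then shuts the pool down and waits. */
    static HookedPool demo() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { throw new IllegalStateException("boom"); }); // kills its worker
        pool.execute(() -> { });                                           // runs on another worker
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }
}
```

After `demo()` the pool is terminated, `beforeExecute` ran twice on two distinct threads, `afterExecute` saw exactly one exception, and `terminated()` was called.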

That is roughly how the pool works. Finally, let's briefly go over the logic of shutting a thread pool down.

/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
    // mainLock guards the workers set
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // Try to acquire the worker's own lock first; only interrupt if that succeeds
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

Shutting a pool down with shutdown is relatively gentle:

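  1. First, change the state to SHUTDOWN with CAS
  2. For every worker, first try to acquire its Worker lock
    1. If the lock is acquired, the worker is either not running a task or just about to run its firstTask. If it is not running a task, interrupting it directly is harmless. If it is about to run firstTask:
      1. When the state is SHUTDOWN, the thread should clearly finish firstTask before exiting, since firstTask never entered the queue; this is actually handled in runWorker.
      2. When the state is at least STOP, runWorker does not stop the task either; it only sets the interrupt flag. Whether an interrupted task stops or carries on is up to the task itself; the pool only delivers the interrupt and should not hold the power of life and death
    2. If the lock cannot be acquired, the worker is running a task, so it is left to continue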
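The gentle semantics are easy to see from the outside: after `shutdown()`, tasks that were already queued still run, while new submissions are rejected. The hypothetical `ShutdownDemo` below uses a single worker blocked on a latch, so two further tasks are guaranteed to be sitting in the queue when `shutdown()` is called.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: shutdown() still runs queued tasks but rejects new ones. */
final class ShutdownDemo {
    /** Returns {tasksCompleted, rejectedAfterShutdown, terminated ? 1 : 0}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        pool.execute(() -> {
            try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            completed.incrementAndGet();
        });
        pool.execute(completed::incrementAndGet); // queued behind the blocked task
        pool.execute(completed::incrementAndGet); // queued as well
        pool.shutdown();                           // RUNNING -> SHUTDOWN; only idle workers are interrupted

        int rejected = 0;
        try {
            pool.execute(completed::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected++;                            // the default AbortPolicy rejects new work
        }
        release.countDown();                       // let the first task finish; the queue then drains
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), rejected, terminated ? 1 : 0};
    }
}
```

All three accepted tasks complete, the post-shutdown submission is rejected, and the pool then reaches TERMINATED on its own.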

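shutdownNow is much more aggressive than shutdown. Instead of acquiring the Worker lock, it calls interrupt directly on the thread of every started worker (interruptIfStarted), and it drains the tasks left in the queue and returns them.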

/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
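By contrast, `shutdownNow()` interrupts the running task and hands back whatever was still queued. In the hypothetical `ShutdownNowDemo` below, the running task waits on a latch that is never released, so it can only finish by reacting to the interrupt; the two queued tasks are returned by `shutdownNow()` instead of being run.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: shutdownNow() interrupts running work and drains the queue. */
final class ShutdownNowDemo {
    /** Returns {drainedTasks, runningTaskInterrupted ? 1 : 0, queuedTasksThatRan}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        AtomicInteger queuedRan = new AtomicInteger();
        pool.execute(() -> {
            started.countDown();
            try {
                new CountDownLatch(1).await(); // never counted down: only an interrupt ends this
            } catch (InterruptedException e) {
                interrupted.set(true);         // the task itself decides to stop when interrupted
            }
        });
        pool.execute(queuedRan::incrementAndGet);
        pool.execute(queuedRan::incrementAndGet);
        List<Runnable> drained;
        try {
            started.await();                   // make sure the first task is running
            drained = pool.shutdownNow();      // STOP, interrupt started workers, drain the queue
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, 0, 0};
        }
        return new int[] {drained.size(), interrupted.get() ? 1 : 0, queuedRan.get()};
    }
}
```

If the task swallowed the interrupt and kept waiting, the pool would never terminate; as the javadoc above says, shutdownNow is only a best-effort attempt.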

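Finally, a brief word on ScheduledThreadPoolExecutor. It builds on ThreadPoolExecutor and adds support for scheduled tasks; you can think of its core as an implementation of the blocking queue that ThreadPoolExecutor uses. Internally, ScheduledThreadPoolExecutor implements a delayed work queue that keeps tasks ordered by trigger time in a binary heap. When the first worker finds that no task is ready yet (the next one may only be due in N seconds), it becomes the leader thread, while the other threads wait on a Condition with no timeout; the leader waits on that Condition for only N seconds. When that wait times out, or a task that must run sooner is added, the leader wakes up and signals the next thread waiting on the Condition. Thanks to this delayed work queue, all workers wait while no task is due, and a thread wakes up and starts working as soon as a task becomes due.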
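The effect of the time-ordered delay queue is easy to observe: with a single worker thread, tasks scheduled out of order still run in trigger-time order, because the worker always takes the head of the queue. The class name `ScheduleOrderDemo` and the delays (50, 150 and 300 ms) are arbitrary choices for this sketch.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a single-threaded scheduler runs tasks in trigger-time order. */
final class ScheduleOrderDemo {
    /** Schedules three tasks out of order on one thread and returns the order they ran in. */
    static List<String> run() {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        // Submitted as B, C, A, but ordered by trigger time: A (50 ms), B (150 ms), C (300 ms)
        scheduler.schedule(() -> { order.add("B"); done.countDown(); }, 150, TimeUnit.MILLISECONDS);
        scheduler.schedule(() -> { order.add("C"); done.countDown(); }, 300, TimeUnit.MILLISECONDS);
        scheduler.schedule(() -> { order.add("A"); done.countDown(); }, 50, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdown();
        }
        return order;
    }
}
```

Even if the worker were delayed past all three deadlines, it would still pop A first, since the queue always yields the earliest trigger time.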

References

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

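  • Author: 贝克街的流浪猫
  • Link: https://www.beikejiedeliulangmao.top/java/concurrent/thread-pool-impl/
  • Copyright: unless otherwise stated, all articles on this blog are licensed under BY-NC-SA; please credit the source when reposting!
  • Attribution: this article was written based on all of the references above and may involve copying, modifying, or transforming them; images come from the internet. If anything infringes your rights, please contact me and I will remove it immediately.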