Java Interrupt

引言

本文中,我们主要介绍一下 Java 提供的 Interrupt 机制的常见使用方法,以及 Interrupt 的实现。所有关于 Java 并发的文章均收录于<Java并发系列文章>

Interrupt

在 Java 中,如果我们想要操作一个线程的执行状态,可以直接调用 Thread::stop 停止它的运行,但是这样显然有些粗暴,如果线程正处于临界区进行数据修改,可能会导致数据的错乱。同样,我们也可以通过 Thread::suspendThread::resume 来暂停和恢复一个线程的运行。

但是上述的这些方式,都是在比较下层的方式来控制一个线程的运行,不够优雅不说,还存在潜在风险。所以从 JDK1.2 开始这些方法就已经被废弃了。取而代之的,是一个新的 Thread::runterupt 接口,它是一个交互式的接口,为什么这么说呢?因为它没有 Thread::stop 那么强硬直接从底层销毁线程,相反的它以类似于通知的方式告诉线程,现在希望你停止下来,具体停不停,以什么方式停下来,都由线程方自行决定。

你既可以完全忽略中断通知:

1
2
3
4
5
6
while (true) {
try {
// do some thing
} catch (InterruptedException ignored) {
}
}

也可以接收到中断请求后,立马停止。

1
2
3
4
5
6
7
while (!Thread.interrupted()) {
try {
// do some thing
} catch (InterruptedException ignored) {
break;
}
}

使用过 Thread::interrupt 可能会发现一个“怪”现象,当一个线程被中断时,有的时候线程内会产生 InterruptedException,有的时候啥都没有,这是为什么呢?这是因为,当我们执行不同的代码时,interrupt 的处理结果并不相同:

  • 对于大部分阻塞线程的方法,使用Thread.interrupt(),可以立刻退出等待,抛出InterruptedException 这些方法包括Object.wait(), Thread.join(),Thread.sleep(),以及各种AQS衍生类:Lock.lockInterruptibly()等任何显示声明throws InterruptedException的方法。
  • 被阻塞的nio Channel也会响应interrupt(),抛出ClosedByInterruptException,相应nio通道需要实现java.nio.channels. InterruptibleChannel接口。
  • 如果使用的是传统IO(非Channel,如ServerSocket.accept),所在线程被interrupt时不会抛出ClosedByInterruptException,只会修改 Thread 的 interrupt 标志位。。但可以使用流的close方法实现退出阻塞。
  • 还有一些阻塞方法不会响应interrupt,如等待进入synchronized段、Lock.lock()。他们不能被动的退出阻塞状态,只会修改 Thread 的 interrupt 标志位。
  • 一般的数字运算,内存操作也不会抛出 InterruptException,只会修改 Thread 的 interrupt 标志位。

看到这大家肯定有疑问,怎么规则这么多啊,interrupt 到底是怎么实现的啊。带着这些疑问,我们深入到 Thread::interrupt 的源码中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//Class java.lang.Thread
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker; //中断触发器
if (b != null) {
interrupt0();
b.interrupt(this); //触发回调接口
return;
}
}
interrupt0();
}

private native void interrupt0();

从代码中可以看到,在执行 interrupt 时,首先会进行权限检查,如果被中断的线程属于系统线程组(即JVM线程),checkAccess()方法会使用系统的System.getSecurityManager()来判断权限。由于Java默认没有开启安全策略,此方法其实会跳过安全检查。

然后,如果中断触发器 broker 不为空,则会先执行 interrupt0 这个原生函数,然后触发 broker 的回调函数。这里我们先介绍一下 blocker 可以用来干什么。前面我们说了,被阻塞的nio Channel也会响应interrupt(),抛出ClosedByInterruptException。这个就是通过 broker 实现的。具体的实现在InterruptibleChannel接口的抽象实现类AbstractInterruptibleChannel的方法begin()中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//Class java.nio.channels.spi.AbstractInterruptibleChannel
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {//关闭io通道
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);//将interruptor设置为当前线程的blocker
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}

protected final void end(boolean completed) throws AsynchronousCloseException {
blockedOn(null);
Thread interrupted = this.interrupted;
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
if (!completed && !open)
throw new AsynchronousCloseException();
}

AbstractInterruptibleChannel 有两个函数,begin 负责将 Interruptible 回调器注入到 Thread 中,当前线程被打断时直接断开连接。end 函数负责检查当前线程是否被中断,如果是的话,则抛出 ClosedByInterruptException 异常。

1
2
3
4
5
6
7
8
9
10
11
//Class java.nio.channels.Channels.ReadableByteChannelImpl
public int read(ByteBuffer dst) throws IOException {
......
try {
begin();
bytesRead = in.read(buf, 0, bytesToRead);
finally {
end(bytesRead > 0);
}
......
}

nio通道ReadableByteChannel每次执行阻塞方法read()前,都会执行begin(),把Interruptible回调接口注册到当前线程上,以实现能够响应其他线程的中断。当线程收到中断时,Thread.interrupt()触发回调接口,在回调接口Interruptible中关闭io通道并返回,最后在finally块中执行end(),end()方法会检查中断标记,抛出ClosedByInterruptException。

之前在介绍Thread的时候曾经提到,JavaThread有三个成员变量,这三个变量就是 interrupt 实现的关键:

1
2
3
4
5
6
7
//用于synchronized同步块和Object.wait()
ParkEvent * _ParkEvent ;
//用于Thread.sleep()
ParkEvent * _SleepEvent ;
//用于unsafe.park()/unpark(),供java.util.concurrent.locks.LockSupport调用,
//因此它支持了java.util.concurrent的各种锁、条件变量等线程同步操作,是concurrent的实现基础
Parker* _parker;

接下来,我们看一下原生方法 interrupt0 的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
// 当调用 Thread::start 之后 JavaThread实例才会被创建,所以如果 Interrupt 发生在thread start 之前,是不会起到中断效果的
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END

JVM_Interrupt对参数进行了校验,然后直接调用Thread::interrupt:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
//获取系统native线程对象
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
osthread->set_interrupted(true);
//内存屏障,使osthread的interrupted状态对其它线程立即可见
OrderAccess::fence();
//前文说过,_SleepEvent用于Thread.sleep,线程调用了sleep方法,则通过unpark唤醒
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
//_parker用于concurrent相关的锁,此处同样通过unpark唤醒
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
//synchronized同步块和Object.wait() 唤醒
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}

简单地说,interrupt 底层不仅仅会修改线程的中断标志位 osthread->set_interrupted(true) ,当该线程处于等待状态时,还会试图唤醒该线程。无论是它在 sleep 中,在 park 中,还是在 wait 中,或者正在获取 monitor 锁。关于 sleep,park和wait,我们前面已经介绍过了,当被唤醒时,如果发现线程的 interrupt 标志位被置位,就会抛出异常。

那 monitor 锁也是阻塞在 park 函数上的,它被唤醒之后怎么不抛出异常并返回呢?实际上这是实现策略决定的,如果是synchronized等待事件,被唤醒后会尝试获取锁,如果失败则会通过循环继续park()等待,因此synchronized等待实际上是不会被interrupt()中断的。

和 monitor 锁类似,通过 AQS 实现的锁的 lock 接口也是使用了同样的策略。在检测到中断时,不抛出异常继续尝试获得锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

但是,AQS 也提供了可中断的加锁 api doAcquireInterruptibly , 它的实现就是检测到中断,就抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

关于 Interrupt 的实现,想必大家应该已经比较清楚了,那么我们应该如何正确的使用 Interrupt 接口,如果妥善的处理 InterruptedException 呢?

  • 如果自己很清楚当前线程被中断后的处理方式:则按自己的方式处理,通常是做好善后工作,主动退出线程
  • 自己不知道外层业务需要达到什么目的:直接在方法声明中throws InterruptedException,丢给上层处理,这种方式也很常见,将中断的处置权交给具体的业务来处理
  • 自己有需要清理的数据,同时外层函数可能也需要清理一些数据:通过 Thread::interrupted 清除标志位(如果捕获到 InterruptedException 说明标志位已经被清除,就不要调用 Thread::interrupted ),然后再抛出新的异常给上一层,或者通过 Thread::interupt 重新标记中断标志位等。

Thread 的 Interrupt 相关接口有3个,使用起来比较容易混淆,这里简单地帮大家区分一下:

  • interupt:中断指定线程。
  • isInterrupted:检查该线程是否被中断,但是不重置中断标志位。
  • interrupted:检查该线程是否被中断,同时重置中断标志位。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/java/concurrent/java-Interrupt/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。