Java Thread 和 Park

引言

从本篇文章开始,将会是一些和 Java 多线程相关的杂谈,本篇文章主要介绍 JVM 对 JavaThread 实现,以及 Thread Park 的实现。所有关于 Java 并发的文章均收录于<Java并发系列文章>

多线程相关知识

Thread

HotSpot里的Thread类对应着一个OS的Thread, JavaThread类继承自Thread, 一个JavaThread实例对应着一个Java层的Thread。所以, Java层的每一个Thread在操作系统上对应一个thread, linux上就是一个轻量级task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// thread.hpp
class Thread {
// OS data associated with the thread
OSThread* _osthread; // Platform-specific thread information
ParkEvent * _ParkEvent; // for synchronized(), wait
ParkEvent * _SleepEvent; // for Thread.sleep
// JSR166 per-thread parker
Parker* _parker; // for LockSupport::park
//...
};
class JavaThread: public Thread {
// 指向Java Thread实例, oop是HotSpot里指向一个Java level的实例, 一个gc对象.
oop _threadObj; // The Java level thread ,
JavaFrameAnchor _anchor; // Encapsulation of current java frame and it state
CompiledMethod* _deopt_nmethod; // CompiledMethod that is currently being deoptimized
//
volatile JavaThreadState _thread_state;
//...
};

Thread类里有两个ParkEvent和一个Parker, 其实ParkEvent和Parker实现和功能十分类似。一个 ParkEvent 是实现synchronized关键字,wait,notify 用的, 一个是给Thread.sleep用的。parker是用来实现J.U.C的park/unpark(阻塞/唤醒)。也就是说 ParkEvent 是用来处理多个线程竞争同一个资源的情况,而 parker 比较简单,是用来处理一个线程的阻塞,其他线程唤醒它的情况。

1
2
3
// A word of caution: The JVM uses 2 very similar constructs:
// 1. ParkEvent are used for Java-level "monitor" synchronization.
// 2. Parkers are used by JSR166-JUC park-unpark.

JavaThread和Java的线程一一对应, 成员变量oop _threadObj指向Java层的thread对象。OSThread是通过os::create_thread()创建, 最后还是调用POSIX phtread, glibc在linux平台上就是fork一个轻量级task。

1
2
3
4
5
//thread.cpp
os::create_thread(this, thr_type, stack_sz);
//linux_os.cpp
pthread_t tid;
int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);

Parker

Parker 是用来实现LockSupport 的park 和unpark的。JDK中的LockSupport只是用来block(park, 阻塞)/unblock(unpark, 唤醒)线程, 线程队列的管理是JDK的AQS处理。就像前面所说的,Parker 用来处理一个线程的阻塞,其他线程唤醒它的情况。

从 Parker 的声明中,不难发现,它就是对 mutex 和 condition 的组合使用。

1
2
3
4
5
6
7
8
9
//park.hpp
class Parker : public os::PlatformParker { /*略...*/ };
//os_linux.hpp
class PlatformParker {
pthread_mutex_t _mutex[1];
//一个是给相对时间用,一个给绝对时间用
pthread_cond_t _cond[2]; // one for relative times and one for abs.
//...
};

我们继续往下看看 park 函数的实现,这里我们以 Linux 为例,简单地说,park 过程是:

  1. 通过 CAS 检查是否之前执行过 unpark,如果是则跳过等待直接返回,这样做可以少加一次锁速度更快
  2. 如果没有被打断,则获取mutex锁
  3. 拿到 mutex 锁后,在判断一下 _counter,是否别的线程执行了 unpark,如果是则跳过等待直接返回
  4. 调用 pthread_cond_wait 进入等待,并自动释放 mutex 锁
  5. 唤醒后,内核会自动帮我们重新获取 mutex 锁,所以我们将 _counter 改为 0 表示当前没有睡眠的线程
  6. 最后释放 mutex 锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void Parker::park(bool isAbsolute, jlong time) {
// 如果别的线程已经unblock了我.
// 这里并没有拿到mutex的锁, 需要Atomic::xchg和barrier保证lock-free代码的正确。
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
// 通过原子操作来提升性能,可以跳过 mutex 加锁
if (Atomic::xchg(0, &_counter) > 0) return;
// safepoint region相关
ThreadBlockInVM tbivm(jt);
// 如果别的线程正在unblock我, 而持有了mutex, 我先返回了,没有必要在_mutex上等
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
// 如果别的线程已经unblock了我, no wait needed
// 已经拿到了mutex, 检查 _counter 大于 0 说明其他线程执行过 unpark,这里就可以跳过等待过程
int status;
if (_counter > 0) {
_counter = 0;
status = pthread_mutex_unlock(_mutex);
OrderAccess::fence();
return;
}
// 记录线程的状态
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
// 进入等待并自动释放 mutex 锁,这里没有通过 while 包裹 wait 过程,所以会出现伪唤醒问题
status = pthread_cond_wait(&_cond[_cur_index], _mutex);
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
// 进入等待并自动释放 mutex 锁,这里没有通过 while 包裹 wait 过程,所以会出现伪唤醒问题
status = pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
}
_cur_index = -1;
// 已经从block住状态中恢复返回了, 把_counter设0.
_counter = 0;
status = pthread_mutex_unlock(_mutex);
// 要保证多线程的正确性要十二分小心
// 这里的memory fence 是一个lock addl 指令, 加上compiler_barrier
// 保证_counter = 0 是对其他线程是可见的.
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
// 已经醒过来, 但如果有别人在suspend我,那么继续suspend自己.
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}

这里可能有些主攻 Java 的同学不太懂 Mutex 和 Condition 都是什么,起什么用,这里我简单的介绍一下。为了照顾这部分同学,我这里简单的介绍一下什么时 mutex 和 condition。

简单地说,mutex用于上锁,condition variable用于等待,这是两种不同类型的同步方式。

对于条件变量,它提供的操作接口主要为:

1
2
3
4
5
6
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
// All return 0 on success, or a positive error number on error

pthread_cond_signal的作用是通知那些wait在cond上的线程,有事件达到。而pthread_cond_wait的作用实际上包含3部分:

  • 将与之绑定的mutex解锁(unlock)
  • 让自己进入等待状态,直至被signal唤醒
  • 在被唤醒之后再次拿到与之绑定的mutex

基于mutex + cond的生产者往往如下所示:

1
2
3
4
5
6
7
8
9
10
s = pthread_mutex_lock(&mtx);//加锁,执行生产动作
if (s != 0)
errExitEN(s, "pthread_mutex_lock");
_Event++; //实际的生产相应数据
s = pthread_mutex_unlock(&mtx);//完成生产之后,首先将mutex释放
if (s != 0)
errExitEN(s, "pthread_mutex_unlock");
s = pthread_cond_signal(&cond); //之后,再通过signal,通知消费者
if (s != 0)
errExitEN(s, "pthread_cond_signal");

不过我们这里的 park 先通过 cas 进行了检查,效率更好,思想类似于 Java 单例模式中的双重检查。

最后,我们看一下 unpark 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void Parker::unpark() {
int s, status ;
// 其实 unpark 这里也可以先通过一个 cas 判断是否 _counter 已经大于0,如果是就可以跳过 mutex 加锁过程,效率更高,稍后你会发现 ParkEvent 就是类似的做法
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
// thread might be parked
if (_cur_index != -1) {
// thread is definitely parked
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
// must capture correct index before unlocking
int index = _cur_index;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
status = pthread_cond_signal (&_cond[index]);
assert (status == 0, "invariant");
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}

unpark 的处理流程就比较简单了,先拿 mutex 锁,然后通过 counter 是否小于1来决定是否进行 condigion signal 唤醒线程。这里大家会发现,unpark会根据 WorkAroundNPTLTimedWaitHang 来决定是先发送唤醒信号在解锁,还是先解锁再发送唤醒信号。这是为什么呢?

这里还有一点隐晦的知识,就是生产者在使用 mutex 和 condition 时,是先 unlock 再 signal 好呢,还是先 signal 再 unlock 好呢?

分析一下,如果先放锁,后signal,释放锁之后,如果有等待线程,可能pthread_cond_signal还没运行就发生了线程切换;这时候,可能其他线程会尝试获取锁,并把 mutex 保护的资源消耗掉,当 condition 中等待的线程被唤醒时,发现已经没有资源了,就得重新进入等待。

而如果 先signal, 后释放锁:signal之后,等待线程可以马上运行,但由于无法获取锁,会马上进入waiting状态。这个过程可能涉及上下文切换。但是,在 linux 平台或者 Native POSIX Thread Library(NPTL)上时,对 signal 的实现有一个优化,因为 mutex 和 condition 各有一个等待队列,当一个处于 condition 等待队列的线程被唤醒时,不会直接让它处于运行状态(即返回用户空间),而是把它丢到 mutex 的等待队列中,这样就不会有性能损耗。

正因为如此 WorkAroundNPTLTimedWaitHang 在linux上的默认值为 true,所以在 linux 上就会先signal, 后释放锁。

看到这大家一定很乱,怎么到处都是队列,怎么 mutex 里也有队列,condition 也是一个等待队列。但是大家仔细想一下,mutex 是一个锁,有锁就势必得有一个队列来保存哪些线程在等待这个锁,这很正常。而 condition 是一个等待+通知的工具,用来控制线程等待某一条件满足时立刻执行后续的流程,所以我们也必须用一个队列来保存所有处于等待中的线程。而 condition 自身没有锁的功能,只有等待和唤醒的功能,所以要想使用它,需要配合 mutex 锁实现。

Mutex

看到这,不知道大家和我一样有一个疑问,mutex 到底是怎么实现的,操作系统能够使用的只有 cas 怎么仅通过 cas 来实现一个自带等待唤醒机制的重量级锁?

下面结合代码来分析一下在X86体系结构下,互斥锁的实现原理。下面是互斥锁所使用到的数据结构。

1
2
3
4
5
struct mutex {
atomic_t count; //引用计数器,1: 可以利用, 小于等于0:该锁已被获取,需要等待
spinlock_t wait_lock;//自旋锁类型,保证多cpu下,对等待队列访问是安全的。
struct list_head wait_list; //等待队列,如果该锁被获取,任务将挂在此队列上,等待调度。
};

加锁流程的核心实现内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,unsigned long ip)
{
//获取当前进程的task_struct的地址
struct task_struct *task = current;
struct mutex_waiter waiter;
unsigned int old_val;
unsigned long flags;

//对该锁上的等待队列加自旋锁,防止多个CPU的情况。
spin_lock_mutex(&lock->wait_lock, flags);

//将该任务添加到该锁的等待队列上
list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;

//用一条汇编指令对count进行赋值,lock->count=-1,保证该操作在一个cpu上是原子的
old_val = atomic_xchg(&lock->count, -1);

//如果lock->count之前的值为1,说明是可以获取锁的
if (old_val == 1)
goto done;
lock_contended(&lock->dep_map, ip);

for (;;) {
//在这个地方,又尝试去获取锁,处理方式如上。
old_val = atomic_xchg(&lock->count, -1);
if (old_val == 1)
break;

//如果该进程是可中断的,或者该进程是可kiilable的,如果有信号被递送到该任务,那么该进程将从等待队列中移除
if (unlikely((state == TASK_INTERRUPTIBLE &&signal_pending(task)) ||(state == TASK_KILLABLE &&fatal_signal_pending(task)))) {
mutex_remove_waiter(lock, &waiter,task_thread_info(task));
mutex_release(&lock->dep_map, 1, ip);
spin_unlock_mutex(&lock->wait_lock, flags);
debug_mutex_free_waiter(&waiter);
//返回被信号中断
return -EINTR;
}
__set_task_state(task, state);

//如果还不能获取锁,则将自旋锁解除,当从schedule返回时再次获取自旋锁,重复如上操作。
spin_unlock_mutex(&lock->wait_lock, flags);
schedule();
spin_lock_mutex(&lock->wait_lock, flags);
}
//表示已经获取了锁
done:
lock_acquired(&lock->dep_map);
//将该任务从等待队列中删除
mutex_remove_waiter(lock, &waiter, task_thread_info(task));
debug_mutex_set_owner(lock, task_thread_info(task));
//如果等待队列为空将lock->count置为0
if (likely(list_empty(&lock->wait_list)))
atomic_set(&lock->count, 0);
spin_unlock_mutex(&lock->wait_lock, flags);
debug_mutex_free_waiter(&waiter);
return 0;
}

看到这不知道大家有没有捋清楚 mutex 加锁的逻辑,首先通过自旋锁获取等待队列 wait_lock 的锁,这是第一个锁,当任何一个 cpu 拿到锁之后,其他 cpu 就会一直卡在自旋了。而同一个 cpu 下,也可能出现时钟中断导致的调度,所以即便在同一个 cpu 下,我们仍然需要 atomic_xchg 对mutex的状态 lock->count 进行原子的设置。

  • 如果设置之前mutex 的状态为未锁定,那么说明当前cpu的当前线程加锁成功,释放等待队列的自旋锁即可。
  • 否则,释放等待队列的自旋锁,并进行调度,当下一次该线程重新执行时,会重新尝试自旋锁获取等待队列 wait_lock 的锁,然后通过 CAS 修改mutex 锁状态,并一直重复这个过程,直到成功获取为止

那么解锁的过程是什么样的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static inline void
__mutex_unlock_common_slowpath(atomic_t *lock_count, int nested)
{
//通过结构的成员地址,获取该结构地址
struct mutex *lock = container_of(lock_count, struct mutex, count);

unsigned long flags;
//为等待队列加自旋锁
spin_lock_mutex(&lock->wait_lock, flags);
mutex_release(&lock->dep_map, nested, _RET_IP_);
debug_mutex_unlock(lock);
if (__mutex_slowpath_needs_to_unlock())
atomic_set(&lock->count, 1);

//先看看等待队列是不是为空了,如果已经为空,不需要做任何处理,否则将该等待队列上面的队首进程唤醒
if (!list_empty(&lock->wait_list)) {
struct mutex_waiter *waiter =list_entry(lock->wait_list.next,struct mutex_waiter, list);
debug_mutex_wake_waiter(lock, waiter);
wake_up_process(waiter->task);
}

debug_mutex_clear_owner(lock);
spin_unlock_mutex(&lock->wait_lock, flags);

}

在进行mutex锁的释放时,同样先会尝试获取等待队列 wait_lock 自旋锁,然后修改mutex锁状态 lock->count,最后检查当前等待队列是否为空,如果不为空则尝试唤醒等待队列的队首线程, 然后释放 wait_lock 自旋锁。

这里大家要注意的是,因为加锁过程是将当前进程加入等待队列后先释放 wait_lock 自旋锁再调度,而解锁过程是如果等待队列不为空就尝试唤醒队首进程然后解锁。所以就有可能唤醒过程执行结束之后,队首的线程才刚执行完调度。这种情况只会导致队首等待线程获取 mutex 锁不是那么实时,没有什么大问题,毕竟正常一个线程的调度也是很快的。

至此,sleep->park->unpark->condition->mutex 这一串技术的实现就介绍完了,总结一下 park 使用 condition 来实现等待和唤醒的功能,通过 mutex 来保护 condition 的使用以及 park 的内部状态 _counter(类似于执行令牌),而 mutex 又是通过一个等待队列,一个自旋锁,以及一个描述当前是否处于锁定状态的原子属性 lock->count 来实现。整个这一套流程有点乱,没理清的同学可以整理一下思路。

ParkEvent

之前我们说过,ParkEvent 和 Parker 功能基本类似,那么它们到底有啥不同呢?这里我们简单地介绍一下它。先来看看 ParkEventpark 函数,它是怎么实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
int os::PlatformEvent::park(jlong millis) {
guarantee (_nParked == 0, "invariant") ;

int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ; // cas 设置 _Event 减一
}
guarantee (v >= 0, "invariant") ;
if (v != 0) return OS_OK ; // 如果最初的 _Event 不等于 0,会直接跳出不进行休眠,因为 os::PlatformEvent::unpark 的时候会设置_Event=1
// 也就是说 _Event 相当于一个令牌,默认值为 0,但是如果在 park 之前执行了 unpark,令牌就会为 1,休眠过程直接跳过

// We do this the hard way, by blocking the thread.
// Consider enforcing a minimum timeout value.
struct timespec abst;
compute_abstime(&abst, millis); // 计算绝对时间

int ret = OS_TIMEOUT;
int status = pthread_mutex_lock(_mutex); // 加 mutex 锁
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++_nParked ;

// Object.wait(timo) will return because of
// (a) notification
// (b) timeout
// (c) thread.interrupt
//
// Thread.interrupt and object.notify{All} both call Event::set.
// That is, we treat thread.interrupt as a special case of notification.
// The underlying Solaris implementation, cond_timedwait, admits
// spurious/premature wakeups, but the JLS/JVM spec prevents the
// JVM from making those visible to Java code. As such, we must
// filter out spurious wakeups. We assume all ETIME returns are valid.
//
// TODO: properly differentiate simultaneous notify+interrupt.
// In that case, we should propagate the notify to another waiter.

while (_Event < 0) { // 当令牌不足时,会循环进入等待状态
status = os::Linux::safe_cond_timedwait(_cond, _mutex, &abst);// 调用该函数后会自动释放mutex 锁
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (_cond);
pthread_cond_init (_cond, os::Linux::condAttr()) ;
}
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
if (!FilterSpuriousWakeups) break ; // previous semantics
if (status == ETIME || status == ETIMEDOUT) break ;
// We consume and ignore EINTR and spurious wakeups.
}
--_nParked ;
if (_Event >= 0) {
ret = OS_OK;
}
_Event = 0 ;
status = pthread_mutex_unlock(_mutex); // 从 safe_cond_timedwait 返回后会重新获得 mutex锁,所以这里要进行释放
assert_status(status == 0, status, "mutex_unlock");
assert (_nParked == 0, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other.
// 加入内存屏障
OrderAccess::fence();
return ret;
}

其中 os:: Linux::safe_cond_timedwait 调用了pthread_cond_timedwait函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int os::Linux::safe_cond_timedwait(pthread_cond_t *_cond, pthread_mutex_t *_mutex, const struct timespec *_abstime)
{
if (is_NPTL()) {
return pthread_cond_timedwait(_cond, _mutex, _abstime);
} else {
// 6292965: LinuxThreads pthread_cond_timedwait() resets FPU control
// word back to default 64bit precision if condvar is signaled. Java
// wants 53bit precision. Save and restore current value.
int fpu = get_fpu_control_word();
int status = pthread_cond_timedwait(_cond, _mutex, _abstime);
set_fpu_control_word(fpu);
return status;
}
}

可以看到 ParkEvent 的park函数和 Parker 的park函数很像,先是通过 cas 修改了 _Event,然后根据_Event 的原始值决定是不是要加 mutex 锁和睡眠。其实,JVM park 的这个思路很类似于 java 实现单例的双重检查模式,因为第一次通过 cas 的检查,如果发现有令牌就不加锁,不等待了,毕竟加锁和等待都是很重的过程,可能会被阻塞,而 cas 很轻很快,效率更高。

然后在一个循环中执行 park,这一点和 Parker 不太一样,在 Parker 中没有这一层循环,这是因为 Parker 调用 Parker::park 函数的只有一个线程,就是 parker 对象从属的线程,所以当它被唤醒时资源的使用不存在竞争,而 ParkEvent 则不同,ParkEvent 用来实现 wait 和 monitor 锁,所以是会出现多个线程都在等待,当多个线程都被唤醒时就要通过判断当前令牌是否已经被别人抢走来决定之后的处理流程。如果令牌被别的线程抢走了,自己就继续睡眠,否则返回用户代码。但是,即便如此 Parker::park 可能也会有虚假唤醒(spurious wakeup)的情况发生。

在linux对条件变量的描述认为spurious wakeup是允许的, 也就是说在 linux 中,即使没有线程broadcast 或者signal条件变量,wait也可能偶尔返回。举个例子,pthread 的条件变量等待 pthread_cond_wait 是使用阻塞的系统调用实现的(比如 Linux 上的 futex),这些阻塞的系统调用在进程被信号中断后,通常会中止阻塞、直接返回 EINTR 错误,这一点我们在 Linux系列文章介绍过。同样是阻塞系统调用,你从 read 拿到 EINTR 错误后可以直接决定重试,因为这通常不影响它本身的语义。而条件变量等待则不能,因为本线程拿到 EINTR 错误和重新调用 futex 等待之间,可能别的线程已经通过 pthread_cond_signal 或者 pthread_cond_broadcast发过通知了。 所以,虚假唤醒的一个可能性是条件变量的等待被信号中断。
David R. Butenhof 认为认为完全消除虚假唤醒本质上会降低了条件变量的操作性能,因为虚假唤醒发生的概率发生很小。
正因为如此,Parker 的上层函数 LockSupport::park 明确指出了该函数的返回可能,其中一条就是 “The call spuriously (that is, for no reason) returns.”,不过可能是觉得这没有什么影响,只是声明了出来。

看到这大家应该也猜到了 jvm 对 unpark 的实现,肯定也是先 cas 比对,看看有没有必要进行唤醒,有过有必要,再加锁并发送信号 pthread_cond_signal。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void os::PlatformEvent::unpark() {
// Transitions for _Event:
// 0 :=> 1
// 1 :=> 1
// -1 :=> either 0 or 1; must signal target thread
// That is, we can safely transition _Event from -1 to either
// 0 or 1. Forcing 1 is slightly more efficient for back-to-back
// unpark() calls.
// See also: "Semaphores in Plan 9" by Mullender & Cox
//
// Note: Forcing a transition from "-1" to "1" on an unpark() means
// that it will take two back-to-back park() calls for the owning
// thread to block. This has the benefit of forcing a spurious return
// from the first park() call after an unpark() call which will help
// shake out uses of park() and unpark() without condition variables.

if (Atomic::xchg(1, &_Event) >= 0) return;

// Wait for the thread associated with the event to vacate
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
int AnyWaiters = _nParked;
assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
AnyWaiters = 0;
pthread_cond_signal(_cond);
}
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
if (AnyWaiters != 0) {
status = pthread_cond_signal(_cond);
assert_status(status == 0, status, "cond_signal");
}

果不其然,这里先通过 cas 将 _Event 修改为 1,如果_Event 之前是 -1,则说明可能有线程处于等待状态,或者将要进行等待,所以后面必须通过 mutex 锁来控制“唤醒过程”和“等待过程”互斥并且原子的进行。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/java/concurrent/thread-park/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。