// thread.hpp classThread { // OS data associated with the thread OSThread* _osthread; // Platform-specific thread information ParkEvent * _ParkEvent; // for synchronized(), wait ParkEvent * _SleepEvent; // for Thread.sleep // JSR166 per-thread parker Parker* _parker; // for LockSupport::park //... }; classJavaThread:public Thread { // 指向Java Thread实例, oop是HotSpot里指向一个Java level的实例, 一个gc对象. oop _threadObj; // The Java level thread , JavaFrameAnchor _anchor; // Encapsulation of current java frame and it state CompiledMethod* _deopt_nmethod; // CompiledMethod that is currently being deoptimized // volatile JavaThreadState _thread_state; //... };
// A word of caution: The JVM uses 2 very similar constructs: // 1. ParkEvent are used for Java-level "monitor" synchronization. // 2. Parkers are used by JSR166-JUC park-unpark.
//park.hpp classParker :public os::PlatformParker { /*略...*/ }; //os_linux.hpp classPlatformParker { pthread_mutex_t _mutex[1]; //一个是给相对时间用,一个给绝对时间用 pthread_cond_t _cond[2]; // one for relative times and one for abs. //... };
我们继续往下看看 park 函数的实现,这里我们以 Linux 为例,简单地说,park 过程是:
通过 CAS 检查是否之前执行过 unpark,如果是则跳过等待直接返回,这样做可以少加一次锁速度更快
#include<pthread.h> pthread_cond_t cond = PTHREAD_COND_INITIALIZER; intpthread_cond_signal(pthread_cond_t *cond); intpthread_cond_broadcast(pthread_cond_t *cond); intpthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); // All return 0 on success, or a positive error number on error
s = pthread_mutex_lock(&mtx);//加锁,执行生产动作 if (s != 0) errExitEN(s, "pthread_mutex_lock"); _Event++; //实际的生产相应数据 s = pthread_mutex_unlock(&mtx);//完成生产之后,首先将mutex释放 if (s != 0) errExitEN(s, "pthread_mutex_unlock"); s = pthread_cond_signal(&cond); //之后,再通过signal,通知消费者 if (s != 0) errExitEN(s, "pthread_cond_signal");
不过我们这里的 park 先通过 cas 进行了检查,效率更好,思想类似于 Java 单例模式中的双重检查。
看到这不知道大家有没有捋清楚 mutex 加锁的逻辑,首先通过自旋锁获取等待队列 wait_lock 的锁,这是第一个锁,当任何一个 cpu 拿到锁之后,其他 cpu 就会一直卡在自旋了。而同一个 cpu 下,也可能出现时钟中断导致的调度,所以即便在同一个 cpu 下,我们仍然需要 atomic_xchg 对mutex的状态 lock->count 进行原子的设置。
int os::PlatformEvent::park(jlong millis) { guarantee (_nParked == 0, "invariant") ;
int v ; for (;;) { v = _Event ; if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ; // cas 设置 _Event 减一 } guarantee (v >= 0, "invariant") ; if (v != 0) return OS_OK ; // 如果最初的 _Event 不等于 0,会直接跳出不进行休眠,因为 os::PlatformEvent::unpark 的时候会设置_Event=1 // 也就是说 _Event 相当于一个令牌,默认值为 0,但是如果在 park 之前执行了 unpark,令牌就会为 1,休眠过程直接跳过
// We do this the hard way, by blocking the thread. // Consider enforcing a minimum timeout value. structtimespecabst; compute_abstime(&abst, millis); // 计算绝对时间
int ret = OS_TIMEOUT; int status = pthread_mutex_lock(_mutex); // 加 mutex 锁 assert_status(status == 0, status, "mutex_lock"); guarantee (_nParked == 0, "invariant") ; ++_nParked ;
// Object.wait(timo) will return because of // (a) notification // (b) timeout // (c) thread.interrupt // // Thread.interrupt and object.notify{All} both call Event::set. // That is, we treat thread.interrupt as a special case of notification. // The underlying Solaris implementation, cond_timedwait, admits // spurious/premature wakeups, but the JLS/JVM spec prevents the // JVM from making those visible to Java code. As such, we must // filter out spurious wakeups. We assume all ETIME returns are valid. // // TODO: properly differentiate simultaneous notify+interrupt. // In that case, we should propagate the notify to another waiter.
while (_Event < 0) { // 当令牌不足时,会循环进入等待状态 status = os::Linux::safe_cond_timedwait(_cond, _mutex, &abst);// 调用该函数后会自动释放mutex 锁 if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (_cond); pthread_cond_init (_cond, os::Linux::condAttr()) ; } assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); if (!FilterSpuriousWakeups) break ; // previous semantics if (status == ETIME || status == ETIMEDOUT) break ; // We consume and ignore EINTR and spurious wakeups. } --_nParked ; if (_Event >= 0) { ret = OS_OK; } _Event = 0 ; status = pthread_mutex_unlock(_mutex); // 从 safe_cond_timedwait 返回后会重新获得 mutex锁,所以这里要进行释放 assert_status(status == 0, status, "mutex_unlock"); assert (_nParked == 0, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other. // 加入内存屏障 OrderAccess::fence(); return ret; }
int os::Linux::safe_cond_timedwait(pthread_cond_t *_cond, pthread_mutex_t *_mutex, const struct timespec *_abstime) { if (is_NPTL()) { return pthread_cond_timedwait(_cond, _mutex, _abstime); } else { // 6292965: LinuxThreads pthread_cond_timedwait() resets FPU control // word back to default 64bit precision if condvar is signaled. Java // wants 53bit precision. Save and restore current value. int fpu = get_fpu_control_word(); int status = pthread_cond_timedwait(_cond, _mutex, _abstime); set_fpu_control_word(fpu); return status; } }
可以看到 ParkEvent 的park函数和 Parker 的park函数很像,先是通过 cas 修改了 _Event,然后根据_Event 的原始值决定是不是要加 mutex 锁和睡眠。其实,JVM park 的这个思路很类似于 java 实现单例的双重检查模式,因为第一次通过 cas 的检查,如果发现有令牌就不加锁,不等待了,毕竟加锁和等待都是很重的过程,可能会被阻塞,而 cas 很轻很快,效率更高。
void os::PlatformEvent::unpark() { // Transitions for _Event: // 0 :=> 1 // 1 :=> 1 // -1 :=> either 0 or 1; must signal target thread // That is, we can safely transition _Event from -1 to either // 0 or 1. Forcing 1 is slightly more efficient for back-to-back // unpark() calls. // See also: "Semaphores in Plan 9" by Mullender & Cox // // Note: Forcing a transition from "-1" to "1" on an unpark() means // that it will take two back-to-back park() calls for the owning // thread to block. This has the benefit of forcing a spurious return // from the first park() call after an unpark() call which will help // shake out uses of park() and unpark() without condition variables.
if (Atomic::xchg(1, &_Event) >= 0) return;
// Wait for the thread associated with the event to vacate int status = pthread_mutex_lock(_mutex); assert_status(status == 0, status, "mutex_lock"); int AnyWaiters = _nParked; assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant"); if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) { AnyWaiters = 0; pthread_cond_signal(_cond); } status = pthread_mutex_unlock(_mutex); assert_status(status == 0, status, "mutex_unlock"); if (AnyWaiters != 0) { status = pthread_cond_signal(_cond); assert_status(status == 0, status, "cond_signal"); }
果不其然,这里先通过 cas 将 _Event 修改为 1,如果_Event 之前是 -1,则说明可能有线程处于等待状态,或者将要进行等待,所以后面必须通过 mutex 锁来控制“唤醒过程”和“等待过程”互斥并且原子的进行。