Java JUC 简介

引言

本系列文章旨在介绍 Java 并发相关的知识,本文作为开篇主要介绍了 JDK 中常用的并发库(JUC)的使用方式, 后续的文章中我会自上而下地剖析了 JUC 中各个部门的实现原理,从直接下级框架 AbstractQueuedSynchronizer 也就是大家常说的 AQS,再到其中使用的 CAS, Wait,Park,最后到操作系统层面的 Mutex,Condition,希望通过这篇文章,大家能够对整个 Java 并发有一个清晰全面的认识,而且把这些内容串在一起你会发现它们本质上都是相通的。所有关于 Java 并发的文章均收录于<Java并发系列文章>

并发问题

大家都知道,我们的 Java 代码是运行在 JVM 中的,而每一个运行中的 JVM 都是一个进程,在一个进程中(运行中的程序)可能需要多个执行流,例如在我们的 Web 系统中,每收到一个 HTTP 请求,就用一个新的执行流去处理它,而不是 One-By-One 的方式处理,这样整体的处理效率可能会更高(当然这也不绝对,执行流并不是越多越好)。这里所说的执行流,就是所谓的线程。

在单核时代多线程主要是为了提高 CPU 和 IO 设备的综合利用率也是引入线程的一个原因。举个例子:当只有一个线程的时候会导致 CPU 计算时,IO 设备空闲;进行 IO 操作时,CPU 空闲。我们可以简单地说这两者的利用率目前都是 50%左右。但是当有两个线程的时候就不一样了,当一个线程执行 CPU 计算时,另外一个线程可以进行 IO 操作,这样两个的利用率就可以在理想情况下达到 100%了。

因为这些线程都是为了一个软件服务的(从属于某一进程),所以它们会共用同一块内存空间,即使用同一个页表。那么可能就会存在一个问题,这些线程很可能会修改同一块内存,就拿最简单的 +1 来举例。

1
2
3
4
5
int sum = 0;
// Thread1
sum++;
// thread2
sum++;

因为,+1 操作需要 3 步才能完成:

  1. 提取当前内存值
  2. 对该值+1
  3. 将结果回写。

如果不使用任何保护措施的话,很可能会出现如下情况:

1
2
3
4
5
6
7
8
9
10
11
12
int sum = 0;
/*Thread1
sum 当前为 0
sum + 1 等于 1
将 1 回写
*/

/*Thread2
在 Thread1 回写之前提取到了 sum 内存值 0
sum + 1 等于 1
将 1 回写
*/

为了解决诸如此类的问题,我们就需要使用到锁以及一些保护措施来控制线程的执行,不过好在 JDK 中为我们提供了丰富的并发控制组件,基本能够满足我们的需求。

JUC

java.util.concurrent 包(JUC)中,有各式各样的并发控制工具,这里我们简单介绍几个常用的工具及其使用方式。

Atomic 类

Atomic 类有很多种,它们都在 java.util.concurrent.atomic 包中。基本都是通过 CAS(CompareAndSwap)来实现的,而 CAS 的具体实现依赖于体系结构提供的指令。

atomic

这里我们仅介绍几个例子,并不会介绍每一个 Atomic 类的使用。首先看一下 AtomicInteger , 通过它我们可以无锁化的修改一个 int 类型的值,并且能够保证修改过程是原子的。

1
2
3
4
5
6
7
public static class LockTest {
private AtomicInteger sum = new AtomicInteger(0);

public void increase() {
sum.incrementAndGet();
}
}

比如统计一个网页的访问量时,就可以使用它,因为不会使用到锁,所以没有上下文切换的消耗,速度很快。

如果你不仅仅是修改一个基础类型的数据,例如一次要修改好几个基础数据类型的话,你可以把它们封装到一个对象中,然后使用 AtomicReference 来进行整个对象的更新操作。下例中,我们就一次性更新了一个对象的所有属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static class LockTest {
private AtomicReference<LockTest> reference = new AtomicReference<>();
private int test1;
private int test2;

public void changeObject() {
reference.getAndUpdate(new UnaryOperator<LockTest>() {
@Override public LockTest apply(LockTest test) {
LockTest newItem = new LockTest();
newItem.test1 = test.test1 + 1;
newItem.test2 = test.test2 - 1;
return newItem;
}
});
}
}

Atomic 类虽然很快,但是也有一个问题就是 ABA 问题,当一个 Atomic 的值从 A 修改为 B,再重新修改为 A 时,虽然值改变了,但是在进行 CAS 时,会错认为该值没有发生变化。为了解决这类问题,你可以使用 AtomicStampedReference 。它通过一个版本号来控制数据的变化,如果遵循使用规范,即每次进行修改时都将版本号加一,那么就可以杜绝 ABA 问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class LockTest {
private AtomicStampedReference<LockTest> reference = new AtomicStampedReference<>(null, 0);
private int test1;

public void changeObject() {
LockTest newObject = new LockTest();
for (; ; ) {
int previousStamp = reference.getStamp();
LockTest previousObject = reference.getReference();
if (reference.compareAndSet(previousObject, newObject, previousStamp, previousStamp + 1)) {
break;
}
}

}

}

Semaphore

Semaphore(信号量) 和 synchronized 类似,是控制线程能否进入某一同步代码区的一种手段,但是 synchronized 每次只有一个进程可以进入同步代码区,而 Semaphore可以指定多个线程同时访问某个资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static class LockTest {

public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(300);
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 100; i++) {
int finali = i;
threadPool.execute(() -> {
try {
semaphore.acquire();
System.out.println("Index:" + finali);
Thread.sleep(2000);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
}

}

上例中,我们设定使用了 5 个许可证(同一时刻最多5个线程进入同步区),每次调用 acquire 都会消耗一个许可证,调用 release 时释放一个许可证,当许可证不足时调用 acquire 就会进入队列等待。值得一提的是,Semaphore 包含两种模式,公平模式和非公平模式,在公平模式下,获取许可证是以 FIFO 的顺序进行,而在非公平模式,是不能保证顺序的。

CountDownLatch

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。下面看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class LockTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(1);
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
int finali = i;
threadPool.submit(() -> {
try {
System.out.println("Index:" + finali);
Thread.sleep(2000);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
countDownLatch.await();
System.out.println("Finish");
}
}

首先,我们指定 CountDownLatch 等待 5 个线程完成任务,在每个线程执行完任务后,都调用 countDown 函数,它会将 CountDownLatch 内部的计数器减1,当计数器为 0 时, CountDownLatch::await 函数才会返回。我一般用它来实现 Future 接口。值得一提的是,CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,就不能再次被使用。

CyclicBarrier

CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的计数等待,但是它的功能比 CountDownLatch 更加复杂和强大。它可以控制一组线程全部完成第一轮任务时,再同时开始让它们执行下一轮任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static class LockTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("barrierAction merge data"));
for (int i = 0; i < 5; i++) {
int finali = i;
threadPool.submit(() -> {
try {
System.out.println("Task 1 Begin Index:" + finali);
Thread.sleep(ThreadLocalRandom.current().nextInt(2000));
System.out.println("Task 1 Finished Index:" + finali);
cyclicBarrier.await();
System.out.println("Task 2 Begin Index:" + finali);
Thread.sleep(ThreadLocalRandom.current().nextInt(2000));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
}

CyclicBarrier 很适合进行数据分组处理的任务,而且下一轮任务依赖于上一轮任务的结果,比如我们将一个大任务拆分成很多小任务,当所有小任务完成时,我们可以通过 barrierAction 合并上一轮任务的结果,然后再开始下一轮任务。

关于 CyclicBarrier 和 CountDownLatch 的区别:

  • CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;)
  • CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)

countdown-cyclic-barrier

ThreadLocal

通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。但是 JDK 也为我们提供了让某一变量独享与各个线程的方案,也就是 ThreadLocal 。因为每个线程都有自己专属的变量,所以各个线程在操作 ThreadLocal 变量时不需要加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static class LockTest {
public static void main(String[] args) {
ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
new Thread(new Runnable() {
@Override public void run() {
try {
System.out.println("Thread 1 Current Value:" + threadLocal.get());
threadLocal.set(10);
Thread.sleep(500);
System.out.println("Thread 1 Current Value:" + threadLocal.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(100);
new Thread(new Runnable() {
@Override public void run() {
try {
System.out.println("Thread 2 Current Value:" + threadLocal.get());
threadLocal.set(5);
Thread.sleep(1000);
System.out.println("Thread 2 Current Value:" + threadLocal.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

threadLocal 的初始值是 null,然后线程1先启动将值改为 10,100 ms 后线程2启动,会发现 threadLocal 的值仍然为 null,然后将其改为 5,400ms 后线程1从睡眠中苏醒,发现 threadLocal 的值仍然为 10,可见这两个线程所观测到的 threadLocal 值是各自独立的。

ThreadPool

在多线程开发中,如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和所消耗的系统资源要比处理实际的用户请求的时间和资源更多。为了复用线程减少资源浪费,我们就需要使用到线程池。

线程池提供了一种限制和管理线程资源的方案。通过线程池我们可以达到如下目的:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

J. U. C中有三个Executor接口:

  • Executor:一个运行新任务的简单接口;
  • ExecutorService:扩展了Executor接口。添加了Future 功能和一些用来管理执行器生命周期和任务生命周期的方法;
  • ScheduledExecutorService:扩展了ExecutorService。支持定期执行任务。

J. U. C中的ThreadPoolExecutor 实现了 ExecutorService,也是大家最常用到的线程池实现。我们接下来就以它为例介绍线程池。

使用方式

在创建一个线程池时,需要几个核心参数,一个是 corePoolSize 它描述该线程池中常驻的线程数量,其次是 maximumPoolSize 它描述该线程池中最多能有多少个线程。当线程池的线程数大于 corePoolSize 时,keepAliveTime 规定了那些多余的空闲线程最多能存活多久。workQueue 是一个存放任务的队列,用于缓存将要执行的任务。threadFactory 用于创建新的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

RejectedExecutionHandler 用来描述当任务队列满了并且当前线程池内的线程数达到 maximumPoolSize 时,应该如何回应新加入的任务, ThreadPoolTaskExecutor 定义一些常用的策略(当然大家也可以自己实现一些自定义策略):

  • ThreadPoolExecutor. AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor. CallerRunsPolicy:调用执行自己的线程运行任务。您不会任务请求。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
  • ThreadPoolExecutor. DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor. DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

thread-pool

如果想让线程池执行任务的话需要实现的 Runnable 接口或 Callable 接口。 Runnable接口或Callable接口实现类都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。两者的区别在于 Runnable 接口不会返回结果但是 Callable 接口可以返回结果。

接下来我们以 ThreadPoolExecutor 为例介绍一下线程池的常用接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 线程数小于核心线程数时任务不入队,直接通过参数传递到 worker 线程中
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 入队失败时,当前任务也是参数传递到 worker 线程中
else if (!addWorker(command, false))
reject(command);
}

Runnable 接口主要是配合线程池的 execute 使用,用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否。它的执行流程主要分3步:

  1. 如果当期线程数小于 corePoolSize,创建新的线程去执行该任务
  2. 如果线程数达到了 corePoolSize,则试图将任务添加到任务队列中
  3. 如果队列已经满了,则添加新的线程,直到达到 maximumPoolSize,这时候就采用前面所说的处理策略 RejectedExecutionHandler 来处理这些新任务。

提交任务除了使用 execute 方法之外,还可以使用 submit 方法,该方法用于提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个Future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

1
2
3
4
5
6
7
8
9
10
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

在使用线程池时,推荐使用 ThreadPoolExecutor,而且需要根据自己的使用场景,合理的调整 corePoolSize,maximumPoolSize,workQueue,RejectedExecutionHandler,规避资源耗尽的风险。

任务性质不同的任务可以用不同规模的线程池分开处理。CPU 密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO 密集型任务则由于需要等待 IO 操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2xNcpu。混合型的任务,如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。

优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,如果等待的时间越长 CPU 空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用 CPU。并且,阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。

当我们要关闭线程池时,可以通过shutdown和shutdownNow这两个方法。它们的原理都是遍历线程池中所有的线程,然后依次中断线程。shutdown和shutdownNow还是有不一样的地方:

  • shutdownNow首先将线程池的状态设置为STOP, 然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表
  • shutdown只是将线程池的状态设置为SHUTDOWN状态(这意味着不再接受新的任务),然后中断所有空闲的线程,等所有现存任务执行完之后才会销毁线程池

可以看出 shutdown 方法会将正在执行的任务继续执行完,而 shutdownNow 会直接中断正在执行的任务。调用了这两个方法的任意一个,isShutdown方法都会返回 true,当所有的线程都关闭成功,才表示线程池成功关闭,这时调用isTerminated方法才会返回 true。

并发容器

JDK 提供的并发容器大部分在 java.util.concurrent 包中。比较常用的有:

  • ConcurrentHashMap: 线程安全版HashMap。
  • ConcurrentLinkedQueue: 线程安全版 LinkedList。
  • ConcurrentSkipListMap: 线程安全版跳表 Map。
  • CopyOnWriteArrayList: 线程安全版 List,但是不是通过锁实现。在读多写少的场合性能非常好。
  • LinkedBlockingQueue: 线程安全的阻塞队列。
  • PriorityBlockingQueue: 支持优先级的无界阻塞队列。

后面的文章中我们会简单地介绍一下它们的实现方式,这里我就不再介绍它们的使用方式了,因为无论是 List 还是 Map 大家应该都用的比较熟悉了。

Lock

JUC 中锁的实现主要有3个,分别是ReentrantLock,ReentrantReadWriteLock,StampedLock。本节我们主要介绍各种锁的使用,后面锁的分类部分,我们会对比着介绍各个锁的实现方案。

ReentrantLock 是最基础的一个锁,它是一个可重入锁,也就是说持有锁的线程,可以多次加锁,此外,通过参数我们还能控制它是否是一个公平锁,公平锁就是说大家获得锁的顺序是和尝试加锁的顺序一致的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static class LockTest {

private ReentrantLock lock = new ReentrantLock(true);

private void doThing() {
lock.lock();
try {
doSomeThings();
} finally {
lock.unlock();
}
}

private void doSomeThings() {
lock.lock();
try {
// do something
} finally {
lock.unlock();
}
}
}

ReentrantReadWriteLock 主要是在数据既有读又有写的场景中使用,它能保证读操作之间不互斥,但是读写和写写之间互斥。它里面有两个锁,在需要读数据时,对读锁加锁,在需要写数据时对写锁加锁。同样,我们也可以在构造读写锁的时候通过参数控制其是否是公平锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public static class LockTest {

private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

private int data;

private void read() {
lock.readLock().lock();
try {
// read
System.out.println(data);
} finally {
lock.readLock().unlock();
}
}

private void write() {
lock.writeLock().lock();
try {
// write
data = 10;
} finally {
lock.writeLock().unlock();
}
}

private void lockDowngrade() {
lock.writeLock().lock();
try {
// write
data = 10;
read();
} finally {
lock.writeLock().unlock();
}
}

private void deadLock() {
lock.readLock().lock();
try {
// read
System.out.println(data);
write();
} finally {
lock.readLock().unlock();
}
}
}

关于读写锁,有一个小知识点是,当持有写锁时,是可以获得到读锁的,因为有些写操作可能内部调用到了读操作,就像上例一样。而先持有读锁,再去获得写锁时,就会发生死锁,大家在使用的时候一定要注意这个问题。

最后要介绍的是 StampedLock,它和 ReentrantReadWriteLock 很像。ReadWriteLock中如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock。

在使用 StampedLock 时,我们可以先使用乐观读锁,在这个过程中其他线程是可以获得写锁的,也就是说我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入。乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class LockTest {
private final StampedLock lock = new StampedLock();

private int num1;
private int num2;

public void change(int num1, int num2) {
long stamp = lock.writeLock();
try {
this.num1 = num1;
this.num2 = num2;
} finally {
lock.unlockWrite(stamp);
}
}

public double readAndCompute() {
long stamp = lock.tryOptimisticRead(); // 获取当前数据版本号
int currentNum1 = num1;
int currentNum2 = num2;
if (!lock.validate(stamp)) { // 确认之前的版本号和最新版本号是否一致,如果一致,则说明期间没发生数据更改,否则,可能数据被更改了
stamp = lock.readLock(); // 数据版本号不一致,通过悲观读锁来获取正确数据
try {
currentNum1 = num1;
currentNum2 = num2;
} finally {
lock.unlockRead(stamp); // 释放悲观读锁
}
}
return currentNum1 + currentNum2;
}
}

StampedLock 写锁的使用和读写锁完全一样,区别在与多了一个 tryOptimisticRead 接口,它能够获得当前数据版本号,我们记录下读数据之前的版本号,然后再读取所有数据,最后拿之前记录的版本号和最新版本号做对比,如果一致,则说明期间没发生数据更改,可以正常使用,否则,可能数据被更改了,这时候就得改用悲观读锁加锁,在读取数据,这个和 ReentrantReadWriteLock 的使用流程就一样了。

可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。

StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

JUC 之外的保护措施

synchronized

synchronized关键字解决的是多个线程之间访问资源的同步性,synchronized关键字可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。synchronized 可以修饰一段代码块,也可以用来修饰一个函数。

当修饰一个静态函数或者将synchronized代码块作用于某一个类时,是对该类进行加锁。

1
2
3
4
5
6
7
8
9
10
11
12
public static class LockTest{

public static synchronized void doSomeThing() {
// do things1
}

public void doOtherThings() {
synchronized (LockTest.class) {
// do things2
}
}
}

而当其修饰的是某一对象的非静态函数,或者synchronized代码块作用于某一个对象时,是对该对象进行加锁。

1
2
3
4
5
6
7
8
9
10
11
12
public static class LockTest{

public synchronized void doSomeThing() {
// do things1
}

public void doOtherThings() {
synchronized (this) {
// do things2
}
}
}

synchronized 之前是借助下层 mutex 来实现(具有进程调度成本),所以在锁竞争不激烈的情况下性能低下而饱受诟病,不过后来 JVM 引入了偏向锁,轻量级锁,重量级锁,通过这些手段,很好地适应了各种锁竞争的场景,关于 synchronized 实现的讨论,我们后面再进行。

synchronized和ReentrantLock对比:

  • 两者都是可重入锁
    • 两者都是可重入锁。“可重入锁”概念是:自己可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁的时候还是可以获取的,如果不可锁重入的话,就会造成死锁。同一个线程每次获取锁,锁的计数器都自增1,所以要等到锁的计数器下降为0时才能释放锁。
  • synchronized 依赖于 JVM 而 ReentrantLock 依赖于 API
    • synchronized 是依赖于 JVM 实现的,JDK1.6 为 synchronized 关键字进行了很多优化,但是这些优化都是在虚拟机层面实现的,并没有直接暴露给我们。ReentrantLock 是 JDK 层面实现的(也就是 API 层面,需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成),所以我们可以通过查看它的源代码,来看它是如何实现的。
  • ReentrantLock 比 synchronized 增加了一些高级功能
    • 相比synchronized,ReentrantLock增加了一些高级功能。主要来说主要有三点:
      1. ReentrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。
      2. ReentrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。 ReentrantLock默认情况是非公平的,可以通过 ReentrantLock类的ReentrantLock(boolean fair)构造方法来制定是否是公平的。
      3. synchronized关键字与wait()和notify()/notifyAll()方法相结合可以实现等待/通知机制,ReentrantLock类当然也可以实现,但是需要借助于Condition接口与newCondition() 方法。Condition是JDK1.5之后才有的,它具有很好的灵活性,比如可以实现多路通知功能也就是在一个Lock对象中可以创建多个Condition实例(即对象监视器),线程对象可以注册在指定的Condition中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。 在使用notify()/notifyAll()方法进行通知时,被通知的线程是由 JVM 选择的,用ReentrantLock类结合Condition实例可以实现“选择性通知” ,这个功能非常重要,而且是Condition接口默认提供的。而synchronized关键字就相当于整个Lock对象中只有一个Condition实例,所有的线程都注册在它一个身上。如果执行notifyAll()方法的话就会通知所有处于等待状态的线程这样会造成很大的效率问题,而Condition实例的signalAll()方法 只会唤醒注册在该Condition实例中的所有等待线程。
  • 性能已不是选择标准

volatile

volatile 可以禁止 JVM 的指令重排,保证在多线程环境下数据的可见性。一个常见的使用场景是 volatile 和 synchronized 配合实现单例模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class LockTest{
private volatile static LockTest INSTANCE;

private LockTest() {
}

public static LockTest getInstance() {
if (INSTANCE == null) {
synchronized (LockTest.class) {
if (INSTANCE == null) {
INSTANCE = new LockTest();
}
}
}
return INSTANCE;
}
}

这里之所以使用 volatile 修饰 INSTANCE 是因为 INSTANCE = new LockTest(); 这段代码其实是分为三步执行:

  1. 为 LockTest 分配内存空间
  2. 初始化 LockTest
  3. 将 INSTANCE 指向 LockTest 分配的内存地址

但是由于 JVM 具有指令重排的特性,执行顺序有可能变成 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。这里大家可能会有疑问,不是说 synchronized 也有防止指令重排的效果吗,既然它已经有这个效果了,为什么我们这里还要用 volatile?

实际上 synchronized 只能保证 monitorEnter 前后以及 monitorExit 前后的代码不会重拍,这是一个很粗的粒度,而 monitorEnter 和 monitorExit 之间的代码是可以重排的。

例如,线程1 执行了 1 和 3,此时线程2 调用 getInstance() 后发现 INSTANCE 不为空,因此返回 INSTANCE,但此时 INSTANCE 可能还未被初始化。而当我们将 INSTANCE 修饰为 volatile后,上述的实例化过程只会以 1->2->3 的顺序执行。也就不会有上述问题了。

在 JDK1.2 之前,Java的内存模型实现总是从主存(即共享内存)读取变量,是不需要进行特别的注意的。而在当前的 Java 内存模型下,线程可以把变量保存本地内存比如机器的寄存器中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致。

java-main-memory

要解决这个问题,我们也需要把变量声明为volatile,这就指示 JVM,这个变量是不稳定的,每次使用它都到主存中进行读取。

说白了,volatile 关键字的主要作用就是保证变量的多线程可见性然后还有一个作用是防止指令重排序, 防止指令重排是 JVM 规范中所规定的,JVM 实现的时候需要遵循该规范。

java-volatile-memory

而实现多线程可见性使用到的是 StoreLoad 内存屏障,它一般是通过 lock 指令实现的,该指令一般用在多处理器系统中,执行指令时对共享内存的独占使用。

在修改内存操作时,使用LOCK前缀去调用加锁的读-修改-写操作,这种机制用于多处理器系统中处理器之间进行可靠的通讯,具体描述如下:

(1)在早期处理器中,LOCK前缀会使处理器执行当前指令时产生一个LOCK#信号,这种总是引起显式总线锁定
(2)在现代处理器中,加锁操作是由高速缓存锁或总线锁来处理。如果内存访问有高速缓存且只影响一个单独的高速缓存行,那么操作中就会调用高速缓存锁,而系统总线和系统内存中的实际区域内不会被锁定。同时,这条总线上的其它处理器就回写所有已修改的数据并使它们的高速缓存失效,以保证系统内存的一致性。如果内存访问没有高速缓存且/或它跨越了高速缓存行的边界,那么这个处理器就会产生LOCK#信号,并在锁定操作期间不会响应总线控制请求

synchronized关键字和volatile关键字比较:

  • volatile关键字是线程同步的轻量级实现,所以volatile性能肯定比synchronized关键字要好。但是volatile关键字只能用于变量而synchronized关键字可以修饰方法以及代码块。synchronized关键字在JavaSE1.6之后进行了主要包括为了减少获得锁和释放锁带来的性能消耗而引入的偏向锁和轻量级锁以及其它各种优化之后执行效率有了显著提升,实际开发中使用 synchronized 关键字的场景还是更多一些。
  • 多线程访问volatile关键字不会发生阻塞,而synchronized关键字可能会发生阻塞
  • volatile关键字能保证数据的可见性,但不能保证数据的原子性。synchronized关键字两者都能保证。
  • volatile关键字主要用于解决变量在多个线程之间的可见性,而 synchronized关键字解决的是多个线程之间访问资源的同步性。

参考内容

[1] linux 2.6 互斥锁的实现-源码分析
[2] 深入解析条件变量(condition variables)
[3] Linux下Condition Vairable和Mutext合用的小细节
[4] 从ReentrantLock的实现看AQS的原理及应用
[5] 不可不说的Java“锁”事
[6] 从源码层面解析yield、sleep、wait、park
[7] LockSupport中的park与unpark原理
[8] Thread.sleep、Object.wait、LockSupport.park 区别
[9] 从AQS到futex-二-JVM的Thread和Parker
[10] Java的LockSupport.park()实现分析
[11] JVM源码分析之Object.wait/notify实现
[12] Java线程源码解析之interrupt
[13] Thread.interrupt()相关源码分析
[14] Java CAS 原理剖析
[15] 源码解析 Java 的 compareAndSwapObject 到底比较的是什么
[16] 《Java并发编程的艺术》
[17] 《实战 Java 高并发程序设计》
[18] volatile关键字深入学习
[19] 为什么Netty的FastThreadLocal速度快
[20] 线程池ThreadPoolExecutor实现原理
[21] 深入理解Java线程池:ThreadPoolExecutor
[22] ConcurrentHashMap 详解一
[23] ConcurrentHashMap 详解二
[24] JUC中Atomic class之lazySet的一点疑惑
[25] The JSR-133 Cookbook for Compiler Writers
[26] 就是要你懂Java中volatile关键字实现原理

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/java/concurrent/juc-introduction/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议,转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。