Java核心技术 - JUC



JUC包中锁原理剖析

LockSupport工具类

JDK中的 rt.jar包 里面的 LockSupport 是个工具类,它的主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础

LockSupport类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport类的方法的线程是不持有许可证的。LockSupport是使用Unsafe类实现的,下面介绍LockSupport中的几个主要函数。

1、void park()

如果调用 park 方法的线程已经拿到了与 LockSupport关联 的许可证,则调用 Locksupport.park() 时会马上返回,否则调用线程会被禁止参与线程的调度,也就是会被阻塞挂起。

如下代码直接在main函数里面调用park方法,最终只会输出begin park!,然后当前线程被挂起(阻塞),这是因为在默认情况下调用线程是不持有许可证的。

1
2
3
4
5
6
7
public class Demo {
public static void main(String[] args) {
System.out.println("begin park");
LockSupport.park();
System.out.println("end park, but main thread isn't end!");
}
}

在其他线程调用 unpark(Thread thread)方法并且将当前线程作为参数时,调用park方法而被阻塞的线程会返回。另外,如果其他线程调用了阻塞线程的interrupt()方法,设置了中断标志或者线程被虚假唤醒,则阻塞线程也会返回。所以在调用park方法时最好也使用循环条件判断方式。

2、void unpark(Thread thread)方法

当一个线程调用unpark时,如果参数thread线程没有持有thread与LockSupport类关联的许可证,则让thread线程持有。如果thread之前因调用 park() 而被挂起,则调用unpark后,该线程会被唤醒。如果thread之前没有调用park,则调用unpark方法后, 再调用park方法,其会立刻返回。修改代码如下

1
2
3
4
5
6
7
8
public class Demo {
public static void main(String[] args) {
LockSupport.unpark(Thread.currentThread());
System.out.println("begin park");
LockSupport.park();
System.out.println("end park, but main thread isn't end!");
}
}

该代码会输出

1
2
begin park
end park, but main thread isn't end!

下面再来看一个例子以加深对park和unpark的理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println("child begin park");
LockSupport.park();
System.out.println("child end park");
});
thread.start();

Thread.sleep(1000);

// 调用 unpark 方法让thread线程持有许可证,然后park方法返回。
LockSupport.unpark(thread);
// thread.interrupt(); 中断子线程,就不管你调用了几次park(),都可以结束中断。
System.out.printf("end park");
}
}

输出结果为:

1
2
3
child begin park
child end park
end park

上面代码首先创建了一个子线程thread,子线程启动后调用park方法,由于在默认情况下子线程没有持有许可证,因而它会把自己挂起

主线程休眠 1s 是为了让主线程调用unpark方法前让子线程输出child thread begin park! 并阻塞。

主线程然后执行unpark方法,参数为子线程,这样做的目的是让子线程持有许可证,然后子线程调用的park方法就返回了。park方法返回时不会告诉你因何种原因返回,所以调用者需要根据之前调用park方法的原因,再次检查条件是否满足,如果不满足则还需要再次调用park方法。

注意只有中断子线程,子线程才会运行结束,如果子线程不被中断, 即使你调用unpark(thread) 方法子线程也不会结束。一个unpark(thread)凭证只能释放一个park()方法。

3、void parkNanos(long nanos)方法

和park方法类似,如果调用park方法的线程已经拿到了与LockSupport关联的许可证,则调用LockSupport.parkNanos(longnanos)方法后会马上返回。该方法的不同在于,如果没有拿到许可证,则调用线程会被挂起nanos时间后修改为自动返回。

另外park方法还支持带有blocker参数的方法voidpark(Object blocker) 方法,当钱程在没有持有许可证的情况下调用park方法而被阻塞挂起时,这个blocker对象会被记录到该线程内部。

1
2
3
4
5
6
7
8
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}

使用诊断工具可以观察线程被阻塞的原因,诊断工具是通过调用getBlocker(Thread)方法来获取blocker对象的,所以JDK推荐我们使用带有 blocker 参数的park方法,并且 blocker 被设置为this, 这样当在打印线程堆栈排查问题时就能知道是哪个类被阻塞了。

1
2
3
4
5
6
7
8
9
10
public class TestPark {
public void testPark() {
LockSupport.park(); // (1)
}

public static void main(String[] args) {
TestPark testPark = new TestPark();
testPark.testPark();
}
}

运行代码后,使用jstack pid命令查看线程堆栈时可以看到如下输出结果。

4、park(Object blocker)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// 设置该线程的blocker变量
setBlocker(t, blocker);
// 挂起线程
UNSAFE.park(false, 0L);
// 线程被激活后清除blocker变量,因为一般都是在线程阻塞时才分析原因
setBlocker(t, null);
}

private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}

Thread类里面有个变量volatile Object parkBlocker, 用来存放park方法传递的blocker对象,也就是把 blocker 变量存放到了调用 park方法的线程的成员变量里面。

5、void parkNanos(Object blocker, long nanos)

1
2
3
4
5
6
7
8
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}

相比 park(Object blocker) 方法多了个超时时间。

6、void parkUntil(Object blocker, long deadline)

从1970年到这个时间点的总毫秒数

1
2
3
4
5
6
7
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
// isAbsolute=true, time=deadline;表示到deadline时间后返回
UNSAFE.park(true, deadline);
setBlocker(t, null);
}

其中参数deadline的时间单位为 ms,该时间是从1970年到现在某一个时间点的毫秒值。

这个方法和 parkNanos(Objectblocker, long nanos) 方法的区别是,后者是从当前算等待 nanos 秒时间,而前者是指定一个时间点,比如需要等到2017.12.11日12:00:00,则把这个时间点转换为从1970年到这个时间点的总毫秒数。

最后看一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

private void lock() {
boolean wasInterrupted = false; // 是否中断了该线程,为false则没中断该线程
Thread current = Thread.currentThread();
waiters.add(current);

// 只有队首的线程可以获取锁 (1)
// 如果当前线程不是队首或者当前锁己经被其他线程获取,则调用park方法挂起自己。
while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
LockSupport.park(this);
// 如果park方法是因为被中断而返回,则忽略中断,并且重置中断标志,做个标记,然后再次判断当前线程是不是队首元素或者当前锁是否己经被其他线程获取,如果是则继续调用park方法挂起自己。
if (Thread.interrupted()) // (2)
wasInterrupted = true;
}

waiters.remove();
// 判断标记,如果标记为true则中断该线程,这个怎么理解呢?
if (wasInterrupted) // (3)
// 其实就是其他线程中断了该线程,虽然我对中断信号不感兴趣,忽略它,但是不代表其他线程对该标志不感兴趣,所以要恢复下。
current.interrupt();
}

public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}

}

抽象同步队列 AQS 概述

锁的底层支持

AbstractQueuedSynchronizer 抽象同步队列简称 AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。 另外,大多数开发者可能永远不会直接使用AQS,但是知道其原理对于架构设计还是很有帮助的。 下面看下AQS的类图结构,如图6-1所示。

由该图可以看到,AQS是一个FIFO的双向队列(先进先出),其内部通过节点head和tail 记录队首和队尾元素,队列元素的类型为Node。 其中Node中的thread变量用来存放进入AQS队列里面的线程:Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的; waitStatus记录当前线程等待状态,可以为CANCELLED(线程被取消了)、SIGNAL (线程需要被唤醒)、CONDITION(线程在条件队列里面等待〉、PROPAGATE (释放共享资源时需要通知其他节点〕;prev记录当前节点的前驱节点,next记录当前节点的后继节点。

在AQS中维持了一个单一的状态信息state,可以通过getState、 setState、compareAndSetState 函数修改其值。 对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数;对于读写锁ReentrantReadWriteLock来说,state 的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;对于semaphore来说,state用来表示当前可用信号的个数:对于CountDownlatch来说,state用来表示计数器当前的值。

AQS有个内部类ConditionObject,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,如类图所示,这个条件队列的头、尾元素分别为 firstWaiter 和 lastWaiter。

对于AQS来说,线程同步的关键是对状态值state进行操作根据state是否属于一个线程,操作state的方式分为独占方式和共享方式。在独占方式 下获取和释放资源使用的方法为: void acquire(int arg) void acquirelnterruptibly(int arg) boolean release(int arg)。

在共享方式下获取和释放资源的方法为: void acquireShared(int arg) void acquireSharedInterruptibly(int arg) boolean releaseShared(int arg)。

使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从1变为2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。

在独占方式下, 获取与释放资源的流程如下:

( 1 )当一个线程调用acquire(int arg)方法获取独占资源时,会首先使用tryAcquire方法尝试获取资源,具体是设置状态变量state 的值成功则直接返回失败则将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport. park( this) 方法挂起自己。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

( 2 ) 当一个线程调用release(int arg)方法时会尝试使用tryRelease操作释放资源,这里是设置状态变量state 的值,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程(thread)。 被激活的线程则使用tryAcquire尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

需要注意的是, AQS类并没有提供可用的tryAcquire和tryRelease方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquire和tryRelease需要由具体的子类来实现。子类在实现tryAcquire和tryRelease时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。

最后,我们来看看如何维护AQS提供的队列,主要看入队操作。

入队操作: 当一个线程获取锁失败后该线程会被转换为Node节点,然后就会使用 enq(final Node node) 方法将该节点插入到AQS的阻塞队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

基于AQS实现自定义同步器

本节我们基于AQS实现一个不可重入的独占锁,正如前文所讲的,自定义AQS需要重写一系列函数,还需要定义原子变量state 的含义。 这里我们定义,state为0表示目前锁没有被线程持有, state为 1 表示锁己经被某一个线程持有,由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外, 我们自定义的锁支持条件变量。

1、代码实现

如下代码是基于AQS实现的不可重入的独占锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class NonReentrantLock implements Lock, java.io.Serializable{

private static class Sync extends AbstractQueuedSynchronizer {
// 是否锁已经被持有
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

// 如果state为0 则尝试获取锁
@Override
protected boolean tryAcquire(int acquires) {
assert acquires == 1;
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 尝试释放锁,设置state为0
@Override
protected boolean tryRelease(int releases) {
assert releases == 1;
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// 提供条件变量接口
Condition newCondition() {
return new ConditionObject();
}
}

// 创建一个Sync来做具体的工作
private final Sync sync = new Sync();

@Override
public void lock() {
sync.acquire(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
}

在如上代码中,NonReentrantLock定义了一个内部类Sync用来实现具体的锁的操作, Sync则继承了AQS。由于我们实现的是独占模式的锁,所以Sync重写了 tryAcquire 、tryRelease 和 isHeldExclusively 3个方法。另外, Sync提供了newCondition这个方法用来支持条件变量。

2、使用自定义锁实现生产 - 消费模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class Main {
final static NonReentrantLock lock = new NonReentrantLock(); // 自定义的不可重入锁
final static Condition consumerThread = lock.newCondition(); // 消费者线程
final static Condition producerThred = lock.newCondition(); // 生产者线程

final static Queue<String> queue = new LinkedBlockingDeque<String>();
final static int queueSize = 10;

public static void main(String[] args) {

Thread producer = new Thread(new Runnable() {
@Override
public void run() {
// 获取独占锁
lock.lock();
try {
// (1) 如果队列满了,则等待
while (queue.size() == queueSize) {
try {
producerThred.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// (2) 添加元素到队列
queue.add(UUID.randomUUID().toString().replace("-", ""));
// (3) 唤醒消费线程
consumerThread.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});

Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
// 获取独占锁
lock.lock();
try {
// 队列空,则等待
while (queue.size() == 0) {
try {
consumerThread.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费一个元素:检索并删除此队列的开头,返回此队列的头元素,如果此队列为空,则返回null
String element = queue.poll();
System.out.println(Thread.currentThread().getName() + "正在消费:" + element);
// 唤醒生产线程
producerThred.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});

producer.start();
producer.setName("producer");

consumer.start();
consumer.setName("consumer");
}
}

如上代码首先创建了NonReentrantLock的一个对象lock,然后调用lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。

在main 函数里面,首先创建了producer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列是否己经满了,如果满了则调用producerThred.await()阻塞挂起当前线程。需要注意的是,这里使用while而不是if是为了避免虚假唤醒。 如果队列不满则直接向队列里面添加元素,然后调用consumerThread.signalAll()唤醒所有因为消费元素而被阻塞的消费线程,最后释放获取的锁。

然后在main函数里面创建了consumer生产线程,在线程内部首先调用lock.lock() 获取独占锁,然后判断当前队列里面是不是有元素,如果队列为空则调用notconsumerThreadFull.await()阻塞挂起当前线程。需要注意的是,这里使用while 而不是if是为了避免虚假唤醒。如果队列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。

乐观锁与悲观锁

悲观锁指对数据被外界修改持保守态度,认为数据很容易就会被其他线程修改,所以在数据被处理前先对数据进行加锁,并在整个数据处理过程中,使数据处于锁定状态。 悲观锁的实现往往依靠数据库提供的锁机制,即在数据库中,在对数据记录操作前给记录加排它锁。 如果获取锁失败,则说明数据正在被其他线程修改, 当前线程则等待或者抛出异常。 如果获取锁成功,则对记录进行操作,然后提交事务后释放排它锁。

下面我们看一个典型的例子,看它如何使用悲观锁来避免多线程同时对一个记录进行修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
public int updateEntry(long id) {
// (1) 使用悲观锁获取指定记录
EntryObject entry = query("select * from table1 where id = #{id} for update", id);

// (2) 修改记录内容,根据计算修改entry记录的属性
String name = generatorName(entry);
entry.setName(name);
...

// (3)update操作
int count = update("update table1 set name = #{name}, age = #{age} where id = #{id}", entry);
return count;
}

对于如上代码,假设updateEntry、 query、 update方法都使用了事务切面的方法,并且事务传播性被设置为required。 执行updateEntry方法时如果上层调用方法里面没有开启事务,则会即时开启一个事务,然后执行代码(1)。 代码(1)调用了query方法,其根据指定id从数据库里面查询出一个记录。 由于事务传播性为requried时,所以执行query时没有开启新的事务, 而是加入了updateEntry开启的事务,也就是在updateEntry方法执行完毕提交事务时,query方法才会被提交,就是说记录的锁定会持续到updateEntry执行结束。

代码(2)则对获取的记录进行修改,代码(3)把修改的内容写回数据库,同样代码 (3)的update方法也没有开启新的事务,而是加入了updateEntry的事务。也就是updateEntry、query、 update方法共用同一个事务

当多个线程同时调用updateEntry方法,并且传递的是同一个id时, 只有一个线程执行代码(1)会成功,其他线程则会被阻塞,这是因为在同一时间只有一个线程可以获取对应记录的锁在获取锁的线程释放锁前(updateEntry执行完毕,提交事务前), 其他线程必须等待,也就是在同一时间只有一个线程可以对该记录进行修改。

乐观锁是相对悲观锁来说的,它认为数据在一般情况下不会造成冲突,所以在访问记录前不会加排它锁,而是在进行数据提交更新时,才会正式对数据冲突与否进行检测。具体来说,根据update返回的行数让用户决定如何去做。 将上面的例子改为使用乐观锁的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
public int updateEntry(long id) {
// (1) 使用乐观锁获取指定记录
EntryObject entry = query("select * from table1 where id = #{id}", id);

// (2) 修改记录内容,version字段不能被修改
String name = generatorName(entry);
entry.setName(name);
...

// (3)update操作
int count = update("update table1 set name = #{name}, age = #{age}, version = #{version}+1 where id = #{id} and version = #{version}", entry);
return count;
}

在如上代码中,当多个线程调用updateEntry方法并且传递相同的id时, 多个线程可以同时执行代码(1)获取id对应的记录并把记录放入以同时执行代码(1)获取 id对应的记录并把记录放入线程本地栈里面,然后可以同时执行代码(2)对自己栈上的记录进行修改,多个线程修改后各自的entry里面的属性应该都不一样了。然后多个线程可以同时执行代码(3),代码(3)中的update语句的where条件里面加入了version=# {version}条件,并且 set语句中多了version=$ {version}+1表达式,该表达式的意思是,如果数据库里面id =#{id} and version=# {version}的记录存在,则更新version的值为原来的值加1,这有点CAS操作的意思。

假设多个线程同时执行updateEntry并传递相同的id,那么它们执行代码(1)时获取的 Entry是同一个,获取的Entry里面的version值都是相同的(这里假设version=0)。当多个线程执行代码(3)时,由于update语句本身是原子性的,假如线程A执行update成功了,那么这时候id对应的记录的version值由原始version值变为了1。其他线程执行代码(3)更新时发现数据库里面已经没有了version=0的语句,所以会返回影响行号0。在业务上根据返回值为0就可以知道当前更新没有成功,那么接下来有两个做法,如果业务发现更新失败了,下面可以什么都不做,也可以选择重试,如果选择重试,则updateEntry的代码可以修改为如下。

乐观锁并不会使用数据库提供的锁机制,一般在表中添加version宇段或者使用业务状态来实现。 乐观锁直到提交时才锁定,所以不会产生任何死锁。

公平锁与非公平锁

锁Lock分为 “公平锁” 和 “非公平锁” ,

公平锁表示线程获取锁的顺序是按照线程加锁的顺序来分配的,即先来先得的FIFO先进先出顺序。而非公平锁就是一种获取锁的抢占机制,是随机获得锁的,和公平锁不一样的就是先来的不一定先得到锁,这个方式可能造成某些线程一 直拿不到锁,结果也就是不公平的了。

读源代码可知

1
2
3
4
// true为公平锁,false为非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

独占锁与共享锁

根据锁只能被单个线程持有还是能被多个线程共同持有,锁可以分为独占锁和共享锁。

独占锁保证任何时候都只有一个线程能得到锁,ReentrantLock就是以独占方式实现的。 共享锁则可以同时由多个线程持有,例如ReadWriteLock读写锁,它允许一个资源可以被多线程同时进行读操作。

独占锁是一种悲观锁,由于每次访问资源都先加上互斥锁,这限制了并发性,因为读操作并不会影响数据的一致性,而独占锁只允许在同一时间由一个线程读取数据,其他线程必须等待当前线程释放锁才能进行读取。共享锁则是一种乐观锁,它放宽了加锁的条件,允许多个线程同时进行读操作。

什么是可重入锁

当一个线程要获取一个被其他线程持有的独占锁时,该线程会被阻塞,那么当一个线程再次获取它自己己经获取的锁时是否会被阻塞呢?如果不被阻塞,那么我们说该锁是可重入的,也就是只要该线程获取了该锁,那么可以无限次数(在高级篇中我们将知道,严格来说是有限次数)地进入被该锁锁住的代码。

1
2
3
4
5
6
7
8
9
10
public class Hello {
public synchronized void helloA() {
System.out.println("hello");
}

public synchronized void helloB() {
System.out.println("hello B");
helloA();
}
}

在如上代码中,调用helloB方法前会先获取内置锁,然后打印输出。之后调用helloA方法,在调用前会先去获取内置锁,如果内置锁不是可重入的,那么调用线程将会一直被阻塞。

实际上,synchronized 内部锁是可重入锁。 可重入锁的原理是在锁内部维护一个线程标示,用来标示该锁目前被哪个线程占用,然后关联一个计数器一开始计数器值为0,说明该锁没有被任何线程占用。 当一个钱程获取了该锁时,计数器的值会变成1,这时其他线程再来获取该锁时会发现锁的所有者不是自己而被阻塞挂起。

但是当获取了该锁的线程再次获取锁时发现锁拥有者是自己,就会把计数器值加+1,当释放锁后计数器值-1 。 当计数器值为0时,锁里面的线程标示被重置为null, 这时候被阻塞的线程会被唤醒来竞争获取该锁。

自旋锁

由于Java中的线程是与操作系统中的线程一一对应的,所以当一个线程在获取锁(比如独占锁)失败后,会被切换到内核状态而被挂起当该线程获取到锁时又需要将其切换到内核状态而唤醒该线程。 而从用户状态切换到内核状态的开销是比较大的,在一定程度上会影响并发性能。自旋锁则是,当前线程在获取锁时,如果发现锁已经被其他线程占有,它不马上阻塞自己,在不放弃CPU使用权的情况下,多次尝试获取(默认次数是10,可以使用 -XX:PreBlockSpinsh参数设置该值),很有可能在后面几次尝试中其他线程己经释放了锁。 如果尝试指定的次数后仍没有获取到锁则当前线程才会被阻塞挂起。 由此看来自旋锁是使用CPU时间换取线程阻塞与调度的开销,但是很有可能这些CPU时间白白浪费了。

简单说就是:spinlock,是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU

手写自旋锁

通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒,B随后进来发现当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后修改成null值,B随后抢到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class SpinLockDemo {

// 现在的泛型装的是Thread,原子引用线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void myLock() {
// 获取当前进来的线程
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t come in ");

// 开始自旋,期望值是null,更新值是当前线程,如果是null,则更新为当前线程,否则自旋
while(!atomicReference.compareAndSet(null, thread)) {

}
}

/**
* 解锁
*/
public void myUnLock() {

// 获取当前进来的线程
Thread thread = Thread.currentThread();
// 自己用完了后,把atomicReference变成null
atomicReference.compareAndSet(thread, null);

System.out.println(Thread.currentThread().getName() + "\t invoked myUnlock()");
}

public static void main(String[] args) {

SpinLockDemo spinLockDemo = new SpinLockDemo();

// 启动t1线程,开始操作
new Thread(() -> {
spinLockDemo.myLock(); // 开始占有锁
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.myUnLock(); // 开始释放锁
}, "t1").start();

try {
TimeUnit.SECONDS.sleep(2); // 让main线程暂停2秒,使得t1线程,先执行
} catch (InterruptedException e) {
e.printStackTrace();
}

// 1秒后,启动t2线程,开始占用这个锁
new Thread(() -> {
spinLockDemo.myLock(); // 开始占有锁
spinLockDemo.myUnLock(); // 开始释放锁
}, "t2").start();

}
}

输出结果:

1
2
3
4
t1	 come in 
t2 come in // 此时一直在自旋,等待 t1 释放锁
t1 invoked myUnlock()
t2 invoked myUnlock()

独占锁ReentrantLock

类ReentrantLock具有完全互斥排他的效果,即同一时间只有一个线程在执行ReentrantLock.lock()方法后面的任务。

类图结构

ReentrantLock是可重入的独占锁, 同时只能有一个线程可以获取该锁,其他获取该锁的线程会被阻塞而被放入该锁的AQS阻塞队列里面。首先看下ReentrantLock的类图以便对它的实现有个大致了解, 如图6-4所示。

从类图可以看到,ReentrantLock最终还是使用AQS来实现的,并且根据参数来决定其内部是一个公平还是非公平锁,默认是非公平锁。

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

其中Sync类直接继承自AQS, 它的子类NonfairSync和FairSync分别实现了获取锁的非公平与公平策略。

在这里,AQS的state状态值表示线程获取该锁的可重入次数, 在默认情况下,state的值为0表示当前锁没有被任何线程持有。 当一个线程第一次获取该锁时会尝试使用CAS设置state 的值为 1,如果CAS成功则当前线程获取了该锁,然后记录该锁的持有者为当前线程。 在该线程没有释放锁的情况下第二次获取该锁后,状态值被设置为2, 这就是可重入次数。 在该线程释放该锁时,会尝试使用CAS让状态值减1, 如果减 1 后状态值为 0 ,则当前线程释放该锁。

使用细节

  • 必须在condition.await()方法调用之前调用lock.lock()代码获得同步监视器
  • Object类中的 wait() 方法相当于 Condition 类中的 await() 方法
  • Object类中的 wait(long timeout) 方法相当于 Condition 类中的 await(long time, TimeUnit unit) 方法
  • Object类中的 notify() 方法相当于Condition类中的 signal() 方法
  • Object类中的 notifyAll() 方法相当于Condition类中的 signalAll() 方法

类方法

  1. int getHoldCount() 的作用是:查询当前线程保持此锁定的个数,也就是调用lock()方法的次数。
  2. int getQueueLength() 的作用是:返回正等待获取此锁定的线程估计数, 比如有5个线程, 1个线程 首先执行 await()方法,那么在调用 getQueueLength() 方法后返回值是4,说明有4个线程同时在等待lock的释放。
  3. int getWaitQueueLength(Condition condition) 的作用是:返回等待与此锁定相关的给定条件Condition 的线程估计数,比如有5个线程,每个线程都执行了同一个 condition 对象的 await() 方 法,则调用 getWaitQueueLength(Condition condition) 方法时返回的int值是5
  4. boolean hasQueuedThread(Thread thread)的作用是:查询指定的线程是否正在等待获取此锁定。
  5. boolean hasQueuedThreads() 的作用是:查询是否有线程正在等待获取此锁定。
  6. boolean hasWaiters(Condition condition)的作用是:查询是否有线程正在等待与此锁定有关的condition条件。
  7. boolean isFair()的作用是:判断是不是公平锁。
  8. boolean isHeldByCurrentThread()的作用是:查询当前线程是否保持此锁定。
  9. boolean isLocked())的作用是:查询此锁定是否由任意线程保持。
  10. void lockInterruptibly()的作用是:如果当前线程未被中断,则获取锁定,如果已经被中断则出现异常
  11. boolean tryLock()的作用是:仅在调用时锁定未被另一个线程保持的情况下,才获取该锁定。
  12. boolean tryLock(long timeout, TimeUnit unit) 的作用是:如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁定。

获取锁

1、void lock() 方法

当一个线程调用该方法时,说明该线程希望获取该锁。 如果锁当前没有被其他线程占用并且当前线程之前没有获取过该锁,则当前线程会获取到该锁,然后设置当前锁的拥有者为当前线程, 并设置AQS的状态值为 1,然后直接返回。 如果当前线程之前己经获取过该锁,则这次只是简单地把AQS的状态值加 1 后返回。如果该锁己经被其他线程持有,则调用该方法的线程会被放入AQS队列后阻塞挂起

1
2
3
public void lock() {
sync.lock();
}

在如上代码中,ReentrantLock的 lock() 委托给了sync类,根据创建ReentrantLock构造函数选择sync的实现是NonfairSync还是FairSync,这个锁是一个非公平锁或者公平锁。这里先看sync的子类NonfairSync的情况,也就是非公平锁时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final void lock() {
// CAS 设置状态值:默认AQS的状态值为0,CAS成功则表示当前线程获取到了锁,state修改成 1
if (compareAndSetState(0, 1))
// 设置当前线程持有锁
setExclusiveOwnerThread(Thread.currentThread());
else
// 调用AQS方法,这边决定是其他线程阻塞并加入阻塞队列 还是重入锁,state + 1
acquire(1);
}

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// tryAcquire返回false会把当前线程放入AQS阻塞队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 阻塞线程
selfInterrupt();
}

static void selfInterrupt() {
Thread.currentThread().interrupt();
}

之前说过,AQS并没有提供可用的tryAcquire方法, tryAcquire方法需要子类自己定制化,所以这里代码会调用ReentrantLock重写的tryAcquire方法。我们先看下非公平锁的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 首次调用lock,acquires值为1,加锁成功就设置当前线程持有锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程是该锁的持有者
else if (current == getExclusiveOwnerThread()) {
// 防止次数溢出
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 放入AQS阻塞队列
return false;
}

/**
* The current owner of exclusive mode synchronization. 独占模式同步的当前所有者
*/
private transient Thread exclusiveOwnerThread;

protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}

介绍完了非公平锁的实现代码,回过头来看看非公平在这里是怎么体现的。首先非公平是说先尝试获取锁的线程并不一定比后尝试获取锁的线程优先获取锁。

这里假设线程A调用 lock() 方法时执行到nonfairTryAcquire的代码,发现当前状态值不为0,所以执行后续代码,发现当前线程不是线程持有者,则返回false,然后当前线程被放入AQS阻塞队列

这时候线程B也调用了 lock() 方法执行到nonfairTryAcquire 的代码,发现当前状态值为0了(假设占有该锁的其他线程释放了该锁),所以通过CAS设置获取到了该锁。明明是线程A先请求获取该锁呀,这就是非公平的体现。

这里线程B在获取锁前并没有查看当前AQS队列里面是否有比自己更早请求该锁的线程, 而是使用了抢夺策略。那么下面看看公平锁是怎么实现公平的。公平锁的话只需要看FairSync重写的tryAcquire方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

公平的tryAcquire策略与非公平的类似,不同之处在于,代码在设置CAS前添加了hasQueuedPredecessors方法,

1
2
3
4
5
6
7
8
9
10
11
// 如果当前线程之前有排队的线程,则返回true;如果当前线程位于队列的开头或队列为空,则返回false
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

由条件归纳可见

条件 结果
当前线程节点有前驱节点 返回true
AQS队列为空或者当前线程节点是AQS的第一个节点 返回false
h == t 说明当前队列为空,直接返回false
h != t 并且 s == null 说明有一个元素将要作为AQS的第一个节点入队列,返回true
h != t 并且 s != null 和 s.thread != Thread.currentThread() 说明队列里面的第一个元素不是当前线程,那么返回true。

回顾前面的内容,enq函数的第一个元素入队列是两步操作:首先创建一个哨兵头节点,然后将第一个元素插入哨兵节点后面

2、void lockInterruptibly() 方法

该方法与 lock() 方法类似,它的不同在于,它对中断进行响应,就是当前线程在调用该方法时,如果其他线程调用了当前线程的interrupt() 方法, 则当前线程会抛出InterruptedException异常, 然后返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg) throws InterruptedException {
// 如果当前线程被中断,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取资源
if (!tryAcquire(arg))
// 调用AQS可被中断的方法
doAcquireInterruptibly(arg);
}

3、boolean trylock() 方法

尝试获取锁,如果当前该锁没有被其他线程持有,则当前线程获取该锁井返回true,否则返回false。注意,该方法不会引起当前线程阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

如上代码与非公平锁的 tryAcquire() 方法代码类似,所以 tryLock() 使用的是非公平策略。

4、boolean tryLock(long timeout, TimeUnit unit) 方法

1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

尝试获取锁,与 tryLock() 的不同之处在于,它设置了超时时间,如果超时时间到没有获取到该锁则返回false

1
2
3
4
5
6
7
8
9
10
11
12
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
// 调用AQS的tryAcquireNanos方法 unit.toNanos(timeout)将转换成纳秒
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,获取失败就在规定时间内调用 doAcquireNanos方法 继续尝试获取锁
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

下面我们来看下具体的 doAcquireNanos() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 尝试时间小于0纳秒
if (nanosTimeout <= 0L)
return false;
// 计算出当前此线程超时的时间
final long deadline = System.nanoTime() + nanosTimeout;
// 添加到阻塞队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 获取锁成功
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 当前时间 - 超时时间,
nanosTimeout = deadline - System.nanoTime();
// 过了超时时间
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 尝试时间为正,还继续尝试获取锁。如果没有拿到许可证,则调用线程会被挂起nanosTimeout时间后修改为自动返回。
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
// 取消尝试获取
cancelAcquire(node);
}
}

释放锁

1、void unlock() 方法

尝试释放锁,如果当前线程持有该锁, 则调用该方法会让该线程对该线程持有的AQS状态值减 1 , 如果减去 1 后当前状态值为 0,则当前线程会释放该锁, 否则仅仅减 1 而己。如果当前线程没有持有该锁而调用了该方法则会抛出IllegalMonitorStateException异常,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
// AQS 阻塞队列
Node h = head;
if (h != null && h.waitStatus != 0)
// LockSupport.unpark(s.thread); 释放凭证
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果不是锁持有者调用Unlock方法,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果当前可重入次数为0,则清空锁持有线程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设立可重入次数为原始值1
setState(c);
return free;
}

使用多个Condition实现一对一交替打印

1、创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ThreadA extends Thread{

private Service service;

public ThreadA(Service service) {
this.service = service;
}

@Override
public void run() {
service.methodA();
}
}

public class ThreadB extends Thread {
private Service service;

public ThreadB(Service service) {
this.service = service;
}

@Override
public void run() {
service.methodB();
}
}

2、服务的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Service {

private Boolean flag = true;
private ReentrantLock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();

public void methodA() {
lock.lock();
try {
while (!flag) {
conditionA.await();
}
System.out.println("methodA在努力做个打工人...");
flag = false;
conditionB.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void methodB() {
lock.lock();
try {
while (flag) {
conditionB.await();
}
System.out.println("methodB在努力做个打工人...");
flag = true;
// 通知全部的 A 干活也行
conditionA.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

3、运行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
Service service = new Service();
try {
for (int i = 0; i < 5; i++) {
// service对象需要传入同一个,不然methodA通知了B干活但是依旧未响应。
ThreadA threadA = new ThreadA(service);
threadA.start();
Thread.sleep(1000);
new ThreadB(service).start();
}
} catch(Exception e) {
e.printStackTrace();
}
}

4、由输出可见交替打印

1
2
3
4
5
6
methodA在努力做个打工人...
methodB在努力做个打工人...
methodA在努力做个打工人...
methodB在努力做个打工人...
methodA在努力做个打工人...
...

多对多交替打印

1、两个线程创建如上

2、服务调用的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Service {

private Boolean flag = true;
private ReentrantLock lock = new ReentrantLock();

private Condition condition = lock.newCondition();

public void methodA() {
lock.lock();
try {
while (!flag) {
condition.await();
}
System.out.println("methodA在努力做个打工人...");
flag = false;
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void methodB() {
lock.lock();
try {
while (flag) {
condition.await();
}
System.out.println("methodB在努力做个打工人...");
flag = true;
// 通知全部的 A 干活
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

3、创建多对线程测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
// service对象需要一个,不然methodA通知了B干活但是依旧未响应。
Service service = new Service();
try {
ThreadA[] threadA = new ThreadA[5];
ThreadB[] threadB = new ThreadB[5];
for (int i = 0; i < 5; i++) {
threadA[i] = new ThreadA(service);
threadA[i].start();
Thread.sleep(1000);
threadB[i] = new ThreadB(service);
threadB[i].start();
}
} catch(Exception e) {
e.printStackTrace();
}
}

4、注意 service 方法中在此情况下不要使用 condition.signal() 随机通知一个线程,可能造成阻塞。

锁的释放细节

当我们在getLock方法加两把锁会是什么情况呢? (阿里面试)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 public static void getLock() {
lock.lock();
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t get Lock \t" + lock.getHoldCount());
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + "\t freed Lock \t" + lock.getHoldCount());
}
}

public void unlock() {
sync.release(1);
}

由输出可见,我们申请几把锁,最后需要解除几把锁。调用一次 unlock 方法,只会让持有锁数 减一。而且不管有几把锁,其它他们都是同一把锁,也就是说用同一个钥匙都能够打开

1
2
main	 get Lock 	2
main freed Lock 1

当我们在getLock方法加两把锁,但是只解一把锁会出现什么情况呢?

由上可见,因为加了两把锁,只释放了一把锁,就会导致不释放锁,其他线程没有机会拿锁

当我们只加一把锁,但是用两把锁来解锁的时候,又会出现什么情况呢?

1
2
3
4
5
6
7
8
9
public void getLock() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t get Lock");
} finally {
lock.unlock();
lock.unlock();
}
}

如果当前线程是此锁的持有者,则保留计数将减少。如果保持计数现在为零,则释放锁定。如果当前线程不是此锁的持有者,则抛出IllegalMonitorStateException。

1
2
3
4
5
6
7
8
main	 get Lock 	1
main freed Lock 0
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at reentrantlock.LockSf.getLock(LockSf.java:23)
at reentrantlock.LockSf.main(LockSf.java:13)

小结

本节介绍了ReentrantLock的实现原理, ReentrantLock的底层是使用AQS实现的可重入独占锁。 在这里AQS状态值为 0 表示当前锁空闲,为大于等于 1 的值则说明该锁己经被占用。 该锁内部有公平与非公平实现, 默认情况下是非公平的实现。 另外,由于该锁是独占锁,所以某时只有一个线程可以获取该锁。

类ReentrantReadWriteLock

读写锁表示也有两个锁,一个是读操作相关的锁,也称为共享锁;另一个是写操作相关的锁,也叫排他锁。也就是多个读锁之间不互斥,读锁与写锁互斥,写锁与写锁互斥

在没有线程Thread进行写入操作时,进行读取操作的多个Thread都可以获取读锁,而进行写入操作的Thread只有在获取写锁后才能进行写入操作。即多个Thread可以同时进行读取操作,但是同一时刻只允许一个Thread进行写入操作。

类图结构

为了了解 ReentrantReadWriteLock 的内部构造,我们先看下它的类图结构, 如图6-7所示。

读写锁的内部维护了一个ReadLock和一个WriteLock,它们依赖Sync实现具体功能。而Sync继承自AQS,并且也提供了公平和非公平的实现。 下面只介绍非公平的读写锁实现。我们知道AQS中只维护了一个state状态,而ReentrantReadWriteLock则需要维护读状态和写状态,一个state 怎么表示写和读两种状态呢?ReentrantReadWriteLock巧妙地使用state 的高16位表示读状态,也就是获取到读锁的次数;使用低16位表示获取到写锁的线程的可重入次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final int SHARED_SHIFT   = 16;

// 共享锁(读锁)状态单位值65536,2的16方次幂
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 共享锁线程最大个数65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 排它锁(写锁)掩码, 二进制,15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** 返回读锁线程数 Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写锁可重入个数 Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }


<< : 左移运算符,num << 1,相当于num乘以2,幂指数
>> : 右移运算符,num >> 1,相当于num除以2,分数指数幂,根号
>>> : 无符号右移,忽略符号位,空位都以0补齐

其中

1
2
3
4
5
6
7
8
9
private transient Thread firstReader = null; // 记录第一个获取到读锁的线程
private transient int firstReaderHoldCount; // 记录第一个获取到读锁的线程获取读锁的可重入次数
private transient HoldCounter cachedHoldCounter; // 记录最后一个获取读锁的线程获取读锁的可重入次数

static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

readHolds 是ThreadLocal变量, 用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数。 ThreadLocalHoIdCounter继承了ThreadLocal,因而initialValue方法返回一个HoldCounter对象

1
2
3
4
5
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

写锁的获取与释放

在ReentrantReadWriteLock中写锁使用WriteLock来实现。

1、void lock()

写锁是个独占锁, 某时只有一个线程可以获取该锁。 如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁然后返回。 如果当前己经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起。 另外, 写锁是可重入锁,如果当前线程己经获取了该锁,再次获取只是简单地把可重入次数加 1 后直接返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void lock() {
sync.acquire(1);
}

protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible(有资格的) for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// c != 0说明读锁或者写锁已经被某线程获取
if (c != 0) {
// w=0说明已经有线程获取了读锁, w !=O 并且当前线程不是写锁拥有者,则返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 说明当前线程获取了写锁,判断可重入次数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 设置可重入次数(
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

2、释放锁 void unlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
// 激活阻塞队列里面的一个线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}

protected final boolean tryRelease(int releases) {
// 看是否是写锁拥有者调用的unlock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取可重入值, 这里没有考虑高16位, 因为获取写锁时读锁状态值肯定为0
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 如果写锁可重入值为0则释放锁,否则只是简单地更新状态值
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

在如上代码中,tryRelease首先通过 isHeldExclusively 判断是否当前线程是该写锁的持有者,如果不是则抛出异常,否则执行后续代码,这说明当前线程持有写锁,持有写锁说明状态值的高16位为0,所以这里 nextc 值就是当前线程写锁的剩余可重入次数。代码判断当前可重入次数是否为0,如果free巳为true则说明可重入次数为0,所以当前线程会释放写锁,将当前锁的持有者设置为null。 如果free为false则简单地更新可重入次数。

读锁的获取与释放

ReentrantReadWriteLock中的读锁是使用ReadLock来实现的。

1、void lock()

获取读锁,如果当前没有其他线程持有写锁,则当前线程可以获取读锁,AQS的状态值state的高16位的值会增加 1,然后方法返回。 否则如果其他一个线程持有写锁, 则当前线程会被阻塞。

1
2
3
4
5
6
7
8
 public void lock() {
sync.acquireShared(1);
}

public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

在如上代码中, 读锁的lock方法调用了AQS的 acquireShared 方法,在其内部调用了ReentrantReadWriteLock中的sync重写的tryAcquireShared方法,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果 写锁被占用 或者 写锁被其他线程持有,返回-1失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 如果写锁被其他线程持有,返回-1失败
int r = sharedCount(c);
// 尝试获取锁, 多个读线程只有一个会成功,不成功的进入fullTryAcquireShared进行重试
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 第一个线程获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果当前线程是第一个获取读锁的线程
firstReaderHoldCount++;
} else {
// 记录最后一个获取读锁的线程或记录其他线程读锁的可重入数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 类似tryAcquireShared,但是乏自旋获取
return fullTryAcquireShared(current);
}

如果当前要获取读锁的线程己经持有了写锁,则也可以获取读锁。 但是需要注意,当一个线程先获取了写锁,然后获取了读锁处理事情完毕后,要记得把读锁和写锁都释放掉,不能只释放写锁

读读共享

1、调用的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Service {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

public void read() {
try {
try {
lock.readLock().lock();
System.out.println("获得读锁" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
Thread.sleep(1000);
} finally {
lock.readLock().unlock();
System.out.println("释放读锁" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

2、创建两个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadA extends Thread{

private Service service;

public ThreadA(Service service) {
this.service = service;
}

@Override
public void run() {
service.read();
}
}

public class ThreadB extends Thread{

private Service service;

public ThreadB(Service service) {
this.service = service;
}

@Override
public void run() {
service.read();
}
}

3、测试

1
2
3
4
5
6
7
8
9
10
11
12
public class Test {
public static void main(String[] args) throws InterruptedException {
Service service = new Service();
ThreadA threadA = new ThreadA(service);
threadA.setName("A");
threadA.start();

ThreadB threadB = new ThreadB(service);
threadB.setName("B");
threadB.start();
}
}

4、由打印可见,读锁是可以共享的,支持多个Thread可以同时进行读取操作

1
2
3
4
获得读锁A 1616048679654
获得读锁B 1616048679655
释放读锁A 1616048680662
释放读锁B 1616048680662

写写互斥

1、在service中新增写方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void write() {
try {
try {
lock.writeLock().lock();
System.out.println("获得写锁" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
Thread.sleep(1000);
} finally {
lock.writeLock().unlock();
System.out.println("释放写锁" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
}
} catch (Exception e) {
e.printStackTrace();
}
}

2、将两个线程调用方法修改成 write()

3、测试,由打印可见。写锁之间是互斥的,同一时刻只允许一个Thread进行写入操作。

1
2
3
4
获得写锁A 1616048968453
释放写锁A 1616048969461
获得写锁B 1616048969462
释放写锁B 1616048970470

读写互斥

1、将线程A调用获取读锁方法,线程B调用获取写锁方法

2、由打印可见,读写操作是互斥的

1
2
3
4
获得读锁A 1616049259073
释放读锁A 1616049260074
获得写锁B 1616049260074
释放写锁B 1616049261085

写读互斥

1、沿用上面代码

2、修改测试代码,先调用 写锁线程B

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Test {
public static void main(String[] args) throws InterruptedException {
Service service = new Service();

ThreadB threadB = new ThreadB(service);
threadB.setName("B");
threadB.start();

ThreadA threadA = new ThreadA(service);
threadA.setName("A");
threadA.start();
}
}

3、由打印可见,写读操作也是互斥的。即只要出现“写操作 ”,就是互斥的

1
2
3
4
获得写锁B 1616049412361
释放写锁B 1616049413378
获得读锁A 1616049413378
释放读锁A 1616049414391

读写缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class MyCache {

/**
* 缓存中的东西,必须保持可见性,因此使用volatile修饰
*/
private volatile Map<String, Object> map = new HashMap<>();

/**
* 创建一个读写锁
* 它是一个读写融为一体的锁,在使用的时候,需要转换
*/
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

/**
* 定义写操作
*/
public void put(String key, Object value) {

rwLock.writeLock().lock(); // 创建一个写锁

try {

System.out.println(Thread.currentThread().getName() + "\t 正在写入:" + key);

try {
TimeUnit.MILLISECONDS.sleep(300); // 模拟网络拥堵,延迟0.3秒
} catch (InterruptedException e) {
e.printStackTrace();
}

map.put(key, value);

System.out.println(Thread.currentThread().getName() + "\t 写入完成");

} catch (Exception e) {
e.printStackTrace();
} finally {
// 写锁释放
rwLock.writeLock().unlock();
}
}

/**
* 获取
*
* @param key
*/
public void get(String key) {

rwLock.readLock().lock(); // 读锁
try {
System.out.println(Thread.currentThread().getName() + "\t 正在读取:");
try {
TimeUnit.MILLISECONDS.sleep(300); // 模拟网络拥堵,延迟0.3秒
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + "\t 读取完成:" + map.get(key));

} catch (Exception e) {
e.printStackTrace();
} finally {
// 读锁释放
rwLock.readLock().unlock();
}
}

public void clean() {
map.clear();
}

}

class ReadWriteLockDemo {

public static void main(String[] args) {

MyCache myCache = new MyCache();

// 线程操作资源类,5个线程写
for (int i = 1; i <= 5; i++) {
// lambda表达式内部必须是final
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt + "");
}, "第" + String.valueOf(i) + "个线程").start();
}

// 线程操作资源类, 5个线程读
for (int i = 1; i <= 5; i++) {
// lambda表达式内部必须是final
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, "第" + String.valueOf(i) + "个线程").start();
}
}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
第1个线程	 正在写入:1
第1个线程 写入完成
第2个线程 正在写入:2
第2个线程 写入完成
第3个线程 正在写入:3
第3个线程 写入完成
第5个线程 正在写入:5
第5个线程 写入完成
第4个线程 正在写入:4
第4个线程 写入完成
第1个线程 正在读取:
第2个线程 正在读取:
第3个线程 正在读取:
第5个线程 正在读取:
第4个线程 正在读取:
第4个线程 读取完成:4
第5个线程 读取完成:5
第3个线程 读取完成:3
第2个线程 读取完成:2
第1个线程 读取完成:1

构造线程安全的list

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SafeList<T extends Object> {

private List<T> list = new ArrayList<T>();
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();

// 添加数据
public void add(T t) {
writeLock.lock();
try {
list.add(t);
} finally {
writeLock.unlock();
}
}

// 删除数据
public void remove(T t) {
writeLock.lock();
try {
list.remove(t);
} finally {
writeLock.unlock();
}
}

// 获取数据
public void get(int index) {
readLock.lock();
try {
list.get(index);
} finally {
readLock.unlock();
}
}

}

总结

本节介绍了读写锁ReentrantReadWriteLock的原理,它的底层是使用AQS实现的。ReentrantReadWriteLock巧妙地使用AQS的状态值的高16位表示获取到读锁的个数,低16位表示获取写锁的线程的可重入次数,并通过CAS对其进行操作实现了读写分离,这在读多写少的场景下比较适用。

JUC包中的StampedLock锁探究

概述

JDK 8中新增的StampedLock是并发包里面JDK8版本新增的一个锁,该锁提供了三种模式的读写控制,当调用获取锁的系列函数时,会返回一个long型的变量,我们称之为戳记(stamp), 这个戳记代表了锁的状态。其中 try 系列获取锁的函数,当获取锁失败后会返回为0的
stamp值。当调用 释放锁 和 转换锁 的方法时需要传入获取锁时返回的stamp值。

StampedLock提供的三种读写模式的锁分别如下:

写锁writeLock:

  1. 是一个排它锁或者独占锁,某时只有一个线程可以获取该锁, 当二个线程获取该锁后,其他请求读锁和写锁的线程必须等待,这类似于ReentrantReadWriteLock的写锁(不同的是这里的写锁是不可重入锁);
  2. 当目前没有线程持有读锁或者写锁时才可以获取到该锁。 请求该锁成功后会返回一个stamp变量用来表示该锁的版本当释放该锁时需要调用unlockWrite方法并传递获取锁时的stamp参数。并且它提供了非阻塞的tryWriteLock方法。

悲观读锁readLock:

  1. 是一个共享锁,在没有线程获取独占写锁的情况下,多个线程可以同时获取该锁
  2. 如果己经有线程持有写锁,则其他线程请求获取该读锁会被阻塞,这类似于ReentrantReadWriteLock的读锁(不同的是这里的读锁是不可重入锁)。
  3. 这里说的悲观是指在具体操作数据前其会悲观地认为其他线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑。
  4. 请求该锁成功后会返回一个stamp变量用来表示该锁的版本,当释放该锁时需要调用unlockRead方法并传递stamp参数。并且它提供了非阻塞的tryReadLock方法。

乐观读锁 tryOptimisticRead:

  1. 它是相对于悲观锁来说的,在操作数据前并没有通过CAS设置锁的状态,仅仅通过位运算测试。如果当前没有线程持有写锁,则简单地返回一个非0的stamp版本信息
  2. 获取该stamp后在具体操作数据前还需要调用validate方法验证该stamp是否己经不可用,也就是看当调用tryOptimisticRead返回stamp后到当前时间期间是否有其他线程持有了写锁,如果是则validate会返回0,否则就可以使用该stamp版本的锁对数据进行操作
  3. 由于tryOptimisticRead并没有使用CAS设置锁状态,所以不需要显式地释放该锁。 该锁的一个特点是适用于读多写少的场景, 因为获取读锁只是使用位操作进行检验,不涉及CAS操作,所以效率会高很多,
  4. 但是同时由于没有使用真正的锁,在保证数据一致性上需要复制一份要操作的变量到方法栈,并且在操作数据时可能其他写线程己经修改了数据,而我们操作的是方法战里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的

另外,StampedLock的读写锁都是不可重入锁,所以在获取锁后释放锁前不应该再调用会获取锁的操作,以避免造成调用线程被阻塞。当多个线程同时尝试获取读锁和写锁时,谁先获取锁没有一定的规则,完全都是尽力而为,是随机的。并且该锁不是直接实现Lock 或 ReadWriteLock接口,而是其在内部自己维护了一个双向阻塞队列

JUC包中ThreadLocal Random类原理剖析

ThreadLocalRandom类是JDK7在JUC包下新增的随机数生成器,它弥补了Random类在多线程下的缺陷。本章讲解为何要在JUC下新增该类,以及该类的实现原理。

Random类及其局限性

在JDK7之前包括现在,java.util.Random都是使用比较广泛的随机数生成工具类,而且java.Iang.Math中的随机数生成也使用的是java.util.Random的实例。下面先看看java.util.Random的使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 生成默认的随机数种子的随机数生成器! 
Random random = new Random();
random.nextInt(5);

public int nextInt(int bound) {
// 参数检查
if (bound <= 0)
throw new IllegalArgumentException(BadBound);

// 根据老的种子, 生成新的种子!
int r = next(31);
// 根据新的种子计算随机数
.....
return r;
}

每个Random实例中都有一个原子性的种子变量来记录当前种子的值, 当要生成新的随机数时, 需要根据当前种子计算新的种子并更新回原子变量种! 但是当在多线程下, 当多个线程同时计算新的种子时, 多个线程会竞争同一个原子变量的更新操作, 由于原子变量的更新是CAS操作, 会造成大量线程自旋重试! 这降低了并发性能!

解决方案:使得通过老种子产生新种子的步骤是原子的即可!

即, 当多个线程根据老种子计算新种子时, 第一个线程的新种子被计算出来后, 第二个线程要丢弃自己的老种子, 而使用第一个线程的新种子来计算自己的新种子…..

ThreadLocalRandom

每个Random实例里面都有一个原子性的种子变量用来记录当前的种子值,当要生成新的随机数时需要根据当前种子计算新的种子并更新回原子变量。在多线程下使用单个Random实例生成随机数时,当多个线程同时计算随机数来计算新的种子时, 多个线程会竞争同一个原子变量的更新操作,由于原子变量的更新是CAS操作,同时只有一个线程会成功,所以会造成大量线程进行自旋重试, 这会降低并发性能,所以ThreadLocalRandom应运而生。

1
2
3
4
5
6
7
8
9
public class Test {
public static void main(String[] args) {
ThreadLocalRandom localRandom = ThreadLocalRandom.current();

for (int i = 0; i < 5; i++) {
System.out.println(localRandom.nextInt(10));
}
}
}

其中,代码(10) 调用ThreadLocalRandom.current() 来获取当前线程的随机数生成器。下面来分析下ThreadLocalRandom的实现原理。 从名字上看它会让我们联想到在基础篇中讲解的ThreadLocalRandom通过让每一个线程复制一份变量,使得在每个线程对变量进行操作时实际是操作自己本地内存里面的副本,从而避免了对共享变量进行同步。实际上ThreadLocalRandom的实现也是这个原理,Random的缺点是多个线程会使用同一个原子性种子变量,从而导致对原子变量更新的竞争,如图3-1 所示。

那么,如果每个线程都维护一个种子变量,则每个线程生成随机数时都根据自己老的种子计算新的种子,并使用新种子更新老的种子,再根据新种子计算随机数, 就不会存在竞争问题了,这会大大提高并发性能。 ThreadLocalRandom原理如图3-2所示。

源码分析

首先看下ThreadLocalRandom的类图结构,如图3-3所示。

从图中可以看出ThreadLocalRandom类继承了Random类并重写了nextlnt方法, 在ThreadLocalRandom类中并没有使用继承自Random类的原子性种子变量。 在ThreadLocalRandom中并没有存放具体的种子,具体的种子存放在具体的调用线程的threadLocalRandomSeed变量里面。 ThreadLocalRandom类似于ThreadLocal类,就是个工具类。 当线程调用ThreadLocalRandom的current方法时,ThreadLocalRandom负责初始化调用线程的threadLocalRandomSeed变量, 也就是初始化种子。

当调用ThreadLocalRandom的nextlnt方法时,实际上是获取当前线程的threadLocalRandomSeed变量作为当前种子来计算新的种子,然后更新新的种子到当前线程的threadLocalRandomSeed变量,而后再根据新种子并使用具体算法计算随机数。这里需要注意的是,threadLocalRandomSeed变量就是Thread类里面的一个普通long变量,它并不是原子性变量。 其实道理很简单,因为这个变量是线程级别的,所以根本不需要使用原子性变量,如果你还是不理解可以思考下ThreadLocal的原理。

其中seeder和probeGenerator是两个原子性变量,在初始化调用线程的种子和探针变量时会用到它们,每个线程只会使用一次。

另外,变量instance是ThreadLocalRandom的一个实例,该变量是static 的。当多线程通过ThreadLocalRandom的current方法获取ThreadLocalRandom的实例时,其实获取的是同一个实例。 但是由于具体的种子是存放在线程里面的,所以在ThreadLocaIRandom
的实例里面只包含与线程无关的通用算法,所以它是线程安全的。

下面看看ThreadLocaIRandom的主要代码的实现逻辑。

1、Unsafe 机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
try {
// 获取unsafe实例
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
// 获取Thread类里面threadLocalRandomSeed变量在Thread实例里面的偏移量
SEED = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSeed"));
// 获取Thread类里面threadLocalRandomProbe变量在Thread实例里面的偏移量
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
// 获取Thread类里面threadLocalRandomSecondarySeed变量在Thread实例里面的偏移量
SECONDARY = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception e) {
throw new Error(e);
}
}

2、ThreadLocalRandom current()方法

该方法获取ThreadLocalRandom实例,并初始化调用线程中的threadLocalRandomSeed和threadLocalRandomProbe变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final ThreadLocalRandom instance = new ThreadLocalRandom();

public static ThreadLocalRandom current() {
// (12)
if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
// (13)
localInit();
// (14)
return instance;
}

static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
UNSAFE.putInt(t, PROBE, probe);
}

在如上代码(12) 中,如果当前线程中threadLocalRandomProbe的变量值为0 (默认情况下线程的这个变量值为0),则说明当前线程是第一次调用ThreadLocalRandom的current方法,那么就需要调用locallnit方法计算当前线程的初始化种子变量。这里为了延迟初始化,在不需要使用随机数功能时就不初始化Thread类中的种子变量, 这是一种优化。

代码 (13) 首先根据probeGenerator计算当前线程中threadLocalRandomProbe的初始化值,然后根据seeder计算当前线程的初始化种子,而后把这两个变量设置到当前线程。代码 (14)返回ThreadLocalRandom的实例。需要注意的是,这个方法是静态方法, 多个线程返回的是同一个ThreadLocalRandom实例。

3、int nextlnt(int bound)方法

计算当前线程的下一个随机数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public int nextInt(int bound) {
// (15) 参数校验
if (bound <= 0)
throw new IllegalArgumentException(BadBound);
// (16) 根据当前线程中的种子计算新种子
int r = mix32(nextSeed());
// (17)根据新种子和bound计算随机数
int m = bound - 1;
if ((bound & m) == 0) // power of two
r &= m;
else { // reject over-represented candidates
for (int u = r >>> 1;
u + m - (r = u % bound) < 0;
u = mix32(nextSeed()) >>> 1)
;
}
return r;
}

如上代码的逻辑步骤与Random相似,我们重点看下nextSeed() 方法。

1
2
3
4
5
6
7
8
 private static final long GAMMA = 0x9e3779b97f4a7c15L;

final long nextSeed() {
Thread t; long r; // read and update per-thread seed
UNSAFE.putLong(t = Thread.currentThread(), SEED,
r = UNSAFE.getLong(t, SEED) + GAMMA);
return r;
}

在如上代码中,首先使用r = UNSAFE.getLong(t, SEED) 获取当前线程中threadLocalRandomSeed变量的值, 然后在种子的基础上累加GAMMA值作为新种子,而后使用UNSAFE的putLong方法把新种子放入当前线程的threadLocalRandomSeed变量中。

总结

ThreadLocalRandom使用ThreadLocal的原理,让每个线程都持有一个本地的种子变量,该种子变量只有在使用随机数时才会被初始化。在多线程下计算新种子时是根据自己线程内维护的种子变量进行更新,从而避免了竞争。

JUC包中原子操作类原理剖析

JUC包提供了一系列的原子性操作类,这些类都是使用非阻塞算法CAS实现的,相比使用锁实现原子性操作这在性能上有很大提高。由于原子性操作类的原理都大致相同,所以本章只讲解最简单的AtomicLong类的实现原理以及JDK8 中新增的LongAdder和LongAccumulator类的原理。

原子变量操作类

JUC并发包中包含有Atomiclnteger、 AtomicLong和AtomicBoolean等原子性操作类,它们的原理类似,本章讲解AtomicLong类。AtomicLong是原子性递增或者递减类,其内部使用Unsafe来实现,我们看下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;

// (1)获取Unsafe实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// (2) 存放交量value的偏移量
private static final long valueOffset;

// (3) 判断JVM是否支持Long类型无锁CAS
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
private static native boolean VMSupportsCS8();

static {
try {
// (4) 获取value在AtomicLong中的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// (5) 实际变量值
private volatile long value;

public AtomicLong(long initialValue) {
value = initialValue;
}

...
}

代码(1 )通过Unsafe. getUnsafe( )方法获取到 Unsafe 类的实例, 这里你可能会有疑问,为何能通过 Unsafe.getUnsafe ( )方法获取到Unsafe类的实例?其实这是因为AtomicLong类也是在rt.jar包下面的,AtomicLong类就是通过BootStarp类加载器进行加载的

代码(5) 中的value被声明为volatile 的,这是为了在多线程下保证内存可见性,value是具体存放计数的变量

代码(2) (4)获取value变量在AtomicLong类中的偏移量。

下面重点看下AtomicLong中的主要函数。

1、递增和递减操作代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// (6) 调用unsafe方法, 原子性设置value值为原始值+l,返回值为递增后的值
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}

// (7) 调用unsafe方法,调用unsafe方法,原子性设置value值为原始值-1,返回值为递减之后的值
public final long decrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}

// (8) 调用unsafe方法,原子性设置value值为原始值+1,返回值为原始值
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

// (9) 调用unsafe方法,原子性设置value{值为原始值-1,返回值为原始值
public final long getAndDecrement() {
return unsafe.getAndAddLong(this, valueOffset, -1L);
}

在如上代码内部都是通过调用Unsafe的getAndAddLong方法来实现操作,这个函数是个原子性操作,这里第一个参数是AtomicLong实例的引用, 第二个参数是value变量在AtomicLong中的偏移值,第三个参数是要设置的原对象值,第四个变量的是期待的变量值

其中, getAndlncrement方法在JDK7中的实现逻辑为

1
2
3
4
5
6
7
8
public final long getAndIncrement() {
while(true) {
long current = get();
long next = current + 1;
if (compareAndSet (current, next))
return current;
}
}

在如上代码中,每个线程是先拿到变量的当前值(由于value是volatile变量,所以这里拿到的是最新的值),然后在工作内存中对其进行增 1 操作,而后使用CAS修改变量的值。如果设置失败,则循环继续尝试, 直到设置成功。

而JDK8中的逻辑为

1
2
3
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

其中JDK8中unsafe.getAndAddLong的代码为

1
2
3
4
5
6
7
8
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

return var6;
}

可以看到, JDK7的AtomicLong中的循环逻辑已经被JDK8 中的原子操作类UNsafe 内置了, 之所以内置应该是考虑到这个函数在其他地方也会用到,而内置可以提高复用性。

下面通过一个多线程使用AtomicLong统计0的个数的例子来加深对AtomicLong的理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class AtomicDemo {
// (10) 创建Long型原子计数器
private static AtomicLong atomicLong = new AtomicLong();
// (11) 创建数据源
private static Integer[] arrayOne = new Integer[]{0, 1, 2, 3, 0, 5, 6, 0, 56, 0};
private static Integer[] arrayTwo = new Integer[]{10, 1, 2, 3, 11, 5, 6, 0, 56, 0};

public static void main(String[] args) throws InterruptedException {
Thread threadOne = new Thread(new Runnable() {
@Override
public void run() {
int size = arrayOne.length;
for (int i = 0; i < size; i++) {
// public int intValue() { return value; }
if (arrayOne[i].intValue() == 0) {
atomicLong.incrementAndGet();
}
}
}
});

Thread threadTwo= new Thread(new Runnable() {
@Override
public void run() {
int size = arrayTwo.length;
for (int i = 0; i < size; i++) {
if (arrayTwo[i].intValue() == 0) {
atomicLong.incrementAndGet();
}
}
}
});

// (14)启动子线程
threadOne.start();
threadTwo.start();
// (16) 等待线程执行完毕
threadOne.join();
threadTwo.join();
System.out.println("count 0:\t" + atomicLong.get());
}

}

输出结果为:

1
count 0:	6

如上代码中的两个线程各自统计自己所持数据中0的个数,每当找到一个0就会调用AtomicLong的原子性递增方法。

在没有原子类的情况下, 实现计数器需要使用一定的同步措施, 比如使用synchronized关键字等, 但是这些都是阻塞算法, 对性能有一定损耗, 而本章介绍的这些原子操作类都使用CAS非阻塞算法,性能更好。 但是在高并发情况下AtomicLong还会存在性能问题。 JDK8提供了一个在高并发下性能更好的LongAdder类,下面我们来讲解这个类。

JDK 8新增的原子操作类LongAdder

简单介绍

使用AtomicLong时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只有一个线程的CAS操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS的操作,而这会白白浪费CPU资源。

因此JDK8新增了一个原子性递增或者递减类LongAdder用来克服在高并发下使用AtomicLong的缺点。 既然AtomicLong的性能瓶颈是由于过多线程同时去竞争一个变量的更新而产生的,那么,是不是就解决了性能问题?是的,LongAdder就是这个思路。 下面通过图来理解两者设计的不同之处,如图4-1 所示。

如图4-1 所示,使用AtomicLong时,是多个线程同时竞争同一个原子变量。

如图4-2所示,使用LongAdder时,则是在内部维护多个 Cell 变量,每个 Cell 里面有一个初始值为 0 的long型变量,这样,在同等并发量的情况下,争夺单个变量更新操作的线程量会减少,这变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个Cell原子变量时如果失败了, 它并不是在当前Cell 变量上一直自旋CAS重试,而是尝试在其他Cell 的变量上进行CAS尝试,这个改变增加了当前线程重试CAS成功的可能性。最后,在获取LongAdder当前值时,是把所有Cell变量的value值累加后再加上base返回的

LongAdder维护了一个延迟初始化的原子性更新数组(默认情况下Cell数组是null)和一个基值变量base。 由于Cells 占用的内存是相对比较大的,所以一开始并不创建它,而是在需要时创建,也就是惰性加载。当一开始判断Cell数组是null并且并发线程较少时,所有的累加操作都是对base变量进行的。 保持Cell 数组的大小为2的 N次方,在初始化时Cell数组中的Cell 元素个数为2,数组里面的变量实体是Cell类型。Cell类型是AtomicLong的一个改进,用来减少缓存的争用,也就是解决伪共享问题。

对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子性操作都是无规律地分散在内存中的(也就是说多个原子性变量的内存地址是不连续的), 多个原子变量被放入同一个缓存行的可能性很小。 但是原子性数组元素的内存地址是连续的,所以数组内的多个元素能经常共享缓存行,因此这里使用@sun.misc.Contended注解对Cell类进行字节填充,这防止了数组中多个元素共享一个缓存行,在性能上是一个提升。

LongAdder代码分析

为了解决高并发下多线程对一个变量CAS争夺失败后进行自旋而造成的降低并发性能问题,LongAdder在内部维护多个Cell 元素(一个动态的Cell 数组)来分担对单个变量进行争夺的开销。下面围绕以下话题从源码角度来分析LongAdder的实现: (1) LongAdder的结构是怎样的?(2)当前线程应该访问Cell数组里面的哪一个Cell元素?( 3 ) 如何初始化Cell 数组? ( 4 ) Cell 数组如何扩容? ( 5)线程访问分配的Cell 元素有冲突后如何处理? (6)如何保证线程操作被分配的Cell 元素的原子性?

首先看下LongAdder的类图结构,如图4-3所示。

由该图可知,LongAdder类继承自Striped64类,在Striped64 内部维护着三个变量。LongAdder的真实值其实是base 的值与Cell 数组里面所有Cell 元素中的value值的累加,base是个基础值,默认为0。 cellsBusy用来实现自旋锁,状态值只有 0 和 1,当创建Cell元素,扩容Cell 数组或者初始化Cell 数组时,使用CAS操作该变量来保证同时只有一个线程可以进行其中之一的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}

可以看到,Cell的构造很简单,其内部维护一个被声明为 volatile 的变量, 这里声明为volatile是因为线程操作value变量时没有使用锁, 为了保证变量的内存可见性这里将其声明为volatile的。另外cas函数通过CAS操作,保证了当前线程更新时被分配的Cell元素中value值的原子性。另外, Cell类使用@sun.misc. Contended修饰是为了避免伪共享。到这里我们回答了问题 1 ( LongAdder的结构是怎样的?)和问题 6(如何保证线程操作被分配的Cell 元素的原子性?)。

long sum() :返回当前的值,内部操作是累加所有Cell 内部的value值后再累加base。例如下面的代码, 由于计算总和时没有对Cell 数组进行加锁,所以在累加过程中可能有其他线程对 Cell 中的值进行了修改, 也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是一个调用sum方法时的原子快照值。

1
2
3
4
5
6
7
8
9
10
11
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

void reset():为重置操作, 如下代码把base置为0, 如果Cell 数组有元素,则元素值被重置为0。

1
2
3
4
5
6
7
8
9
10
11
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
// a = as[i] 对象引用
if ((a = as[i]) != null)
a.value = 0L;
}
}
}

long sumThenReset():是sum的改造版本,如下代码在使用sum累加对应的 Cell 值后,把当前 Cell 的值重置为0, base重置为0。这样, 当多线程调用该方法时会有问题,比如考虑第一个调用线程清空Cell的值,则后一个线程调用时累加的都是0值,所以需要保证是重置之前发生的最终值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}

long longValue(): 等价于sum()。

1
2
3
public long longValue() {
return sum();
}

long longValue(): 等价于sum()。

下面主要看下add方法的实现,从这个方法里面就可以找到其他问题的答案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) { // (1)
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || // (2)
(a = as[getProbe() & m]) == null || // (3)
!(uncontended = a.cas(v = a.value, v + x))) // (4)
longAccumulate(x, null, uncontended); // (5)
}
}

final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

代码(1) 首先看 cells 是否为null,如果为null则当前在基础变量base上进行累加,这时候就类似AtomicLong的操作。

如果cells 不为null 或者线程执行代码(1 ) 的CAS操作失败了,则会去执行代码(2)。代码(2) (3)决定当前线程应该访问cells数组里面的哪一个Cell元素,如果当前线程映射的元素存在则执行代码 (4),使用 CAS 操作去更新分配的Cell元素的 value 值,如果当前线程映射的元素不存在或者存在但是CAS操作失败则执行代码 (5)。其实将代码 (2)(3)(4)合起来看就是获取当前线程应该访问的cells数组的Cell元素,然后进行CAS更新操作,只是在获取期间如果有些条件不满足则会跳转到代码(5)执行。另外当前线程应该访问cells数组的哪一个Cell元素是通过getProbe() & m进行计算的,其中m是当前cells数组元素个数-1,getProbe()则用于获取当前线程中变量threadLocalRandomProbe的值,这个值一开始为 0,在代码(5)里面会对其进行初始化。并且当前线程通过分配的Cell元素的cas函数来保证对Cell元素value值更新的原子性,到这里我们回答了 问题2(当前线程应该访问Cell数组里面的哪一个Cell元素?) 和问题6 (如何保证线程操作被分配的Cell 元素的原子性?)。

下面重点研究 longAccumulate 的代码逻辑,这是cells数组被初始化和扩容的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
transient volatile Cell[] cells; // cells。如果为非null,则大小为2的幂。
transient volatile long base; // 基本值,主要在没有争用时使用,也用作表初始化过程中的后备。通过CAS更新。
transient volatile int cellsBusy; // 调整大小或创建单元时使用的自旋锁(通过CAS锁定)。

final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 初始化当前线程的变量threadLocalRandomProbe的值
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // (7)
if ((a = as[(n - 1) & h]) == null) { // (8)
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 当前Cell存在,如l执行CAS设置(9 )
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 当前Cell数组元素个数大于cpu个数 (10)
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 是否有冲突(11)
else if (!collide)
collide = true;
// 如果当前元素个数没有达到CPU个数并且有冲突则扩容(12)
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
// 12.1
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 12.2
cellsBusy = 0;
}
// 12.3
collide = false;
continue; // Retry with expanded table
}
// (13) 为了能够找到一个空闲的Cell, 重新计算hash值, xorshift算法生成随机数
h = advanceProbe(h);
}
// 初始化Cell数组(14)
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
// 14.1
Cell[] rs = new Cell[2];
// 14.2
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
// 14.3
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

上面代码比较复杂, 这里我们主要关注问题3、 问题4和问题5。

当每个线程第一次执行到代码(6)时,会初始化当前线程变量threadLocalRandomProbe 的值,上面也说了,这个变量在计算当前线程应该被分配到cells数组的哪一个Cell 元素时会用到。

cells 数组的初始化是在代码(14) 中进行的, 其中cellsBusy是一个标示, 为0说明当前cells数组没有在被初始化或者扩容,也没有在新建Cell 元素,为 1 则说明cells数组

在被初始化或者扩容,或者当前在创建新的Cell元素、通过CAS操作来进行0或1状态的切换,这里使用casCellsBusy 函数。假设当前线程通过CAS设置cellsBusy为1,则当前线程开始初始化操作,那么这时候其他线程就不能进行扩容了。如代码(14.1)初始化cells数组元素个数为2,然后使用h&1计算当前线程应该访问cell 数组的哪个位置,也就是使用当前线程的threadLocalRandomProbe变量值& (cells数组元素个数–1),然后标示cells数组已经被初始化,最后代码(14.3)重置了cellsBusy标记。显然这里没有使用CAS操作,却是线程安全的,原因是cellsBusy是 volatile类型的,这保证了变量的内存可见性,另外此时其他地方的代码没有机会修改cellsBusy 的值。在这里初始化的cells数组里面的两个元素的值目前还是null。这里回答了问题3,知道了cells 数组如何被初始化。

cells数组的扩容是在代码(12)中进行的,对cells扩容是有条件的,也就是代码(10)(11)的条件都不满足的时候。具体就是当前cells 的元素个数小于当前机器CPU个数并且当前多个线程访问了cells中同一个元素,从而导致冲突使其中一个线程CAS失败时才会进行扩容操作。这里为何要涉及CPU个数呢﹖其实在基础篇中已经讲过,只有当每个CPU都运行一个线程时才会使多线程的效果最佳,也就是当cells数组元素个数与CPU个数一致时,每个Cell都使用一个CPU进行处理,这时性能才是最佳的。代码(12)中的扩容操作也是先通过CAS设置cellsBusy为1,然后才能进行扩容。假设CAS成功则执行代码(12.1)将容量扩充为之前的2倍,并复制Cell元素到扩容后数组。另外,扩容后cells 数组里面除了包含复制过来的元素外,还包含其他新元素,这些元素的值目前还是null。这里回答了问题4。

在代码(7 )(8) 中,当前线程调用add 方法并根据当前线程的随机数threadLocalRandomProbe和 cells元素个数计算要访问的Cell元素下标,然后如果发现对应下标元素的值为null,则新增一个Cell元素到cells数组,并且在将其添加到cells 数组之前要竞争设置cellsBusy为1。

代码(13)对CAS失败的线程重新计算当前线程的随机值threadLoca!RandomProbe,以减少下次访问cells元素时的冲突机会。这里回答了问题5。

小结

本节介绍了JDK8 中新增的LongAdder原子性操作类,该类通过内部cells数组分担了高并发下多线程同时对一个原子变量进行更新时的竞争量,让多个线程可以同时对cells数组里面的元素进行并行的更新操作。 另外,数组元素Cell 使用@sun.misc.Contended注解进行修饰, 这避免了cells数组内多个原子变量被放入同一个缓存行,也就是避免了伪共享,这对性能也是一个提升。

JUC包中并发List源码剖析

介绍

并发包中的并发List只有CopyOnWriteArrayList。 CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行的修改操作都是在底层的一个复制的数组(快照)上进行的,也就是使用了写时复制策略。CopyOnWriteArraylist的类图结构如图所示。

在CopyOnWriteArrayList的类图中,每个CopyOnWriteArrayList对象里面有一个array数组对象用来存放具体元素,ReentrantLock独占锁对象用来保证同时只有一个线程对array进行修改。 这里只要记得ReentrantLock是独占锁,同时只有一个线程可以获取就可以了,后面会专门对JUC中的锁进行介绍。

主要方法源码解析

初始化

首先看下无参构造函数,如下代码在内部创建了一个大小为0的Object数组作为array的初始值。

1
2
3
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}

然后看下有参构造函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 创建一个list,其内部元素是入参toCopyIn的副本 
public CopyOnWriteArrayList(E[] toCopyIn) {
setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}

// 入参为集合,将集合里面的元素复制到本list
public CopyOnWriteArrayList(Collection<? extends E> c) {
Object[] elements;
if (c.getClass() == CopyOnWriteArrayList.class)
elements = ((CopyOnWriteArrayList<?>)c).getArray();
else {
elements = c.toArray();
// c.toArray might (incorrectly) not return Object[] (see 6260652)
if (elements.getClass() != Object[].class)
elements = Arrays.copyOf(elements, elements.length, Object[].class);
}
setArray(elements);
}

添加元素

CopyOnWriteArrayList中用来添加元素的函数有 add(E e)、add(int index, E element)等,它们的原理类似,所以本
节以 add(E e) 为例来讲解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean add(E e) {
// 获取独占锁(1 )
final ReentrantLock lock = this.lock;
lock.lock();
try {
// (3) 复制array到新数组,添加元素到新数组
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;

// (4) 使用新数组替换添加前的数组
setArray(newElements);
return true;
} finally {
// (5) 释放锁
lock.unlock();
}
}

private transient volatile Object[] array;

final Object[] getArray() {
return array;
}

final void setArray(Object[] a) {
array = a;
}

在如上代码中,调用add方法的线程会首先执行代码(1) 去获取独占锁,如果多个线程都调用add方法则只有一个线程会获取到该锁,其他线程会被阻塞挂起直到锁被释放。

所以一个线程获取到锁后,就保证了在该线程添加元素的过程中其他线程不会对array进行修改。

线程获取锁后执行代码 (2) 获取array, 然后执行代码(3)复制array到一个新数组(从这里可以知道新数组的大小是原来数组大小增加1,所以CopyOnWriteArrayList是无界list),并把新增的元素添加到新数组

然后执行代码(4)使用新数组替换原数组,并在返回前释放锁。由于加了锁,所以整个add过程是个原子性操作。需要注意的是,在添加元素时,首先复制了一个快照,然后在快照上进行添加,而不是直接在原来数组上进行。

获取指定位置元素

使用E get(int index) 获取下标为 index 的元素,如果元素不存在则抛出 IndexOutOfBoundsException 异常。

1
2
3
4
5
6
7
8
9
10
11
public E get(int index) {
return get(getArray(), index);
}

private E get(Object[] a, int index) {
return (E) a[index];
}

final Object[] getArray() {
return array;
}

在如上代码中,当线程x调用get 方法获取指定位置的元素时,分两步走, 首先获取array数组(这里命名为步骤A),然后通过下标访问指定位置的元素(这里命名为步骤B),这是两步操作, 但是在整个过程中并没有进行加锁同步。假设这时候List内容如 图5-2 所示,里面有1、 2、 3三个元素。

由于执行步骤A和步骤B没有加锁,这就可能导致在线程x执行完步骤A后执行步骤B前, 另外一个线程y进行了remove操作,假设要删除元素1。 remove操作首先会获取独占锁, 然后进行写时复制操作,也就是复制一份当前array数组, 然后在复制的数组里面删除线程x通过get 方法要访问的元素1,之后让array指向复制的数组。 而这时候array之前指向的数组的引用计数为1而不是0, 因为线程x还在使用它,这时线程x开始执行步骤B,步骤B操作的数组是线程y删除元素之前的数组,如图5-3所示。

所以,虽然线程y己经删除了index处的元素,但是线程x的步骤B还是会返回index处的元素,这其实就是写时复制策略产生的弱一致性问题。

简单来说,线程x获取到array数组,然后 线程y 开始remove操作就获取到独占锁,把复制快照的指定元素移出但是没有来得及调用 setArray(newElements),就还是指向remove前的 线程x 调用的数组,这时候 线程x 通过下标访问指定位置的元素就还是原先移出前的下标位置元素。

修改指定元素

使用E set (int index,E element)修改list 中指定元素的值,如果指定位置的元素不存在则抛出IndexOutOfBoundsException异常,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E set(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
E oldValue = get(elements, index);

if (oldValue != element) {
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
setArray(newElements);
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements);
}
return oldValue;
} finally {
lock.unlock();
}
}

如上代码首先获取了独占锁,从而阻止其他线程对array数组进行修改,然后获取当前数组,并调用get方法获取指定位置的元素,如果指定位置的元素值与新值不一致则创建新数组井复制元素,然后在新数组上修改指定位置的元素值并设置新数组到array如果指定位置的元素值与新值一样,则为了保证volatile语义,还是需要重新设置array,虽然array的内容并没有改变。

删除元素

删除list 里面指定的元素,可以使用E remove(int index)、boolean remove(Object o) 和 boolean remove(Object o, Object[] snapshot, int index) 等方法,它们的原理一样。下面讲解下 remove(int index) 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E remove(int index) {
final ReentrantLock lock = this.lock;
// 获取独占锁
lock.lock();
try {
// 获取数组
Object[] elements = getArray();
int len = elements.length;
// 获取指定元素
E oldValue = get(elements, index);
int numMoved = len - index - 1;
// 如果要删除的是最后一个元素
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
// 分两次复制删除后剩余的元素到新数组
Object[] newElements = new Object[len - 1];
// arraycopy(Object src, int srcPos, Object dest, int destPos, int length);
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
// 使用新数组代替老数组
setArray(newElements);
}
return oldValue;
} finally {
// 释放锁
lock.unlock();
}
}

如上代码其实和新增元素的代码类似,首先获取独占锁以保证删除数据期间其他线程不能对array进行修改,然后获取数组中要被删除的元素,并把剩余的元素复制到新数组,之后使用新数组替换原来的数组,最后在返回前释放锁。

arraycopy详解

1
2
3
4
5
6
7
8
public static native void arraycopy(Object src,  int  srcPos,
Object dest, int destPos,
int length);

arraycopy(Object src, int 被拷贝对象数组的起始索引,
Object 目标对象, int 目标对象起始索引, int 目标对象拷贝长度);

// 其中第四个参数目标对象拷贝长度如果小于 被拷贝对象src的长度,不足的部分按照数据类型默认值填充

使用Demo

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
int[] arr = {1, 3, 5, 8, 7};
int len = arr.length;

int[] newArr = new int[len - 1];
int index = 2;
System.arraycopy(arr, 0, newArr, 0, index);
System.out.println(Arrays.toString(newArr));

System.arraycopy(arr, index + 1, newArr, index, len - index - 1);
System.out.println(Arrays.toString(newArr));
}

输出结果:

1
2
[1, 3, 0, 0]
[1, 3, 8, 7]

弱一致性的迭代器

遍历列表元素可以使用迭代器。 在讲解什么是迭代器的弱一致性前,先举一个例子来说明如何使用迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) throws InterruptedException {
CopyOnWriteArrayList<Object> arrayList = new CopyOnWriteArrayList<>();
arrayList.add("hello");
arrayList.add("alibaba");
arrayList.add("china");
arrayList.add("to");
arrayList.add("hangzhou");

Thread threadOne = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("threadOne:" + arrayList);
// 修改list中下标为1的元素为 heihei
arrayList.set(1, "shoppe");
// 删除元素
arrayList.remove(2);
arrayList.remove(3);
System.out.println("threadOne:" + arrayList);
}
});

Thread threadTwo = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("threadTwo:" + arrayList);
// 修改list中下标为1的元素为 heihei
arrayList.set(2, "shoppers");
// 删除元素
System.out.println("threadTwo:" + arrayList);
}
});

// 宝藏在修改线程启动前获取迭代器
Iterator<Object> iterator = arrayList.iterator();

threadOne.start();
threadOne.join();
Thread.sleep(2000);
threadTwo.join();
threadTwo.start();

while (iterator.hasNext()) {
System.out.println(iterator.next());
}
}
}

主线程在子线程执行完毕后使用获取的迭代器遍历数组元素,从输出结果我们知道,在子线程里面进行的操作一个都没有生效,这就是选代器弱一致性的体现。 需要注意的是,获取迭代器的操作必须在子线程操作之前进行。即所谓弱一致性是指返回跌代器后,其他线程对list 的增删改对迭代器是不可见的。

1
2
3
4
5
6
7
8
9
threadOne:[hello, alibaba, china, to, hangzhou]
threadOne:[hello, shoppe, to]
hello
alibaba
china
to
hangzhou
threadTwo:[hello, shoppe, to]
threadTwo:[hello, shoppe, shoppers]

总结

CopyOnWriteArrayList使用写时复制的策略来保证 list 的一致性,而获取一修改一写入三步操作并不是原子性的,所以在增删改的过程中都使用了独占锁,来保证在某个时间只有一个线程能对list 数组进行修改。 另外CopyOnWriteArrayList提供了弱一致性的迭代器从而保证在获取迭代器后,其他线程对list 的修改是不可见的,线程和线程间操作list可见迭代器遍历的数组是一个快照。 另外,CopyOnWriteArraySet 的底层就是使用它实现的,感兴趣的读者可以查阅相关源码。

JUC包中线程池ThreadPoolExecutor原理探究

介绍

线程池主要解决两个问题: 一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁是需要开销的。 线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。二是线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。 每个ThreadPoolExecutor也保留了一些基本的统计数据, 比如当前线程池完成的任务数目等。

类图介绍

Executors其实是个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程池实例。 ThreadPoolExecutor 继承了
AbstractExecutorService,成员变量 ctl 是一个Integer的原子变量,用来记录线程池状态和线程池中线程个数,类似于ReentrantReadWriteLock使用一个变量来保存两种信息。

这里假设Integer类型是32位二进制表示,则其中高3位用来表示线程池状态后面29位用来记录线程池线程个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// (高3位)用来表示线程池状态, (低29位)用来表示线程个数。默认是RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 是具体平台下Integer的二进制
位数-3后的剩余位数所表示的数才是线程的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程最大个数(低29位)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS; // 接受新任务并且处理阻塞队列里的任务。
private static final int SHUTDOWN = 0 << COUNT_BITS; // 拒绝新任务但是处理阻塞队列里的任务。
private static final int STOP = 1 << COUNT_BITS; // 拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务。
private static final int TIDYING = 2 << COUNT_BITS; // 所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为0, 将要调用terminated方法。
private static final int TERMINATED = 3 << COUNT_BITS; // 终止状态。 terminated方法调用完成以后的状态。

private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl新值(线程状态与线程个数)
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池状态转换列举如下:

状态转换 什么下转换
RUNNING -> SHUTDOWN 显式调用 shutdown() 方法, 或者隐式调用了 finalize() 方法里面的 shutdown() 方法
RUNNING 或 SHUTDOWN -> STOP 显式调用 shutdownNow() 方法时
SHUTDOWN -> TIDYING 当线程池和任务队列都为空时
STOP -> TIDYING 当线程池为空时
TIDYING -> TERMINATED 当terminated()hook方法执行完成时

线程池参数如下:

参数 作用
corePoolSize 线程池核心线程个数
workQueue 用于保存等待执行的任务的阻塞队列,比如基于数组的有界
maximunPoolSize 线程池最大线程数量
RejectedExecutionHandler 饱和策略。当队列满并且线程个数达到maximunPoolSize后采取的策略, 比如AbortPolicy (抛出异常)、CallerRunsPolicy (使用调用者所在线程来运行任务)、 DiscardOldestPolicy (调用poll()丢弃一个任务,执行当前任务)
ThreadFactory 创建线程的工厂
keeyAliveTime 存活时间。 如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态, 则这些闲置的线程能存活的最大时间
TimeUnit 存活时间的时间单位
ArrayBlockingQueue 基于链表的无界LinkedBlockingQueue、最多只有一个元素的同步队列SynchronousQueue及优先级队列PriorityBlockingQueue等

线程池类型如下:参考链接 - 线程池的四种使用方式

源码分析

1、public void execute(Runnable command)

execute方法的作用是提交任务command到线程池进行执行。用户线程提交任务到线程池的模型图如图8-2所示。

从该图可以看出, ThreadPoolExecutor的实现实际是一个生产消费模型,当用户添加任务到线程池时相当于生产者生产元素,workers线程工作集中的线程直接执行任务或者从任务队列里面获取任务时则相当于消费者消费元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 如果任务为null,则抛出NPE异常
int c = ctl.get();
// 当前线程池中线程个数是否小于corePoolSize,小于则开启新线程运行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 获取当前线程池的状态+线程个数变量的组合值;
c = ctl.get();
}
// 如果一个任务可以成功排队
if (isRunning(c) && workQueue.offer(command)) {
// 二次检查:(防止现有线程自上次检查后就死掉了)或该池自进入该方法后就关闭了。
int recheck = ctl.get();
// 如果当前线程池状态不是 RUNNING 则从队列中删除任务,并执行拒绝策略
if (!isRunning(recheck) && remove(command))
reject(command);
// 否则如果当前线程池为空, 则添加一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果我们无法将任务排队,则尝试添加一个新线程。如果失败,则表明我们已关闭或已饱和,因此拒绝该任务。
else if (!addWorker(command, false))
reject(command);
}

public final int get() {
return value;
}

private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

其中 addWorker()方法 主要分两个部分:第一部分双重循环的目的是通过CAS操作增加线程数,第二部分主要是把并发安全的任务添加到workers里面,并且启动任务执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 添加成功后启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}

// 否则,按照记录的需添加的线程数回滚工作线程,并修改减少相应的线程池状态数量
if (! workerStarted)
addWorkerFailed(w);

private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

工作线程Worker的执行

用户线程提交任务到线程池后,由Worker来执行。先看下Worker的构造函数。

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // 在调用runWorker前禁止中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 创建一个线程
}

在构造函数内首先设置Worker的状态为 -1,这是为了避免当前Worker在调用rnnWorker方法前被中断(当其他线程调用了线程池的 shutdownNow 时,如果Worker状态 >= 0 ,则会中断该线程)。这里设置了线程的状态为-1 ,所以该线程就不会被中断了。 在
如下runWorker代码中,运行代码(9) 时会调用unlock方法,该方法把status设置为了 0,所以这时候调用shutdownNow会中断Worker线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // (9) 将state设置为0, 允许中断
boolean completedAbruptly = true;
try {
// (10) 如果task不为null 或者 调用getTask从任务队列获取的任务返回不为null
while (task != null || (task = getTask()) != null) {
// (10.1) 获取工作线程内部持有的独占锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// (10.2) 执行任务前干一些事
beforeExecute(wt, task);
Throwable thrown = null;
try {
// (10.3) 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// (10.4) 执行任务完毕后干一些事
afterExecute(task, thrown);
}
} finally {
task = null;
// (10.5) 统计当前woker完成了多少任务,并释放锁
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// (11) 执行清理工作
processWorkerExit(w, completedAbruptly);
}
}

这里在执行具体任务期间加内部独占锁,是为了避免在任务运行期间,其他线程调用了shutdown后正在执行的任务被中断(shutdown只会中断当前被阻塞挂起的线程)

代码(11 )执行清理任务,其代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

// (11.1) 统计整个线程池完成的任务个数, 并从工作集里面删除当前Woker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

// (11.2) 如果当前是SHUTDONW状态并且工作队列为空或者当前是STOP状态, 当前线程池里面没有活动线程
// 尝试设置线程池状态为TERMINATED
tryTerminate();

// (11.3) 如果当前线程个数小于核心个数,则增加
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

shutdown 操作

调用shutdown方法后,线程池就不会再接受新的任务了,但是工作队列里面的任务还是要执行的。该方法会立刻返回,并不等待队列任务完成再返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// (12) 权限检查:检查看是否设置了安全管理器,是则看当前调用shutdown命令的线程是否有关闭线程的权限,
// 如果有权限则还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出SecurityException或者NullPointerException异常。
checkShutdownAccess();
// (13) 设置当前线程池状态为SHUTDOWN或STOP,如果已经是直接返回,否则设置为SHUTDOWN
advanceRunState(SHUTDOWN);
// (14) 设置所有空闲线程的中断标志,
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
// (15) 尝试将状态变为TERMINATED
mainLock.unlock();
}
tryTerminate();
}

shutdownNow操作

调用shutdownNow方法后, 线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务, 正在执行的任务会被中断, 该方法会立刻返回,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表。

awaitTermination操作

当线程调用awaitTermination方法后,当前线程会被阻塞,直到线程池状态变为TERMINATED才返回, 或者等待时间超时才返回。整个过程中独占锁的代码如下。

命名线程工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

NamedThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
if (null == name || name.isEmpty()) {
name = "pool";
}
namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

然后我们创建线程池测试

1
2
3
4
5
6
7
8
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
new NamedThreadFactory("命名线程工厂"),
new ThreadPoolExecutor.CallerRunsPolicy());

输出:

1
2
Thread[命名线程工厂-1-thread-1,5,main] task1 - 1	
Thread[命名线程工厂-1-thread-2,5,main] task1 - 2

记得关闭线程池

在日常开发中为了便于线程的有效复用, 经常会用到线程池, 然而使用完线程池后如果不调用shutdown关闭线程池,则会导致线程池资源一直不被释放。下面通过简单的例子来说明该问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

由如上代码可知,线程池默认的ThreadFactory创建的都是用户线程。 而线程池里面的核心线程是一直存在的,如果没有任务则会被阻塞,所以线程池里面的用户线程一直存在。而shutdown方法的作用就是让这些核心线程终止。下面简单看下shutdown的主要代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断所有的空闲工作线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

这里在shutdown方法里面设置了线程池的状态为SHUTDOWN, 并且设置了所有Worker空闲线程(阻塞到队列的 take() 方法的线程)的中断标志。

总结

线程池巧妙地使用一个Integer类型的原子变量来记录线程池状态和线程池中的钱程个数。通过线程池状态来控制任务的执行,每个Worker线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。

JUC包中ScheduledThreadPoolExecutor原理探究

介绍

这是一个可以在指定一定延迟时间后或者定时进行任务调度执行的线程池。

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。线程池队列是DelayedWorkQueue, 其和DelayedQueue类似,是一个延迟队列。

ScheduledFutureTask是具有返回值的任务, 继承自FutureTask。 FutureTask的内部有一个变量state用来表示任务的状态, 一开始状态为NEW, 所有状态为:

1
2
3
4
5
6
7
8
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

ScheduledFutureTask内部还有一个变量 period 用来表示任务的类型,任务类型如下:

  • period=0, 说明当前任务是一次性的,执行完毕后就退出了。
  • period为负数,说明当前任务为fixed-delay任务,是固定延迟的定时可重复执行任务。
  • period为正数,说明当前任务为fixed-rate任务, 是固定频率的定时可重复执行任务。

ScheduledThreadPoolExecutor的一个构造函数如下,由该构造函数可知线程池队列是DelayedWorkQueue。

1
2
3
4
5
6
7
8
9
10
11
12
13
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

原理剖析

本节讲解三个重要函数。

  • schedule(Runnable command, long delay,TimeUnit unit)
  • scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
  • scheduleAtFixedRate(Runnable command,long initialDelay,long period,Time Unit unit)

Scheduled()

该方法的作用是提交一个延迟执行的任务, 任务从提交时间算起延迟单位为unit的delay时间后开始执行。 提交的任务不是周期性任务,任务只会执行一次,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
// (1) 参数校验
if (command == null || unit == null)
throw new NullPointerException();
// (2) 任务转换
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// (3) 添加任务到延迟队列
delayedExecute(t);
return t;
}

private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

代码(2)装饰任务,把提交的command任务转换为ScheduledFutureTask。ScheduledFutureTask是具体放入延迟队列里面的东西。由于是延迟任务,所以ScheduledFutureTask实现了longgetDelay(TimeUnit unit)和int compareTo(Delayed other) 方法。triggerTime方法将延迟时间转换为绝对时间,也就是把当前时间的纳秒数加上延迟的纳秒数后的long型值。 ScheduledFutureTask的构造函数如下。

compare To的作用是加入元素到延迟队列后,在内部建立或者调整堆时会使用该元素的compareTo方法与队列里面其他元素进行比较,让最快要过期的元素放到队首。所以无论什么时候向队列里面添加元素,队首的元素都是最快要过期的元素

1
2
3
4
5
6
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}

scheduleWithFixedDelay()

该方法的作用是,当任务执行完毕后,让其延迟固定时间后再次运行(fixed-delay任务)。其中initialDelay表示提交任务后延迟多少时间开始执行任务command, delay表示当任务执行完毕后延长多少时间后再次运行command任务, unit是initialDelay和delay的时间单位。任务会一直重复运行直到任务运行中抛出了异常,被取消了,或者关闭了线程池。scheduleWithFixedDelay的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 装饰任务类,注意period=-delay<O
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

scheduleAtFixedRate()

该方法相对起始时间点以固定频率调用指定的任务(fixed-rate任务)。当把任务提交到线程池并延迟initialDelay时间(时间单位为unit)后开始执行任务command。然后从initialDelay+period时间点再次执行,而后在initialDelay+ 2* period时间点再次执行,循环往复,直到抛出异常或者调用了任务的cancel 方法取消了任务,或者关闭了线程池。如果当前任务还没有执行完,下一次要执行任务的时间到了,则不会并发执行,下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 装饰任务类,注意period=period>O,不是负的
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

总结

本章讲解了ScheduledThreadPoolExecutor的实现原理,如图9-2所示,其内部使用DelayQueue来存放具体任务。任务分为三种, 其中一次性执行任务执行完毕就结束了,fixed-delay任务保证同一个任务在多次执行之间间隔固定时间,fixed-rate任务保证按照固定的频率执行。 任务类型使用period的值来区分

https://img-blog.csdnimg.cn/20210325111416700.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNDc2NDY1,size_16,color_FFFFFF,t_70

JUC包中线程同步器原理剖析

CountDownlatch原理剖析

案例介绍

在日常开发中经常会遇到需要在主线程中开启多个线程去并行执行任务, 并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。在CountDownLatch 出现之前一般都使用线程的 join() 方法来实现这一点,但是join方法不够灵活, 不能够满足不同场景的需要,所以JDK开发组提供了CountDownLatch这个类, 我们前面介绍的例子使用CountDownLatch会更优雅。 使用CountDownLatch的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class JoinCountDownLatchUserPool {

// 创建一个CountDownLatch实例
private static volatile CountDownLatch countDownLatch = new CountDownLatch((2));

public static void main(String[] args) throws InterruptedException {

ExecutorService executorService = Executors.newFixedThreadPool(2);

// 将线程A添加到线程池
executorService.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
System.out.println("Thread One over");
});

// 将线程B添加到线程池
executorService.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
System.out.println("Thread tow over");
});

System.out.println("wait all child thread over");
countDownLatch.await();
System.out.println("all child thread over:" + countDownLatch.getCount());
}
}

创建了一个CountDownLatch实例,因为有两个子线程所以构造函数的传参为2。主线程调用countDownLatch.await() 方法后会被阻塞。子线程执行完毕后调用 countDownLatch.countDown()方法让countDownLatch内部的计数器减 1 ,所有子线程执行完毕并调用countDown() 方法后计数器会变为0,这时候主线程的 await() 方法才会返回。

这里总结下CountDownLatch与join方法的区别。一个区别是,调用一个子线程的join()方法后,该线程会一直被阻塞直到子线程运行完毕,而CountDownLatch则使用计数器来允许子线程运行完毕或者在运行中递减计数,也就是CountDownLatch可以在子线程运行的任何时候让await方法返回而不一定必须等到线程结束。另外, 使用线程池来管理线程时一般都是直接添加Runable到线程池,这时候就没有办法再调用线程的join方法了,就是说countDownLatch相比join方法让我们对线程同步有更灵活的控制。

实现原理探究

为了一览CountDownLatch的内部结构,我们先看它的类图(如图10-1 所示)。

从类图可以看出,CountDownLatch是使用AQS实现的。 通过下面的构造函数,你会发现,实际上是把计数器的值赋给了AQS的状态变量state,也就是这里使用AQS的状态值来表示计数器值。

1
2
3
4
5
6
7
8
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

Sync(int count) {
setState(count);
}

下面我们来研究CountDownLatch中的几个重要的方法, 看它们是如何调用AQS来实现功能的。

1、void await() 方法

当线程调用CountDownLatch对象的await方法后, 当前线程会被阻塞, 直到下面的情况之一发生才会返回: 当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计数器的值为0时;其他线程调用了当前线程的 interrupt() 方法中断了当前线程,当前线程就会抛出InterruptedException异常, 然后返回。

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

从以上代码可以看到,await() 方法委托sync调用了AQS的 acquireSharedInterruptibly 方法, 后者的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// AQS获取共享资源时可被中断的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程被中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 查看当前计数器值是否为0, 为0直接返回, 否则进入AQS的队列等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

由如上代码可知, 该方法的特点是线程获取资源时可以被中断, 并且获取的资源是共享资源。另外可以看到,这里tryAcquireShared传递的arg参数没有被用到, 调用tryAcquireShared的方法仅仅是为了检查当前状态值是不是为0, 并没有调用CAS让当前状态值减1。

2、boolean await(long timeout, TimeUnit unit)方法

当线程调用了CountDownLatch对象的该方法后, 当前线程会被阻塞。相对于上个方法,这里多了个返回条件,设置的timeout时间到了,会因为超时而返回false ;

1
2
3
4
5
6
7
8
9
10
11
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

3、 void countDown() 方法

线程调用该方法后,计数器的值递减, 递减后如果计数器值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做。下面看下countDown() 方法是如何调用AQS的方法的。

1
2
3
public void countDown() {
sync.releaseShared(1);
}

由如上代码可知,CountDownLatch的 countDown() 方法委托sync调用了AQS的releaseShared方法,后者的代码如下。

1
2
3
4
5
6
7
8
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// LockSupport.unpark(s.thread) 调用AQS的doReleaseShared方法来激活阻塞的线程
doReleaseShared();
return true;
}
return false;
}

releaseShared首先调用了sync实现的AQS的tryReleaseShared方法,其代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected boolean tryReleaseShared(int releases) {
// 循环进行CAS,直到当前线程成功完成CAS使计数器值(状态值state)减 1 并更新到state
for (;;) {
int c = getState();
// 如果当前状态值为0,则直接返回,防止值被减成 负数
if (c == 0)
return false;
int nextc = c-1;
// 使用CAS让计数器值减1
if (compareAndSetState(c, nextc))
// 返回true说明是最后一个线程调用的countdown方法,而且当前计数器值为0
return nextc == 0;
}
}

使用CAS将计数器值减1, CAS失败则循环重试,否则如果当前计数器值为0则返回true,还需要唤醒因调用CountDownLatch的await方法而被阻塞的线程,具体是调用AQS的doReleaseShared方法来激活阻塞的线程。

4、long getCount() 方法

获取当前计数器的值,也就是AQS的 state 的值,一般在测试时使用该方法。下面看下代码。

1
2
3
4
5
6
7
public long getCount() {
return sync.getCount();
}

int getCount() {
return getState();
}

由如上代码可知,在其内部还是调用了AQS的getState方法来获取state 的值(计数器当前值)。

总结

本节首先介绍了CountDownLatch的使用,相比使用join 方法来实现线程间同步,前者更具有灵活性和方便性。另外还介绍了CountDownLatch的原理,CountDownLatch是使用AQS实现的。使用AQS的状态变量来存放计数器的值。首先在初始化CountDownLatch时设置状态值(计数器值),当多个线程调用countdown方法时实际是原子性递减AQS的状态值。当线程调用await 方法后当前线程会被放入AQS的阻塞队列等待计数器为0再返回。其他线程调用countdown方法让计数器值递减 1,当计数器值变为0时, 当前线程还要调用AQS的doReleaseShared方法来激活由于调用 await() 方法而被阻塞的线程(countdown方法中)。但是CountDownLatch的计数器是一次性的,也就是等到计数器值变为0后,再调用CountDownLatch的await和countdown方法都会立刻返回,

回环屏障CyclicBarrier原理探究

JDK开发组提供了CyclicBarrier类, 并且CyclicBarrier类的功能并不限于CountDownLatch的功能。从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。之所以叫作屏障是因为线程调用await 方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。

案列介绍

在介绍原理前先介绍几个实例以便加深理解。在下面的例子中,我们要实现的是,使用两个线程去执行一个被分解的任务A, 当两个线程把自己的任务都执行完毕后再对它们的结果进行汇总处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CyclicBarrierTest1 {

// 创建一个CyclicBarrier 实例,添加一个所有子线程全部到达屏障后执行的任务
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
System.out.println(Thread.currentThread() + " task1 merge result");
});

public static void main(String[] args) {

// 创建一个线程个数固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.submit(() -> {
System.out.println(Thread.currentThread() + " task1 - 1");

System.out.println(Thread.currentThread() + " enter in barrier");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " enter out barrier");
});

executorService.submit(() -> {
System.err.println(Thread.currentThread() + " task1 - 2");

System.err.println(Thread.currentThread() + " enter in barrier");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.err.println(Thread.currentThread() + " enter out barrier");
});

executorService.shutdown();
}
}

输出结果如果:

1
2
3
4
5
6
7
Thread[pool-1-thread-2,5,main] task1 - 2
Thread[pool-1-thread-2,5,main] enter in barrier
Thread[pool-1-thread-2,5,main] enter out barrier0
Thread[pool-1-thread-1,5,main] task1 - 1
Thread[pool-1-thread-1,5,main] enter in barrier
Thread[pool-1-thread-1,5,main] task1 merge result
Thread[pool-1-thread-1,5,main] enter out barrier

如上代码创建了一个CyclicBarrier对象,其第一个参数为计数器初始值,第二个参数Runable是当计数器值为0时需要执行的任务。 在main 函数里面首先创建了一个大小为2的线程池,然后添加两个子任务到线程池, 每个子任务在执行完自己的逻辑后会调用await 方法。一开始计数器值为2, 当第一个线程调用await 方法时,计数器值会递减为1。 由于此时计数器值不为0,所以当前线程就到了屏障点而被阻塞。 然后第二个线程调用await时,会进入屏障,计数器值也会递减,现在计数器值为0, 这时就会去执行CyclicBarrier构造函数中的任务,执行完毕后退出屏障点,并且唤醒被阻塞的第二个线程,这时候第一个线程也会退出屏障点继续向下运行。

上面的例子说明了多个线程之间是相互等待的,假如计数器值为N,那么随后调用await方法的 N-1 个线程都会因为到达屏障点而被阻塞,当第N个线程调用await后,计数器值为0了,这时候第N个线程才会发出通知唤醒前面的 N - 1 个线程。 也就是当全部线程都到达屏障点时才能一块继续向下执行

假设一个任务由阶段1、阶段2和阶段3组成,每个线程要串行地执行阶段1、阶段2和阶段3,当多个线程执行该任务时,必须要保证所有线程的 阶段1 全部完成后才能进入阶段2执行, 当所有线程的阶段2全部完成后才能进入阶段3执行。 下面使用CyclicBanier来完成这个需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CyclicBarrierTest2 {

// 创建一个CyclicBarrier 实例,添加一个所有子线程全部到达屏障后执行的任务
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

public static void main(String[] args) {

// 创建一个线程个数固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " step1");
cyclicBarrier.await();

System.out.println(Thread.currentThread().getName() + " step2");
cyclicBarrier.await();

System.out.println(Thread.currentThread().getName() + " step3");
} catch (BrokenBarrierException | InterruptedException e) {
e.printStackTrace();
}
});

executorService.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " step1");
cyclicBarrier.await();

Thread.sleep(1000); // 让它执行慢一秒
System.out.println(Thread.currentThread().getName() + " step2");
cyclicBarrier.await();

System.out.println(Thread.currentThread().getName() + " step3");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});

executorService.shutdown();
}
}

输出结果:

1
2
3
4
5
6
pool-1-thread-1 step1
pool-1-thread-2 step1
pool-1-thread-1 step2
pool-1-thread-2 step2
pool-1-thread-2 step3
pool-1-thread-1 step3

在如上代码中,每个子线程在执行完阶段l 后都调用了await方法, 等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段l 后才会开始执行阶段2。然后在阶段2后面调用了await方法, 这保证了所有线程都完成了阶段2后, 才能开始阶段3的执行。 这个功能使用单个CountDownLatch是无法完成的。

实现原理探究

由以上类图可知,CyclicBarrier基于独占锁实现, 本质底层还是基于AQS的。 parties用来记录线程个数,这里表示多少线程调用await后,所有线程才会冲破屏障继续往下运行。而count一开始等于parties, 每当有线程调用await方法就递减1, 当count为0时就表示所有线程都到了屏障点。你可能会疑惑,为何维护parties和count两个变量,只使用count不就可以了?别忘了CyclicBarrier是可以被复用的, 使用两个变量的原因是,parties始终用来记录总的线程个数,当count计数器值变为0后, 会将parties 的值赋给count,从而进行复用。这两个变量是在构造CyclicBarrier对象时传递的, 如下所示:

1
2
3
4
5
6
7
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
// 当所有线程都到达屏障点后,执行的下一步任务
this.barrierCommand = barrierAction;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
private static class Generation {
boolean broken = false;
}

private final Condition trip = lock.newCondition();

private void breakBarrier() {
// broken并没有被声明为volatile 的,因为是在锁内使用变量, 所以不需要声明
generation.broken = true;
count = parties;
// 屏障一旦被打破 (当所有线程都到达屏障点后),就唤醒所有线程
trip.signalAll();
}

下面来看CyclicBarrier中的几个重要的方法。

1、int await() 方法

当前线程调用CyclicBarrier的该方法时会被阻塞, 直到满足下面条件之一才会返回:

  • parties个线程都调用了 await() 方法,也就是线程都到了屏障点;
  • 其他线程调用了当前线程的interrupt ()方法中断了当前线程,则当前线程会抛出Intem1ptedException异常而返回;
  • 与当前屏障点关联的Generation对象的broken标志被设置为true时, 会抛出BrokenBarrierException异常, 然后返回。
1
2
if (g.broken)
throw new BrokenBarrierException();

由如下代码可知, 在内部调用了dowait方法。第一个参数为false则说明不设置超时时间,这时候第二个参数没有意义。

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

2、boolean await(long timeout, TimeUnit unit)方法

当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回:

  • 设置的超时时间到了后返回false
  • parties个线程都调用了 await() 方法,也就是线程都到了屏障点;
  • 其他线程调用了当前线程的interrupt ()方法中断了当前线程,则当前线程会抛出Intem1ptedException异常而返回;
  • 与当前屏障点关联的Generation对象的broken标志被设置为true时, 会抛出BrokenBarrierException异常, 然后返回。
1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(true, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

3、int dowait(boolean timed, long nanos)方法

该方法实现了CyclicBarrier的核心功能,其代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 获取独占锁lock,进行后续操作,其余进入阻塞队列。
lock.lock();
try {
...

// (1) 如果index==O则说明所有线程都到了屏障点,此时执行初始化时传递的任务
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// (2) 执行任务
if (command != null)
command.run();
ranAction = true;
// (3) 激活其他因调用 await 方法而被阻塞的线程,并重置CyclicBarrier
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// (4)如果index != 0
for (;;) {
try {
// (5) 没有设置超时时间,
if (!timed)
trip.await();
// (6) 设置了超时时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 异常,唤醒线程
} else {
Thread.currentThread().interrupt();
}
}
...
}
} finally {
lock.unlock();
}
}

private void nextGeneration() {
// (7) signal completion of last generation
trip.signalAll();
// (8) 重置next generation
count = parties;
generation = new Generation();
}

private static class Generation {
boolean broken = false;
}

以上是dowait方法的主干代码。 当一个线程调用了dowait 方法后,首先会获取独占锁lock,如果创建CycleBarrier时传递的参数为10,那么后面9个调用钱程会被阻塞。然后当前获取到锁的线程会对计数器count进行递减操作,递减后count=index=9,因为index!=0所以当前线程会执行代码(4)。 如果当前线程调用的是无参数的await() 方法,则这里timed=false,所以当前线程会被放入条件变量的p的条件阻塞队列,当前线程会被挂起并释放获取的lock锁。 如果调用的是有参数的await 方法则timed=true,然后当前线程也会被放入条件变量的条件队列并释放锁资源,不同的是当前线程会在指定时间超时后自动被激活。

当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的9个线程中有一个会竞争到lock锁,然后执行与第一个线程同样的操作,直到最后一个线程获取到lock锁,此时,己经有9个线程被放入了条件变量trip 的条件队列里面。最后count=index等于0,所以执行代码(2),如果创建CyclicBarrier时传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他9个线程,并重置CyclicBarrier,然后这10个线程就可以继续向下运行了。

总结

本节首先通过案例说明了CycleBarrier与CountDownLatch的不同在于,前者是可以复用的,并且前者特别适合分段任务有序执行的场景。然后分析了CycleBarrier, 其通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。

信号量Semaphore原理探究

Semaphore信号量也是Java 中的一个同步器,与CountDownLatch和CycleBarrier不同的是,它内部的计数器是递增的,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。

案例介绍

同样下面的例子也是在主线程中开启两个子线程让它们执行, 等所有子线程执行完毕后主线程再继续向下运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SemaphoreTest1 {
// 创建一个semaphore 实例
private static Semaphore semaphore = new Semaphore(0);

public static void main(String[] args) throws InterruptedException {

// 创建一个线程个数固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.submit(new Runnable() {
@Override
public void run() {
semaphore.release();
System.err.println(Thread.currentThread() + " task1 - 1" + "\t");
}
});

executorService.submit(new Runnable() {
@Override
public void run() {
semaphore.release();
System.err.println(Thread.currentThread() + " task1 - 2" + "\t" );
}
});

// 等待子线程执行完毕,返回
semaphore.acquire(2);
System.out.println("All child Thread over" + "\t");

executorService.shutdown();
}
}

输出结果如下:

1
2
3
Thread[pool-1-thread-1,5,main] task1 - 1	
Thread[pool-1-thread-2,5,main] task1 - 2
All child Thread over

如上代码首先创建了一个信号量实例,构造函数的入参为0,说明当前信号量计数器的值为0。然后main函数向线程池添加两个线程任务,在每个线程内部调用信号量的release方法,这相当于让计数器值递增 1。最后在main线程里面调用信号量的acquire方法,传参为2说明调用acquire方法的线程会一直阻塞, 直到信号量的计数变为2才会返回。看到这里也就明白了,如果构造Semaphore时传递的参数为N,并在M个线程中调用了该信号量的release方法,那么在调用acquire使M个线程同步时传递的参数应该是M+N。

下面举个例子来模拟 CyclicBarrier 复用的功能,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class SemaphoreTest2 {
// 创建一个semaphore 实例
private static Semaphore semaphore = new Semaphore(0);

public static void main(String[] args) throws InterruptedException {

// 创建一个线程个数固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.submit(new Runnable() {
@Override
public void run() {
semaphore.release();
System.err.println(Thread.currentThread() + " task1 - 1" + "\t");
}
});

executorService.submit(new Runnable() {
@Override
public void run() {
semaphore.release();
System.err.println(Thread.currentThread() + " task1 - 2" + "\t" );
}
});

// (1)等待子线程执行完毕,返回。会在获取到2个信号量后返回,返回后当前信号量值为0
semaphore.acquire(2);

executorService.submit(new Runnable() {
@Override
public void run() {
semaphore.release();
System.err.println(Thread.currentThread() + " task2 - 1" + "\t");
}
});

executorService.submit(new Runnable() {
@Override
public void run() {
semaphore.release();
System.err.println(Thread.currentThread() + " task2 - 2" + "\t" );
}
});

semaphore.acquire(2);
System.out.println("All child Thread over" + "\t");

executorService.shutdown();
}
}

如上代码首先将线程A和线程B加入到线程池。主线程执行代码(1)后被阻塞。线程A和线程B调用release方法后信号量的值变为了2,这时候主线程的aquire方法会在获取到2个信号量后返回(返回后当前信号量值为0)。然后主线程添加线程C和线程D到线程池,之后主线程执行代码。)后被阻塞(因为主线程要获取2个信号量,而当前信号量个数为0)。 当线程C和线程D执行完release方法后,主线程才返回。 从本例子可以看出,Semaphore在某种程度上实现了CyclicBarrier的复用功能。

实现原理探究

由该类图可知,Semaphore还是使用AQS实现的。Sync只是对AQS的一个修饰,并且Sync有两个实现类,用来指定获取信号量时是否采用公平策略。

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  1. © 2020-2021 Lauy    湘ICP备20003709号

请我喝杯咖啡吧~

支付宝
微信