JUC包下同步工具类及Condition队列

在上篇关于从ReentrantLock看JUC中AQS的这篇文章中,留下了一个非常重要的Condition模块,并未去分析。而这个模块,在实现BlockingQueue的过程中,用到了。因此特地回过头来,去补习一下关于Condition的实现与原理、以及JUC下面其他的同步工具类的使用。

Condition队列

Condition是一个与CLH类似的队列,主要的逻辑在AQS中已经实现。

原理

在ReentrantLock中,新建一个condition的入口是ReentrantLock#newCondition()方法,这里直接new了一个ConditionObject对象,它里面含有一个Node链表,与CLH队列中的Node是同一个Node。

  • 这个队列的作用就是用来储存被暂停的线程,即调用ConditionObject的await()方法,等待条件成熟;
  • 当代码中,条件满足之后,再调用ConditionObject的signal或signalAll方法,唤醒再ConditionObject链表上一个/所有休眠的线程。

示例

因此,作用效果非常容易理解。它的应用举例的话,就直接用ArrayBlockingQueue的实现作为说明

  • put操作,如果队列满了,执行notFull.await()方法,将当前线程,加入到notFull这个Condition队列上,也就是语义上的等待有空间,再进行入队操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    while (count == items.length)
    notFull.await();
    enqueue(e);
    } finally {
    lock.unlock();
    }
    }
  • 当进行出队操作时,执行notFull.signal(),会唤醒notFull这个队列上的一条线程,进行入队操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
    itrs.elementDequeued();
    notFull.signal();
    return e;
    }

其实在ArrayBlockingQueue这个阻塞队列中,还有一个是否为空的Condition队列,原理与上述代码类似,在此不做过多赘述,但是可以了解下其实现的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}

CountDownLatch

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

多个线程等待计数器倒数到0,然后执行后续逻辑。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);

for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();

doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}

class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}

void doWork() { ... }
}}

使用示例二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...

for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));

doneSignal.await(); // wait for all to finish
}
}

class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;

WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}

public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}

void doWork() { ... }
}

实现原理

内部类Sync继承自AQS

await方法,检查state是否为0,为0说明已经倒数到0,可以执行,否则加入CLH队列,然后park住该线程直至唤醒,再检查state…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

countDown方法,state减1,如果state==0,那么从CLH队列尾部开始找一个正常的线程unpark以唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void countDown() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

CyclicBarrier

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

简单理解就是,多个线程执行完了之后(到达临界点),再执行最终的操作。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;

class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);

try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}

public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction = () -> mergeRows(...);
barrier = new CyclicBarrier(N, barrierAction);

List<Thread> threads = new ArrayList<>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}

// wait until done
for (Thread thread : threads)
thread.join();
}
}

实现原理

内部实现依赖ReentrantLock及Condition。

await方法,最终会调用doWait(),首先会使用ReentrantLock加锁,然后将count减1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private int dowait(boolean timed, long nanos){
final ReentrantLock lock = this.lock;
lock.lock();
try {
...
// count减1
int index = --count;
if (index == 0) { // 到达临界条件
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 执行临界操作
command.run();
ranAction = true;
// 还原参数,如count,以准备下次使用
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
// 在trip这个Condition队列上面睡眠等待
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {...}
...
}
} finally {
lock.unlock();
}
}
  • 如果count为0,也就是所有的线程完成其工作,那么执行barrierCommand,并重置参数,唤醒trip条件上的线程。
  • 如果count不为0,在Condition(trip)上面进行await操作,即进入睡眠,等待trip条件满足。

Semephore

限流器。简而言之,最多允许几个线程同时执行。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}

public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}

// Not a particularly efficient data structure; just for demo

protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];

protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}

protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}

实现原理

  • acquire方法,如果可用数(最多允许同时执行线程数)减去想要获取数(一般为1)小于0,获取锁失败,加入CLH队列中,直到唤醒再进行尝试获取资源;

  • release方法,可用数加上释放数,同时唤醒CLH上面的休眠的线程,让它们再次尝试获取资源。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public void release() {
    sync.releaseShared(1);
    }

    protected final boolean tryReleaseShared(int releases) {
    for (;;) {
    int current = getState();
    int next = current + releases;
    if (next < current) // overflow
    throw new Error("Maximum permit count exceeded");
    if (compareAndSetState(current, next))
    return true;
    }
    }

获取资源有公平和非公平之分。公平获取时,如果发现有人在排队,那么会直接加入CLH队列中。返回-1小于0,直接入CLH队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

非公平会直接进行获取,如果数量不够,那么加入CLH队列。

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

JUC包下同步工具类及Condition队列

https://eucham.me/2020/04/20/14e5d96312a9.html

作者

遇寻

发布于

2020-04-20

更新于

2022-04-21

许可协议

评论