在上篇关于从ReentrantLock看JUC中AQS 的这篇文章中,留下了一个非常重要的Condition模块,并未去分析。而这个模块,在实现BlockingQueue的过程中,用到了。因此特地回过头来,去补习一下关于Condition的实现与原理、以及JUC下面其他的同步工具类的使用。
Condition队列 Condition是一个与CLH类似的队列,主要的逻辑在AQS中已经实现。
原理 在ReentrantLock中,新建一个condition的入口是ReentrantLock#newCondition()
方法,这里直接new了一个ConditionObject 对象,它里面含有一个Node链表,与CLH队列中的Node是同一个Node。
这个队列的作用就是用来储存被暂停的线程,即调用ConditionObject 的await()方法,等待条件成熟;
当代码中,条件满足之后,再调用ConditionObject 的signal或signalAll方法,唤醒再ConditionObject 链表上一个/所有休眠的线程。
示例 因此,作用效果非常容易理解。它的应用举例的话,就直接用ArrayBlockingQueue
的实现作为说明 :
其实在ArrayBlockingQueue
这个阻塞队列中,还有一个是否为空的Condition
队列,原理与上述代码类似,在此不做过多赘述,但是可以了解下其实现的过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private void enqueue (E e) { final Object[] items = this .items; items[putIndex] = e; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
CountDownLatch
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
多个线程等待计数器倒数到0,然后执行后续逻辑。
使用示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class Driver { void main () throws InterruptedException { CountDownLatch startSignal = new CountDownLatch (1 ); CountDownLatch doneSignal = new CountDownLatch (N); for (int i = 0 ; i < N; ++i) new Thread (new Worker (startSignal, doneSignal)).start(); doSomethingElse(); startSignal.countDown(); doSomethingElse(); doneSignal.await(); } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this .startSignal = startSignal; this .doneSignal = doneSignal; } public void run () { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} } void doWork () { ... } }}
使用示例二:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class Driver2 { void main () throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch (N); Executor e = ... for (int i = 0 ; i < N; ++i) e.execute(new WorkerRunnable (doneSignal, i)); doneSignal.await(); } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this .doneSignal = doneSignal; this .i = i; } public void run () { try { doWork(i); doneSignal.countDown(); } catch (InterruptedException ex) {} } void doWork () { ... } }
实现原理 内部类Sync继承自AQS
await方法,检查state是否为0,为0说明已经倒数到0,可以执行,否则加入CLH队列,然后park住该线程直至唤醒,再检查state…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; }
countDown方法,state减1,如果state==0,那么从CLH队列尾部开始找一个正常的线程unpark以唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void countDown () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c - 1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
CyclicBarrier
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
简单理解就是,多个线程执行完了之后(到达临界点),再执行最终的操作。
使用示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class Solver { final int N; final float [][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run () { while (!done()) { processRow(myRow); try { barrier.await(); } catch (InterruptedException ex) { return ; } catch (BrokenBarrierException ex) { return ; } } } } public Solver (float [][] matrix) { data = matrix; N = matrix.length; Runnable barrierAction = () -> mergeRows(...); barrier = new CyclicBarrier (N, barrierAction); List<Thread> threads = new ArrayList <>(N); for (int i = 0 ; i < N; i++) { Thread thread = new Thread (new Worker (i)); threads.add(thread); thread.start(); } for (Thread thread : threads) thread.join(); } }
实现原理 内部实现依赖ReentrantLock及Condition。
await方法,最终会调用doWait(),首先会使用ReentrantLock加锁,然后将count减1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private int dowait (boolean timed, long nanos) { final ReentrantLock lock = this .lock; lock.lock(); try { ... int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) {...} ... } } finally { lock.unlock(); } }
如果count为0,也就是所有的线程完成其工作,那么执行barrierCommand,并重置参数,唤醒trip条件上的线程。
如果count不为0,在Condition(trip)上面进行await操作,即进入睡眠,等待trip条件满足。
Semephore 限流器。简而言之,最多允许几个线程同时执行。
使用示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class Pool { private static final int MAX_AVAILABLE = 100 ; private final Semaphore available = new Semaphore (MAX_AVAILABLE, true ); public Object getItem () throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem (Object x) { if (markAsUnused(x)) available.release(); } protected Object[] items = ... whatever kinds of items being managed protected boolean [] used = new boolean [MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem () { for (int i = 0 ; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true ; return items[i]; } } return null ; } protected synchronized boolean markAsUnused (Object item) { for (int i = 0 ; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false ; return true ; } else return false ; } } return false ; } }
实现原理
获取资源有公平和非公平之分。公平获取时,如果发现有人在排队,那么会直接加入CLH队列中。返回-1小于0,直接入CLH队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1 ; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
非公平会直接进行获取,如果数量不够,那么加入CLH队列。
1 2 3 4 5 6 7 8 9 final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }