Java中的阻塞队列
- ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列。
- LinkedTransferQueue:由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
1.ArrayBlockingQueue(公平、非公平)
用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下
不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当
队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入
元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐
量。我们可以使用以下代码创建一个公平的阻塞队列:ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
2.LinkedBlockingQueue(两个独立锁提高并发)
基于链表的阻塞队列,同 ArrayListBlockingQueue
类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁
来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE)。
3.PriorityBlockingQueue(compareTo 排序实现优先)
是 一 个 支持 优 先级 的 无界 队 列 。默 认 情况 下 元素 采 取 自然 顺 序升 序 排列 。 可 以自 定 义实 现compareTo()
方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue
时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
4.DelayQueue(缓存失效、定时任务 )
是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue
运用在以下应用场景:
- 缓存系统的设计:可以用
DelayQueue
保存缓存元素的有效期,使用一个线程循环查询DelayQueue
,一旦能从DelayQueue
中获取元素时,表示缓存有效期到了。 - 定 时 任 务 调 度 : 使 用
DelayQueue
保 存 当 天 将 会 执 行 的 任 务 和 执 行 时 间 , 一 旦 从DelayQueue
中获取到任务就开始执行,从比如TimerQueue
就是使用DelayQueue
实现的
5.SynchronousQueue(不存储数据、可用于传递数据)
是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另 外 一 个 线 程 使 用 , SynchronousQueue 的 吞 吐 量 高 于 LinkedBlockingQueue 和
ArrayBlockingQueue。
6.LinkedTransferQueue
是 一 个 由 链 表 结 构 组 成 的 无 界 阻 塞 TransferQueue 队 列 。 相 对 于 其 他 阻 塞 队 列 ,
LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
- transfer 方法:如果当前有消费者正在等待接收元素(消费者使用 take()方法或带时间限制的
poll()方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如
果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素
被消费者消费了才返回。 - tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费
者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否
接收,方法立即返回。而 transfer 方法是必须等到消费者消费了才返回。
对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传
入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时
还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。7.LinkedBlockingDeque
是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他 的 阻 塞 队 列 , LinkedBlockingDeque 多 了 addFirst , addLast , offerFirst , offerLast ,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法却等同于 takeFirst,不知道是不是 Jdk 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。CyclicBarrier、CountDownLatch、Semaphore 的用法
1.CountDownLatch(线程计数器 )
CountDownLatch 类位于 java.util.concurrent 包下,利用它可以实现类似计数器的功能。比如有一个任务 A,它要等待其他 4 个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。final CountDownLatch latch = new CountDownLatch(2); new Thread(){public void run() { System.out.println("子线程"+Thread.currentThread().getName()+"正在执行"); Thread.sleep(3000); System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"); latch.countDown(); }; }.start(); new Thread(){ public void run() { System.out.println("子线程"+Thread.currentThread().getName()+"正在执行"); Thread.sleep(3000); System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"); latch.countDown(); }; }.start(); System.out.println("等待 2 个子线程执行完毕..."); latch.await(); System.out.println("2 个子线程已经执行完毕"); System.out.println("继续执行主线程"); }
2.CyclicBarrier(回环栅栏-等待至 barrier 状态再全部同时执行)
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环
是因为当所有等待线程都被释放以后,CyclicBarrier 可以被重用。我们暂且把这个状态就叫做
barrier,当调用 await()方法之后,线程就处于 barrier 了。
CyclicBarrier 中最重要的方法就是 await 方法,它有 2 个重载版本: - public int await():用来挂起当前线程,直至所有线程都到达 barrier 状态再同时执行后续任
务; - public int await(long timeout, TimeUnit unit):让这些线程等待至一定的时间,如果还有
线程没有到达 barrier 状态就直接让到达 barrier 的线程执行后续任务。
具体使用如下,另外 CyclicBarrier 是可以重用的。public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { Thread.sleep(5000); //以睡眠来模拟线程需要预定写入数据操作 System.out.println(" 线 程 "+Thread.currentThread().getName()+" 写 入 数 据 完 毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务,比如数据操作"); } }
3.Semaphore(信号量-控制同时访问的线程个数)
Semaphore 翻译成字面意思为 信号量,Semaphore 可以控制同时访问的线程个数,通过
acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
Semaphore 类中比较重要的几个方法: - public void acquire(): 用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许
可。 - public void acquire(int permits):获取 permits 个许可
- public void release() { } :释放许可。注意,在释放许可之前,必须先获获得许可。
- public void release(int permits) { }:释放 permits 个许可
上面 4 个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法
- public boolean tryAcquire():尝试获取一个许可,若获取成功,则立即返回 true,若获取失
败,则立即返回 false - public boolean tryAcquire(long timeout, TimeUnit unit):尝试获取一个许可,若在指定的
时间内获取成功,则立即返回 true,否则则立即返回 false - public boolean tryAcquire(int permits):尝试获取 permits 个许可,若获取成功,则立即返
回 true,若获取失败,则立即返回 false - public boolean tryAcquire(int permits, long timeout, TimeUnit unit): 尝试获取 permits
个许可,若在指定的时间内获取成功,则立即返回 true,否则则立即返回 false - 还可以通过 availablePermits()方法得到可用的许可数目。
例子:若一个工厂有 5 台机器,但是有 8 个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过 Semaphore 来实现:
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- CountDownLatch 和 CyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不
同;CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;而 CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时
执行;另外,CountDownLatch 是不能够重用的,而 CyclicBarrier 是可以重用的。 - Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限。
volatile 关键字的作用(变量可见性、禁止重排序)
Java 语言提供了一种稍弱的同步机制,即 volatile 变量,用来确保将变量的更新操作通知到其他
线程。volatile 变量具备两种特性,volatile 变量不会被缓存在寄存器或者对其他处理器不可见的
地方,因此在读取 volatile 类型的变量时总会返回最新写入的值。
变量可见性
其一是保证该变量对所有线程可见,这里的可见性指的是当一个线程修改了变量的值,那么新的
值对于其他线程是可以立即获取的。
**禁止重排序 **
volatile 禁止了指令重排。
比 sychronized 更轻量级的同步锁
在访问 volatile 变量时不会执行加锁操作,因此也就不会使执行线程阻塞,因此 volatile 变量是一
种比 sychronized 关键字更轻量级的同步机制。volatile 适合这种场景:一个变量被多个线程共
享,线程直接给这个变量赋值。
当对非 volatile 变量进行读写的时候,每个线程先从内存拷贝变量到 CPU 缓存中。如果计算机有
多个 CPU,每个线程可能在不同的 CPU 上被处理,这意味着每个线程可以拷贝到不同的 CPU
cache 中。而声明变量是 volatile 的,JVM 保证了每次读变量都从内存中读,跳过 CPU cache
这一步。
适用场景
值得说明的是对 volatile 变量的单次读/写操作可以保证原子性的,如 long 和 double 类型变量,
但是并不能保证 i++这种操作的原子性,因为本质上 i++是读、写两次操作。在某些场景下可以
代替 Synchronized。但是,volatile 的不能完全取代 Synchronized 的位置,只有在一些特殊的场景下,才能适用 volatile。总的来说,必须同时满足下面两个条件才能保证在并发环境的线程安
全:
(1)对变量的写操作不依赖于当前值(比如 i++),或者说是单纯的变量赋值(boolean
flag = true)。
(2)该变量没有包含在具有其他变量的不变式中,也就是说,不同的 volatile 变量之间,不
能互相依赖。只有在状态真正独立于程序内其他内容时才能使用 volatile。