博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JUC LinkedBlockingQueue
阅读量:7102 次
发布时间:2019-06-28

本文共 4292 字,大约阅读时间需要 14 分钟。

本文首先发表在

java.util.concurrent.LinkedBlockingQueue 是一个基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁。

队列是否为空、是否已满仍然是通过元素数量的计数器(count)进行判断的,由于可以同时在队头、队尾并发地进行访问、添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类 AtomicInteger,这就决定了它的容量范围是: 1 – Integer.MAX_VALUE。

由于同时使用了两把锁,在需要同时使用两把锁时,加锁顺序与释放顺序是非常重要的:必须以固定的顺序进行加锁,再以与加锁顺序的相反的顺序释放锁。

头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。

属性与链表节点类

 
// 链表的结点类,单向链表,只有一个后继指针static class Node
{ E item; /* * 后继指针。值为下列之一: * 实际的后继结点。 * 自身,表示后继是 head.next (用于在遍历处理时判断) * null,表示没有后继(这是尾结点) */ Node
next; Node(E x) { item = x; }}// 最大容量上限,默认是 Integer.MAX_VALUEprivate final int capacity;// 当前元素数量,这是个原子类。因为读写分别使用不同的锁,但都会访问这个属性,所以它需要是线程安全的。private final AtomicInteger count = new AtomicInteger(0);// 头结点private transient Node
head;// 尾结点private transient Node
last;// 队头访问锁private final ReentrantLock takeLock = new ReentrantLock();// 队头访问等待条件、队列private final Condition notEmpty = takeLock.newCondition();// 队尾访问锁private final ReentrantLock putLock = new ReentrantLock();// 队尾访问等待条件、队列private final Condition notFull = putLock.newCondition();

enqueue 操作

 
// 在持有 putLock 锁下执行private void enqueue(Node
node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node;}

dequeue 操作

返回队列里第一个有效元素。

 
// 在持有 takeLock 锁下执行private E dequeue() {    // assert takeLock.isHeldByCurrentThread();    // assert head.item == null;    Node
h = head; Node
first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; // 出队列后的结点作为新的哨兵结点 return x;}

对两把锁的加锁与释放

在需要对两把锁同时加锁时,把加锁的顺序与释放的顺序封装成方法,确保所有地方都是一致的。而且获取锁时都是不响应中断的,一直获取直到加锁成功,这就避免了第一把锁加锁成功,而第二把锁加锁失败导致锁不释放的风险。

注意,锁的释放顺序与加锁顺序是相反的。

 
// 把固定的加锁顺序封装在方法内,确保所有的对两把锁加锁的顺序都是一致的。void fullyLock() {    putLock.lock();    takeLock.lock();}// 把固定的释放锁顺序封装在方法内,确保所有的对两把锁的释放顺序都是一致的。void fullyUnlock() {    takeLock.unlock();    putLock.unlock();}

put 操作

put 操作把指定元素添加到队尾,如果没有空间则一直等待。

public void put(E e) throws InterruptedException {    if (e == null) throw new NullPointerException();    // 在所有的 put/take/etc 等操作中预设值本地变量 c 为负数表示失败。成功会设置为 >= 0 的值。    int c = -1;    Node
node = new Node(e); // 下面两行是访问优化 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * 注意,count用于等待监视,即使它没有用锁保护。这个可行是因为 * count 只能在此刻(持有putLock)减小(其他put线程都被锁拒之门外), * 当count对capacity发生变化时,当前线程(或其他put等待线程)将被通知。 * 在其他等待监视的使用中也类似。 */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); // 还有可添加空间则唤醒put等待线程。 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // c 由 count.getAndIncrement()返回,如果等于0, // 则 count 应该是大于等于 1 了,唤醒take线程。 if (c == 0) signalNotEmpty();}

take 操作

take 操作会一直阻塞直到有元素可返回。

 
public E take() throws InterruptedException {    E x;    int c = -1;    // 下面两行是访问优化    final AtomicInteger count = this.count;    final ReentrantLock takeLock = this.takeLock;    takeLock.lockInterruptibly();    try {      // 循环里等待直到有数据可获取        while (count.get() == 0) {            notEmpty.await();        }        // 获取第一个有效元素        x = dequeue();        // 如果还有可获取元素,唤醒等待获取的线程。        c = count.getAndDecrement();        if (c > 1)            notEmpty.signal();    } finally {        takeLock.unlock();    }    // 注意,c 是调用 getAndDecrement 返回的,如果 if 成立,    // 表明当前的 count 是 capacity - 1,可以添加新元素,所以唤醒 添加线程。    if (c == capacity)        signalNotFull();    return x;}

remove 操作

 
// 移除指定元素。由于移除元素涉及该结点前后两个结点的访问与修改,// 对两把锁加锁简化了同步管理。public boolean remove(Object o) {    if (o == null ) return false;    fullyLock();    try {        for (Node
trail = head, p = trail.next; p != null ; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true ; } } return false ; } finally { fullyUnlock(); }}

转载地址:http://wwyhl.baihongyu.com/

你可能感兴趣的文章
Enable PowerShell script execution policy
查看>>
aix 设置主机信任
查看>>
编程题:输入一串字符,程序会自动将大写字母转换为小写
查看>>
yii框架中model映射数据库中不存在的表,做请求转发的接口
查看>>
抖音云控专业营销实战引流,一键暴涨百万精准粉原理!
查看>>
golang学习的点点滴滴:channel缓冲区
查看>>
springboot 之缓存注解
查看>>
apt-get install 的用法
查看>>
如何写高效率的SQL
查看>>
利用灰名单阻止垃圾邮件
查看>>
HTTP X-Forwarded-For 头部字段的应用
查看>>
【翻译】安卓 opengl ES教程之六——纹理
查看>>
我的友情链接
查看>>
ncat使用总结
查看>>
expect 批量修改密码(无密码ssh登入就不要用这个了)
查看>>
hadoop 添加删除datanode及tasktracker
查看>>
数据库 之 更改用户密码和忘记管理员密码解决方法
查看>>
Cisco典型三层交换机VLAN配置
查看>>
我的友情链接
查看>>
记BAT的交互设计求职
查看>>