欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

javaArrayBlockingQueue阻塞队列的实现示例,

来源: javaer 分享于  点击 10238 次 点评:92

javaArrayBlockingQueue阻塞队列的实现示例,


目录
  • 1.ArrayBlockingQueue概述
  • 2.ArrayBlockingQueue的核心特性
    • 2.1.线程安全性
    • 2.2.阻塞控制
    • 2.3.有界性
  • 3.ArrayBlockingQueue的使用
    • 3.1.创建ArrayBlockingQueue
    • 3.2.生产者-消费者模式
  • 4.ArrayBlockingQueue的最佳实践
    • 4.1.选择合适的队列大小
    • 4.2.合理使用阻塞方法
    • 4.3.避免死锁
    • 4.4.考虑使用公平策略
  • 5.源码详解
    • 5.1.主要属性
    • 5.2.构造函数
    • 5.3.入队操作
    • 5.4.出队操作
  • 6.总结

    在Java并发编程中,ArrayBlockingQueue是一个非常常用的工具类。它是一个由数组支持的有界阻塞队列,提供了线程安全的队列操作。

    1.ArrayBlockingQueue概述

    ArrayBlockingQueue是一个基于数组实现的阻塞队列,它继承自AbstractQueue并实现了BlockingQueue接口。这个队列在创建时需要指定一个固定的大小,之后这个大小就不能再改变了。当队列满时,如果再有新的元素试图加入队列,那么这个操作会被阻塞;同样地,如果队列为空,那么从队列中取元素的操作也会被阻塞。这种特性使得ArrayBlockingQueue非常适合作为生产者-消费者模式中的缓冲区。

    2.ArrayBlockingQueue的核心特性

    2.1.线程安全性

    ArrayBlockingQueue是线程安全的,它通过内部锁机制保证了在多线程环境下的安全性。因此,在多线程环境中,你可以放心地使用它而不需要担心数据的一致性问题。

    2.2.阻塞控制

    ArrayBlockingQueue提供了阻塞控制机制。当队列满时,尝试向队列中添加元素的线程会被阻塞,直到队列中有空间可用;同样,当队列为空时,尝试从队列中取出元素的线程也会被阻塞,直到队列中有元素可供消费。这种机制可以有效地控制生产者和消费者的速度,避免资源的浪费。

    2.3.有界性

    ArrayBlockingQueue的有界性可以防止队列无限制地增长,从而避免内存溢出。在实际应用中,这种有界性可以作为系统的一个流量控制阀,当系统过载时,通过阻塞或拒绝请求来保护系统。

    3.ArrayBlockingQueue的使用

    3.1.创建ArrayBlockingQueue

    创建一个ArrayBlockingQueue非常简单,只需要指定队列的大小即可:

    int queueSize = 10;
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
    

    3.2.生产者-消费者模式

    ArrayBlockingQueue常用于生产者-消费者模式。生产者负责生成数据并添加到队列中,而消费者则从队列中取出数据并处理。下面是一个简单的生产者-消费者示例:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class ProducerConsumerExample {
        public static void main(String[] args) {
            BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
    
            Thread producer = new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    try {
                        System.out.println("生产者生产了数据:" + i);
                        queue.put(i);
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            Thread consumer = new Thread(() -> {
                while (true) {
                    try {
                        Integer data = queue.take();
                        System.out.println("消费者消费了数据:" + data);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            producer.start();
            consumer.start();
        }
    }
    

    运行结果:

    生产者生产了数据:0
    消费者消费了数据:0
    生产者生产了数据:1
    消费者消费了数据:1
    生产者生产了数据:2
    消费者消费了数据:2
    生产者生产了数据:3
    消费者消费了数据:3
    生产者生产了数据:4
    消费者消费了数据:4
    生产者生产了数据:5
    消费者消费了数据:5
    生产者生产了数据:6
    消费者消费了数据:6
    生产者生产了数据:7
    消费者消费了数据:7
    生产者生产了数据:8
    消费者消费了数据:8
    生产者生产了数据:9
    消费者消费了数据:9

    在这个示例中,我们创建了一个大小为5的ArrayBlockingQueue,然后启动了一个生产者线程和一个消费者线程。生产者线程会生成10个数据,并尝试将它们添加到队列中;消费者线程则会不断地从队列中取出数据并处理。由于队列的大小只有5,因此当生产者生产了5个数据后,它会被阻塞,直到消费者消费了一些数据释放出空间。同样地,当队列为空时,消费者线程也会被阻塞,直到生产者生产了新的数据。

    4.ArrayBlockingQueue的最佳实践

    4.1.选择合适的队列大小

    队列的大小应根据具体的应用场景来设置。如果设置得太小,可能会导致频繁的阻塞和上下文切换,影响性能;如果设置得太大,可能会浪费内存资源。因此,在选择队列大小时,需要综合考虑系统的负载、内存资源和性能要求等因素。

    4.2.合理使用阻塞方法

    ArrayBlockingQueue提供了多种阻塞方法,如puttakeofferpoll等。在使用这些方法时,需要根据具体的需求来选择合适的方法。例如,如果你希望当队列满时生产者线程能够阻塞等待空间可用,那么可以使用put方法;如果你希望生产者线程在队列满时能够立即返回并做其他处理,那么可以使用offer方法。

    4.3.避免死锁

    在使用ArrayBlockingQueue时,需要注意避免死锁的发生。例如,不要在持有其他锁的情况下调用ArrayBlockingQueue的阻塞方法,否则可能会导致死锁。此外,还需要注意避免循环等待和饥饿等问题。

    4.4.考虑使用公平策略

    ArrayBlockingQueue的构造函数允许指定一个公平性参数。如果设置为true,等待时间最长的线程将优先获得访问队列的机会。但需要注意的是,公平性可能会降低性能。因此,在决定是否使用公平策略时,需要综合考虑系统的性能和公平性要求。

    5.源码详解

    5.1.主要属性

    // 用于存储队列元素的数组
    final Object[] items;
    
    // 队列的容量
    int count;
    
    // 控制并发访问的锁
    final ReentrantLock lock;
    
    // 队列不满时的等待条件
    private final Condition notFull;
    
    // 队列不为空时的等待条件
    private final Condition notEmpty;
    
    // 队列中等待取数据的线程数
    final AtomicInteger waitingConsumers = new AtomicInteger();
    
    // 队列中等待插入数据的线程数
    final AtomicInteger waitingProducers = new AtomicInteger();
    

    5.2.构造函数

    ArrayBlockingQueue 提供了几种构造函数,其中最基本的两个是接受队列容量和指定是否公平的构造函数。

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
    

    5.3.入队操作

    put(E e) 和 offer(E e) 是两种入队操作,其中 put 方法在队列满时会阻塞,而 offer 方法在队列满时会立即返回失败或者根据提供的超时时间等待。

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        // 队列尾部插入元素
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 通知可能在等待的消费者线程
        notEmpty.signal();
    }
    

    5.4.出队操作

    take() 和 poll() 是两种出队操作,其中 take 方法在队列空时会阻塞,而 poll 方法在队列空时会立即返回 null 或者根据提供的超时时间等待。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    private E dequeue() {
        // 队列头部取出元素
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 通知可能在等待的生产者线程
        notFull.signal();
        return x;
    }
    

    6.总结

    ArrayBlockingQueue是Java并发编程中一个非常实用的工具类。它提供了线程安全的阻塞队列实现,支持生产者-消费者模式,并允许通过队列的大小来控制系统的流量。在使用ArrayBlockingQueue时,需要注意选择合适的队列大小、合理使用阻塞方法、避免死锁和考虑使用公平策略等问题。通过合理地使用ArrayBlockingQueue,可以有效地提高系统的并发性能和稳定性。

    到此这篇关于java ArrayBlockingQueue阻塞队列的实现示例的文章就介绍到这了,更多相关java ArrayBlockingQueue阻塞队列内容请搜索3672js教程以前的文章或继续浏览下面的相关文章希望大家以后多多支持3672js教程!

    您可能感兴趣的文章:
    • Java源码解析阻塞队列ArrayBlockingQueue介绍
    • Java源码解析阻塞队列ArrayBlockingQueue常用方法
    • Java源码解析阻塞队列ArrayBlockingQueue功能简介
    相关栏目:

    用户点评