devstory

Le Tutoriel de Java PriorityBlockingQueue

Suivez-nous sur notre fanpage pour recevoir des notifications chaque fois qu'il y a de nouveaux articles. Facebook

1- PriorityBlockingQueue

PriorityBlockingQueue<E> est une classe qui représente une file d'attente avec des éléments triés par priorité, similaire à la classe PriorityQueue<E>. La différence est qu'il implémente l'interface BlockingQueue<E>, donc dispose des fonctionnalités supplémentaires spécifiées par cette interface.
Vous pouvez lire l'article sur l'interface BlockingQueue pour mieux comprendre les fonctionnalités de cette interface à travers des exemples de base.

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
             implements BlockingQueue<E>, java.io.Serializable
Les caractéristiques de PriorityBlockingQueue:
  • Une file d'attente à capacité illimitée.
  • Les éléments dupliqués sont autorisés, mais ce n'est pas le cas pour les éléments null.
  • Les éléments sont triés par ordre croissant selon un Comparator fourni ou les éléments sont triés dans leur ordre naturel (Selon le constructeur utilisé).
  • PriorityBlockingQueue et son Iterator prennent en charge toutes les méthodes facultatives spécifiées dans les interfaces Collection et Iterator.
En règle générale, PriorityBlockingQueue gère un tableau interne pour stocker les éléments, qui peut être remplacé par un nouveau tableau si le nombre d'éléments de la file d'attente augmente.
L'objet Iterator obtenu à partir de la méthode iterator() et l'objet Spliterator obtenu à partir de la méthode spliterator() ne garantissent pas parcourir des éléments de cette file d'attente dans un ordre. Si vous le souhaitez, vous pouvez utiliser Arrays.sort(queue.toArray()).
Par exemple :
PriorityBlockingQueue_traverse_ex1.java

// Create a queue that sorts its elements in natural order.
BlockingQueue<String> queue = new PriorityBlockingQueue<>();
queue.add("Rose");
queue.add("Lotus");
queue.add("Jasmine");
queue.add("Sunflower");
queue.add("Daisy");

System.out.println(queue); // [Daisy, Jasmine, Lotus, Sunflower, Rose] (Not ordered)
String[] array = new String[queue.size()];
queue.toArray(array);
Arrays.sort(array);
for(String flower: array)  {
    System.out.println(flower);
}
Output:

[Daisy, Jasmine, Lotus, Sunflower, Rose]
Daisy
Jasmine
Lotus
Rose
Sunflower

2- Constructors


public PriorityBlockingQueue()
public PriorityBlockingQueue(int initialCapacity)  
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)  
public PriorityBlockingQueue(Collection<? extends E> c) 
Voir l'explication détaillée pour chaque constructeur ci-dessous.

3- Par exemple

Voir aussi l'article sur BockingQueue pour avoir des exemples de base.
Comme vous le savez, le modèle Producer/Consumer est un bon exemple d'utilisation de l'interface BlockingQueue. Les produits créés par les producteurs sont ajoutés à une file d'attente avant d'être pris par les consommateurs. L'utilisation d'une file d'attente prioritaire est nécessaire dans certains cas. Par exemple, les produits avec une courte durée de conservation doivent avoir une priorité plus élevée pour être consommés rapidement par les consommateurs.
Voici un exemple simple :
La classe Product représente des produits avec des informations sur le nom et la durée de conservation :
Product.java

package org.o7planning.priorityblockingqueue.ex;

public class Product {
    private String name;
    private int shelfLife;

    public Product(String name, int shelfLife) {
        this.name = name;
        this.shelfLife = shelfLife;
    }  
    public int getShelfLife() {
        return shelfLife;
    }
    @Override
    public String toString() {
        return this.name + ":" + this.shelfLife;
    }
}
La classe ProductComparator est utilisée pour comparer les objets Product. Le produit avec la plus grande durée de conservation est considéré comme plus grand. Le produit ayant la durée de conservation la plus courte sera en tête de la file d'attente.
ProductComparator.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Comparator;

public class ProductComparator implements Comparator<Product> {

    @Override
    public int compare(Product o1, Product o2) {
        return o1.getShelfLife() - o2.getShelfLife();
    }
}
  • TODO Link?
Producer.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {  
    private final String producerName;
    private final BlockingQueue<Product> queue;
    private final int delay; // Seconds

    public Producer(String producerName, int delay, BlockingQueue<Product> q) {
        this.producerName = producerName;
        this.delay = delay;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(this.delay * 1000); // 'delay' seconds.
                Product newProduct = this.produce();
                System.out.println("\n#" + this.producerName + " >> New Product: " + newProduct);
                this.queue.put(newProduct);
                // Printed results may not be sorted (***):
                System.out.println("  Current products in queue: " + this.queue + " (***)");
            }
        } catch (InterruptedException ex) {
        }
    }
    private Product produce() {
        int shelfLife = new Random().nextInt(3) + 3;
        return new Product("Apple", shelfLife);
    }
}
Consumer.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private String consumerName;
    private final BlockingQueue<Product> queue;

    public Consumer(String consumerName, BlockingQueue<Product> q) {
        this.consumerName = consumerName;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                this.consume(queue.take());
            }
        } catch (InterruptedException ex) {
        }
    }
    // Need 1 seconds to consume a product.
    private void consume(Product x) throws InterruptedException {
        System.out.println(" --> " + this.consumerName + " >> Consuming: " + x);
        Thread.sleep(1 * 1000); // 1 seconds.
    }
}
Setup.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class Setup {
    public static void main(String[] args) {
        // Comparator
        Comparator<Product> comparator = new ProductComparator();
        // Create a PriorityBlockingQueue with a capacity of 100.
        BlockingQueue<Product> queue = new PriorityBlockingQueue<Product>(100, comparator);
        queue.add(new Product("Banana", 5));
        queue.add(new Product("Banana", 2));
        queue.add(new Product("Banana", 7));
        queue.add(new Product("Banana", 3));
        queue.add(new Product("Banana", 1));  
        
        Producer producer1 = new Producer("Producer 01", 2, queue);
        Producer producer2 = new Producer("Producer 02", 3, queue);
        
        Consumer consumer1 = new Consumer("Consumer 01", queue);
        Consumer consumer2 = new Consumer("Consumer 02", queue);
        Consumer consumer3 = new Consumer("Consumer 03", queue);

        // Starting Producer threads
        new Thread(producer1).start();
        new Thread(producer2).start();
        // Starting Consumer threads
        new Thread(consumer1).start();
    }
}
(***) Remarque : Dans cet exemple, on utilise la méthode System.out.println(queue) pour imprimer tous les éléments de la file d'attente. Cependant, cette méthode ne garantit pas l'ordre des éléments.
Output:

--> Consumer 01 >> Consuming: Banana:1
--> Consumer 01 >> Consuming: Banana:2

#Producer 01 >> New Product: Apple:4
Current products in queue: [Banana:3, Apple:4, Banana:7, Banana:5] (***)
--> Consumer 01 >> Consuming: Banana:3

#Producer 02 >> New Product: Apple:3
Current products in queue: [Apple:3, Apple:4, Banana:7, Banana:5] (***)
--> Consumer 01 >> Consuming: Apple:3

#Producer 01 >> New Product: Apple:5
Current products in queue: [Apple:4, Banana:5, Banana:7, Apple:5] (***)
--> Consumer 01 >> Consuming: Apple:4
--> Consumer 01 >> Consuming: Apple:5

#Producer 02 >> New Product: Apple:4
Current products in queue: [Apple:4, Banana:7, Banana:5] (***)

#Producer 01 >> New Product: Apple:5
Current products in queue: [Apple:4, Apple:5, Banana:5, Banana:7] (***)
--> Consumer 01 >> Consuming: Apple:4
--> Consumer 01 >> Consuming: Apple:5
...

4- PriorityBlockingQueue()


public PriorityBlockingQueue()
Créer une PriorityBlockingQueue avec la capacité initiale par défaut (11). Les éléments de cette file d'attente seront triés dans leur ordre naturel, ce qui nécessite que tous les éléments soient des objets de l'interface Comparable.
  • TODO Link?

5- PriorityBlockingQueue(int)


public PriorityBlockingQueue(int initialCapacity)  
Créer une PriorityBlockingQueue avec la capacité initiale spécifiée. Les éléments de cette file d'attente seront triés dans leur ordre naturel, il est donc nécessaire que tous les éléments soient des objets de l'interface Comparable.
  • TODO Link?

6- PriorityBlockingQueue(int, Comparator)


public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) 
Créer une PriorityBlockingQueue avec la capacité initiale spécifiée et un Comparator fourni pour trier ses éléments. Si le comparator est null, l'ordre naturel des éléments est utilisé, il est donc nécessaire que tous les éléments soient des objets de l'interface Comparable.

7- PriorityBlockingQueue(Collection)


public PriorityBlockingQueue(Collection<? extends E> c)
Créer une PriorityBlockingQueue contenant tous les éléments de la Collection spécifiée.
Si cette Collection spécifiée est un SortedSet ou une PriorityQueue, cette PriorityBlockingQueue utilisera le même Comparator. Sinon, l'ordre naturel des éléments sera utilisé, il est donc nécessaire que tous les éléments soient des objets de l'interface Comparable.

8- Methods

Les méthodes héritées de l'interface BockingQueue<E> :

void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Consulter l'article sur BlockingQueue pour apprendre à utiliser les méthodes ci-dessus.
Les méthodes héritées de l'interface Queue<E> :

boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Les méthodes héritées de l'interface Collection<E> :

int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()