devstory

Le Tutoriel de Java BlockingQueue

  1. BlockingQueue
  2. BlockingQueue methods
  3. Par exemple :
  4. drainTo(Collection<? super E>)
  5. drainTo(Collection<? super E>, int)
  6. offer(E, long, TimeUnit)
  7. poll(long, TimeUnit)
  8. put(E e)
  9. remainingCapacity()
  10. take()

1. BlockingQueue

BlockingQueue est une sous-interface de Queue, qui fournit des opérations supplémentaires et qui est utile dans les situations où la file d'attente est vide ou pleine d'éléments.
public interface BlockingQueue<E> extends Queue<E>
La différence entre Queue et BlockingQueue est illustrée par les méthodes qu'ils fournissent :
Interface
Queue<E>
Interface
BlockingQueue<E>
Action
Throws exception
Special value
Blocks
Times out
Insert
boolean add(e)
boolean offer(e)
void put(e)
boolean offer(e, time, unit)
Remove
E remove()
E poll()
E take()
E poll(time, unit)
Examine
E element()
E peek()
take()/poll(time,unit)
Comme on le sait, les méthodes remove(), element(), poll() et peek() de l'interfaceQueue renvoient l'élément en tête de la file d'attente, qui lancera immédiatement une exception ou renverra null si la file d'attente ne le fait pas. contenir des éléments. De telles opérations ne sont pas suffisantes dans un environnement multithread, l'interface BlockingQueue fournit donc de nouvelles méthodes take() et poll(time,unit).
  • take() : renvoyer l'élément de tête et le supprime de la file d'attente. Si la file d'attente est vide, la méthode attendra qu'un élément soit disponible dans la file d'attente.
  • poll(timeout,unit) : renvoyer l'élément de tête et le supprime de la file d'attente. Si la file d'attente est vide, la méthode attendra qu'un élément soit disponible pendant un laps de temps spécifié. Si le délai d'attente se termine sans aucun élément disponible, la méthode renverra null.
put(e)/offer(e,time,unit)
Les méthodes add(e) et offer(e) de l'interface Queue sont utilisées pour ajouter un élément à la file d'attente. Ils lancent immédiatement une exception ou renvoient false si la file d'attente est pleine. L'interface BlockingQueue fournit des méthodes put(e) et offer(e,timeout,unit) dans le même but, mais elles ont des fonctionnalités plus spéciales.
  • put(e) : insérer un élément dans la file d'attente. Si la file d'attente est pleine, cette méthode attendra jusqu'à ce qu'il y ait un espace disponible à insérer.
  • offer(e,timeout,unit): insérer un élément dans la file d'attente. Si la file d'attente est pleine, la méthode attendra un espace disponible à insérer pendant un laps de temps spécifié. Si le délai d'attente se termine sans aucun espace disponible, aucune action ne sera entreprise et la méthode renvoie false.
La hiérarchie de classe et d'interface liée à l'interface BlockingQueue :
Les caractéristiques de BlockingQueue:
  • BlockingQueue n'accepte pas les éléments null, si vous ajoutez intentionnellement un élément null à cette file d'attente, une NullPointerException sera lancée.
  • Une BlockingQueue peut être limitée en capacité. La méthode remainingCapacity() renvoie la capacité restante de cette file d'attente, ou Integer.MAX_VALUE si la file d'attente n'est pas limitée en capacité.
  • BlockingQueue est souvent utilisé dans les applications de type Producer et Consumer. BlockingQueue est un descendant de l'interface Collection, donc la méthode remove(e) est également prise en charge. Cependant, ces méthodes fonctionnent de manière inefficace et pour une utilisation occasionnelle uniquement. Par exemple, supprimer un produit défectueux de la file d'attente.
  • BlockingQueue est une file d'attente thread-safe. Toutes les méthodes de file d'attente sont des opérations atomiques. Cependant, les méthodes héritées de l'interface Collection telles que addAll, containsAll, retentionAll et removeAll ne sont pas nécessairement des opérations atomiques, cela dépend de la classe qui implémente l'interface BockingQueue. Par conséquent, il est fort probable que, par exemple, l'appel à addAll(aCollection) pourrait lancer une exception si un autre thread ajoute un élément à aCollection en même temps.
  • BlockingQueue ne prend pas en charge les méthodes telles que "close" ou "shutdown", par exemple lorsque le Producer veut envoyer un signal indiquant qu'aucun autre "produit" ne sera ajouté à la file d'attente. Le besoin et l'utilisation de ces fonctionnalités ont tendance à dépendre de la mise en œuvre. La solution pourrait être : Un « produit » final et spécial est ajouté à la file d'attente pour signaler au Consumer qu'il s'agit du dernier produit à être ajouté à la file d'attente.
Voir aussi :
  • The concept of Atomic operations in computer science

2. BlockingQueue methods

Les méthodes héritées de l'interface Queue<E> :
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Les méthodes héritées de l'interface Queue<E> :
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Les méthodes héritées de l'interface Collection<E> :
int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()

3. Par exemple :

Le modèle Producer/Consumer est un bon exemple d'utilisation de l'interface BlockingQueue. Les produits créés par les producteurs sont ajoutés à une file d'attente avant d'être retirés par les consommateurs.
  • Les threads Producer convoquent la méthode BlockingQueue.put(e) pour ajouter des produits à une BlockingQueue. Si la file d'attente est pleine, la méthode put(e) attendra jusqu'à ce qu'il y ait de l'espace disponible.
  • Les threads Consumer appellent la méthode BlockingQueue.take() pour récupérer les produits de la file d'attente. Si la file d'attente est vide, cette méthode attendra jusqu'à ce que le produit soit disponible.
Voir le code complet de cet exemple :
La classe Product simule un produit.
Product.java
package org.o7planning.blockingqueue.ex;

public class Product {
    private String name;
    private int serial;

    public Product(String name, int serial) {
        this.name = name;
        this.serial = serial;
    }
    public String getInfo() {
        return "Product: " + this.name + ". Serial: " + this.serial;
    }
}
La classe Consumer simule le consommateur.
Consumer.java
package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private String consumerName;
    private final BlockingQueue<Product> queue;

    public Consumer(String consumerName, BlockingQueue<Product> q) {
        this.consumerName = consumerName;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                this.consume(queue.take());
            }
        } catch (InterruptedException ex) {
        }
    }  
    private void consume(Product x) {
        System.out.println(" --> " + this.consumerName + " >> Consume: " + x.getInfo());
    }
}
La classe Producer simule le producteur.
Producer.java
package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private static int serial = 1;

    private final String producerName;
    private final BlockingQueue<Product> queue;
    private final int delay; // Seconds

    public Producer(String producerName, int delay, BlockingQueue<Product> q) {
        this.producerName = producerName;
        this.delay = delay;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(this.delay * 1000); // 'delay' second.
                this.queue.put(this.produce());
            }
        } catch (InterruptedException ex) {
        }
    }
    private Product produce() {
        System.out.println("#" + this.producerName + " >> Create a new product!");
        return new Product("IPhone", serial++);
    }
}
La classe Setup est utilisée pour faire fonctionner le système Producer/Consumer :
Setup.java
package org.o7planning.blockingqueue.ex;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Setup {
    public static void main(String[] args) {
        // Create a BlockingQueue with a capacity of 5.
        BlockingQueue<Product> q = new ArrayBlockingQueue<Product>(5);
        Producer producer1 = new Producer("Producer 01", 2, q);
        Producer producer2 = new Producer("Producer 02", 1, q);
        Consumer consumer1 = new Consumer("Consumer 01", q);
        Consumer consumer2 = new Consumer("Consumer 02", q);
        Consumer consumer3 = new Consumer("Consumer 03", q);

        // Starting the threads
        new Thread(producer1).start();
        new Thread(producer2).start();
        new Thread(consumer1).start();
        new Thread(consumer2).start();
        new Thread(consumer3).start();
    }
}
Exécuter l'exemple ci-dessus et vous obtiendrez le résultat suivant :
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 1
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 2
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 3
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 4
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 5
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 6
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 7
...

4. drainTo(Collection<? super E>)

int drainTo(Collection<? super E> c);
Supprimer tous les éléments de cette BlockingQueue et les ajoute à une Collection spécifiée. L'utilisation de cette méthode est plus efficace que d'appeler plusieurs fois poll() ou remove().
La méthode drainTo(Collection) garantit que tous les éléments seront déplacés avec succès vers la Collection, ou qu'aucun élément ne sera déplacé vers la Collection si une erreur se produit.

5. drainTo(Collection<? super E>, int)

int drainTo(Collection<? super E> c, int maxElements);
Supprimer jusqu'à maxElements éléments de cette BlockingQueue et les ajoute à une Collection spécifiée. L'utilisation de cette méthode est plus efficace que d'appeler plusieurs fois poll() ou remove().
Si une erreur se produit, aucun élément ne sera supprimé de cette BlockingQueue et aucun élément ne sera ajouté à la Collection.

6. offer(E, long, TimeUnit)

boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Insérer un élément spécifié dans la file d'attente. Si la file d'attente est pleine, la méthode attendra un espace disponible à insérer, dans un laps de temps spécifié. Si le délai d'attente se termine sans qu'aucun espace ne soit disponible, aucune action ne sera entreprise et la méthode renvoie false.
Par exemple :
queue.offer(e, 5, TimeUnit.HOURS); // 5 hours.

7. poll(long, TimeUnit)

E poll(long timeout, TimeUnit unit) throws InterruptedException;
Renvoyer l'élément de tête et le supprimer de la file d'attente. Si la file d'attente est vide, la méthode attendra qu'un élément soit disponible dans un laps de temps spécifié. Si le délai d'attente se termine sans qu'aucun élément ne soit disponible, la méthode renverra null.
Par exemple :
E e = queue.offer(2, TimeUnit.HOURS); // 2 hours

8. put(E e)

void put(E e) throws InterruptedException;
Insérer un élément dans la file d'attente. Si la file d'attente est pleine, cette méthode attendra jusqu'à ce qu'il y ait un espace disponible à insérer.

9. remainingCapacity()

int remainingCapacity();
Renvoyer la capacité restante de cette file d'attente, ou Integer.MAX_VALUE si la file d'attente n'est pas limitée en capacité.
La classe ArrayBlockingQueue permet de créer une BlockingQueue en spécifiant le nombre maximum d'éléments.

10. take()

E take() throws InterruptedException;
Renvoyer l'élément de tête et le supprimer de la file d'attente. Si la file d'attente est vide, la méthode attendra qu'un élément soit disponible dans la file d'attente.