devstory

Le Tutoriel de ava TransferQueue

  1. TransferQueue
  2. TransferQueue methods
  3. Par exemple :
  4. getWaitingConsumerCount()
  5. hasWaitingConsumer()
  6. transfer(E)
  7. tryTransfer(E)
  8. tryTransfer(E, long, TimeUnit)

1. TransferQueue

En tant que sous-interface de BlockingQueue, TransferQueue dispose de toutes les fonctionnalités de l'interface parentale, en plus, elle offre à Producer la possibilité d'attendre la réception du "produit" (élément) par le Consumer. TransferQueue est utile dans certains types d'applications, telles que les applications de messagerie.
public interface TransferQueue<E> extends BlockingQueue<E>
Par rapport à BlockingQueue<E>, TransferQueue<E> fournit quelques méthodes supplémentaires, y compris :
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
transfer(e):
Ajouter un élément à cette TransferQueue et attendre qu'elle soit reçue par un consommateur en attente via la méthode BlockingQueue.take() ou BlockingQueue.poll(timeout,unit).
tryTransfer(e):
La méthode tryTransfer(e) n'ajoute un élément à cette TransferQueue que s'il y a un consommateur en attente de recevoir l'élément via la méthode BlockingQueue.take() ou BlockingQueue.poll(timeout,unit) et s'assurer que le consommateur le recevra élément immédiatement. Sinon, la méthode renvoie false et aucune autre action n'est entreprise.
tryTransfer(e, timeout, unit):
La méthode tryTransfer(e,timeout,unit) n'ajoute un élément à cette TransferQueue que si, au cours d'une période d'attente spécifiée, un consommateur attend de recevoir l'élément via BlockingQueue.take() ou BlockingQueue.poll(timeout,unit), et il faut s'assurer que le consommateur reçoit cet élément. Sinon, la méthode renvoie false et aucune autre action n'est entreprise.

2. TransferQueue methods

Les méthodes sont définies dans l'interface TransferQueue<E> :
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
Les méthodes héritées de l'interface BlockingQueue<E> :
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Les méthodes héritées de l'interface Queue<E> :
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Les méthodes héritées de l'interface Collection<E> :
int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()

3. Par exemple :

Dans l'exemple ci-dessous, le Producer envoie des messages au Consumer via la méthode TransferQueue.transfer(e).
Compte tenu du résultat de cet exemple, on trouve que: si tous les Consumer(s) sont occupés à consommer les messages (ce qui signifie qu'aucun Consumer n'est en attente), la méthode TransferQueue.transfer(e) sera bloquée (tomber dans l'état d'attente).
TransferQueue_transfer_ex1.java
package org.o7planning.transferqueue.aa;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_transfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();

        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        
        consumer1.start();
        consumer2.start();
    }
}

class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                System.out.println("[PRODUCER] Transfering: " + message);
                this.queue.transfer(message);
                System.out.println("[PRODUCER] Transfered: " + message + " (**)");
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                this.longConsume(message);
            }
        } catch (InterruptedException ex) {
        }
    }
    // Need 2 seconds to consume the message.
    private void longConsume(String message) throws InterruptedException  {
        System.out.println(" [CONSUMER] Consuming: " + message);
        Thread.sleep(2 * 1000); // 2 seconds.
        System.out.println(" [CONSUMER] Consumed: " + message);
    }
}
Output:
[PRODUCER] Transfering: IMPORTANT-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 1
 [CONSUMER] Consumed: NORMAL-MESSAGE 2
 [CONSUMER] Consuming: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 1 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 2 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 3
[PRODUCER] Transfered: IMPORTANT-MESSAGE 3 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 4
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 3
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 2
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 4
[PRODUCER] Transfered: IMPORTANT-MESSAGE 4 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 5
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 5
[PRODUCER] Transfered: IMPORTANT-MESSAGE 5 (**)
...
L'exemple ci-dessous illustre la manière d'utilisation de la méthode TransferQueue.tryTransfer(e). Dans cet exemple, le Producer crée des messages et essaie de les transférer au Consumer en attente.
Compte tenu de la sortie de cet exemple, on voit que beaucoup de messages créés par le Producer seront ignorés, car au moment de la convocation de la méthode TransferQueue.tryTransfer(e), il n'y a pas de Consumer en attente.
TransferQueue_tryTransfer_ex1.java
package org.o7planning.transferqueue.bb;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_tryTransfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        consumer1.start();
        consumer2.start();
    }
}
class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    public Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                this.queue.tryTransfer(message); // Calling tryTransfer method.
                Thread.sleep(1 * 1000); // 1 seconds.
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                System.out.println(">> " + message);
                Thread.sleep(3 * 1000); // 3 seconds
            }
        } catch (InterruptedException ex) {
        }
    }
}
Output:
>> NORMAL-MESSAGE 1
>> NORMAL-MESSAGE 2
>> NORMAL-MESSAGE 3
>> IMPORTANT-MESSAGE 4
>> IMPORTANT-MESSAGE 7
>> IMPORTANT-MESSAGE 8
>> IMPORTANT-MESSAGE 10
>> IMPORTANT-MESSAGE 11
>> IMPORTANT-MESSAGE 13
>> IMPORTANT-MESSAGE 14
>> IMPORTANT-MESSAGE 16
>> IMPORTANT-MESSAGE 17
Les messages créés par le Producer ont été ignorés :
  • IMPORTANT-MESSAGE 1
  • IMPORTANT-MESSAGE 2
  • IMPORTANT-MESSAGE 3
  • IMPORTANT-MESSAGE 5
  • IMPORTANT-MESSAGE 6
  • ...

4. getWaitingConsumerCount()

int getWaitingConsumerCount();
Renvoyer le nombre estimé de consommateurs en attente de recevoir un élément de cette TransferQueue via la méthode BlockingQueue.take() ou BlockingQueue.poll(timeout,unit).
La valeur de retour est une approximation d'un état de fait momentané, qui peut être inexact si les consommateurs ont terminé ou cessé d'attendre. La valeur peut être utile pour la surveillance et l'heuristique, mais pas pour le contrôle de la synchronisation. Les implémentations de cette méthode sont susceptibles d'être sensiblement plus lentes que celles de hasWaitingConsumer().

5. hasWaitingConsumer()

boolean hasWaitingConsumer();
Renvoyer true si au moins un consommateur attend de recevoir un élément via la méthode BlockingQueue.take() ou BlockingQueue.poll(timeout,unit). La valeur de retour représente un état transitoire.

6. transfer(E)

void transfer(E e) throws InterruptedException;
Ajouter un élément à cette TransferQueue et attendre qu'elle soit reçue par un consommateur en attente via la méthode BlockingQueue.take() ou BlockingQueue.poll(timeout,unit).

7. tryTransfer(E)

boolean tryTransfer(E e);
La méthode tryTransfer(e) n'ajoute un élément à cette TransferQueue que s'il y a un consommateur en attente de recevoir l'élément via la méthode BlockingQueue.take() ou BlockingQueue.poll(timeout,unit) et s'assurer que le consommateur le recevra élément immédiatement. Sinon, la méthode renvoie false et aucune autre action n'est entreprise.

8. tryTransfer(E, long, TimeUnit)

boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
La méthode tryTransfer(e,timeout,unit) n'ajoute un élément à cette TransferQueue que si, au cours d'une période d'attente spécifiée, un consommateur attend de recevoir l'élément via BlockingQueue.take() ou BlockingQueue.poll(timeout,unit), et il faut s'assurer que le consommateur reçoit cet élément. Sinon, la méthode renvoie false et aucune autre action n'est entreprise.