devstory

Le Tutoriel de Java Pipe.SinkChannel

  1. Pipe.SinkChannel
  2. Examples

1. Pipe.SinkChannel

On suppose que vous développiez une application Multithreading et que vous disposiez de 2 threads indépendants, Thread-A et Thread-B. La question est la voici :
  • Qu'est-ce qu'il faut faire pour que les données qui apparaissent dans Thread-A soient automatiquement transférées vers Thread-B ?
Pipe.SinkChannel et Pipe.SourceChannel sont deux classes créées pour gérer la situation mentionnée ci-dessus. Chaque fois que des données sont écrites dans Pipe.SinkChannel, elles apparaissent automatiquement dans Pipe.SourceChannel. C'est ce qu'on appelle l'effet de tuyau (pipe).
La classe Pipe.SinkChannel est une classe abstraite définie dans la classe Pipe et implémente les interfaces WritableByteChannel et GatheringByteChannel. Il agit comme un canal d'écriture.
public abstract static class SinkChannel extends AbstractSelectableChannel
                        implements WritableByteChannel, GatheringByteChannel
La classe Pipe.SourceChannel est une classe abstraite définie dans la classe Pipe et implémente les interfaces ReadableByteChannel et ScatteringByteChannel. Il agit comme un canal de lecture.
public abstract static class SourceChannel extends AbstractSelectableChannel
                        implements ReadableByteChannel, ScatteringByteChannel

2. Examples

Dans cet exemple, on écrira des messages dans un Pipe.SinkChannel (contrôlé par ThreadA). Ils apparaîtront automatiquement dans le Pipe.SourceChannel (contrôlé par ThreadB).
Pipe_ex1.java
package org.o7planning.pipe.sinkchannel.ex;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class Pipe_ex1 {

    public static void main(String[] args) throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();

        ThreadA threadA = new ThreadA(pipe);
        ThreadB threadB = new ThreadB(pipe);

        threadA.start();
        threadB.start();
        threadA.join(); // Waits for this thread to die.
        threadB.join(); // Waits for this thread to die.
        System.out.println();
        System.out.println("Done!");
    }
}

//
class ThreadA extends Thread {
    private Pipe pipe;

    public ThreadA(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SinkChannel skChannel = this.pipe.sink();) { // try
            String[] messages = new String[] { "Hello\n", "Hi\n", "Bye\n" };

            ByteBuffer buffer = ByteBuffer.allocate(512);

            for (String msg : messages) {
                // Set position = 0; limit = capacity;
                buffer.clear();
                buffer.put(msg.getBytes("UTF-8"));
                buffer.flip();
                while (buffer.hasRemaining()) {
                    skChannel.write(buffer);
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//
class ThreadB extends Thread {
    private Pipe pipe;

    public ThreadB(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SourceChannel srcChannel = this.pipe.source();) { // try
            ByteBuffer buffer = ByteBuffer.allocate(512);

            while (srcChannel.read(buffer) != -1) {
                buffer.flip(); // Set limit = current position; position = 0;
                ByteArrayOutputStream baos = new ByteArrayOutputStream();

                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    if (b != '\n') {
                        baos.write(b);
                    } else {
                        String s = baos.toString("UTF-8");
                        System.out.println(s);
                    }
                }
                buffer.clear(); // Set position =0; limit = capacity;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Output: