1.5 Channel

2017-01-03 23:24:35 13,215 21

1 Channel简介 

通道(Channel)可以理解为数据传输的管道。通道与流不同的是,流只是在一个方向上移动(一个流必须是inputStream或者outputStream的子类),而通道可以用于读、写或者同时用于读写。

channel 类的继承关系

A3C58E87-39CE-4DB5-92CE-5A8797F2E3F6.png

为了保证尽可能清晰的显示我们关注的点,图中只显示了我们关心的Channel。

I/O 可以分为广义的两大类别: File I/O 和 Stream I/O。

那么相应地有两种类型的通道也就不足为怪了,它们是文件( file)通道和套接字( socket)通道。仔细看一下上图,你会发现有一个 FileChannel 类和三个 socket 通道类: SocketChannel、 ServerSocketChannel 和 DatagramChannel。

通道可以是单向( unidirectional)或者双向的( bidirectional)。一个 channel 类可能实现定义read( )方法的 ReadableByteChannel 接口,而另一个 channel 类也许实现 WritableByteChannel 接口以提供 write( )方法。

public interface ReadableByteChannel extends Channel {
    public int read(ByteBuffer dst) throws IOException;
}

public interface WritableByteChannel extends Channel{
    public int write(ByteBuffer src) throws IOException;
}

实现这两种接口其中之一的类都是单向的,只能在一个方向上传输数据。如果一个类同时实现这两个接口,那么它是双向的,可以双向传输数据。

可以看到read和write方法接受的都是一个ByteBuffer参数,其中read方法,就是往ByteBuffer中put数据,write方法就是将ByteBuffer中的数据get出来,以便发送给其他远程主机。

两种方法均返回已传输的字节数,可能比缓冲区的字节数少甚至可能为零。缓冲区的position位置也会发生与已传输字节相同数量的前移。如果只进行了部分传输,缓冲区可以被重新提交给通道并从上次中断的地方继续传输。该过程重复进行直到缓冲区的 hasRemaining( )方法返回 false 值。

在上面的类图中,我们可以看到FileChannel、SocketChannel通道都实现了这两个接口。从类定义的角度而言,这意味着FileChannel、SocketChannel 通道对象都是双向的。这对于 SocketChannel 不是问题,因为它们一直都是双向的,不过对于FileChannel 却是个问题了。

我们知道,一个文件可以在不同的时候以不同的权限打开。从 FileInputStream 对象的getChannel( )方法获取的 FileChannel 对象是只读的,不过从接口声明的角度来看却是双向的,因为FileChannel 实现 ByteChannel 接口。在这样一个通道上调用 write( )方法将抛出未经检查的NonWritableChannelException 异常,因为 FileInputStream 对象总是以 read-only 的权限打开文件。

通道可以以多种方式创建。 Socket 通道有可以直接创建新 socket 通道的工厂方法。但是一个FileChannel 对象却只能通过在一个打开的 RandomAccessFile、 FileInputStream 或 FileOutputStream对象上调用 getChannel( )方法来获取。您不能直接创建一个 FileChannel 对象。

SocketChannel sc = SocketChannel.open( );
sc.connect (new InetSocketAddress ("somehost", someport));

ServerSocketChannel ssc = ServerSocketChannel.open( );
ssc.socket( ).bind (new InetSocketAddress (somelocalport));

DatagramChannel dc = DatagramChannel.open( );

RandomAccessFile raf = new RandomAccessFile ("somefile", "r");
FileChannel fc = raf.getChannel( );

java.net 的 socket 类也有新的 getChannel( )方法。这些方法虽然能返回一个相应的 socket 通道对象,但它们却并非新通道的来源,RandomAccessFile.getChannel( )方法才是。只有在已经有通道存在的时候,它们才返回与一个 socket 关联的通道;它们永远不会创建新通道。

2 Socket通道详解

在通道类中,DatagramChannel 和 SocketChannel 实现定义读和写功能的接口而 ServerSocketChannel不实现。 ServerSocketChannel 负责监听传入的连接和创建新的 SocketChannel 对象,它本身从不传输数据。

全部 NIO中的socket 通道类( DatagramChannel、 SocketChannel 和 ServerSocketChannel)在被实例化时都会创建一个对等的BIO中的 socket 对象( Socket、 ServerSocket和 DatagramSocket)。

DatagramChannel、 SocketChannel 和 ServerSocketChannel通道类都定义了socket()方法,我们可以通过这个方法获取其关联的socket对象。另外每个Socket、 ServerSocket和 DatagramSocket都定义了getChannel()方法,来获取对应的通道。

需要注意是,只有通过通道类创建的socket对象,其getChannel方法才能返回对应的通道,如果直接new了socket对象,那么其getChannel方法返回的永远是null。

非阻塞模式

通道可以以阻塞( blocking)或非阻塞( nonblocking)模式运行。非阻塞模式的通道永远不会让调用的线程休眠。请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。

这个陈述虽然简单却有着深远的含义。传统 Java socket的阻塞性质曾经是 Java 程序可伸缩性的最重要制约之一。非阻塞 I/O 是许多复杂的、高性能的程序构建的基础。

回顾我们之前讲解的BIO编程中,不能"以尽可能少的线程,处理尽可能多的client请求",就是因为通过Socket的getInputStream方法的read方法是阻塞的,一旦没有数据可读,处理线程就会被一直被block住。

默认情况下,一个通道创建,总是阻塞的,我们可以通过调用configureBlocking(boolean)方法即可,传递参数值为 true 则设为阻塞模式,参数值为 false 值设为非阻塞模式。而 isBlocking()方法来判断某个 socket 通道当前处于哪种模式

SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false); // nonblocking
    ...
   if ( ! sc.isBlocking( )) {
       doSomething (cs);
}

 偶尔地,我们也会需要防止 socket 通道的阻塞模式被更改。 API 中有一个 blockingLock( )方法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只有拥有此对象的锁的线程才能更改通道的阻塞模式,对于确保在执行代码的关键部分时 socket 通道的阻塞模式不会改变以及在不影响其他线程的前提下暂时改变阻塞模式来说,这个方法都是非常方便的。

Socket socket = null;
Object lockObj = serverChannel.blockingLock( );
// 执行关键代码部分的时候,使用这个锁进行同步
synchronize (lockObj)
{
// 一旦进入这个部分,锁就被获取到了,其他线程不能改变这个channel的阻塞模式
boolean prevState = serverChannel.isBlocking( );
serverChannel.configureBlocking (false);
socket = serverChannel.accept( );
serverChannel.configureBlocking (prevState);
}
// 释放锁,此时其他线程可以修改channel的阻塞模式
if (socket != null) {
doSomethingWithTheSocket (socket);
}

2.1 ServerSocketChannel

让我们从最简单的 ServerSocketChannel 来开始对 socket 通道类的讨论。

ServerSocketChannel 是一个基于通道的 socket 监听器。它同我们所熟悉的 java.net.ServerSocket执行相同的基本任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。

用静态的 open( )工厂方法创建一个新的 ServerSocketChannel 对象,将会返回同一个未绑定的java.net.ServerSocket 关联的通道。该对等 ServerSocket 可以通过在返回的 ServerSocketChannel 上调用 socket( )方法来获取。作为 ServerSocketChannel 的对等体被创建的 ServerSocket 对象依赖通道实现。这些 socket 关联的 SocketImpl 能识别通道。通道不能被封装在随意的 socket 对象外面。

由于 ServerSocketChannel 没有 bind( )方法,因此有必要取出对等的 socket 并使用它来绑定到一个端口以开始监听连接。我们也是使用对等 ServerSocket 的 API 来根据需要设置其他的 socket 选项。

ServerSocketChannel ssc = ServerSocketChannel.open( );
ServerSocket serverSocket = ssc.socket( );
// Listen on port 1234
serverSocket.bind (new InetSocketAddress (1234));

同它的对等体 java.net.ServerSocket 一样, ServerSocketChannel 也有 accept( )方法。一旦您创建了一个 ServerSocketChannel 并用对等 socket 绑定了它,然后您就可以在其中一个上调用 accept( )。如果您选择在 ServerSocket 上调用 accept( )方法,那么它会同任何其他的 ServerSocket 表现一样的行为:总是阻塞并返回一个 java.net.Socket 对象。如果您选择在 ServerSocketChannel 上调用 accept( )方法则会返回 SocketChannel 类型的对象,返回的对象能够在非阻塞模式下运行。

如果以非阻塞模式被调用,当没有传入连接在等待时, ServerSocketChannel.accept( )会立即返回 null。正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得到实现。我们可以使用一个选择器实例来注册一个 ServerSocketChannel 对象以实现新连接到达时自动通知的功能。下面的代码演示了如何使用一个非阻塞的 accept( )方法:

public class ChannelAccept {
    public static final String GREETING = "Hello I must be going.\r\n";

    public static void main(String[] argv)
            throws Exception {
        int port = 1234; // default
        if (argv.length > 0) {
            port = Integer.parseInt(argv[0]);
        }
        ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(port));
        ssc.configureBlocking(false);
        while (true) {
            System.out.println("Waiting for connections");
            SocketChannel sc = ssc.accept();
            if (sc == null) {
// no connections, snooze a while
                Thread.sleep(2000);
            } else {
                sc.configureBlocking(false);
                ByteBuffer allocate = ByteBuffer.allocateDirect (16 * 1024);
                while(sc.read(allocate)>0){
                    allocate.flip();
                    while (buffer.hasRemaining( )) {
                        byte b = buffer.get();
                        System.out.println(b);
                    }
                    allocate.clear();
                }

                System.out.println("Incoming connection from: "
                        + sc.socket().getRemoteSocketAddress());
                buffer.rewind();
                sc.write(buffer);
                sc.close();
            }
        }
    }
}

这段程序的作用是,在1234端口上接受client的请求,一旦接收到client的请求,会给其回复固定的字符串响应"Hello I must be going."

运行这段程序,可以看到控制台打印出

Waiting for connections
Waiting for connections
Waiting for connections
Waiting for connections
......

说明程序的确运行在非阻塞模式下,因为否则就会想ServerSocket.accpet方法那样,一直阻塞下去。

现在通过命令行执行

telnet localhost 1234

可以看到得到一个响应之后,连接立马关闭

BEA56C70-651E-4189-9001-97EAB22C47E0.png

2.2 SocketChannel

下面开始学习 SocketChannel,它是使用最多的 socket 通道类:

Socket 和 SocketChannel 类封装点对点、有序的网络连接,就是我们所熟知并喜爱的 TCP/IP网络连接。 SocketChannel 扮演客户端发起同一个监听服务器的连接。直到连接成功,它才能收到数据并且只会从连接到的地址接收。

每个 SocketChannel 对象创建时都是同一个对等的 java.net.Socket 对象串联的。静态的 open( )方法可以创建一个新的 SocketChannel 对象,而在新创建的 SocketChannel 上调用 socket( )方法能返回它对等的 Socket 对象;在该 Socket 上调用 getChannel( )方法则能返回最初的那个 SocketChannel。

新创建的 SocketChannel 虽已打开却是未连接的。在一个未连接的 SocketChannel 对象上尝试一个 I/O 操作会导致 NotYetConnectedException 异常。我们可以通过在通道上直接调用 connect( )方法或在通道关联的 Socket 对象上调用 connect( )来将该 socket 通道连接。一旦一个 socket 通道被连接,它将保持连接状态直到被关闭。您可以通过调用布尔型的 isConnected( )方法来测试某个SocketChannel 当前是否已连接。

下面两段代码是等价的

通过open方法
SocketChannel socketChannel =
SocketChannel.open (new InetSocketAddress ("somehost", somePort));

通过connect方法
SocketChannel socketChannel = SocketChannel.open( );
socketChannel.connect (new InetSocketAddress ("somehost", somePort));

如果您选择使用传统方式进行连接——通过在对等 Socket 对象上调用 connect( )方法,那么传统的连接语义将适用于此。线程在连接建立好或超时过期之前都将保持阻塞。如果您选择通过在通道上直接调用 connect( )方法来建立连接并且通道处于阻塞模式(默认模式),那么连接过程实际上是一样的。

在 SocketChannel 上并没有一种 connect( )方法可以让您指定超时( timeout)值,当 connect( )方法在非阻塞模式下被调用时 SocketChannel 提供并发连接:它发起对请求地址的连接并且立即返回值。如果返回值是 true,说明连接立即建立了(这可能是本地环回连接);如果连接不能立即建立, connect( )方法会返回 false 且并发地继续连接建立过程。

面向流的的 socket 建立连接状态需要一定的时间,因为两个待连接系统之间必须进行包对话以建立维护流 socket 所需的状态信息。跨越开放互联网连接到远程系统会特别耗时。假如某个SocketChannel 上当前正由一个并发连接, isConnectPending( )方法就会返回 true 值。

调用 finishConnect( )方法来完成连接过程,该方法任何时候都可以安全地进行调用。假如在一个非阻塞模式的 SocketChannel 对象上调用 finishConnect( )方法,将可能出现下列情形之一:

  • connect( )方法尚未被调用。那么将产生 NoConnectionPendingException 异常。

  • 连接建立过程正在进行,尚未完成。那么什么都不会发生, finishConnect( )方法会立即返回false 值。

  • 在非阻塞模式下调用 connect( )方法之后, SocketChannel 又被切换回了阻塞模式。那么如果有必要的话,调用线程会阻塞直到连接建立完成, finishConnect( )方法接着就会返回 true值。

  • 在初次调用 connect( )或最后一次调用 finishConnect( )之后,连接建立过程已经完成。那么SocketChannel 对象的内部状态将被更新到已连接状态, finishConnect( )方法会返回 true值,然后 SocketChannel 对象就可以被用来传输数据了。

  • 连接已经建立。那么什么都不会发生, finishConnect( )方法会返回 true 值。

当通道处于中间的连接等待( connection-pending)状态时,您只可以调用 finishConnect( )、isConnectPending( )或 isConnected( )方法。一旦连接建立过程成功完成, isConnected( )将返回 true值。

InetSocketAddress addr = new InetSocketAddress (host, port);
SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false);
sc.connect (addr);
while ( ! sc.finishConnect( )) {
   doSomethingElse( );
}
doSomethingWithChannel (sc);
sc.close( );

3 Channel案例

在这个案例中,我们使用nio中的channel+线程池,来实现TimeServer、TimeClient

Server端

public class TimeServer {
    private  BlockingQueue<SocketChannel> idleQueue =new LinkedBlockingQueue<SocketChannel>();
    private  BlockingQueue<Future<SocketChannel>> workingQueue=new LinkedBlockingQueue<Future<SocketChannel>>();
    private  ExecutorService executor = Executors.newSingleThreadExecutor();
   
     {
        new Thread(){
            @Override
            public void run() {
            try {
               while (true) {
                   //task1:迭代当前idleQueue中的SocketChannel,提交到线程池中执行任务,并将其移到workingQueue中
                        for (int i = 0; i < idleQueue.size(); i++) {
                            SocketChannel socketChannel = idleQueue.poll();
                            if (socketChannel != null) {
                                Future<SocketChannel> result = executor.submit(new TimeServerHandleTask(socketChannel), socketChannel);
                                workingQueue.put(result);
                            }
                        }
                        //task2:迭代当前workingQueue中的SocketChannel,如果任务执行完成,将其移到idleQueue中
                        for (int i = 0; i < workingQueue.size(); i++) {
                            Future<SocketChannel> future = workingQueue.poll();
                            if (!future.isDone()){
                                workingQueue.put(future);
                                continue;
                            }
                            SocketChannel channel  = null;
                            try {
                                channel = future.get();
                                idleQueue.put(channel);
                            } catch (ExecutionException e) {
                                //如果future.get()抛出异常,关闭SocketChannel,不再放回idleQueue
                                channel.close();
                                e.printStackTrace();
                            }
                        }
                    }
            } catch (Exception e) {
               e.printStackTrace();
            }
            }
        }.start();
    }
    public static void main(String[] args) throws IOException, InterruptedException {
        TimeServer timeServer = new TimeServer();
        ServerSocketChannel ssc=ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.socket().bind(new InetSocketAddress(8080));
        while (true){
            SocketChannel socketChannel = ssc.accept();
            if(socketChannel==null){
                continue;
            }else{
                socketChannel.configureBlocking(false);
                timeServer.idleQueue.add(socketChannel);
            }
        }
    }
}

public class TimeServerHandleTask implements Runnable {
    SocketChannel socketChannel;
    ExecutorService executorService;
   ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);

    public TimeServerHandleTask(SocketChannel socketChannel, ExecutorService executorService) {
        this.socketChannel = socketChannel;
        this.executorService = executorService;
    }

    @Override
    public void run() {
            try {
                if(socketChannel.read(byteBuffer)>0){
                    while (true){
                        byteBuffer.flip();
                        if(byteBuffer.remaining()<"GET CURRENT TIME".length()){
                            byteBuffer.compact();
                            socketChannel.read(byteBuffer);
                            continue;
                        }
                        byte[] request=new byte[byteBuffer.remaining()];
                        byteBuffer.get(request);
                        String requestStr=new String(request);
                        byteBuffer.clear();
                        if (!"GET CURRENT TIME".equals(requestStr)) {
                            socketChannel.write(byteBuffer.put("BAD_REQUEST".getBytes()));
                        } else {
                            ByteBuffer byteBuffer = this.byteBuffer.put(Calendar.getInstance()
                                           .getTime().toLocaleString().getBytes());
                            byteBuffer.flip();
                            socketChannel.write(byteBuffer);
                        }
                       
                    }
                }
                        TimeServerHandleTask currentTask = new TimeServerHandleTask(socketChannel,
                              executorService);
                        executorService.submit(currentTask);
            } catch (Exception e) {
                e.printStackTrace();
            }
    }
}

 

TimeServer中维护了两个队列,idleQueue 和workingQueue。

    工作步骤如下所示:

    1、在main线程中,当接受到一个新的连接时,我们将相应的SocketChannel放入idleQueue。

    2、在static静态代码块中,我们创建了一个Thread。其作用是不断的循环idleQueue和workingQueue。

首先循环idleQueue,迭代出其中的SocketChannel,然后封装成一个TimeServerHandleTask对象,提交到线程池中处理这个SocketChannel的请求,同时我们会将SocketChannel中移除,放到workingQueue中。需要注意的是,这个SocketChannel可能只是与服务端建立了连接,但是没有发送请求,又或者是发送了一次或者多次请求。发送一次"GET CURRENT TIME”,就相当于一次请求。在TimeServerHandleTask中,会判断是否发送了请求,如果没有请求则不需要处理。如果SocketChannel发送了多次请求,TimeServerHandleTask一次也只会处理一个请求。其他的请求等到下一次循环的时候再处理。因为使用线程池的情况,线程的数量有限,所以要合理的分配,不能让一个线程一直处理一个client的请求。

接着是迭代workingQueue,通过future.isDone()判断当前请求是否处理完成,如果处理完成,将其从workingQueue中移除,重新加入idleQueue中。 


TimeServerHandleTask

public class TimeServerHandleTask implements Runnable {
   SocketChannel socketChannel;
   public TimeServerHandleTask(SocketChannel socketChannel) {
      this.socketChannel = socketChannel;
   }
   @Override
   public void run() {
      try {
         ByteBuffer requestBuffer = ByteBuffer.allocate("GET CURRENT TIME".length());
         //尝试读取数据,因为是非阻塞,所以如果没有数据会立即返回。
         int bytesRead = socketChannel.read(requestBuffer);
        //如果没有读取到数据,说明当前SocketChannel并没有发送请求,不需要处理
         if (bytesRead <= 0) {
            return;
         }
         //如果读取到了数据,则需要考虑粘包、解包问题,这个while代码是为了读取一个完整的请求信息"GET CURRENT TIME",
         while (requestBuffer.hasRemaining()) {
            socketChannel.read(requestBuffer);
         }
         String requestStr = new String(requestBuffer.array());
         if (!"GET CURRENT TIME".equals(requestStr)) {
            String bad_request = "BAD_REQUEST";
            ByteBuffer responseBuffer = ByteBuffer.allocate(bad_request.length());
                responseBuffer.put(bad_request.getBytes());
                responseBuffer.flip();
                socketChannel.write(responseBuffer);
         } else {
                String timeStr = Calendar.getInstance().getTime().toLocaleString();
                ByteBuffer responseBuffer = ByteBuffer.allocate(timeStr.length());
                responseBuffer.put(timeStr.getBytes());
                responseBuffer.flip();
            socketChannel.write(responseBuffer);
         }
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   }
}



client端

public class TimeClient {
    //连接超时时间
    static int connectTimeOut=3000;
    static ByteBuffer buffer=ByteBuffer.allocateDirect(1024);
    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8080));
        socketChannel.configureBlocking(false);
        long start=System.currentTimeMillis();
        while (!socketChannel.finishConnect()){
            if (System.currentTimeMillis()-start>=connectTimeOut){
                throw new RuntimeException("尝试建立连接超过3秒");
            }
        }
        //如果走到这一步,说明连接建立成功
        while (true){
            buffer.put("GET CURRENT TIME".getBytes());
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
            if(socketChannel.read(buffer)>0){
                buffer.flip();
                byte[] response=new byte[buffer.remaining()];
                buffer.get(response);
                System.out.println("reveive response:"+new String(response));
                buffer.clear();
            }
            Thread.sleep(5000);
        }

    }
}

先运行server端,再运行client端,可以在client端看到类似以下的输出

reveive response:2016-12-18 21:52:09
reveive response:2016-12-18 21:52:14
reveive response:2016-12-18 21:52:19
reveive response:2016-12-18 21:52:24
reveive response:2016-12-18 21:52:29
...

到这里,我们好像已经可以达到我们的目标"以尽可能少的线程,处理尽可能多的client请求" ,但是现实总是残酷的,这个案例中代码的效率太低了。 

因为我们并不知道一个SocketChannel是否发送了请求,所以必须迭代所有的SocketChannel,然后尝试读取请求数据,如果有请求,就处理,否则就跳过。假设一个有10000个连接,前9999个连接都没有请求,刚好最后一个连接才有请求。那么前9999次任务处理都是没有必要的。

如果有一种方式,可以让我们直接获取到真正发送了请求的SocketChannel,那么效率将会高的多。

     这就是我们下一节将要讲解的Selector(选择器),其可以帮助我们管理所有与server端已经建立了连接的client(SocketChannel),并将准备好数据的client过滤出来。我们可以有一个专门的线程来运行Selector,将准备好数据的client交给工作线程来处理。 


致谢:


感谢@Jerry,@UFO,@苦咲 对本文案例提出的建议,于2018年4月24日进行了修改。