基于 NIO 的 Socket

  • 2016-01-08
  • 4,560
  • 1

缓冲区及其操作是所有NIO的基础。
传统流IO是基于字节的,所有IO都被视为单个字节的移动;而NIO是基于块的,NIO的性能肯定优于流IO。其性能的提高主要要得益于其使用的结构更接近操作系统执行IO的方式:通道和缓冲器。我们可以把它想象成一个煤矿,通道是一个包含煤层(数据)的矿藏,而缓冲器则是派送到矿藏的卡车。卡车载满煤炭而归,我们再从卡车上获得煤炭。也就是说,我们并没有直接和通道交互;我们只是和缓冲器交互,并把缓冲器派送到通道。通道要么从缓冲器获得数据,要么向缓冲器发送数据。(这段比喻出自Java编程思想)
NIO的主要应用在高性能、高容量服务端应用程序,典型的有Apache Mina就是基于它的。
对于缓冲区的详细介绍见:使用NIO提升性能

基于NIO的socket主要涉及三大块:Channel,Selector,Buffer。

传统Socket写法:
Server端:

final ExecutorService executorService = Executors.newFixedThreadPool(200);
final ServerSocket serverSocket = new ServerSocket(554);
final String charset = "utf-8";
while(true){
    try {
        final Socket socket = serverSocket.accept();
        executorService.execute(new Runnable() {
            public void run() {
                try {
                    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(),charset));
                    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(),charset));
                    StringBuilder sb = new StringBuilder();
                    for(String line=reader.readLine();line!=null;line = reader.readLine()){
                        sb.append(line);
                    }
                    writer.write("response:");
                    writer.write(sb.toString());
                    writer.flush();
                    socket.shutdownOutput();
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Client端:

Socket socket = new Socket("127.0.0.1", 554);
socket.setSoTimeout(5000);
String charset = "utf-8";
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(),charset ));
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(),charset));
writer.write("hello \r\nworld! 你好");
writer.flush();
socket.shutdownOutput();
StringBuilder sb = new StringBuilder();
for(String line=reader.readLine();line!=null;line = reader.readLine()){
    sb.append(line);
}
System.out.println(sb.toString());
writer.close();
reader.close();
socket.close();

关于Socket通道Channel

Socket通道有三个,分别是ServerSocketChannel、SocketChannel和DatagramChannel,而它们又分别对 应java.net包中的Socket对象ServerSocket、Socket和DatagramSocket;Socket通道被实例化时,都会创建一个对等的Socket对象。
Socket通道可以运行非阻塞模式并且是可选择的,非阻塞I/O与可选择性是紧密相连的,这也正是管理阻塞的API要在 SelectableChannel中定义的原因。设置非阻塞非常简单,只要调用configureBlocking(false)方法即可。如果需要中途更改阻塞模式,那么必须首先获得blockingLock()方法返回的对象的锁。

  • ServerSocketChannel ServerSocketChannel是一个基于通道的socket监听器。但它没有bind()方法,因此需要取出对等的Socket对象并使用它来绑定到某一端口以开始监听连接。在非阻塞模式下,当没有传入连接在等待时,其accept()方法会立即返回null。正是这种检查连接而不阻塞的能力实 现了可伸缩性并降低了复杂性,选择性也因此得以实现。
  • SocketChannel 相对于ServerSocketChannel,它扮演客户端,发起到监听服务器的连接,连接成功后,开始接收数据。
    要注意的是,调用它的open()方法仅仅是打开但并未连接,要建立连接需要紧接着调用connect()方法;也可以两步合为一步,调用open(SocketAddress remote)方法。 你会发现connect()方法并未提供timout参数,作为替代方案,你可以用isConnected()、isConnectPending()或finishConnect()方法来检查连接状态。
  • DatagramChannel 不同于前面两个通道对象,它是无连接的,它既可以作为服务器,也可以作为客户端。

关于Selector

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。选择器可谓NIO中的重头戏,I/O复用的核心。如果在阻塞模式下注册一个通道,系统会抛出IllegalBlockingModeException异常。

创建一个Selector:

Selector selector = Selector.open();

注册通道:

chanel.configureBlocking(false);
SelectionKey key = channel.register(selector,SelectionKey.OP_ACCEPT);

通过SelectionKey对象来联系通道和选择器,可以分别通过selectionKey.channel(),selectionKey.selector()获取,channel.register()的第二个参数用于监听关心系的操作,可以通过selectionKey.interestOps()获取。selectionKey.readyOps()可以得到通道已经准备好的操作。另外ready集合是interest集合的子集。

选择键类中定义了4种可选择操作:read、write、 connect和accept:

SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE

每个可选择通道都有一个validOps()的抽象方法,每个具体通道各自有不同的有效的可选择操作集 合,比如ServerSocketChannel的有效操作集合是accept,而SocketChannel的有效操作集合是read、write和 connect

在选择过程中,所关心的通道操作可以由方法 interestOps(int operations)进行修改,但不影响此次选择过程(在下一次选择过程中生效)。

一个Selector实例有3个SelectionKey的集合:

  • 所有SelectionKey集合:代表了注册在该Selector上的Channel,这个集合可以通过keys()方法返回。
  • 被选择的SelectionKey集合:代表了所有可通过select()方法监测到、需要进行IO处理的Channel,这个集合可以通过selectedKeys()返回。
  • 被取消的SelectionKey集合:代表了所有被取消注册关系的Channel,在下一次执行select()方法时,这些Channel对应的SelectionKey会被彻底删除,程序通常无须直接访问该集合。

SelectionKey中还可以添加附加对象,这样就能方便的识别某个给定的通道。例如,可以附加与通道一起使用的Buffer,或是包含聚集数据的某个对象。
使用方法如下:

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

还可以在用register()方法向Selector注册Channel的时候附加对象。如:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

通过Selector选择通道

一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。

下面是select()方法:

int select()
int select(long timeout)
int selectNow()

select()阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
selectNow()不会阻塞,不管什么通道就绪都立刻返回。

select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。如下所示:

Set selectedKeys = selector.selectedKeys();

当Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。

每次迭代SelectedKeySet,需要调用remove();Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。

在以下情况下,SelectionKey对象会失效,这意味着Selector再也不会监控与它相关的事件:

  • 程序调用SelectionKey的cancel()方法
  • 关闭与SelectionKey关联的Channel
  • 与SelectionKey关联的Selector被关闭

基于NIO的Socket:
Server端:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(2223));

Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, null);
System.out.println("started...");

for(;;){
    int readyChannel = selector.select();
    if(readyChannel<=0) continue;
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> it = selectedKeys.iterator();
    while(it.hasNext()){
        SelectionKey selectionKey = it.next();
        if(selectionKey.isAcceptable()){
            ServerSocketChannel ssc =  (ServerSocketChannel) selectionKey.channel();
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ, null);
        }else if(selectionKey.isReadable()){
            SocketChannel sc = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            StringBuilder sb = new StringBuilder();
            while(true){
                byteBuffer.clear();
                if(sc.read(byteBuffer)<=0)break;
                byteBuffer.flip();
                while(byteBuffer.hasRemaining()){
                    sb.append(byteBuffer.get());
                }
                byteBuffer.rewind();
                sc.write(byteBuffer);
            }
            System.out.println("request:"+sb.toString());
            selectionKey.cancel();
        }
//      if(selectionKey.isWritable()){
//          SocketChannel sc = (SocketChannel) selectionKey.channel();
//          ByteBuffer byteBuffer = ByteBuffer.wrap("no request response".getBytes());
//          sc.write(byteBuffer);
//          selectionKey.cancel();
//      }
        it.remove();
    }
}

Client端:

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 2223));
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, null);
boolean isWrited = false;
for(;;){
    int num = selector.select();
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> it = selectedKeys.iterator();
    while(it.hasNext()){
        SelectionKey selectionKey = it.next();
        it.remove();
        if(!isWrited&&selectionKey.isWritable()){
            ByteBuffer byteBuffer = ByteBuffer.wrap("1".getBytes());
            socketChannel.write(byteBuffer);
            isWrited = true;
        }else if(selectionKey.isReadable()){
            ByteBuffer byteBuffer1 = ByteBuffer.allocate(1024);
            while(true){
                byteBuffer1.clear();
                if(socketChannel.read(byteBuffer1)<=0)break;
                byteBuffer1.flip();
                while(byteBuffer1.hasRemaining()){
                    System.out.print(byteBuffer1.get());
                }
            }
            selectionKey.cancel();
        }
    }
}
>> 转载请注明来源:基于 NIO 的 Socket

评论

  • admin回复

    测试

发表评论