基于 NIO 的 Socket
缓冲区及其操作是所有NIO的基础。
传统流IO是基于字节的,所有IO都被视为单个字节的移动;而NIO是基于块的,NIO的性能肯定优于流IO。其性能的提高主要要得益于其使用的结构更接近操作系统执行IO的方式:通道和缓冲器。我们可以把它想象成一个煤矿,通道是一个包含煤层(数据)的矿藏,而缓冲器则是派送到矿藏的卡车。卡车载满煤炭而归,我们再从卡车上获得煤炭。也就是说,我们并没有直接和通道交互;我们只是和缓冲器交互,并把缓冲器派送到通道。通道要么从缓冲器获得数据,要么向缓冲器发送数据。(这段比喻出自Java编程思想)
NIO的主要应用在高性能、高容量服务端应用程序,典型的有Apache Mina就是基于它的。
对于缓冲区的详细介绍见:使用NIO提升性能
基于NIO的socket主要涉及三大块:Channel,Selector,Buffer。
传统Socket写法:
Server端:
final ExecutorService executorService = Executors.newFixedThreadPool(200);
final ServerSocket serverSocket = new ServerSocket(554);
final String charset = "utf-8";
while(true){
try {
final Socket socket = serverSocket.accept();
executorService.execute(new Runnable() {
public void run() {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(),charset));
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(),charset));
StringBuilder sb = new StringBuilder();
for(String line=reader.readLine();line!=null;line = reader.readLine()){
sb.append(line);
}
writer.write("response:");
writer.write(sb.toString());
writer.flush();
socket.shutdownOutput();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
Client端:
Socket socket = new Socket("127.0.0.1", 554);
socket.setSoTimeout(5000);
String charset = "utf-8";
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(),charset ));
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(),charset));
writer.write("hello \r\nworld! 你好");
writer.flush();
socket.shutdownOutput();
StringBuilder sb = new StringBuilder();
for(String line=reader.readLine();line!=null;line = reader.readLine()){
sb.append(line);
}
System.out.println(sb.toString());
writer.close();
reader.close();
socket.close();
关于Socket通道Channel
Socket通道有三个,分别是ServerSocketChannel、SocketChannel和DatagramChannel,而它们又分别对 应java.net包中的Socket对象ServerSocket、Socket和DatagramSocket;Socket通道被实例化时,都会创建一个对等的Socket对象。
Socket通道可以运行非阻塞模式并且是可选择的,非阻塞I/O与可选择性是紧密相连的,这也正是管理阻塞的API要在 SelectableChannel中定义的原因。设置非阻塞非常简单,只要调用configureBlocking(false)方法即可。如果需要中途更改阻塞模式,那么必须首先获得blockingLock()方法返回的对象的锁。
- ServerSocketChannel ServerSocketChannel是一个基于通道的socket监听器。但它没有bind()方法,因此需要取出对等的Socket对象并使用它来绑定到某一端口以开始监听连接。在非阻塞模式下,当没有传入连接在等待时,其accept()方法会立即返回null。正是这种检查连接而不阻塞的能力实 现了可伸缩性并降低了复杂性,选择性也因此得以实现。
- SocketChannel 相对于ServerSocketChannel,它扮演客户端,发起到监听服务器的连接,连接成功后,开始接收数据。
要注意的是,调用它的open()方法仅仅是打开但并未连接,要建立连接需要紧接着调用connect()方法;也可以两步合为一步,调用open(SocketAddress remote)方法。 你会发现connect()方法并未提供timout参数,作为替代方案,你可以用isConnected()、isConnectPending()或finishConnect()方法来检查连接状态。 - DatagramChannel 不同于前面两个通道对象,它是无连接的,它既可以作为服务器,也可以作为客户端。
关于Selector
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。选择器可谓NIO中的重头戏,I/O复用的核心。如果在阻塞模式下注册一个通道,系统会抛出IllegalBlockingModeException异常。
创建一个Selector:
Selector selector = Selector.open();
注册通道:
chanel.configureBlocking(false);
SelectionKey key = channel.register(selector,SelectionKey.OP_ACCEPT);
通过SelectionKey对象来联系通道和选择器,可以分别通过selectionKey.channel(),selectionKey.selector()获取,channel.register()的第二个参数用于监听关心系的操作,可以通过selectionKey.interestOps()获取。selectionKey.readyOps()可以得到通道已经准备好的操作。另外ready集合是interest集合的子集。
选择键类中定义了4种可选择操作:read、write、 connect和accept:
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
每个可选择通道都有一个validOps()的抽象方法,每个具体通道各自有不同的有效的可选择操作集 合,比如ServerSocketChannel的有效操作集合是accept,而SocketChannel的有效操作集合是read、write和 connect。
在选择过程中,所关心的通道操作可以由方法 interestOps(int operations)进行修改,但不影响此次选择过程(在下一次选择过程中生效)。
一个Selector实例有3个SelectionKey的集合:
- 所有SelectionKey集合:代表了注册在该Selector上的Channel,这个集合可以通过keys()方法返回。
- 被选择的SelectionKey集合:代表了所有可通过select()方法监测到、需要进行IO处理的Channel,这个集合可以通过selectedKeys()返回。
- 被取消的SelectionKey集合:代表了所有被取消注册关系的Channel,在下一次执行select()方法时,这些Channel对应的SelectionKey会被彻底删除,程序通常无须直接访问该集合。
SelectionKey中还可以添加附加对象,这样就能方便的识别某个给定的通道。例如,可以附加与通道一起使用的Buffer,或是包含聚集数据的某个对象。
使用方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
还可以在用register()方法向Selector注册Channel的时候附加对象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
通过Selector选择通道
一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。
下面是select()方法:
int select()
int select(long timeout)
int selectNow()
select()阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
selectNow()不会阻塞,不管什么通道就绪都立刻返回。
select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。如下所示:
Set selectedKeys = selector.selectedKeys();
当Selector注册Channel时,Channel.register()方法会返回一个SelectionKey 对象。这个对象代表了注册到该Selector的通道。可以通过SelectionKey的selectedKeySet()方法访问这些对象。
每次迭代SelectedKeySet,需要调用remove();Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
在以下情况下,SelectionKey对象会失效,这意味着Selector再也不会监控与它相关的事件:
- 程序调用SelectionKey的cancel()方法
- 关闭与SelectionKey关联的Channel
- 与SelectionKey关联的Selector被关闭
基于NIO的Socket:
Server端:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(2223));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, null);
System.out.println("started...");
for(;;){
int readyChannel = selector.select();
if(readyChannel<=0) continue;
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while(it.hasNext()){
SelectionKey selectionKey = it.next();
if(selectionKey.isAcceptable()){
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ, null);
}else if(selectionKey.isReadable()){
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
StringBuilder sb = new StringBuilder();
while(true){
byteBuffer.clear();
if(sc.read(byteBuffer)<=0)break;
byteBuffer.flip();
while(byteBuffer.hasRemaining()){
sb.append(byteBuffer.get());
}
byteBuffer.rewind();
sc.write(byteBuffer);
}
System.out.println("request:"+sb.toString());
selectionKey.cancel();
}
// if(selectionKey.isWritable()){
// SocketChannel sc = (SocketChannel) selectionKey.channel();
// ByteBuffer byteBuffer = ByteBuffer.wrap("no request response".getBytes());
// sc.write(byteBuffer);
// selectionKey.cancel();
// }
it.remove();
}
}
Client端:
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 2223));
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, null);
boolean isWrited = false;
for(;;){
int num = selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while(it.hasNext()){
SelectionKey selectionKey = it.next();
it.remove();
if(!isWrited&&selectionKey.isWritable()){
ByteBuffer byteBuffer = ByteBuffer.wrap("1".getBytes());
socketChannel.write(byteBuffer);
isWrited = true;
}else if(selectionKey.isReadable()){
ByteBuffer byteBuffer1 = ByteBuffer.allocate(1024);
while(true){
byteBuffer1.clear();
if(socketChannel.read(byteBuffer1)<=0)break;
byteBuffer1.flip();
while(byteBuffer1.hasRemaining()){
System.out.print(byteBuffer1.get());
}
}
selectionKey.cancel();
}
}
}
免费分享,随意打赏
admin
测试