Netty编程之NIO实践

开始

最近在学习Netty,其核心便是基于Java的NIO编程封装而来,这篇文章对NIO编程的原理进行介绍,并提供一个NIO编程的完整实践案例。

什么是Netty?

Netty的特点总结如下:

  • Netty是由JBOSS提供的一个Java开源框架,现在为Github上的独立开发项目。
  • Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络IO程序。
  • Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。
  • Netty的本质是一个NIO框架,用于服务器通信。

Java网络编程三兄弟:BIO、NIO、NIO2(AIO)


显然NIO是Netty的灵魂,由于NIO属于Java三大I/O模型之一,要学习它不如把这三者合在一起先做个了解。

I/O模型类型描述
Java BIO同步阻塞(传统阻塞型)服务器实现模式为一个连接绑定一个线程即客户端有一个连接,当有客户端请求时,就需要启动一个线程进行处理。如果这个连接不做任何事情则造不必要的线程开销。
Java NIO同步非阻塞服务器实现模式为一个线程处理多个请求,即客户端发送的请求会注册到多路复用器上,多路复用器可以轮询存在的I/O请求进行处理。
Java AIO异步非阻塞AIO引入异步通道的概念,采用Proactor模式,简化了程序的编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

NIO核心原理

NIO由三大核心部分组成:

  • Channel:通道。
  • Buffer:缓冲区。
  • Selector:选择器。

原理说明:


  • 每个Channel对应一个Buffer
  • Selector对应一个线程,一个线程对应多个Channel
  • 多个Channel可以注册到一个Selector。
  • 程序切换Channel由事件决定。
  • Selector会根据不同事件,在各个Channel切换。
  • 数据的读取或者写入通过Buffer,是双向的,但需要flip()切换读/写模式。
  • Channel是双向的,可以返回底层操作系统的情况(比如Linux底层是双向的)。

NIO模式示意图如下:
NIO示例图


关于Channel

基本介绍:

  1. BIO中的stream是单向的,例如FileInputStream 对象只能进行读取数据的操作,而NIO中的通道(Channel)是双向的,可以读操作,也可以写操作。
  2. ChannelNIO中是一 个接口:public interface Channel extends Closeable{}
  3. 常用的Channel类有: FileChannelDatagramChannelServerSocketChannelSocketChannelServerSocketChannel类似ServerSocketSocketChannel类似Socket
  4. FileChannel 用于文件的数据读写,DatagramChannel用于UDP的数据读写,ServerSocketChannelSocketChannel用于TCP的数据读写。

关于Buffer

Java的基本数据类型(除了boolean)均有一个Buffer类与之对应,最常用的自然是ByteBuffer类,该类的主要方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class ByteBuffer{
//缓冲区创建相关api
public static ByteBuffer llocateDirect(int capacty)//创建直接缓冲区
public static ByteBuffer llocate(int capacty)//设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array)//把个数组放到缓冲区中使用
//构造初始化位置ofset和上界length的缓冲区
public static ByteBuffer wrap(byte[] array,int offset, int length)
//缓存区存取相关API
public abstract byte get( )//从当前位置position上get, get之后, position会自动+1
public abstract byte get (int index);//从绝对位置get
public abstract ByteBuffer put (byte b)://从当前位置上添加,put之后,position会自动+1
public abstract ByteBuffer put (int index, byte b);//从绝对位置上put
}

关于Selector

Selector类是一个抽象类,常用的方法如下:

1
2
3
4
5
public abstract class Selector implements Closeable {
public static Selector open();//得到一个选择器对象
public int select(long timeout);//监控所有注册的通道,当其中有10操作可以进行时,将对应的 //SelectionKey加入到内部集合中并返回,参数用来设置超时时间
public Set<SelectionKey> selectedKeys();// 从内部集合中得到所有的SelectionKey
}

NIO编程流程

  1. 当客户端需要连接时,通过ServerSocketChannel得到SocketChannel对象。
  2. 将该SocketChannel对象注册到Selector上,使用register(Selector sel)方法,在一个selector上可以注册多个SocketChannel对象。(将监听服务通道和监听到的对象通道均注册到Selector
  3. 注册后返回一个SelectionKey,会和该Selector关联(集合)。
  4. Selector进行监听select方法,返回有事件发生的通道个数。
  5. 进一步得到各个SelectionKey
  6. 通过使用SelectionKeychannel()方法反向获取SocketChannel
  7. 通过得到的channel完成业务处理。

NIO代码实践—简单的多用户聊天服务场景

服务端:NIOServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.tosang.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
* @author tosang
* @create 2020/3/17
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
//创建ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

//得到一个selector对象
Selector selector = Selector.open();

//绑定一个端口6666,在服务器端监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));

//设置为非阻塞
serverSocketChannel.configureBlocking(false);

//把serverSocketChannel注册到selector,关心的实践为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

//循环等待客户端连接
while (true){
//等待1秒,如果没有事件发生,继续
if(selector.select(1000) == 0){
//没有事件发生
System.out.println("服务器等待了1秒,无连接");
continue;
}
//如果返回的>0,就获取到相关的SelectionKey集合
//selectionKeys是关注事件的集合
//通过selectionKeys反向获取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//遍历selectionKeys,使用迭代器!!
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
//获取到SelectionKey
SelectionKey key = keyIterator.next();
//根据key对应通道发生的事件做相应的处理
if(key.isAcceptable()){
//有新的客户端连接
//给该客户端生成一个SocketChannel
//注意:这里已经知道实际的类型,所以下面的accept()方法
//并不会阻塞
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功,生成了一个通道 "+
socketChannel.hashCode());
//将通道设置为非阻塞
socketChannel.configureBlocking(false);
//将当前的socketChannel注册到selector,关注事件为
//OP_READ(从通道读入到缓冲区),同时给该socketChannel关联一个buffer
socketChannel.register(selector,SelectionKey.OP_READ,
ByteBuffer.allocate(1024));
}
if(key.isReadable()){
//发生一个OP_READ事件
//通过key,反向获取对应的通道
//SelectableChannel向下转型为SocketChannel
SocketChannel channel = (SocketChannel) key.channel();
//获取到该通道关联的buffer
ByteBuffer buffer = (ByteBuffer)key.attachment();
channel.read(buffer);
System.out.println("From 客户端:"+new String(buffer.array()));
}
//手动从集合中移除SelectionKey,防止重复操作
keyIterator.remove();
}
}
}
}

客户端:NIOClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.tosang.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
* @author tosang
* @create 2020/3/17
*/
public class NIOClient {
public static void main(String[] args) throws Exception{
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞模式
socketChannel.configureBlocking(false);
//提供服务器端的IP和端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
//其中socketChannel.connect判断远程连接是否建立成功
//socketChannel.finishConnect()判断channel's socket是否连接
if(!socketChannel.connect(inetSocketAddress)){
while (!socketChannel.finishConnect()){
System.out.println("因为连接需要时间,客户端不会阻塞,可以继续工作");

}
}
//如果连接成功则发生数据
String str = "hello ,小yi";
//wrap()包裹的含义,返回一个与str大小相等的buffer
//等价于ByteBuffer buffer = ByteBuffer.allocate(str.getBytes().length);
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());

//发生数据,将buffer写入通道
socketChannel.write(buffer);
//客户端停在此处
System.in.read();

}
}

启动NIOServer和多个NIOClient,可以看到打印的hashcode值不相等,说明一个Selector同时管理着多个连接。