Netty
NIO
概念
NIO:非阻塞IO
Java Socket是全双工的:在任意时刻,线路上存在
A到B
和B到A
的双向信号传输。即使是阻塞IO,读和写可以同时进行,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读
三大组件
Channel&Buffer
- NIO系统的核心
- Buffer是非线程安全的
- channel类似于 stream,是读写数据的双向通道,可以从channel将数据读入buffer,也可以将buffer的数据写入channel。channel比stream更底层
- 通道负责传输,缓冲区负责存储
graph LR
channel --> buffer
buffer --> channel
Channel
FileChannel:文件传输
DatagramChannel:UTP网络通信
SocketChannel:TCP网络通信
ServerSocketChannel:TCP网络通信
Buffer
ByteBuffer:使用较多
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
[Short/Int/Long/Double/Char]Buffer
Selector
多线程版设计
- 内存占用高
- 线程上下文切换成本高
- 只适合连接数少的场景
graph TD t1(thread) --> s1(socket1) t2(thread) --> s2(socket2) t3(thread) --> s3(socket3)
线程池版设计
- 阻塞模式下,线程仅能处理一个socket连接
- 仅适合短连接场景
graph TD t4(thread) --> s4(socket1) t5(thread) --> s5(socket2) t4(thread) -.-> s6(socket3) t5(thread) -.-> s7(socket4)
Selector版设计
- 配合一个线程来管理多个channel获取这些channel上发生的事件
- channel工作在非阻塞模式下,不会让线程吊死在一个channel上
- 适合连接数特别多,但流量低的场景(low traffic)
- 调用selector的
select()
会阻塞直到channel发生了读写就绪事件,事件发生后select
方法就会返回这些事件交给thread处理
graph TD thread --> selector selector --> c1(channel) selector --> c2(channel) selector --> c3(channel)
ByteBuffer
使用步骤
- 向buffer写入数据,如调用
channel.read(buffer)
- 调用
flip()
切换至读模式 - 从 buffer 读取数据,如调用
buffer.get()
- 调用
clear()
或compact()
切换至写模式 - 重复1~4步骤
1 |
|
结构原理
重要属性
- Capacity:buffer容量
- PositIOn:读写指针
- Limit:读写限制
使用过程
写模式下,PositIOn是写入位置,Limit等于Capacity
- 下图表示写入了4个字节后的状态
1
2
3
4
5
6
7Limit
↓
+---+---+---+---+---+---+---+
| a | b | c | d | | | |
+---+---+---+---+---+---+---+
↑ ↑
PositIOn Capacityflip动作发生后,positIOn切换为读取位置,limit切换为读取限制
1
2
3
4
5
6
7Limit
↓
+---+---+---+---+---+---+---+
| a | b | c | d | | | |
+---+---+---+---+---+---+---+
↑ ↑
PositIOn Capacityclear
动作发生后的状态1
2
3
4
5
6
7Limit
↓
+---+---+---+---+---+---+---+
| | | | | | | |
+---+---+---+---+---+---+---+
↑ ↑
Position Capacitycompact
方法:把未读完的部分向前压缩,然后切换至写模式1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// compact前
Limit
↓
+---+---+---+---+---+---+---+
| a | b | c | d | | | |
+---+---+---+---+---+---+---+
↑ ↑
PositIOn apacity
// compact后
Limit
↓
+---+---+---+---+---+---+---+
| c | d | | | | | |
+---+---+---+---+---+---+---+
↑ ↑
PositIOn Capacity
调试工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}
int i;
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}
// Generate the lookup table for byte-to-hex-dump conversIOn
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversIOn
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
/**
* 打印所有内容
*
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("positIOn: [%d], limit: [%d]\n", buffer.positIOn(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}
/**
* 打印可读取内容
*
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.positIOn(), buffer.limit() - buffer.positIOn());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("positIOn: [%d], limit: [%d]\n", buffer.positIOn(), buffer.limit());
System.out.println(builder);
}
private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsExceptIOn(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;
// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;
// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");
// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}
// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");
// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}
private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}
public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}
常用方法
分配空间
可以使用
allocate
方法为ByteBuffer分配空间,其它buffer类也有该方法allocate不能动态分配,若想使用更大的buffer只能重新分配
1
2
3
4
5
6
7
8
9
10// java.nIO.HeapByteBuffer:Java堆内存
// 读写效率较低
// 受到GC影响
System.out.println(ByteBuffer.allocate(16).getClass());
// java.nIO.DirectByteBuffer:直接内存
// 读写效率较高
// 不受到GC影响
// 分配效率低
// 可能会造成内存泄漏
System.out.println(ByteBuffer.allocateDirect(16).getClass());
向buffer写入数据
方法1:调用channel的
read
方法1
int readBytes = channel.read(buf);
方法2:调用buffer自己的
put
方法1
buf.put((byte)127);
从buffer读取数据
方法1:调用channel的
write
方法1
int writeBytes = channel.write(buf);
方法2:调用buffer自己的
get
方法1
byte b = buf.get();
get
方法会让PositIOn读指针向后走,如果想重复读取数据- 可以调用
rewind
方法将positIOn重新置为0 - 或者调用
get(int i)
方法获取索引i
的内容,它不会移动读指针
- 可以调用
mark&reset
mark
在读取时做一个标记,即使positIOn改变,只要调用reset
就能回到mark
的位置
字符串与ByteBuffer互转
1
2
3
4
5
6
7
8
9
10
11
12
13// TODO String => ByteBuffer
// 方式1:put,不会自动切换到读模式
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.put("hello".getBytes());
// 方式2:Charset,会自动切换到读模式
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
// 方式3:wrap,会自动切换到读模式
ByteBuffer buffer2 = ByteBuffer.wrap("hello".getBytes());
// TODO ByteBuffer => String
// 方式1:Charset,buffer需要转换成读模式
buffer.flip();
String str = StandardCharsets.UTF_8.decode(buffer).toString();
Scattering Reads
- 分散读,将数据填充至多个buffer
- 减少数据在buffer间的拷贝次数,提高效率
1 |
|
Gathering Writes
- 集中写
- 减少数据在buffer间的拷贝次数,提高效率
1 |
|
文件编程
FileChannel
工作模式:FileChannel只能工作在阻塞模式下
获取
- 不能直接打开FileChannel,必须通过FileInputStream/FileOutputStream/RandomAccessFile来获取FileChannel,它们都有
getChannel
方法 - 通过FileInputStream获取的channel只能读
- 通过FileOutputStream获取的channel只能写
- 通过RandomAccessFile是否能读写根据构造RandomAccessFile时的读写模式决定
- 不能直接打开FileChannel,必须通过FileInputStream/FileOutputStream/RandomAccessFile来获取FileChannel,它们都有
读取
- 从channel读取数据填充ByteBuffer,返回值表示读到了多少字节,
-1
表示到达了文件的末尾
1
int readBytes = channel.read(buffer);
- 从channel读取数据填充ByteBuffer,返回值表示读到了多少字节,
写入
- 在
while
中调用channel.write
是因为write
方法并不能保证一次将buffer中的内容全部写入channel
1
2
3
4
5
6
7ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式
while(buffer.hasRemaining()) {
channel.write(buffer);
}- 在
关闭
- channel必须关闭,不过调用了FileInputStream/FileOutputStream/RandomAccessFile的
close
方法会间接地调用 channel的close
方法
- channel必须关闭,不过调用了FileInputStream/FileOutputStream/RandomAccessFile的
位置
获取当前位置
1
long pos = channel.positIOn();
设置当前位置
1
2long newPos = ...;
channel.positIOn(newPos);设置当前位置时,如果设置为文件的末尾
- 这时读取会返回
-1
- 这时写入会追加内容,但要注意如果
positIOn
超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)
- 这时读取会返回
大小
- 使用
size
方法获取文件的大小
- 使用
强制写入
- 操作系统出于性能的考虑会将数据缓存,不是立刻写入磁盘。可以调用
force(true)
方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘
- 操作系统出于性能的考虑会将数据缓存,不是立刻写入磁盘。可以调用
Channel传输数据
2G以内
transferTo
效率高,底层会使用操作系统的零拷贝进行优化,最多传输2G数据
1
2
3
4
5
6
7
8try (
FileChannel source = new FileInputStream("data/data.txt").getChannel();
FileChannel distance = new FileOutputStream("data/data.txt.copy").getChannel();
) {
source.transferTo(0, source.size(), distance);
} catch (IOExceptIOn e) {
e.printStackTrace();
}超过2G
1
2
3
4
5
6
7
8
9
10
11
12try (
FileChannel source = new FileInputStream("data/data.txt").getChannel();
FileChannel distance = new FileOutputStream("data/data.txt.copy").getChannel();
) {
long size = source.size(), left = size;
// 剩余部分大于0
while (left > 0) {
left -= source.transferTo(size - left, left, distance);
}
} catch (IOExceptIOn e) {
e.printStackTrace();
}
Path
JDK7引入了Path和Paths类
- Path用来表示文件路径
- Paths是工具类,用来获取Path实例
1
2
3
4
5
6
7
8// 相对路径 使用 user.dir 环境变量来定位 1.txt
Path source = Paths.get("1.txt");
// 绝对路径 代表了 d:\1.txt
Path source = Paths.get("d:\\1.txt");
// 绝对路径 同样代表了 d:\1.txt
Path source = Paths.get("d:/1.txt");
// 代表了 d:\data\projects
Path projects = Paths.get("d:\\data", "projects");.
代表了当前路径..
代表了上一级路径
Files
检查文件是否存在
1
2Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));创建一级目录
如果目录已存在,会抛异常FileAlreadyExistsExceptIOn
不能一次创建多级目录,否则会抛异常NoSuchFileExceptIOn
1
2Path path = Paths.get("helloword/d1");
Files.createDirectory(path);创建多级目录用
1
2Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);拷贝文件
如果文件已存在,会抛异常FileAlreadyExistsExceptIOn
1
2
3Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");
Files.copy(source, target);如果希望用source覆盖掉target,需要用StandardCopyOptIOn来控制
1
Files.copy(source, target, StandardCopyOptIOn.REPLACE_EXISTING);
移动文件
StandardCopyOptIOn.ATOMIC_MOVE
保证文件移动的原子性
1
2
3Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");
Files.move(source, target, StandardCopyOptIOn.ATOMIC_MOVE);删除文件
- 如果文件不存在,会抛异常NoSuchFileExceptIOn
1
2Path target = Paths.get("helloword/target.txt");
Files.delete(target);删除目录
- 如果目录还有内容,会抛异常DirectoryNotEmptyExceptIOn
1
2Path target = Paths.get("helloword/d1");
Files.delete(target);遍历目录文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35AtomicInteger dirCount = new AtomicInteger();
AtomicInteger fileCount = new AtomicInteger();
// 参数1:遍历起点
// 参数2:对应操作
Files.walkFileTree(Paths.get("/Users/mousse/IdeaProjects/JavaEECourse/Netty"), new SimpleFileVisitor<Path>() {
// 进入文件夹前
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOExceptIOn {
System.out.println(dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}
// 遍历到文件时
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOExceptIOn {
System.out.println("\t" + file);
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}
// 遍历文件失败时
@Override
public FileVisitResult visitFileFailed(Path file, IOExceptIOn exc) throws IOExceptIOn {
return super.visitFileFailed(file, exc);
}
// 文件夹出来后
@Override
public FileVisitResult postVisitDirectory(Path dir, IOExceptIOn exc) throws IOExceptIOn {
return super.postVisitDirectory(dir, exc);
}
});
System.out.println("dir count: \t" + dirCount);
System.out.println("file count: " + fileCount);统计Jar的数目
1
2
3
4
5
6
7
8
9
10
11
12AtomicInteger jarCount = new AtomicInteger();
Files.walkFileTree(Paths.get("/Users/mousse/IdeaProjects/JavaEECourse/Netty"), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOExceptIOn {
if (file.toString().endsWith(".jar")) {
System.out.println("\t" + file);
jarCount.incrementAndGet();
}
return super.visitFile(file, attrs);
}
});
System.out.println("jar count: \t" + jarCount);删除多级目录
- 删除是危险操作,确保要递归删除的文件夹没有重要内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20Files.walkFileTree(Paths.get("/Users/mousse/IdeaProjects/JavaEECourse/Netty/data/delete"), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOExceptIOn {
System.out.println("进入" + dir);
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOExceptIOn {
Files.delete(file);
return super.visitFile(file, attrs);
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOExceptIOn exc) throws IOExceptIOn {
System.out.println("退出" + dir);
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});拷贝多级目录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16String source = "data/data.txt";
String target = "data/data.txt.copy";
Files.walk(Paths.get(source)).forEach(path -> {
try {
String targetName = path.toString().replace(source, target);
if (Files.isDirectory(path)) {
// 是目录
Files.createDirectory(Paths.get(targetName));
} else if (Files.isRegularFile(path)) {
// 是文件
Files.copy(path, Paths.get(targetName));
}
} catch (IOExceptIOn e) {
throw new RuntimeExceptIOn(e);
}
});
网络编程
非阻塞/阻塞
阻塞
阻塞模式下,相关方法会导致线程暂停
ServerSocketChannel.accept
:没有连接建立时让线程暂停SocketChannel.read
:没有数据可读时让线程暂停- 暂停期间不会占用CPU,线程相当于闲置
单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25ByteBuffer buffer = ByteBuffer.allocate(16);
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
List<SocketChannel> channels = new ArrayList<>();
// 建立与客户端的连接
while (true) {
log.debug("connecting");
// SocketChannel为数据读写的通道
// accept方法为阻塞方法,线程会停止运行
SocketChannel sc = ssc.accept();
log.debug("connected: {}", sc);
channels.add(sc);
// 接收客户端发送的数据
for (SocketChannel channel : channels) {
log.debug("start read: {}", channel);
// read方法为阻塞方法,线程会停止运行
channel.read(buffer);
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("finish read: {}", channel);
}
}
客户端
1
2
3SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
非阻塞
非阻塞模式下,相关方法不会让线程暂停
ServerSocketChannel.accept
:没有连接建立时返回null
,继续运行SocketChannel.read
:没有数据可读时会返回0
,线程不阻塞,可以去执行其它SocketChannel的read
,或执行ServerSocketChannel.accept
- 写数据时,线程只是等待数据写入Channel即可,无需等Channel通过网络把数据发送出去
非阻塞模式下,即使没有连接建立和可读数据,线程仍然在不断运行,浪费资源
数据复制过程中,线程实际还是阻塞的(AIO改进的地方)
服务器端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30ByteBuffer buffer = ByteBuffer.allocate(16);
ServerSocketChannel ssc = ServerSocketChannel.open();
// 将ServerSocketChannel设置为非阻塞模式
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
List<SocketChannel> channels = new ArrayList<>();
// 建立与客户端的连接
while (true) {
// 设置为非阻塞模式后accept方法变为非阻塞方法
// 如果没有建立连接则返回null,线程继续运行
SocketChannel sc = ssc.accept();
if (sc != null) {
log.debug("connected: {}", sc);
// 将SocketChannel设置为非阻塞模式
sc.configureBlocking(false);
channels.add(sc);
}
// 接收客户端发送的数据
for (SocketChannel channel : channels) {
// 设置为非阻塞模式后read方法变为非阻塞方法
// 如果没有读到数据返回0
int read = channel.read(buffer);
if (read == 0) continue;
log.debug("start read: {}", channel);
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("finish read: {}", channel);
}
}
Selector
graph TD
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
特点
一个线程配合selector可以监控多个channel的事件,事件发生线程才去处理,称为多路复用
使线程能够被充分利用
节约线程数量
减少线程上下文切换
多路复用仅针对网络IO、普通文件IO没法利用
创建
1
Selector selector = Selector.open();
绑定Channel事件
- 也称之为注册事件,绑定的事件selector才会关心
channel必须工作在非阻塞模式
FileChannel没有非阻塞模式,因此不能配合selector一起使用
绑定的事件类型
- connect:客户端连接成功时触发
- accept:服务器端成功接受连接时触发
- read:数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
- write:数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
1
2channel.configureBlocking(false);
SelectIOnKey key = channel.register(selector, 绑定事件);监听Channel事件
方法的返回值代表有多少channel发生了事件
方法1:阻塞直到绑定事件发生
1
int count = selector.select();
方法2:阻塞直到绑定事件发生,或是超时(时间单位为 ms)
1
int count = selector.select(long timeout);
方法3:不会阻塞,不管有没有事件,立刻返回,自己根据返回值检查是否有事件
1
int count = selector.selectNow();
select
不阻塞时机- 事件发生时
- 客户端发起连接请求会触发
accept
事件 - 客户端发送数据过来,客户端正常/异常关闭时都会触发
read
事件,如果发送的数据大于buffer缓冲区,会触发多次读取事件 - channel可写会触发
write
事件 - 在linux下nIO bug发生时
- 客户端发起连接请求会触发
- 调用
selector.wakeup()
- 调用
selector.close()
- selector所在线程
interrupt
- 事件发生时
处理
accept
事件- 事件发生后,要么处理要么取消(cancel),什么都不做的话下次该事件仍会触发,这是因为nIO底层使用的是水平触发
处理
read
事件- 客户端断开连接时会发送
read
事件,需要将key取消注册(从selectedKeys集合中删除) - 正常断开连接时
read
返回值为-1 - 异常断开连接时抛出异常
- 客户端断开连接时会发送
迭代器移除元素
select
在事件发生后会将相关的key放入selectedKeys集合,但不会在处理完后从 selectedKeys集合中移除,需要手动删除- 第一次触发了
ssckey
上的accept
事件,没有移除ssckey
- 第二次触发了
sckey
上的read
事件,但这时selectedKeys中还有上次的ssckey
,在处理时因为没有连接,会导致空指针异常
- 第一次触发了
cancel
- 用来取消注册在selector上的channel,并从keys集合中删除key,后续不再监听事件
处理消息边界
- TLV格式(Type类型、Length长度、Value数据),类型和长度已知的情况下,可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大会影响serve吞吐量
- Http1.1是TLV格式
- Http2.0是LTV格式
- TLV格式(Type类型、Length长度、Value数据),类型和长度已知的情况下,可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大会影响serve吞吐量
sequenceDiagram
participant c1 as 客户端1
participant s as 服务器
participant b1 as ByteBuffer1
participant b2 as ByteBuffer2
c1 ->> s: 发送 0123456789abcdefMore\n
s ->> b1: 第一次 read 存入 01234567890abcdef
s ->> b2: 扩容
b1 ->> b2: 拷贝 01234567890abcdef
s ->> b2: 第二次 read 存入 More\n
b2 ->> b2: 01234567890abcdefMore\n
ByteBuffer大小分配
每个channel都需要记录可能被切分的消息,ByteBuffer不能被多个channel共用,需要为每个channel维护一个独立的ByteBuffer
ByteBuffer不能太大,需要设计大小可变的ByteBuffer
- 思路1
- 首先分配一个较小的buffer(如4k),若发现数据不够,再分配8k的 buffer,将4k buffer内容拷贝至8k buffer
- 消息连续容易处理,但数据拷贝耗费性能
- 参考实现:http://tutorials.jenkov.com/java-performance/resizable-array.html
- 思路2
- 用多个数组组成buffer,一个数组不够就把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,避免拷贝引起的性能损耗
- 思路1
处理
write
事件非阻塞模式下,无法保证把buffer中所有数据都写入channel,需要追踪
write
方法的返回值(代表实际写入字节数)用selector监听所有channel的可写事件,每个channel都需要一个key来跟踪buffer,但这样又会导致占用内存过多,就有两阶段策略
- 当消息处理器第一次写入消息时,才将channel注册到selector上
- selector检查channel上的可写事件,若写完所有数据就取消注册channel
- 不取消会每次可写均会触发
write
事件
- 不取消会每次可写均会触发
只要向channel送数据时socket缓冲可写,这个事件会频繁触发,应当只在socket缓冲区写不下时再关注可写事件,数据写完之后再取消关注
利用多线程优化
- 分两组选择器
- 单线程配一个选择器,专门处理
accept
事件 - 创建CPU核心数的线程,每个线程配一个选择器,轮流处理
read
事件
- 单线程配一个选择器,专门处理
- 获取CPU个数
Runtime.getRuntime().availableProcessors()
:若工作在docker容器下会拿到物理CPU个数,而不是容器申请时的个数- 这个问题直到JDK 10才修复,使用Jvm参数
UseContainerSupport
配置, 默认开启
- 分两组选择器
UDP
UDP是无连接的,client发送数据不会管server是否开启
server的
receive
方法会将接收到的数据存入byte buffer,但如果数据报文超过buffer 大小,多出来的数据会被默默抛弃首先启动服务器端
1
2
3
4
5
6
7
8
9
10
11
12
13
14public class UdpServer {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
debug(buffer);
} catch (IOExceptIOn e) {
e.printStackTrace();
}
}
}
运行客户端
1
2
3
4
5
6
7
8
9
10
11public class UdpClient {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
channel.send(buffer, address);
} catch (ExceptIOn e) {
e.printStackTrace();
}
}
}
NIO与BIO
Stream与Channel
Stream
- 不会自动缓冲数据
- 只支持阻塞API
- 全双工(读写可同时进行)
Channel
- 利用系统提供的发送/接收缓冲区(更为底层)
- 支持阻塞/非阻塞API
- 网络channel可配合selector实现多路复用
- 全双工(读写可同时进行)
IO模型
同步阻塞、同步非阻塞、同步多路复用、异步非阻塞
- 同步:线程自己去获取结果(一个线程)
- 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)
当调用一次
channel.read
或stream.read
后,会切换至操作系统内核态来完成真正数据读取,读取分为两个阶段等待数据阶段
复制数据阶段
阻塞IO:同步,等待阶段和复制阶段都阻塞
sequenceDiagram
participant u as 用户空间
participant k1 as 内核空间channel1
participant k2 as 内核空间channel2
u ->> k1: read
k1 ->> k1: 等待数据
k1 ->> k1: 复制数据
k1 ->> u: 返回
u ->> k2: accept
k2 ->> k2: 等待连接
k2 ->> k2: 建立连接
k2 ->> u: 返回
u ->> k1: read
k1 ->> k1: 复制数据
k1 ->> u: 返回
- 非阻塞IO:同步,等待阶段非阻塞,复制阶段阻塞
sequenceDiagram
participant u as 用户空间
participant k as 内核空间
u ->> k: read
k ->> u: 返回
u ->> k: read
k ->> k: 复制数据
k ->> u: 返回
- 多路复用:同步
sequenceDiagram
participant u as 用户空间
participant k1 as 内核空间channel1
participant k2 as 内核空间channel2
u ->> k1: select
u ->> k2: 返回
k1 ->> k1: 等待事件
k2 ->> k2: 返回
k1 ->> u: c1.read c2.accpet
k2 ->> u: 返回
u ->> k2: accept
k2 ->> k2: 建立连接
k2 ->> u: 返回
u ->> k1: read
k1 ->> k1: 复制数据
k1 ->> u: 返回
- 信号驱动
- 异步IO
sequenceDiagram
participant u as 用户空间
participant k as 内核空间
u ->> k: thread1
k ->> u: 回调方法(参数)
k ->> k: 等待数据
k ->> k: 复制数据
k ->> u: thread2回调方法(真正结果)
零拷贝
概念
零拷贝并不是真正无拷贝,而是在不会拷贝重复数据到JVM内存中
优点
更少的用户态与内核态的切换
不利用CPU计算,减少CPU缓存伪共享
适合小文件传输
传统IO问题
传统的IO将一个文件通过socket写出
1
2
3
4
5
6
7
8
9File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
// 转为内核态
file.read(buf);
// 转为用户态
Socket socket = ...;
// 转为内核态
socket.getOutputStream().write(buf);内部工作流程
graph LR 磁盘 -- 拷贝1 --> 内核缓冲区 -- 拷贝2 --> 用户缓冲区 -- 拷贝3 --> Socket缓冲区 -- 拷贝4 --> 网卡
Java本身不具备IO读写能力,
read
方法调用后,要从Java程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区- 期间用户线程阻塞
- 操作系统使用DMA(Direct Memory Access)实现文件读,其间也不会使用CPU(DMA可以理解为硬件单元,用来解放CPU完成文件IO)
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即
byte[] buf
),期间CPU会参与拷贝,无法利用DMA调用
write
方法,将数据从用户缓冲区(byte[] buf
)写入socket缓冲区,CPU参与拷贝向网卡写数据,Java不具备该能力,得从用户态切换至内核态,调用操作系统的写能力,使用DMA将socket 缓冲区的数据写入网卡,不会使用CPU
中间环节较多,Java的IO实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
用户态与内核态的切换发生了3次,这个操作比较重量级
数据拷贝了共4次
NIO优化
通过DirectByteBuf
ByteBuffer.allocate(10)
:HeapByteBuffer,使用JVM内存ByteBuffer.allocateDirect(10)
: DirectByteBuffer,使用OS内存
大部分步骤与优化前相同,唯有一点:Java可使用DirectByteBuf将堆外内存映射到 JVM内存中来直接访问使用
graph LR 磁盘 -- 拷贝1 --> 内核缓冲区 -..-> 用户缓冲区 -- 拷贝2 --> Socket缓冲区 -- 拷贝3 --> 网卡
这块内存不受JVM垃圾回收的影响,内存地址固定,有助于IO读写
Java中的DirectByteBuf对象仅维护了此内存的虚引用,内存回收分成两步
- DirectByteBuf对象被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化(底层采用了linux 2.1后提供的
sendFile
方法)graph LR 磁盘 -- 拷贝1 --> 内核缓冲区 -- 拷贝2 --> Socket缓冲区 -- 拷贝3 --> 网卡
Java中对应着两个 channel调用
transferTo/transferFrom
方法拷贝数据- Java调用
transferTo
方法后,从Java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不使用CPU - 数据从内核缓冲区传输到socket缓冲区,CPU参与拷贝
- 使用DMA将socket 缓冲区的数据写入网卡,不使用CPU
- Java调用
只发生了一次用户态与内核态的切换
数据拷贝了3次
进一步优化(linux 2.4)
graph LR 磁盘 -- 拷贝1 --> 内核缓冲区 -- 拷贝2 --> 网卡 内核缓冲区 -..-> Socket缓冲区 -..- 网卡
- Java调用
transferTo
方法后,从Java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不使用CPU - 只将一些offset和length信息拷入socket缓冲区,几乎无消耗
- 使用DMA将内核缓冲区的数据写入网卡,不使用CPU
- Java调用
整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了2次
AIO
用来解决数据复制阶段的阻塞问题
同步:在进行读写操作时,线程需要等待结果
异步:在进行读写操作时,线程不必等待结果,将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
- Windows系统通过IOCP实现了真正的异步IO
- Linux系统异步IO在2.6版本引入,但其底层实现还是用多路复用模拟了异步IO,性能没有优势
文件AIO
AsynchronousFileChannel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data/data.txt"), StandardOpenOptIOn.READ)) {
// 参数: ByteBuffer, 读取文件的启始位置, 附件, 回调对象
ByteBuffer buffer = ByteBuffer.allocate(16);
log.debug("read begin");
channel.read(buffer, 0, buffer, new CompletIOnHandler<Integer, ByteBuffer>() {
// 守护线程:其他线程运行结束后守护线程也会结束
/**
* 读取成功
*
* @param result 读取到的实际字节数
* @param attachment 附件中的buffer对象
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed");
attachment.flip();
debugAll(attachment);
}
// 读取失败
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
System.in.read();
log.debug("read finish");
} catch (IOExceptIOn e) {
e.printStackTrace();
}- 响应文件读取成功的是另一个线程Thread-5
- 主线程并没有IO操作阻塞
守护线程
- 默认文件AIO使用的线程都是守护线程,最后要执行
System.in.read()
以避免守护线程意外结束
- 默认文件AIO使用的线程都是守护线程,最后要执行
网络 AIO
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100public class AIOServer {
public static void main(String[] args) throws IOExceptIOn {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}
private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOExceptIOn e) {
e.printStackTrace();
}
}
private static class ReadHandler implements CompletIOnHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOExceptIOn e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}
private static class WriteHandler implements CompletIOnHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;
private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}
private static class AcceptHandler implements CompletIOnHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;
public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}
@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOExceptIOn e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}
Netty
概述
概念
异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
- Netty的异步是指调用时的异步
- Netty的IO模型仍基于多路复用
优势
- 解决TCP传输问题,如粘包、半包
- 解决epoll空轮询导致CPU 100%的bug
- 对API进行增强,更易用,如FastThreadLocal=>ThreadLocal,ByteBuf=>ByteBuffer
案例
目标
开发一个简单的服务器端和客户端
客户端向服务器端发送hello, world
服务器仅接收,不返回
依赖
1
2
3
4
5<dependency>
<groupId>IO.Netty</groupId>
<artifactId>Netty-all</artifactId>
<versIOn>4.1.79.Final</versIOn>
</dependency>服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34// 服务器端启动器,负责组装Netty组件,启动服务器
new ServerBootstrap()
// 创建NIOEventLoopGroup,可以简单理解为`线程池 + Selector`
.group(new NIOEventLoopGroup())
// 选择服务Socket实现类,NIOServerSocketChannel表示基于NIO的服务器端实现,其它实现还有OIO(BIO)等
.channel(NIOServerSocketChannel.class)
// 添加的处理器是给SocketChannel用的,而不是给ServerSocketChannel
// Leader负责连接,Worker(Child)负责读写,决定了Worker(Child)能执行哪些操作(handler)
.childHandler(
// Channel代表和客户端进行数据读写的通道,Initializer初始化器,负责添加别的handler
// ChannelInitializer处理器(仅执行一次)
// 待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
new ChannelInitializer<NIOSocketChannel>() {
// 添加具体的handler
@Override
protected void initChannel(NIOSocketChannel nIOSocketChannel) throws ExceptIOn {
// 将ByteBuf转为String
nIOSocketChannel.pipeline().addLast(new StringDecoder());
nIOSocketChannel.pipeline().addLast(
// 自定义handler
new ChannelInboundHandlerAdapter() {
// 读事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws ExceptIOn {
// 打印字符串
log.info("message: {}", msg);
}
}
);
}
}
)
// 绑定监听端口
.bind(8080);客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// 创建启动类
new Bootstrap()
// 添加EventLoop
.group(new NIOEventLoopGroup())
// 选择客户端channel实现
.channel(NIOSocketChannel.class)
// 添加处理器
.handler(new ChannelInitializer<NIOSocketChannel>() {
// 在连接建立后被调用
@Override
protected void initChannel(NIOSocketChannel nIOSocketChannel) throws ExceptIOn {
nIOSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 连接服务器
.connect(new InetSocketAddress("localhost", 8080))
// 阻塞方法,直到连接建立
// Netty中很多方法都是异步的,如connect,这时需要使用sync方法等待connect建立连接完毕
.sync()
// 获取channel对象,它即为通道抽象,可以进行数据读写操作
.channel()
// 写入消息并清空缓冲区
.writeAndFlush("Hello World!");总结
- channel:数据通道
- msg:流动的数据
- 最开始输入是ByteBuf,经过pipeline的加工会变成其它类型对象,最后输出又变成ByteBuf
- handler:数据的处理工序
- 工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成…)传播给每个handler, handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler分Inbound和Outbound
- eventLoop:处理数据的工人
- 可以管理多个channel的IO操作,一旦工人负责了某个channel就要负责到底(绑定)
- 既可以执行IO操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务、定时任务
- 工人按照pipeline 顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人
组件
EventLoop
概念
EventLoop
事件循环对象,本质是一个单线程执行器(同时维护了一个Selector),里面有
run
方法处理Channel上源源不断的IO事件继承关系(接口多继承)
- 继承1:j.u.c.ScheduledExecutorService,包含线程池中所有的方法
- 继承2:Netty自己的OrderedEventExecutor
- 提供
boolean inEventLoop(Thread thread)
方法判断一个线程是否属于此 EventLoop - 提供了
parent
方法查看自己属于哪个EventLoopGroup
- 提供
EventLoopGrop
- 事件循环组,是一组EventLoop,Channel一般会调用EventLoopGroup的
register
方法绑定其中一个EventLoop,后续该Channel上的IO事件都由此 EventLoop处理(保证IO事件处理时的线程安全)
- 继承Netty自己的EventExecutorGroup
- 实现Iterable接口提供遍历EventLoop的能力
next
方法获取集合中下一个EventLoop
- 事件循环组,是一组EventLoop,Channel一般会调用EventLoopGroup的
优雅关闭
- 优雅关闭
shutdownGracefully
方法,会先切换EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
- 优雅关闭
处理IO事件
1 |
|
handler切换线程源码
1 |
|
- 关键代码
IO.Netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
- 若两个handler绑定的是同一线程则直接调用,否则把要调用的代码封装为一个任务对象,由下一个handler的线程调用
处理普通任务
1 |
|
处理定时任务
1 |
|
Channel
主要作用
close()
:关闭channelcloseFuture()
:处理channel的关闭sync
:同步等待channel关闭addListener
:异步等待channel关闭
pipeline()
:添加处理器write()
:将数据写入writeAndFlush()
:将数据写入并刷出
ChannelFuture
1 |
|
CloseFuture
1 |
|
异步总结
- 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势
- 异步并没有缩短响应时间,反而有所增加
- 合理进行任务拆分,也是利用异步的关键
- 用响应时间换取吞吐量
Future&Promise
概述
- 异步处理时常用的两个接口
- JDK Future:只能同步等待任务结束(或成功或失败)才能得到结果
- Netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- Netty Promise:不仅有Netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
- Netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
功能/名称 | JDK Future | Netty Future | Promise |
---|---|---|---|
cancel |
取消任务 | - | - |
isCanceled |
任务是否取消 | - | - |
isDone |
任务是否完成,不能区分成功失败 | - | - |
get |
获取任务结果,阻塞等待 | - | - |
getNow |
- | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await |
- | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync |
- | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess |
- | 判断任务是否成功 | - |
cause |
- | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener |
- | 添加回调,异步接收结果 | - |
setSuccess |
- | - | 设置成功结果 |
setFailure |
- | - | 设置失败结果 |
案例
JDKFuture
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 提交任务
Future<Integer> future = service.submit(() -> {
try {
log.debug("执行计算");
Thread.sleep(1000);
return 50;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
log.debug("等待结果");
// 主线程通过future获取结果
log.debug("结果: {}", future.get());NettyFuture
1
2
3
4
5
6
7
8
9
10
11
12
13
14NioEventLoopGroup group = new NioEventLoopGroup(2);
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(() -> {
// [nioEventLoopGroup-2-1] c.m.n.e.f.NettyFutureDemo - 执行计算
log.debug("执行计算");
Thread.sleep(1000);
return 50;
});
log.debug("等待结果");
// 方式1,[main] c.m.n.e.f.NettyFutureDemo - 结果: 50
// 主线程通过future获取结果
log.debug("结果: {}", future.get());
// 方式2,[nioEventLoopGroup-2-1] c.m.n.e.f.NettyFutureDemo - 结果: 50
future.addListener(future1 -> log.debug("结果: {}", future1.getNow()));NettyPromise
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// 准备EventLoop对象
EventLoop eventLoop = new NioEventLoopGroup().next();
// 可以主动创建promise
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
log.debug("执行计算");
try {
int i = 1 / 0;
Thread.sleep(1000);
} catch (Exception e) {
promise.setFailure(e);
}
// 向promise填充结果
promise.setSuccess(50);
}).start();
// 接收结果
log.debug("等待结果");
promise.addListener(future1 -> log.debug("结果: {}", future1.getNow()));
Handler&Pipeline
概念
ChannelHandler:用来处理Channel上的各种事件,分为入站和出站。所有 ChannelHandler被连成一串就是Pipeline
- 入站处理器: ChannelInboundHandlerAdapter的子类,用来读取客户端数据,写回结果
- 出站处理器:ChannelOutboundHandlerAdapter的子类,对写回结果进行加
每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler是流水线上的各道工序,ByteBuf是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
Pipeline
1 |
|
1 |
|
- InboundHandler从head顺序执行,Outbound从tail执行。ChannelPipeline的实现是一 ChannelHandlerContext(包装了ChannelHandler)组成的双向链表
- 入站处理器中,
ctx.fireChannelRead(msg)
是调用下一个入站处理器 ctx.channel().write(msg)
:从尾部开始查找出站处理器ctx.write(msg)
:从当前节点找上一个出站处理器- 服务端pipeline触发的原始流程,图中数字代表了处理步骤的先后次序
EmbeddedChannel
- Netty提供的用来测试的channel,用来绑定多个handler,使用时不需要启动服务端和客户端
1 |
|
ByteBuf
概念
对字节数据的封装
组成
- 最开始读写指针都在0位置
优势
- 池化:可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能
- 读写指针分离:不需要像ByteBuffer一样切换读写模式
- 可以自动扩容
- 支持链式调用:使用更流畅
- 很多地方体现零拷贝:例如slice、duplicate、CompositeByteBuf
操作
创建
堆内存
创建池化基于堆的ByteBuf
1
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();
直接内存(默认)
创建池化基于直接内存的ByteBuf
1
2ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但要注意及时主动释放
log工具
1
2
3
4
5
6
7
8
9
10
11private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
池化vs非池化
池化的最大意义在于可以重用ByteBuf
没有池化,每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存也会增加GC压力
有了池化,可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
1
-DIO.Netty.allocator.type={unpooled|pooled}
- 4.1后,非Android平台默认启用池化实现,Android平台启用非池化实现
写入
省略一些不重要的方法
这些方法的未指明返回值的返回的都是ByteBuf,可以链式调用
网络传输,默认习惯是Big Endian
set开头的一系列方法也可以写入数据,但不会改变写指针位置
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) |
写入boolean值 | 用一字节01/00代表true/false |
writeByte(int value) |
写入byte值 | |
writeShort(int value) |
写入short值 | |
writeInt(int value) |
写入int值 | Big Endian,从低位向高位写入,即0x250 ,写入后00 00 02 50 |
writeIntLE(int value) |
写入int值 | Little Endian,从高位向低位写入,即0x250 ,写入后50 02 00 00 |
writeLong(long value) |
写入long值 | |
writeChar(int value) |
写入char值 | |
writeFloat(float value) |
写入float值 | |
writeDouble(double value) |
写入double值 | |
writeBytes(ByteBuf src) |
写入Netty的ByteBuf | |
writeBytes(byte[] src) |
写入byte[] | |
writeBytes(ByteBuffer src) |
写入NIO的ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) |
写入字符串 |
扩容
若写入后数据大小未超过512,则选择下一个16的整数倍
- 例如写入后大小为12 ,则扩容后capacity是16
如果写入后数据大小超过 512,则选择下一个2^n^
- 例如写入后大小为513,则扩容后capacity是2^10^=1024(2^9^=512已经不够了)
扩容不能超过
max capacity
否则会报错
读取
如果需要重复读取,可以在
read
前先做个标记mark
1
2
3buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);时要重复读取的话,重置到标记位置
reset
1
2buffer.resetReaderIndex();
log(buffer);还有种办法是采用get开头的一系列方法,这些方法不会改变read index
内存回收
由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收
UnpooledHeapByteBuf使用JVM内存,只需等GC回收内存即可
UnpooledDirectByteBuf使用直接内存,需要特殊的方法来回收内存
PooledByteBuf和它的子类使用池化机制,需要更复杂的规则来回收内存
回收内存的源码实现,请关注下面方法的不同实现p
rotected abstract void deallocate()
Netty采用引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
每个ByteBuf对象的初始计数为1
调用
release
方法计数减1,如果计数为0,ByteBuf内存被回收调用
retain
方法计数加1,表示调用者没用完之前,其它handler即使调用了release 也不会造成回收当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用
基本规则
- 谁最后使用,谁负责
release
- 起点,对NIO实现来讲,在
IO.Netty.channel.nIO.AbstractNIOByteChannel.NIOByteUnsafe#read
方法中首次创建ByteBuf放入pipeline(line 163pipeline.fireChannelRead(byteBuf)
)
- 谁最后使用,谁负责
入站ByteBuf处理原则
- 对原始ByteBuf不做处理,调用
ctx.fireChannelRead(msg)
向后传递,这时无须release
- 将原始ByteBuf转换为其它类型的Java对象,这时ByteBuf就没用了,必须
release
- 如果不调用
ctx.fireChannelRead(msg)
向后传递,那么也必须release
- 如果ByteBuf没有成功传递到下一个ChannelHandler,必须
release
- 假设消息一直向后传,那么TailContext会负责释放未处理消息(原始的 ByteBuf)
- 对原始ByteBuf不做处理,调用
出站ByteBuf处理原则
- 出站消息最终都会转为ByteBuf输出一直向前传,由HeadContext
flush
后release
- 出站消息最终都会转为ByteBuf输出一直向前传,由HeadContext
异常处理原则
- 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用
release
直到返回true
- 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用
TailContext释放未处理消息逻辑
1
2
3
4
5
6
7
8
9
10// IO.Netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuratIOn.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}具体代码
1
2
3
4
5
6
7// IO.Netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
零拷贝
slice
- 对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的
read
,write
指针
切片后的
max capacity
被固定为这个区间的大小,不能追加write
释放原有ByteBuf内存可能会出异常(使用
retaion
解决)1
2
3
4
5
6
7
8
9
10
11ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
// 在切片过程中没有发生数据复制
// 无参:从原始ByteBuf的read index到write index间的内容进行切片
// 有参:起始位置,参数长度
ByteBuf buf1 = buf.slice(0, 5);
buf1.retain();
ByteBuf buf2 = buf.slice(5, 5);
buf2.retain();
// 释放原始ByteBuf内存
buf.release();
- 对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的
duplicate
- 与原始ByteBuf使用同一块底层内存,只是读写指针是独立的并没有
max capacity
的限制
- 与原始ByteBuf使用同一块底层内存,只是读写指针是独立的并没有
copy
- 将底层内存数据进行深拷贝,无论读写都与原始ByteBuf无关
- 不是零拷贝
CompositeByteBuf
将多个ByteBuf合并为一个逻辑上的ByteBuf,避免拷贝
是一个组合的ByteBuf,内部维护了一个Component数组,每个Component管理一个ByteBuf,记录这个ByteBuf相对于整体偏移量等信息,代表着整体中某一段的数据
优点:对外是一个虚拟视图,组合时不会产生内存复制
缺点:复杂,多次操作会带来性能损耗
需要注意引用计数
1
2
3
4
5
6
7
8
9
10
11ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 会产生多次数据复制,影响性能
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
buf.writeBytes(buf1).writeBytes(buf2);
// 不会产生数据复制
CompositeByteBuf compositeBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
// 参数1:是否自动调整指针的位置
compositeBuf.addComponents(true, buf1, buf2);Unpooled
- 一个工具类,提供了非池化的ByteBuf创建、组合、复制等操作
- 零拷贝相关的
wrappedBuffer
方法,可以用来包装ByteBuf
1
2
3
4
5
6
7ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
粘包半包
分析
本质是因TCP是流式协议,消息无边界
粘包
现象,发送abc def,接收 abcdef
原因
- 应用层:接收方ByteBuf设置太大(Netty默认1024)
- 滑动窗口(TCP层面):设发送方256bytes表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这256 bytes字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
- Nagle算法(TCP层面):会造成粘包
半包
现象,发送abcdef,接收abc def
原因
- 应用层:接收方ByteBuf小于实际发送数据量
- 滑动窗口(TCP层面):假设接收方的窗口只剩128bytes,发送方的报文大小是256bytes,放不下只能先发送前128bytes,等ack后才能发送剩余部分,造成半包
- MSS限制(数据链路层):当发送的数据超过MSS限制后,会将数据切分发送,造成半包
滑动窗口
TCP以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但这么做包的往返时间越长性能就越差
为解决此问题,引入窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
- 图中深色的部分即要发送的数据,高亮的部分即窗口
- 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
- 如果1001~2000这个段的数据ack回来了,窗口就可以向前滑动
- 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收
- Nagle算法
- 即使发送一个字节也需要加入TCP头和ip头,也就是总字节数会使用41bytes,非常不经济。因此为了提高网络利用率,TCP希望尽可能发送足够大的数据,这就是Nagle算法产生的缘由
- 该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送
- 如果SO_SNDBUF的数据达到MSS,则需要发送
- 如果SO_SNDBUF中含有FIN(表示需要连接关闭)这时将剩余数据发送,再关闭
- 如果TCP_NODELAY = true,则需要发送
- 已发送的数据都收到ack时,则需要发送
- 上述条件不满足,但发生超时(一般为200ms)则需要发送
- 除上述情况,延迟发送
- MSS限制
链路层对一次能够发送的最大数据有限制,这个限制称之为 MTU(maximum transmission unit),不同的链路设备的 MTU 值也有所不同,例如
以太网的MTU是1500
FDDI(光纤分布式数据接口)的MTU是4352
本地回环地址的MTU是65535 - 本地测试不走网卡
MSS是最大段长度(maximum segment size),它是MTU刨去TCP头和ip头后剩余能够作为数据传输的字节数
ipv4 TCP头占用20bytes,ip头占用20bytes,因此以太网MSS的值为1500 - 40 = 1460
TCP在传递大量数据时,会按照MSS大小将数据进行分割发送
MSS的值在三次握手时通知对方自己MSS的值,然后在两者之间选择一个小值作为MSS
解决
短链接
- 发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界
- 能解决粘包
- 缺点
- 效率低
- 不能解决半包
固定长度
- 每条消息采用固定长度
- 缺点:数据包的大小不好把握
- 长度定的太大,浪费
- 长度定的太小,对某些数据包又显得不够
1 |
|
固定分隔符
- 每条消息采用分隔符,如
\n
- 缺点:处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误
1 |
|
预设长度
每条消息分为head和body,head中包含body的长度
在发送消息前,先约定用定长字节表示接下来数据的长度
1
2// 最大长度,长度偏移,长度占用字节,长度调整,剥离字节数
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));
1 |
|
协议设计
概述
TCP/IP中消息传输基于流的方式,没有边界
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
自定义
要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,如:JSON、protobuf、hessian、JDK
- 指令类型,是登录、注册、单聊、群聊… 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
编解码器
1 |
|
1 |
|
1 |
|
@Sharable
- 该handler可以在多线程下被共享使用,只需创建一个实例
- 当handler不保存状态时,就可以安全地在多线程下被共享
- 对于编解码器类,不能继承ByteToMessageCodec或CombinedChannelDuplexHandler父类,他们的构造方法对
@Sharable
有限制 - 如果能确保编解码器不会保存状态,可以继承MessageToMessageCodec父类
案例
空闲检测
连接假死
原因
网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
应用程序线程阻塞,无法进行数据读写
问题
假死的连接占用的资源不能自动释放
向假死的连接发送数据,得到的反馈是发送超时
服务端解决
- 每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 用来判断是不是 读空闲时间过长,或 写空闲时间过长
// 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了读空闲事件
if (event.state() == IdleState.READER_IDLE) {
log.debug("已经 5s 没有读到数据了");
ctx.channel().close();
}
}
});客户端定时心跳
- 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 用来判断是不是 读空闲时间过长,或 写空闲时间过长
// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了写空闲事件
if (event.state() == IdleState.WRITER_IDLE) {
// log.debug("3s 没有写数据了,发送一个心跳包");
ctx.writeAndFlush(new PingMessage());
}
}
});
序列化
序列化,反序列化主要用在消息正文的转换上
序列化时,需要将Java对象变为要传输的数据(可以是
byte[]
,或JSON等,最终都需要变成byte[]
)反序列化时,需要将传入的正文数据还原成Java对象,便于处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70public interface Serializer {
// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);
// 序列化方法
<T> byte[] serialize(T object);
enum Algorithm implements Serializer {
Java {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
}
},
Json {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
String json = new String(bytes, StandardCharsets.UTF_8);
return gson.fromJson(json, clazz);
}
@Override
public <T> byte[] serialize(T object) {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
String json = gson.toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
}
}
class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
@Override
public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
try {
String str = json.getAsString();
return Class.forName(str);
} catch (ClassNotFoundException e) {
throw new JsonParseException(e);
}
}
@Override // String.class
public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
// class -> json
return new JsonPrimitive(src.getName());
}
}
}
参数调优
CONNECT_TIMEOUT_MILLIS
属于SocketChannal参数
若客户端建立连接时未在指定毫秒内连接,会抛出timeout异常
SO_TIMEOUT
用在阻塞IO:阻塞IO中accept/read
等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// 为ServerSocketChannel配置参数
new ServerBootstrap().option();
// 为SocketChannel配置参数
new ServerBootstrap().childOption();
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
// 客户端通过option方法配置参数
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bootstrap.connect("localhost", 8080);
future.sync().channel().closeFuture().sync(); // 断点1
} catch (Exception e) {
e.printStackTrace();
log.debug("timeout");
} finally {
group.shutdownGracefully();
}io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// ...
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// ...
}SO_BACKLOG
属于ServerSocketChannal参数
sequenceDiagram participant c as client participant s as server participant sq as syns queue(半连接队列) participant aq as accept queue(全连接队列) s ->> s : bind() s ->> s : listen() c ->> c : connect() c ->> s : 1. SYN Note left of c : SYN_SEND s ->> sq : put Note right of s : SYN_RCVD s ->> c : 2. SYN + ACK Note left of c : ESTABLISHED c ->> s : 3. ACK sq ->> aq : put Note right of s : ESTABLISHED aq -->> s : s ->> s : accept()
- 第一次握手,client发送SYN到server,状态修改为
SYN_SEND
,server收到,状态改变为SYN_REVD
,并将该请求放入sync queue队列 - 第二次握手,server回复SYN + ACK给client,client收到,状态改变为
ESTABLISHED
,并发送ACK给server - 第三次握手,server收到ACK,状态改变为
ESTABLISHED
,将该请求从sync queue放入accept queue
- 第一次握手,client发送SYN到server,状态修改为
在linux 2.2后,分别用下面两个参数来控制
sync queue:半连接队列,存放没有完成三次握手的连接信息
- 大小通过
/proc/sys/net/ipv4/tcp_max_syn_backlog
指定,在syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- 大小通过
accept queue:全连接队列,存放已经完成三次握手的连接信息
- 其大小通过
/proc/sys/net/core/somaxconn
指定,在使用listen函数时,内核会根据传入的backlog参数与系统参数,取二者的较小值- 如果accpet queue队列满了,server将发送一个拒绝连接的错误信息到 client
- 其大小通过
- netty中可以通过
option(ChannelOption.SO_BACKLOG, 值)
设置大小
ulimit -n
- 允许同时打开的文件描述符的数量
- 属于操作系统参数
TCP_NODELAY
- 小数据包不延迟发送(nagle算法会使小数据包攒在一起发送)
- 属于SocketChannal参数
SO_SNDBUF & SO_RCVBUF
SO_SNDBUF
属于SocketChannal参数SO_RCVBUF
既可用于SocketChannal参数,也可以用于ServerSocketChannal参数(建议设置到ServerSocketChannal上)
ALLOCATOR
- 用来分配ByteBuf,
ctx.alloc()
- 属于SocketChannal参数
- 用来分配ByteBuf,
RCVBUF_ALLOCATOR
- 控制netty接收缓冲区大小
- 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用direct直接内存,具体池化还是非池化由allocator决定
- 属于SocketChannal参数
RPC框架
准备工作
为了简化起见,在原来聊天项目的基础上新增Rpc请求和响应消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15@Data
public abstract class Message implements Serializable {
// 省略旧的代码
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
static {
// ...
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}请求消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
/**
* 调用的接口全限定名,服务端根据它找到实现
*/
private String interfaceName;
/**
* 调用接口中的方法名
*/
private String methodName;
/**
* 方法返回类型
*/
private Class<?> returnType;
/**
* 方法参数类型数组
*/
private Class[] parameterTypes;
/**
* 方法参数值数组
*/
private Object[] parameterValue;
public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}响应消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
}
服务端
1 |
|
1 |
|
客户端
- 包括channel管理,代理,接收结果
1 |
|
1 |
|
源码
启动
入口
io.netty.bootstrap.ServerBootstrap#bind
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();
//2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();
//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//4 启动 nio boss 线程执行接下来的操作
//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));
//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);io.netty.bootstrap.AbstractBootstrap#doBind
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37private ChannelFuture doBind(final SocketAddress localAddress) {
// 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 2. 因为是 initAndRegister 异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分
// 2.1 如果已经完成
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 3.1 立刻调用 doBind0
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
// 2.2 还没有完成
else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 3.2 回调 doBind0
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// 处理异常...
promise.setFailure(cause);
} else {
promise.registered();
// 3. 由注册线程去执行 doBind0
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}io.netty.bootstrap.AbstractBootstrap#initAndRegister
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
// 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializer
init(channel);
} catch (Throwable t) {
// 处理异常...
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
// 处理异常...
}
return regFuture;
}io.netty.bootstrap.ServerBootstrap#init
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50// 这里 channel 实际上是 NioServerSocketChannel
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
// 为 NioServerSocketChannel 添加初始化器
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}io.netty.channel.AbstractChannel.AbstractUnsafe#register
1 |
|
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 1.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel
doRegister();
neverRegistered = false;
registered = true;
// 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel
pipeline.invokeHandlerAddedIfNeeded();
// 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// 对应 server socket channel 还未绑定,isActive 为 false
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}io.netty.channel.ChannelInitializer#initChannel
1 |
|
io.netty.bootstrap.AbstractBootstrap#doBind0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 3.1 或 3.2 执行 doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}io.netty.channel.AbstractChannel.AbstractUnsafe#bind
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// 记录日志...
}
boolean wasActive = isActive();
try {
// 3.3 执行端口绑定
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 3.4 触发 active 事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}io.netty.channel.socket.nio.NioServerSocketChannel#doBind
1 |
|
io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1
2
3
4
5public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 触发 read (NioServerSocketChannel 上的 read 不是读取数据,只是为了触发 channel 的事件注册)
readIfIsAutoRead();
}io.netty.channel.nio.AbstractNioChannel#doBeginRead
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// readInterestOp 取值是 16,在 NioServerSocketChannel 创建时初始化好,代表关注 accept 事件
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
NioEventLoop
NioEventLoop线程不仅要处理IO事件,还要处理Task(包括普通任务和定时任务)
提交任务代码
io.netty.util.concurrent.SingleThreadEventExecutor#execute
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
// 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
addTask(task);
if (!inEventLoop) {
// inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread
startThread();
if (isShutdown()) {
// 如果已经 shutdown,做拒绝逻辑,代码略...
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
// 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程
wakeup(inEventLoop);
}
}唤醒
select
阻塞线程io.netty.channel.nio.NioEventLoop#wakeup
1
2
3
4
5
6@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}启动 EventLoop 主循环
io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 将线程池的当前线程保存在成员变量中,以便后续使用
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环,run 方法见下
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 清理工作,代码略...
}
}
});
}io.netty.channel.nio.NioEventLoop#run
主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73protected void run() {
for (;;) {
try {
try {
// calculateStrategy 的逻辑如下:
// 有任务,会执行一次 selectNow,清除上一次的 wakeup 结果,无论有没有 IO 事件,都会跳过 switch
// 没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 因为 IO 线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒
// 进行 select 阻塞,并设置唤醒状态为 false
boolean oldWakenUp = wakenUp.getAndSet(false);
// 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup
// 下面的 select 方法不会阻塞
// 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?
// 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时
// 才能执行,让 select 方法无谓阻塞
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio 默认是 50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// ioRatio 为 100 时,总是运行完所有非 IO 任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 记录 io 事件处理耗时
final long ioTime = System.nanoTime() - ioStartTime;
// 运行非 IO 任务,一旦超时会退出 runAllTasks
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
wakeup
既可以由提交任务的线程来调用(比较好理解),也可以由EventLoop线程来调用(比较费解),wakeup
方法的效果:
- 由非EventLoop线程调用,会唤醒当前在执行
select
阻塞的EventLoop线程- 由EventLoop自己调用,会本次的
wakeup
会取消下一次的select
操作
参考下图

io.netty.channel.nio.NioEventLoop#select
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 计算等待时间
// * 没有 scheduledTask,超时时间为 1s
// * 有 scheduledTask,超时时间为 `下一个定时任务执行时间 - 当前时间`
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超时,退出循环
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果期间又有 task 退出循环,如果没这个判断,那么任务就会等到下次 select 超时时才能被执行
// wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeup
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// select 有限时阻塞
// 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,导致不断空轮询,cpu 占用 100%
int selectedKeys = selector.select(timeoutMillis);
// 计数加 1
selectCnt ++;
// 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
// 线程被打断,退出循环
// 记录日志
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 如果超时,计数重置为 1,下次循环就会 break
selectCnt = 1;
}
// 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512
// 这是为了解决 nio 空轮询 bug
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 重建 selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
// 记录日志
}
} catch (CancelledKeyException e) {
// 记录日志
}
}处理keys
io.netty.channel.nio.NioEventLoop#processSelectedKeys
1
2
3
4
5
6
7
8
9private void processSelectedKeys() {
if (selectedKeys != null) {
// 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet
// SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}io.netty.channel.nio.NioEventLoop#processSelectedKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 当 key 取消或关闭时会导致这个 key 无效
if (!k.isValid()) {
// 无效时处理...
return;
}
try {
int readyOps = k.readyOps();
// 连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 可读或可接入事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
// 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
accept
nio中如下代码,在netty中的流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20//1 阻塞直到事件发生
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
//2 拿到一个事件
SelectionKey key = iter.next();
//3 如果是 accept 事件
if (key.isAcceptable()) {
//4 执行 accept
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
//5 关注 read 事件
channel.register(selector, SelectionKey.OP_READ);
}
// ...
}可接入事件处理(accept)
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// doReadMessages 中执行了 accept 并创建 NioSocketChannel 作为消息放入 readBuf
// readBuf 是一个 ArrayList 用来缓存消息
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// localRead 为 1,就一条消息,即接收一个客户端连接
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这时的 msg 是 NioSocketChannel
final Channel child = (Channel) msg;
// NioSocketChannel 添加 childHandler 即初始化器
child.pipeline().addLast(childHandler);
// 设置选项
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}io.netty.channel.AbstractChannel.AbstractUnsafe#register
1 |
|
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
pipeline.invokeHandlerAddedIfNeeded();
// 执行后就是 head -> logging handler -> my handler -> tail
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
// 触发 pipeline 上 active 事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1
2
3
4
5public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 触发 read (NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)
readIfIsAutoRead();
}io.netty.channel.nio.AbstractNioChannel#doBeginRead
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 这时候 interestOps 是 0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 关注 read 事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
read
可读事件
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
,注意发送的数据未必能够一次读完,因此会触发多次nio read事件,一次事件内会触发多次pipeline read,一次事件会触发一次pipeline read complete1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// io.netty.allocator.type 决定 allocator 的实现
final ByteBufAllocator allocator = config.getAllocator();
// 用来分配 byteBuf,确定单次读取大小
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
// 读取
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理 NioSocketChannel 上的 handler
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
}
// 是否要继续循环
while (allocHandle.continueReading());
allocHandle.readComplete();
// 触发 read complete 事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)
1
2
3
4
5
6
7
8
9
10
11
12public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return
// 一般为 true
config.isAutoRead() &&
// respectMaybeMoreData 默认为 true
// maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
// 小于最大次数,maxMessagePerRead 默认 16
totalMessages < maxMessagePerRead &&
// 实际读到了数据
totalBytesRead > 0;
}