Netty

NIO

概念

  • NIO:非阻塞IO

  • Java Socket是全双工的:在任意时刻,线路上存在A到BB到A的双向信号传输。即使是阻塞IO,读和写可以同时进行,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读

三大组件

Channel&Buffer

  • NIO系统的核心
  • Buffer是非线程安全的
  • channel类似于 stream,是读写数据的双向通道,可以从channel将数据读入buffer,也可以将buffer的数据写入channel。channel比stream更底层
  • 通道负责传输,缓冲区负责存储
graph LR
channel --> buffer
buffer --> channel
  • Channel

    • FileChannel:文件传输

    • DatagramChannel:UTP网络通信

    • SocketChannel:TCP网络通信

    • ServerSocketChannel:TCP网络通信

  • Buffer

    • ByteBuffer:使用较多

      • MappedByteBuffer
      • DirectByteBuffer
      • HeapByteBuffer
    • [Short/Int/Long/Double/Char]Buffer

Selector

  • 多线程版设计

    • 内存占用高
    • 线程上下文切换成本高
    • 只适合连接数少的场景
    graph TD
    t1(thread) --> s1(socket1)
    t2(thread) --> s2(socket2)
    t3(thread) --> s3(socket3)
    
  • 线程池版设计

    • 阻塞模式下,线程仅能处理一个socket连接
    • 仅适合短连接场景
    graph TD
    t4(thread) --> s4(socket1)
    t5(thread) --> s5(socket2)
    t4(thread) -.-> s6(socket3)
    t5(thread) -.-> s7(socket4)
    
  • Selector版设计

    • 配合一个线程来管理多个channel获取这些channel上发生的事件
    • channel工作在非阻塞模式下,不会让线程吊死在一个channel上
    • 适合连接数特别多,但流量低的场景(low traffic)
    • 调用selector的select()会阻塞直到channel发生了读写就绪事件,事件发生后select 方法就会返回这些事件交给thread处理
    graph TD
    thread --> selector
    selector --> c1(channel)
    selector --> c2(channel)
    selector --> c3(channel)
    

ByteBuffer

使用步骤

  1. 向buffer写入数据,如调用channel.read(buffer)
  2. 调用flip()切换至读模式
  3. 从 buffer 读取数据,如调用 buffer.get()
  4. 调用clear()compact() 切换至写模式
  5. 重复1~4步骤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 获取FileChannel
// 可从输入输出流,RandomAccessFile获取
try (FileChannel channel = new FileInputStream("data/data.txt").getChannel()) {
// 准备缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
while (true) {
// 从channel读取,向buffer写入
int size = channel.read(buffer);
log.debug("读取字节数: {}", size);
// 如果返回-1则代表channel没有数据
if (size == -1) break;
// 打印buffer的内容
// 切换至读模式
buffer.flip();
// 是否还有剩余未读数据
while (buffer.hasRemaining()) {
// 读一个字节
byte b = buffer.get();
log.debug("实际字节: {}", (char) b);
}
// 切换为写模式
buffer.clear();
}
} catch (IOExceptIOn e) {
throw new RuntimeExceptIOn(e);
}

结构原理

  • 重要属性

    • Capacity:buffer容量
    • PositIOn:读写指针
    • Limit:读写限制
  • 使用过程

    • 写模式下,PositIOn是写入位置,Limit等于Capacity

      • 下图表示写入了4个字节后的状态
      1
      2
      3
      4
      5
      6
      7
                  Limit

      +---+---+---+---+---+---+---+
      | a | b | c | d | | | |
      +---+---+---+---+---+---+---+
      ↑ ↑
      PositIOn Capacity
    • flip动作发生后,positIOn切换为读取位置,limit切换为读取限制

      1
      2
      3
      4
      5
      6
      7
                  Limit

      +---+---+---+---+---+---+---+
      | a | b | c | d | | | |
      +---+---+---+---+---+---+---+
      ↑ ↑
      PositIOn Capacity
    • clear动作发生后的状态

      1
      2
      3
      4
      5
      6
      7
                              Limit

      +---+---+---+---+---+---+---+
      | | | | | | | |
      +---+---+---+---+---+---+---+
      ↑ ↑
      Position Capacity
    • compact方法:把未读完的部分向前压缩,然后切换至写模式

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      // compact前
      Limit

      +---+---+---+---+---+---+---+
      | a | b | c | d | | | |
      +---+---+---+---+---+---+---+
      ↑ ↑
      PositIOn apacity

      // compact后
      Limit

      +---+---+---+---+---+---+---+
      | c | d | | | | | |
      +---+---+---+---+---+---+---+
      ↑ ↑
      PositIOn Capacity
  • 调试工具类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    public class ByteBufferUtil {
    private static final char[] BYTE2CHAR = new char[256];
    private static final char[] HEXDUMP_TABLE = new char[256 * 4];
    private static final String[] HEXPADDING = new String[16];
    private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
    private static final String[] BYTE2HEX = new String[256];
    private static final String[] BYTEPADDING = new String[16];

    static {
    final char[] DIGITS = "0123456789abcdef".toCharArray();
    for (int i = 0; i < 256; i++) {
    HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
    HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
    }

    int i;

    // Generate the lookup table for hex dump paddings
    for (i = 0; i < HEXPADDING.length; i++) {
    int padding = HEXPADDING.length - i;
    StringBuilder buf = new StringBuilder(padding * 3);
    for (int j = 0; j < padding; j++) {
    buf.append(" ");
    }
    HEXPADDING[i] = buf.toString();
    }

    // Generate the lookup table for the start-offset header in each row (up to 64KiB).
    for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
    StringBuilder buf = new StringBuilder(12);
    buf.append(NEWLINE);
    buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
    buf.setCharAt(buf.length() - 9, '|');
    buf.append('|');
    HEXDUMP_ROWPREFIXES[i] = buf.toString();
    }

    // Generate the lookup table for byte-to-hex-dump conversIOn
    for (i = 0; i < BYTE2HEX.length; i++) {
    BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
    }

    // Generate the lookup table for byte dump paddings
    for (i = 0; i < BYTEPADDING.length; i++) {
    int padding = BYTEPADDING.length - i;
    StringBuilder buf = new StringBuilder(padding);
    for (int j = 0; j < padding; j++) {
    buf.append(' ');
    }
    BYTEPADDING[i] = buf.toString();
    }

    // Generate the lookup table for byte-to-char conversIOn
    for (i = 0; i < BYTE2CHAR.length; i++) {
    if (i <= 0x1f || i >= 0x7f) {
    BYTE2CHAR[i] = '.';
    } else {
    BYTE2CHAR[i] = (char) i;
    }
    }
    }

    /**
    * 打印所有内容
    *
    * @param buffer
    */
    public static void debugAll(ByteBuffer buffer) {
    int oldlimit = buffer.limit();
    buffer.limit(buffer.capacity());
    StringBuilder origin = new StringBuilder(256);
    appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
    System.out.println("+--------+-------------------- all ------------------------+----------------+");
    System.out.printf("positIOn: [%d], limit: [%d]\n", buffer.positIOn(), oldlimit);
    System.out.println(origin);
    buffer.limit(oldlimit);
    }

    /**
    * 打印可读取内容
    *
    * @param buffer
    */
    public static void debugRead(ByteBuffer buffer) {
    StringBuilder builder = new StringBuilder(256);
    appendPrettyHexDump(builder, buffer, buffer.positIOn(), buffer.limit() - buffer.positIOn());
    System.out.println("+--------+-------------------- read -----------------------+----------------+");
    System.out.printf("positIOn: [%d], limit: [%d]\n", buffer.positIOn(), buffer.limit());
    System.out.println(builder);
    }

    private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
    if (isOutOfBounds(offset, length, buf.capacity())) {
    throw new IndexOutOfBoundsExceptIOn(
    "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
    + ") <= " + "buf.capacity(" + buf.capacity() + ')');
    }
    if (length == 0) {
    return;
    }
    dump.append(
    " +-------------------------------------------------+" +
    NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
    NEWLINE + "+--------+-------------------------------------------------+----------------+");

    final int startIndex = offset;
    final int fullRows = length >>> 4;
    final int remainder = length & 0xF;

    // Dump the rows which have 16 bytes.
    for (int row = 0; row < fullRows; row++) {
    int rowStartIndex = (row << 4) + startIndex;

    // Per-row prefix.
    appendHexDumpRowPrefix(dump, row, rowStartIndex);

    // Hex dump
    int rowEndIndex = rowStartIndex + 16;
    for (int j = rowStartIndex; j < rowEndIndex; j++) {
    dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
    }
    dump.append(" |");

    // ASCII dump
    for (int j = rowStartIndex; j < rowEndIndex; j++) {
    dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
    }
    dump.append('|');
    }

    // Dump the last row which has less than 16 bytes.
    if (remainder != 0) {
    int rowStartIndex = (fullRows << 4) + startIndex;
    appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

    // Hex dump
    int rowEndIndex = rowStartIndex + remainder;
    for (int j = rowStartIndex; j < rowEndIndex; j++) {
    dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
    }
    dump.append(HEXPADDING[remainder]);
    dump.append(" |");

    // Ascii dump
    for (int j = rowStartIndex; j < rowEndIndex; j++) {
    dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
    }
    dump.append(BYTEPADDING[remainder]);
    dump.append('|');
    }

    dump.append(NEWLINE +
    "+--------+-------------------------------------------------+----------------+");
    }

    private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
    if (row < HEXDUMP_ROWPREFIXES.length) {
    dump.append(HEXDUMP_ROWPREFIXES[row]);
    } else {
    dump.append(NEWLINE);
    dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
    dump.setCharAt(dump.length() - 9, '|');
    dump.append('|');
    }
    }

    public static short getUnsignedByte(ByteBuffer buffer, int index) {
    return (short) (buffer.get(index) & 0xFF);
    }
    }

常用方法

  • 分配空间

    • 可以使用allocate方法为ByteBuffer分配空间,其它buffer类也有该方法

    • allocate不能动态分配,若想使用更大的buffer只能重新分配

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      // java.nIO.HeapByteBuffer:Java堆内存
      // 读写效率较低
      // 受到GC影响
      System.out.println(ByteBuffer.allocate(16).getClass());
      // java.nIO.DirectByteBuffer:直接内存
      // 读写效率较高
      // 不受到GC影响
      // 分配效率低
      // 可能会造成内存泄漏
      System.out.println(ByteBuffer.allocateDirect(16).getClass());
  • 向buffer写入数据

    • 方法1:调用channel的read方法

      1
      int readBytes = channel.read(buf);
    • 方法2:调用buffer自己的put方法

      1
      buf.put((byte)127);
  • 从buffer读取数据

    • 方法1:调用channel的write方法

      1
      int writeBytes = channel.write(buf);
    • 方法2:调用buffer自己的get方法

      1
      byte b = buf.get();
    • get方法会让PositIOn读指针向后走,如果想重复读取数据

      • 可以调用rewind方法将positIOn重新置为0
      • 或者调用get(int i)方法获取索引i的内容,它不会移动读指针
  • mark&reset

    • mark在读取时做一个标记,即使positIOn改变,只要调用reset就能回到mark的位置
  • 字符串与ByteBuffer互转

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // TODO String => ByteBuffer
    // 方式1:put,不会自动切换到读模式
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.put("hello".getBytes());
    // 方式2:Charset,会自动切换到读模式
    ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello");
    // 方式3:wrap,会自动切换到读模式
    ByteBuffer buffer2 = ByteBuffer.wrap("hello".getBytes());

    // TODO ByteBuffer => String
    // 方式1:Charset,buffer需要转换成读模式
    buffer.flip();
    String str = StandardCharsets.UTF_8.decode(buffer).toString();

Scattering Reads

  • 分散读,将数据填充至多个buffer
  • 减少数据在buffer间的拷贝次数,提高效率
1
2
3
4
5
6
7
8
// TODO word.txt:OneTwoThree
try (FileChannel channel = new RandomAccessFile("data/word.txt", "r").getChannel()) {
ByteBuffer buffer1 = ByteBuffer.allocate(3);
ByteBuffer buffer2 = ByteBuffer.allocate(3);
ByteBuffer buffer3 = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{buffer1, buffer2, buffer3});
} catch (IOExceptIOn ignore) {
}

Gathering Writes

  • 集中写
  • 减少数据在buffer间的拷贝次数,提高效率
1
2
3
4
5
6
7
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("Hello");
ByteBuffer buffer2 = StandardCharsets.UTF_8.encode("World");
ByteBuffer buffer3 = StandardCharsets.UTF_8.encode("你好");
try (FileChannel channel = new RandomAccessFile("data/words.txt", "rw").getChannel()) {
channel.write(new ByteBuffer[]{buffer1, buffer2, buffer3});
} catch (IOExceptIOn ignore) {
}

文件编程

FileChannel

  • 工作模式:FileChannel只能工作在阻塞模式下

  • 获取

    • 不能直接打开FileChannel,必须通过FileInputStream/FileOutputStream/RandomAccessFile来获取FileChannel,它们都有getChannel方法
    • 通过FileInputStream获取的channel只能读
    • 通过FileOutputStream获取的channel只能写
    • 通过RandomAccessFile是否能读写根据构造RandomAccessFile时的读写模式决定
  • 读取

    • 从channel读取数据填充ByteBuffer,返回值表示读到了多少字节,-1表示到达了文件的末尾
    1
    int readBytes = channel.read(buffer);
  • 写入

    • while中调用channel.write是因为write方法并不能保证一次将buffer中的内容全部写入channel
    1
    2
    3
    4
    5
    6
    7
    ByteBuffer buffer = ...;
    buffer.put(...); // 存入数据
    buffer.flip(); // 切换读模式

    while(buffer.hasRemaining()) {
    channel.write(buffer);
    }
  • 关闭

    • channel必须关闭,不过调用了FileInputStream/FileOutputStream/RandomAccessFile的close方法会间接地调用 channel的close方法
  • 位置

    • 获取当前位置

      1
      long pos = channel.positIOn();
    • 设置当前位置

      1
      2
      long newPos = ...;
      channel.positIOn(newPos);
    • 设置当前位置时,如果设置为文件的末尾

      • 这时读取会返回 -1
      • 这时写入会追加内容,但要注意如果positIOn超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)
  • 大小

    • 使用size方法获取文件的大小
  • 强制写入

    • 操作系统出于性能的考虑会将数据缓存,不是立刻写入磁盘。可以调用force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

Channel传输数据

  • 2G以内

    • transferTo效率高,底层会使用操作系统的零拷贝进行优化,最多传输2G数据
    1
    2
    3
    4
    5
    6
    7
    8
    try (
    FileChannel source = new FileInputStream("data/data.txt").getChannel();
    FileChannel distance = new FileOutputStream("data/data.txt.copy").getChannel();
    ) {
    source.transferTo(0, source.size(), distance);
    } catch (IOExceptIOn e) {
    e.printStackTrace();
    }
  • 超过2G

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    try (
    FileChannel source = new FileInputStream("data/data.txt").getChannel();
    FileChannel distance = new FileOutputStream("data/data.txt.copy").getChannel();
    ) {
    long size = source.size(), left = size;
    // 剩余部分大于0
    while (left > 0) {
    left -= source.transferTo(size - left, left, distance);
    }
    } catch (IOExceptIOn e) {
    e.printStackTrace();
    }

Path

  • JDK7引入了Path和Paths类

    • Path用来表示文件路径
    • Paths是工具类,用来获取Path实例
    1
    2
    3
    4
    5
    6
    7
    8
    // 相对路径 使用 user.dir 环境变量来定位 1.txt
    Path source = Paths.get("1.txt");
    // 绝对路径 代表了 d:\1.txt
    Path source = Paths.get("d:\\1.txt");
    // 绝对路径 同样代表了 d:\1.txt
    Path source = Paths.get("d:/1.txt");
    // 代表了 d:\data\projects
    Path projects = Paths.get("d:\\data", "projects");
  • . 代表了当前路径

  • .. 代表了上一级路径

Files

  • 检查文件是否存在

    1
    2
    Path path = Paths.get("helloword/data.txt");
    System.out.println(Files.exists(path));
  • 创建一级目录

    • 如果目录已存在,会抛异常FileAlreadyExistsExceptIOn

    • 不能一次创建多级目录,否则会抛异常NoSuchFileExceptIOn

    1
    2
    Path path = Paths.get("helloword/d1");
    Files.createDirectory(path);
  • 创建多级目录用

    1
    2
    Path path = Paths.get("helloword/d1/d2");
    Files.createDirectories(path);
  • 拷贝文件

    • 如果文件已存在,会抛异常FileAlreadyExistsExceptIOn

      1
      2
      3
      Path source = Paths.get("helloword/data.txt");
      Path target = Paths.get("helloword/target.txt");
      Files.copy(source, target);
    • 如果希望用source覆盖掉target,需要用StandardCopyOptIOn来控制

      1
      Files.copy(source, target, StandardCopyOptIOn.REPLACE_EXISTING);
  • 移动文件

    • StandardCopyOptIOn.ATOMIC_MOVE保证文件移动的原子性
    1
    2
    3
    Path source = Paths.get("helloword/data.txt");
    Path target = Paths.get("helloword/data.txt");
    Files.move(source, target, StandardCopyOptIOn.ATOMIC_MOVE);
  • 删除文件

    • 如果文件不存在,会抛异常NoSuchFileExceptIOn
    1
    2
    Path target = Paths.get("helloword/target.txt");
    Files.delete(target);
  • 删除目录

    • 如果目录还有内容,会抛异常DirectoryNotEmptyExceptIOn
    1
    2
    Path target = Paths.get("helloword/d1");
    Files.delete(target);
  • 遍历目录文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    AtomicInteger dirCount = new AtomicInteger();
    AtomicInteger fileCount = new AtomicInteger();
    // 参数1:遍历起点
    // 参数2:对应操作
    Files.walkFileTree(Paths.get("/Users/mousse/IdeaProjects/JavaEECourse/Netty"), new SimpleFileVisitor<Path>() {
    // 进入文件夹前
    @Override
    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOExceptIOn {
    System.out.println(dir);
    dirCount.incrementAndGet();
    return super.preVisitDirectory(dir, attrs);
    }

    // 遍历到文件时
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOExceptIOn {
    System.out.println("\t" + file);
    fileCount.incrementAndGet();
    return super.visitFile(file, attrs);
    }

    // 遍历文件失败时
    @Override
    public FileVisitResult visitFileFailed(Path file, IOExceptIOn exc) throws IOExceptIOn {
    return super.visitFileFailed(file, exc);
    }

    // 文件夹出来后
    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOExceptIOn exc) throws IOExceptIOn {
    return super.postVisitDirectory(dir, exc);
    }
    });
    System.out.println("dir count: \t" + dirCount);
    System.out.println("file count: " + fileCount);
  • 统计Jar的数目

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    AtomicInteger jarCount = new AtomicInteger();
    Files.walkFileTree(Paths.get("/Users/mousse/IdeaProjects/JavaEECourse/Netty"), new SimpleFileVisitor<Path>() {
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOExceptIOn {
    if (file.toString().endsWith(".jar")) {
    System.out.println("\t" + file);
    jarCount.incrementAndGet();
    }
    return super.visitFile(file, attrs);
    }
    });
    System.out.println("jar count: \t" + jarCount);
  • 删除多级目录

    • 删除是危险操作,确保要递归删除的文件夹没有重要内容
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    Files.walkFileTree(Paths.get("/Users/mousse/IdeaProjects/JavaEECourse/Netty/data/delete"), new SimpleFileVisitor<Path>() {
    @Override
    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOExceptIOn {
    System.out.println("进入" + dir);
    return super.preVisitDirectory(dir, attrs);
    }

    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOExceptIOn {
    Files.delete(file);
    return super.visitFile(file, attrs);
    }

    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOExceptIOn exc) throws IOExceptIOn {
    System.out.println("退出" + dir);
    Files.delete(dir);
    return super.postVisitDirectory(dir, exc);
    }
    });
  • 拷贝多级目录

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    String source = "data/data.txt";
    String target = "data/data.txt.copy";
    Files.walk(Paths.get(source)).forEach(path -> {
    try {
    String targetName = path.toString().replace(source, target);
    if (Files.isDirectory(path)) {
    // 是目录
    Files.createDirectory(Paths.get(targetName));
    } else if (Files.isRegularFile(path)) {
    // 是文件
    Files.copy(path, Paths.get(targetName));
    }
    } catch (IOExceptIOn e) {
    throw new RuntimeExceptIOn(e);
    }
    });

网络编程

非阻塞/阻塞

  • 阻塞

    • 阻塞模式下,相关方法会导致线程暂停

      • ServerSocketChannel.accept:没有连接建立时让线程暂停
      • SocketChannel.read:没有数据可读时让线程暂停
      • 暂停期间不会占用CPU,线程相当于闲置
    • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持

    • 服务端

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      ByteBuffer buffer = ByteBuffer.allocate(16);
      // 创建服务器
      ServerSocketChannel ssc = ServerSocketChannel.open();
      // 绑定监听端口
      ssc.bind(new InetSocketAddress(8080));
      List<SocketChannel> channels = new ArrayList<>();
      // 建立与客户端的连接
      while (true) {
      log.debug("connecting");
      // SocketChannel为数据读写的通道
      // accept方法为阻塞方法,线程会停止运行
      SocketChannel sc = ssc.accept();
      log.debug("connected: {}", sc);
      channels.add(sc);
      // 接收客户端发送的数据
      for (SocketChannel channel : channels) {
      log.debug("start read: {}", channel);
      // read方法为阻塞方法,线程会停止运行
      channel.read(buffer);
      buffer.flip();
      debugRead(buffer);
      buffer.clear();
      log.debug("finish read: {}", channel);
      }
      }
    • 客户端

      1
      2
      3
      SocketChannel sc = SocketChannel.open();
      sc.connect(new InetSocketAddress("localhost", 8080));
      System.out.println("waiting...");
  • 非阻塞

    • 非阻塞模式下,相关方法不会让线程暂停

      • ServerSocketChannel.accept:没有连接建立时返回null,继续运行
      • SocketChannel.read:没有数据可读时会返回 0,线程不阻塞,可以去执行其它SocketChannel的read,或执行ServerSocketChannel.accept
      • 写数据时,线程只是等待数据写入Channel即可,无需等Channel通过网络把数据发送出去
    • 非阻塞模式下,即使没有连接建立和可读数据,线程仍然在不断运行,浪费资源

    • 数据复制过程中,线程实际还是阻塞的(AIO改进的地方)

    • 服务器端

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      ByteBuffer buffer = ByteBuffer.allocate(16);
      ServerSocketChannel ssc = ServerSocketChannel.open();
      // 将ServerSocketChannel设置为非阻塞模式
      ssc.configureBlocking(false);
      ssc.bind(new InetSocketAddress(8080));
      List<SocketChannel> channels = new ArrayList<>();
      // 建立与客户端的连接
      while (true) {
      // 设置为非阻塞模式后accept方法变为非阻塞方法
      // 如果没有建立连接则返回null,线程继续运行
      SocketChannel sc = ssc.accept();
      if (sc != null) {
      log.debug("connected: {}", sc);
      // 将SocketChannel设置为非阻塞模式
      sc.configureBlocking(false);
      channels.add(sc);
      }
      // 接收客户端发送的数据
      for (SocketChannel channel : channels) {
      // 设置为非阻塞模式后read方法变为非阻塞方法
      // 如果没有读到数据返回0
      int read = channel.read(buffer);
      if (read == 0) continue;
      log.debug("start read: {}", channel);
      buffer.flip();
      debugRead(buffer);
      buffer.clear();
      log.debug("finish read: {}", channel);
      }
      }

Selector

graph TD
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
  • 特点

    • 一个线程配合selector可以监控多个channel的事件,事件发生线程才去处理,称为多路复用

    • 使线程能够被充分利用

    • 节约线程数量

    • 减少线程上下文切换

    • 多路复用仅针对网络IO、普通文件IO没法利用

  • 创建

    1
    Selector selector = Selector.open();
  • 绑定Channel事件

    • 也称之为注册事件,绑定的事件selector才会关心
    • channel必须工作在非阻塞模式

    • FileChannel没有非阻塞模式,因此不能配合selector一起使用

    • 绑定的事件类型

      • connect:客户端连接成功时触发
      • accept:服务器端成功接受连接时触发
      • read:数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
      • write:数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
    1
    2
    channel.configureBlocking(false);
    SelectIOnKey key = channel.register(selector, 绑定事件);
  • 监听Channel事件

    • 方法的返回值代表有多少channel发生了事件

    • 方法1:阻塞直到绑定事件发生

      1
      int count = selector.select();
    • 方法2:阻塞直到绑定事件发生,或是超时(时间单位为 ms)

      1
      int count = selector.select(long timeout);
    • 方法3:不会阻塞,不管有没有事件,立刻返回,自己根据返回值检查是否有事件

      1
      int count = selector.selectNow();
    • select不阻塞时机

      • 事件发生时
        • 客户端发起连接请求会触发accept事件
        • 客户端发送数据过来,客户端正常/异常关闭时都会触发read事件,如果发送的数据大于buffer缓冲区,会触发多次读取事件
        • channel可写会触发write事件
        • 在linux下nIO bug发生时
      • 调用selector.wakeup()
      • 调用selector.close()
      • selector所在线程interrupt
  • 处理accept事件

    • 事件发生后,要么处理要么取消(cancel),什么都不做的话下次该事件仍会触发,这是因为nIO底层使用的是水平触发
  • 处理read事件

    • 客户端断开连接时会发送read事件,需要将key取消注册(从selectedKeys集合中删除)
    • 正常断开连接时read返回值为-1
    • 异常断开连接时抛出异常
  • 迭代器移除元素

    • select在事件发生后会将相关的key放入selectedKeys集合,但不会在处理完后从 selectedKeys集合中移除,需要手动删除
      • 第一次触发了ssckey上的accept事件,没有移除ssckey
      • 第二次触发了sckey上的read事件,但这时selectedKeys中还有上次的ssckey,在处理时因为没有连接,会导致空指针异常
  • cancel

    • 用来取消注册在selector上的channel,并从keys集合中删除key,后续不再监听事件
  • 处理消息边界

    • TLV格式(Type类型、Length长度、Value数据),类型和长度已知的情况下,可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大会影响serve吞吐量
      • Http1.1是TLV格式
      • Http2.0是LTV格式
sequenceDiagram 
participant c1 as 客户端1
participant s as 服务器
participant b1 as ByteBuffer1
participant b2 as ByteBuffer2
c1 ->> s: 发送 0123456789abcdefMore\n
s ->> b1: 第一次 read 存入 01234567890abcdef
s ->> b2: 扩容
b1 ->> b2: 拷贝 01234567890abcdef
s ->> b2: 第二次 read 存入 More\n
b2 ->> b2: 01234567890abcdefMore\n
  • ByteBuffer大小分配

    • 每个channel都需要记录可能被切分的消息,ByteBuffer不能被多个channel共用,需要为每个channel维护一个独立的ByteBuffer

    • ByteBuffer不能太大,需要设计大小可变的ByteBuffer

      • 思路1
      • 思路2
        • 用多个数组组成buffer,一个数组不够就把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,避免拷贝引起的性能损耗
  • 处理write事件

    • 非阻塞模式下,无法保证把buffer中所有数据都写入channel,需要追踪write方法的返回值(代表实际写入字节数)

    • 用selector监听所有channel的可写事件,每个channel都需要一个key来跟踪buffer,但这样又会导致占用内存过多,就有两阶段策略

      • 当消息处理器第一次写入消息时,才将channel注册到selector上
      • selector检查channel上的可写事件,若写完所有数据就取消注册channel
        • 不取消会每次可写均会触发write事件
    • 只要向channel送数据时socket缓冲可写,这个事件会频繁触发,应当只在socket缓冲区写不下时再关注可写事件,数据写完之后再取消关注

  • 利用多线程优化

    • 分两组选择器
      • 单线程配一个选择器,专门处理accept事件
      • 创建CPU核心数的线程,每个线程配一个选择器,轮流处理read事件
    • 获取CPU个数
      • Runtime.getRuntime().availableProcessors():若工作在docker容器下会拿到物理CPU个数,而不是容器申请时的个数
      • 这个问题直到JDK 10才修复,使用Jvm参数UseContainerSupport配置, 默认开启

UDP

  • UDP是无连接的,client发送数据不会管server是否开启

  • server的receive方法会将接收到的数据存入byte buffer,但如果数据报文超过buffer 大小,多出来的数据会被默默抛弃

  • 首先启动服务器端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class UdpServer {
    public static void main(String[] args) {
    try (DatagramChannel channel = DatagramChannel.open()) {
    channel.socket().bind(new InetSocketAddress(9999));
    System.out.println("waiting...");
    ByteBuffer buffer = ByteBuffer.allocate(32);
    channel.receive(buffer);
    buffer.flip();
    debug(buffer);
    } catch (IOExceptIOn e) {
    e.printStackTrace();
    }
    }
    }
  • 运行客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class UdpClient {
    public static void main(String[] args) {
    try (DatagramChannel channel = DatagramChannel.open()) {
    ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
    InetSocketAddress address = new InetSocketAddress("localhost", 9999);
    channel.send(buffer, address);
    } catch (ExceptIOn e) {
    e.printStackTrace();
    }
    }
    }

NIO与BIO

Stream与Channel

  • Stream

    • 不会自动缓冲数据
    • 只支持阻塞API
    • 全双工(读写可同时进行)
  • Channel

    • 利用系统提供的发送/接收缓冲区(更为底层)
    • 支持阻塞/非阻塞API
    • 网络channel可配合selector实现多路复用
    • 全双工(读写可同时进行)

IO模型

  • 同步阻塞、同步非阻塞、同步多路复用、异步非阻塞

    • 同步:线程自己去获取结果(一个线程)
    • 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)
  • 当调用一次channel.readstream.read后,会切换至操作系统内核态来完成真正数据读取,读取分为两个阶段

    • 等待数据阶段

    • 复制数据阶段

  • 阻塞IO:同步,等待阶段和复制阶段都阻塞

sequenceDiagram 
participant u as 用户空间
participant k1 as 内核空间channel1
participant k2 as 内核空间channel2
u ->> k1: read
k1 ->> k1: 等待数据
k1 ->> k1: 复制数据
k1 ->> u: 返回
u ->> k2: accept
k2 ->> k2: 等待连接
k2 ->> k2: 建立连接
k2 ->> u: 返回
u ->> k1: read
k1 ->> k1: 复制数据
k1 ->> u: 返回
  • 非阻塞IO:同步,等待阶段非阻塞,复制阶段阻塞
sequenceDiagram 
participant u as 用户空间
participant k as 内核空间
u ->> k: read
k ->> u: 返回
u ->> k: read
k ->> k: 复制数据
k ->> u: 返回
  • 多路复用:同步
sequenceDiagram 
participant u as 用户空间
participant k1 as 内核空间channel1
participant k2 as 内核空间channel2
u ->> k1: select
u ->> k2: 返回
k1 ->> k1: 等待事件
k2 ->> k2: 返回
k1 ->> u: c1.read c2.accpet
k2 ->> u: 返回
u ->> k2: accept
k2 ->> k2: 建立连接
k2 ->> u: 返回
u ->> k1: read
k1 ->> k1: 复制数据
k1 ->> u: 返回
  • 信号驱动
  • 异步IO
sequenceDiagram 
participant u as 用户空间
participant k as 内核空间
u ->> k: thread1
k ->> u: 回调方法(参数)
k ->> k: 等待数据
k ->> k: 复制数据
k ->> u: thread2回调方法(真正结果)

零拷贝

  • 概念

    • 零拷贝并不是真正无拷贝,而是在不会拷贝重复数据到JVM内存中

    • 优点

      • 更少的用户态与内核态的切换

      • 不利用CPU计算,减少CPU缓存伪共享

      • 适合小文件传输

  • 传统IO问题

    • 传统的IO将一个文件通过socket写出

      1
      2
      3
      4
      5
      6
      7
      8
      9
      File f = new File("helloword/data.txt");
      RandomAccessFile file = new RandomAccessFile(file, "r");
      byte[] buf = new byte[(int)f.length()];
      // 转为内核态
      file.read(buf);
      // 转为用户态
      Socket socket = ...;
      // 转为内核态
      socket.getOutputStream().write(buf);
    • 内部工作流程

    graph LR
    磁盘 -- 拷贝1 --> 内核缓冲区 -- 拷贝2 --> 用户缓冲区 -- 拷贝3 --> Socket缓冲区 -- 拷贝4 --> 网卡
    
    1. Java本身不具备IO读写能力,read方法调用后,要从Java程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区

      • 期间用户线程阻塞
      • 操作系统使用DMA(Direct Memory Access)实现文件读,其间也不会使用CPU(DMA可以理解为硬件单元,用来解放CPU完成文件IO)
    2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即byte[] buf),期间CPU会参与拷贝,无法利用DMA

    3. 调用write方法,将数据从用户缓冲区byte[] buf)写入socket缓冲区,CPU参与拷贝

    4. 向网卡写数据,Java不具备该能力,得从用户态切换至内核态,调用操作系统的写能力,使用DMA将socket 缓冲区的数据写入网卡,不会使用CPU

    5. 中间环节较多,Java的IO实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

      • 用户态与内核态的切换发生了3次,这个操作比较重量级

      • 数据拷贝了共4次

  • NIO优化

    • 通过DirectByteBuf

      • ByteBuffer.allocate(10) :HeapByteBuffer,使用JVM内存

      • ByteBuffer.allocateDirect(10): DirectByteBuffer,使用OS内存

    • 大部分步骤与优化前相同,唯有一点:Java可使用DirectByteBuf将堆外内存映射到 JVM内存中来直接访问使用

      graph LR
      磁盘 -- 拷贝1 --> 内核缓冲区 -..-> 用户缓冲区 -- 拷贝2 --> Socket缓冲区 -- 拷贝3 --> 网卡
      
      • 这块内存不受JVM垃圾回收的影响,内存地址固定,有助于IO读写

      • Java中的DirectByteBuf对象仅维护了此内存的虚引用,内存回收分成两步

    1. DirectByteBuf对象被垃圾回收,将虚引用加入引用队列
      1. 通过专门线程访问引用队列,根据虚引用释放堆外内存
    • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
    • 进一步优化(底层采用了linux 2.1后提供的sendFile方法)

      graph LR
      磁盘 -- 拷贝1 --> 内核缓冲区 -- 拷贝2 --> Socket缓冲区 -- 拷贝3 --> 网卡
      
      • Java中对应着两个 channel调用transferTo/transferFrom方法拷贝数据

        1. Java调用transferTo方法后,从Java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不使用CPU
        2. 数据从内核缓冲区传输到socket缓冲区,CPU参与拷贝
        3. 使用DMA将socket 缓冲区的数据写入网卡,不使用CPU
      • 只发生了一次用户态与内核态的切换

      • 数据拷贝了3次

    • 进一步优化(linux 2.4)

      graph LR
      磁盘 -- 拷贝1 --> 内核缓冲区 -- 拷贝2 --> 网卡
      内核缓冲区 -..-> Socket缓冲区 -..- 网卡
      
      • Java调用transferTo方法后,从Java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不使用CPU
      • 只将一些offset和length信息拷入socket缓冲区,几乎无消耗
      • 使用DMA将内核缓冲区的数据写入网卡,不使用CPU
    • 整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了2次

AIO

  • 用来解决数据复制阶段的阻塞问题

    • 同步:在进行读写操作时,线程需要等待结果

    • 异步:在进行读写操作时,线程不必等待结果,将来由操作系统来通过回调方式由另外的线程来获得结果

    • 异步模型需要底层操作系统(Kernel)提供支持

      • Windows系统通过IOCP实现了真正的异步IO
      • Linux系统异步IO在2.6版本引入,但其底层实现还是用多路复用模拟了异步IO,性能没有优势
  • 文件AIO

    • AsynchronousFileChannel

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data/data.txt"), StandardOpenOptIOn.READ)) {
      // 参数: ByteBuffer, 读取文件的启始位置, 附件, 回调对象
      ByteBuffer buffer = ByteBuffer.allocate(16);
      log.debug("read begin");
      channel.read(buffer, 0, buffer, new CompletIOnHandler<Integer, ByteBuffer>() {
      // 守护线程:其他线程运行结束后守护线程也会结束
      /**
      * 读取成功
      *
      * @param result 读取到的实际字节数
      * @param attachment 附件中的buffer对象
      */
      @Override
      public void completed(Integer result, ByteBuffer attachment) {
      log.debug("read completed");
      attachment.flip();
      debugAll(attachment);
      }

      // 读取失败
      @Override
      public void failed(Throwable exc, ByteBuffer attachment) {

      }
      });
      System.in.read();
      log.debug("read finish");
      } catch (IOExceptIOn e) {
      e.printStackTrace();
      }
      • 响应文件读取成功的是另一个线程Thread-5
      • 主线程并没有IO操作阻塞
  • 守护线程

    • 默认文件AIO使用的线程都是守护线程,最后要执行System.in.read()以避免守护线程意外结束
  • 网络 AIO

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    public class AIOServer {
    public static void main(String[] args) throws IOExceptIOn {
    AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
    ssc.bind(new InetSocketAddress(8080));
    ssc.accept(null, new AcceptHandler(ssc));
    System.in.read();
    }

    private static void closeChannel(AsynchronousSocketChannel sc) {
    try {
    System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
    sc.close();
    } catch (IOExceptIOn e) {
    e.printStackTrace();
    }
    }

    private static class ReadHandler implements CompletIOnHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel sc;

    public ReadHandler(AsynchronousSocketChannel sc) {
    this.sc = sc;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
    try {
    if (result == -1) {
    closeChannel(sc);
    return;
    }
    System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
    attachment.flip();
    System.out.println(Charset.defaultCharset().decode(attachment));
    attachment.clear();
    // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
    sc.read(attachment, attachment, this);
    } catch (IOExceptIOn e) {
    e.printStackTrace();
    }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
    closeChannel(sc);
    exc.printStackTrace();
    }
    }

    private static class WriteHandler implements CompletIOnHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel sc;

    private WriteHandler(AsynchronousSocketChannel sc) {
    this.sc = sc;
    }

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
    // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
    if (attachment.hasRemaining()) {
    sc.write(attachment);
    }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
    exc.printStackTrace();
    closeChannel(sc);
    }
    }

    private static class AcceptHandler implements CompletIOnHandler<AsynchronousSocketChannel, Object> {
    private final AsynchronousServerSocketChannel ssc;

    public AcceptHandler(AsynchronousServerSocketChannel ssc) {
    this.ssc = ssc;
    }

    @Override
    public void completed(AsynchronousSocketChannel sc, Object attachment) {
    try {
    System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
    } catch (IOExceptIOn e) {
    e.printStackTrace();
    }
    ByteBuffer buffer = ByteBuffer.allocate(16);
    // 读事件由 ReadHandler 处理
    sc.read(buffer, buffer, new ReadHandler(sc));
    // 写事件由 WriteHandler 处理
    sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
    // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
    ssc.accept(null, this);
    }

    @Override
    public void failed(Throwable exc, Object attachment) {
    exc.printStackTrace();
    }
    }
    }

Netty

概述

概念

  • 异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

    • Netty的异步是指调用时的异步
    • Netty的IO模型仍基于多路复用
  • 优势

    • 解决TCP传输问题,如粘包、半包
    • 解决epoll空轮询导致CPU 100%的bug
    • 对API进行增强,更易用,如FastThreadLocal=>ThreadLocal,ByteBuf=>ByteBuffer

案例

  • 目标

    • 开发一个简单的服务器端和客户端

      • 客户端向服务器端发送hello, world

      • 服务器仅接收,不返回

  • 依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>IO.Netty</groupId>
    <artifactId>Netty-all</artifactId>
    <versIOn>4.1.79.Final</versIOn>
    </dependency>
  • 服务端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    // 服务器端启动器,负责组装Netty组件,启动服务器
    new ServerBootstrap()
    // 创建NIOEventLoopGroup,可以简单理解为`线程池 + Selector`
    .group(new NIOEventLoopGroup())
    // 选择服务Socket实现类,NIOServerSocketChannel表示基于NIO的服务器端实现,其它实现还有OIO(BIO)等
    .channel(NIOServerSocketChannel.class)
    // 添加的处理器是给SocketChannel用的,而不是给ServerSocketChannel
    // Leader负责连接,Worker(Child)负责读写,决定了Worker(Child)能执行哪些操作(handler)
    .childHandler(
    // Channel代表和客户端进行数据读写的通道,Initializer初始化器,负责添加别的handler
    // ChannelInitializer处理器(仅执行一次)
    // 待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
    new ChannelInitializer<NIOSocketChannel>() {
    // 添加具体的handler
    @Override
    protected void initChannel(NIOSocketChannel nIOSocketChannel) throws ExceptIOn {
    // 将ByteBuf转为String
    nIOSocketChannel.pipeline().addLast(new StringDecoder());
    nIOSocketChannel.pipeline().addLast(
    // 自定义handler
    new ChannelInboundHandlerAdapter() {
    // 读事件
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws ExceptIOn {
    // 打印字符串
    log.info("message: {}", msg);
    }
    }
    );
    }
    }
    )
    // 绑定监听端口
    .bind(8080);
  • 客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // 创建启动类
    new Bootstrap()
    // 添加EventLoop
    .group(new NIOEventLoopGroup())
    // 选择客户端channel实现
    .channel(NIOSocketChannel.class)
    // 添加处理器
    .handler(new ChannelInitializer<NIOSocketChannel>() {
    // 在连接建立后被调用
    @Override
    protected void initChannel(NIOSocketChannel nIOSocketChannel) throws ExceptIOn {
    nIOSocketChannel.pipeline().addLast(new StringEncoder());
    }
    })
    // 连接服务器
    .connect(new InetSocketAddress("localhost", 8080))
    // 阻塞方法,直到连接建立
    // Netty中很多方法都是异步的,如connect,这时需要使用sync方法等待connect建立连接完毕
    .sync()
    // 获取channel对象,它即为通道抽象,可以进行数据读写操作
    .channel()
    // 写入消息并清空缓冲区
    .writeAndFlush("Hello World!");
  • 总结

    • channel:数据通道
    • msg:流动的数据
      • 最开始输入是ByteBuf,经过pipeline的加工会变成其它类型对象,最后输出又变成ByteBuf
    • handler:数据的处理工序
      • 工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成…)传播给每个handler, handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)
      • handler分Inbound和Outbound
    • eventLoop:处理数据的工人
      • 可以管理多个channel的IO操作,一旦工人负责了某个channel就要负责到底(绑定)
      • 既可以执行IO操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务、定时任务
      • 工人按照pipeline 顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人

组件

EventLoop

概念

  • EventLoop

    • 事件循环对象,本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断的IO事件

    • 继承关系(接口多继承)

      • 继承1:j.u.c.ScheduledExecutorService,包含线程池中所有的方法
      • 继承2:Netty自己的OrderedEventExecutor
        • 提供boolean inEventLoop(Thread thread)方法判断一个线程是否属于此 EventLoop
        • 提供了parent方法查看自己属于哪个EventLoopGroup
  • EventLoopGrop

    • 事件循环组,是一组EventLoop,Channel一般会调用EventLoopGroup的 register方法绑定其中一个EventLoop,后续该Channel上的IO事件都由此 EventLoop处理(保证IO事件处理时的线程安全)
    • 继承Netty自己的EventExecutorGroup
      • 实现Iterable接口提供遍历EventLoop的能力
      • next方法获取集合中下一个EventLoop
  • 优雅关闭

    • 优雅关闭shutdownGracefully方法,会先切换EventLoopGroup到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

处理IO事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// TODO 创建一个独立的EventLoopGroup
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
// TODO 划分leader和worker
// leader(参数1)只负责ServerSocketChannel上处理accept事件
// worker(参数2)只负责SocketChannel上的读写操作
.group(new NIOEventLoopGroup(), new NIOEventLoopGroup(2))
.channel(NIOServerSocketChannel.class)
.childHandler(new ChannelInitializer<NIOSocketChannel>() {
@Override
protected void initChannel(NIOSocketChannel nIOSocketChannel) throws ExceptIOn {
nIOSocketChannel.pipeline().addLast("handle1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws ExceptIOn {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(StandardCharsets.UTF_8));
// 将消息传递给下一个handler
ctx.fireChannelRead(msg);
}
});
// 指定额外的EventLoopGroup
nIOSocketChannel.pipeline().addLast(group, "handle2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws ExceptIOn {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);

handler切换线程源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
// 返回下一个handler的eventLoop
EventExecutor executor = next.executor();
// 判断当前handler中的线程和下一个executor(eventLoop)的线程是否为同一线程
if (executor.inEventLoop()) {
// 是,直接调用
next.invokeChannelRead(m);
} else {
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
  • 关键代码 IO.Netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
  • 若两个handler绑定的是同一线程则直接调用,否则把要调用的代码封装为一个任务对象,由下一个handler的线程调用

处理普通任务

1
2
3
4
5
6
7
8
9
group.next().submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedExceptIOn e) {
throw new RuntimeExceptIOn(e);
}
log.info("submit");
});
log.info("main");

处理定时任务

1
group.next().scheduleAtFixedRate(() -> log.info("schedule"), 0, 1, TimeUnit.SECONDS);

Channel

主要作用

  • close():关闭channel

  • closeFuture():处理channel的关闭

    • sync:同步等待channel关闭
    • addListener:异步等待channel关闭
  • pipeline() :添加处理器

  • write() :将数据写入

  • writeAndFlush() :将数据写入并刷出

ChannelFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 带有future,promise的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NIOEventLoopGroup())
.channel(NIOSocketChannel.class)
.handler(new ChannelInitializer<NIOSocketChannel>() {
@Override
protected void initChannel(NIOSocketChannel nIOSocketChannel) throws ExceptIOn {
nIOSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 连接到服务器,返回ChannelFuture对象,它可以用channel()方法获取Channel对象
// 异步非阻塞,main线程发起调用,由NIOEventLoopGroup中的线程执行
.connect(new InetSocketAddress("localhost", 8080));
// TODO 方法1 使用sync方法同步处理线程,后续由发起sync的线程等待和处理
// connect方法是异步的,不等连接建立方法执行就返回了
// 因此channelFuture对象中不能"立刻"获得到正确的Channel对象
// 阻塞当前线程直到nIO线程建立完毕
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush("Hello World");
// TODO 方法2 使用addListener(回调对象)方法异步处理结果,后续交给其他线程处理
channelFuture.addListener(new ChannelFutureListener() {
// 在nIO线程建立好后调用
@Override
public void operatIOnComplete(ChannelFuture channelFuture) throws ExceptIOn {
channelFuture.channel().writeAndFlush("Hello world");
}
});

CloseFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
NIOEventLoopGroup group = new NIOEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NIOSocketChannel.class)
.handler(new ChannelInitializer<NIOSocketChannel>() {
@Override
protected void initChannel(NIOSocketChannel nIOSocketChannel) throws ExceptIOn {
nIOSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nIOSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("connected: {}", channel);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("exit".equals(line)) {
// 异步操作
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// TODO 获取ClosedFuture对象
ChannelFuture closeFuture = channel.closeFuture();
// 方法1:同步处理关闭
closeFuture.sync();
log.debug("disconnected");
group.shutdownGracefully();
// 方法2:异步处理关闭
closeFuture.addListener((ChannelFutureListener) channelFuture1 -> {
log.debug("disconnected");
// 优雅关闭
group.shutdownGracefully();
});

异步总结

  • 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势
  • 异步并没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键
  • 用响应时间换取吞吐量

Future&Promise

概述

  • 异步处理时常用的两个接口
  • JDK Future:只能同步等待任务结束(或成功或失败)才能得到结果
    • Netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
      • Netty Promise:不仅有Netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 JDK Future Netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

案例

  • JDKFuture

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 线程池
    ExecutorService service = Executors.newFixedThreadPool(2);
    // 提交任务
    Future<Integer> future = service.submit(() -> {
    try {
    log.debug("执行计算");
    Thread.sleep(1000);
    return 50;
    } catch (InterruptedException e) {
    throw new RuntimeException(e);
    }
    });
    log.debug("等待结果");
    // 主线程通过future获取结果
    log.debug("结果: {}", future.get());
  • NettyFuture

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    NioEventLoopGroup group = new NioEventLoopGroup(2);
    EventLoop eventLoop = group.next();
    Future<Integer> future = eventLoop.submit(() -> {
    // [nioEventLoopGroup-2-1] c.m.n.e.f.NettyFutureDemo - 执行计算
    log.debug("执行计算");
    Thread.sleep(1000);
    return 50;
    });
    log.debug("等待结果");
    // 方式1,[main] c.m.n.e.f.NettyFutureDemo - 结果: 50
    // 主线程通过future获取结果
    log.debug("结果: {}", future.get());
    // 方式2,[nioEventLoopGroup-2-1] c.m.n.e.f.NettyFutureDemo - 结果: 50
    future.addListener(future1 -> log.debug("结果: {}", future1.getNow()));
  • NettyPromise

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // 准备EventLoop对象
    EventLoop eventLoop = new NioEventLoopGroup().next();
    // 可以主动创建promise
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
    new Thread(() -> {
    log.debug("执行计算");
    try {
    int i = 1 / 0;
    Thread.sleep(1000);
    } catch (Exception e) {
    promise.setFailure(e);
    }
    // 向promise填充结果
    promise.setSuccess(50);
    }).start();
    // 接收结果
    log.debug("等待结果");
    promise.addListener(future1 -> log.debug("结果: {}", future1.getNow()));

Handler&Pipeline

概念

  • ChannelHandler:用来处理Channel上的各种事件,分为入站和出站。所有 ChannelHandler被连成一串就是Pipeline

    • 入站处理器: ChannelInboundHandlerAdapter的子类,用来读取客户端数据,写回结果
    • 出站处理器:ChannelOutboundHandlerAdapter的子类,对写回结果进行加
  • 每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler是流水线上的各道工序,ByteBuf是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品

Pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 通过channel拿到pipeline
ChannelPipeline pipeline = nioSocketChannel.pipeline();
// 添加处理器(双向链表) head <-> h1 <-> h2 <-> h3 <-> h4 <-> tail
// 处理顺序 h1, h2, h4, h3
// 入站从head开始,出站从tail开始
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("h1.channelRead[inbound]: " + msg);
ByteBuf buf = (ByteBuf) msg;
String str = buf.toString(StandardCharsets.UTF_8);
// 将数据传递给下一个handler,若不调用,调用链会断开
super.channelRead(ctx, str);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("h2.channelRead[inbound]: " + msg);
User user = new User(msg.toString());
String str = user.getClass() + ":[username = " + user.getName() + "]";
// 从tail向前寻找出站处理器
nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes(str.getBytes(StandardCharsets.UTF_8)));
// 从当前处理器向前寻找出站处理器
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(str.getBytes(StandardCharsets.UTF_8)));
}
});
// 出站处理器只有写出数据才会被触发
pipeline.addLast("h3", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h3.channelRead[outbound]");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h4.channelRead[outbound]");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
1
2
3
4
5
@Data
@AllArgsConstructor
static class User {
private String name;
}
  • InboundHandler从head顺序执行,Outbound从tail执行。ChannelPipeline的实现是一 ChannelHandlerContext(包装了ChannelHandler)组成的双向链表

  • 入站处理器中,ctx.fireChannelRead(msg) 调用下一个入站处理器
  • ctx.channel().write(msg):从尾部开始查找出站处理器
  • ctx.write(msg):从当前节点找上一个出站处理器
  • 服务端pipeline触发的原始流程,图中数字代表了处理步骤的先后次序

EmbeddedChannel

  • Netty提供的用来测试的channel,用来绑定多个handler,使用时不需要启动服务端和客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 模拟入站操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
// 模拟出站操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes(StandardCharsets.UTF_8)));

ByteBuf

概念

  • 对字节数据的封装

  • 组成

    • 最开始读写指针都在0位置

  • 优势

    • 池化:可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能
    • 读写指针分离:不需要像ByteBuffer一样切换读写模式
    • 可以自动扩容
    • 支持链式调用:使用更流畅
    • 很多地方体现零拷贝:例如slice、duplicate、CompositeByteBuf

操作

  • 创建

    • 堆内存

      • 创建池化基于堆的ByteBuf

        1
        ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();
    • 直接内存(默认)

      • 创建池化基于直接内存的ByteBuf

        1
        2
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();
      • 创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用

      • 对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但要注意及时主动释放

    • log工具

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      private static void log(ByteBuf buffer) {
      int length = buffer.readableBytes();
      int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
      StringBuilder buf = new StringBuilder(rows * 80 * 2)
      .append("read index:").append(buffer.readerIndex())
      .append(" write index:").append(buffer.writerIndex())
      .append(" capacity:").append(buffer.capacity())
      .append(NEWLINE);
      appendPrettyHexDump(buf, buffer);
      System.out.println(buf.toString());
      }
    • 池化vs非池化

      • 池化的最大意义在于可以重用ByteBuf

        • 没有池化,每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存也会增加GC压力

        • 有了池化,可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率

        • 高并发时,池化功能更节约内存,减少内存溢出的可能

      • 池化功能是否开启,可以通过下面的系统环境变量来设置

        1
        -DIO.Netty.allocator.type={unpooled|pooled}
        • 4.1后,非Android平台默认启用池化实现,Android平台启用非池化实现
  • 写入

    • 省略一些不重要的方法

    • 这些方法的未指明返回值的返回的都是ByteBuf,可以链式调用

    • 网络传输,默认习惯是Big Endian

    • set开头的一系列方法也可以写入数据,但不会改变写指针位置

方法签名 含义 备注
writeBoolean(boolean value) 写入boolean值 用一字节01/00代表true/false
writeByte(int value) 写入byte值
writeShort(int value) 写入short值
writeInt(int value) 写入int值 Big Endian,从低位向高位写入,即0x250,写入后00 00 02 50
writeIntLE(int value) 写入int值 Little Endian,从高位向低位写入,即0x250,写入后50 02 00 00
writeLong(long value) 写入long值
writeChar(int value) 写入char值
writeFloat(float value) 写入float值
writeDouble(double value) 写入double值
writeBytes(ByteBuf src) 写入Netty的ByteBuf
writeBytes(byte[] src) 写入byte[]
writeBytes(ByteBuffer src) 写入NIO的ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串
  • 扩容

    • 若写入后数据大小未超过512,则选择下一个16的整数倍

      • 例如写入后大小为12 ,则扩容后capacity是16
    • 如果写入后数据大小超过 512,则选择下一个2^n^

      • 例如写入后大小为513,则扩容后capacity是2^10^=1024(2^9^=512已经不够了)
    • 扩容不能超过max capacity否则会报错

  • 读取

    • 如果需要重复读取,可以在read前先做个标记mark

      1
      2
      3
      buffer.markReaderIndex();
      System.out.println(buffer.readInt());
      log(buffer);
    • 时要重复读取的话,重置到标记位置reset

      1
      2
      buffer.resetReaderIndex();
      log(buffer);
    • 还有种办法是采用get开头的一系列方法,这些方法不会改变read index

内存回收

  • 由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收

    • UnpooledHeapByteBuf使用JVM内存,只需等GC回收内存即可

    • UnpooledDirectByteBuf使用直接内存,需要特殊的方法来回收内存

    • PooledByteBuf和它的子类使用池化机制,需要更复杂的规则来回收内存

    • 回收内存的源码实现,请关注下面方法的不同实现protected abstract void deallocate()

  • Netty采用引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

    • 每个ByteBuf对象的初始计数为1

    • 调用release方法计数减1,如果计数为0,ByteBuf内存被回收

    • 调用retain方法计数加1,表示调用者没用完之前,其它handler即使调用了release 也不会造成回收

    • 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用

  • 基本规则

    • 谁最后使用,谁负责release
    • 起点,对NIO实现来讲,在 IO.Netty.channel.nIO.AbstractNIOByteChannel.NIOByteUnsafe#read方法中首次创建ByteBuf放入pipeline(line 163 pipeline.fireChannelRead(byteBuf)
  • 入站ByteBuf处理原则

    • 对原始ByteBuf不做处理,调用ctx.fireChannelRead(msg)向后传递,这时无须release
    • 将原始ByteBuf转换为其它类型的Java对象,这时ByteBuf就没用了,必须 release
    • 如果不调用ctx.fireChannelRead(msg)向后传递,那么也必须release
    • 如果ByteBuf没有成功传递到下一个ChannelHandler,必须release
    • 假设消息一直向后传,那么TailContext会负责释放未处理消息(原始的 ByteBuf)
  • 出站ByteBuf处理原则

    • 出站消息最终都会转为ByteBuf输出一直向前传,由HeadContextflushrelease
  • 异常处理原则

    • 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用release直到返回true
  • TailContext释放未处理消息逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // IO.Netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
    protected void onUnhandledInboundMessage(Object msg) {
    try {
    logger.debug(
    "Discarded inbound message {} that reached at the tail of the pipeline. " +
    "Please check your pipeline configuratIOn.", msg);
    } finally {
    ReferenceCountUtil.release(msg);
    }
    }
    • 具体代码

      1
      2
      3
      4
      5
      6
      7
      // IO.Netty.util.ReferenceCountUtil#release(java.lang.Object)
      public static boolean release(Object msg) {
      if (msg instanceof ReferenceCounted) {
      return ((ReferenceCounted) msg).release();
      }
      return false;
      }

零拷贝

  • slice

    • 对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的readwrite指针

    • 切片后的max capacity被固定为这个区间的大小,不能追加write

    • 释放原有ByteBuf内存可能会出异常(使用retaion解决)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
      buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
      // 在切片过程中没有发生数据复制
      // 无参:从原始ByteBuf的read index到write index间的内容进行切片
      // 有参:起始位置,参数长度
      ByteBuf buf1 = buf.slice(0, 5);
      buf1.retain();
      ByteBuf buf2 = buf.slice(5, 5);
      buf2.retain();
      // 释放原始ByteBuf内存
      buf.release();
  • duplicate

    • 与原始ByteBuf使用同一块底层内存,只是读写指针是独立的并没有max capacity的限制

  • copy

    • 将底层内存数据进行深拷贝,无论读写都与原始ByteBuf无关
    • 不是零拷贝
  • CompositeByteBuf

    • 将多个ByteBuf合并为一个逻辑上的ByteBuf,避免拷贝

    • 是一个组合的ByteBuf,内部维护了一个Component数组,每个Component管理一个ByteBuf,记录这个ByteBuf相对于整体偏移量等信息,代表着整体中某一段的数据

      • 优点:对外是一个虚拟视图,组合时不会产生内存复制

      • 缺点:复杂,多次操作会带来性能损耗

    • 需要注意引用计数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
    buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
    buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
    // 会产生多次数据复制,影响性能
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
    buf.writeBytes(buf1).writeBytes(buf2);
    // 不会产生数据复制
    CompositeByteBuf compositeBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
    // 参数1:是否自动调整指针的位置
    compositeBuf.addComponents(true, buf1, buf2);
  • Unpooled

    • 一个工具类,提供了非池化的ByteBuf创建、组合、复制等操作
    • 零拷贝相关的wrappedBuffer方法,可以用来包装ByteBuf
    1
    2
    3
    4
    5
    6
    7
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
    buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
    buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
    // 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
    ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
    System.out.println(ByteBufUtil.prettyHexDump(buf3));

粘包半包

分析

  • 本质是因TCP是流式协议,消息无边界

  • 粘包

    • 现象,发送abc def,接收 abcdef

    • 原因

      • 应用层:接收方ByteBuf设置太大(Netty默认1024)
      • 滑动窗口(TCP层面):设发送方256bytes表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这256 bytes字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
      • Nagle算法(TCP层面):会造成粘包
  • 半包

    • 现象,发送abcdef,接收abc def

    • 原因

      • 应用层:接收方ByteBuf小于实际发送数据量
      • 滑动窗口(TCP层面):假设接收方的窗口只剩128bytes,发送方的报文大小是256bytes,放不下只能先发送前128bytes,等ack后才能发送剩余部分,造成半包
      • MSS限制(数据链路层):当发送的数据超过MSS限制后,会将数据切分发送,造成半包

滑动窗口

  • TCP以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但这么做包的往返时间越长性能就越差

  • 为解决此问题,引入窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值

  • 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用

    • 图中深色的部分即要发送的数据,高亮的部分即窗口
    • 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
    • 如果1001~2000这个段的数据ack回来了,窗口就可以向前滑动
    • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

  • Nagle算法
    • 即使发送一个字节也需要加入TCP头和ip头,也就是总字节数会使用41bytes,非常不经济。因此为了提高网络利用率,TCP希望尽可能发送足够大的数据,这就是Nagle算法产生的缘由
      • 该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送
      • 如果SO_SNDBUF的数据达到MSS,则需要发送
      • 如果SO_SNDBUF中含有FIN(表示需要连接关闭)这时将剩余数据发送,再关闭
      • 如果TCP_NODELAY = true,则需要发送
      • 已发送的数据都收到ack时,则需要发送
      • 上述条件不满足,但发生超时(一般为200ms)则需要发送
      • 除上述情况,延迟发送
  • MSS限制
    • 链路层对一次能够发送的最大数据有限制,这个限制称之为 MTU(maximum transmission unit),不同的链路设备的 MTU 值也有所不同,例如

    • 以太网的MTU是1500

    • FDDI(光纤分布式数据接口)的MTU是4352

    • 本地回环地址的MTU是65535 - 本地测试不走网卡

    • MSS是最大段长度(maximum segment size),它是MTU刨去TCP头和ip头后剩余能够作为数据传输的字节数

    • ipv4 TCP头占用20bytes,ip头占用20bytes,因此以太网MSS的值为1500 - 40 = 1460

    • TCP在传递大量数据时,会按照MSS大小将数据进行分割发送

    • MSS的值在三次握手时通知对方自己MSS的值,然后在两者之间选择一个小值作为MSS

解决

短链接

  • 发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界
  • 能解决粘包
  • 缺点
    • 效率低
    • 不能解决半包

固定长度

  • 每条消息采用固定长度
  • 缺点:数据包的大小不好把握
    • 长度定的太大,浪费
    • 长度定的太小,对某些数据包又显得不够
1
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));

固定分隔符

  • 每条消息采用分隔符,如\n
  • 缺点:处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误
1
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

预设长度

  • 每条消息分为head和body,head中包含body的长度

  • 在发送消息前,先约定用定长字节表示接下来数据的长度

    1
    2
    // 最大长度,长度偏移,长度占用字节,长度调整,剥离字节数
    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
// 4字节长度,2字节版本号,收到的内容只保留正文
new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6),
new LoggingHandler(LogLevel.DEBUG)
);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
send(buf, (short) 1,"Hello World!");
send(buf, (short) 2, "Hello Java!");
channel.writeInbound(buf);
}

private static void send(ByteBuf buf, short version, String str) {
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
// 写入长度和内容
buf.writeInt(bytes.length).writeShort(version).writeBytes(bytes);
}

协议设计

概述

  • TCP/IP中消息传输基于流的方式,没有边界

  • 协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

自定义

要素

  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,如:JSON、protobuf、hessian、JDK
  • 指令类型,是登录、注册、单聊、群聊… 跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

编解码器

1
2
// ByteToMessageCodec MUST NOT annotated with @Sharable.
public class MessageCodec extends ByteToMessageCodec<Message> {}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Slf4j
/**
* 必须和LengthFieldBasedFrameDecoder一起使用,以确保能够接收到完整的ByteBuf
*/
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {

@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
ByteBuf byteBuf = ctx.alloc().buffer();
// 6个字节的魔数
byteBuf.writeBytes("swan".getBytes(StandardCharsets.UTF_8));
// 1个字节的版本
byteBuf.writeByte(1);
// 1个字节的序列方式 0代表JDK,1代表JSON
byteBuf.writeByte(0);
// 1个字节的指令类型
byteBuf.writeByte(msg.getMessageType());
// 4个字节的请求序号
byteBuf.writeInt(msg.getSequenceId());
// 无意义,对齐填充
byteBuf.writeByte(0xff);
// 获取内容的字节数组
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(msg);
byte[] bytes = baos.toByteArray();
// 4个字节的内容长度
byteBuf.writeInt(bytes.length);
// 内容
byteBuf.writeBytes(bytes);
out.add(byteBuf);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
ByteBuf magicNum = msg.readBytes(4);
byte version = msg.readByte();
byte serialType = msg.readByte();
byte messageType = msg.readByte();
int sequenceId = msg.readInt();
msg.readByte();
int contentLen = msg.readInt();
byte[] content = new byte[contentLen];
msg.readBytes(content, 0, contentLen);
Message message;
switch (sequenceId) {
case 0: {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(content));
message = (Message) ois.readObject();
break;
}
default: {
throw new RuntimeException("未知的序列化类型");
}
}
log.debug("{}:{}:{}:{}:{}:{}", magicNum.toString(StandardCharsets.UTF_8), version, serialType, messageType, sequenceId, contentLen);
log.debug("{}", message);
out.add(message);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 没有状态信息的handler是线程安全的
LoggingHandler LOGGING_HANDLER = new LoggingHandler();

EmbeddedChannel channel = new EmbeddedChannel(
// 定义帧解码器
// 只要记录了多次消息间的状态就是线程不安全的
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
LOGGING_HANDLER,
new MessageCodec()
);
LoginRequestMessage message = new LoginRequestMessage("Tom", "123", "汤姆猫");
// TODO 出站测试
channel.writeOutbound(message);
// TODO 入站测试
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);
channel.writeInbound(buf);
// TODO 半包测试
buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);
ByteBuf buf1 = buf.slice(0, 100);
ByteBuf buf2 = buf.slice(100, buf.readableBytes() - 100);
// 使用writeInbound会调用release方法
buf1.retain();
// java.lang.IndexOutOfBoundsException
channel.writeInbound(buf1);
channel.writeInbound(buf2);

@Sharable

  • 该handler可以在多线程下被共享使用,只需创建一个实例
  • 当handler不保存状态时,就可以安全地在多线程下被共享
  • 对于编解码器类,不能继承ByteToMessageCodec或CombinedChannelDuplexHandler父类,他们的构造方法对@Sharable有限制
  • 如果能确保编解码器不会保存状态,可以继承MessageToMessageCodec父类

案例

空闲检测

  • 连接假死

    • 原因

      • 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。

      • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着

      • 应用程序线程阻塞,无法进行数据读写

    • 问题

      • 假死的连接占用的资源不能自动释放

      • 向假死的连接发送数据,得到的反馈是发送超时

  • 服务端解决

    • 每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 用来判断是不是 读空闲时间过长,或 写空闲时间过长
    // 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
    ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
    // ChannelDuplexHandler 可以同时作为入站和出站处理器
    ch.pipeline().addLast(new ChannelDuplexHandler() {
    // 用来触发特殊事件
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
    IdleStateEvent event = (IdleStateEvent) evt;
    // 触发了读空闲事件
    if (event.state() == IdleState.READER_IDLE) {
    log.debug("已经 5s 没有读到数据了");
    ctx.channel().close();
    }
    }
    });
  • 客户端定时心跳

    • 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 用来判断是不是 读空闲时间过长,或 写空闲时间过长
    // 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
    ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
    // ChannelDuplexHandler 可以同时作为入站和出站处理器
    ch.pipeline().addLast(new ChannelDuplexHandler() {
    // 用来触发特殊事件
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
    IdleStateEvent event = (IdleStateEvent) evt;
    // 触发了写空闲事件
    if (event.state() == IdleState.WRITER_IDLE) {
    // log.debug("3s 没有写数据了,发送一个心跳包");
    ctx.writeAndFlush(new PingMessage());
    }
    }
    });

序列化

  • 序列化,反序列化主要用在消息正文的转换上

    • 序列化时,需要将Java对象变为要传输的数据(可以是byte[],或JSON等,最终都需要变成byte[]

    • 反序列化时,需要将传入的正文数据还原成Java对象,便于处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    public interface Serializer {

    // 反序列化方法
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    // 序列化方法
    <T> byte[] serialize(T object);

    enum Algorithm implements Serializer {

    Java {
    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
    try {
    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
    return (T) ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
    throw new RuntimeException("反序列化失败", e);
    }
    }

    @Override
    public <T> byte[] serialize(T object) {
    try {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(object);
    return bos.toByteArray();
    } catch (IOException e) {
    throw new RuntimeException("序列化失败", e);
    }
    }
    },

    Json {
    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
    Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
    String json = new String(bytes, StandardCharsets.UTF_8);
    return gson.fromJson(json, clazz);
    }

    @Override
    public <T> byte[] serialize(T object) {
    Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
    String json = gson.toJson(object);
    return json.getBytes(StandardCharsets.UTF_8);
    }
    }
    }

    class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {

    @Override
    public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
    try {
    String str = json.getAsString();
    return Class.forName(str);
    } catch (ClassNotFoundException e) {
    throw new JsonParseException(e);
    }
    }

    @Override // String.class
    public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
    // class -> json
    return new JsonPrimitive(src.getName());
    }
    }
    }

参数调优

  • CONNECT_TIMEOUT_MILLIS

    • 属于SocketChannal参数

    • 若客户端建立连接时未在指定毫秒内连接,会抛出timeout异常

    • SO_TIMEOUT用在阻塞IO:阻塞IO中accept/read等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // 为ServerSocketChannel配置参数
    new ServerBootstrap().option();
    // 为SocketChannel配置参数
    new ServerBootstrap().childOption();

    NioEventLoopGroup group = new NioEventLoopGroup();
    try {
    Bootstrap bootstrap = new Bootstrap()
    .group(group)
    // 客户端通过option方法配置参数
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
    .channel(NioSocketChannel.class)
    .handler(new LoggingHandler());
    ChannelFuture future = bootstrap.connect("localhost", 8080);
    future.sync().channel().closeFuture().sync(); // 断点1
    } catch (Exception e) {
    e.printStackTrace();
    log.debug("timeout");
    } finally {
    group.shutdownGracefully();
    }
    • io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Override
    public final void connect(
    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // ...
    // Schedule connect timeout.
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
    ConnectTimeoutException cause =
    new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2
    if (connectPromise != null && connectPromise.tryFailure(cause)) {
    close(voidPromise());
    }
    }
    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    // ...
    }
  • SO_BACKLOG

    • 属于ServerSocketChannal参数

      sequenceDiagram
      
      participant c as client
      participant s as server
      participant sq as syns queue(半连接队列)
      participant aq as accept queue(全连接队列)
      
      s ->> s : bind()
      s ->> s : listen()
      c ->> c : connect()
      c ->> s : 1. SYN
      Note left of c : SYN_SEND
      s ->> sq : put
      Note right of s : SYN_RCVD
      s ->> c : 2. SYN + ACK
      Note left of c : ESTABLISHED
      c ->> s : 3. ACK
      sq ->> aq : put
      Note right of s : ESTABLISHED
      aq -->> s : 
      s ->> s : accept()
      
      • 第一次握手,client发送SYN到server,状态修改为SYN_SEND,server收到,状态改变为SYN_REVD,并将该请求放入sync queue队列
      • 第二次握手,server回复SYN + ACK给client,client收到,状态改变为 ESTABLISHED,并发送ACK给server
      • 第三次握手,server收到ACK,状态改变为ESTABLISHED,将该请求从sync queue放入accept queue
    • 在linux 2.2后,分别用下面两个参数来控制

      • sync queue:半连接队列,存放没有完成三次握手的连接信息

        • 大小通过/proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
      • accept queue:全连接队列,存放已经完成三次握手的连接信息

        • 其大小通过/proc/sys/net/core/somaxconn指定,在使用listen函数时,内核会根据传入的backlog参数与系统参数,取二者的较小值
          • 如果accpet queue队列满了,server将发送一个拒绝连接的错误信息到 client
    • netty中可以通过option(ChannelOption.SO_BACKLOG, 值)设置大小
  • ulimit -n

    • 允许同时打开的文件描述符的数量
    • 属于操作系统参数
  • TCP_NODELAY

    • 小数据包不延迟发送(nagle算法会使小数据包攒在一起发送)
    • 属于SocketChannal参数
  • SO_SNDBUF & SO_RCVBUF

    • SO_SNDBUF属于SocketChannal参数

    • SO_RCVBUF既可用于SocketChannal参数,也可以用于ServerSocketChannal参数(建议设置到ServerSocketChannal上)

  • ALLOCATOR

    • 用来分配ByteBuf,ctx.alloc()
    • 属于SocketChannal参数
  • RCVBUF_ALLOCATOR

    • 控制netty接收缓冲区大小
    • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用direct直接内存,具体池化还是非池化由allocator决定
    • 属于SocketChannal参数

RPC框架

准备工作

  • 为了简化起见,在原来聊天项目的基础上新增Rpc请求和响应消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Data
    public abstract class Message implements Serializable {

    // 省略旧的代码

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    static {
    // ...
    messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
    messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

    }
  • 请求消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    @Getter
    @ToString(callSuper = true)
    public class RpcRequestMessage extends Message {

    /**
    * 调用的接口全限定名,服务端根据它找到实现
    */
    private String interfaceName;
    /**
    * 调用接口中的方法名
    */
    private String methodName;
    /**
    * 方法返回类型
    */
    private Class<?> returnType;
    /**
    * 方法参数类型数组
    */
    private Class[] parameterTypes;
    /**
    * 方法参数值数组
    */
    private Object[] parameterValue;

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
    super.setSequenceId(sequenceId);
    this.interfaceName = interfaceName;
    this.methodName = methodName;
    this.returnType = returnType;
    this.parameterTypes = parameterTypes;
    this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {
    return RPC_MESSAGE_TYPE_REQUEST;
    }
    }
  • 响应消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Data
    @ToString(callSuper = true)
    public class RpcResponseMessage extends Message {
    /**
    * 返回值
    */
    private Object returnValue;
    /**
    * 异常值
    */
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
    return RPC_MESSAGE_TYPE_RESPONSE;
    }
    }

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
NioEventLoopGroup leader = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// rpc 请求消息处理器,待实现
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.group(leader, worker)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new FrameDecoderProtocol())
.addLast(LOGGING_HANDLER)
.addLast(MESSAGE_CODEC)
.addLast(RPC_HANDLER);
}
})
.bind(8080)
.sync()
.channel()
.closeFuture()
.sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
leader.shutdownGracefully();
worker.shutdownGracefully();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequestMessage rpcRequestMessage) throws Exception {
RpcResponseMessage response = new RpcResponseMessage();
try {
// 获取真正的实现对象
HelloService service = (HelloService) ServicesFactory.getService(Class.forName(rpcRequestMessage.getInterfaceName()));
// 获取要调用的方法
Method method = service.getClass().getMethod(rpcRequestMessage.getMethodName(), rpcRequestMessage.getParameterTypes());
// 调用方法
Object invoke = method.invoke(service, rpcRequestMessage.getParameterValue());
// 调用成功
response.setReturnValue(invoke);
} catch (Exception e) {
e.printStackTrace();
// 调用异常
response.setExceptionValue(e);
}
// 返回结果
channelHandlerContext.writeAndFlush(response);
}

}

客户端

  • 包括channel管理,代理,接收结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// 创建代理类
public static <T> T getProxyService(Class<T> serviceClass) {
ClassLoader loader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{serviceClass};
// method=sayHello; args="Tom";
Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
// 将方法调用转换为消息对象
int sequenceId = SequenceIdGenerator.nextId();
RpcRequestMessage msg = new RpcRequestMessage(
sequenceId,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 将消息对象发送出去
getChannel().writeAndFlush(msg);
// 准备一个空Promise对象,来接收结果指定promise对象异步接收结果线程
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);
// 异步方法
// promise.addListener(future -> {
// // 线程从getChannel().eventLoop()中来
// });
// 等待promise结果
// 同步方法
promise.await();
if (promise.isSuccess()) {
// 调用正常
return promise.getNow();
} else {
// 调用失败
throw new RuntimeException(promise.cause());
}
});
return (T) o;
}

private static volatile Channel channel = null;
private static final Object LOCK = new Object();

// 获取唯一的channel对象
public static Channel getChannel() {
if (channel == null) {
synchronized (LOCK) {
if (channel == null) {
initChannel();
return channel;
}
}
}
return channel;
}

// 初始化channel方法
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FrameDecoderProtocol());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

// Map<序号, 用来接收结果的promise对象>
// ConcurrentHashMap是线程安全的
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
// 拿到空的promise并从中移除
Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
if (promise == null) return;
Object returnValue = msg.getReturnValue();
Exception exceptionValue = msg.getExceptionValue();
// 判断消息是否成功
if (exceptionValue != null) {
promise.setFailure(exceptionValue);
} else {
promise.setSuccess(returnValue);
}
}

}

源码

启动

  • 入口 io.netty.bootstrap.ServerBootstrap#bind

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    //1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
    Selector selector = Selector.open();

    //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
    NioServerSocketChannel attachment = new NioServerSocketChannel();

    //3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);

    //4 启动 nio boss 线程执行接下来的操作

    //5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
    SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

    //6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor

    //7 绑定端口
    serverSocketChannel.bind(new InetSocketAddress(8080));

    //8 触发 channel active 事件,在 head 中关注 op_accept 事件
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);
  • io.netty.bootstrap.AbstractBootstrap#doBind

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
    return regFuture;
    }

    // 2. 因为是 initAndRegister 异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分
    // 2.1 如果已经完成
    if (regFuture.isDone()) {
    ChannelPromise promise = channel.newPromise();
    // 3.1 立刻调用 doBind0
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
    }
    // 2.2 还没有完成
    else {
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    // 3.2 回调 doBind0
    regFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
    Throwable cause = future.cause();
    if (cause != null) {
    // 处理异常...
    promise.setFailure(cause);
    } else {
    promise.registered();
    // 3. 由注册线程去执行 doBind0
    doBind0(regFuture, channel, localAddress, promise);
    }
    }
    });
    return promise;
    }
    }
  • io.netty.bootstrap.AbstractBootstrap#initAndRegister

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
    channel = channelFactory.newChannel();
    // 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializer
    init(channel);
    } catch (Throwable t) {
    // 处理异常...
    return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
    // 处理异常...
    }
    return regFuture;
    }
  • io.netty.bootstrap.ServerBootstrap#init

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    // 这里 channel 实际上是 NioServerSocketChannel
    void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
    setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
    for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
    @SuppressWarnings("unchecked")
    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
    channel.attr(key).set(e.getValue());
    }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
    currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
    currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // 为 NioServerSocketChannel 添加初始化器
    p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
    ChannelHandler handler = config.handler();
    if (handler != null) {
    pipeline.addLast(handler);
    }

    // 初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel
    ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
    pipeline.addLast(new ServerBootstrapAcceptor(
    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
    });
    }
    });
    }
  • io.netty.channel.AbstractChannel.AbstractUnsafe#register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 一些检查,略...

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行
// 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程
// 这行代码完成的事实是 main -> nio boss 线程的切换
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 日志记录...
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
  • io.netty.channel.AbstractChannel.AbstractUnsafe#register0

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    private void register0(ChannelPromise promise) {
    try {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
    }
    boolean firstRegistration = neverRegistered;
    // 1.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel
    doRegister();
    neverRegistered = false;
    registered = true;

    // 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel
    pipeline.invokeHandlerAddedIfNeeded();

    // 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();

    // 对应 server socket channel 还未绑定,isActive 为 false
    if (isActive()) {
    if (firstRegistration) {
    pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
    beginRead();
    }
    }
    } catch (Throwable t) {
    // Close the channel directly to avoid FD leak.
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
    }
    }
  • io.netty.channel.ChannelInitializer#initChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 1.2.2.1 执行初始化
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
// 1.2.2.2 移除初始化器
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
  • io.netty.bootstrap.AbstractBootstrap#doBind0

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 3.1 或 3.2 执行 doBind0
    private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
    if (regFuture.isSuccess()) {
    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    } else {
    promise.setFailure(regFuture.cause());
    }
    }
    });
    }
  • io.netty.channel.AbstractChannel.AbstractUnsafe#bind

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
    }

    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
    localAddress instanceof InetSocketAddress &&
    !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
    !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
    // 记录日志...
    }

    boolean wasActive = isActive();
    try {
    // 3.3 执行端口绑定
    doBind(localAddress);
    } catch (Throwable t) {
    safeSetFailure(promise, t);
    closeIfClosed();
    return;
    }

    if (!wasActive && isActive()) {
    invokeLater(new Runnable() {
    @Override
    public void run() {
    // 3.4 触发 active 事件
    pipeline.fireChannelActive();
    }
    });
    }

    safeSetSuccess(promise);
    }
  • io.netty.channel.socket.nio.NioServerSocketChannel#doBind

1
2
3
4
5
6
7
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
  • io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

    1
    2
    3
    4
    5
    public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    // 触发 read (NioServerSocketChannel 上的 read 不是读取数据,只是为了触发 channel 的事件注册)
    readIfIsAutoRead();
    }
  • io.netty.channel.nio.AbstractNioChannel#doBeginRead

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
    return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // readInterestOp 取值是 16,在 NioServerSocketChannel 创建时初始化好,代表关注 accept 事件
    if ((interestOps & readInterestOp) == 0) {
    selectionKey.interestOps(interestOps | readInterestOp);
    }
    }

NioEventLoop

  • NioEventLoop线程不仅要处理IO事件,还要处理Task(包括普通任务和定时任务)

  • 提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public void execute(Runnable task) {
    if (task == null) {
    throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    // 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
    addTask(task);
    if (!inEventLoop) {
    // inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread
    startThread();
    if (isShutdown()) {
    // 如果已经 shutdown,做拒绝逻辑,代码略...
    }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
    // 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程
    wakeup(inEventLoop);
    }
    }
  • 唤醒select阻塞线程io.netty.channel.nio.NioEventLoop#wakeup

    1
    2
    3
    4
    5
    6
    @Override
    protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
    selector.wakeup();
    }
    }
  • 启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
    @Override
    public void run() {
    // 将线程池的当前线程保存在成员变量中,以便后续使用
    thread = Thread.currentThread();
    if (interrupted) {
    thread.interrupt();
    }

    boolean success = false;
    updateLastExecutionTime();
    try {
    // 调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环,run 方法见下
    SingleThreadEventExecutor.this.run();
    success = true;
    } catch (Throwable t) {
    logger.warn("Unexpected exception from an event executor: ", t);
    } finally {
    // 清理工作,代码略...
    }
    }
    });
    }
  • io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    protected void run() {
    for (;;) {
    try {
    try {
    // calculateStrategy 的逻辑如下:
    // 有任务,会执行一次 selectNow,清除上一次的 wakeup 结果,无论有没有 IO 事件,都会跳过 switch
    // 没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞
    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
    case SelectStrategy.CONTINUE:
    continue;

    case SelectStrategy.BUSY_WAIT:

    case SelectStrategy.SELECT:
    // 因为 IO 线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒
    // 进行 select 阻塞,并设置唤醒状态为 false
    boolean oldWakenUp = wakenUp.getAndSet(false);

    // 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup
    // 下面的 select 方法不会阻塞
    // 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?
    // 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时
    // 才能执行,让 select 方法无谓阻塞
    select(oldWakenUp);

    if (wakenUp.get()) {
    selector.wakeup();
    }
    default:
    }
    } catch (IOException e) {
    rebuildSelector0();
    handleLoopException(e);
    continue;
    }

    cancelledKeys = 0;
    needsToSelectAgain = false;
    // ioRatio 默认是 50
    final int ioRatio = this.ioRatio;
    if (ioRatio == 100) {
    try {
    processSelectedKeys();
    } finally {
    // ioRatio 为 100 时,总是运行完所有非 IO 任务
    runAllTasks();
    }
    } else {
    final long ioStartTime = System.nanoTime();
    try {
    processSelectedKeys();
    } finally {
    // 记录 io 事件处理耗时
    final long ioTime = System.nanoTime() - ioStartTime;
    // 运行非 IO 任务,一旦超时会退出 runAllTasks
    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
    }
    } catch (Throwable t) {
    handleLoopException(t);
    }
    try {
    if (isShuttingDown()) {
    closeAll();
    if (confirmShutdown()) {
    return;
    }
    }
    } catch (Throwable t) {
    handleLoopException(t);
    }
    }
    }

wakeup既可以由提交任务的线程来调用(比较好理解),也可以由EventLoop线程来调用(比较费解),wakeup方法的效果:

  • 由非EventLoop线程调用,会唤醒当前在执行select阻塞的EventLoop线程
  • 由EventLoop自己调用,会本次的wakeup会取消下一次的select操作

参考下图

  • io.netty.channel.nio.NioEventLoop#select

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
    int selectCnt = 0;
    long currentTimeNanos = System.nanoTime();
    // 计算等待时间
    // * 没有 scheduledTask,超时时间为 1s
    // * 有 scheduledTask,超时时间为 `下一个定时任务执行时间 - 当前时间`
    long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

    for (;;) {
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    // 如果超时,退出循环
    if (timeoutMillis <= 0) {
    if (selectCnt == 0) {
    selector.selectNow();
    selectCnt = 1;
    }
    break;
    }

    // 如果期间又有 task 退出循环,如果没这个判断,那么任务就会等到下次 select 超时时才能被执行
    // wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeup
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
    selector.selectNow();
    selectCnt = 1;
    break;
    }

    // select 有限时阻塞
    // 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,导致不断空轮询,cpu 占用 100%
    int selectedKeys = selector.select(timeoutMillis);
    // 计数加 1
    selectCnt ++;

    // 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环
    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
    break;
    }
    if (Thread.interrupted()) {
    // 线程被打断,退出循环
    // 记录日志
    selectCnt = 1;
    break;
    }

    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    // 如果超时,计数重置为 1,下次循环就会 break
    selectCnt = 1;
    }
    // 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512
    // 这是为了解决 nio 空轮询 bug
    else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    // 重建 selector
    selector = selectRebuildSelector(selectCnt);
    selectCnt = 1;
    break;
    }

    currentTimeNanos = time;
    }

    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
    // 记录日志
    }
    } catch (CancelledKeyException e) {
    // 记录日志
    }
    }
  • 处理keys io.netty.channel.nio.NioEventLoop#processSelectedKeys

    1
    2
    3
    4
    5
    6
    7
    8
    9
    private void processSelectedKeys() {
    if (selectedKeys != null) {
    // 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet
    // SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)
    processSelectedKeysOptimized();
    } else {
    processSelectedKeysPlain(selector.selectedKeys());
    }
    }
  • io.netty.channel.nio.NioEventLoop#processSelectedKey

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 当 key 取消或关闭时会导致这个 key 无效
    if (!k.isValid()) {
    // 无效时处理...
    return;
    }

    try {
    int readyOps = k.readyOps();
    // 连接事件
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
    }

    // 可写事件
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    ch.unsafe().forceFlush();
    }

    // 可读或可接入事件
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    // 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
    // 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
    unsafe.read();
    }
    } catch (CancelledKeyException ignored) {
    unsafe.close(unsafe.voidPromise());
    }
    }

accept

  • nio中如下代码,在netty中的流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    //1 阻塞直到事件发生
    selector.select();

    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
    //2 拿到一个事件
    SelectionKey key = iter.next();

    //3 如果是 accept 事件
    if (key.isAcceptable()) {

    //4 执行 accept
    SocketChannel channel = serverSocketChannel.accept();
    channel.configureBlocking(false);

    //5 关注 read 事件
    channel.register(selector, SelectionKey.OP_READ);
    }
    // ...
    }
  • 可接入事件处理(accept)io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
    try {
    do {
    // doReadMessages 中执行了 accept 并创建 NioSocketChannel 作为消息放入 readBuf
    // readBuf 是一个 ArrayList 用来缓存消息
    int localRead = doReadMessages(readBuf);
    if (localRead == 0) {
    break;
    }
    if (localRead < 0) {
    closed = true;
    break;
    }
    // localRead 为 1,就一条消息,即接收一个客户端连接
    allocHandle.incMessagesRead(localRead);
    } while (allocHandle.continueReading());
    } catch (Throwable t) {
    exception = t;
    }

    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
    readPending = false;
    // 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理
    // io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
    pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();

    if (exception != null) {
    closed = closeOnReadError(exception);

    pipeline.fireExceptionCaught(exception);
    }

    if (closed) {
    inputShutdown = true;
    if (isOpen()) {
    close(voidPromise());
    }
    }
    } finally {
    if (!readPending && !config.isAutoRead()) {
    removeReadOp();
    }
    }
    }
  • io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 这时的 msg 是 NioSocketChannel
    final Channel child = (Channel) msg;

    // NioSocketChannel 添加 childHandler 即初始化器
    child.pipeline().addLast(childHandler);

    // 设置选项
    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
    // 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
    childGroup.register(child).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
    forceClose(child, future.cause());
    }
    }
    });
    } catch (Throwable t) {
    forceClose(child, t);
    }
  • io.netty.channel.AbstractChannel.AbstractUnsafe#register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 一些检查,略...

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这行代码完成的事实是 nio boss -> nio worker 线程的切换
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 日志记录...
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
  • io.netty.channel.AbstractChannel.AbstractUnsafe#register0

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private void register0(ChannelPromise promise) {
    try {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
    }
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    // 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
    pipeline.invokeHandlerAddedIfNeeded();
    // 执行后就是 head -> logging handler -> my handler -> tail

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();

    if (isActive()) {
    if (firstRegistration) {
    // 触发 pipeline 上 active 事件
    pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
    beginRead();
    }
    }
    } catch (Throwable t) {
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
    }
    }
  • io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

    1
    2
    3
    4
    5
    public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    // 触发 read (NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)
    readIfIsAutoRead();
    }
  • io.netty.channel.nio.AbstractNioChannel#doBeginRead

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
    return;
    }

    readPending = true;
    // 这时候 interestOps 是 0
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
    // 关注 read 事件
    selectionKey.interestOps(interestOps | readInterestOp);
    }
    }

read

  • 可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次nio read事件,一次事件内会触发多次pipeline read,一次事件会触发一次pipeline read complete

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
    clearReadPending();
    return;
    }
    final ChannelPipeline pipeline = pipeline();
    // io.netty.allocator.type 决定 allocator 的实现
    final ByteBufAllocator allocator = config.getAllocator();
    // 用来分配 byteBuf,确定单次读取大小
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
    do {
    byteBuf = allocHandle.allocate(allocator);
    // 读取
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    if (allocHandle.lastBytesRead() <= 0) {
    byteBuf.release();
    byteBuf = null;
    close = allocHandle.lastBytesRead() < 0;
    if (close) {
    readPending = false;
    }
    break;
    }

    allocHandle.incMessagesRead(1);
    readPending = false;
    // 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理 NioSocketChannel 上的 handler
    pipeline.fireChannelRead(byteBuf);
    byteBuf = null;
    }
    // 是否要继续循环
    while (allocHandle.continueReading());

    allocHandle.readComplete();
    // 触发 read complete 事件
    pipeline.fireChannelReadComplete();

    if (close) {
    closeOnRead(pipeline);
    }
    } catch (Throwable t) {
    handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
    if (!readPending && !config.isAutoRead()) {
    removeReadOp();
    }
    }
    }
  • io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return
    // 一般为 true
    config.isAutoRead() &&
    // respectMaybeMoreData 默认为 true
    // maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
    (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
    // 小于最大次数,maxMessagePerRead 默认 16
    totalMessages < maxMessagePerRead &&
    // 实际读到了数据
    totalBytesRead > 0;
    }

Netty
http://docs.mousse.cc/Netty/
作者
Mocha Mousse
发布于
2025年5月26日
许可协议