Kafka
Kafka
概述
- 一个分布式的基于发布/订阅模式的消息队列
基础架构
- Kafka集群
- 由多个Broker组成,每个Broker拥有唯一的id
- 有多个Topic,每个Topic可有多个分区(partition),每个分区可有多个副本(replication)
- 一个Topic的多个分区可以存在到一个Broker中, 一个分区的多个副本只能在不同的Broker存在
- 一个分区的多个副本由一个leader和多个follower组成
- 生产者和消费者读写数据面向leader
- follower主要同步leader的数据。leader故障后follower代替leader工作
- 生产者
- 往topic中发布消息
- 消费者
- 从topic中消费消息
- 以消费者组为单位进行消息消费
- 一个消费者组内的一个消费者可以同时消费一个topic中多个分区的消息
- 一个Topic中的一个分区的消息同时只能被一个消费者组中的一个消费者消费
- Zookeeper
- Kafka集群的工作需要依赖zookeeper,每个broker启动后需要向zookeeper注册
- Broker中大哥(controller)的选举:争抢策略
- Kafka 0.9版本之前消费者组的offset维护在zookeeper中,0.9版本之后维护在kafka内部
安装部署
在kafka目录下创建data文件夹
修改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28# vim /kafka/config/server.properties
# broker的全局唯一编号,不能重复
# bigdata1/2/3改动
broker.id=0
# 删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘IO的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# 消息存放的路径
# 改动
log.dirs=/kafka/data
# topic在当前broker上的分区个数
num.partitions=1
# 用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment文件保留的最长时间,超时将被删除
log.retention.hours=168
# 配置连接Zookeeper集群地址
# 改动
zookeeper.connect=bigdata1:2181,bigdata2:2181,bigdata3:2181配置环境变量
1
2
3
4vim /etc/profile.d/env.sh
# KAFKA_HOME
export KAFKA_HOME=/kafka
export PATH=$PATH:$KAFKA_HOME/bin
起停脚本
1 |
|
命令行操作
1 |
|
原理
存储机制
- topic是逻辑上的概念,partition是物理上的概念
- 每个partition对应一个log文件,该log文件中存储的就是Producer生产的数据,Producer生产的数据会被不断追加到该log文件末端(追加写),每条数据都有自己的offset
- 消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费
- 采取分片和索引机制:一个topic有多个partition,一个partition有多个segment,一个segment有.index文件(存储索引)和.log文件(存储数据)组成,文件名为当前文件的第一个offset
生产者
分区策略
- 方便在集群中扩展,可以提高并发
- 原则
- 有partition值:直接使用
- 无partition值有key值:将key的hash值与topic的partition取余得到值
- 无partition值无key值:采用Sticky Partition(黏性分区器),随机选择一个分区并尽量一直使用该分区,待该分区的batch已满或已完成后再随机一个分区进行使用
数据可靠性
- topic的每个partition收到producer发送的数据后,都要向producer发送ack(acknowledgement确认收到),若producer收到ack说明发送成功,否则重新发送数据
- ACK
- 全部的(ISR中的)follewer同步完成发送
- 参数配置
- 0:leader接收到消息还没有写入磁盘就已经返回ack,leader故障可能丢失数据
- 1: leader落盘成功后返回ack,follower同步成功前leader故障会丢失数据
- -1(all): leader和follower全部落盘成功后才返回ack,在follower同步完成后,broker发送ack前,leader发生故障,会造成数据重复
- ISR
- Leader维护一个动态的ISR(在同步列表的副本)
- 当ISR中的follower完成数据的同步之后,leader会给producer发送ack
- 若follower长时间未向leader同步数据,该follower将被踢出ISR,时间阈值由
replica.lag.time.max.ms
参数设定 - Leader发生故障之后会从ISR中选举新的leader
故障处理
- LEO:每个副本最后一个offset
- HW:所有副本中最小的LEO(消费者能见到的最大的offset),每个节点都同步完的最后一个offset
- follower故障
- 该follower被踢出ISR
- 该follower截掉高于HW的部分,从HW开始向leader进行同步
- 等到follower的LEO大于等于该Partition的HW后重新加入ISR
- leader故障
- 按副本顺序选举出新的leader
- 其余follower将各自log文件高于HW的部分截掉,从新的leader同步数据
- 只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
消费者
- 采用pull(拉)模式从broker中读取数据,如果当前没有数据可供消费,会等待一段时间之后再返回
分区策略
- RoundRobin:轮询
- Range:默认,范围分区,分区数与消费者个数取余
- Sticky:第一次分区时与轮询一样,消费者减少时原有的分配不变
offset的维护
- 0.9版本之前:将offset保存在Zookeeper中
- 0.9版本开始:将offset保存在Kafka一个内置的topic中,该topic为**__consumer_offsets**
读写数据
- 顺序写磁盘:写的过程是一直追加到文件末端
- 页缓存
- 零复制
Zookeeper作用
- Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作,Controller的管理工作都依赖于Zookeeper
API
生产者
发送流程
采用异步发送
消息发送
1
2
3
4
5
6
7
8
9
10main线程 recordAccumulator sender线程 topic
=============+=========================+==========+=========
Producer | | |
↓ | | |
Interceptors | | |
↓ | | |
Serializer | | |
↓ | ↗ [分区1的RecordBatch] → | Sender → | [分区1]
Partitoner → | → [分区2的RecordBatch] → | Sender → | [分区2]
| ↘ [分区3的RecordBatch] → | Sender → | [分区3]- main线程:程将消息发送给RecordAccumulator
- sender线程:不断从RecordAccumulator中拉取消息发送到Kafka broker
- recordAccumulator:线程共享变量
相关参数
batch.size
:只有数据积累到batch.size
之后,sender才会发送数据linger.ms
:若数据未达到batch.size
,sender等待linger.time
后发送数据
异步发送
依赖
1
2
3
4
5
6
7
8
9
10
11<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
</dependency>log4j配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
<Appenders>
<!-- 类型名为Console,名称为必须属性 -->
<Appender type="Console" name="STDOUT">
<!-- 布局为PatternLayout的方式,
输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
<Layout type="PatternLayout"
pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n"/>
</Appender>
</Appenders>
<Loggers>
<!-- 可加性为false -->
<Logger name="test" level="info" additivity="false">
<AppenderRef ref="STDOUT"/>
</Logger>
<!-- root loggerConfig设置 -->
<Root level="info">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>不带回调函数的API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public static void main(String[] args) {
// 创建配置对象
Properties props = new Properties();
// Kafka集群位置,需要指定
// props.put("bootstrap.servers", "bigdata1:9092");
// CommonClientConfigs:通用配置类
// --ProducerConfig:生产者配置类
// --ConsumerConfig:消费者配置类
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata1:9092");
// 序列化器,需要指定
props.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"
);
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"
);
props.put("acks", "all");
//重试次数
props.put("retries", 1);
//批次大小
props.put("batch.size", 16384);
//等待时间
props.put("linger.ms", 1);
//RecordAccumulator缓冲区大小
props.put("buffer.memory", 33554432);
// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 生产数据
for (int i = 0; i < 10; i++) {
// 指定partition
producer.send(new ProducerRecord<>("topicA", 0, "key" + i, "val" + i));
}
// 关闭对象
producer.close();
}带回调函数的API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public class AsyncWithCallback {
private static final Logger log = LoggerFactory.getLogger(AsyncWithCallback.class);
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata1:9092");
props.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"
);
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"
);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(
new ProducerRecord<>("topicA", 0, "key" + i, "val" + i),
// 回调
new Callback() {
/**
* 消息发送完成后会调用该方法
* @param recordMetadata 消息的元数据信息
* @param e 消息发送过程中抛出的异常
*/
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) log.warn("消息发送失败: {}", e.getMessage());
else log.info("消息发送成功: {}::{}::{}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}
}
);
}
producer.close();
}
}
同步发送
一条消息发送之后,会阻塞当前线程,直至返回ack
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23for (int i = 0; i < 10; i++) {
Future<RecordMetadata> future = producer.send(
new ProducerRecord<>("topicA", 0, "key" + i, "val" + i),
// 回调
new Callback() {
/**
* 消息发送完成后会调用该方法
* @param recordMetadata 消息的元数据信息
* @param e 消息发送过程中抛出的异常
*/
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) log.warn("消息发送失败: {}", e.getMessage());
else
log.info("消息发送成功: {}::{}::{}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}
}
);
log.info("消息发送");
// 会阻塞当前线程,直到该方法的结果返回
future.get();
log.info("消息发送完成");
}
自定义分区器
默认分区器:DefaultPartitioner
自定义分区器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class MyPartitioner implements Partitioner {
/**
* 计算分区号
* @param s 消息的主题
* @param o 消息的key
* @param bytes 消息的key序列化后的字节数组
* @param o1 消息的val
* @param bytes1 消息的val序列化后的字节数组
* @param cluster 上下文对象
* @return 分区号
*/
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
return o1.toString().contains("shanghai") ? 0 : 1;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public class ProducerWithPartitioner {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata1:9092");
props.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"
);
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"
);
// 使用自定义分区
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "cc.mousse.partitioner.MyPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String val = i % 2 == 0 ? "Shanghai" : "Beijing";
producer.send(new ProducerRecord<>("topicA", 0, "city", val + i));
}
producer.close();
}
}
消费者
自动提交offset
1 |
|
重置offset
- 参数
- earliest:重置到最早的offset
- latest:默认,重置到最后的offset
- 重置情况
- 新的组
- 要消费的offset对应的消息已经被删除
1 |
|
手动提交offset
- Comsumer启动后第一次会向kafka获取offset数据,后续获取数据时在内存中记录和读取新的offset
1 |
|
自定义拦截器
1 |
|
1 |
|
1 |
|
与FLume对接
Kafka Sink
往Kafka中写数据:Flume是sink,Kafka是生产者
Flume → Kafka
netcat-flume-kafka.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k1.kafka.topic = flumetopic
a1.sinks.k1.kafka.flumeBatchSize = 100
# 使用Flume Event格式,为true会保留header数据
a1.sinks.k1.useFlumeEventFormat = true
a1.sinks.k1.kafka.producer.acks = -1
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1运行
1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console
Flume → Kafka: KafkaSink多topic支持
netcat-flume-kafkatopic.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cc.mousse.flumeinterceptor.DataValueInterceptor$MyBuilder
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k1.kafka.topic = topicOT
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.useFlumeEventFormat = true
a1.sinks.k1.kafka.producer.acks = -1
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1运行
1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafkatopic.conf -n a1 -Dflume.root.logger=INFO,console
Kafka Source
从kafka中读取数据:Flume是source,Kafka是消费者
Kafka → Flume
kafka-flume-logger.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.batchSize = 100
a1.sources.r1.useFlumeEventFormat = false
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = logger
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1运行
1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/kafka-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
Kafka Channel
作为基本的channel使用:xxxSource → KafkaChannel → xxxSink
支持往kafka中写入数据:xxxSource → KafkaChannel
支持从Kafka中读取数据:kafkaChannel → xxxSink
KafkaChannel → xxxSink
kafkachannel-flume-logger.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19#Named
a1.channels = c1
a1.sinks = k1
#Source
#Channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.channels.c1.kafka.topic = first
a1.channels.c1.kafka.consumer.group.id = flume
a1.channels.c1.kafka.consumer.auto.offset.reset = latest
a1.channels.c1.parseAsFlumeEvent = false
#Sink
a1.sinks.k1.type = logger
#Bind
a1.sinks.k1.channel = c1运行
1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/kafkachannel-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
xxxSource → KafkaChannel
netcat-flume-kafkachannel.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19#Named
a1.sources = r1
a1.channels = c1
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
#Channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.channels.c1.kafka.topic = first
a1.channels.c1.parseAsFlumeEvent = false
#Sink
#Bind
a1.sources.r1.channels = c1运行
1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafkachannel.conf -n a1 -Dflume.root.logger=INFO,console
监控
部署Kafka Eagle步骤
修改kafka-server-start.sh命令并分发
1
2
3
4
5
6
7#vim /kafka/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi下载kafka-eagle并给启动文件执行权限
1
chmod 777 ke.sh
修改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20# vim /eagle/conf/system-config.properties
######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=bigdata1:2181,bigdata2:2181,bigdata3:2181
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://bigdata1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root添加环境变量
1
2
3# KE_HOME
export KE_HOME=/eagle
export PATH=$PATH:$KE_HOME/bin启动:启动之前需要先启动ZK以及KAFKA
1
/eagle/bin/ke.sh start
登录页面查看监控数据:http://10.211.55.15:8048
面试题
ISR和AR
- ISR:同步列表,与leader保持同步的follower集合
- AR:分区的所有副本
HW和LEO
- LEO:每个副本的最后条消息的offset
- HW:一个分区中所有副本最小的offset
消息顺序性
- 区内有序,每个分区内,每条消息都有一个offset,故只能保证分区内有序
处理顺序
- 拦截器 → 序列化器 → 分区器
生产者客户端的整体结构
- main线程:程将消息发送给RecordAccumulator
- sender线程:不断从RecordAccumulator中拉取消息发送到Kafka broker
- recordAccumulator:线程共享变量
消费者提交消费位移
- 提交的是当前消费到的最新消息的offset +1
重复消费和消消费
- 可能导致重复消费:先消费,后提交offset
- 可能导致漏消费:先提交offset,后消费
创建(删除)topic之后Kafka背后的逻辑
- 在zookeeper中的/brokers/topics节点下创建一个新的topic节点
- 触发Controller的监听程序
- Kafka Controller负责topic的创建工作,并更新metadata cache
topic的分区数
- 可以增加,不可以减少
Kafka内部topic
- __consumer_offsets:保存消费者offset
分区分配概念
- 生产者
- 有partition值:直接使用
- 无partition值有key值:将key的hash值与topic的partition取余得到值
- 无partition值无key值:采用Sticky Partition(黏性分区器),随机选择一个分区并尽量一直使用该分区,待该分区的batch已满或已完成后再随机一个分区进行使用
- 消费者
- RoundRobin:轮询
- Range:默认,范围分区,分区数与消费者个数取余
- Sticky:第一次分区时与轮询一样,消费者减少时原有的分配不变
- 生产者
Kafka日志目录结构
- 一个topic有多个partition,一个partition有多个segment,一个segment有.index文件(存储索引)和.log文件(存储数据)组成,文件名为当前文件的第一个offset
- Kafka根据offset判断在哪个segment中,从该segment中的index文件找到消息的位置
Kafka Controller
- 负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作
需要选举的地方及其选举策略
- partition leader:根据ISR顺序
- controller:争抢
失效副本
- 不能及时与leader同步,暂时踢出ISR,等其追上leader之后再重新加入
- follower故障
- 该follower被踢出ISR
- 该follower截掉高于HW的部分,从HW开始向leader进行同步
- 等到follower的LEO大于等于该Partition的HW后重新加入ISR
- leader故障
- 按副本顺序选举出新的leader
- 其余follower将各自log文件高于HW的部分截掉,从新的leader同步数据
- 只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
Kafka高性能设计
- 分区,顺序写磁盘,0-copy