Kafka

Kafka

概述

  • 一个分布式的基于发布/订阅模式的消息队列

基础架构

  • Kafka集群
    • 由多个Broker组成,每个Broker拥有唯一的id
    • 有多个Topic,每个Topic可有多个分区(partition),每个分区可有多个副本(replication)
    • 一个Topic的多个分区可以存在到一个Broker中, 一个分区的多个副本只能在不同的Broker存在
    • 一个分区的多个副本由一个leader和多个follower组成
    • 生产者和消费者读写数据面向leader
    • follower主要同步leader的数据。leader故障后follower代替leader工作
  • 生产者
    • 往topic中发布消息
  • 消费者
    • 从topic中消费消息
    • 以消费者组为单位进行消息消费
    • 一个消费者组内的一个消费者可以同时消费一个topic中多个分区的消息
    • 一个Topic中的一个分区的消息同时只能被一个消费者组中的一个消费者消费
  • Zookeeper
    • Kafka集群的工作需要依赖zookeeper,每个broker启动后需要向zookeeper注册
    • Broker中大哥(controller)的选举:争抢策略
    • Kafka 0.9版本之前消费者组的offset维护在zookeeper中,0.9版本之后维护在kafka内部

安装部署

  • http://kafka.apache.org/downloads.html

  • 在kafka目录下创建data文件夹

  • 修改配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    # vim /kafka/config/server.properties
    # broker的全局唯一编号,不能重复
    # bigdata1/2/3改动
    broker.id=0
    # 删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
    delete.topic.enable=true
    # 处理网络请求的线程数量
    num.network.threads=3
    # 用来处理磁盘IO的线程数量
    num.io.threads=8
    # 发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    # 接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    # 请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    # 消息存放的路径
    # 改动
    log.dirs=/kafka/data
    # topic在当前broker上的分区个数
    num.partitions=1
    # 用来恢复和清理data下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # segment文件保留的最长时间,超时将被删除
    log.retention.hours=168
    # 配置连接Zookeeper集群地址
    # 改动
    zookeeper.connect=bigdata1:2181,bigdata2:2181,bigdata3:2181
  • 配置环境变量

    1
    2
    3
    4
    vim /etc/profile.d/env.sh
    # KAFKA_HOME
    export KAFKA_HOME=/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

起停脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# vim /usr/local/bin/kafka.sh
#!/bin/bash
if [ $# -lt 1 ]
then
echo "USAGE:kafka.sh {start|stop}"
exit
fi

case $1 in

start)
for i in bigdata1 bigdata2 bigdata3
do
echo ":::: START $i KAFKA ::::"
ssh $i /kafka/bin/kafka-server-start.sh -daemon /kafka/config/server.properties
done
;;
stop)
for i in bigdata1 bigdata2 bigdata3
do
echo ":::: STOP $i KAFKA ::::"
ssh $i /kafka/bin/kafka-server-stop.sh
done
;;
*)
echo "USAGE:kafka.sh {start|stop}"
exit
;;
esac

命令行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 查看topic 列表
kafka-topics.sh --list --bootstrap-server bigdata1:9092
# 创建topic,默认1个分区,1个副本
kafka-topics.sh --create --bootstrap-server bigdata1:9092 --topic [topic名称]
kafka-topics.sh --create --bootstrap-server bigdata1:9092 --topic [topic名称] --partitions [分区数] --replication-factor [副本数(包括自己)]
# 查看Topic详情
kafka-topics.sh --describe --bootstrap-server bigdata1:9092 --topic [topic名称]
# 修改Topic的分区数(只能改大)
kafka-topics.sh --alter --bootstrap-server bigdata1:9092 --topic [topic名称] --partitions [分区数]
# 删除Topic
kafka-topics.sh --delete --bootstrap-server bigdata1:9092 --topic [topic名称]

# 生产者
kafka-console-producer.sh --broker-list bigdata1:9092 --topic [topic名称]

# 消费者,存在数据offset重置问题:新启动的消费者组中的消费者消费不到topic中的数据
kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --topic [topic名称]
# 从头消费,不存在数据offset重置问题,保证分区内有序但不能保证整体有序
kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --topic [topic名称] --from-beginning
# 消费者组,在/kafka/config/consumer.properties中可以指定group.id
kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --topic [topic名称] --consumer.config /kafka/config/consumer.properties
kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --topic [topic名称] --group [消费者组名称]

原理

存储机制

  • topic是逻辑上的概念,partition是物理上的概念
  • 每个partition对应一个log文件,该log文件中存储的就是Producer生产的数据,Producer生产的数据会被不断追加到该log文件末端(追加写),每条数据都有自己的offset
  • 消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费
  • 采取分片和索引机制:一个topic有多个partition,一个partition有多个segment,一个segment有.index文件(存储索引)和.log文件(存储数据)组成,文件名为当前文件的第一个offset

生产者

分区策略

  • 方便在集群中扩展,可以提高并发
  • 原则
    • 有partition值:直接使用
    • 无partition值有key值:将key的hash值与topic的partition取余得到值
    • 无partition值无key值:采用Sticky Partition(黏性分区器),随机选择一个分区并尽量一直使用该分区,待该分区的batch已满或已完成后再随机一个分区进行使用

数据可靠性

  • topic的每个partition收到producer发送的数据后,都要向producer发送ack(acknowledgement确认收到),若producer收到ack说明发送成功,否则重新发送数据
  • ACK
    • 全部的(ISR中的)follewer同步完成发送
    • 参数配置
      • 0:leader接收到消息还没有写入磁盘就已经返回ack,leader故障可能丢失数据
      • 1: leader落盘成功后返回ack,follower同步成功前leader故障会丢失数据
      • -1(all): leader和follower全部落盘成功后才返回ack,在follower同步完成后,broker发送ack前,leader发生故障,会造成数据重复
  • ISR
    • Leader维护一个动态的ISR(在同步列表的副本)
    • 当ISR中的follower完成数据的同步之后,leader会给producer发送ack
    • 若follower长时间未向leader同步数据,该follower将被踢出ISR,时间阈值由replica.lag.time.max.ms参数设定
    • Leader发生故障之后会从ISR中选举新的leader

故障处理

  • LEO:每个副本最后一个offset
  • HW:所有副本中最小的LEO(消费者能见到的最大的offset),每个节点都同步完的最后一个offset
  • follower故障
    • 该follower被踢出ISR
    • 该follower截掉高于HW的部分,从HW开始向leader进行同步
    • 等到follower的LEO大于等于该Partition的HW后重新加入ISR
  • leader故障
    • 按副本顺序选举出新的leader
    • 其余follower将各自log文件高于HW的部分截掉,从新的leader同步数据
    • 只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

消费者

  • 采用pull(拉)模式从broker中读取数据,如果当前没有数据可供消费,会等待一段时间之后再返回

分区策略

  • RoundRobin:轮询
  • Range:默认,范围分区,分区数与消费者个数取余
  • Sticky:第一次分区时与轮询一样,消费者减少时原有的分配不变

offset的维护

  • 0.9版本之前:将offset保存在Zookeeper中
  • 0.9版本开始:将offset保存在Kafka一个内置的topic中,该topic为**__consumer_offsets**

读写数据

  • 顺序写磁盘:写的过程是一直追加到文件末端
  • 页缓存
  • 零复制

Zookeeper作用

  • Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作,Controller的管理工作都依赖于Zookeeper

API

生产者

发送流程

  • 采用异步发送

  • 消息发送

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    main线程				recordAccumulator		     sender线程		topic
    =============+=========================+==========+=========
    Producer | | |
    ↓ | | |
    Interceptors | | |
    ↓ | | |
    Serializer | | |
    ↓ | ↗ [分区1的RecordBatch] → | Sender → | [分区1]
    Partitoner → | → [分区2的RecordBatch] → | Sender → | [分区2]
    | ↘ [分区3的RecordBatch] → | Sender → | [分区3]
    • main线程:程将消息发送给RecordAccumulator
    • sender线程:不断从RecordAccumulator中拉取消息发送到Kafka broker
    • recordAccumulator:线程共享变量
  • 相关参数

    • batch.size:只有数据积累到batch.size之后,sender才会发送数据
    • linger.ms:若数据未达到batch.size,sender等待linger.time后发送数据

异步发送

  • 依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.0</version>
    </dependency>

    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.17.2</version>
    </dependency>
  • log4j配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
    <!-- 类型名为Console,名称为必须属性 -->
    <Appender type="Console" name="STDOUT">
    <!-- 布局为PatternLayout的方式,
    输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
    <Layout type="PatternLayout"
    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n"/>
    </Appender>
    </Appenders>
    <Loggers>
    <!-- 可加性为false -->
    <Logger name="test" level="info" additivity="false">
    <AppenderRef ref="STDOUT"/>
    </Logger>
    <!-- root loggerConfig设置 -->
    <Root level="info">
    <AppenderRef ref="STDOUT"/>
    </Root>
    </Loggers>
    </Configuration>
  • 不带回调函数的API

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public static void main(String[] args) {
    // 创建配置对象
    Properties props = new Properties();
    // Kafka集群位置,需要指定
    // props.put("bootstrap.servers", "bigdata1:9092");
    // CommonClientConfigs:通用配置类
    // --ProducerConfig:生产者配置类
    // --ConsumerConfig:消费者配置类
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata1:9092");
    // 序列化器,需要指定
    props.put(
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer"
    );
    props.put(
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer"
    );
    props.put("acks", "all");
    //重试次数
    props.put("retries", 1);
    //批次大小
    props.put("batch.size", 16384);
    //等待时间
    props.put("linger.ms", 1);
    //RecordAccumulator缓冲区大小
    props.put("buffer.memory", 33554432);
    // 创建生产者对象
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // 生产数据
    for (int i = 0; i < 10; i++) {
    // 指定partition
    producer.send(new ProducerRecord<>("topicA", 0, "key" + i, "val" + i));
    }
    // 关闭对象
    producer.close();
    }
  • 带回调函数的API

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public class AsyncWithCallback {

    private static final Logger log = LoggerFactory.getLogger(AsyncWithCallback.class);

    public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata1:9092");
    props.put(
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer"
    );
    props.put(
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer"
    );
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 10; i++) {
    producer.send(
    new ProducerRecord<>("topicA", 0, "key" + i, "val" + i),
    // 回调
    new Callback() {
    /**
    * 消息发送完成后会调用该方法
    * @param recordMetadata 消息的元数据信息
    * @param e 消息发送过程中抛出的异常
    */
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e != null) log.warn("消息发送失败: {}", e.getMessage());
    else log.info("消息发送成功: {}::{}::{}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
    }
    }
    );
    }
    producer.close();
    }

    }

同步发送

  • 一条消息发送之后,会阻塞当前线程,直至返回ack

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    for (int i = 0; i < 10; i++) {
    Future<RecordMetadata> future = producer.send(
    new ProducerRecord<>("topicA", 0, "key" + i, "val" + i),
    // 回调
    new Callback() {
    /**
    * 消息发送完成后会调用该方法
    * @param recordMetadata 消息的元数据信息
    * @param e 消息发送过程中抛出的异常
    */
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e != null) log.warn("消息发送失败: {}", e.getMessage());
    else
    log.info("消息发送成功: {}::{}::{}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
    }
    }
    );
    log.info("消息发送");
    // 会阻塞当前线程,直到该方法的结果返回
    future.get();
    log.info("消息发送完成");
    }

自定义分区器

  • 默认分区器:DefaultPartitioner

  • 自定义分区器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class MyPartitioner implements Partitioner {

    /**
    * 计算分区号
    * @param s 消息的主题
    * @param o 消息的key
    * @param bytes 消息的key序列化后的字节数组
    * @param o1 消息的val
    * @param bytes1 消息的val序列化后的字节数组
    * @param cluster 上下文对象
    * @return 分区号
    */
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
    return o1.toString().contains("shanghai") ? 0 : 1;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class ProducerWithPartitioner {

    public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata1:9092");
    props.put(
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer"
    );
    props.put(
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer"
    );
    // 使用自定义分区
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "cc.mousse.partitioner.MyPartitioner");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 10; i++) {
    String val = i % 2 == 0 ? "Shanghai" : "Beijing";
    producer.send(new ProducerRecord<>("topicA", 0, "city", val + i));
    }
    producer.close();
    }

    }

消费者

自动提交offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ConsumerWithAutoCommit {

private static final Logger log = LoggerFactory.getLogger(ConsumerWithAutoCommit.class);

public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "thisIsAGroupId");
// 自动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// offset提交间隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// kv反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
List<String> topics = new ArrayList<>();
topics.add("topicA");
consumer.subscribe(topics);
// 持续消费数据
while (true) {
// 休眠时间
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
log.info(
"{}.{}.{}: {}::{}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
}
}
// 关闭生产者对象
// consumer.close();
}

}

重置offset

  • 参数
    • earliest:重置到最早的offset
    • latest:默认,重置到最后的offset
  • 重置情况
    • 新的组
    • 要消费的offset对应的消息已经被删除
1
2
// offset重置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

手动提交offset

  • Comsumer启动后第一次会向kafka获取offset数据,后续获取数据时在内存中记录和读取新的offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ConsumerWithSyncManualCommit {

private static final Logger log = LoggerFactory.getLogger(ConsumerWithSyncManualCommit.class);

public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "thisIsAGroupId");
// 手动提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<String> topics = new ArrayList<>();
topics.add("topicA");
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
log.info(
"{}.{}.{}: {}::{}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
}
// 同步提交数据
consumer.commitSync();
// 异步提交数据
consumer.commitAsync(new OffsetCommitCallback() {
// 回调方法
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null) log.warn("提交失败: {}", e.getMessage());
else log.info("提交成功: {}", map);
}
});

}
// 关闭生产者对象
// consumer.close();
}

}

自定义拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 将时间戳添加在消息前面
public class TimeStampInterceptor implements ProducerInterceptor<String, String> {

/**
* 拦截器核心方法
* @param producerRecord 被拦截处理的消息
* @return
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
// 获取消息的val
String val = producerRecord.value();
// 添加时间戳
val = System.currentTimeMillis() + "::" + val;
// 重新构建消息对象
ProducerRecord<String, String> newRecord = new ProducerRecord<>(
producerRecord.topic(),
producerRecord.partition(),
producerRecord.key(),
val
);
return newRecord;
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> map) {}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 统计发送成功和失败的消息数
public class StatusCountInterceptor implements ProducerInterceptor<String, String> {

private Logger log = LoggerFactory.getLogger(ProducerInterceptor.class);
private int success, fail;

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
return producerRecord;
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e != null) fail++;
else success++;
}

@Override
public void close() {
log.info("success: {}", success);
log.info("fail: {}", fail);
}

@Override
public void configure(Map<String, ?> map) {}

}
1
2
3
// 配置拦截器,按顺序执行
List<String> interceptors = Arrays.asList("cc.mousse.interceptor.TimeStampInterceptor", "cc.mousse.interceptor.StatusCountInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

与FLume对接

Kafka Sink

  • 往Kafka中写数据:Flume是sink,Kafka是生产者

  • Flume → Kafka

    • netcat-flume-kafka.conf

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      #Named
      a1.sources = r1
      a1.channels = c1
      a1.sinks = k1

      #Source
      a1.sources.r1.type = netcat
      a1.sources.r1.bind = 0.0.0.0
      a1.sources.r1.port = 6666

      #Channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100

      #Sink
      a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
      a1.sinks.k1.kafka.topic = flumetopic
      a1.sinks.k1.kafka.flumeBatchSize = 100
      # 使用Flume Event格式,为true会保留header数据
      a1.sinks.k1.useFlumeEventFormat = true
      a1.sinks.k1.kafka.producer.acks = -1

      #Bind
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
    • 运行

      1
      flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console
  • Flume → Kafka: KafkaSink多topic支持

    • netcat-flume-kafkatopic.conf

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      #Named
      a1.sources = r1
      a1.channels = c1
      a1.sinks = k1

      #Source
      a1.sources.r1.type = netcat
      a1.sources.r1.bind = 0.0.0.0
      a1.sources.r1.port = 6666

      #Channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100

      #Interceptor
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = cc.mousse.flumeinterceptor.DataValueInterceptor$MyBuilder

      #Sink
      a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
      a1.sinks.k1.kafka.topic = topicOT
      a1.sinks.k1.kafka.flumeBatchSize = 100
      a1.sinks.k1.useFlumeEventFormat = true
      a1.sinks.k1.kafka.producer.acks = -1

      #Bind
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
    • 运行

      1
      flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafkatopic.conf -n a1 -Dflume.root.logger=INFO,console

Kafka Source

  • 从kafka中读取数据:Flume是source,Kafka是消费者

  • Kafka → Flume

    • kafka-flume-logger.conf

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      #Named
      a1.sources = r1
      a1.channels = c1
      a1.sinks = k1

      #Source
      a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
      a1.sources.r1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
      a1.sources.r1.kafka.topics = first
      a1.sources.r1.kafka.consumer.group.id = flume
      a1.sources.r1.batchSize = 100
      a1.sources.r1.useFlumeEventFormat = false

      #Channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100

      #Sink
      a1.sinks.k1.type = logger

      #Bind
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
    • 运行

      1
      flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/kafka-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

Kafka Channel

  • 作为基本的channel使用:xxxSource → KafkaChannel → xxxSink

  • 支持往kafka中写入数据:xxxSource → KafkaChannel

  • 支持从Kafka中读取数据:kafkaChannel → xxxSink

  • KafkaChannel → xxxSink

    • kafkachannel-flume-logger.conf

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      #Named
      a1.channels = c1
      a1.sinks = k1

      #Source

      #Channel
      a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
      a1.channels.c1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
      a1.channels.c1.kafka.topic = first
      a1.channels.c1.kafka.consumer.group.id = flume
      a1.channels.c1.kafka.consumer.auto.offset.reset = latest
      a1.channels.c1.parseAsFlumeEvent = false

      #Sink
      a1.sinks.k1.type = logger

      #Bind
      a1.sinks.k1.channel = c1
    • 运行

      1
      flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/kafkachannel-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
  • xxxSource → KafkaChannel

    • netcat-flume-kafkachannel.conf

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      #Named
      a1.sources = r1
      a1.channels = c1

      #Source
      a1.sources.r1.type = netcat
      a1.sources.r1.bind = 0.0.0.0
      a1.sources.r1.port = 6666

      #Channel
      a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
      a1.channels.c1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
      a1.channels.c1.kafka.topic = first
      a1.channels.c1.parseAsFlumeEvent = false

      #Sink

      #Bind
      a1.sources.r1.channels = c1
    • 运行

      1
      flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafka/netcat-flume-kafkachannel.conf -n a1 -Dflume.root.logger=INFO,console

监控

  • 部署Kafka Eagle步骤

    • 修改kafka-server-start.sh命令并分发

      1
      2
      3
      4
      5
      6
      7
      #vim /kafka/bin/kafka-server-start.sh

      if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
      export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
      export JMX_PORT="9999"
      #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
      fi
    • 下载kafka-eagle并给启动文件执行权限

      1
      chmod 777 ke.sh 
    • 修改配置文件

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      # vim /eagle/conf/system-config.properties

      ######################################
      # multi zookeeper&kafka cluster list
      ######################################
      kafka.eagle.zk.cluster.alias=cluster1
      cluster1.zk.list=bigdata1:2181,bigdata2:2181,bigdata3:2181

      ######################################
      # kafka offset storage
      ######################################
      cluster1.kafka.eagle.offset.storage=kafka

      ######################################
      # kafka jdbc driver address
      ######################################
      kafka.eagle.driver=com.mysql.cj.jdbc.Driver
      kafka.eagle.url=jdbc:mysql://bigdata1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
      kafka.eagle.username=root
      kafka.eagle.password=root
    • 添加环境变量

      1
      2
      3
      # KE_HOME
      export KE_HOME=/eagle
      export PATH=$PATH:$KE_HOME/bin
    • 启动:启动之前需要先启动ZK以及KAFKA

      1
      /eagle/bin/ke.sh start
    • 登录页面查看监控数据:http://10.211.55.15:8048

面试题

  • ISR和AR

    • ISR:同步列表,与leader保持同步的follower集合
    • AR:分区的所有副本
  • HW和LEO

    • LEO:每个副本的最后条消息的offset
    • HW:一个分区中所有副本最小的offset
  • 消息顺序性

    • 区内有序,每个分区内,每条消息都有一个offset,故只能保证分区内有序
  • 处理顺序

    • 拦截器 → 序列化器 → 分区器
  • 生产者客户端的整体结构

    • main线程:程将消息发送给RecordAccumulator
    • sender线程:不断从RecordAccumulator中拉取消息发送到Kafka broker
    • recordAccumulator:线程共享变量
  • 消费者提交消费位移

    • 提交的是当前消费到的最新消息的offset +1
  • 重复消费和消消费

    • 可能导致重复消费:先消费,后提交offset
    • 可能导致漏消费:先提交offset,后消费
  • 创建(删除)topic之后Kafka背后的逻辑

    • 在zookeeper中的/brokers/topics节点下创建一个新的topic节点
    • 触发Controller的监听程序
    • Kafka Controller负责topic的创建工作,并更新metadata cache
  • topic的分区数

    • 可以增加,不可以减少
  • Kafka内部topic

    • __consumer_offsets:保存消费者offset
  • 分区分配概念

    • 生产者
      • 有partition值:直接使用
      • 无partition值有key值:将key的hash值与topic的partition取余得到值
      • 无partition值无key值:采用Sticky Partition(黏性分区器),随机选择一个分区并尽量一直使用该分区,待该分区的batch已满或已完成后再随机一个分区进行使用
    • 消费者
      • RoundRobin:轮询
      • Range:默认,范围分区,分区数与消费者个数取余
      • Sticky:第一次分区时与轮询一样,消费者减少时原有的分配不变
  • Kafka日志目录结构

    • 一个topic有多个partition,一个partition有多个segment,一个segment有.index文件(存储索引)和.log文件(存储数据)组成,文件名为当前文件的第一个offset
    • Kafka根据offset判断在哪个segment中,从该segment中的index文件找到消息的位置
  • Kafka Controller

    • 负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作
  • 需要选举的地方及其选举策略

    • partition leader:根据ISR顺序
    • controller:争抢
  • 失效副本

    • 不能及时与leader同步,暂时踢出ISR,等其追上leader之后再重新加入
    • follower故障
      • 该follower被踢出ISR
      • 该follower截掉高于HW的部分,从HW开始向leader进行同步
      • 等到follower的LEO大于等于该Partition的HW后重新加入ISR
    • leader故障
      • 按副本顺序选举出新的leader
      • 其余follower将各自log文件高于HW的部分截掉,从新的leader同步数据
      • 只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
  • Kafka高性能设计

    • 分区,顺序写磁盘,0-copy

Kafka
http://docs.mousse.cc/Kafka/
作者
Mocha Mousse
发布于
2025年5月26日
许可协议