Flume

Concepts

  • A distributed, highly available, and highly reliable system for collecting, aggregating, and transporting massive amounts of log data, built on a streaming architecture

Architecture

Basic Architecture

graph LR
1(Web Server) --> 2(Source)
subgraph Agent
2 --> 3(Channel) --> 4(Sink)
end
4 --> 5(HDFS)

Agent

  • A JVM process that moves data from its origin to its destination in the form of events
  • Source
    • The component that receives data into the Flume Agent
    • Handles log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, and taildir
  • Channel
    • A buffer between the Source and the Sink that lets them run at different rates
    • Thread-safe
    • Flume ships with two Channels: Memory Channel and File Channel
  • Sink
    • Continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or sends them to another Flume Agent
    • Destinations include hdfs, logger, avro, thrift, ipc, file, HBase, and custom sinks

Event

  • The basic unit of data transfer in Flume
  • Composition (see the Java sketch below)
    • Header: attributes of the event, stored as key-value pairs
    • Body: the data itself, stored as a byte array
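
A minimal Java sketch (assuming flume-ng-core is on the classpath) of the two parts of an Event, using the same SimpleEvent class that the custom Source example later in this document uses:

import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;

public class EventDemo {
    public static void main(String[] args) {
        Event event = new SimpleEvent();
        // Body: the payload itself, stored as a byte array
        event.setBody("hello flume".getBytes(StandardCharsets.UTF_8));
        // Header: K-V attributes describing the event (e.g. used for routing)
        event.getHeaders().put("city", "sh");
        System.out.println(event.getHeaders() + " :: " + new String(event.getBody(), StandardCharsets.UTF_8));
    }
}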

Installation

Address

Installation and Deployment

  • Delete guava-11.0.2.jar from the lib directory to stay compatible with Hadoop 3.1.3

    rm /flume/lib/guava-11.0.2.jar

Examples

Monitoring Port Data

Concept

  • Use Flume to listen on a port, collect the data arriving at that port, and print it to the console

Configuration

  • File name: netcat-flume-logger.conf

    # Named
    # a1: user-defined name of this Agent
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # Source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 6666

    # Channel
    a1.channels.c1.type = memory
    # Channel capacity: the number of Events the Channel can hold
    a1.channels.c1.capacity = 10000
    # Channel transaction capacity
    a1.channels.c1.transactionCapacity = 100

    # Sink
    a1.sinks.k1.type = logger

    # Bind
    # One Source can feed multiple Channels
    a1.sources.r1.channels = c1
    # One Sink can only be attached to one Channel
    a1.sinks.k1.channel = c1

Startup

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/jobs/netcat-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
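
A quick test (assuming the nc/netcat client is installed): connect to the listening port and type a few lines; each line should appear on the Flume console as an Event.

nc localhost 6666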

Monitoring a Single Appended File

Concept

  • Monitor a single file that is being appended to in real time and print its contents to the console

Configuration

  • File name: exec-flume-logger.conf
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /flume/jobs/tail.txt

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.k1.type = logger

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Startup

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/jobs/exec-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console

# Short form
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/exec-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
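
To test (assuming the tailed file exists), append a line to it and watch it appear on the console:

echo "hello flume" >> /flume/jobs/tail.txt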

Concept

  • Monitor a single file that is being appended to in real time and upload its contents to HDFS

Configuration

  • File name: exec-flume-hdfs.conf
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /flume/jobs/tail.txt

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1:8020/flume/%Y%m%d/%H
# Prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
# Whether to roll directories based on time
a1.sinks.k1.hdfs.round = true
# How many time units before creating a new directory
a1.sinks.k1.hdfs.roundValue = 1
# Redefine the time unit
a1.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before one flush to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are also supported
a1.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a1.sinks.k1.hdfs.rollInterval = 60
# Roll size (bytes) of each file
a1.sinks.k1.hdfs.rollSize = 134217700
# Rolling is independent of the number of Events
a1.sinks.k1.hdfs.rollCount = 0

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/exec-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
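
To verify the upload (a sketch; the exact date/hour directory depends on when you run it):

hdfs dfs -ls -R /flume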

Monitoring Multiple New Files

Configuration

  • File name: spooling-flume-hdfs.conf
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = spooldir
# Directory to monitor
a1.sources.r1.spoolDir = /flume/jobs/spooling
# Suffix appended to files that have already been collected
a1.sources.r1.fileSuffix = .COMPLETED
# Regex of files to ignore
a1.sources.r1.ignorePattern = .*\.tmp


# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1:8020/flume/%Y%m%d/%H
# Prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
# Whether to roll directories based on time
a1.sinks.k1.hdfs.round = true
# How many time units before creating a new directory
a1.sinks.k1.hdfs.roundValue = 1
# Redefine the time unit
a1.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before one flush to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are also supported
a1.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a1.sinks.k1.hdfs.rollInterval = 60
# Roll size (bytes) of each file
a1.sinks.k1.hdfs.rollSize = 134217700
# Rolling is independent of the number of Events
a1.sinks.k1.hdfs.rollCount = 0

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/spooling-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
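
To test, drop a new file into the monitored directory (the file name here is only an example); once it has been collected, Flume renames it with the .COMPLETED suffix:

cp /tmp/data.txt /flume/jobs/spooling/
ls /flume/jobs/spooling/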

Monitoring Multiple Appended Files

Concept

  • Use Flume to monitor files being appended to in real time across an entire directory and upload them to HDFS

Configuration

  • File name: taildir-flume-hdfs.conf
# Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = TAILDIR
# File groups to monitor
a1.sources.r1.filegroups = f1 f2
# Files monitored by each file group
a1.sources.r1.filegroups.f1 = /flume/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /flume/jobs/taildir/.*\.log
# Resume support: records the read position of each file
a1.sources.r1.positionFile = /flume/jobs/position/position.json

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1:8020/flume/%Y%m%d/%H
# Prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
# Whether to roll directories based on time
a1.sinks.k1.hdfs.round = true
# How many time units before creating a new directory
a1.sinks.k1.hdfs.roundValue = 1
# Redefine the time unit
a1.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before one flush to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are also supported
a1.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a1.sinks.k1.hdfs.rollInterval = 60
# Roll size (bytes) of each file
a1.sinks.k1.hdfs.rollSize = 134217700
# Rolling is independent of the number of Events
a1.sinks.k1.hdfs.rollCount = 0

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/taildir-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

// Example contents of the positionFile
[{"inode":1317393,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file1.txt"},
{"inode":1317394,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file2.txt"},
{"inode":1317395,"pos":5,"file":"/opt/module/flume-1.9.0/jobs/taildir/log1.log"},
{"inode":1317396,"pos":5,"file":"/opt/module/flume-1.9.0/jobs/taildir/log2.log"}]

Source Comparison

  • Exec Source: suitable for monitoring a single file that is appended to in real time; it cannot resume from a recorded position
  • Spooldir Source: suitable for synchronizing new files, but not for monitoring and synchronizing files that are appended to in real time
  • Taildir Source: suitable for monitoring multiple files appended to in real time, and it can resume from the recorded position

Internals

Transactions

Concept

  • Guarantees that data is not lost, but does not guarantee that data is never duplicated

Put

  1. doPut: the Source writes a batch of data into the temporary buffer putList
  2. doCommit: checks whether the Channel's in-memory queue has enough room to take the batch
  3. doRollback: if the Channel's in-memory queue does not have enough space, roll the data back

Take

  1. doTake: the Sink takes a batch of data into the temporary buffer takeList and sends the data to HDFS
  2. doCommit: if all the data was sent successfully, clear the temporary buffer
  3. doRollback: if an exception occurs while sending, rollback returns the data in the temporary buffer to the in-memory queue (which may cause duplicates)

Notes

  • The Channel size is determined by capacity
  • The sizes of putList and takeList are determined by transactionCapacity
  • The batch size is determined by batchSize
  • batchSize ≤ transactionCapacity ≤ capacity (see the sketch below)
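
A sizing sketch consistent with the configurations in this document: the Sink's batchSize (100) is no larger than the Channel's transactionCapacity (100), which in turn is no larger than its capacity (10000).

a1.channels.c1.type = memory
# maximum number of Events the Channel can hold
a1.channels.c1.capacity = 10000
# size of putList/takeList per transaction
a1.channels.c1.transactionCapacity = 100
# Events per flush; must not exceed transactionCapacity
a1.sinks.k1.hdfs.batchSize = 100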

How the Agent Works Internally

  1. The Source wraps the received data into Events and hands them to the Channel Processor
  2. The Channel Processor passes the data through the interceptor chain
  3. The Channel Processor hands each Event returned by the interceptors to the Channel Selector (see the configuration sketch after this list)
    • Channel Selector types
      • Replicating: sends the Source's Events to all Channels
      • Multiplexing: sends the Source's Events to specific Channels according to the configuration
  4. The Channel Processor writes the Events into the corresponding Channels based on the Channel Selector's result
  5. The Events then pass through the SinkProcessor
    • SinkProcessor types
      • DefaultSinkProcessor: a single Sink
      • LoadBalancingSinkProcessor: picks a Sink according to a policy
      • FailoverSinkProcessor: only one Sink is active; the other Sinks stand by as backups
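
A minimal configuration sketch of where the selector and sink-processor types from this list are set (complete examples follow in the enterprise cases below):

# Channel Selector type is set on the Source
a1.sources.r1.selector.type = replicating
# SinkProcessor type is set on a sink group (load_balance or failover)
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance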

Topologies

Simple Chaining

  • Agents are connected through the Avro component (see the sketch after the diagram)
graph LR
subgraph Client
1(Source) --> 2(Channel) --> 3(Sink<br>Avro)
end
3 --Avro RPC--> 5(Source<br>Avro)
subgraph Server
5 --> 6(Channel) --> 7(Sink)
end
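
A minimal sketch of the Avro hop between two agents (the host name and port here are placeholders; the enterprise cases below use the same pattern). The upstream agent's Avro Sink must point at the host and port on which the downstream agent's Avro Source listens.

# Upstream agent a1: Avro Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata2
a1.sinks.k1.port = 4141

# Downstream agent a2: Avro Source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata2
a2.sources.r1.port = 4141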

Replication and Multiplexing

  • The same data can be processed in multiple ways / routed to different destinations
graph LR
subgraph Client
1(Source)
1 --> 2(Channel1) --> 3(Sink1)
1 --> 5(Channel2) --> 6(Sink2)
1 --> 8(Channel3) --> 9(Sink3<br>Avro)
end
subgraph Server
3 --> 4(HDFS)
6 --> 7(JMS)
9 -- Avro RPC --> 10(Source4<br>Avro) --> 11(Channel4) --> 12(Sink4)
end

Load Balancing and Failover

graph LR
subgraph Client
1(Source) --> 2(Channel)
2 --> 3(Sink<br>Avro)
2 --> 4(Sink<br>Avro)
2 --> 5(Sink<br>Avro)
end
subgraph Server
3 -- Avro RPC --> 6(Source1<br>Avro) --> 7(Channel1) --> 8(Sink1)
4 -- Avro RPC --> 9(Source2<br>Avro) --> 10(Channel2) --> 11(Sink2)
5 -- Avro RPC --> 12(Source3<br>Avro) --> 13(Channel3) --> 14(Sink3)
end
8 --> 15(HDFS)
11 --> 15
14 --> 15

Aggregation

graph LR
subgraph Client
1(Source1) --> 2(Channel1) --> 3(Sink1<br>Avro)
4(Source2) --> 5(Channel2) --> 6(Sink2<br>Avro)
7(Source3) --> 8(Channel3) --> 9(Sink3<br>Avro)
end
subgraph Server
3 -- Avro RPC --> 10(Source4<br>Avro) --> 11(Channel4) --> 12(Sink4)
6 -- Avro RPC --> 10
9 -- Avro RPC --> 10
end
12 --> 13(HDFS)

Enterprise Cases

Replication and Multiplexing

Concept

Flume-1 monitors file changes and passes the changes to Flume-2, which stores them in HDFS. At the same time, Flume-1 passes the changes to Flume-3, which writes them to the local file system.

graph LR
subgraph flume1
1(Source1) --> 2(replicating<br>selector) 
2 --> 3(Channel1) --> 4(Sink1)
2 --> 8(Channel2) --> 9(Sink2)
end
subgraph flume2
4 --> 5(Source2) --> 6(Channel2) --> 7(Sink2)
end
subgraph flume3
9 --> 10(Source3) --> 11(Channel3) --> 12(Sink3)
end

Configuration

flume1.conf

#Named
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /flume/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /flume/jobs/position/position.json
#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

flume2.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata1:8020/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3.conf

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /flume/jobs/fileroll

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
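
To test (assuming a .txt file matching the TAILDIR glob exists), append a line and check both destinations:

echo "replicating test" >> /flume/jobs/taildir/file1.txt
hdfs dfs -ls -R /flume
ls /flume/jobs/fileroll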

Load Balancing and Failover

Load Balancing

Concept

  • Flume-1 monitors a port; the Sinks in its sink group connect to Flume-2 and Flume-3 respectively
graph LR
1(Source) --> 2(replicating<br>selector) --> 3(Channel) --> 4(load_balance<br>processor)
4 --> 5(Avro<br>Sink) --> 6(Avro<br>Source) --> 7(Channel) --> 8(Sink) --> 9(Console)
4 --> 10(Avro<br>Sink) --> 11(Avro<br>Source) --> 12(Channel) --> 13(Sink)

Configuration

flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
#Load-balancing policy
a1.sinkgroups.g1.processor.selector = random

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console

Failover

Configuration

flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
#Priority of each Sink; as long as the higher-priority Sink is alive, events go to it
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10


#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console
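
A quick failover check (a sketch): send a few events, then stop the higher-priority agent (k2 points at a3, priority 10) and keep sending; new events should then be printed by a2 instead.

nc localhost 6666
# in another terminal: stop the a3 process (e.g. Ctrl+C), then keep typing into nc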

Aggregation

Concept

  • Flume-1 on bigdata1 monitors the file /opt/module/group.log, Flume-2 on bigdata2 monitors the data stream on a port, Flume-1 and Flume-2 send their data to Flume-3 on bigdata3, and Flume-3 prints the final data to the console

Configuration

flume1.conf

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /flume/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /flume/jobs/position/position.json

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata3
a1.sinks.k1.port = 8888

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume2.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata3
a2.sinks.k1.port = 8888

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3.conf

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata3
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume1.conf -n a1 -Dflume.root.logger=INFO,console
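
To test both branches (assuming the monitored .txt file exists on bigdata1), generate data on each source host and watch a3's console on bigdata3:

# on bigdata1
echo "from bigdata1" >> /flume/jobs/taildir/file1.txt

# on bigdata2
nc localhost 6666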

Custom Components

Interceptor

Concept

graph LR
1(Source<br>Flume1) --> 2(Interceptor) --> 3(multiplexing<br>selector)
3 --> 4(Channel for events<br>containing Shanghai) --> 5(Sink) --> 6(Flume2)
3 --> 7(Channel for events<br>containing Beijing) --> 8(Sink) --> 9(Flume3)
3 --> 10(Channel for<br>other events) --> 11(Sink) --> 12(Flume4)
  • Routing is decided by the Event's Header

  • Multiplexing example

Dependency

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.10.0</version>
</dependency>

Implementation Code

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Implement the Interceptor interface provided by Flume
public class EventHeaderInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    // Intercept a single event
    @Override
    public Event intercept(Event event) {
        // Get the event's headers
        Map<String, String> headers = event.getHeaders();
        // Get the event's body
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Check whether the body contains "Shanghai" or "Beijing"
        if (body.contains("Shanghai")) headers.put("city", "sh");
        if (body.contains("Beijing")) headers.put("city", "bj");
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    // Flume creates interceptor instances through a Builder
    // Implement the Builder interface provided by Flume
    public static class MyBuilder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new EventHeaderInterceptor();
        }

        @Override
        public void configure(Context context) {
        }

    }

}
  • Put the packaged jar into Flume's lib directory

Configuration

flume1.conf

#Named
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555

#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = city
a1.sources.r1.selector.mapping.sh = c1
a1.sources.r1.selector.mapping.bj = c2
a1.sources.r1.selector.default = c3

# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cc.mousse.flume.EventHeaderInterceptor$MyBuilder

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100


a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100


#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888

#Bind
a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3

flume2.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3.conf

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 7777

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume4.conf

#Named
a4.sources = r1
a4.channels = c1
a4.sinks = k1

#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = localhost
a4.sources.r1.port = 8888

#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100

#Sink
a4.sinks.k1.type = logger

#Bind
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume4.conf -n a4 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume1.conf -n a1 -Dflume.root.logger=INFO,console
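
To test the multiplexing (assuming nc is installed), send lines to port 5555; lines containing Shanghai should be printed by a2, lines containing Beijing by a3, and everything else by a4:

nc localhost 5555
# hello Shanghai   -> a2 console
# hello Beijing    -> a3 console
# hello world      -> a4 console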

Source (for reference)

Logic Code That Generates Random Data

import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    private String prefix;

    /**
     * Core processing method of the Source; Flume calls it in a loop.
     *
     * @return processing status
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        try {
            // Collect data and wrap it into an Event object
            Event event = getSomeData();
            // Hand the Event over to the ChannelProcessor
            getChannelProcessor().processEvent(event);
            // Processed successfully: return Status.READY
            status = Status.READY;
        } catch (Throwable t) {
            // Processing failed: return Status.BACKOFF
            status = Status.BACKOFF;
        }
        return status;
    }

    private Event getSomeData() {
        // Generate a random string
        String data = UUID.randomUUID().toString();
        String res = prefix + data;
        SimpleEvent event = new SimpleEvent();
        event.setBody(res.getBytes(StandardCharsets.UTF_8));
        event.getHeaders().put("key1", "value1");
        return event;
    }

    /**
     * Increment added to the backoff time after each failure
     *
     * @return time
     */
    @Override
    public long getBackOffSleepIncrement() {
        return 1;
    }

    /**
     * Maximum backoff time
     *
     * @return time
     */
    @Override
    public long getMaxBackOffSleepInterval() {
        return 10;
    }

    /**
     * Reads the Flume configuration
     *
     * @param context
     */
    @Override
    public void configure(Context context) {
        // Read configuration values
        this.prefix = context.getString("prefix", "");
    }

}

Configuration

mysource-flume-logger.conf

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = cc.mousse.flume.MySource
a1.sources.r1.prefix = log--

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = logger

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/mysource-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

Sink (for reference)

Console Printing Logic Code

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    // Logger for this class
    private final Logger logger = LoggerFactory.getLogger(MySink.class);

    /**
     * Core processing method of the Sink; Flume calls it in a loop.
     *
     * @return processing status
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
        // Sleep for one second
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        Status status;
        // Get the Channel
        Channel ch = getChannel();
        // Get a transaction object
        Transaction txn = ch.getTransaction();
        // Begin the transaction
        txn.begin();
        try {
            // Take an Event from the Channel
            Event event = ch.take();
            // Process the Event
            storeSomeData(event);
            // Commit the transaction on success
            txn.commit();
            // Return the status
            status = Status.READY;
        } catch (Throwable t) {
            // Roll back the transaction on failure
            txn.rollback();
            // Return the status
            status = Status.BACKOFF;
        } finally {
            // Close the transaction whether it succeeded or not
            txn.close();
        }
        return status;
    }

    private void storeSomeData(Event event) {
        logger.info(event.getHeaders() + " :: " + new String(event.getBody(), StandardCharsets.UTF_8));
    }

    @Override
    public void configure(Context context) {
    }

}

Configuration

mysource-flume-mysink.conf

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = com.atguigu.flume.source.MySource
a1.sources.r1.prefix = log--

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = com.atguigu.flume.sink.MySink

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Startup

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/mysource-flume-mysink.conf -n a1 -Dflume.root.logger=INFO,console

Data Flow Monitoring

Concept

  • Ganglia consists of three parts: gmond, gmetad, and gweb
  • gmond (Ganglia Monitoring Daemon): a lightweight service installed on every node whose metrics should be collected. It gathers many system metrics, such as CPU, memory, disk, network, and active-process data
  • gmetad (Ganglia Meta Daemon): a service that aggregates all the information and stores it on disk in RRD format
  • gweb (Ganglia Web): Ganglia's visualization tool, a PHP front end that displays the data stored by gmetad in a browser. The web UI presents the various metrics collected about the cluster's running state as charts

Installation

  • Plan

    • bigdata1: gweb, gmetad, gmond
    • bigdata2/3: gmond
  • bigdata1

    apt install -y ganglia-monitor 
    apt install -y ganglia-webfrontend
    apt install -y gmetad
    ln -s /usr/lib/ganglia /usr/lib/aarch64-linux-gnu/ganglia
  • bigdata2/3

    apt install -y ganglia-monitor
    ln -s /usr/lib/ganglia /usr/lib/aarch64-linux-gnu/ganglia
  • Modify the configuration file on bigdata1

    # vim /etc/ganglia/gmetad.conf
    data_source "pz cluster" bigdata1 bigdata2 bigdata3
  • Modify the configuration file on bigdata1/2/3

    # vim /etc/ganglia/gmond.conf
    cluster {
      name = "pz cluster"
      owner = "unspecified"
      latlong = "unspecified"
      url = "unspecified"
    }
    udp_send_channel {
      #bind_hostname = yes # Highly recommended, soon to be default.
      # This option tells gmond to use a source address
      # that resolves to the machine's hostname. Without
      # this, the metrics may appear to come from any
      # interface and the DNS names associated with
      # those IPs will be used to create the RRDs.
      # mcast_join = 239.2.11.71
      # Send the data to bigdata1
      host = bigdata1
      port = 8649
      ttl = 1
    }
    udp_recv_channel {
      # mcast_join = 239.2.11.71
      port = 8649
      # Accept data from any address
      # bind = 0.0.0.0
      # Size of the UDP buffer. If you are handling lots of metrics you really
      # should bump it up to e.g. 10MB or even higher.
      # buffer = 10485760
    }

  • On bigdata1, modify the configuration file /etc/selinux/config

  • Disabling SELinux requires a reboot to take effect; if you don't want to reboot right now, you can turn it off temporarily (e.g. with setenforce 0)

  • Start the services

    /etc/init.d/ganglia-monitor start
    /etc/init.d/gmetad start
  • Open the Ganglia page in a browser: http://bigdata1/ganglia

    • If you still get a permission error after the steps above, change the permissions of the /var/lib/ganglia directory
  • Start Flume with monitoring enabled

    flume-ng agent \
    -c $FLUME_HOME/conf \
    -n a1 \
    -f $FLUME_HOME/jobs/netcat-flume-logger.conf \
    -Dflume.root.logger=INFO,console \
    -Dflume.monitoring.type=ganglia \
    -Dflume.monitoring.hosts=bigdata1:8649

    # EventPutAttemptCount: total number of events the source attempted to put into the channel
    # EventPutSuccessCount: total number of events successfully written to the channel and committed
    # EventTakeAttemptCount: total number of times the sink attempted to take events from the channel
    # EventTakeSuccessCount: total number of events the sink successfully read
    # StartTime: time (ms) at which the channel started
    # StopTime: time (ms) at which the channel stopped
    # ChannelSize: current number of events in the channel
    # ChannelFillPercentage: how full the channel is, as a percentage
    # ChannelCapacity: capacity of the channel

Interview Questions

Transfer Monitoring

  • Use the third-party framework Ganglia

Source & Sink & Channel

  • Source: collects data; it can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy
  • Channel: buffers the collected data; it can be kept in memory or in files
  • Sink: the component that delivers data to its destination; destinations include HDFS, Logger, avro, thrift, ipc, file, HBase, solr, and custom sinks

Channel Selectors

  • Replicating: sends the Source's Events to all Channels
  • Multiplexing: sends the Source's Events to specific Channels according to rules

Parameter Tuning

Source

  • Increase the number of Sources and raise batchSize
  • Increasing the number of Sources (when using the Taildir Source, you can also increase the number of FileGroups) increases the Source's capacity to read data; see the sketch after this list
    • For example: when a single directory produces too many files, split it into several directories and configure multiple Sources so that they can keep up with the newly produced data
  • batchSize determines how many events the Source ships to the Channel per batch; raising it appropriately improves the throughput of moving Events from the Source into the Channel
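
A tuning sketch for a Taildir Source, reusing the paths from the examples above; the batchSize property name on the Taildir Source is an assumption here:

a1.sources.r1.type = TAILDIR
# more file groups -> more files read by one Source
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /flume/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /flume/jobs/taildir/.*\.log
# events shipped to the Channel per batch (assumed property name)
a1.sources.r1.batchSize = 500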

Channel

  • When using the File Channel, configure directories on several different disks, and make transactionCapacity larger than batchSize
  • A memory Channel gives the best performance, but data may be lost if the Flume process dies unexpectedly; a file Channel is more fault-tolerant, but slower than the memory Channel
  • When using the File Channel, configuring dataDirs with directories on several different disks improves performance; see the sketch after this list
  • capacity determines the maximum number of events the Channel can hold
  • transactionCapacity determines the maximum number of events the Source writes to the Channel per transaction and the maximum number the Sink reads from the Channel per transaction
  • transactionCapacity must be no smaller than the batchSize of the Source and the Sink
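
A File Channel sketch with data directories spread across different disks (the paths are illustrative):

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data1/flume/checkpoint
# directories on different physical disks
a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000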

Sink

  • Increase the number of Sinks and raise batchSize
  • Increasing the number of Sinks increases the capacity to consume events, but more is not always better; use just enough, since extra Sinks occupy system resources and waste them unnecessarily
  • batchSize determines how many events the Sink reads from the Channel per batch; raising it appropriately improves the Sink's throughput when draining events from the Channel

Transaction Mechanism

  • Put transaction flow

    1. doPut: the Source writes a batch of data into the temporary buffer putList
    2. doCommit: checks whether the Channel's in-memory queue has enough room to take the batch
    3. doRollback: if the Channel's in-memory queue does not have enough space, roll the data back
  • Take transaction flow

    1. doTake: the Sink takes a batch of data into the temporary buffer takeList and sends the data to HDFS
    2. doCommit: if all the data was sent successfully, clear the temporary buffer
    3. doRollback: if an exception occurs while sending, rollback returns all the data in the temporary buffer to the in-memory queue (which may cause duplicates)
  • Guarantees that data is not lost, but does not guarantee that data is never duplicated

