Concept
A distributed, highly available, and highly reliable system for collecting, aggregating, and transporting massive volumes of log data, built on a streaming architecture
Architecture  Basic architecture  graph LR
1(Web Server) --> 2(Source)
subgraph Agent
2 --> 3(Channel) --> 4(Sink)
end
4 --> 5(HDFS)
Agent
A JVM process that moves data from a source to a destination in the form of events
Source
The component responsible for receiving data into the Flume Agent
It can handle log data of various types and formats, including avro, thrift, exec, jms, spooling directory, netcat, and taildir
Channel
A buffer between the Source and the Sink, allowing them to operate at different rates
Thread-safe
Flume ships with two Channels: Memory Channel and File Channel
Sink
Continuously polls the Channel for events, removes them in batches, and writes each batch to a storage or indexing system, or forwards it to another Flume Agent
Destinations include hdfs, logger, avro, thrift, ipc, file, HBase, and custom sinks
Event
The basic unit of data transfer in Flume
Composition
Header: holds attributes of the event as key-value (K-V) pairs
Body: holds the payload itself as a byte array
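A minimal Java sketch of this structure, using the same SimpleEvent API that the custom Source example later in these notes relies on:

import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import java.nio.charset.StandardCharsets;

public class EventDemo {
    public static void main(String[] args) {
        Event event = new SimpleEvent();
        // Body: the payload itself, stored as a byte array
        event.setBody("hello flume".getBytes(StandardCharsets.UTF_8));
        // Header: optional K-V attributes describing the event
        event.getHeaders().put("city", "sh");
        System.out.println(event.getHeaders() + " :: " + new String(event.getBody(), StandardCharsets.UTF_8));
    }
}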
Installation  Address
Installation and deployment
Examples  Monitoring port data  Concept
Use Flume to listen on a port, collect the data arriving on that port, and print it to the console
Configuration
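A minimal netcat-flume-logger.conf sketch consistent with the startup command below (the bind address and port 44444 are assumptions):

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1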
Start
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/jobs/netcat-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
Monitoring a single appended file  Concept
Configuration
File name: exec-flume-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /flume/jobs/tail.txt

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/exec-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
Monitoring a single appended file and uploading to HDFS  Concept
Configuration
File name: exec-flume-hdfs.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /flume/jobs/tail.txt

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1:8020/flume/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = logs-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/exec-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Monitoring multiple new files  Configuration
File name: spooling-flume-hdfs.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /flume/jobs/spooling
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.ignorePattern = .*\.tmp

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1:8020/flume/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = logs-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/spooling-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Monitoring multiple appended files  Concept
Use Flume to watch an entire directory for files that are appended in real time and upload them to HDFS
Configuration
File name: taildir-flume-hdfs.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /flume/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /flume/jobs/taildir/.*\.log
a1.sources.r1.positionFile = /flume/jobs/position/position.json

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1:8020/flume/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = logs-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/taildir-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
[
  {"inode": 1317393, "pos": 6, "file": "/opt/module/flume-1.9.0/jobs/taildir/file1.txt"},
  {"inode": 1317394, "pos": 6, "file": "/opt/module/flume-1.9.0/jobs/taildir/file2.txt"},
  {"inode": 1317395, "pos": 5, "file": "/opt/module/flume-1.9.0/jobs/taildir/log1.log"},
  {"inode": 1317396, "pos": 5, "file": "/opt/module/flume-1.9.0/jobs/taildir/log2.log"}
]
Source analysis
Exec Source: suitable for monitoring a single file that is appended in real time; cannot resume from where it left off
Spooldir Source: suitable for syncing new files, but not for watching and syncing log files that are appended in real time
Taildir Source: suitable for watching multiple files that are appended in real time, and can resume from where it left off
Principles  Transactions  Concept
Put
doPut: the Source writes a batch of data into the temporary buffer putList
doCommit: checks whether the Channel's memory queue has enough room to take the putList
doRollback: if the Channel's memory queue does not have enough space, the data is rolled back
Take
doTake: the Sink pulls a batch of data into the temporary buffer takeList and sends the data to HDFS
doCommit: if all the data is sent successfully, the temporary buffer is cleared
doRollback: if an exception occurs while sending, the rollback returns the data in the temporary buffer to the memory queue (which may cause duplicate data)
Notes
The Channel size is determined by capacity
The putList/takeList size is determined by transactionCapacity
The batch size is determined by batchSize
batchSize ≤ transactionCapacity ≤ capacity
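For example, a sizing that respects this ordering (the values are illustrative assumptions, following the memory Channel and HDFS Sink used elsewhere in these notes):

a1.channels.c1.type = memory
# capacity: max events the channel can hold
a1.channels.c1.capacity = 10000
# transactionCapacity: max events per put/take transaction (putList/takeList size)
a1.channels.c1.transactionCapacity = 100
# batchSize: events per sink batch; must not exceed transactionCapacity
a1.sinks.k1.hdfs.batchSize = 100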
Agent internals
The Source wraps the received data into Events and hands them to the Channel Processor
The Channel Processor passes the data through the interceptor chain
The Channel Processor hands each Event returned by the interceptors to the Channel Selector
Channel Selector types
Replicating: sends Events from the Source to all Channels
Multiplexing: sends Events from the Source to specific Channels according to the configuration
The Channel Processor writes the Events into the corresponding Channels based on the Channel Selector's result
The Events then enter the SinkProcessor
SinkProcessor types
DefaultSinkProcessor: a single Sink
LoadBalancingSinkProcessor: selects a Sink according to a policy
FailoverSinkProcessor: only one Sink works at a time; the other Sinks stand by as backups
Topologies  Simple chaining
graph LR
subgraph Client
1(Source) --> 2(Channel) --> 3(Sink<br>Avro)
end
3 --Avro RPC--> 5(Source<br>Avro)
subgraph Server
5 --> 6(Channel) --> 7(Sink)
end
Replicating and multiplexing
graph LR
subgraph Client
1(Source)
1 --> 2(Channel1) --> 3(Sink1)
1 --> 5(Channel2) --> 6(Sink2)
1 --> 8(Channel3) --> 9(Sink3<br>Avro)
end
subgraph Server
3 --> 4(HDFS)
6 --> 7(JMS)
9 -- Avro RPC --> 10(Source4<br>Avro) --> 11(Channel4) --> 12(Sink4)
end
Load balancing and failover  graph LR
subgraph Client
1(Source) --> 2(Channel)
2 --> 3(Sink<br>Avro)
2 --> 4(Sink<br>Avro)
2 --> 5(Sink<br>Avro)
end
subgraph Server
3 -- Avro RPC --> 6(Source1<br>Avro) --> 7(Channel1) --> 8(Sink1)
4 -- Avro RPC --> 9(Source2<br>Avro) --> 10(Channel2) --> 11(Sink2)
5 -- Avro RPC --> 12(Source3<br>Avro) --> 13(Channel3) --> 14(Sink3)
end
8 --> 15(HDFS)
11 --> 15
14 --> 15
Aggregation  graph LR
subgraph Client
1(Source1) --> 2(Channel1) --> 3(Sink1<br>Avro)
4(Source2) --> 5(Channel2) --> 6(Sink2<br>Avro)
7(Source3) --> 8(Channel3) --> 9(Sink3<br>Avro)
end
subgraph Server
3 -- Avro RPC --> 10(Source4<br>Avro) --> 11(Channel4) --> 12(Sink4)
6 -- Avro RPC --> 10
9 -- Avro RPC --> 10
end
12 --> 13(HDFS)
Enterprise examples  Replicating and multiplexing  Concept  Flume-1 monitors file changes and passes them to Flume-2, which stores them in HDFS; at the same time, Flume-1 passes the changes to Flume-3, which writes them to the local file system
graph LR
subgraph flume1
1(Source1) --> 2(replicating<br>selector)
2 --> 3(Channel1) --> 4(Sink1)
2 --> 8(Channel2) --> 9(Sink2)
end
subgraph flume2
4 --> 5(Source2) --> 6(Channel2) --> 7(Sink2)
end
subgraph flume3
9 --> 10(Source3) --> 11(Channel3) --> 12(Sink3)
end
Configuration  flume1.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /flume/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /flume/jobs/position/position.json
a1.sources.r1.selector.type = replicating

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata1:8020/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1

a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /flume/jobs/fileroll

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Load balancing and failover  Load balancing  Concept
Flume-1 listens on a port; the sinks in its sink group connect to Flume-2 and Flume-3 respectively
graph LR
1(Source) --> 2(replicating<br>selector) --> 3(Channel) --> 4(load_balance<br>processor)
4 --> 5(Avro<br>Sink) --> 6(Avro<br>Source) --> 7(Channel) --> 8(Sink) --> 9(Console)
4 --> 10(Avro<br>Sink) --> 11(Avro<br>Source) --> 12(Channel) --> 13(Sink)
Configuration  flume1.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
a1.sources.r1.selector.type = replicating

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = logger

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1

a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Failover  Configuration  flume1.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
a1.sources.r1.selector.type = replicating

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = logger

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1

a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Aggregation  Concept
Flume-1 on bigdata1 monitors the file /opt/module/group.log and Flume-2 on bigdata2 monitors a data stream on a port; Flume-1 and Flume-2 send their data to Flume-3 on bigdata3, which prints the final data to the console
Configuration  flume1.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /flume/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /flume/jobs/position/position.json

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata3
a1.sinks.k1.port = 8888

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata3
a2.sinks.k1.port = 8888

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1

a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata3
a3.sources.r1.port = 8888

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Custom components  Interceptor  Concept  graph LR
1(Source<br>Flume1) --> 2(Interceptor) --> 3(multiplexing<br>selector)
3 --> 4(Channel for events<br>containing Shanghai) --> 5(Sink) --> 6(Flume2)
3 --> 7(Channel for events<br>containing Beijing) --> 8(Sink) --> 9(Flume3)
3 --> 10(Channel for<br>everything else) --> 11(Sink) --> 12(Flume4)
Dependency
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.10.0</version>
</dependency>
Business code
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class EventHeaderInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Tag the event header with a "city" key based on the body content
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        if (body.contains("Shanghai")) {
            headers.put("city", "sh");
        }
        if (body.contains("Beijing")) {
            headers.put("city", "bj");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class MyBuilder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new EventHeaderInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Configuration  flume1.conf
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = city
a1.sources.r1.selector.mapping.sh = c1
a1.sources.r1.selector.mapping.bj = c2
a1.sources.r1.selector.default = c3
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cc.mousse.flume.EventHeaderInterceptor$MyBuilder

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888

a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = logger

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1

a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 7777

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume4.conf
a4.sources = r1
a4.channels = c1
a4.sinks = k1

a4.sources.r1.type = avro
a4.sources.r1.bind = localhost
a4.sources.r1.port = 8888

a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100

a4.sinks.k1.type = logger

a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume4.conf -n a4 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Source (for reference)  Logic that generates random data
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    private String prefix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        try {
            // Build an event and hand it to the channel processor
            Event event = getSomeData();
            getChannelProcessor().processEvent(event);
            status = Status.READY;
        } catch (Throwable t) {
            status = Status.BACKOFF;
        }
        return status;
    }

    private Event getSomeData() {
        // Generate a random payload with the configured prefix
        String data = UUID.randomUUID().toString();
        String res = prefix + data;
        SimpleEvent event = new SimpleEvent();
        event.setBody(res.getBytes(StandardCharsets.UTF_8));
        event.getHeaders().put("key1", "value1");
        return event;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 1;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 10;
    }

    @Override
    public void configure(Context context) {
        this.prefix = context.getString("prefix", "");
    }
}
Configuration  File name: mysource-flume-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = cc.mousse.flume.MySource
a1.sources.r1.prefix = log--

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/mysource-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
Sink (for reference)  Logic that prints to the console
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class MySink extends AbstractSink implements Configurable {

    private final Logger logger = LoggerFactory.getLogger(MySink.class);

    @Override
    public Status process() throws EventDeliveryException {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        Status status;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // Take one event from the channel and "store" it (log to the console)
            Event event = ch.take();
            storeSomeData(event);
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            txn.close();
        }
        return status;
    }

    private void storeSomeData(Event event) {
        logger.info(event.getHeaders() + " :: " + new String(event.getBody(), StandardCharsets.UTF_8));
    }

    @Override
    public void configure(Context context) {
    }
}
Configuration  File name: mysource-flume-mysink.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = com.atguigu.flume.source.MySource
a1.sources.r1.prefix = log--

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = com.atguigu.flume.sink.MySink

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/mysource-flume-mysink.conf -n a1 -Dflume.root.logger=INFO,console
Data flow monitoring  Concept
Ganglia consists of three parts: gmond, gmetad, and gweb
gmond (Ganglia Monitoring Daemon): a lightweight service installed on every node whose metrics should be collected. It gathers many system metrics, such as CPU, memory, disk, network, and active-process data
gmetad (Ganglia Meta Daemon): the service that aggregates all the metrics and stores them on disk in RRD format
gweb (Ganglia Web): Ganglia's visualization tool, a PHP front end that renders the data stored by gmetad in a browser. The web UI presents the various metrics collected from the running cluster as charts
Installation
Plan
bigdata1: gweb, gmetad, gmond
bigdata2/3: gmond
bigdata1
apt install -y ganglia-monitor
apt install -y ganglia-webfrontend
apt install -y gmetad
ln -s /usr/lib/ganglia /usr/lib/aarch64-linux-gnu/ganglia
bigdata2/3
apt install -y ganglia-monitor
ln -s /usr/lib/ganglia /usr/lib/aarch64-linux-gnu/ganglia
On bigdata1, edit the gmetad configuration file (typically /etc/ganglia/gmetad.conf)
data_source "pz cluster" bigdata1 bigdata2 bigdata3
On bigdata1/2/3, edit the gmond configuration file (typically /etc/ganglia/gmond.conf)
cluster {
  name = "pz cluster"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel {
  host = bigdata1
  port = 8649
  ttl = 1
}

udp_recv_channel {
  port = 8649
}
On bigdata1, edit the configuration file /etc/selinux/config
Disabling SELinux in this file only takes effect after a reboot; if you do not want to reboot right now, you can put the change into effect temporarily instead (see the command below)
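For example, to switch SELinux to permissive mode until the next reboot (assuming the setenforce tool is available on the host):

setenforce 0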
Start
/etc/init.d/ganglia-monitor start
/etc/init.d/gmetad start
Open the Ganglia page in a browser: http://bigdata1/ganglia
If you still get a permission-denied error after completing the steps above, change the permissions on the /var/lib/ganglia directory (for example, as shown below)
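A permissive fix, for example (the exact mode is an assumption; tighten it as needed):

chmod -R 777 /var/lib/ganglia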
Starting Flume
flume-ng agent \
-c $FLUME_HOME/conf \
-n a1 \
-f $FLUME_HOME/jobs/netcat-flume-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=bigdata1:8649
Interview questions  Transfer monitoring
Source&Sink&Channel
Source: collects data; it can handle log data of various types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy
Channel: buffers the collected data, either in memory or in files
Sink: the component that delivers data to its destination; destinations include HDFS, logger, avro, thrift, ipc, file, HBase, solr, and custom sinks
Channel Selectors
Replicating: sends Events from the Source to all Channels
Multiplexing: sends Events from the Source to specific Channels according to rules
Parameter tuning  Source
Increase the number of Sources and raise batchSize
Increasing the number of Sources (or, when using the Taildir Source, the number of filegroups) increases the Source's capacity to read data
For example, when one directory produces too many files, split it into several directories and configure multiple Sources so that there is enough capacity to pick up newly produced data
batchSize
This parameter determines how many events the Source ships to the Channel in one batch; increasing it appropriately improves the throughput of moving Events from the Source into the Channel
Channel
When using a file Channel, configure directories on multiple different disks; keep transactionCapacity larger than batchSize
A memory Channel gives the best performance, but data may be lost if the Flume process dies unexpectedly; a file Channel is more fault-tolerant, but performs worse than a memory Channel
dataDirs (when using a file Channel)
Configuring directories on multiple different disks improves performance (see the sketch at the end of this list)
capacity
Determines the maximum number of events the Channel can hold
transactionCapacity
Determines the maximum number of events the Source writes to the Channel per transaction, and the maximum number of events the Sink reads from the Channel per transaction
transactionCapacity needs to be greater than or equal to the batchSize of the Source and the Sink
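A sketch of these Channel parameters for a file Channel (the paths and values are illustrative assumptions), as referenced in the dataDirs item above:

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data1/flume/checkpoint
# comma-separated directories, ideally on different disks
a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
a1.channels.c1.capacity = 1000000
# must be >= the batchSize of the Source and the Sink
a1.channels.c1.transactionCapacity = 10000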
Sink
Increase the number of Sinks and raise batchSize
Increasing the number of Sinks increases the capacity to consume events. More Sinks is not always better, though; enough is enough, since excess Sinks occupy system resources and waste them unnecessarily
batchSize
This parameter determines how many events the Sink reads from the Channel in one batch; increasing it appropriately improves the throughput of moving events out of the Channel
Transaction mechanism
Put transaction flow
doPut: the Source writes a batch of data into the temporary buffer putList
doCommit: checks whether the Channel's memory queue has enough room to take the putList
doRollback: if the Channel's memory queue does not have enough space, the data is rolled back
Take transaction flow
doTake: the Sink pulls a batch of data into the temporary buffer takeList and sends the data to HDFS
doCommit: if all the data is sent successfully, the temporary buffer is cleared
doRollback: if an exception occurs while sending, the rollback returns all the data in the temporary buffer to the memory queue (which may cause duplicate data)
This guarantees that no data is lost, but does not guarantee that data is never duplicated