Flume的概述
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日 志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。
- 高可用(HA) flume框架(故障转移机制)
- 高可靠 数据采集的可靠性
- 分布式 分布式集群搭建
Flume的作用
最主要的作用:实时读取服务器本地磁盘的数据,将数据写到HDFS、Kafka
Flume的优点
可以和任意存储进程集成。
- 支持不同的采集源
- 支持多类型的目标源
输入的的数据速率大于写入目的存储的速率,flume会进行缓冲,减小 hdfs的压力。
flume中的事务基于channel,使用了两个事务模型(sender + receiver),确保消息被可靠发送。
Flume使用两个独立的事务分别负责从soucrce到channel,以及从 channel到sink的事件传递。一旦事务中所有的数据全部成功提交到 channel,那么source才认为该数据读取完成。同理,只有成功被sink 写出去的数据,才会从channel中移除。
Flume的组成结构
1、Flume组成架构
2、Agent
a、简介
Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。Agent 主要有3个部分组成,Source、Channel、Sink。
b、Source
Source是负责接收数据到Flume Agent的组件。Source组件可以处理 各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、 spooling directory、netcat、sequence generator、syslog、 http、legacy。
c、Channel
Channel是位于Source和Sink之间的缓冲区。因此,Channel允许 Source和Sink运作在不同的速率上。Channel是线程安全的,可以同 时处理几个Source的写入操作和几个Sink的读取操作。 Flume自带两种Channel:Memory Channel和File Channel。 Memory Channel是内存中的队列。Memory Channel在不需要关心 数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数 据丢失。File Channel将所有事件写到磁盘。因此在程序关闭或机器宕 机的情况下不会丢失数据。
d、Sink
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批 量写入到存储或索引系统、或者被发送到另一个Flume Agent。Sink 是完全事务性的。在从Channel批量删除数据之前,每个Sink用 Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该 Channel从自己的内部缓冲区删除事件。Sink组件目的地包括hdfs、 logger、avro、thrift、ipc、file、null、HBase、solr、自定义。
e、Event
传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送 至目的地。 Event由可选的header和载有数据的一个byte array 构成。Header是容纳了key-value字符串对的HashMap。
Flume agent的配置文件
单数据源单出口案例
这种模式是将多个flume给顺序连接起来了,从最初的source开始到最 终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume宕机,会影响整个传输系统。
flume实现监控端口数据案例:
用netcat工具向本机端口号:44444发送消息,flume监听
# Name the components on this agent
# r1:表示a1的输入源 a1:表示agent的名称
a1.sources = r1
# k1:表示a1的输出目的地
a1.sinks = k1
# c1:表示a1的缓冲区
a1.channels = c1
# Describe/configure the source
# 表示a1的输入源类型为netcat端口类型
a1.sources.r1.type = netcat
# 表示a1的监听主机
a1.soucres.r1.bind = localhost
# 表示a1的监听的端口号
a1.sources.r1.port = 44444
# Describe the sink
# 表示a1的输出目的地是控制台logger类型
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
# 表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
# 表示a1的channel总容量1000个event
a1.channels.c1.capacity = 1000
# 表示a1的channel传输时收集到100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 表示讲r1和c1链接起来
a1.sources.r1.channels = c1
# 表示将k1和c1链接起来
a1.sinks.k1.channel = c1
启动flume:
-
方法一:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf - Dflume.root.logger=INFO,console
-
方法二:bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
参数说明:
- --conf conf/ :表示(conf)配置文件存储在conf/目录
- --name a1 :表示给agent起名为a1
- --conf-file job/flume-netcat.conf :flume本次启动读取的配置 文件是在job文件夹下的flume-telnet.conf文件。
- -Dflume.root.logger==INFO,console :-D表示flume运行时动 态修改flume.root.logger参数属性值,并将控制台日志打印级别设 置为INFO级别。日志级别包括:log、info、warn、error。
实时采集文件到HDFS上案例
用flume实时监听某文件,当该文件的内容变化时,上传该数据到HDFS上。
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
# 定义数据源文件的类型
a2.sources.r2.type = exec
# 监听该目录下的access.log文件
a2.sources.r2.command = tail -F /home/hadoop/nginx/logs/access.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
# 上传文件的路径 %Y%m%d为时间戳,自动生成对应时间 年月日
a2.sinks.k2.hdfs.path = hdfs://192.168.137.128:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
实时读取目录文件到HDFS上案例
使用flume实时监听整个目录文件,当该目录文件新增时,上传该文件到HDFS上。
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
# 定义source类型为目录
a3.sources.r3.type = spooldir
# 定义监控目录
a3.sources.r3.spoolDir = /home/hadoop/bigdatasoftware/flume/upload
# 定义文件上传完的后缀名
a3.sources.r3.fileSuffix = .COMPLETED
# 是否有五年间头
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://192.168.137.128:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
单数据源多出口案例(选择器)
Flume支持将事件流向一个或者多个目的地。这种模式将数据源复制到 多个channel中,每个channel都有相同的数据,sink可以选择传送的 不同的目的地。
flume1监控文件的变动,并将变动的内容传递给flume2和flume3。
flume2负责输出到HDFS上
flume3负责输出到本地上
三个flume在同一台设备上
flume1:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/bigdatasoftware/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
# 设置其中一个flume接收的地址
a1.sinks.k1.hostname = 192.168.137.128
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
# 设置另一个flume的接收地址
a1.sinks.k2.hostname = 192.168.137.128
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume2:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
# 设置本机地址,注意端口号
a2.sources.r1.bind = 192.168.137.128
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path =
hdfs://192.168.137.128:9000/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.137.128
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
注意:接收方与发送方的地址和端口号要对应
单数据源多出口案例(Sink组)
Flume支持使用将多个sink逻辑上分到一个sink组,flume将数据发送 到不同的sink,主要解决负载均衡和故障转移问题。
配置1个接收日志文件的source和1个channel、两个sink,分别输送给flume-flume-console1和flume-flume-console2。
flume1:
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 22222
#定义一个sink组
#一个channel对应多个sink时要设置一个sinkgroups
a1.sinkgroups = g1
#指明sink组中的sink实例
a1.sinkgroups.g1.sinks = k1 k2
#设置sinkProcessor的类型(负载均衡)
a1.sinkgroups.g1.processor.type = load_balance
#①random-随机分配 ②round_robin-轮循
a1.sinkgroups.g1.processor.selector = random
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.137.128
a1.sinks.k1.port = 33333
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.137.129
a1.sinks.k2.port = 44444
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.137.128
a1.sources.r1.port = 33333
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume3:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.137.129
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
多数据源汇总
这种模式是我们最常见的,也非常实用,日常web应用通常分布在上百 个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也 非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务 器部署一个flume采集日志,传送到一个集中收集日志的flume,再由 此flume上传到hdfs、hive、hbase、jms等,进行日志分析
flume1监控一个文件的变动
flume2监控一个端口的数据
flume1和flume2将数据发送给flume3,flume3最终将数据打印到控制台。
flume1:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.137.129
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume2:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = 198.168.137.128
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.137.129
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.137.129
a3.sources.r1.port = 4141
# Describe the sink
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
本文摘自 :https://www.cnblogs.com/