当前位置:首页 > IT技术 > Web编程 > 正文

大数据技术Flume框架详解
2022-08-29 23:57:25

Flume的概述

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日 志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

  • 高可用(HA) flume框架(故障转移机制)
  • 高可靠 数据采集的可靠性
  • 分布式 分布式集群搭建

Flume的作用

最主要的作用:实时读取服务器本地磁盘的数据,将数据写到HDFS、Kafka

Flume的优点

可以和任意存储进程集成。

  • 支持不同的采集源
  • 支持多类型的目标源

输入的的数据速率大于写入目的存储的速率,flume会进行缓冲,减小 hdfs的压力

flume中的事务基于channel,使用了两个事务模型(sender + receiver),确保消息被可靠发送

Flume使用两个独立的事务分别负责从soucrce到channel,以及从 channel到sink的事件传递。一旦事务中所有的数据全部成功提交到 channel,那么source才认为该数据读取完成。同理,只有成功被sink 写出去的数据,才会从channel中移除。

Flume的组成结构

1、Flume组成架构

2、Agent

a、简介

Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。Agent 主要有3个部分组成,Source、Channel、Sink。

b、Source

Source是负责接收数据到Flume Agent的组件。Source组件可以处理 各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、 spooling directory、netcat、sequence generator、syslog、 http、legacy。

c、Channel

Channel是位于Source和Sink之间的缓冲区。因此,Channel允许 Source和Sink运作在不同的速率上。Channel是线程安全的,可以同 时处理几个Source的写入操作和几个Sink的读取操作。 Flume自带两种Channel:Memory Channel和File Channel。 Memory Channel是内存中的队列。Memory Channel在不需要关心 数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数 据丢失。File Channel将所有事件写到磁盘。因此在程序关闭或机器宕 机的情况下不会丢失数据。

d、Sink

Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批 量写入到存储或索引系统、或者被发送到另一个Flume Agent。Sink 是完全事务性的。在从Channel批量删除数据之前,每个Sink用 Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该 Channel从自己的内部缓冲区删除事件。Sink组件目的地包括hdfs、 logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

e、Event

传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送 至目的地。 Event由可选的header和载有数据的一个byte array 构成。Header是容纳了key-value字符串对的HashMap。

Flume agent的配置文件

单数据源单出口案例

这种模式是将多个flume给顺序连接起来了,从最初的source开始到最 终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume宕机,会影响整个传输系统。

flume实现监控端口数据案例:

用netcat工具向本机端口号:44444发送消息,flume监听

# Name the components on this agent
# r1:表示a1的输入源	a1:表示agent的名称
a1.sources = r1
# k1:表示a1的输出目的地
a1.sinks = k1
# c1:表示a1的缓冲区
a1.channels = c1

# Describe/configure the source
# 表示a1的输入源类型为netcat端口类型
a1.sources.r1.type = netcat
# 表示a1的监听主机
a1.soucres.r1.bind = localhost
# 表示a1的监听的端口号
a1.sources.r1.port = 44444

# Describe the sink
# 表示a1的输出目的地是控制台logger类型
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# 表示a1的channel类型是memory内存型
a1.channels.c1.type = memory
# 表示a1的channel总容量1000个event
a1.channels.c1.capacity = 1000
# 表示a1的channel传输时收集到100条event以后再去提交事务
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 表示讲r1和c1链接起来
a1.sources.r1.channels = c1
# 表示将k1和c1链接起来
a1.sinks.k1.channel = c1

启动flume

  • 方法一:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf - Dflume.root.logger=INFO,console

  • 方法二:bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明

  • --conf conf/ :表示(conf)配置文件存储在conf/目录
  • --name a1 :表示给agent起名为a1
  • --conf-file job/flume-netcat.conf :flume本次启动读取的配置 文件是在job文件夹下的flume-telnet.conf文件。
  • -Dflume.root.logger==INFO,console :-D表示flume运行时动 态修改flume.root.logger参数属性值,并将控制台日志打印级别设 置为INFO级别。日志级别包括:log、info、warn、error。

实时采集文件到HDFS上案例

用flume实时监听某文件,当该文件的内容变化时,上传该数据到HDFS上。

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
# 定义数据源文件的类型
a2.sources.r2.type = exec
# 监听该目录下的access.log文件
a2.sources.r2.command = tail -F /home/hadoop/nginx/logs/access.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
# 上传文件的路径 %Y%m%d为时间戳,自动生成对应时间 年月日
a2.sinks.k2.hdfs.path = hdfs://192.168.137.128:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

实时读取目录文件到HDFS上案例

使用flume实时监听整个目录文件,当该目录文件新增时,上传该文件到HDFS上。

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
# 定义source类型为目录
a3.sources.r3.type = spooldir
# 定义监控目录
a3.sources.r3.spoolDir = /home/hadoop/bigdatasoftware/flume/upload
# 定义文件上传完的后缀名
a3.sources.r3.fileSuffix = .COMPLETED
# 是否有五年间头
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://192.168.137.128:9000/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

单数据源多出口案例(选择器)

Flume支持将事件流向一个或者多个目的地。这种模式将数据源复制到 多个channel中,每个channel都有相同的数据,sink可以选择传送的 不同的目的地。

flume1监控文件的变动,并将变动的内容传递给flume2和flume3。

flume2负责输出到HDFS上

flume3负责输出到本地上

三个flume在同一台设备上

flume1:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/bigdatasoftware/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
# 设置其中一个flume接收的地址
a1.sinks.k1.hostname = 192.168.137.128
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
# 设置另一个flume的接收地址
a1.sinks.k2.hostname = 192.168.137.128
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

flume2:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
# 设置本机地址,注意端口号
a2.sources.r1.bind = 192.168.137.128
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path =
hdfs://192.168.137.128:9000/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.137.128
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

注意:接收方与发送方的地址和端口号要对应

单数据源多出口案例(Sink组)

Flume支持使用将多个sink逻辑上分到一个sink组,flume将数据发送 到不同的sink,主要解决负载均衡和故障转移问题

配置1个接收日志文件的source和1个channel、两个sink,分别输送给flume-flume-console1和flume-flume-console2。

flume1:

a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 22222

#定义一个sink组
#一个channel对应多个sink时要设置一个sinkgroups
a1.sinkgroups = g1
#指明sink组中的sink实例
a1.sinkgroups.g1.sinks = k1 k2
#设置sinkProcessor的类型(负载均衡)
a1.sinkgroups.g1.processor.type = load_balance
#①random-随机分配  ②round_robin-轮循
a1.sinkgroups.g1.processor.selector = random


a1.channels.c1.type = memory


a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.137.128
a1.sinks.k1.port = 33333

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.137.129
a1.sinks.k2.port = 44444


a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

flume2:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.137.128
a1.sources.r1.port = 33333

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume3:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.137.129
a1.sources.r1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

多数据源汇总

这种模式是我们最常见的,也非常实用,日常web应用通常分布在上百 个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也 非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务 器部署一个flume采集日志,传送到一个集中收集日志的flume,再由 此flume上传到hdfs、hive、hbase、jms等,进行日志分析

flume1监控一个文件的变动

flume2监控一个端口的数据

flume1和flume2将数据发送给flume3,flume3最终将数据打印到控制台。

flume1:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.137.129
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume2:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = 198.168.137.128
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.137.129
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.137.129
a3.sources.r1.port = 4141

# Describe the sink
# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

本文摘自 :https://www.cnblogs.com/

开通会员,享受整站包年服务立即开通 >