Flume简介及基本使用
本文于2072天之前发表,文中内容可能已经过时。
一、Flume简介
Apache Flume是一个分布式,高可用的数据收集系统。它可以从不同的数据源收集数据,经过聚合后发送到存储系统中,通常用于日志数据的收集。Flume 分为 NG 和 OG (1.0 之前)两个版本,NG在OG的基础上进行了完全的重构,是目前使用最为广泛的版本。下面的介绍均以NG为基础。
二、Flume架构和基本概念
下图为Flume的基本架构图:

2.1 基本架构
外部数据源以特定格式向Flume发送events
(事件),当source
接收到events
时,它将其存储到一个或多个channel
,channe
会一直保存events
直到它被sink
所消费。sink
的主要功能从channel
中读取events
,并将其存入外部存储系统或转发到下一个source
,成功后再从channel
中移除events
。
2.2 基本概念
1. Event
Evnet
是Flume NG数据传输的基本单元。类似于JMS和消息系统中的消息。一个Evnet
由标题和正文组成:前者是键/值映射,后者是任意字节数组。
2. Source
数据收集组件,从外部数据源收集数据,并存储到Channel中。
3. Channel
Channel
是源和接收器之间的管道,用于临时存储数据。可以是内存或持久化的文件系统:
Memory Channel
: 使用内存,优点是速度快,但数据可能会丢失(如突然宕机);File Channel
: 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。
4. Sink
Sink
的主要功能从Channel
中读取Evnet
,并将其存入外部存储系统或将其转发到下一个Source
,成功后再从Channel
中移除Event
。
5. Agent
是一个独立的(JVM)进程,包含Source
、 Channel
、 Sink
等组件。
2.3 组件种类
Flume中的每一个组件都提供了丰富的类型,适用于不同场景:
Source类型 :内置了几十种类型,如
Avro Source
,Thrift Source
,Kafka Source
,JMS Source
;Sink类型 :
HDFS Sink
,Hive Sink
,HBaseSinks
,Avro Sink
等;Channel类型 :
Memory Channel
,JDBC Channel
,Kafka Channel
,File Channel
等。
对于Flume的使用,除非有特别的需求,否则通过组合内置的各种类型的Source,Sink和Channel就能满足大多数的需求。在 Flume官网上对所有类型组件的配置参数均以表格的方式做了详尽的介绍,并附有配置样例;同时不同版本的参数可能略有所不同,所以使用时建议选取官网对应版本的User Guide作为主要参考资料。
三、Flume架构模式
Flume 支持多种架构模式,分别介绍如下
3.1 multi-agent flow

Flume支持跨越多个Agent的数据传递,这要求前一个Agent的Sink和下一个Agent的Source都必须是Avro
类型,Sink指向Source所在主机名(或IP地址)和端口(详细配置见下文案例三)。
3.2 Consolidation

日志收集中常常存在大量的客户端(比如分布式web服务),Flume支持使用多个Agent分别收集日志,然后通过一个或者多个Agent聚合后再存储到文件系统中。
3.3 Multiplexing the flow

Flume支持从一个Source向多个Channel,也就是向多个Sink传递事件,这个操作称之为Fan Out
(扇出)。默认情况下Fan Out
是向所有的Channel复制Event
,即所有Channel收到的数据都是相同的。同时Flume也支持在Source
上自定义一个复用选择器(multiplexing selector) 来实现自定义的路由规则。
四、Flume配置格式
Flume配置通常需要以下两个步骤:
- 分别定义好Agent的Sources,Sinks,Channels,然后将Sources和Sinks与通道进行绑定。需要注意的是一个Source可以配置多个Channel,但一个Sink只能配置一个Channel。基本格式如下:
1 | <Agent>.sources = <Source> |
- 分别定义Source,Sink,Channel的具体属性。基本格式如下:
1 |
|
五、Flume的安装部署
为方便大家后期查阅,本仓库中所有软件的安装均单独成篇,Flume的安装见:
六、Flume使用案例
介绍几个Flume的使用案例:
- 案例一:使用Flume监听文件内容变动,将新增加的内容输出到控制台。
- 案例二:使用Flume监听指定目录,将目录下新增加的文件存储到HDFS。
- 案例三:使用Avro将本服务器收集到的日志数据发送到另外一台服务器。
6.1 案例一
需求: 监听文件内容变动,将新增加的内容输出到控制台。
实现: 主要使用Exec Source
配合tail
命令实现。
1. 配置
新建配置文件exec-memory-logger.properties
,其内容如下:
1 | #指定agent的sources,sinks,channels |
2. 启动
1 | flume-ng agent \ |
3. 测试
向文件中追加数据:

控制台的显示:

6.2 案例二
需求: 监听指定目录,将目录下新增加的文件存储到HDFS。
实现:使用Spooling Directory Source
和HDFS Sink
。
1. 配置
1 | #指定agent的sources,sinks,channels |
2. 启动
1 | flume-ng agent \ |
3. 测试
拷贝任意文件到监听目录下,可以从日志看到文件上传到HDFS的路径:
1 | cp log.txt logs/ |

查看上传到HDFS上的文件内容与本地是否一致:
1 | hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801 |

6.3 案例三
需求: 将本服务器收集到的数据发送到另外一台服务器。
实现:使用avro sources
和avro Sink
实现。
1. 配置日志收集Flume
新建配置netcat-memory-avro.properties
,监听文件内容变化,然后将新的文件内容通过avro sink
发送到hadoop001这台服务器的8888端口:
1 | #指定agent的sources,sinks,channels |
2. 配置日志聚合Flume
使用 avro source
监听hadoop001服务器的8888端口,将获取到内容输出到控制台:
1 | #指定agent的sources,sinks,channels |
3. 启动
启动日志聚集Flume:
1 | flume-ng agent \ |
在启动日志收集Flume:
1 | flume-ng agent \ |
这里建议按以上顺序启动,原因是avro.source
会先与端口进行绑定,这样avro sink
连接时才不会报无法连接的异常。但是即使不按顺序启动也是没关系的,sink
会一直重试,直至建立好连接。

4.测试
向文件tmp/log.txt
中追加内容:

可以看到已经从8888端口监听到内容,并成功输出到控制台:
