1. Introduction

Apache Flume is a distributed, highly available data collection system that can collect data from different sources and, after aggregation, deliver it to a distributed computing framework or a storage system. Spark Streaming offers the following two ways of integrating with Flume.
2. Push-based Approach

In the push-based approach (Flume-style Push-based Approach), the Spark Streaming application listens on a port of a given server, and Flume keeps pushing data to that port through an Avro sink. The example below monitors a log file; the integration steps are as follows:
2.1 Configure the log-collecting Flume agent

Create a configuration file named netcat-memory-avro.properties. It uses the tail command to watch a file for new content and sends the new lines through an Avro sink to port 8888 of the host hadoop001:
```properties
# Name the agent's sources, sinks and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

# Configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

# Configure the channel type
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
2.2 Project dependencies

The project is built with Maven; the main dependencies are spark-streaming and spark-streaming-flume.
```xml
<properties>
    <scala.version>2.11</scala.version>
    <spark.version>2.4.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_${scala.version}</artifactId>
        <version>2.4.3</version>
    </dependency>
</dependencies>
```
2.3 Receive the log data with Spark Streaming

Call the createStream method of the FlumeUtils utility class to listen on port 8888 of hadoop001, then print the received stream data:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object PushBasedWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Listen on hadoop001:8888 and receive the events pushed by Flume's Avro sink
    val flumeStream = FlumeUtils.createStream(ssc, "hadoop001", 8888)
    // Each record is a SparkFlumeEvent; decode the event body and print it
    flumeStream.map(line => new String(line.event.getBody.array()).trim).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
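The program above only prints the decoded lines. To make it live up to its name PushBasedWordCount, a word count can be added on top of the same stream. The following is a minimal sketch, not part of the original project: it assumes the Flume events carry plain text with whitespace-separated words and would be inserted before ssc.start() in the program above.

```scala
// Hypothetical extension: count words within each 5-second batch.
// Assumes whitespace-separated plain-text events (not part of the original code).
val words = flumeStream
  .map(event => new String(event.event.getBody.array()).trim)
  .flatMap(_.split("\\s+"))
  .filter(_.nonEmpty)

words.map(word => (word, 1))
  .reduceByKey(_ + _)
  .print()
```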
2.4 Package the project

The Spark installation does not ship the spark-streaming-flume dependency, so it has to be provided when the job is submitted to the cluster. You can either upload the jar to the server and point to it with --jars, or pass --packages org.apache.spark:spark-streaming-flume_2.12:2.4.3 with the full artifact coordinates, in which case the program downloads the dependency from the central repository at startup.
Here I use a third approach instead: the maven-shade-plugin is used to build an ALL IN ONE (fat) jar, bundling all dependencies into the final package. Note that the spark-streaming jar is already provided in the jars directory of the Spark installation, so it does not need to be bundled. The plugin configuration is as follows:
```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
                <createDependencyReducedPom>true</createDependencyReducedPom>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.sf</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.dsa</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                            <exclude>META-INF/*.rsa</exclude>
                            <exclude>META-INF/*.EC</exclude>
                            <exclude>META-INF/*.ec</exclude>
                            <exclude>META-INF/MSFTSIG.SF</exclude>
                            <exclude>META-INF/MSFTSIG.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <artifactSet>
                    <excludes>
                        <exclude>org.apache.spark:spark-streaming_${scala.version}</exclude>
                        <exclude>org.scala-lang:scala-library</exclude>
                        <exclude>org.apache.commons:commons-lang3</exclude>
                    </excludes>
                </artifactSet>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.1</version>
            <executions>
                <execution>
                    <id>scala-compile</id>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                    <configuration>
                        <includes>
                            <include>**/*.scala</include>
                        </includes>
                    </configuration>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
The complete source code of this project is available at: spark-streaming-flume

Running mvn clean package produces the following two jars; submit the one whose name does not start with original.
2.5 Start the services and submit the job

Start the Flume agent:
```shell
flume-ng agent \
  --conf conf \
  --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \
  --name a1 -Dflume.root.logger=INFO,console
```
Submit the Spark Streaming job:
```shell
spark-submit \
  --class com.myhhub.flume.PushBasedWordCount \
  --master local[4] \
  /usr/appjar/spark-streaming-flume-1.0.jar
```
2.6 Test

Use the echo command to simulate log generation by appending data to the log file (/tmp/log.txt), then check the program's output.

The Spark Streaming program successfully receives the data and prints it.
2.7 Notes

1. Startup order: it does not matter whether the Spark program or the Flume agent is started first. Since both take some time to start, the one started first will briefly throw a connection-refused exception for the port; no action is needed, just wait until both programs have finished starting.

2. Consistent versions: the Scala version used for local development and compilation should match the Scala version of the Spark build, or at least the same major version (e.g. both 2.11).
3. Pull-based Approach

In the pull-based approach (Pull-based Approach using a Custom Sink), Flume pushes the data into a SparkSink, where it stays buffered, and Spark Streaming pulls the data from the sink at regular intervals. This approach is transactional: the buffered data is deleted only after Spark Streaming has received and replicated it. Compared with the first approach it therefore offers stronger reliability and fault-tolerance guarantees. The integration steps are as follows:
3.1 Configure the log-collecting Flume agent

Create a Flume configuration file named netcat-memory-sparkSink.properties. It is almost identical to the configuration above; the only change is that the a1.sinks.k1.type property is set to org.apache.spark.streaming.flume.sink.SparkSink, i.e. the Spark sink is used.
```properties
# Name the agent's sources, sinks and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

# Configure the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

# Configure the channel type
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
3.2 Additional dependencies

The pull-based approach requires the following two extra dependencies:
```xml
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.8</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.5</version>
</dependency>
```
Note: these two dependencies are added only for local testing. The Spark installation already provides them, so they must be excluded when the final package is built.
3.3 Receive the log data with Spark Streaming

The code is almost identical to the push-based version above; the only change is that createPollingStream is called instead:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object PullBasedWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Pull the buffered events from the SparkSink running on hadoop001:8888
    val flumeStream = FlumeUtils.createPollingStream(ssc, "hadoop001", 8888)
    // Each record is a SparkFlumeEvent; decode the event body and print it
    flumeStream.map(line => new String(line.event.getBody.array()).trim).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
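FlumeUtils also provides createPollingStream overloads that accept a list of sink addresses and a storage level, so a single Spark Streaming application can pull from several SparkSink instances. The sketch below is illustrative only: the second host hadoop002 is an assumed extra agent, not part of this project, and ssc refers to the StreamingContext created in the program above.

```scala
import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical example: pull from two SparkSink instances at once.
// hadoop002:8888 is an assumed second agent, not part of this project.
val addresses = Seq(
  new InetSocketAddress("hadoop001", 8888),
  new InetSocketAddress("hadoop002", 8888)
)

val multiSinkStream = FlumeUtils.createPollingStream(
  ssc,                                 // the StreamingContext created above
  addresses,
  StorageLevel.MEMORY_AND_DISK_SER_2   // the default storage level for this API
)
```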
3.4 Start and test

Starting the services and submitting the job work exactly as above, so only the commands are given here.

Start Flume for log collection:
```shell
flume-ng agent \
  --conf conf \
  --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-sparkSink.properties \
  --name a1 -Dflume.root.logger=INFO,console
```
Submit the Spark Streaming job:
```shell
spark-submit \
  --class com.myhhub.flume.PullBasedWordCount \
  --master local[4] \
  /usr/appjar/spark-streaming-flume-1.0.jar
```