主要是数据从flume进去kafka,然后交给sparkstreaming处理的流程
本文依旧以单词计数例子为例
首先,flume使用1.6版本,如果是1.6以下的话,没带有官方的kafkasink,需要自己实现,自己实现也不难实现,写一个自定义的sink,在里面方法调用kafka生产者代码,把数据发送到指定的kafka的broker的topic即可。
此处使用1.6版本,直接使用kafkaSink即可
agent4.channels.ch1.type = memory
agent4.sources.avro-source1.channels = ch1
agent4.sources.avro-source1.type = avro
agent4.sources.avro-source1.bind = 0.0.0.0
agent4.sources.avro-source1.port = 41414
agent4.sinks.log-sink1.channel = ch1
agent4.sinks.log-sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent4.sinks.log-sink1.topic = test
agent4.sinks.log-sink1.brokerList = localhost:9092
agent4.sinks.log-sink1.requiredAcks = 1
agent4.sinks.log-sink1.batchSize = 20
agent4.channels = ch1
agent4.sources = avro-source1
agent4.sinks = log-sink1
然后启动flume即可
bin/flume-ng agent --conf ./conf/ -f conf/agent4 -Dflume.root.logger=DEBUG,console -n agent4
开始进行kafka的操作,这里都是单机部署,包括flume和kafka
首先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
然后启动Kafka
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
然后创建一个"test"的topic,一个分区一个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可以查看一下所有主题,验证一下
bin/kafka-topics.sh --list --zookeeper localhost:2181
然后开始写spark部分的代码
首先加入kafka的maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.2.1</version>
</dependency>
代码如下:
package cn.han
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka._
object MySparkkafka {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir","E:\\hadoop-2.6.0")
val sc = new SparkContext("local[2]", "Spark Streaming kafka Integration")
//创建StreamingContext,3秒一个批次
val ssc = new StreamingContext(sc, Seconds(4))
val kafkaParams: Map[String, String] = Map("group.id" -> "terran")
val readParallelism = 5
val topics = Map("test" -> 1)
val sl=StorageLevel.MEMORY_ONLY
val kafkaStream=KafkaUtils.createStream(ssc,"192.168.5.220:2181","terran", topics, sl)
val rss2=kafkaStream.flatMap(x =>{
val by=x._2
val sb=by.split(" ")
sb
})
val rdd3=rss2.map(x=>(x,1))
val rdd4=rdd3.reduceByKey(_+_)
rdd4.print()
//开始运行
ssc.start()
//计算完毕退出
ssc.awaitTermination()
sc.stop()
}
}
还需要一个简单的log4j程序,向flume写入测试数据,上一篇博客已经引入,这里就不再赘述了。最终,执行spark代码即可。
分享到:
相关推荐
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
通过flume监控文件,让kafka消费flume数据,再将sparkstreaming连接kafka作为消费者进行数据处理,文档整理实现
Flume+kafka+Storm整合 示例简介: 以下为三个组建整合,这里只做操作也演示结果,原理性方面大家多学习基础。 流程顺序是flume获取telnet数据,将接收到的数据发送至kafak,kafka作为Storm的spout,Storm进行有向无...
flume+kafka+flink+mysql实现nginx数据统计与分析
基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: https://blog.csdn.net/linge1995/article/details/81326146 Spark Stream 实时监控
毕业设计,课程设计,项目源码均经过助教老师测试,运行无误,欢迎下载交流 ----- 下载后请首先打开README.md文件(如有)
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
1、内容概要:Hadoop+Spark+Hive+HBase+Oozie+Kafka+Flume+Flink+Elasticsearch+Redash等大数据集群及组件搭建指南(详细搭建步骤+实践过程问题总结)。 2、适合人群:大数据运维、大数据相关技术及组件初学者。 3、...
详细讲解flume+kafka+spark实验环境搭建和测试例子,资源不能一次上传多个。需要更多资源可以免费给大家,q:1487954071
搭建Hadoop集群,并使用flume+kafka+storm+hbase实现日志抓取分析,使用一个主节点master、两个slave节点
lnmp(linux+nginx+mysql+php)安装配置及分布式系统大数据处理hadoop集群中的flume+Kafka+Storm+HDFS等实时系统搭分享
基于Flume+Kafka+SparkStreaming打造企业大数据流处理平台视频教程.txt
基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: https://blog.csdn.net/linge1995/article/details/81326146
基于FlumeKafkaSpark-的分布式日志流处理系统的设计与实现
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例.txt基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例.txt基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例.txt基于Flume+Kafka+Hbase+Flink+FineBI的实时综合...
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip...
log4j+flume+kafka+storm整合
主要讲的是Flume+Kafka+Storm的环境整合,并且全部都是最新版本 、、、你也可以到博客地址http://blog.csdn.net/u012185296中去学习相关的云技术,Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ......
本资源中的源码都是经过本地编译过可运行的,下载后按照文档配置好环境就可以运行。资源项目的难度比较适中,内容都是经过助教老师审定过的,应该能够满足学习、使用需求,如果有需要的话可以放心下载使用。...