`
字母哥
  • 浏览: 68919 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

flume+kafka+sparkstreaming搭建整合

阅读更多
主要是数据从flume进去kafka,然后交给sparkstreaming处理的流程
本文依旧以单词计数例子为例
首先,flume使用1.6版本,如果是1.6以下的话,没带有官方的kafkasink,需要自己实现,自己实现也不难实现,写一个自定义的sink,在里面方法调用kafka生产者代码,把数据发送到指定的kafka的broker的topic即可。
此处使用1.6版本,直接使用kafkaSink即可
agent4.channels.ch1.type = memory

agent4.sources.avro-source1.channels = ch1
agent4.sources.avro-source1.type = avro
agent4.sources.avro-source1.bind = 0.0.0.0
agent4.sources.avro-source1.port = 41414
 
agent4.sinks.log-sink1.channel = ch1
agent4.sinks.log-sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent4.sinks.log-sink1.topic = test
agent4.sinks.log-sink1.brokerList = localhost:9092
agent4.sinks.log-sink1.requiredAcks = 1
agent4.sinks.log-sink1.batchSize = 20
 
agent4.channels = ch1
agent4.sources = avro-source1
agent4.sinks = log-sink1

然后启动flume即可
 bin/flume-ng agent --conf ./conf/ -f conf/agent4 -Dflume.root.logger=DEBUG,console -n agent4

开始进行kafka的操作,这里都是单机部署,包括flume和kafka
首先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties

然后启动Kafka
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

然后创建一个"test"的topic,一个分区一个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

可以查看一下所有主题,验证一下
bin/kafka-topics.sh --list --zookeeper localhost:2181


然后开始写spark部分的代码
首先加入kafka的maven依赖
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka_2.10</artifactId>
			<version>1.2.1</version>
		</dependency>

代码如下:
package cn.han

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka._

object MySparkkafka {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir","E:\\hadoop-2.6.0")
    val sc = new SparkContext("local[2]", "Spark Streaming kafka Integration")  
    //创建StreamingContext,3秒一个批次  
    val ssc = new StreamingContext(sc, Seconds(4))  
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran")
    val readParallelism = 5
    val topics = Map("test" -> 1)
    val sl=StorageLevel.MEMORY_ONLY
    val kafkaStream=KafkaUtils.createStream(ssc,"192.168.5.220:2181","terran", topics, sl)
   
    
    val rss2=kafkaStream.flatMap(x =>{
      val by=x._2
      val sb=by.split(" ")
      sb
    })
    
    val rdd3=rss2.map(x=>(x,1))
    val rdd4=rdd3.reduceByKey(_+_)
    rdd4.print()
    //开始运行  
    ssc.start()  
    //计算完毕退出  
    ssc.awaitTermination()  
    sc.stop()  
  }
}


还需要一个简单的log4j程序,向flume写入测试数据,上一篇博客已经引入,这里就不再赘述了。最终,执行spark代码即可。
0
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics