package streaming

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreaming_Kafka {
  def main(args: Array[String]): Unit = {
    // Local Spark context with a 3-second micro-batch interval
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStream")
      .set("spark.io.compression.codec", "snappy")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(3))

    // Kafka consumer configuration: brokers, consumer group, and String deserializers
    val kafkaPara = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )

    // Direct stream subscribed to the "atguigu" topic
    val kafkaDataDS = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    // Word count over each record's value, printed once per batch
    val result = kafkaDataDS.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
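To compile and run this job, the Spark Streaming Kafka 0-10 integration must be on the classpath. A minimal build.sbt sketch follows; the Spark and Scala versions shown are assumptions and should be matched to your cluster.

// build.sbt — minimal sketch; versions are assumptions
ThisBuild / scalaVersion := "2.12.18"

val sparkVersion = "3.3.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % sparkVersion,
  "org.apache.spark" %% "spark-streaming"            % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)

With the application running against local[*] as above, producing whitespace-separated words to the atguigu topic should print per-batch word counts every 3 seconds.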