Using Spark SQL in Spark Streaming

The example below converts each micro-batch of a socket DStream into a DataFrame and runs a Spark SQL aggregation on it inside foreachRDD.

package com.immooc.spark

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9998)
    // The pure DStream version of the word count would be:
    // val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val words = lines.flatMap(_.split(" "))

    // Convert each micro-batch RDD to a DataFrame and query it with Spark SQL.
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Reuse one SparkSession across batches instead of creating one per batch.
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
      wordsDataFrame.createOrReplaceTempView("words")

      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")

      println(s"========== $time ============")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  case class Record(word: String)

  // Lazily instantiated singleton SparkSession, so a session exists on the
  // driver whenever a batch needs one.
  object SparkSessionSingleton {
    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .getOrCreate()
      }
      instance
    }
  }
}
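To try it locally, start a socket server first, for example with nc -lk 9998 (the port the example connects to), then run the job and type some words into the netcat session. Every 5 seconds the job prints the batch time followed by a word/total table for the words received in that batch.

For comparison, the same per-batch aggregation can be expressed with the DataFrame API instead of a SQL string. This is a minimal sketch, not from the original post: it reuses the words stream and SparkSessionSingleton from the listing above, and it drops the Record case class since toDF("word") names the column directly.

words.foreachRDD { (rdd: RDD[String], time: Time) =>
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  import spark.implicits._

  // Same aggregation as the SQL query above: group the batch by word
  // and count occurrences, renaming the count column to "total".
  val wordCountsDataFrame = rdd.toDF("word")
    .groupBy("word")
    .count()
    .withColumnRenamed("count", "total")

  println(s"========== $time ============")
  wordCountsDataFrame.show()
}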
