SparkStreaming的普通transform算子和一些特殊的output算子使用 |
您所在的位置:网站首页 › sparkstreaming广播变量 › SparkStreaming的普通transform算子和一些特殊的output算子使用 |
SparkStreaming之特殊算子
常见的算子大家还是参考官网:官网SparkStreaming算子 其实DStream与RDD的算子大概都差不多,有个别差异,相同的相信大家都懂,比如map()\flatMap()\filter()\reduceByKey()\repartition()\union()\join()等等 但是有一些不同的下面我们来一一对比 区别算子count() reduce() countByValue() 1、count()在RDD的的count是一个执行算子,返回rdd中的elements的个数,但是在SparkStreaming的DStream中的count最终是算出当前批次的DStream的元素个数 输入的数据格式为 flink spark core sql streaming flink spark //在RDD的的count是一个执行算子,返回rdd中的elements的个数 RDD[T] => Long val long: Long = rdd.count() //但是在SparkStreaming的DStream中的count就不一样了 DStream[T] => DStream[Long] // 代码简单实现 def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(5)) val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val priStream: DStream[(String, Int)] = dstream.flatMap(_.split("\\s")) .filter(!_.isEmpty) //用来过滤DStream中的元素 .map((_, 1)) //将Stream中的每个元素进行转换 priStream.count() ssc.start() ssc.awaitTermination() } //用SparkStreaming的源码思想实现count() priStream.map(_ => (null, 1L)) .transform { rdd => val rdd1: RDD[(Null, Long)] = ssc.sparkContext.makeRDD(Seq((null, 0L)), 1) val kv: RDD[(Null, Long)] = rdd.union(rdd1) kv }.reduceByKey(_ + _) .map(_._2)*输出结果 ====> 6* 2、reduce()在RDD上reduce()是一个action算子,在DStream上reduce()是一个transform算子,用来返回一个只有唯一一个element的DStream 输入数据与count()的数据源一致 // RDD RDD[T] => T val str: String = rdd.reduce(_ + _) // DStream DStream[T] => Dstream[T] val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(5)) val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val priStream: DStream[(String, Int)] = dstream.flatMap(_.split("\\s")) .filter(!_.isEmpty) //用来过滤DStream中的元素 .map((_, 1)) //将Stream中的每个元素进行转换 val thirdStream: DStream[(String, Int)] = priStream.reduce((a, b) => (a._1, a._2 + b._2)) //用streaming的源码怎么实现reduce priStream.map((null,_)).reduceByKey(((s:String,a:Int),(s1:String,a1:Int))=> (s2:String,a2:Int)).map(_._2) 3、countByValue()countByValue()是将相同value值的kv数据按照 输入数据与count()的输入数据一样 输入数据: sql streaming flink spark value1输出结果: (flink,7) (streaming,7) (spark,7) (sql,7) val value: DStream[((String, Int), Long)] = priStream.countByValue() //value.print() println("----######----######-----") val value1: DStream[(String, Long)] = priStream.map(_._1).countByValue() value1.print() //用源码实现countByValue的效果 val value2: DStream[((String, Int), Long)] = priStream.map((_, 1L)).reduceByKey(_ + _) value2.print() 特殊算子updateStateByKey() transform() foreachRDD() 1、updateStateByKey()Spark对于状态的支持没有Flink那么好,但是还是能够撑起简单的状态编程,updateStateByKey()的状态存储在checkpoint目录的文件中,所以必须开启ssc.checkpoint(“path”) 同时需要传入一个updateFun,可以是一个def也可以是一个匿名的方法函数 简单的代码示意: package com.shufang.spark_streaming.opers import com.shufang.utils.SparkUtil import org.apache.spark.SparkContext import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.streaming.dstream.DStream /** * 在spark中虽然对状态支持没有Flink做的那么好,但是也是支持保存状态的, * 但是需要指定checkpoint的存储目录的 * updateStateByKey只适用于RDD[(K,V)]类型,可以全局通过checkpoint保证不同key的value状态 */ object UpdateStateByKeyDemo { def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(10)) ssc.checkpoint("src/main/data/checkpoint_dir") val source: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999) .flatMap(_.split("\\s")).map((_, 1)) .reduceByKey(_ + _) //自定义更新的方法,也可以用匿名函数代替,如下 val updateFun = (values:Seq[Int],state:Option[Int]) => { val newstate: Int = values.sum val oldstate = state.getOrElse(0) Some(newstate+oldstate) } //匿名updateFun形式的updateStateByKey的使用 val updateStream: DStream[(String, Int)] = source.updateStateByKey( (value: Seq[Int], state: Option[Int]) => { val newstate: Int = value.sum val oldstate = state.getOrElse(0) Some(newstate + oldstate) }) //output操作 updateStream.print(20) ssc.start() ssc.awaitTermination() } } 2、transform()Transform()算子是Streaming的最基本的转换函数,可以将对DStream的操作转化成熟悉的RDD、DF、DS操作,而且还可以使用广播变量以及累加器等共享变量,最终会返回一个新的DStream[T] ,其中T的类型就是DStream中包含的RDD的元素类型T 简单的示意代码如下: package com.shufang.spark_streaming.opers import com.shufang.utils.SparkUtil import org.apache.spark.SparkContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext} /** * transform算子 */ object TransformOperDemo { def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(10)) val seq = Seq(("spark", 1), ("core", 88), ("streaming", 100)) //val rdd: RDD[(String, Int)] = sc.makeRDD(seq) //通过广播变量将RDD的数据以集合的形式传送到DStream中进行运算 val bc: Broadcast[Seq[(String, Int)]] = sc.broadcast(seq) val spark: SparkSession = SparkSession.builder().config(sc.getConf).getOrCreate() import spark.implicits._ //获取DStream val source: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999) .flatMap(_.split("\\s")) .map((_, 1)) .reduceByKey(_ + _) //调用transform算子,还可以使用广播变量 val dStream: DStream[(String, Int)] = source.transform { rdd1 => val rdd: RDD[(String, Int)] = bc.value.toDF().rdd.map(row => (row.getString(0), row.getInt(1))) val result: RDD[(String, Int)] = rdd1.union(rdd) result } //输出transform算子的转换结果 dStream.print(100) ssc.start() ssc.awaitTermination() } } 3、foreachRDD()foreachRDD() 是一个特殊的output算子,Dstream只有在遇见output算子的情况下才会执行计算,而foreachRDD会将DStream的操作转化成RDD的操作,假如在foreachRDD算子中,不对RDD做任何action操作,那么相当于这个DStream只是简单的接受数据并且丢弃数据,不会执行SparkSteaming的计算。 简单的示意代码如下: package com.shufang.spark_streaming.opers import com.shufang.utils.SparkUtil import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Durations, StreamingContext} /** * foreachRDD,这是一个最终output的算子,这个算子可以将不同的RDD的数据 * 输出到外部存储系统如:HDFS\DATBASE\FILE */ object ForeachRDDOperDemo { def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(10)) val source: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999) .flatMap(_.split("\\s")).map((_, 1)) .reduceByKey(_ + _) /** * DStream的output操作 * RDD的transform操作是lazy的,只有碰见action算子,那么才开始计算 * DStream的output算子例如foreachRDD也是执行计算的关键: * 1、foreachRDD算子会按照血缘关系计算之前的transform算子 * 2、假如foreachRDD中没对RDD做action操作,那么就会舍弃这些数据,什么也不做, * 只有对RDD做了对应的action操作,才会触发job */ source.foreachRDD( rdd => { //spark的创建和相关导入都需要放在foreachRDD里面 val spark: SparkSession = SparkSession.builder().config(sc.getConf).getOrCreate() import spark.implicits._ val ds: Dataset[CountCase] = rdd.toDF("name", "count").as[CountCase] //output,action操作,同时还可以做SQL操作 ds.write .format("jdbc") .mode("overwrite") .option("url", "jdbc:mysql://localhost:3306/hello") .option("dbtable", "foreach_rdd") .option("user", "root") .option("password", "888888") .save() } ) ssc.start() ssc.awaitTermination() } } case class CountCase(name:String,count:Long) |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |