SparkStreaming的普通transform算子和一些特殊的output算子使用

您所在的位置:网站首页 sparkstreaming广播变量 SparkStreaming的普通transform算子和一些特殊的output算子使用

SparkStreaming的普通transform算子和一些特殊的output算子使用

2023-09-08 01:00| 来源: 网络整理| 查看: 265

SparkStreaming之特殊算子

常见的算子大家还是参考官网:官网SparkStreaming算子

其实DStream与RDD的算子大概都差不多,有个别差异,相同的相信大家都懂,比如map()\flatMap()\filter()\reduceByKey()\repartition()\union()\join()等等

但是有一些不同的下面我们来一一对比

区别算子

count()

reduce()

countByValue()

1、count()

在RDD的的count是一个执行算子,返回rdd中的elements的个数,但是在SparkStreaming的DStream中的count最终是算出当前批次的DStream的元素个数

输入的数据格式为

flink spark core sql streaming flink spark

//在RDD的的count是一个执行算子,返回rdd中的elements的个数 RDD[T] => Long val long: Long = rdd.count() //但是在SparkStreaming的DStream中的count就不一样了 DStream[T] => DStream[Long] // 代码简单实现 def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(5)) val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val priStream: DStream[(String, Int)] = dstream.flatMap(_.split("\\s")) .filter(!_.isEmpty) //用来过滤DStream中的元素 .map((_, 1)) //将Stream中的每个元素进行转换 priStream.count() ssc.start() ssc.awaitTermination() } //用SparkStreaming的源码思想实现count() priStream.map(_ => (null, 1L)) .transform { rdd => val rdd1: RDD[(Null, Long)] = ssc.sparkContext.makeRDD(Seq((null, 0L)), 1) val kv: RDD[(Null, Long)] = rdd.union(rdd1) kv }.reduceByKey(_ + _) .map(_._2)

*输出结果 ====> 6*

2、reduce()

在RDD上reduce()是一个action算子,在DStream上reduce()是一个transform算子,用来返回一个只有唯一一个element的DStream

输入数据与count()的数据源一致

// RDD RDD[T] => T val str: String = rdd.reduce(_ + _) // DStream DStream[T] => Dstream[T] val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(5)) val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val priStream: DStream[(String, Int)] = dstream.flatMap(_.split("\\s")) .filter(!_.isEmpty) //用来过滤DStream中的元素 .map((_, 1)) //将Stream中的每个元素进行转换 val thirdStream: DStream[(String, Int)] = priStream.reduce((a, b) => (a._1, a._2 + b._2)) //用streaming的源码怎么实现reduce priStream.map((null,_)).reduceByKey(((s:String,a:Int),(s1:String,a1:Int))=> (s2:String,a2:Int)).map(_._2) 3、countByValue()

countByValue()是将相同value值的kv数据按照

输入数据与count()的输入数据一样

输入数据:

sql streaming flink spark

value1输出结果:

(flink,7) (streaming,7) (spark,7) (sql,7)

val value: DStream[((String, Int), Long)] = priStream.countByValue() //value.print() println("----######----######-----") val value1: DStream[(String, Long)] = priStream.map(_._1).countByValue() value1.print() //用源码实现countByValue的效果 val value2: DStream[((String, Int), Long)] = priStream.map((_, 1L)).reduceByKey(_ + _) value2.print() 特殊算子

updateStateByKey()

transform()

foreachRDD()

1、updateStateByKey()

Spark对于状态的支持没有Flink那么好,但是还是能够撑起简单的状态编程,updateStateByKey()的状态存储在checkpoint目录的文件中,所以必须开启ssc.checkpoint(“path”)

同时需要传入一个updateFun,可以是一个def也可以是一个匿名的方法函数

简单的代码示意:

package com.shufang.spark_streaming.opers import com.shufang.utils.SparkUtil import org.apache.spark.SparkContext import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.streaming.dstream.DStream /** * 在spark中虽然对状态支持没有Flink做的那么好,但是也是支持保存状态的, * 但是需要指定checkpoint的存储目录的 * updateStateByKey只适用于RDD[(K,V)]类型,可以全局通过checkpoint保证不同key的value状态 */ object UpdateStateByKeyDemo { def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(10)) ssc.checkpoint("src/main/data/checkpoint_dir") val source: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999) .flatMap(_.split("\\s")).map((_, 1)) .reduceByKey(_ + _) //自定义更新的方法,也可以用匿名函数代替,如下 val updateFun = (values:Seq[Int],state:Option[Int]) => { val newstate: Int = values.sum val oldstate = state.getOrElse(0) Some(newstate+oldstate) } //匿名updateFun形式的updateStateByKey的使用 val updateStream: DStream[(String, Int)] = source.updateStateByKey( (value: Seq[Int], state: Option[Int]) => { val newstate: Int = value.sum val oldstate = state.getOrElse(0) Some(newstate + oldstate) }) //output操作 updateStream.print(20) ssc.start() ssc.awaitTermination() } } 2、transform()

Transform()算子是Streaming的最基本的转换函数,可以将对DStream的操作转化成熟悉的RDD、DF、DS操作,而且还可以使用广播变量以及累加器等共享变量,最终会返回一个新的DStream[T] ,其中T的类型就是DStream中包含的RDD的元素类型T

简单的示意代码如下:

package com.shufang.spark_streaming.opers import com.shufang.utils.SparkUtil import org.apache.spark.SparkContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Durations, StreamingContext} /** * transform算子 */ object TransformOperDemo { def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(10)) val seq = Seq(("spark", 1), ("core", 88), ("streaming", 100)) //val rdd: RDD[(String, Int)] = sc.makeRDD(seq) //通过广播变量将RDD的数据以集合的形式传送到DStream中进行运算 val bc: Broadcast[Seq[(String, Int)]] = sc.broadcast(seq) val spark: SparkSession = SparkSession.builder().config(sc.getConf).getOrCreate() import spark.implicits._ //获取DStream val source: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999) .flatMap(_.split("\\s")) .map((_, 1)) .reduceByKey(_ + _) //调用transform算子,还可以使用广播变量 val dStream: DStream[(String, Int)] = source.transform { rdd1 => val rdd: RDD[(String, Int)] = bc.value.toDF().rdd.map(row => (row.getString(0), row.getInt(1))) val result: RDD[(String, Int)] = rdd1.union(rdd) result } //输出transform算子的转换结果 dStream.print(100) ssc.start() ssc.awaitTermination() } } 3、foreachRDD()

foreachRDD() 是一个特殊的output算子,Dstream只有在遇见output算子的情况下才会执行计算,而foreachRDD会将DStream的操作转化成RDD的操作,假如在foreachRDD算子中,不对RDD做任何action操作,那么相当于这个DStream只是简单的接受数据并且丢弃数据,不会执行SparkSteaming的计算。

简单的示意代码如下:

package com.shufang.spark_streaming.opers import com.shufang.utils.SparkUtil import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Durations, StreamingContext} /** * foreachRDD,这是一个最终output的算子,这个算子可以将不同的RDD的数据 * 输出到外部存储系统如:HDFS\DATBASE\FILE */ object ForeachRDDOperDemo { def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.getLocalSC() val ssc = new StreamingContext(sc, Durations.seconds(10)) val source: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999) .flatMap(_.split("\\s")).map((_, 1)) .reduceByKey(_ + _) /** * DStream的output操作 * RDD的transform操作是lazy的,只有碰见action算子,那么才开始计算 * DStream的output算子例如foreachRDD也是执行计算的关键: * 1、foreachRDD算子会按照血缘关系计算之前的transform算子 * 2、假如foreachRDD中没对RDD做action操作,那么就会舍弃这些数据,什么也不做, * 只有对RDD做了对应的action操作,才会触发job */ source.foreachRDD( rdd => { //spark的创建和相关导入都需要放在foreachRDD里面 val spark: SparkSession = SparkSession.builder().config(sc.getConf).getOrCreate() import spark.implicits._ val ds: Dataset[CountCase] = rdd.toDF("name", "count").as[CountCase] //output,action操作,同时还可以做SQL操作 ds.write .format("jdbc") .mode("overwrite") .option("url", "jdbc:mysql://localhost:3306/hello") .option("dbtable", "foreach_rdd") .option("user", "root") .option("password", "888888") .save() } ) ssc.start() ssc.awaitTermination() } } case class CountCase(name:String,count:Long)


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3