Spark Streaming + Kafka + HBase: writing real-time data into HBase


For installing Kafka, Spark Streaming and HBase, see my earlier articles.

Built with Maven against Hadoop 2.7.5, HBase 1.3.2 and Spark 2.4.7.

Code: pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>maven_kafka_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.11.12</scalaVersion>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <finalName>test</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

HbaseUtil: shared HBase connection helper. All partitions running in the same executor reuse one Connection, reference-counted through num (the body of closeHbaseConn was truncated in the source and is reconstructed from that counter).

package dstream.output.hbase

import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}

object HbaseUtil extends Serializable {
  private val conf = HBaseConfiguration.create()
  conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
  conf.set(HConstants.ZOOKEEPER_QUORUM, "master,slave1,slave2")

  @volatile private var connection: Connection = _
  @volatile private var num = 0 // reference counter for the shared connection

  def getHBaseConn: Connection = {
    synchronized {
      if (connection == null || connection.isClosed() || num == 0) {
        connection = ConnectionFactory.createConnection(conf)
        println("conn is created! " + Thread.currentThread().getName())
      }
      // every request for the connection bumps the counter
      num = num + 1
      println("request conn num: " + num + " " + Thread.currentThread().getName())
    }
    connection
  }

  def closeHbaseConn(): Unit = {
    synchronized {
      // Reconstructed: release the shared connection once the last caller is done,
      // mirroring the reference counter used in getHBaseConn.
      if (num <= 0) {
        println("no conn to close.")
      } else {
        num = num - 1
        if (num == 0 && connection != null) {
          connection.close()
        }
        println("close conn num: " + num + " " + Thread.currentThread().getName())
      }
    }
  }
}

The streaming job: a running word count over Kafka records, written to HBase. The head of the driver program (imports, SparkConf, StreamingContext and the "bootstrap.servers" key) did not survive in the source, so everything down to the first Kafka parameter is a sketch with assumed names: object KafkaToHbase, a local[2] master, 1-second batches and a ./checkpoint directory (updateStateByKey requires one). Adjust these to your cluster.

package dstream.output.hbase

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object KafkaToHbase { // object name assumed; the original header was lost
  def main(args: Array[String]): Unit = {
    // Assumed driver setup: local master, 1-second batches, local checkpoint directory
    val sparkConf = new SparkConf().setAppName("KafkaToHbase").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint("./checkpoint") // required by updateStateByKey

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "master:9092",      // Kafka broker address(es)
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "jamjar",                    // consumer group id
      "auto.offset.reset" -> "earliest",         // "latest" would jump to the newest offset instead
      "enable.auto.commit" -> (false: java.lang.Boolean)) // true = offsets committed automatically in the background

    val topics = Array("top4") // topics to consume; several can be listed

    // Create the direct DStream over the Kafka topic(s).
    // PreferConsistent spreads partitions evenly over executors; use PreferBrokers
    // only when the executors run on the Kafka broker nodes themselves.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    // Debug helper from the original post: print the received values when a batch is non-empty
    // stream.foreachRDD(f => {
    //   if (f.count > 0) f.foreach(r => println(r.value()))
    // })
    // val wordCounts = stream.map(crd => crd.value())

    // Split every record value into words and keep a running count per word
    val words = stream.flatMap(_.value().split(" ")).map((_, 1))
    val wordCounts = words.updateStateByKey(
      // Called for every key on every batch:
      //   values - the new counts of this key in the current batch, e.g. (1, 1) for (hello,1)(hello,1)
      //   state  - the count accumulated for this key so far
      (values: Seq[Int], state: Option[Int]) => {
        var newValue = state.getOrElse(0)
        values.foreach(newValue += _)
        Option(newValue)
      })
    wordCounts.print() // print the first ten elements of each batch's RDD to the console

    wordCounts.foreachRDD((rdd, time: Time) => {
      // Skip empty RDDs so the partitions below do not grab a connection for nothing
      if (!rdd.isEmpty()) {
        // Each partition is handled by one task thread; the partition count can be tuned with
        // "--conf spark.default.parallelism=20"
        rdd.foreachPartition((partition) => {
          if (!partition.isEmpty) {
            // partition and records live on the same worker, so conn never has to be serialized;
            // partitions running in the same executor share the connection
            val conn = HbaseUtil.getHBaseConn
            if (conn == null) {
              println("conn is null.") // printed inside the executor on the worker node
            } else {
              println("conn is not null. " + Thread.currentThread().getName())
              partition.foreach((record: (String, Int)) => {
                // all records of a partition are processed in the same thread
                println("record : " + Thread.currentThread().getName())
                val tableName = TableName.valueOf("wordfreq")
                val table = conn.getTable(tableName)
                try {
                  // row key = the word
                  val put = new Put(Bytes.toBytes(record._1))
                  // column family, column qualifier, value (the running count)
                  put.addColumn(Bytes.toBytes("statistics"), Bytes.toBytes("cnt"), Bytes.toBytes(record._2))
                  table.put(put)
                  println("insert (" + record._1 + "," + record._2 + ") into hbase success.")
                } catch {
                  case e: Exception => e.printStackTrace()
                } finally {
                  table.close()
                }
              })
              // Closing here would open and close the connection once per partition task,
              // which is wasteful, so it stays commented out:
              // HbaseUtil.closeHbaseConn()
            }
          }
        })
        // Closing here only runs on the driver, so it would have no effect on the executors:
        // HbaseUtil.closeHbaseConn()
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
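The loop above opens a Table and issues one RPC per word, which is the main cost once the shared connection is in place. A common refinement is to build all Puts for a partition first and flush them in a single table.put(list) call. The sketch below follows the same table layout (table wordfreq, family statistics, qualifier cnt) and reuses HbaseUtil; the helper name HbaseBatchWriter and its writePartition method are illustrative and not part of the original post.

package dstream.output.hbase

import java.util

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Illustrative helper (an assumption, not from the original post): write a whole
// partition of (word, count) pairs with one batched put instead of one RPC per record.
object HbaseBatchWriter {
  def writePartition(records: Iterator[(String, Int)]): Unit = {
    val conn = HbaseUtil.getHBaseConn                       // reuse the shared connection
    val table = conn.getTable(TableName.valueOf("wordfreq"))
    try {
      val puts = new util.ArrayList[Put]()
      records.foreach { case (word, cnt) =>
        val put = new Put(Bytes.toBytes(word))              // row key = the word
        put.addColumn(Bytes.toBytes("statistics"), Bytes.toBytes("cnt"), Bytes.toBytes(cnt))
        puts.add(put)
      }
      if (!puts.isEmpty) table.put(puts)                    // one batched call for the whole partition
    } finally {
      table.close()                                         // close the Table, keep the shared Connection
    }
  }
}

With a helper like this, the body of rdd.foreachPartition in the job above shrinks to a single HbaseBatchWriter.writePartition(partition) call, and the per-record getTable/close churn disappears.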

 

Credit: https://blog.csdn.net/weixin_39469127/article/details/92965283


