客官,您的【Spark读取千万数据量Mysql大表后写入到Hive】代码已上齐,请慢用。。。

您所在的位置:网站首页 spark读取数据 客官,您的【Spark读取千万数据量Mysql大表后写入到Hive】代码已上齐,请慢用。。。

客官,您的【Spark读取千万数据量Mysql大表后写入到Hive】代码已上齐,请慢用。。。

2023-08-16 07:11| 来源: 网络整理| 查看: 265

鸣谢:如果您觉得本文对您有帮助,请点赞和收藏,Thanks。

本文主要用来描述如何用Spark读取Mysql超百万甚至千万数据量的大表。 经过测试,spark读取1千万数据量的mysql表写到hive中,整个job运行只需1.6分钟。

开始参考这篇文章的写法读取mysql【spark jdbc分区并发读取 mysql 大表】,但是只适合读取数字型的主键或者要写死分区,不管是从实际需求还是代码层面来说都显得不够优秀。 于是就想到了用mysql分页读取的方法来改进。这种方法无需关注分区字段的类型,也无需写死分区,可以根据配置进行动态分区读取。可能还有不足之处,欢迎提出改进建议。

客官,您的代码已上齐,请慢用。。。 POM.xml文件

按照项目实际需求添加依赖即可。

4.0.0 org.example spark 1.0 jar 1.8 UTF-8 UTF-8 2.11 2.1.1 5.1.48 org.apache.spark spark-core_${scala.vesion} ${spark.version} org.apache.spark spark-sql_${scala.vesion} ${spark.version} org.apache.spark spark-hive_${scala.vesion} ${spark.version} mysql mysql-connector-java ${mysql.version} runtime net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile maven-assembly-plugin false jar-with-dependencies make-assembly package single Properties 文件

配置mysql连接参数。

# mysql连接参数 mysql.url=jdbc:mysql://92.168.13.11:3306/zhmlj?useUnicode=true&characterEncoding=utf-8&useSSL=false mysql.user=root mysql.password=root # mysql读取大表配置(开启后采用并发分区读取) mysql.isPartition=true # 表数据大小(估算值即可,大于等于实际表数据量) mysql.tableCount=11000000 # 分区读取条数(按照分页进行读取) mysql.partitionLimit=100000 # 排序字段 mysql.orderField=id Properties 工具类

用来读取properties文件的mysql连接参数

package utils import java.io.InputStream import java.util.Properties object PropertiesUtils { //单例配置文件 lazy val getProperties: Properties = { val properties = new Properties() val in: InputStream = this.getClass.getClassLoader.getResourceAsStream("application.properties"); properties.load(in) properties } } SparkUtils工具类

用来创建单例SparkSession 。

package utils import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object SparkUtils { /** * 创建批处理配置对象 * setMaster:设置运行模式 local:单线程模式,local[n]:以n个线程运行,local[*]:以所有CPU核数的线程运行 * setAppName:设置应用名称 * set:各种属性(spark.testing.memory分配4G内存来测试) */ //TODO 本地运行用这个 lazy val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TestHive").set("spark.testing.memory", "4294967296") //TODO 打包运行用这个 // lazy val sparkConf: SparkConf = new SparkConf().setAppName("SparkJob") //创建session // lazy val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //创建session,并启用hive lazy val sparkSessionWithHive: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate() } 连接Mysql工具类

用来创建mysql连接。

package utils import java.sql.{Connection, DriverManager} import java.util.Properties import org.apache.spark.sql.{DataFrame, SaveMode} object ConnectUtils { private val properties: Properties = PropertiesUtils.getProperties /** * mysql数据源输入 */ def mysqlSource: (String) => DataFrame = (tableName: String) => { val prop = new Properties() prop.setProperty("user", properties.getProperty("mysql.user")) prop.setProperty("password", properties.getProperty("mysql.password")) prop.setProperty("driver", "com.mysql.jdbc.Driver") //是否开启读大表配置 if (properties.getProperty("mysql.isPartition").equals("true")) { //表数据大小 val tableCount: Int = properties.getProperty("mysql.tableCount").toInt //每页数据大小 val partitionLimit: Int = properties.getProperty("mysql.partitionLimit").toInt //需要的分页数 val pages: Int = tableCount / partitionLimit //分页条件 val partitionArray = new Array[String](pages) val orderField: String = properties.getProperty("mysql.orderField") for (i SparkUtils.sparkSessionWithHive.read.jdbc(properties.getProperty("mysql.url"), tableName, prop) } } } Mysql to Hive

使用spark读取mysql表数据写入到hive中。

package demo import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import utils.{ConnectUtils, SparkUtils} object MysqlToHive { def main(args: Array[String]): Unit = { //创建session val spark: SparkSession = SparkUtils.sparkSessionWithHive //连接mysql //TODO 修改表名 val dataDF: DataFrame = ConnectUtils.mysqlSource("table_test01") //TODO 具体业务逻辑处理 //通过调用API的方式保存到hive dataDF.write.mode(SaveMode.Append).insertInto("databases_test.table_test01") //方式二:利用API自动创建表再插入数据 //dataDF.write.mode(SaveMode.Append).saveAsTable("test_xsh_0401") //方式三:利用SQL插入已存在的表 // dataDF.createTempView("qiaoJie") // sql("insert into table ods_xsh_0330 select * from qiaoJie") println("OK。。。。。") //释放资源 spark.stop() } } 打包提交到YARN运行 本地debug程序的时候,可以先在properties文件把表数据大小tableCount和分区读取条数partitionLimit设置小一点,用作本地测试程序。打包运行时,记得把SparkUtils的sparkConf修改为打包用的;把pom.xml的scope注释去掉,用服务器上spark自带的依赖即可。provided用maven clean package打包成功后,把jar包提交到YARN上运行。运行jar包命令如下: 具体参数可结合服务器配置和美团技术文章说明进行修改。 Spark性能优化指南——基础篇 spark-submit \ --master yarn-cluster \ --num-executors 2\ --executor-memory 6G \ --executor-cores 4 \ --driver-memory 1G


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3