Shlle脚本传参调用seatunnel(原waterdrop)将hive中数据导入ClickHouse

您所在的位置:网站首页 clickhouse执行脚本 Shlle脚本传参调用seatunnel(原waterdrop)将hive中数据导入ClickHouse

Shlle脚本传参调用seatunnel(原waterdrop)将hive中数据导入ClickHouse

2023-10-01 17:49| 来源: 网络整理| 查看: 265

前言

公司分析数据已经存入hive,但需要输入参数计算得到很长一段时间的趋势变化数据(不固定查询),经调研ClickHouse时序优化后比较满足需求,并且ClickHouse在数据量大时最好采用DNS轮询本地表写,分布式表读的工作方式。 这里测试写脚本将hive中数据导入ClickHouse,加入到原来的数仓流程。 其实可以采取kafka+spark/streaming方式批量插入clickhouse提供准实时计算,后续看需求吧

开始测试:

运行环境

首先,假设已经安装好seatunnel1.5.1(waterdrop1.5.1),并且安装spark2.4.8、scala(这三个搭配是官方的推荐的),$HADOOP_CONF/hive-site.xml放入spark2.4.8/conf。 百度网盘自取: 链接:https://pan.baidu.com/s/1BZ8-oNXhRjmrqd3hW-KxXA 提取码:hevt 环境变量:我这里hive3.1.2使用的是Spark3,所以配了一个SPARK2_HOME,后续脚本中设置使用SPARK2_HOME

# SPARK_HOME export SPARK_HOME=/u/module/spark export PATH=$PATH:$SPARK_HOME/bin # SPARK_END # 多版本共存Spark,for waterdrop and Hive export SPARK2_HOME=/u/module/spark-2.4.8-bin-hadoop2.7 #Scala Env export SCALA_HOME=/u/module/scala-2.11.8/ export PATH=$PATH:$SCALA_HOME/bin

创建jobs目录存放执行conf文件

mkdir /u/module/seatunnel-1.5.1/jobs 创建表

hive:

-- auto-generated definition create table student ( id int, name string ) row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' stored as inputformat 'org.apache.hadoop.mapred.TextInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' location 'hdfs://hadoop101:8020/user/hive/warehouse/student' tblproperties ('bucketing_version' = '2');

插入:

insert into student values (1,'abc'),(2,'def');

clickhouse:

drop table if exists waterdrop_test; create table waterdrop_test ( id UInt16, name String, birthday Date ) ENGINE = TinyLog; 脚本

我们可以使用cat touch ~/bin/mytest.sh && chmod u+x ~/bin/mytest.sh && vim ~/bin/mytest.sh #!/bin/bash # 环境变量 unset SPARK_HOME export SPARK_HOME=$SPARK2_HOME SEATUNNEL_HOME=/u/module/seatunnel-1.5.1 # 接收两个参数,第一个为要抽取的表,第二个为抽取时间 # 若输入的第一个值为first,不输入第二参数则直接退出脚本 if [[ $1 = first ]]; then if [ -n "$2" ] ;then do_date=$2 else echo "请传入日期参数" exit fi # 若输入的第一个值为all,不输入第二参数则取前一天 elif [[ $1 = all ]]; then # 判断非空,如果不传时间默认取前一天数据,传时间就取设定,主要是用于手动传参 if [ -n "$2" ] ;then do_date=$2 else do_date=`date -d '-1 day' +%F` fi else if [ -n "$2" ] ;then do_date=$2 else echo "请传入日期参数" exit fi fi echo "日期:$do_date" # 打印数据传输脚本并赋值 cat>$SEATUNNEL_HOME/jobs/hive2ck_test.conf hive { pre_sql = "select id,name,'${do_date}' as birthday from default.student" table_name = "test" } } filter {} output { clickhouse { host = "hadoop101:8123" database = "default" table = "waterdrop_test" fields = ["id","name","birthday"] username = "default" password = "" } } !EOF $SEATUNNEL_HOME/bin/start-waterdrop.sh --config $SEATUNNEL_HOME/jobs/hive2ck_test.conf -e client -m 'local[2]' 进阶脚本 #!/bin/bash # 环境变量 unset SPARK_HOME export SPARK_HOME=$SPARK2_HOME SEATUNNEL_HOME=/u/module/seatunnel-1.5.1 # 接收两个参数,第一个为要抽取的表,第二个为抽取时间 # 若输入的第一个值为first,不输入第二参数则直接退出脚本 if [[ $1 = first ]]; then if [ -n "$2" ] ;then do_date=$2 else echo "请传入日期参数" exit fi # 若输入的第一个值为all,不输入第二参数则取前一天 elif [[ $1 = all ]]; then # 判断非空,如果不传时间默认取前一天数据,传时间就取设定,主要是用于手动传参 if [ -n "$2" ] ;then do_date=$2 else do_date=`date -d '-1 day' +%F` fi else if [ -n "$2" ] ;then do_date=$2 else echo "请传入日期参数" exit fi fi echo "日期:$do_date" import_conf(){ # 打印数据传输脚本并赋值 cat>$SEATUNNEL_HOME/jobs/hive2ck_test.conf hive { # pre_sql = "select id,name,'${do_date}' as birthday from default.student" pre_sql = "$1" table_name = "$2" } } filter {} output { clickhouse { host = "$3" database = "$4" table = "$5" # fields = ["id","name","birthday"] fields = $6 username = "default" password = "" } } !EOF $SEATUNNEL_HOME/bin/start-waterdrop.sh --config $SEATUNNEL_HOME/jobs/hive2ck_test.conf -e client -m 'local[2]' } import_test(){ import_conf "select id,name,'${do_date}' as birthday from default.student" "test" "hadoop101:8123" "default" "waterdrop_test" "[\"id\",\"name\",\"birthday\"]" } import_test

此脚本将生成conf做为一个方法,可用于多表导入 后续增量导入

总结

在这里插入图片描述

我这里测试成功,这只是最基本的使用,可根据自己需求更改。

若对你有帮助,点个赞吧~



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3