构建一个flink程序,从kafka读取然后写入MYSQL

您所在的位置:网站首页 java往kafka写数据 构建一个flink程序,从kafka读取然后写入MYSQL

构建一个flink程序,从kafka读取然后写入MYSQL

2023-03-26 15:13| 来源: 网络整理| 查看: 265

最近flink已经变得比较流行了,所以大家要了解flink并且使用flink。现在最流行的实时计算应该就是flink了,它具有了流计算和批处理功能。它可以处理有界数据和无界数据,也就是可以处理永远生产的数据。具体的细节我们不讨论,我们直接搭建一个flink功能。总体的思路是source -> transform -> sink,即从source获取相应的数据来源,然后进行数据转换,将数据从比较乱的格式,转换成我们需要的格式,转换处理后,然后进行sink功能,也就是将数据写入到相应的db里边或文件中用于存储和展现。

  接下来我们需要下载flink,kafka,mysql, zookeeper,  我直接下载了tar或tgz包,然后解压。

  下载好flink之后,然后启动一下,比如我下载了flink-1.9.1-bin-scala_2.11.tgz,然后解压一下。

tar -zxvf flink-1.9.1-bin-scala_2.11.tgz cd flink-1.9.1 ./bin/start-cluster.sh

  启动好了之后访问 http://localhost:8081 ,会看到截图。

 

 

   下载zookeeper,解压之后,复制zookeeper/conf下的zoo_sample.cfg, 然后启动,命令如下,zookeeper是和kafka结合使用的,因为kafka要监听和发现所有broker的。

cp zoo_sample.cfg zoo.cfg cd ../ ./bin/zkServer.sh start

   接下来下载kafka和启动, 创建一个person的topic,由一个partition和一个备份构成。

./bin/kafka-server-start.sh config/server.properties ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic person

  mysql的话,大家可以自行安装了,安装好之后可以在数据库里创建一张表。

复制代码 CREATE TABLE `Person` ( `id` mediumint NOT NULL auto_increment, `name` varchar(255) NOT NULL, `age` int(11) DEFAULT NULL, `createDate` timestamp NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci 复制代码

  接下来我们该创建一个JAVA工程,采用的maven的方式。前提是大家一定要先安装好maven,可以执行mvn命令。直接执行一下maven的时候可能会卡住,下载不了,我先从

 https://repo.maven.apache.org/maven2/上下载一个  archetype-catalog.xml 文件,然后放到本地的maven对应的库,你们可以参考这个我的路径进行调整。    /Users/huangqingshi/.m2/repository/org/apache/maven/archetype/archetype-catalog/2.4    复制代码 mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.7.2 \ -DgroupId=flink-project \ -DartifactId=flink-project \ -Dversion=0.1 \ -Dpackage=myflink \ -DinteractiveMode=false \ #这个是创建项目时采用交互方式,上边指定了了相关的版本号和包名等信息,所以不需要交互方式进行。 -DarchetypeCatalog=local #这个是使用上边下载的文件,local也就是从本地文件获取,因为远程获取特别慢。导致工程生成不了。 复制代码

  我的这个项目添加了一些依赖比如kafka的,数据库连接等,具体的pom文件内容为:

复制代码 4.0.0 flink-project flink-project 0.1 jar Flink Quickstart Job http://www.myorganization.org UTF-8 1.7.2 1.8 2.11 ${java.version} ${java.version} apache.snapshots Apache Development Snapshot Repository https://repository.apache.org/content/repositories/snapshots/ false true org.apache.flink flink-java ${flink.version} provided org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} provided org.apache.flink flink-connector-kafka-0.10_${scala.binary.version} ${flink.version}


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3