Flink(Scala版)消费Kafka数据存入Mysql

您所在的位置:网站首页 flink读取kafka的数据写到mysql中 Flink(Scala版)消费Kafka数据存入Mysql

Flink(Scala版)消费Kafka数据存入Mysql

2024-06-24 17:24| 来源: 网络整理| 查看: 265

题目要求:1.使用Flume采集指定日志文件,并将采集到的数据存入kafka中 2.将存入kafka的消息,使用Flink进行处理并存入Mysql中

第一步 :编写Flume文档

在Flume目录下进入job目录,编写flume-kafka.conf文件 在这里插入图片描述

a.sources = s1 a.channels = c1 a.sinks = k1 a.sources.s1.type = exec a.sources.s1.command = tail -F /usr/apps/tmp/redis.log a.channels.c1.type = memory a.channels.c1.capacity = 1000 a.channels.c1.transactionCapacity = 100 a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a.sinks.k1.kafka.topic = ssm a.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092 a.sinks.k1.kafka.flumeBatchSize = 20 a.sinks.k1.kafka.producer.acks = 1 a.sinks.k1.kafka.producer.linger.ms = 1 a.sinks.k1.kafka.producer.compression.type = snappy a.sources.s1.channels = c1 a.sinks.k1.channel = c1

对上文个别语句的解释: a.sources.s1.type = exec //表示监听文件 a.sources.s1.command =tail -F /usr/apps/tmp/redis.log //监听文件的绝对路径 a.sinks.k1.kafka.topic = ssm //存入kafka的主题中,主题名为:ssm 监听的文件自行创建,保证和文档内写的相同就可以。 注意:此处不需要使用kafka提前创建主题,kafka会自动创建 关于Kafka的命令:kafka命令大全

第二步:开启Flume采集

1、jps 检查Kafka和ZK进程是否都开启了! 在这里插入图片描述 确认开启之后,开始Flume采集. 采集命令: bin/flume-ng agent -c conf/ -n a -f job/flume-kafka.conf -Dflume.root.logger=INFO,console Flume命令以及详细过程,请参考以下文章:Flume命令详解参考文章

第三步:编写Flink程序 import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import java.sql.{Connection, DriverManager, PreparedStatement} import java.util.Properties case class UVPV(user_id: String, times: Long) object KafkaToMysql_test1 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val prop = new Properties() prop.setProperty("bootstrap.servers", "192.168.38.147:9092") prop.setProperty("group.id", "flink") val kafkaSource = env.addSource(new FlinkKafkaConsumer011[String]("ssm", new SimpleStringSchema(), prop)) val result = kafkaSource.map(data => { val arr = data.split(",") UVPV(arr(0), arr(1).toLong) }) result.addSink(new MyJdbcSinkFunction()) env.execute() } } class MyJdbcSinkFunction() extends RichSinkFunction[UVPV]{ //定义连接,预编译语句 var conn:Connection=_ var insertStmt:PreparedStatement=_ var updateStmt:PreparedStatement=_ override def open(parameters: Configuration): Unit = { conn=DriverManager.getConnection("jdbc:mysql://ip地址:3306/数据库名称","用户名","密码") insertStmt=conn.prepareStatement("insert into uvpv(user_id,times) values (?,?)") updateStmt=conn.prepareStatement("update uvpv set times=? where user_id=?") } override def invoke(value: UVPV, context: SinkFunction.Context[_]): Unit = { updateStmt.setLong(1,value.times) updateStmt.setString(2,value.user_id) updateStmt.execute() if (updateStmt.getUpdateCount==0){ insertStmt.setString(1,value.user_id) insertStmt.setLong(2,value.times) insertStmt.execute() } } override def close(): Unit = { insertStmt.close() updateStmt.close() conn.close() } }

因为我用的Flink版本过低(1.10.2)并没有提供JdbcSink的包,所以需要自定义SinkFunction来连接Mysql进行交互。 jdbc如果看不懂的可以自行百度,或者说去小破站搜教程即可。

第四步 创建Mysql数据库和表 mysql -uroot -p000 create database test; use test; create table uvpv(user_id varchar(255) not null ,times int not null); 捋清楚执行顺序:

Flume采集–>Flink执行程序–>向Flume监听的文件加入数据—>Mysql查询表 第一步 第二步

第三步 第四步



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3