【Flink-Kafka-To-Mysql】Using Flink to Write Kafka Data into MySQL (insert, delete, or update according to the operation type)
1) Import dependencies
2) resources
    2.1. appconfig.yml
    2.2. application.properties
    2.3. log4j.properties
    2.4. log4j2.xml
3) util
    3.1. KafkaMysqlUtils
    3.2. CustomDeSerializationSchema
4) po
    4.1. TableBean
5) kafkacdc2mysql
    5.1. Kafka2MysqlApp
Requirements:
1. Data is read from Kafka and written into MySQL.
2. The related configuration is stored in MySQL and is read from it dynamically.
3. The Kafka cluster in this example uses Kerberos authentication; if you do not need that, adjust the code accordingly.
4. The Kafka messages are in JSON format, and each record is inserted, deleted, or updated according to its operation-type field (see the sample message below).
5. A custom Source is used for reading and a custom Sink for writing.
6. A custom deserialization schema is used when consuming the Kafka data.
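For illustration only, a message on the topic mysql_tab1 could look like the sample below. The operationType field and the alarm_id primary-key column come from the code later in this post; the remaining columns are made up and depend entirely on your upstream producer.

{
  "operationType": "INSERT",
  "alarm_id": "1001",
  "alarm_level": "2",
  "create_time": "2023-05-01 12:00:00"
}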
1) Import dependencies

The dependencies listed here are somewhat redundant; trim or keep them according to your own needs.

<modelVersion>4.0.0</modelVersion>
<groupId>gaei.cn.x5l</groupId>
<artifactId>x8vbusiness</artifactId>
<version>1.0.0</version>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <target.java.version>1.8</target.java.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <flink.version>1.14.0</flink.version>
    <log4j.version>2.17.2</log4j.version>
    <hadoop.version>3.1.2</hadoop.version>
    <mongo.driver.core.version>3.12.6</mongo.driver.core.version>
</properties>

<dependencies>
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency>
    <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>1.14.0</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version>
        <exclusions><exclusion><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></exclusion></exclusions>
    </dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
    <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>
    <dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>1.13.5</version><scope>provided</scope></dependency>
    <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.3</version></dependency>
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><version>2.3.0</version></dependency>
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version><scope>runtime</scope></dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.2.8</version></dependency>
    <dependency><groupId>org.mongodb</groupId><artifactId>bson</artifactId><version>${mongo.driver.core.version}</version></dependency>
    <dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver-core</artifactId><version>${mongo.driver.core.version}</version></dependency>
    <dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.6</version></dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version>
            <configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>org.apache.logging.log4j:*</exclude>
                                <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
    <pluginManagement>
        <plugins>
            <!-- Keeps Eclipse m2e from flagging the shade/compiler executions; it has no effect on the Maven build itself. -->
            <plugin>
                <groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version>
                <configuration>
                    <lifecycleMappingMetadata>
                        <pluginExecutions>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId>
                                    <versionRange>[3.0.0,)</versionRange><goals><goal>shade</goal></goals>
                                </pluginExecutionFilter>
                                <action><ignore/></action>
                            </pluginExecution>
                            <pluginExecution>
                                <pluginExecutionFilter>
                                    <groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId>
                                    <versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals>
                                </pluginExecutionFilter>
                                <action><ignore/></action>
                            </pluginExecution>
                        </pluginExecutions>
                    </lifecycleMappingMetadata>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
</build>

2) resources

2.1. appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"

2.2. application.properties

url=mongodb://test:[email protected]:34516/?authSource=admin
#database=diagnosis
#collection=diagnosisEntiry
maxConnectionIdleTime=1000000
batchSize=1
# flink
checkpoint.interval=300000
checkpoint.minPauseBetweenCheckpoints=10000
checkpoint.checkpointTimeout=400000
maxConcurrentCheckpoints=1
restartInterval=120
restartStrategy=3
checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongo
mysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false
mysql.username=test
mysql.password=123456
#envType=PRE
envType=PRD
# Druid connection-pool settings for the production MySQL
druid.driverClassName=com.mysql.jdbc.Driver
# production
druid.url=jdbc:mysql://1.1.1.1:3306/test
druid.username=test
druid.password=123456
# initial pool size
druid.initialSize=1
# maximum number of connections
druid.maxActive=5
# maximum wait time (ms)
druid.maxWait=3000

2.3. log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4. log4j2.xml
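The original listing for log4j2.xml is not included here. As a placeholder, a minimal configuration that mirrors the log4j.properties above (single console appender, INFO root level, same pattern) might look like the following; adjust it to your own logging needs.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%-4r [%t] %-5p %c %x - %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>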
3) util

3.1. KafkaMysqlUtils

public class KafkaUtils {

    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(List<String> topic) throws IOException {
        Properties prop1 = confFromYaml();
        // Kerberos-authenticated environment
        String envType = prop1.getProperty("envType");
        Properties prop = new Properties();
        System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                + "useTicketCache=false "
                + "serviceName=\"" + "kafka" + "\" "
                + "useKeyTab=true "
                + "keyTab=\"" + "/opt/conf/test.keytab" + "\" "
                + "principal=\"" + getKafkaKerberos(envType).get("principal") + "\";");
        prop.put("bootstrap.servers", getKafkaKerberos(envType).get("bootstrap.servers"));
        prop.put("group.id", "Kafka2Mysql_test");
        prop.put("auto.offset.reset", "earliest");
        prop.put("enable.auto.commit", "false");
        prop.put("max.poll.interval.ms", "60000");
        prop.put("max.poll.records", "3000");
        prop.put("session.timeout.ms", "600000");
        // List topics = Stream.of(prop.getProperty("topics").split(",", -1))
        //         .collect(Collectors.toList());
        prop.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        prop.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
                new FlinkKafkaConsumer<>(topic, new CustomDeSerializationSchema(), prop);
        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }

    public static void main(String[] args) throws Exception {
        Properties druidConf = KafkaUtils.getDruidConf();
        if (druidConf == null) {
            throw new RuntimeException("Missing Druid configuration, please check");
        }
        DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);
        Connection connection = dataSource.getConnection();
        PreparedStatement showDatabases = connection.prepareStatement("select count(*) from tab_factory");
        ResultSet resultSet = showDatabases.executeQuery();
        while (resultSet.next()) {
            String string = resultSet.getString(1);
            System.out.println(string);
        }
        resultSet.close();
        showDatabases.close();
        connection.close();
    }

    public static Properties getDruidConf() {
        try {
            Properties prop = confFromYaml();
            String driverClassName = prop.get("druid.driverClassName").toString();
            String url = prop.get("druid.url").toString();
            String username = prop.get("druid.username").toString();
            String password = prop.get("druid.password").toString();
            String initialSize = prop.get("druid.initialSize").toString();
            String maxActive = prop.get("druid.maxActive").toString();
            String maxWait = prop.get("druid.maxWait").toString();
            Properties p = new Properties();
            p.put("driverClassName", driverClassName);
            p.put("url", url);
            p.put("username", username);
            p.put("password", password);
            p.put("initialSize", initialSize);
            p.put("maxActive", maxActive);
            p.put("maxWait", maxWait);
            // p.forEach((k, v) -> System.out.println("pool property " + k + "=" + v));
            return p;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // envType: PRE or PRD
    public static Map<String, String> getKafkaKerberos(String envType) {
        Map<String, String> map = new HashMap<>();
        if ("PRD".equalsIgnoreCase(envType)) {
            map.put("principal", "[email protected]");
            map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");
        } else if ("PRE".equalsIgnoreCase(envType)) {
            map.put("principal", "[email protected]");
            map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");
        } /*else if ("TEST".equalsIgnoreCase(envType)) {
            map.put("principal", "[email protected]");
            map.put("bootstrap.servers", "[email protected]");
        }*/ else {
            System.out.println("Unknown environment: " + envType);
            throw new RuntimeException("Unknown environment: " + envType);
        }
        return map;
    }

    public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {
        Properties prop = confFromYaml();

        // Offsets are committed on checkpoint completion, so offset commits lag behind consumption
        env.enableCheckpointing(Long.valueOf(prop.getProperty("checkpoint.interval")), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Long.valueOf(prop.getProperty("checkpoint.minPauseBetweenCheckpoints")));
        env.getCheckpointConfig().setCheckpointTimeout(Long.valueOf(prop.getProperty("checkpoint.checkpointTimeout")));
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.valueOf(prop.getProperty("maxConcurrentCheckpoints")));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                // number of restart attempts; do not set it too low, distributed jobs hit transient failures even in normal operation (3-5 is recommended)
                Integer.valueOf(prop.getProperty("restartStrategy")),
                // delay between attempts
                Time.of(Integer.valueOf(prop.getProperty("restartInterval")), TimeUnit.SECONDS)
        ));

        // State backend
        // env.setStateBackend(new RocksDBStateBackend((String) prop.getProperty("checkPointPath"), true));
        // env.setStateBackend(new MemoryStateBackend());
        env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));
        return env;
    }

    public static Properties confFromYaml() {
        Properties prop = new Properties();
        InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
        try {
            prop.load(resourceStream);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (resourceStream != null) {
                    resourceStream.close();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        return prop;
    }
}

3.2. CustomDeSerializationSchema

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    private static String encoding = "UTF8";

    // Whether this element is the last one in the stream; returning false means data keeps flowing in
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    // Return a ConsumerRecord that carries not only the payload but also the topic, offset, partition, and other metadata
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        byte[] key = (record.key() == null ? "".getBytes() : record.key());
        return new ConsumerRecord<>(
                record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                /* the value is not null-checked here; be sure to handle that in production */
                new String(key, encoding),
                new String(record.value(), encoding));
    }

    // The type of the elements this deserializer produces
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
        });
    }
}
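As a quick way to see what the schema emits, the following throwaway check (not part of the original post) builds one raw ConsumerRecord by hand, runs it through deserialize(), and prints the result; the topic name and JSON payload are placeholders.

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomDeSerializationSchemaDemo {
    public static void main(String[] args) throws Exception {
        // hand-built raw record, as the Kafka consumer would pass it to the schema
        byte[] value = "{\"operationType\":\"INSERT\",\"alarm_id\":\"1001\"}".getBytes("UTF8");
        ConsumerRecord<byte[], byte[]> raw = new ConsumerRecord<>("mysql_tab1", 0, 42L, null, value);

        ConsumerRecord<String, String> parsed = new CustomDeSerializationSchema().deserialize(raw);
        // prints: mysql_tab1-0@42 -> {"operationType":"INSERT","alarm_id":"1001"}
        System.out.println(parsed.topic() + "-" + parsed.partition() + "@" + parsed.offset() + " -> " + parsed.value());
    }
}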
"".getBytes() : record.key()); return new ConsumerRecord( record.topic(), record.partition(), record.offset(), record.timestamp(), record.timestampType(), record.checksum(), record.serializedKeySize(), record.serializedValueSize(), /*这里我没有进行空值判断,生产一定记得处理*/ new String(key, encoding), new String(record.value(), encoding)); } //指定数据的输入类型 @Override public TypeInformation getProducedType() { return TypeInformation.of(new TypeHint() { }); } } 4)po 4.1.TableBean @Data public class TableBean { private String database; private String table; private String primaryKey; private TableBean() { } public TableBean(String database, String table, String primaryKey) { this.database = '`' + database + '`'; this.table = '`' + table + '`'; this.primaryKey = primaryKey; } } 5)kafkacdc2mysql 5.1.Kafka2MysqlApp public class Kafka2MysqlApp { // key 是 topic 名,value是对应数据库表中的主键列名 private static final Map map = new HashMap(); static { //表名这里没有进行配置,后面根据实际业务进行配置即可 map.put("mysql_tab1", new TableBean("db1", "", "alarm_id")); map.put("mysql_tab2", new TableBean("db2", "", "id")); } public static void main(String[] args) throws Exception { ArrayList topicList = new ArrayList(map.keySet()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining(); KafkaUtils.setupFlinkEnv(env); RichSinkFunction sinkFunction = new RichSinkFunction() { DataSource dataSource = null; @Override public void open(Configuration parameters) throws Exception { initDruidDataSource(); } private void initDruidDataSource() throws Exception { Properties druidConf = KafkaUtils.getDruidConf(); if (druidConf == null) { throw new RuntimeException("缺少druid相关配置信息,请检查"); } dataSource = DruidDataSourceFactory.createDataSource(druidConf); } @Override public void close() throws Exception { } @Override public void invoke(ConsumerRecord record, Context context) throws Exception { if (dataSource == null) { throw new RuntimeException("连接池未初始化"); } String operationType = ""; String keyId = ""; String sql = ""; try (Connection connection = dataSource.getConnection()) { //定义表名 String table_name = record.topic(); JSONObject jsonObject = JSONObject.parseObject(record.value()); operationType = jsonObject.getString("operationType"); jsonObject.remove("operationType"); String primaryKey = map.get(record.topic()).getPrimaryKey(); String database = map.get(record.topic()).getDatabase(); keyId = jsonObject.getString(primaryKey); List columns = new ArrayList(); List columnValues = new ArrayList(); jsonObject.forEach((k, v) -> { columns.add(k); columnValues.add(v.toString()); }); if ("INSERT".equals(operationType)) { try { sql = "delete from " + database + "." + table_name + " where " + primaryKey + "= ?"; PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setObject(1, keyId); preparedStatement.executeUpdate(); preparedStatement.close(); } catch (Exception ignore) { } StringBuilder sb = new StringBuilder(); sb.append("insert into ").append(database).append(".").append(table_name).append("("); for (String column : columns) { sb.append("`").append(column).append("`,"); } sb.append(") values("); for (String columnValue : columnValues) { sb.append("?,"); } sb.append(")"); //去除最后一个逗号 sql = sb.toString().replace(",)", ")"); PreparedStatement preparedStatement = connection.prepareStatement(sql); for (int i = 0; i |