Flink CDC: Listening to Oracle Archive Logs in Real Time and Tuning Parameters for Oracle Data Latency


2024-07-17 20:35 | Source: compiled from the web

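Contents

1. Oracle configuration
   1.1 Enabling Oracle archive logging
   1.2 Creating a dedicated user for CDC
   1.3 Enabling supplemental logging at table and database level
2. Oracle CDC Connector
   2.1 Flink core
   2.2 Flink SQL
3. Supplementary notes
   3.1 Oracle-related notes
   3.2 Official documentation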

1. Oracle configuration

1.1 Enabling Oracle archive logging

1. On the database server, open a terminal and connect as SYSDBA:

    sqlplus / as sysdba

or

    sqlplus /nolog
    CONNECT sys/password@host:port AS SYSDBA;

2. Check whether archive logging is enabled:

    archive log list;

"Database log mode: No Archive Mode" means archive logging is disabled; "Database log mode: Archive Mode" means it is already enabled.

3. Configure the archive log parameters:

    alter system set db_recovery_file_dest_size = 100G;
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;

4. Create the tablespace:

    CREATE TABLESPACE logminer_tbs
      DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf'
      SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

The SID directory must be created beforehand as the root user and given read/write permissions (chmod 777).

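5. Enable archive logging:

    -- shut down the Oracle instance
    shutdown immediate;
    -- start the instance and mount the database
    startup mount;
    -- switch the database to archive log mode
    alter database archivelog;
    alter database open;

6. Once the database is open again, re-run archive log list; to confirm that archive logging is now enabled.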
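The check in steps 2 and 6 can also be scripted. The following is a minimal sketch, assuming the text printed by `archive log list` has already been captured as a string; the class name `ArchiveLogStatus` and the sample text in `main` are illustrative and not part of the original setup.

```java
import java.util.Locale;
import java.util.Optional;

/** Sketch: interpret the text printed by the SQL*Plus command "archive log list". */
final class ArchiveLogStatus {

    /**
     * Returns Optional.of(true) or Optional.of(false) when a "Database log mode"
     * line is found, and Optional.empty() when the text contains no such line.
     */
    static Optional<Boolean> isArchiveModeEnabled(String sqlplusOutput) {
        for (String line : sqlplusOutput.split("\\R")) {
            String normalized = line.trim().toLowerCase(Locale.ROOT);
            if (!normalized.startsWith("database log mode")) {
                continue;
            }
            // "No Archive Mode" also contains "archive mode", so test the negative form first.
            if (normalized.contains("no archive mode")) {
                return Optional.of(false);
            }
            if (normalized.contains("archive mode")) {
                return Optional.of(true);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical sample text; real output contains further lines.
        String sample = "Database log mode              No Archive Mode\n"
                + "Automatic archival             Disabled\n";
        System.out.println("Archive mode enabled: " + isArchiveModeEnabled(sample));
    }
}
```

An empty result means the expected line was missing, which usually indicates that the command failed or the session was not connected.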

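1.2 Creating a dedicated user for CDC

The account is flinkuser with password flinkpw. Replace the user, password, host, and port in the commands with your own values.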
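Because the user name recurs in every GRANT statement of this section, generating the statements from a single variable avoids missed replacements. The following is a minimal sketch under that assumption; `GrantScriptBuilder` is a hypothetical helper, and its list holds only a few of the privileges granted in this section, so extend it with the remaining ones before use.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Sketch: render GRANT statements for a configurable CDC user name. */
final class GrantScriptBuilder {

    // A subset of the privileges granted in this section (not the full list).
    private static final List<String> PRIVILEGES = Arrays.asList(
            "CREATE SESSION",
            "SET CONTAINER",
            "SELECT ON V_$DATABASE",
            "FLASHBACK ANY TABLE",
            "SELECT ANY TABLE",
            "LOGMINING");

    /** Builds one GRANT statement per privilege, separated by newlines. */
    static String build(String user) {
        // Accept only simple identifiers so the name cannot inject extra SQL.
        if (user == null || !user.matches("[A-Za-z][A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("Invalid user name: " + user);
        }
        return PRIVILEGES.stream()
                .map(privilege -> "GRANT " + privilege + " TO " + user + ";")
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        System.out.println(build("flinkuser"));
    }
}
```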

    sqlplus sys/password@host:port/SID AS SYSDBA;

    CREATE USER flinkuser IDENTIFIED BY flinkpw
      DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
    GRANT CREATE SESSION TO flinkuser;
    GRANT SET CONTAINER TO flinkuser;
    GRANT SELECT ON V_$DATABASE to flinkuser;
    GRANT FLASHBACK ANY TABLE TO flinkuser;
    GRANT SELECT ANY TABLE TO flinkuser;
    GRANT SELECT_CATALOG_ROLE TO flinkuser;
    GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
    GRANT SELECT ANY TRANSACTION TO flinkuser;
    GRANT LOGMINING TO flinkuser;
    GRANT CREATE TABLE TO flinkuser;
    -- not needed if scan.incremental.snapshot.enabled=true (the default)
    GRANT LOCK ANY TABLE TO flinkuser;
    GRANT ALTER ANY TABLE TO flinkuser;
    GRANT CREATE SEQUENCE TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    GRANT SELECT ON V_$LOG TO flinkuser;
    GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
    GRANT SELECT ON V_$LOGFILE TO flinkuser;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
    exit;

1.3 Enabling supplemental logging at table and database level

    -- Enable supplemental logging for a specific table
    -- (Oracle qualifies tables as schema.table)
    ALTER TABLE schema_name.table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- Enable it for all tables in the database
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- Enable supplemental logging for the database
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
    -- Switch the redo log file so the change is applied
    ALTER SYSTEM SWITCH LOGFILE;

2. Oracle CDC Connector

2.1 Flink core

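When Flink reads from Oracle through the Oracle CDC connector, data latency can be large, and Debezium parameters have to be added to reduce it. All of the code below already includes this configuration. Note that log.mining.continuous.mine relies on LogMiner's CONTINUOUS_MINE option, which Oracle desupported in 19c, so it applies only to earlier Oracle versions.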
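The same two Debezium settings appear in this article in two forms: as plain property names in the DataStream API and with a `debezium.` prefix in the Flink SQL `WITH` clause. The following is a minimal sketch of that mapping using only the Java standard library; the class `DebeziumOptionMapper` and its method names are illustrative and not part of Flink CDC.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;
import java.util.TreeSet;

/** Sketch: derive Flink SQL option names from plain Debezium property names. */
final class DebeziumOptionMapper {

    /** The two latency-related settings used in this article. */
    static Properties latencyProperties() {
        Properties props = new Properties();
        props.setProperty("log.mining.strategy", "online_catalog");
        props.setProperty("log.mining.continuous.mine", "true");
        return props;
    }

    /** Prefixes every property name with "debezium.", sorted by name for stable output. */
    static Map<String, String> toFlinkSqlOptions(Properties debeziumProps) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String name : new TreeSet<>(debeziumProps.stringPropertyNames())) {
            options.put("debezium." + name, debeziumProps.getProperty(name));
        }
        return options;
    }

    /** Renders the options as lines that can be pasted into a WITH (...) clause. */
    static String toWithClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n");
        for (Map.Entry<String, String> entry : options.entrySet()) {
            joiner.add("'" + entry.getKey() + "' = '" + entry.getValue() + "'");
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(toWithClause(toFlinkSqlOptions(latencyProperties())));
    }
}
```

Keeping the settings in one `Properties` object and deriving the SQL form from it helps the DataStream job and the SQL table definition stay in sync.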

    import java.util.Properties;

    import com.ververica.cdc.connectors.oracle.OracleSource;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // FlinkCdcDataDeserializationSchema and CustomSink are the author's own classes (not shown here).
    public class OracleCdcJob {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            // Database connection settings (table-name case handling)
            properties.setProperty("database.tablename.case.insensitive", "false");
            properties.setProperty("database.connection.adapter", "logminer");
            // Reduce Oracle CDC latency
            properties.setProperty("log.mining.strategy", "online_catalog");
            properties.setProperty("log.mining.continuous.mine", "true");

            // Create the stream environment
            Configuration configuration = new Configuration();
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(configuration);

            // Oracle connector configuration
            SourceFunction<String> source = OracleSource.<String>builder()
                    .hostname("localhost")
                    .port(1521)
                    .database("XE")      // monitor the XE database
                    .schemaList("test")  // monitor the test schema
                    // Tables are given as SCHEMA.TABLE. The DBTEST prefix here differs from
                    // schemaList("test") above; adjust both to your actual schema.
                    .tableList("DBTEST.tbtest1," + "DBTEST.TBTEST2")
                    .username("flinkuser")
                    .password("flinkpw")
                    // Read from the latest position; switch between initial() and latest() as needed
                    .startupOptions(com.ververica.cdc.connectors.oracle.table.StartupOptions.latest())
                    // converts SourceRecord to a JSON String
                    .deserializer(new FlinkCdcDataDeserializationSchema())
                    .debeziumProperties(properties)
                    .build();

            env.setParallelism(2);
            DataStreamSource<String> stream = env.addSource(source);
            stream.addSink(new CustomSink());
            env.execute();
        }
    }

2.2 Flink SQL

    CREATE TABLE products (
        db_name STRING METADATA FROM 'database_name' VIRTUAL,
        schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
        table_name STRING METADATA FROM 'table_name' VIRTUAL,
        operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
        ID INT NOT NULL,
        NAME STRING,
        DESCRIPTION STRING,
        WEIGHT DECIMAL(10, 3),
        PRIMARY KEY(ID) NOT ENFORCED
    ) WITH (
        'connector' = 'oracle-cdc',
        'hostname' = 'localhost',
        'port' = '1521',
        'username' = 'flinkuser',
        'password' = 'flinkpw',
        'database-name' = 'XE',
        'schema-name' = 'test',
        'table-name' = 'tbtest',
        'debezium.log.mining.strategy' = 'online_catalog',
        'debezium.log.mining.continuous.mine' = 'true'
    );

3. Supplementary notes

3.1 Oracle-related notes

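Check the instance status:

    SQL> select status from v$instance;

Open the database:

    SQL> alter database open;

If the instance fails to start with error code 1261 or 1263, it can be started from an explicitly specified pfile (adjust the path to the actual location):

    startup mount pfile='/oracle/app/oracle/admin/win01/pfile/init.ora';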

3.2 Official documentation

https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html


