Flink cdc 实时监听oracle归档日志及oracle数据延迟参数调优 |
您所在的位置:网站首页 › 实时监控有延迟 › Flink cdc 实时监听oracle归档日志及oracle数据延迟参数调优 |
Flink cdc实时监听oracle归档日志,oracle数据延迟等问题调优
1. Oracle 配置1.1 oracle 归档日志开启配置1.2 为cdc 创建特定用户1.3 指定oracle表、库级启用
2. oracle CDC Connector2.1 Flink core2.2 Flink sql
3. 补充3.1 oracle相关记录3.2 官方文档地址
1. Oracle 配置
1.1 oracle 归档日志开启配置
1.数据库服务器终端执行命令 sqlplus / as sysdba 或 sqlplus /nolog CONNECT sys/password@host:port AS SYSDBA;2.检查归档日志是否开启 archive log list;(“Database log mode: No Archive Mode”,日志归档未开启) (“Database log mode: Archive Mode”,日志归档已开启) 3.配置归档日志参数 alter system set db_recovery_file_dest_size = 100G; alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;4.创建表空间 CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; SID文件夹需使用root用户提前创建,赋予读写权限:chmod 7775.启用归档日志 shutdown immediate; #停止oracle服务 startup mount; #启动oracle服务 alter database archivelog; #开启数据库归档 alter database open;6.启动完成后重新执行 archive log list; 查看归档打开状态 1.2 为cdc 创建特定用户账号为 flinkuser 密码为flinkpw (执行命令中用户、密码、主机、端口需自行替换) sqlplus sys/password@host:port/SID AS SYSDBA; CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS; GRANT CREATE SESSION TO flinkuser; GRANT SET CONTAINER TO flinkuser; GRANT SELECT ON V_$DATABASE to flinkuser; GRANT FLASHBACK ANY TABLE TO flinkuser; GRANT SELECT ANY TABLE TO flinkuser; GRANT SELECT_CATALOG_ROLE TO flinkuser; GRANT EXECUTE_CATALOG_ROLE TO flinkuser; GRANT SELECT ANY TRANSACTION TO flinkuser; GRANT LOGMINING TO flinkuser; GRANT CREATE TABLE TO flinkuser; -- need not to execute if set scan.incremental.snapshot.enabled=true(default) GRANT LOCK ANY TABLE TO flinkuser; GRANT ALTER ANY TABLE TO flinkuser; GRANT CREATE SEQUENCE TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser; GRANT SELECT ON V_$LOG TO flinkuser; GRANT SELECT ON V_$LOG_HISTORY TO flinkuser; GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser; GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser; GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser; GRANT SELECT ON V_$LOGFILE TO flinkuser; GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser; exit; 1.3 指定oracle表、库级启用 -- 指定表启用补充日志记录: ALTER TABLE database.table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; -- 为数据库的所有表启用 ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; -- 指定数据库启用补充日志记录 ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; -- 提交修改 ALTER SYSTEM SWITCH LOGFILE; 2. oracle CDC Connector 2.1 Flink coreflink使用oracle cdc连接器时数据延迟较大,需加入debezium参数进行调整,以下代码均已加入该配置。 Properties properties = new Properties(); //设置数据库连接参数(表名大小写转换) properties.setProperty("database.tablename.case.insensitive","false"); properties.setProperty("database.connection.adapter", "logminer"); //降低oracle cdc 延迟 properties.setProperty("log.mining.strategy", "online_catalog"); properties.setProperty("log.mining.continuous.mine", "true"); //创建Stream环境 Configuration configuration = new Configuration(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); //oracle 连接器配置 SourceFunction build = OracleSource.builder() .hostname("localhost") .port(1521) .database("XE") // monitor XE database .schemaList("test") // monitor test schema .tableList("DBTEST.tbtest1," + "DBTEST.TBTEST2") // monitor tables .username("flinkuser") .password("flinkpw") //从最新位置读取,可自行修改initial()、latest() .startupOptions(com.ververica.cdc.connectors.oracle.table.StartupOptions.latest()) .deserializer(new FlinkCdcDataDeserializationSchema()) // converts SourceRecord to JSON String .debeziumProperties(properties) .build(); env.setParallelism(2); DataStreamSource stringDataStreamSource = env.addSource(build); stringDataStreamSource.addSink(new CustomSink()); env.execute(); 2.2 Flink sql CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, schema_name STRING METADATA FROM 'schema_name' VIRTUAL, table_name STRING METADATA FROM 'table_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10, 3), PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'oracle-cdc', 'hostname' = 'localhost', 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'XE', 'schema-name' = 'test', 'table-name' = 'tbtest', 'debezium.log.mining.strategy' = 'online_catalog', 'debezium.log.mining.continuous.mine' = 'true' ); 3. 补充 3.1 oracle相关记录SQL>select status from v$instance 查看实例状态 SQL>alter database open;打开数据库 ##无法启动时错误代码1261、1263 可指定pfile文件启动(路径按实际位置) startup mount pfile=’/oracle/app/oracle/admin/win01/pfile/init.ora’; 3.2 官方文档地址https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |