OGG实现Oracle数据同步到Kafka

您所在的位置:网站首页 kafka数据库同步项目 OGG实现Oracle数据同步到Kafka

OGG实现Oracle数据同步到Kafka

#OGG实现Oracle数据同步到Kafka| 来源: 网络整理| 查看: 265

环境:源端:Oracle12.2 ogg for Oracle 12.3目标端:Kafka ogg for bigdata 12.3将Oracle中的数据通过OGG同步到Kafka

源端配置:1、为要同步的表添加附加日志dblogin USERID ogg@orclpdb, PASSWORD oggadd trandata scott.tab1add trandata scott.tab2

2、 添加抽取进程GGSCI>add extract EXT_KAF1,integrated tranlog, begin nowGGSCI>add EXTTRAIL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200

编辑抽取进程参数:GGSCI> edit params EXT_KAF1

extract EXT_KAF1userid c##ggadmin,PASSWORD ggadminLOGALLSUPCOLSUPDATERECORDFORMAT COMPACTexttrail ./dirdat/k1,FORMAT RELEASE 12.3SOURCECATALOG orclpdb --(指定pdb)table scott.tab1;table scott.tab2;

注册进程GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadminGGSCI> register extract EXT_KAF1 database container (orclpdb)

3、添加投递进程:GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200

编辑投递进程参数:GGSCI>edit param PMP_KAF1

EXTRACT PMP_KAF1USERID c##ggadmin,PASSWORD ggadminPASSTHRURMTHOST 10.1.1.247, MGRPORT 9178RMTTRAIL ./dirdat/f1,format release 12.3SOURCECATALOG orclpdbTABLE scott.tab1;table scott.tab2;

4、添加数据初始化进程(Oracle initial load) 可以多个表分开初始化也可以一起初始化,此处选择分开初始化GGSCI> add extract ek_01, sourceistable

编辑参数:GGSCI> EDIT PARAMS ek_01

EXTRACT ek_01 USERID c##ggadmin,PASSWORD ggadminRMTHOST 10.1.1.247, MGRPORT 9178RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3SOURCECATALOG orclpdbtable scott.tab1;

GGSCI> add extract ek_02, sourceistable

EDIT PARAMS ek_02EXTRACT ek_02USERID c##ggadmin,PASSWORD ggadminRMTHOST 10.1.1.247, MGRPORT 9178RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3SOURCECATALOG orclpdbtable scott.tab2;

5、生成def文件:GGSCI> edit param defgen1

USERID c##ggadmin,PASSWORD ggadmindefsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3SOURCECATALOG orclpdbtable scott.tab1;table scott.tab2;

在OGG_HOME下执行如下命令生成def文件defgen paramfile dirprm/defgen1.prm

将生成的def文件传到目标端$OGG_HOME/dirdef下

目标端配置:1、将$OGG_HOME/AdapterExamples/big-data/kafka下的所有文件copy到$OGG_HOME/dirprm下cd $OGG_HOME/AdapterExamples/big-data/kafkacp * $OGG_HOME/dirprm

2、将$ORACLE_HOME/AdapterExamples/trail下的文件tr000000000 copy到$OGG_HOME/dirdat下cd $ORACLE_HOME/AdapterExamples/trailcp tr000000000 $OGG_HOME/dirdat

3、添加初始化进程:(可以多表一起初始化也可以分开初始化,此处选择单独初始化)GGSCI> ADD replicat rp_01, specialrun

GGSCI> EDIT PARAMS rp_01SPECIALRUNend runtimesetenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")targetdb libfile libggjava.so set property=./dirprm/kafka1.propsSOURCEDEFS ./dirdef/defgen1.defEXTFILE ./dirdat/kareportcount every 1 minutes, rategrouptransops 10000MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> ADD replicat rp_02, specialrunGGSCI> EDIT PARAMS rp_02

SPECIALRUNend runtimesetenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")targetdb libfile libggjava.so set property=./dirprm/kafka2.propsSOURCEDEFS ./dirdef/defgen1.defEXTFILE ./dirdat/kbreportcount every 1 minutes, rategrouptransops 10000MAP orclpdb.scott.tab2, TARGET scott.tab2;

4、添加恢复进程:GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1GGSCI>edit params r_kaf1

REPLICAT r_kaf1setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")HANDLECOLLISIONStargetdb libfile libggjava.so set property=./dirprm/kafka1.propsSOURCEDEFS ./dirdef/defgen1.defreportcount every 1 minutes, rategrouptransops 10000MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2GGSCI> edit params r_kaf2

REPLICAT r_kaf2setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")HANDLECOLLISIONStargetdb libfile libggjava.so set property=./dirprm/kafka2.propsSOURCEDEFS ./dirdef/defgen1.defreportcount every 1 minutes, rategrouptransops 10000MAP orclpdb.scott.tab2, TARGET scott.tab2;

5、参数配置:custom_kafka_producer.properties文件内容如下:

bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 --只需要改动这一行就行,指定kafka的地址和端口号acks=1reconnect.backoff.ms=1000value.serializer=org.apache.kafka.common.serialization.ByteArraySerializerkey.serializer=org.apache.kafka.common.serialization.ByteArraySerializerbatch.size=16384linger.ms=10000

kafka1.props文件内容如下:gg.handlerlist = kafkahandlergg.handler.kafkahandler.type=kafkagg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties#The following resolves the topic name using the short table namegg.handler.kafkahandler.topicMappingTemplate= topic1#gg.handler.kafkahandler.format=avro_opgg.handler.kafkahandler.format =json --这里做了改动,指定格式为json格式gg.handler.kafkahandler.format.insertOpKey=Igg.handler.kafkahandler.format.updateOpKey=Ugg.handler.kafkahandler.format.deleteOpKey=Dgg.handler.kafkahandler.format.truncateOpKey=Tgg.handler.kafkahandler.format.prettyPrint=falsegg.handler.kafkahandler.format.jsonDelimiter=CDATA[]gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主键gg.handler.kafkahandler.SchemaTopicName= topic1 --此处指定为要同步到的目标topic名字gg.handler.kafkahandler.BlockingSend =falsegg.handler.kafkahandler.includeTokens=falsegg.handler.kafkahandler.mode=opgoldengate.userexit.timestamp=utcgoldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUEgg.log=log4jgg.log.level=INFOgg.report.time=30sec#Sample gg.classpath for Apache Kafkagg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --指定classpath,这里很重要,必须有kafka安装文件的类库。#Sample gg.classpath for HDP#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

启动进程进程恢复:1、启动源端抓取进程GGSCI> start EXT_KAF12、启动源端投递进程GGSCI> start PMP_KAF13、启动源端初始化进程GGSCI> start ek_014、启动目标端初始化进程在$OGG_HOME下执行如下命令:./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD5、启动目标端恢复进程GGSCI> start R_KAF1

遇到的错误:1、ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)

原因:找不到类库(配置好环境变量之后,OGG的mgr进程没有重启,导致的)解决:重启MGR进程

2、ERROR OG-15051 Java or JNI exception

原因:没有使用ogg12.3.1.1.1自带的kafka.props,而是copy了ogg12.2的kafka.props,导致出现异常。解决:使用ogg12.3.1.1.1自带的kafka.props,并指定相关的属性,解决。

更多Oracle相关信息见Oracle 专题页面 https://www.linuxidc.com/topicnews.aspx?tid=12

本文永久更新链接地址:https://www.linuxidc.com/Linux/2018-03/151513.htm

linux


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3