我们通过GoldenGate技术在OracleDB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流。
Oracle在其OracleGoldenGateforBigData套件中提供了一个Kafka连接处理程序,用于将CDC(更改数据捕获)事件流推送到ApacheKafka集群。
因此,对于给定的Oracle数据库,成功完成的业务事务中的任何DML操作(插入、更新、删除)都将转换为实时发布的Kafka消息。
这种集成对于这类用例非常有趣和有用:
如果遗留的单片应用程序使用Oracle数据库作为单一数据源,那么应该可以通过监视相关表的更改来创建实时更新事件流。换句话说,我们可以实现来自遗留应用程序的数据管道,而无需更改它们。我们需要承认只有在数据库事务成功完成时才会发布Kafka消息。为了赋予这个特性,我们可以(始终以事务的方式)在一个由GoldenGate特别监视的表中编写Kafka消息,通过它的Kafka连接处理程序,将发布一个“插入”事件来存储原始的Kafka消息。在本文中,我们将逐步说明如何通过GoldenGate技术实现PoC(概念验证)来测试Oracle数据库与Kafka之间的集成。
PoC的先决条件我们将安装所有的东西在一个本地虚拟机,所以你需要:
安装OracleVirtualBox(我在OracleVirtualBox5.2.20上测试过)16gb的RAM。大约75GB的磁盘空间空闲。最后但并非最不重要的是:了解vi。PoC架构本指南将创建一个单一的虚拟机有:
Oracle数据库12c:要监视的表存储在其中。OracleGoldenGate12c(经典版本):将应用于监视表的业务事务实时提取,以中间日志格式(traillog)存储,并将其输送到另一个GoldenGate(用于大数据)实例管理的远程日志。OracleGoldenGateforBigData12c:pumped的业务事务并将其复制到Kafka消息中。ApacheZookeeper/ApacheKafka实例:在这里发布Kafka消息中转换的业务事务。换句话说,在某些Oracle表上应用的任何插入、更新和删除操作都将生成Kafka消息的CDC事件流,该事件流将在单个Kafka主题中发布。
下面是我们将要创建的架构和实时数据流:
步骤1/12:启动Oracle数据库您可以自由地安装Oracle数据库和OracleGoldenGate手动。但幸运的是……)Oracle共享了一些虚拟机,这些虚拟机已经安装了所有的东西,可以随时进行开发。
Oracle虚拟机可以在这里下载,你需要一个免费的Oracle帐户来获得它们。
我使用了OracleBigDataLite虚拟机(ver)。4.11),它包含了很多Oracle产品,包括:
Oracle数据库12c第一版企业版(12.1.0.2)OracleGoldenGate12c(12.3.0.1.2)从上述下载页面获取所有7-zip文件(约22GB),提取VM映像文件BigDataLite411。在OracleVirtualBox中双击文件,打开导入向导。完成导入过程后,一个名为BigDataLite-4.11的VM将可用。
启动BigDataLite-4.11并使用以下凭证登录:
用户:oracle密码:welcome1一个舒适的Linux桌面环境将会出现。
双击桌面上的“开始/停止服务”图标,然后:
检查第一项ORCL(Oracle数据库12c)。不要检查所有其他的东西(对PoC无用且有害)。按回车确认选择。最后,Oracle数据库将启动。
当您重新启动虚拟机时,Oracle数据库将自动启动。
与下载的虚拟机有关的其他有用信息:
Oracle主文件夹($ORACLE_HOME)是/u01/app/Oracle/product/12.1.0.2/dbhome_1GoldenGate(classic)安装在/u01/ogg中SQLDeveloper安装在/u01/sqldeveloper中。您可以从上面工具栏中的图标启动SQLDeveloper。Oracle数据库是作为多租户容器数据库(CDB)安装的。Oracle数据库监听端口是1521根容器的OracleSID是cdbPDB(可插拔数据库)的OracleSID是orcl所有Oracle数据库用户(SYS、SYSTEM等)的密码都是welcome1连接到PDB数据库的tnsname别名是ORCL(参见$ORACLE_HOME/network/admin/tnsnames)。ora文件内容)。Java主文件夹($JAVA_HOME)是/usr/java/latest$JAVA_HOME中安装的Java开发工具包是JDK8更新151。步骤2/12:在Oracle中启用归档日志我们需要在Oracle中启用归档日志来使用GoldenGate(classic)。
从VM的Linuxshell中启动SQLPlus作为SYS:
sqlplussys/welcome1assysdba
然后从SQL+shell运行这个命令列表(我建议一次启动一个):
ALTERDATABASEADDSUPPLEMENTALLOGDATA;ALTERDATABASEFORCELOGGING;ALTERSYSTEMSWITCHLOGFILE;ALTERSYSTEMSETENABLE_GOLDENGATE_REPLICATION=TRUE;SHUTDOWNIMMEDIATE;STARTUPMOUNT;ALTERDATABASEARCHIVELOG;ALTERDATABASEOPEN;
然后检查存档日志是否成功启用:
ARCHIVELOGLIST;
输出应该是这样的:
DatabaselogmodeArchiveModeAutomaticarchivalEnabledArchivedestinationUSE_DB_RECOVERY_FILE_DESTOldestonlinelogsequence527Nextlogsequencetoarchive529Currentlogsequence529
步骤3/12:创建一个ggadmin用户需要为GoldenGate(classic)创建一个特殊的Oracle管理员用户。
同样,从VM的Linuxshell中打开SQLPlus:
sqlplussys/welcome1作为sysdba
并通过运行这个脚本创建ggadmin用户:
ALTERSESSIONSET"_ORACLE_SCRIPT"=TRUE;CREATEUSERggadminIDENTIFIEDBYggadmin;GRANTCREATESESSION,CONNECT,RESOURCE,ALTERSYSTEMTOggadmin;EXECDBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(grantee=>'ggadmin',privilege_type=>'CAPTURE',grant_optional_privileges=>'*');GRANTSELECTANYDICTIONARYTOggadmin;GRANTUNLIMITEDTABLESPACETOggadmin;
步骤4/12-创建ESHOP模式我们将创建一个模式(ESHOP),其中只有两个表(CUSTOMER_ORDER和CUSTOMER_ORDER_ITEM),用于生成要推送到Kafka中的CDC事件流。
使用SQLPlus(或者,如果您愿意,也可以使用SQLDeveloper)连接orcl作为SID的OraclePDB:
sqlplussys/welcome1@ORCLassysdba
运行这个脚本:
--initsessionALTERSESSIONSET"_ORACLE_SCRIPT"=TRUE;--createtablespaceforeshopCREATETABLESPACEeshop_tbsDATAFILE'eshop_tbs.dat'SIZE10MAUTOEXTENDON;CREATETEMPORARYTABLESPACEeshop_tbs_tempTEMPFILE'eshop_tbs_temp.dat'SIZE5MAUTOEXTENDON;--createuserschemaeshop,pleasenotethatthepasswordiseshopCREATEUSERESHOPIDENTIFIEDBYeshopDEFAULTTABLESPACEeshop_tbsTEMPORARYTABLESPACEeshop_tbs_temp;--granteshopuserpermissionsGRANTCREATESESSIONTOESHOP;GRANTCREATETABLETOESHOP;GRANTUNLIMITEDTABLESPACETOESHOP;GRANTRESOURCETOESHOP;GRANTCONNECTTOESHOP;GRANTCREATEVIEWTOESHOP;--createeshopsequencesCREATESEQUENCEESHOP.CUSTOMER_ORDER_SEQSTARTWITH1INCREMENTBY1NOCACHENOCYCLE;CREATESEQUENCEESHOP.CUSTOMER_ORDER_ITEM_SEQSTARTWITH1INCREMENTBY1NOCACHENOCYCLE;--createeshoptablesCREATETABLEESHOP.CUSTOMER_ORDER(IDNUMBER(19)PRIMARYKEY,CODEVARCHAR2(10),CREATEDDATE,STATUSVARCHAR2(32),UPDATE_TIMETIMESTAMP);CREATETABLEESHOP.CUSTOMER_ORDER_ITEM(IDNUMBER(19)PRIMARYKEY,ID_CUSTOMER_ORDERNUMBER(19),DESCRIPTIONVARCHAR2(255),QUANTITYNUMBER(3),CONSTRAINTFK_CUSTOMER_ORDERFOREIGNKEY(ID_CUSTOMER_ORDER)REFERENCESESHOP.CUSTOMER_ORDER(ID));
步骤5/12:初始化GoldenGateClassic现在是时候在BigDataListe-4.11虚拟机中安装GoldenGate(classic)实例了。
从Linuxshell运行:
cd/u01/ogg./ggsci
GoldenGateCLI(命令行界面)将启动:
OracleGoldenGateCommandInterpreterforOracleVersion12.2.0.1.0OGGCORE_12.2.0.1.0_PLATFORMS_151101.1925.2_FBOLinux,x64,64bit(optimized),Oracle12conNov11201503:53:23OperatingsystemcharactersetidentifiedasUTF-8.Copyright(C)1995,2015,Oracleand/oritsaffiliates.Allrightsreserved.GGSCI(bigdatalite.localdomain)1>
从GoldenGateCLI启动经理与以下命令:
startmgr
它将引导GoldenGate的主控制器进程(监听端口7810)。
现在创建一个凭据库来存储ggadmin用户凭据(并使用具有相同名称的别名来引用它们):
addcredentialstorealtercredentialstoreadduserggadminpasswordggadminaliasggadmin
现在,通过使用刚才创建的ggadmin别名连接到Oracle数据库,并启用对存储在名为orcl的PDB中的eshop模式的附加日志:
dbloginuseridaliasggadminaddschematrandataorcl.eshop
步骤6/12:制作金门果提取物在此步骤中,我们将创建一个GoldenGate摘要,此过程将监视Oraclearchive重做日志,以捕获与ESHOP表相关的数据库事务,并将此SQL修改流写入另一个名为traillog的日志文件中。
从GoldenGateCLI运行:
editparamsexteshop
该命令将打开一个引用新空文件的vi实例。在vi编辑器中放入以下内容:
EXTRACTexteshopUSERIDALIASggadminEXTTRAIL./dirdat/aaTABLEorcl.eshop.*;
保存内容并退出vi,以便返回GoldenGateCLI。
保存的内容将存储在/u01/ogg/dirprm/exteshop中。人口、难民和移民事务局文件。您也可以在外部编辑它的内容,而不需要再次从GoldenGateCLI运行“editparamsexteshop”命令。
现在在Oracle中注册提取过程,从GoldenGateCLI运行以下命令:
dbloginuseridaliasggadminregisterextractexteshopdatabasecontainer(orcl)
最后一个命令的输出应该是这样的:
OGG-02003ExtractEXTESHOPsuccessfullyregisteredwithdatabaseatSCN13624423.
使用所示的SCN号来完成提取配置。从GoldenGateCLI:
addextractexteshop,integratedtranlog,scn13624423addexttrail./dirdat/aa,extractexteshop
现在我们可以启动名为exteshop的GoldenGate提取过程:
startexteshop
你可以使用以下命令中的on来检查进程的状态:
infoexteshopviewreportexteshop
验证提取过程是否正常工作以完成此步骤。从Linuxshell运行以下命令,用SQLPlus(或SQLDeveloper)连接到ESHOP模式:
sqlpluseshop/eshop@ORCL
创建一个模拟客户订单:
INSERTINTOCUSTOMER_ORDER(ID,CODE,CREATED,STATUS,UPDATE_TIME)VALUES(CUSTOMER_ORDER_SEQ.NEXTVAL,'AAAA01',SYSDATE,'DRAFT',SYSTIMESTAMP);INSERTINTOCUSTOMER_ORDER_ITEM(ID,ID_CUSTOMER_ORDER,DESCRIPTION,QUANTITY)VALUES(CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL,CUSTOMER_ORDER_SEQ.CURRVAL,'ToyStory',1);COMMIT;
最后,从GoldenGateCLI跑出来:
statsexteshop
并验证前面的插入操作是否已计算在内。下面是stats命令输出的一个小示例:
ExtractingfromORCL.ESHOP.CUSTOMER_ORDERtoORCL.ESHOP.CUSTOMER_ORDER:***Totalstatisticssince2019-05-2909:18:12***Totalinserts1.00Totalupdates0.00Totaldeletes0.00Totaldiscards0.00Totaloperations1.00
检查提取过程是否正常工作的另一种方法是检查GoldenGate跟踪日志文件的时间戳。在Linuxshell中运行“ls-l/u01/ogg/dirdat/”,并验证以“aa”开头的文件的时间戳已经更改。
步骤7/12:安装并运行ApacheKafka从VM的桌面环境中打开Firefox并下载ApacheKafka(我使用的是kafka_2.11-2.1.1.tgz)。
现在,打开一个Linuxshell并重置CLASSPATH环境变量(在BigDataLite-4.11虚拟机中设置的当前值会在Kafka中产生冲突):
declare-xCLASSPATH=""
从同一个Linuxshell中,解压缩压缩包,启动ZooKeeper和Kafka:
cdtarzxvfDownloads/kafka_2.11-2.1.1.tgzcdkafka_2.11-2.1.1./bin/zookeeper-server-start.sh-daemonconfig/zookeeper.properties./bin/kafka-server-start.sh-daemonconfig/server.properties
你可以通过启动“echostats|nclocalhost2181”来检查ZooKeeper是否正常:
[oracle@bigdatalite~]$echostats|nclocalhost2181Zookeeperversion:3.4.5-cdh5.13.1--1,builton11/09/201716:28GMTClients:/127.0.0.1:34997[1](queued=0,recved=7663,sent=7664)/0:0:0:0:0:0:0:1:17701[0](queued=0,recved=1,sent=0)Latencymin/avg/max:0/0/25Received:8186Sent:8194Connections:2Outstanding:0Zxid:0x3fMode:standaloneNodecount:25
您可以检查Kafka是否与“echodump|nclocalhost2181|grep代理”(一个字符串/brokers/ids/0应该出现)
[oracle@bigdatalite~]$echodump|nclocalhost2181|grepbrokers/brokers/ids/0
用于PoC的BigDataLite-4.11虚拟机已经在启动虚拟机时启动了一个较老的ZooKeeper实例。因此,请确保禁用了步骤1中描述的所有服务。
此外,当您打开一个新的Linuxshell时,请注意在启动ZooKeeper和Kafka之前总是要重置CLASSPATH环境变量,这一点在步骤开始时已经解释过了。
步骤8/12:为大数据安装GoldenGate同样,从这个页面下载OracleGoldenGateforBigData12c只需要使用VM中安装的Firefox浏览器(我在Linuxx86-64上使用OracleGoldenGateforBigData12.3.2.1.1)。请注意,您需要一个(免费)Oracle帐户来获得它。
安装很容易,只是爆炸压缩包内的下载:
cd~/DownloadsunzipOGG_BigData_Linux_x64_12.3.2.1.1.zipcd..mkdirogg-bd-poccdogg-bd-poctarxvf../Downloads/OGG_BigData_Linux_x64_12.3.2.1.1.tar
就这样,GoldenGateforBigData12c被安装在/home/oracle/ogg-bd-poc文件夹中。
同样,BigDataLite-4.11虚拟机已经在/u01/ogg-bd文件夹中安装了用于大数据的GoldenGate。但它是一个较旧的版本,连接Kafka的选项较少。
步骤9/12:启动GoldenGateforBigDataManager打开大数据大门
cd~/ogg-bd-poc./ggsci
需要更改管理器端口,否则之前启动的与GoldenGate(classic)管理器的冲突将被引发。
因此,从大数据的GoldenGate来看,CLI运行:
createsubdirseditparamsmgr
一个vi实例将开始,只是写这个内容:
PORT27801
然后保存内容,退出vi,返回CLI,我们终于可以启动GoldenGateforBigDatamanager监听端口27081:
步骤10/12:创建数据泵(DataPump)现在,我们需要创建在GoldenGate世界中被称为数据泵的东西。数据泵是一个提取过程,它监视一个跟踪日志,并(实时地)将任何更改推到另一个由不同的(通常是远程的)GoldenGate实例管理的跟踪日志。
对于这个PoC,由GoldenGate(classic)管理的traillogaa将被泵送至GoldenGate管理的traillogbb进行大数据处理。
因此,如果您关闭它,请回到来自Linuxshell的GoldenGate(经典)CLI:
cd/u01/ogg./ggsci
来自GoldenGate(经典)CLI:
editparamspmpeshop
并在vi中加入以下内容:
EXTRACTpmpeshopUSERIDALIASggadminSETENV(ORACLE_SID='orcl')--GoldenGateforBigDataaddress/port:RMTHOSTlocalhost,MGRPORT27801RMTTRAIL./dirdat/bbPASSTHRU--The"tokens"partitisusefulforwritingintheKafkamessages--theTransactionIDandthedatabaseChangeSerialNumberTABLEorcl.eshop.*,tokens(txid=@GETENV('TRANSACTION','XID'),csn=@GETENV('TRANSACTION','CSN'));
保存内容并退出vi。
正如已经解释的提取器,保存的内容将存储在/u01/ogg/dirprm/pmpeshop中。人口、难民和移民事务局文件。
现在我们要注册并启动数据泵,从GoldenGateCLI:
dbloginuseridaliasggadminaddextractpmpeshop,exttrailsource./dirdat/aabeginnowaddrmttrail./dirdat/bbextractpmpeshopstartpmpeshop
通过从CLI运行以下命令之一来检查数据泵的状态:
infopmpeshopviewreportpmpeshop
你甚至可以在金门大数据的dirdat文件夹中查看traillogbb是否已经创建:
[oracle@bigdatalitedirdat]$ls-l~/ogg-bd-poc/dirdattotal0-rw-r-----.1oracleoinstall0May3013:22bb000000000[oracle@bigdatalitedirdat]$
那检查泵送过程呢?来自Linuxshell:
sqlpluseshop/eshop@ORCL
执行这个SQL脚本创建一个新的模拟客户订单:
INSERTINTOCUSTOMER_ORDER(ID,CODE,CREATED,STATUS,UPDATE_TIME)VALUES(CUSTOMER_ORDER_SEQ.NEXTVAL,'AAAA02',SYSDATE,'SHIPPING',SYSTIMESTAMP);INSERTINTOCUSTOMER_ORDER_ITEM(ID,ID_CUSTOMER_ORDER,DESCRIPTION,QUANTITY)VALUES(CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL,CUSTOMER_ORDER_SEQ.CURRVAL,'InsideOut',1);COMMIT;
现在从GoldenGate(经典)CLI运行:
statspmpeshop
用于检查插入操作是否正确计数(在输出的一部分下面):
GGSCI(bigdatalite.localdomainasggadmin@cdb/CDB$ROOT)11>statspmpeshopSendingSTATSrequesttoEXTRACTPMPESHOP...StartofStatisticsat2019-05-3014:49:00.Outputto./dirdat/bb:ExtractingfromORCL.ESHOP.CUSTOMER_ORDERtoORCL.ESHOP.CUSTOMER_ORDER:***Totalstatisticssince2019-05-3014:01:56***Totalinserts1.00Totalupdates0.00Totaldeletes0.00Totaldiscards0.00Totaloperations1.00
此外,您还可以验证GoldenGate中存储的用于测试泵过程的大数据的跟踪日志的时间戳。事务提交后,从Linuxshell运行:“ln-l~/og-bd-poc/dirdat”,并检查最后一个以“bb”作为前缀的文件的时间戳。
步骤11/12:将事务发布到Kafka最后,我们将在GoldenGate中为BigData创建一个副本流程,以便在Kafka主题中发布泵出的业务事务。replicat将从trail日志bb读取事务中的插入、更新和删除操作,并将它们转换为JSON编码的Kafka消息。
因此,创建一个名为eshop_kafkaconnect的文件。文件夹/home/oracle/ogg-bd-pocd/dirprm中的属性包含以下内容:
#File:/home/oracle/ogg-bd-poc/dirprm/eshop_kafkaconnect.properties#-----------------------------------------------------------#address/portoftheKafkabrokerbootstrap.servers=localhost:9092acks=1#JSONConverterSettingskey.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=falsevalue.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=false#Adjustforperformancebuffer.memory=33554432batch.size=16384linger.ms=0#Thispropertyfixastart-uperrorasexplainedbyOracleSupporthere:#https://support.oracle.com/knowledge/Middleware/2455697_1.htmlconverter.type=key
在同一个文件夹中,创建一个名为eshop_kc的文件。具有以下内容的道具:
#File:/home/oracle/ogg-bd-poc/dirprm/eshop_kc.props#---------------------------------------------------gg.handlerlist=kafkaconnect#Thehandlerpropertiesgg.handler.kafkaconnect.type=kafkaconnectgg.handler.kafkaconnect.kafkaProducerConfigFile=eshop_kafkaconnect.propertiesgg.handler.kafkaconnect.mode=tx#Thefollowingselectsthetopicnamebasedonlyontheschemanamegg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName}#Thefollowingselectsthemessagekeyusingtheconcatenatedprimarykeysgg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}#Theformatterpropertiesgg.handler.kafkaconnect.messageFormatting=opgg.handler.kafkaconnect.insertOpKey=Igg.handler.kafkaconnect.updateOpKey=Ugg.handler.kafkaconnect.deleteOpKey=Dgg.handler.kafkaconnect.truncateOpKey=Tgg.handler.kafkaconnect.treatAllColumnsAsStrings=falsegg.handler.kafkaconnect.iso8601Format=falsegg.handler.kafkaconnect.pkUpdateHandling=abendgg.handler.kafkaconnect.includeTableName=truegg.handler.kafkaconnect.includeOpType=truegg.handler.kafkaconnect.includeOpTimestamp=truegg.handler.kafkaconnect.includeCurrentTimestamp=truegg.handler.kafkaconnect.includePosition=truegg.handler.kafkaconnect.includePrimaryKeys=truegg.handler.kafkaconnect.includeTokens=truegoldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUEgg.log=log4jgg.log.level=INFOgg.report.time=30sec#ApacheKafkaClasspath#Putthepathofthe"libs"folderinsidetheKafkahomepathgg.classpath=/home/oracle/kafka_2.11-2.1.1/libs/*javawriter.bootoptions=-Xmx512m-Xms32m-Djava.class.path=.:ggjava/ggjava.jar:./dirprm
如果关闭,重启大数据CLI的GoldenGate:
cd~/ogg-bd-poc./ggsci
andstarttocreateareplicatfromtheCLIwith:
editparamsrepeshop
inviputthiscontent:
REPLICATrepeshopTARGETDBLIBFILElibggjava.soSETproperty=dirprm/eshop_kc.propsGROUPTRANSOPS1000MAPorcl.eshop.*,TARGETorcl.eshop.*;
然后保存内容并退出vi。现在将replicat与traillogbb关联,并使用以下命令启动replicat进程,以便从GoldenGate启动大数据CLI:
addreplicatrepeshop,exttrail./dirdat/bbstartrepeshop
Checkthatthereplicatisliveandkickingwithoneofthesecommands:
inforepeshopviewreportrepeshop
Now,connecttotheESHOPschemafromanotherLinuxshell:
sqlpluseshop/eshop@ORCL
andcommitsomething:
INSERTINTOCUSTOMER_ORDER(ID,CODE,CREATED,STATUS,UPDATE_TIME)VALUES(CUSTOMER_ORDER_SEQ.NEXTVAL,'AAAA03',SYSDATE,'DELIVERED',SYSTIMESTAMP);INSERTINTOCUSTOMER_ORDER_ITEM(ID,ID_CUSTOMER_ORDER,DESCRIPTION,QUANTITY)VALUES(CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL,CUSTOMER_ORDER_SEQ.CURRVAL,'Cars3',2);COMMIT;
FromtheGoldenGateforBigDataCLI,checkthattheINSERToperationwascountedforthereplicatprocessbyrunning:
statsrepeshop
And(hurrah!)wecanhavealookinsideKafka,astheLinuxshellchecksthatthetopicnamedCDC-ESHOPwascreated:
cd~/kafka_2.11-2.1.1/bin./kafka-topics.sh--list--zookeeperlocalhost:2181
andfromthesamefolderrunthefollowingcommandforshowingtheCDCeventsstoredinthetopic:
./kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicCDC-ESHOP--from-beginning
Youshouldseesomethinglike:
[oracle@bigdatalitekafka_2.11-2.1.1]$./bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicCDC-ESHOP--from-beginning{"table":"ORCL.ESHOP.CUSTOMER_ORDER","op_type":"I","op_ts":"2019-05-3104:24:34.000327","current_ts":"2019-05-3104:24:39.637000","pos":"00000000020000003830","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"CODE":"AAAA03","CREATED":"2019-05-3104:24:34","STATUS":"DELIVERED","UPDATE_TIME":"2019-05-3104:24:34.929950000"}}{"table":"ORCL.ESHOP.CUSTOMER_ORDER_ITEM","op_type":"I","op_ts":"2019-05-3104:24:34.000327","current_ts":"2019-05-3104:24:39.650000","pos":"00000000020000004074","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"ID_CUSTOMER_ORDER":11.0,"DESCRIPTION":"Cars3","QUANTITY":2}}
Forabetteroutput,installjq:
sudoyum-yinstalljq./kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicCDC-ESHOP--from-beginning|jq.
andhereishowwillappeartheJSONevents:
{"table":"ORCL.ESHOP.CUSTOMER_ORDER","op_type":"I","op_ts":"2019-05-3104:24:34.000327","current_ts":"2019-05-3104:24:39.637000","pos":"00000000020000003830","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11,"CODE":"AAAA03","CREATED":"2019-05-3104:24:34","STATUS":"DELIVERED","UPDATE_TIME":"2019-05-3104:24:34.929950000"}}{"table":"ORCL.ESHOP.CUSTOMER_ORDER_ITEM","op_type":"I","op_ts":"2019-05-3104:24:34.000327","current_ts":"2019-05-3104:24:39.650000","pos":"00000000020000004074","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11,"ID_CUSTOMER_ORDER":11,"DESCRIPTION":"Cars3","QUANTITY":2}}
现在打开Kafka-console-consumer.sh进程,并在ESHOP上执行其他一些数据库事务,以便实时打印发送给Kafka的CDC事件流。
以下是一些用于更新和删除操作的JSON事件示例:
//Generatedwith:UPDATECUSTOMER_ORDERSETSTATUS='DELIVERED'WHEREID=8;{"table":"ORCL.ESHOP.CUSTOMER_ORDER","op_type":"U","op_ts":"2019-05-3106:22:07.000245","current_ts":"2019-05-3106:22:11.233000","pos":"00000000020000004234","primary_keys":["ID"],"tokens":{"txid":"14.6.2656","csn":"13913689"},"before":{"ID":8,"CODE":null,"CREATED":null,"STATUS":"SHIPPING","UPDATE_TIME":null},"after":{"ID":8,"CODE":null,"CREATED":null,"STATUS":"DELIVERED","UPDATE_TIME":null}}//Generatedwith:DELETECUSTOMER_ORDER_ITEMWHEREID=3;{"table":"ORCL.ESHOP.CUSTOMER_ORDER_ITEM","op_type":"D","op_ts":"2019-05-3106:25:59.000916","current_ts":"2019-05-3106:26:04.910000","pos":"00000000020000004432","primary_keys":["ID"],"tokens":{"txid":"14.24.2651","csn":"13913846"},"before":{"ID":3,"ID_CUSTOMER_ORDER":1,"DESCRIPTION":"ToyStory","QUANTITY":1},"after":null}
恭喜你!你完成了PoC:
步骤12/12:使用PoCGoldenGate中提供的KafkaConnect处理程序有很多有用的选项,可以根据需要定制集成。点击这里查看官方文件。
例如,您可以选择为CDC流中涉及的每个表创建不同的主题,只需在eshop_kc.props中编辑此属性:
gg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName}-${tableName}
更改后重新启动replicat,从GoldenGateforBigDataCLI:
stoprepeshopstartrepeshop
您可以在“~/og-bd-poc/AdapterExamples/big-data/kafka_connect”文件夹中找到其他配置示例。
结论在本文中,我们通过GoldenGate技术在Oracle数据库和Kafka代理之间创建了一个完整的集成。CDC事件流以Kafka实时发布。
为了简单起见,我们使用了一个已经全部安装的虚拟机,但是您可以在不同的主机上免费安装用于大数据的GoldenGate和Kafka。
请在评论中告诉我您对这种集成的潜力(或限制)的看法。
原文:https://dzone.com/articles/creates-a-cdc-stream-from-oracle-database-to-kafka
本文:https://pub.intelligentx.net/node/839
讨论:请加入知识星球【首席架构师圈】或者飞聊小组【首席架构师智库】