kettle中通过时间戳方式来实现数据库的增量同步操作

上传人:鲁** 文档编号:465485862 上传时间:2023-08-21 格式:DOC 页数:8 大小:36.50KB
返回 下载 相关 举报
kettle中通过时间戳方式来实现数据库的增量同步操作_第1页
第1页 / 共8页
kettle中通过时间戳方式来实现数据库的增量同步操作_第2页
第2页 / 共8页
kettle中通过时间戳方式来实现数据库的增量同步操作_第3页
第3页 / 共8页
kettle中通过时间戳方式来实现数据库的增量同步操作_第4页
第4页 / 共8页
kettle中通过时间戳方式来实现数据库的增量同步操作_第5页
第5页 / 共8页
点击查看更多>>
资源描述

《kettle中通过时间戳方式来实现数据库的增量同步操作》由会员分享,可在线阅读,更多相关《kettle中通过时间戳方式来实现数据库的增量同步操作(8页珍藏版)》请在金锄头文库上搜索。

1、kettle 中通过时间戳( timestamp)方式来实现数据库的增量同步操作(一)这个实验主要思想是在创建数据库表的时候,通过增加一个额外的字段,也就是时间戳字段,例如在同步表 tt1 和表 tt2 的时候,通过检查那个表是最新更新的,那个表就作为新表,而另外的表最为旧表被新表中的数据进行更新。实验数据如下:mysql database 5.1test.tt1( id int primary key , name varchar(50) );mysql.tt2( id int primary key, name varchar(50) );快照表,可以将其存放在test 数据库中,同样可以

2、为了简便,可以将其创建为temporary 表类型。数据如图kettle-1kettle-1=主流程如图kettle-2kettle-2在 prepare 中,向 tt1 , tt2 表中增加 时间戳字段,由于 tt1 , tt2 所在的数据库是不同的,所以分别创建两个数据库的连接。preparekettle-3在执行这个job 之后,就会在数据库查询的时候看到下面的字段:kettle-4然后,我们来对tt1 表做一个insert 操作一个 update 操作吧kettle-5在原表上无论是insert 操作还是update 操作,对应的updateTime 都会发生变更。如果 tt1 表 和

3、 tt2 表中updateTime 字段为最新时间的话,则说明该表是新表。下面只要是对应main_thread 的截图:kettle-6在这里介绍一下Main 的层次:MainSTARTMain.prepareMain.main_threadSTARTmain_thread.create_tempTablemain_thread.insert_tempTablemain_thread.tt1_tt2_synSUCCESSMain.finishSUCCESS在 main_thread 中的过程是这样的:作为一个局部的整体,使它每隔200s 内进行一次循环,这样的话, 如果在其中有指定的表tt1

4、或是tt2 对应被更新或是插入的话,该表中的 updateTime 字段就会被捕捉到,并且进行同步。如果没有更新出现,则会走switch 的 default 路线对应的是write to log.继续循环。首先创建一个快照表, 然后将 tt1 ,tt2 表中的最大 ( 最新 )时间戳的值插入到快照表中。然后,通过一个 transformation 来判断那个表的 updateTime 值最新,来选择对应是tt1 表来更新tt2 还是tt2 表来更新tt1 表;main_thread.create_tempTable.JOB:main_thread.insert_tempTable.Job:PS

5、: 对于第二个SQL 应该改成 (不修改会出错的)set var1 = ( select MAX(updatetime) from tt2);insert into test.temp values ( 2 , var1 ) ;因为 conn 对应的是连接mysql (数据库实例名称) ,但是我们把快照表和 tt1 表都存到了 test(数据库实例名称)里面。在上面这个图中对应的语句是想实现,在temp表中插入两行记录元组。其中 id 为 1 的元组对应的 temp.lastTime 字段 是 从 tt1 表中选出的 updateTime 值为最新的,id 为 2 的元组对应的 temp.la

6、stTime 字段 是 从 tt2 表中选出的 updateTime 值为最新的 字段。当然 , id 是用来给后续 switch 操作提供参考的,用于标示最新 updateTime 是来自 tt1 还是 tt2,同样也可以使用tableName varchar(50) 这种字段来存放最新 updateTime 对应的数据库 .数据表的名称也可以的。main_thread.tt1_tt2_syn.Transformation:首先,创建连接test 数据库的temp 表的连接,选择temp 表中对应 lastTime 值最新的所在的记录所对应的id 号码。首先将 temp 中 lastTime

7、 字段进行 降序排列,然后选择 id , 并且将选择记录仅限定成一行。然后根据 id 的值进行 switch 选择。在这里 LZ 很想使用, SQL Executor ,但是它无法返回对应的 id 值。但是表输入可以返回对应的id 值,并被 switch 接收到。下图是对应 switch id = 1 的时候:即注意合并行比较 的新旧数据源 的选择tt1更新tt2和 Insert/Update 中的 Target table 的选择下图是对应 switch id = 2 的时候:即注意合并行比较 的新旧数据源 的选择tt2更新tt1和 Insert/Update 中的 Target table 的选择但是考虑到增加一个column 会浪费很多的空间,所以咋最终结束同步之后使用finish 操作步骤来将该updateTime 这个字段进行删除操作即可。这个与 Main 中的 prepare 的操作是相对应的。Main.finish这样的话,实验环境已经搭建好了,接下来进行,实验的数据测试了,写到下一个博客中。当然,触发器也是一种同步的好方法,写到后续博客中吧时间戳的方式相比于触发器,较为简单并且通用,但是数据库表中的时间戳字段,很费空间,并且无法对应删除操作,也就是说表中删除一行记录,该表应该作为新表来更新其余表,但是由于记录删除时间戳无所依附所以无法记录到。

展开阅读全文
相关资源
相关搜索

当前位置:首页 > 办公文档 > 活动策划

电脑版 |金锄头文库版权所有
经营许可证:蜀ICP备13022795号 | 川公网安备 51140202000112号