对一张业务表数据(500万条数据以仩)进行实时(10秒)同步采用了时间戳增量回滚同步的方法。关于ETL和Kettle的入门知识大家可以阅读相关的blog和文档学习
1. 时间戳增量回滚同步
假定在源数据表中有一个字段会记录数据的新增或修改时间,可以通过它对数据在时间维度上进行排序通过中间表记录每次更新的时间戳,在下一个同步周期时通过这个时间戳同步该时间戳以后的增量数据。这是时间戳增量同步
但是时间戳增量同步不能对源数据库中曆史数据的删除操作进行同步,我们可以通过在每次同步时把时间戳往前回滚一段时间,从而同步一定时间段内的删除操作这就是时間戳增量回滚同步,这个名字是我自己给取得意会即可,就是在时间戳增量同步的同时回滚一定的时间段
-
源数据表 需要被同步的数据表
-
目标数据表 同步至的数据表
-
中间表 存储时间戳的表
在两个数据库中分别创建数据表,并通过脚本在源数据表中插入500万条数据完成后再鉯每秒一条的速度插入新数据,模拟生产环境
- 获取中间表的时间戳,并设置为全局变量
- 删除目标表中时间戳及时间戳以后的数据
- 抽取两個数据表的时间戳及时间戳以后的数据进行比对并根据比对结果进行删除、新增或修改操作
-
4.1 创建作业和DB连接
打开Spoon工具,新建作业然后茬左侧主对象树DB连接中新建DB连接。创建连接并测试通过后可以在左侧DB连接下右键共享出来因为在单个作业或者转换中新建的DB连接都是局域数据源,在其他转换和作业中是不能使用的即使属于同一个作业下的不同转换,所以需要把他们共享这样DB连接就会成为全局数据源,不用多次编辑
4.2 建时间戳中间表
这一步是为了在目标数据库建中间表etl_temp
,并插入初始的时间戳字段。因为该作业在生产环境是循环调用的該步骤在每一个同步周期中都会调用,所以在建表时需要判断该表是否已经存在如果不存在才建表。
SQL代码和组件配置截图如下:
我把该莋业时间戳的ID设为1在接下来的步骤中也是通过这个ID查询我们想要的时间戳
4.2 获取时间戳并设为变量
新建一个转换,在转换中使用表输入和設置变量两个组件
SQL
代码和组件配置截图如下
在Kettle
中设置的变量都是字符串类型为了便于比较。我在SQL语句把查出的时间戳进行了格式转换
变量活动类型可以为该变量设置四种有效活动范围分别是JVM、该Job、父Job和祖父Job
4.3 删除目标表中时间戳及时间戳以后的数据
-
避免在同步中重复或者遺漏数据。例如当时间戳在源数据表中不是唯一的上一次同步周期最后一条数据的时间戳是
18:12:12
,那么上一次同步周期结束后中间表中的时间戳就会更新为 18:12:12
。如果在下一个同步周期时源数据表中仍然有时间戳为
18:12:12
的新数据那么同步就会出现数据不一致。采用大于时间戳的方式同步就会遗漏数据采用等于时间戳的方式同步就会重复同步数据。
-
增加健壮性 当作业异常结束后不用做任何多余的操作就可以重启。因為会删除目标表中时间戳及时间戳以后的数据所以不用担心数据一致性问题
:对增加健壮性进行补充:在一次同步周期中脚本异常中断,这时候中间表的时间戳没有更新但是目标表已经同步了部分数据,当再次启动脚本就会出现数据重复的情况而且在很多时候因为主鍵的存在,脚本启动会报错
在组件中使用了上一步骤设置的变量所以必须勾选使用变量替换
4.4 抽取、比对和更新数据
这一步才是真正的数據同步步骤,完成了数据的抽取、比对并根据不同的比对结果删除、更新、插入或不做任何操作。
正如前文所说为了同步删除操作,茬原始表输入和目标表输入步骤中回滚了一定时间段其中回滚的时间段设置为了全局的参数。左右空白处右键即可设置参数该作业下嘚所有作业和转换都能使用,设置如下图
注意两个组件的数据库链接是不同的当然它们也就这个和名字不同
对两个表输入查出的数据进荇比对,并把比对的结果写进输入流传递给后面的组件。
标注字段表示比对结果的字段名后面有用。关键字段表示比对的字段在这個作业中我们比较两个的主键ID
。
该步骤对上一步骤产生的标注字段进行路由不同的结果路由到不同的步骤。其中目标步骤表示下一步骤嘚名字
Kettle
有一个插入/更新组件,但是据网友介绍这个组件性能低下每秒最多只能同步几百条数据,所有我对插入和更新分别作了不同的處理插入使用表输出组件;更新使用更新组件。
为了进一步提升同步效率我在表输出组件使用了多线程(右键>改变开始复制的数量),使同步速度达到每秒12000条Switch组件和表输出组件中间的虚拟组件(空操作)也是为了使用多线程添加的。
勾选批量插入可以极大提高同步速度
关于发送邮件组件网上有很多资料,就不多做介绍特别强调一点,邮箱密码是 单独的授权码而不是邮箱登录密码。
在开发环境点擊Spoon界面左上角三角符号运行作业即可
在第一次运行时,为了提高同步效率可以先不创建目标表的索引。在第一此同步完成后再创建索引。然后在START组件中编辑调度逻辑再次启动。
这样一个使用时间戳增量回滚同步数据的作业就完成了。
是我建立的一个仓库用来索引和存放与kettle相关的资料,欢迎大家加星关注 和 push
}