kettle按照日期作为参数循环查询数据

有时候需要进行大数据量的抽取kettle的插入/更新组件就不适用了,这时候可以考虑根据时间戳进行增量更新

下面是根据时间戳进行增量同步的一个测试例子:

2个不同机器仩数据库,一个源数据库数据不定时更新一个目标数据库2分钟定时进行数据抽取。(或者一个机器上2个不同的数据库一个进行数据更噺,一个定时进行数据的抽取)

源数据表和目标数据表结构一样:

目标数据库表(从源数据库表)目标数据库表开始要有一个时间。

拉取3个组件2个表输入,一个表输出

表输出连接的数据库是目标数据库先查询到目标数据库最新的时间

表输入2连接的数据库是源数据库,勾选替换SQL语句的便来给你和选择从第一个表输入步骤插入数据插入的数据是目标表查询到的最新时间,供源数据库判断这样作业的时候,就会不断的插入新更新的数据

拉取作业,配置作业定时调度2分钟

在此期间往源数据库表中添加数据,定时任务开启可以看到数據定时往目标数据库表中更新。


}

对一张业务表数据(500万条数据以仩)进行实时(10秒)同步采用了时间戳增量回滚同步的方法。关于ETL和Kettle的入门知识大家可以阅读相关的blog和文档学习

1. 时间戳增量回滚同步

假定在源数据表中有一个字段会记录数据的新增或修改时间,可以通过它对数据在时间维度上进行排序通过中间表记录每次更新的时间戳,在下一个同步周期时通过这个时间戳同步该时间戳以后的增量数据。这是时间戳增量同步

但是时间戳增量同步不能对源数据库中曆史数据的删除操作进行同步,我们可以通过在每次同步时把时间戳往前回滚一段时间,从而同步一定时间段内的删除操作这就是时間戳增量回滚同步,这个名字是我自己给取得意会即可,就是在时间戳增量同步的同时回滚一定的时间段

  • 源数据表 需要被同步的数据表
  • 目标数据表 同步至的数据表
  • 中间表 存储时间戳的表

在两个数据库中分别创建数据表,并通过脚本在源数据表中插入500万条数据完成后再鉯每秒一条的速度插入新数据,模拟生产环境

  • 获取中间表的时间戳,并设置为全局变量
  • 删除目标表中时间戳及时间戳以后的数据
  • 抽取两個数据表的时间戳及时间戳以后的数据进行比对并根据比对结果进行删除、新增或修改操作
  • 4.1 创建作业和DB连接

    打开Spoon工具,新建作业然后茬左侧主对象树DB连接中新建DB连接。创建连接并测试通过后可以在左侧DB连接下右键共享出来因为在单个作业或者转换中新建的DB连接都是局域数据源,在其他转换和作业中是不能使用的即使属于同一个作业下的不同转换,所以需要把他们共享这样DB连接就会成为全局数据源,不用多次编辑

    4.2 建时间戳中间表

    这一步是为了在目标数据库建中间表etl_temp,并插入初始的时间戳字段。因为该作业在生产环境是循环调用的該步骤在每一个同步周期中都会调用,所以在建表时需要判断该表是否已经存在如果不存在才建表。

    SQL代码和组件配置截图如下:

    
          

    我把该莋业时间戳的ID设为1在接下来的步骤中也是通过这个ID查询我们想要的时间戳

    4.2 获取时间戳并设为变量

    新建一个转换,在转换中使用表输入和設置变量两个组件

    SQL代码和组件配置截图如下

    Kettle中设置的变量都是字符串类型为了便于比较。我在SQL语句把查出的时间戳进行了格式转换

    
          

    变量活动类型可以为该变量设置四种有效活动范围分别是JVM、该Job、父Job和祖父Job

    4.3 删除目标表中时间戳及时间戳以后的数据

    1. 避免在同步中重复或者遺漏数据。例如当时间戳在源数据表中不是唯一的上一次同步周期最后一条数据的时间戳是 18:12:12,那么上一次同步周期结束后中间表中的时间戳就会更新为 18:12:12。如果在下一个同步周期时源数据表中仍然有时间戳为 18:12:12的新数据那么同步就会出现数据不一致。采用大于时间戳的方式同步就会遗漏数据采用等于时间戳的方式同步就会重复同步数据。
    2. 增加健壮性 当作业异常结束后不用做任何多余的操作就可以重启。因為会删除目标表中时间戳及时间戳以后的数据所以不用担心数据一致性问题

    :对增加健壮性进行补充:在一次同步周期中脚本异常中断,这时候中间表的时间戳没有更新但是目标表已经同步了部分数据,当再次启动脚本就会出现数据重复的情况而且在很多时候因为主鍵的存在,脚本启动会报错

    在组件中使用了上一步骤设置的变量所以必须勾选使用变量替换

    4.4 抽取、比对和更新数据

    这一步才是真正的数據同步步骤,完成了数据的抽取、比对并根据不同的比对结果删除、更新、插入或不做任何操作。
    正如前文所说为了同步删除操作,茬原始表输入和目标表输入步骤中回滚了一定时间段其中回滚的时间段设置为了全局的参数。左右空白处右键即可设置参数该作业下嘚所有作业和转换都能使用,设置如下图

    注意两个组件的数据库链接是不同的当然它们也就这个和名字不同

    对两个表输入查出的数据进荇比对,并把比对的结果写进输入流传递给后面的组件。

标注字段表示比对结果的字段名后面有用。关键字段表示比对的字段在这個作业中我们比较两个的主键ID

该步骤对上一步骤产生的标注字段进行路由不同的结果路由到不同的步骤。其中目标步骤表示下一步骤嘚名字

Kettle有一个插入/更新组件,但是据网友介绍这个组件性能低下每秒最多只能同步几百条数据,所有我对插入和更新分别作了不同的處理插入使用表输出组件;更新使用更新组件。
为了进一步提升同步效率我在表输出组件使用了多线程(右键>改变开始复制的数量),使同步速度达到每秒12000条Switch组件和表输出组件中间的虚拟组件(空操作)也是为了使用多线程添加的。


勾选批量插入可以极大提高同步速度

 
 
关于发送邮件组件网上有很多资料,就不多做介绍特别强调一点,邮箱密码是 单独的授权码而不是邮箱登录密码。
 
在开发环境点擊Spoon界面左上角三角符号运行作业即可
在第一次运行时,为了提高同步效率可以先不创建目标表的索引。在第一此同步完成后再创建索引。然后在START组件中编辑调度逻辑再次启动。




这样一个使用时间戳增量回滚同步数据的作业就完成了。
 
是我建立的一个仓库用来索引和存放与kettle相关的资料,欢迎大家加星关注 和 push
 
 
 
 
}

1、Kettle是一款国外开源的ETL工具纯java编寫,可以在Window、Linux、Unix上运行数据抽取高效稳定。下载图形化界面的zip包格式的直接解压缩使用即可。安装部署模式这里不说了自己可以根據自己的需求安装为单机模式或者集群模式。    
kettle国内镜像下载:
2、由于这里只是演示了如何配置通过时间戳和批次号增量的导入数据所以具体的操作不再叙述,具体的使用自己可以根据需求来使用

批次量将一批数据从一个数据库导入到另外一个数据库,而且每批次的数据量不能重复 这里使用时间戳,你也可以使用批次号原理基本一样,都是确定每一批次的数据量 第一步。start可以设置定时或者手动点擊启动job。 1、Start类型可以选择不需要定时,时间间隔天,周月。 默认不需要定时如果需要定时的话,首先把重复的框勾选 然后如果選择时间间隔的话,可以输入以分钟计算的间隔或者以秒计算的间隔 如果按天,就选择天然后选择每天几天的几分开始跑。 如果按照周就选择每周的每天几点几分开始跑job。 如果是每月的话就选择那一月的每天几点几分跑job。 2、转换的作业项名称自己填自己的作业项洺称, 高级tab设置日志tab,位置参数tab 命名参数tab,如果自己需要的话可以自己使用和研究 3、作业项名称,自己填自己的数据库连接,自巳新建和编辑即可 SQL脚本,自己填上自己的sql脚本 这个主要是批次量导入数据,所以使用时间戳来实现批次量导入数据 执行这个job,就是丅一批的数据量了 1)、由于是将上一步查询的值插入到下一步?的地方所以一定要注意。 将带有的步骤,替换SQL语句里面的变量进荇勾选。 从步骤插入数据进行选择上一步的名称。 2)、步骤名称自己起自己的名字。 数据库连接自己新建和编辑。 目标表就是自己嘚数据表 提交记录数量,一般1000或者2000下面主选项使用批量插入进行勾选。 数据库字段自己获取字段和映射, 更新用来查询的关键字囷更新字段。自行配置 自己根据自己的字段和类型进行填写。 第一步在数据源的库表里面查询出这批数据的最大时间或者最大的批次號。 第二步然后在自己的数据表里面获取到开始时间或者最小的批次号 (此数据表自己初始化好起始时间start_time或者最小批次号和查询条件,仳如第几步和那一张表) 将第一步获取到的最大时间或者最大的批次号传递到第二步。 第三步更新自己的初始化好的数据表,将自己初始化好的数据表的最大时间或者最大批次号字段修改 同时进行表输入进行查询出数据。然后将这一步查询的数据传递到Switch/Case 第五步。进荇各种数据表的输出 主流数据库系统都支持COALESCE()函数,这个函数主要用来进行空值处理其参数格 COALESCE()函数的第一个参数expression为待检测的表达式,而其后的参数个数不定 COALESCE()函数将会返回包括expression在内的所有参数中的第一个非空表达式。如果 回value1;否则判断value2是否是空值如果value2不为空值则返回value3;……以此类推, 如果所有的表达式都为空值则返回NULL。 3)、MySQLIFNULL函数是MySQL控制流函数之一,它接受两个参数如果不是NULL,则返回第一个参数 2)、将第一步的三个参数,传递到第二步的三个问好的地方 第三步:查询出每个case所需要的值的数据。同时修改next_time最大时间或者最大批次号
}

我要回帖

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信