kafka kafka多进程消费重启,会接着上次的地方开始消费吗

本类文章总排行版
本类文章本月排行
本类文章本周排行1144人阅读
sarama(1)
kafka(2)
golang(1)
Kafka Confusion
(1) consumer与partition是一对多的关系
consumer group里的一个consumer可以消费多个partition,但是一个partition只能被一个consumer消费,因为kafka的设计是不允许在一个partition上并发的。为了提高消费的并发,必须增加partition的数量。
如果consumer比partition多,会导致某些consumer无法消费到partition。所以要让partition的数量大于等于consumer的数量。合理的部署方案是,起初将partition数量尽量分配得多一些(通常是2的整数次幂),以后为了扩展就直接增加consumer的数量。
当partition数量多于consumer数量时,kafka会尽量均衡每个consumer消费的partition数量,以此达到每个consumer负载均衡的目的。参考以下这句话(来自于kafka官方文档 ):
The consumers in a group divide up the partitions as fairly as possible, each partition is consumed by exactly one consumer in a consumer group.
以下链接对此做了实验:
</Shopify/sarama 是go语言实现的kafka client库。
(1) SyncProducer是对AsyncProducer作的封装
SyncProducer每发送出去一条消息,就等待返回结果,然后再发下一条。因此SyncProducer不支持批量发送。
(2) AsyncProducer的Input()和Successes()的阻塞问题
AsyncProducer的两个方法:Input()返回用来写入消息的channel,Successes()返回用来收集发送成功的消息的channel(Errors()用来收集发送失败的消息)。
应用程序可以用一个goroutine不断地向Input()写入消息,用另一个goroutine从Successes()和Errors()里读取发送结果,以此实现异步发送。而这两个操作也可以写在一个for-select里:
case producer.Input() &- message:
// do something
case message = &- producer.Successes():
// do something
case err := &- producer.Errors()
// do something
只有sarama.Config.Producer.Return.Successes设置为true,才可以从producer.Successes()里读取。而且,如果该参数为true,必须读取producer.Successes(),否则producer.successes channel就满,进而导致producer.input channel也满,然后写producer.Input()的时候就阻塞了。
3.consumer group
</wvanbergen/kafka/consumergroup 是基于sarama开发的对consumer group的扩展。sarama目前不支持consumer group功能。
(1) consumergroup包是将offset提交到zookeeper上,而不是kafka上。 它在初始化(join group)时需要zookeeper地址。
(2) 配置消费起始点
配置里有两个参数与此有关:
Offsets.Initial
: 从最旧还是最新的消息开始消费,只能是sarama.OffsetOldest或sarama.OffsetNewest。
Offsets.ResetOffsets : bool类型。如果程序重启,true表示不从上次中断的位置消费,false表示从上次中断的位置消费。
pdos-server里需要设置Offsets.Initial=sarama.OffsetNewest,Offsets.ResetOffsets=true。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:43697次
排名:千里之外
原创:14篇
(1)(3)(2)(3)(1)(5)(1)(4)logstash的kafka插件使用 - 推酷
logstash的kafka插件使用
关于logstash可以产看其
,对于英文有障碍的人士,或是想知道更多插件使用技巧的用户请移步
,本片内容已经并入其中相关章节.
Logstash-kafka简介
插件已经正式合并进官方仓库,以下使用介绍基于
logstash 1.4相关版本
,1.5及以后版本的使用后续依照官方文档持续更新。
插件本身内容非常简单,其主要依赖同一作者写的
模块。需要注意的是:
该模块仅支持 Kafka-0.8 版本。如果是使用 0.7 版本 kafka 的,将无法直接使 jruby-kafka 该模块和 logstash-kafka 插件。
安装按照官方文档完全自动化的安装.或是可以通过以下方式手动自己安装插件,不过重点注意的是
kafka 的版本
,上面已经指出了。
下载 logstash 并解压重命名为
./logstash-1.4.0
文件目录。
下载 kafka 相关组件,以下示例选的为
,并解压重命名为
./kafka_2.8.0-0.8.1.1
下载 logstash-kafka v0.4.2 从
,并解压重命名为
./logstash-kafka-0.4.2
./kafka_2.8.0-0.8.1.1/libs
目录下复制所有的 jar 文件拷贝到
./logstash-1.4.0/vendor/jar/kafka_2.8.0-0.8.1.1/libs
下,其中你需要创建
kafka_2.8.0-0.8.1.1/libs
相关文件夹及目录。
./logstash-kafka-0.4.2/logstash
,拷贝到对应的
./logstash-1.4.0/lib/logstash
对应目录下。
./logstash-1.4.0
目录下,现在需要运行 logstash-kafka 的 gembag.rb 脚本去安装 jruby-kafka 库,执行以下命令:
GEM_HOME=vendor/bundle/jruby/1.9 GEM_PATH= java -jar vendor/jar/jruby-complete-1.7.11.jar --1.9 ../logstash-kafka-0.4.2/gembag.rb ../logstash-kafka-0.4.2/logstash-kafka.gemspec
现在可以使用 logstash-kafka 插件运行 logstash 了。例如:
bin/logstash agent -f logstash.conf
Input 配置示例
以下配置可以实现对 kafka 读取端(consumer)的基本使用。
消费端更多详细的配置请查看
kafka 官方文档的消费者部分配置文档。
zk_connect =& &localhost:2181&
group_id =& &logstash&
topic_id =& &test&
reset_beginning =& false # boolean (optional), default: false
consumer_threads =& 5
# number (optional), default: 1
decorate_events =& true # boolean (optional), default: false
Input 解释
消费端的一些比较有用的配置项:
消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离。
指定消费话题,也是必填项目,指定消费某个
,这个其实就是订阅某个主题,然后去消费。
reset_beginning
logstash 启动后从什么位置开始读取数据,默认是结束位置,也就是说 logstash 进程会以从上次读取结束时的偏移量开始继续读取,如果之前没有消费过,那么就开始从头读取.如果你是要导入原有数据,把这个设定改成 &true&, logstash 进程就从头开始读取.有点类似
,但是读到最后一行不会终止,而是变成
,继续监听相应数据。
decorate_events
在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息。
rebalance_max_retries
当有新的 consumer(logstash) 加入到同一 group 时,将会
,此后将会有
partitions
的消费端迁移到新的
上,如果一个
获得了某个
的消费权限,那么它将会向
Partition Owner registry
节点信息,但是有可能此时旧的
尚没有释放此节点,此值用于控制,注册节点的重试次数。
consumer_timeout_ms
指定时间内没有消息到达就抛出异常,一般不需要改。
以上是相对重要参数的使用示例,更多参数可以选项可以跟据
查看 input 默认参数。
1.想要使用多个 logstash 端协同消费同一个
的话,那么需要把两个或是多个 logstash 消费端配置成相同的
, 但是前提是要把
相应的 topic 分多个 partitions (区)
,多个消费者消费是无法保证消息的消费顺序性的。
这里解释下,为什么要分多个
partitions(区)
, kafka 的消息模型是对 topic 分区以达到分布式效果。每个
下的不同的
partitions (区)
只能有一个
去消费。所以只有多个分区后才能启动多个消费者,对应不同的区去消费。其中协调消费部分是由 server 端协调而成。不必使用者考虑太多。只是
消息的消费则是无序的
总结:保证消息的顺序,那就用一个
kafka 的每个 partition 只能同时被同一个 group 中的一个 consumer 消费
Output 配置
以下配置可以实现对 kafka 写入端 (producer) 的基本使用。
生产端更多详细的配置请查看
kafka 官方文档的生产者部分配置文档。
broker_list =& &localhost:9092&
topic_id =& &test&
compression_codec =& &snappy& # string (optional), one of [&none&, &gzip&, &snappy&], default: &none&
Output 解释
生产的可设置性还是很多的,设置其实更多,以下是更多的设置:
compression_codec
消息的压缩模式,默认是 none,可以有 gzip 和 snappy (暂时还未测试开启压缩与不开启的性能,数据传输大小等对比)。
compressed_topics
可以针对特定的 topic 进行压缩,设置这个参数为
进行压缩。
request_required_acks
消息的确认模式:
可以设置为 0: 生产者不等待 broker 的回应,只管发送.会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失)
可以设置为 1: 生产者会收到 leader 的回应在 leader 写入之后.(在当前 leader 服务器为复制前失败可能会导致信息丢失)
可以设置为 -1: 生产者会收到 leader 的回应在全部拷贝完成之后。
partitioner_class
分区的策略,默认是 hash 取模
send_buffer_bytes
socket 的缓存大小设置,其实就是缓冲区的大小
消息模式相关
serializer_class
消息体的系列化处理类,转化为字节流进行传输,
请注意 encoder 必须和下面的
key_serializer_class
使用相同的类型
key_serializer_class
默认的是与
serializer_class
producer_type
生产者的类型
异步执行消息的发送
同步执行消息的发送
queue_buffering_max_ms
异步模式下,那么就会在设置的时间缓存消息,并一次性发送
queue_buffering_max_messages
异步的模式下,最长等待的消息数
queue_enqueue_timeout_ms
异步模式下,进入队列的等待时间,若是设置为0,那么要么进入队列,要么直接抛弃
batch_num_messages
异步模式下,每次发送的最大消息数,前提是触发了
queue_buffering_max_messages
queue_enqueue_timeout_ms
以上是相对重要参数的使用示例,更多参数可以选项可以跟据
查看 output 默认参数。
默认情况下,插件是使用 json 编码来输入和输出相应的消息,消息传递过程中 logstash 默认会为消息编码内加入相应的时间戳和 hostname 等信息。如果不想要以上信息(一般做消息转发的情况下),可以使用以下配置,例如:
codec =& plain {
format =& &%{message}&
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致}

我要回帖

更多关于 kafka指定offset消费 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信