欢迎来到无限飞翔,在这里,你会找到许多有趣的技术 : )

RocketMQ & Kafka 消息消费与消息重试

开发者头条 415℃

一、RocketMQ

保证消费成功

PushConsumer 为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ 才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才会认为这批消息(默认是1条)是消费完成的。

如果这时候消息消费失败,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就会认为这批消息消费失败了。如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。

为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回 Broker(topic 不是原 topic 而是这个消费租的 RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。

启动的时候从哪里消费

当新实例启动的时候,PushConsumer 会拿到本消费组 broker 已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次 Pull 请求。

如果这个消费进度在 Broker 并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

  • CONSUME_FROM_LAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息

  • CONSUME_FROM_FIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在 broker 的)全部消费一遍

  • CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

消息 ACK 机制

RocketMQ 是以consumer group+queue为单位是管理消费进度的,以一个 consumer offset 标记这个这个消费组在这条 queue 上的消费进度。如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到 broker,以此持久化消费进度。但是每次记录消费进度的时候,只会把一批消息中最小的 offset 值为消费进度值。

这钟方式和传统的一条 message 单独 ack 的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这种情况下,RocketMQ 为了保证消息肯定被消费成功,消费进度只能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了。

在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被 kill)。这条 queue 的消费进度还是维持在2101,当 queue 重新分配给新的实例的时候,新的实例从 broker 上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

对于这个场景,RocketMQ 暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是 RocketMQ 官方多次强调的态度。

消息重试机制

首先,我们需要明确,只有当消费模式为集群模式时,Broker 才会自动进行重试,对于广播消息是不会重试的。集群消费模式下,当消息消费失败,RocketMQ 会通过消息重试机制重新投递消息,努力使该消息消费成功。

死信的业务处理方式

默认的处理机制中,如果我们只对消息做重复消费,达到最大重试次数之后消息就进入死信队列了。RocketMQ 的处理方式为将达到最大重试次数(16 次)的消息标记为死信消息,将该死信消息投递到 DLQ 死信队列中,业务需要进行人工干预。

二、Kafka

拉取循环

Kafka 对外暴露了一个非常简洁的 poll 方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能。

另外需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。简而言之,一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应。

提交(commit)与位移(offset)

当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从 broker 返回消费者时,broker 并不跟踪这些消息是否被消费者接收到;Kafka 让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。

在正常情况下,消费者会发送分区的提交信息到 Kafka,Kafka 进行记录。当消费者宕机或者新消费者加入时,Kafka 会进行重平衡,这会导致消费者负责之前并不属于它的分区。重平衡完成后,消费者会重新获取分区的位移,下面来看下两种有意思的情况。

假如一个消费者在重平衡前后都负责某个分区,如果提交位移比之前实际处理的消息位移要小,那么会导致消息重复消费,如下所示:

假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后 Kafka 进行重平衡,新的消费者负责此分区并读取提交位移,此时会“丢失”消息,如下所示:

对于所有消费者消费失败的消息,rocketMQ 都会把重试的消息 重新 new 出来,然后投递到主题SCHEDULE_TOPIC_XXXX下的队列中,然后由定时任务进行调度重试,同时为了保证消息可被找到,也会将原先的 topic 存储到 properties 中。

消费重试与死信队列

Kafka 没有重试机制不支持消息重试,也没有死信队列,因此使用 Kafka 做消息队列时,如果遇到了消息在业务处理时出现异常,就会很难进行下一步处理。应对这种场景,需要自己实现消息重试的功能。

Reference

https://blog.csdn.net/linuxheik/article/details/79579329

https://blog.csdn.net/intelrain/article/details/80449501

https://gitbook.cn/books/5d340810c43fe20aeadc88db/index.html

转载请注明:无限飞翔 » RocketMQ & Kafka 消息消费与消息重试

喜欢 (0)or分享 (0)