
Understanding the Kafka Consumer Property enable.auto.commit

Preface

Let's get an understanding of Kafka's auto-commit behaviour on the consumer (read) side.

I found an article dedicated to this feature; below I translate its main content and add my own notes.

Main Text

Understanding the ‘enable.auto.commit’ Kafka Consumer property


Kafka Consumers read messages from a Kafka topic; it's not a hard concept to get your head around. But behind the scenes there's a lot more going on than meets the eye.

Say we're consuming messages from a Topic and our Consumer crashes. Once we realise that the world isn't ending, we recover from the crash and we start consuming again. We start receiving messages exactly where we left off; it's kinda neat.


There are two reasons why this happens. One is something referred to as the "Offset" and the other is a couple of default Consumer values.


So what's an Offset?

The Offset is a piece of metadata, an integer value that continually increases for each message that is received in a partition. Each message will have a unique Offset value in a partition.


[img: messages in a partition, each with its own unique Offset]

I use Keys in some of my projects, some of them I don't ;)

So as you can see here, each message has a unique Offset, and that Offset represents the position of that message in that particular partition.

So that's what a Kafka Offset is: it records the position of each message within its partition.
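
To make this concrete, here is a minimal sketch using the Java KafkaConsumer client (2.x or later) that polls a topic and prints each record's partition and Offset; the broker address, group id and topic name are placeholder assumptions.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetPrinter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "offset-demo");             // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // hypothetical topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                // Every record knows which partition it came from and its unique Offset there
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}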

When a Consumer reads messages from the Partition it lets Kafka know the Offset of the last consumed message. This Offset is stored in a Topic named __consumer_offsets; in doing this, a consumer can stop and restart without forgetting which messages it has consumed.

So, as explained here, the Offset is stored in a topic named __consumer_offsets, which is what allows a consumer to keep track of how far it has processed.
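
As a sketch of what this looks like from the client side, the Java client can read back the committed position. This fragment assumes a consumer built with the same properties as the sketch above (and not yet subscribed to anything), plus a hypothetical topic and partition:

import java.util.Collections;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Ask Kafka what was last committed for partition 0 of our hypothetical topic
TopicPartition tp = new TopicPartition("demo-topic", 0);
consumer.assign(Collections.singletonList(tp));
OffsetAndMetadata committed = consumer.committed(tp); // read back from __consumer_offsets
System.out.println(committed == null
        ? "nothing committed yet for this group"
        : "a restart would resume from offset " + committed.offset());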

When we create our Consumers, they have a set of default properties which we can override, or we can just leave the default values in effect.

There are two properties that are driving this behaviour.


enable.auto.commit

auto.commit.interval.ms

The first property, enable.auto.commit, has a default value of true and the second property, auto.commit.interval.ms, has a default value of 5000. These values are correct for Blizzard's node-rdkafka client and the Java KafkaConsumer client but other libraries may differ.

enable.auto.commit defaults to true; that is, the auto-commit mechanism is used by default.

auto.commit.interval.ms defaults to 5000, in milliseconds.
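
In Java client terms, leaving the defaults alone is equivalent to spelling them out like this (a sketch; the rest of the configuration is as in the earlier example):

Properties props = new Properties();
// ...bootstrap.servers, group.id and deserializers as in the earlier sketch...
// These two lines only spell out the defaults; omitting them behaves the same.
props.put("enable.auto.commit", "true");      // auto-commit is on by default
props.put("auto.commit.interval.ms", "5000"); // commit at most every 5 seconds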

So by default, every 5 seconds a Consumer is going to commit its Offset to Kafka, or every time data is fetched from the specified Topic it will commit the latest Offset.


Now in some scenarios this is the ideal behaviour, but in other scenarios it's not.


Say our Consumer is processing a message with an Offset of 100, and whilst processing it the Consumer fetches some more data, the Offset is committed and then the Consumer crashes. Upon coming back up it will start consuming messages from the most recently committed Offset, but how can we safely say that we haven't lost messages and that the Offset of the new message isn't later than that of the message being processed?


What we can do is commit the Offset of messages manually after processing them. This gives us full control over when we consider a message dealt with, processed and ready to let Kafka know about it.


Firstly we have to change the value of the enable.auto.commit property.

enable.auto.commit: false

When we change this property, the auto.commit.interval.ms value isn't taken into consideration.

So now we can commit our Offset manually after the processing has taken place, and if the Consumer crashes whilst processing a message it will start consuming from that same Offset: no messages lost.

Setting this property to false means we take care of committing ourselves.
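
Here is a minimal at-least-once sketch along those lines with the Java client, reusing the configuration from the earlier sketches; process() is a hypothetical stand-in for whatever the application does with a record:

props.put("enable.auto.commit", "false"); // we take over committing below

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("demo-topic")); // hypothetical topic
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical application-specific processing
        }
        // Commit only once the whole batch has been processed. If we crash
        // before this line, the records are redelivered rather than lost.
        consumer.commitSync();
    }
}

commitSync() blocks until the broker acknowledges the commit; the Java client also exposes commitAsync() for when lower latency matters more than waiting on the result.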

Both the clients mentioned earlier in this article have methods exposed to commit the Offset.

For further reading on the clients check out the links below.


JSDoc: Class: KafkaConsumer (blizzard.github.io) - the node-rdkafka KafkaConsumer class, the main entry point for reading data from Kafka.

KafkaConsumer (kafka 0.10.2.1 API) (kafka.apache.org) - the Java client Javadoc, which includes an example of manually committing offsets only after the corresponding records have been processed.

If anyone wants any more information on Kafka or Consumers get in touch on Twitter.

Cheers,

Danny

https://twitter.com/danieljameskay

References

https://medium.com/@danieljameskay/understanding-the-enable-auto-commit-kafka-consumer-property-12fa0ade7b65

https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html (the official introduction to using the consumer)
