app开发定制Kafka之enable.auto.commit使用解析

app开发定制通过字面意思我们不难app开发定制理解这是的自动提交功能。

app开发定制配置消费者(配置ENABLE_AUTO_COMMIT_CONFIG为 true 配置自动提交)

enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。

auto.commit.interval.ms 的默认值是 5000,单位是毫秒。

此时我们配置消息消费后自动提交offset 位置

  1. @Bean
  2. public KafkaConsumer<String, String> kafkaConsumer() {
  3. Map<String, Object> config = new HashMap<>();
  4. config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  5. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  6. config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  7. config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
  8. config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  9. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
  10. return consumer;
  11. }

配置消息监听

  1. @Slf4j
  2. @Component
  3. public class PackageMessageConsumer {
  4. @Autowired
  5. KafkaConsumer<String, String> kafkaConsumer;
  6. @Autowired
  7. EventProcessMaster eventProcessMaster;
  8. @PostConstruct
  9. public void listenRealTimeGroup() {
  10. new Thread(() -> consumer()).start();
  11. }
  12. private void consumer() {
  13. List<String> topics = new ArrayList<>();
  14. topics.add("test-kafka-auto.commit");
  15. kafkaConsumer.subscribe(topics);
  16. while(true) {
  17. try {
  18. // 拉取消息时间间隔 ms
  19. ConsumerRecords<String, String> records = kafkaConsumer.poll(10);
  20. for (ConsumerRecord<String, String> record : records) {
  21. String key = record.key();
  22. Object content = record.value();
  23. eventProcessMaster.processRequest(new Event(record.topic(), key, content));
  24. }
  25. } catch (Exception e){
  26. log.error(e.getMessage());
  27. }
  28. }
  29. }
  30. }

配置自动提交offset 位置之后,我们不必关心消息消费到了什么位置,当程序重启后,消息也不会重复消费;

配置消费者(配置ENABLE_AUTO_COMMIT_CONFIG为 false 配置手动提交)

手动提交顾名思义就是每次我们消费后,kafka不会手动更新offset 位置,同时 auto.commit.interval.ms 也就不被再考虑了

  1. @Bean
  2. public KafkaConsumer<String, String> kafkaConsumer() {
  3. Map<String, Object> config = new HashMap<>();
  4. config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  5. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  6. config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  7. config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
  8. // 与自动提交的区别
  9. config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  10. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
  11. return consumer;
  12. }

当我们设置成手动提交后,不修改其他代码会发现每次重启程序,kafka都会将我们没清理的所有数据都重新消费一遍,与我们需求的幂等性不符,将代码进行完善

  1. @Bean
  2. public KafkaConsumer<String, String> kafkaConsumer() {
  3. Map<String, Object> config = new HashMap<>();
  4. config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  5. config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  6. config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  7. config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
  8. // 与自动提交的区别
  9. config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");// 自动提交时间间隔
  10. config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  11. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
  12. return consumer;
  13. }

  1. @Slf4j
  2. @Component
  3. public class DependPackageMessageConsumer {
  4. @Autowired
  5. KafkaConsumer<String, String> kafkaConsumer;
  6. @Autowired
  7. EventProcessMaster eventProcessMaster;
  8. @PostConstruct
  9. public void listenRealTimeGroup() {
  10. new Thread(() -> consumer()).start();
  11. }
  12. private void consumer() {
  13. List<String> topics = new ArrayList<>();
  14. topics.add("test-kafka-auto.commit");
  15. kafkaConsumer.subscribe(topics);
  16. while(true) {
  17. try {
  18. ConsumerRecords<String, String> records = kafkaConsumer.poll(10);
  19. for (ConsumerRecord<String, String> record : records) {
  20. String key = record.key();
  21. Object content = record.value();
  22. eventProcessMaster.processRequest(new Event(record.topic(), key, content));
  23. }
  24. // 手动提交 offset 位置
  25. kafkaConsumer.commitSync();
  26. } catch (Exception e){
  27. log.error(e.getMessage());
  28. }
  29. }
  30. }
  31. }

加上手动确认后服务重启,每次都会从上次offset 确认的位置开始消费

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发