软件开发定制【kafka异常】使用Spring-kafka遇到的坑

软件开发定制查看一下压缩策略

bin/-topics.sh --describe --zookeeper xxxx:2181 --topic SHI_TOPIC1

Topic:SHI_TOPIC1 PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact

Topic: SHI_TOPIC1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Configs:cleanup.policy=compact :

软件开发定制然后再检查一下自己发软件开发定制送消息的时候是不是没有传 key

[参考链接](()

问题堆栈信息

org..kafka.listener.ListenerExecutionFailedException: invokeHandler Failed;

nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument,

the listener must have a MANUAL AckMode to populate the Acknowledgment.;

nested exception is java.lang.IllegalStateException:

No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

问题原因

解决方案

问题堆栈信息

Failed to start bean ‘org.springframework.kafka.config.internalKafkaListenerEndpointRegistry’; nested exception is java.lang.IllegalStateException: Consumer cannot be configured for auto for ackMode MANUAL_IMMEDIATE

问题原因

不能再配置中既配置kafka.consumer.enable-auto-commit=true 自动提交; 然后又在监听器中使用手动提交

例如:

kafka.consumer.enable-auto-commit=true

@Autowired

private ConsumerFactory consumerFactory;

@Bean

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory);

//设置提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

return factory;

}

/**

  • 手动ack 提交记录

  • @param data

  • @param ack

  • @throws InterruptedException

*/

@KafkaListener(id = “consumer-id2”,topics = “SHI_TOPIC1”,concurrency = “1”,

clientIdPrefix = “myClientId2”,containerFactory = “kafkaManualAckListenerContainerFactory”)

public void consumer2(String data, Acknowledgment ack) {

log.info(“consumer-id2-手动ack,提交记录,data:{}”,data);

ack.acknowledge();

}

解决方法:

将自动提交关掉,或者去掉手动提交;

如果你想他们都同时存在,某些情况自动提交;某些情况手动提交; 那你创建 一个新的

consumerFactory 将它的是否自动提交设置为false;比如

@Configuration

@EnableKafka

public class KafkaConfig {

@Autowired

private KafkaProperties properties;

/**

  • 创建一个新的消费者工厂

  • 创建多个工厂的时候 SpringBoot就不会自动帮忙创建工厂了;所以默认的还是自己创建一下

  • @return

*/

@Bean

public ConsumerFactory<Object, Object> kafkaConsumerFactory() {

Map<String, Object> map = properties.buildConsumerProperties();

DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);

return factory;

}

/**

  • 创建一个新的消费者工厂

  • 但是修改为不自动提交

  • @return

*/

@Bean

public ConsumerFactory<Object, Object> kafkaManualConsumerFactory() {

Map<String, Object> map = properties.buildConsumerProperties();

map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);

return factory;

}

/**

  • 手动提交的监听器工厂 (使用的消费组工厂必须 kafka.consumer.enable-auto-commit = false)

  • @return

*/

@Bean

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(kafkaManualConsumerFactory());

//设置 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

return factory;

}

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发