提示:网站建设定制开发文章写完后,网站建设定制开发目录可以自动生成,网站建设定制开发如何生成可参考右边的帮助文档
文章目录
前言
在使用时,网站建设定制开发会因为各种原因(网络波动,系统宕机,程序异常等)导致消息发送失败。rabbitmq也提供了相应的处理机制。
提示:以下是本篇文章正文内容,下面案例可供参考
一、rabbitmq消息发送失败处理机制
生产法发送失败
配置回调器。
yml配置开启确认和返回机制
confirm:发送给exchange时的回调,不管是否成功发送给队列。
return:消息没有发送给exchange时的回调。
#成功发送到exchange时的回调spring.rabbitmq.publisher-confirm-type=correlated#exchange未发送到队列时回调spring.rabbitmq.publisher-returns=true
- 1
- 2
- 3
- 4
回调函数配置方式
@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { /** * 交换机不管是否收到消息的一个回调方法 * * @param correlationData 消息相关数据 * @param ack 交换机是否收到消息 * @param cause 未收到消息的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到 id 为:{}的消息", id); } else { //重发处理,可以入库,死信队列处理.... log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause); } } //当消息无法路由的时候触发回调方法 @Override public void returnedMessage(ReturnedMessage returned) { //重发处理,可以入库,死信队列处理.... log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}", new String(returned.getMessage().getBody()), returned.getExchange(), returned.getReplyText(), returned.getRoutingKey(), returned.getReplyCode()); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
rabbitmqTemplate在启动时注入:
@Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitTemplate.ConfirmCallback confirmCallback; @Autowired private RabbitTemplate.ReturnsCallback returnsCallback; //依赖注入 rabbitTemplate 之后再设置它的回调对象 @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnsCallback(returnsCallback); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
消费者消费失败后处理
通过自动ack+retry配置+私信队列方式实现
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest listener: simple:# acknowledge-mode: manual # 配置该消费者的ack方式为手动 acknowledge-mode: auto # 配置该消费者的ack方式为自动 default-requeue-rejected: false #设置消费失败后重发 retry: #重发次数 max-attempts: 3 #开启重发 enabled: true # 重试间隔(ms) initial-interval: 5000
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME1, durable = “true”, autoDelete = “false”,
arguments = {@Argument(name = “x-dead-letter-exchange”, value = “dead-exchange”),
@Argument(name = “x-dead-letter-routing-key”, value = “dead-routing-key”),
@Argument(name = “x-message-ttl”, value = “1000”,type = “java.lang.Long”)
}),
exchange = @Exchange(value = “first_exchange”, type = ExchangeTypes.DIRECT),
key = “queue_one_key1”))
public void handleMessage1(Message message, Channel channel) throws IOException {
log.info(“OrderConsumer handleMessage {} , error:”, message);
//模拟消费异常,自动进入私信队列
throw new RuntimeException(“抛出异常,模拟消费失败,触发spring-retry”);
}
/** * 死信队列消费者 * * @param data * @param channel * @throws Exception */@RabbitListener(queues = "dead-queue")public void consumeDL(String data, Channel channel) throws Exception { //处理消费失败的消息 log.info(">>>> 死信队列消费 tag = {},消息内容 : {}", data);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
// channel.basicNack(tag, false, false);
}
通过手动ack+私信队列实现(不要配置retry!!!)
spring.rabbitmq.publisher-returns=truespring.rabbitmq.listener.simple.acknowledge-mode=manual
- 1
- 2
@RabbitListener(queues = CONFIRM_QUEUE_NAME) public void receiveMsg(String data, Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("接受到队列 confirm.queue 消息:{}", msg);// throw new RuntimeException("抛出异常,模拟消费失败,触发spring-retry"); //模拟消费失败,重发n次后仍然失败。调用basicNack 抛给私信队列处理 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } //接收消息 @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
总结
rabbitmq消息失败处理需要谨慎对待,因为容易产生资源消耗殆尽的问题!!!