定制开发RabbitMQ 如何避免消息重复消费?

文章目录


定制开发如何避免消息重复消费?

幂等性

定制开发当消费者消费完消息之后,定制开发通常会发送一个ack定制开发应答确认信息给生产者,定制开发但是这中间有可能因为定制开发网络中断等原因,定制开发导致生产者未能收到确认消息,定制开发由此这条消息将会被 定制开发重复发送给其他消费者进行消费,定制开发实际上这条消息已经被消费过了,定制开发这就是重复消费的问题。

消息幂等性,其实就是保证同一个消息不被消费者重复消费两次

如何避免消息重复消费?

消费者端实现幂等性,意味着消息永远不会消费多次,即使收到了多条一样的消息。通常有两种方式来避免消费重复消费:

  • 方式1: 消息全局 ID 或者写个唯一标识(如时间戳、UUID 等) :每次消费消息之前根据消息 id 去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局 ID 作为数据库表的主键,防止重复)

  • 方式2: 利用 Redis 的 setnx 命令:给消息分配一个全局 ID,消费该消息时,先去 Redis 中查询有没消费记录,无则以键值对形式写入 Redis ,有则不消费该消息。

基于本地消息表实现消息幂等性

导入 pom.xml 依赖(公共部分)

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.7.0</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.mq</groupId>    <artifactId>rabbitmq</artifactId>    <version>0.0.1-SNAPSHOT</version>    <name>rabbitmq</name>    <description>Demo project for Spring Boot</description>    <properties>        <java.version>1.8</java.version>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->        <!--springboot rabbit 启动器-->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>            <version>2.7.0</version>        </dependency>        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <version>1.18.24</version>            <scope>provided</scope>        </dependency>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>        </dependency>        <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->        <dependency>            <groupId>cn.hutool</groupId>            <artifactId>hutool-all</artifactId>            <version>5.8.3</version>        </dependency>        <dependency>            <groupId>com.baomidou</groupId>            <artifactId>mybatis-plus-boot-starter</artifactId>            <version>3.4.0</version>        </dependency>        <dependency>            <groupId>com.baomidou</groupId>            <artifactId>mybatis-plus</artifactId>            <version>3.4.0</version>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-data-redis</artifactId>            <version>2.6.7</version>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build></project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86

yml 配置文件(公共部分)

spring:  application:    name: rabbitmq  rabbitmq:    host: 10.0.0.4    port: 5672    username: admin    password: admin    virtual-host: /    publisher-confirm-type: correlated # 开启消息发布确认机制    publisher-returns: true # 发布消息返回监听回调    # 指定消息确认模式    listener:      simple:        acknowledge-mode: manual    # 未正确路由的消息发送到备份队列    # 使用备份交换机模式,mandatory 将无效,即就算 mandatory设 置为 false,路由失败的消息同样会被投递到绑定的备份交换机    template:      mandatory: true  redis:    host: 10.0.0.4    database: 0    port: 6379    timeout: 300ms  datasource:    url: jdbc:mysql://10.0.0.4:3306/Test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai    username: root    password: rootserver:  port: 8080# mybatis 插件打印日志,打印查询结果mybatis-plus:  configuration:    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

创建本地消息表

CREATE TABLE `message_idempotent` (  `message_id` varchar(50) NOT NULL COMMENT '消息ID',  `message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',  PRIMARY KEY (`message_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5

创建实体类

package com.mq.rabbitmq.vo;import com.baomidou.mybatisplus.extension.activerecord.Model;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;/** * @ClassName MessageIdempotent * @Description TODO * @Author 听秋 * @Date 2022/6/23 16:25 * @Version 1.0 **/@Data@NoArgsConstructor@AllArgsConstructorpublic class MessageIdempotent extends Model<MessageIdempotent> {    private String messageId;    private String messageContent;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

创建 mapper 接口

package com.mq.rabbitmq.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;import com.mq.rabbitmq.vo.MessageIdempotent;import org.apache.ibatis.annotations.Mapper;/** * @ClassName MessageIdempotentMapper * @Description TODO * @Author 听秋 * @Date 2022/6/23 19:03 * @Version 1.0 **/@Mapperpublic interface MessageIdempotentMapper extends BaseMapper<MessageIdempotent> {}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

创建 RabbitMQ 配置类(公共部分)

package com.mq.rabbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * @ClassName DirectConfig * @Description TODO direct 交换机配置类 * @Author 听秋 * @Date 2022/6/17 16:30 * @Version 1.0 **/@Configurationpublic class ExchangeConfig {    /**     * 创建 direct 队列     * */    @Bean    Queue DirectQueue01() {        return new Queue("DirectQueue-01",true);    }    /**     * 创建 direct 交换机     * */    @Bean    DirectExchange DirectExchange01() {        return new DirectExchange("DirectExchange-01");    }    /**     * 绑定 direct 队列和交换机     * */    @Bean    Binding bindingDirect01() {        return BindingBuilder.bind(DirectQueue01()).to(DirectExchange01()).with("DirectRouting01");    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

自定义消息发送确认的回调(公共部分)

package com.mq.rabbitmq.callback;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/** * @ClassName MyConfirmCallback * @Description TODO 自定义消息发送确认的回调 * @Author 听秋 * @Date 2022/6/21 19:12 * @Version 1.0 **/@Componentpublic class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {    @Autowired    private RabbitTemplate rabbitTemplate;    /**     * PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化     */    @PostConstruct    public void init() {        // 指定 ConfirmCallback        rabbitTemplate.setConfirmCallback(this);        // 指定 ReturnCallback        rabbitTemplate.setReturnsCallback(this);    }    /**     * Confirmation callback.     *     * 确认消息是否成功到达交换机中,不管是否到达交换机,该回调都会执行     * 生产者 → 交换机     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        System.out.println("发布消息的UUID为:" + correlationData.getId());        if (ack) {            System.out.println("消息发布成功!");        } else {            System.out.println("消息发布失败!失败原因是:" + cause);        }    }    /**     * Returned message callback.     *     * 确认消息是否从交换机成功到达队列中,失败将会执行,成功则不执行     * 交换机 → 队列     */    @Override    public void returnedMessage(ReturnedMessage returned) {        System.out.println("ReturnsCallback 回调内容:" + returned);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

创建生产者(公共部分)

package com.mq.rabbitmq.controller;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageDeliveryMode;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import java.util.UUID;/** * @ClassName DirectController * @Description TODO * @Author 听秋 * @Date 2022/6/17 16:48 * @Version 1.0 **/@RestController@Slf4jpublic class QueueController {    @Autowired    private RabbitTemplate rabbitTemplate;    /**     * 消息幂等性     * */    @GetMapping("/sendMessage")    public void sendMessage(String msg, String routingKey, String id) {        MessageProperties messageProperties = new MessageProperties();        messageProperties.setMessageId(id);        messageProperties.setContentType("text/plain");        messageProperties.setContentEncoding("utf-8");        Message message = new Message(msg.getBytes(), messageProperties);        log.info("生产消息:" + message.toString());        // 消息发送确认回调        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        rabbitTemplate.convertAndSend("DirectExchange-01", routingKey, message, correlationData);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

发送消息

控制台


RabbitMQ 队列

创建消费者

package com.mq.rabbitmq.controller;import cn.hutool.core.util.ObjectUtil;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;import com.mq.rabbitmq.vo.MessageIdempotent;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Component;import java.io.IOException;import java.nio.charset.StandardCharsets;/** * @ClassName DirectReceiver * @Description TODO * @Author 听秋 * @Date 2022/6/21 10:40 * @Version 1.0 **/@Component@Slf4jpublic class Consumer {    @Autowired    private StringRedisTemplate stringRedisTemplate;	/**     * 基于本地消息表实现消息幂等性     * @param message     */    @RabbitListener(queues = "DirectQueue-01")    public void receiveMessage02(Message message, Channel channel) throws IOException {        String messageId = message.getMessageProperties().getMessageId();        String messageContent = new String(message.getBody(), StandardCharsets.UTF_8);        MessageIdempotent messageIdempotent = new MessageIdempotent();        QueryWrapper<MessageIdempotent> queryWrapper = new QueryWrapper<>();        queryWrapper.eq("message_id", messageId);        MessageIdempotent msg = messageIdempotent.selectOne(queryWrapper);        if (ObjectUtil.isNull(msg)) {            messageIdempotent.setMessageId(messageId);            messageIdempotent.setMessageContent(messageContent);            messageIdempotent.insert();            log.info("DirectQueue-01-消费者收到消息,消息ID:" + messageId + " 消息内容:" + messageContent);            // 消息确认            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } else {            log.info("消息 " + messageId + " 已经消费过!");        }    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

消费消息

控制台

RabbitMQ 队列


数据库

基于 Redis 实现消息幂等性

发送消息

控制台

RabbitMQ 队列

创建消费者

package com.mq.rabbitmq.controller;import cn.hutool.core.util.ObjectUtil;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;import com.mq.rabbitmq.vo.MessageIdempotent;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Component;import java.io.IOException;import java.nio.charset.StandardCharsets;/** * @ClassName DirectReceiver * @Description TODO * @Author 听秋 * @Date 2022/6/21 10:40 * @Version 1.0 **/@Component@Slf4jpublic class Consumer {    @Autowired    private StringRedisTemplate stringRedisTemplate;	/**     * 基于 redis 实现消息幂等性     * @param message     */    //@RabbitHandler    //@RabbitListener(queues = "DirectQueue-01")    public void receiveMessage01(Message message, Channel channel) throws IOException {        String messageId = message.getMessageProperties().getMessageId();        String messageContent = new String(message.getBody(), StandardCharsets.UTF_8);        // 消息不存在则创建,返回 true        Boolean exist = stringRedisTemplate.opsForValue().setIfAbsent(messageId, messageContent);        if (!exist) {            log.info("消息 " + messageId + " 已经消费过");        } else {        	// 消息确认            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);            log.info("DirectQueue-01-消费者收到消息,消息ID:" + messageId + " 消息内容:" + messageContent);        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

消费消息

控制台

RabbitMQ 队列

Redis

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发