app开发定制公司SpringBoot连接多个RabbitMQ

目 录

1. 前 言

在 SpringBoot app开发定制公司中整合单个 使用,app开发定制公司是很简单的,app开发定制公司只需要引入依赖,app开发定制公司然后在配置里面配置好 MQ app开发定制公司的连接地址、账号、app开发定制公司密码等信息,然后使用即可。但如果 MQ 的连接地址是多个,那这种连接方式就不奏效了。

前段时间,我开发的一个项目就遇到了这样的问题。那个项目,好几个关联方,每个关联方用的 MQ 的地址都不相同,也就意味着我这边要连接几个 RabbbitMQ 地址。SpringBoot 连接多个 RabbitMQ,怎么搞?

使用默认的连接方式是行不通的,我已经试过,而要实现 SpringBoot 连接多个 RabbitMQ,只能自定义一些东西,分别配置才可以,下面一起来走一下试试。

2. 重 写

首先要明确的是,下面的两个类是需要重写的:

  • RabbitTemplate:往队列里面丢消息时,需要用到
  • RabbitAdmin:声明队列、声明交换机、绑定队列和交换机用到

这里,我定义两个关联方,一个是 one,一个是 two,分别重写与它们的连接工厂。

2.1 重写与关联方one的连接工厂

package com.yuhuofei.mq.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;/** * @author yuhuofei * @version 1.0 * @description 重写与关联方one的连接工厂 * @date 2022/10/3 16:57 */@Slf4j@Configurationpublic class OneMQConfig {    @Value("${one.spring.rabbitmq.host}")    private String host;    @Value("${one.spring.rabbitmq.port}")    private int port;    @Value("${one.spring.rabbitmq.username}")    private String username;    @Value("${one.spring.rabbitmq.password}")    private String password;    @Value("${one.spring.rabbitmq.virtual-host}")    private String virtualHost;    /**     * 定义与one的连接工厂     */    @Bean(name = "oneConnectionFactory")    @Primary    public ConnectionFactory oneConnectionFactory() {        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();        connectionFactory.setHost(host);        connectionFactory.setPort(port);        connectionFactory.setUsername(username);        connectionFactory.setPassword(password);        connectionFactory.setVirtualHost(virtualHost);        connectionFactory.setPublisherConfirms(true);        connectionFactory.setPublisherReturns(true);        return connectionFactory;    }    @Bean(name = "oneRabbitTemplate")    @Primary    public RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {        RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory);        oneRabbitTemplate.setMandatory(true);        oneRabbitTemplate.setConnectionFactory(connectionFactory);        oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {            /**             * 确认消息送到交换机(Exchange)回调             * @param correlationData             * @param ack             * @param cause             */            @Override            public void confirm(CorrelationData correlationData, boolean ack, String cause) {                log.info("确认消息送到交换机(Exchange)结果:");                log.info("相关数据:{}", correlationData);                boolean ret = false;                if (ack) {                    log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());                    //下面可自定义业务逻辑处理,如入库保存信息等                                    } else {                    log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);                    //下面可自定义业务逻辑处理,如入库保存信息等                                    }            }        });        oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {            /**             * 只要消息没有投递给指定的队列 就触发这个失败回调             * @param message  投递失败的消息详细信息             * @param replyCode 回复的状态码             * @param replyText 回复的文本内容             * @param exchange 当时这个消息发给那个交换机             * @param routingKey 当时这个消息用那个路由键             */            @Override            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {                //获取消息id                String messageId = message.getMessageProperties().getMessageId();                // 内容                String result = null;                try {                    result = new String(message.getBody(), "UTF-8");                } catch (Exception e) {                    log.error("消息发送失败{}", e);                }                log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);                //下面可自定义业务逻辑处理,如入库保存信息等            }        });        return oneRabbitTemplate;    }    @Bean(name = "oneFactory")    @Primary    public SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory,                                                          SimpleRabbitListenerContainerFactoryConfigurer configurer) {        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);        configurer.configure(factory, connectionFactory);        return factory;    }    @Bean(name = "oneRabbitAdmin")    @Primary    public RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);        rabbitAdmin.setAutoStartup(true);        return rabbitAdmin;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141

2.2 重写与关联方two的连接工厂

package com.yuhuofei.mq.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author yuhuofei * @version 1.0 * @description 重写与关联方two的连接工厂 * @date 2022/10/3 17:52 */@Slf4j@Configurationpublic class TwoMQConfig {    @Value("${two.spring.rabbitmq.host}")    private String host;    @Value("${two.spring.rabbitmq.port}")    private int port;    @Value("${two.spring.rabbitmq.username}")    private String username;    @Value("${two.spring.rabbitmq.password}")    private String password;    @Value("${two.spring.rabbitmq.virtualHost}")    private String virtualHost;    /**     * 定义与two的连接工厂     */    @Bean(name = "twoConnectionFactory")    public ConnectionFactory twoConnectionFactory() {        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();        connectionFactory.setHost(host);        connectionFactory.setPort(port);        connectionFactory.setUsername(username);        connectionFactory.setPassword(password);        connectionFactory.setVirtualHost(virtualHost);        connectionFactory.setPublisherConfirms(true);        connectionFactory.setPublisherReturns(true);        return connectionFactory;    }    @Bean(name = "twoRabbitTemplate")    public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {        RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory);        twoRabbitTemplate.setMandatory(true);        twoRabbitTemplate.setConnectionFactory(connectionFactory);        twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {            /**             * 确认消息送到交换机(Exchange)回调             * @param correlationData             * @param ack             * @param cause             */            @Override            public void confirm(CorrelationData correlationData, boolean ack, String cause) {                log.info("确认消息送到交换机(Exchange)结果:");                log.info("相关数据:{}", correlationData);                boolean ret = false;                if (ack) {                    log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());                    //下面可自定义业务逻辑处理,如入库保存信息等                } else {                    log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);                    //下面可自定义业务逻辑处理,如入库保存信息等                }            }        });        twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {            /**             * 只要消息没有投递给指定的队列 就触发这个失败回调             * @param message  投递失败的消息详细信息             * @param replyCode 回复的状态码             * @param replyText 回复的文本内容             * @param exchange 当时这个消息发给那个交换机             * @param routingKey 当时这个消息用那个路由键             */            @Override            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {                //获取消息id                String messageId = message.getMessageProperties().getMessageId();                // 内容                String result = null;                try {                    result = new String(message.getBody(), "UTF-8");                } catch (Exception e) {                    log.error("消息发送失败{}", e);                }                log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);                //下面可自定义业务逻辑处理,如入库保存信息等            }        });        return twoRabbitTemplate;    }    @Bean(name = "twoFactory")    public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory,                                                           SimpleRabbitListenerContainerFactoryConfigurer configurer) {        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);        configurer.configure(factory, connectionFactory);        return factory;    }    @Bean(name = "twoRabbitAdmin")    public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);        rabbitAdmin.setAutoStartup(true);        return rabbitAdmin;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133

2.3 创建队列及交换机并绑定

package com.yuhuofei.mq.config;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;import javax.annotation.Resource;/** * @author yuhuofei * @version 1.0 * @description 创建队列、交换机并绑定 * @date 2022/10/3 18:15 */public class QueueConfig {    @Resource(name = "oneRabbitAdmin")    private RabbitAdmin oneRabbitAdmin;    @Resource(name = "twoRabbitAdmin")    private RabbitAdmin twoRabbitAdmin;    @Value("${one.out.queue}")    private String oneOutQueue;    @Value("${one.out.queue}")    private String oneRoutingKey;    @Value("${two.output.queue}")    private String twoOutQueue;    @Value("${two.output.queue}")    private String twoRoutingKey;    @Value("${one.topic.exchange.name}")    private String oneTopicExchange;    @Value("${two.topic.exchange.name}")    private String twoTopicExchange;    @PostConstruct    public void oneRabbitInit() {        //声明交换机        oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false));        //声明队列        oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false));        //绑定队列及交换机        oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false))                .to(new TopicExchange(oneTopicExchange, true, false))                .with(oneRoutingKey));    }    @PostConstruct    public void twoRabbitInit() {        //声明交换机        twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false));        //声明队列        twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true));        //绑定队列及交换机        twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false))                .to(new TopicExchange(twoTopicExchange, true, false))                .with(twoRoutingKey));    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

2.4 配置信息

这里的配置信息,需要与各自的关联方约定好再配置

# 与关联方one的MQ配置one.spring.rabbitmq.host=one.mq.comone.spring.rabbitmq.port=5672one.spring.rabbitmq.username=xxxxxone.spring.rabbitmq.password=xxxxxone.spring.rabbitmq.virtual-host=/xxxxxone.out.queue=xxxaa.ssssd.cffs.xxxxone.topic.exchange.name=oneTopExchange# 与关联方two的MQ配置two.spring.rabbitmq.host=two.mq.comtwo.spring.rabbitmq.port=5672two.spring.rabbitmq.username=aaaaaaatwo.spring.rabbitmq.password=aaaaaaatwo.spring.rabbitmq.virtualHost=/aaaaaaatwo.out.queue=ddddd.sssss.hhhhh.eeeetwo.topic.exchange.name=twoTopExchange
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.5 注意点

在连接多个 MQ 的情况下,需要在某个连接加上 @Primary 注解(见 2.1 中的代码),表示主连接,默认使用这个连接,如果不加,服务会起不来

3. 使 用

3.1 作为消费者

由于在前面的 2.3 中,声明了队列及交换机,并进行了绑定,那么作为消费者,监听相应的队列,获取关联方发送的消息进行处理即可。这里用监听关联方 one 的出队列做展示,two 的类似。

需要注意的地方是,在监听队列时,需要指定 ContainerFactory。

package com.yuhuofei.mq.service;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;import java.nio.charset.StandardCharsets;/** * @author yuhuofei * @version 1.0 * @description 监听关联方one的消息 * @date 2022/10/3 18:38 */@Slf4j@Servicepublic class OneReceive {    @RabbitListener(queues = "${one.out.queue}", containerFactory = "oneFactory")    public void listenOne(Message message, Channel channel) {        //获取MQ返回的数据        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        String data = new String(message.getBody(), StandardCharsets.UTF_8);        log.info("MQ返回的数据:{}", data);        //下面进行业务逻辑处理            }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

3.1 作为生产者

使用之前重写的 RabbitTemplate ,向各个关联方指定的队列发送消息。

package com.yuhuofei.mq.service;import com.google.gson.JsonObject;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.stereotype.Service;import javax.annotation.Resource;/** * @author yuhuofei * @version 1.0 * @description 向关联方的队列发送消息 * @date 2022/10/3 18:47 */@Slf4j@Servicepublic class SendMessage {    @Resource(name = "oneRabbitTemplate")    private RabbitTemplate oneRabbitTemplate;    @Resource(name = "twoRabbitTemplate")    private RabbitTemplate twoRabbitTemplate;    public void sendToOneMessage(String messageId, OneMessageConverter message) {        String exchange = message.getExchange();        String routingKey = message.getRoutingKey();        JsonObject data = message.getData();        MessageProperties messageProperties = new MessageProperties();        messageProperties.setContentType("application/json");        Message info = new Message(data.toString().getBytes(), messageProperties);        info.getMessageProperties().setMessageId(messageId);        oneRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));    }    public void sendToTwoMessage(String messageId, TwoMessageConverter message) {        String exchange = message.getExchange();        String routingKey = message.getRoutingKey();        JsonObject data = message.getData();        MessageProperties messageProperties = new MessageProperties();        messageProperties.setContentType("application/json");        Message info = new Message(data.toString().getBytes(), messageProperties);        info.getMessageProperties().setMessageId(messageId);        twoRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发