软件系统定制开发SpringBoot集成RabbitMq

一、初识MQ

软件系统定制开发是一个开源的消息代理软件系统定制开发和队列服务器,软件系统定制开发用来通过普通协议在完软件系统定制开发全不同的应用之间共享数据,RabbitMQ是使用Erlang软件系统定制开发语言来编写的,软件系统定制开发并且是基于AMQP协议的。

AMQP:Advanced Message Queuing Protocol,高级协议。软件系统定制开发是具有现代特征的二进制协议,软件系统定制开发是一个提供统一消息服软件系统定制开发务的应用层标准高级消软件系统定制开发息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

1、同步调用的问题

2、方案

优势一:服务解耦

优势二:性能提升,提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量消峰

异步通讯的缺点 :

  • 依赖于Broker的可靠性,安全性,吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理

3、各种MQ的对比

二、RabbitMQ入门

1、基本概念


核心概念

1.1、Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key (路由键)、priority (相对于其他消息的优先权)、delivery-mode (指出该消息可能需要持久性存储)等。

1.2、Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

1.3、Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的。
Exchange有4种类型: direct(默认), fanout, topic,和headers,不同类型的Exchange转发消息的策略有所区别

1.4、Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

1.5、Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基 于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange和Queue的绑定可以是多对多的关系。

1.6、Connection

网络连接,比如一个TCP连接。

1.7、Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

1.8、Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

1.9、Virtual Host

虚拟主机,表示-一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是AMQP概念的基础,必须在连接时指定,
  RabbitMQ默认的vhost是/。

1.10、Broker

表示消息队列服务器实体

2、安装

2、常见消息模型

2.1、基本消息类型


三、SpringAMQP

1、什么是SpringAMQP


2、简单示例

2.1、引入AMQP依赖

<!--AMQP依赖,包含RabbitMQ--><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.2、配置文件

spring:  rabbitmq:    addresses: 192.168.100.120:5672,192.168.100.121:5672,192.168.100.122:5672    username: admin    password: admin    #开启消息确认模式,新版本已经弃用    #publisher-confirms: true    #开启消息送达提示    publisher-returns: true    # springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果    publisher-confirm-type: correlated    virtual-host: /    listener:      type: simple      simple:        acknowledge-mode: auto #确认模式        prefetch: 1 #限制每次发送一条数据。        concurrency: 3 #同一个队列启动几个消费者        max-concurrency: 3 #启动消费者最大数量        #重试策略相关配置        retry:          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试          enabled: true          # 最大重试次数          max-attempts: 5          # 重试间隔时间(毫秒)          initial-interval: 3000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

spring.rabbitmq.publisher-confirm-type新版发布确认属性有三种确认类型

/** * The type of publisher confirms to use. */public enum ConfirmType {	/**	 * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}	 * within scoped operations.	 */	SIMPLE,	/**	 * Use with {@code CorrelationData} to correlate confirmations with sent	 * messsages.	 */	CORRELATED,	/**	 * Publisher confirms are disabled (default).	 */	NONE}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

NONE值是禁用发布确认模式,是默认值
CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
————————————————
版权声明:本文为CSDN博主「OkidoGreen」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/z69183787/article/details/109371628

2.3、发送消息

在publisher服务中新建一个测试类,编写测试方法

2.4、消费消息

3、WorkQueue工作队列

3.1、模拟WorkQueue实现一个队列绑定多个消费者

3.2、消息发送

3.3、消息接受

3.4、消费预期机制prefetch

配置一次只能取一条,处理完才能取下一条

spring:  rabbitmq:    host: 192.168.100.120    port: 5672    username: admin    password: admin    #开启消息确认模式,新版本已经弃用    #publisher-confirms: true    #开启消息送达提示    publisher-returns: true    # springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果    publisher-confirm-type: correlated    virtual-host: /    listener:      simple:        prefetch: 1 #限制每次发送一条数据。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

4、发布和订阅模型

4.1、概念

4.2、Fanout广播类型

4.2.1、示例

4.2.2、声明绑定



4.2.3、消费者绑定

4.2.4、生产者发送消息

4.3、DriectExchage路由类型

通过routeKey可以实现Fanout广播类型

4.3.1、示例

4.3.2、申明Exchange、Queue

@Configurationpublic class RabbitDirectConfig {    @Bean    public Queue directQueue(){        // 参数介绍        // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数        return new Queue("directQueue-One",true,false,false,null);    }    @Bean    public Queue directQueue2(){        // 参数介绍        // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数        return new Queue("directQueue-Two",true,false,false,null);    }    @Bean    public DirectExchange directExchange(){        // 参数介绍        // 1.交换器名 2.是否持久化 3.自动删除 4.其他参数        return new DirectExchange("MqSendService-One",true,false,null);    }    @Bean    public Binding bingExchange(){        // 绑定队列        return BindingBuilder.bind(directQueue2())                 // 队列绑定到哪个交换器                .to(directExchange())                // 绑定路由key,必须指定                .with("One");    }    @Bean    public Binding bingExchange2(){		// 绑定队列        return BindingBuilder.bind(directQueue2())                 // 队列绑定到哪个交换器                .to(directExchange())                // 绑定路由key,必须指定                .with("Two");    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

4.3.3、在consumer消费者服务监听

4.3.4、总结

4.4、TopicExchange话题广播类型

4.4.1、示例

4.4.2、申明

@Configurationpublic class RabbitTopicConfig {    @Bean    public Queue queue(){        // 参数介绍        // 1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数        return new Queue("simple.queue",true,false,false,null);    }    @Bean    public TopicExchange topicExchange(){        // 参数介绍        // 1.交换器名 2.是否持久化 3.自动删除 4.其他参数        return new TopicExchange("amq.topic",true,false,null);    }    @Bean    public Binding bingExchange(){        // 绑定队列        return BindingBuilder.bind(queue())                // 队列绑定到哪个交换器                .to(topicExchange())                // 绑定路由key,必须指定                .with("simple.#");    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

4.4.3、生产者发送

5、消息转换器

spring的消息对象处理是有org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SpringMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果需要修改只需要定义一个MessageConverter类型的bean即可,推荐用JSON方式序列化,步骤如下

5.1、引入jackson的依赖

<dependency>    <groupId>com.fasterxml.jackson.dataformat</groupId>    <artifactId>jackson-dataformat-xml</artifactId>    <version>2.9.10</version></dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

5.2、申明Bean

@Beanpublic MessageConverter jsonMessageConverter(){	return new Jackson2JsonMessageConverter();}
  • 1
  • 2
  • 3
  • 4

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发