目录
什么是
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 定制软件开发高级消息队列协议 )定制软件开发的开源实现,定制软件开发能够实现定制软件开发异步消息处理
RabbitMQ定制软件开发是一个消息代理:定制软件开发它接受和转发消息。
定制软件开发你可以把它想象成一个邮局:定制软件开发当你把你想要发布的邮定制软件开发件放在邮箱中时,定制软件开发你可以确定邮差先生最定制软件开发终将邮件发送给你的收件人。定制软件开发在这个比喻中,RabbitMQ定制软件开发是邮政信箱,定制软件开发邮局和邮递员。
RabbitMQ定制软件开发和邮局的主要区别在于定制软件开发它不处理纸张,而是接受,定制软件开发存储和转发二进制数据块
优点:异步消息处理
业务解耦(定制软件开发下订单操作:扣减库存、生成订单、发红包、发短信),定制软件开发将下单操作主流程:扣减库存、定制软件开发生成订单然后通过MQ定制软件开发消息队列完成通知,发红包、发短信
错峰流控 (通知量 消息量 定制软件开发订单量大的情况实现MQ定制软件开发消息队列机制,定制软件开发淡季情况下访问量会少)
定制软件开发灵活的路由(Flexible Routing)
定制软件开发在消息进入队列之前,通过 Exchange 定制软件开发来路由消息的。定制软件开发对于典型的路由功能,RabbitMQ 定制软件开发已经提供了一些内置的 Exchange 来实现。定制软件开发针对更复杂的路由功能,定制软件开发可以将多个 Exchange 定制软件开发绑定在一起,定制软件开发也通过插件机制实现自己的 Exchange 。
RabbitMQ定制软件开发网站端口号:15672
定制软件开发程序里面实现的端口为:5672
使用docker安装RabbitMQ,定制软件开发如果没有使用过的可以定制软件开发看这篇文章
1.拉取RabbitMQ镜像
docker pull rabbitmq:management
2.运行RabbitMQ镜像
docker run -itd --name rabbit01 --hostname myrabbit -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -p 15672:15672 -p 5672:5672 -p 25672:25672 rabbitmq:management
注意:RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian
定制软件开发这里设置的是(RABBITMQ_DEFAULT_USER)定制软件开发登录的账号和( RABBITMQ_DEFAULT_PASS)密码,定制软件开发根据自身来修改
定制软件开发这里看到容器已经开启成功了,定制软件开发然后就可以使用了
3.定制软件开发通过浏览器打开
定制软件开发如果你使用的是本地虚拟机,定制软件开发那么你直接使用虚拟机显示的ipv4定制软件开发地址加端口号就可以访问了;
定制软件开发如果你使用的是云服务器,定制软件开发那么你需要在对应服务器(阿里云,腾讯云等)定制软件开发的安全组中开放15672
端口,定制软件开发并且在防火墙中也开放15672端口
定制软件开发显示如上图那么就可以定制软件开发开始使用了
定制软件开发然后通过命令进入rabbitmq容器
docker exec -it rabbit01 /bin/bash
定制软件开发授权账号和密码
rabbitmqctl add_user admin admin
定制软件开发设置用户分配操作权限
rabbitmqctl set_user_tags admin administrator
用户级别:
-
administrator:定制软件开发可以登录控制台、查看所有信息、可以对 rabbitmq进行管理
-
monitoring:监控者 登录控制台,查看所有信息
-
policymaker:策略制定者 登录控制台,指定策略
-
managment 普通管理员 登录控制台
为用户添加资源权限
rabbitmqctl set_permissions -p / admin ".*"".*"".*"
也可以在界面操作进行添加用户
RabbitMQ支持的消息模型
1.简单模式 Simple
2.工作模式 Work
3.发布订阅模式
4.路由模式
5.主题 Topic模式
6.参数模式
7.出版商确认模式
1.入门案例
1. RabbitMQ入门案例 - Simple 简单模式
jdk1.8
构建一个 maven工程
定义生产者
定义消费者
观察消息的在 rabbitmq-server服务中的进程
01 构建一个maven工程
02 导入依赖
- <dependencies>
- <!--导入rabbitmq的依赖-->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.13.0</version>
- </dependency>
-
- </dependencies>
3.代码编写
在上图的模型中,有以下概念:
生产者,也就是要发送消息的程序
消费者:消息的接受者,会一直等待消息到来。
消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者
- package com.chen.rabbitmq.simple;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
-
- /**
- * @description: 简单模式Simple
- */
- public class Producer {
-
-
- public static void main(String[] args) {
-
- // 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
- // ip port
-
- // 1: 创建连接工程
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("128.197.157.151");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("chenjinxian");//rabbitmq登录的账号
- connectionFactory.setPassword("chenjinxian");//rabbitmq登录的密码
- connectionFactory.setVirtualHost("/");
-
- //springboot ---rabbitmq
-
- Connection connection = null;
- Channel channel = null;
- try {
- // 2: 创建连接Connection Rabbitmq为什么是基于channel去处理而不是链接? 长连接----信道channel
- connection = connectionFactory.newConnection("生成者");
- // 3: 通过连接获取通道Channel
- channel = connection.createChannel();
- // 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
- String queueName = "queue1";
-
- /*
- * @params1 队列的名称
- * @params2 是否要持久化durable=false 所谓持久化消息是否存盘,如果false 非持久化 true是持久化? 非持久化会存盘吗? 会存盘,但是会随从重启服务会丢失。
- * @params3 排他性,是否是独占独立
- * @params4 是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除
- * @params5 携带附属参数
- */
- channel.queueDeclare(queueName, true, false, false, null);
- // 5: 准备消息内容
- String message = "Hello chenjinxian!!!";
- // 6: 发送消息给队列queue
- // @params1: 交换机 @params2 队列、路由key @params 消息的状态控制 @params4 消息主题
- // 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
- channel.basicPublish("", queueName, null, message.getBytes());
-
- System.out.println("消息发送成功!!!");
- } catch (Exception ex) {
- ex.printStackTrace();
- } finally {
- // 7: 关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- // 8: 关闭连接
-
- if (connection != null && connection.isOpen()) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
-
-
- }
- }
消费者
- package com.chen.rabbitmq.simple;
-
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Consumer {
-
-
- public static void main(String[] args) {
-
- // 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
- // ip port
-
- // 1: 创建连接工程
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("128.197.157.151");//服务器IP
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- connectionFactory.setVirtualHost("/");
-
- Connection connection = null;
- Channel channel = null;
- try {
- // 2: 创建连接Connection
- connection = connectionFactory.newConnection("消费者");
- // 3: 通过连接获取通道Channel
- channel = connection.createChannel();
- // 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
-
-
- // true = ack 正常的逻辑是没问题 死循环 rabbit 重发策略
- // false = nack 消息这在消费消息的时候可能会异常和故障
- final Channel channel2 = channel;
- channel2.basicConsume("queue1", false, new DeliverCallback() {
- public void handle(String consumerTag, Delivery message) throws IOException {
- try {
- System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));
- channel2.basicAck(message.getEnvelope().getDeliveryTag(),false);
- }catch (Exception ex){
- ex.printStackTrace();
- // 三次确认 -- reject + sixin
- }
-
- }
- }, new CancelCallback() {
- public void handle(String consumerTag) throws IOException {
- System.out.println("接受失败了...");
- }
- });
-
- System.out.println("开始接受消息");
- System.in.read();
-
- } catch (Exception ex) {
- ex.printStackTrace();
- } finally {
- // 7: 关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- // 8: 关闭连接
-
- if (connection != null && connection.isOpen()) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
-
-
- }
- }
2. 什么是AMQP
01 什么是AMQP
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计
02 AMQP生产者流转过程
03 AMQP消费者流转过程
3. RabbitMQ的核心组成部分
01 RabbitMQ的核心组成部分
核心概念: 核心概念:
Server :又称Broker ,接受客户端的连接,实现AMQP实体服务。安装rabbitmq-serverConnection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手 服务器:又称Broker,接受客户端的连接,实现AMQP实体服务。安装Rabbitmq-serverConnection:连接,应用程序与Broker的网络连接tcp/ip/三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进 息读写的通道,客户端可以建立对恪Channel,每个Channel代表一个会话任务。 频道:网络信道,几乎所有的操作都在频道中进行频道,是进息读写的通道,客户端可以建立对恪频道频道,每个频道代表一个会话任务频道。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,如消息的优先级,延迟等高级特性,Body则就是消息体的内容。 消息:消息:服务与应用程序之间传送的数据,由Properties和Body组成,Properties可是对消息进行修饰,如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange 虚拟主机虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange :交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)Bindings : Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key. 交换:交换机,接受消息,根据路由键发送消息到绑定的队列.(=不具备消息存储的能力==)绑定:Exchange和Queue之间的虚拟连接,Binding中可以保护多个路由密钥。
Routing key :是一个路由规则,虚拟机可以用它来确定如何路由一个特疋消恳.bttos:/bloq.csdn.net/qg _4485823(Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费苦。"gwa" 路由密钥:是一个路由规则,虚拟机可以用它来确定如何路由一个特征消息(队列:队列:也成为消息队列,消息队列,保存消息并将它们转发给消费者.
02 RabbitMQ整体架构是什么样子的?
03 RabbitMQ的运行流程
4. RabbitMQ入门案例 - fanout 模式
01 RabbitMQ的模式之发布订阅模式
发布订阅模式的具体实现
-
类型:fanout
-
特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
(注意这里已经在可视化界面让队列绑定了交换机)
生产者
- package com.chen.rabbitmq.fanout;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- 发布订阅模式的具体实现
- 类型:fanout
- 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
- */
- public class Producer {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("128.156.157.161");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("生产者");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
-
- // 5: 准备发送消息的内容
- String message = "hello xuexi!!!";
-
- // 6:准备交换机
- String exchangeName = "fanout_change";
-
- // 8: 指定交换机的类型
- String type = "fanout";
- // 7: 发送消息给中间件rabbitmq-server
- // @params1: 交换机exchange
- // @params2: 队列名称/routingkey
- // @params3: 属性配置
- // @params4: 发送消息的内容
- // #.course.* queue3
- // *.order.# queue2 ta
- // com.order.course.xxx collecion
- channel.basicPublish(exchangeName,"", null, message.getBytes());
-
-
- System.out.println("消息发送成功!");
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
-
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
消费者
- package com.chen.rabbitmq.fanout;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- /**
- 发布订阅模式的具体实现
- 类型:fanout
- 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
- */
- public class Consumer {
-
- private static Runnable runnable = new Runnable() {
- public void run() {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("128.156.157.151");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- //获取队列的名称
- final String queueName = Thread.currentThread().getName();
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("生产者");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
- // 5: 申明队列queue存储消息
- /*
- * 如果队列不存在,则会创建
- * Rabbitmq不允许创建两个相同的队列名称,否则会报错。
- *
- * @params1: queue 队列的名称
- * @params2: durable 队列是否持久化
- * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
- * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
- * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
- * */
- // 这里如果queue已经被创建过一次了,可以不需要定义
- //channel.queueDeclare("queue1", false, false, false, null);
- // 6: 定义接受消息的回调
- Channel finalChannel = channel;
- finalChannel.basicConsume(queueName, true, new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- }
- });
- System.out.println(queueName + ":开始接受消息");
- System.in.read();
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null && connection.isOpen()) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- };
-
-
-
- public static void main(String[] args) {
- // 启动三个线程去执行
- new Thread(runnable, "queue1").start();
- new Thread(runnable, "queue2").start();
- new Thread(runnable, "queue3").start();
- new Thread(runnable, "queue4").start();
- //new Thread(runnable, "queue5").start();
- }
- }
5. RabbitMQ入门案例 - Direct 模式
(注意这里已经在可视化界面让队列绑定了交换机)
生产者
- package com.chen.rabbitmq.routing;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- Direct 模式
- */
- public class Producer {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("128.176.157.151");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("生产者");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
-
- // 5: 准备发送消息的内容
- String message = "hello direct_exchange!!!";
-
- // 6:准备交换机
- String exchangeName = "direct_exchange";
- // 7: 定义路由key
- String routeKey = "email";
- // 8: 指定交换机的类型
- String type = "direct";
- // 7: 发送消息给中间件rabbitmq-server
- // @params1: 交换机exchange
- // @params2: 队列名称/routingkey
- // @params3: 属性配置
- // @params4: 发送消息的内容
- // #.course.* queue3
- // *.order.# queue2 ta
- // com.order.course.xxx collecion
- channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
- System.out.println("消息发送成功!");
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
消费者
- package com.chen.rabbitmq.routing;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- /**
- Direct 模式
- */
- public class Consumer {
-
- private static Runnable runnable = new Runnable() {
- public void run() {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("123.156.147.151");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- //获取队列的名称
- final String queueName = Thread.currentThread().getName();
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("生产者");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
- // 5: 申明队列queue存储消息
- /*
- * 如果队列不存在,则会创建
- * Rabbitmq不允许创建两个相同的队列名称,否则会报错。
- *
- * @params1: queue 队列的名称
- * @params2: durable 队列是否持久化
- * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
- * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
- * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
- * */
- // 这里如果queue已经被创建过一次了,可以不需要定义
- //channel.queueDeclare("queue1", false, false, false, null);
- // 6: 定义接受消息的回调
- Channel finalChannel = channel;
- finalChannel.basicConsume(queueName, true, new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- }
- });
- System.out.println(queueName + ":开始接受消息");
- System.in.read();
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null && connection.isOpen()) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- };
-
-
-
- public static void main(String[] args) {
- // 启动三个线程去执行
- new Thread(runnable, "queue1").start();
- new Thread(runnable, "queue2").start();
- new Thread(runnable, "queue3").start();
- new Thread(runnable, "queue4").start();
- // new Thread(runnable, "queue5").start();
- }
- }
6. RabbitMQ入门案例 - Topic 模式
(注意这里已经在可视化界面让队列绑定了交换机)
生产者
- package com.chen.rabbitmq.topics;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- Topic模式
- */
- public class Producer {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("125.156.157.151");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("生产者");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
-
- // 5: 准备发送消息的内容
- String message = "hello topic_exchange!!!";
-
- // 6:准备交换机
- String exchangeName = "topic_exchange";
- // 7: 定义路由key
- String routeKey = "com.order.user";
- // 8: 指定交换机的类型
- String type = "topic";
- // 7: 发送消息给中间件rabbitmq-server
- // @params1: 交换机exchange
- // @params2: 队列名称/routingkey
- // @params3: 属性配置
- // @params4: 发送消息的内容
- // #.course.* queue3
- // *.order.# queue2 ta
- // com.order.course.xxx collecion
- channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
- System.out.println("消息发送成功!");
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
消费者不变
完整案例(创建交换机,创建队列,交换机与队列绑定)
- package com.chen.rabbitmq.all;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- 完整案例
- */
- public class Producer {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("151.156.157.151");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("生产者");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
- // 6: 准备发送消息的内容
- String message = " 你好,小白";
- // 交换机
- String exchangeName = "direct_message_exchange";
- // 交换机的类型 direct/topic/fanout/headers
- String exchangeType = "direct";
-
- // 如果你用界面把queueu 和 exchange的关系先绑定话,你代码就不需要在编写这些声明代码可以让代码变得更加简洁,但是不容读懂
- // 如果用代码的方式去声明,我们要学习一下
- // 7: 声明交换机 所谓的持久化就是指,交换机会不会随着服务器重启造成丢失,如果是true代表不丢失,false重启就会丢失
- channel.exchangeDeclare(exchangeName,exchangeType,true);
-
- // 8: 声明队列
- channel.queueDeclare("queue5",true,false,false,null);
- channel.queueDeclare("queue6",true,false,false,null);
- channel.queueDeclare("queue7",true,false,false,null);
-
- // 9:绑定队列和交换机的关系
- channel.queueBind("queue5",exchangeName,"order");
- channel.queueBind("queue6",exchangeName,"order");
- channel.queueBind("queue7",exchangeName,"course");
-
- channel.basicPublish(exchangeName, "course", null, message.getBytes());
- System.out.println("消息发送成功!");
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
执行完后生成队列和交换机
7. RabbitMQ入门案例 - Work模式
01 Work模式轮询模式(Round-Robin)
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
-
轮询模式的分发:一个消费者一条,按均分配
-
公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配
01轮询模式
生产者
- package com.chen.rabbitmq.work.lunxun;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- /**
- 轮询模式
- */
- public class Producer {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("123.156.147.151");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("生产者");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
- // 6: 准备发送消息的内容
- //===============================end topic模式==================================
- for (int i = 1; i <= 20; i++) {
- //消息的内容
- String msg = "学相伴:" + i;
- // 7: 发送消息给中间件rabbitmq-server
- // @params1: 交换机exchange
- // @params2: 队列名称/routingkey
- // @params3: 属性配置
- // @params4: 发送消息的内容
- channel.basicPublish("", "queue1", null, msg.getBytes());
- }
- System.out.println("消息发送成功!");
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
消费者
- package com.chen.rabbitmq.work.lunxun;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- /**
- 轮询模式
- */
- public class Work1 {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("123.156.147.155");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("消费者-Work1");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
- // 5: 申明队列queue存储消息
- /*
- * 如果队列不存在,则会创建
- * Rabbitmq不允许创建两个相同的队列名称,否则会报错。
- *
- * @params1: queue 队列的名称
- * @params2: durable 队列是否持久化
- * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
- * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
- * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
- * */
- // 这里如果queue已经被创建过一次了,可以不需要定义
- // channel.queueDeclare("queue1", false, false, false, null);
- // 同一时刻,服务器只会推送一条消息给消费者
- // 6: 定义接受消息的回调
- Channel finalChannel = channel;
- //finalChannel.basicQos(1);
-
- finalChannel.basicConsume("queue1", true, new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- try{
- System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
- Thread.sleep(200);
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- }
- });
- System.out.println("Work1-开始接受消息");
- System.in.read();
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null && connection.isOpen()) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
- package com.chen.rabbitmq.work.lunxun;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- /**
- 轮询模式
- */
- public class Work2 {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("123.195.157.151");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("消费者-Work2");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
- // 5: 申明队列queue存储消息
- /*
- * 如果队列不存在,则会创建
- * Rabbitmq不允许创建两个相同的队列名称,否则会报错。
- *
- * @params1: queue 队列的名称
- * @params2: durable 队列是否持久化
- * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
- * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
- * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
- * */
- // 这里如果queue已经被创建过一次了,可以不需要定义
- //channel.queueDeclare("queue1", false, true, false, null);
- // 同一时刻,服务器只会推送一条消息给消费者
- //channel.basicQos(1);
- // 6: 定义接受消息的回调
- Channel finalChannel = channel;
- //finalChannel.basicQos(1);
- finalChannel.basicConsume("queue1", true, new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- try{
- System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
- Thread.sleep(100);
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- }
- });
- System.out.println("Work2-开始接受消息");
- System.in.read();
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null && connection.isOpen()) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
02 Work模式公平分发模式
生产者
- package com.chen.rabbitmq.work.fair;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- 公平分发模式
- */
- public class Producer {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("125.156.157.151");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("生产者");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
- // 6: 准备发送消息的内容
- //===============================end topic模式==================================
- for (int i = 1; i <= 20; i++) {
- //消息的内容
- String msg = "学相伴:" + i;
- // 7: 发送消息给中间件rabbitmq-server
- // @params1: 交换机exchange
- // @params2: 队列名称/routingkey
- // @params3: 属性配置
- // @params4: 发送消息的内容
- channel.basicPublish("", "queue1", null, msg.getBytes());
- }
- System.out.println("消息发送成功!");
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
消费者
- package com.chen.rabbitmq.work.fair;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- /**
- 公平分发模式
- */
- public class Work1 {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("123.156.146.151");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("消费者-Work1");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
- // 5: 申明队列queue存储消息
- /*
- * 如果队列不存在,则会创建
- * Rabbitmq不允许创建两个相同的队列名称,否则会报错。
- *
- * @params1: queue 队列的名称
- * @params2: durable 队列是否持久化
- * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
- * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
- * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
- * */
- // 这里如果queue已经被创建过一次了,可以不需要定义
- // channel.queueDeclare("queue1", false, false, false, null);
- // 同一时刻,服务器只会推送一条消息给消费者
- // 6: 定义接受消息的回调
- final Channel finalChannel = channel;
-
- finalChannel.basicQos(1);
- finalChannel.basicConsume("queue1", false, new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- try{
- System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
- Thread.sleep(1000);
- // 改成手动应答
- finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- }
- });
- System.out.println("Work1-开始接受消息");
- System.in.read();
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null && connection.isOpen()) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
- package com.chen.rabbitmq.work.fair;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
-
- /**
- 公平分发模式
- */
- public class Work2 {
- public static void main(String[] args) {
- // 1: 创建连接工厂
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2: 设置连接属性
- connectionFactory.setHost("121.156.157.131");
- connectionFactory.setPort(5672);
- connectionFactory.setVirtualHost("/");
- connectionFactory.setUsername("chenjinxian");
- connectionFactory.setPassword("chenjinxian");
- Connection connection = null;
- Channel channel = null;
- try {
- // 3: 从连接工厂中获取连接
- connection = connectionFactory.newConnection("消费者-Work2");
- // 4: 从连接中获取通道channel
- channel = connection.createChannel();
- // 5: 申明队列queue存储消息
- /*
- * 如果队列不存在,则会创建
- * Rabbitmq不允许创建两个相同的队列名称,否则会报错。
- *
- * @params1: queue 队列的名称
- * @params2: durable 队列是否持久化
- * @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
- * @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
- * @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
- * */
- // 这里如果queue已经被创建过一次了,可以不需要定义
- //channel.queueDeclare("queue1", false, true, false, null);
- // 同一时刻,服务器只会推送一条消息给消费者
- //channel.basicQos(1);
- // 6: 定义接受消息的回调
- final Channel finalChannel = channel;
- finalChannel.basicQos(1);
- finalChannel.basicConsume("queue1", false, new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- try{
- System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
- Thread.sleep(200);
- // 一定使用我们的手动应答
- finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- }, new CancelCallback() {
- @Override
- public void handle(String s) throws IOException {
- }
- });
- System.out.println("Work2-开始接受消息");
- System.in.read();
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println("发送消息出现异常...");
- } finally {
- // 7: 释放连接关闭通道
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- if (connection != null && connection.isOpen()) {
- try {
- connection.close();
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
- }
- }
8. RabbitMQ使用场景
01 解耦、削峰、异步
同步异步的问题(串行)
串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
并行方式 异步线程池
并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
存在问题
-
耦合度高
-
需要自己写线程池自己维护成本太高
-
出现了消息可能会丢失,需要你自己做消息补偿
-
如何保证消息的可靠性你自己写
-
如果服务器承载不了,你需要自己去写高可用
异步消息队列的方式
好处:
-
完全解耦,用 MQ建立桥接
-
有独立的线程池和运行模型
-
出现了消息可能会丢失,MQ有持久化功能
-
如何保证消息的可靠性,死信队列和消息转移等
-
如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍
02 高内聚,低耦合
好处:
-
完全解耦,用 MQ建立桥接
-
有独立的线程池和运行模型
-
出现了消息可能会丢失,MQ有持久化功能
-
如何保证消息的可靠性,死信队列和消息转移等
-
如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍
四、Springboot案例
1. Fanout 模式
生产者
导入依赖
- <!--rabbitmq starter 依赖-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
application.yml
- # 服务端口
- server:
- port: 8080
- # 配置rabbitmq服务
- spring:
- rabbitmq:
- username: admin
- password: admin
- virtual-host: /
- host: 127.0.0.1
- port: 5672
目录结构
创建配置类RabbitMqConfiguration.java
- package com.chen.springbootorderrabbitmqproducer.config;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- */
- @Configuration
- public class RabbitMqConfiguration {
-
-
- // 1: 声明交换机
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange("fanout_order_ex", true, false);
- }
-
-
- // 2: 声明队列 duanxin.fanout.queue
- @Bean
- public Queue duanxinqueue() {
- return new Queue("duanxin.fanout.queue", true);
- }
-
-
- // 2: 声明队列 duanxin.fanout.queue
- @Bean
- public Queue smsqueue() {
- return new Queue("sms.fanout.queue", true);
- }
-
-
- // 2: 声明队列 duanxin.fanout.queue
- @Bean
- public Queue emailqueue() {
- return new Queue("email.fanout.queue", true);
- }
-
-
- // 3: 确定绑定关系
- @Bean
- public Binding bindduanxin(){
- return BindingBuilder.bind(duanxinqueue()).to(fanoutExchange());
- }
-
- // 3: 确定绑定关系
- @Bean
- public Binding bindsms(){
- return BindingBuilder.bind(smsqueue()).to(fanoutExchange());
- }
-
- // 3: 确定绑定关系
- @Bean
- public Binding bindemail(){
- return BindingBuilder.bind(emailqueue()).to(fanoutExchange());
- }
- }
编写实现类OrderService.java
- package com.chen.springbootorderrabbitmqproducer.service;
-
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- import java.util.UUID;
-
- /**
- */
- @Service
- public class OrderService {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- // 交换机
- private String exchangeName = "fanout_order_ex";
- // 路由key
- private String routingKey = "";
-
-
- /**
- * @Author xuke
- * @Description 模拟用户购买商品下单的业务
- * @Date 22:26 2021/3/5
- * @Param [userId, productId, num]
- * @return void
- **/
- public void makeOrder(String userId,String productId,int num){
- // 1: 根据商品id查询库存是否充足
- // 2: 保存订单
- String orderId = UUID.randomUUID().toString();
- System.out.println("保存订单成功:id是:" + orderId);
- // 3: 发送消息
- rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
- }
-
- }
编写测试类
- package com.chen.springbootorderrabbitmqproducer.rabbitmq.springbootorderrabbitmqproducer;
-
-
- import com.chen.springbootorderrabbitmqproducer.service.OrderService;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- @SpringBootTest
- class SpringbootOrderRabbitmqProducerApplicationTests {
-
-
- @Autowired
- private OrderService orderService;
-
-
- @Test
- public void contextLoads() {
- orderService.makeOrder("100","100",10);
- }
-
-
-
- @Test
- public void testDirect() {
- orderService.makeOrderDirect("100","100",10);
- }
-
- @Test
- public void testDirectTTl() {
- orderService.makeOrderDirectTtl("100","100",10);
- }
-
-
- // @Test
- // public void testTopic() {
- // orderService.makeOrderTopic("100","100",10);
- // }
-
- }
消费者
application.yml
- # 服务端口
- server:
- port: 8080
- # 配置rabbitmq服务
- spring:
- rabbitmq:
- username: admin
- password: admin
- virtual-host: /
- host: 127.0.0.1
- port: 5672
接受消息
- package com.chen.direct;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- /**
-
- */
- @Service
- @RabbitListener(queues ={"duanxin.direct.queue"})
- public class DirectDuanxinConsumber {
-
-
- // 告诉你的接收服务器的消息,没有返回值
- @RabbitHandler
- public void reviceMessage(String message){
- System.out.println("duanxin--direct--->接收到订单消息,订单id是: " + message);
- }
- }
- package com.chen.direct;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- /**
- */
- @Service
- @RabbitListener(queues ={"email.direct.queue"})
- public class DirectEmailConsumber {
-
-
- @RabbitHandler
- public void reviceMessage(String message){
- System.out.println("email---direct-->接收到订单消息,订单id是: " + message);
- }
- }
- package com.chen.direct;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- /**
- */
- @Service
- @RabbitListener(queues ={"sms.direct.queue"})
- public class DirectSmsConsumber {
-
-
- @RabbitHandler
- public void reviceMessage(String message){
- System.out.println("sms--direct--->接收到订单消息,订单id是: " + message);
- }
- }
2. Direct 模式
生产者
配置类
- package com.chen.springbootorderrabbitmqproducer.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- */
- @Configuration
- public class RabbitMqConfiguration2 {
-
-
- // 1: 声明交换机
- @Bean
- public DirectExchange directExchange() {
- return new DirectExchange("direct_order_ex", true, false);
- }
-
-
- // 2: 声明队列 duanxin.direct.queue
- @Bean
- public Queue duanxinqueue() {
- return new Queue("duanxin.direct.queue", true);
- }
-
-
- // 2: 声明队列 duanxin.direct.queue
- @Bean
- public Queue smsqueue() {
- return new Queue("sms.direct.queue", true);
- }
-
-
- // 2: 声明队列 duanxin.direct.queue
- @Bean
- public Queue emailqueue() {
- return new Queue("email.direct.queue", true);
- }
-
-
-
- // 3: 确定绑定关系
- @Bean
- public Binding bindduanxin(){
- return BindingBuilder.bind(duanxinqueue()).to(directExchange()).with("msg");
- }
-
- // 3: 确定绑定关系
- @Bean
- public Binding bindsms(){
- return BindingBuilder.bind(smsqueue()).to(directExchange()).with("sms");
- }
-
- // 3: 确定绑定关系
- @Bean
- public Binding bindemail(){
- return BindingBuilder.bind(emailqueue()).to(directExchange()).with("email");
- }
- }
实现类:
- package com.chen.springbootorderrabbitmqproducer.service;
-
- import org.springframework.amqp.AmqpException;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessagePostProcessor;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- import java.util.UUID;
-
- /**
- */
- @Service
- public class OrderService {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- // 交换机
- private String exchangeName = "fanout_order_ex";
- // 路由key
- private String routingKey = "";
-
-
-
-
-
- /**
- * @Description 模拟用户购买商品下单的业务
- * @Param [userId, productId, num]
- * @return void
- **/
- public void makeOrderDirect(String userId,String productId,int num){
- // 1: 根据商品id查询库存是否充足
- // 2: 保存订单
- String orderId = UUID.randomUUID().toString();
- System.out.println("保存订单成功:id是:" + orderId);
- // 3: 发送消息
- rabbitTemplate.convertAndSend("direct_order_ex","email",orderId);
- rabbitTemplate.convertAndSend("direct_order_ex","sms",orderId);
- }
-
-
-
-
-
-
- }
测试:
- package com.chen.springbootorderrabbitmqproducer.rabbitmq.springbootorderrabbitmqproducer;
-
-
- import com.chen.springbootorderrabbitmqproducer.service.OrderService;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- @SpringBootTest
- class SpringbootOrderRabbitmqProducerApplicationTests {
-
- @Autowired
- private OrderService orderService;
-
-
- @Test
- public void testDirect() {
- orderService.makeOrderDirect("100","100",10);
- }
-
-
- }
消费者
- package com.chen.direct;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- /**
- * @description:
- */
- @Service
- @RabbitListener(queues ={"duanxin.direct.queue"})
- public class DirectDuanxinConsumber {
-
-
- // 告诉你的接收服务器的消息,没有返回值
- @RabbitHandler
- public void reviceMessage(String message){
- System.out.println("duanxin--direct--->接收到订单消息,订单id是: " + message);
- }
- }
- package com.chen.direct;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- /**
- */
- @Service
- @RabbitListener(queues ={"email.direct.queue"})
- public class DirectEmailConsumber {
-
-
- @RabbitHandler
- public void reviceMessage(String message){
- System.out.println("email---direct-->接收到订单消息,订单id是: " + message);
- }
- }
- package com.chen.direct;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- /**
-
- @Service
- @RabbitListener(queues ={"sms.direct.queue"})
- public class DirectSmsConsumber {
-
-
- @RabbitHandler
- public void reviceMessage(String message){
- System.out.println("sms--direct--->接收到订单消息,订单id是: " + message);
- }
- }
3. Topic 模式
生产者
- public class OrderService{
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //模拟用户下单
- public void makeOrder(String userid,String productid,int num){
-
- public void makeOrderTopic(String userId,String productId,int num){
- // 1: 根据商品id查询库存是否充足
- // 2: 保存订单
- String orderId = UUID.randomUUID().toString();
- System.out.println("保存订单成功:id是:" + orderId);
- // 3: 发送消息
-
- //com.# duanxin
- //#.email.* email
- //#.sms.# sms
- // 设置消息确认机制
- rabbitTemplate.convertAndSend("topic_order_ex","com.email.sms.xxx",orderId);
- }
- }
- @Test
- public void testTopic() {
- orderService.makeOrderTopic("100","100",10);
- }
消费者(采用注解)
FanoutSmsConsumer.java
- @Component
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "sms.topic.queue",durable = "true",antoDelete = "false"),
- exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")
- key = "#.sms.#"
- ))
- public class TopicSmsConsumer{
- @RabbitHandler
- public void reviceMessage(String message){
- sout("sms接收到了的订单信息是:"+message);
- }
- }
FanoutDuanxinConsumer.java
- @Component
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "duanxin.topic.queue",durable = "true",antoDelete = "false"),
- exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")
- key = "#.duanxin.#"
- ))
- public classTopicDuanxinConsumer{
- @RabbitHandler
- public void reviceMessage(String message){
- sout("duanxin接收到了的订单信息是:"+message);
- }
- }
FanoutEmailConsumer.java
- @Component
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "email.topic.queue",durable = "true",antoDelete = "false"),
- exchange = @Exchange(value = "topic_order_exchange",type = "ExchangeTypes.TOPIC")
- key = "#.email.#"
- ))
- public class TopicEmailConsumer{
- @RabbitHandler
- public void reviceMessage(String message){
- sout("email接收到了的订单信息是:"+message);
- }
- }
五、RabbitMQ高级
1. 过期时间TTL
概述
过期时间 TTl表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置 TTL,目前有两种方法可以设置
-
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
-
第二种方法是对消息进行单独设置,每条消息 TTL可以不同
如果上述两种方法同时使用,则消息的过期时间以两者 TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL值,就称为 dead message被投递到死信队列,消费者将无法再收到该消息
设置队列TTL
RabbitMqConfiguration.java
- @Configuration
- public class TTLRabbitMQConfiguration{
- //1.声明注册direct模式的交换机
- @Bean
- public DirectExchange ttldirectExchange(){
- return new DirectExchange("ttl_direct_exchange",true,false);}
- //2.队列的过期时间
- @Bean
- public Queue directttlQueue(){
- //设置过期时间
- Map<String,Object> args = new HashMap<>();
- args.put("x-message-ttl",5000);//这里一定是int类型
- return new Queue("ttl.direct.queue",true,false,false,args);}
-
- @Bean
- public Binding ttlBingding(){
- return BindingBuilder.bin(directttlQueue()).to(ttldirectExchange()).with("ttl");
- }
- }
设置消息TTL
- public class OrderService{
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //模拟用户下单
- public void makeOrder(String userid,String productid,int num){
- //1.根据商品id查询库存是否足够
- //2.保存订单
- String orderId = UUID.randomUUID().toString();
- sout("订单生产成功:"+orderId);
- //3.通过MQ来完成消息的分发
- //参数1:交换机 参数2:路由key/queue队列名称 参数3:消息内容
- String exchangeName = "ttl_order_exchange";
- String routingKey = "ttlmessage";
- //给消息设置过期时间
- MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
- public Message postProcessMessage(Message message){
- //这里就是字符串
- message.getMessageProperties().setExpiration("5000");
- message.getMessageProperties().setContentEncoding("UTF-8");
- return message;
- }
- }
- rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId,messagePostProcessor);
- }
- }
RabbitMqConfiguration.java
- @Configuration
- public class TTLRabbitMQConfiguration{
- //1.声明注册direct模式的交换机
- @Bean
- public DirectExchange ttldirectExchange(){
- return new DirectExchange("ttl_direct_exchange",true,false);}
- //2.队列的过期时间
- @Bean
- public Queue directttlQueue(){
- //设置过期时间
- Map<String,Object> args = new HashMap<>();
- args.put("x-message-ttl",5000);//这里一定是int类型
- return new Queue("ttl.direct.queue",true,false,false,args);}
- @Bean
- public Queue directttlMessageQueue(){
- return new Queue("ttlMessage.direct.queue",true,false,false,args);}
-
- @Bean
- public Binding ttlBingding(){
- return BindingBuilder.bin(directttlMessageQueue()).to(ttldirectExchange()).with("ttlmessage");
- }
- }
2. 死信队列
概述
DLX,全称 Dead-Letter-Exchange
,可以称之为死信交换机,也有人称之为死信邮箱。当消息再一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX的队列就称之为死信队列。消息变成死信,可能是由于以下原因:
-
消息被拒绝
-
消息过期
-
队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可
代码
DeadRabbitMqConfiguration.java
- @Configuration
- public class DeadRabbitMqConfiguration{
- //1.声明注册direct模式的交换机
- @Bean
- public DirectExchange deadDirect(){
- return new DirectExchange("dead_direct_exchange",true,false);}
- //2.队列的过期时间
- @Bean
- public Queue deadQueue(){
- return new Queue("dead.direct.queue",true);}
- @Bean
- public Binding deadbinds(){
- return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");
- }
- }
RabbitMqConfiguration.java
- @Configuration
- public class TTLRabbitMQConfiguration{
- //1.声明注册direct模式的交换机
- @Bean
- public DirectExchange ttldirectExchange(){
- return new DirectExchange("ttl_direct_exchange",true,false);}
- //2.队列的过期时间
- @Bean
- public Queue directttlQueue(){
- //设置过期时间
- Map<String,Object> args = new HashMap<>();
- //args.put("x-max-length",5);
- args.put("x-message-ttl",5000);//这里一定是int类型
- args.put("x-dead-letter-exchange","dead_direct_exchange");
- args.put("x-dead-letter-routing-key","dead");//fanout不需要配置
- return new Queue("ttl.direct.queue",true,false,false,args);}
-
-
- @Bean
- public Binding ttlBingding(){
- return BindingBuilder.bin(directttlQueue()).to(ttldirectExchange()).with("ttlmessage");
- }
- }
3. 内存磁盘的监控
01 RabbitMQ内存警告
02 RabbitMQ的内存控制
参考帮助文档:http://www.rabbbitmq.com/configure.html
当出现警告的时候,可以通过配置去修改和调整
命令的方式
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效
配置文件方式 rabbitmq.conf
03 RabbitMQ的内存换页
04 RabbitMQ的磁盘预警
4. 集群(docker集群rabbitmq)
1.先创建三个rabbitmq容器
- docker run -itd --name rabbit01 --hostname myrabbit01 -v /home/software/rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' -p 15672:15672 -p 5672:5672 -p 25672:25672 rabbitmq:management
-
- docker run -itd --name rabbit02 --hostname myrabbit02 -v /home/software/rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbit01:myrabbit01 -p 15673:15672 -p 5673:5672 -p 25673:25672 rabbitmq:management
-
- docker run -itd --name rabbit03 --hostname myrabbit03 -v /home/software/rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbit01:myrabbit01 --link rabbit02:myrabbit02 -p 15674:15672 -p 5674:5672 -p 25674:25672 rabbitmq:management
启动容器成功后,读者可以访问
分别通过浏览器访问:ip(自己的IP地址):15672;ip:15673;ip:15674都可以访问
2.容器节点加入集群
执行如下命令,进入第一个rabbitmq节点容器:
docker exec -it rabbit01 /bin/bash
进入容器后,操作rabbitmq,执行如下命令:
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl start_app
- exit
执行如下命令,进入第二个rabbitmq节点容器:
docker exec -it rabbit02 /bin/bash
进入第二个rabbitmq节点容器,执行如下命令:
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl join_cluster --ram rabbit@myrabbit01
- rabbitmqctl start_app
- exit
,进入第三个rabbitmq节点容器,执行如下命令:
- docker exec -it rabbit03 /bin/bash
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl join_cluster --ram rabbit@myrabbit01
- rabbitmqctl start_app
- exit
最后可以看到节点信息