学习地址:
01-入门
- 官网地址
https://www.rabbitmq.com/download.html - 文件上传
上传到/usr/local/software 目录下(如果没有softwareandroid系统定制开发需要自己创建) - 安装文件(android系统定制开发分别按照以下顺序安装)
rpm -ivh erlang-21.3-1.el7 .x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm - 常用命令(android系统定制开发按照以下顺序执行)
android系统定制开发添加开机启动RabbitMQ服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
android系统定制开发查看服务状态
/sbin/service rabbitmq-server status - 停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启web管理插件
rabbitmq-plugins enable rabbitmq_management
android系统定制开发创建新用户
rabbitmqctl add_user root root
android系统定制开发显示用户列表
rabbitmqctl list_users
设置rootandroid系统定制开发用户有管理员权限
rabbitmqctl set_user_tags root administrator
设置rootandroid系统定制开发用户读写权限
rabbitmqctl set_permissions -p “/” root “.” “.” “.*”
android系统定制开发关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
systemctl status firewalld - 访问web界面
http://linux主机ip地址:15672/
http://192.168.106.130:15672/
rabbitmqctl add_user root root
rabbitmqctl list_users
rabbitmqctl set_user_tags root administrator
rabbitmqctl set_permissions -p “/” root “." ".” “.*”
java入门使用
pom.xml
<dependencies> <!--rabbitmqandroid系统定制开发依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--android系统定制开发操作文件流--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies> <build> <plugins> <!--指定JDK编译版本--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
package rabbitmq.one;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 生产者:发消息 * @author:whd * @createTime: 2021/10/23 */public class Producer { //队列 public static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //连接工程 ConnectionFactory factory = new ConnectionFactory(); //设置rabbitmq服务器IP,用户名,密码 factory.setHost("192.168.106.130"); factory.setUsername("root"); factory.setPassword("root"); //创建连接,生成信道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* 生成队列 1. 队列名称 2. android系统定制开发是否持久化(android系统定制开发保存至硬盘),android系统定制开发否则默认存储在内存 3. android系统定制开发是否只供一个消费者消费,android系统定制开发是否进行消息共享 4. android系统定制开发是否自动删除,android系统定制开发即最后一个消费者断开连接,android系统定制开发该队列是否自动删除 5. 其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发消息 String message = "hello,world"; /* android系统定制开发发送一个消息 1. android系统定制开发发送到哪个交换机 2. 表示路由key值 - (android系统定制开发本次是队列名) 3. android系统定制开发其他参数信息 4. android系统定制开发发送消息的消息体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("android系统定制开发消息发送完毕"); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
运行后到webandroid系统定制开发界面查看如下:
linux服务器需要开启5672端口才可以发送成功
firewall-cmd --permenent --add-port=5672/tcp
消费者
package rabbitmq.one;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 消费者 * @author:whd * @createTime: 2021/10/23 */public class Consumer { //队列 public static final String QUEUE_NAME = "hello"; //接收消息 public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.106.130"); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明接收消息的回调 DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("message = " + new String(message.getBody())); }; //声明接收消息的回调 CancelCallback cancelCallback = consumerTag->{ System.out.println("消息消费被中断"); }; /* 消费者消费消息 1. 消费哪个队列 2. 消费成功之后是否要自动应答(true:自动应答) 3. 消费者未成功消费的回调 4. 消费者取消消费的回调 */ channel.basicConsume( QUEUE_NAME, true, deliverCallback, //消息一旦接收到,会放在这个函数回调中 cancelCallback); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
消费者可以一直开着,生产者可以重复去不停的发送消息
02-工作队列(work queues)
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进
程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任 务。
抽取工具类
package rabbitmq.utils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * 连接工厂,创建信道的工具类 * * @author:whd * @createTime: 2021/10/23 */public class RabbitMqUtils { //得到一个连接channel public static Channel getChannel() throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.106.130"); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
启动2个工作线程
工作线程1
package rabbitmq.two;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;/** * 测试一个工作线程,相当于消费者 * @author:whd * @createTime: 2021/10/23 */public class Worker01 { //队列名 public static final String QUEUE_NAME = "hello"; //接收消息 public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明接收消息的回调 DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("接收的消息:" + new String(message.getBody())); }; //声明取消消息接收的回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); }; /* 消费者消费消息 1. 消费哪个队列 2. 消费成功之后是否要自动应答(true:自动应答) 3. 消费者未成功消费的回调 4. 消费者取消消费的回调 */ channel.basicConsume( QUEUE_NAME, true, deliverCallback, //消息一旦接收到,会放在这个函数回调中 cancelCallback); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
运行一个程序运行多个实例(模拟2个工作线程)
package rabbitmq.two;import com.rabbitmq.client.Channel;import rabbitmq.utils.RabbitMqUtils;import java.util.Scanner;/** * 生产者发送大量消息 * @author:whd * @createTime: 2021/10/23 */public class Task01 { //队列 public static final String QUEUE_NAME = "hello"; //发送大量消息 public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); /* 生成队列 1. 队列名称 2. 是否持久化(保存至硬盘),否则默认存储在内存 3. 是否只供一个消费者消费,是否进行消息共享 4. 是否自动删除,即最后一个消费者断开连接,该队列是否自动删除 5. 其他参数 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发送消息 - 从控制台中接收信息 Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String msg = sc.next(); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("消息发送:" + msg); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
03-消息应答与持久化
保证消息在发送过程中不丢失
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。
一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
自动应答
不太靠谱
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用
少使用
手动应答
.
- Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了 - Channel.basicNack(用于否定确认)
- Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了
Multiple 的解释
手动应答的好处是可以批量应答并且减少网络拥堵
消息自动重新入队
package rabbitmq.three;import com.rabbitmq.client.Channel;import rabbitmq.utils.RabbitMqUtils;import java.util.Scanner;/** * 消息在手动应答时不丢失,放回队列中重新消费 * @author:whd * @createTime: 2021/10/23 */public class Task02 { //队列名 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); /* 生成队列 1. 队列名称 2. 是否持久化(保存至硬盘),否则默认存储在内存 3. 是否只供一个消费者消费,是否进行消息共享 4. 是否自动删除,即最后一个消费者断开连接,该队列是否自动删除 5. 其他参数 */ channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null); //发送消息 - 从控制台中接收信息 Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String msg = sc.next(); channel.basicPublish("",TASK_QUEUE_NAME,null,msg.getBytes()); System.out.println("消息发送:" + msg); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
快线程
package rabbitmq.three;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;/** * @author:whd * @createTime: 2021/10/23 * 处理消息比较快的线程 * 消息在手动应答时不丢失,放回队列中重新消费 */public class WorkerFast { //队列名 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C1等待接收消息处理时间短"); //声明接收消息的回调 DeliverCallback deliverCallback = (consumerTag, message)->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("接收的消息:" + new String(message.getBody())); //手动应答 /* 1. 消息的标记tag 2. 是否批量应答 */ channel.basicAck( message.getEnvelope().getDeliveryTag(), false); }; //声明取消消息接收的回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); }; //采用手动应答 boolean autoAck = false; channel.basicConsume( TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
慢
package rabbitmq.three;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;/** * @author:whd * @createTime: 2021/10/23 * 处理消息比较慢的线程 * 消息在手动应答时不丢失,放回队列中重新消费 */public class WorkerSlow { //队列名 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C2等待接收消息处理时间长"); //声明接收消息的回调 DeliverCallback deliverCallback = (consumerTag, message)->{ try { Thread.sleep(1000*30); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("接收的消息:" + new String(message.getBody())); //手动应答 /* 1. 消息的标记tag 2. 是否批量应答 */ channel.basicAck( message.getEnvelope().getDeliveryTag(), false); }; //声明取消消息接收的回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); }; //采用手动应答 boolean autoAck = false; channel.basicConsume( TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
总结
当慢线程突然终止,原来由它收的信息给快线程收到
队列持久化
boolean durable = true; //需要让queue进行持久化
channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
package rabbitmq.three;import com.rabbitmq.client.Channel;import rabbitmq.utils.RabbitMqUtils;import java.util.Scanner;/** * 消息在手动应答时不丢失,放回队列中重新消费 * @author:whd * @createTime: 2021/10/23 */public class Task02 { //队列名 public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); /* 生成队列 1. 队列名称 2. 是否持久化(保存至硬盘),否则默认存储在内存 3. 是否只供一个消费者消费,是否进行消息共享 4. 是否自动删除,即最后一个消费者断开连接,该队列是否自动删除 5. 其他参数 */ boolean durable = true; //需要让queue进行持久化 channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null); //发送消息 - 从控制台中接收信息 Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String msg = sc.next(); channel.basicPublish("",TASK_QUEUE_NAME,null,msg.getBytes()); System.out.println("消息发送:" + msg); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
消息持久化
MessageProperties.PERSISTENT_TEXT_PLAIN
加入这个属性
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
- 1
04-不公平分发
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发
WorkerFast
public class WorkerFast { public static void main(String[] args) throws Exception { ..... //设置不公平分发 int prefetchCount = 1; channel.basicQos(prefetchCount); //采用手动应答 boolean autoAck = false; channel.basicConsume( TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
预取值
WorkerFast
//设置不公平分发 int prefetchCount = 2; channel.basicQos(prefetchCount);
- 1
- 2
- 3
WorkerSlow
//设置不公平分发 int prefetchCount = 5; channel.basicQos(prefetchCount);
- 1
- 2
- 3
运行结果:
一定会在WorkerSlow中存在5条,但WorkerFast得到的条数是不确定的。
05-发布确认
原理
-
设置要求队列必须持久化
-
boolean durable = true; //需要让queue进行持久化
- 1
-
-
设置要求消息持久化
-
发布确认(才能肯定消息是没有丢失的)
三种确认发布
异步确认发布(可靠,效率高)
package rabbitmq.mq04;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmCallback;import rabbitmq.utils.RabbitMqUtils;import java.util.UUID;/** * 发布确认模式 * * @author:whd * @createTime: 2021/10/23 */public class ConfirmMessage { //批量发消息的个数 public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception { //1. 单个确认 publishMessageIndividually(); //2. 批量确认 publishMessageBatch(); //3. 异步批量确认 publishMessageAsync(); } // 单个确认 public static void publishMessageIndividually() throws Exception { Channel channel = RabbitMqUtils.getChannel(); //队列的声明 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量发消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "mes:" + i; channel.basicPublish("", queueName, null, msg.getBytes()); //单个消息就马上进行发布确认 boolean flag = channel.waitForConfirms(); if (!flag) { System.out.println("消息发送失败"); } } System.out.println("消息发送成功"); //结束时间 long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "消息耗时:" + (end - begin) + " ms"); } // 批量确认 public static void publishMessageBatch() throws Exception { Channel channel = RabbitMqUtils.getChannel(); //队列的声明 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //批量确认消息大小 int batchSize = 100; //每100条确认一次 //批量发消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "mes:" + i; channel.basicPublish("", queueName, null, msg.getBytes()); //判断达到100条消息,批量确认一次 if ((i + 1) % batchSize == 0) { //发布确认 boolean flag = channel.waitForConfirms(); } } System.out.println("消息发送成功"); //结束时间 long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "消息批量确认耗时:" + (end - begin) + " ms"); } //异步确认发布(可靠,效率高) public static void publishMessageAsync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); //队列的声明 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long begin = System.currentTimeMillis(); //消息确认成功回调 ConfirmCallback askCallback = (deliveryTag, multiple) -> {// System.out.println("确认的消息:"+deliveryTag); }; //消息确认失败回调 //deliveryTag : 消息标记 //multiple : 是否批量确认 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("未确认的消息:"+deliveryTag); }; //准备消息的监听器,监听消息的成功与失败 channel.addConfirmListener(askCallback, nackCallback); //批量发送 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "消息:" + i; channel.basicPublish("", queueName, null, msg.getBytes()); } System.out.println("消息发送成功"); //结束时间 long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "消息异步确认耗时:" + (end - begin) + " ms"); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
如何处理异步未确认消息
可以用一个map记录下发送的消息
在成功回调里删掉map中发送成功的消息
那么那个map剩下的都是未成功的
//异步确认发布(可靠,效率高) public static void publishMessageAsync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); //队列的声明 String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //线程安全有序的哈希表,适用于高并发的情况下 // 1. 轻松将序号与消息关联 // 2. 轻松批量删除条目 // 3. 支持高并发* ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>(); //消息确认成功回调 ConfirmCallback askCallback = (deliveryTag, multiple) -> {// System.out.println("确认的消息:"+deliveryTag); //删除已经确认的消息 if(multiple){ //如果是批量的 ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag); confirmed.clear(); }else{ //单个确认的 outstandingConfirms.remove(deliveryTag); } }; //消息确认失败回调 //deliveryTag : 消息标记 //multiple : 是否批量确认 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("未确认的消息tag:"+deliveryTag); }; //准备消息的监听器,监听消息的成功与失败 channel.addConfirmListener(askCallback, nackCallback); //开始时间 long begin = System.currentTimeMillis(); //批量发送 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "消息:" + i; channel.basicPublish("", queueName, null, msg.getBytes()); //记录下要发送的消息 outstandingConfirms.put(channel.getNextPublishSeqNo(),msg); } System.out.println("消息发送成功"); //结束时间 long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "消息异步确认耗时:" + (end - begin) + " ms"); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
交换机的作用
在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消
费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式
称为 ”发布/订阅”.
为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消
息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费
者者
临时队列
绑定
Fanout(发布订阅模式)
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的
所有队列中。系统中默认有些 exchange 类型
QQ群
发布订阅模式:群聊、公告
ReceiveLog01
package rabbitmq.mq05;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;import java.nio.charset.StandardCharsets;/** * @author:whd * @createTime: 2021/10/23 * * 消息接收 */public class ReceiveLog01 { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); /* 声明一个队列,临时队列 队列名称是随机的 当消费者断开与队列的连接时,队列自动删除 */ String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("ReceiveLog01等待接收消息...."); DeliverCallback deliverCallback = (consumerTag,msg)->{ System.out.println("ReceiveLog01控制台打印接收到的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume( queueName, true, deliverCallback, consumerTag ->{} ); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
ReceiveLog02
package rabbitmq.mq05;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;import java.nio.charset.StandardCharsets;/** * @author:whd * @createTime: 2021/10/23 * * 消息接收 */public class ReceiveLog02 { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); /* 声明一个队列,临时队列 队列名称是随机的 当消费者断开与队列的连接时,队列自动删除 */ String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("ReceiveLog02等待接收消息...."); DeliverCallback deliverCallback = (consumerTag,msg)->{ System.out.println("ReceiveLog02控制台打印接收到的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume( queueName, true, deliverCallback, consumerTag ->{} ); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
EmitLog
package rabbitmq.mq05;import com.rabbitmq.client.Channel;import rabbitmq.utils.RabbitMqUtils;import java.io.IOException;import java.util.Scanner;/** * @author:whd * @createTime: 2021/10/23 * 生产者 */public class EmitLog { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //发送消息 - 从控制台中接收信息 Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String msg = sc.next(); channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("UTF-8")); System.out.println("生产者发送:" + msg); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
direct交换机
routingKey不相同
直接交换机
消费者1
package rabbitmq.mq06;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;import java.io.IOException;import java.nio.charset.StandardCharsets;/** * @author:whd * @createTime: 2021/10/23 * 消费者1 */public class ReceiveLogsDirect01 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个队列 channel.queueDeclare("console",false,false,false,null); channel.queueBind("console",EXCHANGE_NAME,"info"); channel.queueBind("console",EXCHANGE_NAME,"warning"); //接收消息 DeliverCallback deliverCallback = (consumerTag, msg)->{ System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume( "console", true, deliverCallback, consumerTag ->{} ); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
消费者2
package rabbitmq.mq06;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;import java.nio.charset.StandardCharsets;/** * @author:whd * @createTime: 2021/10/23 * 消费者2 */public class ReceiveLogsDirect02 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明一个队列 channel.queueDeclare("disk",false,false,false,null); channel.queueBind("disk",EXCHANGE_NAME,"error"); //接收消息 DeliverCallback deliverCallback = (consumerTag, msg)->{ System.out.println("ReceiveLogsDirect02控制台打印接收到的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume( "disk", true, deliverCallback, consumerTag ->{} ); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
生产者
package rabbitmq.mq06;import com.rabbitmq.client.Channel;import rabbitmq.utils.RabbitMqUtils;import java.util.Scanner;/** * @author:whd * @createTime: 2021/10/23 * 生产者 */public class DirectLogs { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //发送消息 - 从控制台中接收信息 Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String msg = sc.next(); //这里的“error"指定消息传送给哪个队列接收 channel.basicPublish(EXCHANGE_NAME,"error",null,msg.getBytes("UTF-8")); System.out.println("生产者发送:" + msg); } }}
- 1
只能路由1个队列,没法发送多个队列,只能发给1个队列。所以有了主题交换机
主题(topics)交换机
routing_key不能随意写,必须是一个单词列表,以点号分隔开。
stock.usd.nyse
单词列表最多不能超过255个字节
package rabbitmq.mq07;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;import java.nio.charset.StandardCharsets;/** * @author:whd * @createTime: 2021/10/23 * 主题交换机 * 消费者C1 */public class ReceiveTopic01 { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明队列 String queueName = "Q1"; channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*"); System.out.println("等待接收消息....."); //接收消息 DeliverCallback deliverCallback = (consumerTag, msg)->{ System.out.println("ReceiveTopic01控制台打印接收到的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); System.out.println("接收队列:"+queueName + " 绑定键:" + msg.getEnvelope().getRoutingKey()); }; channel.basicConsume( queueName, true, deliverCallback, consumerTag ->{} ); }}
- 1
package rabbitmq.mq07;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;import java.nio.charset.StandardCharsets;/** * @author:whd * @createTime: 2021/10/23 * 主题交换机 * 消费者C1 */public class ReceiveTopic02 { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //声明队列 String queueName = "Q2"; channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(queueName,EXCHANGE_NAME,"lazy.*.*"); System.out.println("等待接收消息....."); //接收消息 DeliverCallback deliverCallback = (consumerTag, msg)->{ System.out.println("ReceiveTopic02控制台打印接收到的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); System.out.println("接收队列:"+queueName + " 绑定键:" + msg.getEnvelope().getRoutingKey()); }; channel.basicConsume( queueName, true, deliverCallback, consumerTag ->{} ); }}
- 1
package rabbitmq.mq07;import com.rabbitmq.client.Channel;import rabbitmq.utils.RabbitMqUtils;import java.util.HashMap;import java.util.Map;/** * @author:whd * @createTime: 2021/10/23 * * 生产者 */public class EmitLogTopic { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); Map<String,String> bindingMap = new HashMap<>(); bindingMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到"); bindingMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); bindingMap.put("quick.orange.fox","被队列 Q1 接收到"); bindingMap.put("lazy.brown.fox","被队列 Q2 接收到"); bindingMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次"); bindingMap.put("quick.brown.fox","quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); bindingMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); for (Map.Entry<String, String> entry : bindingMap.entrySet()) { String routingKey = entry.getKey(); String msg = entry.getValue(); channel.basicPublish( EXCHANGE_NAME, routingKey, null, msg.getBytes("UTF-8") ); } }}
- 1
死信队列
订单10分钟不支付删除
死信的来源
- 消息TTL过期
- 队列达到最大长度
- 消息被拒绝
C1
package rabbitmq.mq08;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;import java.nio.charset.StandardCharsets;import java.util.HashMap;import java.util.Map;/** * @author:whd * @createTime: 2021/10/23 * * 死信 消费者1 * */public class c1 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 public static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //==============================声明死信和普通交换机============================== channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //==============================普通队列============================== Map<String, Object> arguments = new HashMap<>(); //设置死信交换机 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信 routingkey arguments.put("x-dead-letter-routing-key","lisi"); channel.queueDeclare( NORMAL_QUEUE, false, false, false, arguments); //==============================死信队列============================== channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //==============================绑定交换机与队列============================== channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan"); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi"); System.out.println("等待接收消息......."); //接收消息 DeliverCallback deliverCallback = (consumerTag, msg)->{ System.out.println("C1收到的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume( NORMAL_QUEUE, true, deliverCallback, consumerTag->{}); }}
- 1
C2
package rabbitmq.mq08;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import rabbitmq.utils.RabbitMqUtils;import java.nio.charset.StandardCharsets;/** * @author:whd * @createTime: 2021/10/23 * * 死信 消费者2 */public class c2 { //死信队列 public static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息......."); //接收消息 DeliverCallback deliverCallback = (consumerTag, msg)->{ System.out.println("C2收到的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume( DEAD_QUEUE, true, deliverCallback, consumerTag->{}); }}
- 1
producer
package rabbitmq.mq08;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import rabbitmq.utils.RabbitMqUtils;/** * @author:whd * @createTime: 2021/10/23 * * 死信 生产者 */public class producer { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //设置消息TTL时间 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder() .expiration("10000") .build(); //死信消息 for (int i = 1; i <= 10; i++) { String msg = "info:" + i; channel.basicPublish( NORMAL_EXCHANGE, "zhangsan", properties, msg.getBytes("UTF-8")); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
队列达到最大长度
//设置正常队列的长度限制 arguments.put("x-max-length",6);
- 1
- 2
消息被拒绝
修改C1
//接收消息 DeliverCallback deliverCallback = (consumerTag, msg) -> { String m = new String(msg.getBody(), StandardCharsets.UTF_8); if (m.equals("info5")) { System.out.println("被拒绝的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); channel.basicReject( msg.getEnvelope().getDeliveryTag(), false); } else { System.out.println("C1收到的消息:" + new String(msg.getBody(), StandardCharsets.UTF_8)); channel.basicAck( msg.getEnvelope().getDeliveryTag() , false); } };
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
06-延迟队列
死信的一种
延迟队列的使用场景
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:
发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。
但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下
延迟队列优化
Controller
//开始发消息并携带TTL @GetMapping("/sendExpirationMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){ log.info("当前时间:{},发送一条时长是{}ms的信息给队列QC:{}", new Date().toString(), ttlTime, message); MessagePostProcessor messagePostProcessor = msg->{ //设置发消息延时时长 msg.getMessageProperties().setExpiration(ttlTime); return msg; }; rabbitTemplate.convertAndSend( "X", "XC", "消息来自TTL为10秒的队列"+message, messagePostProcessor ); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
修改配置类
package com.zuck.springbootrabbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * @author:whd * @createTime: 2021/10/23 * TTL队列 配置文件类代码 */@Configurationpublic class TtlQueueConfig { //普通交换机名称 public static final String X_EXCHANGE = "X"; //死信交换机名称 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列名称 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String QUEUE_C = "QC"; //===================声明交换机=================== //死信队列名称 public static final String DEAD_LETTER_QUEUE = "QD"; @Bean("xExchange") public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //===================声明队列=================== @Bean("queueA") public Queue queueA(){ Map<String,Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); //设置TTL 单位是ms arguments.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A) .withArguments(arguments) .build(); } @Bean("queueB") public Queue queueB(){ Map<String,Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); //设置TTL 单位是ms arguments.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B) .withArguments(arguments) .build(); } @Bean("queueC") public Queue queueC(){ Map<String,Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); return QueueBuilder.durable(QUEUE_C) .withArguments(arguments) .build(); } //死信队列 @Bean("queueD") public Queue queueD(){ return new Queue(DEAD_LETTER_QUEUE); } //==================绑定队列到交换机=================== //绑定队列A到xExchange @Bean public Binding queueABindingX( @Qualifier("queueA")Queue queueA, @Qualifier("xExchange")DirectExchange xExchange ){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //绑定队列B到xExchange @Bean public Binding queueBBindingX( @Qualifier("queueB")Queue queueB, @Qualifier("xExchange")DirectExchange xExchange ){ return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } //绑定死信队列到yExchange @Bean public Binding queueDBindingX( @Qualifier("queueD")Queue queueD, @Qualifier("yExchange")DirectExchange yExchange ){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } //绑定QC到x @Bean public Binding queueCBindingX( @Qualifier("queueC")Queue queueC, @Qualifier("xExchange")DirectExchange xExchange ){ return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
插件解决延时队列问题
这是企业的用法,建议使用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
systemctl restart rabbitmq_server
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
Controller
//开始发消息【基于插件】的消息及延迟的时间 @GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime){ log.info("当前时间:{},发送一条时长是{}ms的信息给延迟队列delayedQueue:{}", new Date().toString(), delayTime, message); MessagePostProcessor messagePostProcessor = msg->{ //设置发消息延时时长 单位ms msg.getMessageProperties().setDelay(delayTime); return msg; }; rabbitTemplate.convertAndSend( DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, "消息来自延迟队列:"+message, messagePostProcessor ); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
延迟队列配置类
package com.zuck.springbootrabbitmq.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * @author:whd * @createTime: 2021/10/24 * 延时队列 */@Configurationpublic class DelayedQueueConfig { //队列 public static final String DELAYED_QUEUE_NAME = "delayed.queue"; //交换机 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; //routingKey public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; //声明延时交换机 @Bean("delayedExchange") public CustomExchange delayedExchange(){ //CustomExchange自定义交换机 Map<String,Object> arguments = new HashMap<>(3); arguments.put("x-delayed-type","direct"); //延迟类型是个直接类型 return new CustomExchange( DELAYED_EXCHANGE_NAME, //交换机名称 "x-delayed-message", //交换机类型 true, //是否需要持久化 false, //是否自动删除 arguments //其他参数 ); } //声明队列 @Bean("delayedQueue") public Queue delayedQueue(){ return new Queue(DELAYED_QUEUE_NAME); } //绑定 @Bean public Binding delayedQueueBindingExchange( @Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange ){ return BindingBuilder .bind(delayedQueue) .to(delayedExchange) .with(DELAYED_ROUTING_KEY) .noargs(); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
延迟队列消费者
package com.zuck.springbootrabbitmq.consumer;import com.zuck.springbootrabbitmq.config.DelayedQueueConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;/** * @author:whd * @createTime: 2021/10/24 * * 消费者 */@Slf4j@Componentpublic class DelayedQueueConsumer { //监听消息 @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveDelayedQueue(Message message){ String msg = new String(message.getBody()); log.info("当前时间:{},收到延迟队列的消息:{}", new Date().toString(), msg); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
07-发布确认高级
application添加
spring.rabbitmq.publisher-confirm-type=correlated
- NONE
禁用发布确认模式,是默认值 - CORRELATED
发布消息成功到交换器后会触发回调方法 - SIMPLE
经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
producerController
package com.zuck.springbootrabbitmq.controller;import com.zuck.springbootrabbitmq.config.ConfirmConfig;import com.zuck.springbootrabbitmq.config.DelayedQueueConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Date;/** * @author:whd * @createTime: 2021/10/24 */@Slf4j@RestController@RequestMapping("/confirm")public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendConfirmMsg/{message}") public void sendConfirmMsg( @PathVariable String message ){ //建议在发送消息的时候,就把id传入 CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend( ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, "消息来自confirm队列:"+message, correlationData //传入id给MyCallBack接收 ); log.info("【正确】发送消息内容:{}", message +"key1"); //建议在发送消息的时候,就把id传入 CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend( ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"2", "消息来自confirm队列:"+message, correlationData2 //传入id给MyCallBack接收 ); log.info("【错误】发送消息内容:{}", message+"key2"); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
MyCallBack
package com.zuck.springbootrabbitmq.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/** * @author:whd * @createTime: 2021/10/24 */@Slf4j@Componentpublic class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{ @Autowired private RabbitTemplate rabbitTemplate; //注入 @PostConstruct //在其他注解都完成后才执行 public void init(){ rabbitTemplate.setConfirmCallback(this); //将这个类注入到RabbitTemplate.ConfirmCallback rabbitTemplate.setReturnsCallback(this); //将这个类注入到RabbitTemplate.ReturnsCallback } /* 交换机确认回调方法 1. 发消息 交换机接收到了回调 1.1 correlationData 保存回调消息的ID及相关信息 1.2 交换机收到消息 ack = true 1.3 cause null 2.发消息交换机接收失败了回调 2.1 correlationData保存回调消息的ID及相关信息 2.2交换机收到消息 ack = false 2.3 cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = ""; if(correlationData != null){ id = correlationData.getId(); } if(ack){ //如果交换机收到消息 log.info("交换机已经收到id为:{}的消息",id); }else{ //如果没收到 log.info("交换机未收到id为:{}的消息,原因是:{}",id,cause); } } //可以在当消息传递过程中不可达目的地时将消息返回给生产者 //只有 不可达目的地的时候才进行回退 @Override public void returnedMessage(ReturnedMessage returnedMessage) { /* private final Message message; private final int replyCode; private final String replyText; private final String exchange; private final String routingKey; public ReturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { this.message = message; this.replyCode = replyCode; this.replyText = replyText; this.exchange = exchange; this.routingKey = routingKey; } */ log.error("消息{},被交换机{}回退,退回的原因是{},路由key是{}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey()); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
回退消息
spring.rabbitmq.publisher-returns=true
- 1
application.properties
spring.rabbitmq.host=192.168.106.130spring.rabbitmq.port=5672spring.rabbitmq.username=rootspring.rabbitmq.password=rootspring.rabbitmq.publisher-confirm-type=correlatedspring.rabbitmq.publisher-returns=true
- 1
- 2
- 3
- 4
- 5
- 6
08-备份交换机
目的:消息不丢失,备份,报警
09-优先级队列
Map<String,Object> arguments = new HashMap<>(); arguments.put("x-max-priority",10); //优先级范围设置为0-10 channel.queueDeclare( QUEUE_NAME, true, //持久化 false, false, arguments);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
惰性队列
惰性队列: 消息保存在内存中还是在磁盘上
正常情况: 消息是保存在内存中
惰性队列: 消息是保存在磁盘中
使用在消费者宕机情况,性能不好
Map<String, Object> args = new HashMap<String, Object>();args.put("x-queue-mode", "lazy");channel.queueDeclare("myqueue", false, false, false, args);
- 1
- 2
- 3
10-rabbitmq集群
最开始我们介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是解决实际问题的关键
搭建集群过程
我这里克隆了3台虚拟机
- 分别修改3台机器的主机名
- vim /etc/hostname
- 配置各个节点的 hosts 文件,让各个节点都能互相识别对方
- vim /etc/hosts
- 确保各个节点的 cookie 文件使用的是同一个值
- 在 node1 上执行远程操作命令
- scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
- scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
- 在 node1 上执行远程操作命令
- .启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以下命令)
- rabbitmq-server -detached
- 在节点 2 执行
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl join_cluster rabbit@node1
- rabbitmqctl start_app
- 在节点 3 执行
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl join_cluster rabbit@node2
- rabbitmqctl start_app
- 集群状态
- rabbitmqctl cluster_status
- 需要重新设置用户
- 创建账号
- rabbitmqctl add_user admin 123
- 设置用户角色
- rabbitmqctl set_user_tags admin administrator
- 设置用户权限
- rabbitmqctl set_permissions -p “/” admin “." ".” “.*”
- 创建账号
- 解除集群节点**(node2 和 node3 机器分别执行)**
- rabbitmqctl stop_app
- rabbitmqctl reset
- rabbitmqctl start_app
- rabbitmqctl cluster_status
- 在Node1上执行
- rabbitmqctl forget_cluster_node rabbit@node2
11-镜像队列与负载均衡
如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,
但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisherconfirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用
搭建步骤
- 启动三台集群节点
- 随便找一个节点添加 policy
- 在 node1 上创建一个队列发送一条消息,队列存在镜像队列
- 停掉 node1 之后发现 node2 成为镜像队列
- 就算整个集群只剩下一台机器了 依然能消费队列里面的消息
说明队列里面的消息被镜像队列传递到相应机器里面了
负载均衡
Haproxy+Keepalive 实现高可用负载均衡
- 整体架构图
- Haproxy 实现负载均衡
- HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括 Twitter,Reddit,StackOverflow,GitHub 在内的多家知名互联网公司在使用。HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。
扩展 nginx,lvs,haproxy 之间的区别: http://www.ha97.com/5646.html
- HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括 Twitter,Reddit,StackOverflow,GitHub 在内的多家知名互联网公司在使用。HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。
- 搭建步骤
- 下载 haproxy(在 node1 和 node2)
- yum -y install haproxy
- 修改 node1 和 node2 的 haproxy.cfg
- vim /etc/haproxy/haproxy.cfg
- 在两台节点启动 haproxy
- haproxy -f /etc/haproxy/haproxy.cfg
- ps -ef | grep haproxy
- 访问地址
- http://10.211.55.71:8888/stats
- 下载 haproxy(在 node1 和 node2)
- Keepalived 实现双机(主备)热备
- 试想如果前面配置的 HAProxy 主机突然宕机或者网卡失效,那么虽然 RbbitMQ 集群没有任何故障但是对于外界的客户端来说所有的连接都会被断开结果将是灾难性的为了确保负载均衡服务的可靠性同样显得十分重要,这里就要引入 Keepalived 它能够通过自身健康检查、资源接管功能做高可用(双机热备),实现故障转移
- 搭建步骤
- 下载 keepalived
- yum -y install keepalived
- 节点 node1 配置文件
- vim /etc/keepalived/keepalived.conf
- 把资料里面的 keepalived.conf 修改之后替换
- 节点 node2 配置文件
- 需要修改 global_defs 的 router_id,如:nodeB
其次要修改 vrrp_instance_VI 中 state 为"BACKUP";
最后要将 priority 设置为小于 100 的值
- 需要修改 global_defs 的 router_id,如:nodeB
- 添加 haproxy_chk.sh
- (为了防止 HAProxy 服务挂掉之后 Keepalived 还在正常工作而没有切换到 Backup 上,所以这里需要编写一个脚本来检测 HAProxy 务的状态,当 HAProxy 服务挂掉之后该脚本会自动重启HAProxy 的服务,如果不成功则关闭 Keepalived 服务,这样便可以切换到 Backup 继续工作)
- vim /etc/keepalived/haproxy_chk.sh(可以直接上传文件)
- 修改权限 chmod 777 /etc/keepalived/haproxy_chk.sh
- 启动 keepalive 命令(node1 和 node2 启动)
- systemctl start keepalived
- 观察 Keepalived 的日志
- tail -f /var/log/messages -n 200
- 观察最新添加的 vip
- ip add show
- node1 模拟 keepalived 关闭状态
- systemctl stop keepalived
- 使用 vip 地址来访问 rabbitmq 集群
- 下载 keepalived
联邦交换机
- 使用原因
- (broker 北京),(broker 深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client 北京) 需要连接(broker 北京),向其中的交换器 exchangeA 发送消息,此时的网络延迟很小,(Client 北京)可以迅速将消息发送至 exchangeA 中,就算在开启了 ublisherconfirm 机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client 深圳)需要向 exchangeA 发送消息,那么(Client 深圳) (broker 北京)之间有很大的网络延迟,(Client 深圳) 将发送消息至 exchangeA 会经历一定的延迟,尤其是在开启了 publisherconfirm 机制或者事务机制的情况下,(Client 深圳) 会等待很长的延迟时间来接收(broker 北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。将业务(Client 深圳)部署到北京的机房可以解决这个问题,但是如果(Client 深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现?这里使用 Federation 插件就可以很好地解决这个问题
- 搭建步骤
- 需要保证每台节点单独运行
- 在每台机器上开启 federation 相关插件
- rabbitmq-plugins enable rabbitmq_federation
- rabbitmq-plugins enable rabbitmq_federation_management
- 原理图(先运行 consumer 在 node2 创建 fed_exchange)
- 在 downstream(node2)配置 upstream(node1)
- 添加 policy
- 成功的前提
联邦队列
- 使用原因
- 联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求
- 搭建步骤
- 添加 upstream(同上)
- 添加 policy
- 添加 upstream(同上)
Shovel
- 使用原因
- Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker 上。Shovel 可以翻译为"铲子",是一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel 行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理
- 搭建步骤
- 开启插件(需要的机器都开启)
- rabbitmq-plugins enable rabbitmq_shovel
- rabbitmq-plugins enable rabbitmq_shovel_management
- 原理图(在源头发送的消息直接回进入到目的地队列)
- 添加 shovel 源和目的地
- 开启插件(需要的机器都开启)