1. 软件系统开发定制相关概念及安装
1.1 MQ基本概念
消息队列(MQ:Message Queue)软件系统开发定制是以一种用来保存消息软件系统开发定制数据的队列。
调用:web
软件系统开发定制层代码调用service
软件系统开发定制层代码时调用;软件系统开发定制请求响应可以称之为调用;软件系统开发定制这些调用多是同步的,软件系统开发定制调用方需要等待被调用软件系统开发定制方给出结果之后,软件系统开发定制才能继续执行后面的代码。
消息:软件系统开发定制调用者发送给被调用者,软件系统开发定制需要后者处理的内容。软件系统开发定制包括但不仅限于(eg:)web层发送给service软件系统开发定制层需要其保存的数据对象。
队列:的一种,特征:先进先出,FIFO。
MQ
软件系统开发定制系统中包含如下角色和概念:
生产者(producer
):软件系统开发定制生产并发送消息的一方
消费者(consumer
):软件系统开发定制接收使用消息的一方
软件系统开发定制代理服务器(Broker
):软件系统开发定制临时保存生产者发送消软件系统开发定制息的服务器
1.2 作用/优点
-
应用解耦,发送方为
生产者
,接收方为消费者
;软件系统开发定制异步请求响应的方式,软件系统开发定制消息发送处理也是异步的 -
软件系统开发定制应用快速变更维护,软件系统开发定制方便增删业务(生产者、软件系统开发定制消费者节点),软件系统开发定制软件系统开发定制同样依赖消息的异步发送、处理
-
削峰填谷:软件系统开发定制大量请求到底后,在
MQ
处排队,软件系统开发定制等待后台服务器(软件系统开发定制应用服务器、软件系统开发定制数据库服务器)慢慢处理;同样依赖消息的异步发送、处理
重点:异步。软件系统开发定制所有的实现都是基于异软件系统开发定制步这样一个大的前提。
1.3 缺点
-
软件系统开发定制系统复杂度提高
-
系统可用性降低
-
异步消息机制
-
消息顺序性
-
消息丢失
-
消息一致性
-
消息重复消费
-
上述缺点都能搞定。
1.4 MQ产品
项目\产品 | RocketMQ | ActiveMQ | RabbitMQ | Kafka |
---|---|---|---|---|
公司/社区 | 阿里/Apache | Apache | Rabbit | Apache |
开发语言 | Java | Java | <font color="red">Erlang</font> | Scala&Java |
消息延迟 | 毫秒级 | 毫秒级 | <font color="red">微秒级</font> | 毫秒以内 |
单机吞吐量 | <font color="red">十万级(最好)</font> | 万级(最差) | 万级(其次) | 十万级(次之) |
架构 | <font color="red">分布式架构</font> | 主从架构 | 主从架构 | <font color="red">分布式架构</font> |
协议支持 | 自定义 | OpenWire/STOMP<br>REST/XMPP/AMQP | SMTP/STOMP/XMPP/AMQP | 自定义协议,社区封<br>装了http协议支持 |
客户端<br>支持语言 | Java<br>C++(不成熟) | Java/C/C++/PHP<br>Python/Perl/.Net | 官方支持Erlang/Java/Ruby<br>社区支持几乎所有语言 | 官方支持Java<br>社区支持PHP/Python |
功能特性 | 功能强大<br>拓展性强 | 老牌产品,成熟<br>度高,文档较多 | 并发能力强,性能极其好<br>社区活跃,管理界面丰富 | 功能少,大数<br>据领域产品 |
所有产品都可以实现消息的生产或消费,实现逻辑稍有差异。
1.5 RocketMQ
角色和概念
-
消息生产者:
producer
,消息生产者,web-service
中web
是生产者。 -
消息服务器:
broker
,经纪人。实现接收、提供、持久化、过滤消息。 -
消息消费者:
consumer
。消费消息,web-service
中service
是消费者。 -
上述三个角色都可以搭建集群,实现高可用;
-
监听器监听
broker
,消费者监听broker
,有消息就消费 -
偏移量(
offset
):消费者需要从代理服务器中获取消息,消费使用;消费完之后并没有删除,而是打了一个已经消费完的标签;偏移量记录的就是所有已经消费过的数据的编码。 -
命名服务器:NameServer [cluster],统筹管理前前三个角色
-
broker
将自己注册进NameServer
-
producer
、consumer
通过其获取broker
信息然后发送、接收消息 -
命名服务器
NameServer
通过心跳检测确认producer
、consumer
、broker
上下线(哥仨向NameServer,30s/次发送心跳)
-
-
消息组成:消息体(body)、主题(Topic)、标签(tag子主题)
-
broker
组成:内含多个不同主题(Topic)
,每个topic
中包含多个队列(默认4个)
1.6 安装
选择安装二进制版本的,不需要编译等操作。前提:系统中JAVA_HOME配置好,版本≥1.8
-
上传文件并解压,安装成功
rocketmq-all-4.5.2-bin-release.zip
-
启动
nameserver
[root@localhost /]# cd /usr/local/rocketmq-4.5.2/bin/ #进入启动目录[root@localhost bin]# sh mqnamesrv # 启动NameServer# 提示信息Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future releaseJava HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.# 启动成功The Name Server boot success. serializeType=JSON
-
修改
broker
配置文件broker.conf
。指定自己的ip地址,方便生产消费者连接。[root@localhost ~]# cd /usr/local/rocketmq-4.5.2/[root@localhost conf]# echo 'brokerIP1=你的broker所在主机ip' >> /usr/local/rocketmq-4.5.2/conf/broker.conf [root@localhost conf]# cat broker.conf brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 0deleteWhen = 04fileReservedTime = 48brokerRole = ASYNC_MASTERflushDiskType = ASYNC_FLUSH# 保证broker能够正确识别网卡,最终让我们的代码可以通过正确的网卡连接该brokerbrokerIP1=你的broker所在主机ip
-
启动broker
[root@localhost conf]# cd ../bin/ # 进入启动的bin目录# -n 表示连接的NameServer服务器ip和端口。 -c 指定加载的配置文件[root@localhost bin]# mqbroker -n 192.168.115.130:9876 -c ../conf/broker.conf # 提示信息The broker[broker-a, 192.168.115.130:10911] boot success. serializeType=JSON and name server is 192.168.115.130:9876
-
校验是否启动成功(单机测试)
-
方式1:使用内置的测试脚本
# 生产消息# 1.设置环境变量export NAMESRV_ADDR=localhost:9876# 2.使用安装包的Demo发送消息sh tools.sh org.apache.rocketmq.example.quickstart.Producer# 消费消息# 1.设置环境变量export NAMESRV_ADDR=localhost:9876# 2.接收消息sh tools.sh org.apache.rocketmq.example.quickstart.Consumer# 能看到发送成功的提示、消费成功的提示表示启动正常。
-
方式2:使用
RocketMQ-console
,图形化界面查看上传打包好的
rocketmq-console-ng-2.0.0.jar
运行命令启动:
`java -jar rocketmq-console-ng-2.0.0.jar
访问图形页面:
http://192.168.115.130:8889
,地址为所在主机地址,端口默认8889登录默认账密:admin/admin
在
运维
页面点击更新
后,切换到集群
页面,如果能看到一个broker,而且显示的ip地址和broker运行的主机ip地址一样,表示成功。
-
注意:
-
启动
broker
的时候,默认配置内存需求为8G/4G/4G,需要 调整为256M/128M/128M(学习期间)[root@localhost ~]# cd /usr/local/rocketmq-4.5.2/bin/[root@localhost bin]# vim runbroker.shJAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
2. 快速入门
2.1 准备工作
-
新建
maven
管理的java
项目,导入依赖<groupId>com.itheima</groupId><artifactId>rocketmq</artifactId><version>1.0-SNAPSHOT</version><properties> <!-- 明确maven使用jdk1.8编译该模块 --> <project.build.sourceEncoding>utf-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target></properties><dependencies> <!-- rocketmq客户端依赖 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency></dependencies>
2.2 一对一同步消息
消费者只能消费已订阅的所有主题消息。
-
生产者
/** * @Author Vsunks.v * @Date 2021/3 * @Blog blog.sunxiaowei.net * @Description: *///生产者,产生消息public class Producer { public static void main(String[] args) throws Exception { //1.创建一个发送消息的对象Producer,指定分组(生产者分组) 等会讲 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址,连接上ns之后,才能拿到broker地址,发送消息 producer.setNamesrvAddr("192.168.115.130:9876"); //3.1启动发送的服务 producer.start(); //4.创建要发送的消息对象,指定topic,指定内容body Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8")); //3.2发送消息。这里是同步请求,如果broker没有给出响应,就拿不到返回值并且卡死在当前行代码 SendResult result = producer.send(msg); System.out.println("返回结果:"+result); //5.关闭连接 producer.shutdown(); }}
日志输出:
返回结果:SendResult [sendStatus=SEND_OK, msgId=C0A820F0396418B4AAC20290EE250000, offsetMsgId=C0A8738200002A9F0000000000061D59, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=1], queueOffset=0]
-console页面显示
-
消费者
//消费者,接收消息class Consumer { public static void main(String[] args) throws Exception { //1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("192.168.115.130:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意* // 如果想接收之前topic1的生产者发送的消息,这里的就要订阅相同的topic才可以 consumer.subscribe("topic1", "*"); //4.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 设置好监听之后,只要有消息出现,就会调用 consumeMessage方法 * @param list 所有的消息都会存入该集合,供消费者消费 * @param consumeConcurrentlyContext 同时并行消费(多线程)的上下文 * @return */ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍历消息 for (MessageExt msg : list) {// System.out.println("收到消息:"+msg); System.out.println("消息:" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动接收消息的服务 consumer.start(); System.out.println("接收消息服务已开启运行"); // 不能关闭消费者端服务器,因为对broker中topic设置了监听; // 该topic中只要有了新消息,就要通知消费者消费 // consumer.shutdown(); }}
2.3 一对多同步消息
消费者消费模式有两种
-
MessageModel.CLUSTERING
:负载均衡模式。同一个消费组的多个CLUSTERING
模式的消费者之间会竞争;不同消费组之间的消费者不竞争,效果类似于下面的广播模式。 -
MessageModel.BROADCASTING
:广播模式。消息通过广播的方式发送给所有的消费者,每个消费者都会消费所有的消息。
演示代码:
生产者Producer
/单生产者对多消费者//生产者,产生消息public class Producer { public static void main(String[] args) throws Exception { //1.创建一个发送消息的对象Producer DefaultMQProducer producer = new DefaultMQProducer("group5"); //2.设定发送的命名服务器地址 producer.setNamesrvAddr("192.168.115.130:9876"); //3.1启动发送的服务 producer.start(); for (int i = 1; i <= 10; i++) { //4.创建要发送的消息对象,指定topic,指定内容body Message msg = new Message("topic5",("hello rocketmq "+i).getBytes("UTF-8")); //3.2发送消息 SendResult result = producer.send(msg); System.out.println("返回结果:"+result); } //5.关闭连接 producer.shutdown(); }}
消费者Consumer
//消费者,接收消息public class Consumer { public static void main(String[] args) throws Exception { //1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("192.168.115.130:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意* consumer.subscribe("topic2","*"); //设置当前消费者的消费模式(默认模式:负载均衡) consumer.setMessageModel(MessageModel.CLUSTERING); //设置当前消费者的消费模式为广播模式:所有客户端接收的消息都是一样的 //consumer.setMessageModel(MessageModel.BROADCASTING); //3.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //遍历消息 for(MessageExt msg : list){// System.out.println("收到消息:"+msg); System.out.println("group2 clustering"+"消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4.启动接收消息的服务 consumer.start(); System.out.println("group2 clustering"+"接收消息服务已开启运行"); }}
2.4 多对多同步消息
生产者的分组对消费者分组不会产生影响,多个消费者发送的消息只需要属于同一个topic
,就可以被订阅该主题的消费者消费。
3. 消息的类别
-
同步消息:及时性较强,重要的、必须要有回执的消息;
-
异步消息:及时性较弱,但是需要回执的消息,
-
单向消息:不需要回执的消息。
应用场景
-
如果业务需求,立马要根据返回结果进行后续工作,则选用同步消息。转账通知等。
-
如果及时性要求不高,可以选用异步消息;因为效率高,不需要等待,效果好。例如订单的支付单
-
视频中老师通过等待10s是为了等消费者消费完响应,实际生产中不需要等待生产者程序会一直运行
-
-
管杀不管埋的选单向消息。eg:日志类消息
-
工作中优先选用哪个?
-
首选异步。包含之前解耦削锋等特点,目的就是为了提高效率,异步同样可以提高效率。
-
演示代码
//测试消息的种类public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.184.128:9876"); producer.start(); for (int i = 1; i <= 5; i++) { //同步消息发送// Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));// SendResult result = producer.send(msg);// System.out.println("返回结果:"+result); //异步消息发送// Message msg = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8"));// producer.send(msg, new SendCallback() {// //表示成功返回结果// public void onSuccess(SendResult sendResult) {// System.out.println(sendResult);// }// //表示发送消息失败// public void onException(Throwable t) {// System.out.println(t);// }// }); //单向消息 Message msg = new Message("topic2", ("单向消息:hello rocketmq " + i).getBytes("UTF-8")); producer.sendOneway(msg); } //添加一个休眠操作,确保异步消息返回后能够输出 // 工作中生产环境生产者程序会一直运行,就不需要休眠了 TimeUnit.SECONDS.sleep(10); producer.shutdown(); }}
4. 延时消息
RocketMQ
不支持任意时间的延时,只支持固定时间的延时;
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
可以通过 msg.setDelayTimeLevel(index)
来设置延时,索引index
从0开始。
应用场景:
下单订单之后,就可以发送一个延时消息;一个小时后执行该延时消息,检查订单是否支付,如未支付,就取消订单,释放库存。
演示代码
-
生产者
Producer
//测试延时消息public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.115.130:9876"); producer.start(); for (int i = 1; i <= 5; i++) { Message msg = new Message("topic3",("非延时消息:hello rocketmq "+i).getBytes("UTF-8")); //设置当前消息的延时效果 // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m // 1h 2h msg.setDelayTimeLevel(0); SendResult result = producer.send(msg); System.out.println("返回结果:"+result); } producer.shutdown(); }}
-
消费者
Consumer
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.115.130:9876"); consumer.subscribe("topic3","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg : list){ System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("接收消息服务已开启运行"); }}
5. 批量消息发送
如果有多个消息,可以一次性发送。指的是生产者端。
创建多个消息,添加到list
对象中,一起发送。
批量发送消息时,每次发送的消息总量不能超过4M,具体包含:
-
topic
(字符串字节数) -
body
(字节数组长度) -
property
:消息追加的属性(key与value对应字符串字节数) -
log
(固定20字节)
演示代码
-
生产者
Producer
//测试批量消息public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.115.130:9876"); producer.start(); //创建一个集合保存多个消息 List<Message> msgList = new ArrayList<Message>(); Message msg1 = new Message("topic5",("批量消息:hello rocketmq "+1).getBytes("UTF-8")); Message msg2 = new Message("topic5",("批量消息:hello rocketmq "+2).getBytes("UTF-8")); Message msg3 = new Message("topic5",("批量消息:hello rocketmq "+3).getBytes("UTF-8")); msgList.add(msg1); msgList.add(msg2); msgList.add(msg3); //发送批量消息(每次发送的消息总量不得超过4M) //消息的总长度包含4个信息:topic,body,消息的属性,日志(20字节) SendResult send = producer.send(msgList); System.out.println(send); producer.shutdown(); }}
-
消费者
Consumer
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.115.130:9876"); consumer.subscribe("topic5","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg : list){ System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("接收消息服务已开启运行"); }}
6. 消息过滤
消费端可以根据不同的规则选择性的消费符合要求的消息,过滤规则如下
-
主题过滤
-
消费者按照
topic
过滤,只消费指定topic
的消息。之前的都是该规则。
-
-
标签过滤
-
消费者按照
tag
过滤,只消费指定topic
下对应tag
的消息。 -
需要生产者在创建消息对象时,指定
tag
-
消费时,通过
tag
过滤。支持或
来同时指定多个tag
。eg:tag1 || tag2
-
-
SQL
过滤-
消费者按照
属性
过滤,只消费指定topic
下含有指定属性(或属性值)的消息。 -
生产者在创建消息对象后,为消息对象添加属性
-
消费时,通过属性过滤。语法类似于SQL,支持
=
、>=
、<=
、or
、and
、in
,不支持模糊查询like
-
需要在配置文件中开启该功能
enablePropertyFilter=true
-
6.1 主题过滤
代码略。
6.2 标签过滤
演示代码
-
生产者
Producer
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.115.130:9876"); producer.start(); //创建消息的时候除了制定topic,还可以指定tag Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8")); SendResult send = producer.send(msg); System.out.println(send); producer.shutdown(); }}
-
消费者
Consumer
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.115.130:9876"); //接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag consumer.subscribe("topic6","tag1 || tag2"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg : list){ System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("接收消息服务已开启运行"); }}
6.3 SQL过滤
通过类SQL
语法的方式,选择性的过滤要消费的消息。
也叫属性过滤、语法过滤。
演示代码
-
生产者
Producer
//测试按照sql过滤消息public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.115.130:9876"); producer.start(); // 可以同时设置tag和属性,相互不会影响 Message msg = new Message("topic7","zzz",("5消息过滤按照sql:hello rocketmq").getBytes("UTF-8")); //为消息添加属性 msg.putUserProperty("vip","1"); msg.putUserProperty("age","25"); msg.putUserProperty("username","zhangsan"); SendResult send = producer.send(msg); System.out.println(send); producer.shutdown(); }}
-
消费者
Consumer
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.115.130:9876"); //使用消息选择器来过滤对应的属性,语法格式为类SQL语法 //consumer.subscribe("topic7", MessageSelector.bySql("age >= 18")); //consumer.subscribe("topic7", MessageSelector.bySql("username=‘zhangsan’")); // 并集 //consumer.subscribe("topic7", MessageSelector.bySql("age > 18 or username='zhangsan'")); // 交集 //consumer.subscribe("topic7", MessageSelector.bySql("age > 18 and username='zhangsan'")); // 枚举tag consumer.subscribe("topic7", MessageSelector.bySql("TAGS in ('xxx','yyy')")); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg : list){ System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("接收消息服务已开启运行"); }}
7 消息有序性
如何保证消息的有序性。
-
要求某个业务的所有消息只能存入一个队列。如果随机存入多个队列,则不能保证在消费的时候按照顺序消费。
-
某个队列只能被一个消费者线程消费。多个有序消息存入一个队列之后,如果是多个消费者线程消费该队列的消息,上一个消费者还没完,下个消息就可能被另外一个消费线程开始消费了,顺序也有可能被打乱;
演示代码
发送消息时,需要指定的选择器MessageQueueSelector
实体类
// 该类表示订单类@Datapublic class Order { // 为了便于区分,同一个主单的多个Order对象id相同 private String id; // 为了便于区分,msg描述当前order对象是主单还是子单 private String msg;}生产者
Producer
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.184.128:9876"); producer.start(); //创建要执行的业务队列 List<Order> orderList = new ArrayList<Order>(); Order order11 = new Order(); order11.setId("a"); order11.setMsg("主单-1"); orderList.add(order11); Order order12 = new Order(); order12.setId("a"); order12.setMsg("子单-2"); orderList.add(order12); Order order13 = new Order(); order13.setId("a"); order13.setMsg("支付-3"); orderList.add(order13); Order order14 = new Order(); order14.setId("a"); order14.setMsg("推送-4"); orderList.add(order14); Order order21 = new Order(); order21.setId("b"); order21.setMsg("主单-1"); orderList.add(order21); Order order22 = new Order(); order22.setId("b"); order22.setMsg("子单-2"); orderList.add(order22); Order order31 = new Order(); order31.setId("c"); order31.setMsg("主单-1"); orderList.add(order31); Order order32 = new Order(); order32.setId("c"); order32.setMsg("子单-2"); orderList.add(order32); Order order33 = new Order(); order33.setId("c"); order33.setMsg("支付-3"); orderList.add(order33); //设置消息进入到指定的消息队列中 for(final Order order : orderList){ Message msg = new Message("orderTopic",order.toString().getBytes()); // 发送时要指定对应的消息队列选择器 // 消息队列选择器作用:通过某种算法,保证相同id的多个Order消息会最终选择同一个队列并存入 SendResult result = producer.send(msg, new MessageQueueSelector() { // 设置当前消息发送时使用哪一个消息队列, // 具体队列由send方法的第二个参数的实现类的select方法的返回值决定 /** * 方法内部编写选择的规则,并将选中的队列返回。每次发送消息的时候都要调用该方法 * @param list 所有备选的消息队列 * @param message 消息对象本身 * @param o xxx * @return 当前消息选中并要存入的队列 */ @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { // 一致性hash算法 //根据发送的信息不同,选择不同的消息队列 //根据id来选择一个消息队列的对象,并返回->id得到int值 // 永远只选择第一个队列。但是不推荐,因为有性能问题,其他的队列会被浪费,对应的性能也被浪费掉了。 //return list.get(0); // 推荐下面类似的方式,该方式会得到一个效果: // 1. 相同id的所有消息会打包 // 2. 打包后的消息会均匀的存入每个队列(hash值是散列且随机的) int mqIndex = order.getId().hashCode() % list.size(); return list.get(mqIndex); } }, null); System.out.println(result); } producer.shutdown();}
-
消费者
Consumer
public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.184.128:9876"); consumer.subscribe("orderTopic","*"); //使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列 consumer.registerMessageListener(new MessageListenerOrderly() { //使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务 public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { for(MessageExt msg : list){ System.out.println(Thread.currentThread().getName()+" 消息:"+new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("接收消息服务已开启运行");}
总结:
有序性包含:存入的时候有序,消费的时候有序。
存入的时候有序:同一个业务的多个消息有序的存入同一个队列。实现:让业务id
和队列id
绑定
消费的时候有序:只能有一个确定的线程消费当前对列。
8 消息的原子性
8.1 可能存在的原子性问题
Q:生产者Producer端的消息发送与本地事务执行的原子性问题
假设:producer
执行某个业务A过程中有三个子业务操作,每个业务操作需要向broker
发送消息;
A业务的三个子业务(A1、A2、A3)操作应该在同一个事务中,具有原子性;
producer
向broker
发送的三个消息整体上是否要具有原子性?
已经发送消息无法撤回!producer
业务回滚了怎么办?
问题演示伪代码
// service层发送消息// 下单(主单)// producer中业务方法的事务AService的 order(){ BService的 order(支付单){ // 处理一些业务逻辑 // 然后再发消息 }; CService的 order1(运单){ // 处理一些业务逻辑 int i=1/0; // 然后再发消息 }; CService的 order2(通知成功){ // 处理一些业务逻辑 // 然后再发消息 }; // 如果该方法抛异常,上述两个方法也要回滚;但是他们发送的消息已经被存入队列,而且很有可能已经被消费了;就算现在没被消费,之后也肯定会被消费。无法撤回!!!}// 3个业务方法都会发送消息到broker,本地的事务可以回滚,但是消息无法回滚!
8.2初步解决方案:
在producer
本地事务提交之前,找个地方把消息临时存起来,而非直接发给broker
。
-
消息不要直接发给
broker
,因为到了broker
之后就会进入队列等待消费,消费者发现有消息会立马消费;找一个地方先存起来,比方说在
producer
内存中保存一个对象用于记录本地事务的状态
和消息内容
,-
对象的
status
属性保存本地事务的状态;提交了为COMMIT
,回滚了为ROLLBACK
,其他情况为UNKNOWN
。该属性的值要根据事务的进展而不断设置调整。 -
对象的
msg
属性保存本次要发送的所有消息;发消息时先把消息存入该属性,假装已经发送了,此为预发送。
-
-
异步定时检查对象
status
属性,值UNKNOWN
就继续等待,COMMIT
就真的发送消息,ROLLBACK
就销毁消息不发送。
8.3 RocketMQ解决方案
聪明如RocketMQ
,也想到了这点;不同的是消息临时保存点转移到了RocketMQ
的broker
中,在确认producer
本地事务提交前,该消息不能被consumer
消费。从RocketMQ4.3
版本开始,定义了事务消息
实现该功能。
-
普通消息:生产者发送普通消息到broker之后,就立即存入目标队列无法撤回。说出去的话,泼出去的水。
-
事务消息:生产者发送的事务消息到broker之后,不会立即存入目标队列,等生产者确定无误之后再存入目标队列等待消费。
8.4 事务消息相关概念
两个过程
-
正常事务过程。本地事务没有卡住,直接回滚或者提交了;继而直接发送通知给
broker
,让其处理消息。 -
事务补偿过程。事务回检过程。本地事务卡主了,
broker
等急了,所以不断的来问问。
producer
本地事务三个状态
-
COMMIT
本地事务已经提交了 -
ROLLBACK
本地事务回滚了 -
UNKNOWN
不知道本地事务咋样了(执行事务操作的同时,等着broker来问,其实就对应了事务补偿过程。)
演示代码
-
生产者
Producer
//测试事务消息public class Producer { public static void main(String[] args) throws Exception { //事务消息使用的生产者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.184.128:9876"); //添加本地事务对应的监听 producer.setTransactionListener(new TransactionListener() { //正常事务过程 public LocalTransactionState executeLocalTransaction(Message message, Object o) { //中间状态 return LocalTransactionState.UNKNOW; } //事务补偿过程 public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("事务补偿过程执行"); return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); Message msg = new Message("topic11",("事务消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回结果:"+result); //事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿// producer.shutdown(); } public static void main1(String[] args) throws Exception { //事务消息使用的生产者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.184.128:9876"); //添加本地事务对应的监听 producer.setTransactionListener(new TransactionListener() { //正常事务过程 public LocalTransactionState executeLocalTransaction(Message message, Object o) { //事务提交状态 return LocalTransactionState.COMMIT_MESSAGE; } //事务补偿过程 public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return null; } }); producer.start(); Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回结果:"+result); producer.shutdown(); } public static void main2(String[] args) throws Exception { //事务消息使用的生产者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.184.128:9876"); //添加本地事务对应的监听 producer.setTransactionListener(new TransactionListener() { //正常事务过程 public LocalTransactionState executeLocalTransaction(Message message, Object o) { //事务回滚状态 return LocalTransactionState.ROLLBACK_MESSAGE; } //事务补偿过程 public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return null; } }); producer.start(); Message msg = new Message("topic9",("事务消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回结果:"+result); producer.shutdown(); }}
-
消费者
Consumer
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.184.128:9876"); consumer.subscribe("topic11","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt msg : list){ System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("接收消息服务已开启运行"); }}
事务消息弊端
:
-
事务消息没有延迟和批量支持,即不能使用延迟消息的特性和批量发送消息的特性。
-
为了避免多次检查单个消息并导致
HalfTopic
消息累积,默认将单个消息的检查次数限制为15次。 -
在
broker
的配置中,由参数“transactionTimeout”配置检查事务消息的固定周期。 -
可以多次检查或消费事务消息。
-
将事务消息提交到用户的目标
topic
的可能会失败。RocketMQ
自身的高可用性机制确保了高可用性。如果要确保事务性消息不会丢失且事务完整性得到保证,建议使用同步双写机制。 -
事务消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许后向查询。MQ Server按其生产者ID查询客户端。
另外一个方案:
本地事务确认提交之后,再统一发送所有的相关消息。可以使用多数的场景。
常见问题
1. 找不到topic
1.1 现象
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).RocketMQLog:WARN Please initialize the logger system properly.Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, topic1See http://rocketmq.apache.org/docs/faq/ for further details. at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:662) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256) at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339) at com.itheima.filterbysql.Producer.main(Producer.java:20)
1.2 原因
错误提示是找不到名字叫
topic1
的topic
,但是提示不够明确。出现这个提示多半是连不上broker
造成的;常见原因如下:
代码中
NameServer
的地址不对
linux
防火墙启动了启动
broker
时未指定NameServer
地址和端口
1.3 解决办法
-
情况一
// Java代码中修正nameServer地址和端口
-
情况二:
# 关闭防火墙(仅对当前这次开启启动有效)service firewalld stop# 禁用防火墙(禁止开机启动)systemctl disable firewalld
-
情况三
# 正确命令如下,不要忘记通过-n 指定命名服务器的地址端口sh ../bin/mqbroker -c broker.conf -n 192.168.115.130:9876
2. 请求超时
2.1 现象
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).RocketMQLog:WARN Please initialize the logger system properly.Exception in thread "main" org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:640) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256) at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339) at com.itheima.base.Producer.main(Producer.java:19)
2.2 原因
broker
启动的时候,选择错了网卡:应该选择本地ens33
的网卡,而错误选择了docker0
网卡
2.3 解决办法
整体思路:配置实现启动的时候指定使用ens33
的网卡
-
修改
broker
启动时的配置文件broker.conf
,执行如下命令:# 自动向配置文件中添加:brokerIP1=你的ens33网卡的ipecho brokerIP1=你的ens33网卡的ip >> /usr/local/rocketmq-4.5.2/conf/broker.conf
-
重新启动
broker
服务器,并且指定配置文件为broker.conf
sh mqbroker -c /usr/local/rocketmq-4.5.2/conf/broker.conf -n 192.168.115.130:9876# 以下为提示内容,其中ip显示已经正确识别网卡The broker[broker-a, 192.168.115.130:10911] boot success. serializeType=JSON and name server is 192.168.115.130:9876