springCloudStream集成
SpringCloudStream软件定制开发供应商框架封装出了三个最基软件定制开发供应商础的概念来对各种提供软件定制开发供应商统一的抽象:
- Destination Binders:软件定制开发供应商负责集成外部消息系统的组件。
- Destination Binding:由Binders创建的,软件定制开发供应商负责沟通外部消息系统、软件定制开发供应商消息发送者和消息消费者的桥梁。
- Message:软件定制开发供应商消息发送者与消息消费者沟通的简单数据结构。
简单使用案例
引入依赖
RabbitMQ的SpringCloudStream支持是由Spring社区官网提供的,所以这也是相当成熟的一种集成方案。但是要注意,SpringCloudStream框架集成的版本通常是比RabbitMQ产品本身落后几个版本的,使用时需要注意。
<properties> <spring.cloud.version>Hoxton.SR12</spring.cloud.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring.cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies></dependencyManagement> <dependencies><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency> </dependencies>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
依赖的版本通常建议使用SpringCloud的整体版本控制。
基础使用方法
spring.rabbitmq.host=idspring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=adminspring.rabbitmq.virtual-host=baseDemospring.cloud.stream.bindings.output.destination=mqExchangespring.cloud.stream.bindings.input.destination=mqExchangespring.cloud.stream.bindings.input.group=streamspring.cloud.stream.bindings.input.content-type=text/plain
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
声明Sink 消息消费者
@Component@EnableBinding(Sink.class)public class MqConsumer { private Logger logger = LoggerFactory.getLogger(MqConsumer.class); @StreamListener(Sink.INPUT) public void process(Object message) { System.out.println("received message : " + message); logger.info("received message : {}", message); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
声明Source 生产者
@Component@EnableBinding(Source.class)public class MqProduct {}
- 1
- 2
- 3
- 4
- 5
测试类
@RestController@RequestMapping("api/test")@Api(value = "测试用例", tags = "测试用例")public class testController { @Autowired private final Source source; @GetMapping("/send") @ApiOperation(value = "测试mq") @ApiOperationSupport(order = 11, author = "lsx") public R send(String message) { MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message); source.output().send(messageBuilder.build()); return R.data( "message sended : "+message) ; }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
这样就能接收到消息,非常简单。当然这只是简单的案例,在开发当中不会这么去用。
开发中的使用
定义Binders,每个Binders就相当于一个消息中间件,可以指定多个Binders,每一个Binders代表一个消息中间件服务,比如一个Binders指向rabbitmq,另一个Binders指向其他的mq产品,例如kafka
application.properties
#spring.cloud.stream.binders.<bindername>.<prop>=valuespring.cloud.stream.binders.bxbinder.type=rabbit#spring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.addresses=192.168.232.128:5672,192.168.232.129:5672,192.168.232.130:5672spring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.host=ipspring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.port=5672spring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.username=adminspring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.password=adminspring.cloud.stream.binders.bxbinder.environment.spring.rabbitmq.virtual-host=baseDemo# 如果配置多个binders 需要指定默认的binders# spring.cloud.stream.default-binder=bxbinder#=====================配置binding 一个binding对应一个队列或者交换机# bindings后面的bxExchangeOrder、inputOrder 是接口中定义的名称#destination的值表示创建对应名称交换机或者队列spring.cloud.stream.bindings.bxExchangeOrder.destination=bxExchangeOrderspring.cloud.stream.bindings.inputOrder.destination=bxExchangeOrder#指定队列名。如果没有指定会生成一个很长的默认名字的队列。 此时生成的队列名就是destination的值+group的值spring.cloud.stream.bindings.inputOrder.group=bxOutputspring.cloud.stream.bindings.bxExchangeGoods.destination=bxExchangeGoodsspring.cloud.stream.bindings.inputGoods.destination=bxExchangeGoodsspring.cloud.stream.bindings.inputGoods.group=bxOutput
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
定义交换机和队列
public interface BxExchange { String OUTPUTORDER = "bxExchangeOrder"; String OUTPUTGOODS = "bxExchangeGoods"; //@Output 里面的值要和 配置文件中bindings.后面的值 相同 @Output(OUTPUTORDER) MessageChannel outputOrder(); @Output(OUTPUTGOODS) MessageChannel outputGoods();}public interface BxQueue { //@Input 里面的值要和 配置文件中bindings.后面的值相同 @Input("inputOrder") SubscribableChannel inputOrder(); @Input("inputGoods") SubscribableChannel inputGoods();}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
定义消费者和发送工具类
public class MqConsumer { private Logger logger = LoggerFactory.getLogger(MqConsumer.class); @StreamListener("inputOrder") public void processOrder(Object message) { System.out.println("received order message : " + message); } @StreamListener("inputGoods") public void processGoods(Object message) { System.out.println("received goods message : " + message); }}@Componentpublic class MqUtil { @Resource private MessageChannel bxExchangeOrder; @Resource private MessageChannel bxExchangeGoods; public boolean sendMessageOnOrder(JSONObject json){ MessageBuilder<JSONObject> messageBuilder = MessageBuilder.withPayload(json); boolean b = bxExchangeOrder.send(messageBuilder.build()); return b; } public boolean sendMessageOnGoods(JSONObject json){ MessageBuilder<JSONObject> messageBuilder = MessageBuilder.withPayload(json); boolean b = bxExchangeGoods.send(messageBuilder.build()); return b; }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
config
@Configuration@EnableBinding({BxQueue.class, BxExchange.class})public class StreamMqConfig { @Bean public MqConsumer mqConsumer(){ return new MqConsumer(); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
测试
@RestController@RequestMapping("api/test")public class testController { @Autowired private MqUtil mqUtil; @GetMapping("/send") public R send(Object message) { GenEntity genEntity = new GenEntity().setCodeName("测试"). setDatabaseName("测试").setTableName("中国").setPrefix("xmkf"); JSONObject genJson = (JSONObject) JSONObject.toJSON(genEntity); boolean b = mqUtil.sendMessageOnOrder(genJson); return R.data("message sended : " + b); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
使用原生消息转发机制
springcloudStream其实自身实现了一套事件驱动的流程。这种流程,对于各种不同的MQ产品都是一样的。但是,毕竟每个MQ产品的实现机制和功能特性是不一样的,所以,SCStream还是提供了一套针对各个MQ产品的兼容机制。
在RabbitMQ的实现中,所有个性化的属性配置实现都是以spring.cloud..rabbit开头,支持对binder、producer、consumer进行单独配置
#绑定到已经存在的交换机 和 队列spring.cloud.stream.bindings.bxExchangeOrder.destination=rabbitExchangespring.cloud.stream.bindings.inputOrder.destination=rabbitExchangespring.cloud.stream.bindings.inputOrder.group=rabbitQueue#不自动声明exchange(自动声明的exchange都是topic)spring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.bind-queue=falsespring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.exchange-type=topicspring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.routing-key-expression=headers.routingKey#不自动创建队列spring.cloud.stream.rabbit.bindings.inputOrder.consumer.bind-queue=false#队列名只声明组名(前面不带destination前缀)spring.cloud.stream.rabbit.bindings.inputOrder.consumer.queue-name-group-only=true#交换机和队列通过routing-key 进行绑定spring.cloud.stream.rabbit.bindings.inputOrder.consumer.binding-routing-key=bbbb
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
消息发送
MessageBuilder<JSONObject> messageBuilder = MessageBuilder.withPayload(json);
boolean b = bxExchangeOrder.send(messageBuilder.setHeader("routingKey", "bbbb").build());
Headers路由
上面的代码都通过setHeader方法放入路由键 其实就是一种Headers路由。也可以根据业务要求,往head中存放其他的值。
如何获取head中的值
@StreamListener("inputOrder") public void processOrder(Message<Object> message) { Object payload = message.getPayload(); String routingKey = message.getHeaders().get("routingKey")+""; System.out.println("received order message : " + payload + "\" + "received order routingKey:"+ routingKey); }
- 1
- 2
- 3
- 4
- 5
- 6
分组消费策略
对于实现相同组消费者 只消费一次消息,不同组消费相同的消息
#==================原生rabbitmq的配置 使用已有的交换机和队列#绑定到已经存在的交换机 和 队列spring.cloud.stream.bindings.bxExchangeOrder.destination=rabbitExchangespring.cloud.stream.bindings.inputOrder.destination=rabbitExchangespring.cloud.stream.bindings.inputOrder.group=rabbitQueue#不自动声明exchange(自动声明的exchange都是topic)spring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.bind-queue=falsespring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.exchange-type=topicspring.cloud.stream.rabbit.bindings.bxExchangeOrder.producer.routing-key-expression=headers.routingKey#不自动创建队列spring.cloud.stream.rabbit.bindings.inputOrder.consumer.bind-queue=false#队列名只声明组名(前面不带destination前缀)spring.cloud.stream.rabbit.bindings.inputOrder.consumer.queue-name-group-only=true#交换机和队列通过routing-key 进行绑定spring.cloud.stream.rabbit.bindings.inputOrder.consumer.binding-routing-key=bbbb#====================分组消费=================spring.cloud.stream.bindings.inputGoods.destination=rabbitExchangespring.cloud.stream.bindings.inputGoods.group=rabbitQueue1spring.cloud.stream.rabbit.bindings.inputGoods.consumer.queue-name-group-only=truespring.cloud.stream.rabbit.bindings.inputGoods.consumer.binding-routing-key=bbbb
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
消费者
public class MqConsumer { private Logger logger = LoggerFactory.getLogger(MqConsumer.class); @StreamListener("inputOrder") public void processOrder(Message<Object> message) { Object payload = message.getPayload(); String routingKey = message.getHeaders().get("routingKey")+""; System.out.println("received order message : " + payload + "\" + "received order routingKey:"+ routingKey); } @StreamListener("inputGoods") public void processGoods(Message<Object> message) { Object payload = message.getPayload(); String routingKey = message.getHeaders().get("routingKey")+""; System.out.println("received goods message : " + payload + "\" + "received goods routingKey:"+ routingKey); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
其中inputOrder和inputGoods就是不同的分组然后,再开一个实例表示一组中有两个消费者
修改端口防止冲突
发送消息,可以发现相同组的消费者只会消费一次,不同的组的消费者会消费同一个消息
这是其中一个实例的消费,另一个实例没有
消费确认机制
最近项目中有用到stream继承rabbitmq,补充一下消息确认消费
配置文件:
spring: cloud: stream: default: consumer: # 重试次数 max-attempts: 5 rabbit: default: consumer: # 手动确认 acknowledge-mode: manual # 指定使用哪一个binders default-binder: demobinder binders: # 定义一个binder 表示rabbitmq demobinder: type: rabbit # 配置rabbitmq 环境配置 environment: spring: rabbitmq: host: ip port: 5672 username: xxxx password: xxxx virtual-host: xxxx # 绑定交换机和队列 bindings: hkUpcallbackExchange: destination: hkUpcallback hkDowncallbackExchange: destination: hkDowncallback hkUpcallbackQueue: destination: hkUpcallback group: hkOutput hkDowncallbackQueue: destination: hkDowncallback group: hkOutput
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
配置信息类
public interface HkExchange { String HK_UP_EXCHANGE = "hkUpcallbackExchange"; String HK_DOWN_EXCHANGE = "hkDowncallbackExchange"; //@Output 里面的值要和 配置文件中bindings.后面的值 相同 @Output(HK_UP_EXCHANGE) MessageChannel outputHkUp(); @Output(HK_DOWN_EXCHANGE) MessageChannel outputHkDown();}public interface HkQueue { String HK_UP_QUEUE = "hkUpcallbackQueue"; String HK_DOWN_QUEUE = "hkDowncallbackQueue"; @Input(HK_UP_QUEUE) SubscribableChannel inputHkUp(); @Input(HK_DOWN_QUEUE) SubscribableChannel inputHkDown();}@Conditional(MqConditional.class)@EnableBinding({HkExchange.class, HkQueue.class})public class StreamMqConfig { @Bean public HkMqConsumer hkMqConsumer(){ return new HkMqConsumer(); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
StreamMqConfig 添加了@Conditional(MqConditional.class)为了避免在开发环境中也开启mq,如果开发环境开启mq的话每次调试信息发送总会发到其他开发的小伙伴机器上去。
public class MqConditional implements Condition { @Override public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) { String[] activeProfiles = conditionContext.getEnvironment().getActiveProfiles(); if ("dev".equals(activeProfiles[0])) return false; return true; }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
消息发送和接收
@Slf4j//消费者public class HkMqConsumer { @StreamListener(HkQueue.HK_UP_QUEUE) public void processUp(JSONObject message, @Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) { System.out.println("received Up message : " + message); try { channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); } } @StreamListener(HkQueue.HK_DOWN_QUEUE) public void processDown(JSONObject message,@Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) { System.out.println("received down message : " + message); try { channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); } }}@Component@Datapublic class MqUtil { @Lazy @Autowired private MessageChannel hkUpcallbackExchange; @Lazy @Autowired private MessageChannel hkDowncallbackExchange; public R sendMessage(String exchangeName,JSONObject json) { try { Class<? extends MqUtil> aClass = this.getClass(); Field field = aClass.getDeclaredField(exchangeName); field.setAccessible(true); Object o = field.get(this); MessageChannel channel = (MessageChannel)o; MessageBuilder<JSONObject> messageBuilder = MessageBuilder.withPayload(json); if (channel.send(messageBuilder.build())){ return R.success("mq发送成功"); }else { return R.success("mq发送失败"); } } catch (NoSuchFieldException | IllegalAccessException e) { e.printStackTrace(); return R.fail("反射异常,找不到对应属性"); } catch (NoUniqueBeanDefinitionException en) { en.printStackTrace(); return R.fail("dev不配置mq 请使用test环境"); } catch (Exception ex){ ex.printStackTrace(); return R.fail("其他异常:"+ex.getMessage()); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
封装了mq的发送 mqUtil.sendMessage(HkExchange.HK_UP_EXCHANGE, genJson);
后续发送只需要调api就好,避免重复写发送代码