📂SpringCloudStream

您所在的位置:网站首页 rocketmq挂了怎么办 📂SpringCloudStream

📂SpringCloudStream

2023-03-24 21:56| 来源: 网络整理| 查看: 265

概述注意点代码pom依赖application.properties配置文件Controller启动类消费者生产者启动测试

出自 图灵学院 ,我自己学了一下,然后自己做了个笔记,再结合老师的讲义,整理了一下,写了个博客

概述

SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。

SpringCloudStream更牛逼的事情就是解耦,假如说我以后换MQ了,我把RocketMQ换成RabbitMQ了,或者Kafka了,我代码不需要任何改动,只需要换Maven依赖和properties配置文件即可.

使用SpringCloudStream可以让我们更多的经历关注我们的业务而不是各种MQ产品配置

注意点 关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。总之,对于RocketMQ来说,SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。

代码

pom依赖 org.apache.rocketmq rocketmq-client 4.7.1 org.apache.rocketmq rocketmq-acl 4.7.1 com.alibaba.cloud spring-cloud-starter-stream-rocketmq 2.2.3.RELEASE org.apache.rocketmq rocketmq-client org.apache.rocketmq rocketmq-acl org.springframework.boot spring-boot-starter-web 2.3.3.RELEASE

application.properties配置文件#消息的生产者 input来自sink.classspring.cloud.stream.bindings.input.destination=TestTopicspring.cloud.stream.bindings.input.group=scGroup# 消息的发送者 output 是source.classspring.cloud.stream.bindings.output.destination=TestTopic# NameServer的地址spring.cloud.stream.rocketmq.binder.name-server=zjj101:9876;zjj102:9876;zjj103:9876

Controllerpackage com.roy.scrocket.controller;import com.roy.scrocket.basic.ScProducer;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController@RequestMapping("/MQTest")public class MQTestController { @Resource private ScProducer producer; /** * 发送消息 * * @param message * @return */ @RequestMapping("/sendMessage") public String sendMessage(String message) { producer.sendMessage(message); return "消息发送完成"; }}

启动类package com.roy.scrocket;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.cloud.stream.messaging.Source;//EnableBinding 的意思是@EnableBinding({Source.class, Sink.class})@SpringBootApplicationpublic class ScRocketMQApplication { public static void main(String[] args) { SpringApplication.run(ScRocketMQApplication.class, args); }}

消费者package com.roy.scrocket.basic;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.stereotype.Component;@Componentpublic class ScConsumer { @StreamListener(Sink.INPUT) public void onMessage(String messsage) { System.out.println("received message:" + messsage + " from binding:" + Sink.INPUT); }}

生产者package com.roy.scrocket.basic;import org.apache.rocketmq.common.message.MessageConst;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.HashMap;import java.util.Map;@Componentpublic class ScProducer { @Resource private Source source; public void sendMessage(String msg) { Map headers = new HashMap(); headers.put(MessageConst.PROPERTY_TAGS, "testTag"); MessageHeaders messageHeaders = new MessageHeaders(headers); Message message = MessageBuilder.createMessage(msg, messageHeaders); //发送消息 this.source.output().send(message); }}

启动测试启动ScRocketMQApplication,然后执行get请求: http://localhost:8080/MQTest/sendMessage?message=demoData


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3