Spring Boot

您所在的位置:网站首页 谷雨节气课程 Spring Boot

Spring Boot

#Spring Boot| 来源: 网络整理| 查看: 265

一、概述

      消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件。产品基于高可用分布式集群技术,提供消息发布订阅、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。MQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。

二、Spring Boot集成

      阿里云官方文档及demo是整合Spring,使用的是xml配置,在这里,我结合官方文档,在Spring Boot框架下整合MQ,由于在网上相关文档及demo不是很多,所以有很多不足的地方,如果有更好的建议,还请多多指教。       基础的项目搭建在这里就不多说了,在网上也有很多教程,就直接进入正题了。

1. 参数的配置

      我把MQ相关的参数写在Application.properties中

#aliyun MQ config producerId=*** //替换为自己的账号 consumerId=*** accessKey=*** secretKey=*** tag=*** topic=*** onsAddr=http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet sendMsgTimeoutMillis=3000 suspendTimeMillis=100 maxReconsumeTimes=20 2. demo

@Configuration标注在类上,相当于把该类作为spring的xml配置文件中的,作用为:配置spring容器(应用上下文) @Bean标注在方法上(返回某个实例的方法),等价于spring的xml配置文件中的,作用为:注册bean对象

@Configuration public class AliMQConfig { @Value("${producerId}") public String producerId; @Value("${consumerId}") public String consumerId; @Value("${accessKey}") public String accessKey; @Value("${secretKey}") public String secretKey; @Value("${topic}") public String topic; @Value("${tag}") public String tag; @Value("${onsAddr}") public String onsAddr; //超时时间 @Value("${sendMsgTimeoutMillis}") public String sendMsgTimeoutMillis; @Value("${suspendTimeMillis}") public String suspendTimeMillis; @Value("${maxReconsumeTimes}") public String maxReconsumeTimes; @Bean(initMethod = "start", destroyMethod = "shutdown") public ProducerBean getProducer() { ProducerBean producerBean = new ProducerBean(); Properties properties = new Properties(); properties.put(PropertyKeyConst.ProducerId, producerId); // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.AccessKey, accessKey); // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.SecretKey, secretKey); properties.put(PropertyKeyConst.SendMsgTimeoutMillis, sendMsgTimeoutMillis); properties.put(PropertyKeyConst.ONSAddr, onsAddr); producerBean.setProperties(properties); return producerBean; } @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean getConsumer() { ConsumerBean consumerBean = new ConsumerBean(); Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, consumerId); // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.AccessKey, accessKey); // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.SecretKey, secretKey); properties.put(PropertyKeyConst.SuspendTimeMillis, suspendTimeMillis); properties.put(PropertyKeyConst.MaxReconsumeTimes, maxReconsumeTimes); properties.put(PropertyKeyConst.ONSAddr, onsAddr); consumerBean.setProperties(properties); Subscription subscription = new Subscription(); subscription.setTopic(topic); subscription.setExpression(tag); Map map = new HashMap(); map.put(subscription, new AliMQConsumerListener()); consumerBean.setSubscriptionTable(map); return consumerBean; }

这个类相当于官方demo里的xml配置,由于SpringBoot旨在减少xml配置,所以在这里就把xml配置转换为java代码。 下面是生产者发送消息:

@Component public class AliMQUtil { private static final Logger logger = LoggerFactory.getLogger(AliMQUtil.class); @Autowired private AliMQConfig aliMQConfig; @Value("${topic}") public String topic; // 发送消息 public void sendMessage(String tag, String key, byte[] body) { Producer producer = aliMQConfig.getProducer(); Message msg = new Message(topic, tag, body); //msg.setKey(key); try { SendResult sendResult = producer.send(msg); if (sendResult != null) { logger.info("消息发送成功:" + sendResult.toString()); } } catch (ONSClientException e) { logger.info("消息发送失败:", e); // 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 } } }

消费者监听:

public class AliMQConsumerListener implements MessageListener { private static final Logger logger = LoggerFactory.getLogger(AliMQConsumerListener.class); @Override public Action consume(Message message, ConsumeContext context) { String msg = ""; try { //do something.. msg = new String(message.getBody(), "UTF-8"); logger.info("订阅消息:" + msg); return Action.CommitMessage; } catch (Exception e) { //消费失败 logger.info("消费失败:" + msg); return Action.ReconsumeLater; } }

当项目启动,消费者就开始监听消息,实测,项目启动后台就会打印消息日志。 由于官方文档中说明MQ 的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用,所以这里配置的是跟随项目启动,创建一个实例,多线程之间可以共享这一实例。由于对多线程的理解不是很深刻,如果不当,还请指正。       以上就是我在Spring Boot中对于MQ的整合方案,有很多不足的地方,希望和大家一起交流学习。

补充:MQTT的使用

针对用户在移动互联网以及物联网领域的存在的特殊消息传输需求,MQ 开放了 MQTT 协议的完整支持。

消息队列遥测传输(Message Queuing Telemetry Transport,简称 MQTT)是一种轻量的,基于发布订阅模型的即时通讯协议。该协议设计开放,协议简单,平台支持丰富,几乎可以把所有联网物品和外部连接起来,因此在移动互联网和物联网领域拥有众多优势。

协议的特点包括:

使用发布/订阅消息模式,提供一对多的消息分发,解除了应用程序之间的耦合; 对负载内容屏蔽的消息传输; 使用 TCP/IP 提供基础的网络连接; 有三种级别的消息传递服务; 小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。

客户端分布: 使用 MQTT 协议的客户端作为移动端接入 MQ,一般分布在公网环境,比如嵌入式设备、移动手机、平板、浏览器之类的平台上。 使用 MQ 协议的客户端一般作为业务上的服务端接入 MQ,应该部署在阿里云的 ECS 环境。

MQ服务端:MQ发送MQTT消息,只需要在原来的代码做点小改动即可。

@Component public class AliMQUtil { private static final Logger logger = LoggerFactory.getLogger(AliMQUtil.class); @Autowired private AliMQConfig aliMQConfig; private static String topic; private static String groupId; @Value("${topic}") public String top; @Value("${groupId}") public String gId; public void setAliMQConfig(AliMQConfig aliMQConfig) { this.aliMQConfig = aliMQConfig; } public AliMQConfig getAliMQConfig() { return aliMQConfig; } private static AliMQUtil aliMQUtil; @PostConstruct public void init() { topic = top; groupId = gId; aliMQUtil = this; aliMQUtil.aliMQConfig = this.aliMQConfig; } // 发送消息 public static void sendMessage(String vehicleId, byte[] body) { Producer producer = aliMQUtil.aliMQConfig.getProducer(); final Message msg = new Message(topic, // MQ 消息的 Topic,需要事先创建 "MQ2MQTT", // MQ Tag,通过 MQ 向 MQTT 客户端发消息时,必须指定 MQ2MQTT 作为 Tag,其他 Tag 或者不设都将导致 MQTT 客户端收不到消息 body);// 消息体,和 MQTT 的 body 对应 /** * 使用 MQ 客户端给 MQTT 设备发送 P2P 消息时,需要在 MQ 消息中设置 mqttSecondTopic 属性 * 设置的值是“/p2p/”+目标 ClientID */ String targetClientID = groupId + "@@@" + vehicleId; msg.putUserProperties("mqttSecondTopic", "/p2p/" + targetClientID); try { SendResult sendResult = producer.send(msg); if (sendResult != null) { logger.info("消息发送成功:" + sendResult.toString()); } } catch (ONSClientException e) { logger.info("消息发送失败:", e); // 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 } } }

客户端:不能开发语言的demo都可以在阿里云帮助文档找到,我这里使用的是java,用于简单测试。

public static void acceptMessage() throws Exception { /** * 设置当前用户私有的 MQTT 的接入点。例如此处示意使用 XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台创建 MQTT * 实例,每个实例都会分配一个接入点域名。 */ final String broker = "tcp://****.mqtt.aliyuncs.com:1883"; /** * 设置阿里云的 AccessKey,用于鉴权 */ final String acessKey = "***"; /** * 设置阿里云的 SecretKey,用于鉴权 */ final String secretKey = "***"; /** * 发消息使用的一级 Topic,需要先在 MQ 控制台里创建 */ final String topic = "***"; /** * MQTT 的 ClientID,一般由两部分组成,[email protected]@@DeviceID * 其中 GroupID 在 MQ 控制台里创建 * DeviceID 由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的 ClientID 连接 */ final String clientId = "***@@@0001"; String sign; MemoryPersistence persistence = new MemoryPersistence(); try { final MqttClient sampleClient = new MqttClient(broker, clientId, persistence); final MqttConnectOptions connOpts = new MqttConnectOptions(); System.out.println("Connecting to broker: " + broker); /** * 计算签名,将签名作为 MQTT 的 password * 签名的计算方法,参考工具类 MacSignature,第一个参数是 ClientID 的前半部分,即 GroupID * 第二个参数阿里云的 SecretKey */ sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey); /** * 设置订阅方订阅的 Topic 集合,此处遵循 MQTT 的订阅规则,可以是一级 Topic,二级 Topic,P2P 消息请订阅/p2p */ final String[] topicFilters = new String[] { topic + "/notice/", topic + "/p2p" }; final int[] qos = { 0, 0 }; connOpts.setUserName(acessKey); connOpts.setServerURIs(new String[] { broker }); connOpts.setPassword(sign.toCharArray()); connOpts.setCleanSession(true); connOpts.setKeepAliveInterval(90); connOpts.setAutomaticReconnect(true); sampleClient.setCallback(new MqttCallbackExtended() { public void connectComplete(boolean reconnect, String serverURI) { System.out.println("connect success"); // 连接成功,需要上传客户端所有的订阅关系 try { sampleClient.subscribe(topicFilters, qos); } catch (MqttException e) { e.printStackTrace(); } } public void connectionLost(Throwable throwable) { System.out.println("mqtt connection lost"); } public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload())); } public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId()); } }); // 客户端每次上线都必须上传自己所有涉及的订阅关系,否则可能会导致消息接收延迟 sampleClient.connect(connOpts); // 每个客户端最多允许存在30个订阅关系,超出限制可能会丢弃导致收不到部分消息 sampleClient.subscribe(topicFilters, qos); Thread.sleep(Integer.MAX_VALUE); } catch (Exception me) { me.printStackTrace(); } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3