Spring Cloud微服务项目模板系列(四)添加rocketmq公共消息队列模块

您所在的位置:网站首页 rocketmq消费队列和写入队列 Spring Cloud微服务项目模板系列(四)添加rocketmq公共消息队列模块

Spring Cloud微服务项目模板系列(四)添加rocketmq公共消息队列模块

2023-04-17 16:32| 来源: 网络整理| 查看: 265

上一篇文章《Spring Cloud微服务项目模板系列(三)添加Spring cloud gateway网关》我们在添加网关之后,其实一个基础的项目就已经完全成型了,这里的话,我们再极少一个添加rocketmq的公共模块,这样子只要使用到rocketmq的话,则直接引入这个模块即可。如下图:

image.png

这里我们创建的是shjop-rocketmq模块,这个模块我们主要填写的内容有:

引入rocketmq的maven依赖 编写rocketmq的集中生产者

备注:

1、这个模块里面我们不能写消费者,因为各个服务的消费者业务是不一样的,如果把消费者也写到公共的模块里面,那么只要引入这个公共项目就会出现进行消费,这样子会导致A服务消费了B服务的内容,容易出错。所以千万不要把消费者放在公共的模块里面来,我们必须把消费者放到对应的业务子模块里面来。

对于公共模块的生产者,我们列举的功能内容有:

发送普通消息:支持同步消息,异步消息,单向消息 发送顺序消息:支持同步顺序消息,异步顺序消息,单向顺序消息 发送事务半消息:这里主要是分布式事务使用

对于前两个来说,比较简单,在使用的时候,我们在子模块里面引入rocketmq的公共模块即可,然后在子模块的配置文件里面需要进行如下的配置:

rocketmq:   name-server: 192.168.31.218:9876   producer:      group: shop-order # 生产者组

切记,不管是子模块中只使用到生产者或者只使用到消费者,或者是生产者和消费者都是用到的情况下,我们都需要配置一个这个rocketmq.producer.group。因为集成的是公共模块,因此不引入的话会报错。

配置完成之后,我们只需要在子模块里面引入对应的消息生产者即可,然后使用对应的生产者发送消息,示例如下:

@Autowired private NormalProducer normalProducer; normalProducer.syncSendMsg("order-topic", order);

这样子就可以一步到位了。

下面我们重点说下半消息事务这块:

对于半消息事务这里其实他的整个流程图如下:

68A6218C-7B2A-4221-813C-B9FC30C413F6.png

所以对于要使用半事务消息的话,我们需要在生产者端做如下消息:

在生产者端,编写一个半事务消息生产者,发送半事务消息 在生产者端,编写一个半事务消息的listener,用意监听rocketmq的的回调及回查。

基于这两点,所以我们的代码流程如下:

一、编写一个半事务消息的生产者

package org.shop.rocketmq.producer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import cn.hutool.crypto.SecureUtil; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class TransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /**  * 这里发送一个半事务消息  *   * @param topic  * @param msg  */ public void sendMessageInTransaction(String producerGroup, String topic, Object msg) { String txId = SecureUtil.md5(JSON.toJSONString(msg)); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(producerGroup, topic, MessageBuilder.withPayload(msg).setHeader("txId", txId).build(), msg); log.info("准备发送mq消息,发送生产者group是: {}  主题是:{} , 发送内容时:{} ,txId是:{}  发送结果是:{}", new Object[] { producerGroup, topic, JSON.toJSONString(msg), txId, JSON.toJSONString(result) }); } }

这个生产者里面有几个重要的指标:

producerGroup,这个group需要和监听者保持一致 topic   这是发送到哪个topic上 msg      这里是发送的消息 txid     这里主要是为每一条消息生成一个事务id,这里的事务i需要在后面本地事务上进行执行。

二、编写生产者半事务消息的listener,代码示例如下:

package org.shop.order.manager; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.shop.order.service.OrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; @Component @Slf4j @RocketMQTransactionListener(txProducerGroup = "shop-order-tx") public class OrderRocketTransactionListener implements RocketMQLocalTransactionListener { @Autowired private OrderService orderService; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 本地事物 String txId = msg.getHeaders().get("txId").toString(); log.info("调用了创建订单的本地事务监听方法方法:{}",txId); // 这里需要执行本地的事务消息 TODO,例如保存数据,执行成功的话 orderService.createOrder(); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error(e.getMessage(), e); return RocketMQLocalTransactionState.ROLLBACK; } } /**  * 这个方法是mq向本地进行回查结果的  */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String txId = msg.getHeaders().get("txId").toString(); log.info("调用了回查方法:{}",txId); //根据id查询是否创建成功 TODO if (txId != null) { return RocketMQLocalTransactionState.COMMIT; } else { return RocketMQLocalTransactionState.ROLLBACK; } } }

这里有几个知识点:

1)在@RocketMQTransactionListener这个注解里面配置的producergroup需要和上面那步生产者的producergroup一模一样。

2)executeLocalTransaction这个方法非常重要,主要的逻辑是,生产者那边只管发送消息,具体的业务需要在这里做,例如这里的创建订单,我们需要调用dao,对数据库进行订单的创建,同时这里我们需要使用@transaction注解。然后本地事务执行成功,我们返回commit,执行不成功,我们则返回rollback。

3)checkLocalTransaction这个方法主要是rocketmq的回查,如果executeLocalTransaction出现什么意外情况,一直没有return,这时候rocketmq就相当于认为没有创建成功,所以需要回查,在回查里面我们可以判断下订单是否创建成功,如果成功,则返回commit,如果不成功,则返回rollback。

备注:

1、这里的listener的话,一般一个生产者业务编写一个,因为他监听的是对应的生产者group,如果这个group的处理逻辑是一样的,则listener就可以复用,如果处理逻辑不一样,则我们需要编写其他的listener,同时group名称不能一样。

2、对于使用rocketmq做半事务消息的话,个人觉得编码的复杂度较高,建议使用其他的框架,例如:tcc-transaction或者Himily或者seta等等。

最后按照管理,我们附上本案例的源码,登录后即可下载。

登录访问 本站用户 免费查看 登录账号 您未登录,请登录 或 注册后查看


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3