springcloud+rabbitmq实现分布式事务【死信队列实现】

您所在的位置:网站首页 分布式事务消息队列 springcloud+rabbitmq实现分布式事务【死信队列实现】

springcloud+rabbitmq实现分布式事务【死信队列实现】

2024-07-08 22:46| 来源: 网络整理| 查看: 265

说明:本文章是基于springcloud+springboot+rabbitmq实现的分布式事务,注册中心为eureka,服务调用为openfeign。使用死信队列完成分布式事务【即补偿机制】,上篇文章使用简单队列完成分布式。

业务需求:用户支付功能,如果用户支付成功,需要改变订单表中订单状态为支付状态【跨服务调用修改订单状态接口】,同时库存表中减库存数量【跨服务调用修改库存数量接口】。

下面提到的回款操作是商家不愿意看到,很多商家不愿意这样,这里回款操作只是文章需要。

技术说明【那些消息会放到死信队列】:

      a.被拒绝的消息

      b.消息过期

      c.队列过长

实现逻辑分析:

1.如果用户支付失败,直接抛异常不走下面操作,只需要本地事务回滚即可。

2.如果用户支付成功,调用修改订单状态接口失败,需要在支付接口中抛异常,通过异常处理给已经支付从用户进行回款操作

3.如果用户支付成功,调用修改订单状态接口成功,调用修改库存接口失败【调用修改库存需要使用消息中间件处理】,也就是消息中间件发送消息失败或者消息消费失败两种情况:a.如果是发送消息失败,处理方案为采取消息发送回调机制,在回调机制中进行重试机制或者说消息发送失败根本就不想进行重试机制,那么就选择补偿机制,即在回调机制中进行回款操作和更改订单为未支付状态。b.如果是消费消息失败,那么采用手动签收消息机制,通过异常处理消息,catch到异常进行消息消费进行手动拒绝策略,手动拒绝后消息会进入的死信队列中,之后在进行死信队列进行监听,监听到消息后就可以信息业务操作了即回款操作和修改订单状态为未支付状态【其实回款操作也是可以通过消息队列实现,这里就不说了】。

不说了,上代码,代码还是分为  用户支付服务   和    商品订单库存服务

一:用户支付服务工程目录说明:

pom.xml文件

org.springframework.cloud spring-cloud-starter-openfeign org.springframework.boot spring-boot-starter-amqp

application.yml文件【主要是看消息中间件配置】

server: port: 7300 spring: application: name: cloud-diemq-start-7300 datasource: driver-class-name: org.postgresql.Driver url: jdbc:postgresql://localhost:5432/myMq?useSSL=false username: postgres password: root rabbitmq: host: 11.11.177.170 port: 45672 username: liqingwei01 # sod-rabbitmq password: liqingwei01 virtual-host: mymqdemo ###开启消息确认机制 confirms publisher-confirms: true publisher-returns: true eureka: client: register-with-eureka: true fetch-registry: true service-url: defaultZone: http://localhost:80/eureka mybatis-plus: mapper-locations: classpath:mapping/*.xml type-aliases-package: com.diestart.model #开启驼峰 configuration: map-underscore-to-camel-case: true #分页插件 pagehelper: auto-dialect: postgresql reasonable: true support-methods-arguments: true

GoodsStockModel和OrderModel文件

import lombok.Data; /** * 商品库存 * @author 李庆伟 * @date 2021/6/11 10:32 */ @Data public class GoodsStockModel { private String id;//商品id private String goodsName;//商品名称 private String goodsNum;//商品数量 private String goodsType;//商品类型 } import lombok.Data; import java.util.Date; /** * @author 李庆伟 * @date 2021/6/11 10:31 */ @Data public class OrderModel { private String id;//订单id private String orderNo;//订单编号 private String goodsId;//商品id private String goodsNum;//商品数量 private String orderType;//订单状态 0未支付 1支付 private Date createDate;//订单创建时间 private String createUser;//下单人 }

 

RabbitMqConfig文件

import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; /** * @author 李庆伟 * @date 2021/6/11 16:58 */ @Component public class RabbitMqConfig { // 业务队列 public static final String GOODS_STOCK_CUT_QUEUE = "order_goods_stock_queue"; // 业务交换机 public static final String ORDER_GOODS_STOCK_EXCHANGE = "order_goods_stock_exchange"; // 死信队列 public static final String ORDER_GOODS_STOCK_DIE_QUEUE = "order_goods_stock_die_queue"; // 死信交换机 public static final String ORDER_GOODS_STOCK_DIE_EXCHANGE = "order_goods_stock_die_exchange"; //声明一个死信队列用来存放死信消息 @Bean public Queue orderGoodsStockDieQueue() { return new Queue(ORDER_GOODS_STOCK_DIE_QUEUE,true,false,false,null); } //声明一个死信交换机 @Bean public TopicExchange orderGoodsStockDieExchange() { return new TopicExchange(ORDER_GOODS_STOCK_DIE_EXCHANGE,true,true); } //将死信队列和死信的交换机绑定 @Bean public Binding dieQueueBindingDieExchange() { return BindingBuilder.bind(orderGoodsStockDieQueue()). to(orderGoodsStockDieExchange()). with("dieRoutingKeys"); } //声明一个业务队列 @Bean public Queue orderGoodsStockQueue() { Map args = new HashMap(); // 声明 死信队列Exchange args.put("x-dead-letter-exchange", ORDER_GOODS_STOCK_DIE_EXCHANGE); // 设置死信routingKey args.put("x-dead-letter-routing-key", "dieRoutingKeys"); return new Queue( GOODS_STOCK_CUT_QUEUE, true, false, false, args); } //声明一个业务交换机 @Bean public TopicExchange orderGoodsStockExchange() { return new TopicExchange(ORDER_GOODS_STOCK_EXCHANGE); } //将业务队列和业务交换机绑定 @Bean public Binding bindingDirect() { return BindingBuilder.bind(orderGoodsStockQueue()).to(orderGoodsStockExchange()).with("routingKeys"); } }

RabbitGoodsStockUitl文件

import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author 李庆伟 * @date 2021/6/7 20:41 */ @Component public class RabbitGoodsStockUitl implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; public void send(String orderId) { // 封装消息 Message message = MessageBuilder.withBody(orderId.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(orderId).build(); // 构建回调返回的数据 CorrelationData correlationData = new CorrelationData(orderId); // 发送消息 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.convertAndSend("order_goods_stock_exchange", "routingKeys", message, correlationData); System.out.println("消息发送完成了 。。。。。。。。。"); System.out.println("消息发送完成了 。。。。。。。。。"); System.out.println("消息发送完成了 。。。。。。。。。"); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String orderId = correlationData.getId(); //id 都是相同的哦 全局ID System.out.println("消息id:" + correlationData.getId()); if (ack) { //消息发送成功 System.out.println("消息发送 确认成功"); } else { //重试机制 send(orderId); System.out.println("消息发送 确认失败:" + cause); } } }

AffairProService文件:主要是跨服务调用

import com.diestart.model.OrderModel; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; /** * @author 李庆伟 * @date 2021/6/6 17:26 */ @FeignClient("cloud-fordiemq-end-9300") public interface AffairProService { /** * 订单详情 * [orderId] * @return {@link OrderModel} * @throws * @author 李庆伟 * @date 2021/6/22 10:53 */ @PostMapping("/orders/show/{orderId}") public OrderModel show(@PathVariable("orderId") String orderId); /** * 修改订单状态 * [orderModel] * @return {@link String} * @throws * @author 李庆伟 * @date 2021/6/22 10:53 */ @PostMapping("/orders/update") public String update(@RequestBody OrderModel orderModel); }

PaymentsController文件

import com.diestart.service.PaymentsService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * 支付服务 * @author 李庆伟 * @date 2021/6/11 10:47 */ @RequestMapping("payments") @RestController public class PaymentsController { @Autowired private PaymentsService paymentsService; /** * 模拟支付接口 * 支付完成后会调用: * 1.修改订单为支付状态接口 * 2.减库存接口 * [orderId] * @return {@link String} * @throws * @author 李庆伟 * @date 2021/6/11 14:20 */ @PostMapping("makePayments/{orderId}") public String makePayments(@PathVariable("orderId") String orderId){ paymentsService.makePayments(orderId); return "ok"; } }

PaymentsService文件

/** * @author 李庆伟 * @date 2021/6/7 11:24 */ public interface PaymentsService { void makePayments(String orderId); }

PaymentsServiceImpl文件

import com.diestart.config.RabbitGoodsStockUitl; import com.diestart.feign_service.AffairProService; import com.diestart.model.OrderModel; import com.diestart.service.PaymentsService; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; /** * @author 李庆伟 * @date 2021/6/7 11:24 */ @Service public class PaymentsServiceImpl implements PaymentsService { @Resource private AffairProService affairProService; @Autowired private RabbitGoodsStockUitl rabbitGoodsStockUitl; /** * 模拟支付接口 * 支付完成后会调用: * 1.修改订单为支付状态接口 * 2.减库存接口 */ @Transactional public void makePayments(String orderId) { //调用查询订单 OrderModel orderModel = affairProService.show(orderId); if(orderModel == null){ throw new RuntimeException("订单错误。。。。"); } //模拟支付 System.out.println("支付完成啦啦啦。。。。。。。。。"); //调用订单支付状态接口 orderModel.setOrderType("1"); String orderType = affairProService.update(orderModel); if(orderType == null || StringUtils.isEmpty(orderType)){ throw new RuntimeException("订单状态修改失败"); } //消息队列调用减库存接口 rabbitGoodsStockUitl.send(orderId); } }

二:订单库存服务工程目录说明:

pom.xml文件

org.springframework.boot spring-boot-starter-amqp

application.yml文件【重点为rabbitmq配置,更改你的mq配置】

server: port: 9300 spring: application: name: cloud-fordiemq-end-9300 datasource: driver-class-name: org.postgresql.Driver url: jdbc:postgresql://localhost:5432/myMq?useSSL=false username: postgres password: root rabbitmq: host: 11.11.177.170 port: 45672 username: liqingwei01 password: liqingwei01 virtual-host: mymqdemo listener: simple: retry: ####开启消费者(程序出现异常的情况下会)进行重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 3000 ####开启手动ack acknowledge-mode: manual eureka: client: register-with-eureka: true fetch-registry: true service-url: defaultZone: http://localhost:80/eureka mybatis-plus: mapper-locations: classpath:mapping/*.xml type-aliases-package: com.dieend.model #开启驼峰 configuration: map-underscore-to-camel-case: true #分页插件 pagehelper: auto-dialect: postgresql reasonable: true support-methods-arguments: true

GoodsStockModel实体和OrderModel实体

import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; /** * 商品库存 * @author 李庆伟 * @date 2021/6/11 10:32 */ @Data @TableName("t_goods_stock") public class GoodsStockModel { @TableId private String id;//商品id private String goodsName;//商品名称 private Integer goodsNum;//商品数量 private String goodsType;//商品类型 } import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import java.util.Date; /** * @author 李庆伟 * @date 2021/6/11 10:31 */ @Data @TableName("t_order") public class OrderModel { @TableId private String id;//订单id private String orderNo;//订单编号 private String goodsId;//商品id private Integer goodsNum;//商品数量 private String orderType;//订单状态 0未支付 1支付 private Date createDate;//订单创建时间 private String createUser;//下单人 }

RabbitMqConfig文件

import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; /** * @author 李庆伟 * @date 2021/6/11 16:58 */ @Component public class RabbitMqConfig { // 业务队列 public static final String GOODS_STOCK_CUT_QUEUE = "order_goods_stock_queue"; // 业务交换机 public static final String ORDER_GOODS_STOCK_EXCHANGE = "order_goods_stock_exchange"; // 死信队列 public static final String ORDER_GOODS_STOCK_DIE_QUEUE = "order_goods_stock_die_queue"; // 死信交换机 public static final String ORDER_GOODS_STOCK_DIE_EXCHANGE = "order_goods_stock_die_exchange"; //声明一个死信队列用来存放死信消息 @Bean public Queue orderGoodsStockDieQueue() { return new Queue(ORDER_GOODS_STOCK_DIE_QUEUE,true,false,false,null); } //声明一个死信交换机 @Bean public TopicExchange orderGoodsStockDieExchange() { return new TopicExchange(ORDER_GOODS_STOCK_DIE_EXCHANGE,true,true); } //将死信队列和死信的交换机绑定 @Bean public Binding dieQueueBindingDieExchange() { return BindingBuilder.bind(orderGoodsStockDieQueue()). to(orderGoodsStockDieExchange()). with("dieRoutingKeys"); } //声明一个业务队列 @Bean public Queue orderGoodsStockQueue() { Map args = new HashMap(); // 声明 死信队列Exchange args.put("x-dead-letter-exchange", ORDER_GOODS_STOCK_DIE_EXCHANGE); // 设置死信routingKey args.put("x-dead-letter-routing-key", "dieRoutingKeys"); return new Queue( GOODS_STOCK_CUT_QUEUE, true, false, false, args); } //声明一个业务交换机 @Bean public TopicExchange orderGoodsStockExchange() { return new TopicExchange(ORDER_GOODS_STOCK_EXCHANGE); } //将业务队列和业务交换机绑定 @Bean public Binding bindingDirect() { return BindingBuilder.bind(orderGoodsStockQueue()).to(orderGoodsStockExchange()).with("routingKeys"); } }

GoodsStockMaListener文件

import com.fordieend.model.GoodsStockModel; import com.fordieend.model.OrderModel; import com.fordieend.service.GoodsStockService; import com.fordieend.service.OrderService; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import java.util.Map; /** * 商品库存服务 * @author 李庆伟 * @date 2021/6/11 10:47 */ @Component public class GoodsStockMqListener { @Autowired private GoodsStockService goodsStockService; @Autowired private OrderService orderService; /** * 把该监听命名为A监听 * 监听队列 远程服务支付接口发送消息 该队列会接受 * [message, headers, channel] * @return {@link } * @throws * @author 李庆伟 * @date 2021/6/13 18:36 */ @RabbitListener(queues = "order_goods_stock_queue") public void process(Message message, @Headers Map headers, Channel channel) throws Exception { String orderId = new String(message.getBody(), "UTF-8"); OrderModel orderModel = orderService.show(orderId); GoodsStockModel goodsStockModel = goodsStockService.show(orderModel.getGoodsId()); goodsStockModel.setGoodsNum(goodsStockModel.getGoodsNum() - orderModel.getGoodsNum()); try { //需要处理重复消费,这里就不说明了 int a = 1/0;//模拟异常 int cutNum = goodsStockService.update(goodsStockModel);//扣减库存 if (cutNum > 0) { // 手动签收消息,通知mq服务器端删除该消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } catch (Exception e) { e.printStackTrace(); //出现异常后,原本队列消息进入死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } /** * 把该监听命名为B监听 * 监听死信队列 远程服务支付接口发送消息 该队列会接受 * [message, headers, channel] * @return {@link } * @throws * @author 李庆伟 * @date 2021/6/13 18:36 */ @RabbitListener(queues = "order_goods_stock_die_queue") public void processForDie(Message message, @Headers Map headers, Channel channel) throws Exception { String orderId = new String(message.getBody(), "UTF-8"); OrderModel orderModel = orderService.show(orderId); GoodsStockModel goodsStockModel = goodsStockService.show(orderModel.getGoodsId()); goodsStockModel.setGoodsNum(goodsStockModel.getGoodsNum() - orderModel.getGoodsNum()); try { //int a = 1/0; //模拟死信队列出现异常 int cutNum = goodsStockService.update(goodsStockModel);//再次扣减库存 if (cutNum > 0) { // 手动签收消息,通知mq服务器端删除该消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } catch (Exception e) { e.printStackTrace(); //死信队列出现异常进入业务补偿机制 orderModel.setOrderType("0"); int orderNum = orderService.update(orderModel);//业务补偿 if(orderNum == 1){ // // 丢弃该消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); }else { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//继续消费该消息 } } } }

GoodsStockController文件和OrderController文件

import com.fordieend.model.GoodsStockModel; import com.fordieend.service.GoodsStockService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * 商品库存服务 * @author 李庆伟 * @date 2021/6/11 10:47 */ @RequestMapping("goodsStock") @RestController public class GoodsStockController { @Autowired private GoodsStockService goodsStockService; /** * 修改库存 * 订单支付成后,扣除库存 * [goodsStockModel] * @return {@link String} * @throws * @author 李庆伟 * @date 2021/6/11 11:06 */ @PostMapping("update") public String update(@RequestBody GoodsStockModel goodsStockModel){ int num = goodsStockService.update(goodsStockModel); if(num != 1){ return "error"; } return "ok"; } /** * 库存详情 * [goodsId] * @return {@link GoodsStockModel} * @throws * @author 李庆伟 * @date 2021/6/22 11:15 */ @PostMapping("show/{goodsId}") public GoodsStockModel show(@PathVariable("goodsId") String goodsId){ return goodsStockService.show(goodsId); } } import com.fordieend.model.OrderModel; import com.fordieend.service.OrderService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * 订单服务 * @author 李庆伟 * @date 2021/6/11 10:47 */ @RequestMapping("orders") @RestController public class OrderController { @Autowired private OrderService orderService; /** * 修改订单 * 支付成后后,修改订单状态 * [orderModel] * @return {@link String} * @throws * @author 李庆伟 * @date 2021/6/11 11:05 */ @PostMapping("update") public String update(@RequestBody OrderModel orderModel){ int num = orderService.update(orderModel); if(num != 1){ return "error"; } return "ok"; } /** * 根据订单id查询订单详情 * [orderId] * @return {@link String} * @throws * @author 李庆伟 * @date 2021/6/11 11:25 */ @PostMapping("show/{orderId}") public OrderModel show(@PathVariable("orderId") String orderId){ return orderService.show(orderId); } }

GoodsStockService文件和OrderService文件

import com.fordieend.model.GoodsStockModel; /** * @author 李庆伟 * @date 2021/6/11 10:50 */ public interface GoodsStockService { int update(GoodsStockModel goodsStockModel); GoodsStockModel show(String goodsId); } import com.fordieend.model.OrderModel; /** * @author 李庆伟 * @date 2021/6/11 10:50 */ public interface OrderService { int update(OrderModel orderModel); OrderModel show(String orderId); }

GoodsStockServiceImpl文件和OrderServiceImpl文件

import com.fordieend.mapper.GoodsStockMapper; import com.fordieend.model.GoodsStockModel; import com.fordieend.service.GoodsStockService; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * 库存服务 * @author 李庆伟 * @date 2021/6/11 10:50 */ @Service public class GoodsStockServiceImpl implements GoodsStockService { @Resource private GoodsStockMapper goodsStockMapper; /** * 修改库存 * [goodsStockModel] * @return {@link int} * @throws * @author 李庆伟 * @date 2021/6/22 11:17 */ public int update(GoodsStockModel goodsStockModel) { return goodsStockMapper.updateById(goodsStockModel); } /** * 库存详情 * [goodsId] * @return {@link GoodsStockModel} * @throws * @author 李庆伟 * @date 2021/6/22 11:17 */ public GoodsStockModel show(String goodsId) { return goodsStockMapper.selectById(goodsId); } } import com.fordieend.mapper.OrderMapper; import com.fordieend.model.OrderModel; import com.fordieend.service.OrderService; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * @author 李庆伟 * @date 2021/6/11 10:50 */ @Service public class OrderServiceImpl implements OrderService { @Resource private OrderMapper orderMapper; /** * 修改订单 * [orderModel] * @return {@link int} * @throws * @author 李庆伟 * @date 2021/6/22 11:18 */ public int update(OrderModel orderModel) { return orderMapper.updateById(orderModel); } /** * 订单详情 * [orderId] * @return {@link OrderModel} * @throws * @author 李庆伟 * @date 2021/6/22 11:18 */ public OrderModel show(String orderId) { return orderMapper.selectById(orderId); } }

GoodsStockServiceMapper文件和OrderServiceMapper文件

import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.fordieend.model.GoodsStockModel; import org.apache.ibatis.annotations.Mapper; /** * @author 李庆伟 * @date 2021/6/11 10:53 */ @Mapper public interface GoodsStockMapper extends BaseMapper { } import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.fordieend.model.OrderModel; import org.apache.ibatis.annotations.Mapper; /** * @author 李庆伟 * @date 2021/6/11 10:53 */ @Mapper public interface OrderMapper extends BaseMapper { }

 

 

库表结构

 

 

 

希望对你有所帮助。。。。。。。。。。。。。

 

分布式事务简单实现到此结束。。。。。。。。。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3