spring boot 并发请求,其他系统接口,丢失request的header信息【多线程、线程池、@Async 】

您所在的位置:网站首页 spring线程池使用 spring boot 并发请求,其他系统接口,丢失request的header信息【多线程、线程池、@Async 】

spring boot 并发请求,其他系统接口,丢失request的header信息【多线程、线程池、@Async 】

2023-05-16 17:38| 来源: 网络整理| 查看: 265

场景:一次迭代在灰度环境发版时,测试反馈说我开发的那个功能,查询接口有部分字段数据是空的,后续排查日志,发现日志如下: feign.RetryableException: cannot retry due to redirection, in streaming mode executing POST

下面是业务、环境和分析过程下面是业务、环境和分析过程:

接口的业务场景 :我这个接口类似是那种报表统计的接口,它会请求多个微服务,把请求到的数据,统一返回给前端,相当于设计模式中的门面模式了。

后续由于这个接口 是串行请求其他微服务的,速度有些慢,后面修改代码从串行请求,改成并行(多线程)获取数据

运维那边是通过判断http请求中cookie 或者 header中的某个数据,来区分请求是否要把流量打到灰度。

分析得出:应该是接口异步请求的时候cookie丢失,没走到灰度环境,找不到 这次迭代新开发的接口,导致的重定向到错误页面了。

验证:由于我代码是通过@Async异步注解,实现并行请求的,临时把五个接口的异步注解注释掉了,灰度在发版验证,数据能返回正常,说明流量打到灰度了

说明问题就是并发请求的时候,子线程获取不到 主线程的request 头信息,导致没有走到灰度

下图就是灰度环境的 流程图:

image

问题定位出来了,解决方案就是:让子线程能获取到主线程的 request 头信息,主线程把 数据透传到子线程。

我使用的是RequestContextHolder来透传数据 什么是 RequestContextHolder?

RequestContextHolder 是spring mvc的一个工具类,顾名思义,持有上下文的Request容器

如何使用:

//获取当前线程 request请求的属性 RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes(); //设置当前线程 request请求的属性 RequestContextHolder.setRequestAttributes(attributes);

RequestContextHolder的 会用到的几个方法 currentRequestAttributes:获得当前线程请求的属性(头信息之类的) setRequestAttributes(attributes):设置当前线程 属性(设置头信息) resetRequestAttributes:删除当前线程 绑定的属性

下面是他们的源码,可以简单看一下,原理是通过ThreadLocal来绑定数据的:

private static final ThreadLocal requestAttributesHolder = new NamedThreadLocal("Request attributes"); private static final ThreadLocal inheritableRequestAttributesHolder = new NamedInheritableThreadLocal("Request context"); //获得当前线程请求的属性(头信息之类的) @Nullable public static RequestAttributes getRequestAttributes() { RequestAttributes attributes = requestAttributesHolder.get(); if (attributes == null) { attributes = inheritableRequestAttributesHolder.get(); } return attributes; } //设置当前线程 属性(设置头信息) public static void setRequestAttributes(@Nullable RequestAttributes attributes) { setRequestAttributes(attributes, false); } //设置当前线程 属性(设置头信息) public static void setRequestAttributes(@Nullable RequestAttributes attributes, boolean inheritable) { if (attributes == null) { resetRequestAttributes(); } else { if (inheritable) { inheritableRequestAttributesHolder.set(attributes); requestAttributesHolder.remove(); } else { requestAttributesHolder.set(attributes); inheritableRequestAttributesHolder.remove(); } } } //删除当前线程 绑定的属性 public static void resetRequestAttributes() { requestAttributesHolder.remove(); inheritableRequestAttributesHolder.remove(); } 下面我编写了一套遇到问题的代码例子,以及解决的代码:

TestUserController 测试接口

@Slf4j @RestController @RequestMapping(value = "/v1/testUser") public class TestUserController { @Autowired ITestRequestService testRequestService; @ApiOperation(value = "聚合数据接口(一)-串行获取数据") @RequestMapping(value = "/listUser", method = RequestMethod.GET) public Resp listUser(@RequestHeader(value = "token",required = false)String token){ TimeInterval timeInterval = DateUtil.timer(); DataResp dataResp = testRequestService.getDateResp(); log.info("聚合数据接口(一)-串行获取数据 总耗时:{}毫秒",timeInterval.interval()); return Resp.buildDataSuccess(dataResp).setTimeInterval(timeInterval.interval()); } @ApiOperation(value = "聚合数据接口(二)-并行获取数据@Async (子线程获取不到token)") @RequestMapping(value = "/listUser2", method = RequestMethod.GET) public Resp listUser2(@RequestHeader(value = "token",required = false)String token) throws ExecutionException, InterruptedException { TimeInterval timeInterval = DateUtil.timer(); DataResp dataResp = testRequestService.getDateResp2(); log.info("聚合数据接口(二)-并行获取数据@Async (子线程获取不到token) 总耗时:{}毫秒",timeInterval.interval()); return Resp.buildDataSuccess(dataResp).setTimeInterval(timeInterval.interval()); } @ApiOperation(value = "聚合数据接口(三)-并行获取数据(子线程能获取到token)") @RequestMapping(value = "/listUser3", method = RequestMethod.GET) public Resp listUser3(@RequestHeader(value = "token",required = false)String token) throws ExecutionException, InterruptedException { TimeInterval timeInterval = DateUtil.timer(); DataResp dataResp = testRequestService.getDateResp3(); log.info("聚合数据接口(三)-并行获取数据(子线程能获取到token) 总耗时:{}毫秒",timeInterval.interval()); return Resp.buildDataSuccess(dataResp).setTimeInterval(timeInterval.interval()); } }

TestRequestService 聚合数据的类

@Service public class TestRequestService implements ITestRequestService { @Autowired IUserService userService; @Autowired IOrderService orderService; /** * 自定义 - 线程池 */ private static final ThreadPoolExecutor executorService = new ThreadPoolExecutor(50, 200, 180L, TimeUnit.SECONDS, new LinkedBlockingQueue(3000), new ThreadFactory() { final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); @Override public Thread newThread(Runnable r) { Thread thread = defaultFactory.newThread(r); thread.setName("testRequest - " + thread.getName()); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy()); /** * 聚合接口-串行获取数据 * @return */ @Override public DataResp getDateResp(){ //获取用户列表 List userList = userService.listUser_1(); //获取订单列表 List orderList = orderService.listOrder_1(); return DataResp.builder().userList(userList).orderList(orderList).build(); }; /** * 聚合接口-并行获取数据(@Async) 头信息传到子线程 * @return */ @Override public DataResp getDateResp2() throws ExecutionException, InterruptedException { //获取用户列表 start Future userListFuture = userService.listUser_2(); List userList = userListFuture.get(); //获取用户列表 end //获取订单列表 start Future orderListFuture = orderService.listOrder_2(); List orderList = orderListFuture.get(); //获取订单列表 end return DataResp.builder().userList(userListFuture.get()).orderList(orderList).build(); }; /** * 聚合接口-并行获取数据 * @return */ @Override public DataResp getDateResp3() throws ExecutionException, InterruptedException { RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); //获取用户列表 start Future userListFuture = CompletableFuture.supplyAsync(() -> { RequestContextHolder.setRequestAttributes(attributes); try { List resp = userService.listUser_3(); return resp; }finally { RequestContextHolder.resetRequestAttributes(); } }, executorService); List userList = userListFuture.get(); //获取用户列表 end //获取订单列表 start Future orderListFuture = CompletableFuture.supplyAsync(() -> { RequestContextHolder.setRequestAttributes(attributes); try { List resp = orderService.listOrder_3(); return resp; }finally { RequestContextHolder.resetRequestAttributes(); } }, executorService); List orderList = orderListFuture.get(); //获取订单列表 end return DataResp.builder().userList(userListFuture.get()).orderList(orderList).build(); }; }

下面是两个请求 用户和订单请求类

OrderService 请求订单的服务的聚合方法

@Slf4j @Service public class OrderService implements IOrderService { /** * 获取订单code列表 * @return */ @Override public List listOrderCode(){ //使用httpUtil 模拟 feign请求服务接口 start String reqUrl = Config.baseUrl.concat("/v1/order/list"); HttpRequest httpRequest = HttpUtil.createGet(reqUrl); //设置请求头信息 String token = WebUtil.getCurrentRequestHeaderToken(); httpRequest.header("token",token); HttpResponse httpResponse = httpRequest.execute(); String body = httpResponse.body(); Resp respData = JSONUtil.toBean(body, Resp.class); //使用httpUtil 模拟 feign请求服务接口 end if(respData.isSuccess()){ return respData.getData(); } return null; }; /** * 根据订单code获取 订单数据 * @param orderCode * @return */ @Override public Order getOrder(String orderCode){ //使用httpUtil 模拟 feign请求服务接口 start String reqUrl = StrUtil.format(Config.baseUrl.concat("/v1/order/get?orderCode={}"),orderCode); HttpRequest httpRequest = HttpUtil.createGet(reqUrl); //设置请求头信息 String token = WebUtil.getCurrentRequestHeaderToken(); httpRequest.header("token",token); HttpResponse httpResponse = httpRequest.execute(); String body = httpResponse.body(); Gson gson = new Gson(); Resp respData = gson.fromJson(body , new TypeToken(){}.getType()); //使用httpUtil 模拟 feign请求服务接口 end if(respData.isSuccess()){ return respData.getData(); } return null; }; /** * 获取订单列表(串行获取) * @return */ @Override public List listOrder_1(){ //获取订单列表 start List orderList = new ArrayList(); List orderCodes = listOrderCode(); orderCodes.stream().forEach(orderCode->{ Order order = getOrder(orderCode); orderList.add(order); }); //获取订单列表 end return orderList; }; /** * 获取订单列表(并行获取数据) * stream也改成了parallelStream 并行for循环 * @return */ @Async @Override public Future listOrder_2(){ log.info("listOrder_2 当前线程是:{}",Thread.currentThread().getName()); //获取订单列表 start List orderList = new ArrayList(); List orderCodes = listOrderCode(); if(CollUtil.isNotEmpty(orderCodes)){ orderCodes.parallelStream().forEach(orderCode->{ Order order = getOrder(orderCode); if(order!=null){ orderList.add(order); } }); } //获取订单列表 end return new AsyncResult(orderList); }; /** * 获取订单列表(并行获取数据)(把主线程的request的数据 透传给 子线程和子子线程) * @return */ @Override public List listOrder_3(){ //获取订单列表 start List orderList = new ArrayList(); List orderCodes = listOrderCode(); if(CollUtil.isNotEmpty(orderCodes)){ RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); orderCodes.parallelStream().forEach(orderCode->{ RequestContextHolder.setRequestAttributes(attributes); try { Order order = getOrder(orderCode); if(order!=null){ orderList.add(order); } }finally { RequestContextHolder.resetRequestAttributes(); } }); } //获取订单列表 end return orderList; }; }

UserService 请求订单的服务的聚合方法

import java.util.ArrayList; import java.util.List; import java.util.concurrent.Future; @Slf4j @Service public class UserService implements IUserService { @Override public List listUserId(){ //使用httpUtil 模拟 feign请求服务接口 start String reqUrl = Config.baseUrl.concat("/v1/user/list"); HttpRequest httpRequest = HttpUtil.createGet(reqUrl); //设置请求头信息 String token = WebUtil.getCurrentRequestHeaderToken(); httpRequest.header("token",token); HttpResponse httpResponse = httpRequest.execute(); String body = httpResponse.body(); Resp respData = JSONUtil.toBean(body, Resp.class); //使用httpUtil 模拟 feign请求服务接口 end if(respData.isSuccess()){ return respData.getData(); } return null; }; @Override public User getUser(Integer userId){ //使用httpUtil 模拟 feign请求服务接口 start String reqUrl = StrUtil.format(Config.baseUrl.concat("/v1/user/get?userId={}"),userId); HttpRequest httpRequest = HttpUtil.createGet(reqUrl); //设置请求头信息 String token = WebUtil.getCurrentRequestHeaderToken(); httpRequest.header("token",token); HttpResponse httpResponse = httpRequest.execute(); String body = httpResponse.body(); Gson gson = new Gson(); Resp respData = gson.fromJson(body , new TypeToken(){}.getType()); //使用httpUtil 模拟 feign请求服务接口 end if(respData.isSuccess()){ return respData.getData(); } return null; }; @Override public List listUser_1(){ //获取用户列表 start List userList = new ArrayList(); List userIds = listUserId(); userIds.stream().forEach(userId->{ User user = getUser(userId); userList.add(user); }); //获取用户列表 end return userList; }; @Async @Override public Future listUser_2(){ log.info("listUser_2 当前线程是:{}",Thread.currentThread().getName()); //获取用户列表 start List userList = new ArrayList(); List userIds = listUserId(); if(CollUtil.isNotEmpty(userIds)){ userIds.parallelStream().forEach(userId->{ User user = getUser(userId); if(user!=null){ userList.add(user); } }); } //获取用户列表 end return new AsyncResult(userList); }; @Override public List listUser_3(){ //获取用户列表 start List userList = new ArrayList(); List userIds = listUserId(); if(CollUtil.isNotEmpty(userIds)){ RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); userIds.parallelStream().forEach(userId->{ RequestContextHolder.setRequestAttributes(attributes); try { User user = getUser(userId); if(user!=null){ userList.add(user); } }finally { RequestContextHolder.resetRequestAttributes(); } }); } //获取用户列表 end return userList; }; }

OrderController 你可以理解成其他其他微服务的接口(模拟写的一个接口,用来测试 请求接口的时候是否携带 请求头了)

@Slf4j @RestController @RequestMapping(value = "/v1/order") public class OrderController { @ApiOperation(value = "获取订单编号列表") @RequestMapping(value = "/list", method = RequestMethod.GET) public Resp list(HttpServletRequest request){ String token = request.getHeader("token"); if(StrUtil.isBlank(token)){ return Resp.buildFail("接口不存在 404"); } List userIds = new ArrayList(); userIds.add("11111"); userIds.add("22222"); userIds.add("33333"); userIds.add("44444"); userIds.add("55555"); userIds.add("6666"); userIds.add("7777"); handleBusinessTime(); return Resp.buildDataSuccess(userIds); } @ApiOperation(value = "获取订单详情") @ApiImplicitParams({ @ApiImplicitParam(name = "orderCode", value = "订单CODE", paramType = "query"), }) @RequestMapping(value = "/get", method = RequestMethod.GET) public Resp get(HttpServletRequest request,@RequestParam(value = "orderCode")String orderCode){ String token = request.getHeader("token"); if(StrUtil.isBlank(token)){ return Resp.buildFail("接口不存在 404"); } handleBusinessTime(); String name = StrUtil.format("订单-{}-名",orderCode); return Resp.buildDataSuccess(Order.builder().code(orderCode).orderName(name).build()); } /** * 这方法 模拟处理业务或者 去操作数据库 消耗的时间 */ public static void handleBusinessTime(){ //去数据库查询数据耗时 start int[] sleepTime = NumberUtil.generateRandomNumber(300,800,1); try { //Thread.sleep 休眠的时候 相当于 业务操作,或者请求数据库的需要消耗的时间 Thread.sleep(sleepTime[0]); } catch (InterruptedException e) { e.printStackTrace(); } //去数据库查询数据耗时 end } } @Slf4j @RestController @RequestMapping(value = "/v1/user") public class UserController { @ApiOperation(value = "获取用户列表-id") @ApiImplicitParams({ @ApiImplicitParam(name = "orderCode", value = "订单编号", paramType = "query"), }) @RequestMapping(value = "/list", method = RequestMethod.GET) public Resp list(HttpServletRequest request){ String token = request.getHeader("token"); if(StrUtil.isBlank(token)){ return Resp.buildFail("接口不存在 404"); } List userIds = new ArrayList(); userIds.add(1); userIds.add(2); userIds.add(3); userIds.add(4); userIds.add(5); handleBusinessTime(); return Resp.buildDataSuccess(userIds); } @ApiOperation(value = "根据用户ID获取 用户信息") @ApiImplicitParams({ @ApiImplicitParam(name = "userId", value = "用户ID", paramType = "query"), }) @RequestMapping(value = "/get", method = RequestMethod.GET) public Resp get(HttpServletRequest request,@RequestParam(value = "userId")Integer userId){ String token = request.getHeader("token"); if(StrUtil.isBlank(token)){ return Resp.buildFail("接口不存在 404"); } handleBusinessTime(); String name = StrUtil.format("用户{}号",userId); return Resp.buildDataSuccess(User.builder().id(userId).name(name).build()); } /** * 这方法 模拟处理业务或者 去操作数据库 消耗的时间 */ public static void handleBusinessTime(){ //去数据库查询数据耗时 start int[] sleepTime = NumberUtil.generateRandomNumber(300,800,1); try { //Thread.sleep 休眠的时候 相当于 业务操作,或者请求数据库的需要消耗的时间 Thread.sleep(sleepTime[0]); } catch (InterruptedException e) { e.printStackTrace(); } //去数据库查询数据耗时 end } }

下面三个接口的由来:

/v1/testUser/listUser 接口:就是串行调用其他服务接口 ,性能比较慢 /v1/testUser/listUser2 接口:是通过@Async 异步注解,并行调用其他 系统的接口,性能是提升上去了,但灰度环境 是需要根据请求头里面的数据判断是否把流量打到灰度环境 /v1/testUser/listUser3接口:对@Async注解没有找到透传 主线程request头信息的方案,就使用线程池+CompletableFuture.supplyAsync的方式 每次执行异步线程的时候,把主线程的 请求参数设置到子线程,然后通过try-finally 参数使用完之后RequestContextHolder.resetRequestAttributes() 删除参数。

注意:parallelStream它也是属于并行流操作,也要设置 请求头信息,虽说子线程(getDateResp3方法)能获取到主线程的请求头信息了,但是parallelStream 又相当于子线程的子线程了,它是获取不到的 主线程的attributes的,当时我就是没在parallelStream设置attributes,它没有走到灰度环境, 让我 耗费了两个多小时,代码加了四五次日志输出,才把这个问题定位出来,这是一个坑。。。

下面是代码: image

基于这个问题,我还写了一篇 spring boot使用@Async的文章,大家感兴趣可以去看看 传送门~

我已经把上述代码例子放到gitee了,大家感兴趣可以clone 传送门~



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3