如何优雅地实现"异步"编程?

您所在的位置:网站首页 java接口异步返回结果不对怎么解决 如何优雅地实现"异步"编程?

如何优雅地实现"异步"编程?

2023-10-19 08:21| 来源: 网络整理| 查看: 265

1、 引言

Java异步编程极大的节省了主程序执行时间,提升了计算资源利用效率,是Java高级工程师的必备技能之一。本文围绕什么是异步,异步解决了什么问题,怎么异步编程来展开。

1.1 什么是异步编程

在解释异步编程之前,我们先来看同步编程的定义。同步编程,即是一种典型的请求-响应模型,当请求调用一个函数或方法后,需等待其响应返回,然后执行后续代码。同步的最大特征便是「有序」,当各个过程都执行完毕,最后返回结果。如图

image-20211208153717189

异步编程则是只发送了调用的指令,调用者无需等待被调用的方法执行完毕,而是继续执行下面的流程。在一个多处理器或多核的环境中,异步调用是真正的并行执行。如图

image-20211208153827238

1.1 异步编程解决了什么问题

Java异步编程的目的是充分利用计算机CPU资源,不让主程序阻塞在某个长时间运行的任务上,从而优化主程序的执行时间。这类耗时的任务可以是 IO操作、远程调用以及高密度计算任务。如果不使用多线程异步编程,我们的系统就会阻塞在耗时的子任务上,会导致极大延长完成主函数任务的时间。

在实际业务开发中,一般都是同步调用的。但有很多场景非常适合使用异步来处理,如:注册新用户,送100个积分;或下单成功,发送push消息等等。就拿注册新用户这个用例来说,为什么要异步处理?

第一个原因:容错性、健壮性,如果送积分出现异常,不能因为送积分而导致用户注册失败; 因为用户注册是主要功能,送积分是次要功能,即使送积分异常也要提示用户注册成功,然后后面在针对积分异常做补偿处理。 第二个原因:提升性能,例如注册用户花了20毫秒,送积分花费50毫秒,如果用同步的话,总耗时70毫秒,用异步的话,无需等待积分,故耗时20毫秒。

故,异步能解决2个问题,性能和容错性。那么想要实现异步,有哪些实现思路呢?

2、基于消息中间件实现异步

最常见的解决方案,是引入消息中间件。同步接口调用导致响应时间长的问题,使用mq之后,将同步调用改成异步,能够显著减少系统响应时间。

image-20211207200827321

系统A作为消息的生产者,在完成本职工作后,就能直接返回结果了。而无需等待消息消费者的返回,它们最终会独立完成所有的业务功能。

这样能避免总耗时比较长,从而影响用户的体验的问题。使用消息中间件美中不足之处是:针对相对简单业务,引入第三方消息组件,可能会有些重,会带来一些额外维护工作。

如果不引入第三方中间件,我们还可以考虑通过异步非阻塞的观察者模式实现异步,比如google开源包 Guava之EventBus,可以优雅实现异步队列。

image-20211208152705309 3、基于Servlet方式实现异步

Servlet 3.0之前,Servlet采用Thread-Per-Request的方式处理请求,即每一次Http请求都由一个线程从头到尾处理。当涉及到耗时操作时,性能问题便比较明显。Servlet 3.0中提供了异步处理请求。可以先释放容器分配给请求的线程与相关资源,减轻系统负担,从而增加服务的吞吐量。Servlet 3.0的异步是通过AsyncContext对象来完成的,它可以从当前线程传给另一个线程,并归还初始线程。新的线程处理完业务可以直接返回结果给客户端。

image-20211208151018729

AsyncContext对象可以从HttpServletRequest中获取:

@RequestMapping("/async") public void async(HttpServletRequest request) { AsyncContext asyncContext = request.getAsyncContext(); }

在AsyncContext中提供了获取ServletRequest、ServletResponse和添加监听(addListener)等功能:

public interface AsyncContext { ServletRequest getRequest(); ServletResponse getResponse(); void addListener(AsyncListener var1); void setTimeout(long var1); // 省略其他方法 }

不仅可以通过AsyncContext获取Request和Response等信息,还可以设置异步处理超时时间。通常,超时时间(单位毫秒)是需要设置的,不然无限等下去不就与同步处理一样了。通过AsyncContext的addListener还可以添加监听事件,用来处理异步线程的开始、完成、异常、超时等事件回调。

addListener方法的参数AsyncListener的源码如下:

public interface AsyncListener extends EventListener { // 异步执行完毕时调用 void onComplete(AsyncEvent var1) throws IOException; // 异步线程执行超时调用 void onTimeout(AsyncEvent var1) throws IOException; // 异步线程出错时调用 void onError(AsyncEvent var1) throws IOException; // 异步线程开始时调用 void onStartAsync(AsyncEvent var1) throws IOException; }

通常,异常或超时时返回调用方错误信息,而异常时会处理一些清理和关闭操作或记录异常日志等。

下面直接看一个基于Servlet方式的异步请求示例:

@GetMapping(value = "/email/send") public void servletReq(HttpServletRequest request) { AsyncContext asyncContext = request.startAsync(); // 设置监听器:可设置其开始、完成、异常、超时等事件的回调处理 asyncContext.addListener(new AsyncListener() { @Override public void onTimeout(AsyncEvent event) { System.out.println("处理超时了..."); } @Override public void onStartAsync(AsyncEvent event) { System.out.println("线程开始执行"); } @Override public void onError(AsyncEvent event) { System.out.println("执行过程中发生错误:" + event.getThrowable().getMessage()); } @Override public void onComplete(AsyncEvent event) { System.out.println("执行完成,释放资源"); } }); //设置超时时间 asyncContext.setTimeout(6000); asyncContext.start(new Runnable() { @Override public void run() { try { Thread.sleep(5000); System.out.println("内部线程:" + Thread.currentThread().getName()); asyncContext.getResponse().getWriter().println("async processing"); } catch (Exception e) { System.out.println("异步处理发生异常:" + e.getMessage()); } // 异步请求完成通知,整个请求完成 asyncContext.complete(); } }); //此时request的线程连接已经释放了 System.out.println("主线程:" + Thread.currentThread().getName()); }

启动项目,访问对应的URL,打印日志如下:

主线程:http-nio-8080-exec-4 内部线程:http-nio-8080-exec-5 执行完成,释放资源 复制代码

可以看出,上述代码先执行完了主线程,也就是程序的最后一行代码的日志打印,然后才是内部线程的执行。内部线程执行完成,AsyncContext的onComplete方法被调用。如果通过浏览器访问对应的URL,还可以看到该方法的返回值“async processing”。说明内部线程的结果同样正常的返回到客户端了。

4、基于Spring实现异步

基于Spring可以通过Callable、DeferredResult或者WebAsyncTask等方式实现异步请求。

4.1 基于Callable实现

对于一次请求(/email),基于Callable的处理流程如下:

1、Spring MVC开启副线程处理业务(将Callable提交到TaskExecutor);

2、DispatcherServlet和所有的Filter退出Web容器的线程,但是response保持打开状态;

3、Callable返回结果,SpringMVC将原始请求重新派发给容器(再重新请求一次/email),恢复之前的处理;

4、DispatcherServlet重新被调用,将结果返回给用户;

代码实现示例如下:

@GetMapping("/email") public Callable order() { System.out.println("主线程开始:" + Thread.currentThread().getName()); Callable result = () -> { System.out.println("副线程开始:" + Thread.currentThread().getName()); Thread.sleep(1000); System.out.println("副线程返回:" + Thread.currentThread().getName()); return "success"; }; System.out.println("主线程返回:" + Thread.currentThread().getName()); return result; }

访问对应URL,控制台输入日志如下:

主线程开始:http-nio-8080-exec-1 主线程返回:http-nio-8080-exec-1 副线程开始:task-1 副线程返回:task-1

通过日志可以看出,主线程已经完成了,副线程才进行执行。同时,URL返回结果“success”。这也说明一个问题,服务器端的异步处理对客户端来说是不可见的。

Callable默认使用SimpleAsyncTaskExecutor类来执行,该线程池不是真正意义上的线程池。值得特别注意的一点,【此处有坑】使用此线程池无法实现线程重用,每次调用都会新建一条线程。若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误,关键代码如下:

public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task; //判断是否开启限流,默认为否 if (this.isThrottleActive() && startTimeout > 0L) { //执行前置操作,进行限流 this.concurrencyThrottle.beforeAccess(); this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse)); } else { //未限流的情况,执行线程任务 this.doExecute(taskToUse); } } protected void doExecute(Runnable task) { //不断创建线程 Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task); thread.start(); } //创建线程 public Thread createThread(Runnable runnable) { //指定线程名,task-1,task-2... Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName()); thread.setPriority(this.getThreadPriority()); thread.setDaemon(this.isDaemon()); return thread; }

我们也可以直接通过上面的控制台日志观察,每次打印的线程名都是[task-1]、[task-2]、[task-3]、[task-4].....递增的。

正因如此,所以我们实际使用异步框架时一定要自定义线程池,替代默认的SimpleAsyncTaskExecutor。这里通过实现WebMvcConfigurer接口来完成线程池的配置。

@Configuration public class WebConfig implements WebMvcConfigurer { @Resource private ThreadPoolTaskExecutor myThreadPoolTaskExecutor; /** * 配置线程池 */ @Bean(name = "asyncPoolTaskExecutor") public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(2); taskExecutor.setMaxPoolSize(10); taskExecutor.setQueueCapacity(25); taskExecutor.setKeepAliveSeconds(200); taskExecutor.setThreadNamePrefix("thread-pool-"); // 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } @Override public void configureAsyncSupport(final AsyncSupportConfigurer configurer) { // 处理callable超时 configurer.setDefaultTimeout(60 * 1000); configurer.setTaskExecutor(myThreadPoolTaskExecutor); configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor()); } @Bean public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() { return new TimeoutCallableProcessingInterceptor(); } }

为了验证打印的线程,我们将实例代码中的System.out.println替换成日志输出,会发现在使用线程池之前,打印日志如下:

2021-02-21 09:45:37.144 INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主线程开始:http-nio-8080-exec-1 2021-02-21 09:45:37.144 INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主线程返回:http-nio-8080-exec-1 2021-02-21 09:45:37.148 INFO 8312 --- [ task-1] c.s.learn.controller.AsynController : 副线程开始:task-1 2021-02-21 09:45:38.153 INFO 8312 --- [ task-1] c.s.learn.controller.AsynController : 副线程返回:task-1

线程名称为“task-1”。让线程池生效之后,打印日志如下:

2021-02-21 09:50:28.950 INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主线程开始:http-nio-8080-exec-1 2021-02-21 09:50:28.951 INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 主线程返回:http-nio-8080-exec-1 2021-02-21 09:50:28.955 INFO 8339 --- [ thread-pool-1] c.s.learn.controller.AsynController : 副线程开始:thread-pool-1 2021-02-21 09:50:29.956 INFO 8339 --- [ thread-pool-1] c.s.learn.controller.AsynController : 副线程返回:thread-pool-1

线程名称为“thread-pool-1”,其中前面的“thread-pool”正是我们配置的线程池前缀。

除了线程池的配置,还可以配置统一异常处理,这里就不再演示了。

4.2 基于WebAsyncTask实现异步

Spring提供的WebAsyncTask是对Callable的包装,提供了更强大的功能,比如:处理超时回调、错误回调、完成回调等。

@GetMapping("/webAsyncTask") public WebAsyncTask webAsyncTask() { log.info("外部线程:" + Thread.currentThread().getName()); WebAsyncTask result = new WebAsyncTask(60 * 1000L, new Callable() { @Override public String call() { log.info("内部线程:" + Thread.currentThread().getName()); return "success"; } }); result.onTimeout(new Callable() { @Override public String call() { log.info("timeout callback"); return "timeout callback"; } }); result.onCompletion(new Runnable() { @Override public void run() { log.info("finish callback"); } }); return result; }

访问对应请求,打印日志:

2021-02-21 10:22:33.028 INFO 8547 --- [nio-8080-exec-1] c.s.learn.controller.AsynController : 外部线程:http-nio-8080-exec-1 2021-02-21 10:22:33.033 INFO 8547 --- [ thread-pool-1] c.s.learn.controller.AsynController : 内部线程:thread-pool-1 2021-02-21 10:22:33.055 INFO 8547 --- [nio-8080-exec-2] c.s.learn.controller.AsynController : finish callback 4.3 基于DeferredResult实现异步

DeferredResult使用方式与Callable类似,但在返回结果时不一样,它返回的时实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult中去。DeferredResult的这个特性对实现服务端推技术、订单过期时间处理、长轮询、模拟MQ的功能等高级应用非常重要。常见的使用场景:

1、如熟悉Apollo使用原理的同学,可能有了解,Apollo配置实时热更新便是基于DeferredResult实现配置变更通知的

2、如前端轮询订单支付状态,为减轻服务器压力,可改为长链接的形式,基于DeferredResult实现异步结果回调

关于DeferredResult的使用先来看一下官方的例子和说明:

@RequestMapping("/quotes") @ResponseBody public DeferredResult quotes() { DeferredResult deferredResult = new DeferredResult(); // Save the deferredResult in in-memory queue ... return deferredResult; } // In some other thread... deferredResult.setResult(data);

上述示例中我们可以发现DeferredResult的调用并不一定在Spring MVC当中,它可以是别的线程。官方的解释也是如此:

In this case the return value will also be produced from a separate thread. However, that thread is not known to Spring MVC. For example the result may be produced in response to some external event such as a JMS message, a scheduled task, etc.

也就是说,DeferredResult返回的结果也可能是由MQ、定时任务或其他线程触发。来个实例:

@Controller @RequestMapping("/async/controller") public class AsyncHelloController { private List deferredResultList = new ArrayList(); @ResponseBody @GetMapping("/hello") public DeferredResult helloGet() throws Exception { DeferredResult deferredResult = new DeferredResult(); //先存起来,等待触发 deferredResultList.add(deferredResult); return deferredResult; } @ResponseBody @GetMapping("/setHelloToAll") public void helloSet() throws Exception { // 让所有hold住的请求给与响应 deferredResultList.forEach(d -> d.setResult("say hello to all")); } }

第一个请求/hello,会先将deferredResult存起来,前端页面是一直等待(转圈)状态。直到发第二个请求:setHelloToAll,所有的相关页面才会有响应。

整个执行流程如下:

controller返回一个DeferredResult,把它保存到内存里或者List里面(供后续访问); Spring MVC调用request.startAsync(),开启异步处理; 与此同时将DispatcherServlet里的拦截器、Filter等等都马上退出主线程,但是response仍然保持打开的状态; 应用通过另外一个线程(可能是MQ消息、定时任务等)给DeferredResult#setResult值。然后SpringMVC会把这个请求再次派发给servlet容器; DispatcherServlet再次被调用,然后处理后续的标准流程;

通过上述流程可以发现:利用DeferredResult可实现一些长连接的功能,比如当某个操作是异步时,可以先保存对应的DeferredResult对象,当异步通知回来时,再找到这个DeferredResult对象,在setResult处理结果即可。从而提高性能。

5 、基于SpringBoot异步实现

在SpringBoot中将一个方法声明为异步方法非常简单,只需两个注解即可@EnableAsync和@Async。其中@EnableAsync用于开启SpringBoot支持异步的功能,用在SpringBoot的启动类上。@Async用于方法上,标记该方法为异步处理方法。

需要注意的是@Async并不支持用于被@Configuration注解的类的方法上。同一个类中,一个方法调用另外一个有@Async的方法,注解也是不会生效的。

@EnableAsync的使用示例:

@SpringBootApplication @EnableAsync public class App { public static void main(String[] args) { SpringApplication.run(App.class, args); } }

@Async的使用示例:

@Service public class SyncService { @Async public void asyncEvent() { // 业务处理 } }

@Async注解的使用与Callable有类似之处,在默认情况下使用的都是SimpleAsyncTaskExecutor线程池,可参考Callable中的方式来自定义线程池。

下面通过一个实例来验证一下,启动类上使用@EnableAsync,然后定义Controller类:

@RestController public class IndexController { @Resource private UserService userService; @RequestMapping("/async") public String async(){ System.out.println("--IndexController--1"); userService.sendSms(); System.out.println("--IndexController--4"); return "success"; } }

定义Service及异步方法:

@Service public class UserService { @Async public void sendSms(){ System.out.println("--sendSms--2"); IntStream.range(0, 5).forEach(d -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("--sendSms--3"); } }

如果先注释掉@EnableAsync和@Async注解,即正常情况下的业务请求,打印日志为:

--IndexController--1 --sendSms--2 --sendSms--3 --IndexController--4

使用@EnableAsync和@Async注解时,打印日志如下:

--IndexController--1 --IndexController--4 --sendSms--2 --sendSms--3

通过日志的对比我们可以看出,使用了@Async的方法,会被当成一个子线程。所以,整个sendSms方法会在主线程执行完了之后执行。

今天我们来探讨下 spring 是如何完成这个异步功能的: spring 在扫描bean的时候会扫描方法上是否包含@async的注解,如果包含的,spring会为这个bean动态的生成一个子类,我们称之为代理类(?), 代理类是继承我们所写的bean的,然后把代理类注入进来,那此时,在执行此方法的时候,会到代理类中,代理类判断了此方法需要异步执行,就不会调用父类 (我们原本写的bean)的对应方法。spring自己维护了一个队列,他会把需要执行的方法,放入队列中,等待线程池去读取这个队列,完成方法的执行, 从而完成了异步的功能。我们可以关注到再配置task的时候,是有参数让我们配置线程池的数量的。因为这种实现方法,所以在同一个类中的方法调用,添加@async注解是失效的!,原因是当你在同一个类中的时候,方法调用是在类体内执行的,spring无法截获这个方法调用。

那在深入一步,spring为我们提供了AOP,面向切面的功能。他的原理和异步注解的原理是类似的,spring在启动容器的时候,会扫描切面所定义的 类。在这些类被注入的时候,所注入的也是代理类,当你调用这些方法的时候,本质上是调用的代理类。通过代理类再去执行父类相对应的方法,那spring只 需要在调用之前和之后执行某段代码就完成了AOP的实现了!

@Async、WebAsyncTask、Callable、DeferredResult的区别

所在的包不同:

@Async:org.springframework.scheduling.annotation; WebAsyncTask:org.springframework.web.context.request.async; Callable:java.util.concurrent; DeferredResult:org.springframework.web.context.request.async;

通过所在的包,我们应该隐隐约约感到一些区别,比如@Async是位于scheduling包中,而WebAsyncTask和DeferredResult是用于Web(Spring MVC)的,而Callable是用于concurrent(并发)处理的。

对于Callable,通常用于Controller方法的异步请求,当然也可以用于替换Runable的方式。在方法的返回上与正常的方法有所区别:

// 普通方法 public String aMethod(){ } // 对照Callable方法 public Callable aMethod(){ }

而WebAsyncTask是对Callable的封装,提供了一些事件回调的处理,本质上区别不大。

DeferredResult使用方式与Callable类似,重点在于跨线程之间的通信。

@Async也是替换Runable的一种方式,可以代替我们自己创建线程。而且适用的范围更广,并不局限于Controller层,而可以是任何层的方法上。

当然,大家也可以从返回结果,异常处理等角度来分析一下,这里就不再展开了。

6、 自己造轮子

我们还可以自己造轮子,通过自定义线程池的工具类的方式,实现业务异步执行。

public abstract class ExecutorKit { private static final Logger logger = LoggerFactory.getLogger(ExecutorKit.class); private static ThreadPoolExecutor threadPoolExecutor; public static void sleep(int timeout, TimeUnit timeUnit) { try { timeUnit.sleep((long)timeout); } catch (Exception var3) { } } public static void execute(Runnable cmd) { execute("default", true, cmd); } public static void execute(String name, Runnable cmd) { execute(name, true, cmd); } public static void execute(String name, boolean ignoreException, Runnable cmd) { threadPoolExecutor.execute(() -> { Transaction transaction = Cat.newTransaction("Executor-Tasks", name); try { cmd.run(); transaction.setStatus("0"); } catch (Exception var8) { logger.error("异步运行[{}]任务异常", name, var8); if (ignoreException) { transaction.setStatus("0"); } else { transaction.setStatus(var8); } } finally { transaction.complete(); } }); } private ExecutorKit() { } static { AtomicInteger threadIndex = new AtomicInteger(0); LinkedBlockingDeque blockingDeque = new LinkedBlockingDeque(51200); ThreadFactory threadFactory = (r) -> { Thread thread = new Thread(r); thread.setName(ExecutorKit.class.getSimpleName() + "-" + threadIndex.getAndIncrement()); thread.setDaemon(true); return thread; }; threadPoolExecutor = new ThreadPoolExecutor(9, 9, 0L, TimeUnit.SECONDS, blockingDeque, threadFactory, (r, executor) -> { logger.error("线程池任务过多,丢弃处理"); }); } }

使用示例:

ExecutorKit.execute("Log-Save-Decide", () -> logAndSaveDecide(userId, result, userVars)); 7、小结

经过上述的分析探讨,大家应该对于常见异步处理有所了解,当熟悉这些基础理论,实战实例,使用方法及注意事项之后,在后续实际应用中,还需仔细探究其中实现细节,具体情况具体分析,研发过程需避免“拿来主义”,避免踩坑,敬畏线上的每一行代码。

附:参考链接:

1、zhuanlan.zhihu.com/p/146336940

2、zhuanlan.zhihu.com/p/99268715

3、juejin.cn/post/694312…

4、blog.csdn.net/wangdong567…



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3