Java多线程并行计算

您所在的位置:网站首页 java求多个数的和 Java多线程并行计算

Java多线程并行计算

2023-11-25 11:20| 来源: 网络整理| 查看: 265

在实际业务开发中如何降低接口响应时间,即如何提高程序的并行计算能力。

本文主要包含如下内容: 1、顺序执行 2、线程池+Future 3、使用Java8的CompletableFuture 4、使用Guava的ListenableFuture

1、顺序执行

直接上代码:

package com.c306.test; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.util.ArrayList; import java.util.List; @Slf4j public class ParallelTest { /** * 测试方法 * * @return */ private int testMethod() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 1; } /** * 顺序执行 */ private void test01() { long start = System.currentTimeMillis(); List list = new ArrayList(5); list.add(this.testMethod()); list.add(this.testMethod()); list.add(this.testMethod()); list.add(this.testMethod()); list.add(this.testMethod()); log.info("costs: {}ms", System.currentTimeMillis() - start); } @Test public void testSequentialExec() { this.test01(); this.test01(); this.test01(); } }

多次顺序执行耗时结果:

2020-04-19 16:33:43,310+0800 INFO [main] com.qx.test.ParallelTest - costs: 5046ms 2020-04-19 16:33:48,329+0800 INFO [main] com.qx.test.ParallelTest - costs: 5003ms 2020-04-19 16:33:53,332+0800 INFO [main] com.qx.test.ParallelTest - costs: 5003ms 2、线程池+Future

顺序执行确实很慢,所以我们需要并行执行,即同时调用testMethod()这5个方法。每个方法单独开启一个线程异步去执行,全部执行完成输出结果。 注意:这样执行有个问题是,每次调用都需要创建5个线程,线程的创建和销毁都是需要开销的,所以我们使用池化技术

直接上代码:

package com.c306.test; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.Collectors; @Slf4j public class ParallelTest { /** * IO密集型建议:2*CPU,因为IO密集型线程不是一直在运行,所以可以配置多一点; * CPU密集型建议:因为一直在使用CPU,所以要保证线程数不能太多,可以CPU数+1; */ private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); /** * 测试方法 * * @return */ private int testMethod() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 1; } /** * 线程池+Future */ private void test02() { long start = System.currentTimeMillis(); Future test1 = executor.submit(this::testMethod); Future test2 = executor.submit(this::testMethod); Future test3 = executor.submit(this::testMethod); Future test4 = executor.submit(this::testMethod); Future test5 = executor.submit(this::testMethod); List futureList = Arrays.asList(test1, test2, test3, test4, test5); List list = futureList.stream().map(future -> { try { return future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; }).collect(Collectors.toList()); log.info("costs: {}ms", System.currentTimeMillis() - start); } @Test public void testFutureExec() { this.test02(); this.test02(); this.test02(); } }

多次并行执行耗时结果:

2020-04-19 16:53:38,983+0800 INFO [main] com.qx.test.ParallelTest - costs: 1208ms 2020-04-19 16:53:40,002+0800 INFO [main] com.qx.test.ParallelTest - costs: 1000ms 2020-04-19 16:53:41,002+0800 INFO [main] com.qx.test.ParallelTest - costs: 1000ms

效果很明显,直接相当于一个方法的调用耗时,这种通过线程池+Future并行计算的方式,直接可以把接口性能提高上去了。

3、使用Java8的CompletableFuture

Future是java.util.concurrent并发包中的接口类,用来表示一个线程异步执行后的结果,核心方法包含: Future.get() 阻塞调用线程,直到计算结果返回 Future.isDone() 判断线程是否执行完毕 Future.cancel() 取消当前线程的执行

Future.get()是阻塞调用的,想要拿到线程执行的结果,必须是Future.get()阻塞或者while(Future.isDone())轮询方式调用。这种方式叫“主动拉(pull)”,现在流行响应式编程,即“主动推(push)”的方式,当线程执行完了,你告诉我就可以了。

直接上代码:

package com.c306.test; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; @Slf4j public class ParallelTest { /** * IO密集型建议:2*CPU,因为IO密集型线程不是一直在运行,所以可以配置多一点; * CPU密集型建议:因为一直在使用CPU,所以要保证线程数不能太多,可以CPU数+1; */ private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); /** * 测试方法 * * @return */ private int testMethod() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 1; } /** * 使用Java8的CompletableFuture */ private void test03() { long start = System.currentTimeMillis(); List futureList = new ArrayList(5); futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor)); futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor)); futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor)); futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor)); futureList.add(CompletableFuture.supplyAsync(this::testMethod, executor)); CompletableFuture allFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[5])); CompletableFuture resultList = allFuture.thenApplyAsync(value -> futureList.stream().map(CompletableFuture::join).collect(Collectors.toList()), executor ); List list = resultList.join(); log.info("costs: {}ms", System.currentTimeMillis() - start); } @Test public void testCompletableFutureExec() { this.test03(); this.test03(); this.test03(); } }

可以看到实现方式和Future并没有什么不同,但是CompletableFuture提供了很多方便的方法,比如代码中的allOf,thenApplyAsync,可以将多个CompletableFuture组合成一个CompletableFuture,再调用join方法阻塞拿到结果

多次并行执行耗时结果:

2020-04-19 17:08:17,800+0800 INFO [main] com.qx.test.ParallelTest - costs: 1212ms 2020-04-19 17:08:18,844+0800 INFO [main] com.qx.test.ParallelTest - costs: 1011ms 2020-04-19 17:08:19,847+0800 INFO [main] com.qx.test.ParallelTest - costs: 1002ms

CompletableFuture类中有很多方法可以供大家使用,不像Future只要那么几个方法可以使用,这也是Java自有库对Future的一个增强。这里只是简单展示了CompletableFuture的一种用法,实际开发中需要根据不同的场景去选择使用不同的方法。

4、使用Guava的ListenableFuture

谷歌开源的Guava中的ListenableFuture接口对java自带的Future接口做了进一步扩展和封装,并且提供了静态工具类Futures。

直接上代码:

package com.qx.test; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; @Slf4j public class ParallelTest { /** * IO密集型建议:2*CPU,因为IO密集型线程不是一直在运行,所以可以配置多一点; * CPU密集型建议:因为一直在使用CPU,所以要保证线程数不能太多,可以CPU数+1; */ private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); /** * 测试方法 * * @return */ private int testMethod() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 1; } /** * 使用Guava的ListenableFuture * * @throws ExecutionException * @throws InterruptedException */ private void test04() throws ExecutionException, InterruptedException { // guava需要包装一下线程池 ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executor); long start = System.currentTimeMillis(); ListenableFuture test1 = listeningExecutorService.submit(this::testMethod); ListenableFuture test2 = listeningExecutorService.submit(this::testMethod); ListenableFuture test3 = listeningExecutorService.submit(this::testMethod); ListenableFuture test4 = listeningExecutorService.submit(this::testMethod); ListenableFuture test5 = listeningExecutorService.submit(this::testMethod); ListenableFuture listListenableFuture = Futures.allAsList(test1, test2, test3, test4, test5); List list = listListenableFuture.get(); log.info("costs: {}ms", System.currentTimeMillis() - start); } @Test public void testListenableFutureExec() throws ExecutionException, InterruptedException { this.test04(); this.test04(); this.test04(); } }

多次并行执行耗时结果:

2020-04-19 17:21:11,919+0800 INFO [main] com.qx.test.ParallelTest - costs: 1203ms 2020-04-19 17:21:12,944+0800 INFO [main] com.qx.test.ParallelTest - costs: 1002ms 2020-04-19 17:21:13,945+0800 INFO [main] com.qx.test.ParallelTest - costs: 1001ms

总结:以上就是如何让接口并行计算的三种实现方式,属于日常开发中比较常用的代码优化技巧。这里没有做过多的说明和比较,需要大家查阅更多的相关源码和资料。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3