Reactor 3 (9): 并发和调度器 Schedulers、publishOn、subscribeOn |
您所在的位置:网站首页 › 线程并行处理 › Reactor 3 (9): 并发和调度器 Schedulers、publishOn、subscribeOn |
Stream中的并行处理非常简单,只要加上parallel(),就可以将stream并行化: @Test public void streamParallel () { Stream.of(1,2,3,4,5,6,7,8).parallel().map(String::valueOf).forEach(log::info); } }根据结果中线程使用情况可知这种操作就完成了stream的并行化: Reactor的并行化跟stream一样简单,不同于stream对并行的不可控,Reator还提供了可以对并行运行的调度器Schedulers Schedulers简介在Reactor中,并行执行以及执行的位置由所Scheduler确定 。 Schedulers 类有如下几种对上下文操作的静态方法: immediate():无执行上下文,提交的Runnable将直接在原线程上执行,可以理解没有调度single():可重用单线程,使用一个线程处理所有请求elastic(): 没有边界的弹性线程池boundedElastic():有边界弹性线程池,设置线程限制,默认为cpu核心数*10。达到上限后最多可以提交10万个任务。是阻塞线程的方法parallel(): 固定线程数量的并行线程池,线程数量和cpu内核一样多Reactor 提供了两种通过Scheduler切换上下文执行的方法:publishOn和subscribeOn。 publishOn在执行顺序中的位置很重要subscribeOn的位置不重要 publishOn方法有图可见subscribeOn使用之后会全方位覆盖,因此如果出现多个subscribeOn(),回执行后触发的 @Test public void subscribeOnTest() throws InterruptedException { Flux.range(1,2) .map(i -> { log.info("Map 1, the value map to: {}", i*i); return i*i; }) .subscribeOn(Schedulers.single()) .map(i -> { log.info("Map 2, the value map to: {}", -i); return -i; }) .subscribeOn(Schedulers.newParallel("parallel", 4)) .map(i -> { log.info("Map 3, the value map to {}", i+2); return (i+2) + ""; }) .subscribe(); Thread.sleep(100); }由结果可见,subscribe是反向处理,因此先触发parallel,后触发single,因此都是使用的single subscribeOn和publishOn混合使用看个例子代码: @Test public void subscribeOnTest() throws InterruptedException { Flux.range(1,2) .map(i -> { log.info("Map 1, the value map to: {}", i*i); return i*i; }) .publishOn(Schedulers.single()) .map(i -> { log.info("Map 2, the value map to: {}", -i); return -i; }) .subscribeOn(Schedulers.newParallel("parallel", 4)) .map(i -> { log.info("Map 3, the value map to {}", i+2); return (i+2) + ""; }) .subscribe(); Thread.sleep(100); }github gitee |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |