07 |
您所在的位置:网站首页 › spring5响应式编程能火吗 › 07 |
SpringWebFlux介绍
简介
SpringWebFlux是Spring5添加的新模块,用于Web开发,功能和SpringMvc类似的,WebFlux使用当前一种比较流行的响应式编程框架
![]() 异步和同步针对调度者,调用者发送请求,如果等待对方回应之后才去做其他事情,就是同步,如果发送请求之后不等着对方回应就去做其他事情就是异步 阻塞和非阻塞针对被调度者,被调度者收到请求后,做完请求任务之后才给出反馈就是阻塞,收到请求之后马上给出反馈然后去做事情,就是非阻塞 WebFlux特点 非阻塞式: 在有限资源下,提高系统吞吐量和伸缩性,以Reactor为基础实现响应式编程 函数式编程: Spring5框架基于Java8, WebFlux使用Java8函数式编程方式实现路由请求 比较SpringMvc ![]() 响应式编程是一种面向数据流和变化传播的编程范式,这意味着可以在编程语言中很方便的表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播,电子表格程序就是响应式编程的一个例子,单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的变化而变化. Java8及其之前的版本提供的 "观察者模式" 两个类Observer 和 Observable 新建SpringBoot项目新建一个SpringBoot的项目,我直接建一个模块算了 ok了 编写代码 package com.dance.webflux.reactor8; import lombok.extern.slf4j.Slf4j; import java.util.Observable; import java.util.Observer; @Slf4j public class ObserverDemo extends Observable { public static void main(String[] args) { ObserverDemo observerDemo = new ObserverDemo(); // 添加观察者 observerDemo.addObserver((o, arg) -> { log.info("o:{},arg:{}",o,arg); System.out.println("发生变化"); }); // 添加观察者 observerDemo.addObserver((o, arg) -> { log.info("o:{},arg:{}",o,arg); System.out.println("手动被观察者通知,准备改变"); }); // 发生改变 observerDemo.setChanged(); // 通知观察者 observerDemo.notifyObservers(); } }执行结果 23:00:10.650 [main] INFO com.dance.webflux.reactor8.ObserverDemo - o:com.dance.webflux.reactor8.ObserverDemo@504bae78,arg:null 手动被观察者通知,准备改变 23:00:10.663 [main] INFO com.dance.webflux.reactor8.ObserverDemo - o:com.dance.webflux.reactor8.ObserverDemo@504bae78,arg:null 发生变化 响应式编程(Reactor实现) 简介 响应式编程操作中,Reactor是满足Reactive规范框架 Reactor有两个核心类, Mono和Flux,这两个类实现接口Publisher,提供丰富的操作符,Flux对象实现发布者,返回N个元素,Mono对象实现发布者,返回1或者0个元素 Flux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号,"元素值","错误信号","完成信号",错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者 真的,去看一下Java8吧,不然真看不懂 订阅数据流调用just或者其他方法只是声明数据流,数据流并没有发出,只有在进行订阅之后才会触发数据流,不订阅什么都不会发生 // 订阅数据流 flux.subscribe(x -> System.out.print(x + " ")); System.out.println(); mono.subscribe(System.out::println); 执行结果 1 2 3 1 操作符对数据进行一道道操作,称为操作符,比如工厂流水线 第一: map 元素映射为新元素(来自StreamAPI) 第二 flatmap 元素映射为流 把每个元素转换为流 把转换之后多个流合并为一个流 SpringWebFlux 基于Reactor,默认使用容器是Netty, Netty是高性能NIO框架,异步非阻塞的框架 NettyBIO SpringWebFlux执行过程和SpringMvc相似 SpringWebFlux 核心控制器 DispatchHandler,实现接口WebHandler 接口WebHandler有一个接口
因为没有添加依赖所以IDEA中找不到(我找了好长时间) 添加WebFlux依赖 org.springframework.boot spring-boot-starter-webfluxFAQ 如果添加依赖失败可以添加阿里云仓库 aliyun https://maven.aliyun.com/repository/public true false aliyun-plugin https://maven.aliyun.com/repository/public true falseWebHandler // // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.web.server; import reactor.core.publisher.Mono; public interface WebHandler { Mono handle(ServerWebExchange exchange); }可以看出返回的就是Mono,一个或零个元素 DispatchHandler实现 @Override public Mono handle(ServerWebExchange exchange) { // http请求 // 判断请求映射集合是否为空 if (this.handlerMappings == null) { return createNotFoundError(); } // 获取Request 判断是否存在前置处理 if (CorsUtils.isPreFlightRequest(exchange.getRequest())) { return handlePreFlight(exchange); } /** * 将映射集合转为Flux发布(Flux.fromIterable),通过映射集合中的映射获取匹配, * 然后判断匹配完成后是否为空,为空返回没有找到(switchIfEmpty),然后流化执行handler处理器 * (invokeHandler),然后执行返回结果处理(handleResult) 返回一个或零个元素 Mono * / return Flux.fromIterable(this.handlerMappings) .concatMap(mapping -> mapping.getHandler(exchange)) .next() .switchIfEmpty(createNotFoundError()) .flatMap(handler -> invokeHandler(exchange, handler)) .flatMap(result -> handleResult(exchange, result)); } 组件介绍DispatchHandler: 负责请求的处理 HandlerMapping: 请求映射处理 HandlerAdapter: 请求适配处理 HandlerResultHandler: 响应结果处理 函数式编程接口SpringWebFlux 实现函数式编程,两个接口,RouteFunction(路由处理)和HandlerFunction(处理函数) SpringWebFlux(基于注解编程模型) SpringWebFlux实现方式有两种: 注解编程模型和函数式编程模型 使用注解编程模型方式,和之前SpringMvc使用类似的,只需要把相关依赖配置到项目中,SpringBoot自动配置相关运行容器,默认情况下使用Netty服务器 创建SpringBoot项目,引入WebFlux-Starter前面已经创建和引入了 配置文件修改application.properties server.port=8081 创建实体类 package com.dance.webflux.entity; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor public class User { private String name; private String sex; private Integer age; } 创建Service新建UserService package com.dance.webflux.service; import com.dance.webflux.entity.User; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface UserService { /** * 根据ID获取用户信息 * @param id id * @return user */ Mono getUserById(int id); /** * 获取全部用户信息 * @return 用户信息 */ Flux getAllUser(); /** * 保存用户信息 * @param user 用户信息 * @return void */ Mono saveUserInfo(Mono user); }实现接口 package com.dance.webflux.service.impl; import com.dance.webflux.entity.User; import com.dance.webflux.service.UserService; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.HashMap; import java.util.Map; @Service public class UserServiceImpl implements UserService { /** * 创建 map 集合存储数据,模拟数据库 */ private final Map users = new HashMap(); { this.users.put(1,new User("lucy","nan",20)); this.users.put(2,new User("mary","nv",30)); this.users.put(3,new User("jack","nv",50)); } @Override public Mono getUserById(int id) { // 返回一个或者0个元素 return Mono.justOrEmpty(users.get(id)); } @Override public Flux getAllUser() { // 返回多个元素,返回全部的值 return Flux.fromIterable(this.users.values()); } @Override public Mono saveUserInfo(Mono user) { // 处理数据后返回空 return user.doOnNext(x -> { int key = users.size() + 1; users.put(key,x); }).thenEmpty(Mono.empty()); } } 创建Controller package com.dance.webflux.controller; import com.dance.webflux.entity.User; import com.dance.webflux.service.UserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @RestController public class UserController { @Autowired private UserService userService; @GetMapping("/getUserById/{id}") public Mono getUserById(@PathVariable Integer id){ return userService.getUserById(id); } @GetMapping("/getAllUser") public Flux getAllUser(){ return userService.getAllUser(); } @PostMapping("/saveUserInfo") public Mono saveUserInfo(@RequestBody User user){ return userService.saveUserInfo(Mono.just(user)); } } 启动项目FAQ,我在启动的时候报错了,经过排查后是应为之前为了看类的使用引入的依赖的原因,导致JAR包冲突了 io.projectreactor reactor-core 3.0.3.RELEASE将这个JAR包注释掉后,重新启动就ok了 10:46:27.550 [Thread-1] DEBUG org.springframework.boot.devtools.restart.classloader.RestartClassLoader - Created RestartClassLoader org.springframework.boot.devtools.restart.classloader.RestartClassLoader@7d031891 . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.6.1) 2021-12-13 10:46:27.952 INFO 21120 --- [ restartedMain] com.dance.webflux.WebfluxApplication : Starting WebfluxApplication using Java 1.8.0_162 on ZB-PF2P9QVH with PID 21120 (D:\[email protected]\coding\Spring5\webflux\target\classes started by ext.caiyuanqing in D:\[email protected]\coding\Spring5) 2021-12-13 10:46:27.954 INFO 21120 --- [ restartedMain] com.dance.webflux.WebfluxApplication : No active profile set, falling back to default profiles: default 2021-12-13 10:46:27.996 INFO 21120 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable 2021-12-13 10:46:27.996 INFO 21120 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG' 2021-12-13 10:46:28.777 INFO 21120 --- [ restartedMain] o.s.b.d.a.OptionalLiveReloadServer : LiveReload server is running on port 35729 2021-12-13 10:46:29.151 INFO 21120 --- [ restartedMain] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8081 2021-12-13 10:46:29.158 INFO 21120 --- [ restartedMain] com.dance.webflux.WebfluxApplication : Started WebfluxApplication in 1.598 seconds (JVM running for 2.507) 测试接口 根据ID获取用户信息 http://localhost:8081/getUserById/1说明 SpringMvc方式实现,同步阻塞的方式, 基于SpringMvc+Servlet+Tomcat SpringWebFlux方式实现,异步非阻塞的方式,基于SpringWebFlux+Reactor+Netty SpringWebFlux(基于函数式编程模型) 在使用函数式编程模型操作的时候,需要自己初始化服务器 基于函数式编程模型的时候,有两个核心接口,RouterFunction(实现路由功能, 请求转发给相应的Handler)和HandlerFunction(处理请求生成响应的函数),核心任务定义两个函数式接口的实现,并且启动需要的服务器 SpringWebFlux请求和响应不再是ServletRequest和ServletResponse,而是ServerRequest和ServerResponse 创建Handler(具体实现方法) package com.dance.webflux.handler; import com.dance.webflux.entity.User; import com.dance.webflux.service.UserService; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class UserHandler { private final UserService userService; public UserHandler(UserService userService) { this.userService = userService; } /** * 根据ID查询用户信息 * @param serverRequest 请求体 * @return 响应体 */ public Mono getUserById(ServerRequest serverRequest){ int id = Integer.parseInt(serverRequest.pathVariable("id")); Mono userById = userService.getUserById(id); return userById.flatMap(user -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(user).switchIfEmpty(ServerResponse.notFound().build())); } /** * 查询所有用户 * @param serverRequest 请求体 * @return 响应体 */ public Mono getUserAll(ServerRequest serverRequest){ Flux allUser = userService.getAllUser(); return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(allUser,User.class); } /** * 保存用户信息 * @param serverRequest 请求体 * @return 响应体 */ public Mono saveUserInfo(ServerRequest serverRequest){ Mono userMono = serverRequest.bodyToMono(User.class); return ServerResponse.ok().build(userService.saveUserInfo(userMono)); } } 创建服务器(路由和适配器) package com.dance.webflux; import com.dance.webflux.handler.UserHandler; import com.dance.webflux.service.impl.UserServiceImpl; import org.springframework.boot.autoconfigure.rsocket.RSocketProperties; import org.springframework.http.MediaType; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.netty.DisposableServer; import reactor.netty.http.server.HttpServer; import java.io.IOException; import static org.springframework.web.reactive.function.server.RequestPredicates.*; import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler; public class FunctionServer { /** * 创建路由 * @return 路由函数 */ public RouterFunction routingFunction(){ // 创建UserService UserServiceImpl userService = new UserServiceImpl(); // 创建处理器 UserHandler userHandler = new UserHandler(userService); // 设置路由 return RouterFunctions .route(GET("/user/{id}").and(accept(MediaType.APPLICATION_JSON)), userHandler::getUserById) .andRoute(GET("/users").and(accept(MediaType.APPLICATION_JSON)), userHandler::getUserAll) .andRoute(POST("/saveUserInfo"), userHandler::saveUserInfo); } /** * 创建服务器完成适配 */ public void createReactorServer(){ // 路由和Handler适配 RouterFunction serverResponseRouterFunction = routingFunction(); // 转换为HttpHandler HttpHandler httpHandler = toHttpHandler(serverResponseRouterFunction); // 转换为适配器 ReactorHttpHandlerAdapter reactorHttpHandlerAdapter = new ReactorHttpHandlerAdapter(httpHandler); // 创建HttpServer 设置端口 HttpServer httpServer = HttpServer.create().port(8081); // 指定处理适配器并绑定 DisposableServer disposableServer = httpServer.handle(reactorHttpHandlerAdapter).bindNow(); } /** * 启动 * @param args 参数 */ public static void main(String[] args) throws IOException { FunctionServer server = new FunctionServer(); server.createReactorServer(); System.out.println("enter to exit"); System.in.read(); } } 启动项目启动main方法 16:06:37.931 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 16:06:38.773 [main] DEBUG org.springframework.web.server.adapter.HttpWebHandlerAdapter - enableLoggingRequestDetails='false': form data and headers will be masked to prevent unsafe logging of potentially sensitive data 16:06:38.854 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework 16:06:38.877 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false 16:06:38.878 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8 16:06:38.883 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available 16:06:38.885 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available 16:06:38.887 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available 16:06:38.888 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available 16:06:38.891 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true 16:06:38.894 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9 16:06:38.894 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.(long, int): available 16:06:38.895 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available 16:06:38.896 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\EXT~1.CAI\AppData\Local\Temp (java.io.tmpdir) 16:06:38.896 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model) 16:06:38.897 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows 16:06:38.900 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3760193536 bytes 16:06:38.900 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1 16:06:38.902 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available 16:06:38.902 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false 16:06:39.003 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple 16:06:39.003 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4 16:06:39.115 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-http, daemon=true, selectCount=8, workerCount=8} 16:06:39.115 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default ConnectionProvider: reactor.netty.resources.DefaultPooledConnectionProvider@26275bef 16:06:39.118 [main] DEBUG reactor.netty.resources.DefaultLoopIOUring - Default io_uring support : false 16:06:39.744 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : false 16:06:39.746 [main] DEBUG reactor.netty.resources.DefaultLoopKQueue - Default KQueue support : false 16:06:39.768 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16 16:06:39.819 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024 16:06:39.819 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096 16:06:39.836 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false 16:06:39.836 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512 16:06:39.869 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available 16:06:39.961 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 18728 (auto-detected) 16:06:39.965 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false 16:06:39.965 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false 16:06:40.496 [main] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1) 16:06:40.498 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200 16:06:40.971 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 00:50:56:ff:fe:c0:00:08 (auto-detected) 16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16 16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16 16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192 16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11 16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216 16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256 16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64 16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768 16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192 16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0 16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true 16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023 16:06:41.028 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled 16:06:41.028 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0 16:06:41.028 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384 16:06:41.216 [reactor-http-nio-1] DEBUG reactor.netty.transport.ServerTransport - [40eb46d7, L:/0:0:0:0:0:0:0:0:62011] Bound new server enter to exit 测试 根据ID获取用户信息 http://localhost:8081/user/1 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |