RxJava 3 新不同

您所在的位置:网站首页 Rxjava计数 RxJava 3 新不同

RxJava 3 新不同

2023-08-10 21:56| 来源: 网络整理| 查看: 265

本文已参与「新人创作礼」活动,一起开启掘金创作之路。

包结构

RxJava 3 组件位于包io.reactivex.rxjava3下(RxJava 1 有rx包, RxJava 2就只是io.reactivex. 这允许版本3与先前版本共存. 此外, RxJava的核心类型(Flowable, Observer, 等)移动到了包io.reactivex.rxjava3.core下.

组件RxJava 2RxJava 3Coreio.reactivexio.reactivex.rxjava3.coreAnnotationsio.reactivex.annotationsio.reactivex.rxjava3.annotationsDisposablesio.reactivex.disposablesio.reactivex.rxjava3.disposablesExceptionsio.reactivex.exceptionsio.reactivex.rxjava3.exceptionsFunctionsio.reactivex.functionsio.reactivex.rxjava3.functionsFlowablesio.reactivex.flowablesio.reactivex.rxjava3.flowablesObservablesio.reactivex.observablesio.reactivex.rxjava3.observablesSubjectsio.reactivex.subjectsio.reactivex.rxjava3.subjectsProcessorsio.reactivex.processorsio.reactivex.rxjava3.processorsObserversio.reactivex.observersio.reactivex.rxjava3.observersSubscribersio.reactivex.subscribersio.reactivex.rxjava3.subscribersParallelio.reactivex.parallelio.reactivex.rxjava3.parallelInternalio.reactivex.internalio.reactivex.rxjava3.internal

⚠️ 关于从IDE运行“组织导入”的说明

由于命名的匹配, IDE倾向于导入java.util.Observable而非RxJava的io.reactivex.rxjava3.core.Observable. 通常可以使IDE忽略java.util.Observable和java.util.Observer, 或者, 在受影响的文件中显示地指定import io.reactivex.rxjava3.core.Observable;.

而且因为RxJava 3现在要求Java 8, 标准库的函数接口, 诸如java.util.function.Function而非io.reactivex.rxjava3.functions.Function可能会被导入. IDE倾向于给予非描述线的错误, 如"Function can't be converted to Function", 而忽略掉了包存在差异的事实.

行为变更

有时候, 组件和操作符的设计证明是不充分的, 限制太多或者某些场景下是错误的. 像这样的主要版本允许我们进行必要的更改, 这可能会在补丁版本中造成各种问题.

在RxJava 2.x中, 该目标的设定是为了不让错误滑走, 尤其是因为某些原因序列不再能够将数据发送给消费者. 尽管我们尽了最大努力, 但在几十个操作符的竞争条件下, 仍然可能会出现错误.

在2.x的补丁中修复这些问题可能会引起太多麻烦, 由此, 这些修复延迟到了3.x发布, 否则可能已经发生很大的变化. 现在, 取消内部推迟错误的操作符会将这些错误通RxJavaPlugins.onError()发送给全局错误处理器.

undeliverable示例 RxJavaPlugins.setErrorHandler(error -> System.out.println(error)); PublishProcessor main = PublishProcessor.create(); PublishProcessor inner = PublishProcessor.create(); // switchMapDelayError will delay all errors TestSubscriber ts = main.switchMapDelayError(v -> inner).test(); main.onNext(1); // the inner fails inner.onError(new IOException()); // the consumer is still clueless ts.assertEmpty(); // the consumer cancels ts.cancel(); // console prints // io.reactivex.rxjava3.exceptions.UndeliverableException: // The exception could not be delivered to the consumer because // it has already canceled/disposed the flow or the exception has // nowhere to go to begin with. Further reading: // https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling // | java.io.IOException 复制代码 Connectable源重置

可重连类型的目的(ConnectableFlowable和ConnectableObservable)是为了允许一个或者多个消费者在真实的上流一旦调用connect()数据开始流动到它们之前准备好了. 首次是能够正确工作的, 但是如果上游终止而非断开连接的时候, 这将有些麻烦. 在终止的场景下, 依赖于可连接对象是通过replay()还是publish()创建的, 新的消费者要么不能够从新连接中接收数据, 要么数据全部丢失.

在3.x中, 可连接对象会在终止时显式地重置. 这个额外的一步允许消费者要么接收缓存数据, 要么为新的连接做好准备.

publish-reset 示例

有了publish, 如果可连接对象终止, 消费者后来的订阅将只接收终止事件. 开发者必须调用reset()才能使后面的消费者从新的连接中接收数据.

ConnectableFlowable connectable = Flowable.range(1, 10).publish(); // prepare consumers, nothing is signaled yet connectable.subscribe(/* ... */); connectable.subscribe(/* ... */); // connect, current consumers will receive items connectable.connect(); // let it terminate Thread.sleep(2000); // late consumers now will receive a terminal event connectable.subscribe( item -> { }, error -> { }, () -> System.out.println("Done!")); // reset the connectable to appear fresh again connectable.reset(); // fresh consumers, they will also be ready to receive connectable.subscribe( System.out::println, error -> { }, () -> System.out.println("Done!") ); // connect, the fresh consumer now gets the new items connectable.connect(); 复制代码 replay-reset 示例

有了replay, 如果可连接对象终止, 消费者后来的订阅将会接收缓存数据. 开发者必须调用reset()来抛弃缓存以使后面的消费者能够接收新鲜的数据.

ConnectableFlowable connectable = Flowable.range(1, 10).replay(); // prepare consumers, nothing is signaled yet connectable.subscribe(System.out::println); connectable.subscribe(System.out::println); // connect, current consumers will receive items connectable.connect(); // let it terminate Thread.sleep(2000); // late consumers will still receive the cached items connectable.subscribe( System.out::println, error -> { }, () -> System.out.println("Done!")); // reset the connectable to appear fresh again connectable.reset(); // fresh consumers, they will also be ready to receive connectable.subscribe( System.out::println, error -> { }, () -> System.out.println("Done!") ); // connect, the fresh consumer now gets the new items connectable.connect(); 复制代码 引用

RxJava 3 新不同 - 1

RxJava 3 新不同 - 2

RxJava 3 新不同 - 3

RxJava 3 新不同 - 4

RxJava 3 新不同 - 5

RxJava 3 新不同 - 6

RxJava 3 新不同 - 7

RxJava 3 新不同 - 8



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3