RxJS官方教程(五) Subject

您所在的位置:网站首页 subject反义 RxJS官方教程(五) Subject

RxJS官方教程(五) Subject

2023-06-30 01:03| 来源: 网络整理| 查看: 265

RxJS官方教程(一) 概览 RxJS官方教程(二) Observable RxJS官方教程(三) Observable剖析 RxJS官方教程(四) Observer和Subscription RxJS官方教程(五) Subject RxJS官方教程(六) 算子 Subject 主题

什么是Subject(主题)?RxJS Subject是一种特殊类型的Observable,允许将值多播到多个观察者Observer。虽然普通的Observable是单播的(每个订阅的Observer都拥有Observable的独立执行),但Subject是多播的。

Subject类似于Observable,但可以多播到许多观察者。Subject就像EventEmitters:它们维护着许多观察者的注册表。

每个Subject都是一个Observable。给定一个主题,您可以通过一个Observer subscribe他,它将开始正常接收值。从Observer的角度来看,它无法判断Observable执行是来自简单的单播Observable还是Subject。

在Subject的内部,subscribe不会调用一个新的传递值的执行。它只是将给定的Observer注册到Observers列表中,类似于其他库和语言中的addListener的工作方式。

每个Subject都是一个Observer。它是一个含有next(v),error(e)和complete()的对象。要向主题提供新值,只需调用next(theValue),它将被多播到已注册接受该主题的观察者。

在下面的示例中,我们有两个观察者订阅了主题,我们向主题提供一些值:

var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }) subject.next(1); subject.next(2);

在控制台上使用以下输出:

observerA: 1 observerB: 1 observerA: 2 observerB: 2

由于Subject是Observer,这也意味着您可以提供Subject作为任何Observable subscribe的参数,如下例所示:

var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }) subject.subscribe({ next: (v) => console.log('observerB: ' + v) }) var observable = Rx.Observable.from([1,2,3]); observable.subscribe(subject); // You can subscribe providing a Subject

其执行如下:

observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3

通过上面的方法,我们基本上只是通过Subject将单播Observable执行转换为多播。这演示了Subject是如何把Observable执行共享给多个Observer的唯一方法。

还有的几个Subject特例:BehaviorSubject,ReplaySubject,和AsyncSubject。

Multicasted Observable 多播的Observable

“多播Observable”通过可能有许多订阅者的Subject传递通知,而普通的“单播Observable”仅向单个Observer发送通知。

多播Observable使用一个Subject来使多个Observers看到相同的Observable执行。

这其实是multicast运算符的工作方式:观察者订阅基础主题,主题订阅源Observable。以下示例类似于上一个使用的示例observable.subscribe(subject):

var source = Rx.Observable.from([1,2,3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // These are, under the hood, `subject.subscribe({...})`: multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); // This is, under the hood, `source.subscribe(subject)`: multicasted.connect();

multicast返回一个看起来像普通Observable的Observable,但在订阅时就像Subject一样工作。multicast返回一个ConnectableObservable,它只是一个带有connect()方法的Observable 。

该connect()方法对于确定何时开始共享Observable执行非常重要。因为connect()执行了source.subscribe(subject),并且connect()返回一个订阅,你可以执行取消订阅来取消Observable的执行。

参考计数

手动调用connect()和处理订阅通常很麻烦。通常,我们希望在第一个Observer到达时自动连接,并在最后一个Observer取消订阅时自动取消共享执行。

请考虑以下示例,其中订阅按此列表所述进行:

First Observer订阅了多播Observable 多播的Observable已连接 该next值0将传递给第一个Observer Second Observer订阅了多播Observable 该next值1将传递给第一个Observer 该next值1将传递给第二个Observer First Observer取消订阅多播Observable 该next值2将传递给第二个Observer Second Observer取消订阅了多播Observable 与多播Observable的连接已取消订阅

为了通过显式调用connect()实现这一点,我们编写以下代码:

var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); var subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); // We should call `connect()` here, because the first // subscriber to `multicasted` is interested in consuming values subscriptionConnect = multicasted.connect(); setTimeout(() => { subscription2 = multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 600); setTimeout(() => { subscription1.unsubscribe(); }, 1200); // We should unsubscribe the shared Observable execution here, // because `multicasted` would have no more subscribers after this setTimeout(() => { subscription2.unsubscribe(); subscriptionConnect.unsubscribe(); // for the shared Observable execution }, 2000);

如果我们希望避免显式调用connect(),我们可以使用ConnectableObservable的refCount()方法(引用计数),该方法返回一个Observable,用于跟踪它拥有多少订阅者。当订阅者数量从0增加到时1,它将调用connect()我们,这将启动共享执行。只有当订阅者数量从减少1到0完全取消订阅时,才会停止进一步执行。

refCount 使多播Observable在第一个订阅者到达时自动开始执行,并在最后一个订阅者离开时停止执行。

以下是一个例子:

var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount(); var subscription1, subscription2, subscriptionConnect; // This calls `connect()`, because // it is the first subscriber to `refCounted` console.log('observerA subscribed'); subscription1 = refCounted.subscribe({ next: (v) => console.log('observerA: ' + v) }); setTimeout(() => { console.log('observerB subscribed'); subscription2 = refCounted.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 600); setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe(); }, 1200); // This is when the shared Observable execution will stop, because // `refCounted` would have no more subscribers after this setTimeout(() => { console.log('observerB unsubscribed'); subscription2.unsubscribe(); }, 2000);

哪个执行输出:

observerA subscribed observerA: 0 observerB subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribed

该refCount()方法仅存在于ConnectableObservable上,它返回一个Observable而不是另一个ConnectableObservable。

BehaviorSubject

Subject的变体之一是BehaviorSubject,具有“当前值”的概念。它存储发布给消费者的最新值,每当新的Observer订阅时,它将立即从BehaviorSubject中获得“当前值” 。

BehaviorSubject用于表示“随时间变化的值”。例如,生日的事件流是Subject,但是人的年龄的流将是BehaviorSubject。

在以下示例中,BehaviorSubject使用0初始化,第一个Observer在订阅时将接收到0。第二个Observer将接收到2,即使它在2发送后才订阅的。

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(3);

带输出:

observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3 ReplaySubject

ReplaySubject类似于BehaviorSubject,它可以将旧值发送给新订阅者,但它也可以记录Observable执行的一部分。

ReplaySubject记录来自Observable执行的多个值,并将它们重放给新订阅者。

创建ReplaySubject时,您可以指定要重播的值的数量:

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(5);

输出:

observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerB: 2 observerB: 3 observerB: 4 observerA: 5 observerB: 5

除了缓冲区大小之外,您还可以指定以毫秒为单位的窗口时间,以确定记录值的年龄。在下面的示例中,我们使用大缓冲区大小100,但窗口时间参数仅为500毫秒。

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); var i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => { subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 1000);

输出如下,其中第二个Observer将获取发生在订阅之前最后500毫秒的事件3,4和5:

observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 observerB: 3 observerB: 4 observerB: 5 observerA: 6 observerB: 6 ... AsyncSubject

AsyncSubject只将Observable执行的最后一个值发送给它的观察者,并且只有在执行完成时才会发送。

var subject = new Rx.AsyncSubject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(5); subject.complete();

输出:

observerA: 5 observerB: 5

AsyncSubject类似于last()运算符,因为它等待complete通知以便传递单个值。

小结 BehaviorSubject 缓存一个值的Subject ReplaySubject 缓存多个值的Subject AsyncSubject 只返回最后一个值的Subject

官网 http://reactivex.io/rxjs/manual/overview.html#subject



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3