您的位置 首页 java

RxJava2.X 源码解析(二):神秘的取消订阅流程

作者博客

前言

基于RxJava2.1.1

本篇我们将探索RxJava2.x提供给我们的Disposable能力的来源。

要相信,任何神奇的功能,当你探索了其本质之后,收获都是巨大的。

Demo 到原理

( ̄∇ ̄)猜猜会输出什么呢?

在发送玩hello之后,成功终止了后面的Reactive流。从结果我们还发现,后面的Reactive流被终止了,也就是订阅者或者观察者收不到后面的信息了,但是生产者或者说被订阅者、被观察者的代码还是会继续执行的。

Ok,我们从哪开始入手呢?我们发现,在我们执行了 disposable.dispose;后,触发了该事件,我们看下 disposable.dispose;到底做了什么呢,很开心的,我们点进 disposable.dispose;的源码,╮(╯_╰)╭,好吧,只是接口

此时我们要回忆一下上一篇的一段代码

我们之前分析到在执行source.subscribe(parent);触发数据分发事件之前先执行了 observer .onSubscribe(parent);这句代码,所传入的parent也就对应了我们的Disposable

parent是CreateEmitter类型的,但是CreateEmitter是实现了Disposable接口的一个类。而parent又是我们的observer的一个包装后的对象。

OK,分析到这里我们来整理下前面的环节,根据Demo来解释下:首先在执行下面代码之前

先执行了observer.onSubscribe(parent);,我们在demo中也是通过传入的parent调用其dispose方法来终止Reactive流,而执行分发hello等数据的e也是我们的parent,也就是他们都是同一个对象。而执行e.onNext(“hello”);的e对象也是observer的一个包装后的ObservableEmitter类型的对象。

总结:Observer自己来控制了Reactive流状态。

Ok,此时如果我说关键点应该在ObservableEmitter这个类上面,你觉得可能性有多少呢?( ̄∇ ̄)

关键点就是CreateEmitter<T> parent = new CreateEmitter<T>(observer);这个包装的过程,我们来看下其源码

因为其实现了ObservableEmitter<T>, Disposable接口类,所以需实现其方法。这里其实是使用了装饰者模式,其魅力所在一会就会看到了。

我们可以看到在ObservableEmitter内部通过final Observersuper T> observer;存储了我们的observer,这样有什么用呢?看Demo,我们在调用e.onNext(“hello”);时,调用的时ObservableEmitter对象的onNext方法,然后ObservableEmitter对象的onNext方法内部再通过observer调用onNext方法,但是从源码我们可以发现,其并不是简单的调用哦。

1、先判断传入的数据是否为

2、判断isDisposed,如果isDisposed返回 false 则不执行onNext。

isDisposed什么时候会返回false呢?按照demo,也就是我们调用了disposable.dispose;后,disposable前面分析了就是CreateEmitter<T> parent,我们看CreateEmitter.dispose

里面调用DisposableHelper.dispose(this);,我们看isDisposed

RxJava的onComplete;与onError(t);只有一个会被执行的秘密原来是它?

再看另外两个方法的调用

其内部也基本做了同样的操作,先判断!isDisposed后再决定是否执行。

但是再这里还有一点哦,我们应该知道onComplete;和onError(t)只有一个会发生,其实现原理也是通过isDisposed这个方法哦,我们可以看到,不关是先执行onComplete;还是先执行onError(t),最终都会调用dispose;,而调用了dispose;后,isDisposed为false,也就不会再执行另外一个了。而且如果人为先调用onError再调用onComplete,onComplete不会被触发,而且会抛出PointerException异常。

小结:

此时我们的目的基本达到了,我们知道了Reactive流是如何被终止的以及RxJava的onComplete;与onError(t);只有一个会被执行的原因。

我们虽然知道了原因,但是秉着刨根问底的态度,抵挡不住内心的好奇,我还是决定挖一挖DisposableHelper这个类,当然如果不想了解DisposableHelper的话,看到这里也就可以了;

Ok,前面分析到,代码里调用了DisposableHelper类的 静态方法 ,我们看下其调用的两个静态方法分别做了什么?

1、DISPOSED:作为是否要终止的 枚举类型 的标识

2、isDisposed:判断上次记录的终点标识的是否是 当前执行的Observer,如果是返回true

3、dispose:采用了原子性引用类AtomicReference,目的是防止多线程操作出现的错误。

更详细的分析放入了代码中

总结

通过本次,1、我们了解了RxJava的随意终止Reactive流的能力的来源;2、过程中也明白了RxJava的onComplete;与onError(t);只有一个会被执行的秘密。

实现该能力的主要方式还是利用了装饰者模式

从中体会了设计模式的魅力所在,当然我们还接触了AtomicReference这个类,在平时估计很少接触到。

后续会继续分析RxJava的各种魔力点。

先关推荐

文章来源:智云一二三科技

文章标题:RxJava2.X 源码解析(二):神秘的取消订阅流程

文章地址:https://www.zhihuclub.com/173792.shtml

关于作者: 智云科技

热门文章

网站地图