RxJava源码解析
一,简单使用
1 | Observable observable = Observable.create(new ObservableOnSubscribe<String>() { |
目标:
被观察者 Observable 如何生产事件的?
被观察者 Observable 何时生产事件的?
观察者 Observer 是何时接收到上游事件的?
Observable 与 Observer 是如何关联在一起的?
Observable
Observable 是数据的上游,即事件生产者
首先我们来了解一下事件是如何生成的,我们看一下 *Observable.create()*方法。
1 |
|
最重要的是 **RxJavaPlugins.onAssembly(new ObservableCreate
继续跟踪进去
1 |
|
看注释,原来这个方法是个 hook function 钩子函数。通过调试得知静态对象 onObservableAssembly默认为 null, 所以此方法直接返回传入的参数 source。
钩子函数在RxJava中出现相当多,在系统没有调用函数之前,钩子就先捕获该消息,得到控制权。这时候钩子程序既可以改变该程序的执行,插入我们要执行的代码片段,还可以强制结束消息的传递。我们可以用作全局的监听。也可以做坏事,比如在下面程序中,把observable设置null,那就肯定会报空指针异常,不过我们还是不要这么干O(∩_∩)O
1
2
3
4
5
6
7
8 RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
public Observable apply(Observable observable) throws Throwable {
System.out.println("apply : " + observable);
observable = null;
return observable;
}
});
onObservableAssembly 可以通过静态方法 **RxJavaPlugins. setOnObservableAssembly ()**设置全局的 Hook 函数。
现在我们明白了:
1 | Observable observable = Observable.create(new ObservableOnSubscribe<String>() { |
等价于
1 | Observable observable =new ObservableCreate<>(new ObservableOnSubscribe<String>() { |
好了,至此我们明白了,事件的源就是 new ObservableCreate()**对象,将ObservableOnSubscribe** 作为参数传递给 ObservableCreate 的构造函数。事件是由接口 ObservableOnSubscribe 的 subscribe 方法上产的,至于何时生产事件,稍后再分析。
Observable创建过程时序图如下:
Observer
Observer 是数据的下游,即事件消费者
1 | public interface Observer<@NonNull T> { |
上游发送的事件就是再这几个方法中被消费的。至于上游何时发送事件、如何发送,我们稍后再看
subscribe
observable.subscribe(observer)这个方法就是实现订阅的,是将观察者(observer)与被观察者(observable)连接起来的方法。只有 subscribe 方法执行后,上游产生的事件才能被下游接收并处理。其实自然的方式应该是 observer 订阅(subscribe) observable, 但这样会打断 rxjava 的链式结构。所以采用相反的方式。
1 | public final void subscribe(super T> observer) Observer<? { |
1 | //抽象方法,所以需要到实现类中去看具体实现,也就是说实现是在上文中提到的 ObservableCreate |
接下来我们来看 ObservableCreate.java:
构造函数
1 | public ObservableCreate(ObservableOnSubscribe<T> source) { |
重点是这个subscribeActual方法:
1 |
|
现在我们明白了,数据源生产事件的 subscribe 方法只有在observable.subscribe(observer)被执行后才执行的。 换言之,事件流是在订阅后才产生的。而 observable 被创建出来时并不生产事件,同时也不发射事件。
接下来我们再来看看事件是如何被发射出去,同时 observer 是如何接收到发射的事件的
1 | CreateEmitter<T> parent = new CreateEmitter<T>(observer); |
CreateEmitter 实现了 ObservableEmitter 接口,同时 ObservableEmitter 接口又继承了Emitter 接口。
CreateEmitter 还实现了 Disposable 接口,这个 disposable 接口是用来判断是否中断事件发射的。
从名称上就能看出,这个是发射器,故名思议是用来发射事件的,正是它将上游产生的事件发射到下游的。
Emitter 是事件源与下游的桥梁。
CreateEmitter 主要包括方法:
1 | void onNext( T value); |
是不是跟 observer 的方法很像?
我们来看看 CreateEmitter 中这几个方法的具体实现:
只列出关键代码
1 | static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { |
CreateEmitter 的 onError 和 onComplete 方法任何一个执行完都会执行 dispose()**中断事件发射,所以 observer 中的 **onError 和 onComplete 也只能有一个被执行。
现在我们可以知道,事件是如何被发射给下游的。当订阅成功后,数据源ObservableOnSubscribe 开始生产事件,调用Emitter的onNext,onComplete向下游发射事件。
Emitter 包含了 observer 的引用,又调用了observer onNext,onComplete,这样下游observer 就接收到了上游发射的数据。
Observable 与 Observer 订阅的过程 重要步骤:
Observable 与 Observer 订阅的过程时序图如下:
总结
Rxjava 的流程大概是:
Observable.create 创建事件源,但并不生产也不发射事件。
实现 observer 接口,但此时没有也无法接受到任何发射来的事件。
订阅 observable.subscribe(observer)**, 此时会调用具体 **Observable的实现类中的subscribeActual 方法,此时会才会真正触发事件源生产事件,事件源生产出来的事件通过 Emitter的 onNext,onError,onComplete发射给 observer 对应的方法由下游 observer消费掉。从而完成整个事件流的处理。
observer 中的 onSubscribe 在订阅时即被调用,并传回了 Disposable, observer 中可以利用 Disposable 来随时中断事件流的发射。
二,map转换
我们知道了RxJava简单使用的原理之后,我们跟着就要学习操作符的使用了,可是操作符有那么多?我们怎么学呢?
其实我们只要搞懂一个操作符的原理,我们就会懂得其他操作符的原理,进而了解整个RxJava的原理。
接下来,我们来研究map操作符
使用如下:
1 | Observable.create(new ObservableOnSubscribe<String>() { |
现在,我们就走进map操作符的源码
1 | public final <R> Observable<R> map(super T, ? extends R> mapper) Function<? { |
我们现在又看到了熟悉的钩子hook函数,RxJava中的钩子函数真的是无处不在,在此,我们默认知道其实这个函数就会返回new ObservableMap<>(this, mapper) 这个对象。想都不用想我们知道这个ObservableMap其实也是Observable的子类
我们点进去ObservableMap的源码看一下
1 | public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { |
上面ObservableMap就做了三件事
- 在构造方法中,将传入的Observable也就是本身抛给父类(ObservableSource是Observable的父类,所以可以接受)
- 对转换逻辑funtion进行保存
- 重写subscribeActual()方法并在其中实现订阅
我们重点看subscribeActual的实现,source指的是上游自定义source(即ObservableCreate),按照之前我们分析,应该是 source.subscribe(Observer),
刚好MapObserver这个也是Observer的子类,所以没问题。
创建MapObserver需要两个参数,第一个参数是自定义观察者(下游Observer或者又叫终点),第二个参数是转换逻辑的funtion。
我们现在点进去自定义source(ObservableCreate)的subscribe方法
1 |
|
发现它在第一层包裹的基础上,又给它封了一层包裹,也就是把第一层包裹作为参数传入了第二层包裹即发射器。
也就是说最终的终点(自定义观察者)经历了两次封装,第一次是封装为MapObserver
,我们称之为第一层包裹,第二次是封装为CreateEmitter
,我们称之为第二层包裹。
然后我们来看我们的自定义source发送的事件是怎么流入到终点的。
1 | Observable.create(new ObservableOnSubscribe<String>() { |
我们发射的最先是由CreateEmitter中开启。我们查看CreateEmitter这个类的onNext()方法:
1 | @Override |
发现它调用了observer的onNext方法,并且把我们传入的参数也作为参数传进去,这个observer是下一层,而不是自定义观察者。它的下一层就是ObservableMap,我们现在进入ObservableMap看一下
1 |
|
首先将我们传入的值进行了一个变换,即apply方法,然后调用下游的onNext
方法将变换后的值传过去。这里我们的下游就是终点,即自定义观察者。所以就到头了。
装饰模式
假如用到了两个map
操作符,create
方法返回的是ObservableCreate
对象,然后调用map
方法,相当于将ObservableCreate
用ObservableMap
包起来,然后又调用一次map
方法,相当于用ObservableMap
将ObservableMap
包起来。用图表示就是这样子
总结:
三,线程调度
Android 的 UI 线程是不能做网络操作,也不能做耗时操作,
所以一般我们把网络或耗时操作都放在非 UI 线程中执行。
RxJava强 大的线程调度能力能很快很好进行线程切换。
1 | Observable.create(new ObservableOnSubscribe<String>() { |
线程调度(被观察者) subscribeOn
Scheduler分类
调度器类型 效果 Schedulers.computation() 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 Schedulers.from(executor) 使用指定的Executor作为调度器 Schedulers.immediate( ) 在当前线程立即开始执行任务 Schedulers.io( ) 用于IO密集型任务 Schedulers.newThread( ) 为每个任务创建一个新线程 Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行 AndroidSchedulers.mainThread() 用于Android的UI更新操作
首先我们先分析下 **Schedulers.io()**这个函数是什么
1 |
|
老熟人hook函数,我们直接看IO是啥
IO 是个 static 变量,初始化的地方是Schedulers的静态代码块中
1 | IO = RxJavaPlugins.initIoScheduler(new IOTask()); |
等价于
1 | io() = new IOTask().call(); |
继续看看 IOTask
1 | static final class IOTask implements Supplier<Scheduler> { |
综合以上,得出结论
1 | Schedulers.io() = new IoScheduler() |
好了,排除了其他干扰代码,接下来看看 IoScheduler()是什么了
IoScheduler 看名称就知道是个 IO 线程调度器,根据代码注释得知,它就是一个用来创建
和缓存线程的线程池。看到这个豁然开朗了,原来 Rxjava 就是通过这个调度器来调度线程
的,至于具体怎么实现我们接着往下看
1 | //无参构造函数 |
从上面的代码可以看出,new IoScheduler()**后 Rxjava 会创建 **CachedWorkerPool 的线程池,同时也创建并运行了一个名为 RxCachedWorkerPoolEvictor 的清除线程,主要作用是清除不再使用的一些线程。但目前只创建了线程池并没有实际的 thread,所以 Schedulers.io()相当于只做了线程调度的前期准备。
OK,终于可以开始分析 Rxjava 是如何实现线程调度的。回看 **subscribeOn()**方法的内部实现:
1 |
|
很熟悉的代码 RxJavaPlugins.onAssembly,上一篇已经分析过这个方法,就是个 hook function, 等价于直接 return new ObservableSubscribeOn
跟踪代码进入 ObservableSubscribeOn,可以看到这个 ObservableSubscribeOn 继承自 Observable,并且扩展了一些属性,增加了
scheduler。 这就是典型的装饰模式,Rxjava 中大量用到了装饰模式,后面还会经常看到这种 wrap 类。
上面我们已经知道了 Observable.subscribe()**方法最终都是调用了对应的实现类的subscribeActual** 方法。我们重点分析下 subscribeActual:
1 |
|
SubscribeOnObserver 也是装饰模式的体现, 是对下游 observer 的一个 wrap,只是添加了 Disposable 的管理。
接下来分析最重要的 scheduler.scheduleDirect(new SubscribeTask(parent))
1 | //这个类很简单,就是一个 Runnable,最终运行上游的 subscribe 方法 |
1 |
|
回到 IoScheduler
1 |
|
1 | static final class EventLoopWorker extends Scheduler.Worker { |
最终线程是在这个方法内调度并执行的。
1 |
|
至此我们终于明白了 Rxjava 是如何调度线程并执行的,通过 subscribeOn 方法将上游生产事件的方法运行在指定的调度线程中。
上游生产者已被调度到RxCachedThreadScheduler-1线程中,同时发射事件并没有切换线程,所以发射后消费事件的 onNext onErro onComplete 也在
RxCachedThreadScheduler-1 线程中。
图解
概括
- Schedulers.io()等价于 new IoScheduler()。
- new IoScheduler() Rxjava 创建了线程池,为后续创建线程做准备,同时创建并运行了一个清理线程 RxCachedWorkerPoolEvictor,定期执行清理任务。
- subscribeOn()返回一个 ObservableSubscribeOn 对象,它是 Observable 的一个装饰类,增加了 scheduler。
- 调用 subscribe()方法,在这个方法调用后,subscribeActual()被调用,才真正执行了IoSchduler 中的 createWorker()创建线程并运行,最终将上游 Observable 的 subscribe()方法调度到新创建的线程中运行。
- 因为 RxJava 最终能影响 ObservableOnSubscribe 这个匿名实现接口的运行环境的只能是最后一次运行的 subscribeOn() ,又因为 RxJava 订阅的时候是从下往上订阅,所以从上往下第一个 subscribeOn() 就是最后运行的,这就造成了写多个 subscribeOn() 并没有什么用的现象。
线程调度(观察者) observeOn
AndroidSchedulers.mainThread()
先来看看 AndroidSchedulers.mainThread()是什么?
1 | //在主线程执行任务的 scheduler |
1 | private static final Scheduler MAIN_THREAD = |
1 | private static final class MainHolder { |
1 | public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) { |
代码很简单,这个 AndroidSchedulers.mainThread()**想当于 **new HandlerScheduler(new Handler(Looper.getMainLooper())),原来是利用 Android 的 Handler 来调度到 main 线程的。
我们再看看 HandlerScheduler,它与我们上节分析的 IOScheduler 类似,都是继承自Scheduler,所以 AndroidSchedulers.mainThread()其实就是是创建了一个运行在 main thread 上的 scheduler。
observeOn
我们看看这个操作符的源码
1 | public final Observable<T> observeOn( Scheduler scheduler){ |
1 | public final Observable<T> observeOn(boolean delayError, int bufferSize) Scheduler scheduler, { |
重点是这个 new ObservableObserveOn,和之前研究的ObservableSubscribeOn继承自同一个父类。
重点还是这个方法,我们前文已经提到了,Observable 的 subscribe 方法最终都是调用subscribeActual 方法。下面看看这个方法的实现:
1 |
|
从上面代码可以看到使用了 ObserveOnObserver 类对 observer 进行装饰,好了,我们再来看看 ObserveOnObserver。
我们已经知道了,事件源发射的事件,是通过 observer 的 onNext,onError,onComplete 发射到下游的。所以看看 ObserveOnObserver 的这三个方法是如何实现的。我们来看onNext 方法:
1 |
|
1 | void schedule() { |
1 |
|
schedule 方法将传入的 run 调度到对应的 handle 所在的线程来执行,这个例子里就是有main 线程来完成。 再回去看看前面传入的 run 吧。
回到 ObserveOnObserver 中的 run 方法:
1 |
|
1 | void drainNormal() { |
至此我们明白了 RXjava 是如何调度消费者线程了。
概括
Rxjava 调度消费者现在的流程,以 observeOn(AndroidSchedulers.mainThread())为例。
AndroidSchedulers.mainThread()先创建一个包含 handler 的 Scheduler, 这个 handler 是主线程的 handler。
observeOn 方法创建 ObservableObserveOn,它是上游 Observable 的一个装饰类,其中包含前面创建的 Scheduler 和 bufferSize 等.
当订阅方法 subscribe 被调用后,ObservableObserveOn 的 subscribeActual 方法创建Scheduler.Worker 并调用上游的 subscribe 方法,同时将自身接收的参数observer用装饰类 ObserveOnObserver 装饰后传递给上游。
当上游调用被 ObserveOnObserver 的 onNext、onError 和 onComplete 方法时,ObserveOnObserver 将上游发送的事件通通加入到队列 queue 中,然后再调用 scheduler将处理事件的方法调度到对应的线程中(本例会调度到 main thread)。 处理事件的方法将queue 中保存的事件取出来,调用下游原始的 observer 再发射出去。
经过以上流程,下游处理事件的消费者线程就运行在了 observeOn 调度后的 thread 中。
线程总结
Rxjava 的 subscribe 方法是由下游一步步向上游进行传递的。会调用上游的 subscribe,直到调用到事件源。如: **source.subscribe(xxx);**而上游的 source 往往是经过装饰后的 Observable, Rxjava 就是利用ObservableSubscribeOn 将 subscribe 方法调度到了指定线程运行,生产者线程最终会运行在被调度后的线程中。但多次调用 subscribeOn 方法会怎么样呢? 我们知道因为 subscribe方法是由下而上传递的,所以事件源的生产者线程最终都只会运行在第一次执行subscribeOn 所调度的线程中,换句话就是多次调用 subscribeOn 方法,只有第一次有效。
Rxjava 发射事件是由上而下发射的,上游的 onNext、onError、onComplete 方法会调用下游传入的 observer 的对应方法。往往下游传递的 observer 对象也是经过装饰后的observer 对象。Rxjava 就是利用 ObserveOnObserver 将执行线程调度后,再调用下游对应的 onNext、onError、onComplete 方法,这样下游消费者就运行再了指定的线程内。 那么多次调用 observeOn 调度不同的线程会怎么样呢? 因为事件是由上而下发射的,所以每次用 observeOn 切换完线程后,对下游的事件消费都有效,比如下游的 map 操作符。最终的事件消费线程运行在最后一个 observeOn 切换后线程中。
四,背压
参考https://blog.csdn.net/carson_ho/article/details/79081407
原理
控制观察者接收事件的速度
异步订阅
同步订阅
同步订阅 & 异步订阅 的区别在于:
- 同步订阅中,被观察者 & 观察者工作于同1线程
- 同步订阅关系中没有缓存区
控制被观察者发送事件的速度
异步订阅
同步订阅
在同步订阅情况中使用
FlowableEmitter.requested()
时,有以下几种使用特性需要注意的:
背压策略
模式1:BackpressureStrategy.ERROR
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:直接抛出异常MissingBackpressureException
模式2:BackpressureStrategy.MISSING
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:友好提示:缓存区满了
模式3:BackpressureStrategy.BUFFER
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:将缓存区大小设置成无限大
- 即被观察者可无限发送事件观察者,但实际上是存放在缓存区
- 但要注意内存情况,防止出现OOM
模式4: BackpressureStrategy.DROP
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:超过缓存区大小(128)的事件丢弃
如发送了150个事件,仅保存第1 - 第128个事件,第129 -第150事件将被丢弃
模式5:BackpressureStrategy.LATEST
问题:发送事件速度 > 接收事件 速度,即流速不匹配
具体表现是:出现当缓存区大小存满(默认缓存区大小 = 128)、被观察者仍然继续发送下1个事件时
处理方式:只保存最新(最后)事件,超过缓存区大小(128)的事件丢弃
即如果发送了150个事件,缓存区里会保存129个事件(第1-第128 + 第150事件)
五,常见问题
5.1,操作符 map 和 flatmap 的区别?
map:【数据类型转换】将被观察者发送的事件转换为另一种类型的事件。
flatMap:【化解循环嵌套和接口嵌套】将被观察者发送的事件序列进行拆分 & 转换 后合并成一个新的事件序列,最后再进行发送。
concatMap:【有序】与 flatMap 的 区别在于,拆分 & 重新合并生成的事件序列 的顺序与被观察者旧序列生产的顺序一致。
共同点
都是依赖 Function 函数进行转换(将一个类型依据程序逻辑转换成另一种类型,根据入参和返回值)
都能在转换后直接被 subscribe
区别
返回结果不同
map 返回的是结果集,flatmap 返回的是包含结果集的 Observable 对象(返回结果不同)
执行顺序不同
map 被订阅时每传递一个事件执行一次 onNext 方法,flatmap 多用于多对多,一对多,再被转化为多个时,一般利用 from/just 进行一一分发,被订阅时将所有数据传递完毕汇总到一个 Observable 然后一一执行 onNext 方法。(如单纯用于一对一转换则和 map 相同)
转换对象的能力不同
map 只能单一转换,单一指的是只能一对一进行转换,指一个对象可以转化为另一个对象但是不能转换成对象数组。
flatmap 既可以单一转换也可以一对多/多对多转换,flatmap 要求返回 Observable,因此可以再内部进行事件分发,逐个取出单一对象。