转载请附原文链接: RxJava 源码学习笔记
RxJava 使用简介
学习了 RxJava有一段时间了,也在自己的项目中全面替换使用,整个项目的代码结构和质量确实有很大的改观,这么牛逼的一个框架,不仅要知其然,更要知其所以然。阅读源码学习原理也是很有必要的,这篇文章算是学习记录吧,圈内已有很多大神的文章深入分析了RxJava的原理,吾等小辈,不敢造次。
如果你还在学习RxJava的使用,可以看看我上一篇文章,希望能有所帮助
一次完整订阅
observable被create创建的时候,传入一个OnSubscribe对象
|
至于这个hook,是RxJavaObservableExecutionHook,RxJavaPlugins中的一个类,用于插入一些你所需要的代码,记录,测试等,在默认的情况下,没有做任何事情,以下是官方文档给出的解释:
This plugin allows you to register functions that RxJava will call upon certain regular RxJava activities, for instance for logging or metrics-collection purposes.
所以目前不需要管这个钩子。在文章后面如果出现其他类型的hook不再作解释
来看看OnSubscribe是个什么东西,继承了Action1接口,接口中只有一个方法call,这个方法什么时候被调用,我们慢慢来看。
|
我们知道当调用了Observable的subscribe()方法后,Observable开始发送数据,那call应该就是在subscribe()时调用的,来看代码(onStart前面是判断参数为空抛异常的代码,没有放上来)
|
首先调用了Subscriber的onStart方法,我们的订阅前的准备代码就是在这里执行,接着判断subscriber是否是 SafeSubscriber的子类,如果不是的话,转成这个类,SafeSubscriber又是什么?
> SafeSubscriber is a wrapper around Subscriber that ensures that the Subscriber complies with the Observable contract> http://reactivex.io/documentation/contract.html>
简单翻译,是为了使 Subscriber遵守Observable的某种规则而进行的一次封装,保证onComplete和onError互斥,onNext在onComplete不再发送数据,对异常做了一些操作等等。
接着往下看调用了OnSubscribe.call(),开始执行call中的代码,并且做了一些异常处理,返回一个Subscription对象用于管理这次订阅
用过RxJava的同学应该知道,Subscriber是实现自Observer接口的抽象类,在subscribe时也可以就传入一个observer对象,其实RxJava还是会使用一个subscriber去订阅事件,然后将数据简单的向前传递给这个observer,其他的不同参数的subscribe都是同样的处理方式;
|
知道了一个完整订阅流程在内部时怎么实现的之后,我们来看一下通过其他的方法创建Observable进行发射数据有什么不同。
- just()
- 一个参数
- 多个参数 — 实际是执行了from()
|
一颗赛艇,重写了call方法,调用了subscriber.setProducer(),所以这是个什么方法?干什么用的?从方法名上理解,是给当前的 Subscriber设置了一个producer(提供者)
|
从上面的代码会执行到producer.request(Long.MAX_VALUE),Producer是一个接口,里面只有一个方法request(long n) — n 是你希望该Producer所提供数据数量的最大值,我们再回到just()中所用到的SingleProducer来看看它是怎么实现的
|
这段代码看懂的难度不大
- from()
|
from方法传入一个数组,如果数组长度为0,则返回empty()— 一个不发射数据的Observable,如果数组只有一个元素,就返回just方法创建的Observable,我们稍后看just,最后一句代码,create时创建了一个OnSubscribeFromArray对象,这个类是OnSubscribe的一个实现类
|
同just()一样也是通过setProducer的方法来发射数据,但是FromArrayProducer的request()方法里面用到了一个BackpressureUtils来处理,关于Backpressure ,大概是一种处理堆积消息的策略,有兴趣的可以看看这篇文章。
Observable的转换 — lift(),compose()
map()
先从我们最熟悉的操作符map开始,map实际调用了一个方法lift()
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {return lift(new OperatorMap<T, R>(func));}flatMap()
然后来看看flatMap(),调用了merge(),传入了一个参数lift()
public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector,final Func2<? super T, ? super U, ? extends R> resultSelector) {return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)));}merge()
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {if (source.getClass() == ScalarSynchronousObservable.class) {return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());}return source.lift(OperatorMerge.<T>instance(false));}merge中也调用了lift()函数,实际上大部分对Observable进行操作的操作符几乎都是用到了这个函数,那么我们来看看lift()到底做了什么
lift()
Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass the values of the current Observable through the Operator function.
源码中的注释,简单翻译一下,这段话大概的意思是按照这个操作符的规则来将当前的Observable替换为一个新的Observable
//部分注释被删掉了public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {return new Observable<R>(new OnSubscribe<R>() {public void call(Subscriber<? super R> o) {try {Subscriber<? super T> st = hook.onLift(operator).call(o);try {// new Subscriber created and being subscribed with so 'onStart' itst.onStart();onSubscribe.call(st);} catch (Throwable e) {Exceptions.throwIfFatal(e);st.onError(e);}} catch (Throwable e) {Exceptions.throwIfFatal(e);o.onError(e);}}});}这段代码乍一看一脸懵逼,我们先看看Operator是个什么鬼
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {// cover for generics insanity}Operator接口继承自Func1接口,做了一件事情,将泛型指定为两个Subscriber,这时候我们再去看lift中的代码,好像明白怎么回事了
首先new了一个新的OnSubscribe对象,泛型指定为转换后的数据类型,注意了,operator.call返回了一个Subscriber对象,泛型是原始的Observable所发射的数据,然后调用了原始的Observable中的onSubscribe.call()将数据发射出去。
原始的数据发射到哪儿去了呢?来看OperatorMap中的代码
public final class OperatorMap<T, R> implements Operator<R, T> {final Func1<? super T, ? extends R> transformer;public OperatorMap(Func1<? super T, ? extends R> transformer) {this.transformer = transformer;}public Subscriber<? super T> call(final Subscriber<? super R> o) {return new Subscriber<T>(o) {public void onCompleted() {o.onCompleted();}public void onError(Throwable e) {o.onError(e);}public void onNext(T t) {try {o.onNext(transformer.call(t));} catch (Throwable e) {Exceptions.throwOrReport(e, this, t);}}};}}我看到这段代码的时候有种醍醐灌顶的感觉啊,是不是很6,subscriber将数据传递给transformer,让我们在Func1中处理相关逻辑
关于lift的原理,看看扔物线大神的文章变换的原理 ,里面的图解很好的解释了lift的流程
- compose() — 上一篇用法学习的时候已经讲过,不再赘述,传送门
线程转换
线程转换的原理其实也是使用lift()去转换,关于流程原理可以参考Scheduler的原理,线程转换的内部实现,代码较多,打算放到下一篇文章中,专门分析学习。