文章目錄
  1. 1. 一次完整订阅
  2. 2. Observable的转换 — lift(),compose()
  3. 3. 线程转换

转载请附原文链接: RxJava 源码学习笔记

RxJava 使用简介
学习了 RxJava有一段时间了,也在自己的项目中全面替换使用,整个项目的代码结构和质量确实有很大的改观,这么牛逼的一个框架,不仅要知其然,更要知其所以然。阅读源码学习原理也是很有必要的,这篇文章算是学习记录吧,圈内已有很多大神的文章深入分析了RxJava的原理,吾等小辈,不敢造次。

如果你还在学习RxJava的使用,可以看看我上一篇文章,希望能有所帮助

一次完整订阅

observable被create创建的时候,传入一个OnSubscribe对象

1
2
3
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}

至于这个hook,是RxJavaObservableExecutionHook,RxJavaPlugins中的一个类,用于插入一些你所需要的代码,记录,测试等,在默认的情况下,没有做任何事情,以下是官方文档给出的解释:

This plugin allows you to register functions that RxJava will call upon certain regular RxJava activities, for instance for logging or metrics-collection purposes.

所以目前不需要管这个钩子。在文章后面如果出现其他类型的hook不再作解释

来看看OnSubscribe是个什么东西,继承了Action1接口,接口中只有一个方法call,这个方法什么时候被调用,我们慢慢来看。

1
2
3
4
5
6
/**
* Invoked when Observable.subscribe is called.
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}

我们知道当调用了Observable的subscribe()方法后,Observable开始发送数据,那call应该就是在subscribe()时调用的,来看代码(onStart前面是判断参数为空抛异常的代码,没有放上来)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
hook.onSubscribeError(r);
throw r;
}
return Subscriptions.unsubscribed();
}
}

首先调用了Subscriber的onStart方法,我们的订阅前的准备代码就是在这里执行,接着判断subscriber是否是 SafeSubscriber的子类,如果不是的话,转成这个类,SafeSubscriber又是什么?

1
2
3
> SafeSubscriber is a wrapper around Subscriber that ensures that the Subscriber complies with the Observable contract
> http://reactivex.io/documentation/contract.html
>

简单翻译,是为了使 Subscriber遵守Observable的某种规则而进行的一次封装,保证onComplete和onError互斥,onNext在onComplete不再发送数据,对异常做了一些操作等等。

接着往下看调用了OnSubscribe.call(),开始执行call中的代码,并且做了一些异常处理,返回一个Subscription对象用于管理这次订阅

用过RxJava的同学应该知道,Subscriber是实现自Observer接口的抽象类,在subscribe时也可以就传入一个observer对象,其实RxJava还是会使用一个subscriber去订阅事件,然后将数据简单的向前传递给这个observer,其他的不同参数的subscribe都是同样的处理方式;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
observer.onCompleted();
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
});
}

知道了一个完整订阅流程在内部时怎么实现的之后,我们来看一下通过其他的方法创建Observable进行发射数据有什么不同。

  • just()
    • 一个参数
  • 多个参数 — 实际是执行了from()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//一个参数
public static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
//ScalarSynchronousObservable的构造方法
protected ScalarSynchronousObservable(final T t) {
super(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> s) {
s.setProducer(createProducer(s, t));
}
});
this.t = t;
}

一颗赛艇,重写了call方法,调用了subscriber.setProducer(),所以这是个什么方法?干什么用的?从方法名上理解,是给当前的 Subscriber设置了一个producer(提供者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
// middle operator ... we pass thru unless a request has been made
if (toRequest == NOT_SET) {
// we pass-thru to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);//执行到这句
} else {
producer.request(toRequest);
}
}
}

从上面的代码会执行到producer.request(Long.MAX_VALUE),Producer是一个接口,里面只有一个方法request(long n) — n 是你希望该Producer所提供数据数量的最大值,我们再回到just()中所用到的SingleProducer来看看它是怎么实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**注意这里贴出的只是核心代码,并不是完整的源代码*/
@Override
public void request(long n) {
final Subscriber<? super T> c = child;
T v = value;
// eagerly check for unsubscription
if (c.isUnsubscribed()) {
return;
}
// emit the value
try {
c.onNext(v);
} catch (Throwable e) {
Exceptions.throwOrReport(e, c, v);
return;
}
// eagerly check for unsubscription
if (c.isUnsubscribed()) {
return;
}
// complete the child
c.onCompleted();
}

这段代码看懂的难度不大

  • from()
1
2
3
4
5
6
7
8
9
10
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return create(new OnSubscribeFromArray<T>(array));
}

from方法传入一个数组,如果数组长度为0,则返回empty()— 一个不发射数据的Observable,如果数组只有一个元素,就返回just方法创建的Observable,我们稍后看just,最后一句代码,create时创建了一个OnSubscribeFromArray对象,这个类是OnSubscribe的一个实现类

1
2
3
4
@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}

同just()一样也是通过setProducer的方法来发射数据,但是FromArrayProducer的request()方法里面用到了一个BackpressureUtils来处理,关于Backpressure ,大概是一种处理堆积消息的策略,有兴趣的可以看看这篇文章。

Observable的转换 — lift(),compose()

  • map()

    先从我们最熟悉的操作符map开始,map实际调用了一个方法lift()

    1
    2
    3
    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
    }
  • flatMap()

    然后来看看flatMap(),调用了merge(),传入了一个参数lift()

    1
    2
    3
    4
    public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
    final Func2<? super T, ? super U, ? extends R> resultSelector) {
    return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)));
    }
  • merge()

    1
    2
    3
    4
    5
    6
    public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
    if (source.getClass() == ScalarSynchronousObservable.class) {
    return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
    }
    return source.lift(OperatorMerge.<T>instance(false));
    }

    merge中也调用了lift()函数,实际上大部分对Observable进行操作的操作符几乎都是用到了这个函数,那么我们来看看lift()到底做了什么

  • lift()

    Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass the values of the current Observable through the Operator function.

    源码中的注释,简单翻译一下,这段话大概的意思是按照这个操作符的规则来将当前的Observable替换为一个新的Observable

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    //部分注释被删掉了
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
    @Override
    public void call(Subscriber<? super R> o) {
    try {
    Subscriber<? super T> st = hook.onLift(operator).call(o);
    try {
    // new Subscriber created and being subscribed with so 'onStart' it
    st.onStart();
    onSubscribe.call(st);
    } catch (Throwable e) {
    Exceptions.throwIfFatal(e);
    st.onError(e);
    }
    } catch (Throwable e) {
    Exceptions.throwIfFatal(e);
    o.onError(e);
    }
    }
    });
    }

    这段代码乍一看一脸懵逼,我们先看看Operator是个什么鬼

    1
    2
    3
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
    // cover for generics insanity
    }

    Operator接口继承自Func1接口,做了一件事情,将泛型指定为两个Subscriber,这时候我们再去看lift中的代码,好像明白怎么回事了

    首先new了一个新的OnSubscribe对象,泛型指定为转换后的数据类型,注意了,operator.call返回了一个Subscriber对象,泛型是原始的Observable所发射的数据,然后调用了原始的Observable中的onSubscribe.call()将数据发射出去

    原始的数据发射到哪儿去了呢?来看OperatorMap中的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public final class OperatorMap<T, R> implements Operator<R, T> {
    final Func1<? super T, ? extends R> transformer;
    public OperatorMap(Func1<? super T, ? extends R> transformer) {
    this.transformer = transformer;
    }
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
    return new Subscriber<T>(o) {
    @Override
    public void onCompleted() {
    o.onCompleted();
    }
    @Override
    public void onError(Throwable e) {
    o.onError(e);
    }
    @Override
    public void onNext(T t) {
    try {
    o.onNext(transformer.call(t));
    } catch (Throwable e) {
    Exceptions.throwOrReport(e, this, t);
    }
    }
    };
    }
    }

    我看到这段代码的时候有种醍醐灌顶的感觉啊,是不是很6,subscriber将数据传递给transformer,让我们在Func1中处理相关逻辑

关于lift的原理,看看扔物线大神的文章变换的原理 ,里面的图解很好的解释了lift的流程

  • compose() — 上一篇用法学习的时候已经讲过,不再赘述,传送门

线程转换

线程转换的原理其实也是使用lift()去转换,关于流程原理可以参考Scheduler的原理,线程转换的内部实现,代码较多,打算放到下一篇文章中,专门分析学习。