简介
对rxjava 有一个简单的学习之后,笔者还是很难 理解rxjava 在服务端的使用,感觉学习了hystrix 之后,这块的理解会更深刻一些。
首先对于同步调用,rxjava的作用有限,而对于异步调用,对于类似于netty这种 方法直接返回future的,rxjava也套不上。其所谓异步调用,通常是另起 线程执行一个同步调用(从驱动线程的角度看,这就是一个异步调用了),由此成为一个多线程代码,解决多线程环境下的 数据流控制问题。
线程切换
线程控制绝对是RxJava的重点之一。在不指定线程的情况下,RxJava遵循的是线程不变的原则,在哪个线程调用subscribe(),就在哪个线程生产、消费事件。
线程控制的 本质 还是 将 当前 Observable 转换为 另一个Observable,具体的说是转换Observable的onSubscribe 方法,跟filter 等普通的数据转换一样一样的。明面上是线程切换,其实是函数 包装。
public Observable<T> observeOn(Scheduler scheduler) {
return observeOn(this, scheduler);
}
public Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(this, scheduler);
}
public Observable<T> filter(Func1<T, Boolean> predicate) {
return filter(this, predicate);
}
谜之RxJava (三)update 2 —— subscribeOn 和 observeOn 的区别
笔者最早找到 支持observeOn 的版本0.10.0
从0.10.0 可以看到,无论是observeOn 还是subscribeOn,参数都是Scheduler,都会导致 代码切换到 另一个线程(由Scheduler 实现类决定)执行。只是observeOn 只是 表示 其之后的操作,由observeOn 指定的Scheduler执行。subscribeOn 则是 之前及之后的操作 都由subscribeOn 指定的Scheduler 执行,直到遇到observeOn。
subscribeOn
Func1<Observer<T>, Subscription>
叫 onSubscribe,Subscribe 是 Subscribe ,别弄混onSubscribe和Subscribe。
public static <T> Func1<Observer<T>, Subscription> subscribeOn(Observable<T> source, Scheduler scheduler) {
return new SubscribeOn<T>(source, scheduler);
}
private static class SubscribeOn<T> implements Func1<Observer<T>, Subscription> {
private final Observable<T> source;
private final Scheduler scheduler;
public SubscribeOn(Observable<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
public Subscription call(final Observer<T> observer) {
return scheduler.schedule(new Func0<Subscription>() {
@Override
public Subscription call() {
return new ScheduledSubscription(source.subscribe(observer), scheduler);
}
});
}
}
Observable.subscribeOn 的逻辑链条,根据 当前Observable 和 scheduler 创建一个新的 Func1<Observer<T>, Subscription>
onSubscribe (学名叫subscribeOn )并基于此创建新的 Observable。 转换 onSubscribe 过程涉及到 几个Subscription 的转换
- 当前 Observable.subscribe(observer) 返回 Subscribe
- 将 Subscribe 封装为 ScheduledSubscription
- 将 ScheduledSubscription 封装为 SafeObservableSubscription
以NewThreadScheduler 为例
Observable.filter()
.map1()
.subscribeOn(NewThreadScheduler)
.map2()
.subscribe(xx)
以filter操作为例
// class Observable
public Observable<T> filter(Func1<T, Boolean> predicate) {
return filter(this, predicate);
}
public static <T> Observable<T> filter(Observable<T> that, Func1<T, Boolean> predicate) {
return create(OperationFilter.filter(that, predicate));
}
// class OperationFilter
public static <T> Func1<Observer<T>, Subscription> filter(Observable<T> that, Func1<T, Boolean> predicate) {
return new Filter<T>(that, predicate);
}
private static class Filter<T> implements Func1<Observer<T>, Subscription> {
private final Observable<T> that;
private final Func1<T, Boolean> predicate;
public Filter(Observable<T> that, Func1<T, Boolean> predicate) {
this.that = that;
this.predicate = predicate;
}
public Subscription call(final Observer<T> observer) {
...
that.subscribe(new Observer<T>() {
public void onNext(T value) {
try {
if (predicate.call(value)) {
observer.onNext(value);
}
} catch (Throwable ex) {
observer.onError(ex);
...
}
}
public void onError(Throwable ex) {
observer.onError(ex);
}
public void onCompleted() {
observer.onCompleted();
}
});
...
}
}
-
filter 时的 Observable 和 最后 subscribe 当时的 Observable 已经不是同一个了。filter 时的observer 是 new 出来的,跟最后subscribe 方法参数的 observer 也不是同一个。
动作 源Observable 对应observer filter Observable observer3.onNext map1 Observable1 observer2.onNext subscribeOn Observable2 observer1.onNext 只是异步驱动了一下 map2 Observable3 observer1.onNext subscribe Observable4 observer.onNext rxjava 通过封装,只将原始的Observable 和 observer 暴露给了用户。
- 下一个Observable 简介持有 上一个 Observable 的引用
- 最新的Observable4.subscribe 驱动整个逻辑 开始 执行,具体的说 是驱动 其对应的
Func1<Observer<T>, Subscription>
的执行。 -
Observable4.subscribe 实现是 Observable4. onSubscribe.call ,方法执行链条为
Observable4.subscribe ==> Observable4.onSubscribe.call ==> Observable3.subscribe ==> Observable3.subscribeOn.call ==> 驱动线程执行完毕,切换thread Observable2.subscribe ==> Observable2.onSubscribe.call ==> Observable1.subscribe ==> Observable1.onSubscribe.call ==> Observable.subscribe ==> Observable.onSubscribe.call ==> observer3.onNext1,onNext2,onCompleted ==> filter ==> observer2.onNext1,onNext2,onCompleted ==> ... observer.onNext1,onNext2,onCompleted
对于这个方法执行链
RxJava for 100% beginners (part3-switching threads)subscribeOn()
change the thread for emitting the source Observable’s elements, no matter where you put it in your “chain”.
用一张图解释RxJava中的线程控制 则将这个方法链分为两个阶段
- 驱动阶段,从下游到上游,反向驱动
- 事件发射阶段。第一个Observable开始产生事件,然后事件流就开始正向传递
这也就解答了笔者的一个疑惑,为什么subscribeOn 放在任何位置 对“副作用函数” 都有效?因为线程的切换 在事件驱动阶段,而副作用函数的执行 在事件发射阶段。
observeOn
以下列代码为例
Observable.filter()
.map1()
.observerOn(NewThreadScheduler)
.map2()
.subscribe(xx)
分析下 ObserveOn 源码
// OperationObserveOn
public static <T> Func1<Observer<T>, Subscription> observeOn(Observable<T> source, Scheduler scheduler) {
return new ObserveOn<T>(source, scheduler);
}
private static class ObserveOn<T> implements Func1<Observer<T>, Subscription> {
private final Observable<T> source;
private final Scheduler scheduler;
public ObserveOn(Observable<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
public Subscription call(final Observer<T> observer) {
if (scheduler instanceof ImmediateScheduler) {
// do nothing if we request ImmediateScheduler so we don't invoke overhead
return source.subscribe(observer);
} else {
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
}
}
}
分析Observable 与 observer 的 变换
动作 | 源Observable | 对应observer |
---|---|---|
filter | Observable | observer4.onNext |
map1 | Observable1 | observer3.onNext |
subscribeOn | Observable2 | observer2.onNext 变成了ScheduledObserver |
map2 | Observable3 | observer1.onNext |
subscribe | Observable4 | observer.onNext |
分析方法执行链
Observable4.subscribe ==>
Observable4.onSubscribe.call ==>
Observable3.subscribe ==>
Observable3.subscribeOn.call ==>
Observable2.subscribe ==>
Observable2.onSubscribe.call ==>
Observable1.subscribe ==>
Observable1.onSubscribe.call ==>
Observable.subscribe ==>
Observable.onSubscribe.call ==>
observer4.onNext1,onNext2 ==>
filter ==>
observer3.onNext1,onNext2 ==>
map ==>
observer2.onNext1,onNext2 ==> // 提交事件,驱动线程执行完毕,另一个线程执行下面的逻辑(接收事件并驱动后续执行)
...
observer.onNext1,onNext2
小结
形式上顺序执行filter、map 等,从上到下,实际上是subscribe 才真正触发执行,但最后还是按照filter、map 的顺序 执行业务逻辑——代码腾挪的艺术。
突然奇想对照下 builder 模式,示例代码可以类比为
Observable.setFilter(filterFunction)
.setMap1(map1Function)
.subscribeOn(NewThreadScheduler)
.setMap2(map2Function)
.setObserver(observer)
.build()
类似于函数式编程,返回函数 或者 函数接口的,一定要小心,代码写在哪里 跟 代码什么时候执行 没啥关系, 经常违反直觉。
Observable.just(1, 2, 3, 4, 5, 6)
.subscribe(new Subscriber() {
@Override
public void onCompleted() {
System.out.println("Complete!");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer value) {
System.out.println("onNext: " + value);
}
});
比如上述代码, 一个数组本来不具备任何能力(方法),其对应的订阅者(本质就是 System.out.println("onNext: " + value);
)同样也平淡无奇。但对象(此处的数组)可以加行为,行为(此处的 System.out.println)可以包括在一个对象中。 本体啥都没有,我们单靠外在的包裹,就可以将事件流、线程异步执行等概念加到上面去。