前言
这一节可能是系类中最短的一篇了,我们只看一个内容observeOn的运作原理。不过observeOn和subscribeOn有些区别,observeOn根据操作流的上层的操作有不同的运行流程。
流程1
Observable.just("1").subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()). subscribe(...);
我们已如上流程作为第一个流程进行分析。
我们来看observeOn的源码
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final ObservableobserveOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); }@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable observeOn(Scheduler scheduler, boolean delayError) { return observeOn(scheduler, delayError, bufferSize()); }public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn (this, scheduler, delayError, bufferSize)); }
内容很简单,就是讲原Observable再进行一次包装,变成ObservableObserveOn类型的被观察者。值得说一下的是后面两个参数
delayError,表示当遇到错误时,是否延迟处理错误。在后面的分析中我们将会看到它的作用。
bufferSize, 缓存大小,用来表示生产者堆积的最大数量(然后代码中发现这个东西实际上也没起到什么限制作用,求解!)。
当我们调用它的subscribe之后,会运行如下代码:
@Override protected void subscribeActual(Observer observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize)); } }
如果scheduler是TrampolineScheduler那么observeOn等于什么都没做,在原被观察者上调用subscribe注册原观察者。
否则,在原观察者上注册一个包装过后的观察者。结合最上面的代码,相当于我们会在ObservableSubscribeOn上调用subscribe!
ObservableSubscribeOn 的运行流程这里就不说了,总之 ObserveOnObserver的onSubscribe会在当前线程最先被调用。
@Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; if (s instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposableqd = (QueueDisposable ) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { .... return; } if (m == QueueDisposable.ASYNC) { .... return; } } queue = new SpscLinkedArrayQueue (bufferSize); actual.onSubscribe(this); } }
这里就是我所说的多个流程了。仔细思考,ObserveOnObserver的onSubscribe是被谁调用的?查看代码发现是被直接上层操作符ObservableSubscribeOn调用的。那么是不是存在不是被直接上层操作符调用的呢?当然,如果我的直接上层也是一个ObservableObserveOn,它并不会调用onSubscribe而是交给上层调用。
这也是多流程的成因。在这里s并不是继承与QueueDisposable(s的类型是 SubscribeOnObserver)。所以queue就等于SpscLinkedArrayQueue。这里并不打算详细介绍SpscLinkedArrayQueue,它就是一个队列的实现,你甚至可以自己实现一个功能相同的队列。来看一下介绍
A single-producer single-consumer array-backed queue which can allocate new arrays in case the consumer is slower than the producer.(单消费者单生产者的数组队列,当消费者消费速度慢与生产者时,会分配新的数组空间。)
然后调用原Observe的onSubscribe,这是正常流程。
然后当数据一层一层往下发送(这时候线程可能已经改变了,见subscribeOn的介绍),发送到了ObserveOnObserver,调用了onNext方法
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }
我并没有设置done,同事sourceMode也是0,所以运行queue.offer(t),就是将T添加到了之前创建的队列中。
@Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } error = t; done = true; schedule(); } @Override public void onComplete() { if (done) { return; } done = true; schedule(); }
另外两个方法一并看下,onError会给error变量赋值。两个方法都会将done设置为true。接下来看下核心的schedule
void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
为什么说这两句话是核心代码呢?因为observeOn所想要实现的线程切换就是在这里完成的!具体切换请见第二节的scedule和worker分析。总之这里就是会在指定线程中取运行this.run()方法。
@Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }
outputFused和requestFusion相关,有可能会在链式下层被调用,此处默认不调用,所以默认值为false。会运行drainNormal方法。在理解它之前,我们需要先看一个关键性的方法:
boolean checkTerminated(boolean d, boolean empty, Observer a) { if (cancelled) { queue.clear(); return true; } if (d) { Throwable e = error; if (delayError) { if (empty) { if (e != null) { a.onError(e); } else { a.onComplete(); } worker.dispose(); return true; } } else { if (e != null) { queue.clear(); a.onError(e); worker.dispose(); return true; } else if (empty) { a.onComplete(); worker.dispose(); return true; } } } return false; }
改方法用来判断是否应该停止,返回true表示应该停止。比如如果已经被dispose了,那么显然,应该停止分发数据。参数d,一般就是done,用来表示是否已经完成(在onComplete和onError中会被设置为true),完成就表示所有数据都已经offer到了队列中。如果没完成,那么肯定不应该停止,直接返回false。
这里可以看到delayError的作用了,如果设置了它为true,那么只要发生错误,我不管我的数据是否为空的标记(一般队列里没有数据时为空),直接调用源Observer观察者的onError方法,并且停止后续分发。否则,只要还有数据,就会暂时忽略错误。
如果已经完成,并且没有错误,并且队列中也没有更多待处理的数据了,那么会调用onComplete方法并且停止。
void drainNormal() { int missed = 1; final SimpleQueueq = queue; final Observer a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }
对到drainNormal方法,此处的无线循环需要结合missed和上面schedule中的addAndIncrement结合来看,我们将该问题记为问题A
先来看循环内部的,如果已经done完成,并且队列已经为空了,那么停止(注意onError和onComplete方法都是在这个停止操作中调用的)。
里层循环意义在于,循环从对立中获取数据,如果需要停止,那么直接返回,如果数据为空,跳出循环,如果不为空,调用源observer的onNext方法,然后继续从队列中取数据,重复操作。直到需要停止或者数据为空。
再来看问题A。
每次调用onNext产生数据或者onError或者onComplete之后都会调用schedule方法,给当前值++,但是只有为0的时候才会去调度运行run。这就保证只有一个run在被运行!
而drainNormal方法中,如果判断需要停止,就会跳过missed操作,也就是get()的值不会再为0,于是run方法再也不会被调度。
如果不是需要停止的情况,推出内层循环的情况只有一个,就是生产者还在继续发送数据(onError和onComplete都还没运行,所以done为false),并且队列中没有数据(所有数据都已经取出,所以empty为true)。那么就会运行missed块内容。
missed块的内容极为精巧,第一次,missed为1,所以相当于在当前值上-1,如果 -1 之后为0,那么退出run(因为是0所以schedule有机会再次调度run方法)。如果在处理run方法的数据时,onNext方法被多次运行,假设onNext总共运行了10次,于是get为10,第一次的时候调度了run。run方法在第一次循环中处理了所有数据,跳出循环并且当前值-1,但是依然为9,不为0,继续循环,假设没有新数据处理,直接跳出内存循环,这个时候就是-9,missed为0,跳出循环。
这只是一个并不好的说明,但是还是能了解missed块的原理的。讲到这里,下面反思非常重要:
我们看到在ObserveOnObserve或者SubscribeOnObserve等多个Observe子类,甚至很多其他类都会继承与AtomicInteger或者AtomicReference等这些原子操作类,主要作用就是改类内方法会在不同的线程被调用,使用原子操作能够有效避免脏数据产生的影响,以这样的方式来替代烦人的锁,不得不说这是一个极其值得借鉴的操作!
总结流程1
以上就是流程1的代码分析。脱离代码主要分析下思路,通过重新包装observer观察者,当调用观察者的onNext等方法时,将数据存入队列,并且启动线程调度机制,在指定线程中读取改队列数据,并且调用源Observer观察者的onNext方法。最终实现线程的切换!
流程2
流程2的代码更简单些
Observable.just("1","2","3").observeOn(Schedulers.newThread()). subscribe(...);
和流程1的区别在哪里?关键还是在ObserveOnObserver的OnSubscribe方法中,前面的调用都相同
@Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; if (s instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposableqd = (QueueDisposable ) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true; actual.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; actual.onSubscribe(this); return; } } queue = new SpscLinkedArrayQueue (bufferSize); actual.onSubscribe(this); } }
但是这里s instanceof QueueDisposable 判断成立,所以会调用s的requestFusion方法。(这是我们首次调用这个不明所以的方法,在此处大概可以窥探设计意图了。)
先来看简单的,如果返回的是ASYNC,和流程1的却别就非常小了,只是设置了sourceMode为ASYNC,并且queue直接赋值为传入进来的QueueDisposable了。
sourceMode唯一调用在onNext中,如果是ASYNC就不会将数据放入queue中。
同样如果返回SYNC,在ObserveOnObserver中我们也将不在自己管理queue而是直接调用。
那么ASYNC和SYNC到底是什么区别呢?这个可能就是requestFusion在这里意义了(因为看的代码不够多,不能保证这是requestFusion的唯一用途。)
如果返回SYNC那么它的QueueDisposable的数据产生是在poll被调用的同一个线程中。
如果返回ASYNC,那么QueueDisposable的数据产生和poll应该是生产者和消费者模式。
不一定准确,但是现在看来确实如此。
总结
那么到此observeOn也已经介绍结束了。我们知道,它通过schedule来调度onNext系列方法的运行线程。所以observeOn有如下特点:
observeOn只会改变最终onNext方法的运行线程。
多个observeOn已最后一个所指定的线程为准。