博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rxjava2 源码解析 (三)
阅读量:6286 次
发布时间:2019-06-22

本文共 10231 字,大约阅读时间需要 34 分钟。

hot3.png

前言

    这一节可能是系类中最短的一篇了,我们只看一个内容observeOn的运作原理。不过observeOn和subscribeOn有些区别,observeOn根据操作流的上层的操作有不同的运行流程。

流程1    

Observable.just("1").subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).                subscribe(...);

    我们已如上流程作为第一个流程进行分析。

    我们来看observeOn的源码

@CheckReturnValue    @SchedulerSupport(SchedulerSupport.CUSTOM)    public final Observable
observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); }@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable
observeOn(Scheduler scheduler, boolean delayError) { return observeOn(scheduler, delayError, bufferSize()); }public final Observable
observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn
(this, scheduler, delayError, bufferSize)); }

    内容很简单,就是讲原Observable再进行一次包装,变成ObservableObserveOn类型的被观察者。值得说一下的是后面两个参数

    delayError,表示当遇到错误时,是否延迟处理错误。在后面的分析中我们将会看到它的作用。

    bufferSize, 缓存大小,用来表示生产者堆积的最大数量(然后代码中发现这个东西实际上也没起到什么限制作用,求解!)。

    当我们调用它的subscribe之后,会运行如下代码:

@Override    protected void subscribeActual(Observer
observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver
(observer, w, delayError, bufferSize)); } }

    如果scheduler是TrampolineScheduler那么observeOn等于什么都没做,在原被观察者上调用subscribe注册原观察者。

    否则,在原观察者上注册一个包装过后的观察者。结合最上面的代码,相当于我们会在ObservableSubscribeOn上调用subscribe!

    ObservableSubscribeOn  的运行流程这里就不说了,总之  ObserveOnObserver的onSubscribe会在当前线程最先被调用。

@Override        public void onSubscribe(Disposable s) {            if (DisposableHelper.validate(this.s, s)) {                this.s = s;                if (s instanceof QueueDisposable) {                    @SuppressWarnings("unchecked")                    QueueDisposable
qd = (QueueDisposable
) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { .... return; } if (m == QueueDisposable.ASYNC) { .... return; } } queue = new SpscLinkedArrayQueue
(bufferSize); actual.onSubscribe(this); } }

    这里就是我所说的多个流程了。仔细思考,ObserveOnObserver的onSubscribe是被谁调用的?查看代码发现是被直接上层操作符ObservableSubscribeOn调用的。那么是不是存在不是被直接上层操作符调用的呢?当然,如果我的直接上层也是一个ObservableObserveOn,它并不会调用onSubscribe而是交给上层调用。

    这也是多流程的成因。在这里s并不是继承与QueueDisposable(s的类型是 SubscribeOnObserver)。所以queue就等于SpscLinkedArrayQueue。这里并不打算详细介绍SpscLinkedArrayQueue,它就是一个队列的实现,你甚至可以自己实现一个功能相同的队列。来看一下介绍

A single-producer single-consumer array-backed queue which can allocate new arrays in case the consumer is slower than the producer.(单消费者单生产者的数组队列,当消费者消费速度慢与生产者时,会分配新的数组空间。)

     然后调用原Observe的onSubscribe,这是正常流程。

    然后当数据一层一层往下发送(这时候线程可能已经改变了,见subscribeOn的介绍),发送到了ObserveOnObserver,调用了onNext方法

@Override        public void onNext(T t) {            if (done) {                return;            }            if (sourceMode != QueueDisposable.ASYNC) {                queue.offer(t);            }            schedule();        }

    我并没有设置done,同事sourceMode也是0,所以运行queue.offer(t),就是将T添加到了之前创建的队列中。

@Override        public void onError(Throwable t) {            if (done) {                RxJavaPlugins.onError(t);                return;            }            error = t;            done = true;            schedule();        }        @Override        public void onComplete() {            if (done) {                return;            }            done = true;            schedule();        }

    另外两个方法一并看下,onError会给error变量赋值。两个方法都会将done设置为true。接下来看下核心的schedule

void schedule() {            if (getAndIncrement() == 0) {                worker.schedule(this);            }        }

    为什么说这两句话是核心代码呢?因为observeOn所想要实现的线程切换就是在这里完成的!具体切换请见第二节的scedule和worker分析。总之这里就是会在指定线程中取运行this.run()方法。

@Override        public void run() {            if (outputFused) {                drainFused();            } else {                drainNormal();            }        }

    outputFused和requestFusion相关,有可能会在链式下层被调用,此处默认不调用,所以默认值为false。会运行drainNormal方法。在理解它之前,我们需要先看一个关键性的方法:

boolean checkTerminated(boolean d, boolean empty, Observer
a) { if (cancelled) { queue.clear(); return true; } if (d) { Throwable e = error; if (delayError) { if (empty) { if (e != null) { a.onError(e); } else { a.onComplete(); } worker.dispose(); return true; } } else { if (e != null) { queue.clear(); a.onError(e); worker.dispose(); return true; } else if (empty) { a.onComplete(); worker.dispose(); return true; } } } return false; }

    改方法用来判断是否应该停止,返回true表示应该停止。比如如果已经被dispose了,那么显然,应该停止分发数据。参数d,一般就是done,用来表示是否已经完成(在onComplete和onError中会被设置为true),完成就表示所有数据都已经offer到了队列中。如果没完成,那么肯定不应该停止,直接返回false。

    这里可以看到delayError的作用了,如果设置了它为true,那么只要发生错误,我不管我的数据是否为空的标记(一般队列里没有数据时为空),直接调用源Observer观察者的onError方法,并且停止后续分发。否则,只要还有数据,就会暂时忽略错误。

    如果已经完成,并且没有错误,并且队列中也没有更多待处理的数据了,那么会调用onComplete方法并且停止。

void drainNormal() {            int missed = 1;            final SimpleQueue
q = queue; final Observer
a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }

    对到drainNormal方法,此处的无线循环需要结合missed和上面schedule中的addAndIncrement结合来看,我们将该问题记为问题A

    先来看循环内部的,如果已经done完成,并且队列已经为空了,那么停止(注意onError和onComplete方法都是在这个停止操作中调用的)。

    里层循环意义在于,循环从对立中获取数据,如果需要停止,那么直接返回,如果数据为空,跳出循环,如果不为空,调用源observer的onNext方法,然后继续从队列中取数据,重复操作。直到需要停止或者数据为空。

    再来看问题A。

    每次调用onNext产生数据或者onError或者onComplete之后都会调用schedule方法,给当前值++,但是只有为0的时候才会去调度运行run。这就保证只有一个run在被运行!

    而drainNormal方法中,如果判断需要停止,就会跳过missed操作,也就是get()的值不会再为0,于是run方法再也不会被调度。

    如果不是需要停止的情况,推出内层循环的情况只有一个,就是生产者还在继续发送数据(onError和onComplete都还没运行,所以done为false),并且队列中没有数据(所有数据都已经取出,所以empty为true)。那么就会运行missed块内容。

    missed块的内容极为精巧,第一次,missed为1,所以相当于在当前值上-1,如果 -1 之后为0,那么退出run(因为是0所以schedule有机会再次调度run方法)。如果在处理run方法的数据时,onNext方法被多次运行,假设onNext总共运行了10次,于是get为10,第一次的时候调度了run。run方法在第一次循环中处理了所有数据,跳出循环并且当前值-1,但是依然为9,不为0,继续循环,假设没有新数据处理,直接跳出内存循环,这个时候就是-9,missed为0,跳出循环。

    这只是一个并不好的说明,但是还是能了解missed块的原理的。讲到这里,下面反思非常重要:

    我们看到在ObserveOnObserve或者SubscribeOnObserve等多个Observe子类,甚至很多其他类都会继承与AtomicInteger或者AtomicReference等这些原子操作类,主要作用就是改类内方法会在不同的线程被调用,使用原子操作能够有效避免脏数据产生的影响,以这样的方式来替代烦人的锁,不得不说这是一个极其值得借鉴的操作!

    总结流程1

    以上就是流程1的代码分析。脱离代码主要分析下思路,通过重新包装observer观察者,当调用观察者的onNext等方法时,将数据存入队列,并且启动线程调度机制,在指定线程中读取改队列数据,并且调用源Observer观察者的onNext方法。最终实现线程的切换!

流程2

    流程2的代码更简单些

Observable.just("1","2","3").observeOn(Schedulers.newThread()).                subscribe(...);

    和流程1的区别在哪里?关键还是在ObserveOnObserver的OnSubscribe方法中,前面的调用都相同

@Override        public void onSubscribe(Disposable s) {            if (DisposableHelper.validate(this.s, s)) {                this.s = s;                if (s instanceof QueueDisposable) {                    @SuppressWarnings("unchecked")                    QueueDisposable
qd = (QueueDisposable
) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true; actual.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; actual.onSubscribe(this); return; } } queue = new SpscLinkedArrayQueue
(bufferSize); actual.onSubscribe(this); } }

    但是这里s instanceof QueueDisposable 判断成立,所以会调用s的requestFusion方法。(这是我们首次调用这个不明所以的方法,在此处大概可以窥探设计意图了。)

    先来看简单的,如果返回的是ASYNC,和流程1的却别就非常小了,只是设置了sourceMode为ASYNC,并且queue直接赋值为传入进来的QueueDisposable了。

    sourceMode唯一调用在onNext中,如果是ASYNC就不会将数据放入queue中。

    同样如果返回SYNC,在ObserveOnObserver中我们也将不在自己管理queue而是直接调用。

    那么ASYNC和SYNC到底是什么区别呢?这个可能就是requestFusion在这里意义了(因为看的代码不够多,不能保证这是requestFusion的唯一用途。)

    如果返回SYNC那么它的QueueDisposable的数据产生是在poll被调用的同一个线程中。

    如果返回ASYNC,那么QueueDisposable的数据产生和poll应该是生产者和消费者模式。

    不一定准确,但是现在看来确实如此。

总结

    那么到此observeOn也已经介绍结束了。我们知道,它通过schedule来调度onNext系列方法的运行线程。所以observeOn有如下特点:

    observeOn只会改变最终onNext方法的运行线程。

    多个observeOn已最后一个所指定的线程为准。    

 

转载于:https://my.oschina.net/zzxzzg/blog/875035

你可能感兴趣的文章
3.07-JS合并两个JSON对象
查看>>
VUE2.0 实现移动端在固定区域内的滚动效果
查看>>
angularjs入门(一)
查看>>
环境变量PATH、cp命令、mv命令、cat命令、tac命令、more、less、head、tail
查看>>
2.10 环境变量PATH 2.11 cp命令 2.12 mv命令 2.13 文档查看cat/more/less/head/tail
查看>>
Linux快速入门(二)
查看>>
android -- 截屏-view.getDrawingCache()
查看>>
Mac OS X 10.11 系统环境变量配置
查看>>
Linux下设置开机自启动Tomcat
查看>>
Python时间序列分析--从线性模型到GARCH模型
查看>>
开发人员学Linux(12):CentOS7安装配置Memcached和Redis
查看>>
读取access日志文件并解析url encode内容
查看>>
再看GOPATH
查看>>
Android实用笔记——使用WebView在界面中显示网页
查看>>
Maven 更换国内镜像 飞一般的感觉
查看>>
netty handler的执行顺序(2)
查看>>
spring cloud 微服务实战开篇
查看>>
WebMagic的设计思想
查看>>
该死的Word——修复Doc文档的灵异错误
查看>>
写给 Node.js 学徒的 7 个建议
查看>>