rxjava线程切换原理(代码片段)

android小菜比 android小菜比     2022-12-06     487

关键词:

推荐

推荐几篇在学习Rxjava中的阅读的文章。尤其是大神W_BinaryTree的文章,给学习过程中带来了不少启发。

  1. 什么是函数响应式编程(Java&Android版本)
    函数响应式编程介绍
  2. Rxjava2.0 较全的Api介绍和使用,可以当作开发手册
    Rxjava2.0 Api介绍和使用
  3. Rxjava2.0 和1.0的主要区别
    Rxjava2.0 和1.0的主要区别
  4. Rxjava github官方地址
    Github地址
  5. RxJava背压策略的原理
    背压策略
    背压的理解
  6. Rxjava实战系列
    Android RxJava 实际应用讲解:(无条件)网络请求轮询
    Android RxJava 实际应用讲解:(有条件)网络请求轮询
    Android RxJava 实际应用讲解:网络请求嵌套回调
    Android RxJava 实际应用讲解:合并数据源
    Android RxJava 实际应用讲解:从磁盘 / 内存缓存中 获取缓存数据
    Android RxJava 实际应用讲解:联合判断
    Android RxJava:细说 线程控制(切换 / 调度 )(含Retrofit实例讲解)
    Android RxJava 实际应用讲解:网络请求出错重连(结合Retrofit)
  7. Rxjava 深度教程,对Rxjava讲解的比较透彻
    放弃RxBus,拥抱RxJava
    Rxjava的推理过程
    大神关于实际运用过程中的一些看法
  8. Rxjava中设计到的Monad概念理解
    函数式编程有一个重要概念,叫做Monad
    理解 Monad,一份 monad 的解惑指南
    什么是 Monad (Functional Programming)?
    Functors, Applicatives, And Monads In Pictures

基础概念

观察者模式

  1. 观察者概念

观察者模式(Observer Mode)是定义对象间的一对多的依赖关系,当被观察者的状态发生改变时,所有依赖于它的对象都得到通知并自动刷新。
在观察者模式中有以下四个主要角色:
抽象主题[抽象被观察者](Subject):定义添加和删除观察者的方法,内部通过集合维护观察者序列。
具体主题[具体被观察者](Concrete Subject):抽象主题的实现对象,在具体主题内部状态发生变化时,通知所有的观察者更新状态。
抽象观察者(Observer):定义观察者的统一接口和方法。
具体观察者(Concrete Observer):抽象观察者的具体实现类,实现抽象观察者定义的统一接口,以便使本身的状态与主题状态协调。
经典的观察者模式UML类图:

2. 观察者模式在Rxjava中的运用

为了方便分析问题,下面给出Rxjava实现的最简单的被观察者(主题)发送数据观察者打印数据的代码。从代码中分析Rxjava中是如何定义并且实现观察者模式中不同的角色。为了方便说明问题,把Rxjava中的链式(Chain)拆分成最基本的3段。
(1)Observable对象创建,抽象类Observable是接口ObservableSource下的一个抽象实现,通过Observable创建一个可观察对象发射事件流。
(2)Observer对象创建,创建一个观察者Observer来接受并响应可观察对象发射的事件。
(3)Observer订阅Observable,通过subscribe方法,使Observer与Observable建立订阅关系,Observer与Observable便成为了一个整体,Observer便可对Observable中的行为作出响应。
PS: 虽然从代码上看上去像是Observable订阅了Observer,但是其实还是观察者订阅了被观察者,Rxjava这么设计是为了保持链式调用(Chain)。
这里问了说明问题,没有采用极简的代码实现。
java实现:

 private void rxjavaDemo() 
        Observable observable = Observable.create(new ObservableOnSubscribe() 
            @Override
            public void subscribe(ObservableEmitter e) throws Exception 
                e.onNext("R");
                e.onNext("X");
                e.onComplete();
            
        );

        Observer observer = new Observer() 
            @Override
            public void onSubscribe(Disposable d) 
            

            @Override
            public void onNext(Object s) 
                Log.e(RxjavaDemoActivity.class.getSimpleName(), "object : " + s);
            

            @Override
            public void onError(Throwable e) 
            

            @Override
            public void onComplete() 
            
        ;

        observable.subscribe(observer);
    

Kotlin实现:

  private fun rxjavaDemo() 
        val mObservable = Observable.create<String>
            it.onNext("R")
            it.onNext("X")
            it.onComplete()
        

        val mObserver = object : Observer<String>
            override fun onComplete() 

            

            override fun onSubscribe(d: Disposable?) 

            

            override fun onNext(value: String?) 
                Log.e(RxjavaDemoActivity::class.java.simpleName, "object : $value")
            

            override fun onError(e: Throwable?) 

            
        

        mObservable.subscribe(mObserver)
    
  1. 被观察者(Observable)

在上面代码中我们调用了Observable的create方法来创建被观察者。
(1)在Observable类内部提供了众多的静态方法来创建被观察者。诸如:create、just、interval、from、zip、contact、merge等方法。
(2)requireNonNull方法,是Rxjava的判空实现,防止出现空指针异常。
(3)在create方法中会创建ObservableCreate的被观察者。

 @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) 
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    

被观察者ObservableCreate类继承自抽象类Observable,内部实现了父类的subscribeActual方法。

public final class ObservableCreate<T> extends Observable<T>
..........
  @Override
    protected void subscribeActual(Observer<? super T> observer) 
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try 
            source.subscribe(parent);
         catch (Throwable ex) 
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        
    
   .................

被观察者的抽象类Observable,Observable又是接口ObservableSource下的一个抽象实现。
(1)内部实现了ObservableSource接口定义的subscribe方法。subscribe方法内部主要是调用了subscribeActual方法,所有断定订阅关系是在subscribeActual方法内部实现的。
(2)所以要实现订阅关系,观察者真正需要复写的是subscribeActual方法。比如ObservableCreate类就复写了该方法。

public abstract class Observable<T> implements ObservableSource<T> 
    ..........
     @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) 
        ObjectHelper.requireNonNull(observer, "observer is null");
        try 
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
         catch (NullPointerException e)  // NOPMD
            throw e;
         catch (Throwable e) 
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        
    
..............

Observable实现了ObservableSource接口,在ObservableSource内部定义了subscribe方法用来实现订阅观察者(Observer)。

public interface ObservableSource<T> 

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if @code observer is null
     */
    void subscribe(Observer<? super T> observer);

总结:
(1)ObservableSource就是扮演着抽象被观察者的角色。
(2)在ObservableSource 接口中定义了subscribe方法用来用来实现订阅观察者(Observer)。
(3)Observable类实现了ObservableSource接口并且实现了其subscribe方法,但是它并没有真正的去完成主题和观察者之间的订阅关系,而是内部调用了另一个抽象方法subscribeActual。
(4)在Observable内部提供了一系列创建型操作符, 用来创建不同场景的Observable。

经过上面的介绍,我们已经明白了在Rxjava中被观察者(Observable)是如何创建的,以及是谁扮演者抽象观察者的角色。但是我们并没有在ObservableCreate类中发现具体发送事件的实现。那么这里就有一个问题:

问题: ObservableCreate等内部是如何发送事件到观察者(Observer)的?

  1. 观察者(Observer)

通过上面的代码和分析,我们知道Observer扮演着抽象观察者的角色。下面分别解释一下Observer类内部定义的四个主要的方法:
(1)onSubscribe(Disposable d)里面的Disposable对象,Disposable翻译过来是可随意使用的。相当于观察者和被观察者之间的订阅关系,如果观察者不想订阅被观察者了,可以调用 mDisposable.dispose()取消订阅关系。
(2)onCompleted(): 事件队列完成。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
(3)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
(4)onNext():接收数据。
(5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。而且onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

public interface Observer<T> 

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within @link #onNext(Object)) and asynchronous manner.
     * @param d the Disposable instance whose @link Disposable#dispose() can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The @link Observable may call this method 0 or more times.
     * <p>
     * The @code Observable will not call this method again after it calls either @link #onComplete or
     * @link #onError.
     *
     * @param value
     *          the item emitted by the Observable
     */
    void onNext(T value);

    /**
     * Notifies the Observer that the @link Observable has experienced an error condition.
     * <p>
     * If the @link Observable calls this method, it will not thereafter call @link #onNext or
     * @link #onComplete.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(Throwable e);

    /**
     * Notifies the Observer that the @link Observable has finished sending push-based notifications.
     * <p>
     * The @link Observable will not call this method if it calls @link #onError.
     */
    void onComplete();


既然Observer是个接口,那么就应该是个抽象观察者,具体的观察者是我们在实际运用的时候直接new一个实例对象。

经过上面的介绍,我们已经明白了在Rxjava中观察者是如何创建的以及各个方法的作用。那么观察者是如何接受被观察者发送的事件的呢?

问题: Observer是如何接受数据到被观察者发送的数据?

订阅

前提:观察者订阅被观察者后,被观察者才会开始发送事件。
PS:但是并不是所有的被观察者都需要被订阅才会发送数据,比如Observable.just的方法返回的ObservableJust被观察者者。
示例:执行下面的代码会输出JustObservable,但是将just换成create方法就不会输出,所有得出结论:并不是所有的被观察者都需要被订阅才会发送数据

Observable.just(new JustObservable());

class JustObservable
    JustObservable()
        Log.e("rx","JustObservable");
    

下面我们来分析上面遗留的两个问题:即观察者和被观察者是如何发送和接受事件的

Observable.create生成的被观察者需要被订阅后才会发送数据到观察者。
根据上面的分析我们知道:这里的observable对象是ObservableCreate类的实例。

 observable.subscribe(observer);

这里的subscribe是父类Observable的方法,在里面又会调用subscribeActual方法,Observable的子类ObservableCreate会复写subscribeActual方法。

public final void subscribe(Observer<? super T> observer) 
        ObjectHelper.requireNonNull(observer, "observer is null");
        try 
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
         catch (NullPointerException e)  // NOPMD
            throw e;
         catch (Throwable e) 
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        
    

下面来分析一下ObservableCreate类。
(1)在ObservableCreate的构造方法中有个ObservableOnSubscribe类型的形参。
(2)并且正如我们上面所说内部实现了subscribeActual方法。
(3)所以真正处理被观察者和观察者之间实现订阅的逻辑在Observable的subscribeActual方法中。

public final class ObservableCreate<T> extends Observable<T> 
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) 
        this.source = source;
    

    @Override
    protected void subscribeActual(Observer<? super T> observer) 
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try 
            source.subscribe(parent);
         catch (Throwable ex) 
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        
    
   .............

那么在ObservableCreate的构造方法的形参的赋值肯定是在ObservableCreate对象初始化的时候,然而ObservableCreate的初始化,是通过Observable的create方法,下面我们回到Observable的create的方法。

  public static <T> Observable<T> create(ObservableOnSubscribe<T> source) 
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    

在调用create方法的时候需要传递ObservableOnSubscribe的对象作为参数,而这个对象最终会传入到ObservableCreate的构造方法中。
下面来看一下ObservableOnSubscribe的作用:
(1)内部声明了一个subscribe方法,subscribe 接收到一个ObservableEmitter对象。

public interface ObservableOnSubscribe<T> 

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;

下面再来看一下ObservableEmitter类的作用:
(1)ObservableEmitter以一种可以安全取消的形式发送事件到观察者,通过调用setDisposable方法。
(2)继承了Emitter接口。

public interface ObservableEmitter<T> extends Emitter<T> 

    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param d the disposable, null is allowed
     */
    void setDisposable(Disposable d);

    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(Cancellable c);

    /**
     * Returns true if the downstream disposed the sequence.
     * @return true if the downstream disposed the sequence
     */
    boolean isDisposed();

    /**
     * Ensures that calls to onNext, onError and onComplete are properly serialized.
     * @return the serialized ObservableEmitter
     */
    ObservableEmitter<T> serialize();


下面我们来看一下Emitter类的作用:
(1)Emitter翻译过来就是发射器的意识,到这里我们可以想到,该类内部应该定义了一些跟事件发送相关的方法。
(2)并且在实例化ObservableOnSubscribe的时候,我们正好是调用了ObservableEmitter的onNext向观察者发送数据的。

Observable observable = Observable.create(new ObservableOnSubscribe() 
            @Override
            public void subscribe(ObservableEmitter e) throws Exception 
                e.onNext("R");
                e.onNext("X");
                e.onComplete();
            
        );
public interface Emitter<T> 

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();

由此我们可以知道Emitter内部声明了三种事件类型,而ObservableEmitter 扩展了Emiiter的功能,添加了Disposable相关的方法,可以用来安全取消事件的发送即取消观察者和被观察者之间的订阅关系。
由上诉分析我们已经知道了ObservableCreate的ObservableOnSubscribe变量的来历和基本作用以及被观察者的创建过程。
下面继续回到ObservableCreate类的subscribeActual方法来看看事件是如何从被观察者发送到观察者的。
(1)在subscribeActual方法内部创建了CreateEmitter类对象,并且接受Observer作为参数,CreateEmitter实现了ObservableEmitter接口。所以该类是负责事件发送,到这里我们已经明确了事件的发送类即ObservableEmitter。
(2)在subscribeActual方法内部,调用了ObservableOnSubscribe的subscribe方法并且传递ObservableEmitter对象实例作为参数,然后就可以调用ObservableEmitter的方法发送事件了。
(3)在subscribeActual方法内部,调用了Observer的onSubscribe方法并且传递CreateEmitter作为参数,这样观察者就持有了发送事件(被观察者)的直接引用,方便观察者取消订阅关系。
到这里我们已经确定到了:观察者是如何和被观察者订阅的。以及事件是如何发送到被观察者的。并且确认了只有发生了订阅关系,事件才可以发送。

下面在来看一下CreateEmitter类的实现逻辑。

(1)CreateEmitter的构造函数,传递一个Observer的对象作为形参。这样就可以将事件发送到对应的观察者了。
(2)CreateEmitter实现了ObservableEmitter接口,作为事件发送器。
(3)onNext事件中,不可以发送参数为null的类型,在事件序列没有中断的情况下把事件从被观察者传递给观察者。
(4)onComplete事件,用于通知观察者事件队列已经没有事件发送了。
(5)onError 事件,事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
(6)setDisposable、setCancellable方法,观察者根据获取到的Emitter的实例对象,可以取消被观察者和观察者之间的订阅关系。

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable 


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) 
            this.observer = observer;
        

        @Override
        public void onNext(T t) 
            if (t == null) 
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            
            if (!isDisposed()) 
                observer.onNext(t);
            
        

        @Override
        public void onError(Throwable t) 
            if (t == null) 
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            
            if (!isDisposed()) 
                try 
                    observer.onError(t);
                 finally 
                    dispose();
                
             else 
                RxJavaPlugins.onError(t);
            
        

        @Override
        public void onComplete() 
            if (!isDisposed()) 
                try 
                    observer.onComplete();
                 finally 
                    dispose();
                
            
        

        @Override
        public void setDisposable(Disposable d) 
            DisposableHelper.set(this, d);
        

        @Override
        public void setCancellable(Cancellable c) 
            setDisposable(new CancellableDisposable(c));
        

        @Override
        public ObservableEmitter<T> serialize() 
            return new SerializedEmitter<T>(this);
        

        @Override
        public void dispose() 
            DisposableHelper.dispose(this);
        

        @Override
        public boolean isDisposed() 
            return DisposableHelper.isDisposed(get());
        
    

小结:
(1)在RxJava中Observer通过onSubscribe方法获取了发送事件中的Disposable对象,这样他就可以控制观察者和被观察者之间的订阅关系。
(2)被观察者并没有直接控制事件的发送,而是将事件的发送给Disposable对象的发送。
(3)订阅关系并没有发生在subscribe方法中,而是在subscribeActual方法中实现了订阅关系。

简单的线程切换

下面的这段代码实现了最简单的Rxjava线程切换。发送事件就可以在非UI线程(RxNewThreadScheduler 的线程执行,将耗时操作放在子线程中,避免阻塞UI线程。接受事件又切换回了Android UI线程,Android禁止在非UI线程操作UI。这样就简单的实现了在自线程处理耗时操作然后在UI线程刷新UI的逻辑。

为了说明问题,把代码拆分成五段,可以看出,其实每次的链式调用都会生成不同的Observable对象,所以我们在平时开发的时候,应该尽可能避免长链式的调用,规避掉不需要的中间操作。

private void rxjavaDemo() 
        Observable observable = Observable.create(new ObservableOnSubscribe() 
            @Override
            public void subscribe(ObservableEmitter e) throws Exception 
                Log.e("rxjava","ObservableEmitter current thread :"+ Thread.currentThread().getName());
                e.onNext("R");
                e.onNext("X");
                e.onComplete();
            
        );

        Observable observableSubscribeOn = observable.subscribeOn(Schedulers.newThread());

        Observer observer = new Observer() 
            @Override
            public void onSubscribe(Disposable d) 
                Log.e("rxjava","onSubscribe current thread :"+ Thread.currentThread().getName());
            

            @Override
            public void onNext(Object s) 
                Log.e("rxjava","onNext current thread :"+ Thread.currentThread().getName());
                Log.e(RxjavaDemoActivity.class.getSimpleName(), "object : " + s);
            

            @Override
            public void onError(Throwable e) 
            

            @Override
            public void onComplete() 
            
        ;

        Observable observableObserveOn = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());
        
        observableObserveOn.subscribe(observer);
    

subscribeOn

下面我们将结合上面的分析和代码,分析一下在observable.subscribeOn内部都做了那些操作。
(1)实例化了ObservableSubscribeOn对象。并且传入的两个参数分别是ObservableCreate对象和Scheduler对象。
(2)ObservableCreate对象是我们上面分析的Observable.create生成的一个被观察者。
(3)Scheduler对象,现在猜测应该是和线程调度有关的类。接下来会分析到。

  public final Observable<T> subscribeOn(Scheduler scheduler) 
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    

下面在来分析ObservableSubscribeOn类。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> 
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) 
        super(source);
        this.scheduler = scheduler;
    

    @Override
    public void subscribeActual(final Observer<? super T> s) 
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() 
            @Override
            public void run() 
                source.subscribe(parent);
            
        ));
    
    ..........

(1)继承了AbstractObservableWithUpstream类。AbstractObservableWithUpstream类内部存储了ObservableSource类对象,根据上下文,这里的source就是ObservableCreate类对象。
(2)实现了subscribeActual方法,并且在在subscribeActual方法内部创建了SubscribeOnObserver对象,这里的SubscribeOnObserver其实也是个观察者,可以理解成事件经过SubscribeOnObserver观察者中转了才最终到达我们创建的观察者。SubscribeOnObserver 是AtomicReference的子类(保证原子性),实现了 Observer接口 和 Disposable 接口。
(3)内部调用onSubscribe方法将SubscribeOnObserver传递给观察者,这样观察者就可以控制事件的接受了,即获取了事件发送(被观察者发送数据)的控制权。
(3)ObservableSubscribeOn和ObservableCreate一样,也是Observable的一个子类。并且在ObservableSubscribeOn内部持有它上一步的被观察者Observable的引用(这里就是ObservableCreate)。
(4) source.subscribe(parent)实现了观察者和被观察者之间的订阅关系,并将通过SubscribeOnObserver类对象传递给ObservableOnSubscribe的subscribe方法。SubscribeOnObserver类对象就可以实现事件的发送了。
下面在来看一下SubscribeOnObserver类:作用和代码基本同于CreateEmitter类,看上面的CreateEmitter类分析即可。

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable 

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) 
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        

        @Override
        public void onSubscribe(Disposable s) 
            DisposableHelper.setOnce(this.s, s);
        

        @Override
        public void onNext(T t) 
            actual.onNext(t);
        

        @Override
        public void onError(Throwable t) 
            actual.onError(t);
        

        @Override
        public void onComplete() 
            actual.onComplete();
        

        @Override
        public void dispose() 
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        

        @Override
        public boolean isDisposed() 
            return DisposableHelper.isDisposed(get());
        

        void setDisposable(Disposable d) 
            DisposableHelper.setOnce(this, d);
        
    

经过上面的分析,我们明确了observable.subscribeOn(Schedulers.newThread())创建的被观察者(Observable)和观察者(Observer)之间的订阅关系。下面来分析一下subscribeOn是如何实现线程切换的。首先我们思考一下,如果要实现线程切换,肯定要创建子线程。
问题: 子线程是如何创建
在调用subscribeOn的时候,传入了Scheduler参数,Scheduler翻译过来就是调度者的意识。通过调用Schedulers的newThread方法,创建子线程(RxNewThreadScheduler)。下面我们以RxNewThreadScheduler线程为例。看看线程是如何被创建的。

Observable observableSubscribeOn = observable.subscribeOn(Schedulers.newThread());

在Schedulers类内部定义了newThread静态方法用于生成Scheduler对象。
(1)其中NEW_THREAD为默认的生成的一个Scheduler对象。


static final Scheduler NEW_THREAD;

  public static Scheduler newThread() 
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    
static 
       ..........
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() 
            @Override
            public Scheduler call() throws Exception 
                return NewThreadHolder.DEFAULT;
            
        );
    

接下来我们就来看看,initNewThreadScheduler() 是如何生成一个Scheduler实例的。
(1)在initNewThreadScheduler方法中经过一系列的条件判断,最终会执行到call方法(延迟初始化)。
(2)NewThreadHolder.DEFAULT会返回一个NewThreadScheduler对象(单例模式)

     static final Scheduler DEFAULT = NewThreadScheduler.instance();
    

下面再来看看单例模式(饿汉式)的NewThreadScheduler类,看名字就可以猜测到是线程调度者。

(1)NewThreadScheduler 继承自Scheduler抽象类。
(2)通过静态代码块中创建了RxThreadFactory线程工厂对象,该类实现了ThreadFactory接口,并且在RxThreadFactory类的newThread方法中创建了优先级为5的线程Thread。
(3)在NewThreadScheduler的createWorker()方法中,创建了NewThreadWorker 对象。

public final class NewThreadScheduler extends Scheduler 

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static 
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    

    public static NewThreadScheduler instance() 
        return INSTANCE;
    

    private NewThreadScheduler() 

    

    @Override
    public Worker createWorker() 
        return new NewThreadWorker(THREAD_FACTORY);
    

接下来我们就来看看NewThreadWorker 都做了写什么。
(1)在NewThreadWorker的构造函数中,通过调用SchedulerPoolFactory.create的方法并且传入NewThreadScheduler中提供的线程工厂RxThreadFactory创建了一个ScheduledExecutorService对象。

public class NewThreadWorker extends Scheduler.Worker implements Disposable 
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) 
        executor = SchedulerPoolFactory.create(threadFactory);
    
.........

在来看一下SchedulerPoolFactory类
(1)通过create方法创建了核心线程数量为1的线程池。

 /**
     * Creates a ScheduledExecutorService with the given factory.
     * @param factory the thread factory
     * @return the ScheduledExecutorService
     */
    public static ScheduledExecutorService create(ThreadFactory factory) 
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (exec instanceof ScheduledThreadPoolExecutor) 
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        
        return exec;
    

分析到这里我们明确了,通过Schedulers.newThread()会创建一个核心线程数量为1的线程池。
创建完线程,下面就是启动和运行线程了,并且将事件的发送,放在子线程中进行处理。并且我们都知道调用多次 subscribeOn 指定子线程只有第一次会生效 ,下面我们将带着这两个疑问,来分析一下。
在ObservableSubscribeOn的subscribeActual方法中,通过source.subscribe(parent)调用实现了观察者和被观察者之间的订阅关系。我们可以看到该方法的执行是放在了Runnable里面执行的。所以线程的切换,应该就发生子此处。
(1)经过上面的分析,这里的scheduler对象是NewThreadScheduler类。并且调用了Schedule的scheduleDirect方法。

  parent.setDisposable(scheduler.scheduleDirect(new Runnable() 
            @Override
            public void run() 
                source.subscribe(parent);
            
        ));

下面来看看Scheduler类的scheduleDirect方法。
(1)内部调用了重载的scheduleDirect方法。
(2)createWorker返回的是NewThreadWorker类对象。并且调用了NewThreadWorker类的schedule方法。

 public Disposable scheduleDirect(Runnable run) 
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    
   public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) 
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() 
            @Override
            public void run() 
                try 
                    decoratedRun.run();
                 finally 
                    w.dispose();
                
            
        , delay, unit);

        return w;
    

下面来看看NewThreadWorker类的schedule方法。
(1)在schedulerActual方法中,通过ScheduledExecutorService执行submit或schedule执行一个Runnable任务,即开启了线程池里面的线程任务。

 @Override
    public Disposable schedule(final Runnable run) 
        return schedule(run, 0, null);
    

    @Override
    public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) 
        if (disposed) 
            return EmptyDisposable.INSTANCE;
        
        return scheduleActual(action, delayTime, unit, null);
    

 public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) 
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) 
            if (!parent.add(sr)) 
                return sr;
            
        

        Future<?> f;
        try 
            if (delayTime <= 0) 
                f = executor.submit((Callable<Object>)sr);
             else 
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            
            sr.setFuture(f);
         catch (RejectedExecutionException ex) 
            parent.remove(sr);
            RxJavaPlugins.onError(ex);
        

        return sr;
    

分析到这里我们知道了线程的开启是在NewThreadWorker类中进行的。

那么还有个疑问:调用多次 subscribeOn 指定子线程只有第一次会生效
(1)这里的生效并不是指其他的subscribeOn方法创建的线程没有生效,而是会被第一次的subscribeOn创建的线程“掩盖掉”
(2)多次调用subscribeOn,会生成若干个Observable对象,每个新生成的对象都有切换线程的能力,但是只有第一次的subscribeOn才生效,因为后续的线程切换被第一个“掩盖掉”了。
这么说可能有点抽象,下面以一张图来说明:
(1)每次调用subscribeOn方法,都会生成一个Observable,并且回持有上游的Observable对象。
(2)事件的发送是在第一次的subscribeOn创建的子线程中发送的,中间不会切换线程。

observeOn

通过上面subscribeOn的流程梳理,我们知道了上游事件是被如何切换都子线程的。下面我们将分析事件是如何被切换到下游线程的,大部分情况下就是我们的Android UI线程。
下面我们将结合上面的分析和代码,分析一下在observable.observeOn内部都做了那些操作。

observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())

下面来看一下Observable的observeOn方法:
(1) observeOn 方法返回了一个 ObservableObserveOn类对象。

  public final Observable<T> observeOn(Scheduler scheduler) 
        return observeOn(scheduler, false, bufferSize());
    

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) 
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    

接下来看看 ObservableObserveOn类。

(1)该类ObservableSubscribeOn基本一致,继承了 AbstractObservableWithUpstream ,拥有ObservableSource类型对象,这里是ObservableSubscribeOn实例对象。
(2)在subscribeActual 方法内部,scheduler是HandlerScheduler类型对象,这里的scheduler就是展开分析了,基本上就是利用Android的Handler机制实现线程切换的。
(3)通过 scheduler.createWorker() 创建了 HandlerWorker的Worker对象。
(4)创建了一个ObserveOnObserver对象,该 类实现了Observer 接口,所有它是个Observer,同时实现了一个Runnable接口,这样通过Handler就可以执行到ObserveOnObserver的run方法。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> 
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) 
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    

    @Override
    protected void subscribeActual(Observer<? super T> observer) 
        if (scheduler instanceof TrampolineScheduler) 
            source.subscribe(observer);
         else 
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        
    
    ...........


下面就来看看这个 ObserveOnObserver,通过上面我们知道,线程切换是通过Handler实现的。
(1)actual 参数是我们创建的Observer对象。
(2)Worker参数是HandlerWorker对象。通过AndroidSchedulers.mainThread()的调用是创建的。

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable 

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) 
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        
        

下面来看一下run方法里面的操作:
outputFused参数默认是false,所以接下来看看drainNormal方法。

 @Override
        public void run() 
            if (outputFused) 
                drainFused();
             else 
                drainNormal();
            
        

(1)queue参数是在onSubscribe方法里面创建的。而onSubscribe方法的调用,则是在上游的subscribeActual方法中调用的。
(2)内部通过轮训队列里面的事件,将事件最终发送到Observer。

 void drainNormal() 
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) 
                if (checkTerminated(done, q.isEmpty(), a)) 
                    return;
                

                for (;;) 
                    boolean d = done;
                    T v;

                    try 
                        v = q.poll();
                     catch (Throwable ex) 
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        return;
                    
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) 
                        return;
                    

                    if (empty) 
                        break;
                    

                    a.onNext(v);
                

                missed = addAndGet(-missed);
                if (missed == 0) 
                    break;
                
            
        

下面以一张图,总结一下:observeOn和subscribeOn的流程。

总结

(1)subscribeOn 控制上游线程切换,subscribeOn多次调用只有第一次的subscribeOn会起作用。
(2)observeOn控制下游线程切换。observeOn可以使用多次。并且observeOn 后面的所有操作都会在observeOn指定的线程中执行。
(3)subscribeOn和observeOn之间的操作,会在subscribeOn 指定的线程中执行,直到执行了observeOn操作。

一张图搞定-rxjava2的线程切换原理和内存泄露问题(代码片段)

一张图搞定-RxJava2的线程切换原理和内存泄露问题分析首先祭出我自己画的一张图这张图显示的是RxJava2源码层面上的调用关系下面通过一个例子来解释一下这张图publicclassMainActivityextendsActivityprivatestaticfinalStringTAG="MainActivity... 查看详情

一张图搞定-rxjava2的线程切换原理和内存泄露问题(代码片段)

一张图搞定-RxJava2的线程切换原理和内存泄露问题分析首先祭出我自己画的一张图这张图显示的是RxJava2源码层面上的调用关系下面通过一个例子来解释一下这张图publicclassMainActivityextendsActivityprivatestaticfinalStringTAG="MainActivity... 查看详情

rxjava2探索-线程切换原理之observeon(代码片段)

前言在前一篇文章《RxJava2探索-线程切换原理之subscribeOn》里我说了我会把RxJava线程切换相关的知识分成两部分来讲,如果大家没看过上一部分,我建议大家戳进去看一下,因为我这篇文章会基于里面的一些结论和原... 查看详情

rxjava2线程切换原理分析

一、概述  本节将分析RxJava2的线程切换模型。通过对线程切换源代码的分析到达对RxJava2线程切换彻底理解的目的。通过对本节的学习你会发现,RxJava2线程切换是如此的简单,仅仅是通过两个操作符就能完成从子线程到主线程... 查看详情

rxjava源码解析-线程切换源码(代码片段)

Rxjava源码解析系列:Rxjava源码解析(一)-subscribe源码Rxjava源码解析(二)-线程切换源码Rxjava源码解析(三)-Schedulers默认线程池概述前文已经讲了rxjava简单subscribe的源码,有兴趣的读者可以看下。Rxjava源码解析(一)-subscribe源码本... 查看详情

rxjava源码解析-线程切换源码(代码片段)

概述前文已经讲了rxjava简单subscribe的源码,有兴趣的读者可以看下。Rxjava源码解析(一)-subscribe源码本文将基于前文的分析,继续讲下rxjava中断线程切换。Demo主要逻辑如下:Observable调用create()创建ObservableCreateObservableCre... 查看详情

rxjava源码解析-线程切换源码(代码片段)

Rxjava源码解析系列:Rxjava源码解析(一)-subscribe源码Rxjava源码解析(二)-线程切换源码Rxjava源码解析(三)-Schedulers默认线程池概述前文已经讲了rxjava简单subscribe的源码,有兴趣的读者可以看下。Rxjava源码解析(一)-subscribe源码本... 查看详情

rxjava源码解析-schedulers默认线程池(代码片段)

Rxjava源码解析系列:Rxjava源码解析(一)-subscribe源码Rxjava源码解析(二)-线程切换源码Rxjava源码解析(三)-Schedulers默认线程池概述前文已经分析过rxjava中我们常见用法的一些源码,还没有了解的读者推荐看下:Rxjava源码解... 查看详情

rxjava编程思想3-(实现简易版rxjava,如何实现线程切换)(代码片段)

前言已经使用rxjava两个月了,觉得rxjava特别好用,爱不释手。本文目的是通过几百行的代码,帮助大家理解rxjava中的链式调用,操作符,线程切换是如何实现的。如果有写的不对的地方欢迎在评论区留言,... 查看详情

rxjava常用总结以及原理理解

1、RxJava如何实现线程切换?subscribeOn是通过新建Observable的方式,使用OnSubscribe类的方式去做到线程切换的。observeOn是通过operator操作符的形式去完成线程切换的,所以他的作用域和其他操作符一样,是调用observeOn... 查看详情

浅析rxjava1.x&2.x版本区别及原理:1.x版本scheduler调度线程变换及subscribeonobserveon方法源码解析(代码片段)

上两篇文章都是对比分析RxJava中较基本的订阅流程与操作,即Observable、Flowable等基本元素的源码,还有map、lift操作符的源码。在对Rxjava框架有了一个坚实的基础后,此篇文章将直袭Rxjava中最闪亮的Point,也是Android... 查看详情

rxjava源码解析-schedulers默认线程池(代码片段)

Rxjava源码解析系列:Rxjava源码解析(一)-subscribe源码Rxjava源码解析(二)-线程切换源码Rxjava源码解析(三)-Schedulers默认线程池概述前文已经分析过rxjava中我们常见用法的一些源码,还没有了解的读者推荐看下:Rxjava源码解... 查看详情

从源码的角度去探索rxjava笔记(代码片段)

RxJava是一个异步操作库,在线程切换上有很大的优势,以链式的Api方式来进行线程的切换,以及提供一系列操作符,使得代码变得简洁,维护起来也相对简单,可以避免回调地狱或迷之缩进.本篇记录RxJava... 查看详情

减少rxjava中多余的线程调度(代码片段)

作者:路人宇为什么要抑制线程调度对于一次可观察序列中的多次subscribeOn或者observeOn操作,哪怕指定在相同的Schedulers.io调度器上,观察者操作也会在不同的线程上执行,即发生从io线程到io线程的切换。这种线程... 查看详情

rxjava源码解析-subscribe源码(代码片段)

Rxjava源码解析系列:Rxjava源码解析(一)-subscribe源码Rxjava源码解析(二)-线程切换源码Rxjava源码解析(三)-Schedulers默认线程池概述rxjava的应用还是较为广泛的,在实际项目中经常与MVP一起使用,可以使代码的可读性更高。... 查看详情

rxjava是如何实现线程切换的(上)

前言通过前一篇的从观察者模式出发,聊聊RxJava,我们大致理解了RxJava的实现原理,在RxJava中可以非常方便的实现不同线程间的切换。subscribeOn用于指定上游线程,observeOn用于指定下游线程,多次用subscribeOn指定上... 查看详情

rxjava是如何实现线程切换的(上)

前言通过前一篇的从观察者模式出发,聊聊RxJava,我们大致理解了RxJava的实现原理,在RxJava中可以非常方便的实现不同线程间的切换。subscribeOn用于指定上游线程,observeOn用于指定下游线程,多次用subscribeOn指定上... 查看详情

浅析rxjava1.x&2.x版本使用区别及原理:observableflowable等基本元素源码解析(代码片段)

RxJava开源框架的风靡程度在Github上无需多言,它带来的响应式编程模式和线程随意切换、嵌套请求、背压等功能给了开发者耳目一新的体验,更是成为了大多数APP中常用的RxJava+Okhttp/Retrofit+MVP/MVVM/Clean黄金组合中的... 查看详情