关键词:
前言
如果你对RxJava1.x还不是了解,可以参考下面文章。
1. RxJava使用介绍 【视频教程】
2. RxJava操作符
? Creating Observables(Observable的创建操作符) 【视频教程】
? Transforming Observables(Observable的转换操作符) 【视频教程】
? Filtering Observables(Observable的过滤操作符) 【视频教程】
? Combining Observables(Observable的组合操作符) 【视频教程】
? Error Handling Operators(Observable的错误处理操作符) 【视频教程】
? Observable Utility Operators(Observable的辅助性操作符) 【视频教程】
? Conditional and Boolean Operators(Observable的条件和布尔操作符) 【视频教程】
? Mathematical and Aggregate Operators(Observable数学运算及聚合操作符) 【视频教程】
? 其他如observable.toList()、observable.connect()、observable.publish()等等; 【视频教程】
3. RxJava Observer与Subcriber的关系 【视频教程】
4. RxJava线程控制(Scheduler) 【视频教程】
5. RxJava 并发之数据流发射太快如何办(背压(Backpressure)) 【视频教程】
开始
Rxjava 已经于2016年11月12日正式发布了2.0.1版本。
RxJava 2.0 已经按照Reactive-Streams specification规范完全的重写了。RxJava2.0 已经独立于RxJava 1.x而存在。
RxJava2.0相比RxJava1.x,它的改动还是很大的,下面我将带大家了解这些改动。
RxJava2.0与1.x的区别
Maven地址
为了让 RxJava 1.x 和 RxJava 2.x 相互独立,我们把RxJava 2.x 被放在了maven io.reactivex.rxjava2:rxjava:2.x.y 下,类放在了 io.reactivex 包下用户从 1.x 切换到 2.x 时需要导入的相应的包,但注意不要把1.x和2.x混淆了。
接口变化
RxJava2.0 是遵循 Reactive Streams Specification 的规范完成的,新的特性依赖其提供的4个基础接口。分别是:
- Publisher
- Subscriber
- Subscription
- Processor
在后边的介绍中我们会涉及到。
Javadoc文档
官方2.0的 Java 文档 http://reactivex.io/RxJava/2.x/javadoc/
添加依赖
Android端使用RxJava需要依赖新的包名:
//RxJava的依赖包
compile ‘io.reactivex.rxjava2:rxjava:2.0.3‘
//RxAndroid的依赖包
compile ‘io.reactivex.rxjava2:rxandroid:2.0.1‘
- 1
- 2
- 3
- 4
Nulls
RxJava1.x中,支持 null 值,如下代码所示:
Observable.just(null);
Single.just(null);
- 1
- 2
RxJava 2.0不再支持 null 值,如果传入一个null会抛出 NullPointerException
Observable.fromCallable(() -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
Observable.just(1).map(v -> null)
.subscribe(System.out::println, Throwable::printStackTrace);
- 1
- 2
- 3
- 4
- 5
Observable and Flowable
在本节开始之前,我们先了解下RxJava背压(Backpressure)机制的问题。
什么是背压(Backpressure)
在RxJava中,可以通过对Observable连续调用多个Operator组成一个调用链,其中数据从上游向下游传递。当上游发送数据的速度大于下游处理数据的速度时,就需要进行Flow Control了。如果不进行Flow Control,就会抛出MissingBackpressureException异常。
这就像小学做的那道数学题:一个水池,有一个进水管和一个出水管。如果进水管水流更大,过一段时间水池就会满(溢出)。这就是没有Flow Control导致的结果。
再举个例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException。
如果你想了解更多关于背压的知识,请参考:
http://blog.csdn.net/jdsjlzx/article/details/52717636
http://www.jianshu.com/p/2c4799fa91a4
下面我们通过一段代码来“感受”一下背压。
Observable.interval(1, TimeUnit.MILLISECONDS)
//将观察者的工作放在新线程环境中
.observeOn(Schedulers.newThread())
//观察者处理每1000ms才处理一个事件
.subscribe(new Subscriber<Long>()
@Override
public void onCompleted()
System.out.println("onCompleted");
@Override
public void onError(Throwable e)
System.out.println("onError :"+ e);
@Override
public void onNext(Long value)
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("onNext value :"+ value);
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
Flow Control有哪些思路呢?大概是有四种:
- 背压(Backpressure);
- 节流(Throttling);
- 打包处理;
- 调用栈阻塞(Callstack blocking)。
这里限于篇幅的问题,我们就不再一一介绍了,请移步:https://gold.xitu.io/post/58535b5161ff4b0063aa6b10
如何让Observable支持Backpressure?
在RxJava 1.x中,有些Observable是支持Backpressure的,而有些不支持。但不支持Backpressure的Observable可以通过一些operator来转化成支持Backpressure的Observable。这些operator包括:
- onBackpressureBuffer
- onBackpressureDrop
- onBackpressureLatest
- onBackpressureBlock(已过期)
它们转化成的Observable分别具有不同的Backpressure策略。
而在RxJava2.0 中,Observable 不再支持背压,而是改用Flowable 支持非阻塞式的背压。Flowable是RxJava2.0中专门用于应对背压(Backpressure)问题而新增的(抽象)类。其中,Flowable默认队列大小为128。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。
上面提到的四种operator的前三种分别对应Flowable的三种Backpressure策略:
- BackpressureStrategy.BUFFER
- BackpressureStrategy.DROP
- BackpressureStrategy.LATEST
onBackpressureBuffer是不丢弃数据的处理方式。把上游收到的全部缓存下来,等下游来请求再发给下游。相当于一个水库。但上游太快,水库(buffer)就会溢出。
onBackpressureDrop和onBackpressureLatest比较类似,都会丢弃数据。这两种策略相当于一种令牌机制(或者配额机制),下游通过request请求产生令牌(配额)给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。但这两种策略在令牌数为0的时候有一点微妙的区别:onBackpressureDrop直接丢弃数据,不缓存任何数据;而onBackpressureLatest则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。可以结合下面两幅图来理解。
onBackpressureBlock是看下游有没有需求,有需求就发给下游,下游没有需求,不丢弃,但试图堵住上游的入口(能不能真堵得住还得看上游的情况了),自己并不缓存。这种策略已经废弃不用。
注意:在RxJava2.0中,旧的Observable也保留了,你还可以像以前那样使用,同时要注意接口的变化。
需要说明的是,RxJava2.0中,Flowable是对Observable的补充(而不是替代),也可以这么说,Flowable是能够支持Backpressure的Observable。
何时用Observable
- 当上游在一段时间发送的数据量不大(以1000为界限)的时候优先选择使用Observable;
- 在处理GUI相关的事件,比如鼠标移动或触摸事件,这种情况下很少会出现backpressured的问题,用Observable就足以满足需求;
- 获取数据操作是同步的,但你的平台不支持Java流或者相关特性。使用Observable的开销低于Flowable。
何时用Flowable
- 当上游在一段时间发送的数据量过大的时候(这个量我们往往无法预计),此时就要使用Flowable以限制它所产生的量的元素10K +处理。
- 当你从本地磁盘某个文件或者数据库读取数据时(这个数据量往往也很大),应当使用Flowable,这样下游可以根据需求自己控制一次读取多少数据;
- 以读取数据为主且有阻塞线程的可能时用Flowable,下游可以根据某种条件自己主动读取数据。
Single、Completable
Single 与 Completable 都基于新的 Reactive Streams 的思想重新设计了接口,主要是消费者的接口, 现在他们是这样的:
interface SingleObserver<T>
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
interface CompletableObserver<T>
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
Subscriber
对比一下 Subscriber :
public interface Subscriber<T>
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
- 1
- 2
- 3
- 4
- 5
- 6
我们会发现和以前不一样的是多了一个 onSubscribe 的方法, Subscription 如下:
Subscription
public interface Subscription
public void request(long n);
public void cancel();
- 1
- 2
- 3
- 4
熟悉 RxJava 1.x 的朋友能发现, 新的 Subscription 更像是综合了旧的 Producer 与 Subscription 的综合体。他既可以向上游请求(request)数据,又可以打断并释放(cancel)资源。而旧的 Subscription 在这里因为名字被占,而被重新命名成了 Disposable。
注意:Subscription 不再有订阅subcribe和unSubcribe的概念。
Disposable
public interface Disposable
void dispose();
boolean isDisposed();
- 1
- 2
- 3
- 4
这里最大的不同就是这个 onSubscribe ,根据 Specification, 这个函数一定是第一个被调用的, 然后就会传给调用方一个 Subscription ,通过这种方式组织新的背压关系。当我们消费数据时,可以通过 Subscription 对象,自己决定请求数据。
这里就可以解释上面的非阻塞的背压。旧的阻塞式的背压,就是根据下游的消费速度,中游可以选择阻塞住等待下游的消费,随后向上游请求数据。而新的非阻塞就不在有中间阻塞的过程,由下游自己决定取多少,还有背压策略,如抛弃最新、抛弃最旧、缓存、抛异常等。
而新的接口带来的新的调用方式与旧的也不太一样, subscribe 后不再会有 Subscription 也就是如今的 Disposable,为了保持向后的兼容, Flowable 提供了 subscribeWith方法 返回当前的 Subscriber 对象, 并且同时提供了 DefaultSubscriber , ResourceSubscriber , DisposableSubscriber ,让他们提供了 Disposable 接口,并且可以从外面取消 dispose()。 现在也可以完成和以前类似的代码:
ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>()
@Override
public void onStart()
request(Long.MAX_VALUE);
@Override
public void onNext(Integer t)
System.out.println(t);
@Override
public void onError(Throwable t)
t.printStackTrace();
@Override
public void onComplete()
System.out.println("Done");
;
Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);
subscriber.dispose();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
注意,由于Reactive-Streams的兼容性,方法onCompleted被重命名为onComplete。另外注意dispose()方法,这个方法允许你释放资源。
RxJava2.x中提供了几个Subcriber对象,如下所示:
- DefaultSubscriber:通过实现Subscriber接口,可以通过调用request(long n)方法请求或者cancel()方法取消订阅(同步请求)
public abstract class DefaultSubscriber<T> implements Subscriber<T>
- 1
- DisposableSubscriber:通过实现Desposable异步删除。
public abstract class DisposableSubscriber<T> implements Subscriber<T>, Disposable
- 1
- ResourceSubscriber:允许异步取消其订阅相关资源,节省内存而且是线程安全。
public abstract class ResourceSubscriber<T> implements Subscriber<T>, Disposable
- 1
- SafeSubscriber:包装另一个订阅者,并确保所有onXXX方法遵守协议(序列化要求访问除外)。
public final class SafeSubscriber<T> implements Subscriber<T>, Subscription
- 1
- SerializedSubscriber:序列化访问另一个订阅者的onNext,onError和onComplete方法。
public final class SerializedSubscriber<T> implements Subscriber<T>, Subscription
- 1
在onSubscribe/onStart中调用request
注意,在Subscriber.onSubscribe或ResourceSubscriber.onStart中调用request(n)将会立即调用onNext,实例代码如下:
Flowable.range(1, 3).subscribe(new Subscriber<Integer>()
@Override
public void onSubscribe(Subscription s)
System.out.println("OnSubscribe start");
s.request(Long.MAX_VALUE);
System.out.println("OnSubscribe end");
@Override
public void onNext(Integer v)
System.out.println(v);
@Override
public void onError(Throwable e)
e.printStackTrace();
@Override
public void onComplete()
System.out.println("Done");
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
输出结果如下:
OnSubscribe start
1
2
3
Done
OnSubscribe end
- 1
- 2
- 3
- 4
- 5
- 6
当你在onSubscribe/onStart中做了一些初始化的工作,而这些工作是在request后面时,会出现一些问题,在onNext执行时,你的初始化工作的那部分代码还没有执行。为了避免这种情况,请确保你调用request时,已经把所有初始化工作做完了。
这个行为不同于1.x中的 request要经过延迟的逻辑直到上游的Producer到达时。在2.0中,总是Subscription先传递下来,90%的情况下没有延迟请求的必要。
Subscription
在RxJava 1.x中,接口rx.Subscription负责流和资源的生命周期管理,即退订和释放资源,例如scheduled tasks。Reactive-Streams规范用这个名称指定source和consumer之间的关系: org.reactivestreams.Subscription 允许从上游请求一个正数,并支持取消。
为了避免名字冲突,1.x的rx.Subscription被改成了 io.reactivex.Disposable。
因为Reactive-Streams的基础接口org.reactivestreams.Publisher 定义subscribe()为无返回值,Flowable.subscribe(Subscriber)不再返回任何Subscription。其他的基础类型也遵循这种规律。
在2.x中其他的subscribe的重载方法返回Disposable。
原始的Subscription容器类型已经被重命名和修改。
- CompositeSubscription 改成 CompositeDisposable,
- SerialSubscription 和MultipleAssignmentSubscription 被合并到了 SerialDisposable。 set() 方法取消了旧值,而replace()方法没有。
- RefCountSubscription 已被删除。
收回 create 方法权限
在RxJava 1.x 最明显的问题就是由于 create 的太过开放,导致其被开发者滥用,而不是学习使用提供的操作符。并且用户对 RxJava 不够了解,导致各种各样的问题,如背压、异常处理等。
由于规范要求所有的操作符强制支持背压,因此新的 create 采用了保守的设计,让用户实现 FlowableOnSubscribe 接口,并选取背压策略,然后在内部实现封装支持背压,简单的例子如下:
Flowable.create(new FlowableOnSubscribe<Integer>()
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
, BackpressureStrategy.BUFFER);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
Functions可以抛出异常
不同于RxJava1.x,RxJava2.x中没有了一系列的Action/Func接口,取而代之的是与Java8命名类似的函数式接口,如下图:
而Consumer即消费者,用于接收单个值,BiConsumer则是接收两个值,Function用于变换对象,Predicate用于判断。这些接口命名大多参照了Java8,熟悉Java8新特性的应该都知道意思,这里也就不再赘述了。
public interface Consumer<T>
void accept(T t) throws Exception;
- 1
- 2
- 3
新的ActionX、FunctionX的方法声明都增加了一个throws Exception,这带来了显而易见的好处,现在我们可以这样写:
Flowable.just("qq.txt")
.map(new Function<String, Integer>()
@Override
public Integer apply(String value) throws Exception
File file = new File(value);
file.createNewFile();
return 99;
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
而createNewFile方法显式的抛出了一个IOException,而在以前是不可以这样写的。
Schedulers
在2.0的API中仍然支持主要的默认scheduler: computation, io, newThread 和 trampoline,可以通过io.reactivex.schedulers.Schedulers这个实用的工具类来调度。
2.0中不存在immediate 调度器。 它被频繁的误用,并没有正常的实现 Scheduler 规范;它包含用于延迟动作的阻塞睡眠,并且不支持递归调度。你可以使用Schedulers.trampoline()来代替它。
Schedulers.test()已经被移除,这样避免了默认调度器休息的概念差异。那些返回一个”global”的调度器实例是鉴于test()总是返回一个新的TestScheduler实例。现在我们鼓励测试人员使用这样简单的代码new TestScheduler()。
io.reactivex.Scheduler抽象类现在支持直接调度任务,不需要先创建然后通过Worker调度。
操作符的差别
2.0中大部分操作符仍然被保留,实际上大部分行为和1.x一样。
关于操作符,引用JakeWharton的总结就是:
All the same operators(you konw and love or hate and despise) are still there.
Transformer
RxJava 1.x 中Transformer实际上就是Func1<Observable,Observable>
,换句话说就是提供给他一个Observable它会返回给你另一个Observable,这和内联一系列操作符有着同等功效。
相关API如下:
public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>>
// cover for generics insanity
public interface Func1<T, R> extends Function
R call(T t);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
实际操作下,写个方法,创建一个Transformer调度器:
//子线程运行,主线程回调
public Observable.Transformer<T, T> io_main(final RxAppCompatActivity context)
return new Observable.Transformer<T, T>()
@Override
public Observable<T> call(Observable<T> tObservable)
Observable<T> observable = (Observable<T>) tObservable
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0()
@Override
public void call()
DialogHelper.showProgressDlg(context, mMessage);
)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(RxLifecycle.bindUntilEvent(context.lifecycle(), ActivityEvent.STOP));
return observable;
;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
上面这个方法出自本人的Community框架,用法和源码详见:RxHelper.java
在实际应用中,Transformer 经常和 Observable.compose() 一起使用。本人的Community框架也有使用,这里就不多介绍了。
在RxJava2.0中,Transformer划分的更加细致了,每一种“Observable”都对应的有自己的Transformer,相关API如下所示:
public interface ObservableTransformer<Upstream, Downstream>
ObservableSource<Downstream> apply(Observable<Upstream> upstream);
public interface CompletableTransformer
CompletableSource apply(Completable upstream);
public interface FlowableTransformer<Upstream, Downstream>
Publisher<Downstream> apply(Flowable<Upstream> upstream);
public interface MaybeTransformer<Upstream, Downstream>
MaybeSource<Downstream> apply(Maybe<Upstream> upstream);
public interface SingleTransformer<Upstream, Downstream>
SingleSource<Downstream> apply(Single<Upstream> upstream);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
这里以FlowableTransformer为例,创建一个Transformer调度器:
//子线程运行,主线程回调
public FlowableTransformer<T, T> io_main(final RxAppCompatActivity context)
return new FlowableTransformer<T, T>()
@Override
public Publisher<T> apply(Flowable<T> flowable)
return flowable
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Subscription>()
@Override
public void accept(Subscription subscription) throws Exception
DialogHelper.showProgressDlg(context, mMessage);
)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.compose(RxLifecycle.<T, ActivityEvent>bindUntilEvent(context.lifecycle(), ActivityEvent.DESTROY));
;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
上面这个方法出自本人的CommunityRxJava2框架,用法和源码详见:RxHelper.java
其他改变
doOnCancel/doOnDispose/unsubscribeOn
在1.x中,doOnUnsubscribe总是执行终端事件,因为SafeSubscriber调用了unsubscribe。这实际上是没有必要的。Reactive-Streams规范中,一个终端事件到达Subscriber,上游的Subscription会取消,因此调用 cancel()是一个空操作。
由于同样的原因unsubscribeOn也没被在终端路径上调用,但只有实际在链上调用cancel时,才会调用unsubscribeOn。
因此,下面的序列不会被调用
doOnCancel
Flowable.just(1,2,3)
.doOnCancel(new Action()
@Override
public void run() throws Exception
Log.e(TAG, " doOnCancel " );
)
.subscribe(new DisposableSubscriber<Integer>()
@Override
public void onNext(Integer integer)
Log.e(TAG, " onNext : " + integer);
@Override
public void onError(Throwable t)
@Override
public void onComplete()
Log.e(TAG, " onComplete isDisposed() = " + isDisposed());
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
输出结果如下:
onNext : 1
onNext : 2
onNext : 3
onComplete isDisposed() = false
- 1
- 2
- 3
- 4
然而,下面将会调用take操作符在传送过程中取消onNext
Flowable.just(1,2,3)
.doOnCancel(new Action()
@Override
public void run() throws Exception
Log.e(TAG, " doOnCancel " );
)
.take(2)
.subscribe(new DisposableSubscriber<Integer>()
@Override
public void onNext(Integer integer)
Log.e(TAG, " onNext : " + integer);
@Override
public void onError(Throwable t)
@Override
public void onComplete()
Log.e(TAG, " onComplete isDisposed() = " + isDisposed());
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
输出结果如下:
onNext : 1
onNext : 2
doOnCancel
onComplete isDisposed() = false
- 1
- 2
- 3
- 4
使用take操作符,调用了cancel方法,我们看一下take操作符的源码:
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> take(long count)
if (count < 0)
throw new IllegalArgumentException("count >= 0 required but it was " + count);
return RxJavaPlugins.onAssembly(new FlowableTake<T>(this, count));
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
关键点就是这个FlowableTake类,这里限于篇幅的原因就不看源码了,大家可以自己看一下,然后找找是什么地方调用了cancel。
同样的,如果你需要在终端或者取消时执行清理,考虑使用using操作符代替。
以上就是RxJava2.0中的改动,下面我们重点介绍下RxJava2.0中的观察者模式。
RxJava2.0中的观察者模式
RxJava始终以观察者模式为骨架,在2.0中依然如此。
在RxJava2.0中,有五种观察者模式:
Observable/Observer
Flowable/Subscriber
Single/SingleObserver
Completable/CompletableObserver
Maybe/MaybeObserver
后面三种观察者模式差不多,Maybe/MaybeObserver
可以说是Single/SingleObserver
和Completable/CompletableObserver
的复合体。
下面列出这五个观察者模式相关的接口。
Observable/Observer
public abstract class Observable<T> implements ObservableSource<T>...
public interface ObservableSource<T>
void subscribe(Observer<? super T> observer);
public interface Observer<T>
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
Completable/CompletableObserver
//代表一个延迟计算没有任何价值,但只显示完成或异常。类似事件模式Reactive-Streams:onSubscribe(onError | onComplete)?
public abstract class Completable implements CompletableSource...
//没有子类继承Completable
public interface CompletableSource
void subscribe(CompletableObserver cs);
public interface CompletableObserver
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable e);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
Flowable/Subscriber
public abstract class Flowable<T> implements Publisher<T>...
public interface Publisher<T>
public void subscribe(Subscriber<? super T> s);
public interface Subscriber<T>
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
Maybe/MaybeObserver
//Maybe类似Completable,它的主要消费类型是MaybeObserver顺序的方式,遵循这个协议:onSubscribe(onSuccess | onError | onComplete)
public abstract class Maybe<T> implements MaybeSource<T>...
public interface MaybeSource<T>
void subscribe(MaybeObserver<? super T> observer);
public interface MaybeObserver<T>
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
void onComplete();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
Single/SingleObserver
//Single功能类似于Observable,除了它只能发出一个成功的值,或者一个错误(没有“onComplete”事件),这个特性是由SingleSource接口决定的。
public abstract class Single<T> implements SingleSource<T>...
public interface SingleSource<T>
void subscribe(SingleObserver<? super T> observer);
public interface SingleObserver<T>
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
其实从API中我们可以看到,每一种观察者都继承自各自的接口(都有一个共同的方法subscrib()),但是参数不一样),正是各自接口的不同,决定了他们功能不同,各自独立(特别是Observable和Flowable),同时保证了他们各自的拓展或者配套的操作符不会相互影响。
下面我们重点说说在实际开发中经常会用到的两个模式:Observable/Observer和Flowable/Subscriber。
Observable/Observer
Observable正常用法:
Observable.create(new ObservableOnSubscribe<Integer>()
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
).subscribe(new Observer<Integer>()
@Override
public void onSubscribe(Disposable d)
@Override
public void onNext(Integer integer)
@Override
public void onError(Throwable e)
@Override
public void onComplete()
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
需要注意的是,这类观察模式不支持背压,下面我们具体分析下。
当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM。
在测试的时候,快速发送了100000个整形数据,下游延迟接收,结果被观察者的数据全部发送出去了,内存确实明显增加了,遗憾的是没有OOM。
所以,当我们使用Observable/Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线,供各位参考)。
Flowable/Subscriber
Flowable.range(0, 10)
.subscribe(new Subscriber<Integer>()
Subscription subscription;
//当订阅后,会首先调用这个方法,其实就相当于onStart(),
//传入的Subscription s参数可以用于请求数据或者取消订阅
@Override
public void onSubscribe(Subscription s)
Log.d(TAG, "onsubscribe start");
subscription = s;
subscription.request(1);
Log.d(TAG, "onsubscribe end");
@Override
public void onNext(Integer o)
Log.d(TAG, "onNext--->" + o);
subscription.request(3);
@Override
public void onError(Throwable t)
t.printStackTrace();
@Override
public void onComplete()
Log.d(TAG, "onComplete");
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
输出结果如下:
onsubscribe start
onNext--->0
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onNext--->6
onNext--->7
onNext--->8
onNext--->9
onComplete
onsubscribe end
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。
当然,Flowable也可以通过create()来创建:
Flowable.create(new FlowableOnSubscribe<Integer>()
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
, BackpressureStrategy.BUFFER);//指定背压策略
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
Flowable虽然可以通过create()
来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的(这个在1.0的时候就很难保证,可以说RxJava2.0收紧了create()的权限)。
根据上面的代码的结果输出中可以看到,当我们调用subscription.request(n)
方法的时候,不等onSubscribe()
中后面的代码执行,就会立刻执行onNext方法,因此,如果你在onNext方法中使用到需要初始化的类时,应当尽量在subscription.request(n)
这个方法调用之前做好初始化的工作;
当然,这也不是绝对的,我在测试的时候发现,通过create()
自定义Flowable的时候,即使调用了subscription.request(n)
方法,也会等onSubscribe()
方法中后面的代码都执行完之后,才开始调用onNext。
平滑升级
RxJava1.x 如何平滑升级到RxJava2.0呢?
由于RxJava2.0变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.0,或者将RxJava2.0转回RxJava1.x。
地址:https://github.com/akarnokd/RxJava2Interop
总结
可以明显的看到,RxJava2.0最大的改动就是对于backpressure的处理,为此将原来的Observable拆分成了新的Observable和Flowable,同时其他相关部分也同时进行了拆分。
除此之外,就是我们最熟悉和喜爱的RxJava。
再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!https://blog.csdn.net/jiangjunshow
字符串,那些你不知道的事(代码片段)
...遇到乱码的情况随便Google下就能找到解决方法,但是这样你不觉得有种被动的感觉嘛,我觉得和学习任何东西一样,学习编程首要是学习其思想,知道某事物为什么(why)要这么做,至于如何做(how)那只是前人提出的解决方案... 查看详情
你不知道的事---sringcloud的feign的继承特性(代码片段)
前言说起SpringChoud的feign大家用过的都说好。Feign是Netflix开发的声明式、模板化的HTTP客户端。对于我们微服务来说,微服务之间的api调用,使用feign来说是再方便不过的。本文先介绍一下,传统的feign的调用写法方式,再介绍我们... 查看详情
电子书关于区块链,你不知道的事
你一定听说过区块链?但却并不了解它的内涵。通过阅读电子书《什么是区块链》,你将了解数据软件和系统的发展历程、区块链作为分布式公共数据管理系统具有哪些特点、区块链名字的由来、以及比特币与区块链的... 查看详情
电子书关于区块链,你不知道的事
你一定听说过区块链?但却并不了解它的内涵。通过阅读电子书《什么是区块链》,你将了解数据软件和系统的发展历程、区块链作为分布式公共数据管理系统具有哪些特点、区块链名字的由来、以及比特币与区块链的... 查看详情
ios开发你不知道的事-编译&链接(代码片段)
对于平常的应用程序开发,我们很少需要关注编译和链接过程。我们平常Xcode开发就是集成的的开发环境(IDE),这样的IDE一般都将编译和链接的过程一步完成,通常将这种编译和链接合并在一起的过程称为构建,即使使用命令行... 查看详情
《你不知道的js(中卷)》关于this(代码片段)
一、关于this:一)、为什么要用this?functionidentity() returnthis.name.toUpperCase();varme= name:"Kyle";varyou= name:"Reader";identity.call(me);//Kyleidentity.call(you);//Reader? 观察上面的代码,使 查看详情
关于防御性编程,你应该知道的事(代码片段)
*请求处理通用类@NotBlank(message="参数str不能为空")privateStringstr;@NotNull(message="参数i不能为空")privateIntegeri;@Min(value=0,message="最小值不能小于0")privateintmin;@Max(value=100,message="最大值不能大约100")privateintmax;public 查看详情
关于linux之父,你不知道的6件事(代码片段)
👇👇关注后回复 “进群” ,拉你进程序员交流群👇👇如果让你现在说出三个程序员的名字,Linus很可能就在其中。作为世界上最著名的电脑程序员、黑客之一,LinusBenedictTorvalds写出了Linux内核1.0版&... 查看详情
可能你不知道的,关于自动装箱和自动拆箱(代码片段)
包装类我们知道,Java中包含了8种基本数据类型:整数类型:byte、short、int、long字符类型:char浮点类型:float、double布尔类型:boolean这8种基本数据类型的变量不需要使用new来创建,它们不会在堆上创建,而是直接在栈内存中存... 查看详情
关于数据标注行业有些你不知道的事儿(代码片段)
一位在觉醒向量做数据标注的员工说:正如别人评论的那样,我们的工作的确很像数字世界的建筑工人,但是我们在扮演着很重要的角色,因为没有我们,AI这栋摩天大楼就无法建成。大数据作为一个术语而言,其历史可能并不... 查看详情
你不知道的事javascript中用一种更先进的方式进行深拷贝:structuredclone(代码片段)
你是否知道,JavaScript中有一种原生的方法来做对象的深拷贝?本文我们要介绍的是structuredClone函数,它是内置在JavaScript运行时中的:constcalendarEvent=title:"Builder.ioConf",date:newDate(123),attendees:["Steve"]constcopied=str... 查看详情
这里有你不知道的关于linux中权限的知识(代码片段)
☀️我们在平常玩电脑的时候,经常会遇到这样的情况,系统报错:您无权限访问该资源。☀️ 这就是因为权限的原因而导致的问题,不仅在windows系统中会遇到这样的问题,在我们Linux系统中我们也... 查看详情
浅谈android多屏幕的事(代码片段)
...xff01;这么吊的功能(非N版本,非第三方也能实现,你不知道吧)摆在你面前,你不享用?不关注它是怎样实现的?你来,我就满足你的欲望!一部手机可以同时看片、聊天,还可以腾出一支手来撸==... 查看详情
ios开发-文件管理之多的是你不知道的事
郝萌主倾心贡献。尊重作者的劳动成果,请勿转载。假设文章对您有所帮助,欢迎给作者捐赠,支持郝萌主,捐赠数额任意,重在心意^_^ 我要捐赠: 点击捐赠Cocos2d-X源代码下载:点我传送游戏官方下载:http://dwz.cn/RwTjl游... 查看详情
深度解析人生重开模拟器?那些你不知道的事。
1.前言要说最近最火的小游戏,莫过于《人生重开模拟器》,可谓是铺天盖地,上至网页版,下至APP,甚至在小程序里也有无数仿品的存在。其原因是源码早已公开,因此很容易复制生产。简单介绍一下这... 查看详情
技术宅小伙:关于前端的那些你不知道的事
今天我想和大家聊一聊前端开发。前端开发就像一个魔术表演。我们将一堆代码变成一个美丽的世界,让用户惊叹不已。作为一名前端程序员,我常常觉得自己像一个魔术师,这个魔术的道具有点不一样,因为我... 查看详情
javascript线程解释(settimeout,setinterval你不知道的事)
原文:http://www.iamued.com/qianduan/1645.html今天看到这篇文章,学到了不少东西特此发出来和大家分享JavaScript的setTimeout与setInterval是两个很容易欺骗别人感情的方法,因为我们开始常常以为调用了就会按既定的方式执行,我想不少人都... 查看详情
css之你不知道的元素隐藏(代码片段)
...藏,大部分人会想到display:none和visbility:hidden;但是实际上关于元素隐藏还分多种情况,比如是否占据空间,隐藏之后是否可以触发点击事件等情况css元素隐藏种类 不占空间,无法点击: 1.display:none; 2.posit... 查看详情