1.x 和RxJava 2.x的主要区别,DoOnComplete — 注册一个

时间:2019-09-30 15:37来源:美高梅手机游戏网站
Android拾萃 - RxJava最简单的入门Android拾萃 - RxJava操作符列表和响应类型 传统的Java观察者模式可以参考此篇博客:Java观察者模式案例简析 这个页面展示的操作符可用于组合多个 Observab

Android拾萃 - RxJava最简单的入门Android拾萃 - RxJava操作符列表和响应类型

传统的Java观察者模式可以参考此篇博客:Java观察者模式案例简析

这个页面展示的操作符可用于组合多个 Observables。

本文首先会给出ReactiveX的核心操作符列表和对应的文档链接,后面会具体的介绍和实践所有的操作符。

RxJava 是基于Java的观察者模式开展的。构建被观察者(Observable/Flowable)、观察者(Observer/Subscriber),并通过建立两者的订阅关系实现观察,在事件的传递过程中可以对事件进行各种处理。

Delay — 延时发射 Observable 的结果。
DelaySubscription — 延时处理订阅请求。
DoOnEach — 注册一个动作,对 Observable 发射的每个数据项使用。
DoOnComplete — 注册一个动作,对正常完成的 Observable 使用。
DoOnError — 注册一个动作,对发生错误的 Observable 使用。
DoOnTerminate — 注册一个动作,对完成的 Observable 使用,无论是否发生错误。
DoOnSubscribe — 注册一个动作,在观察者订阅时使用。
DoOnUnsubscribe — 注册一个动作,在观察者取消订阅时使用。
Dematerialize — 将上面的结果逆转回一个 Observable
ObserveOn — 指定观察者观察 Observable 的调度器
Materialize — 将 Observable 转换成一个通知列表
Serialize — 强制一个 Observable 连续调用并保证行为正确
Subscribe — 操作来自 Observable 的发射物和通知。
SubscribeOn — 指定 Observable 执行任务的调度器。
TimeInterval — 定期发射数据。
Timeout - 对原始 Observable 的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。
Timestamp — 给 Observable 发射的每个数据项添加一个时间戳。

如果想实现自己的操作符,可以参考:实现自定义操作符

在rxjava 1.x、rxjava 2.x里,Observable是被观察者,Observer是观察者,正常逻辑是观察者通过subscribe订阅Observable的事件处理,当Observable发射事件时Observer接收数据。但为了保持流式API风格,观察者订阅被观察者的代码顺序设计有一些调整。如:

6.1 Delay

延迟一段指定的时间再发射来自 Observable 的请求。

手机美高梅游戏网址 1

Delay

RxJava 的实现是 delay 和 delaySubscription。不同之处在于 Delay 是延时数据的发射,而 DelaySubscription 是延时注册 Subscriber。

创建操作用于创建Observable的操作符Create — 通过调用观察者的方法从头创建一个ObservableDefer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的ObservableEmpty/Never/Throw — 创建行为受限的特殊ObservableFrom — 将其它的对象或数据结构转换为ObservableInterval — 创建一个定时发射整数序列的ObservableJust — 将对象或者对象集合转换为一个会发射这些对象的ObservableRange — 创建发射指定范围的整数序列的ObservableRepeat — 创建重复发射特定的数据或数据序列的ObservableStart — 创建发射一个函数的返回值的ObservableTimer — 创建在一个指定的延迟之后发射单个数据的Observable

Observable.subscribe;

6.1.1 Delay

手机美高梅游戏网址 2

delay

示例代码:

final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delay(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer integer) throws Exception {
        if (integer == 1) {
            Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
        }
        Log.e(TAG, "accept:" + integer);
    }
});

输出结果:

delay Time :2408
accept:1
accept:2

变换操作这些操作符可用于对Observable发射的数据进行变换,详细解释可以看每个操作符的文档Buffer — 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。GroupBy — 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项Scan — 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值Window — 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集

在RxJava 2.x中的观察者模式有两种。而Flowable作为被观察者是专门支持背压的。这也是RxJava 1.x 和RxJava 2.x的主要区别。当然还有一些区别是操作符、接口的不兼容更新。

6.1.2 delaySubscription

手机美高梅游戏网址 3

delaySubscription

示例代码:

final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delaySubscription(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer aLong) throws Exception {
        if (aLong == 1) {
            Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
        }
        Log.e(TAG, "accept:" + aLong);
    }
});

输出结果:

delay Time :2500
accept:1
accept:2

手机美高梅游戏网址 ,过滤操作这些操作符用于从Observable发射的数据中进行选择Debounce — 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作Distinct — 去重,过滤掉重复数据项ElementAt — 取值,取特定位置的数据项Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的First — 首项,只发射满足条件的第一条数据IgnoreElements — 忽略所有的数据,只保留终止通知(onError或onCompleted)Last — 末项,只发射最后一条数据Sample — 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirstSkip — 跳过前面的若干项数据SkipLast — 跳过后面的若干项数据Take — 只保留前面的若干项数据TakeLast — 只保留后面的若干项数据

  • Observable / Observer
  • Flowable / Subscriber

    手机美高梅游戏网址 4image.png

6.2 Do

注册一个动作作为原始 Observable 生命周期事件的一种占位符。

手机美高梅游戏网址 5

Do

Do 操作符就是给 Observable 的生命周期的各个阶段加上一系列的回调监听,当 Observable 执行到这个阶段的时候,这些回调就会被触发。
在 Rxjava2.0 中实现了很多的 do 操作符的变体。

组合操作组合操作符用于将多个Observable组合成一个单一的ObservableAnd/Then/When — 通过模式和计划组合两个或多个Observable发射的数据集CombineLatest — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据,然后发射这个函数的结果Join — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射Merge — 将两个Observable发射的数据组合并成一个StartWith — 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项Switch — 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据Zip — 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射

RxJava1.x 平滑升级到RxJava2.x

由于RxJava2.0变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.x,或者将RxJava2.x转回RxJava1.x。RxJava2Interop

 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext; e.onNext; e.onNext// ...省略很多在发射过程中的流式处理代码 .subscribe(new Observer<Integer>() { private Disposable mDisposable; @Override public void onSubscribe(Disposable d) { mDisposable = d; } @Override public void onNext(Integer integer) { Log.d("onNext", "" + integer); //新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件。 if (integer == 3) { mDisposable.dispose(); Log.d("onNext", "已停止接收事件"); } } @Override public void onError(Throwable e) { } @Override public void onComplete;

打印结果:

11-28 13:00:42.195 29930-29930/? D/onNext: 111-28 13:00:42.195 29930-29930/? D/onNext: 211-28 13:00:42.195 29930-29930/? D/onNext: 311-28 13:00:42.195 29930-29930/? D/onNext: 已停止接收事件

6.2.1 doAfterNext

实现方法:doAfterNext(Consumer)
从上流向下流发射后被调用。

示例代码:

public static void demo_doAfterNext(){
    Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });
    ob1.doAfterNext(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG,"doAfterNext="+integer);
        }
    }).subscribe(getNormalObserver());
}


public static Disposable mDisposable ;
//可重复使用
public static Observer<Integer> getNormalObserver(){
    return new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            mDisposable = d;
        }

        @Override
        public void onNext(@NonNull Integer integer) {
            Log.e(TAG,"normal,onNext:"+integer);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            Log.e(TAG,"normal,Error: " + error.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG,"normal,onComplete");
        }
    };
}

输出结果:

normal,onNext:1
doAfterNext : 1
normal,onNext:2
doAfterNext : 2
normal,onNext:3
doAfterNext : 3
normal,onComplete

错误处理这些操作符用于从错误通知中恢复Catch — 捕获,继续序列操作,将错误替换为正常的数据,从onError通知中恢复Retry — 重试,如果Observable发射了一个错误通知,重新订阅它,期待它正常终止

Rxjava 线程调度

subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。

多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。

1,Function<T, R> ——将输入的value类型T转换成输出的value类型R。通常结合Map操作符。

/** * A functional interface that takes a value and returns another value, possibly with a * different type and allows throwing a checked exception. * * @param <T> the input value type * @param <R> the output value type */public interface Function<T, R> { /** * Apply some calculation to the input value and return some other value. * @param t the input value * @return the output value * @throws Exception on error */ @NonNull R apply(@NonNull T t) throws Exception;}

2,Map——将一个Observable被观察者通过特定函数的执行,转换成另一种Observable被观察者。在 2.x 中和 1.x 中作用几乎一致,不同点在于:2.x 将 1.x 中的 Func1 和 Func2 改为了 Function 和 BiFunction。

 /** * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and emits the results of these function applications. * @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }

3,Consumer——接收一个单独的数据,类似于一个简化版的观察者observer。

/** * A functional interface  that accepts a single value. * @param <T> the value type */public interface Consumer<T> { /** * Consume the given value. * @param t the value * @throws Exception on error */ void accept(@NonNull T t) throws Exception;}

4,distinct——去重操作符。即先有的数字保留,重复的数字去除并保留原先顺序的方式输出。

 Observable.just(2, 1, 2, 3, 4, 2, 3) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.d("accept", "" + integer); } });

输出

11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 211-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 111-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 311-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 4

5,concat—— 可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable 终止(onComplete) 后才会订阅下一个 Observable。比如可以采用 concat 操作符先读取缓存再通过网络请求获取数据。

案例说明:

 Observable observable = Observable.just(1, 2, 3, 4, 5, 6) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { return integer + 1; } }); Observable.concat(Observable.just(-1, -2, -3, -4, -5, -6), observable) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.d("accept", "" + integer); } });

美高梅手机游戏网站 ,打印输出:看吧,两个Observable是按照顺序依次无交错执行的。

11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -111-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -211-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -311-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -411-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -511-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -611-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 211-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 311-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 411-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 511-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 611-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 7

注:熟悉操作符的目的在于,不同场景中都能随时想到有对应的工具可用。

背压:指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。因为事件产生的速度远远快于事件消费的速度,最终导致数据积累越来越多,从而导致OOM等异常。这就是背压产生的必要性。

RxJava2.0中,Flowable是能够支持Backpressure的Observable,是对Observable的补充。所以Observable被观察者支持的API,Flowable也都支持,并且Flowable的API里也都强制支持背压。

6.2.2 doAfterTerminate

手机美高梅游戏网址 6

doAfterTerminate

实现方法: doAfterTerminate(Action)

注册一个 Action,当 Observable 调用 onComplete 或 onError 触发。

示例代码:

Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
//                emitter.onError(new Throwable("nothingerro"));
    }
});
ob1.doAfterTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doAfterTerminate run");
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
normal,onNext:2
normal,onComplete
doAfterTerminate run

辅助操作一组用于处理Observable的操作符Delay — 延迟一段时间发射结果数据Do — 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作Materialize/Dematerialize — 将发射的数据和通知都当做数据发射,或者反过来ObserveOn — 指定观察者观察Observable的调度程序Serialize — 强制Observable按次序发射数据并且功能是有效的Subscribe — 收到Observable发射的数据和通知后执行的操作SubscribeOn — 指定Observable应该在哪个调度程序上执行TimeInterval — 将一个Observable转换为发射两个数据之间所耗费时间的ObservableTimeout — 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知Timestamp — 给Observable发射的每个数据项添加一个时间戳Using — 创建一个只在Observable的生命周期内存在的一次性资源

背压经典代码

Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) throws Exception { if (e.requested { for (int i = 0; i < 10; i++) { e.onNext; Log.d(TAG, "已发送" +  + "个——剩下" + e.requested; } e.onComplete(); } } }, BackpressureStrategy.LATEST) .subscribeOn(Schedulers.io .observeOn(AndroidSchedulers.mainThread .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { subscription = s; } @Override public void onNext(Integer s) { Log.d(TAG, "接收 ——" + s); if { subscription.cancel(); } } @Override public void onError(Throwable t) { Log.d(TAG, "接收错误——" + t); } @Override public void onComplete;

外部调用subscription请求配合配额11个。

 if (subscription != null) { subscription.request; }

在BackpressureStrategy.LATEST背压策略下,上游发射10个事件,下游由外部调用请求发布配额指令,当下游接收到第9个事件时暂停上游发布(此操作会清空上游事件源)。

6.2.3 doFinally

实现方法: doFinally(Action onDispose)

当 Observable 调用 onError 或 onCompleted 之后调用指定的操作,或由下游处理。
doFinally 优先于 doAfterTerminate 的调用。

示例代码:

Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
//      emitter.onError(new Throwable("nothingerro"));
    }
});
ob1.doFinally(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doFinally run");
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
normal,onNext:2
normal,onComplete
doFinally run

条件和布尔操作v这些操作符可用于单个或多个数据项,也可用于ObservableAll — 判断Observable发射的所有的数据项是否都满足某个条件Amb — 给定多个Observable,只让第一个发射数据的Observable发射全部数据Contains — 判断Observable是否会发射一个指定的数据项DefaultIfEmpty — 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据SequenceEqual — 判断两个Observable是否按相同的数据序列SkipUntil — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据SkipWhile — 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据TakeUntil — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知TakeWhile — 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据

背压策略

  • BackpressureStrategy.MISSING此策略下,上游发射的数据不做缓存也不丢弃,下游处理溢出的问题。简单说就是没有背压。
  • BackpressureStrategy.ERROR此策略下,在上游发射速度过快并超出下游接收速度时,抛出MissingBackpressureException异常。
  • BackpressureStrategy.BUFFER此策略下,把上游发射过来的所有数据全部缓存在缓存区,不做丢弃,待下游接收。
  • BackpressureStrategy.DROP此策略下,相当于一种令牌机制,下游通过request请求产生令牌给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。BackpressureStrategy.LATEST的策略和此类似。
  • BackpressureStrategy.LATEST此策略和BackpressureStrategy.DROP的策略类似,但在令牌数为0的时候有一点微妙的区别:onBackpressureDrop直接丢弃数据,不缓存任何数据;而onBackpressureLatest则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。
/** * Represents the options for applying backpressure to a source sequence. */public enum BackpressureStrategy { /** * OnNext events are written without any buffering or dropping. * Downstream has to deal with any overflow. * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators. */ MISSING, /** * Signals a MissingBackpressureException in case the downstream can't keep up. */ ERROR, /** * Buffers <em>all</em> onNext values until the downstream consumes it. */ BUFFER, /** * Drops the most recent onNext value if the downstream can't keep up. */ DROP, /** * Keeps only the latest onNext value, overwriting any previous value if the * downstream can't keep up. */ LATEST}

6.2.4 doOnDispose

手机美高梅游戏网址 7

doOnDispose

实现方法:doOnDispose(Action onDispose)

当 Observable 取消订阅时,它就会被调用。

示例代码:

Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {

        emitter.onNext(1);
        //mDisposable 参考6.2.1
        if (mDisposable != null) {
            mDisposable.dispose();
        }
        emitter.onNext(2);
        emitter.onComplete();
//                emitter.onError(new Throwable("nothingerro"));
    }
});
ob1.doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doOnDispose run");
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
doOnDispose run

算术和聚合操作这些操作符可用于整个数据序列Average — 计算Observable发射的数据序列的平均值,然后发射这个结果Concat — 不交错的连接多个Observable的数据Count — 计算Observable发射的数据个数,然后发射这个结果Max — 计算并发射数据序列的最大值Min — 计算并发射数据序列的最小值Reduce — 按顺序对数据序列的每一个应用某个函数,然后返回这个值Sum — 计算并发射数据序列的和

Flowable案例代码

Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) throws Exception { e.onNext; e.onNext; e.onNext; e.onComplete(); e.onNext; } }, BackpressureStrategy.ERROR) //下面两行代码执行线程切换,达到异步效果// .subscribeOn(Schedulers.io// .observeOn(AndroidSchedulers.mainThread .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { subscription = s;// subscription.request; } @Override public void onNext(Integer s) {// subscription.request; Log.d(TAG, "接收——" + s);// subscription.cancel(); } @Override public void onError(Throwable t) { Log.d(TAG, "接收错误——" + t); } @Override public void onComplete;

这里指定背压策略是BackpressureStrategy.ERROR,这种策略下执行此段代码会报如下错误。

D/rxjava2: 接收错误——io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

因为,上下游是同步的。上游发射了事件但是下游没有接收,就会造成阻塞(即便上游的事件队列长度只有3个 < 128)。为了避免ANR,就要提示MissingBackpressureException异常。

如果恢复第12、13行处理线程切换的代码,表示上下游位于不同线程,是异步状态。此种情形下,上游发射数据后就不会报MissingBackpressureException异常,但虽然上游能正常发射数据,下游同样接收不到数据。

这里涉及到一个知识点:

Flowable默认事件队列大小为128。BackpressureStrategy.BUFFER策略下事件队列无限大,和没有采取背压的Observable / Observer 类似了。

注:在处理同一组数据时,Observable / Observer 比BackpressureStrategy.BUFFER策略下的Flowable / Subscriber 性能更优,内存消耗更少。

在上下游异步的情况下,上游会先把事件发送到长度为128的事件队列中,待下游发送请求数据指令后从事件队列中拉取数据。这种“响应式拉取”的思想用于解决上下游流速不均衡的情况。

上述代码中,第19、24行代码是表示下游接收前、接收后发送请求配额指令给上游。也可以通过subscription.request;在外围调用发送n个请求配额给上游以获取数据。

6.2.5 doOnComplete

手机美高梅游戏网址 8

doOnComplete

当它产生的 Observable 正常终止调用 onCompleted 时会被调用。
Javadoc: doOnCompleted(Action)

示例代码:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//                emitter.onError(new Throwable("nothingerror"));
        emitter.onComplete();

    }
});
ob1.doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG, "doOnComplete run");
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
normal,onNext:2
doOnComplete run
normal,onComplete

编辑:美高梅手机游戏网站 本文来源:1.x 和RxJava 2.x的主要区别,DoOnComplete — 注册一个

关键词: