神刀安全网

RxJava Error Handling Operators

RxJava Error Handling Operators

简介

RxJava提供了一些操作函数去处理错误相关的情景,使用这些函数可以:

  • 吞掉这个错误,切换到一个备用的Observable继续发射数据
  • 吞掉这个错误然后发射默认值
  • 吞掉这个错误并立即尝试重启这个Observable
  • 吞掉这个错误,在等待一段时间后重启这个Observable

相关处理函数

处理错误相关的操作函数包括:

onErrorReturn

    public final Observable<T> onErrorReturn(Func1<java.lang.Throwable,? extends T> resumeFunction)

Instructs an Observable to emit an item (returned by a specified function) rather than invoking onError if it encounters an error.

一般出错时,会调用onError,然后结束.使用onErrorReturn,onError不会调用,而是发送onErrorReturn里面提供的值,然后onCompleted会被调用. 注意,紧随其后onCompleted会调用.

例子:

    Observable<String> observable = Observable.create(new OnSubscribe<String>(){             @Override             public void call(Subscriber<? super String> arg0) {                 arg0.onNext("a1");                 arg0.onNext("a2");                 arg0.onError(new Exception("testErrorReturn mock error"));             }          });          observable.onErrorReturn(new Func1<Throwable, String>() {              @Override             public String call(Throwable arg0) {                 Utils.println("onErrorReturn call");                 return "aError";             }         })         .subscribe(new Subscriber<String>() {              @Override             public void onCompleted() {                 Utils.println("onCompleted");             }              @Override             public void onError(Throwable arg0) {                 Utils.println("onError");             }              @Override             public void onNext(String arg0) {                 Utils.println("onResult:"+arg0);             }         }); 输出:     onResult:a1     onResult:a2     onErrorReturn call     onResult:aError     onCompleted

onErrorResumeNext

声明:

    public final Observable<T> onErrorResumeNext(final Observable<? extends T> resumeSequence) {     public final Observable<T> onErrorResumeNext(final Func1<Throwable, ? extends Observable<? extends T>> resumeFunction);

如果原Observable发生错误时,不调用onError,而是使用一个新的Observable,调用它的call函数(其实就是订阅它),里面onNext的值回传到subscriber,如果有调onCompleted,则subscriber会收到,如果没有则不会收到. 即后续的行为由新的Observable决定了.

上面两个函数的区别是第一个始终提供一个Observable,而第二个可以根据不同的Throwable提供不同的Observable.

例子:

        Observable<String> observable = Observable.create(new OnSubscribe<String>() {              @Override             public void call(Subscriber<? super String> arg0) {                 arg0.onNext("value1");                 arg0.onNext("value2");                 arg0.onNext("value3");                 arg0.onError(new Exception("mock testErrorResumeNext exception"));                 arg0.onNext("value4");             }         });         Observable<String> resumeObservable = Observable.create(new OnSubscribe<String>() {              @Override             public void call(Subscriber<? super String> arg0) {                 arg0.onNext("valueMock1");                 arg0.onNext("valueMock2");             }         });          observable.onErrorResumeNext(resumeObservable).subscribe(getSubscriber()); 输出:     onResult:value1     onResult:value2     onResult:value3     onResult:valueMock1     onResult:valueMock2

如果resumeObservable调用onCompleted,则onCompleted会被调用.

onExceptionResumeNext

    public final Observable<T> onExceptionResumeNext(final Observable<? extends T> resumeSequence);

Instructs an Observable to pass control to another Observable rather than invoking Observer#onError if it encounters an Exception.

和onErrorResumeNext类似,区别是onErrorResumeNext是针对onError的throwable,而onExceptionResumeNext仅仅是针对onError里的类型是Exception.

retry

if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error.

如果Observable.onError调用,即发射错误,会重新订阅原来的Obserable,即重新触发Observable,也就是从新调它的call函数.这种方式数据会重复.

  1. public final Observable<T> retry():
    无限重试,直到成功.
  2. public final Observable<T> retry(final long count):
    指定次数重试,超过指定次数还没成功,则subscriber.onError会被调用.
  3. public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate):
    传入一个Fun2,里面有两个入参,一个表示目前参数的次数,另一个就是Observable.onError里的类型,返回值true表示继续重试,false表示不重试,Subscriber.onError会被调用.
    也就是说只要Observable.onError被调用后,会调用Func2去判断是否要重试,Integer表示询问重试的次数,从1开始.

retry(count)例子:

    Observable<String> observable = Observable.create(new OnSubscribe<String>() {              @Override             public void call(Subscriber<? super String> arg0) {                 arg0.onNext("value1");                 arg0.onNext("value2");                 arg0.onNext("value3");                 arg0.onError(new Exception("mock testRetryCount exception"));                 arg0.onNext("value4");             }         });          observable.retry(1).subscribe(getSubscriber()); 输出:     onResult:value1     onResult:value2     onResult:value3     onResult:value1     onResult:value2     onResult:value3     onError

从上面可以看出,重试了一次,后面不重试,Subscriber.onError被调用.另外由于重试,数据重复了.

retry(Func2)例子:

    Observable<String> observable = Observable.create(new OnSubscribe<String>() {             @Override             public void call(Subscriber<? super String> arg0) {                 arg0.onNext("value1");                 arg0.onNext("value2");                 arg0.onNext("value3");                 arg0.onError(new Exception("mock testRetryFunc2 exception"));                 arg0.onNext("value4");             }         });          observable.retry(new Func2<Integer, Throwable,     Boolean>() {             @Override             public Boolean call(Integer arg0, Throwable arg1) {                 Utils.println("testRetryFunc2 func2::call time:"+arg0.intValue());                 return (arg0.intValue()>1)?false:true;             }         })         .subscribe(getSubscriber()); 输出:     onResult:value1     onResult:value2     onResult:value3     testRetryFunc2 func2::call time:1     onResult:value1     onResult:value2     onResult:value3     testRetryFunc2 func2::call time:2     onError

Integer从1开始,第一次重试,第二次返回false所以就不重试了,直接onError了.

retryWhen

    public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler);     public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler);

if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

retryWhen的输入参数是Func1,接受一个输入Observable<Throwable>, 返回输出参数Observable<?>。

返回的Observable<?>所要发送的事件决定了重试是否会发生:

  • 如果发送的是onCompleted或者onError事件,将不会触发重订阅。
  • 如果它发送onNext事件,则触发重订阅(不管onNext实际上是什么事件)。

这就是为什么使用了通配符作为泛型类型:这仅仅是个通知(next, error或者completed),一个很重要的通知而已。

可以理解为如果原Observable.onError发生, 如果Func1返回的Observable<?>的onNext被调用,则重新订阅原来的Observable重试,否则不重试.

输入的Observable(即Func1里Observable<?extends Throwable>)必须作为输出Observable(即Func1的返回值)的源。我们必须对Observable<Throwable>做出反应,然后基于它发送事件;不能只返回一个通用泛型流。

原Observable每次一调用onError(Throwable),Observable<Throwable>都会被作为输入传入方法中。换句话说就是,它的每一次调用我们都需要决定是否需要重订阅。

下面的例子会重试:

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {               @Override public Observable<?> call(Observable<? extends Throwable> errors) {                 return errors.flatMap(new Func1<Throwable, Observable<?>>() {                   @Override public Observable<?> call(Throwable error) {                     return Observable.timer(5, TimeUnit.SECONDS);                   }                 });               }             })

总结:

  1. onErrorReturn,吞掉错误,提供一个默认的返回值.
  2. onErrorResumeNext,吞掉错误,提供一个默认的Observable继续发送数据.
  3. onExceptionResumeNext,和onErrorResumeNext类似,区别是onErrorResumeNext是针对onError的throwable,而onExceptionResumeNext仅仅是针对onError里的类型是Exception.
  4. retry,吞掉错误,按条件重新订阅重试
  5. retryWhen,吞掉错误,由另外一个提供的Observable的行为决定是否重试,如果onNext被调用,则重新订阅重试,onError或者onComplete被调用,则不重试.

RxJava特有的异常

  • CompositeException:
    表示发生了多个异常。你可以使用异常的 getExceptions() 方法获取单独的异常。

  • MissingBackpressureException:
    表示一个订阅者或者操作符试图对一个不支持反压操作的Observable应用该操作。可以参考 [[Backpressure]] 查找针对没有实现反压操作的Observable的解决办法。

  • OnErrorFailedException:
    表示Observable尝试调用观察者的 onError() 方法,但是那个方法自己抛出了异常。

  • OnErrorNotImplementedException:
    表示一个Observable尝试调用它的观察者的onError() 方法,但是那个方法不存在。有多种方法可以消除这个错误:可以调整Observable使它不会到达这个错误条件,也可以在观察者中实现一个onError 处理器, 或者使用其它的操作符在错误到达之前拦截这个 onError 通知。

  • OnErrorThrowable:
    观察者们可以用这种形式传递异常给它们的观察者们的 onError() 方法。相比标准的Throwable,这种Throwable包含更多的信息:错误本身和在错误发生时Observable的内部状态。

Reference

  1. Error Handling Operators
  2. RxJava错误处理
  3. RxJava操作符(五)Error Handling
  4. RxJava 教程第三部分:驯服数据流之 高级错误处理

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » RxJava Error Handling Operators

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址