神刀安全网

RxJava部分操作符解析

今天我们来看部分RxJava相关的知识,主要是上一篇RxJava内存泄漏的一种解决方案提到的开源框架RxLifecycle里面会涉及到的知识点,有下面几个:

1 Subject
2 takeUntil
3 filter
4 compose

1.Subject

从代码可以看出来Subject既可以当观察者也可以当被观察者。

public abstract class Subject<T> extends Observable<T> implements Observer<T> 

所以可以在生命周期中通过Subject发送事件然后又自己接收,从而根据事件类型做相应的操作。

Subject总共有四种类型

1 AsyncSubject
2 BehaviorSubject
3 PublishSubject
4 ReplaySubject

今天我们就说下第二种类型BehaviorSubject,它可以给订阅者发送订阅前最近的事件和订阅后发送的事件:

RxJava部分操作符解析

BehaviorSubject Rx.PNG

图中橙色的就是订阅前最近发送的事件,在订阅后也可以收到。文字解释始终太苍白,我们来看下代码:

BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();         behaviorSubject.onNext(1);         behaviorSubject.onNext(2);         behaviorSubject.subscribe(new Consumer<Integer>() {             @Override             public void accept(Integer integer) throws Exception {                 Timber.tag(TAG).d("running num : " + integer);             }         });         behaviorSubject.onNext(3);         behaviorSubject.onNext(4); 

上面代码运行结果就是收到2, 3,4

RxJava部分操作符解析

behaviorSubject.PNG

2.takeUntil

这是一个操作符,可以这样用

AObservable.takeUntil(BObservable) 

可以AObservable监听另外一个BObservable,如果BObservable开始发送数据,AObservable就不再发送数据。
看一下官方的图片解释,B发送0数据后,A就停止发送数据了。

RxJava部分操作符解析

takeUntil.PNG

talk is cheap, show me the code:

Observable.interval(1, TimeUnit.SECONDS).            subscribeOn(Schedulers.io()).            takeUntil(Observable.timer(5, TimeUnit.SECONDS)).            subscribe(new Consumer<Long>() {                 @Override                 public void accept(Long num) throws Exception {                     Timber.tag(TAG).d("running num : " + num);                 }             }); 

上面代码的意思就是从0开发每隔1秒发送一个数据,5s时停止发送,看下运行结果,和我们的预期完美一致:

RxJava部分操作符解析

takeUntil Result.PNG

3.filter

filter操作符就是过滤的意思,只有事件满足过滤条件时被观察者才会发送给观察者。看下官方的解释图,很清晰明了我就不做解释了哈。

RxJava部分操作符解析

filter.PNG

看一下怎么用,这个代码的意思还是每个1s发送数据,但是会进行过滤只发送偶数,也是5秒后停止发送:

Observable.interval(1, TimeUnit.SECONDS).                 subscribeOn(Schedulers.io()).                 filter(new Predicate<Long>() {                     @Override                     public boolean test(Long aLong) throws Exception {                         return aLong % 2 == 0;                     }                 }).                 takeUntil(Observable.timer(5, TimeUnit.SECONDS)).                 subscribe(new Consumer<Long>() {                     @Override                     public void accept(Long num) throws Exception {                         Timber.tag(TAG).e("running num : " + num);                     }                 }); 

上面代码的运行效果,确实是只收到了偶数。

RxJava部分操作符解析

filter result.PNG

4.compose

compose操作符是用来对Observable进行转换操作的,并且可以保证调用链不被破坏。
比如我们经常这样用:

Observable.interval(1,TimeUnit.SECONDS)                 .subscribeOn(Schedulers.io()).                 observeOn(AndroidSchedulers.mainThread()); 

这部分代码经常写,怎么进行封装呢?可能有的小伙伴立马就想到下面的方法:

private Observable composeObservable(Observable observable){         return observable.subscribeOn(Schedulers.io()).                         observeOn(AndroidSchedulers.mainThread()); } 

但是上面这样用就破坏了调用链了,因为你肯定得这样调用,这样就会变得怪怪的,不是Observable开头了,变成函数开头。

composeObservable(Observable.interval(1,TimeUnit.SECONDS)).subscribe(new Consumer<Long>() {             @Override             public void accept(Long aLong) throws Exception {             } }); 

这个问题用compose就可以完美解决:

Observable.interval(1, TimeUnit.SECONDS).                 compose(bindUntil(5)).                 subscribe(new Consumer<Long>() {                     @Override                     public void accept(Long num) throws Exception {                         Timber.tag(TAG).d("running num : " + num);                     }  });  private ObservableTransformer<Long, Long> bindUntil(final long deleyTime) {         return new ObservableTransformer<Long, Long>() {             @Override             public ObservableSource<Long> apply(Observable<Long> upstream) {                 return upstream.subscribeOn(Schedulers.io()).takeUntil(Observable.timer(deleyTime, TimeUnit.SECONDS));             }         }; } 

5.总结

上面的内容是假定大家有一点点RxJava的知识的,没有涉及到基本的使用。本次分享可能看起来毫无章法哈,其实还是有针对目的的,就是前面提到的开源框架RxLifecycle,这次分享就是针对里面用到的RxJava的一些知识点进行解析。RxJava的操作符挺多的,也不太可能也没必要一个个进行分析,用到的时候进行查找官方文档就可以了。

下面会用前面提到的这些知识点来自己实现一个类似于RxLifecycle的小Demo,欢迎大家关注和点赞哈。

最后感谢@右倾倾的理解和支持哈。

以上!

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » RxJava部分操作符解析

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址