神刀安全网

打造属于自己的RxBus


RxBus

通过RxJava实现Rxbus。

相信大家已经非常熟悉EventBus了。最近正在学习Rxjava,如果在项目中已经使用了Rxjava,使用RxBus来代替EventBus应该是不错的选择。

RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者)。Observables发出一系列事件,Subscribers处理这些事件。

RxBus工作原理

直接看代码

Note that it is important to subscribe to the exact same rxBus instance that was used to post the events

采用单例模式来保证rxBus对象一致

public class RxBus {      private static RxBus rxBus;     private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());      private RxBus() {     }      public static RxBus getInstance() {         if (rxBus == null) {             synchronized (RxBus.class) {                 if (rxBus == null) {                     rxBus = new RxBus();                 }             }         }         return rxBus;     }       public void send(Object o) {         _bus.onNext(o);     }      public Observable<Object> toObserverable() {         return _bus;     } }

Activity中发送事件

public void sendTap(View view){             RxBus.getInstance().send(new TapEvent()); }  public void sendOther(View view){         RxBus.getInstance().send(new OtherEvent()); }

Fragment中接收事件

 RxBus.getInstance().toObserverable()                 .subscribe(new Action1<Object>() {                     @Override                     public void call(Object o) {                         if (o instanceof TapEvent) {                             textView.setText("tap");                         } else if (o instanceof OtherEvent) {                             textView.setText("other");                         }                     }                 });

效果

打造属于自己的RxBus

效果图

以上就是使用Rxjava简单实现RxBus的功能,当然这还远远不够

RxBus升级

在具体使用过程中总会碰到各种各样的问题

场景1
我在上一个项目中实现了无限轮播的功能,并且希望轮播图在用户滑动、不可见、以及程序在后台休眠时都停止滚动,这时候就希望EventBus及时的传递这3种状态,为此我需要写slideEvent、visibleEvent、aliveEvent3个类,虽然他们都需要传递一个简单的Boolen值。

解决方案
创建一个Event“管家”
类似key-value的方式,每个事件都有自己的唯一的Code,接收事件时根据Code返回对应的content

public class Events<T> {      //所有事件的CODE     public static final int TAP = 1; //点击事件     public static final int OTHER = 21; //其它事件      //枚举     @IntDef({TAP, OTHER})     @Retention(RetentionPolicy.SOURCE)     public @interface EventCode {}       public @Events.EventCode int code;     public T content;      public static <O> Events<O> setContent(O t) {         Events<O> events = new Events<>();         events.content = t;         return events;     }      public <T> T getContent() {         return (T) content;     }  }

场景2
怎么又内存泄漏了?

每个人在开发过程中,或多或少都会碰到内存泄漏的的问题,我一直有一个天真的想法,RxJava那么牛逼,是不是能无声无息地就能解决内存泄漏的问题了,答案是否定的。

我看了不少有关RxJava的文章,都会提到
一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

你可以

@Overrideprotected void onDestroy() {          super.onDestroy();          if(!rxSubscription.isUnsubscribed()) {                  rxSubscription.unsubscribe();          } }

又或者

使用CompositeSubscription把 Subscription 收集到一起,方便 Activity(基类) 销毁时取消订阅,防止内存泄漏。

前者可以在任一生命周期阶段取消订阅,缺点是每个acivity/fragment都要重写方法。

后者可以写在BaseActivity(大家都不会陌生),每个activity都能用,缺点是不够灵活。

以上两种方法似乎都欠缺点意思,所幸Rx家族”人丁兴旺“,早已想好了解决方案
RxLifecycle

一、bindToLifecycle()方法
在子类使用Observable中的compose操作符,调用,完成Observable发布的事件和当前的组件绑定,实现生命周期同步。从而实现当前组件生命周期结束时,自动取消对Observable订阅。

 Observable.interval(1, TimeUnit.SECONDS)         .compose(this.bindToLifecycle())             .subscribe(new Action1<Long>() {                  @Override                 public void call(Long num) {                     Log.i(TAG, "  " +num);                 }             });

二、bindUntilEvent() 方法
使用ActivityEvent类,其中的CREATE、START、 RESUME、PAUSE、STOP、 DESTROY分别对应生命周期内的方法。使用bindUntilEvent指定在哪个生命周期方法调用时取消订阅。

public enum ActivityEvent {      CREATE,     START,     RESUME,     PAUSE,     STOP,     DESTROY  }
public enum FragmentEvent {      ATTACH,     CREATE,     CREATE_VIEW,     START,     RESUME,     PAUSE,     STOP,     DESTROY_VIEW,     DESTROY,     DETACH  }

组装零件

public class RxBus {      private static RxBus rxBus;     private final Subject<Events<?>, Events<?>> _bus = new SerializedSubject<>(PublishSubject.<Events<?>>create());      private RxBus(){}      public static RxBus getInstance(){         if (rxBus == null){             synchronized (RxBus.class){                 if (rxBus == null){                     rxBus = new RxBus();                 }             }         }         return rxBus;     }      public void send(Events<?> o) {         _bus.onNext(o);     }      public void send(@Events.EventCode int code, Object content){         Events<Object> event = new Events<>();         event.code = code;         event.content = content;         send(event);     }      public Observable<Events<?>> toObservable() {         return _bus;     }      public static SubscriberBuilder with(FragmentLifecycleProvider provider){         return new SubscriberBuilder(provider);     }      public static SubscriberBuilder with(ActivityLifecycleProvider provider){         return new SubscriberBuilder(provider);     }       public static class SubscriberBuilder{          private FragmentLifecycleProvider mFragLifecycleProvider;         private ActivityLifecycleProvider mActLifecycleProvider;         private FragmentEvent mFragmentEndEvent;         private ActivityEvent mActivityEndEvent;         private int event;         private Action1<? super Events<?>> onNext;         private Action1<Throwable> onError;          public SubscriberBuilder(FragmentLifecycleProvider provider) {             this.mFragLifecycleProvider = provider;         }          public SubscriberBuilder(ActivityLifecycleProvider provider){             this.mActLifecycleProvider = provider;         }          public SubscriberBuilder setEvent(@Events.EventCode int event){             this.event = event;             return this;         }          public SubscriberBuilder setEndEvent(FragmentEvent event){             this.mFragmentEndEvent = event;             return this;         }          public SubscriberBuilder setEndEvent(ActivityEvent event){             this.mActivityEndEvent = event;             return this;         }          public SubscriberBuilder onNext(Action1<? super Events<?>> action){             this.onNext = action;             return this;         }          public SubscriberBuilder onError(Action1<Throwable> action){             this.onError = action;             return this;         }           public void create(){             _create();         }          public Subscription _create(){             if (mFragLifecycleProvider!=null){                 return RxBus.getInstance().toObservable()                         .compose(mFragmentEndEvent == null ? mFragLifecycleProvider.bindToLifecycle() :mFragLifecycleProvider.<Events<?>>bindUntilEvent(mFragmentEndEvent)) // 绑定生命周期                         .filter(new Func1<Events<?>, Boolean>() {                             @Override                             public Boolean call(Events<?> events) {                                 return events.code == event;                             }                         })   //过滤 根据code判断返回事件                         .subscribe(onNext, onError == null ? new Action1<Throwable>() {                             @Override                             public void call(Throwable throwable) {                                 throwable.printStackTrace();                             }                         } : onError);             }             if (mActLifecycleProvider!=null){                 return RxBus.getInstance().toObservable()                         .compose(mActivityEndEvent == null ? mActLifecycleProvider.bindToLifecycle() :mActLifecycleProvider.<Events<?>>bindUntilEvent(mActivityEndEvent))                         .filter(new Func1<Events<?>, Boolean>() {                             @Override                             public Boolean call(Events<?> events) {                                 return events.code == event;                             }                         })                         .subscribe(onNext, onError == null ? (Action1<Throwable>) new Action1<Throwable>() {                             @Override                             public void call(Throwable throwable) {                                 throwable.printStackTrace();                             }                         } : onError);             }             return null;         }     } }

新BUS上路

依然使用前面的例子

Activity中发送事件

 public void sendTap(View view){         RxBus.getInstance().send(Events.TAP, "Tap传了一个String");     }      public void sendOther(View view){         RxBus.getInstance().send(Events.OTHER, null); //        RxBus.getInstance().send(Events.OTHER, new OtherEvent("Cloud", 25));     }

Fragment中接收事件

fragment需要继承RxLifecycle对应组件

public class BlankFragment extends RxFragment {}
  RxBus.with(this)                 .setEvent(Events.TAP) //                .setEndEvent(FragmentEvent.DESTROY_VIEW) //不设置默认与fragment生命周期同步                 .onNext(new Action1<Events<?>>() {                     @Override                     public void call(Events<?> events) {                         String content = events.getContent();                         textView.setText(content);                     }                 })                 .create();          RxBus.with(this)                 .setEvent(Events.OTHER)                 .setEndEvent(FragmentEvent.DESTROY_VIEW) //不设置默认与fragment生命周期同步                 .onNext(new Action1<Events<?>>() {                     @Override                     public void call(Events<?> events) {                         OtherEvent event = events.getContent();                         textView.setText("Name: "  + event.getName() + ",Age: "+ event.getAge());                     }                 })                 .onError(new Action1<Throwable>() {                     @Override                     public void call(Throwable throwable) {                         textView.setText(throwable.toString());                     }                 }) // 异常处理,默认捕获异常,不做处理,程序不会crash。                 .create();

效果

打造属于自己的RxBus

打造属于自己的RxBus

normal.gif

完整代码,请移步

参考资料

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » 打造属于自己的RxBus

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址