RxJava


前言

Linus Benedict Torvalds : RTFSC – Read The Fucking Source Code

概括

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava 是一个被设计实现为灵活扩展功能的 java 虚拟机:一个由异步和事件组成的观测序列为基础的程序库。

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
它扩展了观察者模式来支持data/events序列和增加了operators来允许你把序列以声明方式组成在一起并一边将关注的事情抽象化,比如低线程,同步,线程安全和并发数据结构。

RxJava 是一个响应式编程框架,采用观察者设计模式。

简单使用

Step 1:创建Observer

Observer observer = new Observer() {    @Override    public void onNext(T s) {}    @Override    public void onCompleted() {}    @Override    public void onError(Throwable e) {}};

除了Observer,还有一个内置的Subscriber类实现了Observer的抽象类。Subscriber类对Observer类进行了扩展。
它们的实现基本上是一样的,对于区别,主要有这两点:

  1. onStart(): 这是 Subscriber 增加的方法。在 subscribe 刚开始,而事件还未发送之前被调用。
  2. unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。

Step 2:创建Observable

Observable observable = Observable.create(new Observable.OnSubscribe() {    @Override    public void call(Subscriber<? super T> subscriber) {        subscriber.onNext(T);        subscriber.onCompleted();    }});

RxJava 还提供了其它快速创建事件队列的方法:

Observable observable = Observable.just("Hello", "Hi", "Aloha");String[] words = {"Hello", "Hi", "Aloha"};Observable observable = Observable.from(words);

Step 3:Subscribe

observable.subscribe(observer);// 或者:observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的内部实现:

subscriber.onStart();onSubscribe.call(subscriber);

在接收到订阅后马上会开始发送事件。

不完整定义回调

subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。

Action1 onNextAction = new Action1() {    // onNext()    @Override    public void call(String s) {        Log.d(tag, s);    }};Action1 onErrorAction = new Action1() {    // onError()    @Override    public void call(Throwable throwable) {        // Error handling    }};Action0 onCompletedAction = new Action0() {    // onCompleted()    @Override    public void call() {        Log.d(tag, "completed");    }};// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()observable.subscribe(onNextAction);// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()observable.subscribe(onNextAction, onErrorAction);// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

使用例子

// 将字符串数组 names 中的所有字符串依次打印出来:String[] names = ...;Observable.from(names)    .subscribe(new Action1() {        @Override        public void call(String name) {            Log.d(tag, name);        }    });

Scheduler 线程控制

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

Observable.just(1, 2, 3, 4)    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程    .subscribe(new Action1() {        @Override        public void call(Integer number) {            Log.d(tag, "number:" + number);        }    });
  1. subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
  2. observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

这段代码就是说,由 subscribeOn(Schedulers.io()) 的指定,创建的事件内容(1,2,3,4)将由io线程发出,而由 observeOn(AndroidScheculers.mainThread()) 的指定,subscriber 数字的 Log 将发生在主线程。

进阶使用

Operators 操作符

Operators在消息发送者Observable和消息消费者Subscriber之间起到操纵消息的作用。

map 操作符

Observable.just("Hello, world!")    .map(new Func1() {        @Override        public String call(String s) {            return s + " -Dan";        }    })    .subscribe(s -> System.out.println(s));

当我们在消息的过程中对消息进行转换,最简单的方法就是使用 map 操作符。
map()函数的特点是:它不一定发送和原始的Observable一样的数据类型。

flatMap 操作符

Observable> query(String text);query("a lot text")    .flatMap(new Func1, Observable>() {        @Override        public Observable call(ArrayList s) {            return Observable.from(s);        }    })    .subscribe(new Action1() {        @Override        public void call(String s) {            System.out.println(s);        }    });

利用 flatMap 操作符可以很方便的将一个 list 的字符串分开发出来,在 faltMap 函数内的 Func1跟之前的 Action1 意思一样,并且返回值是一个新的 Observable 对象。

至于更多的操作符会在未来添加。

最后在说下RxJava 的实际使用意义

  • Observable 和 Subscriber 他们本身就是一个处理事件的通用框架。
  • Observable 和 Subscriber 在它们之间的一系列转换步骤是相互独立的。

更多相关文章

  1. Handler+Thread+Message模式 Android线程网络
  2. Android(安卓)工具类
  3. Android监听键盘事件
  4. android 多线程下载文件案例
  5. Android(安卓)SD 卡上创建 SQLite 数据库
  6. TextView双击事件
  7. android_service_totoal
  8. Android栗子の双击事件
  9. Android(安卓)adb monkey 测试命令

随机推荐

  1. android studio 导入工程慢
  2. Android Studio删除工程里面无用的代码和
  3. MVVMArchitecture,一款可配置的 MVVM 框架
  4. Android 微信分享,无需那么麻烦。
  5. android中的多线程基础问题
  6. Android(安卓)Data Binding
  7. 终于来了!耗时268天,7大模块、2983页58万字
  8. Android(安卓)MVP 实践 Dagger + activit
  9. Android Binder AIDL解析
  10. Android 个人开发者接入支付功能