Android(安卓)浅析 RxJava (一) 使用
RxJava
前言
Linus Benedict Torvalds : RTFSC – Read The Fucking Source Code
概括
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava 是一个被设计实现为灵活扩展功能的 java 虚拟机:一个由异步和事件组成的观测序列为基础的程序库。
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
它扩展了观察者模式来支持data/events序列和增加了operators来允许你把序列以声明方式组成在一起并一边将关注的事情抽象化,比如低线程,同步,线程安全和并发数据结构。
RxJava 是一个响应式编程框架,采用观察者设计模式。
简单使用
Step 1:创建Observer
Observer observer = new Observer() { @Override public void onNext(T s) {} @Override public void onCompleted() {} @Override public void onError(Throwable e) {}};
除了Observer,还有一个内置的Subscriber类实现了Observer的抽象类。Subscriber类对Observer类进行了扩展。
它们的实现基本上是一样的,对于区别,主要有这两点:
- onStart(): 这是 Subscriber 增加的方法。在 subscribe 刚开始,而事件还未发送之前被调用。
- unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。
Step 2:创建Observable
Observable observable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber<? super T> subscriber) { subscriber.onNext(T); subscriber.onCompleted(); }});
RxJava 还提供了其它快速创建事件队列的方法:
Observable observable = Observable.just("Hello", "Hi", "Aloha");String[] words = {"Hello", "Hi", "Aloha"};Observable observable = Observable.from(words);
Step 3:Subscribe
observable.subscribe(observer);// 或者:observable.subscribe(subscriber);
Observable.subscribe(Subscriber) 的内部实现:
subscriber.onStart();onSubscribe.call(subscriber);
在接收到订阅后马上会开始发送事件。
不完整定义回调
subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。
Action1 onNextAction = new Action1() { // onNext() @Override public void call(String s) { Log.d(tag, s); }};Action1 onErrorAction = new Action1() { // onError() @Override public void call(Throwable throwable) { // Error handling }};Action0 onCompletedAction = new Action0() { // onCompleted() @Override public void call() { Log.d(tag, "completed"); }};// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()observable.subscribe(onNextAction);// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()observable.subscribe(onNextAction, onErrorAction);// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
使用例子
// 将字符串数组 names 中的所有字符串依次打印出来:String[] names = ...;Observable.from(names) .subscribe(new Action1() { @Override public void call(String name) { Log.d(tag, name); } });
Scheduler 线程控制
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } });
- subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
- observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
这段代码就是说,由 subscribeOn(Schedulers.io()) 的指定,创建的事件内容(1,2,3,4)
将由io线程发出,而由 observeOn(AndroidScheculers.mainThread()) 的指定,subscriber 数字的 Log 将发生在主线程。
进阶使用
Operators 操作符
Operators在消息发送者Observable和消息消费者Subscriber之间起到操纵消息的作用。
map 操作符
Observable.just("Hello, world!") .map(new Func1() { @Override public String call(String s) { return s + " -Dan"; } }) .subscribe(s -> System.out.println(s));
当我们在消息的过程中对消息进行转换,最简单的方法就是使用 map 操作符。
map()函数的特点是:它不一定发送和原始的Observable一样的数据类型。
flatMap 操作符
Observable> query(String text);query("a lot text") .flatMap(new Func1, Observable>() { @Override public Observable call(ArrayList s) { return Observable.from(s); } }) .subscribe(new Action1() { @Override public void call(String s) { System.out.println(s); } });
利用 flatMap 操作符可以很方便的将一个 list 的字符串分开发出来,在 faltMap 函数内的 Func1跟之前的 Action1 意思一样,并且返回值是一个新的 Observable 对象。
至于更多的操作符会在未来添加。
最后在说下RxJava 的实际使用意义
- Observable 和 Subscriber 他们本身就是一个处理事件的通用框架。
- Observable 和 Subscriber 在它们之间的一系列转换步骤是相互独立的。
更多相关文章
- Handler+Thread+Message模式 Android线程网络
- Android(安卓)工具类
- Android监听键盘事件
- android 多线程下载文件案例
- Android(安卓)SD 卡上创建 SQLite 数据库
- TextView双击事件
- android_service_totoal
- Android栗子の双击事件
- Android(安卓)adb monkey 测试命令