参考文章:给 Android 开发者的 RxJava 详解,本文的部分内容参考自该文章。
注:下文中demo部分代码采用Kotlin语言编写,RxJava源码部分为Java语言,RxJava lib版本为2.0。

RxJava是什么

  • RxJava在GitHub上的自我介绍为:"a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库);
  • 可理解为:一个实现异步操作的库。

RxJava的优势

  • 简化逻辑(链式调用,无嵌套)
  • 代码示例:
// 将某文件夹下的所有png文件转化为bitmap并进行渲染Observable.fromArray(*folders) // 观察文件夹数组        .flatMap { Observable.fromArray(*it.listFiles()) } // 转换为文件夹中的文件        .filter { it.name.endsWith(".png") } // 筛选其中的png文件        .map { getBitmapFromFile(it) } // 将file转换为bitmap        .subscribeOn(Schedulers.io()) // 事件产生在io线程        .observeOn(AndroidSchedulers.mainThread()) // 事件消费在主线程        .subscribe { imageCollectorView.addImage(bitmap) }// 消费事件,渲染bitmap

如果你对RxJava不是很熟悉,看上面这段代码就会有一种“虽然不知道你在说啥,但是感觉很屌的样子”的感觉。没错,就是要让你感受到它很diao,然后,我来让你看懂这段代码到底是在干嘛。

API介绍

概念

  • 异步实现原理:观察者模式
  • RxJava的观察者模式包含以下四个概念:
    • 被观察者:Observable
    • 观察者:Observer
    • 订阅:subscribe
    • 事件
  • RxJava将每个事件单独处理,并把所有事件作为一个队列。
  • RxJava事件回调:
    • onNext():普通事件;
    • onComplete(): 事件队列完结。当不回再有新的OnNext()发出时,触发onComplete()
    • onError():事件队列异常。当事件队列中的事件处理过程中出现异常时,触发onError(),并终止队列。
    • 在同一个事件队列中onComplete()onError()只会有一个触发,并且触发后队列结束。

基本使用

创建Observer

Observer即观察者,决定了事件触发时的行为。RxJava中的Observer接口的实现方式:

val observer: Observer = object : Observer {    override fun onComplete() {        // do something..    }    override fun onSubscribe(d: Disposable) {        // do something..    }    override fun onNext(t: String) {        // do something..    }    override fun onError(e: Throwable) {        // do something..    }}
创建Observable

Observable即被观察者,决定了什么时候触发事件以及触发怎样的事件。RxJava使用create()方法创建Observable,并定义其触发规则:

val observable: Observable = Observable.create(object : ObservableOnSubscribe {    override fun subscribe(emitter: ObservableEmitter) {        emitter.onNext("Hello")        emitter.onNext("Hi")        emitter.onComplete()    }})// Kotlin语言中,推荐使用Lambda表达式将类型为接口的参数缩进为方法块// 如下所示,两种代码实际上是一样的val observable: Observable = Observable.create { it ->    it.onNext("Hello")    it.onNext("Hi")    it.onComplete()}

这里传入了一个ObservableOnSubscribe对象作为参数,当Observable被订阅的时候,ObservableOnSubscribesubscribe()方法会被调用,然后依次触发两次onNext()方法和一次onCompleted()
基于create()方法,RxJava还提供了其他用来快捷创建时间队列的方法:

  • just(T...):将传入的参数依次发送:
val observable: Observable = Observable.just("Hello", "Hi")// 会依次调用:// onNext("Hello");// onNext("Hi");// onCompleted();
  • fromArray/fromIterable/fromCallable/fromFuture/fromPublisher:将传入的数组或Iterable等的item依次发送出来:
val stringArr = arrayOf("Hello", "Hi")val observable = Observable.fromArray(*stringArr)// 会依次调用:// onNext("Hello");// onNext("Hi");// onCompleted();
Subscribe:订阅

创建了ObservableObserver之后,在用subscribe()方法为它们之间建立订阅关系。代码如下:

observable.subscribe(observer)

这里RxJava为了流式 API 的设计,subscribe()方法看起来是被观察者订阅了观察者,而非普通的观察者模式中观察者订阅了被观察者。

RxJava还提供了以下subscribe()方法的重载:

observable.subscribe {     // onNext}observable.subscribe ({    // onNext}, {    // onError})observable.subscribe ({    // onNext}, {    // onError}, {    // onComplete})
  • 实例:打印输出学生名称
val students = arrayOf(Student("李明"), Student("黄翔"))Observable.fromArray(*students)        .subscribe { it:Student! ->            Log.i(TAG, it.name)        }// 输出结果为:// 李明// 黄翔

在以上代码中,事件默认在同一个线程(UI线程)产生以及消费。观察者模式通常是处理异步操作时使用的。要用RxJava实现异步,就要用到RxJava中的Scheduler(后面会详细讲)。
我们可以发现,当事件序列中的的操作对象为多个对象或者一个数组时,就好像是对操作对象进行了循环,而循环的时间复杂度就是该操作对象中对象的个数,或者数组的长度,循环内执行的操作就是事件序列中各事件中所执行的操作。听起来很抽象,直接来看以下代码。其实操作对象为单个对象时,也可以看作是在进行时间复杂度为1的一次循环。

// 将上一段代码转化为循环语法val students = arrayOf(Student("李明"), Student("黄翔"))for(item in students) {    Log.i(TAG, item.name)}// 输出结果为:// 李明// 黄翔

那为啥不直接用循环语法而要用RxJava呢?仅仅在这样一个例子中,看起来循环语法似乎是更为简洁,但是,如果是多层循环嵌套,或是要进行线程切换,RxJava的流式结构无疑更为清晰。后文中会涉及到相关功能。

线程调度(Scheduler)
  • Scheduler的使用
    RxJava的线程调度主要通过Scheduler实现。如以下代码:
Observable.just("Hello", "Hi")        .subscribeOn(Schedulers.newThread()) // 事件产生在子线程        .observeOn(AndroidSchedulers.mainThread()) // 事件消费在主线程        .subscribe { it: String! ->            Toast.makeText(context, it, Toast.LENGTH_SHORT).show()        }

以上代码即为简单的线程调度使用实例。在RxJava中,为使用者提供了以下几种Scheduler

  1. Schedulers.immediate():当前线程,不会进行线程转换。RxJava中默认的Scheduler
  2. Schedulers.newThread():启用新线程。
  3. Schedulers.io():进行I/O操作的线程。拥有一个无上限的线程池,可复用空线程,比newThread()更有效率。
  4. Schedulers.computation():进行计算(CPU密集型计算)操作的线程。拥有固定大小的线程池,数量为CPU核数。
  5. AndroidSchedulers.mainThread():Android特有的主线程。

在RxJava中,通过subscribeOn()observeOn()两个方法,根据传入的Scheduler参数进行线程切换。subscribeOn()指定了被观察者Observable运行所在的线程,即ObservableOnSubscribesubscribe()方法执行时所处的线程,observeOn()指定了Observer中的事件所在的线程。若为指定,则默认为当前线程,不会切换
需要注意的是,一个Observable队列中,调用多次subscribeOn()方法,Observable会运行在第一次调用的subscribeOn()方法中;而observeOn()方法可多次调用,一次observeOn()方法被调用,会将队列中之后的事件所处的线程切换为传入的Scheduler的线程,直到调用下一个observeOn()被调用或者直到队列结束。
Talk is cheep, show me the code:

// 将若干字符串打印、存库并显示toastObservable.just("Hello", "Hi")        .subscribeOn(Schedulers.newThread()) // 新建线程        .subscribeOn(AndroidSchedulers.mainThread()) // 主线程        .doOnNext { it: String! ->            Log.i(TAG, it) // 在刚才新建的线程中输出        }        .observeOn(Schedulers.io()) // 切换至io线程        .doOnNext { it: String! ->            StringDao.getInstance().saveString(it) // 在io线程进行数据库存储操作        }        .observeOn(AndroidSchedulers.mainThread()) // 切换至主线程        .subscribe { it: String! ->            Toast.makeText(context, it, Toast.LENGTH_SHORT).show() // 在主线程操作view层,显示吐司        }
  • 几个特殊的方法:
  1. doOnSubscribe():在subscribe()发生时执行

根据上文可知,队列中多次调用subscribeOn()方法似乎只有第一次调用时起作用,那么调用多次subscribeOn()方法看起来是个没什么用而且很蠢的做法,但是,有一种例外:doOnSubscribe()。默认情况下,该方法执行在subscribe()发生的线程;若在调用doOnSubscribe()方法之后再次调用subscribeOn()的话,则doOnSubscribe()会执行在其之后第一次调用的subscribeOn()所指定的线程:

Observable.just("Hello", "Hi")        .subscribeOn(Schedulers.newThread()) // 新建线程        .doOnSubscribe{ it: String! ->            Log.i(TAG, it) // 在主线程中输出        }        .subscribeOn(AndroidSchedulers.mainThread()) // 主线程        .subscribeOn(Schedulers.io())        .doOnNext { it: String! ->            Log.i(TAG, it) // 在新建的线程中输出        }        .observeOn(AndroidSchedulers.mainThread()) // 切换至主线程        .subscribe { it: String! ->            Toast.makeText(context, it, Toast.LENGTH_SHORT).show() // 在主线程操作view层,显示吐司        }
  1. doOnNext():onNext()事件触发时调用
  2. doOnComplete():onComplete()事件触发时调用
  3. doOnError():onError()事件触发时调用
  4. doFinally():Observable队列结束时调用

以上方法都可以通过observeOn()方法指定线程,以doOnNext()为例:

Observable.just("Hello", "Hi")        .subscribeOn(Schedulers.newThread()) // 新建线程        .subscribeOn(AndroidSchedulers.mainThread()) // 主线程        .doOnNext { it: String! ->            // onNext()事件触发时,在新建的线程中输出            Log.i(TAG, it)         }        .observeOn(Schedulers.io()) // 切换至io线程        .doOnNext { it: String! ->            // onNext()事件触发时,在io线程进行数据库存储操作            StringDao.getInstance().saveString(it)         }        .observeOn(AndroidSchedulers.mainThread()) // 切换至主线程        .subscribe { it: String! ->            // Observer中的onNext()事件,在主线程操作view层,显示吐司            Toast.makeText(context, it, Toast.LENGTH_SHORT).show()        }
变换

所谓变换,是指RxJava能将事件序列的操作对象变换为其他对象的功能。该功能十分强大,是RxJava的核心共呢功能之一。

  • map():
// 将json转化为对象后存库val json: String = studentJson  Observable.just(json)    .map(object : Function {        override fun apply(t: String): Student {            // 将student的json字符串转换为student对象            return Gson().fromJson(t, Student::class.java)        }    })    .observeOn(Schedulers.io())    .subscribe(object : Consumer {        // 操作对象变为student        override fun accept(t: Student?) {            // onNext            // 将student对象存库            StudentDao.saveStudent(t)        }    })

这段代码可用lambda表达式简化为:

val json: String = studentJsonObservable.just(json)    .map { it: String ->        Gson().fromJson(it, Student::class.java)    }    .observeOn(Schedulers.io())    .subscribe { it: Student ->        // onNext        StudentDao.saveStudent(it)    }

map()方法,传入一个Function接口对象。在该接口的apply()方法中,return转换后的对象。此后该事件序列的操作对象,都会变为转换后的对象。这是RxJava中最常用的变换方法。

  • flatMap():
    map()方法类似,flatMap()方法传进一个Function接口对象,并在该接口的apply()方法中,返回一个转换后的Observable对象,即返回一个新的事件序列。代码如下:
val json: String = studentJsonObservable.just(json)    .flatMap(object : Function> {        override fun apply(t: String): Observable {            return Observable.just(Gson().fromJson(t, Student::class.java))        }    })    .subscribe(object : Consumer {        override fun accept(t: Student?) {            StudentDao.saveStudent(t)        }    })// 用lambda表达式可简化为:Observable.just(json)    .flatMap{ it : String ->        Observable.just(Gson().fromJson(it, Student::class.java))    }    .subscribe{ it : Student ->        StudentDao.saveStudent(it)    }

看到这你可能会疑惑,那这样不就和map()一样了吗,只是返回的对象不同,用法其实都一样,看起来还比map()方法更麻烦。没错,因为flatMap()根本不是这么用的,虽然这样写也没有问题。接下来详细说明二者的使用场景之间的区别。
前文说过Observable的操作对象如果是多个,或者是数组类型,则事件序列中的操作就像是对所有操作对象进行了一次循环,而flatMap()方法,则是对该事件序列进行了嵌套循环。也就是说,map()方法的转换只能是一对一(不会增加循环的层数)的,而flatMap()方法的转换是一对多(会增加循环的层数)的。示例代码如下:

// 将所有学生的所有课程打印输出:val students = arrayOf(Student("李明"), Student("黄翔"))Observable.fromArray(*students)    .flatMap{ it : Student ->        // courses的类型为ArrayList        // Java中ArrayList.toArray()方法的返回值为Object[]        Observable.fromArray(*it.courses.toArray())    }    .map { it : Any ->        // Kotlin中的Any就是Java中的Object        // 将Object强转为Course        it as Course    }    .subscribe { it : Course ->        Log.i(TAG, it.name)    }

如果对比循环语法,则应该是这样:

val students = arrayOf(Student("李明"), Student("黄翔"))for (student in students) {    for(course in student.courses) {        Log.i(TAG, course.name)    } }

你可能又会问了,这看起来比循环语法要复杂得多嘛。当然,这段代码是双层循环,并且循环嵌套之间没有其他复杂逻辑。但是在实际应用中就不一定是这样了,循环可能有多层,而嵌套之间也可能包含复杂的逻辑,比如线程切换。在这种情况下,每一层嵌套的缩进,都会降低代码的可读性。而用RxJava看起来代码行数很多,可是逻辑自上而下很清晰,代码可读性很高。

筛选(filter)

filter()类似于if语法(不是if-else语法),传入一个Predicate接口对象,并在接口test()方法的返回值中,将需要过滤掉的返回false,需要留下的返回true。比如如下示例:

// 输出学生中所有男生的姓名Observable.fromArray(*students)    .filter(object : Predicate {        override fun test(t: Student): Boolean {            return t.sex.equals("男")        }    })    .subscribe(object : Consumer {        override fun accept(t: Student?) {            Log.i(TAG, t?.name)        }    })// 用lambda表达式可简化为Observable.fromArray(*students)    .filter { it : Student ->        it.sex.equals("男")    }    .subscribe{ it : Student ->        Log.i(TAG, it.name)    }// 用if语法可写为Observable.fromArray(*students)    .subscribe{ it : Student ->        if(it.sex.equals("男")) {            Log.i(TAG, it.name)        }    }

使用场景

由于RxJava是一个实现异步操作的库,而且实现原理是通过观察者模式。所以,理论上,RxJava可以用于任何异步操作或观察者模式的使用场景,只是需要添加对RxJava的支持,不然可能无法直接使用。

与Retrofit结合使用

  • 什么是Retrofit?

Retrofit官网对自己的描述是:Type-safe HTTP client for Android and Java by Square, Inc.即:由Square公司出品的用于Android和Java的类型安全的HTTP客户端。对于Android而言,就是一个网络请求库。
若对Retrofit不了解的可以跳过这部分内容,想要了解的也可以自行了解。

  • 如何与RxJava结合使用

Retrofit添加对RxJava的支持,可以将原本的相应回调Callback以转换为Observable的形式。具体如下:
使用Callback(以下代码为已对Retrofit二次封装后的代码):

// 在ApiService接口中定义api@FormUrlEncoded@POST("/user")fun getUser(@Field("userid") uid: Int) : Call// 使用apival call = ApiService.getInstance().getUser(userId)call.enqueue(object : Callback {    override fun onFailure(call: Call, t: Throwable) {            // 请求失败回调        }    override fun onResponse(call: Call, response: Response) {            // 请求成功回调        }})

使用Observable

// 创建Retrofit实例时添加对RxJava的支持val retrofit = Retrofit.Builder()                    .baseUrl(baseUrl)                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 适配RxJava2.x版本                    .build()// 在ApiService接口中定义Api时将返回类型改为Observable@FormUrlEncoded@POST("/user")fun getUser(@Field("userid") uid: Int) : Observable// 使用Observable进行api请求ApiService.getInstance().getUser(userId)                    .subscribeOn(Schedulers.io())                    .observeOn(AndroidSchedulers.mainThread())                    .subscribe({ it : UserResponseModel ->                        // onNext,成功回调                    }, { it : Throwable ->                        // onError,失败回调                    })

这样就可以使用Observable处理网络请求了,更可以通过flatMap()方法进行请求链的调用,比如:

// 在ApiService接口中定义Api// 登录@FormUrlEncoded@POST("/user")fun getUser(@Field("userid") uid : Int) : Observable// 获取token@FormUrlEncoded@POST("/token")fun getToken() : Observable// 先调用登录api,在调用获取token apiApiService.getInstance().getToken()                    .subscribeOn(Schedulers.io())                    .flatMap { it : TokenResponseModel ->                        saveToken(it)                        getUser(it.userId)                    }                    .observeOn(AndroidSchedulers.mainThread())                    .subscribe({ it :UserResponseModel ->                        // onNext,成功回调                    }, { it : Throwable ->                        // onError,失败回调                    })

将请求链通过Observable序列实现,代码逻辑是不是变得清晰了很多?
当然,还有很多其他用法,这里就不一一列举了。

与RxBinding结合使用

  • 什么是RxBinding

RxBinding是ButterKnife作者JakeWharton写的一个基于RxJava的View注入框架。
该框架将View的监听事件,例如View.OnClickListenerView.OnTouchListenerRecyclerView.OnScrollListener等监听事件,通过Observable的方式进行回调。

  • 怎么使用

以点击事件为例:

RxView.clicks(view)    .subscribe { it : Unit ->        // do something    }

RxJava中还有一个api叫做throttleFirst(),该api会在每次事件触发后的一段时间内屏蔽新的事件,可用于去抖动过滤。比如用在下方代码中,可以防止连击启动多个Activity

RxView.clicks(view)    .throttleFirst(300, TimeUnit. MILLISECONDS) // 300毫秒抖动过滤    .subscribe { it : Unit ->        NewActivity.start(view.context)    }

其他异步操作、观察者模式使用场景

这里就不再举例啦。
只要是异步操作或者观察者模式的使用场景,理论上都可以使用RxJava,只是需要自己手动适配一下。而上文中的Retrofit和RxBinding,在源码中都有对RxJava进行适配。感兴趣的话可以去Github上将源码下载下来进行研究。

总结

这是我第一篇严格意义上的技术方向的博客。从业快三年了,看了不少前辈们的文章,最近在学习RxJava的过程中,就想要自己将学习的东西记录下来,方便以后温故知新,也能造福后来的旅者。
由于这篇文章除了借鉴了大神的文章外,还加入了许多自己的见地,所以可能有些地方理解的不准确,或者排版混乱,有意见者,还望多多指正、不吝赐教。
希望这篇文章能真的帮助到各位正走在学习路上的人。

更多相关文章

  1. RecyclerView高度随Item自适应 GridLayoutManager和LinearLayout
  2. Android(安卓)ListView异步加载图片时图片顺序混乱解决办法
  3. Android(安卓)listView scroll 恢复滚动位置
  4. Android的Hello World
  5. Android(安卓)json通信(解析)方法
  6. Binder使用示例
  7. Android(安卓)HandlerThread源码解析
  8. Android实现简单断点续传和下载到本地功能
  9. Android(安卓)rom开发:webview崩溃问题Binary XML file line #103

随机推荐

  1. 我应该使用DataInputStream还是BufferedI
  2. 教你如何秒杀12306,JAVA程序抢票成功!----
  3. 如何更改webservice url端点?
  4. Linux下java/bin目录下的命令集合
  5. 派生类具有基类私有成员
  6. java中final关键字详解
  7. java 对称加密——密钥与加密后的数据存
  8. 基于james3.0 的邮件系统(struts2.3.2 +sp
  9. 如何在Java中递归解压缩文件?
  10. 试图改变Jtable java中行的颜色