转载请注明出处:http://www.wangxinarhat.com/2016/05/01/2016-05-01-rxjava-android-operate2/

最近比较忙,也没想好这个文章该怎么写下去。可能会比较水,不过做事不能虎头蛇尾,所以继续吧。

场景五:BehaviorSubject操作符的使用(桥梁)

使用场景:制作缓存

效果图

代码:

  • 缓存管理类

    public class DataCache {                    /**             * 读取磁盘缓存数据             */            public List readData() {                ...            }                    /**             * 写缓存             */            public void writeData(List list) {                ...                   }                    /**             * 删除缓存             */            public boolean deleteCache() {                ...            }        }
  • 数据管理类

    public class Data {    private static Data instance;    private static final int DATA_SOURCE_MEMORY = 1;//内存    private static final int DATA_SOURCE_DISK = 2;//硬盘    private static final int DATA_SOURCE_NETWORK = 3;//网络    BehaviorSubject> cache;    private int dataSource;            private Data() {    }            public static Data newInstance() {        if (instance == null) {            instance = new Data();        }        return instance;    }        private void setDataSource(@DataSource int dataSource) {        this.dataSource = dataSource;    }            public String getDataSourceText() {        int dataSourceTextRes;        switch (dataSource) {            case DATA_SOURCE_MEMORY:                dataSourceTextRes = R.string.data_source_memory;                break;            case DATA_SOURCE_DISK:                dataSourceTextRes = R.string.data_source_disk;                break;            case DATA_SOURCE_NETWORK:                dataSourceTextRes = R.string.data_source_network;                break;            default:                dataSourceTextRes = R.string.data_source_network;        }        return BaseApplication.getApplication().getString(dataSourceTextRes);    }        /**     * 请求网络数据     */    public void loadData() {            Network.getGankApi()                .getBeauties(80, 1)                .map(BeautyResult2Beautise.newInstance())                .doOnNext(new Action1>() {                    @Override                    public void call(List list) {                        DataCache.newInstance().writeData(list);                    }                })                .subscribe(new Action1>() {                    @Override                    public void call(List list) {                        cache.onNext(list);                    }                }, new Action1() {                    @Override                    public void call(Throwable throwable) {                        throwable.printStackTrace();                    }                });        }        /**     * 获取数据     * @param observer     * @return     */    public Subscription subscribeData(@Nullable Observer> observer) {            if (null == cache) {            cache = BehaviorSubject.create();            Observable.create(new Observable.OnSubscribe>() {                @Override                public void call(Subscriber<? super List> subscriber) {                    //从缓存获取数据                    List list = DataCache.newInstance().readData();                        if (null == list) {                        setDataSource(DATA_SOURCE_NETWORK);                        //请求网络数据                        loadData();                    } else {                        setDataSource(DATA_SOURCE_DISK);                        subscriber.onNext(list);                    }                    }            })                    .subscribeOn(Schedulers.io()).subscribe(cache);            } else {            //内存中获取的数据            setDataSource(DATA_SOURCE_MEMORY);        }            return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);        }        /**     * 清空内存     */    public void clearMemoryCache() {        cache = null;    }        /**     * 清空内存和硬盘数据     */    public void clearMemoryAndDiskCache() {        clearMemoryCache();        DataCache.newInstance().deleteCache();    }}
  • 获取数据

    @OnClick(R.id.load)public void onClick() {    startTime = System.currentTimeMillis();    swipeRefreshLayout.setRefreshing(true);    unsubscribe();    subscription = Data.newInstance()            .subscribeData(getObserver());}
  • 在观察者中进行获取数据结果的处理

    private Observer> getObserver() {    if (null == observer) {        observer = new Observer>() {            @Override            public void onCompleted() {            }            @Override            public void onError(Throwable e) {                Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();            }            @Override            public void onNext(List list) {                swipeRefreshLayout.setRefreshing(false);                int loadingTime = (int) (System.currentTimeMillis() - startTime);                dataSituation.setText(getString(R.string.loading_time_and_source, loadingTime, Data.newInstance().getDataSourceText()));                adapter.setImages(list);            }        };    }    return observer;}

详解

Subject可以看成是一个桥梁或者代理,在RxJava中同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

场景六:retryWhen操作符的使用(错误处理)

使用场景:有的 token 并非一次性的,而是可以多次使用,直到它超时或被销毁(多数 token 都是这样的)。

这样的 token 处理起来比较麻烦:需要把它保存起来,并且在发现它失效的时候要能够自动重新获取新的 token >并继续访问之前由于 token 失效而失败的请求。

如果项目中有多处的接口请求都需要这样的自动修复机制,使用传统的 Callback 形式需要写出非常复杂的代码。
而使用 RxJava ,可以用 retryWhen() 来轻松地处理这样的问题。

效果图

Token API准备

由于找不到足够简单的用于示例的 token API,以下API是代码伪造的

    /**     * Created by wangxinarhat on 16-4-5.     * TokenApi     */    public class TokenApi {            /**         * 获取Observable         * @param auth         * @return         */        public static Observable getToken(@NonNull String auth) {            return Observable.just(auth).map(new Func1() {                @Override                public Token call(String s) {                        try {                        Thread.sleep(new Random().nextInt(600) + 600);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                        Token token = new Token();                    token.token = createToken();                        return token;                }            });        }            /**         * 随机生成token         * @return         */        private static String createToken() {                return "token_wangxinarhat_" + System.currentTimeMillis() % 1000;        }                /**         * 根据Token获取用户数据         * @param token         * @return         */        public static Observable getData(@NonNull Token token) {            return Observable.just(token).map(new Func1() {                @Override                public DataInfo call(Token token) {                        try {                        Thread.sleep(new Random().nextInt(600) + 600);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                        if (token.isInvalid) {                        throw new IllegalArgumentException("Token is invalid");                    }                    DataInfo dataInfo = new DataInfo();                    dataInfo.id = (int) (System.currentTimeMillis() % 1000);                    dataInfo.name = "USER_" + dataInfo.id;                        return dataInfo;                }            });        }    }     

Token

    /**     *Token类     */    public class Token {        public String token;        public boolean isInvalid;//token是否失效            public Token(boolean isInvalid) {            this.isInvalid = isInvalid;        }            public Token() {        }    }

用户数据

    /**     * Created by wangxinarhat on 16-4-5.     * 用户数据     */    public class DataInfo {        public int id;        public String name;    }

操作符的使用

  • 根据token请求数据

    @OnClick(R.id.requestBt)void request() {    tokenUpdated = false;    swipeRefreshLayout.setRefreshing(true);    unsubscribe();    final TokenApi tokenApi = new TokenApi();    subscription = Observable.just(null).flatMap(new Func1>() {        @Override        public Observable call(Object o) {            return null == cachedFakeToken.token ?                    Observable.error(new NullPointerException("token id null")) :                    tokenApi.getData(cachedFakeToken);        }    }).retryWhen(new Func1, Observable<?>>() {        @Override        public Observable<?> call(Observable<? extends Throwable> observable) {            return observable.flatMap(new Func1>() {                @Override                public Observable<?> call(Throwable throwable) {                    if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) {                        return tokenApi.getToken("flat_map")                                .doOnNext(new Action1() {                                    @Override                                    public void call(Token token) {                                        tokenUpdated = true;                                        cachedFakeToken.token = token.token;                                        cachedFakeToken.isInvalid = token.isInvalid;                                    }                                });                    }                    return Observable.just(throwable);                }            });        }    }).subscribeOn(Schedulers.io())            .observeOn(AndroidSchedulers.mainThread())            .subscribe(new Action1() {                @Override                public void call(DataInfo dataInfo) {                    swipeRefreshLayout.setRefreshing(false);                    String token = cachedFakeToken.token;                    if (tokenUpdated) {                        token += "(" + getString(R.string.updated) + ")";                    }                    tokenTv.setText(String.format(getString(R.string.got_token_and_data), token, dataInfo.id, dataInfo.name));                }            }, new Action1() {                @Override                public void call(Throwable throwable) {                    swipeRefreshLayout.setRefreshing(false);                    Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();                }            });} 
  • 销毁token

    @OnClick(R.id.invalidateTokenBt)void incalidate() {    cachedFakeToken.isInvalid = true;    Toast.makeText(getActivity(), R.string.token_expired, Toast.LENGTH_SHORT).show();}

详解

如果原始Observable遇到错误,重新订阅它期望它能正常终止。

retryWhen操作符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。

retryWhen总是传递onNext通知给观察者,由于重新订阅,可能会造成数据项重复。

无论收到多少次onError通知,无参数版本的retryWhen都会继续订阅并发射原始Observable。
接受单个count参数的retryWhen会最多重新订阅指定的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个onError通知传递给它的观察者。

还有一个版本的retryWhen接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发射onError通知的Throwable。这个函数返回一个布尔值,如果返回true,retryWhen应该再次订阅和镜像原始的Observable,如果返回false,retryWhen会将最新的一个onError通知传递给它的观察者。
retryWhen操作符默认在trampoline调度器上执行。

场景七:Debounce操作符的使用(过滤)

使用场景

实时搜索,如果在EditText中监听到字符改变就发起请求数据,明显不合适。
有了Debounce操作符,仅在过了指定的一段时间还没发射数据时才发射一个数据,Debounce操作符会过滤掉发射速率过快的数据项,优化网络请求

效果图

代码

  • 配合jakewharton大神的rxbinding使用,获取可观察对象

    @Overridepublic void onActivityCreated(@Nullable Bundle savedInstanceState) {    super.onActivityCreated(savedInstanceState);    setLogger();    //使用rxbing给EditText注册字符改变事件    subscription = RxTextView.textChangeEvents(input)            .debounce(500, TimeUnit.MILLISECONDS)//设置发射时间间隔            .observeOn(AndroidSchedulers.mainThread())            .subscribe(getObserver());} 
  • 在观察者中进行结果处理

     /** * 获取观察者 * @return */private Observer<? super TextViewTextChangeEvent> getObserver() {    return new Observer() {        @Override        public void onCompleted() {        }        @Override        public void onError(Throwable e) {        }        @Override        public void onNext(TextViewTextChangeEvent textViewTextChangeEvent) {            //得到搜索关键字,进行网络请求            log(String.format("搜索关键字 : %s", textViewTextChangeEvent.text().toString()));        }    };}    
  • 更新adapter数据集

     /** * 更新adapter数据集 * * @param logMsg */private void log(String logMsg) {    if (isCurrentlyOnMainThread()) {        mLogs.add(0, logMsg + " (main thread) ");        mAdapter.notifyDataSetChanged();    } else {        mLogs.add(0, logMsg + " (NOT main thread) ");        new Handler(Looper.getMainLooper()).post(new Runnable() {            @Override            public void run() {                mAdapter.notifyDataSetChanged();            }        });    }}

详解

Debounce仅在过了一段指定的时间还没发射数据时才发射一个数据,会根据设置的时间间隔过滤掉发射速率过快的数据项。

场景八:Buffer操作符的使用(变换)

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
这个操作符,我暂时还没有比较好的使用场景,不过既然是可以定期收集数据,那么应该可以做指定时间内点击次数等之类的统计。

效果图

代码

  • 还是使用jakewharton大神的rxbinding,注册点击事件获取可观察对象

    @Overridepublic void onActivityCreated(@Nullable Bundle savedInstanceState) {    super.onActivityCreated(savedInstanceState);    setLogger();    subscription = RxView.clicks(btn)            .map(new Func1() {                @Override                public Integer call(Void aVoid) {                    log("点击一次");                    return 1;                }            })            .buffer(3, TimeUnit.SECONDS)//设置收集数据时间间隔为3s            .observeOn(AndroidSchedulers.mainThread())            .subscribe(getObserver());}    

详解

Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。

还有就是:如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

小结

因为我也才尝试使用rx,这篇终于挤出来了,好难。。代码在这里。

如果又学到新的使用场景,还是会再写。

说明

我是从国内rx大神扔物线,还有github上star数最多的那位哥们儿(kaushikgopal)学习的。因为他们都没有很详细的说明操作符的使用,所以才想写这个文章。

如想深入学习,请看大神代码。

更多相关文章

  1. 一句话锁定MySQL数据占用元凶
  2. Android实现省市区三级联动
  3. android调用手机摄像头拍照
  4. Android中传递对象的三种方法
  5. 获取手机联系人数据方法
  6. ContentProvider简单用法
  7. OpenGL播放yuv数据流(着色器SHADER)-android(一)
  8. MediaMetadataRetriever类、方法以及使用详解
  9. Android学习笔记(十四)

随机推荐

  1. android 源码导入到android studio
  2. 【译】Android 6.0接口变化(三)(Android 6.0
  3. android 系统属性 build.prop
  4. 2013.12.23 (2)——— android 代码调用she
  5. android获取手机号码以及imsi信息
  6. android按钮监听器的四种技术
  7. android网络图片的下载
  8. Android PinyinIME 源码笔记 -- 0. 简介
  9. Android Webview调用系统相册实现多选图
  10. Android图形报表之AchartEngine(附开发包