
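Introduction to Operators

Operator: processes the emitted data and emits the result again.
Change propagation: an operator applies a change to the data, and the changed result is passed on downstream.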
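To make the idea concrete before reading the RxJava sources, here is a minimal plain-Java sketch with no RxJava dependency. The class `OperatorIdeaSketch` and the helper `map` are hypothetical names used only for illustration; the sketch assumes an operator can be modelled as a wrapper around the downstream consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class OperatorIdeaSketch {

    // Hypothetical helper: wraps a downstream consumer so that every value
    // is transformed first and only then passed on.
    static <T, R> Consumer<T> map(Function<? super T, ? extends R> mapper,
                                  Consumer<? super R> downstream) {
        return value -> downstream.accept(mapper.apply(value));
    }

    // Emits "1" and "2" through the mapping stage and collects what arrives downstream.
    static List<Integer> emitThroughMap() {
        List<Integer> received = new ArrayList<>();
        Consumer<String> upstreamFacing =
                OperatorIdeaSketch.<String, Integer>map(s -> Integer.parseInt(s) + 2, received::add);
        upstreamFacing.accept("1");
        upstreamFacing.accept("2");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(emitThroughMap()); // prints [3, 4]
    }
}
```

The emitter only ever sees the wrapper, and the final consumer only ever sees transformed values. The sections that follow trace this same shape through the library code.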

1. RxJava 1 operator source code analysis

1. The Func1 interface
2. The Operator interface

1.1 RxJava 1 example
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext("1");
                subscriber.onNext("2");
                subscriber.onCompleted();
            }
        }
    }).
            // Processing step: parse each String and add 2
            map(new Func1<String, Integer>() {
                @Override
                public Integer call(String s) {
                    return Integer.parseInt(s) + 2;
                }
            }).
            subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {
                    Log.d("kpioneer", "onCompleted:");
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d("kpioneer", "onNext:" + integer + ",integer instanceOf" + integer.getClass());
                }
            });
Run output
    06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:3,integer instanceOfclass java.lang.Integer
    06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:4,integer instanceOfclass java.lang.Integer
    06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onCompleted:
1.2 RxJava 1 operator source code
The OnSubscribeMap class in RxJava 1
    public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

        final Observable<T> source;

        final Func1<? super T, ? extends R> transformer;

        public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }

        @Override
        public void call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            source.unsafeSubscribe(parent);
        }

        static final class MapSubscriber<T, R> extends Subscriber<T> {

            final Subscriber<? super R> actual;

            final Func1<? super T, ? extends R> mapper;

            boolean done;

            public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
                this.actual = actual;
                this.mapper = mapper;
            }

            @Override
            public void onNext(T t) {
                R result;
                try {
                    result = mapper.call(t);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    unsubscribe();
                    onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                    return;
                }
                actual.onNext(result);
            }

            @Override
            public void onError(Throwable e) {
                if (done) {
                    RxJavaHooks.onError(e);
                    return;
                }
                done = true;
                actual.onError(e);
            }

            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                actual.onCompleted();
            }

            @Override
            public void setProducer(Producer p) {
                actual.setProducer(p);
            }
        }
    }
The OnSubscribeLift class in RxJava 1
    public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

        final OnSubscribe<T> parent;

        final Operator<? extends R, ? super T> operator;

        public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
            this.parent = parent;
            this.operator = operator;
        }

        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    parent.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    Exceptions.throwIfFatal(e);
                    st.onError(e);
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    }
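1.3 How the transformation works (the core operator, lift):

1. Take the original OnSubscribe and the current Operator.
2. Create a new OnSubscribe and return a new Observable built on it.

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }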

3. Wrap the old Subscriber in a new Subscriber.

    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    source.unsafeSubscribe(parent);

4. Perform the transformation inside the new Subscriber, then pass the result to the old Subscriber.

    @Override
    public void onNext(T t) {
        R result;
        try {
            result = mapper.call(t);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            unsubscribe();
            onError(OnErrorThrowable.addValueAsLastCause(ex, t));
            return;
        }
        actual.onNext(result);
    }

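Analysis

The core implementation relies on the proxy pattern: each operator wraps the downstream Subscriber in a new Subscriber that transforms the data and then delegates to the original one.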
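The four steps can be sketched without any RxJava dependency. The following is a simplified model under stated assumptions, not the library's code: `Sink`, `OnSubscribe`, `Operator`, and `Source` are hypothetical stand-ins for `Subscriber`, `Observable.OnSubscribe`, `Observable.Operator`, and `Observable`, and unsubscription, error handling, and hooks are left out.

```java
import java.util.ArrayList;
import java.util.List;

final class LiftSketch {

    /** Downstream callback, standing in for RxJava 1's Subscriber. */
    interface Sink<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Emission logic, standing in for OnSubscribe. */
    interface OnSubscribe<T> {
        void call(Sink<? super T> sink);
    }

    /** Turns a downstream sink of R into an upstream-facing sink of T. */
    interface Operator<R, T> {
        Sink<T> call(Sink<? super R> downstream);
    }

    static final class Source<T> {
        final OnSubscribe<T> onSubscribe;

        Source(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(Sink<? super T> sink) {
            onSubscribe.call(sink);
        }

        // Steps 1 and 2: take the current OnSubscribe plus an Operator and
        // return a new Source built around a new OnSubscribe.
        <R> Source<R> lift(Operator<R, T> operator) {
            return new Source<R>(downstream -> {
                // Step 3: wrap the old (downstream) sink in a new one.
                Sink<T> wrapped = operator.call(downstream);
                // Hand the wrapper to the original emission logic.
                onSubscribe.call(wrapped);
            });
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Source<String> source = new Source<String>(sink -> {
            sink.onNext("1");
            sink.onNext("2");
            sink.onCompleted();
        });
        Operator<Integer, String> parsePlusTwo = downstream -> new Sink<String>() {
            @Override
            public void onNext(String value) {
                // Step 4: transform, then pass the result to the old sink.
                downstream.onNext(Integer.parseInt(value) + 2);
            }

            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }
        };
        source.lift(parsePlusTwo).subscribe(new Sink<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("onNext:" + value);
            }

            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onNext:3, onNext:4, onCompleted]
    }
}
```

Nothing runs until `subscribe` is called on the lifted source. At that point the wrapper is created and handed to the original emission logic, which is why the proxy can sit between the two sides without either of them knowing about it.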

2. RxJava 2 operator source code analysis

2.1 RxJava 2 example
    Observable.
            create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    if (!e.isDisposed()) {
                        e.onNext("1");
                        e.onNext("2");
                        e.onComplete();
                    }
                }
            }).
            map(new Function<String, Integer>() {
                @Override
                public Integer apply(String s) throws Exception {
                    return Integer.parseInt(s) + 2;
                }
            }).
            subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d("kpioneer", "onSubscribe:");
                }

                @Override
                public void onNext(Integer value) {
                    Log.d("kpioneer", "onNext:" + value);
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                    Log.d("kpioneer", "onComplete");
                }
            });

    Flowable.
            create(new FlowableOnSubscribe<String>() {
                @Override
                public void subscribe(FlowableEmitter<String> e) throws Exception {
                    if (!e.isCancelled()) {
                        e.onNext("1");
                        e.onNext("2");
                        e.onComplete();
                    }
                }
            }, BackpressureStrategy.DROP).
            map(new Function<String, Integer>() {
                @Override
                public Integer apply(String s) throws Exception {
                    return Integer.parseInt(s) + 2;
                }
            }).
            subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                    Log.d("kpioneer", "onSubscribe");
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d("kpioneer", "onNext:" + integer);
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                    Log.d("kpioneer", "onComplete");
                }
            });
Run output
    06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe:
    06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
    06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
    06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
    06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe
    06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
    06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
    06-11 10:20:56.768 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
2.2 RxJava 2 operator source code

The Function interface

ObservableMap: no backpressure
    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {

        final Function<? super T, ? extends U> function;

        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }

        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }

        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {

            final Function<? super T, ? extends U> mapper;

            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
                U v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                actual.onNext(v);
            }

            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }

            @Nullable
            @Override
            public U poll() throws Exception {
                T t = qs.poll();
                return t != null ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
            }
        }
    }

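1. ObservableMap extends the abstract class AbstractObservableWithUpstream.
2. It overrides subscribeActual, which AbstractObservableWithUpstream inherits from Observable.
3. The original Observable (source) subscribes the wrapping Observer that performs the transformation.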
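The three points above can be sketched in plain Java. This is a simplified model, not the RxJava 2 source: `Stream`, `WithUpstream`, and `MapStream` are hypothetical stand-ins for `Observable`, `AbstractObservableWithUpstream`, and `ObservableMap`, and disposal, fusion, and error handling are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class UpstreamSketch {

    interface Observer<T> {
        void onNext(T value);
        void onComplete();
    }

    /** Stand-in for Observable: subscribe() delegates to subscribeActual(). */
    static abstract class Stream<T> {
        final void subscribe(Observer<? super T> observer) {
            subscribeActual(observer);
        }

        protected abstract void subscribeActual(Observer<? super T> observer);

        <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
            return new MapStream<T, R>(this, mapper);
        }
    }

    /** Stand-in for AbstractObservableWithUpstream: it only remembers the upstream. */
    static abstract class WithUpstream<T, R> extends Stream<R> {
        protected final Stream<T> source;

        WithUpstream(Stream<T> source) {
            this.source = source;
        }
    }

    static final class MapStream<T, R> extends WithUpstream<T, R> {
        final Function<? super T, ? extends R> mapper;

        MapStream(Stream<T> source, Function<? super T, ? extends R> mapper) {
            super(source);
            this.mapper = mapper;
        }

        @Override
        protected void subscribeActual(Observer<? super R> downstream) {
            // The upstream subscribes a wrapping observer that maps each value.
            source.subscribe(new Observer<T>() {
                @Override
                public void onNext(T value) {
                    downstream.onNext(mapper.apply(value));
                }

                @Override
                public void onComplete() {
                    downstream.onComplete();
                }
            });
        }
    }

    static Stream<String> just(String... values) {
        return new Stream<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                for (String v : values) {
                    observer.onNext(v);
                }
                observer.onComplete();
            }
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        just("1", "2").map(s -> Integer.parseInt(s) + 2).subscribe(new Observer<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("onNext:" + value);
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onNext:3, onNext:4, onComplete]
    }
}
```

Compared with the RxJava 1 shape, the operator here is itself a subclass of the stream type, so no separate OnSubscribe object is needed. The wrapping step moves into `subscribeActual`.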

ObservableLift

    public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {

        /** The actual operator. */
        final ObservableOperator<? extends R, ? super T> operator;

        public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
            super(source);
            this.operator = operator;
        }

        @Override
        public void subscribeActual(Observer<? super R> s) {
            Observer<? super T> observer;
            try {
                observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Disposable already
                RxJavaPlugins.onError(e);

                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }

            source.subscribe(observer);
        }
    }
FlowableMap: with backpressure
    public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {

        final Function<? super T, ? extends U> mapper;

        public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
            super(source);
            this.mapper = mapper;
        }

        @Override
        protected void subscribeActual(Subscriber<? super U> s) {
            if (s instanceof ConditionalSubscriber) {
                source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>) s, mapper));
            } else {
                source.subscribe(new MapSubscriber<T, U>(s, mapper));
            }
        }

        static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {

            final Function<? super T, ? extends U> mapper;

            MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
                U v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                actual.onNext(v);
            }

            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }

            @Nullable
            @Override
            public U poll() throws Exception {
                T t = qs.poll();
                return t != null ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
            }
        }

        static final class MapConditionalSubscriber<T, U> extends BasicFuseableConditionalSubscriber<T, U> {

            final Function<? super T, ? extends U> mapper;

            MapConditionalSubscriber(ConditionalSubscriber<? super U> actual, Function<? super T, ? extends U> function) {
                super(actual);
                this.mapper = function;
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
                U v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                actual.onNext(v);
            }

            @Override
            public boolean tryOnNext(T t) {
                if (done) {
                    return false;
                }
                U v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return true;
                }
                return actual.tryOnNext(v);
            }

            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }

            @Nullable
            @Override
            public U poll() throws Exception {
                T t = qs.poll();
                return t != null ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
            }
        }
    }

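1. FlowableMap extends AbstractFlowableWithUpstream.
2. It overrides subscribeActual, which AbstractFlowableWithUpstream inherits from Flowable.
3. The original Flowable (source) subscribes the wrapping Subscriber that performs the transformation.

FlowableLift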

    public final class FlowableLift<R, T> extends AbstractFlowableWithUpstream<T, R> {

        /** The actual operator. */
        final FlowableOperator<? extends R, ? super T> operator;

        public FlowableLift(Flowable<T> source, FlowableOperator<? extends R, ? super T> operator) {
            super(source);
            this.operator = operator;
        }

        @Override
        public void subscribeActual(Subscriber<? super R> s) {
            try {
                Subscriber<? super T> st = operator.apply(s);

                if (st == null) {
                    throw new NullPointerException("Operator " + operator + " returned a null Subscriber");
                }

                source.subscribe(st);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Subscription has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);

                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    }
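2.3 The Operator interface
1. Implement this interface.
2. The transformation is applied inside subscribeActual.
3. It is the extension point for custom operators.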
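As an illustration of point 3, here is a hedged plain-Java sketch of a custom operator plugged in through a lift-style function. The interfaces `Observer`, `Operator`, and `Source`, the static `lift` helper, and the `evenOnly` operator are all hypothetical names invented for this sketch; they only mirror the shape of `ObservableOperator` and are not the library's API.

```java
import java.util.ArrayList;
import java.util.List;

final class CustomOperatorSketch {

    interface Observer<T> {
        void onNext(T value);
        void onComplete();
    }

    /** Given the downstream observer, returns the observer to hand to the upstream. */
    interface Operator<R, T> {
        Observer<T> apply(Observer<? super R> downstream);
    }

    interface Source<T> {
        void subscribe(Observer<? super T> observer);
    }

    // Simplified lift: on subscription, ask the operator for a wrapper and
    // subscribe that wrapper to the upstream.
    static <T, R> Source<R> lift(Source<T> upstream, Operator<R, T> operator) {
        return downstream -> upstream.subscribe(operator.apply(downstream));
    }

    // A custom operator that forwards even numbers and drops the rest.
    static Operator<Integer, Integer> evenOnly() {
        return downstream -> new Observer<Integer>() {
            @Override
            public void onNext(Integer value) {
                if (value % 2 == 0) {
                    downstream.onNext(value);
                }
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Source<Integer> numbers = observer -> {
            for (int i = 1; i <= 4; i++) {
                observer.onNext(i);
            }
            observer.onComplete();
        };
        lift(numbers, evenOnly()).subscribe(new Observer<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("onNext:" + value);
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onNext:2, onNext:4, onComplete]
    }
}
```

Because the operator only has to return a wrapping observer, new behaviour such as filtering, counting, or logging can be added without touching the source or the lift function.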
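Analysis

In RxJava 2, both the backpressure-aware (Flowable) and the non-backpressure (Observable) implementations rely on the same proxy mechanism at their core.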
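For the backpressure case, the wrapper also has to let demand travel upstream. The sketch below is a simplified plain-Java model under stated assumptions: `Subscription`, `Subscriber`, and `Publisher` are cut-down hypothetical interfaces (no onError, onComplete, or cancel), and `fromList` is an invented source that emits only what has been requested. Because map produces one output per input, the sketch simply forwards the upstream subscription to the downstream unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class BackpressureMapSketch {

    interface Subscription {
        void request(long n);
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T value);
    }

    interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
    }

    // Hypothetical source that emits at most as many items as were requested.
    static Publisher<String> fromList(List<String> items) {
        return subscriber -> subscriber.onSubscribe(new Subscription() {
            int index = 0;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && index < items.size(); i++) {
                    subscriber.onNext(items.get(index++));
                }
            }
        });
    }

    static <T, R> Publisher<R> map(Publisher<T> upstream, Function<? super T, ? extends R> mapper) {
        return downstream -> upstream.subscribe(new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                // map is one-to-one, so demand is forwarded untouched.
                downstream.onSubscribe(s);
            }

            @Override
            public void onNext(T value) {
                downstream.onNext(mapper.apply(value));
            }
        });
    }

    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        Subscription[] held = new Subscription[1];
        BackpressureMapSketch.<String, Integer>map(
                fromList(Arrays.asList("1", "2", "3")), s -> Integer.parseInt(s) + 2)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        held[0] = s;
                        s.request(1); // ask for the first item only
                    }

                    @Override
                    public void onNext(Integer value) {
                        received.add(value);
                    }
                });
        held[0].request(1); // ask for one more; the third item is never requested
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [3, 4]
    }
}
```

The third element is never mapped because nobody asked for it. The proxy therefore works in both directions: values flow down through `onNext`, and demand flows up through `request`.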

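3. Reimplementing RxJava 1 operator functionality

Operator interface implementation
  1. The Operator interface is the abstract interface for operators.
  2. Each operator implements the Operator interface to perform its specific transformation.
The lift operator
  1. The basic principle behind every transformation.
  2. Every operator implements the Operator interface and calls the lift operator.
The map operator
  1. The most basic operator.
  2. As the name suggests, it performs a mapping.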
    public class Caller<T> {

        final OnCall<T> onCall;

        public Caller(OnCall<T> onCall) {
            this.onCall = onCall;
        }

        public static <T> Caller<T> create(OnCall<T> onCall) {
            return new Caller<>(onCall);
        }

        public Calling call(Receiver<T> receiver) {
            this.onCall.call(receiver);
            return receiver;
        }

        // Builds a new Caller whose OnCall wraps the current one with the operator
        public final <R> Caller<R> lift(final Operator<R, T> operator) {
            return create(new OnCallLift<>(onCall, operator));
        }

        public final <R> Caller<R> map(Func1<T, R> func) {
            return lift(new MapOperator<>(func));
        }

        public interface OnCall<T> extends Action1<Receiver<T>> {
        }

        public interface Operator<R, T> extends Func1<Receiver<R>, Receiver<T>> {
        }
    }

    public interface Func1<T, R> {
        R call(T t);
    }

    public class MapOperator<T, R> implements Caller.Operator<R, T> {

        private final Func1<T, R> mapper;

        public MapOperator(Func1<T, R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public Receiver<T> call(Receiver<R> rReceiver) {
            return new MapReceiver<>(rReceiver, this.mapper);
        }
    }

    public class MapReceiver<T, R> extends Receiver<T> {

        private final Receiver<R> actual;
        private final Func1<T, R> mapper;

        public MapReceiver(Receiver<R> actual, Func1<T, R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onCompleted() {
            this.actual.onCompleted();
        }

        @Override
        public void onError(Throwable t) {
            this.actual.onError(t);
        }

        @Override
        public void onReceive(T t) {
            // Transform first, then hand the result to the wrapped Receiver
            R tR = this.mapper.call(t);
            this.actual.onReceive(tR);
        }
    }

    public class OnCallLift<T, R> implements Caller.OnCall<R> {

        private final Caller.OnCall<T> parent;
        private final Caller.Operator<R, T> operator;

        public OnCallLift(Caller.OnCall<T> parent, Caller.Operator<R, T> operator) {
            this.parent = parent;
            this.operator = operator;
        }

        @Override
        public void call(Receiver<R> rReceiver) {
            Receiver<T> tReceiver = this.operator.call(rReceiver);
            this.parent.call(tReceiver);
        }
    }
Usage
    public class Lesson2_2Activity extends AppCompatActivity {

        @Override
        protected void onCreate(final Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_custom_test);
            ButterKnife.bind(this);
        }

        @OnClick(R.id.testDo)
        public void onViewClicked() {
            Caller.
                    create(new Caller.OnCall<String>() {
                        @Override
                        public void call(Receiver<String> stringReceiver) {
                            if (!stringReceiver.isUnCalled()) {
                                stringReceiver.onReceive("1");
                                stringReceiver.onReceive("2");
                                stringReceiver.onCompleted();
                            }
                        }
                    }).
                    map(new Func1<String, Integer>() {
                        @Override
                        public Integer call(String s) {
                            return Integer.parseInt(s) + 2;
                        }
                    }).
                    call(new Receiver<Integer>() {
                        @Override
                        public void onCompleted() {
                            Log.d("kpioneer", "onCompleted");
                        }

                        @Override
                        public void onError(Throwable t) {
                        }

                        @Override
                        public void onReceive(Integer integer) {
                            Log.d("kpioneer", "onReceive:" + integer);
                        }
                    });
        }
    }
Log output
    06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:3
    06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:4
    06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onCompleted
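One difference from the real OnSubscribeMap is worth noting: the imitation MapReceiver calls the mapper without a try/catch, so an exception thrown by the mapper would propagate back into the emitter. A possible extension, sketched here in plain Java with a hypothetical `Receiver` interface and a hypothetical `SafeMapReceiver` class (neither is part of the article's library), routes mapper failures to onError and ignores later events, similar in spirit to MapSubscriber. Unlike RxJava, this sketch does not rethrow fatal errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class SafeMapSketch {

    interface Receiver<T> {
        void onReceive(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    static final class SafeMapReceiver<T, R> implements Receiver<T> {
        private final Receiver<? super R> actual;
        private final Function<? super T, ? extends R> mapper;
        private boolean done; // set once a terminal event has been delivered

        SafeMapReceiver(Receiver<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onReceive(T value) {
            if (done) {
                return;
            }
            R result;
            try {
                result = mapper.apply(value);
            } catch (Throwable ex) {
                // Simplification: a production version would rethrow fatal errors first.
                onError(ex);
                return;
            }
            actual.onReceive(result);
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                return;
            }
            done = true;
            actual.onError(t);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            actual.onCompleted();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Receiver<Integer> downstream = new Receiver<Integer>() {
            @Override
            public void onReceive(Integer value) {
                log.add("onReceive:" + value);
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError:" + t.getClass().getSimpleName());
            }

            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }
        };
        Receiver<String> mapped =
                new SafeMapReceiver<String, Integer>(downstream, s -> Integer.parseInt(s) + 2);
        mapped.onReceive("1");
        mapped.onReceive("x"); // not a number: the mapper throws
        mapped.onReceive("2"); // ignored, the stream has already failed
        mapped.onCompleted();  // ignored as well
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onReceive:3, onError:NumberFormatException]
    }
}
```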

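3. Reimplementing RxJava 2 (no backpressure) operator functionality

CallerWithUpstream (similar to AbstractObservableWithUpstream)
  1. An abstract class.
  2. Inherits the abstract callActual method from Caller.
  3. An operator is implemented by overriding this method.
The map operator
  1. The most basic operator.
  2. As the name suggests, it performs a mapping.
The CallerOperator interface
  1. The transformation is applied inside callActual.
  2. It can be used to add further operators.

Related code: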

    public abstract class Caller<T> {

        public static <T> Caller<T> create(CallerOnCall<T> callerOnCall) {
            return new CallerCreate<>(callerOnCall);
        }

        public void call(Callee<T> callee) {
            callActual(callee);
        }

        protected abstract void callActual(Callee<T> callee);

        public <R> Caller<R> lift(CallerOperator<R, T> operator) {
            return new CallerLift<>(this, operator);
        }

        public <R> Caller<R> map(Function<T, R> function) {
            return new CallerMap<>(this, function);
        }
    }

    // R is the downstream element type, T the upstream element type
    public interface CallerOperator<R, T> {
        Callee<T> call(Callee<R> callee);
    }

    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: returns the source Caller
     */
    public interface CallerSource<T> {
        Caller<T> source();
    }

    public abstract class CallerWithUpstream<T, R> extends Caller<R> implements CallerSource<T> {

        protected final Caller<T> source;

        public CallerWithUpstream(Caller<T> source) {
            this.source = source;
        }

        @Override
        public Caller<T> source() {
            return source;
        }
    }

    public class CallerLift<R, T> extends CallerWithUpstream<T, R> {

        private final CallerOperator<R, T> mOperator;

        public CallerLift(Caller<T> source, CallerOperator<R, T> mOperator) {
            super(source);
            this.mOperator = mOperator;
        }

        @Override
        protected void callActual(Callee<R> callee) {
            Callee<T> tCallee = mOperator.call(callee);
            source.call(tCallee);
        }
    }

    public interface Function<T, R> {
        R call(T t);
    }

    public class CallerMap<T, R> extends CallerWithUpstream<T, R> {

        private Function<T, R> function;

        public CallerMap(Caller<T> source, Function<T, R> function) {
            super(source);
            this.function = function;
        }

        @Override
        protected void callActual(Callee<R> callee) {
            source.call(new MapCallee<>(callee, function));
        }

        static class MapCallee<T, R> implements Callee<T> {

            private final Callee<R> mCallee;
            private final Function<T, R> mFunction;

            public MapCallee(Callee<R> mCallee, Function<T, R> mFunction) {
                this.mCallee = mCallee;
                this.mFunction = mFunction;
            }

            @Override
            public void onCall(Release release) {
                mCallee.onCall(release);
            }

            @Override
            public void onReceive(T t) {
                // Transform first, then hand the result to the wrapped Callee
                R tR = mFunction.call(t);
                mCallee.onReceive(tR);
            }

            @Override
            public void onCompleted() {
                mCallee.onCompleted();
            }

            @Override
            public void onError(Throwable t) {
                mCallee.onError(t);
            }
        }
    }
    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: imitation of the RxJava 2 (no backpressure) operator methods
     */
    public class Lesson2_3Activity extends AppCompatActivity {

        @Override
        protected void onCreate(final Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_custom_test);
            ButterKnife.bind(this);
        }

        @OnClick(R.id.testDo)
        public void onViewClicked() {
            // Variant 1: transform with map
            Caller.
                    create(new CallerOnCall<String>() {
                        @Override
                        public void call(CallerEmitter<String> callerEmitter) {
                            callerEmitter.onReceive("1");
                            callerEmitter.onReceive("2");
                            callerEmitter.onCompleted();
                        }
                    }).
                    map(new Function<String, Integer>() {
                        @Override
                        public Integer call(String s) {
                            return Integer.parseInt(s);
                        }
                    }).
                    call(new Callee<Integer>() {
                        @Override
                        public void onCall(Release release) {
                            Log.d("kpioneer", "onCall");
                        }

                        @Override
                        public void onReceive(Integer integer) {
                            Log.d("kpioneer", "onReceive:" + integer);
                        }

                        @Override
                        public void onCompleted() {
                            Log.d("kpioneer", "onCompleted");
                        }

                        @Override
                        public void onError(Throwable t) {
                        }
                    });

            // Variant 2: the same transformation written directly with lift
            Caller.
                    create(new CallerOnCall<String>() {
                        @Override
                        public void call(CallerEmitter<String> callerEmitter) {
                            callerEmitter.onReceive("3");
                            callerEmitter.onReceive("4");
                            callerEmitter.onCompleted();
                        }
                    }).
                    lift(new CallerOperator<Integer, String>() {
                        @Override
                        public Callee<String> call(final Callee<Integer> callee) {
                            return new Callee<String>() {
                                @Override
                                public void onCall(Release release) {
                                    callee.onCall(release);
                                }

                                @Override
                                public void onReceive(String s) {
                                    callee.onReceive(Integer.parseInt(s));
                                }

                                @Override
                                public void onCompleted() {
                                    callee.onCompleted();
                                }

                                @Override
                                public void onError(Throwable t) {
                                    callee.onError(t);
                                }
                            };
                        }
                    }).
                    call(new Callee<Integer>() {
                        @Override
                        public void onCall(Release release) {
                            Log.d("kpioneer", "onCall");
                        }

                        @Override
                        public void onReceive(Integer integer) {
                            Log.d("kpioneer", "onReceive:" + integer);
                        }

                        @Override
                        public void onCompleted() {
                            Log.d("kpioneer", "onCompleted");
                        }

                        @Override
                        public void onError(Throwable t) {
                            Log.d("kpioneer", "onError");
                        }
                    });
        }
    }
Log output
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:1
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:2
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:3
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:4
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted
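When several operators are chained, each one adds another wrapper layer. The following plain-Java sketch records the order of events for two chained map stages. `Caller` and `Callee` here are cut-down hypothetical interfaces defined inside the sketch (they are not the classes listed above), and the `name` and `log` parameters exist only to make the order visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ChainOrderSketch {

    interface Callee<T> {
        void onReceive(T value);
    }

    interface Caller<T> {
        void call(Callee<? super T> callee);
    }

    // A map stage that logs when it is subscribed through and when it maps a value.
    static <T, R> Caller<R> map(String name, Caller<T> source,
                                Function<? super T, ? extends R> mapper, List<String> log) {
        return callee -> {
            log.add("subscribe via " + name);
            source.call(value -> {
                log.add(name + " maps " + value);
                callee.onReceive(mapper.apply(value));
            });
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Caller<String> origin = callee -> {
            log.add("origin emits");
            callee.onReceive("1");
        };
        Caller<Integer> parsed =
                ChainOrderSketch.<String, Integer>map("parse", origin, s -> Integer.parseInt(s), log);
        Caller<Integer> doubled =
                ChainOrderSketch.<Integer, Integer>map("double", parsed, i -> i * 2, log);
        doubled.call(value -> log.add("received " + value));
        return log;
    }

    public static void main(String[] args) {
        // prints [subscribe via double, subscribe via parse, origin emits,
        //         parse maps 1, double maps 1, received 2]
        System.out.println(demo());
    }
}
```

The subscription travels from the last operator back to the origin, while the data travels from the origin forward through each wrapper. This explains why the `onCall` line appears before any `onReceive` line in the log output above.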

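4. Reimplementing RxJava 2 (with backpressure) operator functionality

The TelephonerOperator interface
  1. The transformation is applied inside callActual.
  2. It can be used to add further operators.
TelephonerWithUpstream (similar to AbstractFlowableWithUpstream)
  1. An abstract class.
  2. Inherits the abstract callActual method from Telephoner.
  3. An operator is implemented by overriding this method.
Related source code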
    public abstract class Telephoner<T> {

        public static <T> Telephoner<T> create(TelephonerOnCall<T> telephonerOnCall) {
            return new TelephonerCreate<>(telephonerOnCall);
        }

        public void call(Receiver<T> receiver) {
            callActual(receiver);
        }

        protected abstract void callActual(Receiver<T> receiver);

        public <R> Telephoner<R> map(Function<T, R> function) {
            return new TelephonerMap<>(this, function);
        }

        public <R> Telephoner<R> lift(TelephonerOperator<R, T> telephonerOperator) {
            return new TelephonerLift<>(this, telephonerOperator);
        }
    }

    /**
     * Created by Xionghu on 2018/6/12.
     * Desc: the lift operator
     */
    public class TelephonerLift<R, T> extends TelephonerWithUpstream<T, R> {

        private final TelephonerOperator<R, T> operator;

        public TelephonerLift(Telephoner<T> source, TelephonerOperator<R, T> operator) {
            super(source);
            this.operator = operator;
        }

        @Override
        protected void callActual(Receiver<R> receiver) {
            Receiver<T> tReceiver = operator.call(receiver);
            source.call(tReceiver);
        }
    }

    import com.haocai.mylibrary.rxJava2.Function;

    /**
     * Created by Xionghu on 2018/6/12.
     * Desc: the map operator
     */
    public class TelephonerMap<T, R> extends TelephonerWithUpstream<T, R> {

        private Function<T, R> trFunction;

        public TelephonerMap(Telephoner<T> source, Function<T, R> trFunction) {
            super(source);
            this.trFunction = trFunction;
        }

        @Override
        protected void callActual(Receiver<R> receiver) {
            source.call(new MapReceiver<>(receiver, trFunction));
        }

        static class MapReceiver<T, R> implements Receiver<T> {

            private final Receiver<R> rReceiver;
            private final Function<T, R> trFunction;

            public MapReceiver(Receiver<R> rReceiver, Function<T, R> trFunction) {
                this.rReceiver = rReceiver;
                this.trFunction = trFunction;
            }

            @Override
            public void onCall(Drop d) {
                // Pass the Drop through so the downstream can request items
                rReceiver.onCall(d);
            }

            @Override
            public void onReceive(T t) {
                R tr = trFunction.call(t);
                rReceiver.onReceive(tr);
            }

            @Override
            public void onError(Throwable t) {
                rReceiver.onError(t);
            }

            @Override
            public void onCompleted() {
                rReceiver.onCompleted();
            }
        }
    }

    /**
     * Created by Xionghu on 2018/6/12.
     * Desc: the operator interface (R is the downstream type, T the upstream type)
     */
    public interface TelephonerOperator<R, T> {
        Receiver<T> call(Receiver<R> callee);
    }

    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: returns the source Telephoner
     */
    public interface TelephonerSource<T> {
        Telephoner<T> source();
    }

    public abstract class TelephonerWithUpstream<T, R> extends Telephoner<R> implements TelephonerSource<T> {

        protected final Telephoner<T> source;

        public TelephonerWithUpstream(Telephoner<T> source) {
            this.source = source;
        }

        @Override
        public Telephoner<T> source() {
            return source;
        }
    }
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import com.haocai.mylibrary.rxJava2.Function;
import com.haocai.mylibrary.rxJava2.backpressure.Drop;
import com.haocai.mylibrary.rxJava2.backpressure.Receiver;
import com.haocai.mylibrary.rxJava2.backpressure.Telephoner;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerEmitter;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOnCall;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOperator;
import com.haocai.rxjavademo.R;

import butterknife.ButterKnife;
import butterknife.OnClick;

/**
 * Created by Xionghu on 2018/6/11.
 * Desc: Imitation of RxJava2 operator methods (no backpressure)
 */
public class Lesson2_4Activity extends AppCompatActivity {

    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }

    @OnClick(R.id.testDo)
    public void onViewClicked() {
        // Case 1: convert String to Integer with map
        Telephoner.
                create(new TelephonerOnCall<String>() {
                    @Override
                    public void call(TelephonerEmitter<String> telephonerEmitter) {
                        telephonerEmitter.onReceive("1");
                        telephonerEmitter.onReceive("2");
                        telephonerEmitter.onCompleted();
                    }
                }).
                map(new Function<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s);
                    }
                }).
                call(new Receiver<Integer>() {
                    @Override
                    public void onCall(Drop d) {
                        d.request(Long.MAX_VALUE);
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("kpioneer", "onError");
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }
                });

        // Case 2: the same conversion written by hand with lift.
        // The type-argument order <downstream, upstream> is an assumption that
        // follows RxJava's Operator convention; the original generics were lost.
        Telephoner.
                create(new TelephonerOnCall<String>() {
                    @Override
                    public void call(TelephonerEmitter<String> telephonerEmitter) {
                        telephonerEmitter.onReceive("3");
                        telephonerEmitter.onReceive("4");
                        telephonerEmitter.onCompleted();
                    }
                }).
                lift(new TelephonerOperator<Integer, String>() {
                    @Override
                    public Receiver<? super String> call(final Receiver<? super Integer> receiver) {
                        return new Receiver<String>() {
                            @Override
                            public void onCall(Drop d) {
                                receiver.onCall(d);
                            }

                            @Override
                            public void onReceive(String s) {
                                receiver.onReceive(Integer.parseInt(s));
                            }

                            @Override
                            public void onError(Throwable t) {
                                receiver.onError(t);
                            }

                            @Override
                            public void onCompleted() {
                                receiver.onCompleted();
                            }
                        };
                    }
                }).
                call(new Receiver<Integer>() {
                    @Override
                    public void onCall(Drop d) {
                        d.request(Long.MAX_VALUE);
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("kpioneer", "onError");
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }
                });
    }
}

06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:1
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:2
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
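
The map and lift calls in the test above produce the same kind of output, and one way to see why is to write map on top of lift. The following is a minimal, self-contained sketch and not the library's actual implementation. MiniReceiver, MiniOperator and MiniTelephoner are hypothetical, simplified stand-ins for Receiver, TelephonerOperator and Telephoner. Drop and request handling are left out, no error is caught around the mapper, and java.util.function.Function replaces the library's own Function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class MiniLiftDemo {

    /** Simplified stand-in for Receiver: no onCall(Drop), so no request handling. */
    interface MiniReceiver<T> {
        void onReceive(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Turns a downstream receiver of R into an upstream receiver of T. */
    interface MiniOperator<R, T> {
        MiniReceiver<T> call(MiniReceiver<R> downstream);
    }

    /** Simplified stand-in for Telephoner: a source that pushes T values. */
    abstract static class MiniTelephoner<T> {
        abstract void call(MiniReceiver<T> receiver);

        /** lift: hand the upstream the receiver that the operator builds. */
        <R> MiniTelephoner<R> lift(final MiniOperator<R, T> operator) {
            final MiniTelephoner<T> upstream = this;
            return new MiniTelephoner<R>() {
                @Override
                void call(MiniReceiver<R> downstream) {
                    upstream.call(operator.call(downstream));
                }
            };
        }

        /** map written on top of lift: convert each value, pass terminal events through. */
        <R> MiniTelephoner<R> map(final Function<T, R> mapper) {
            return lift(new MiniOperator<R, T>() {
                @Override
                public MiniReceiver<T> call(final MiniReceiver<R> downstream) {
                    return new MiniReceiver<T>() {
                        @Override
                        public void onReceive(T value) {
                            downstream.onReceive(mapper.apply(value));
                        }

                        @Override
                        public void onError(Throwable t) {
                            downstream.onError(t);
                        }

                        @Override
                        public void onCompleted() {
                            downstream.onCompleted();
                        }
                    };
                }
            });
        }
    }

    /** Emits "1" and "2", maps them to Integer, and records every callback. */
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniTelephoner<String> source = new MiniTelephoner<String>() {
            @Override
            void call(MiniReceiver<String> receiver) {
                receiver.onReceive("1");
                receiver.onReceive("2");
                receiver.onCompleted();
            }
        };
        source.map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return Integer.parseInt(s);
            }
        }).call(new MiniReceiver<Integer>() {
            @Override
            public void onReceive(Integer value) {
                log.add("onReceive:" + value);
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In this sketch map only supplies the conversion function, while lift does the wiring between upstream and downstream. That is the same division of labour as in the hand-written lift case in the test above.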

Source code download

https://github.com/kpioneer123/RxJavaLearning
