Android 架构师之路 目录





                // RxJava 1.x demo: emit "1" and "2", map each string to an Integer (+2), log the results.
                // Generic type arguments are restored; without them the overrides below do not match.
                Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(Subscriber<? super String> subscriber) {
                                // Emit only while the subscriber has not unsubscribed.
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onNext("1");
                                    subscriber.onNext("2");
                                    subscriber.onCompleted();
                                }
                            }
                        })
                        // Processing step: parse the string and add 2.
                        .map(new Func1<String, Integer>() {
                            @Override
                            public Integer call(String s) {
                                return Integer.parseInt(s) + 2;
                            }
                        })
                        .subscribe(new Observer<Integer>() {
                            @Override
                            public void onCompleted() {
                                Log.d("kpioneer", "onCompleted:");
                            }

                            @Override
                            public void onError(Throwable e) {
                                // The demo does not handle errors.
                            }

                            @Override
                            public void onNext(Integer integer) {
                                Log.d("kpioneer", "onNext:" + integer + ",integer instanceOf" + integer.getClass());
                            }
                        });
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:3,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:4,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onCompleted:
public final class OnSubscribeMap implements OnSubscribe {    final Observable source;    final Func1<? super T, ? extends R> transformer;    public OnSubscribeMap(Observable source, Func1<? super T, ? extends R> transformer) {        this.source = source;        this.transformer = transformer;    }    @Override    public void call(final Subscriber<? super R> o) {        MapSubscriber parent = new MapSubscriber(o, transformer);        o.add(parent);        source.unsafeSubscribe(parent);    }    static final class MapSubscriber extends Subscriber {        final Subscriber<? super R> actual;        final Func1<? super T, ? extends R> mapper;        boolean done;        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {            this.actual = actual;            this.mapper = mapper;        }        @Override        public void onNext(T t) {            R result;            try {                result =;            } catch (Throwable ex) {                Exceptions.throwIfFatal(ex);                unsubscribe();                onError(OnErrorThrowable.addValueAsLastCause(ex, t));                return;            }            actual.onNext(result);        }        @Override        public void onError(Throwable e) {            if (done) {                RxJavaHooks.onError(e);                return;            }            done = true;            actual.onError(e);        }        @Override        public void onCompleted() {            if (done) {                return;            }            actual.onCompleted();        }        @Override        public void setProducer(Producer p) {            actual.setProducer(p);        }    }}
public final class OnSubscribeLift implements OnSubscribe {    final OnSubscribe parent;    final Operator<? extends R, ? super T> operator;    public OnSubscribeLift(OnSubscribe parent, Operator<? extends R, ? super T> operator) {        this.parent = parent;        this.operator = operator;    }    @Override    public void call(Subscriber<? super R> o) {        try {            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);            try {                // new Subscriber created and being subscribed with so 'onStart' it                st.onStart();      ;            } catch (Throwable e) {                // localized capture of errors rather than it skipping all operators                // and ending up in the try/catch of the subscribe method which then                // prevents onErrorResumeNext and other similar approaches to error handling                Exceptions.throwIfFatal(e);                st.onError(e);            }        } catch (Throwable e) {            Exceptions.throwIfFatal(e);            // if the lift function failed all we can do is pass the error to the final Subscriber            // as we don't have the operator available to us            o.onError(e);        }    }}


    /**
     * Creates a new Observable whose OnSubscribe is an OnSubscribeLift built from this
     * Observable's {@code onSubscribe} and the given operator.
     *
     * @param <R> the item type of the returned Observable
     * @param operator the operator that converts a downstream subscriber into an upstream one
     * @return the lifted Observable
     */
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }


        // Excerpt from OnSubscribeMap.call: wrap the downstream subscriber `o` in a
        // MapSubscriber, add the wrapper to `o`, then subscribe the wrapper to the source.
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);


        @Override        public void onNext(T t) {            R result;            try {                result =;            } catch (Throwable ex) {                Exceptions.throwIfFatal(ex);                unsubscribe();                onError(OnErrorThrowable.addValueAsLastCause(ex, t));                return;            }            actual.onNext(result);        }
    /**
     * Creates a new Observable whose OnSubscribe is an OnSubscribeLift built from this
     * Observable's {@code onSubscribe} and the given operator.
     *
     * @param <R> the item type of the returned Observable
     * @param operator the operator that converts a downstream subscriber into an upstream one
     * @return the lifted Observable
     */
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }




                // RxJava 2.x demo without backpressure: Observable -> map -> Observer.
                Observable
                        .create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> e) throws Exception {
                                // Emit only while the emitter has not been disposed.
                                if (!e.isDisposed()) {
                                    e.onNext("1");
                                    e.onNext("2");
                                    e.onComplete();
                                }
                            }
                        })
                        .map(new Function<String, Integer>() {
                            @Override
                            public Integer apply(String s) throws Exception {
                                return Integer.parseInt(s) + 2;
                            }
                        })
                        .subscribe(new Observer<Integer>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d("kpioneer", "onSubscribe:");
                            }

                            @Override
                            public void onNext(Integer value) {
                                Log.d("kpioneer", "onNext:" + value);
                            }

                            @Override
                            public void onError(Throwable e) {
                                // The demo does not handle errors.
                            }

                            @Override
                            public void onComplete() {
                                Log.d("kpioneer", "onComplete");
                            }
                        });

                // RxJava 2.x demo with backpressure: Flowable (DROP strategy) -> map -> Subscriber.
                Flowable
                        .create(new FlowableOnSubscribe<String>() {
                            @Override
                            public void subscribe(FlowableEmitter<String> e) throws Exception {
                                // Emit only while the emitter has not been cancelled.
                                if (!e.isCancelled()) {
                                    e.onNext("1");
                                    e.onNext("2");
                                    e.onComplete();
                                }
                            }
                        }, BackpressureStrategy.DROP)
                        .map(new Function<String, Integer>() {
                            @Override
                            public Integer apply(String s) throws Exception {
                                return Integer.parseInt(s) + 2;
                            }
                        })
                        .subscribe(new Subscriber<Integer>() {
                            @Override
                            public void onSubscribe(Subscription s) {
                                // Request an unbounded number of items before logging.
                                s.request(Long.MAX_VALUE);
                                Log.d("kpioneer", "onSubscribe");
                            }

                            @Override
                            public void onNext(Integer integer) {
                                Log.d("kpioneer", "onNext:" + integer);
                            }

                            @Override
                            public void onError(Throwable t) {
                                // The demo does not handle errors.
                            }

                            @Override
                            public void onComplete() {
                                Log.d("kpioneer", "onComplete");
                            }
                        });
06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe:
06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
06-11 10:20:56.768 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete


ObservableMap :无背压
public final class ObservableMap extends AbstractObservableWithUpstream {    final Function<? super T, ? extends U> function;    public ObservableMap(ObservableSource source, Function<? super T, ? extends U> function) {        super(source);        this.function = function;    }    @Override    public void subscribeActual(Observer<? super U> t) {        source.subscribe(new MapObserver(t, function));    }    static final class MapObserver extends BasicFuseableObserver {        final Function<? super T, ? extends U> mapper;        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {            super(actual);            this.mapper = mapper;        }        @Override        public void onNext(T t) {            if (done) {                return;            }            if (sourceMode != NONE) {                actual.onNext(null);                return;            }            U v;            try {                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");            } catch (Throwable ex) {                fail(ex);                return;            }            actual.onNext(v);        }        @Override        public int requestFusion(int mode) {            return transitiveBoundaryFusion(mode);        }        @Nullable        @Override        public U poll() throws Exception {            T t = qs.poll();            return t != null ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;        }    }}


public final class ObservableLift extends AbstractObservableWithUpstream {    /** The actual operator. */    final ObservableOperator<? extends R, ? super T> operator;    public ObservableLift(ObservableSource source, ObservableOperator<? extends R, ? super T> operator) {        super(source);        this.operator = operator;    }    @Override    public void subscribeActual(Observer<? super R> s) {        Observer<? super T> observer;        try {            observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");        } catch (NullPointerException e) { // NOPMD            throw e;        } catch (Throwable e) {            Exceptions.throwIfFatal(e);            // can't call onError because no way to know if a Disposable has been set or not            // can't call onSubscribe because the call might have set a Disposable already            RxJavaPlugins.onError(e);            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");            npe.initCause(e);            throw npe;        }        source.subscribe(observer);    }}
FlowableMap: 有背压
public final class FlowableMap extends AbstractFlowableWithUpstream {    final Function<? super T, ? extends U> mapper;    public FlowableMap(Flowable source, Function<? super T, ? extends U> mapper) {        super(source);        this.mapper = mapper;    }    @Override    protected void subscribeActual(Subscriber<? super U> s) {        if (s instanceof ConditionalSubscriber) {            source.subscribe(new MapConditionalSubscriber((ConditionalSubscriber<? super U>)s, mapper));        } else {            source.subscribe(new MapSubscriber(s, mapper));        }    }    static final class MapSubscriber extends BasicFuseableSubscriber {        final Function<? super T, ? extends U> mapper;        MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {            super(actual);            this.mapper = mapper;        }        @Override        public void onNext(T t) {            if (done) {                return;            }            if (sourceMode != NONE) {                actual.onNext(null);                return;            }            U v;            try {                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");            } catch (Throwable ex) {                fail(ex);                return;            }            actual.onNext(v);        }        @Override        public int requestFusion(int mode) {            return transitiveBoundaryFusion(mode);        }        @Nullable        @Override        public U poll() throws Exception {            T t = qs.poll();            return t != null ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;        }    }    static final class MapConditionalSubscriber extends BasicFuseableConditionalSubscriber {        final Function<? super T, ? extends U> mapper;        MapConditionalSubscriber(ConditionalSubscriber<? super U> actual, Function<? super T, ? 
extends U> function) {            super(actual);            this.mapper = function;        }        @Override        public void onNext(T t) {            if (done) {                return;            }            if (sourceMode != NONE) {                actual.onNext(null);                return;            }            U v;            try {                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");            } catch (Throwable ex) {                fail(ex);                return;            }            actual.onNext(v);        }        @Override        public boolean tryOnNext(T t) {            if (done) {                return false;            }            U v;            try {                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");            } catch (Throwable ex) {                fail(ex);                return true;            }            return actual.tryOnNext(v);        }        @Override        public int requestFusion(int mode) {            return transitiveBoundaryFusion(mode);        }        @Nullable        @Override        public U poll() throws Exception {            T t = qs.poll();            return t != null ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;        }    }


public final class FlowableLift extends AbstractFlowableWithUpstream {    /** The actual operator. */    final FlowableOperator<? extends R, ? super T> operator;    public FlowableLift(Flowable source, FlowableOperator<? extends R, ? super T> operator) {        super(source);        this.operator = operator;    }    @Override    public void subscribeActual(Subscriber<? super R> s) {        try {            Subscriber<? super T> st = operator.apply(s);            if (st == null) {                throw new NullPointerException("Operator " + operator + " returned a null Subscriber");            }            source.subscribe(st);        } catch (NullPointerException e) { // NOPMD            throw e;        } catch (Throwable e) {            Exceptions.throwIfFatal(e);            // can't call onError because no way to know if a Subscription has been set or not            // can't call onSubscribe because the call might have set a Subscription already            RxJavaPlugins.onError(e);            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");            npe.initCause(e);            throw npe;        }    }}

RxJava2 有背压和无背压核心实现使用了代理机制

3.RxJava1 操作符功能仿写实现

  1. Operator接口是操作符的抽象接口
  2. 各操作符实现Operator接口用于处理具体的变换
  1. 变换的基本原理
  2. 各操作符均实现Operator接口,并调用lift操作符
  1. 最基本的操作符
  2. 顾名思义,用于做映射
public class Caller {    final OnCall onCall;    public Caller(OnCall onCall) {        this.onCall = onCall;    }    public static  Caller create(OnCall onCall) {        return new Caller<>(onCall);    }    public Calling call(Receiver receiver) {;        return receiver;    }    public final  Caller lift(final Operator operator) {        return create(new OnCallLift<>(onCall, operator));    }    public final  Caller map(Func1 func) {        return lift(new MapOperator(func));    }    public interface OnCall extends Action1> {    }    public interface Operator extends Func1, Receiver> {    }}
/**
 * A function of one argument.
 *
 * @param <T> the argument type
 * @param <R> the result type
 */
public interface Func1<T, R> {
    /**
     * Applies this function to the given argument.
     *
     * @param t the argument
     * @return the result
     */
    R call(T t);
}
/**
 * Operator that wraps a downstream Receiver in a MapReceiver applying the mapper function.
 *
 * @param <T> the upstream item type
 * @param <R> the mapped (downstream) item type
 */
public class MapOperator<T, R> implements Caller.Operator<R, T> {
    private final Func1<T, R> mapper;

    public MapOperator(Func1<T, R> mapper) {
        this.mapper = mapper;
    }

    /** Returns a MapReceiver that maps items and forwards them to {@code rReceiver}. */
    @Override
    public Receiver<T> call(Receiver<R> rReceiver) {
        return new MapReceiver<>(rReceiver, this.mapper);
    }
}
public class MapReceiver extends Receiver {    private final Receiver actual;    private final Func1 mapper;    public MapReceiver(Receiver actual, Func1 mapper) {        this.actual = actual;        this.mapper = mapper;    }    @Override    public void onCompleted() {        this.actual.onCompleted();    }    @Override    public void onError(Throwable t) {        this.actual.onError(t);    }    @Override    public void onReceive(T t) {        R tR =;        this.actual.onReceive(tR);    }}
public class OnCallLift implements Caller.OnCall {    private final Caller.OnCall parent;    private final Caller.Operator operator;    public OnCallLift(Caller.OnCall parent, Caller.Operator operator) {        this.parent = parent;        this.operator = operator;    }    @Override    public void call(Receiver rReceiver) {        Receiver tReceiver =;;    }}
public class Lesson2_2Activity extends AppCompatActivity {    @Override    protected void onCreate(final Bundle savedInstanceState) {        super.onCreate(savedInstanceState);        setContentView(R.layout.activity_custom_test);        ButterKnife.bind(this);    }    @OnClick(    public void onViewClicked() {        Caller.                create(new Caller.OnCall() {                    @Override                    public void call(Receiver stringReceiver) {                        if (!stringReceiver.isUnCalled()) {                            stringReceiver.onReceive("1");                            stringReceiver.onReceive("2");                            stringReceiver.onCompleted();                        }                    }                }).                map(new Func1() {                    @Override                    public Integer call(String s) {                        return Integer.parseInt(s)+2;                    }                }).                call(new Receiver() {                    @Override                    public void onCompleted() {                        Log.d("kpioneer", "onCompleted");                    }                    @Override                    public void onError(Throwable t) {                    }                    @Override                    public void onReceive(Integer integer) {                        Log.d("kpioneer", "onReceive:" + integer);                    }                });    }}
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onCompleted

3.RxJava2(无背压) 操作符功能仿写实现

  1. 一个抽象类
  2. 有callActual方法
  3. 实现操作符需实现此方法
  1. 最基本的操作符
  2. 顾名思义,用于做映射
  1. 在callActual中做变换
  2. 可用于扩展操作符


/**
 * Imitation of the RxJava 2.x Observable (no backpressure). Subclasses implement
 * {@link #callActual(Callee)}; operators are expressed as Caller subclasses.
 *
 * @param <T> the type of item delivered to the callee
 */
public abstract class Caller<T> {

    /** Creates a Caller backed by the given CallerOnCall. */
    public static <T> Caller<T> create(CallerOnCall<T> callerOnCall) {
        return new CallerCreate<>(callerOnCall);
    }

    /** Entry point for a callee; delegates to {@link #callActual(Callee)}. */
    public void call(Callee<T> callee) {
        callActual(callee);
    }

    /** Performs the actual call; every operator implementation must provide this. */
    protected abstract void callActual(Callee<T> callee);

    /** Returns a CallerLift that applies the given operator on top of this Caller. */
    public <R> Caller<R> lift(CallerOperator<R, T> operator) {
        return new CallerLift<>(this, operator);
    }

    /** Returns a CallerMap that applies the given function to every item of this Caller. */
    public <R> Caller<R> map(Function<T, R> function) {
        return new CallerMap<>(this, function);
    }
}
/**
 * Operator abstraction: given one Callee, produces another Callee.
 * NOTE(review): the type-parameter order was lost in the source text; it is reconstructed
 * from the lift() usage, where a Callee of Integer goes in and a Callee of String comes
 * out — confirm against the original project.
 *
 * @param <T> the item type of the callee passed in
 * @param <R> the item type of the callee returned
 */
public interface CallerOperator<T, R> {
    Callee<R> call(Callee<T> callee);
}
/**
 * Created by Xionghu on 2018/6/11.
 * Desc: returns the source Caller.
 *
 * @param <T> the item type of the source Caller
 */
public interface CallerSource<T> {
    Caller<T> source();
}
/**
 * Base class for Callers that wrap an upstream (source) Caller.
 *
 * @param <T> the upstream item type
 * @param <R> the item type this Caller delivers
 */
public abstract class CallerWithUpstream<T, R> extends Caller<R> implements CallerSource<T> {
    protected final Caller<T> source;

    public CallerWithUpstream(Caller<T> source) {
        this.source = source;
    }

    @Override
    public Caller<T> source() {
        return source;
    }
}
public class CallerLift extends CallerWithUpstream {    private final CallerOperator mOperator;    public CallerLift(Caller source, CallerOperator mOperator) {        super(source);        this.mOperator = mOperator;    }    @Override    protected void callActual(Callee callee) {        Callee tCallee =;;    }}
/**
 * A function of one argument.
 *
 * @param <T> the argument type
 * @param <R> the result type
 */
public interface Function<T, R> {
    /**
     * Applies this function to the given argument.
     *
     * @param t the argument
     * @return the result
     */
    R call(T t);
}
public class CallerMap extends CallerWithUpstream {    private Function function;    public CallerMap(Caller source, Function function) {        super(source);        this.function = function;    }    @Override    protected void callActual(Callee callee) { MapCallee<>(callee, function));    }    static class MapCallee implements Callee {        private final Callee mCallee;        private final Function mFunction;        public MapCallee(Callee mCallee, Function mFunction) {            this.mCallee = mCallee;            this.mFunction = mFunction;        }        @Override        public void onCall(Release release) {            mCallee.onCall(release);        }        @Override        public void onReceive(T t) {            R tR =;            mCallee.onReceive(tR);        }        @Override        public void onCompleted() {            mCallee.onCompleted();        }        @Override        public void onError(Throwable t) {            mCallee.onError(t);        }    }}
/** * Created by Xionghu on 2018/6/11. * Desc: 仿写RxJava2 无背压 操作符方法 */public class Lesson2_3Activity extends AppCompatActivity {    @Override    protected void onCreate(final Bundle savedInstanceState) {        super.onCreate(savedInstanceState);        setContentView(R.layout.activity_custom_test);        ButterKnife.bind(this);    }    @OnClick(    public void onViewClicked() {        Caller.                create(new CallerOnCall() {                    @Override                    public void call(CallerEmitter callerEmitter) {                        callerEmitter.onReceive("1");                        callerEmitter.onReceive("2");                        callerEmitter.onCompleted();                    }                }).                map(new Function() {                    @Override                    public Integer call(String s) {                        return Integer.parseInt(s);                    }                }).                call(new Callee() {                    @Override                    public void onCall(Release release) {                        Log.d("kpioneer", "onCall");                    }                    @Override                    public void onReceive(Integer integer) {                        Log.d("kpioneer", "onReceive:" + integer);                    }                    @Override                    public void onCompleted() {                        Log.d("kpioneer", "onCompleted");                    }                    @Override                    public void onError(Throwable t) {                    }                });        Caller.                create(new CallerOnCall() {                    @Override                    public void call(CallerEmitter callerEmitter) {                        callerEmitter.onReceive("3");                        callerEmitter.onReceive("4");                        callerEmitter.onCompleted();                    }                }).                
lift(new CallerOperator() {                    @Override                    public Callee call(final Callee callee) {                        return new Callee() {                            @Override                            public void onCall(Release release) {                                callee.onCall(release);                            }                            @Override                            public void onReceive(String s) {                                callee.onReceive(Integer.parseInt(s));                            }                            @Override                            public void onCompleted() {                                callee.onCompleted();                            }                            @Override                            public void onError(Throwable t) {                                callee.onError(t);                            }                        };                    }                }).                call(new Callee() {                    @Override                    public void onCall(Release release) {                        Log.d("kpioneer", "onCall");                    }                    @Override                    public void onReceive(Integer integer) {                        Log.d("kpioneer", "onReceive:" + integer);                    }                    @Override                    public void onCompleted() {                        Log.d("kpioneer", "onCompleted");                    }                    @Override                    public void onError(Throwable t) {                        Log.d("kpioneer", "onError");                    }                });    }}
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:1
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:2
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted

4.RxJava2(有背压) 操作符功能仿写实现

  1. 在callActual中做变换
  2. 可用于扩展操作符
  1. 一个抽象类
  2. 有callActual方法
  3. 实现操作符需实现此方法
/**
 * Imitation of the RxJava 2.x Flowable (with backpressure). Subclasses implement
 * {@link #callActual(Receiver)}; operators are expressed as Telephoner subclasses.
 *
 * @param <T> the type of item delivered to the receiver
 */
public abstract class Telephoner<T> {

    /** Creates a Telephoner backed by the given TelephonerOnCall. */
    public static <T> Telephoner<T> create(TelephonerOnCall<T> telephonerOnCall) {
        return new TelephonerCreate<>(telephonerOnCall);
    }

    /** Entry point for a receiver; delegates to {@link #callActual(Receiver)}. */
    public void call(Receiver<T> receiver) {
        callActual(receiver);
    }

    /** Performs the actual call; every operator implementation must provide this. */
    protected abstract void callActual(Receiver<T> receiver);

    /** Returns a TelephonerMap that applies the given function to every item. */
    public <R> Telephoner<R> map(Function<T, R> function) {
        return new TelephonerMap<>(this, function);
    }

    /** Returns a TelephonerLift that applies the given operator on top of this Telephoner. */
    public <R> Telephoner<R> lift(TelephonerOperator<R, T> telephonerOperator) {
        return new TelephonerLift<>(this, telephonerOperator);
    }
}
/** * Created by Xionghu on 2018/6/12. * Desc: lift操作符 */public class TelephonerLift extends TelephonerWithUpstream {    private final TelephonerOperator operator;    public TelephonerLift(Telephoner source, TelephonerOperator operator) {        super(source);        this.operator = operator;    }    @Override    protected void callActual(Receiver receiver) {        Receiver tReceiver =;;    }}
import com.haocai.mylibrary.rxJava2.Function;/** * Created by Xionghu on 2018/6/12. * Desc: map操作符 */public class TelephonerMap extends TelephonerWithUpstream {    private Function trFunction;    public TelephonerMap(Telephoner source, Function trFunction) {        super(source);        this.trFunction = trFunction;    }    @Override    protected void callActual(Receiver receiver) { MapReceiver<>(receiver, trFunction));    }    static class MapReceiver implements Receiver {        private final Receiver rReceiver;        private final Function trFunction;        public MapReceiver(Receiver rReceiver, Function trFunction) {            this.rReceiver = rReceiver;            this.trFunction = trFunction;        }        @Override        public void onCall(Drop d) {            rReceiver.onCall(d);        }        @Override        public void onReceive(T t) {            R tr =;            rReceiver.onReceive(tr);        }        @Override        public void onError(Throwable t) {            rReceiver.onError(t);        }        @Override        public void onCompleted() {            rReceiver.onCompleted();        }    }}
/**
 * Created by Xionghu on 2018/6/12.
 * Desc: operator interface. Given one Receiver, produces another Receiver.
 * NOTE(review): the type-parameter order was lost in the source text; it is reconstructed
 * from the lift() usage, where a Receiver of Integer goes in and a Receiver of String
 * comes out — confirm against the original project.
 *
 * @param <T> the item type of the receiver passed in
 * @param <R> the item type of the receiver returned
 */
public interface TelephonerOperator<T, R> {
    Receiver<R> call(Receiver<T> callee);
}
/**
 * Created by Xionghu on 2018/6/11.
 * Desc: returns the source Telephoner.
 *
 * @param <T> the item type of the source Telephoner
 */
public interface TelephonerSource<T> {
    Telephoner<T> source();
}
/**
 * Base class for Telephoners that wrap an upstream (source) Telephoner.
 *
 * @param <T> the upstream item type
 * @param <R> the item type this Telephoner delivers
 */
public abstract class TelephonerWithUpstream<T, R> extends Telephoner<R> implements TelephonerSource<T> {
    protected final Telephoner<T> source;

    public TelephonerWithUpstream(Telephoner<T> source) {
        this.source = source;
    }

    @Override
    public Telephoner<T> source() {
        return source;
    }
}
import android.os.Bundle;import;import android.util.Log;import com.haocai.mylibrary.rxJava2.Function;import com.haocai.mylibrary.rxJava2.backpressure.Drop;import com.haocai.mylibrary.rxJava2.backpressure.Receiver;import com.haocai.mylibrary.rxJava2.backpressure.Telephoner;import com.haocai.mylibrary.rxJava2.backpressure.TelephonerEmitter;import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOnCall;import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOperator;import com.haocai.rxjavademo.R;import butterknife.ButterKnife;import butterknife.OnClick;/** * Created by Xionghu on 2018/6/11. * Desc: 仿写RxJava2 无背压 操作符方法 */public class Lesson2_4Activity extends AppCompatActivity {    @Override    protected void onCreate(final Bundle savedInstanceState) {        super.onCreate(savedInstanceState);        setContentView(R.layout.activity_custom_test);        ButterKnife.bind(this);    }    @OnClick(    public void onViewClicked() {        Telephoner.                create(new TelephonerOnCall() {                    @Override                    public void call(TelephonerEmitter telephonerEmitter) {                        telephonerEmitter.onReceive("1");                        telephonerEmitter.onReceive("2");                        telephonerEmitter.onCompleted();                    }                }).                map(new Function() {                    @Override                    public Integer call(String s) {                        return Integer.parseInt(s);                    }                }).                
call(new Receiver() {                    @Override                    public void onCall(Drop d) {                        d.request(Long.MAX_VALUE);                        Log.d("kpioneer", "onCall");                    }                    @Override                    public void onReceive(Integer integer) {                        Log.d("kpioneer", "onReceive:" + integer);                    }                    @Override                    public void onError(Throwable t) {                        Log.d("kpioneer", "onError");                    }                    @Override                    public void onCompleted() {                        Log.d("kpioneer", "onCompleted");                    }                });        Telephoner.                create(new TelephonerOnCall() {                    @Override                    public void call(TelephonerEmitter telephonerEmitter) {                        telephonerEmitter.onReceive("3");                        telephonerEmitter.onReceive("4");                        telephonerEmitter.onCompleted();                    }                }).                
lift(new TelephonerOperator() {                    @Override                    public Receiver call(final Receiver receiver) {                        return new Receiver() {                            @Override                            public void onCall(Drop d) {                                receiver.onCall(d);                            }                            @Override                            public void onReceive(String s) {                                receiver.onReceive(Integer.parseInt(s));                            }                            @Override                            public void onError(Throwable t) {                                receiver.onError(t);                            }                            @Override                            public void onCompleted() {                                receiver.onCompleted();                            }                        };                    }                }).                call(new Receiver() {                    @Override                    public void onCall(Drop d) {                        d.request(Long.MAX_VALUE);                        Log.d("kpioneer", "onCall");                    }                    @Override                    public void onReceive(Integer integer) {                        Log.d("kpioneer", "onReceive:" + integer);                    }                    @Override                    public void onError(Throwable t) {                        Log.d("kpioneer", "onError");                    }                    @Override                    public void onCompleted() {                        Log.d("kpioneer", "onCompleted");                    }                });    }}
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:1
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:2
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted



  1. android中使用OpenGL ES
  2. android studio 报错Failed to save settings解决方法
  3. cocos2dx生成android时出错解决方法
  4. 通过Criteria获取LocationProvider
  5. Android(安卓)属性动画 源码解析 深入了解其内部实现
  6. java虚拟机理解
  7. Android(安卓)Fragment生命周期
  8. android中的dailog
  9. Android(安卓)解决监听home键的几种方法


  1. Android(安卓)addJavaScriptInterface
  2. Android/java 多线程(五)-ThreadPoolExec
  4. Android MVC模式
  5. Android动态显示具体到秒的相聚时间
  6. android基本概念
  7. Flutter跟Android交互
  8. Android Kotlin TextView跑马灯效果
  9. 关于Android studio Gradle 实现多渠道打
  10. Android属性系统