The Road to Android Architect 20: Reactive Programming, RxJava Operator Source Analysis and Implementation
16lz
2021-01-25
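Introduction to operators
Operator: processes the emitted data and emits the result again
Change propagation: an operator applies a change and propagates it downstream
1. RxJava1 operator source analysis
1. The Func1 interface
2. The Operator interface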
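To make the two interfaces concrete, here is a minimal self-contained sketch of their shape. The names Func1 and Operator mirror RxJava 1, but every type below, including Subscriber, is a simplified stand-in written for this example (an assumption), not the library class. The demo() helper is hypothetical as well.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins (assumptions for illustration), not the real RxJava 1 types.
class InterfaceShapeSketch {

    // A one-argument function: takes a T, returns an R.
    interface Func1<T, R> {
        R call(T t);
    }

    // Stand-in for a downstream consumer; error and completion signals are omitted.
    interface Subscriber<T> {
        void onNext(T t);
    }

    // An Operator is itself a Func1: given the downstream Subscriber<R>,
    // it returns the Subscriber<T> that is handed to the upstream source.
    interface Operator<R, T> extends Func1<Subscriber<R>, Subscriber<T>> {
    }

    // Hypothetical helper: pushes "1" and "2" through an operator and collects the results.
    static List<Integer> demo() {
        final List<Integer> out = new ArrayList<>();
        Subscriber<Integer> downstream = new Subscriber<Integer>() {
            @Override
            public void onNext(Integer value) {
                out.add(value);
            }
        };
        Operator<Integer, String> parsePlusTwo = new Operator<Integer, String>() {
            @Override
            public Subscriber<String> call(final Subscriber<Integer> child) {
                return new Subscriber<String>() {
                    @Override
                    public void onNext(String s) {
                        // Transform first, then forward to the wrapped subscriber.
                        child.onNext(Integer.parseInt(s) + 2);
                    }
                };
            }
        };
        Subscriber<String> upstreamFacing = parsePlusTwo.call(downstream);
        upstreamFacing.onNext("1");
        upstreamFacing.onNext("2");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is the direction of the types: an operator receives the downstream subscriber and returns the subscriber that faces upstream.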
1.1 RxJava1 example

    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext("1");
                subscriber.onNext("2");
                subscriber.onCompleted();
            }
        }
    })
    // Transform: parse each String and add 2
    .map(new Func1<String, Integer>() {
        @Override
        public Integer call(String s) {
            return Integer.parseInt(s) + 2;
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            Log.d("kpioneer", "onCompleted:");
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Integer integer) {
            Log.d("kpioneer", "onNext:" + integer + ",integer instanceOf" + integer.getClass());
        }
    });
Run output

    06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:3,integer instanceOfclass java.lang.Integer
    06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:4,integer instanceOfclass java.lang.Integer
    06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onCompleted:
1.2 RxJava1 operator source
The OnSubscribeMap class in RxJava1

    public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

        final Observable<T> source;

        final Func1<? super T, ? extends R> transformer;

        public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }

        @Override
        public void call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            source.unsafeSubscribe(parent);
        }

        static final class MapSubscriber<T, R> extends Subscriber<T> {

            final Subscriber<? super R> actual;

            final Func1<? super T, ? extends R> mapper;

            boolean done;

            public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
                this.actual = actual;
                this.mapper = mapper;
            }

            @Override
            public void onNext(T t) {
                R result;
                try {
                    result = mapper.call(t);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    unsubscribe();
                    onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                    return;
                }
                actual.onNext(result);
            }

            @Override
            public void onError(Throwable e) {
                if (done) {
                    RxJavaHooks.onError(e);
                    return;
                }
                done = true;
                actual.onError(e);
            }

            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                actual.onCompleted();
            }

            @Override
            public void setProducer(Producer p) {
                actual.setProducer(p);
            }
        }
    }
The OnSubscribeLift class in RxJava1

    public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

        final OnSubscribe<T> parent;

        final Operator<? extends R, ? super T> operator;

        public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
            this.parent = parent;
            this.operator = operator;
        }

        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    parent.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    Exceptions.throwIfFatal(e);
                    st.onError(e);
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    }
1.3 How the transformation works (the core operator, lift):
1. Take the original OnSubscribe and the current Operator
2. Create a new OnSubscribe and return a new Observable

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

3. Wrap the old Subscriber in a new Subscriber

    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    source.unsafeSubscribe(parent);

4. Do the transformation inside the new Subscriber, then pass the result to the old Subscriber

    @Override
    public void onNext(T t) {
        R result;
        try {
            result = mapper.call(t);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            unsubscribe();
            onError(OnErrorThrowable.addValueAsLastCause(ex, t));
            return;
        }
        actual.onNext(result);
    }
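Analysis
The core of the implementation is a proxy mechanism: each operator wraps the downstream Subscriber in a new Subscriber, which transforms the data before forwarding it.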
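The proxy idea can be shown without any RxJava types. The following is a minimal sketch under stated assumptions: Source, map and demo() are hypothetical names invented for this example, only the onNext path is modelled, and java.util.function interfaces stand in for Func1 and Subscriber. Chaining two map calls produces two nested proxies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// A deliberately tiny model (assumption): a Source pushes values into a consumer.
// Error and completion signals are left out.
class ProxyChainSketch {

    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // map returns a NEW source. On subscribe it wraps the downstream consumer in a
    // proxy that transforms each value, then subscribes that proxy to the old source.
    static <T, R> Source<R> map(final Source<T> upstream, final Function<T, R> mapper) {
        return new Source<R>() {
            @Override
            public void subscribe(final Consumer<R> downstream) {
                Consumer<T> proxy = new Consumer<T>() {
                    @Override
                    public void accept(T value) {
                        downstream.accept(mapper.apply(value));
                    }
                };
                upstream.subscribe(proxy);
            }
        };
    }

    // Hypothetical helper: "1", "2" -> parse -> add 2 -> collect.
    static List<Integer> demo() {
        Source<String> source = downstream -> {
            downstream.accept("1");
            downstream.accept("2");
        };
        Source<Integer> parsed = map(source, Integer::parseInt);
        Source<Integer> plusTwo = map(parsed, i -> i + 2);

        final List<Integer> out = new ArrayList<>();
        plusTwo.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Nothing runs until the final subscribe call. At that point each layer builds its proxy and subscribes it to the layer above, so values flow down through the proxies in the order the operators were written.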
2. RxJava2 operator source analysis
2.1 RxJava2 example

    Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("1");
                    e.onNext("2");
                    e.onComplete();
                }
            }
        })
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s) + 2;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("kpioneer", "onSubscribe:");
            }

            @Override
            public void onNext(Integer value) {
                Log.d("kpioneer", "onNext:" + value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
                Log.d("kpioneer", "onComplete");
            }
        });

    Flowable
        .create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                if (!e.isCancelled()) {
                    e.onNext("1");
                    e.onNext("2");
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.DROP)
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s) + 2;
            }
        })
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
                Log.d("kpioneer", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("kpioneer", "onNext:" + integer);
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onComplete() {
                Log.d("kpioneer", "onComplete");
            }
        });
Run output

    06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe:
    06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
    06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
    06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
    06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe
    06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
    06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
    06-11 10:20:56.768 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
2.2 RxJava2 operator source
The Function interface
ObservableMap: no backpressure

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {

        final Function<? super T, ? extends U> function;

        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }

        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }

        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {

            final Function<? super T, ? extends U> mapper;

            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
                U v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                actual.onNext(v);
            }

            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }

            @Nullable
            @Override
            public U poll() throws Exception {
                T t = qs.poll();
                return t != null
                        ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.")
                        : null;
            }
        }
    }
1. ObservableMap extends the abstract class AbstractObservableWithUpstream
2. It implements the subscribeActual method inherited through AbstractObservableWithUpstream
3. In subscribeActual, the original Observable (source) subscribes an Observer that wraps the downstream Observer and applies the transformation
ObservableLift

    public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {

        /** The actual operator. */
        final ObservableOperator<? extends R, ? super T> operator;

        public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
            super(source);
            this.operator = operator;
        }

        @Override
        public void subscribeActual(Observer<? super R> s) {
            Observer<? super T> observer;
            try {
                observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Disposable already
                RxJavaPlugins.onError(e);
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
            source.subscribe(observer);
        }
    }
FlowableMap: with backpressure

    public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {

        final Function<? super T, ? extends U> mapper;

        public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
            super(source);
            this.mapper = mapper;
        }

        @Override
        protected void subscribeActual(Subscriber<? super U> s) {
            if (s instanceof ConditionalSubscriber) {
                source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>) s, mapper));
            } else {
                source.subscribe(new MapSubscriber<T, U>(s, mapper));
            }
        }

        static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {

            final Function<? super T, ? extends U> mapper;

            MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
                U v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                actual.onNext(v);
            }

            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }

            @Nullable
            @Override
            public U poll() throws Exception {
                T t = qs.poll();
                return t != null
                        ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.")
                        : null;
            }
        }

        static final class MapConditionalSubscriber<T, U> extends BasicFuseableConditionalSubscriber<T, U> {

            final Function<? super T, ? extends U> mapper;

            MapConditionalSubscriber(ConditionalSubscriber<? super U> actual, Function<? super T, ? extends U> function) {
                super(actual);
                this.mapper = function;
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
                U v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                actual.onNext(v);
            }

            @Override
            public boolean tryOnNext(T t) {
                if (done) {
                    return false;
                }
                U v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return true;
                }
                return actual.tryOnNext(v);
            }

            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }

            @Nullable
            @Override
            public U poll() throws Exception {
                T t = qs.poll();
                return t != null
                        ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.")
                        : null;
            }
        }
    }
1. FlowableMap extends AbstractFlowableWithUpstream
2. It implements the subscribeActual method inherited through AbstractFlowableWithUpstream
3. In subscribeActual, the original Flowable (source) subscribes a Subscriber that wraps the downstream Subscriber and applies the transformation
FlowableLift

    public final class FlowableLift<R, T> extends AbstractFlowableWithUpstream<T, R> {

        /** The actual operator. */
        final FlowableOperator<? extends R, ? super T> operator;

        public FlowableLift(Flowable<T> source, FlowableOperator<? extends R, ? super T> operator) {
            super(source);
            this.operator = operator;
        }

        @Override
        public void subscribeActual(Subscriber<? super R> s) {
            try {
                Subscriber<? super T> st = operator.apply(s);
                if (st == null) {
                    throw new NullPointerException("Operator " + operator + " returned a null Subscriber");
                }
                source.subscribe(st);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Subscription has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    }
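2.3 The Operator interface
1. Implement this interface
2. The transformation is applied in subscribeActual
3. It is the extension point for custom operators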
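The three points above can be sketched as a custom operator plugged in through lift. This is a minimal illustration, not RxJava code: MiniObservable, Observer, Operator and EvenOnly are hypothetical stand-ins that only mimic the shape of RxJava 2's Observable, Observer and ObservableOperator, and Disposable handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types (assumptions) shaped like RxJava 2's Observable, Observer and
// ObservableOperator; Disposable handling is omitted to keep the sketch short.
class CustomOperatorSketch {

    interface Observer<T> {
        void onNext(T t);
        void onError(Throwable e);
        void onComplete();
    }

    interface Operator<Downstream, Upstream> {
        Observer<Upstream> apply(Observer<Downstream> observer);
    }

    abstract static class MiniObservable<T> {
        final void subscribe(Observer<T> observer) {
            subscribeActual(observer);
        }

        protected abstract void subscribeActual(Observer<T> observer);

        // lift: the new observable's subscribeActual applies the operator to the
        // downstream observer and subscribes the result to this (upstream) source.
        final <R> MiniObservable<R> lift(final Operator<R, T> operator) {
            final MiniObservable<T> source = this;
            return new MiniObservable<R>() {
                @Override
                protected void subscribeActual(Observer<R> observer) {
                    source.subscribe(operator.apply(observer));
                }
            };
        }
    }

    // Hypothetical custom operator: forwards only even numbers.
    static final class EvenOnly implements Operator<Integer, Integer> {
        @Override
        public Observer<Integer> apply(final Observer<Integer> downstream) {
            return new Observer<Integer>() {
                @Override public void onNext(Integer i) { if (i % 2 == 0) downstream.onNext(i); }
                @Override public void onError(Throwable e) { downstream.onError(e); }
                @Override public void onComplete() { downstream.onComplete(); }
            };
        }
    }

    // Hypothetical helper: emits 1..4 through EvenOnly and records every signal.
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        MiniObservable<Integer> source = new MiniObservable<Integer>() {
            @Override
            protected void subscribeActual(Observer<Integer> observer) {
                for (int i = 1; i <= 4; i++) {
                    observer.onNext(i);
                }
                observer.onComplete();
            }
        };
        source.lift(new EvenOnly()).subscribe(new Observer<Integer>() {
            @Override public void onNext(Integer i) { log.add("onNext:" + i); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The operator itself knows nothing about the source. It only describes how to wrap an observer, which is why a single lift method is enough to host any number of custom operators.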
Analysis
In RxJava2, both the backpressure and the non-backpressure variants are built on the same proxy mechanism.
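In the backpressure variant the proxy has one extra job: demand from downstream has to reach the source. The sketch below assumes simplified stand-ins shaped like the Reactive Streams Subscriber and Subscription. The emit source, the parsePlusTwo proxy and demo() are hypothetical, and the source ignores re-entrant request calls and cancellation for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types (assumptions) shaped like Reactive Streams' Subscriber and Subscription.
class BackpressurePassThroughSketch {

    interface Subscription {
        void request(long n);
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onComplete();
    }

    // Hypothetical source: emits at most as many items as have been requested.
    static void emit(final String[] items, final Subscriber<String> subscriber) {
        subscriber.onSubscribe(new Subscription() {
            int index;
            boolean completed;

            @Override
            public void request(long n) {
                long remaining = n;
                while (remaining > 0 && index < items.length) {
                    subscriber.onNext(items[index++]);
                    remaining--;
                }
                if (index == items.length && !completed) {
                    completed = true;
                    subscriber.onComplete();
                }
            }
        });
    }

    // The map proxy: values are transformed on the way down, while the Subscription
    // is handed to the downstream unchanged, so request(n) goes straight upstream.
    static Subscriber<String> parsePlusTwo(final Subscriber<Integer> downstream) {
        return new Subscriber<String>() {
            @Override public void onSubscribe(Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(String s) { downstream.onNext(Integer.parseInt(s) + 2); }
            @Override public void onComplete() { downstream.onComplete(); }
        };
    }

    // Hypothetical helper: four items are available, but only two are requested.
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        emit(new String[] {"1", "2", "3", "4"}, parsePlusTwo(new Subscriber<Integer>() {
            @Override public void onSubscribe(Subscription s) { s.request(2); }
            @Override public void onNext(Integer i) { log.add("onNext:" + i); }
            @Override public void onComplete() { log.add("onComplete"); }
        }));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a one-to-one mapping does not change the number of items, the proxy can forward the Subscription as it is. An operator that drops or multiplies items would have to adjust the requested amount instead.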
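3. Re-implementing the RxJava1 operator mechanism
Implementing the Operator interface
- Operator is the abstract interface for operators
- Each operator implements the Operator interface to perform its specific transformation
The lift operator
- The basic mechanism behind every transformation
- Every operator implements the Operator interface and calls lift
The map operator
- The most basic operator
- As the name suggests, it maps each item to another value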
    public class Caller<T> {

        final OnCall<T> onCall;

        public Caller(OnCall<T> onCall) {
            this.onCall = onCall;
        }

        public static <T> Caller<T> create(OnCall<T> onCall) {
            return new Caller<>(onCall);
        }

        public Calling call(Receiver<T> receiver) {
            this.onCall.call(receiver);
            return receiver;
        }

        // Wraps the current OnCall in an OnCallLift that applies the operator
        public final <R> Caller<R> lift(final Operator<R, T> operator) {
            return create(new OnCallLift<>(onCall, operator));
        }

        public final <R> Caller<R> map(Func1<T, R> func) {
            return lift(new MapOperator<T, R>(func));
        }

        public interface OnCall<T> extends Action1<Receiver<T>> {
        }

        public interface Operator<R, T> extends Func1<Receiver<R>, Receiver<T>> {
        }
    }

    public interface Func1<T, R> {
        R call(T t);
    }

    public class MapOperator<T, R> implements Caller.Operator<R, T> {

        private final Func1<T, R> mapper;

        public MapOperator(Func1<T, R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public Receiver<T> call(Receiver<R> rReceiver) {
            return new MapReceiver<>(rReceiver, this.mapper);
        }
    }

    public class MapReceiver<T, R> extends Receiver<T> {

        private final Receiver<R> actual;
        private final Func1<T, R> mapper;

        public MapReceiver(Receiver<R> actual, Func1<T, R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onCompleted() {
            this.actual.onCompleted();
        }

        @Override
        public void onError(Throwable t) {
            this.actual.onError(t);
        }

        @Override
        public void onReceive(T t) {
            // Transform first, then hand the result to the wrapped Receiver
            R tR = this.mapper.call(t);
            this.actual.onReceive(tR);
        }
    }

    public class OnCallLift<T, R> implements Caller.OnCall<R> {

        private final Caller.OnCall<T> parent;
        private final Caller.Operator<R, T> operator;

        public OnCallLift(Caller.OnCall<T> parent, Caller.Operator<R, T> operator) {
            this.parent = parent;
            this.operator = operator;
        }

        @Override
        public void call(Receiver<R> rReceiver) {
            Receiver<T> tReceiver = this.operator.call(rReceiver);
            this.parent.call(tReceiver);
        }
    }
Usage

    public class Lesson2_2Activity extends AppCompatActivity {

        @Override
        protected void onCreate(final Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_custom_test);
            ButterKnife.bind(this);
        }

        @OnClick(R.id.testDo)
        public void onViewClicked() {
            Caller
                .create(new Caller.OnCall<String>() {
                    @Override
                    public void call(Receiver<String> stringReceiver) {
                        if (!stringReceiver.isUnCalled()) {
                            stringReceiver.onReceive("1");
                            stringReceiver.onReceive("2");
                            stringReceiver.onCompleted();
                        }
                    }
                })
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s) + 2;
                    }
                })
                .call(new Receiver<Integer>() {
                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }
                });
        }
    }

Log output

    06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:3
    06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:4
    06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onCompleted
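4. Re-implementing RxJava2 operators (no backpressure)
CallerWithUpstream (similar to AbstractObservableWithUpstream)
- An abstract class
- Has a callActual method
- An operator is implemented by implementing this method
The map operator
- The most basic operator
- As the name suggests, it maps each item to another value
The CallerOperator interface
- The transformation is applied in callActual
- Can be used to add custom operators
Related code: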
    public abstract class Caller<T> {

        public static <T> Caller<T> create(CallerOnCall<T> callerOnCall) {
            return new CallerCreate<>(callerOnCall);
        }

        public void call(Callee<T> callee) {
            callActual(callee);
        }

        protected abstract void callActual(Callee<T> callee);

        public <R> Caller<R> lift(CallerOperator<T, R> operator) {
            return new CallerLift<>(this, operator);
        }

        public <R> Caller<R> map(Function<T, R> function) {
            return new CallerMap<>(this, function);
        }
    }

    public interface CallerOperator<T, R> {
        Callee<T> call(Callee<R> callee);
    }

    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: returns the source Caller
     */
    public interface CallerSource<T> {
        Caller<T> source();
    }

    public abstract class CallerWithUpstream<T, R> extends Caller<R> implements CallerSource<T> {

        protected final Caller<T> source;

        public CallerWithUpstream(Caller<T> source) {
            this.source = source;
        }

        @Override
        public Caller<T> source() {
            return source;
        }
    }

    public class CallerLift<T, R> extends CallerWithUpstream<T, R> {

        private final CallerOperator<T, R> mOperator;

        public CallerLift(Caller<T> source, CallerOperator<T, R> mOperator) {
            super(source);
            this.mOperator = mOperator;
        }

        @Override
        protected void callActual(Callee<R> callee) {
            Callee<T> tCallee = mOperator.call(callee);
            source.call(tCallee);
        }
    }

    public interface Function<T, R> {
        R call(T t);
    }

    public class CallerMap<T, R> extends CallerWithUpstream<T, R> {

        private Function<T, R> function;

        public CallerMap(Caller<T> source, Function<T, R> function) {
            super(source);
            this.function = function;
        }

        @Override
        protected void callActual(Callee<R> callee) {
            source.call(new MapCallee<>(callee, function));
        }

        static class MapCallee<T, R> implements Callee<T> {

            private final Callee<R> mCallee;
            private final Function<T, R> mFunction;

            public MapCallee(Callee<R> mCallee, Function<T, R> mFunction) {
                this.mCallee = mCallee;
                this.mFunction = mFunction;
            }

            @Override
            public void onCall(Release release) {
                mCallee.onCall(release);
            }

            @Override
            public void onReceive(T t) {
                // Transform first, then hand the result to the wrapped Callee
                R tR = mFunction.call(t);
                mCallee.onReceive(tR);
            }

            @Override
            public void onCompleted() {
                mCallee.onCompleted();
            }

            @Override
            public void onError(Throwable t) {
                mCallee.onError(t);
            }
        }
    }
    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: imitation of the RxJava2 operators (no backpressure)
     */
    public class Lesson2_3Activity extends AppCompatActivity {

        @Override
        protected void onCreate(final Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_custom_test);
            ButterKnife.bind(this);
        }

        @OnClick(R.id.testDo)
        public void onViewClicked() {
            // Transformation through map
            Caller
                .create(new CallerOnCall<String>() {
                    @Override
                    public void call(CallerEmitter<String> callerEmitter) {
                        callerEmitter.onReceive("1");
                        callerEmitter.onReceive("2");
                        callerEmitter.onCompleted();
                    }
                })
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s);
                    }
                })
                .call(new Callee<Integer>() {
                    @Override
                    public void onCall(Release release) {
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable t) {
                    }
                });

            // The same transformation written directly against lift
            Caller
                .create(new CallerOnCall<String>() {
                    @Override
                    public void call(CallerEmitter<String> callerEmitter) {
                        callerEmitter.onReceive("3");
                        callerEmitter.onReceive("4");
                        callerEmitter.onCompleted();
                    }
                })
                .lift(new CallerOperator<String, Integer>() {
                    @Override
                    public Callee<String> call(final Callee<Integer> callee) {
                        return new Callee<String>() {
                            @Override
                            public void onCall(Release release) {
                                callee.onCall(release);
                            }

                            @Override
                            public void onReceive(String s) {
                                callee.onReceive(Integer.parseInt(s));
                            }

                            @Override
                            public void onCompleted() {
                                callee.onCompleted();
                            }

                            @Override
                            public void onError(Throwable t) {
                                callee.onError(t);
                            }
                        };
                    }
                })
                .call(new Callee<Integer>() {
                    @Override
                    public void onCall(Release release) {
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("kpioneer", "onError");
                    }
                });
        }
    }

Log output

    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:1
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:2
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:3
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:4
    06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted
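5. Re-implementing RxJava2 operators (with backpressure)
The TelephonerOperator interface
- The transformation is applied in callActual
- Can be used to add custom operators
TelephonerWithUpstream (similar to AbstractFlowableWithUpstream)
- An abstract class
- Has a callActual method
- An operator is implemented by implementing this method
Related source code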
    public abstract class Telephoner<T> {

        public static <T> Telephoner<T> create(TelephonerOnCall<T> telephonerOnCall) {
            return new TelephonerCreate<>(telephonerOnCall);
        }

        public void call(Receiver<T> receiver) {
            callActual(receiver);
        }

        protected abstract void callActual(Receiver<T> receiver);

        public <R> Telephoner<R> map(Function<T, R> function) {
            return new TelephonerMap<>(this, function);
        }

        public <R> Telephoner<R> lift(TelephonerOperator<T, R> telephonerOperator) {
            return new TelephonerLift<>(this, telephonerOperator);
        }
    }

    /**
     * Created by Xionghu on 2018/6/12.
     * Desc: the lift operator
     */
    public class TelephonerLift<T, R> extends TelephonerWithUpstream<T, R> {

        private final TelephonerOperator<T, R> operator;

        public TelephonerLift(Telephoner<T> source, TelephonerOperator<T, R> operator) {
            super(source);
            this.operator = operator;
        }

        @Override
        protected void callActual(Receiver<R> receiver) {
            Receiver<T> tReceiver = operator.call(receiver);
            source.call(tReceiver);
        }
    }

    import com.haocai.mylibrary.rxJava2.Function;

    /**
     * Created by Xionghu on 2018/6/12.
     * Desc: the map operator
     */
    public class TelephonerMap<T, R> extends TelephonerWithUpstream<T, R> {

        private Function<T, R> trFunction;

        public TelephonerMap(Telephoner<T> source, Function<T, R> trFunction) {
            super(source);
            this.trFunction = trFunction;
        }

        @Override
        protected void callActual(Receiver<R> receiver) {
            source.call(new MapReceiver<>(receiver, trFunction));
        }

        static class MapReceiver<T, R> implements Receiver<T> {

            private final Receiver<R> rReceiver;
            private final Function<T, R> trFunction;

            public MapReceiver(Receiver<R> rReceiver, Function<T, R> trFunction) {
                this.rReceiver = rReceiver;
                this.trFunction = trFunction;
            }

            @Override
            public void onCall(Drop d) {
                // The Drop handle is passed through unchanged, so requests reach the source
                rReceiver.onCall(d);
            }

            @Override
            public void onReceive(T t) {
                R tr = trFunction.call(t);
                rReceiver.onReceive(tr);
            }

            @Override
            public void onError(Throwable t) {
                rReceiver.onError(t);
            }

            @Override
            public void onCompleted() {
                rReceiver.onCompleted();
            }
        }
    }

    /**
     * Created by Xionghu on 2018/6/12.
     * Desc: the operator interface
     */
    public interface TelephonerOperator<T, R> {
        Receiver<T> call(Receiver<R> callee);
    }

    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: returns the source Telephoner
     */
    public interface TelephonerSource<T> {
        Telephoner<T> source();
    }

    public abstract class TelephonerWithUpstream<T, R> extends Telephoner<R> implements TelephonerSource<T> {

        protected final Telephoner<T> source;

        public TelephonerWithUpstream(Telephoner<T> source) {
            this.source = source;
        }

        @Override
        public Telephoner<T> source() {
            return source;
        }
    }
    import android.os.Bundle;
    import android.support.v7.app.AppCompatActivity;
    import android.util.Log;

    import com.haocai.mylibrary.rxJava2.Function;
    import com.haocai.mylibrary.rxJava2.backpressure.Drop;
    import com.haocai.mylibrary.rxJava2.backpressure.Receiver;
    import com.haocai.mylibrary.rxJava2.backpressure.Telephoner;
    import com.haocai.mylibrary.rxJava2.backpressure.TelephonerEmitter;
    import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOnCall;
    import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOperator;
    import com.haocai.rxjavademo.R;

    import butterknife.ButterKnife;
    import butterknife.OnClick;

    /**
     * Created by Xionghu on 2018/6/11.
     * Desc: imitation of the RxJava2 operators (with backpressure)
     */
    public class Lesson2_4Activity extends AppCompatActivity {

        @Override
        protected void onCreate(final Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_custom_test);
            ButterKnife.bind(this);
        }

        @OnClick(R.id.testDo)
        public void onViewClicked() {
            // Transformation through map
            Telephoner
                .create(new TelephonerOnCall<String>() {
                    @Override
                    public void call(TelephonerEmitter<String> telephonerEmitter) {
                        telephonerEmitter.onReceive("1");
                        telephonerEmitter.onReceive("2");
                        telephonerEmitter.onCompleted();
                    }
                })
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s);
                    }
                })
                .call(new Receiver<Integer>() {
                    @Override
                    public void onCall(Drop d) {
                        d.request(Long.MAX_VALUE);
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("kpioneer", "onError");
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }
                });

            // The same transformation written directly against lift
            Telephoner
                .create(new TelephonerOnCall<String>() {
                    @Override
                    public void call(TelephonerEmitter<String> telephonerEmitter) {
                        telephonerEmitter.onReceive("3");
                        telephonerEmitter.onReceive("4");
                        telephonerEmitter.onCompleted();
                    }
                })
                .lift(new TelephonerOperator<String, Integer>() {
                    @Override
                    public Receiver<String> call(final Receiver<Integer> receiver) {
                        return new Receiver<String>() {
                            @Override
                            public void onCall(Drop d) {
                                receiver.onCall(d);
                            }

                            @Override
                            public void onReceive(String s) {
                                receiver.onReceive(Integer.parseInt(s));
                            }

                            @Override
                            public void onError(Throwable t) {
                                receiver.onError(t);
                            }

                            @Override
                            public void onCompleted() {
                                receiver.onCompleted();
                            }
                        };
                    }
                })
                .call(new Receiver<Integer>() {
                    @Override
                    public void onCall(Drop d) {
                        d.request(Long.MAX_VALUE);
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("kpioneer", "onError");
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }
                });
        }
    }

Log output

    06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
    06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:1
    06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:2
    06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
    06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
    06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:3
    06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:4
    06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
Source code download
https://github.com/kpioneer123/RxJavaLearning