EventBus3.0源码解析(二):post()与postSticky()
在上一篇文章中,我们分析了register和unregister过程:EventBus3.0源码解析(一):register()与unregister()
接下来我们分析post()与postSticky()。
1.postSticky(Object event)
这个方法是发送粘性事件,细节如下:
public void postSticky(Object event) { synchronized (stickyEvents) { stickyEvents.put(event.getClass(), event); } post(event); }
将事件加入粘性事件map中(当有新的注册者时传递),并调用post方法。
2.post(Object event)
public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List
首先获取当前线程PostingThreadState对象,其中currentPostingThreadState:
private final ThreadLocal currentPostingThreadState = new ThreadLocal() { @Override protected PostingThreadState initialValue() { return new PostingThreadState(); } };
当获取当前线程PostingThreadState后,获得其事件队列(list),判断当前线程是否在传递事件,如果不是,调用isMainThread()将值赋予postingThreadState对象,并把isPosting改为true,判断其变量canceled,查看传递是否被停止,是的话则抛出异常:“内部错误。中止状态未重置”。如果为否则循环事件队列调用postSingleEvent方法直至队列为空,最后重置当前线程PostingThreadState标志位isMainThread与isPosting。
PostingThreadState保存当前传递线程的状态,如是否为主线程,是否在传递事件等:
final static class PostingThreadState { final List
postSingleEvent方法:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { List> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { logger.log(Level.FINE, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }
首先获取传递事件类型,如果eventInheritance为true,则代表需将此事件传递给此事件父类的接收者。调用lookupAllEventTypes方法获得所有需传递的事件类型。
private static List> lookupAllEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List> eventTypes = eventTypesCache.get(eventClass); if (eventTypes == null) { eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } }
eventTypesCache中存储一个类型的所有父类及其接口及本身接口(包括自身类型)。
同步eventTypesCache获取传入事件类型对应列表,如果列表为空则新建列表遍历类型将其所有父类及其接口,本身接口及类型放入列表,然后放入eventTypesCache,并返回列表。
postSingleEvent方法得到列表后循环列表,执行方法postSingleEventForEventType(Object, PostingThreadState, Class<?>):
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }
首先根据传递事件类型从subscriptionsByEventType中获取Subscription列表,循环列表,设置postingState的event和subscription,调用postToSubscription方法,每传递给一个注册者,获取postingState的canceled,在最后清空postingState,然后根据的canceled获取判断当前线程传递是否被停止,如果是的话则跳出循环。接下来看postToSubscription方法:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case MAIN_ORDERED: if (mainThreadPoster != null) { mainThreadPoster.enqueue(subscription, event); } else { // temporary: technically not correct as poster not decoupled from subscriber invokeSubscriber(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
根据不同注册者处理方法指定的线程方式执行不同逻辑,如果指定线程模式不匹配的话会抛出异常。
如果为POSTING(默认模式),则在哪个线程发送,在哪个线程执行。调用invokeSubscriber方法,
如果为MAIN(主线程模式),如果当前为主线程则直接执行invokeSubscriber方法,不然则调用mainThreadPoster对象的enqueue(subscription, event)方法将事件传递切换到主线程。
如果为MAIN_ORDERED,则直接调用mainThreadPoster对象的enqueue方法,保证事件执行顺序,与MAIN模式不同之处就是发送事件是否按照串行顺序执行。
如果为BACKGROUND(子线程模式),如果当前不是主线程则直接调用invokeSubscriber方法,否则调用对象backgroundPoster的enqueue方法。
如果为ASYNC(异步模式),则调用对象asyncPoster的enqueue(subscription, event)方法。
各种模式最终还是调用invokeSubscriber方法:
//利用传入参数执行方法。void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
除了,模式POSTING外,其他模式都有调用Poster的enqueue方法。 Poster是一个接口,其实现类有HandlerPoster,BackgroundPoster,AsyncPoster,对应3种线程模式切换线程。
mainThreadPoster调用对象mainThreadSupport的createPoster(EventBus)方法创建对象(返回HandlerPoster对象)。HandlerPoster继承了Handler,持有EventBus实例:
public class HandlerPoster extends Handler implements Poster { private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive; protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) { super(looper); this.eventBus = eventBus; this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); } ......}
HandlerPoster利用Handler机制切到主线程执行,其enqueue方法:
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } }
构建一个PendingPost对象将其放入PendingPostQueue,并调用sendMessage()方法发送一个Message,利用handler机制使其执行handleMessage方法:
@Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } }
andleMessage方法从队列取出pendingPost对象,再执行eventBus对象的invokeSubscriber(PendingPost)方法,其中调用invokeSubscriber(Subscription, Object)方法,此方法在上面已说明。还有一些状态位及判断,这里不再详述。
BackgroundPoster继承Runnable,本身就是一个子线程:
final class BackgroundPoster implements Runnable, Poster { private final PendingPostQueue queue; private final EventBus eventBus; private volatile boolean executorRunning; BackgroundPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } ......}
再看其enqueue方法:
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } }
新建PendingPost对象并入队,调用eventBus中的线程池执行本身,即run方法:
@Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } }
最终还是执行eventBus的invokeSubscriber()方法。
AsyncPoster与BackgroundPoster类似,不再详述。其代码如下:
class AsyncPoster implements Runnable, Poster { private final PendingPostQueue queue; private final EventBus eventBus; AsyncPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this); } @Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); }}
AsyncPoster与BackgroundPoster都使用了 eventBus.getExecutorService()方法,此方法返回一个线程池:
private final ExecutorService executorService;ExecutorService getExecutorService() { return executorService; }
到此EventBus核心源码解析完成。
更多相关文章
- Android(安卓)面试复习资料
- Android让屏幕保持常亮,不熄屏的三种方法
- Android知识点总结(二十)Android中的ANR
- android将path拆分为多个path
- Android(安卓)异步 RxAndroid框架官方例子
- Android(安卓)利用getIdentifier()方法获取资源ID
- 百度地图SDK for Android【事件监听】
- android的Dialog使用
- android中的保存数据方法