在上一篇文章中,我们分析了register和unregister过程:EventBus3.0源码解析(一):register()与unregister()

接下来我们分析post()与postSticky()。

1.postSticky(Object event)

这个方法是发送粘性事件,细节如下:

public void postSticky(Object event) {        synchronized (stickyEvents) {            stickyEvents.put(event.getClass(), event);        }               post(event);    }

将事件加入粘性事件map中(当有新的注册者时传递),并调用post方法。

2.post(Object event)

 public void post(Object event) {        PostingThreadState postingState = currentPostingThreadState.get();        List eventQueue = postingState.eventQueue;        eventQueue.add(event);        if (!postingState.isPosting) {            postingState.isMainThread = isMainThread();            postingState.isPosting = true;            if (postingState.canceled) {                throw new EventBusException("Internal error. Abort state was not reset");            }            try {                while (!eventQueue.isEmpty()) {                    postSingleEvent(eventQueue.remove(0), postingState);                }            } finally {                postingState.isPosting = false;                postingState.isMainThread = false;            }        }    }   

首先获取当前线程PostingThreadState对象,其中currentPostingThreadState:

private final ThreadLocal currentPostingThreadState = new ThreadLocal() {        @Override        protected PostingThreadState initialValue() {            return new PostingThreadState();        }    };

当获取当前线程PostingThreadState后,获得其事件队列(list),判断当前线程是否在传递事件,如果不是,调用isMainThread()将值赋予postingThreadState对象,并把isPosting改为true,判断其变量canceled,查看传递是否被停止,是的话则抛出异常:“内部错误。中止状态未重置”。如果为否则循环事件队列调用postSingleEvent方法直至队列为空,最后重置当前线程PostingThreadState标志位isMainThread与isPosting。

PostingThreadState保存当前传递线程的状态,如是否为主线程,是否在传递事件等:

 final static class PostingThreadState {        final List eventQueue = new ArrayList<>();        boolean isPosting;        boolean isMainThread;        Subscription subscription;        Object event;        boolean canceled;    }   

postSingleEvent方法:

 private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {        Class<?> eventClass = event.getClass();        boolean subscriptionFound = false;        if (eventInheritance) {            List> eventTypes = lookupAllEventTypes(eventClass);            int countTypes = eventTypes.size();            for (int h = 0; h < countTypes; h++) {                Class<?> clazz = eventTypes.get(h);                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);            }        } else {            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);        }        if (!subscriptionFound) {            if (logNoSubscriberMessages) {                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);            }            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&                    eventClass != SubscriberExceptionEvent.class) {                post(new NoSubscriberEvent(this, event));            }        }    }

首先获取传递事件类型,如果eventInheritance为true,则代表需将此事件传递给此事件父类的接收者。调用lookupAllEventTypes方法获得所有需传递的事件类型。

 private static List> lookupAllEventTypes(Class<?> eventClass) {        synchronized (eventTypesCache) {            List> eventTypes = eventTypesCache.get(eventClass);            if (eventTypes == null) {                eventTypes = new ArrayList<>();                Class<?> clazz = eventClass;                while (clazz != null) {                    eventTypes.add(clazz);                    addInterfaces(eventTypes, clazz.getInterfaces());                    clazz = clazz.getSuperclass();                }                eventTypesCache.put(eventClass, eventTypes);            }            return eventTypes;        }    }

eventTypesCache中存储一个类型的所有父类及其接口及本身接口(包括自身类型)。

同步eventTypesCache获取传入事件类型对应列表,如果列表为空则新建列表遍历类型将其所有父类及其接口,本身接口及类型放入列表,然后放入eventTypesCache,并返回列表。

postSingleEvent方法得到列表后循环列表,执行方法postSingleEventForEventType(Object, PostingThreadState, Class<?>):

 private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {        CopyOnWriteArrayList subscriptions;        synchronized (this) {            subscriptions = subscriptionsByEventType.get(eventClass);        }        if (subscriptions != null && !subscriptions.isEmpty()) {            for (Subscription subscription : subscriptions) {                postingState.event = event;                postingState.subscription = subscription;                boolean aborted = false;                try {                    postToSubscription(subscription, event, postingState.isMainThread);                    aborted = postingState.canceled;                } finally {                    postingState.event = null;                    postingState.subscription = null;                    postingState.canceled = false;                }                if (aborted) {                    break;                }            }            return true;        }        return false;    }

首先根据传递事件类型从subscriptionsByEventType中获取Subscription列表,循环列表,设置postingState的event和subscription,调用postToSubscription方法,每传递给一个注册者,获取postingState的canceled,在最后清空postingState,然后根据的canceled获取判断当前线程传递是否被停止,如果是的话则跳出循环。接下来看postToSubscription方法:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {        switch (subscription.subscriberMethod.threadMode) {            case POSTING:                invokeSubscriber(subscription, event);                break;            case MAIN:                if (isMainThread) {                    invokeSubscriber(subscription, event);                } else {                    mainThreadPoster.enqueue(subscription, event);                }                break;            case MAIN_ORDERED:                if (mainThreadPoster != null) {                    mainThreadPoster.enqueue(subscription, event);                } else {                    // temporary: technically not correct as poster not decoupled from subscriber                    invokeSubscriber(subscription, event);                }                break;            case BACKGROUND:                if (isMainThread) {                    backgroundPoster.enqueue(subscription, event);                } else {                    invokeSubscriber(subscription, event);                }                break;            case ASYNC:                asyncPoster.enqueue(subscription, event);                break;            default:                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);        }    }

根据不同注册者处理方法指定的线程方式执行不同逻辑,如果指定线程模式不匹配的话会抛出异常。

如果为POSTING(默认模式),则在哪个线程发送,在哪个线程执行。调用invokeSubscriber方法,

如果为MAIN(主线程模式),如果当前为主线程则直接执行invokeSubscriber方法,不然则调用mainThreadPoster对象的enqueue(subscription, event)方法将事件传递切换到主线程。

如果为MAIN_ORDERED,则直接调用mainThreadPoster对象的enqueue方法,保证事件执行顺序,与MAIN模式不同之处就是发送事件是否按照串行顺序执行。

如果为BACKGROUND(子线程模式),如果当前不是主线程则直接调用invokeSubscriber方法,否则调用对象backgroundPoster的enqueue方法。

如果为ASYNC(异步模式),则调用对象asyncPoster的enqueue(subscription, event)方法。

各种模式最终还是调用invokeSubscriber方法:

//利用传入参数执行方法。void invokeSubscriber(Subscription subscription, Object event) {        try {            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);        } catch (InvocationTargetException e) {            handleSubscriberException(subscription, event, e.getCause());        } catch (IllegalAccessException e) {            throw new IllegalStateException("Unexpected exception", e);        }    }

除了,模式POSTING外,其他模式都有调用Poster的enqueue方法。 Poster是一个接口,其实现类有HandlerPoster,BackgroundPoster,AsyncPoster,对应3种线程模式切换线程。

mainThreadPoster调用对象mainThreadSupport的createPoster(EventBus)方法创建对象(返回HandlerPoster对象)。HandlerPoster继承了Handler,持有EventBus实例:

public class HandlerPoster extends Handler implements Poster {    private final PendingPostQueue queue;    private final int maxMillisInsideHandleMessage;    private final EventBus eventBus;    private boolean handlerActive;    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {        super(looper);        this.eventBus = eventBus;        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;        queue = new PendingPostQueue();    }   ......}

HandlerPoster利用Handler机制切到主线程执行,其enqueue方法:

public void enqueue(Subscription subscription, Object event) {        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);        synchronized (this) {            queue.enqueue(pendingPost);            if (!handlerActive) {                handlerActive = true;                if (!sendMessage(obtainMessage())) {                    throw new EventBusException("Could not send handler message");                }            }        }    }

构建一个PendingPost对象将其放入PendingPostQueue,并调用sendMessage()方法发送一个Message,利用handler机制使其执行handleMessage方法:

 @Override    public void handleMessage(Message msg) {        boolean rescheduled = false;        try {            long started = SystemClock.uptimeMillis();            while (true) {                PendingPost pendingPost = queue.poll();                if (pendingPost == null) {                    synchronized (this) {                        // Check again, this time in synchronized                        pendingPost = queue.poll();                        if (pendingPost == null) {                            handlerActive = false;                            return;                        }                    }                }                eventBus.invokeSubscriber(pendingPost);                long timeInMethod = SystemClock.uptimeMillis() - started;                if (timeInMethod >= maxMillisInsideHandleMessage) {                    if (!sendMessage(obtainMessage())) {                        throw new EventBusException("Could not send handler message");                    }                    rescheduled = true;                    return;                }            }        } finally {            handlerActive = rescheduled;        }    }

andleMessage方法从队列取出pendingPost对象,再执行eventBus对象的invokeSubscriber(PendingPost)方法,其中调用invokeSubscriber(Subscription, Object)方法,此方法在上面已说明。还有一些状态位及判断,这里不再详述。

BackgroundPoster继承Runnable,本身就是一个子线程:

final class BackgroundPoster implements Runnable, Poster {    private final PendingPostQueue queue;    private final EventBus eventBus;    private volatile boolean executorRunning;    BackgroundPoster(EventBus eventBus) {        this.eventBus = eventBus;        queue = new PendingPostQueue();    }    ......}

再看其enqueue方法:

public void enqueue(Subscription subscription, Object event) {        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);        synchronized (this) {            queue.enqueue(pendingPost);            if (!executorRunning) {                executorRunning = true;                eventBus.getExecutorService().execute(this);            }        }    }

新建PendingPost对象并入队,调用eventBus中的线程池执行本身,即run方法:

@Override    public void run() {        try {            try {                while (true) {                    PendingPost pendingPost = queue.poll(1000);                    if (pendingPost == null) {                        synchronized (this) {                            // Check again, this time in synchronized                            pendingPost = queue.poll();                            if (pendingPost == null) {                                executorRunning = false;                                return;                            }                        }                    }                    eventBus.invokeSubscriber(pendingPost);                }            } catch (InterruptedException e) {                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);            }        } finally {            executorRunning = false;        }    }

最终还是执行eventBus的invokeSubscriber()方法。

AsyncPoster与BackgroundPoster类似,不再详述。其代码如下:

class AsyncPoster implements Runnable, Poster {    private final PendingPostQueue queue;    private final EventBus eventBus;    AsyncPoster(EventBus eventBus) {        this.eventBus = eventBus;        queue = new PendingPostQueue();    }    public void enqueue(Subscription subscription, Object event) {        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);        queue.enqueue(pendingPost);        eventBus.getExecutorService().execute(this);    }    @Override    public void run() {        PendingPost pendingPost = queue.poll();        if(pendingPost == null) {            throw new IllegalStateException("No pending post available");        }        eventBus.invokeSubscriber(pendingPost);    }}

AsyncPoster与BackgroundPoster都使用了 eventBus.getExecutorService()方法,此方法返回一个线程池:

private final ExecutorService executorService;ExecutorService getExecutorService() {        return executorService;    }

到此EventBus核心源码解析完成。

更多相关文章

  1. Android(安卓)面试复习资料
  2. Android让屏幕保持常亮,不熄屏的三种方法
  3. Android知识点总结(二十)Android中的ANR
  4. android将path拆分为多个path
  5. Android(安卓)异步 RxAndroid框架官方例子
  6. Android(安卓)利用getIdentifier()方法获取资源ID
  7. 百度地图SDK for Android【事件监听】
  8. android的Dialog使用
  9. android中的保存数据方法

随机推荐

  1. 什么时候使用 CountDownLatch
  2. 数据结构--时间复杂度与空间复杂度
  3. JUnit Theories 介绍
  4. Android中gradle的配置
  5. 为什么 Java 不支持多重继承
  6. Java 集合框架面试问题集锦
  7. 从零开始学习C语言
  8. 改进异常处理的 6 条建议
  9. RBAC聚合
  10. 基本类型转 String