Home EventBus使用和解析
Post
Cancel

EventBus使用和解析

使用

依赖

Maven地址

1
implementation 'org.greenrobot:eventbus:3.3.1'

使用

绑定事件

1
2
3
4
@Subscribe(threadMode = ThreadMode.MAIN, sticky = false, priority = 1)
fun notice() {

}

注册对象

1
2
3
4
override fun onCreate(savedInstanceState: Bundle?, persistentState: PersistableBundle?) {
    super.onCreate(savedInstanceState, persistentState)
    EventBus.getDefault().register(this)
}

解注册

1
2
3
4
override fun onResume() {
    super.onResume()
    EventBus.getDefault().unregister(this)
}

发送事件

1
EventBus.getDefault().post(Event())

发送粘性事件

1
EventBus.getDefault().postSticky(Event())

源码解析

getDefault获取单例

1
2
3
4
5
6
7
8
9
10
11
12
public static EventBus getDefault() {
    EventBus instance = defaultInstance;
    if (instance == null) {
        synchronized (EventBus.class) {
            instance = EventBus.defaultInstance;
            if (instance == null) {
                instance = EventBus.defaultInstance = new EventBus();
            }
        }
    }
    return instance;
}

使用双重检测,延迟初始化

register

1
2
3
4
5
6
7
8
9
10
11
12
13
public void register(Object subscriber) {
    // ...

    // 获取传入对象的class类型
    Class<?> subscriberClass = subscriber.getClass();
    // 遍历查找含有注解的所有方法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

先拿到传入对象的class类型,然后根据class类型使用SubscriberMethodFinder去查找解析含有注解的方法以及注解的属性。这里传入参数为对象是因为后续反射调用时需要用到对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Class to List<SubscriberMethod> 方法集合的缓存
Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>()

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    // 看是否有缓存
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    if (ignoreGeneratedIndex) {
        // 使用反射查找
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        // 缓存起来
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    // 从缓存中获取,使用findState来查找
    FindState findState = prepareFindState();
    // 初始化findState
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        findUsingReflectionInSingleClass(findState);
        findState.moveToSuperclass();
    }
    // 获取收集到的方法并缓存FindState对象
    return getMethodsAndRelease(findState);
}

// 查找需要的方法
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // 通过这个方法,手动遍历所有parent
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
        try {
            methods = findState.clazz.getMethods();
        } catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
            // ...
            throw new EventBusException(msg, error);
        }
        // getMethods方法默认会返回所有public方法,所以直接跳过父查找
        findState.skipSuperClasses = true;
    }
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            // 只有一个入参
            if (parameterTypes.length == 1) {
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                // 含有注解
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            }
            // ...
        }
        // ...
    }
}

SubscriberMethodFinder中对查找过的class类型进行了缓存,如果没有命中缓存,则开始构建FindState,反射查找对应的方法。默认是手动逐层去查找所有的方法,遍历出public的且含有注解且入参只有一个的方法。获取对应的方法以及注解属性放入到FindState中。

再回到后面的subscribe方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 事件EventClass关联对应接收者的集合,发送事件时使用
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 注册对象关联可以接收的EventClass集合
private final Map<Object, List<Class<?>>> typesBySubscriber;
// EventClass关联EventObj
private final Map<Class<?>, Object> stickyEvents;

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    // 把对象和对象中的观察者方法关联起来,方便直接反射调用
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    // 从缓存中取,EventClass关联接收方法和对象
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        // 不能重复注册,检查是否注册过了
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        // 根据优先级添加
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    // 注册对象关联的所有事件
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    // 如果是粘性事件。检查是否有缓存的事件
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            // 根据EventClass去获取EventObj
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    // 如果对象不为空,则去触发对应的方法
    if (stickyEvent != null) {
        // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
        // --> Strange corner case, which we don't take care of here.
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

这里会把注册的对象和对应的方法封装成Subscription对象,然后添加到Key为事件Class类型的缓存中,value为Subscription的列表,添加到列表中的时候会根据优先级排序。然后也会把注册对象和可以接收的事件集合缓存到另外的map中。如果方法是粘性方法,会去粘性事件的缓存中去查找对应的事件对象,若存在着直接发送事件。

post发送事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

public void post(Object event) {
    // 获取当前线程的postState对象
    PostingThreadState postingState = currentPostingThreadState.get();
    // 把事件添加到队列中
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            // 持续发送事件
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

发送事件时,会把对象添加到当前线程的postingState对象的事件队列中。然后持续去发送事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        // 直接发送事件
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        // 获取可以接收此事件的集合
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted;
            try {
                // 开始去发送事件
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
        // 不切换线程,直接调用
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                // 添加到主线程的执行队列中
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                // 添加到子线程队列
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

真正发送时会先去根据事件的Class类型去查询可以接收的Subscription对象,然后去循环遍历去逐个判断通知,主要是判断是否需要线程切换。如果当前线程不满足,就会被添加到对应线程的poster队列中,然后发送handler事件一并执行。

1
2
3
4
5
6
7
8
9
10
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        // 直接反射调用
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

最后通知就是直接反射调用对应的方法

postSticky发送粘性事件

1
2
3
4
5
6
7
8
public void postSticky(Object event) {
    // 如果是粘性事件,就缓存起来
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}

粘性事件与普通事件的区别就是粘性事件会被缓存起来,然后就直接调用post发送事件。

unregister

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public synchronized void unregister(Object subscriber) {
    // 根据对象获取他所有的EventClass
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        typesBySubscriber.remove(subscriber);
    } else {
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    // 根据EventClass获取可以接收的对象
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
        int size = subscriptions.size();
        for (int i = 0; i < size; i++) {
            Subscription subscription = subscriptions.get(i);
            // 移除同一个对象,避免内存泄漏
            if (subscription.subscriber == subscriber) {
                subscription.active = false;
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

解注册时先根据传入的对象类型,查找缓存的EventClass集合,然后根据EventClass查询所有Subscription,依次循环移除掉与当前相等的Subscription对象。

总结

  • EventBus单例创建使用了双重检测的方式
  • 注册监听时EventBus会使用subscriberMethodFinder来查找出注册对象的所有监听方法,subscriberMethodFinder存有Class类型对应方法的集合缓存,然后对注册对象以及解锁出来的所有方法包装成Subscription对象,储存在EventBus中的EventClass对Subscription集合中,添加时会判断优先级插入到对应的位置,方便之后的事件发送时查找关联事件发送。另外还会缓存一份注册对象对应他可以接收事件的集合在另外的一个map。最后如果是注册的粘性事件,会去粘性集合中查找是否有对应的事件,如果有直接触发。
  • 当发送事件时,会根据发送的事件Class查找到可以接收事件的Subscription对象列表,根据设置的线程要求,直接触发或发送到对应的线程的poster队列中去,最后通过反射调用执行。如果是粘性事件,则会单独缓存到一个EventClass to EventObject的集合中。
  • 当解注册时,直接通过注册对象去查找可以接收的事件类型,然后查找可以接收对应事件的Subscription对象集合,然后移除含有与注册对象相同对象的Subscription对象。
This post is licensed under CC BY 4.0 by the author.