Spring异步事件机制剖析

您所在的位置:网站首页 接口异步处理 Spring异步事件机制剖析

Spring异步事件机制剖析

2022-12-12 20:22| 来源: 网络整理| 查看: 265

同步&异步 同步事件 在一个线程里,所有的业务方法都是顺序执行的,存在上下依赖关系,其中一个环节耗时过长或阻塞会影响后续环节,且整体耗时增加也受到影响。 在这里插入图片描述 异步事件 在一个线程里,执行一个业务方法或逻辑,其他业务方法或逻辑通过异步线程进行并行执行,彼此独立不影响,可以充分利用多线程的优势提高并发,减少整体耗时。 在这里插入图片描述 实现原理 交互流程

在这里插入图片描述

publisher 事件发布器,这里是事件对象的发布入口 listener 事件监听器,这里是事件对象处理的最终对象 event 事件对象,事件数据的载体 applicationEventMultiCaster 事件多播器,它是连接发布器与监听器的桥梁和中转路由,负责将事件对象分发到具体的监听器上去

Spring中的事件机制是通过观察者模式来进行实现的,支持同步/异步两种方式。

Spring的事件机制提供了一种可以实现业务解耦的优雅编程方式,将实现剥离,使得实现细节更加具体和聚焦,在一定程度上便于后续维护,扩展性较好。

一般来说,我们使用异步方式进行,否则便和日常的同步调用没有太大区别,无法充分发挥异步线程带来的并行优势。

特点作用观察者模式解耦异步并行化 初始化

关于Spring事件机制的初始化,其实主要是基于观察者模式进行发布器、监听器路由关系的绑定和建立,事件多播器也是依赖映射关系进行事件对象的分发实现具体监听器的处理。

Spring事件机制只是整个Spring环境中的一个组成部分,这里需要前置了解Spring环境的初始化方式和工作原理后再结合剖析事件机制的初始化就不难理解了。我们知道AbstractApplicationContext是Spring中环境的抽象基类,它的refresh()方法涵盖了所有整个Spring初始化流程,这里面就包含多播器的初始化方法initApplicationEventMulticaster()和监听器的绑定方法registerListeners()。

public void refresh() throws BeansException, IllegalStateException { //。。。 //省略其他 synchronized(this.startupShutdownMonitor) { //。。。 //省略其他 try { //。。。 //省略其他 this.initApplicationEventMulticaster(); this.registerListeners(); //省略其他 //。。。 } catch (BeansException var9) { //。。。 //省略其他 } finally { //。。。 //省略其他 } } } 复制代码

下面来看下多播器的初始化方法initApplicationEventMulticaster(),主要是从BeanFactory中判断是否已创建多播器,如果没有创建则创建默认的SimpleApplicationEventMulticaster作为多播器。

protected void initApplicationEventMulticaster() { // 拿到当前BeanFactory,通过Bean工厂来获取多播器的Bean实例 ConfigurableListableBeanFactory beanFactory = this.getBeanFactory(); // 如果存在多播器,则直接获取 if (beanFactory.containsLocalBean("applicationEventMulticaster")) { this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class); if (this.logger.isDebugEnabled()) { this.logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]"); } } // 如果不存在多播器,则直接创建默认的SimpleApplicationEventMulticaster多播器进行Bean注册 else { this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); beanFactory.registerSingleton("applicationEventMulticaster", this.applicationEventMulticaster); if (this.logger.isDebugEnabled()) { this.logger.debug("Unable to locate ApplicationEventMulticaster with name 'applicationEventMulticaster': using default [" + this.applicationEventMulticaster + "]"); } } } 复制代码

监听器的注册方法registerListeners()

protected void registerListeners() { // 获取所有ApplicationListener的迭代器 Iterator var1 = this.getApplicationListeners().iterator(); // 把所有监听器都注册到当前的多播器上去 while(var1.hasNext()) { ApplicationListener listener = (ApplicationListener)var1.next(); this.getApplicationEventMulticaster().addApplicationListener(listener); } String[] listenerBeanNames = this.getBeanNamesForType(ApplicationListener.class, true, false); String[] var7 = listenerBeanNames; int var3 = listenerBeanNames.length; for(int var4 = 0; var4 < var3; ++var4) { String listenerBeanName = var7[var4]; this.getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName); } // 如果是早期事件,则直接遍历执行发布事件对象 Set earlyEventsToProcess = this.earlyApplicationEvents; this.earlyApplicationEvents = null; if (earlyEventsToProcess != null) { Iterator var9 = earlyEventsToProcess.iterator(); while(var9.hasNext()) { ApplicationEvent earlyEvent = (ApplicationEvent)var9.next(); this.getApplicationEventMulticaster().multicastEvent(earlyEvent); } } } 复制代码 发布事件

发布事件是通过AbstractApplicationContext的publishEvent()方法发布的

public void publishEvent(Object event) { publishEvent(event, null); } protected void publishEvent(Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null"); // 判断事件类型是否为ApplicationEvent,如果不是则封装成PayloadApplicationEvent ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event; } else { applicationEvent = new PayloadApplicationEvent(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType(); } } // 如果是Spring初始化的早期事件,则需要加入到早期事件中立即发布 if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { //如果不是早期事件,则通过多播器立即进行事件发布 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } //如果父类上下文存在,进行发布事件 if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } } } 复制代码

然后再来看下SimpleApplicationEventMulticaster的multicastEvent()方法,这是多播器广播事件的方法

public void multicastEvent(ApplicationEvent event) { //解析Event类型,进行事件发布 multicastEvent(event, resolveDefaultEventType(event)); } public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { // 获取到事件对应的解析类型 ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); // 获取到多播器当前的线程池 Executor executor = getTaskExecutor(); // 获取当前应用中与给定事件类型匹配的ApplicationListeners的监听器集合,不符合的监听器会被排除在外。再循环执行 for (ApplicationListener listener : getApplicationListeners(event, type)) { // 如果线程池不为空,则通过线程池异步执行 if (executor != null) { executor.execute(() -> invokeListener(listener, event)); } else { // 否则由当前线程执行 invokeListener(listener, event); } } } 复制代码 事件处理

事件处理由invokeListener()方法进行处理,该方法做了一层try、catch封装,实际方法在doInvokeListener()方法中的listener.onApplicationEvent(event)

protected void invokeListener(ApplicationListener listener, ApplicationEvent event) { ErrorHandler errorHandler = getErrorHandler(); if (errorHandler != null) { try { doInvokeListener(listener, event); } catch (Throwable err) { errorHandler.handleError(err); } } else { doInvokeListener(listener, event); } } @SuppressWarnings({"rawtypes", "unchecked"}) private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { // 实际监听器接受该事件并处理 listener.onApplicationEvent(event); } catch (ClassCastException ex) { String msg = ex.getMessage(); if (msg == null || matchesClassCastMessage(msg, event.getClass())) { Log logger = LogFactory.getLog(getClass()); if (logger.isTraceEnabled()) { logger.trace("Non-matching event type for listener: " + listener, ex); } } else { throw ex; } } } 复制代码 关系匹配

当发布器发布事件对象后,会通过getApplicationListeners 方法进行监听器的获取,返回的监听器集合进行遍历和类型解析,匹配到符合条件的监听器。

protected Collection sourceType = (source != null ? source.getClass() : null); // 将给定事件类型与源类型进行封装 ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType); // Quick check for existing entry on ConcurrentHashMap... ListenerRetriever retriever = this.retrieverCache.get(cacheKey); if (retriever != null) { return retriever.getApplicationListeners(); } if (this.beanClassLoader == null || (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) { // 完全同步构建和缓存ListenerRetriever synchronized (this.retrievalMutex) { retriever = this.retrieverCache.get(cacheKey); if (retriever != null) { return retriever.getApplicationListeners(); } retriever = new ListenerRetriever(true); //实际检索给定事件和源类型的应用程序监听器,将过滤后的监听器集合进行返回 Collection> retrieveApplicationListeners( ResolvableType eventType, @Nullable Class sourceType, @Nullable ListenerRetriever retriever) { List> listeners; //当前应用中的监听器BeanName集合 Set listenerBeans; synchronized (this.retrievalMutex) { listeners = new LinkedHashSet(this.defaultRetriever.applicationListeners); listenerBeans = new LinkedHashSet(this.defaultRetriever.applicationListenerBeans); } for (ApplicationListener listener : listeners) { // 判断监听器是否支持给定事件 if (supportsEvent(listener, eventType, sourceType)) { if (retriever != null) { retriever.applicationListeners.add(listener); } allListeners.add(listener); } } if (!listenerBeans.isEmpty()) { BeanFactory beanFactory = getBeanFactory(); for (String listenerBeanName : listenerBeans) { try { //根据监听器的BeanName获取到对应类型 Class listenerType = beanFactory.getType(listenerBeanName); if (listenerType == null || supportsEvent(listenerType, eventType)) { ApplicationListener listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class); //判断监听器是否支持给定事件 if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) { if (retriever != null) { if (beanFactory.isSingleton(listenerBeanName)) { retriever.applicationListeners.add(listener); } else { retriever.applicationListenerBeans.add(listenerBeanName); } } allListeners.add(listener); } } } catch (NoSuchBeanDefinitionException ex) { // Singleton listener instance (without backing bean definition) disappeared - // probably in the middle of the destruction phase } } } AnnotationAwareOrderComparator.sort(allListeners); if (retriever != null && retriever.applicationListenerBeans.isEmpty()) { retriever.applicationListeners.clear(); retriever.applicationListeners.addAll(allListeners); } return allListeners; } 复制代码 配置方式 开启异步 注解方式 /** * created by guanjian on 2021/4/7 9:36 */ @Configuration @EnableAsync public class AsyncConfig{ // 这里还可以配置其他更多可选项,如异步线程池等 } 复制代码 XML方式 复制代码



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3