Spring Boot Async Annotations: A Source Code Analysis
Published: 2019-06-29


1. Example

Let's start with the following demo.

Maven dependencies in pom.xml:

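<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.14.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>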

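The startup class, SpringBootAsyncApplication.java:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class SpringBootAsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootAsyncApplication.class, args);
    }
}

DemoController.java:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/demos")
@Slf4j
public class DemoController {

    @Autowired
    private IDemoService demoService;

    @GetMapping("")
    public String test(@RequestParam String name) {
        long start = System.currentTimeMillis();
        log.info("start send. ThreadName: {}", Thread.currentThread().getName());
        // Goes through the Spring proxy, so the call returns immediately
        demoService.send(name);
        long end = System.currentTimeMillis();
        log.info("send end, time: {}", (end - start));
        return "success";
    }
}

DemoServiceImpl.java, the implementation of the IDemoService interface:

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class DemoServiceImpl implements IDemoService {

    @Async
    @Override
    public void send(String name) {
        try {
            // Simulate five seconds of slow work
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
        log.info("async name={}, ThreadName: {}", name, Thread.currentThread().getName());
    }
}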

After starting the project, send GET http://127.0.0.1:8002/demos?name=test. The console prints:

2019-04-12 17:17:32.462 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController        : start send. ThreadName: http-nio-8002-exec-1
2019-04-12 17:17:32.466 INFO 12 --- [nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2019-04-12 17:17:32.467 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController        : send end, time: 5
2019-04-12 17:17:37.468 INFO 12 --- [cTaskExecutor-1] c.l.d.s.service.impl.DemoServiceImpl     : async name=test, ThreadName: SimpleAsyncTaskExecutor-1

The log shows two threads: the request-handling thread (http-nio-8002-exec-1) and the asynchronous thread (SimpleAsyncTaskExecutor-1). The request thread finishes after 5 ms and returns the response without waiting for the asynchronous thread, which logs its message five seconds later. That is the asynchronous effect.

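2. Conclusions

2.1 How to enable asynchronous execution

  • Enable async support by adding @EnableAsync to the startup class or to a configuration class;
  • Add @Async to a method or to a class.

2.2 The @Async annotation

  • A method annotated with @Async runs in a separate thread (SimpleAsyncTaskExecutor-1 in the example); the caller does not wait for it to complete.
  • In the default proxy mode, @Async should only be applied to public methods, because only calls that pass through the proxy can be intercepted.
  • @Async has no effect on self-invocation, that is, when the async method is called from another method of the same class: such a call bypasses the proxy and invokes the method directly.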
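The self-invocation rule is easier to see without Spring in the way. The following is a minimal sketch using a plain JDK dynamic proxy; the Greeter interface and the recording handler are hypothetical stand-ins, not Spring classes. It only illustrates why a call made on "this" never reaches the proxy.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class SelfInvocationDemo {

    /** Hypothetical service interface, standing in for IDemoService. */
    interface Greeter {
        void outer();
        void inner();
    }

    /** Plain target object; outer() calls inner() on "this", not on the proxy. */
    static final class GreeterImpl implements Greeter {
        @Override
        public void outer() {
            inner(); // self-invocation: never reaches the proxy
        }

        @Override
        public void inner() {
            // nothing to do
        }
    }

    /** Calls outer() and then inner() through a proxy and returns the intercepted method names. */
    static List<String> interceptedCalls() {
        List<String> intercepted = new ArrayList<>();
        Greeter target = new GreeterImpl();
        InvocationHandler handler = (p, method, methodArgs) -> {
            intercepted.add(method.getName()); // this is where an interceptor would kick in
            return method.invoke(target, methodArgs);
        };
        Greeter proxy = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
        proxy.outer(); // intercepted once; the nested inner() call is not
        proxy.inner(); // intercepted, because it goes through the proxy
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls()); // [outer, inner]
    }
}
```

Only two calls are recorded even though inner() runs twice. The same thing happens to an @Async method called from inside its own class.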

3. Questions

Why is the following line logged?

[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either

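It says that no task executor was found for async processing: there is neither a bean of type TaskExecutor nor a bean named "taskExecutor". What does that mean? And why does the name of the asynchronous thread start with SimpleAsyncTaskExecutor? With these questions in mind, let's dig into the source code and see what actually happens.

4. Source Code Analysis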

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.Async.java

/**
 * Annotation that marks a method as a candidate for asynchronous execution.
 * Can also be used at the type level, in which case all of the type's methods are
 * considered as asynchronous.
 *
 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@link java.util.concurrent.Future}. In the latter case, you may declare the
 * more specific {@link org.springframework.util.concurrent.ListenableFuture} or
 * {@link java.util.concurrent.CompletableFuture} types which allow for richer
 * interaction with the asynchronous task and for immediate composition with
 * further processing steps.
 *
 * <p>A {@code Future} handle returned from the proxy will be an actual asynchronous
 * {@code Future} that can be used to track the result of the asynchronous method
 * execution. However, since the target method needs to implement the same signature,
 * it will have to return a temporary {@code Future} handle that just passes a value
 * through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult},
 * or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.
 *
 * @author Juergen Hoeller
 * @author Chris Beams
 * @since 3.0
 * @see AnnotationAsyncExecutionInterceptor
 * @see AsyncAnnotationAdvisor
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

    /**
     * A qualifier value for the specified asynchronous operation(s).
     * <p>May be used to determine the target executor to be used when executing this
     * method, matching the qualifier value (or the bean name) of a specific
     * {@link java.util.concurrent.Executor Executor} or
     * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
     * bean definition.
     * <p>When specified on a class level {@code @Async} annotation, indicates that the
     * given executor should be used for all methods within the class. Method level use
     * of {@code Async#value} always overrides any value set at the class level.
     * @since 3.1.2
     */
    String value() default "";

}

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.EnableAsync.java

package org.springframework.scheduling.annotation;

import java.lang.annotation.Annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.Ordered;

/**
 * Enables Spring's asynchronous method execution capability, similar to functionality
 * found in Spring's {@code <task:*>} XML namespace.
 *
 * <p>To be used together with @{@link Configuration Configuration} classes as follows,
 * enabling annotation-driven async processing for an entire Spring application context:
 *
 * <pre class="code">
 * @Configuration
 * @EnableAsync
 * public class AppConfig {
 *
 * }</pre>
 *
 * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with
 * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous}
 * annotation, or any custom annotation specified via the {@link #annotation} attribute.
 * The aspect is added transparently for any registered bean, for instance via this
 * configuration:
 *
 * <pre class="code">
 * @Configuration
 * public class AnotherAppConfig {
 *
 *     @Bean
 *     public MyAsyncBean asyncBean() {
 *         return new MyAsyncBean();
 *     }
 * }</pre>
 *
 * <p>By default, Spring will be searching for an associated thread pool definition:
 * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
 * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
 * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
 * will be used to process async method invocations. Besides, annotated methods having a
 * {@code void} return type cannot transmit any exception back to the caller. By default,
 * such uncaught exceptions are only logged.
 *
 * <p>To customize all this, implement {@link AsyncConfigurer} and provide:
 * <ul>
 * <li>your own {@link java.util.concurrent.Executor Executor} through the
 * {@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li>
 * <li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
 * AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler
 * getAsyncUncaughtExceptionHandler()}
 * method.</li>
 * </ul>
 *
 * <pre class="code">
 * @Configuration
 * @EnableAsync
 * public class AppConfig implements AsyncConfigurer {
 *
 *     @Override
 *     public Executor getAsyncExecutor() {
 *         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 *         executor.setCorePoolSize(7);
 *         executor.setMaxPoolSize(42);
 *         executor.setQueueCapacity(11);
 *         executor.setThreadNamePrefix("MyExecutor-");
 *         executor.initialize();
 *         return executor;
 *     }
 *
 *     @Override
 *     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
 *         return MyAsyncUncaughtExceptionHandler();
 *     }
 * }</pre>
 *
 * <p>If only one item needs to be customized, {@code null} can be returned to
 * keep the default settings. Consider also extending from {@link AsyncConfigurerSupport}
 * when possible.
 *
 * <p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed
 * Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method
 * if you want a fully managed bean. In such circumstances it is no longer necessary to
 * manually call the {@code executor.initialize()} method as this will be invoked
 * automatically when the bean is initialized.
 *
 * <p>For reference, the example above can be compared to the following Spring XML
 * configuration:
 *
 * <pre class="code">
 * {@code
 * <beans>
 *
 *     <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
 *
 *     <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
 *
 *     <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
 *
 *     <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
 *
 * </beans>
 * }</pre>
 *
 * The above XML-based and JavaConfig-based examples are equivalent except for the
 * setting of the <em>thread name prefix</em> of the {@code Executor}; this is because
 * the {@code <task:executor>} element does not expose such an attribute. This
 * demonstrates how the JavaConfig-based approach allows for maximum configurability
 * through direct access to actual componentry.
 *
 * <p>The {@link #mode} attribute controls how advice is applied: If the mode is
 * {@link AdviceMode#PROXY} (the default), then the other attributes control the behavior
 * of the proxying. Please note that proxy mode allows for interception of calls through
 * the proxy only; local calls within the same class cannot get intercepted that way.
 *
 * <p>Note that if the {@linkplain #mode} is set to {@link AdviceMode#ASPECTJ}, then the
 * value of the {@link #proxyTargetClass} attribute will be ignored. Note also that in
 * this case the {@code spring-aspects} module JAR must be present on the classpath, with
 * compile-time weaving or load-time weaving applying the aspect to the affected classes.
 * There is no proxy involved in such a scenario; local calls will be intercepted as well.
 *
 * @author Chris Beams
 * @author Juergen Hoeller
 * @author Stephane Nicoll
 * @author Sam Brannen
 * @since 3.1
 * @see Async
 * @see AsyncConfigurer
 * @see AsyncConfigurationSelector
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    /**
     * Indicate the 'async' annotation type to be detected at either class
     * or method level.
     * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
     * {@code @javax.ejb.Asynchronous} annotation will be detected.
     * <p>This attribute exists so that developers can provide their own
     * custom annotation type to indicate that a method (or all methods of
     * a given class) should be invoked asynchronously.
     */
    Class<? extends Annotation> annotation() default Annotation.class;

    /**
     * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
     * to standard Java interface-based proxies.
     * <p>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}.
     * <p>The default is {@code false}.
     * <p>Note that setting this attribute to {@code true} will affect all
     * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
     * For example, other beans marked with Spring's {@code @Transactional} annotation
     * will be upgraded to subclass proxying at the same time. This approach has no
     * negative impact in practice unless one is explicitly expecting one type of proxy
     * vs. another — for example, in tests.
     */
    boolean proxyTargetClass() default false;

    /**
     * Indicate how async advice should be applied.
     * <p>The default is {@link AdviceMode#PROXY}.
     * Please note that proxy mode allows for interception of calls through the proxy
     * only. Local calls within the same class cannot get intercepted that way; an
     * {@link Async} annotation on such a method within a local call will be ignored
     * since Spring's interceptor does not even kick in for such a runtime scenario.
     * For a more advanced mode of interception, consider switching this to
     * {@link AdviceMode#ASPECTJ}.
     */
    AdviceMode mode() default AdviceMode.PROXY;

    /**
     * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
     * should be applied.
     * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
     * after all other post-processors, so that it can add an advisor to
     * existing proxies rather than double-proxy.
     */
    int order() default Ordered.LOWEST_PRECEDENCE;

}

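@EnableAsync enables Spring's asynchronous method execution capability, similar to Spring's <task:*> XML namespace. It is used together with @Configuration classes to enable annotation-driven async processing for the whole Spring application context.

By default, Spring searches for an associated thread pool definition: either a unique bean of type TaskExecutor in the context, or otherwise a bean of type Executor named "taskExecutor". If neither can be resolved, a SimpleAsyncTaskExecutor is used to process async method invocations. In addition, annotated methods with a void return type cannot pass any exception back to the caller. By default, such uncaught exceptions are only logged.

To customize all this, implement the AsyncConfigurer interface and override its getAsyncExecutor() and getAsyncUncaughtExceptionHandler() methods.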
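To make the point about void methods concrete, here is a minimal JDK-only sketch. The Consumer-based handler is a hypothetical stand-in for AsyncUncaughtExceptionHandler, and the try/catch around the task is an assumption about where such a handler would sit. It shows the exception reaching the handler on the worker thread while the caller never sees it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class VoidAsyncErrorDemo {

    /**
     * Runs a failing void task on another thread and returns the message that
     * reached the (hypothetical) handler instead of the caller.
     */
    static String messageSeenByHandler() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        Consumer<Throwable> handler = error -> {
            seen.set(error.getMessage());
            handled.countDown();
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // execute() returns at once; the calling thread never sees the exception.
            executor.execute(() -> {
                try {
                    throw new IllegalStateException("boom");
                } catch (RuntimeException ex) {
                    handler.accept(ex); // the default behavior would be to log here
                }
            });
            if (!handled.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handler was not invoked in time");
            }
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageSeenByHandler()); // boom
    }
}
```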

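import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        // Not a managed bean here, so it must be initialized by hand
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // MyAsyncUncaughtExceptionHandler is a user-defined class
        return new MyAsyncUncaughtExceptionHandler();
    }
}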

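If only one of the two needs to be customized, the other method can return null to keep the default setting. Where possible, also consider extending AsyncConfigurerSupport.

Note: in the example above, the ThreadPoolTaskExecutor is not a fully managed Spring bean. If you want a fully managed bean, add the @Bean annotation to the getAsyncExecutor() method. In that case there is no need to call executor.initialize() manually, because it is invoked automatically when the bean is initialized.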

@Bean
@Override
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(7);
    executor.setMaxPoolSize(42);
    executor.setQueueCapacity(11);
    executor.setThreadNamePrefix("MyExecutor-");
    return executor;
}
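For readers who want to see what the four settings amount to, here is a rough JDK-only approximation built directly on java.util.concurrent.ThreadPoolExecutor. The 60-second idle timeout and the daemon flag are assumptions made for this sketch, not values taken from the example above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolSettingsDemo {

    /** Builds a JDK pool with the core size, max size, queue capacity and name prefix of the example. */
    static ThreadPoolExecutor newPool() {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory namedThreads = runnable -> {
            Thread thread = new Thread(runnable, "MyExecutor-" + counter.incrementAndGet());
            thread.setDaemon(true); // assumption: do not keep the JVM alive for this demo
            return thread;
        };
        return new ThreadPoolExecutor(
                7,                              // core pool size
                42,                             // maximum pool size
                60, TimeUnit.SECONDS,           // assumed idle timeout for threads above the core size
                new LinkedBlockingQueue<>(11),  // bounded queue with capacity 11
                namedThreads);
    }

    /** Runs one task on the pool and returns the name of the thread that executed it. */
    static String workerThreadName() {
        ThreadPoolExecutor pool = newPool();
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName()); // MyExecutor-1
    }
}
```

With a bounded queue, a ThreadPoolExecutor starts threads beyond the core size only once the queue is full. With these numbers, the eighth to eighteenth concurrent tasks wait in the queue before any additional thread is created.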

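For reference, the example above can be compared with the following Spring XML configuration:

<beans>

    <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>

    <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>

    <bean id="asyncBean" class="com.foo.MyAsyncBean"/>

    <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>

</beans>

The XML-based and Java-based examples are equivalent except for the thread name prefix of the executor, because the <task:executor> element does not expose such an attribute. This shows how the Java-based approach achieves maximum configurability through direct access to the actual components.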

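The mode attribute controls how advice is applied. If mode is AdviceMode#PROXY (the default), the other attributes control the behavior of the proxying. Note that proxy mode only intercepts calls that go through the proxy; local calls within the same class cannot be intercepted this way.

If mode is set to AdviceMode#ASPECTJ, the value of the proxyTargetClass attribute is ignored. In this case the spring-aspects module JAR must also be on the classpath, and the aspect is applied to the affected classes by compile-time or load-time weaving. No proxy is involved in this scenario, so local calls are intercepted as well.

All of the above comes from the Javadoc of EnableAsync. How is it actually implemented? Read on.

Notice that the only annotation on EnableAsync that does real work is @Import(AsyncConfigurationSelector.class). Here is its source: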

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AsyncConfigurationSelector.java

/**
 * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based
 * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class.
 *
 * @author Chris Beams
 * @since 3.1
 * @see EnableAsync
 * @see ProxyAsyncConfiguration
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    /**
     * {@inheritDoc}
     * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
     * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
     */
    @Override
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] { ProxyAsyncConfiguration.class.getName() };
            case ASPECTJ:
                return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
            default:
                return null;
        }
    }

}

It selects which implementation of AbstractAsyncConfiguration to use, based on the value of EnableAsync#mode on the importing @Configuration class.

  • With the default mode, PROXY, ProxyAsyncConfiguration is used.

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.ProxyAsyncConfiguration.java

/**
 * {@code @Configuration} class that registers the Spring infrastructure beans necessary
 * to enable proxy-based asynchronous method execution.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 * @see AsyncConfigurationSelector
 */
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    /**
     * Defines a bean of type AsyncAnnotationBeanPostProcessor.
     */
    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // Create the bean post-processor for the async annotation
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        // Read the user-defined annotation type from @EnableAsync
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        // If it is not the default, register it as the async annotation type
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        // Set the task executor
        if (this.executor != null) {
            bpp.setExecutor(this.executor);
        }
        // Set the exception handler
        if (this.exceptionHandler != null) {
            bpp.setExceptionHandler(this.exceptionHandler);
        }
        // Whether to upgrade to CGLIB subclass proxies; off by default
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        // Set the order; by default this post-processor runs last
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }

}

spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AbstractAsyncConfiguration.java

/**
 * Abstract base {@code Configuration} class providing common structure for enabling
 * Spring's asynchronous method execution capability.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 */
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

    // Attributes of the @EnableAsync annotation
    protected AnnotationAttributes enableAsync;

    // Task executor
    protected Executor executor;

    // Exception handler
    protected AsyncUncaughtExceptionHandler exceptionHandler;

    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer.getAsyncExecutor();
        this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
    }

}

The interface org.springframework.scheduling.annotation.AsyncConfigurer has a single implementation in this package, org.springframework.scheduling.annotation.AsyncConfigurerSupport:

/**
 * A convenience {@link AsyncConfigurer} that implements all methods
 * so that the defaults are used. Provides a backward compatible alternative
 * of implementing {@link AsyncConfigurer} directly.
 *
 * @author Stephane Nicoll
 * @since 4.1
 */
public class AsyncConfigurerSupport implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        return null;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }

}

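This is what the Javadoc above describes: by implementing the AsyncConfigurer interface you can customize the default thread pool and the exception handling.

Back to ProxyAsyncConfiguration. It is a @Configuration class that registers an AsyncAnnotationBeanPostProcessor through a @Bean method.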


When the AsyncAnnotationBeanPostProcessor bean is initialized, its override of the BeanFactoryAware method setBeanFactory constructs the AsyncAnnotationAdvisor, the advisor for the async annotation.

@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

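This method is called after the normal bean properties have been populated and before any initialization callback (such as InitializingBean#afterPropertiesSet() or a custom init method).

The post-processing done by AsyncAnnotationBeanPostProcessor is implemented in its ancestor class AbstractAdvisingBeanPostProcessor, which implements the BeanPostProcessor interface and overrides postProcessAfterInitialization.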

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    // The bean is already a proxy: add the advisor to it (ProxyFactory -> AdvisedSupport -> Advised)
    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            }
            else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }

    // Otherwise build a ProxyFactory, add the interfaces to proxy, set the advisor,
    // and return the proxy created through AopProxy
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No async proxy needed.
    return bean;
}

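With JDK dynamic proxies, the proxy is backed by JdkDynamicAopProxy, which implements the AopProxy interface. Every call on the proxy ends up in its implementation of InvocationHandler's invoke method.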

/**
 * Implementation of {@code InvocationHandler.invoke}.
 * <p>Callers will see exactly the exception thrown by the target,
 * unless a hook method throws an exception.
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    MethodInvocation invocation;
    Object oldProxy = null;
    boolean setProxyContext = false;

    TargetSource targetSource = this.advised.targetSource;
    Class<?> targetClass = null;
    Object target = null;

    try {
        if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
            // The target does not implement the equals(Object) method itself.
            return equals(args[0]);
        }
        else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
            // The target does not implement the hashCode() method itself.
            return hashCode();
        }
        else if (method.getDeclaringClass() == DecoratingProxy.class) {
            // There is only getDecoratedClass() declared -> dispatch to proxy config.
            return AopProxyUtils.ultimateTargetClass(this.advised);
        }
        else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
                method.getDeclaringClass().isAssignableFrom(Advised.class)) {
            // Service invocations on ProxyConfig with the proxy config...
            return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
        }

        Object retVal;

        if (this.advised.exposeProxy) {
            // Make invocation available if necessary.
            oldProxy = AopContext.setCurrentProxy(proxy);
            setProxyContext = true;
        }

        // May be null. Get as late as possible to minimize the time we "own" the target,
        // in case it comes from a pool.
        target = targetSource.getTarget();
        if (target != null) {
            targetClass = target.getClass();
        }

        // Get the interception chain for this method.
        List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

        // Check whether we have any advice. If we don't, we can fallback on direct
        // reflective invocation of the target, and avoid creating a MethodInvocation.
        if (chain.isEmpty()) {
            // We can skip creating a MethodInvocation: just invoke the target directly
            // Note that the final invoker must be an InvokerInterceptor so we know it does
            // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
            Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
            retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
        }
        else {
            // We need to create a method invocation...
            invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
            // Proceed to the joinpoint through the interceptor chain.
            retVal = invocation.proceed();
        }

        // Massage return value if necessary.
        Class<?> returnType = method.getReturnType();
        if (retVal != null && retVal == target &&
                returnType != Object.class && returnType.isInstance(proxy) &&
                !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
            // Special case: it returned "this" and the return type of the method
            // is type-compatible. Note that we can't help if the target sets
            // a reference to itself in another returned object.
            retVal = proxy;
        }
        else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
            throw new AopInvocationException(
                    "Null return value from advice does not match primitive return type for: " + method);
        }
        return retVal;
    }
    finally {
        if (target != null && !targetSource.isStatic()) {
            // Must have come from TargetSource.
            targetSource.releaseTarget(target);
        }
        if (setProxyContext) {
            // Restore old proxy.
            AopContext.setCurrentProxy(oldProxy);
        }
    }
}

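The interceptor for the @Async annotation is AsyncExecutionInterceptor, which implements the MethodInterceptor interface. In AOP Alliance terms, MethodInterceptor is an Advice, the handler that runs at a join point. Because the chain is not empty here, the else branch runs: a ReflectiveMethodInvocation is constructed and its proceed method is called.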

@Override
public Object proceed() throws Throwable {
    // We start with an index of -1 and increment early.
    if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
        return invokeJoinpoint();
    }

    Object interceptorOrInterceptionAdvice =
            this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
        // Evaluate dynamic method matcher here: static part will already have
        // been evaluated and found to match.
        InterceptorAndDynamicMethodMatcher dm =
                (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
        if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
            return dm.interceptor.invoke(this);
        }
        else {
            // Dynamic matching failed.
            // Skip this interceptor and invoke the next in the chain.
            return proceed();
        }
    }
    else {
        // It's an interceptor, so we just invoke it: The pointcut will have
        // been evaluated statically before this object was constructed.
        return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
    }
}
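The index-based traversal in proceed() can be hard to picture from the source alone. Below is a stripped-down sketch of the same idea in plain Java. The Interceptor and Invocation types are hypothetical stand-ins for MethodInterceptor and ReflectiveMethodInvocation, and the dynamic method matcher branch is left out on purpose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class InterceptorChainDemo {

    /** Hypothetical stand-in for MethodInterceptor. */
    interface Interceptor {
        Object invoke(Invocation invocation);
    }

    /** Hypothetical stand-in for ReflectiveMethodInvocation. */
    static final class Invocation {
        private final List<Interceptor> chain;
        private final List<String> trace;
        private int currentIndex = -1; // start at -1 and increment early, as in the original

        Invocation(List<Interceptor> chain, List<String> trace) {
            this.chain = chain;
            this.trace = trace;
        }

        Object proceed() {
            if (currentIndex == chain.size() - 1) {
                trace.add("target"); // end of the chain: this is where the real method runs
                return "result";
            }
            return chain.get(++currentIndex).invoke(this);
        }
    }

    /** Builds an interceptor that records when it is entered and left. */
    static Interceptor tracing(String name, List<String> trace) {
        return invocation -> {
            trace.add(name + ":before");
            Object result = invocation.proceed();
            trace.add(name + ":after");
            return result;
        };
    }

    /** Runs a two-interceptor chain and returns the order of events. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<Interceptor> chain = Arrays.asList(tracing("a", trace), tracing("b", trace));
        new Invocation(chain, trace).proceed();
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a:before, b:before, target, b:after, a:after]
    }
}
```

Each interceptor decides when, and whether, to call proceed() again. That is what lets an async interceptor hand the remainder of the chain to another thread instead of running it inline.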

The key step is the interceptor's invoke(this) call, which in this case ends up in AsyncExecutionInterceptor.invoke:

/**
 * Intercept the given method invocation, submit the actual calling of the method to
 * the correct task executor and return immediately to the caller.
 * @param invocation the method to intercept and make asynchronous
 * @return {@link Future} if the original method returns {@code Future}; {@code null}
 * otherwise.
 */
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        }
    };

    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
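The shape of this method (wrap the call in a Callable, submit it, and return either a Future or null) can be reproduced with nothing but the JDK. The sketch below is a simplification under stated assumptions: the Worker interface and the "demo-async-1" thread name are made up for illustration, and the special handling of CompletableFuture and ListenableFuture return types in doSubmit is ignored.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AsyncProxyDemo {

    /** Hypothetical service: one Future-returning method and one void method. */
    interface Worker {
        Future<String> threadName();
        void fireAndForget();
    }

    static final class WorkerImpl implements Worker {
        @Override
        public Future<String> threadName() {
            // Pass-through handle, as the @Async Javadoc suggests for the target method.
            return CompletableFuture.completedFuture(Thread.currentThread().getName());
        }

        @Override
        public void fireAndForget() {
            // nothing to do
        }
    }

    /** Wraps the target so that every call is submitted to the executor. */
    static Worker asyncProxy(Worker target, ExecutorService executor) {
        InvocationHandler handler = (p, method, methodArgs) -> {
            Callable<Object> task = () -> {
                Object result = method.invoke(target, methodArgs);
                // Unwrap the target's pass-through Future inside the worker thread.
                return (result instanceof Future) ? ((Future<?>) result).get() : null;
            };
            Future<Object> submitted = executor.submit(task);
            // Future-returning methods get the real asynchronous handle; others get null.
            return Future.class.isAssignableFrom(method.getReturnType()) ? submitted : null;
        };
        return (Worker) Proxy.newProxyInstance(
                Worker.class.getClassLoader(), new Class<?>[] {Worker.class}, handler);
    }

    /** Returns the name of the thread that actually ran the target method. */
    static String runOnExecutor() {
        ExecutorService executor = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "demo-async-1"));
        try {
            Worker worker = asyncProxy(new WorkerImpl(), executor);
            worker.fireAndForget(); // returns immediately with no result
            return worker.threadName().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnExecutor()); // demo-async-1
    }
}
```

The caller receives the executor's Future rather than the one created by the target, which matches the Javadoc's remark that the target only returns a temporary pass-through handle.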

spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java

/**
 * Determine the specific executor to use when executing the given method.
 * Should preferably return an {@link AsyncListenableTaskExecutor} implementation.
 * @return the executor to use (or {@code null}, but just if no default executor is available)
 */
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        else {
            targetExecutor = this.defaultExecutor;
            if (targetExecutor == null) {
                synchronized (this.executors) {
                    if (this.defaultExecutor == null) {
                        this.defaultExecutor = getDefaultExecutor(this.beanFactory);
                    }
                    targetExecutor = this.defaultExecutor;
                }
            }
        }
        if (targetExecutor == null) {
            return null;
        }
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}

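The @Async annotation has a value attribute that names the executor to use, and getExecutorQualifier looks up that qualifier. If no qualifier is given and defaultExecutor is still null, getDefaultExecutor is called to find the default executor.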

/**
 * Retrieve or build a default executor for this advice instance.
 * An executor returned from here will be cached for further use.
 * <p>The default implementation searches for a unique {@link TaskExecutor} bean
 * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable, this implementation will return {@code null}.
 * @param beanFactory the BeanFactory to use for a default executor lookup
 * @return the default executor, or {@code null} if none available
 * @since 4.2.6
 * @see #findQualifiedExecutor(BeanFactory, String)
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
    if (beanFactory != null) {
        try {
            // Search for TaskExecutor bean... not plain Executor since that would
            // match with ScheduledExecutorService as well, which is unusable for
            // our purposes here. TaskExecutor is more clearly designed for it.
            return beanFactory.getBean(TaskExecutor.class);
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.debug("Could not find unique TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                            "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                            "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.debug("Could not find default TaskExecutor bean", ex);
            try {
                return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.info("No task executor bean found for async processing: " +
                        "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
            }
            // Giving up -> either using local default executor or none at all...
        }
    }
    return null;
}

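If the project defines no default task executor, both getBean lookups throw NoSuchBeanDefinitionException. The exception is caught and a message is logged at INFO level; nothing is thrown to the caller. That is why the example above prints the following line: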

[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either

spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

/**
 * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor}
 * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise.
 * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all),
 * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance
 * for local use if no default could be found.
 * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME
 */
@Override
protected Executor getDefaultExecutor(BeanFactory beanFactory) {
    Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
    return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

AsyncExecutionInterceptor overrides getDefaultExecutor. It first calls AsyncExecutionAspectSupport's getDefaultExecutor, and if no default is found it creates a new SimpleAsyncTaskExecutor. That is why the thread name in the example above starts with SimpleAsyncTaskExecutor.
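Putting the two getDefaultExecutor methods together, the lookup order can be sketched in plain Java. Everything here is a simplified stand-in: a Map plays the role of the bean factory, TaskExecutorLike is a hypothetical marker for Spring's TaskExecutor, primary-bean resolution is ignored, and a thread-per-task lambda plays the role of SimpleAsyncTaskExecutor.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;

final class DefaultExecutorLookupDemo {

    /** Hypothetical marker standing in for Spring's TaskExecutor interface. */
    interface TaskExecutorLike extends Executor {
    }

    static final String DEFAULT_NAME = "taskExecutor";

    /** Thread-per-task executor, playing the role of SimpleAsyncTaskExecutor. */
    static final Executor FALLBACK = task -> new Thread(task, "fallback-1").start();

    /**
     * Mimics the base lookup: a unique TaskExecutor-like bean, otherwise the bean
     * named "taskExecutor", otherwise null.
     */
    static Executor findDefault(Map<String, Executor> beans) {
        Executor unique = null;
        int matches = 0;
        for (Executor candidate : beans.values()) {
            if (candidate instanceof TaskExecutorLike) {
                unique = candidate;
                matches++;
            }
        }
        if (matches == 1) {
            return unique;
        }
        return beans.get(DEFAULT_NAME); // may be null: the caller decides on a fallback
    }

    /** Mimics the override: fall back to a thread-per-task executor when nothing was found. */
    static Executor findDefaultOrFallback(Map<String, Executor> beans) {
        Executor found = findDefault(beans);
        return (found != null) ? found : FALLBACK;
    }

    public static void main(String[] args) {
        Map<String, Executor> beans = new LinkedHashMap<>();
        System.out.println(findDefaultOrFallback(beans) == FALLBACK); // true: nothing registered

        TaskExecutorLike pool = Runnable::run;
        beans.put("pool", pool);
        System.out.println(findDefaultOrFallback(beans) == pool); // true: unique match by type
    }
}
```

In the demo project the map is effectively empty, so the fallback is chosen. Registering a single TaskExecutor bean, or any Executor bean named "taskExecutor", changes the outcome.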

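5. Summary

The overall flow can be traced along two lines:

1. Starting from the annotation: @EnableAsync --> AsyncConfigurationSelector --> ProxyAsyncConfiguration registers a bean of type AsyncAnnotationBeanPostProcessor.

2. Following the lifecycle of the AsyncAnnotationBeanPostProcessor bean: AOP advisor initialization (setBeanFactory()) --> AOP proxy creation through AopProxy (postProcessAfterInitialization()) --> AOP advice execution (InvocationHandler.invoke).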

 


Reposted from: https://my.oschina.net/lienson/blog/3037666
