一、准备
1.pom 依赖
<!-- Inherit build configuration from the Spring Boot parent POM, version 2.7.2. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.2</version>
<!-- Empty relativePath: the parent is not looked up at a local file path. -->
<relativePath/>
</parent>
<dependencies>
<!-- The core starter is the only dependency; no version element is given here,
     presumably because the parent POM manages it (confirm against the parent). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
2.java
/**
 * Demo service in which every method is annotated with {@code @Async}.
 *
 * <p>Each method waits five seconds and then produces a fixed message; the methods differ only
 * in their declared return type ({@code String}, {@code CompletableFuture},
 * {@code ListenableFuture}, {@code Future}).
 */
@Service
public class Service1Impl {

    /**
     * Variant with a plain {@code String} return type.
     *
     * @return the literal message produced by the method body
     * @throws InterruptedException if the wait is interrupted
     */
    @Async
    public String method1() throws InterruptedException {
        pause();
        String message = "this is method1";
        return message;
    }

    /**
     * Variant returning an already-completed {@code CompletableFuture}.
     *
     * @return a completed future holding the message
     * @throws InterruptedException if the wait is interrupted
     */
    @Async
    public CompletableFuture<String> method2() throws InterruptedException {
        pause();
        String message = "this is method2";
        return CompletableFuture.completedFuture(message);
    }

    /**
     * Variant returning a {@code ListenableFuture} built through {@code AsyncResult}.
     *
     * @return a future holding the message
     * @throws InterruptedException if the wait is interrupted
     */
    @Async
    public ListenableFuture<String> method3() throws InterruptedException {
        pause();
        String message = "this is method3";
        return AsyncResult.forValue(message);
    }

    /**
     * Variant declared as a plain {@code Future}, also built through {@code AsyncResult}.
     *
     * @return a future holding the message
     * @throws InterruptedException if the wait is interrupted
     */
    @Async
    public Future<String> method4() throws InterruptedException {
        pause();
        String message = "this is method4";
        return AsyncResult.forValue(message);
    }

    /** Blocks the current thread for five seconds to simulate slow work. */
    private static void pause() throws InterruptedException {
        TimeUnit.SECONDS.sleep(5);
    }
}
/**
 * Startup hook that calls each {@code Service1Impl} method once and prints what every call
 * handed back, followed by a separator line.
 */
@Component
public class MyApplicationRunner implements ApplicationRunner {

    @Autowired
    private Service1Impl service;

    /**
     * Invokes the four service methods first, then prints their results in the same order.
     *
     * @param args application arguments (unused)
     * @throws Exception if a service call or waiting on a future fails
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Non-Future return type: the value received here is null.
        String plainValue = service.method1();

        // Future-typed results; all three calls are issued before any of them is awaited.
        Future<?>[] pending = {
            service.method2(),
            service.method3(),
            service.method4()
        };

        System.out.println(plainValue);
        for (Future<?> handle : pending) {
            System.out.println(handle.get());
        }
        System.out.println("- - -");
    }
}
/**
 * Application entry point. {@code @EnableAsync} is declared here next to
 * {@code @SpringBootApplication}.
 */
@EnableAsync
@SpringBootApplication
public class AsyncBoot {

    /**
     * Boots the application with this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(AsyncBoot.class);
        application.run(args);
    }
}
二、原理
1.注册 AsyncAnnotationBeanPostProcessor
AsyncAnnotationBeanPostProcessor 继承了 ProxyConfig,
proxyTargetClass 值来自 @EnableAsync 注解,默认是 false
// EnableAsync.java
/**
 * Type-level annotation for enabling async processing. The only wiring visible in this excerpt
 * is the {@code @Import} of {@code AsyncConfigurationSelector}; the annotation's attributes are
 * elided.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
...
}
// AsyncConfigurationSelector.java
/**
 * Import selector keyed on an {@code AdviceMode}: it names the configuration class to import
 * for each mode. Other members are elided in this excerpt.
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
...
/**
 * @param adviceMode the advice mode to select a configuration for
 * @return a one-element array with the class name to import for {@code PROXY} or
 *         {@code ASPECTJ}; {@code null} for any other mode
 */
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
// Proxy mode: import ProxyAsyncConfiguration.
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
// AspectJ mode: the class is referenced by name through a constant not shown in this excerpt.
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
// ProxyAsyncConfiguration.java
/**
 * Configuration class that registers the {@code AsyncAnnotationBeanPostProcessor} bean and
 * configures it from the {@code @EnableAsync} attributes.
 */
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
/**
 * Builds the post-processor and copies the annotation settings onto it.
 *
 * @return the configured post-processor
 */
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
// enableAsync is populated by setImportMetadata below; fail fast if that has not happened.
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
// Only override the annotation type when the user supplied a non-default one.
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
// proxyTargetClass and order are taken straight from the @EnableAsync attributes.
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
/**
 * Reads the {@code @EnableAsync} attributes from the importing class.
 *
 * <p>NOTE(review): in upstream Spring this method appears to be declared on the parent
 * {@code AbstractAsyncConfiguration}; it is shown inline here. Confirm against the source.
 *
 * @param importMetadata annotation metadata of the importing class
 * @throws IllegalArgumentException if {@code @EnableAsync} is not present on that class
 */
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName()));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}
}
2.创建 AsyncAnnotationAdvisor
// AsyncAnnotationBeanPostProcessor.java
/**
 * Bean post-processor that creates its {@code AsyncAnnotationAdvisor} once the bean factory is
 * handed to it. Other members are elided in this excerpt.
 */
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
...
/**
 * Creates the advisor from the configured executor and exception handler, applies the custom
 * annotation type if one was set, and stores the advisor on this post-processor.
 *
 * @param beanFactory the owning bean factory, also passed on to the advisor
 */
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
// AsyncAnnotationAdvisor.java
/**
 * Builds the advisor: collects the annotation types that mark async methods (starting with
 * {@code Async}; further entries are elided), then creates the advice and the pointcut.
 *
 * @param executor supplier of the executor to use, may be {@code null}
 * @param exceptionHandler supplier of the handler for uncaught exceptions, may be {@code null}
 */
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
...
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
/**
 * Creates the advice applied to async methods: an {@code AnnotationAsyncExecutionInterceptor}
 * configured with the given executor and exception-handler suppliers.
 *
 * @param executor supplier of the executor to use, may be {@code null}
 * @param exceptionHandler supplier of the handler for uncaught exceptions, may be {@code null}
 * @return the configured interceptor
 */
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// Constructed with a null default executor; the suppliers are applied through configure().
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
3.创建代理类 AopProxy
所有的 bean 会执行 AsyncAnnotationBeanPostProcessor 的 postProcessAfterInitialization 方法
构造 ProxyFactory,ProxyFactory 增加 Advisor 为 AsyncAnnotationAdvisor
// AbstractAdvisingBeanPostProcessor.java(AsyncAnnotationBeanPostProcessor 的父类)
/**
 * Decides, for each initialized bean, whether to return it untouched or wrap it in a proxy
 * that carries this post-processor's advisor.
 *
 * @param bean the initialized bean instance
 * @param beanName the bean's name
 * @return the original bean, or a proxy created through {@code ProxyFactory}
 */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// Nothing to apply, or the bean is AOP infrastructure itself: leave it alone.
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
...
return bean;
}
// The bean is already an AOP proxy: reuse it rather than wrapping it again.
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Elided here; presumably the advisor is added to the existing proxy (confirm upstream).
...
return bean;
}
}
// Plain bean that the advisor can apply to: build a new proxy around it.
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
// Interface-based proxying: work out which interfaces the proxy should expose.
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
// classLoader is defined in the elided lines.
...
return proxyFactory.getProxy(classLoader);
}
return bean;
}
/**
 * Override that first rules out beans reported by {@code AutoProxyUtils.isOriginalInstance},
 * then defers to the superclass check.
 *
 * @param bean the bean instance
 * @param beanName the bean's name
 * @return {@code true} only if the bean is not an original instance and the parent check passes
 */
@Override
protected boolean isEligible(Object bean, String beanName) {
return (!AutoProxyUtils.isOriginalInstance(beanName, bean.getClass()) &&
super.isEligible(bean, beanName));
}
/**
 * Instance-based eligibility check that delegates to the class-based one.
 * Presumably this is the superclass version that the override above calls via {@code super}.
 *
 * @param bean the bean instance
 * @param beanName the bean's name (unused here)
 * @return the result of {@code isEligible(bean.getClass())}
 */
protected boolean isEligible(Object bean, String beanName) {
return isEligible(bean.getClass());
}
/**
 * Class-based eligibility check with a per-class cache.
 *
 * @param targetClass the class to test
 * @return the cached answer if present; {@code false} when no advisor is set; otherwise the
 *         result of {@code AopUtils.canApply}, which is cached before being returned
 */
protected boolean isEligible(Class<?> targetClass) {
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
// Without an advisor the answer is false, and it is deliberately not cached.
if (this.advisor == null) {
return false;
}
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
ProxyFactory 创建代理类
// ProxyFactory.java
/**
 * Creates an {@code AopProxy} via {@code createAopProxy()} and asks it for the proxy object.
 *
 * @param classLoader the class loader passed on to the {@code AopProxy}, may be {@code null}
 * @return the proxy object
 */
public Object getProxy(@Nullable ClassLoader classLoader) {
return createAopProxy().getProxy(classLoader);
}
// ProxyCreatorSupport.java
/**
 * Activates this configuration on first use, then delegates proxy creation to the configured
 * {@code AopProxyFactory}, passing this object as the proxy configuration.
 *
 * @return the {@code AopProxy} chosen by the factory
 */
protected final synchronized AopProxy createAopProxy() {
if (!this.active) {
activate();
}
return getAopProxyFactory().createAopProxy(this);
}
// DefaultAopProxyFactory.java
/**
 * Chooses between a JDK dynamic proxy and a CGLIB proxy for the given configuration.
 *
 * @param config the proxy configuration
 * @return a {@code JdkDynamicAopProxy} or an {@code ObjenesisCglibAopProxy}
 * @throws AopConfigException if the class-proxy branch is taken and no target class is known
 */
@Override
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
// Class-based proxying is considered only outside a native image, and only when optimize or
// proxyTargetClass is set or no user-supplied proxy interfaces exist.
if (!NativeDetector.inNativeImage() &&
(config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config))) {
Class<?> targetClass = config.getTargetClass();
if (targetClass == null) {
throw new AopConfigException("TargetSource cannot determine target class: " +
"Either an interface or a target is required for proxy creation.");
}
// Interfaces, existing JDK proxy classes and lambda classes still get a JDK proxy.
if (targetClass.isInterface() || Proxy.isProxyClass(targetClass) || ClassUtils.isLambdaClass(targetClass)) {
return new JdkDynamicAopProxy(config);
}
return new ObjenesisCglibAopProxy(config);
}
else {
return new JdkDynamicAopProxy(config);
}
}
4.创建并调用代理对象
4.1创建 jdk 代理类
// JdkDynamicAopProxy.java
/**
 * Creates the JDK dynamic proxy: it implements {@code proxiedInterfaces} and uses this object
 * as its invocation handler. Preceding statements are elided in this excerpt.
 *
 * @param classLoader the class loader used to define the proxy class, may be {@code null}
 * @return the new proxy instance
 */
@Override
public Object getProxy(@Nullable ClassLoader classLoader) {
...
return Proxy.newProxyInstance(classLoader, this.proxiedInterfaces, this);
}
4.2 执行 jdk 代理方法
当执行被代理的方法时,会走 JdkDynamicAopProxy 的 invoke 方法
执行 this.advised.getInterceptorsAndDynamicInterceptionAdvice 时,把匹配当前方法的 Advisor 转化为拦截器链,
如 @Async 对应 AnnotationAsyncExecutionInterceptor
这里用到责任链(拦截器链)设计模式,实现多层代理
// JdkDynamicAopProxy.java
/**
 * Invocation handler entry point for every call made on the JDK proxy.
 *
 * <p>A few methods are answered directly; everything else is routed through the interceptor
 * chain for the method, or invoked reflectively on the target when that chain is empty.
 * Return-value post-processing and the closing part of the {@code try} are elided here.
 *
 * @param proxy the proxy instance the call was made on
 * @param method the invoked interface method
 * @param args the call arguments
 * @return the value produced by the target or the interceptor chain
 * @throws Throwable whatever the target or an interceptor throws
 */
@Override
@Nullable
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object oldProxy = null;
boolean setProxyContext = false;
TargetSource targetSource = this.advised.targetSource;
Object target = null;
try {
// equals/hashCode are answered by this handler when the corresponding *Defined flag is unset.
if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
return equals(args[0]);
}
else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
return hashCode();
}
// Methods declared on DecoratingProxy: answer with the ultimate target class.
else if (method.getDeclaringClass() == DecoratingProxy.class) {
return AopProxyUtils.ultimateTargetClass(this.advised);
}
// Methods declared on an interface that Advised is assignable to: invoke them on the proxy
// configuration itself (only when the configuration is not opaque).
else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
method.getDeclaringClass().isAssignableFrom(Advised.class)) {
return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
}
Object retVal;
// Optionally publish the proxy through AopContext, remembering the previous value.
if (this.advised.exposeProxy) {
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
target = targetSource.getTarget();
Class<?> targetClass = (target != null ? target.getClass() : null);
// Resolve the interceptors that apply to this method on this target class.
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
if (chain.isEmpty()) {
// No advice: call the target method directly via reflection.
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
// Advice present: walk the chain through a ReflectiveMethodInvocation.
MethodInvocation invocation =
new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
retVal = invocation.proceed();
}
...
return retVal;
}
...
}
提交任务到线程池
AsyncExecutionInterceptor#determineAsyncExecutor(Method method) 会依次查找
- @Async 的 value 指定名称(限定符)对应的 Executor bean
- beanFactory.getBean(TaskExecutor.class)
- beanFactory.getBean("taskExecutor", Executor.class)
- new SimpleAsyncTaskExecutor()
// AsyncExecutionInterceptor.java
/**
 * Intercepts a call to an async method: picks the executor for the method, wraps the remaining
 * invocation in a {@code Callable}, and submits it instead of running it on the calling thread.
 * Some statements and the {@code catch} clauses are elided in this excerpt.
 *
 * @param invocation the intercepted method invocation
 * @return whatever {@code doSubmit} returns for the method's declared return type
 * @throws Throwable as declared by the interceptor contract
 */
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Resolve the method on the target class and un-bridge it before looking up the executor.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
...
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
// A Future returned by the target is unwrapped here, so the task yields its value.
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
...
// Any non-Future result is discarded: the task yields null.
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
/**
 * Submits the task to the executor in the way that matches the method's declared return type.
 * The {@code catch} clause inside the lambda is elided in this excerpt.
 *
 * @param task the work to run
 * @param executor the executor to run it on
 * @param returnType the declared return type of the intercepted method
 * @return a {@code CompletableFuture}, {@code ListenableFuture} or {@code Future} matching
 *         {@code returnType}; {@code null} for any other return type
 */
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
...
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
// This branch casts the executor to AsyncListenableTaskExecutor.
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
// Any other return type: submit and forget; the caller receives null.
executor.submit(task);
return null;
}
}
// Supplier of the fallback executor, read by determineAsyncExecutor when a method has no qualifier.
private SingletonSupplier<Executor> defaultExecutor;
// Assignment shown out of context (it takes a defaultExecutor argument from its enclosing method).
// Presumably getDefaultExecutor(beanFactory) is the fallback when that argument yields nothing
// (confirm against SingletonSupplier).
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
/**
 * Resolves the executor for a method and caches the result per method.
 *
 * @param method the user-declared method being invoked
 * @return the executor to use, or {@code null} if none could be determined
 */
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
// A non-empty qualifier selects a specific executor from the bean factory;
// otherwise the default executor supplier is used.
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
// Use the executor as-is if it is an AsyncListenableTaskExecutor; otherwise wrap it in an adapter.
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
/**
 * Returns the superclass's default executor, falling back to a new
 * {@code SimpleAsyncTaskExecutor} when the superclass finds none.
 *
 * @param beanFactory the bean factory passed to the superclass lookup, may be {@code null}
 * @return the executor from the superclass, or a new {@code SimpleAsyncTaskExecutor}
 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}