一、概述
项目使用了 Dubbo 进行不同系统服务间的调用,当服务提供端发生异常时,我们希望把异常传递给消费端,由消费端对异常进行 try-catch 捕获并处理。但在实际使用中,发现以往的异常处理在 dubbo 服务中并不能奏效。例如,自定义异常类 MyException 继承 RuntimeException,当服务端抛出这个异常时,消费端并不能捕获它。
版本:dubbo-2.7.23
1.ExceptionFilter
// ExceptionFilter.java
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = appResponse.getException();
// directly throw if it's checked exception
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return;
}
// directly throw if the exception appears in the signature
try {
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class<?>[] exceptionClasses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClasses) {
if (exception.getClass().equals(exceptionClass)) {
return;
}
}
} catch (NoSuchMethodException e) {
return;
}
// for the exception not found in method's signature, print ERROR message in server's log.
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
// directly throw if exception class and interface class are in the same jar file.
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return;
}
// directly throw if it's JDK exception
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return;
}
// directly throw if it's dubbo exception
if (exception instanceof RpcException) {
return;
}
// otherwise, wrap with RuntimeException and throw back to the client
appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
...
}
}
}
2.ExceptionFilter 分析结论
- 如果是 checked 异常,直接返回
- 如果是方法签名声明的异常,直接返回
- 异常类和接口类在同一个 jar 包,直接返回
- 异常类的包名是都以 java.或是 javax.开头 的直接返回(是 JDK 自带的异常,直接抛出)
- 如果是 dubbo 本身的异常 RpcException 直接抛出
- 否则,封装成一个新的 RunTimeException 抛出
二、源码分析
1.加载 Filter
dubbo META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter 文件定义的有:
...
monitor=org.apache.dubbo.monitor.support.MonitorFilter
echo=org.apache.dubbo.rpc.filter.EchoFilter
generic=org.apache.dubbo.rpc.filter.GenericFilter
classloader-callback=org.apache.dubbo.rpc.filter.ClassLoaderCallbackFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter
...
// CommonConstants.java
String SERVICE_FILTER_KEY = "service.filter";
String PROVIDER = "provider";
// ProtocolFilterWrapper.java
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
DefaultFilterChainBuilder 获得 filters
// DefaultFilterChainBuilder.java
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
...
List<Filter> filters;
if (moduleModels != null && moduleModels.size() == 1) {
filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0))
.getActivateExtension(url, key, group);
} else if (moduleModels != null && moduleModels.size() > 1) {
...
} else {
filters = ScopeModelUtil.getExtensionLoader(Filter.class, null).getActivateExtension(url, key, group);
}
if (!CollectionUtils.isEmpty(filters)) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
}
return new CallbackRegistrationInvoker<>(last, filters);
}
return last;
}
loadExtensionClasses() 方法,负责从 DubboInternalLoadingStrategy,DubboLoadingStrategy,ServicesLoadingStrategy,即从 META-INF/dubbo/internal/,META-INF/dubbo/,META-INF/services/。
getExtension() 方法,实例化 Extension。
// ExtensionLoader.java
public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
}
public List<T> getActivateExtension(URL url, String[] values, String group) {
...
Set<String> namesSet = new HashSet<>(names);
if (!namesSet.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
...
// traverse all cached extensions
cachedActivateGroups.forEach((name, activateGroup) -> {
if (isMatchGroup(group, activateGroup)
&& !namesSet.contains(name)
&& !namesSet.contains(REMOVE_VALUE_PREFIX + name)
&& isActive(cachedActivateValues.get(name), url)) {
activateExtensionsMap.put(getExtensionClass(name), getExtension(name));
}
});
}
if (namesSet.contains(DEFAULT_KEY)) {
...
return extensionsResult;
} else {
// add extensions, will be sorted by its order
for (String name : names) {
if (name.startsWith(REMOVE_VALUE_PREFIX) || namesSet.contains(REMOVE_VALUE_PREFIX + name)) {
continue;
}
if (DEFAULT_KEY.equals(name)) {
continue;
}
if (containsExtension(name)) {
activateExtensionsMap.put(getExtensionClass(name), getExtension(name));
}
}
return new ArrayList<>(activateExtensionsMap.values());
}
}
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
loadExtensionClassesLock.lock();
try {
classes = cachedClasses.get();
if (classes == null) {
try {
classes = loadExtensionClasses();
} catch (InterruptedException e) {
...
}
cachedClasses.set(classes);
}
} finally {
loadExtensionClassesLock.unlock();
}
}
return classes;
}
2.执行 Filter
// DubboProtocol.java
public DubboProtocol(FrameworkModel frameworkModel) {
requestHandler = new ExchangeHandlerAdapter(frameworkModel) {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
...
RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
}
}
现执行每个 filter 的 invoke(invocation) 方法,然后执行每个 filter 的 onResponse(r, filterInvoker, invocation) 方法
// FilterChainBuilder$CallbackRegistrationInvoker.java
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation);
asyncResult.whenCompleteWithContext((r, t) -> {
RuntimeException filterRuntimeException = null;
for (int i = filters.size() - 1; i >= 0; i--) {
FILTER filter = filters.get(i);
try {
InvocationProfilerUtils.releaseDetailProfiler(invocation);
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} catch (RuntimeException runtimeException) {
LOGGER.error(
CLUSTER_EXECUTE_FILTER_EXCEPTION,
"the custom filter is abnormal",
"",
String.format(
"Exception occurred while executing the %s filter named %s.",
i, filter.getClass().getSimpleName()));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format(
"Whole filter list is: %s",
filters.stream()
.map(tmpFilter ->
tmpFilter.getClass().getSimpleName())
.collect(Collectors.toList())));
}
filterRuntimeException = runtimeException;
t = runtimeException;
}
}
if (filterRuntimeException != null) {
throw filterRuntimeException;
}
});
return asyncResult;
}
// FilterChainBuilder$CallbackRegistrationInvoker.java
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
InvocationProfilerUtils.enterDetailProfiler(
invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");
asyncResult = filter.invoke(nextNode, invocation);
if (!(asyncResult instanceof AsyncRpcResult)) {
...
throw new RpcException(msg);
}
} catch (Exception e) {
...
throw e;
} finally {
}
return asyncResult;
}
// ExceptionFilter.java
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = appResponse.getException();
// directly throw if it's checked exception
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return;
}
// directly throw if the exception appears in the signature
try {
Method method = invoker.getInterface()
.getMethod(RpcUtils.getMethodName(invocation), invocation.getParameterTypes());
Class<?>[] exceptionClasses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClasses) {
if (exception.getClass().equals(exceptionClass)) {
return;
}
}
} catch (NoSuchMethodException e) {
return;
}
// for the exception not found in method's signature, print ERROR message in server's log.
logger.error(
CONFIG_FILTER_VALIDATION_EXCEPTION,
"",
"",
"Got unchecked and undeclared exception which called by "
+ RpcContext.getServiceContext().getRemoteHost() + ". service: "
+ invoker.getInterface().getName() + ", method: " + RpcUtils.getMethodName(invocation)
+ ", exception: "
+ exception.getClass().getName() + ": " + exception.getMessage(),
exception);
// directly throw if exception class and interface class are in the same jar file.
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return;
}
// directly throw if it's JDK exception
String className = exception.getClass().getName();
if (className.startsWith("java.")
|| className.startsWith("javax.")
|| className.startsWith("jakarta.")) {
return;
}
// directly throw if it's dubbo exception
if (exception instanceof RpcException) {
return;
}
// otherwise, wrap with RuntimeException and throw back to the client
appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
...
}
}
}
AbstractProxyInvoker 负责执行目标方法,如目标方法抛出异常,则这里会把异常放入 AsyncRpcResult 中
// AbstractProxyInvoker.java
@Override
public Result invoke(Invocation invocation) throws RpcException {
ProfilerEntry originEntry = null;
try {
...
Object value = doInvoke(
proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse(invocation);
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (InvocationTargetException e) {
...
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
...
} finally {
...
}
}