docker · 2024-09-04 0

dubbo 异常处理 ExceptionFilter

一、概述

项目使用了 Dubbo 进行不同系统服务间的调用,当服务提供端发生异常时,我们希望把异常传递给消费端,由消费端对异常进行 try-catch 捕获并处理。但在实际使用中,发现以往的异常处理在 dubbo 服务中并不能奏效。例如,自定义异常类 MyException 继承 RuntimeException,当服务端抛出这个异常时,消费端并不能捕获它。

版本:dubbo-2.7.23

1.ExceptionFilter

// ExceptionFilter.java
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
    if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
        try {
            Throwable exception = appResponse.getException();

            // directly throw if it's checked exception
            if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                return;
            }
            // directly throw if the exception appears in the signature
            try {
                Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                Class<?>[] exceptionClasses = method.getExceptionTypes();
                for (Class<?> exceptionClass : exceptionClasses) {
                    if (exception.getClass().equals(exceptionClass)) {
                        return;
                    }
                }
            } catch (NoSuchMethodException e) {
                return;
            }

            // for the exception not found in method's signature, print ERROR message in server's log.
            logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

            // directly throw if exception class and interface class are in the same jar file.
            String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
            String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
            if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                return;
            }
            // directly throw if it's JDK exception
            String className = exception.getClass().getName();
            if (className.startsWith("java.") || className.startsWith("javax.")) {
                return;
            }
            // directly throw if it's dubbo exception
            if (exception instanceof RpcException) {
                return;
            }

            // otherwise, wrap with RuntimeException and throw back to the client
            appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
        } catch (Throwable e) {
            ...
        }
    }
}

2.ExceptionFilter 分析结论

  1. 如果是 checked 异常,直接返回
  2. 如果是方法签名声明的异常,直接返回
  3. 异常类和接口类在同一个 jar 包,直接返回
  4. 异常类的包名是都以 java.或是 javax.开头 的直接返回(是 JDK 自带的异常,直接抛出)
  5. 如果是 dubbo 本身的异常 RpcException 直接抛出
  6. 否则,封装成一个新的 RunTimeException 抛出

二、源码分析

1.加载 Filter

dubbo META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter 文件定义的有:

...
monitor=org.apache.dubbo.monitor.support.MonitorFilter

echo=org.apache.dubbo.rpc.filter.EchoFilter
generic=org.apache.dubbo.rpc.filter.GenericFilter
classloader-callback=org.apache.dubbo.rpc.filter.ClassLoaderCallbackFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter

trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter
...
// CommonConstants.java
String SERVICE_FILTER_KEY = "service.filter";

String PROVIDER = "provider";
// ProtocolFilterWrapper.java
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (UrlUtils.isRegistry(invoker.getUrl())) {
        return protocol.export(invoker);
    }
    FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
    return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}

DefaultFilterChainBuilder 获得 filters

// DefaultFilterChainBuilder.java
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
    ...
    List<Filter> filters;
    if (moduleModels != null && moduleModels.size() == 1) {
        filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0))
                .getActivateExtension(url, key, group);
    } else if (moduleModels != null && moduleModels.size() > 1) {
        ...
    } else {
        filters = ScopeModelUtil.getExtensionLoader(Filter.class, null).getActivateExtension(url, key, group);
    }

    if (!CollectionUtils.isEmpty(filters)) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
        }
        return new CallbackRegistrationInvoker<>(last, filters);
    }

    return last;
}

loadExtensionClasses() 方法,负责从 DubboInternalLoadingStrategy,DubboLoadingStrategy,ServicesLoadingStrategy,即从 META-INF/dubbo/internal/,META-INF/dubbo/,META-INF/services/。
getExtension() 方法,实例化 Extension。

// ExtensionLoader.java
public List<T> getActivateExtension(URL url, String key, String group) {
    String value = url.getParameter(key);
    return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
}

public List<T> getActivateExtension(URL url, String[] values, String group) {
    ...
    Set<String> namesSet = new HashSet<>(names);
    if (!namesSet.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
        ...
        // traverse all cached extensions
        cachedActivateGroups.forEach((name, activateGroup) -> {
            if (isMatchGroup(group, activateGroup)
                    && !namesSet.contains(name)
                    && !namesSet.contains(REMOVE_VALUE_PREFIX + name)
                    && isActive(cachedActivateValues.get(name), url)) {

                activateExtensionsMap.put(getExtensionClass(name), getExtension(name));
            }
        });
    }

    if (namesSet.contains(DEFAULT_KEY)) {
        ...
        return extensionsResult;
    } else {
        // add extensions, will be sorted by its order
        for (String name : names) {
            if (name.startsWith(REMOVE_VALUE_PREFIX) || namesSet.contains(REMOVE_VALUE_PREFIX + name)) {
                continue;
            }
            if (DEFAULT_KEY.equals(name)) {
                continue;
            }
            if (containsExtension(name)) {
                activateExtensionsMap.put(getExtensionClass(name), getExtension(name));
            }
        }
        return new ArrayList<>(activateExtensionsMap.values());
    }
}

private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        loadExtensionClassesLock.lock();
        try {
            classes = cachedClasses.get();
            if (classes == null) {
                try {
                    classes = loadExtensionClasses();
                } catch (InterruptedException e) {
                    ...
                }
                cachedClasses.set(classes);
            }
        } finally {
            loadExtensionClassesLock.unlock();
        }
    }
    return classes;
}

2.执行 Filter

// DubboProtocol.java
public DubboProtocol(FrameworkModel frameworkModel) {
    requestHandler = new ExchangeHandlerAdapter(frameworkModel) {

        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            ...
            RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }
    }

}

现执行每个 filter 的 invoke(invocation) 方法,然后执行每个 filter 的 onResponse(r, filterInvoker, invocation) 方法

// FilterChainBuilder$CallbackRegistrationInvoker.java
@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = filterInvoker.invoke(invocation);
    asyncResult.whenCompleteWithContext((r, t) -> {
        RuntimeException filterRuntimeException = null;
        for (int i = filters.size() - 1; i >= 0; i--) {
            FILTER filter = filters.get(i);
            try {
                InvocationProfilerUtils.releaseDetailProfiler(invocation);
                if (filter instanceof ListenableFilter) {
                    ListenableFilter listenableFilter = ((ListenableFilter) filter);
                    Filter.Listener listener = listenableFilter.listener(invocation);
                    try {
                        if (listener != null) {
                            if (t == null) {
                                listener.onResponse(r, filterInvoker, invocation);
                            } else {
                                listener.onError(t, filterInvoker, invocation);
                            }
                        }
                    } finally {
                        listenableFilter.removeListener(invocation);
                    }
                } else if (filter instanceof FILTER.Listener) {
                    FILTER.Listener listener = (FILTER.Listener) filter;
                    if (t == null) {
                        listener.onResponse(r, filterInvoker, invocation);
                    } else {
                        listener.onError(t, filterInvoker, invocation);
                    }
                }
            } catch (RuntimeException runtimeException) {
                LOGGER.error(
                        CLUSTER_EXECUTE_FILTER_EXCEPTION,
                        "the custom filter is abnormal",
                        "",
                        String.format(
                                "Exception occurred while executing the %s filter named %s.",
                                i, filter.getClass().getSimpleName()));
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format(
                            "Whole filter list is: %s",
                            filters.stream()
                                    .map(tmpFilter ->
                                            tmpFilter.getClass().getSimpleName())
                                    .collect(Collectors.toList())));
                }
                filterRuntimeException = runtimeException;
                t = runtimeException;
            }
        }
        if (filterRuntimeException != null) {
            throw filterRuntimeException;
        }
    });

    return asyncResult;
}
// FilterChainBuilder$CallbackRegistrationInvoker.java
@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        InvocationProfilerUtils.enterDetailProfiler(
                invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");
        asyncResult = filter.invoke(nextNode, invocation);
        if (!(asyncResult instanceof AsyncRpcResult)) {
            ...
            throw new RpcException(msg);
        }
    } catch (Exception e) {
        ...
        throw e;
    } finally {

    }
    return asyncResult;
}
// ExceptionFilter.java
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
    if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
        try {
            Throwable exception = appResponse.getException();

            // directly throw if it's checked exception
            if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                return;
            }
            // directly throw if the exception appears in the signature
            try {
                Method method = invoker.getInterface()
                        .getMethod(RpcUtils.getMethodName(invocation), invocation.getParameterTypes());
                Class<?>[] exceptionClasses = method.getExceptionTypes();
                for (Class<?> exceptionClass : exceptionClasses) {
                    if (exception.getClass().equals(exceptionClass)) {
                        return;
                    }
                }
            } catch (NoSuchMethodException e) {
                return;
            }

            // for the exception not found in method's signature, print ERROR message in server's log.
            logger.error(
                    CONFIG_FILTER_VALIDATION_EXCEPTION,
                    "",
                    "",
                    "Got unchecked and undeclared exception which called by "
                            + RpcContext.getServiceContext().getRemoteHost() + ". service: "
                            + invoker.getInterface().getName() + ", method: " + RpcUtils.getMethodName(invocation)
                            + ", exception: "
                            + exception.getClass().getName() + ": " + exception.getMessage(),
                    exception);

            // directly throw if exception class and interface class are in the same jar file.
            String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
            String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
            if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                return;
            }
            // directly throw if it's JDK exception
            String className = exception.getClass().getName();
            if (className.startsWith("java.")
                    || className.startsWith("javax.")
                    || className.startsWith("jakarta.")) {
                return;
            }
            // directly throw if it's dubbo exception
            if (exception instanceof RpcException) {
                return;
            }

            // otherwise, wrap with RuntimeException and throw back to the client
            appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
        } catch (Throwable e) {
            ...
        }
    }
}

AbstractProxyInvoker 负责执行目标方法,如目标方法抛出异常,则这里会把异常放入 AsyncRpcResult 中

// AbstractProxyInvoker.java
@Override
public Result invoke(Invocation invocation) throws RpcException {
    ProfilerEntry originEntry = null;
    try {
        ...
        Object value = doInvoke(
                proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());

        CompletableFuture<Object> future = wrapWithFuture(value, invocation);
        CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
            AppResponse result = new AppResponse(invocation);
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            return result;
        });
        return new AsyncRpcResult(appResponseFuture, invocation);
    } catch (InvocationTargetException e) {
        ...
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        ...
    } finally {
        ...
    }
}