|
41 | 41 | import org.apache.flink.kubernetes.KubernetesClusterClientFactory; |
42 | 42 | import org.apache.flink.kubernetes.KubernetesClusterDescriptor; |
43 | 43 | import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; |
44 | | -import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient; |
45 | | -import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; |
46 | 44 | import org.apache.flink.python.PythonOptions; |
47 | 45 |
|
48 | | -import java.lang.reflect.Method; |
| 46 | +import java.lang.reflect.InvocationTargetException; |
49 | 47 | import java.util.Collections; |
50 | 48 | import java.util.Map; |
51 | 49 | import java.util.UUID; |
|
55 | 53 | import cn.hutool.core.io.FileUtil; |
56 | 54 | import cn.hutool.core.lang.Assert; |
57 | 55 | import cn.hutool.core.text.StrFormatter; |
58 | | -import cn.hutool.core.util.ReflectUtil; |
59 | 56 | import cn.hutool.core.util.StrUtil; |
60 | 57 | import io.fabric8.kubernetes.api.model.Pod; |
61 | 58 | import lombok.Data; |
@@ -226,26 +223,37 @@ public TestResult test() { |
226 | 223 | // Test mode no jobName, use uuid . |
227 | 224 | addConfigParas(KubernetesConfigOptions.CLUSTER_ID, UUID.randomUUID().toString()); |
228 | 225 | initConfig(); |
229 | | - FlinkKubeClient client = k8sClientHelper.getClient(); |
230 | | - if (client instanceof Fabric8FlinkKubeClient) { |
231 | | - Object internalClient = ReflectUtil.getFieldValue(client, "internalClient"); |
232 | | - Method method = ReflectUtil.getMethod(internalClient.getClass(), "getVersion"); |
233 | | - Object versionInfo = method.invoke(internalClient); |
234 | | - logger.info( |
235 | | - "k8s cluster link successful ; k8s version: {} ; platform: {}", |
236 | | - ReflectUtil.getFieldValue(versionInfo, "gitVersion"), |
237 | | - ReflectUtil.getFieldValue(versionInfo, "platform")); |
238 | | - } |
| 226 | + String namespace = configuration.get(KubernetesConfigOptions.NAMESPACE); |
| 227 | + k8sClientHelper.getKubernetesClient().pods().inNamespace(namespace).list(); |
| 228 | + logger.info("k8s cluster link successful ; namespace: {}", namespace); |
239 | 229 | return TestResult.success(); |
240 | 230 | } catch (Exception e) { |
241 | 231 | logger.error(Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e); |
| 232 | + String errorDetail = extractTestErrorDetail(e); |
242 | 233 | return TestResult.fail( |
243 | | - StrFormatter.format("{}:{}", Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), e.getMessage())); |
| 234 | + StrFormatter.format("{} {}", Status.GATEWAY_KUBERNETES_TEST_FAILED.getMessage(), errorDetail)); |
244 | 235 | } finally { |
245 | 236 | close(); |
246 | 237 | } |
247 | 238 | } |
248 | 239 |
|
| 240 | + static String extractTestErrorDetail(Throwable throwable) { |
| 241 | + Throwable rootCause = throwable; |
| 242 | + while (rootCause instanceof InvocationTargetException |
| 243 | + && ((InvocationTargetException) rootCause).getTargetException() != null) { |
| 244 | + rootCause = ((InvocationTargetException) rootCause).getTargetException(); |
| 245 | + } |
| 246 | + while (rootCause.getCause() != null && rootCause.getCause() != rootCause) { |
| 247 | + rootCause = rootCause.getCause(); |
| 248 | + } |
| 249 | + |
| 250 | + String message = rootCause.getMessage(); |
| 251 | + if (StringUtils.isBlank(message)) { |
| 252 | + return rootCause.getClass().getName(); |
| 253 | + } |
| 254 | + return StrFormatter.format("{}: {}", rootCause.getClass().getName(), message); |
| 255 | + } |
| 256 | + |
249 | 257 | @Override |
250 | 258 | public void killCluster() { |
251 | 259 | log.info("Start kill cluster: " + config.getFlinkConfig().getJobName()); |
|
0 commit comments