Browse Source

fix: Processor not changed after container redeployment #850

tjq 1 year ago
parent
commit
86ec85331a

+ 6 - 5
powerjob-server/powerjob-server-persistence/src/test/java/tech/powerjob/server/persistence/storage/impl/MinioOssServiceTest.java

@@ -1,18 +1,18 @@
 package tech.powerjob.server.persistence.storage.impl;
 
-import org.apache.commons.lang3.exception.ExceptionUtils;
+import lombok.extern.slf4j.Slf4j;
 import tech.powerjob.server.extension.dfs.DFsService;
 
 import java.util.Optional;
 
-import static org.junit.jupiter.api.Assertions.*;
-
 /**
- * desc
+ * MinioOssServiceTest
+ * 测试需要先本地部署 minio,因此捕获异常,失败也不阻断测试
  *
  * @author tjq
  * @since 2024/2/26
  */
+@Slf4j
 class MinioOssServiceTest extends AbstractDfsServiceTest {
 
     @Override
@@ -22,7 +22,8 @@ class MinioOssServiceTest extends AbstractDfsServiceTest {
             aliOssService.initOssClient("http://192.168.124.23:9000", "pj2","testAk", "testSktestSktestSk");
             return Optional.of(aliOssService);
         } catch (Exception e) {
-            ExceptionUtils.rethrow(e);
+            // 仅异常提醒
+            log.error("[MinioOssServiceTest] test exception!", e);
         }
         return Optional.empty();
     }

+ 1 - 1
powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/selector/impl/SpecifyTaskTrackerSelector.java

@@ -1,7 +1,7 @@
 package tech.powerjob.server.remote.worker.selector.impl;
 
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 import tech.powerjob.common.enums.DispatchStrategy;

+ 2 - 0
powerjob-worker/src/main/java/tech/powerjob/worker/container/OmsContainerFactory.java

@@ -92,6 +92,7 @@ public class OmsContainerFactory {
 
         try {
             if (!jarFile.exists()) {
+                log.info("[OmsContainer-{}] container not exist(path={}), try to download from server!", containerId, jarFile.getPath());
                 FileUtils.forceMkdirParent(jarFile);
                 FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);
                 log.info("[OmsContainer-{}] download jar successfully, path={}", containerId, jarFile.getPath());
@@ -107,6 +108,7 @@ public class OmsContainerFactory {
 
             if (oldContainer != null) {
                 // 销毁旧容器
+                log.info("[OmsContainer-{}] start to destroy old container(version={})", containerId, oldContainer.getVersion());
                 oldContainer.destroy();
             }
 

+ 6 - 0
powerjob-worker/src/main/java/tech/powerjob/worker/extension/processor/ProcessorBean.java

@@ -25,4 +25,10 @@ public class ProcessorBean {
      */
     private transient ClassLoader classLoader;
 
+    /**
+     * Bean 是否稳定
+     * SpringBean / 普通Java 对象,在整个 JVM 生命周期内都不会变,可声明为稳定,在上层缓存,避免每次都要重现 build processor
+     * 对于动态容器,可能在部署后改变,则需要声明为不稳定
+     */
+    private boolean stable = true;
 }

+ 29 - 20
powerjob-worker/src/main/java/tech/powerjob/worker/processor/PowerJobProcessorLoader.java

@@ -30,27 +30,36 @@ public class PowerJobProcessorLoader implements ProcessorLoader {
 
     @Override
     public ProcessorBean load(ProcessorDefinition definition) {
-        return def2Bean.computeIfAbsent(definition, ignore -> {
-            final String processorType = definition.getProcessorType();
-            log.info("[ProcessorFactory] start to load Processor: {}", definition);
-            for (ProcessorFactory pf : processorFactoryList) {
-                final String pfName = pf.getClass().getSimpleName();
-                if (!Optional.ofNullable(pf.supportTypes()).orElse(Collections.emptySet()).contains(processorType)) {
-                    log.info("[ProcessorFactory] [{}] can't load type={}, skip!", pfName, processorType);
-                    continue;
-                }
-                log.info("[ProcessorFactory] [{}] try to load processor: {}", pfName, definition);
-                try {
-                    ProcessorBean processorBean = pf.build(definition);
-                    if (processorBean != null) {
-                        log.info("[ProcessorFactory] [{}] load processor successfully: {}", pfName, definition);
-                        return processorBean;
-                    }
-                } catch (Throwable t) {
-                    log.error("[ProcessorFactory] [{}] load processor failed: {}", pfName, definition, t);
+
+        ProcessorBean pBean = def2Bean.computeIfAbsent(definition, ignore -> buildProcessorBean(definition));
+
+        if (pBean.isStable()) {
+            return pBean;
+        }
+
+        return buildProcessorBean(definition);
+    }
+
+    private ProcessorBean buildProcessorBean(ProcessorDefinition definition) {
+        final String processorType = definition.getProcessorType();
+        log.info("[ProcessorFactory] start to load Processor: {}", definition);
+        for (ProcessorFactory pf : processorFactoryList) {
+            final String pfName = pf.getClass().getSimpleName();
+            if (!Optional.ofNullable(pf.supportTypes()).orElse(Collections.emptySet()).contains(processorType)) {
+                log.info("[ProcessorFactory] [{}] can't load type={}, skip!", pfName, processorType);
+                continue;
+            }
+            log.info("[ProcessorFactory] [{}] try to load processor: {}", pfName, definition);
+            try {
+                ProcessorBean processorBean = pf.build(definition);
+                if (processorBean != null) {
+                    log.info("[ProcessorFactory] [{}] load processor successfully: {}", pfName, definition);
+                    return processorBean;
                 }
+            } catch (Throwable t) {
+                log.error("[ProcessorFactory] [{}] load processor failed: {}", pfName, definition, t);
             }
-            throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config");
-        });
+        }
+        throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config");
     }
 }

+ 3 - 1
powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java

@@ -46,7 +46,9 @@ public class JarContainerProcessorFactory implements ProcessorFactory {
         if (omsContainer != null) {
             return new ProcessorBean()
                     .setProcessor(omsContainer.getProcessor(className))
-                    .setClassLoader(omsContainer.getContainerClassLoader());
+                    .setClassLoader(omsContainer.getContainerClassLoader())
+                    .setStable(false)
+                    ;
         } else {
             log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo);
         }