Prechádzať zdrojové kódy

feat: use PowerJobRemoteEngine to replace akka

tjq 2 rokov pred
rodič
commit
5b94247daf

+ 4 - 2
powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/SimpleProcessor.java

@@ -5,6 +5,8 @@ import tech.powerjob.worker.core.processor.TaskContext;
 import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
 import tech.powerjob.worker.log.OmsLogger;
 
+import java.util.Optional;
+
 /**
  * @author Echo009
  * @since 2022/4/27
@@ -17,11 +19,11 @@ public class SimpleProcessor implements BasicProcessor {
 
         OmsLogger logger = context.getOmsLogger();
 
-        String jobParams = context.getJobParams();
+        String jobParams = Optional.ofNullable(context.getJobParams()).orElse("S");
         logger.info("Current context:{}", context.getWorkflowContext());
         logger.info("Current job params:{}", jobParams);
 
-        return jobParams.contains("F") ? new ProcessResult(false) : new ProcessResult(true);
+        return jobParams.contains("F") ? new ProcessResult(false) : new ProcessResult(true, "yeah!");
 
     }
 }

+ 3 - 4
powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java

@@ -1,8 +1,8 @@
 package tech.powerjob.worker.background;
 
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import tech.powerjob.common.enums.Protocol;
 import tech.powerjob.common.model.SystemMetrics;
 import tech.powerjob.common.request.WorkerHeartbeat;
 import tech.powerjob.worker.common.PowerJobWorkerVersion;
@@ -10,7 +10,6 @@ import tech.powerjob.worker.common.WorkerRuntime;
 import tech.powerjob.worker.common.utils.SystemInfoUtils;
 import tech.powerjob.worker.common.utils.TransportUtils;
 import tech.powerjob.worker.container.OmsContainerFactory;
-import lombok.extern.slf4j.Slf4j;
 import tech.powerjob.worker.core.tracker.manager.HeavyTaskTrackerManager;
 import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager;
 
@@ -53,8 +52,8 @@ public class WorkerHealthReporter implements Runnable {
         heartbeat.setAppId(workerRuntime.getAppId());
         heartbeat.setHeartbeatTime(System.currentTimeMillis());
         heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
-        heartbeat.setProtocol(Protocol.AKKA.name());
-        heartbeat.setClient("Atlantis");
+        heartbeat.setProtocol(workerRuntime.getWorkerConfig().getProtocol().name());
+        heartbeat.setClient("KingPenguin");
         heartbeat.setTag(workerRuntime.getWorkerConfig().getTag());
 
         // 上报 Tracker 数量

+ 1 - 1
powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java

@@ -97,7 +97,7 @@ public class TransportUtils {
     }
 
     private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception {
-        final URL url = easyBuildUrl(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address);
+        final URL url = easyBuildUrl(t, rootPath, handlerPath, address);
         final CompletionStage<AskResponse> completionStage = transporter.ask(url, req, AskResponse.class);
         return completionStage
                 .toCompletableFuture()

+ 1 - 1
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

@@ -530,7 +530,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
                 List<String> workerList = JsonUtils.parseObject(response.getData(), new TypeReference<List<String>>() {});
                 ptStatusHolder.register(workerList);
             } catch (Exception e) {
-                log.warn("[TaskTracker-{}] detective failed!", instanceId, e);
+                log.warn("[TaskTracker-{}] detective failed, currentServer: {}", instanceId, currentServerAddress, e);
             }
         }
     }