Browse Source

fix: Loss of subtask data when mapreduce enters swap mode

tjq 1 year ago
parent
commit
e64ad0f74d

+ 4 - 2
powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java

@@ -55,11 +55,13 @@ public class PowerJobDKey {
      */
     public static final String FREQUENCY_JOB_MAX_INTERVAL = "powerjob.server.frequency-job.max-interval";
 
-    /* ******************* 不太可能有人用的参数 ******************* */
+    /* ******************* 不太可能有人用的参数,主要方便内部测试 ******************* */
 
     /**
      * 最大活跃任务数量,超出部分 SWAP 到磁盘以提升性能
      */
-    public static final String WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM = "powerjob.worker.max-active-task-num";
+    public static final String WORKER_RUNTIME_SWAP_MAX_ACTIVE_TASK_NUM = "powerjob.worker.swap.max-active-task-num";
+
+    public static final String WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS = "powerjob.worker.swap.scan-interval";
 
 }

+ 1 - 1
powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java

@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * @since 2020/4/17
  */
 @Slf4j
-@Component("testMapReduceProcessor")
+@Component("demoMapReduceProcessor")
 public class MapReduceProcessorDemo implements MapReduceProcessor {
 
     @Override

+ 55 - 10
powerjob-worker-samples/src/main/resources/logback.xml

@@ -1,5 +1,10 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration>
+
+    <!-- 不同 worker 区分不同日志 参考参数: spring.profiles.active=local log.name=worker1 -->
+    <springProperty name="LOG_NAME" source="log.name" defaultValue="INFO"/>
+    <property name="LOG_PATH" value="${user.home}/powerjob-worker-samples/${LOG_NAME}/logs"/>
+
     <!-- %m输出的信息,%p日志级别,%t线程名,%d日期,%c类的全名,%i索引【从数字0开始递增】,,, -->
     <!-- appender是configuration的子节点,是负责写日志的组件。 -->
     <!-- ConsoleAppender:把日志输出到控制台 -->
@@ -12,16 +17,56 @@
         </encoder>
     </appender>
 
-    <logger name="com.zaxxer.hikari" level="INFO">
-        <appender-ref ref="STDOUT"/>
-    </logger>
+    <springProfile name="default">
+        <!-- 默认环境时激活,全部输出到控制台 -->
+        <logger name="com.zaxxer.hikari" level="INFO">
+            <appender-ref ref="STDOUT"/>
+        </logger>
+
+        <logger name="tech.powerjob" level="DEBUG" additivity="false">
+            <appender-ref ref="STDOUT"/>
+        </logger>
+        <!-- 控制台输出日志级别 -->
+        <root level="INFO">
+            <appender-ref ref="STDOUT"/>
+        </root>
+    </springProfile>
+
+    <springProfile name="local">
+        <!-- 传入 local 的时候,全部使用本地日志 -->
+
+        <appender name="APPLICATION_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">
+            <file>${LOG_PATH}/application.log</file>
+            <encoder>
+                <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
+            </encoder>
+
+            <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+                <!-- 每天滚动日志文件 -->
+                <fileNamePattern>${LOG_PATH}/application.log.%d{yyyy-MM-dd}.%i</fileNamePattern>
+                <!-- 单个日志文件最大 500MB -->
+                <maxFileSize>500MB</maxFileSize>
+                <!-- 设置最大历史记录为7天 -->
+                <maxHistory>7</maxHistory>
+                <!-- 设置总大小限制 -->
+                <totalSizeCap>3GB</totalSizeCap>
+            </rollingPolicy>
+        </appender>
+
+        <!-- 异步输出 -->
+        <appender name="ASYNC_APPLICATION_APPENDER" class="ch.qos.logback.classic.AsyncAppender">
+            <!-- 不丢失日志:默认情况下,如果队列的 80% 已满,则会丢弃TRACE、DEBUG、INFO级别的日志;此处设为 0 表示不丢弃 -->
+            <discardingThreshold>0</discardingThreshold>
+            <!-- 更改默认的队列的深度,该值会影响性能,默认值为256 -->
+            <queueSize>256</queueSize>
+            <!-- 添加附加的appender,最多只能添加一个 -->
+            <appender-ref ref="APPLICATION_APPENDER"/>
+        </appender>
 
-    <logger name="tech.powerjob" level="DEBUG" additivity="false">
-        <appender-ref ref="STDOUT"/>
-    </logger>
+        <!-- 控制台输出日志级别 -->
+        <root level="INFO">
+            <appender-ref ref="ASYNC_APPLICATION_APPENDER"/>
+        </root>
+    </springProfile>
 
-    <!-- 控制台输出日志级别 -->
-    <root level="INFO">
-        <appender-ref ref="STDOUT"/>
-    </root>
 </configuration>

+ 5 - 5
powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java

@@ -21,14 +21,13 @@ import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
 import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
 import tech.powerjob.worker.extension.processor.ProcessorBean;
 import tech.powerjob.worker.log.OmsLogger;
+import tech.powerjob.worker.persistence.PersistenceServiceManager;
 import tech.powerjob.worker.persistence.TaskDO;
+import tech.powerjob.worker.persistence.TaskPersistenceService;
 import tech.powerjob.worker.pojo.model.InstanceInfo;
 import tech.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
+import java.util.*;
 
 /**
  * Processor 执行器
@@ -136,7 +135,8 @@ public class HeavyProcessorRunnable implements Runnable {
         Stopwatch stopwatch = Stopwatch.createStarted();
         log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
 
-        List<TaskResult> taskResults = workerRuntime.getTaskPersistenceService().getAllTaskResult(instanceId, task.getSubInstanceId());
+        TaskPersistenceService taskPersistenceService = Optional.ofNullable(PersistenceServiceManager.fetchTaskPersistenceService(instanceId)).orElse(workerRuntime.getTaskPersistenceService());
+        List<TaskResult> taskResults = taskPersistenceService.getAllTaskResult(instanceId, task.getSubInstanceId());
         try {
             switch (executeType) {
                 case BROADCAST:

+ 1 - 1
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

@@ -470,7 +470,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
 
             // 2. 没有可用 ProcessorTracker,本次不派发
             if (availablePtIps.isEmpty()) {
-                log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId);
+                log.warn("[TaskTracker-{}] no available ProcessorTracker now, skip dispatch", instanceId);
                 return;
             }
 

+ 24 - 0
powerjob-worker/src/main/java/tech/powerjob/worker/persistence/PersistenceServiceManager.java

@@ -0,0 +1,24 @@
+package tech.powerjob.worker.persistence;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * 持久化器管理
+ *
+ * @author tjq
+ * @since 2024/2/25
+ */
+public class PersistenceServiceManager {
+
+    // instanceId -> the persistence service dedicated to that instance
+    // (e.g. SwapTaskPersistenceService registers itself in its constructor).
+    // ConcurrentMap keeps register/fetch thread-safe without explicit locking.
+    private static final Map<Long, TaskPersistenceService> INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE = Maps.newConcurrentMap();
+
+    /**
+     * Registers the persistence service bound to a task instance, replacing any prior mapping.
+     * NOTE(review): entries are never removed — confirm instance lifecycles are bounded,
+     * or add an unregister hook, to avoid unbounded growth of this static map.
+     */
+    public static void register(Long instanceId, TaskPersistenceService taskPersistenceService) {
+        INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.put(instanceId, taskPersistenceService);
+    }
+
+    /**
+     * Fetches the persistence service registered for the given instance,
+     * or {@code null} when none was registered (callers fall back to the runtime default).
+     */
+    public static TaskPersistenceService fetchTaskPersistenceService(Long instanceId) {
+        return INSTANCE_ID_2_TASK_PERSISTENCE_SERVICE.get(instanceId);
+    }
+}

+ 13 - 10
powerjob-worker/src/main/java/tech/powerjob/worker/persistence/SwapTaskPersistenceService.java

@@ -32,6 +32,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
 
     private final Long instanceId;
     private final long maxActiveTaskNum;
+    private final long scheduleRateMs;
     /**
      * 数据库记录数量,不要求完全精确,仅用于控制存哪里,有一定容忍度
      */
@@ -63,9 +64,10 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
         this.needResult = ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType());
         this.canUseSwap = ExecuteType.MAP.name().equalsIgnoreCase(instanceInfo.getExecuteType()) || ExecuteType.MAP_REDUCE.name().equalsIgnoreCase(instanceInfo.getExecuteType());
         this.dbTaskPersistenceService = dbTaskPersistenceService;
-        this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM)));
-
-        log.info("[SwapTaskPersistenceService-{}] initialized SwapTaskPersistenceService, canUseSwap: {}, needResult: {}, maxActiveTaskNum: {}", instanceId, canUseSwap, needResult, maxActiveTaskNum);
+        this.maxActiveTaskNum = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_SWAP_MAX_ACTIVE_TASK_NUM, String.valueOf(DEFAULT_RUNTIME_MAX_ACTIVE_TASK_NUM)));
+        this.scheduleRateMs = Long.parseLong(System.getProperty(PowerJobDKey.WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS, String.valueOf(DEFAULT_SCHEDULE_TIME)));
+        PersistenceServiceManager.register(this.instanceId, this);
+        log.info("[SwapTaskPersistenceService-{}] initialized SwapTaskPersistenceService, canUseSwap: {}, needResult: {}, maxActiveTaskNum: {}, scheduleRateMs: {}", instanceId, canUseSwap, needResult, maxActiveTaskNum, scheduleRateMs);
     }
 
     @Override
@@ -139,7 +141,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
                 externalPendingRecordNum.add(tasks.size());
             }
 
-            log.info("[SwapTaskPersistenceService-{}] too many tasks at runtime(dbRecordNum: {}), SWAP enabled, persistence result: {}, externalPendingRecordNum: {}", instanceId, dbNum, persistPendingTaskRes, externalPendingRecordNum);
+            log.debug("[SwapTaskPersistenceService-{}] too many tasks at runtime(dbRecordNum: {}), SWAP enabled, persistence result: {}, externalPendingRecordNum: {}", instanceId, dbNum, persistPendingTaskRes, externalPendingRecordNum);
             return persistPendingTaskRes;
         } else {
             return persistTask2Db(tasks);
@@ -214,10 +216,11 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
                     return;
                 }
 
-                CommonUtils.easySleep(DEFAULT_SCHEDULE_TIME);
+                CommonUtils.easySleep(scheduleRateMs);
 
-                moveInPendingTask();
+                // 顺序很关键,先移出才有空间移入
                 moveOutFinishedTask();
+                moveInPendingTask();
             }
         }
 
@@ -247,7 +250,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
                 externalPendingRecordNum.add(-taskDOS.size());
 
                 boolean persistTask2Db = persistTask2Db(taskDOS);
-                log.info("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum);
+                log.debug("[SwapTaskPersistenceService-{}] [moveInPendingTask] readPendingTask size: {}, persistResult: {}, currentDbRecordNum: {}, remainExternalPendingRecordNum: {}", instanceId, taskDOS.size(), persistTask2Db, dbRecordNum, externalPendingRecordNum);
 
                 // 持久化失败的情况,及时跳出本次循环,防止损失扩大,等待下次扫描
                 if (!persistTask2Db) {
@@ -261,7 +264,7 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
 
             while (true) {
 
-                // 一旦启动 SWAP,需要移出更多的数据来灌入,最多允许
+                // 一旦启动 SWAP,需要移出更多的数据来灌入
                 long maxRemainNum = maxActiveTaskNum / 2;
                 if (dbRecordNum.sum() <= maxRemainNum) {
                     return;
@@ -306,10 +309,10 @@ public class SwapTaskPersistenceService implements TaskPersistenceService {
 
             if (deleteTasksByTaskIdsResult) {
                 dbRecordNum.add(-moveOutNum);
-                log.info("{} move task to external successfully(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {})", logKey, moveOutNum, externalRecord, dbRecordNum);
+                log.debug("{} move task to external successfully(movedNum: {}, currentExternalSucceedNum: {}, currentExternalFailedNum: {}, currentDbRecordNum: {})", logKey, moveOutNum, externalSucceedRecordNum, externalFailedRecordNum, dbRecordNum);
             } else {
                 // DB 删除失败影响不大,reduce 重复而已
-                log.warn("{} persistFinishedTask to external successfully but delete in runtime failed(movedNum: {}, currentExternalRecordNum: {}, currentDbRecordNum: {}), these taskIds may have duplicate results in reduce stage: {}", logKey, moveOutNum, externalRecord, dbRecordNum, deleteTaskIds);
+                log.warn("{} persistFinishedTask to external successfully but delete in runtime failed(movedNum: {}, currentExternalSucceedNum: {}, currentExternalFailedNum: {}, currentDbRecordNum: {}), these taskIds may have duplicate results in reduce stage: {}", logKey, moveOutNum, externalSucceedRecordNum, externalFailedRecordNum, dbRecordNum, deleteTaskIds);
             }
         }
     }

+ 24 - 7
powerjob-worker/src/main/java/tech/powerjob/worker/persistence/fs/impl/ExternalTaskFileSystemPersistenceService.java

@@ -3,7 +3,9 @@ package tech.powerjob.worker.persistence.fs.impl;
 import com.google.common.collect.Lists;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import tech.powerjob.common.serialize.JsonUtils;
+import tech.powerjob.common.utils.CollectionUtils;
 import tech.powerjob.common.utils.CommonUtils;
 import tech.powerjob.worker.persistence.TaskDO;
 import tech.powerjob.worker.persistence.fs.ExternalTaskPersistenceService;
@@ -55,6 +57,9 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer
 
     @Override
     public boolean persistPendingTask(List<TaskDO> tasks) {
+        if (CollectionUtils.isEmpty(tasks)) {
+            return true;
+        }
         try {
             String content = JsonUtils.toJSONString(tasks);
             pendingFsService.writeLine(content);
@@ -69,15 +74,19 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer
     @SneakyThrows
     public List<TaskDO> readPendingTask() {
         String pendingTaskStr = pendingFsService.readLine();
-        TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class);
-        if (taskDOS != null) {
-            return Lists.newArrayList(taskDOS);
-        }
-        return Collections.emptyList();
+        return str2TaskDoList(pendingTaskStr);
     }
 
     @Override
     public boolean persistFinishedTask(List<TaskDO> tasks) {
+
+        if (CollectionUtils.isEmpty(tasks)) {
+            return true;
+        }
+
+        // 移除无用的参数列
+        tasks.forEach(t -> t.setTaskContent(null));
+
         try {
             String content = JsonUtils.toJSONString(tasks);
             resultFsService.writeLine(content);
@@ -91,8 +100,16 @@ public class ExternalTaskFileSystemPersistenceService implements ExternalTaskPer
     @Override
     @SneakyThrows
     public List<TaskDO> readFinishedTask() {
-        String pendingTaskStr = resultFsService.readLine();
-        TaskDO[] taskDOS = JsonUtils.parseObject(pendingTaskStr, TaskDO[].class);
+        String finishedStr = resultFsService.readLine();
+        return str2TaskDoList(finishedStr);
+    }
+
+
+    private static List<TaskDO> str2TaskDoList(String finishedStr) throws Exception {
+        if (StringUtils.isEmpty(finishedStr)) {
+            return Collections.emptyList();
+        }
+        TaskDO[] taskDOS = JsonUtils.parseObject(finishedStr, TaskDO[].class);
         if (taskDOS != null) {
             return Lists.newArrayList(taskDOS);
         }