Browse Source

[Improvement][Master] Validate same content of input file when using task cache (#13298)

* support file content checksum

* fix inject null storageOperate bug
Aaron Wang 2 years ago
parent
commit
ccad56e88e

+ 3 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

@@ -329,6 +329,9 @@ public final class Constants {
     public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
     public static final String THREAD_NAME_ALERT_SERVER = "Alert-Server";
 
+    // suffix of crc file
+    public static final String CRC_SUFFIX = ".crc";
+
     /**
      * complement date default cron string
      */

+ 35 - 0
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

@@ -28,11 +28,14 @@ import org.apache.commons.io.IOUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.NoSuchFileException;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -264,4 +267,36 @@ public class FileUtils {
         }
     }
 
+    /**
+     * Calculate file checksum with CRC32 algorithm
+     * @param pathName
+     * @return checksum of file/dir
+     */
+    public static String getFileChecksum(String pathName) throws IOException {
+        CRC32 crc32 = new CRC32();
+        File file = new File(pathName);
+        String crcString = "";
+        if (file.isDirectory()) {
+            // file system interface remains the same order
+            String[] subPaths = file.list();
+            StringBuilder concatenatedCRC = new StringBuilder();
+            for (String subPath : subPaths) {
+                concatenatedCRC.append(getFileChecksum(pathName + FOLDER_SEPARATOR + subPath));
+            }
+            crcString = concatenatedCRC.toString();
+        } else {
+            try (
+                    FileInputStream fileInputStream = new FileInputStream(pathName);
+                    CheckedInputStream checkedInputStream = new CheckedInputStream(fileInputStream, crc32);) {
+                while (checkedInputStream.read() != -1) {
+                }
+            } catch (IOException e) {
+                throw new IOException("Calculate checksum error.");
+            }
+            crcString = Long.toHexString(crc32.getValue());
+        }
+
+        return crcString;
+    }
+
 }

+ 24 - 0
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java

@@ -118,4 +118,28 @@ public class FileUtilsTest {
         Assertions.assertTrue(FileUtils.directoryTraversal(path));
     }
 
+    @Test
+    void testGetFileChecksum() throws Exception {
+        String filePath1 = "test/testFile1.txt";
+        String filePath2 = "test/testFile2.txt";
+        String filePath3 = "test/testFile3.txt";
+        String content1 = "正正正faffdasfasdfas,한국어; 한글……にほんご\nfrançais";
+        String content2 = "正正正faffdasfasdfas,한국어; 한글……にほん\nfrançais";
+        FileUtils.writeContent2File(content1, filePath1);
+        FileUtils.writeContent2File(content2, filePath2);
+        FileUtils.writeContent2File(content1, filePath3);
+
+        String checksum1 = FileUtils.getFileChecksum(filePath1);
+        String checksum2 = FileUtils.getFileChecksum(filePath2);
+        String checksum3 = FileUtils.getFileChecksum(filePath3);
+
+        Assertions.assertNotEquals(checksum1, checksum2);
+        Assertions.assertEquals(checksum1, checksum3);
+
+        String dirPath = "test/";
+
+        Assertions.assertDoesNotThrow(
+                () -> FileUtils.getFileChecksum(dirPath));
+    }
+
 }

+ 4 - 0
dolphinscheduler-dao/pom.xml

@@ -112,6 +112,10 @@
             <artifactId>h2</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-storage-api</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 56 - 3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java

@@ -17,9 +17,14 @@
 
 package org.apache.dolphinscheduler.dao.utils;
 
+import static org.apache.dolphinscheduler.common.constants.Constants.CRC_SUFFIX;
+
+import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
 
@@ -27,8 +32,11 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,10 +44,15 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.fasterxml.jackson.databind.JsonNode;
 
 public class TaskCacheUtils {
 
+    protected static final Logger logger = LoggerFactory.getLogger(TaskCacheUtils.class);
+
     private TaskCacheUtils() {
         throw new IllegalStateException("Utility class");
     }
@@ -54,15 +67,17 @@ public class TaskCacheUtils {
      * 4. input VarPool, from upstream task and workflow global parameters
      * @param taskInstance task instance
      * @param taskExecutionContext taskExecutionContext
+     * @param storageOperate storageOperate
      * @return cache key
      */
-    public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext) {
+    public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext,
+                                          StorageOperate storageOperate) {
         List<String> keyElements = new ArrayList<>();
         keyElements.add(String.valueOf(taskInstance.getTaskCode()));
         keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion()));
         keyElements.add(String.valueOf(taskInstance.getIsCache().getCode()));
         keyElements.add(String.valueOf(taskInstance.getEnvironmentConfig()));
-        keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext));
+        keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperate));
         String data = StringUtils.join(keyElements, "_");
         return DigestUtils.sha256Hex(data);
     }
@@ -109,7 +124,8 @@ public class TaskCacheUtils {
      * @param taskInstance task instance
      * taskExecutionContext taskExecutionContext
      */
-    public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context) {
+    public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context,
+                                                 StorageOperate storageOperate) {
         JsonNode taskParams = JSONUtils.parseObject(taskInstance.getTaskParams());
 
         // The set of input values considered from localParams in the taskParams
@@ -123,6 +139,12 @@ public class TaskCacheUtils {
         // var pool value from upstream task
         List<Property> varPool = JSONUtils.toList(taskInstance.getVarPool(), Property.class);
 
+        Map<String, String> fileCheckSumMap = new HashMap<>();
+        List<Property> fileInput = varPool.stream().filter(property -> property.getType().equals(DataType.FILE))
+                .collect(Collectors.toList());
+        fileInput.forEach(
+                property -> fileCheckSumMap.put(property.getProp(), getValCheckSum(property, context, storageOperate)));
+
         // var pool value from workflow global parameters
         if (context.getPrepareParamsMap() != null) {
             Set<String> taskVarPoolSet = varPool.stream().map(Property::getProp).collect(Collectors.toSet());
@@ -139,9 +161,40 @@ public class TaskCacheUtils {
                 .filter(property -> propertyInSet.contains(property.getProp()))
                 .sorted(Comparator.comparing(Property::getProp))
                 .collect(Collectors.toList());
+
+        varPool.forEach(property -> {
+            if (property.getType() == DataType.FILE) {
+                property.setValue(fileCheckSumMap.get(property.getValue()));
+            }
+        });
         return JSONUtils.toJsonString(varPool);
     }
 
+    /**
+     * get checksum from crc32 file of file property in varPool
+     * cache can be used if content of upstream output files are the same
+     * @param fileProperty
+     * @param context
+     * @param storageOperate
+     */
+    public static String getValCheckSum(Property fileProperty, TaskExecutionContext context,
+                                        StorageOperate storageOperate) {
+        String resourceCRCPath = fileProperty.getValue() + CRC_SUFFIX;
+        String resourceCRCWholePath = storageOperate.getResourceFileName(context.getTenantCode(), resourceCRCPath);
+        String targetPath = String.format("%s/%s", context.getExecutePath(), resourceCRCPath);
+        logger.info("{} --- Remote:{} to Local:{}", "CRC file", resourceCRCWholePath, targetPath);
+        String crcString = "";
+        try {
+            storageOperate.download(context.getTenantCode(), resourceCRCWholePath, targetPath, false,
+                    true);
+            crcString = FileUtils.readFile2Str(new FileInputStream(targetPath));
+            fileProperty.setValue(crcString);
+        } catch (IOException e) {
+            logger.error("Replace checksum failed for file property {}.", fileProperty.getProp());
+        }
+        return crcString;
+    }
+
     /**
      * get var in set from task definition
      * @param taskInstance task instance

+ 35 - 7
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java

@@ -17,8 +17,12 @@
 
 package org.apache.dolphinscheduler.dao.utils;
 
+import static org.apache.dolphinscheduler.common.constants.Constants.CRC_SUFFIX;
+
 import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@@ -35,6 +39,7 @@ import java.util.Map;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 class TaskCacheUtilsTest {
 
@@ -42,6 +47,8 @@ class TaskCacheUtilsTest {
 
     private TaskExecutionContext taskExecutionContext;
 
+    private StorageOperate storageOperate;
+
     @BeforeEach
     void setUp() {
         String taskParams = "{\n" +
@@ -95,6 +102,7 @@ class TaskCacheUtilsTest {
         prepareParamsMap.put("a", property);
         taskExecutionContext.setPrepareParamsMap(prepareParamsMap);
 
+        storageOperate = Mockito.mock(StorageOperate.class);
     }
 
     @Test
@@ -121,25 +129,26 @@ class TaskCacheUtilsTest {
 
     @Test
     void TestGetTaskInputVarPoolData() {
-        TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext);
+        TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperate);
         // only a=aa and c=cc will influence the result,
         // b=bb is a fixed value, will be considered in task version
         // k=kk is not in task params, will be ignored
         String except =
                 "[{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"aa\"},{\"prop\":\"c\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"cc\"}]";
-        Assertions.assertEquals(except, TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext));
+        Assertions.assertEquals(except,
+                TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperate));
     }
 
     @Test
     void TestGenerateCacheKey() {
-        String cacheKeyBase = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+        String cacheKeyBase = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
         Property propertyI = new Property();
         propertyI.setProp("i");
         propertyI.setDirect(Direct.IN);
         propertyI.setType(DataType.VARCHAR);
         propertyI.setValue("ii");
         taskExecutionContext.getPrepareParamsMap().put("i", propertyI);
-        String cacheKeyNew = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+        String cacheKeyNew = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
         // i will not influence the result, because task instance not use it
         Assertions.assertEquals(cacheKeyBase, cacheKeyNew);
 
@@ -149,17 +158,17 @@ class TaskCacheUtilsTest {
         propertyD.setType(DataType.VARCHAR);
         propertyD.setValue("dd");
         taskExecutionContext.getPrepareParamsMap().put("i", propertyD);
-        String cacheKeyD = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+        String cacheKeyD = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
         // d will influence the result, because task instance use it
         Assertions.assertNotEquals(cacheKeyBase, cacheKeyD);
 
         taskInstance.setTaskDefinitionVersion(100);
-        String cacheKeyE = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+        String cacheKeyE = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
         // task definition version is changed, so cache key changed
         Assertions.assertNotEquals(cacheKeyD, cacheKeyE);
 
         taskInstance.setEnvironmentConfig("export PYTHON_HOME=/bin/python3");
-        String cacheKeyF = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+        String cacheKeyF = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
         // EnvironmentConfig is changed, so cache key changed
         Assertions.assertNotEquals(cacheKeyE, cacheKeyF);
     }
@@ -169,4 +178,23 @@ class TaskCacheUtilsTest {
         String cacheKey = TaskCacheUtils.generateTagCacheKey(1, "123");
         Assertions.assertEquals("1-123", cacheKey);
     }
+
+    @Test
+    void testReplaceWithCheckSum() {
+        String content = "abcdefg";
+        String filePath = "test/testFile.txt";
+        FileUtils.writeContent2File(content, filePath + CRC_SUFFIX);
+
+        Property property = new Property();
+        property.setProp("f1");
+        property.setValue("testFile.txt");
+        property.setType(DataType.FILE);
+        property.setDirect(Direct.IN);
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setExecutePath("test");
+        taskExecutionContext.setTenantCode("aaa");
+
+        String crc = TaskCacheUtils.getValCheckSum(property, taskExecutionContext, storageOperate);
+        Assertions.assertEquals(crc, content);
+    }
 }

+ 8 - 1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
@@ -104,6 +105,12 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
     @Autowired
     private TaskEventService taskEventService;
 
+    /**
+     * storage operator
+     */
+    @Autowired(required = false)
+    private StorageOperate storageOperate;
+
     /**
      * consumer thread pool
      */
@@ -298,7 +305,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
                 return false;
             }
             // check if task is cache execution
-            String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context);
+            String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context, storageOperate);
             TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
             // if we can find the cache task instance, we will add cache event, and return true.
             if (cacheTaskInstance != null) {

+ 20 - 2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java

@@ -17,7 +17,10 @@
 
 package org.apache.dolphinscheduler.server.worker.utils;
 
+import static org.apache.dolphinscheduler.common.constants.Constants.CRC_SUFFIX;
+
 import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
 import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -84,16 +87,31 @@ public class TaskFilesTransferUtils {
         logger.info("Upload output files ...");
         for (Property property : localParamsProperty) {
             // get local file path
-            String srcPath =
-                    packIfDir(String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue()));
+            String path = String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue());
+            String srcPath = packIfDir(path);
+
+            // get crc file path
+            String srcCRCPath = srcPath + CRC_SUFFIX;
+            try {
+                FileUtils.writeContent2File(FileUtils.getFileChecksum(path), srcCRCPath);
+            } catch (IOException ex) {
+                throw new TaskException(ex.getMessage(), ex);
+            }
+
             // get remote file path
             String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
+            String resourceCRCPath = resourcePath + CRC_SUFFIX;
             try {
                 // upload file to storage
                 String resourceWholePath =
                         storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath);
+                String resourceCRCWholePath =
+                        storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourceCRCPath);
                 logger.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
                 storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
+                logger.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath);
+                storageOperate.upload(taskExecutionContext.getTenantCode(), srcCRCPath, resourceCRCWholePath, false,
+                        true);
             } catch (IOException ex) {
                 throw new TaskException("Upload file to storage error", ex);
             }