Browse Source

[HotFix] Fix TaskOutputParameterParser might OOM if meed a bad output param expression (#15264)

Wenjun Ruan 1 year ago
parent
commit
db3d84b73d

+ 43 - 11
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java

@@ -36,46 +36,78 @@ import lombok.extern.slf4j.Slf4j;
 @NotThreadSafe
 public class TaskOutputParameterParser {
 
-    private final Map<String, String> taskOutputParams = new HashMap<>();
+    // Used to avoid '${setValue(' which loss the end of ')}'
+    private final int maxOneParameterRows;
+
+    // Used to avoid '${setValue(' which length is too long, this may case OOM
+    private final int maxOneParameterLength;
+
+    private final Map<String, String> taskOutputParams;
 
     private List<String> currentTaskOutputParam;
 
-    public void appendParseLog(String log) {
-        if (log == null) {
+    private long currentTaskOutputParamLength;
+
+    public TaskOutputParameterParser() {
+        // the default max rows of one parameter is 1024, this should be enough
+        this(1024, Integer.MAX_VALUE);
+    }
+
+    public TaskOutputParameterParser(int maxOneParameterRows, int maxOneParameterLength) {
+        this.maxOneParameterRows = maxOneParameterRows;
+        this.maxOneParameterLength = maxOneParameterLength;
+        this.taskOutputParams = new HashMap<>();
+        this.currentTaskOutputParam = null;
+        this.currentTaskOutputParamLength = 0;
+    }
+
+    public void appendParseLog(String logLine) {
+        if (logLine == null) {
             return;
         }
 
         if (currentTaskOutputParam != null) {
+            if (currentTaskOutputParam.size() > maxOneParameterRows
+                    || currentTaskOutputParamLength > maxOneParameterLength) {
+                log.warn(
+                        "The output param expression '{}' is too long, the max rows is {}, max length is {}, will skip this param",
+                        String.join("\n", currentTaskOutputParam), maxOneParameterLength, maxOneParameterRows);
+                currentTaskOutputParam = null;
+                currentTaskOutputParamLength = 0;
+                return;
+            }
             // continue to parse the rest of line
-            int i = log.indexOf(")}");
+            int i = logLine.indexOf(")}");
             if (i == -1) {
                 // the end of var pool not found
-                currentTaskOutputParam.add(log);
+                currentTaskOutputParam.add(logLine);
+                currentTaskOutputParamLength += logLine.length();
             } else {
                 // the end of var pool found
-                currentTaskOutputParam.add(log.substring(0, i + 2));
+                currentTaskOutputParam.add(logLine.substring(0, i + 2));
                 Pair<String, String> keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam));
                 if (keyValue.getKey() != null && keyValue.getValue() != null) {
                     taskOutputParams.put(keyValue.getKey(), keyValue.getValue());
                 }
                 currentTaskOutputParam = null;
+                currentTaskOutputParamLength = 0;
                 // continue to parse the rest of line
-                if (i + 2 != log.length()) {
-                    appendParseLog(log.substring(i + 2));
+                if (i + 2 != logLine.length()) {
+                    appendParseLog(logLine.substring(i + 2));
                 }
             }
             return;
         }
 
-        int indexOfVarPoolBegin = log.indexOf("${setValue(");
+        int indexOfVarPoolBegin = logLine.indexOf("${setValue(");
         if (indexOfVarPoolBegin == -1) {
-            indexOfVarPoolBegin = log.indexOf("#{setValue(");
+            indexOfVarPoolBegin = logLine.indexOf("#{setValue(");
         }
         if (indexOfVarPoolBegin == -1) {
             return;
         }
         currentTaskOutputParam = new ArrayList<>();
-        appendParseLog(log.substring(indexOfVarPoolBegin));
+        appendParseLog(logLine.substring(indexOfVarPoolBegin));
     }
 
     public Map<String, String> getTaskOutputParams() {

+ 41 - 8
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java

@@ -19,14 +19,15 @@ package org.apache.dolphinscheduler.plugin.task.api.parser;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import lombok.SneakyThrows;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -35,7 +36,7 @@ import com.google.common.collect.ImmutableMap;
 class TaskOutputParameterParserTest {
 
     @Test
-    void testEmptyLog() throws IOException, URISyntaxException {
+    void testEmptyLog() {
         List<String> varPools = getLogs("/outputParam/emptyVarPoolLog.txt");
         TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
         varPools.forEach(taskOutputParameterParser::appendParseLog);
@@ -43,7 +44,7 @@ class TaskOutputParameterParserTest {
     }
 
     @Test
-    void testOneLineLog() throws IOException, URISyntaxException {
+    void testOneLineLog() {
         List<String> varPools = getLogs("/outputParam/onelineVarPoolLog.txt");
         TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
         varPools.forEach(taskOutputParameterParser::appendParseLog);
@@ -51,7 +52,7 @@ class TaskOutputParameterParserTest {
     }
 
     @Test
-    void testOneVarPollInMultiLineLog() throws IOException, URISyntaxException {
+    void testOneVarPoolInMultiLineLog() {
         List<String> varPools = getLogs("/outputParam/oneVarPollInMultiLineLog.txt");
         TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
         varPools.forEach(taskOutputParameterParser::appendParseLog);
@@ -63,14 +64,46 @@ class TaskOutputParameterParserTest {
     }
 
     @Test
-    void testVarPollInMultiLineLog() throws IOException, URISyntaxException {
-        List<String> varPools = getLogs("/outputParam/multipleVarPoll.txt");
+    void testVarPoolInMultiLineLog() {
+        List<String> varPools = getLogs("/outputParam/multipleVarPool.txt");
         TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
         varPools.forEach(taskOutputParameterParser::appendParseLog);
         assertEquals(ImmutableMap.of("name", "tom", "age", "1"), taskOutputParameterParser.getTaskOutputParams());
     }
 
-    private List<String> getLogs(String file) throws IOException, URISyntaxException {
+    @Test
+    void textVarPoolExceedMaxRows() {
+        List<String> varPools = getLogs("/outputParam/maxRowsVarPool.txt");
+        TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(2, Integer.MAX_VALUE);
+        varPools.forEach(taskOutputParameterParser::appendParseLog);
+        assertEquals(Collections.emptyMap(), taskOutputParameterParser.getTaskOutputParams());
+
+        taskOutputParameterParser = new TaskOutputParameterParser();
+        varPools.forEach(taskOutputParameterParser::appendParseLog);
+        assertEquals(ImmutableMap.of("name", "name=tom\n" +
+                "name=name=tom\n" +
+                "name=name=tom\n" +
+                "name=name=tom\n" +
+                "name=name=tom"), taskOutputParameterParser.getTaskOutputParams());
+
+    }
+
+    @Test
+    void textVarPoolExceedMaxLength() {
+        List<String> varPools = getLogs("/outputParam/maxLengthVarPool.txt");
+        TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(2, 10);
+        varPools.forEach(taskOutputParameterParser::appendParseLog);
+        assertEquals(Collections.emptyMap(), taskOutputParameterParser.getTaskOutputParams());
+
+        taskOutputParameterParser = new TaskOutputParameterParser();
+        varPools.forEach(taskOutputParameterParser::appendParseLog);
+        assertEquals(ImmutableMap.of("name", "123456789\n" +
+                "12345\n"), taskOutputParameterParser.getTaskOutputParams());
+
+    }
+
+    @SneakyThrows
+    private List<String> getLogs(String file) {
         URI uri = TaskOutputParameterParserTest.class.getResource(file).toURI();
         return Files.lines(Paths.get(uri)).collect(Collectors.toList());
     }

+ 20 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxLengthVarPool.txt

@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+${setValue(name=123456789
+12345
+)}

+ 22 - 0
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/maxRowsVarPool.txt

@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+${setValue(name=name=tom
+name=name=tom
+name=name=tom
+name=name=tom
+name=name=tom)}

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt → dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPool.txt