Browse Source

[python] Add task type python http (#6906)

* [python] Add task type python http

* Fix unittest error

* Fix UT error
Jiajie Zhong 3 years ago
parent
commit
0dce68edd7

+ 2 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@@ -68,6 +68,8 @@ class TaskType(str):
     """Constants for task type, it will also show you which kind we support up to now."""
 
     SHELL = "SHELL"
+    HTTP = "HTTP"
+    PYTHON = "PYTHON"
 
 
 class DefaultTaskCodeNum(str):

+ 3 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@@ -65,15 +65,15 @@ class TaskParams(ObjectJsonBase):
 
     def __init__(
         self,
-        raw_script: str,
         local_params: Optional[List] = None,
         resource_list: Optional[List] = None,
         dependence: Optional[Dict] = None,
         wait_start_timeout: Optional[Dict] = None,
         condition_result: Optional[Dict] = None,
+        *args,
+        **kwargs,
     ):
-        super().__init__()
-        self.raw_script = raw_script
+        super().__init__(*args, **kwargs)
         self.local_params = local_params or []
         self.resource_list = resource_list or []
         self.dependence = dependence or {}

+ 115 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py

@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task shell."""
+
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task, TaskParams
+
+
+class HttpMethod:
+    """Constant of HTTP method."""
+
+    GET = "GET"
+    POST = "POST"
+    HEAD = "HEAD"
+    PUT = "PUT"
+    DELETE = "DELETE"
+
+
+class HttpCheckCondition:
+    """Constant of HTTP check condition.
+
+    For now it contain four value:
+    - STATUS_CODE_DEFAULT: when http response code equal to 200, mark as success.
+    - STATUS_CODE_CUSTOM: when http response code equal to the code user define, mark as success.
+    - BODY_CONTAINS: when http response body contain text user define, mark as success.
+    - BODY_NOT_CONTAINS: when http response body do not contain text user define, mark as success.
+    """
+
+    STATUS_CODE_DEFAULT = "STATUS_CODE_DEFAULT"
+    STATUS_CODE_CUSTOM = "STATUS_CODE_CUSTOM"
+    BODY_CONTAINS = "BODY_CONTAINS"
+    BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS"
+
+
+class HttpTaskParams(TaskParams):
+    """Parameter only for Http task types."""
+
+    def __init__(
+        self,
+        url: str,
+        http_method: Optional[str] = HttpMethod.GET,
+        http_params: Optional[str] = None,
+        http_check_condition: Optional[str] = HttpCheckCondition.STATUS_CODE_DEFAULT,
+        condition: Optional[str] = None,
+        connect_timeout: Optional[int] = 60000,
+        socket_timeout: Optional[int] = 60000,
+        *args,
+        **kwargs
+    ):
+        super().__init__(*args, **kwargs)
+        self.url = url
+        if not hasattr(HttpMethod, http_method):
+            raise ValueError("Parameter http_method %s not support.", http_method)
+        self.http_method = http_method
+        self.http_params = http_params or []
+        if not hasattr(HttpCheckCondition, http_check_condition):
+            raise ValueError(
+                "Parameter http_check_condition %s not support.", http_check_condition
+            )
+        self.http_check_condition = http_check_condition
+        if (
+            http_check_condition != HttpCheckCondition.STATUS_CODE_DEFAULT
+            and condition is None
+        ):
+            raise ValueError(
+                "Parameter condition must provider if http_check_condition not equal to STATUS_CODE_DEFAULT"
+            )
+        self.condition = condition
+        self.connect_timeout = connect_timeout
+        self.socket_timeout = socket_timeout
+
+
+class Http(Task):
+    """Task HTTP object, declare behavior for HTTP task to dolphinscheduler."""
+
+    def __init__(
+        self,
+        name: str,
+        url: str,
+        http_method: Optional[str] = HttpMethod.GET,
+        http_params: Optional[str] = None,
+        http_check_condition: Optional[str] = HttpCheckCondition.STATUS_CODE_DEFAULT,
+        condition: Optional[str] = None,
+        connect_timeout: Optional[int] = 60000,
+        socket_timeout: Optional[int] = 60000,
+        *args,
+        **kwargs
+    ):
+        task_params = HttpTaskParams(
+            url=url,
+            http_method=http_method,
+            http_params=http_params,
+            http_check_condition=http_check_condition,
+            condition=condition,
+            connect_timeout=connect_timeout,
+            socket_timeout=socket_timeout,
+        )
+        super().__init__(name, TaskType.HTTP, task_params, *args, **kwargs)

+ 47 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py

@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Python."""
+
+import inspect
+import types
+from typing import Any
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task, TaskParams
+
+
+class PythonTaskParams(TaskParams):
+    """Parameter only for Python task types."""
+
+    def __init__(self, raw_script: str, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.raw_script = raw_script
+
+
+class Python(Task):
+    """Task Python object, declare behavior for Python task to dolphinscheduler."""
+
+    def __init__(self, name: str, code: Any, *args, **kwargs):
+        if isinstance(code, str):
+            task_params = PythonTaskParams(raw_script=code)
+        elif isinstance(code, types.FunctionType):
+            py_function = inspect.getsource(code)
+            task_params = PythonTaskParams(raw_script=py_function)
+        else:
+            raise ValueError("Parameter code do not support % for now.", type(code))
+        super().__init__(name, TaskType.PYTHON, task_params, *args, **kwargs)

+ 11 - 5
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py

@@ -21,6 +21,14 @@ from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task, TaskParams
 
 
+class ShellTaskParams(TaskParams):
+    """Parameter only for shell task types."""
+
+    def __init__(self, raw_script: str, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.raw_script = raw_script
+
+
 class Shell(Task):
     """Task shell object, declare behavior for shell task to dolphinscheduler.
 
@@ -29,8 +37,6 @@ class Shell(Task):
     task.name assign to `task_shell`
     """
 
-    def __init__(
-        self, name: str, command: str, task_type: str = TaskType.SHELL, *args, **kwargs
-    ):
-        task_params = TaskParams(raw_script=command)
-        super().__init__(name, task_type, task_params, *args, **kwargs)
+    def __init__(self, name: str, command: str, *args, **kwargs):
+        task_params = ShellTaskParams(raw_script=command)
+        super().__init__(name, TaskType.SHELL, task_params, *args, **kwargs)

+ 1 - 1
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@@ -175,7 +175,7 @@ def test_process_definition_simple():
     expect_tasks_num = 5
     with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
         for i in range(expect_tasks_num):
-            task_params = TaskParams(raw_script=f"test-raw-script-{i}")
+            task_params = TaskParams()
             curr_task = Task(
                 name=f"task-{i}", task_type=f"type-{i}", task_params=task_params
             )

+ 7 - 23
dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py

@@ -27,16 +27,14 @@ from tests.testing.task import Task as testTask
 
 def test_task_params_to_dict():
     """Test TaskParams object function to_dict."""
-    raw_script = "test_task_params_to_dict"
     expect = {
         "resourceList": [],
         "localParams": [],
-        "rawScript": raw_script,
         "dependence": {},
         "conditionResult": TaskParams.DEFAULT_CONDITION_RESULT,
         "waitStartTimeout": {},
     }
-    task_param = TaskParams(raw_script=raw_script)
+    task_param = TaskParams()
     assert task_param.to_dict() == expect
 
 
@@ -65,7 +63,6 @@ def test_task_to_dict():
     version = 1
     name = "test_task_to_dict"
     task_type = "test_task_to_dict_type"
-    raw_script = "test_task_params_to_dict"
     expect = {
         "code": code,
         "name": name,
@@ -76,7 +73,6 @@ def test_task_to_dict():
         "taskParams": {
             "resourceList": [],
             "localParams": [],
-            "rawScript": raw_script,
             "dependence": {},
             "conditionResult": {"successNode": [""], "failedNode": [""]},
             "waitStartTimeout": {},
@@ -94,7 +90,7 @@ def test_task_to_dict():
         "pydolphinscheduler.core.task.Task.gen_code_and_version",
         return_value=(code, version),
     ):
-        task = Task(name=name, task_type=task_type, task_params=TaskParams(raw_script))
+        task = Task(name=name, task_type=task_type, task_params=TaskParams())
         assert task.to_dict() == expect
 
 
@@ -104,13 +100,8 @@ def test_two_tasks_shift(shift: str):
 
     Here we test both `>>` and `<<` bit operator.
     """
-    raw_script = "script"
-    upstream = testTask(
-        name="upstream", task_type=shift, task_params=TaskParams(raw_script)
-    )
-    downstream = testTask(
-        name="downstream", task_type=shift, task_params=TaskParams(raw_script)
-    )
+    upstream = testTask(name="upstream", task_type=shift, task_params=TaskParams())
+    downstream = testTask(name="downstream", task_type=shift, task_params=TaskParams())
     if shift == "<<":
         downstream << upstream
     elif shift == ">>":
@@ -146,17 +137,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
         "downstream": "upstream",
     }
     task_type = "dep_task_and_tasks"
-    raw_script = "script"
-    task = testTask(
-        name="upstream", task_type=task_type, task_params=TaskParams(raw_script)
-    )
+    task = testTask(name="upstream", task_type=task_type, task_params=TaskParams())
     tasks = [
-        testTask(
-            name="downstream1", task_type=task_type, task_params=TaskParams(raw_script)
-        ),
-        testTask(
-            name="downstream2", task_type=task_type, task_params=TaskParams(raw_script)
-        ),
+        testTask(name="downstream1", task_type=task_type, task_params=TaskParams()),
+        testTask(name="downstream2", task_type=task_type, task_params=TaskParams()),
     ]
 
     # Use build-in function eval to simply test case and reduce duplicate code

+ 112 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py

@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task HTTP."""
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.http import (
+    Http,
+    HttpCheckCondition,
+    HttpMethod,
+    HttpTaskParams,
+)
+
+
+@pytest.mark.parametrize(
+    "class_name, attrs",
+    [
+        (HttpMethod, ("GET", "POST", "HEAD", "PUT", "DELETE")),
+        (
+            HttpCheckCondition,
+            (
+                "STATUS_CODE_DEFAULT",
+                "STATUS_CODE_CUSTOM",
+                "BODY_CONTAINS",
+                "BODY_NOT_CONTAINS",
+            ),
+        ),
+    ],
+)
+def test_attr_exists(class_name, attrs):
+    """Test weather class HttpMethod and HttpCheckCondition contain specific attribute."""
+    assert all(hasattr(class_name, attr) for attr in attrs)
+
+
+@pytest.mark.parametrize(
+    "param",
+    [
+        {"http_method": "http_method"},
+        {"http_check_condition": "http_check_condition"},
+        {"http_check_condition": HttpCheckCondition.STATUS_CODE_CUSTOM},
+        {
+            "http_check_condition": HttpCheckCondition.STATUS_CODE_CUSTOM,
+            "condition": None,
+        },
+    ],
+)
+def test_http_task_param_not_support_param(param):
+    """Test HttpTaskParams not support parameter."""
+    url = "https://www.apache.org"
+    with pytest.raises(ValueError, match="Parameter .*?"):
+        HttpTaskParams(url, **param)
+
+
+def test_http_to_dict():
+    """Test task HTTP function to_dict."""
+    code = 123
+    version = 1
+    name = "test_http_to_dict"
+    url = "https://www.apache.org"
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "HTTP",
+        "taskParams": {
+            "localParams": [],
+            "httpParams": [],
+            "url": url,
+            "httpMethod": "GET",
+            "httpCheckCondition": "STATUS_CODE_DEFAULT",
+            "condition": None,
+            "connectTimeout": 60000,
+            "socketTimeout": 60000,
+            "dependence": {},
+            "resourceList": [],
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        http = Http(name, url)
+        assert http.to_dict() == expect

+ 115 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py

@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task python."""
+
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.python import Python, PythonTaskParams
+
+
+@pytest.mark.parametrize(
+    "name, value",
+    [
+        ("local_params", "local_params"),
+        ("resource_list", "resource_list"),
+        ("dependence", "dependence"),
+        ("wait_start_timeout", "wait_start_timeout"),
+        ("condition_result", "condition_result"),
+    ],
+)
+def test_python_task_params_attr_setter(name, value):
+    """Test python task parameters."""
+    command = 'print("hello world.")'
+    python_task_params = PythonTaskParams(command)
+    assert command == python_task_params.raw_script
+    setattr(python_task_params, name, value)
+    assert value == getattr(python_task_params, name)
+
+
+@pytest.mark.parametrize(
+    "script_code",
+    [
+        123,
+        ("print", "hello world"),
+    ],
+)
+def test_python_task_not_support_code(script_code):
+    """Test python task parameters."""
+    name = "not_support_code_type"
+    code = 123
+    version = 1
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        with pytest.raises(ValueError, match="Parameter code do not support .*?"):
+            Python(name, script_code)
+
+
+def foo():  # noqa: D103
+    print("hello world.")
+
+
+@pytest.mark.parametrize(
+    "name, script_code, raw",
+    [
+        ("string_define", 'print("hello world.")', 'print("hello world.")'),
+        (
+            "function_define",
+            foo,
+            'def foo():  # noqa: D103\n    print("hello world.")\n',
+        ),
+    ],
+)
+def test_python_to_dict(name, script_code, raw):
+    """Test task python function to_dict."""
+    code = 123
+    version = 1
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "PYTHON",
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "rawScript": raw,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        shell = Python(name, script_code)
+        assert shell.to_dict() == expect

+ 22 - 1
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py

@@ -20,7 +20,28 @@
 
 from unittest.mock import patch
 
-from pydolphinscheduler.tasks.shell import Shell
+import pytest
+
+from pydolphinscheduler.tasks.shell import Shell, ShellTaskParams
+
+
+@pytest.mark.parametrize(
+    "name, value",
+    [
+        ("local_params", "local_params"),
+        ("resource_list", "resource_list"),
+        ("dependence", "dependence"),
+        ("wait_start_timeout", "wait_start_timeout"),
+        ("condition_result", "condition_result"),
+    ],
+)
+def test_shell_task_params_attr_setter(name, value):
+    """Test shell task parameters."""
+    raw_script = "echo shell task parameter"
+    shell_task_params = ShellTaskParams(raw_script)
+    assert raw_script == shell_task_params.raw_script
+    setattr(shell_task_params, name, value)
+    assert value == getattr(shell_task_params, name)
 
 
 def test_shell_to_dict():