Bläddra i källkod

[python] Add task condition (#7505)

* [python] Add task condition

close: #6927

* Add example and set downstream dep
Jiajie Zhong 3 år sedan
förälder
incheckning
e23a4848c0

+ 55 - 0
dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py

@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+An example workflow for task condition.
+
+This example creates five tasks in a single workflow: four shell tasks and one condition task.
+The condition task has one upstream, which we declare explicitly with the syntax
+`parent >> condition`, and three downstream tasks whose dependence is set automatically by the
+condition task from its parameter `condition`. The graph of this workflow looks like:
+                         --> condition_success_1
+                       /
+parent -> conditions ->  --> condition_success_2
+                       \
+                         --> condition_fail
+.
+"""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions
+from pydolphinscheduler.tasks.shell import Shell
+
+# Build and submit the example workflow: one parent shell task, one condition task and the
+# three shell tasks the condition task refers to.
+# NOTE(review): tenant "tenant_exists" presumably has to exist on the server already - confirm.
+with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd:
+    parent = Shell(name="parent", command="echo parent")
+    condition_success_1 = Shell(
+        name="condition_success_1", command="echo condition_success_1"
+    )
+    condition_success_2 = Shell(
+        name="condition_success_2", command="echo condition_success_2"
+    )
+    condition_fail = Shell(name="condition_fail", command="echo condition_fail")
+    # Attach a status to each referenced task: two with SUCCESS, one with FAILURE.
+    cond_operator = And(
+        And(
+            SUCCESS(condition_success_1, condition_success_2),
+            FAILURE(condition_fail),
+        ),
+    )
+
+    # Creating the condition task also sets the tasks referenced by `cond_operator` as its
+    # downstream, so only the upstream edge has to be declared explicitly.
+    condition = Conditions(name="conditions", condition=cond_operator)
+    parent >> condition
+    pd.submit()

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@@ -75,6 +75,7 @@ class TaskType(str):
     PROCEDURE = "PROCEDURE"
     PROCEDURE = "PROCEDURE"
     DATAX = "DATAX"
     DATAX = "DATAX"
     DEPENDENT = "DEPENDENT"
     DEPENDENT = "DEPENDENT"
+    CONDITIONS = "CONDITIONS"
 
 
 
 
 class DefaultTaskCodeNum(str):
 class DefaultTaskCodeNum(str):

+ 185 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py

@@ -0,0 +1,185 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Conditions."""
+
+from typing import Dict, List
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.base import Base
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSParamException
+
+
+class Status(Base):
+    """Base class of Condition task status.
+
+    It is the parent class of :class:`SUCCESS` and :class:`FAILURE`. It provides the status
+    name and :func:`get_define` to its sub classes.
+
+    :param tasks: the tasks this status is attached to.
+    """
+
+    def __init__(self, *tasks):
+        super().__init__(f"Condition.{self.status_name()}")
+        self.tasks = tasks
+
+    def __repr__(self) -> str:
+        # Not a debugging repr: :class:`ConditionOperator` uses this value as the name of the
+        # attribute that stores the collected definitions, so it must stay unchanged.
+        return "depend_item_list"
+
+    @classmethod
+    def status_name(cls) -> str:
+        """Get name for Status or its sub class, which is the upper-cased class name."""
+        return cls.__name__.upper()
+
+    def get_define(self, camel_attr: bool = True) -> List:
+        """Get status definition attribute communicate to Java gateway server.
+
+        :param camel_attr: not used by this implementation, all keys are written literally.
+        :return: one dict per task, with keys ``depTaskCode`` and ``status``.
+        :raises PyDSParamException: if any element of ``tasks`` is not a :class:`Task`.
+        """
+        content = []
+        for task in self.tasks:
+            if not isinstance(task, Task):
+                # Interpolate the message here: exception constructors do not apply
+                # %-style arguments the way logging calls do.
+                raise PyDSParamException(
+                    f"{self.status_name()} only accept class Task or sub class Task, "
+                    f"but get {type(task)}"
+                )
+            content.append({"depTaskCode": task.code, "status": self.status_name()})
+        return content
+
+
+class SUCCESS(Status):
+    """Class SUCCESS to task condition, sub class of :class:`Status`.
+
+    Its tasks are reported with status ``SUCCESS``; construction is inherited from
+    :class:`Status` and accepts the same ``*tasks`` arguments.
+    """
+
+
+class FAILURE(Status):
+    """Class FAILURE to task condition, sub class of :class:`Status`.
+
+    Its tasks are reported with status ``FAILURE``; construction is inherited from
+    :class:`Status` and accepts the same ``*tasks`` arguments.
+    """
+
+
+class ConditionOperator(Base):
+    """Set ConditionTask or ConditionOperator with specific operator.
+
+    :param args: either only :class:`Status` objects or only :class:`ConditionOperator`
+        objects; mixing both kinds in one operator is rejected by :func:`set_define_attr`.
+    """
+
+    _DEFINE_ATTR = {
+        "relation",
+    }
+
+    def __init__(self, *args):
+        super().__init__(self.__class__.__name__)
+        self.args = args
+
+    def __repr__(self) -> str:
+        # Not a debugging repr: the value is used as the name of the attribute that stores
+        # the definitions of nested operators, so it must stay unchanged.
+        return "depend_task_list"
+
+    @classmethod
+    def operator_name(cls) -> str:
+        """Get operator name in different class, which is the upper-cased class name."""
+        return cls.__name__.upper()
+
+    @property
+    def relation(self) -> str:
+        """Get operator name in different class, for function :func:`get_define`."""
+        return self.operator_name()
+
+    def set_define_attr(self) -> str:
+        """Set attribute to function :func:`get_define`.
+
+        It is a wrapper for both `And` and `Or` operator. The definitions of all arguments
+        are stored on ``self`` under ``depend_item_list`` when the arguments are
+        :class:`Status` objects, or under ``depend_task_list`` when they are operators.
+
+        :return: the name of the attribute that was set.
+        :raises PyDSParamException: if no argument was given, if an argument has an
+            unsupported type, or if the arguments are not all of the same kind.
+        """
+        if not self.args:
+            # Without this guard the attribute name would stay None and ``setattr`` would
+            # fail with an unrelated TypeError.
+            raise PyDSParamException(
+                f"Condition {self.relation} operator require at least one parameter."
+            )
+
+        result = []
+        attr = None
+        for condition in self.args:
+            if not isinstance(condition, (Status, ConditionOperator)):
+                raise PyDSParamException(
+                    f"Condition {self.relation} operator parameter support ConditionTask "
+                    f"and ConditionOperator but got {type(condition)}."
+                )
+            # ``repr`` tells the two supported kinds apart, the first argument decides.
+            if attr is None:
+                attr = repr(condition)
+            elif repr(condition) != attr:
+                raise PyDSParamException(
+                    f"Condition {self.relation} operator parameter only support same type."
+                )
+            if attr == "depend_item_list":
+                # A status yields a list of items, flatten them into a single list.
+                result.extend(condition.get_define())
+            else:
+                # A nested operator yields one dict, keep it as one element.
+                result.append(condition.get_define())
+        setattr(self, attr, result)
+        return attr
+
+    def get_define(self, camel_attr=True) -> Dict:
+        """Overwrite Base.get_define to get task Condition specific get define.
+
+        :param camel_attr: currently not forwarded, camel case attributes are always used.
+        :return: definition built from ``relation`` and the attribute set by
+            :func:`set_define_attr`.
+        """
+        attr = self.set_define_attr()
+        dependent_define_attr = self._DEFINE_ATTR.union({attr})
+        return super().get_define_custom(
+            camel_attr=True, custom_attr=dependent_define_attr
+        )
+
+
+class And(ConditionOperator):
+    """Operator And for task condition.
+
+    It accepts either :class:`Status` objects or other :class:`ConditionOperator` objects
+    and reports relation ``AND`` for those args. Construction is inherited from
+    :class:`ConditionOperator`.
+    """
+
+
+class Or(ConditionOperator):
+    """Operator Or for task condition.
+
+    It accepts either :class:`Status` objects or other :class:`ConditionOperator` objects
+    and reports relation ``OR`` for those args. Construction is inherited from
+    :class:`ConditionOperator`.
+    """
+
+
+class Conditions(Task):
+    """Task condition object, declare behavior for condition task to dolphinscheduler.
+
+    :param name: task name.
+    :param condition: operator tree describing the tasks and the status attached to each
+        of them. Every task found in the tree becomes a downstream of this task.
+    """
+
+    def __init__(self, name: str, condition: ConditionOperator, *args, **kwargs):
+        super().__init__(name, TaskType.CONDITIONS, *args, **kwargs)
+        self.condition = condition
+        # Set condition tasks as current task downstream
+        self._set_dep()
+
+    @staticmethod
+    def _collect_tasks(node) -> List:
+        """Return the tasks of every :class:`Status` reachable from node, in declared order."""
+        if isinstance(node, Status):
+            return list(node.tasks)
+        if isinstance(node, ConditionOperator):
+            tasks = []
+            for child in node.args:
+                tasks.extend(Conditions._collect_tasks(child))
+            return tasks
+        # Unsupported types are reported by ``ConditionOperator.set_define_attr``.
+        return []
+
+    def _set_dep(self) -> None:
+        """Set downstream according to parameter `condition`.
+
+        The operator tree is walked at any depth, so a status placed directly under the
+        root operator and operators nested more than two levels deep are both handled.
+        """
+        downstream = []
+        for cond in self.condition.args:
+            downstream.extend(self._collect_tasks(cond))
+        self.set_downstream(downstream)
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for Condition task.
+
+        Condition task have some specials attribute `dependence`, and in most of the task
+        this attribute is None and use empty dict `{}` as default value. We do not use class
+        attribute `_task_custom_attr` due to avoid attribute cover.
+
+        NOTE: as this is a property, `camel_attr` and `custom_attr` always keep their
+        default values and are not used here.
+        """
+        params = super().task_params
+        params["dependence"] = self.condition.get_define()
+        return params

+ 0 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py

@@ -180,9 +180,6 @@ class DependentOperator(Base):
         "relation",
         "relation",
     }
     }
 
 
-    DEPENDENT_ITEM = "DependentItem"
-    DEPENDENT_OPERATOR = "DependentOperator"
-
     def __init__(self, *args):
     def __init__(self, *args):
         super().__init__(self.__class__.__name__)
         super().__init__(self.__class__.__name__)
         self.args = args
         self.args = args

+ 439 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py

@@ -0,0 +1,439 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task condition."""
+from typing import List, Tuple
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.condition import (
+    FAILURE,
+    SUCCESS,
+    And,
+    ConditionOperator,
+    Conditions,
+    Or,
+    Status,
+)
+from tests.testing.task import Task
+
+# Shared constants for the tests in this module. Only TEST_NAME and TEST_TYPE are referenced
+# by the tests below.
+TEST_NAME = "test-name"
+TEST_PROJECT = "test-project"
+TEST_PROCESS_DEFINITION = "test-process-definition"
+TEST_TYPE = "test-type"
+TEST_PROJECT_CODE, TEST_DEFINITION_CODE, TEST_TASK_CODE = 12345, 123456, 1234567
+
+TEST_OPERATOR_LIST = ("AND", "OR")
+
+
+@pytest.mark.parametrize(
+    "obj, expect",
+    [(Status, "STATUS"), (SUCCESS, "SUCCESS"), (FAILURE, "FAILURE")],
+)
+def test_class_status_status_name(obj: Status, expect: str):
+    """Test class method status_name of class status and its sub classes."""
+    actual = obj.status_name()
+    assert actual == expect
+
+
+@pytest.mark.parametrize(
+    "obj, tasks",
+    [
+        (Status, (1, 2, 3)),
+        (SUCCESS, (1.1, 2.2, 3.3)),
+        (FAILURE, (ConditionOperator(1), ConditionOperator(2), ConditionOperator(3))),
+    ],
+)
+def test_class_status_depend_item_list_no_expect_type(obj: Status, tasks: Tuple):
+    """Test class status and sub classes raise an error for elements that are not tasks."""
+    expect_message = ".*?only accept class Task or sub class Task, but get"
+    with pytest.raises(PyDSParamException, match=expect_message):
+        obj(*tasks).get_define()
+
+
+@pytest.mark.parametrize(
+    "obj, tasks",
+    [
+        (status_cls, [Task(str(i), TEST_TYPE) for i in range(num)])
+        for status_cls in (Status, SUCCESS, FAILURE)
+        for num in (1, 2, 3)
+    ],
+)
+def test_class_status_depend_item_list(obj: Status, tasks: Tuple):
+    """Test :func:`get_define` of class status and sub classes with one to three tasks."""
+    name = obj.status_name()
+    expect = []
+    for task in tasks:
+        expect.append({"depTaskCode": task.code, "status": name})
+    actual = obj(*tasks).get_define()
+    assert actual == expect
+
+
+@pytest.mark.parametrize(
+    "obj, expect",
+    [(ConditionOperator, "CONDITIONOPERATOR"), (And, "AND"), (Or, "OR")],
+)
+def test_condition_operator_operator_name(obj: ConditionOperator, expect: str):
+    """Test class function :func:`operator_name` of ConditionOperator and sub classes."""
+    actual = obj.operator_name()
+    assert actual == expect
+
+
+@pytest.mark.parametrize(
+    "obj, expect",
+    [(ConditionOperator, "CONDITIONOPERATOR"), (And, "AND"), (Or, "OR")],
+)
+def test_condition_operator_relation(obj: ConditionOperator, expect: str):
+    """Test property `relation` of ConditionOperator and sub classes."""
+    operator = obj(1)
+    assert operator.relation == expect
+
+
+@pytest.mark.parametrize(
+    "obj, status_or_operator, match",
+    [
+        (
+            ConditionOperator,
+            [Status(Task("1", TEST_TYPE)), 1],
+            ".*?operator parameter support ConditionTask and ConditionOperator.*?",
+        ),
+        (
+            ConditionOperator,
+            [Status(Task("1", TEST_TYPE)), 1.0],
+            ".*?operator parameter support ConditionTask and ConditionOperator.*?",
+        ),
+        (
+            ConditionOperator,
+            [
+                Status(Task("1", TEST_TYPE)),
+                ConditionOperator(And(Status(Task("1", TEST_TYPE)))),
+            ],
+            ".*?operator parameter only support same type.",
+        ),
+        (
+            ConditionOperator,
+            [
+                ConditionOperator(And(Status(Task("1", TEST_TYPE)))),
+                Status(Task("1", TEST_TYPE)),
+            ],
+            ".*?operator parameter only support same type.",
+        ),
+    ],
+)
+def test_condition_operator_set_define_attr_not_support_type(
+    obj, status_or_operator, match
+):
+    """Test ConditionOperator rejects unsupported argument types and mixed argument kinds."""
+    with pytest.raises(PyDSParamException, match=match):
+        obj(*status_or_operator).set_define_attr()
+
+
+@pytest.mark.parametrize(
+    "obj, task_num",
+    [
+        (operator_cls, num)
+        for operator_cls in (ConditionOperator, And, Or)
+        for num in (1, 2, 3)
+    ],
+)
+def test_condition_operator_set_define_attr_status(
+    obj: ConditionOperator, task_num: int
+):
+    """Test :func:`set_define_attr` with a single status holding one or more tasks."""
+    tasks = [Task(str(idx), TEST_TYPE) for idx in range(task_num)]
+    status = Status(*tasks)
+
+    operator = obj(status)
+    operator.set_define_attr()
+
+    expect = []
+    for task in tasks:
+        expect.append({"depTaskCode": task.code, "status": status.status_name()})
+    assert operator.depend_item_list == expect
+
+
+@pytest.mark.parametrize(
+    "obj, status",
+    [
+        (operator_cls, pair)
+        for operator_cls in (ConditionOperator, And, Or)
+        for pair in (
+            (SUCCESS, SUCCESS),
+            (FAILURE, FAILURE),
+            (SUCCESS, FAILURE),
+            (FAILURE, SUCCESS),
+        )
+    ],
+)
+def test_condition_operator_set_define_attr_mix_status(
+    obj: ConditionOperator, status: List[Status]
+):
+    """Test :func:`set_define_attr` with two status objects of same or different classes."""
+    task = Task("test-operator", TEST_TYPE)
+    status_list = [sta(task) for sta in status]
+    expect = [{"depTaskCode": task.code, "status": sta.status_name()} for sta in status]
+
+    operator = obj(*status_list)
+    operator.set_define_attr()
+    assert operator.depend_item_list == expect
+
+
+@pytest.mark.parametrize(
+    "obj, task_num",
+    [
+        (operator_cls, num)
+        for operator_cls in (ConditionOperator, And, Or)
+        for num in (1, 2, 3)
+    ],
+)
+def test_condition_operator_set_define_attr_operator(
+    obj: ConditionOperator, task_num: int
+):
+    """Test :func:`set_define_attr` with one or more nested operators of the same class."""
+    task = Task("test-operator", TEST_TYPE)
+    status = Status(task)
+
+    expect = []
+    for _ in range(task_num):
+        item = {"depTaskCode": task.code, "status": status.status_name()}
+        expect.append({"relation": obj.operator_name(), "dependItemList": [item]})
+
+    children = [obj(status) for _ in range(task_num)]
+    operator = obj(*children)
+    operator.set_define_attr()
+    assert operator.depend_task_list == expect
+
+
+@pytest.mark.parametrize(
+    "cond, sub_cond",
+    [
+        (ConditionOperator, (And, Or)),
+        (ConditionOperator, (Or, And)),
+        (And, (And, Or)),
+        (And, (Or, And)),
+        (Or, (And, Or)),
+        (Or, (Or, And)),
+    ],
+)
+def test_condition_operator_set_define_attr_mix_operator(
+    cond: ConditionOperator, sub_cond: Tuple[ConditionOperator]
+):
+    """Test :func:`set_define_attr` with one or more class mix condition operator.
+
+    The outer operator `cond` wraps one instance of every class in `sub_cond`, and the
+    definitions of the nested operators are expected under `depend_task_list`.
+    """
+    attr = "depend_task_list"
+
+    task = Task("test-operator", TEST_TYPE)
+
+    expect = []
+    sub_condition = []
+    # The loop variable must not be named `cond`: that would overwrite the parametrized
+    # outer operator and the test would always exercise the last sub operator instead.
+    for sub in sub_cond:
+        status = Status(task)
+        sub_condition.append(sub(status))
+        expect.append(
+            {
+                "relation": sub.operator_name(),
+                "dependItemList": [
+                    {
+                        "depTaskCode": task.code,
+                        "status": status.status_name(),
+                    }
+                ],
+            }
+        )
+    co = cond(*sub_condition)
+    co.set_define_attr()
+    assert getattr(co, attr) == expect
+
+
+# Patches are applied bottom-up, so the mock of `Conditions.gen_code_and_version` is passed
+# as the first argument and the mock of `Task.gen_code_and_version` as the second one.
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(12345, 1),
+)
+@patch(
+    "pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_dependent_get_define(mock_condition_code_version, mock_task_code_version):
+    """Test task condition :func:`get_define`.
+
+    Builds an `And` operator holding one nested `And` and one nested `Or`, each with a
+    SUCCESS and a FAILURE status, and compares the complete task definition.
+    """
+    common_task = Task(name="common_task", task_type="test_task_condition")
+    cond_operator = And(
+        And(
+            SUCCESS(common_task, common_task),
+            FAILURE(common_task, common_task),
+        ),
+        Or(
+            SUCCESS(common_task, common_task),
+            FAILURE(common_task, common_task),
+        ),
+    )
+
+    name = "test_condition_get_define"
+    # The status objects of one nested operator are flattened into a single
+    # `dependItemList`, keeping their declared order.
+    expect = {
+        "code": 123,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "CONDITIONS",
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "dependence": {
+                "relation": "AND",
+                "dependTaskList": [
+                    {
+                        "relation": "AND",
+                        "dependItemList": [
+                            {"depTaskCode": common_task.code, "status": "SUCCESS"},
+                            {"depTaskCode": common_task.code, "status": "SUCCESS"},
+                            {"depTaskCode": common_task.code, "status": "FAILURE"},
+                            {"depTaskCode": common_task.code, "status": "FAILURE"},
+                        ],
+                    },
+                    {
+                        "relation": "OR",
+                        "dependItemList": [
+                            {"depTaskCode": common_task.code, "status": "SUCCESS"},
+                            {"depTaskCode": common_task.code, "status": "SUCCESS"},
+                            {"depTaskCode": common_task.code, "status": "FAILURE"},
+                            {"depTaskCode": common_task.code, "status": "FAILURE"},
+                        ],
+                    },
+                ],
+            },
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+
+    task = Conditions(name, condition=cond_operator)
+    assert task.get_define() == expect
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_condition_set_dep_workflow(mock_task_code_version):
+    """Test task condition set dependence in workflow level.
+
+    Mirrors the example workflow: one parent, one condition task and three tasks
+    referenced by the condition.
+    """
+    with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
+        parent = Task(name="parent", task_type=TEST_TYPE)
+        condition_success_1 = Task(name="condition_success_1", task_type=TEST_TYPE)
+        condition_success_2 = Task(name="condition_success_2", task_type=TEST_TYPE)
+        condition_fail = Task(name="condition_fail", task_type=TEST_TYPE)
+        cond_operator = And(
+            And(
+                SUCCESS(condition_success_1, condition_success_2),
+                FAILURE(condition_fail),
+            ),
+        )
+
+        condition = Conditions(name=TEST_NAME, condition=cond_operator)
+        parent >> condition
+        # General tasks test
+        assert len(pd.tasks) == 5
+        assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
+            [
+                parent,
+                condition,
+                condition_success_1,
+                condition_success_2,
+                condition_fail,
+            ],
+            key=lambda t: t.name,
+        )
+        # Task dep test
+        assert parent._downstream_task_codes == {condition.code}
+        assert condition._upstream_task_codes == {parent.code}
+
+        # Downstream of the condition task, which Conditions sets itself when it is created
+        assert condition._downstream_task_codes == {
+            condition_success_1.code,
+            condition_success_2.code,
+            condition_fail.code,
+        }
+        assert all(
+            [
+                child._upstream_task_codes == {condition.code}
+                for child in [condition_success_1, condition_success_2, condition_fail]
+            ]
+        )