Browse Source

[python] Task condition missing two downstream param (#7783)

* [python] Task condition missing two downstream param

We add two downstream tasks to set task condition
success and failed node

close: #7763

* Add getter and setter property condition_resulth in base task
Jiajie Zhong 3 years ago
parent
commit
1417967d9e

+ 18 - 17
dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py

@@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and
 condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream
 automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow
 like:
-pre_task_success_1 ->
-                      \
-pre_task_success_2 ->  --> conditions -> end
-                      /
-pre_task_fail      ->
+pre_task_1 ->                     -> success_branch
+             \                  /
+pre_task_2 ->  -> conditions ->
+             /                  \
+pre_task_3 ->                     -> fail_branch
 .
 """
 
@@ -35,22 +35,23 @@ from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions
 from pydolphinscheduler.tasks.shell import Shell
 
 with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd:
-    condition_pre_task_1 = Shell(
-        name="pre_task_success_1", command="echo pre_task_success_1"
-    )
-    condition_pre_task_2 = Shell(
-        name="pre_task_success_2", command="echo pre_task_success_2"
-    )
-    condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail")
+    pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
+    pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
+    pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
     cond_operator = And(
         And(
-            SUCCESS(condition_pre_task_1, condition_pre_task_2),
-            FAILURE(condition_pre_task_3),
+            SUCCESS(pre_task_1, pre_task_2),
+            FAILURE(pre_task_3),
         ),
     )
 
-    end = Shell(name="end", command="echo parent")
+    success_branch = Shell(name="success_branch", command="echo success_branch")
+    fail_branch = Shell(name="fail_branch", command="echo fail_branch")
 
-    condition = Conditions(name="conditions", condition=cond_operator)
-    condition >> end
+    condition = Conditions(
+        name="conditions",
+        condition=cond_operator,
+        success_task=success_branch,
+        failed_task=fail_branch,
+    )
     pd.submit()

+ 11 - 1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

@@ -156,7 +156,7 @@ class Task(Base):
         self.resource_list = resource_list or []
         self.dependence = dependence or {}
         self.wait_start_timeout = wait_start_timeout or {}
-        self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
+        self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
 
     @property
     def process_definition(self) -> Optional[ProcessDefinition]:
@@ -168,6 +168,16 @@ class Task(Base):
         """Set attribute process_definition."""
         self._process_definition = process_definition
 
+    @property
+    def condition_result(self) -> Dict:
+        """Get attribute condition_result."""
+        return self._condition_result
+
+    @condition_result.setter
+    def condition_result(self, condition_result: Optional[Dict]):
+        """Set attribute condition_result."""
+        self._condition_result = condition_result
+
     @property
     def task_params(self) -> Optional[Dict]:
         """Get task parameter object.

+ 20 - 1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py

@@ -157,9 +157,19 @@ class Or(ConditionOperator):
 class Conditions(Task):
     """Task condition object, declare behavior for condition task to dolphinscheduler."""
 
-    def __init__(self, name: str, condition: ConditionOperator, *args, **kwargs):
+    def __init__(
+        self,
+        name: str,
+        condition: ConditionOperator,
+        success_task: Task,
+        failed_task: Task,
+        *args,
+        **kwargs,
+    ):
         super().__init__(name, TaskType.CONDITIONS, *args, **kwargs)
         self.condition = condition
+        self.success_task = success_task
+        self.failed_task = failed_task
         # Set condition tasks as current task downstream
         self._set_dep()
 
@@ -171,6 +181,15 @@ class Conditions(Task):
                 for status in cond.args:
                     upstream.extend(list(status.tasks))
         self.set_upstream(upstream)
+        self.set_downstream([self.success_task, self.failed_task])
+
+    @property
+    def condition_result(self) -> Dict:
+        """Get condition result define for java gateway."""
+        return {
+            "successNode": [self.success_task.code],
+            "failedNode": [self.failed_task.code],
+        }
 
     @property
     def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:

+ 40 - 24
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py

@@ -324,7 +324,7 @@ def test_condition_operator_set_define_attr_mix_operator(
     "pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version",
     return_value=(123, 1),
 )
-def test_dependent_get_define(mock_condition_code_version, mock_task_code_version):
+def test_condition_get_define(mock_condition_code_version, mock_task_code_version):
     """Test task condition :func:`get_define`."""
     common_task = Task(name="common_task", task_type="test_task_condition")
     cond_operator = And(
@@ -372,7 +372,10 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
                     },
                 ],
             },
-            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "conditionResult": {
+                "successNode": [common_task.code],
+                "failedNode": [common_task.code],
+            },
             "waitStartTimeout": {},
         },
         "flag": "YES",
@@ -385,7 +388,9 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
         "timeout": 0,
     }
 
-    task = Conditions(name, condition=cond_operator)
+    task = Conditions(
+        name, condition=cond_operator, success_task=common_task, failed_task=common_task
+    )
     assert task.get_define() == expect
 
 
@@ -396,49 +401,60 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
 def test_condition_set_dep_workflow(mock_task_code_version):
     """Test task condition set dependence in workflow level."""
     with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
-        condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE)
-        condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE)
-        condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
+        pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE)
+        pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE)
+        pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE)
         cond_operator = And(
             And(
-                SUCCESS(condition_pre_task_1, condition_pre_task_2),
-                FAILURE(condition_pre_task_3),
+                SUCCESS(pre_task_1, pre_task_2),
+                FAILURE(pre_task_3),
             ),
         )
-        end = Task(name="end", task_type=TEST_TYPE)
 
-        condition = Conditions(name="conditions", condition=cond_operator)
-        condition >> end
+        success_branch = Task(name="success_branch", task_type=TEST_TYPE)
+        fail_branch = Task(name="fail_branch", task_type=TEST_TYPE)
+
+        condition = Conditions(
+            name="conditions",
+            condition=cond_operator,
+            success_task=success_branch,
+            failed_task=fail_branch,
+        )
 
         # General tasks test
-        assert len(pd.tasks) == 5
+        assert len(pd.tasks) == 6
         assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
             [
+                pre_task_1,
+                pre_task_2,
+                pre_task_3,
+                success_branch,
+                fail_branch,
                 condition,
-                condition_pre_task_1,
-                condition_pre_task_2,
-                condition_pre_task_3,
-                end,
             ],
             key=lambda t: t.name,
         )
         # Task dep test
-        assert end._upstream_task_codes == {condition.code}
-        assert condition._downstream_task_codes == {end.code}
+        assert success_branch._upstream_task_codes == {condition.code}
+        assert fail_branch._upstream_task_codes == {condition.code}
+        assert condition._downstream_task_codes == {
+            success_branch.code,
+            fail_branch.code,
+        }
 
         # Condition task dep after ProcessDefinition function get_define called
         assert condition._upstream_task_codes == {
-            condition_pre_task_1.code,
-            condition_pre_task_2.code,
-            condition_pre_task_3.code,
+            pre_task_1.code,
+            pre_task_2.code,
+            pre_task_3.code,
         }
         assert all(
             [
                 child._downstream_task_codes == {condition.code}
                 for child in [
-                    condition_pre_task_1,
-                    condition_pre_task_2,
-                    condition_pre_task_3,
+                    pre_task_1,
+                    pre_task_2,
+                    pre_task_3,
                 ]
             ]
         )