Browse Source

[python] Add checker pd attr param and fix switch example (#7818)

* [python] Add checker pd attr param and fix switch example

* Correct submit self.param to java gateway
* Fix missing parameter for switch example
* Add mechanism checker before submit to java gateway

close: #7803

* Fix mock get task code and version

* Change judge statement and add comment
Jiajie Zhong 3 years ago
parent
commit
b1edce2b11

+ 1 - 2
dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py

@@ -34,8 +34,7 @@ from pydolphinscheduler.tasks.shell import Shell
 from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 
 with ProcessDefinition(
-    name="task_switch_example",
-    tenant="tenant_exists",
+    name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
 ) as pd:
     parent = Shell(name="parent", command="echo parent")
     switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")

+ 36 - 2
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional, Set
 from pydolphinscheduler.constants import (
     ProcessDefinitionDefault,
     ProcessDefinitionReleaseState,
+    TaskType,
 )
 from pydolphinscheduler.core.base import Base
 from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
@@ -97,7 +98,7 @@ class ProcessDefinition(Base):
         worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
         timeout: Optional[int] = 0,
         release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
-        param: Optional[List] = None,
+        param: Optional[Dict] = None,
     ):
         super().__init__(name, description)
         self.schedule = schedule
@@ -189,6 +190,22 @@ class ProcessDefinition(Base):
         """Set attribute end_time."""
         self._end_time = val
 
+    @property
+    def param_json(self) -> Optional[List[Dict]]:
+        """Return param json base on self.param."""
+        # Handle empty dict and None value
+        if not self.param:
+            return None
+        return [
+            {
+                "prop": k,
+                "direct": "IN",
+                "type": "VARCHAR",
+                "value": v,
+            }
+            for k, v in self.param.items()
+        ]
+
     @property
     def task_definition_json(self) -> List[Dict]:
         """Return all tasks definition in list of dict."""
@@ -323,16 +340,33 @@ class ProcessDefinition(Base):
         # Project model need User object exists
         self.project.create_if_not_exists(self._user)
 
+    def _pre_submit_check(self):
+        """Check specific condition satisfy before.
+
+        This method should be called before process definition submit to java gateway
+        For now, we have below checker:
+        * `self.param` should be set if task `switch` in this workflow.
+        """
+        if (
+            any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
+            and self.param is None
+        ):
+            raise PyDSParamException(
+                "Parameter param must be provider if task Switch in process definition."
+            )
+
     def submit(self) -> int:
         """Submit ProcessDefinition instance to java gateway."""
         self._ensure_side_model_exists()
+        self._pre_submit_check()
+
         gateway = launch_gateway()
         self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
             self._user,
             self._project,
             self.name,
             str(self.description) if self.description else "",
-            str(self.param) if self.param else None,
+            json.dumps(self.param_json),
             json.dumps(self.schedule_json) if self.schedule_json else None,
             json.dumps(self.task_location),
             self.timeout,

+ 77 - 0
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@@ -19,6 +19,7 @@
 
 from datetime import datetime
 from typing import Any
+from unittest.mock import patch
 
 import pytest
 from freezegun import freeze_time
@@ -30,10 +31,12 @@ from pydolphinscheduler.constants import (
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.side import Project, Tenant, User
+from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 from pydolphinscheduler.utils.date import conv_to_schedule
 from tests.testing.task import Task
 
 TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
+TEST_TASK_TYPE = "test-task-type"
 
 
 @pytest.mark.parametrize("func", ["run", "submit", "start"])
@@ -151,6 +154,80 @@ def test__parse_datetime_not_support_type(val: Any):
             pd._parse_datetime(val)
 
 
+@pytest.mark.parametrize(
+    "param, expect",
+    [
+        (
+            None,
+            None,
+        ),
+        (
+            {},
+            None,
+        ),
+        (
+            {"key1": "val1"},
+            [
+                {
+                    "prop": "key1",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val1",
+                }
+            ],
+        ),
+        (
+            {
+                "key1": "val1",
+                "key2": "val2",
+            },
+            [
+                {
+                    "prop": "key1",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val1",
+                },
+                {
+                    "prop": "key2",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val2",
+                },
+            ],
+        ),
+    ],
+)
+def test_property_param_json(param, expect):
+    """Test ProcessDefinition's property param_json."""
+    pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
+    assert pd.param_json == expect
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test__pre_submit_check_switch_without_param(mock_code_version):
+    """Test :func:`_pre_submit_check` if process definition with switch but without attribute param."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        parent = Task(name="parent", task_type=TEST_TASK_TYPE)
+        switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
+        switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
+        switch_condition = SwitchCondition(
+            Branch(condition="${var} > 1", task=switch_child_1),
+            Default(task=switch_child_2),
+        )
+
+        switch = Switch(name="switch", condition=switch_condition)
+        parent >> switch
+        with pytest.raises(
+            PyDSParamException,
+            match="Parameter param must be provider if task Switch in process definition.",
+        ):
+            pd._pre_submit_check()
+
+
 def test_process_definition_get_define_without_task():
     """Test process definition function get_define without task."""
     expect = {