
[Feature-7349] [Python] Add workflow as code task type datax (#7437)

* support datax task by python
* add task datax example

Co-authored-by: Jiajie Zhong <zhongjiajie955@gmail.com>
Devosend 3 years ago
parent commit 0c7aa4e2c5

+ 50 - 0
dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py

@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+A example workflow for task datax.
+
+This example will create a workflow named `task_datax`.
+`task_datax` is true workflow define and run task task_datax.
+You can create data sources `first_mysql` and `first_mysql` through UI.
+It creates a task to synchronize datax from the source database to the target database.
+"""
+
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.datax import CustomDataX, DataX
+
+# datax json template
+JSON_TEMPLATE = ""
+
+with ProcessDefinition(
+    name="task_datax",
+    tenant="tenant_exists",
+) as pd:
+    # This task synchronizes the data in `source_table`
+    # of the `first_mysql` database to `target_table` of the `second_mysql` database.
+    task1 = DataX(
+        name="task_datax",
+        datasource_name="first_mysql",
+        datatarget_name="second_mysql",
+        sql="select id, name, code, description from source_table",
+        target_table="target_table",
+    )
+
+    # You can customize the DataX json_template to sync data.
+    task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE)
+    pd.run()
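
Note: `JSON_TEMPLATE` is left empty in the example above. For reference, a minimal sketch of what a custom template for `CustomDataX` might look like follows, assuming the standard upstream DataX mysqlreader/mysqlwriter plugins; every connection value (usernames, passwords, jdbcUrls, table names) is a placeholder, not part of this commit:

    import json

    # A hedged sketch of a DataX job config, serialized for CustomDataX(json=...).
    JSON_TEMPLATE = json.dumps(
        {
            "job": {
                "content": [
                    {
                        "reader": {
                            "name": "mysqlreader",
                            "parameter": {
                                "username": "usr",
                                "password": "pwd",
                                "column": ["id", "name", "code", "description"],
                                "connection": [
                                    {
                                        # mysqlreader expects a list of jdbcUrl candidates
                                        "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
                                        "table": ["source_table"],
                                    }
                                ],
                            },
                        },
                        "writer": {
                            "name": "mysqlwriter",
                            "parameter": {
                                "username": "usr",
                                "password": "pwd",
                                "column": ["id", "name", "code", "description"],
                                "connection": [
                                    {
                                        # mysqlwriter takes a single jdbcUrl string
                                        "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
                                        "table": ["target_table"],
                                    }
                                ],
                            },
                        },
                    }
                ],
                "setting": {"speed": {"channel": 1}},
            }
        }
    )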

+ 1 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@@ -73,6 +73,7 @@ class TaskType(str):
     SQL = "SQL"
     SUB_PROCESS = "SUB_PROCESS"
     PROCEDURE = "PROCEDURE"
+    DATAX = "DATAX"
     DEPENDENT = "DEPENDENT"
 
 

+ 56 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py

@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Module database."""
+
+from typing import Dict
+
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class Database(dict):
+    """database object, get information about database.
+
+    You provider database_name contain connection information, it decisions which
+    database type and database instance would run task.
+    """
+
+    def __init__(self, database_name: str, type_key, database_key, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._database = {}
+        self.database_name = database_name
+        self[type_key] = self.database_type
+        self[database_key] = self.database_id
+
+    @property
+    def database_type(self) -> str:
+        """Get database type from java gateway, a wrapper for :func:`get_database_info`."""
+        return self.get_database_info(self.database_name).get("type")
+
+    @property
+    def database_id(self) -> str:
+        """Get database id from java gateway, a wrapper for :func:`get_database_info`."""
+        return self.get_database_info(self.database_name).get("id")
+
+    def get_database_info(self, name) -> Dict:
+        """Get database info from java gateway, contains database id, type, name."""
+        if self._database:
+            return self._database
+        else:
+            gateway = launch_gateway()
+            self._database = gateway.entry_point.getDatasourceInfo(name)
+            return self._database
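
For context, a minimal usage sketch of the new `Database` class. The returned type and id here are assumptions for illustration; the real values come from a running DolphinScheduler Java gateway with a datasource of that name registered:

    from pydolphinscheduler.core.database import Database

    # Resolve the datasource named "first_mysql" through the Java gateway and
    # expose its type and id under the keys a task's taskParams expects.
    datasource = Database("first_mysql", "dsType", "dataSource")
    # Being a dict subclass, it merges straight into task params,
    # e.g. {"dsType": "MYSQL", "dataSource": 1}  (values assumed for illustration)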

+ 0 - 80
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/database.py

@@ -1,80 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Task database base task."""
-
-from typing import Dict
-
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.java_gateway import launch_gateway
-
-
-class Database(Task):
-    """Base task to handle database, declare behavior for the base handler of database.
-
-    It a parent class for all database task of dolphinscheduler. And it should run sql like
-    job in multiply sql lik engine, such as:
-    - ClickHouse
-    - DB2
-    - HIVE
-    - MySQL
-    - Oracle
-    - Postgresql
-    - Presto
-    - SQLServer
-    You provider datasource_name contain connection information, it decisions which
-    database type and database instance would run this sql.
-    """
-
-    def __init__(
-        self, task_type: str, name: str, datasource_name: str, *args, **kwargs
-    ):
-        super().__init__(name, task_type, *args, **kwargs)
-        self.datasource_name = datasource_name
-        self._datasource = {}
-
-    def get_datasource_type(self) -> str:
-        """Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
-        return self.get_datasource_info(self.datasource_name).get("type")
-
-    def get_datasource_id(self) -> str:
-        """Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
-        return self.get_datasource_info(self.datasource_name).get("id")
-
-    def get_datasource_info(self, name) -> Dict:
-        """Get datasource info from java gateway, contains datasource id, type, name."""
-        if self._datasource:
-            return self._datasource
-        else:
-            gateway = launch_gateway()
-            self._datasource = gateway.entry_point.getDatasourceInfo(name)
-            return self._datasource
-
-    @property
-    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
-        """Override Task.task_params for sql task.
-
-        Sql task have some specials attribute for task_params, and is odd if we
-        directly set as python property, so we Override Task.task_params here.
-        """
-        params = super().task_params
-        custom_params = {
-            "type": self.get_datasource_type(),
-            "datasource": self.get_datasource_id(),
-        }
-        params.update(custom_params)
-        return params

+ 121 - 0
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/datax.py

@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task datax."""
+
+from typing import Dict, List, Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.database import Database
+from pydolphinscheduler.core.task import Task
+
+
+class CustomDataX(Task):
+    """Task CustomDatax object, declare behavior for custom DataX task to dolphinscheduler.
+
+    You provider json template for DataX, it can synchronize data according to the template you provided.
+    """
+
+    CUSTOM_CONFIG = 1
+
+    _task_custom_attr = {"custom_config", "json", "xms", "xmx"}
+
+    def __init__(
+        self,
+        name: str,
+        json: str,
+        xms: Optional[int] = 1,
+        xmx: Optional[int] = 1,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, TaskType.DATAX, *args, **kwargs)
+        self.custom_config = self.CUSTOM_CONFIG
+        self.json = json
+        self.xms = xms
+        self.xmx = xmx
+
+
+class DataX(Task):
+    """Task DataX object, declare behavior for DataX task to dolphinscheduler.
+
+    It should run database datax job in multiply sql link engine, such as:
+    - MySQL
+    - Oracle
+    - Postgresql
+    - SQLServer
+    You provider datasource_name and datatarget_name contain connection information, it decisions which
+    database type and database instance would synchronous data.
+    """
+
+    CUSTOM_CONFIG = 0
+
+    _task_custom_attr = {
+        "custom_config",
+        "sql",
+        "target_table",
+        "job_speed_byte",
+        "job_speed_record",
+        "pre_statements",
+        "post_statements",
+        "xms",
+        "xmx",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        datasource_name: str,
+        datatarget_name: str,
+        sql: str,
+        target_table: str,
+        job_speed_byte: Optional[int] = 0,
+        job_speed_record: Optional[int] = 1000,
+        pre_statements: Optional[List[str]] = None,
+        post_statements: Optional[List[str]] = None,
+        xms: Optional[int] = 1,
+        xmx: Optional[int] = 1,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, TaskType.DATAX, *args, **kwargs)
+        self.sql = sql
+        self.custom_config = self.CUSTOM_CONFIG
+        self.datasource_name = datasource_name
+        self.datatarget_name = datatarget_name
+        self.target_table = target_table
+        self.job_speed_byte = job_speed_byte
+        self.job_speed_record = job_speed_record
+        self.pre_statements = pre_statements or []
+        self.post_statements = post_statements or []
+        self.xms = xms
+        self.xmx = xmx
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for datax task.
+
+        datax task have some specials attribute for task_params, and is odd if we
+        directly set as python property, so we Override Task.task_params here.
+        """
+        params = super().task_params
+        datasource = Database(self.datasource_name, "dsType", "dataSource")
+        params.update(datasource)
+
+        datatarget = Database(self.datatarget_name, "dtType", "dataTarget")
+        params.update(datatarget)
+        return params
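
Assuming both names resolve to MySQL datasources with id 1 (as the tests below mock), and with a running gateway or the `gen_code_and_version` patch those tests use, the override composes taskParams roughly like this:

    task = DataX(
        name="sync_example",            # hypothetical task name, for illustration
        datasource_name="first_mysql",
        datatarget_name="second_mysql",
        sql="select id from source_table",
        target_table="target_table",
    )
    # task.task_params then carries, besides the camel-cased _task_custom_attr
    # entries (customConfig, sql, targetTable, jobSpeedByte, ...):
    # {"dsType": "MYSQL", "dataSource": 1, "dtType": "MYSQL", "dataTarget": 1}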

+ 19 - 3
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/procedure.py

@@ -17,11 +17,14 @@
 
 """Task procedure."""
 
+from typing import Dict
+
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.tasks.database import Database
+from pydolphinscheduler.core.database import Database
+from pydolphinscheduler.core.task import Task
 
 
-class Procedure(Database):
+class Procedure(Task):
     """Task Procedure object, declare behavior for Procedure task to dolphinscheduler.
 
     It should run database procedure job in multiply sql lik engine, such as:
@@ -40,5 +43,18 @@ class Procedure(Database):
     _task_custom_attr = {"method"}
 
     def __init__(self, name: str, datasource_name: str, method: str, *args, **kwargs):
-        super().__init__(TaskType.PROCEDURE, name, datasource_name, *args, **kwargs)
+        super().__init__(name, TaskType.PROCEDURE, *args, **kwargs)
+        self.datasource_name = datasource_name
         self.method = method
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for produce task.
+
+        produce task have some specials attribute for task_params, and is odd if we
+        directly set as python property, so we Override Task.task_params here.
+        """
+        params = super().task_params
+        datasource = Database(self.datasource_name, "type", "datasource")
+        params.update(datasource)
+        return params

+ 18 - 4
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py

@@ -18,10 +18,11 @@
 """Task sql."""
 
 import re
-from typing import Optional
+from typing import Dict, Optional
 
 from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.tasks.database import Database
+from pydolphinscheduler.core.database import Database
+from pydolphinscheduler.core.task import Task
 
 
 class SqlType:
@@ -31,7 +32,7 @@ class SqlType:
     NOT_SELECT = 1
 
 
-class Sql(Database):
+class Sql(Task):
     """Task SQL object, declare behavior for SQL task to dolphinscheduler.
 
     It should run sql job in multiply sql lik engine, such as:
@@ -66,8 +67,9 @@ class Sql(Database):
         *args,
         **kwargs
     ):
-        super().__init__(TaskType.SQL, name, datasource_name, *args, **kwargs)
+        super().__init__(name, TaskType.SQL, *args, **kwargs)
         self.sql = sql
+        self.datasource_name = datasource_name
         self.pre_statements = pre_statements or []
         self.post_statements = post_statements or []
         self.display_rows = display_rows
@@ -83,3 +85,15 @@ class Sql(Database):
             return SqlType.NOT_SELECT
         else:
             return SqlType.SELECT
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for sql task.
+
+        sql task have some specials attribute for task_params, and is odd if we
+        directly set as python property, so we Override Task.task_params here.
+        """
+        params = super().task_params
+        datasource = Database(self.datasource_name, "type", "datasource")
+        params.update(datasource)
+        return params

+ 54 - 0
dolphinscheduler-python/pydolphinscheduler/tests/core/test_database.py

@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Database."""
+
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.database import Database
+
+TEST_DATABASE_DATASOURCE_NAME = "test_datasource"
+TEST_DATABASE_TYPE_KEY = "type"
+TEST_DATABASE_KEY = "datasource"
+
+
+@pytest.mark.parametrize(
+    "expect",
+    [
+        {
+            TEST_DATABASE_TYPE_KEY: "mock_type",
+            TEST_DATABASE_KEY: 1,
+        }
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.database.Database.get_database_info",
+    return_value=({"id": 1, "type": "mock_type"}),
+)
+def test_get_datasource_detail(mock_datasource, mock_code_version, expect):
+    """Test :func:`get_database_type` and :func:`get_database_id` can return expect value."""
+    database_info = Database(
+        TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_TYPE_KEY, TEST_DATABASE_KEY
+    )
+    assert expect == database_info

+ 0 - 122
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_database.py

@@ -1,122 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Test Task Database."""
-
-
-from unittest.mock import patch
-
-import pytest
-
-from pydolphinscheduler.tasks.database import Database
-
-TEST_DATABASE_TASK_TYPE = "SQL"
-TEST_DATABASE_SQL = "select 1"
-TEST_DATABASE_DATASOURCE_NAME = "test_datasource"
-
-
-@patch(
-    "pydolphinscheduler.core.task.Task.gen_code_and_version",
-    return_value=(123, 1),
-)
-@patch(
-    "pydolphinscheduler.tasks.database.Database.get_datasource_info",
-    return_value=({"id": 1, "type": "mock_type"}),
-)
-def test_get_datasource_detail(mock_datasource, mock_code_version):
-    """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value."""
-    name = "test_get_database_detail"
-    task = Database(
-        TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL
-    )
-    assert 1 == task.get_datasource_id()
-    assert "mock_type" == task.get_datasource_type()
-
-
-@pytest.mark.parametrize(
-    "attr, expect",
-    [
-        (
-            {
-                "task_type": TEST_DATABASE_TASK_TYPE,
-                "name": "test-task-params",
-                "datasource_name": TEST_DATABASE_DATASOURCE_NAME,
-            },
-            {
-                "type": "MYSQL",
-                "datasource": 1,
-                "localParams": [],
-                "resourceList": [],
-                "dependence": {},
-                "waitStartTimeout": {},
-                "conditionResult": {"successNode": [""], "failedNode": [""]},
-            },
-        )
-    ],
-)
-@patch(
-    "pydolphinscheduler.core.task.Task.gen_code_and_version",
-    return_value=(123, 1),
-)
-@patch(
-    "pydolphinscheduler.tasks.database.Database.get_datasource_info",
-    return_value=({"id": 1, "type": "MYSQL"}),
-)
-def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
-    """Test task database task property."""
-    task = Database(**attr)
-    assert expect == task.task_params
-
-
-@patch(
-    "pydolphinscheduler.core.task.Task.gen_code_and_version",
-    return_value=(123, 1),
-)
-@patch(
-    "pydolphinscheduler.tasks.database.Database.get_datasource_info",
-    return_value=({"id": 1, "type": "MYSQL"}),
-)
-def test_database_get_define(mock_datasource, mock_code_version):
-    """Test task database function get_define."""
-    name = "test_database_get_define"
-    expect = {
-        "code": 123,
-        "name": name,
-        "version": 1,
-        "description": None,
-        "delayTime": 0,
-        "taskType": TEST_DATABASE_TASK_TYPE,
-        "taskParams": {
-            "type": "MYSQL",
-            "datasource": 1,
-            "localParams": [],
-            "resourceList": [],
-            "dependence": {},
-            "conditionResult": {"successNode": [""], "failedNode": [""]},
-            "waitStartTimeout": {},
-        },
-        "flag": "YES",
-        "taskPriority": "MEDIUM",
-        "workerGroup": "default",
-        "failRetryTimes": 0,
-        "failRetryInterval": 1,
-        "timeoutFlag": "CLOSE",
-        "timeoutNotifyStrategy": None,
-        "timeout": 0,
-    }
-    task = Database(TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME)
-    assert task.get_define() == expect

+ 124 - 0
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py

@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task DataX."""
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.datax import CustomDataX, DataX
+
+
+@patch(
+    "pydolphinscheduler.core.database.Database.get_database_info",
+    return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_datax_get_define(mock_datasource):
+    """Test task datax function get_define."""
+    code = 123
+    version = 1
+    name = "test_datax_get_define"
+    command = "select name from test_source_table_name"
+    datasource_name = "test_datasource"
+    datatarget_name = "test_datatarget"
+    target_table = "test_target_table_name"
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "DATAX",
+        "taskParams": {
+            "customConfig": 0,
+            "dsType": "MYSQL",
+            "dataSource": 1,
+            "dtType": "MYSQL",
+            "dataTarget": 1,
+            "sql": command,
+            "targetTable": target_table,
+            "jobSpeedByte": 0,
+            "jobSpeedRecord": 1000,
+            "xms": 1,
+            "xmx": 1,
+            "preStatements": [],
+            "postStatements": [],
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        task = DataX(name, datasource_name, datatarget_name, command, target_table)
+        assert task.get_define() == expect
+
+
+@pytest.mark.parametrize("json_template", ["json_template"])
+def test_custom_datax_get_define(json_template):
+    """Test task custom datax function get_define."""
+    code = 123
+    version = 1
+    name = "test_custom_datax_get_define"
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "DATAX",
+        "taskParams": {
+            "customConfig": 1,
+            "json": json_template,
+            "xms": 1,
+            "xmx": 1,
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        task = CustomDataX(name, json_template)
+        assert task.get_define() == expect

+ 2 - 18
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_procedure.py

@@ -29,22 +29,6 @@ TEST_PROCEDURE_SQL = (
 TEST_PROCEDURE_DATASOURCE_NAME = "test_datasource"
 
 
-@patch(
-    "pydolphinscheduler.core.task.Task.gen_code_and_version",
-    return_value=(123, 1),
-)
-@patch(
-    "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
-    return_value=({"id": 1, "type": "mock_type"}),
-)
-def test_get_datasource_detail(mock_datasource, mock_code_version):
-    """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value."""
-    name = "test_get_datasource_detail"
-    task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL)
-    assert 1 == task.get_datasource_id()
-    assert "mock_type" == task.get_datasource_type()
-
-
 @pytest.mark.parametrize(
     "attr, expect",
     [
@@ -72,7 +56,7 @@ def test_get_datasource_detail(mock_datasource, mock_code_version):
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
+    "pydolphinscheduler.core.database.Database.get_database_info",
     return_value=({"id": 1, "type": "MYSQL"}),
 )
 def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
@@ -86,7 +70,7 @@ def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.tasks.procedure.Procedure.get_datasource_info",
+    "pydolphinscheduler.core.database.Database.get_database_info",
     return_value=({"id": 1, "type": "MYSQL"}),
 )
 def test_sql_get_define(mock_datasource, mock_code_version):

+ 3 - 21
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py

@@ -25,24 +25,6 @@ import pytest
 from pydolphinscheduler.tasks.sql import Sql, SqlType
 
 
-@patch(
-    "pydolphinscheduler.core.task.Task.gen_code_and_version",
-    return_value=(123, 1),
-)
-@patch(
-    "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
-    return_value=({"id": 1, "type": "mock_type"}),
-)
-def test_get_datasource_detail(mock_datasource, mock_code_version):
-    """Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value."""
-    name = "test_get_sql_type"
-    datasource_name = "test_datasource"
-    sql = "select 1"
-    task = Sql(name, datasource_name, sql)
-    assert 1 == task.get_datasource_id()
-    assert "mock_type" == task.get_datasource_type()
-
-
 @pytest.mark.parametrize(
     "sql, sql_type",
     [
@@ -69,7 +51,7 @@ def test_get_datasource_detail(mock_datasource, mock_code_version):
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+    "pydolphinscheduler.core.database.Database.get_database_info",
     return_value=({"id": 1, "type": "mock_type"}),
 )
 def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
@@ -109,7 +91,7 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+    "pydolphinscheduler.core.database.Database.get_database_info",
     return_value=({"id": 1, "type": "MYSQL"}),
 )
 def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
@@ -119,7 +101,7 @@ def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
 
 
 @patch(
-    "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+    "pydolphinscheduler.core.database.Database.get_database_info",
     return_value=({"id": 1, "type": "MYSQL"}),
 )
 def test_sql_get_define(mock_datasource):