123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- """Test Task Procedure."""
- from unittest.mock import patch
- import pytest
- from pydolphinscheduler.tasks.procedure import Procedure
- TEST_PROCEDURE_SQL = (
- 'create procedure HelloWorld() selece "hello world"; call HelloWorld();'
- )
- TEST_PROCEDURE_DATASOURCE_NAME = "test_datasource"
- @pytest.mark.parametrize(
- "attr, expect",
- [
- (
- {
- "name": "test-procedure-task-params",
- "datasource_name": TEST_PROCEDURE_DATASOURCE_NAME,
- "method": TEST_PROCEDURE_SQL,
- },
- {
- "method": TEST_PROCEDURE_SQL,
- "type": "MYSQL",
- "datasource": 1,
- "localParams": [],
- "resourceList": [],
- "dependence": {},
- "waitStartTimeout": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- },
- )
- ],
- )
- @patch(
- "pydolphinscheduler.core.task.Task.gen_code_and_version",
- return_value=(123, 1),
- )
- @patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "MYSQL"}),
- )
- def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
- """Test task sql task property."""
- task = Procedure(**attr)
- assert expect == task.task_params
- @patch(
- "pydolphinscheduler.core.task.Task.gen_code_and_version",
- return_value=(123, 1),
- )
- @patch(
- "pydolphinscheduler.core.database.Database.get_database_info",
- return_value=({"id": 1, "type": "MYSQL"}),
- )
- def test_sql_get_define(mock_datasource, mock_code_version):
- """Test task procedure function get_define."""
- name = "test_procedure_get_define"
- expect = {
- "code": 123,
- "name": name,
- "version": 1,
- "description": None,
- "delayTime": 0,
- "taskType": "PROCEDURE",
- "taskParams": {
- "type": "MYSQL",
- "datasource": 1,
- "method": TEST_PROCEDURE_SQL,
- "localParams": [],
- "resourceList": [],
- "dependence": {},
- "conditionResult": {"successNode": [""], "failedNode": [""]},
- "waitStartTimeout": {},
- },
- "flag": "YES",
- "taskPriority": "MEDIUM",
- "workerGroup": "default",
- "failRetryTimes": 0,
- "failRetryInterval": 1,
- "timeoutFlag": "CLOSE",
- "timeoutNotifyStrategy": None,
- "timeout": 0,
- }
- task = Procedure(name, TEST_PROCEDURE_DATASOURCE_NAME, TEST_PROCEDURE_SQL)
- assert task.get_define() == expect
|