test_process_definition.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test process definition."""
  18. from datetime import datetime
  19. from typing import Any
  20. from unittest.mock import patch
  21. import pytest
  22. from freezegun import freeze_time
  23. from pydolphinscheduler.constants import ProcessDefinitionReleaseState
  24. from pydolphinscheduler.core import configuration
  25. from pydolphinscheduler.core.process_definition import ProcessDefinition
  26. from pydolphinscheduler.exceptions import PyDSParamException
  27. from pydolphinscheduler.side import Project, Tenant, User
  28. from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
  29. from pydolphinscheduler.utils.date import conv_to_schedule
  30. from tests.testing.task import Task
  31. TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
  32. TEST_TASK_TYPE = "test-task-type"
  33. @pytest.mark.parametrize("func", ["run", "submit", "start"])
  34. def test_process_definition_key_attr(func):
  35. """Test process definition have specific functions or attributes."""
  36. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  37. assert hasattr(
  38. pd, func
  39. ), f"ProcessDefinition instance don't have attribute `{func}`"
  40. @pytest.mark.parametrize(
  41. "name,value",
  42. [
  43. ("timezone", configuration.WORKFLOW_TIME_ZONE),
  44. ("project", Project(configuration.WORKFLOW_PROJECT)),
  45. ("tenant", Tenant(configuration.WORKFLOW_TENANT)),
  46. (
  47. "user",
  48. User(
  49. configuration.USER_NAME,
  50. configuration.USER_PASSWORD,
  51. configuration.USER_EMAIL,
  52. configuration.USER_PHONE,
  53. configuration.WORKFLOW_TENANT,
  54. configuration.WORKFLOW_QUEUE,
  55. configuration.USER_STATE,
  56. ),
  57. ),
  58. ("worker_group", configuration.WORKFLOW_WORKER_GROUP),
  59. ("release_state", ProcessDefinitionReleaseState.ONLINE),
  60. ],
  61. )
  62. def test_process_definition_default_value(name, value):
  63. """Test process definition default attributes."""
  64. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  65. assert getattr(pd, name) == value, (
  66. f"ProcessDefinition instance attribute `{name}` not with "
  67. f"except default value `{getattr(pd, name)}`"
  68. )
  69. @pytest.mark.parametrize(
  70. "name,cls,expect",
  71. [
  72. ("name", str, "name"),
  73. ("description", str, "description"),
  74. ("schedule", str, "schedule"),
  75. ("timezone", str, "timezone"),
  76. ("worker_group", str, "worker_group"),
  77. ("timeout", int, 1),
  78. ("release_state", str, "OFFLINE"),
  79. ("param", dict, {"key": "value"}),
  80. ],
  81. )
  82. def test_set_attr(name, cls, expect):
  83. """Test process definition set attributes which get with same type."""
  84. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  85. setattr(pd, name, expect)
  86. assert (
  87. getattr(pd, name) == expect
  88. ), f"ProcessDefinition set attribute `{name}` do not work expect"
  89. @pytest.mark.parametrize(
  90. "set_attr,set_val,get_attr,get_val",
  91. [
  92. ("_project", "project", "project", Project("project")),
  93. ("_tenant", "tenant", "tenant", Tenant("tenant")),
  94. ("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)),
  95. ("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)),
  96. ],
  97. )
  98. def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
  99. """Test process definition set attributes which get with different type."""
  100. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  101. setattr(pd, set_attr, set_val)
  102. assert get_val == getattr(
  103. pd, get_attr
  104. ), f"Set attribute {set_attr} can not get back with {get_val}."
  105. @pytest.mark.parametrize(
  106. "val,expect",
  107. [
  108. (datetime(2021, 1, 1), datetime(2021, 1, 1)),
  109. (None, None),
  110. ("2021-01-01", datetime(2021, 1, 1)),
  111. ("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)),
  112. ],
  113. )
  114. def test__parse_datetime(val, expect):
  115. """Test process definition function _parse_datetime.
  116. Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
  117. """
  118. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  119. assert expect == pd._parse_datetime(
  120. val
  121. ), f"Function _parse_datetime with unexpect value by {val}."
  122. @pytest.mark.parametrize(
  123. "val",
  124. [
  125. 20210101,
  126. (2021, 1, 1),
  127. {"year": "2021", "month": "1", "day": 1},
  128. ],
  129. )
  130. def test__parse_datetime_not_support_type(val: Any):
  131. """Test process definition function _parse_datetime not support type error."""
  132. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  133. with pytest.raises(PyDSParamException, match="Do not support value type.*?"):
  134. pd._parse_datetime(val)
  135. @pytest.mark.parametrize(
  136. "param, expect",
  137. [
  138. (
  139. None,
  140. [],
  141. ),
  142. (
  143. {},
  144. [],
  145. ),
  146. (
  147. {"key1": "val1"},
  148. [
  149. {
  150. "prop": "key1",
  151. "direct": "IN",
  152. "type": "VARCHAR",
  153. "value": "val1",
  154. }
  155. ],
  156. ),
  157. (
  158. {
  159. "key1": "val1",
  160. "key2": "val2",
  161. },
  162. [
  163. {
  164. "prop": "key1",
  165. "direct": "IN",
  166. "type": "VARCHAR",
  167. "value": "val1",
  168. },
  169. {
  170. "prop": "key2",
  171. "direct": "IN",
  172. "type": "VARCHAR",
  173. "value": "val2",
  174. },
  175. ],
  176. ),
  177. ],
  178. )
  179. def test_property_param_json(param, expect):
  180. """Test ProcessDefinition's property param_json."""
  181. pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
  182. assert pd.param_json == expect
  183. @patch(
  184. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  185. return_value=(123, 1),
  186. )
  187. def test__pre_submit_check_switch_without_param(mock_code_version):
  188. """Test :func:`_pre_submit_check` if process definition with switch but without attribute param."""
  189. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  190. parent = Task(name="parent", task_type=TEST_TASK_TYPE)
  191. switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
  192. switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
  193. switch_condition = SwitchCondition(
  194. Branch(condition="${var} > 1", task=switch_child_1),
  195. Default(task=switch_child_2),
  196. )
  197. switch = Switch(name="switch", condition=switch_condition)
  198. parent >> switch
  199. with pytest.raises(
  200. PyDSParamException,
  201. match="Parameter param must be provider if task Switch in process definition.",
  202. ):
  203. pd._pre_submit_check()
  204. def test_process_definition_get_define_without_task():
  205. """Test process definition function get_define without task."""
  206. expect = {
  207. "name": TEST_PROCESS_DEFINITION_NAME,
  208. "description": None,
  209. "project": configuration.WORKFLOW_PROJECT,
  210. "tenant": configuration.WORKFLOW_TENANT,
  211. "workerGroup": configuration.WORKFLOW_WORKER_GROUP,
  212. "timeout": 0,
  213. "releaseState": ProcessDefinitionReleaseState.ONLINE,
  214. "param": None,
  215. "tasks": {},
  216. "taskDefinitionJson": [{}],
  217. "taskRelationJson": [{}],
  218. }
  219. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  220. assert pd.get_define() == expect
  221. def test_process_definition_simple_context_manager():
  222. """Test simple create workflow in process definition context manager mode."""
  223. expect_tasks_num = 5
  224. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  225. for i in range(expect_tasks_num):
  226. curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
  227. # Set deps task i as i-1 parent
  228. if i > 0:
  229. pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
  230. curr_task.set_upstream(pre_task)
  231. assert len(pd.tasks) == expect_tasks_num
  232. # Test if task process_definition same as origin one
  233. task: Task = pd.get_one_task_by_name("task-0")
  234. assert pd is task.process_definition
  235. # Test if all tasks with expect deps
  236. for i in range(expect_tasks_num):
  237. task: Task = pd.get_one_task_by_name(f"task-{i}")
  238. if i == 0:
  239. assert task._upstream_task_codes == set()
  240. assert task._downstream_task_codes == {
  241. pd.get_one_task_by_name("task-1").code
  242. }
  243. elif i == expect_tasks_num - 1:
  244. assert task._upstream_task_codes == {
  245. pd.get_one_task_by_name(f"task-{i - 1}").code
  246. }
  247. assert task._downstream_task_codes == set()
  248. else:
  249. assert task._upstream_task_codes == {
  250. pd.get_one_task_by_name(f"task-{i - 1}").code
  251. }
  252. assert task._downstream_task_codes == {
  253. pd.get_one_task_by_name(f"task-{i + 1}").code
  254. }
  255. def test_process_definition_simple_separate():
  256. """Test process definition simple create workflow in separate mode.
  257. This test just test basic information, cause most of test case is duplicate to
  258. test_process_definition_simple_context_manager.
  259. """
  260. expect_tasks_num = 5
  261. pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
  262. for i in range(expect_tasks_num):
  263. curr_task = Task(
  264. name=f"task-{i}",
  265. task_type=f"type-{i}",
  266. process_definition=pd,
  267. )
  268. # Set deps task i as i-1 parent
  269. if i > 0:
  270. pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
  271. curr_task.set_upstream(pre_task)
  272. assert len(pd.tasks) == expect_tasks_num
  273. assert all(["task-" in task.name for task in pd.task_list])
  274. @pytest.mark.parametrize(
  275. "user_attrs",
  276. [
  277. {"tenant": "tenant_specific"},
  278. ],
  279. )
  280. def test_set_process_definition_user_attr(user_attrs):
  281. """Test user with correct attributes if we specific assigned to process definition object."""
  282. default_value = {
  283. "tenant": configuration.WORKFLOW_TENANT,
  284. }
  285. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
  286. user = pd.user
  287. for attr in default_value:
  288. # Get assigned attribute if we specific, else get default value
  289. except_attr = (
  290. user_attrs[attr] if attr in user_attrs else default_value[attr]
  291. )
  292. # Get actually attribute of user object
  293. actual_attr = getattr(user, attr)
  294. assert (
  295. except_attr == actual_attr
  296. ), f"Except attribute is {except_attr} but get {actual_attr}"
  297. def test_schedule_json_none_schedule():
  298. """Test function schedule_json with None as schedule."""
  299. with ProcessDefinition(
  300. TEST_PROCESS_DEFINITION_NAME,
  301. schedule=None,
  302. ) as pd:
  303. assert pd.schedule_json is None
  304. # We freeze time here, because we test start_time with None, and if will get datetime.datetime.now. If we do
  305. # not freeze time, it will cause flaky test here.
  306. @freeze_time("2021-01-01")
  307. @pytest.mark.parametrize(
  308. "start_time,end_time,expect_date",
  309. [
  310. (
  311. "20210101",
  312. "20210201",
  313. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  314. ),
  315. (
  316. "2021-01-01",
  317. "2021-02-01",
  318. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  319. ),
  320. (
  321. "2021/01/01",
  322. "2021/02/01",
  323. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  324. ),
  325. # Test mix pattern
  326. (
  327. "2021/01/01 01:01:01",
  328. "2021-02-02 02:02:02",
  329. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  330. ),
  331. (
  332. "2021/01/01 01:01:01",
  333. "20210202 020202",
  334. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  335. ),
  336. (
  337. "20210101 010101",
  338. "2021-02-02 02:02:02",
  339. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  340. ),
  341. # Test None value
  342. (
  343. "2021/01/01 01:02:03",
  344. None,
  345. {"start_time": "2021-01-01 01:02:03", "end_time": "9999-12-31 23:59:59"},
  346. ),
  347. (
  348. None,
  349. None,
  350. {
  351. "start_time": conv_to_schedule(datetime(2021, 1, 1)),
  352. "end_time": "9999-12-31 23:59:59",
  353. },
  354. ),
  355. ],
  356. )
  357. def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
  358. """Test function schedule_json about handle start_time and end_time.
  359. Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
  360. """
  361. schedule = "0 0 0 * * ? *"
  362. expect = {
  363. "crontab": schedule,
  364. "startTime": expect_date["start_time"],
  365. "endTime": expect_date["end_time"],
  366. "timezoneId": configuration.WORKFLOW_TIME_ZONE,
  367. }
  368. with ProcessDefinition(
  369. TEST_PROCESS_DEFINITION_NAME,
  370. schedule=schedule,
  371. start_time=start_time,
  372. end_time=end_time,
  373. timezone=configuration.WORKFLOW_TIME_ZONE,
  374. ) as pd:
  375. assert pd.schedule_json == expect