test_process_definition.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test process definition."""
  18. from datetime import datetime
  19. from typing import Any
  20. from unittest.mock import patch
  21. import pytest
  22. from freezegun import freeze_time
  23. from pydolphinscheduler.constants import (
  24. ProcessDefinitionDefault,
  25. ProcessDefinitionReleaseState,
  26. )
  27. from pydolphinscheduler.core.process_definition import ProcessDefinition
  28. from pydolphinscheduler.exceptions import PyDSParamException
  29. from pydolphinscheduler.side import Project, Tenant, User
  30. from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
  31. from pydolphinscheduler.utils.date import conv_to_schedule
  32. from tests.testing.task import Task
  # Name given to every ProcessDefinition instance created by the tests in this module.
  TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
  # Task type passed to the testing ``Task`` helper when a test needs a concrete task.
  TEST_TASK_TYPE = "test-task-type"
  @pytest.mark.parametrize("func", ["run", "submit", "start"])
  def test_process_definition_key_attr(func):
      """Check that a ProcessDefinition instance exposes the attribute named ``func``."""
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as process_definition:
          attribute_present = hasattr(process_definition, func)
          assert (
              attribute_present
          ), f"ProcessDefinition instance don't have attribute `{func}`"
  @pytest.mark.parametrize(
      "name,value",
      [
          ("timezone", ProcessDefinitionDefault.TIME_ZONE),
          ("project", Project(ProcessDefinitionDefault.PROJECT)),
          ("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
          (
              "user",
              User(
                  ProcessDefinitionDefault.USER,
                  ProcessDefinitionDefault.USER_PWD,
                  ProcessDefinitionDefault.USER_EMAIL,
                  ProcessDefinitionDefault.USER_PHONE,
                  ProcessDefinitionDefault.TENANT,
                  ProcessDefinitionDefault.QUEUE,
                  ProcessDefinitionDefault.USER_STATE,
              ),
          ),
          ("worker_group", ProcessDefinitionDefault.WORKER_GROUP),
          ("release_state", ProcessDefinitionReleaseState.ONLINE),
      ],
  )
  def test_process_definition_default_value(name, value):
      """Test process definition default attributes.

      :param name: attribute read from a ProcessDefinition built with only a name.
      :param value: value that attribute is expected to compare equal to.
      """
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
          actual = getattr(pd, name)
          # Report both sides of the comparison: the previous message printed the
          # actual value while labelling it as the expected default.
          assert actual == value, (
              f"ProcessDefinition instance attribute `{name}` is `{actual}`, "
              f"expect default value `{value}`"
          )
  @pytest.mark.parametrize(
      "name,cls,expect",
      [
          ("name", str, "name"),
          ("description", str, "description"),
          ("schedule", str, "schedule"),
          ("timezone", str, "timezone"),
          ("worker_group", str, "worker_group"),
          ("timeout", int, 1),
          ("release_state", str, "OFFLINE"),
          ("param", dict, {"key": "value"}),
      ],
  )
  def test_set_attr(name, cls, expect):
      """Test process definition set attributes which get with same type.

      :param name: attribute to set and then read back.
      :param cls: type the value read back is expected to have.
      :param expect: value assigned to the attribute and expected on read.
      """
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
          setattr(pd, name, expect)
          actual = getattr(pd, name)
          assert (
              actual == expect
          ), f"ProcessDefinition set attribute `{name}` do not work expect"
          # ``cls`` was parametrized but never checked; verify the "same type"
          # promise made by the docstring.
          assert isinstance(actual, cls), (
              f"ProcessDefinition attribute `{name}` expect type `{cls.__name__}` "
              f"but get `{type(actual).__name__}`"
          )
  @pytest.mark.parametrize(
      "set_attr,set_val,get_attr,get_val",
      [
          ("_project", "project", "project", Project("project")),
          ("_tenant", "tenant", "tenant", Tenant("tenant")),
          ("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)),
          ("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)),
      ],
  )
  def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
      """Set a private attribute and check the value read back from its public name.

      ``set_val`` is written to ``set_attr``; reading ``get_attr`` afterwards must
      compare equal to ``get_val``, which has a different type from ``set_val``.
      """
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as process_definition:
          setattr(process_definition, set_attr, set_val)
          read_back = getattr(process_definition, get_attr)
          assert (
              get_val == read_back
          ), f"Set attribute {set_attr} can not get back with {get_val}."
  @pytest.mark.parametrize(
      "val,expect",
      [
          (datetime(2021, 1, 1), datetime(2021, 1, 1)),
          (None, None),
          ("2021-01-01", datetime(2021, 1, 1)),
          ("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)),
      ],
  )
  def test__parse_datetime(val, expect):
      """Check ``ProcessDefinition._parse_datetime`` on datetime, None and string input.

      Only a few string formats are exercised here; further date parsing cases are
      kept in tests/utils/test_date.py.
      """
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as process_definition:
          parsed = process_definition._parse_datetime(val)
          assert (
              expect == parsed
          ), f"Function _parse_datetime with unexpect value by {val}."
  @pytest.mark.parametrize(
      "val",
      [
          20210101,
          (2021, 1, 1),
          {"year": "2021", "month": "1", "day": 1},
      ],
  )
  def test__parse_datetime_not_support_type(val: Any):
      """Check ``_parse_datetime`` raises PyDSParamException for int, tuple and dict input."""
      unsupported_type_error = pytest.raises(
          PyDSParamException, match="Do not support value type.*?"
      )
      # One ``with`` statement enters both context managers in the same order
      # as two nested ``with`` blocks would.
      with ProcessDefinition(
          TEST_PROCESS_DEFINITION_NAME
      ) as process_definition, unsupported_type_error:
          process_definition._parse_datetime(val)
  @pytest.mark.parametrize(
      "param, expect",
      [
          # No parameters at all.
          (None, None),
          # An empty mapping is expected to give None as well.
          ({}, None),
          # A single key/value pair.
          (
              {"key1": "val1"},
              [
                  {"prop": "key1", "direct": "IN", "type": "VARCHAR", "value": "val1"},
              ],
          ),
          # Several key/value pairs, one entry per key.
          (
              {"key1": "val1", "key2": "val2"},
              [
                  {"prop": "key1", "direct": "IN", "type": "VARCHAR", "value": "val1"},
                  {"prop": "key2", "direct": "IN", "type": "VARCHAR", "value": "val2"},
              ],
          ),
      ],
  )
  def test_property_param_json(param, expect):
      """Check the ``param_json`` property for empty, single and multiple ``param`` input."""
      process_definition = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
      actual_json = process_definition.param_json
      assert actual_json == expect
  @patch(
      "pydolphinscheduler.core.task.Task.gen_code_and_version",
      return_value=(123, 1),
  )
  def test__pre_submit_check_switch_without_param(mock_code_version):
      """Check :func:`_pre_submit_check` rejects a Switch task when ``param`` is not set.

      ``Task.gen_code_and_version`` is patched to return a fixed ``(123, 1)`` pair.
      """
      expected_error = "Parameter param must be provider if task Switch in process definition."
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as process_definition:
          upstream = Task(name="parent", task_type=TEST_TASK_TYPE)
          first_child = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
          second_child = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
          switch_task = Switch(
              name="switch",
              condition=SwitchCondition(
                  Branch(condition="${var} > 1", task=first_child),
                  Default(task=second_child),
              ),
          )
          upstream >> switch_task
          with pytest.raises(PyDSParamException, match=expected_error):
              process_definition._pre_submit_check()
  def test_process_definition_get_define_without_task():
      """Check ``get_define`` output for a process definition that holds no task."""
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as process_definition:
          actual_definition = process_definition.get_define()
      expected_definition = {
          "name": TEST_PROCESS_DEFINITION_NAME,
          "description": None,
          "project": ProcessDefinitionDefault.PROJECT,
          "tenant": ProcessDefinitionDefault.TENANT,
          "workerGroup": ProcessDefinitionDefault.WORKER_GROUP,
          "timeout": 0,
          "releaseState": ProcessDefinitionReleaseState.ONLINE,
          "param": None,
          "tasks": {},
          "taskDefinitionJson": [{}],
          "taskRelationJson": [{}],
      }
      assert actual_definition == expected_definition
  def test_process_definition_simple_context_manager():
      """Build a five-task chain inside the context manager and check its wiring.

      Each task ``task-i`` gets ``task-(i - 1)`` as upstream, so the first task
      must have no upstream, the last no downstream, and every other task
      exactly one of each.
      """
      expect_tasks_num = 5
      last_index = expect_tasks_num - 1
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:

          def code_of(index):
              # Code of the task named ``task-<index>`` in this process definition.
              return pd.get_one_task_by_name(f"task-{index}").code

          for idx in range(expect_tasks_num):
              new_task = Task(name=f"task-{idx}", task_type=f"type-{idx}")
              if idx == 0:
                  # The first task has no predecessor to attach to.
                  continue
              new_task.set_upstream(pd.get_one_task_by_name(f"task-{idx - 1}"))
          assert len(pd.tasks) == expect_tasks_num

          # The task must point back to the very same process definition object.
          first_task: Task = pd.get_one_task_by_name("task-0")
          assert pd is first_task.process_definition

          # Every task must have exactly the expected neighbours.
          for idx in range(expect_tasks_num):
              current: Task = pd.get_one_task_by_name(f"task-{idx}")
              expected_upstream = set() if idx == 0 else {code_of(idx - 1)}
              expected_downstream = (
                  set() if idx == last_index else {code_of(idx + 1)}
              )
              assert current._upstream_task_codes == expected_upstream
              assert current._downstream_task_codes == expected_downstream
  def test_process_definition_simple_separate():
      """Build a five-task chain by passing the process definition to each task.

      Only the task count and task names are checked here; the dependency wiring
      is covered by test_process_definition_simple_context_manager.
      """
      expect_tasks_num = 5
      pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
      for idx in range(expect_tasks_num):
          new_task = Task(
              name=f"task-{idx}",
              task_type=f"type-{idx}",
              process_definition=pd,
          )
          if idx == 0:
              # The first task has no predecessor to attach to.
              continue
          new_task.set_upstream(pd.get_one_task_by_name(f"task-{idx - 1}"))
      assert len(pd.tasks) == expect_tasks_num
      assert all("task-" in task.name for task in pd.task_list)
  @pytest.mark.parametrize(
      "user_attrs",
      [
          {"tenant": "tenant_specific"},
          {"queue": "queue_specific"},
          {"tenant": "tenant_specific", "queue": "queue_specific"},
      ],
  )
  def test_set_process_definition_user_attr(user_attrs):
      """Check ``pd.user`` carries the tenant and queue given to the process definition.

      An attribute missing from ``user_attrs`` is expected to keep its
      ``ProcessDefinitionDefault`` value.
      """
      defaults = {
          "tenant": ProcessDefinitionDefault.TENANT,
          "queue": ProcessDefinitionDefault.QUEUE,
      }
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
          user = pd.user
          for attr, fallback in defaults.items():
              # The explicitly assigned value wins, otherwise the default applies.
              except_attr = user_attrs.get(attr, fallback)
              actual_attr = getattr(user, attr)
              assert (
                  except_attr == actual_attr
              ), f"Except attribute is {except_attr} but get {actual_attr}"
  def test_schedule_json_none_schedule():
      """Check ``schedule_json`` is None when the process definition has no schedule."""
      with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, schedule=None) as pd:
          schedule_json = pd.schedule_json
          assert schedule_json is None
  # We freeze time here because start_time is also tested with None, in which case it will get
  # datetime.datetime.now. If we do not freeze time, this test will be flaky.
  @freeze_time("2021-01-01")
  @pytest.mark.parametrize(
      "start_time,end_time,expect_date",
      [
          # Same pattern on both sides.
          (
              "20210101",
              "20210201",
              {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
          ),
          (
              "2021-01-01",
              "2021-02-01",
              {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
          ),
          (
              "2021/01/01",
              "2021/02/01",
              {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
          ),
          # Mixed patterns between start and end.
          (
              "2021/01/01 01:01:01",
              "2021-02-02 02:02:02",
              {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
          ),
          (
              "2021/01/01 01:01:01",
              "20210202 020202",
              {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
          ),
          (
              "20210101 010101",
              "2021-02-02 02:02:02",
              {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
          ),
          # None on one or both sides.
          (
              "2021/01/01 01:02:03",
              None,
              {"start_time": "2021-01-01 01:02:03", "end_time": "9999-12-31 23:59:59"},
          ),
          (
              None,
              None,
              {
                  "start_time": conv_to_schedule(datetime(2021, 1, 1)),
                  "end_time": "9999-12-31 23:59:59",
              },
          ),
      ],
  )
  def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
      """Check how ``schedule_json`` renders ``start_time`` and ``end_time``.

      Only a few date formats are exercised here; further date parsing cases are
      kept in tests/utils/test_date.py.
      """
      crontab = "0 0 0 * * ? *"
      timezone = ProcessDefinitionDefault.TIME_ZONE
      with ProcessDefinition(
          TEST_PROCESS_DEFINITION_NAME,
          schedule=crontab,
          start_time=start_time,
          end_time=end_time,
          timezone=timezone,
      ) as pd:
          expected_schedule = {
              "crontab": crontab,
              "startTime": expect_date["start_time"],
              "endTime": expect_date["end_time"],
              "timezoneId": timezone,
          }
          assert pd.schedule_json == expected_schedule