test_process_definition.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test process definition."""
  18. from datetime import datetime
  19. from typing import Any
  20. from unittest.mock import patch
  21. import pytest
  22. from freezegun import freeze_time
  23. from pydolphinscheduler.core import configuration
  24. from pydolphinscheduler.core.process_definition import ProcessDefinition
  25. from pydolphinscheduler.exceptions import PyDSParamException
  26. from pydolphinscheduler.side import Project, Tenant, User
  27. from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
  28. from pydolphinscheduler.utils.date import conv_to_schedule
  29. from tests.testing.task import Task
# Name given to every ProcessDefinition constructed by the tests in this module.
TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
# Placeholder task type passed to the testing Task helper in the switch tests.
TEST_TASK_TYPE = "test-task-type"
  32. @pytest.mark.parametrize("func", ["run", "submit", "start"])
  33. def test_process_definition_key_attr(func):
  34. """Test process definition have specific functions or attributes."""
  35. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  36. assert hasattr(
  37. pd, func
  38. ), f"ProcessDefinition instance don't have attribute `{func}`"
  39. @pytest.mark.parametrize(
  40. "name,value",
  41. [
  42. ("timezone", configuration.WORKFLOW_TIME_ZONE),
  43. ("project", Project(configuration.WORKFLOW_PROJECT)),
  44. ("tenant", Tenant(configuration.WORKFLOW_TENANT)),
  45. (
  46. "user",
  47. User(
  48. configuration.USER_NAME,
  49. configuration.USER_PASSWORD,
  50. configuration.USER_EMAIL,
  51. configuration.USER_PHONE,
  52. configuration.WORKFLOW_TENANT,
  53. configuration.WORKFLOW_QUEUE,
  54. configuration.USER_STATE,
  55. ),
  56. ),
  57. ("worker_group", configuration.WORKFLOW_WORKER_GROUP),
  58. ("warning_type", configuration.WORKFLOW_WARNING_TYPE),
  59. ("warning_group_id", 0),
  60. ("release_state", 1),
  61. ],
  62. )
  63. def test_process_definition_default_value(name, value):
  64. """Test process definition default attributes."""
  65. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  66. assert getattr(pd, name) == value, (
  67. f"ProcessDefinition instance attribute `{name}` not with "
  68. f"except default value `{getattr(pd, name)}`"
  69. )
  70. @pytest.mark.parametrize(
  71. "name,cls,expect",
  72. [
  73. ("name", str, "name"),
  74. ("description", str, "description"),
  75. ("schedule", str, "schedule"),
  76. ("timezone", str, "timezone"),
  77. ("worker_group", str, "worker_group"),
  78. ("warning_type", str, "FAILURE"),
  79. ("warning_group_id", int, 1),
  80. ("timeout", int, 1),
  81. ("param", dict, {"key": "value"}),
  82. ],
  83. )
  84. def test_set_attr(name, cls, expect):
  85. """Test process definition set attributes which get with same type."""
  86. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  87. setattr(pd, name, expect)
  88. assert (
  89. getattr(pd, name) == expect
  90. ), f"ProcessDefinition set attribute `{name}` do not work expect"
  91. @pytest.mark.parametrize(
  92. "value,expect",
  93. [
  94. ("online", 1),
  95. ("offline", 0),
  96. ],
  97. )
  98. def test_set_release_state(value, expect):
  99. """Test process definition set release_state attributes."""
  100. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value) as pd:
  101. assert (
  102. getattr(pd, "release_state") == expect
  103. ), "ProcessDefinition set attribute release_state do not return expect value."
  104. @pytest.mark.parametrize(
  105. "value",
  106. [
  107. "oneline",
  108. "offeline",
  109. 1,
  110. 0,
  111. None,
  112. ],
  113. )
  114. def test_set_release_state_error(value):
  115. """Test process definition set release_state attributes with error."""
  116. pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value)
  117. with pytest.raises(
  118. PyDSParamException,
  119. match="Parameter release_state only support `online` or `offline` but get.*",
  120. ):
  121. pd.release_state
  122. @pytest.mark.parametrize(
  123. "set_attr,set_val,get_attr,get_val",
  124. [
  125. ("_project", "project", "project", Project("project")),
  126. ("_tenant", "tenant", "tenant", Tenant("tenant")),
  127. ("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)),
  128. ("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)),
  129. ],
  130. )
  131. def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
  132. """Test process definition set attributes which get with different type."""
  133. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  134. setattr(pd, set_attr, set_val)
  135. assert get_val == getattr(
  136. pd, get_attr
  137. ), f"Set attribute {set_attr} can not get back with {get_val}."
  138. @pytest.mark.parametrize(
  139. "val,expect",
  140. [
  141. (datetime(2021, 1, 1), datetime(2021, 1, 1)),
  142. (None, None),
  143. ("2021-01-01", datetime(2021, 1, 1)),
  144. ("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)),
  145. ],
  146. )
  147. def test__parse_datetime(val, expect):
  148. """Test process definition function _parse_datetime.
  149. Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
  150. """
  151. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  152. assert expect == pd._parse_datetime(
  153. val
  154. ), f"Function _parse_datetime with unexpect value by {val}."
  155. @pytest.mark.parametrize(
  156. "val",
  157. [
  158. 20210101,
  159. (2021, 1, 1),
  160. {"year": "2021", "month": "1", "day": 1},
  161. ],
  162. )
  163. def test__parse_datetime_not_support_type(val: Any):
  164. """Test process definition function _parse_datetime not support type error."""
  165. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  166. with pytest.raises(PyDSParamException, match="Do not support value type.*?"):
  167. pd._parse_datetime(val)
  168. @pytest.mark.parametrize(
  169. "val",
  170. [
  171. "ALLL",
  172. "nonee",
  173. ],
  174. )
  175. def test_warn_type_not_support_type(val: str):
  176. """Test process definition param warning_type not support type error."""
  177. with pytest.raises(
  178. PyDSParamException, match="Parameter `warning_type` with unexpect value.*?"
  179. ):
  180. ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
  181. @pytest.mark.parametrize(
  182. "param, expect",
  183. [
  184. (
  185. None,
  186. [],
  187. ),
  188. (
  189. {},
  190. [],
  191. ),
  192. (
  193. {"key1": "val1"},
  194. [
  195. {
  196. "prop": "key1",
  197. "direct": "IN",
  198. "type": "VARCHAR",
  199. "value": "val1",
  200. }
  201. ],
  202. ),
  203. (
  204. {
  205. "key1": "val1",
  206. "key2": "val2",
  207. },
  208. [
  209. {
  210. "prop": "key1",
  211. "direct": "IN",
  212. "type": "VARCHAR",
  213. "value": "val1",
  214. },
  215. {
  216. "prop": "key2",
  217. "direct": "IN",
  218. "type": "VARCHAR",
  219. "value": "val2",
  220. },
  221. ],
  222. ),
  223. ],
  224. )
  225. def test_property_param_json(param, expect):
  226. """Test ProcessDefinition's property param_json."""
  227. pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
  228. assert pd.param_json == expect
  229. @patch(
  230. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  231. return_value=(123, 1),
  232. )
  233. def test__pre_submit_check_switch_without_param(mock_code_version):
  234. """Test :func:`_pre_submit_check` if process definition with switch but without attribute param."""
  235. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  236. parent = Task(name="parent", task_type=TEST_TASK_TYPE)
  237. switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
  238. switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
  239. switch_condition = SwitchCondition(
  240. Branch(condition="${var} > 1", task=switch_child_1),
  241. Default(task=switch_child_2),
  242. )
  243. switch = Switch(name="switch", condition=switch_condition)
  244. parent >> switch
  245. with pytest.raises(
  246. PyDSParamException,
  247. match="Parameter param or at least one local_param of task must "
  248. "be provider if task Switch in process definition.",
  249. ):
  250. pd._pre_submit_check()
  251. @patch(
  252. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  253. return_value=(123, 1),
  254. )
  255. def test__pre_submit_check_switch_with_local_params(mock_code_version):
  256. """Test :func:`_pre_submit_check` if process definition with switch with local params of task."""
  257. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  258. parent = Task(
  259. name="parent",
  260. task_type=TEST_TASK_TYPE,
  261. local_params=[
  262. {"prop": "var", "direct": "OUT", "type": "VARCHAR", "value": ""}
  263. ],
  264. )
  265. switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
  266. switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
  267. switch_condition = SwitchCondition(
  268. Branch(condition="${var} > 1", task=switch_child_1),
  269. Default(task=switch_child_2),
  270. )
  271. switch = Switch(name="switch", condition=switch_condition)
  272. parent >> switch
  273. pd._pre_submit_check()
  274. def test_process_definition_get_define_without_task():
  275. """Test process definition function get_define without task."""
  276. expect = {
  277. "name": TEST_PROCESS_DEFINITION_NAME,
  278. "description": None,
  279. "project": configuration.WORKFLOW_PROJECT,
  280. "tenant": configuration.WORKFLOW_TENANT,
  281. "workerGroup": configuration.WORKFLOW_WORKER_GROUP,
  282. "warningType": configuration.WORKFLOW_WARNING_TYPE,
  283. "warningGroupId": 0,
  284. "timeout": 0,
  285. "releaseState": 1,
  286. "param": None,
  287. "tasks": {},
  288. "taskDefinitionJson": [{}],
  289. "taskRelationJson": [{}],
  290. }
  291. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  292. assert pd.get_define() == expect
  293. def test_process_definition_simple_context_manager():
  294. """Test simple create workflow in process definition context manager mode."""
  295. expect_tasks_num = 5
  296. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  297. for i in range(expect_tasks_num):
  298. curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
  299. # Set deps task i as i-1 parent
  300. if i > 0:
  301. pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
  302. curr_task.set_upstream(pre_task)
  303. assert len(pd.tasks) == expect_tasks_num
  304. # Test if task process_definition same as origin one
  305. task: Task = pd.get_one_task_by_name("task-0")
  306. assert pd is task.process_definition
  307. # Test if all tasks with expect deps
  308. for i in range(expect_tasks_num):
  309. task: Task = pd.get_one_task_by_name(f"task-{i}")
  310. if i == 0:
  311. assert task._upstream_task_codes == set()
  312. assert task._downstream_task_codes == {
  313. pd.get_one_task_by_name("task-1").code
  314. }
  315. elif i == expect_tasks_num - 1:
  316. assert task._upstream_task_codes == {
  317. pd.get_one_task_by_name(f"task-{i - 1}").code
  318. }
  319. assert task._downstream_task_codes == set()
  320. else:
  321. assert task._upstream_task_codes == {
  322. pd.get_one_task_by_name(f"task-{i - 1}").code
  323. }
  324. assert task._downstream_task_codes == {
  325. pd.get_one_task_by_name(f"task-{i + 1}").code
  326. }
  327. def test_process_definition_simple_separate():
  328. """Test process definition simple create workflow in separate mode.
  329. This test just test basic information, cause most of test case is duplicate to
  330. test_process_definition_simple_context_manager.
  331. """
  332. expect_tasks_num = 5
  333. pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
  334. for i in range(expect_tasks_num):
  335. curr_task = Task(
  336. name=f"task-{i}",
  337. task_type=f"type-{i}",
  338. process_definition=pd,
  339. )
  340. # Set deps task i as i-1 parent
  341. if i > 0:
  342. pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
  343. curr_task.set_upstream(pre_task)
  344. assert len(pd.tasks) == expect_tasks_num
  345. assert all(["task-" in task.name for task in pd.task_list])
  346. @pytest.mark.parametrize(
  347. "user_attrs",
  348. [
  349. {"tenant": "tenant_specific"},
  350. ],
  351. )
  352. def test_set_process_definition_user_attr(user_attrs):
  353. """Test user with correct attributes if we specific assigned to process definition object."""
  354. default_value = {
  355. "tenant": configuration.WORKFLOW_TENANT,
  356. }
  357. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
  358. user = pd.user
  359. for attr in default_value:
  360. # Get assigned attribute if we specific, else get default value
  361. except_attr = (
  362. user_attrs[attr] if attr in user_attrs else default_value[attr]
  363. )
  364. # Get actually attribute of user object
  365. actual_attr = getattr(user, attr)
  366. assert (
  367. except_attr == actual_attr
  368. ), f"Except attribute is {except_attr} but get {actual_attr}"
  369. def test_schedule_json_none_schedule():
  370. """Test function schedule_json with None as schedule."""
  371. with ProcessDefinition(
  372. TEST_PROCESS_DEFINITION_NAME,
  373. schedule=None,
  374. ) as pd:
  375. assert pd.schedule_json is None
  376. # We freeze time here, because we test start_time with None, and if will get datetime.datetime.now. If we do
  377. # not freeze time, it will cause flaky test here.
  378. @freeze_time("2021-01-01")
  379. @pytest.mark.parametrize(
  380. "start_time,end_time,expect_date",
  381. [
  382. (
  383. "20210101",
  384. "20210201",
  385. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  386. ),
  387. (
  388. "2021-01-01",
  389. "2021-02-01",
  390. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  391. ),
  392. (
  393. "2021/01/01",
  394. "2021/02/01",
  395. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  396. ),
  397. # Test mix pattern
  398. (
  399. "2021/01/01 01:01:01",
  400. "2021-02-02 02:02:02",
  401. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  402. ),
  403. (
  404. "2021/01/01 01:01:01",
  405. "20210202 020202",
  406. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  407. ),
  408. (
  409. "20210101 010101",
  410. "2021-02-02 02:02:02",
  411. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  412. ),
  413. # Test None value
  414. (
  415. "2021/01/01 01:02:03",
  416. None,
  417. {"start_time": "2021-01-01 01:02:03", "end_time": "9999-12-31 23:59:59"},
  418. ),
  419. (
  420. None,
  421. None,
  422. {
  423. "start_time": conv_to_schedule(datetime(2021, 1, 1)),
  424. "end_time": "9999-12-31 23:59:59",
  425. },
  426. ),
  427. ],
  428. )
  429. def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
  430. """Test function schedule_json about handle start_time and end_time.
  431. Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
  432. """
  433. schedule = "0 0 0 * * ? *"
  434. expect = {
  435. "crontab": schedule,
  436. "startTime": expect_date["start_time"],
  437. "endTime": expect_date["end_time"],
  438. "timezoneId": configuration.WORKFLOW_TIME_ZONE,
  439. }
  440. with ProcessDefinition(
  441. TEST_PROCESS_DEFINITION_NAME,
  442. schedule=schedule,
  443. start_time=start_time,
  444. end_time=end_time,
  445. timezone=configuration.WORKFLOW_TIME_ZONE,
  446. ) as pd:
  447. assert pd.schedule_json == expect