test_process_definition.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test process definition."""
  18. from datetime import datetime
  19. from typing import Any, List
  20. from unittest.mock import patch
  21. import pytest
  22. from freezegun import freeze_time
  23. from pydolphinscheduler import configuration
  24. from pydolphinscheduler.core.process_definition import ProcessDefinition
  25. from pydolphinscheduler.core.resource import Resource
  26. from pydolphinscheduler.exceptions import PyDSParamException
  27. from pydolphinscheduler.models import Project, Tenant, User
  28. from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
  29. from pydolphinscheduler.utils.date import conv_to_schedule
  30. from tests.testing.task import Task
  31. TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
  32. TEST_TASK_TYPE = "test-task-type"
  33. @pytest.mark.parametrize("func", ["run", "submit", "start"])
  34. def test_process_definition_key_attr(func):
  35. """Test process definition have specific functions or attributes."""
  36. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  37. assert hasattr(
  38. pd, func
  39. ), f"ProcessDefinition instance don't have attribute `{func}`"
  40. @pytest.mark.parametrize(
  41. "name,value",
  42. [
  43. ("timezone", configuration.WORKFLOW_TIME_ZONE),
  44. ("project", Project(configuration.WORKFLOW_PROJECT)),
  45. ("tenant", Tenant(configuration.WORKFLOW_TENANT)),
  46. (
  47. "user",
  48. User(
  49. configuration.USER_NAME,
  50. configuration.USER_PASSWORD,
  51. configuration.USER_EMAIL,
  52. configuration.USER_PHONE,
  53. configuration.WORKFLOW_TENANT,
  54. configuration.WORKFLOW_QUEUE,
  55. configuration.USER_STATE,
  56. ),
  57. ),
  58. ("worker_group", configuration.WORKFLOW_WORKER_GROUP),
  59. ("warning_type", configuration.WORKFLOW_WARNING_TYPE),
  60. ("warning_group_id", 0),
  61. ("release_state", 1),
  62. ],
  63. )
  64. def test_process_definition_default_value(name, value):
  65. """Test process definition default attributes."""
  66. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  67. assert getattr(pd, name) == value, (
  68. f"ProcessDefinition instance attribute `{name}` not with "
  69. f"except default value `{getattr(pd, name)}`"
  70. )
  71. @pytest.mark.parametrize(
  72. "name,cls,expect",
  73. [
  74. ("name", str, "name"),
  75. ("description", str, "description"),
  76. ("schedule", str, "schedule"),
  77. ("timezone", str, "timezone"),
  78. ("worker_group", str, "worker_group"),
  79. ("warning_type", str, "FAILURE"),
  80. ("warning_group_id", int, 1),
  81. ("timeout", int, 1),
  82. ("param", dict, {"key": "value"}),
  83. (
  84. "resource_list",
  85. List,
  86. [Resource(name="/dev/test.py", content="hello world", description="desc")],
  87. ),
  88. ],
  89. )
  90. def test_set_attr(name, cls, expect):
  91. """Test process definition set attributes which get with same type."""
  92. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  93. setattr(pd, name, expect)
  94. assert (
  95. getattr(pd, name) == expect
  96. ), f"ProcessDefinition set attribute `{name}` do not work expect"
  97. @pytest.mark.parametrize(
  98. "value,expect",
  99. [
  100. ("online", 1),
  101. ("offline", 0),
  102. ],
  103. )
  104. def test_set_release_state(value, expect):
  105. """Test process definition set release_state attributes."""
  106. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value) as pd:
  107. assert (
  108. getattr(pd, "release_state") == expect
  109. ), "ProcessDefinition set attribute release_state do not return expect value."
  110. @pytest.mark.parametrize(
  111. "value",
  112. [
  113. "oneline",
  114. "offeline",
  115. 1,
  116. 0,
  117. None,
  118. ],
  119. )
  120. def test_set_release_state_error(value):
  121. """Test process definition set release_state attributes with error."""
  122. pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value)
  123. with pytest.raises(
  124. PyDSParamException,
  125. match="Parameter release_state only support `online` or `offline` but get.*",
  126. ):
  127. pd.release_state
  128. @pytest.mark.parametrize(
  129. "set_attr,set_val,get_attr,get_val",
  130. [
  131. ("_project", "project", "project", Project("project")),
  132. ("_tenant", "tenant", "tenant", Tenant("tenant")),
  133. ("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)),
  134. ("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)),
  135. ],
  136. )
  137. def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
  138. """Test process definition set attributes which get with different type."""
  139. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  140. setattr(pd, set_attr, set_val)
  141. assert get_val == getattr(
  142. pd, get_attr
  143. ), f"Set attribute {set_attr} can not get back with {get_val}."
  144. @pytest.mark.parametrize(
  145. "val,expect",
  146. [
  147. (datetime(2021, 1, 1), datetime(2021, 1, 1)),
  148. (None, None),
  149. ("2021-01-01", datetime(2021, 1, 1)),
  150. ("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)),
  151. ],
  152. )
  153. def test__parse_datetime(val, expect):
  154. """Test process definition function _parse_datetime.
  155. Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
  156. """
  157. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  158. assert expect == pd._parse_datetime(
  159. val
  160. ), f"Function _parse_datetime with unexpect value by {val}."
  161. @pytest.mark.parametrize(
  162. "val",
  163. [
  164. 20210101,
  165. (2021, 1, 1),
  166. {"year": "2021", "month": "1", "day": 1},
  167. ],
  168. )
  169. def test__parse_datetime_not_support_type(val: Any):
  170. """Test process definition function _parse_datetime not support type error."""
  171. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  172. with pytest.raises(PyDSParamException, match="Do not support value type.*?"):
  173. pd._parse_datetime(val)
  174. @pytest.mark.parametrize(
  175. "val",
  176. [
  177. "ALLL",
  178. "nonee",
  179. ],
  180. )
  181. def test_warn_type_not_support_type(val: str):
  182. """Test process definition param warning_type not support type error."""
  183. with pytest.raises(
  184. PyDSParamException, match="Parameter `warning_type` with unexpect value.*?"
  185. ):
  186. ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
  187. @pytest.mark.parametrize(
  188. "param, expect",
  189. [
  190. (
  191. None,
  192. [],
  193. ),
  194. (
  195. {},
  196. [],
  197. ),
  198. (
  199. {"key1": "val1"},
  200. [
  201. {
  202. "prop": "key1",
  203. "direct": "IN",
  204. "type": "VARCHAR",
  205. "value": "val1",
  206. }
  207. ],
  208. ),
  209. (
  210. {
  211. "key1": "val1",
  212. "key2": "val2",
  213. },
  214. [
  215. {
  216. "prop": "key1",
  217. "direct": "IN",
  218. "type": "VARCHAR",
  219. "value": "val1",
  220. },
  221. {
  222. "prop": "key2",
  223. "direct": "IN",
  224. "type": "VARCHAR",
  225. "value": "val2",
  226. },
  227. ],
  228. ),
  229. ],
  230. )
  231. def test_property_param_json(param, expect):
  232. """Test ProcessDefinition's property param_json."""
  233. pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
  234. assert pd.param_json == expect
  235. @patch(
  236. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  237. return_value=(123, 1),
  238. )
  239. def test__pre_submit_check_switch_without_param(mock_code_version):
  240. """Test :func:`_pre_submit_check` if process definition with switch but without attribute param."""
  241. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  242. parent = Task(name="parent", task_type=TEST_TASK_TYPE)
  243. switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
  244. switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
  245. switch_condition = SwitchCondition(
  246. Branch(condition="${var} > 1", task=switch_child_1),
  247. Default(task=switch_child_2),
  248. )
  249. switch = Switch(name="switch", condition=switch_condition)
  250. parent >> switch
  251. with pytest.raises(
  252. PyDSParamException,
  253. match="Parameter param or at least one local_param of task must "
  254. "be provider if task Switch in process definition.",
  255. ):
  256. pd._pre_submit_check()
  257. @patch(
  258. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  259. return_value=(123, 1),
  260. )
  261. def test__pre_submit_check_switch_with_local_params(mock_code_version):
  262. """Test :func:`_pre_submit_check` if process definition with switch with local params of task."""
  263. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  264. parent = Task(
  265. name="parent",
  266. task_type=TEST_TASK_TYPE,
  267. local_params=[
  268. {"prop": "var", "direct": "OUT", "type": "VARCHAR", "value": ""}
  269. ],
  270. )
  271. switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
  272. switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
  273. switch_condition = SwitchCondition(
  274. Branch(condition="${var} > 1", task=switch_child_1),
  275. Default(task=switch_child_2),
  276. )
  277. switch = Switch(name="switch", condition=switch_condition)
  278. parent >> switch
  279. pd._pre_submit_check()
  280. def test_process_definition_get_define_without_task():
  281. """Test process definition function get_define without task."""
  282. expect = {
  283. "name": TEST_PROCESS_DEFINITION_NAME,
  284. "description": None,
  285. "project": configuration.WORKFLOW_PROJECT,
  286. "tenant": configuration.WORKFLOW_TENANT,
  287. "workerGroup": configuration.WORKFLOW_WORKER_GROUP,
  288. "warningType": configuration.WORKFLOW_WARNING_TYPE,
  289. "warningGroupId": 0,
  290. "timeout": 0,
  291. "releaseState": 1,
  292. "param": None,
  293. "tasks": {},
  294. "taskDefinitionJson": [{}],
  295. "taskRelationJson": [{}],
  296. "resourceList": [],
  297. }
  298. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  299. assert pd.get_define() == expect
  300. def test_process_definition_simple_context_manager():
  301. """Test simple create workflow in process definition context manager mode."""
  302. expect_tasks_num = 5
  303. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  304. for i in range(expect_tasks_num):
  305. curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
  306. # Set deps task i as i-1 parent
  307. if i > 0:
  308. pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
  309. curr_task.set_upstream(pre_task)
  310. assert len(pd.tasks) == expect_tasks_num
  311. # Test if task process_definition same as origin one
  312. task: Task = pd.get_one_task_by_name("task-0")
  313. assert pd is task.process_definition
  314. # Test if all tasks with expect deps
  315. for i in range(expect_tasks_num):
  316. task: Task = pd.get_one_task_by_name(f"task-{i}")
  317. if i == 0:
  318. assert task._upstream_task_codes == set()
  319. assert task._downstream_task_codes == {
  320. pd.get_one_task_by_name("task-1").code
  321. }
  322. elif i == expect_tasks_num - 1:
  323. assert task._upstream_task_codes == {
  324. pd.get_one_task_by_name(f"task-{i - 1}").code
  325. }
  326. assert task._downstream_task_codes == set()
  327. else:
  328. assert task._upstream_task_codes == {
  329. pd.get_one_task_by_name(f"task-{i - 1}").code
  330. }
  331. assert task._downstream_task_codes == {
  332. pd.get_one_task_by_name(f"task-{i + 1}").code
  333. }
  334. def test_process_definition_simple_separate():
  335. """Test process definition simple create workflow in separate mode.
  336. This test just test basic information, cause most of test case is duplicate to
  337. test_process_definition_simple_context_manager.
  338. """
  339. expect_tasks_num = 5
  340. pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
  341. for i in range(expect_tasks_num):
  342. curr_task = Task(
  343. name=f"task-{i}",
  344. task_type=f"type-{i}",
  345. process_definition=pd,
  346. )
  347. # Set deps task i as i-1 parent
  348. if i > 0:
  349. pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
  350. curr_task.set_upstream(pre_task)
  351. assert len(pd.tasks) == expect_tasks_num
  352. assert all(["task-" in task.name for task in pd.task_list])
  353. @pytest.mark.parametrize(
  354. "user_attrs",
  355. [
  356. {"tenant": "tenant_specific"},
  357. ],
  358. )
  359. def test_set_process_definition_user_attr(user_attrs):
  360. """Test user with correct attributes if we specific assigned to process definition object."""
  361. default_value = {
  362. "tenant": configuration.WORKFLOW_TENANT,
  363. }
  364. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
  365. user = pd.user
  366. for attr in default_value:
  367. # Get assigned attribute if we specific, else get default value
  368. except_attr = (
  369. user_attrs[attr] if attr in user_attrs else default_value[attr]
  370. )
  371. # Get actually attribute of user object
  372. actual_attr = getattr(user, attr)
  373. assert (
  374. except_attr == actual_attr
  375. ), f"Except attribute is {except_attr} but get {actual_attr}"
  376. def test_schedule_json_none_schedule():
  377. """Test function schedule_json with None as schedule."""
  378. with ProcessDefinition(
  379. TEST_PROCESS_DEFINITION_NAME,
  380. schedule=None,
  381. ) as pd:
  382. assert pd.schedule_json is None
  383. # We freeze time here, because we test start_time with None, and if will get datetime.datetime.now. If we do
  384. # not freeze time, it will cause flaky test here.
  385. @freeze_time("2021-01-01")
  386. @pytest.mark.parametrize(
  387. "start_time,end_time,expect_date",
  388. [
  389. (
  390. "20210101",
  391. "20210201",
  392. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  393. ),
  394. (
  395. "2021-01-01",
  396. "2021-02-01",
  397. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  398. ),
  399. (
  400. "2021/01/01",
  401. "2021/02/01",
  402. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  403. ),
  404. # Test mix pattern
  405. (
  406. "2021/01/01 01:01:01",
  407. "2021-02-02 02:02:02",
  408. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  409. ),
  410. (
  411. "2021/01/01 01:01:01",
  412. "20210202 020202",
  413. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  414. ),
  415. (
  416. "20210101 010101",
  417. "2021-02-02 02:02:02",
  418. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  419. ),
  420. # Test None value
  421. (
  422. "2021/01/01 01:02:03",
  423. None,
  424. {"start_time": "2021-01-01 01:02:03", "end_time": "9999-12-31 23:59:59"},
  425. ),
  426. (
  427. None,
  428. None,
  429. {
  430. "start_time": conv_to_schedule(datetime(2021, 1, 1)),
  431. "end_time": "9999-12-31 23:59:59",
  432. },
  433. ),
  434. ],
  435. )
  436. def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
  437. """Test function schedule_json about handle start_time and end_time.
  438. Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
  439. """
  440. schedule = "0 0 0 * * ? *"
  441. expect = {
  442. "crontab": schedule,
  443. "startTime": expect_date["start_time"],
  444. "endTime": expect_date["end_time"],
  445. "timezoneId": configuration.WORKFLOW_TIME_ZONE,
  446. }
  447. with ProcessDefinition(
  448. TEST_PROCESS_DEFINITION_NAME,
  449. schedule=schedule,
  450. start_time=start_time,
  451. end_time=end_time,
  452. timezone=configuration.WORKFLOW_TIME_ZONE,
  453. ) as pd:
  454. assert pd.schedule_json == expect