test_condition.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test Task dependent."""
  18. from typing import List, Tuple
  19. from unittest.mock import patch
  20. import pytest
  21. from pydolphinscheduler.core.process_definition import ProcessDefinition
  22. from pydolphinscheduler.exceptions import PyDSParamException
  23. from pydolphinscheduler.tasks.condition import (
  24. FAILURE,
  25. SUCCESS,
  26. And,
  27. ConditionOperator,
  28. Conditions,
  29. Or,
  30. Status,
  31. )
  32. from tests.testing.task import Task
  33. TEST_NAME = "test-name"
  34. TEST_PROJECT = "test-project"
  35. TEST_PROCESS_DEFINITION = "test-process-definition"
  36. TEST_TYPE = "test-type"
  37. TEST_PROJECT_CODE, TEST_DEFINITION_CODE, TEST_TASK_CODE = 12345, 123456, 1234567
  38. TEST_OPERATOR_LIST = ("AND", "OR")
  39. @pytest.mark.parametrize(
  40. "obj, expect",
  41. [
  42. (Status, "STATUS"),
  43. (SUCCESS, "SUCCESS"),
  44. (FAILURE, "FAILURE"),
  45. ],
  46. )
  47. def test_class_status_status_name(obj: Status, expect: str):
  48. """Test class status and sub class property status_name."""
  49. assert obj.status_name() == expect
  50. @pytest.mark.parametrize(
  51. "obj, tasks",
  52. [
  53. (Status, (1, 2, 3)),
  54. (SUCCESS, (1.1, 2.2, 3.3)),
  55. (FAILURE, (ConditionOperator(1), ConditionOperator(2), ConditionOperator(3))),
  56. ],
  57. )
  58. def test_class_status_depend_item_list_no_expect_type(obj: Status, tasks: Tuple):
  59. """Test class status and sub class raise error when assign not support type."""
  60. with pytest.raises(
  61. PyDSParamException, match=".*?only accept class Task or sub class Task, but get"
  62. ):
  63. obj(*tasks).get_define()
  64. @pytest.mark.parametrize(
  65. "obj, tasks",
  66. [
  67. (Status, [Task(str(i), TEST_TYPE) for i in range(1)]),
  68. (Status, [Task(str(i), TEST_TYPE) for i in range(2)]),
  69. (Status, [Task(str(i), TEST_TYPE) for i in range(3)]),
  70. (SUCCESS, [Task(str(i), TEST_TYPE) for i in range(1)]),
  71. (SUCCESS, [Task(str(i), TEST_TYPE) for i in range(2)]),
  72. (SUCCESS, [Task(str(i), TEST_TYPE) for i in range(3)]),
  73. (FAILURE, [Task(str(i), TEST_TYPE) for i in range(1)]),
  74. (FAILURE, [Task(str(i), TEST_TYPE) for i in range(2)]),
  75. (FAILURE, [Task(str(i), TEST_TYPE) for i in range(3)]),
  76. ],
  77. )
  78. def test_class_status_depend_item_list(obj: Status, tasks: Tuple):
  79. """Test class status and sub class function :func:`depend_item_list`."""
  80. status = obj.status_name()
  81. expect = [
  82. {
  83. "depTaskCode": i.code,
  84. "status": status,
  85. }
  86. for i in tasks
  87. ]
  88. assert obj(*tasks).get_define() == expect
  89. @pytest.mark.parametrize(
  90. "obj, expect",
  91. [
  92. (ConditionOperator, "CONDITIONOPERATOR"),
  93. (And, "AND"),
  94. (Or, "OR"),
  95. ],
  96. )
  97. def test_condition_operator_operator_name(obj: ConditionOperator, expect: str):
  98. """Test class ConditionOperator and sub class class function :func:`operator_name`."""
  99. assert obj.operator_name() == expect
  100. @pytest.mark.parametrize(
  101. "obj, expect",
  102. [
  103. (ConditionOperator, "CONDITIONOPERATOR"),
  104. (And, "AND"),
  105. (Or, "OR"),
  106. ],
  107. )
  108. def test_condition_operator_relation(obj: ConditionOperator, expect: str):
  109. """Test class ConditionOperator and sub class class property `relation`."""
  110. assert obj(1).relation == expect
  111. @pytest.mark.parametrize(
  112. "obj, status_or_operator, match",
  113. [
  114. (
  115. ConditionOperator,
  116. [Status(Task("1", TEST_TYPE)), 1],
  117. ".*?operator parameter support ConditionTask and ConditionOperator.*?",
  118. ),
  119. (
  120. ConditionOperator,
  121. [
  122. Status(Task("1", TEST_TYPE)),
  123. 1.0,
  124. ],
  125. ".*?operator parameter support ConditionTask and ConditionOperator.*?",
  126. ),
  127. (
  128. ConditionOperator,
  129. [
  130. Status(Task("1", TEST_TYPE)),
  131. ConditionOperator(And(Status(Task("1", TEST_TYPE)))),
  132. ],
  133. ".*?operator parameter only support same type.",
  134. ),
  135. (
  136. ConditionOperator,
  137. [
  138. ConditionOperator(And(Status(Task("1", TEST_TYPE)))),
  139. Status(Task("1", TEST_TYPE)),
  140. ],
  141. ".*?operator parameter only support same type.",
  142. ),
  143. ],
  144. )
  145. def test_condition_operator_set_define_attr_not_support_type(
  146. obj, status_or_operator, match
  147. ):
  148. """Test class ConditionOperator parameter error, including parameter not same or type not support."""
  149. with pytest.raises(PyDSParamException, match=match):
  150. op = obj(*status_or_operator)
  151. op.set_define_attr()
  152. @pytest.mark.parametrize(
  153. "obj, task_num",
  154. [
  155. (ConditionOperator, 1),
  156. (ConditionOperator, 2),
  157. (ConditionOperator, 3),
  158. (And, 1),
  159. (And, 2),
  160. (And, 3),
  161. (Or, 1),
  162. (Or, 2),
  163. (Or, 3),
  164. ],
  165. )
  166. def test_condition_operator_set_define_attr_status(
  167. obj: ConditionOperator, task_num: int
  168. ):
  169. """Test :func:`set_define_attr` with one or more class status."""
  170. attr = "depend_item_list"
  171. tasks = [Task(str(i), TEST_TYPE) for i in range(task_num)]
  172. status = Status(*tasks)
  173. expect = [
  174. {"depTaskCode": task.code, "status": status.status_name()} for task in tasks
  175. ]
  176. co = obj(status)
  177. co.set_define_attr()
  178. assert getattr(co, attr) == expect
  179. @pytest.mark.parametrize(
  180. "obj, status",
  181. [
  182. (ConditionOperator, (SUCCESS, SUCCESS)),
  183. (ConditionOperator, (FAILURE, FAILURE)),
  184. (ConditionOperator, (SUCCESS, FAILURE)),
  185. (ConditionOperator, (FAILURE, SUCCESS)),
  186. (And, (SUCCESS, SUCCESS)),
  187. (And, (FAILURE, FAILURE)),
  188. (And, (SUCCESS, FAILURE)),
  189. (And, (FAILURE, SUCCESS)),
  190. (Or, (SUCCESS, SUCCESS)),
  191. (Or, (FAILURE, FAILURE)),
  192. (Or, (SUCCESS, FAILURE)),
  193. (Or, (FAILURE, SUCCESS)),
  194. ],
  195. )
  196. def test_condition_operator_set_define_attr_mix_status(
  197. obj: ConditionOperator, status: List[Status]
  198. ):
  199. """Test :func:`set_define_attr` with one or more mixed status."""
  200. attr = "depend_item_list"
  201. task = Task("test-operator", TEST_TYPE)
  202. status_list = []
  203. expect = []
  204. for sta in status:
  205. status_list.append(sta(task))
  206. expect.append({"depTaskCode": task.code, "status": sta.status_name()})
  207. co = obj(*status_list)
  208. co.set_define_attr()
  209. assert getattr(co, attr) == expect
  210. @pytest.mark.parametrize(
  211. "obj, task_num",
  212. [
  213. (ConditionOperator, 1),
  214. (ConditionOperator, 2),
  215. (ConditionOperator, 3),
  216. (And, 1),
  217. (And, 2),
  218. (And, 3),
  219. (Or, 1),
  220. (Or, 2),
  221. (Or, 3),
  222. ],
  223. )
  224. def test_condition_operator_set_define_attr_operator(
  225. obj: ConditionOperator, task_num: int
  226. ):
  227. """Test :func:`set_define_attr` with one or more class condition operator."""
  228. attr = "depend_task_list"
  229. task = Task("test-operator", TEST_TYPE)
  230. status = Status(task)
  231. expect = [
  232. {
  233. "relation": obj.operator_name(),
  234. "dependItemList": [
  235. {
  236. "depTaskCode": task.code,
  237. "status": status.status_name(),
  238. }
  239. ],
  240. }
  241. for _ in range(task_num)
  242. ]
  243. co = obj(*[obj(status) for _ in range(task_num)])
  244. co.set_define_attr()
  245. assert getattr(co, attr) == expect
  246. @pytest.mark.parametrize(
  247. "cond, sub_cond",
  248. [
  249. (ConditionOperator, (And, Or)),
  250. (ConditionOperator, (Or, And)),
  251. (And, (And, Or)),
  252. (And, (Or, And)),
  253. (Or, (And, Or)),
  254. (Or, (Or, And)),
  255. ],
  256. )
  257. def test_condition_operator_set_define_attr_mix_operator(
  258. cond: ConditionOperator, sub_cond: Tuple[ConditionOperator]
  259. ):
  260. """Test :func:`set_define_attr` with one or more class mix condition operator."""
  261. attr = "depend_task_list"
  262. task = Task("test-operator", TEST_TYPE)
  263. expect = []
  264. sub_condition = []
  265. for cond in sub_cond:
  266. status = Status(task)
  267. sub_condition.append(cond(status))
  268. expect.append(
  269. {
  270. "relation": cond.operator_name(),
  271. "dependItemList": [
  272. {
  273. "depTaskCode": task.code,
  274. "status": status.status_name(),
  275. }
  276. ],
  277. }
  278. )
  279. co = cond(*sub_condition)
  280. co.set_define_attr()
  281. assert getattr(co, attr) == expect
  282. @patch(
  283. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  284. return_value=(12345, 1),
  285. )
  286. @patch(
  287. "pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version",
  288. return_value=(123, 1),
  289. )
  290. def test_condition_get_define(mock_condition_code_version, mock_task_code_version):
  291. """Test task condition :func:`get_define`."""
  292. common_task = Task(name="common_task", task_type="test_task_condition")
  293. cond_operator = And(
  294. And(
  295. SUCCESS(common_task, common_task),
  296. FAILURE(common_task, common_task),
  297. ),
  298. Or(
  299. SUCCESS(common_task, common_task),
  300. FAILURE(common_task, common_task),
  301. ),
  302. )
  303. name = "test_condition_get_define"
  304. expect = {
  305. "code": 123,
  306. "name": name,
  307. "version": 1,
  308. "description": None,
  309. "delayTime": 0,
  310. "taskType": "CONDITIONS",
  311. "taskParams": {
  312. "resourceList": [],
  313. "localParams": [],
  314. "dependence": {
  315. "relation": "AND",
  316. "dependTaskList": [
  317. {
  318. "relation": "AND",
  319. "dependItemList": [
  320. {"depTaskCode": common_task.code, "status": "SUCCESS"},
  321. {"depTaskCode": common_task.code, "status": "SUCCESS"},
  322. {"depTaskCode": common_task.code, "status": "FAILURE"},
  323. {"depTaskCode": common_task.code, "status": "FAILURE"},
  324. ],
  325. },
  326. {
  327. "relation": "OR",
  328. "dependItemList": [
  329. {"depTaskCode": common_task.code, "status": "SUCCESS"},
  330. {"depTaskCode": common_task.code, "status": "SUCCESS"},
  331. {"depTaskCode": common_task.code, "status": "FAILURE"},
  332. {"depTaskCode": common_task.code, "status": "FAILURE"},
  333. ],
  334. },
  335. ],
  336. },
  337. "conditionResult": {
  338. "successNode": [common_task.code],
  339. "failedNode": [common_task.code],
  340. },
  341. "waitStartTimeout": {},
  342. },
  343. "flag": "YES",
  344. "taskPriority": "MEDIUM",
  345. "workerGroup": "default",
  346. "failRetryTimes": 0,
  347. "failRetryInterval": 1,
  348. "timeoutFlag": "CLOSE",
  349. "timeoutNotifyStrategy": None,
  350. "timeout": 0,
  351. }
  352. task = Conditions(
  353. name, condition=cond_operator, success_task=common_task, failed_task=common_task
  354. )
  355. assert task.get_define() == expect
  356. @patch(
  357. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  358. return_value=(123, 1),
  359. )
  360. def test_condition_set_dep_workflow(mock_task_code_version):
  361. """Test task condition set dependence in workflow level."""
  362. with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
  363. pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE)
  364. pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE)
  365. pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE)
  366. cond_operator = And(
  367. And(
  368. SUCCESS(pre_task_1, pre_task_2),
  369. FAILURE(pre_task_3),
  370. ),
  371. )
  372. success_branch = Task(name="success_branch", task_type=TEST_TYPE)
  373. fail_branch = Task(name="fail_branch", task_type=TEST_TYPE)
  374. condition = Conditions(
  375. name="conditions",
  376. condition=cond_operator,
  377. success_task=success_branch,
  378. failed_task=fail_branch,
  379. )
  380. # General tasks test
  381. assert len(pd.tasks) == 6
  382. assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
  383. [
  384. pre_task_1,
  385. pre_task_2,
  386. pre_task_3,
  387. success_branch,
  388. fail_branch,
  389. condition,
  390. ],
  391. key=lambda t: t.name,
  392. )
  393. # Task dep test
  394. assert success_branch._upstream_task_codes == {condition.code}
  395. assert fail_branch._upstream_task_codes == {condition.code}
  396. assert condition._downstream_task_codes == {
  397. success_branch.code,
  398. fail_branch.code,
  399. }
  400. # Condition task dep after ProcessDefinition function get_define called
  401. assert condition._upstream_task_codes == {
  402. pre_task_1.code,
  403. pre_task_2.code,
  404. pre_task_3.code,
  405. }
  406. assert all(
  407. [
  408. child._downstream_task_codes == {condition.code}
  409. for child in [
  410. pre_task_1,
  411. pre_task_2,
  412. pre_task_3,
  413. ]
  414. ]
  415. )