test_condition.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test Task condition."""
  18. from typing import List, Tuple
  19. from unittest.mock import patch
  20. import pytest
  21. from pydolphinscheduler.core.process_definition import ProcessDefinition
  22. from pydolphinscheduler.exceptions import PyDSParamException
  23. from pydolphinscheduler.tasks.condition import (
  24. FAILURE,
  25. SUCCESS,
  26. And,
  27. ConditionOperator,
  28. Conditions,
  29. Or,
  30. Status,
  31. )
  32. from tests.testing.task import Task
  # Shared literal values for the tests in this module.
  TEST_NAME = "test-name"
  TEST_PROJECT = "test-project"
  TEST_PROCESS_DEFINITION = "test-process-definition"
  TEST_TYPE = "test-type"

  # One integer constant per line instead of a single tuple assignment.
  TEST_PROJECT_CODE = 12345
  TEST_DEFINITION_CODE = 123456
  TEST_TASK_CODE = 1234567

  TEST_OPERATOR_LIST = ("AND", "OR")
  @pytest.mark.parametrize(
      "obj, expect",
      [(Status, "STATUS"), (SUCCESS, "SUCCESS"), (FAILURE, "FAILURE")],
  )
  def test_class_status_status_name(obj: Status, expect: str):
      """Check ``status_name()`` of class Status and its sub classes.

      :param obj: the status class under test.
      :param expect: the string ``status_name()`` must return for that class.
      """
      actual = obj.status_name()
      assert actual == expect
  @pytest.mark.parametrize(
      "obj, tasks",
      [
          (Status, (1, 2, 3)),
          (SUCCESS, (1.1, 2.2, 3.3)),
          (FAILURE, tuple(ConditionOperator(num) for num in (1, 2, 3))),
      ],
  )
  def test_class_status_depend_item_list_no_expect_type(obj: Status, tasks: Tuple):
      """Check that status classes reject arguments that are not tasks.

      :param obj: the status class under test.
      :param tasks: positional arguments of an unsupported type (ints, floats or
          condition operators).
      """
      error_pattern = ".*?only accept class Task or sub class Task, but get"
      with pytest.raises(PyDSParamException, match=error_pattern):
          obj(*tasks).get_define()
  @pytest.mark.parametrize(
      "obj, tasks",
      [
          # Same cases, in the same order, as spelling out one to three tasks
          # for each of the three status classes.
          (status_cls, [Task(str(i), TEST_TYPE) for i in range(count)])
          for status_cls in (Status, SUCCESS, FAILURE)
          for count in (1, 2, 3)
      ],
  )
  def test_class_status_depend_item_list(obj: Status, tasks: Tuple):
      """Check the definition produced by a status built from one or more tasks.

      :param obj: the status class under test.
      :param tasks: the tasks handed to the status constructor.
      """
      status_name = obj.status_name()
      expect = [{"depTaskCode": task.code, "status": status_name} for task in tasks]
      definition = obj(*tasks).get_define()
      assert definition == expect
  @pytest.mark.parametrize(
      "obj, expect",
      [(ConditionOperator, "CONDITIONOPERATOR"), (And, "AND"), (Or, "OR")],
  )
  def test_condition_operator_operator_name(obj: ConditionOperator, expect: str):
      """Check ``operator_name()`` of class ConditionOperator and its sub classes.

      :param obj: the operator class under test.
      :param expect: the string ``operator_name()`` must return for that class.
      """
      actual = obj.operator_name()
      assert actual == expect
  @pytest.mark.parametrize(
      "obj, expect",
      [(ConditionOperator, "CONDITIONOPERATOR"), (And, "AND"), (Or, "OR")],
  )
  def test_condition_operator_relation(obj: ConditionOperator, expect: str):
      """Check attribute ``relation`` on instances of ConditionOperator and its sub classes.

      :param obj: the operator class under test.
      :param expect: the value ``relation`` must have on an instance of that class.
      """
      operator = obj(1)
      assert operator.relation == expect
  @pytest.mark.parametrize(
      "obj, status_or_operator, match",
      [
          # A status mixed with an int.
          (
              ConditionOperator,
              [Status(Task("1", TEST_TYPE)), 1],
              ".*?operator parameter support ConditionTask and ConditionOperator.*?",
          ),
          # A status mixed with a float.
          (
              ConditionOperator,
              [Status(Task("1", TEST_TYPE)), 1.0],
              ".*?operator parameter support ConditionTask and ConditionOperator.*?",
          ),
          # A status followed by an operator.
          (
              ConditionOperator,
              [
                  Status(Task("1", TEST_TYPE)),
                  ConditionOperator(And(Status(Task("1", TEST_TYPE)))),
              ],
              ".*?operator parameter only support same type.",
          ),
          # An operator followed by a status.
          (
              ConditionOperator,
              [
                  ConditionOperator(And(Status(Task("1", TEST_TYPE)))),
                  Status(Task("1", TEST_TYPE)),
              ],
              ".*?operator parameter only support same type.",
          ),
      ],
  )
  def test_condition_operator_set_define_attr_not_support_type(
      obj, status_or_operator, match
  ):
      """Check that ConditionOperator raises on unsupported or mixed argument types.

      :param obj: the operator class under test.
      :param status_or_operator: positional arguments passed to the operator.
      :param match: regular expression the raised error message must match.
      """
      with pytest.raises(PyDSParamException, match=match):
          operator = obj(*status_or_operator)
          operator.set_define_attr()
  @pytest.mark.parametrize(
      "obj, task_num",
      [
          (operator_cls, count)
          for operator_cls in (ConditionOperator, And, Or)
          for count in (1, 2, 3)
      ],
  )
  def test_condition_operator_set_define_attr_status(
      obj: ConditionOperator, task_num: int
  ):
      """Check :func:`set_define_attr` on an operator wrapping a single status.

      :param obj: the operator class under test.
      :param task_num: how many tasks the wrapped status is built from.
      """
      tasks = [Task(str(idx), TEST_TYPE) for idx in range(task_num)]
      status = Status(*tasks)
      expect = [
          {"depTaskCode": item.code, "status": status.status_name()} for item in tasks
      ]

      operator = obj(status)
      operator.set_define_attr()
      assert operator.depend_item_list == expect
  @pytest.mark.parametrize(
      "obj, status",
      [
          (operator_cls, pair)
          for operator_cls in (ConditionOperator, And, Or)
          for pair in (
              (SUCCESS, SUCCESS),
              (FAILURE, FAILURE),
              (SUCCESS, FAILURE),
              (FAILURE, SUCCESS),
          )
      ],
  )
  def test_condition_operator_set_define_attr_mix_status(
      obj: ConditionOperator, status: List[Status]
  ):
      """Check :func:`set_define_attr` on an operator holding several status objects.

      :param obj: the operator class under test.
      :param status: status classes, each instantiated with the same task.
      """
      task = Task("test-operator", TEST_TYPE)
      status_list = [status_cls(task) for status_cls in status]
      expect = [
          {"depTaskCode": task.code, "status": status_cls.status_name()}
          for status_cls in status
      ]

      operator = obj(*status_list)
      operator.set_define_attr()
      assert operator.depend_item_list == expect
  @pytest.mark.parametrize(
      "obj, task_num",
      [
          (operator_cls, count)
          for operator_cls in (ConditionOperator, And, Or)
          for count in (1, 2, 3)
      ],
  )
  def test_condition_operator_set_define_attr_operator(
      obj: ConditionOperator, task_num: int
  ):
      """Check :func:`set_define_attr` on an operator holding nested operators.

      :param obj: the operator class used for both the outer and nested operators.
      :param task_num: how many nested operators the outer operator receives.
      """
      task = Task("test-operator", TEST_TYPE)
      status = Status(task)
      expect = [
          {
              "relation": obj.operator_name(),
              "dependItemList": [
                  {"depTaskCode": task.code, "status": status.status_name()}
              ],
          }
          for _ in range(task_num)
      ]

      nested = [obj(status) for _ in range(task_num)]
      outer = obj(*nested)
      outer.set_define_attr()
      assert outer.depend_task_list == expect
  @pytest.mark.parametrize(
      "cond, sub_cond",
      [
          (ConditionOperator, (And, Or)),
          (ConditionOperator, (Or, And)),
          (And, (And, Or)),
          (And, (Or, And)),
          (Or, (And, Or)),
          (Or, (Or, And)),
      ],
  )
  def test_condition_operator_set_define_attr_mix_operator(
      cond: ConditionOperator, sub_cond: Tuple[ConditionOperator]
  ):
      """Test :func:`set_define_attr` with an operator holding mixed nested operators.

      :param cond: the outer operator class under test.
      :param sub_cond: the operator classes nested inside the outer operator,
          each wrapping one status of the same task.
      """
      attr = "depend_task_list"

      task = Task("test-operator", TEST_TYPE)

      expect = []
      sub_condition = []
      # The loop variable must not be named ``cond``: reusing that name would
      # overwrite the parametrized outer operator, and the outer instance below
      # would always be built from the last nested operator class instead.
      for sub_operator in sub_cond:
          status = Status(task)
          sub_condition.append(sub_operator(status))
          expect.append(
              {
                  "relation": sub_operator.operator_name(),
                  "dependItemList": [
                      {
                          "depTaskCode": task.code,
                          "status": status.status_name(),
                      }
                  ],
              }
          )

      co = cond(*sub_condition)
      co.set_define_attr()
      assert getattr(co, attr) == expect
  @patch(
      "pydolphinscheduler.core.task.Task.gen_code_and_version",
      return_value=(12345, 1),
  )
  @patch(
      "pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version",
      return_value=(123, 1),
  )
  def test_dependent_get_define(mock_condition_code_version, mock_task_code_version):
      """Check the full definition returned by :func:`get_define` of a condition task.

      Both mock arguments are injected by the ``patch`` decorators and are not
      used directly in the body.
      """

      def depend_items():
          """Build the item list expected for SUCCESS(task, task) plus FAILURE(task, task)."""
          return [
              {"depTaskCode": common_task.code, "status": state}
              for state in ("SUCCESS", "SUCCESS", "FAILURE", "FAILURE")
          ]

      common_task = Task(name="common_task", task_type="test_task_condition")
      success_and_failure_all = And(
          SUCCESS(common_task, common_task),
          FAILURE(common_task, common_task),
      )
      success_and_failure_any = Or(
          SUCCESS(common_task, common_task),
          FAILURE(common_task, common_task),
      )
      cond_operator = And(success_and_failure_all, success_and_failure_any)

      name = "test_condition_get_define"
      expect = {
          "code": 123,
          "name": name,
          "version": 1,
          "description": None,
          "delayTime": 0,
          "taskType": "CONDITIONS",
          "taskParams": {
              "resourceList": [],
              "localParams": [],
              "dependence": {
                  "relation": "AND",
                  "dependTaskList": [
                      {"relation": "AND", "dependItemList": depend_items()},
                      {"relation": "OR", "dependItemList": depend_items()},
                  ],
              },
              "conditionResult": {"successNode": [""], "failedNode": [""]},
              "waitStartTimeout": {},
          },
          "flag": "YES",
          "taskPriority": "MEDIUM",
          "workerGroup": "default",
          "failRetryTimes": 0,
          "failRetryInterval": 1,
          "timeoutFlag": "CLOSE",
          "timeoutNotifyStrategy": None,
          "timeout": 0,
      }

      condition_task = Conditions(name, condition=cond_operator)
      assert condition_task.get_define() == expect
  @patch(
      "pydolphinscheduler.core.task.Task.gen_code_and_version",
      return_value=(123, 1),
  )
  def test_condition_set_dep_workflow(mock_task_code_version):
      """Check the dependencies a condition task sets up inside a workflow.

      The mock argument is injected by the ``patch`` decorator and is not used
      directly in the body.
      """

      def task_name(item):
          """Sort key: order tasks by their name."""
          return item.name

      with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
          parent = Task(name="parent", task_type=TEST_TYPE)
          success_first = Task(name="condition_success_1", task_type=TEST_TYPE)
          success_second = Task(name="condition_success_2", task_type=TEST_TYPE)
          failed = Task(name="condition_fail", task_type=TEST_TYPE)
          cond_operator = And(
              And(
                  SUCCESS(success_first, success_second),
                  FAILURE(failed),
              ),
          )

          condition = Conditions(name=TEST_NAME, condition=cond_operator)
          parent >> condition

          # Every task created above must be registered in the workflow.
          assert len(pd.tasks) == 5
          registered = sorted(pd.task_list, key=task_name)
          created = sorted(
              [parent, condition, success_first, success_second, failed],
              key=task_name,
          )
          assert registered == created

          # Explicit dependency declared with ``parent >> condition``.
          assert parent._downstream_task_codes == {condition.code}
          assert condition._upstream_task_codes == {parent.code}

          # The tasks referenced by the condition operators must be downstream
          # of the condition task, and the condition task their only upstream.
          children = [success_first, success_second, failed]
          assert condition._downstream_task_codes == {
              success_first.code,
              success_second.code,
              failed.code,
          }
          for child in children:
              assert child._upstream_task_codes == {condition.code}