test_task.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test Task class function."""
  18. import logging
  19. import re
  20. from unittest.mock import patch
  21. import pytest
  22. from pydolphinscheduler.core.process_definition import ProcessDefinition
  23. from pydolphinscheduler.core.task import Task, TaskRelation
  24. from tests.testing.task import Task as testTask
  25. from tests.testing.task import TaskWithCode
# Module-level state shared by every parametrized run of
# ``test_task_relation_add_to_set``: the set accumulates the TaskRelation
# instances added so far, and the counter holds the size the set is expected
# to have after each run.
TEST_TASK_RELATION_SET = set()
TEST_TASK_RELATION_SIZE = 0
  28. @pytest.mark.parametrize(
  29. "attr, expect",
  30. [
  31. (
  32. dict(),
  33. {
  34. "localParams": [],
  35. "resourceList": [],
  36. "dependence": {},
  37. "waitStartTimeout": {},
  38. "conditionResult": {"successNode": [""], "failedNode": [""]},
  39. },
  40. ),
  41. (
  42. {
  43. "local_params": ["foo", "bar"],
  44. "resource_list": ["foo", "bar"],
  45. "dependence": {"foo", "bar"},
  46. "wait_start_timeout": {"foo", "bar"},
  47. "condition_result": {"foo": ["bar"]},
  48. },
  49. {
  50. "localParams": ["foo", "bar"],
  51. "resourceList": [{"id": 1}],
  52. "dependence": {"foo", "bar"},
  53. "waitStartTimeout": {"foo", "bar"},
  54. "conditionResult": {"foo": ["bar"]},
  55. },
  56. ),
  57. ],
  58. )
  59. @patch(
  60. "pydolphinscheduler.core.task.Task.query_resource",
  61. return_value=({"id": 1, "name": "foo"}),
  62. )
  63. def test_property_task_params(mock_resource, attr, expect):
  64. """Test class task property."""
  65. task = testTask(
  66. "test-property-task-params",
  67. "test-task",
  68. **attr,
  69. )
  70. assert expect == task.task_params
  71. @pytest.mark.parametrize(
  72. "pre_code, post_code, expect",
  73. [
  74. (123, 456, hash("123 -> 456")),
  75. (12345678, 987654321, hash("12345678 -> 987654321")),
  76. ],
  77. )
  78. def test_task_relation_hash_func(pre_code, post_code, expect):
  79. """Test TaskRelation magic function :func:`__hash__`."""
  80. task_param = TaskRelation(pre_task_code=pre_code, post_task_code=post_code)
  81. assert hash(task_param) == expect
  82. @pytest.mark.parametrize(
  83. "pre_code, post_code, size_add",
  84. [
  85. (123, 456, 1),
  86. (123, 456, 0),
  87. (456, 456, 1),
  88. (123, 123, 1),
  89. (456, 123, 1),
  90. (0, 456, 1),
  91. (123, 0, 1),
  92. ],
  93. )
  94. def test_task_relation_add_to_set(pre_code, post_code, size_add):
  95. """Test TaskRelation with different pre_code and post_code add to set behavior.
  96. Here we use global variable to keep set of :class:`TaskRelation` instance and the number we expect
  97. of the size when we add a new task relation to exists set.
  98. """
  99. task_relation = TaskRelation(pre_task_code=pre_code, post_task_code=post_code)
  100. TEST_TASK_RELATION_SET.add(task_relation)
  101. # hint python interpreter use global variable instead of local's
  102. global TEST_TASK_RELATION_SIZE
  103. TEST_TASK_RELATION_SIZE += size_add
  104. assert len(TEST_TASK_RELATION_SET) == TEST_TASK_RELATION_SIZE
  105. def test_task_relation_to_dict():
  106. """Test TaskRelation object function to_dict."""
  107. pre_task_code = 123
  108. post_task_code = 456
  109. expect = {
  110. "name": "",
  111. "preTaskCode": pre_task_code,
  112. "postTaskCode": post_task_code,
  113. "preTaskVersion": 1,
  114. "postTaskVersion": 1,
  115. "conditionType": 0,
  116. "conditionParams": {},
  117. }
  118. task_relation = TaskRelation(
  119. pre_task_code=pre_task_code, post_task_code=post_task_code
  120. )
  121. assert task_relation.get_define() == expect
  122. def test_task_get_define():
  123. """Test Task object function get_define."""
  124. code = 123
  125. version = 1
  126. name = "test_task_get_define"
  127. task_type = "test_task_get_define_type"
  128. expect = {
  129. "code": code,
  130. "name": name,
  131. "version": version,
  132. "description": None,
  133. "delayTime": 0,
  134. "taskType": task_type,
  135. "taskParams": {
  136. "resourceList": [],
  137. "localParams": [],
  138. "dependence": {},
  139. "conditionResult": {"successNode": [""], "failedNode": [""]},
  140. "waitStartTimeout": {},
  141. },
  142. "flag": "YES",
  143. "taskPriority": "MEDIUM",
  144. "workerGroup": "default",
  145. "failRetryTimes": 0,
  146. "failRetryInterval": 1,
  147. "timeoutFlag": "CLOSE",
  148. "timeoutNotifyStrategy": None,
  149. "timeout": 0,
  150. }
  151. with patch(
  152. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  153. return_value=(code, version),
  154. ):
  155. task = Task(name=name, task_type=task_type)
  156. assert task.get_define() == expect
  157. @pytest.mark.parametrize("shift", ["<<", ">>"])
  158. def test_two_tasks_shift(shift: str):
  159. """Test bit operator between tasks.
  160. Here we test both `>>` and `<<` bit operator.
  161. """
  162. upstream = testTask(name="upstream", task_type=shift)
  163. downstream = testTask(name="downstream", task_type=shift)
  164. if shift == "<<":
  165. downstream << upstream
  166. elif shift == ">>":
  167. upstream >> downstream
  168. else:
  169. assert False, f"Unexpect bit operator type {shift}."
  170. assert (
  171. 1 == len(upstream._downstream_task_codes)
  172. and downstream.code in upstream._downstream_task_codes
  173. ), "Task downstream task attributes error, downstream codes size or specific code failed."
  174. assert (
  175. 1 == len(downstream._upstream_task_codes)
  176. and upstream.code in downstream._upstream_task_codes
  177. ), "Task upstream task attributes error, upstream codes size or upstream code failed."
  178. @pytest.mark.parametrize(
  179. "dep_expr, flag",
  180. [
  181. ("task << tasks", "upstream"),
  182. ("tasks << task", "downstream"),
  183. ("task >> tasks", "downstream"),
  184. ("tasks >> task", "upstream"),
  185. ],
  186. )
  187. def test_tasks_list_shift(dep_expr: str, flag: str):
  188. """Test bit operator between task and sequence of tasks.
  189. Here we test both `>>` and `<<` bit operator.
  190. """
  191. reverse_dict = {
  192. "upstream": "downstream",
  193. "downstream": "upstream",
  194. }
  195. task_type = "dep_task_and_tasks"
  196. task = testTask(name="upstream", task_type=task_type)
  197. tasks = [
  198. testTask(name="downstream1", task_type=task_type),
  199. testTask(name="downstream2", task_type=task_type),
  200. ]
  201. # Use build-in function eval to simply test case and reduce duplicate code
  202. eval(dep_expr)
  203. direction_attr = f"_{flag}_task_codes"
  204. reverse_direction_attr = f"_{reverse_dict[flag]}_task_codes"
  205. assert 2 == len(getattr(task, direction_attr))
  206. assert [t.code in getattr(task, direction_attr) for t in tasks]
  207. assert all([1 == len(getattr(t, reverse_direction_attr)) for t in tasks])
  208. assert all([task.code in getattr(t, reverse_direction_attr) for t in tasks])
  209. def test_add_duplicate(caplog):
  210. """Test add task which code already in process definition."""
  211. with ProcessDefinition("test_add_duplicate_workflow") as _:
  212. TaskWithCode(name="test_task_1", task_type="test", code=123, version=1)
  213. with caplog.at_level(logging.WARNING):
  214. TaskWithCode(
  215. name="test_task_duplicate_code", task_type="test", code=123, version=2
  216. )
  217. assert all(
  218. [
  219. caplog.text.startswith("WARNING pydolphinscheduler"),
  220. re.findall("already in process definition", caplog.text),
  221. ]
  222. )
  223. @pytest.mark.parametrize(
  224. "resources, expect",
  225. [
  226. (
  227. ["/dev/test.py"],
  228. [{"id": 1}],
  229. ),
  230. (
  231. ["/dev/test.py", {"id": 2}],
  232. [{"id": 1}, {"id": 2}],
  233. ),
  234. ],
  235. )
  236. @patch(
  237. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  238. return_value=(123, 1),
  239. )
  240. @patch(
  241. "pydolphinscheduler.core.task.Task.query_resource",
  242. return_value=({"id": 1, "name": "/dev/test.py"}),
  243. )
  244. def test_python_resource_list(mock_code_version, mock_resource, resources, expect):
  245. """Test python task resource list."""
  246. task = Task(
  247. name="python_resource_list.",
  248. task_type="PYTHON",
  249. resource_list=resources,
  250. )
  251. assert task.resource_list == expect