test_task.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test Task class function."""
  18. from unittest.mock import patch
  19. import pytest
  20. from pydolphinscheduler.core.task import Task, TaskRelation
  21. from tests.testing.task import Task as testTask
  22. TEST_TASK_RELATION_SET = set()
  23. TEST_TASK_RELATION_SIZE = 0
  24. @pytest.mark.parametrize(
  25. "attr, expect",
  26. [
  27. (
  28. dict(),
  29. {
  30. "localParams": [],
  31. "resourceList": [],
  32. "dependence": {},
  33. "waitStartTimeout": {},
  34. "conditionResult": {"successNode": [""], "failedNode": [""]},
  35. },
  36. ),
  37. (
  38. {
  39. "local_params": ["foo", "bar"],
  40. "resource_list": ["foo", "bar"],
  41. "dependence": {"foo", "bar"},
  42. "wait_start_timeout": {"foo", "bar"},
  43. "condition_result": {"foo": ["bar"]},
  44. },
  45. {
  46. "localParams": ["foo", "bar"],
  47. "resourceList": ["foo", "bar"],
  48. "dependence": {"foo", "bar"},
  49. "waitStartTimeout": {"foo", "bar"},
  50. "conditionResult": {"foo": ["bar"]},
  51. },
  52. ),
  53. ],
  54. )
  55. def test_property_task_params(attr, expect):
  56. """Test class task property."""
  57. task = testTask(
  58. "test-property-task-params",
  59. "test-task",
  60. **attr,
  61. )
  62. assert expect == task.task_params
  63. @pytest.mark.parametrize(
  64. "pre_code, post_code, expect",
  65. [
  66. (123, 456, hash("123 -> 456")),
  67. (12345678, 987654321, hash("12345678 -> 987654321")),
  68. ],
  69. )
  70. def test_task_relation_hash_func(pre_code, post_code, expect):
  71. """Test TaskRelation magic function :func:`__hash__`."""
  72. task_param = TaskRelation(pre_task_code=pre_code, post_task_code=post_code)
  73. assert hash(task_param) == expect
  74. @pytest.mark.parametrize(
  75. "pre_code, post_code, size_add",
  76. [
  77. (123, 456, 1),
  78. (123, 456, 0),
  79. (456, 456, 1),
  80. (123, 123, 1),
  81. (456, 123, 1),
  82. (0, 456, 1),
  83. (123, 0, 1),
  84. ],
  85. )
  86. def test_task_relation_add_to_set(pre_code, post_code, size_add):
  87. """Test TaskRelation with different pre_code and post_code add to set behavior.
  88. Here we use global variable to keep set of :class:`TaskRelation` instance and the number we expect
  89. of the size when we add a new task relation to exists set.
  90. """
  91. task_relation = TaskRelation(pre_task_code=pre_code, post_task_code=post_code)
  92. TEST_TASK_RELATION_SET.add(task_relation)
  93. # hint python interpreter use global variable instead of local's
  94. global TEST_TASK_RELATION_SIZE
  95. TEST_TASK_RELATION_SIZE += size_add
  96. assert len(TEST_TASK_RELATION_SET) == TEST_TASK_RELATION_SIZE
  97. def test_task_relation_to_dict():
  98. """Test TaskRelation object function to_dict."""
  99. pre_task_code = 123
  100. post_task_code = 456
  101. expect = {
  102. "name": "",
  103. "preTaskCode": pre_task_code,
  104. "postTaskCode": post_task_code,
  105. "preTaskVersion": 1,
  106. "postTaskVersion": 1,
  107. "conditionType": 0,
  108. "conditionParams": {},
  109. }
  110. task_relation = TaskRelation(
  111. pre_task_code=pre_task_code, post_task_code=post_task_code
  112. )
  113. assert task_relation.get_define() == expect
  114. def test_task_get_define():
  115. """Test Task object function get_define."""
  116. code = 123
  117. version = 1
  118. name = "test_task_get_define"
  119. task_type = "test_task_get_define_type"
  120. expect = {
  121. "code": code,
  122. "name": name,
  123. "version": version,
  124. "description": None,
  125. "delayTime": 0,
  126. "taskType": task_type,
  127. "taskParams": {
  128. "resourceList": [],
  129. "localParams": [],
  130. "dependence": {},
  131. "conditionResult": {"successNode": [""], "failedNode": [""]},
  132. "waitStartTimeout": {},
  133. },
  134. "flag": "YES",
  135. "taskPriority": "MEDIUM",
  136. "workerGroup": "default",
  137. "failRetryTimes": 0,
  138. "failRetryInterval": 1,
  139. "timeoutFlag": "CLOSE",
  140. "timeoutNotifyStrategy": None,
  141. "timeout": 0,
  142. }
  143. with patch(
  144. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  145. return_value=(code, version),
  146. ):
  147. task = Task(name=name, task_type=task_type)
  148. assert task.get_define() == expect
  149. @pytest.mark.parametrize("shift", ["<<", ">>"])
  150. def test_two_tasks_shift(shift: str):
  151. """Test bit operator between tasks.
  152. Here we test both `>>` and `<<` bit operator.
  153. """
  154. upstream = testTask(name="upstream", task_type=shift)
  155. downstream = testTask(name="downstream", task_type=shift)
  156. if shift == "<<":
  157. downstream << upstream
  158. elif shift == ">>":
  159. upstream >> downstream
  160. else:
  161. assert False, f"Unexpect bit operator type {shift}."
  162. assert (
  163. 1 == len(upstream._downstream_task_codes)
  164. and downstream.code in upstream._downstream_task_codes
  165. ), "Task downstream task attributes error, downstream codes size or specific code failed."
  166. assert (
  167. 1 == len(downstream._upstream_task_codes)
  168. and upstream.code in downstream._upstream_task_codes
  169. ), "Task upstream task attributes error, upstream codes size or upstream code failed."
  170. @pytest.mark.parametrize(
  171. "dep_expr, flag",
  172. [
  173. ("task << tasks", "upstream"),
  174. ("tasks << task", "downstream"),
  175. ("task >> tasks", "downstream"),
  176. ("tasks >> task", "upstream"),
  177. ],
  178. )
  179. def test_tasks_list_shift(dep_expr: str, flag: str):
  180. """Test bit operator between task and sequence of tasks.
  181. Here we test both `>>` and `<<` bit operator.
  182. """
  183. reverse_dict = {
  184. "upstream": "downstream",
  185. "downstream": "upstream",
  186. }
  187. task_type = "dep_task_and_tasks"
  188. task = testTask(name="upstream", task_type=task_type)
  189. tasks = [
  190. testTask(name="downstream1", task_type=task_type),
  191. testTask(name="downstream2", task_type=task_type),
  192. ]
  193. # Use build-in function eval to simply test case and reduce duplicate code
  194. eval(dep_expr)
  195. direction_attr = f"_{flag}_task_codes"
  196. reverse_direction_attr = f"_{reverse_dict[flag]}_task_codes"
  197. assert 2 == len(getattr(task, direction_attr))
  198. assert [t.code in getattr(task, direction_attr) for t in tasks]
  199. assert all([1 == len(getattr(t, reverse_direction_attr)) for t in tasks])
  200. assert all([task.code in getattr(t, reverse_direction_attr) for t in tasks])