test_task.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test Task class function."""
  18. from unittest.mock import patch
  19. import pytest
  20. from pydolphinscheduler.core.task import TaskParams, TaskRelation, Task
  21. from tests.testing.task import Task as testTask
  22. def test_task_params_to_dict():
  23. """Test TaskParams object function to_dict."""
  24. raw_script = "test_task_params_to_dict"
  25. expect = {
  26. "resourceList": [],
  27. "localParams": [],
  28. "rawScript": raw_script,
  29. "dependence": {},
  30. "conditionResult": TaskParams.DEFAULT_CONDITION_RESULT,
  31. "waitStartTimeout": {},
  32. }
  33. task_param = TaskParams(raw_script=raw_script)
  34. assert task_param.to_dict() == expect
  35. def test_task_relation_to_dict():
  36. """Test TaskRelation object function to_dict."""
  37. pre_task_code = 123
  38. post_task_code = 456
  39. expect = {
  40. "name": "",
  41. "preTaskCode": pre_task_code,
  42. "postTaskCode": post_task_code,
  43. "preTaskVersion": 1,
  44. "postTaskVersion": 1,
  45. "conditionType": 0,
  46. "conditionParams": {},
  47. }
  48. task_param = TaskRelation(
  49. pre_task_code=pre_task_code, post_task_code=post_task_code
  50. )
  51. assert task_param.to_dict() == expect
  52. def test_task_to_dict():
  53. """Test Task object function to_dict."""
  54. code = 123
  55. version = 1
  56. name = "test_task_to_dict"
  57. task_type = "test_task_to_dict_type"
  58. raw_script = "test_task_params_to_dict"
  59. expect = {
  60. "code": code,
  61. "name": name,
  62. "version": version,
  63. "description": None,
  64. "delayTime": 0,
  65. "taskType": task_type,
  66. "taskParams": {
  67. "resourceList": [],
  68. "localParams": [],
  69. "rawScript": raw_script,
  70. "dependence": {},
  71. "conditionResult": {"successNode": [""], "failedNode": [""]},
  72. "waitStartTimeout": {},
  73. },
  74. "flag": "YES",
  75. "taskPriority": "MEDIUM",
  76. "workerGroup": "default",
  77. "failRetryTimes": 0,
  78. "failRetryInterval": 1,
  79. "timeoutFlag": "CLOSE",
  80. "timeoutNotifyStrategy": None,
  81. "timeout": 0,
  82. }
  83. with patch(
  84. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  85. return_value=(code, version),
  86. ):
  87. task = Task(name=name, task_type=task_type, task_params=TaskParams(raw_script))
  88. assert task.to_dict() == expect
  89. @pytest.mark.parametrize("shift", ["<<", ">>"])
  90. def test_two_tasks_shift(shift: str):
  91. """Test bit operator between tasks.
  92. Here we test both `>>` and `<<` bit operator.
  93. """
  94. raw_script = "script"
  95. upstream = testTask(
  96. name="upstream", task_type=shift, task_params=TaskParams(raw_script)
  97. )
  98. downstream = testTask(
  99. name="downstream", task_type=shift, task_params=TaskParams(raw_script)
  100. )
  101. if shift == "<<":
  102. downstream << upstream
  103. elif shift == ">>":
  104. upstream >> downstream
  105. else:
  106. assert False, f"Unexpect bit operator type {shift}."
  107. assert (
  108. 1 == len(upstream._downstream_task_codes)
  109. and downstream.code in upstream._downstream_task_codes
  110. ), "Task downstream task attributes error, downstream codes size or specific code failed."
  111. assert (
  112. 1 == len(downstream._upstream_task_codes)
  113. and upstream.code in downstream._upstream_task_codes
  114. ), "Task upstream task attributes error, upstream codes size or upstream code failed."
  115. @pytest.mark.parametrize(
  116. "dep_expr, flag",
  117. [
  118. ("task << tasks", "upstream"),
  119. ("tasks << task", "downstream"),
  120. ("task >> tasks", "downstream"),
  121. ("tasks >> task", "upstream"),
  122. ],
  123. )
  124. def test_tasks_list_shift(dep_expr: str, flag: str):
  125. """Test bit operator between task and sequence of tasks.
  126. Here we test both `>>` and `<<` bit operator.
  127. """
  128. reverse_dict = {
  129. "upstream": "downstream",
  130. "downstream": "upstream",
  131. }
  132. task_type = "dep_task_and_tasks"
  133. raw_script = "script"
  134. task = testTask(
  135. name="upstream", task_type=task_type, task_params=TaskParams(raw_script)
  136. )
  137. tasks = [
  138. testTask(
  139. name="downstream1", task_type=task_type, task_params=TaskParams(raw_script)
  140. ),
  141. testTask(
  142. name="downstream2", task_type=task_type, task_params=TaskParams(raw_script)
  143. ),
  144. ]
  145. # Use build-in function eval to simply test case and reduce duplicate code
  146. eval(dep_expr)
  147. direction_attr = f"_{flag}_task_codes"
  148. reverse_direction_attr = f"_{reverse_dict[flag]}_task_codes"
  149. assert 2 == len(getattr(task, direction_attr))
  150. assert [t.code in getattr(task, direction_attr) for t in tasks]
  151. assert all([1 == len(getattr(t, reverse_direction_attr)) for t in tasks])
  152. assert all([task.code in getattr(t, reverse_direction_attr) for t in tasks])