test_func_wrap.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test module about function wrap task decorator."""
  18. from unittest.mock import patch
  19. import pytest
  20. from pydolphinscheduler.core.process_definition import ProcessDefinition
  21. from pydolphinscheduler.exceptions import PyDSParamException
  22. from pydolphinscheduler.tasks.func_wrap import task
  23. from tests.testing.decorator import foo as foo_decorator
  24. from tests.testing.task import Task
  25. PD_NAME = "test_process_definition"
  26. TASK_NAME = "test_task"
  27. @patch(
  28. "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
  29. )
  30. def test_single_task_outside(mock_code):
  31. """Test single decorator task which outside process definition."""
  32. @task
  33. def foo():
  34. print(TASK_NAME)
  35. with ProcessDefinition(PD_NAME) as pd:
  36. foo()
  37. assert pd is not None and pd.name == PD_NAME
  38. assert len(pd.tasks) == 1
  39. pd_task = pd.tasks[12345]
  40. assert pd_task.name == "foo"
  41. assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
  42. @patch(
  43. "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
  44. )
  45. def test_single_task_inside(mock_code):
  46. """Test single decorator task which inside process definition."""
  47. with ProcessDefinition(PD_NAME) as pd:
  48. @task
  49. def foo():
  50. print(TASK_NAME)
  51. foo()
  52. assert pd is not None and pd.name == PD_NAME
  53. assert len(pd.tasks) == 1
  54. pd_task = pd.tasks[12345]
  55. assert pd_task.name == "foo"
  56. assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
  57. @patch(
  58. "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
  59. )
  60. def test_addition_decorator_error(mock_code):
  61. """Test error when using task decorator to a function already have decorator."""
  62. @task
  63. @foo_decorator
  64. def foo():
  65. print(TASK_NAME)
  66. with ProcessDefinition(PD_NAME) as pd: # noqa: F841
  67. with pytest.raises(
  68. PyDSParamException, match="Do no support other decorators for.*"
  69. ):
  70. foo()
  71. @patch(
  72. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  73. side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
  74. )
  75. def test_multiple_tasks_outside(mock_code):
  76. """Test multiple decorator tasks which outside process definition."""
  77. @task
  78. def foo():
  79. print(TASK_NAME)
  80. @task
  81. def bar():
  82. print(TASK_NAME)
  83. with ProcessDefinition(PD_NAME) as pd:
  84. foo = foo()
  85. bar = bar()
  86. foo >> bar
  87. assert pd is not None and pd.name == PD_NAME
  88. assert len(pd.tasks) == 2
  89. task_foo = pd.get_one_task_by_name("foo")
  90. task_bar = pd.get_one_task_by_name("bar")
  91. assert set(pd.task_list) == {task_foo, task_bar}
  92. assert (
  93. task_foo is not None
  94. and task_foo._upstream_task_codes == set()
  95. and task_foo._downstream_task_codes.pop() == task_bar.code
  96. )
  97. assert (
  98. task_bar is not None
  99. and task_bar._upstream_task_codes.pop() == task_foo.code
  100. and task_bar._downstream_task_codes == set()
  101. )
  102. @patch(
  103. "pydolphinscheduler.core.task.Task.gen_code_and_version",
  104. side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
  105. )
  106. def test_multiple_tasks_inside(mock_code):
  107. """Test multiple decorator tasks which inside process definition."""
  108. with ProcessDefinition(PD_NAME) as pd:
  109. @task
  110. def foo():
  111. print(TASK_NAME)
  112. @task
  113. def bar():
  114. print(TASK_NAME)
  115. foo = foo()
  116. bar = bar()
  117. foo >> bar
  118. assert pd is not None and pd.name == PD_NAME
  119. assert len(pd.tasks) == 2
  120. task_foo = pd.get_one_task_by_name("foo")
  121. task_bar = pd.get_one_task_by_name("bar")
  122. assert set(pd.task_list) == {task_foo, task_bar}
  123. assert (
  124. task_foo is not None
  125. and task_foo._upstream_task_codes == set()
  126. and task_foo._downstream_task_codes.pop() == task_bar.code
  127. )
  128. assert (
  129. task_bar is not None
  130. and task_bar._upstream_task_codes.pop() == task_foo.code
  131. and task_bar._downstream_task_codes == set()
  132. )