# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Tests for the function-wrapping ``task`` decorator."""
- from unittest.mock import patch
- import pytest
- from pydolphinscheduler.core.process_definition import ProcessDefinition
- from pydolphinscheduler.exceptions import PyDSParamException
- from pydolphinscheduler.tasks.func_wrap import task
- from tests.testing.decorator import foo as foo_decorator
- from tests.testing.task import Task
# Name passed to every ``ProcessDefinition`` created by the tests in this module.
PD_NAME = "test_process_definition"

# Text printed by the sample functions that the tests decorate with ``@task``.
TASK_NAME = "test_task"
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
)
def test_single_task_outside(mock_code):
    """Test a single ``@task`` function declared outside the process definition.

    ``Task.gen_code_and_version`` is patched to return ``(12345, 1)``; the test
    then expects to find the created task under key ``12345`` in ``pd.tasks``.

    :param mock_code: Mock injected by ``@patch``; not used directly.
    """

    @task
    def foo():
        print(TASK_NAME)

    with ProcessDefinition(PD_NAME) as pd:
        foo()

    # One condition per assert so a failure pinpoints the broken expectation.
    assert pd is not None
    assert pd.name == PD_NAME
    assert len(pd.tasks) == 1

    pd_task = pd.tasks[12345]
    assert pd_task.name == "foo"
    # NOTE(review): the expected script reproduces the source of ``foo`` above
    # followed by a call to it, so its indentation has to mirror the four-space
    # body used here -- confirm against how ``task`` extracts the source.
    assert pd_task.raw_script == "def foo():\n    print(TASK_NAME)\nfoo()"
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
)
def test_single_task_inside(mock_code):
    """Test a single ``@task`` function declared inside the process definition.

    ``Task.gen_code_and_version`` is patched to return ``(12345, 1)``; the test
    then expects to find the created task under key ``12345`` in ``pd.tasks``.

    :param mock_code: Mock injected by ``@patch``; not used directly.
    """
    with ProcessDefinition(PD_NAME) as pd:

        @task
        def foo():
            print(TASK_NAME)

        foo()

    # One condition per assert so a failure pinpoints the broken expectation.
    assert pd is not None
    assert pd.name == PD_NAME
    assert len(pd.tasks) == 1

    pd_task = pd.tasks[12345]
    assert pd_task.name == "foo"
    # NOTE(review): the expected script is written without the outer ``with``
    # indentation and with a four-space body, i.e. it presumes ``task`` strips
    # the leading indent of the nested definition -- confirm against ``task``.
    assert pd_task.raw_script == "def foo():\n    print(TASK_NAME)\nfoo()"
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
)
def test_addition_decorator_error(mock_code):
    """Test that calling a ``@task`` function with an extra decorator raises.

    The decorated function is called inside a process definition and the call
    is expected to raise ``PyDSParamException``.

    :param mock_code: Mock injected by ``@patch``; not used directly.
    """

    @task
    @foo_decorator
    def foo():
        print(TASK_NAME)

    # The pattern must stay in sync with the message raised by the library.
    expect_param_error = pytest.raises(
        PyDSParamException, match="Do no support other decorators for.*"
    )
    with ProcessDefinition(PD_NAME), expect_param_error:
        foo()
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version",
    side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
)
def test_multiple_tasks_outside(mock_code):
    """Test two ``@task`` functions declared outside the process definition.

    Both functions are called inside the process definition and chained with
    ``>>``; the test then checks that ``foo`` is recorded as the only upstream
    of ``bar`` and ``bar`` as the only downstream of ``foo``.

    :param mock_code: Mock injected by ``@patch``; not used directly.
    """

    @task
    def foo():
        print(TASK_NAME)

    @task
    def bar():
        print(TASK_NAME)

    with ProcessDefinition(PD_NAME) as pd:
        foo_task = foo()
        bar_task = bar()

        # Evaluated for its side effect of linking the two tasks.
        foo_task >> bar_task

    assert pd is not None
    assert pd.name == PD_NAME
    assert len(pd.tasks) == 2

    task_foo = pd.get_one_task_by_name("foo")
    task_bar = pd.get_one_task_by_name("bar")
    assert task_foo is not None
    assert task_bar is not None
    assert set(pd.task_list) == {task_foo, task_bar}

    # Compare whole sets rather than ``pop()``-ing one element: this leaves the
    # tasks untouched and also fails if an unexpected extra code is present.
    assert task_foo._upstream_task_codes == set()
    assert task_foo._downstream_task_codes == {task_bar.code}
    assert task_bar._upstream_task_codes == {task_foo.code}
    assert task_bar._downstream_task_codes == set()
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version",
    side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
)
def test_multiple_tasks_inside(mock_code):
    """Test two ``@task`` functions declared inside the process definition.

    Both functions are defined and called inside the process definition and
    chained with ``>>``; the test then checks that ``foo`` is recorded as the
    only upstream of ``bar`` and ``bar`` as the only downstream of ``foo``.

    :param mock_code: Mock injected by ``@patch``; not used directly.
    """
    with ProcessDefinition(PD_NAME) as pd:

        @task
        def foo():
            print(TASK_NAME)

        @task
        def bar():
            print(TASK_NAME)

        foo_task = foo()
        bar_task = bar()

        # Evaluated for its side effect of linking the two tasks.
        foo_task >> bar_task

    assert pd is not None
    assert pd.name == PD_NAME
    assert len(pd.tasks) == 2

    task_foo = pd.get_one_task_by_name("foo")
    task_bar = pd.get_one_task_by_name("bar")
    assert task_foo is not None
    assert task_bar is not None
    assert set(pd.task_list) == {task_foo, task_bar}

    # Compare whole sets rather than ``pop()``-ing one element: this leaves the
    # tasks untouched and also fails if an unexpected extra code is present.
    assert task_foo._upstream_task_codes == set()
    assert task_foo._downstream_task_codes == {task_bar.code}
    assert task_bar._upstream_task_codes == {task_foo.code}
    assert task_bar._downstream_task_codes == set()
|