test_process_definition.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test process definition."""
  18. import pytest
  19. from pydolphinscheduler.constants import (
  20. ProcessDefinitionDefault,
  21. ProcessDefinitionReleaseState,
  22. )
  23. from pydolphinscheduler.core.process_definition import ProcessDefinition
  24. from pydolphinscheduler.core.task import TaskParams
  25. from pydolphinscheduler.side import Tenant, Project, User
  26. from tests.testing.task import Task
# Workflow name passed to ``ProcessDefinition(...)`` by the tests below.
TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
  28. @pytest.mark.parametrize("func", ["run", "submit", "start"])
  29. def test_process_definition_key_attr(func):
  30. """Test process definition have specific functions or attributes."""
  31. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  32. assert hasattr(
  33. pd, func
  34. ), f"ProcessDefinition instance don't have attribute `{func}`"
  35. @pytest.mark.parametrize(
  36. "name,value",
  37. [
  38. ("project", Project(ProcessDefinitionDefault.PROJECT)),
  39. ("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
  40. (
  41. "user",
  42. User(
  43. ProcessDefinitionDefault.USER,
  44. ProcessDefinitionDefault.USER_PWD,
  45. ProcessDefinitionDefault.USER_EMAIL,
  46. ProcessDefinitionDefault.USER_PHONE,
  47. ProcessDefinitionDefault.TENANT,
  48. ProcessDefinitionDefault.QUEUE,
  49. ProcessDefinitionDefault.USER_STATE,
  50. ),
  51. ),
  52. ("worker_group", ProcessDefinitionDefault.WORKER_GROUP),
  53. ("release_state", ProcessDefinitionReleaseState.ONLINE),
  54. ],
  55. )
  56. def test_process_definition_default_value(name, value):
  57. """Test process definition default attributes."""
  58. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  59. assert getattr(pd, name) == value, (
  60. f"ProcessDefinition instance attribute `{name}` not with "
  61. f"except default value `{getattr(pd, name)}`"
  62. )
  63. @pytest.mark.parametrize(
  64. "name,cls,expect",
  65. [
  66. ("project", Project, "project"),
  67. ("tenant", Tenant, "tenant"),
  68. ("worker_group", str, "worker_group"),
  69. ],
  70. )
  71. def test_process_definition_set_attr(name, cls, expect):
  72. """Test process definition set specific attributes."""
  73. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  74. setattr(pd, name, cls(expect))
  75. assert getattr(pd, name) == cls(
  76. expect
  77. ), f"ProcessDefinition set attribute `{name}` do not work expect"
  78. def test_process_definition_to_dict_without_task():
  79. """Test process definition function to_dict without task."""
  80. expect = {
  81. "name": TEST_PROCESS_DEFINITION_NAME,
  82. "description": None,
  83. "project": ProcessDefinitionDefault.PROJECT,
  84. "tenant": ProcessDefinitionDefault.TENANT,
  85. "workerGroup": ProcessDefinitionDefault.WORKER_GROUP,
  86. "timeout": 0,
  87. "releaseState": ProcessDefinitionReleaseState.ONLINE,
  88. "param": None,
  89. "tasks": {},
  90. "taskDefinitionJson": [{}],
  91. "taskRelationJson": [{}],
  92. }
  93. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  94. assert pd.to_dict() == expect
  95. def test_process_definition_simple():
  96. """Test process definition simple create workflow, including process definition, task, relation define."""
  97. expect_tasks_num = 5
  98. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  99. for i in range(expect_tasks_num):
  100. task_params = TaskParams(raw_script=f"test-raw-script-{i}")
  101. curr_task = Task(
  102. name=f"task-{i}", task_type=f"type-{i}", task_params=task_params
  103. )
  104. # Set deps task i as i-1 parent
  105. if i > 0:
  106. pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
  107. curr_task.set_upstream(pre_task)
  108. assert len(pd.tasks) == expect_tasks_num
  109. # Test if task process_definition same as origin one
  110. task: Task = pd.get_one_task_by_name("task-0")
  111. assert pd is task.process_definition
  112. # Test if all tasks with expect deps
  113. for i in range(expect_tasks_num):
  114. task: Task = pd.get_one_task_by_name(f"task-{i}")
  115. if i == 0:
  116. assert task._upstream_task_codes == set()
  117. assert task._downstream_task_codes == {
  118. pd.get_one_task_by_name("task-1").code
  119. }
  120. elif i == expect_tasks_num - 1:
  121. assert task._upstream_task_codes == {
  122. pd.get_one_task_by_name(f"task-{i - 1}").code
  123. }
  124. assert task._downstream_task_codes == set()
  125. else:
  126. assert task._upstream_task_codes == {
  127. pd.get_one_task_by_name(f"task-{i - 1}").code
  128. }
  129. assert task._downstream_task_codes == {
  130. pd.get_one_task_by_name(f"task-{i + 1}").code
  131. }
  132. @pytest.mark.parametrize(
  133. "user_attrs",
  134. [
  135. {"tenant": "tenant_specific"},
  136. {"queue": "queue_specific"},
  137. {"tenant": "tenant_specific", "queue": "queue_specific"},
  138. ],
  139. )
  140. def test_set_process_definition_user_attr(user_attrs):
  141. """Test user with correct attributes if we specific assigned to process definition object."""
  142. default_value = {
  143. "tenant": ProcessDefinitionDefault.TENANT,
  144. "queue": ProcessDefinitionDefault.QUEUE,
  145. }
  146. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
  147. user = pd.user
  148. for attr in default_value:
  149. # Get assigned attribute if we specific, else get default value
  150. except_attr = (
  151. user_attrs[attr] if attr in user_attrs else default_value[attr]
  152. )
  153. # Get actually attribute of user object
  154. actual_attr = getattr(user, attr)
  155. assert (
  156. except_attr == actual_attr
  157. ), f"Except attribute is {except_attr} but get {actual_attr}"