test_process_definition.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Test process definition."""
  18. from datetime import datetime
  19. from typing import Any
  20. from pydolphinscheduler.utils.date import conv_to_schedule
  21. import pytest
  22. from pydolphinscheduler.constants import (
  23. ProcessDefinitionDefault,
  24. ProcessDefinitionReleaseState,
  25. )
  26. from pydolphinscheduler.core.process_definition import ProcessDefinition
  27. from pydolphinscheduler.core.task import TaskParams
  28. from pydolphinscheduler.side import Tenant, Project, User
  29. from tests.testing.task import Task
  30. from freezegun import freeze_time
# Name passed to every ProcessDefinition created by the tests in this module.
TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
  32. @pytest.mark.parametrize("func", ["run", "submit", "start"])
  33. def test_process_definition_key_attr(func):
  34. """Test process definition have specific functions or attributes."""
  35. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  36. assert hasattr(
  37. pd, func
  38. ), f"ProcessDefinition instance don't have attribute `{func}`"
  39. @pytest.mark.parametrize(
  40. "name,value",
  41. [
  42. ("timezone", ProcessDefinitionDefault.TIME_ZONE),
  43. ("project", Project(ProcessDefinitionDefault.PROJECT)),
  44. ("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
  45. (
  46. "user",
  47. User(
  48. ProcessDefinitionDefault.USER,
  49. ProcessDefinitionDefault.USER_PWD,
  50. ProcessDefinitionDefault.USER_EMAIL,
  51. ProcessDefinitionDefault.USER_PHONE,
  52. ProcessDefinitionDefault.TENANT,
  53. ProcessDefinitionDefault.QUEUE,
  54. ProcessDefinitionDefault.USER_STATE,
  55. ),
  56. ),
  57. ("worker_group", ProcessDefinitionDefault.WORKER_GROUP),
  58. ("release_state", ProcessDefinitionReleaseState.ONLINE),
  59. ],
  60. )
  61. def test_process_definition_default_value(name, value):
  62. """Test process definition default attributes."""
  63. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  64. assert getattr(pd, name) == value, (
  65. f"ProcessDefinition instance attribute `{name}` not with "
  66. f"except default value `{getattr(pd, name)}`"
  67. )
  68. @pytest.mark.parametrize(
  69. "name,cls,expect",
  70. [
  71. ("name", str, "name"),
  72. ("description", str, "description"),
  73. ("schedule", str, "schedule"),
  74. ("timezone", str, "timezone"),
  75. ("worker_group", str, "worker_group"),
  76. ("timeout", int, 1),
  77. ("release_state", str, "OFFLINE"),
  78. ("param", dict, {"key": "value"}),
  79. ],
  80. )
  81. def test_set_attr(name, cls, expect):
  82. """Test process definition set attributes which get with same type."""
  83. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  84. setattr(pd, name, expect)
  85. assert (
  86. getattr(pd, name) == expect
  87. ), f"ProcessDefinition set attribute `{name}` do not work expect"
  88. @pytest.mark.parametrize(
  89. "set_attr,set_val,get_attr,get_val",
  90. [
  91. ("_project", "project", "project", Project("project")),
  92. ("_tenant", "tenant", "tenant", Tenant("tenant")),
  93. ("_start_time", "2021-01-01", "start_time", datetime(2021, 1, 1)),
  94. ("_end_time", "2021-01-01", "end_time", datetime(2021, 1, 1)),
  95. ],
  96. )
  97. def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
  98. """Test process definition set attributes which get with different type."""
  99. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  100. setattr(pd, set_attr, set_val)
  101. assert get_val == getattr(
  102. pd, get_attr
  103. ), f"Set attribute {set_attr} can not get back with {get_val}."
  104. @pytest.mark.parametrize(
  105. "val,expect",
  106. [
  107. (datetime(2021, 1, 1), datetime(2021, 1, 1)),
  108. (None, None),
  109. ("2021-01-01", datetime(2021, 1, 1)),
  110. ("2021-01-01 01:01:01", datetime(2021, 1, 1, 1, 1, 1)),
  111. ],
  112. )
  113. def test__parse_datetime(val, expect):
  114. """Test process definition function _parse_datetime.
  115. Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
  116. """
  117. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  118. assert expect == pd._parse_datetime(
  119. val
  120. ), f"Function _parse_datetime with unexpect value by {val}."
  121. @pytest.mark.parametrize(
  122. "val",
  123. [
  124. 20210101,
  125. (2021, 1, 1),
  126. {"year": "2021", "month": "1", "day": 1},
  127. ],
  128. )
  129. def test__parse_datetime_not_support_type(val: Any):
  130. """Test process definition function _parse_datetime not support type error."""
  131. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  132. with pytest.raises(ValueError):
  133. pd._parse_datetime(val)
  134. def test_process_definition_to_dict_without_task():
  135. """Test process definition function to_dict without task."""
  136. expect = {
  137. "name": TEST_PROCESS_DEFINITION_NAME,
  138. "description": None,
  139. "project": ProcessDefinitionDefault.PROJECT,
  140. "tenant": ProcessDefinitionDefault.TENANT,
  141. "workerGroup": ProcessDefinitionDefault.WORKER_GROUP,
  142. "timeout": 0,
  143. "releaseState": ProcessDefinitionReleaseState.ONLINE,
  144. "param": None,
  145. "tasks": {},
  146. "taskDefinitionJson": [{}],
  147. "taskRelationJson": [{}],
  148. }
  149. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  150. assert pd.to_dict() == expect
  151. def test_process_definition_simple():
  152. """Test process definition simple create workflow, including process definition, task, relation define."""
  153. expect_tasks_num = 5
  154. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
  155. for i in range(expect_tasks_num):
  156. task_params = TaskParams(raw_script=f"test-raw-script-{i}")
  157. curr_task = Task(
  158. name=f"task-{i}", task_type=f"type-{i}", task_params=task_params
  159. )
  160. # Set deps task i as i-1 parent
  161. if i > 0:
  162. pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
  163. curr_task.set_upstream(pre_task)
  164. assert len(pd.tasks) == expect_tasks_num
  165. # Test if task process_definition same as origin one
  166. task: Task = pd.get_one_task_by_name("task-0")
  167. assert pd is task.process_definition
  168. # Test if all tasks with expect deps
  169. for i in range(expect_tasks_num):
  170. task: Task = pd.get_one_task_by_name(f"task-{i}")
  171. if i == 0:
  172. assert task._upstream_task_codes == set()
  173. assert task._downstream_task_codes == {
  174. pd.get_one_task_by_name("task-1").code
  175. }
  176. elif i == expect_tasks_num - 1:
  177. assert task._upstream_task_codes == {
  178. pd.get_one_task_by_name(f"task-{i - 1}").code
  179. }
  180. assert task._downstream_task_codes == set()
  181. else:
  182. assert task._upstream_task_codes == {
  183. pd.get_one_task_by_name(f"task-{i - 1}").code
  184. }
  185. assert task._downstream_task_codes == {
  186. pd.get_one_task_by_name(f"task-{i + 1}").code
  187. }
  188. @pytest.mark.parametrize(
  189. "user_attrs",
  190. [
  191. {"tenant": "tenant_specific"},
  192. {"queue": "queue_specific"},
  193. {"tenant": "tenant_specific", "queue": "queue_specific"},
  194. ],
  195. )
  196. def test_set_process_definition_user_attr(user_attrs):
  197. """Test user with correct attributes if we specific assigned to process definition object."""
  198. default_value = {
  199. "tenant": ProcessDefinitionDefault.TENANT,
  200. "queue": ProcessDefinitionDefault.QUEUE,
  201. }
  202. with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
  203. user = pd.user
  204. for attr in default_value:
  205. # Get assigned attribute if we specific, else get default value
  206. except_attr = (
  207. user_attrs[attr] if attr in user_attrs else default_value[attr]
  208. )
  209. # Get actually attribute of user object
  210. actual_attr = getattr(user, attr)
  211. assert (
  212. except_attr == actual_attr
  213. ), f"Except attribute is {except_attr} but get {actual_attr}"
  214. def test_schedule_json_none_schedule():
  215. """Test function schedule_json with None as schedule."""
  216. with ProcessDefinition(
  217. TEST_PROCESS_DEFINITION_NAME,
  218. schedule=None,
  219. ) as pd:
  220. assert pd.schedule_json is None
  221. # We freeze time here, because we test start_time with None, and if will get datetime.datetime.now. If we do
  222. # not freeze time, it will cause flaky test here.
  223. @freeze_time("2021-01-01")
  224. @pytest.mark.parametrize(
  225. "start_time,end_time,expect_date",
  226. [
  227. (
  228. "20210101",
  229. "20210201",
  230. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  231. ),
  232. (
  233. "2021-01-01",
  234. "2021-02-01",
  235. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  236. ),
  237. (
  238. "2021/01/01",
  239. "2021/02/01",
  240. {"start_time": "2021-01-01 00:00:00", "end_time": "2021-02-01 00:00:00"},
  241. ),
  242. # Test mix pattern
  243. (
  244. "2021/01/01 01:01:01",
  245. "2021-02-02 02:02:02",
  246. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  247. ),
  248. (
  249. "2021/01/01 01:01:01",
  250. "20210202 020202",
  251. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  252. ),
  253. (
  254. "20210101 010101",
  255. "2021-02-02 02:02:02",
  256. {"start_time": "2021-01-01 01:01:01", "end_time": "2021-02-02 02:02:02"},
  257. ),
  258. # Test None value
  259. (
  260. "2021/01/01 01:02:03",
  261. None,
  262. {"start_time": "2021-01-01 01:02:03", "end_time": "9999-12-31 23:59:59"},
  263. ),
  264. (
  265. None,
  266. None,
  267. {
  268. "start_time": conv_to_schedule(datetime(2021, 1, 1)),
  269. "end_time": "9999-12-31 23:59:59",
  270. },
  271. ),
  272. ],
  273. )
  274. def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
  275. """Test function schedule_json about handle start_time and end_time.
  276. Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
  277. """
  278. schedule = "0 0 0 * * ? *"
  279. expect = {
  280. "crontab": schedule,
  281. "startTime": expect_date["start_time"],
  282. "endTime": expect_date["end_time"],
  283. "timezoneId": ProcessDefinitionDefault.TIME_ZONE,
  284. }
  285. with ProcessDefinition(
  286. TEST_PROCESS_DEFINITION_NAME,
  287. schedule=schedule,
  288. start_time=start_time,
  289. end_time=end_time,
  290. timezone=ProcessDefinitionDefault.TIME_ZONE,
  291. ) as pd:
  292. assert pd.schedule_json == expect