|
@@ -170,8 +170,8 @@ def test_process_definition_to_dict_without_task():
|
|
|
assert pd.to_dict() == expect
|
|
|
|
|
|
|
|
|
-def test_process_definition_simple():
|
|
|
- """Test process definition simple create workflow, including process definition, task, relation define."""
|
|
|
+def test_process_definition_simple_context_manager():
|
|
|
+ """Test simple create workflow in process definition context manager mode."""
|
|
|
expect_tasks_num = 5
|
|
|
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
|
|
|
for i in range(expect_tasks_num):
|
|
@@ -211,6 +211,30 @@ def test_process_definition_simple():
|
|
|
}
|
|
|
|
|
|
|
|
|
+def test_process_definition_simple_separate():
|
|
|
+ """Test process definition simple create workflow in separate mode.
|
|
|
+
|
|
|
+ This test just test basic information, cause most of test case is duplicate to
|
|
|
+ test_process_definition_simple_context_manager.
|
|
|
+ """
|
|
|
+ expect_tasks_num = 5
|
|
|
+ pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
|
|
|
+ for i in range(expect_tasks_num):
|
|
|
+ task_params = TaskParams()
|
|
|
+ curr_task = Task(
|
|
|
+ name=f"task-{i}",
|
|
|
+ task_type=f"type-{i}",
|
|
|
+ task_params=task_params,
|
|
|
+ process_definition=pd,
|
|
|
+ )
|
|
|
+
|
|
|
+ if i > 0:
|
|
|
+ pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
|
|
|
+ curr_task.set_upstream(pre_task)
|
|
|
+ assert len(pd.tasks) == expect_tasks_num
|
|
|
+ assert all(["task-" in task.name for task in pd.task_list])
|
|
|
+
|
|
|
+
|
|
|
@pytest.mark.parametrize(
|
|
|
"user_attrs",
|
|
|
[
|