1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- """Test process definition in integration."""
- from typing import Dict
- import pytest
- from pydolphinscheduler.core.process_definition import ProcessDefinition
- from pydolphinscheduler.tasks.shell import Shell
- PROCESS_DEFINITION_NAME = "test_change_exists_attr_pd"
- TASK_NAME = f"task_{PROCESS_DEFINITION_NAME}"
- @pytest.mark.parametrize(
- "pre, post",
- [
- (
- {
- "user": "pre_user",
- },
- {
- "user": "post_user",
- },
- )
- ],
- )
- def test_change_process_definition_attr(pre: Dict, post: Dict):
- """Test whether process definition success when specific attribute change."""
- assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
- for attrs in [pre, post]:
- with ProcessDefinition(name=PROCESS_DEFINITION_NAME, **attrs) as pd:
- Shell(name=TASK_NAME, command="echo 1")
- pd.submit()
|