@@ -29,7 +29,48 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.tasks.datax import CustomDataX, DataX
 
 # datax json template
-JSON_TEMPLATE = ""
+JSON_TEMPLATE = {
+    "job": {
+        "content": [
+            {
+                "reader": {
+                    "name": "mysqlreader",
+                    "parameter": {
+                        "username": "usr",
+                        "password": "pwd",
+                        "column": ["id", "name", "code", "description"],
+                        "splitPk": "id",
+                        "connection": [
+                            {
+                                "table": ["source_table"],
+                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
+                            }
+                        ],
+                    },
+                },
+                "writer": {
+                    "name": "mysqlwriter",
+                    "parameter": {
+                        "writeMode": "insert",
+                        "username": "usr",
+                        "password": "pwd",
+                        "column": ["id", "name"],
+                        "connection": [
+                            {
+                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
+                                "table": ["target_table"],
+                            }
+                        ],
+                    },
+                },
+            }
+        ],
+        "setting": {
+            "errorLimit": {"percentage": 0, "record": 0},
+            "speed": {"channel": 1, "record": 1000},
+        },
+    }
+}
 
 with ProcessDefinition(
     name="task_datax_example",
@@ -37,6 +78,8 @@ with ProcessDefinition(
 ) as pd:
     # This task synchronizes the data in `t_ds_project`
     # of `first_mysql` database to `target_project` of `second_mysql` database.
+    # You have to make sure data sources named `first_mysql` and `second_mysql`
+    # exist in your environment.
     task1 = DataX(
         name="task_datax",
         datasource_name="first_mysql",
@@ -45,6 +88,7 @@ with ProcessDefinition(
         target_table="target_table",
     )
 
-    # you can custom json_template of datax to sync data.
-    task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE)
+    # You can customize the json_template of DataX to sync data. This task creates a
+    # new DataX job, same as task1, transferring records from `first_mysql` to `second_mysql`.
+    task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
     pd.run()
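
Note on the CustomDataX example above: `str(JSON_TEMPLATE)` serializes the dict with Python's repr, which uses single quotes, while strict JSON requires double quotes. If the DataX engine in your environment insists on strict JSON, the standard-library `json.dumps` is a drop-in alternative. A minimal sketch under that assumption, reusing the names from the example (this snippet is illustrative and not part of the patch itself):

import json

from pydolphinscheduler.tasks.datax import CustomDataX

# json.dumps emits strict, double-quoted JSON, unlike str() on a dict.
task2 = CustomDataX(name="task_custom_datax", json=json.dumps(JSON_TEMPLATE))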