|
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
|
|
|
-import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
|
|
|
import org.apache.dolphinscheduler.plugin.task.python.PythonTask;
|
|
@@ -42,9 +41,8 @@ import com.google.common.base.Preconditions;
|
|
|
public class OpenmldbTask extends PythonTask {
|
|
|
|
|
|
/**
|
|
|
- * openmldb parameters
|
|
|
+ * openmldb parameters: cast pythonParameters to OpenmldbParameters
|
|
|
*/
|
|
|
- private OpenmldbParameters openmldbParameters;
|
|
|
|
|
|
/**
|
|
|
* python process(openmldb only supports version 3 by default)
|
|
@@ -63,11 +61,10 @@ public class OpenmldbTask extends PythonTask {
|
|
|
|
|
|
@Override
|
|
|
public void init() {
|
|
|
+ pythonParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), OpenmldbParameters.class);
|
|
|
|
|
|
- openmldbParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), OpenmldbParameters.class);
|
|
|
-
|
|
|
- logger.info("Initialize openmldb task params {}", JSONUtils.toPrettyJsonString(openmldbParameters));
|
|
|
- if (openmldbParameters == null || !openmldbParameters.checkParameters()) {
|
|
|
+ logger.info("Initialize openmldb task params {}", JSONUtils.toPrettyJsonString(pythonParameters));
|
|
|
+ if (pythonParameters == null || !pythonParameters.checkParameters()) {
|
|
|
throw new TaskException("openmldb task params is not valid");
|
|
|
}
|
|
|
}
|
|
@@ -78,11 +75,6 @@ public class OpenmldbTask extends PythonTask {
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public AbstractParameters getParameters() {
|
|
|
- return openmldbParameters;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* build python command file path
|
|
|
*
|
|
@@ -100,6 +92,7 @@ public class OpenmldbTask extends PythonTask {
|
|
|
*/
|
|
|
@Override
|
|
|
protected String buildPythonScriptContent() {
|
|
|
+ OpenmldbParameters openmldbParameters = (OpenmldbParameters) pythonParameters;
|
|
|
logger.info("raw sql script : {}", openmldbParameters.getSql());
|
|
|
|
|
|
String rawSQLScript = openmldbParameters.getSql().replaceAll("[\\r]?\\n", "\n");
|
|
@@ -117,6 +110,7 @@ public class OpenmldbTask extends PythonTask {
|
|
|
StringBuilder builder = new StringBuilder("import openmldb\nimport sqlalchemy as db\n");
|
|
|
|
|
|
// connect to openmldb
|
|
|
+ OpenmldbParameters openmldbParameters = (OpenmldbParameters) pythonParameters;
|
|
|
builder.append(String.format("engine = db.create_engine('openmldb:///?zk=%s&zkPath=%s')\n",
|
|
|
openmldbParameters.getZk(), openmldbParameters.getZkPath()));
|
|
|
builder.append("con = engine.connect()\n");
|