|
@@ -18,6 +18,7 @@ package cn.escheduler.common.utils;
|
|
|
|
|
|
import cn.escheduler.common.Constants;
|
|
|
import cn.escheduler.common.enums.ExecutionStatus;
|
|
|
+import cn.escheduler.common.enums.ResUploadType;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONException;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
@@ -40,6 +41,7 @@ import java.util.stream.Stream;
|
|
|
|
|
|
import static cn.escheduler.common.Constants.*;
|
|
|
import static cn.escheduler.common.utils.PropertyUtils.*;
|
|
|
+import static cn.escheduler.common.utils.PropertyUtils.getString;
|
|
|
|
|
|
/**
|
|
|
* hadoop utils
|
|
@@ -94,48 +96,61 @@ public class HadoopUtils implements Closeable {
|
|
|
try {
|
|
|
configuration = new Configuration();
|
|
|
|
|
|
- if (getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)){
|
|
|
- System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
|
|
|
- getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
|
|
|
- configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
|
|
|
- UserGroupInformation.setConfiguration(configuration);
|
|
|
- UserGroupInformation.loginUserFromKeytab(getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
|
|
|
- getString(Constants.LOGIN_USER_KEY_TAB_PATH));
|
|
|
- }
|
|
|
+ String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
|
|
|
+ ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
|
|
|
+
|
|
|
+ if (resUploadType == ResUploadType.HDFS){
|
|
|
+ if (getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)){
|
|
|
+ System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
|
|
|
+ getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
|
|
|
+ configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
|
|
|
+ UserGroupInformation.setConfiguration(configuration);
|
|
|
+ UserGroupInformation.loginUserFromKeytab(getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
|
|
|
+ getString(Constants.LOGIN_USER_KEY_TAB_PATH));
|
|
|
+ }
|
|
|
|
|
|
- String defaultFS = configuration.get(FS_DEFAULTFS);
|
|
|
- //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
|
|
|
- // the default is the local file system
|
|
|
- if(defaultFS.startsWith("file")){
|
|
|
- String defaultFSProp = getString(FS_DEFAULTFS);
|
|
|
- if(StringUtils.isNotBlank(defaultFSProp)){
|
|
|
- Map<String, String> fsRelatedProps = getPrefixedProperties("fs.");
|
|
|
- configuration.set(FS_DEFAULTFS,defaultFSProp);
|
|
|
- fsRelatedProps.entrySet().stream().forEach(entry -> configuration.set(entry.getKey(), entry.getValue()));
|
|
|
+ String defaultFS = configuration.get(FS_DEFAULTFS);
|
|
|
+ //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
|
|
|
+ // the default is the local file system
|
|
|
+ if(defaultFS.startsWith("file")){
|
|
|
+ String defaultFSProp = getString(FS_DEFAULTFS);
|
|
|
+ if(StringUtils.isNotBlank(defaultFSProp)){
|
|
|
+ Map<String, String> fsRelatedProps = getPrefixedProperties("fs.");
|
|
|
+ configuration.set(FS_DEFAULTFS,defaultFSProp);
|
|
|
+ fsRelatedProps.entrySet().stream().forEach(entry -> configuration.set(entry.getKey(), entry.getValue()));
|
|
|
+ }else{
|
|
|
+ logger.error("property:{} can not to be empty, please set!");
|
|
|
+ throw new RuntimeException("property:{} can not to be empty, please set!");
|
|
|
+ }
|
|
|
}else{
|
|
|
- logger.error("property:{} can not to be empty, please set!");
|
|
|
- throw new RuntimeException("property:{} can not to be empty, please set!");
|
|
|
+ logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", FS_DEFAULTFS, defaultFS);
|
|
|
}
|
|
|
- }else{
|
|
|
- logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", FS_DEFAULTFS, defaultFS);
|
|
|
- }
|
|
|
|
|
|
- if (fs == null) {
|
|
|
- if(StringUtils.isNotEmpty(hdfsUser)){
|
|
|
- //UserGroupInformation ugi = UserGroupInformation.createProxyUser(hdfsUser,UserGroupInformation.getLoginUser());
|
|
|
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
- @Override
|
|
|
- public Boolean run() throws Exception {
|
|
|
- fs = FileSystem.get(configuration);
|
|
|
- return true;
|
|
|
- }
|
|
|
- });
|
|
|
- }else{
|
|
|
- logger.warn("hdfs.root.user is not set value!");
|
|
|
- fs = FileSystem.get(configuration);
|
|
|
+ if (fs == null) {
|
|
|
+ if(StringUtils.isNotEmpty(hdfsUser)){
|
|
|
+ //UserGroupInformation ugi = UserGroupInformation.createProxyUser(hdfsUser,UserGroupInformation.getLoginUser());
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean run() throws Exception {
|
|
|
+ fs = FileSystem.get(configuration);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }else{
|
|
|
+ logger.warn("hdfs.root.user is not set value!");
|
|
|
+ fs = FileSystem.get(configuration);
|
|
|
+ }
|
|
|
}
|
|
|
+ }else if (resUploadType == ResUploadType.S3){
|
|
|
+ configuration.set(FS_DEFAULTFS,getString(FS_DEFAULTFS));
|
|
|
+ configuration.set(FS_S3A_ENDPOINT,getString(FS_S3A_ENDPOINT));
|
|
|
+ configuration.set(FS_S3A_ACCESS_KEY,getString(FS_S3A_ACCESS_KEY));
|
|
|
+ configuration.set(FS_S3A_SECRET_KEY,getString(FS_S3A_SECRET_KEY));
|
|
|
+ fs = FileSystem.get(configuration);
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
String rmHaIds = getString(YARN_RESOURCEMANAGER_HA_RM_IDS);
|
|
|
String appAddress = getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
|
|
|
if (!StringUtils.isEmpty(rmHaIds)) {
|