@@ -34,12 +34,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static cn.escheduler.common.Constants.*;
 import static cn.escheduler.common.utils.PropertyUtils.getInt;
 import static cn.escheduler.common.utils.PropertyUtils.getString;
+import static cn.escheduler.common.utils.PropertyUtils.getPrefixedProperties;
 
 /**
  * hadoop utils
@@ -76,7 +78,9 @@ public class HadoopUtils implements Closeable {
             if(defaultFS.startsWith("file")){
                 String defaultFSProp = getString(FS_DEFAULTFS);
                 if(StringUtils.isNotBlank(defaultFSProp)){
+                    Map<String, String> fsRelatedProps = getPrefixedProperties("fs.");
                     configuration.set(FS_DEFAULTFS,defaultFSProp);
+                    fsRelatedProps.entrySet().stream().forEach(entry -> configuration.set(entry.getKey(), entry.getValue()));
                 }else{
                     logger.error("property:{} can not to be empty, please set!");
                     throw new RuntimeException("property:{} can not to be empty, please set!");
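
The two added lines above copy every property whose key starts with "fs." from the scheduler's own configuration into the Hadoop Configuration, so filesystem settings (for example fs.s3a.* options, if an object store is configured) reach the FileSystem without listing each key by hand. getPrefixedProperties comes from cn.escheduler.common.utils.PropertyUtils (see the new static import in the first hunk); its body is not part of the hunks shown here. A minimal sketch of such a prefix filter, assuming PropertyUtils keeps the loaded configuration in a static java.util.Properties field named properties (an assumption, not taken from this diff), could look like this:

    /**
     * Collect all properties whose keys start with the given prefix,
     * e.g. getPrefixedProperties("fs.") -> fs.defaultFS, fs.s3a.*, ...
     * Sketch only: the static "properties" field is assumed, not shown in this diff.
     * Requires java.util.Map and java.util.HashMap imports.
     */
    public static Map<String, String> getPrefixedProperties(String prefix) {
        Map<String, String> matchedProperties = new HashMap<>();
        for (String propName : properties.stringPropertyNames()) {
            if (propName.startsWith(prefix)) {
                matchedProperties.put(propName, properties.getProperty(propName));
            }
        }
        return matchedProperties;
    }
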
@@ -316,7 +320,13 @@ public class HadoopUtils implements Closeable {
      * @return data hdfs path
      */
     public static String getHdfsDataBasePath() {
-        return getString(DATA_STORE_2_HDFS_BASEPATH);
+        String basePath = getString(DATA_STORE_2_HDFS_BASEPATH);
+        if ("/".equals(basePath)) {
+            // if basepath is configured to /, the generated url may be //default/resources (with extra leading /)
+            return "";
+        } else {
+            return basePath;
+        }
     }
 
     /**
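
The new branch keeps a base path of "/" from producing URLs with a doubled leading slash, as the inline comment notes for //default/resources. For illustration (the tenant code "default" is taken from that comment; the concatenation below mirrors how callers combine the base path with a sub-path, it is not a method from this diff):

    // DATA_STORE_2_HDFS_BASEPATH = "/escheduler"  ->  getHdfsDataBasePath() returns "/escheduler"
    // DATA_STORE_2_HDFS_BASEPATH = "/"            ->  getHdfsDataBasePath() returns ""
    String resources = String.format("%s/%s/resources", getHdfsDataBasePath(), "default");
    // with "/" configured this yields "/default/resources" instead of the old "//default/resources"
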
@@ -365,7 +375,7 @@ public class HadoopUtils implements Closeable {
      * @return file directory of tenants on hdfs
      */
     private static String getHdfsTenantDir(String tenantCode) {
-        return String.format("%s/%s", getString(DATA_STORE_2_HDFS_BASEPATH), tenantCode);
+        return String.format("%s/%s", getHdfsDataBasePath(), tenantCode);
     }
 
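
Building the tenant directory on top of getHdfsDataBasePath, instead of reading DATA_STORE_2_HDFS_BASEPATH a second time, keeps the "/" normalization in a single place, and every path composed from the tenant directory inherits it. A small illustration, with assumed property values and an assumed tenant code:

    // basepath "/escheduler", tenant "tenantA" -> getHdfsTenantDir("tenantA") == "/escheduler/tenantA"
    // basepath "/",           tenant "tenantA" -> getHdfsTenantDir("tenantA") == "/tenantA"
    //                                             (before this hunk: "//tenantA")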