|
@@ -53,6 +53,7 @@ import java.util.List;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
+import lombok.Data;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
import com.amazonaws.AmazonServiceException;
|
|
@@ -78,38 +79,72 @@ import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
|
|
|
import com.google.common.base.Joiner;
|
|
|
|
|
|
@Slf4j
|
|
|
+@Data
|
|
|
public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
|
|
|
- // todo: move to s3
|
|
|
- private static final String ACCESS_KEY_ID = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
|
|
|
+ private String accessKeyId;
|
|
|
|
|
|
- private static final String SECRET_KEY_ID = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
|
|
|
+ private String accessKeySecret;
|
|
|
|
|
|
- private static final String REGION = PropertyUtils.getString(TaskConstants.AWS_REGION);
|
|
|
+ private String region;
|
|
|
|
|
|
- private static final String BUCKET_NAME = PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
|
|
|
+ private String bucketName;
|
|
|
|
|
|
- private final AmazonS3 s3Client;
|
|
|
+ private String endPoint;
|
|
|
+
|
|
|
+ private AmazonS3 s3Client;
|
|
|
|
|
|
public S3StorageOperator() {
|
|
|
- if (!StringUtils.isEmpty(PropertyUtils.getString(AWS_END_POINT))) {
|
|
|
- s3Client = AmazonS3ClientBuilder
|
|
|
+ }
|
|
|
+
|
|
|
+ public void init() {
|
|
|
+ accessKeyId = readAccessKeyID();
|
|
|
+ accessKeySecret = readAccessKeySecret();
|
|
|
+ region = readRegion();
|
|
|
+ bucketName = readBucketName();
|
|
|
+ endPoint = readEndPoint();
|
|
|
+ s3Client = buildS3Client();
|
|
|
+ checkBucketNameExists(bucketName);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected AmazonS3 buildS3Client() {
|
|
|
+ if (!StringUtils.isEmpty(endPoint)) {
|
|
|
+ return AmazonS3ClientBuilder
|
|
|
.standard()
|
|
|
.withPathStyleAccessEnabled(true)
|
|
|
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
|
|
|
- PropertyUtils.getString(AWS_END_POINT), Regions.fromName(REGION).getName()))
|
|
|
+ endPoint, Regions.fromName(region).getName()))
|
|
|
.withCredentials(
|
|
|
- new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
|
|
|
+ new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret)))
|
|
|
.build();
|
|
|
} else {
|
|
|
- s3Client = AmazonS3ClientBuilder
|
|
|
+ return AmazonS3ClientBuilder
|
|
|
.standard()
|
|
|
.withCredentials(
|
|
|
- new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
|
|
|
- .withRegion(Regions.fromName(REGION))
|
|
|
+ new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret)))
|
|
|
+ .withRegion(Regions.fromName(region))
|
|
|
.build();
|
|
|
}
|
|
|
- checkBucketNameExists(BUCKET_NAME);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected String readAccessKeyID() {
|
|
|
+ return PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected String readAccessKeySecret() {
|
|
|
+ return PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected String readRegion() {
|
|
|
+ return PropertyUtils.getString(TaskConstants.AWS_REGION);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected String readBucketName() {
|
|
|
+ return PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected String readEndPoint() {
|
|
|
+ return PropertyUtils.getString(AWS_END_POINT);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -136,11 +171,11 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
@Override
|
|
|
public boolean mkdir(String tenantCode, String path) throws IOException {
|
|
|
String objectName = path + FOLDER_SEPARATOR;
|
|
|
- if (!s3Client.doesObjectExist(BUCKET_NAME, objectName)) {
|
|
|
+ if (!s3Client.doesObjectExist(bucketName, objectName)) {
|
|
|
ObjectMetadata metadata = new ObjectMetadata();
|
|
|
metadata.setContentLength(0);
|
|
|
InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
|
|
|
- PutObjectRequest putObjectRequest = new PutObjectRequest(BUCKET_NAME, objectName, emptyContent, metadata);
|
|
|
+ PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, emptyContent, metadata);
|
|
|
s3Client.putObject(putObjectRequest);
|
|
|
}
|
|
|
return true;
|
|
@@ -191,7 +226,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
} else {
|
|
|
Files.createDirectories(dstFile.getParentFile().toPath());
|
|
|
}
|
|
|
- S3Object o = s3Client.getObject(BUCKET_NAME, srcFilePath);
|
|
|
+ S3Object o = s3Client.getObject(bucketName, srcFilePath);
|
|
|
try (
|
|
|
S3ObjectInputStream s3is = o.getObjectContent();
|
|
|
FileOutputStream fos = new FileOutputStream(dstFilePath)) {
|
|
@@ -210,13 +245,13 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
|
|
|
@Override
|
|
|
public boolean exists(String fullName) throws IOException {
|
|
|
- return s3Client.doesObjectExist(BUCKET_NAME, fullName);
|
|
|
+ return s3Client.doesObjectExist(bucketName, fullName);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public boolean delete(String fullName, boolean recursive) throws IOException {
|
|
|
try {
|
|
|
- s3Client.deleteObject(BUCKET_NAME, fullName);
|
|
|
+ s3Client.deleteObject(bucketName, fullName);
|
|
|
return true;
|
|
|
} catch (AmazonServiceException e) {
|
|
|
log.error("delete the object error,the resource path is {}", fullName);
|
|
@@ -229,7 +264,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
// append the resource fullName to the list for deletion.
|
|
|
childrenPathList.add(fullName);
|
|
|
|
|
|
- DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(BUCKET_NAME)
|
|
|
+ DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName)
|
|
|
.withKeys(childrenPathList.stream().toArray(String[]::new));
|
|
|
try {
|
|
|
s3Client.deleteObjects(deleteObjectsRequest);
|
|
@@ -243,8 +278,8 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
|
|
|
@Override
|
|
|
public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException {
|
|
|
- s3Client.copyObject(BUCKET_NAME, srcPath, BUCKET_NAME, dstPath);
|
|
|
- s3Client.deleteObject(BUCKET_NAME, srcPath);
|
|
|
+ s3Client.copyObject(bucketName, srcPath, bucketName, dstPath);
|
|
|
+ s3Client.deleteObject(bucketName, srcPath);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -265,10 +300,10 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource,
|
|
|
boolean overwrite) throws IOException {
|
|
|
try {
|
|
|
- s3Client.putObject(BUCKET_NAME, dstPath, new File(srcFile));
|
|
|
+ s3Client.putObject(bucketName, dstPath, new File(srcFile));
|
|
|
return true;
|
|
|
} catch (AmazonServiceException e) {
|
|
|
- log.error("upload failed,the bucketName is {},the filePath is {}", BUCKET_NAME, dstPath);
|
|
|
+ log.error("upload failed,the bucketName is {},the filePath is {}", bucketName, dstPath);
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -279,7 +314,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
log.error("file path:{} is blank", filePath);
|
|
|
return Collections.emptyList();
|
|
|
}
|
|
|
- S3Object s3Object = s3Client.getObject(BUCKET_NAME, filePath);
|
|
|
+ S3Object s3Object = s3Client.getObject(bucketName, filePath);
|
|
|
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))) {
|
|
|
Stream<String> stream = bufferedReader.lines().skip(skipLineNums).limit(limit);
|
|
|
return stream.collect(Collectors.toList());
|
|
@@ -297,7 +332,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
* @param tenantCode tenant code
|
|
|
* @return S3 resource dir
|
|
|
*/
|
|
|
- public static String getS3ResDir(String tenantCode) {
|
|
|
+ public String getS3ResDir(String tenantCode) {
|
|
|
return String.format("%s/" + RESOURCE_TYPE_FILE, getS3TenantDir(tenantCode));
|
|
|
}
|
|
|
|
|
@@ -307,7 +342,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
* @param tenantCode tenant code
|
|
|
* @return get udf dir on S3
|
|
|
*/
|
|
|
- public static String getS3UdfDir(String tenantCode) {
|
|
|
+ public String getS3UdfDir(String tenantCode) {
|
|
|
return String.format("%s/" + RESOURCE_TYPE_UDF, getS3TenantDir(tenantCode));
|
|
|
}
|
|
|
|
|
@@ -315,7 +350,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
* @param tenantCode tenant code
|
|
|
* @return file directory of tenants on S3
|
|
|
*/
|
|
|
- public static String getS3TenantDir(String tenantCode) {
|
|
|
+ public String getS3TenantDir(String tenantCode) {
|
|
|
return String.format(FORMAT_S_S, getS3DataBasePath(), tenantCode);
|
|
|
}
|
|
|
|
|
@@ -324,7 +359,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
*
|
|
|
* @return data S3 path
|
|
|
*/
|
|
|
- public static String getS3DataBasePath() {
|
|
|
+ public String getS3DataBasePath() {
|
|
|
if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) {
|
|
|
return "";
|
|
|
} else {
|
|
@@ -332,9 +367,9 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void deleteTenantCode(String tenantCode) {
|
|
|
- deleteDirectory(getResDir(tenantCode));
|
|
|
- deleteDirectory(getUdfDir(tenantCode));
|
|
|
+ protected void deleteTenantCode(String tenantCode) {
|
|
|
+ deleteDir(getResDir(tenantCode));
|
|
|
+ deleteDir(getUdfDir(tenantCode));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -346,7 +381,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
* @param strPath
|
|
|
*/
|
|
|
private void uploadDirectory(String tenantCode, String keyPrefix, String strPath) {
|
|
|
- s3Client.putObject(BUCKET_NAME, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(strPath));
|
|
|
+ s3Client.putObject(bucketName, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(strPath));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -361,10 +396,10 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3Client).build();
|
|
|
try {
|
|
|
MultipleFileDownload download =
|
|
|
- tm.downloadDirectory(BUCKET_NAME, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(srcPath));
|
|
|
+ tm.downloadDirectory(bucketName, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(srcPath));
|
|
|
download.waitForCompletion();
|
|
|
} catch (AmazonS3Exception | InterruptedException e) {
|
|
|
- log.error("download the directory failed with the bucketName is {} and the keyPrefix is {}", BUCKET_NAME,
|
|
|
+ log.error("download the directory failed with the bucketName is {} and the keyPrefix is {}", bucketName,
|
|
|
tenantCode + FOLDER_SEPARATOR + keyPrefix);
|
|
|
Thread.currentThread().interrupt();
|
|
|
} finally {
|
|
@@ -394,9 +429,9 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
/**
|
|
|
* only delete the object of directory ,it`s better to delete the files in it -r
|
|
|
*/
|
|
|
- private void deleteDirectory(String directoryName) {
|
|
|
- if (s3Client.doesObjectExist(BUCKET_NAME, directoryName)) {
|
|
|
- s3Client.deleteObject(BUCKET_NAME, directoryName);
|
|
|
+ protected void deleteDir(String directoryName) {
|
|
|
+ if (s3Client.doesObjectExist(bucketName, directoryName)) {
|
|
|
+ s3Client.deleteObject(bucketName, directoryName);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -446,7 +481,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
|
|
|
// TODO: optimize pagination
|
|
|
ListObjectsV2Request request = new ListObjectsV2Request();
|
|
|
- request.setBucketName(BUCKET_NAME);
|
|
|
+ request.setBucketName(bucketName);
|
|
|
request.setPrefix(path);
|
|
|
request.setDelimiter("/");
|
|
|
|
|
@@ -520,7 +555,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
// Since we still want to access it on frontend, this is a workaround using listObjects.
|
|
|
|
|
|
ListObjectsV2Request request = new ListObjectsV2Request();
|
|
|
- request.setBucketName(BUCKET_NAME);
|
|
|
+ request.setBucketName(bucketName);
|
|
|
request.setPrefix(path);
|
|
|
request.setDelimiter("/");
|
|
|
|
|
@@ -574,7 +609,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- throw new FileNotFoundException("Object is not found in S3 Bucket: " + BUCKET_NAME);
|
|
|
+ throw new FileNotFoundException("Object is not found in S3 Bucket: " + bucketName);
|
|
|
}
|
|
|
|
|
|
/**
|