Skip to content

Commit 6bc388f

Browse files
authored
Enhance the functionality of s3filesystem (#5208)
* 1. Enhance S3FileSystem to support multipart uploads. 2. Support using S3 storage for BML materials and workspaces.
* Format code.
1 parent 2d0b2ad commit 6bc388f

File tree

9 files changed

+558
-85
lines changed

9 files changed

+558
-85
lines changed

linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java

Lines changed: 111 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
import org.apache.linkis.storage.domain.FsPathListWithError;
2222
import org.apache.linkis.storage.exception.StorageWarnException;
2323
import org.apache.linkis.storage.fs.FileSystem;
24+
import org.apache.linkis.storage.fs.stream.S3OutputStream;
2425
import org.apache.linkis.storage.utils.StorageConfiguration;
2526
import org.apache.linkis.storage.utils.StorageUtils;
2627

2728
import org.apache.commons.io.IOUtils;
2829
import org.apache.commons.lang3.StringUtils;
2930

30-
import java.io.*;
31+
import java.io.File;
32+
import java.io.IOException;
33+
import java.io.InputStream;
34+
import java.io.OutputStream;
3135
import java.util.ArrayList;
3236
import java.util.List;
3337
import java.util.Map;
@@ -99,6 +103,7 @@ public String rootUserName() {
99103
public FsPath get(String dest) throws IOException {
100104
FsPath ret = new FsPath(dest);
101105
if (exists(ret)) {
106+
ret.setIsdir(isDir(buildKey(ret.getPath())));
102107
return ret;
103108
} else {
104109
logger.warn("File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)");
@@ -111,21 +116,31 @@ public FsPath get(String dest) throws IOException {
111116
@Override
112117
public InputStream read(FsPath dest) throws IOException {
113118
try {
114-
return s3Client.getObject(bucket, buildPrefix(dest.getPath(), false)).getObjectContent();
119+
return s3Client.getObject(bucket, buildKey(dest.getPath())).getObjectContent();
115120
} catch (AmazonS3Exception e) {
116121
throw new IOException("You have not permission to access path " + dest.getPath());
117122
}
118123
}
119124

120125
@Override
121126
public OutputStream write(FsPath dest, boolean overwrite) throws IOException {
122-
try (InputStream inputStream = read(dest);
123-
OutputStream outputStream =
124-
new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), false))) {
127+
InputStream inputStream = null;
128+
try {
129+
if (!exists(dest)) {
130+
create(dest.getPath());
131+
}
132+
133+
OutputStream outputStream = new S3OutputStream(s3Client, bucket, buildKey(dest.getPath()));
134+
125135
if (!overwrite) {
136+
inputStream = read(dest);
126137
IOUtils.copy(inputStream, outputStream);
127138
}
128139
return outputStream;
140+
} catch (IOException e) {
141+
throw new IOException("You have not permission to access path " + dest.getPath());
142+
} finally {
143+
IOUtils.closeQuietly(inputStream);
129144
}
130145
}
131146

@@ -134,24 +149,39 @@ public boolean create(String dest) throws IOException {
134149
if (exists(new FsPath(dest))) {
135150
return false;
136151
}
137-
s3Client.putObject(bucket, dest, "");
152+
s3Client.putObject(bucket, buildKey(dest), "");
138153
return true;
139154
}
140155

141156
@Override
142157
public List<FsPath> list(FsPath path) throws IOException {
143158
try {
144159
if (!StringUtils.isEmpty(path.getPath())) {
145-
ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(bucket, path.getPath());
146-
List<S3ObjectSummary> s3ObjectSummaries = listObjectsV2Result.getObjectSummaries();
147-
return s3ObjectSummaries.stream()
148-
.filter(summary -> !isInitFile(summary))
149-
.map(
150-
summary -> {
151-
FsPath newPath = new FsPath(buildPath(summary.getKey()));
152-
return fillStorageFile(newPath, summary);
153-
})
154-
.collect(Collectors.toList());
160+
ListObjectsV2Request listObjectsV2Request =
161+
new ListObjectsV2Request()
162+
.withBucketName(bucket)
163+
.withPrefix(buildKey(path.getPath()) + "/")
164+
.withDelimiter("/");
165+
ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
166+
List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
167+
List<String> commonPrefixes = dirResult.getCommonPrefixes();
168+
List<FsPath> fsPaths =
169+
s3ObjectSummaries.stream()
170+
.filter(summary -> !isInitFile(summary))
171+
.map(
172+
summary -> {
173+
FsPath newPath = new FsPath(buildPath(summary.getKey()));
174+
return fillStorageFile(newPath, summary);
175+
})
176+
.collect(Collectors.toList());
177+
if (commonPrefixes != null) {
178+
for (String dir : commonPrefixes) {
179+
FsPath newPath = new FsPath(buildPath(dir));
180+
newPath.setIsdir(true);
181+
fsPaths.add(newPath);
182+
}
183+
}
184+
return fsPaths;
155185
}
156186
} catch (AmazonS3Exception e) {
157187
throw new IOException("You have not permission to access path " + path.getPath());
@@ -173,7 +203,7 @@ public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile
173203
ListObjectsV2Request listObjectsV2Request =
174204
new ListObjectsV2Request()
175205
.withBucketName(bucket)
176-
.withPrefix(buildPrefix(path.getPath()))
206+
.withPrefix(buildKey(path.getPath()) + "/")
177207
.withDelimiter("/");
178208
ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
179209
List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
@@ -204,25 +234,15 @@ public FsPathListWithError listPathWithError(FsPath path, boolean ignoreInitFile
204234
@Override
205235
public boolean exists(FsPath dest) throws IOException {
206236
try {
207-
if (new File(dest.getPath()).getName().contains(".")) {
208-
return existsFile(dest);
237+
if (dest == null) {
238+
return false;
209239
}
210240
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
211241
listObjectsV2Request
212242
.withBucketName(bucket)
213-
.withPrefix(buildPrefix(dest.getPath()))
214-
.withDelimiter("/");
215-
return s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size()
216-
+ s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size()
217-
> 0;
218-
} catch (AmazonS3Exception e) {
219-
return false;
220-
}
221-
}
222-
223-
public boolean existsFile(FsPath dest) {
224-
try {
225-
return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), false));
243+
.withPrefix(buildKey(dest.getPath()))
244+
.withMaxKeys(1);
245+
return !s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty();
226246
} catch (AmazonS3Exception e) {
227247
return false;
228248
}
@@ -231,25 +251,41 @@ public boolean existsFile(FsPath dest) {
231251
@Override
232252
public boolean delete(FsPath dest) throws IOException {
233253
try {
234-
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
235-
listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(), false));
236-
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
237-
String[] keyList =
238-
result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
239-
DeleteObjectsRequest deleteObjectsRequest =
240-
new DeleteObjectsRequest("test").withKeys(keyList);
241-
s3Client.deleteObjects(deleteObjectsRequest);
254+
List<String> deleteKeys = new ArrayList<>();
255+
delete(dest, deleteKeys);
256+
if (!deleteKeys.isEmpty()) {
257+
DeleteObjectsRequest deleteObjectsRequest =
258+
new DeleteObjectsRequest(bucket).withKeys(deleteKeys.toArray(new String[0]));
259+
s3Client.deleteObjects(deleteObjectsRequest);
260+
}
242261
return true;
243262
} catch (AmazonS3Exception e) {
244263
throw new IOException("You have not permission to access path " + dest.getPath());
245264
}
246265
}
247266

267+
public void delete(FsPath dest, List<String> keys) throws IOException {
268+
if (isDir(buildKey(dest.getPath()))) {
269+
FsPathListWithError fsPathListWithError = listPathWithError(dest, false);
270+
List<FsPath> fsPaths = fsPathListWithError.getFsPaths();
271+
fsPaths.forEach(
272+
fsPath -> {
273+
try {
274+
delete(fsPath, keys);
275+
} catch (IOException e) {
276+
throw new RuntimeException(e);
277+
}
278+
});
279+
} else {
280+
keys.add(buildKey(dest.getPath()));
281+
}
282+
}
283+
248284
@Override
249285
public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
250286
try {
251-
String newOriginPath = buildPrefix(oldDest.getPath(), false);
252-
String newDestPath = buildPrefix(newDest.getPath(), false);
287+
String newOriginPath = buildKey(oldDest.getPath());
288+
String newDestPath = buildKey(newDest.getPath());
253289
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
254290
listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath);
255291
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
@@ -281,8 +317,8 @@ public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
281317
@Override
282318
public boolean copy(String origin, String dest) throws IOException {
283319
try {
284-
String newOrigin = buildPrefix(origin, false);
285-
String newDest = buildPrefix(dest, false);
320+
String newOrigin = buildKey(origin);
321+
String newDest = buildKey(dest);
286322
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
287323
listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin);
288324
ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
@@ -305,8 +341,16 @@ public boolean copy(String origin, String dest) throws IOException {
305341
}
306342
}
307343

308-
private boolean isDir(S3ObjectSummary s3ObjectSummary, String prefix) {
309-
return s3ObjectSummary.getKey().substring(prefix.length()).contains("/");
344+
private boolean isDir(String key) {
345+
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
346+
listObjectsV2Request
347+
.withBucketName(bucket)
348+
.withPrefix(key + "/")
349+
.withDelimiter("/")
350+
.withMaxKeys(1);
351+
352+
return !(s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().isEmpty()
353+
&& s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty());
310354
}
311355

312356
private boolean isInitFile(S3ObjectSummary s3ObjectSummary) {
@@ -318,6 +362,13 @@ public String listRoot() {
318362
return "/";
319363
}
320364

365+
/**
366+
* S3 has no concept of directories, so a directory cannot be created directly.
367+
*
368+
* @param dest
369+
* @return
370+
* @throws IOException
371+
*/
321372
@Override
322373
public boolean mkdir(FsPath dest) throws IOException {
323374
String path = new File(dest.getPath(), INIT_FILE_NAME).getPath();
@@ -339,7 +390,7 @@ private FsPath fillStorageFile(FsPath fsPath, S3ObjectSummary s3ObjectSummary) {
339390
fsPath.setOwner(owner.getDisplayName());
340391
}
341392
try {
342-
fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath()));
393+
fsPath.setIsdir(isDir(s3ObjectSummary.getKey()));
343394
} catch (Throwable e) {
344395
logger.warn("Failed to fill storage file:" + fsPath.getPath(), e);
345396
}
@@ -359,7 +410,7 @@ public boolean canRead(FsPath dest) {
359410

360411
@Override
361412
public boolean canRead(FsPath dest, String user) throws IOException {
362-
return false;
413+
return true;
363414
}
364415

365416
@Override
@@ -384,7 +435,10 @@ public long getUsableSpace(FsPath dest) {
384435

385436
@Override
386437
public long getLength(FsPath dest) throws IOException {
387-
return 0;
438+
return s3Client
439+
.getObject(bucket, buildKey(dest.getPath()))
440+
.getObjectMetadata()
441+
.getContentLength();
388442
}
389443

390444
@Override
@@ -418,7 +472,9 @@ public boolean setPermission(FsPath dest, String permission) {
418472
}
419473

420474
@Override
421-
public void close() throws IOException {}
475+
public void close() throws IOException {
476+
s3Client.shutdown();
477+
}
422478

423479
public String getLabel() {
424480
return label;
@@ -429,46 +485,22 @@ public void setLabel(String label) {
429485
}
430486

431487
public String buildPath(String path) {
432-
if (path == null || "".equals(path)) return "";
488+
if (path == null || path.isEmpty()) return "";
433489
if (path.startsWith("/")) {
434490
return StorageUtils.S3_SCHEMA() + path;
435491
}
436492
return StorageUtils.S3_SCHEMA() + "/" + path;
437493
}
438494

439-
public String buildPrefix(String path, boolean addTail) {
495+
public String buildKey(String path) {
440496
String res = path;
441-
if (path == null || "".equals(path)) return "";
497+
if (path == null || path.isEmpty()) return "";
442498
if (path.startsWith("/")) {
443499
res = path.replaceFirst("/", "");
444500
}
445-
if (!path.endsWith("/") && addTail) {
446-
res = res + "/";
501+
if (path.endsWith("/") && !res.isEmpty()) {
502+
res = res.substring(0, res.length() - 1);
447503
}
448504
return res;
449505
}
450-
451-
public String buildPrefix(String path) {
452-
return buildPrefix(path, true);
453-
}
454-
}
455-
456-
class S3OutputStream extends ByteArrayOutputStream {
457-
private AmazonS3 s3Client;
458-
private String bucket;
459-
private String path;
460-
461-
public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
462-
this.s3Client = s3Client;
463-
this.bucket = bucket;
464-
this.path = path;
465-
}
466-
467-
@Override
468-
public void close() throws IOException {
469-
byte[] buffer = this.toByteArray();
470-
try (InputStream in = new ByteArrayInputStream(buffer)) {
471-
s3Client.putObject(bucket, path, in, new ObjectMetadata());
472-
}
473-
}
474506
}

0 commit comments

Comments
 (0)