From 6ecefad692dfd62873006ecb34932864cd746541 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 25 Nov 2018 14:48:17 -0800 Subject: [PATCH] SeaweedFileSystem add rename --- .../java/seaweed/hdfs/SeaweedFileSystem.java | 26 +++++++- .../seaweed/hdfs/SeaweedFileSystemStore.java | 64 +++++++++++++++---- 2 files changed, 76 insertions(+), 14 deletions(-) diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 217f4a10e..54e2d65cf 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -67,7 +67,31 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { return null; } - public boolean rename(Path path, Path path1) throws IOException { + public boolean rename(Path src, Path dst) throws IOException { + + Path parentFolder = src.getParent(); + if (parentFolder == null) { + return false; + } + if (src.equals(dst)){ + return true; + } + FileStatus dstFileStatus = getFileStatus(dst); + + String sourceFileName = src.getName(); + Path adjustedDst = dst; + + if (dstFileStatus != null) { + if (!dstFileStatus.isDirectory()) { + return false; + } + adjustedDst = new Path(dst, sourceFileName); + } + + Path qualifiedSrcPath = qualify(src); + Path qualifiedDstPath = qualify(adjustedDst); + + seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath); return false; } diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 085d5d217..be60f3638 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -55,13 +55,9 @@ public class SeaweedFileSystemStore { List fileStatuses = new ArrayList(); - FilerProto.ListEntriesResponse response = - filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() - .setDirectory(path.toUri().getPath()) - .setLimit(100000) - .build()); + List entries = lookupEntries(path); - for (FilerProto.Entry entry : response.getEntriesList()) { + for (FilerProto.Entry entry : entries) { FileStatus fileStatus = getFileStatus(new Path(path, entry.getName()), entry); @@ -70,16 +66,17 @@ public class SeaweedFileSystemStore { return fileStatuses.toArray(new FileStatus[0]); } + private List lookupEntries(Path path) { + return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + .setDirectory(path.toUri().getPath()) + .setLimit(100000) + .build()).getEntriesList(); + } + public FileStatus getFileStatus(final Path path) { LOG.debug("getFileStatus path: {}", path); - FilerProto.LookupDirectoryEntryResponse response = - filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder() - .setDirectory(path.getParent().toUri().getPath()) - .setName(path.getName()) - .build()); - - FilerProto.Entry entry = response.getEntry(); + FilerProto.Entry entry = lookupEntry(path); FileStatus fileStatus = getFileStatus(path, entry); return fileStatus; } @@ -117,4 +114,45 @@ public class SeaweedFileSystemStore { modification_time, access_time, permission, owner, group, null, path); } + + private FilerProto.Entry lookupEntry(Path path) { + FilerProto.LookupDirectoryEntryResponse response = + filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder() + .setDirectory(path.getParent().toUri().getPath()) + .setName(path.getName()) + .build()); + + return response.getEntry(); + } + + public void rename(Path source, Path destination) { + FilerProto.Entry entry = lookupEntry(source); + moveEntry(source.getParent(), entry, destination); + } + + private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) { + if (entry.getIsDirectory()) { + Path entryPath = new Path(oldParent, entry.getName()); + List entries = lookupEntries(entryPath); + for (FilerProto.Entry ent : entries) { + boolean isSucess = moveEntry(entryPath, ent, new Path(destination, ent.getName())); + if (!isSucess) { + return false; + } + } + } + + filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(destination.getParent().toUri().getPath()) + .build()); + + filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() + .setDirectory(oldParent.toUri().getPath()) + .setName(entry.getName()) + .setIsDirectory(entry.getIsDirectory()) + .setIsDeleteData(false) + .build()); + + return true; + } }