From 21315f709de9520ae8dc8cd7a8998802ee89af72 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 14 Dec 2018 09:16:21 -0800 Subject: [PATCH] HCFS 1.0.2 --- other/java/client/pom.xml | 7 +- .../java/seaweedfs/client/FilerClient.java | 207 +++++++++++ other/java/client/src/main/proto/filer.proto | 2 +- other/java/hdfs/pom.xml | 24 +- .../java/seaweed/hdfs/SeaweedFileSystem.java | 339 ++++++++++++++++++ .../seaweed/hdfs/SeaweedFileSystemStore.java | 107 ++---- 6 files changed, 606 insertions(+), 80 deletions(-) create mode 100644 other/java/client/src/main/java/seaweedfs/client/FilerClient.java diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 74404823b..a3c56856e 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -4,7 +4,7 @@ com.github.chrislusf seaweedfs-client - 1.0 + 1.0.2 org.sonatype.oss @@ -45,6 +45,11 @@ grpc-stub ${grpc.version} + + org.slf4j + slf4j-api + 1.7.25 + diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java new file mode 100644 index 000000000..8414ee303 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -0,0 +1,207 @@ +package seaweedfs.client; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; + +public class FilerClient { + + private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); + + private FilerGrpcClient filerGrpcClient; + + public FilerClient(String host, int grpcPort) { + filerGrpcClient = new FilerGrpcClient(host, grpcPort); + } + + public FilerClient(FilerGrpcClient filerGrpcClient) { + this.filerGrpcClient = filerGrpcClient; + } + + public boolean mkdirs(String path, int mode) { + String currentUser = System.getProperty("user.name"); + return mkdirs(path, mode, 0, 0, currentUser, new String[]{}); + } + + public boolean mkdirs(String path, int mode, String userName, String[] groupNames) { + return mkdirs(path, mode, 0, 0, userName, groupNames); + } + + public boolean mkdirs(String path, int mode, int uid, int gid, String userName, String[] groupNames) { + + Path pathObject = Paths.get(path); + String parent = pathObject.getParent().toString(); + String name = pathObject.getFileName().toString(); + + if ("/".equals(parent)) { + return true; + } + + mkdirs(parent, mode, uid, gid, userName, groupNames); + + FilerProto.Entry existingEntry = lookupEntry(parent, name); + + if (existingEntry != null) { + return true; + } + + return createEntry( + parent, + newDirectoryEntry(name, mode, uid, gid, userName, groupNames).build() + ); + + } + + public boolean rm(String path, boolean isRecursive) { + + Path pathObject = Paths.get(path); + String parent = pathObject.getParent().toString(); + String name = pathObject.getFileName().toString(); + + return deleteEntry( + parent, + name, + true, + isRecursive); + } + + public boolean touch(String path, int mode) { + String currentUser = System.getProperty("user.name"); + return touch(path, mode, 0, 0, currentUser, new String[]{}); + } + + public boolean touch(String path, int mode, int uid, int gid, String userName, String[] groupNames) { + + Path pathObject = Paths.get(path); + String parent = pathObject.getParent().toString(); + String name = pathObject.getFileName().toString(); + + FilerProto.Entry entry = lookupEntry(parent, name); + if (entry == null) { + return createEntry( + parent, + newFileEntry(name, mode, uid, gid, userName, groupNames).build() + ); + } + long now = System.currentTimeMillis() / 1000L; + FilerProto.FuseAttributes.Builder attr = entry.getAttributes().toBuilder() + .setMtime(now) + .setUid(uid) + .setGid(gid) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames)); + return updateEntry(parent, entry.toBuilder().setAttributes(attr).build()); + } + + public FilerProto.Entry.Builder newDirectoryEntry(String name, int mode, + int uid, int gid, String userName, String[] groupNames) { + + long now = System.currentTimeMillis() / 1000L; + + return FilerProto.Entry.newBuilder() + .setName(name) + .setIsDirectory(true) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setUid(uid) + .setGid(gid) + .setFileMode(mode | 1 << 31) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames))); + } + + public FilerProto.Entry.Builder newFileEntry(String name, int mode, + int uid, int gid, String userName, String[] groupNames) { + + long now = System.currentTimeMillis() / 1000L; + + return FilerProto.Entry.newBuilder() + .setName(name) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setMtime(now) + .setCrtime(now) + .setUid(uid) + .setGid(gid) + .setFileMode(mode) + .setUserName(userName) + .clearGroupName() + .addAllGroupName(Arrays.asList(groupNames))); + } + + public List listEntries(String path) { + return listEntries(path, "", "", 100000); + } + + public List listEntries(String path, String entryPrefix, String lastEntryName, int limit) { + return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + .setDirectory(path) + .setPrefix(entryPrefix) + .setStartFromFileName(lastEntryName) + .setLimit(limit) + .build()).getEntriesList(); + } + + public FilerProto.Entry lookupEntry(String directory, String entryName) { + try { + return filerGrpcClient.getBlockingStub().lookupDirectoryEntry( + FilerProto.LookupDirectoryEntryRequest.newBuilder() + .setDirectory(directory) + .setName(entryName) + .build()).getEntry(); + } catch (Exception e) { + LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); + return null; + } + } + + + public boolean createEntry(String parent, FilerProto.Entry entry) { + try { + filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(parent) + .setEntry(entry) + .build()); + } catch (Exception e) { + LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); + return false; + } + return true; + } + + public boolean updateEntry(String parent, FilerProto.Entry entry) { + try { + filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() + .setDirectory(parent) + .setEntry(entry) + .build()); + } catch (Exception e) { + LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); + return false; + } + return true; + } + + public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive) { + try { + filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() + .setDirectory(parent) + .setName(entryName) + .setIsDeleteData(isDeleteFileChunk) + .setIsRecursive(isRecursive) + .build()); + } catch (Exception e) { + LOG.warn("deleteEntry {}/{}: {}", parent, entryName, e); + return false; + } + return true; + } + +} diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 124eabcd2..bb33eb48e 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -117,7 +117,7 @@ message UpdateEntryResponse { message DeleteEntryRequest { string directory = 1; string name = 2; - bool is_directory = 3; + // bool is_directory = 3; bool is_delete_data = 4; bool is_recursive = 5; } diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml index 522d39aa2..53d6746fa 100644 --- a/other/java/hdfs/pom.xml +++ b/other/java/hdfs/pom.xml @@ -6,7 +6,7 @@ com.github.chrislusf seaweedfs-hadoop-client - 1.0 + 1.0.2 org.sonatype.oss @@ -41,6 +41,18 @@ shade + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + org/slf4j/** + META-INF/maven/org.slf4j/** + + + @@ -54,6 +66,14 @@ io.grpc.internal shaded.io.grpc.internal + + org.apache.commons + shaded.org.apache.commons + + org.apache.hadoop + org.apache.log4j + + @@ -126,7 +146,7 @@ com.github.chrislusf seaweedfs-client - 1.0 + 1.0.2 org.apache.hadoop diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index dce642d09..7d992a7a1 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -5,17 +5,27 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; @@ -259,4 +269,333 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { return path.makeQualified(uri, workingDirectory); } + /** + * Concat existing files together. + * @param trg the path to the target destination. + * @param psrcs the paths to the sources to use for the concatenation. + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default). + */ + @Override + public void concat(final Path trg, final Path [] psrcs) throws IOException { + throw new UnsupportedOperationException("Not implemented by the " + + getClass().getSimpleName() + " FileSystem implementation"); + } + + /** + * Truncate the file in the indicated path to the indicated size. + *
    + *
  • Fails if path is a directory.
  • + *
  • Fails if path does not exist.
  • + *
  • Fails if path is not closed.
  • + *
  • Fails if new size is greater than current size.
  • + *
+ * @param f The path to the file to be truncated + * @param newLength The size the file is to be truncated to + * + * @return true if the file has been truncated to the desired + * newLength and is immediately available to be reused for + * write operations such as append, or + * false if a background process of adjusting the length of + * the last block has been started, and clients should wait for it to + * complete before proceeding with further file updates. + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default). + */ + @Override + public boolean truncate(Path f, long newLength) throws IOException { + throw new UnsupportedOperationException("Not implemented by the " + + getClass().getSimpleName() + " FileSystem implementation"); + } + + /** + * See {@link FileContext#createSymlink(Path, Path, boolean)}. + */ + @Override + public void createSymlink(final Path target, final Path link, + final boolean createParent) throws AccessControlException, + FileAlreadyExistsException, FileNotFoundException, + ParentNotDirectoryException, UnsupportedFileSystemException, + IOException { + // Supporting filesystems should override this method + throw new UnsupportedOperationException( + "Filesystem does not support symlinks!"); + } + + /** + * See {@link AbstractFileSystem#supportsSymlinks()}. + */ + @Override + public boolean supportsSymlinks() { + return false; + } + + /** + * Create a snapshot. + * @param path The directory where snapshots will be taken. + * @param snapshotName The name of the snapshot + * @return the snapshot path. + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + */ + @Override + public Path createSnapshot(Path path, String snapshotName) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support createSnapshot"); + } + + /** + * Rename a snapshot. + * @param path The directory path where the snapshot was taken + * @param snapshotOldName Old name of the snapshot + * @param snapshotNewName New name of the snapshot + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void renameSnapshot(Path path, String snapshotOldName, + String snapshotNewName) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support renameSnapshot"); + } + + /** + * Delete a snapshot of a directory. + * @param path The directory that the to-be-deleted snapshot belongs to + * @param snapshotName The name of the snapshot + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void deleteSnapshot(Path path, String snapshotName) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support deleteSnapshot"); + } + + /** + * Modifies ACL entries of files and directories. This method can add new ACL + * entries or modify the permissions on existing ACL entries. All existing + * ACL entries that are not specified in this call are retained without + * changes. (Modifications are merged into the current ACL.) + * + * @param path Path to modify + * @param aclSpec List<AclEntry> describing modifications + * @throws IOException if an ACL could not be modified + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void modifyAclEntries(Path path, List aclSpec) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support modifyAclEntries"); + } + + /** + * Removes ACL entries from files and directories. Other ACL entries are + * retained. + * + * @param path Path to modify + * @param aclSpec List describing entries to remove + * @throws IOException if an ACL could not be modified + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void removeAclEntries(Path path, List aclSpec) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support removeAclEntries"); + } + + /** + * Removes all default ACL entries from files and directories. + * + * @param path Path to modify + * @throws IOException if an ACL could not be modified + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void removeDefaultAcl(Path path) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support removeDefaultAcl"); + } + + /** + * Removes all but the base ACL entries of files and directories. The entries + * for user, group, and others are retained for compatibility with permission + * bits. + * + * @param path Path to modify + * @throws IOException if an ACL could not be removed + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void removeAcl(Path path) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support removeAcl"); + } + + /** + * Fully replaces ACL of files and directories, discarding all existing + * entries. + * + * @param path Path to modify + * @param aclSpec List describing modifications, which must include entries + * for user, group, and others for compatibility with permission bits. + * @throws IOException if an ACL could not be modified + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void setAcl(Path path, List aclSpec) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support setAcl"); + } + + /** + * Gets the ACL of a file or directory. + * + * @param path Path to get + * @return AclStatus describing the ACL of the file or directory + * @throws IOException if an ACL could not be read + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public AclStatus getAclStatus(Path path) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support getAclStatus"); + } + + /** + * Set an xattr of a file or directory. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to modify + * @param name xattr name. + * @param value xattr value. + * @param flag xattr set flag + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void setXAttr(Path path, String name, byte[] value, + EnumSet flag) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support setXAttr"); + } + + /** + * Get an xattr name and value for a file or directory. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to get extended attribute + * @param name xattr name. + * @return byte[] xattr value. + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public byte[] getXAttr(Path path, String name) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support getXAttr"); + } + + /** + * Get all of the xattr name/value pairs for a file or directory. + * Only those xattrs which the logged-in user has permissions to view + * are returned. + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to get extended attributes + * @return Map describing the XAttrs of the file or directory + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public Map getXAttrs(Path path) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support getXAttrs"); + } + + /** + * Get all of the xattrs name/value pairs for a file or directory. + * Only those xattrs which the logged-in user has permissions to view + * are returned. + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to get extended attributes + * @param names XAttr names. + * @return Map describing the XAttrs of the file or directory + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public Map getXAttrs(Path path, List names) + throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support getXAttrs"); + } + + /** + * Get all of the xattr names for a file or directory. + * Only those xattr names which the logged-in user has permissions to view + * are returned. + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to get extended attributes + * @return List{@literal } of the XAttr names of the file or directory + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public List listXAttrs(Path path) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support listXAttrs"); + } + + /** + * Remove an xattr of a file or directory. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". + *

+ * Refer to the HDFS extended attributes user documentation for details. + * + * @param path Path to remove extended attribute + * @param name xattr name + * @throws IOException IO failure + * @throws UnsupportedOperationException if the operation is unsupported + * (default outcome). + */ + @Override + public void removeXAttr(Path path, String name) throws IOException { + throw new UnsupportedOperationException(getClass().getSimpleName() + + " doesn't support removeXAttr"); + } + } diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index c4b5683ce..ffc109b20 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -7,6 +7,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerClient; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; @@ -23,10 +24,12 @@ public class SeaweedFileSystemStore { private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); private FilerGrpcClient filerGrpcClient; + private FilerClient filerClient; public SeaweedFileSystemStore(String host, int port) { int grpcPort = 10000 + port; filerGrpcClient = new FilerGrpcClient(host, grpcPort); + filerClient = new FilerClient(filerGrpcClient); } public static String getParentDirectory(Path path) { @@ -49,23 +52,12 @@ public class SeaweedFileSystemStore { permission, umask); - long now = System.currentTimeMillis() / 1000L; - - FilerProto.CreateEntryRequest.Builder request = FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(getParentDirectory(path)) - .setEntry(FilerProto.Entry.newBuilder() - .setName(path.getName()) - .setIsDirectory(true) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setMtime(now) - .setCrtime(now) - .setFileMode(permissionToMode(permission, true)) - .setUserName(currentUser.getUserName()) - .addAllGroupName(Arrays.asList(currentUser.getGroupNames()))) - ); - - FilerProto.CreateEntryResponse response = filerGrpcClient.getBlockingStub().createEntry(request.build()); - return true; + return filerClient.mkdirs( + path.toUri().getPath(), + permissionToMode(permission, true), + currentUser.getUserName(), + currentUser.getGroupNames() + ); } public FileStatus[] listEntries(final Path path) { @@ -73,7 +65,7 @@ public class SeaweedFileSystemStore { List fileStatuses = new ArrayList(); - List entries = lookupEntries(path); + List entries = filerClient.listEntries(path.toUri().getPath()); for (FilerProto.Entry entry : entries) { @@ -84,16 +76,6 @@ public class SeaweedFileSystemStore { return fileStatuses.toArray(new FileStatus[0]); } - private List lookupEntries(Path path) { - - LOG.debug("listEntries path: {}", path); - - return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() - .setDirectory(path.toUri().getPath()) - .setLimit(100000) - .build()).getEntriesList(); - } - public FileStatus getFileStatus(final Path path) { FilerProto.Entry entry = lookupEntry(path); @@ -106,32 +88,24 @@ public class SeaweedFileSystemStore { return fileStatus; } - public boolean deleteEntries(final Path path, boolean isDirectroy, boolean recursive) { + public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) { LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}", path, - String.valueOf(isDirectroy), + String.valueOf(isDirectory), String.valueOf(recursive)); if (path.isRoot()) { return true; } - if (recursive && isDirectroy) { - List entries = lookupEntries(path); + if (recursive && isDirectory) { + List entries = filerClient.listEntries(path.toUri().getPath()); for (FilerProto.Entry entry : entries) { - deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), recursive); + deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true); } } - FilerProto.DeleteEntryResponse response = - filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() - .setDirectory(getParentDirectory(path)) - .setName(path.getName()) - .setIsDirectory(isDirectroy) - .setIsDeleteData(true) - .build()); - - return true; + return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive); } private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { @@ -151,18 +125,8 @@ public class SeaweedFileSystemStore { private FilerProto.Entry lookupEntry(Path path) { - String directory = getParentDirectory(path); + return filerClient.lookupEntry(getParentDirectory(path), path.getName()); - try { - FilerProto.LookupDirectoryEntryResponse response = - filerGrpcClient.getBlockingStub().lookupDirectoryEntry(FilerProto.LookupDirectoryEntryRequest.newBuilder() - .setDirectory(directory) - .setName(path.getName()) - .build()); - return response.getEntry(); - } catch (io.grpc.StatusRuntimeException e) { - return null; - } } public void rename(Path source, Path destination) { @@ -186,9 +150,16 @@ public class SeaweedFileSystemStore { LOG.debug("moveEntry: {}/{} => {}", oldParent, entry.getName(), destination); + FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName()); + boolean isDirectoryCreated = filerClient.createEntry(getParentDirectory(destination), newEntry.build()); + + if (!isDirectoryCreated) { + return false; + } + if (entry.getIsDirectory()) { Path entryPath = new Path(oldParent, entry.getName()); - List entries = lookupEntries(entryPath); + List entries = filerClient.listEntries(entryPath.toUri().getPath()); for (FilerProto.Entry ent : entries) { boolean isSucess = moveEntry(entryPath, ent, new Path(destination, ent.getName())); if (!isSucess) { @@ -197,20 +168,9 @@ public class SeaweedFileSystemStore { } } - FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName()); - filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(getParentDirectory(destination)) - .setEntry(newEntry) - .build()); - - filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() - .setDirectory(oldParent.toUri().getPath()) - .setName(entry.getName()) - .setIsDirectory(entry.getIsDirectory()) - .setIsDeleteData(false) - .build()); - - return true; + return filerClient.deleteEntry( + oldParent.toUri().getPath(), entry.getName(), false, false); + } public OutputStream createFile(final Path path, @@ -253,6 +213,7 @@ public class SeaweedFileSystemStore { .setCrtime(now) .setMtime(now) .setUserName(userGroupInformation.getUserName()) + .clearGroupName() .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) ); } @@ -306,10 +267,7 @@ public class SeaweedFileSystemStore { LOG.debug("setOwner path:{} entry:{}", path, entryBuilder); - filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() - .setDirectory(getParentDirectory(path)) - .setEntry(entryBuilder) - .build()); + filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); } @@ -332,10 +290,7 @@ public class SeaweedFileSystemStore { LOG.debug("setPermission path:{} entry:{}", path, entryBuilder); - filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() - .setDirectory(getParentDirectory(path)) - .setEntry(entryBuilder) - .build()); + filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); } }