Browse Source

more flexible replication configuration

pull/7526/head
chrislu 1 week ago
parent
commit
c96448f3a5
  1. 9
      other/java/hdfs2/README.md
  2. 58
      other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
  3. 9
      other/java/hdfs3/README.md
  4. 58
      other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

9
other/java/hdfs2/README.md

@ -130,6 +130,15 @@ The test suite covers:
<name>fs.seaweed.filer.port.grpc</name>
<value>18888</value>
</property>
<!-- Optional: Replication configuration with three priority levels:
1) If set to non-empty value (e.g. "001") - uses that value
2) If set to empty string "" - uses SeaweedFS filer's default replication
3) If not configured (property not present) - uses HDFS replication parameter
-->
<!-- <property>
<name>fs.seaweed.replication</name>
<value>001</value>
</property> -->
</configuration>
```

58
other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

@ -59,7 +59,7 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port+10000);
int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port + 10000);
setConf(conf);
this.uri = uri;
@ -94,16 +94,23 @@ public class SeaweedFileSystem extends FileSystem {
@Override
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize, final Progressable progress) throws IOException {
final short replication, final long blockSize, final Progressable progress) throws IOException {
LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
path = qualify(path);
try {
String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION, String.format("%03d", replication - 1));
// Priority: 1) non-empty FS_SEAWEED_REPLICATION, 2) empty string -> filer
// default, 3) null -> HDFS replication
String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION);
if (replicaPlacement == null) {
// Not configured, use HDFS replication parameter
replicaPlacement = String.format("%03d", replication - 1);
}
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission,
seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@ -119,12 +126,12 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
@ -283,7 +290,6 @@ public class SeaweedFileSystem extends FileSystem {
seaweedFileSystemStore.setOwner(path, owner, group);
}
/**
* Set permission of a path.
*
@ -334,11 +340,11 @@ public class SeaweedFileSystem extends FileSystem {
* @param f The path to the file to be truncated
* @param newLength The size the file is to be truncated to
* @return <code>true</code> if the file has been truncated to the desired
* <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or
* <code>false</code> if a background process of adjusting the length of
* the last block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
* <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or
* <code>false</code> if a background process of adjusting the length of
* the last block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
@ -351,8 +357,7 @@ public class SeaweedFileSystem extends FileSystem {
@Override
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws
IOException {
final boolean createParent) throws IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
"Filesystem does not support symlinks!");
@ -390,7 +395,7 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName) throws IOException {
String snapshotNewName) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support renameSnapshot");
}
@ -412,10 +417,10 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
* Modifies ACL entries of files and directories. This method can add new ACL
* entries or modify the permissions on existing ACL entries. All existing
* Modifies ACL entries of files and directories. This method can add new ACL
* entries or modify the permissions on existing ACL entries. All existing
* ACL entries that are not specified in this call are retained without
* changes. (Modifications are merged into the current ACL.)
* changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
* @param aclSpec List&lt;AclEntry&gt; describing modifications
@ -431,7 +436,7 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
* Removes ACL entries from files and directories. Other ACL entries are
* Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
* @param path Path to modify
@ -463,7 +468,7 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
* Removes all but the base ACL entries of files and directories. The entries
* Removes all but the base ACL entries of files and directories. The entries
* for user, group, and others are retained for compatibility with permission
* bits.
*
@ -485,7 +490,8 @@ public class SeaweedFileSystem extends FileSystem {
*
* @param path Path to modify
* @param aclSpec List describing modifications, which must include entries
* for user, group, and others for compatibility with permission bits.
* for user, group, and others for compatibility with permission
* bits.
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
@ -528,7 +534,7 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
EnumSet<XAttrSetFlag> flag) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setXAttr");
}

9
other/java/hdfs3/README.md

@ -130,6 +130,15 @@ The test suite covers:
<name>fs.seaweed.filer.port.grpc</name>
<value>18888</value>
</property>
<!-- Optional: Replication configuration with three priority levels:
1) If set to non-empty value (e.g. "001") - uses that value
2) If set to empty string "" - uses SeaweedFS filer's default replication
3) If not configured (property not present) - uses HDFS replication parameter
-->
<!-- <property>
<name>fs.seaweed.replication</name>
<value>001</value>
</property> -->
</configuration>
```

58
other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java

@ -59,7 +59,7 @@ public class SeaweedFileSystem extends FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port+10000);
int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port + 10000);
setConf(conf);
this.uri = uri;
@ -94,16 +94,23 @@ public class SeaweedFileSystem extends FileSystem {
@Override
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize, final Progressable progress) throws IOException {
final short replication, final long blockSize, final Progressable progress) throws IOException {
LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
path = qualify(path);
try {
String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION, String.format("%03d", replication - 1));
// Priority: 1) non-empty FS_SEAWEED_REPLICATION, 2) empty string -> filer
// default, 3) null -> HDFS replication
String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION);
if (replicaPlacement == null) {
// Not configured, use HDFS replication parameter
replicaPlacement = String.format("%03d", replication - 1);
}
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement);
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission,
seaweedBufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex);
@ -119,12 +126,12 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public FSDataOutputStream createNonRecursive(Path path,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
@ -283,7 +290,6 @@ public class SeaweedFileSystem extends FileSystem {
seaweedFileSystemStore.setOwner(path, owner, group);
}
/**
* Set permission of a path.
*
@ -334,11 +340,11 @@ public class SeaweedFileSystem extends FileSystem {
* @param f The path to the file to be truncated
* @param newLength The size the file is to be truncated to
* @return <code>true</code> if the file has been truncated to the desired
* <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or
* <code>false</code> if a background process of adjusting the length of
* the last block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
* <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or
* <code>false</code> if a background process of adjusting the length of
* the last block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
@ -351,8 +357,7 @@ public class SeaweedFileSystem extends FileSystem {
@Override
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws
IOException {
final boolean createParent) throws IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
"Filesystem does not support symlinks!");
@ -390,7 +395,7 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName) throws IOException {
String snapshotNewName) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support renameSnapshot");
}
@ -412,10 +417,10 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
* Modifies ACL entries of files and directories. This method can add new ACL
* entries or modify the permissions on existing ACL entries. All existing
* Modifies ACL entries of files and directories. This method can add new ACL
* entries or modify the permissions on existing ACL entries. All existing
* ACL entries that are not specified in this call are retained without
* changes. (Modifications are merged into the current ACL.)
* changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
* @param aclSpec List&lt;AclEntry&gt; describing modifications
@ -431,7 +436,7 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
* Removes ACL entries from files and directories. Other ACL entries are
* Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
* @param path Path to modify
@ -463,7 +468,7 @@ public class SeaweedFileSystem extends FileSystem {
}
/**
* Removes all but the base ACL entries of files and directories. The entries
* Removes all but the base ACL entries of files and directories. The entries
* for user, group, and others are retained for compatibility with permission
* bits.
*
@ -485,7 +490,8 @@ public class SeaweedFileSystem extends FileSystem {
*
* @param path Path to modify
* @param aclSpec List describing modifications, which must include entries
* for user, group, and others for compatibility with permission bits.
* for user, group, and others for compatibility with permission
* bits.
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
@ -528,7 +534,7 @@ public class SeaweedFileSystem extends FileSystem {
*/
@Override
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
EnumSet<XAttrSetFlag> flag) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setXAttr");
}

Loading…
Cancel
Save