diff --git a/.github/workflows/java_integration_tests.yml b/.github/workflows/java_integration_tests.yml index 9b86d8e69..a15b050fa 100644 --- a/.github/workflows/java_integration_tests.yml +++ b/.github/workflows/java_integration_tests.yml @@ -128,11 +128,6 @@ jobs: run: | mvn test -Dtest=*IntegrationTest - - name: Run HDFS2 Configuration Tests - working-directory: other/java/hdfs2 - run: | - mvn test -Dtest=SeaweedFileSystemConfigTest -Dmaven.javadoc.skip=true -Dgpg.skip=true - - name: Run HDFS3 Configuration Tests working-directory: other/java/hdfs3 run: | diff --git a/.github/workflows/java_unit_tests.yml b/.github/workflows/java_unit_tests.yml index e79499b04..ddf7d03c9 100644 --- a/.github/workflows/java_unit_tests.yml +++ b/.github/workflows/java_unit_tests.yml @@ -42,11 +42,6 @@ jobs: run: | mvn test -Dtest=SeaweedReadTest,SeaweedCipherTest - - name: Run HDFS2 Configuration Tests - working-directory: other/java/hdfs2 - run: | - mvn test -Dtest=SeaweedFileSystemConfigTest -Dmaven.javadoc.skip=true -Dgpg.skip=true - - name: Run HDFS3 Configuration Tests working-directory: other/java/hdfs3 run: | @@ -59,6 +54,5 @@ jobs: name: test-reports-java-${{ matrix.java }} path: | other/java/client/target/surefire-reports/ - other/java/hdfs2/target/surefire-reports/ other/java/hdfs3/target/surefire-reports/ diff --git a/.github/workflows/spark-integration-tests.yml b/.github/workflows/spark-integration-tests.yml index 5d4e7ccd9..1f5e97d9a 100644 --- a/.github/workflows/spark-integration-tests.yml +++ b/.github/workflows/spark-integration-tests.yml @@ -4,14 +4,12 @@ on: push: paths: - 'test/java/spark/**' - - 'other/java/hdfs2/**' - 'other/java/hdfs3/**' - 'other/java/client/**' - '.github/workflows/spark-integration-tests.yml' pull_request: paths: - 'test/java/spark/**' - - 'other/java/hdfs2/**' - 'other/java/hdfs3/**' - 'other/java/client/**' - '.github/workflows/spark-integration-tests.yml' @@ -65,12 +63,6 @@ jobs: echo "OK Java client built" cd ../../.. - echo "Building HDFS2 client..." - cd other/java/hdfs2 - mvn clean install -U -DskipTests -Dgpg.skip=true -Dcentral.publishing.skip=true - echo "OK HDFS2 client built" - cd ../../.. - echo "Building HDFS3 client..." cd other/java/hdfs3 mvn clean install -U -DskipTests -Dgpg.skip=true -Dcentral.publishing.skip=true diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml index 5c0981eae..d98158599 100644 --- a/other/java/examples/pom.xml +++ b/other/java/examples/pom.xml @@ -16,7 +16,7 @@ com.seaweedfs - seaweedfs-hadoop2-client + seaweedfs-hadoop3-client 3.80 compile diff --git a/other/java/hdfs2/README.md b/other/java/hdfs2/README.md deleted file mode 100644 index 6c712681a..000000000 --- a/other/java/hdfs2/README.md +++ /dev/null @@ -1,199 +0,0 @@ -# SeaweedFS Hadoop2 Client - -Hadoop FileSystem implementation for SeaweedFS, compatible with Hadoop 2.x/3.x. - -## Building - -```bash -mvn clean install -``` - -## Testing - -This project includes two types of tests: - -### 1. Configuration Tests (No SeaweedFS Required) - -These tests verify configuration handling and initialization logic without requiring a running SeaweedFS instance: - -```bash -mvn test -Dtest=SeaweedFileSystemConfigTest -``` - -### 2. Integration Tests (Requires SeaweedFS) - -These tests verify actual FileSystem operations against a running SeaweedFS instance. - -#### Prerequisites - -1. 
Start SeaweedFS with default ports: - ```bash - # Terminal 1: Start master - weed master - - # Terminal 2: Start volume server - weed volume -mserver=localhost:9333 - - # Terminal 3: Start filer - weed filer -master=localhost:9333 - ``` - -2. Verify services are running: - - Master: http://localhost:9333 - - Filer HTTP: http://localhost:8888 - - Filer gRPC: localhost:18888 - -#### Running Integration Tests - -```bash -# Enable integration tests -export SEAWEEDFS_TEST_ENABLED=true - -# Run all tests -mvn test - -# Run specific test -mvn test -Dtest=SeaweedFileSystemTest -``` - -### Test Configuration - -Integration tests can be configured via environment variables or system properties: - -- `SEAWEEDFS_TEST_ENABLED`: Set to `true` to enable integration tests (default: false) -- Tests use these default connection settings: - - Filer Host: localhost - - Filer HTTP Port: 8888 - - Filer gRPC Port: 18888 - -### Running Tests with Custom Configuration - -To test against a different SeaweedFS instance, modify the test code or use Hadoop configuration: - -```java -conf.set("fs.seaweed.filer.host", "your-host"); -conf.setInt("fs.seaweed.filer.port", 8888); -conf.setInt("fs.seaweed.filer.port.grpc", 18888); -``` - -## Test Coverage - -The test suite covers: - -- **Configuration & Initialization** - - URI parsing and configuration - - Default values - - Configuration overrides - - Working directory management - -- **File Operations** - - Create files - - Read files - - Write files - - Append to files - - Delete files - -- **Directory Operations** - - Create directories - - List directory contents - - Delete directories (recursive and non-recursive) - -- **Metadata Operations** - - Get file status - - Set permissions - - Set owner/group - - Rename files and directories - -## Usage in Hadoop - -1. Copy the built JAR to your Hadoop classpath: - ```bash - cp target/seaweedfs-hadoop2-client-*.jar $HADOOP_HOME/share/hadoop/common/lib/ - ``` - -2. Configure `core-site.xml`: - ```xml - - - fs.seaweedfs.impl - seaweed.hdfs.SeaweedFileSystem - - - fs.seaweed.filer.host - localhost - - - fs.seaweed.filer.port - 8888 - - - fs.seaweed.filer.port.grpc - 18888 - - - - - ``` - -3. Use SeaweedFS with Hadoop commands: - ```bash - hadoop fs -ls seaweedfs://localhost:8888/ - hadoop fs -mkdir seaweedfs://localhost:8888/test - hadoop fs -put local.txt seaweedfs://localhost:8888/test/ - ``` - -## Continuous Integration - -For CI environments, tests can be run in two modes: - -1. **Configuration Tests Only** (default, no SeaweedFS required): - ```bash - mvn test -Dtest=SeaweedFileSystemConfigTest - ``` - -2. **Full Integration Tests** (requires SeaweedFS): - ```bash - # Start SeaweedFS in CI environment - # Then run: - export SEAWEEDFS_TEST_ENABLED=true - mvn test - ``` - -## Troubleshooting - -### Tests are skipped - -If you see "Skipping test - SEAWEEDFS_TEST_ENABLED not set": -```bash -export SEAWEEDFS_TEST_ENABLED=true -``` - -### Connection refused errors - -Ensure SeaweedFS is running and accessible: -```bash -curl http://localhost:8888/ -``` - -### gRPC errors - -Verify the gRPC port is accessible: -```bash -# Should show the port is listening -netstat -an | grep 18888 -``` - -## Contributing - -When adding new features, please include: -1. Configuration tests (no SeaweedFS required) -2. Integration tests (with SEAWEEDFS_TEST_ENABLED guard) -3. 
Documentation updates - diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml deleted file mode 100644 index 69f0e18ea..000000000 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ /dev/null @@ -1,578 +0,0 @@ - - - 4.0.0 - com.seaweedfs - seaweedfs-hadoop2-client - SeaweedFS HDFS2 Client - ${seaweedfs.client.version} - A java client for SeaweedFS. - https://github.com/seaweedfs/seaweedfs - - - Chris Lu - chris.lu@gmail.com - SeaweedFS - https://seaweedfs.com - - - - - The Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0.txt - - - - scm:git:git://github.com/seaweedfs/seaweedfs.git - scm:git:ssh://github.com:seaweedfs/seaweedfs.git - https://github.com/seaweedfs/seaweedfs/tree/master - - - - - maven-compiler-plugin - - 8 - 8 - 8 - - - - maven-shade-plugin - 3.2.1 - - - package - - shade - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - org/slf4j/** - META-INF/maven/org.slf4j/** - - - - - - - - - com.google - shaded.com.google - - - io.grpc.internal - shaded.io.grpc.internal - - - org.apache.commons - shaded.org.apache.commons - - org.apache.hadoop - org.apache.log4j - - - - org.apache.http - shaded.org.apache.http - - - - - - - - maven-gpg-plugin - 1.5 - - - sign-artifacts - verify - - sign - - - - - - org.sonatype.central - central-publishing-maven-plugin - 0.5.0 - true - - central - true - - - - maven-source-plugin - 2.2.1 - - - attach-sources - - jar-no-fork - - - - - - maven-javadoc-plugin - 3.0.1 - - - attach-javadocs - - jar - - - - - - - - - org.apache.hadoop - hadoop-client - 3.4.0 - provided - - - hadoop-hdfs-client - org.apache.hadoop - - - hadoop-yarn-api - org.apache.hadoop - - - hadoop-yarn-client - org.apache.hadoop - - - hadoop-mapreduce-client-core - org.apache.hadoop - - - hadoop-mapreduce-client-jobclient - org.apache.hadoop - - - hadoop-annotations - org.apache.hadoop - - - - - org.apache.hadoop - hadoop-common - 3.4.0 - provided - - - hadoop-shaded-protobuf_3_21 - org.apache.hadoop.thirdparty - - - hadoop-shaded-guava - org.apache.hadoop.thirdparty - - - commons-cli - commons-cli - - - commons-math3 - org.apache.commons - - - commons-io - commons-io - - - commons-net - commons-net - - - commons-collections - commons-collections - - - javax.servlet-api - javax.servlet - - - jakarta.activation-api - jakarta.activation - - - jetty-server - org.eclipse.jetty - - - jetty-util - org.eclipse.jetty - - - jetty-servlet - org.eclipse.jetty - - - jetty-webapp - org.eclipse.jetty - - - jsp-api - javax.servlet.jsp - - - jersey-core - com.sun.jersey - - - jersey-servlet - com.sun.jersey - - - jersey-json - com.github.pjfanning - - - jettison - org.codehaus.jettison - - - jersey-server - com.sun.jersey - - - reload4j - ch.qos.reload4j - - - commons-beanutils - commons-beanutils - - - commons-configuration2 - org.apache.commons - - - commons-lang3 - org.apache.commons - - - commons-text - org.apache.commons - - - slf4j-reload4j - org.slf4j - - - avro - org.apache.avro - - - re2j - com.google.re2j - - - hadoop-auth - org.apache.hadoop - - - jsch - com.jcraft - - - curator-client - org.apache.curator - - - curator-recipes - org.apache.curator - - - zookeeper - org.apache.zookeeper - - - netty-handler - io.netty - - - netty-transport-native-epoll - io.netty - - - metrics-core - io.dropwizard.metrics - - - commons-compress - org.apache.commons - - - bcprov-jdk15on - org.bouncycastle - - - kerb-core - org.apache.kerby - - - jackson-databind - com.fasterxml.jackson.core - - - stax2-api - 
org.codehaus.woodstox - - - woodstox-core - com.fasterxml.woodstox - - - dnsjava - dnsjava - - - snappy-java - org.xerial.snappy - - - hadoop-annotations - org.apache.hadoop - - - - - junit - junit - 4.13.1 - test - - - hamcrest-core - org.hamcrest - - - - - org.mockito - mockito-core - 3.12.4 - test - - - byte-buddy - net.bytebuddy - - - byte-buddy-agent - net.bytebuddy - - - objenesis - org.objenesis - - - - - org.apache.hadoop - hadoop-common - 3.4.0 - test-jar - test - - - hadoop-shaded-protobuf_3_21 - org.apache.hadoop.thirdparty - - - hadoop-shaded-guava - org.apache.hadoop.thirdparty - - - commons-cli - commons-cli - - - commons-math3 - org.apache.commons - - - commons-io - commons-io - - - commons-net - commons-net - - - commons-collections - commons-collections - - - javax.servlet-api - javax.servlet - - - jakarta.activation-api - jakarta.activation - - - jetty-server - org.eclipse.jetty - - - jetty-util - org.eclipse.jetty - - - jetty-servlet - org.eclipse.jetty - - - jetty-webapp - org.eclipse.jetty - - - jsp-api - javax.servlet.jsp - - - jersey-core - com.sun.jersey - - - jersey-servlet - com.sun.jersey - - - jersey-json - com.github.pjfanning - - - jettison - org.codehaus.jettison - - - jersey-server - com.sun.jersey - - - reload4j - ch.qos.reload4j - - - commons-beanutils - commons-beanutils - - - commons-configuration2 - org.apache.commons - - - commons-lang3 - org.apache.commons - - - commons-text - org.apache.commons - - - slf4j-reload4j - org.slf4j - - - avro - org.apache.avro - - - re2j - com.google.re2j - - - hadoop-auth - org.apache.hadoop - - - jsch - com.jcraft - - - curator-client - org.apache.curator - - - curator-recipes - org.apache.curator - - - zookeeper - org.apache.zookeeper - - - netty-handler - io.netty - - - netty-transport-native-epoll - io.netty - - - metrics-core - io.dropwizard.metrics - - - commons-compress - org.apache.commons - - - bcprov-jdk15on - org.bouncycastle - - - kerb-core - org.apache.kerby - - - jackson-databind - com.fasterxml.jackson.core - - - stax2-api - org.codehaus.woodstox - - - woodstox-core - com.fasterxml.woodstox - - - dnsjava - dnsjava - - - snappy-java - org.xerial.snappy - - - hadoop-annotations - org.apache.hadoop - - - - - - 3.80 - 3.4.0 - - diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml deleted file mode 100644 index 437a181a2..000000000 --- a/other/java/hdfs2/pom.xml +++ /dev/null @@ -1,195 +0,0 @@ - - - 4.0.0 - - - 3.80.1-SNAPSHOT - 3.4.0 - - - com.seaweedfs - seaweedfs-hadoop2-client - ${seaweedfs.client.version} - - SeaweedFS HDFS2 Client - A java client for SeaweedFS. 
- https://github.com/seaweedfs/seaweedfs - - - The Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0.txt - - - - - Chris Lu - chris.lu@gmail.com - SeaweedFS - https://seaweedfs.com - - - - scm:git:git://github.com/seaweedfs/seaweedfs.git - scm:git:ssh://github.com:seaweedfs/seaweedfs.git - https://github.com/seaweedfs/seaweedfs/tree/master - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 8 - 8 - 8 - - - - org.apache.maven.plugins - maven-shade-plugin - 3.2.1 - - - package - - shade - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - org/slf4j/** - META-INF/maven/org.slf4j/** - - - - - - - - - com.google - shaded.com.google - - - io.grpc.internal - shaded.io.grpc.internal - - - org.apache.commons - shaded.org.apache.commons - - org.apache.hadoop - org.apache.log4j - - - - org.apache.http - shaded.org.apache.http - - - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.5 - - - sign-artifacts - verify - - sign - - - - - - org.sonatype.central - central-publishing-maven-plugin - 0.5.0 - true - - central - true - - - - org.apache.maven.plugins - maven-source-plugin - 2.2.1 - - - attach-sources - - jar-no-fork - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 3.0.1 - - - attach-javadocs - - jar - - - - - - - - - - org.apache.hadoop - hadoop-client - ${hadoop.version} - provided - - - com.seaweedfs - seaweedfs-client - ${seaweedfs.client.version} - - - org.apache.hadoop - hadoop-common - ${hadoop.version} - provided - - - junit - junit - 4.13.1 - test - - - org.mockito - mockito-core - 3.12.4 - test - - - org.apache.hadoop - hadoop-common - ${hadoop.version} - test - test-jar - - - - diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java deleted file mode 100644 index 3d0b68a52..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java +++ /dev/null @@ -1,25 +0,0 @@ -package seaweed.hdfs; - -import org.apache.hadoop.fs.*; - -import java.io.IOException; -import java.nio.ByteBuffer; - -public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable { - - public BufferedByteBufferReadableInputStream(FSInputStream in, int size) { - super(in, size); - if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { - throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable"); - } - } - - @Override - public int read(ByteBuffer buf) throws IOException { - if (this.in instanceof ByteBufferReadable) { - return ((ByteBufferReadable)this.in).read(buf); - } else { - throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream"); - } - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java deleted file mode 100644 index e021401aa..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.DelegateToFileSystem; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; - -public class SeaweedAbstractFileSystem extends DelegateToFileSystem { - - SeaweedAbstractFileSystem(final URI uri, final Configuration conf) - throws IOException, URISyntaxException { - super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false); - } - -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java deleted file mode 100644 index 78cf5a2fc..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ /dev/null @@ -1,643 +0,0 @@ -package seaweed.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Progressable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerProto; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; - -public class SeaweedFileSystem extends FileSystem { - - public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; - public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; - public static final String FS_SEAWEED_FILER_PORT_GRPC = "fs.seaweed.filer.port.grpc"; - public static final int FS_SEAWEED_DEFAULT_PORT = 8888; - public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; - public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication"; - public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access"; - public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; - public static final String FS_SEAWEED_FILER_CN = "fs.seaweed.filer.cn"; - - private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); - - private URI uri; - private Path workingDirectory = new Path("/"); - private SeaweedFileSystemStore seaweedFileSystemStore; - - public URI getUri() { - return uri; - } - - public String getScheme() { - return "seaweedfs"; - } - - @Override - public void initialize(URI uri, Configuration conf) throws IOException { // get - super.initialize(uri, conf); - - // get host information from uri (overrides info in conf) - String host = uri.getHost(); - host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host; - conf.set(FS_SEAWEED_FILER_HOST, host); - - // get port information from uri, (overrides info in conf) - int port = uri.getPort(); - port = (port == -1) ? 
FS_SEAWEED_DEFAULT_PORT : port; - conf.setInt(FS_SEAWEED_FILER_PORT, port); - - int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port + 10000); - - setConf(conf); - this.uri = uri; - - String cn = conf.get(FS_SEAWEED_FILER_CN, ""); - - seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, grpcPort, cn, conf); - } - - @Override - public void close() throws IOException { - super.close(); - this.seaweedFileSystemStore.close(); - } - - @Override - public FSDataInputStream open(Path path, int bufferSize) throws IOException { - - LOG.debug("open path: {} bufferSize:{}", path, bufferSize); - - path = qualify(path); - - try { - int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); - return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize)); - } catch (Exception ex) { - LOG.error("Failed to open file: {} bufferSize:{}", path, bufferSize, ex); - throw new IOException("Failed to open file: " + path, ex); - } - } - - @Override - public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize, - final short replication, final long blockSize, final Progressable progress) throws IOException { - - LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize); - - path = qualify(path); - - try { - // Priority: 1) non-empty FS_SEAWEED_REPLICATION, 2) empty string -> filer - // default, 3) null -> HDFS replication - String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION); - if (replicaPlacement == null) { - // Not configured, use HDFS replication parameter. This creates a "00N" - // replication string, - // placing N (replication-1) extra replicas on different servers in the same - // rack. - replicaPlacement = String.format("%03d", replication - 1); - } - int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, - seaweedBufferSize, replicaPlacement); - return new FSDataOutputStream(outputStream, statistics); - } catch (Exception ex) { - LOG.error("Failed to create file: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); - throw new IOException("Failed to create file: " + path, ex); - } - } - - /** - * {@inheritDoc} - * - * @throws FileNotFoundException if the parent directory is not present -or - * is not a directory. 
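- * <p>
- * Illustrative call shape (hypothetical values; note the implementation below
- * passes the configured SeaweedFS buffer size in place of the blockSize argument):
- * <pre>
- *   fs.createNonRecursive(new Path("/dir/file"), FsPermission.getFileDefault(),
- *       EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 4096, (short) 1,
- *       4 * 1024 * 1024, null);
- * </pre>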
- */ - @Override - public FSDataOutputStream createNonRecursive(Path path, - FsPermission permission, - EnumSet flags, - int bufferSize, - short replication, - long blockSize, - Progressable progress) throws IOException { - Path parent = path.getParent(); - if (parent != null) { - // expect this to raise an exception if there is no parent - if (!getFileStatus(parent).isDirectory()) { - throw new FileAlreadyExistsException("Not a directory: " + parent); - } - } - int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - return create(path, permission, - flags.contains(CreateFlag.OVERWRITE), bufferSize, - replication, seaweedBufferSize, progress); - } - - @Override - public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { - - LOG.debug("append path: {} bufferSize:{}", path, bufferSize); - - path = qualify(path); - try { - int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); - return new FSDataOutputStream(outputStream, statistics); - } catch (Exception ex) { - LOG.error("Failed to append to file: {} bufferSize:{}", path, bufferSize, ex); - throw new IOException("Failed to append to file: " + path, ex); - } - } - - @Override - public boolean rename(Path src, Path dst) throws IOException { - - LOG.debug("rename path: {} => {}", src, dst); - - if (src.isRoot()) { - return false; - } - - if (src.equals(dst)) { - return true; - } - FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst); - - Path adjustedDst = dst; - - if (entry != null) { - FileStatus dstFileStatus = getFileStatus(dst); - String sourceFileName = src.getName(); - if (!dstFileStatus.isDirectory()) { - return false; - } - adjustedDst = new Path(dst, sourceFileName); - } - - Path qualifiedSrcPath = qualify(src); - Path qualifiedDstPath = qualify(adjustedDst); - - seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath); - return true; - } - - @Override - public boolean delete(Path path, boolean recursive) throws IOException { - - LOG.debug("delete path: {} recursive:{}", path, recursive); - - path = qualify(path); - - FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - - if (entry == null) { - return true; - } - - FileStatus fileStatus = getFileStatus(path); - - return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); - - } - - @Override - public FileStatus[] listStatus(Path path) throws IOException { - - LOG.debug("listStatus path: {}", path); - - path = qualify(path); - - return seaweedFileSystemStore.listEntries(path); - } - - @Override - public Path getWorkingDirectory() { - return workingDirectory; - } - - @Override - public void setWorkingDirectory(Path path) { - if (path.isAbsolute()) { - workingDirectory = path; - } else { - workingDirectory = new Path(workingDirectory, path); - } - } - - @Override - public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { - - LOG.debug("mkdirs path: {}", path); - - path = qualify(path); - - FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - - if (entry == null) { - - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - return seaweedFileSystemStore.createDirectory(path, currentUser, - fsPermission == null ? 
FsPermission.getDirDefault() : fsPermission, - FsPermission.getUMask(getConf())); - - } - - FileStatus fileStatus = getFileStatus(path); - - if (fileStatus.isDirectory()) { - return true; - } else { - throw new FileAlreadyExistsException("Path is a file: " + path); - } - } - - @Override - public FileStatus getFileStatus(Path path) throws IOException { - - LOG.debug("getFileStatus path: {}", path); - - path = qualify(path); - - return seaweedFileSystemStore.getFileStatus(path); - } - - /** - * Set owner of a path (i.e. a file or a directory). - * The parameters owner and group cannot both be null. - * - * @param path The path - * @param owner If it is null, the original username remains unchanged. - * @param group If it is null, the original groupname remains unchanged. - */ - @Override - public void setOwner(Path path, final String owner, final String group) - throws IOException { - LOG.debug("setOwner path: {}", path); - path = qualify(path); - - seaweedFileSystemStore.setOwner(path, owner, group); - } - - /** - * Set permission of a path. - * - * @param path The path - * @param permission Access permission - */ - @Override - public void setPermission(Path path, final FsPermission permission) throws IOException { - LOG.debug("setPermission path: {}", path); - - if (permission == null) { - throw new IllegalArgumentException("The permission can't be null"); - } - - path = qualify(path); - - seaweedFileSystemStore.setPermission(path, permission); - } - - Path qualify(Path path) { - return path.makeQualified(uri, workingDirectory); - } - - /** - * Concat existing files together. - * - * @param trg the path to the target destination. - * @param psrcs the paths to the sources to use for the concatenation. - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default). - */ - @Override - public void concat(final Path trg, final Path[] psrcs) throws IOException { - throw new UnsupportedOperationException("Not implemented by the " + - getClass().getSimpleName() + " FileSystem implementation"); - } - - /** - * Truncate the file in the indicated path to the indicated size. - * - * - * @param f The path to the file to be truncated - * @param newLength The size the file is to be truncated to - * @return true if the file has been truncated to the desired - * newLength and is immediately available to be reused for - * write operations such as append, or - * false if a background process of adjusting the length of - * the last block has been started, and clients should wait for it to - * complete before proceeding with further file updates. - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default). - */ - @Override - public boolean truncate(Path f, long newLength) throws IOException { - throw new UnsupportedOperationException("Not implemented by the " + - getClass().getSimpleName() + " FileSystem implementation"); - } - - @Override - public void createSymlink(final Path target, final Path link, - final boolean createParent) throws IOException { - // Supporting filesystems should override this method - throw new UnsupportedOperationException( - "Filesystem does not support symlinks!"); - } - - public boolean supportsSymlinks() { - return false; - } - - /** - * Create a snapshot. - * - * @param path The directory where snapshots will be taken. - * @param snapshotName The name of the snapshot - * @return the snapshot path. 
- * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - */ - @Override - public Path createSnapshot(Path path, String snapshotName) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support createSnapshot"); - } - - /** - * Rename a snapshot. - * - * @param path The directory path where the snapshot was taken - * @param snapshotOldName Old name of the snapshot - * @param snapshotNewName New name of the snapshot - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void renameSnapshot(Path path, String snapshotOldName, - String snapshotNewName) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support renameSnapshot"); - } - - /** - * Delete a snapshot of a directory. - * - * @param path The directory that the to-be-deleted snapshot belongs to - * @param snapshotName The name of the snapshot - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void deleteSnapshot(Path path, String snapshotName) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support deleteSnapshot"); - } - - /** - * Modifies ACL entries of files and directories. This method can add new ACL - * entries or modify the permissions on existing ACL entries. All existing - * ACL entries that are not specified in this call are retained without - * changes. (Modifications are merged into the current ACL.) - * - * @param path Path to modify - * @param aclSpec List<AclEntry> describing modifications - * @throws IOException if an ACL could not be modified - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void modifyAclEntries(Path path, List aclSpec) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support modifyAclEntries"); - } - - /** - * Removes ACL entries from files and directories. Other ACL entries are - * retained. - * - * @param path Path to modify - * @param aclSpec List describing entries to remove - * @throws IOException if an ACL could not be modified - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void removeAclEntries(Path path, List aclSpec) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeAclEntries"); - } - - /** - * Removes all default ACL entries from files and directories. - * - * @param path Path to modify - * @throws IOException if an ACL could not be modified - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void removeDefaultAcl(Path path) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeDefaultAcl"); - } - - /** - * Removes all but the base ACL entries of files and directories. The entries - * for user, group, and others are retained for compatibility with permission - * bits. - * - * @param path Path to modify - * @throws IOException if an ACL could not be removed - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). 
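- * <p>
- * A caller probing for ACL support might use a sketch like this (illustrative,
- * not part of this class):
- * <pre>
- *   try {
- *       fs.removeAcl(path);
- *   } catch (UnsupportedOperationException e) {
- *       // SeaweedFS has no ACLs; fall back to permission bits via setPermission()
- *   }
- * </pre>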
- */ - @Override - public void removeAcl(Path path) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeAcl"); - } - - /** - * Fully replaces ACL of files and directories, discarding all existing - * entries. - * - * @param path Path to modify - * @param aclSpec List describing modifications, which must include entries - * for user, group, and others for compatibility with permission - * bits. - * @throws IOException if an ACL could not be modified - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void setAcl(Path path, List aclSpec) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support setAcl"); - } - - /** - * Gets the ACL of a file or directory. - * - * @param path Path to get - * @return AclStatus describing the ACL of the file or directory - * @throws IOException if an ACL could not be read - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public AclStatus getAclStatus(Path path) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getAclStatus"); - } - - /** - * Set an xattr of a file or directory. - * The name must be prefixed with the namespace followed by ".". For example, - * "user.attr". - *
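- * A typical invocation would be
- * {@code fs.setXAttr(path, "user.attr", value, EnumSet.of(XAttrSetFlag.CREATE))}
- * (illustrative only; this implementation rejects all xattr calls with
- * UnsupportedOperationException).
- * <p>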
- * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to modify - * @param name xattr name. - * @param value xattr value. - * @param flag xattr set flag - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void setXAttr(Path path, String name, byte[] value, - EnumSet flag) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support setXAttr"); - } - - /** - * Get an xattr name and value for a file or directory. - * The name must be prefixed with the namespace followed by ".". For example, - * "user.attr". - *
- * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to get extended attribute - * @param name xattr name. - * @return byte[] xattr value. - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public byte[] getXAttr(Path path, String name) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getXAttr"); - } - - /** - * Get all of the xattr name/value pairs for a file or directory. - * Only those xattrs which the logged-in user has permissions to view - * are returned. - *
- * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to get extended attributes - * @return Map describing the XAttrs of the file or directory - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public Map getXAttrs(Path path) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getXAttrs"); - } - - /** - * Get all of the xattrs name/value pairs for a file or directory. - * Only those xattrs which the logged-in user has permissions to view - * are returned. - *
- * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to get extended attributes - * @param names XAttr names. - * @return Map describing the XAttrs of the file or directory - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public Map getXAttrs(Path path, List names) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getXAttrs"); - } - - /** - * Get all of the xattr names for a file or directory. - * Only those xattr names which the logged-in user has permissions to view - * are returned. - *
- * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to get extended attributes - * @return List{@literal } of the XAttr names of the file or directory - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public List listXAttrs(Path path) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support listXAttrs"); - } - - /** - * Remove an xattr of a file or directory. - * The name must be prefixed with the namespace followed by ".". For example, - * "user.attr". - *
- * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to remove extended attribute - * @param name xattr name - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void removeXAttr(Path path, String name) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeXAttr"); - } - -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java deleted file mode 100644 index f65c1961b..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ /dev/null @@ -1,291 +0,0 @@ -package seaweed.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import seaweedfs.client.*; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static seaweed.hdfs.SeaweedFileSystem.*; - -public class SeaweedFileSystemStore { - - private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); - - private FilerClient filerClient; - private Configuration conf; - - public SeaweedFileSystemStore(String host, int port, int grpcPort, String cn, Configuration conf) { - filerClient = new FilerClient(host, port, grpcPort, cn); - this.conf = conf; - String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); - if (volumeServerAccessMode.equals("publicUrl")) { - filerClient.setAccessVolumeServerByPublicUrl(); - } else if (volumeServerAccessMode.equals("filerProxy")) { - filerClient.setAccessVolumeServerByFilerProxy(); - } - } - - public void close() { - try { - this.filerClient.shutdown(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - public static String getParentDirectory(Path path) { - return path.isRoot() ? 
"/" : path.getParent().toUri().getPath(); - } - - static int permissionToMode(FsPermission permission, boolean isDirectory) { - int p = permission.toShort(); - if (isDirectory) { - p = p | 1 << 31; - } - return p; - } - - public boolean createDirectory(final Path path, UserGroupInformation currentUser, - final FsPermission permission, final FsPermission umask) { - - LOG.debug("createDirectory path: {} permission: {} umask: {}", - path, - permission, - umask); - - return filerClient.mkdirs( - path.toUri().getPath(), - permissionToMode(permission, true), - currentUser.getUserName(), - currentUser.getGroupNames() - ); - } - - public FileStatus[] listEntries(final Path path) throws IOException { - LOG.debug("listEntries path: {}", path); - - FileStatus pathStatus = getFileStatus(path); - - if (pathStatus == null) { - return new FileStatus[0]; - } - - if (!pathStatus.isDirectory()) { - return new FileStatus[]{pathStatus}; - } - - List fileStatuses = new ArrayList(); - - List entries = filerClient.listEntries(path.toUri().getPath()); - - for (FilerProto.Entry entry : entries) { - - FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry); - - fileStatuses.add(fileStatus); - } - LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size()); - return fileStatuses.toArray(new FileStatus[0]); - - } - - public FileStatus getFileStatus(final Path path) throws IOException { - - FilerProto.Entry entry = lookupEntry(path); - if (entry == null) { - throw new FileNotFoundException("File does not exist: " + path); - } - LOG.debug("doGetFileStatus path:{} entry:{}", path, entry); - - FileStatus fileStatus = doGetFileStatus(path, entry); - return fileStatus; - } - - public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) { - LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}", - path, - String.valueOf(isDirectory), - String.valueOf(recursive)); - - if (path.isRoot()) { - return true; - } - - if (recursive && isDirectory) { - List entries = filerClient.listEntries(path.toUri().getPath()); - for (FilerProto.Entry entry : entries) { - deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true); - } - } - - return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive, true); - } - - private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { - FilerProto.FuseAttributes attributes = entry.getAttributes(); - long length = SeaweedRead.fileSize(entry); - boolean isDir = entry.getIsDirectory(); - int block_replication = 1; - int blocksize = this.conf.getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); - long modification_time = attributes.getMtime() * 1000; // milliseconds - long access_time = 0; - FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode()); - String owner = attributes.getUserName(); - String group = attributes.getGroupNameCount() > 0 ? 
attributes.getGroupName(0) : ""; - return new FileStatus(length, isDir, block_replication, blocksize, - modification_time, access_time, permission, owner, group, null, path); - } - - public FilerProto.Entry lookupEntry(Path path) { - - return filerClient.lookupEntry(getParentDirectory(path), path.getName()); - - } - - public void rename(Path source, Path destination) { - - LOG.debug("rename source: {} destination:{}", source, destination); - - if (source.isRoot()) { - return; - } - LOG.info("rename source: {} destination:{}", source, destination); - FilerProto.Entry entry = lookupEntry(source); - if (entry == null) { - LOG.warn("rename non-existing source: {}", source); - return; - } - filerClient.mv(source.toUri().getPath(), destination.toUri().getPath()); - } - - public OutputStream createFile(final Path path, - final boolean overwrite, - FsPermission permission, - int bufferSize, - String replication) throws IOException { - - permission = permission == null ? FsPermission.getFileDefault() : permission; - - LOG.debug("createFile path: {} overwrite: {} permission: {}", - path, - overwrite, - permission.toString()); - - UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); - long now = System.currentTimeMillis() / 1000L; - - FilerProto.Entry.Builder entry = null; - long writePosition = 0; - if (!overwrite) { - FilerProto.Entry existingEntry = lookupEntry(path); - LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry); - if (existingEntry != null) { - entry = FilerProto.Entry.newBuilder(); - entry.mergeFrom(existingEntry); - entry.clearContent(); - entry.getAttributesBuilder().setMtime(now); - LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); - writePosition = SeaweedRead.fileSize(existingEntry); - } - } - if (entry == null) { - entry = FilerProto.Entry.newBuilder() - .setName(path.getName()) - .setIsDirectory(false) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setFileMode(permissionToMode(permission, false)) - .setCrtime(now) - .setMtime(now) - .setUserName(userGroupInformation.getUserName()) - .clearGroupName() - .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) - ); - SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); - } - - return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication); - - } - - public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException { - - LOG.debug("openFileForRead path:{}", path); - - FilerProto.Entry entry = lookupEntry(path); - - if (entry == null) { - throw new FileNotFoundException("read non-exist file " + path); - } - - return new SeaweedHadoopInputStream(filerClient, - statistics, - path.toUri().getPath(), - entry); - } - - public void setOwner(Path path, String owner, String group) { - - LOG.debug("setOwner path:{} owner:{} group:{}", path, owner, group); - - FilerProto.Entry entry = lookupEntry(path); - if (entry == null) { - LOG.debug("setOwner path:{} entry:{}", path, entry); - return; - } - - FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); - FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder(); - - if (owner != null) { - attributesBuilder.setUserName(owner); - } - if (group != null) { - attributesBuilder.clearGroupName(); - attributesBuilder.addGroupName(group); - } - - entryBuilder.setAttributes(attributesBuilder); - - LOG.debug("setOwner path:{} entry:{}", 
path, entryBuilder); - - filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); - - } - - public void setPermission(Path path, FsPermission permission) { - - LOG.debug("setPermission path:{} permission:{}", path, permission); - - FilerProto.Entry entry = lookupEntry(path); - if (entry == null) { - LOG.debug("setPermission path:{} entry:{}", path, entry); - return; - } - - FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); - FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder(); - - attributesBuilder.setFileMode(permissionToMode(permission, entry.getIsDirectory())); - - entryBuilder.setAttributes(attributesBuilder); - - LOG.debug("setPermission path:{} entry:{}", path, entryBuilder); - - filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); - - } - -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java deleted file mode 100644 index f26eae597..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java +++ /dev/null @@ -1,150 +0,0 @@ -package seaweed.hdfs; - -// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream - -import org.apache.hadoop.fs.ByteBufferReadable; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileSystem.Statistics; -import seaweedfs.client.FilerClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedInputStream; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; - -public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable { - - private final SeaweedInputStream seaweedInputStream; - private final Statistics statistics; - - public SeaweedHadoopInputStream( - final FilerClient filerClient, - final Statistics statistics, - final String path, - final FilerProto.Entry entry) throws IOException { - this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry); - this.statistics = statistics; - } - - @Override - public int read() throws IOException { - return seaweedInputStream.read(); - } - - @Override - public int read(final byte[] b, final int off, final int len) throws IOException { - return seaweedInputStream.read(b, off, len); - } - - // implement ByteBufferReadable - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - int bytesRead = seaweedInputStream.read(buf); - - if (bytesRead > 0) { - if (statistics != null) { - statistics.incrementBytesRead(bytesRead); - } - } - - return bytesRead; - } - - /** - * Seek to given position in stream. - * - * @param n position to seek to - * @throws IOException if there is an error - * @throws EOFException if attempting to seek past end of file - */ - @Override - public synchronized void seek(long n) throws IOException { - seaweedInputStream.seek(n); - } - - @Override - public synchronized long skip(long n) throws IOException { - return seaweedInputStream.skip(n); - } - - /** - * Return the size of the remaining available bytes - * if the size is less than or equal to {@link Integer#MAX_VALUE}, - * otherwise, return {@link Integer#MAX_VALUE}. - *
- * This is to match the behavior of DFSInputStream.available(), - * which some clients may rely on (HBase write-ahead log reading in - * particular). - */ - @Override - public synchronized int available() throws IOException { - return seaweedInputStream.available(); - } - - /** - * Returns the length of the file that this stream refers to. Note that the length returned is the length - * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, - * they won't be reflected in the returned length. - * - * @return length of the file. - * @throws IOException if the stream is closed - */ - public long length() throws IOException { - return seaweedInputStream.length(); - } - - /** - * Return the current offset from the start of the file - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override - public synchronized long getPos() throws IOException { - return seaweedInputStream.getPos(); - } - - /** - * Seeks a different copy of the data. Returns true if - * found a new source, false otherwise. - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override - public boolean seekToNewSource(long l) throws IOException { - return false; - } - - @Override - public synchronized void close() throws IOException { - seaweedInputStream.close(); - } - - /** - * Not supported by this stream. Throws {@link UnsupportedOperationException} - * - * @param readlimit ignored - */ - @Override - public synchronized void mark(int readlimit) { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * Not supported by this stream. Throws {@link UnsupportedOperationException} - */ - @Override - public synchronized void reset() throws IOException { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * Gets whether mark and reset are supported by this stream. Always returns false.
- * - * @return always {@code false} - */ - @Override - public boolean markSupported() { - return false; - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java deleted file mode 100644 index da5b56bbc..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java +++ /dev/null @@ -1,16 +0,0 @@ -package seaweed.hdfs; - -// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream - -import seaweedfs.client.FilerClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedOutputStream; - -public class SeaweedHadoopOutputStream extends SeaweedOutputStream { - - public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, - final long position, final int bufferSize, final String replication) { - super(filerClient, path, entry, position, bufferSize, replication); - } - -} diff --git a/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemConfigTest.java b/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemConfigTest.java deleted file mode 100644 index bcc08b8e2..000000000 --- a/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemConfigTest.java +++ /dev/null @@ -1,90 +0,0 @@ -package seaweed.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.*; - -/** - * Unit tests for SeaweedFileSystem configuration that don't require a running SeaweedFS instance. - * - * These tests verify basic properties and constants. - */ -public class SeaweedFileSystemConfigTest { - - private SeaweedFileSystem fs; - private Configuration conf; - - @Before - public void setUp() { - fs = new SeaweedFileSystem(); - conf = new Configuration(); - } - - @Test - public void testScheme() { - assertEquals("seaweedfs", fs.getScheme()); - } - - @Test - public void testConstants() { - // Test that constants are defined correctly - assertEquals("fs.seaweed.filer.host", SeaweedFileSystem.FS_SEAWEED_FILER_HOST); - assertEquals("fs.seaweed.filer.port", SeaweedFileSystem.FS_SEAWEED_FILER_PORT); - assertEquals("fs.seaweed.filer.port.grpc", SeaweedFileSystem.FS_SEAWEED_FILER_PORT_GRPC); - assertEquals(8888, SeaweedFileSystem.FS_SEAWEED_DEFAULT_PORT); - assertEquals("fs.seaweed.buffer.size", SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE); - assertEquals(4 * 1024 * 1024, SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE); - assertEquals("fs.seaweed.replication", SeaweedFileSystem.FS_SEAWEED_REPLICATION); - assertEquals("fs.seaweed.volume.server.access", SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS); - assertEquals("fs.seaweed.filer.cn", SeaweedFileSystem.FS_SEAWEED_FILER_CN); - } - - @Test - public void testWorkingDirectoryPathOperations() { - // Test path operations that don't require initialization - Path testPath = new Path("/test/path"); - assertTrue("Path should be absolute", testPath.isAbsolute()); - assertEquals("/test/path", testPath.toUri().getPath()); - - Path childPath = new Path(testPath, "child"); - assertEquals("/test/path/child", childPath.toUri().getPath()); - } - - @Test - public void testConfigurationProperties() { - // Test that configuration can be set and read - conf.set(SeaweedFileSystem.FS_SEAWEED_FILER_HOST, "testhost"); - assertEquals("testhost", conf.get(SeaweedFileSystem.FS_SEAWEED_FILER_HOST)); - - conf.setInt(SeaweedFileSystem.FS_SEAWEED_FILER_PORT, 
9999); - assertEquals(9999, conf.getInt(SeaweedFileSystem.FS_SEAWEED_FILER_PORT, 0)); - - conf.setInt(SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE, 8 * 1024 * 1024); - assertEquals(8 * 1024 * 1024, conf.getInt(SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE, 0)); - - conf.set(SeaweedFileSystem.FS_SEAWEED_REPLICATION, "001"); - assertEquals("001", conf.get(SeaweedFileSystem.FS_SEAWEED_REPLICATION)); - - conf.set(SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS, "publicUrl"); - assertEquals("publicUrl", conf.get(SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS)); - - conf.set(SeaweedFileSystem.FS_SEAWEED_FILER_CN, "test-cn"); - assertEquals("test-cn", conf.get(SeaweedFileSystem.FS_SEAWEED_FILER_CN)); - } - - @Test - public void testDefaultBufferSize() { - // Test default buffer size constant - int expected = 4 * 1024 * 1024; // 4MB - assertEquals(expected, SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE); - } - - @Test - public void testDefaultPort() { - // Test default port constant - assertEquals(8888, SeaweedFileSystem.FS_SEAWEED_DEFAULT_PORT); - } -} diff --git a/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemTest.java b/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemTest.java deleted file mode 100644 index ec43b3481..000000000 --- a/other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemTest.java +++ /dev/null @@ -1,379 +0,0 @@ -package seaweed.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.net.URI; - -import static org.junit.Assert.*; - -/** - * Unit tests for SeaweedFileSystem. - * - * These tests verify basic FileSystem operations against a SeaweedFS backend. - * Note: These tests require a running SeaweedFS filer instance. - * - * To run tests, ensure SeaweedFS is running with default ports: - * - Filer HTTP: 8888 - * - Filer gRPC: 18888 - * - * Set environment variable SEAWEEDFS_TEST_ENABLED=true to enable these tests. 
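- * <p>
- * For example, against a local filer on the default ports:
- * <pre>
- *   export SEAWEEDFS_TEST_ENABLED=true
- *   mvn test -Dtest=SeaweedFileSystemTest
- * </pre>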
- */ -public class SeaweedFileSystemTest { - - private SeaweedFileSystem fs; - private Configuration conf; - private static final String TEST_ROOT = "/test-hdfs2"; - private static final boolean TESTS_ENABLED = - "true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); - - @Before - public void setUp() throws Exception { - if (!TESTS_ENABLED) { - return; - } - - conf = new Configuration(); - conf.set("fs.seaweed.filer.host", "localhost"); - conf.setInt("fs.seaweed.filer.port", 8888); - conf.setInt("fs.seaweed.filer.port.grpc", 18888); - - fs = new SeaweedFileSystem(); - URI uri = new URI("seaweedfs://localhost:8888/"); - fs.initialize(uri, conf); - - // Clean up any existing test directory - Path testPath = new Path(TEST_ROOT); - if (fs.exists(testPath)) { - fs.delete(testPath, true); - } - } - - @After - public void tearDown() throws Exception { - if (!TESTS_ENABLED || fs == null) { - return; - } - - // Clean up test directory - Path testPath = new Path(TEST_ROOT); - if (fs.exists(testPath)) { - fs.delete(testPath, true); - } - - fs.close(); - } - - @Test - public void testInitialization() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - assertNotNull(fs); - assertEquals("seaweedfs", fs.getScheme()); - assertNotNull(fs.getUri()); - assertEquals("/", fs.getWorkingDirectory().toUri().getPath()); - } - - @Test - public void testMkdirs() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path testDir = new Path(TEST_ROOT + "/testdir"); - assertTrue("Failed to create directory", fs.mkdirs(testDir)); - assertTrue("Directory should exist", fs.exists(testDir)); - - FileStatus status = fs.getFileStatus(testDir); - assertTrue("Path should be a directory", status.isDirectory()); - } - - @Test - public void testCreateAndReadFile() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path testFile = new Path(TEST_ROOT + "/testfile.txt"); - String testContent = "Hello, SeaweedFS!"; - - // Create and write to file - FSDataOutputStream out = fs.create(testFile, FsPermission.getDefault(), - false, 4096, (short) 1, 4 * 1024 * 1024, null); - assertNotNull("Output stream should not be null", out); - out.write(testContent.getBytes()); - out.close(); - - // Verify file exists - assertTrue("File should exist", fs.exists(testFile)); - - // Read and verify content - FSDataInputStream in = fs.open(testFile, 4096); - assertNotNull("Input stream should not be null", in); - byte[] buffer = new byte[testContent.length()]; - int bytesRead = in.read(buffer); - in.close(); - - assertEquals("Should read all bytes", testContent.length(), bytesRead); - assertEquals("Content should match", testContent, new String(buffer)); - } - - @Test - public void testFileStatus() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path testFile = new Path(TEST_ROOT + "/statustest.txt"); - String content = "test content"; - - FSDataOutputStream out = fs.create(testFile); - out.write(content.getBytes()); - out.close(); - - FileStatus status = fs.getFileStatus(testFile); - assertNotNull("FileStatus should not be null", status); - assertFalse("Should not be a directory", status.isDirectory()); - assertTrue("Should be a file", status.isFile()); - assertEquals("File length should match", content.length(), 
status.getLen()); - assertNotNull("Path should not be null", status.getPath()); - } - - @Test - public void testListStatus() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path testDir = new Path(TEST_ROOT + "/listtest"); - fs.mkdirs(testDir); - - // Create multiple files - for (int i = 0; i < 3; i++) { - Path file = new Path(testDir, "file" + i + ".txt"); - FSDataOutputStream out = fs.create(file); - out.write(("content" + i).getBytes()); - out.close(); - } - - FileStatus[] statuses = fs.listStatus(testDir); - assertNotNull("List should not be null", statuses); - assertEquals("Should have 3 files", 3, statuses.length); - } - - @Test - public void testRename() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path srcFile = new Path(TEST_ROOT + "/source.txt"); - Path dstFile = new Path(TEST_ROOT + "/destination.txt"); - String content = "rename test"; - - // Create source file - FSDataOutputStream out = fs.create(srcFile); - out.write(content.getBytes()); - out.close(); - - assertTrue("Source file should exist", fs.exists(srcFile)); - - // Rename - assertTrue("Rename should succeed", fs.rename(srcFile, dstFile)); - - // Verify - assertFalse("Source file should not exist", fs.exists(srcFile)); - assertTrue("Destination file should exist", fs.exists(dstFile)); - - // Verify content preserved - FSDataInputStream in = fs.open(dstFile); - byte[] buffer = new byte[content.length()]; - in.read(buffer); - in.close(); - assertEquals("Content should be preserved", content, new String(buffer)); - } - - @Test - public void testDelete() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path testFile = new Path(TEST_ROOT + "/deletetest.txt"); - - // Create file - FSDataOutputStream out = fs.create(testFile); - out.write("delete me".getBytes()); - out.close(); - - assertTrue("File should exist before delete", fs.exists(testFile)); - - // Delete - assertTrue("Delete should succeed", fs.delete(testFile, false)); - assertFalse("File should not exist after delete", fs.exists(testFile)); - } - - @Test - public void testDeleteDirectory() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path testDir = new Path(TEST_ROOT + "/deletedir"); - Path testFile = new Path(testDir, "file.txt"); - - // Create directory with file - fs.mkdirs(testDir); - FSDataOutputStream out = fs.create(testFile); - out.write("content".getBytes()); - out.close(); - - assertTrue("Directory should exist", fs.exists(testDir)); - assertTrue("File should exist", fs.exists(testFile)); - - // Recursive delete - assertTrue("Recursive delete should succeed", fs.delete(testDir, true)); - assertFalse("Directory should not exist after delete", fs.exists(testDir)); - assertFalse("File should not exist after delete", fs.exists(testFile)); - } - - @Test - public void testAppend() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path testFile = new Path(TEST_ROOT + "/appendtest.txt"); - String initialContent = "initial"; - String appendContent = " appended"; - - // Create initial file - FSDataOutputStream out = fs.create(testFile); - out.write(initialContent.getBytes()); - out.close(); - - // Append - FSDataOutputStream appendOut = 
fs.append(testFile, 4096, null); - assertNotNull("Append stream should not be null", appendOut); - appendOut.write(appendContent.getBytes()); - appendOut.close(); - - // Verify combined content - FSDataInputStream in = fs.open(testFile); - byte[] buffer = new byte[initialContent.length() + appendContent.length()]; - int bytesRead = in.read(buffer); - in.close(); - - String expected = initialContent + appendContent; - assertEquals("Should read all bytes", expected.length(), bytesRead); - assertEquals("Content should match", expected, new String(buffer)); - } - - @Test - public void testSetWorkingDirectory() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path originalWd = fs.getWorkingDirectory(); - assertEquals("Original working directory should be /", "/", originalWd.toUri().getPath()); - - Path newWd = new Path(TEST_ROOT); - fs.mkdirs(newWd); - fs.setWorkingDirectory(newWd); - - Path currentWd = fs.getWorkingDirectory(); - assertTrue("Working directory should be updated", - currentWd.toUri().getPath().contains(TEST_ROOT)); - } - - @Test - public void testSetPermission() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path testFile = new Path(TEST_ROOT + "/permtest.txt"); - - // Create file - FSDataOutputStream out = fs.create(testFile); - out.write("permission test".getBytes()); - out.close(); - - // Set permission - FsPermission newPerm = new FsPermission((short) 0644); - fs.setPermission(testFile, newPerm); - - FileStatus status = fs.getFileStatus(testFile); - assertNotNull("Permission should not be null", status.getPermission()); - } - - @Test - public void testSetOwner() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path testFile = new Path(TEST_ROOT + "/ownertest.txt"); - - // Create file - FSDataOutputStream out = fs.create(testFile); - out.write("owner test".getBytes()); - out.close(); - - // Set owner - this may not fail even if not fully implemented - fs.setOwner(testFile, "testuser", "testgroup"); - - // Just verify the call doesn't throw an exception - FileStatus status = fs.getFileStatus(testFile); - assertNotNull("FileStatus should not be null", status); - } - - @Test - public void testRenameToExistingDirectory() throws Exception { - if (!TESTS_ENABLED) { - System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); - return; - } - - Path srcFile = new Path(TEST_ROOT + "/movefile.txt"); - Path dstDir = new Path(TEST_ROOT + "/movedir"); - - // Create source file and destination directory - FSDataOutputStream out = fs.create(srcFile); - out.write("move test".getBytes()); - out.close(); - fs.mkdirs(dstDir); - - // Rename file to existing directory (should move file into directory) - assertTrue("Rename to directory should succeed", fs.rename(srcFile, dstDir)); - - // File should be moved into the directory - Path expectedLocation = new Path(dstDir, srcFile.getName()); - assertTrue("File should exist in destination directory", fs.exists(expectedLocation)); - assertFalse("Source file should not exist", fs.exists(srcFile)); - } -} - diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAtomicOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAtomicOutputStream.java deleted file mode 100644 index ed42af0a9..000000000 --- 
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAtomicOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAtomicOutputStream.java
deleted file mode 100644
index ed42af0a9..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAtomicOutputStream.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.hadoop.fs.Syncable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerClient;
-import seaweedfs.client.FilerProto;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-/**
- * Atomic output stream for Parquet files.
- *
- * Buffers all writes in memory and writes atomically on close().
- * This ensures that getPos() always returns accurate positions that match
- * the final file layout, which is required for Parquet's footer metadata.
- */
-public class SeaweedAtomicOutputStream extends SeaweedHadoopOutputStream implements Syncable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SeaweedAtomicOutputStream.class);
-
-    private final ByteArrayOutputStream memoryBuffer;
-    private final String filePath;
-    private boolean closed = false;
-
-    public SeaweedAtomicOutputStream(FilerClient filerClient, String path, FilerProto.Entry.Builder entry,
-                                     long position, int maxBufferSize, String replication) {
-        super(filerClient, path, entry, position, maxBufferSize, replication);
-        this.filePath = path;
-        this.memoryBuffer = new ByteArrayOutputStream(maxBufferSize);
-        LOG.info("[ATOMIC] Created atomic output stream for: {} (maxBuffer={})", path, maxBufferSize);
-    }
-
-    @Override
-    public synchronized void write(int b) throws IOException {
-        if (closed) {
-            throw new IOException("Stream is closed");
-        }
-        memoryBuffer.write(b);
-    }
-
-    @Override
-    public synchronized void write(byte[] b, int off, int len) throws IOException {
-        if (closed) {
-            throw new IOException("Stream is closed");
-        }
-        memoryBuffer.write(b, off, len);
-    }
-
-    @Override
-    public synchronized long getPos() throws IOException {
-        // Return the current size of the memory buffer
-        // This is always accurate since nothing is flushed until close()
-        long pos = memoryBuffer.size();
-
-        // Log getPos() calls around the problematic positions
-        if (pos >= 470 && pos <= 476) {
-            LOG.error("[ATOMIC-GETPOS] getPos() returning pos={}", pos);
-        }
-
-        return pos;
-    }
-
-    @Override
-    public synchronized void flush() throws IOException {
-        // No-op for atomic writes - everything is flushed on close()
-        LOG.debug("[ATOMIC] flush() called (no-op for atomic writes)");
-    }
-
-    @Override
-    public synchronized void hsync() throws IOException {
-        // No-op for atomic writes
-        LOG.debug("[ATOMIC] hsync() called (no-op for atomic writes)");
-    }
-
-    @Override
-    public synchronized void hflush() throws IOException {
-        // No-op for atomic writes
-        LOG.debug("[ATOMIC] hflush() called (no-op for atomic writes)");
-    }
-
-    @Override
-    public synchronized void close() throws IOException {
-        if (closed) {
-            return;
-        }
-
-        try {
-            byte[] data = memoryBuffer.toByteArray();
-            int size = data.length;
-
-            LOG.info("[ATOMIC] Closing atomic stream: {} ({} bytes buffered)", filePath, size);
-
-            if (size > 0) {
-                // Write all data at once using the parent's write method
-                super.write(data, 0, size);
-            }
-
-            // Now close the parent stream which will flush and write metadata
-            super.close();
-
-            LOG.info("[ATOMIC] Successfully wrote {} bytes atomically to: {}", size, filePath);
-        } finally {
-            closed = true;
-            memoryBuffer.reset();
-        }
-    }
-}
diff --git a/test/java/spark/COMMIT_SUMMARY.md b/test/java/spark/COMMIT_SUMMARY.md
deleted file mode 100644
index a8b405f55..000000000
--- a/test/java/spark/COMMIT_SUMMARY.md
+++ /dev/null
@@ -1,132 +0,0 @@
-# Fix Parquet EOF Error by Removing ByteBufferReadable Interface
-
-## Summary
-
-Fixed `EOFException: Reached the end of stream. Still have: 78 bytes left` error when reading Parquet files with complex schemas in Spark.
-
-## Root Cause
-
-`SeaweedHadoopInputStream` declared the `ByteBufferReadable` interface but did not properly implement it, causing an incorrect buffering strategy and position-tracking issues during positioned reads (critical for Parquet).
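-
-To see why the declaration alone matters, note that Hadoop dispatches byte-buffer reads on the *declared* interface. The snippet below paraphrases `org.apache.hadoop.fs.FSDataInputStream` for illustration; it is not SeaweedFS code:
-
-```java
-// Paraphrased from FSDataInputStream: a wrapped stream that merely
-// claims ByteBufferReadable is trusted unconditionally, so an
-// incomplete implementation silently corrupts positioned reads.
-public int read(ByteBuffer buf) throws IOException {
-    if (in instanceof ByteBufferReadable) {
-        return ((ByteBufferReadable) in).read(buf);
-    }
-    throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
-}
-```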
-
-## Solution
-
-Removed `ByteBufferReadable` interface from `SeaweedHadoopInputStream` to match Hadoop's `RawLocalFileSystem` pattern, which uses `BufferedFSInputStream` for proper position tracking.
-
-## Changes
-
-### Core Fix
-
-1. **`SeaweedHadoopInputStream.java`**:
-   - Removed `ByteBufferReadable` interface
-   - Removed `read(ByteBuffer)` method
-   - Cleaned up debug logging
-   - Added documentation explaining the design choice
-
-2. **`SeaweedFileSystem.java`**:
-   - Changed from `BufferedByteBufferReadableInputStream` to `BufferedFSInputStream`
-   - Applies to all streams uniformly
-   - Cleaned up debug logging
-
-3. **`SeaweedInputStream.java`**:
-   - Cleaned up debug logging
-
-### Cleanup
-
-4. **Deleted debug-only files**:
-   - `DebugDualInputStream.java`
-   - `DebugDualInputStreamWrapper.java`
-   - `DebugDualOutputStream.java`
-   - `DebugMode.java`
-   - `LocalOnlyInputStream.java`
-   - `ShadowComparisonStream.java`
-
-5. **Reverted**:
-   - `SeaweedFileSystemStore.java` (removed all debug mode logic)
-
-6. **Cleaned**:
-   - `docker-compose.yml` (removed debug environment variables)
-   - All `.md` documentation files in `test/java/spark/`
-
-## Testing
-
-All Spark integration tests pass:
-- ✅ `SparkSQLTest.testCreateTableAndQuery` (complex 4-column schema)
-- ✅ `SimpleOneColumnTest` (basic operations)
-- ✅ All other Spark integration tests
-
-## Technical Details
-
-### Why This Works
-
-Hadoop's `RawLocalFileSystem` uses the exact same pattern:
-- Does NOT implement `ByteBufferReadable`
-- Uses `BufferedFSInputStream` for buffering
-- Properly handles positioned reads with automatic position restoration
-
-### Position Tracking
-
-`BufferedFSInputStream` implements positioned reads correctly:
-```java
-public int read(long position, byte[] buffer, int offset, int length) throws IOException {
-    long oldPos = getPos();
-    try {
-        seek(position);
-        return read(buffer, offset, length);
-    } finally {
-        seek(oldPos); // Restores position!
-    }
-}
-```
-
-This ensures buffered reads don't permanently change the stream position, which is critical for Parquet's random access pattern.
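-
-Applied here, `open()` can then wrap the raw SeaweedFS stream the same way `RawLocalFileSystem` wraps its local stream. A minimal sketch, assuming the raw stream extends `FSInputStream` and using a hypothetical `createSeaweedInputStream()` helper (the 4x buffer factor comes from the performance notes below); the real `SeaweedFileSystem.open()` differs in details:
-
-```java
-// Sketch: route every read through BufferedFSInputStream so positioned
-// reads get the save/seek/restore behavior shown above.
-public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-    FSInputStream raw = createSeaweedInputStream(path); // hypothetical helper
-    return new FSDataInputStream(new BufferedFSInputStream(raw, 4 * bufferSize));
-}
-```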
-
-### Performance Impact
-
-Minimal to none:
-- Network latency dominates for remote storage
-- Buffering is still active (4x buffer size)
-- Extra byte[] copy is negligible compared to network I/O
-
-## Commit Message
-
-```
-Fix Parquet EOF error by removing ByteBufferReadable interface
-
-SeaweedHadoopInputStream incorrectly declared ByteBufferReadable interface
-without proper implementation, causing position tracking issues during
-positioned reads. This resulted in "78 bytes left" EOF errors when reading
-Parquet files with complex schemas in Spark.
-
-Solution: Remove ByteBufferReadable and use BufferedFSInputStream (matching
-Hadoop's RawLocalFileSystem pattern) which properly handles position
-restoration for positioned reads.
-
-Changes:
-- Remove ByteBufferReadable interface from SeaweedHadoopInputStream
-- Change SeaweedFileSystem to use BufferedFSInputStream for all streams
-- Clean up debug logging
-- Delete debug-only classes and files
-
-Tested: All Spark integration tests pass
-```
-
-## Files Changed
-
-### Modified
-- `other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java`
-- `other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java`
-- `other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java`
-- `test/java/spark/docker-compose.yml`
-
-### Reverted
-- `other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java`
-
-### Deleted
-- `other/java/hdfs3/src/main/java/seaweed/hdfs/DebugDualInputStream.java`
-- `other/java/hdfs3/src/main/java/seaweed/hdfs/DebugDualInputStreamWrapper.java`
-- `other/java/hdfs3/src/main/java/seaweed/hdfs/DebugDualOutputStream.java`
-- `other/java/hdfs3/src/main/java/seaweed/hdfs/DebugMode.java`
-- `other/java/hdfs3/src/main/java/seaweed/hdfs/LocalOnlyInputStream.java`
-- `other/java/hdfs3/src/main/java/seaweed/hdfs/ShadowComparisonStream.java`
-- All `.md` files in `test/java/spark/` (debug documentation)
-