17 changed files with 1 addition and 2862 deletions

-   5  .github/workflows/java_integration_tests.yml
-   6  .github/workflows/java_unit_tests.yml
-   8  .github/workflows/spark-integration-tests.yml
-   2  other/java/examples/pom.xml
- 199  other/java/hdfs2/README.md
- 578  other/java/hdfs2/dependency-reduced-pom.xml
- 195  other/java/hdfs2/pom.xml
-  25  other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
-  35  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java
- 643  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
- 291  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
- 150  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
-  16  other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
-  90  other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemConfigTest.java
- 379  other/java/hdfs2/src/test/java/seaweed/hdfs/SeaweedFileSystemTest.java
- 109  other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAtomicOutputStream.java
- 132  test/java/spark/COMMIT_SUMMARY.md
@@ -1,199 +0,0 @@
# SeaweedFS Hadoop2 Client

Hadoop FileSystem implementation for SeaweedFS, compatible with Hadoop 2.x/3.x.

## Building

```bash
mvn clean install
```

## Testing

This project includes two types of tests:

### 1. Configuration Tests (No SeaweedFS Required)

These tests verify configuration handling and initialization logic without requiring a running SeaweedFS instance:

```bash
mvn test -Dtest=SeaweedFileSystemConfigTest
```

### 2. Integration Tests (Requires SeaweedFS)

These tests verify actual FileSystem operations against a running SeaweedFS instance.

#### Prerequisites

1. Start SeaweedFS with default ports:
```bash
# Terminal 1: Start master
weed master

# Terminal 2: Start volume server
weed volume -mserver=localhost:9333

# Terminal 3: Start filer
weed filer -master=localhost:9333
```

2. Verify services are running:
   - Master: http://localhost:9333
   - Filer HTTP: http://localhost:8888
   - Filer gRPC: localhost:18888

#### Running Integration Tests

```bash
# Enable integration tests
export SEAWEEDFS_TEST_ENABLED=true

# Run all tests
mvn test

# Run specific test
mvn test -Dtest=SeaweedFileSystemTest
```

### Test Configuration

Integration tests can be configured via environment variables or system properties:

- `SEAWEEDFS_TEST_ENABLED`: Set to `true` to enable integration tests (default: false); a minimal guard sketch is shown after this list
- Tests use these default connection settings:
  - Filer Host: localhost
  - Filer HTTP Port: 8888
  - Filer gRPC Port: 18888
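
A minimal sketch of how such an environment-variable guard can be written with JUnit 4 is shown below. The class name is hypothetical and the shipped test classes may implement the check differently; this only illustrates the mechanism.

```java
import org.junit.Assume;
import org.junit.Before;

public class SeaweedIntegrationTestGuardSketch {

    // Hypothetical guard: skip integration tests unless SEAWEEDFS_TEST_ENABLED=true.
    // This mirrors the "Skipping test - SEAWEEDFS_TEST_ENABLED not set" behavior
    // described in the Troubleshooting section further down.
    @Before
    public void requireSeaweedFs() {
        String enabled = System.getenv("SEAWEEDFS_TEST_ENABLED");
        Assume.assumeTrue("Skipping test - SEAWEEDFS_TEST_ENABLED not set",
                "true".equalsIgnoreCase(enabled));
    }
}
```
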
### Running Tests with Custom Configuration

To test against a different SeaweedFS instance, modify the test code or use Hadoop configuration:

```java
conf.set("fs.seaweed.filer.host", "your-host");
conf.setInt("fs.seaweed.filer.port", 8888);
conf.setInt("fs.seaweed.filer.port.grpc", 18888);
```

## Test Coverage

The test suite covers:

- **Configuration & Initialization**
  - URI parsing and configuration
  - Default values
  - Configuration overrides
  - Working directory management

- **File Operations**
  - Create files
  - Read files
  - Write files
  - Append to files
  - Delete files

- **Directory Operations**
  - Create directories
  - List directory contents
  - Delete directories (recursive and non-recursive)

- **Metadata Operations**
  - Get file status
  - Set permissions
  - Set owner/group
  - Rename files and directories

## Usage in Hadoop

1. Copy the built JAR to your Hadoop classpath:
```bash
cp target/seaweedfs-hadoop2-client-*.jar $HADOOP_HOME/share/hadoop/common/lib/
```

2. Configure `core-site.xml`:
```xml
<configuration>
  <property>
    <name>fs.seaweedfs.impl</name>
    <value>seaweed.hdfs.SeaweedFileSystem</value>
  </property>
  <property>
    <name>fs.seaweed.filer.host</name>
    <value>localhost</value>
  </property>
  <property>
    <name>fs.seaweed.filer.port</name>
    <value>8888</value>
  </property>
  <property>
    <name>fs.seaweed.filer.port.grpc</name>
    <value>18888</value>
  </property>
  <!-- Optional: Replication configuration with three priority levels:
       1) If set to non-empty value (e.g. "001") - uses that value
       2) If set to empty string "" - uses SeaweedFS filer's default replication
       3) If not configured (property not present) - uses HDFS replication parameter
  -->
  <!-- <property>
    <name>fs.seaweed.replication</name>
    <value>001</value>
  </property> -->
</configuration>
```

3. Use SeaweedFS with Hadoop commands:
```bash
hadoop fs -ls seaweedfs://localhost:8888/
hadoop fs -mkdir seaweedfs://localhost:8888/test
hadoop fs -put local.txt seaweedfs://localhost:8888/test/
```
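
The same settings can also be used programmatically through the Hadoop FileSystem API. The sketch below is a minimal example, assuming the built JAR is on the classpath and SeaweedFS is reachable at the default local ports; the class name is hypothetical.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class SeaweedFsUsageSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same keys as the core-site.xml example above; host/ports assume a local setup.
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
        conf.set("fs.seaweed.filer.host", "localhost");
        conf.setInt("fs.seaweed.filer.port", 8888);
        conf.setInt("fs.seaweed.filer.port.grpc", 18888);

        try (FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf)) {
            fs.mkdirs(new Path("/test"));                    // create a directory
            for (FileStatus status : fs.listStatus(new Path("/"))) {
                System.out.println(status.getPath());        // list the root directory
            }
        }
    }
}
```
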
## Continuous Integration

For CI environments, tests can be run in two modes:

1. **Configuration Tests Only** (default, no SeaweedFS required):
```bash
mvn test -Dtest=SeaweedFileSystemConfigTest
```

2. **Full Integration Tests** (requires SeaweedFS):
```bash
# Start SeaweedFS in CI environment
# Then run:
export SEAWEEDFS_TEST_ENABLED=true
mvn test
```

## Troubleshooting

### Tests are skipped

If you see "Skipping test - SEAWEEDFS_TEST_ENABLED not set":
```bash
export SEAWEEDFS_TEST_ENABLED=true
```

### Connection refused errors

Ensure SeaweedFS is running and accessible:
```bash
curl http://localhost:8888/
```

### gRPC errors

Verify the gRPC port is accessible:
```bash
# Should show the port is listening
netstat -an | grep 18888
```

## Contributing

When adding new features, please include:
1. Configuration tests (no SeaweedFS required)
2. Integration tests (with SEAWEEDFS_TEST_ENABLED guard)
3. Documentation updates

@@ -1,578 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?> |
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> |
|
||||
<modelVersion>4.0.0</modelVersion> |
|
||||
<groupId>com.seaweedfs</groupId> |
|
||||
<artifactId>seaweedfs-hadoop2-client</artifactId> |
|
||||
<name>SeaweedFS HDFS2 Client</name> |
|
||||
<version>${seaweedfs.client.version}</version> |
|
||||
<description>A java client for SeaweedFS.</description> |
|
||||
<url>https://github.com/seaweedfs/seaweedfs</url> |
|
||||
<developers> |
|
||||
<developer> |
|
||||
<name>Chris Lu</name> |
|
||||
<email>chris.lu@gmail.com</email> |
|
||||
<organization>SeaweedFS</organization> |
|
||||
<organizationUrl>https://seaweedfs.com</organizationUrl> |
|
||||
</developer> |
|
||||
</developers> |
|
||||
<licenses> |
|
||||
<license> |
|
||||
<name>The Apache License, Version 2.0</name> |
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> |
|
||||
</license> |
|
||||
</licenses> |
|
||||
<scm> |
|
||||
<connection>scm:git:git://github.com/seaweedfs/seaweedfs.git</connection> |
|
||||
<developerConnection>scm:git:ssh://github.com:seaweedfs/seaweedfs.git</developerConnection> |
|
||||
<url>https://github.com/seaweedfs/seaweedfs/tree/master</url> |
|
||||
</scm> |
|
||||
<build> |
|
||||
<plugins> |
|
||||
<plugin> |
|
||||
<artifactId>maven-compiler-plugin</artifactId> |
|
||||
<configuration> |
|
||||
<source>8</source> |
|
||||
<target>8</target> |
|
||||
<release>8</release> |
|
||||
</configuration> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<artifactId>maven-shade-plugin</artifactId> |
|
||||
<version>3.2.1</version> |
|
||||
<executions> |
|
||||
<execution> |
|
||||
<phase>package</phase> |
|
||||
<goals> |
|
||||
<goal>shade</goal> |
|
||||
</goals> |
|
||||
<configuration> |
|
||||
<filters> |
|
||||
<filter> |
|
||||
<artifact>*:*</artifact> |
|
||||
<excludes> |
|
||||
<exclude>META-INF/*.SF</exclude> |
|
||||
<exclude>META-INF/*.DSA</exclude> |
|
||||
<exclude>META-INF/*.RSA</exclude> |
|
||||
<exclude>org/slf4j/**</exclude> |
|
||||
<exclude>META-INF/maven/org.slf4j/**</exclude> |
|
||||
</excludes> |
|
||||
</filter> |
|
||||
</filters> |
|
||||
<transformers> |
|
||||
<transformer /> |
|
||||
</transformers> |
|
||||
<relocations> |
|
||||
<relocation> |
|
||||
<pattern>com.google</pattern> |
|
||||
<shadedPattern>shaded.com.google</shadedPattern> |
|
||||
</relocation> |
|
||||
<relocation> |
|
||||
<pattern>io.grpc.internal</pattern> |
|
||||
<shadedPattern>shaded.io.grpc.internal</shadedPattern> |
|
||||
</relocation> |
|
||||
<relocation> |
|
||||
<pattern>org.apache.commons</pattern> |
|
||||
<shadedPattern>shaded.org.apache.commons</shadedPattern> |
|
||||
<excludes> |
|
||||
<exclude>org.apache.hadoop</exclude> |
|
||||
<exclude>org.apache.log4j</exclude> |
|
||||
</excludes> |
|
||||
</relocation> |
|
||||
<relocation> |
|
||||
<pattern>org.apache.http</pattern> |
|
||||
<shadedPattern>shaded.org.apache.http</shadedPattern> |
|
||||
</relocation> |
|
||||
</relocations> |
|
||||
</configuration> |
|
||||
</execution> |
|
||||
</executions> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<artifactId>maven-gpg-plugin</artifactId> |
|
||||
<version>1.5</version> |
|
||||
<executions> |
|
||||
<execution> |
|
||||
<id>sign-artifacts</id> |
|
||||
<phase>verify</phase> |
|
||||
<goals> |
|
||||
<goal>sign</goal> |
|
||||
</goals> |
|
||||
</execution> |
|
||||
</executions> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<groupId>org.sonatype.central</groupId> |
|
||||
<artifactId>central-publishing-maven-plugin</artifactId> |
|
||||
<version>0.5.0</version> |
|
||||
<extensions>true</extensions> |
|
||||
<configuration> |
|
||||
<publishingServerId>central</publishingServerId> |
|
||||
<autoPublish>true</autoPublish> |
|
||||
</configuration> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<artifactId>maven-source-plugin</artifactId> |
|
||||
<version>2.2.1</version> |
|
||||
<executions> |
|
||||
<execution> |
|
||||
<id>attach-sources</id> |
|
||||
<goals> |
|
||||
<goal>jar-no-fork</goal> |
|
||||
</goals> |
|
||||
</execution> |
|
||||
</executions> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<artifactId>maven-javadoc-plugin</artifactId> |
|
||||
<version>3.0.1</version> |
|
||||
<executions> |
|
||||
<execution> |
|
||||
<id>attach-javadocs</id> |
|
||||
<goals> |
|
||||
<goal>jar</goal> |
|
||||
</goals> |
|
||||
</execution> |
|
||||
</executions> |
|
||||
</plugin> |
|
||||
</plugins> |
|
||||
</build> |
|
||||
<dependencies> |
|
||||
<dependency> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
<artifactId>hadoop-client</artifactId> |
|
||||
<version>3.4.0</version> |
|
||||
<scope>provided</scope> |
|
||||
<exclusions> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-hdfs-client</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-yarn-api</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-yarn-client</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-mapreduce-client-core</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-mapreduce-client-jobclient</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-annotations</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
</exclusions> |
|
||||
</dependency> |
|
||||
<dependency> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
<artifactId>hadoop-common</artifactId> |
|
||||
<version>3.4.0</version> |
|
||||
<scope>provided</scope> |
|
||||
<exclusions> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-shaded-protobuf_3_21</artifactId> |
|
||||
<groupId>org.apache.hadoop.thirdparty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-shaded-guava</artifactId> |
|
||||
<groupId>org.apache.hadoop.thirdparty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-cli</artifactId> |
|
||||
<groupId>commons-cli</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-math3</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-io</artifactId> |
|
||||
<groupId>commons-io</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-net</artifactId> |
|
||||
<groupId>commons-net</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-collections</artifactId> |
|
||||
<groupId>commons-collections</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>javax.servlet-api</artifactId> |
|
||||
<groupId>javax.servlet</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jakarta.activation-api</artifactId> |
|
||||
<groupId>jakarta.activation</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jetty-server</artifactId> |
|
||||
<groupId>org.eclipse.jetty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jetty-util</artifactId> |
|
||||
<groupId>org.eclipse.jetty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jetty-servlet</artifactId> |
|
||||
<groupId>org.eclipse.jetty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jetty-webapp</artifactId> |
|
||||
<groupId>org.eclipse.jetty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jsp-api</artifactId> |
|
||||
<groupId>javax.servlet.jsp</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jersey-core</artifactId> |
|
||||
<groupId>com.sun.jersey</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jersey-servlet</artifactId> |
|
||||
<groupId>com.sun.jersey</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jersey-json</artifactId> |
|
||||
<groupId>com.github.pjfanning</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jettison</artifactId> |
|
||||
<groupId>org.codehaus.jettison</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jersey-server</artifactId> |
|
||||
<groupId>com.sun.jersey</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>reload4j</artifactId> |
|
||||
<groupId>ch.qos.reload4j</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-beanutils</artifactId> |
|
||||
<groupId>commons-beanutils</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-configuration2</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-lang3</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-text</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>slf4j-reload4j</artifactId> |
|
||||
<groupId>org.slf4j</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>avro</artifactId> |
|
||||
<groupId>org.apache.avro</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>re2j</artifactId> |
|
||||
<groupId>com.google.re2j</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-auth</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jsch</artifactId> |
|
||||
<groupId>com.jcraft</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>curator-client</artifactId> |
|
||||
<groupId>org.apache.curator</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>curator-recipes</artifactId> |
|
||||
<groupId>org.apache.curator</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>zookeeper</artifactId> |
|
||||
<groupId>org.apache.zookeeper</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>netty-handler</artifactId> |
|
||||
<groupId>io.netty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>netty-transport-native-epoll</artifactId> |
|
||||
<groupId>io.netty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>metrics-core</artifactId> |
|
||||
<groupId>io.dropwizard.metrics</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-compress</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>bcprov-jdk15on</artifactId> |
|
||||
<groupId>org.bouncycastle</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>kerb-core</artifactId> |
|
||||
<groupId>org.apache.kerby</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jackson-databind</artifactId> |
|
||||
<groupId>com.fasterxml.jackson.core</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>stax2-api</artifactId> |
|
||||
<groupId>org.codehaus.woodstox</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>woodstox-core</artifactId> |
|
||||
<groupId>com.fasterxml.woodstox</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>dnsjava</artifactId> |
|
||||
<groupId>dnsjava</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>snappy-java</artifactId> |
|
||||
<groupId>org.xerial.snappy</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-annotations</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
</exclusions> |
|
||||
</dependency> |
|
||||
<dependency> |
|
||||
<groupId>junit</groupId> |
|
||||
<artifactId>junit</artifactId> |
|
||||
<version>4.13.1</version> |
|
||||
<scope>test</scope> |
|
||||
<exclusions> |
|
||||
<exclusion> |
|
||||
<artifactId>hamcrest-core</artifactId> |
|
||||
<groupId>org.hamcrest</groupId> |
|
||||
</exclusion> |
|
||||
</exclusions> |
|
||||
</dependency> |
|
||||
<dependency> |
|
||||
<groupId>org.mockito</groupId> |
|
||||
<artifactId>mockito-core</artifactId> |
|
||||
<version>3.12.4</version> |
|
||||
<scope>test</scope> |
|
||||
<exclusions> |
|
||||
<exclusion> |
|
||||
<artifactId>byte-buddy</artifactId> |
|
||||
<groupId>net.bytebuddy</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>byte-buddy-agent</artifactId> |
|
||||
<groupId>net.bytebuddy</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>objenesis</artifactId> |
|
||||
<groupId>org.objenesis</groupId> |
|
||||
</exclusion> |
|
||||
</exclusions> |
|
||||
</dependency> |
|
||||
<dependency> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
<artifactId>hadoop-common</artifactId> |
|
||||
<version>3.4.0</version> |
|
||||
<type>test-jar</type> |
|
||||
<scope>test</scope> |
|
||||
<exclusions> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-shaded-protobuf_3_21</artifactId> |
|
||||
<groupId>org.apache.hadoop.thirdparty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-shaded-guava</artifactId> |
|
||||
<groupId>org.apache.hadoop.thirdparty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-cli</artifactId> |
|
||||
<groupId>commons-cli</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-math3</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-io</artifactId> |
|
||||
<groupId>commons-io</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-net</artifactId> |
|
||||
<groupId>commons-net</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-collections</artifactId> |
|
||||
<groupId>commons-collections</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>javax.servlet-api</artifactId> |
|
||||
<groupId>javax.servlet</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jakarta.activation-api</artifactId> |
|
||||
<groupId>jakarta.activation</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jetty-server</artifactId> |
|
||||
<groupId>org.eclipse.jetty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jetty-util</artifactId> |
|
||||
<groupId>org.eclipse.jetty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jetty-servlet</artifactId> |
|
||||
<groupId>org.eclipse.jetty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jetty-webapp</artifactId> |
|
||||
<groupId>org.eclipse.jetty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jsp-api</artifactId> |
|
||||
<groupId>javax.servlet.jsp</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jersey-core</artifactId> |
|
||||
<groupId>com.sun.jersey</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jersey-servlet</artifactId> |
|
||||
<groupId>com.sun.jersey</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jersey-json</artifactId> |
|
||||
<groupId>com.github.pjfanning</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jettison</artifactId> |
|
||||
<groupId>org.codehaus.jettison</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jersey-server</artifactId> |
|
||||
<groupId>com.sun.jersey</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>reload4j</artifactId> |
|
||||
<groupId>ch.qos.reload4j</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-beanutils</artifactId> |
|
||||
<groupId>commons-beanutils</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-configuration2</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-lang3</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-text</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>slf4j-reload4j</artifactId> |
|
||||
<groupId>org.slf4j</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>avro</artifactId> |
|
||||
<groupId>org.apache.avro</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>re2j</artifactId> |
|
||||
<groupId>com.google.re2j</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-auth</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jsch</artifactId> |
|
||||
<groupId>com.jcraft</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>curator-client</artifactId> |
|
||||
<groupId>org.apache.curator</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>curator-recipes</artifactId> |
|
||||
<groupId>org.apache.curator</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>zookeeper</artifactId> |
|
||||
<groupId>org.apache.zookeeper</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>netty-handler</artifactId> |
|
||||
<groupId>io.netty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>netty-transport-native-epoll</artifactId> |
|
||||
<groupId>io.netty</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>metrics-core</artifactId> |
|
||||
<groupId>io.dropwizard.metrics</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>commons-compress</artifactId> |
|
||||
<groupId>org.apache.commons</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>bcprov-jdk15on</artifactId> |
|
||||
<groupId>org.bouncycastle</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>kerb-core</artifactId> |
|
||||
<groupId>org.apache.kerby</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>jackson-databind</artifactId> |
|
||||
<groupId>com.fasterxml.jackson.core</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>stax2-api</artifactId> |
|
||||
<groupId>org.codehaus.woodstox</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>woodstox-core</artifactId> |
|
||||
<groupId>com.fasterxml.woodstox</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>dnsjava</artifactId> |
|
||||
<groupId>dnsjava</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>snappy-java</artifactId> |
|
||||
<groupId>org.xerial.snappy</groupId> |
|
||||
</exclusion> |
|
||||
<exclusion> |
|
||||
<artifactId>hadoop-annotations</artifactId> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
</exclusion> |
|
||||
</exclusions> |
|
||||
</dependency> |
|
||||
</dependencies> |
|
||||
<properties> |
|
||||
<seaweedfs.client.version>3.80</seaweedfs.client.version> |
|
||||
<hadoop.version>3.4.0</hadoop.version> |
|
||||
</properties> |
|
||||
</project> |
|
||||
@@ -1,195 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?> |
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" |
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|
||||
<modelVersion>4.0.0</modelVersion> |
|
||||
|
|
||||
<properties> |
|
||||
<seaweedfs.client.version>3.80.1-SNAPSHOT</seaweedfs.client.version> |
|
||||
<hadoop.version>3.4.0</hadoop.version> |
|
||||
</properties> |
|
||||
|
|
||||
<groupId>com.seaweedfs</groupId> |
|
||||
<artifactId>seaweedfs-hadoop2-client</artifactId> |
|
||||
<version>${seaweedfs.client.version}</version> |
|
||||
|
|
||||
<name>SeaweedFS HDFS2 Client</name> |
|
||||
<description>A java client for SeaweedFS.</description> |
|
||||
<url>https://github.com/seaweedfs/seaweedfs</url> |
|
||||
<licenses> |
|
||||
<license> |
|
||||
<name>The Apache License, Version 2.0</name> |
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> |
|
||||
</license> |
|
||||
</licenses> |
|
||||
<developers> |
|
||||
<developer> |
|
||||
<name>Chris Lu</name> |
|
||||
<email>chris.lu@gmail.com</email> |
|
||||
<organization>SeaweedFS</organization> |
|
||||
<organizationUrl>https://seaweedfs.com</organizationUrl> |
|
||||
</developer> |
|
||||
</developers> |
|
||||
<scm> |
|
||||
<connection>scm:git:git://github.com/seaweedfs/seaweedfs.git</connection> |
|
||||
<developerConnection>scm:git:ssh://github.com:seaweedfs/seaweedfs.git</developerConnection> |
|
||||
<url>https://github.com/seaweedfs/seaweedfs/tree/master</url> |
|
||||
</scm> |
|
||||
|
|
||||
<build> |
|
||||
<plugins> |
|
||||
<plugin> |
|
||||
<groupId>org.apache.maven.plugins</groupId> |
|
||||
<artifactId>maven-compiler-plugin</artifactId> |
|
||||
<configuration> |
|
||||
<source>8</source> |
|
||||
<target>8</target> |
|
||||
<release>8</release> |
|
||||
</configuration> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<groupId>org.apache.maven.plugins</groupId> |
|
||||
<artifactId>maven-shade-plugin</artifactId> |
|
||||
<version>3.2.1</version> |
|
||||
<executions> |
|
||||
<execution> |
|
||||
<phase>package</phase> |
|
||||
<goals> |
|
||||
<goal>shade</goal> |
|
||||
</goals> |
|
||||
<configuration> |
|
||||
<filters> |
|
||||
<filter> |
|
||||
<artifact>*:*</artifact> |
|
||||
<excludes> |
|
||||
<exclude>META-INF/*.SF</exclude> |
|
||||
<exclude>META-INF/*.DSA</exclude> |
|
||||
<exclude>META-INF/*.RSA</exclude> |
|
||||
<exclude>org/slf4j/**</exclude> |
|
||||
<exclude>META-INF/maven/org.slf4j/**</exclude> |
|
||||
</excludes> |
|
||||
</filter> |
|
||||
</filters> |
|
||||
<transformers> |
|
||||
<transformer |
|
||||
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> |
|
||||
</transformers> |
|
||||
<relocations> |
|
||||
<relocation> |
|
||||
<pattern>com.google</pattern> |
|
||||
<shadedPattern>shaded.com.google</shadedPattern> |
|
||||
</relocation> |
|
||||
<relocation> |
|
||||
<pattern>io.grpc.internal</pattern> |
|
||||
<shadedPattern>shaded.io.grpc.internal</shadedPattern> |
|
||||
</relocation> |
|
||||
<relocation> |
|
||||
<pattern>org.apache.commons</pattern> |
|
||||
<shadedPattern>shaded.org.apache.commons</shadedPattern> |
|
||||
<excludes> |
|
||||
<exclude>org.apache.hadoop</exclude> |
|
||||
<exclude>org.apache.log4j</exclude> |
|
||||
</excludes> |
|
||||
</relocation> |
|
||||
<relocation> |
|
||||
<pattern>org.apache.http</pattern> |
|
||||
<shadedPattern>shaded.org.apache.http</shadedPattern> |
|
||||
</relocation> |
|
||||
</relocations> |
|
||||
</configuration> |
|
||||
</execution> |
|
||||
</executions> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<groupId>org.apache.maven.plugins</groupId> |
|
||||
<artifactId>maven-gpg-plugin</artifactId> |
|
||||
<version>1.5</version> |
|
||||
<executions> |
|
||||
<execution> |
|
||||
<id>sign-artifacts</id> |
|
||||
<phase>verify</phase> |
|
||||
<goals> |
|
||||
<goal>sign</goal> |
|
||||
</goals> |
|
||||
</execution> |
|
||||
</executions> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<groupId>org.sonatype.central</groupId> |
|
||||
<artifactId>central-publishing-maven-plugin</artifactId> |
|
||||
<version>0.5.0</version> |
|
||||
<extensions>true</extensions> |
|
||||
<configuration> |
|
||||
<publishingServerId>central</publishingServerId> |
|
||||
<autoPublish>true</autoPublish> |
|
||||
</configuration> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<groupId>org.apache.maven.plugins</groupId> |
|
||||
<artifactId>maven-source-plugin</artifactId> |
|
||||
<version>2.2.1</version> |
|
||||
<executions> |
|
||||
<execution> |
|
||||
<id>attach-sources</id> |
|
||||
<goals> |
|
||||
<goal>jar-no-fork</goal> |
|
||||
</goals> |
|
||||
</execution> |
|
||||
</executions> |
|
||||
</plugin> |
|
||||
<plugin> |
|
||||
<groupId>org.apache.maven.plugins</groupId> |
|
||||
<artifactId>maven-javadoc-plugin</artifactId> |
|
||||
<version>3.0.1</version> |
|
||||
<executions> |
|
||||
<execution> |
|
||||
<id>attach-javadocs</id> |
|
||||
<goals> |
|
||||
<goal>jar</goal> |
|
||||
</goals> |
|
||||
</execution> |
|
||||
</executions> |
|
||||
</plugin> |
|
||||
</plugins> |
|
||||
</build> |
|
||||
|
|
||||
<dependencies> |
|
||||
<dependency> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
<artifactId>hadoop-client</artifactId> |
|
||||
<version>${hadoop.version}</version> |
|
||||
<scope>provided</scope> |
|
||||
</dependency> |
|
||||
<dependency> |
|
||||
<groupId>com.seaweedfs</groupId> |
|
||||
<artifactId>seaweedfs-client</artifactId> |
|
||||
<version>${seaweedfs.client.version}</version> |
|
||||
</dependency> |
|
||||
<dependency> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
<artifactId>hadoop-common</artifactId> |
|
||||
<version>${hadoop.version}</version> |
|
||||
<scope>provided</scope> |
|
||||
</dependency> |
|
||||
<dependency> |
|
||||
<groupId>junit</groupId> |
|
||||
<artifactId>junit</artifactId> |
|
||||
<version>4.13.1</version> |
|
||||
<scope>test</scope> |
|
||||
</dependency> |
|
||||
<dependency> |
|
||||
<groupId>org.mockito</groupId> |
|
||||
<artifactId>mockito-core</artifactId> |
|
||||
<version>3.12.4</version> |
|
||||
<scope>test</scope> |
|
||||
</dependency> |
|
||||
<dependency> |
|
||||
<groupId>org.apache.hadoop</groupId> |
|
||||
<artifactId>hadoop-common</artifactId> |
|
||||
<version>${hadoop.version}</version> |
|
||||
<scope>test</scope> |
|
||||
<type>test-jar</type> |
|
||||
</dependency> |
|
||||
</dependencies> |
|
||||
|
|
||||
</project> |
|
||||
@@ -1,25 +0,0 @@
package seaweed.hdfs;

import org.apache.hadoop.fs.*;

import java.io.IOException;
import java.nio.ByteBuffer;

public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {

    public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
        super(in, size);
        if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
            throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
        }
    }

    @Override
    public int read(ByteBuffer buf) throws IOException {
        if (this.in instanceof ByteBufferReadable) {
            return ((ByteBufferReadable) this.in).read(buf);
        } else {
            throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
        }
    }
}

@@ -1,35 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package seaweed.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class SeaweedAbstractFileSystem extends DelegateToFileSystem {

    SeaweedAbstractFileSystem(final URI uri, final Configuration conf)
            throws IOException, URISyntaxException {
        super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false);
    }

}
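
SeaweedAbstractFileSystem is the FileContext-side binding that delegates to SeaweedFileSystem. A minimal sketch of wiring it up follows, assuming the standard Hadoop `fs.AbstractFileSystem.<scheme>.impl` convention applies to the `seaweedfs` scheme; the class name and host/port values are illustrative only.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class SeaweedFileContextSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed keys: the scheme-based AbstractFileSystem binding plus the filer
        // settings documented in the README above; adjust for your deployment.
        conf.set("fs.AbstractFileSystem.seaweedfs.impl", "seaweed.hdfs.SeaweedAbstractFileSystem");
        conf.set("fs.seaweed.filer.host", "localhost");
        conf.setInt("fs.seaweed.filer.port", 8888);

        FileContext fc = FileContext.getFileContext(URI.create("seaweedfs://localhost:8888/"), conf);
        System.out.println(fc.util().exists(new Path("/")));  // sanity check against the root
    }
}
```
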
@@ -1,643 +0,0 @@
package seaweed.hdfs; |
|
||||
|
|
||||
import org.apache.hadoop.conf.Configuration; |
|
||||
import org.apache.hadoop.fs.*; |
|
||||
import org.apache.hadoop.fs.permission.AclEntry; |
|
||||
import org.apache.hadoop.fs.permission.AclStatus; |
|
||||
import org.apache.hadoop.fs.permission.FsPermission; |
|
||||
import org.apache.hadoop.security.UserGroupInformation; |
|
||||
import org.apache.hadoop.util.Progressable; |
|
||||
import org.slf4j.Logger; |
|
||||
import org.slf4j.LoggerFactory; |
|
||||
import seaweedfs.client.FilerProto; |
|
||||
|
|
||||
import java.io.FileNotFoundException; |
|
||||
import java.io.IOException; |
|
||||
import java.io.OutputStream; |
|
||||
import java.net.URI; |
|
||||
import java.util.EnumSet; |
|
||||
import java.util.List; |
|
||||
import java.util.Map; |
|
||||
|
|
||||
public class SeaweedFileSystem extends FileSystem { |
|
||||
|
|
||||
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; |
|
||||
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; |
|
||||
public static final String FS_SEAWEED_FILER_PORT_GRPC = "fs.seaweed.filer.port.grpc"; |
|
||||
public static final int FS_SEAWEED_DEFAULT_PORT = 8888; |
|
||||
public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; |
|
||||
public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication"; |
|
||||
public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access"; |
|
||||
public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; |
|
||||
public static final String FS_SEAWEED_FILER_CN = "fs.seaweed.filer.cn"; |
|
||||
|
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); |
|
||||
|
|
||||
private URI uri; |
|
||||
private Path workingDirectory = new Path("/"); |
|
||||
private SeaweedFileSystemStore seaweedFileSystemStore; |
|
||||
|
|
||||
public URI getUri() { |
|
||||
return uri; |
|
||||
} |
|
||||
|
|
||||
public String getScheme() { |
|
||||
return "seaweedfs"; |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public void initialize(URI uri, Configuration conf) throws IOException { // get |
|
||||
super.initialize(uri, conf); |
|
||||
|
|
||||
// get host information from uri (overrides info in conf) |
|
||||
String host = uri.getHost(); |
|
||||
host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host; |
|
||||
conf.set(FS_SEAWEED_FILER_HOST, host); |
|
||||
|
|
||||
// get port information from uri, (overrides info in conf) |
|
||||
int port = uri.getPort(); |
|
||||
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; |
|
||||
conf.setInt(FS_SEAWEED_FILER_PORT, port); |
|
||||
|
|
||||
int grpcPort = conf.getInt(FS_SEAWEED_FILER_PORT_GRPC, port + 10000); |
|
||||
|
|
||||
setConf(conf); |
|
||||
this.uri = uri; |
|
||||
|
|
||||
String cn = conf.get(FS_SEAWEED_FILER_CN, ""); |
|
||||
|
|
||||
seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, grpcPort, cn, conf); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public void close() throws IOException { |
|
||||
super.close(); |
|
||||
this.seaweedFileSystemStore.close(); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public FSDataInputStream open(Path path, int bufferSize) throws IOException { |
|
||||
|
|
||||
LOG.debug("open path: {} bufferSize:{}", path, bufferSize); |
|
||||
|
|
||||
path = qualify(path); |
|
||||
|
|
||||
try { |
|
||||
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); |
|
||||
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); |
|
||||
return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize)); |
|
||||
} catch (Exception ex) { |
|
||||
LOG.error("Failed to open file: {} bufferSize:{}", path, bufferSize, ex); |
|
||||
throw new IOException("Failed to open file: " + path, ex); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize, |
|
||||
final short replication, final long blockSize, final Progressable progress) throws IOException { |
|
||||
|
|
||||
LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize); |
|
||||
|
|
||||
path = qualify(path); |
|
||||
|
|
||||
try { |
|
||||
// Priority: 1) non-empty FS_SEAWEED_REPLICATION, 2) empty string -> filer |
|
||||
// default, 3) null -> HDFS replication |
|
||||
String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION); |
|
||||
if (replicaPlacement == null) { |
|
||||
// Not configured, use HDFS replication parameter. This creates a "00N" |
|
||||
// replication string, |
|
||||
// placing N (replication-1) extra replicas on different servers in the same |
|
||||
// rack. |
|
||||
replicaPlacement = String.format("%03d", replication - 1); |
|
||||
} |
|
||||
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); |
|
||||
OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, |
|
||||
seaweedBufferSize, replicaPlacement); |
|
||||
return new FSDataOutputStream(outputStream, statistics); |
|
||||
} catch (Exception ex) { |
|
||||
LOG.error("Failed to create file: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); |
|
||||
throw new IOException("Failed to create file: " + path, ex); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* {@inheritDoc} |
|
||||
* |
|
||||
* @throws FileNotFoundException if the parent directory is not present -or |
|
||||
* is not a directory. |
|
||||
*/ |
|
||||
@Override |
|
||||
public FSDataOutputStream createNonRecursive(Path path, |
|
||||
FsPermission permission, |
|
||||
EnumSet<CreateFlag> flags, |
|
||||
int bufferSize, |
|
||||
short replication, |
|
||||
long blockSize, |
|
||||
Progressable progress) throws IOException { |
|
||||
Path parent = path.getParent(); |
|
||||
if (parent != null) { |
|
||||
// expect this to raise an exception if there is no parent |
|
||||
if (!getFileStatus(parent).isDirectory()) { |
|
||||
throw new FileAlreadyExistsException("Not a directory: " + parent); |
|
||||
} |
|
||||
} |
|
||||
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); |
|
||||
return create(path, permission, |
|
||||
flags.contains(CreateFlag.OVERWRITE), bufferSize, |
|
||||
replication, seaweedBufferSize, progress); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { |
|
||||
|
|
||||
LOG.debug("append path: {} bufferSize:{}", path, bufferSize); |
|
||||
|
|
||||
path = qualify(path); |
|
||||
try { |
|
||||
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); |
|
||||
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); |
|
||||
return new FSDataOutputStream(outputStream, statistics); |
|
||||
} catch (Exception ex) { |
|
||||
LOG.error("Failed to append to file: {} bufferSize:{}", path, bufferSize, ex); |
|
||||
throw new IOException("Failed to append to file: " + path, ex); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public boolean rename(Path src, Path dst) throws IOException { |
|
||||
|
|
||||
LOG.debug("rename path: {} => {}", src, dst); |
|
||||
|
|
||||
if (src.isRoot()) { |
|
||||
return false; |
|
||||
} |
|
||||
|
|
||||
if (src.equals(dst)) { |
|
||||
return true; |
|
||||
} |
|
||||
FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst); |
|
||||
|
|
||||
Path adjustedDst = dst; |
|
||||
|
|
||||
if (entry != null) { |
|
||||
FileStatus dstFileStatus = getFileStatus(dst); |
|
||||
String sourceFileName = src.getName(); |
|
||||
if (!dstFileStatus.isDirectory()) { |
|
||||
return false; |
|
||||
} |
|
||||
adjustedDst = new Path(dst, sourceFileName); |
|
||||
} |
|
||||
|
|
||||
Path qualifiedSrcPath = qualify(src); |
|
||||
Path qualifiedDstPath = qualify(adjustedDst); |
|
||||
|
|
||||
seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath); |
|
||||
return true; |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public boolean delete(Path path, boolean recursive) throws IOException { |
|
||||
|
|
||||
LOG.debug("delete path: {} recursive:{}", path, recursive); |
|
||||
|
|
||||
path = qualify(path); |
|
||||
|
|
||||
FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); |
|
||||
|
|
||||
if (entry == null) { |
|
||||
return true; |
|
||||
} |
|
||||
|
|
||||
FileStatus fileStatus = getFileStatus(path); |
|
||||
|
|
||||
return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); |
|
||||
|
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public FileStatus[] listStatus(Path path) throws IOException { |
|
||||
|
|
||||
LOG.debug("listStatus path: {}", path); |
|
||||
|
|
||||
path = qualify(path); |
|
||||
|
|
||||
return seaweedFileSystemStore.listEntries(path); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public Path getWorkingDirectory() { |
|
||||
return workingDirectory; |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public void setWorkingDirectory(Path path) { |
|
||||
if (path.isAbsolute()) { |
|
||||
workingDirectory = path; |
|
||||
} else { |
|
||||
workingDirectory = new Path(workingDirectory, path); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { |
|
||||
|
|
||||
LOG.debug("mkdirs path: {}", path); |
|
||||
|
|
||||
path = qualify(path); |
|
||||
|
|
||||
FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); |
|
||||
|
|
||||
if (entry == null) { |
|
||||
|
|
||||
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); |
|
||||
return seaweedFileSystemStore.createDirectory(path, currentUser, |
|
||||
fsPermission == null ? FsPermission.getDirDefault() : fsPermission, |
|
||||
FsPermission.getUMask(getConf())); |
|
||||
|
|
||||
} |
|
||||
|
|
||||
FileStatus fileStatus = getFileStatus(path); |
|
||||
|
|
||||
if (fileStatus.isDirectory()) { |
|
||||
return true; |
|
||||
} else { |
|
||||
throw new FileAlreadyExistsException("Path is a file: " + path); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public FileStatus getFileStatus(Path path) throws IOException { |
|
||||
|
|
||||
LOG.debug("getFileStatus path: {}", path); |
|
||||
|
|
||||
path = qualify(path); |
|
||||
|
|
||||
return seaweedFileSystemStore.getFileStatus(path); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Set owner of a path (i.e. a file or a directory). |
|
||||
* The parameters owner and group cannot both be null. |
|
||||
* |
|
||||
* @param path The path |
|
||||
* @param owner If it is null, the original username remains unchanged. |
|
||||
* @param group If it is null, the original groupname remains unchanged. |
|
||||
*/ |
|
||||
@Override |
|
||||
public void setOwner(Path path, final String owner, final String group) |
|
||||
throws IOException { |
|
||||
LOG.debug("setOwner path: {}", path); |
|
||||
path = qualify(path); |
|
||||
|
|
||||
seaweedFileSystemStore.setOwner(path, owner, group); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Set permission of a path. |
|
||||
* |
|
||||
* @param path The path |
|
||||
* @param permission Access permission |
|
||||
*/ |
|
||||
@Override |
|
||||
public void setPermission(Path path, final FsPermission permission) throws IOException { |
|
||||
LOG.debug("setPermission path: {}", path); |
|
||||
|
|
||||
if (permission == null) { |
|
||||
throw new IllegalArgumentException("The permission can't be null"); |
|
||||
} |
|
||||
|
|
||||
path = qualify(path); |
|
||||
|
|
||||
seaweedFileSystemStore.setPermission(path, permission); |
|
||||
} |
|
||||
|
|
||||
Path qualify(Path path) { |
|
||||
return path.makeQualified(uri, workingDirectory); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Concat existing files together. |
|
||||
* |
|
||||
* @param trg the path to the target destination. |
|
||||
* @param psrcs the paths to the sources to use for the concatenation. |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void concat(final Path trg, final Path[] psrcs) throws IOException { |
|
||||
throw new UnsupportedOperationException("Not implemented by the " + |
|
||||
getClass().getSimpleName() + " FileSystem implementation"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Truncate the file in the indicated path to the indicated size. |
|
||||
* <ul> |
|
||||
* <li>Fails if path is a directory.</li> |
|
||||
* <li>Fails if path does not exist.</li> |
|
||||
* <li>Fails if path is not closed.</li> |
|
||||
* <li>Fails if new size is greater than current size.</li> |
|
||||
* </ul> |
|
||||
* |
|
||||
* @param f The path to the file to be truncated |
|
||||
* @param newLength The size the file is to be truncated to |
|
||||
* @return <code>true</code> if the file has been truncated to the desired |
|
||||
* <code>newLength</code> and is immediately available to be reused for |
|
||||
* write operations such as <code>append</code>, or |
|
||||
* <code>false</code> if a background process of adjusting the length of |
|
||||
* the last block has been started, and clients should wait for it to |
|
||||
* complete before proceeding with further file updates. |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default). |
|
||||
*/ |
|
||||
@Override |
|
||||
public boolean truncate(Path f, long newLength) throws IOException { |
|
||||
throw new UnsupportedOperationException("Not implemented by the " + |
|
||||
getClass().getSimpleName() + " FileSystem implementation"); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public void createSymlink(final Path target, final Path link, |
|
||||
final boolean createParent) throws IOException { |
|
||||
// Supporting filesystems should override this method |
|
||||
throw new UnsupportedOperationException( |
|
||||
"Filesystem does not support symlinks!"); |
|
||||
} |
|
||||
|
|
||||
public boolean supportsSymlinks() { |
|
||||
return false; |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Create a snapshot. |
|
||||
* |
|
||||
* @param path The directory where snapshots will be taken. |
|
||||
* @param snapshotName The name of the snapshot |
|
||||
* @return the snapshot path. |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
*/ |
|
||||
@Override |
|
||||
public Path createSnapshot(Path path, String snapshotName) |
|
||||
throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support createSnapshot"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Rename a snapshot. |
|
||||
* |
|
||||
* @param path The directory path where the snapshot was taken |
|
||||
* @param snapshotOldName Old name of the snapshot |
|
||||
* @param snapshotNewName New name of the snapshot |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void renameSnapshot(Path path, String snapshotOldName, |
|
||||
String snapshotNewName) throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support renameSnapshot"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Delete a snapshot of a directory. |
|
||||
* |
|
||||
* @param path The directory that the to-be-deleted snapshot belongs to |
|
||||
* @param snapshotName The name of the snapshot |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void deleteSnapshot(Path path, String snapshotName) |
|
||||
throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support deleteSnapshot"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Modifies ACL entries of files and directories. This method can add new ACL |
|
||||
* entries or modify the permissions on existing ACL entries. All existing |
|
||||
* ACL entries that are not specified in this call are retained without |
|
||||
* changes. (Modifications are merged into the current ACL.) |
|
||||
* |
|
||||
* @param path Path to modify |
|
||||
* @param aclSpec List<AclEntry> describing modifications |
|
||||
* @throws IOException if an ACL could not be modified |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void modifyAclEntries(Path path, List<AclEntry> aclSpec) |
|
||||
throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support modifyAclEntries"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Removes ACL entries from files and directories. Other ACL entries are |
|
||||
* retained. |
|
||||
* |
|
||||
* @param path Path to modify |
|
||||
* @param aclSpec List describing entries to remove |
|
||||
* @throws IOException if an ACL could not be modified |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void removeAclEntries(Path path, List<AclEntry> aclSpec) |
|
||||
throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support removeAclEntries"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Removes all default ACL entries from files and directories. |
|
||||
* |
|
||||
* @param path Path to modify |
|
||||
* @throws IOException if an ACL could not be modified |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void removeDefaultAcl(Path path) |
|
||||
throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support removeDefaultAcl"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Removes all but the base ACL entries of files and directories. The entries |
|
||||
* for user, group, and others are retained for compatibility with permission |
|
||||
* bits. |
|
||||
* |
|
||||
* @param path Path to modify |
|
||||
* @throws IOException if an ACL could not be removed |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void removeAcl(Path path) |
|
||||
throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support removeAcl"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Fully replaces ACL of files and directories, discarding all existing |
|
||||
* entries. |
|
||||
* |
|
||||
* @param path Path to modify |
|
||||
* @param aclSpec List describing modifications, which must include entries |
|
||||
* for user, group, and others for compatibility with permission |
|
||||
* bits. |
|
||||
* @throws IOException if an ACL could not be modified |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support setAcl"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Gets the ACL of a file or directory. |
|
||||
* |
|
||||
* @param path Path to get |
|
||||
* @return AclStatus describing the ACL of the file or directory |
|
||||
* @throws IOException if an ACL could not be read |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public AclStatus getAclStatus(Path path) throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support getAclStatus"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Set an xattr of a file or directory. |
|
||||
* The name must be prefixed with the namespace followed by ".". For example, |
|
||||
* "user.attr". |
|
||||
* <p> |
|
||||
* Refer to the HDFS extended attributes user documentation for details. |
|
||||
* |
|
||||
* @param path Path to modify |
|
||||
* @param name xattr name. |
|
||||
* @param value xattr value. |
|
||||
* @param flag xattr set flag |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void setXAttr(Path path, String name, byte[] value, |
|
||||
EnumSet<XAttrSetFlag> flag) throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support setXAttr"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Get an xattr name and value for a file or directory. |
|
||||
* The name must be prefixed with the namespace followed by ".". For example, |
|
||||
* "user.attr". |
|
||||
* <p> |
|
||||
* Refer to the HDFS extended attributes user documentation for details. |
|
||||
* |
|
||||
* @param path Path to get extended attribute |
|
||||
* @param name xattr name. |
|
||||
* @return byte[] xattr value. |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public byte[] getXAttr(Path path, String name) throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support getXAttr"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Get all of the xattr name/value pairs for a file or directory. |
|
||||
* Only those xattrs which the logged-in user has permissions to view |
|
||||
* are returned. |
|
||||
* <p> |
|
||||
* Refer to the HDFS extended attributes user documentation for details. |
|
||||
* |
|
||||
* @param path Path to get extended attributes |
|
||||
* @return Map describing the XAttrs of the file or directory |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public Map<String, byte[]> getXAttrs(Path path) throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support getXAttrs"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Get all of the xattrs name/value pairs for a file or directory. |
|
||||
* Only those xattrs which the logged-in user has permissions to view |
|
||||
* are returned. |
|
||||
* <p> |
|
||||
* Refer to the HDFS extended attributes user documentation for details. |
|
||||
* |
|
||||
* @param path Path to get extended attributes |
|
||||
* @param names XAttr names. |
|
||||
* @return Map describing the XAttrs of the file or directory |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public Map<String, byte[]> getXAttrs(Path path, List<String> names) |
|
||||
throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support getXAttrs"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Get all of the xattr names for a file or directory. |
|
||||
* Only those xattr names which the logged-in user has permissions to view |
|
||||
* are returned. |
|
||||
* <p> |
|
||||
* Refer to the HDFS extended attributes user documentation for details. |
|
||||
* |
|
||||
* @param path Path to get extended attributes |
|
||||
* @return List{@literal <String>} of the XAttr names of the file or directory |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public List<String> listXAttrs(Path path) throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support listXAttrs"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Remove an xattr of a file or directory. |
|
||||
* The name must be prefixed with the namespace followed by ".". For example, |
|
||||
* "user.attr". |
|
||||
* <p> |
|
||||
* Refer to the HDFS extended attributes user documentation for details. |
|
||||
* |
|
||||
* @param path Path to remove extended attribute |
|
||||
* @param name xattr name |
|
||||
* @throws IOException IO failure |
|
||||
* @throws UnsupportedOperationException if the operation is unsupported |
|
||||
* (default outcome). |
|
||||
*/ |
|
||||
@Override |
|
||||
public void removeXAttr(Path path, String name) throws IOException { |
|
||||
throw new UnsupportedOperationException(getClass().getSimpleName() |
|
||||
+ " doesn't support removeXAttr"); |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
@ -1,291 +0,0 @@ |
|||||
package seaweed.hdfs; |
|
||||
|
|
||||
import org.apache.hadoop.conf.Configuration; |
|
||||
import org.apache.hadoop.fs.FSInputStream; |
|
||||
import org.apache.hadoop.fs.FileStatus; |
|
||||
import org.apache.hadoop.fs.FileSystem; |
|
||||
import org.apache.hadoop.fs.Path; |
|
||||
import org.apache.hadoop.fs.permission.FsPermission; |
|
||||
import org.apache.hadoop.security.UserGroupInformation; |
|
||||
import org.slf4j.Logger; |
|
||||
import org.slf4j.LoggerFactory; |
|
||||
import seaweedfs.client.*; |
|
||||
|
|
||||
import java.io.FileNotFoundException; |
|
||||
import java.io.IOException; |
|
||||
import java.io.OutputStream; |
|
||||
import java.util.ArrayList; |
|
||||
import java.util.Arrays; |
|
||||
import java.util.List; |
|
||||
|
|
||||
import static seaweed.hdfs.SeaweedFileSystem.*; |
|
||||
|
|
||||
public class SeaweedFileSystemStore { |
|
||||
|
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); |
|
||||
|
|
||||
private FilerClient filerClient; |
|
||||
private Configuration conf; |
|
||||
|
|
||||
public SeaweedFileSystemStore(String host, int port, int grpcPort, String cn, Configuration conf) { |
|
||||
filerClient = new FilerClient(host, port, grpcPort, cn); |
|
||||
this.conf = conf; |
|
||||
String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); |
|
||||
if (volumeServerAccessMode.equals("publicUrl")) { |
|
||||
filerClient.setAccessVolumeServerByPublicUrl(); |
|
||||
} else if (volumeServerAccessMode.equals("filerProxy")) { |
|
||||
filerClient.setAccessVolumeServerByFilerProxy(); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
public void close() { |
|
||||
try { |
|
||||
this.filerClient.shutdown(); |
|
||||
} catch (InterruptedException e) { |
|
||||
e.printStackTrace(); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
public static String getParentDirectory(Path path) { |
|
||||
return path.isRoot() ? "/" : path.getParent().toUri().getPath(); |
|
||||
} |
|
||||
|
|
||||
static int permissionToMode(FsPermission permission, boolean isDirectory) { |
|
||||
int p = permission.toShort(); |
|
||||
if (isDirectory) { |
|
||||
p = p | 1 << 31; |
|
||||
} |
|
||||
return p; |
|
||||
} |
|
||||
|
|
||||
public boolean createDirectory(final Path path, UserGroupInformation currentUser, |
|
||||
final FsPermission permission, final FsPermission umask) { |
|
||||
|
|
||||
LOG.debug("createDirectory path: {} permission: {} umask: {}", |
|
||||
path, |
|
||||
permission, |
|
||||
umask); |
|
||||
|
|
||||
return filerClient.mkdirs( |
|
||||
path.toUri().getPath(), |
|
||||
permissionToMode(permission, true), |
|
||||
currentUser.getUserName(), |
|
||||
currentUser.getGroupNames() |
|
||||
); |
|
||||
} |
|
||||
|
|
||||
public FileStatus[] listEntries(final Path path) throws IOException { |
|
||||
LOG.debug("listEntries path: {}", path); |
|
||||
|
|
||||
FileStatus pathStatus = getFileStatus(path); |
|
||||
|
|
||||
if (pathStatus == null) { |
|
||||
return new FileStatus[0]; |
|
||||
} |
|
||||
|
|
||||
if (!pathStatus.isDirectory()) { |
|
||||
return new FileStatus[]{pathStatus}; |
|
||||
} |
|
||||
|
|
||||
List<FileStatus> fileStatuses = new ArrayList<FileStatus>(); |
|
||||
|
|
||||
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath()); |
|
||||
|
|
||||
for (FilerProto.Entry entry : entries) { |
|
||||
|
|
||||
FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry); |
|
||||
|
|
||||
fileStatuses.add(fileStatus); |
|
||||
} |
|
||||
LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size()); |
|
||||
return fileStatuses.toArray(new FileStatus[0]); |
|
||||
|
|
||||
} |
|
||||
|
|
||||
public FileStatus getFileStatus(final Path path) throws IOException { |
|
||||
|
|
||||
FilerProto.Entry entry = lookupEntry(path); |
|
||||
if (entry == null) { |
|
||||
throw new FileNotFoundException("File does not exist: " + path); |
|
||||
} |
|
||||
LOG.debug("doGetFileStatus path:{} entry:{}", path, entry); |
|
||||
|
|
||||
FileStatus fileStatus = doGetFileStatus(path, entry); |
|
||||
return fileStatus; |
|
||||
} |
|
||||
|
|
||||
public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) { |
|
||||
LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}", |
|
||||
path, |
|
||||
String.valueOf(isDirectory), |
|
||||
String.valueOf(recursive)); |
|
||||
|
|
||||
if (path.isRoot()) { |
|
||||
return true; |
|
||||
} |
|
||||
|
|
||||
if (recursive && isDirectory) { |
|
||||
List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath()); |
|
||||
for (FilerProto.Entry entry : entries) { |
|
||||
deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive, true); |
|
||||
} |
|
||||
|
|
||||
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { |
|
||||
FilerProto.FuseAttributes attributes = entry.getAttributes(); |
|
||||
long length = SeaweedRead.fileSize(entry); |
|
||||
boolean isDir = entry.getIsDirectory(); |
|
||||
int block_replication = 1; |
|
||||
int blocksize = this.conf.getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); |
|
||||
long modification_time = attributes.getMtime() * 1000; // milliseconds |
|
||||
long access_time = 0; |
|
||||
FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode()); |
|
||||
String owner = attributes.getUserName(); |
|
||||
String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : ""; |
|
||||
return new FileStatus(length, isDir, block_replication, blocksize, |
|
||||
modification_time, access_time, permission, owner, group, null, path); |
|
||||
} |
|
||||
|
|
||||
public FilerProto.Entry lookupEntry(Path path) { |
|
||||
|
|
||||
return filerClient.lookupEntry(getParentDirectory(path), path.getName()); |
|
||||
|
|
||||
} |
|
||||
|
|
||||
public void rename(Path source, Path destination) { |
|
||||
|
|
||||
LOG.debug("rename source: {} destination:{}", source, destination); |
|
||||
|
|
||||
if (source.isRoot()) { |
|
||||
return; |
|
||||
} |
|
||||
LOG.info("rename source: {} destination:{}", source, destination); |
|
||||
FilerProto.Entry entry = lookupEntry(source); |
|
||||
if (entry == null) { |
|
||||
LOG.warn("rename non-existing source: {}", source); |
|
||||
return; |
|
||||
} |
|
||||
filerClient.mv(source.toUri().getPath(), destination.toUri().getPath()); |
|
||||
} |
|
||||
|
|
||||
public OutputStream createFile(final Path path, |
|
||||
final boolean overwrite, |
|
||||
FsPermission permission, |
|
||||
int bufferSize, |
|
||||
String replication) throws IOException { |
|
||||
|
|
||||
permission = permission == null ? FsPermission.getFileDefault() : permission; |
|
||||
|
|
||||
LOG.debug("createFile path: {} overwrite: {} permission: {}", |
|
||||
path, |
|
||||
overwrite, |
|
||||
permission.toString()); |
|
||||
|
|
||||
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); |
|
||||
long now = System.currentTimeMillis() / 1000L; |
|
||||
|
|
||||
FilerProto.Entry.Builder entry = null; |
|
||||
long writePosition = 0; |
|
||||
if (!overwrite) { |
|
||||
FilerProto.Entry existingEntry = lookupEntry(path); |
|
||||
LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry); |
|
||||
if (existingEntry != null) { |
|
||||
entry = FilerProto.Entry.newBuilder(); |
|
||||
entry.mergeFrom(existingEntry); |
|
||||
entry.clearContent(); |
|
||||
entry.getAttributesBuilder().setMtime(now); |
|
||||
LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); |
|
||||
writePosition = SeaweedRead.fileSize(existingEntry); |
|
||||
} |
|
||||
} |
|
||||
if (entry == null) { |
|
||||
entry = FilerProto.Entry.newBuilder() |
|
||||
.setName(path.getName()) |
|
||||
.setIsDirectory(false) |
|
||||
.setAttributes(FilerProto.FuseAttributes.newBuilder() |
|
||||
.setFileMode(permissionToMode(permission, false)) |
|
||||
.setCrtime(now) |
|
||||
.setMtime(now) |
|
||||
.setUserName(userGroupInformation.getUserName()) |
|
||||
.clearGroupName() |
|
||||
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) |
|
||||
); |
|
||||
SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); |
|
||||
} |
|
||||
|
|
||||
return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication); |
|
||||
|
|
||||
} |
|
||||
|
|
||||
public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException { |
|
||||
|
|
||||
LOG.debug("openFileForRead path:{}", path); |
|
||||
|
|
||||
FilerProto.Entry entry = lookupEntry(path); |
|
||||
|
|
||||
if (entry == null) { |
|
||||
throw new FileNotFoundException("read non-exist file " + path); |
|
||||
} |
|
||||
|
|
||||
return new SeaweedHadoopInputStream(filerClient, |
|
||||
statistics, |
|
||||
path.toUri().getPath(), |
|
||||
entry); |
|
||||
} |
|
||||
|
|
||||
public void setOwner(Path path, String owner, String group) { |
|
||||
|
|
||||
LOG.debug("setOwner path:{} owner:{} group:{}", path, owner, group); |
|
||||
|
|
||||
FilerProto.Entry entry = lookupEntry(path); |
|
||||
if (entry == null) { |
|
||||
LOG.debug("setOwner path:{} entry:{}", path, entry); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); |
|
||||
FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder(); |
|
||||
|
|
||||
if (owner != null) { |
|
||||
attributesBuilder.setUserName(owner); |
|
||||
} |
|
||||
if (group != null) { |
|
||||
attributesBuilder.clearGroupName(); |
|
||||
attributesBuilder.addGroupName(group); |
|
||||
} |
|
||||
|
|
||||
entryBuilder.setAttributes(attributesBuilder); |
|
||||
|
|
||||
LOG.debug("setOwner path:{} entry:{}", path, entryBuilder); |
|
||||
|
|
||||
filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); |
|
||||
|
|
||||
} |
|
||||
|
|
||||
public void setPermission(Path path, FsPermission permission) { |
|
||||
|
|
||||
LOG.debug("setPermission path:{} permission:{}", path, permission); |
|
||||
|
|
||||
FilerProto.Entry entry = lookupEntry(path); |
|
||||
if (entry == null) { |
|
||||
LOG.debug("setPermission path:{} entry:{}", path, entry); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); |
|
||||
FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder(); |
|
||||
|
|
||||
attributesBuilder.setFileMode(permissionToMode(permission, entry.getIsDirectory())); |
|
||||
|
|
||||
entryBuilder.setAttributes(attributesBuilder); |
|
||||
|
|
||||
LOG.debug("setPermission path:{} entry:{}", path, entryBuilder); |
|
||||
|
|
||||
filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); |
|
||||
|
|
||||
} |
|
||||
|
|
||||
} |
|
||||
@ -1,150 +0,0 @@ |
|||||
package seaweed.hdfs; |
|
||||
|
|
||||
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream |
|
||||
|
|
||||
import org.apache.hadoop.fs.ByteBufferReadable; |
|
||||
import org.apache.hadoop.fs.FSInputStream; |
|
||||
import org.apache.hadoop.fs.FileSystem.Statistics; |
|
||||
import seaweedfs.client.FilerClient; |
|
||||
import seaweedfs.client.FilerProto; |
|
||||
import seaweedfs.client.SeaweedInputStream; |
|
||||
|
|
||||
import java.io.EOFException; |
|
||||
import java.io.IOException; |
|
||||
import java.nio.ByteBuffer; |
|
||||
|
|
||||
public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable { |
|
||||
|
|
||||
private final SeaweedInputStream seaweedInputStream; |
|
||||
private final Statistics statistics; |
|
||||
|
|
||||
public SeaweedHadoopInputStream( |
|
||||
final FilerClient filerClient, |
|
||||
final Statistics statistics, |
|
||||
final String path, |
|
||||
final FilerProto.Entry entry) throws IOException { |
|
||||
this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry); |
|
||||
this.statistics = statistics; |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public int read() throws IOException { |
|
||||
return seaweedInputStream.read(); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public int read(final byte[] b, final int off, final int len) throws IOException { |
|
||||
return seaweedInputStream.read(b, off, len); |
|
||||
} |
|
||||
|
|
||||
// implement ByteBufferReadable |
|
||||
@Override |
|
||||
public synchronized int read(ByteBuffer buf) throws IOException { |
|
||||
int bytesRead = seaweedInputStream.read(buf); |
|
||||
|
|
||||
if (bytesRead > 0) { |
|
||||
if (statistics != null) { |
|
||||
statistics.incrementBytesRead(bytesRead); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
return bytesRead; |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Seek to given position in stream. |
|
||||
* |
|
||||
* @param n position to seek to |
|
||||
* @throws IOException if there is an error |
|
||||
* @throws EOFException if attempting to seek past end of file |
|
||||
*/ |
|
||||
@Override |
|
||||
public synchronized void seek(long n) throws IOException { |
|
||||
seaweedInputStream.seek(n); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public synchronized long skip(long n) throws IOException { |
|
||||
return seaweedInputStream.skip(n); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Return the size of the remaining available bytes |
|
||||
* if the size is less than or equal to {@link Integer#MAX_VALUE}, |
|
||||
* otherwise, return {@link Integer#MAX_VALUE}. |
|
||||
* <p> |
|
||||
* This is to match the behavior of DFSInputStream.available(), |
|
||||
* which some clients may rely on (HBase write-ahead log reading in |
|
||||
* particular). |
|
||||
*/ |
|
||||
@Override |
|
||||
public synchronized int available() throws IOException { |
|
||||
return seaweedInputStream.available(); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Returns the length of the file that this stream refers to. Note that the length returned is the length |
|
||||
* as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, |
|
||||
* they won't be reflected in the returned length. |
|
||||
* |
|
||||
* @return length of the file. |
|
||||
* @throws IOException if the stream is closed |
|
||||
*/ |
|
||||
public long length() throws IOException { |
|
||||
return seaweedInputStream.length(); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Return the current offset from the start of the file |
|
||||
* |
|
||||
* @throws IOException throws {@link IOException} if there is an error |
|
||||
*/ |
|
||||
@Override |
|
||||
public synchronized long getPos() throws IOException { |
|
||||
return seaweedInputStream.getPos(); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Seeks a different copy of the data. Returns true if |
|
||||
* found a new source, false otherwise. |
|
||||
* |
|
||||
* @throws IOException throws {@link IOException} if there is an error |
|
||||
*/ |
|
||||
@Override |
|
||||
public boolean seekToNewSource(long l) throws IOException { |
|
||||
return false; |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public synchronized void close() throws IOException { |
|
||||
seaweedInputStream.close(); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Not supported by this stream. Throws {@link UnsupportedOperationException} |
|
||||
* |
|
||||
* @param readlimit ignored |
|
||||
*/ |
|
||||
@Override |
|
||||
public synchronized void mark(int readlimit) { |
|
||||
throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Not supported by this stream. Throws {@link UnsupportedOperationException} |
|
||||
*/ |
|
||||
@Override |
|
||||
public synchronized void reset() throws IOException { |
|
||||
throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* Gets whether mark and reset are supported by this stream. Always returns false. |
|
||||
* |
|
||||
* @return always {@code false} |
|
||||
*/ |
|
||||
@Override |
|
||||
public boolean markSupported() { |
|
||||
return false; |
|
||||
} |
|
||||
} |
|
||||
@ -1,16 +0,0 @@ |
|||||
package seaweed.hdfs; |
|
||||
|
|
||||
// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream |
|
||||
|
|
||||
import seaweedfs.client.FilerClient; |
|
||||
import seaweedfs.client.FilerProto; |
|
||||
import seaweedfs.client.SeaweedOutputStream; |
|
||||
|
|
||||
public class SeaweedHadoopOutputStream extends SeaweedOutputStream { |
|
||||
|
|
||||
public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, |
|
||||
final long position, final int bufferSize, final String replication) { |
|
||||
super(filerClient, path, entry, position, bufferSize, replication); |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
@ -1,90 +0,0 @@ |
|||||
package seaweed.hdfs; |
|
||||
|
|
||||
import org.apache.hadoop.conf.Configuration; |
|
||||
import org.apache.hadoop.fs.Path; |
|
||||
import org.junit.Before; |
|
||||
import org.junit.Test; |
|
||||
|
|
||||
import static org.junit.Assert.*; |
|
||||
|
|
||||
/** |
|
||||
* Unit tests for SeaweedFileSystem configuration that don't require a running SeaweedFS instance. |
|
||||
* |
|
||||
* These tests verify basic properties and constants. |
|
||||
*/ |
|
||||
public class SeaweedFileSystemConfigTest { |
|
||||
|
|
||||
private SeaweedFileSystem fs; |
|
||||
private Configuration conf; |
|
||||
|
|
||||
@Before |
|
||||
public void setUp() { |
|
||||
fs = new SeaweedFileSystem(); |
|
||||
conf = new Configuration(); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testScheme() { |
|
||||
assertEquals("seaweedfs", fs.getScheme()); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testConstants() { |
|
||||
// Test that constants are defined correctly |
|
||||
assertEquals("fs.seaweed.filer.host", SeaweedFileSystem.FS_SEAWEED_FILER_HOST); |
|
||||
assertEquals("fs.seaweed.filer.port", SeaweedFileSystem.FS_SEAWEED_FILER_PORT); |
|
||||
assertEquals("fs.seaweed.filer.port.grpc", SeaweedFileSystem.FS_SEAWEED_FILER_PORT_GRPC); |
|
||||
assertEquals(8888, SeaweedFileSystem.FS_SEAWEED_DEFAULT_PORT); |
|
||||
assertEquals("fs.seaweed.buffer.size", SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE); |
|
||||
assertEquals(4 * 1024 * 1024, SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE); |
|
||||
assertEquals("fs.seaweed.replication", SeaweedFileSystem.FS_SEAWEED_REPLICATION); |
|
||||
assertEquals("fs.seaweed.volume.server.access", SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS); |
|
||||
assertEquals("fs.seaweed.filer.cn", SeaweedFileSystem.FS_SEAWEED_FILER_CN); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testWorkingDirectoryPathOperations() { |
|
||||
// Test path operations that don't require initialization |
|
||||
Path testPath = new Path("/test/path"); |
|
||||
assertTrue("Path should be absolute", testPath.isAbsolute()); |
|
||||
assertEquals("/test/path", testPath.toUri().getPath()); |
|
||||
|
|
||||
Path childPath = new Path(testPath, "child"); |
|
||||
assertEquals("/test/path/child", childPath.toUri().getPath()); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testConfigurationProperties() { |
|
||||
// Test that configuration can be set and read |
|
||||
conf.set(SeaweedFileSystem.FS_SEAWEED_FILER_HOST, "testhost"); |
|
||||
assertEquals("testhost", conf.get(SeaweedFileSystem.FS_SEAWEED_FILER_HOST)); |
|
||||
|
|
||||
conf.setInt(SeaweedFileSystem.FS_SEAWEED_FILER_PORT, 9999); |
|
||||
assertEquals(9999, conf.getInt(SeaweedFileSystem.FS_SEAWEED_FILER_PORT, 0)); |
|
||||
|
|
||||
conf.setInt(SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE, 8 * 1024 * 1024); |
|
||||
assertEquals(8 * 1024 * 1024, conf.getInt(SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE, 0)); |
|
||||
|
|
||||
conf.set(SeaweedFileSystem.FS_SEAWEED_REPLICATION, "001"); |
|
||||
assertEquals("001", conf.get(SeaweedFileSystem.FS_SEAWEED_REPLICATION)); |
|
||||
|
|
||||
conf.set(SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS, "publicUrl"); |
|
||||
assertEquals("publicUrl", conf.get(SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS)); |
|
||||
|
|
||||
conf.set(SeaweedFileSystem.FS_SEAWEED_FILER_CN, "test-cn"); |
|
||||
assertEquals("test-cn", conf.get(SeaweedFileSystem.FS_SEAWEED_FILER_CN)); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testDefaultBufferSize() { |
|
||||
// Test default buffer size constant |
|
||||
int expected = 4 * 1024 * 1024; // 4MB |
|
||||
assertEquals(expected, SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testDefaultPort() { |
|
||||
// Test default port constant |
|
||||
assertEquals(8888, SeaweedFileSystem.FS_SEAWEED_DEFAULT_PORT); |
|
||||
} |
|
||||
} |
|
||||
@ -1,379 +0,0 @@ |
|||||
package seaweed.hdfs; |
|
||||
|
|
||||
import org.apache.hadoop.conf.Configuration; |
|
||||
import org.apache.hadoop.fs.FSDataInputStream; |
|
||||
import org.apache.hadoop.fs.FSDataOutputStream; |
|
||||
import org.apache.hadoop.fs.FileStatus; |
|
||||
import org.apache.hadoop.fs.Path; |
|
||||
import org.apache.hadoop.fs.permission.FsPermission; |
|
||||
import org.junit.After; |
|
||||
import org.junit.Before; |
|
||||
import org.junit.Test; |
|
||||
|
|
||||
import java.io.IOException; |
|
||||
import java.net.URI; |
|
||||
|
|
||||
import static org.junit.Assert.*; |
|
||||
|
|
||||
/** |
|
||||
* Unit tests for SeaweedFileSystem. |
|
||||
* |
|
||||
* These tests verify basic FileSystem operations against a SeaweedFS backend. |
|
||||
* Note: These tests require a running SeaweedFS filer instance. |
|
||||
* |
|
||||
* To run tests, ensure SeaweedFS is running with default ports: |
|
||||
* - Filer HTTP: 8888 |
|
||||
* - Filer gRPC: 18888 |
|
||||
* |
|
||||
* Set environment variable SEAWEEDFS_TEST_ENABLED=true to enable these tests. |
|
||||
*/ |
|
||||
public class SeaweedFileSystemTest { |
|
||||
|
|
||||
private SeaweedFileSystem fs; |
|
||||
private Configuration conf; |
|
||||
private static final String TEST_ROOT = "/test-hdfs2"; |
|
||||
private static final boolean TESTS_ENABLED = |
|
||||
"true".equalsIgnoreCase(System.getenv("SEAWEEDFS_TEST_ENABLED")); |
|
||||
|
|
||||
@Before |
|
||||
public void setUp() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
conf = new Configuration(); |
|
||||
conf.set("fs.seaweed.filer.host", "localhost"); |
|
||||
conf.setInt("fs.seaweed.filer.port", 8888); |
|
||||
conf.setInt("fs.seaweed.filer.port.grpc", 18888); |
|
||||
|
|
||||
fs = new SeaweedFileSystem(); |
|
||||
URI uri = new URI("seaweedfs://localhost:8888/"); |
|
||||
fs.initialize(uri, conf); |
|
||||
|
|
||||
// Clean up any existing test directory |
|
||||
Path testPath = new Path(TEST_ROOT); |
|
||||
if (fs.exists(testPath)) { |
|
||||
fs.delete(testPath, true); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@After |
|
||||
public void tearDown() throws Exception { |
|
||||
if (!TESTS_ENABLED || fs == null) { |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
// Clean up test directory |
|
||||
Path testPath = new Path(TEST_ROOT); |
|
||||
if (fs.exists(testPath)) { |
|
||||
fs.delete(testPath, true); |
|
||||
} |
|
||||
|
|
||||
fs.close(); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testInitialization() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
assertNotNull(fs); |
|
||||
assertEquals("seaweedfs", fs.getScheme()); |
|
||||
assertNotNull(fs.getUri()); |
|
||||
assertEquals("/", fs.getWorkingDirectory().toUri().getPath()); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testMkdirs() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path testDir = new Path(TEST_ROOT + "/testdir"); |
|
||||
assertTrue("Failed to create directory", fs.mkdirs(testDir)); |
|
||||
assertTrue("Directory should exist", fs.exists(testDir)); |
|
||||
|
|
||||
FileStatus status = fs.getFileStatus(testDir); |
|
||||
assertTrue("Path should be a directory", status.isDirectory()); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testCreateAndReadFile() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path testFile = new Path(TEST_ROOT + "/testfile.txt"); |
|
||||
String testContent = "Hello, SeaweedFS!"; |
|
||||
|
|
||||
// Create and write to file |
|
||||
FSDataOutputStream out = fs.create(testFile, FsPermission.getDefault(), |
|
||||
false, 4096, (short) 1, 4 * 1024 * 1024, null); |
|
||||
assertNotNull("Output stream should not be null", out); |
|
||||
out.write(testContent.getBytes()); |
|
||||
out.close(); |
|
||||
|
|
||||
// Verify file exists |
|
||||
assertTrue("File should exist", fs.exists(testFile)); |
|
||||
|
|
||||
// Read and verify content |
|
||||
FSDataInputStream in = fs.open(testFile, 4096); |
|
||||
assertNotNull("Input stream should not be null", in); |
|
||||
byte[] buffer = new byte[testContent.length()]; |
|
||||
int bytesRead = in.read(buffer); |
|
||||
in.close(); |
|
||||
|
|
||||
assertEquals("Should read all bytes", testContent.length(), bytesRead); |
|
||||
assertEquals("Content should match", testContent, new String(buffer)); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testFileStatus() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path testFile = new Path(TEST_ROOT + "/statustest.txt"); |
|
||||
String content = "test content"; |
|
||||
|
|
||||
FSDataOutputStream out = fs.create(testFile); |
|
||||
out.write(content.getBytes()); |
|
||||
out.close(); |
|
||||
|
|
||||
FileStatus status = fs.getFileStatus(testFile); |
|
||||
assertNotNull("FileStatus should not be null", status); |
|
||||
assertFalse("Should not be a directory", status.isDirectory()); |
|
||||
assertTrue("Should be a file", status.isFile()); |
|
||||
assertEquals("File length should match", content.length(), status.getLen()); |
|
||||
assertNotNull("Path should not be null", status.getPath()); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testListStatus() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path testDir = new Path(TEST_ROOT + "/listtest"); |
|
||||
fs.mkdirs(testDir); |
|
||||
|
|
||||
// Create multiple files |
|
||||
for (int i = 0; i < 3; i++) { |
|
||||
Path file = new Path(testDir, "file" + i + ".txt"); |
|
||||
FSDataOutputStream out = fs.create(file); |
|
||||
out.write(("content" + i).getBytes()); |
|
||||
out.close(); |
|
||||
} |
|
||||
|
|
||||
FileStatus[] statuses = fs.listStatus(testDir); |
|
||||
assertNotNull("List should not be null", statuses); |
|
||||
assertEquals("Should have 3 files", 3, statuses.length); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testRename() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path srcFile = new Path(TEST_ROOT + "/source.txt"); |
|
||||
Path dstFile = new Path(TEST_ROOT + "/destination.txt"); |
|
||||
String content = "rename test"; |
|
||||
|
|
||||
// Create source file |
|
||||
FSDataOutputStream out = fs.create(srcFile); |
|
||||
out.write(content.getBytes()); |
|
||||
out.close(); |
|
||||
|
|
||||
assertTrue("Source file should exist", fs.exists(srcFile)); |
|
||||
|
|
||||
// Rename |
|
||||
assertTrue("Rename should succeed", fs.rename(srcFile, dstFile)); |
|
||||
|
|
||||
// Verify |
|
||||
assertFalse("Source file should not exist", fs.exists(srcFile)); |
|
||||
assertTrue("Destination file should exist", fs.exists(dstFile)); |
|
||||
|
|
||||
// Verify content preserved |
|
||||
FSDataInputStream in = fs.open(dstFile); |
|
||||
byte[] buffer = new byte[content.length()]; |
|
||||
in.read(buffer); |
|
||||
in.close(); |
|
||||
assertEquals("Content should be preserved", content, new String(buffer)); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testDelete() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path testFile = new Path(TEST_ROOT + "/deletetest.txt"); |
|
||||
|
|
||||
// Create file |
|
||||
FSDataOutputStream out = fs.create(testFile); |
|
||||
out.write("delete me".getBytes()); |
|
||||
out.close(); |
|
||||
|
|
||||
assertTrue("File should exist before delete", fs.exists(testFile)); |
|
||||
|
|
||||
// Delete |
|
||||
assertTrue("Delete should succeed", fs.delete(testFile, false)); |
|
||||
assertFalse("File should not exist after delete", fs.exists(testFile)); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testDeleteDirectory() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path testDir = new Path(TEST_ROOT + "/deletedir"); |
|
||||
Path testFile = new Path(testDir, "file.txt"); |
|
||||
|
|
||||
// Create directory with file |
|
||||
fs.mkdirs(testDir); |
|
||||
FSDataOutputStream out = fs.create(testFile); |
|
||||
out.write("content".getBytes()); |
|
||||
out.close(); |
|
||||
|
|
||||
assertTrue("Directory should exist", fs.exists(testDir)); |
|
||||
assertTrue("File should exist", fs.exists(testFile)); |
|
||||
|
|
||||
// Recursive delete |
|
||||
assertTrue("Recursive delete should succeed", fs.delete(testDir, true)); |
|
||||
assertFalse("Directory should not exist after delete", fs.exists(testDir)); |
|
||||
assertFalse("File should not exist after delete", fs.exists(testFile)); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testAppend() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path testFile = new Path(TEST_ROOT + "/appendtest.txt"); |
|
||||
String initialContent = "initial"; |
|
||||
String appendContent = " appended"; |
|
||||
|
|
||||
// Create initial file |
|
||||
FSDataOutputStream out = fs.create(testFile); |
|
||||
out.write(initialContent.getBytes()); |
|
||||
out.close(); |
|
||||
|
|
||||
// Append |
|
||||
FSDataOutputStream appendOut = fs.append(testFile, 4096, null); |
|
||||
assertNotNull("Append stream should not be null", appendOut); |
|
||||
appendOut.write(appendContent.getBytes()); |
|
||||
appendOut.close(); |
|
||||
|
|
||||
// Verify combined content |
|
||||
FSDataInputStream in = fs.open(testFile); |
|
||||
byte[] buffer = new byte[initialContent.length() + appendContent.length()]; |
|
||||
int bytesRead = in.read(buffer); |
|
||||
in.close(); |
|
||||
|
|
||||
String expected = initialContent + appendContent; |
|
||||
assertEquals("Should read all bytes", expected.length(), bytesRead); |
|
||||
assertEquals("Content should match", expected, new String(buffer)); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testSetWorkingDirectory() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path originalWd = fs.getWorkingDirectory(); |
|
||||
assertEquals("Original working directory should be /", "/", originalWd.toUri().getPath()); |
|
||||
|
|
||||
Path newWd = new Path(TEST_ROOT); |
|
||||
fs.mkdirs(newWd); |
|
||||
fs.setWorkingDirectory(newWd); |
|
||||
|
|
||||
Path currentWd = fs.getWorkingDirectory(); |
|
||||
assertTrue("Working directory should be updated", |
|
||||
currentWd.toUri().getPath().contains(TEST_ROOT)); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testSetPermission() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path testFile = new Path(TEST_ROOT + "/permtest.txt"); |
|
||||
|
|
||||
// Create file |
|
||||
FSDataOutputStream out = fs.create(testFile); |
|
||||
out.write("permission test".getBytes()); |
|
||||
out.close(); |
|
||||
|
|
||||
// Set permission |
|
||||
FsPermission newPerm = new FsPermission((short) 0644); |
|
||||
fs.setPermission(testFile, newPerm); |
|
||||
|
|
||||
FileStatus status = fs.getFileStatus(testFile); |
|
||||
assertNotNull("Permission should not be null", status.getPermission()); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testSetOwner() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path testFile = new Path(TEST_ROOT + "/ownertest.txt"); |
|
||||
|
|
||||
// Create file |
|
||||
FSDataOutputStream out = fs.create(testFile); |
|
||||
out.write("owner test".getBytes()); |
|
||||
out.close(); |
|
||||
|
|
||||
// Set owner - this may not fail even if not fully implemented |
|
||||
fs.setOwner(testFile, "testuser", "testgroup"); |
|
||||
|
|
||||
// Just verify the call doesn't throw an exception |
|
||||
FileStatus status = fs.getFileStatus(testFile); |
|
||||
assertNotNull("FileStatus should not be null", status); |
|
||||
} |
|
||||
|
|
||||
@Test |
|
||||
public void testRenameToExistingDirectory() throws Exception { |
|
||||
if (!TESTS_ENABLED) { |
|
||||
System.out.println("Skipping test - SEAWEEDFS_TEST_ENABLED not set"); |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
Path srcFile = new Path(TEST_ROOT + "/movefile.txt"); |
|
||||
Path dstDir = new Path(TEST_ROOT + "/movedir"); |
|
||||
|
|
||||
// Create source file and destination directory |
|
||||
FSDataOutputStream out = fs.create(srcFile); |
|
||||
out.write("move test".getBytes()); |
|
||||
out.close(); |
|
||||
fs.mkdirs(dstDir); |
|
||||
|
|
||||
// Rename file to existing directory (should move file into directory) |
|
||||
assertTrue("Rename to directory should succeed", fs.rename(srcFile, dstDir)); |
|
||||
|
|
||||
// File should be moved into the directory |
|
||||
Path expectedLocation = new Path(dstDir, srcFile.getName()); |
|
||||
assertTrue("File should exist in destination directory", fs.exists(expectedLocation)); |
|
||||
assertFalse("Source file should not exist", fs.exists(srcFile)); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
@ -1,109 +0,0 @@ |
|||||
package seaweed.hdfs; |
|
||||
|
|
||||
import org.apache.hadoop.fs.Syncable; |
|
||||
import org.slf4j.Logger; |
|
||||
import org.slf4j.LoggerFactory; |
|
||||
import seaweedfs.client.FilerClient; |
|
||||
import seaweedfs.client.FilerProto; |
|
||||
|
|
||||
import java.io.ByteArrayOutputStream; |
|
||||
import java.io.IOException; |
|
||||
|
|
||||
/** |
|
||||
* Atomic output stream for Parquet files. |
|
||||
* |
|
||||
* Buffers all writes in memory and writes atomically on close(). |
|
||||
* This ensures that getPos() always returns accurate positions that match |
|
||||
* the final file layout, which is required for Parquet's footer metadata. |
|
||||
*/ |
|
||||
public class SeaweedAtomicOutputStream extends SeaweedHadoopOutputStream implements Syncable { |
|
||||
|
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SeaweedAtomicOutputStream.class); |
|
||||
|
|
||||
private final ByteArrayOutputStream memoryBuffer; |
|
||||
private final String filePath; |
|
||||
private boolean closed = false; |
|
||||
|
|
||||
public SeaweedAtomicOutputStream(FilerClient filerClient, String path, FilerProto.Entry.Builder entry, |
|
||||
long position, int maxBufferSize, String replication) { |
|
||||
super(filerClient, path, entry, position, maxBufferSize, replication); |
|
||||
this.filePath = path; |
|
||||
this.memoryBuffer = new ByteArrayOutputStream(maxBufferSize); |
|
||||
LOG.info("[ATOMIC] Created atomic output stream for: {} (maxBuffer={})", path, maxBufferSize); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public synchronized void write(int b) throws IOException { |
|
||||
if (closed) { |
|
||||
throw new IOException("Stream is closed"); |
|
||||
} |
|
||||
memoryBuffer.write(b); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public synchronized void write(byte[] b, int off, int len) throws IOException { |
|
||||
if (closed) { |
|
||||
throw new IOException("Stream is closed"); |
|
||||
} |
|
||||
memoryBuffer.write(b, off, len); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public synchronized long getPos() throws IOException { |
|
||||
// Return the current size of the memory buffer |
|
||||
// This is always accurate since nothing is flushed until close() |
|
||||
long pos = memoryBuffer.size(); |
|
||||
|
|
||||
// Log getPos() calls around the problematic positions |
|
||||
if (pos >= 470 && pos <= 476) { |
|
||||
LOG.error("[ATOMIC-GETPOS] getPos() returning pos={}", pos); |
|
||||
} |
|
||||
|
|
||||
return pos; |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public synchronized void flush() throws IOException { |
|
||||
// No-op for atomic writes - everything is flushed on close() |
|
||||
LOG.debug("[ATOMIC] flush() called (no-op for atomic writes)"); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public synchronized void hsync() throws IOException { |
|
||||
// No-op for atomic writes |
|
||||
LOG.debug("[ATOMIC] hsync() called (no-op for atomic writes)"); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public synchronized void hflush() throws IOException { |
|
||||
// No-op for atomic writes |
|
||||
LOG.debug("[ATOMIC] hflush() called (no-op for atomic writes)"); |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public synchronized void close() throws IOException { |
|
||||
if (closed) { |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
try { |
|
||||
byte[] data = memoryBuffer.toByteArray(); |
|
||||
int size = data.length; |
|
||||
|
|
||||
LOG.info("[ATOMIC] Closing atomic stream: {} ({} bytes buffered)", filePath, size); |
|
||||
|
|
||||
if (size > 0) { |
|
||||
// Write all data at once using the parent's write method |
|
||||
super.write(data, 0, size); |
|
||||
} |
|
||||
|
|
||||
// Now close the parent stream which will flush and write metadata |
|
||||
super.close(); |
|
||||
|
|
||||
LOG.info("[ATOMIC] Successfully wrote {} bytes atomically to: {}", size, filePath); |
|
||||
} finally { |
|
||||
closed = true; |
|
||||
memoryBuffer.reset(); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -1,132 +0,0 @@
# Fix Parquet EOF Error by Removing ByteBufferReadable Interface

## Summary

Fixed `EOFException: Reached the end of stream. Still have: 78 bytes left` error when reading Parquet files with complex schemas in Spark.

## Root Cause

`SeaweedHadoopInputStream` declared that it implemented the `ByteBufferReadable` interface but did not implement it properly, causing an incorrect buffering strategy and position tracking issues during positioned reads (which are critical for Parquet).

## Solution

Removed `ByteBufferReadable` from `SeaweedHadoopInputStream` to match Hadoop's `RawLocalFileSystem` pattern, which uses `BufferedFSInputStream` for proper position tracking.

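To make the solution concrete, here is a minimal sketch (not the project's verbatim code) of what the fixed `open()` path looks like: the raw `FSInputStream` returned by `SeaweedFileSystemStore.openFileForRead()` is handed to Hadoop's `BufferedFSInputStream` instead of being exposed as `ByteBufferReadable`. The class and method names in the sketch (`BufferedOpenSketch`, `wrapForHadoop`) are illustrative only; the 4x buffer factor echoes the "Performance Impact" note below.

```java
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;

/** Sketch only: how the fixed open() path wraps the raw SeaweedFS stream. */
final class BufferedOpenSketch {

    private BufferedOpenSketch() {
    }

    // "rawStream" stands in for whatever SeaweedFileSystemStore.openFileForRead() returns.
    static FSDataInputStream wrapForHadoop(FSInputStream rawStream, int bufferSize) {
        // BufferedFSInputStream is the same wrapper RawLocalFileSystem uses: it buffers
        // reads and restores the stream position after positioned reads, so the inner
        // stream no longer needs to advertise ByteBufferReadable.
        return new FSDataInputStream(new BufferedFSInputStream(rawStream, 4 * bufferSize));
    }
}
```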
## Changes

### Core Fix

1. **`SeaweedHadoopInputStream.java`**:
   - Removed `ByteBufferReadable` interface
   - Removed `read(ByteBuffer)` method
   - Cleaned up debug logging
   - Added documentation explaining the design choice

2. **`SeaweedFileSystem.java`**:
   - Changed from `BufferedByteBufferReadableInputStream` to `BufferedFSInputStream`
   - Applies to all streams uniformly
   - Cleaned up debug logging

3. **`SeaweedInputStream.java`**:
   - Cleaned up debug logging

### Cleanup

4. **Deleted debug-only files**:
   - `DebugDualInputStream.java`
   - `DebugDualInputStreamWrapper.java`
   - `DebugDualOutputStream.java`
   - `DebugMode.java`
   - `LocalOnlyInputStream.java`
   - `ShadowComparisonStream.java`

5. **Reverted**:
   - `SeaweedFileSystemStore.java` (removed all debug mode logic)

6. **Cleaned**:
   - `docker-compose.yml` (removed debug environment variables)
   - All `.md` documentation files in `test/java/spark/`

## Testing

All Spark integration tests pass:
- ✅ `SparkSQLTest.testCreateTableAndQuery` (complex 4-column schema)
- ✅ `SimpleOneColumnTest` (basic operations)
- ✅ All other Spark integration tests

## Technical Details

### Why This Works

Hadoop's `RawLocalFileSystem` uses the exact same pattern:
- Does NOT implement `ByteBufferReadable`
- Uses `BufferedFSInputStream` for buffering
- Properly handles positioned reads with automatic position restoration

### Position Tracking

`BufferedFSInputStream` implements positioned reads correctly:
```java
public int read(long position, byte[] buffer, int offset, int length) {
    long oldPos = getPos();
    try {
        seek(position);
        return read(buffer, offset, length);
    } finally {
        seek(oldPos); // Restores position!
    }
}
```

This ensures buffered reads don't permanently change the stream position, which is critical for Parquet's random access pattern.

### Performance Impact

Minimal to none:
- Network latency dominates for remote storage
- Buffering is still active (4x buffer size)
- Extra byte[] copy is negligible compared to network I/O

## Commit Message

```
Fix Parquet EOF error by removing ByteBufferReadable interface

SeaweedHadoopInputStream incorrectly declared ByteBufferReadable interface
without proper implementation, causing position tracking issues during
positioned reads. This resulted in "78 bytes left" EOF errors when reading
Parquet files with complex schemas in Spark.

Solution: Remove ByteBufferReadable and use BufferedFSInputStream (matching
Hadoop's RawLocalFileSystem pattern) which properly handles position
restoration for positioned reads.

Changes:
- Remove ByteBufferReadable interface from SeaweedHadoopInputStream
- Change SeaweedFileSystem to use BufferedFSInputStream for all streams
- Clean up debug logging
- Delete debug-only classes and files

Tested: All Spark integration tests pass
```

## Files Changed

### Modified
- `other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java`
- `other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java`
- `other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java`
- `test/java/spark/docker-compose.yml`

### Reverted
- `other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java`

### Deleted
- `other/java/hdfs3/src/main/java/seaweed/hdfs/DebugDualInputStream.java`
- `other/java/hdfs3/src/main/java/seaweed/hdfs/DebugDualInputStreamWrapper.java`
- `other/java/hdfs3/src/main/java/seaweed/hdfs/DebugDualOutputStream.java`
- `other/java/hdfs3/src/main/java/seaweed/hdfs/DebugMode.java`
- `other/java/hdfs3/src/main/java/seaweed/hdfs/LocalOnlyInputStream.java`
- `other/java/hdfs3/src/main/java/seaweed/hdfs/ShadowComparisonStream.java`
- All `.md` files in `test/java/spark/` (debug documentation)