diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 3293db2ca..3d7da91d5 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -74,7 +74,7 @@ public class FileChunkManifest { byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { LOG.debug("doFetchFullChunkData:{}", chunkView); - chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); + chunkData = SeaweedRead.doFetchFullChunkData(filerGrpcClient, chunkView, locations); } if (chunk.getIsChunkManifest()){ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 1a719f3c0..8a37827f1 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -15,6 +15,10 @@ import java.util.concurrent.TimeUnit; public class FilerGrpcClient { + public final int VOLUME_SERVER_ACCESS_DIRECT = 0; + public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1; + public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; + private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class); static SslContext sslContext; @@ -34,6 +38,8 @@ public class FilerGrpcClient { private boolean cipher = false; private String collection = ""; private String replication = ""; + private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; + private String filerAddress; public FilerGrpcClient(String host, int grpcPort) { this(host, grpcPort, sslContext); @@ -49,6 +55,8 @@ public class FilerGrpcClient { .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); + filerAddress = String.format("%s:%d", host, grpcPort-10000); + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = this.getBlockingStub().getFilerConfiguration( FilerProto.GetFilerConfigurationRequest.newBuilder().build()); @@ -58,7 +66,7 @@ public class FilerGrpcClient { } - public FilerGrpcClient(ManagedChannelBuilder channelBuilder) { + private FilerGrpcClient(ManagedChannelBuilder channelBuilder) { channel = channelBuilder.build(); blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); asyncStub = SeaweedFilerGrpc.newStub(channel); @@ -93,4 +101,26 @@ public class FilerGrpcClient { return futureStub; } + public void setAccessVolumeServerDirectly() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; + } + public boolean isAccessVolumeServerDirectly() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT; + } + public void setAccessVolumeServerByPublicUrl() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL; + } + public boolean isAccessVolumeServerByPublicUrl() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL; + } + public void setAccessVolumeServerByFilerProxy() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY; + } + public boolean isAccessVolumeServerByFilerProxy() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY; + } + public String getFilerAddress() { + return this.filerAddress; + } + } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index c45987bed..3df832d7d 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -71,7 +71,7 @@ public class SeaweedRead { return 0; } - int len = readChunkView(startOffset, buf, chunkView, locations); + int len = readChunkView(filerGrpcClient, startOffset, buf, chunkView, locations); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); @@ -93,12 +93,12 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(FilerGrpcClient filerGrpcClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { - chunkData = doFetchFullChunkData(chunkView, locations); + chunkData = doFetchFullChunkData(filerGrpcClient, chunkView, locations); chunkCache.setChunk(chunkView.fileId, chunkData); } @@ -110,12 +110,18 @@ public class SeaweedRead { return len; } - public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { + public static byte[] doFetchFullChunkData(FilerGrpcClient filerGrpcClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] data = null; IOException lastException = null; for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { for (FilerProto.Location location : locations.getLocationsList()) { + String host = location.getUrl(); + if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) { + host = location.getPublicUrl(); + } else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) { + host = filerGrpcClient.getFilerAddress(); + } String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId); try { data = doFetchOneFullChunkData(chunkView, url); @@ -145,7 +151,7 @@ public class SeaweedRead { } - public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException { + private static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException { HttpGet request = new HttpGet(url); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index b8fd3e299..3cc11e21c 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -51,9 +51,15 @@ public class SeaweedWrite { .setPath(path) .build()); String fileId = response.getFileId(); - String url = response.getUrl(); String auth = response.getAuth(); - String targetUrl = String.format("http://%s/%s", url, fileId); + + String host = response.getUrl(); + if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) { + host = response.getPublicUrl(); + } else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) { + host = filerGrpcClient.getFilerAddress(); + } + String targetUrl = String.format("http://%s/%s", host, fileId); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 84f11e846..6072d3ec8 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -26,6 +26,7 @@ public class SeaweedFileSystem extends FileSystem { public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication"; + public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volumeServerAccess"; public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 223036c13..8147f3efe 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -18,8 +18,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE; -import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE; +import static seaweed.hdfs.SeaweedFileSystem.*; public class SeaweedFileSystemStore { @@ -34,6 +33,13 @@ public class SeaweedFileSystemStore { filerGrpcClient = new FilerGrpcClient(host, grpcPort); filerClient = new FilerClient(filerGrpcClient); this.conf = conf; + String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); + if (volumeServerAccessMode.equals("publicUrl")) { + filerGrpcClient.setAccessVolumeServerByPublicUrl(); + } else if (volumeServerAccessMode.equals("filerProxy")) { + filerGrpcClient.setAccessVolumeServerByFilerProxy(); + } + } public void close() {