hilimd
5 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 822 additions and 191 deletions
-
23other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
-
135other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
-
86other/java/client/src/main/java/seaweedfs/client/FilerClient.java
-
17other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
-
9other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
-
42other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
-
5other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
-
3other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
-
7other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
-
8other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
-
3other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
-
7other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
-
8other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
-
142weed/filer2/filechunk_manifest.go
-
113weed/filer2/filechunk_manifest_test.go
-
32weed/filer2/filechunks.go
-
10weed/filer2/filechunks_test.go
-
17weed/filer2/reader_at.go
-
30weed/filer2/stream.go
-
50weed/filesys/dirty_page.go
-
2weed/filesys/file.go
-
13weed/filesys/filehandle.go
-
23weed/filesys/meta_cache/meta_cache.go
-
66weed/filesys/wfs_write.go
-
2weed/replication/sink/azuresink/azure_sink.go
-
2weed/replication/sink/b2sink/b2_sink.go
-
26weed/replication/sink/filersink/filer_sink.go
-
2weed/replication/sink/gcssink/gcs_sink.go
-
2weed/replication/sink/s3sink/s3_sink.go
-
73weed/server/filer_grpc_server.go
-
5weed/server/filer_server_handlers_write.go
-
28weed/server/filer_server_handlers_write_autochunk.go
-
2weed/server/filer_server_handlers_write_cipher.go
-
2weed/server/webdav_server.go
-
8weed/shell/command_volume_fsck.go
-
10weed/storage/needle/volume_ttl.go
@ -1,22 +1,39 @@ |
|||||
package seaweedfs.client; |
package seaweedfs.client; |
||||
|
|
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
|
||||
import java.nio.ByteBuffer; |
import java.nio.ByteBuffer; |
||||
import java.util.ArrayList; |
import java.util.ArrayList; |
||||
import java.util.List; |
import java.util.List; |
||||
|
|
||||
public class ByteBufferPool { |
public class ByteBufferPool { |
||||
|
|
||||
static List<ByteBuffer> bufferList = new ArrayList<>(); |
|
||||
|
private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024; |
||||
|
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class); |
||||
|
|
||||
|
private static final List<ByteBuffer> bufferList = new ArrayList<>(); |
||||
|
|
||||
public static synchronized ByteBuffer request(int bufferSize) { |
public static synchronized ByteBuffer request(int bufferSize) { |
||||
|
if (bufferSize < MIN_BUFFER_SIZE) { |
||||
|
bufferSize = MIN_BUFFER_SIZE; |
||||
|
} |
||||
if (bufferList.isEmpty()) { |
if (bufferList.isEmpty()) { |
||||
return ByteBuffer.allocate(bufferSize); |
return ByteBuffer.allocate(bufferSize); |
||||
} |
} |
||||
return bufferList.remove(bufferList.size()-1); |
|
||||
|
ByteBuffer buffer = bufferList.remove(bufferList.size() - 1); |
||||
|
if (buffer.capacity() >= bufferSize) { |
||||
|
return buffer; |
||||
|
} |
||||
|
|
||||
|
LOG.info("add new buffer from {} to {}", buffer.capacity(), bufferSize); |
||||
|
bufferList.add(0, buffer); |
||||
|
return ByteBuffer.allocate(bufferSize); |
||||
|
|
||||
} |
} |
||||
|
|
||||
public static synchronized void release(ByteBuffer obj) { |
public static synchronized void release(ByteBuffer obj) { |
||||
bufferList.add(obj); |
|
||||
|
bufferList.add(0, obj); |
||||
} |
} |
||||
|
|
||||
} |
} |
@ -0,0 +1,135 @@ |
|||||
|
package seaweedfs.client; |
||||
|
|
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
|
||||
|
import java.io.IOException; |
||||
|
import java.util.ArrayList; |
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
|
||||
|
public class FileChunkManifest { |
||||
|
|
||||
|
private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class); |
||||
|
|
||||
|
private static final int mergeFactor = 1000; |
||||
|
|
||||
|
public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) { |
||||
|
for (FilerProto.FileChunk chunk : chunks) { |
||||
|
if (chunk.getIsChunkManifest()) { |
||||
|
return true; |
||||
|
} |
||||
|
} |
||||
|
return false; |
||||
|
} |
||||
|
|
||||
|
public static List<FilerProto.FileChunk> resolveChunkManifest( |
||||
|
final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException { |
||||
|
|
||||
|
List<FilerProto.FileChunk> dataChunks = new ArrayList<>(); |
||||
|
|
||||
|
for (FilerProto.FileChunk chunk : chunks) { |
||||
|
if (!chunk.getIsChunkManifest()) { |
||||
|
dataChunks.add(chunk); |
||||
|
continue; |
||||
|
} |
||||
|
|
||||
|
// IsChunkManifest |
||||
|
LOG.debug("fetching chunk manifest:{}", chunk); |
||||
|
byte[] data = fetchChunk(filerGrpcClient, chunk); |
||||
|
FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build(); |
||||
|
List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>(); |
||||
|
for (FilerProto.FileChunk t : m.getChunksList()) { |
||||
|
// avoid deprecated chunk.getFileId() |
||||
|
resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build()); |
||||
|
} |
||||
|
dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks)); |
||||
|
} |
||||
|
|
||||
|
return dataChunks; |
||||
|
} |
||||
|
|
||||
|
private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException { |
||||
|
|
||||
|
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); |
||||
|
String vid = "" + chunk.getFid().getVolumeId(); |
||||
|
lookupRequest.addVolumeIds(vid); |
||||
|
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient |
||||
|
.getBlockingStub().lookupVolume(lookupRequest.build()); |
||||
|
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); |
||||
|
FilerProto.Locations locations = vid2Locations.get(vid); |
||||
|
|
||||
|
SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView( |
||||
|
FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId() |
||||
|
0, |
||||
|
-1, |
||||
|
0, |
||||
|
true, |
||||
|
chunk.getCipherKey().toByteArray(), |
||||
|
chunk.getIsCompressed()); |
||||
|
|
||||
|
byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId); |
||||
|
if (chunkData == null) { |
||||
|
LOG.debug("doFetchFullChunkData:{}", chunkView); |
||||
|
chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); |
||||
|
} |
||||
|
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); |
||||
|
SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); |
||||
|
|
||||
|
return chunkData; |
||||
|
|
||||
|
} |
||||
|
|
||||
|
public static List<FilerProto.FileChunk> maybeManifestize( |
||||
|
final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks) throws IOException { |
||||
|
// the return variable |
||||
|
List<FilerProto.FileChunk> chunks = new ArrayList<>(); |
||||
|
|
||||
|
List<FilerProto.FileChunk> dataChunks = new ArrayList<>(); |
||||
|
for (FilerProto.FileChunk chunk : inputChunks) { |
||||
|
if (!chunk.getIsChunkManifest()) { |
||||
|
dataChunks.add(chunk); |
||||
|
} else { |
||||
|
chunks.add(chunk); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
int remaining = dataChunks.size(); |
||||
|
for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) { |
||||
|
FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor)); |
||||
|
chunks.add(chunk); |
||||
|
remaining -= mergeFactor; |
||||
|
} |
||||
|
|
||||
|
// remaining |
||||
|
for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) { |
||||
|
chunks.add(dataChunks.get(i)); |
||||
|
} |
||||
|
return chunks; |
||||
|
} |
||||
|
|
||||
|
private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks) throws IOException { |
||||
|
// create and serialize the manifest |
||||
|
dataChunks = FilerClient.beforeEntrySerialization(dataChunks); |
||||
|
FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks); |
||||
|
byte[] data = m.build().toByteArray(); |
||||
|
|
||||
|
long minOffset = Long.MAX_VALUE; |
||||
|
long maxOffset = -1; |
||||
|
for (FilerProto.FileChunk chunk : dataChunks) { |
||||
|
minOffset = Math.min(minOffset, chunk.getOffset()); |
||||
|
maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset()); |
||||
|
} |
||||
|
|
||||
|
FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk( |
||||
|
filerGrpcClient.getReplication(), |
||||
|
filerGrpcClient, |
||||
|
minOffset, |
||||
|
data, 0, data.length); |
||||
|
manifestChunk.setIsChunkManifest(true); |
||||
|
manifestChunk.setSize(maxOffset - minOffset); |
||||
|
return manifestChunk.build(); |
||||
|
|
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,142 @@ |
|||||
|
package filer2 |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"fmt" |
||||
|
"io" |
||||
|
"math" |
||||
|
|
||||
|
"github.com/golang/protobuf/proto" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/util" |
||||
|
) |
||||
|
|
||||
|
const ( |
||||
|
ManifestBatch = 1000 |
||||
|
) |
||||
|
|
||||
|
func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { |
||||
|
for _, chunk := range chunks { |
||||
|
if chunk.IsChunkManifest { |
||||
|
return true |
||||
|
} |
||||
|
} |
||||
|
return false |
||||
|
} |
||||
|
|
||||
|
func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manefestResolveErr error) { |
||||
|
// TODO maybe parallel this
|
||||
|
for _, chunk := range chunks { |
||||
|
if !chunk.IsChunkManifest { |
||||
|
dataChunks = append(dataChunks, chunk) |
||||
|
continue |
||||
|
} |
||||
|
|
||||
|
// IsChunkManifest
|
||||
|
data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed) |
||||
|
if err != nil { |
||||
|
return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err) |
||||
|
} |
||||
|
m := &filer_pb.FileChunkManifest{} |
||||
|
if err := proto.Unmarshal(data, m); err != nil { |
||||
|
return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err) |
||||
|
} |
||||
|
manifestChunks = append(manifestChunks, chunk) |
||||
|
// recursive
|
||||
|
filer_pb.AfterEntryDeserialization(m.Chunks) |
||||
|
dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks) |
||||
|
if subErr != nil { |
||||
|
return chunks, nil, subErr |
||||
|
} |
||||
|
dataChunks = append(dataChunks, dchunks...) |
||||
|
manifestChunks = append(manifestChunks, mchunks...) |
||||
|
} |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
// TODO fetch from cache for weed mount?
|
||||
|
func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { |
||||
|
urlString, err := lookupFileIdFn(fileId) |
||||
|
if err != nil { |
||||
|
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) |
||||
|
return nil, err |
||||
|
} |
||||
|
var buffer bytes.Buffer |
||||
|
err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) { |
||||
|
buffer.Write(data) |
||||
|
}) |
||||
|
if err != nil { |
||||
|
glog.V(0).Infof("read %s failed, err: %v", fileId, err) |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
return buffer.Bytes(), nil |
||||
|
} |
||||
|
|
||||
|
func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) { |
||||
|
return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest) |
||||
|
} |
||||
|
|
||||
|
func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) { |
||||
|
|
||||
|
var dataChunks []*filer_pb.FileChunk |
||||
|
for _, chunk := range inputChunks { |
||||
|
if !chunk.IsChunkManifest { |
||||
|
dataChunks = append(dataChunks, chunk) |
||||
|
} else { |
||||
|
chunks = append(chunks, chunk) |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
remaining := len(dataChunks) |
||||
|
for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor { |
||||
|
chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor]) |
||||
|
if err != nil { |
||||
|
return dataChunks, err |
||||
|
} |
||||
|
chunks = append(chunks, chunk) |
||||
|
remaining -= mergeFactor |
||||
|
} |
||||
|
// remaining
|
||||
|
for i := len(dataChunks) - remaining; i < len(dataChunks); i++ { |
||||
|
chunks = append(chunks, dataChunks[i]) |
||||
|
} |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { |
||||
|
|
||||
|
filer_pb.BeforeEntrySerialization(dataChunks) |
||||
|
|
||||
|
// create and serialize the manifest
|
||||
|
data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{ |
||||
|
Chunks: dataChunks, |
||||
|
}) |
||||
|
if serErr != nil { |
||||
|
return nil, fmt.Errorf("serializing manifest: %v", serErr) |
||||
|
} |
||||
|
|
||||
|
minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) |
||||
|
for _, chunk := range dataChunks { |
||||
|
if minOffset > int64(chunk.Offset) { |
||||
|
minOffset = chunk.Offset |
||||
|
} |
||||
|
if maxOffset < int64(chunk.Size)+chunk.Offset { |
||||
|
maxOffset = int64(chunk.Size) + chunk.Offset |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
manifestChunk.IsChunkManifest = true |
||||
|
manifestChunk.Offset = minOffset |
||||
|
manifestChunk.Size = uint64(maxOffset - minOffset) |
||||
|
|
||||
|
return |
||||
|
} |
||||
|
|
||||
|
type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) |
@ -0,0 +1,113 @@ |
|||||
|
package filer2 |
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"math" |
||||
|
"testing" |
||||
|
|
||||
|
"github.com/stretchr/testify/assert" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
) |
||||
|
|
||||
|
func TestDoMaybeManifestize(t *testing.T) { |
||||
|
var manifestTests = []struct { |
||||
|
inputs []*filer_pb.FileChunk |
||||
|
expected []*filer_pb.FileChunk |
||||
|
}{ |
||||
|
{ |
||||
|
inputs: []*filer_pb.FileChunk{ |
||||
|
{FileId: "1", IsChunkManifest: false}, |
||||
|
{FileId: "2", IsChunkManifest: false}, |
||||
|
{FileId: "3", IsChunkManifest: false}, |
||||
|
{FileId: "4", IsChunkManifest: false}, |
||||
|
}, |
||||
|
expected: []*filer_pb.FileChunk{ |
||||
|
{FileId: "12", IsChunkManifest: true}, |
||||
|
{FileId: "34", IsChunkManifest: true}, |
||||
|
}, |
||||
|
}, |
||||
|
{ |
||||
|
inputs: []*filer_pb.FileChunk{ |
||||
|
{FileId: "1", IsChunkManifest: true}, |
||||
|
{FileId: "2", IsChunkManifest: false}, |
||||
|
{FileId: "3", IsChunkManifest: false}, |
||||
|
{FileId: "4", IsChunkManifest: false}, |
||||
|
}, |
||||
|
expected: []*filer_pb.FileChunk{ |
||||
|
{FileId: "1", IsChunkManifest: true}, |
||||
|
{FileId: "23", IsChunkManifest: true}, |
||||
|
{FileId: "4", IsChunkManifest: false}, |
||||
|
}, |
||||
|
}, |
||||
|
{ |
||||
|
inputs: []*filer_pb.FileChunk{ |
||||
|
{FileId: "1", IsChunkManifest: false}, |
||||
|
{FileId: "2", IsChunkManifest: true}, |
||||
|
{FileId: "3", IsChunkManifest: false}, |
||||
|
{FileId: "4", IsChunkManifest: false}, |
||||
|
}, |
||||
|
expected: []*filer_pb.FileChunk{ |
||||
|
{FileId: "2", IsChunkManifest: true}, |
||||
|
{FileId: "13", IsChunkManifest: true}, |
||||
|
{FileId: "4", IsChunkManifest: false}, |
||||
|
}, |
||||
|
}, |
||||
|
{ |
||||
|
inputs: []*filer_pb.FileChunk{ |
||||
|
{FileId: "1", IsChunkManifest: true}, |
||||
|
{FileId: "2", IsChunkManifest: true}, |
||||
|
{FileId: "3", IsChunkManifest: false}, |
||||
|
{FileId: "4", IsChunkManifest: false}, |
||||
|
}, |
||||
|
expected: []*filer_pb.FileChunk{ |
||||
|
{FileId: "1", IsChunkManifest: true}, |
||||
|
{FileId: "2", IsChunkManifest: true}, |
||||
|
{FileId: "34", IsChunkManifest: true}, |
||||
|
}, |
||||
|
}, |
||||
|
} |
||||
|
|
||||
|
for i, mtest := range manifestTests { |
||||
|
println("test", i) |
||||
|
actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge) |
||||
|
assertEqualChunks(t, mtest.expected, actual) |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) { |
||||
|
assert.Equal(t, len(expected), len(actual)) |
||||
|
for i := 0; i < len(actual); i++ { |
||||
|
assertEqualChunk(t, actual[i], expected[i]) |
||||
|
} |
||||
|
} |
||||
|
func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) { |
||||
|
assert.Equal(t, expected.FileId, actual.FileId) |
||||
|
assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest) |
||||
|
} |
||||
|
|
||||
|
func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { |
||||
|
|
||||
|
var buf bytes.Buffer |
||||
|
minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) |
||||
|
for k := 0; k < len(dataChunks); k++ { |
||||
|
chunk := dataChunks[k] |
||||
|
buf.WriteString(chunk.FileId) |
||||
|
if minOffset > int64(chunk.Offset) { |
||||
|
minOffset = chunk.Offset |
||||
|
} |
||||
|
if maxOffset < int64(chunk.Size)+chunk.Offset { |
||||
|
maxOffset = int64(chunk.Size) + chunk.Offset |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
manifestChunk = &filer_pb.FileChunk{ |
||||
|
FileId: buf.String(), |
||||
|
} |
||||
|
manifestChunk.IsChunkManifest = true |
||||
|
manifestChunk.Offset = minOffset |
||||
|
manifestChunk.Size = uint64(maxOffset - minOffset) |
||||
|
|
||||
|
return |
||||
|
} |
@ -0,0 +1,66 @@ |
|||||
|
package filesys |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"fmt" |
||||
|
"io" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/filer2" |
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/operation" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/security" |
||||
|
) |
||||
|
|
||||
|
func (wfs *WFS) saveDataAsChunk(dir string) filer2.SaveDataAsChunkFunctionType { |
||||
|
|
||||
|
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { |
||||
|
var fileId, host string |
||||
|
var auth security.EncodedJwt |
||||
|
|
||||
|
if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
||||
|
|
||||
|
request := &filer_pb.AssignVolumeRequest{ |
||||
|
Count: 1, |
||||
|
Replication: wfs.option.Replication, |
||||
|
Collection: wfs.option.Collection, |
||||
|
TtlSec: wfs.option.TtlSec, |
||||
|
DataCenter: wfs.option.DataCenter, |
||||
|
ParentPath: dir, |
||||
|
} |
||||
|
|
||||
|
resp, err := client.AssignVolume(context.Background(), request) |
||||
|
if err != nil { |
||||
|
glog.V(0).Infof("assign volume failure %v: %v", request, err) |
||||
|
return err |
||||
|
} |
||||
|
if resp.Error != "" { |
||||
|
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) |
||||
|
} |
||||
|
|
||||
|
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) |
||||
|
host = wfs.AdjustedUrl(host) |
||||
|
collection, replication = resp.Collection, resp.Replication |
||||
|
|
||||
|
return nil |
||||
|
}); err != nil { |
||||
|
return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) |
||||
|
} |
||||
|
|
||||
|
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) |
||||
|
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) |
||||
|
if err != nil { |
||||
|
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) |
||||
|
return nil, "", "", fmt.Errorf("upload data: %v", err) |
||||
|
} |
||||
|
if uploadResult.Error != "" { |
||||
|
glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) |
||||
|
return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error) |
||||
|
} |
||||
|
|
||||
|
wfs.chunkCache.SetChunk(fileId, data) |
||||
|
|
||||
|
chunk = uploadResult.ToPbFileChunk(fileId, offset) |
||||
|
return chunk, "", "", nil |
||||
|
} |
||||
|
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue