From 97d97f35287a88de392a1a422b3533339d923ae2 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sun, 19 Jul 2020 17:59:43 -0700
Subject: [PATCH 01/14] go code can read and write chunk manifest

---
 weed/filer2/filechunk_manifest.go             | 136 ++++++++++++++++++
 weed/filer2/filechunk_manifest_test.go        | 113 +++++++++++++++
 weed/filer2/filechunks.go                     |  32 ++++-
 weed/filer2/filechunks_test.go                |  10 +-
 weed/filer2/reader_at.go                      |  17 +--
 weed/filer2/stream.go                         |  30 ++--
 weed/filesys/dirty_page.go                    |  50 +------
 weed/filesys/file.go                          |   2 +-
 weed/filesys/filehandle.go                    |  13 +-
 weed/filesys/wfs_write.go                     |  66 +++++++++
 weed/replication/sink/azuresink/azure_sink.go |   2 +-
 weed/replication/sink/b2sink/b2_sink.go       |   2 +-
 weed/replication/sink/filersink/filer_sink.go |  26 +++-
 weed/replication/sink/gcssink/gcs_sink.go     |   2 +-
 weed/replication/sink/s3sink/s3_sink.go       |   2 +-
 weed/server/filer_grpc_server.go              |  73 ++++++++--
 weed/server/filer_server_handlers_write.go    |   5 +-
 .../filer_server_handlers_write_autochunk.go  |  28 +++-
 .../filer_server_handlers_write_cipher.go     |   2 +-
 weed/server/webdav_server.go                  |   2 +-
 weed/storage/needle/volume_ttl.go             |  10 +-
 21 files changed, 511 insertions(+), 112 deletions(-)
 create mode 100644 weed/filer2/filechunk_manifest.go
 create mode 100644 weed/filer2/filechunk_manifest_test.go
 create mode 100644 weed/filesys/wfs_write.go

diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
new file mode 100644
index 000000000..92b114853
--- /dev/null
+++ b/weed/filer2/filechunk_manifest.go
@@ -0,0 +1,136 @@
+package filer2
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"math"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
+	for _, chunk := range chunks {
+		if chunk.IsChunkManifest {
+			return true
+		}
+	}
+	return false
+}
+
+func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+	// TODO maybe parallelize this
+	for _, chunk := range chunks {
+		if !chunk.IsChunkManifest {
+			dataChunks = append(dataChunks, chunk)
+			continue
+		}
+
+		// IsChunkManifest
+		data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed)
+		if err != nil {
+			return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err)
+		}
+		m := &filer_pb.FileChunkManifest{}
+		if err := proto.Unmarshal(data, m); err != nil {
+			return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err)
+		}
+		manifestChunks = append(manifestChunks, chunk)
+		// recursive
+		dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks)
+		if subErr != nil {
+			return chunks, nil, subErr
+		}
+		dataChunks = append(dataChunks, dchunks...)
+		manifestChunks = append(manifestChunks, mchunks...)
+ } + return +} + +func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { + urlString, err := lookupFileIdFn(fileId) + if err != nil { + glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) + return nil, err + } + var buffer bytes.Buffer + err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(0).Infof("read %s failed, err: %v", fileId, err) + return nil, err + } + + return buffer.Bytes(), nil +} + +func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) { + return doMaybeManifestize(saveFunc, dataChunks, 10000, mergeIntoManifest) +} + +func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) { + + var dataChunks []*filer_pb.FileChunk + for _, chunk := range inputChunks { + if !chunk.IsChunkManifest { + dataChunks = append(dataChunks, chunk) + } else { + chunks = append(chunks, chunk) + } + } + + manifestBatch := mergeFactor + remaining := len(dataChunks) + for i := 0; i+manifestBatch <= len(dataChunks); i += manifestBatch { + chunk, err := mergefn(saveFunc, dataChunks[i:i+manifestBatch]) + if err != nil { + return dataChunks, err + } + chunks = append(chunks, chunk) + remaining -= manifestBatch + } + // remaining + for i := len(dataChunks) - remaining; i < len(dataChunks); i++ { + chunks = append(chunks, dataChunks[i]) + } + return +} + +func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { + + // create and serialize the manifest + data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{ + Chunks: dataChunks, + }) + if serErr != nil { + return nil, fmt.Errorf("serializing manifest: %v", serErr) + } + + minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) + for k := 0; k < len(dataChunks); k++ { + chunk := dataChunks[k] + if minOffset > int64(chunk.Offset) { + minOffset = chunk.Offset + } + if maxOffset < int64(chunk.Size)+chunk.Offset { + maxOffset = int64(chunk.Size) + chunk.Offset + } + } + + manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0) + if err != nil { + return nil, err + } + manifestChunk.IsChunkManifest = true + manifestChunk.Offset = minOffset + manifestChunk.Size = uint64(maxOffset - minOffset) + + return +} + +type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) diff --git a/weed/filer2/filechunk_manifest_test.go b/weed/filer2/filechunk_manifest_test.go new file mode 100644 index 000000000..2b0862d07 --- /dev/null +++ b/weed/filer2/filechunk_manifest_test.go @@ -0,0 +1,113 @@ +package filer2 + +import ( + "bytes" + "math" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func TestDoMaybeManifestize(t *testing.T) { + var manifestTests = []struct { + inputs []*filer_pb.FileChunk + expected []*filer_pb.FileChunk + }{ + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: false}, + {FileId: "2", IsChunkManifest: false}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, 
+ }, + expected: []*filer_pb.FileChunk{ + {FileId: "12", IsChunkManifest: true}, + {FileId: "34", IsChunkManifest: true}, + }, + }, + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "2", IsChunkManifest: false}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "23", IsChunkManifest: true}, + {FileId: "4", IsChunkManifest: false}, + }, + }, + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: false}, + {FileId: "2", IsChunkManifest: true}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "2", IsChunkManifest: true}, + {FileId: "13", IsChunkManifest: true}, + {FileId: "4", IsChunkManifest: false}, + }, + }, + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "2", IsChunkManifest: true}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "2", IsChunkManifest: true}, + {FileId: "34", IsChunkManifest: true}, + }, + }, + } + + for i, mtest := range manifestTests { + println("test", i) + actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge) + assertEqualChunks(t, mtest.expected, actual) + } + +} + +func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) { + assert.Equal(t, len(expected), len(actual)) + for i := 0; i < len(actual); i++ { + assertEqualChunk(t, actual[i], expected[i]) + } +} +func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) { + assert.Equal(t, expected.FileId, actual.FileId) + assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest) +} + +func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { + + var buf bytes.Buffer + minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) + for k := 0; k < len(dataChunks); k++ { + chunk := dataChunks[k] + buf.WriteString(chunk.FileId) + if minOffset > int64(chunk.Offset) { + minOffset = chunk.Offset + } + if maxOffset < int64(chunk.Size)+chunk.Offset { + maxOffset = int64(chunk.Size) + chunk.Offset + } + } + + manifestChunk = &filer_pb.FileChunk{ + FileId: buf.String(), + } + manifestChunk.IsChunkManifest = true + manifestChunk.Offset = minOffset + manifestChunk.Size = uint64(maxOffset - minOffset) + + return +} diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 6832d0f31..ea7772b4a 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -46,9 +46,9 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) { return fmt.Sprintf("%x", h.Sum32()) } -func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { +func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { - visibles := NonOverlappingVisibleIntervals(chunks) + visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks) fileIds := make(map[string]bool) for _, interval := range visibles { @@ -65,7 +65,23 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file return } -func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { +func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs 
[]*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+
+	aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
+	if aErr != nil {
+		return nil, aErr
+	}
+	bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs)
+	if bErr != nil {
+		return nil, bErr
+	}
+
+	delta = append(delta, DoMinusChunks(aData, bData)...)
+	delta = append(delta, DoMinusChunks(aMeta, bMeta)...)
+	return
+}
+
+func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
 
 	fileIds := make(map[string]bool)
 	for _, interval := range bs {
@@ -94,9 +110,9 @@ func (cv *ChunkView) IsFullChunk() bool {
 	return cv.Size == cv.ChunkSize
 }
 
-func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
 
-	visibles := NonOverlappingVisibleIntervals(chunks)
+	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
 
 	return ViewFromVisibleIntervals(visibles, offset, size)
 
@@ -190,7 +206,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
 	return newVisibles
 }
 
-func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
+// NonOverlappingVisibleIntervals translates file chunks into in-memory VisibleIntervals.
+// If a file chunk's content is a chunk manifest, the manifest is resolved into data chunks first.
+func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+
+	chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
 
 	sort.Slice(chunks, func(i, j int) bool {
 		return chunks[i].Mtime < chunks[j].Mtime
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
index 7b1133b85..bfee59198 100644
--- a/weed/filer2/filechunks_test.go
+++ b/weed/filer2/filechunks_test.go
@@ -16,7 +16,7 @@ func TestCompactFileChunks(t *testing.T) {
 		{Offset: 110, Size: 200, FileId: "jkl", Mtime: 300},
 	}
 
-	compacted, garbage := CompactFileChunks(chunks)
+	compacted, garbage := CompactFileChunks(nil, chunks)
 
 	if len(compacted) != 3 {
 		t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -49,7 +49,7 @@ func TestCompactFileChunks2(t *testing.T) {
 		})
 	}
 
-	compacted, garbage := CompactFileChunks(chunks)
+	compacted, garbage := CompactFileChunks(nil, chunks)
 
 	if len(compacted) != 4 {
 		t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -186,7 +186,7 @@ func TestIntervalMerging(t *testing.T) {
 
 	for i, testcase := range testcases {
 		log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
-		intervals := NonOverlappingVisibleIntervals(testcase.Chunks)
+		intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks)
 		for x, interval := range intervals {
 			log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
 				i, x, interval.start, interval.stop, interval.fileId)
@@ -371,7 +371,7 @@ func TestChunksReading(t *testing.T) {
 
 	for i, testcase := range testcases {
 		log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
-		chunks := ViewFromChunks(testcase.Chunks, testcase.Offset, testcase.Size)
+		chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
 		for x, chunk := range chunks {
 			log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
 				i, x, chunk.Offset, chunk.Size, chunk.FileId)
@@ -415,6 +415,6 @@ func BenchmarkCompactFileChunks(b *testing.B) {
 	}
 
 	for n := 0; n < b.N; n++ {
-		CompactFileChunks(chunks)
+		CompactFileChunks(nil, chunks)
 	}
 }
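
Note: the lookup-threading API introduced above can also be exercised outside the test suite. Below is a minimal standalone sketch (not part of the patch; the lookup URL and file ids are illustrative) of wiring a lookup function into NonOverlappingVisibleIntervals. The lookup is only invoked while resolving manifest chunks, so a chunk list without manifests never triggers a network call:

    package main

    import (
    	"fmt"

    	"github.com/chrislusf/seaweedfs/weed/filer2"
    	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    )

    func main() {
    	// Illustrative lookup function; real callers pass masterClient.LookupFileId
    	// or filer2.LookupFn(filerClient), as the diffs in this patch do.
    	lookup := func(fileId string) (targetUrl string, err error) {
    		return "http://127.0.0.1:8080/" + fileId, nil
    	}

    	chunks := []*filer_pb.FileChunk{
    		{FileId: "3,01637037d6", Offset: 0, Size: 100, Mtime: 100},
    		{FileId: "7,01637037d7", Offset: 50, Size: 100, Mtime: 200},
    	}

    	// Manifest chunks, if any, are resolved through the lookup function
    	// before the visible intervals are computed.
    	visibles, err := filer2.NonOverlappingVisibleIntervals(lookup, chunks)
    	if err != nil {
    		fmt.Println("resolve manifests:", err)
    		return
    	}
    	fmt.Println("visible interval count:", len(visibles))
    }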
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go index 11a80443f..568d94267 100644 --- a/weed/filer2/reader_at.go +++ b/weed/filer2/reader_at.go @@ -1,7 +1,6 @@ package filer2 import ( - "bytes" "context" "fmt" "io" @@ -9,7 +8,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -144,19 +142,6 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { - urlString, err := c.lookupFileId(fileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) - return nil, err - } - var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) { - buffer.Write(data) - }) - if err != nil { - glog.V(0).Infof("read %s failed, err: %v", fileId, err) - return nil, err - } + return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped) - return buffer.Bytes(), nil } diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 033a8dd13..c7df007ec 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -2,6 +2,7 @@ package filer2 import ( "bytes" + "fmt" "io" "math" "strings" @@ -14,7 +15,8 @@ import ( func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - chunkViews := ViewFromChunks(chunks, offset, size) + fmt.Printf("start to stream content for chunks: %+v\n", chunks) + chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size) fileId2Url := make(map[string]string) @@ -50,14 +52,14 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) buffer := bytes.Buffer{} - chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) - - lookupFileId := func(fileId string) (targetUrl string, err error) { + lookupFileIdFn := func(fileId string) (targetUrl string, err error) { return masterClient.LookupFileId(fileId) } + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + for _, chunkView := range chunkViews { - urlString, err := lookupFileId(chunkView.FileId) + urlString, err := lookupFileIdFn(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return nil, err @@ -88,23 +90,27 @@ var _ = io.ReadSeeker(&ChunkStreamReader{}) func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + lookupFileIdFn := func(fileId string) (targetUrl string, err error) { + return masterClient.LookupFileId(fileId) + } + + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: func(fileId string) (targetUrl string, err error) { - return masterClient.LookupFileId(fileId) - }, + chunkViews: chunkViews, + lookupFileId: lookupFileIdFn, } } func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + lookupFileIdFn := LookupFn(filerClient) + + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) return &ChunkStreamReader{ chunkViews: chunkViews, - 
lookupFileId: LookupFn(filerClient), + lookupFileId: lookupFileIdFn, } } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 45224b3e7..46d20e466 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -2,16 +2,12 @@ package filesys import ( "bytes" - "context" - "fmt" "io" "sync" "time" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" ) type ContinuousDirtyPages struct { @@ -141,53 +137,15 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) { - var fileId, host string - var auth security.EncodedJwt - dir, _ := pages.f.fullpath().DirAndName() - if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: pages.f.wfs.option.Replication, - Collection: pages.f.wfs.option.Collection, - TtlSec: pages.f.wfs.option.TtlSec, - DataCenter: pages.f.wfs.option.DataCenter, - ParentPath: dir, - } - - resp, err := client.AssignVolume(context.Background(), request) - if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) - return err - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) - } - - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) - host = pages.f.wfs.AdjustedUrl(host) - pages.collection, pages.replication = resp.Collection, resp.Replication - - return nil - }); err != nil { - return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err) - } - - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset) if err != nil { - glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) - return nil, fmt.Errorf("upload data: %v", err) - } - if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err) - return nil, fmt.Errorf("upload result: %v", uploadResult.Error) + return nil, err } - pages.f.wfs.chunkCache.SetChunk(fileId, data) + pages.collection, pages.replication = collection, replication - return uploadResult.ToPbFileChunk(fileId, offset), nil + return chunk, nil } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 4a6bc9a8a..dcda93522 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -253,7 +253,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry - file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks) + file.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(file.wfs), file.entry.Chunks) file.reader = nil } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 9b9df916c..31fd08f97 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -88,8 +88,12 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { return 0, nil } + var chunkResolveErr error if fh.f.entryViewCache == nil { - fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) + 
fh.f.entryViewCache, chunkResolveErr = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+		if chunkResolveErr != nil {
+			return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
+		}
 		fh.f.reader = nil
 	}
 
@@ -206,7 +210,12 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
 		glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
 	}
 
-	chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
+	chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+	chunks, manifestErr := filer2.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks)
+	if manifestErr != nil {
+		// not good, but should be ok
+		glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
+	}
 	fh.f.entry.Chunks = chunks
 	// fh.f.entryViewCache = nil
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
new file mode 100644
index 000000000..786d0b42a
--- /dev/null
+++ b/weed/filesys/wfs_write.go
@@ -0,0 +1,66 @@
+package filesys
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/security"
+)
+
+func (wfs *WFS) saveDataAsChunk(dir string) filer2.SaveDataAsChunkFunctionType {
+
+	return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+		var fileId, host string
+		var auth security.EncodedJwt
+
+		if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+			request := &filer_pb.AssignVolumeRequest{
+				Count:       1,
+				Replication: wfs.option.Replication,
+				Collection:  wfs.option.Collection,
+				TtlSec:      wfs.option.TtlSec,
+				DataCenter:  wfs.option.DataCenter,
+				ParentPath:  dir,
+			}
+
+			resp, err := client.AssignVolume(context.Background(), request)
+			if err != nil {
+				glog.V(0).Infof("assign volume failure %v: %v", request, err)
+				return err
+			}
+			if resp.Error != "" {
+				return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+			}
+
+			fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+			host = wfs.AdjustedUrl(host)
+			collection, replication = resp.Collection, resp.Replication
+
+			return nil
+		}); err != nil {
+			return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+		}
+
+		fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+		uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
+		if err != nil {
+			glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
+			return nil, "", "", fmt.Errorf("upload data: %v", err)
+		}
+		if uploadResult.Error != "" {
+			glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+			return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
+		}
+
+		wfs.chunkCache.SetChunk(fileId, data)
+
+		chunk = uploadResult.ToPbFileChunk(fileId, offset)
+		return chunk, collection, replication, nil
+	}
+}
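
The closure returned by saveDataAsChunk satisfies filer2.SaveDataAsChunkFunctionType, so the same assign-volume-then-upload path now serves both dirty-page flushes and manifest writes via MaybeManifestize. A minimal sketch of consuming such a function (not part of the patch; flushSmallFile is a hypothetical caller):

    import (
    	"bytes"
    	"fmt"

    	"github.com/chrislusf/seaweedfs/weed/filer2"
    	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    )

    // flushSmallFile mirrors how ContinuousDirtyPages.saveToStorage consumes
    // the save function above.
    func flushSmallFile(save filer2.SaveDataAsChunkFunctionType, data []byte, name string, offset int64) (*filer_pb.FileChunk, error) {
    	chunk, collection, replication, err := save(bytes.NewReader(data), name, offset)
    	if err != nil {
    		return nil, fmt.Errorf("save chunk at offset %d: %v", offset, err)
    	}
    	// Callers such as ContinuousDirtyPages record the assigned collection
    	// and replication for the later entry update.
    	_, _ = collection, replication
    	return chunk, nil
    }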
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index aef97c06e..fa229de22 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -96,7 +96,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	}
 
 	totalSize := filer2.TotalSize(entry.Chunks)
-	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	// Create a URL that references a to-be-created blob in your
 	// Azure Storage account's container.
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index 1e7d82ed4..bf8632827 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -85,7 +85,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	}
 
 	totalSize := filer2.TotalSize(entry.Chunks)
-	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize))
+	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	bucket, err := g.client.Bucket(context.Background(), g.bucket)
 	if err != nil {
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 50721a8f3..6429859b4 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -167,12 +167,15 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 			glog.V(0).Infof("already replicated %s", key)
 		} else {
 			// find out what changed
-			deletedChunks, newChunks := compareChunks(oldEntry, newEntry)
+			deletedChunks, newChunks, err := compareChunks(filer2.LookupFn(fs), oldEntry, newEntry)
+			if err != nil {
+				return true, fmt.Errorf("replicate %s compare chunks error: %v", key, err)
+			}
 
 			// delete the chunks that are deleted from the source
 			if deleteIncludeChunks {
 				// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
-				existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks)
+				existingEntry.Chunks = filer2.DoMinusChunks(existingEntry.Chunks, deletedChunks)
 			}
 
 			// replicate the chunks that are new in the source
@@ -200,8 +203,21 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 	})
 }
 
-func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
-	deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks)
-	newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks)
+func compareChunks(lookupFileIdFn filer2.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
+	aData, aMeta, aErr := filer2.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
+	if aErr != nil {
+		return nil, nil, aErr
+	}
+	bData, bMeta, bErr := filer2.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks)
+	if bErr != nil {
+		return nil, nil, bErr
+	}
+
+	deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aData, bData)...)
+	deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aMeta, bMeta)...)
+
+	newChunks = append(newChunks, filer2.DoMinusChunks(bData, aData)...)
+	newChunks = append(newChunks, filer2.DoMinusChunks(bMeta, aMeta)...)
+ return } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index bb5a54272..4b58160db 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -90,7 +90,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index d7af105b8..625cf406c 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -108,7 +108,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) parts := make([]*s3.CompletedPart, len(chunkViews)) diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 17e32731c..48e9253f0 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -137,13 +138,28 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return resp, nil } +func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) { + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + return "", err + } + locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId)) + if !found || len(locations) == 0 { + return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId) + } + return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil +} + func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { glog.V(4).Infof("CreateEntry %v", req) resp = &filer_pb.CreateEntryResponse{} - chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) + chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry) + if err2 != nil { + return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) + } if req.Entry.Attributes == nil { glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name)) @@ -158,7 +174,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr }, req.OExcl, req.IsFromOtherCluster) if createErr == nil { - fs.filer.DeleteChunks(garbages) + fs.filer.DeleteChunks(garbage) } else { glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr) resp.Error = createErr.Error() @@ -177,10 +193,10 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } - // remove old chunks if not included in the new ones - unusedChunks := filer2.MinusChunks(entry.Chunks, 
req.Entry.Chunks) - - chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) + chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry) + if err2 != nil { + return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) + } newEntry := &filer2.Entry{ FullPath: util.JoinPath(req.Directory, req.Entry.Name), @@ -214,8 +230,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { - fs.filer.DeleteChunks(unusedChunks) - fs.filer.DeleteChunks(garbages) + fs.filer.DeleteChunks(garbage) } else { glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err) } @@ -225,6 +240,37 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } +func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { + chunks = newEntry.Chunks + + // remove old chunks if not included in the new ones + if existingEntry != nil { + garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks) + if err != nil { + return chunks, nil, fmt.Errorf("MinusChunks: %v", err) + } + } + + // files with manifest chunks are usually large and append only, skip calculating covered chunks + var coveredChunks []*filer_pb.FileChunk + if !filer2.HasChunkManifest(newEntry.Chunks) { + chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks) + garbage = append(garbage, coveredChunks...) + } + + chunks, err = filer2.MaybeManifestize(fs.saveAsChunk( + newEntry.Attributes.Replication, + newEntry.Attributes.Collection, + "", + needle.SecondsToTTL(newEntry.Attributes.TtlSec), + false), chunks) + if err != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", err) + } + return +} + func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) { glog.V(4).Infof("AppendToEntry %v", req) @@ -254,6 +300,17 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo entry.Chunks = append(entry.Chunks, req.Chunks...) 
+ entry.Chunks, err = filer2.MaybeManifestize(fs.saveAsChunk( + entry.Replication, + entry.Collection, + "", + needle.SecondsToTTL(entry.TtlSec), + false), entry.Chunks) + if err != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", err) + } + err = fs.filer.CreateEntry(context.Background(), entry, false, false) return &filer_pb.AppendToEntryResponse{}, err diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index a642c502a..da66178ce 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -40,7 +40,7 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() @@ -67,7 +67,6 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) - writeJsonError(w, r, http.StatusInternalServerError, ae) err = ae return } @@ -114,7 +113,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 29546542c..be0438efb 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -102,7 +102,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r limitedReader := io.LimitReader(partReader, int64(chunkSize)) // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) if assignErr != nil { return nil, assignErr } @@ -132,6 +132,12 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r } } + fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) + if replyerr != nil { + glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) + return + } + path := r.URL.Path if strings.HasSuffix(path, "/") { if fileName != "" { @@ -184,3 +190,23 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) return uploadResult, err } + +func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, 
ttlString string, fsync bool) filer2.SaveDataAsChunkFunctionType { + + return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) + if assignErr != nil { + return nil, "", "", assignErr + } + + // upload the chunk to the volume server + uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth) + if uploadErr != nil { + return nil, "", "", uploadErr + } + + return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil + } +} + diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 17f35838d..8413496b8 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -19,7 +19,7 @@ import ( func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) { - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index e8bedd352..8655daf70 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -474,7 +474,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { return 0, io.EOF } if f.entryViewCache == nil { - f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) + f.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(f.fs), f.entry.Chunks) f.reader = nil } if f.reader == nil { diff --git a/weed/storage/needle/volume_ttl.go b/weed/storage/needle/volume_ttl.go index 179057876..26ce3b8fd 100644 --- a/weed/storage/needle/volume_ttl.go +++ b/weed/storage/needle/volume_ttl.go @@ -1,11 +1,12 @@ package needle import ( + "fmt" "strconv" ) const ( - //stored unit types + // stored unit types Empty byte = iota Minute Hour @@ -139,3 +140,10 @@ func (t TTL) Minutes() uint32 { } return 0 } + +func SecondsToTTL(seconds int32) string { + if seconds == 0 { + return "" + } + return fmt.Sprintf("%dm", seconds/60) +} From 60d14a9800d8513ae0849a764d66ce80795dadbe Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 20 Jul 2020 00:17:50 -0700 Subject: [PATCH 02/14] mount: fix difference with storage format in local cache --- weed/filesys/meta_cache/meta_cache.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index fdb486ba4..edf329143 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -8,10 +8,14 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" ) 
+// need to have logic similar to FilerStoreWrapper
+// e.g. fill fileId field for chunks
+
 type MetaCache struct {
 	actualStore filer2.FilerStore
 	sync.RWMutex
@@ -46,6 +50,7 @@ func openMetaStore(dbFolder string) filer2.FilerStore {
 func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error {
 	mc.Lock()
 	defer mc.Unlock()
+	filer_pb.BeforeEntrySerialization(entry.Chunks)
 	return mc.actualStore.InsertEntry(ctx, entry)
 }
 
@@ -78,13 +83,19 @@ func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPat
 func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error {
 	mc.Lock()
 	defer mc.Unlock()
+	filer_pb.BeforeEntrySerialization(entry.Chunks)
 	return mc.actualStore.UpdateEntry(ctx, entry)
 }
 
 func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) {
 	mc.RLock()
 	defer mc.RUnlock()
-	return mc.actualStore.FindEntry(ctx, fp)
+	entry, err = mc.actualStore.FindEntry(ctx, fp)
+	if err != nil {
+		return nil, err
+	}
+	filer_pb.AfterEntryDeserialization(entry.Chunks)
+	return
 }
 
 func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
@@ -96,7 +107,15 @@ func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err err
 func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) {
 	mc.RLock()
 	defer mc.RUnlock()
-	return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+
+	entries, err := mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+	if err != nil {
+		return nil, err
+	}
+	for _, entry := range entries {
+		filer_pb.AfterEntryDeserialization(entry.Chunks)
+	}
+	return entries, err
 }
 
 func (mc *MetaCache) Shutdown() {

From 1d724ab23755704f2e5058f9c81e1de134fb5fb0 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 03:29:17 -0700
Subject: [PATCH 03/14] hdfs: support read write chunk manifest

---
 .../seaweedfs/client/FileChunkManifest.java   | 134 ++++++++++++++++++
 .../java/seaweedfs/client/FilerClient.java    |   9 +-
 .../java/seaweedfs/client/SeaweedRead.java    |  12 +-
 .../java/seaweedfs/client/SeaweedWrite.java   |  42 ++++--
 .../seaweedfs/client/SeaweedReadTest.java     |   5 +-
 .../java/seaweed/hdfs/SeaweedInputStream.java |   7 +-
 .../seaweed/hdfs/SeaweedOutputStream.java     |   2 +-
 .../java/seaweed/hdfs/SeaweedInputStream.java |   2 +-
 8 files changed, 180 insertions(+), 33 deletions(-)
 create mode 100644 other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
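
The Java client below parallels the Go batching in doMaybeManifestize: full groups of mergeFactor data chunks are merged into manifest chunks, and the remainder passes through unchanged. A standalone Go sketch of just that grouping arithmetic (illustrative only; it follows the Go loop bounds):

    // groupForMerge returns the [start, end) index pairs of the full
    // mergeFactor-sized groups, plus the index where the pass-through
    // remainder begins.
    func groupForMerge(n, mergeFactor int) (groups [][2]int, remainderStart int) {
    	i := 0
    	for ; i+mergeFactor <= n; i += mergeFactor {
    		groups = append(groups, [2]int{i, i + mergeFactor})
    	}
    	return groups, i
    }

With n = 5 and mergeFactor = 2 this yields groups [0,2) and [2,4) and leaves the fifth chunk as-is, which is the behavior TestDoMaybeManifestize asserts above.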
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
new file mode 100644
index 000000000..3cac46db9
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -0,0 +1,134 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class FileChunkManifest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
+
+    private static final int mergeFactor = 3;
+
+    public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
+        for (FilerProto.FileChunk chunk : chunks) {
+            if (chunk.getIsChunkManifest()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static List<FilerProto.FileChunk> resolveChunkManifest(
+            final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException {
+
+        List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+
+        for (FilerProto.FileChunk chunk : chunks) {
+            if (!chunk.getIsChunkManifest()) {
+                dataChunks.add(chunk);
+                continue;
+            }
+
+            // IsChunkManifest
+            LOG.debug("fetching chunk manifest:{}", chunk);
+            byte[] data = fetchChunk(filerGrpcClient, chunk);
+            FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
+            List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
+            for (FilerProto.FileChunk t : m.getChunksList()) {
+                // avoid deprecated chunk.getFileId()
+                resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
+            }
+            dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks));
+        }
+
+        return dataChunks;
+    }
+
+    private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
+
+        FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+        String vid = "" + chunk.getFid().getVolumeId();
+        lookupRequest.addVolumeIds(vid);
+        FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+                .getBlockingStub().lookupVolume(lookupRequest.build());
+        Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
+        FilerProto.Locations locations = vid2Locations.get(vid);
+
+        SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
+                FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
+                0,
+                -1,
+                0,
+                true,
+                chunk.getCipherKey().toByteArray(),
+                chunk.getIsCompressed());
+
+        byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
+        if (chunkData == null) {
+            LOG.debug("doFetchFullChunkData:{}", chunkView);
+            chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
+        }
+        LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
+        SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+
+        return chunkData;
+
+    }
+
+    public static List<FilerProto.FileChunk> maybeManifestize(
+            final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks) throws IOException {
+        // the return variable
+        List<FilerProto.FileChunk> chunks = new ArrayList<>();
+
+        List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+        for (FilerProto.FileChunk chunk : inputChunks) {
+            if (!chunk.getIsChunkManifest()) {
+                dataChunks.add(chunk);
+            } else {
+                chunks.add(chunk);
+            }
+        }
+
+        int remaining = dataChunks.size();
+        for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
+            FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor));
+            chunks.add(chunk);
+            remaining -= mergeFactor;
+        }
+
+        // remaining
+        for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
+            chunks.add(dataChunks.get(i));
+        }
+        return chunks;
+    }
+
+    private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks) throws IOException {
+        // create and serialize the manifest
+        FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
+        byte[] data = m.build().toByteArray();
+
+        long minOffset = Long.MAX_VALUE;
+        long maxOffset = -1;
+        for (FilerProto.FileChunk chunk : dataChunks) {
+            minOffset = Math.min(minOffset, chunk.getOffset());
+            maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset());
+        }
+
+        FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
+                filerGrpcClient.getReplication(),
+                filerGrpcClient,
+                minOffset,
+                data, 0, data.length);
+        manifestChunk.setIsChunkManifest(true);
+        manifestChunk.setSize(maxOffset - minOffset);
+        return manifestChunk.build();
+
+    }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 2103fc699..0d087d5b4 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -24,6 +24,10 @@ public class FilerClient {
         this.filerGrpcClient = filerGrpcClient;
     }
 
+    public static String toFileId(FilerProto.FileId fid) {
+        return String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+    }
+
     public boolean mkdirs(String path, int mode) {
         String currentUser = System.getProperty("user.name");
         return mkdirs(path, mode, 0, 0, currentUser, new String[]{});
@@ -209,7 +213,6 @@ public class FilerClient {
         }
     }
 
-
     public boolean createEntry(String parent, FilerProto.Entry entry) {
         try {
             filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
@@ -279,9 +282,7 @@ public class FilerClient {
         entryBuilder.clearChunks();
         for (FilerProto.FileChunk chunk : entry.getChunksList()) {
             FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
-            FilerProto.FileId fid = chunk.getFid();
-            fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
-            chunkBuilder.setFileId(fileId);
+            chunkBuilder.setFileId(toFileId(chunk.getFid()));
             entryBuilder.addChunks(chunkBuilder);
         }
         return entryBuilder.build();
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 301919919..fbc3296ae 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -2,16 +2,12 @@ package seaweedfs.client;
 
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.*;
 
@@ -77,7 +73,7 @@ public class SeaweedRead {
         return len;
     }
 
-    private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+    public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
 
         HttpGet request = new HttpGet(
                 String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@@ -138,7 +134,11 @@ public class SeaweedRead {
         return views;
     }
 
-    public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+    public static List<VisibleInterval> nonOverlappingVisibleIntervals(
+            final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+
+        chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList);
+
         FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
         Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
             @Override
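
Both mergeIntoManifest implementations (Go in patch 01, Java above) stamp the manifest chunk with an Offset/Size that covers the data chunks it replaces. A standalone Go sketch of that range computation (illustrative only):

    import (
    	"math"

    	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    )

    // manifestCoverRange computes the [minOffset, maxOffset) byte range spanned
    // by the data chunks merged into one manifest chunk; the manifest chunk then
    // carries Offset = minOffset and Size = uint64(maxOffset - minOffset).
    func manifestCoverRange(chunks []*filer_pb.FileChunk) (minOffset, maxOffset int64) {
    	minOffset, maxOffset = int64(math.MaxInt64), int64(math.MinInt64)
    	for _, c := range chunks {
    		if c.Offset < minOffset {
    			minOffset = c.Offset
    		}
    		if c.Offset+int64(c.Size) > maxOffset {
    			maxOffset = c.Offset + int64(c.Size)
    		}
    	}
    	return
    }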
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index e9819668c..fd54453a1 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,8 +1,6 @@
 package seaweedfs.client;
 
 import com.google.protobuf.ByteString;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.mime.HttpMultipartMode;
@@ -10,10 +8,10 @@ import org.apache.http.entity.mime.MultipartEntityBuilder;
 import org.apache.http.util.EntityUtils;
 
 import java.io.ByteArrayInputStream;
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.SecureRandom;
+import java.util.List;
 
 public class SeaweedWrite {
 
@@ -25,6 +23,17 @@ public class SeaweedWrite {
                                  final long offset,
                                  final byte[] bytes,
                                  final long bytesOffset, final long bytesLength) throws IOException {
+        synchronized (entry) {
+            entry.addChunks(writeChunk(replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength));
+        }
+    }
+
+    public static FilerProto.FileChunk.Builder writeChunk(final String replication,
+                                                          final FilerGrpcClient filerGrpcClient,
+                                                          final long offset,
+                                                          final byte[] bytes,
+                                                          final long bytesOffset,
+                                                          final long bytesLength) throws IOException {
         FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
                 FilerProto.AssignVolumeRequest.newBuilder()
                         .setCollection(filerGrpcClient.getCollection())
@@ -46,25 +55,28 @@ public class SeaweedWrite {
 
         String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
 
-        synchronized (entry) {
-            entry.addChunks(FilerProto.FileChunk.newBuilder()
-                    .setFileId(fileId)
-                    .setOffset(offset)
-                    .setSize(bytesLength)
-                    .setMtime(System.currentTimeMillis() / 10000L)
-                    .setETag(etag)
-                    .setCipherKey(cipherKeyString)
-            );
-        }
-
         // cache fileId ~ bytes
         SeaweedRead.chunkCache.setChunk(fileId, bytes);
+
+        return FilerProto.FileChunk.newBuilder()
+                .setFileId(fileId)
+                .setOffset(offset)
+                .setSize(bytesLength)
+                .setMtime(System.currentTimeMillis() / 10000L)
+                .setETag(etag)
+                .setCipherKey(cipherKeyString);
     }
 
     public static void writeMeta(final FilerGrpcClient filerGrpcClient,
-                                 final String parentDirectory, final FilerProto.Entry.Builder entry) {
+                                 final String parentDirectory,
+                                 final FilerProto.Entry.Builder entry) throws IOException {
+
+        int chunkSize = entry.getChunksCount();
+        List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList());
+
         synchronized (entry) {
+            entry.clearChunks();
+            entry.addAllChunks(chunks);
             filerGrpcClient.getBlockingStub().createEntry(
                     FilerProto.CreateEntryRequest.newBuilder()
                             .setDirectory(parentDirectory)
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
index ccfcdb117..44b833c90 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -3,13 +3,14 @@ package seaweedfs.client;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 public class SeaweedReadTest {
 
     @Test
-    public void testNonOverlappingVisibleIntervals() {
+    public void testNonOverlappingVisibleIntervals() throws IOException {
         List<FilerProto.FileChunk> chunks = new ArrayList<>();
         chunks.add(FilerProto.FileChunk.newBuilder()
                 .setFileId("aaa")
@@ -24,7 +25,7 @@ public class SeaweedReadTest {
                 .setMtime(2000)
                 .build());
 
-        List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+        List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
         for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
             System.out.println("visible:" + visibleInterval);
         }
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index c26ad728f..6b3c72f7d 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,7 +2,6 @@ package seaweed.hdfs;
 
 // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream {
                               final Statistics statistics,
                               final String path,
                               final FilerProto.Entry entry,
-                              final int bufferSize) {
+                              final int bufferSize) throws IOException {
         this.filerGrpcClient = filerGrpcClient;
         this.statistics = statistics;
         this.path = path;
@@ -45,7 +44,7 @@ public class SeaweedInputStream extends FSInputStream {
         this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
         this.bufferSize = bufferSize;
 
-        this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+        this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
 
         LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
 
@@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream {
             }
         }
 
-        return (int)bytesRead;
+        return (int) bytesRead;
     }
 
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 46de0c443..209e32d0b 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -109,7 +109,7 @@ public class SeaweedOutputStream extends OutputStream {
                 break;
             }
 
-            // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+            // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
             buffer.put(data, currentOffset, writableBytes);
             outputIndex += writableBytes;
             currentOffset += writableBytes;
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index c26ad728f..10ec9b3cc 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -45,7 +45,7 @@ public class SeaweedInputStream extends FSInputStream {
         this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
         this.bufferSize = bufferSize;
 
-        this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
+        this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
 
         LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
 

From d02c0fe0c08cfd4a29d5b46ae8b310c4b73457e6 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 03:34:06 -0700
From d02c0fe0c08cfd4a29d5b46ae8b310c4b73457e6 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 03:34:06 -0700
Subject: [PATCH 04/14] refactoring

---
 weed/filer2/filechunk_manifest.go | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
index 92b114853..bab73c24a 100644
--- a/weed/filer2/filechunk_manifest.go
+++ b/weed/filer2/filechunk_manifest.go
@@ -13,6 +13,10 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
+const (
+	ManifestBatch = 10000
+)
+
 func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
 	for _, chunk := range chunks {
 		if chunk.IsChunkManifest {
@@ -51,10 +55,11 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
 	return
 }
 
+// TODO fetch from cache for weed mount?
 func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
 	urlString, err := lookupFileIdFn(fileId)
 	if err != nil {
-		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
+		glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
 		return nil, err
 	}
 	var buffer bytes.Buffer
@@ -69,8 +74,8 @@ func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKe
 	return buffer.Bytes(), nil
 }
 
-func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
-	return doMaybeManifestize(saveFunc, dataChunks, 10000, mergeIntoManifest)
+func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
+	return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
 }
 
 func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
@@ -84,15 +89,14 @@ func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*fil
 		}
 	}
 
-	manifestBatch := mergeFactor
 	remaining := len(dataChunks)
-	for i := 0; i+manifestBatch <= len(dataChunks); i += manifestBatch {
-		chunk, err := mergefn(saveFunc, dataChunks[i:i+manifestBatch])
+	for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
+		chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
 		if err != nil {
 			return dataChunks, err
 		}
 		chunks = append(chunks, chunk)
-		remaining -= manifestBatch
+		remaining -= mergeFactor
 	}
 	// remaining
 	for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
@@ -112,8 +116,7 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
 	}
 
 	minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
-	for k := 0; k < len(dataChunks); k++ {
-		chunk := dataChunks[k]
+	for _, chunk := range dataChunks {
 		if minOffset > int64(chunk.Offset) {
 			minOffset = chunk.Offset
 		}
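The refactored doMaybeManifestize makes the batching rule easy to state: every full group of mergeFactor data chunks collapses into one manifest chunk, and whatever is left over passes through unchanged. A small standalone sketch of that arithmetic, using illustrative names and the merge factor of 1000 that the series settles on below:

package main

import "fmt"

// manifestizeCounts mirrors the loop bounds in doMaybeManifestize: every
// full batch of mergeFactor data chunks becomes one manifest chunk, and
// the remainder is appended to the result unchanged.
func manifestizeCounts(dataChunks, mergeFactor int) (manifestChunks, passThrough int) {
	return dataChunks / mergeFactor, dataChunks % mergeFactor
}

func main() {
	m, p := manifestizeCounts(2503, 1000)
	fmt.Printf("%d manifest chunks, %d chunks kept as-is\n", m, p) // 2 manifest chunks, 503 chunks kept as-is
}
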
From 6debe3c3ecdcab350c4716f03c00d9ae9a0992af Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 18:25:26 -0700
Subject: [PATCH 05/14] use 1000 as merge factor

---
 .../src/main/java/seaweedfs/client/FileChunkManifest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
index 3cac46db9..608deaf80 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -12,7 +12,7 @@ public class FileChunkManifest {
 
     private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
 
-    private static final int mergeFactor = 3;
+    private static final int mergeFactor = 1000;
 
     public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
         for (FilerProto.FileChunk chunk : chunks) {

From aee6d893506020fe327642b1f7a106a2f0bd7fd1 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 18:26:17 -0700
Subject: [PATCH 06/14] before writing and after reading file chunks

---
 .../seaweedfs/client/FileChunkManifest.java   |  1 +
 .../java/seaweedfs/client/FilerClient.java    | 81 ++++++++++++++-----
 2 files changed, 61 insertions(+), 21 deletions(-)

diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
index 608deaf80..d8d29ede8 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -110,6 +110,7 @@ public class FileChunkManifest {
     private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks) throws IOException {
         // create and serialize the manifest
+        dataChunks = FilerClient.beforeEntrySerialization(dataChunks);
         FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
         byte[] data = m.build().toByteArray();
 
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 0d087d5b4..468a95e28 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -25,7 +25,64 @@ public class FilerClient {
     }
 
     public static String toFileId(FilerProto.FileId fid) {
-        return String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+        if (fid == null) {
+            return null;
+        }
+        return String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+    }
+
+    public static FilerProto.FileId toFileIdObject(String fileIdStr) {
+        if (fileIdStr == null || fileIdStr.length() == 0) {
+            return null;
+        }
+        int commaIndex = fileIdStr.lastIndexOf(',');
+        String volumeIdStr = fileIdStr.substring(0, commaIndex);
+        String fileKeyStr = fileIdStr.substring(commaIndex + 1, fileIdStr.length() - 8);
+        String cookieStr = fileIdStr.substring(fileIdStr.length() - 8);
+
+        return FilerProto.FileId.newBuilder()
+                .setVolumeId(Integer.parseInt(volumeIdStr))
+                .setFileKey(Long.parseLong(fileKeyStr, 16))
+                .setCookie((int) Long.parseLong(cookieStr, 16))
+                .build();
+    }
+
+    public static List<FilerProto.FileChunk> beforeEntrySerialization(List<FilerProto.FileChunk> chunks) {
+        List<FilerProto.FileChunk> cleanedChunks = new ArrayList<>();
+        for (FilerProto.FileChunk chunk : chunks) {
+            FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+            chunkBuilder.clearFileId();
+            chunkBuilder.clearSourceFileId();
+            chunkBuilder.setFid(toFileIdObject(chunk.getFileId()));
+            FilerProto.FileId sourceFid = toFileIdObject(chunk.getSourceFileId());
+            if (sourceFid != null) {
+                chunkBuilder.setSourceFid(sourceFid);
+            }
+            cleanedChunks.add(chunkBuilder.build());
+        }
+        return cleanedChunks;
+    }
+
+    public static FilerProto.Entry afterEntryDeserialization(FilerProto.Entry entry) {
+        if (entry.getChunksList().size() <= 0) {
+            return entry;
+        }
+        String fileId = entry.getChunks(0).getFileId();
+        if (fileId != null && fileId.length() != 0) {
+            return entry;
+        }
+        FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+        entryBuilder.clearChunks();
+        for (FilerProto.FileChunk chunk : entry.getChunksList()) {
+            FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+            chunkBuilder.setFileId(toFileId(chunk.getFid()));
+            String sourceFileId = toFileId(chunk.getSourceFid());
+            if (sourceFileId != null) {
+                chunkBuilder.setSourceFileId(sourceFileId);
+            }
+            entryBuilder.addChunks(chunkBuilder);
+        }
+        return entryBuilder.build();
     }
 
     public boolean mkdirs(String path, int mode) {
@@ -188,7 +245,7 @@ public class FilerClient {
         List<FilerProto.Entry> entries = new ArrayList<>();
         while (iter.hasNext()) {
             FilerProto.ListEntriesResponse resp = iter.next();
-            entries.add(fixEntryAfterReading(resp.getEntry()));
+            entries.add(afterEntryDeserialization(resp.getEntry()));
         }
         return entries;
     }
@@ -203,7 +260,7 @@ public class FilerClient {
             if (entry == null) {
                 return null;
             }
-            return fixEntryAfterReading(entry);
+            return afterEntryDeserialization(entry);
         } catch (Exception e) {
             if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
                 return null;
@@ -270,22 +327,4 @@ public class FilerClient {
         return true;
     }
 
-    private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) {
-        if (entry.getChunksList().size() <= 0) {
-            return entry;
-        }
-        String fileId = entry.getChunks(0).getFileId();
-        if (fileId != null && fileId.length() != 0) {
-            return entry;
-        }
-        FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
-        entryBuilder.clearChunks();
-        for (FilerProto.FileChunk chunk : entry.getChunksList()) {
-            FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
-            chunkBuilder.setFileId(toFileId(chunk.getFid()));
-            entryBuilder.addChunks(chunkBuilder);
-        }
-        return entryBuilder.build();
-    }
-
 }

From cacc601cc8dd8c6cae8a5d03921a5966306b909e Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 18:26:48 -0700
Subject: [PATCH 07/14] ensure changing buffer size requirements

---
 .../java/seaweedfs/client/ByteBufferPool.java | 23 ++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
index 897fe9694..55f003a18 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -1,22 +1,39 @@
 package seaweedfs.client;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 public class ByteBufferPool {
 
-    static List<ByteBuffer> bufferList = new ArrayList<>();
+    private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024;
+    private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
+
+    private static final List<ByteBuffer> bufferList = new ArrayList<>();
 
     public static synchronized ByteBuffer request(int bufferSize) {
+        if (bufferSize < MIN_BUFFER_SIZE) {
+            bufferSize = MIN_BUFFER_SIZE;
+        }
         if (bufferList.isEmpty()) {
             return ByteBuffer.allocate(bufferSize);
         }
-        return bufferList.remove(bufferList.size()-1);
+        ByteBuffer buffer = bufferList.remove(bufferList.size() - 1);
+        if (buffer.capacity() >= bufferSize) {
+            return buffer;
+        }
+
+        LOG.info("add new buffer from {} to {}", buffer.capacity(), bufferSize);
+        bufferList.add(0, buffer);
+        return ByteBuffer.allocate(bufferSize);
+
     }
 
     public static synchronized void release(ByteBuffer obj) {
-        bufferList.add(obj);
+        bufferList.add(0, obj);
     }
 
 }
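Patches 06 and 07 pin down the client's wire formats: toFileId and toFileIdObject agree that a file id string is the decimal volume id, a comma, the file key in hex, then the cookie as exactly eight hex digits, which is why parsing can peel a fixed-width cookie off the end. A self-contained Go sketch of the same round trip follows; the helper names are illustrative, not from the patch.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatFileId mirrors toFileId: decimal volume id, a comma, the file key
// in hex, then the cookie as exactly eight zero-padded hex digits.
func formatFileId(volumeId uint32, fileKey uint64, cookie uint32) string {
	return fmt.Sprintf("%d,%x%08x", volumeId, fileKey, cookie)
}

// parseFileId mirrors toFileIdObject: split at the last comma, peel the
// fixed-width cookie off the end, and read the file key from whatever hex
// digits remain in between.
func parseFileId(fileId string) (volumeId uint32, fileKey uint64, cookie uint32, err error) {
	comma := strings.LastIndex(fileId, ",")
	if comma < 0 || len(fileId) < comma+1+8 {
		return 0, 0, 0, fmt.Errorf("malformed file id %q", fileId)
	}
	vid, err := strconv.ParseUint(fileId[:comma], 10, 32)
	if err != nil {
		return 0, 0, 0, err
	}
	key, err := strconv.ParseUint(fileId[comma+1:len(fileId)-8], 16, 64)
	if err != nil {
		return 0, 0, 0, err
	}
	ck, err := strconv.ParseUint(fileId[len(fileId)-8:], 16, 32)
	if err != nil {
		return 0, 0, 0, err
	}
	return uint32(vid), key, uint32(ck), nil
}

func main() {
	fmt.Println(formatFileId(3, 0x1f, 0x2d372f4a)) // 3,1f2d372f4a
	fmt.Println(parseFileId("3,1f2d372f4a"))       // 3 31 758591306 <nil>
}
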
From 0e341a2a9a43d7556e27ef3739123a378dde3a53 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 18:26:59 -0700
Subject: [PATCH 08/14] error logging

---
 .../client/src/main/java/seaweedfs/client/SeaweedRead.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index fbc3296ae..97da29447 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -40,7 +40,8 @@ public class SeaweedRead {
         int startOffset = bufferOffset;
         for (ChunkView chunkView : chunkViews) {
             FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
-            if (locations.getLocationsCount() == 0) {
+            if (locations == null || locations.getLocationsCount() == 0) {
+                LOG.error("failed to locate {}", chunkView.fileId);
                 return 0;
             }
 
From b380b9e5d76c3c1b7170cf4abe6cdb5db71a48c0 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 18:27:11 -0700
Subject: [PATCH 09/14] shared http client

---
 .../src/main/java/seaweedfs/client/SeaweedUtil.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
index e2835b718..c465d935f 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -9,19 +9,22 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 public class SeaweedUtil {
 
     static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+    static CloseableHttpClient httpClient;
 
     static {
         // Increase max total connection to 200
        cm.setMaxTotal(200);
         // Increase default max connection per route to 20
         cm.setDefaultMaxPerRoute(20);
-    }
 
-    public static CloseableHttpClient getClosableHttpClient() {
-        return HttpClientBuilder.create()
+        httpClient = HttpClientBuilder.create()
                 .setConnectionManager(cm)
                 .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
                 .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
                 .build();
     }
+
+    public static CloseableHttpClient getClosableHttpClient() {
+        return httpClient;
+    }
 }

From 0b2e06268b0d43c902754c34d3040092a7c83876 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 18:30:09 -0700
Subject: [PATCH 10/14] use merge factor 1000

---
 weed/filer2/filechunk_manifest.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
index bab73c24a..e7b4df1fe 100644
--- a/weed/filer2/filechunk_manifest.go
+++ b/weed/filer2/filechunk_manifest.go
@@ -14,7 +14,7 @@ import (
 )
 
 const (
-	ManifestBatch = 10000
+	ManifestBatch = 1000
 )
 
 func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
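With the merge factor now at 1000 on both the Go side (ManifestBatch) and the Java side (mergeFactor), one manifest chunk can stand in for up to 1000 data chunks. Assuming 8 MB data chunks (the MIN_BUFFER_SIZE floor from patch 07; real chunk sizes vary), a single manifest level covers roughly 8 GB of file data per manifest chunk, and because ResolveChunkManifest resolves manifests recursively, a nested level would reach roughly 8 TB. A back-of-envelope sketch:

package main

import "fmt"

const (
	mergeFactor      = 1000    // ManifestBatch on the Go side, mergeFactor in the Java client
	assumedChunkSize = 8 << 20 // 8 MB, an assumed chunk size for illustration only
)

// coveredBytes estimates how much file data one manifest chunk can
// reference after the given number of manifest levels.
func coveredBytes(levels int) int64 {
	size := int64(assumedChunkSize)
	for i := 0; i < levels; i++ {
		size *= mergeFactor
	}
	return size
}

func main() {
	fmt.Println(coveredBytes(1)) // 8388608000, about 8 GB per manifest chunk
	fmt.Println(coveredBytes(2)) // 8388608000000, about 8 TB with one level of nesting
}
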
From ae3e6d824499b499b3b53726e7f25751756456ae Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 20:27:19 -0700
Subject: [PATCH 11/14] remove changing buffer size

---
 .../src/main/java/seaweed/hdfs/SeaweedFileSystem.java   | 3 ---
 .../src/main/java/seaweed/hdfs/SeaweedOutputStream.java | 6 +++---
 .../src/main/java/seaweed/hdfs/SeaweedFileSystem.java   | 3 ---
 .../src/main/java/seaweed/hdfs/SeaweedInputStream.java  | 5 ++---
 .../src/main/java/seaweed/hdfs/SeaweedOutputStream.java | 8 ++++----
 5 files changed, 9 insertions(+), 16 deletions(-)

diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 85490c181..2341d335d 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
     public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
 
     private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
-    private static int BUFFER_SIZE = 16 * 1024 * 1024;
 
     private URI uri;
     private Path workingDirectory = new Path("/");
@@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
         port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
         conf.setInt(FS_SEAWEED_FILER_PORT, port);
 
-        conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
         setConf(conf);
         this.uri = uri;
 
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 209e32d0b..d62d74fb1 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -56,12 +56,12 @@ public class SeaweedOutputStream extends OutputStream {
         this.outputIndex = 0;
         this.writeOperations = new ConcurrentLinkedDeque<>();
 
-        this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+        this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
 
         this.threadExecutor
                 = new ThreadPoolExecutor(maxConcurrentRequestCount,
                 maxConcurrentRequestCount,
-                10L,
+                120L,
                 TimeUnit.SECONDS,
                 new LinkedBlockingQueue<Runnable>());
         this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@@ -180,7 +180,7 @@ public class SeaweedOutputStream extends OutputStream {
         bufferToWrite.flip();
         int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
 
-        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
             waitForTaskToComplete();
         }
         final Future<Void> job = completionService.submit(() -> {
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 85490c181..2341d335d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -30,7 +30,6 @@ public class SeaweedFileSystem extends FileSystem {
     public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
 
     private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
-    private static int BUFFER_SIZE = 16 * 1024 * 1024;
 
     private URI uri;
     private Path workingDirectory = new Path("/");
@@ -61,8 +60,6 @@ public class SeaweedFileSystem extends FileSystem {
         port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
         conf.setInt(FS_SEAWEED_FILER_PORT, port);
 
-        conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
         setConf(conf);
         this.uri = uri;
 
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 10ec9b3cc..6b3c72f7d 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,7 +2,6 @@ package seaweed.hdfs;
 
 // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -37,7 +36,7 @@ public class SeaweedInputStream extends FSInputStream {
                               final Statistics statistics,
                               final String path,
                               final FilerProto.Entry entry,
-                              final int bufferSize) {
+                              final int bufferSize) throws IOException {
         this.filerGrpcClient = filerGrpcClient;
         this.statistics = statistics;
         this.path = path;
@@ -100,7 +99,7 @@ public class SeaweedInputStream extends FSInputStream {
             }
         }
 
-        return (int)bytesRead;
+        return (int) bytesRead;
     }
 
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index c602a0d81..05805b9e5 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -60,12 +60,12 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
         this.outputIndex = 0;
         this.writeOperations = new ConcurrentLinkedDeque<>();
 
-        this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
+        this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
 
         this.threadExecutor
                 = new ThreadPoolExecutor(maxConcurrentRequestCount,
                 maxConcurrentRequestCount,
-                10L,
+                120L,
                 TimeUnit.SECONDS,
                 new LinkedBlockingQueue<Runnable>());
         this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
@@ -113,7 +113,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
                 break;
             }
 
-            // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
+            // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
             buffer.put(data, currentOffset, writableBytes);
             outputIndex += writableBytes;
             currentOffset += writableBytes;
@@ -227,7 +227,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
         bufferToWrite.flip();
         int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
 
-        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
+        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
             waitForTaskToComplete();
         }
         final Future<Void> job = completionService.submit(() -> {
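Patch 11 also retunes backpressure in the output streams: one worker per processor, a 120-second keep-alive, and the writer now blocks once the queue holds one pending task per worker rather than two. A rough Go analogue of that bounded submit pattern, illustrative only and not code from the patch:

package example

// boundedExecutor is a rough analogue of the retuned ThreadPoolExecutor in
// SeaweedOutputStream: a fixed worker count, where submit blocks once every
// worker slot is taken, so a fast writer cannot queue chunks without bound.
type boundedExecutor struct {
	slots chan struct{} // capacity = number of workers
}

func newBoundedExecutor(workers int) *boundedExecutor {
	return &boundedExecutor{slots: make(chan struct{}, workers)}
}

func (e *boundedExecutor) submit(task func()) {
	e.slots <- struct{}{} // blocks while all workers are busy
	go func() {
		defer func() { <-e.slots }()
		task()
	}()
}
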
From 64926d2345204e7100240f4d598641b4ecc10ee6 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 21:18:58 -0700
Subject: [PATCH 12/14] disable ChunkCache

---
 .../java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 97da29447..f0490540d 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -15,7 +15,7 @@ public class SeaweedRead {
 
     private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
 
-    static ChunkCache chunkCache = new ChunkCache(16);
+    static ChunkCache chunkCache = new ChunkCache(0);
 
     // returns bytesRead
     public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
From 44057a4de18e4fdc9b9249d76ba8208654b507ee Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 22:01:39 -0700
Subject: [PATCH 13/14] clean up chunks in manifest

---
 weed/filer2/filechunk_manifest.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
index e7b4df1fe..62d2c6e7f 100644
--- a/weed/filer2/filechunk_manifest.go
+++ b/weed/filer2/filechunk_manifest.go
@@ -45,6 +45,7 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
 		}
 		manifestChunks = append(manifestChunks, chunk)
 		// recursive
+		filer_pb.AfterEntryDeserialization(m.Chunks)
 		dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks)
 		if subErr != nil {
 			return chunks, nil, subErr
@@ -107,6 +108,8 @@ func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*fil
 
 func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
 
+	filer_pb.BeforeEntrySerialization(dataChunks)
+
 	// create and serialize the manifest
 	data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
 		Chunks: dataChunks,

From 885c624bceb61688c806b91350e70d75088c6eea Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 20 Jul 2020 22:02:05 -0700
Subject: [PATCH 14/14] volume.fsck: follow manifest chunks

---
 weed/shell/command_volume_fsck.go | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 69a1a63b4..cf5ad6d6d 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 	"sync"
 
+	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -196,7 +197,12 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
 			files[i.vid].Write(buffer)
 		}
 	}, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
-		for _, chunk := range entry.Entry.Chunks {
+		dChunks, mChunks, resolveErr := filer2.ResolveChunkManifest(filer2.LookupFn(c.env), entry.Entry.Chunks)
+		if resolveErr != nil {
+			return nil
+		}
+		dChunks = append(dChunks, mChunks...)
+		for _, chunk := range dChunks {
 			outputChan <- &Item{
 				vid:     chunk.Fid.VolumeId,
 				fileKey: chunk.Fid.FileKey,
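The fsck change above is the calling convention that any chunk-walking code now needs: resolve manifests first, then account for data chunks and manifest chunks both, since a manifest chunk occupies a needle on a volume server just like a data chunk does. A condensed sketch under those assumptions, using the same weed/filer2 calls as the patch; chunkFileIds is a hypothetical helper name:

package example

import (
	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// chunkFileIds returns every chunk an entry references on volume servers:
// the resolved data chunks plus the manifest chunks that were followed to
// find them. The lookup argument is typically filer2.LookupFn(commandEnv).
func chunkFileIds(lookup filer2.LookupFileIdFunctionType, entry *filer_pb.Entry) ([]*filer_pb.FileChunk, error) {
	dataChunks, manifestChunks, err := filer2.ResolveChunkManifest(lookup, entry.Chunks)
	if err != nil {
		return nil, err
	}
	return append(dataChunks, manifestChunks...), nil
}
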