diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go new file mode 100644 index 000000000..92b114853 --- /dev/null +++ b/weed/filer2/filechunk_manifest.go @@ -0,0 +1,136 @@ +package filer2 + +import ( + "bytes" + "fmt" + "io" + "math" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { + for _, chunk := range chunks { + if chunk.IsChunkManifest { + return true + } + } + return false +} + +func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manefestResolveErr error) { + // TODO maybe parallel this + for _, chunk := range chunks { + if !chunk.IsChunkManifest { + dataChunks = append(dataChunks, chunk) + continue + } + + // IsChunkManifest + data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed) + if err != nil { + return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err) + } + m := &filer_pb.FileChunkManifest{} + if err := proto.Unmarshal(data, m); err != nil { + return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err) + } + manifestChunks = append(manifestChunks, chunk) + // recursive + dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks) + if subErr != nil { + return chunks, nil, subErr + } + dataChunks = append(dataChunks, dchunks...) + manifestChunks = append(manifestChunks, mchunks...) + } + return +} + +func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { + urlString, err := lookupFileIdFn(fileId) + if err != nil { + glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) + return nil, err + } + var buffer bytes.Buffer + err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(0).Infof("read %s failed, err: %v", fileId, err) + return nil, err + } + + return buffer.Bytes(), nil +} + +func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) { + return doMaybeManifestize(saveFunc, dataChunks, 10000, mergeIntoManifest) +} + +func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) { + + var dataChunks []*filer_pb.FileChunk + for _, chunk := range inputChunks { + if !chunk.IsChunkManifest { + dataChunks = append(dataChunks, chunk) + } else { + chunks = append(chunks, chunk) + } + } + + manifestBatch := mergeFactor + remaining := len(dataChunks) + for i := 0; i+manifestBatch <= len(dataChunks); i += manifestBatch { + chunk, err := mergefn(saveFunc, dataChunks[i:i+manifestBatch]) + if err != nil { + return dataChunks, err + } + chunks = append(chunks, chunk) + remaining -= manifestBatch + } + // remaining + for i := len(dataChunks) - remaining; i < len(dataChunks); i++ { + chunks = append(chunks, dataChunks[i]) + } + return +} + +func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { + + // create and serialize the manifest + data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{ + Chunks: dataChunks, + }) + if serErr != nil { + return nil, fmt.Errorf("serializing manifest: %v", serErr) + } + + minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) + for k := 0; k < len(dataChunks); k++ { + chunk := dataChunks[k] + if minOffset > int64(chunk.Offset) { + minOffset = chunk.Offset + } + if maxOffset < int64(chunk.Size)+chunk.Offset { + maxOffset = int64(chunk.Size) + chunk.Offset + } + } + + manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0) + if err != nil { + return nil, err + } + manifestChunk.IsChunkManifest = true + manifestChunk.Offset = minOffset + manifestChunk.Size = uint64(maxOffset - minOffset) + + return +} + +type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) diff --git a/weed/filer2/filechunk_manifest_test.go b/weed/filer2/filechunk_manifest_test.go new file mode 100644 index 000000000..2b0862d07 --- /dev/null +++ b/weed/filer2/filechunk_manifest_test.go @@ -0,0 +1,113 @@ +package filer2 + +import ( + "bytes" + "math" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func TestDoMaybeManifestize(t *testing.T) { + var manifestTests = []struct { + inputs []*filer_pb.FileChunk + expected []*filer_pb.FileChunk + }{ + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: false}, + {FileId: "2", IsChunkManifest: false}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "12", IsChunkManifest: true}, + {FileId: "34", IsChunkManifest: true}, + }, + }, + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "2", IsChunkManifest: false}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "23", IsChunkManifest: true}, + {FileId: "4", IsChunkManifest: false}, + }, + }, + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: false}, + {FileId: "2", IsChunkManifest: true}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "2", IsChunkManifest: true}, + {FileId: "13", IsChunkManifest: true}, + {FileId: "4", IsChunkManifest: false}, + }, + }, + { + inputs: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "2", IsChunkManifest: true}, + {FileId: "3", IsChunkManifest: false}, + {FileId: "4", IsChunkManifest: false}, + }, + expected: []*filer_pb.FileChunk{ + {FileId: "1", IsChunkManifest: true}, + {FileId: "2", IsChunkManifest: true}, + {FileId: "34", IsChunkManifest: true}, + }, + }, + } + + for i, mtest := range manifestTests { + println("test", i) + actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge) + assertEqualChunks(t, mtest.expected, actual) + } + +} + +func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) { + assert.Equal(t, len(expected), len(actual)) + for i := 0; i < len(actual); i++ { + assertEqualChunk(t, actual[i], expected[i]) + } +} +func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) { + assert.Equal(t, expected.FileId, actual.FileId) + assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest) +} + +func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) { + + var buf bytes.Buffer + minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64) + for k := 0; k < len(dataChunks); k++ { + chunk := dataChunks[k] + buf.WriteString(chunk.FileId) + if minOffset > int64(chunk.Offset) { + minOffset = chunk.Offset + } + if maxOffset < int64(chunk.Size)+chunk.Offset { + maxOffset = int64(chunk.Size) + chunk.Offset + } + } + + manifestChunk = &filer_pb.FileChunk{ + FileId: buf.String(), + } + manifestChunk.IsChunkManifest = true + manifestChunk.Offset = minOffset + manifestChunk.Size = uint64(maxOffset - minOffset) + + return +} diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 6832d0f31..ea7772b4a 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -46,9 +46,9 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) { return fmt.Sprintf("%x", h.Sum32()) } -func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { +func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) { - visibles := NonOverlappingVisibleIntervals(chunks) + visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks) fileIds := make(map[string]bool) for _, interval := range visibles { @@ -65,7 +65,23 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file return } -func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { +func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) { + + aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as) + if aErr != nil { + return nil, aErr + } + bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs) + if bErr != nil { + return nil, bErr + } + + delta = append(delta, DoMinusChunks(aData, bData)...) + delta = append(delta, DoMinusChunks(aMeta, bMeta)...) + return +} + +func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { fileIds := make(map[string]bool) for _, interval := range bs { @@ -94,9 +110,9 @@ func (cv *ChunkView) IsFullChunk() bool { return cv.Size == cv.ChunkSize } -func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) { +func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) { - visibles := NonOverlappingVisibleIntervals(chunks) + visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks) return ViewFromVisibleIntervals(visibles, offset, size) @@ -190,7 +206,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb. return newVisibles } -func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) { +// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory +// If the file chunk content is a chunk manifest +func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) { + + chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks) sort.Slice(chunks, func(i, j int) bool { return chunks[i].Mtime < chunks[j].Mtime diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go index 7b1133b85..bfee59198 100644 --- a/weed/filer2/filechunks_test.go +++ b/weed/filer2/filechunks_test.go @@ -16,7 +16,7 @@ func TestCompactFileChunks(t *testing.T) { {Offset: 110, Size: 200, FileId: "jkl", Mtime: 300}, } - compacted, garbage := CompactFileChunks(chunks) + compacted, garbage := CompactFileChunks(nil, chunks) if len(compacted) != 3 { t.Fatalf("unexpected compacted: %d", len(compacted)) @@ -49,7 +49,7 @@ func TestCompactFileChunks2(t *testing.T) { }) } - compacted, garbage := CompactFileChunks(chunks) + compacted, garbage := CompactFileChunks(nil, chunks) if len(compacted) != 4 { t.Fatalf("unexpected compacted: %d", len(compacted)) @@ -186,7 +186,7 @@ func TestIntervalMerging(t *testing.T) { for i, testcase := range testcases { log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i) - intervals := NonOverlappingVisibleIntervals(testcase.Chunks) + intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks) for x, interval := range intervals { log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", i, x, interval.start, interval.stop, interval.fileId) @@ -371,7 +371,7 @@ func TestChunksReading(t *testing.T) { for i, testcase := range testcases { log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i) - chunks := ViewFromChunks(testcase.Chunks, testcase.Offset, testcase.Size) + chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size) for x, chunk := range chunks { log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s", i, x, chunk.Offset, chunk.Size, chunk.FileId) @@ -415,6 +415,6 @@ func BenchmarkCompactFileChunks(b *testing.B) { } for n := 0; n < b.N; n++ { - CompactFileChunks(chunks) + CompactFileChunks(nil, chunks) } } diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go index 11a80443f..568d94267 100644 --- a/weed/filer2/reader_at.go +++ b/weed/filer2/reader_at.go @@ -1,7 +1,6 @@ package filer2 import ( - "bytes" "context" "fmt" "io" @@ -9,7 +8,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -144,19 +142,6 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { - urlString, err := c.lookupFileId(fileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) - return nil, err - } - var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) { - buffer.Write(data) - }) - if err != nil { - glog.V(0).Infof("read %s failed, err: %v", fileId, err) - return nil, err - } + return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped) - return buffer.Bytes(), nil } diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 033a8dd13..c7df007ec 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -2,6 +2,7 @@ package filer2 import ( "bytes" + "fmt" "io" "math" "strings" @@ -14,7 +15,8 @@ import ( func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - chunkViews := ViewFromChunks(chunks, offset, size) + fmt.Printf("start to stream content for chunks: %+v\n", chunks) + chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size) fileId2Url := make(map[string]string) @@ -50,14 +52,14 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) buffer := bytes.Buffer{} - chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) - - lookupFileId := func(fileId string) (targetUrl string, err error) { + lookupFileIdFn := func(fileId string) (targetUrl string, err error) { return masterClient.LookupFileId(fileId) } + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + for _, chunkView := range chunkViews { - urlString, err := lookupFileId(chunkView.FileId) + urlString, err := lookupFileIdFn(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return nil, err @@ -88,23 +90,27 @@ var _ = io.ReadSeeker(&ChunkStreamReader{}) func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + lookupFileIdFn := func(fileId string) (targetUrl string, err error) { + return masterClient.LookupFileId(fileId) + } + + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: func(fileId string) (targetUrl string, err error) { - return masterClient.LookupFileId(fileId) - }, + chunkViews: chunkViews, + lookupFileId: lookupFileIdFn, } } func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + lookupFileIdFn := LookupFn(filerClient) + + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) return &ChunkStreamReader{ chunkViews: chunkViews, - lookupFileId: LookupFn(filerClient), + lookupFileId: lookupFileIdFn, } } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 45224b3e7..46d20e466 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -2,16 +2,12 @@ package filesys import ( "bytes" - "context" - "fmt" "io" "sync" "time" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" ) type ContinuousDirtyPages struct { @@ -141,53 +137,15 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) { - var fileId, host string - var auth security.EncodedJwt - dir, _ := pages.f.fullpath().DirAndName() - if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: pages.f.wfs.option.Replication, - Collection: pages.f.wfs.option.Collection, - TtlSec: pages.f.wfs.option.TtlSec, - DataCenter: pages.f.wfs.option.DataCenter, - ParentPath: dir, - } - - resp, err := client.AssignVolume(context.Background(), request) - if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) - return err - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) - } - - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) - host = pages.f.wfs.AdjustedUrl(host) - pages.collection, pages.replication = resp.Collection, resp.Replication - - return nil - }); err != nil { - return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err) - } - - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset) if err != nil { - glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) - return nil, fmt.Errorf("upload data: %v", err) - } - if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err) - return nil, fmt.Errorf("upload result: %v", uploadResult.Error) + return nil, err } - pages.f.wfs.chunkCache.SetChunk(fileId, data) + pages.collection, pages.replication = collection, replication - return uploadResult.ToPbFileChunk(fileId, offset), nil + return chunk, nil } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 4a6bc9a8a..dcda93522 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -253,7 +253,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry - file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks) + file.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(file.wfs), file.entry.Chunks) file.reader = nil } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 9b9df916c..31fd08f97 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -88,8 +88,12 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { return 0, nil } + var chunkResolveErr error if fh.f.entryViewCache == nil { - fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) + fh.f.entryViewCache, chunkResolveErr = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks) + if chunkResolveErr != nil { + return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) + } fh.f.reader = nil } @@ -206,7 +210,12 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } - chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks) + chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks) + chunks, manifestErr := filer2.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks) + if manifestErr != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", manifestErr) + } fh.f.entry.Chunks = chunks // fh.f.entryViewCache = nil diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go new file mode 100644 index 000000000..786d0b42a --- /dev/null +++ b/weed/filesys/wfs_write.go @@ -0,0 +1,66 @@ +package filesys + +import ( + "context" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" +) + +func (wfs *WFS) saveDataAsChunk(dir string) filer2.SaveDataAsChunkFunctionType { + + return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { + var fileId, host string + var auth security.EncodedJwt + + if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: wfs.option.Replication, + Collection: wfs.option.Collection, + TtlSec: wfs.option.TtlSec, + DataCenter: wfs.option.DataCenter, + ParentPath: dir, + } + + resp, err := client.AssignVolume(context.Background(), request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } + + fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + host = wfs.AdjustedUrl(host) + collection, replication = resp.Collection, resp.Replication + + return nil + }); err != nil { + return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) + if err != nil { + glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) + return nil, "", "", fmt.Errorf("upload data: %v", err) + } + if uploadResult.Error != "" { + glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) + return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error) + } + + wfs.chunkCache.SetChunk(fileId, data) + + chunk = uploadResult.ToPbFileChunk(fileId, offset) + return chunk, "", "", nil + } +} diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index aef97c06e..fa229de22 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -96,7 +96,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) // Create a URL that references a to-be-created blob in your // Azure Storage account's container. diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 1e7d82ed4..bf8632827 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -85,7 +85,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) bucket, err := g.client.Bucket(context.Background(), g.bucket) if err != nil { diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 50721a8f3..6429859b4 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -167,12 +167,15 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent glog.V(0).Infof("already replicated %s", key) } else { // find out what changed - deletedChunks, newChunks := compareChunks(oldEntry, newEntry) + deletedChunks, newChunks, err := compareChunks(filer2.LookupFn(fs), oldEntry, newEntry) + if err != nil { + return true, fmt.Errorf("replicte %s compare chunks error: %v", key, err) + } // delete the chunks that are deleted from the source if deleteIncludeChunks { // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks - existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks) + existingEntry.Chunks = filer2.DoMinusChunks(existingEntry.Chunks, deletedChunks) } // replicate the chunks that are new in the source @@ -200,8 +203,21 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent }) } -func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) { - deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks) - newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks) +func compareChunks(lookupFileIdFn filer2.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { + aData, aMeta, aErr := filer2.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks) + if aErr != nil { + return nil, nil, aErr + } + bData, bMeta, bErr := filer2.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks) + if bErr != nil { + return nil, nil, bErr + } + + deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aData, bData)...) + deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aMeta, bMeta)...) + + newChunks = append(newChunks, filer2.DoMinusChunks(bData, aData)...) + newChunks = append(newChunks, filer2.DoMinusChunks(bMeta, aMeta)...) + return } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index bb5a54272..4b58160db 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -90,7 +90,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index d7af105b8..625cf406c 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -108,7 +108,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) parts := make([]*s3.CompletedPart, len(chunkViews)) diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 17e32731c..48e9253f0 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -137,13 +138,28 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return resp, nil } +func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) { + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + return "", err + } + locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId)) + if !found || len(locations) == 0 { + return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId) + } + return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil +} + func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { glog.V(4).Infof("CreateEntry %v", req) resp = &filer_pb.CreateEntryResponse{} - chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) + chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry) + if err2 != nil { + return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) + } if req.Entry.Attributes == nil { glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name)) @@ -158,7 +174,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr }, req.OExcl, req.IsFromOtherCluster) if createErr == nil { - fs.filer.DeleteChunks(garbages) + fs.filer.DeleteChunks(garbage) } else { glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr) resp.Error = createErr.Error() @@ -177,10 +193,10 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } - // remove old chunks if not included in the new ones - unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks) - - chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) + chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry) + if err2 != nil { + return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) + } newEntry := &filer2.Entry{ FullPath: util.JoinPath(req.Directory, req.Entry.Name), @@ -214,8 +230,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { - fs.filer.DeleteChunks(unusedChunks) - fs.filer.DeleteChunks(garbages) + fs.filer.DeleteChunks(garbage) } else { glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err) } @@ -225,6 +240,37 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } +func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { + chunks = newEntry.Chunks + + // remove old chunks if not included in the new ones + if existingEntry != nil { + garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks) + if err != nil { + return chunks, nil, fmt.Errorf("MinusChunks: %v", err) + } + } + + // files with manifest chunks are usually large and append only, skip calculating covered chunks + var coveredChunks []*filer_pb.FileChunk + if !filer2.HasChunkManifest(newEntry.Chunks) { + chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks) + garbage = append(garbage, coveredChunks...) + } + + chunks, err = filer2.MaybeManifestize(fs.saveAsChunk( + newEntry.Attributes.Replication, + newEntry.Attributes.Collection, + "", + needle.SecondsToTTL(newEntry.Attributes.TtlSec), + false), chunks) + if err != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", err) + } + return +} + func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) { glog.V(4).Infof("AppendToEntry %v", req) @@ -254,6 +300,17 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo entry.Chunks = append(entry.Chunks, req.Chunks...) + entry.Chunks, err = filer2.MaybeManifestize(fs.saveAsChunk( + entry.Replication, + entry.Collection, + "", + needle.SecondsToTTL(entry.TtlSec), + false), entry.Chunks) + if err != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", err) + } + err = fs.filer.CreateEntry(context.Background(), entry, false, false) return &filer_pb.AppendToEntryResponse{}, err diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index a642c502a..da66178ce 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -40,7 +40,7 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() @@ -67,7 +67,6 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) - writeJsonError(w, r, http.StatusInternalServerError, ae) err = ae return } @@ -114,7 +113,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 29546542c..be0438efb 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -102,7 +102,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r limitedReader := io.LimitReader(partReader, int64(chunkSize)) // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) if assignErr != nil { return nil, assignErr } @@ -132,6 +132,12 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r } } + fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) + if replyerr != nil { + glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) + return + } + path := r.URL.Path if strings.HasSuffix(path, "/") { if fileName != "" { @@ -184,3 +190,23 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) return uploadResult, err } + +func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, ttlString string, fsync bool) filer2.SaveDataAsChunkFunctionType { + + return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) + if assignErr != nil { + return nil, "", "", assignErr + } + + // upload the chunk to the volume server + uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth) + if uploadErr != nil { + return nil, "", "", uploadErr + } + + return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil + } +} + diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 17f35838d..8413496b8 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -19,7 +19,7 @@ import ( func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) { - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index e8bedd352..8655daf70 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -474,7 +474,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { return 0, io.EOF } if f.entryViewCache == nil { - f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) + f.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(f.fs), f.entry.Chunks) f.reader = nil } if f.reader == nil { diff --git a/weed/storage/needle/volume_ttl.go b/weed/storage/needle/volume_ttl.go index 179057876..26ce3b8fd 100644 --- a/weed/storage/needle/volume_ttl.go +++ b/weed/storage/needle/volume_ttl.go @@ -1,11 +1,12 @@ package needle import ( + "fmt" "strconv" ) const ( - //stored unit types + // stored unit types Empty byte = iota Minute Hour @@ -139,3 +140,10 @@ func (t TTL) Minutes() uint32 { } return 0 } + +func SecondsToTTL(seconds int32) string { + if seconds == 0 { + return "" + } + return fmt.Sprintf("%dm", seconds/60) +}