diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 3b55d5c0b..1dd7d3fde 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -37,32 +37,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da if len(data) > len(pages.Data) { // this is more than what buffer can hold. - - // flush existing - if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { - if chunk != nil { - glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) - chunks = append(chunks, chunk) - } - } else { - glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) - return - } - pages.Size = 0 - pages.Offset = 0 - - // flush the big page - if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil { - if chunk != nil { - glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) - chunks = append(chunks, chunk) - } - } else { - glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) - return - } - - return + return pages.flushAndSave(ctx, offset, data) } if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) || @@ -91,15 +66,46 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da if offset != pages.Offset+pages.Size { // when this happens, debug shows the data overlapping with existing data is empty // the data is not just append - copy(pages.Data[pages.Size:], data[pages.Offset+pages.Size-offset:]) - } else { - copy(pages.Data[offset-pages.Offset:], data) + return pages.flushAndSave(ctx, offset, data) } + + copy(pages.Data[offset-pages.Offset:], data) pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset) return } +func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { + + var chunk *filer_pb.FileChunk + + // flush existing + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + chunks = append(chunks, chunk) + } + } else { + glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + pages.Size = 0 + pages.Offset = 0 + + // flush the new page + if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + chunks = append(chunks, chunk) + } + } else { + glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + + return +} + func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) { pages.lock.Lock()