From 7f7e4e98854766c1fb4604fec90945ba87a63d56 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 23 Jun 2018 18:24:59 -0700
Subject: [PATCH] fix error for deleted files during compaction

deletion during commit may cause trouble when making up the difference
during commitCompact()
---
 weed/storage/needle_read_write.go  |   2 +-
 weed/storage/volume_vacuum.go      | 130 +++++++++++++++--------------
 weed/storage/volume_vacuum_test.go |  64 ++++++++------
 3 files changed, 107 insertions(+), 89 deletions(-)

diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go
index ee7cc6046..4241f0758 100644
--- a/weed/storage/needle_read_write.go
+++ b/weed/storage/needle_read_write.go
@@ -162,7 +162,7 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version
 	}
 	n.ParseNeedleHeader(bytes)
 	if n.Size != size {
-		return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)
+		return fmt.Errorf("File Entry Not Found. Needle id %d expected size %d Memory %d", n.Id, n.Size, size)
 	}
 	switch version {
 	case Version1:
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index a9fe6c03d..9171cadfb 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -119,7 +119,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 
 	oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile)
 	if err != nil {
-		return
+		return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err)
 	}
 	if oldDatCompactRevision != v.lastCompactRevision {
 		return fmt.Errorf("current old dat file's compact revision %d is not the expected one %d", oldDatCompactRevision, v.lastCompactRevision)
@@ -137,6 +137,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 			return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err)
 		}
 		key, offset, size := idxFileEntry(IdxEntry)
+		glog.V(0).Infof("key %d offset %d size %d", key, offset, size)
 		if _, found := incrementedHasUpdatedIndexEntry[key]; !found {
 			incrementedHasUpdatedIndexEntry[key] = keyField{
 				offset: offset,
@@ -145,77 +146,82 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 		}
 	}
 
-	if len(incrementedHasUpdatedIndexEntry) > 0 {
-		var (
-			dst, idx *os.File
-		)
-		if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
-			return
-		}
-		defer dst.Close()
+	// no updates during commit step
+	if len(incrementedHasUpdatedIndexEntry) == 0 {
+		return nil
+	}
 
-		if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
-			return
-		}
-		defer idx.Close()
+	// deal with updates during commit step
+	var (
+		dst, idx *os.File
+	)
+	if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
+		return fmt.Errorf("open dat file %s failed: %v", newDatFileName, err)
+	}
+	defer dst.Close()
 
-		var newDatCompactRevision uint16
-		newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
-		if err != nil {
-			return
-		}
-		if oldDatCompactRevision+1 != newDatCompactRevision {
-			return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
-		}
+	if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
+		return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
+	}
+	defer idx.Close()
+
+	var newDatCompactRevision uint16
+	newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
+	if err != nil {
+		return fmt.Errorf("fetchCompactRevisionFromDatFile dst %s failed: %v", dst.Name(), err)
+	}
+	if oldDatCompactRevision+1 != newDatCompactRevision {
+		return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
+	}
 
-		idx_entry_bytes := make([]byte, 16)
-		for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
-			util.Uint64toBytes(idx_entry_bytes[0:8], key)
-			util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
-			util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
+	idx_entry_bytes := make([]byte, 16)
+	for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
+		util.Uint64toBytes(idx_entry_bytes[0:8], key)
+		util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
+		util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
 
-			var offset int64
-			if offset, err = dst.Seek(0, 2); err != nil {
-				glog.V(0).Infof("failed to seek the end of file: %v", err)
+		var offset int64
+		if offset, err = dst.Seek(0, 2); err != nil {
+			glog.V(0).Infof("failed to seek the end of file: %v", err)
+			return
+		}
+		//ensure file writing starting from aligned positions
+		if offset%NeedlePaddingSize != 0 {
+			offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+			if offset, err = v.dataFile.Seek(offset, 0); err != nil {
+				glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
 				return
 			}
-			//ensure file writing starting from aligned positions
-			if offset%NeedlePaddingSize != 0 {
-				offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
-				if offset, err = v.dataFile.Seek(offset, 0); err != nil {
-					glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
-					return
-				}
-			}
+		}
 
-			//updated needle
-			if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
-				//even the needle cache in memory is hit, the need_bytes is correct
-				var needle_bytes []byte
-				needle_bytes, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
-				if err != nil {
-					return
-				}
-				dst.Write(needle_bytes)
-				util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
-			} else { //deleted needle
-				//fakeDelNeedle 's default Data field is nil
-				fakeDelNeedle := new(Needle)
-				fakeDelNeedle.Id = key
-				fakeDelNeedle.Cookie = 0x12345678
-				_, _, err = fakeDelNeedle.Append(dst, v.Version())
-				if err != nil {
-					return
-				}
-				util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
+		//updated needle
+		if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 && incre_idx_entry.size != TombstoneFileSize {
+			//even the needle cache in memory is hit, the need_bytes is correct
+			glog.V(0).Infof("file %d offset %d size %d", key, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+			var needle_bytes []byte
+			needle_bytes, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+			if err != nil {
+				return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size, err)
 			}
-
-			if _, err := idx.Seek(0, 2); err != nil {
-				return fmt.Errorf("cannot seek end of indexfile %s: %v",
-					newIdxFileName, err)
+			dst.Write(needle_bytes)
+			util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
+		} else { //deleted needle
+			//fakeDelNeedle 's default Data field is nil
+			fakeDelNeedle := new(Needle)
+			fakeDelNeedle.Id = key
+			fakeDelNeedle.Cookie = 0x12345678
+			_, _, err = fakeDelNeedle.Append(dst, v.Version())
+			if err != nil {
+				return fmt.Errorf("append deleted %d failed: %v", key, err)
 			}
-			_, err = idx.Write(idx_entry_bytes)
+			util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
+		}
+
+		if _, err := idx.Seek(0, 2); err != nil {
+			return fmt.Errorf("cannot seek end of indexfile %s: %v",
+				newIdxFileName, err)
 		}
+		_, err = idx.Write(idx_entry_bytes)
 	}
 
 	return nil
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index c685102f2..428be14b7 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -69,36 +69,21 @@ func TestCompaction(t *testing.T) {
 		t.Fatalf("volume creation: %v", err)
 	}
 
-	FILE_COUNT := 234
+	beforeCommitFileCount := 10
+	afterCommitFileCount := 10
 
-	infos := make([]*needleInfo, FILE_COUNT)
+	infos := make([]*needleInfo, beforeCommitFileCount+afterCommitFileCount)
 
-	for i := 1; i <= FILE_COUNT; i++ {
-		n := newRandomNeedle(uint64(i))
-		size, err := v.writeNeedle(n)
-		if err != nil {
-			t.Fatalf("write file %d: %v", i, err)
-		}
-		infos[i-1] = &needleInfo{
-			size: size,
-			crc:  n.Checksum,
-		}
-
-		println("written file", i, "checksum", n.Checksum.Value(), "size", size)
-
-		if rand.Float64() < 0.5 {
-			toBeDeleted := rand.Intn(i) + 1
-			oldNeedle := newEmptyNeedle(uint64(toBeDeleted))
-			v.deleteNeedle(oldNeedle)
-			println("deleted file", toBeDeleted)
-			infos[toBeDeleted-1] = &needleInfo{
-				size: 0,
-				crc:  n.Checksum,
-			}
-		}
+	for i := 1; i <= beforeCommitFileCount; i++ {
+		doSomeWritesDeletes(i, v, t, infos)
 	}
 
 	v.Compact(0)
+
+	for i := 1; i <= afterCommitFileCount; i++ {
+		doSomeWritesDeletes(i+beforeCommitFileCount, v, t, infos)
+	}
+
 	v.commitCompact()
 	v.Close()
@@ -108,7 +93,12 @@
 		t.Fatalf("volume reloading: %v", err)
 	}
 
-	for i := 1; i <= FILE_COUNT; i++ {
+	for i := 1; i <= beforeCommitFileCount+afterCommitFileCount; i++ {
+
+		if infos[i-1] == nil {
+			t.Fatal("not found file", i)
+			continue
+		}
 
 		if infos[i-1].size == 0 {
 			continue
@@ -129,6 +119,28 @@
 	}
 }
+func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
+	n := newRandomNeedle(uint64(i))
+	size, err := v.writeNeedle(n)
+	if err != nil {
+		t.Fatalf("write file %d: %v", i, err)
+	}
+	infos[i-1] = &needleInfo{
+		size: size,
+		crc:  n.Checksum,
+	}
+	println("written file", i, "checksum", n.Checksum.Value(), "size", size)
+	if rand.Float64() < 0.5 {
+		toBeDeleted := rand.Intn(i) + 1
+		oldNeedle := newEmptyNeedle(uint64(toBeDeleted))
+		v.deleteNeedle(oldNeedle)
+		println("deleted file", toBeDeleted)
+		infos[toBeDeleted-1] = &needleInfo{
+			size: 0,
+			crc:  n.Checksum,
+		}
+	}
+}
 
 type needleInfo struct {
 	size uint32
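
Reviewer note: the sketch below is not part of the patch; it only illustrates the rule the fix depends on. Each incremental .idx record that makeupDiff replays is 16 bytes (8-byte key, 4-byte offset counted in NeedlePaddingSize units, 4-byte size), and only an entry whose offset is non-zero and whose size is neither zero nor TombstoneFileSize is treated as a live update to copy from the old .dat; anything else is replayed as a deletion via the fakeDelNeedle branch. The concrete values used here (8-byte padding, MaxUint32 tombstone) and the big-endian layout mirroring util.Uint64toBytes/Uint32toBytes are assumptions for illustration, not something this patch defines.

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// Assumed constants mirroring the storage package; the values are
// illustrative, not defined by this patch.
const (
	needlePaddingSize = 8              // .dat offsets are stored in units of this padding
	tombstoneFileSize = math.MaxUint32 // size assumed to mark a deleted needle in the .idx
)

// idxEntry mirrors one 16-byte .idx record: 8-byte key,
// 4-byte offset (in padding units), 4-byte size.
type idxEntry struct {
	key    uint64
	offset uint32
	size   uint32
}

// parseIdxEntry decodes a 16-byte index record, assuming the big-endian
// layout written by util.Uint64toBytes / util.Uint32toBytes.
func parseIdxEntry(b []byte) idxEntry {
	return idxEntry{
		key:    binary.BigEndian.Uint64(b[0:8]),
		offset: binary.BigEndian.Uint32(b[8:12]),
		size:   binary.BigEndian.Uint32(b[12:16]),
	}
}

// isLiveUpdate applies the same test as the fixed makeupDiff branch:
// only a non-zero offset with a real (non-zero, non-tombstone) size is
// copied from the old .dat; everything else is replayed as a deletion.
func (e idxEntry) isLiveUpdate() bool {
	return e.offset != 0 && e.size != 0 && e.size != tombstoneFileSize
}

func main() {
	// A needle updated during the commit step: offset 5 (byte 40), size 123.
	updated := make([]byte, 16)
	binary.BigEndian.PutUint64(updated[0:8], 42)
	binary.BigEndian.PutUint32(updated[8:12], 5)
	binary.BigEndian.PutUint32(updated[12:16], 123)

	// A needle deleted during the commit step: tombstone size.
	deleted := make([]byte, 16)
	binary.BigEndian.PutUint64(deleted[0:8], 43)
	binary.BigEndian.PutUint32(deleted[8:12], 5)
	binary.BigEndian.PutUint32(deleted[12:16], tombstoneFileSize)

	for _, raw := range [][]byte{updated, deleted} {
		e := parseIdxEntry(raw)
		fmt.Printf("key %d offset %d size %d live-update=%v byte-offset=%d\n",
			e.key, e.offset, e.size, e.isLiveUpdate(), int64(e.offset)*needlePaddingSize)
	}
}

The added TombstoneFileSize check appears to be the heart of the fix: without it, a deletion recorded between Compact() and commitCompact() looks like an update, and the replay then trips over the size mismatch that the expanded "File Entry Not Found" message in the first hunk now reports in more detail.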