From acf7ca7b93e0a33e9fc987dec600d1b9f1dc1e32 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 8 Jan 2020 09:45:03 -0800 Subject: [PATCH] volume: fix compaction --- weed/storage/needle_map/memdb.go | 5 ++++- weed/storage/volume_vacuum.go | 38 ++++++++++++++------------------ 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go index 6aba6adeb..9eb4d9f56 100644 --- a/weed/storage/needle_map/memdb.go +++ b/weed/storage/needle_map/memdb.go @@ -89,6 +89,9 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) { defer idxFile.Close() return cm.AscendingVisit(func(value NeedleValue) error { + if value.Offset.IsZero() || value.Size == TombstoneFileSize { + return nil + } _, err := idxFile.Write(value.ToBytes()) return err }) @@ -104,7 +107,7 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) { return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error { if offset.IsZero() || size == TombstoneFileSize { - return nil + return cm.Delete(key) } return cm.Set(key, offset, size) }) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 434b5989d..09bf36f7a 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -356,19 +356,17 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) { var ( dstDatBackend backend.BackendStorageFile - oldIndexFile *os.File ) if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil { return } defer dstDatBackend.Close() - if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil { + oldNm := needle_map.NewMemDb() + newNm := needle_map.NewMemDb() + if err = oldNm.LoadFromIdx(v.FileName()+".idx"); err != nil { return } - defer oldIndexFile.Close() - - nm := needle_map.NewMemDb() now := uint64(time.Now().Unix()) @@ -376,13 +374,11 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate i dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0) newOffset := int64(v.SuperBlock.BlockSize()) - idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error { - if offset.IsZero() || size == TombstoneFileSize { - return nil - } + oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { + + offset, size := value.Offset, value.Size - nv, ok := v.nm.Get(key) - if !ok { + if offset.IsZero() || size == TombstoneFileSize { return nil } @@ -396,21 +392,19 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate i return nil } - glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) - if nv.Offset == offset && nv.Size > 0 { - if err = nm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil { - return fmt.Errorf("cannot put needle: %s", err) - } - if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil { - return fmt.Errorf("cannot append needle: %s", err) - } - newOffset += n.DiskSize(v.Version()) - glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size) + if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil { + return fmt.Errorf("cannot put needle: %s", err) } + if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil { + return fmt.Errorf("cannot append needle: %s", err) + } + newOffset += n.DiskSize(v.Version()) + glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size) + return nil }) - nm.SaveToIdx(idxName) + newNm.SaveToIdx(idxName) return }