Browse Source

volume: fix compaction

pull/1189/head
Chris Lu 5 years ago
parent
commit
acf7ca7b93
  1. 5
      weed/storage/needle_map/memdb.go
  2. 26
      weed/storage/volume_vacuum.go

5
weed/storage/needle_map/memdb.go

@ -89,6 +89,9 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
defer idxFile.Close() defer idxFile.Close()
return cm.AscendingVisit(func(value NeedleValue) error { return cm.AscendingVisit(func(value NeedleValue) error {
if value.Offset.IsZero() || value.Size == TombstoneFileSize {
return nil
}
_, err := idxFile.Write(value.ToBytes()) _, err := idxFile.Write(value.ToBytes())
return err return err
}) })
@ -104,7 +107,7 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error { return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error {
if offset.IsZero() || size == TombstoneFileSize { if offset.IsZero() || size == TombstoneFileSize {
return nil
return cm.Delete(key)
} }
return cm.Set(key, offset, size) return cm.Set(key, offset, size)
}) })

26
weed/storage/volume_vacuum.go

@ -356,19 +356,17 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) { func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) {
var ( var (
dstDatBackend backend.BackendStorageFile dstDatBackend backend.BackendStorageFile
oldIndexFile *os.File
) )
if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil { if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return return
} }
defer dstDatBackend.Close() defer dstDatBackend.Close()
if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
oldNm := needle_map.NewMemDb()
newNm := needle_map.NewMemDb()
if err = oldNm.LoadFromIdx(v.FileName()+".idx"); err != nil {
return return
} }
defer oldIndexFile.Close()
nm := needle_map.NewMemDb()
now := uint64(time.Now().Unix()) now := uint64(time.Now().Unix())
@ -376,13 +374,11 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate i
dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0) dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0)
newOffset := int64(v.SuperBlock.BlockSize()) newOffset := int64(v.SuperBlock.BlockSize())
idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error {
if offset.IsZero() || size == TombstoneFileSize {
return nil
}
oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
nv, ok := v.nm.Get(key)
if !ok {
offset, size := value.Offset, value.Size
if offset.IsZero() || size == TombstoneFileSize {
return nil return nil
} }
@ -396,9 +392,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate i
return nil return nil
} }
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if nv.Offset == offset && nv.Size > 0 {
if err = nm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err) return fmt.Errorf("cannot put needle: %s", err)
} }
if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil { if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
@ -406,11 +400,11 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate i
} }
newOffset += n.DiskSize(v.Version()) newOffset += n.DiskSize(v.Version())
glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size) glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
}
return nil return nil
}) })
nm.SaveToIdx(idxName)
newNm.SaveToIdx(idxName)
return return
} }
Loading…
Cancel
Save