Browse Source

compaction changed to .idx based deletion

pull/1171/head
Chris Lu 5 years ago
parent
commit
efd2f50ede
  1. 2
      weed/command/backup.go
  2. 5
      weed/command/compact.go
  3. 2
      weed/storage/needle_map_memory.go
  4. 31
      weed/storage/needle_map_metric_test.go
  5. 2
      weed/storage/store_vacuum.go
  6. 2
      weed/storage/volume_loading.go
  7. 32
      weed/storage/volume_vacuum.go
  8. 2
      weed/storage/volume_vacuum_test.go

2
weed/command/backup.go

@ -121,7 +121,7 @@ func runBackup(cmd *Command, args []string) bool {
} }
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
if err = v.Compact(0, 0); err != nil {
if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil {
fmt.Printf("Compact Volume before synchronizing %v\n", err) fmt.Printf("Compact Volume before synchronizing %v\n", err)
return true return true
} }

5
weed/command/compact.go

@ -17,6 +17,9 @@ var cmdCompact = &Command{
The compacted .dat file is stored as .cpd file. The compacted .dat file is stored as .cpd file.
The compacted .idx file is stored as .cpx file. The compacted .idx file is stored as .cpx file.
For method=0, it compacts based on the .dat file, works if .idx file is corrupted.
For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files.
`, `,
} }
@ -47,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool {
glog.Fatalf("Compact Volume [ERROR] %s\n", err) glog.Fatalf("Compact Volume [ERROR] %s\n", err)
} }
} else { } else {
if err = v.Compact2(); err != nil {
if err = v.Compact2(preallocate); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err) glog.Fatalf("Compact Volume [ERROR] %s\n", err)
} }
} }

2
weed/storage/needle_map_memory.go

@ -34,14 +34,12 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
nm.FileCounter++ nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size) nm.FileByteCounter = nm.FileByteCounter + uint64(size)
oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
// glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
if !oldOffset.IsZero() && oldSize != TombstoneFileSize { if !oldOffset.IsZero() && oldSize != TombstoneFileSize {
nm.DeletionCounter++ nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
} }
} else { } else {
oldSize := nm.m.Delete(NeedleId(key)) oldSize := nm.m.Delete(NeedleId(key))
// glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
nm.DeletionCounter++ nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
} }

31
weed/storage/needle_map_metric_test.go

@ -0,0 +1,31 @@
package storage
import (
"io/ioutil"
"math/rand"
"testing"
"github.com/chrislusf/seaweedfs/weed/glog"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
func TestFastLoadingNeedleMapMetrics(t *testing.T) {
idxFile, _ := ioutil.TempFile("", "tmp.idx")
nm := NewCompactNeedleMap(idxFile)
for i := 0; i < 10000; i++ {
nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1))
if rand.Float32() < 0.2 {
nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)), Uint32ToOffset(uint32(0)))
}
}
mm, _ := newNeedleMapMetricFromIndexFile(idxFile)
glog.V(0).Infof("FileCount expected %d actual %d", nm.FileCount(), mm.FileCount())
glog.V(0).Infof("DeletedSize expected %d actual %d", nm.DeletedSize(), mm.DeletedSize())
glog.V(0).Infof("ContentSize expected %d actual %d", nm.ContentSize(), mm.ContentSize())
glog.V(0).Infof("DeletedCount expected %d actual %d", nm.DeletedCount(), mm.DeletedCount())
glog.V(0).Infof("MaxFileKey expected %d actual %d", nm.MaxFileKey(), mm.MaxFileKey())
}

2
weed/storage/store_vacuum.go

@ -16,7 +16,7 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
} }
func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error { func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error {
if v := s.findVolume(vid); v != nil { if v := s.findVolume(vid); v != nil {
return v.Compact(preallocate, compactionBytePerSecond)
return v.Compact2(preallocate) // compactionBytePerSecond
} }
return fmt.Errorf("volume id %d is not found during compact", vid) return fmt.Errorf("volume id %d is not found during compact", vid)
} }

2
weed/storage/volume_loading.go

@ -75,7 +75,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if err == nil && alsoLoadIndex { if err == nil && alsoLoadIndex {
var indexFile *os.File var indexFile *os.File
if v.noWriteOrDelete { if v.noWriteOrDelete {
glog.V(1).Infoln("open to read file", fileName+".idx")
glog.V(0).Infoln("open to read file", fileName+".idx")
if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil { if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil {
return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err) return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err)
} }

32
weed/storage/volume_vacuum.go

@ -20,9 +20,19 @@ func (v *Volume) garbageLevel() float64 {
if v.ContentSize() == 0 { if v.ContentSize() == 0 {
return 0 return 0
} }
return float64(v.DeletedSize()) / float64(v.ContentSize())
deletedSize := v.DeletedSize()
fileSize := v.ContentSize()
if v.DeletedCount() > 0 && v.DeletedSize() == 0 {
// this happens for .sdx converted back to normal .idx
// where deleted entry size is missing
datFileSize, _, _ := v.FileStat()
deletedSize = datFileSize - fileSize - super_block.SuperBlockSize
fileSize = datFileSize
}
return float64(deletedSize) / float64(fileSize)
} }
// compact a volume based on deletions in .dat files
func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
@ -45,7 +55,8 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
} }
func (v *Volume) Compact2() error {
// compact a volume based on deletions in .idx files
func (v *Volume) Compact2(preallocate int64) error {
if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
return nil return nil
@ -58,8 +69,10 @@ func (v *Volume) Compact2() error {
}() }()
filePath := v.FileName() filePath := v.FileName()
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ...", v.Id) glog.V(3).Infof("creating copies for volume %d ...", v.Id)
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx")
return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx", preallocate)
} }
func (v *Volume) CommitCompact() error { func (v *Volume) CommitCompact() error {
@ -140,6 +153,7 @@ func fetchCompactRevisionFromDatFile(datBackend backend.BackendStorageFile) (com
return superBlock.CompactionRevision, nil return superBlock.CompactionRevision, nil
} }
// if old .dat and .idx files are updated, this func tries to apply the same changes to new files accordingly
func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) { func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) {
var indexSize int64 var indexSize int64
@ -150,6 +164,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
oldDatBackend := backend.NewDiskFile(oldDatFile) oldDatBackend := backend.NewDiskFile(oldDatFile)
defer oldDatBackend.Close() defer oldDatBackend.Close()
// skip if the old .idx file has not changed
if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil { if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
} }
@ -157,6 +172,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return nil return nil
} }
// fail if the old .dat file has changed to a new revision
oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend) oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend)
if err != nil { if err != nil {
return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err) return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err)
@ -337,14 +353,14 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
return return
} }
func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) {
var ( var (
dst, oldIndexFile *os.File
dstDatBackend backend.BackendStorageFile
oldIndexFile *os.File
) )
if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return return
} }
dstDatBackend := backend.NewDiskFile(dst)
defer dstDatBackend.Close() defer dstDatBackend.Close()
if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil { if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
@ -357,7 +373,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
now := uint64(time.Now().Unix()) now := uint64(time.Now().Unix())
v.SuperBlock.CompactionRevision++ v.SuperBlock.CompactionRevision++
dst.Write(v.SuperBlock.Bytes())
dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0)
newOffset := int64(v.SuperBlock.BlockSize()) newOffset := int64(v.SuperBlock.BlockSize())
idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error { idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error {

2
weed/storage/volume_vacuum_test.go

@ -84,7 +84,7 @@ func TestCompaction(t *testing.T) {
} }
startTime := time.Now() startTime := time.Now()
v.Compact(0, 1024*1024)
v.Compact2(0)
speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds() speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds()
t.Logf("compaction speed: %.2f bytes/s", speed) t.Logf("compaction speed: %.2f bytes/s", speed)

Loading…
Cancel
Save