
Rework volume compaction (a.k.a. vacuuming) logic to cleanly support new parameters. (#8337)

We'll leverage this to support an "ignore broken needles" option, necessary
to properly recover damaged volumes, as described in
https://github.com/seaweedfs/seaweedfs/issues/7442#issuecomment-3897784283.
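As a purely illustrative aside: the diff below introduces a storage.CompactOptions struct that carries all compaction parameters, so an "ignore broken needles" switch could later become one more field on it. In the sketch that follows, everything except the commented-out field comes from this commit; the IgnoreBrokenNeedles name is hypothetical and not part of this change.

package vacuumsketch // hypothetical package, only to keep the sketch self-contained

import "github.com/seaweedfs/seaweedfs/weed/storage"

// compactDamagedVolume shows where a future "ignore broken needles" option
// could slot in, e.g. on the index-based compaction path.
func compactDamagedVolume(v *storage.Volume) error {
	opts := &storage.CompactOptions{
		PreallocateBytes:  0,
		MaxBytesPerSecond: 0, // 0 = unlimited write throttle
		// IgnoreBrokenNeedles: true, // hypothetical future flag for damaged volumes
	}
	return v.CompactByIndex(opts)
}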
pull/8352/head
Lisandro Pin, 3 days ago (committed by GitHub)
commit 0721e3c1e9
1. weed/command/backup.go (2 lines changed)
2. weed/command/compact.go (29 lines changed)
3. weed/storage/store_vacuum.go (9 lines changed)
4. weed/storage/volume_vacuum.go (92 lines changed)
5. weed/storage/volume_vacuum_test.go (8 lines changed)

weed/command/backup.go (2 lines changed)

@@ -137,7 +137,7 @@ func backupFromLocation(volumeServer pb.ServerAddress, grpcDialOption grpc.DialO
// Handle compaction if needed
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
if err = v.Compact2(0, 0, nil); err != nil {
if err = v.CompactByIndex(nil); err != nil {
v.Close()
return fmt.Errorf("compacting volume: %w", err), false
}

weed/command/compact.go (29 lines changed)

@@ -1,6 +1,8 @@
package command
import (
"strings"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@@ -18,8 +20,9 @@ var cmdCompact = &Command{
The compacted .dat file is stored as .cpd file.
The compacted .idx file is stored as .cpx file.
For method=0, it compacts based on the .dat file, works if .idx file is corrupted.
For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files.
Supports two compaction methods:
* data: compacts based on the .dat file, works if .idx file is corrupted.
* index: compacts based on the .idx file, works if deletion happened but not written to .dat files.
`,
}
@@ -28,7 +31,7 @@ var (
compactVolumePath = cmdCompact.Flag.String("dir", ".", "data directory to store files")
compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name")
compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.")
compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 (default) or 1.")
compactMethod = cmdCompact.Flag.String("method", "data", "option to choose which compact method (data/index)")
compactVolumePreallocate = cmdCompact.Flag.Int64("preallocateMB", 0, "preallocate volume disk space")
)
@@ -38,21 +41,29 @@ func runCompact(cmd *Command, args []string) bool {
return false
}
preallocate := *compactVolumePreallocate * (1 << 20)
preallocateBytes := *compactVolumePreallocate * (1 << 20)
vid := needle.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, needle.GetCurrentVersion(), 0, 0)
v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocateBytes, needle.GetCurrentVersion(), 0, 0)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
if *compactMethod == 0 {
if err = v.Compact(preallocate, 0); err != nil {
opts := &storage.CompactOptions{
PreallocateBytes: preallocateBytes,
MaxBytesPerSecond: 0, // unlimited
}
switch strings.ToLower(*compactMethod) {
case "data":
if err = v.CompactByVolumeData(opts); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
} else {
if err = v.Compact2(preallocate, 0, nil); err != nil {
case "index":
if err = v.CompactByIndex(opts); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
default:
glog.Fatalf("unsupported compaction method %q", *compactMethod)
}
return true
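
The -method flag migrates from an int to a string here: "data" replaces the old 0, "index" replaces the old 1, matching is case-insensitive via strings.ToLower, and the new default "data" preserves the behaviour of the old default 0. For reference, a condensed, self-contained restatement of the new dispatch (identifiers are taken from the diff above; the helper name and package are mine):

package vacuumsketch // hypothetical package, only to keep the sketch self-contained

import (
	"fmt"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/storage"
)

// compactVolume mirrors runCompact's new switch: "data" rebuilds the index by
// scanning the .dat file, "index" walks the .idx file and copies live needles.
func compactVolume(v *storage.Volume, method string, preallocateBytes int64) error {
	opts := &storage.CompactOptions{
		PreallocateBytes:  preallocateBytes, // from -preallocateMB, converted to bytes
		MaxBytesPerSecond: 0,                // 0 = unlimited write throttle
	}
	switch strings.ToLower(method) {
	case "data":
		return v.CompactByVolumeData(opts)
	case "index":
		return v.CompactByIndex(opts)
	default:
		return fmt.Errorf("unsupported compaction method %q", method)
	}
}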

weed/storage/store_vacuum.go (9 lines changed)

@@ -16,6 +16,7 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
}
return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId)
}
func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error {
if v := s.findVolume(vid); v != nil {
// Get current volume size for space calculation
@@ -39,10 +40,15 @@ func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compaction
glog.V(1).Infof("volume %d compaction space check: volume=%d, index=%d, space_needed=%d, free_space=%d",
vid, volumeSize, indexSize, spaceNeeded, diskStatus.Free)
return v.Compact2(preallocate, compactionBytePerSecond, progressFn)
return v.CompactByIndex(&CompactOptions{
PreallocateBytes: preallocate,
MaxBytesPerSecond: compactionBytePerSecond,
ProgressCallback: progressFn,
})
}
return fmt.Errorf("volume id %d is not found during compact", vid)
}
func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, int64, error) {
if s.isStopping {
return false, 0, fmt.Errorf("volume id %d skips compact because volume is stopping", vid)
@@ -58,6 +64,7 @@ func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, int64, error) {
}
return false, 0, fmt.Errorf("volume id %d is not found during commit compact", vid)
}
func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error {
if v := s.findVolume(vid); v != nil {
return v.cleanupCompact()

weed/storage/volume_vacuum.go (92 lines changed)

@@ -35,8 +35,26 @@ func (v *Volume) garbageLevel() float64 {
return float64(deletedSize) / float64(fileSize)
}
type CompactOptions struct {
PreallocateBytes int64
MaxBytesPerSecond int64
ProgressCallback ProgressFunc
// internal state settings
srcDatPath string
srcIdxPath string
destDatPath string
destIdxPath string
superBlock super_block.SuperBlock
version needle.Version
}
// compact a volume based on deletions in .dat files
func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
func (v *Volume) CompactByVolumeData(opts *CompactOptions) error {
if opts == nil {
// default settings
opts = &CompactOptions{}
}
if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
return nil
@@ -64,11 +82,20 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact failed to sync volume idx %d", v.Id)
}
return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
opts.destDatPath = v.FileName(".cpd")
opts.destIdxPath = v.FileName(".cpx")
opts.superBlock = v.SuperBlock
opts.version = v.Version()
return v.copyDataAndGenerateIndexFile(opts)
}
// compact a volume based on deletions in .idx files
func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error {
func (v *Volume) CompactByIndex(opts *CompactOptions) error {
if opts == nil {
// default settings
opts = &CompactOptions{}
}
if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
return nil
@@ -96,15 +123,14 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog
if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err)
}
return v.copyDataBasedOnIndexFile(
v.FileName(".dat"), v.FileName(".idx"),
v.FileName(".cpd"), v.FileName(".cpx"),
v.SuperBlock,
v.Version(),
preallocate,
compactionBytePerSecond,
progressFn,
)
opts.srcDatPath = v.FileName(".dat")
opts.srcIdxPath = v.FileName(".idx")
opts.destDatPath = v.FileName(".cpd")
opts.destIdxPath = v.FileName(".cpx")
opts.superBlock = v.SuperBlock
opts.version = v.Version()
return v.copyDataBasedOnIndexFile(opts)
}
func (v *Volume) CommitCompact() error {
@@ -403,9 +429,9 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
return nil
}
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
func (v *Volume) copyDataAndGenerateIndexFile(opts *CompactOptions) (err error) {
var dst backend.BackendStorageFile
if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
if dst, err = backend.CreateVolumeFile(opts.destDatPath, opts.PreallocateBytes, 0); err != nil {
return err
}
defer dst.Close()
@@ -418,7 +444,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
now: uint64(time.Now().Unix()),
nm: nm,
dstBackend: dst,
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
writeThrottler: util.NewWriteThrottler(opts.MaxBytesPerSecond),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
if err != nil {
@@ -426,15 +452,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
return err
}
return nm.SaveToIdx(idxName)
return nm.SaveToIdx(opts.destIdxPath)
}
func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) {
func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) {
var (
srcDatBackend, dstDatBackend backend.BackendStorageFile
dataFile *os.File
)
if dstDatBackend, err = backend.CreateVolumeFile(dstDatName, preallocate, 0); err != nil {
if dstDatBackend, err = backend.CreateVolumeFile(opts.destDatPath, opts.PreallocateBytes, 0); err != nil {
return err
}
defer func() {
@@ -446,10 +472,10 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
defer oldNm.Close()
newNm := needle_map.NewMemDb()
defer newNm.Close()
if err = oldNm.LoadFromIdx(srcIdxName); err != nil {
if err = oldNm.LoadFromIdx(opts.srcIdxPath); err != nil {
return err
}
if dataFile, err = os.Open(srcDatName); err != nil {
if dataFile, err = os.Open(opts.srcDatPath); err != nil {
return err
}
srcDatBackend = backend.NewDiskFile(dataFile)
@@ -457,11 +483,11 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
now := uint64(time.Now().Unix())
sb.CompactionRevision++
dstDatBackend.WriteAt(sb.Bytes(), 0)
newOffset := int64(sb.BlockSize())
opts.superBlock.CompactionRevision++
dstDatBackend.WriteAt(opts.superBlock.Bytes(), 0)
newOffset := int64(opts.superBlock.BlockSize())
writeThrottler := util.NewWriteThrottler(compactionBytePerSecond)
writeThrottler := util.NewWriteThrottler(opts.MaxBytesPerSecond)
err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
offset, size := value.Offset, value.Size
@@ -470,29 +496,29 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
return nil
}
if progressFn != nil {
if !progressFn(offset.ToActualOffset()) {
if opts.ProgressCallback != nil {
if !opts.ProgressCallback(offset.ToActualOffset()) {
return fmt.Errorf("interrupted")
}
}
n := new(needle.Needle)
if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version); err != nil {
if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, opts.version); err != nil {
v.checkReadWriteError(err)
return fmt.Errorf("cannot hydrate needle from file: %w", err)
}
if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) {
if n.HasTtl() && now >= n.LastModified+uint64(opts.superBlock.Ttl.Minutes()*60) {
return nil
}
if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil {
if _, _, _, err = n.Append(dstDatBackend, opts.superBlock.Version); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
delta := n.DiskSize(version)
delta := n.DiskSize(opts.version)
newOffset += delta
writeThrottler.MaybeSlowdown(delta)
glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
@@ -518,14 +544,14 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
v.Id.String(), v.nm.ContentSize(), v.nm.DeletedSize(), dstDatSize)
}
}
err = newNm.SaveToIdx(datIdxName)
err = newNm.SaveToIdx(opts.destIdxPath)
if err != nil {
return err
}
indexFile, err := os.OpenFile(datIdxName, os.O_RDWR|os.O_CREATE, 0644)
indexFile, err := os.OpenFile(opts.destIdxPath, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err)
glog.Errorf("cannot open Volume Index %s: %v", opts.destIdxPath, err)
return err
}
defer func() {

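To summarise the API change in this file: Compact(preallocate, compactionBytePerSecond) becomes CompactByVolumeData(opts), Compact2(preallocate, compactionBytePerSecond, progressFn) becomes CompactByIndex(opts), and both accept nil by substituting an empty CompactOptions. A migration sketch for an external caller, reusing the old parameter names (per this diff, only the index-based path consults ProgressCallback):

package vacuumsketch // hypothetical package, only to keep the sketch self-contained

import "github.com/seaweedfs/seaweedfs/weed/storage"

// was: v.Compact(preallocate, compactionBytePerSecond)
func migrateDataCompaction(v *storage.Volume, preallocate, compactionBytePerSecond int64) error {
	return v.CompactByVolumeData(&storage.CompactOptions{
		PreallocateBytes:  preallocate,
		MaxBytesPerSecond: compactionBytePerSecond,
	})
}

// was: v.Compact2(preallocate, compactionBytePerSecond, progressFn)
func migrateIndexCompaction(v *storage.Volume, preallocate, compactionBytePerSecond int64, progressFn storage.ProgressFunc) error {
	return v.CompactByIndex(&storage.CompactOptions{
		PreallocateBytes:  preallocate,
		MaxBytesPerSecond: compactionBytePerSecond,
		ProgressCallback:  progressFn,
	})
}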
weed/storage/volume_vacuum_test.go (8 lines changed)

@@ -62,14 +62,14 @@ func TestMakeDiff(t *testing.T) {
}
func TestMemIndexCompaction(t *testing.T) {
testCompaction(t, NeedleMapInMemory)
testCompactionByIndex(t, NeedleMapInMemory)
}
func TestLDBIndexCompaction(t *testing.T) {
testCompaction(t, NeedleMapLevelDb)
testCompactionByIndex(t, NeedleMapLevelDb)
}
func testCompaction(t *testing.T, needleMapKind NeedleMapKind) {
func testCompactionByIndex(t *testing.T, needleMapKind NeedleMapKind) {
dir := t.TempDir()
v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
@@ -87,7 +87,7 @@ func testCompaction(t *testing.T, needleMapKind NeedleMapKind) {
}
startTime := time.Now()
v.Compact2(0, 0, nil)
v.CompactByIndex(nil)
speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds()
t.Logf("compaction speed: %.2f bytes/s", speed)
