From 0721e3c1e9d8a23549bdaa1018222e4b14dd26ca Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Mon, 16 Feb 2026 11:15:14 +0100 Subject: [PATCH] Rework volume compaction (a.k.a vacuuming) logic to cleanly support new parameters. (#8337) We'll leverage this to support an "ignore broken needles" option, necessary to properly recover damaged volumes, as described in https://github.com/seaweedfs/seaweedfs/issues/7442#issuecomment-3897784283 . --- weed/command/backup.go | 2 +- weed/command/compact.go | 29 +++++++--- weed/storage/store_vacuum.go | 9 ++- weed/storage/volume_vacuum.go | 92 +++++++++++++++++++----------- weed/storage/volume_vacuum_test.go | 8 +-- 5 files changed, 92 insertions(+), 48 deletions(-) diff --git a/weed/command/backup.go b/weed/command/backup.go index 59499d789..df5b106d2 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -137,7 +137,7 @@ func backupFromLocation(volumeServer pb.ServerAddress, grpcDialOption grpc.DialO // Handle compaction if needed if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact2(0, 0, nil); err != nil { + if err = v.CompactByIndex(nil); err != nil { v.Close() return fmt.Errorf("compacting volume: %w", err), false } diff --git a/weed/command/compact.go b/weed/command/compact.go index 59e69bc74..f6117e237 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -1,6 +1,8 @@ package command import ( + "strings" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/needle" @@ -18,8 +20,9 @@ var cmdCompact = &Command{ The compacted .dat file is stored as .cpd file. The compacted .idx file is stored as .cpx file. - For method=0, it compacts based on the .dat file, works if .idx file is corrupted. - For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files. 
+ Supports two compaction methods: + * data: compacts based on the .dat file, works if .idx file is corrupted. + * index: compacts based on the .idx file, works if deletion happened but not written to .dat files. `, } @@ -28,7 +31,7 @@ var ( compactVolumePath = cmdCompact.Flag.String("dir", ".", "data directory to store files") compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name") compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.") - compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 (default) or 1.") + compactMethod = cmdCompact.Flag.String("method", "data", "option to choose which compact method (data/index)") compactVolumePreallocate = cmdCompact.Flag.Int64("preallocateMB", 0, "preallocate volume disk space") ) @@ -38,21 +41,29 @@ func runCompact(cmd *Command, args []string) bool { return false } - preallocate := *compactVolumePreallocate * (1 << 20) + preallocateBytes := *compactVolumePreallocate * (1 << 20) vid := needle.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, needle.GetCurrentVersion(), 0, 0) + v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocateBytes, needle.GetCurrentVersion(), 0, 0) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } - if *compactMethod == 0 { - if err = v.Compact(preallocate, 0); err != nil { + + opts := &storage.CompactOptions{ + PreallocateBytes: preallocateBytes, + MaxBytesPerSecond: 0, // unlimited + } + switch strings.ToLower(*compactMethod) { + case "data": + if err = v.CompactByVolumeData(opts); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } - } else { - if err = 
v.Compact2(preallocate, 0, nil); err != nil { + case "index": + if err = v.CompactByIndex(opts); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } + default: + glog.Fatalf("unsupported compaction method %q", *compactMethod) } return true diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index 3c9b5f79b..178d96b08 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -16,6 +16,7 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { } return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId) } + func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error { if v := s.findVolume(vid); v != nil { // Get current volume size for space calculation @@ -39,10 +40,15 @@ func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compaction glog.V(1).Infof("volume %d compaction space check: volume=%d, index=%d, space_needed=%d, free_space=%d", vid, volumeSize, indexSize, spaceNeeded, diskStatus.Free) - return v.Compact2(preallocate, compactionBytePerSecond, progressFn) + return v.CompactByIndex(&CompactOptions{ + PreallocateBytes: preallocate, + MaxBytesPerSecond: compactionBytePerSecond, + ProgressCallback: progressFn, + }) } return fmt.Errorf("volume id %d is not found during compact", vid) } + func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, int64, error) { if s.isStopping { return false, 0, fmt.Errorf("volume id %d skips compact because volume is stopping", vid) @@ -58,6 +64,7 @@ func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, int64, error) { } return false, 0, fmt.Errorf("volume id %d is not found during commit compact", vid) } + func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error { if v := s.findVolume(vid); v != nil { return v.cleanupCompact() diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 
17c53023d..4cc3911a5 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -35,8 +35,26 @@ func (v *Volume) garbageLevel() float64 { return float64(deletedSize) / float64(fileSize) } +type CompactOptions struct { + PreallocateBytes int64 + MaxBytesPerSecond int64 + ProgressCallback ProgressFunc + + // internal state settings + srcDatPath string + srcIdxPath string + destDatPath string + destIdxPath string + superBlock super_block.SuperBlock + version needle.Version +} + // compact a volume based on deletions in .dat files -func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { +func (v *Volume) CompactByVolumeData(opts *CompactOptions) error { + if opts == nil { + // default settings + opts = &CompactOptions{} + } if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory return nil @@ -64,11 +82,20 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error if err := v.nm.Sync(); err != nil { glog.V(0).Infof("compact failed to sync volume idx %d", v.Id) } - return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond) + + opts.destDatPath = v.FileName(".cpd") + opts.destIdxPath = v.FileName(".cpx") + opts.superBlock = v.SuperBlock + opts.version = v.Version() + return v.copyDataAndGenerateIndexFile(opts) } // compact a volume based on deletions in .idx files -func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error { +func (v *Volume) CompactByIndex(opts *CompactOptions) error { + if opts == nil { + // default settings + opts = &CompactOptions{} + } if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory return nil @@ -96,15 +123,14 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog if err := v.nm.Sync(); err != nil { glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err) } - return 
v.copyDataBasedOnIndexFile( - v.FileName(".dat"), v.FileName(".idx"), - v.FileName(".cpd"), v.FileName(".cpx"), - v.SuperBlock, - v.Version(), - preallocate, - compactionBytePerSecond, - progressFn, - ) + + opts.srcDatPath = v.FileName(".dat") + opts.srcIdxPath = v.FileName(".idx") + opts.destDatPath = v.FileName(".cpd") + opts.destIdxPath = v.FileName(".cpx") + opts.superBlock = v.SuperBlock + opts.version = v.Version() + return v.copyDataBasedOnIndexFile(opts) } func (v *Volume) CommitCompact() error { @@ -403,9 +429,9 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in return nil } -func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { +func (v *Volume) copyDataAndGenerateIndexFile(opts *CompactOptions) (err error) { var dst backend.BackendStorageFile - if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil { + if dst, err = backend.CreateVolumeFile(opts.destDatPath, opts.PreallocateBytes, 0); err != nil { return err } defer dst.Close() @@ -418,7 +444,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca now: uint64(time.Now().Unix()), nm: nm, dstBackend: dst, - writeThrottler: util.NewWriteThrottler(compactionBytePerSecond), + writeThrottler: util.NewWriteThrottler(opts.MaxBytesPerSecond), } err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner) if err != nil { @@ -426,15 +452,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca return err } - return nm.SaveToIdx(idxName) + return nm.SaveToIdx(opts.destIdxPath) } -func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate, compactionBytePerSecond int64, progressFn ProgressFunc) (err error) { +func (v *Volume) copyDataBasedOnIndexFile(opts *CompactOptions) (err error) { var ( srcDatBackend, 
dstDatBackend backend.BackendStorageFile dataFile *os.File ) - if dstDatBackend, err = backend.CreateVolumeFile(dstDatName, preallocate, 0); err != nil { + if dstDatBackend, err = backend.CreateVolumeFile(opts.destDatPath, opts.PreallocateBytes, 0); err != nil { return err } defer func() { @@ -446,10 +472,10 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da defer oldNm.Close() newNm := needle_map.NewMemDb() defer newNm.Close() - if err = oldNm.LoadFromIdx(srcIdxName); err != nil { + if err = oldNm.LoadFromIdx(opts.srcIdxPath); err != nil { return err } - if dataFile, err = os.Open(srcDatName); err != nil { + if dataFile, err = os.Open(opts.srcDatPath); err != nil { return err } srcDatBackend = backend.NewDiskFile(dataFile) @@ -457,11 +483,11 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da now := uint64(time.Now().Unix()) - sb.CompactionRevision++ - dstDatBackend.WriteAt(sb.Bytes(), 0) - newOffset := int64(sb.BlockSize()) + opts.superBlock.CompactionRevision++ + dstDatBackend.WriteAt(opts.superBlock.Bytes(), 0) + newOffset := int64(opts.superBlock.BlockSize()) - writeThrottler := util.NewWriteThrottler(compactionBytePerSecond) + writeThrottler := util.NewWriteThrottler(opts.MaxBytesPerSecond) err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { offset, size := value.Offset, value.Size @@ -470,29 +496,29 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da return nil } - if progressFn != nil { - if !progressFn(offset.ToActualOffset()) { + if opts.ProgressCallback != nil { + if !opts.ProgressCallback(offset.ToActualOffset()) { return fmt.Errorf("interrupted") } } n := new(needle.Needle) - if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version); err != nil { + if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, opts.version); err != nil { v.checkReadWriteError(err) return fmt.Errorf("cannot hydrate needle from file: 
%w", err) } - if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) { + if n.HasTtl() && now >= n.LastModified+uint64(opts.superBlock.Ttl.Minutes()*60) { return nil } if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } - if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil { + if _, _, _, err = n.Append(dstDatBackend, opts.superBlock.Version); err != nil { return fmt.Errorf("cannot append needle: %s", err) } - delta := n.DiskSize(version) + delta := n.DiskSize(opts.version) newOffset += delta writeThrottler.MaybeSlowdown(delta) glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size) @@ -518,14 +544,14 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da v.Id.String(), v.nm.ContentSize(), v.nm.DeletedSize(), dstDatSize) } } - err = newNm.SaveToIdx(datIdxName) + err = newNm.SaveToIdx(opts.destIdxPath) if err != nil { return err } - indexFile, err := os.OpenFile(datIdxName, os.O_RDWR|os.O_CREATE, 0644) + indexFile, err := os.OpenFile(opts.destIdxPath, os.O_RDWR|os.O_CREATE, 0644) if err != nil { - glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err) + glog.Errorf("cannot open Volume Index %s: %v", opts.destIdxPath, err) return err } defer func() { diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 9022fc5c7..7bf1e16cd 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -62,14 +62,14 @@ func TestMakeDiff(t *testing.T) { } func TestMemIndexCompaction(t *testing.T) { - testCompaction(t, NeedleMapInMemory) + testCompactionByIndex(t, NeedleMapInMemory) } func TestLDBIndexCompaction(t *testing.T) { - testCompaction(t, NeedleMapLevelDb) + testCompactionByIndex(t, NeedleMapLevelDb) } -func testCompaction(t *testing.T, needleMapKind NeedleMapKind) { +func testCompactionByIndex(t *testing.T, needleMapKind NeedleMapKind) { 
dir := t.TempDir() v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) @@ -87,7 +87,7 @@ func testCompaction(t *testing.T, needleMapKind NeedleMapKind) { } startTime := time.Now() - v.Compact2(0, 0, nil) + v.CompactByIndex(nil) speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds() t.Logf("compaction speed: %.2f bytes/s", speed)