From 117bba794ce06467d6e90afab4ac417ffab5d8bf Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 13 Aug 2025 00:28:12 -0700
Subject: [PATCH] WriteDatFileAndVacuum

---
 weed/storage/erasure_coding/ec_decoder.go     | 136 ++++++++++++++++++
 weed/worker/tasks/ec_vacuum/ec_vacuum_task.go |  31 ++--
 2 files changed, 151 insertions(+), 16 deletions(-)

diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
index 00d23be05..bdcbc5084 100644
--- a/weed/storage/erasure_coding/ec_decoder.go
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -212,3 +212,139 @@ func min(x, y int64) int64 {
 	}
 	return x
 }
+
+// WriteDatFileAndVacuum reconstructs a volume from its EC shards and then vacuums deleted needles.
+// It reuses the existing WriteDatFile reconstruction and the volume compaction logic, so the
+// resulting .dat/.idx pair contains only live needles.
+func WriteDatFileAndVacuum(baseFileName string, shardFileNames []string) error {
+	// Step 1: Use the existing WriteDatFile to reconstruct the full volume.
+	datFileSize, err := FindDatFileSize(baseFileName, baseFileName)
+	if err != nil {
+		return fmt.Errorf("failed to find dat file size: %w", err)
+	}
+
+	// WriteDatFile appends ".dat" to the base name it is given, so pass the
+	// temporary base name and track the file it actually creates.
+	tempBaseFileName := baseFileName + ".tmp"
+	tempDatFile := tempBaseFileName + ".dat"
+	if err = WriteDatFile(tempBaseFileName, datFileSize, shardFileNames); err != nil {
+		return fmt.Errorf("failed to reconstruct volume with WriteDatFile: %w", err)
+	}
+	defer os.Remove(tempDatFile) // cleanup temp file
+
+	// Step 2: Create an index file with deleted entries marked as tombstones.
+	// WriteIdxFileFromEcIndex reads baseFileName.ecx/.ecj and writes baseFileName.idx,
+	// so move its output aside to serve as the temporary source index.
+	tempIdxFile := tempBaseFileName + ".idx"
+	if err = WriteIdxFileFromEcIndex(baseFileName); err != nil {
+		return fmt.Errorf("failed to create index file: %w", err)
+	}
+	if err = os.Rename(baseFileName+".idx", tempIdxFile); err != nil {
+		return fmt.Errorf("failed to move index file: %w", err)
+	}
+	defer os.Remove(tempIdxFile) // cleanup temp file
+
+	// Step 3: Use the existing volume compaction logic to filter out deleted needles.
+	version, err := readEcVolumeVersion(baseFileName)
+	if err != nil {
+		return fmt.Errorf("failed to read volume version: %w", err)
+	}
+
+	return copyDataBasedOnIndexFileForEcVacuum(
+		tempDatFile, tempIdxFile, // source files (with deleted entries)
+		baseFileName+".dat", baseFileName+".idx", // destination files (cleaned)
+		version,
+	)
+}
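+
+// Deletion filtering here relies on the existing on-disk formats: deletes
+// recorded in the merged .ecj journal become tombstone entries in the
+// temporary .idx, tombstoned keys are dropped when that index is loaded
+// into the in-memory needle map, and the copy loop below re-checks
+// IsDeleted as a second guard, so the rewritten .dat holds only live needles.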
+
+// copyDataBasedOnIndexFileForEcVacuum copies only the non-deleted needles from source to destination.
+// It is a simplified version of copyDataBasedOnIndexFile in volume_vacuum.go, for EC vacuum use.
+func copyDataBasedOnIndexFileForEcVacuum(srcDatName, srcIdxName, dstDatName, dstIdxName string, version needle.Version) error {
+	// Open the source data file.
+	dataFile, err := os.Open(srcDatName)
+	if err != nil {
+		return fmt.Errorf("failed to open source dat file: %w", err)
+	}
+	srcDatBackend := backend.NewDiskFile(dataFile)
+	defer srcDatBackend.Close()
+
+	// Create the destination data file.
+	dstDatBackend, err := backend.CreateVolumeFile(dstDatName, 0, 0)
+	if err != nil {
+		return fmt.Errorf("failed to create destination dat file: %w", err)
+	}
+	defer func() {
+		dstDatBackend.Sync()
+		dstDatBackend.Close()
+	}()
+
+	// Load the needle map from the source index.
+	oldNm := needle_map.NewMemDb()
+	defer oldNm.Close()
+	if err := oldNm.LoadFromIdx(srcIdxName); err != nil {
+		return fmt.Errorf("failed to load index file: %w", err)
+	}
+
+	// Create a new needle map for the cleaned volume.
+	newNm := needle_map.NewMemDb()
+	defer newNm.Close()
+
+	// Copy the superblock with an incremented compaction revision.
+	sb := super_block.SuperBlock{}
+	if existingSb, err := super_block.ReadSuperBlock(srcDatBackend); err == nil {
+		sb = existingSb
+		sb.CompactionRevision++
+	} else {
+		// Fall back to a default superblock if reading fails.
+		sb = super_block.SuperBlock{
+			Version:            version,
+			ReplicaPlacement:   &super_block.ReplicaPlacement{},
+			CompactionRevision: 1,
+		}
+	}
+
+	if _, err := dstDatBackend.WriteAt(sb.Bytes(), 0); err != nil {
+		return fmt.Errorf("failed to write superblock: %w", err)
+	}
+	newOffset := int64(sb.BlockSize())
+
+	// Copy only the non-deleted needles.
+	err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
+		offset, size := value.Offset, value.Size
+
+		// Skip deleted needles; this check is the filtering step that makes this a vacuum.
+		if offset.IsZero() || size.IsDeleted() {
+			return nil
+		}
+
+		// Read the needle from the source volume.
+		n := new(needle.Needle)
+		if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version); err != nil {
+			return fmt.Errorf("cannot read needle from source: %w", err)
+		}
+
+		// Write the needle to the destination volume.
+		if err := newNm.Set(n.Id, types.ToOffset(newOffset), n.Size); err != nil {
+			return fmt.Errorf("cannot set needle in new map: %w", err)
+		}
+		if _, _, _, err := n.Append(dstDatBackend, sb.Version); err != nil {
+			return fmt.Errorf("cannot append needle to destination: %w", err)
+		}
+
+		newOffset += n.DiskSize(version)
+		return nil
+	})
+
+	if err != nil {
+		return fmt.Errorf("failed to copy needles: %w", err)
+	}
+
+	// Save the new index file.
+	return newNm.SaveToIdx(dstIdxName)
+}
diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go
index 2c32ec01e..2356e0d10 100644
--- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go
+++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go
@@ -39,8 +39,8 @@ type EcVacuumTask struct {
 	sourceNodes        map[pb.ServerAddress]erasure_coding.ShardBits
 	tempDir            string
 	grpcDialOption     grpc.DialOption
-	adminAddress       string           // admin server address for API calls
 	masterAddress      pb.ServerAddress // master server address for activation RPC
+	adminAddress       string           // admin server address for API calls
 	cleanupGracePeriod time.Duration    // grace period before cleaning up old generation (1 minute default)
 
 	topologyTaskID string // links to ActiveTopology task for capacity tracking
@@ -146,7 +146,7 @@ func (t *EcVacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
 		return fmt.Errorf("failed to collect EC shards: %w", err)
 	}
 
-	// Step 3: Decode EC shards into normal volume on worker (skips deleted entries automatically)
+	// Step 3: Decode EC shards into a normal volume on the worker (filters out deleted entries using the merged .ecj)
 	if err := t.decodeEcShardsToVolume(); err != nil {
 		return fmt.Errorf("failed to decode EC shards to volume: %w", err)
 	}
@@ -356,7 +356,7 @@ func (t *EcVacuumTask) copyFileFromVolumeServer(client volume_server_pb.VolumeServerClient,
 	return nil
 }
 
-// decodeEcShardsToVolume decodes EC shards into a normal volume on worker, automatically skipping deleted entries
+// decodeEcShardsToVolume decodes EC shards into a normal volume on the worker, filtering out deleted entries using the merged .ecj file
 func (t *EcVacuumTask) decodeEcShardsToVolume() error {
 	t.LogInfo("Decoding EC shards to normal volume on worker", map[string]interface{}{
 		"volume_id": t.volumeID,
@@ -397,22 +397,21 @@ func (t *EcVacuumTask) decodeEcShardsToVolume() error {
 		return fmt.Errorf("failed to find dat file size: %w", err)
 	}
 
-	// Step 5: Write .dat file from EC data shards (this automatically skips deleted entries)
-	err = erasure_coding.WriteDatFile(baseFileName, datFileSize, shardFileNames)
-	if err != nil {
-		return fmt.Errorf("failed to write dat file: %w", err)
-	}
-
-	// Step 6: Write .idx file from .ecx and merged .ecj files (skips deleted entries)
-	err = erasure_coding.WriteIdxFileFromEcIndex(baseFileName)
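+	// WriteDatFileAndVacuum replaces the separate WriteDatFile and WriteIdxFileFromEcIndex
+	// calls above: it reconstructs the volume from the shards, then rewrites it through
+	// the compaction path so deleted needles are physically dropped from the .dat file.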
+	// Step 5: Reconstruct and vacuum the volume data (reuses existing functions plus compaction logic)
+	err = erasure_coding.WriteDatFileAndVacuum(baseFileName, shardFileNames)
 	if err != nil {
-		return fmt.Errorf("failed to write idx file from ec index: %w", err)
+		return fmt.Errorf("failed to reconstruct and vacuum volume: %w", err)
 	}
 
-	t.LogInfo("Successfully decoded EC shards to normal volume", map[string]interface{}{
-		"dat_file": datFileName,
-		"idx_file": idxFileName,
-		"dat_size": datFileSize,
+	t.LogInfo("Successfully decoded EC shards to filtered normal volume", map[string]interface{}{
+		"dat_file":                 datFileName,
+		"idx_file":                 idxFileName,
+		"original_dat_size":        datFileSize,
+		"deleted_entries_filtered": true,
+		"note":                     "deleted needles physically removed from volume",
 	})
 	return nil
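
A minimal usage sketch of the new entry point, assuming the worker has already collected the ten data shards under the conventional <base>.ec00 … .ec09 names together with the merged <base>.ecx/<base>.ecj index files; the helper name, tempDir, and volume id below are illustrative, not part of this patch:

	package ec_vacuum_example

	import (
		"fmt"
		"path/filepath"

		"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	)

	// reconstructAndVacuum is a hypothetical helper showing the call pattern.
	func reconstructAndVacuum(tempDir string, volumeID uint32) error {
		baseFileName := filepath.Join(tempDir, fmt.Sprintf("%d", volumeID))
		// Only the data shards (ec00..ec09) are read to rebuild the .dat contents.
		shardFileNames := make([]string, erasure_coding.DataShardsCount)
		for i := range shardFileNames {
			shardFileNames[i] = fmt.Sprintf("%s.ec%02d", baseFileName, i)
		}
		if err := erasure_coding.WriteDatFileAndVacuum(baseFileName, shardFileNames); err != nil {
			return fmt.Errorf("reconstruct and vacuum volume %d: %w", volumeID, err)
		}
		// baseFileName.dat and baseFileName.idx now contain only live needles.
		return nil
	}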