From c265a1318c51d67ffdb79fd7ded2d71900769541 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Fri, 6 Feb 2026 16:22:06 +0100 Subject: [PATCH] Implement local scrubbing for EC volumes. --- weed/server/volume_grpc_scrub.go | 8 +- weed/shell/command_ec_scrub.go | 3 +- weed/storage/erasure_coding/ec_volume.go | 11 -- .../storage/erasure_coding/ec_volume_scrub.go | 103 ++++++++++++++++++ weed/storage/erasure_coding/ec_volume_test.go | 2 +- .../test_files/389.ecx | Bin weed/storage/idx/check_test.go | 7 -- weed/storage/types/needle_types.go | 4 + 8 files changed, 111 insertions(+), 27 deletions(-) create mode 100644 weed/storage/erasure_coding/ec_volume_scrub.go rename weed/storage/{idx => erasure_coding}/test_files/389.ecx (100%) diff --git a/weed/server/volume_grpc_scrub.go b/weed/server/volume_grpc_scrub.go index 43e33f2d4..dfb317da1 100644 --- a/weed/server/volume_grpc_scrub.go +++ b/weed/server/volume_grpc_scrub.go @@ -99,9 +99,9 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb switch m := req.GetMode(); m { case volume_server_pb.VolumeScrubMode_INDEX: // index scrubs do not verify individual EC shards - files, serrs = v.CheckIndex() + files, serrs = v.ScrubIndex() case volume_server_pb.VolumeScrubMode_LOCAL: - files, shardInfos, serrs = scrubEcVolumeLocal(ctx, v) + files, shardInfos, serrs = v.ScrubLocal() case volume_server_pb.VolumeScrubMode_FULL: files, shardInfos, serrs = scrubEcVolumeFull(ctx, v) default: @@ -129,10 +129,6 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb return res, nil } -func scrubEcVolumeLocal(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) { - return 0, nil, []error{fmt.Errorf("scrubEcVolumeLocal(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")} -} - func scrubEcVolumeFull(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) { return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")} } diff --git a/weed/shell/command_ec_scrub.go b/weed/shell/command_ec_scrub.go index 64b474fe9..24117464b 100644 --- a/weed/shell/command_ec_scrub.go +++ b/weed/shell/command_ec_scrub.go @@ -49,8 +49,7 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer volScrubCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server : (optional)") volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated EC volume IDs to process (optional)") - // TODO: switch default mode to LOCAL, once implemented. - mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)") + mode := volScrubCommand.String("mode", "local", "scrubbing mode (index/local/full)") maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") if err = volScrubCommand.Parse(args); err != nil { diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 27fcec13b..f926f1925 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -339,14 +339,3 @@ func (ev *EcVolume) WalkIndex(processNeedleFn func(key types.NeedleId, offset ty } return idx.WalkIndexFile(ev.ecxFile, 0, processNeedleFn) } - -func (ev *EcVolume) CheckIndex() (int64, []error) { - if ev.ecxFile == nil { - return 0, []error{fmt.Errorf("no ECX file associated with EC volume %v", ev.VolumeId)} - } - if ev.ecxFileSize == 0 { - return 0, []error{fmt.Errorf("zero-size ECX file for EC volume %v", ev.VolumeId)} - } - - return idx.CheckIndexFile(ev.ecxFile, ev.ecxFileSize, ev.Version) -} diff --git a/weed/storage/erasure_coding/ec_volume_scrub.go b/weed/storage/erasure_coding/ec_volume_scrub.go new file mode 100644 index 000000000..1a051d523 --- /dev/null +++ b/weed/storage/erasure_coding/ec_volume_scrub.go @@ -0,0 +1,103 @@ +package erasure_coding + +import ( + "fmt" + "slices" + + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/idx" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// ScrubIndex verifies index integrity of an EC volume. +func (ev *EcVolume) ScrubIndex() (int64, []error) { + if ev.ecxFile == nil { + return 0, []error{fmt.Errorf("no ECX file associated with EC volume %v", ev.VolumeId)} + } + if ev.ecxFileSize == 0 { + return 0, []error{fmt.Errorf("zero-size ECX file for EC volume %v", ev.VolumeId)} + } + + return idx.CheckIndexFile(ev.ecxFile, ev.ecxFileSize, ev.Version) +} + +// ScrubLocal checks the integrity of local shards for a EC volume. Notably, it cannot verify CRC on needles. +// Returns a count of processed file entries, slice of found broken shards, and slice of found errors. +func (ecv *EcVolume) ScrubLocal() (int64, []*volume_server_pb.EcShardInfo, []error) { + // local scan means verifying indexes as well + _, errs := ecv.ScrubIndex() + + brokenShardsMap := map[ShardId]*EcVolumeShard{} + var count int64 + + flagShardBroken := func(ecs *EcVolumeShard, errFmt string, a ...any) { + // reads for EC chunks can hit the same shard multiple times, so dedupe upon read errors + brokenShardsMap[ecs.ShardId] = ecs + errs = append(errs, fmt.Errorf(errFmt, a...)) + } + + err := idx.WalkIndexFile(ecv.ecxFile, 0, func(id types.NeedleId, offset types.Offset, size types.Size) error { + count++ + if size.IsTombstone() { + // nothing to do for tombstones... + return nil + } + + var read int64 + locations := ecv.LocateEcShardNeedleInterval(ecv.Version, offset.ToActualOffset(), size) + + for i, iv := range locations { + sid, soffset := iv.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize) + ssize := int64(iv.Size.Raw()) + shard, found := ecv.FindEcVolumeShard(sid) + if !found { + // shard is not local :( skip it + read += ssize + continue + } + if soffset+int64(ssize) > shard.Size() { + flagShardBroken(shard, "local shard %d for needle %d is too short (%d), cannot read chunk %d/%d", sid, id, shard.Size(), i+1, len(locations)) + continue + } + + buf := make([]byte, ssize) + got, err := shard.ReadAt(buf, soffset) + if err != nil { + flagShardBroken(shard, "failed to read chunk %d/%d for needle %d from local shard %d at offset %d: %v", i+1, len(locations), id, sid, soffset, err) + continue + } + if int64(got) != ssize { + flagShardBroken(shard, "expected %d bytes for chunk %d/%d for needle %d from local shard %d, got %d", ssize, i+1, len(locations), id, sid, got) + continue + } + read += int64(got) + } + + if got, want := read, needle.GetActualSize(size, ecv.Version); got != want { + return fmt.Errorf("expected %d bytes for needle %d, got %d", want, id, got) + } + + return nil + }) + if err != nil { + errs = append(errs, err) + } + + // collect broken shard infos for reporting + brokenShards := make([]*volume_server_pb.EcShardInfo, 0, len(brokenShardsMap)) + for _, s := range brokenShardsMap { + brokenShards = append(brokenShards, s.ToEcShardInfo()) + } + slices.SortFunc(brokenShards, func(a, b *volume_server_pb.EcShardInfo) int { + if a.ShardId < b.ShardId { + return -1 + } + if a.ShardId > b.ShardId { + return 1 + } + return 0 + }) + + return count, brokenShards, errs +} diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go index 323972c1a..7d833502c 100644 --- a/weed/storage/erasure_coding/ec_volume_test.go +++ b/weed/storage/erasure_coding/ec_volume_test.go @@ -13,7 +13,7 @@ import ( func TestPositioning(t *testing.T) { - ecxFile, err := os.OpenFile("../idx/test_files/389.ecx", os.O_RDONLY, 0) + ecxFile, err := os.OpenFile("./test_files/389.ecx", os.O_RDONLY, 0) if err != nil { t.Errorf("failed to open ecx file: %v", err) } diff --git a/weed/storage/idx/test_files/389.ecx b/weed/storage/erasure_coding/test_files/389.ecx similarity index 100% rename from weed/storage/idx/test_files/389.ecx rename to weed/storage/erasure_coding/test_files/389.ecx diff --git a/weed/storage/idx/check_test.go b/weed/storage/idx/check_test.go index c397d8e56..c3f8a3b9b 100644 --- a/weed/storage/idx/check_test.go +++ b/weed/storage/idx/check_test.go @@ -50,13 +50,6 @@ func TestCheckIndexFile(t *testing.T) { fmt.Errorf("expected an index file of size 2540, got 2528"), }, }, - { - name: "healthy EC index", - indexPath: "./test_files/389.ecx", - version: needle.Version3, - want: 485098, - wantErrs: []error{}, - }, { name: "healthy EC index with deleted files", indexPath: "./test_files/deleted_files.ecx", diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go index fb527d132..c870f8a18 100644 --- a/weed/storage/types/needle_types.go +++ b/weed/storage/types/needle_types.go @@ -14,6 +14,10 @@ type Offset struct { type Size int32 +func (s Size) IsTombstone() bool { + return s == TombstoneFileSize +} + // IsDeleted checks if the needle entry has been marked as deleted (tombstoned). // Use this when checking if an entry should exist in the needle map. // Returns true for negative sizes or TombstoneFileSize.