From a9d12a07925afcea63c317a210e044f8d25f9832 Mon Sep 17 00:00:00 2001
From: Lisandro Pin
Date: Tue, 17 Feb 2026 00:09:01 +0100
Subject: [PATCH] Implement full scrubbing for EC volumes (#8318)

Implement full scrubbing for EC volumes.
---
 weed/server/volume_grpc_scrub.go        |  7 +-
 weed/storage/erasure_coding/ec_shard.go | 17 +++++
 weed/storage/store_ec.go                | 73 ++++++++++++-------
 weed/storage/store_ec_scrub.go          | 97 +++++++++++++++++++++++++
 4 files changed, 161 insertions(+), 33 deletions(-)
 create mode 100644 weed/storage/store_ec_scrub.go

diff --git a/weed/server/volume_grpc_scrub.go b/weed/server/volume_grpc_scrub.go
index 1e0b32dc5..62f848ec0 100644
--- a/weed/server/volume_grpc_scrub.go
+++ b/weed/server/volume_grpc_scrub.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
-	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 )
 
@@ -95,7 +94,7 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
 	case volume_server_pb.VolumeScrubMode_LOCAL:
 		files, shardInfos, serrs = v.ScrubLocal()
 	case volume_server_pb.VolumeScrubMode_FULL:
-		files, shardInfos, serrs = scrubEcVolumeFull(ctx, v)
+		files, shardInfos, serrs = vs.store.ScrubEcVolume(v.VolumeId)
 	default:
 		return nil, fmt.Errorf("unsupported EC volume scrub mode %d", m)
 	}
@@ -120,7 +119,3 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
 	}
 	return res, nil
 }
-
-func scrubEcVolumeFull(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
-	return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
-}
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
index bc9aacb0a..0bf702e79 100644
--- a/weed/storage/erasure_coding/ec_shard.go
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -43,6 +43,23 @@ func AllShardIds() []ShardId {
 	return res
 }
 
+// CmpEcShardInfo compares a pair of EcShardInfo protos for sorting, ordering by volume ID, then shard ID.
+func CmpEcShardInfo(a, b *volume_server_pb.EcShardInfo) int {
+	if a.VolumeId < b.VolumeId {
+		return -1
+	}
+	if a.VolumeId > b.VolumeId {
+		return 1
+	}
+	if a.ShardId < b.ShardId {
+		return -1
+	}
+	if a.ShardId > b.ShardId {
+		return 1
+	}
+	return 0
+}
+
 type EcVolumeShard struct {
 	VolumeId needle.VolumeId
 	ShardId  ShardId
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 0f9beba1e..605c0532b 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -159,7 +159,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadS
 	if len(intervals) > 1 {
 		glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
 	}
-	bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals)
+	bytes, isDeleted, err := s.readEcShardIntervals(n.Id, localEcVolume, intervals)
 	if err != nil {
 		return 0, fmt.Errorf("ReadEcShardIntervals: %w", err)
 	}
@@ -178,8 +178,11 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadS
 	return 0, fmt.Errorf("ec shard %d not found", vid)
 }
 
-func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
+func (s *Store) IntervalToShardIdAndOffset(iv erasure_coding.Interval) (erasure_coding.ShardId, int64) {
+	return iv.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
+}
 
+func (s *Store) readEcShardIntervals(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
 	if err = s.cachedLookupEcShardLocations(ecVolume); err != nil {
 		return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
 	}
@@ -202,37 +205,36 @@
 }
 
 func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
-	shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
+	shardId, actualOffset := s.IntervalToShardIdAndOffset(interval)
 	data = make([]byte, interval.Size)
 
-	if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
-		var readSize int
-		if readSize, err = shard.ReadAt(data, actualOffset); err != nil {
-			if readSize != int(interval.Size) {
-				glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
-				return
-			}
-		}
-	} else {
-		ecVolume.ShardLocationsLock.RLock()
-		sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
-		ecVolume.ShardLocationsLock.RUnlock()
-
-		// try reading directly
-		if hasShardIdLocation {
-			_, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
-			if err == nil {
-				return
-			}
-			glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
-		}
-		// try reading by recovering from other shards
-		_, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
+	// try local read
+	err = s.readLocalEcShardInterval(ecVolume, shardId, data, actualOffset)
+	if err == nil {
+		return
+	}
+	glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
+
+	ecVolume.ShardLocationsLock.RLock()
+	sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
+	ecVolume.ShardLocationsLock.RUnlock()
+
+	// try reading directly
+	if hasShardIdLocation {
+		_, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
 		if err == nil {
 			return
 		}
-		glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
+		glog.V(0).Infof("read remote ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
+	}
+
+	// try reading by recovering from other shards
+	_, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
+	if err == nil {
+		return
 	}
+	glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
+
 	return
 }
 
@@ -286,6 +288,23 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume)
 	return
 }
 
+func (s *Store) readLocalEcShardInterval(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId, buf []byte, offset int64) error {
+	shard, found := ecVolume.FindEcVolumeShard(shardId)
+	if !found {
+		return fmt.Errorf("shard %d not found for volume %d", shardId, ecVolume.VolumeId)
+	}
+
+	readBytes, err := shard.ReadAt(buf, offset)
+	if err != nil {
+		return fmt.Errorf("failed to read local EC shard %d for volume %d: %v", shardId, ecVolume.VolumeId, err)
+	}
+	if got, want := readBytes, len(buf); got != want {
+		return fmt.Errorf("expected %d bytes for local EC shard %d on volume %d, got %d", want, shardId, ecVolume.VolumeId, got)
+	}
+
+	return nil
+}
+
 func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
 
 	if len(sourceDataNodes) == 0 {
diff --git a/weed/storage/store_ec_scrub.go b/weed/storage/store_ec_scrub.go
new file mode 100644
index 000000000..7302b2876
--- /dev/null
+++ b/weed/storage/store_ec_scrub.go
@@ -0,0 +1,97 @@
+package storage
+
+import (
+	"fmt"
+	"slices"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
+)
+
+// ScrubEcVolume checks the full integrity of an EC volume, across both local and remote shards.
+// Returns the number of index entries processed, the broken shards found, and the errors encountered.
+func (s *Store) ScrubEcVolume(vid needle.VolumeId) (int64, []*volume_server_pb.EcShardInfo, []error) {
+	ecv, found := s.FindEcVolume(vid)
+	if !found {
+		return 0, nil, []error{fmt.Errorf("EC volume id %d not found", vid)}
+	}
+	if err := s.cachedLookupEcShardLocations(ecv); err != nil {
+		return 0, nil, []error{fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)}
+	}
+
+	// full scan means verifying indexes as well
+	_, errs := ecv.ScrubIndex()
+
+	var count int64
+	// reads for EC chunks can hit the same shard multiple times, so dedupe upon read errors
+	brokenShardsMap := map[erasure_coding.ShardId]*volume_server_pb.EcShardInfo{}
+
+	err := ecv.WalkIndex(func(id types.NeedleId, offset types.Offset, size types.Size) error {
+		count++
+		if size.IsTombstone() {
+			// nothing to do for tombstones...
+			return nil
+		}
+
+		data := make([]byte, 0, needle.GetActualSize(size, ecv.Version))
+		intervals := ecv.LocateEcShardNeedleInterval(ecv.Version, offset.ToActualOffset(), size)
+
+		for i, iv := range intervals {
+			chunk := make([]byte, iv.Size)
+			shardId, offset := s.IntervalToShardIdAndOffset(iv)
+
+			// try a local shard read first...
+			if err := s.readLocalEcShardInterval(ecv, shardId, chunk, offset); err == nil {
+				data = append(data, chunk...)
+				continue
+			}
+
+			// ...then remote. Note we do not try to recover EC-encoded data upon read failures;
+			// we want to check that shards are valid without decoding.
+			ecv.ShardLocationsLock.RLock()
+			sourceDataNodes, ok := ecv.ShardLocations[shardId]
+			ecv.ShardLocationsLock.RUnlock()
+			if ok {
+				if _, _, err := s.readRemoteEcShardInterval(sourceDataNodes, id, ecv.VolumeId, shardId, chunk, offset); err == nil {
+					data = append(data, chunk...)
+					continue
+				}
+			}
+
+			// chunk read for shard failed :(
+			errs = append(errs, fmt.Errorf("failed to read EC shard %d for needle %d on volume %d (interval %d/%d)", shardId, id, ecv.VolumeId, i+1, len(intervals)))
+			brokenShardsMap[shardId] = &volume_server_pb.EcShardInfo{
+				ShardId:    uint32(shardId),
+				Size:       int64(iv.Size),
+				Collection: ecv.Collection,
+				VolumeId:   uint32(ecv.VolumeId),
+			}
+			break
+		}
+
+		if got, want := int64(len(data)), needle.GetActualSize(size, ecv.Version); got != want {
+			errs = append(errs, fmt.Errorf("expected %d bytes for needle %d, got %d", want, id, got))
+			return nil
+		}
+
+		n := needle.Needle{}
+		if err := n.ReadBytes(data, 0, size, ecv.Version); err != nil {
+			errs = append(errs, fmt.Errorf("needle %d on EC volume %d: %v", id, ecv.VolumeId, err))
+		}
+
+		return nil
+	})
+	if err != nil {
+		errs = append(errs, err)
+	}
+
+	brokenShards := []*volume_server_pb.EcShardInfo{}
+	for _, info := range brokenShardsMap {
+		brokenShards = append(brokenShards, info)
+	}
+	slices.SortFunc(brokenShards, erasure_coding.CmpEcShardInfo)
+
+	return count, brokenShards, errs
+}
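
For reference, a minimal sketch of how the new Store.ScrubEcVolume entry point can be exercised in-process. The reportEcScrub helper and the main() wiring below are illustrative, not part of this patch; they assume a *storage.Store already initialized elsewhere (disk locations, master address for shard lookups). Only the ScrubEcVolume signature and the EcShardInfo fields come from the patch itself:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

// reportEcScrub runs a full scrub of one EC volume and prints a summary.
// This mirrors what the volume server's ScrubEcVolume gRPC handler does for
// VolumeScrubMode_FULL, which now delegates to Store.ScrubEcVolume.
func reportEcScrub(store *storage.Store, vid needle.VolumeId) {
	count, brokenShards, errs := store.ScrubEcVolume(vid)

	fmt.Printf("scrubbed %d index entries on EC volume %d\n", count, vid)
	for _, info := range brokenShards {
		// EcShardInfo carries the volume/shard IDs, the collection, and the
		// size of the interval that failed to read; results arrive sorted by
		// volume ID, then shard ID (see CmpEcShardInfo).
		fmt.Printf("  broken shard %d.%d (collection %q, %d bytes)\n",
			info.VolumeId, info.ShardId, info.Collection, info.Size)
	}
	for _, err := range errs {
		fmt.Printf("  scrub error: %v\n", err)
	}
}

func main() {
	// Constructing a *storage.Store (disk locations, master address for the
	// shard-location lookups, etc.) is elided here.
	var store *storage.Store // hypothetical, initialized elsewhere
	if store != nil {
		reportEcScrub(store, needle.VolumeId(42))
	}
}

Over gRPC the same path is reached through the volume server's ScrubEcVolume handler with VolumeScrubMode_FULL which, as the first hunk above shows, now delegates to vs.store.ScrubEcVolume.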