From 7ed75784241ad8d7635113e3e173959f0e8446ae Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 14 Dec 2025 17:06:13 -0800 Subject: [PATCH] fix(ec.decode): purge EC shards when volume is empty (#7749) * fix(ec.decode): purge EC shards when volume is empty When an EC volume has no live entries (all deleted), ec.decode should not generate an empty normal volume. Instead, treat decode as a no-op and allow shard purge to proceed cleanly.\n\nFixes: #7748 * chore: address PR review comments * test: cover live EC index + avoid magic string * chore: harden empty-EC handling - Make shard cleanup best-effort (collect errors)\n- Remove unreachable EOF handling in HasLiveNeedles\n- Add empty ecx test case\n- Share no-live-entries substring between server/client\n * perf: parallelize EC shard unmount/delete across locations * refactor: combine unmount+delete into single goroutine per location * refactor: use errors.Join for multi-error aggregation * refactor: use existing ErrorWaitGroup for parallel execution * fix: capture loop variables + clarify SuperBlockSize safety --- weed/server/volume_grpc_erasure_coding.go | 14 ++++ weed/shell/command_ec_decode.go | 65 ++++++++++----- weed/storage/erasure_coding/ec_decoder.go | 22 +++++ .../storage/erasure_coding/ec_decoder_test.go | 81 +++++++++++++++++++ 4 files changed, 164 insertions(+), 18 deletions(-) create mode 100644 weed/storage/erasure_coding/ec_decoder_test.go diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 5d100bdda..ec59ffa39 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -20,6 +20,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/volume_info" "github.com/seaweedfs/seaweedfs/weed/util" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) /* @@ -506,6 +509,17 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ } dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName() + + // If the EC index contains no live entries, decoding should be a no-op: + // just allow the caller to purge EC shards and do not generate an empty normal volume. + hasLive, err := erasure_coding.HasLiveNeedles(indexBaseFileName) + if err != nil { + return nil, fmt.Errorf("HasLiveNeedles %s: %w", indexBaseFileName, err) + } + if !hasLive { + return nil, status.Errorf(codes.FailedPrecondition, "ec volume %d %s", req.VolumeId, erasure_coding.EcNoLiveEntriesSubstring) + } + // calculate .dat file size datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName) if err != nil { diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 695641a31..a9e5c9113 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -5,11 +5,14 @@ import ( "flag" "fmt" "io" + "strings" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/storage/types" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -118,6 +121,11 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec // generate a normal volume err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation) if err != nil { + // Special case: if the EC index has no live entries, decoding is a no-op. + // Just purge EC shards and return success without generating/mounting an empty volume. + if isEcDecodeEmptyVolumeErr(err) { + return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcIndexBits, vid) + } return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) } @@ -130,6 +138,44 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec return nil } +func isEcDecodeEmptyVolumeErr(err error) bool { + st, ok := status.FromError(err) + if !ok { + return false + } + if st.Code() != codes.FailedPrecondition { + return false + } + // Keep this robust against wording tweaks while still being specific. + return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring) +} + +func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { + return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid) +} + +func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { + ewg := NewErrorWaitGroup(len(nodeToEcIndexBits)) + + // unmount and delete ec shards in parallel (one goroutine per location) + for location, ecIndexBits := range nodeToEcIndexBits { + location, ecIndexBits := location, ecIndexBits // capture loop variables for goroutine + ewg.Add(func() error { + fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + if err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()); err != nil { + return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err) + } + + fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()); err != nil { + return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err) + } + return nil + }) + } + return ewg.Wait() +} + func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume @@ -142,24 +188,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection str return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err) } - // unmount ec shards - for location, ecIndexBits := range nodeToEcIndexBits { - fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) - if err != nil { - return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err) - } - } - // delete ec shards - for location, ecIndexBits := range nodeToEcIndexBits { - fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) - if err != nil { - return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err) - } - } - - return nil + return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid) } func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error { diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go index a1d929f6c..429dd7ac4 100644 --- a/weed/storage/erasure_coding/ec_decoder.go +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -14,6 +14,23 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +// EcNoLiveEntriesSubstring is used for server/client coordination when ec.decode determines that +// decoding should be a no-op (all entries are deleted). +const EcNoLiveEntriesSubstring = "has no live entries" + +// HasLiveNeedles returns whether the EC index (.ecx) contains at least one live (non-deleted) entry. +// This is used by ec.decode to avoid generating an empty normal volume when all entries were deleted. +func HasLiveNeedles(indexBaseFileName string) (hasLive bool, err error) { + err = iterateEcxFile(indexBaseFileName, func(_ types.NeedleId, _ types.Offset, size types.Size) error { + if !size.IsDeleted() { + hasLive = true + return io.EOF // stop early + } + return nil + }) + return +} + // write .idx file from .ecx and .ecj files func WriteIdxFileFromEcIndex(baseFileName string) (err error) { @@ -52,6 +69,11 @@ func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err) } + // Safety: ensure datSize is at least SuperBlockSize. While the caller typically + // checks HasLiveNeedles first, this protects against direct calls to FindDatFileSize + // when all needles are deleted (see issue #7748). + datSize = int64(super_block.SuperBlockSize) + err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error { if size.IsDeleted() { diff --git a/weed/storage/erasure_coding/ec_decoder_test.go b/weed/storage/erasure_coding/ec_decoder_test.go new file mode 100644 index 000000000..625d55402 --- /dev/null +++ b/weed/storage/erasure_coding/ec_decoder_test.go @@ -0,0 +1,81 @@ +package erasure_coding_test + +import ( + "os" + "path/filepath" + "testing" + + erasure_coding "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +func TestHasLiveNeedles_AllDeletedIsFalse(t *testing.T) { + dir := t.TempDir() + + collection := "foo" + base := filepath.Join(dir, collection+"_1") + + // Build an ecx file with only deleted entries. + // ecx file entries are the same format as .idx entries. + ecx := makeNeedleMapEntry(types.NeedleId(1), types.Offset{}, types.TombstoneFileSize) + if err := os.WriteFile(base+".ecx", ecx, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles: %v", err) + } + if hasLive { + t.Fatalf("expected no live entries") + } +} + +func TestHasLiveNeedles_WithLiveEntryIsTrue(t *testing.T) { + dir := t.TempDir() + + collection := "foo" + base := filepath.Join(dir, collection+"_1") + + // Build an ecx file containing at least one live entry. + // ecx file entries are the same format as .idx entries. + live := makeNeedleMapEntry(types.NeedleId(1), types.Offset{}, types.Size(1)) + if err := os.WriteFile(base+".ecx", live, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles: %v", err) + } + if !hasLive { + t.Fatalf("expected live entries") + } +} + +func TestHasLiveNeedles_EmptyFileIsFalse(t *testing.T) { + dir := t.TempDir() + + base := filepath.Join(dir, "foo_1") + + // Create an empty ecx file. + if err := os.WriteFile(base+".ecx", []byte{}, 0644); err != nil { + t.Fatalf("write ecx: %v", err) + } + + hasLive, err := erasure_coding.HasLiveNeedles(base) + if err != nil { + t.Fatalf("HasLiveNeedles: %v", err) + } + if hasLive { + t.Fatalf("expected no live entries for empty file") + } +} + +func makeNeedleMapEntry(key types.NeedleId, offset types.Offset, size types.Size) []byte { + b := make([]byte, types.NeedleIdSize+types.OffsetSize+types.SizeSize) + types.NeedleIdToBytes(b[0:types.NeedleIdSize], key) + types.OffsetToBytes(b[types.NeedleIdSize:types.NeedleIdSize+types.OffsetSize], offset) + types.SizeToBytes(b[types.NeedleIdSize+types.OffsetSize:types.NeedleIdSize+types.OffsetSize+types.SizeSize], size) + return b +}