Browse Source

fix(ec.decode): purge EC shards when volume is empty (#7749)

* fix(ec.decode): purge EC shards when volume is empty

When an EC volume has no live entries (all deleted), ec.decode should not generate an empty normal volume. Instead, treat decode as a no-op and allow shard purge to proceed cleanly.

Fixes: #7748

* chore: address PR review comments

* test: cover live EC index + avoid magic string

* chore: harden empty-EC handling

- Make shard cleanup best-effort (collect errors)
- Remove unreachable EOF handling in HasLiveNeedles
- Add empty ecx test case
- Share no-live-entries substring between server/client

* perf: parallelize EC shard unmount/delete across locations

* refactor: combine unmount+delete into single goroutine per location

* refactor: use errors.Join for multi-error aggregation

* refactor: use existing ErrorWaitGroup for parallel execution

* fix: capture loop variables + clarify SuperBlockSize safety
pull/7754/head
Chris Lu 4 days ago
committed by GitHub
parent
commit
7ed7578424
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 14
      weed/server/volume_grpc_erasure_coding.go
  2. 65
      weed/shell/command_ec_decode.go
  3. 22
      weed/storage/erasure_coding/ec_decoder.go
  4. 81
      weed/storage/erasure_coding/ec_decoder_test.go

14
weed/server/volume_grpc_erasure_coding.go

@ -20,6 +20,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info" "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
/* /*
@ -506,6 +509,17 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
} }
dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName() dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
// If the EC index contains no live entries, decoding should be a no-op:
// just allow the caller to purge EC shards and do not generate an empty normal volume.
hasLive, err := erasure_coding.HasLiveNeedles(indexBaseFileName)
if err != nil {
return nil, fmt.Errorf("HasLiveNeedles %s: %w", indexBaseFileName, err)
}
if !hasLive {
return nil, status.Errorf(codes.FailedPrecondition, "ec volume %d %s", req.VolumeId, erasure_coding.EcNoLiveEntriesSubstring)
}
// calculate .dat file size // calculate .dat file size
datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName) datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
if err != nil { if err != nil {

65
weed/shell/command_ec_decode.go

@ -5,11 +5,14 @@ import (
"flag" "flag"
"fmt" "fmt"
"io" "io"
"strings"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@ -118,6 +121,11 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
// generate a normal volume // generate a normal volume
err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation) err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation)
if err != nil { if err != nil {
// Special case: if the EC index has no live entries, decoding is a no-op.
// Just purge EC shards and return success without generating/mounting an empty volume.
if isEcDecodeEmptyVolumeErr(err) {
return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcIndexBits, vid)
}
return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
} }
@ -130,6 +138,44 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
return nil return nil
} }
// isEcDecodeEmptyVolumeErr reports whether err is a gRPC status error with code
// FailedPrecondition whose message contains erasure_coding.EcNoLiveEntriesSubstring.
// doEcDecode uses it to recognize the "no live entries" outcome of generating a
// normal volume, in which case it purges the EC shards instead of failing.
func isEcDecodeEmptyVolumeErr(err error) bool {
	st, isStatus := status.FromError(err)
	if !isStatus || st.Code() != codes.FailedPrecondition {
		return false
	}
	// Match on the shared substring, not the whole message, so the surrounding
	// wording can change without breaking detection.
	return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring)
}
// unmountAndDeleteEcShards unmounts and deletes the EC shards of volume vid on every
// location listed in nodeToEcIndexBits. It delegates to
// unmountAndDeleteEcShardsWithPrefix, labelling errors with this function's name.
func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
	const errPrefix = "unmountAndDeleteEcShards"
	return unmountAndDeleteEcShardsWithPrefix(errPrefix, grpcDialOption, collection, nodeToEcIndexBits, vid)
}
// unmountAndDeleteEcShardsWithPrefix unmounts and then deletes the EC shards of volume
// vid on each location in nodeToEcIndexBits. One task per location is submitted to an
// ErrorWaitGroup sized to the number of locations; the function returns whatever that
// group's Wait reports.
//
// prefix is prepended to every error message so callers can tell which entry point
// triggered the cleanup. Within a single location, a failed unmount skips the delete
// for that location.
func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
	// purgeOn builds the task for one location. Taking node and bits as parameters
	// gives every task its own copies, independent of the range loop variables.
	purgeOn := func(node pb.ServerAddress, bits erasure_coding.ShardBits) func() error {
		return func() error {
			fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, node, bits.ShardIds())
			unmountErr := unmountEcShards(grpcDialOption, vid, node, bits.ToUint32Slice())
			if unmountErr != nil {
				return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, node, unmountErr)
			}

			fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, node, bits.ShardIds())
			deleteErr := sourceServerDeleteEcShards(grpcDialOption, collection, vid, node, bits.ToUint32Slice())
			if deleteErr != nil {
				return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, node, deleteErr)
			}
			return nil
		}
	}

	tasks := NewErrorWaitGroup(len(nodeToEcIndexBits))
	for location, ecIndexBits := range nodeToEcIndexBits {
		tasks.Add(purgeOn(location, ecIndexBits))
	}
	return tasks.Wait()
}
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume // mount volume
@ -142,24 +188,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection str
return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err) return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
} }
// unmount ec shards
for location, ecIndexBits := range nodeToEcIndexBits {
fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
if err != nil {
return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
}
}
// delete ec shards
for location, ecIndexBits := range nodeToEcIndexBits {
fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
if err != nil {
return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
}
}
return nil
return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid)
} }
func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error { func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {

22
weed/storage/erasure_coding/ec_decoder.go

@ -14,6 +14,23 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
) )
// EcNoLiveEntriesSubstring is the message fragment used for server/client coordination
// when ec.decode determines that decoding should be a no-op (all entries are deleted).
// The volume server embeds it in the error message it returns, and the shell command
// detects the condition by searching the error message for it, so both sides must
// reference this constant rather than a copy of the text.
const EcNoLiveEntriesSubstring = "has no live entries"
// HasLiveNeedles reports whether the EC index file (.ecx) for indexBaseFileName holds
// at least one entry that is not marked deleted. ec.decode relies on it to avoid
// generating an empty normal volume when every entry was deleted.
//
// err is whatever iterateEcxFile returns for the scan.
func HasLiveNeedles(indexBaseFileName string) (hasLive bool, err error) {
	err = iterateEcxFile(indexBaseFileName, func(_ types.NeedleId, _ types.Offset, size types.Size) error {
		if size.IsDeleted() {
			// Deleted entry: keep scanning.
			return nil
		}
		hasLive = true
		// io.EOF is used as the "stop early" signal; one live entry is enough.
		// NOTE(review): relies on iterateEcxFile not surfacing this io.EOF to the caller — confirm.
		return io.EOF
	})
	return hasLive, err
}
// write .idx file from .ecx and .ecj files // write .idx file from .ecx and .ecj files
func WriteIdxFileFromEcIndex(baseFileName string) (err error) { func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
@ -52,6 +69,11 @@ func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64,
return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err) return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err)
} }
// Safety: ensure datSize is at least SuperBlockSize. While the caller typically
// checks HasLiveNeedles first, this protects against direct calls to FindDatFileSize
// when all needles are deleted (see issue #7748).
datSize = int64(super_block.SuperBlockSize)
err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error { err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
if size.IsDeleted() { if size.IsDeleted() {

81
weed/storage/erasure_coding/ec_decoder_test.go

@ -0,0 +1,81 @@
package erasure_coding_test
import (
"os"
"path/filepath"
"testing"
erasure_coding "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
// TestHasLiveNeedles_AllDeletedIsFalse checks that an .ecx index whose only entry is a
// tombstone is reported as having no live needles.
func TestHasLiveNeedles_AllDeletedIsFalse(t *testing.T) {
	base := filepath.Join(t.TempDir(), "foo_1")

	// .ecx entries use the same layout as .idx entries; write a single tombstoned one.
	tombstone := makeNeedleMapEntry(types.NeedleId(1), types.Offset{}, types.TombstoneFileSize)
	if err := os.WriteFile(base+".ecx", tombstone, 0644); err != nil {
		t.Fatalf("write ecx: %v", err)
	}

	hasLive, err := erasure_coding.HasLiveNeedles(base)
	switch {
	case err != nil:
		t.Fatalf("HasLiveNeedles: %v", err)
	case hasLive:
		t.Fatalf("expected no live entries")
	}
}
// TestHasLiveNeedles_WithLiveEntryIsTrue checks that an .ecx index containing a
// non-deleted entry is reported as having live needles, with no error.
func TestHasLiveNeedles_WithLiveEntryIsTrue(t *testing.T) {
	base := filepath.Join(t.TempDir(), "foo_1")

	// .ecx entries use the same layout as .idx entries; write one entry of size 1.
	liveEntry := makeNeedleMapEntry(types.NeedleId(1), types.Offset{}, types.Size(1))
	if err := os.WriteFile(base+".ecx", liveEntry, 0644); err != nil {
		t.Fatalf("write ecx: %v", err)
	}

	hasLive, err := erasure_coding.HasLiveNeedles(base)
	switch {
	case err != nil:
		t.Fatalf("HasLiveNeedles: %v", err)
	case !hasLive:
		t.Fatalf("expected live entries")
	}
}
// TestHasLiveNeedles_EmptyFileIsFalse checks that a zero-length .ecx index is reported
// as having no live needles, with no error.
func TestHasLiveNeedles_EmptyFileIsFalse(t *testing.T) {
	base := filepath.Join(t.TempDir(), "foo_1")

	// Zero-length index: there are no entries to scan.
	if err := os.WriteFile(base+".ecx", []byte{}, 0644); err != nil {
		t.Fatalf("write ecx: %v", err)
	}

	hasLive, err := erasure_coding.HasLiveNeedles(base)
	switch {
	case err != nil:
		t.Fatalf("HasLiveNeedles: %v", err)
	case hasLive:
		t.Fatalf("expected no live entries for empty file")
	}
}
// makeNeedleMapEntry serializes one index entry as needle id, then offset, then size,
// laid out back to back, and returns the resulting bytes.
func makeNeedleMapEntry(key types.NeedleId, offset types.Offset, size types.Size) []byte {
	// Field boundaries within the entry.
	offsetStart := types.NeedleIdSize
	sizeStart := offsetStart + types.OffsetSize
	entryEnd := sizeStart + types.SizeSize

	entry := make([]byte, entryEnd)
	types.NeedleIdToBytes(entry[0:offsetStart], key)
	types.OffsetToBytes(entry[offsetStart:sizeStart], offset)
	types.SizeToBytes(entry[sizeStart:entryEnd], size)
	return entry
}
Loading…
Cancel
Save