diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index ebdd95b71..499196e8a 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -4,10 +4,11 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/storage/types" "io" "time" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" @@ -115,6 +116,14 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr balanceCollections = []string{*collection} } + // Collect volume locations BEFORE EC encoding starts to avoid race condition + // where the master metadata is updated after EC encoding but before deletion + fmt.Printf("Collecting volume locations for %d volumes before EC encoding...\n", len(volumeIds)) + volumeLocationsMap, err := volumeLocations(commandEnv, volumeIds) + if err != nil { + return fmt.Errorf("failed to collect volume locations before EC encoding: %v", err) + } + // encode all requested volumes... if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil { return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err) @@ -123,10 +132,12 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil { return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err) } - // ...then delete original volumes. - if err := doDeleteVolumes(commandEnv, volumeIds, *maxParallelization); err != nil { - return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err) + // ...then delete original volumes using pre-collected locations. + fmt.Printf("Deleting original volumes after EC encoding...\n") + if err := doDeleteVolumesWithLocations(commandEnv, volumeIds, volumeLocationsMap, *maxParallelization); err != nil { + return fmt.Errorf("delete original volumes after EC encoding: %v", err) } + fmt.Printf("Successfully completed EC encoding for %d volumes\n", len(volumeIds)) return nil } @@ -150,7 +161,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo } locations, err := volumeLocations(commandEnv, volumeIds) if err != nil { - return nil + return fmt.Errorf("failed to get volume locations for EC encoding: %v", err) } // mark volumes as readonly @@ -207,18 +218,22 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo return nil } -func doDeleteVolumes(commandEnv *CommandEnv, volumeIds []needle.VolumeId, maxParallelization int) error { +// doDeleteVolumesWithLocations deletes volumes using pre-collected location information +// This avoids race conditions where master metadata is updated after EC encoding +func doDeleteVolumesWithLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId, volumeLocationsMap map[needle.VolumeId][]wdclient.Location, maxParallelization int) error { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") } - locations, err := volumeLocations(commandEnv, volumeIds) - if err != nil { - return nil - } ewg := NewErrorWaitGroup(maxParallelization) for _, vid := range volumeIds { - for _, l := range locations[vid] { + locations, found := volumeLocationsMap[vid] + if !found { + fmt.Printf("warning: no locations found for volume %d, skipping deletion\n", vid) + continue + } + + for _, l := range locations { ewg.Add(func() error { if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil { return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)