From b05456fe07f49e50776124f5b3315c4bd7bfef36 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Jun 2019 20:25:02 -0700 Subject: [PATCH] able to purge extra ec shard copies --- weed/server/volume_grpc_erasure_coding.go | 16 ++++- weed/shell/command_ec_balance.go | 83 ++++++++++++++++------- 2 files changed, 74 insertions(+), 25 deletions(-) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index d73cbdeeb..ab1310c4a 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -126,8 +126,20 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) - for _, shardId := range req.ShardIds { - os.Remove(baseFilename + erasure_coding.ToExt(int(shardId))) + found := false + for _, location := range vs.store.Locations { + if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) { + found = true + baseFilename = path.Join(location.Directory, baseFilename) + for _, shardId := range req.ShardIds { + os.Remove(baseFilename + erasure_coding.ToExt(int(shardId))) + } + break + } + } + + if !found { + return nil, nil } // check whether to delete the ecx file also diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 0934c79fd..664284df9 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -129,40 +129,77 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing for vid, locations := range vidLocations { - // collect all ec nodes with at least one free slot - var possibleDestinationEcNodes []*EcNode - for _, ecNode := range allEcNodes { - if ecNode.freeEcSlot > 0 { - possibleDestinationEcNodes = append(possibleDestinationEcNodes, ecNode) - } + if err := doDeduplicateEcShards(ctx, commandEnv, collection, vid, locations, applyBalancing); err != nil { + return err } - // calculate average number of shards an ec node should have for one volume - averageShardsPerEcNode := int(math.Ceil(float64(erasure_coding.TotalShardsCount) / float64(len(possibleDestinationEcNodes)))) + if err := doBalanceEcShards(ctx, commandEnv, collection, vid, locations, allEcNodes, applyBalancing); err != nil { + return err + } - fmt.Printf("vid %d averageShardsPerEcNode %+v\n", vid, averageShardsPerEcNode) + } - // check whether this volume has ecNodes that are over average - isOverLimit := false - for _, ecNode := range locations { - shardBits := findEcVolumeShards(ecNode, vid) - if shardBits.ShardIdCount() > averageShardsPerEcNode { - isOverLimit = true - fmt.Printf("vid %d %s has %d shards, isOverLimit %+v\n", vid, ecNode.info.Id, shardBits.ShardIdCount(), isOverLimit) - break - } + return nil +} + +func doBalanceEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, allEcNodes []*EcNode, applyBalancing bool) error { + // collect all ec nodes with at least one free slot + var possibleDestinationEcNodes []*EcNode + for _, ecNode := range allEcNodes { + if ecNode.freeEcSlot > 0 { + possibleDestinationEcNodes = append(possibleDestinationEcNodes, ecNode) + } + } + // calculate average number of shards an ec node should have for one volume + averageShardsPerEcNode := int(math.Ceil(float64(erasure_coding.TotalShardsCount) / float64(len(possibleDestinationEcNodes)))) + fmt.Printf("vid %d averageShardsPerEcNode %+v\n", vid, averageShardsPerEcNode) + // check whether this volume has ecNodes that are over average + isOverLimit := false + for _, ecNode := range locations { + shardBits := findEcVolumeShards(ecNode, vid) + if shardBits.ShardIdCount() > averageShardsPerEcNode { + isOverLimit = true + fmt.Printf("vid %d %s has %d shards, isOverLimit %+v\n", vid, ecNode.info.Id, shardBits.ShardIdCount(), isOverLimit) + break + } + } + if isOverLimit { + if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, collection, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil { + return err } + } + return nil +} - if isOverLimit { +func doDeduplicateEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error { - if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, collection, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil { + // check whether this volume has ecNodes that are over average + shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount) + for _, ecNode := range locations { + shardBits := findEcVolumeShards(ecNode, vid) + for _, shardId := range shardBits.ShardIds() { + shardToLocations[shardId] = append(shardToLocations[shardId], ecNode) + } + } + for shardId, ecNodes := range shardToLocations { + if len(ecNodes) <= 1 { + continue + } + sortEcNodes(ecNodes) + fmt.Printf("ec shard %d.%d has %d copies, removing from %+v\n", vid, shardId, len(ecNodes), ecNodes[1:]) + if !applyBalancing { + continue + } + for _, ecNode := range ecNodes[1:] { + duplicatedShardIds := []uint32{uint32(shardId)} + if err := unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil { + return err + } + if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil { return err } - } - } - return nil }