diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 8f04a7bf0..b903ebf45 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -35,7 +35,7 @@ func (c *commandEcDecode) Name() string { func (c *commandEcDecode) Help() string { return `decode a erasure coded volume into a normal volume - ec.decode [-collection=""] [-volumeId=] [-diskType=] + ec.decode [-collection=""] [-volumeId=] [-diskType=] [-checkMinFreeSpace] The -collection parameter supports regular expressions for pattern matching: - Use exact match: ec.decode -collection="^mybucket$" @@ -44,6 +44,7 @@ func (c *commandEcDecode) Help() string { Options: -diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd) + -checkMinFreeSpace: check min free space when selecting the decode target (default true) Examples: # Decode EC shards from HDD (default) @@ -64,6 +65,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr volumeId := decodeCommand.Int("volumeId", 0, "the volume id") collection := decodeCommand.String("collection", "", "the collection name") diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)") + checkMinFreeSpace := decodeCommand.Bool("checkMinFreeSpace", true, "check min free space when selecting the decode target") if err = decodeCommand.Parse(args); err != nil { return nil } @@ -80,10 +82,14 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr if err != nil { return err } + var diskUsageState *decodeDiskUsageState + if *checkMinFreeSpace { + diskUsageState = newDecodeDiskUsageState(topologyInfo, diskType) + } // volumeId is provided if vid != 0 { - return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType) + return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState) } // apply to all volumes in the collection @@ -93,7 +99,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr } fmt.Printf("ec decode volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType); err != nil { + if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState); err != nil { return err } } @@ -101,7 +107,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } -func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType) (err error) { +func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType, checkMinFreeSpace bool, diskUsageState *decodeDiskUsageState) (err error) { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") @@ -112,8 +118,36 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcShardsInfo) + if len(nodeToEcShardsInfo) == 0 { + return fmt.Errorf("no EC shards found for volume %d (diskType %s)", vid, diskType.ReadableString()) + } + + var originalShardCounts map[pb.ServerAddress]int + if diskUsageState != nil { + originalShardCounts = make(map[pb.ServerAddress]int, len(nodeToEcShardsInfo)) + for location, si := range nodeToEcShardsInfo { + originalShardCounts[location] = si.Count() + } + } + + var eligibleTargets map[pb.ServerAddress]struct{} + if checkMinFreeSpace { + if diskUsageState == nil { + return fmt.Errorf("min free space checking requires disk usage state") + } + eligibleTargets = make(map[pb.ServerAddress]struct{}) + for location := range nodeToEcShardsInfo { + if freeCount, found := diskUsageState.freeVolumeCount(location); found && freeCount > 0 { + eligibleTargets[location] = struct{}{} + } + } + if len(eligibleTargets) == 0 { + return fmt.Errorf("no eligible target datanodes with free volume slots for volume %d (diskType %s); use -checkMinFreeSpace=false to override", vid, diskType.ReadableString()) + } + } + // collect ec shards to the server with most space - targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcShardsInfo, collection, vid) + targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcShardsInfo, collection, vid, eligibleTargets) if err != nil { return fmt.Errorf("collectEcShards for volume %d: %v", vid, err) } @@ -124,7 +158,13 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec // Special case: if the EC index has no live entries, decoding is a no-op. // Just purge EC shards and return success without generating/mounting an empty volume. if isEcDecodeEmptyVolumeErr(err) { - return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid) + if err := unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid); err != nil { + return err + } + if diskUsageState != nil { + diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, false) + } + return nil } return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) } @@ -134,6 +174,9 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec if err != nil { return fmt.Errorf("delete ec shards for volume %d: %v", vid, err) } + if diskUsageState != nil { + diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, true) + } return nil } @@ -207,11 +250,16 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c } -func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) { +func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, collection string, vid needle.VolumeId, eligibleTargets map[pb.ServerAddress]struct{}) (targetNodeLocation pb.ServerAddress, err error) { - maxShardCount := 0 + maxShardCount := -1 existingShardsInfo := erasure_coding.NewShardsInfo() for loc, si := range nodeToShardsInfo { + if eligibleTargets != nil { + if _, ok := eligibleTargets[loc]; !ok { + continue + } + } toBeCopiedShardCount := si.MinusParityShards().Count() if toBeCopiedShardCount > maxShardCount { maxShardCount = toBeCopiedShardCount @@ -219,6 +267,9 @@ func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddre existingShardsInfo = si } } + if targetNodeLocation == "" { + return "", fmt.Errorf("no eligible target datanodes available to decode volume %d", vid) + } fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToShardsInfo) @@ -326,3 +377,58 @@ func collectEcNodeShardsInfo(topoInfo *master_pb.TopologyInfo, vid needle.Volume return res } + +type decodeDiskUsageState struct { + byNode map[pb.ServerAddress]*decodeDiskUsageCounts +} + +type decodeDiskUsageCounts struct { + maxVolumeCount int64 + volumeCount int64 + remoteVolumeCount int64 + ecShardCount int64 +} + +func newDecodeDiskUsageState(topoInfo *master_pb.TopologyInfo, diskType types.DiskType) *decodeDiskUsageState { + state := &decodeDiskUsageState{byNode: make(map[pb.ServerAddress]*decodeDiskUsageCounts)} + eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) { + if diskInfo, found := dn.DiskInfos[string(diskType)]; found { + state.byNode[pb.NewServerAddressFromDataNode(dn)] = &decodeDiskUsageCounts{ + maxVolumeCount: diskInfo.MaxVolumeCount, + volumeCount: diskInfo.VolumeCount, + remoteVolumeCount: diskInfo.RemoteVolumeCount, + ecShardCount: int64(countShards(diskInfo.EcShardInfos)), + } + } + }) + return state +} + +func (state *decodeDiskUsageState) freeVolumeCount(location pb.ServerAddress) (int64, bool) { + if state == nil { + return 0, false + } + usage, found := state.byNode[location] + if !found { + return 0, false + } + free := usage.maxVolumeCount - (usage.volumeCount - usage.remoteVolumeCount) + free -= (usage.ecShardCount + int64(erasure_coding.DataShardsCount) - 1) / int64(erasure_coding.DataShardsCount) + return free, true +} + +func (state *decodeDiskUsageState) applyDecode(targetNodeLocation pb.ServerAddress, shardCounts map[pb.ServerAddress]int, createdVolume bool) { + if state == nil { + return + } + for location, shardCount := range shardCounts { + if usage, found := state.byNode[location]; found { + usage.ecShardCount -= int64(shardCount) + } + } + if createdVolume { + if usage, found := state.byNode[targetNodeLocation]; found { + usage.volumeCount++ + } + } +}