package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"sort"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

func init() {
	Commands = append(Commands, &commandEcDecode{})
}

// commandEcDecode implements the "ec.decode" shell command, which converts
// erasure coded volumes back into normal volumes.
type commandEcDecode struct {
}

func (c *commandEcDecode) Name() string {
	return "ec.decode"
}

func (c *commandEcDecode) Help() string {
	return `decode an erasure coded volume into a normal volume

	ec.decode [-collection=""] [-volumeId=<volume_id>] [-diskType=<disk_type>] [-checkMinFreeSpace]

	The -collection parameter supports regular expressions for pattern matching:
	  - Use exact match: ec.decode -collection="^mybucket$"
	  - Match multiple buckets: ec.decode -collection="bucket.*"
	  - Match all collections: ec.decode -collection=".*"

	Options:
	  -diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)
	  -checkMinFreeSpace: check min free space when selecting the decode target (default true)

	Examples:
	  # Decode EC shards from HDD (default)
	  ec.decode -collection=mybucket

	  # Decode EC shards from SSD
	  ec.decode -collection=mybucket -diskType=ssd

`
}

func (c *commandEcDecode) HasTag(CommandTag) bool {
	return false
}

// Do parses the flags, requires the shell lock, snapshots the cluster topology
// and then decodes either the single -volumeId or every EC volume whose
// collection matches the -collection pattern on the selected disk type.
func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
	collection := decodeCommand.String("collection", "", "the collection name")
	diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)")
	checkMinFreeSpace := decodeCommand.Bool("checkMinFreeSpace", true, "check min free space when selecting the decode target")
	if err = decodeCommand.Parse(args); err != nil {
		// With ContinueOnError the flag set has already reported the problem;
		// returning nil keeps the shell session alive.
		return nil
	}

	if err = commandEnv.confirmIsLocked(args); err != nil {
		return
	}

	vid := needle.VolumeId(*volumeId)
	diskType := types.ToDiskType(*diskTypeStr)

	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return err
	}

	// Shared across all decodes in this run so each target choice sees the
	// usage left by the previous decodes.
	var diskUsageState *decodeDiskUsageState
	if *checkMinFreeSpace {
		diskUsageState = newDecodeDiskUsageState(topologyInfo, diskType)
	}

	// volumeId is provided
	if vid != 0 {
		return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState)
	}

	// apply to all volumes in the collection
	volumeIds, err := collectEcShardIds(topologyInfo, *collection, diskType)
	if err != nil {
		return err
	}
	fmt.Printf("ec decode volumes: %v\n", volumeIds)
	for _, vid := range volumeIds {
		if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState); err != nil {
			return err
		}
	}

	return nil
}

// doEcDecode converts the EC volume vid back into a normal volume.
//
// It gathers the data shards onto one target node. When checkMinFreeSpace is
// set, only nodes with free volume slots are considered as targets. It then
// asks that node to rebuild the normal volume, mounts the volume, and unmounts
// and deletes every EC shard of vid found in the topology. If diskUsageState is
// non-nil, it is updated afterwards.
//
// collection is only a fallback. The -collection flag may be a regular
// expression, so the collection recorded in the topology for vid is preferred.
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType, checkMinFreeSpace bool, diskUsageState *decodeDiskUsageState) (err error) {

	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	// find volume location
	nodeToEcShardsInfo := collectEcNodeShardsInfo(topoInfo, vid, diskType)

	fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcShardsInfo)

	if len(nodeToEcShardsInfo) == 0 {
		return fmt.Errorf("no EC shards found for volume %d (diskType %s)", vid, diskType.ReadableString())
	}

	// The RPCs below send the collection verbatim. A regex pattern from the
	// command line would not name this volume's collection, so use the one
	// recorded in the topology.
	if actualCollection, found := ecDecodeVolumeCollection(topoInfo, vid, diskType); found {
		collection = actualCollection
	}

	// Snapshot per-node shard counts now: collectEcShards later rewrites the
	// target's entry in nodeToEcShardsInfo to include the copied shards.
	var originalShardCounts map[pb.ServerAddress]int
	if diskUsageState != nil {
		originalShardCounts = make(map[pb.ServerAddress]int, len(nodeToEcShardsInfo))
		for location, si := range nodeToEcShardsInfo {
			originalShardCounts[location] = si.Count()
		}
	}

	var eligibleTargets map[pb.ServerAddress]struct{}
	if checkMinFreeSpace {
		if diskUsageState == nil {
			return fmt.Errorf("min free space checking requires disk usage state")
		}
		eligibleTargets = make(map[pb.ServerAddress]struct{})
		for location := range nodeToEcShardsInfo {
			if freeCount, found := diskUsageState.freeVolumeCount(location); found && freeCount > 0 {
				eligibleTargets[location] = struct{}{}
			}
		}
		if len(eligibleTargets) == 0 {
			return fmt.Errorf("no eligible target datanodes with free volume slots for volume %d (diskType %s); use -checkMinFreeSpace=false to override", vid, diskType.ReadableString())
		}
	}

	// collect ec shards to the server with most space
	targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcShardsInfo, collection, vid, eligibleTargets)
	if err != nil {
		return fmt.Errorf("collectEcShards for volume %d: %w", vid, err)
	}

	// generate a normal volume
	err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation)
	if err != nil {
		// Special case: if the EC index has no live entries, decoding is a no-op.
		// Just purge EC shards and return success without generating/mounting an empty volume.
		if isEcDecodeEmptyVolumeErr(err) {
			if err := unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid); err != nil {
				return fmt.Errorf("delete ec shards for empty volume %d: %w", vid, err)
			}
			if diskUsageState != nil {
				diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, false)
			}
			return nil
		}
		return fmt.Errorf("generate normal volume %d on %s: %w", vid, targetNodeLocation, err)
	}

	// delete the previous ec shards
	err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcShardsInfo, vid)
	if err != nil {
		return fmt.Errorf("delete ec shards for volume %d: %w", vid, err)
	}
	if diskUsageState != nil {
		diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, true)
	}

	return nil
}

// ecDecodeVolumeCollection returns the collection reported in the topology for
// the EC shards of vid on diskType. found is false if no such shard is present.
func ecDecodeVolumeCollection(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) (collection string, found bool) {
	eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if found {
			return
		}
		diskInfo, ok := dn.DiskInfos[string(diskType)]
		if !ok {
			return
		}
		for _, v := range diskInfo.EcShardInfos {
			if v.Id == uint32(vid) {
				collection, found = v.Collection, true
				return
			}
		}
	})
	return collection, found
}

// isEcDecodeEmptyVolumeErr reports whether err is the FailedPrecondition gRPC
// status whose message contains erasure_coding.EcNoLiveEntriesSubstring, i.e.
// the EC volume had no live entries to decode.
func isEcDecodeEmptyVolumeErr(err error) bool {
	st, ok := status.FromError(err)
	if !ok {
		return false
	}
	if st.Code() != codes.FailedPrecondition {
		return false
	}
	// Keep this robust against wording tweaks while still being specific.
	return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring)
}

// unmountAndDeleteEcShards unmounts and deletes, on every listed node, the EC
// shards of vid recorded for that node.
func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
	return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid)
}

// unmountAndDeleteEcShardsWithPrefix runs one goroutine per node that first
// unmounts and then deletes that node's shards of vid. prefix is prepended to
// error messages. It waits for all goroutines via the error wait group.
func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
	ewg := NewErrorWaitGroup(len(nodeToShardsInfo))

	// unmount and delete ec shards in parallel (one goroutine per location)
	for location, si := range nodeToShardsInfo {
		location, si := location, si // capture loop variables for goroutine
		ewg.Add(func() error {
			fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, si.Ids())
			if err := unmountEcShards(grpcDialOption, vid, location, si.Ids()); err != nil {
				return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err)
			}

			fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, si.Ids())
			if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, si.Ids()); err != nil {
				return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err)
			}
			return nil
		})
	}
	return ewg.Wait()
}

// mountVolumeAndDeleteEcShards mounts the freshly generated normal volume vid on
// targetNodeLocation, then unmounts and deletes all EC shards of vid.
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {

	// mount volume
	if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
			VolumeId: uint32(vid),
		})
		return mountErr
	}); err != nil {
		return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %w", vid, targetNodeLocation, err)
	}

	return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid)
}

// generateNormalVolume asks sourceVolumeServer to rebuild the normal volume vid
// from the EC shards it holds (VolumeEcShardsToVolume). The gRPC error is
// returned unwrapped so callers can inspect its status.
func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {

	fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)

	return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
			VolumeId:   uint32(vid),
			Collection: collection,
		})
		return genErr
	})
}

// collectEcShards picks as target the node, among eligibleTargets (all nodes
// when nil), that already holds the most data shards of vid. It then copies and
// mounts onto it every missing data shard from the other nodes.
//
// On return, nodeToShardsInfo[target] is replaced by the target's original
// shards plus the copied ones, so later cleanup also removes the copies.
func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, collection string, vid needle.VolumeId, eligibleTargets map[pb.ServerAddress]struct{}) (targetNodeLocation pb.ServerAddress, err error) {

	maxDataShardCount := -1
	existingShardsInfo := erasure_coding.NewShardsInfo()
	for loc, si := range nodeToShardsInfo {
		if eligibleTargets != nil {
			if _, ok := eligibleTargets[loc]; !ok {
				continue
			}
		}
		dataShardCount := si.MinusParityShards().Count()
		if dataShardCount > maxDataShardCount {
			maxDataShardCount = dataShardCount
			targetNodeLocation = loc
			existingShardsInfo = si
		}
	}
	if targetNodeLocation == "" {
		return "", fmt.Errorf("no eligible target datanodes available to decode volume %d", vid)
	}

	fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToShardsInfo)

	copiedShardsInfo := erasure_coding.NewShardsInfo()
	for loc, si := range nodeToShardsInfo {
		if loc == targetNodeLocation {
			continue
		}

		// Exclude shards the target already has or has already received from
		// another node, so a shard present on several nodes is copied only once.
		needToCopyShardsInfo := si.Minus(existingShardsInfo.Plus(copiedShardsInfo)).MinusParityShards()
		if needToCopyShardsInfo.Count() == 0 {
			continue
		}

		err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

			fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation)

			_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(vid),
				Collection:     collection,
				ShardIds:       needToCopyShardsInfo.IdsUint32(),
				CopyEcxFile:    false,
				CopyEcjFile:    true,
				CopyVifFile:    true,
				SourceDataNode: string(loc),
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s: %w", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation, copyErr)
			}

			fmt.Printf("mount %d.%v on %s\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation)
			_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
				VolumeId:   uint32(vid),
				Collection: collection,
				ShardIds:   needToCopyShardsInfo.IdsUint32(),
			})
			if mountErr != nil {
				return fmt.Errorf("mount %d.%v on %s: %w", vid, needToCopyShardsInfo.Ids(), targetNodeLocation, mountErr)
			}

			return nil
		})

		if err != nil {
			break
		}

		copiedShardsInfo.Add(needToCopyShardsInfo)
	}

	nodeToShardsInfo[targetNodeLocation] = existingShardsInfo.Plus(copiedShardsInfo)

	return targetNodeLocation, err
}

// lookupVolumeIds asks the master for the locations of the given volume or
// file ids.
func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
	var resp *master_pb.LookupVolumeResponse
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		var lookupErr error
		resp, lookupErr = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
		return lookupErr
	})
	if err != nil {
		return nil, err
	}
	return resp.VolumeIdLocations, nil
}

// collectEcShardIds returns the ids of all EC volumes on diskType whose
// collection matches collectionPattern (a regular expression). Ids are sorted
// ascending and each appears once.
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string, diskType types.DiskType) (vids []needle.VolumeId, err error) {
	// compile regex pattern for collection matching
	collectionRegex, err := compileCollectionPattern(collectionPattern)
	if err != nil {
		return nil, fmt.Errorf("invalid collection pattern '%s': %w", collectionPattern, err)
	}

	vidSet := make(map[uint32]struct{})
	eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
			for _, v := range diskInfo.EcShardInfos {
				if collectionRegex.MatchString(v.Collection) {
					vidSet[v.Id] = struct{}{}
				}
			}
		}
	})

	for vid := range vidSet {
		vids = append(vids, needle.VolumeId(vid))
	}
	// Map iteration order is random; sort so volumes are processed and
	// reported in a stable order.
	sort.Slice(vids, func(i, j int) bool { return vids[i] < vids[j] })

	return vids, nil
}

// collectEcNodeShardsInfo maps each data node that holds EC shards of vid on
// diskType to the shards it holds.
func collectEcNodeShardsInfo(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]*erasure_coding.ShardsInfo {
	res := make(map[pb.ServerAddress]*erasure_coding.ShardsInfo)
	eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
			for _, v := range diskInfo.EcShardInfos {
				if v.Id == uint32(vid) {
					res[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(v)
				}
			}
		}
	})

	return res
}

// decodeDiskUsageState keeps per-node disk usage for one disk type, seeded from
// the topology snapshot and adjusted after each decode via applyDecode.
type decodeDiskUsageState struct {
	byNode map[pb.ServerAddress]*decodeDiskUsageCounts
}

// decodeDiskUsageCounts holds the usage counters of one node's disk.
type decodeDiskUsageCounts struct {
	maxVolumeCount    int64
	volumeCount       int64
	remoteVolumeCount int64
	ecShardCount      int64
}

// newDecodeDiskUsageState builds usage counters for every data node that has a
// disk of diskType in topoInfo.
func newDecodeDiskUsageState(topoInfo *master_pb.TopologyInfo, diskType types.DiskType) *decodeDiskUsageState {
	state := &decodeDiskUsageState{byNode: make(map[pb.ServerAddress]*decodeDiskUsageCounts)}
	eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
			state.byNode[pb.NewServerAddressFromDataNode(dn)] = &decodeDiskUsageCounts{
				maxVolumeCount:    diskInfo.MaxVolumeCount,
				volumeCount:       diskInfo.VolumeCount,
				remoteVolumeCount: diskInfo.RemoteVolumeCount,
				ecShardCount:      int64(countShards(diskInfo.EcShardInfos)),
			}
		}
	})
	return state
}

// freeVolumeCount returns the free volume slots at location.
//
// The count is the max volume count minus the local (non-remote) volumes. EC
// shards are then charged as whole volume slots, DataShardsCount shards per
// slot, rounded up. The bool is false if the node is unknown or state is nil.
func (state *decodeDiskUsageState) freeVolumeCount(location pb.ServerAddress) (int64, bool) {
	if state == nil {
		return 0, false
	}
	usage, found := state.byNode[location]
	if !found {
		return 0, false
	}
	free := usage.maxVolumeCount - (usage.volumeCount - usage.remoteVolumeCount)
	free -= (usage.ecShardCount + int64(erasure_coding.DataShardsCount) - 1) / int64(erasure_coding.DataShardsCount)
	return free, true
}

// applyDecode records the outcome of one decode. It removes the shard counts
// that were deleted from each node and, if createdVolume is true, adds the new
// normal volume to targetNodeLocation.
func (state *decodeDiskUsageState) applyDecode(targetNodeLocation pb.ServerAddress, shardCounts map[pb.ServerAddress]int, createdVolume bool) {
	if state == nil {
		return
	}
	for location, shardCount := range shardCounts {
		if usage, found := state.byNode[location]; found {
			usage.ecShardCount -= int64(shardCount)
		}
	}
	if createdVolume {
		if usage, found := state.byNode[targetNodeLocation]; found {
			usage.volumeCount++
		}
	}
}