You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
434 lines
15 KiB
434 lines
15 KiB
package shell
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
)
|
|
|
|
// init registers the ec.decode command with the shell's global command list.
func init() {
	Commands = append(Commands, &commandEcDecode{})
}
|
|
|
|
// commandEcDecode implements the ec.decode shell command, which converts an
// erasure-coded volume back into a normal (replicated) volume.
type commandEcDecode struct {
}
|
|
|
|
// Name returns the shell command name used to invoke this command.
func (c *commandEcDecode) Name() string {
	return "ec.decode"
}
|
|
|
|
// Help returns the usage text shown by the shell's help command.
func (c *commandEcDecode) Help() string {
	return `decode a erasure coded volume into a normal volume

	ec.decode [-collection=""] [-volumeId=<volume_id>] [-diskType=<disk_type>] [-checkMinFreeSpace]

	The -collection parameter supports regular expressions for pattern matching:
	- Use exact match: ec.decode -collection="^mybucket$"
	- Match multiple buckets: ec.decode -collection="bucket.*"
	- Match all collections: ec.decode -collection=".*"

	Options:
	-diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)
	-checkMinFreeSpace: check min free space when selecting the decode target (default true)

	Examples:
	# Decode EC shards from HDD (default)
	ec.decode -collection=mybucket

	# Decode EC shards from SSD
	ec.decode -collection=mybucket -diskType=ssd

`
}
|
|
|
|
// HasTag reports whether this command carries the given tag; ec.decode has none.
func (c *commandEcDecode) HasTag(CommandTag) bool {
	return false
}
|
|
|
|
func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
|
decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
|
volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
|
|
collection := decodeCommand.String("collection", "", "the collection name")
|
|
diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)")
|
|
checkMinFreeSpace := decodeCommand.Bool("checkMinFreeSpace", true, "check min free space when selecting the decode target")
|
|
if err = decodeCommand.Parse(args); err != nil {
|
|
return nil
|
|
}
|
|
|
|
if err = commandEnv.confirmIsLocked(args); err != nil {
|
|
return
|
|
}
|
|
|
|
vid := needle.VolumeId(*volumeId)
|
|
diskType := types.ToDiskType(*diskTypeStr)
|
|
|
|
// collect topology information
|
|
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var diskUsageState *decodeDiskUsageState
|
|
if *checkMinFreeSpace {
|
|
diskUsageState = newDecodeDiskUsageState(topologyInfo, diskType)
|
|
}
|
|
|
|
// volumeId is provided
|
|
if vid != 0 {
|
|
return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState)
|
|
}
|
|
|
|
// apply to all volumes in the collection
|
|
volumeIds, err := collectEcShardIds(topologyInfo, *collection, diskType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fmt.Printf("ec decode volumes: %v\n", volumeIds)
|
|
for _, vid := range volumeIds {
|
|
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// doEcDecode decodes one EC volume back into a normal volume:
//  1. find every server holding shards of vid (on diskType),
//  2. optionally restrict the copy target to servers with free volume slots,
//  3. collect all shards onto a single target server,
//  4. regenerate the normal volume there, and
//  5. mount it and purge the now-obsolete EC shards everywhere.
//
// diskUsageState may be nil when free-space checking is disabled; when
// non-nil it is updated in place to reflect the decode's effect, so repeated
// calls across many volumes see cumulative usage.
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType, checkMinFreeSpace bool, diskUsageState *decodeDiskUsageState) (err error) {

	// The exclusive shell lock must still be held; abort if it was lost.
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	// find volume location
	nodeToEcShardsInfo := collectEcNodeShardsInfo(topoInfo, vid, diskType)

	fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcShardsInfo)

	if len(nodeToEcShardsInfo) == 0 {
		return fmt.Errorf("no EC shards found for volume %d (diskType %s)", vid, diskType.ReadableString())
	}

	// Snapshot per-server shard counts before any copying mutates
	// nodeToEcShardsInfo, so applyDecode later subtracts the right amounts.
	var originalShardCounts map[pb.ServerAddress]int
	if diskUsageState != nil {
		originalShardCounts = make(map[pb.ServerAddress]int, len(nodeToEcShardsInfo))
		for location, si := range nodeToEcShardsInfo {
			originalShardCounts[location] = si.Count()
		}
	}

	// When free-space checking is on, only servers with at least one free
	// volume slot may become the decode target.
	var eligibleTargets map[pb.ServerAddress]struct{}
	if checkMinFreeSpace {
		if diskUsageState == nil {
			return fmt.Errorf("min free space checking requires disk usage state")
		}
		eligibleTargets = make(map[pb.ServerAddress]struct{})
		for location := range nodeToEcShardsInfo {
			if freeCount, found := diskUsageState.freeVolumeCount(location); found && freeCount > 0 {
				eligibleTargets[location] = struct{}{}
			}
		}
		if len(eligibleTargets) == 0 {
			return fmt.Errorf("no eligible target datanodes with free volume slots for volume %d (diskType %s); use -checkMinFreeSpace=false to override", vid, diskType.ReadableString())
		}
	}

	// collect ec shards to the server with most space
	targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcShardsInfo, collection, vid, eligibleTargets)
	if err != nil {
		return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
	}

	// generate a normal volume
	err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation)
	if err != nil {
		// Special case: if the EC index has no live entries, decoding is a no-op.
		// Just purge EC shards and return success without generating/mounting an empty volume.
		if isEcDecodeEmptyVolumeErr(err) {
			if err := unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid); err != nil {
				return err
			}
			// createdVolume=false: shards were removed but no volume was added.
			if diskUsageState != nil {
				diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, false)
			}
			return nil
		}
		return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
	}

	// delete the previous ec shards
	err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcShardsInfo, vid)
	if err != nil {
		return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
	}
	// createdVolume=true: the target server now hosts one more normal volume.
	if diskUsageState != nil {
		diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, true)
	}

	return nil
}
|
|
|
|
func isEcDecodeEmptyVolumeErr(err error) bool {
|
|
st, ok := status.FromError(err)
|
|
if !ok {
|
|
return false
|
|
}
|
|
if st.Code() != codes.FailedPrecondition {
|
|
return false
|
|
}
|
|
// Keep this robust against wording tweaks while still being specific.
|
|
return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring)
|
|
}
|
|
|
|
// unmountAndDeleteEcShards unmounts and then deletes the EC shards of volume
// vid on every server listed in nodeToShardsInfo.
func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
	return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid)
}
|
|
|
|
func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
|
|
ewg := NewErrorWaitGroup(len(nodeToShardsInfo))
|
|
|
|
// unmount and delete ec shards in parallel (one goroutine per location)
|
|
for location, si := range nodeToShardsInfo {
|
|
location, si := location, si // capture loop variables for goroutine
|
|
ewg.Add(func() error {
|
|
fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, si.Ids())
|
|
if err := unmountEcShards(grpcDialOption, vid, location, si.Ids()); err != nil {
|
|
return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err)
|
|
}
|
|
|
|
fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, si.Ids())
|
|
if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, si.Ids()); err != nil {
|
|
return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
return ewg.Wait()
|
|
}
|
|
|
|
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
|
|
|
|
// mount volume
|
|
if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
|
|
VolumeId: uint32(vid),
|
|
})
|
|
return mountErr
|
|
}); err != nil {
|
|
return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
|
|
}
|
|
|
|
return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid)
|
|
}
|
|
|
|
func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
|
|
|
|
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
|
|
|
|
err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
_, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
|
|
VolumeId: uint32(vid),
|
|
Collection: collection,
|
|
})
|
|
return genErr
|
|
})
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, collection string, vid needle.VolumeId, eligibleTargets map[pb.ServerAddress]struct{}) (targetNodeLocation pb.ServerAddress, err error) {
|
|
|
|
maxShardCount := -1
|
|
existingShardsInfo := erasure_coding.NewShardsInfo()
|
|
for loc, si := range nodeToShardsInfo {
|
|
if eligibleTargets != nil {
|
|
if _, ok := eligibleTargets[loc]; !ok {
|
|
continue
|
|
}
|
|
}
|
|
toBeCopiedShardCount := si.MinusParityShards().Count()
|
|
if toBeCopiedShardCount > maxShardCount {
|
|
maxShardCount = toBeCopiedShardCount
|
|
targetNodeLocation = loc
|
|
existingShardsInfo = si
|
|
}
|
|
}
|
|
if targetNodeLocation == "" {
|
|
return "", fmt.Errorf("no eligible target datanodes available to decode volume %d", vid)
|
|
}
|
|
|
|
fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToShardsInfo)
|
|
|
|
copiedShardsInfo := erasure_coding.NewShardsInfo()
|
|
for loc, si := range nodeToShardsInfo {
|
|
if loc == targetNodeLocation {
|
|
continue
|
|
}
|
|
|
|
needToCopyShardsInfo := si.Minus(existingShardsInfo).MinusParityShards()
|
|
if needToCopyShardsInfo.Count() == 0 {
|
|
continue
|
|
}
|
|
|
|
err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
|
|
|
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation)
|
|
|
|
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
|
|
VolumeId: uint32(vid),
|
|
Collection: collection,
|
|
ShardIds: needToCopyShardsInfo.IdsUint32(),
|
|
CopyEcxFile: false,
|
|
CopyEcjFile: true,
|
|
CopyVifFile: true,
|
|
SourceDataNode: string(loc),
|
|
})
|
|
if copyErr != nil {
|
|
return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation, copyErr)
|
|
}
|
|
|
|
fmt.Printf("mount %d.%v on %s\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation)
|
|
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
|
|
VolumeId: uint32(vid),
|
|
Collection: collection,
|
|
ShardIds: needToCopyShardsInfo.IdsUint32(),
|
|
})
|
|
if mountErr != nil {
|
|
return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation, mountErr)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
copiedShardsInfo.Add(needToCopyShardsInfo)
|
|
}
|
|
|
|
nodeToShardsInfo[targetNodeLocation] = existingShardsInfo.Plus(copiedShardsInfo)
|
|
|
|
return targetNodeLocation, err
|
|
}
|
|
|
|
func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
|
|
var resp *master_pb.LookupVolumeResponse
|
|
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
|
|
resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp.VolumeIdLocations, nil
|
|
}
|
|
|
|
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string, diskType types.DiskType) (vids []needle.VolumeId, err error) {
|
|
// compile regex pattern for collection matching
|
|
collectionRegex, err := compileCollectionPattern(collectionPattern)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid collection pattern '%s': %v", collectionPattern, err)
|
|
}
|
|
|
|
vidMap := make(map[uint32]bool)
|
|
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
|
|
for _, v := range diskInfo.EcShardInfos {
|
|
if collectionRegex.MatchString(v.Collection) {
|
|
vidMap[v.Id] = true
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
for vid := range vidMap {
|
|
vids = append(vids, needle.VolumeId(vid))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func collectEcNodeShardsInfo(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]*erasure_coding.ShardsInfo {
|
|
res := make(map[pb.ServerAddress]*erasure_coding.ShardsInfo)
|
|
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
|
|
for _, v := range diskInfo.EcShardInfos {
|
|
if v.Id == uint32(vid) {
|
|
res[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(v)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
return res
|
|
}
|
|
|
|
// decodeDiskUsageState tracks per-node disk usage counters for one disk type,
// used to check for free volume slots while decoding EC volumes.
type decodeDiskUsageState struct {
	// byNode maps each data node to its usage counters snapshot.
	byNode map[pb.ServerAddress]*decodeDiskUsageCounts
}
|
|
|
|
// decodeDiskUsageCounts holds one data node's usage counters for a single
// disk type, as reported by the master topology.
type decodeDiskUsageCounts struct {
	// maxVolumeCount is the node's volume-slot capacity on this disk type.
	maxVolumeCount int64
	// volumeCount is the number of volumes currently assigned to the node.
	volumeCount int64
	// remoteVolumeCount is the subset of volumeCount stored on remote (cloud) tiers.
	remoteVolumeCount int64
	// ecShardCount is the number of EC shards stored on the node.
	ecShardCount int64
}
|
|
|
|
func newDecodeDiskUsageState(topoInfo *master_pb.TopologyInfo, diskType types.DiskType) *decodeDiskUsageState {
|
|
state := &decodeDiskUsageState{byNode: make(map[pb.ServerAddress]*decodeDiskUsageCounts)}
|
|
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
|
|
state.byNode[pb.NewServerAddressFromDataNode(dn)] = &decodeDiskUsageCounts{
|
|
maxVolumeCount: diskInfo.MaxVolumeCount,
|
|
volumeCount: diskInfo.VolumeCount,
|
|
remoteVolumeCount: diskInfo.RemoteVolumeCount,
|
|
ecShardCount: int64(countShards(diskInfo.EcShardInfos)),
|
|
}
|
|
}
|
|
})
|
|
return state
|
|
}
|
|
|
|
func (state *decodeDiskUsageState) freeVolumeCount(location pb.ServerAddress) (int64, bool) {
|
|
if state == nil {
|
|
return 0, false
|
|
}
|
|
usage, found := state.byNode[location]
|
|
if !found {
|
|
return 0, false
|
|
}
|
|
free := usage.maxVolumeCount - (usage.volumeCount - usage.remoteVolumeCount)
|
|
free -= (usage.ecShardCount + int64(erasure_coding.DataShardsCount) - 1) / int64(erasure_coding.DataShardsCount)
|
|
return free, true
|
|
}
|
|
|
|
func (state *decodeDiskUsageState) applyDecode(targetNodeLocation pb.ServerAddress, shardCounts map[pb.ServerAddress]int, createdVolume bool) {
|
|
if state == nil {
|
|
return
|
|
}
|
|
for location, shardCount := range shardCounts {
|
|
if usage, found := state.byNode[location]; found {
|
|
usage.ecShardCount -= int64(shardCount)
|
|
}
|
|
}
|
|
if createdVolume {
|
|
if usage, found := state.byNode[targetNodeLocation]; found {
|
|
usage.volumeCount++
|
|
}
|
|
}
|
|
}
|