Browse Source

shell: volume.tier.move makes up changes if volume move failed

pull/2274/head
Chris Lu 3 years ago
parent
commit
0f7d4556d8
  1. 24
      weed/shell/command_ec_encode.go
  2. 11
      weed/shell/command_volume_move.go
  3. 9
      weed/shell/command_volume_tier_move.go
  4. 2
      weed/shell/command_volume_tier_upload.go

24
weed/shell/command_ec_encode.go

@ -100,7 +100,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
// fmt.Printf("found ec %d shards on %v\n", vid, locations) // fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly // mark the volume as readonly
err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations)
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false)
if err != nil { if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
} }
@ -120,28 +120,6 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
return nil return nil
} }
func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
})
return markErr
})
if err != nil {
return err
}
}
return nil
}
func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer) fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)

11
weed/shell/command_volume_move.go

@ -4,6 +4,7 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"io" "io"
"log" "log"
"time" "time"
@ -190,3 +191,13 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
return err return err
}) })
} }
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error {
for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
if err:= markVolumeWritable(grpcDialOption, volumeId, location.Url, writable); err != nil {
return err
}
}
return nil
}

9
weed/shell/command_volume_tier_move.go

@ -3,6 +3,7 @@ package shell
import ( import (
"flag" "flag"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/chrislusf/seaweedfs/weed/wdclient"
@ -172,10 +173,16 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer string, dst location) (err error) { func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer string, dst location) (err error) {
// mark all replicas as read only // mark all replicas as read only
if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil {
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
} }
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString(), true); err != nil { if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString(), true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
}
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err) return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
} }

2
weed/shell/command_volume_tier_upload.go

@ -101,7 +101,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
return fmt.Errorf("volume %d not found", vid) return fmt.Errorf("volume %d not found", vid)
} }
err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations, false)
if err != nil { if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
} }

Loading…
Cancel
Save