Browse Source

Parallelize volume replica operations within `ec.encode`.

pull/6374/head
Lisandro Pin 5 days ago
parent
commit
826edd5d6f
  1. 21
      weed/shell/command_ec_encode.go
  2. 8
      weed/shell/command_volume_move.go

21
weed/shell/command_ec_encode.go

@ -119,7 +119,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
// encode all requested volumes... // encode all requested volumes...
for _, vid := range volumeIds { for _, vid := range volumeIds {
if err = doEcEncode(commandEnv, *collection, vid); err != nil {
if err = doEcEncode(commandEnv, *collection, vid, *parallelize); err != nil {
return fmt.Errorf("ec encode for volume %d: %v", vid, err) return fmt.Errorf("ec encode for volume %d: %v", vid, err)
} }
} }
@ -131,7 +131,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil return nil
} }
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) error {
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelize bool) error {
if !commandEnv.isLocked() { if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost") return fmt.Errorf("lock is lost")
} }
@ -142,11 +142,20 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId)
return fmt.Errorf("volume %d not found", vid) return fmt.Errorf("volume %d not found", vid)
} }
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly // mark the volume as readonly
if err := markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
ewg := ErrorWaitGroup{
parallelize: parallelize,
}
for _, location := range locations {
ewg.Add(func() error {
if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, location.Url, err)
}
return nil
})
}
if err := ewg.Wait(); err != nil {
return err
} }
// generate ec shards // generate ec shards

8
weed/shell/command_volume_move.go

@ -227,10 +227,14 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
}) })
} }
func markVolumeReplicaWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, location wdclient.Location, writable, persist bool) error {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
return markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist)
}
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error { func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error {
for _, location := range locations { for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil {
if err := markVolumeReplicaWritable(grpcDialOption, volumeId, location, writable, persist); err != nil {
return err return err
} }
} }

Loading…
Cancel
Save