Browse Source

Parallelize volume replica operations within `ec.encode`. (#6374)

pull/6377/head
Lisandro Pin 2 weeks ago
committed by GitHub
parent
commit
44c48c929a
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 21
      weed/shell/command_ec_encode.go
  2. 8
      weed/shell/command_volume_move.go

21
weed/shell/command_ec_encode.go

@ -119,7 +119,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
// encode all requested volumes... // encode all requested volumes...
for _, vid := range volumeIds { for _, vid := range volumeIds {
if err = doEcEncode(commandEnv, *collection, vid); err != nil { if err = doEcEncode(commandEnv, *collection, vid, *parallelize); err != nil {
return fmt.Errorf("ec encode for volume %d: %v", vid, err) return fmt.Errorf("ec encode for volume %d: %v", vid, err)
} }
} }
@ -131,7 +131,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil return nil
} }
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) error { func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelize bool) error {
if !commandEnv.isLocked() { if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost") return fmt.Errorf("lock is lost")
} }
@ -142,11 +142,20 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId)
return fmt.Errorf("volume %d not found", vid) return fmt.Errorf("volume %d not found", vid)
} }
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly // mark the volume as readonly
if err := markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil { ewg := ErrorWaitGroup{
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) parallelize: parallelize,
}
for _, location := range locations {
ewg.Add(func() error {
if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, location.Url, err)
}
return nil
})
}
if err := ewg.Wait(); err != nil {
return err
} }
// generate ec shards // generate ec shards

8
weed/shell/command_volume_move.go

@ -227,10 +227,14 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
}) })
} }
func markVolumeReplicaWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, location wdclient.Location, writable, persist bool) error {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
return markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist)
}
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error { func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error {
for _, location := range locations { for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) if err := markVolumeReplicaWritable(grpcDialOption, volumeId, location, writable, persist); err != nil {
if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil {
return err return err
} }
} }

|||||||
100:0
Loading…
Cancel
Save