Browse Source

master: delete partially created replicated volumes during volume growth

fix https://github.com/seaweedfs/seaweedfs/discussions/3792#discussioncomment-3973120
pull/3918/head
chrislu 2 years ago
parent
commit
1e0d64c048
  1. 12
      weed/topology/allocate_volume.go
  2. 30
      weed/topology/volume_growth.go

12
weed/topology/allocate_volume.go

@ -30,3 +30,15 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol
}) })
} }
func DeleteVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId) error {
return operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, allocateErr := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(vid),
})
return allocateErr
})
}

30
weed/topology/volume_growth.go

@ -228,10 +228,11 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return return
} }
func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) (growErr error) {
var createdVolumes []storage.VolumeInfo
for _, server := range servers { for _, server := range servers {
if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil { if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
vi := storage.VolumeInfo{
createdVolumes = append(createdVolumes, storage.VolumeInfo{
Id: vid, Id: vid,
Size: 0, Size: 0,
Collection: option.Collection, Collection: option.Collection,
@ -239,14 +240,31 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid
Ttl: option.Ttl, Ttl: option.Ttl,
Version: needle.CurrentVersion, Version: needle.CurrentVersion,
DiskType: option.DiskType.String(), DiskType: option.DiskType.String(),
})
glog.V(0).Infof("Created Volume %d on %s", vid, server.NodeImpl.String())
} else {
glog.Warningf("Failed to assign volume %d on %s: %v", vid, server.NodeImpl.String(), err)
growErr = fmt.Errorf("failed to assign volume %d on %s: %v", vid, server.NodeImpl.String(), err)
break
} }
}
if growErr == nil {
for i, vi := range createdVolumes {
server := servers[i]
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server) topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String())
glog.V(0).Infof("Registered Volume %d on %s", vid, server.NodeImpl.String())
}
} else { } else {
glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err)
return fmt.Errorf("Failed to assign %d: %v", vid, err)
// cleaning up created volume replicas
for i, vi := range createdVolumes {
server := servers[i]
if err := DeleteVolume(server, grpcDialOption, vi.Id); err != nil {
glog.Warningf("Failed to clean up volume %d on %s", vid, server.NodeImpl.String())
} }
} }
return nil
}
return growErr
} }
Loading…
Cancel
Save