|
|
@ -184,11 +184,22 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum |
|
|
|
//find main datacenter and other data centers
|
|
|
|
rp := option.ReplicaPlacement |
|
|
|
|
|
|
|
// Track tentative reservations to make the process atomic
|
|
|
|
var tentativeReservation *VolumeGrowReservation |
|
|
|
|
|
|
|
// Select appropriate functions based on useReservations flag
|
|
|
|
var availableSpaceFunc func(Node, *VolumeGrowOption) int64 |
|
|
|
var reserveOneVolumeFunc func(Node, int64, *VolumeGrowOption) (*DataNode, error) |
|
|
|
|
|
|
|
if useReservations { |
|
|
|
// Initialize tentative reservation tracking
|
|
|
|
tentativeReservation = &VolumeGrowReservation{ |
|
|
|
servers: make([]*DataNode, 0), |
|
|
|
reservationIds: make([]string, 0), |
|
|
|
diskType: option.DiskType, |
|
|
|
} |
|
|
|
|
|
|
|
// For reservations, we make actual reservations during node selection
|
|
|
|
availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 { |
|
|
|
return node.AvailableSpaceForReservation(option) |
|
|
|
} |
|
|
@ -206,8 +217,8 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum |
|
|
|
|
|
|
|
// Ensure cleanup of partial reservations on error
|
|
|
|
defer func() { |
|
|
|
if err != nil && reservation != nil { |
|
|
|
reservation.releaseAllReservations() |
|
|
|
if err != nil && tentativeReservation != nil { |
|
|
|
tentativeReservation.releaseAllReservations() |
|
|
|
} |
|
|
|
}() |
|
|
|
mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error { |
|
|
@ -273,7 +284,21 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum |
|
|
|
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { |
|
|
|
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) |
|
|
|
} |
|
|
|
if availableSpaceFunc(node, option) < 1 { |
|
|
|
|
|
|
|
if useReservations { |
|
|
|
// For reservations, atomically check and reserve capacity
|
|
|
|
if node.IsDataNode() { |
|
|
|
reservationId, success := node.TryReserveCapacity(option.DiskType, 1) |
|
|
|
if !success { |
|
|
|
return fmt.Errorf("Cannot reserve capacity on node %s", node.Id()) |
|
|
|
} |
|
|
|
// Track the reservation for later cleanup if needed
|
|
|
|
tentativeReservation.servers = append(tentativeReservation.servers, node.(*DataNode)) |
|
|
|
tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId) |
|
|
|
} else if availableSpaceFunc(node, option) < 1 { |
|
|
|
return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), 1) |
|
|
|
} |
|
|
|
} else if availableSpaceFunc(node, option) < 1 { |
|
|
|
return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), 1) |
|
|
|
} |
|
|
|
return nil |
|
|
@ -290,6 +315,16 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum |
|
|
|
r := rand.Int64N(availableSpaceFunc(rack, option)) |
|
|
|
if server, e := reserveOneVolumeFunc(rack, r, option); e == nil { |
|
|
|
servers = append(servers, server) |
|
|
|
|
|
|
|
// If using reservations, also make a reservation on the selected server
|
|
|
|
if useReservations { |
|
|
|
reservationId, success := server.TryReserveCapacity(option.DiskType, 1) |
|
|
|
if !success { |
|
|
|
return servers, nil, fmt.Errorf("failed to reserve capacity on server %s from other rack", server.Id()) |
|
|
|
} |
|
|
|
tentativeReservation.servers = append(tentativeReservation.servers, server) |
|
|
|
tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId) |
|
|
|
} |
|
|
|
} else { |
|
|
|
return servers, nil, e |
|
|
|
} |
|
|
@ -298,28 +333,24 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum |
|
|
|
r := rand.Int64N(availableSpaceFunc(datacenter, option)) |
|
|
|
if server, e := reserveOneVolumeFunc(datacenter, r, option); e == nil { |
|
|
|
servers = append(servers, server) |
|
|
|
} else { |
|
|
|
return servers, nil, e |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// If reservations are requested, try to reserve capacity on each server
|
|
|
|
// If using reservations, also make a reservation on the selected server
|
|
|
|
if useReservations { |
|
|
|
reservation = &VolumeGrowReservation{ |
|
|
|
servers: servers, |
|
|
|
reservationIds: make([]string, len(servers)), |
|
|
|
diskType: option.DiskType, |
|
|
|
} |
|
|
|
|
|
|
|
// Try to reserve capacity on each server
|
|
|
|
for i, server := range servers { |
|
|
|
reservationId, success := server.TryReserveCapacity(option.DiskType, 1) |
|
|
|
if !success { |
|
|
|
return servers, nil, fmt.Errorf("failed to reserve capacity on server %s", server.Id()) |
|
|
|
return servers, nil, fmt.Errorf("failed to reserve capacity on server %s from other datacenter", server.Id()) |
|
|
|
} |
|
|
|
tentativeReservation.servers = append(tentativeReservation.servers, server) |
|
|
|
tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId) |
|
|
|
} |
|
|
|
} else { |
|
|
|
return servers, nil, e |
|
|
|
} |
|
|
|
reservation.reservationIds[i] = reservationId |
|
|
|
} |
|
|
|
|
|
|
|
// If reservations were made, return the tentative reservation
|
|
|
|
if useReservations && tentativeReservation != nil { |
|
|
|
reservation = tentativeReservation |
|
|
|
glog.V(1).Infof("Successfully reserved capacity on %d servers for volume creation", len(servers)) |
|
|
|
} |
|
|
|
|
|
|
|