diff --git a/weed/topology/node.go b/weed/topology/node.go
index 3e22e03fb..a8edb828a 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -61,11 +61,7 @@ func (cr *CapacityReservations) removeReservation(reservationId string) bool {
 
 	if reservation, exists := cr.reservations[reservationId]; exists {
 		delete(cr.reservations, reservationId)
-		cr.reservedCounts[reservation.diskType] -= reservation.count
-		// Clean up zero counts to prevent map growth
-		if cr.reservedCounts[reservation.diskType] <= 0 {
-			delete(cr.reservedCounts, reservation.diskType)
-		}
+		cr.decrementCount(reservation.diskType, reservation.count)
 		return true
 	}
 	return false
@@ -78,6 +74,40 @@ func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64
 	return cr.reservedCounts[diskType]
 }
 
+// decrementCount subtracts count from reservedCounts[diskType] and deletes the entry once the total is zero or below. It takes no lock itself; callers are expected to already hold cr's lock (cleanExpiredReservations does — TODO confirm for removeReservation).
+func (cr *CapacityReservations) decrementCount(diskType types.DiskType, count int64) {
+	cr.reservedCounts[diskType] -= count
+	// Delete non-positive totals so the map does not accumulate dead disk-type keys
+	if cr.reservedCounts[diskType] <= 0 {
+		delete(cr.reservedCounts, diskType)
+	}
+}
+
+// tryReserveAtomic checks capacity and records the reservation under a single cr.Lock(); availableSpaceFunc is invoked while that lock is held. It returns the new reservation ID and true, or "" and false when availableSpaceFunc() minus the already-reserved count is below count.
+func (cr *CapacityReservations) tryReserveAtomic(diskType types.DiskType, count int64, availableSpaceFunc func() int64) (reservationId string, success bool) {
+	cr.Lock()
+	defer cr.Unlock()
+
+	// Subtract what is already reserved for this disk type from the space the callback reports
+	currentReserved := cr.reservedCounts[diskType]
+	availableSpace := availableSpaceFunc() - currentReserved
+
+	if availableSpace >= count {
+		// Still under the lock: build the ID from disk type, count, a nanosecond timestamp and a random value, then record it
+		reservationId = fmt.Sprintf("%s-%d-%d-%d", diskType, count, time.Now().UnixNano(), rand.Int64())
+		cr.reservations[reservationId] = &CapacityReservation{
+			reservationId: reservationId,
+			diskType:      diskType,
+			count:         count,
+			createdAt:     time.Now(),
+		}
+		cr.reservedCounts[diskType] += count
+		return reservationId, true
+	}
+
+	return "", false
+}
+
 func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) {
 	cr.Lock()
 	defer cr.Unlock()
@@ -86,11 +116,7 @@ func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time
 	for id, reservation := range cr.reservations {
 		if now.Sub(reservation.createdAt) > expirationDuration {
 			delete(cr.reservations, id)
-			cr.reservedCounts[reservation.diskType] -= reservation.count
-			// Clean up zero counts to prevent map growth
-			if cr.reservedCounts[reservation.diskType] <= 0 {
-				delete(cr.reservedCounts, reservation.diskType)
-			}
+			cr.decrementCount(reservation.diskType, reservation.count)
 			glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
 		}
 	}
@@ -262,23 +288,22 @@ func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64
 
 // TryReserveCapacity attempts to atomically reserve capacity for volume creation
 func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) {
-	const reservationTimeout = 5 * time.Minute
+	const reservationTimeout = 5 * time.Minute // passed to cleanExpiredReservations below as the expiry age; TODO: make this configurable
 
 	// Clean up any expired reservations first
 	n.capacityReservations.cleanExpiredReservations(reservationTimeout)
 
-	// Check for available space using the optimized method
+	// Check and reserve in one locked step; the callback supplies n.AvailableSpaceFor(option) and tryReserveAtomic subtracts the reserved count itself
 	option := &VolumeGrowOption{DiskType: diskType}
-	availableSpace := n.AvailableSpaceForReservation(option)
+	reservationId, success = n.capacityReservations.tryReserveAtomic(diskType, count, func() int64 {
+		return n.AvailableSpaceFor(option)
+	})
 
-	if availableSpace >= count {
-		// Add new reservation using the optimized method
-		reservationId = n.capacityReservations.addReservation(diskType, count)
+	if success { // success only gates the log line; both results are returned unchanged below
 		glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId)
-		return reservationId, true
 	}
 
-	return "", false
+	return reservationId, success
 }
 
 // ReleaseReservedCapacity releases a previously reserved capacity
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 8344168a0..2500d08d2 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -183,6 +183,13 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo
 func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, useReservations bool) (servers []*DataNode, reservation *VolumeGrowReservation, err error) {
 	//find main datacenter and other data centers
 	rp := option.ReplicaPlacement
+
+	// On return, release reservations when the named results hold both a non-nil err and a non-nil reservation
+	defer func() {
+		if err != nil && reservation != nil { // NOTE(review): a `return nil, nil, err` sets reservation to nil before this runs, so nothing is released on that path — confirm the error paths
+			reservation.releaseAllReservations()
+		}
+	}()
 	mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error {
 		if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
 			return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)