Browse Source

atomic

pull/7159/head
chrislu 1 month ago
parent
commit
8b3e480780
  1. 61
      weed/topology/node.go
  2. 7
      weed/topology/volume_growth.go

61
weed/topology/node.go

@ -61,11 +61,7 @@ func (cr *CapacityReservations) removeReservation(reservationId string) bool {
if reservation, exists := cr.reservations[reservationId]; exists {
delete(cr.reservations, reservationId)
cr.reservedCounts[reservation.diskType] -= reservation.count
// Clean up zero counts to prevent map growth
if cr.reservedCounts[reservation.diskType] <= 0 {
delete(cr.reservedCounts, reservation.diskType)
}
cr.decrementCount(reservation.diskType, reservation.count)
return true
}
return false
@ -78,6 +74,40 @@ func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64
return cr.reservedCounts[diskType]
}
// decrementCount is a helper to decrement reserved count and clean up zero entries
func (cr *CapacityReservations) decrementCount(diskType types.DiskType, count int64) {
cr.reservedCounts[diskType] -= count
// Clean up zero counts to prevent map growth
if cr.reservedCounts[diskType] <= 0 {
delete(cr.reservedCounts, diskType)
}
}
// tryReserveAtomic atomically checks available space and reserves if possible
func (cr *CapacityReservations) tryReserveAtomic(diskType types.DiskType, count int64, availableSpaceFunc func() int64) (reservationId string, success bool) {
cr.Lock()
defer cr.Unlock()
// Check available space under lock
currentReserved := cr.reservedCounts[diskType]
availableSpace := availableSpaceFunc() - currentReserved
if availableSpace >= count {
// Create and add reservation atomically
reservationId = fmt.Sprintf("%s-%d-%d-%d", diskType, count, time.Now().UnixNano(), rand.Int64())
cr.reservations[reservationId] = &CapacityReservation{
reservationId: reservationId,
diskType: diskType,
count: count,
createdAt: time.Now(),
}
cr.reservedCounts[diskType] += count
return reservationId, true
}
return "", false
}
func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) {
cr.Lock()
defer cr.Unlock()
@ -86,11 +116,7 @@ func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time
for id, reservation := range cr.reservations {
if now.Sub(reservation.createdAt) > expirationDuration {
delete(cr.reservations, id)
cr.reservedCounts[reservation.diskType] -= reservation.count
// Clean up zero counts to prevent map growth
if cr.reservedCounts[reservation.diskType] <= 0 {
delete(cr.reservedCounts, reservation.diskType)
}
cr.decrementCount(reservation.diskType, reservation.count)
glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
}
}
@ -262,23 +288,22 @@ func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64
// TryReserveCapacity attempts to atomically reserve capacity for volume creation
func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) {
const reservationTimeout = 5 * time.Minute
const reservationTimeout = 5 * time.Minute // TODO: make this configurable
// Clean up any expired reservations first
n.capacityReservations.cleanExpiredReservations(reservationTimeout)
// Check for available space using the optimized method
// Atomically check and reserve space
option := &VolumeGrowOption{DiskType: diskType}
availableSpace := n.AvailableSpaceForReservation(option)
reservationId, success = n.capacityReservations.tryReserveAtomic(diskType, count, func() int64 {
return n.AvailableSpaceFor(option)
})
if availableSpace >= count {
// Add new reservation using the optimized method
reservationId = n.capacityReservations.addReservation(diskType, count)
if success {
glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId)
return reservationId, true
}
return "", false
return reservationId, success
}
// ReleaseReservedCapacity releases a previously reserved capacity

7
weed/topology/volume_growth.go

@ -183,6 +183,13 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, useReservations bool) (servers []*DataNode, reservation *VolumeGrowReservation, err error) {
//find main datacenter and other data centers
rp := option.ReplicaPlacement
// Ensure cleanup of partial reservations on error
defer func() {
if err != nil && reservation != nil {
reservation.releaseAllReservations()
}
}()
mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error {
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)

Loading…
Cancel
Save