diff --git a/weed/topology/capacity_reservation_test.go b/weed/topology/capacity_reservation_test.go index bbe41114b..38cb14c50 100644 --- a/weed/topology/capacity_reservation_test.go +++ b/weed/topology/capacity_reservation_test.go @@ -1,6 +1,7 @@ package topology import ( + "sync" "testing" "time" @@ -166,24 +167,25 @@ func TestNodeImpl_ConcurrentReservations(t *testing.T) { diskUsage.maxVolumeCount = 10 diskUsage.volumeCount = 0 // 10 volumes free initially - // Test multiple concurrent reservations - var reservationIds []string - for i := 0; i < 10; i++ { - option := &VolumeGrowOption{DiskType: diskType} - availableBefore := dn.AvailableSpaceForReservation(option) - t.Logf("Iteration %d: Available before reservation: %d", i, availableBefore) + // Test concurrent reservations using goroutines + var wg sync.WaitGroup + var reservationIds sync.Map + concurrentRequests := 10 + wg.Add(concurrentRequests) - if reservationId, success := dn.TryReserveCapacity(diskType, 1); success { - reservationIds = append(reservationIds, reservationId) - availableAfter := dn.AvailableSpaceForReservation(option) - t.Logf("Iteration %d: Successfully reserved %s, available after: %d", i, reservationId, availableAfter) - } else { - t.Errorf("Expected successful reservation %d (available: %d)", i+1, availableBefore) - break - } + for i := 0; i < concurrentRequests; i++ { + go func(i int) { + defer wg.Done() + if reservationId, success := dn.TryReserveCapacity(diskType, 1); success { + reservationIds.Store(reservationId, true) + t.Logf("goroutine %d: Successfully reserved %s", i, reservationId) + } else { + t.Errorf("goroutine %d: Expected successful reservation", i) + } + }(i) } - t.Logf("Total reservations made: %d", len(reservationIds)) + wg.Wait() // Should have no more capacity option := &VolumeGrowOption{DiskType: diskType} @@ -201,9 +203,10 @@ func TestNodeImpl_ConcurrentReservations(t *testing.T) { } // Release all reservations - for _, id := range reservationIds { - dn.ReleaseReservedCapacity(id) - } + reservationIds.Range(func(key, value interface{}) bool { + dn.ReleaseReservedCapacity(key.(string)) + return true + }) // Should have full capacity back if available := dn.AvailableSpaceForReservation(option); available != 10 { diff --git a/weed/topology/node.go b/weed/topology/node.go index d9a59bc81..3e22e03fb 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -29,12 +29,14 @@ type CapacityReservation struct { // CapacityReservations manages capacity reservations for a node type CapacityReservations struct { sync.RWMutex - reservations map[string]*CapacityReservation + reservations map[string]*CapacityReservation + reservedCounts map[types.DiskType]int64 } func newCapacityReservations() *CapacityReservations { return &CapacityReservations{ - reservations: make(map[string]*CapacityReservation), + reservations: make(map[string]*CapacityReservation), + reservedCounts: make(map[types.DiskType]int64), } } @@ -49,6 +51,7 @@ func (cr *CapacityReservations) addReservation(diskType types.DiskType, count in count: count, createdAt: time.Now(), } + cr.reservedCounts[diskType] += count return reservationId } @@ -56,8 +59,13 @@ func (cr *CapacityReservations) removeReservation(reservationId string) bool { cr.Lock() defer cr.Unlock() - if _, exists := cr.reservations[reservationId]; exists { + if reservation, exists := cr.reservations[reservationId]; exists { delete(cr.reservations, reservationId) + cr.reservedCounts[reservation.diskType] -= reservation.count + // Clean up zero counts to prevent map growth + if cr.reservedCounts[reservation.diskType] <= 0 { + delete(cr.reservedCounts, reservation.diskType) + } return true } return false @@ -67,13 +75,7 @@ func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64 cr.RLock() defer cr.RUnlock() - var total int64 - for _, reservation := range cr.reservations { - if reservation.diskType == diskType { - total += reservation.count - } - } - return total + return cr.reservedCounts[diskType] } func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) { @@ -84,6 +86,11 @@ func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time for id, reservation := range cr.reservations { if now.Sub(reservation.createdAt) > expirationDuration { delete(cr.reservations, id) + cr.reservedCounts[reservation.diskType] -= reservation.count + // Clean up zero counts to prevent map growth + if cr.reservedCounts[reservation.diskType] <= 0 { + delete(cr.reservedCounts, reservation.diskType) + } glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id) } } @@ -257,37 +264,16 @@ func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64 func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) { const reservationTimeout = 5 * time.Minute - n.capacityReservations.Lock() - defer n.capacityReservations.Unlock() - // Clean up any expired reservations first - now := time.Now() - for id, reservation := range n.capacityReservations.reservations { - if now.Sub(reservation.createdAt) > reservationTimeout { - delete(n.capacityReservations.reservations, id) - glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id) - } - } + n.capacityReservations.cleanExpiredReservations(reservationTimeout) - // Check for available space + // Check for available space using the optimized method option := &VolumeGrowOption{DiskType: diskType} - var reservedCount int64 - for _, reservation := range n.capacityReservations.reservations { - if reservation.diskType == diskType { - reservedCount += reservation.count - } - } - availableSpace := n.AvailableSpaceFor(option) - reservedCount + availableSpace := n.AvailableSpaceForReservation(option) if availableSpace >= count { - // Add new reservation - reservationId = fmt.Sprintf("%s-%d-%d-%d", diskType, count, time.Now().UnixNano(), rand.Int64()) - n.capacityReservations.reservations[reservationId] = &CapacityReservation{ - reservationId: reservationId, - diskType: diskType, - count: count, - createdAt: time.Now(), - } + // Add new reservation using the optimized method + reservationId = n.capacityReservations.addReservation(diskType, count) glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId) return reservationId, true }