Browse Source

address comments

pull/7159/head
chrislu 1 month ago
parent
commit
4773ed353c
  1. 39
      weed/topology/capacity_reservation_test.go
  2. 58
      weed/topology/node.go

39
weed/topology/capacity_reservation_test.go

@ -1,6 +1,7 @@
package topology
import (
"sync"
"testing"
"time"
@ -166,24 +167,25 @@ func TestNodeImpl_ConcurrentReservations(t *testing.T) {
diskUsage.maxVolumeCount = 10
diskUsage.volumeCount = 0 // 10 volumes free initially
// Test multiple concurrent reservations
var reservationIds []string
for i := 0; i < 10; i++ {
option := &VolumeGrowOption{DiskType: diskType}
availableBefore := dn.AvailableSpaceForReservation(option)
t.Logf("Iteration %d: Available before reservation: %d", i, availableBefore)
// Test concurrent reservations using goroutines
var wg sync.WaitGroup
var reservationIds sync.Map
concurrentRequests := 10
wg.Add(concurrentRequests)
if reservationId, success := dn.TryReserveCapacity(diskType, 1); success {
reservationIds = append(reservationIds, reservationId)
availableAfter := dn.AvailableSpaceForReservation(option)
t.Logf("Iteration %d: Successfully reserved %s, available after: %d", i, reservationId, availableAfter)
} else {
t.Errorf("Expected successful reservation %d (available: %d)", i+1, availableBefore)
break
}
for i := 0; i < concurrentRequests; i++ {
go func(i int) {
defer wg.Done()
if reservationId, success := dn.TryReserveCapacity(diskType, 1); success {
reservationIds.Store(reservationId, true)
t.Logf("goroutine %d: Successfully reserved %s", i, reservationId)
} else {
t.Errorf("goroutine %d: Expected successful reservation", i)
}
}(i)
}
t.Logf("Total reservations made: %d", len(reservationIds))
wg.Wait()
// Should have no more capacity
option := &VolumeGrowOption{DiskType: diskType}
@ -201,9 +203,10 @@ func TestNodeImpl_ConcurrentReservations(t *testing.T) {
}
// Release all reservations
for _, id := range reservationIds {
dn.ReleaseReservedCapacity(id)
}
reservationIds.Range(func(key, value interface{}) bool {
dn.ReleaseReservedCapacity(key.(string))
return true
})
// Should have full capacity back
if available := dn.AvailableSpaceForReservation(option); available != 10 {

58
weed/topology/node.go

@ -29,12 +29,14 @@ type CapacityReservation struct {
// CapacityReservations manages capacity reservations for a node
type CapacityReservations struct {
sync.RWMutex
reservations map[string]*CapacityReservation
reservations map[string]*CapacityReservation
reservedCounts map[types.DiskType]int64
}
func newCapacityReservations() *CapacityReservations {
return &CapacityReservations{
reservations: make(map[string]*CapacityReservation),
reservations: make(map[string]*CapacityReservation),
reservedCounts: make(map[types.DiskType]int64),
}
}
@ -49,6 +51,7 @@ func (cr *CapacityReservations) addReservation(diskType types.DiskType, count in
count: count,
createdAt: time.Now(),
}
cr.reservedCounts[diskType] += count
return reservationId
}
@ -56,8 +59,13 @@ func (cr *CapacityReservations) removeReservation(reservationId string) bool {
cr.Lock()
defer cr.Unlock()
if _, exists := cr.reservations[reservationId]; exists {
if reservation, exists := cr.reservations[reservationId]; exists {
delete(cr.reservations, reservationId)
cr.reservedCounts[reservation.diskType] -= reservation.count
// Clean up zero counts to prevent map growth
if cr.reservedCounts[reservation.diskType] <= 0 {
delete(cr.reservedCounts, reservation.diskType)
}
return true
}
return false
@ -67,13 +75,7 @@ func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64
cr.RLock()
defer cr.RUnlock()
var total int64
for _, reservation := range cr.reservations {
if reservation.diskType == diskType {
total += reservation.count
}
}
return total
return cr.reservedCounts[diskType]
}
func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) {
@ -84,6 +86,11 @@ func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time
for id, reservation := range cr.reservations {
if now.Sub(reservation.createdAt) > expirationDuration {
delete(cr.reservations, id)
cr.reservedCounts[reservation.diskType] -= reservation.count
// Clean up zero counts to prevent map growth
if cr.reservedCounts[reservation.diskType] <= 0 {
delete(cr.reservedCounts, reservation.diskType)
}
glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
}
}
@ -257,37 +264,16 @@ func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64
func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) {
const reservationTimeout = 5 * time.Minute
n.capacityReservations.Lock()
defer n.capacityReservations.Unlock()
// Clean up any expired reservations first
now := time.Now()
for id, reservation := range n.capacityReservations.reservations {
if now.Sub(reservation.createdAt) > reservationTimeout {
delete(n.capacityReservations.reservations, id)
glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
}
}
n.capacityReservations.cleanExpiredReservations(reservationTimeout)
// Check for available space
// Check for available space using the optimized method
option := &VolumeGrowOption{DiskType: diskType}
var reservedCount int64
for _, reservation := range n.capacityReservations.reservations {
if reservation.diskType == diskType {
reservedCount += reservation.count
}
}
availableSpace := n.AvailableSpaceFor(option) - reservedCount
availableSpace := n.AvailableSpaceForReservation(option)
if availableSpace >= count {
// Add new reservation
reservationId = fmt.Sprintf("%s-%d-%d-%d", diskType, count, time.Now().UnixNano(), rand.Int64())
n.capacityReservations.reservations[reservationId] = &CapacityReservation{
reservationId: reservationId,
diskType: diskType,
count: count,
createdAt: time.Now(),
}
// Add new reservation using the optimized method
reservationId = n.capacityReservations.addReservation(diskType, count)
glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId)
return reservationId, true
}

Loading…
Cancel
Save