You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

102 lines
2.4 KiB

package weed_server
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestProxySemaphore_LimitsConcurrency starts three times as many goroutines
// as proxyReadConcurrencyPerVolumeServer against a single host. Each goroutine
// acquires the host's proxy semaphore, holds it briefly, and releases it. The
// test fails if the number of goroutines observed holding the semaphore at the
// same time ever exceeds proxyReadConcurrencyPerVolumeServer, or if no
// goroutine was observed at all.
func TestProxySemaphore_LimitsConcurrency(t *testing.T) {
	host := "test-volume:8080"
	defer proxySemaphores.Delete(host)

	var (
		inFlight atomic.Int32 // goroutines currently between acquire and release
		peak     atomic.Int32 // highest inFlight value any goroutine has seen
		done     sync.WaitGroup
	)

	// recordPeak raises peak to n unless an equal or larger value is already
	// stored; the compare-and-swap is retried when another goroutine updated
	// peak in between the load and the swap.
	recordPeak := func(n int32) {
		for {
			seen := peak.Load()
			if n <= seen {
				return
			}
			if peak.CompareAndSwap(seen, n) {
				return
			}
		}
	}

	worker := func() {
		defer done.Done()

		err := acquireProxySemaphore(context.Background(), host)
		if err != nil {
			// Errorf (not Fatalf): this runs outside the test goroutine.
			t.Errorf("acquire: %v", err)
			return
		}
		defer releaseProxySemaphore(host)

		recordPeak(inFlight.Add(1))
		// Hold the slot briefly so that goroutines overlap.
		time.Sleep(time.Millisecond)
		inFlight.Add(-1)
	}

	// Launch more goroutines than the semaphore allows.
	workers := proxyReadConcurrencyPerVolumeServer * 3
	done.Add(workers)
	for launched := 0; launched < workers; launched++ {
		go worker()
	}
	done.Wait()

	switch highest := peak.Load(); {
	case highest > int32(proxyReadConcurrencyPerVolumeServer):
		t.Fatalf("peak concurrency %d exceeded limit %d", highest, proxyReadConcurrencyPerVolumeServer)
	case highest == 0:
		t.Fatal("no goroutines ran")
	}
}
// TestProxySemaphore_ContextCancellation acquires the semaphore for one host
// proxyReadConcurrencyPerVolumeServer times, then attempts one more acquire
// with a context that has already been cancelled and requires that attempt to
// return a non-nil error.
func TestProxySemaphore_ContextCancellation(t *testing.T) {
	host := "test-cancel:8080"
	defer proxySemaphores.Delete(host)

	// Fill the semaphore.
	background := context.Background()
	for held := 0; held < proxyReadConcurrencyPerVolumeServer; held++ {
		err := acquireProxySemaphore(background, host)
		if err != nil {
			t.Fatalf("fill acquire: %v", err)
		}
	}

	// The context is cancelled before the extra acquire is attempted.
	cancelled, cancel := context.WithCancel(context.Background())
	cancel()
	if acquireProxySemaphore(cancelled, host) == nil {
		t.Fatal("expected error from cancelled context")
	}

	// Release every slot taken while filling.
	for held := proxyReadConcurrencyPerVolumeServer; held > 0; held-- {
		releaseProxySemaphore(host)
	}
}
// TestProxySemaphore_PerHostIsolation acquires the semaphore for one host
// ("hostA" in the failure messages) proxyReadConcurrencyPerVolumeServer times
// and then requires that a single acquire for a different host ("hostB")
// still succeeds.
func TestProxySemaphore_PerHostIsolation(t *testing.T) {
	volA := "volume-a:8080"
	volB := "volume-b:8080"
	// Drop both hosts' entries when the test ends.
	defer func() {
		proxySemaphores.Delete(volB)
		proxySemaphores.Delete(volA)
	}()

	background := context.Background()

	// Fill the first host's semaphore.
	for held := 0; held < proxyReadConcurrencyPerVolumeServer; held++ {
		err := acquireProxySemaphore(background, volA)
		if err != nil {
			t.Fatalf("fill hostA: %v", err)
		}
	}

	// The second host must still be acquirable.
	err := acquireProxySemaphore(background, volB)
	if err != nil {
		t.Fatalf("hostB should not be blocked by hostA: %v", err)
	}
	releaseProxySemaphore(volB)

	// Release every slot taken on the first host.
	for held := proxyReadConcurrencyPerVolumeServer; held > 0; held-- {
		releaseProxySemaphore(volA)
	}
}