diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go
index ff060ec52..5780f2899 100644
--- a/weed/server/filer_server_handlers_proxy.go
+++ b/weed/server/filer_server_handlers_proxy.go
@@ -1,6 +1,9 @@
 package weed_server
 
 import (
+	"context"
+	"sync"
+
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/security"
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
@@ -12,6 +15,16 @@ import (
 	"net/http"
 )
 
+// proxyReadConcurrencyPerVolumeServer limits how many concurrent proxy read
+// requests the filer will issue to any single volume server. Without this,
+// replication bursts can open hundreds of connections to one volume server,
+// causing it to drop connections with "unexpected EOF".
+const proxyReadConcurrencyPerVolumeServer = 16
+
+var (
+	proxySemaphores sync.Map // host -> chan struct{}
+)
+
 func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) {
 	encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite)
 
@@ -32,6 +45,29 @@
 	return string(encodedJwt)
 }
 
+func acquireProxySemaphore(ctx context.Context, host string) error {
+	v, _ := proxySemaphores.LoadOrStore(host, make(chan struct{}, proxyReadConcurrencyPerVolumeServer))
+	sem := v.(chan struct{})
+	select {
+	case sem <- struct{}{}:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+func releaseProxySemaphore(host string) {
+	v, ok := proxySemaphores.Load(host)
+	if !ok {
+		return
+	}
+	select {
+	case <-v.(chan struct{}):
+	default:
+		glog.Warningf("proxy semaphore for %s was already empty on release", host)
+	}
+}
+
 func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
 	ctx := r.Context()
 	urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
@@ -53,6 +89,15 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
 		return
 	}
 
+	// Limit concurrent requests per volume server to prevent overload
+	volumeHost := proxyReq.URL.Host
+	if err := acquireProxySemaphore(ctx, volumeHost); err != nil {
+		glog.V(0).InfofCtx(ctx, "proxy to %s cancelled while waiting: %v", volumeHost, err)
+		w.WriteHeader(http.StatusServiceUnavailable)
+		return
+	}
+	defer releaseProxySemaphore(volumeHost)
+
 	proxyReq.Header.Set("Host", r.Host)
 	proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
 	request_id.InjectToRequest(ctx, proxyReq)
@@ -79,6 +124,8 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
 
 	buf := mem.Allocate(128 * 1024)
 	defer mem.Free(buf)
-	io.CopyBuffer(w, proxyResponse.Body, buf)
+	if _, copyErr := io.CopyBuffer(w, proxyResponse.Body, buf); copyErr != nil {
+		glog.V(0).InfofCtx(ctx, "proxy copy %s: %v", fileId, copyErr)
+	}
 }
 
diff --git a/weed/server/filer_server_handlers_proxy_test.go b/weed/server/filer_server_handlers_proxy_test.go
new file mode 100644
index 000000000..0bb4bfeb2
--- /dev/null
+++ b/weed/server/filer_server_handlers_proxy_test.go
@@ -0,0 +1,102 @@
+package weed_server
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestProxySemaphore_LimitsConcurrency(t *testing.T) {
+	host := "test-volume:8080"
+	defer proxySemaphores.Delete(host)
+
+	var running atomic.Int32
+	var maxSeen atomic.Int32
+	var wg sync.WaitGroup
+
+	// Launch more goroutines than the semaphore allows
+	total := proxyReadConcurrencyPerVolumeServer * 3
+	for i := 0; i < total; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if err := acquireProxySemaphore(context.Background(), host); err != nil {
+				t.Errorf("acquire: %v", err)
+				return
+			}
+			defer releaseProxySemaphore(host)
+
+			cur := running.Add(1)
+			// Track peak concurrency
+			for {
+				old := maxSeen.Load()
+				if cur <= old || maxSeen.CompareAndSwap(old, cur) {
+					break
+				}
+			}
+			time.Sleep(time.Millisecond)
+			running.Add(-1)
+		}()
+	}
+	wg.Wait()
+
+	peak := maxSeen.Load()
+	if peak > int32(proxyReadConcurrencyPerVolumeServer) {
+		t.Fatalf("peak concurrency %d exceeded limit %d", peak, proxyReadConcurrencyPerVolumeServer)
+	}
+	if peak == 0 {
+		t.Fatal("no goroutines ran")
+	}
+}
+
+func TestProxySemaphore_ContextCancellation(t *testing.T) {
+	host := "test-cancel:8080"
+	defer proxySemaphores.Delete(host)
+
+	// Fill the semaphore
+	for i := 0; i < proxyReadConcurrencyPerVolumeServer; i++ {
+		if err := acquireProxySemaphore(context.Background(), host); err != nil {
+			t.Fatalf("fill acquire: %v", err)
+		}
+	}
+
+	// Try to acquire with a cancelled context
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	err := acquireProxySemaphore(ctx, host)
+	if err == nil {
+		t.Fatal("expected error from cancelled context")
+	}
+
+	// Clean up
+	for i := 0; i < proxyReadConcurrencyPerVolumeServer; i++ {
+		releaseProxySemaphore(host)
+	}
+}
+
+func TestProxySemaphore_PerHostIsolation(t *testing.T) {
+	hostA := "volume-a:8080"
+	hostB := "volume-b:8080"
+	defer proxySemaphores.Delete(hostA)
+	defer proxySemaphores.Delete(hostB)
+
+	// Fill hostA's semaphore
+	for i := 0; i < proxyReadConcurrencyPerVolumeServer; i++ {
+		if err := acquireProxySemaphore(context.Background(), hostA); err != nil {
+			t.Fatalf("fill hostA: %v", err)
+		}
+	}
+
+	// hostB should still be acquirable
+	if err := acquireProxySemaphore(context.Background(), hostB); err != nil {
+		t.Fatalf("hostB should not be blocked by hostA: %v", err)
+	}
+	releaseProxySemaphore(hostB)
+
+	// Clean up hostA
+	for i := 0; i < proxyReadConcurrencyPerVolumeServer; i++ {
+		releaseProxySemaphore(hostA)
+	}
+}