Browse Source

fix(filer): limit concurrent proxy reads per volume server (#8608)

* fix(filer): limit concurrent proxy reads per volume server

Add a per-volume-server semaphore (default 16) to proxyToVolumeServer
to prevent replication bursts from overwhelming individual volume
servers with hundreds of concurrent connections, which causes them
to drop connections with "unexpected EOF".

Excess requests queue up and respect the client's context, returning
503 if the client disconnects while waiting.

Also log io.CopyBuffer errors that were previously silently discarded.

* Apply suggestion from @gemini-code-assist[bot]

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* fix(filer): use non-blocking release for proxy semaphore

Prevents a goroutine from blocking forever if releaseProxySemaphore
is ever called without a matching acquire.

* test(filer): clean up proxySemaphores entries in all proxy tests

---------

Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
pull/8612/head
Chris Lu 2 days ago
committed by GitHub
parent
commit
92a76fc1a2
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 49
      weed/server/filer_server_handlers_proxy.go
  2. 102
      weed/server/filer_server_handlers_proxy_test.go

49
weed/server/filer_server_handlers_proxy.go

@@ -1,6 +1,9 @@
package weed_server
import (
"context"
"sync"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/security"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
@@ -12,6 +15,16 @@ import (
"net/http"
)
// proxyReadConcurrencyPerVolumeServer limits how many concurrent proxy read
// requests the filer will issue to any single volume server. Without this,
// replication bursts can open hundreds of connections to one volume server,
// causing it to drop connections with "unexpected EOF".
const proxyReadConcurrencyPerVolumeServer = 16

var (
	// proxySemaphores maps a volume-server host to its counting semaphore
	// (a buffered channel of capacity proxyReadConcurrencyPerVolumeServer).
	// Entries are created lazily on first acquire via LoadOrStore.
	proxySemaphores sync.Map // host -> chan struct{}
)
func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) {
encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite)
@@ -32,6 +45,29 @@ func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrit
return string(encodedJwt)
}
// acquireProxySemaphore takes one proxy-read slot for the given volume-server
// host, creating the host's semaphore on first use. It blocks until a slot is
// free or ctx is done; on success it returns nil and the caller must pair it
// with releaseProxySemaphore, otherwise it returns the context's error.
func acquireProxySemaphore(ctx context.Context, host string) error {
	entry, _ := proxySemaphores.LoadOrStore(host, make(chan struct{}, proxyReadConcurrencyPerVolumeServer))
	slots := entry.(chan struct{})
	select {
	case <-ctx.Done():
		return ctx.Err()
	case slots <- struct{}{}:
		return nil
	}
}
// releaseProxySemaphore returns one previously acquired slot for host.
// The drain is deliberately non-blocking: a call without a matching acquire
// (or against an unknown host) logs a warning instead of blocking forever.
func releaseProxySemaphore(host string) {
	entry, found := proxySemaphores.Load(host)
	if !found {
		return
	}
	slots := entry.(chan struct{})
	select {
	case <-slots:
		// one held slot freed
	default:
		glog.Warningf("proxy semaphore for %s was already empty on release", host)
	}
}
func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
ctx := r.Context()
urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
@@ -53,6 +89,15 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
return
}
// Limit concurrent requests per volume server to prevent overload
volumeHost := proxyReq.URL.Host
if err := acquireProxySemaphore(ctx, volumeHost); err != nil {
glog.V(0).InfofCtx(ctx, "proxy to %s cancelled while waiting: %v", volumeHost, err)
w.WriteHeader(http.StatusServiceUnavailable)
return
}
defer releaseProxySemaphore(volumeHost)
proxyReq.Header.Set("Host", r.Host)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
request_id.InjectToRequest(ctx, proxyReq)
@@ -79,6 +124,8 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
buf := mem.Allocate(128 * 1024)
defer mem.Free(buf)
io.CopyBuffer(w, proxyResponse.Body, buf)
if _, copyErr := io.CopyBuffer(w, proxyResponse.Body, buf); copyErr != nil {
glog.V(0).InfofCtx(ctx, "proxy copy %s: %v", fileId, copyErr)
}
}

102
weed/server/filer_server_handlers_proxy_test.go

@@ -0,0 +1,102 @@
package weed_server
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
)
// TestProxySemaphore_LimitsConcurrency floods one host's semaphore with three
// times the permitted number of goroutines and verifies the observed peak
// concurrency never exceeds proxyReadConcurrencyPerVolumeServer.
func TestProxySemaphore_LimitsConcurrency(t *testing.T) {
	const host = "test-volume:8080"
	defer proxySemaphores.Delete(host)

	var inFlight atomic.Int32
	var peak atomic.Int32
	var wg sync.WaitGroup

	// Start far more workers than the per-host limit allows at once.
	workers := proxyReadConcurrencyPerVolumeServer * 3
	for n := 0; n < workers; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := acquireProxySemaphore(context.Background(), host); err != nil {
				t.Errorf("acquire: %v", err)
				return
			}
			defer releaseProxySemaphore(host)

			now := inFlight.Add(1)
			// Record the highest concurrency this worker observed.
			for prev := peak.Load(); now > prev; prev = peak.Load() {
				if peak.CompareAndSwap(prev, now) {
					break
				}
			}
			time.Sleep(time.Millisecond)
			inFlight.Add(-1)
		}()
	}
	wg.Wait()

	highest := peak.Load()
	if highest > int32(proxyReadConcurrencyPerVolumeServer) {
		t.Fatalf("peak concurrency %d exceeded limit %d", highest, proxyReadConcurrencyPerVolumeServer)
	}
	if highest == 0 {
		t.Fatal("no goroutines ran")
	}
}
// TestProxySemaphore_ContextCancellation verifies that a waiter with an
// already-cancelled context gets an error instead of blocking once every
// slot for the host is taken.
func TestProxySemaphore_ContextCancellation(t *testing.T) {
	const host = "test-cancel:8080"
	defer proxySemaphores.Delete(host)

	// Occupy every slot so the next acquire has to wait.
	for n := 0; n < proxyReadConcurrencyPerVolumeServer; n++ {
		if err := acquireProxySemaphore(context.Background(), host); err != nil {
			t.Fatalf("fill acquire: %v", err)
		}
	}

	// An acquire under a cancelled context must fail fast.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	if err := acquireProxySemaphore(ctx, host); err == nil {
		t.Fatal("expected error from cancelled context")
	}

	// Return all slots so the shared map entry is drained.
	for n := 0; n < proxyReadConcurrencyPerVolumeServer; n++ {
		releaseProxySemaphore(host)
	}
}
// TestProxySemaphore_PerHostIsolation verifies that exhausting one host's
// semaphore does not affect acquisitions against a different host.
func TestProxySemaphore_PerHostIsolation(t *testing.T) {
	const (
		hostA = "volume-a:8080"
		hostB = "volume-b:8080"
	)
	defer proxySemaphores.Delete(hostA)
	defer proxySemaphores.Delete(hostB)

	// Exhaust hostA's slots.
	for n := 0; n < proxyReadConcurrencyPerVolumeServer; n++ {
		if err := acquireProxySemaphore(context.Background(), hostA); err != nil {
			t.Fatalf("fill hostA: %v", err)
		}
	}

	// hostB must remain unaffected.
	if err := acquireProxySemaphore(context.Background(), hostB); err != nil {
		t.Fatalf("hostB should not be blocked by hostA: %v", err)
	}
	releaseProxySemaphore(hostB)

	// Drain hostA.
	for n := 0; n < proxyReadConcurrencyPerVolumeServer; n++ {
		releaseProxySemaphore(hostA)
	}
}
Loading…
Cancel
Save