Browse Source

Merge branch 'feature/add-concurrent-file-upload-limit' of https://github.com/seaweedfs/seaweedfs into feature/add-concurrent-file-upload-limit

pull/7554/head
Chris Lu 1 week ago
parent
commit
195fbd1e78
  1. 66
      weed/s3api/s3api_handlers.go
  2. 5
      weed/server/master_server_handlers_ui.go
  3. 2
      weed/server/volume_server_handlers_ui.go
  4. 84
      weed/wdclient/filer_client.go

66
weed/s3api/s3api_handlers.go

@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"google.golang.org/grpc"
@ -15,7 +16,13 @@ import (
var _ = filer_pb.FilerClient(&S3ApiServer{})
func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
// Use filerClient for proper connection management and failover
if s3a.filerClient != nil {
return s3a.withFilerClientFailover(streamingMode, fn)
}
// Fallback to direct connection if filerClient not initialized
// This should only happen during initialization or testing
return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
@ -23,6 +30,63 @@ func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.Sea
}
// withFilerClientFailover attempts to execute fn with automatic failover to other filers.
// It tries the last-known-good filer first, then walks the remaining filers while
// honoring the health-based circuit breaker, updating the preferred filer on success.
func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	// Dial the given filer and run fn against it.
	tryFiler := func(target pb.ServerAddress) error {
		return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
			return fn(filer_pb.NewSeaweedFilerClient(grpcConnection))
		}, target.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
	}

	// Fast path: the filer used by the previous request.
	currentFiler := s3a.filerClient.GetCurrentFiler()
	lastErr := tryFiler(currentFiler)
	if lastErr == nil {
		s3a.filerClient.RecordFilerSuccess(currentFiler)
		return nil
	}
	s3a.filerClient.RecordFilerFailure(currentFiler)

	// Slow path: try every other filer with health-aware selection.
	for _, candidate := range s3a.filerClient.GetAllFilers() {
		if candidate == currentFiler {
			// Already attempted above.
			continue
		}
		// Circuit breaker: skip filers known to be unhealthy.
		if s3a.filerClient.ShouldSkipUnhealthyFiler(candidate) {
			glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", candidate)
			continue
		}
		if err := tryFiler(candidate); err != nil {
			// Record the failure for health tracking and keep looking.
			s3a.filerClient.RecordFilerFailure(candidate)
			glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", candidate, err)
			lastErr = err
			continue
		}
		// Success: remember this filer so subsequent requests prefer it.
		s3a.filerClient.RecordFilerSuccess(candidate)
		s3a.filerClient.SetCurrentFiler(candidate)
		glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", currentFiler, candidate)
		return nil
	}
	// Every filer failed (or was skipped); surface the most recent error.
	return fmt.Errorf("all filers failed, last error: %w", lastErr)
}
// AdjustedUrl returns the volume location URL unchanged; the S3 gateway
// performs no address rewriting for volume servers.
func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
	return location.Url
}

5
weed/server/master_server_handlers_ui.go

@ -1,10 +1,11 @@
package weed_server
import (
"github.com/seaweedfs/seaweedfs/weed/util/version"
"net/http"
"time"
"github.com/seaweedfs/seaweedfs/weed/util/version"
hashicorpRaft "github.com/hashicorp/raft"
"github.com/seaweedfs/raft"
@ -14,7 +15,7 @@ import (
func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
infos["Up Time"] = time.Since(startTime).Truncate(time.Second).String()
infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId()
ms.Topo.RaftServerAccessLock.RLock()

2
weed/server/volume_server_handlers_ui.go

@ -18,7 +18,7 @@ import (
func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+version.VERSION)
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
infos["Up Time"] = time.Since(startTime).Truncate(time.Second).String()
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {

84
weed/wdclient/filer_client.go

@ -169,7 +169,8 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
}
// Start filer discovery if master client is configured
if masterClient != nil && filerGroup != "" {
// Empty filerGroup is valid (represents default group)
if masterClient != nil {
fc.stopDiscovery = make(chan struct{})
go fc.discoverFilers()
glog.V(0).Infof("FilerClient: started filer discovery for group '%s' (refresh interval: %v)", filerGroup, discoveryInterval)
@ -205,6 +206,85 @@ func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress {
return fc.filerAddresses[index]
}
// GetAllFilers returns a snapshot of all filer addresses.
// A copy is returned so callers can iterate without racing against
// concurrent updates from filer discovery.
func (fc *FilerClient) GetAllFilers() []pb.ServerAddress {
	fc.filerAddressesMu.RLock()
	defer fc.filerAddressesMu.RUnlock()
	// Copy under the read lock to avoid concurrent modification.
	return append([]pb.ServerAddress(nil), fc.filerAddresses...)
}
// SetCurrentFiler updates the current filer index to the specified address.
// This is useful after a successful failover so that future requests prefer
// the known-healthy filer. An unknown address leaves the index unchanged.
func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) {
	fc.filerAddressesMu.RLock()
	defer fc.filerAddressesMu.RUnlock()
	// The index itself is stored atomically, so the read lock only needs to
	// protect the address slice scan.
	for i := range fc.filerAddresses {
		if fc.filerAddresses[i] == addr {
			atomic.StoreInt32(&fc.filerIndex, int32(i))
			return
		}
	}
	// Address not found: leave the current index as-is.
}
// ShouldSkipUnhealthyFiler reports whether the given filer should be skipped
// based on health tracking: true when it has exceeded the failure threshold
// and the reset timeout has not yet elapsed.
func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool {
	fc.filerAddressesMu.RLock()
	defer fc.filerAddressesMu.RUnlock()
	for i := range fc.filerAddresses {
		if fc.filerAddresses[i] != addr {
			continue
		}
		// Guard against a health slice shorter than the address list.
		if i >= len(fc.filerHealth) {
			return false
		}
		return fc.shouldSkipUnhealthyFilerWithHealth(fc.filerHealth[i])
	}
	// Unknown addresses are never skipped.
	return false
}
// RecordFilerSuccess resets failure tracking for a filer after a successful call.
// An address not present in the current filer list is silently ignored.
func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) {
	fc.filerAddressesMu.RLock()
	defer fc.filerAddressesMu.RUnlock()
	for i := range fc.filerAddresses {
		if fc.filerAddresses[i] != addr {
			continue
		}
		// Only touch health state when a matching entry exists.
		if i < len(fc.filerHealth) {
			fc.recordFilerSuccessWithHealth(fc.filerHealth[i])
		}
		return
	}
}
// RecordFilerFailure increments the failure count for an unhealthy filer.
// An address not present in the current filer list is silently ignored.
func (fc *FilerClient) RecordFilerFailure(addr pb.ServerAddress) {
	fc.filerAddressesMu.RLock()
	defer fc.filerAddressesMu.RUnlock()
	for i := range fc.filerAddresses {
		if fc.filerAddresses[i] != addr {
			continue
		}
		// Only touch health state when a matching entry exists.
		if i < len(fc.filerHealth) {
			fc.recordFilerFailureWithHealth(fc.filerHealth[i])
		}
		return
	}
}
// Close stops the filer discovery goroutine if running
// Safe to call multiple times (idempotent)
func (fc *FilerClient) Close() {
@ -425,7 +505,7 @@ func (fc *FilerClient) shouldSkipUnhealthyFilerWithHealth(health *filerHealth) b
// Deprecated: Use shouldSkipUnhealthyFilerWithHealth instead
// This function is kept for backward compatibility but requires array access
// Note: Accesses filerHealth without lock; safe only when discovery is disabled
// Note: This function is now thread-safe.
func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
fc.filerAddressesMu.RLock()
if index >= int32(len(fc.filerHealth)) {

Loading…
Cancel
Save