@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"
@ -12,6 +13,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -35,9 +37,11 @@ type filerHealth struct {
// It uses the shared vidMap cache for efficient lookups
// Supports multiple filer addresses with automatic failover for high availability
// Tracks filer health to avoid repeatedly trying known-unhealthy filers
// Can discover additional filers from master server when configured with filer group
type FilerClient struct {
* vidMapClient
filerAddresses [ ] pb . ServerAddress
filerAddressesMu sync . RWMutex // Protects filerAddresses and filerHealth
filerIndex int32 // atomic: current filer index for round-robin
filerHealth [ ] * filerHealth // health status per filer (same order as filerAddresses)
grpcDialOption grpc . DialOption
@ -50,6 +54,12 @@ type FilerClient struct {
maxRetries int // Retry: maximum retry attempts for transient failures
initialRetryWait time . Duration // Retry: initial wait time before first retry
retryBackoffFactor float64 // Retry: backoff multiplier for wait time
// Filer discovery fields
masterClient * MasterClient // Optional: for discovering filers in the same group
filerGroup string // Optional: filer group for discovery
discoveryInterval time . Duration // How often to refresh filer list from master
stopDiscovery chan struct { } // Signal to stop discovery goroutine
}
// filerVolumeProvider implements VolumeLocationProvider by querying filer
@ -68,6 +78,11 @@ type FilerClientOption struct {
MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3)
InitialRetryWait time . Duration // Retry: initial wait time before first retry (0 = use default of 1s)
RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5)
// Filer discovery options
MasterClient * MasterClient // Optional: enables filer discovery from master
FilerGroup string // Optional: filer group name for discovery (required if MasterClient is set)
DiscoveryInterval time . Duration // Optional: how often to refresh filer list (0 = use default of 5 minutes)
}
// NewFilerClient creates a new client that queries filer(s) for volume locations
@ -87,6 +102,9 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
maxRetries := 3 // Default: 3 retry attempts for transient failures
initialRetryWait := time . Second // Default: 1 second initial retry wait
retryBackoffFactor := 1.5 // Default: 1.5x backoff multiplier
var masterClient * MasterClient
var filerGroup string
discoveryInterval := 5 * time . Minute // Default: refresh every 5 minutes
// Override with provided options
if len ( opts ) > 0 && opts [ 0 ] != nil {
@ -115,6 +133,13 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
if opt . RetryBackoffFactor > 0 {
retryBackoffFactor = opt . RetryBackoffFactor
}
if opt . MasterClient != nil {
masterClient = opt . MasterClient
filerGroup = opt . FilerGroup
if opt . DiscoveryInterval > 0 {
discoveryInterval = opt . DiscoveryInterval
}
}
}
// Initialize health tracking for each filer
@ -137,6 +162,16 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
maxRetries : maxRetries ,
initialRetryWait : initialRetryWait ,
retryBackoffFactor : retryBackoffFactor ,
masterClient : masterClient ,
filerGroup : filerGroup ,
discoveryInterval : discoveryInterval ,
}
// Start filer discovery if master client is configured
if masterClient != nil && filerGroup != "" {
fc . stopDiscovery = make ( chan struct { } )
go fc . discoverFilers ( )
glog . V ( 0 ) . Infof ( "FilerClient: started filer discovery for group '%s' (refresh interval: %v)" , filerGroup , discoveryInterval )
}
// Create provider that references this FilerClient for failover support
@ -149,6 +184,97 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
return fc
}
// Close stops the filer discovery goroutine if running
func ( fc * FilerClient ) Close ( ) {
if fc . stopDiscovery != nil {
close ( fc . stopDiscovery )
}
}
// discoverFilers periodically queries the master to discover filers in the same group
// and updates the filer list. This runs in a background goroutine.
func ( fc * FilerClient ) discoverFilers ( ) {
// Do an initial discovery
fc . refreshFilerList ( )
ticker := time . NewTicker ( fc . discoveryInterval )
defer ticker . Stop ( )
for {
select {
case <- ticker . C :
fc . refreshFilerList ( )
case <- fc . stopDiscovery :
glog . V ( 0 ) . Infof ( "FilerClient: stopping filer discovery for group '%s'" , fc . filerGroup )
return
}
}
}
// refreshFilerList queries the master for the current list of filers and updates the local list
func ( fc * FilerClient ) refreshFilerList ( ) {
if fc . masterClient == nil {
return
}
// Get current master address
currentMaster := fc . masterClient . GetMaster ( context . Background ( ) )
if currentMaster == "" {
glog . V ( 1 ) . Infof ( "FilerClient: no master available for filer discovery" )
return
}
// Query master for filers in our group
updates := cluster . ListExistingPeerUpdates ( currentMaster , fc . grpcDialOption , fc . filerGroup , cluster . FilerType )
if len ( updates ) == 0 {
glog . V ( 2 ) . Infof ( "FilerClient: no filers found in group '%s'" , fc . filerGroup )
return
}
// Build new filer address list
discoveredFilers := make ( map [ pb . ServerAddress ] bool )
for _ , update := range updates {
if update . Address != "" {
discoveredFilers [ pb . ServerAddress ( update . Address ) ] = true
}
}
// Thread-safe update of filer list
fc . filerAddressesMu . Lock ( )
defer fc . filerAddressesMu . Unlock ( )
// Build list of new filers that aren't in our current list
var newFilers [ ] pb . ServerAddress
for addr := range discoveredFilers {
found := false
for _ , existing := range fc . filerAddresses {
if existing == addr {
found = true
break
}
}
if ! found {
newFilers = append ( newFilers , addr )
}
}
// Add new filers
if len ( newFilers ) > 0 {
glog . V ( 0 ) . Infof ( "FilerClient: discovered %d new filer(s) in group '%s': %v" , len ( newFilers ) , fc . filerGroup , newFilers )
fc . filerAddresses = append ( fc . filerAddresses , newFilers ... )
// Initialize health tracking for new filers
for range newFilers {
fc . filerHealth = append ( fc . filerHealth , & filerHealth { } )
}
}
// Optionally, remove filers that are no longer in the cluster
// For now, we keep all filers and rely on health checks to avoid dead ones
// This prevents removing filers that might be temporarily unavailable
}
// GetLookupFileIdFunction returns a lookup function with URL preference handling
func ( fc * FilerClient ) GetLookupFileIdFunction ( ) LookupFileIdFunctionType {
if fc . urlPreference == PreferUrl {
@ -299,13 +425,25 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
// Try all filer addresses with round-robin starting from current index
// Skip known-unhealthy filers (circuit breaker pattern)
i := atomic . LoadInt32 ( & fc . filerIndex )
// Get filer count with read lock
fc . filerAddressesMu . RLock ( )
n := int32 ( len ( fc . filerAddresses ) )
fc . filerAddressesMu . RUnlock ( )
for x := int32 ( 0 ) ; x < n ; x ++ {
// Get current filer address and health with read lock
fc . filerAddressesMu . RLock ( )
if i >= int32 ( len ( fc . filerAddresses ) ) {
// Filer list changed, reset index
i = 0
}
// Circuit breaker: skip unhealthy filers
if fc . shouldSkipUnhealthyFiler ( i ) {
glog . V ( 2 ) . Infof ( "FilerClient: skipping unhealthy filer %s (consecutive failures: %d)" ,
fc . filerAddresses [ i ] , atomic . LoadInt32 ( & fc . filerHealth [ i ] . failureCount ) )
fc . filerAddressesMu . RUnlock ( )
i ++
if i >= n {
i = 0
@ -314,6 +452,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
}
filerAddress := fc . filerAddresses [ i ]
fc . filerAddressesMu . RUnlock ( )
// Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
err := func ( ) error {