@ -29,8 +29,8 @@ const (
// filerHealth tracks the health status of a filer.
//
// Both fields are marked as accessed atomically. The 64-bit field is declared
// first on purpose: sync/atomic only guarantees 64-bit alignment for the first
// word of an allocated struct on 32-bit platforms, so placing the int64 ahead
// of the int32 keeps 64-bit atomic operations on it safe everywhere.
type filerHealth struct {
	lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds
	failureCount      int32 // atomic: consecutive failures
}
// FilerClient provides volume location services by querying a filer
@ -54,7 +54,7 @@ type FilerClient struct {
maxRetries int // Retry: maximum retry attempts for transient failures
initialRetryWait time . Duration // Retry: initial wait time before first retry
retryBackoffFactor float64 // Retry: backoff multiplier for wait time
// Filer discovery fields
masterClient * MasterClient // Optional: for discovering filers in the same group
filerGroup string // Optional: filer group for discovery
@ -79,7 +79,7 @@ type FilerClientOption struct {
MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3)
InitialRetryWait time . Duration // Retry: initial wait time before first retry (0 = use default of 1s)
RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5)
// Filer discovery options
MasterClient * MasterClient // Optional: enables filer discovery from master
FilerGroup string // Optional: filer group name for discovery (required if MasterClient is set)
@ -192,17 +192,17 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress {
	// Hold the read lock for the whole lookup so the address list cannot be
	// modified between the length check and the element access.
	fc.filerAddressesMu.RLock()
	defer fc.filerAddressesMu.RUnlock()

	addrs := fc.filerAddresses
	if len(addrs) == 0 {
		// No filer addresses are known: report that with an empty address.
		return ""
	}

	// filerIndex is read atomically. When it points past the end of the
	// list, fall back to the first entry instead.
	if pos := atomic.LoadInt32(&fc.filerIndex); pos < int32(len(addrs)) {
		return addrs[pos]
	}
	return addrs[0]
}
@ -211,7 +211,7 @@ func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress {
func ( fc * FilerClient ) GetAllFilers ( ) [ ] pb . ServerAddress {
fc . filerAddressesMu . RLock ( )
defer fc . filerAddressesMu . RUnlock ( )
// Return a copy to avoid concurrent modification
filers := make ( [ ] pb . ServerAddress , len ( fc . filerAddresses ) )
copy ( filers , fc . filerAddresses )
@ -223,7 +223,7 @@ func (fc *FilerClient) GetAllFilers() []pb.ServerAddress {
func ( fc * FilerClient ) SetCurrentFiler ( addr pb . ServerAddress ) {
fc . filerAddressesMu . RLock ( )
defer fc . filerAddressesMu . RUnlock ( )
// Find the index of the specified filer address
for i , filer := range fc . filerAddresses {
if filer == addr {
@ -239,7 +239,7 @@ func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) {
func ( fc * FilerClient ) ShouldSkipUnhealthyFiler ( addr pb . ServerAddress ) bool {
fc . filerAddressesMu . RLock ( )
defer fc . filerAddressesMu . RUnlock ( )
// Find the health for this filer address
for i , filer := range fc . filerAddresses {
if filer == addr {
@ -257,7 +257,7 @@ func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool {
func ( fc * FilerClient ) RecordFilerSuccess ( addr pb . ServerAddress ) {
fc . filerAddressesMu . RLock ( )
defer fc . filerAddressesMu . RUnlock ( )
// Find the health for this filer address
for i , filer := range fc . filerAddresses {
if filer == addr {
@ -273,7 +273,7 @@ func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) {
func ( fc * FilerClient ) RecordFilerFailure ( addr pb . ServerAddress ) {
fc . filerAddressesMu . RLock ( )
defer fc . filerAddressesMu . RUnlock ( )
// Find the health for this filer address
for i , filer := range fc . filerAddresses {
if filer == addr {
@ -303,13 +303,13 @@ func (fc *FilerClient) discoverFilers() {
glog . Errorf ( "FilerClient: panic in filer discovery goroutine for group '%s': %v" , fc . filerGroup , r )
}
} ( )
// Do an initial discovery
fc . refreshFilerList ( )
ticker := time . NewTicker ( fc . discoveryInterval )
defer ticker . Stop ( )
for {
select {
case <- ticker . C :
@ -326,22 +326,22 @@ func (fc *FilerClient) refreshFilerList() {
if fc . masterClient == nil {
return
}
// Get current master address
currentMaster := fc . masterClient . GetMaster ( context . Background ( ) )
if currentMaster == "" {
glog . V ( 1 ) . Infof ( "FilerClient: no master available for filer discovery" )
return
}
// Query master for filers in our group
updates := cluster . ListExistingPeerUpdates ( currentMaster , fc . grpcDialOption , fc . filerGroup , cluster . FilerType )
if len ( updates ) == 0 {
glog . V ( 2 ) . Infof ( "FilerClient: no filers found in group '%s'" , fc . filerGroup )
return
}
// Build new filer address list
discoveredFilers := make ( map [ pb . ServerAddress ] bool )
for _ , update := range updates {
@ -349,17 +349,17 @@ func (fc *FilerClient) refreshFilerList() {
discoveredFilers [ pb . ServerAddress ( update . Address ) ] = true
}
}
// Thread-safe update of filer list
fc . filerAddressesMu . Lock ( )
defer fc . filerAddressesMu . Unlock ( )
// Build a map of existing filers for efficient O(1) lookup
existingFilers := make ( map [ pb . ServerAddress ] struct { } , len ( fc . filerAddresses ) )
for _ , f := range fc . filerAddresses {
existingFilers [ f ] = struct { } { }
}
// Find new filers - O(N+M) instead of O(N*M)
var newFilers [ ] pb . ServerAddress
for addr := range discoveredFilers {
@ -367,18 +367,18 @@ func (fc *FilerClient) refreshFilerList() {
newFilers = append ( newFilers , addr )
}
}
// Add new filers
if len ( newFilers ) > 0 {
glog . V ( 0 ) . Infof ( "FilerClient: discovered %d new filer(s) in group '%s': %v" , len ( newFilers ) , fc . filerGroup , newFilers )
fc . filerAddresses = append ( fc . filerAddresses , newFilers ... )
// Initialize health tracking for new filers
for range newFilers {
fc . filerHealth = append ( fc . filerHealth , & filerHealth { } )
}
}
// Optionally, remove filers that are no longer in the cluster
// For now, we keep all filers and rely on health checks to avoid dead ones
// This prevents removing filers that might be temporarily unavailable
@ -571,7 +571,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
// Try all filer addresses with round-robin starting from current index
// Skip known-unhealthy filers (circuit breaker pattern)
i := atomic . LoadInt32 ( & fc . filerIndex )
// Get filer count with read lock
fc . filerAddressesMu . RLock ( )
n := int32 ( len ( fc . filerAddresses ) )
@ -589,12 +589,12 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
// Filer list changed, reset index
i = 0
}
// Get health pointer while holding lock
health := fc . filerHealth [ i ]
filerAddress := fc . filerAddresses [ i ]
fc . filerAddressesMu . RUnlock ( )
// Circuit breaker: skip unhealthy filers (no lock needed - uses atomics)
if fc . shouldSkipUnhealthyFilerWithHealth ( health ) {
glog . V ( 2 ) . Infof ( "FilerClient: skipping unhealthy filer %s (consecutive failures: %d)" ,