|
|
|
@ -169,7 +169,8 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO |
|
|
|
} |
|
|
|
|
|
|
|
// Start filer discovery if master client is configured
|
|
|
|
if masterClient != nil && filerGroup != "" { |
|
|
|
// Empty filerGroup is valid (represents default group)
|
|
|
|
if masterClient != nil { |
|
|
|
fc.stopDiscovery = make(chan struct{}) |
|
|
|
go fc.discoverFilers() |
|
|
|
glog.V(0).Infof("FilerClient: started filer discovery for group '%s' (refresh interval: %v)", filerGroup, discoveryInterval) |
|
|
|
@ -205,6 +206,85 @@ func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress { |
|
|
|
return fc.filerAddresses[index] |
|
|
|
} |
|
|
|
|
|
|
|
// GetAllFilers returns a snapshot of all filer addresses
|
|
|
|
// Returns a copy to avoid concurrent modification issues
|
|
|
|
func (fc *FilerClient) GetAllFilers() []pb.ServerAddress { |
|
|
|
fc.filerAddressesMu.RLock() |
|
|
|
defer fc.filerAddressesMu.RUnlock() |
|
|
|
|
|
|
|
// Return a copy to avoid concurrent modification
|
|
|
|
filers := make([]pb.ServerAddress, len(fc.filerAddresses)) |
|
|
|
copy(filers, fc.filerAddresses) |
|
|
|
return filers |
|
|
|
} |
|
|
|
|
|
|
|
// SetCurrentFiler updates the current filer index to the specified address
|
|
|
|
// This is useful after successful failover to prefer the healthy filer for future requests
|
|
|
|
func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) { |
|
|
|
fc.filerAddressesMu.RLock() |
|
|
|
defer fc.filerAddressesMu.RUnlock() |
|
|
|
|
|
|
|
// Find the index of the specified filer address
|
|
|
|
for i, filer := range fc.filerAddresses { |
|
|
|
if filer == addr { |
|
|
|
atomic.StoreInt32(&fc.filerIndex, int32(i)) |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
// If address not found, leave index unchanged
|
|
|
|
} |
|
|
|
|
|
|
|
// ShouldSkipUnhealthyFiler checks if a filer address should be skipped based on health tracking
|
|
|
|
// Returns true if the filer has exceeded failure threshold and reset timeout hasn't elapsed
|
|
|
|
func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool { |
|
|
|
fc.filerAddressesMu.RLock() |
|
|
|
defer fc.filerAddressesMu.RUnlock() |
|
|
|
|
|
|
|
// Find the health for this filer address
|
|
|
|
for i, filer := range fc.filerAddresses { |
|
|
|
if filer == addr { |
|
|
|
if i < len(fc.filerHealth) { |
|
|
|
return fc.shouldSkipUnhealthyFilerWithHealth(fc.filerHealth[i]) |
|
|
|
} |
|
|
|
return false |
|
|
|
} |
|
|
|
} |
|
|
|
// If address not found, don't skip it
|
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
// RecordFilerSuccess resets failure tracking for a successful filer
|
|
|
|
func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) { |
|
|
|
fc.filerAddressesMu.RLock() |
|
|
|
defer fc.filerAddressesMu.RUnlock() |
|
|
|
|
|
|
|
// Find the health for this filer address
|
|
|
|
for i, filer := range fc.filerAddresses { |
|
|
|
if filer == addr { |
|
|
|
if i < len(fc.filerHealth) { |
|
|
|
fc.recordFilerSuccessWithHealth(fc.filerHealth[i]) |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// RecordFilerFailure increments failure count for an unhealthy filer
|
|
|
|
func (fc *FilerClient) RecordFilerFailure(addr pb.ServerAddress) { |
|
|
|
fc.filerAddressesMu.RLock() |
|
|
|
defer fc.filerAddressesMu.RUnlock() |
|
|
|
|
|
|
|
// Find the health for this filer address
|
|
|
|
for i, filer := range fc.filerAddresses { |
|
|
|
if filer == addr { |
|
|
|
if i < len(fc.filerHealth) { |
|
|
|
fc.recordFilerFailureWithHealth(fc.filerHealth[i]) |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Close stops the filer discovery goroutine if running
|
|
|
|
// Safe to call multiple times (idempotent)
|
|
|
|
func (fc *FilerClient) Close() { |
|
|
|
@ -425,7 +505,7 @@ func (fc *FilerClient) shouldSkipUnhealthyFilerWithHealth(health *filerHealth) b |
|
|
|
|
|
|
|
// Deprecated: Use shouldSkipUnhealthyFilerWithHealth instead
|
|
|
|
// This function is kept for backward compatibility but requires array access
|
|
|
|
// Note: Accesses filerHealth without lock; safe only when discovery is disabled
|
|
|
|
// Note: This function is now thread-safe.
|
|
|
|
func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool { |
|
|
|
fc.filerAddressesMu.RLock() |
|
|
|
if index >= int32(len(fc.filerHealth)) { |
|
|
|
|