diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
index f0dd5f2e6..820a346c5 100644
--- a/weed/wdclient/filer_client.go
+++ b/weed/wdclient/filer_client.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math/rand"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -12,6 +13,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
+	"github.com/seaweedfs/seaweedfs/weed/cluster"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -35,9 +37,11 @@ type filerHealth struct {
 // It uses the shared vidMap cache for efficient lookups
 // Supports multiple filer addresses with automatic failover for high availability
 // Tracks filer health to avoid repeatedly trying known-unhealthy filers
+// Can discover additional filers from master server when configured with filer group
 type FilerClient struct {
 	*vidMapClient
 	filerAddresses []pb.ServerAddress
+	filerAddressesMu sync.RWMutex // Protects filerAddresses and filerHealth
 	filerIndex     int32          // atomic: current filer index for round-robin
 	filerHealth    []*filerHealth // health status per filer (same order as filerAddresses)
 	grpcDialOption grpc.DialOption
@@ -50,6 +54,12 @@ type FilerClient struct {
 	maxRetries         int           // Retry: maximum retry attempts for transient failures
 	initialRetryWait   time.Duration // Retry: initial wait time before first retry
 	retryBackoffFactor float64       // Retry: backoff multiplier for wait time
+
+	// Filer discovery fields
+	masterClient      *MasterClient // Optional: for discovering filers in the same group
+	filerGroup        string        // Optional: filer group for discovery
+	discoveryInterval time.Duration // How often to refresh filer list from master
+	stopDiscovery     chan struct{} // Signal to stop discovery goroutine
 }
 
 // filerVolumeProvider implements VolumeLocationProvider by querying filer
@@ -68,6 +78,11 @@ type FilerClientOption struct {
 	MaxRetries         int           // Retry: maximum retry attempts for transient failures (0 = use default of 3)
 	InitialRetryWait   time.Duration // Retry: initial wait time before first retry (0 = use default of 1s)
 	RetryBackoffFactor float64       // Retry: backoff multiplier for wait time (0 = use default of 1.5)
+
+	// Filer discovery options
+	MasterClient      *MasterClient // Optional: enables filer discovery from master
+	FilerGroup        string        // Optional: filer group name for discovery (required if MasterClient is set)
+	DiscoveryInterval time.Duration // Optional: how often to refresh filer list (0 = use default of 5 minutes)
 }
 
 // NewFilerClient creates a new client that queries filer(s) for volume locations
@@ -87,6 +102,9 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 	maxRetries := 3                 // Default: 3 retry attempts for transient failures
 	initialRetryWait := time.Second // Default: 1 second initial retry wait
 	retryBackoffFactor := 1.5       // Default: 1.5x backoff multiplier
+	var masterClient *MasterClient
+	var filerGroup string
+	discoveryInterval := 5 * time.Minute // Default: refresh every 5 minutes
 
 	// Override with provided options
 	if len(opts) > 0 && opts[0] != nil {
@@ -115,6 +133,13 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		if opt.RetryBackoffFactor > 0 {
 			retryBackoffFactor = opt.RetryBackoffFactor
 		}
+		if opt.MasterClient != nil {
+			masterClient = opt.MasterClient
+			filerGroup = opt.FilerGroup
+			if opt.DiscoveryInterval > 0 {
+				discoveryInterval = opt.DiscoveryInterval
+			}
+		}
 	}
 
 	// Initialize health tracking for each filer
@@ -137,6 +162,16 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		maxRetries:         maxRetries,
 		initialRetryWait:   initialRetryWait,
 		retryBackoffFactor: retryBackoffFactor,
+		masterClient:       masterClient,
+		filerGroup:         filerGroup,
+		discoveryInterval:  discoveryInterval,
+	}
+
+	// Start filer discovery if master client is configured
+	if masterClient != nil && filerGroup != "" {
+		fc.stopDiscovery = make(chan struct{})
+		go fc.discoverFilers()
+		glog.V(0).Infof("FilerClient: started filer discovery for group '%s' (refresh interval: %v)", filerGroup, discoveryInterval)
 	}
 
 	// Create provider that references this FilerClient for failover support
@@ -149,6 +184,97 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 	return fc
 }
 
+// Close stops the filer discovery goroutine if running
+func (fc *FilerClient) Close() {
+	if fc.stopDiscovery != nil {
+		close(fc.stopDiscovery)
+	}
+}
+
+// discoverFilers periodically queries the master to discover filers in the same group
+// and updates the filer list. This runs in a background goroutine.
+func (fc *FilerClient) discoverFilers() {
+	// Do an initial discovery
+	fc.refreshFilerList()
+
+	ticker := time.NewTicker(fc.discoveryInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			fc.refreshFilerList()
+		case <-fc.stopDiscovery:
+			glog.V(0).Infof("FilerClient: stopping filer discovery for group '%s'", fc.filerGroup)
+			return
+		}
+	}
+}
+
+// refreshFilerList queries the master for the current list of filers and updates the local list
+func (fc *FilerClient) refreshFilerList() {
+	if fc.masterClient == nil {
+		return
+	}
+
+	// Get current master address
+	currentMaster := fc.masterClient.GetMaster(context.Background())
+	if currentMaster == "" {
+		glog.V(1).Infof("FilerClient: no master available for filer discovery")
+		return
+	}
+
+	// Query master for filers in our group
+	updates := cluster.ListExistingPeerUpdates(currentMaster, fc.grpcDialOption, fc.filerGroup, cluster.FilerType)
+
+	if len(updates) == 0 {
+		glog.V(2).Infof("FilerClient: no filers found in group '%s'", fc.filerGroup)
+		return
+	}
+
+	// Build new filer address list
+	discoveredFilers := make(map[pb.ServerAddress]bool)
+	for _, update := range updates {
+		if update.Address != "" {
+			discoveredFilers[pb.ServerAddress(update.Address)] = true
+		}
+	}
+
+	// Thread-safe update of filer list
+	fc.filerAddressesMu.Lock()
+	defer fc.filerAddressesMu.Unlock()
+
+	// Build list of new filers that aren't in our current list
+	var newFilers []pb.ServerAddress
+	for addr := range discoveredFilers {
+		found := false
+		for _, existing := range fc.filerAddresses {
+			if existing == addr {
+				found = true
+				break
+			}
+		}
+		if !found {
+			newFilers = append(newFilers, addr)
+		}
+	}
+
+	// Add new filers
+	if len(newFilers) > 0 {
+		glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers)
+		fc.filerAddresses = append(fc.filerAddresses, newFilers...)
+
+		// Initialize health tracking for new filers
+		for range newFilers {
+			fc.filerHealth = append(fc.filerHealth, &filerHealth{})
+		}
+	}
+
+	// Optionally, remove filers that are no longer in the cluster
+	// For now, we keep all filers and rely on health checks to avoid dead ones
+	// This prevents removing filers that might be temporarily unavailable
+}
+
 // GetLookupFileIdFunction returns a lookup function with URL preference handling
 func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
 	if fc.urlPreference == PreferUrl {
@@ -299,13 +425,25 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 	// Try all filer addresses with round-robin starting from current index
 	// Skip known-unhealthy filers (circuit breaker pattern)
 	i := atomic.LoadInt32(&fc.filerIndex)
+
+	// Get filer count with read lock
+	fc.filerAddressesMu.RLock()
 	n := int32(len(fc.filerAddresses))
+	fc.filerAddressesMu.RUnlock()
 
 	for x := int32(0); x < n; x++ {
+		// Get current filer address and health with read lock
+		fc.filerAddressesMu.RLock()
+		if i >= int32(len(fc.filerAddresses)) {
+			// Filer list changed, reset index
+			i = 0
+		}
+
 		// Circuit breaker: skip unhealthy filers
 		if fc.shouldSkipUnhealthyFiler(i) {
 			glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
 				fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
+			fc.filerAddressesMu.RUnlock()
 			i++
 			if i >= n {
 				i = 0
@@ -314,6 +452,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 		}
 
 		filerAddress := fc.filerAddresses[i]
+		fc.filerAddressesMu.RUnlock()
 
 		// Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
 		err := func() error {
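
Usage sketch (hedged): only the three new `FilerClientOption` fields and the `Close()` contract come from this patch. The group name and interval below are placeholders, and since the hunk headers truncate the full `NewFilerClient` signature, the constructor call is left as a comment rather than guessed.

```go
// Hypothetical wiring; only the FilerClientOption fields are defined by this patch.
package main

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

func main() {
	var masterClient *wdclient.MasterClient // assumption: obtained from existing wiring; nil disables discovery

	opt := &wdclient.FilerClientOption{
		MasterClient:      masterClient,
		FilerGroup:        "default",       // placeholder; must be non-empty for discovery to start
		DiscoveryInterval: 2 * time.Minute, // 0 falls back to the 5-minute default
	}
	_ = opt

	// fc := wdclient.NewFilerClient(seedFilers, grpcDialOption, ..., opt) // signature truncated in the hunk headers
	// defer fc.Close()                                                    // stops the discovery goroutine
}
```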
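The concurrency pattern the patch relies on, distilled into a standalone runnable model (no SeaweedFS types; all names illustrative): a background goroutine grows an `RWMutex`-guarded address list with health slots appended in lockstep, readers clamp a possibly stale round-robin index under `RLock`, and a stop channel mirrors `FilerClient.Close`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type client struct {
	mu        sync.RWMutex
	addresses []string
	health    []int // one slot per address, appended in lockstep with addresses
	stop      chan struct{}
}

// discover periodically appends newly "discovered" addresses under the write lock.
func (c *client) discover() {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	n := 0
	for {
		select {
		case <-ticker.C:
			c.mu.Lock()
			n++
			c.addresses = append(c.addresses, fmt.Sprintf("filer-%d:8888", n))
			c.health = append(c.health, 0) // keep health tracking aligned with addresses
			c.mu.Unlock()
		case <-c.stop:
			return
		}
	}
}

// pick reads one address, clamping an index that may be stale because the list grew.
func (c *client) pick(i int) string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if i >= len(c.addresses) {
		i = 0 // list changed since the caller computed i; reset, as the patch does
	}
	return c.addresses[i]
}

func main() {
	c := &client{addresses: []string{"filer-0:8888"}, health: []int{0}, stop: make(chan struct{})}
	go c.discover()
	for i := 0; i < 5; i++ {
		fmt.Println(c.pick(i))
		time.Sleep(60 * time.Millisecond)
	}
	close(c.stop) // mirrors FilerClient.Close
}
```

Keeping the list grow-only and leaning on the existing circuit breaker, rather than deleting absent filers, trades a few wasted probes for resilience to transient master-side flapping, as the closing comment in `refreshFilerList` notes.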