From 60dae3887dcda065c3512d0f509294acb8c38d9e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Nov 2025 20:03:59 -0800 Subject: [PATCH] Add filer discovery: treat initial filers as seeds and discover peers from master Enhances FilerClient to automatically discover additional filers in the same filer group by querying the master server. This allows users to specify just a few seed filers, and the client will discover all other filers in the same filer group. Key changes to wdclient/FilerClient: - Added MasterClient, FilerGroup, and DiscoveryInterval fields - Added thread-safe filer list management with RWMutex - Implemented discoverFilers() background goroutine - Uses cluster.ListExistingPeerUpdates() to query master for filers - Automatically adds newly discovered filers to the list - Added Close() method to clean up discovery goroutine New FilerClientOption fields: - MasterClient: enables filer discovery from master - FilerGroup: specifies which filer group to discover - DiscoveryInterval: how often to refresh (default 5 minutes) Usage example: masterClient := wdclient.NewMasterClient(...) filerClient := wdclient.NewFilerClient( []pb.ServerAddress{"localhost:8888"}, // seed filers grpcDialOption, dataCenter, &wdclient.FilerClientOption{ MasterClient: masterClient, FilerGroup: "my-group", }, ) defer filerClient.Close() The initial filers act as seeds - the client discovers and adds all other filers in the same group from the master. Discovered filers are added dynamically without removing existing ones (relying on health checks for unavailable filers). 
--- weed/wdclient/filer_client.go | 139 ++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go index f0dd5f2e6..820a346c5 100644 --- a/weed/wdclient/filer_client.go +++ b/weed/wdclient/filer_client.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "strings" + "sync" "sync/atomic" "time" @@ -12,6 +13,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -35,9 +37,11 @@ type filerHealth struct { // It uses the shared vidMap cache for efficient lookups // Supports multiple filer addresses with automatic failover for high availability // Tracks filer health to avoid repeatedly trying known-unhealthy filers +// Can discover additional filers from master server when configured with filer group type FilerClient struct { *vidMapClient filerAddresses []pb.ServerAddress + filerAddressesMu sync.RWMutex // Protects filerAddresses and filerHealth filerIndex int32 // atomic: current filer index for round-robin filerHealth []*filerHealth // health status per filer (same order as filerAddresses) grpcDialOption grpc.DialOption @@ -50,6 +54,12 @@ type FilerClient struct { maxRetries int // Retry: maximum retry attempts for transient failures initialRetryWait time.Duration // Retry: initial wait time before first retry retryBackoffFactor float64 // Retry: backoff multiplier for wait time + + // Filer discovery fields + masterClient *MasterClient // Optional: for discovering filers in the same group + filerGroup string // Optional: filer group for discovery + discoveryInterval time.Duration // How often to refresh filer list from master + stopDiscovery chan struct{} // Signal to stop discovery goroutine } // filerVolumeProvider implements VolumeLocationProvider by querying filer @@ -68,6 +78,11 @@ 
type FilerClientOption struct { MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3) InitialRetryWait time.Duration // Retry: initial wait time before first retry (0 = use default of 1s) RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5) + + // Filer discovery options + MasterClient *MasterClient // Optional: enables filer discovery from master + FilerGroup string // Optional: filer group name for discovery (required if MasterClient is set) + DiscoveryInterval time.Duration // Optional: how often to refresh filer list (0 = use default of 5 minutes) } // NewFilerClient creates a new client that queries filer(s) for volume locations @@ -87,6 +102,9 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO maxRetries := 3 // Default: 3 retry attempts for transient failures initialRetryWait := time.Second // Default: 1 second initial retry wait retryBackoffFactor := 1.5 // Default: 1.5x backoff multiplier + var masterClient *MasterClient + var filerGroup string + discoveryInterval := 5 * time.Minute // Default: refresh every 5 minutes // Override with provided options if len(opts) > 0 && opts[0] != nil { @@ -115,6 +133,13 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO if opt.RetryBackoffFactor > 0 { retryBackoffFactor = opt.RetryBackoffFactor } + if opt.MasterClient != nil { + masterClient = opt.MasterClient + filerGroup = opt.FilerGroup + if opt.DiscoveryInterval > 0 { + discoveryInterval = opt.DiscoveryInterval + } + } } // Initialize health tracking for each filer @@ -137,6 +162,16 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO maxRetries: maxRetries, initialRetryWait: initialRetryWait, retryBackoffFactor: retryBackoffFactor, + masterClient: masterClient, + filerGroup: filerGroup, + discoveryInterval: discoveryInterval, + } + + // Start filer discovery if master client is configured + 
if masterClient != nil && filerGroup != "" { + fc.stopDiscovery = make(chan struct{}) + go fc.discoverFilers() + glog.V(0).Infof("FilerClient: started filer discovery for group '%s' (refresh interval: %v)", filerGroup, discoveryInterval) } // Create provider that references this FilerClient for failover support @@ -149,6 +184,97 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO return fc } +// Close stops the filer discovery goroutine if running +func (fc *FilerClient) Close() { + if fc.stopDiscovery != nil { + close(fc.stopDiscovery) + } +} + +// discoverFilers periodically queries the master to discover filers in the same group +// and updates the filer list. This runs in a background goroutine. +func (fc *FilerClient) discoverFilers() { + // Do an initial discovery + fc.refreshFilerList() + + ticker := time.NewTicker(fc.discoveryInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + fc.refreshFilerList() + case <-fc.stopDiscovery: + glog.V(0).Infof("FilerClient: stopping filer discovery for group '%s'", fc.filerGroup) + return + } + } +} + +// refreshFilerList queries the master for the current list of filers and updates the local list +func (fc *FilerClient) refreshFilerList() { + if fc.masterClient == nil { + return + } + + // Get current master address + currentMaster := fc.masterClient.GetMaster(context.Background()) + if currentMaster == "" { + glog.V(1).Infof("FilerClient: no master available for filer discovery") + return + } + + // Query master for filers in our group + updates := cluster.ListExistingPeerUpdates(currentMaster, fc.grpcDialOption, fc.filerGroup, cluster.FilerType) + + if len(updates) == 0 { + glog.V(2).Infof("FilerClient: no filers found in group '%s'", fc.filerGroup) + return + } + + // Build new filer address list + discoveredFilers := make(map[pb.ServerAddress]bool) + for _, update := range updates { + if update.Address != "" { + discoveredFilers[pb.ServerAddress(update.Address)] = 
true + } + } + + // Thread-safe update of filer list + fc.filerAddressesMu.Lock() + defer fc.filerAddressesMu.Unlock() + + // Build list of new filers that aren't in our current list + var newFilers []pb.ServerAddress + for addr := range discoveredFilers { + found := false + for _, existing := range fc.filerAddresses { + if existing == addr { + found = true + break + } + } + if !found { + newFilers = append(newFilers, addr) + } + } + + // Add new filers + if len(newFilers) > 0 { + glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers) + fc.filerAddresses = append(fc.filerAddresses, newFilers...) + + // Initialize health tracking for new filers + for range newFilers { + fc.filerHealth = append(fc.filerHealth, &filerHealth{}) + } + } + + // Optionally, remove filers that are no longer in the cluster + // For now, we keep all filers and rely on health checks to avoid dead ones + // This prevents removing filers that might be temporarily unavailable +} + // GetLookupFileIdFunction returns a lookup function with URL preference handling func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType { if fc.urlPreference == PreferUrl { @@ -299,13 +425,25 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s // Try all filer addresses with round-robin starting from current index // Skip known-unhealthy filers (circuit breaker pattern) i := atomic.LoadInt32(&fc.filerIndex) + + // Get filer count with read lock + fc.filerAddressesMu.RLock() n := int32(len(fc.filerAddresses)) + fc.filerAddressesMu.RUnlock() for x := int32(0); x < n; x++ { + // Get current filer address and health with read lock + fc.filerAddressesMu.RLock() + if i >= int32(len(fc.filerAddresses)) { + // Filer list changed, reset index + i = 0 + } + // Circuit breaker: skip unhealthy filers if fc.shouldSkipUnhealthyFiler(i) { glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: 
%d)", fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount)) + fc.filerAddressesMu.RUnlock() i++ if i >= n { i = 0 @@ -314,6 +452,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s } filerAddress := fc.filerAddresses[i] + fc.filerAddressesMu.RUnlock() // Use anonymous function to ensure defer cancel() is called per iteration, not accumulated err := func() error {