diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index 27c32adfa..869b3b93d 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -37,22 +37,29 @@ func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOp
 
 // ReadFilerConfFromFilers reads filer configuration with multi-filer failover support
 func ReadFilerConfFromFilers(filerGrpcAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) {
-	var buf bytes.Buffer
+	var data []byte
 	if err := pb.WithOneOfGrpcFilerClients(false, filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
 		if masterClient != nil {
-			return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf)
-		} else {
-			content, err := ReadInsideFiler(client, DirectoryEtcSeaweedFS, FilerConfName)
-			buf = *bytes.NewBuffer(content)
+			var buf bytes.Buffer
+			if err := ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf); err != nil {
+				return err
+			}
+			data = buf.Bytes()
+			return nil
+		}
+		content, err := ReadInsideFiler(client, DirectoryEtcSeaweedFS, FilerConfName)
+		if err != nil {
 			return err
 		}
+		data = content
+		return nil
 	}); err != nil && err != filer_pb.ErrNotFound {
 		return nil, fmt.Errorf("read %s/%s: %v", DirectoryEtcSeaweedFS, FilerConfName, err)
 	}
 
 	fc := NewFilerConf()
-	if buf.Len() > 0 {
-		if err := fc.LoadFromBytes(buf.Bytes()); err != nil {
+	if len(data) > 0 {
+		if err := fc.LoadFromBytes(data); err != nil {
 			return nil, fmt.Errorf("parse %s/%s: %v", DirectoryEtcSeaweedFS, FilerConfName, err)
 		}
 	}
diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
index 7e6b0b07a..465c2b3ac 100644
--- a/weed/wdclient/filer_client.go
+++ b/weed/wdclient/filer_client.go
@@ -391,8 +391,8 @@ func isRetryableGrpcError(err error) bool {
 
-// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
+// shouldSkipUnhealthyFilerWithHealth checks if a filer should be skipped based on health
 // Circuit breaker pattern: skip filers with multiple recent consecutive failures
-func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
-	health := fc.filerHealth[index]
+// Uses atomic operations only - safe to call without locks
+func (fc *FilerClient) shouldSkipUnhealthyFilerWithHealth(health *filerHealth) bool {
 	failureCount := atomic.LoadInt32(&health.failureCount)
 
 	// Check if failure count exceeds threshold
@@ -413,19 +413,36 @@ func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
 	return true // Skip this unhealthy filer
 }
 
-// recordFilerSuccess resets failure tracking for a successful filer
-func (fc *FilerClient) recordFilerSuccess(index int32) {
+// shouldSkipUnhealthyFiler is the index-based form of shouldSkipUnhealthyFilerWithHealth.
+//
+// Deprecated: Use shouldSkipUnhealthyFilerWithHealth instead
+// This function is kept for backward compatibility but requires array access
+func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
 	health := fc.filerHealth[index]
+	return fc.shouldSkipUnhealthyFilerWithHealth(health)
+}
+
+// recordFilerSuccessWithHealth resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccessWithHealth(health *filerHealth) {
 	atomic.StoreInt32(&health.failureCount, 0)
 }
 
-// recordFilerFailure increments failure count for an unhealthy filer
-func (fc *FilerClient) recordFilerFailure(index int32) {
-	health := fc.filerHealth[index]
+// recordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccess(index int32) {
+	fc.recordFilerSuccessWithHealth(fc.filerHealth[index])
+}
+
+// recordFilerFailureWithHealth increments failure count for an unhealthy filer
+func (fc *FilerClient) recordFilerFailureWithHealth(health *filerHealth) {
 	atomic.AddInt32(&health.failureCount, 1)
 	atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
 }
 
+// recordFilerFailure increments failure count for an unhealthy filer
+func (fc *FilerClient) recordFilerFailure(index int32) {
+	fc.recordFilerFailureWithHealth(fc.filerHealth[index])
+}
+
 // LookupVolumeIds queries the filer for volume locations with automatic failover
 // Tries all configured filer addresses until one succeeds (high availability)
 // Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
@@ -459,11 +476,15 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 				i = 0
 			}
 
-			// Circuit breaker: skip unhealthy filers
-			if fc.shouldSkipUnhealthyFiler(i) {
+			// Get health pointer while holding lock
+			health := fc.filerHealth[i]
+			filerAddress := fc.filerAddresses[i]
+			fc.filerAddressesMu.RUnlock()
+
+			// Circuit breaker: skip unhealthy filers (no lock needed - uses atomics)
+			if fc.shouldSkipUnhealthyFilerWithHealth(health) {
 				glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
-					fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
-				fc.filerAddressesMu.RUnlock()
+					filerAddress, atomic.LoadInt32(&health.failureCount))
 				i++
 				if i >= n {
 					i = 0
@@ -471,9 +492,6 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 				continue
 			}
 
-			filerAddress := fc.filerAddresses[i]
-			fc.filerAddressesMu.RUnlock()
-
 			// Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
 			err := func() error {
 				// Create a fresh timeout context for each filer attempt
@@ -526,7 +544,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 
 			if err != nil {
 				glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
-				fc.recordFilerFailure(i)
+				fc.recordFilerFailureWithHealth(health)
 				lastErr = err
 				i++
 				if i >= n {
@@ -537,7 +555,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 
 			// Success - update the preferred filer index and reset health tracking
 			atomic.StoreInt32(&fc.filerIndex, i)
-			fc.recordFilerSuccess(i)
+			fc.recordFilerSuccessWithHealth(health)
 			glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
 			return result, nil
 		}