
Address CodeRabbit review: fix buffer reuse and improve lock safety

Address two code review suggestions:

1. **Fix buffer reuse in ReadFilerConfFromFilers**:
   - Use a local []byte result instead of a shared buffer (see the sketch after this list)
   - Prevents partial data from a failed attempt from leaking into a later successful read
   - Creates a fresh buffer inside the callback for the masterClient path
   - More robust to future changes in the read helpers

2. **Improve lock safety in FilerClient**:
   - Add *WithHealth variants that accept health pointer
   - Get health pointer while holding lock, then release before calling
   - Eliminates potential for lock confusion (though no actual deadlock existed)
   - Clearer separation: lock for data access, atomics for health ops
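
The first fix is easiest to see in isolation. Below is a minimal, self-contained sketch of the hazard: readFromOneFiler is a hypothetical stand-in for a single failover attempt (not the real pb/filer_pb call chain), and the two loops contrast the old shared-buffer pattern with the new per-attempt result.

package main

import (
    "bytes"
    "errors"
    "fmt"
)

// readFromOneFiler stands in for one failover attempt; it may write into
// buf before failing, which is exactly the hazard the commit removes.
func readFromOneFiler(addr string, buf *bytes.Buffer) error {
    if addr == "bad" {
        buf.WriteString("partial-")
        return errors.New("connection reset")
    }
    buf.WriteString("full-config")
    return nil
}

func main() {
    // Old pattern: one buffer shared across attempts, so bytes written by the
    // failed attempt leak into the final result.
    var shared bytes.Buffer
    for _, addr := range []string{"bad", "good"} {
        if err := readFromOneFiler(addr, &shared); err == nil {
            break
        }
    }
    fmt.Println(shared.String()) // partial-full-config

    // New pattern (as in the patched ReadFilerConfFromFilers): a fresh buffer
    // per attempt, and only the successful attempt's bytes are kept.
    var data []byte
    for _, addr := range []string{"bad", "good"} {
        var buf bytes.Buffer
        if err := readFromOneFiler(addr, &buf); err == nil {
            data = buf.Bytes()
            break
        }
    }
    fmt.Println(string(data)) // full-config
}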

Changes:
- ReadFilerConfFromFilers: var data []byte, create buf inside callback
- shouldSkipUnhealthyFilerWithHealth(health *filerHealth)
- recordFilerSuccessWithHealth(health *filerHealth)
- recordFilerFailureWithHealth(health *filerHealth)
- Keep old functions for backward compatibility (marked deprecated)
- Update LookupVolumeIds to use the WithHealth variants (locking pattern sketched below this list)
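
For the second fix, here is a trimmed-down sketch of the lock-then-atomics pattern the WithHealth variants enable. The struct and field names follow the diff below, but the types are stand-ins and the address value is made up; the point is that the mutex guards only the slice lookup, while health bookkeeping happens on the pointer via sync/atomic after the lock is released.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// Stand-ins for the real wdclient types; field names mirror the diff.
type filerHealth struct {
    failureCount      int32
    lastFailureTimeNs int64
}

type FilerClient struct {
    filerAddressesMu sync.RWMutex
    filerAddresses   []string
    filerHealth      []*filerHealth
}

// recordFilerFailureWithHealth works on a health pointer using atomics only,
// so it is safe to call without holding filerAddressesMu.
func (fc *FilerClient) recordFilerFailureWithHealth(h *filerHealth) {
    atomic.AddInt32(&h.failureCount, 1)
    atomic.StoreInt64(&h.lastFailureTimeNs, time.Now().UnixNano())
}

func main() {
    fc := &FilerClient{
        filerAddresses: []string{"filer-1:18888"},
        filerHealth:    []*filerHealth{{}},
    }

    // Lock only for slice access: copy the pointer and the address, then release.
    fc.filerAddressesMu.RLock()
    health := fc.filerHealth[0]
    filerAddress := fc.filerAddresses[0]
    fc.filerAddressesMu.RUnlock()

    // No lock held here; the circuit-breaker bookkeeping is purely atomic.
    fc.recordFilerFailureWithHealth(health)
    fmt.Println(filerAddress, atomic.LoadInt32(&health.failureCount)) // filer-1:18888 1
}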

Benefits:
- More robust multi-filer configuration reading
- Clearer lock vs atomic operation boundaries
- No lock held during health checks (even though atomics don't block)
- Better code organization and maintainability

Chris Lu committed 1ce9826c36 to pull/7550/head, 3 days ago

weed/filer/filer_conf.go (19 lines changed)

@@ -37,22 +37,29 @@ func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOp
// ReadFilerConfFromFilers reads filer configuration with multi-filer failover support
func ReadFilerConfFromFilers(filerGrpcAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) {
    var buf bytes.Buffer
    var data []byte
    if err := pb.WithOneOfGrpcFilerClients(false, filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
        if masterClient != nil {
            return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf)
        } else {
            var buf bytes.Buffer
            if err := ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf); err != nil {
                return err
            }
            data = buf.Bytes()
            return nil
        }
        content, err := ReadInsideFiler(client, DirectoryEtcSeaweedFS, FilerConfName)
        buf = *bytes.NewBuffer(content)
        if err != nil {
            return err
        }
        data = content
        return nil
    }); err != nil && err != filer_pb.ErrNotFound {
        return nil, fmt.Errorf("read %s/%s: %v", DirectoryEtcSeaweedFS, FilerConfName, err)
    }
    fc := NewFilerConf()
    if buf.Len() > 0 {
        if err := fc.LoadFromBytes(buf.Bytes()); err != nil {
    if len(data) > 0 {
        if err := fc.LoadFromBytes(data); err != nil {
            return nil, fmt.Errorf("parse %s/%s: %v", DirectoryEtcSeaweedFS, FilerConfName, err)
        }
    }
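
For orientation, a hedged sketch of how the patched function might be called; the import paths follow the repository layout, but the package name, filer addresses, and helper are hypothetical, and the nil masterClient simply exercises the ReadInsideFiler branch above.

package example

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/filer"
    "github.com/seaweedfs/seaweedfs/weed/pb"
    "google.golang.org/grpc"
)

// loadFilerConf is a hypothetical call site, not code from this PR.
func loadFilerConf(grpcDialOption grpc.DialOption) (*filer.FilerConf, error) {
    filers := []pb.ServerAddress{"filer-1:18888", "filer-2:18888"}
    fc, err := filer.ReadFilerConfFromFilers(filers, grpcDialOption, nil)
    if err != nil {
        return nil, fmt.Errorf("load %s/%s: %w", filer.DirectoryEtcSeaweedFS, filer.FilerConfName, err)
    }
    return fc, nil
}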

weed/wdclient/filer_client.go (49 lines changed)

@@ -391,8 +391,9 @@ func isRetryableGrpcError(err error) bool {
// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
// Circuit breaker pattern: skip filers with multiple recent consecutive failures
func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
    health := fc.filerHealth[index]
// shouldSkipUnhealthyFilerWithHealth checks if a filer should be skipped based on health
// Uses atomic operations only - safe to call without locks
func (fc *FilerClient) shouldSkipUnhealthyFilerWithHealth(health *filerHealth) bool {
    failureCount := atomic.LoadInt32(&health.failureCount)

    // Check if failure count exceeds threshold
@@ -413,19 +413,34 @@ func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
    return true // Skip this unhealthy filer
}

// recordFilerSuccess resets failure tracking for a successful filer
func (fc *FilerClient) recordFilerSuccess(index int32) {
// Deprecated: Use shouldSkipUnhealthyFilerWithHealth instead
// This function is kept for backward compatibility but requires array access
func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
    health := fc.filerHealth[index]
    return fc.shouldSkipUnhealthyFilerWithHealth(health)
}

// recordFilerSuccessWithHealth resets failure tracking for a successful filer
func (fc *FilerClient) recordFilerSuccessWithHealth(health *filerHealth) {
    atomic.StoreInt32(&health.failureCount, 0)
}

// recordFilerFailure increments failure count for an unhealthy filer
func (fc *FilerClient) recordFilerFailure(index int32) {
    health := fc.filerHealth[index]
// recordFilerSuccess resets failure tracking for a successful filer
func (fc *FilerClient) recordFilerSuccess(index int32) {
    fc.recordFilerSuccessWithHealth(fc.filerHealth[index])
}

// recordFilerFailureWithHealth increments failure count for an unhealthy filer
func (fc *FilerClient) recordFilerFailureWithHealth(health *filerHealth) {
    atomic.AddInt32(&health.failureCount, 1)
    atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
}

// recordFilerFailure increments failure count for an unhealthy filer
func (fc *FilerClient) recordFilerFailure(index int32) {
    fc.recordFilerFailureWithHealth(fc.filerHealth[index])
}

// LookupVolumeIds queries the filer for volume locations with automatic failover
// Tries all configured filer addresses until one succeeds (high availability)
// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
@@ -459,11 +475,15 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
                i = 0
            }

            // Circuit breaker: skip unhealthy filers
            if fc.shouldSkipUnhealthyFiler(i) {
                glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
                    fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
            // Get health pointer while holding lock
            health := fc.filerHealth[i]
            filerAddress := fc.filerAddresses[i]
            fc.filerAddressesMu.RUnlock()

            // Circuit breaker: skip unhealthy filers (no lock needed - uses atomics)
            if fc.shouldSkipUnhealthyFilerWithHealth(health) {
                glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
                    filerAddress, atomic.LoadInt32(&health.failureCount))
                i++
                if i >= n {
                    i = 0
@@ -471,9 +491,6 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
                continue
            }

            filerAddress := fc.filerAddresses[i]
            fc.filerAddressesMu.RUnlock()

            // Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
            err := func() error {
                // Create a fresh timeout context for each filer attempt
@@ -526,7 +543,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
            if err != nil {
                glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
                fc.recordFilerFailure(i)
                fc.recordFilerFailureWithHealth(health)
                lastErr = err
                i++
                if i >= n {
@@ -537,7 +554,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
            // Success - update the preferred filer index and reset health tracking
            atomic.StoreInt32(&fc.filerIndex, i)
            fc.recordFilerSuccess(i)
            fc.recordFilerSuccessWithHealth(health)
            glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
            return result, nil
        }
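
One property the WithHealth variants quietly rely on (my reading of the diff, not something it states): a *filerHealth captured under the read lock stays valid after RUnlock even if the filer list is later replaced, because the mutex only guards the slice while the pointed-to struct is kept alive by the pointer itself. The tiny sketch below, using a trimmed one-field stand-in for filerHealth, shows why the later atomic update is still safe; the trade-off is that the update lands on the old struct rather than on any replacement.

package main

import (
    "fmt"
    "sync/atomic"
)

// Trimmed stand-in for the real filerHealth struct.
type filerHealth struct{ failureCount int32 }

func main() {
    healths := []*filerHealth{{}}

    h := healths[0]              // pointer captured "under the lock"
    healths = []*filerHealth{{}} // filer list later swapped out

    atomic.AddInt32(&h.failureCount, 1) // still safe: h keeps the old struct alive
    fmt.Println(atomic.LoadInt32(&h.failureCount), len(healths)) // 1 1
}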
