Browse Source

Add thread safety to FilerEtcStore and clarify credential store comments

Address review suggestions for better thread safety and code clarity:

1. **Thread Safety**: Add RWMutex to FilerEtcStore
   - Protects filerAddressFunc and grpcDialOption from concurrent access
   - Initialize() uses write lock when setting function
   - SetFilerAddressFunc() uses write lock
   - withFilerClient() uses read lock to get function and dial option
   - GetPolicies() uses read lock to check if configured

2. **Improved Error Messages**:
   - Prefix errors with "filer_etc:" for easier debugging
   - "filer address not configured" → "filer_etc: filer address function not configured"
   - "filer address is empty" → "filer_etc: filer address is empty"

3. **Clarified Comments**:
   - auth_credentials.go: Clarify that initial setup is temporary
   - Document that it's updated in s3api_server.go after FilerClient creation
   - Remove ambiguity about when FilerClient.GetCurrentFiler is used

Benefits:
- Safe for concurrent credential operations
- Clear error messages for debugging
- Explicit documentation of initialization order
pull/7550/head
Chris Lu 3 days ago
parent
commit
8517cebb31
  1. 8
      weed/credential/filer_etc/filer_etc_policy.go
  2. 17
      weed/credential/filer_etc/filer_etc_store.go
  3. 13
      weed/s3api/auth_credentials.go

8
weed/credential/filer_etc/filer_etc_policy.go

@ -20,8 +20,12 @@ func (store *FilerEtcStore) GetPolicies(ctx context.Context) (map[string]policy_
Policies: make(map[string]policy_engine.PolicyDocument), Policies: make(map[string]policy_engine.PolicyDocument),
} }
// Check if filer client is configured
if store.filerAddressFunc == nil {
// Check if filer client is configured (with mutex protection)
store.mu.RLock()
configured := store.filerAddressFunc != nil
store.mu.RUnlock()
if !configured {
glog.V(1).Infof("Filer client not configured for policy retrieval, returning empty policies") glog.V(1).Infof("Filer client not configured for policy retrieval, returning empty policies")
// Return empty policies if filer client is not configured // Return empty policies if filer client is not configured
return policiesCollection.Policies, nil return policiesCollection.Policies, nil

17
weed/credential/filer_etc/filer_etc_store.go

@ -2,6 +2,7 @@ package filer_etc
import ( import (
"fmt" "fmt"
"sync"
"github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
@ -18,6 +19,7 @@ func init() {
type FilerEtcStore struct { type FilerEtcStore struct {
filerAddressFunc func() pb.ServerAddress // Function to get current active filer filerAddressFunc func() pb.ServerAddress // Function to get current active filer
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
mu sync.RWMutex // Protects filerAddressFunc and grpcDialOption
} }
func (store *FilerEtcStore) GetName() credential.CredentialStoreTypeName { func (store *FilerEtcStore) GetName() credential.CredentialStoreTypeName {
@ -30,9 +32,11 @@ func (store *FilerEtcStore) Initialize(configuration util.Configuration, prefix
filerAddr := configuration.GetString(prefix + "filer") filerAddr := configuration.GetString(prefix + "filer")
if filerAddr != "" { if filerAddr != "" {
// Static configuration - use fixed address // Static configuration - use fixed address
store.mu.Lock()
store.filerAddressFunc = func() pb.ServerAddress { store.filerAddressFunc = func() pb.ServerAddress {
return pb.ServerAddress(filerAddr) return pb.ServerAddress(filerAddr)
} }
store.mu.Unlock()
} }
// TODO: Initialize grpcDialOption based on configuration // TODO: Initialize grpcDialOption based on configuration
} }
@ -43,23 +47,30 @@ func (store *FilerEtcStore) Initialize(configuration util.Configuration, prefix
// SetFilerAddressFunc sets a function that returns the current active filer address // SetFilerAddressFunc sets a function that returns the current active filer address
// This enables high availability by using the currently active filer // This enables high availability by using the currently active filer
func (store *FilerEtcStore) SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) { func (store *FilerEtcStore) SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) {
store.mu.Lock()
defer store.mu.Unlock()
store.filerAddressFunc = getFiler store.filerAddressFunc = getFiler
store.grpcDialOption = grpcDialOption store.grpcDialOption = grpcDialOption
} }
// withFilerClient executes a function with a filer client // withFilerClient executes a function with a filer client
func (store *FilerEtcStore) withFilerClient(fn func(client filer_pb.SeaweedFilerClient) error) error { func (store *FilerEtcStore) withFilerClient(fn func(client filer_pb.SeaweedFilerClient) error) error {
store.mu.RLock()
if store.filerAddressFunc == nil { if store.filerAddressFunc == nil {
return fmt.Errorf("filer address not configured")
store.mu.RUnlock()
return fmt.Errorf("filer_etc: filer address function not configured")
} }
filerAddress := store.filerAddressFunc() filerAddress := store.filerAddressFunc()
dialOption := store.grpcDialOption
store.mu.RUnlock()
if filerAddress == "" { if filerAddress == "" {
return fmt.Errorf("filer address is empty")
return fmt.Errorf("filer_etc: filer address is empty")
} }
// Use the pb.WithGrpcFilerClient helper similar to existing code // Use the pb.WithGrpcFilerClient helper similar to existing code
return pb.WithGrpcFilerClient(false, 0, filerAddress, store.grpcDialOption, fn)
return pb.WithGrpcFilerClient(false, 0, filerAddress, dialOption, fn)
} }
func (store *FilerEtcStore) Shutdown() { func (store *FilerEtcStore) Shutdown() {

13
weed/s3api/auth_credentials.go

@ -137,18 +137,15 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto
glog.Fatalf("failed to initialize credential manager: %v", err) glog.Fatalf("failed to initialize credential manager: %v", err)
} }
// For stores that need filer client details, set them
// Use SetFilerAddressFunc to provide current active filer for HA
// For stores that need filer client details, set them temporarily
// This will be updated to use FilerClient's GetCurrentFiler after FilerClient is created
if store := credentialManager.GetStore(); store != nil { if store := credentialManager.GetStore(); store != nil {
if filerFuncSetter, ok := store.(interface { if filerFuncSetter, ok := store.(interface {
SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption) SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption)
}); ok { }); ok {
// Use FilerClient's GetCurrentFiler for HA
// Note: FilerClient is created later, so we need to capture it
// For now, use first filer - this will be updated when FilerClient is available
// Temporary setup: use first filer until FilerClient is available
// See s3api_server.go where this is updated to FilerClient.GetCurrentFiler
if len(option.Filers) > 0 { if len(option.Filers) > 0 {
// Create a closure that will use the first filer initially
// This will be updated later to use FilerClient's current filer
getFiler := func() pb.ServerAddress { getFiler := func() pb.ServerAddress {
if len(option.Filers) > 0 { if len(option.Filers) > 0 {
return option.Filers[0] return option.Filers[0]
@ -156,7 +153,7 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto
return "" return ""
} }
filerFuncSetter.SetFilerAddressFunc(getFiler, option.GrpcDialOption) filerFuncSetter.SetFilerAddressFunc(getFiler, option.GrpcDialOption)
glog.V(1).Infof("Credential store configured with filer function (will be updated to use FilerClient)")
glog.V(1).Infof("Credential store configured with temporary filer function (will be updated after FilerClient creation)")
} }
} }
} }

Loading…
Cancel
Save