Browse Source

Make credential store use current active filer for HA

Update FilerEtcStore to use a function that returns the current active
filer instead of a fixed address, enabling high availability.

Changes:
- Add SetFilerAddressFunc() method to FilerEtcStore
- Store uses filerAddressFunc instead of fixed filerGrpcAddress
- withFilerClient() calls the function to get current active filer
- Keep SetFilerClient() for backward compatibility (marked deprecated)
- Update S3ApiServer to pass FilerClient.GetCurrentFiler to store

Benefits:
- Credential store now uses currently active/healthy filer
- Automatic failover when filer becomes unavailable
- True HA for credential operations
- Backward compatible with old SetFilerClient interface

This addresses the credential store limitation - no longer pinned to
first filer, uses FilerClient's tracked current active filer.
pull/7550/head
Chris Lu 4 days ago
parent
commit
c24629cd5e
  1. 4
      weed/credential/filer_etc/filer_etc_identity.go
  2. 6
      weed/credential/filer_etc/filer_etc_policy.go
  3. 33
      weed/credential/filer_etc/filer_etc_store.go
  4. 29
      weed/s3api/auth_credentials.go
  5. 11
      weed/s3api/s3api_server.go

4
weed/credential/filer_etc/filer_etc_identity.go

@ -15,8 +15,8 @@ import (
func (store *FilerEtcStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { func (store *FilerEtcStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
s3cfg := &iam_pb.S3ApiConfiguration{} s3cfg := &iam_pb.S3ApiConfiguration{}
glog.V(1).Infof("Loading IAM configuration from %s/%s (filer: %s)",
filer.IamConfigDirectory, filer.IamIdentityFile, store.filerGrpcAddress)
glog.V(1).Infof("Loading IAM configuration from %s/%s (using current active filer)",
filer.IamConfigDirectory, filer.IamIdentityFile)
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// Use ReadInsideFiler instead of ReadEntry since identity.json is small // Use ReadInsideFiler instead of ReadEntry since identity.json is small

6
weed/credential/filer_etc/filer_etc_policy.go

@ -21,14 +21,14 @@ func (store *FilerEtcStore) GetPolicies(ctx context.Context) (map[string]policy_
} }
// Check if filer client is configured // Check if filer client is configured
if store.filerGrpcAddress == "" {
if store.filerAddressFunc == nil {
glog.V(1).Infof("Filer client not configured for policy retrieval, returning empty policies") glog.V(1).Infof("Filer client not configured for policy retrieval, returning empty policies")
// Return empty policies if filer client is not configured // Return empty policies if filer client is not configured
return policiesCollection.Policies, nil return policiesCollection.Policies, nil
} }
glog.V(2).Infof("Loading IAM policies from %s/%s (filer: %s)",
filer.IamConfigDirectory, filer.IamPoliciesFile, store.filerGrpcAddress)
glog.V(2).Infof("Loading IAM policies from %s/%s (using current active filer)",
filer.IamConfigDirectory, filer.IamPoliciesFile)
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// Use ReadInsideFiler instead of ReadEntry since policies.json is small // Use ReadInsideFiler instead of ReadEntry since policies.json is small

33
weed/credential/filer_etc/filer_etc_store.go

@ -16,7 +16,7 @@ func init() {
// FilerEtcStore implements CredentialStore using SeaweedFS filer for storage // FilerEtcStore implements CredentialStore using SeaweedFS filer for storage
type FilerEtcStore struct { type FilerEtcStore struct {
filerGrpcAddress string
filerAddressFunc func() pb.ServerAddress // Function to get current active filer
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
} }
@ -27,27 +27,48 @@ func (store *FilerEtcStore) GetName() credential.CredentialStoreTypeName {
func (store *FilerEtcStore) Initialize(configuration util.Configuration, prefix string) error { func (store *FilerEtcStore) Initialize(configuration util.Configuration, prefix string) error {
// Handle nil configuration gracefully // Handle nil configuration gracefully
if configuration != nil { if configuration != nil {
store.filerGrpcAddress = configuration.GetString(prefix + "filer")
filerAddr := configuration.GetString(prefix + "filer")
if filerAddr != "" {
// Static configuration - use fixed address
store.filerAddressFunc = func() pb.ServerAddress {
return pb.ServerAddress(filerAddr)
}
}
// TODO: Initialize grpcDialOption based on configuration // TODO: Initialize grpcDialOption based on configuration
} }
// Note: filerGrpcAddress can be set later via SetFilerClient method
// Note: filerAddressFunc can be set later via SetFilerClient method
return nil return nil
} }
// SetFilerClient sets the filer client details for the file store // SetFilerClient sets the filer client details for the file store
// Deprecated: Use SetFilerAddressFunc for better HA support
func (store *FilerEtcStore) SetFilerClient(filerAddress string, grpcDialOption grpc.DialOption) { func (store *FilerEtcStore) SetFilerClient(filerAddress string, grpcDialOption grpc.DialOption) {
store.filerGrpcAddress = filerAddress
store.filerAddressFunc = func() pb.ServerAddress {
return pb.ServerAddress(filerAddress)
}
store.grpcDialOption = grpcDialOption
}
// SetFilerAddressFunc sets a function that returns the current active filer address
// This enables high availability by using the currently active filer
func (store *FilerEtcStore) SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) {
store.filerAddressFunc = getFiler
store.grpcDialOption = grpcDialOption store.grpcDialOption = grpcDialOption
} }
// withFilerClient executes a function with a filer client // withFilerClient executes a function with a filer client
func (store *FilerEtcStore) withFilerClient(fn func(client filer_pb.SeaweedFilerClient) error) error { func (store *FilerEtcStore) withFilerClient(fn func(client filer_pb.SeaweedFilerClient) error) error {
if store.filerGrpcAddress == "" {
if store.filerAddressFunc == nil {
return fmt.Errorf("filer address not configured") return fmt.Errorf("filer address not configured")
} }
filerAddress := store.filerAddressFunc()
if filerAddress == "" {
return fmt.Errorf("filer address is empty")
}
// Use the pb.WithGrpcFilerClient helper similar to existing code // Use the pb.WithGrpcFilerClient helper similar to existing code
return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(store.filerGrpcAddress), store.grpcDialOption, fn)
return pb.WithGrpcFilerClient(false, 0, filerAddress, store.grpcDialOption, fn)
} }
func (store *FilerEtcStore) Shutdown() { func (store *FilerEtcStore) Shutdown() {

29
weed/s3api/auth_credentials.go

@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/kms" "github.com/seaweedfs/seaweedfs/weed/kms"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
@ -137,17 +138,35 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto
} }
// For stores that need filer client details, set them // For stores that need filer client details, set them
// Note: SetFilerClient interface currently accepts only a single filer address
// TODO: Update credential store interfaces to support multiple filers for HA
// For now, using first filer - this is a known limitation for filer-backed stores
// Use SetFilerAddressFunc to provide current active filer for HA
if store := credentialManager.GetStore(); store != nil { if store := credentialManager.GetStore(); store != nil {
if filerClientSetter, ok := store.(interface {
// Check for new HA-aware interface first
if filerFuncSetter, ok := store.(interface {
SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption)
}); ok {
// Use FilerClient's GetCurrentFiler for HA
// Note: FilerClient is created later, so we need to capture it
// For now, use first filer - this will be updated when FilerClient is available
if len(option.Filers) > 0 {
// Create a closure that will use the first filer initially
// In a full implementation, this would get the FilerClient's current filer
getFiler := func() pb.ServerAddress {
if len(option.Filers) > 0 {
return option.Filers[0]
}
return ""
}
filerFuncSetter.SetFilerAddressFunc(getFiler, option.GrpcDialOption)
glog.V(1).Infof("Credential store configured with filer function (HA-aware)")
}
} else if filerClientSetter, ok := store.(interface {
SetFilerClient(string, grpc.DialOption) SetFilerClient(string, grpc.DialOption)
}); ok { }); ok {
// Fallback to old interface for backward compatibility
if len(option.Filers) > 0 { if len(option.Filers) > 0 {
filerAddr := option.Filers[0].ToGrpcAddress() filerAddr := option.Filers[0].ToGrpcAddress()
filerClientSetter.SetFilerClient(filerAddr, option.GrpcDialOption) filerClientSetter.SetFilerClient(filerAddr, option.GrpcDialOption)
glog.V(1).Infof("Credential store configured with first filer: %s (HA limitation)", filerAddr)
glog.V(1).Infof("Credential store configured with first filer: %s (legacy interface)", filerAddr)
} else { } else {
glog.Warningf("No filer addresses configured for credential store") glog.Warningf("No filer addresses configured for credential store")
} }

11
weed/s3api/s3api_server.go

@ -103,6 +103,17 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
filerClient := wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter) filerClient := wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter)
glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) for volume location caching", len(option.Filers)) glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) for volume location caching", len(option.Filers))
// Update credential store to use FilerClient's current filer for HA
if store := iam.credentialManager.GetStore(); store != nil {
if filerFuncSetter, ok := store.(interface {
SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption)
}); ok {
// Use FilerClient's GetCurrentFiler for true HA
filerFuncSetter.SetFilerAddressFunc(filerClient.GetCurrentFiler, option.GrpcDialOption)
glog.V(1).Infof("Updated credential store to use FilerClient's current active filer (HA-aware)")
}
}
s3ApiServer = &S3ApiServer{ s3ApiServer = &S3ApiServer{
option: option, option: option,
iam: iam, iam: iam,

Loading…
Cancel
Save