Browse Source

Address PR review comments: implement full failover for IAM operations

Critical fixes based on code review feedback:

1. **IAM API Failover (Critical)**:
   - Replace pb.WithGrpcFilerClient with pb.WithOneOfGrpcFilerClients in:
     * GetS3ApiConfigurationFromFiler()
     * PutS3ApiConfigurationToFiler()
     * GetPolicies()
     * PutPolicies()
   - Now all IAM operations support automatic failover across multiple filers

2. **Validation Improvements**:
   - Add validation in NewIamApiServerWithStore() to require at least one filer
   - Add validation in NewS3ApiServerWithStore() to require at least one filer
   - Add warning log when no filers configured for credential store

3. **Error Logging**:
   - Circuit breaker now logs when config load fails instead of silently ignoring
   - Helps operators understand why circuit breaker limits aren't applied

4. **Code Quality**:
   - Use ToGrpcAddress() for filer address in credential store setup
   - More consistent with rest of codebase and future-proof

These changes ensure IAM operations have the same high availability guarantees
as S3 operations, completing the multi-filer failover implementation.
pull/7550/head
Chris Lu 4 days ago
parent
commit
839c450d21
  1. 12
      weed/iamapi/iamapi_server.go
  2. 7
      weed/s3api/auth_credentials.go
  3. 1
      weed/s3api/s3api_circuit_breaker.go
  4. 4
      weed/s3api/s3api_server.go

12
weed/iamapi/iamapi_server.go

@ -69,6 +69,10 @@ func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer
} }
func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, explicitStore string) (iamApiServer *IamApiServer, err error) { func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, explicitStore string) (iamApiServer *IamApiServer, err error) {
if len(option.Filers) == 0 {
return nil, fmt.Errorf("at least one filer address is required")
}
masterClient := wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)) masterClient := wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters))
// Create a cancellable context for the master client connection // Create a cancellable context for the master client connection
@ -158,7 +162,7 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToCredentialManager(s3cfg *i
func (iama *IamS3ApiConfigure) GetS3ApiConfigurationFromFiler(s3cfg *iam_pb.S3ApiConfiguration) (err error) { func (iama *IamS3ApiConfigure) GetS3ApiConfigurationFromFiler(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
var buf bytes.Buffer var buf bytes.Buffer
err = pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithOneOfGrpcFilerClients(false, iama.option.Filers, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err return err
} }
@ -180,7 +184,7 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToFiler(s3cfg *iam_pb.S3ApiC
if err := filer.ProtoToText(&buf, s3cfg); err != nil { if err := filer.ProtoToText(&buf, s3cfg); err != nil {
return fmt.Errorf("ProtoToText: %s", err) return fmt.Errorf("ProtoToText: %s", err)
} }
return pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithOneOfGrpcFilerClients(false, iama.option.Filers, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = util.Retry("saveIamIdentity", func() error { err = util.Retry("saveIamIdentity", func() error {
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes()) return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes())
}) })
@ -193,7 +197,7 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToFiler(s3cfg *iam_pb.S3ApiC
func (iama *IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { func (iama *IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) {
var buf bytes.Buffer var buf bytes.Buffer
err = pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithOneOfGrpcFilerClients(false, iama.option.Filers, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil { if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil {
return err return err
} }
@ -217,7 +221,7 @@ func (iama *IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) {
if b, err = json.Marshal(policies); err != nil { if b, err = json.Marshal(policies); err != nil {
return err return err
} }
return pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithOneOfGrpcFilerClients(false, iama.option.Filers, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err := filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil { if err := filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil {
return err return err
} }

7
weed/s3api/auth_credentials.go

@ -142,11 +142,12 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto
SetFilerClient(string, grpc.DialOption) SetFilerClient(string, grpc.DialOption)
}); ok { }); ok {
// Use the first filer address for credential store // Use the first filer address for credential store
filerAddr := ""
if len(option.Filers) > 0 { if len(option.Filers) > 0 {
filerAddr = string(option.Filers[0])
}
filerAddr := option.Filers[0].ToGrpcAddress()
filerClientSetter.SetFilerClient(filerAddr, option.GrpcDialOption) filerClientSetter.SetFilerClient(filerAddr, option.GrpcDialOption)
} else {
glog.Warningf("No filer addresses configured for credential store")
}
} }
} }

1
weed/s3api/s3api_circuit_breaker.go

@ -42,6 +42,7 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
}) })
if err != nil { if err != nil {
glog.Warningf("S3 circuit breaker disabled; failed to load config from any filer: %v", err)
} }
return cb return cb

4
weed/s3api/s3api_server.go

@ -69,6 +69,10 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
} }
func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) { func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) {
if len(option.Filers) == 0 {
return nil, fmt.Errorf("at least one filer address is required")
}
startTsNs := time.Now().UnixNano() startTsNs := time.Now().UnixNano()
v := util.GetViper() v := util.GetViper()

Loading…
Cancel
Save