From 4c324fca15c31efd4e0fe941019d2dac23c94e03 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 24 Aug 2025 12:16:19 -0700 Subject: [PATCH] refactor: Enhance existing NewS3ApiServer instead of creating separate IAM function - Add IamConfig field to S3ApiServerOption for optional advanced IAM - Integrate IAM loading logic directly into NewS3ApiServerWithStore - Remove duplicate enhanced_s3_server.go file - Simplify command line logic to use single server constructor - Maintain backward compatibility - standard IAM works without config - Advanced IAM activated automatically when -iam.config is provided This follows better architectural principles by enhancing existing functions rather than creating parallel implementations. --- weed/command/s3.go | 48 ++++----- weed/s3api/enhanced_s3_server.go | 169 ------------------------------- weed/s3api/s3api_server.go | 83 +++++++++++++++ 3 files changed, 101 insertions(+), 199 deletions(-) delete mode 100644 weed/s3api/enhanced_s3_server.go diff --git a/weed/command/s3.go b/weed/command/s3.go index 9e487bab0..92969d076 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -242,40 +242,28 @@ func (s3opt *S3Options) startS3Server() bool { var s3ApiServer *s3api.S3ApiServer var s3ApiServer_err error - // Use enhanced S3 server with IAM if config is provided + // Create S3 server with optional advanced IAM integration if *s3opt.iamConfig != "" { glog.V(0).Infof("Starting S3 API Server with advanced IAM integration") - s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServerWithIAM(router, &s3api.S3ApiServerOption{ - Filer: filerAddress, - Port: *s3opt.port, - Config: *s3opt.config, - DomainName: *s3opt.domainName, - AllowedOrigins: strings.Split(*s3opt.allowedOrigins, ","), - BucketsPath: filerBucketsPath, - GrpcDialOption: grpcDialOption, - AllowEmptyFolder: *s3opt.allowEmptyFolder, - AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty, - LocalFilerSocket: localFilerSocket, - DataCenter: *s3opt.dataCenter, - FilerGroup: filerGroup, - }, *s3opt.iamConfig) } else { - // Use standard S3 server - s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ - Filer: filerAddress, - Port: *s3opt.port, - Config: *s3opt.config, - DomainName: *s3opt.domainName, - AllowedOrigins: strings.Split(*s3opt.allowedOrigins, ","), - BucketsPath: filerBucketsPath, - GrpcDialOption: grpcDialOption, - AllowEmptyFolder: *s3opt.allowEmptyFolder, - AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty, - LocalFilerSocket: localFilerSocket, - DataCenter: *s3opt.dataCenter, - FilerGroup: filerGroup, - }) + glog.V(0).Infof("Starting S3 API Server with standard IAM") } + + s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ + Filer: filerAddress, + Port: *s3opt.port, + Config: *s3opt.config, + DomainName: *s3opt.domainName, + AllowedOrigins: strings.Split(*s3opt.allowedOrigins, ","), + BucketsPath: filerBucketsPath, + GrpcDialOption: grpcDialOption, + AllowEmptyFolder: *s3opt.allowEmptyFolder, + AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty, + LocalFilerSocket: localFilerSocket, + DataCenter: *s3opt.dataCenter, + FilerGroup: filerGroup, + IamConfig: *s3opt.iamConfig, // Advanced IAM config (optional) + }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) } diff --git a/weed/s3api/enhanced_s3_server.go b/weed/s3api/enhanced_s3_server.go deleted file mode 100644 index 5ef0c03ec..000000000 --- a/weed/s3api/enhanced_s3_server.go +++ /dev/null @@ -1,169 +0,0 @@ -package s3api - -import ( - "context" - "encoding/json" - "fmt" - "net" - "net/http" - "os" - "strings" - "time" - - "github.com/gorilla/mux" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/iam/integration" - "github.com/seaweedfs/seaweedfs/weed/iam/policy" - "github.com/seaweedfs/seaweedfs/weed/iam/sts" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/seaweedfs/weed/security" - "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/util/grace" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" -) - -// NewS3ApiServerWithIAM creates an S3 API server with advanced IAM integration -func NewS3ApiServerWithIAM(router *mux.Router, option *S3ApiServerOption, iamConfig string) (s3ApiServer *S3ApiServer, err error) { - return NewS3ApiServerWithStoreAndIAM(router, option, "", iamConfig) -} - -// NewS3ApiServerWithStoreAndIAM creates an S3 API server with store and advanced IAM integration -func NewS3ApiServerWithStoreAndIAM(router *mux.Router, option *S3ApiServerOption, explicitStore string, iamConfig string) (s3ApiServer *S3ApiServer, err error) { - startTsNs := time.Now().UnixNano() - - v := util.GetViper() - signingKey := v.GetString("jwt.filer_signing.key") - v.SetDefault("jwt.filer_signing.expires_after_seconds", 10) - expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds") - - readSigningKey := v.GetString("jwt.filer_signing.read.key") - v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60) - readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds") - - v.SetDefault("cors.allowed_origins.values", "*") - - if len(option.AllowedOrigins) == 0 { - allowedOrigins := v.GetString("cors.allowed_origins.values") - domains := strings.Split(allowedOrigins, ",") - option.AllowedOrigins = domains - } - - var iam *IdentityAccessManagement - - iam = NewIdentityAccessManagementWithStore(option, explicitStore) - - s3ApiServer = &S3ApiServer{ - option: option, - iam: iam, - randomClientId: util.RandomInt32(), - filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), - cb: NewCircuitBreaker(option), - credentialManager: iam.credentialManager, - bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven - } - - // Initialize advanced IAM system if config is provided - if iamConfig != "" { - glog.V(0).Infof("Initializing advanced IAM system with config: %s", iamConfig) - - // Create IAM manager from config file - iamManager, err := loadIAMManagerFromConfig(context.Background(), iamConfig) - if err != nil { - glog.Errorf("Failed to initialize advanced IAM system: %v", err) - return nil, fmt.Errorf("failed to initialize advanced IAM system: %v", err) - } - - // Set the IAM integration on the server - s3ApiServer.SetIAMIntegration(iamManager) - glog.V(0).Infof("Advanced IAM system initialized successfully") - } - - if option.Config != "" { - grace.OnReload(func() { - if err := s3ApiServer.iam.loadS3ApiConfigurationFromFile(option.Config); err != nil { - glog.Errorf("fail to load config file %s: %v", option.Config, err) - } else { - glog.V(0).Infof("Loaded %d identities from config file %s", len(s3ApiServer.iam.identities), option.Config) - } - }) - } - s3ApiServer.bucketRegistry = NewBucketRegistry(s3ApiServer) - - // Initialize HTTP client - if option.LocalFilerSocket == "" { - if s3ApiServer.client, err = util_http.NewGlobalHttpClient(); err != nil { - return nil, err - } - } else { - s3ApiServer.client = &http.Client{ - Transport: &http.Transport{ - DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { - return net.Dial("unix", option.LocalFilerSocket) - }, - }, - } - } - - s3ApiServer.registerRouter(router) - - grace.OnInterrupt(func() { - s3ApiServer.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - glog.V(0).Infof("shut down gracefully") - return nil - }) - }) - - util.LoadSecurityConfiguration() - - finishTsNs := time.Now().UnixNano() - glog.V(0).Infof("S3 API Server (with IAM) startup completed in %d ms", (finishTsNs-startTsNs)/1e6) - return s3ApiServer, nil -} - -// ExtendedIAMConfig holds the extended configuration for IAM including roles and policies -type ExtendedIAMConfig struct { - STS *sts.STSConfig `json:"sts"` - Policy *policy.PolicyEngineConfig `json:"policy"` - Roles []integration.RoleDefinition `json:"roles,omitempty"` - Policies []PolicyDefinition `json:"policies,omitempty"` -} - -// PolicyDefinition defines a policy with its document -type PolicyDefinition struct { - Name string `json:"name"` - Document json.RawMessage `json:"document"` -} - -// loadIAMManagerFromConfig loads IAM manager from a JSON config file -func loadIAMManagerFromConfig(ctx context.Context, configPath string) (*integration.IAMManager, error) { - // Read config file - configData, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read config file: %w", err) - } - - // Parse config - var extendedConfig ExtendedIAMConfig - if err := json.Unmarshal(configData, &extendedConfig); err != nil { - return nil, fmt.Errorf("failed to parse config: %w", err) - } - - // Create basic IAM config from the extended config - config := &integration.IAMConfig{ - STS: extendedConfig.STS, - Policy: extendedConfig.Policy, - } - - // Create and initialize IAM manager - manager := integration.NewIAMManager() - if err := manager.Initialize(config); err != nil { - return nil, fmt.Errorf("failed to initialize IAM manager: %w", err) - } - - // TODO: Set up providers, roles and policies from config - // For now, we'll use a minimal setup that works with our tests - glog.V(1).Infof("IAM manager initialized with %d roles and %d policies from config", - len(extendedConfig.Roles), len(extendedConfig.Policies)) - - return manager, nil -} diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 23a8e49a8..f7bbb112d 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -2,15 +2,20 @@ package s3api import ( "context" + "encoding/json" "fmt" "net" "net/http" + "os" "strings" "time" "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/iam/integration" + "github.com/seaweedfs/seaweedfs/weed/iam/policy" + "github.com/seaweedfs/seaweedfs/weed/iam/sts" "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" "github.com/seaweedfs/seaweedfs/weed/util/grace" @@ -38,12 +43,14 @@ type S3ApiServerOption struct { LocalFilerSocket string DataCenter string FilerGroup string + IamConfig string // Advanced IAM configuration file path } type S3ApiServer struct { s3_pb.UnimplementedSeaweedS3Server option *S3ApiServerOption iam *IdentityAccessManagement + iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication cb *CircuitBreaker randomClientId int32 filerGuard *security.Guard @@ -91,6 +98,27 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven } + // Initialize advanced IAM system if config is provided + if option.IamConfig != "" { + glog.V(0).Infof("Loading advanced IAM configuration from: %s", option.IamConfig) + + iamManager, err := loadIAMManagerFromConfig(option.IamConfig) + if err != nil { + glog.Errorf("Failed to load IAM configuration: %v", err) + } else { + // Create S3 IAM integration with the loaded IAM manager + s3iam := NewS3IAMIntegration(iamManager) + + // Set IAM integration in server + s3ApiServer.iamIntegration = s3iam + + // Set the integration in the traditional IAM for compatibility + iam.SetIAMIntegration(s3iam) + + glog.V(0).Infof("Advanced IAM system initialized successfully") + } + } + if option.Config != "" { grace.OnReload(func() { if err := s3ApiServer.iam.loadS3ApiConfigurationFromFile(option.Config); err != nil { @@ -382,3 +410,58 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler) } + +// loadIAMManagerFromConfig loads the advanced IAM manager from configuration file +func loadIAMManagerFromConfig(configPath string) (*integration.IAMManager, error) { + // Read configuration file + configData, err := os.ReadFile(configPath) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + // Parse configuration structure + var configRoot struct { + STS *sts.STSConfig `json:"sts"` + Policy *policy.PolicyEngineConfig `json:"policy"` + Providers []map[string]interface{} `json:"providers"` + Roles []*integration.RoleDefinition `json:"roles"` + Policies []struct { + Name string `json:"name"` + Document *policy.PolicyDocument `json:"document"` + } `json:"policies"` + } + + if err := json.Unmarshal(configData, &configRoot); err != nil { + return nil, fmt.Errorf("failed to parse config: %w", err) + } + + // Create IAM configuration + iamConfig := &integration.IAMConfig{ + STS: configRoot.STS, + Policy: configRoot.Policy, + } + + // Initialize IAM manager + iamManager := integration.NewIAMManager() + if err := iamManager.Initialize(iamConfig); err != nil { + return nil, fmt.Errorf("failed to initialize IAM manager: %w", err) + } + + // Load policies + for _, policyDef := range configRoot.Policies { + if err := iamManager.CreatePolicy(context.Background(), policyDef.Name, policyDef.Document); err != nil { + glog.Warningf("Failed to create policy %s: %v", policyDef.Name, err) + } + } + + // Load roles + for _, roleDef := range configRoot.Roles { + if err := iamManager.CreateRole(context.Background(), roleDef.RoleName, roleDef); err != nil { + glog.Warningf("Failed to create role %s: %v", roleDef.RoleName, err) + } + } + + glog.V(0).Infof("Loaded %d policies and %d roles from config", len(configRoot.Policies), len(configRoot.Roles)) + + return iamManager, nil +}