Browse Source

refactor: Enhance existing NewS3ApiServer instead of creating separate IAM function

- Add IamConfig field to S3ApiServerOption for optional advanced IAM
- Integrate IAM loading logic directly into NewS3ApiServerWithStore
- Remove duplicate enhanced_s3_server.go file
- Simplify command line logic to use single server constructor
- Maintain backward compatibility - standard IAM works without config
- Advanced IAM activated automatically when -iam.config is provided

This follows better architectural principles by enhancing existing
functions rather than creating parallel implementations.
pull/7160/head
chrislu 1 month ago
parent
commit
4c324fca15
  1. 48
      weed/command/s3.go
  2. 169
      weed/s3api/enhanced_s3_server.go
  3. 83
      weed/s3api/s3api_server.go

48
weed/command/s3.go

@ -242,40 +242,28 @@ func (s3opt *S3Options) startS3Server() bool {
var s3ApiServer *s3api.S3ApiServer var s3ApiServer *s3api.S3ApiServer
var s3ApiServer_err error var s3ApiServer_err error
// Use enhanced S3 server with IAM if config is provided
// Create S3 server with optional advanced IAM integration
if *s3opt.iamConfig != "" { if *s3opt.iamConfig != "" {
glog.V(0).Infof("Starting S3 API Server with advanced IAM integration") glog.V(0).Infof("Starting S3 API Server with advanced IAM integration")
s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServerWithIAM(router, &s3api.S3ApiServerOption{
Filer: filerAddress,
Port: *s3opt.port,
Config: *s3opt.config,
DomainName: *s3opt.domainName,
AllowedOrigins: strings.Split(*s3opt.allowedOrigins, ","),
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
LocalFilerSocket: localFilerSocket,
DataCenter: *s3opt.dataCenter,
FilerGroup: filerGroup,
}, *s3opt.iamConfig)
} else { } else {
// Use standard S3 server
s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
Filer: filerAddress,
Port: *s3opt.port,
Config: *s3opt.config,
DomainName: *s3opt.domainName,
AllowedOrigins: strings.Split(*s3opt.allowedOrigins, ","),
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
LocalFilerSocket: localFilerSocket,
DataCenter: *s3opt.dataCenter,
FilerGroup: filerGroup,
})
glog.V(0).Infof("Starting S3 API Server with standard IAM")
} }
s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
Filer: filerAddress,
Port: *s3opt.port,
Config: *s3opt.config,
DomainName: *s3opt.domainName,
AllowedOrigins: strings.Split(*s3opt.allowedOrigins, ","),
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
LocalFilerSocket: localFilerSocket,
DataCenter: *s3opt.dataCenter,
FilerGroup: filerGroup,
IamConfig: *s3opt.iamConfig, // Advanced IAM config (optional)
})
if s3ApiServer_err != nil { if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
} }

169
weed/s3api/enhanced_s3_server.go

@ -1,169 +0,0 @@
package s3api
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/iam/integration"
"github.com/seaweedfs/seaweedfs/weed/iam/policy"
"github.com/seaweedfs/seaweedfs/weed/iam/sts"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
// NewS3ApiServerWithIAM creates an S3 API server with advanced IAM integration
func NewS3ApiServerWithIAM(router *mux.Router, option *S3ApiServerOption, iamConfig string) (s3ApiServer *S3ApiServer, err error) {
return NewS3ApiServerWithStoreAndIAM(router, option, "", iamConfig)
}
// NewS3ApiServerWithStoreAndIAM creates an S3 API server with store and advanced IAM integration
func NewS3ApiServerWithStoreAndIAM(router *mux.Router, option *S3ApiServerOption, explicitStore string, iamConfig string) (s3ApiServer *S3ApiServer, err error) {
startTsNs := time.Now().UnixNano()
v := util.GetViper()
signingKey := v.GetString("jwt.filer_signing.key")
v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
readSigningKey := v.GetString("jwt.filer_signing.read.key")
v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
v.SetDefault("cors.allowed_origins.values", "*")
if len(option.AllowedOrigins) == 0 {
allowedOrigins := v.GetString("cors.allowed_origins.values")
domains := strings.Split(allowedOrigins, ",")
option.AllowedOrigins = domains
}
var iam *IdentityAccessManagement
iam = NewIdentityAccessManagementWithStore(option, explicitStore)
s3ApiServer = &S3ApiServer{
option: option,
iam: iam,
randomClientId: util.RandomInt32(),
filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
cb: NewCircuitBreaker(option),
credentialManager: iam.credentialManager,
bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
}
// Initialize advanced IAM system if config is provided
if iamConfig != "" {
glog.V(0).Infof("Initializing advanced IAM system with config: %s", iamConfig)
// Create IAM manager from config file
iamManager, err := loadIAMManagerFromConfig(context.Background(), iamConfig)
if err != nil {
glog.Errorf("Failed to initialize advanced IAM system: %v", err)
return nil, fmt.Errorf("failed to initialize advanced IAM system: %v", err)
}
// Set the IAM integration on the server
s3ApiServer.SetIAMIntegration(iamManager)
glog.V(0).Infof("Advanced IAM system initialized successfully")
}
if option.Config != "" {
grace.OnReload(func() {
if err := s3ApiServer.iam.loadS3ApiConfigurationFromFile(option.Config); err != nil {
glog.Errorf("fail to load config file %s: %v", option.Config, err)
} else {
glog.V(0).Infof("Loaded %d identities from config file %s", len(s3ApiServer.iam.identities), option.Config)
}
})
}
s3ApiServer.bucketRegistry = NewBucketRegistry(s3ApiServer)
// Initialize HTTP client
if option.LocalFilerSocket == "" {
if s3ApiServer.client, err = util_http.NewGlobalHttpClient(); err != nil {
return nil, err
}
} else {
s3ApiServer.client = &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", option.LocalFilerSocket)
},
},
}
}
s3ApiServer.registerRouter(router)
grace.OnInterrupt(func() {
s3ApiServer.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
glog.V(0).Infof("shut down gracefully")
return nil
})
})
util.LoadSecurityConfiguration()
finishTsNs := time.Now().UnixNano()
glog.V(0).Infof("S3 API Server (with IAM) startup completed in %d ms", (finishTsNs-startTsNs)/1e6)
return s3ApiServer, nil
}
// ExtendedIAMConfig holds the extended configuration for IAM including roles and policies
type ExtendedIAMConfig struct {
STS *sts.STSConfig `json:"sts"`
Policy *policy.PolicyEngineConfig `json:"policy"`
Roles []integration.RoleDefinition `json:"roles,omitempty"`
Policies []PolicyDefinition `json:"policies,omitempty"`
}
// PolicyDefinition defines a policy with its document
type PolicyDefinition struct {
Name string `json:"name"`
Document json.RawMessage `json:"document"`
}
// loadIAMManagerFromConfig loads IAM manager from a JSON config file
func loadIAMManagerFromConfig(ctx context.Context, configPath string) (*integration.IAMManager, error) {
// Read config file
configData, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("failed to read config file: %w", err)
}
// Parse config
var extendedConfig ExtendedIAMConfig
if err := json.Unmarshal(configData, &extendedConfig); err != nil {
return nil, fmt.Errorf("failed to parse config: %w", err)
}
// Create basic IAM config from the extended config
config := &integration.IAMConfig{
STS: extendedConfig.STS,
Policy: extendedConfig.Policy,
}
// Create and initialize IAM manager
manager := integration.NewIAMManager()
if err := manager.Initialize(config); err != nil {
return nil, fmt.Errorf("failed to initialize IAM manager: %w", err)
}
// TODO: Set up providers, roles and policies from config
// For now, we'll use a minimal setup that works with our tests
glog.V(1).Infof("IAM manager initialized with %d roles and %d policies from config",
len(extendedConfig.Roles), len(extendedConfig.Policies))
return manager, nil
}

83
weed/s3api/s3api_server.go

@ -2,15 +2,20 @@ package s3api
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"os"
"strings" "strings"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/iam/integration"
"github.com/seaweedfs/seaweedfs/weed/iam/policy"
"github.com/seaweedfs/seaweedfs/weed/iam/sts"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
"github.com/seaweedfs/seaweedfs/weed/util/grace" "github.com/seaweedfs/seaweedfs/weed/util/grace"
@ -38,12 +43,14 @@ type S3ApiServerOption struct {
LocalFilerSocket string LocalFilerSocket string
DataCenter string DataCenter string
FilerGroup string FilerGroup string
IamConfig string // Advanced IAM configuration file path
} }
type S3ApiServer struct { type S3ApiServer struct {
s3_pb.UnimplementedSeaweedS3Server s3_pb.UnimplementedSeaweedS3Server
option *S3ApiServerOption option *S3ApiServerOption
iam *IdentityAccessManagement iam *IdentityAccessManagement
iamIntegration *S3IAMIntegration // Advanced IAM integration for JWT authentication
cb *CircuitBreaker cb *CircuitBreaker
randomClientId int32 randomClientId int32
filerGuard *security.Guard filerGuard *security.Guard
@ -91,6 +98,27 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
} }
// Initialize advanced IAM system if config is provided
if option.IamConfig != "" {
glog.V(0).Infof("Loading advanced IAM configuration from: %s", option.IamConfig)
iamManager, err := loadIAMManagerFromConfig(option.IamConfig)
if err != nil {
glog.Errorf("Failed to load IAM configuration: %v", err)
} else {
// Create S3 IAM integration with the loaded IAM manager
s3iam := NewS3IAMIntegration(iamManager)
// Set IAM integration in server
s3ApiServer.iamIntegration = s3iam
// Set the integration in the traditional IAM for compatibility
iam.SetIAMIntegration(s3iam)
glog.V(0).Infof("Advanced IAM system initialized successfully")
}
}
if option.Config != "" { if option.Config != "" {
grace.OnReload(func() { grace.OnReload(func() {
if err := s3ApiServer.iam.loadS3ApiConfigurationFromFile(option.Config); err != nil { if err := s3ApiServer.iam.loadS3ApiConfigurationFromFile(option.Config); err != nil {
@ -382,3 +410,58 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler) apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler)
} }
// loadIAMManagerFromConfig loads the advanced IAM manager from configuration file
func loadIAMManagerFromConfig(configPath string) (*integration.IAMManager, error) {
// Read configuration file
configData, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("failed to read config file: %w", err)
}
// Parse configuration structure
var configRoot struct {
STS *sts.STSConfig `json:"sts"`
Policy *policy.PolicyEngineConfig `json:"policy"`
Providers []map[string]interface{} `json:"providers"`
Roles []*integration.RoleDefinition `json:"roles"`
Policies []struct {
Name string `json:"name"`
Document *policy.PolicyDocument `json:"document"`
} `json:"policies"`
}
if err := json.Unmarshal(configData, &configRoot); err != nil {
return nil, fmt.Errorf("failed to parse config: %w", err)
}
// Create IAM configuration
iamConfig := &integration.IAMConfig{
STS: configRoot.STS,
Policy: configRoot.Policy,
}
// Initialize IAM manager
iamManager := integration.NewIAMManager()
if err := iamManager.Initialize(iamConfig); err != nil {
return nil, fmt.Errorf("failed to initialize IAM manager: %w", err)
}
// Load policies
for _, policyDef := range configRoot.Policies {
if err := iamManager.CreatePolicy(context.Background(), policyDef.Name, policyDef.Document); err != nil {
glog.Warningf("Failed to create policy %s: %v", policyDef.Name, err)
}
}
// Load roles
for _, roleDef := range configRoot.Roles {
if err := iamManager.CreateRole(context.Background(), roleDef.RoleName, roleDef); err != nil {
glog.Warningf("Failed to create role %s: %v", roleDef.RoleName, err)
}
}
glog.V(0).Infof("Loaded %d policies and %d roles from config", len(configRoot.Policies), len(configRoot.Roles))
return iamManager, nil
}
Loading…
Cancel
Save