Browse Source

Support multiple filers for S3 and IAM servers with automatic failover

This change adds support for multiple filer addresses in the 'weed s3' and 'weed iam' commands, enabling high availability through automatic failover.

Key changes:
- Updated S3ApiServerOption.Filer to Filers ([]pb.ServerAddress)
- Updated IamServerOption.Filer to Filers ([]pb.ServerAddress)
- Modified -filer flag to accept comma-separated addresses
- Added getFilerAddress() helper methods for backward compatibility
- Updated all filer client calls to support multiple addresses
- Uses pb.WithOneOfGrpcFilerClients for automatic failover

Usage:
  weed s3 -filer=localhost:8888,localhost:8889
  weed iam -filer=localhost:8888,localhost:8889

The underlying FilerClient already supported multiple filers with health
tracking and automatic failover - this change exposes that capability
through the command-line interface.
pull/7550/head
Chris Lu 4 days ago
parent
commit
e537852ec6
  1. 20
      weed/command/iam.go
  2. 23
      weed/command/s3.go
  3. 21
      weed/iamapi/iamapi_server.go
  4. 7
      weed/s3api/auth_credentials.go
  5. 8
      weed/s3api/filer_multipart.go
  6. 6
      weed/s3api/s3api_bucket_handlers.go
  7. 3
      weed/s3api/s3api_circuit_breaker.go
  8. 2
      weed/s3api/s3api_handlers.go
  9. 2
      weed/s3api/s3api_object_handlers.go
  10. 2
      weed/s3api/s3api_object_handlers_multipart.go
  11. 2
      weed/s3api/s3api_object_handlers_postpolicy.go
  12. 31
      weed/s3api/s3api_server.go

20
weed/command/iam.go

@ -35,16 +35,18 @@ type IamOptions struct {
func init() { func init() {
cmdIam.Run = runIam // break init cycle cmdIam.Run = runIam // break init cycle
iamStandaloneOptions.filer = cmdIam.Flag.String("filer", "localhost:8888", "filer server address")
iamStandaloneOptions.filer = cmdIam.Flag.String("filer", "localhost:8888", "comma-separated filer server addresses for high availability")
iamStandaloneOptions.masters = cmdIam.Flag.String("master", "localhost:9333", "comma-separated master servers") iamStandaloneOptions.masters = cmdIam.Flag.String("master", "localhost:9333", "comma-separated master servers")
iamStandaloneOptions.ip = cmdIam.Flag.String("ip", util.DetectedHostAddress(), "iam server http listen ip address") iamStandaloneOptions.ip = cmdIam.Flag.String("ip", util.DetectedHostAddress(), "iam server http listen ip address")
iamStandaloneOptions.port = cmdIam.Flag.Int("port", 8111, "iam server http listen port") iamStandaloneOptions.port = cmdIam.Flag.Int("port", 8111, "iam server http listen port")
} }
var cmdIam = &Command{ var cmdIam = &Command{
UsageLine: "iam [-port=8111] [-filer=<ip:port>] [-master=<ip:port>,<ip:port>]",
UsageLine: "iam [-port=8111] [-filer=<ip:port>[,<ip:port>]...] [-master=<ip:port>,<ip:port>]",
Short: "start a iam API compatible server", Short: "start a iam API compatible server",
Long: "start a iam API compatible server.",
Long: `start a iam API compatible server.
Multiple filer addresses can be specified for high availability, separated by commas.`,
} }
func runIam(cmd *Command, args []string) bool { func runIam(cmd *Command, args []string) bool {
@ -52,24 +54,24 @@ func runIam(cmd *Command, args []string) bool {
} }
func (iamopt *IamOptions) startIamServer() bool { func (iamopt *IamOptions) startIamServer() bool {
filerAddress := pb.ServerAddress(*iamopt.filer)
filerAddresses := pb.ServerAddresses(*iamopt.filer).ToAddresses()
util.LoadSecurityConfiguration() util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for { for {
err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil { if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
return fmt.Errorf("get filer configuration: %v", err)
} }
glog.V(0).Infof("IAM read filer configuration: %s", resp) glog.V(0).Infof("IAM read filer configuration: %s", resp)
return nil return nil
}) })
if err != nil { if err != nil {
glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress())
glog.V(0).Infof("wait to connect to filers %v", filerAddresses)
time.Sleep(time.Second) time.Sleep(time.Second)
} else { } else {
glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress())
glog.V(0).Infof("connected to filers %v", filerAddresses)
break break
} }
} }
@ -78,7 +80,7 @@ func (iamopt *IamOptions) startIamServer() bool {
router := mux.NewRouter().SkipClean(true) router := mux.NewRouter().SkipClean(true)
iamApiServer, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ iamApiServer, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
Masters: masters, Masters: masters,
Filer: filerAddress,
Filers: filerAddresses,
Port: *iamopt.port, Port: *iamopt.port,
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
}) })

23
weed/command/s3.go

@ -61,7 +61,7 @@ type S3Options struct {
func init() { func init() {
cmdS3.Run = runS3 // break init cycle cmdS3.Run = runS3 // break init cycle
s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "comma-separated filer server addresses for high availability")
s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.") s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.")
s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port") s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
s3StandaloneOptions.portHttps = cmdS3.Flag.Int("port.https", 0, "s3 server https listen port") s3StandaloneOptions.portHttps = cmdS3.Flag.Int("port.https", 0, "s3 server https listen port")
@ -86,9 +86,12 @@ func init() {
} }
var cmdS3 = &Command{ var cmdS3 = &Command{
UsageLine: "s3 [-port=8333] [-filer=<ip:port>] [-config=</path/to/config.json>]",
Short: "start a s3 API compatible server that is backed by a filer",
Long: `start a s3 API compatible server that is backed by a filer.
UsageLine: "s3 [-port=8333] [-filer=<ip:port>[,<ip:port>]...] [-config=</path/to/config.json>]",
Short: "start a s3 API compatible server that is backed by filer(s)",
Long: `start a s3 API compatible server that is backed by filer(s).
Multiple filer addresses can be specified for high availability, separated by commas.
The S3 server will automatically failover between filers if one becomes unavailable.
By default, you can use any access key and secret key to access the S3 APIs. By default, you can use any access key and secret key to access the S3 APIs.
To enable credential based access, create a config.json file similar to this: To enable credential based access, create a config.json file similar to this:
@ -200,7 +203,7 @@ func (s3opt *S3Options) GetCertificateWithUpdate(*tls.ClientHelloInfo) (*tls.Cer
func (s3opt *S3Options) startS3Server() bool { func (s3opt *S3Options) startS3Server() bool {
filerAddress := pb.ServerAddress(*s3opt.filer)
filerAddresses := pb.ServerAddresses(*s3opt.filer).ToAddresses()
filerBucketsPath := "/buckets" filerBucketsPath := "/buckets"
filerGroup := "" filerGroup := ""
@ -212,10 +215,10 @@ func (s3opt *S3Options) startS3Server() bool {
var metricsIntervalSec int var metricsIntervalSec int
for { for {
err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil { if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
return fmt.Errorf("get filer configuration: %v", err)
} }
filerBucketsPath = resp.DirBuckets filerBucketsPath = resp.DirBuckets
filerGroup = resp.FilerGroup filerGroup = resp.FilerGroup
@ -224,10 +227,10 @@ func (s3opt *S3Options) startS3Server() bool {
return nil return nil
}) })
if err != nil { if err != nil {
glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
glog.V(0).Infof("wait to connect to filers %v grpc address", filerAddresses)
time.Sleep(time.Second) time.Sleep(time.Second)
} else { } else {
glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
glog.V(0).Infof("connected to filers %v", filerAddresses)
break break
} }
} }
@ -252,7 +255,7 @@ func (s3opt *S3Options) startS3Server() bool {
} }
s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
Filer: filerAddress,
Filers: filerAddresses,
Port: *s3opt.port, Port: *s3opt.port,
Config: *s3opt.config, Config: *s3opt.config,
DomainName: *s3opt.domainName, DomainName: *s3opt.domainName,

21
weed/iamapi/iamapi_server.go

@ -38,9 +38,18 @@ type IamS3ApiConfigure struct {
credentialManager *credential.CredentialManager credentialManager *credential.CredentialManager
} }
// getFilerAddress returns the current filer address to use
// Returns the first filer for single-address operations
func (iama *IamS3ApiConfigure) getFilerAddress() pb.ServerAddress {
if len(iama.option.Filers) > 0 {
return iama.option.Filers[0]
}
return ""
}
type IamServerOption struct { type IamServerOption struct {
Masters map[string]pb.ServerAddress Masters map[string]pb.ServerAddress
Filer pb.ServerAddress
Filers []pb.ServerAddress
Port int Port int
GrpcDialOption grpc.DialOption GrpcDialOption grpc.DialOption
} }
@ -80,7 +89,7 @@ func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, expli
s3ApiConfigure = configure s3ApiConfigure = configure
s3Option := s3api.S3ApiServerOption{ s3Option := s3api.S3ApiServerOption{
Filer: option.Filer,
Filers: option.Filers,
GrpcDialOption: option.GrpcDialOption, GrpcDialOption: option.GrpcDialOption,
} }
@ -149,7 +158,7 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToCredentialManager(s3cfg *i
func (iama *IamS3ApiConfigure) GetS3ApiConfigurationFromFiler(s3cfg *iam_pb.S3ApiConfiguration) (err error) { func (iama *IamS3ApiConfigure) GetS3ApiConfigurationFromFiler(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
var buf bytes.Buffer var buf bytes.Buffer
err = pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err return err
} }
@ -171,7 +180,7 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToFiler(s3cfg *iam_pb.S3ApiC
if err := filer.ProtoToText(&buf, s3cfg); err != nil { if err := filer.ProtoToText(&buf, s3cfg); err != nil {
return fmt.Errorf("ProtoToText: %s", err) return fmt.Errorf("ProtoToText: %s", err)
} }
return pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = util.Retry("saveIamIdentity", func() error { err = util.Retry("saveIamIdentity", func() error {
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes()) return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes())
}) })
@ -184,7 +193,7 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToFiler(s3cfg *iam_pb.S3ApiC
func (iama *IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { func (iama *IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) {
var buf bytes.Buffer var buf bytes.Buffer
err = pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil { if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil {
return err return err
} }
@ -208,7 +217,7 @@ func (iama *IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) {
if b, err = json.Marshal(policies); err != nil { if b, err = json.Marshal(policies); err != nil {
return err return err
} }
return pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err := filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil { if err := filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil {
return err return err
} }

7
weed/s3api/auth_credentials.go

@ -141,7 +141,12 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto
if filerClientSetter, ok := store.(interface { if filerClientSetter, ok := store.(interface {
SetFilerClient(string, grpc.DialOption) SetFilerClient(string, grpc.DialOption)
}); ok { }); ok {
filerClientSetter.SetFilerClient(string(option.Filer), option.GrpcDialOption)
// Use the first filer address for credential store
filerAddr := ""
if len(option.Filers) > 0 {
filerAddr = string(option.Filers[0])
}
filerClientSetter.SetFilerClient(filerAddr, option.GrpcDialOption)
} }
} }

8
weed/s3api/filer_multipart.go

@ -184,7 +184,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil { if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil {
if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) { if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) {
return &CompleteMultipartUploadResult{ return &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.getFilerAddress().ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket, Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""), ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
Key: objectKey(input.Key), Key: objectKey(input.Key),
@ -401,7 +401,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
// The latest version information is tracked in the .versions directory metadata // The latest version information is tracked in the .versions directory metadata
output = &CompleteMultipartUploadResult{ output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.getFilerAddress().ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket, Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key), Key: objectKey(input.Key),
@ -454,7 +454,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
// Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec // Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec
output = &CompleteMultipartUploadResult{ output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.getFilerAddress().ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket, Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key), Key: objectKey(input.Key),
@ -511,7 +511,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
// For non-versioned buckets, return response without VersionId // For non-versioned buckets, return response without VersionId
output = &CompleteMultipartUploadResult{ output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.getFilerAddress().ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket, Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key), Key: objectKey(input.Key),

6
weed/s3api/s3api_bucket_handlers.go

@ -877,7 +877,7 @@ func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWr
s3err.WriteErrorResponse(w, r, err) s3err.WriteErrorResponse(w, r, err)
return return
} }
fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
fc, err := filer.ReadFilerConf(s3a.getFilerAddress(), s3a.option.GrpcDialOption, nil)
if err != nil { if err != nil {
glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err) glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@ -938,7 +938,7 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
return return
} }
fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
fc, err := filer.ReadFilerConf(s3a.getFilerAddress(), s3a.option.GrpcDialOption, nil)
if err != nil { if err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler read filer config: %s", err) glog.Errorf("PutBucketLifecycleConfigurationHandler read filer config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@ -1020,7 +1020,7 @@ func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *h
return return
} }
fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
fc, err := filer.ReadFilerConf(s3a.getFilerAddress(), s3a.option.GrpcDialOption, nil)
if err != nil { if err != nil {
glog.Errorf("DeleteBucketLifecycleHandler read filer config: %s", err) glog.Errorf("DeleteBucketLifecycleHandler read filer config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)

3
weed/s3api/s3api_circuit_breaker.go

@ -29,7 +29,8 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
limitations: make(map[string]int64), limitations: make(map[string]int64),
} }
err := pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
// Use WithOneOfGrpcFilerClients to support multiple filers with failover
err := pb.WithOneOfGrpcFilerClients(false, option.Filers, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile) content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile)
if errors.Is(err, filer_pb.ErrNotFound) { if errors.Is(err, filer_pb.ErrNotFound) {
return nil return nil

2
weed/s3api/s3api_handlers.go

@ -19,7 +19,7 @@ func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.Sea
return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection) client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client) return fn(client)
}, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
}, s3a.getFilerAddress().ToGrpcAddress(), false, s3a.option.GrpcDialOption)
} }

2
weed/s3api/s3api_object_handlers.go

@ -407,7 +407,7 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu
func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string { func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string {
object = urlPathEscape(removeDuplicateSlashes(object)) object = urlPathEscape(removeDuplicateSlashes(object))
destUrl := fmt.Sprintf("http://%s%s/%s%s", destUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, object)
s3a.getFilerAddress().ToHttpAddress(), s3a.option.BucketsPath, bucket, object)
return destUrl return destUrl
} }

2
weed/s3api/s3api_object_handlers_multipart.go

@ -439,7 +439,7 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string { func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string {
return fmt.Sprintf("http://%s%s/%s/%04d_%s.part", return fmt.Sprintf("http://%s%s/%s/%04d_%s.part",
s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
s3a.getFilerAddress().ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
} }
// Generate uploadID hash string from object // Generate uploadID hash string from object

2
weed/s3api/s3api_object_handlers_postpolicy.go

@ -114,7 +114,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
} }
} }
uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlEscapeObject(object))
uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.getFilerAddress().ToHttpAddress(), s3a.option.BucketsPath, bucket, urlEscapeObject(object))
// Get ContentType from post formData // Get ContentType from post formData
// Otherwise from formFile ContentType // Otherwise from formFile ContentType

31
weed/s3api/s3api_server.go

@ -33,7 +33,7 @@ import (
) )
type S3ApiServerOption struct { type S3ApiServerOption struct {
Filer pb.ServerAddress
Filers []pb.ServerAddress
Port int Port int
Config string Config string
DomainName string DomainName string
@ -95,9 +95,9 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
// Initialize FilerClient for volume location caching // Initialize FilerClient for volume location caching
// Uses the battle-tested vidMap with filer-based lookups // Uses the battle-tested vidMap with filer-based lookups
// S3 API typically connects to a single filer, but wrap in slice for consistency
filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter)
glog.V(0).Infof("S3 API initialized FilerClient for volume location caching")
// Supports multiple filer addresses with automatic failover for high availability
filerClient := wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter)
glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) for volume location caching", len(option.Filers))
s3ApiServer = &S3ApiServer{ s3ApiServer = &S3ApiServer{
option: option, option: option,
@ -120,13 +120,22 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
glog.V(1).Infof("Loading advanced IAM configuration from: %s", option.IamConfig) glog.V(1).Infof("Loading advanced IAM configuration from: %s", option.IamConfig)
iamManager, err := loadIAMManagerFromConfig(option.IamConfig, func() string { iamManager, err := loadIAMManagerFromConfig(option.IamConfig, func() string {
return string(option.Filer)
// Use the first filer address for IAM
if len(option.Filers) > 0 {
return string(option.Filers[0])
}
return ""
}) })
if err != nil { if err != nil {
glog.Errorf("Failed to load IAM configuration: %v", err) glog.Errorf("Failed to load IAM configuration: %v", err)
} else { } else {
// Create S3 IAM integration with the loaded IAM manager // Create S3 IAM integration with the loaded IAM manager
s3iam := NewS3IAMIntegration(iamManager, string(option.Filer))
// Use the first filer address for IAM
filerAddr := ""
if len(option.Filers) > 0 {
filerAddr = string(option.Filers[0])
}
s3iam := NewS3IAMIntegration(iamManager, filerAddr)
// Set IAM integration in server // Set IAM integration in server
s3ApiServer.iamIntegration = s3iam s3ApiServer.iamIntegration = s3iam
@ -173,6 +182,16 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
return s3ApiServer, nil return s3ApiServer, nil
} }
// getFilerAddress returns the current filer address to use
// For operations that need a single address, returns the first one
// The underlying FilerClient handles failover automatically
func (s3a *S3ApiServer) getFilerAddress() pb.ServerAddress {
if len(s3a.option.Filers) > 0 {
return s3a.option.Filers[0]
}
return ""
}
// syncBucketPolicyToEngine syncs a bucket policy to the policy engine // syncBucketPolicyToEngine syncs a bucket policy to the policy engine
// This helper method centralizes the logic for loading bucket policies into the engine // This helper method centralizes the logic for loading bucket policies into the engine
// to avoid duplication and ensure consistent error handling // to avoid duplication and ensure consistent error handling

Loading…
Cancel
Save