diff --git a/weed/command/iam.go b/weed/command/iam.go index 8f4ac878d..c7dc860d4 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -35,16 +35,18 @@ type IamOptions struct { func init() { cmdIam.Run = runIam // break init cycle - iamStandaloneOptions.filer = cmdIam.Flag.String("filer", "localhost:8888", "filer server address") + iamStandaloneOptions.filer = cmdIam.Flag.String("filer", "localhost:8888", "comma-separated filer server addresses for high availability") iamStandaloneOptions.masters = cmdIam.Flag.String("master", "localhost:9333", "comma-separated master servers") iamStandaloneOptions.ip = cmdIam.Flag.String("ip", util.DetectedHostAddress(), "iam server http listen ip address") iamStandaloneOptions.port = cmdIam.Flag.Int("port", 8111, "iam server http listen port") } var cmdIam = &Command{ - UsageLine: "iam [-port=8111] [-filer=] [-master=,]", + UsageLine: "iam [-port=8111] [-filer=[,]...] [-master=,]", Short: "start a iam API compatible server", - Long: "start a iam API compatible server.", + Long: `start a iam API compatible server. + + Multiple filer addresses can be specified for high availability, separated by commas.`, } func runIam(cmd *Command, args []string) bool { @@ -52,24 +54,24 @@ func runIam(cmd *Command, args []string) bool { } func (iamopt *IamOptions) startIamServer() bool { - filerAddress := pb.ServerAddress(*iamopt.filer) + filerAddresses := pb.ServerAddresses(*iamopt.filer).ToAddresses() util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) + return fmt.Errorf("get filer configuration: %v", err) } glog.V(0).Infof("IAM read filer configuration: %s", resp) return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress()) + glog.V(0).Infof("wait to connect to filers %v", filerAddresses) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress()) + glog.V(0).Infof("connected to filers %v", filerAddresses) break } } @@ -78,7 +80,7 @@ func (iamopt *IamOptions) startIamServer() bool { router := mux.NewRouter().SkipClean(true) iamApiServer, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ Masters: masters, - Filer: filerAddress, + Filers: filerAddresses, Port: *iamopt.port, GrpcDialOption: grpcDialOption, }) diff --git a/weed/command/s3.go b/weed/command/s3.go index fa575b3db..14998f667 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -61,7 +61,7 @@ type S3Options struct { func init() { cmdS3.Run = runS3 // break init cycle - s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address") + s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "comma-separated filer server addresses for high availability") s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.") s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port") s3StandaloneOptions.portHttps = cmdS3.Flag.Int("port.https", 0, "s3 server https listen port") @@ -86,9 +86,12 @@ func init() { } var cmdS3 = &Command{ - UsageLine: "s3 [-port=8333] [-filer=] [-config=]", - Short: "start a s3 API compatible server that is backed by a filer", - Long: `start a s3 API compatible server that is backed by a filer. + UsageLine: "s3 [-port=8333] [-filer=[,]...] [-config=]", + Short: "start a s3 API compatible server that is backed by filer(s)", + Long: `start a s3 API compatible server that is backed by filer(s). + + Multiple filer addresses can be specified for high availability, separated by commas. + The S3 server will automatically failover between filers if one becomes unavailable. By default, you can use any access key and secret key to access the S3 APIs. To enable credential based access, create a config.json file similar to this: @@ -200,7 +203,7 @@ func (s3opt *S3Options) GetCertificateWithUpdate(*tls.ClientHelloInfo) (*tls.Cer func (s3opt *S3Options) startS3Server() bool { - filerAddress := pb.ServerAddress(*s3opt.filer) + filerAddresses := pb.ServerAddresses(*s3opt.filer).ToAddresses() filerBucketsPath := "/buckets" filerGroup := "" @@ -212,10 +215,10 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) + return fmt.Errorf("get filer configuration: %v", err) } filerBucketsPath = resp.DirBuckets filerGroup = resp.FilerGroup @@ -224,10 +227,10 @@ func (s3opt *S3Options) startS3Server() bool { return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress()) + glog.V(0).Infof("wait to connect to filers %v grpc address", filerAddresses) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress()) + glog.V(0).Infof("connected to filers %v", filerAddresses) break } } @@ -252,7 +255,7 @@ func (s3opt *S3Options) startS3Server() bool { } s3ApiServer, s3ApiServer_err = s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ - Filer: filerAddress, + Filers: filerAddresses, Port: *s3opt.port, Config: *s3opt.config, DomainName: *s3opt.domainName, diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index 361d9bec9..90a175ca5 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -38,9 +38,18 @@ type IamS3ApiConfigure struct { credentialManager *credential.CredentialManager } +// getFilerAddress returns the current filer address to use +// Returns the first filer for single-address operations +func (iama *IamS3ApiConfigure) getFilerAddress() pb.ServerAddress { + if len(iama.option.Filers) > 0 { + return iama.option.Filers[0] + } + return "" +} + type IamServerOption struct { Masters map[string]pb.ServerAddress - Filer pb.ServerAddress + Filers []pb.ServerAddress Port int GrpcDialOption grpc.DialOption } @@ -80,7 +89,7 @@ func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, expli s3ApiConfigure = configure s3Option := s3api.S3ApiServerOption{ - Filer: option.Filer, + Filers: option.Filers, GrpcDialOption: option.GrpcDialOption, } @@ -149,7 +158,7 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToCredentialManager(s3cfg *i func (iama *IamS3ApiConfigure) GetS3ApiConfigurationFromFiler(s3cfg *iam_pb.S3ApiConfiguration) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil { return err } @@ -171,7 +180,7 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToFiler(s3cfg *iam_pb.S3ApiC if err := filer.ProtoToText(&buf, s3cfg); err != nil { return fmt.Errorf("ProtoToText: %s", err) } - return pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { err = util.Retry("saveIamIdentity", func() error { return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes()) }) @@ -184,7 +193,7 @@ func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToFiler(s3cfg *iam_pb.S3ApiC func (iama *IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) { var buf bytes.Buffer - err = pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil { return err } @@ -208,7 +217,7 @@ func (iama *IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) { if b, err = json.Marshal(policies); err != nil { return err } - return pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, 0, iama.getFilerAddress(), iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err := filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil { return err } diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index cebcd17f5..a91b3572f 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -141,7 +141,12 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto if filerClientSetter, ok := store.(interface { SetFilerClient(string, grpc.DialOption) }); ok { - filerClientSetter.SetFilerClient(string(option.Filer), option.GrpcDialOption) + // Use the first filer address for credential store + filerAddr := "" + if len(option.Filers) > 0 { + filerAddr = string(option.Filers[0]) + } + filerClientSetter.SetFilerClient(filerAddr, option.GrpcDialOption) } } diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 4b8fbaa62..d31c52990 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -184,7 +184,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil { if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) { return &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.getFilerAddress().ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""), Key: objectKey(input.Key), @@ -401,7 +401,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // The latest version information is tracked in the .versions directory metadata output = &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.getFilerAddress().ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), Key: objectKey(input.Key), @@ -454,7 +454,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec output = &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.getFilerAddress().ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), Key: objectKey(input.Key), @@ -511,7 +511,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // For non-versioned buckets, return response without VersionId output = &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.getFilerAddress().ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), Key: objectKey(input.Key), diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index d73fabd2f..ecdd39ec2 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -877,7 +877,7 @@ func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWr s3err.WriteErrorResponse(w, r, err) return } - fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil) + fc, err := filer.ReadFilerConf(s3a.getFilerAddress(), s3a.option.GrpcDialOption, nil) if err != nil { glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -938,7 +938,7 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr return } - fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil) + fc, err := filer.ReadFilerConf(s3a.getFilerAddress(), s3a.option.GrpcDialOption, nil) if err != nil { glog.Errorf("PutBucketLifecycleConfigurationHandler read filer config: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -1020,7 +1020,7 @@ func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *h return } - fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil) + fc, err := filer.ReadFilerConf(s3a.getFilerAddress(), s3a.option.GrpcDialOption, nil) if err != nil { glog.Errorf("DeleteBucketLifecycleHandler read filer config: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 47efa728a..0d7dc283a 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -29,7 +29,8 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { limitations: make(map[string]int64), } - err := pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + // Use WithOneOfGrpcFilerClients to support multiple filers with failover + err := pb.WithOneOfGrpcFilerClients(false, option.Filers, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile) if errors.Is(err, filer_pb.ErrNotFound) { return nil diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index c146a8b15..ffeedc968 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -19,7 +19,7 @@ func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.Sea return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) + }, s3a.getFilerAddress().ToGrpcAddress(), false, s3a.option.GrpcDialOption) } diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index cd0e82421..d65c1b69a 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -407,7 +407,7 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string { object = urlPathEscape(removeDuplicateSlashes(object)) destUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, object) + s3a.getFilerAddress().ToHttpAddress(), s3a.option.BucketsPath, bucket, object) return destUrl } diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index 2d9f8e620..137cd73fe 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -439,7 +439,7 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string { func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string { return fmt.Sprintf("http://%s%s/%s/%04d_%s.part", - s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString()) + s3a.getFilerAddress().ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString()) } // Generate uploadID hash string from object diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go index ecb2ac8d1..b43a60927 100644 --- a/weed/s3api/s3api_object_handlers_postpolicy.go +++ b/weed/s3api/s3api_object_handlers_postpolicy.go @@ -114,7 +114,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R } } - uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlEscapeObject(object)) + uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.getFilerAddress().ToHttpAddress(), s3a.option.BucketsPath, bucket, urlEscapeObject(object)) // Get ContentType from post formData // Otherwise from formFile ContentType diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 992027fda..444e1fd10 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -33,7 +33,7 @@ import ( ) type S3ApiServerOption struct { - Filer pb.ServerAddress + Filers []pb.ServerAddress Port int Config string DomainName string @@ -95,9 +95,9 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl // Initialize FilerClient for volume location caching // Uses the battle-tested vidMap with filer-based lookups - // S3 API typically connects to a single filer, but wrap in slice for consistency - filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter) - glog.V(0).Infof("S3 API initialized FilerClient for volume location caching") + // Supports multiple filer addresses with automatic failover for high availability + filerClient := wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter) + glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) for volume location caching", len(option.Filers)) s3ApiServer = &S3ApiServer{ option: option, @@ -120,13 +120,22 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl glog.V(1).Infof("Loading advanced IAM configuration from: %s", option.IamConfig) iamManager, err := loadIAMManagerFromConfig(option.IamConfig, func() string { - return string(option.Filer) + // Use the first filer address for IAM + if len(option.Filers) > 0 { + return string(option.Filers[0]) + } + return "" }) if err != nil { glog.Errorf("Failed to load IAM configuration: %v", err) } else { // Create S3 IAM integration with the loaded IAM manager - s3iam := NewS3IAMIntegration(iamManager, string(option.Filer)) + // Use the first filer address for IAM + filerAddr := "" + if len(option.Filers) > 0 { + filerAddr = string(option.Filers[0]) + } + s3iam := NewS3IAMIntegration(iamManager, filerAddr) // Set IAM integration in server s3ApiServer.iamIntegration = s3iam @@ -173,6 +182,16 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl return s3ApiServer, nil } +// getFilerAddress returns the current filer address to use +// For operations that need a single address, returns the first one +// The underlying FilerClient handles failover automatically +func (s3a *S3ApiServer) getFilerAddress() pb.ServerAddress { + if len(s3a.option.Filers) > 0 { + return s3a.option.Filers[0] + } + return "" +} + // syncBucketPolicyToEngine syncs a bucket policy to the policy engine // This helper method centralizes the logic for loading bucket policies into the engine // to avoid duplication and ensure consistent error handling