From f6318edbc90995cf428a44a11b6c40f237dab2ad Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 23 Jan 2026 20:12:59 -0800 Subject: [PATCH] Refactor Admin UI to use unified IAM storage and add MultipleFileStore (#8101) * Refactor Admin UI to use unified IAM storage and add MultipleFileStore * Address PR feedback: fix renames, error handling, and sync logic in FilerMultipleStore * Address refined PR feedback: safe rename order, rollback logic, and structural sync refinement * Optimize LoadConfiguration: use streaming callback for memory efficiency * Refactor UpdateUser: log rollback failures during rename * Implement PolicyManager for FilerMultipleStore * include the filer_multiple backend configuration * Implement cross-S3 synchronization and proper shutdown for all IAM backends * Extract Admin UI refactoring to a separate PR --- weed/command/imports.go | 3 + weed/command/s3.go | 1 + weed/command/scaffold/credential.toml | 5 + weed/credential/credential_store.go | 7 +- .../filer_multiple/filer_multiple_store.go | 490 ++++++++++++++++++ weed/iamapi/iamapi_server.go | 3 + weed/s3api/auth_credentials.go | 48 +- weed/s3api/auth_credentials_subscribe.go | 44 +- weed/s3api/s3api_server.go | 12 +- 9 files changed, 586 insertions(+), 27 deletions(-) create mode 100644 weed/credential/filer_multiple/filer_multiple_store.go diff --git a/weed/command/imports.go b/weed/command/imports.go index d3cefc703..fbec3d7d0 100644 --- a/weed/command/imports.go +++ b/weed/command/imports.go @@ -34,4 +34,7 @@ import ( _ "github.com/seaweedfs/seaweedfs/weed/filer/tarantool" _ "github.com/seaweedfs/seaweedfs/weed/filer/tikv" _ "github.com/seaweedfs/seaweedfs/weed/filer/ydb" + + _ "github.com/seaweedfs/seaweedfs/weed/credential/filer_etc" + _ "github.com/seaweedfs/seaweedfs/weed/credential/filer_multiple" ) diff --git a/weed/command/s3.go b/weed/command/s3.go index e84a4f45d..211e51213 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -297,6 +297,7 @@ func (s3opt *S3Options) startS3Server() bool { if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) } + defer s3ApiServer.Shutdown() if *s3opt.portGrpc == 0 { *s3opt.portGrpc = 10000 + *s3opt.port diff --git a/weed/command/scaffold/credential.toml b/weed/command/scaffold/credential.toml index d217786d6..c3380739a 100644 --- a/weed/command/scaffold/credential.toml +++ b/weed/command/scaffold/credential.toml @@ -12,6 +12,11 @@ enabled = true # filer address and grpc_dial_option will be automatically configured by the server +# Multi-file credential store (stores each user/policy in a separate file) +[credential.filer_multiple] +enabled = false +# filer address and grpc_dial_option will be automatically configured by the server + # PostgreSQL credential store (recommended for multi-node deployments) [credential.postgres] diff --git a/weed/credential/credential_store.go b/weed/credential/credential_store.go index 9bcb69260..6e1b4a3ae 100644 --- a/weed/credential/credential_store.go +++ b/weed/credential/credential_store.go @@ -21,9 +21,10 @@ type CredentialStoreTypeName string // Credential store name constants const ( - StoreTypeMemory CredentialStoreTypeName = "memory" - StoreTypeFilerEtc CredentialStoreTypeName = "filer_etc" - StoreTypePostgres CredentialStoreTypeName = "postgres" + StoreTypeMemory CredentialStoreTypeName = "memory" + StoreTypeFilerEtc CredentialStoreTypeName = "filer_etc" + StoreTypeFilerMultiple CredentialStoreTypeName = "filer_multiple" + StoreTypePostgres CredentialStoreTypeName = "postgres" ) // CredentialStore defines the interface for user credential storage and retrieval diff --git a/weed/credential/filer_multiple/filer_multiple_store.go b/weed/credential/filer_multiple/filer_multiple_store.go new file mode 100644 index 000000000..df94fcb9c --- /dev/null +++ b/weed/credential/filer_multiple/filer_multiple_store.go @@ -0,0 +1,490 @@ +package filer_multiple + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + + "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" +) + +const ( + IdentitiesDirectory = "/etc/seaweedfs/identities" + PoliciesDirectory = "/etc/seaweedfs/policies" +) + +func init() { + credential.Stores = append(credential.Stores, &FilerMultipleStore{}) +} + +// FilerMultipleStore implements CredentialStore using SeaweedFS filer for storage +// storing each identity in a separate file +type FilerMultipleStore struct { + filerAddressFunc func() pb.ServerAddress // Function to get current active filer + grpcDialOption grpc.DialOption + mu sync.RWMutex // Protects filerAddressFunc and grpcDialOption +} + +func (store *FilerMultipleStore) GetName() credential.CredentialStoreTypeName { + return credential.StoreTypeFilerMultiple +} + +func (store *FilerMultipleStore) Initialize(configuration util.Configuration, prefix string) error { + // Handle nil configuration gracefully + if configuration != nil { + filerAddr := configuration.GetString(prefix + "filer") + if filerAddr != "" { + // Static configuration - use fixed address + store.mu.Lock() + store.filerAddressFunc = func() pb.ServerAddress { + return pb.ServerAddress(filerAddr) + } + store.mu.Unlock() + } + } + // Note: filerAddressFunc can be set later via SetFilerAddressFunc method + return nil +} + +// SetFilerAddressFunc sets a function that returns the current active filer address +// This enables high availability by using the currently active filer +func (store *FilerMultipleStore) SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) { + store.mu.Lock() + defer store.mu.Unlock() + store.filerAddressFunc = getFiler + store.grpcDialOption = grpcDialOption +} + +// withFilerClient executes a function with a filer client +func (store *FilerMultipleStore) withFilerClient(fn func(client filer_pb.SeaweedFilerClient) error) error { + store.mu.RLock() + if store.filerAddressFunc == nil { + store.mu.RUnlock() + return fmt.Errorf("filer_multiple: filer not yet available - please wait for filer discovery to complete and try again") + } + + filerAddress := store.filerAddressFunc() + dialOption := store.grpcDialOption + store.mu.RUnlock() + + if filerAddress == "" { + return fmt.Errorf("filer_multiple: no filer discovered yet - please ensure a filer is running and accessible") + } + + // Use the pb.WithGrpcFilerClient helper similar to existing code + return pb.WithGrpcFilerClient(false, 0, filerAddress, dialOption, fn) +} + +func (store *FilerMultipleStore) Shutdown() { + // No cleanup needed for file store +} + +func (store *FilerMultipleStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { + s3cfg := &iam_pb.S3ApiConfiguration{} + + err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + // List and process all identity files in the directory using streaming callback + return filer_pb.SeaweedList(ctx, client, IdentitiesDirectory, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".json") { + return nil + } + + content, err := filer.ReadInsideFiler(client, IdentitiesDirectory, entry.Name) + if err != nil { + glog.Warningf("Failed to read identity file %s: %v", entry.Name, err) + return nil // Continue with next file + } + + identity := &iam_pb.Identity{} + if err := json.Unmarshal(content, identity); err != nil { + glog.Warningf("Failed to parse identity file %s: %v", entry.Name, err) + return nil // Continue with next file + } + + s3cfg.Identities = append(s3cfg.Identities, identity) + return nil + }, "", false, 10000) + }) + + if err != nil { + // If listing failed because directory doesn't exist, treat as empty config + if err == filer_pb.ErrNotFound { + return s3cfg, nil + } + return s3cfg, err + } + + return s3cfg, nil +} + +func (store *FilerMultipleStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error { + // This operation is expensive for multiple files mode as it would overwrite everything + // But we implement it for interface compliance. + // We will write each identity to a separate file and remove stale files. + + return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + // 1. List existing identity files + existingFileNames := make(map[string]bool) + err := filer_pb.SeaweedList(ctx, client, IdentitiesDirectory, "", func(entry *filer_pb.Entry, isLast bool) error { + if !entry.IsDirectory && strings.HasSuffix(entry.Name, ".json") { + existingFileNames[entry.Name] = true + } + return nil + }, "", false, 10000) + + if err != nil && err != filer_pb.ErrNotFound { + return fmt.Errorf("failed to list existing identities: %w", err) + } + + // 2. Build a set of identity keys present in the provided config + newKeys := make(map[string]bool) + for _, identity := range config.Identities { + newKeys[identity.Name+".json"] = true + } + + // 3. Write/overwrite each identity using saveIdentity + for _, identity := range config.Identities { + if err := store.saveIdentity(ctx, client, identity); err != nil { + return err + } + } + + // 4. Delete any existing files whose identity key is not in the new set + for filename := range existingFileNames { + if !newKeys[filename] { + err := filer_pb.DoRemove(ctx, client, IdentitiesDirectory, filename, false, false, false, false, nil) + if err != nil && err != filer_pb.ErrNotFound { + glog.Warningf("failed to remove stale identity file %s: %v", filename, err) + } + } + } + + return nil + }) +} + +func (store *FilerMultipleStore) saveIdentity(ctx context.Context, client filer_pb.SeaweedFilerClient, identity *iam_pb.Identity) error { + data, err := json.Marshal(identity) + if err != nil { + return fmt.Errorf("failed to marshal identity %s: %w", identity.Name, err) + } + + filename := identity.Name + ".json" + return filer.SaveInsideFiler(client, IdentitiesDirectory, filename, data) +} + +func (store *FilerMultipleStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error { + return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + filename := identity.Name + ".json" + // Check if exists + exists, err := store.exists(ctx, client, IdentitiesDirectory, filename) + if err != nil { + return err + } + if exists { + return credential.ErrUserAlreadyExists + } + + return store.saveIdentity(ctx, client, identity) + }) +} + +func (store *FilerMultipleStore) exists(ctx context.Context, client filer_pb.SeaweedFilerClient, dir, name string) (bool, error) { + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + } + resp, err := filer_pb.LookupEntry(ctx, client, request) + if err != nil { + if err == filer_pb.ErrNotFound { + return false, nil + } + return false, err + } + return resp.Entry != nil, nil +} + +func (store *FilerMultipleStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) { + var identity *iam_pb.Identity + err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + filename := username + ".json" + content, err := filer.ReadInsideFiler(client, IdentitiesDirectory, filename) + if err != nil { + if err == filer_pb.ErrNotFound { + return credential.ErrUserNotFound + } + return err + } + + identity = &iam_pb.Identity{} + if err := json.Unmarshal(content, identity); err != nil { + return fmt.Errorf("failed to parse identity: %w", err) + } + return nil + }) + return identity, err +} + +func (store *FilerMultipleStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error { + return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + filename := username + ".json" + // Check if exists + exists, err := store.exists(ctx, client, IdentitiesDirectory, filename) + if err != nil { + return err + } + if !exists { + return credential.ErrUserNotFound + } + + // If username changed (renamed), we need to create new file and then delete old one + if identity.Name != username { + // Check if the new username already exists to prevent overwrites + newFilename := identity.Name + ".json" + exists, err := store.exists(ctx, client, IdentitiesDirectory, newFilename) + if err != nil { + return err + } + if exists { + return fmt.Errorf("user %s already exists", identity.Name) + } + + // Create new identity file FIRST + if err := store.saveIdentity(ctx, client, identity); err != nil { + return err + } + + // Delete old user file SECOND + err = filer_pb.DoRemove(ctx, client, IdentitiesDirectory, filename, false, false, false, false, nil) + if err != nil && err != filer_pb.ErrNotFound { + // Rollback: try to remove the newly created file if deleting the old one failed + if errRollback := filer_pb.DoRemove(ctx, client, IdentitiesDirectory, newFilename, false, false, false, false, nil); errRollback != nil { + glog.Errorf("Rollback of creating %s failed after failing to remove %s: %v", newFilename, filename, errRollback) + } + return fmt.Errorf("failed to remove old identity file %s: %w", filename, err) + } + return nil + } + + return store.saveIdentity(ctx, client, identity) + }) +} + +func (store *FilerMultipleStore) DeleteUser(ctx context.Context, username string) error { + return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + filename := username + ".json" + err := filer_pb.DoRemove(ctx, client, IdentitiesDirectory, filename, false, false, false, false, nil) + if err != nil { + if err == filer_pb.ErrNotFound { + return nil + } + return err + } + return nil + }) +} + +func (store *FilerMultipleStore) ListUsers(ctx context.Context) ([]string, error) { + var usernames []string + err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := filer_pb.SeaweedList(ctx, client, IdentitiesDirectory, "", func(entry *filer_pb.Entry, isLast bool) error { + if !entry.IsDirectory && strings.HasSuffix(entry.Name, ".json") { + name := strings.TrimSuffix(entry.Name, ".json") + usernames = append(usernames, name) + } + return nil + }, "", false, 10000) + + if err != nil { + if err == filer_pb.ErrNotFound { + // Treat as empty if directory not found + return nil + } + return err + } + return nil + }) + return usernames, err +} + +func (store *FilerMultipleStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) { + // This is inefficient in file store without index. + // We must iterate all users. + config, err := store.LoadConfiguration(ctx) + if err != nil { + return nil, err + } + + for _, identity := range config.Identities { + for _, credential := range identity.Credentials { + if credential.AccessKey == accessKey { + return identity, nil + } + } + } + + return nil, credential.ErrAccessKeyNotFound +} + +func (store *FilerMultipleStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error { + identity, err := store.GetUser(ctx, username) + if err != nil { + return err + } + + // Check duplicates + for _, existing := range identity.Credentials { + if existing.AccessKey == cred.AccessKey { + return fmt.Errorf("access key already exists") + } + } + + identity.Credentials = append(identity.Credentials, cred) + return store.UpdateUser(ctx, username, identity) +} + +func (store *FilerMultipleStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error { + identity, err := store.GetUser(ctx, username) + if err != nil { + return err + } + + found := false + for i, cred := range identity.Credentials { + if cred.AccessKey == accessKey { + identity.Credentials = append(identity.Credentials[:i], identity.Credentials[i+1:]...) + found = true + break + } + } + + if !found { + return credential.ErrAccessKeyNotFound + } + + return store.UpdateUser(ctx, username, identity) +} + +// PolicyManager implementation + +func (store *FilerMultipleStore) GetPolicies(ctx context.Context) (map[string]policy_engine.PolicyDocument, error) { + policies := make(map[string]policy_engine.PolicyDocument) + + err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer_pb.SeaweedList(ctx, client, PoliciesDirectory, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".json") { + return nil + } + + content, err := filer.ReadInsideFiler(client, PoliciesDirectory, entry.Name) + if err != nil { + glog.Warningf("Failed to read policy file %s: %v", entry.Name, err) + return nil + } + + var policy policy_engine.PolicyDocument + if err := json.Unmarshal(content, &policy); err != nil { + glog.Warningf("Failed to parse policy file %s: %v", entry.Name, err) + return nil + } + + name := strings.TrimSuffix(entry.Name, ".json") + policies[name] = policy + return nil + }, "", false, 10000) + }) + + if err != nil { + if err == filer_pb.ErrNotFound { + return policies, nil + } + return nil, err + } + + return policies, nil +} + +func (store *FilerMultipleStore) CreatePolicy(ctx context.Context, name string, document policy_engine.PolicyDocument) error { + return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + filename := name + ".json" + exists, err := store.exists(ctx, client, PoliciesDirectory, filename) + if err != nil { + return err + } + if exists { + return fmt.Errorf("policy %s already exists", name) + } + + return store.savePolicy(ctx, client, name, document) + }) +} + +func (store *FilerMultipleStore) UpdatePolicy(ctx context.Context, name string, document policy_engine.PolicyDocument) error { + return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + filename := name + ".json" + exists, err := store.exists(ctx, client, PoliciesDirectory, filename) + if err != nil { + return err + } + if !exists { + return fmt.Errorf("policy %s not found", name) + } + + return store.savePolicy(ctx, client, name, document) + }) +} + +func (store *FilerMultipleStore) DeletePolicy(ctx context.Context, name string) error { + return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + filename := name + ".json" + err := filer_pb.DoRemove(ctx, client, PoliciesDirectory, filename, false, false, false, false, nil) + if err != nil { + if err == filer_pb.ErrNotFound { + return nil + } + return err + } + return nil + }) +} + +func (store *FilerMultipleStore) GetPolicy(ctx context.Context, name string) (*policy_engine.PolicyDocument, error) { + var policy *policy_engine.PolicyDocument + err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + filename := name + ".json" + content, err := filer.ReadInsideFiler(client, PoliciesDirectory, filename) + if err != nil { + if err == filer_pb.ErrNotFound { + return nil + } + return err + } + + policy = &policy_engine.PolicyDocument{} + if err := json.Unmarshal(content, policy); err != nil { + return fmt.Errorf("failed to parse policy: %w", err) + } + return nil + }) + return policy, err +} + +func (store *FilerMultipleStore) savePolicy(ctx context.Context, client filer_pb.SeaweedFilerClient, name string, document policy_engine.PolicyDocument) error { + data, err := json.Marshal(document) + if err != nil { + return fmt.Errorf("failed to marshal policy %s: %w", name, err) + } + + filename := name + ".json" + return filer.SaveInsideFiler(client, PoliciesDirectory, filename, data) +} diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index 602c2f28e..c1991c703 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -129,6 +129,9 @@ func (iama *IamApiServer) Shutdown() { glog.V(0).Infof("IAM API server shutting down, stopping master client connection") iama.shutdownCancel() } + if iama.iam != nil { + iama.iam.Shutdown() + } } func (iama *IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 9e3753abd..2d28b94c5 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -61,6 +61,9 @@ type IdentityAccessManagement struct { // Bucket policy engine for evaluating bucket policies policyEngine *BucketPolicyEngine + // background polling + stopChan chan struct{} + // useStaticConfig indicates if the configuration was loaded from a static file useStaticConfig bool @@ -161,8 +164,8 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto } iam.credentialManager = credentialManager + iam.stopChan = make(chan struct{}) - // First, try to load configurations from file or filer // First, try to load configurations from file or filer startConfigFile := option.Config if startConfigFile == "" { @@ -186,18 +189,22 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto iam.m.Unlock() } - // Always try to load/merge config from credential manager (filer) - // This ensures we get both static users (from file) and dynamic users (from filer) + // Always try to load/merge config from credential manager (filer/db) + // This ensures we get both static users (from file) and dynamic users (from backend) glog.V(3).Infof("loading dynamic config from credential manager") if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil { glog.Warningf("fail to load config: %v", err) } - // Only consider config loaded if we actually have identities - // Don't block environment variable fallback just because filer call succeeded - // iam.m.RLock() - // configLoaded = len(iam.identities) > 0 - // iam.m.RUnlock() + // Determine whether to start background polling for updates + // We poll if using a store that doesn't support real-time events (like Postgres) + if store := iam.credentialManager.GetStore(); store != nil { + storeName := store.GetName() + if storeName == credential.StoreTypePostgres { + glog.V(1).Infof("Starting background IAM polling for store: %s", storeName) + go iam.pollIamConfigChanges(1 * time.Minute) + } + } // Check for AWS environment variables and merge them if present // This serves as an in-memory "static" configuration @@ -227,6 +234,31 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto return iam } +func (iam *IdentityAccessManagement) pollIamConfigChanges(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := iam.LoadS3ApiConfigurationFromCredentialManager(); err != nil { + glog.Warningf("failed to reload IAM configuration via polling: %v", err) + } + case <-iam.stopChan: + return + } + } +} + +func (iam *IdentityAccessManagement) Shutdown() { + if iam.stopChan != nil { + close(iam.stopChan) + } + if iam.credentialManager != nil { + iam.credentialManager.Shutdown() + } +} + // loadEnvironmentVariableCredentials loads AWS credentials from environment variables // and adds them as a static admin identity. This function is idempotent and can be // called multiple times (e.g., after configuration reloads). diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index 99aa2e8d3..b0a04ffe1 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -2,6 +2,7 @@ package s3api import ( "errors" + "strings" "time" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -56,31 +57,48 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, p // onIamConfigChange handles IAM config file changes (create, update, delete) func (s3a *S3ApiServer) onIamConfigChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error { - if dir != filer.IamConfigDirectory { - return nil - } if s3a.iam != nil && s3a.iam.IsStaticConfig() { glog.V(1).Infof("Skipping IAM config update for static configuration") return nil } - // Handle deletion: reset to empty config - if newEntry == nil && oldEntry != nil && oldEntry.Name == filer.IamIdentityFile { - glog.V(1).Infof("IAM config file deleted, clearing identities") - if err := s3a.iam.LoadS3ApiConfigurationFromBytes([]byte{}); err != nil { - glog.Warningf("failed to clear IAM config on deletion: %v", err) - return err + // 1. Handle traditional single identity.json file + if dir == filer.IamConfigDirectory { + // Handle deletion: reset to empty config + if newEntry == nil && oldEntry != nil && oldEntry.Name == filer.IamIdentityFile { + glog.V(1).Infof("IAM config file deleted, clearing identities") + if err := s3a.iam.LoadS3ApiConfigurationFromBytes([]byte{}); err != nil { + glog.Warningf("failed to clear IAM config on deletion: %v", err) + return err + } + return nil + } + + // Handle create/update + if newEntry != nil && newEntry.Name == filer.IamIdentityFile { + if err := s3a.iam.LoadS3ApiConfigurationFromBytes(newEntry.Content); err != nil { + return err + } + glog.V(1).Infof("updated %s/%s", dir, newEntry.Name) } return nil } - // Handle create/update - if newEntry != nil && newEntry.Name == filer.IamIdentityFile { - if err := s3a.iam.LoadS3ApiConfigurationFromBytes(newEntry.Content); err != nil { + // 2. Handle multiple-file identities and policies + // Watch /etc/seaweedfs/identities and /etc/seaweedfs/policies + isIdentityDir := strings.HasPrefix(dir, "/etc/seaweedfs/identities") + isPolicyDir := strings.HasPrefix(dir, "/etc/seaweedfs/policies") + + if isIdentityDir || isPolicyDir { + // For multiple-file mode, any change in these directories should trigger a full reload + // from the credential manager (which handles the details of loading from multiple files). + glog.V(1).Infof("IAM change detected in %s, reloading configuration", dir) + if err := s3a.iam.LoadS3ApiConfigurationFromCredentialManager(); err != nil { + glog.Errorf("failed to reload IAM configuration after change in %s: %v", dir, err) return err } - glog.V(1).Infof("updated %s/%s", dir, newEntry.Name) } + return nil } diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index c50499640..e219265bb 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -246,6 +246,12 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl return s3ApiServer, nil } +func (s3a *S3ApiServer) Shutdown() { + if s3a.iam != nil { + s3a.iam.Shutdown() + } +} + // getFilerAddress returns the current active filer address // Uses FilerClient's tracked current filer which is updated on successful operations // This provides better availability than always using the first filer @@ -675,14 +681,14 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { // ParseForm() consumes the request body, which breaks AWS Signature V4 verification // for IAM requests. The signature must be calculated on the original body. // Instead, check only the query string for the Action parameter. - + // For IAM requests, the Action is typically in the POST body, not query string // So we match all authenticated POST / requests and let AuthIam validate them // This is safe because: // 1. STS actions are excluded (handled by separate STS routes) // 2. S3 operations don't POST to / (they use / or //) // 3. IAM operations all POST to / - + // Only exclude STS actions which might be in query string action := r.URL.Query().Get("Action") if action == "AssumeRole" || action == "AssumeRoleWithWebIdentity" || action == "AssumeRoleWithLDAPIdentity" { @@ -695,7 +701,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { apiRouter.Methods(http.MethodPost).Path("/").MatcherFunc(iamMatcher). HandlerFunc(track(s3a.embeddedIam.AuthIam(s3a.cb.Limit(s3a.embeddedIam.DoActions, ACTION_WRITE)), "IAM")) - + glog.V(1).Infof("Embedded IAM API enabled on S3 port") }