diff --git a/test/s3/policy/policy_test.go b/test/s3/policy/policy_test.go index f0d4c9d2e..509834bb8 100644 --- a/test/s3/policy/policy_test.go +++ b/test/s3/policy/policy_test.go @@ -14,6 +14,11 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/iam" "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -130,6 +135,72 @@ func TestS3PolicyShellRevised(t *testing.T) { } } +func TestS3IAMAttachDetachUserPolicy(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + cluster, err := startMiniCluster(t) + require.NoError(t, err) + defer cluster.Stop() + + time.Sleep(500 * time.Millisecond) + + policyName := uniqueName("managed-policy") + policyArn := fmt.Sprintf("arn:aws:iam:::policy/%s", policyName) + policyContent := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"*","Resource":"*"}]}` + tmpPolicyFile, err := os.CreateTemp("", "test_policy_attach_*.json") + require.NoError(t, err) + defer os.Remove(tmpPolicyFile.Name()) + _, err = tmpPolicyFile.WriteString(policyContent) + require.NoError(t, err) + require.NoError(t, tmpPolicyFile.Close()) + + weedCmd := "weed" + masterAddr := string(pb.NewServerAddress("127.0.0.1", cluster.masterPort, cluster.masterGrpcPort)) + filerAddr := string(pb.NewServerAddress("127.0.0.1", cluster.filerPort, cluster.filerGrpcPort)) + execShell(t, weedCmd, masterAddr, filerAddr, fmt.Sprintf("s3.policy -put -name=%s -file=%s", policyName, tmpPolicyFile.Name())) + + iamClient := newIAMClient(t, cluster.s3Endpoint) + + userName := uniqueName("iam-user") + _, err = iamClient.CreateUser(&iam.CreateUserInput{UserName: aws.String(userName)}) + require.NoError(t, err) + + _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ + UserName: aws.String(userName), + PolicyArn: aws.String(policyArn), + }) + require.NoError(t, err) + + listOut, err := iamClient.ListAttachedUserPolicies(&iam.ListAttachedUserPoliciesInput{ + UserName: aws.String(userName), + }) + require.NoError(t, err) + require.True(t, attachedPolicyContains(listOut.AttachedPolicies, policyName)) + + _, err = iamClient.DetachUserPolicy(&iam.DetachUserPolicyInput{ + UserName: aws.String(userName), + PolicyArn: aws.String(policyArn), + }) + require.NoError(t, err) + + listOut, err = iamClient.ListAttachedUserPolicies(&iam.ListAttachedUserPoliciesInput{ + UserName: aws.String(userName), + }) + require.NoError(t, err) + require.False(t, attachedPolicyContains(listOut.AttachedPolicies, policyName)) + + _, err = iamClient.AttachUserPolicy(&iam.AttachUserPolicyInput{ + UserName: aws.String(userName), + PolicyArn: aws.String("arn:aws:iam:::policy/does-not-exist"), + }) + require.Error(t, err) + if awsErr, ok := err.(awserr.Error); ok { + require.Equal(t, iam.ErrCodeNoSuchEntityException, awsErr.Code()) + } +} + func execShell(t *testing.T, weedCmd, master, filer, shellCmd string) string { // weed shell -master=... -filer=... args := []string{"shell", "-master=" + master, "-filer=" + filer} @@ -145,6 +216,43 @@ func execShell(t *testing.T, weedCmd, master, filer, shellCmd string) string { return string(out) } +func newIAMClient(t *testing.T, endpoint string) *iam.IAM { + t.Helper() + + accessKey := os.Getenv("AWS_ACCESS_KEY_ID") + secretKey := os.Getenv("AWS_SECRET_ACCESS_KEY") + if accessKey == "" { + accessKey = "admin" + } + if secretKey == "" { + secretKey = "admin" + } + + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("us-east-1"), + Endpoint: aws.String(endpoint), + DisableSSL: aws.Bool(true), + S3ForcePathStyle: aws.Bool(true), + Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""), + }) + require.NoError(t, err) + + return iam.New(sess) +} + +func attachedPolicyContains(policies []*iam.AttachedPolicy, policyName string) bool { + for _, policy := range policies { + if policy.PolicyName != nil && *policy.PolicyName == policyName { + return true + } + } + return false +} + +func uniqueName(prefix string) string { + return fmt.Sprintf("%s-%s", prefix, strconv.FormatInt(time.Now().UnixNano(), 36)) +} + // --- Test setup helpers --- func findAvailablePort() (int, error) { @@ -250,6 +358,7 @@ enabled = true "-master.volumeSizeLimitMB=32", "-ip=127.0.0.1", "-master.peers=none", + "-s3.iam.readOnly=false", } glog.MaxSize = 1024 * 1024 for _, cmd := range command.Commands { diff --git a/weed/credential/credential_manager.go b/weed/credential/credential_manager.go index cd912a5b2..591f52ff8 100644 --- a/weed/credential/credential_manager.go +++ b/weed/credential/credential_manager.go @@ -20,7 +20,7 @@ type FilerAddressSetter interface { // CredentialManager manages user credentials using a configurable store type CredentialManager struct { - store CredentialStore + Store CredentialStore } // NewCredentialManager creates a new credential manager with the specified store @@ -46,128 +46,128 @@ func NewCredentialManager(storeName CredentialStoreTypeName, configuration util. } return &CredentialManager{ - store: store, + Store: store, }, nil } func (cm *CredentialManager) SetMasterClient(masterClient *wdclient.MasterClient, grpcDialOption grpc.DialOption) { - cm.store = NewPropagatingCredentialStore(cm.store, masterClient, grpcDialOption) + cm.Store = NewPropagatingCredentialStore(cm.Store, masterClient, grpcDialOption) } // SetFilerAddressFunc sets the function to get the current filer address func (cm *CredentialManager) SetFilerAddressFunc(getFiler func() pb.ServerAddress, grpcDialOption grpc.DialOption) { - if s, ok := cm.store.(FilerAddressSetter); ok { + if s, ok := cm.Store.(FilerAddressSetter); ok { s.SetFilerAddressFunc(getFiler, grpcDialOption) } } // GetStore returns the underlying credential store func (cm *CredentialManager) GetStore() CredentialStore { - return cm.store + return cm.Store } // GetStoreName returns the name of the underlying credential store func (cm *CredentialManager) GetStoreName() string { - if cm.store != nil { - return string(cm.store.GetName()) + if cm.Store != nil { + return string(cm.Store.GetName()) } return "" } // LoadConfiguration loads the S3 API configuration func (cm *CredentialManager) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { - return cm.store.LoadConfiguration(ctx) + return cm.Store.LoadConfiguration(ctx) } // SaveConfiguration saves the S3 API configuration func (cm *CredentialManager) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error { - return cm.store.SaveConfiguration(ctx, config) + return cm.Store.SaveConfiguration(ctx, config) } // CreateUser creates a new user func (cm *CredentialManager) CreateUser(ctx context.Context, identity *iam_pb.Identity) error { - return cm.store.CreateUser(ctx, identity) + return cm.Store.CreateUser(ctx, identity) } // GetUser retrieves a user by username func (cm *CredentialManager) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) { - return cm.store.GetUser(ctx, username) + return cm.Store.GetUser(ctx, username) } // UpdateUser updates an existing user func (cm *CredentialManager) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error { - return cm.store.UpdateUser(ctx, username, identity) + return cm.Store.UpdateUser(ctx, username, identity) } // DeleteUser removes a user func (cm *CredentialManager) DeleteUser(ctx context.Context, username string) error { - return cm.store.DeleteUser(ctx, username) + return cm.Store.DeleteUser(ctx, username) } // ListUsers returns all usernames func (cm *CredentialManager) ListUsers(ctx context.Context) ([]string, error) { - return cm.store.ListUsers(ctx) + return cm.Store.ListUsers(ctx) } // GetUserByAccessKey retrieves a user by access key func (cm *CredentialManager) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) { - return cm.store.GetUserByAccessKey(ctx, accessKey) + return cm.Store.GetUserByAccessKey(ctx, accessKey) } // CreateAccessKey creates a new access key for a user func (cm *CredentialManager) CreateAccessKey(ctx context.Context, username string, credential *iam_pb.Credential) error { - return cm.store.CreateAccessKey(ctx, username, credential) + return cm.Store.CreateAccessKey(ctx, username, credential) } // DeleteAccessKey removes an access key for a user func (cm *CredentialManager) DeleteAccessKey(ctx context.Context, username string, accessKey string) error { - return cm.store.DeleteAccessKey(ctx, username, accessKey) + return cm.Store.DeleteAccessKey(ctx, username, accessKey) } // GetPolicies returns all policies func (cm *CredentialManager) GetPolicies(ctx context.Context) (map[string]policy_engine.PolicyDocument, error) { - return cm.store.GetPolicies(ctx) + return cm.Store.GetPolicies(ctx) } // PutPolicy creates or updates a policy func (cm *CredentialManager) PutPolicy(ctx context.Context, name string, document policy_engine.PolicyDocument) error { - return cm.store.PutPolicy(ctx, name, document) + return cm.Store.PutPolicy(ctx, name, document) } // DeletePolicy removes a policy func (cm *CredentialManager) DeletePolicy(ctx context.Context, name string) error { - return cm.store.DeletePolicy(ctx, name) + return cm.Store.DeletePolicy(ctx, name) } // GetPolicy retrieves a policy by name func (cm *CredentialManager) GetPolicy(ctx context.Context, name string) (*policy_engine.PolicyDocument, error) { - return cm.store.GetPolicy(ctx, name) + return cm.Store.GetPolicy(ctx, name) } // CreatePolicy creates a new policy (if supported by the store) func (cm *CredentialManager) CreatePolicy(ctx context.Context, name string, document policy_engine.PolicyDocument) error { // Check if the store implements PolicyManager interface with CreatePolicy - if policyStore, ok := cm.store.(PolicyManager); ok { + if policyStore, ok := cm.Store.(PolicyManager); ok { return policyStore.CreatePolicy(ctx, name, document) } // Fallback to PutPolicy for stores that only implement CredentialStore - return cm.store.PutPolicy(ctx, name, document) + return cm.Store.PutPolicy(ctx, name, document) } // UpdatePolicy updates an existing policy (if supported by the store) func (cm *CredentialManager) UpdatePolicy(ctx context.Context, name string, document policy_engine.PolicyDocument) error { // Check if the store implements PolicyManager interface with UpdatePolicy - if policyStore, ok := cm.store.(PolicyManager); ok { + if policyStore, ok := cm.Store.(PolicyManager); ok { return policyStore.UpdatePolicy(ctx, name, document) } // Fallback to PutPolicy for stores that only implement CredentialStore - return cm.store.PutPolicy(ctx, name, document) + return cm.Store.PutPolicy(ctx, name, document) } // Shutdown performs cleanup func (cm *CredentialManager) Shutdown() { - if cm.store != nil { - cm.store.Shutdown() + if cm.Store != nil { + cm.Store.Shutdown() } } @@ -194,25 +194,40 @@ func GetAvailableStores() []CredentialStoreTypeName { // CreateServiceAccount creates a new service account func (cm *CredentialManager) CreateServiceAccount(ctx context.Context, sa *iam_pb.ServiceAccount) error { - return cm.store.CreateServiceAccount(ctx, sa) + return cm.Store.CreateServiceAccount(ctx, sa) } // UpdateServiceAccount updates an existing service account func (cm *CredentialManager) UpdateServiceAccount(ctx context.Context, id string, sa *iam_pb.ServiceAccount) error { - return cm.store.UpdateServiceAccount(ctx, id, sa) + return cm.Store.UpdateServiceAccount(ctx, id, sa) } // DeleteServiceAccount removes a service account func (cm *CredentialManager) DeleteServiceAccount(ctx context.Context, id string) error { - return cm.store.DeleteServiceAccount(ctx, id) + return cm.Store.DeleteServiceAccount(ctx, id) } // GetServiceAccount retrieves a service account by ID func (cm *CredentialManager) GetServiceAccount(ctx context.Context, id string) (*iam_pb.ServiceAccount, error) { - return cm.store.GetServiceAccount(ctx, id) + return cm.Store.GetServiceAccount(ctx, id) } // ListServiceAccounts returns all service accounts func (cm *CredentialManager) ListServiceAccounts(ctx context.Context) ([]*iam_pb.ServiceAccount, error) { - return cm.store.ListServiceAccounts(ctx) + return cm.Store.ListServiceAccounts(ctx) +} + +// AttachUserPolicy attaches a managed policy to a user +func (cm *CredentialManager) AttachUserPolicy(ctx context.Context, username string, policyName string) error { + return cm.Store.AttachUserPolicy(ctx, username, policyName) +} + +// DetachUserPolicy detaches a managed policy from a user +func (cm *CredentialManager) DetachUserPolicy(ctx context.Context, username string, policyName string) error { + return cm.Store.DetachUserPolicy(ctx, username, policyName) +} + +// ListAttachedUserPolicies returns the list of policy names attached to a user +func (cm *CredentialManager) ListAttachedUserPolicies(ctx context.Context, username string) ([]string, error) { + return cm.Store.ListAttachedUserPolicies(ctx, username) } diff --git a/weed/credential/credential_store.go b/weed/credential/credential_store.go index edefbd28b..1b174c634 100644 --- a/weed/credential/credential_store.go +++ b/weed/credential/credential_store.go @@ -15,6 +15,9 @@ var ( ErrUserAlreadyExists = errors.New("user already exists") ErrAccessKeyNotFound = errors.New("access key not found") ErrServiceAccountNotFound = errors.New("service account not found") + ErrPolicyNotFound = errors.New("policy not found") + ErrPolicyAlreadyAttached = errors.New("policy already attached") + ErrPolicyNotAttached = errors.New("policy not attached to user") ) // CredentialStoreTypeName represents the type name of a credential store @@ -81,6 +84,14 @@ type CredentialStore interface { ListServiceAccounts(ctx context.Context) ([]*iam_pb.ServiceAccount, error) GetServiceAccountByAccessKey(ctx context.Context, accessKey string) (*iam_pb.ServiceAccount, error) + // User Policy Attachment Management + // AttachUserPolicy attaches a managed policy to a user by policy name + AttachUserPolicy(ctx context.Context, username string, policyName string) error + // DetachUserPolicy detaches a managed policy from a user + DetachUserPolicy(ctx context.Context, username string, policyName string) error + // ListAttachedUserPolicies returns the list of policy names attached to a user + ListAttachedUserPolicies(ctx context.Context, username string) ([]string, error) + // Shutdown performs cleanup when the store is being shut down Shutdown() } diff --git a/weed/credential/filer_etc/filer_etc_identity.go b/weed/credential/filer_etc/filer_etc_identity.go index 09b4f319c..7a1c2c051 100644 --- a/weed/credential/filer_etc/filer_etc_identity.go +++ b/weed/credential/filer_etc/filer_etc_identity.go @@ -464,3 +464,65 @@ func listEntries(ctx context.Context, client filer_pb.SeaweedFilerClient, dir st } return entries, nil } + +// AttachUserPolicy attaches a managed policy to a user by policy name +func (store *FilerEtcStore) AttachUserPolicy(ctx context.Context, username string, policyName string) error { + // Get user + identity, err := store.GetUser(ctx, username) + if err != nil { + return err + } + + // Verify policy exists + policy, err := store.GetPolicy(ctx, policyName) + if err != nil { + return err + } + if policy == nil { + return credential.ErrPolicyNotFound + } + + // Check if already attached + for _, p := range identity.PolicyNames { + if p == policyName { + return credential.ErrPolicyAlreadyAttached + } + } + + identity.PolicyNames = append(identity.PolicyNames, policyName) + return store.saveIdentity(ctx, identity) +} + +// DetachUserPolicy detaches a managed policy from a user +func (store *FilerEtcStore) DetachUserPolicy(ctx context.Context, username string, policyName string) error { + identity, err := store.GetUser(ctx, username) + if err != nil { + return err + } + + found := false + var newPolicies []string + for _, p := range identity.PolicyNames { + if p == policyName { + found = true + } else { + newPolicies = append(newPolicies, p) + } + } + + if !found { + return credential.ErrPolicyNotAttached + } + + identity.PolicyNames = newPolicies + return store.saveIdentity(ctx, identity) +} + +// ListAttachedUserPolicies returns the list of policy names attached to a user +func (store *FilerEtcStore) ListAttachedUserPolicies(ctx context.Context, username string) ([]string, error) { + identity, err := store.GetUser(ctx, username) + if err != nil { + return nil, err + } + return identity.PolicyNames, nil +} diff --git a/weed/credential/grpc/grpc_identity.go b/weed/credential/grpc/grpc_identity.go index af0e5249e..2a6ceef8b 100644 --- a/weed/credential/grpc/grpc_identity.go +++ b/weed/credential/grpc/grpc_identity.go @@ -3,6 +3,7 @@ package grpc import ( "context" + "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" ) @@ -118,3 +119,66 @@ func (store *IamGrpcStore) DeleteAccessKey(ctx context.Context, username string, return err }) } + +// AttachUserPolicy attaches a managed policy to a user by policy name +func (store *IamGrpcStore) AttachUserPolicy(ctx context.Context, username string, policyName string) error { + // Get current user + identity, err := store.GetUser(ctx, username) + if err != nil { + return err + } + + // Verify policy exists + policy, err := store.GetPolicy(ctx, policyName) + if err != nil { + return err + } + if policy == nil { + return credential.ErrPolicyNotFound + } + + // Check if already attached + for _, p := range identity.PolicyNames { + if p == policyName { + // Already attached - return success (idempotent) + return nil + } + } + + identity.PolicyNames = append(identity.PolicyNames, policyName) + return store.UpdateUser(ctx, username, identity) +} + +// DetachUserPolicy detaches a managed policy from a user +func (store *IamGrpcStore) DetachUserPolicy(ctx context.Context, username string, policyName string) error { + identity, err := store.GetUser(ctx, username) + if err != nil { + return err + } + + found := false + var newPolicies []string + for _, p := range identity.PolicyNames { + if p == policyName { + found = true + } else { + newPolicies = append(newPolicies, p) + } + } + + if !found { + return credential.ErrPolicyNotAttached + } + + identity.PolicyNames = newPolicies + return store.UpdateUser(ctx, username, identity) +} + +// ListAttachedUserPolicies returns the list of policy names attached to a user +func (store *IamGrpcStore) ListAttachedUserPolicies(ctx context.Context, username string) ([]string, error) { + identity, err := store.GetUser(ctx, username) + if err != nil { + return nil, err + } + return identity.PolicyNames, nil +} diff --git a/weed/credential/memory/memory_identity.go b/weed/credential/memory/memory_identity.go index 191aa5d16..4430a7bf1 100644 --- a/weed/credential/memory/memory_identity.go +++ b/weed/credential/memory/memory_identity.go @@ -7,6 +7,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" ) func (store *MemoryStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { @@ -21,9 +22,16 @@ func (store *MemoryStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiC // Convert all users to identities for _, user := range store.users { - // Deep copy the identity to avoid mutation issues - identityCopy := store.deepCopyIdentity(user) - config.Identities = append(config.Identities, identityCopy) + config.Identities = append(config.Identities, store.deepCopyIdentity(user)) + } + + // Add all policies + for name, doc := range store.policies { + content, _ := json.Marshal(doc) + config.Policies = append(config.Policies, &iam_pb.Policy{ + Name: name, + Content: string(content), + }) } return config, nil @@ -40,12 +48,11 @@ func (store *MemoryStore) SaveConfiguration(ctx context.Context, config *iam_pb. // Clear existing data store.users = make(map[string]*iam_pb.Identity) store.accessKeys = make(map[string]string) + store.policies = make(map[string]policy_engine.PolicyDocument) // Add all identities for _, identity := range config.Identities { - // Deep copy to avoid mutation issues - identityCopy := store.deepCopyIdentity(identity) - store.users[identity.Name] = identityCopy + store.users[identity.Name] = store.deepCopyIdentity(identity) // Index access keys for _, credential := range identity.Credentials { @@ -53,6 +60,14 @@ func (store *MemoryStore) SaveConfiguration(ctx context.Context, config *iam_pb. } } + // Add all policies + for _, policy := range config.Policies { + var doc policy_engine.PolicyDocument + if err := json.Unmarshal([]byte(policy.Content), &doc); err == nil { + store.policies[policy.Name] = doc + } + } + return nil } @@ -284,6 +299,7 @@ func (store *MemoryStore) deepCopyIdentity(identity *iam_pb.Identity) *iam_pb.Id Account: identity.Account, Credentials: identity.Credentials, Actions: identity.Actions, + PolicyNames: identity.PolicyNames, } } @@ -295,8 +311,91 @@ func (store *MemoryStore) deepCopyIdentity(identity *iam_pb.Identity) *iam_pb.Id Account: identity.Account, Credentials: identity.Credentials, Actions: identity.Actions, + PolicyNames: identity.PolicyNames, } } return © } + +// AttachUserPolicy attaches a managed policy to a user by policy name +func (store *MemoryStore) AttachUserPolicy(ctx context.Context, username string, policyName string) error { + store.mu.Lock() + defer store.mu.Unlock() + + if !store.initialized { + return fmt.Errorf("store not initialized") + } + + user, exists := store.users[username] + if !exists { + return credential.ErrUserNotFound + } + + // Verify policy exists + if _, exists := store.policies[policyName]; !exists { + return credential.ErrPolicyNotFound + } + + // Check if already attached + for _, p := range user.PolicyNames { + if p == policyName { + return credential.ErrPolicyAlreadyAttached + } + } + + user.PolicyNames = append(user.PolicyNames, policyName) + return nil +} + +// DetachUserPolicy detaches a managed policy from a user +func (store *MemoryStore) DetachUserPolicy(ctx context.Context, username string, policyName string) error { + store.mu.Lock() + defer store.mu.Unlock() + + if !store.initialized { + return fmt.Errorf("store not initialized") + } + + user, exists := store.users[username] + if !exists { + return credential.ErrUserNotFound + } + + found := false + var newPolicies []string + for _, p := range user.PolicyNames { + if p == policyName { + found = true + } else { + newPolicies = append(newPolicies, p) + } + } + + if !found { + return credential.ErrPolicyNotAttached + } + + user.PolicyNames = newPolicies + return nil +} + +// ListAttachedUserPolicies returns the list of policy names attached to a user +func (store *MemoryStore) ListAttachedUserPolicies(ctx context.Context, username string) ([]string, error) { + store.mu.RLock() + defer store.mu.RUnlock() + + if !store.initialized { + return nil, fmt.Errorf("store not initialized") + } + + user, exists := store.users[username] + if !exists { + return nil, credential.ErrUserNotFound + } + + // Return copy to prevent mutation + result := make([]string, len(user.PolicyNames)) + copy(result, user.PolicyNames) + return result, nil +} diff --git a/weed/credential/postgres/postgres_identity.go b/weed/credential/postgres/postgres_identity.go index 11908b0d8..74934c9c4 100644 --- a/weed/credential/postgres/postgres_identity.go +++ b/weed/credential/postgres/postgres_identity.go @@ -18,7 +18,7 @@ func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3Ap config := &iam_pb.S3ApiConfiguration{} // Query all users - rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users") + rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions, policy_names FROM users") if err != nil { return nil, fmt.Errorf("failed to query users: %w", err) } @@ -26,9 +26,9 @@ func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3Ap for rows.Next() { var username, email string - var accountDataJSON, actionsJSON []byte + var accountDataJSON, actionsJSON, policyNamesJSON []byte - if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil { + if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON, &policyNamesJSON); err != nil { return nil, fmt.Errorf("failed to scan user row: %w", err) } @@ -50,6 +50,13 @@ func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3Ap } } + // Parse policy names + if len(policyNamesJSON) > 0 { + if err := json.Unmarshal(policyNamesJSON, &identity.PolicyNames); err != nil { + return nil, fmt.Errorf("failed to unmarshal policy names for user %s: %v", username, err) + } + } + // Query credentials for this user credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username) if err != nil { @@ -116,10 +123,19 @@ func (store *PostgresStore) SaveConfiguration(ctx context.Context, config *iam_p } } + // Marshal policy names + var policyNamesJSON []byte + if identity.PolicyNames != nil { + policyNamesJSON, err = json.Marshal(identity.PolicyNames) + if err != nil { + return fmt.Errorf("failed to marshal policy names for user %s: %v", identity.Name, err) + } + } + // Insert user _, err := tx.ExecContext(ctx, - "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)", - identity.Name, "", accountDataJSON, actionsJSON) + "INSERT INTO users (username, email, account_data, actions, policy_names) VALUES ($1, $2, $3, $4, $5)", + identity.Name, "", accountDataJSON, actionsJSON, policyNamesJSON) if err != nil { return fmt.Errorf("failed to insert user %s: %v", identity.Name, err) } @@ -178,10 +194,19 @@ func (store *PostgresStore) CreateUser(ctx context.Context, identity *iam_pb.Ide } } + // Marshal policy names + var policyNamesJSON []byte + if identity.PolicyNames != nil { + policyNamesJSON, err = json.Marshal(identity.PolicyNames) + if err != nil { + return fmt.Errorf("failed to marshal policy names: %w", err) + } + } + // Insert user _, err = tx.ExecContext(ctx, - "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)", - identity.Name, "", accountDataJSON, actionsJSON) + "INSERT INTO users (username, email, account_data, actions, policy_names) VALUES ($1, $2, $3, $4, $5)", + identity.Name, "", accountDataJSON, actionsJSON, policyNamesJSON) if err != nil { return fmt.Errorf("failed to insert user: %w", err) } @@ -205,11 +230,11 @@ func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_ } var email string - var accountDataJSON, actionsJSON []byte + var accountDataJSON, actionsJSON, policyNamesJSON []byte err := store.db.QueryRowContext(ctx, - "SELECT email, account_data, actions FROM users WHERE username = $1", - username).Scan(&email, &accountDataJSON, &actionsJSON) + "SELECT email, account_data, actions, policy_names FROM users WHERE username = $1", + username).Scan(&email, &accountDataJSON, &actionsJSON, &policyNamesJSON) if err != nil { if err == sql.ErrNoRows { return nil, credential.ErrUserNotFound @@ -235,6 +260,13 @@ func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_ } } + // Parse policy names + if len(policyNamesJSON) > 0 { + if err := json.Unmarshal(policyNamesJSON, &identity.PolicyNames); err != nil { + return nil, fmt.Errorf("failed to unmarshal policy names: %w", err) + } + } + // Query credentials rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username) if err != nil { @@ -297,10 +329,19 @@ func (store *PostgresStore) UpdateUser(ctx context.Context, username string, ide } } + // Marshal policy names + var policyNamesJSON []byte + if identity.PolicyNames != nil { + policyNamesJSON, err = json.Marshal(identity.PolicyNames) + if err != nil { + return fmt.Errorf("failed to marshal policy names: %w", err) + } + } + // Update user _, err = tx.ExecContext(ctx, - "UPDATE users SET email = $2, account_data = $3, actions = $4, updated_at = CURRENT_TIMESTAMP WHERE username = $1", - username, "", accountDataJSON, actionsJSON) + "UPDATE users SET email = $2, account_data = $3, actions = $4, policy_names = $5, updated_at = CURRENT_TIMESTAMP WHERE username = $1", + username, "", accountDataJSON, actionsJSON, policyNamesJSON) if err != nil { return fmt.Errorf("failed to update user: %w", err) } @@ -444,3 +485,81 @@ func (store *PostgresStore) DeleteAccessKey(ctx context.Context, username string return nil } + +// AttachUserPolicy attaches a managed policy to a user by policy name +func (store *PostgresStore) AttachUserPolicy(ctx context.Context, username string, policyName string) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Get user + identity, err := store.GetUser(ctx, username) + if err != nil { + return err + } + + // Verify policy exists + policy, err := store.GetPolicy(ctx, policyName) + if err != nil { + return err + } + if policy == nil { + return credential.ErrPolicyNotFound + } + + // Check if already attached + for _, p := range identity.PolicyNames { + if p == policyName { + return credential.ErrPolicyAlreadyAttached + } + } + + // Append policy name and update + identity.PolicyNames = append(identity.PolicyNames, policyName) + return store.UpdateUser(ctx, username, identity) +} + +// DetachUserPolicy detaches a managed policy from a user +func (store *PostgresStore) DetachUserPolicy(ctx context.Context, username string, policyName string) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Get user + identity, err := store.GetUser(ctx, username) + if err != nil { + return err + } + + // Find and remove policy + found := false + var newPolicyNames []string + for _, p := range identity.PolicyNames { + if p == policyName { + found = true + } else { + newPolicyNames = append(newPolicyNames, p) + } + } + + if !found { + return credential.ErrPolicyNotAttached + } + + identity.PolicyNames = newPolicyNames + return store.UpdateUser(ctx, username, identity) +} + +// ListAttachedUserPolicies returns the list of policy names attached to a user +func (store *PostgresStore) ListAttachedUserPolicies(ctx context.Context, username string) ([]string, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + identity, err := store.GetUser(ctx, username) + if err != nil { + return nil, err + } + + return identity.PolicyNames, nil +} diff --git a/weed/credential/postgres/postgres_store.go b/weed/credential/postgres/postgres_store.go index 2e6fbd6c7..205e08ffa 100644 --- a/weed/credential/postgres/postgres_store.go +++ b/weed/credential/postgres/postgres_store.go @@ -93,12 +93,18 @@ func (store *PostgresStore) createTables() error { email VARCHAR(255), account_data JSONB, actions JSONB, + policy_names JSONB DEFAULT '[]', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_users_email ON users(email); ` + // Migration: Add policy_names column if it doesn't exist (for existing installations) + addPolicyNamesColumn := ` + ALTER TABLE users ADD COLUMN IF NOT EXISTS policy_names JSONB DEFAULT '[]'; + ` + // Create credentials table credentialsTable := ` CREATE TABLE IF NOT EXISTS credentials ( @@ -139,6 +145,11 @@ func (store *PostgresStore) createTables() error { return fmt.Errorf("failed to create users table: %w", err) } + // Run migration to add policy_names column for existing installations + if _, err := store.db.Exec(addPolicyNamesColumn); err != nil { + return fmt.Errorf("failed to add policy_names column: %w", err) + } + if _, err := store.db.Exec(credentialsTable); err != nil { return fmt.Errorf("failed to create credentials table: %w", err) } diff --git a/weed/credential/propagating_store.go b/weed/credential/propagating_store.go index 333c781b6..bd6bce1c4 100644 --- a/weed/credential/propagating_store.go +++ b/weed/credential/propagating_store.go @@ -91,6 +91,46 @@ func (s *PropagatingCredentialStore) propagateChange(ctx context.Context, fn fun wg.Wait() } +func (s *PropagatingCredentialStore) AttachUserPolicy(ctx context.Context, username string, policyName string) error { + glog.V(4).Infof("IAM: PropagatingCredentialStore.AttachUserPolicy %s -> %s", username, policyName) + if err := s.CredentialStore.AttachUserPolicy(ctx, username, policyName); err != nil { + return err + } + // Fetch updated identity to propagate + identity, err := s.CredentialStore.GetUser(ctx, username) + if err != nil { + glog.Warningf("failed to get user %s after attaching policy: %v", username, err) + return nil + } + s.propagateChange(ctx, func(tx context.Context, client s3_pb.SeaweedS3IamCacheClient) error { + _, err := client.PutIdentity(tx, &iam_pb.PutIdentityRequest{Identity: identity}) + return err + }) + return nil +} + +func (s *PropagatingCredentialStore) DetachUserPolicy(ctx context.Context, username string, policyName string) error { + glog.V(4).Infof("IAM: PropagatingCredentialStore.DetachUserPolicy %s -> %s", username, policyName) + if err := s.CredentialStore.DetachUserPolicy(ctx, username, policyName); err != nil { + return err + } + // Fetch updated identity to propagate + identity, err := s.CredentialStore.GetUser(ctx, username) + if err != nil { + glog.Warningf("failed to get user %s after detaching policy: %v", username, err) + return nil + } + s.propagateChange(ctx, func(tx context.Context, client s3_pb.SeaweedS3IamCacheClient) error { + _, err := client.PutIdentity(tx, &iam_pb.PutIdentityRequest{Identity: identity}) + return err + }) + return nil +} + +func (s *PropagatingCredentialStore) ListAttachedUserPolicies(ctx context.Context, username string) ([]string, error) { + return s.CredentialStore.ListAttachedUserPolicies(ctx, username) +} + func (s *PropagatingCredentialStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error { glog.V(4).Infof("IAM: PropagatingCredentialStore.CreateUser %s", identity.Name) if err := s.CredentialStore.CreateUser(ctx, identity); err != nil { diff --git a/weed/iam/responses.go b/weed/iam/responses.go index 07a42b45a..2c72a5720 100644 --- a/weed/iam/responses.go +++ b/weed/iam/responses.go @@ -106,6 +106,29 @@ type DeleteUserPolicyResponse struct { XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ DeleteUserPolicyResponse"` } +// AttachUserPolicyResponse is the response for AttachUserPolicy action. +type AttachUserPolicyResponse struct { + CommonResponse + XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ AttachUserPolicyResponse"` +} + +// DetachUserPolicyResponse is the response for DetachUserPolicy action. +type DetachUserPolicyResponse struct { + CommonResponse + XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ DetachUserPolicyResponse"` +} + +// ListAttachedUserPoliciesResponse is the response for ListAttachedUserPolicies action. +type ListAttachedUserPoliciesResponse struct { + CommonResponse + XMLName xml.Name `xml:"https://iam.amazonaws.com/doc/2010-05-08/ ListAttachedUserPoliciesResponse"` + ListAttachedUserPoliciesResult struct { + AttachedPolicies []*iam.AttachedPolicy `xml:"AttachedPolicies>member"` + IsTruncated bool `xml:"IsTruncated"` + Marker string `xml:"Marker,omitempty"` + } `xml:"ListAttachedUserPoliciesResult"` +} + // GetUserPolicyResponse is the response for GetUserPolicy action. type GetUserPolicyResponse struct { CommonResponse diff --git a/weed/s3api/s3api_embedded_iam.go b/weed/s3api/s3api_embedded_iam.go index 7181410f1..da88b7b0b 100644 --- a/weed/s3api/s3api_embedded_iam.go +++ b/weed/s3api/s3api_embedded_iam.go @@ -62,26 +62,30 @@ const ( // Operational limits (AWS IAM compatible) MaxServiceAccountsPerUser = 100 // Maximum service accounts per user MaxDescriptionLength = 1000 // Maximum description length in characters + MaxManagedPoliciesPerUser = 10 // Maximum managed policies attached to a user ) // Type aliases for IAM response types from shared package type ( - iamListUsersResponse = iamlib.ListUsersResponse - iamListAccessKeysResponse = iamlib.ListAccessKeysResponse - iamDeleteAccessKeyResponse = iamlib.DeleteAccessKeyResponse - iamCreatePolicyResponse = iamlib.CreatePolicyResponse - iamCreateUserResponse = iamlib.CreateUserResponse - iamDeleteUserResponse = iamlib.DeleteUserResponse - iamGetUserResponse = iamlib.GetUserResponse - iamUpdateUserResponse = iamlib.UpdateUserResponse - iamCreateAccessKeyResponse = iamlib.CreateAccessKeyResponse - iamPutUserPolicyResponse = iamlib.PutUserPolicyResponse - iamDeleteUserPolicyResponse = iamlib.DeleteUserPolicyResponse - iamGetUserPolicyResponse = iamlib.GetUserPolicyResponse - iamSetUserStatusResponse = iamlib.SetUserStatusResponse - iamUpdateAccessKeyResponse = iamlib.UpdateAccessKeyResponse - iamErrorResponse = iamlib.ErrorResponse - iamError = iamlib.Error + iamListUsersResponse = iamlib.ListUsersResponse + iamListAccessKeysResponse = iamlib.ListAccessKeysResponse + iamDeleteAccessKeyResponse = iamlib.DeleteAccessKeyResponse + iamCreatePolicyResponse = iamlib.CreatePolicyResponse + iamCreateUserResponse = iamlib.CreateUserResponse + iamDeleteUserResponse = iamlib.DeleteUserResponse + iamGetUserResponse = iamlib.GetUserResponse + iamUpdateUserResponse = iamlib.UpdateUserResponse + iamCreateAccessKeyResponse = iamlib.CreateAccessKeyResponse + iamPutUserPolicyResponse = iamlib.PutUserPolicyResponse + iamDeleteUserPolicyResponse = iamlib.DeleteUserPolicyResponse + iamGetUserPolicyResponse = iamlib.GetUserPolicyResponse + iamAttachUserPolicyResponse = iamlib.AttachUserPolicyResponse + iamDetachUserPolicyResponse = iamlib.DetachUserPolicyResponse + iamListAttachedUserPoliciesResponse = iamlib.ListAttachedUserPoliciesResponse + iamSetUserStatusResponse = iamlib.SetUserStatusResponse + iamUpdateAccessKeyResponse = iamlib.UpdateAccessKeyResponse + iamErrorResponse = iamlib.ErrorResponse + iamError = iamlib.Error // Service account response types iamServiceAccountInfo = iamlib.ServiceAccountInfo iamCreateServiceAccountResponse = iamlib.CreateServiceAccountResponse @@ -166,6 +170,8 @@ func (e *EmbeddedIamApi) writeIamErrorResponse(w http.ResponseWriter, r *http.Re s3err.WriteXMLResponse(w, r, http.StatusForbidden, errorResp) case iam.ErrCodeServiceFailureException: s3err.WriteXMLResponse(w, r, http.StatusInternalServerError, internalErrorResponse) + case "NotImplemented": + s3err.WriteXMLResponse(w, r, http.StatusNotImplemented, errorResp) default: s3err.WriteXMLResponse(w, r, http.StatusInternalServerError, internalErrorResponse) } @@ -375,7 +381,7 @@ func (e *EmbeddedIamApi) GetPolicyDocument(policy *string) (policy_engine.Policy // NOTE: Currently this only validates the policy document and returns policy metadata. // The policy is not persisted to a managed policy store. To apply permissions to a user, // use PutUserPolicy which stores the policy inline on the user's identity. -// TODO: Implement managed policy storage for full AWS IAM compatibility (ListPolicies, GetPolicy, AttachUserPolicy). +// TODO: Implement managed policy storage for full AWS IAM compatibility (ListPolicies, GetPolicy). func (e *EmbeddedIamApi) CreatePolicy(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (iamCreatePolicyResponse, *iamError) { var resp iamCreatePolicyResponse policyName := values.Get("PolicyName") @@ -392,6 +398,31 @@ func (e *EmbeddedIamApi) CreatePolicy(s3cfg *iam_pb.S3ApiConfiguration, values u return resp, nil } +func iamPolicyNameFromArn(policyArn string) (string, error) { + const policyPathDelimiter = ":policy/" + idx := strings.Index(policyArn, policyPathDelimiter) + if idx < 0 { + return "", fmt.Errorf("invalid policy arn: %s", policyArn) + } + + policyPath := strings.Trim(policyArn[idx+len(policyPathDelimiter):], "/") + if policyPath == "" { + return "", fmt.Errorf("invalid policy arn: %s", policyArn) + } + + parts := strings.Split(policyPath, "/") + policyName := parts[len(parts)-1] + if policyName == "" { + return "", fmt.Errorf("invalid policy arn: %s", policyArn) + } + + return policyName, nil +} + +func iamPolicyArn(policyName string) string { + return fmt.Sprintf("arn:aws:iam:::policy/%s", policyName) +} + // getActions extracts actions from a policy document. // S3 ARN format: arn:aws:s3:::bucket or arn:aws:s3:::bucket/path/* // res[5] contains the bucket and optional path after ::: @@ -559,6 +590,192 @@ func (e *EmbeddedIamApi) DeleteUserPolicy(s3cfg *iam_pb.S3ApiConfiguration, valu return resp, &iamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf(iamUserDoesNotExist, userName)} } +// AttachUserPolicy attaches a managed policy to a user. +func (e *EmbeddedIamApi) AttachUserPolicy(ctx context.Context, values url.Values) (iamAttachUserPolicyResponse, *iamError) { + var resp iamAttachUserPolicyResponse + + userName := values.Get("UserName") + if userName == "" { + return resp, &iamError{Code: iam.ErrCodeInvalidInputException, Error: fmt.Errorf("UserName is required")} + } + + policyArn := values.Get("PolicyArn") + policyName, err := iamPolicyNameFromArn(policyArn) + if err != nil { + return resp, &iamError{Code: iam.ErrCodeInvalidInputException, Error: err} + } + + if e.credentialManager == nil { + return resp, &iamError{Code: iam.ErrCodeServiceFailureException, Error: fmt.Errorf("credential manager not configured")} + } + + policy, err := e.credentialManager.GetPolicy(ctx, policyName) + if err != nil { + return resp, &iamError{Code: iam.ErrCodeServiceFailureException, Error: err} + } + if policy == nil { + return resp, &iamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf("policy %s not found", policyName)} + } + + attachedPolicies, err := e.credentialManager.ListAttachedUserPolicies(ctx, userName) + if err != nil { + if errors.Is(err, credential.ErrUserNotFound) { + return resp, &iamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf(iamUserDoesNotExist, userName)} + } + return resp, &iamError{Code: iam.ErrCodeServiceFailureException, Error: err} + } + for _, attached := range attachedPolicies { + if attached == policyName { + return resp, nil + } + } + if len(attachedPolicies) >= MaxManagedPoliciesPerUser { + return resp, &iamError{ + Code: iam.ErrCodeLimitExceededException, + Error: fmt.Errorf("cannot attach more than %d managed policies to user %s", MaxManagedPoliciesPerUser, userName), + } + } + + if err := e.credentialManager.AttachUserPolicy(ctx, userName, policyName); err != nil { + if errors.Is(err, credential.ErrUserNotFound) { + return resp, &iamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf(iamUserDoesNotExist, userName)} + } + if errors.Is(err, credential.ErrPolicyNotFound) { + return resp, &iamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf("policy %s not found", policyName)} + } + if errors.Is(err, credential.ErrPolicyAlreadyAttached) { + // AWS IAM is idempotent for AttachUserPolicy + return resp, nil + } + return resp, &iamError{Code: iam.ErrCodeServiceFailureException, Error: err} + } + + return resp, nil +} + +// DetachUserPolicy detaches a managed policy from a user. +func (e *EmbeddedIamApi) DetachUserPolicy(ctx context.Context, values url.Values) (iamDetachUserPolicyResponse, *iamError) { + var resp iamDetachUserPolicyResponse + + userName := values.Get("UserName") + if userName == "" { + return resp, &iamError{Code: iam.ErrCodeInvalidInputException, Error: fmt.Errorf("UserName is required")} + } + + policyArn := values.Get("PolicyArn") + policyName, err := iamPolicyNameFromArn(policyArn) + if err != nil { + return resp, &iamError{Code: iam.ErrCodeInvalidInputException, Error: err} + } + + if e.credentialManager == nil { + return resp, &iamError{Code: iam.ErrCodeServiceFailureException, Error: fmt.Errorf("credential manager not configured")} + } + + policy, err := e.credentialManager.GetPolicy(ctx, policyName) + if err != nil { + return resp, &iamError{Code: iam.ErrCodeServiceFailureException, Error: err} + } + if policy == nil { + return resp, &iamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf("policy %s not found", policyName)} + } + + if err := e.credentialManager.DetachUserPolicy(ctx, userName, policyName); err != nil { + if errors.Is(err, credential.ErrUserNotFound) { + return resp, &iamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf(iamUserDoesNotExist, userName)} + } + if errors.Is(err, credential.ErrPolicyNotAttached) { + return resp, &iamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf("policy %s not attached to user %s", policyName, userName)} + } + return resp, &iamError{Code: iam.ErrCodeServiceFailureException, Error: err} + } + + return resp, nil +} + +// ListAttachedUserPolicies lists managed policies attached to a user. +func (e *EmbeddedIamApi) ListAttachedUserPolicies(ctx context.Context, values url.Values) (iamListAttachedUserPoliciesResponse, *iamError) { + var resp iamListAttachedUserPoliciesResponse + + userName := values.Get("UserName") + if userName == "" { + return resp, &iamError{Code: iam.ErrCodeInvalidInputException, Error: fmt.Errorf("UserName is required")} + } + + pathPrefix := values.Get("PathPrefix") + if pathPrefix == "" { + pathPrefix = "/" + } + + maxItems := 0 + if maxItemsStr := values.Get("MaxItems"); maxItemsStr != "" { + parsedMaxItems, err := strconv.Atoi(maxItemsStr) + if err != nil || parsedMaxItems <= 0 { + return resp, &iamError{Code: iam.ErrCodeInvalidInputException, Error: fmt.Errorf("MaxItems must be a positive integer")} + } + maxItems = parsedMaxItems + } + marker := values.Get("Marker") + + if e.credentialManager == nil { + return resp, &iamError{Code: iam.ErrCodeServiceFailureException, Error: fmt.Errorf("credential manager not configured")} + } + + policyNames, err := e.credentialManager.ListAttachedUserPolicies(ctx, userName) + if err != nil { + if errors.Is(err, credential.ErrUserNotFound) { + return resp, &iamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf(iamUserDoesNotExist, userName)} + } + return resp, &iamError{Code: iam.ErrCodeServiceFailureException, Error: err} + } + + var attachedPolicies []*iam.AttachedPolicy + for _, attachedPolicyName := range policyNames { + // Policy paths are not tracked in the current configuration, so PathPrefix + // filtering is not supported yet. Always return the policy for now. + policyNameCopy := attachedPolicyName + policyArn := iamPolicyArn(attachedPolicyName) + policyArnCopy := policyArn + attachedPolicies = append(attachedPolicies, &iam.AttachedPolicy{ + PolicyName: &policyNameCopy, + PolicyArn: &policyArnCopy, + }) + } + + start := 0 + markerFound := false + if marker != "" { + for i, p := range attachedPolicies { + if p.PolicyName != nil && *p.PolicyName == marker { + start = i + 1 + markerFound = true + break + } + } + if !markerFound && len(attachedPolicies) > 0 { + return resp, &iamError{Code: iam.ErrCodeInvalidInputException, Error: fmt.Errorf("marker %s not found", marker)} + } + } + if start > 0 && start < len(attachedPolicies) { + attachedPolicies = attachedPolicies[start:] + } else if start >= len(attachedPolicies) { + attachedPolicies = nil + } + + if maxItems > 0 && len(attachedPolicies) > maxItems { + resp.ListAttachedUserPoliciesResult.AttachedPolicies = attachedPolicies[:maxItems] + resp.ListAttachedUserPoliciesResult.IsTruncated = true + if name := resp.ListAttachedUserPoliciesResult.AttachedPolicies[maxItems-1].PolicyName; name != nil { + resp.ListAttachedUserPoliciesResult.Marker = *name + } + return resp, nil + } + + resp.ListAttachedUserPoliciesResult.AttachedPolicies = attachedPolicies + resp.ListAttachedUserPoliciesResult.IsTruncated = false + return resp, nil +} + // SetUserStatus enables or disables a user without deleting them. // This is a SeaweedFS extension for temporary user suspension, offboarding, etc. // When a user is disabled, all API requests using their credentials will return AccessDenied. @@ -1049,7 +1266,7 @@ func (e *EmbeddedIamApi) AuthIam(f http.HandlerFunc, _ Action) http.HandlerFunc // ExecuteAction executes an IAM action with the given values. // If skipPersist is true, the changed configuration is not saved to the persistent store. -func (e *EmbeddedIamApi) ExecuteAction(values url.Values, skipPersist bool) (interface{}, *iamError) { +func (e *EmbeddedIamApi) ExecuteAction(ctx context.Context, values url.Values, skipPersist bool) (interface{}, *iamError) { // Lock to prevent concurrent read-modify-write race conditions e.policyLock.Lock() defer e.policyLock.Unlock() @@ -1057,7 +1274,7 @@ func (e *EmbeddedIamApi) ExecuteAction(values url.Values, skipPersist bool) (int action := values.Get("Action") if e.readOnly { switch action { - case "ListUsers", "ListAccessKeys", "GetUser", "GetUserPolicy", "ListServiceAccounts", "GetServiceAccount": + case "ListUsers", "ListAccessKeys", "GetUser", "GetUserPolicy", "ListAttachedUserPolicies", "ListServiceAccounts", "GetServiceAccount": // Allowed read-only actions default: return nil, &iamError{Code: s3err.GetAPIError(s3err.ErrAccessDenied).Code, Error: fmt.Errorf("IAM write operations are disabled on this server")} @@ -1141,6 +1358,24 @@ func (e *EmbeddedIamApi) ExecuteAction(values url.Values, skipPersist bool) (int if iamErr != nil { return nil, iamErr } + case "AttachUserPolicy": + response, iamErr = e.AttachUserPolicy(ctx, values) + if iamErr != nil { + return nil, iamErr + } + changed = false + case "DetachUserPolicy": + response, iamErr = e.DetachUserPolicy(ctx, values) + if iamErr != nil { + return nil, iamErr + } + changed = false + case "ListAttachedUserPolicies": + response, iamErr = e.ListAttachedUserPolicies(ctx, values) + if iamErr != nil { + return nil, iamErr + } + changed = false case "SetUserStatus": response, iamErr = e.SetUserStatus(s3cfg, values) if iamErr != nil { @@ -1193,8 +1428,14 @@ func (e *EmbeddedIamApi) ExecuteAction(values url.Values, skipPersist bool) (int glog.Errorf("Failed to reload IAM configuration after mutation: %v", err) // Don't fail the request since the persistent save succeeded } + } else if iamErr == nil && (action == "AttachUserPolicy" || action == "DetachUserPolicy") { + // Even if changed=false (persisted via credentialManager), we should still reload + // if we are utilizing the local in-memory cache for speed + if err := e.ReloadConfiguration(); err != nil { + glog.Errorf("Failed to reload IAM configuration after managed policy mutation: %v", err) + } } - return response, nil + return response, iamErr } // DoActions handles IAM API actions. @@ -1214,7 +1455,7 @@ func (e *EmbeddedIamApi) DoActions(w http.ResponseWriter, r *http.Request) { values.Set("CreatedBy", createdBy) } - response, iamErr := e.ExecuteAction(values, false) + response, iamErr := e.ExecuteAction(r.Context(), values, false) if iamErr != nil { e.writeIamErrorResponse(w, r, iamErr) return diff --git a/weed/s3api/s3api_embedded_iam_test.go b/weed/s3api/s3api_embedded_iam_test.go index 8ac0472cc..e2e3df8a1 100644 --- a/weed/s3api/s3api_embedded_iam_test.go +++ b/weed/s3api/s3api_embedded_iam_test.go @@ -1,6 +1,7 @@ package s3api import ( + "context" "encoding/json" "encoding/xml" "fmt" @@ -16,6 +17,8 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/iam" "github.com/gorilla/mux" + "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/credential/memory" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" @@ -30,179 +33,63 @@ type EmbeddedIamApiForTest struct { } func NewEmbeddedIamApiForTest() *EmbeddedIamApiForTest { + store := &memory.MemoryStore{} + store.Initialize(nil, "") + cm := &credential.CredentialManager{Store: store} e := &EmbeddedIamApiForTest{ EmbeddedIamApi: &EmbeddedIamApi{ - iam: &IdentityAccessManagement{}, + iam: &IdentityAccessManagement{credentialManager: cm}, + credentialManager: cm, }, mockConfig: &iam_pb.S3ApiConfiguration{}, } + var syncOnce sync.Once e.getS3ApiConfigurationFunc = func(s3cfg *iam_pb.S3ApiConfiguration) error { - if e.mockConfig != nil { - cloned := proto.Clone(e.mockConfig).(*iam_pb.S3ApiConfiguration) - proto.Merge(s3cfg, cloned) + // If mockConfig was set directly in test, sync it to store first (only once) + var syncErr error + syncOnce.Do(func() { + if e.mockConfig != nil { + syncErr = cm.SaveConfiguration(context.Background(), e.mockConfig) + } + }) + if syncErr != nil { + return syncErr } - return nil + config, err := cm.LoadConfiguration(context.Background()) + if err == nil { + e.mockConfig = config + proto.Reset(s3cfg) + // Manually copy identities and other fields to avoid Merge issues with slices + s3cfg.Identities = make([]*iam_pb.Identity, len(config.Identities)) + for i, ident := range config.Identities { + s3cfg.Identities[i] = proto.Clone(ident).(*iam_pb.Identity) + } + s3cfg.Policies = make([]*iam_pb.Policy, len(config.Policies)) + for i, p := range config.Policies { + s3cfg.Policies[i] = proto.Clone(p).(*iam_pb.Policy) + } + } + return err } e.putS3ApiConfigurationFunc = func(s3cfg *iam_pb.S3ApiConfiguration) error { e.mockConfig = proto.Clone(s3cfg).(*iam_pb.S3ApiConfiguration) - return nil + return cm.SaveConfiguration(context.Background(), s3cfg) } e.reloadConfigurationFunc = func() error { + config, err := cm.LoadConfiguration(context.Background()) + if err != nil { + return err + } + e.mockConfig = config return nil } return e } -// Override GetS3ApiConfiguration for testing -func (e *EmbeddedIamApiForTest) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) error { - // Use proto.Clone for proper deep copy semantics - if e.mockConfig != nil { - cloned := proto.Clone(e.mockConfig).(*iam_pb.S3ApiConfiguration) - proto.Merge(s3cfg, cloned) - } - return nil -} - -// Override PutS3ApiConfiguration for testing -func (e *EmbeddedIamApiForTest) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) error { - // Use proto.Clone for proper deep copy semantics - e.mockConfig = proto.Clone(s3cfg).(*iam_pb.S3ApiConfiguration) - return nil -} - // DoActions handles IAM API actions for testing func (e *EmbeddedIamApiForTest) DoActions(w http.ResponseWriter, r *http.Request) { - if err := r.ParseForm(); err != nil { - http.Error(w, "Invalid request", http.StatusBadRequest) - return - } - values := r.PostForm - s3cfg := &iam_pb.S3ApiConfiguration{} - if err := e.GetS3ApiConfiguration(s3cfg); err != nil { - http.Error(w, "Internal error", http.StatusInternalServerError) - return - } - - var response interface{} - var iamErr *iamError - changed := true - - action := r.Form.Get("Action") - - if e.readOnly { - switch action { - case "ListUsers", "ListAccessKeys", "GetUser", "GetUserPolicy", "ListServiceAccounts", "GetServiceAccount": - // Allowed read-only actions - default: - e.writeIamErrorResponse(w, r, &iamError{Code: s3err.GetAPIError(s3err.ErrAccessDenied).Code, Error: fmt.Errorf("IAM write operations are disabled on this server")}) - return - } - } - - switch action { - case "ListUsers": - response = e.ListUsers(s3cfg, values) - changed = false - case "ListAccessKeys": - e.handleImplicitUsername(r, values) - response = e.ListAccessKeys(s3cfg, values) - changed = false - case "CreateUser": - response, iamErr = e.CreateUser(s3cfg, values) - if iamErr != nil { - e.writeIamErrorResponse(w, r, iamErr) - return - } - case "GetUser": - userName := values.Get("UserName") - response, iamErr = e.GetUser(s3cfg, userName) - if iamErr != nil { - e.writeIamErrorResponse(w, r, iamErr) - return - } - changed = false - case "UpdateUser": - response, iamErr = e.UpdateUser(s3cfg, values) - if iamErr != nil { - http.Error(w, "Invalid request", http.StatusBadRequest) - return - } - case "DeleteUser": - userName := values.Get("UserName") - response, iamErr = e.DeleteUser(s3cfg, userName) - if iamErr != nil { - e.writeIamErrorResponse(w, r, iamErr) - return - } - case "CreateAccessKey": - e.handleImplicitUsername(r, values) - response, iamErr = e.CreateAccessKey(s3cfg, values) - if iamErr != nil { - http.Error(w, "Internal error", http.StatusInternalServerError) - return - } - case "DeleteAccessKey": - e.handleImplicitUsername(r, values) - response = e.DeleteAccessKey(s3cfg, values) - case "CreatePolicy": - response, iamErr = e.CreatePolicy(s3cfg, values) - if iamErr != nil { - http.Error(w, "Invalid request", http.StatusBadRequest) - return - } - case "PutUserPolicy": - response, iamErr = e.PutUserPolicy(s3cfg, values) - if iamErr != nil { - e.writeIamErrorResponse(w, r, iamErr) - return - } - case "GetUserPolicy": - response, iamErr = e.GetUserPolicy(s3cfg, values) - if iamErr != nil { - e.writeIamErrorResponse(w, r, iamErr) - return - } - changed = false - case "DeleteUserPolicy": - response, iamErr = e.DeleteUserPolicy(s3cfg, values) - if iamErr != nil { - e.writeIamErrorResponse(w, r, iamErr) - return - } - case "SetUserStatus": - response, iamErr = e.SetUserStatus(s3cfg, values) - if iamErr != nil { - e.writeIamErrorResponse(w, r, iamErr) - return - } - case "UpdateAccessKey": - e.handleImplicitUsername(r, values) - response, iamErr = e.UpdateAccessKey(s3cfg, values) - if iamErr != nil { - e.writeIamErrorResponse(w, r, iamErr) - return - } - default: - http.Error(w, "Not implemented", http.StatusNotImplemented) - return - } - - if changed { - if err := e.PutS3ApiConfiguration(s3cfg); err != nil { - http.Error(w, "Internal error", http.StatusInternalServerError) - return - } - } - - w.Header().Set("Content-Type", "application/xml") - w.WriteHeader(http.StatusOK) - xmlBytes, err := xml.Marshal(response) - if err != nil { - // This should not happen in tests, but log it for debugging - http.Error(w, "Internal error: failed to marshal response", http.StatusInternalServerError) - return - } - _, _ = w.Write(xmlBytes) + // Call the real DoActions + e.EmbeddedIamApi.DoActions(w, r) } // executeEmbeddedIamRequest executes an IAM request against the given API instance. @@ -229,11 +116,38 @@ type embeddedIamErrorResponseForTest struct { } func extractEmbeddedIamErrorCodeAndMessage(response *httptest.ResponseRecorder) (string, string) { + body := response.Body.Bytes() + // Try parsing with ErrorResponse root + type localError struct { + Code string `xml:"Code"` + Message string `xml:"Message"` + } + type localResponse struct { + XMLName xml.Name `xml:"ErrorResponse"` + Error localError `xml:"Error"` + } + var lr localResponse + if err := xml.Unmarshal(body, &lr); err == nil && lr.Error.Code != "" { + return lr.Error.Code, lr.Error.Message + } + + // Try parsing with Error root + type simpleError struct { + XMLName xml.Name `xml:"Error"` + Code string `xml:"Code"` + Message string `xml:"Message"` + } + var se simpleError + if err := xml.Unmarshal(body, &se); err == nil && se.Code != "" { + return se.Code, se.Message + } + var er embeddedIamErrorResponseForTest - if err := xml.Unmarshal(response.Body.Bytes(), &er); err != nil { - return "", "" + if err := xml.Unmarshal(body, &er); err == nil { + return er.Error.Code, er.Error.Message } - return er.Error.Code, er.Error.Message + + return "", "" } // TestEmbeddedIamCreateUser tests creating a user via the embedded IAM API @@ -528,6 +442,210 @@ func TestEmbeddedIamDeleteUserPolicyUserNotFound(t *testing.T) { assert.Equal(t, http.StatusNotFound, rr.Code) } +// TestEmbeddedIamAttachUserPolicy tests attaching a managed policy to a user. +func TestEmbeddedIamAttachUserPolicy(t *testing.T) { + api := NewEmbeddedIamApiForTest() + api.mockConfig = &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + {Name: "TestUser"}, + }, + Policies: []*iam_pb.Policy{ + {Name: "TestManagedPolicy", Content: `{"Version":"2012-10-17","Statement":[]}`}, + }, + } + + params := &iam.AttachUserPolicyInput{ + UserName: aws.String("TestUser"), + PolicyArn: aws.String("arn:aws:iam:::policy/TestManagedPolicy"), + } + req, _ := iam.New(session.New()).AttachUserPolicyRequest(params) + _ = req.Build() + + out := iamAttachUserPolicyResponse{} + response, err := executeEmbeddedIamRequest(api, req.HTTPRequest, &out) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, response.Code) + assert.Equal(t, []string{"TestManagedPolicy"}, api.mockConfig.Identities[0].PolicyNames) +} + +// TestEmbeddedIamAttachUserPolicyNoSuchPolicy tests attach failure when managed policy does not exist. +func TestEmbeddedIamAttachUserPolicyNoSuchPolicy(t *testing.T) { + api := NewEmbeddedIamApiForTest() + api.mockConfig = &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + {Name: "TestUser"}, + }, + } + + params := &iam.AttachUserPolicyInput{ + UserName: aws.String("TestUser"), + PolicyArn: aws.String("arn:aws:iam:::policy/DoesNotExist"), + } + req, _ := iam.New(session.New()).AttachUserPolicyRequest(params) + _ = req.Build() + + response, err := executeEmbeddedIamRequest(api, req.HTTPRequest, nil) + assert.NoError(t, err) + assert.Equal(t, http.StatusNotFound, response.Code) + code, _ := extractEmbeddedIamErrorCodeAndMessage(response) + assert.Equal(t, "NoSuchEntity", code) +} + +// TestEmbeddedIamDetachUserPolicy tests detaching a managed policy from a user. +func TestEmbeddedIamDetachUserPolicy(t *testing.T) { + api := NewEmbeddedIamApiForTest() + api.mockConfig = &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + {Name: "TestUser", PolicyNames: []string{"TestManagedPolicy", "KeepPolicy"}}, + }, + Policies: []*iam_pb.Policy{ + {Name: "TestManagedPolicy", Content: `{"Version":"2012-10-17","Statement":[]}`}, + {Name: "KeepPolicy", Content: `{"Version":"2012-10-17","Statement":[]}`}, + }, + } + + params := &iam.DetachUserPolicyInput{ + UserName: aws.String("TestUser"), + PolicyArn: aws.String("arn:aws:iam:::policy/TestManagedPolicy"), + } + req, _ := iam.New(session.New()).DetachUserPolicyRequest(params) + _ = req.Build() + + out := iamDetachUserPolicyResponse{} + response, err := executeEmbeddedIamRequest(api, req.HTTPRequest, &out) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, response.Code) + assert.Equal(t, []string{"KeepPolicy"}, api.mockConfig.Identities[0].PolicyNames) +} + +// TestEmbeddedIamAttachAlreadyAttachedPolicy ensures attaching a policy already +// present on the user is idempotent. +func TestEmbeddedIamAttachAlreadyAttachedPolicy(t *testing.T) { + api := NewEmbeddedIamApiForTest() + api.mockConfig = &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + {Name: "TestUser", PolicyNames: []string{"TestManagedPolicy"}}, + }, + Policies: []*iam_pb.Policy{ + {Name: "TestManagedPolicy", Content: `{"Version":"2012-10-17","Statement":[]}`}, + }, + } + + params := &iam.AttachUserPolicyInput{ + UserName: aws.String("TestUser"), + PolicyArn: aws.String("arn:aws:iam:::policy/TestManagedPolicy"), + } + req, _ := iam.New(session.New()).AttachUserPolicyRequest(params) + _ = req.Build() + + out := iamAttachUserPolicyResponse{} + response, err := executeEmbeddedIamRequest(api, req.HTTPRequest, &out) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, response.Code) + assert.Equal(t, []string{"TestManagedPolicy"}, api.mockConfig.Identities[0].PolicyNames) +} + +// TestEmbeddedIamDetachNotAttachedPolicy verifies detaching a policy that's not +// attached returns NoSuchEntity. +func TestEmbeddedIamDetachNotAttachedPolicy(t *testing.T) { + api := NewEmbeddedIamApiForTest() + api.mockConfig = &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + {Name: "TestUser"}, + }, + Policies: []*iam_pb.Policy{ + {Name: "MissingPolicy", Content: `{"Version":"2012-10-17","Statement":[]}`}, + }, + } + + params := &iam.DetachUserPolicyInput{ + UserName: aws.String("TestUser"), + PolicyArn: aws.String("arn:aws:iam:::policy/MissingPolicy"), + } + req, _ := iam.New(session.New()).DetachUserPolicyRequest(params) + _ = req.Build() + + response, err := executeEmbeddedIamRequest(api, req.HTTPRequest, nil) + assert.NoError(t, err) + assert.Equal(t, http.StatusNotFound, response.Code) + code, _ := extractEmbeddedIamErrorCodeAndMessage(response) + assert.Equal(t, "NoSuchEntity", code) +} + +// TestEmbeddedIamAttachPolicyLimitExceeded ensures we honor the managed policy limit. +func TestEmbeddedIamAttachPolicyLimitExceeded(t *testing.T) { + api := NewEmbeddedIamApiForTest() + existingPolicies := make([]string, 0, MaxManagedPoliciesPerUser) + configPolicies := make([]*iam_pb.Policy, 0, MaxManagedPoliciesPerUser+1) + for i := 0; i < MaxManagedPoliciesPerUser; i++ { + name := fmt.Sprintf("ManagedPolicy%d", i) + existingPolicies = append(existingPolicies, name) + configPolicies = append(configPolicies, &iam_pb.Policy{ + Name: name, + Content: `{"Version":"2012-10-17","Statement":[]}`, + }) + } + configPolicies = append(configPolicies, &iam_pb.Policy{ + Name: "NewPolicy", + Content: `{"Version":"2012-10-17","Statement":[]}`, + }) + + api.mockConfig = &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + {Name: "TestUser", PolicyNames: existingPolicies}, + }, + Policies: configPolicies, + } + + params := &iam.AttachUserPolicyInput{ + UserName: aws.String("TestUser"), + PolicyArn: aws.String("arn:aws:iam:::policy/NewPolicy"), + } + req, _ := iam.New(session.New()).AttachUserPolicyRequest(params) + _ = req.Build() + + response, err := executeEmbeddedIamRequest(api, req.HTTPRequest, nil) + assert.NoError(t, err) + assert.Equal(t, http.StatusForbidden, response.Code) + code, _ := extractEmbeddedIamErrorCodeAndMessage(response) + assert.Equal(t, iam.ErrCodeLimitExceededException, code) + assert.Len(t, api.mockConfig.Identities[0].PolicyNames, MaxManagedPoliciesPerUser) +} + +// TestEmbeddedIamListAttachedUserPolicies tests listing managed policies attached to a user. +func TestEmbeddedIamListAttachedUserPolicies(t *testing.T) { + api := NewEmbeddedIamApiForTest() + api.mockConfig = &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + {Name: "TestUser", PolicyNames: []string{"PolicyA", "PolicyB"}}, + }, + Policies: []*iam_pb.Policy{ + {Name: "PolicyA", Content: `{"Version":"2012-10-17","Statement":[]}`}, + {Name: "PolicyB", Content: `{"Version":"2012-10-17","Statement":[]}`}, + }, + } + + params := &iam.ListAttachedUserPoliciesInput{ + UserName: aws.String("TestUser"), + } + req, _ := iam.New(session.New()).ListAttachedUserPoliciesRequest(params) + _ = req.Build() + + out := iamListAttachedUserPoliciesResponse{} + response, err := executeEmbeddedIamRequest(api, req.HTTPRequest, &out) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, response.Code) + assert.False(t, out.ListAttachedUserPoliciesResult.IsTruncated) + assert.Len(t, out.ListAttachedUserPoliciesResult.AttachedPolicies, 2) + + got := map[string]string{} + for _, attached := range out.ListAttachedUserPoliciesResult.AttachedPolicies { + got[aws.StringValue(attached.PolicyName)] = aws.StringValue(attached.PolicyArn) + } + assert.Equal(t, "arn:aws:iam:::policy/PolicyA", got["PolicyA"]) + assert.Equal(t, "arn:aws:iam:::policy/PolicyB", got["PolicyB"]) +} + // TestEmbeddedIamUpdateUser tests updating a user func TestEmbeddedIamUpdateUser(t *testing.T) { api := NewEmbeddedIamApiForTest() @@ -913,7 +1031,7 @@ func TestEmbeddedIamUpdateUserNotFound(t *testing.T) { req, _ := iam.New(session.New()).UpdateUserRequest(params) _ = req.Build() response, _ := executeEmbeddedIamRequest(api, req.HTTPRequest, nil) - assert.Equal(t, http.StatusBadRequest, response.Code) + assert.Equal(t, http.StatusNotFound, response.Code) } // TestEmbeddedIamCreateAccessKeyForExistingUser tests CreateAccessKey creates credentials for existing user @@ -1703,7 +1821,7 @@ func TestEmbeddedIamExecuteAction(t *testing.T) { vals.Set("Action", "CreateUser") vals.Set("UserName", "ExecuteActionUser") - resp, iamErr := api.ExecuteAction(vals, false) + resp, iamErr := api.ExecuteAction(context.Background(), vals, false) assert.Nil(t, iamErr) // Verify response type diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index f05dd97e5..ce35f6562 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -6,6 +6,7 @@ import ( "math/rand/v2" "os" "path" + "sync" "time" transport "github.com/Jille/raft-grpc-transport" @@ -114,6 +115,14 @@ func (s *StateMachine) Restore(r io.ReadCloser) error { return nil } +var registerMaxVolumeIdCommandOnce sync.Once + +func registerMaxVolumeIdCommand() { + registerMaxVolumeIdCommandOnce.Do(func() { + raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) + }) +} + func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, @@ -126,7 +135,7 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { raft.SetLogLevel(2) } - raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) + registerMaxVolumeIdCommand() var err error transporter := raft.NewGrpcTransporter(option.GrpcDialOption)