diff --git a/weed/admin/dash/mq_management.go b/weed/admin/dash/mq_management.go index 3fd4aed85..ba9c1cd18 100644 --- a/weed/admin/dash/mq_management.go +++ b/weed/admin/dash/mq_management.go @@ -391,7 +391,7 @@ func (s *AdminServer) GetConsumerGroupOffsets(namespace, topicName string) ([]Co consumerGroup := strings.TrimSuffix(offsetResp.Entry.Name, ".offset") // Read the offset value from the file - offsetData, err := filer.ReadInsideFiler(client, partitionDir, offsetResp.Entry.Name) + offsetData, err := filer.ReadInsideFiler(context.Background(), client, partitionDir, offsetResp.Entry.Name) if err != nil { glog.Warningf("Failed to read offset file %s: %v", offsetResp.Entry.Name, err) continue diff --git a/weed/credential/filer_etc/filer_etc_identity.go b/weed/credential/filer_etc/filer_etc_identity.go index 7a1c2c051..56af5381b 100644 --- a/weed/credential/filer_etc/filer_etc_identity.go +++ b/weed/credential/filer_etc/filer_etc_identity.go @@ -24,7 +24,7 @@ func (store *FilerEtcStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3Ap s3cfg := &iam_pb.S3ApiConfiguration{} // 1. Load from legacy single file (low priority) - content, foundLegacy, err := store.readInsideFiler(filer.IamConfigDirectory, IamLegacyIdentityFile) + content, foundLegacy, err := store.readInsideFiler(ctx, filer.IamConfigDirectory, IamLegacyIdentityFile) if err != nil { return s3cfg, err } @@ -93,7 +93,7 @@ func (store *FilerEtcStore) loadFromMultiFile(ctx context.Context, s3cfg *iam_pb if len(entry.Content) > 0 { content = entry.Content } else { - c, err := filer.ReadInsideFiler(client, dir, entry.Name) + c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name) if err != nil { glog.Warningf("Failed to read identity file %s: %v", entry.Name, err) continue @@ -249,7 +249,7 @@ func (store *FilerEtcStore) CreateUser(ctx context.Context, identity *iam_pb.Ide func (store *FilerEtcStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) { var identity *iam_pb.Identity err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, filer.IamConfigDirectory+"/"+IamIdentitiesDirectory, username+".json") + data, err := filer.ReadInsideFiler(ctx, client, filer.IamConfigDirectory+"/"+IamIdentitiesDirectory, username+".json") if err != nil { if err == filer_pb.ErrNotFound { return credential.ErrUserNotFound @@ -350,7 +350,7 @@ func (store *FilerEtcStore) GetUserByAccessKey(ctx context.Context, accessKey st if len(entry.Content) > 0 { content = entry.Content } else { - c, err := filer.ReadInsideFiler(client, dir, entry.Name) + c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name) if err != nil { continue } @@ -435,11 +435,11 @@ func (store *FilerEtcStore) saveIdentity(ctx context.Context, identity *iam_pb.I }) } -func (store *FilerEtcStore) readInsideFiler(dir string, name string) ([]byte, bool, error) { +func (store *FilerEtcStore) readInsideFiler(ctx context.Context, dir string, name string) ([]byte, bool, error) { var content []byte found := false err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - c, err := filer.ReadInsideFiler(client, dir, name) + c, err := filer.ReadInsideFiler(ctx, client, dir, name) if err != nil { if err == filer_pb.ErrNotFound { return nil diff --git a/weed/credential/filer_etc/filer_etc_policy.go b/weed/credential/filer_etc/filer_etc_policy.go index bff70fc0b..c83e56647 100644 --- a/weed/credential/filer_etc/filer_etc_policy.go +++ b/weed/credential/filer_etc/filer_etc_policy.go @@ -9,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" ) @@ -18,13 +19,113 @@ const ( ) type PoliciesCollection struct { - Policies map[string]policy_engine.PolicyDocument `json:"policies"` + Policies map[string]policy_engine.PolicyDocument `json:"policies"` + InlinePolicies map[string]map[string]policy_engine.PolicyDocument `json:"inlinePolicies"` } func validatePolicyName(name string) error { return credential.ValidatePolicyName(name) } +func newPoliciesCollection() *PoliciesCollection { + return &PoliciesCollection{ + Policies: make(map[string]policy_engine.PolicyDocument), + InlinePolicies: make(map[string]map[string]policy_engine.PolicyDocument), + } +} + +func (store *FilerEtcStore) loadLegacyPoliciesCollection(ctx context.Context) (*PoliciesCollection, bool, error) { + policiesCollection := newPoliciesCollection() + + content, foundLegacy, err := store.readInsideFiler(ctx, filer.IamConfigDirectory, filer.IamPoliciesFile) + if err != nil { + return nil, false, err + } + if !foundLegacy || len(content) == 0 { + return policiesCollection, foundLegacy, nil + } + + if err := json.Unmarshal(content, policiesCollection); err != nil { + return nil, false, err + } + if policiesCollection.Policies == nil { + policiesCollection.Policies = make(map[string]policy_engine.PolicyDocument) + } + if policiesCollection.InlinePolicies == nil { + policiesCollection.InlinePolicies = make(map[string]map[string]policy_engine.PolicyDocument) + } + + return policiesCollection, true, nil +} + +func (store *FilerEtcStore) saveLegacyPoliciesCollection(ctx context.Context, policiesCollection *PoliciesCollection) error { + return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + content, err := json.MarshalIndent(policiesCollection, "", " ") + if err != nil { + return err + } + return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, content) + }) +} + +func policyDocumentToPbPolicy(name string, policy policy_engine.PolicyDocument) (*iam_pb.Policy, error) { + content, err := json.Marshal(policy) + if err != nil { + return nil, err + } + return &iam_pb.Policy{Name: name, Content: string(content)}, nil +} + +// LoadManagedPolicies loads managed policies for the S3 runtime without +// triggering legacy-to-multifile migration. This lets the runtime hydrate +// policies while preserving any legacy inline policy data stored alongside +// managed policies. +func (store *FilerEtcStore) LoadManagedPolicies(ctx context.Context) ([]*iam_pb.Policy, error) { + policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx) + if err != nil { + return nil, err + } + + policies := make(map[string]policy_engine.PolicyDocument, len(policiesCollection.Policies)) + for name, policy := range policiesCollection.Policies { + policies[name] = policy + } + + if err := store.loadPoliciesFromMultiFile(ctx, policies); err != nil { + return nil, err + } + + managedPolicies := make([]*iam_pb.Policy, 0, len(policies)) + for name, policy := range policies { + pbPolicy, err := policyDocumentToPbPolicy(name, policy) + if err != nil { + return nil, err + } + managedPolicies = append(managedPolicies, pbPolicy) + } + + return managedPolicies, nil +} + +// LoadInlinePolicies loads legacy inline policies keyed by user name. Inline +// policies are still stored in the legacy shared policies file. +func (store *FilerEtcStore) LoadInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) { + policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx) + if err != nil { + return nil, err + } + + inlinePolicies := make(map[string]map[string]policy_engine.PolicyDocument, len(policiesCollection.InlinePolicies)) + for userName, userPolicies := range policiesCollection.InlinePolicies { + inlinePolicies[userName] = make(map[string]policy_engine.PolicyDocument, len(userPolicies)) + for policyName, policy := range userPolicies { + inlinePolicies[userName][policyName] = policy + } + } + + return inlinePolicies, nil +} + // GetPolicies retrieves all IAM policies from the filer func (store *FilerEtcStore) GetPolicies(ctx context.Context) (map[string]policy_engine.PolicyDocument, error) { policies := make(map[string]policy_engine.PolicyDocument) @@ -43,23 +144,12 @@ func (store *FilerEtcStore) GetPolicies(ctx context.Context) (map[string]policy_ filer.IamConfigDirectory, filer.IamPoliciesFile) // 1. Load from legacy single file (low priority) - content, foundLegacy, err := store.readInsideFiler(filer.IamConfigDirectory, filer.IamPoliciesFile) + policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx) if err != nil { return nil, err } - - if foundLegacy && len(content) > 0 { - policiesCollection := &PoliciesCollection{ - Policies: make(map[string]policy_engine.PolicyDocument), - } - if err := json.Unmarshal(content, policiesCollection); err != nil { - glog.Errorf("Failed to parse legacy IAM policies from %s/%s: %v", - filer.IamConfigDirectory, filer.IamPoliciesFile, err) - } else { - for name, policy := range policiesCollection.Policies { - policies[name] = policy - } - } + for name, policy := range policiesCollection.Policies { + policies[name] = policy } // 2. Load from multi-file structure (high priority, overrides legacy) @@ -67,14 +157,6 @@ func (store *FilerEtcStore) GetPolicies(ctx context.Context) (map[string]policy_ return nil, err } - // 3. Perform migration if we loaded legacy config - if foundLegacy { - if err := store.migratePoliciesToMultiFile(ctx, policies); err != nil { - glog.Errorf("Failed to migrate IAM policies to multi-file layout: %v", err) - return policies, err - } - } - return policies, nil } @@ -98,7 +180,7 @@ func (store *FilerEtcStore) loadPoliciesFromMultiFile(ctx context.Context, polic if len(entry.Content) > 0 { content = entry.Content } else { - c, err := filer.ReadInsideFiler(client, dir, entry.Name) + c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name) if err != nil { glog.Warningf("Failed to read policy file %s: %v", entry.Name, err) continue @@ -115,7 +197,7 @@ func (store *FilerEtcStore) loadPoliciesFromMultiFile(ctx context.Context, polic // The file name is "policyName.json" policyName := entry.Name - if len(policyName) > 5 && policyName[len(policyName)-5:] == ".json" { + if strings.HasSuffix(policyName, ".json") { policyName = policyName[:len(policyName)-5] policies[policyName] = policy } @@ -184,7 +266,23 @@ func (store *FilerEtcStore) DeletePolicy(ctx context.Context, name string) error if err := validatePolicyName(name); err != nil { return err } - return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + store.policyMu.Lock() + defer store.policyMu.Unlock() + + policiesCollection, foundLegacy, err := store.loadLegacyPoliciesCollection(ctx) + if err != nil { + return err + } + + deleteLegacyPolicy := false + if foundLegacy { + if _, exists := policiesCollection.Policies[name]; exists { + delete(policiesCollection.Policies, name) + deleteLegacyPolicy = true + } + } + + if err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{ Directory: filer.IamConfigDirectory + "/" + IamPoliciesDirectory, Name: name + ".json", @@ -193,7 +291,15 @@ func (store *FilerEtcStore) DeletePolicy(ctx context.Context, name string) error return err } return nil - }) + }); err != nil { + return err + } + + if deleteLegacyPolicy { + return store.saveLegacyPoliciesCollection(ctx, policiesCollection) + } + + return nil } // GetPolicy retrieves a specific IAM policy by name from the filer @@ -204,7 +310,7 @@ func (store *FilerEtcStore) GetPolicy(ctx context.Context, name string) (*policy var policy *policy_engine.PolicyDocument err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, filer.IamConfigDirectory+"/"+IamPoliciesDirectory, name+".json") + data, err := filer.ReadInsideFiler(ctx, client, filer.IamConfigDirectory+"/"+IamPoliciesDirectory, name+".json") if err != nil { if err == filer_pb.ErrNotFound { return nil @@ -239,6 +345,7 @@ func (store *FilerEtcStore) GetPolicy(ctx context.Context, name string) (*policy // ListPolicyNames returns all managed policy names stored in the filer. func (store *FilerEtcStore) ListPolicyNames(ctx context.Context) ([]string, error) { names := make([]string, 0) + seenNames := make(map[string]struct{}) store.mu.RLock() configured := store.filerAddressFunc != nil @@ -248,7 +355,19 @@ func (store *FilerEtcStore) ListPolicyNames(ctx context.Context) ([]string, erro return names, nil } - err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + policiesCollection, _, err := store.loadLegacyPoliciesCollection(ctx) + if err != nil { + return nil, err + } + for name := range policiesCollection.Policies { + if _, found := seenNames[name]; found { + continue + } + names = append(names, name) + seenNames[name] = struct{}{} + } + + err = store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { dir := filer.IamConfigDirectory + "/" + IamPoliciesDirectory entries, err := listEntries(ctx, client, dir) if err != nil { @@ -266,7 +385,11 @@ func (store *FilerEtcStore) ListPolicyNames(ctx context.Context) ([]string, erro if strings.HasSuffix(name, ".json") { name = name[:len(name)-5] } + if _, found := seenNames[name]; found { + continue + } names = append(names, name) + seenNames[name] = struct{}{} } return nil diff --git a/weed/credential/filer_etc/filer_etc_policy_test.go b/weed/credential/filer_etc/filer_etc_policy_test.go new file mode 100644 index 000000000..12f0ed796 --- /dev/null +++ b/weed/credential/filer_etc/filer_etc_policy_test.go @@ -0,0 +1,369 @@ +package filer_etc + +import ( + "context" + "net" + "sort" + "strconv" + "sync" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +type policyTestFilerServer struct { + filer_pb.UnimplementedSeaweedFilerServer + mu sync.RWMutex + entries map[string]*filer_pb.Entry + contentlessListEntry map[string]struct{} + beforeLookup func(context.Context, string, string) error + afterListEntry func(string, string) + beforeDelete func(string, string) error + beforeUpdate func(string, string) error +} + +func newPolicyTestFilerServer() *policyTestFilerServer { + return &policyTestFilerServer{ + entries: make(map[string]*filer_pb.Entry), + contentlessListEntry: make(map[string]struct{}), + } +} + +func (s *policyTestFilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { + s.mu.RLock() + beforeLookup := s.beforeLookup + s.mu.RUnlock() + if beforeLookup != nil { + if err := beforeLookup(ctx, req.Directory, req.Name); err != nil { + return nil, err + } + } + + s.mu.RLock() + defer s.mu.RUnlock() + + entry, found := s.entries[filerEntryKey(req.Directory, req.Name)] + if !found { + return nil, status.Error(codes.NotFound, filer_pb.ErrNotFound.Error()) + } + + return &filer_pb.LookupDirectoryEntryResponse{Entry: cloneEntry(entry)}, nil +} + +func (s *policyTestFilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream grpc.ServerStreamingServer[filer_pb.ListEntriesResponse]) error { + s.mu.RLock() + defer s.mu.RUnlock() + + names := make([]string, 0) + for key := range s.entries { + dir, name := splitFilerEntryKey(key) + if dir != req.Directory { + continue + } + names = append(names, name) + } + sort.Strings(names) + + for _, name := range names { + entry := cloneEntry(s.entries[filerEntryKey(req.Directory, name)]) + if _, found := s.contentlessListEntry[filerEntryKey(req.Directory, name)]; found { + entry.Content = nil + } + if err := stream.Send(&filer_pb.ListEntriesResponse{Entry: entry}); err != nil { + return err + } + if s.afterListEntry != nil { + s.afterListEntry(req.Directory, name) + } + } + + return nil +} + +func (s *policyTestFilerServer) CreateEntry(_ context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + s.entries[filerEntryKey(req.Directory, req.Entry.Name)] = cloneEntry(req.Entry) + return &filer_pb.CreateEntryResponse{}, nil +} + +func (s *policyTestFilerServer) UpdateEntry(_ context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { + s.mu.RLock() + beforeUpdate := s.beforeUpdate + s.mu.RUnlock() + if beforeUpdate != nil { + if err := beforeUpdate(req.Directory, req.Entry.Name); err != nil { + return nil, err + } + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.entries[filerEntryKey(req.Directory, req.Entry.Name)] = cloneEntry(req.Entry) + return &filer_pb.UpdateEntryResponse{}, nil +} + +func (s *policyTestFilerServer) DeleteEntry(_ context.Context, req *filer_pb.DeleteEntryRequest) (*filer_pb.DeleteEntryResponse, error) { + s.mu.RLock() + beforeDelete := s.beforeDelete + s.mu.RUnlock() + if beforeDelete != nil { + if err := beforeDelete(req.Directory, req.Name); err != nil { + return nil, err + } + } + + s.mu.Lock() + defer s.mu.Unlock() + + key := filerEntryKey(req.Directory, req.Name) + if _, found := s.entries[key]; !found { + return nil, status.Error(codes.NotFound, filer_pb.ErrNotFound.Error()) + } + + delete(s.entries, key) + return &filer_pb.DeleteEntryResponse{}, nil +} + +func newPolicyTestStore(t *testing.T) *FilerEtcStore { + store, _ := newPolicyTestStoreWithServer(t) + return store +} + +func newPolicyTestStoreWithServer(t *testing.T) (*FilerEtcStore, *policyTestFilerServer) { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + server := newPolicyTestFilerServer() + grpcServer := pb.NewGrpcServer() + filer_pb.RegisterSeaweedFilerServer(grpcServer, server) + go func() { + _ = grpcServer.Serve(lis) + }() + + t.Cleanup(func() { + grpcServer.Stop() + _ = lis.Close() + }) + + store := &FilerEtcStore{} + host, portString, err := net.SplitHostPort(lis.Addr().String()) + require.NoError(t, err) + grpcPort, err := strconv.Atoi(portString) + require.NoError(t, err) + store.SetFilerAddressFunc(func() pb.ServerAddress { + return pb.NewServerAddress(host, 1, grpcPort) + }, grpc.WithTransportCredentials(insecure.NewCredentials())) + + return store, server +} + +func TestFilerEtcStoreListPolicyNamesIncludesLegacyPolicies(t *testing.T) { + ctx := context.Background() + store := newPolicyTestStore(t) + + legacyPolicies := newPoliciesCollection() + legacyPolicies.Policies["legacy-only"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::legacy-only/*") + legacyPolicies.Policies["shared"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::shared/*") + require.NoError(t, store.saveLegacyPoliciesCollection(ctx, legacyPolicies)) + + require.NoError(t, store.savePolicy(ctx, "multi-file-only", testPolicyDocument("s3:PutObject", "arn:aws:s3:::multi-file-only/*"))) + require.NoError(t, store.savePolicy(ctx, "shared", testPolicyDocument("s3:DeleteObject", "arn:aws:s3:::shared/*"))) + + names, err := store.ListPolicyNames(ctx) + require.NoError(t, err) + + assert.ElementsMatch(t, []string{"legacy-only", "multi-file-only", "shared"}, names) +} + +func TestFilerEtcStoreDeletePolicyRemovesLegacyManagedCopy(t *testing.T) { + ctx := context.Background() + store := newPolicyTestStore(t) + + inlinePolicy := testPolicyDocument("s3:PutObject", "arn:aws:s3:::inline-user/*") + legacyPolicies := newPoliciesCollection() + legacyPolicies.Policies["legacy-only"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::legacy-only/*") + legacyPolicies.InlinePolicies["inline-user"] = map[string]policy_engine.PolicyDocument{ + "PutOnly": inlinePolicy, + } + require.NoError(t, store.saveLegacyPoliciesCollection(ctx, legacyPolicies)) + + managedPolicies, err := store.LoadManagedPolicies(ctx) + require.NoError(t, err) + assert.Equal(t, []string{"legacy-only"}, managedPolicyNames(managedPolicies)) + + require.NoError(t, store.DeletePolicy(ctx, "legacy-only")) + + managedPolicies, err = store.LoadManagedPolicies(ctx) + require.NoError(t, err) + assert.Empty(t, managedPolicies) + + inlinePolicies, err := store.LoadInlinePolicies(ctx) + require.NoError(t, err) + assertInlinePolicyPreserved(t, inlinePolicies, "inline-user", "PutOnly") + + loadedLegacyPolicies, foundLegacy, err := store.loadLegacyPoliciesCollection(ctx) + require.NoError(t, err) + require.True(t, foundLegacy) + assert.Empty(t, loadedLegacyPolicies.Policies) + assertInlinePolicyPreserved(t, loadedLegacyPolicies.InlinePolicies, "inline-user", "PutOnly") +} + +func TestFilerEtcStoreDeletePolicySerializesLegacyUpdates(t *testing.T) { + ctx := context.Background() + store, server := newPolicyTestStoreWithServer(t) + + legacyPolicies := newPoliciesCollection() + legacyPolicies.Policies["first"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::first/*") + legacyPolicies.Policies["second"] = testPolicyDocument("s3:GetObject", "arn:aws:s3:::second/*") + require.NoError(t, store.saveLegacyPoliciesCollection(ctx, legacyPolicies)) + require.NoError(t, store.savePolicy(ctx, "first", testPolicyDocument("s3:GetObject", "arn:aws:s3:::first/*"))) + require.NoError(t, store.savePolicy(ctx, "second", testPolicyDocument("s3:GetObject", "arn:aws:s3:::second/*"))) + + firstSaveStarted := make(chan struct{}) + releaseFirstSave := make(chan struct{}) + secondReachedDelete := make(chan struct{}, 1) + var blockOnce sync.Once + + server.mu.Lock() + server.beforeUpdate = func(dir string, name string) error { + if dir == filer.IamConfigDirectory && name == filer.IamPoliciesFile { + blockOnce.Do(func() { + close(firstSaveStarted) + <-releaseFirstSave + }) + } + return nil + } + server.beforeDelete = func(dir string, name string) error { + if dir == filer.IamConfigDirectory+"/"+IamPoliciesDirectory && name == "second.json" { + select { + case secondReachedDelete <- struct{}{}: + default: + } + } + return nil + } + server.mu.Unlock() + + firstDeleteErr := make(chan error, 1) + go func() { + firstDeleteErr <- store.DeletePolicy(ctx, "first") + }() + + <-firstSaveStarted + + secondDeleteErr := make(chan error, 1) + go func() { + secondDeleteErr <- store.DeletePolicy(ctx, "second") + }() + + select { + case <-secondReachedDelete: + t.Fatal("second delete reached filer mutation while first delete was still blocked") + case <-time.After(300 * time.Millisecond): + } + + close(releaseFirstSave) + + require.NoError(t, <-firstDeleteErr) + require.NoError(t, <-secondDeleteErr) + + loadedLegacyPolicies, foundLegacy, err := store.loadLegacyPoliciesCollection(ctx) + require.NoError(t, err) + require.True(t, foundLegacy) + assert.Empty(t, loadedLegacyPolicies.Policies) +} + +func TestFilerEtcStoreLoadManagedPoliciesRespectsReadContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + store, server := newPolicyTestStoreWithServer(t) + + require.NoError(t, store.savePolicy(context.Background(), "cancel-me", testPolicyDocument("s3:GetObject", "arn:aws:s3:::cancel-me/*"))) + + server.mu.Lock() + server.contentlessListEntry[filerEntryKey(filer.IamConfigDirectory+"/"+IamPoliciesDirectory, "cancel-me.json")] = struct{}{} + server.beforeLookup = func(ctx context.Context, dir string, name string) error { + if dir == filer.IamConfigDirectory+"/"+IamPoliciesDirectory && name == "cancel-me.json" { + cancel() + return status.Error(codes.Canceled, context.Canceled.Error()) + } + return nil + } + server.mu.Unlock() + + managedPolicies, err := store.LoadManagedPolicies(ctx) + require.NoError(t, err) + assert.Empty(t, managedPolicies) +} + +func testPolicyDocument(action string, resource string) policy_engine.PolicyDocument { + return policy_engine.PolicyDocument{ + Version: policy_engine.PolicyVersion2012_10_17, + Statement: []policy_engine.PolicyStatement{ + { + Effect: policy_engine.PolicyEffectAllow, + Action: policy_engine.NewStringOrStringSlice(action), + Resource: policy_engine.NewStringOrStringSlice(resource), + }, + }, + } +} + +func managedPolicyNames(policies []*iam_pb.Policy) []string { + names := make([]string, 0, len(policies)) + for _, policy := range policies { + names = append(names, policy.Name) + } + sort.Strings(names) + return names +} + +func assertInlinePolicyPreserved(t *testing.T, inlinePolicies map[string]map[string]policy_engine.PolicyDocument, userName string, policyName string) { + t.Helper() + + userPolicies, found := inlinePolicies[userName] + require.True(t, found) + + policy, found := userPolicies[policyName] + require.True(t, found) + assert.Equal(t, policy_engine.PolicyVersion2012_10_17, policy.Version) + require.Len(t, policy.Statement, 1) + assert.Equal(t, policy_engine.PolicyEffectAllow, policy.Statement[0].Effect) +} + +func cloneEntry(entry *filer_pb.Entry) *filer_pb.Entry { + if entry == nil { + return nil + } + return proto.Clone(entry).(*filer_pb.Entry) +} + +func filerEntryKey(dir string, name string) string { + return dir + "\x00" + name +} + +func splitFilerEntryKey(key string) (dir string, name string) { + for idx := 0; idx < len(key); idx++ { + if key[idx] == '\x00' { + return key[:idx], key[idx+1:] + } + } + return key, "" +} diff --git a/weed/credential/filer_etc/filer_etc_service_account.go b/weed/credential/filer_etc/filer_etc_service_account.go index 0fe6091cb..94324b90b 100644 --- a/weed/credential/filer_etc/filer_etc_service_account.go +++ b/weed/credential/filer_etc/filer_etc_service_account.go @@ -38,7 +38,7 @@ func (store *FilerEtcStore) loadServiceAccountsFromMultiFile(ctx context.Context if len(entry.Content) > 0 { content = entry.Content } else { - c, err := filer.ReadInsideFiler(client, dir, entry.Name) + c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name) if err != nil { glog.Warningf("Failed to read service account file %s: %v", entry.Name, err) continue @@ -133,7 +133,7 @@ func (store *FilerEtcStore) GetServiceAccount(ctx context.Context, id string) (* } var sa *iam_pb.ServiceAccount err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, filer.IamConfigDirectory+"/"+IamServiceAccountsDirectory, id+".json") + data, err := filer.ReadInsideFiler(ctx, client, filer.IamConfigDirectory+"/"+IamServiceAccountsDirectory, id+".json") if err != nil { if err == filer_pb.ErrNotFound { return credential.ErrServiceAccountNotFound @@ -170,7 +170,7 @@ func (store *FilerEtcStore) ListServiceAccounts(ctx context.Context) ([]*iam_pb. if len(entry.Content) > 0 { content = entry.Content } else { - c, err := filer.ReadInsideFiler(client, dir, entry.Name) + c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name) if err != nil { glog.Warningf("Failed to read service account file %s: %v", entry.Name, err) continue diff --git a/weed/credential/filer_etc/filer_etc_store.go b/weed/credential/filer_etc/filer_etc_store.go index a9f7cc1e7..ceb15b82f 100644 --- a/weed/credential/filer_etc/filer_etc_store.go +++ b/weed/credential/filer_etc/filer_etc_store.go @@ -20,6 +20,7 @@ type FilerEtcStore struct { filerAddressFunc func() pb.ServerAddress // Function to get current active filer grpcDialOption grpc.DialOption mu sync.RWMutex // Protects filerAddressFunc and grpcDialOption + policyMu sync.Mutex // Serializes legacy managed-policy mutations } func (store *FilerEtcStore) GetName() credential.CredentialStoreTypeName { diff --git a/weed/credential/propagating_store.go b/weed/credential/propagating_store.go index 26b3446c7..d8fe615c9 100644 --- a/weed/credential/propagating_store.go +++ b/weed/credential/propagating_store.go @@ -20,6 +20,14 @@ import ( var _ CredentialStore = &PropagatingCredentialStore{} var _ PolicyManager = &PropagatingCredentialStore{} +type propagatingManagedPolicyLoader interface { + LoadManagedPolicies(ctx context.Context) ([]*iam_pb.Policy, error) +} + +type propagatingInlinePolicyLoader interface { + LoadInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) +} + type PropagatingCredentialStore struct { CredentialStore masterClient *wdclient.MasterClient @@ -240,6 +248,38 @@ func (s *PropagatingCredentialStore) ListPolicyNames(ctx context.Context) ([]str return s.CredentialStore.ListPolicyNames(ctx) } +func (s *PropagatingCredentialStore) LoadManagedPolicies(ctx context.Context) ([]*iam_pb.Policy, error) { + if loader, ok := s.CredentialStore.(propagatingManagedPolicyLoader); ok { + return loader.LoadManagedPolicies(ctx) + } + + policies, err := s.CredentialStore.GetPolicies(ctx) + if err != nil { + return nil, err + } + + managedPolicies := make([]*iam_pb.Policy, 0, len(policies)) + for name, policyDocument := range policies { + content, err := json.Marshal(policyDocument) + if err != nil { + return nil, err + } + managedPolicies = append(managedPolicies, &iam_pb.Policy{ + Name: name, + Content: string(content), + }) + } + + return managedPolicies, nil +} + +func (s *PropagatingCredentialStore) LoadInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) { + if loader, ok := s.CredentialStore.(propagatingInlinePolicyLoader); ok { + return loader.LoadInlinePolicies(ctx) + } + return nil, nil +} + func (s *PropagatingCredentialStore) CreatePolicy(ctx context.Context, name string, document policy_engine.PolicyDocument) error { if pm, ok := s.CredentialStore.(PolicyManager); ok { if err := pm.CreatePolicy(ctx, name, document); err != nil { diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index b5219df20..6ddef02e8 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -47,7 +47,7 @@ func ReadFilerConfFromFilers(filerGrpcAddresses []pb.ServerAddress, grpcDialOpti data = buf.Bytes() return nil } - content, err := ReadInsideFiler(client, DirectoryEtcSeaweedFS, FilerConfName) + content, err := ReadInsideFiler(context.Background(), client, DirectoryEtcSeaweedFS, FilerConfName) if err != nil { return err } diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go index 66f4c0bf8..976f88da1 100644 --- a/weed/filer/read_write.go +++ b/weed/filer/read_write.go @@ -28,12 +28,12 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed } -func ReadInsideFiler(filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error) { +func ReadInsideFiler(ctx context.Context, filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error) { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, Name: name, } - respLookupEntry, err := filer_pb.LookupEntry(context.Background(), filerClient, request) + respLookupEntry, err := filer_pb.LookupEntry(ctx, filerClient, request) if err != nil { return } diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go index 194411d54..bfb609d6b 100644 --- a/weed/filer/remote_mapping.go +++ b/weed/filer/remote_mapping.go @@ -1,6 +1,7 @@ package filer import ( + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -13,7 +14,7 @@ import ( func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) { var oldContent []byte if readErr = pb.WithFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + oldContent, readErr = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return readErr }); readErr != nil { if readErr != filer_pb.ErrNotFound { @@ -34,7 +35,7 @@ func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStor // read current mapping var oldContent, newContent []byte err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + oldContent, err = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return err }) if err != nil { @@ -65,7 +66,7 @@ func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error // read current mapping var oldContent, newContent []byte err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) + oldContent, err = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) return err }) if err != nil { diff --git a/weed/filer/remote_storage.go b/weed/filer/remote_storage.go index 73ef4c052..de33c8d20 100644 --- a/weed/filer/remote_storage.go +++ b/weed/filer/remote_storage.go @@ -138,7 +138,7 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) { var oldContent []byte if readErr = pb.WithFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) + oldContent, readErr = ReadInsideFiler(context.Background(), client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) return readErr }); readErr != nil { return nil, readErr diff --git a/weed/iam/integration/iam_manager.go b/weed/iam/integration/iam_manager.go index ba48c3e08..2e1225a89 100644 --- a/weed/iam/integration/iam_manager.go +++ b/weed/iam/integration/iam_manager.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "github.com/golang-jwt/jwt/v5" "github.com/seaweedfs/seaweedfs/weed/iam/policy" @@ -28,6 +29,8 @@ type IAMManager struct { userStore UserStore filerAddressProvider func() string // Function to get current filer address initialized bool + runtimePolicyMu sync.Mutex + runtimePolicyNames map[string]struct{} } // IAMConfig holds configuration for all IAM components @@ -105,6 +108,57 @@ func (m *IAMManager) SetUserStore(store UserStore) { m.userStore = store } +// SyncRuntimePolicies keeps zero-config runtime policies available to the +// in-memory policy engine used by the advanced IAM authorizer. +func (m *IAMManager) SyncRuntimePolicies(ctx context.Context, policies []*iam_pb.Policy) error { + if !m.initialized || m.policyEngine == nil { + return nil + } + if m.policyEngine.StoreType() != sts.StoreTypeMemory { + return nil + } + + desiredPolicies := make(map[string]*policy.PolicyDocument, len(policies)) + for _, runtimePolicy := range policies { + if runtimePolicy == nil || runtimePolicy.Name == "" { + continue + } + + var document policy.PolicyDocument + if err := json.Unmarshal([]byte(runtimePolicy.Content), &document); err != nil { + return fmt.Errorf("failed to parse runtime policy %q: %w", runtimePolicy.Name, err) + } + + desiredPolicies[runtimePolicy.Name] = &document + } + + m.runtimePolicyMu.Lock() + defer m.runtimePolicyMu.Unlock() + + filerAddress := m.getFilerAddress() + for policyName := range m.runtimePolicyNames { + if _, keep := desiredPolicies[policyName]; keep { + continue + } + if err := m.policyEngine.DeletePolicy(ctx, filerAddress, policyName); err != nil { + return fmt.Errorf("failed to delete runtime policy %q: %w", policyName, err) + } + } + + for policyName, document := range desiredPolicies { + if err := m.policyEngine.AddPolicy(filerAddress, policyName, document); err != nil { + return fmt.Errorf("failed to sync runtime policy %q: %w", policyName, err) + } + } + + m.runtimePolicyNames = make(map[string]struct{}, len(desiredPolicies)) + for policyName := range desiredPolicies { + m.runtimePolicyNames[policyName] = struct{}{} + } + + return nil +} + // Initialize initializes the IAM manager with all components func (m *IAMManager) Initialize(config *IAMConfig, filerAddressProvider func() string) error { if config == nil { @@ -422,6 +476,7 @@ func (m *IAMManager) IsActionAllowed(ctx context.Context, request *ActionRequest var baseResult *policy.EvaluationResult var err error + subjectPolicyCount := 0 if isAdmin { // Admin always has base access allowed @@ -454,6 +509,7 @@ func (m *IAMManager) IsActionAllowed(ctx context.Context, request *ActionRequest policies = roleDef.AttachedPolicies } } + subjectPolicyCount = len(policies) if bucketPolicyName != "" { // Enforce an upper bound on the number of policies to avoid excessive allocations @@ -477,6 +533,14 @@ func (m *IAMManager) IsActionAllowed(ctx context.Context, request *ActionRequest return false, nil } + // Zero-config IAM uses DefaultEffect=Allow to preserve open-by-default behavior + // for requests without any subject policies. Once a user or role has attached + // policies, "no matching statement" must fall back to deny so the attachment + // actually scopes access. + if subjectPolicyCount > 0 && len(baseResult.MatchingStatements) == 0 { + return false, nil + } + // If there's a session policy, it must also allow the action if sessionInfo != nil && sessionInfo.SessionPolicy != "" { var sessionPolicy policy.PolicyDocument diff --git a/weed/iam/policy/policy_engine.go b/weed/iam/policy/policy_engine.go index 1d7b715e5..c8cd07367 100644 --- a/weed/iam/policy/policy_engine.go +++ b/weed/iam/policy/policy_engine.go @@ -353,6 +353,27 @@ func (e *PolicyEngine) AddPolicy(filerAddress string, name string, policy *Polic return e.store.StorePolicy(context.Background(), filerAddress, name, policy) } +// DeletePolicy removes a policy from the configured store. +func (e *PolicyEngine) DeletePolicy(ctx context.Context, filerAddress string, name string) error { + if !e.initialized { + return fmt.Errorf("policy engine not initialized") + } + + if name == "" { + return fmt.Errorf("policy name cannot be empty") + } + + return e.store.DeletePolicy(ctx, filerAddress, name) +} + +// StoreType returns the configured backend type for the policy store. +func (e *PolicyEngine) StoreType() string { + if e.config == nil { + return "" + } + return e.config.StoreType +} + // Evaluate evaluates policies against a request context (filerAddress ignored for memory stores) func (e *PolicyEngine) Evaluate(ctx context.Context, filerAddress string, evalCtx *EvaluationContext, policyNames []string) (*EvaluationResult, error) { if !e.initialized { diff --git a/weed/iam/policy/policy_store.go b/weed/iam/policy/policy_store.go index d25adce61..a83f065ad 100644 --- a/weed/iam/policy/policy_store.go +++ b/weed/iam/policy/policy_store.go @@ -3,15 +3,19 @@ package policy import ( "context" "encoding/json" + "errors" "fmt" "strings" "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // MemoryPolicyStore implements PolicyStore using in-memory storage @@ -134,11 +138,15 @@ func copyPolicyDocument(original *PolicyDocument) *PolicyDocument { copy(copied.Statement[i].NotResource, stmt.NotResource) } - // Copy condition map (shallow copy for now) + // Copy condition map if stmt.Condition != nil { copied.Statement[i].Condition = make(map[string]map[string]interface{}) - for k, v := range stmt.Condition { - copied.Statement[i].Condition[k] = v + for conditionType, conditionValues := range stmt.Condition { + copiedConditionValues := make(map[string]interface{}, len(conditionValues)) + for conditionKey, conditionValue := range conditionValues { + copiedConditionValues[conditionKey] = copyPolicyConditionValue(conditionValue) + } + copied.Statement[i].Condition[conditionType] = copiedConditionValues } } } @@ -146,6 +154,29 @@ func copyPolicyDocument(original *PolicyDocument) *PolicyDocument { return copied } +func copyPolicyConditionValue(value interface{}) interface{} { + switch v := value.(type) { + case []string: + copied := make([]string, len(v)) + copy(copied, v) + return copied + case []interface{}: + copied := make([]interface{}, len(v)) + for i := range v { + copied[i] = copyPolicyConditionValue(v[i]) + } + return copied + case map[string]interface{}: + copied := make(map[string]interface{}, len(v)) + for key, nestedValue := range v { + copied[key] = copyPolicyConditionValue(nestedValue) + } + return copied + default: + return v + } +} + // FilerPolicyStore implements PolicyStore using SeaweedFS filer type FilerPolicyStore struct { grpcDialOption grpc.DialOption @@ -198,27 +229,13 @@ func (s *FilerPolicyStore) StorePolicy(ctx context.Context, filerAddress string, // Store in filer return s.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.CreateEntryRequest{ - Directory: s.basePath, - Entry: &filer_pb.Entry{ - Name: s.getPolicyFileName(name), - IsDirectory: false, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(0600), // Read/write for owner only - Uid: uint32(0), - Gid: uint32(0), - }, - Content: policyData, - }, - } - glog.V(3).Infof("Storing policy %s at %s", name, policyPath) - _, err := client.CreateEntry(ctx, request) - if err != nil { + if err := s.savePolicyFile(ctx, client, s.getPolicyFileName(name), policyData); err != nil { return fmt.Errorf("failed to store policy %s: %v", name, err) } + if err := s.deleteLegacyPolicyFileIfPresent(ctx, client, name); err != nil { + return err + } return nil }) @@ -239,23 +256,30 @@ func (s *FilerPolicyStore) GetPolicy(ctx context.Context, filerAddress string, n var policyData []byte err := s.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.LookupDirectoryEntryRequest{ - Directory: s.basePath, - Name: s.getPolicyFileName(name), - } + for _, fileName := range s.getPolicyLookupFileNames(name) { + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: s.basePath, + Name: fileName, + } - glog.V(3).Infof("Looking up policy %s", name) - response, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - return fmt.Errorf("policy not found: %v", err) - } + glog.V(3).Infof("Looking up policy %s as %s", name, fileName) + response, err := client.LookupDirectoryEntry(ctx, request) + if err != nil { + if isNotFoundPolicyStoreError(err) { + continue + } + return fmt.Errorf("policy lookup failed: %v", err) + } + + if response.Entry == nil { + continue + } - if response.Entry == nil { - return fmt.Errorf("policy not found") + policyData = response.Entry.Content + return nil } - policyData = response.Entry.Content - return nil + return fmt.Errorf("policy not found") }) if err != nil { @@ -285,31 +309,27 @@ func (s *FilerPolicyStore) DeletePolicy(ctx context.Context, filerAddress string } return s.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.DeleteEntryRequest{ - Directory: s.basePath, - Name: s.getPolicyFileName(name), - IsDeleteData: true, - IsRecursive: false, - IgnoreRecursiveError: false, - } + for _, fileName := range s.getPolicyLookupFileNames(name) { + request := &filer_pb.DeleteEntryRequest{ + Directory: s.basePath, + Name: fileName, + IsDeleteData: true, + IsRecursive: false, + IgnoreRecursiveError: false, + } - glog.V(3).Infof("Deleting policy %s", name) - resp, err := client.DeleteEntry(ctx, request) - if err != nil { - // Ignore "not found" errors - policy may already be deleted - if strings.Contains(err.Error(), "not found") { - return nil + glog.V(3).Infof("Deleting policy %s as %s", name, fileName) + resp, err := client.DeleteEntry(ctx, request) + if err != nil { + if isNotFoundPolicyStoreError(err) { + continue + } + return fmt.Errorf("failed to delete policy %s: %v", name, err) } - return fmt.Errorf("failed to delete policy %s: %v", name, err) - } - // Check response error - if resp.Error != "" { - // Ignore "not found" errors - policy may already be deleted - if strings.Contains(resp.Error, "not found") { - return nil + if resp.Error != "" { + return fmt.Errorf("failed to delete policy %s: %s", name, resp.Error) } - return fmt.Errorf("failed to delete policy %s: %s", name, resp.Error) } return nil @@ -332,7 +352,7 @@ func (s *FilerPolicyStore) ListPolicies(ctx context.Context, filerAddress string // List all entries in the policy directory request := &filer_pb.ListEntriesRequest{ Directory: s.basePath, - Prefix: "policy_", + Prefix: "", StartFromFileName: "", InclusiveStartFrom: false, Limit: 1000, // Process in batches of 1000 @@ -353,11 +373,7 @@ func (s *FilerPolicyStore) ListPolicies(ctx context.Context, filerAddress string continue } - // Extract policy name from filename - filename := resp.Entry.Name - if strings.HasPrefix(filename, "policy_") && strings.HasSuffix(filename, ".json") { - // Remove "policy_" prefix and ".json" suffix - policyName := strings.TrimSuffix(strings.TrimPrefix(filename, "policy_"), ".json") + if policyName, ok := s.policyNameFromFileName(resp.Entry.Name); ok { policyNames = append(policyNames, policyName) } } @@ -369,7 +385,17 @@ func (s *FilerPolicyStore) ListPolicies(ctx context.Context, filerAddress string return nil, err } - return policyNames, nil + uniquePolicyNames := make([]string, 0, len(policyNames)) + seen := make(map[string]struct{}, len(policyNames)) + for _, policyName := range policyNames { + if _, found := seen[policyName]; found { + continue + } + seen[policyName] = struct{}{} + uniquePolicyNames = append(uniquePolicyNames, policyName) + } + + return uniquePolicyNames, nil } // Helper methods @@ -391,5 +417,115 @@ func (s *FilerPolicyStore) getPolicyPath(policyName string) string { // getPolicyFileName returns the filename for a policy func (s *FilerPolicyStore) getPolicyFileName(policyName string) string { + return s.getCanonicalPolicyFileName(policyName) +} + +func (s *FilerPolicyStore) getLegacyPolicyFileName(policyName string) string { return "policy_" + policyName + ".json" } + +func (s *FilerPolicyStore) getCanonicalPolicyFileName(policyName string) string { + return policyName + ".json" +} + +func (s *FilerPolicyStore) getPolicyLookupFileNames(policyName string) []string { + return []string{ + s.getCanonicalPolicyFileName(policyName), + s.getLegacyPolicyFileName(policyName), + } +} + +func (s *FilerPolicyStore) policyNameFromFileName(fileName string) (string, bool) { + if !strings.HasSuffix(fileName, ".json") { + return "", false + } + policyName := strings.TrimSuffix(fileName, ".json") + if strings.HasPrefix(fileName, "policy_") { + policyName = strings.TrimPrefix(policyName, "policy_") + } + if s.isSupportedPolicyName(policyName) { + return policyName, true + } + return "", false +} + +func (s *FilerPolicyStore) isSupportedPolicyName(policyName string) bool { + if policyName == "" { + return false + } + // Bucket policies are stored alongside IAM policies but use the internal + // "bucket-policy:" naming scheme, which is intentionally outside the + // public IAM policy-name validator. + if strings.HasPrefix(policyName, "bucket-policy:") { + return len(policyName) > len("bucket-policy:") + } + return credential.ValidatePolicyName(policyName) == nil +} + +func (s *FilerPolicyStore) deleteLegacyPolicyFileIfPresent(ctx context.Context, client filer_pb.SeaweedFilerClient, policyName string) error { + legacyFileName := s.getLegacyPolicyFileName(policyName) + response, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{ + Directory: s.basePath, + Name: legacyFileName, + IsDeleteData: true, + IsRecursive: false, + IgnoreRecursiveError: false, + }) + if err != nil { + if isNotFoundPolicyStoreError(err) { + return nil + } + return fmt.Errorf("failed to delete legacy policy %s: %v", policyName, err) + } + if response.Error != "" { + return fmt.Errorf("failed to delete legacy policy %s: %s", policyName, response.Error) + } + return nil +} + +func (s *FilerPolicyStore) savePolicyFile(ctx context.Context, client filer_pb.SeaweedFilerClient, fileName string, content []byte) error { + now := time.Now().Unix() + entry := &filer_pb.Entry{ + Name: fileName, + IsDirectory: false, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + Crtime: now, + FileMode: uint32(0600), + Uid: uint32(0), + Gid: uint32(0), + FileSize: uint64(len(content)), + }, + Content: content, + } + + createRequest := &filer_pb.CreateEntryRequest{ + Directory: s.basePath, + Entry: entry, + } + + if err := filer_pb.CreateEntry(ctx, client, createRequest); err == nil { + return nil + } else if !isAlreadyExistsPolicyStoreError(err) { + return err + } + + return filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ + Directory: s.basePath, + Entry: entry, + }) +} + +func isNotFoundPolicyStoreError(err error) bool { + if err == nil { + return false + } + return errors.Is(err, filer_pb.ErrNotFound) || status.Code(err) == codes.NotFound +} + +func isAlreadyExistsPolicyStoreError(err error) bool { + if err == nil { + return false + } + return status.Code(err) == codes.AlreadyExists +} diff --git a/weed/iam/policy/policy_store_test.go b/weed/iam/policy/policy_store_test.go new file mode 100644 index 000000000..8e4f915a4 --- /dev/null +++ b/weed/iam/policy/policy_store_test.go @@ -0,0 +1,314 @@ +package policy + +import ( + "context" + "encoding/json" + "fmt" + "net" + "sort" + "strconv" + "sync" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +type policyStoreTestFilerServer struct { + filer_pb.UnimplementedSeaweedFilerServer + mu sync.RWMutex + entries map[string]*filer_pb.Entry +} + +func newPolicyStoreTestFilerServer() *policyStoreTestFilerServer { + return &policyStoreTestFilerServer{ + entries: make(map[string]*filer_pb.Entry), + } +} + +func (s *policyStoreTestFilerServer) LookupDirectoryEntry(_ context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + entry, found := s.entries[policyStoreTestEntryKey(req.Directory, req.Name)] + if !found { + return nil, status.Error(codes.NotFound, filer_pb.ErrNotFound.Error()) + } + + return &filer_pb.LookupDirectoryEntryResponse{Entry: clonePolicyStoreEntry(entry)}, nil +} + +func (s *policyStoreTestFilerServer) CreateEntry(_ context.Context, req *filer_pb.CreateEntryRequest) (*filer_pb.CreateEntryResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + key := policyStoreTestEntryKey(req.Directory, req.Entry.Name) + if _, found := s.entries[key]; found { + return nil, status.Error(codes.AlreadyExists, "entry already exists") + } + + s.entries[key] = clonePolicyStoreEntry(req.Entry) + return &filer_pb.CreateEntryResponse{}, nil +} + +func (s *policyStoreTestFilerServer) UpdateEntry(_ context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + key := policyStoreTestEntryKey(req.Directory, req.Entry.Name) + if _, found := s.entries[key]; !found { + return nil, status.Error(codes.NotFound, filer_pb.ErrNotFound.Error()) + } + + s.entries[key] = clonePolicyStoreEntry(req.Entry) + return &filer_pb.UpdateEntryResponse{}, nil +} + +func (s *policyStoreTestFilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream grpc.ServerStreamingServer[filer_pb.ListEntriesResponse]) error { + s.mu.RLock() + defer s.mu.RUnlock() + + names := make([]string, 0) + for key := range s.entries { + dir, name := splitPolicyStoreEntryKey(key) + if dir != req.Directory { + continue + } + if req.Prefix != "" && len(name) >= len(req.Prefix) && name[:len(req.Prefix)] != req.Prefix { + continue + } + if req.Prefix != "" && len(name) < len(req.Prefix) { + continue + } + names = append(names, name) + } + sort.Strings(names) + + for _, name := range names { + if err := stream.Send(&filer_pb.ListEntriesResponse{ + Entry: clonePolicyStoreEntry(s.entries[policyStoreTestEntryKey(req.Directory, name)]), + }); err != nil { + return err + } + } + + return nil +} + +func (s *policyStoreTestFilerServer) DeleteEntry(_ context.Context, req *filer_pb.DeleteEntryRequest) (*filer_pb.DeleteEntryResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + key := policyStoreTestEntryKey(req.Directory, req.Name) + if _, found := s.entries[key]; !found { + return nil, status.Error(codes.NotFound, filer_pb.ErrNotFound.Error()) + } + + delete(s.entries, key) + return &filer_pb.DeleteEntryResponse{}, nil +} + +func (s *policyStoreTestFilerServer) putPolicyFile(t *testing.T, dir string, name string, document *PolicyDocument) { + t.Helper() + + content, err := json.Marshal(document) + require.NoError(t, err) + + s.mu.Lock() + defer s.mu.Unlock() + s.entries[policyStoreTestEntryKey(dir, name)] = &filer_pb.Entry{ + Name: name, + Content: content, + } +} + +func (s *policyStoreTestFilerServer) hasEntry(dir string, name string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + _, found := s.entries[policyStoreTestEntryKey(dir, name)] + return found +} + +func newTestFilerPolicyStore(t *testing.T) (*FilerPolicyStore, *policyStoreTestFilerServer) { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + server := newPolicyStoreTestFilerServer() + grpcServer := pb.NewGrpcServer() + filer_pb.RegisterSeaweedFilerServer(grpcServer, server) + go func() { + _ = grpcServer.Serve(lis) + }() + + t.Cleanup(func() { + grpcServer.Stop() + _ = lis.Close() + }) + + host, portString, err := net.SplitHostPort(lis.Addr().String()) + require.NoError(t, err) + grpcPort, err := strconv.Atoi(portString) + require.NoError(t, err) + + store, err := NewFilerPolicyStore(nil, func() string { + return string(pb.NewServerAddress(host, 1, grpcPort)) + }) + require.NoError(t, err) + store.grpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials()) + + return store, server +} + +func TestFilerPolicyStoreGetPolicyPrefersCanonicalFiles(t *testing.T) { + ctx := context.Background() + store, server := newTestFilerPolicyStore(t) + + server.putPolicyFile(t, store.basePath, "cli-bucket-access-policy.json", testPolicyDocument("s3:ListBucket", "arn:aws:s3:::cli-allowed-bucket")) + server.putPolicyFile(t, store.basePath, "policy_cli-bucket-access-policy.json", testPolicyDocument("s3:PutObject", "arn:aws:s3:::cli-forbidden-bucket/*")) + + document, err := store.GetPolicy(ctx, "", "cli-bucket-access-policy") + require.NoError(t, err) + require.Len(t, document.Statement, 1) + assert.Equal(t, "s3:ListBucket", document.Statement[0].Action[0]) + assert.Equal(t, "arn:aws:s3:::cli-allowed-bucket", document.Statement[0].Resource[0]) +} + +func TestFilerPolicyStoreListPoliciesIncludesCanonicalAndLegacyFiles(t *testing.T) { + ctx := context.Background() + store, server := newTestFilerPolicyStore(t) + + server.putPolicyFile(t, store.basePath, "canonical-only.json", testPolicyDocument("s3:GetObject", "arn:aws:s3:::canonical-only/*")) + server.putPolicyFile(t, store.basePath, "policy_legacy-only.json", testPolicyDocument("s3:PutObject", "arn:aws:s3:::legacy-only/*")) + server.putPolicyFile(t, store.basePath, "shared.json", testPolicyDocument("s3:DeleteObject", "arn:aws:s3:::shared/*")) + server.putPolicyFile(t, store.basePath, "policy_shared.json", testPolicyDocument("s3:ListBucket", "arn:aws:s3:::shared")) + server.putPolicyFile(t, store.basePath, "policy_invalid:name.json", testPolicyDocument("s3:GetObject", "arn:aws:s3:::ignored/*")) + server.putPolicyFile(t, store.basePath, "bucket-policy:bucket-a.json", testPolicyDocument("s3:ListBucket", "arn:aws:s3:::bucket-a")) + + names, err := store.ListPolicies(ctx, "") + require.NoError(t, err) + + assert.ElementsMatch(t, []string{"canonical-only", "legacy-only", "shared", "bucket-policy:bucket-a"}, names) +} + +func TestFilerPolicyStoreDeletePolicyRemovesCanonicalAndLegacyFiles(t *testing.T) { + ctx := context.Background() + store, server := newTestFilerPolicyStore(t) + + server.putPolicyFile(t, store.basePath, "dual-format.json", testPolicyDocument("s3:GetObject", "arn:aws:s3:::dual-format/*")) + server.putPolicyFile(t, store.basePath, "policy_dual-format.json", testPolicyDocument("s3:PutObject", "arn:aws:s3:::dual-format/*")) + + require.NoError(t, store.DeletePolicy(ctx, "", "dual-format")) + assert.False(t, server.hasEntry(store.basePath, "dual-format.json")) + assert.False(t, server.hasEntry(store.basePath, "policy_dual-format.json")) +} + +func TestFilerPolicyStoreStorePolicyWritesCanonicalFileAndRemovesLegacyTwin(t *testing.T) { + ctx := context.Background() + store, server := newTestFilerPolicyStore(t) + + server.putPolicyFile(t, store.basePath, "policy_dual-format.json", testPolicyDocument("s3:PutObject", "arn:aws:s3:::dual-format/*")) + + require.NoError(t, store.StorePolicy(ctx, "", "dual-format", testPolicyDocument("s3:GetObject", "arn:aws:s3:::dual-format/*"))) + + assert.True(t, server.hasEntry(store.basePath, "dual-format.json")) + assert.False(t, server.hasEntry(store.basePath, "policy_dual-format.json")) + + document, err := store.GetPolicy(ctx, "", "dual-format") + require.NoError(t, err) + require.Len(t, document.Statement, 1) + assert.Equal(t, "s3:GetObject", document.Statement[0].Action[0]) +} + +func TestFilerPolicyStoreStorePolicyUpdatesExistingCanonicalFile(t *testing.T) { + ctx := context.Background() + store, server := newTestFilerPolicyStore(t) + + server.putPolicyFile(t, store.basePath, "existing.json", testPolicyDocument("s3:PutObject", "arn:aws:s3:::existing/*")) + + require.NoError(t, store.StorePolicy(ctx, "", "existing", testPolicyDocument("s3:GetObject", "arn:aws:s3:::existing/*"))) + + document, err := store.GetPolicy(ctx, "", "existing") + require.NoError(t, err) + require.Len(t, document.Statement, 1) + assert.Equal(t, "s3:GetObject", document.Statement[0].Action[0]) + assert.Equal(t, "arn:aws:s3:::existing/*", document.Statement[0].Resource[0]) +} + +func TestCopyPolicyDocumentClonesConditionState(t *testing.T) { + original := &PolicyDocument{ + Version: "2012-10-17", + Statement: []Statement{ + { + Effect: "Allow", + Action: []string{"s3:GetObject"}, + Resource: []string{ + "arn:aws:s3:::test-bucket/*", + }, + Condition: map[string]map[string]interface{}{ + "StringEquals": { + "s3:prefix": []string{"public/", "private/"}, + }, + "Null": { + "aws:PrincipalArn": "false", + }, + }, + }, + }, + } + + copied := copyPolicyDocument(original) + require.NotNil(t, copied) + + original.Statement[0].Condition["StringEquals"]["s3:prefix"] = []string{"mutated/"} + original.Statement[0].Condition["Null"]["aws:PrincipalArn"] = "true" + + assert.Equal(t, []string{"public/", "private/"}, copied.Statement[0].Condition["StringEquals"]["s3:prefix"]) + assert.Equal(t, "false", copied.Statement[0].Condition["Null"]["aws:PrincipalArn"]) +} + +func TestIsAlreadyExistsPolicyStoreErrorUsesStatusCode(t *testing.T) { + assert.True(t, isAlreadyExistsPolicyStoreError(status.Error(codes.AlreadyExists, "entry already exists"))) + assert.False(t, isAlreadyExistsPolicyStoreError(fmt.Errorf("entry already exists"))) +} + +func testPolicyDocument(action string, resource string) *PolicyDocument { + return &PolicyDocument{ + Version: "2012-10-17", + Statement: []Statement{ + { + Effect: "Allow", + Action: []string{action}, + Resource: []string{resource}, + }, + }, + } +} + +func clonePolicyStoreEntry(entry *filer_pb.Entry) *filer_pb.Entry { + if entry == nil { + return nil + } + return proto.Clone(entry).(*filer_pb.Entry) +} + +func policyStoreTestEntryKey(dir string, name string) string { + return dir + "\x00" + name +} + +func splitPolicyStoreEntryKey(key string) (string, string) { + for i := 0; i < len(key); i++ { + if key[i] == '\x00' { + return key[:i], key[i+1:] + } + } + return key, "" +} diff --git a/weed/mount/filer_conf.go b/weed/mount/filer_conf.go index 3c71bb9ce..48c9c98c7 100644 --- a/weed/mount/filer_conf.go +++ b/weed/mount/filer_conf.go @@ -1,6 +1,7 @@ package mount import ( + "context" "errors" "fmt" "path/filepath" @@ -20,7 +21,7 @@ func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) // read current conf err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - content, err := filer.ReadInsideFiler(client, confDir, confName) + content, err := filer.ReadInsideFiler(context.Background(), client, confDir, confName) if err != nil { return err } diff --git a/weed/mq/kafka/gateway/coordinator_registry.go b/weed/mq/kafka/gateway/coordinator_registry.go index f0af6647a..6e651a5d8 100644 --- a/weed/mq/kafka/gateway/coordinator_registry.go +++ b/weed/mq/kafka/gateway/coordinator_registry.go @@ -678,7 +678,7 @@ func (cr *CoordinatorRegistry) loadCoordinatorAssignmentWithClient(consumerGroup err := clientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // Load from individual file: /topics/kafka/.meta/coordinators/_assignments.json fileName := fmt.Sprintf("%s_assignments.json", consumerGroup) - data, err := filer.ReadInsideFiler(client, CoordinatorAssignmentsDir, fileName) + data, err := filer.ReadInsideFiler(context.Background(), client, CoordinatorAssignmentsDir, fileName) if err != nil { return fmt.Errorf("assignment file not found for group %s: %w", consumerGroup, err) } diff --git a/weed/mq/offset/consumer_group_storage.go b/weed/mq/offset/consumer_group_storage.go index 74c2db908..c38e77308 100644 --- a/weed/mq/offset/consumer_group_storage.go +++ b/weed/mq/offset/consumer_group_storage.go @@ -108,7 +108,7 @@ func (f *FilerConsumerGroupOffsetStorage) LoadConsumerGroupPosition(t topic.Topi var position *ConsumerGroupPosition err := f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, consumersDir, offsetFileName) + data, err := filer.ReadInsideFiler(context.Background(), client, consumersDir, offsetFileName) if err != nil { return err } diff --git a/weed/mq/offset/filer_storage.go b/weed/mq/offset/filer_storage.go index 81be78470..6f1a71e39 100644 --- a/weed/mq/offset/filer_storage.go +++ b/weed/mq/offset/filer_storage.go @@ -1,6 +1,7 @@ package offset import ( + "context" "fmt" "time" @@ -48,7 +49,7 @@ func (f *FilerOffsetStorage) LoadCheckpoint(namespace, topicName string, partiti var offset int64 = -1 err := f.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, partitionDir, fileName) + data, err := filer.ReadInsideFiler(context.Background(), client, partitionDir, fileName) if err != nil { return err } diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go index 6fb0f0ce9..ad8c4864d 100644 --- a/weed/mq/topic/topic.go +++ b/weed/mq/topic/topic.go @@ -50,7 +50,7 @@ func (t Topic) Dir() string { } func (t Topic) ReadConfFile(client filer_pb.SeaweedFilerClient) (*mq_pb.ConfigureTopicResponse, error) { - data, err := filer.ReadInsideFiler(client, t.Dir(), filer.TopicConfFile) + data, err := filer.ReadInsideFiler(context.Background(), client, t.Dir(), filer.TopicConfFile) if errors.Is(err, filer_pb.ErrNotFound) { return nil, err } diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index cb0b90411..a34e79b3e 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -284,7 +284,7 @@ func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName } // Read the topic.conf file content - data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf") + data, err := filer.ReadInsideFiler(ctx, client, topicDir, "topic.conf") if err != nil { return fmt.Errorf("failed to read topic.conf for %s.%s: %v", namespace, topicName, err) } diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 32c7b044a..ae7b48be3 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -1553,6 +1553,165 @@ func (iam *IdentityAccessManagement) GetCredentialManager() *credential.Credenti return iam.credentialManager } +type managedPolicyLoader interface { + LoadManagedPolicies(ctx context.Context) ([]*iam_pb.Policy, error) +} + +type inlinePolicyLoader interface { + LoadInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) +} + +func inlinePolicyRuntimeName(userName, policyName string) string { + return "__inline_policy__/" + userName + "/" + policyName +} + +func mergePoliciesIntoConfiguration(config *iam_pb.S3ApiConfiguration, policies []*iam_pb.Policy) { + if len(policies) == 0 { + return + } + + existingPolicies := make(map[string]int, len(config.Policies)) + for idx, policy := range config.Policies { + if policy == nil || policy.Name == "" { + continue + } + existingPolicies[policy.Name] = idx + } + + for _, policy := range policies { + if policy == nil || policy.Name == "" { + continue + } + policyCopy := &iam_pb.Policy{Name: policy.Name, Content: policy.Content} + if existingIdx, found := existingPolicies[policy.Name]; found { + config.Policies[existingIdx] = policyCopy + continue + } + + config.Policies = append(config.Policies, policyCopy) + existingPolicies[policy.Name] = len(config.Policies) - 1 + } +} + +func appendUniquePolicyName(policyNames []string, policyName string) []string { + for _, existingPolicyName := range policyNames { + if existingPolicyName == policyName { + return policyNames + } + } + return append(policyNames, policyName) +} + +func (iam *IdentityAccessManagement) loadManagedPoliciesForRuntime(ctx context.Context) ([]*iam_pb.Policy, error) { + store := iam.credentialManager.GetStore() + if store == nil { + return nil, nil + } + + if loader, ok := store.(managedPolicyLoader); ok { + return loader.LoadManagedPolicies(ctx) + } + + policies, err := iam.credentialManager.GetPolicies(ctx) + if err != nil { + return nil, err + } + + managedPolicies := make([]*iam_pb.Policy, 0, len(policies)) + for name, policyDocument := range policies { + content, err := json.Marshal(policyDocument) + if err != nil { + return nil, fmt.Errorf("failed to marshal policy %q: %w", name, err) + } + + managedPolicies = append(managedPolicies, &iam_pb.Policy{ + Name: name, + Content: string(content), + }) + } + + return managedPolicies, nil +} + +func (iam *IdentityAccessManagement) hydrateRuntimePolicies(ctx context.Context, config *iam_pb.S3ApiConfiguration) error { + if iam.credentialManager == nil || config == nil { + return nil + } + + managedPolicies, err := iam.loadManagedPoliciesForRuntime(ctx) + if err != nil { + return fmt.Errorf("failed to load managed policies for runtime: %w", err) + } + mergePoliciesIntoConfiguration(config, managedPolicies) + + store := iam.credentialManager.GetStore() + if store == nil { + return nil + } + + inlineLoader, ok := store.(inlinePolicyLoader) + if !ok { + return nil + } + + inlinePoliciesByUser, err := inlineLoader.LoadInlinePolicies(ctx) + if err != nil { + return fmt.Errorf("failed to load inline policies for runtime: %w", err) + } + + if len(inlinePoliciesByUser) == 0 { + return nil + } + + identityByName := make(map[string]*iam_pb.Identity, len(config.Identities)) + for _, identity := range config.Identities { + identityByName[identity.Name] = identity + } + + inlinePolicies := make([]*iam_pb.Policy, 0) + for userName, userPolicies := range inlinePoliciesByUser { + identity, found := identityByName[userName] + if !found { + continue + } + + for policyName, policyDocument := range userPolicies { + content, err := json.Marshal(policyDocument) + if err != nil { + return fmt.Errorf("failed to marshal inline policy %q for user %q: %w", policyName, userName, err) + } + + runtimePolicyName := inlinePolicyRuntimeName(userName, policyName) + inlinePolicies = append(inlinePolicies, &iam_pb.Policy{ + Name: runtimePolicyName, + Content: string(content), + }) + identity.PolicyNames = appendUniquePolicyName(identity.PolicyNames, runtimePolicyName) + } + } + + mergePoliciesIntoConfiguration(config, inlinePolicies) + return nil +} + +func (iam *IdentityAccessManagement) syncRuntimePoliciesToIAMManager(ctx context.Context, policies []*iam_pb.Policy) error { + if iam == nil || iam.iamIntegration == nil { + return nil + } + + provider, ok := iam.iamIntegration.(IAMManagerProvider) + if !ok { + return nil + } + + manager := provider.GetIAMManager() + if manager == nil { + return nil + } + + return manager.SyncRuntimePolicies(ctx, policies) +} + // LoadS3ApiConfigurationFromCredentialManager loads configuration using the credential manager func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromCredentialManager() error { glog.V(1).Infof("Loading S3 API configuration from credential manager") @@ -1566,6 +1725,15 @@ func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromCredentialManager glog.V(2).Infof("Credential manager returned %d identities and %d accounts", len(s3ApiConfiguration.Identities), len(s3ApiConfiguration.Accounts)) + if err := iam.hydrateRuntimePolicies(context.Background(), s3ApiConfiguration); err != nil { + glog.Errorf("Failed to hydrate runtime IAM policies: %v", err) + return err + } + if err := iam.syncRuntimePoliciesToIAMManager(context.Background(), s3ApiConfiguration.Policies); err != nil { + glog.Errorf("Failed to sync runtime IAM policies to advanced IAM manager: %v", err) + return err + } + if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil { glog.Errorf("Failed to load S3 API configuration: %v", err) return err @@ -1726,11 +1894,23 @@ func (iam *IdentityAccessManagement) VerifyActionPermission(r *http.Request, ide hasSessionToken := r.Header.Get("X-SeaweedFS-Session-Token") != "" || r.Header.Get("X-Amz-Security-Token") != "" || r.URL.Query().Get("X-Amz-Security-Token") != "" + hasAttachedPolicies := len(identity.PolicyNames) > 0 - if (len(identity.Actions) == 0 || hasSessionToken) && iam.iamIntegration != nil { + if (len(identity.Actions) == 0 || hasSessionToken || hasAttachedPolicies) && iam.iamIntegration != nil { return iam.authorizeWithIAM(r, identity, action, bucket, object) } + // Attached IAM policies are authoritative for IAM users. The legacy Actions + // field is a lossy projection that cannot represent deny statements, + // conditions, or fine-grained action differences such as PutObject vs + // DeleteObject. + if hasAttachedPolicies { + if iam.evaluateIAMPolicies(r, identity, action, bucket, object) { + return s3err.ErrNone + } + return s3err.ErrAccessDenied + } + // Traditional actions-based authorization from static S3 config. if len(identity.Actions) > 0 { if !identity.CanDo(action, bucket, object) { @@ -1739,14 +1919,6 @@ func (iam *IdentityAccessManagement) VerifyActionPermission(r *http.Request, ide return s3err.ErrNone } - // IAM policy fallback for identities with attached policies but without IAM integration. - if len(identity.PolicyNames) > 0 { - if iam.evaluateIAMPolicies(r, identity, action, bucket, object) { - return s3err.ErrNone - } - return s3err.ErrAccessDenied - } - return s3err.ErrAccessDenied } diff --git a/weed/s3api/auth_credentials_test.go b/weed/s3api/auth_credentials_test.go index ad4f68f72..7438cce83 100644 --- a/weed/s3api/auth_credentials_test.go +++ b/weed/s3api/auth_credentials_test.go @@ -1,6 +1,7 @@ package s3api import ( + "context" "crypto/tls" "fmt" "net/http" @@ -10,18 +11,71 @@ import ( "testing" "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/credential/memory" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/policy_engine" . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/util/wildcard" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" - "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" jsonpb "google.golang.org/protobuf/encoding/protojson" _ "github.com/seaweedfs/seaweedfs/weed/credential/filer_etc" - _ "github.com/seaweedfs/seaweedfs/weed/credential/memory" ) +type loadConfigurationDropsPoliciesStore struct { + *memory.MemoryStore + loadManagedPoliciesCalled bool +} + +func (store *loadConfigurationDropsPoliciesStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { + config, err := store.MemoryStore.LoadConfiguration(ctx) + if err != nil { + return nil, err + } + stripped := *config + stripped.Policies = nil + return &stripped, nil +} + +func (store *loadConfigurationDropsPoliciesStore) LoadManagedPolicies(ctx context.Context) ([]*iam_pb.Policy, error) { + store.loadManagedPoliciesCalled = true + + config, err := store.MemoryStore.LoadConfiguration(ctx) + if err != nil { + return nil, err + } + + policies := make([]*iam_pb.Policy, 0, len(config.Policies)) + for _, policy := range config.Policies { + policies = append(policies, &iam_pb.Policy{ + Name: policy.Name, + Content: policy.Content, + }) + } + + return policies, nil +} + +type inlinePolicyRuntimeStore struct { + *memory.MemoryStore + inlinePolicies map[string]map[string]policy_engine.PolicyDocument +} + +func (store *inlinePolicyRuntimeStore) LoadInlinePolicies(ctx context.Context) (map[string]map[string]policy_engine.PolicyDocument, error) { + _ = ctx + return store.inlinePolicies, nil +} + +func newPolicyAuthRequest(t *testing.T, method string) *http.Request { + t.Helper() + req, err := http.NewRequest(method, "http://s3.amazonaws.com/test-bucket/test-object", nil) + require.NoError(t, err) + return req +} + func TestIdentityListFileFormat(t *testing.T) { s3ApiConfiguration := &iam_pb.S3ApiConfiguration{} @@ -374,6 +428,25 @@ func TestVerifyActionPermissionPolicyFallback(t *testing.T) { assert.Equal(t, s3err.ErrNone, errCode) }) + t.Run("attached policies override coarse legacy actions", func(t *testing.T) { + iam := &IdentityAccessManagement{} + err := iam.PutPolicy("putOnly", `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:PutObject","Resource":"arn:aws:s3:::test-bucket/*"}]}`) + assert.NoError(t, err) + + identity := &Identity{ + Name: "policy-user", + Account: &AccountAdmin, + Actions: []Action{"Write:test-bucket"}, + PolicyNames: []string{"putOnly"}, + } + + putErrCode := iam.VerifyActionPermission(buildRequest(t, http.MethodPut), identity, Action(ACTION_WRITE), "test-bucket", "test-object") + assert.Equal(t, s3err.ErrNone, putErrCode) + + deleteErrCode := iam.VerifyActionPermission(buildRequest(t, http.MethodDelete), identity, Action(ACTION_WRITE), "test-bucket", "test-object") + assert.Equal(t, s3err.ErrAccessDenied, deleteErrCode) + }) + t.Run("valid policy updated to invalid denies access", func(t *testing.T) { iam := &IdentityAccessManagement{} err := iam.PutPolicy("myPolicy", `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"arn:aws:s3:::test-bucket/*"}]}`) @@ -409,6 +482,288 @@ func TestVerifyActionPermissionPolicyFallback(t *testing.T) { }) } +func TestLoadS3ApiConfigurationFromCredentialManagerHydratesManagedPolicies(t *testing.T) { + baseStore := &memory.MemoryStore{} + assert.NoError(t, baseStore.Initialize(nil, "")) + + store := &loadConfigurationDropsPoliciesStore{MemoryStore: baseStore} + cm := &credential.CredentialManager{Store: store} + + config := &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "managed-user", + PolicyNames: []string{"managedGet"}, + Credentials: []*iam_pb.Credential{ + {AccessKey: "AKIAMANAGED000001", SecretKey: "managed-secret"}, + }, + }, + }, + Policies: []*iam_pb.Policy{ + { + Name: "managedGet", + Content: `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"arn:aws:s3:::test-bucket/*"}]}`, + }, + }, + } + assert.NoError(t, cm.SaveConfiguration(context.Background(), config)) + + iam := &IdentityAccessManagement{credentialManager: cm} + assert.NoError(t, iam.LoadS3ApiConfigurationFromCredentialManager()) + assert.True(t, store.loadManagedPoliciesCalled) + + identity := iam.lookupByIdentityName("managed-user") + if !assert.NotNil(t, identity) { + return + } + + errCode := iam.VerifyActionPermission(newPolicyAuthRequest(t, http.MethodGet), identity, Action(ACTION_READ), "test-bucket", "test-object") + assert.Equal(t, s3err.ErrNone, errCode) +} + +func TestLoadS3ApiConfigurationFromCredentialManagerHydratesManagedPoliciesThroughPropagatingStore(t *testing.T) { + baseStore := &memory.MemoryStore{} + assert.NoError(t, baseStore.Initialize(nil, "")) + + upstream := &loadConfigurationDropsPoliciesStore{MemoryStore: baseStore} + wrappedStore := credential.NewPropagatingCredentialStore(upstream, nil, nil) + cm := &credential.CredentialManager{Store: wrappedStore} + + config := &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "managed-user", + PolicyNames: []string{"managedGet"}, + Credentials: []*iam_pb.Credential{ + {AccessKey: "AKIAMANAGED000010", SecretKey: "managed-secret"}, + }, + }, + }, + Policies: []*iam_pb.Policy{ + { + Name: "managedGet", + Content: `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"arn:aws:s3:::test-bucket/*"}]}`, + }, + }, + } + assert.NoError(t, cm.SaveConfiguration(context.Background(), config)) + + iam := &IdentityAccessManagement{credentialManager: cm} + assert.NoError(t, iam.LoadS3ApiConfigurationFromCredentialManager()) + assert.True(t, upstream.loadManagedPoliciesCalled) + + identity := iam.lookupByIdentityName("managed-user") + if !assert.NotNil(t, identity) { + return + } + + errCode := iam.VerifyActionPermission(newPolicyAuthRequest(t, http.MethodGet), identity, Action(ACTION_READ), "test-bucket", "test-object") + assert.Equal(t, s3err.ErrNone, errCode) +} + +func TestLoadS3ApiConfigurationFromCredentialManagerSyncsPoliciesToIAMManager(t *testing.T) { + ctx := context.Background() + baseStore := &memory.MemoryStore{} + assert.NoError(t, baseStore.Initialize(nil, "")) + + cm := &credential.CredentialManager{Store: baseStore} + config := &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "managed-user", + PolicyNames: []string{"managedPut"}, + Credentials: []*iam_pb.Credential{ + {AccessKey: "AKIAMANAGED000002", SecretKey: "managed-secret"}, + }, + }, + }, + Policies: []*iam_pb.Policy{ + { + Name: "managedPut", + Content: `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":["s3:PutObject","s3:ListBucket"],"Resource":["arn:aws:s3:::cli-allowed-bucket","arn:aws:s3:::cli-allowed-bucket/*"]}]}`, + }, + }, + } + assert.NoError(t, cm.SaveConfiguration(ctx, config)) + + iamManager, err := loadIAMManagerFromConfig("", func() string { return "localhost:8888" }, func() string { + return "fallback-key-for-zero-config" + }) + assert.NoError(t, err) + iamManager.SetUserStore(cm) + + iam := &IdentityAccessManagement{credentialManager: cm} + iam.SetIAMIntegration(NewS3IAMIntegration(iamManager, "")) + + assert.NoError(t, iam.LoadS3ApiConfigurationFromCredentialManager()) + + identity := iam.lookupByIdentityName("managed-user") + if !assert.NotNil(t, identity) { + return + } + + allowedErrCode := iam.VerifyActionPermission(newPolicyAuthRequest(t, http.MethodPut), identity, Action(ACTION_WRITE), "cli-allowed-bucket", "test-object") + assert.Equal(t, s3err.ErrNone, allowedErrCode) + + forbiddenErrCode := iam.VerifyActionPermission(newPolicyAuthRequest(t, http.MethodPut), identity, Action(ACTION_WRITE), "cli-forbidden-bucket", "test-object") + assert.Equal(t, s3err.ErrAccessDenied, forbiddenErrCode) +} + +func TestLoadS3ApiConfigurationFromCredentialManagerHydratesInlinePolicies(t *testing.T) { + baseStore := &memory.MemoryStore{} + assert.NoError(t, baseStore.Initialize(nil, "")) + + inlinePolicy := policy_engine.PolicyDocument{ + Version: policy_engine.PolicyVersion2012_10_17, + Statement: []policy_engine.PolicyStatement{ + { + Effect: policy_engine.PolicyEffectAllow, + Action: policy_engine.NewStringOrStringSlice("s3:PutObject"), + Resource: policy_engine.NewStringOrStringSlice("arn:aws:s3:::test-bucket/*"), + }, + }, + } + + store := &inlinePolicyRuntimeStore{ + MemoryStore: baseStore, + inlinePolicies: map[string]map[string]policy_engine.PolicyDocument{ + "inline-user": { + "PutOnly": inlinePolicy, + }, + }, + } + cm := &credential.CredentialManager{Store: store} + + config := &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "inline-user", + Actions: []string{"Write:test-bucket"}, + Credentials: []*iam_pb.Credential{ + {AccessKey: "AKIAINLINE0000001", SecretKey: "inline-secret"}, + }, + }, + }, + } + assert.NoError(t, cm.SaveConfiguration(context.Background(), config)) + + iam := &IdentityAccessManagement{credentialManager: cm} + assert.NoError(t, iam.LoadS3ApiConfigurationFromCredentialManager()) + + identity := iam.lookupByIdentityName("inline-user") + if !assert.NotNil(t, identity) { + return + } + assert.Contains(t, identity.PolicyNames, inlinePolicyRuntimeName("inline-user", "PutOnly")) + + putErrCode := iam.VerifyActionPermission(newPolicyAuthRequest(t, http.MethodPut), identity, Action(ACTION_WRITE), "test-bucket", "test-object") + assert.Equal(t, s3err.ErrNone, putErrCode) + + deleteErrCode := iam.VerifyActionPermission(newPolicyAuthRequest(t, http.MethodDelete), identity, Action(ACTION_WRITE), "test-bucket", "test-object") + assert.Equal(t, s3err.ErrAccessDenied, deleteErrCode) +} + +func TestLoadS3ApiConfigurationFromCredentialManagerHydratesInlinePoliciesThroughPropagatingStore(t *testing.T) { + baseStore := &memory.MemoryStore{} + assert.NoError(t, baseStore.Initialize(nil, "")) + + inlinePolicy := policy_engine.PolicyDocument{ + Version: policy_engine.PolicyVersion2012_10_17, + Statement: []policy_engine.PolicyStatement{ + { + Effect: policy_engine.PolicyEffectAllow, + Action: policy_engine.NewStringOrStringSlice("s3:PutObject"), + Resource: policy_engine.NewStringOrStringSlice("arn:aws:s3:::test-bucket/*"), + }, + }, + } + + upstream := &inlinePolicyRuntimeStore{ + MemoryStore: baseStore, + inlinePolicies: map[string]map[string]policy_engine.PolicyDocument{ + "inline-user": { + "PutOnly": inlinePolicy, + }, + }, + } + wrappedStore := credential.NewPropagatingCredentialStore(upstream, nil, nil) + cm := &credential.CredentialManager{Store: wrappedStore} + + config := &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "inline-user", + Actions: []string{"Write:test-bucket"}, + Credentials: []*iam_pb.Credential{ + {AccessKey: "AKIAINLINE0000010", SecretKey: "inline-secret"}, + }, + }, + }, + } + assert.NoError(t, cm.SaveConfiguration(context.Background(), config)) + + iam := &IdentityAccessManagement{credentialManager: cm} + assert.NoError(t, iam.LoadS3ApiConfigurationFromCredentialManager()) + + identity := iam.lookupByIdentityName("inline-user") + if !assert.NotNil(t, identity) { + return + } + assert.Contains(t, identity.PolicyNames, inlinePolicyRuntimeName("inline-user", "PutOnly")) + + putErrCode := iam.VerifyActionPermission(newPolicyAuthRequest(t, http.MethodPut), identity, Action(ACTION_WRITE), "test-bucket", "test-object") + assert.Equal(t, s3err.ErrNone, putErrCode) + + deleteErrCode := iam.VerifyActionPermission(newPolicyAuthRequest(t, http.MethodDelete), identity, Action(ACTION_WRITE), "test-bucket", "test-object") + assert.Equal(t, s3err.ErrAccessDenied, deleteErrCode) +} + +func TestLoadConfigurationDropsPoliciesStoreDoesNotMutateSourceConfig(t *testing.T) { + baseStore := &memory.MemoryStore{} + require.NoError(t, baseStore.Initialize(nil, "")) + + config := &iam_pb.S3ApiConfiguration{ + Policies: []*iam_pb.Policy{ + {Name: "managedGet", Content: `{"Version":"2012-10-17","Statement":[]}`}, + }, + } + require.NoError(t, baseStore.SaveConfiguration(context.Background(), config)) + + store := &loadConfigurationDropsPoliciesStore{MemoryStore: baseStore} + + stripped, err := store.LoadConfiguration(context.Background()) + require.NoError(t, err) + assert.Nil(t, stripped.Policies) + + source, err := baseStore.LoadConfiguration(context.Background()) + require.NoError(t, err) + require.Len(t, source.Policies, 1) + assert.Equal(t, "managedGet", source.Policies[0].Name) +} + +func TestMergePoliciesIntoConfigurationSkipsNilPolicies(t *testing.T) { + config := &iam_pb.S3ApiConfiguration{ + Policies: []*iam_pb.Policy{ + nil, + {Name: "existing", Content: "old"}, + }, + } + + mergePoliciesIntoConfiguration(config, []*iam_pb.Policy{ + nil, + {Name: "", Content: "ignored"}, + {Name: "existing", Content: "updated"}, + {Name: "new", Content: "created"}, + }) + + require.Len(t, config.Policies, 3) + assert.Nil(t, config.Policies[0]) + assert.Equal(t, "existing", config.Policies[1].Name) + assert.Equal(t, "updated", config.Policies[1].Content) + assert.Equal(t, "new", config.Policies[2].Name) + assert.Equal(t, "created", config.Policies[2].Content) +} + type LoadS3ApiConfigurationTestCase struct { pbAccount *iam_pb.Account pbIdent *iam_pb.Identity diff --git a/weed/s3api/s3_iam_middleware.go b/weed/s3api/s3_iam_middleware.go index d3489f461..532669f29 100644 --- a/weed/s3api/s3_iam_middleware.go +++ b/weed/s3api/s3_iam_middleware.go @@ -248,7 +248,7 @@ func (s3iam *S3IAMIntegration) AuthorizeAction(ctx context.Context, identity *IA return s3err.ErrNone // Fallback to existing authorization } - if identity.SessionToken == "" { + if identity == nil || identity.Principal == "" { return s3err.ErrAccessDenied } @@ -292,9 +292,12 @@ func (s3iam *S3IAMIntegration) AuthorizeAction(ctx context.Context, identity *IA // Create action request actionRequest := &integration.ActionRequest{ - Principal: identity.Principal, - Action: specificAction, - Resource: resourceArn, + Principal: identity.Principal, + Action: specificAction, + Resource: resourceArn, + // Static SigV4 IAM users do not carry a session token. IAMManager + // evaluates their attached policies directly and only validates STS/OIDC + // session state when a token is actually present. SessionToken: identity.SessionToken, RequestContext: requestContext, PolicyNames: identity.PolicyNames, diff --git a/weed/s3api/s3_iam_simple_test.go b/weed/s3api/s3_iam_simple_test.go index cb0d084ce..c2c68321f 100644 --- a/weed/s3api/s3_iam_simple_test.go +++ b/weed/s3api/s3_iam_simple_test.go @@ -13,16 +13,15 @@ import ( "github.com/seaweedfs/seaweedfs/weed/iam/sts" "github.com/seaweedfs/seaweedfs/weed/iam/utils" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// TestS3IAMMiddleware tests the basic S3 IAM middleware functionality -func TestS3IAMMiddleware(t *testing.T) { - // Create IAM manager - iamManager := integration.NewIAMManager() +func newTestS3IAMManagerWithDefaultEffect(t *testing.T, defaultEffect string) *integration.IAMManager { + t.Helper() - // Initialize with test configuration + iamManager := integration.NewIAMManager() config := &integration.IAMConfig{ STS: &sts.STSConfig{ TokenDuration: sts.FlexibleDuration{Duration: time.Hour}, @@ -31,7 +30,7 @@ func TestS3IAMMiddleware(t *testing.T) { SigningKey: []byte("test-signing-key-32-characters-long"), }, Policy: &policy.PolicyEngineConfig{ - DefaultEffect: "Deny", + DefaultEffect: defaultEffect, StoreType: "memory", }, Roles: &integration.RoleStoreConfig{ @@ -40,10 +39,22 @@ func TestS3IAMMiddleware(t *testing.T) { } err := iamManager.Initialize(config, func() string { - return "localhost:8888" // Mock filer address for testing + return "localhost:8888" }) require.NoError(t, err) + return iamManager +} + +func newTestS3IAMManager(t *testing.T) *integration.IAMManager { + t.Helper() + return newTestS3IAMManagerWithDefaultEffect(t, "Deny") +} + +// TestS3IAMMiddleware tests the basic S3 IAM middleware functionality +func TestS3IAMMiddleware(t *testing.T) { + iamManager := newTestS3IAMManager(t) + // Create S3 IAM integration s3IAMIntegration := NewS3IAMIntegration(iamManager, "localhost:8888") @@ -52,6 +63,74 @@ func TestS3IAMMiddleware(t *testing.T) { assert.True(t, s3IAMIntegration.enabled) } +func TestS3IAMMiddlewareStaticV4ManagedPolicies(t *testing.T) { + ctx := context.Background() + iamManager := newTestS3IAMManager(t) + + allowPolicy := &policy.PolicyDocument{ + Version: "2012-10-17", + Statement: []policy.Statement{ + { + Effect: "Allow", + Action: policy.StringList{"s3:PutObject", "s3:ListBucket"}, + Resource: policy.StringList{"arn:aws:s3:::cli-allowed-bucket", "arn:aws:s3:::cli-allowed-bucket/*"}, + }, + }, + } + require.NoError(t, iamManager.CreatePolicy(ctx, "localhost:8888", "cli-bucket-access-policy", allowPolicy)) + + s3IAMIntegration := NewS3IAMIntegration(iamManager, "localhost:8888") + identity := &IAMIdentity{ + Name: "cli-test-user", + Principal: "arn:aws:iam::000000000000:user/cli-test-user", + PolicyNames: []string{"cli-bucket-access-policy"}, + } + + putReq := httptest.NewRequest(http.MethodPut, "http://example.com/cli-allowed-bucket/test-file.txt", http.NoBody) + putErrCode := s3IAMIntegration.AuthorizeAction(ctx, identity, s3_constants.ACTION_WRITE, "cli-allowed-bucket", "test-file.txt", putReq) + assert.Equal(t, s3err.ErrNone, putErrCode) + + listReq := httptest.NewRequest(http.MethodGet, "http://example.com/cli-allowed-bucket/", http.NoBody) + listErrCode := s3IAMIntegration.AuthorizeAction(ctx, identity, s3_constants.ACTION_LIST, "cli-allowed-bucket", "", listReq) + assert.Equal(t, s3err.ErrNone, listErrCode) +} + +func TestS3IAMMiddlewareAttachedPoliciesRestrictDefaultAllow(t *testing.T) { + ctx := context.Background() + iamManager := newTestS3IAMManagerWithDefaultEffect(t, "Allow") + + allowPolicy := &policy.PolicyDocument{ + Version: "2012-10-17", + Statement: []policy.Statement{ + { + Effect: "Allow", + Action: policy.StringList{"s3:PutObject", "s3:ListBucket"}, + Resource: policy.StringList{"arn:aws:s3:::cli-allowed-bucket", "arn:aws:s3:::cli-allowed-bucket/*"}, + }, + }, + } + require.NoError(t, iamManager.CreatePolicy(ctx, "localhost:8888", "cli-bucket-access-policy", allowPolicy)) + + s3IAMIntegration := NewS3IAMIntegration(iamManager, "localhost:8888") + identity := &IAMIdentity{ + Name: "cli-test-user", + Principal: "arn:aws:iam::000000000000:user/cli-test-user", + PolicyNames: []string{"cli-bucket-access-policy"}, + } + + allowedReq := httptest.NewRequest(http.MethodPut, "http://example.com/cli-allowed-bucket/test-file.txt", http.NoBody) + allowedErrCode := s3IAMIntegration.AuthorizeAction(ctx, identity, s3_constants.ACTION_WRITE, "cli-allowed-bucket", "test-file.txt", allowedReq) + assert.Equal(t, s3err.ErrNone, allowedErrCode) + + forbiddenReq := httptest.NewRequest(http.MethodPut, "http://example.com/cli-forbidden-bucket/forbidden-file.txt", http.NoBody) + forbiddenErrCode := s3IAMIntegration.AuthorizeAction(ctx, identity, s3_constants.ACTION_WRITE, "cli-forbidden-bucket", "forbidden-file.txt", forbiddenReq) + assert.Equal(t, s3err.ErrAccessDenied, forbiddenErrCode) + + forbiddenListReq := httptest.NewRequest(http.MethodGet, "http://example.com/cli-forbidden-bucket/", http.NoBody) + forbiddenListErrCode := s3IAMIntegration.AuthorizeAction(ctx, identity, s3_constants.ACTION_LIST, "cli-forbidden-bucket", "", forbiddenListReq) + assert.Equal(t, s3err.ErrAccessDenied, forbiddenListErrCode) +} + // TestS3IAMMiddlewareJWTAuth tests JWT authentication func TestS3IAMMiddlewareJWTAuth(t *testing.T) { // Skip for now since it requires full setup diff --git a/weed/s3api/s3_presigned_url_iam.go b/weed/s3api/s3_presigned_url_iam.go index 82ccdcb6c..b731b1634 100644 --- a/weed/s3api/s3_presigned_url_iam.go +++ b/weed/s3api/s3_presigned_url_iam.go @@ -101,21 +101,10 @@ func (pm *S3PresignedURLManager) GeneratePresignedURLWithIAM(ctx context.Context if pm.s3iam == nil || !pm.s3iam.enabled { return nil, fmt.Errorf("IAM integration not enabled") } - - // Validate session token and get identity - // Use a proper ARN format for the principal - principalArn := fmt.Sprintf("arn:aws:sts::assumed-role/PresignedUser/presigned-session") - iamIdentity := &IAMIdentity{ - SessionToken: req.SessionToken, - Principal: principalArn, - Name: "presigned-user", - Account: &AccountAdmin, + if req == nil || strings.TrimSpace(req.SessionToken) == "" { + return nil, fmt.Errorf("IAM authorization failed: session token is required") } - // Determine S3 action from method - action := determineS3ActionFromMethodAndPath(req.Method, req.Bucket, req.ObjectKey) - - // Check IAM permissions before generating URL authRequest := &http.Request{ Method: req.Method, URL: &url.URL{Path: "/" + req.Bucket + "/" + req.ObjectKey}, @@ -124,7 +113,16 @@ func (pm *S3PresignedURLManager) GeneratePresignedURLWithIAM(ctx context.Context authRequest.Header.Set("Authorization", "Bearer "+req.SessionToken) authRequest = authRequest.WithContext(ctx) - errCode := pm.s3iam.AuthorizeAction(ctx, iamIdentity, action, req.Bucket, req.ObjectKey, authRequest) + iamIdentity, errCode := pm.s3iam.AuthenticateJWT(ctx, authRequest) + if errCode != s3err.ErrNone { + return nil, fmt.Errorf("IAM authorization failed: invalid session token") + } + + // Determine S3 action from method + action := determineS3ActionFromMethodAndPath(req.Method, req.Bucket, req.ObjectKey) + + // Check IAM permissions before generating URL + errCode = pm.s3iam.AuthorizeAction(ctx, iamIdentity, action, req.Bucket, req.ObjectKey, authRequest) if errCode != s3err.ErrNone { return nil, fmt.Errorf("IAM authorization failed: user does not have permission for action %s on resource %s/%s", action, req.Bucket, req.ObjectKey) } diff --git a/weed/s3api/s3_presigned_url_iam_test.go b/weed/s3api/s3_presigned_url_iam_test.go index 2a2686f7b..5d50f06dc 100644 --- a/weed/s3api/s3_presigned_url_iam_test.go +++ b/weed/s3api/s3_presigned_url_iam_test.go @@ -220,6 +220,35 @@ func TestPresignedURLGeneration(t *testing.T) { } } +func TestPresignedURLGenerationUsesAuthenticatedPrincipal(t *testing.T) { + iamManager := setupTestIAMManagerForPresigned(t) + s3iam := NewS3IAMIntegration(iamManager, "localhost:8888") + s3iam.enabled = true + presignedManager := NewS3PresignedURLManager(s3iam) + + ctx := context.Background() + setupTestRolesForPresigned(ctx, iamManager) + + validJWTToken := createTestJWTPresigned(t, "https://test-issuer.com", "test-user-123", "test-signing-key") + + response, err := iamManager.AssumeRoleWithWebIdentity(ctx, &sts.AssumeRoleWithWebIdentityRequest{ + RoleArn: "arn:aws:iam::role/S3ReadOnlyRole", + WebIdentityToken: validJWTToken, + RoleSessionName: "presigned-read-only-session", + }) + require.NoError(t, err) + + _, err = presignedManager.GeneratePresignedURLWithIAM(ctx, &PresignedURLRequest{ + Method: "PUT", + Bucket: "test-bucket", + ObjectKey: "new-file.txt", + Expiration: time.Hour, + SessionToken: response.Credentials.SessionToken, + }, "http://localhost:8333") + require.Error(t, err) + assert.Contains(t, err.Error(), "IAM authorization failed") +} + // TestPresignedURLExpiration tests URL expiration validation func TestPresignedURLExpiration(t *testing.T) { tests := []struct { diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 2401901d9..c4cfd1bd9 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -88,20 +88,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques // Skip permission check if user is already the owner (optimization) if !isOwner { - hasPermission := false - // Check permissions for each bucket - // For JWT-authenticated users, use IAM authorization - sessionToken := r.Header.Get("X-SeaweedFS-Session-Token") - if s3a.iam.iamIntegration != nil && sessionToken != "" { - // Use IAM authorization for JWT users - errCode := s3a.iam.authorizeWithIAM(r, identity, s3_constants.ACTION_LIST, entry.Name, "") - hasPermission = (errCode == s3err.ErrNone) - } else { - // Use legacy authorization for non-JWT users - hasPermission = identity.CanDo(s3_constants.ACTION_LIST, entry.Name, "") - } - - if !hasPermission { + if errCode := s3a.iam.VerifyActionPermission(r, identity, s3_constants.ACTION_LIST, entry.Name, ""); errCode != s3err.ErrNone { continue } } diff --git a/weed/s3api/s3api_bucket_handlers_test.go b/weed/s3api/s3api_bucket_handlers_test.go index 3e73e4768..ee79381b3 100644 --- a/weed/s3api/s3api_bucket_handlers_test.go +++ b/weed/s3api/s3api_bucket_handlers_test.go @@ -1043,3 +1043,43 @@ func TestListBucketsIssue7796(t *testing.T) { "geoserver should NOT see buckets they neither own nor have permission for") }) } + +func TestListBucketsIssue8516PolicyBasedVisibility(t *testing.T) { + iam := &IdentityAccessManagement{} + require.NoError(t, iam.PutPolicy("listOnly", `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:ListBucket","Resource":"arn:aws:s3:::policy-bucket"}]}`)) + + identity := &Identity{ + Name: "policy-user", + Account: &AccountAdmin, + PolicyNames: []string{"listOnly"}, + } + + req := httptest.NewRequest("GET", "http://s3.amazonaws.com/", nil) + buckets := []*filer_pb.Entry{ + { + Name: "policy-bucket", + IsDirectory: true, + Extended: map[string][]byte{s3_constants.AmzIdentityId: []byte("admin")}, + Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()}, + }, + { + Name: "other-bucket", + IsDirectory: true, + Extended: map[string][]byte{s3_constants.AmzIdentityId: []byte("admin")}, + Attributes: &filer_pb.FuseAttributes{Crtime: time.Now().Unix()}, + }, + } + + var visibleBuckets []string + for _, entry := range buckets { + isOwner := isBucketOwnedByIdentity(entry, identity) + if !isOwner { + if errCode := iam.VerifyActionPermission(req, identity, s3_constants.ACTION_LIST, entry.Name, ""); errCode != s3err.ErrNone { + continue + } + } + visibleBuckets = append(visibleBuckets, entry.Name) + } + + assert.Equal(t, []string{"policy-bucket"}, visibleBuckets) +} diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 0b6dc160e..9296036af 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -1,6 +1,7 @@ package s3api import ( + "context" "errors" "fmt" "net/http" @@ -34,7 +35,7 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { // Use WithOneOfGrpcFilerClients to support multiple filers with failover err := pb.WithOneOfGrpcFilerClients(false, option.Filers, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile) + content, err := filer.ReadInsideFiler(context.Background(), client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile) if errors.Is(err, filer_pb.ErrNotFound) { return nil }