Browse Source

iam: add group CRUD to CredentialStore interface and all backends

Add group management methods (CreateGroup, GetGroup, DeleteGroup,
ListGroups, UpdateGroup) to the CredentialStore interface with
implementations for memory, filer_etc, postgres, and grpc stores.
Wire group loading/saving into filer_etc LoadConfiguration and
SaveConfiguration.
pull/8560/head
Chris Lu 2 days ago
parent
commit
6d8e36a415
  1. 10
      weed/credential/credential_store.go
  2. 158
      weed/credential/filer_etc/filer_etc_group.go
  3. 43
      weed/credential/filer_etc/filer_etc_identity.go
  4. 75
      weed/credential/grpc/grpc_group.go
  5. 62
      weed/credential/memory/memory_group.go
  6. 4
      weed/credential/memory/memory_store.go
  7. 116
      weed/credential/postgres/postgres_group.go
  8. 16
      weed/credential/postgres/postgres_store.go

10
weed/credential/credential_store.go

@ -18,6 +18,9 @@ var (
ErrPolicyNotFound = errors.New("policy not found")
ErrPolicyAlreadyAttached = errors.New("policy already attached")
ErrPolicyNotAttached = errors.New("policy not attached to user")
ErrGroupNotFound = errors.New("group not found")
ErrGroupAlreadyExists = errors.New("group already exists")
ErrUserNotInGroup = errors.New("user is not a member of the group")
)
// CredentialStoreTypeName represents the type name of a credential store
@ -94,6 +97,13 @@ type CredentialStore interface {
// ListAttachedUserPolicies returns the list of policy names attached to a user
ListAttachedUserPolicies(ctx context.Context, username string) ([]string, error)
// Group Management
CreateGroup(ctx context.Context, group *iam_pb.Group) error
GetGroup(ctx context.Context, groupName string) (*iam_pb.Group, error)
DeleteGroup(ctx context.Context, groupName string) error
ListGroups(ctx context.Context) ([]string, error)
UpdateGroup(ctx context.Context, group *iam_pb.Group) error
// Shutdown performs cleanup when the store is being shut down
Shutdown()
}

158
weed/credential/filer_etc/filer_etc_group.go

@ -0,0 +1,158 @@
package filer_etc
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
const IamGroupsDirectory = "groups"
func (store *FilerEtcStore) loadGroupsFromMultiFile(ctx context.Context, s3cfg *iam_pb.S3ApiConfiguration) error {
return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir := filer.IamConfigDirectory + "/" + IamGroupsDirectory
entries, err := listEntries(ctx, client, dir)
if err != nil {
if err == filer_pb.ErrNotFound {
return nil
}
return err
}
for _, entry := range entries {
if entry.IsDirectory {
continue
}
var content []byte
if len(entry.Content) > 0 {
content = entry.Content
} else {
c, err := filer.ReadInsideFiler(ctx, client, dir, entry.Name)
if err != nil {
glog.Warningf("Failed to read group file %s: %v", entry.Name, err)
continue
}
content = c
}
if len(content) > 0 {
g := &iam_pb.Group{}
if err := json.Unmarshal(content, g); err != nil {
glog.Warningf("Failed to unmarshal group %s: %v", entry.Name, err)
continue
}
s3cfg.Groups = append(s3cfg.Groups, g)
}
}
return nil
})
}
func (store *FilerEtcStore) saveGroup(ctx context.Context, group *iam_pb.Group) error {
if group == nil {
return fmt.Errorf("group is nil")
}
return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
data, err := json.MarshalIndent(group, "", " ")
if err != nil {
return err
}
return filer.SaveInsideFiler(client, filer.IamConfigDirectory+"/"+IamGroupsDirectory, group.Name+".json", data)
})
}
func (store *FilerEtcStore) deleteGroupFile(ctx context.Context, groupName string) error {
return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
Directory: filer.IamConfigDirectory + "/" + IamGroupsDirectory,
Name: groupName + ".json",
})
if err != nil {
if strings.Contains(err.Error(), filer_pb.ErrNotFound.Error()) {
return credential.ErrGroupNotFound
}
return err
}
if resp != nil && resp.Error != "" {
if strings.Contains(resp.Error, filer_pb.ErrNotFound.Error()) {
return credential.ErrGroupNotFound
}
return fmt.Errorf("delete group %s: %s", groupName, resp.Error)
}
return nil
})
}
func (store *FilerEtcStore) CreateGroup(ctx context.Context, group *iam_pb.Group) error {
existing, err := store.GetGroup(ctx, group.Name)
if err != nil {
if !errors.Is(err, credential.ErrGroupNotFound) {
return err
}
} else if existing != nil {
return credential.ErrGroupAlreadyExists
}
return store.saveGroup(ctx, group)
}
func (store *FilerEtcStore) GetGroup(ctx context.Context, groupName string) (*iam_pb.Group, error) {
var group *iam_pb.Group
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
data, err := filer.ReadInsideFiler(ctx, client, filer.IamConfigDirectory+"/"+IamGroupsDirectory, groupName+".json")
if err != nil {
if err == filer_pb.ErrNotFound {
return credential.ErrGroupNotFound
}
return err
}
if len(data) == 0 {
return credential.ErrGroupNotFound
}
group = &iam_pb.Group{}
return json.Unmarshal(data, group)
})
return group, err
}
func (store *FilerEtcStore) DeleteGroup(ctx context.Context, groupName string) error {
if _, err := store.GetGroup(ctx, groupName); err != nil {
return err
}
return store.deleteGroupFile(ctx, groupName)
}
func (store *FilerEtcStore) ListGroups(ctx context.Context) ([]string, error) {
var names []string
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
entries, err := listEntries(ctx, client, filer.IamConfigDirectory+"/"+IamGroupsDirectory)
if err != nil {
if err == filer_pb.ErrNotFound {
return nil
}
return err
}
for _, entry := range entries {
if !entry.IsDirectory && strings.HasSuffix(entry.Name, ".json") {
names = append(names, strings.TrimSuffix(entry.Name, ".json"))
}
}
return nil
})
return names, err
}
func (store *FilerEtcStore) UpdateGroup(ctx context.Context, group *iam_pb.Group) error {
if _, err := store.GetGroup(ctx, group.Name); err != nil {
return err
}
return store.saveGroup(ctx, group)
}

43
weed/credential/filer_etc/filer_etc_identity.go

@ -45,6 +45,11 @@ func (store *FilerEtcStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3Ap
return s3cfg, fmt.Errorf("failed to load service accounts: %w", err)
}
// 3b. Load groups
if err := store.loadGroupsFromMultiFile(ctx, s3cfg); err != nil {
return s3cfg, fmt.Errorf("failed to load groups: %w", err)
}
// 4. Perform migration if we loaded legacy config
// This ensures that all identities (including legacy ones) are written to individual files
// and the legacy file is renamed.
@ -171,6 +176,13 @@ func (store *FilerEtcStore) SaveConfiguration(ctx context.Context, config *iam_p
}
}
// 2b. Save all groups
for _, g := range config.Groups {
if err := store.saveGroup(ctx, g); err != nil {
return err
}
}
// 3. Cleanup removed identities (Full Sync)
if err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir := filer.IamConfigDirectory + "/" + IamIdentitiesDirectory
@ -234,6 +246,37 @@ func (store *FilerEtcStore) SaveConfiguration(ctx context.Context, config *iam_p
return err
}
// 5. Cleanup removed groups (Full Sync)
if err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir := filer.IamConfigDirectory + "/" + IamGroupsDirectory
entries, err := listEntries(ctx, client, dir)
if err != nil {
if err == filer_pb.ErrNotFound {
return nil
}
return err
}
validNames := make(map[string]bool)
for _, g := range config.Groups {
validNames[g.Name+".json"] = true
}
for _, entry := range entries {
if !entry.IsDirectory && !validNames[entry.Name] {
if _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
Directory: dir,
Name: entry.Name,
}); err != nil {
glog.Warningf("Failed to delete obsolete group file %s: %v", entry.Name, err)
}
}
}
return nil
}); err != nil {
return err
}
return nil
}

75
weed/credential/grpc/grpc_group.go

@ -0,0 +1,75 @@
package grpc
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
func (store *IamGrpcStore) CreateGroup(ctx context.Context, group *iam_pb.Group) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return err
}
for _, g := range config.Groups {
if g.Name == group.Name {
return credential.ErrGroupAlreadyExists
}
}
config.Groups = append(config.Groups, group)
return store.SaveConfiguration(ctx, config)
}
func (store *IamGrpcStore) GetGroup(ctx context.Context, groupName string) (*iam_pb.Group, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, err
}
for _, g := range config.Groups {
if g.Name == groupName {
return g, nil
}
}
return nil, credential.ErrGroupNotFound
}
func (store *IamGrpcStore) DeleteGroup(ctx context.Context, groupName string) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return err
}
for i, g := range config.Groups {
if g.Name == groupName {
config.Groups = append(config.Groups[:i], config.Groups[i+1:]...)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrGroupNotFound
}
func (store *IamGrpcStore) ListGroups(ctx context.Context) ([]string, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, err
}
var names []string
for _, g := range config.Groups {
names = append(names, g.Name)
}
return names, nil
}
func (store *IamGrpcStore) UpdateGroup(ctx context.Context, group *iam_pb.Group) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return err
}
for i, g := range config.Groups {
if g.Name == group.Name {
config.Groups[i] = group
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrGroupNotFound
}

62
weed/credential/memory/memory_group.go

@ -0,0 +1,62 @@
package memory
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
func (store *MemoryStore) CreateGroup(ctx context.Context, group *iam_pb.Group) error {
store.mu.Lock()
defer store.mu.Unlock()
if _, exists := store.groups[group.Name]; exists {
return credential.ErrGroupAlreadyExists
}
store.groups[group.Name] = group
return nil
}
func (store *MemoryStore) GetGroup(ctx context.Context, groupName string) (*iam_pb.Group, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if g, exists := store.groups[groupName]; exists {
return g, nil
}
return nil, credential.ErrGroupNotFound
}
func (store *MemoryStore) DeleteGroup(ctx context.Context, groupName string) error {
store.mu.Lock()
defer store.mu.Unlock()
if _, exists := store.groups[groupName]; !exists {
return credential.ErrGroupNotFound
}
delete(store.groups, groupName)
return nil
}
func (store *MemoryStore) ListGroups(ctx context.Context) ([]string, error) {
store.mu.RLock()
defer store.mu.RUnlock()
var names []string
for name := range store.groups {
names = append(names, name)
}
return names, nil
}
func (store *MemoryStore) UpdateGroup(ctx context.Context, group *iam_pb.Group) error {
store.mu.Lock()
defer store.mu.Unlock()
if _, exists := store.groups[group.Name]; !exists {
return credential.ErrGroupNotFound
}
store.groups[group.Name] = group
return nil
}

4
weed/credential/memory/memory_store.go

@ -23,6 +23,7 @@ type MemoryStore struct {
serviceAccounts map[string]*iam_pb.ServiceAccount // id -> service_account
serviceAccountAccessKeys map[string]string // access_key -> id
policies map[string]policy_engine.PolicyDocument // policy_name -> policy_document
groups map[string]*iam_pb.Group // group_name -> group
initialized bool
}
@ -43,6 +44,7 @@ func (store *MemoryStore) Initialize(configuration util.Configuration, prefix st
store.serviceAccounts = make(map[string]*iam_pb.ServiceAccount)
store.serviceAccountAccessKeys = make(map[string]string)
store.policies = make(map[string]policy_engine.PolicyDocument)
store.groups = make(map[string]*iam_pb.Group)
store.initialized = true
return nil
@ -57,6 +59,7 @@ func (store *MemoryStore) Shutdown() {
store.serviceAccounts = nil
store.serviceAccountAccessKeys = nil
store.policies = nil
store.groups = nil
store.initialized = false
}
@ -71,6 +74,7 @@ func (store *MemoryStore) Reset() {
store.serviceAccounts = make(map[string]*iam_pb.ServiceAccount)
store.serviceAccountAccessKeys = make(map[string]string)
store.policies = make(map[string]policy_engine.PolicyDocument)
store.groups = make(map[string]*iam_pb.Group)
}
}

116
weed/credential/postgres/postgres_group.go

@ -0,0 +1,116 @@
package postgres
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
func (store *PostgresStore) CreateGroup(ctx context.Context, group *iam_pb.Group) error {
membersJSON, err := json.Marshal(group.Members)
if err != nil {
return fmt.Errorf("failed to marshal members: %w", err)
}
policyNamesJSON, err := json.Marshal(group.PolicyNames)
if err != nil {
return fmt.Errorf("failed to marshal policy_names: %w", err)
}
_, err = store.db.ExecContext(ctx,
`INSERT INTO groups (name, members, policy_names, disabled) VALUES ($1, $2, $3, $4)`,
group.Name, membersJSON, policyNamesJSON, group.Disabled)
if err != nil {
// Check for unique constraint violation
return fmt.Errorf("failed to create group: %w", err)
}
return nil
}
func (store *PostgresStore) GetGroup(ctx context.Context, groupName string) (*iam_pb.Group, error) {
var membersJSON, policyNamesJSON []byte
var disabled bool
err := store.db.QueryRowContext(ctx,
`SELECT members, policy_names, disabled FROM groups WHERE name = $1`, groupName).
Scan(&membersJSON, &policyNamesJSON, &disabled)
if err != nil {
if err == sql.ErrNoRows {
return nil, credential.ErrGroupNotFound
}
return nil, fmt.Errorf("failed to get group: %w", err)
}
group := &iam_pb.Group{
Name: groupName,
Disabled: disabled,
}
if err := json.Unmarshal(membersJSON, &group.Members); err != nil {
return nil, fmt.Errorf("failed to unmarshal members: %w", err)
}
if err := json.Unmarshal(policyNamesJSON, &group.PolicyNames); err != nil {
return nil, fmt.Errorf("failed to unmarshal policy_names: %w", err)
}
return group, nil
}
func (store *PostgresStore) DeleteGroup(ctx context.Context, groupName string) error {
result, err := store.db.ExecContext(ctx, `DELETE FROM groups WHERE name = $1`, groupName)
if err != nil {
return fmt.Errorf("failed to delete group: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rows == 0 {
return credential.ErrGroupNotFound
}
return nil
}
func (store *PostgresStore) ListGroups(ctx context.Context) ([]string, error) {
rows, err := store.db.QueryContext(ctx, `SELECT name FROM groups ORDER BY name`)
if err != nil {
return nil, fmt.Errorf("failed to list groups: %w", err)
}
defer rows.Close()
var names []string
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return nil, fmt.Errorf("failed to scan group name: %w", err)
}
names = append(names, name)
}
return names, rows.Err()
}
func (store *PostgresStore) UpdateGroup(ctx context.Context, group *iam_pb.Group) error {
membersJSON, err := json.Marshal(group.Members)
if err != nil {
return fmt.Errorf("failed to marshal members: %w", err)
}
policyNamesJSON, err := json.Marshal(group.PolicyNames)
if err != nil {
return fmt.Errorf("failed to marshal policy_names: %w", err)
}
result, err := store.db.ExecContext(ctx,
`UPDATE groups SET members = $1, policy_names = $2, disabled = $3, updated_at = CURRENT_TIMESTAMP WHERE name = $4`,
membersJSON, policyNamesJSON, group.Disabled, group.Name)
if err != nil {
return fmt.Errorf("failed to update group: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rows == 0 {
return credential.ErrGroupNotFound
}
return nil
}

16
weed/credential/postgres/postgres_store.go

@ -140,6 +140,18 @@ func (store *PostgresStore) createTables() error {
);
`
// Create groups table
groupsTable := `
CREATE TABLE IF NOT EXISTS groups (
name VARCHAR(255) PRIMARY KEY,
members JSONB DEFAULT '[]',
policy_names JSONB DEFAULT '[]',
disabled BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`
// Execute table creation
if _, err := store.db.Exec(usersTable); err != nil {
return fmt.Errorf("failed to create users table: %w", err)
@ -162,6 +174,10 @@ func (store *PostgresStore) createTables() error {
return fmt.Errorf("failed to create service_accounts table: %w", err)
}
if _, err := store.db.Exec(groupsTable); err != nil {
return fmt.Errorf("failed to create groups table: %w", err)
}
return nil
}

Loading…
Cancel
Save