You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
544 lines
16 KiB
544 lines
16 KiB
package integration
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/karlseguin/ccache/v2"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/iam/policy"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
)
|
|
|
|
// RoleStore defines the interface for storing IAM role definitions
|
|
type RoleStore interface {
|
|
// StoreRole stores a role definition (filerAddress ignored for memory stores)
|
|
StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error
|
|
|
|
// GetRole retrieves a role definition (filerAddress ignored for memory stores)
|
|
GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error)
|
|
|
|
// ListRoles lists all role names (filerAddress ignored for memory stores)
|
|
ListRoles(ctx context.Context, filerAddress string) ([]string, error)
|
|
|
|
// DeleteRole deletes a role definition (filerAddress ignored for memory stores)
|
|
DeleteRole(ctx context.Context, filerAddress string, roleName string) error
|
|
}
|
|
|
|
// MemoryRoleStore implements RoleStore using in-memory storage
|
|
type MemoryRoleStore struct {
|
|
roles map[string]*RoleDefinition
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewMemoryRoleStore creates a new memory-based role store
|
|
func NewMemoryRoleStore() *MemoryRoleStore {
|
|
return &MemoryRoleStore{
|
|
roles: make(map[string]*RoleDefinition),
|
|
}
|
|
}
|
|
|
|
// StoreRole stores a role definition in memory (filerAddress ignored for memory store)
|
|
func (m *MemoryRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
|
|
if roleName == "" {
|
|
return fmt.Errorf("role name cannot be empty")
|
|
}
|
|
if role == nil {
|
|
return fmt.Errorf("role cannot be nil")
|
|
}
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
// Deep copy the role to prevent external modifications
|
|
m.roles[roleName] = copyRoleDefinition(role)
|
|
return nil
|
|
}
|
|
|
|
// GetRole retrieves a role definition from memory (filerAddress ignored for memory store)
|
|
func (m *MemoryRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
|
|
if roleName == "" {
|
|
return nil, fmt.Errorf("role name cannot be empty")
|
|
}
|
|
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
role, exists := m.roles[roleName]
|
|
if !exists {
|
|
return nil, fmt.Errorf("role not found: %s", roleName)
|
|
}
|
|
|
|
// Return a copy to prevent external modifications
|
|
return copyRoleDefinition(role), nil
|
|
}
|
|
|
|
// ListRoles lists all role names in memory (filerAddress ignored for memory store)
|
|
func (m *MemoryRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
|
|
m.mutex.RLock()
|
|
defer m.mutex.RUnlock()
|
|
|
|
names := make([]string, 0, len(m.roles))
|
|
for name := range m.roles {
|
|
names = append(names, name)
|
|
}
|
|
|
|
return names, nil
|
|
}
|
|
|
|
// DeleteRole deletes a role definition from memory (filerAddress ignored for memory store)
|
|
func (m *MemoryRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
|
|
if roleName == "" {
|
|
return fmt.Errorf("role name cannot be empty")
|
|
}
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
delete(m.roles, roleName)
|
|
return nil
|
|
}
|
|
|
|
// copyRoleDefinition creates a deep copy of a role definition
|
|
func copyRoleDefinition(original *RoleDefinition) *RoleDefinition {
|
|
if original == nil {
|
|
return nil
|
|
}
|
|
|
|
copied := &RoleDefinition{
|
|
RoleName: original.RoleName,
|
|
RoleArn: original.RoleArn,
|
|
Description: original.Description,
|
|
}
|
|
|
|
// Deep copy trust policy if it exists
|
|
if original.TrustPolicy != nil {
|
|
// Use JSON marshaling for deep copy of the complex policy structure
|
|
trustPolicyData, _ := json.Marshal(original.TrustPolicy)
|
|
var trustPolicyCopy policy.PolicyDocument
|
|
json.Unmarshal(trustPolicyData, &trustPolicyCopy)
|
|
copied.TrustPolicy = &trustPolicyCopy
|
|
}
|
|
|
|
// Copy attached policies slice
|
|
if original.AttachedPolicies != nil {
|
|
copied.AttachedPolicies = make([]string, len(original.AttachedPolicies))
|
|
copy(copied.AttachedPolicies, original.AttachedPolicies)
|
|
}
|
|
|
|
return copied
|
|
}
|
|
|
|
// FilerRoleStore implements RoleStore using SeaweedFS filer
|
|
type FilerRoleStore struct {
|
|
grpcDialOption grpc.DialOption
|
|
basePath string
|
|
filerAddressProvider func() string
|
|
}
|
|
|
|
// NewFilerRoleStore creates a new filer-based role store
|
|
func NewFilerRoleStore(config map[string]interface{}, filerAddressProvider func() string) (*FilerRoleStore, error) {
|
|
store := &FilerRoleStore{
|
|
basePath: "/etc/iam/roles", // Default path for role storage - aligned with /etc/ convention
|
|
filerAddressProvider: filerAddressProvider,
|
|
}
|
|
|
|
// Parse configuration - only basePath and other settings, NOT filerAddress
|
|
if config != nil {
|
|
if basePath, ok := config["basePath"].(string); ok && basePath != "" {
|
|
store.basePath = strings.TrimSuffix(basePath, "/")
|
|
}
|
|
}
|
|
|
|
glog.V(2).Infof("Initialized FilerRoleStore with basePath %s", store.basePath)
|
|
|
|
return store, nil
|
|
}
|
|
|
|
// StoreRole stores a role definition in filer
|
|
func (f *FilerRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
|
|
// Use provider function if filerAddress is not provided
|
|
if filerAddress == "" && f.filerAddressProvider != nil {
|
|
filerAddress = f.filerAddressProvider()
|
|
}
|
|
if filerAddress == "" {
|
|
return fmt.Errorf("filer address is required for FilerRoleStore")
|
|
}
|
|
if roleName == "" {
|
|
return fmt.Errorf("role name cannot be empty")
|
|
}
|
|
if role == nil {
|
|
return fmt.Errorf("role cannot be nil")
|
|
}
|
|
|
|
// Serialize role to JSON
|
|
roleData, err := json.MarshalIndent(role, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to serialize role: %v", err)
|
|
}
|
|
|
|
rolePath := f.getRolePath(roleName)
|
|
|
|
// Store in filer
|
|
return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
|
|
request := &filer_pb.CreateEntryRequest{
|
|
Directory: f.basePath,
|
|
Entry: &filer_pb.Entry{
|
|
Name: f.getRoleFileName(roleName),
|
|
IsDirectory: false,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
Mtime: time.Now().Unix(),
|
|
Crtime: time.Now().Unix(),
|
|
FileMode: uint32(0600), // Read/write for owner only
|
|
Uid: uint32(0),
|
|
Gid: uint32(0),
|
|
},
|
|
Content: roleData,
|
|
},
|
|
}
|
|
|
|
glog.V(3).Infof("Storing role %s at %s", roleName, rolePath)
|
|
_, err := client.CreateEntry(ctx, request)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to store role %s: %v", roleName, err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// GetRole retrieves a role definition from filer
|
|
func (f *FilerRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
|
|
// Use provider function if filerAddress is not provided
|
|
if filerAddress == "" && f.filerAddressProvider != nil {
|
|
filerAddress = f.filerAddressProvider()
|
|
}
|
|
if filerAddress == "" {
|
|
return nil, fmt.Errorf("filer address is required for FilerRoleStore")
|
|
}
|
|
if roleName == "" {
|
|
return nil, fmt.Errorf("role name cannot be empty")
|
|
}
|
|
|
|
var roleData []byte
|
|
err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
|
|
request := &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: f.basePath,
|
|
Name: f.getRoleFileName(roleName),
|
|
}
|
|
|
|
glog.V(3).Infof("Looking up role %s", roleName)
|
|
response, err := client.LookupDirectoryEntry(ctx, request)
|
|
if err != nil {
|
|
return fmt.Errorf("role not found: %v", err)
|
|
}
|
|
|
|
if response.Entry == nil {
|
|
return fmt.Errorf("role not found")
|
|
}
|
|
|
|
roleData = response.Entry.Content
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Deserialize role from JSON
|
|
var role RoleDefinition
|
|
if err := json.Unmarshal(roleData, &role); err != nil {
|
|
return nil, fmt.Errorf("failed to deserialize role: %v", err)
|
|
}
|
|
|
|
return &role, nil
|
|
}
|
|
|
|
// ListRoles lists all role names in filer
|
|
func (f *FilerRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
|
|
// Use provider function if filerAddress is not provided
|
|
if filerAddress == "" && f.filerAddressProvider != nil {
|
|
filerAddress = f.filerAddressProvider()
|
|
}
|
|
if filerAddress == "" {
|
|
return nil, fmt.Errorf("filer address is required for FilerRoleStore")
|
|
}
|
|
|
|
var roleNames []string
|
|
|
|
err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
|
|
request := &filer_pb.ListEntriesRequest{
|
|
Directory: f.basePath,
|
|
Prefix: "",
|
|
StartFromFileName: "",
|
|
InclusiveStartFrom: false,
|
|
Limit: 1000, // Process in batches of 1000
|
|
}
|
|
|
|
glog.V(3).Infof("Listing roles in %s", f.basePath)
|
|
stream, err := client.ListEntries(ctx, request)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list roles: %v", err)
|
|
}
|
|
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
break // End of stream or error
|
|
}
|
|
|
|
if resp.Entry == nil || resp.Entry.IsDirectory {
|
|
continue
|
|
}
|
|
|
|
// Extract role name from filename
|
|
filename := resp.Entry.Name
|
|
if strings.HasSuffix(filename, ".json") {
|
|
roleName := strings.TrimSuffix(filename, ".json")
|
|
roleNames = append(roleNames, roleName)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return roleNames, nil
|
|
}
|
|
|
|
// DeleteRole deletes a role definition from filer
|
|
func (f *FilerRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
|
|
// Use provider function if filerAddress is not provided
|
|
if filerAddress == "" && f.filerAddressProvider != nil {
|
|
filerAddress = f.filerAddressProvider()
|
|
}
|
|
if filerAddress == "" {
|
|
return fmt.Errorf("filer address is required for FilerRoleStore")
|
|
}
|
|
if roleName == "" {
|
|
return fmt.Errorf("role name cannot be empty")
|
|
}
|
|
|
|
return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
|
|
request := &filer_pb.DeleteEntryRequest{
|
|
Directory: f.basePath,
|
|
Name: f.getRoleFileName(roleName),
|
|
IsDeleteData: true,
|
|
}
|
|
|
|
glog.V(3).Infof("Deleting role %s", roleName)
|
|
resp, err := client.DeleteEntry(ctx, request)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "not found") {
|
|
return nil // Idempotent: deletion of non-existent role is successful
|
|
}
|
|
return fmt.Errorf("failed to delete role %s: %v", roleName, err)
|
|
}
|
|
|
|
if resp.Error != "" {
|
|
if strings.Contains(resp.Error, "not found") {
|
|
return nil // Idempotent: deletion of non-existent role is successful
|
|
}
|
|
return fmt.Errorf("failed to delete role %s: %s", roleName, resp.Error)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Helper methods for FilerRoleStore
|
|
|
|
func (f *FilerRoleStore) getRoleFileName(roleName string) string {
|
|
return roleName + ".json"
|
|
}
|
|
|
|
func (f *FilerRoleStore) getRolePath(roleName string) string {
|
|
return f.basePath + "/" + f.getRoleFileName(roleName)
|
|
}
|
|
|
|
func (f *FilerRoleStore) withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
|
|
if filerAddress == "" {
|
|
return fmt.Errorf("filer address is required for FilerRoleStore")
|
|
}
|
|
return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddress), f.grpcDialOption, fn)
|
|
}
|
|
|
|
// CachedFilerRoleStore implements RoleStore with TTL caching on top of FilerRoleStore
|
|
type CachedFilerRoleStore struct {
|
|
filerStore *FilerRoleStore
|
|
cache *ccache.Cache
|
|
listCache *ccache.Cache
|
|
ttl time.Duration
|
|
listTTL time.Duration
|
|
}
|
|
|
|
// CachedFilerRoleStoreConfig describes the settings of a cached role store in
// JSON form.
// NOTE(review): the constructor visible in this file reads a generic map
// rather than this struct — confirm where the struct is consumed.
type CachedFilerRoleStoreConfig struct {
	// BasePath is the filer directory holding role files.
	BasePath string `json:"basePath,omitempty"`
	// TTL is the role cache lifetime as a duration string, e.g. "5m", "1h".
	TTL string `json:"ttl,omitempty"`
	// ListTTL is the list cache lifetime as a duration string, e.g. "1m", "30s".
	ListTTL string `json:"listTtl,omitempty"`
	// MaxCacheSize is the maximum number of cached roles.
	MaxCacheSize int `json:"maxCacheSize,omitempty"`
}
|
|
|
|
// NewCachedFilerRoleStore creates a new cached filer-based role store
|
|
func NewCachedFilerRoleStore(config map[string]interface{}) (*CachedFilerRoleStore, error) {
|
|
// Create underlying filer store
|
|
filerStore, err := NewFilerRoleStore(config, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create filer role store: %w", err)
|
|
}
|
|
|
|
// Parse cache configuration with defaults
|
|
cacheTTL := 5 * time.Minute // Default 5 minutes for role cache
|
|
listTTL := 1 * time.Minute // Default 1 minute for list cache
|
|
maxCacheSize := 1000 // Default max 1000 cached roles
|
|
|
|
if config != nil {
|
|
if ttlStr, ok := config["ttl"].(string); ok && ttlStr != "" {
|
|
if parsed, err := time.ParseDuration(ttlStr); err == nil {
|
|
cacheTTL = parsed
|
|
}
|
|
}
|
|
if listTTLStr, ok := config["listTtl"].(string); ok && listTTLStr != "" {
|
|
if parsed, err := time.ParseDuration(listTTLStr); err == nil {
|
|
listTTL = parsed
|
|
}
|
|
}
|
|
if maxSize, ok := config["maxCacheSize"].(int); ok && maxSize > 0 {
|
|
maxCacheSize = maxSize
|
|
}
|
|
}
|
|
|
|
// Create ccache instances with appropriate configurations
|
|
pruneCount := int64(maxCacheSize) >> 3
|
|
if pruneCount <= 0 {
|
|
pruneCount = 100
|
|
}
|
|
|
|
store := &CachedFilerRoleStore{
|
|
filerStore: filerStore,
|
|
cache: ccache.New(ccache.Configure().MaxSize(int64(maxCacheSize)).ItemsToPrune(uint32(pruneCount))),
|
|
listCache: ccache.New(ccache.Configure().MaxSize(100).ItemsToPrune(10)), // Smaller cache for lists
|
|
ttl: cacheTTL,
|
|
listTTL: listTTL,
|
|
}
|
|
|
|
glog.V(2).Infof("Initialized CachedFilerRoleStore with TTL %v, List TTL %v, Max Cache Size %d",
|
|
cacheTTL, listTTL, maxCacheSize)
|
|
|
|
return store, nil
|
|
}
|
|
|
|
// StoreRole stores a role definition and invalidates the cache
|
|
func (c *CachedFilerRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
|
|
// Store in filer
|
|
err := c.filerStore.StoreRole(ctx, filerAddress, roleName, role)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Invalidate cache entries
|
|
c.cache.Delete(roleName)
|
|
c.listCache.Clear() // Invalidate list cache
|
|
|
|
glog.V(3).Infof("Stored and invalidated cache for role %s", roleName)
|
|
return nil
|
|
}
|
|
|
|
// GetRole retrieves a role definition with caching
|
|
func (c *CachedFilerRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
|
|
// Try to get from cache first
|
|
item := c.cache.Get(roleName)
|
|
if item != nil {
|
|
// Cache hit - return cached role (DO NOT extend TTL)
|
|
role := item.Value().(*RoleDefinition)
|
|
glog.V(4).Infof("Cache hit for role %s", roleName)
|
|
return copyRoleDefinition(role), nil
|
|
}
|
|
|
|
// Cache miss - fetch from filer
|
|
glog.V(4).Infof("Cache miss for role %s, fetching from filer", roleName)
|
|
role, err := c.filerStore.GetRole(ctx, filerAddress, roleName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Cache the result with TTL
|
|
c.cache.Set(roleName, copyRoleDefinition(role), c.ttl)
|
|
glog.V(3).Infof("Cached role %s with TTL %v", roleName, c.ttl)
|
|
return role, nil
|
|
}
|
|
|
|
// ListRoles lists all role names with caching
|
|
func (c *CachedFilerRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
|
|
// Use a constant key for the role list cache
|
|
const listCacheKey = "role_list"
|
|
|
|
// Try to get from list cache first
|
|
item := c.listCache.Get(listCacheKey)
|
|
if item != nil {
|
|
// Cache hit - return cached list (DO NOT extend TTL)
|
|
roles := item.Value().([]string)
|
|
glog.V(4).Infof("List cache hit, returning %d roles", len(roles))
|
|
return append([]string(nil), roles...), nil // Return a copy
|
|
}
|
|
|
|
// Cache miss - fetch from filer
|
|
glog.V(4).Infof("List cache miss, fetching from filer")
|
|
roles, err := c.filerStore.ListRoles(ctx, filerAddress)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Cache the result with TTL (store a copy)
|
|
rolesCopy := append([]string(nil), roles...)
|
|
c.listCache.Set(listCacheKey, rolesCopy, c.listTTL)
|
|
glog.V(3).Infof("Cached role list with %d entries, TTL %v", len(roles), c.listTTL)
|
|
return roles, nil
|
|
}
|
|
|
|
// DeleteRole deletes a role definition and invalidates the cache
|
|
func (c *CachedFilerRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
|
|
// Delete from filer
|
|
err := c.filerStore.DeleteRole(ctx, filerAddress, roleName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Invalidate cache entries
|
|
c.cache.Delete(roleName)
|
|
c.listCache.Clear() // Invalidate list cache
|
|
|
|
glog.V(3).Infof("Deleted and invalidated cache for role %s", roleName)
|
|
return nil
|
|
}
|
|
|
|
// ClearCache clears all cached entries (for testing or manual cache invalidation)
|
|
func (c *CachedFilerRoleStore) ClearCache() {
|
|
c.cache.Clear()
|
|
c.listCache.Clear()
|
|
glog.V(2).Infof("Cleared all role cache entries")
|
|
}
|
|
|
|
// GetCacheStats returns cache statistics
|
|
func (c *CachedFilerRoleStore) GetCacheStats() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"roleCache": map[string]interface{}{
|
|
"size": c.cache.ItemCount(),
|
|
"ttl": c.ttl.String(),
|
|
},
|
|
"listCache": map[string]interface{}{
|
|
"size": c.listCache.ItemCount(),
|
|
"ttl": c.listTTL.String(),
|
|
},
|
|
}
|
|
}
|