You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

544 lines
16 KiB

package integration
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/karlseguin/ccache/v2"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/iam/policy"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"google.golang.org/grpc"
)
// RoleStore defines the interface for storing IAM role definitions
type RoleStore interface {
// StoreRole stores a role definition (filerAddress ignored for memory stores)
StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error
// GetRole retrieves a role definition (filerAddress ignored for memory stores)
GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error)
// ListRoles lists all role names (filerAddress ignored for memory stores)
ListRoles(ctx context.Context, filerAddress string) ([]string, error)
// DeleteRole deletes a role definition (filerAddress ignored for memory stores)
DeleteRole(ctx context.Context, filerAddress string, roleName string) error
}
// MemoryRoleStore implements RoleStore using in-memory storage
type MemoryRoleStore struct {
roles map[string]*RoleDefinition
mutex sync.RWMutex
}
// NewMemoryRoleStore creates a new memory-based role store
func NewMemoryRoleStore() *MemoryRoleStore {
return &MemoryRoleStore{
roles: make(map[string]*RoleDefinition),
}
}
// StoreRole stores a role definition in memory (filerAddress ignored for memory store)
func (m *MemoryRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
if roleName == "" {
return fmt.Errorf("role name cannot be empty")
}
if role == nil {
return fmt.Errorf("role cannot be nil")
}
m.mutex.Lock()
defer m.mutex.Unlock()
// Deep copy the role to prevent external modifications
m.roles[roleName] = copyRoleDefinition(role)
return nil
}
// GetRole retrieves a role definition from memory (filerAddress ignored for memory store)
func (m *MemoryRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
if roleName == "" {
return nil, fmt.Errorf("role name cannot be empty")
}
m.mutex.RLock()
defer m.mutex.RUnlock()
role, exists := m.roles[roleName]
if !exists {
return nil, fmt.Errorf("role not found: %s", roleName)
}
// Return a copy to prevent external modifications
return copyRoleDefinition(role), nil
}
// ListRoles lists all role names in memory (filerAddress ignored for memory store)
func (m *MemoryRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
m.mutex.RLock()
defer m.mutex.RUnlock()
names := make([]string, 0, len(m.roles))
for name := range m.roles {
names = append(names, name)
}
return names, nil
}
// DeleteRole deletes a role definition from memory (filerAddress ignored for memory store)
func (m *MemoryRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
if roleName == "" {
return fmt.Errorf("role name cannot be empty")
}
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.roles, roleName)
return nil
}
// copyRoleDefinition creates a deep copy of a role definition
func copyRoleDefinition(original *RoleDefinition) *RoleDefinition {
if original == nil {
return nil
}
copied := &RoleDefinition{
RoleName: original.RoleName,
RoleArn: original.RoleArn,
Description: original.Description,
}
// Deep copy trust policy if it exists
if original.TrustPolicy != nil {
// Use JSON marshaling for deep copy of the complex policy structure
trustPolicyData, _ := json.Marshal(original.TrustPolicy)
var trustPolicyCopy policy.PolicyDocument
json.Unmarshal(trustPolicyData, &trustPolicyCopy)
copied.TrustPolicy = &trustPolicyCopy
}
// Copy attached policies slice
if original.AttachedPolicies != nil {
copied.AttachedPolicies = make([]string, len(original.AttachedPolicies))
copy(copied.AttachedPolicies, original.AttachedPolicies)
}
return copied
}
// FilerRoleStore implements RoleStore using SeaweedFS filer
type FilerRoleStore struct {
grpcDialOption grpc.DialOption
basePath string
filerAddressProvider func() string
}
// NewFilerRoleStore creates a new filer-based role store
func NewFilerRoleStore(config map[string]interface{}, filerAddressProvider func() string) (*FilerRoleStore, error) {
store := &FilerRoleStore{
basePath: "/etc/iam/roles", // Default path for role storage - aligned with /etc/ convention
filerAddressProvider: filerAddressProvider,
}
// Parse configuration - only basePath and other settings, NOT filerAddress
if config != nil {
if basePath, ok := config["basePath"].(string); ok && basePath != "" {
store.basePath = strings.TrimSuffix(basePath, "/")
}
}
glog.V(2).Infof("Initialized FilerRoleStore with basePath %s", store.basePath)
return store, nil
}
// StoreRole stores a role definition in filer
func (f *FilerRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
// Use provider function if filerAddress is not provided
if filerAddress == "" && f.filerAddressProvider != nil {
filerAddress = f.filerAddressProvider()
}
if filerAddress == "" {
return fmt.Errorf("filer address is required for FilerRoleStore")
}
if roleName == "" {
return fmt.Errorf("role name cannot be empty")
}
if role == nil {
return fmt.Errorf("role cannot be nil")
}
// Serialize role to JSON
roleData, err := json.MarshalIndent(role, "", " ")
if err != nil {
return fmt.Errorf("failed to serialize role: %v", err)
}
rolePath := f.getRolePath(roleName)
// Store in filer
return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: f.basePath,
Entry: &filer_pb.Entry{
Name: f.getRoleFileName(roleName),
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0600), // Read/write for owner only
Uid: uint32(0),
Gid: uint32(0),
},
Content: roleData,
},
}
glog.V(3).Infof("Storing role %s at %s", roleName, rolePath)
_, err := client.CreateEntry(ctx, request)
if err != nil {
return fmt.Errorf("failed to store role %s: %v", roleName, err)
}
return nil
})
}
// GetRole retrieves a role definition from filer
func (f *FilerRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
// Use provider function if filerAddress is not provided
if filerAddress == "" && f.filerAddressProvider != nil {
filerAddress = f.filerAddressProvider()
}
if filerAddress == "" {
return nil, fmt.Errorf("filer address is required for FilerRoleStore")
}
if roleName == "" {
return nil, fmt.Errorf("role name cannot be empty")
}
var roleData []byte
err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: f.basePath,
Name: f.getRoleFileName(roleName),
}
glog.V(3).Infof("Looking up role %s", roleName)
response, err := client.LookupDirectoryEntry(ctx, request)
if err != nil {
return fmt.Errorf("role not found: %v", err)
}
if response.Entry == nil {
return fmt.Errorf("role not found")
}
roleData = response.Entry.Content
return nil
})
if err != nil {
return nil, err
}
// Deserialize role from JSON
var role RoleDefinition
if err := json.Unmarshal(roleData, &role); err != nil {
return nil, fmt.Errorf("failed to deserialize role: %v", err)
}
return &role, nil
}
// ListRoles lists all role names in filer
func (f *FilerRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
// Use provider function if filerAddress is not provided
if filerAddress == "" && f.filerAddressProvider != nil {
filerAddress = f.filerAddressProvider()
}
if filerAddress == "" {
return nil, fmt.Errorf("filer address is required for FilerRoleStore")
}
var roleNames []string
err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.ListEntriesRequest{
Directory: f.basePath,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: false,
Limit: 1000, // Process in batches of 1000
}
glog.V(3).Infof("Listing roles in %s", f.basePath)
stream, err := client.ListEntries(ctx, request)
if err != nil {
return fmt.Errorf("failed to list roles: %v", err)
}
for {
resp, err := stream.Recv()
if err != nil {
break // End of stream or error
}
if resp.Entry == nil || resp.Entry.IsDirectory {
continue
}
// Extract role name from filename
filename := resp.Entry.Name
if strings.HasSuffix(filename, ".json") {
roleName := strings.TrimSuffix(filename, ".json")
roleNames = append(roleNames, roleName)
}
}
return nil
})
if err != nil {
return nil, err
}
return roleNames, nil
}
// DeleteRole deletes a role definition from filer
func (f *FilerRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
// Use provider function if filerAddress is not provided
if filerAddress == "" && f.filerAddressProvider != nil {
filerAddress = f.filerAddressProvider()
}
if filerAddress == "" {
return fmt.Errorf("filer address is required for FilerRoleStore")
}
if roleName == "" {
return fmt.Errorf("role name cannot be empty")
}
return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: f.basePath,
Name: f.getRoleFileName(roleName),
IsDeleteData: true,
}
glog.V(3).Infof("Deleting role %s", roleName)
resp, err := client.DeleteEntry(ctx, request)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil // Idempotent: deletion of non-existent role is successful
}
return fmt.Errorf("failed to delete role %s: %v", roleName, err)
}
if resp.Error != "" {
if strings.Contains(resp.Error, "not found") {
return nil // Idempotent: deletion of non-existent role is successful
}
return fmt.Errorf("failed to delete role %s: %s", roleName, resp.Error)
}
return nil
})
}
// Helper methods for FilerRoleStore
func (f *FilerRoleStore) getRoleFileName(roleName string) string {
return roleName + ".json"
}
func (f *FilerRoleStore) getRolePath(roleName string) string {
return f.basePath + "/" + f.getRoleFileName(roleName)
}
func (f *FilerRoleStore) withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
if filerAddress == "" {
return fmt.Errorf("filer address is required for FilerRoleStore")
}
return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddress), f.grpcDialOption, fn)
}
// CachedFilerRoleStore implements RoleStore with TTL caching on top of FilerRoleStore
type CachedFilerRoleStore struct {
filerStore *FilerRoleStore
cache *ccache.Cache
listCache *ccache.Cache
ttl time.Duration
listTTL time.Duration
}
// CachedFilerRoleStoreConfig holds configuration for the cached role store
type CachedFilerRoleStoreConfig struct {
BasePath string `json:"basePath,omitempty"`
TTL string `json:"ttl,omitempty"` // e.g., "5m", "1h"
ListTTL string `json:"listTtl,omitempty"` // e.g., "1m", "30s"
MaxCacheSize int `json:"maxCacheSize,omitempty"` // Maximum number of cached roles
}
// NewCachedFilerRoleStore creates a new cached filer-based role store
func NewCachedFilerRoleStore(config map[string]interface{}) (*CachedFilerRoleStore, error) {
// Create underlying filer store
filerStore, err := NewFilerRoleStore(config, nil)
if err != nil {
return nil, fmt.Errorf("failed to create filer role store: %w", err)
}
// Parse cache configuration with defaults
cacheTTL := 5 * time.Minute // Default 5 minutes for role cache
listTTL := 1 * time.Minute // Default 1 minute for list cache
maxCacheSize := 1000 // Default max 1000 cached roles
if config != nil {
if ttlStr, ok := config["ttl"].(string); ok && ttlStr != "" {
if parsed, err := time.ParseDuration(ttlStr); err == nil {
cacheTTL = parsed
}
}
if listTTLStr, ok := config["listTtl"].(string); ok && listTTLStr != "" {
if parsed, err := time.ParseDuration(listTTLStr); err == nil {
listTTL = parsed
}
}
if maxSize, ok := config["maxCacheSize"].(int); ok && maxSize > 0 {
maxCacheSize = maxSize
}
}
// Create ccache instances with appropriate configurations
pruneCount := int64(maxCacheSize) >> 3
if pruneCount <= 0 {
pruneCount = 100
}
store := &CachedFilerRoleStore{
filerStore: filerStore,
cache: ccache.New(ccache.Configure().MaxSize(int64(maxCacheSize)).ItemsToPrune(uint32(pruneCount))),
listCache: ccache.New(ccache.Configure().MaxSize(100).ItemsToPrune(10)), // Smaller cache for lists
ttl: cacheTTL,
listTTL: listTTL,
}
glog.V(2).Infof("Initialized CachedFilerRoleStore with TTL %v, List TTL %v, Max Cache Size %d",
cacheTTL, listTTL, maxCacheSize)
return store, nil
}
// StoreRole stores a role definition and invalidates the cache
func (c *CachedFilerRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
// Store in filer
err := c.filerStore.StoreRole(ctx, filerAddress, roleName, role)
if err != nil {
return err
}
// Invalidate cache entries
c.cache.Delete(roleName)
c.listCache.Clear() // Invalidate list cache
glog.V(3).Infof("Stored and invalidated cache for role %s", roleName)
return nil
}
// GetRole retrieves a role definition with caching
func (c *CachedFilerRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
// Try to get from cache first
item := c.cache.Get(roleName)
if item != nil {
// Cache hit - return cached role (DO NOT extend TTL)
role := item.Value().(*RoleDefinition)
glog.V(4).Infof("Cache hit for role %s", roleName)
return copyRoleDefinition(role), nil
}
// Cache miss - fetch from filer
glog.V(4).Infof("Cache miss for role %s, fetching from filer", roleName)
role, err := c.filerStore.GetRole(ctx, filerAddress, roleName)
if err != nil {
return nil, err
}
// Cache the result with TTL
c.cache.Set(roleName, copyRoleDefinition(role), c.ttl)
glog.V(3).Infof("Cached role %s with TTL %v", roleName, c.ttl)
return role, nil
}
// ListRoles lists all role names with caching
func (c *CachedFilerRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
// Use a constant key for the role list cache
const listCacheKey = "role_list"
// Try to get from list cache first
item := c.listCache.Get(listCacheKey)
if item != nil {
// Cache hit - return cached list (DO NOT extend TTL)
roles := item.Value().([]string)
glog.V(4).Infof("List cache hit, returning %d roles", len(roles))
return append([]string(nil), roles...), nil // Return a copy
}
// Cache miss - fetch from filer
glog.V(4).Infof("List cache miss, fetching from filer")
roles, err := c.filerStore.ListRoles(ctx, filerAddress)
if err != nil {
return nil, err
}
// Cache the result with TTL (store a copy)
rolesCopy := append([]string(nil), roles...)
c.listCache.Set(listCacheKey, rolesCopy, c.listTTL)
glog.V(3).Infof("Cached role list with %d entries, TTL %v", len(roles), c.listTTL)
return roles, nil
}
// DeleteRole deletes a role definition and invalidates the cache
func (c *CachedFilerRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
// Delete from filer
err := c.filerStore.DeleteRole(ctx, filerAddress, roleName)
if err != nil {
return err
}
// Invalidate cache entries
c.cache.Delete(roleName)
c.listCache.Clear() // Invalidate list cache
glog.V(3).Infof("Deleted and invalidated cache for role %s", roleName)
return nil
}
// ClearCache clears all cached entries (for testing or manual cache invalidation)
func (c *CachedFilerRoleStore) ClearCache() {
c.cache.Clear()
c.listCache.Clear()
glog.V(2).Infof("Cleared all role cache entries")
}
// GetCacheStats returns cache statistics
func (c *CachedFilerRoleStore) GetCacheStats() map[string]interface{} {
return map[string]interface{}{
"roleCache": map[string]interface{}{
"size": c.cache.ItemCount(),
"ttl": c.ttl.String(),
},
"listCache": map[string]interface{}{
"size": c.listCache.ItemCount(),
"ttl": c.listTTL.String(),
},
}
}