Browse Source

Add credential storage (#6938)

* add credential store interface

* load credential.toml

* lint

* create credentialManager with explicit store type

* add type name

* InitializeCredentialManager

* remove unused functions

* fix missing import

* fix import

* fix nil configuration
testing-sdx-generation
Chris Lu 3 months ago
committed by GitHub
parent
commit
1db7c2b8aa
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 48
      weed/admin/dash/admin_server.go
  2. 319
      weed/admin/dash/user_management.go
  3. 9
      weed/command/iam.go
  4. 3
      weed/command/s3.go
  5. 11
      weed/command/scaffold.go
  6. 55
      weed/command/scaffold/credential.toml
  7. 3
      weed/command/scaffold/example.go
  8. 182
      weed/credential/README.md
  9. 133
      weed/credential/config_loader.go
  10. 125
      weed/credential/credential_manager.go
  11. 91
      weed/credential/credential_store.go
  12. 353
      weed/credential/credential_test.go
  13. 235
      weed/credential/filer_etc/filer_etc_store.go
  14. 373
      weed/credential/memory/memory_store.go
  15. 315
      weed/credential/memory/memory_store_test.go
  16. 221
      weed/credential/migration.go
  17. 570
      weed/credential/postgres/postgres_store.go
  18. 557
      weed/credential/sqlite/sqlite_store.go
  19. 122
      weed/credential/test/integration_test.go
  20. 59
      weed/iamapi/iamapi_server.go
  21. 62
      weed/s3api/auth_credentials.go
  22. 8
      weed/s3api/s3api_put_object_helper_test.go
  23. 18
      weed/s3api/s3api_server.go

48
weed/admin/dash/admin_server.go

@ -9,6 +9,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
@ -34,6 +35,9 @@ type AdminServer struct {
cachedFilers []string
lastFilerUpdate time.Time
filerCacheExpiration time.Duration
// Credential management
credentialManager *credential.CredentialManager
}
type ClusterTopology struct {
@ -195,13 +199,55 @@ type ClusterFilersData struct {
}
func NewAdminServer(masterAddress string, templateFS http.FileSystem) *AdminServer {
return &AdminServer{
server := &AdminServer{
masterAddress: masterAddress,
templateFS: templateFS,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
cacheExpiration: 10 * time.Second,
filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds
}
// Initialize credential manager with defaults
credentialManager, err := credential.NewCredentialManagerWithDefaults("")
if err != nil {
glog.Warningf("Failed to initialize credential manager: %v", err)
// Continue without credential manager - will fall back to legacy approach
} else {
// For stores that need filer client details, set them
if store := credentialManager.GetStore(); store != nil {
if filerClientSetter, ok := store.(interface {
SetFilerClient(string, grpc.DialOption)
}); ok {
// We'll set the filer client later when we discover filers
// For now, just store the credential manager
server.credentialManager = credentialManager
// Set up a goroutine to set filer client once we discover filers
go func() {
for {
filerAddr := server.GetFilerAddress()
if filerAddr != "" {
filerClientSetter.SetFilerClient(filerAddr, server.grpcDialOption)
glog.V(1).Infof("Set filer client for credential manager: %s", filerAddr)
break
}
time.Sleep(5 * time.Second) // Retry every 5 seconds
}
}()
} else {
server.credentialManager = credentialManager
}
} else {
server.credentialManager = credentialManager
}
}
return server
}
// GetCredentialManager returns the credential manager
func (s *AdminServer) GetCredentialManager() *credential.CredentialManager {
return s.credentialManager
}
// GetFilerAddress returns a filer address, discovering from masters if needed

319
weed/admin/dash/user_management.go

@ -1,45 +1,23 @@
package dash
import (
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
// CreateObjectStoreUser creates a new user in identity.json
// CreateObjectStoreUser creates a new user using the credential manager
func (s *AdminServer) CreateObjectStoreUser(req CreateUserRequest) (*ObjectStoreUser, error) {
s3cfg := &iam_pb.S3ApiConfiguration{}
// Load existing configuration
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
if err != filer_pb.ErrNotFound {
return err
}
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to load IAM configuration: %v", err)
if s.credentialManager == nil {
return nil, fmt.Errorf("credential manager not available")
}
// Check if user already exists
for _, identity := range s3cfg.Identities {
if identity.Name == req.Username {
return nil, fmt.Errorf("user %s already exists", req.Username)
}
}
ctx := context.Background()
// Create new identity
newIdentity := &iam_pb.Identity{
@ -69,13 +47,13 @@ func (s *AdminServer) CreateObjectStoreUser(req CreateUserRequest) (*ObjectStore
}
}
// Add to configuration
s3cfg.Identities = append(s3cfg.Identities, newIdentity)
// Save configuration
err = s.saveS3Configuration(s3cfg)
// Create user using credential manager
err := s.credentialManager.CreateUser(ctx, newIdentity)
if err != nil {
return nil, fmt.Errorf("failed to save IAM configuration: %v", err)
if err == credential.ErrUserAlreadyExists {
return nil, fmt.Errorf("user %s already exists", req.Username)
}
return nil, fmt.Errorf("failed to create user: %v", err)
}
// Return created user
@ -92,35 +70,27 @@ func (s *AdminServer) CreateObjectStoreUser(req CreateUserRequest) (*ObjectStore
// UpdateObjectStoreUser updates an existing user
func (s *AdminServer) UpdateObjectStoreUser(username string, req UpdateUserRequest) (*ObjectStoreUser, error) {
s3cfg := &iam_pb.S3ApiConfiguration{}
// Load existing configuration
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err
if s.credentialManager == nil {
return nil, fmt.Errorf("credential manager not available")
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to load IAM configuration: %v", err)
}
ctx := context.Background()
// Find and update user
var updatedIdentity *iam_pb.Identity
for _, identity := range s3cfg.Identities {
if identity.Name == username {
updatedIdentity = identity
break
// Get existing user
identity, err := s.credentialManager.GetUser(ctx, username)
if err != nil {
if err == credential.ErrUserNotFound {
return nil, fmt.Errorf("user %s not found", username)
}
return nil, fmt.Errorf("failed to get user: %v", err)
}
if updatedIdentity == nil {
return nil, fmt.Errorf("user %s not found", username)
// Create updated identity
updatedIdentity := &iam_pb.Identity{
Name: identity.Name,
Account: identity.Account,
Credentials: identity.Credentials,
Actions: identity.Actions,
}
// Update actions if provided
@ -139,10 +109,10 @@ func (s *AdminServer) UpdateObjectStoreUser(username string, req UpdateUserReque
updatedIdentity.Account.EmailAddress = req.Email
}
// Save configuration
err = s.saveS3Configuration(s3cfg)
// Update user using credential manager
err = s.credentialManager.UpdateUser(ctx, username, updatedIdentity)
if err != nil {
return nil, fmt.Errorf("failed to save IAM configuration: %v", err)
return nil, fmt.Errorf("failed to update user: %v", err)
}
// Return updated user
@ -161,67 +131,43 @@ func (s *AdminServer) UpdateObjectStoreUser(username string, req UpdateUserReque
return user, nil
}
// DeleteObjectStoreUser deletes a user from identity.json
// DeleteObjectStoreUser deletes a user using the credential manager
func (s *AdminServer) DeleteObjectStoreUser(username string) error {
s3cfg := &iam_pb.S3ApiConfiguration{}
// Load existing configuration
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
}
return nil
})
if err != nil {
return fmt.Errorf("failed to load IAM configuration: %v", err)
if s.credentialManager == nil {
return fmt.Errorf("credential manager not available")
}
// Find and remove user
found := false
for i, identity := range s3cfg.Identities {
if identity.Name == username {
s3cfg.Identities = append(s3cfg.Identities[:i], s3cfg.Identities[i+1:]...)
found = true
break
}
}
ctx := context.Background()
if !found {
// Delete user using credential manager
err := s.credentialManager.DeleteUser(ctx, username)
if err != nil {
if err == credential.ErrUserNotFound {
return fmt.Errorf("user %s not found", username)
}
return fmt.Errorf("failed to delete user: %v", err)
}
// Save configuration
return s.saveS3Configuration(s3cfg)
return nil
}
// GetObjectStoreUserDetails returns detailed information about a user
func (s *AdminServer) GetObjectStoreUserDetails(username string) (*UserDetails, error) {
s3cfg := &iam_pb.S3ApiConfiguration{}
// Load existing configuration
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
if s.credentialManager == nil {
return nil, fmt.Errorf("credential manager not available")
}
return nil
})
ctx := context.Background()
// Get user using credential manager
identity, err := s.credentialManager.GetUser(ctx, username)
if err != nil {
return nil, fmt.Errorf("failed to load IAM configuration: %v", err)
if err == credential.ErrUserNotFound {
return nil, fmt.Errorf("user %s not found", username)
}
return nil, fmt.Errorf("failed to get user: %v", err)
}
// Find user
for _, identity := range s3cfg.Identities {
if identity.Name == username {
details := &UserDetails{
Username: username,
Actions: identity.Actions,
@ -243,60 +189,37 @@ func (s *AdminServer) GetObjectStoreUserDetails(username string) (*UserDetails,
return details, nil
}
}
return nil, fmt.Errorf("user %s not found", username)
}
// CreateAccessKey creates a new access key for a user
func (s *AdminServer) CreateAccessKey(username string) (*AccessKeyInfo, error) {
s3cfg := &iam_pb.S3ApiConfiguration{}
// Load existing configuration
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to load IAM configuration: %v", err)
if s.credentialManager == nil {
return nil, fmt.Errorf("credential manager not available")
}
// Find user
var targetIdentity *iam_pb.Identity
for _, identity := range s3cfg.Identities {
if identity.Name == username {
targetIdentity = identity
break
}
}
ctx := context.Background()
if targetIdentity == nil {
// Check if user exists
_, err := s.credentialManager.GetUser(ctx, username)
if err != nil {
if err == credential.ErrUserNotFound {
return nil, fmt.Errorf("user %s not found", username)
}
return nil, fmt.Errorf("failed to get user: %v", err)
}
// Generate new access key
accessKey := generateAccessKey()
secretKey := generateSecretKey()
newCredential := &iam_pb.Credential{
credential := &iam_pb.Credential{
AccessKey: accessKey,
SecretKey: secretKey,
}
// Add to user's credentials
targetIdentity.Credentials = append(targetIdentity.Credentials, newCredential)
// Save configuration
err = s.saveS3Configuration(s3cfg)
// Create access key using credential manager
err = s.credentialManager.CreateAccessKey(ctx, username, credential)
if err != nil {
return nil, fmt.Errorf("failed to save IAM configuration: %v", err)
return nil, fmt.Errorf("failed to create access key: %v", err)
}
return &AccessKeyInfo{
@ -308,111 +231,79 @@ func (s *AdminServer) CreateAccessKey(username string) (*AccessKeyInfo, error) {
// DeleteAccessKey deletes an access key for a user
func (s *AdminServer) DeleteAccessKey(username, accessKeyId string) error {
s3cfg := &iam_pb.S3ApiConfiguration{}
// Load existing configuration
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
if s.credentialManager == nil {
return fmt.Errorf("credential manager not available")
}
return nil
})
if err != nil {
return fmt.Errorf("failed to load IAM configuration: %v", err)
}
ctx := context.Background()
// Find user and remove access key
for _, identity := range s3cfg.Identities {
if identity.Name == username {
for i, cred := range identity.Credentials {
if cred.AccessKey == accessKeyId {
identity.Credentials = append(identity.Credentials[:i], identity.Credentials[i+1:]...)
return s.saveS3Configuration(s3cfg)
}
// Delete access key using credential manager
err := s.credentialManager.DeleteAccessKey(ctx, username, accessKeyId)
if err != nil {
if err == credential.ErrUserNotFound {
return fmt.Errorf("user %s not found", username)
}
if err == credential.ErrAccessKeyNotFound {
return fmt.Errorf("access key %s not found for user %s", accessKeyId, username)
}
return fmt.Errorf("failed to delete access key: %v", err)
}
return fmt.Errorf("user %s not found", username)
return nil
}
// GetUserPolicies returns the policies for a user (actions)
func (s *AdminServer) GetUserPolicies(username string) ([]string, error) {
s3cfg := &iam_pb.S3ApiConfiguration{}
// Load existing configuration
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
if s.credentialManager == nil {
return nil, fmt.Errorf("credential manager not available")
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to load IAM configuration: %v", err)
}
ctx := context.Background()
// Find user and return policies
for _, identity := range s3cfg.Identities {
if identity.Name == username {
return identity.Actions, nil
// Get user using credential manager
identity, err := s.credentialManager.GetUser(ctx, username)
if err != nil {
if err == credential.ErrUserNotFound {
return nil, fmt.Errorf("user %s not found", username)
}
return nil, fmt.Errorf("failed to get user: %v", err)
}
return nil, fmt.Errorf("user %s not found", username)
return identity.Actions, nil
}
// UpdateUserPolicies updates the policies (actions) for a user
func (s *AdminServer) UpdateUserPolicies(username string, actions []string) error {
s3cfg := &iam_pb.S3ApiConfiguration{}
// Load existing configuration
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err
if s.credentialManager == nil {
return fmt.Errorf("credential manager not available")
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
}
return nil
})
if err != nil {
return fmt.Errorf("failed to load IAM configuration: %v", err)
}
ctx := context.Background()
// Find user and update policies
for _, identity := range s3cfg.Identities {
if identity.Name == username {
identity.Actions = actions
return s.saveS3Configuration(s3cfg)
// Get existing user
identity, err := s.credentialManager.GetUser(ctx, username)
if err != nil {
if err == credential.ErrUserNotFound {
return fmt.Errorf("user %s not found", username)
}
return fmt.Errorf("failed to get user: %v", err)
}
return fmt.Errorf("user %s not found", username)
// Create updated identity with new actions
updatedIdentity := &iam_pb.Identity{
Name: identity.Name,
Account: identity.Account,
Credentials: identity.Credentials,
Actions: actions,
}
// saveS3Configuration saves the S3 configuration to identity.json
func (s *AdminServer) saveS3Configuration(s3cfg *iam_pb.S3ApiConfiguration) error {
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ProtoToText(&buf, s3cfg); err != nil {
return fmt.Errorf("failed to marshal configuration: %v", err)
// Update user using credential manager
err = s.credentialManager.UpdateUser(ctx, username, updatedIdentity)
if err != nil {
return fmt.Errorf("failed to update user policies: %v", err)
}
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes())
})
return nil
}
// Helper functions for generating keys and IDs

9
weed/command/iam.go

@ -3,9 +3,10 @@ package command
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"net/http"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"time"
"github.com/gorilla/mux"
@ -15,6 +16,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
// Import credential stores to register them
_ "github.com/seaweedfs/seaweedfs/weed/credential/filer_etc"
_ "github.com/seaweedfs/seaweedfs/weed/credential/memory"
_ "github.com/seaweedfs/seaweedfs/weed/credential/postgres"
_ "github.com/seaweedfs/seaweedfs/weed/credential/sqlite"
)
var (

3
weed/command/s3.go

@ -5,7 +5,6 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"io/ioutil"
"net"
"net/http"
@ -14,6 +13,8 @@ import (
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"google.golang.org/grpc/credentials/tls/certprovider"
"google.golang.org/grpc/credentials/tls/certprovider/pemfile"

11
weed/command/scaffold.go

@ -2,9 +2,10 @@ package command
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"path/filepath"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/command/scaffold"
)
@ -13,9 +14,9 @@ func init() {
}
var cmdScaffold = &Command{
UsageLine: "scaffold -config=[filer|notification|replication|security|master]",
UsageLine: "scaffold -config=[filer|notification|replication|security|master|shell|credential]",
Short: "generate basic configuration files",
Long: `Generate filer.toml with all possible configurations for you to customize.
Long: `Generate configuration files with all possible configurations for you to customize.
The options can also be overwritten by environment variables.
For example, the filer.toml mysql password can be overwritten by environment variable
@ -30,7 +31,7 @@ var cmdScaffold = &Command{
var (
outputPath = cmdScaffold.Flag.String("output", "", "if not empty, save the configuration file to this directory")
config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security|master] the configuration file to generate")
config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security|master|shell|credential] the configuration file to generate")
)
func runScaffold(cmd *Command, args []string) bool {
@ -49,6 +50,8 @@ func runScaffold(cmd *Command, args []string) bool {
content = scaffold.Master
case "shell":
content = scaffold.Shell
case "credential":
content = scaffold.Credential
}
if content == "" {
println("need a valid -config option")

55
weed/command/scaffold/credential.toml

@ -0,0 +1,55 @@
# Put this file to one of the location, with descending priority
# ./credential.toml
# $HOME/.seaweedfs/credential.toml
# /etc/seaweedfs/credential.toml
# this file is read by S3 API and IAM API servers
# Choose one of the credential stores below
# Only one store can be enabled at a time
# Filer-based credential store (default, uses existing filer storage)
[credential.filer_etc]
enabled = true
# filer address and grpc_dial_option will be automatically configured by the server
# SQLite credential store (recommended for single-node deployments)
[credential.sqlite]
enabled = false
file = "/var/lib/seaweedfs/credentials.db"
# Optional: table name prefix (default: "sw_")
table_prefix = "sw_"
# PostgreSQL credential store (recommended for multi-node deployments)
[credential.postgres]
enabled = false
hostname = "localhost"
port = 5432
username = "seaweedfs"
password = "your_password"
database = "seaweedfs"
schema = "public"
sslmode = "disable"
# Optional: table name prefix (default: "sw_")
table_prefix = "sw_"
# Connection pool settings
connection_max_idle = 10
connection_max_open = 100
connection_max_lifetime_seconds = 3600
# Memory credential store (for testing only, data is lost on restart)
[credential.memory]
enabled = false
# Environment variable overrides:
# Any configuration value can be overridden by environment variables
# Rules:
# * Prefix with "WEED_CREDENTIAL_"
# * Convert to uppercase
# * Replace '.' with '_'
#
# Examples:
# export WEED_CREDENTIAL_POSTGRES_PASSWORD=secret
# export WEED_CREDENTIAL_SQLITE_FILE=/custom/path/credentials.db
# export WEED_CREDENTIAL_POSTGRES_HOSTNAME=db.example.com
# export WEED_CREDENTIAL_FILER_ETC_ENABLED=true
# export WEED_CREDENTIAL_SQLITE_ENABLED=false

3
weed/command/scaffold/example.go

@ -19,3 +19,6 @@ var Master string
//go:embed shell.toml
var Shell string
//go:embed credential.toml
var Credential string

182
weed/credential/README.md

@ -0,0 +1,182 @@
# Credential Store Integration
This document shows how the credential store has been integrated into SeaweedFS's S3 API and IAM API components.
## Quick Start
1. **Generate credential configuration:**
```bash
weed scaffold -config=credential -output=.
```
2. **Edit credential.toml** to enable your preferred store (filer_etc is enabled by default)
3. **Start S3 API server** - it will automatically load credential.toml:
```bash
weed s3 -filer=localhost:8888
```
## Integration Overview
The credential store provides a pluggable backend for storing S3 identities and credentials, supporting:
- **Filer-based storage** (filer_etc) - Uses existing filer storage (default)
- **SQLite** - Local database storage
- **PostgreSQL** - Shared database for multiple servers
- **Memory** - In-memory storage for testing
## Configuration
### Using credential.toml
Generate the configuration template:
```bash
weed scaffold -config=credential
```
This creates a `credential.toml` file with all available options. The filer_etc store is enabled by default:
```toml
# Filer-based credential store (default, uses existing filer storage)
[credential.filer_etc]
enabled = true
# SQLite credential store (recommended for single-node deployments)
[credential.sqlite]
enabled = false
file = "/var/lib/seaweedfs/credentials.db"
# PostgreSQL credential store (recommended for multi-node deployments)
[credential.postgres]
enabled = false
hostname = "localhost"
port = 5432
username = "seaweedfs"
password = "your_password"
database = "seaweedfs"
# Memory credential store (for testing only, data is lost on restart)
[credential.memory]
enabled = false
```
The credential.toml file is automatically loaded from these locations (in priority order):
- `./credential.toml`
- `$HOME/.seaweedfs/credential.toml`
- `/etc/seaweedfs/credential.toml`
### Server Configuration
Both S3 API and IAM API servers automatically load credential.toml during startup. No additional configuration is required.
## Usage Examples
### Filer-based Store (Default)
```toml
[credential.filer_etc]
enabled = true
```
This uses the existing filer storage and is compatible with current deployments.
### SQLite Store
```toml
[credential.sqlite]
enabled = true
file = "/var/lib/seaweedfs/credentials.db"
table_prefix = "sw_"
```
### PostgreSQL Store
```toml
[credential.postgres]
enabled = true
hostname = "localhost"
port = 5432
username = "seaweedfs"
password = "your_password"
database = "seaweedfs"
schema = "public"
sslmode = "disable"
table_prefix = "sw_"
connection_max_idle = 10
connection_max_open = 100
connection_max_lifetime_seconds = 3600
```
### Memory Store (Testing)
```toml
[credential.memory]
enabled = true
```
## Environment Variables
All credential configuration can be overridden with environment variables:
```bash
# Override PostgreSQL password
export WEED_CREDENTIAL_POSTGRES_PASSWORD=secret
# Override SQLite file path
export WEED_CREDENTIAL_SQLITE_FILE=/custom/path/credentials.db
# Override PostgreSQL hostname
export WEED_CREDENTIAL_POSTGRES_HOSTNAME=db.example.com
# Enable/disable stores
export WEED_CREDENTIAL_FILER_ETC_ENABLED=true
export WEED_CREDENTIAL_SQLITE_ENABLED=false
```
Rules:
- Prefix with `WEED_CREDENTIAL_`
- Convert to uppercase
- Replace `.` with `_`
## Implementation Details
Components automatically load credential configuration during startup:
```go
// Server initialization
if credConfig, err := credential.LoadCredentialConfiguration(); err == nil && credConfig != nil {
credentialManager, err := credential.NewCredentialManager(
credConfig.Store,
credConfig.Config,
credConfig.Prefix,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize credential manager: %v", err)
}
// Use credential manager for operations
}
```
## Benefits
1. **Easy Configuration** - Generate template with `weed scaffold -config=credential`
2. **Pluggable Storage** - Switch between filer_etc, SQLite, PostgreSQL without code changes
3. **Backward Compatibility** - Filer-based storage works with existing deployments
4. **Scalability** - Database stores support multiple concurrent servers
5. **Performance** - Database access can be faster than file-based storage
6. **Testing** - Memory store simplifies unit testing
7. **Environment Override** - All settings can be overridden with environment variables
## Error Handling
When a credential store is configured, it must initialize successfully or the server will fail to start:
```go
if credConfig != nil {
credentialManager, err = credential.NewCredentialManager(...)
if err != nil {
return nil, fmt.Errorf("failed to initialize credential manager: %v", err)
}
}
```
This ensures explicit configuration - if you configure a credential store, it must work properly.

133
weed/credential/config_loader.go

@ -0,0 +1,133 @@
package credential
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// CredentialConfig represents the credential configuration from credential.toml
type CredentialConfig struct {
Store string
Config util.Configuration
Prefix string
}
// LoadCredentialConfiguration loads credential configuration from credential.toml
// Returns the store type, configuration, and prefix for credential management
func LoadCredentialConfiguration() (*CredentialConfig, error) {
// Try to load credential.toml configuration
loaded := util.LoadConfiguration("credential", false)
if !loaded {
glog.V(1).Info("No credential.toml found, credential store disabled")
return nil, nil
}
viper := util.GetViper()
// Find which credential store is enabled
var enabledStore string
var storePrefix string
// Get available store types from registered stores
storeTypes := GetAvailableStores()
for _, storeType := range storeTypes {
key := fmt.Sprintf("credential.%s.enabled", string(storeType))
if viper.GetBool(key) {
if enabledStore != "" {
return nil, fmt.Errorf("multiple credential stores enabled: %s and %s. Only one store can be enabled", enabledStore, string(storeType))
}
enabledStore = string(storeType)
storePrefix = fmt.Sprintf("credential.%s.", string(storeType))
}
}
if enabledStore == "" {
glog.V(1).Info("No credential store enabled in credential.toml")
return nil, nil
}
glog.V(0).Infof("Loaded credential configuration: store=%s", enabledStore)
return &CredentialConfig{
Store: enabledStore,
Config: viper,
Prefix: storePrefix,
}, nil
}
// GetCredentialStoreConfig extracts credential store configuration from command line flags
// This is used when credential store is configured via command line instead of credential.toml
func GetCredentialStoreConfig(store string, config util.Configuration, prefix string) *CredentialConfig {
if store == "" {
return nil
}
return &CredentialConfig{
Store: store,
Config: config,
Prefix: prefix,
}
}
// MergeCredentialConfig merges command line credential config with credential.toml config
// Command line flags take priority over credential.toml
func MergeCredentialConfig(cmdLineStore string, cmdLineConfig util.Configuration, cmdLinePrefix string) (*CredentialConfig, error) {
// If command line credential store is specified, use it
if cmdLineStore != "" {
glog.V(0).Infof("Using command line credential configuration: store=%s", cmdLineStore)
return GetCredentialStoreConfig(cmdLineStore, cmdLineConfig, cmdLinePrefix), nil
}
// Otherwise, try to load from credential.toml
config, err := LoadCredentialConfiguration()
if err != nil {
return nil, err
}
if config == nil {
glog.V(1).Info("No credential store configured")
}
return config, nil
}
// NewCredentialManagerWithDefaults creates a credential manager with fallback to defaults
// If explicitStore is provided, it will be used regardless of credential.toml
// If explicitStore is empty, it tries credential.toml first, then defaults to "filer_etc"
func NewCredentialManagerWithDefaults(explicitStore CredentialStoreTypeName) (*CredentialManager, error) {
var storeName CredentialStoreTypeName
var config util.Configuration
var prefix string
// If explicit store is provided, use it
if explicitStore != "" {
storeName = explicitStore
config = nil
prefix = ""
glog.V(0).Infof("Using explicit credential store: %s", storeName)
} else {
// Try to load from credential.toml first
if credConfig, err := LoadCredentialConfiguration(); err == nil && credConfig != nil {
storeName = CredentialStoreTypeName(credConfig.Store)
config = credConfig.Config
prefix = credConfig.Prefix
glog.V(0).Infof("Loaded credential configuration from credential.toml: store=%s", storeName)
} else {
// Default to filer_etc store
storeName = StoreTypeFilerEtc
config = nil
prefix = ""
glog.V(1).Info("No credential.toml found, defaulting to filer_etc store")
}
}
// Create the credential manager
credentialManager, err := NewCredentialManager(storeName, config, prefix)
if err != nil {
return nil, fmt.Errorf("failed to initialize credential manager with store '%s': %v", storeName, err)
}
return credentialManager, nil
}

125
weed/credential/credential_manager.go

@ -0,0 +1,125 @@
package credential
import (
"context"
"fmt"
"strings"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// CredentialManager manages user credentials using a configurable store
type CredentialManager struct {
store CredentialStore
}
// NewCredentialManager creates a new credential manager with the specified store
func NewCredentialManager(storeName CredentialStoreTypeName, configuration util.Configuration, prefix string) (*CredentialManager, error) {
var store CredentialStore
// Find the requested store implementation
for _, s := range Stores {
if s.GetName() == storeName {
store = s
break
}
}
if store == nil {
return nil, fmt.Errorf("credential store '%s' not found. Available stores: %s",
storeName, getAvailableStores())
}
// Initialize the store
if err := store.Initialize(configuration, prefix); err != nil {
return nil, fmt.Errorf("failed to initialize credential store '%s': %v", storeName, err)
}
return &CredentialManager{
store: store,
}, nil
}
// GetStore returns the underlying credential store
func (cm *CredentialManager) GetStore() CredentialStore {
return cm.store
}
// LoadConfiguration loads the S3 API configuration
func (cm *CredentialManager) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
return cm.store.LoadConfiguration(ctx)
}
// SaveConfiguration saves the S3 API configuration
func (cm *CredentialManager) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
return cm.store.SaveConfiguration(ctx, config)
}
// CreateUser creates a new user
func (cm *CredentialManager) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
return cm.store.CreateUser(ctx, identity)
}
// GetUser retrieves a user by username
func (cm *CredentialManager) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
return cm.store.GetUser(ctx, username)
}
// UpdateUser updates an existing user
func (cm *CredentialManager) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
return cm.store.UpdateUser(ctx, username, identity)
}
// DeleteUser removes a user
func (cm *CredentialManager) DeleteUser(ctx context.Context, username string) error {
return cm.store.DeleteUser(ctx, username)
}
// ListUsers returns all usernames
func (cm *CredentialManager) ListUsers(ctx context.Context) ([]string, error) {
return cm.store.ListUsers(ctx)
}
// GetUserByAccessKey retrieves a user by access key
func (cm *CredentialManager) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
return cm.store.GetUserByAccessKey(ctx, accessKey)
}
// CreateAccessKey creates a new access key for a user
func (cm *CredentialManager) CreateAccessKey(ctx context.Context, username string, credential *iam_pb.Credential) error {
return cm.store.CreateAccessKey(ctx, username, credential)
}
// DeleteAccessKey removes an access key for a user
func (cm *CredentialManager) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
return cm.store.DeleteAccessKey(ctx, username, accessKey)
}
// Shutdown performs cleanup
func (cm *CredentialManager) Shutdown() {
if cm.store != nil {
cm.store.Shutdown()
}
}
// getAvailableStores returns a comma-separated list of available store names
func getAvailableStores() string {
var storeNames []string
for _, store := range Stores {
storeNames = append(storeNames, string(store.GetName()))
}
return strings.Join(storeNames, ", ")
}
// GetAvailableStores returns a list of available credential store names
func GetAvailableStores() []CredentialStoreTypeName {
var storeNames []CredentialStoreTypeName
for _, store := range Stores {
storeNames = append(storeNames, store.GetName())
}
if storeNames == nil {
return []CredentialStoreTypeName{}
}
return storeNames
}

91
weed/credential/credential_store.go

@ -0,0 +1,91 @@
package credential
import (
"context"
"errors"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
var (
ErrUserNotFound = errors.New("user not found")
ErrUserAlreadyExists = errors.New("user already exists")
ErrAccessKeyNotFound = errors.New("access key not found")
)
// CredentialStoreTypeName represents the type name of a credential store
type CredentialStoreTypeName string
// Credential store name constants
const (
StoreTypeMemory CredentialStoreTypeName = "memory"
StoreTypeFilerEtc CredentialStoreTypeName = "filer_etc"
StoreTypePostgres CredentialStoreTypeName = "postgres"
StoreTypeSQLite CredentialStoreTypeName = "sqlite"
)
// CredentialStore defines the interface for user credential storage and retrieval
type CredentialStore interface {
// GetName returns the name of the credential store implementation
GetName() CredentialStoreTypeName
// Initialize initializes the credential store with configuration
Initialize(configuration util.Configuration, prefix string) error
// LoadConfiguration loads the entire S3 API configuration
LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error)
// SaveConfiguration saves the entire S3 API configuration
SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error
// CreateUser creates a new user with the given identity
CreateUser(ctx context.Context, identity *iam_pb.Identity) error
// GetUser retrieves a user by username
GetUser(ctx context.Context, username string) (*iam_pb.Identity, error)
// UpdateUser updates an existing user
UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error
// DeleteUser removes a user by username
DeleteUser(ctx context.Context, username string) error
// ListUsers returns all usernames
ListUsers(ctx context.Context) ([]string, error)
// GetUserByAccessKey retrieves a user by access key
GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error)
// CreateAccessKey creates a new access key for a user
CreateAccessKey(ctx context.Context, username string, credential *iam_pb.Credential) error
// DeleteAccessKey removes an access key for a user
DeleteAccessKey(ctx context.Context, username string, accessKey string) error
// Shutdown performs cleanup when the store is being shut down
Shutdown()
}
// AccessKeyInfo represents access key information with metadata
type AccessKeyInfo struct {
AccessKey string `json:"accessKey"`
SecretKey string `json:"secretKey"`
Username string `json:"username"`
CreatedAt time.Time `json:"createdAt"`
}
// UserCredentials represents a user's credentials and metadata
type UserCredentials struct {
Username string `json:"username"`
Email string `json:"email"`
Account *iam_pb.Account `json:"account,omitempty"`
Credentials []*iam_pb.Credential `json:"credentials"`
Actions []string `json:"actions"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// Stores holds all available credential store implementations
var Stores []CredentialStore

353
weed/credential/credential_test.go

@ -0,0 +1,353 @@
package credential
import (
"context"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func TestCredentialStoreInterface(t *testing.T) {
// Note: This test may fail if run without importing store packages
// For full integration testing, see the test/ package
if len(Stores) == 0 {
t.Skip("No credential stores registered - this is expected when testing the base package without store imports")
}
// Check that expected stores are available
storeNames := GetAvailableStores()
expectedStores := []string{string(StoreTypeFilerEtc), string(StoreTypeMemory)}
// Add SQLite and PostgreSQL if they're available (build tags dependent)
for _, storeName := range storeNames {
found := false
for _, expected := range append(expectedStores, string(StoreTypeSQLite), string(StoreTypePostgres)) {
if string(storeName) == expected {
found = true
break
}
}
if !found {
t.Errorf("Unexpected store found: %s", storeName)
}
}
// Test that filer_etc store is always available
filerEtcStoreFound := false
memoryStoreFound := false
for _, storeName := range storeNames {
if string(storeName) == string(StoreTypeFilerEtc) {
filerEtcStoreFound = true
}
if string(storeName) == string(StoreTypeMemory) {
memoryStoreFound = true
}
}
if !filerEtcStoreFound {
t.Error("FilerEtc store should always be available")
}
if !memoryStoreFound {
t.Error("Memory store should always be available")
}
}
func TestCredentialManagerCreation(t *testing.T) {
config := util.GetViper()
// Test creating credential manager with invalid store
_, err := NewCredentialManager(CredentialStoreTypeName("nonexistent"), config, "test.")
if err == nil {
t.Error("Expected error for nonexistent store")
}
// Skip store-specific tests if no stores are registered
if len(Stores) == 0 {
t.Skip("No credential stores registered - skipping store-specific tests")
}
// Test creating credential manager with available stores
availableStores := GetAvailableStores()
if len(availableStores) == 0 {
t.Skip("No stores available for testing")
}
// Test with the first available store
storeName := availableStores[0]
cm, err := NewCredentialManager(storeName, config, "test.")
if err != nil {
t.Fatalf("Failed to create credential manager with store %s: %v", storeName, err)
}
if cm == nil {
t.Error("Credential manager should not be nil")
}
defer cm.Shutdown()
// Test that the store is of the correct type
if cm.GetStore().GetName() != storeName {
t.Errorf("Expected %s store, got %s", storeName, cm.GetStore().GetName())
}
}
func TestCredentialInterface(t *testing.T) {
// Skip if no stores are registered
if len(Stores) == 0 {
t.Skip("No credential stores registered - for full testing see test/ package")
}
// Test the interface with the first available store
availableStores := GetAvailableStores()
if len(availableStores) == 0 {
t.Skip("No stores available for testing")
}
testCredentialInterfaceWithStore(t, availableStores[0])
}
func testCredentialInterfaceWithStore(t *testing.T, storeName CredentialStoreTypeName) {
// Create a test identity
testIdentity := &iam_pb.Identity{
Name: "testuser",
Actions: []string{"Read", "Write"},
Account: &iam_pb.Account{
Id: "123456789012",
DisplayName: "Test User",
EmailAddress: "test@example.com",
},
Credentials: []*iam_pb.Credential{
{
AccessKey: "AKIAIOSFODNN7EXAMPLE",
SecretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
},
}
// Test the interface methods exist (compile-time check)
config := util.GetViper()
cm, err := NewCredentialManager(storeName, config, "test.")
if err != nil {
t.Fatalf("Failed to create credential manager: %v", err)
}
defer cm.Shutdown()
ctx := context.Background()
// Test LoadConfiguration
_, err = cm.LoadConfiguration(ctx)
if err != nil {
t.Fatalf("LoadConfiguration failed: %v", err)
}
// Test CreateUser
err = cm.CreateUser(ctx, testIdentity)
if err != nil {
t.Fatalf("CreateUser failed: %v", err)
}
// Test GetUser
user, err := cm.GetUser(ctx, "testuser")
if err != nil {
t.Fatalf("GetUser failed: %v", err)
}
if user.Name != "testuser" {
t.Errorf("Expected user name 'testuser', got %s", user.Name)
}
// Test ListUsers
users, err := cm.ListUsers(ctx)
if err != nil {
t.Fatalf("ListUsers failed: %v", err)
}
if len(users) != 1 || users[0] != "testuser" {
t.Errorf("Expected ['testuser'], got %v", users)
}
// Test GetUserByAccessKey
userByKey, err := cm.GetUserByAccessKey(ctx, "AKIAIOSFODNN7EXAMPLE")
if err != nil {
t.Fatalf("GetUserByAccessKey failed: %v", err)
}
if userByKey.Name != "testuser" {
t.Errorf("Expected user name 'testuser', got %s", userByKey.Name)
}
}
func TestCredentialManagerIntegration(t *testing.T) {
// Skip if no stores are registered
if len(Stores) == 0 {
t.Skip("No credential stores registered - for full testing see test/ package")
}
// Test with the first available store
availableStores := GetAvailableStores()
if len(availableStores) == 0 {
t.Skip("No stores available for testing")
}
storeName := availableStores[0]
config := util.GetViper()
cm, err := NewCredentialManager(storeName, config, "test.")
if err != nil {
t.Fatalf("Failed to create credential manager: %v", err)
}
defer cm.Shutdown()
ctx := context.Background()
// Test complete workflow
user1 := &iam_pb.Identity{
Name: "user1",
Actions: []string{"Read"},
Account: &iam_pb.Account{
Id: "111111111111",
DisplayName: "User One",
EmailAddress: "user1@example.com",
},
Credentials: []*iam_pb.Credential{
{
AccessKey: "AKIAUSER1",
SecretKey: "secret1",
},
},
}
user2 := &iam_pb.Identity{
Name: "user2",
Actions: []string{"Write"},
Account: &iam_pb.Account{
Id: "222222222222",
DisplayName: "User Two",
EmailAddress: "user2@example.com",
},
Credentials: []*iam_pb.Credential{
{
AccessKey: "AKIAUSER2",
SecretKey: "secret2",
},
},
}
// Create users
err = cm.CreateUser(ctx, user1)
if err != nil {
t.Fatalf("Failed to create user1: %v", err)
}
err = cm.CreateUser(ctx, user2)
if err != nil {
t.Fatalf("Failed to create user2: %v", err)
}
// List users
users, err := cm.ListUsers(ctx)
if err != nil {
t.Fatalf("Failed to list users: %v", err)
}
if len(users) != 2 {
t.Errorf("Expected 2 users, got %d", len(users))
}
// Test access key lookup
foundUser, err := cm.GetUserByAccessKey(ctx, "AKIAUSER1")
if err != nil {
t.Fatalf("Failed to get user by access key: %v", err)
}
if foundUser.Name != "user1" {
t.Errorf("Expected user1, got %s", foundUser.Name)
}
// Delete user
err = cm.DeleteUser(ctx, "user1")
if err != nil {
t.Fatalf("Failed to delete user: %v", err)
}
// Verify user is deleted
_, err = cm.GetUser(ctx, "user1")
if err != ErrUserNotFound {
t.Errorf("Expected ErrUserNotFound, got %v", err)
}
// Clean up
err = cm.DeleteUser(ctx, "user2")
if err != nil {
t.Fatalf("Failed to delete user2: %v", err)
}
}
// TestErrorTypes tests that the custom error types are defined correctly
func TestErrorTypes(t *testing.T) {
// Test that error types are defined
if ErrUserNotFound == nil {
t.Error("ErrUserNotFound should be defined")
}
if ErrUserAlreadyExists == nil {
t.Error("ErrUserAlreadyExists should be defined")
}
if ErrAccessKeyNotFound == nil {
t.Error("ErrAccessKeyNotFound should be defined")
}
// Test error messages
if ErrUserNotFound.Error() != "user not found" {
t.Errorf("Expected 'user not found', got '%s'", ErrUserNotFound.Error())
}
if ErrUserAlreadyExists.Error() != "user already exists" {
t.Errorf("Expected 'user already exists', got '%s'", ErrUserAlreadyExists.Error())
}
if ErrAccessKeyNotFound.Error() != "access key not found" {
t.Errorf("Expected 'access key not found', got '%s'", ErrAccessKeyNotFound.Error())
}
}
// TestGetAvailableStores tests the store discovery function
func TestGetAvailableStores(t *testing.T) {
stores := GetAvailableStores()
if len(stores) == 0 {
t.Skip("No stores available for testing")
}
// Convert to strings for comparison
storeNames := make([]string, len(stores))
for i, store := range stores {
storeNames[i] = string(store)
}
t.Logf("Available stores: %v (count: %d)", storeNames, len(storeNames))
// We expect at least memory and filer_etc stores to be available
expectedStores := []string{string(StoreTypeFilerEtc), string(StoreTypeMemory)}
// Add SQLite and PostgreSQL if they're available (build tags dependent)
for _, storeName := range storeNames {
found := false
for _, expected := range append(expectedStores, string(StoreTypeSQLite), string(StoreTypePostgres)) {
if storeName == expected {
found = true
break
}
}
if !found {
t.Errorf("Unexpected store found: %s", storeName)
}
}
// Test that filer_etc store is always available
filerEtcStoreFound := false
memoryStoreFound := false
for _, storeName := range storeNames {
if storeName == string(StoreTypeFilerEtc) {
filerEtcStoreFound = true
}
if storeName == string(StoreTypeMemory) {
memoryStoreFound = true
}
}
if !filerEtcStoreFound {
t.Error("FilerEtc store should always be available")
}
if !memoryStoreFound {
t.Error("Memory store should always be available")
}
}

235
weed/credential/filer_etc/filer_etc_store.go

@ -0,0 +1,235 @@
package filer_etc
import (
"bytes"
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
)
func init() {
credential.Stores = append(credential.Stores, &FilerEtcStore{})
}
// FilerEtcStore implements CredentialStore using SeaweedFS filer for storage
type FilerEtcStore struct {
filerGrpcAddress string
grpcDialOption grpc.DialOption
}
func (store *FilerEtcStore) GetName() credential.CredentialStoreTypeName {
return credential.StoreTypeFilerEtc
}
func (store *FilerEtcStore) Initialize(configuration util.Configuration, prefix string) error {
// Handle nil configuration gracefully
if configuration != nil {
store.filerGrpcAddress = configuration.GetString(prefix + "filer")
// TODO: Initialize grpcDialOption based on configuration
}
// Note: filerGrpcAddress can be set later via SetFilerClient method
return nil
}
// SetFilerClient sets the filer client details for the file store
func (store *FilerEtcStore) SetFilerClient(filerAddress string, grpcDialOption grpc.DialOption) {
store.filerGrpcAddress = filerAddress
store.grpcDialOption = grpcDialOption
}
// withFilerClient executes a function with a filer client
func (store *FilerEtcStore) withFilerClient(fn func(client filer_pb.SeaweedFilerClient) error) error {
if store.filerGrpcAddress == "" {
return fmt.Errorf("filer address not configured")
}
// Use the pb.WithGrpcFilerClient helper similar to existing code
return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(store.filerGrpcAddress), store.grpcDialOption, fn)
}
func (store *FilerEtcStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
s3cfg := &iam_pb.S3ApiConfiguration{}
err := store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ReadEntry(nil, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
if err != filer_pb.ErrNotFound {
return err
}
}
if buf.Len() > 0 {
return filer.ParseS3ConfigurationFromBytes(buf.Bytes(), s3cfg)
}
return nil
})
return s3cfg, err
}
func (store *FilerEtcStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
return store.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
var buf bytes.Buffer
if err := filer.ProtoToText(&buf, config); err != nil {
return fmt.Errorf("failed to marshal configuration: %v", err)
}
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes())
})
}
func (store *FilerEtcStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
// Load existing configuration
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Check if user already exists
for _, existingIdentity := range config.Identities {
if existingIdentity.Name == identity.Name {
return credential.ErrUserAlreadyExists
}
}
// Add new identity
config.Identities = append(config.Identities, identity)
// Save configuration
return store.SaveConfiguration(ctx, config)
}
func (store *FilerEtcStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
for _, identity := range config.Identities {
if identity.Name == username {
return identity, nil
}
}
return nil, credential.ErrUserNotFound
}
func (store *FilerEtcStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find and update the user
for i, existingIdentity := range config.Identities {
if existingIdentity.Name == username {
config.Identities[i] = identity
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) DeleteUser(ctx context.Context, username string) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find and remove the user
for i, identity := range config.Identities {
if identity.Name == username {
config.Identities = append(config.Identities[:i], config.Identities[i+1:]...)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) ListUsers(ctx context.Context) ([]string, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
var usernames []string
for _, identity := range config.Identities {
usernames = append(usernames, identity.Name)
}
return usernames, nil
}
func (store *FilerEtcStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
for _, identity := range config.Identities {
for _, credential := range identity.Credentials {
if credential.AccessKey == accessKey {
return identity, nil
}
}
}
return nil, credential.ErrAccessKeyNotFound
}
func (store *FilerEtcStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find the user and add the credential
for _, identity := range config.Identities {
if identity.Name == username {
// Check if access key already exists
for _, existingCred := range identity.Credentials {
if existingCred.AccessKey == cred.AccessKey {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
}
identity.Credentials = append(identity.Credentials, cred)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
config, err := store.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
// Find the user and remove the credential
for _, identity := range config.Identities {
if identity.Name == username {
for i, cred := range identity.Credentials {
if cred.AccessKey == accessKey {
identity.Credentials = append(identity.Credentials[:i], identity.Credentials[i+1:]...)
return store.SaveConfiguration(ctx, config)
}
}
return credential.ErrAccessKeyNotFound
}
}
return credential.ErrUserNotFound
}
func (store *FilerEtcStore) Shutdown() {
// No cleanup needed for file store
}

373
weed/credential/memory/memory_store.go

@ -0,0 +1,373 @@
package memory
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func init() {
credential.Stores = append(credential.Stores, &MemoryStore{})
}
// MemoryStore implements CredentialStore using in-memory storage
// This is primarily intended for testing purposes
type MemoryStore struct {
mu sync.RWMutex
users map[string]*iam_pb.Identity // username -> identity
accessKeys map[string]string // access_key -> username
initialized bool
}
func (store *MemoryStore) GetName() credential.CredentialStoreTypeName {
return credential.StoreTypeMemory
}
func (store *MemoryStore) Initialize(configuration util.Configuration, prefix string) error {
store.mu.Lock()
defer store.mu.Unlock()
if store.initialized {
return nil
}
store.users = make(map[string]*iam_pb.Identity)
store.accessKeys = make(map[string]string)
store.initialized = true
return nil
}
func (store *MemoryStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
config := &iam_pb.S3ApiConfiguration{}
// Convert all users to identities
for _, user := range store.users {
// Deep copy the identity to avoid mutation issues
identityCopy := store.deepCopyIdentity(user)
config.Identities = append(config.Identities, identityCopy)
}
return config, nil
}
func (store *MemoryStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
// Clear existing data
store.users = make(map[string]*iam_pb.Identity)
store.accessKeys = make(map[string]string)
// Add all identities
for _, identity := range config.Identities {
// Deep copy to avoid mutation issues
identityCopy := store.deepCopyIdentity(identity)
store.users[identity.Name] = identityCopy
// Index access keys
for _, credential := range identity.Credentials {
store.accessKeys[credential.AccessKey] = identity.Name
}
}
return nil
}
func (store *MemoryStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
if _, exists := store.users[identity.Name]; exists {
return credential.ErrUserAlreadyExists
}
// Check for duplicate access keys
for _, cred := range identity.Credentials {
if _, exists := store.accessKeys[cred.AccessKey]; exists {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
}
// Deep copy to avoid mutation issues
identityCopy := store.deepCopyIdentity(identity)
store.users[identity.Name] = identityCopy
// Index access keys
for _, cred := range identity.Credentials {
store.accessKeys[cred.AccessKey] = identity.Name
}
return nil
}
func (store *MemoryStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return nil, credential.ErrUserNotFound
}
// Return a deep copy to avoid mutation issues
return store.deepCopyIdentity(user), nil
}
func (store *MemoryStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
existingUser, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Remove old access keys from index
for _, cred := range existingUser.Credentials {
delete(store.accessKeys, cred.AccessKey)
}
// Check for duplicate access keys (excluding current user)
for _, cred := range identity.Credentials {
if existingUsername, exists := store.accessKeys[cred.AccessKey]; exists && existingUsername != username {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
}
// Deep copy to avoid mutation issues
identityCopy := store.deepCopyIdentity(identity)
store.users[username] = identityCopy
// Re-index access keys
for _, cred := range identity.Credentials {
store.accessKeys[cred.AccessKey] = username
}
return nil
}
func (store *MemoryStore) DeleteUser(ctx context.Context, username string) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Remove access keys from index
for _, cred := range user.Credentials {
delete(store.accessKeys, cred.AccessKey)
}
// Remove user
delete(store.users, username)
return nil
}
func (store *MemoryStore) ListUsers(ctx context.Context) ([]string, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
var usernames []string
for username := range store.users {
usernames = append(usernames, username)
}
return usernames, nil
}
func (store *MemoryStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
store.mu.RLock()
defer store.mu.RUnlock()
if !store.initialized {
return nil, fmt.Errorf("store not initialized")
}
username, exists := store.accessKeys[accessKey]
if !exists {
return nil, credential.ErrAccessKeyNotFound
}
user, exists := store.users[username]
if !exists {
// This should not happen, but handle it gracefully
return nil, credential.ErrUserNotFound
}
// Return a deep copy to avoid mutation issues
return store.deepCopyIdentity(user), nil
}
func (store *MemoryStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Check if access key already exists
if _, exists := store.accessKeys[cred.AccessKey]; exists {
return fmt.Errorf("access key %s already exists", cred.AccessKey)
}
// Add credential to user
user.Credentials = append(user.Credentials, &iam_pb.Credential{
AccessKey: cred.AccessKey,
SecretKey: cred.SecretKey,
})
// Index the access key
store.accessKeys[cred.AccessKey] = username
return nil
}
func (store *MemoryStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
store.mu.Lock()
defer store.mu.Unlock()
if !store.initialized {
return fmt.Errorf("store not initialized")
}
user, exists := store.users[username]
if !exists {
return credential.ErrUserNotFound
}
// Find and remove the credential
var newCredentials []*iam_pb.Credential
found := false
for _, cred := range user.Credentials {
if cred.AccessKey == accessKey {
found = true
// Remove from access key index
delete(store.accessKeys, accessKey)
} else {
newCredentials = append(newCredentials, cred)
}
}
if !found {
return credential.ErrAccessKeyNotFound
}
user.Credentials = newCredentials
return nil
}
func (store *MemoryStore) Shutdown() {
store.mu.Lock()
defer store.mu.Unlock()
// Clear all data
store.users = nil
store.accessKeys = nil
store.initialized = false
}
// deepCopyIdentity creates a deep copy of an identity to avoid mutation issues
func (store *MemoryStore) deepCopyIdentity(identity *iam_pb.Identity) *iam_pb.Identity {
if identity == nil {
return nil
}
// Use JSON marshaling/unmarshaling for deep copy
// This is simple and safe for protobuf messages
data, err := json.Marshal(identity)
if err != nil {
// Fallback to shallow copy if JSON fails
return &iam_pb.Identity{
Name: identity.Name,
Account: identity.Account,
Credentials: identity.Credentials,
Actions: identity.Actions,
}
}
var copy iam_pb.Identity
if err := json.Unmarshal(data, &copy); err != nil {
// Fallback to shallow copy if JSON fails
return &iam_pb.Identity{
Name: identity.Name,
Account: identity.Account,
Credentials: identity.Credentials,
Actions: identity.Actions,
}
}
return &copy
}
// Reset clears all data in the store (useful for testing)
func (store *MemoryStore) Reset() {
store.mu.Lock()
defer store.mu.Unlock()
if store.initialized {
store.users = make(map[string]*iam_pb.Identity)
store.accessKeys = make(map[string]string)
}
}
// GetUserCount returns the number of users in the store (useful for testing)
func (store *MemoryStore) GetUserCount() int {
store.mu.RLock()
defer store.mu.RUnlock()
return len(store.users)
}
// GetAccessKeyCount returns the number of access keys in the store (useful for testing)
func (store *MemoryStore) GetAccessKeyCount() int {
store.mu.RLock()
defer store.mu.RUnlock()
return len(store.accessKeys)
}

315
weed/credential/memory/memory_store_test.go

@ -0,0 +1,315 @@
package memory
import (
"context"
"fmt"
"testing"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func TestMemoryStore(t *testing.T) {
store := &MemoryStore{}
// Test initialization
config := util.GetViper()
if err := store.Initialize(config, "credential."); err != nil {
t.Fatalf("Failed to initialize store: %v", err)
}
ctx := context.Background()
// Test creating a user
identity := &iam_pb.Identity{
Name: "testuser",
Credentials: []*iam_pb.Credential{
{
AccessKey: "access123",
SecretKey: "secret123",
},
},
}
if err := store.CreateUser(ctx, identity); err != nil {
t.Fatalf("Failed to create user: %v", err)
}
// Test getting user
retrievedUser, err := store.GetUser(ctx, "testuser")
if err != nil {
t.Fatalf("Failed to get user: %v", err)
}
if retrievedUser.Name != "testuser" {
t.Errorf("Expected username 'testuser', got '%s'", retrievedUser.Name)
}
if len(retrievedUser.Credentials) != 1 {
t.Errorf("Expected 1 credential, got %d", len(retrievedUser.Credentials))
}
// Test getting user by access key
userByAccessKey, err := store.GetUserByAccessKey(ctx, "access123")
if err != nil {
t.Fatalf("Failed to get user by access key: %v", err)
}
if userByAccessKey.Name != "testuser" {
t.Errorf("Expected username 'testuser', got '%s'", userByAccessKey.Name)
}
// Test listing users
users, err := store.ListUsers(ctx)
if err != nil {
t.Fatalf("Failed to list users: %v", err)
}
if len(users) != 1 || users[0] != "testuser" {
t.Errorf("Expected ['testuser'], got %v", users)
}
// Test creating access key
newCred := &iam_pb.Credential{
AccessKey: "access456",
SecretKey: "secret456",
}
if err := store.CreateAccessKey(ctx, "testuser", newCred); err != nil {
t.Fatalf("Failed to create access key: %v", err)
}
// Verify user now has 2 credentials
updatedUser, err := store.GetUser(ctx, "testuser")
if err != nil {
t.Fatalf("Failed to get updated user: %v", err)
}
if len(updatedUser.Credentials) != 2 {
t.Errorf("Expected 2 credentials, got %d", len(updatedUser.Credentials))
}
// Test deleting access key
if err := store.DeleteAccessKey(ctx, "testuser", "access456"); err != nil {
t.Fatalf("Failed to delete access key: %v", err)
}
// Verify user now has 1 credential again
finalUser, err := store.GetUser(ctx, "testuser")
if err != nil {
t.Fatalf("Failed to get final user: %v", err)
}
if len(finalUser.Credentials) != 1 {
t.Errorf("Expected 1 credential, got %d", len(finalUser.Credentials))
}
// Test deleting user
if err := store.DeleteUser(ctx, "testuser"); err != nil {
t.Fatalf("Failed to delete user: %v", err)
}
// Verify user is gone
_, err = store.GetUser(ctx, "testuser")
if err != credential.ErrUserNotFound {
t.Errorf("Expected ErrUserNotFound, got %v", err)
}
// Test error cases
if err := store.CreateUser(ctx, identity); err != nil {
t.Fatalf("Failed to create user for error tests: %v", err)
}
// Try to create duplicate user
if err := store.CreateUser(ctx, identity); err != credential.ErrUserAlreadyExists {
t.Errorf("Expected ErrUserAlreadyExists, got %v", err)
}
// Try to get non-existent user
_, err = store.GetUser(ctx, "nonexistent")
if err != credential.ErrUserNotFound {
t.Errorf("Expected ErrUserNotFound, got %v", err)
}
// Try to get user by non-existent access key
_, err = store.GetUserByAccessKey(ctx, "nonexistent")
if err != credential.ErrAccessKeyNotFound {
t.Errorf("Expected ErrAccessKeyNotFound, got %v", err)
}
}
func TestMemoryStoreConcurrency(t *testing.T) {
store := &MemoryStore{}
config := util.GetViper()
if err := store.Initialize(config, "credential."); err != nil {
t.Fatalf("Failed to initialize store: %v", err)
}
ctx := context.Background()
// Test concurrent access
done := make(chan bool, 10)
for i := 0; i < 10; i++ {
go func(i int) {
defer func() { done <- true }()
username := fmt.Sprintf("user%d", i)
identity := &iam_pb.Identity{
Name: username,
Credentials: []*iam_pb.Credential{
{
AccessKey: fmt.Sprintf("access%d", i),
SecretKey: fmt.Sprintf("secret%d", i),
},
},
}
if err := store.CreateUser(ctx, identity); err != nil {
t.Errorf("Failed to create user %s: %v", username, err)
return
}
if _, err := store.GetUser(ctx, username); err != nil {
t.Errorf("Failed to get user %s: %v", username, err)
return
}
}(i)
}
// Wait for all goroutines to complete
for i := 0; i < 10; i++ {
<-done
}
// Verify all users were created
users, err := store.ListUsers(ctx)
if err != nil {
t.Fatalf("Failed to list users: %v", err)
}
if len(users) != 10 {
t.Errorf("Expected 10 users, got %d", len(users))
}
}
func TestMemoryStoreReset(t *testing.T) {
store := &MemoryStore{}
config := util.GetViper()
if err := store.Initialize(config, "credential."); err != nil {
t.Fatalf("Failed to initialize store: %v", err)
}
ctx := context.Background()
// Create a user
identity := &iam_pb.Identity{
Name: "testuser",
Credentials: []*iam_pb.Credential{
{
AccessKey: "access123",
SecretKey: "secret123",
},
},
}
if err := store.CreateUser(ctx, identity); err != nil {
t.Fatalf("Failed to create user: %v", err)
}
// Verify user exists
if store.GetUserCount() != 1 {
t.Errorf("Expected 1 user, got %d", store.GetUserCount())
}
if store.GetAccessKeyCount() != 1 {
t.Errorf("Expected 1 access key, got %d", store.GetAccessKeyCount())
}
// Reset the store
store.Reset()
// Verify store is empty
if store.GetUserCount() != 0 {
t.Errorf("Expected 0 users after reset, got %d", store.GetUserCount())
}
if store.GetAccessKeyCount() != 0 {
t.Errorf("Expected 0 access keys after reset, got %d", store.GetAccessKeyCount())
}
// Verify user is gone
_, err := store.GetUser(ctx, "testuser")
if err != credential.ErrUserNotFound {
t.Errorf("Expected ErrUserNotFound after reset, got %v", err)
}
}
func TestMemoryStoreConfigurationSaveLoad(t *testing.T) {
store := &MemoryStore{}
config := util.GetViper()
if err := store.Initialize(config, "credential."); err != nil {
t.Fatalf("Failed to initialize store: %v", err)
}
ctx := context.Background()
// Create initial configuration
originalConfig := &iam_pb.S3ApiConfiguration{
Identities: []*iam_pb.Identity{
{
Name: "user1",
Credentials: []*iam_pb.Credential{
{
AccessKey: "access1",
SecretKey: "secret1",
},
},
},
{
Name: "user2",
Credentials: []*iam_pb.Credential{
{
AccessKey: "access2",
SecretKey: "secret2",
},
},
},
},
}
// Save configuration
if err := store.SaveConfiguration(ctx, originalConfig); err != nil {
t.Fatalf("Failed to save configuration: %v", err)
}
// Load configuration
loadedConfig, err := store.LoadConfiguration(ctx)
if err != nil {
t.Fatalf("Failed to load configuration: %v", err)
}
// Verify configuration matches
if len(loadedConfig.Identities) != 2 {
t.Errorf("Expected 2 identities, got %d", len(loadedConfig.Identities))
}
// Check users exist
user1, err := store.GetUser(ctx, "user1")
if err != nil {
t.Fatalf("Failed to get user1: %v", err)
}
if len(user1.Credentials) != 1 || user1.Credentials[0].AccessKey != "access1" {
t.Errorf("User1 credentials not correct: %+v", user1.Credentials)
}
user2, err := store.GetUser(ctx, "user2")
if err != nil {
t.Fatalf("Failed to get user2: %v", err)
}
if len(user2.Credentials) != 1 || user2.Credentials[0].AccessKey != "access2" {
t.Errorf("User2 credentials not correct: %+v", user2.Credentials)
}
}

221
weed/credential/migration.go

@ -0,0 +1,221 @@
package credential
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// MigrateCredentials migrates credentials from one store to another
func MigrateCredentials(fromStoreName, toStoreName CredentialStoreTypeName, configuration util.Configuration, fromPrefix, toPrefix string) error {
ctx := context.Background()
// Create source credential manager
fromCM, err := NewCredentialManager(fromStoreName, configuration, fromPrefix)
if err != nil {
return fmt.Errorf("failed to create source credential manager (%s): %v", fromStoreName, err)
}
defer fromCM.Shutdown()
// Create destination credential manager
toCM, err := NewCredentialManager(toStoreName, configuration, toPrefix)
if err != nil {
return fmt.Errorf("failed to create destination credential manager (%s): %v", toStoreName, err)
}
defer toCM.Shutdown()
// Load configuration from source
glog.Infof("Loading configuration from %s store...", fromStoreName)
config, err := fromCM.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration from source store: %v", err)
}
if config == nil || len(config.Identities) == 0 {
glog.Info("No identities found in source store")
return nil
}
glog.Infof("Found %d identities in source store", len(config.Identities))
// Migrate each identity
var migrated, failed int
for _, identity := range config.Identities {
glog.V(1).Infof("Migrating user: %s", identity.Name)
// Check if user already exists in destination
existingUser, err := toCM.GetUser(ctx, identity.Name)
if err != nil && err != ErrUserNotFound {
glog.Errorf("Failed to check if user %s exists in destination: %v", identity.Name, err)
failed++
continue
}
if existingUser != nil {
glog.Warningf("User %s already exists in destination store, skipping", identity.Name)
continue
}
// Create user in destination
err = toCM.CreateUser(ctx, identity)
if err != nil {
glog.Errorf("Failed to create user %s in destination store: %v", identity.Name, err)
failed++
continue
}
migrated++
glog.V(1).Infof("Successfully migrated user: %s", identity.Name)
}
glog.Infof("Migration completed: %d migrated, %d failed", migrated, failed)
if failed > 0 {
return fmt.Errorf("migration completed with %d failures", failed)
}
return nil
}
// ExportCredentials exports credentials from a store to a configuration
func ExportCredentials(storeName CredentialStoreTypeName, configuration util.Configuration, prefix string) (*iam_pb.S3ApiConfiguration, error) {
ctx := context.Background()
// Create credential manager
cm, err := NewCredentialManager(storeName, configuration, prefix)
if err != nil {
return nil, fmt.Errorf("failed to create credential manager (%s): %v", storeName, err)
}
defer cm.Shutdown()
// Load configuration
config, err := cm.LoadConfiguration(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %v", err)
}
return config, nil
}
// ImportCredentials imports credentials from a configuration to a store
func ImportCredentials(storeName CredentialStoreTypeName, configuration util.Configuration, prefix string, config *iam_pb.S3ApiConfiguration) error {
ctx := context.Background()
// Create credential manager
cm, err := NewCredentialManager(storeName, configuration, prefix)
if err != nil {
return fmt.Errorf("failed to create credential manager (%s): %v", storeName, err)
}
defer cm.Shutdown()
// Import each identity
var imported, failed int
for _, identity := range config.Identities {
glog.V(1).Infof("Importing user: %s", identity.Name)
// Check if user already exists
existingUser, err := cm.GetUser(ctx, identity.Name)
if err != nil && err != ErrUserNotFound {
glog.Errorf("Failed to check if user %s exists: %v", identity.Name, err)
failed++
continue
}
if existingUser != nil {
glog.Warningf("User %s already exists, skipping", identity.Name)
continue
}
// Create user
err = cm.CreateUser(ctx, identity)
if err != nil {
glog.Errorf("Failed to create user %s: %v", identity.Name, err)
failed++
continue
}
imported++
glog.V(1).Infof("Successfully imported user: %s", identity.Name)
}
glog.Infof("Import completed: %d imported, %d failed", imported, failed)
if failed > 0 {
return fmt.Errorf("import completed with %d failures", failed)
}
return nil
}
// ValidateCredentials validates that all credentials in a store are accessible
func ValidateCredentials(storeName CredentialStoreTypeName, configuration util.Configuration, prefix string) error {
ctx := context.Background()
// Create credential manager
cm, err := NewCredentialManager(storeName, configuration, prefix)
if err != nil {
return fmt.Errorf("failed to create credential manager (%s): %v", storeName, err)
}
defer cm.Shutdown()
// Load configuration
config, err := cm.LoadConfiguration(ctx)
if err != nil {
return fmt.Errorf("failed to load configuration: %v", err)
}
if config == nil || len(config.Identities) == 0 {
glog.Info("No identities found in store")
return nil
}
glog.Infof("Validating %d identities...", len(config.Identities))
// Validate each identity
var validated, failed int
for _, identity := range config.Identities {
// Check if user can be retrieved
user, err := cm.GetUser(ctx, identity.Name)
if err != nil {
glog.Errorf("Failed to retrieve user %s: %v", identity.Name, err)
failed++
continue
}
if user == nil {
glog.Errorf("User %s not found", identity.Name)
failed++
continue
}
// Validate access keys
for _, credential := range identity.Credentials {
accessKeyUser, err := cm.GetUserByAccessKey(ctx, credential.AccessKey)
if err != nil {
glog.Errorf("Failed to retrieve user by access key %s: %v", credential.AccessKey, err)
failed++
continue
}
if accessKeyUser == nil || accessKeyUser.Name != identity.Name {
glog.Errorf("Access key %s does not map to correct user %s", credential.AccessKey, identity.Name)
failed++
continue
}
}
validated++
glog.V(1).Infof("Successfully validated user: %s", identity.Name)
}
glog.Infof("Validation completed: %d validated, %d failed", validated, failed)
if failed > 0 {
return fmt.Errorf("validation completed with %d failures", failed)
}
return nil
}

570
weed/credential/postgres/postgres_store.go

@ -0,0 +1,570 @@
package postgres
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
_ "github.com/lib/pq"
)
func init() {
credential.Stores = append(credential.Stores, &PostgresStore{})
}
// PostgresStore implements CredentialStore using PostgreSQL
type PostgresStore struct {
db *sql.DB
configured bool
}
func (store *PostgresStore) GetName() credential.CredentialStoreTypeName {
return credential.StoreTypePostgres
}
func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) error {
if store.configured {
return nil
}
hostname := configuration.GetString(prefix + "hostname")
port := configuration.GetInt(prefix + "port")
username := configuration.GetString(prefix + "username")
password := configuration.GetString(prefix + "password")
database := configuration.GetString(prefix + "database")
schema := configuration.GetString(prefix + "schema")
sslmode := configuration.GetString(prefix + "sslmode")
// Set defaults
if hostname == "" {
hostname = "localhost"
}
if port == 0 {
port = 5432
}
if schema == "" {
schema = "public"
}
if sslmode == "" {
sslmode = "disable"
}
// Build connection string
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s search_path=%s",
hostname, port, username, password, database, sslmode, schema)
db, err := sql.Open("postgres", connStr)
if err != nil {
return fmt.Errorf("failed to open database: %v", err)
}
// Test connection
if err := db.Ping(); err != nil {
db.Close()
return fmt.Errorf("failed to ping database: %v", err)
}
// Set connection pool settings
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(5 * time.Minute)
store.db = db
// Create tables if they don't exist
if err := store.createTables(); err != nil {
db.Close()
return fmt.Errorf("failed to create tables: %v", err)
}
store.configured = true
return nil
}
func (store *PostgresStore) createTables() error {
// Create users table
usersTable := `
CREATE TABLE IF NOT EXISTS users (
username VARCHAR(255) PRIMARY KEY,
email VARCHAR(255),
account_data JSONB,
actions JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
`
// Create credentials table
credentialsTable := `
CREATE TABLE IF NOT EXISTS credentials (
id SERIAL PRIMARY KEY,
username VARCHAR(255) REFERENCES users(username) ON DELETE CASCADE,
access_key VARCHAR(255) UNIQUE NOT NULL,
secret_key VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_credentials_username ON credentials(username);
CREATE INDEX IF NOT EXISTS idx_credentials_access_key ON credentials(access_key);
`
// Execute table creation
if _, err := store.db.Exec(usersTable); err != nil {
return fmt.Errorf("failed to create users table: %v", err)
}
if _, err := store.db.Exec(credentialsTable); err != nil {
return fmt.Errorf("failed to create credentials table: %v", err)
}
return nil
}
func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
config := &iam_pb.S3ApiConfiguration{}
// Query all users
rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users")
if err != nil {
return nil, fmt.Errorf("failed to query users: %v", err)
}
defer rows.Close()
for rows.Next() {
var username, email string
var accountDataJSON, actionsJSON []byte
if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil {
return nil, fmt.Errorf("failed to scan user row: %v", err)
}
identity := &iam_pb.Identity{
Name: username,
}
// Parse account data
if len(accountDataJSON) > 0 {
if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err)
}
}
// Parse actions
if len(actionsJSON) > 0 {
if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err)
}
}
// Query credentials for this user
credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
if err != nil {
return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err)
}
for credRows.Next() {
var accessKey, secretKey string
if err := credRows.Scan(&accessKey, &secretKey); err != nil {
credRows.Close()
return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err)
}
identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
AccessKey: accessKey,
SecretKey: secretKey,
})
}
credRows.Close()
config.Identities = append(config.Identities, identity)
}
return config, nil
}
func (store *PostgresStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Clear existing data
if _, err := tx.ExecContext(ctx, "DELETE FROM credentials"); err != nil {
return fmt.Errorf("failed to clear credentials: %v", err)
}
if _, err := tx.ExecContext(ctx, "DELETE FROM users"); err != nil {
return fmt.Errorf("failed to clear users: %v", err)
}
// Insert all identities
for _, identity := range config.Identities {
// Marshal account data
var accountDataJSON []byte
if identity.Account != nil {
accountDataJSON, err = json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err)
}
}
// Marshal actions
var actionsJSON []byte
if identity.Actions != nil {
actionsJSON, err = json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err)
}
}
// Insert user
_, err := tx.ExecContext(ctx,
"INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
identity.Name, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to insert user %s: %v", identity.Name, err)
}
// Insert credentials
for _, cred := range identity.Credentials {
_, err := tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
identity.Name, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err)
}
}
}
return tx.Commit()
}
func (store *PostgresStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Check if user already exists
var count int
err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", identity.Name).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count > 0 {
return credential.ErrUserAlreadyExists
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Marshal account data
var accountDataJSON []byte
if identity.Account != nil {
accountDataJSON, err = json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data: %v", err)
}
}
// Marshal actions
var actionsJSON []byte
if identity.Actions != nil {
actionsJSON, err = json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions: %v", err)
}
}
// Insert user
_, err = tx.ExecContext(ctx,
"INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
identity.Name, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to insert user: %v", err)
}
// Insert credentials
for _, cred := range identity.Credentials {
_, err = tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
identity.Name, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
}
return tx.Commit()
}
func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
var email string
var accountDataJSON, actionsJSON []byte
err := store.db.QueryRowContext(ctx,
"SELECT email, account_data, actions FROM users WHERE username = $1",
username).Scan(&email, &accountDataJSON, &actionsJSON)
if err != nil {
if err == sql.ErrNoRows {
return nil, credential.ErrUserNotFound
}
return nil, fmt.Errorf("failed to query user: %v", err)
}
identity := &iam_pb.Identity{
Name: username,
}
// Parse account data
if len(accountDataJSON) > 0 {
if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
return nil, fmt.Errorf("failed to unmarshal account data: %v", err)
}
}
// Parse actions
if len(actionsJSON) > 0 {
if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
return nil, fmt.Errorf("failed to unmarshal actions: %v", err)
}
}
// Query credentials
rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
if err != nil {
return nil, fmt.Errorf("failed to query credentials: %v", err)
}
defer rows.Close()
for rows.Next() {
var accessKey, secretKey string
if err := rows.Scan(&accessKey, &secretKey); err != nil {
return nil, fmt.Errorf("failed to scan credential: %v", err)
}
identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
AccessKey: accessKey,
SecretKey: secretKey,
})
}
return identity, nil
}
func (store *PostgresStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Check if user exists
var count int
err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
// Marshal account data
var accountDataJSON []byte
if identity.Account != nil {
accountDataJSON, err = json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data: %v", err)
}
}
// Marshal actions
var actionsJSON []byte
if identity.Actions != nil {
actionsJSON, err = json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions: %v", err)
}
}
// Update user
_, err = tx.ExecContext(ctx,
"UPDATE users SET email = $2, account_data = $3, actions = $4, updated_at = CURRENT_TIMESTAMP WHERE username = $1",
username, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to update user: %v", err)
}
// Delete existing credentials
_, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = $1", username)
if err != nil {
return fmt.Errorf("failed to delete existing credentials: %v", err)
}
// Insert new credentials
for _, cred := range identity.Credentials {
_, err = tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
username, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
}
return tx.Commit()
}
func (store *PostgresStore) DeleteUser(ctx context.Context, username string) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = $1", username)
if err != nil {
return fmt.Errorf("failed to delete user: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
return credential.ErrUserNotFound
}
return nil
}
func (store *PostgresStore) ListUsers(ctx context.Context) ([]string, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username")
if err != nil {
return nil, fmt.Errorf("failed to query users: %v", err)
}
defer rows.Close()
var usernames []string
for rows.Next() {
var username string
if err := rows.Scan(&username); err != nil {
return nil, fmt.Errorf("failed to scan username: %v", err)
}
usernames = append(usernames, username)
}
return usernames, nil
}
func (store *PostgresStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
var username string
err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = $1", accessKey).Scan(&username)
if err != nil {
if err == sql.ErrNoRows {
return nil, credential.ErrAccessKeyNotFound
}
return nil, fmt.Errorf("failed to query access key: %v", err)
}
return store.GetUser(ctx, username)
}
func (store *PostgresStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Check if user exists
var count int
err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
// Insert credential
_, err = store.db.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
username, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
return nil
}
func (store *PostgresStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
result, err := store.db.ExecContext(ctx,
"DELETE FROM credentials WHERE username = $1 AND access_key = $2",
username, accessKey)
if err != nil {
return fmt.Errorf("failed to delete access key: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
// Check if user exists
var count int
err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
return credential.ErrAccessKeyNotFound
}
return nil
}
func (store *PostgresStore) Shutdown() {
if store.db != nil {
store.db.Close()
store.db = nil
}
store.configured = false
}

557
weed/credential/sqlite/sqlite_store.go

@ -0,0 +1,557 @@
package sqlite
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
"path/filepath"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
_ "modernc.org/sqlite"
)
func init() {
credential.Stores = append(credential.Stores, &SqliteStore{})
}
// SqliteStore implements CredentialStore using SQLite
type SqliteStore struct {
db *sql.DB
configured bool
}
func (store *SqliteStore) GetName() credential.CredentialStoreTypeName {
return credential.StoreTypeSQLite
}
func (store *SqliteStore) Initialize(configuration util.Configuration, prefix string) error {
if store.configured {
return nil
}
dbFile := configuration.GetString(prefix + "dbFile")
if dbFile == "" {
dbFile = "seaweedfs_credentials.db"
}
// Create directory if it doesn't exist
dir := filepath.Dir(dbFile)
if dir != "." {
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %v", dir, err)
}
}
db, err := sql.Open("sqlite", dbFile)
if err != nil {
return fmt.Errorf("failed to open database: %v", err)
}
// Test connection
if err := db.Ping(); err != nil {
db.Close()
return fmt.Errorf("failed to ping database: %v", err)
}
store.db = db
// Create tables if they don't exist
if err := store.createTables(); err != nil {
db.Close()
return fmt.Errorf("failed to create tables: %v", err)
}
store.configured = true
return nil
}
func (store *SqliteStore) createTables() error {
// Create users table
usersTable := `
CREATE TABLE IF NOT EXISTS users (
username TEXT PRIMARY KEY,
email TEXT,
account_data TEXT,
actions TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
`
// Create credentials table
credentialsTable := `
CREATE TABLE IF NOT EXISTS credentials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT REFERENCES users(username) ON DELETE CASCADE,
access_key TEXT UNIQUE NOT NULL,
secret_key TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_credentials_username ON credentials(username);
CREATE INDEX IF NOT EXISTS idx_credentials_access_key ON credentials(access_key);
`
// Execute table creation
if _, err := store.db.Exec(usersTable); err != nil {
return fmt.Errorf("failed to create users table: %v", err)
}
if _, err := store.db.Exec(credentialsTable); err != nil {
return fmt.Errorf("failed to create credentials table: %v", err)
}
return nil
}
func (store *SqliteStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
config := &iam_pb.S3ApiConfiguration{}
// Query all users
rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users")
if err != nil {
return nil, fmt.Errorf("failed to query users: %v", err)
}
defer rows.Close()
for rows.Next() {
var username, email, accountDataJSON, actionsJSON string
if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil {
return nil, fmt.Errorf("failed to scan user row: %v", err)
}
identity := &iam_pb.Identity{
Name: username,
}
// Parse account data
if accountDataJSON != "" {
if err := json.Unmarshal([]byte(accountDataJSON), &identity.Account); err != nil {
return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err)
}
}
// Parse actions
if actionsJSON != "" {
if err := json.Unmarshal([]byte(actionsJSON), &identity.Actions); err != nil {
return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err)
}
}
// Query credentials for this user
credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = ?", username)
if err != nil {
return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err)
}
for credRows.Next() {
var accessKey, secretKey string
if err := credRows.Scan(&accessKey, &secretKey); err != nil {
credRows.Close()
return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err)
}
identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
AccessKey: accessKey,
SecretKey: secretKey,
})
}
credRows.Close()
config.Identities = append(config.Identities, identity)
}
return config, nil
}
func (store *SqliteStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Clear existing data
if _, err := tx.ExecContext(ctx, "DELETE FROM credentials"); err != nil {
return fmt.Errorf("failed to clear credentials: %v", err)
}
if _, err := tx.ExecContext(ctx, "DELETE FROM users"); err != nil {
return fmt.Errorf("failed to clear users: %v", err)
}
// Insert all identities
for _, identity := range config.Identities {
// Marshal account data
var accountDataJSON string
if identity.Account != nil {
data, err := json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err)
}
accountDataJSON = string(data)
}
// Marshal actions
var actionsJSON string
if identity.Actions != nil {
data, err := json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err)
}
actionsJSON = string(data)
}
// Insert user
_, err := tx.ExecContext(ctx,
"INSERT INTO users (username, email, account_data, actions) VALUES (?, ?, ?, ?)",
identity.Name, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to insert user %s: %v", identity.Name, err)
}
// Insert credentials
for _, cred := range identity.Credentials {
_, err := tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES (?, ?, ?)",
identity.Name, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err)
}
}
}
return tx.Commit()
}
func (store *SqliteStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Check if user already exists
var count int
err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = ?", identity.Name).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count > 0 {
return credential.ErrUserAlreadyExists
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Marshal account data
var accountDataJSON string
if identity.Account != nil {
data, err := json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data: %v", err)
}
accountDataJSON = string(data)
}
// Marshal actions
var actionsJSON string
if identity.Actions != nil {
data, err := json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions: %v", err)
}
actionsJSON = string(data)
}
// Insert user
_, err = tx.ExecContext(ctx,
"INSERT INTO users (username, email, account_data, actions) VALUES (?, ?, ?, ?)",
identity.Name, "", accountDataJSON, actionsJSON)
if err != nil {
return fmt.Errorf("failed to insert user: %v", err)
}
// Insert credentials
for _, cred := range identity.Credentials {
_, err = tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES (?, ?, ?)",
identity.Name, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
}
return tx.Commit()
}
func (store *SqliteStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
var email, accountDataJSON, actionsJSON string
err := store.db.QueryRowContext(ctx,
"SELECT email, account_data, actions FROM users WHERE username = ?",
username).Scan(&email, &accountDataJSON, &actionsJSON)
if err != nil {
if err == sql.ErrNoRows {
return nil, credential.ErrUserNotFound
}
return nil, fmt.Errorf("failed to query user: %v", err)
}
identity := &iam_pb.Identity{
Name: username,
}
// Parse account data
if accountDataJSON != "" {
if err := json.Unmarshal([]byte(accountDataJSON), &identity.Account); err != nil {
return nil, fmt.Errorf("failed to unmarshal account data: %v", err)
}
}
// Parse actions
if actionsJSON != "" {
if err := json.Unmarshal([]byte(actionsJSON), &identity.Actions); err != nil {
return nil, fmt.Errorf("failed to unmarshal actions: %v", err)
}
}
// Query credentials
rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = ?", username)
if err != nil {
return nil, fmt.Errorf("failed to query credentials: %v", err)
}
defer rows.Close()
for rows.Next() {
var accessKey, secretKey string
if err := rows.Scan(&accessKey, &secretKey); err != nil {
return nil, fmt.Errorf("failed to scan credential: %v", err)
}
identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
AccessKey: accessKey,
SecretKey: secretKey,
})
}
return identity, nil
}
func (store *SqliteStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Start transaction
tx, err := store.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
defer tx.Rollback()
// Check if user exists
var count int
err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = ?", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
// Marshal account data
var accountDataJSON string
if identity.Account != nil {
data, err := json.Marshal(identity.Account)
if err != nil {
return fmt.Errorf("failed to marshal account data: %v", err)
}
accountDataJSON = string(data)
}
// Marshal actions
var actionsJSON string
if identity.Actions != nil {
data, err := json.Marshal(identity.Actions)
if err != nil {
return fmt.Errorf("failed to marshal actions: %v", err)
}
actionsJSON = string(data)
}
// Update user
_, err = tx.ExecContext(ctx,
"UPDATE users SET email = ?, account_data = ?, actions = ?, updated_at = CURRENT_TIMESTAMP WHERE username = ?",
"", accountDataJSON, actionsJSON, username)
if err != nil {
return fmt.Errorf("failed to update user: %v", err)
}
// Delete existing credentials
_, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = ?", username)
if err != nil {
return fmt.Errorf("failed to delete existing credentials: %v", err)
}
// Insert new credentials
for _, cred := range identity.Credentials {
_, err = tx.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES (?, ?, ?)",
username, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
}
return tx.Commit()
}
func (store *SqliteStore) DeleteUser(ctx context.Context, username string) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = ?", username)
if err != nil {
return fmt.Errorf("failed to delete user: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
return credential.ErrUserNotFound
}
return nil
}
func (store *SqliteStore) ListUsers(ctx context.Context) ([]string, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username")
if err != nil {
return nil, fmt.Errorf("failed to query users: %v", err)
}
defer rows.Close()
var usernames []string
for rows.Next() {
var username string
if err := rows.Scan(&username); err != nil {
return nil, fmt.Errorf("failed to scan username: %v", err)
}
usernames = append(usernames, username)
}
return usernames, nil
}
func (store *SqliteStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
if !store.configured {
return nil, fmt.Errorf("store not configured")
}
var username string
err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = ?", accessKey).Scan(&username)
if err != nil {
if err == sql.ErrNoRows {
return nil, credential.ErrAccessKeyNotFound
}
return nil, fmt.Errorf("failed to query access key: %v", err)
}
return store.GetUser(ctx, username)
}
func (store *SqliteStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
// Check if user exists
var count int
err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = ?", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
// Insert credential
_, err = store.db.ExecContext(ctx,
"INSERT INTO credentials (username, access_key, secret_key) VALUES (?, ?, ?)",
username, cred.AccessKey, cred.SecretKey)
if err != nil {
return fmt.Errorf("failed to insert credential: %v", err)
}
return nil
}
func (store *SqliteStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
if !store.configured {
return fmt.Errorf("store not configured")
}
result, err := store.db.ExecContext(ctx,
"DELETE FROM credentials WHERE username = ? AND access_key = ?",
username, accessKey)
if err != nil {
return fmt.Errorf("failed to delete access key: %v", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %v", err)
}
if rowsAffected == 0 {
// Check if user exists
var count int
err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = ?", username).Scan(&count)
if err != nil {
return fmt.Errorf("failed to check user existence: %v", err)
}
if count == 0 {
return credential.ErrUserNotFound
}
return credential.ErrAccessKeyNotFound
}
return nil
}
func (store *SqliteStore) Shutdown() {
if store.db != nil {
store.db.Close()
store.db = nil
}
store.configured = false
}

122
weed/credential/test/integration_test.go

@ -0,0 +1,122 @@
package test
import (
"context"
"testing"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
// Import all store implementations to register them
_ "github.com/seaweedfs/seaweedfs/weed/credential/filer_etc"
_ "github.com/seaweedfs/seaweedfs/weed/credential/memory"
_ "github.com/seaweedfs/seaweedfs/weed/credential/postgres"
_ "github.com/seaweedfs/seaweedfs/weed/credential/sqlite"
)
func TestStoreRegistration(t *testing.T) {
// Test that stores are registered
storeNames := credential.GetAvailableStores()
if len(storeNames) == 0 {
t.Fatal("No credential stores registered")
}
expectedStores := []string{string(credential.StoreTypeFilerEtc), string(credential.StoreTypeMemory), string(credential.StoreTypeSQLite), string(credential.StoreTypePostgres)}
// Verify all expected stores are present
for _, expected := range expectedStores {
found := false
for _, storeName := range storeNames {
if string(storeName) == expected {
found = true
break
}
}
if !found {
t.Errorf("Expected store not found: %s", expected)
}
}
t.Logf("Available stores: %v", storeNames)
}
func TestMemoryStoreIntegration(t *testing.T) {
// Test creating credential manager with memory store
config := util.GetViper()
cm, err := credential.NewCredentialManager(credential.StoreTypeMemory, config, "test.")
if err != nil {
t.Fatalf("Failed to create memory credential manager: %v", err)
}
defer cm.Shutdown()
// Test that the store is of the correct type
if cm.GetStore().GetName() != credential.StoreTypeMemory {
t.Errorf("Expected memory store, got %s", cm.GetStore().GetName())
}
// Test basic operations
ctx := context.Background()
// Create test user
testUser := &iam_pb.Identity{
Name: "testuser",
Actions: []string{"Read", "Write"},
Account: &iam_pb.Account{
Id: "123456789012",
DisplayName: "Test User",
EmailAddress: "test@example.com",
},
Credentials: []*iam_pb.Credential{
{
AccessKey: "AKIAIOSFODNN7EXAMPLE",
SecretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
},
}
// Test CreateUser
err = cm.CreateUser(ctx, testUser)
if err != nil {
t.Fatalf("CreateUser failed: %v", err)
}
// Test GetUser
user, err := cm.GetUser(ctx, "testuser")
if err != nil {
t.Fatalf("GetUser failed: %v", err)
}
if user.Name != "testuser" {
t.Errorf("Expected user name 'testuser', got %s", user.Name)
}
// Test ListUsers
users, err := cm.ListUsers(ctx)
if err != nil {
t.Fatalf("ListUsers failed: %v", err)
}
if len(users) != 1 || users[0] != "testuser" {
t.Errorf("Expected ['testuser'], got %v", users)
}
// Test GetUserByAccessKey
userByKey, err := cm.GetUserByAccessKey(ctx, "AKIAIOSFODNN7EXAMPLE")
if err != nil {
t.Fatalf("GetUserByAccessKey failed: %v", err)
}
if userByKey.Name != "testuser" {
t.Errorf("Expected user name 'testuser', got %s", userByKey.Name)
}
// Test DeleteUser
err = cm.DeleteUser(ctx, "testuser")
if err != nil {
t.Fatalf("DeleteUser failed: %v", err)
}
// Verify user was deleted
_, err = cm.GetUser(ctx, "testuser")
if err != credential.ErrUserNotFound {
t.Errorf("Expected ErrUserNotFound, got %v", err)
}
}

59
weed/iamapi/iamapi_server.go

@ -4,11 +4,13 @@ package iamapi
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -31,6 +33,7 @@ type IamS3ApiConfig interface {
type IamS3ApiConfigure struct {
option *IamServerOption
masterClient *wdclient.MasterClient
credentialManager *credential.CredentialManager
}
type IamServerOption struct {
@ -48,17 +51,28 @@ type IamApiServer struct {
var s3ApiConfigure IamS3ApiConfig
func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer *IamApiServer, err error) {
s3ApiConfigure = IamS3ApiConfigure{
return NewIamApiServerWithStore(router, option, "")
}
func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, explicitStore string) (iamApiServer *IamApiServer, err error) {
configure := &IamS3ApiConfigure{
option: option,
masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)),
}
s3ApiConfigure = configure
s3Option := s3api.S3ApiServerOption{
Filer: option.Filer,
GrpcDialOption: option.GrpcDialOption,
}
iam := s3api.NewIdentityAccessManagementWithStore(&s3Option, explicitStore)
configure.credentialManager = iam.GetCredentialManager()
iamApiServer = &IamApiServer{
s3ApiConfig: s3ApiConfigure,
iam: s3api.NewIdentityAccessManagement(&s3Option),
iam: iam,
}
iamApiServer.registerRouter(router)
@ -78,10 +92,31 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) {
apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler)
}
func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
func (iama *IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
return iama.GetS3ApiConfigurationFromCredentialManager(s3cfg)
}
func (iama *IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
return iama.PutS3ApiConfigurationToCredentialManager(s3cfg)
}
func (iama *IamS3ApiConfigure) GetS3ApiConfigurationFromCredentialManager(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
config, err := iama.credentialManager.LoadConfiguration(context.Background())
if err != nil {
return fmt.Errorf("failed to load configuration from credential manager: %v", err)
}
*s3cfg = *config
return nil
}
func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToCredentialManager(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
return iama.credentialManager.SaveConfiguration(context.Background(), s3cfg)
}
func (iama *IamS3ApiConfigure) GetS3ApiConfigurationFromFiler(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
var buf bytes.Buffer
err = pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
err = pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err
}
return nil
@ -97,12 +132,12 @@ func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat
return nil
}
func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
func (iama *IamS3ApiConfigure) PutS3ApiConfigurationToFiler(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
buf := bytes.Buffer{}
if err := filer.ProtoToText(&buf, s3cfg); err != nil {
return fmt.Errorf("ProtoToText: %s", err)
}
return pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = util.Retry("saveIamIdentity", func() error {
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes())
})
@ -113,10 +148,10 @@ func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat
})
}
func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) {
func (iama *IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) {
var buf bytes.Buffer
err = pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil {
err = pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iama.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil {
return err
}
return nil
@ -134,12 +169,12 @@ func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) {
return nil
}
func (iam IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) {
func (iama *IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) {
var b []byte
if b, err = json.Marshal(policies); err != nil {
return err
}
return pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithGrpcFilerClient(false, 0, iama.option.Filer, iama.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err := filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil {
return err
}

62
weed/s3api/auth_credentials.go

@ -1,19 +1,21 @@
package s3api
import (
"context"
"fmt"
"net/http"
"os"
"strings"
"sync"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"google.golang.org/grpc"
)
type Action string
@ -35,6 +37,9 @@ type IdentityAccessManagement struct {
hashMu sync.RWMutex
domain string
isAuthEnabled bool
credentialManager *credential.CredentialManager
filerClient filer_pb.SeaweedFilerClient
grpcDialOption grpc.DialOption
}
type Identity struct {
@ -114,19 +119,40 @@ func (action Action) getPermission() Permission {
}
func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManagement {
return NewIdentityAccessManagementWithStore(option, "")
}
func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitStore string) *IdentityAccessManagement {
iam := &IdentityAccessManagement{
domain: option.DomainName,
hashes: make(map[string]*sync.Pool),
hashCounters: make(map[string]*int32),
}
// Always initialize credential manager with fallback to defaults
credentialManager, err := credential.NewCredentialManagerWithDefaults(credential.CredentialStoreTypeName(explicitStore))
if err != nil {
glog.Fatalf("failed to initialize credential manager: %v", err)
}
// For stores that need filer client details, set them
if store := credentialManager.GetStore(); store != nil {
if filerClientSetter, ok := store.(interface {
SetFilerClient(string, grpc.DialOption)
}); ok {
filerClientSetter.SetFilerClient(string(option.Filer), option.GrpcDialOption)
}
}
iam.credentialManager = credentialManager
if option.Config != "" {
glog.V(3).Infof("loading static config file %s", option.Config)
if err := iam.loadS3ApiConfigurationFromFile(option.Config); err != nil {
glog.Fatalf("fail to load config file %s: %v", option.Config, err)
}
} else {
glog.V(3).Infof("no static config file specified... loading config from filer %s", option.Filer)
glog.V(3).Infof("no static config file specified... loading config from credential manager")
if err := iam.loadS3ApiConfigurationFromFiler(option); err != nil {
glog.Warningf("fail to load config: %v", err)
}
@ -134,17 +160,8 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag
return iam
}
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) {
var content []byte
err = pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
glog.V(3).Infof("loading config %s from filer %s", filer.IamConfigDirectory+"/"+filer.IamIdentityFile, option.Filer)
content, err = filer.ReadInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile)
return err
})
if err != nil {
return fmt.Errorf("read S3 config: %v", err)
}
return iam.LoadS3ApiConfigurationFromBytes(content)
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) error {
return iam.LoadS3ApiConfigurationFromCredentialManager()
}
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName string) error {
@ -516,3 +533,22 @@ func (identity *Identity) isAdmin() bool {
}
return false
}
// GetCredentialManager returns the credential manager instance
func (iam *IdentityAccessManagement) GetCredentialManager() *credential.CredentialManager {
return iam.credentialManager
}
// LoadS3ApiConfigurationFromCredentialManager loads configuration using the credential manager
func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromCredentialManager() error {
s3ApiConfiguration, err := iam.credentialManager.LoadConfiguration(context.Background())
if err != nil {
return fmt.Errorf("failed to load configuration from credential manager: %v", err)
}
if len(s3ApiConfiguration.Identities) == 0 {
return fmt.Errorf("no identities found")
}
return iam.loadS3ApiConfiguration(s3ApiConfiguration)
}

8
weed/s3api/s3api_put_object_helper_test.go

@ -5,13 +5,15 @@ import (
"strings"
"testing"
"github.com/seaweedfs/seaweedfs/weed/credential"
_ "github.com/seaweedfs/seaweedfs/weed/credential/memory"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
func TestGetRequestDataReader_ChunkedEncodingWithoutIAM(t *testing.T) {
// Create an S3ApiServer with IAM disabled
s3a := &S3ApiServer{
iam: NewIdentityAccessManagement(&S3ApiServerOption{}),
iam: NewIdentityAccessManagementWithStore(&S3ApiServerOption{}, string(credential.StoreTypeMemory)),
}
// Ensure IAM is disabled for this test
s3a.iam.isAuthEnabled = false
@ -85,7 +87,7 @@ func TestGetRequestDataReader_ChunkedEncodingWithoutIAM(t *testing.T) {
func TestGetRequestDataReader_AuthTypeDetection(t *testing.T) {
// Create an S3ApiServer with IAM disabled
s3a := &S3ApiServer{
iam: NewIdentityAccessManagement(&S3ApiServerOption{}),
iam: NewIdentityAccessManagementWithStore(&S3ApiServerOption{}, string(credential.StoreTypeMemory)),
}
s3a.iam.isAuthEnabled = false
@ -120,7 +122,7 @@ func TestGetRequestDataReader_AuthTypeDetection(t *testing.T) {
func TestGetRequestDataReader_IAMEnabled(t *testing.T) {
// Create an S3ApiServer with IAM enabled
s3a := &S3ApiServer{
iam: NewIdentityAccessManagement(&S3ApiServerOption{}),
iam: NewIdentityAccessManagementWithStore(&S3ApiServerOption{}, string(credential.StoreTypeMemory)),
}
s3a.iam.isAuthEnabled = true

18
weed/s3api/s3api_server.go

@ -8,6 +8,7 @@ import (
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
@ -48,9 +49,14 @@ type S3ApiServer struct {
filerGuard *security.Guard
client util_http_client.HTTPClientInterface
bucketRegistry *BucketRegistry
credentialManager *credential.CredentialManager
}
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
return NewS3ApiServerWithStore(router, option, "")
}
func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) {
startTsNs := time.Now().UnixNano()
v := util.GetViper()
@ -64,19 +70,25 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
v.SetDefault("cors.allowed_origins.values", "*")
if (option.AllowedOrigins == nil) || (len(option.AllowedOrigins) == 0) {
if len(option.AllowedOrigins) == 0 {
allowedOrigins := v.GetString("cors.allowed_origins.values")
domains := strings.Split(allowedOrigins, ",")
option.AllowedOrigins = domains
}
var iam *IdentityAccessManagement
iam = NewIdentityAccessManagementWithStore(option, explicitStore)
s3ApiServer = &S3ApiServer{
option: option,
iam: NewIdentityAccessManagement(option),
iam: iam,
randomClientId: util.RandomInt32(),
filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
cb: NewCircuitBreaker(option),
credentialManager: iam.credentialManager,
}
if option.Config != "" {
grace.OnReload(func() {
if err := s3ApiServer.iam.loadS3ApiConfigurationFromFile(option.Config); err != nil {
@ -119,7 +131,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
func(w http.ResponseWriter, r *http.Request) {
origin := r.Header.Get("Origin")
if origin != "" {
if s3a.option.AllowedOrigins == nil || len(s3a.option.AllowedOrigins) == 0 || s3a.option.AllowedOrigins[0] == "*" {
if len(s3a.option.AllowedOrigins) == 0 || s3a.option.AllowedOrigins[0] == "*" {
origin = "*"
} else {
originFound := false

Loading…
Cancel
Save