s3: route uploads by storage class disk mapping

feature-8113-storage-class-disk-routing
Chris Lu 3 days ago
parent
commit 5e4b3f0078
1. docs/design/s3-storage-class-disk-routing.md (45 lines changed)
2. weed/command/filer.go (1 line changed)
3. weed/command/mini.go (1 line changed)
4. weed/command/s3.go (7 lines changed)
5. weed/command/server.go (1 line changed)
6. weed/s3api/s3api_object_handlers_multipart.go (6 lines changed)
7. weed/s3api/s3api_object_handlers_put.go (16 lines changed)
8. weed/s3api/s3api_server.go (9 lines changed)
9. weed/s3api/storage_class_routing.go (79 lines changed)
10. weed/s3api/storage_class_routing_test.go (77 lines changed)

docs/design/s3-storage-class-disk-routing.md (45 lines changed)

@@ -0,0 +1,45 @@
# S3 Storage Class to Disk Routing
## Problem
SeaweedFS already stores S3 `x-amz-storage-class` as object metadata, but write allocation (`AssignVolume`) does not use it. Objects are therefore not routed to specific disk types by storage class.
## Goals
1. Route new writes to disk types based on storage class.
2. Preserve current behavior when no routing map is configured.
3. Keep implementation incremental so future storage-class transitions can reuse the same decision logic.
## Phase 1 (implemented in this PR)
### Scope
1. Add S3 server option `storageClassDiskTypeMap` (`-s3.storageClassDiskTypeMap` in composite commands, `-storageClassDiskTypeMap` in standalone `weed s3`).
2. Parse map format: comma-separated `STORAGE_CLASS=diskType` pairs, e.g. `STANDARD_IA=ssd,GLACIER=hdd` (see the sketch after this list).
3. Resolve effective storage class from:
- request header `X-Amz-Storage-Class`
- fallback to stored entry metadata (when available)
- fallback to `STANDARD`
4. Apply mapped disk type on `AssignVolume` for `putToFiler` upload path.
5. For multipart uploads, propagate storage class from upload metadata to part requests so part chunk allocation also follows routing.
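
A minimal, self-contained sketch of the parse-and-resolve order described above. The helper names here are illustrative, not the PR's actual functions; the real logic lives in `weed/s3api/storage_class_routing.go` below and additionally validates class names and rejects malformed tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// parseMap turns "STANDARD_IA=ssd,GLACIER=hdd" into a class -> diskType map.
func parseMap(raw string) map[string]string {
	m := map[string]string{}
	for _, tok := range strings.Split(raw, ",") {
		if parts := strings.SplitN(strings.TrimSpace(tok), "=", 2); len(parts) == 2 {
			m[strings.ToUpper(strings.TrimSpace(parts[0]))] = strings.TrimSpace(parts[1])
		}
	}
	return m
}

// resolve follows the fallback order: request header, then stored metadata, then STANDARD.
func resolve(headerClass, storedClass string) string {
	if c := strings.TrimSpace(headerClass); c != "" {
		return strings.ToUpper(c)
	}
	if c := strings.TrimSpace(storedClass); c != "" {
		return strings.ToUpper(c)
	}
	return "STANDARD"
}

func main() {
	diskTypes := parseMap("STANDARD_IA=ssd,GLACIER=hdd")
	fmt.Println(diskTypes[resolve("standard_ia", "")]) // "ssd": request header wins
	fmt.Println(diskTypes[resolve("", "GLACIER")])     // "hdd": falls back to stored metadata
	fmt.Println(diskTypes[resolve("", "")])            // "": STANDARD is unmapped, keep default placement
}
```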
### Behavior
1. If mapping is empty or class is unmapped: unchanged behavior (`DiskType=""`).
2. Invalid storage class in request header: return `InvalidStorageClass`.
3. Metadata storage remains AWS-compatible (`X-Amz-Storage-Class` is still saved when explicitly provided).
## Phase 2 (next)
1. Apply the same routing decision to server-side copy chunk allocation paths.
2. Ensure storage-class changes via copy (`x-amz-metadata-directive: REPLACE` + new class) move chunks to target disk type immediately.
## Phase 3 (future)
1. Add async background transition API for in-place class change:
- mark object transition intent in metadata
- enqueue migration job
- copy chunks to target class disk
- atomically swap metadata/chunks
- garbage collect old chunks
2. Add transition job status and retry handling.
3. Add bucket policy controls for allowed transitions.
## Non-goals for Phase 1
1. Lifecycle-driven transitions (`STANDARD` -> `GLACIER` by age).
2. Cost-aware placement balancing.
3. Cross-cluster migration.

weed/command/filer.go (1 line changed)

@@ -144,6 +144,7 @@ func init() {
 	filerS3Options.enableIam = cmdFiler.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same S3 port")
 	filerS3Options.cipher = cmdFiler.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
 	filerS3Options.iamReadOnly = cmdFiler.Flag.Bool("s3.iam.readOnly", true, "disable IAM write operations on this server")
+	filerS3Options.storageClassDiskTypeMap = cmdFiler.Flag.String("s3.storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd")
 	filerS3Options.portIceberg = cmdFiler.Flag.Int("s3.port.iceberg", 8181, "Iceberg REST Catalog server listen port (0 to disable)")

 	// start webdav on filer

weed/command/mini.go (1 line changed)

@@ -241,6 +241,7 @@ func initMiniS3Flags() {
 	miniS3Options.iamReadOnly = miniS3IamReadOnly
 	miniS3Options.dataCenter = cmdMini.Flag.String("s3.dataCenter", "", "prefer to read and write to volumes in this data center")
 	miniS3Options.cipher = cmdMini.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
+	miniS3Options.storageClassDiskTypeMap = cmdMini.Flag.String("s3.storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd")
 	miniS3Options.config = miniS3Config
 	miniS3Options.iamConfig = miniIamConfig
 	miniS3Options.auditLogConfig = cmdMini.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")

weed/command/s3.go (7 lines changed)

@@ -67,6 +67,7 @@ type S3Options struct {
 	debug *bool
 	debugPort *int
 	cipher *bool
+	storageClassDiskTypeMap *string
 }

 func init() {
@@ -101,6 +102,7 @@ func init() {
 	s3StandaloneOptions.debug = cmdS3.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
 	s3StandaloneOptions.debugPort = cmdS3.Flag.Int("debug.port", 6060, "http port for debugging")
 	s3StandaloneOptions.cipher = cmdS3.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
+	s3StandaloneOptions.storageClassDiskTypeMap = cmdS3.Flag.String("storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd")
 }

 var cmdS3 = &Command{
@@ -271,6 +273,10 @@ func (s3opt *S3Options) startS3Server() bool {
 	}
 	var s3ApiServer *s3api.S3ApiServer
 	var s3ApiServer_err error
+	storageClassDiskTypeMap := ""
+	if s3opt.storageClassDiskTypeMap != nil {
+		storageClassDiskTypeMap = *s3opt.storageClassDiskTypeMap
+	}

 	// Create S3 server with optional advanced IAM integration
 	var iamConfigPath string
@@ -309,6 +315,7 @@ func (s3opt *S3Options) startS3Server() bool {
 		Cipher: *s3opt.cipher, // encrypt data on volume servers
 		BindIp: *s3opt.bindIp,
 		GrpcPort: *s3opt.portGrpc,
+		StorageClassDiskTypeMap: storageClassDiskTypeMap,
 	})
 	if s3ApiServer_err != nil {
 		glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)

weed/command/server.go (1 line changed)

@@ -178,6 +178,7 @@ func init() {
 	s3Options.enableIam = cmdServer.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same S3 port")
 	s3Options.iamReadOnly = cmdServer.Flag.Bool("s3.iam.readOnly", true, "disable IAM write operations on this server")
 	s3Options.cipher = cmdServer.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
+	s3Options.storageClassDiskTypeMap = cmdServer.Flag.String("s3.storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd")

 	sftpOptions.port = cmdServer.Flag.Int("sftp.port", 2022, "SFTP server listen port")
 	sftpOptions.sshPrivateKey = cmdServer.Flag.String("sftp.sshPrivateKey", "", "path to the SSH private key file for host authentication")

weed/s3api/s3api_object_handlers_multipart.go (6 lines changed)

@@ -365,6 +365,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
 	// No SSE-C headers, check for SSE-KMS settings from upload directory
 	if uploadEntry, err := s3a.getEntry(s3a.genUploadsFolder(bucket), uploadID); err == nil {
 		if uploadEntry.Extended != nil {
+			if r.Header.Get(s3_constants.AmzStorageClass) == "" {
+				if storageClass := strings.TrimSpace(string(uploadEntry.Extended[s3_constants.AmzStorageClass])); storageClass != "" {
+					r.Header.Set(s3_constants.AmzStorageClass, storageClass)
+				}
+			}
 			// Check if this upload uses SSE-KMS
 			if keyIDBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; exists {
 				keyID := string(keyIDBytes)

weed/s3api/s3api_object_handlers_put.go (16 lines changed)

@@ -357,6 +357,12 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
 		collection = s3a.getCollectionName(bucket)
 	}
+	effectiveStorageClass, errCode := resolveEffectiveStorageClass(r.Header, nil)
+	if errCode != s3err.ErrNone {
+		return "", errCode, SSEResponseMetadata{}
+	}
+	targetDiskType := s3a.mapStorageClassToDiskType(effectiveStorageClass)

 	// Create assign function for chunked upload
 	assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
 		var assignResult *filer_pb.AssignVolumeResponse
@@ -365,7 +371,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
 			Count: int32(count),
 			Replication: "",
 			Collection: collection,
-			DiskType: "",
+			DiskType: targetDiskType,
 			DataCenter: s3a.option.DataCenter,
 			Path: filePath,
 		})
@@ -585,12 +591,8 @@
 	}

 	// Store the storage class from header
-	if sc := r.Header.Get(s3_constants.AmzStorageClass); sc != "" {
-		if !validateStorageClass(sc) {
-			glog.Warningf("putToFiler: Invalid storage class '%s' for %s", sc, filePath)
-			return "", s3err.ErrInvalidStorageClass, SSEResponseMetadata{}
-		}
-		entry.Extended[s3_constants.AmzStorageClass] = []byte(sc)
+	if sc := strings.TrimSpace(r.Header.Get(s3_constants.AmzStorageClass)); sc != "" {
+		entry.Extended[s3_constants.AmzStorageClass] = []byte(effectiveStorageClass)
 	}

 	// Parse and store object tags from X-Amz-Tagging header

weed/s3api/s3api_server.go (9 lines changed)

@@ -56,6 +56,7 @@ type S3ApiServerOption struct {
 	Cipher bool // encrypt data on volume servers
 	BindIp string
 	GrpcPort int
+	StorageClassDiskTypeMap string // e.g. "STANDARD_IA=ssd,GLACIER=hdd"
 }

 type S3ApiServer struct {
@@ -78,6 +79,7 @@ type S3ApiServer struct {
 	embeddedIam *EmbeddedIamApi // Embedded IAM API server (when enabled)
 	stsHandlers *STSHandlers // STS HTTP handlers for AssumeRoleWithWebIdentity
 	cipher bool // encrypt data on volume servers
+	storageClassDiskTypes map[string]string
 }

 func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
@@ -168,6 +170,13 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
 		inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
 		cipher: option.Cipher,
 	}
+	if option.StorageClassDiskTypeMap != "" {
+		parsedMappings, parseErr := parseStorageClassDiskTypeMap(option.StorageClassDiskTypeMap)
+		if parseErr != nil {
+			return nil, fmt.Errorf("invalid -s3.storageClassDiskTypeMap: %w", parseErr)
+		}
+		s3ApiServer.storageClassDiskTypes = parsedMappings
+	}

 	// Set s3a reference in circuit breaker for upload limiting
 	s3ApiServer.cb.s3a = s3ApiServer
weed/s3api/storage_class_routing.go (79 lines changed)

@@ -0,0 +1,79 @@
package s3api

import (
	"fmt"
	"net/http"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)

const defaultStorageClass = "STANDARD"

func normalizeStorageClass(storageClass string) string {
	return strings.ToUpper(strings.TrimSpace(storageClass))
}

func parseStorageClassDiskTypeMap(raw string) (map[string]string, error) {
	mappings := make(map[string]string)
	if strings.TrimSpace(raw) == "" {
		return mappings, nil
	}
	for _, token := range strings.Split(raw, ",") {
		token = strings.TrimSpace(token)
		if token == "" {
			continue
		}
		parts := strings.SplitN(token, "=", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("invalid mapping %q, expected STORAGE_CLASS=diskType", token)
		}
		storageClass := normalizeStorageClass(parts[0])
		if !validateStorageClass(storageClass) {
			return nil, fmt.Errorf("invalid storage class %q in mapping %q", storageClass, token)
		}
		diskType := strings.TrimSpace(parts[1])
		if diskType == "" {
			return nil, fmt.Errorf("empty disk type in mapping %q", token)
		}
		mappings[storageClass] = diskType
	}
	return mappings, nil
}

func resolveEffectiveStorageClass(header http.Header, entryExtended map[string][]byte) (string, s3err.ErrorCode) {
	if header != nil {
		if fromHeader := strings.TrimSpace(header.Get(s3_constants.AmzStorageClass)); fromHeader != "" {
			storageClass := normalizeStorageClass(fromHeader)
			if !validateStorageClass(storageClass) {
				return "", s3err.ErrInvalidStorageClass
			}
			return storageClass, s3err.ErrNone
		}
	}
	if entryExtended != nil {
		if fromEntry := strings.TrimSpace(string(entryExtended[s3_constants.AmzStorageClass])); fromEntry != "" {
			storageClass := normalizeStorageClass(fromEntry)
			if validateStorageClass(storageClass) {
				return storageClass, s3err.ErrNone
			}
		}
	}
	return defaultStorageClass, s3err.ErrNone
}

func (s3a *S3ApiServer) mapStorageClassToDiskType(storageClass string) string {
	if len(s3a.storageClassDiskTypes) == 0 {
		return ""
	}
	return s3a.storageClassDiskTypes[normalizeStorageClass(storageClass)]
}

weed/s3api/storage_class_routing_test.go (77 lines changed)

@@ -0,0 +1,77 @@
package s3api

import (
	"net/http"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)

func TestParseStorageClassDiskTypeMap(t *testing.T) {
	mappings, err := parseStorageClassDiskTypeMap("STANDARD_IA=ssd,GLACIER=hdd")
	if err != nil {
		t.Fatalf("parseStorageClassDiskTypeMap returned error: %v", err)
	}
	if got, want := mappings["STANDARD_IA"], "ssd"; got != want {
		t.Fatalf("STANDARD_IA mapping mismatch: got %q want %q", got, want)
	}
	if got, want := mappings["GLACIER"], "hdd"; got != want {
		t.Fatalf("GLACIER mapping mismatch: got %q want %q", got, want)
	}
}

func TestParseStorageClassDiskTypeMapRejectsInvalidInput(t *testing.T) {
	testCases := []string{
		"INVALID=ssd",
		"STANDARD_IA=",
		"STANDARD_IA",
	}
	for _, tc := range testCases {
		if _, err := parseStorageClassDiskTypeMap(tc); err == nil {
			t.Fatalf("expected parse failure for %q", tc)
		}
	}
}

func TestResolveEffectiveStorageClass(t *testing.T) {
	header := make(http.Header)
	header.Set(s3_constants.AmzStorageClass, "standard_ia")
	sc, code := resolveEffectiveStorageClass(header, nil)
	if code != s3err.ErrNone {
		t.Fatalf("expected no error, got %v", code)
	}
	if sc != "STANDARD_IA" {
		t.Fatalf("expected STANDARD_IA, got %q", sc)
	}

	header = make(http.Header)
	sc, code = resolveEffectiveStorageClass(header, map[string][]byte{
		s3_constants.AmzStorageClass: []byte("GLACIER"),
	})
	if code != s3err.ErrNone {
		t.Fatalf("expected no error for entry metadata, got %v", code)
	}
	if sc != "GLACIER" {
		t.Fatalf("expected GLACIER, got %q", sc)
	}

	sc, code = resolveEffectiveStorageClass(header, nil)
	if code != s3err.ErrNone {
		t.Fatalf("expected no error for default class, got %v", code)
	}
	if sc != defaultStorageClass {
		t.Fatalf("expected default storage class %q, got %q", defaultStorageClass, sc)
	}
}

func TestResolveEffectiveStorageClassRejectsInvalidHeader(t *testing.T) {
	header := make(http.Header)
	header.Set(s3_constants.AmzStorageClass, "not-a-class")
	_, code := resolveEffectiveStorageClass(header, nil)
	if code != s3err.ErrInvalidStorageClass {
		t.Fatalf("expected ErrInvalidStorageClass, got %v", code)
	}
}