From 5e4b3f0078caac34f85ab5e3f6eb701797b6af91 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 9 Feb 2026 13:32:34 -0800 Subject: [PATCH] s3: route uploads by storage class disk mapping --- docs/design/s3-storage-class-disk-routing.md | 45 +++++++++++ weed/command/filer.go | 1 + weed/command/mini.go | 1 + weed/command/s3.go | 7 ++ weed/command/server.go | 1 + weed/s3api/s3api_object_handlers_multipart.go | 6 ++ weed/s3api/s3api_object_handlers_put.go | 16 ++-- weed/s3api/s3api_server.go | 9 +++ weed/s3api/storage_class_routing.go | 79 +++++++++++++++++++ weed/s3api/storage_class_routing_test.go | 77 ++++++++++++++++++ 10 files changed, 235 insertions(+), 7 deletions(-) create mode 100644 docs/design/s3-storage-class-disk-routing.md create mode 100644 weed/s3api/storage_class_routing.go create mode 100644 weed/s3api/storage_class_routing_test.go diff --git a/docs/design/s3-storage-class-disk-routing.md b/docs/design/s3-storage-class-disk-routing.md new file mode 100644 index 000000000..5d50a44c6 --- /dev/null +++ b/docs/design/s3-storage-class-disk-routing.md @@ -0,0 +1,45 @@ +# S3 Storage Class to Disk Routing + +## Problem +SeaweedFS already stores S3 `x-amz-storage-class` as object metadata, but write allocation (`AssignVolume`) does not use it. Objects are therefore not routed to specific disk tags by storage class. + +## Goals +1. Route new writes to disk types based on storage class. +2. Preserve current behavior when no routing map is configured. +3. Keep implementation incremental so future storage-class transitions can reuse the same decision logic. + +## Phase 1 (implemented in this PR) +### Scope +1. Add S3 server option `storageClassDiskTypeMap` (`-s3.storageClassDiskTypeMap` in composite commands, `-storageClassDiskTypeMap` in standalone `weed s3`). +2. Parse map format: `STORAGE_CLASS=diskType` comma-separated, e.g. `STANDARD_IA=ssd,GLACIER=hdd`. +3. Resolve effective storage class from: + - request header `X-Amz-Storage-Class` + - fallback to stored entry metadata (when available) + - fallback to `STANDARD` +4. Apply mapped disk type on `AssignVolume` for `putToFiler` upload path. +5. For multipart uploads, propagate storage class from upload metadata to part requests so part chunk allocation also follows routing. + +### Behavior +1. If mapping is empty or class is unmapped: unchanged behavior (`DiskType=""`). +2. Invalid storage class in request header: return `InvalidStorageClass`. +3. Metadata storage remains AWS-compatible (`X-Amz-Storage-Class` is still saved when explicitly provided). + +## Phase 2 (next) +1. Apply the same routing decision to server-side copy chunk allocation paths. +2. Ensure storage-class changes via copy (`x-amz-metadata-directive: REPLACE` + new class) move chunks to target disk type immediately. + +## Phase 3 (future) +1. Add async background transition API for in-place class change: + - mark object transition intent in metadata + - enqueue migration job + - copy chunks to target class disk + - atomically swap metadata/chunks + - garbage collect old chunks +2. Add transition job status and retry handling. +3. Add bucket policy controls for allowed transitions. + +## Non-goals for Phase 1 +1. Lifecycle-driven transitions (`STANDARD` -> `GLACIER` by age). +2. Cost-aware placement balancing. +3. Cross-cluster migration. 
+ diff --git a/weed/command/filer.go b/weed/command/filer.go index dfd08a9eb..601a22af7 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -144,6 +144,7 @@ func init() { filerS3Options.enableIam = cmdFiler.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same S3 port") filerS3Options.cipher = cmdFiler.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads") filerS3Options.iamReadOnly = cmdFiler.Flag.Bool("s3.iam.readOnly", true, "disable IAM write operations on this server") + filerS3Options.storageClassDiskTypeMap = cmdFiler.Flag.String("s3.storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd") filerS3Options.portIceberg = cmdFiler.Flag.Int("s3.port.iceberg", 8181, "Iceberg REST Catalog server listen port (0 to disable)") // start webdav on filer diff --git a/weed/command/mini.go b/weed/command/mini.go index 278b8db6e..bc2540167 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -241,6 +241,7 @@ func initMiniS3Flags() { miniS3Options.iamReadOnly = miniS3IamReadOnly miniS3Options.dataCenter = cmdMini.Flag.String("s3.dataCenter", "", "prefer to read and write to volumes in this data center") miniS3Options.cipher = cmdMini.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads") + miniS3Options.storageClassDiskTypeMap = cmdMini.Flag.String("s3.storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd") miniS3Options.config = miniS3Config miniS3Options.iamConfig = miniIamConfig miniS3Options.auditLogConfig = cmdMini.Flag.String("s3.auditLogConfig", "", "path to the audit log config file") diff --git a/weed/command/s3.go b/weed/command/s3.go index 115147b69..1e1d6d8d7 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -67,6 +67,7 @@ type S3Options struct { debug *bool debugPort *int cipher *bool + storageClassDiskTypeMap *string } func init() { @@ -101,6 +102,7 @@ func init() { s3StandaloneOptions.debug = cmdS3.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port") s3StandaloneOptions.debugPort = cmdS3.Flag.Int("debug.port", 6060, "http port for debugging") s3StandaloneOptions.cipher = cmdS3.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers") + s3StandaloneOptions.storageClassDiskTypeMap = cmdS3.Flag.String("storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. 
STANDARD_IA=ssd,GLACIER=hdd") } var cmdS3 = &Command{ @@ -271,6 +273,10 @@ func (s3opt *S3Options) startS3Server() bool { } var s3ApiServer *s3api.S3ApiServer var s3ApiServer_err error + storageClassDiskTypeMap := "" + if s3opt.storageClassDiskTypeMap != nil { + storageClassDiskTypeMap = *s3opt.storageClassDiskTypeMap + } // Create S3 server with optional advanced IAM integration var iamConfigPath string @@ -309,6 +315,7 @@ func (s3opt *S3Options) startS3Server() bool { Cipher: *s3opt.cipher, // encrypt data on volume servers BindIp: *s3opt.bindIp, GrpcPort: *s3opt.portGrpc, + StorageClassDiskTypeMap: storageClassDiskTypeMap, }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/command/server.go b/weed/command/server.go index 5d480070b..d5baa4655 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -178,6 +178,7 @@ func init() { s3Options.enableIam = cmdServer.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same S3 port") s3Options.iamReadOnly = cmdServer.Flag.Bool("s3.iam.readOnly", true, "disable IAM write operations on this server") s3Options.cipher = cmdServer.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads") + s3Options.storageClassDiskTypeMap = cmdServer.Flag.String("s3.storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd") sftpOptions.port = cmdServer.Flag.Int("sftp.port", 2022, "SFTP server listen port") sftpOptions.sshPrivateKey = cmdServer.Flag.String("sftp.sshPrivateKey", "", "path to the SSH private key file for host authentication") diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index b1190a1b4..713e40631 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -365,6 +365,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ // No SSE-C headers, check for SSE-KMS settings from upload directory if uploadEntry, err := s3a.getEntry(s3a.genUploadsFolder(bucket), uploadID); err == nil { if uploadEntry.Extended != nil { + if r.Header.Get(s3_constants.AmzStorageClass) == "" { + if storageClass := strings.TrimSpace(string(uploadEntry.Extended[s3_constants.AmzStorageClass])); storageClass != "" { + r.Header.Set(s3_constants.AmzStorageClass, storageClass) + } + } + // Check if this upload uses SSE-KMS if keyIDBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; exists { keyID := string(keyIDBytes) diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 254885f22..d1ba4a223 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -357,6 +357,12 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader collection = s3a.getCollectionName(bucket) } + effectiveStorageClass, errCode := resolveEffectiveStorageClass(r.Header, nil) + if errCode != s3err.ErrNone { + return "", errCode, SSEResponseMetadata{} + } + targetDiskType := s3a.mapStorageClassToDiskType(effectiveStorageClass) + // Create assign function for chunked upload assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) { var assignResult *filer_pb.AssignVolumeResponse @@ -365,7 +371,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader Count: int32(count), Replication: "", Collection: 
collection, - DiskType: "", + DiskType: targetDiskType, DataCenter: s3a.option.DataCenter, Path: filePath, }) @@ -585,12 +591,8 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader } // Store the storage class from header - if sc := r.Header.Get(s3_constants.AmzStorageClass); sc != "" { - if !validateStorageClass(sc) { - glog.Warningf("putToFiler: Invalid storage class '%s' for %s", sc, filePath) - return "", s3err.ErrInvalidStorageClass, SSEResponseMetadata{} - } - entry.Extended[s3_constants.AmzStorageClass] = []byte(sc) + if sc := strings.TrimSpace(r.Header.Get(s3_constants.AmzStorageClass)); sc != "" { + entry.Extended[s3_constants.AmzStorageClass] = []byte(effectiveStorageClass) } // Parse and store object tags from X-Amz-Tagging header diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index faa0fb8d5..7e9d53f3e 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -56,6 +56,7 @@ type S3ApiServerOption struct { Cipher bool // encrypt data on volume servers BindIp string GrpcPort int + StorageClassDiskTypeMap string // e.g. "STANDARD_IA=ssd,GLACIER=hdd" } type S3ApiServer struct { @@ -78,6 +79,7 @@ type S3ApiServer struct { embeddedIam *EmbeddedIamApi // Embedded IAM API server (when enabled) stsHandlers *STSHandlers // STS HTTP handlers for AssumeRoleWithWebIdentity cipher bool // encrypt data on volume servers + storageClassDiskTypes map[string]string } func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { @@ -168,6 +170,13 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), cipher: option.Cipher, } + if option.StorageClassDiskTypeMap != "" { + parsedMappings, parseErr := parseStorageClassDiskTypeMap(option.StorageClassDiskTypeMap) + if parseErr != nil { + return nil, fmt.Errorf("invalid -s3.storageClassDiskTypeMap: %w", parseErr) + } + s3ApiServer.storageClassDiskTypes = parsedMappings + } // Set s3a reference in circuit breaker for upload limiting s3ApiServer.cb.s3a = s3ApiServer diff --git a/weed/s3api/storage_class_routing.go b/weed/s3api/storage_class_routing.go new file mode 100644 index 000000000..2ab2fbe8d --- /dev/null +++ b/weed/s3api/storage_class_routing.go @@ -0,0 +1,79 @@ +package s3api + +import ( + "fmt" + "net/http" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +) + +const defaultStorageClass = "STANDARD" + +func normalizeStorageClass(storageClass string) string { + return strings.ToUpper(strings.TrimSpace(storageClass)) +} + +func parseStorageClassDiskTypeMap(raw string) (map[string]string, error) { + mappings := make(map[string]string) + if strings.TrimSpace(raw) == "" { + return mappings, nil + } + + for _, token := range strings.Split(raw, ",") { + token = strings.TrimSpace(token) + if token == "" { + continue + } + + parts := strings.SplitN(token, "=", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid mapping %q, expected STORAGE_CLASS=diskType", token) + } + + storageClass := normalizeStorageClass(parts[0]) + if !validateStorageClass(storageClass) { + return nil, fmt.Errorf("invalid storage class %q in mapping %q", storageClass, token) + } + + diskType := strings.TrimSpace(parts[1]) + if diskType == "" { + return nil, fmt.Errorf("empty disk type in mapping %q", token) + } + + mappings[storageClass] = diskType + } + + return mappings, nil +} + +func 
resolveEffectiveStorageClass(header http.Header, entryExtended map[string][]byte) (string, s3err.ErrorCode) { + if header != nil { + if fromHeader := strings.TrimSpace(header.Get(s3_constants.AmzStorageClass)); fromHeader != "" { + storageClass := normalizeStorageClass(fromHeader) + if !validateStorageClass(storageClass) { + return "", s3err.ErrInvalidStorageClass + } + return storageClass, s3err.ErrNone + } + } + + if entryExtended != nil { + if fromEntry := strings.TrimSpace(string(entryExtended[s3_constants.AmzStorageClass])); fromEntry != "" { + storageClass := normalizeStorageClass(fromEntry) + if validateStorageClass(storageClass) { + return storageClass, s3err.ErrNone + } + } + } + + return defaultStorageClass, s3err.ErrNone +} + +func (s3a *S3ApiServer) mapStorageClassToDiskType(storageClass string) string { + if len(s3a.storageClassDiskTypes) == 0 { + return "" + } + return s3a.storageClassDiskTypes[normalizeStorageClass(storageClass)] +} diff --git a/weed/s3api/storage_class_routing_test.go b/weed/s3api/storage_class_routing_test.go new file mode 100644 index 000000000..b22ae17ea --- /dev/null +++ b/weed/s3api/storage_class_routing_test.go @@ -0,0 +1,77 @@ +package s3api + +import ( + "net/http" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +) + +func TestParseStorageClassDiskTypeMap(t *testing.T) { + mappings, err := parseStorageClassDiskTypeMap("STANDARD_IA=ssd,GLACIER=hdd") + if err != nil { + t.Fatalf("parseStorageClassDiskTypeMap returned error: %v", err) + } + + if got, want := mappings["STANDARD_IA"], "ssd"; got != want { + t.Fatalf("STANDARD_IA mapping mismatch: got %q want %q", got, want) + } + if got, want := mappings["GLACIER"], "hdd"; got != want { + t.Fatalf("GLACIER mapping mismatch: got %q want %q", got, want) + } +} + +func TestParseStorageClassDiskTypeMapRejectsInvalidInput(t *testing.T) { + testCases := []string{ + "INVALID=ssd", + "STANDARD_IA=", + "STANDARD_IA", + } + + for _, tc := range testCases { + if _, err := parseStorageClassDiskTypeMap(tc); err == nil { + t.Fatalf("expected parse failure for %q", tc) + } + } +} + +func TestResolveEffectiveStorageClass(t *testing.T) { + header := make(http.Header) + header.Set(s3_constants.AmzStorageClass, "standard_ia") + sc, code := resolveEffectiveStorageClass(header, nil) + if code != s3err.ErrNone { + t.Fatalf("expected no error, got %v", code) + } + if sc != "STANDARD_IA" { + t.Fatalf("expected STANDARD_IA, got %q", sc) + } + + header = make(http.Header) + sc, code = resolveEffectiveStorageClass(header, map[string][]byte{ + s3_constants.AmzStorageClass: []byte("GLACIER"), + }) + if code != s3err.ErrNone { + t.Fatalf("expected no error for entry metadata, got %v", code) + } + if sc != "GLACIER" { + t.Fatalf("expected GLACIER, got %q", sc) + } + + sc, code = resolveEffectiveStorageClass(header, nil) + if code != s3err.ErrNone { + t.Fatalf("expected no error for default class, got %v", code) + } + if sc != defaultStorageClass { + t.Fatalf("expected default storage class %q, got %q", defaultStorageClass, sc) + } +} + +func TestResolveEffectiveStorageClassRejectsInvalidHeader(t *testing.T) { + header := make(http.Header) + header.Set(s3_constants.AmzStorageClass, "not-a-class") + _, code := resolveEffectiveStorageClass(header, nil) + if code != s3err.ErrInvalidStorageClass { + t.Fatalf("expected ErrInvalidStorageClass, got %v", code) + } +}
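A hedged usage sketch to make the routing concrete. Only `-storageClassDiskTypeMap` / `-s3.storageClassDiskTypeMap` come from this patch; the ports, directories, and the `ssd`/`hdd` tags below are placeholders, a running master and filer are assumed, and the mapped disk types must match tags that volume servers actually register via their `-disk` flag.

```sh
# Volume servers register their disks under tags; the mapping's disk types must match these tags.
weed volume -mserver=localhost:9333 -port=8080 -dir=/data/ssd -disk=ssd
weed volume -mserver=localhost:9333 -port=8081 -dir=/data/hdd -disk=hdd

# Standalone S3 gateway with the routing map added in this patch
# (composite commands use the -s3.storageClassDiskTypeMap form instead).
weed s3 -filer=localhost:8888 -port=8333 \
  -storageClassDiskTypeMap="STANDARD_IA=ssd,GLACIER=hdd"

# A PUT carrying x-amz-storage-class: STANDARD_IA should now be assigned to
# ssd-tagged volumes; an absent or unmapped class keeps today's behavior (DiskType="").
aws --endpoint-url http://localhost:8333 s3 cp ./file.bin \
  s3://my-bucket/file.bin --storage-class STANDARD_IA
```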
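One precedence case the new tests do not exercise directly is an explicit request header overriding the storage class already recorded on the multipart upload entry (the resolution order from Phase 1, and the code path added to PutObjectPartHandler, which only copies the upload entry's class into the header when the header is empty). A sketch of such a test, using only the helpers introduced in this patch and written in the style of storage_class_routing_test.go:

```go
package s3api

import (
	"net/http"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)

// The request header should take precedence over the storage class stored in
// entry metadata, per the resolution order described in the design doc.
func TestResolveEffectiveStorageClassHeaderOverridesEntry(t *testing.T) {
	header := make(http.Header)
	header.Set(s3_constants.AmzStorageClass, "GLACIER")

	sc, code := resolveEffectiveStorageClass(header, map[string][]byte{
		s3_constants.AmzStorageClass: []byte("STANDARD_IA"),
	})
	if code != s3err.ErrNone {
		t.Fatalf("expected no error, got %v", code)
	}
	if sc != "GLACIER" {
		t.Fatalf("expected header class to win, got %q", sc)
	}
}
```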