Browse Source

s3: load storage-class disk mapping from filer grpc config

feature-8113-storage-class-disk-routing
Chris Lu 2 days ago
parent
commit
4f1fce0882
  1. 23
      docs/design/s3-storage-class-disk-routing.md
  2. 1
      other/java/client/src/main/proto/filer.proto
  3. 1
      weed/command/filer.go
  4. 1
      weed/command/mini.go
  5. 8
      weed/command/s3.go
  6. 16
      weed/command/scaffold/filer.toml
  7. 1
      weed/command/server.go
  8. 1
      weed/pb/filer.proto
  9. 192
      weed/pb/filer_pb/filer.pb.go
  10. 10
      weed/s3api/s3api_server.go
  11. 48
      weed/s3api/storage_class_routing.go
  12. 30
      weed/s3api/storage_class_routing_test.go
  13. 57
      weed/server/filer_grpc_server_admin.go

23
docs/design/s3-storage-class-disk-routing.md

@ -10,19 +10,25 @@ SeaweedFS already stores S3 `x-amz-storage-class` as object metadata, but write
## Phase 1 (implemented in this PR)
### Scope
1. Add S3 server option `storageClassDiskTypeMap` (`-s3.storageClassDiskTypeMap` in composite commands, `-storageClassDiskTypeMap` in standalone `weed s3`).
2. Parse map format: `STORAGE_CLASS=diskType` comma-separated, e.g. `STANDARD_IA=ssd,GLACIER=hdd`.
3. Resolve effective storage class from:
1. Configure routing in `filer.toml`:
- section: `[s3.storage_class_disk_type]`
- keys: lowercase storage classes (e.g. `standard_ia = "hdd"`).
2. Load this routing map from filer via gRPC (`GetFilerConfiguration`) so S3 instances use filer-provided config.
3. Provide explicit defaults (when not set in `filer.toml`):
- `standard = "ssd"`
- all colder classes default to `"hdd"`.
4. Resolve effective storage class from:
- request header `X-Amz-Storage-Class`
- fallback to stored entry metadata (when available)
- fallback to `STANDARD`
4. Apply mapped disk type on `AssignVolume` for `putToFiler` upload path.
5. For multipart uploads, propagate storage class from upload metadata to part requests so part chunk allocation also follows routing.
5. Apply mapped disk type on `AssignVolume` for `putToFiler` upload path.
6. For multipart uploads, propagate storage class from upload metadata to part requests so part chunk allocation also follows routing.
### Behavior
1. If mapping is empty or class is unmapped: unchanged behavior (`DiskType=""`).
2. Invalid storage class in request header: return `InvalidStorageClass`.
3. Metadata storage remains AWS-compatible (`X-Amz-Storage-Class` is still saved when explicitly provided).
1. If class mapping is not configured explicitly in `filer.toml`, filer defaults are applied.
2. If class is unknown to the routing table at runtime: unchanged behavior (`DiskType=""`).
3. Invalid storage class in request header: return `InvalidStorageClass`.
4. Metadata storage remains AWS-compatible (`X-Amz-Storage-Class` is still saved when explicitly provided).
## Phase 2 (next)
1. Apply the same routing decision to server-side copy chunk allocation paths.
@ -42,4 +48,3 @@ SeaweedFS already stores S3 `x-amz-storage-class` as object metadata, but write
1. Lifecycle-driven transitions (`STANDARD` -> `GLACIER` by age).
2. Cost-aware placement balancing.
3. Cross-cluster migration.

1
other/java/client/src/main/proto/filer.proto

@ -357,6 +357,7 @@ message GetFilerConfigurationResponse {
string filer_group = 13;
int32 major_version = 14;
int32 minor_version = 15;
map<string, string> storage_class_disk_type = 16;
}
message SubscribeMetadataRequest {

1
weed/command/filer.go

@ -144,7 +144,6 @@ func init() {
filerS3Options.enableIam = cmdFiler.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same S3 port")
filerS3Options.cipher = cmdFiler.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
filerS3Options.iamReadOnly = cmdFiler.Flag.Bool("s3.iam.readOnly", true, "disable IAM write operations on this server")
filerS3Options.storageClassDiskTypeMap = cmdFiler.Flag.String("s3.storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd")
filerS3Options.portIceberg = cmdFiler.Flag.Int("s3.port.iceberg", 8181, "Iceberg REST Catalog server listen port (0 to disable)")
// start webdav on filer

1
weed/command/mini.go

@ -241,7 +241,6 @@ func initMiniS3Flags() {
miniS3Options.iamReadOnly = miniS3IamReadOnly
miniS3Options.dataCenter = cmdMini.Flag.String("s3.dataCenter", "", "prefer to read and write to volumes in this data center")
miniS3Options.cipher = cmdMini.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
miniS3Options.storageClassDiskTypeMap = cmdMini.Flag.String("s3.storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd")
miniS3Options.config = miniS3Config
miniS3Options.iamConfig = miniIamConfig
miniS3Options.auditLogConfig = cmdMini.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")

8
weed/command/s3.go

@ -67,7 +67,6 @@ type S3Options struct {
debug *bool
debugPort *int
cipher *bool
storageClassDiskTypeMap *string
}
func init() {
@ -102,7 +101,6 @@ func init() {
s3StandaloneOptions.debug = cmdS3.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
s3StandaloneOptions.debugPort = cmdS3.Flag.Int("debug.port", 6060, "http port for debugging")
s3StandaloneOptions.cipher = cmdS3.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
s3StandaloneOptions.storageClassDiskTypeMap = cmdS3.Flag.String("storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd")
}
var cmdS3 = &Command{
@ -231,6 +229,7 @@ func (s3opt *S3Options) startS3Server() bool {
filerBucketsPath := "/buckets"
filerGroup := ""
var masterAddresses []pb.ServerAddress
storageClassDiskTypeMap := make(map[string]string)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
@ -248,6 +247,7 @@ func (s3opt *S3Options) startS3Server() bool {
filerGroup = resp.FilerGroup
// Get master addresses for filer discovery
masterAddresses = pb.ServerAddresses(strings.Join(resp.Masters, ",")).ToAddresses()
storageClassDiskTypeMap = resp.GetStorageClassDiskType()
metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec)
glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
if len(masterAddresses) > 0 {
@ -273,10 +273,6 @@ func (s3opt *S3Options) startS3Server() bool {
}
var s3ApiServer *s3api.S3ApiServer
var s3ApiServer_err error
storageClassDiskTypeMap := ""
if s3opt.storageClassDiskTypeMap != nil {
storageClassDiskTypeMap = *s3opt.storageClassDiskTypeMap
}
// Create S3 server with optional advanced IAM integration
var iamConfigPath string

16
weed/command/scaffold/filer.toml

@ -14,6 +14,21 @@
recursive_delete = false
#max_file_name_length = 255
[s3.storage_class_disk_type]
# Route S3 object writes to volume disk types by storage class.
# Valid disk types are existing SeaweedFS disk tags, e.g. "hdd", "ssd", or custom tags.
# Defaults: STANDARD on ssd, all colder tiers on hdd.
standard = "ssd"
reduced_redundancy = "hdd"
standard_ia = "hdd"
onezone_ia = "hdd"
intelligent_tiering = "hdd"
glacier = "hdd"
deep_archive = "hdd"
outposts = "hdd"
glacier_ir = "hdd"
snow = "hdd"
####################################################
# The following are filer store options
####################################################
@ -440,4 +455,3 @@ password = ""
timeout = "5s"
maxReconnects = 1000

1
weed/command/server.go

@ -178,7 +178,6 @@ func init() {
s3Options.enableIam = cmdServer.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same S3 port")
s3Options.iamReadOnly = cmdServer.Flag.Bool("s3.iam.readOnly", true, "disable IAM write operations on this server")
s3Options.cipher = cmdServer.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
s3Options.storageClassDiskTypeMap = cmdServer.Flag.String("s3.storageClassDiskTypeMap", "", "map S3 storage classes to filer disk types, e.g. STANDARD_IA=ssd,GLACIER=hdd")
sftpOptions.port = cmdServer.Flag.Int("sftp.port", 2022, "SFTP server listen port")
sftpOptions.sshPrivateKey = cmdServer.Flag.String("sftp.sshPrivateKey", "", "path to the SSH private key file for host authentication")

1
weed/pb/filer.proto

@ -357,6 +357,7 @@ message GetFilerConfigurationResponse {
string filer_group = 13;
int32 major_version = 14;
int32 minor_version = 15;
map<string, string> storage_class_disk_type = 16;
}
message SubscribeMetadataRequest {

192
weed/pb/filer_pb/filer.pb.go

@ -2635,23 +2635,24 @@ func (*GetFilerConfigurationRequest) Descriptor() ([]byte, []int) {
}
type GetFilerConfigurationResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Masters []string `protobuf:"bytes,1,rep,name=masters,proto3" json:"masters,omitempty"`
Replication string `protobuf:"bytes,2,opt,name=replication,proto3" json:"replication,omitempty"`
Collection string `protobuf:"bytes,3,opt,name=collection,proto3" json:"collection,omitempty"`
MaxMb uint32 `protobuf:"varint,4,opt,name=max_mb,json=maxMb,proto3" json:"max_mb,omitempty"`
DirBuckets string `protobuf:"bytes,5,opt,name=dir_buckets,json=dirBuckets,proto3" json:"dir_buckets,omitempty"`
Cipher bool `protobuf:"varint,7,opt,name=cipher,proto3" json:"cipher,omitempty"`
Signature int32 `protobuf:"varint,8,opt,name=signature,proto3" json:"signature,omitempty"`
MetricsAddress string `protobuf:"bytes,9,opt,name=metrics_address,json=metricsAddress,proto3" json:"metrics_address,omitempty"`
MetricsIntervalSec int32 `protobuf:"varint,10,opt,name=metrics_interval_sec,json=metricsIntervalSec,proto3" json:"metrics_interval_sec,omitempty"`
Version string `protobuf:"bytes,11,opt,name=version,proto3" json:"version,omitempty"`
ClusterId string `protobuf:"bytes,12,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"`
FilerGroup string `protobuf:"bytes,13,opt,name=filer_group,json=filerGroup,proto3" json:"filer_group,omitempty"`
MajorVersion int32 `protobuf:"varint,14,opt,name=major_version,json=majorVersion,proto3" json:"major_version,omitempty"`
MinorVersion int32 `protobuf:"varint,15,opt,name=minor_version,json=minorVersion,proto3" json:"minor_version,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
Masters []string `protobuf:"bytes,1,rep,name=masters,proto3" json:"masters,omitempty"`
Replication string `protobuf:"bytes,2,opt,name=replication,proto3" json:"replication,omitempty"`
Collection string `protobuf:"bytes,3,opt,name=collection,proto3" json:"collection,omitempty"`
MaxMb uint32 `protobuf:"varint,4,opt,name=max_mb,json=maxMb,proto3" json:"max_mb,omitempty"`
DirBuckets string `protobuf:"bytes,5,opt,name=dir_buckets,json=dirBuckets,proto3" json:"dir_buckets,omitempty"`
Cipher bool `protobuf:"varint,7,opt,name=cipher,proto3" json:"cipher,omitempty"`
Signature int32 `protobuf:"varint,8,opt,name=signature,proto3" json:"signature,omitempty"`
MetricsAddress string `protobuf:"bytes,9,opt,name=metrics_address,json=metricsAddress,proto3" json:"metrics_address,omitempty"`
MetricsIntervalSec int32 `protobuf:"varint,10,opt,name=metrics_interval_sec,json=metricsIntervalSec,proto3" json:"metrics_interval_sec,omitempty"`
Version string `protobuf:"bytes,11,opt,name=version,proto3" json:"version,omitempty"`
ClusterId string `protobuf:"bytes,12,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"`
FilerGroup string `protobuf:"bytes,13,opt,name=filer_group,json=filerGroup,proto3" json:"filer_group,omitempty"`
MajorVersion int32 `protobuf:"varint,14,opt,name=major_version,json=majorVersion,proto3" json:"major_version,omitempty"`
MinorVersion int32 `protobuf:"varint,15,opt,name=minor_version,json=minorVersion,proto3" json:"minor_version,omitempty"`
StorageClassDiskType map[string]string `protobuf:"bytes,16,rep,name=storage_class_disk_type,json=storageClassDiskType,proto3" json:"storage_class_disk_type,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GetFilerConfigurationResponse) Reset() {
@ -2782,6 +2783,13 @@ func (x *GetFilerConfigurationResponse) GetMinorVersion() int32 {
return 0
}
func (x *GetFilerConfigurationResponse) GetStorageClassDiskType() map[string]string {
if x != nil {
return x.StorageClassDiskType
}
return nil
}
type SubscribeMetadataRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
ClientName string `protobuf:"bytes,1,opt,name=client_name,json=clientName,proto3" json:"client_name,omitempty"`
@ -4186,7 +4194,7 @@ type LocateBrokerResponse_Resource struct {
func (x *LocateBrokerResponse_Resource) Reset() {
*x = LocateBrokerResponse_Resource{}
mi := &file_filer_proto_msgTypes[68]
mi := &file_filer_proto_msgTypes[69]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -4198,7 +4206,7 @@ func (x *LocateBrokerResponse_Resource) String() string {
func (*LocateBrokerResponse_Resource) ProtoMessage() {}
func (x *LocateBrokerResponse_Resource) ProtoReflect() protoreflect.Message {
mi := &file_filer_proto_msgTypes[68]
mi := &file_filer_proto_msgTypes[69]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -4252,7 +4260,7 @@ type FilerConf_PathConf struct {
func (x *FilerConf_PathConf) Reset() {
*x = FilerConf_PathConf{}
mi := &file_filer_proto_msgTypes[69]
mi := &file_filer_proto_msgTypes[70]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -4264,7 +4272,7 @@ func (x *FilerConf_PathConf) String() string {
func (*FilerConf_PathConf) ProtoMessage() {}
func (x *FilerConf_PathConf) ProtoReflect() protoreflect.Message {
mi := &file_filer_proto_msgTypes[69]
mi := &file_filer_proto_msgTypes[70]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -4623,7 +4631,7 @@ const file_filer_proto_rawDesc = "" +
"\x0eremote_time_ns\x18\x02 \x01(\x03R\fremoteTimeNs\x12 \n" +
"\fstop_time_ns\x18\x03 \x01(\x03R\n" +
"stopTimeNs\"\x1e\n" +
"\x1cGetFilerConfigurationRequest\"\xe8\x03\n" +
"\x1cGetFilerConfigurationRequest\"\xab\x05\n" +
"\x1dGetFilerConfigurationResponse\x12\x18\n" +
"\amasters\x18\x01 \x03(\tR\amasters\x12 \n" +
"\vreplication\x18\x02 \x01(\tR\vreplication\x12\x1e\n" +
@ -4644,7 +4652,11 @@ const file_filer_proto_rawDesc = "" +
"\vfiler_group\x18\r \x01(\tR\n" +
"filerGroup\x12#\n" +
"\rmajor_version\x18\x0e \x01(\x05R\fmajorVersion\x12#\n" +
"\rminor_version\x18\x0f \x01(\x05R\fminorVersion\"\xb7\x02\n" +
"\rminor_version\x18\x0f \x01(\x05R\fminorVersion\x12x\n" +
"\x17storage_class_disk_type\x18\x10 \x03(\v2A.filer_pb.GetFilerConfigurationResponse.StorageClassDiskTypeEntryR\x14storageClassDiskType\x1aG\n" +
"\x19StorageClassDiskTypeEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xb7\x02\n" +
"\x18SubscribeMetadataRequest\x12\x1f\n" +
"\vclient_name\x18\x01 \x01(\tR\n" +
"clientName\x12\x1f\n" +
@ -4811,7 +4823,7 @@ func file_filer_proto_rawDescGZIP() []byte {
}
var file_filer_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 70)
var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 71)
var file_filer_proto_goTypes = []any{
(SSEType)(0), // 0: filer_pb.SSEType
(*LookupDirectoryEntryRequest)(nil), // 1: filer_pb.LookupDirectoryEntryRequest
@ -4882,8 +4894,9 @@ var file_filer_proto_goTypes = []any{
(*TransferLocksResponse)(nil), // 66: filer_pb.TransferLocksResponse
nil, // 67: filer_pb.Entry.ExtendedEntry
nil, // 68: filer_pb.LookupVolumeResponse.LocationsMapEntry
(*LocateBrokerResponse_Resource)(nil), // 69: filer_pb.LocateBrokerResponse.Resource
(*FilerConf_PathConf)(nil), // 70: filer_pb.FilerConf.PathConf
nil, // 69: filer_pb.GetFilerConfigurationResponse.StorageClassDiskTypeEntry
(*LocateBrokerResponse_Resource)(nil), // 70: filer_pb.LocateBrokerResponse.Resource
(*FilerConf_PathConf)(nil), // 71: filer_pb.FilerConf.PathConf
}
var file_filer_proto_depIdxs = []int32{
6, // 0: filer_pb.LookupDirectoryEntryResponse.entry:type_name -> filer_pb.Entry
@ -4907,68 +4920,69 @@ var file_filer_proto_depIdxs = []int32{
29, // 18: filer_pb.Locations.locations:type_name -> filer_pb.Location
68, // 19: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry
31, // 20: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection
8, // 21: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification
6, // 22: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry
69, // 23: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource
70, // 24: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf
6, // 25: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry
64, // 26: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock
28, // 27: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations
1, // 28: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest
3, // 29: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest
13, // 30: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest
15, // 31: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest
17, // 32: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest
19, // 33: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest
21, // 34: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest
23, // 35: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest
25, // 36: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest
27, // 37: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest
32, // 38: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest
34, // 39: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest
36, // 40: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest
38, // 41: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest
40, // 42: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest
44, // 43: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest
42, // 44: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest
42, // 45: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest
51, // 46: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest
53, // 47: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest
56, // 48: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest
58, // 49: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest
60, // 50: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest
62, // 51: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest
65, // 52: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest
2, // 53: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse
4, // 54: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse
14, // 55: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse
16, // 56: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse
18, // 57: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse
20, // 58: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse
22, // 59: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse
24, // 60: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse
26, // 61: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse
30, // 62: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse
33, // 63: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse
35, // 64: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse
37, // 65: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse
39, // 66: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse
41, // 67: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse
45, // 68: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse
43, // 69: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse
43, // 70: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse
52, // 71: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse
54, // 72: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse
57, // 73: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse
59, // 74: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse
61, // 75: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse
63, // 76: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse
66, // 77: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse
53, // [53:78] is the sub-list for method output_type
28, // [28:53] is the sub-list for method input_type
28, // [28:28] is the sub-list for extension type_name
28, // [28:28] is the sub-list for extension extendee
0, // [0:28] is the sub-list for field type_name
69, // 21: filer_pb.GetFilerConfigurationResponse.storage_class_disk_type:type_name -> filer_pb.GetFilerConfigurationResponse.StorageClassDiskTypeEntry
8, // 22: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification
6, // 23: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry
70, // 24: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource
71, // 25: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf
6, // 26: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry
64, // 27: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock
28, // 28: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations
1, // 29: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest
3, // 30: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest
13, // 31: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest
15, // 32: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest
17, // 33: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest
19, // 34: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest
21, // 35: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest
23, // 36: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest
25, // 37: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest
27, // 38: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest
32, // 39: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest
34, // 40: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest
36, // 41: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest
38, // 42: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest
40, // 43: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest
44, // 44: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest
42, // 45: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest
42, // 46: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest
51, // 47: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest
53, // 48: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest
56, // 49: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest
58, // 50: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest
60, // 51: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest
62, // 52: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest
65, // 53: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest
2, // 54: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse
4, // 55: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse
14, // 56: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse
16, // 57: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse
18, // 58: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse
20, // 59: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse
22, // 60: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse
24, // 61: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse
26, // 62: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse
30, // 63: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse
33, // 64: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse
35, // 65: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse
37, // 66: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse
39, // 67: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse
41, // 68: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse
45, // 69: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse
43, // 70: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse
43, // 71: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse
52, // 72: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse
54, // 73: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse
57, // 74: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse
59, // 75: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse
61, // 76: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse
63, // 77: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse
66, // 78: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse
54, // [54:79] is the sub-list for method output_type
29, // [29:54] is the sub-list for method input_type
29, // [29:29] is the sub-list for extension type_name
29, // [29:29] is the sub-list for extension extendee
0, // [0:29] is the sub-list for field type_name
}
func init() { file_filer_proto_init() }
@ -4982,7 +4996,7 @@ func file_filer_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_filer_proto_rawDesc), len(file_filer_proto_rawDesc)),
NumEnums: 1,
NumMessages: 70,
NumMessages: 71,
NumExtensions: 0,
NumServices: 1,
},

10
weed/s3api/s3api_server.go

@ -56,7 +56,7 @@ type S3ApiServerOption struct {
Cipher bool // encrypt data on volume servers
BindIp string
GrpcPort int
StorageClassDiskTypeMap string // e.g. "STANDARD_IA=ssd,GLACIER=hdd"
StorageClassDiskTypeMap map[string]string
}
type S3ApiServer struct {
@ -170,13 +170,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
cipher: option.Cipher,
}
if option.StorageClassDiskTypeMap != "" {
parsedMappings, parseErr := parseStorageClassDiskTypeMap(option.StorageClassDiskTypeMap)
if parseErr != nil {
return nil, fmt.Errorf("invalid -s3.storageClassDiskTypeMap: %w", parseErr)
}
s3ApiServer.storageClassDiskTypes = parsedMappings
}
s3ApiServer.storageClassDiskTypes = loadStorageClassDiskTypeMap(option.StorageClassDiskTypeMap)
// Set s3a reference in circuit breaker for upload limiting
s3ApiServer.cb.s3a = s3ApiServer

48
weed/s3api/storage_class_routing.go

@ -1,7 +1,6 @@
package s3api
import (
"fmt"
"net/http"
"strings"
@ -11,41 +10,40 @@ import (
const defaultStorageClass = "STANDARD"
// storageClassDefaults maps every supported S3 storage class to its default
// disk type: STANDARD targets "ssd", all colder tiers target "hdd".
var storageClassDefaults = map[string]string{
	"STANDARD":            "ssd",
	"REDUCED_REDUNDANCY":  "hdd",
	"STANDARD_IA":         "hdd",
	"ONEZONE_IA":          "hdd",
	"INTELLIGENT_TIERING": "hdd",
	"GLACIER":             "hdd",
	"DEEP_ARCHIVE":        "hdd",
	"OUTPOSTS":            "hdd",
	"GLACIER_IR":          "hdd",
	"SNOW":                "hdd",
}

// normalizeStorageClass canonicalizes a storage class name (trim + upper-case)
// so lookups are insensitive to whitespace and letter case.
func normalizeStorageClass(storageClass string) string {
	return strings.ToUpper(strings.TrimSpace(storageClass))
}

// loadStorageClassDiskTypeMap builds the effective storage-class -> disk-type
// routing table by overlaying the filer-provided overrides on top of
// storageClassDefaults.
//
// Override keys are normalized via normalizeStorageClass before matching.
// An override whose value trims to "" removes that class from the table,
// restoring the unmapped behavior (DiskType="") for it. Classes absent from
// storageClassDefaults are ignored.
func loadStorageClassDiskTypeMap(overrides map[string]string) map[string]string {
	mappings := make(map[string]string, len(storageClassDefaults))
	normalizedOverrides := make(map[string]string, len(overrides))
	for k, v := range overrides {
		normalizedOverrides[normalizeStorageClass(k)] = v
	}
	for storageClass, defaultDiskType := range storageClassDefaults {
		diskType := defaultDiskType
		if v, ok := normalizedOverrides[storageClass]; ok {
			diskType = strings.TrimSpace(v)
		}
		if diskType == "" {
			// explicit blank override: leave this class unmapped
			continue
		}
		mappings[storageClass] = diskType
	}
	return mappings
}
func resolveEffectiveStorageClass(header http.Header, entryExtended map[string][]byte) (string, s3err.ErrorCode) {

30
weed/s3api/storage_class_routing_test.go

@ -8,31 +8,19 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
func TestParseStorageClassDiskTypeMap(t *testing.T) {
mappings, err := parseStorageClassDiskTypeMap("STANDARD_IA=ssd,GLACIER=hdd")
if err != nil {
t.Fatalf("parseStorageClassDiskTypeMap returned error: %v", err)
}
func TestLoadStorageClassDiskTypeMap(t *testing.T) {
mappings := loadStorageClassDiskTypeMap(map[string]string{
"STANDARD_IA": "nvme",
})
if got, want := mappings["STANDARD_IA"], "ssd"; got != want {
if got, want := mappings["STANDARD_IA"], "nvme"; got != want {
t.Fatalf("STANDARD_IA mapping mismatch: got %q want %q", got, want)
}
if got, want := mappings["GLACIER"], "hdd"; got != want {
t.Fatalf("GLACIER mapping mismatch: got %q want %q", got, want)
}
}
func TestParseStorageClassDiskTypeMapRejectsInvalidInput(t *testing.T) {
testCases := []string{
"INVALID=ssd",
"STANDARD_IA=",
"STANDARD_IA",
if got, want := mappings["STANDARD"], "ssd"; got != want {
t.Fatalf("STANDARD default mismatch: got %q want %q", got, want)
}
for _, tc := range testCases {
if _, err := parseStorageClassDiskTypeMap(tc); err == nil {
t.Fatalf("expected parse failure for %q", tc)
}
if got, want := mappings["GLACIER"], "hdd"; got != want {
t.Fatalf("GLACIER default mismatch: got %q want %q", got, want)
}
}

57
weed/server/filer_grpc_server_admin.go

@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@ -11,9 +12,37 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/version"
)
// storageClassDiskTypeDefaults is the filer-side default routing table:
// STANDARD goes to "ssd", every colder storage class goes to "hdd".
var storageClassDiskTypeDefaults = map[string]string{
	"STANDARD":            "ssd",
	"REDUCED_REDUNDANCY":  "hdd",
	"STANDARD_IA":         "hdd",
	"ONEZONE_IA":          "hdd",
	"INTELLIGENT_TIERING": "hdd",
	"GLACIER":             "hdd",
	"DEEP_ARCHIVE":        "hdd",
	"OUTPOSTS":            "hdd",
	"GLACIER_IR":          "hdd",
	"SNOW":                "hdd",
}

// loadStorageClassDiskTypeConfigFromViper reads the [s3.storage_class_disk_type]
// section from the filer configuration, seeding every known storage class with
// its built-in default first. A key whose configured value trims to empty is
// omitted from the returned map.
func loadStorageClassDiskTypeConfigFromViper(v util.Configuration) map[string]string {
	routing := make(map[string]string, len(storageClassDiskTypeDefaults))
	for class, fallback := range storageClassDiskTypeDefaults {
		confKey := "s3.storage_class_disk_type." + strings.ToLower(class)
		v.SetDefault(confKey, fallback)
		if disk := strings.TrimSpace(v.GetString(confKey)); disk != "" {
			routing[class] = disk
		}
	}
	return routing
}
func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
var output *master_pb.StatisticsResponse
@ -84,20 +113,22 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
v := util.GetViper()
t := &filer_pb.GetFilerConfigurationResponse{
Masters: fs.option.Masters.GetInstancesAsStrings(),
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
DirBuckets: fs.filer.DirBucketsPath,
Cipher: fs.filer.Cipher,
Signature: fs.filer.Signature,
MetricsAddress: fs.metricsAddress,
MetricsIntervalSec: int32(fs.metricsIntervalSec),
Version: version.Version(),
FilerGroup: fs.option.FilerGroup,
MajorVersion: version.MAJOR_VERSION,
MinorVersion: version.MINOR_VERSION,
Masters: fs.option.Masters.GetInstancesAsStrings(),
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
DirBuckets: fs.filer.DirBucketsPath,
Cipher: fs.filer.Cipher,
Signature: fs.filer.Signature,
MetricsAddress: fs.metricsAddress,
MetricsIntervalSec: int32(fs.metricsIntervalSec),
Version: version.Version(),
FilerGroup: fs.option.FilerGroup,
MajorVersion: version.MAJOR_VERSION,
MinorVersion: version.MINOR_VERSION,
StorageClassDiskType: loadStorageClassDiskTypeConfigFromViper(v),
}
glog.V(4).InfofCtx(ctx, "GetFilerConfiguration: %v", t)

Loading…
Cancel
Save