
s3api: persist lifecycle TTL rules and enforce on assign

fix-8303-s3-lifecycle-ttl-assign
Chris Lu · 4 weeks ago
commit 34922fd761
  1. weed/s3api/s3_constants/extend_key.go (5)
  2. weed/s3api/s3api_bucket_handlers.go (18)
  3. weed/s3api/s3api_bucket_lifecycle_ttl.go (124)
  4. weed/s3api/s3api_bucket_lifecycle_ttl_test.go (68)
  5. weed/s3api/s3api_object_handlers_put.go (5)

weed/s3api/s3_constants/extend_key.go (5)

@@ -11,7 +11,7 @@ const (
	ExtETagKey = "Seaweed-X-Amz-ETag"
	ExtLatestVersionIdKey = "Seaweed-X-Amz-Latest-Version-Id"
	ExtLatestVersionFileNameKey = "Seaweed-X-Amz-Latest-Version-File-Name"
-	ExtAllowEmptyFolders = "Seaweed-X-Amz-Allow-Empty-Folders"
+	ExtAllowEmptyFolders = "Seaweed-X-Amz-Allow-Empty-Folders"
	// Cached list metadata in .versions directory for single-scan efficiency
	ExtLatestVersionSizeKey = "Seaweed-X-Amz-Latest-Version-Size"
	ExtLatestVersionETagKey = "Seaweed-X-Amz-Latest-Version-ETag"
@@ -21,7 +21,8 @@ const (
	ExtMultipartObjectKey = "key"
	// Bucket Policy
-	ExtBucketPolicyKey = "Seaweed-X-Amz-Bucket-Policy"
+	ExtBucketPolicyKey = "Seaweed-X-Amz-Bucket-Policy"
+	ExtBucketLifecycleTTLRulesKey = "Seaweed-X-Amz-Bucket-Lifecycle-TTL-Rules"
	// Object Retention and Legal Hold
	ExtObjectLockModeKey = "Seaweed-X-Amz-Object-Lock-Mode"
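For orientation, here is a minimal standalone sketch of what this commit stores under the new ExtBucketLifecycleTTLRulesKey. The rule values are illustrative; the struct simply mirrors bucketLifecycleTTLRule from the new file further down.

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the bucketLifecycleTTLRule struct added by this commit.
type bucketLifecycleTTLRule struct {
	Prefix string `json:"prefix"`
	TtlSec int32  `json:"ttlSec"`
}

func main() {
	// Illustrative rules: a bucket-wide default plus a longer TTL for "logs/".
	rules := []bucketLifecycleTTLRule{
		{Prefix: "", TtlSec: 3600},
		{Prefix: "logs/", TtlSec: 604800},
	}
	serialized, _ := json.Marshal(rules)
	// persistBucketLifecycleTTLRules stores JSON like this under the
	// "Seaweed-X-Amz-Bucket-Lifecycle-TTL-Rules" key in the bucket entry's Extended map.
	fmt.Println(string(serialized))
	// [{"prefix":"","ttlSec":3600},{"prefix":"logs/","ttlSec":604800}]
}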

weed/s3api/s3api_bucket_handlers.go (18)

@@ -860,6 +860,7 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
	collectionName := s3a.getCollectionName(bucket)
	collectionTtls := fc.GetCollectionTtls(collectionName)
	changed := false
+	lifecycleRules := make([]bucketLifecycleTTLRule, 0, len(lifeCycleConfig.Rules))
	for _, rule := range lifeCycleConfig.Rules {
		if rule.Status != Enabled {
@@ -879,6 +880,12 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
		if rule.Expiration.Days == 0 {
			continue
		}
+		ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds())
+		lifecycleRules = append(lifecycleRules, bucketLifecycleTTLRule{
+			Prefix: rulePrefix,
+			TtlSec: ttlSec,
+		})
		locationPrefix := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, rulePrefix)
		locConf := &filer_pb.FilerConf_PathConf{
			LocationPrefix: locationPrefix,
@@ -893,7 +900,6 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return
		}
-		ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds())
		glog.V(2).Infof("Start updating TTL for %s", locationPrefix)
		if updErr := s3a.updateEntriesTTL(locationPrefix, ttlSec); updErr != nil {
			glog.Errorf("PutBucketLifecycleConfigurationHandler update TTL for %s: %s", locationPrefix, updErr)
@@ -917,6 +923,11 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
			return
		}
	}
+	if err := s3a.persistBucketLifecycleTTLRules(bucket, lifecycleRules); err != nil {
+		glog.Errorf("PutBucketLifecycleConfigurationHandler persist lifecycle metadata for %s: %v", bucket, err)
+		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+		return
+	}
	writeSuccessResponseEmpty(w, r)
}
@@ -967,6 +978,11 @@ func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *h
			return
		}
	}
+	if err := s3a.persistBucketLifecycleTTLRules(bucket, nil); err != nil {
+		glog.Errorf("DeleteBucketLifecycleHandler clear lifecycle metadata for %s: %v", bucket, err)
+		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+		return
+	}
	s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}
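As a quick sanity check of the ttlSec expression that now runs before each rule is recorded, here is a hedged standalone sketch; it assumes util.LifeCycleInterval is 24 hours, which may differ in your build.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumption: util.LifeCycleInterval == 24 * time.Hour.
	const lifeCycleInterval = 24 * time.Hour
	days := 7 // rule.Expiration.Days from the lifecycle configuration
	ttlSec := int32((time.Duration(days) * lifeCycleInterval).Seconds())
	fmt.Println(ttlSec) // 604800, i.e. 7 days expressed in seconds
}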

weed/s3api/s3api_bucket_lifecycle_ttl.go (new file, 124 lines)

@@ -0,0 +1,124 @@
package s3api

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)

type bucketLifecycleTTLRule struct {
	Prefix string `json:"prefix"`
	TtlSec int32  `json:"ttlSec"`
}

func normalizeLifecycleRulePrefix(prefix string) string {
	prefix = strings.TrimSpace(prefix)
	prefix = strings.TrimPrefix(prefix, "/")
	if prefix == "." || prefix == "/" {
		return ""
	}
	return prefix
}

func encodeBucketLifecycleTTLRules(rules []bucketLifecycleTTLRule) ([]byte, error) {
	if len(rules) == 0 {
		return nil, nil
	}
	sorted := make([]bucketLifecycleTTLRule, 0, len(rules))
	for _, rule := range rules {
		if rule.TtlSec <= 0 {
			continue
		}
		sorted = append(sorted, bucketLifecycleTTLRule{
			Prefix: normalizeLifecycleRulePrefix(rule.Prefix),
			TtlSec: rule.TtlSec,
		})
	}
	if len(sorted) == 0 {
		return nil, nil
	}
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].Prefix < sorted[j].Prefix
	})
	return json.Marshal(sorted)
}

func decodeBucketLifecycleTTLRules(extended map[string][]byte) []bucketLifecycleTTLRule {
	if len(extended) == 0 {
		return nil
	}
	serialized := extended[s3_constants.ExtBucketLifecycleTTLRulesKey]
	if len(serialized) == 0 {
		return nil
	}
	var rules []bucketLifecycleTTLRule
	if err := json.Unmarshal(serialized, &rules); err != nil {
		glog.Warningf("decode bucket lifecycle ttl rules: %v", err)
		return nil
	}
	return rules
}

func matchBucketLifecycleTTLSeconds(rules []bucketLifecycleTTLRule, objectKey string) int32 {
	if len(rules) == 0 {
		return 0
	}
	objectKey = normalizeLifecycleRulePrefix(objectKey)
	var bestPrefix string
	var ttlSeconds int32
	for _, rule := range rules {
		prefix := normalizeLifecycleRulePrefix(rule.Prefix)
		if prefix != "" && !strings.HasPrefix(objectKey, prefix) {
			continue
		}
		if len(prefix) >= len(bestPrefix) {
			bestPrefix = prefix
			ttlSeconds = rule.TtlSec
		}
	}
	return ttlSeconds
}

func (s3a *S3ApiServer) resolveBucketLifecycleTTLSeconds(bucket, filePath string) int32 {
	config, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone || config == nil || config.Entry == nil {
		return 0
	}
	bucketPrefix := s3a.bucketDir(bucket) + "/"
	objectKey := strings.TrimPrefix(filePath, bucketPrefix)
	if objectKey == filePath {
		return 0
	}
	rules := decodeBucketLifecycleTTLRules(config.Entry.Extended)
	return matchBucketLifecycleTTLSeconds(rules, objectKey)
}

func (s3a *S3ApiServer) persistBucketLifecycleTTLRules(bucket string, rules []bucketLifecycleTTLRule) error {
	serialized, err := encodeBucketLifecycleTTLRules(rules)
	if err != nil {
		return fmt.Errorf("encode lifecycle ttl rules: %w", err)
	}
	errCode := s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
		if config.Entry == nil {
			return fmt.Errorf("bucket %s has no entry", bucket)
		}
		if config.Entry.Extended == nil {
			config.Entry.Extended = make(map[string][]byte)
		}
		if len(serialized) == 0 {
			delete(config.Entry.Extended, s3_constants.ExtBucketLifecycleTTLRulesKey)
		} else {
			config.Entry.Extended[s3_constants.ExtBucketLifecycleTTLRulesKey] = serialized
		}
		return nil
	})
	if errCode != s3err.ErrNone {
		return fmt.Errorf("persist lifecycle ttl rules: %v", errCode)
	}
	return nil
}
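A small hedged sketch of how resolveBucketLifecycleTTLSeconds derives the object key before prefix matching; the paths are illustrative and assume bucketDir resolves to <BucketsPath>/<bucket>.

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative values; in the handler, bucketPrefix comes from s3a.bucketDir(bucket) + "/".
	filePath := "/buckets/my-bucket/logs/2024/app.log"
	bucketPrefix := "/buckets/my-bucket/"

	objectKey := strings.TrimPrefix(filePath, bucketPrefix)
	if objectKey == filePath {
		// filePath is outside the bucket directory, so no lifecycle TTL applies.
		fmt.Println("no lifecycle TTL")
		return
	}
	// "logs/2024/app.log" is then matched against the persisted rule prefixes,
	// with the longest matching prefix winning.
	fmt.Println(objectKey)
}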

weed/s3api/s3api_bucket_lifecycle_ttl_test.go (new file, 68 lines)

@@ -0,0 +1,68 @@
package s3api

import (
	"encoding/json"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)

func TestMatchBucketLifecycleTTLSeconds(t *testing.T) {
	rules := []bucketLifecycleTTLRule{
		{Prefix: "", TtlSec: 3600},
		{Prefix: "logs/", TtlSec: 7200},
		{Prefix: "logs/archive/", TtlSec: 10800},
	}
	if got := matchBucketLifecycleTTLSeconds(rules, "logs/file.txt"); got != 7200 {
		t.Fatalf("expected 7200 for logs/file.txt, got %d", got)
	}
	if got := matchBucketLifecycleTTLSeconds(rules, "logs/archive/file.txt"); got != 10800 {
		t.Fatalf("expected 10800 for logs/archive/file.txt, got %d", got)
	}
	if got := matchBucketLifecycleTTLSeconds(rules, "other/file.txt"); got != 3600 {
		t.Fatalf("expected 3600 for other/file.txt, got %d", got)
	}
}

func TestDecodeBucketLifecycleTTLRules(t *testing.T) {
	extended := map[string][]byte{
		s3_constants.ExtBucketLifecycleTTLRulesKey: []byte(`[{"prefix":"logs/","ttlSec":86400}]`),
	}
	rules := decodeBucketLifecycleTTLRules(extended)
	if len(rules) != 1 {
		t.Fatalf("expected 1 rule, got %d", len(rules))
	}
	if rules[0].Prefix != "logs/" || rules[0].TtlSec != 86400 {
		t.Fatalf("unexpected rule: %+v", rules[0])
	}
	extended[s3_constants.ExtBucketLifecycleTTLRulesKey] = []byte(`{invalid json`)
	if got := decodeBucketLifecycleTTLRules(extended); got != nil {
		t.Fatalf("expected nil rules for invalid JSON, got %+v", got)
	}
}

func TestEncodeBucketLifecycleTTLRules(t *testing.T) {
	serialized, err := encodeBucketLifecycleTTLRules([]bucketLifecycleTTLRule{
		{Prefix: "/logs/", TtlSec: 86400},
		{Prefix: "tmp/", TtlSec: 0},
		{Prefix: "", TtlSec: 3600},
	})
	if err != nil {
		t.Fatalf("encode failed: %v", err)
	}
	var decoded []bucketLifecycleTTLRule
	if err := json.Unmarshal(serialized, &decoded); err != nil {
		t.Fatalf("unmarshal failed: %v", err)
	}
	if len(decoded) != 2 {
		t.Fatalf("expected 2 persisted rules, got %d", len(decoded))
	}
	if decoded[0].Prefix != "" || decoded[0].TtlSec != 3600 {
		t.Fatalf("unexpected first rule: %+v", decoded[0])
	}
	if decoded[1].Prefix != "logs/" || decoded[1].TtlSec != 86400 {
		t.Fatalf("unexpected second rule: %+v", decoded[1])
	}
}

weed/s3api/s3api_object_handlers_put.go (5)

@@ -358,6 +358,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
	if s3a.option.FilerGroup != "" {
		collection = s3a.getCollectionName(bucket)
	}
+	ttlSeconds := s3a.resolveBucketLifecycleTTLSeconds(bucket, filePath)

	// Create assign function for chunked upload
	assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
@@ -370,6 +371,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
			DiskType: "",
			DataCenter: s3a.option.DataCenter,
			Path: filePath,
+			TtlSec: ttlSeconds,
		})
		if err != nil {
			return fmt.Errorf("assign volume: %w", err)
@@ -534,6 +536,9 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
		Chunks: chunkResult.FileChunks, // All chunks from auto-chunking
		Extended: make(map[string][]byte),
	}
+	if ttlSeconds > 0 {
+		entry.Attributes.TtlSec = ttlSeconds
+	}

	// Always set Md5 attribute for regular object uploads (PutObject)
	// This ensures the ETag is a pure MD5 hash, which AWS S3 SDKs expect
