Browse Source

Merge 34922fd761 into 78a3441b30

pull/8305/merge
Chris Lu 11 hours ago
committed by GitHub
parent
commit
81d6c0f62f
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      weed/s3api/s3_constants/extend_key.go
  2. 18
      weed/s3api/s3api_bucket_handlers.go
  3. 124
      weed/s3api/s3api_bucket_lifecycle_ttl.go
  4. 68
      weed/s3api/s3api_bucket_lifecycle_ttl_test.go
  5. 5
      weed/s3api/s3api_object_handlers_put.go

3
weed/s3api/s3_constants/extend_key.go

@@ -21,7 +21,8 @@ const (
ExtMultipartObjectKey = "key"
// Bucket Policy
ExtBucketPolicyKey = "Seaweed-X-Amz-Bucket-Policy"
ExtBucketPolicyKey = "Seaweed-X-Amz-Bucket-Policy"
ExtBucketLifecycleTTLRulesKey = "Seaweed-X-Amz-Bucket-Lifecycle-TTL-Rules"
// Object Retention and Legal Hold
ExtObjectLockModeKey = "Seaweed-X-Amz-Object-Lock-Mode"

18
weed/s3api/s3api_bucket_handlers.go

@ -909,6 +909,7 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
collectionName := s3a.getCollectionName(bucket)
collectionTtls := fc.GetCollectionTtls(collectionName)
changed := false
lifecycleRules := make([]bucketLifecycleTTLRule, 0, len(lifeCycleConfig.Rules))
for _, rule := range lifeCycleConfig.Rules {
if rule.Status != Enabled {
@ -928,6 +929,12 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
if rule.Expiration.Days == 0 {
continue
}
ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds())
lifecycleRules = append(lifecycleRules, bucketLifecycleTTLRule{
Prefix: rulePrefix,
TtlSec: ttlSec,
})
locationPrefix := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, rulePrefix)
locConf := &filer_pb.FilerConf_PathConf{
LocationPrefix: locationPrefix,
@ -946,7 +953,6 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds())
glog.V(2).Infof("Start updating TTL for %s", locationPrefix)
if updErr := s3a.updateEntriesTTL(locationPrefix, ttlSec); updErr != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler update TTL for %s: %s", locationPrefix, updErr)
@ -970,6 +976,11 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
return
}
}
if err := s3a.persistBucketLifecycleTTLRules(bucket, lifecycleRules); err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler persist lifecycle metadata for %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
writeSuccessResponseEmpty(w, r)
}
@ -1020,6 +1031,11 @@ func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *h
return
}
}
if err := s3a.persistBucketLifecycleTTLRules(bucket, nil); err != nil {
glog.Errorf("DeleteBucketLifecycleHandler clear lifecycle metadata for %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}

124
weed/s3api/s3api_bucket_lifecycle_ttl.go

@@ -0,0 +1,124 @@
package s3api
import (
"encoding/json"
"fmt"
"sort"
"strings"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// bucketLifecycleTTLRule is the persisted form of one lifecycle expiration
// rule: objects whose key starts with Prefix expire after TtlSec seconds.
// Rules are stored as JSON in the bucket entry's Extended metadata under
// s3_constants.ExtBucketLifecycleTTLRulesKey.
type bucketLifecycleTTLRule struct {
	Prefix string `json:"prefix"`
	TtlSec int32  `json:"ttlSec"`
}

// normalizeLifecycleRulePrefix canonicalizes a rule or object-key prefix:
// surrounding whitespace and a single leading "/" are stripped, and the
// degenerate prefixes "." and "/" (e.g. the remainder of "//") collapse to
// the match-everything empty prefix.
func normalizeLifecycleRulePrefix(prefix string) string {
	prefix = strings.TrimSpace(prefix)
	prefix = strings.TrimPrefix(prefix, "/")
	if prefix == "." || prefix == "/" {
		return ""
	}
	return prefix
}

// encodeBucketLifecycleTTLRules serializes rules for storage in the bucket's
// Extended metadata. Rules with a non-positive TTL are dropped, prefixes are
// normalized, and the remainder is sorted by prefix so the persisted bytes
// are deterministic. Returns (nil, nil) when nothing remains to persist,
// which callers treat as "delete the metadata key".
func encodeBucketLifecycleTTLRules(rules []bucketLifecycleTTLRule) ([]byte, error) {
	if len(rules) == 0 {
		return nil, nil
	}
	persisted := make([]bucketLifecycleTTLRule, 0, len(rules))
	for _, rule := range rules {
		if rule.TtlSec <= 0 {
			// A zero/negative TTL carries no expiration; don't persist it.
			continue
		}
		persisted = append(persisted, bucketLifecycleTTLRule{
			Prefix: normalizeLifecycleRulePrefix(rule.Prefix),
			TtlSec: rule.TtlSec,
		})
	}
	if len(persisted) == 0 {
		return nil, nil
	}
	// SliceStable (not Slice) so that rules normalizing to the same prefix
	// keep their input order; plain sort.Slice is unstable and would make
	// the serialized bytes nondeterministic for duplicate prefixes.
	sort.SliceStable(persisted, func(i, j int) bool {
		return persisted[i].Prefix < persisted[j].Prefix
	})
	return json.Marshal(persisted)
}
// decodeBucketLifecycleTTLRules parses the lifecycle TTL rules serialized in
// a bucket entry's Extended metadata. It returns nil when the key is absent,
// the payload is empty, or the JSON is malformed (logged as a warning).
func decodeBucketLifecycleTTLRules(extended map[string][]byte) []bucketLifecycleTTLRule {
	// Indexing a nil map is safe in Go, so a single lookup covers the
	// "no metadata at all" and "key missing/empty" cases together.
	payload := extended[s3_constants.ExtBucketLifecycleTTLRulesKey]
	if len(payload) == 0 {
		return nil
	}
	var decoded []bucketLifecycleTTLRule
	if err := json.Unmarshal(payload, &decoded); err != nil {
		glog.Warningf("decode bucket lifecycle ttl rules: %v", err)
		return nil
	}
	return decoded
}
// matchBucketLifecycleTTLSeconds returns the TTL (seconds) of the most
// specific rule whose prefix matches objectKey, or 0 when no rule applies.
// Both the key and each rule prefix are normalized first; an empty prefix
// matches every key. When two matching prefixes have equal length, the rule
// appearing later in the slice wins.
func matchBucketLifecycleTTLSeconds(rules []bucketLifecycleTTLRule, objectKey string) int32 {
	if len(rules) == 0 {
		return 0
	}
	key := normalizeLifecycleRulePrefix(objectKey)
	bestLen := 0
	var bestTTL int32
	for _, rule := range rules {
		candidate := normalizeLifecycleRulePrefix(rule.Prefix)
		if candidate != "" && !strings.HasPrefix(key, candidate) {
			continue
		}
		if len(candidate) < bestLen {
			// A shorter (less specific) match never displaces the current best.
			continue
		}
		bestLen = len(candidate)
		bestTTL = rule.TtlSec
	}
	return bestTTL
}
// resolveBucketLifecycleTTLSeconds returns the lifecycle TTL that applies to
// filePath within bucket, or 0 when the bucket config is unavailable, when
// filePath does not live under the bucket's directory, or when no persisted
// rule matches the object key.
func (s3a *S3ApiServer) resolveBucketLifecycleTTLSeconds(bucket, filePath string) int32 {
	cfg, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone || cfg == nil || cfg.Entry == nil {
		return 0
	}
	dirPrefix := s3a.bucketDir(bucket) + "/"
	if !strings.HasPrefix(filePath, dirPrefix) {
		// Outside this bucket's directory; no lifecycle rule can apply.
		return 0
	}
	objectKey := filePath[len(dirPrefix):]
	rules := decodeBucketLifecycleTTLRules(cfg.Entry.Extended)
	return matchBucketLifecycleTTLSeconds(rules, objectKey)
}
// persistBucketLifecycleTTLRules stores (or, when rules encode to nothing,
// removes) the serialized lifecycle TTL rules in the bucket's Extended
// metadata via updateBucketConfig. Passing nil/empty rules clears the key.
func (s3a *S3ApiServer) persistBucketLifecycleTTLRules(bucket string, rules []bucketLifecycleTTLRule) error {
	payload, err := encodeBucketLifecycleTTLRules(rules)
	if err != nil {
		return fmt.Errorf("encode lifecycle ttl rules: %w", err)
	}
	update := func(config *BucketConfig) error {
		if config.Entry == nil {
			return fmt.Errorf("bucket %s has no entry", bucket)
		}
		if config.Entry.Extended == nil {
			config.Entry.Extended = make(map[string][]byte)
		}
		// An empty payload means "no active rules": remove the key rather
		// than persisting an empty value.
		if len(payload) == 0 {
			delete(config.Entry.Extended, s3_constants.ExtBucketLifecycleTTLRulesKey)
			return nil
		}
		config.Entry.Extended[s3_constants.ExtBucketLifecycleTTLRulesKey] = payload
		return nil
	}
	if errCode := s3a.updateBucketConfig(bucket, update); errCode != s3err.ErrNone {
		return fmt.Errorf("persist lifecycle ttl rules: %v", errCode)
	}
	return nil
}

68
weed/s3api/s3api_bucket_lifecycle_ttl_test.go

@@ -0,0 +1,68 @@
package s3api
import (
"encoding/json"
"testing"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
// TestMatchBucketLifecycleTTLSeconds verifies longest-prefix rule selection,
// including fallback to the catch-all empty prefix.
func TestMatchBucketLifecycleTTLSeconds(t *testing.T) {
	rules := []bucketLifecycleTTLRule{
		{Prefix: "", TtlSec: 3600},
		{Prefix: "logs/", TtlSec: 7200},
		{Prefix: "logs/archive/", TtlSec: 10800},
	}
	cases := []struct {
		key  string
		want int32
	}{
		{"logs/file.txt", 7200},
		{"logs/archive/file.txt", 10800},
		{"other/file.txt", 3600},
	}
	for _, tc := range cases {
		if got := matchBucketLifecycleTTLSeconds(rules, tc.key); got != tc.want {
			t.Fatalf("expected %d for %s, got %d", tc.want, tc.key, got)
		}
	}
}
// TestDecodeBucketLifecycleTTLRules checks a valid payload round-trips and
// that malformed JSON yields nil instead of an error or panic.
func TestDecodeBucketLifecycleTTLRules(t *testing.T) {
	ext := map[string][]byte{
		s3_constants.ExtBucketLifecycleTTLRulesKey: []byte(`[{"prefix":"logs/","ttlSec":86400}]`),
	}
	got := decodeBucketLifecycleTTLRules(ext)
	if len(got) != 1 {
		t.Fatalf("expected 1 rule, got %d", len(got))
	}
	if got[0].Prefix != "logs/" || got[0].TtlSec != 86400 {
		t.Fatalf("unexpected rule: %+v", got[0])
	}
	// Corrupt the payload: the decoder must fail soft and return nil.
	ext[s3_constants.ExtBucketLifecycleTTLRulesKey] = []byte(`{invalid json`)
	if bad := decodeBucketLifecycleTTLRules(ext); bad != nil {
		t.Fatalf("expected nil rules for invalid JSON, got %+v", bad)
	}
}
// TestEncodeBucketLifecycleTTLRules checks that encoding normalizes prefixes
// (leading "/" stripped), drops zero-TTL rules, and sorts the survivors by
// prefix before serialization.
func TestEncodeBucketLifecycleTTLRules(t *testing.T) {
	serialized, err := encodeBucketLifecycleTTLRules([]bucketLifecycleTTLRule{
		{Prefix: "/logs/", TtlSec: 86400}, // leading slash should be normalized away
		{Prefix: "tmp/", TtlSec: 0},       // zero TTL: must not be persisted
		{Prefix: "", TtlSec: 3600},        // catch-all rule, sorts first
	})
	if err != nil {
		t.Fatalf("encode failed: %v", err)
	}
	var decoded []bucketLifecycleTTLRule
	if err := json.Unmarshal(serialized, &decoded); err != nil {
		t.Fatalf("unmarshal failed: %v", err)
	}
	// The zero-TTL "tmp/" rule is filtered out, leaving two rules.
	if len(decoded) != 2 {
		t.Fatalf("expected 2 persisted rules, got %d", len(decoded))
	}
	// Sorted by prefix: "" (catch-all) before "logs/".
	if decoded[0].Prefix != "" || decoded[0].TtlSec != 3600 {
		t.Fatalf("unexpected first rule: %+v", decoded[0])
	}
	if decoded[1].Prefix != "logs/" || decoded[1].TtlSec != 86400 {
		t.Fatalf("unexpected second rule: %+v", decoded[1])
	}
}

5
weed/s3api/s3api_object_handlers_put.go

@ -360,6 +360,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
if s3a.option.FilerGroup != "" {
collection = s3a.getCollectionName(bucket)
}
ttlSeconds := s3a.resolveBucketLifecycleTTLSeconds(bucket, filePath)
// Create assign function for chunked upload
assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
@ -372,6 +373,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
DiskType: "",
DataCenter: s3a.option.DataCenter,
Path: filePath,
TtlSec: ttlSeconds,
})
if err != nil {
return fmt.Errorf("assign volume: %w", err)
@ -549,6 +551,9 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
Chunks: chunkResult.FileChunks, // All chunks from auto-chunking
Extended: make(map[string][]byte),
}
if ttlSeconds > 0 {
entry.Attributes.TtlSec = ttlSeconds
}
// Always set Md5 attribute for regular object uploads (PutObject)
// This ensures the ETag is a pure MD5 hash, which AWS S3 SDKs expect

Loading…
Cancel
Save