From 498ac8903fe58cec8573bd94725ec4b463803095 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Nov 2025 15:27:20 -0800 Subject: [PATCH] S3: prevent deleting buckets with object locking (#7434) * prevent deleting buckets with object locking * addressing comments * Update s3api_bucket_handlers.go * address comments * early return * refactor * simplify * constant * go fmt --- .../s3_bucket_delete_with_lock_test.go | 239 ++++++++++++++++++ weed/command/benchmark.go | 2 +- weed/command/download.go | 6 +- weed/s3api/auth_credentials_subscribe.go | 2 +- weed/s3api/auto_signature_v4_test.go | 2 +- weed/s3api/filer_multipart.go | 2 +- weed/s3api/s3_constants/s3_actions.go | 1 + weed/s3api/s3api_bucket_config.go | 4 +- weed/s3api/s3api_bucket_handlers.go | 181 ++++++++++++- weed/s3api/s3api_object_handlers_acl.go | 4 +- weed/s3api/s3api_object_handlers_list.go | 4 +- weed/s3api/s3api_object_handlers_put.go | 10 +- weed/s3api/s3api_object_versioning.go | 18 +- weed/util/net_timeout.go | 10 +- 14 files changed, 452 insertions(+), 33 deletions(-) create mode 100644 test/s3/retention/s3_bucket_delete_with_lock_test.go diff --git a/test/s3/retention/s3_bucket_delete_with_lock_test.go b/test/s3/retention/s3_bucket_delete_with_lock_test.go new file mode 100644 index 000000000..3a91f0369 --- /dev/null +++ b/test/s3/retention/s3_bucket_delete_with_lock_test.go @@ -0,0 +1,239 @@ +package retention + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestBucketDeletionWithObjectLock tests that buckets with object lock enabled +// cannot be deleted if they contain objects with active retention or legal hold +func TestBucketDeletionWithObjectLock(t *testing.T) { + client := getS3Client(t) + bucketName := getNewBucketName() + + // Create bucket with object lock enabled + createBucketWithObjectLock(t, client, bucketName) + + // Table-driven test for retention modes + retentionTestCases := []struct { + name string + lockMode types.ObjectLockMode + }{ + {name: "ComplianceRetention", lockMode: types.ObjectLockModeCompliance}, + {name: "GovernanceRetention", lockMode: types.ObjectLockModeGovernance}, + } + + for _, tc := range retentionTestCases { + t.Run(fmt.Sprintf("CannotDeleteBucketWith%s", tc.name), func(t *testing.T) { + key := fmt.Sprintf("test-%s", strings.ToLower(strings.ReplaceAll(tc.name, "Retention", "-retention"))) + content := fmt.Sprintf("test content for %s", strings.ToLower(tc.name)) + retainUntilDate := time.Now().Add(10 * time.Second) // 10 seconds in future + + // Upload object with retention + _, err := client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + Body: strings.NewReader(content), + ObjectLockMode: tc.lockMode, + ObjectLockRetainUntilDate: aws.Time(retainUntilDate), + }) + require.NoError(t, err, "PutObject with %s should succeed", tc.name) + + // Try to delete bucket - should fail because object has active retention + _, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + require.Error(t, err, "DeleteBucket should fail when objects have active retention") + assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty") + t.Logf("Expected error: %v", err) + + // Wait for retention to expire with dynamic sleep based on actual retention time + t.Logf("Waiting for %s to expire...", tc.name) + time.Sleep(time.Until(retainUntilDate) + time.Second) + + // Delete the object + _, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + }) + require.NoError(t, err, "DeleteObject should succeed after retention expires") + + // Clean up versions + deleteAllObjectVersions(t, client, bucketName) + }) + } + + // Test 3: Bucket deletion with legal hold should fail + t.Run("CannotDeleteBucketWithLegalHold", func(t *testing.T) { + key := "test-legal-hold" + content := "test content for legal hold" + + // Upload object first + _, err := client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + Body: strings.NewReader(content), + }) + require.NoError(t, err, "PutObject should succeed") + + // Set legal hold on the object + _, err = client.PutObjectLegalHold(context.Background(), &s3.PutObjectLegalHoldInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOn}, + }) + require.NoError(t, err, "PutObjectLegalHold should succeed") + + // Try to delete bucket - should fail because object has active legal hold + _, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + require.Error(t, err, "DeleteBucket should fail when objects have active legal hold") + assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty") + t.Logf("Expected error: %v", err) + + // Remove legal hold + _, err = client.PutObjectLegalHold(context.Background(), &s3.PutObjectLegalHoldInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOff}, + }) + require.NoError(t, err, "Removing legal hold should succeed") + + // Delete the object + _, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + }) + require.NoError(t, err, "DeleteObject should succeed after legal hold is removed") + + // Clean up versions + deleteAllObjectVersions(t, client, bucketName) + }) + + // Test 4: Bucket deletion should succeed when no objects have active locks + t.Run("CanDeleteBucketWithoutActiveLocks", func(t *testing.T) { + // Make sure all objects are deleted + deleteAllObjectVersions(t, client, bucketName) + + // Use retry mechanism for eventual consistency instead of fixed sleep + require.Eventually(t, func() bool { + _, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Logf("Retrying DeleteBucket due to: %v", err) + return false + } + return true + }, 5*time.Second, 500*time.Millisecond, "DeleteBucket should succeed when no objects have active locks") + + t.Logf("Successfully deleted bucket without active locks") + }) +} + +// TestBucketDeletionWithVersionedLocks tests deletion with versioned objects under lock +func TestBucketDeletionWithVersionedLocks(t *testing.T) { + client := getS3Client(t) + bucketName := getNewBucketName() + + // Create bucket with object lock enabled + createBucketWithObjectLock(t, client, bucketName) + defer deleteBucket(t, client, bucketName) // Best effort cleanup + + key := "test-versioned-locks" + content1 := "version 1 content" + content2 := "version 2 content" + retainUntilDate := time.Now().Add(10 * time.Second) + + // Upload first version with retention + putResp1, err := client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + Body: strings.NewReader(content1), + ObjectLockMode: types.ObjectLockModeGovernance, + ObjectLockRetainUntilDate: aws.Time(retainUntilDate), + }) + require.NoError(t, err) + version1 := *putResp1.VersionId + + // Upload second version with retention + putResp2, err := client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(key), + Body: strings.NewReader(content2), + ObjectLockMode: types.ObjectLockModeGovernance, + ObjectLockRetainUntilDate: aws.Time(retainUntilDate), + }) + require.NoError(t, err) + version2 := *putResp2.VersionId + + t.Logf("Created two versions: %s, %s", version1, version2) + + // Try to delete bucket - should fail because versions have active retention + _, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + require.Error(t, err, "DeleteBucket should fail when object versions have active retention") + assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty") + t.Logf("Expected error: %v", err) + + // Wait for retention to expire with dynamic sleep based on actual retention time + t.Logf("Waiting for retention to expire on all versions...") + time.Sleep(time.Until(retainUntilDate) + time.Second) + + // Clean up all versions + deleteAllObjectVersions(t, client, bucketName) + + // Wait for eventual consistency and attempt to delete the bucket with retry + require.Eventually(t, func() bool { + _, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + t.Logf("Retrying DeleteBucket due to: %v", err) + return false + } + return true + }, 5*time.Second, 500*time.Millisecond, "DeleteBucket should succeed after all locks expire") + + t.Logf("Successfully deleted bucket after locks expired") +} + +// TestBucketDeletionWithoutObjectLock tests that buckets without object lock can be deleted normally +func TestBucketDeletionWithoutObjectLock(t *testing.T) { + client := getS3Client(t) + bucketName := getNewBucketName() + + // Create regular bucket without object lock + createBucket(t, client, bucketName) + + // Upload some objects + for i := 0; i < 3; i++ { + _, err := client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(fmt.Sprintf("test-object-%d", i)), + Body: strings.NewReader("test content"), + }) + require.NoError(t, err) + } + + // Delete all objects + deleteAllObjectVersions(t, client, bucketName) + + // Delete bucket should succeed + _, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err, "DeleteBucket should succeed for regular bucket") + t.Logf("Successfully deleted regular bucket without object lock") +} diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 660e31921..c9e6f6766 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -141,7 +141,7 @@ func runBenchmark(cmd *Command, args []string) bool { fmt.Fprintln(os.Stderr, "Error: -readOnly and -writeOnly are mutually exclusive.") return false } - + doWrite := true doRead := true if *b.readOnly { diff --git a/weed/command/download.go b/weed/command/download.go index 95238b99f..e44335097 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -23,9 +23,9 @@ var ( ) type DownloadOptions struct { - master *string - server *string // deprecated, for backward compatibility - dir *string + master *string + server *string // deprecated, for backward compatibility + dir *string } func init() { diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index 829a3d61c..09150f7c8 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -109,7 +109,7 @@ func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry) bucket := entry.Name - glog.V(3).Infof("updateBucketConfigCacheFromEntry: called for bucket %s, ExtObjectLockEnabledKey=%s", + glog.V(3).Infof("updateBucketConfigCacheFromEntry: called for bucket %s, ExtObjectLockEnabledKey=%s", bucket, string(entry.Extended[s3_constants.ExtObjectLockEnabledKey])) // Create new bucket config from the entry diff --git a/weed/s3api/auto_signature_v4_test.go b/weed/s3api/auto_signature_v4_test.go index 47c55e077..b23756f33 100644 --- a/weed/s3api/auto_signature_v4_test.go +++ b/weed/s3api/auto_signature_v4_test.go @@ -491,7 +491,7 @@ func TestSignatureV4WithoutProxy(t *testing.T) { // Set forwarded headers r.Header.Set("Host", tt.host) - + // First, verify that extractHostHeader returns the expected value extractedHost := extractHostHeader(r) if extractedHost != tt.expectedHost { diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index d181d51da..cb4c73692 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -313,7 +313,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // For versioned buckets, create a version and return the version ID versionId := generateVersionId() versionFileName := s3a.getVersionFileName(versionId) - versionDir := dirName + "/" + entryName + ".versions" + versionDir := dirName + "/" + entryName + s3_constants.VersionsFolder // Move the completed object to the versions directory err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) { diff --git a/weed/s3api/s3_constants/s3_actions.go b/weed/s3api/s3_constants/s3_actions.go index 923327be2..835146bf3 100644 --- a/weed/s3api/s3_constants/s3_actions.go +++ b/weed/s3api/s3_constants/s3_actions.go @@ -27,5 +27,6 @@ const ( SeaweedStorageDestinationHeader = "x-seaweedfs-destination" MultipartUploadsFolder = ".uploads" + VersionsFolder = ".versions" FolderMimeType = "httpd/unix-directory" ) diff --git a/weed/s3api/s3api_bucket_config.go b/weed/s3api/s3api_bucket_config.go index 26b114160..128b17c06 100644 --- a/weed/s3api/s3api_bucket_config.go +++ b/weed/s3api/s3api_bucket_config.go @@ -350,7 +350,7 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err // Extract configuration from extended attributes if entry.Extended != nil { - glog.V(3).Infof("getBucketConfig: checking extended attributes for bucket %s, ExtObjectLockEnabledKey value=%s", + glog.V(3).Infof("getBucketConfig: checking extended attributes for bucket %s, ExtObjectLockEnabledKey value=%s", bucket, string(entry.Extended[s3_constants.ExtObjectLockEnabledKey])) if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists { config.Versioning = string(versioning) @@ -435,7 +435,7 @@ func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketC glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err) return s3err.ErrInternalError } - glog.V(3).Infof("updateBucketConfig: stored Object Lock config in extended attributes for bucket %s, key=%s, value=%s", + glog.V(3).Infof("updateBucketConfig: stored Object Lock config in extended attributes for bucket %s, key=%s, value=%s", bucket, s3_constants.ExtObjectLockEnabledKey, string(config.Entry.Extended[s3_constants.ExtObjectLockEnabledKey])) } diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index c3f934557..ead77041e 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -9,7 +9,9 @@ import ( "fmt" "math" "net/http" + "path" "sort" + "strconv" "strings" "time" @@ -251,6 +253,28 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque return } + // Check if bucket has object lock enabled + bucketConfig, errCode := s3a.getBucketConfig(bucket) + if errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } + + // If object lock is enabled, check for objects with active locks + if bucketConfig.ObjectLockConfig != nil { + hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(bucket) + if checkErr != nil { + glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + if hasLockedObjects { + glog.V(3).Infof("DeleteBucketHandler: bucket %s has objects with active object locks, cannot delete", bucket) + s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty) + return + } + } + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { if !s3a.option.AllowDeleteBucketNotEmpty { entries, _, err := s3a.list(s3a.option.BucketsPath+"/"+bucket, "", "", false, 2) @@ -258,7 +282,9 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque return fmt.Errorf("failed to list bucket %s: %v", bucket, err) } for _, entry := range entries { - if entry.Name != s3_constants.MultipartUploadsFolder { + // Allow bucket deletion if only special directories remain + if entry.Name != s3_constants.MultipartUploadsFolder && + !strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { return errors.New(s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code) } } @@ -299,6 +325,159 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque s3err.WriteEmptyResponse(w, r, http.StatusNoContent) } +// hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold +func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) { + bucketPath := s3a.option.BucketsPath + "/" + bucket + + // Check all objects including versions for active locks + // Establish current time once at the start for consistency across the entire scan + hasLocks := false + currentTime := time.Now() + err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks, currentTime) + if err != nil { + return false, fmt.Errorf("error checking for locked objects: %w", err) + } + + return hasLocks, nil +} + +const ( + // lockCheckPaginationSize is the page size for listing directories during lock checks + lockCheckPaginationSize = 10000 +) + +// errStopPagination is a sentinel error to signal early termination of pagination +var errStopPagination = errors.New("stop pagination") + +// paginateEntries iterates through directory entries with pagination +// Calls fn for each page of entries. If fn returns errStopPagination, iteration stops successfully. +func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error { + startFrom := "" + for { + entries, isLast, err := s3a.list(dir, "", startFrom, false, lockCheckPaginationSize) + if err != nil { + // Fail-safe: propagate error to prevent incorrect bucket deletion + return fmt.Errorf("failed to list directory %s: %w", dir, err) + } + + if err := fn(entries); err != nil { + if errors.Is(err, errStopPagination) { + return nil + } + return err + } + + if isLast || len(entries) == 0 { + break + } + // Use the last entry name as the start point for next page + startFrom = entries[len(entries)-1].Name + } + return nil +} + +// recursivelyCheckLocks recursively checks all objects and versions for active locks +// Uses pagination to handle directories with more than 10,000 entries +func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error { + if *hasLocks { + // Early exit if we've already found a locked object + return nil + } + + // Process entries in the current directory with pagination + err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error { + for _, entry := range entries { + if *hasLocks { + // Early exit if we've already found a locked object + return errStopPagination + } + + // Skip special directories (multipart uploads, etc) + if entry.Name == s3_constants.MultipartUploadsFolder { + continue + } + + if entry.IsDirectory { + subDir := path.Join(dir, entry.Name) + if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { + // If it's a .versions directory, check all version files with pagination + err := s3a.paginateEntries(subDir, func(versionEntries []*filer_pb.Entry) error { + for _, versionEntry := range versionEntries { + if s3a.entryHasActiveLock(versionEntry, currentTime) { + *hasLocks = true + glog.V(2).Infof("Found object with active lock in versions: %s/%s", subDir, versionEntry.Name) + return errStopPagination + } + } + return nil + }) + if err != nil { + return err + } + } else { + // Recursively check other subdirectories + subRelativePath := path.Join(relativePath, entry.Name) + if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil { + return err + } + // Early exit if a locked object was found in the subdirectory + if *hasLocks { + return errStopPagination + } + } + } else { + // Check regular files for locks + if s3a.entryHasActiveLock(entry, currentTime) { + *hasLocks = true + objectPath := path.Join(relativePath, entry.Name) + glog.V(2).Infof("Found object with active lock: %s", objectPath) + return errStopPagination + } + } + } + return nil + }) + + return err +} + +// entryHasActiveLock checks if an entry has an active retention or legal hold +func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool { + if entry.Extended == nil { + return false + } + + // Check for active legal hold + if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists { + if string(legalHoldBytes) == s3_constants.LegalHoldOn { + return true + } + } + + // Check for active retention + if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists { + mode := string(modeBytes) + if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance { + // Check if retention is still active + if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists { + timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64) + if err != nil { + // Fail-safe: if we can't parse the retention date, assume the object is locked + // to prevent accidental data loss + glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", string(dateBytes), err) + return true + } + retainUntil := time.Unix(timestamp, 0) + if retainUntil.After(currentTime) { + return true + } + } + } + } + + return false +} + func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) diff --git a/weed/s3api/s3api_object_handlers_acl.go b/weed/s3api/s3api_object_handlers_acl.go index 1386b6cba..1b6f28916 100644 --- a/weed/s3api/s3api_object_handlers_acl.go +++ b/weed/s3api/s3api_object_handlers_acl.go @@ -308,7 +308,7 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque if versioningConfigured { if versionId != "" && versionId != "null" { // Versioned object - update the specific version file in .versions directory - updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions" + updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder } else { // Latest version in versioned bucket - could be null version or versioned object // Extract version ID from the entry to determine where it's stored @@ -324,7 +324,7 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque updateDirectory = s3a.option.BucketsPath + "/" + bucket } else { // Versioned object - stored in .versions directory - updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions" + updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder } } } else { diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go index f60dccee0..9e6376a0e 100644 --- a/weed/s3api/s3api_object_handlers_list.go +++ b/weed/s3api/s3api_object_handlers_list.go @@ -511,7 +511,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d } // Skip .versions directories in regular list operations but track them for logical object creation - if strings.HasSuffix(entry.Name, ".versions") { + if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { glog.V(4).Infof("Found .versions directory: %s", entry.Name) versionsDirs = append(versionsDirs, entry.Name) continue @@ -566,7 +566,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d } // Extract object name from .versions directory name (remove .versions suffix) - baseObjectName := strings.TrimSuffix(versionsDir, ".versions") + baseObjectName := strings.TrimSuffix(versionsDir, s3_constants.VersionsFolder) // Construct full object path relative to bucket // dir is something like "/buckets/sea-test-1/Veeam/Backup/vbr/Config" diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 1fff23545..148df89f6 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -463,7 +463,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob // Check if there's an existing null version in .versions directory and delete it // This ensures suspended versioning properly overwrites the null version as per S3 spec // Note: We only delete null versions, NOT regular versions (those should be preserved) - versionsObjectPath := normalizedObject + ".versions" + versionsObjectPath := normalizedObject + s3_constants.VersionsFolder versionsDir := bucketDir + "/" + versionsObjectPath entries, _, err := s3a.list(versionsDir, "", "", false, 1000) if err == nil { @@ -617,7 +617,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob // when a new "null" version becomes the latest during suspended versioning func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error { bucketDir := s3a.option.BucketsPath + "/" + bucket - versionsObjectPath := object + ".versions" + versionsObjectPath := object + s3_constants.VersionsFolder versionsDir := bucketDir + "/" + versionsObjectPath glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s%s", bucket, object) @@ -696,12 +696,12 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // Upload directly to the versions directory // We need to construct the object path relative to the bucket - versionObjectPath := normalizedObject + ".versions/" + versionFileName + versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath) // Ensure the .versions directory exists before uploading bucketDir := s3a.option.BucketsPath + "/" + bucket - versionsDir := normalizedObject + ".versions" + versionsDir := normalizedObject + s3_constants.VersionsFolder err := s3a.mkdir(bucketDir, versionsDir, func(entry *filer_pb.Entry) { entry.Attributes.Mime = s3_constants.FolderMimeType }) @@ -791,7 +791,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error { bucketDir := s3a.option.BucketsPath + "/" + bucket - versionsObjectPath := object + ".versions" + versionsObjectPath := object + s3_constants.VersionsFolder // Get the current .versions directory entry with retry logic for filer consistency var versionsEntry *filer_pb.Entry diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index 4f1ff901f..17a00ee01 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -95,7 +95,7 @@ func generateVersionId() string { // getVersionedObjectDir returns the directory path for storing object versions func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string { - return path.Join(s3a.option.BucketsPath, bucket, object+".versions") + return path.Join(s3a.option.BucketsPath, bucket, object+s3_constants.VersionsFolder) } // getVersionFileName returns the filename for a specific version @@ -116,7 +116,7 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error // Make sure to clean up the object path to remove leading slashes cleanObject := strings.TrimPrefix(object, "/") bucketDir := s3a.option.BucketsPath + "/" + bucket - versionsDir := bucketDir + "/" + cleanObject + ".versions" + versionsDir := bucketDir + "/" + cleanObject + s3_constants.VersionsFolder // Create the delete marker entry in the .versions directory err := s3a.mkFile(versionsDir, versionFileName, nil, func(entry *filer_pb.Entry) { @@ -301,9 +301,9 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string } // Check if this is a .versions directory - if strings.HasSuffix(entry.Name, ".versions") { + if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { // Extract object name from .versions directory name - objectKey := strings.TrimSuffix(entryPath, ".versions") + objectKey := strings.TrimSuffix(entryPath, s3_constants.VersionsFolder) normalizedObjectKey := removeDuplicateSlashes(objectKey) // Mark both keys as processed for backward compatibility processedObjects[objectKey] = true @@ -419,7 +419,7 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string } // Check if a .versions directory exists for this object - versionsObjectPath := normalizedObjectKey + ".versions" + versionsObjectPath := normalizedObjectKey + s3_constants.VersionsFolder _, versionsErr := s3a.getEntry(currentPath, versionsObjectPath) if versionsErr == nil { // .versions directory exists @@ -497,7 +497,7 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe // All versions are now stored in the .versions directory only bucketDir := s3a.option.BucketsPath + "/" + bucket - versionsObjectPath := object + ".versions" + versionsObjectPath := object + s3_constants.VersionsFolder glog.V(2).Infof("getObjectVersionList: checking versions directory %s", versionsObjectPath) // Get the .versions directory entry to read latest version metadata @@ -676,7 +676,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st versionFile := s3a.getVersionFileName(versionId) // Check if this is the latest version before attempting deletion (for potential metadata update) - versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), normalizedObject+".versions") + versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), normalizedObject+s3_constants.VersionsFolder) isLatestVersion := false if dirErr == nil && versionsEntry.Extended != nil { if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest { @@ -715,7 +715,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error { bucketDir := s3a.option.BucketsPath + "/" + bucket cleanObject := strings.TrimPrefix(object, "/") - versionsObjectPath := cleanObject + ".versions" + versionsObjectPath := cleanObject + s3_constants.VersionsFolder versionsDir := bucketDir + "/" + versionsObjectPath glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir) @@ -847,7 +847,7 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb normalizedObject := removeDuplicateSlashes(object) bucketDir := s3a.option.BucketsPath + "/" + bucket - versionsObjectPath := normalizedObject + ".versions" + versionsObjectPath := normalizedObject + s3_constants.VersionsFolder glog.V(1).Infof("getLatestObjectVersion: looking for latest version of %s/%s (normalized: %s)", bucket, object, normalizedObject) diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index 313d7f849..75e475f6b 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -13,7 +13,7 @@ const ( // minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s) // Used to calculate timeout scaling based on data transferred minThroughputBytesPerSecond = 4000 - + // graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout // This prevents indefinite connections while allowing time for server-side chunk fetches graceTimeCapMultiplier = 3 @@ -90,17 +90,17 @@ func (c *Conn) Write(b []byte) (count int, e error) { // Calculate timeout with two components: // 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s) // 2. Additional grace period if there was a gap since last write (for chunk fetch delays) - + // Calculate expected bytes per timeout period based on minimum throughput (4KB/s) // Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB // After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout) timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1) baseTimeout := c.WriteTimeout * timeoutMultiplier - + // If it's been a while since last write, add grace time for server-side chunk fetches // But cap it to avoid keeping slow clients connected indefinitely - // + // // The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time // exceeds base timeout, independent of throughput scaling. if !c.lastWrite.IsZero() { @@ -120,7 +120,7 @@ func (c *Conn) Write(b []byte) (count int, e error) { baseTimeout += graceTime } } - + err := c.Conn.SetWriteDeadline(now.Add(baseTimeout)) if err != nil { return 0, err