diff --git a/test/s3/delete/s3_conditional_delete_test.go b/test/s3/delete/s3_conditional_delete_test.go new file mode 100644 index 000000000..ca63343a1 --- /dev/null +++ b/test/s3/delete/s3_conditional_delete_test.go @@ -0,0 +1,135 @@ +package delete + +import ( + "bytes" + "context" + "errors" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConditionalDeleteIfMatchOnLatestVersion(t *testing.T) { + client := getTestClient(t) + bucket := createTestBucket(t, client) + defer cleanupBucket(t, client, bucket) + + key := "conditional-delete.txt" + putResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader([]byte("versioned body")), + }) + require.NoError(t, err) + require.NotNil(t, putResp.ETag) + + _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + IfMatch: aws.String(`"not-the-current-etag"`), + }) + require.Error(t, err, "DeleteObject should reject a mismatched If-Match header") + + var apiErr smithy.APIError + if assert.True(t, errors.As(err, &apiErr), "Expected smithy API error for conditional delete") { + assert.Equal(t, "PreconditionFailed", apiErr.ErrorCode()) + } + + _, err = client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + require.NoError(t, err, "Object should remain current after a failed conditional delete") + + deleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + IfMatch: putResp.ETag, + }) + require.NoError(t, err) + require.NotNil(t, deleteResp.DeleteMarker) + assert.True(t, *deleteResp.DeleteMarker, "Successful conditional delete on a versioned bucket should create a delete marker") + require.NotNil(t, deleteResp.VersionId) + + _, err = client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + require.Error(t, err, "Delete marker should hide the current object after a successful conditional delete") +} + +func TestConditionalMultiDeletePerObjectETag(t *testing.T) { + client := getTestClient(t) + bucket := createTestBucket(t, client) + defer cleanupBucket(t, client, bucket) + + okKey := "delete-ok.txt" + failKey := "delete-fail.txt" + + okPutResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(okKey), + Body: bytes.NewReader([]byte("delete me")), + }) + require.NoError(t, err) + require.NotNil(t, okPutResp.ETag) + + _, err = client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(failKey), + Body: bytes.NewReader([]byte("keep me")), + }) + require.NoError(t, err) + + deleteResp, err := client.DeleteObjects(context.TODO(), &s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &types.Delete{ + Objects: []types.ObjectIdentifier{ + { + Key: aws.String(okKey), + ETag: okPutResp.ETag, + }, + { + Key: aws.String(failKey), + ETag: aws.String(`"mismatched-etag"`), + }, + }, + }, + }) + require.NoError(t, err) + require.Len(t, deleteResp.Deleted, 1, "One object should satisfy its ETag precondition") + require.Len(t, deleteResp.Errors, 1, "One object should report a precondition failure") + 
deletedKeys := make([]string, 0, len(deleteResp.Deleted)) + for _, deleted := range deleteResp.Deleted { + deletedKeys = append(deletedKeys, aws.ToString(deleted.Key)) + } + assert.Contains(t, deletedKeys, okKey) + + var matchedError *types.Error + for i := range deleteResp.Errors { + if aws.ToString(deleteResp.Errors[i].Key) == failKey { + matchedError = &deleteResp.Errors[i] + break + } + } + if assert.NotNil(t, matchedError, "Expected error entry for failed key") { + assert.Equal(t, "PreconditionFailed", aws.ToString(matchedError.Code)) + } + + _, err = client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(okKey), + }) + require.Error(t, err, "Successfully deleted key should no longer be current") + + _, err = client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(failKey), + }) + require.NoError(t, err, "Object with mismatched ETag should remain untouched") +} diff --git a/test/s3/distributed_lock/distributed_lock_cluster_test.go b/test/s3/distributed_lock/distributed_lock_cluster_test.go new file mode 100644 index 000000000..d7054b8ae --- /dev/null +++ b/test/s3/distributed_lock/distributed_lock_cluster_test.go @@ -0,0 +1,523 @@ +package distributed_lock + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + distributedLockTestRegion = "us-east-1" + distributedLockTestAccessKey = "some_access_key1" + distributedLockTestSecretKey = "some_secret_key1" + distributedLockTestGroup = "distributed-lock-it" +) + +type distributedLockCluster struct { + t testing.TB + baseDir string + configDir string + logsDir string + keepLogs bool + + weedBinary string + filerGroup string + s3Config string + + masterPort int + masterGrpcPort int + volumePort int + volumeGrpcPort int + filerPorts []int + filerGrpcPorts []int + s3Ports []int + s3GrpcPorts []int + + masterCmd *exec.Cmd + volumeCmd *exec.Cmd + filerCmds []*exec.Cmd + s3Cmds []*exec.Cmd + logFiles []*os.File + + cleanupOnce sync.Once +} + +type s3IdentityConfig struct { + Identities []s3Identity `json:"identities"` +} + +type s3Identity struct { + Name string `json:"name"` + Credentials []s3Credential `json:"credentials,omitempty"` + Actions []string `json:"actions"` +} + +type s3Credential struct { + AccessKey string `json:"accessKey"` + SecretKey string `json:"secretKey"` +} + +func startDistributedLockCluster(t *testing.T) *distributedLockCluster { + t.Helper() + + weedBinary, err := framework.FindOrBuildWeedBinary() + require.NoError(t, err, "resolve weed binary") + + baseDir, err := os.MkdirTemp("", "seaweedfs_s3_distributed_lock_") + require.NoError(t, err, "create temp directory") + + cluster := &distributedLockCluster{ + t: t, + baseDir: baseDir, + configDir: filepath.Join(baseDir, "config"), + logsDir: filepath.Join(baseDir, "logs"), + keepLogs: os.Getenv("S3_DISTRIBUTED_LOCK_KEEP_LOGS") == "1", + weedBinary: weedBinary, + filerGroup: distributedLockTestGroup, + filerCmds: 
make([]*exec.Cmd, 0, 2), + s3Cmds: make([]*exec.Cmd, 0, 2), + } + t.Cleanup(cluster.Stop) + + dirs := []string{ + cluster.configDir, + cluster.logsDir, + filepath.Join(baseDir, "master"), + filepath.Join(baseDir, "volume"), + filepath.Join(baseDir, "filer0"), + filepath.Join(baseDir, "filer1"), + } + for _, dir := range dirs { + require.NoError(t, os.MkdirAll(dir, 0o755), "create %s", dir) + } + + ports, err := allocatePorts(12) + require.NoError(t, err, "allocate ports") + cluster.masterPort = ports[0] + cluster.masterGrpcPort = ports[1] + cluster.volumePort = ports[2] + cluster.volumeGrpcPort = ports[3] + cluster.filerPorts = []int{ports[4], ports[6]} + cluster.filerGrpcPorts = []int{ports[5], ports[7]} + cluster.s3Ports = []int{ports[8], ports[10]} + cluster.s3GrpcPorts = []int{ports[9], ports[11]} + + require.NoError(t, cluster.writeSecurityConfig(), "write security config") + require.NoError(t, cluster.writeS3Config(), "write s3 config") + + require.NoError(t, cluster.startMaster(), "start master") + require.NoError(t, cluster.waitForHTTP("http://"+cluster.masterHTTPAddress()+"/dir/status", 30*time.Second), "wait for master\n%s", cluster.tailLog("master.log")) + + require.NoError(t, cluster.startVolume(), "start volume") + require.NoError(t, cluster.waitForHTTP("http://"+cluster.volumeHTTPAddress()+"/status", 30*time.Second), "wait for volume\n%s", cluster.tailLog("volume.log")) + require.NoError(t, cluster.waitForTCP(cluster.volumeGRPCAddress(), 30*time.Second), "wait for volume grpc\n%s", cluster.tailLog("volume.log")) + + for i := 0; i < 2; i++ { + require.NoError(t, cluster.startFiler(i), "start filer %d", i) + require.NoError(t, cluster.waitForTCP(cluster.filerGRPCAddress(i), 30*time.Second), "wait for filer %d grpc\n%s", i, cluster.tailLog(fmt.Sprintf("filer%d.log", i))) + } + require.NoError(t, cluster.waitForFilerCount(2, 30*time.Second), "wait for filer group registration") + + for i := 0; i < 2; i++ { + require.NoError(t, cluster.startS3(i), "start s3 %d", i) + client := cluster.newS3Client(t, cluster.s3Endpoint(i)) + require.NoError(t, cluster.waitForS3Ready(client, 30*time.Second), "wait for s3 %d\n%s", i, cluster.tailLog(fmt.Sprintf("s3-%d.log", i))) + } + + return cluster +} + +func (c *distributedLockCluster) Stop() { + if c == nil { + return + } + c.cleanupOnce.Do(func() { + for i := len(c.s3Cmds) - 1; i >= 0; i-- { + stopProcess(c.s3Cmds[i]) + } + for i := len(c.filerCmds) - 1; i >= 0; i-- { + stopProcess(c.filerCmds[i]) + } + stopProcess(c.volumeCmd) + stopProcess(c.masterCmd) + + for _, f := range c.logFiles { + _ = f.Close() + } + + if !c.keepLogs && !c.t.Failed() { + _ = os.RemoveAll(c.baseDir) + } else if c.baseDir != "" { + c.t.Logf("distributed lock integration logs kept at %s", c.baseDir) + } + }) +} + +func (c *distributedLockCluster) masterHTTPAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort)) +} + +func (c *distributedLockCluster) masterGRPCAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterGrpcPort)) +} + +func (c *distributedLockCluster) volumeHTTPAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePort)) +} + +func (c *distributedLockCluster) volumeGRPCAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPort)) +} + +func (c *distributedLockCluster) filerServerAddress(index int) pb.ServerAddress { + return pb.NewServerAddress("127.0.0.1", c.filerPorts[index], c.filerGrpcPorts[index]) +} + +func (c *distributedLockCluster) 
filerGRPCAddress(index int) string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.filerGrpcPorts[index])) +} + +func (c *distributedLockCluster) s3Endpoint(index int) string { + return fmt.Sprintf("http://127.0.0.1:%d", c.s3Ports[index]) +} + +func (c *distributedLockCluster) startMaster() error { + logFile, err := c.openLog("master.log") + if err != nil { + return err + } + + args := []string{ + "-config_dir=" + c.configDir, + "master", + "-ip=127.0.0.1", + "-ip.bind=127.0.0.1", + "-port=" + strconv.Itoa(c.masterPort), + "-port.grpc=" + strconv.Itoa(c.masterGrpcPort), + "-mdir=" + filepath.Join(c.baseDir, "master"), + "-peers=none", + "-volumeSizeLimitMB=32", + "-defaultReplication=000", + } + + c.masterCmd = exec.Command(c.weedBinary, args...) + c.masterCmd.Dir = c.baseDir + c.masterCmd.Stdout = logFile + c.masterCmd.Stderr = logFile + return c.masterCmd.Start() +} + +func (c *distributedLockCluster) startVolume() error { + logFile, err := c.openLog("volume.log") + if err != nil { + return err + } + + masterAddress := string(pb.NewServerAddress("127.0.0.1", c.masterPort, c.masterGrpcPort)) + + args := []string{ + "-config_dir=" + c.configDir, + "volume", + "-ip=127.0.0.1", + "-ip.bind=127.0.0.1", + "-port=" + strconv.Itoa(c.volumePort), + "-port.grpc=" + strconv.Itoa(c.volumeGrpcPort), + "-dir=" + filepath.Join(c.baseDir, "volume"), + "-max=16", + "-master=" + masterAddress, + "-readMode=proxy", + } + + c.volumeCmd = exec.Command(c.weedBinary, args...) + c.volumeCmd.Dir = c.baseDir + c.volumeCmd.Stdout = logFile + c.volumeCmd.Stderr = logFile + return c.volumeCmd.Start() +} + +func (c *distributedLockCluster) startFiler(index int) error { + logFile, err := c.openLog(fmt.Sprintf("filer%d.log", index)) + if err != nil { + return err + } + + masterAddress := string(pb.NewServerAddress("127.0.0.1", c.masterPort, c.masterGrpcPort)) + + args := []string{ + "-config_dir=" + c.configDir, + "filer", + "-master=" + masterAddress, + "-filerGroup=" + c.filerGroup, + "-ip=127.0.0.1", + "-ip.bind=127.0.0.1", + "-port=" + strconv.Itoa(c.filerPorts[index]), + "-port.grpc=" + strconv.Itoa(c.filerGrpcPorts[index]), + "-defaultStoreDir=" + filepath.Join(c.baseDir, fmt.Sprintf("filer%d", index)), + } + + cmd := exec.Command(c.weedBinary, args...) + cmd.Dir = c.baseDir + cmd.Stdout = logFile + cmd.Stderr = logFile + if err := cmd.Start(); err != nil { + return err + } + c.filerCmds = append(c.filerCmds, cmd) + return nil +} + +func (c *distributedLockCluster) startS3(index int) error { + logFile, err := c.openLog(fmt.Sprintf("s3-%d.log", index)) + if err != nil { + return err + } + + filers := []string{string(c.filerServerAddress(0)), string(c.filerServerAddress(1))} + if index%2 == 1 { + filers[0], filers[1] = filers[1], filers[0] + } + + args := []string{ + "-config_dir=" + c.configDir, + "s3", + "-ip.bind=127.0.0.1", + "-port=" + strconv.Itoa(c.s3Ports[index]), + "-port.grpc=" + strconv.Itoa(c.s3GrpcPorts[index]), + "-port.iceberg=0", + "-filer=" + strings.Join(filers, ","), + "-config=" + c.s3Config, + "-iam.readOnly=false", + } + + cmd := exec.Command(c.weedBinary, args...) 
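+	// Reviewer note (assumption, not part of the original change): the filer
+	// ordering swap above gives each S3 gateway a different primary filer, so
+	// the distributed-lock tests exercise lock traffic across both filers
+	// rather than funneling every gateway through filer 0.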
+ cmd.Dir = c.baseDir + cmd.Stdout = logFile + cmd.Stderr = logFile + if err := cmd.Start(); err != nil { + return err + } + c.s3Cmds = append(c.s3Cmds, cmd) + return nil +} + +func (c *distributedLockCluster) writeSecurityConfig() error { + return os.WriteFile(filepath.Join(c.configDir, "security.toml"), []byte("# generated for distributed lock integration tests\n"), 0o644) +} + +func (c *distributedLockCluster) writeS3Config() error { + configPath := filepath.Join(c.configDir, "s3.json") + payload := s3IdentityConfig{ + Identities: []s3Identity{ + { + Name: "distributed-lock-admin", + Credentials: []s3Credential{ + { + AccessKey: distributedLockTestAccessKey, + SecretKey: distributedLockTestSecretKey, + }, + }, + Actions: []string{"Admin", "Read", "List", "Tagging", "Write"}, + }, + }, + } + data, err := json.MarshalIndent(payload, "", " ") + if err != nil { + return err + } + if err := os.WriteFile(configPath, data, 0o644); err != nil { + return err + } + c.s3Config = configPath + return nil +} + +func (c *distributedLockCluster) newS3Client(t testing.TB, endpoint string) *s3.Client { + t.Helper() + + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(distributedLockTestRegion), + config.WithRetryMaxAttempts(1), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( + distributedLockTestAccessKey, + distributedLockTestSecretKey, + "", + )), + ) + require.NoError(t, err, "load aws config") + + return s3.NewFromConfig(cfg, func(o *s3.Options) { + o.BaseEndpoint = aws.String(endpoint) + o.UsePathStyle = true + }) +} + +func (c *distributedLockCluster) waitForS3Ready(client *s3.Client, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + _, err := client.ListBuckets(ctx, &s3.ListBucketsInput{}) + cancel() + if err == nil { + return nil + } + time.Sleep(200 * time.Millisecond) + } + return fmt.Errorf("timed out waiting for s3 readiness") +} + +func (c *distributedLockCluster) waitForFilerCount(expected int, timeout time.Duration) error { + conn, err := grpc.NewClient(c.masterGRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return err + } + defer conn.Close() + + client := master_pb.NewSeaweedClient(conn) + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{ + ClientType: "filer", + FilerGroup: c.filerGroup, + }) + cancel() + if err == nil && len(resp.ClusterNodes) >= expected { + return nil + } + time.Sleep(200 * time.Millisecond) + } + return fmt.Errorf("timed out waiting for %d filers in group %q", expected, c.filerGroup) +} + +func (c *distributedLockCluster) waitForHTTP(url string, timeout time.Duration) error { + client := &net.Dialer{Timeout: time.Second} + httpClient := &httpClientWithDialer{dialer: client} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if err := httpClient.Get(url); err == nil { + return nil + } + time.Sleep(200 * time.Millisecond) + } + return fmt.Errorf("timed out waiting for http %s", url) +} + +func (c *distributedLockCluster) waitForTCP(addr string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", addr, time.Second) + if err == nil { + _ = conn.Close() + 
return nil + } + time.Sleep(200 * time.Millisecond) + } + return fmt.Errorf("timed out waiting for tcp %s", addr) +} + +func (c *distributedLockCluster) openLog(name string) (*os.File, error) { + f, err := os.Create(filepath.Join(c.logsDir, name)) + if err != nil { + return nil, err + } + c.logFiles = append(c.logFiles, f) + return f, nil +} + +func (c *distributedLockCluster) tailLog(name string) string { + f, err := os.Open(filepath.Join(c.logsDir, name)) + if err != nil { + return "" + } + defer f.Close() + + scanner := bufio.NewScanner(f) + lines := make([]string, 0, 40) + for scanner.Scan() { + lines = append(lines, scanner.Text()) + if len(lines) > 40 { + lines = lines[1:] + } + } + return strings.Join(lines, "\n") +} + +func allocatePorts(count int) ([]int, error) { + listeners := make([]net.Listener, 0, count) + ports := make([]int, 0, count) + for i := 0; i < count; i++ { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + for _, openListener := range listeners { + _ = openListener.Close() + } + return nil, err + } + listeners = append(listeners, l) + ports = append(ports, l.Addr().(*net.TCPAddr).Port) + } + for _, l := range listeners { + _ = l.Close() + } + return ports, nil +} + +func stopProcess(cmd *exec.Cmd) { + if cmd == nil || cmd.Process == nil { + return + } + + _ = cmd.Process.Signal(os.Interrupt) + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + _ = cmd.Process.Kill() + <-done + } +} + +type httpClientWithDialer struct { + dialer *net.Dialer +} + +func (h *httpClientWithDialer) Get(url string) error { + client := &http.Client{ + Timeout: time.Second, + Transport: &http.Transport{ + DialContext: h.dialer.DialContext, + }, + } + resp, err := client.Get(url) + if err != nil { + return err + } + _ = resp.Body.Close() + return nil +} diff --git a/test/s3/distributed_lock/distributed_lock_test.go b/test/s3/distributed_lock/distributed_lock_test.go new file mode 100644 index 000000000..31dd0f181 --- /dev/null +++ b/test/s3/distributed_lock/distributed_lock_test.go @@ -0,0 +1,181 @@ +package distributed_lock + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/smithy-go" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConditionalPutIfNoneMatchDistributedLockAcrossS3Gateways(t *testing.T) { + if testing.Short() { + t.Skip("skipping distributed lock integration test in short mode") + } + + cluster := startDistributedLockCluster(t) + clientA := cluster.newS3Client(t, cluster.s3Endpoint(0)) + clientB := cluster.newS3Client(t, cluster.s3Endpoint(1)) + + bucket := fmt.Sprintf("distributed-lock-%d", time.Now().UnixNano()) + _, err := clientA.CreateBucket(context.Background(), &s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + require.NoError(t, err) + + require.Eventually(t, func() bool { + _, err := clientB.HeadBucket(context.Background(), &s3.HeadBucketInput{ + Bucket: aws.String(bucket), + }) + return err == nil + }, 30*time.Second, 200*time.Millisecond, "bucket should replicate to the second filer-backed gateway") + + keysByOwner := cluster.findLockOwnerKeys(bucket, "conditional-put") + require.Len(t, keysByOwner, 
len(cluster.filerPorts), "should exercise both filer lock owners") + + for owner, key := range keysByOwner { + owner := owner + key := key + t.Run(lockOwnerLabel(owner), func(t *testing.T) { + runConditionalPutRace(t, []s3RaceClient{ + {name: "s3-a", client: clientA}, + {name: "s3-b", client: clientB}, + }, bucket, key) + }) + } +} + +type s3RaceClient struct { + name string + client *s3.Client +} + +type putAttemptResult struct { + clientName string + body string + err error +} + +func runConditionalPutRace(t *testing.T, clients []s3RaceClient, bucket, key string) { + t.Helper() + + start := make(chan struct{}) + results := make(chan putAttemptResult, len(clients)*2) + var wg sync.WaitGroup + + for _, client := range clients { + for attempt := 0; attempt < 2; attempt++ { + wg.Add(1) + body := fmt.Sprintf("%s-attempt-%d", client.name, attempt) + go func(client s3RaceClient, body string) { + defer wg.Done() + <-start + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + _, err := client.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + IfNoneMatch: aws.String("*"), + Body: bytes.NewReader([]byte(body)), + }) + results <- putAttemptResult{ + clientName: client.name, + body: body, + err: err, + } + }(client, body) + } + } + + close(start) + wg.Wait() + close(results) + + successes := 0 + preconditionFailures := 0 + winnerBody := "" + unexpectedErrors := make([]string, 0) + + for result := range results { + if result.err == nil { + successes++ + winnerBody = result.body + continue + } + if isPreconditionFailed(result.err) { + preconditionFailures++ + continue + } + unexpectedErrors = append(unexpectedErrors, fmt.Sprintf("%s: %v", result.clientName, result.err)) + } + + require.Empty(t, unexpectedErrors, "unexpected race errors") + require.Equal(t, 1, successes, "exactly one write should win") + require.Equal(t, len(clients)*2-1, preconditionFailures, "all losing writes should fail with 412") + + object, err := clients[0].client.GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + require.NoError(t, err) + defer object.Body.Close() + + data, err := io.ReadAll(object.Body) + require.NoError(t, err) + assert.Equal(t, winnerBody, string(data), "stored object body should match the successful request") +} + +func isPreconditionFailed(err error) bool { + var apiErr smithy.APIError + return errors.As(err, &apiErr) && apiErr.ErrorCode() == "PreconditionFailed" +} + +func (c *distributedLockCluster) findLockOwnerKeys(bucket, prefix string) map[pb.ServerAddress]string { + owners := make([]pb.ServerAddress, 0, len(c.filerPorts)) + for i := range c.filerPorts { + owners = append(owners, c.filerServerAddress(i)) + } + sort.Slice(owners, func(i, j int) bool { + return owners[i] < owners[j] + }) + + keysByOwner := make(map[pb.ServerAddress]string, len(owners)) + for i := 0; i < 1024 && len(keysByOwner) < len(owners); i++ { + key := fmt.Sprintf("%s-%03d.txt", prefix, i) + lockOwner := ownerForObjectLock(bucket, key, owners) + if _, exists := keysByOwner[lockOwner]; !exists { + keysByOwner[lockOwner] = key + } + } + return keysByOwner +} + +func ownerForObjectLock(bucket, object string, owners []pb.ServerAddress) pb.ServerAddress { + lockKey := fmt.Sprintf("s3.object.write:/buckets/%s/%s", bucket, s3_constants.NormalizeObjectKey(object)) + hash := util.HashStringToLong(lockKey) + if hash < 0 { + hash = -hash + } + return owners[hash%int64(len(owners))] +} + 
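+// findSameOwnerKeyPair is an illustrative sketch (reviewer addition, not used
+// by the tests above): it reuses ownerForObjectLock to find two keys that hash
+// to the same filer, which a future test could use to exercise same-owner lock
+// contention instead of the per-owner coverage that findLockOwnerKeys checks.
+func findSameOwnerKeyPair(bucket, prefix string, owners []pb.ServerAddress) (first, second string, ok bool) {
+	seen := make(map[pb.ServerAddress]string, len(owners))
+	for i := 0; i < 1024; i++ {
+		key := fmt.Sprintf("%s-%03d.txt", prefix, i)
+		owner := ownerForObjectLock(bucket, key, owners)
+		if prev, exists := seen[owner]; exists {
+			return prev, key, true
+		}
+		seen[owner] = key
+	}
+	return "", "", false
+}
+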
+func lockOwnerLabel(owner pb.ServerAddress) string { + replacer := strings.NewReplacer(":", "_", ".", "_") + return "owner_" + replacer.Replace(string(owner)) +} diff --git a/test/s3/versioning/s3_copy_versioning_regression_test.go b/test/s3/versioning/s3_copy_versioning_regression_test.go new file mode 100644 index 000000000..fc419b0ad --- /dev/null +++ b/test/s3/versioning/s3_copy_versioning_regression_test.go @@ -0,0 +1,147 @@ +package s3api + +import ( + "bytes" + "context" + "fmt" + "io" + "net/url" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func versioningCopySource(bucketName, key string) string { + return fmt.Sprintf("%s/%s", bucketName, url.PathEscape(key)) +} + +func suspendVersioning(t *testing.T, client *s3.Client, bucketName string) { + _, err := client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{ + Bucket: aws.String(bucketName), + VersioningConfiguration: &types.VersioningConfiguration{ + Status: types.BucketVersioningStatusSuspended, + }, + }) + require.NoError(t, err) +} + +func TestVersioningSelfCopyMetadataReplaceCreatesNewVersion(t *testing.T) { + client := getS3Client(t) + bucketName := getNewBucketName() + + createBucket(t, client, bucketName) + defer deleteBucket(t, client, bucketName) + + enableVersioning(t, client, bucketName) + checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled) + + objectKey := "self-copy-versioned.txt" + initialContent := []byte("copy me without changing the body") + + putResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader(initialContent), + Metadata: map[string]string{"stage": "one"}, + }) + require.NoError(t, err) + require.NotNil(t, putResp.VersionId) + + copyResp, err := client.CopyObject(context.TODO(), &s3.CopyObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + CopySource: aws.String(versioningCopySource(bucketName, objectKey)), + Metadata: map[string]string{"stage": "two"}, + MetadataDirective: types.MetadataDirectiveReplace, + }) + require.NoError(t, err, "Self-copy with metadata replacement should succeed") + require.NotNil(t, copyResp.VersionId, "Versioned self-copy should create a new version") + require.NotEqual(t, *putResp.VersionId, *copyResp.VersionId, "Self-copy should create a distinct version") + + headLatestResp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + assert.Equal(t, "two", headLatestResp.Metadata["stage"], "Latest version should expose replaced metadata") + + headOriginalResp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + VersionId: putResp.VersionId, + }) + require.NoError(t, err) + assert.Equal(t, "one", headOriginalResp.Metadata["stage"], "Previous version metadata should remain intact") + + getResp, err := client.GetObject(context.TODO(), &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + defer getResp.Body.Close() + body, err := io.ReadAll(getResp.Body) + require.NoError(t, err) + assert.Equal(t, initialContent, body, "Self-copy should not alter the object body") + + versionsResp, err := 
client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + Prefix: aws.String(objectKey), + }) + require.NoError(t, err) + require.Len(t, versionsResp.Versions, 2, "Self-copy should append a new current version") + assert.Equal(t, *copyResp.VersionId, *versionsResp.Versions[0].VersionId, "New copy version should be latest") +} + +func TestVersioningSelfCopyMetadataReplaceSuspendedKeepsNullVersion(t *testing.T) { + client := getS3Client(t) + bucketName := getNewBucketName() + + createBucket(t, client, bucketName) + defer deleteBucket(t, client, bucketName) + + enableVersioning(t, client, bucketName) + suspendVersioning(t, client, bucketName) + checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusSuspended) + + objectKey := "self-copy-suspended.txt" + initialContent := []byte("null version content") + + _, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader(initialContent), + Metadata: map[string]string{"stage": "one"}, + }) + require.NoError(t, err) + + copyResp, err := client.CopyObject(context.TODO(), &s3.CopyObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + CopySource: aws.String(versioningCopySource(bucketName, objectKey)), + Metadata: map[string]string{"stage": "two"}, + MetadataDirective: types.MetadataDirectiveReplace, + }) + require.NoError(t, err, "Suspended self-copy with metadata replacement should succeed") + assert.Nil(t, copyResp.VersionId, "Suspended versioning should not return a version header for the current null version") + + headResp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + assert.Equal(t, "two", headResp.Metadata["stage"], "Null current version should be updated in place") + + versionsResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + Prefix: aws.String(objectKey), + }) + require.NoError(t, err) + require.Len(t, versionsResp.Versions, 1, "Suspended self-copy should keep a single null current version") + require.NotNil(t, versionsResp.Versions[0].VersionId) + assert.Equal(t, "null", *versionsResp.Versions[0].VersionId, "Suspended self-copy should preserve null-version semantics") + assert.True(t, *versionsResp.Versions[0].IsLatest, "Null version should remain latest") +} diff --git a/test/s3/versioning/s3_suspended_delete_marker_regression_test.go b/test/s3/versioning/s3_suspended_delete_marker_regression_test.go new file mode 100644 index 000000000..a2d670289 --- /dev/null +++ b/test/s3/versioning/s3_suspended_delete_marker_regression_test.go @@ -0,0 +1,82 @@ +package s3api + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSuspendedDeleteCreatesDeleteMarker(t *testing.T) { + client := getS3Client(t) + bucketName := getNewBucketName() + + createBucket(t, client, bucketName) + defer deleteBucket(t, client, bucketName) + + enableVersioning(t, client, bucketName) + + objectKey := "suspended-delete-marker.txt" + versionedResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte("versioned-content")), + }) + 
require.NoError(t, err) + require.NotNil(t, versionedResp.VersionId) + + suspendVersioning(t, client, bucketName) + + _, err = client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte("null-version-content")), + }) + require.NoError(t, err) + + deleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + require.NotNil(t, deleteResp.DeleteMarker) + assert.True(t, *deleteResp.DeleteMarker) + require.NotNil(t, deleteResp.VersionId) + + listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + require.Len(t, listResp.DeleteMarkers, 1) + + deleteMarker := listResp.DeleteMarkers[0] + require.NotNil(t, deleteMarker.Key) + assert.Equal(t, objectKey, *deleteMarker.Key) + require.NotNil(t, deleteMarker.VersionId) + assert.Equal(t, *deleteResp.VersionId, *deleteMarker.VersionId) + require.NotNil(t, deleteMarker.IsLatest) + assert.True(t, *deleteMarker.IsLatest) + + _, err = client.GetObject(context.TODO(), &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.Error(t, err) + + getVersionedResp, err := client.GetObject(context.TODO(), &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + VersionId: versionedResp.VersionId, + }) + require.NoError(t, err) + defer getVersionedResp.Body.Close() + + body, err := io.ReadAll(getVersionedResp.Body) + require.NoError(t, err) + assert.Equal(t, "versioned-content", string(body)) +} diff --git a/test/s3/versioning/s3_versioning_multipart_test.go b/test/s3/versioning/s3_versioning_multipart_test.go index 23520c806..873c9cca7 100644 --- a/test/s3/versioning/s3_versioning_multipart_test.go +++ b/test/s3/versioning/s3_versioning_multipart_test.go @@ -499,12 +499,16 @@ func TestMultipartUploadDeleteMarkerListBehavior(t *testing.T) { t.Logf("Successfully retrieved version %s after delete marker", multipartVersionId) // Delete the delete marker to "undelete" the object - _, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ + undeleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(objectKey), VersionId: aws.String(deleteMarkerVersionId), }) require.NoError(t, err, "Failed to delete the delete marker") + require.NotNil(t, undeleteResp.DeleteMarker, "Deleting a delete marker version should report DeleteMarker=true") + assert.True(t, *undeleteResp.DeleteMarker, "Deleting a delete marker version should report DeleteMarker=true") + require.NotNil(t, undeleteResp.VersionId, "Deleting a delete marker version should echo the version ID") + assert.Equal(t, deleteMarkerVersionId, *undeleteResp.VersionId, "DeleteObject should report the deleted delete-marker version ID") // ListObjectsV2 should show the object again listAfterUndelete, err := client.ListObjectsV2(context.TODO(), &s3.ListObjectsV2Input{ @@ -518,3 +522,76 @@ func TestMultipartUploadDeleteMarkerListBehavior(t *testing.T) { t.Logf("Object restored after delete marker removal, ETag=%s", multipartETag) } + +func TestVersioningCompleteMultipartUploadIsIdempotent(t *testing.T) { + client := getS3Client(t) + bucketName := getNewBucketName() + + createBucket(t, client, bucketName) + defer deleteBucket(t, client, bucketName) + + enableVersioning(t, client, bucketName) 
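+	// Reviewer note (assumption): this mirrors S3 client retry behavior, where
+	// an SDK may resend CompleteMultipartUpload after a timeout. The repeated
+	// call below must return the already-committed object instead of failing
+	// with NoSuchUpload or minting a second version.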
+ checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled) + + objectKey := "multipart-idempotent-object" + createResp, err := client.CreateMultipartUpload(context.TODO(), &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to create multipart upload") + + partSize := 5 * 1024 * 1024 + part1Data := bytes.Repeat([]byte("i"), partSize) + part2Data := bytes.Repeat([]byte("j"), partSize) + + uploadPart1Resp, err := client.UploadPart(context.TODO(), &s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: createResp.UploadId, + PartNumber: aws.Int32(1), + Body: bytes.NewReader(part1Data), + }) + require.NoError(t, err, "Failed to upload first part") + + uploadPart2Resp, err := client.UploadPart(context.TODO(), &s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: createResp.UploadId, + PartNumber: aws.Int32(2), + Body: bytes.NewReader(part2Data), + }) + require.NoError(t, err, "Failed to upload second part") + + completeInput := &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: createResp.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: []types.CompletedPart{ + {ETag: uploadPart1Resp.ETag, PartNumber: aws.Int32(1)}, + {ETag: uploadPart2Resp.ETag, PartNumber: aws.Int32(2)}, + }, + }, + } + + firstCompleteResp, err := client.CompleteMultipartUpload(context.TODO(), completeInput) + require.NoError(t, err, "First CompleteMultipartUpload should succeed") + require.NotNil(t, firstCompleteResp.ETag) + require.NotNil(t, firstCompleteResp.VersionId) + + secondCompleteResp, err := client.CompleteMultipartUpload(context.TODO(), completeInput) + require.NoError(t, err, "Repeated CompleteMultipartUpload should return the existing object instead of creating a second version") + require.NotNil(t, secondCompleteResp.ETag) + require.NotNil(t, secondCompleteResp.VersionId, "Repeated complete should report the existing version ID") + assert.Equal(t, *firstCompleteResp.ETag, *secondCompleteResp.ETag, "Repeated complete should report the same ETag") + assert.Equal(t, *firstCompleteResp.VersionId, *secondCompleteResp.VersionId, "Repeated complete should report the same version ID") + + versionsResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{ + Bucket: aws.String(bucketName), + Prefix: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to list object versions") + require.Len(t, versionsResp.Versions, 1, "Repeated completion must not create a duplicate version") + assert.Equal(t, *firstCompleteResp.VersionId, *versionsResp.Versions[0].VersionId, "The original multipart version should remain current") + assert.Empty(t, versionsResp.DeleteMarkers, "Repeated completion should not create delete markers") +} diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index ef9b58733..6cf5fa493 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -14,7 +14,6 @@ import ( "net/url" "path" "slices" - "sort" "strconv" "strings" "time" @@ -166,63 +165,73 @@ func copySSEHeadersFromFirstPart(dst *filer_pb.Entry, firstPart *filer_pb.Entry, } } -func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { +type multipartPartBoundary struct 
{ + PartNumber int `json:"part"` + StartChunk int `json:"start"` + EndChunk int `json:"end"` + ETag string `json:"etag"` +} - glog.V(2).Infof("completeMultipartUpload input %v", input) - if len(parts.Parts) == 0 { - stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() - return nil, s3err.ErrNoSuchUpload - } - completedPartNumbers := []int{} - completedPartMap := make(map[int][]string) +type multipartCompletionState struct { + deleteEntries []*filer_pb.Entry + partEntries map[int][]*filer_pb.Entry + pentry *filer_pb.Entry + mime string + finalParts []*filer_pb.FileChunk + offset int64 + partBoundaries []multipartPartBoundary + multipartETag string + entityWithTtl bool +} - maxPartNo := 1 +func completeMultipartResult(r *http.Request, input *s3.CompleteMultipartUploadInput, etag string, entry *filer_pb.Entry) *CompleteMultipartUploadResult { + result := &CompleteMultipartUploadResult{ + Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), + Bucket: input.Bucket, + ETag: aws.String(etag), + Key: objectKey(input.Key), + } + if entry != nil && entry.Extended != nil { + if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok { + versionId := string(versionIdBytes) + if versionId != "" && versionId != "null" { + result.VersionId = aws.String(versionId) + } + } + } + return result +} - for _, part := range parts.Parts { - if _, ok := completedPartMap[part.PartNumber]; !ok { - completedPartNumbers = append(completedPartNumbers, part.PartNumber) +func (s3a *S3ApiServer) prepareMultipartCompletionState(r *http.Request, input *s3.CompleteMultipartUploadInput, uploadDirectory, entryName, dirName string, completedPartNumbers []int, completedPartMap map[int][]string, maxPartNo int) (*multipartCompletionState, *CompleteMultipartUploadResult, s3err.ErrorCode) { + if entry, err := s3a.resolveObjectEntry(*input.Bucket, *input.Key); err == nil && entry != nil && entry.Extended != nil { + if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) { + cleanupEntries, _, cleanupErr := s3a.list(uploadDirectory, "", "", false, s3_constants.MaxS3MultipartParts+1) + if cleanupErr != nil && !errors.Is(cleanupErr, filer_pb.ErrNotFound) { + glog.Warningf("completeMultipartUpload: failed to list stale upload directory %s for cleanup: %v", uploadDirectory, cleanupErr) + } + return &multipartCompletionState{deleteEntries: cleanupEntries}, completeMultipartResult(r, input, getEtagFromEntry(entry), entry), s3err.ErrNone } - completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag) - maxPartNo = maxInt(maxPartNo, part.PartNumber) } - sort.Ints(completedPartNumbers) - uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId - // Use explicit limit to ensure all parts are listed (up to S3's max of 10,000 parts) - // Previously limit=0 relied on server's DirListingLimit default (1000 in weed server mode), - // which caused CompleteMultipartUpload to fail for uploads with more than 1000 parts. 
entries, _, err := s3a.list(uploadDirectory, "", "", false, s3_constants.MaxS3MultipartParts+1) if err != nil { - glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries)) + glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err) stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() - return nil, s3err.ErrNoSuchUpload + return nil, nil, s3err.ErrNoSuchUpload } - if len(entries) == 0 { - entryName, dirName := s3a.getEntryNameAndDir(input) - if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil { - if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) { - // Location uses the S3 endpoint that the client connected to - // Format: scheme://s3-endpoint/bucket/object (following AWS S3 API) - return &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), - Bucket: input.Bucket, - ETag: aws.String(getEtagFromEntry(entry)), - Key: objectKey(input.Key), - }, s3err.ErrNone - } - } stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() - return nil, s3err.ErrNoSuchUpload + return nil, nil, s3err.ErrNoSuchUpload } pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId) if err != nil { glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err) stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() - return nil, s3err.ErrNoSuchUpload + return nil, nil, s3err.ErrNoSuchUpload } - deleteEntries := []*filer_pb.Entry{} + + deleteEntries := make([]*filer_pb.Entry, 0) partEntries := make(map[int][]*filer_pb.Entry, len(entries)) entityTooSmall := false entityWithTtl := false @@ -232,10 +241,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) { continue } - partNumber, err := parsePartNumber(entry.Name) - if err != nil { + partNumber, parseErr := parsePartNumber(entry.Name) + if parseErr != nil { stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc() - glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", entry.Name, err) + glog.Errorf("completeMultipartUpload failed to parse partNumber %s:%s", entry.Name, parseErr) continue } completedPartsByNumber, ok := completedPartMap[partNumber] @@ -259,7 +268,6 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc() continue } - //there maybe multi same part, because of client retry partEntries[partNumber] = append(partEntries[partNumber], entry) foundEntry = true } @@ -278,27 +286,23 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } if entityTooSmall { stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompleteEntityTooSmall).Inc() - return nil, s3err.ErrEntityTooSmall + return nil, nil, s3err.ErrEntityTooSmall } - mime := pentry.Attributes.Mime - var finalParts []*filer_pb.FileChunk - var offset int64 - // Track part boundaries for later retrieval with PartNumber parameter - type PartBoundary struct { - PartNumber int `json:"part"` - StartChunk int `json:"start"` - EndChunk int `json:"end"` // exclusive - ETag string `json:"etag"` + mime := "" + if pentry.Attributes != 
nil { + mime = pentry.Attributes.Mime } - var partBoundaries []PartBoundary + finalParts := make([]*filer_pb.FileChunk, 0) + partBoundaries := make([]multipartPartBoundary, 0, len(completedPartNumbers)) + var offset int64 for _, partNumber := range completedPartNumbers { partEntriesByNumber, ok := partEntries[partNumber] if !ok { glog.Errorf("part %d has no entry", partNumber) stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc() - return nil, s3err.ErrInvalidPart + return nil, nil, s3err.ErrInvalidPart } found := false @@ -312,20 +316,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl continue } - // Record the start chunk index for this part partStartChunk := len(finalParts) - - // Calculate the part's ETag (for GetObject with PartNumber) partETag := filer.ETag(entry) for _, chunk := range entry.GetChunks() { - // CRITICAL: Do NOT modify SSE metadata offsets during assembly! - // The encrypted data was created with the offset stored in chunk.SseMetadata. - // Changing the offset here would cause decryption to fail because CTR mode - // uses the offset to initialize the counter. We must decrypt with the same - // offset that was used during encryption. - - p := &filer_pb.FileChunk{ + finalParts = append(finalParts, &filer_pb.FileChunk{ FileId: chunk.GetFileIdString(), Offset: offset, Size: chunk.Size, @@ -333,17 +328,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl CipherKey: chunk.CipherKey, ETag: chunk.ETag, IsCompressed: chunk.IsCompressed, - // Preserve SSE metadata UNCHANGED - do not modify the offset! - SseType: chunk.SseType, - SseMetadata: chunk.SseMetadata, - } - finalParts = append(finalParts, p) + SseType: chunk.SseType, + SseMetadata: chunk.SseMetadata, + }) offset += int64(chunk.Size) } - // Record the part boundary partEndChunk := len(finalParts) - partBoundaries = append(partBoundaries, PartBoundary{ + partBoundaries = append(partBoundaries, multipartPartBoundary{ PartNumber: partNumber, StartChunk: partStartChunk, EndChunk: partEndChunk, @@ -354,165 +346,218 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } } + return &multipartCompletionState{ + deleteEntries: deleteEntries, + partEntries: partEntries, + pentry: pentry, + mime: mime, + finalParts: finalParts, + offset: offset, + partBoundaries: partBoundaries, + multipartETag: calculateMultipartETag(partEntries, completedPartNumbers), + entityWithTtl: entityWithTtl, + }, nil, s3err.ErrNone +} + +func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { + + glog.V(2).Infof("completeMultipartUpload input %v", input) + if len(parts.Parts) == 0 { + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() + return nil, s3err.ErrNoSuchUpload + } + completedPartNumbers := []int{} + completedPartMap := make(map[int][]string) + + maxPartNo := 1 + lastSeenPartNo := 0 + + for _, part := range parts.Parts { + if part.PartNumber < lastSeenPartNo { + return nil, s3err.ErrInvalidPartOrder + } + lastSeenPartNo = part.PartNumber + if _, ok := completedPartMap[part.PartNumber]; !ok { + completedPartNumbers = append(completedPartNumbers, part.PartNumber) + } + completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag) + maxPartNo = maxInt(maxPartNo, part.PartNumber) + } + + uploadDirectory := 
s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId entryName, dirName := s3a.getEntryNameAndDir(input) + var completionState *multipartCompletionState + finalizeCode := s3a.withObjectWriteLock(*input.Bucket, *input.Key, func() s3err.ErrorCode { + return s3a.checkConditionalHeaders(r, *input.Bucket, *input.Key) + }, func() s3err.ErrorCode { + var prepCode s3err.ErrorCode + completionState, output, prepCode = s3a.prepareMultipartCompletionState(r, input, uploadDirectory, entryName, dirName, completedPartNumbers, completedPartMap, maxPartNo) + if prepCode != s3err.ErrNone || output != nil { + return prepCode + } - // Precompute ETag once for consistency across all paths - multipartETag := calculateMultipartETag(partEntries, completedPartNumbers) - etagQuote := "\"" + multipartETag + "\"" - - // Check if versioning is configured for this bucket BEFORE creating any files - versioningState, vErr := s3a.getVersioningState(*input.Bucket) - if vErr == nil && versioningState == s3_constants.VersioningEnabled { - // Use full object key (not just entryName) to ensure correct .versions directory is checked - normalizedKey := strings.TrimPrefix(*input.Key, "/") - useInvertedFormat := s3a.getVersionIdFormat(*input.Bucket, normalizedKey) - versionId := generateVersionId(useInvertedFormat) - versionFileName := s3a.getVersionFileName(versionId) - versionDir := dirName + "/" + entryName + s3_constants.VersionsFolder - - // Capture timestamp and owner once for consistency between version entry and cache entry - versionMtime := time.Now().Unix() - amzAccountId := r.Header.Get(s3_constants.AmzAccountId) + etagQuote := "\"" + completionState.multipartETag + "\"" + // Check if versioning is configured for this bucket BEFORE creating any files. + versioningState, vErr := s3a.getVersioningState(*input.Bucket) + if vErr != nil { + glog.Errorf("completeMultipartUpload: failed to get versioning state for bucket %s: %v", *input.Bucket, vErr) + return s3err.ErrInternalError + } + if versioningState == s3_constants.VersioningEnabled { + // Use full object key (not just entryName) to ensure correct .versions directory is checked + normalizedKey := strings.TrimPrefix(*input.Key, "/") + useInvertedFormat := s3a.getVersionIdFormat(*input.Bucket, normalizedKey) + versionId := generateVersionId(useInvertedFormat) + versionFileName := s3a.getVersionFileName(versionId) + versionDir := dirName + "/" + entryName + s3_constants.VersionsFolder + + // Capture timestamp and owner once for consistency between version entry and cache entry + versionMtime := time.Now().Unix() + amzAccountId := r.Header.Get(s3_constants.AmzAccountId) - // Create the version file in the .versions directory - err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) { - if versionEntry.Extended == nil { - versionEntry.Extended = make(map[string][]byte) - } - versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) - versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) - // Store parts count for x-amz-mp-parts-count header - versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) - // Store part boundaries for GetObject with PartNumber - if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil { - versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON + // Create the version file in the .versions directory + if err := s3a.mkFile(versionDir, versionFileName, 
completionState.finalParts, func(versionEntry *filer_pb.Entry) { + if versionEntry.Extended == nil { + versionEntry.Extended = make(map[string][]byte) + } + versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) + versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) + // Store parts count for x-amz-mp-parts-count header + versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) + // Store part boundaries for GetObject with PartNumber + if partBoundariesJSON, err := json.Marshal(completionState.partBoundaries); err == nil { + versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON + } + + // Set object owner for versioned multipart objects + if amzAccountId != "" { + versionEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) + } + + for k, v := range completionState.pentry.Extended { + if k != s3_constants.ExtMultipartObjectKey { + versionEntry.Extended[k] = v + } + } + + // Persist ETag to ensure subsequent HEAD/GET uses the same value + versionEntry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag) + + // Preserve ALL SSE metadata from the first part (if any) + // SSE metadata is stored in individual parts, not the upload directory + if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 { + firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0] + copySSEHeadersFromFirstPart(versionEntry, firstPartEntry, "versioned") + } + if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" { + versionEntry.Attributes.Mime = completionState.pentry.Attributes.Mime + } else if completionState.mime != "" { + versionEntry.Attributes.Mime = completionState.mime + } + versionEntry.Attributes.FileSize = uint64(completionState.offset) + versionEntry.Attributes.Mtime = versionMtime + }); err != nil { + glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err) + return s3err.ErrInternalError } - // Set object owner for versioned multipart objects + // Construct entry with metadata for caching in .versions directory + // Reuse versionMtime to keep list vs. 
HEAD timestamps aligned + // multipartETag is precomputed + versionEntryForCache := &filer_pb.Entry{ + Attributes: &filer_pb.FuseAttributes{ + FileSize: uint64(completionState.offset), + Mtime: versionMtime, + }, + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte(completionState.multipartETag), + }, + } if amzAccountId != "" { - versionEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) + versionEntryForCache.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) } - for k, v := range pentry.Extended { - if k != s3_constants.ExtMultipartObjectKey { - versionEntry.Extended[k] = v + // Update the .versions directory metadata to indicate this is the latest version + // Pass entry to cache its metadata for single-scan list efficiency + if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil { + if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil { + glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after latest pointer update error: %v", versionId, *input.Bucket, *input.Key, rollbackErr) } + glog.Errorf("completeMultipartUpload: failed to update latest version in directory: %v", err) + return s3err.ErrInternalError } - // Persist ETag to ensure subsequent HEAD/GET uses the same value - versionEntry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag) - - // Preserve ALL SSE metadata from the first part (if any) - // SSE metadata is stored in individual parts, not the upload directory - if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 { - firstPartEntry := partEntries[completedPartNumbers[0]][0] - copySSEHeadersFromFirstPart(versionEntry, firstPartEntry, "versioned") - } - if pentry.Attributes.Mime != "" { - versionEntry.Attributes.Mime = pentry.Attributes.Mime - } else if mime != "" { - versionEntry.Attributes.Mime = mime + // For versioned buckets, all content is stored in .versions directory + // The latest version information is tracked in the .versions directory metadata + output = &CompleteMultipartUploadResult{ + Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), + Bucket: input.Bucket, + ETag: aws.String(etagQuote), + Key: objectKey(input.Key), + VersionId: aws.String(versionId), } - versionEntry.Attributes.FileSize = uint64(offset) - versionEntry.Attributes.Mtime = versionMtime - }) - - if err != nil { - glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err) - return nil, s3err.ErrInternalError - } - - // Construct entry with metadata for caching in .versions directory - // Reuse versionMtime to keep list vs. 
HEAD timestamps aligned - // multipartETag is precomputed - versionEntryForCache := &filer_pb.Entry{ - Attributes: &filer_pb.FuseAttributes{ - FileSize: uint64(offset), - Mtime: versionMtime, - }, - Extended: map[string][]byte{ - s3_constants.ExtETagKey: []byte(multipartETag), - }, - } - if amzAccountId != "" { - versionEntryForCache.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) + return s3err.ErrNone } - // Update the .versions directory metadata to indicate this is the latest version - // Pass entry to cache its metadata for single-scan list efficiency - err = s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache) - if err != nil { - glog.Errorf("completeMultipartUpload: failed to update latest version in directory: %v", err) - return nil, s3err.ErrInternalError - } + if versioningState == s3_constants.VersioningSuspended { + // For suspended versioning, add "null" version ID metadata and return "null" version ID + if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) { + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") + entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) + // Store parts count for x-amz-mp-parts-count header + entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) + // Store part boundaries for GetObject with PartNumber + if partBoundariesJSON, jsonErr := json.Marshal(completionState.partBoundaries); jsonErr == nil { + entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON + } - // For versioned buckets, all content is stored in .versions directory - // The latest version information is tracked in the .versions directory metadata - output = &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), - Bucket: input.Bucket, - ETag: aws.String(etagQuote), - Key: objectKey(input.Key), - VersionId: aws.String(versionId), - } - } else if vErr == nil && versioningState == s3_constants.VersioningSuspended { - // For suspended versioning, add "null" version ID metadata and return "null" version ID - err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) { - if entry.Extended == nil { - entry.Extended = make(map[string][]byte) - } - entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") - // Store parts count for x-amz-mp-parts-count header - entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) - // Store part boundaries for GetObject with PartNumber - if partBoundariesJSON, jsonErr := json.Marshal(partBoundaries); jsonErr == nil { - entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON - } + // Set object owner for suspended versioning multipart objects + amzAccountId := r.Header.Get(s3_constants.AmzAccountId) + if amzAccountId != "" { + entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) + } - // Set object owner for suspended versioning multipart objects - amzAccountId := r.Header.Get(s3_constants.AmzAccountId) - if amzAccountId != "" { - entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) - } + for k, v := range completionState.pentry.Extended { + if k != s3_constants.ExtMultipartObjectKey { + entry.Extended[k] = 
v + } + } - for k, v := range pentry.Extended { - if k != s3_constants.ExtMultipartObjectKey { - entry.Extended[k] = v + // Preserve ALL SSE metadata from the first part (if any) + // SSE metadata is stored in individual parts, not the upload directory + if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 { + firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0] + copySSEHeadersFromFirstPart(entry, firstPartEntry, "suspended versioning") } + // Persist ETag to ensure subsequent HEAD/GET uses the same value + entry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag) + if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" { + entry.Attributes.Mime = completionState.pentry.Attributes.Mime + } else if completionState.mime != "" { + entry.Attributes.Mime = completionState.mime + } + entry.Attributes.FileSize = uint64(completionState.offset) + }); err != nil { + glog.Errorf("completeMultipartUpload: failed to create suspended versioning object: %v", err) + return s3err.ErrInternalError } - // Preserve ALL SSE metadata from the first part (if any) - // SSE metadata is stored in individual parts, not the upload directory - if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 { - firstPartEntry := partEntries[completedPartNumbers[0]][0] - copySSEHeadersFromFirstPart(entry, firstPartEntry, "suspended versioning") - } - // Persist ETag to ensure subsequent HEAD/GET uses the same value - entry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag) - if pentry.Attributes.Mime != "" { - entry.Attributes.Mime = pentry.Attributes.Mime - } else if mime != "" { - entry.Attributes.Mime = mime + // Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec + output = &CompleteMultipartUploadResult{ + Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), + Bucket: input.Bucket, + ETag: aws.String(etagQuote), + Key: objectKey(input.Key), + // VersionId field intentionally omitted for suspended versioning } - entry.Attributes.FileSize = uint64(offset) - }) - - if err != nil { - glog.Errorf("completeMultipartUpload: failed to create suspended versioning object: %v", err) - return nil, s3err.ErrInternalError + return s3err.ErrNone } - // Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec - output = &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), - Bucket: input.Bucket, - ETag: aws.String(etagQuote), - Key: objectKey(input.Key), - // VersionId field intentionally omitted for suspended versioning - } - } else { // For non-versioned buckets, create main object file - err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) { + if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } @@ -520,7 +565,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // Store parts count for x-amz-mp-parts-count header entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) // Store part boundaries for GetObject with PartNumber - if partBoundariesJSON, err := json.Marshal(partBoundaries); err == 
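As the comment above notes, AWS S3 omits the `VersionId` field when versioning is suspended, even though the object is stored internally under the reserved "null" version. A small sketch of that response-shaping rule (the state strings mirror the `s3_constants` values):

```go
// versionIdForResponse returns the VersionId to expose to the client, or nil
// when the field must be omitted entirely.
func versionIdForResponse(versioningState, versionId string) *string {
	if versioningState == "Enabled" && versionId != "" {
		return &versionId
	}
	// Suspended versioning stores the object as version "null" but, per the
	// AWS behavior relied on above, never echoes that ID back to the caller.
	return nil
}
```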
nil { + if partBoundariesJSON, err := json.Marshal(completionState.partBoundaries); err == nil { entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON } @@ -530,7 +575,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) } - for k, v := range pentry.Extended { + for k, v := range completionState.pentry.Extended { if k != s3_constants.ExtMultipartObjectKey { entry.Extended[k] = v } @@ -538,27 +583,25 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // Preserve ALL SSE metadata from the first part (if any) // SSE metadata is stored in individual parts, not the upload directory - if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 { - firstPartEntry := partEntries[completedPartNumbers[0]][0] + if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 { + firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0] copySSEHeadersFromFirstPart(entry, firstPartEntry, "non-versioned") } // Persist ETag to ensure subsequent HEAD/GET uses the same value - entry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag) - if pentry.Attributes.Mime != "" { - entry.Attributes.Mime = pentry.Attributes.Mime - } else if mime != "" { - entry.Attributes.Mime = mime + entry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag) + if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" { + entry.Attributes.Mime = completionState.pentry.Attributes.Mime + } else if completionState.mime != "" { + entry.Attributes.Mime = completionState.mime } - entry.Attributes.FileSize = uint64(offset) + entry.Attributes.FileSize = uint64(completionState.offset) // Set TTL-based S3 expiry (modification time) - if entityWithTtl { + if completionState.entityWithTtl { entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") } - }) - - if err != nil { + }); err != nil { glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err) - return nil, s3err.ErrInternalError + return s3err.ErrInternalError } // For non-versioned buckets, return response without VersionId @@ -568,21 +611,30 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl ETag: aws.String(etagQuote), Key: objectKey(input.Key), } + return s3err.ErrNone + }) + if finalizeCode != s3err.ErrNone { + return nil, finalizeCode } - for _, deleteEntry := range deleteEntries { - //delete unused part data - if err = s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil { - glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err) + if completionState != nil { + for _, deleteEntry := range completionState.deleteEntries { + if err := s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil { + glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err) + } + } + if err := s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil { + glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err) } - } - if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil { - glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, 
err) } return } +func (s3a *S3ApiServer) rollbackMultipartVersion(versionDir, versionFileName string) error { + return s3a.rmObject(versionDir, versionFileName, true, false) +} + func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInput) (string, string) { entryName := path.Base(*input.Key) dirName := path.Dir(*input.Key) diff --git a/weed/s3api/filer_multipart_test.go b/weed/s3api/filer_multipart_test.go index 451778117..92ecbeba9 100644 --- a/weed/s3api/filer_multipart_test.go +++ b/weed/s3api/filer_multipart_test.go @@ -2,6 +2,7 @@ package s3api import ( "encoding/hex" + "net/http" "testing" "time" @@ -54,6 +55,42 @@ func TestListPartsResult(t *testing.T) { } +func TestCompleteMultipartResultIncludesVersionId(t *testing.T) { + r := &http.Request{Host: "localhost", Header: make(http.Header)} + input := &s3.CompleteMultipartUploadInput{ + Bucket: aws.String("example-bucket"), + Key: aws.String("example-object"), + } + + entry := &filer_pb.Entry{ + Extended: map[string][]byte{ + s3_constants.ExtVersionIdKey: []byte("version-123"), + }, + } + + result := completeMultipartResult(r, input, "\"etag-value\"", entry) + if assert.NotNil(t, result.VersionId) { + assert.Equal(t, "version-123", *result.VersionId) + } +} + +func TestCompleteMultipartResultOmitsNullVersionId(t *testing.T) { + r := &http.Request{Host: "localhost", Header: make(http.Header)} + input := &s3.CompleteMultipartUploadInput{ + Bucket: aws.String("example-bucket"), + Key: aws.String("example-object"), + } + + entry := &filer_pb.Entry{ + Extended: map[string][]byte{ + s3_constants.ExtVersionIdKey: []byte("null"), + }, + } + + result := completeMultipartResult(r, input, "\"etag-value\"", entry) + assert.Nil(t, result.VersionId) +} + func Test_parsePartNumber(t *testing.T) { tests := []struct { name string @@ -190,3 +227,41 @@ func TestValidateCompletePartETag(t *testing.T) { assert.True(t, invalid) }) } + +func TestCompleteMultipartUploadRejectsOutOfOrderParts(t *testing.T) { + s3a := NewS3ApiServerForTest() + input := &s3.CompleteMultipartUploadInput{ + Bucket: aws.String("bucket"), + Key: aws.String("object"), + UploadId: aws.String("upload"), + } + parts := &CompleteMultipartUpload{ + Parts: []CompletedPart{ + {PartNumber: 2, ETag: "\"etag-2\""}, + {PartNumber: 1, ETag: "\"etag-1\""}, + }, + } + + result, errCode := s3a.completeMultipartUpload(&http.Request{Header: make(http.Header)}, input, parts) + assert.Nil(t, result) + assert.Equal(t, s3err.ErrInvalidPartOrder, errCode) +} + +func TestCompleteMultipartUploadAllowsDuplicatePartNumbers(t *testing.T) { + s3a := NewS3ApiServerForTest() + input := &s3.CompleteMultipartUploadInput{ + Bucket: aws.String("bucket"), + Key: aws.String("object"), + UploadId: aws.String("upload"), + } + parts := &CompleteMultipartUpload{ + Parts: []CompletedPart{ + {PartNumber: 1, ETag: "\"etag-older\""}, + {PartNumber: 1, ETag: "\"etag-newer\""}, + }, + } + + result, errCode := s3a.completeMultipartUpload(&http.Request{Header: make(http.Header)}, input, parts) + assert.Nil(t, result) + assert.Equal(t, s3err.ErrNoSuchUpload, errCode) +} diff --git a/weed/s3api/s3api_conditional_headers_test.go b/weed/s3api/s3api_conditional_headers_test.go index 20c92af0e..9cd220603 100644 --- a/weed/s3api/s3api_conditional_headers_test.go +++ b/weed/s3api/s3api_conditional_headers_test.go @@ -778,6 +778,7 @@ func NewS3ApiServerForTest() *S3ApiServer { option: &S3ApiServerOption{ BucketsPath: "/buckets", }, + bucketConfigCache: NewBucketConfigCache(60 * time.Minute), } } @@ -928,3 +929,56 @@ 
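The two new multipart tests above pin down the ordering contract for CompleteMultipartUpload: descending part numbers fail with `ErrInvalidPartOrder`, while a duplicated part number passes the ordering check (that test then fails with `ErrNoSuchUpload` only because the test server holds no such upload). A standalone sketch of the check, assuming non-strict ascending order over the `CompletedPart` shape used in the tests:

```go
// validatePartOrder enforces ascending part numbers; equal neighbors are
// allowed, which is what lets a duplicated part number pass this check.
func validatePartOrder(parts []CompletedPart) bool {
	for i := 1; i < len(parts); i++ {
		if parts[i].PartNumber < parts[i-1].PartNumber {
			return false // the handler maps this to s3err.ErrInvalidPartOrder
		}
	}
	return true
}
```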
func TestConditionalHeadersMultipartUpload(t *testing.T) { } }) } + +func TestConditionalHeadersTreatDeleteMarkerAsMissing(t *testing.T) { + bucket := "test-bucket" + object := "/deleted-object" + deleteMarkerEntry := &filer_pb.Entry{ + Name: "deleted-object", + Extended: map[string][]byte{ + s3_constants.ExtDeleteMarkerKey: []byte("true"), + }, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + }, + } + + t.Run("WriteIfNoneMatchAsteriskSucceeds", func(t *testing.T) { + getter := createMockEntryGetter(deleteMarkerEntry) + req := createTestPutRequest(bucket, object, "new content") + req.Header.Set(s3_constants.IfNoneMatch, "*") + + s3a := NewS3ApiServerForTest() + errCode := s3a.checkConditionalHeadersWithGetter(getter, req, bucket, object) + if errCode != s3err.ErrNone { + t.Fatalf("expected ErrNone for delete marker with If-None-Match=*, got %v", errCode) + } + }) + + t.Run("WriteIfMatchAsteriskFails", func(t *testing.T) { + getter := createMockEntryGetter(deleteMarkerEntry) + req := createTestPutRequest(bucket, object, "new content") + req.Header.Set(s3_constants.IfMatch, "*") + + s3a := NewS3ApiServerForTest() + errCode := s3a.checkConditionalHeadersWithGetter(getter, req, bucket, object) + if errCode != s3err.ErrPreconditionFailed { + t.Fatalf("expected ErrPreconditionFailed for delete marker with If-Match=*, got %v", errCode) + } + }) + + t.Run("ReadIfMatchAsteriskFails", func(t *testing.T) { + getter := createMockEntryGetter(deleteMarkerEntry) + req := &http.Request{Method: http.MethodGet, Header: make(http.Header)} + req.Header.Set(s3_constants.IfMatch, "*") + + s3a := NewS3ApiServerForTest() + result := s3a.checkConditionalHeadersForReadsWithGetter(getter, req, bucket, object) + if result.ErrorCode != s3err.ErrPreconditionFailed { + t.Fatalf("expected ErrPreconditionFailed for read against delete marker with If-Match=*, got %v", result.ErrorCode) + } + if result.Entry != nil { + t.Fatalf("expected no entry to be returned for delete marker, got %#v", result.Entry) + } + }) +} diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index f4e446ec1..45c7b7a50 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -5,6 +5,7 @@ import ( "context" "crypto/rand" "encoding/base64" + "errors" "fmt" "io" "net/http" @@ -24,6 +25,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "google.golang.org/protobuf/proto" ) const ( @@ -81,33 +83,6 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request replaceMeta, replaceTagging := replaceDirective(r.Header) - if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) { - fullPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), dstObject)) - dir, name := fullPath.DirAndName() - entry, err := s3a.getEntry(dir, name) - if err != nil || entry.IsDirectory { - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) - return - } - entry.Extended, err = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging) - entry.Attributes.Mtime = t.Unix() - if err != nil { - glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, err) - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag) - return - } - err = s3a.touch(dir, name, entry) - if err != nil { - s3err.WriteErrorResponse(w, r, 
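The subtests above all encode one rule: a delete marker makes the key read as absent for conditional evaluation, so `If-None-Match: *` (write only if missing) succeeds and `If-Match: *` (write only if present) fails. A condensed standalone restatement of the wildcard cases:

```go
import "errors"

var errPreconditionFailed = errors.New("PreconditionFailed")

// checkWildcardConditionals treats a current delete marker as "key absent".
func checkWildcardConditionals(isDeleteMarker bool, ifMatch, ifNoneMatch string) error {
	visible := !isDeleteMarker
	if ifMatch == "*" && !visible {
		return errPreconditionFailed // If-Match=* requires an existing object
	}
	if ifNoneMatch == "*" && visible {
		return errPreconditionFailed // If-None-Match=* requires absence
	}
	return nil
}
```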
s3err.ErrInvalidCopySource) - return - } - writeSuccessResponseXML(w, r, CopyObjectResult{ - ETag: filer.ETag(entry), - LastModified: t, - }) - return - } - // If source object is empty or bucket is empty, reply back invalid copy source. if srcObject == "" || srcBucket == "" { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) @@ -122,38 +97,14 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request return } - // Get the source entry with version awareness based on versioning state - var entry *filer_pb.Entry - if srcVersionId != "" { - // Specific version requested - always use version-aware retrieval - entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId) - } else if srcVersioningState == s3_constants.VersioningEnabled { - // Versioning enabled - get latest version from .versions directory - entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) - } else if srcVersioningState == s3_constants.VersioningSuspended { - // Versioning suspended - current object is stored as regular file ("null" version) - // Try regular file first, fall back to latest version if needed - srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject)) - dir, name := srcPath.DirAndName() - entry, err = s3a.getEntry(dir, name) - if err != nil { - // If regular file doesn't exist, try latest version as fallback - glog.V(2).Infof("CopyObject: regular file not found for suspended versioning, trying latest version") - entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) - } - } else { - // No versioning configured - use regular retrieval - srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject)) - dir, name := srcPath.DirAndName() - entry, err = s3a.getEntry(dir, name) - } - + entry, err := s3a.resolveCopySourceEntry(srcBucket, srcObject, srcVersionId, srcVersioningState) if err != nil || entry.IsDirectory { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } - if srcBucket == dstBucket && srcObject == dstObject { + sameDestination := srcBucket == dstBucket && srcObject == dstObject + if sameDestination && !(replaceMeta || replaceTagging) { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopyDest) return } @@ -163,6 +114,10 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request s3err.WriteErrorResponse(w, r, err) return } + if errCode := s3a.checkConditionalHeaders(r, dstBucket, dstObject); errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } // Validate encryption parameters if err := ValidateCopyEncryption(entry.Extended, r.Header); err != nil { @@ -172,6 +127,62 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request return } + dstVersioningState, err := s3a.getVersioningState(dstBucket) + if err != nil { + glog.Errorf("Error checking versioning state for destination bucket %s: %v", dstBucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + if sameDestination && (replaceMeta || replaceTagging) && s3a.canUseMetadataOnlySelfCopy(entry, r, dstBucket, dstObject) { + var dstVersionId string + var etag string + updateCode := s3a.withObjectWriteLock(dstBucket, dstObject, func() s3err.ErrorCode { + return s3a.checkConditionalHeaders(r, dstBucket, dstObject) + }, func() s3err.ErrorCode { + currentEntry, currentErr := s3a.resolveCopySourceEntry(srcBucket, srcObject, srcVersionId, srcVersioningState) + if currentErr != nil || currentEntry.IsDirectory { + return 
s3err.ErrInvalidCopySource + } + if errCode := s3a.validateConditionalCopyHeaders(r, currentEntry); errCode != s3err.ErrNone { + return errCode + } + + updatedEntry := cloneProtoEntry(currentEntry) + updatedMetadata, metadataErr := processMetadataBytes(r.Header, updatedEntry.Extended, replaceMeta, replaceTagging) + currentErr = metadataErr + if currentErr != nil { + glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, currentErr) + return s3err.ErrInvalidTag + } + updatedEntry.Extended = mergeCopyMetadata(updatedEntry.Extended, updatedMetadata) + if updatedEntry.Attributes == nil { + updatedEntry.Attributes = &filer_pb.FuseAttributes{} + } + updatedEntry.Attributes.Mtime = t.Unix() + + dstVersionId, etag, currentErr = s3a.finalizeCopyDestination(dstBucket, dstObject, dstVersioningState, updatedEntry) + if currentErr != nil { + return filerErrorToS3Error(currentErr) + } + return s3err.ErrNone + }) + if updateCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, updateCode) + return + } + + if dstVersionId != "" { + w.Header().Set("x-amz-version-id", dstVersionId) + } + setEtag(w, etag) + writeSuccessResponseXML(w, r, CopyObjectResult{ + ETag: etag, + LastModified: t, + }) + return + } + // Determine whether we can reuse the source MD5 (direct copy without encryption changes). canReuseSourceMd5 := false var sourceMd5 []byte @@ -302,118 +313,233 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } } - // Check if destination bucket has versioning enabled - // Only create versions if versioning is explicitly "Enabled", not "Suspended" or unconfigured - dstVersioningState, err := s3a.getVersioningState(dstBucket) - if err != nil { - glog.Errorf("Error checking versioning state for destination bucket %s: %v", dstBucket, err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + var dstVersionId string + var etag string + + finalizeCode := s3a.withObjectWriteLock(dstBucket, dstObject, func() s3err.ErrorCode { + return s3a.checkConditionalHeaders(r, dstBucket, dstObject) + }, func() s3err.ErrorCode { + var finalizeErr error + dstVersionId, etag, finalizeErr = s3a.finalizeCopyDestination(dstBucket, dstObject, dstVersioningState, dstEntry) + if finalizeErr != nil { + return filerErrorToS3Error(finalizeErr) + } + return s3err.ErrNone + }) + if finalizeCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, finalizeCode) return } - var dstVersionId string - var etag string + if dstVersionId != "" { + w.Header().Set("x-amz-version-id", dstVersionId) + } - if shouldCreateVersionForCopy(dstVersioningState) { - // For versioned destination, create a new version using appropriate format - dstVersionId = s3a.generateVersionIdForObject(dstBucket, dstObject) - glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject) + setEtag(w, etag) - // Add version metadata to the entry - if dstEntry.Extended == nil { - dstEntry.Extended = make(map[string][]byte) - } - dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(dstVersionId) + response := CopyObjectResult{ + ETag: etag, + LastModified: t, + } + + writeSuccessResponseXML(w, r, response) + +} - // Calculate ETag for versioning - filerEntry := &filer.Entry{ - FullPath: util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), dstObject)), - Attr: filer.Attr{ - FileSize: dstEntry.Attributes.FileSize, - Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), - Crtime: time.Unix(dstEntry.Attributes.Crtime, 0), - Mime: dstEntry.Attributes.Mime, - }, - Chunks: 
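Both this metadata-only self-copy and the full copy path below run the conditional-header check twice: once as a cheap pre-flight before taking the per-object write lock, and again as the lock-held precondition. That closes the classic check-then-act race; a generic sketch of the pattern:

```go
import "sync"

// guardedMutation re-runs the precondition under the lock so a concurrent
// writer cannot invalidate it between the first check and the mutation.
func guardedMutation(mu *sync.Mutex, precondition, mutate func() error) error {
	if err := precondition(); err != nil {
		return err // fast path: reject without contending for the lock
	}
	mu.Lock()
	defer mu.Unlock()
	if err := precondition(); err != nil {
		return err // state changed while we were unlocked
	}
	return mutate()
}
```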
dstEntry.Chunks, +func cloneProtoEntry(entry *filer_pb.Entry) *filer_pb.Entry { + if entry == nil { + return nil + } + return proto.Clone(entry).(*filer_pb.Entry) +} + +func copyEntryETag(fullPath util.FullPath, entry *filer_pb.Entry) string { + if entry != nil && entry.Extended != nil { + if etag, ok := entry.Extended[s3_constants.ExtETagKey]; ok && len(etag) > 0 { + return string(etag) } - etag = filer.ETagEntry(filerEntry) - if !strings.HasPrefix(etag, "\"") { - etag = "\"" + etag + "\"" + } + attr := filer.Attr{} + if entry.Attributes != nil { + attr = filer.Attr{ + FileSize: entry.Attributes.FileSize, + Mtime: time.Unix(entry.Attributes.Mtime, 0), + Crtime: time.Unix(entry.Attributes.Crtime, 0), + Mime: entry.Attributes.Mime, + Md5: entry.Attributes.Md5, } + } + return filer.ETagEntry(&filer.Entry{ + FullPath: fullPath, + Attr: attr, + Chunks: entry.Chunks, + Content: entry.Content, + Remote: entry.RemoteEntry, + }) +} + +func copyEntryToTarget(dst, src *filer_pb.Entry) { + dst.IsDirectory = src.IsDirectory + dst.Attributes = src.Attributes + dst.Extended = src.Extended + dst.Chunks = src.Chunks + dst.Content = src.Content + dst.RemoteEntry = src.RemoteEntry + dst.HardLinkId = src.HardLinkId + dst.HardLinkCounter = src.HardLinkCounter + dst.Quota = src.Quota + dst.WormEnforcedAtTsNs = src.WormEnforcedAtTsNs +} + +func (s3a *S3ApiServer) finalizeCopyDestination(dstBucket, dstObject, dstVersioningState string, dstEntry *filer_pb.Entry) (versionId string, etag string, err error) { + normalizedObject := s3_constants.NormalizeObjectKey(dstObject) + dstPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), normalizedObject)) + dstDir, dstName := dstPath.DirAndName() + + if dstEntry.Attributes == nil { + dstEntry.Attributes = &filer_pb.FuseAttributes{} + } + if dstEntry.Extended == nil { + dstEntry.Extended = make(map[string][]byte) + } + + etag = copyEntryETag(dstPath, dstEntry) + + switch dstVersioningState { + case s3_constants.VersioningEnabled: + versionId = s3a.generateVersionIdForObject(dstBucket, normalizedObject) + glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", versionId, dstBucket, normalizedObject) + + dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) - // Create version file - versionFileName := s3a.getVersionFileName(dstVersionId) - versionObjectPath := dstObject + ".versions/" + versionFileName + versionFileName := s3a.getVersionFileName(versionId) + versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName bucketDir := s3a.bucketDir(dstBucket) - if err := s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) { - entry.Attributes = dstEntry.Attributes - entry.Extended = dstEntry.Extended + if err = s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) { + copyEntryToTarget(entry, dstEntry) }); err != nil { - s3err.WriteErrorResponse(w, r, filerErrorToS3Error(err)) - return + return "", "", err } - // Update the .versions directory metadata - // Pass dstEntry to cache its metadata for single-scan list efficiency - err = s3a.updateLatestVersionInDirectory(dstBucket, dstObject, dstVersionId, versionFileName, dstEntry) - if err != nil { + if err = s3a.updateLatestVersionInDirectory(dstBucket, normalizedObject, versionId, versionFileName, dstEntry); err != nil { + if rollbackErr := s3a.rollbackCopyVersion(bucketDir, versionObjectPath); rollbackErr != nil { + 
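`copyEntryETag` above prefers the ETag persisted in extended attributes over recomputing one. That matters most for multipart objects: the conventional S3 multipart ETag is derived from the part MD5s plus a part count, so it cannot be rebuilt from the merged bytes alone. A sketch of that conventional computation:

```go
import (
	"bytes"
	"crypto/md5"
	"fmt"
)

// multipartETag reproduces the common S3 convention: MD5 over the
// concatenated binary part digests, suffixed with the part count.
func multipartETag(partDigests [][]byte) string {
	sum := md5.Sum(bytes.Join(partDigests, nil))
	return fmt.Sprintf("\"%x-%d\"", sum, len(partDigests))
}
```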
glog.Errorf("CopyObjectHandler: failed to rollback version %s for %s/%s after latest pointer update error: %v", versionId, dstBucket, normalizedObject, rollbackErr) + } glog.Errorf("CopyObjectHandler: failed to update latest version in directory: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return + return "", "", fmt.Errorf("update latest version metadata: %w", err) } - // Set version ID in response header - w.Header().Set("x-amz-version-id", dstVersionId) - } else { - // For non-versioned destination, use regular copy - // Remove any versioning-related metadata from source that shouldn't carry over + return versionId, etag, nil + + case s3_constants.VersioningSuspended: cleanupVersioningMetadata(dstEntry.Extended) + dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") + dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) - dstPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), dstObject)) - dstDir, dstName := dstPath.DirAndName() + if err = s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) { + copyEntryToTarget(entry, dstEntry) + }); err != nil { + return "", "", err + } - // Check if destination exists and remove it first (S3 copy overwrites) - if exists, _ := s3a.exists(dstDir, dstName, false); exists { - if err := s3a.rmObject(dstDir, dstName, false, false); err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } + if err = s3a.updateIsLatestFlagsForSuspendedVersioning(dstBucket, normalizedObject); err != nil { + glog.Warningf("CopyObjectHandler: failed to update suspended version latest flags for %s/%s: %v", dstBucket, normalizedObject, err) } - // Create the new file - if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) { - entry.Attributes = dstEntry.Attributes - entry.Extended = dstEntry.Extended + return "", etag, nil + + default: + cleanupVersioningMetadata(dstEntry.Extended) + dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) + + if err = s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) { + copyEntryToTarget(entry, dstEntry) }); err != nil { - s3err.WriteErrorResponse(w, r, filerErrorToS3Error(err)) - return + return "", "", err } - // Calculate ETag - filerEntry := &filer.Entry{ - FullPath: dstPath, - Attr: filer.Attr{ - FileSize: dstEntry.Attributes.FileSize, - Mtime: time.Unix(dstEntry.Attributes.Mtime, 0), - Crtime: time.Unix(dstEntry.Attributes.Crtime, 0), - Mime: dstEntry.Attributes.Mime, - }, - Chunks: dstEntry.Chunks, - } - etag = filer.ETagEntry(filerEntry) + return "", etag, nil } +} - setEtag(w, etag) +func (s3a *S3ApiServer) rollbackCopyVersion(bucketDir, versionObjectPath string) error { + versionPath := util.FullPath(fmt.Sprintf("%s/%s", bucketDir, versionObjectPath)) + versionDir, versionName := versionPath.DirAndName() + return s3a.rmObject(versionDir, versionName, true, false) +} - response := CopyObjectResult{ - ETag: etag, - LastModified: t, +func (s3a *S3ApiServer) resolveCopySourceEntry(bucket, object, versionId, versioningState string) (*filer_pb.Entry, error) { + normalizedObject := s3_constants.NormalizeObjectKey(object) + + if versionId != "" { + return s3a.getSpecificObjectVersion(bucket, normalizedObject, versionId) } - writeSuccessResponseXML(w, r, response) + switch versioningState { + case s3_constants.VersioningEnabled: + return s3a.getLatestObjectVersion(bucket, normalizedObject) + case s3_constants.VersioningSuspended: + return s3a.resolveSuspendedCopySourceEntry(bucket, 
normalizedObject, "CopyObject") + default: + srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), normalizedObject)) + dir, name := srcPath.DirAndName() + return s3a.getEntry(dir, name) + } +} +func mergeCopyMetadata(existing, updated map[string][]byte) map[string][]byte { + merged := make(map[string][]byte, len(existing)+len(updated)) + for k, v := range existing { + merged[k] = v + } + for k := range merged { + if isManagedCopyMetadataKey(k) { + delete(merged, k) + } + } + for k, v := range updated { + merged[k] = v + } + return merged +} + +func isManagedCopyMetadataKey(key string) bool { + switch key { + case s3_constants.AmzStorageClass, + s3_constants.AmzServerSideEncryption, + s3_constants.AmzServerSideEncryptionAwsKmsKeyId, + s3_constants.AmzServerSideEncryptionContext, + s3_constants.AmzServerSideEncryptionBucketKeyEnabled, + s3_constants.AmzServerSideEncryptionCustomerAlgorithm, + s3_constants.AmzServerSideEncryptionCustomerKeyMD5, + s3_constants.AmzTagCount: + return true + } + return strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) || strings.HasPrefix(key, s3_constants.AmzObjectTagging) +} + +func (s3a *S3ApiServer) resolveSuspendedCopySourceEntry(bucket, normalizedObject, operation string) (*filer_pb.Entry, error) { + srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), normalizedObject)) + dir, name := srcPath.DirAndName() + entry, err := s3a.getEntry(dir, name) + if err == nil { + return entry, nil + } + if !errors.Is(err, filer_pb.ErrNotFound) { + return nil, err + } + glog.V(2).Infof("%s: regular file not found for suspended versioning, trying latest version", operation) + return s3a.getLatestObjectVersion(bucket, normalizedObject) +} + +func (s3a *S3ApiServer) canUseMetadataOnlySelfCopy(entry *filer_pb.Entry, r *http.Request, bucket, object string) bool { + srcPath := fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), s3_constants.NormalizeObjectKey(object)) + state := DetectEncryptionStateWithEntry(entry, r, srcPath, srcPath) + s3a.applyCopyBucketDefaultEncryption(state, bucket) + strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r) + return err == nil && strategy == CopyStrategyDirect } func pathToBucketAndObject(path string) (bucket, object string) { @@ -553,14 +679,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req } else if srcVersioningState == s3_constants.VersioningSuspended { // Versioning suspended - current object is stored as regular file ("null" version) // Try regular file first, fall back to latest version if needed - srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject)) - dir, name := srcPath.DirAndName() - entry, err = s3a.getEntry(dir, name) - if err != nil { - // If regular file doesn't exist, try latest version as fallback - glog.V(2).Infof("CopyObjectPart: regular file not found for suspended versioning, trying latest version") - entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) - } + entry, err = s3a.resolveSuspendedCopySourceEntry(srcBucket, s3_constants.NormalizeObjectKey(srcObject), "CopyObjectPart") } else { // No versioning configured - use regular retrieval srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject)) diff --git a/weed/s3api/s3api_object_handlers_copy_test.go b/weed/s3api/s3api_object_handlers_copy_test.go index 369d2aac9..93d1475cd 100644 --- a/weed/s3api/s3api_object_handlers_copy_test.go +++ b/weed/s3api/s3api_object_handlers_copy_test.go @@ -8,7 +8,9 @@ import ( "strings" 
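`resolveSuspendedCopySourceEntry` distinguishes a definitive not-found on the plain ("null"-version) file from any other lookup failure: only `filer_pb.ErrNotFound` triggers the fallback to the newest `.versions` entry, so transient filer errors are not misread as a missing copy source. The shape of that rule, using `fs.ErrNotExist` as a stand-in sentinel:

```go
import (
	"errors"
	"io/fs"
)

// resolveWithFallback tries the primary lookup and falls back only on a
// definitive not-found; every other error surfaces to the caller unchanged.
func resolveWithFallback[T any](primary, fallback func() (T, error)) (T, error) {
	v, err := primary()
	if err == nil {
		return v, nil
	}
	if !errors.Is(err, fs.ErrNotExist) { // stand-in for filer_pb.ErrNotFound
		var zero T
		return zero, err
	}
	return fallback()
}
```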
"testing" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util" ) type H map[string]string @@ -396,6 +398,58 @@ func TestProcessMetadataBytes(t *testing.T) { } } +func TestMergeCopyMetadataPreservesInternalFields(t *testing.T) { + existing := map[string][]byte{ + s3_constants.SeaweedFSSSEKMSKey: []byte("kms-secret"), + s3_constants.SeaweedFSSSEIV: []byte("iv"), + "X-Amz-Meta-Old": []byte("old"), + "X-Amz-Tagging-Old": []byte("old-tag"), + s3_constants.AmzStorageClass: []byte("STANDARD"), + } + updated := map[string][]byte{ + "X-Amz-Meta-New": []byte("new"), + "X-Amz-Tagging-New": []byte("new-tag"), + s3_constants.AmzStorageClass: []byte("GLACIER"), + } + + merged := mergeCopyMetadata(existing, updated) + + if got := string(merged[s3_constants.SeaweedFSSSEKMSKey]); got != "kms-secret" { + t.Fatalf("expected internal KMS key to be preserved, got %q", got) + } + if got := string(merged[s3_constants.SeaweedFSSSEIV]); got != "iv" { + t.Fatalf("expected internal IV to be preserved, got %q", got) + } + if _, ok := merged["X-Amz-Meta-Old"]; ok { + t.Fatalf("expected stale user metadata to be removed, got %#v", merged) + } + if _, ok := merged["X-Amz-Tagging-Old"]; ok { + t.Fatalf("expected stale tagging metadata to be removed, got %#v", merged) + } + if got := string(merged["X-Amz-Meta-New"]); got != "new" { + t.Fatalf("expected replacement user metadata to be applied, got %q", got) + } + if got := string(merged["X-Amz-Tagging-New"]); got != "new-tag" { + t.Fatalf("expected replacement tagging metadata to be applied, got %q", got) + } + if got := string(merged[s3_constants.AmzStorageClass]); got != "GLACIER" { + t.Fatalf("expected storage class to be updated, got %q", got) + } +} + +func TestCopyEntryETagPrefersStoredETag(t *testing.T) { + entry := &filer_pb.Entry{ + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte("\"stored-etag\""), + }, + Attributes: &filer_pb.FuseAttributes{}, + } + + if got := copyEntryETag(util.FullPath("/buckets/test-bucket/object.txt"), entry); got != "\"stored-etag\"" { + t.Fatalf("copyEntryETag() = %q, want %q", got, "\"stored-etag\"") + } +} + func fmtTagging(maps ...map[string]string) { for _, m := range maps { if tagging := m[s3_constants.AmzObjectTagging]; len(tagging) > 0 { @@ -556,9 +610,8 @@ func TestCleanupVersioningMetadata(t *testing.T) { } } -// TestCopyVersioningIntegration validates the interaction between -// shouldCreateVersionForCopy and cleanupVersioningMetadata functions. -// This integration test ensures the complete fix for issue #7505. +// TestCopyVersioningIntegration validates the metadata shaping that happens +// before copy finalization for each destination versioning mode. 
func TestCopyVersioningIntegration(t *testing.T) { testCases := []struct { name string @@ -581,7 +634,7 @@ func TestCopyVersioningIntegration(t *testing.T) { }, }, { - name: "SuspendedCleansMetadata", + name: "SuspendedCleansVersionMetadataBeforeFinalize", versioningState: s3_constants.VersioningSuspended, sourceMetadata: map[string][]byte{ s3_constants.ExtVersionIdKey: []byte("v123"), diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index e50de1e7f..e99700211 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -2,6 +2,7 @@ package s3api import ( "encoding/xml" + "errors" "io" "net/http" "strings" @@ -18,6 +19,159 @@ const ( deleteMultipleObjectsLimit = 1000 ) +type deleteMutationResult struct { + versionId string + deleteMarker bool +} + +func deleteErrorFromCode(code s3err.ErrorCode, key, versionId string) DeleteError { + apiErr := s3err.GetAPIError(code) + return DeleteError{ + Code: apiErr.Code, + Message: apiErr.Description, + Key: key, + VersionId: versionId, + } +} + +// isMissingDeleteConditionTarget normalizes missing-target detection for conditional deletes. +// Prefer errors.Is(err, filer_pb.ErrNotFound) and errors.Is(err, ErrDeleteMarker); keep the +// string-based fallback only as a defensive bridge for filer paths that still return plain text. +func isMissingDeleteConditionTarget(err error) bool { + if err == nil { + return false + } + if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrDeleteMarker) { + return true + } + + lowerErr := strings.ToLower(err.Error()) + return strings.Contains(lowerErr, "not found") +} + +func (s3a *S3ApiServer) resolveDeleteConditionalEntry(bucket, object, versionId, versioningState string) (*filer_pb.Entry, error) { + normalizedObject := s3_constants.NormalizeObjectKey(object) + bucketDir := s3a.bucketDir(bucket) + + if versionId != "" { + if versionId == "null" { + return s3a.getEntry(bucketDir, normalizedObject) + } + return s3a.getEntry(s3a.getVersionedObjectDir(bucket, normalizedObject), s3a.getVersionFileName(versionId)) + } + + switch versioningState { + case s3_constants.VersioningEnabled: + entry, err := s3a.getLatestObjectVersion(bucket, normalizedObject) + if err != nil { + return nil, err + } + return normalizeConditionalTargetEntry(entry), nil + default: + entry, err := s3a.resolveObjectEntry(bucket, normalizedObject) + if err != nil { + return nil, err + } + return normalizeConditionalTargetEntry(entry), nil + } +} + +func (s3a *S3ApiServer) validateDeleteIfMatch(entry *filer_pb.Entry, ifMatch string, missingCode s3err.ErrorCode) s3err.ErrorCode { + if ifMatch == "" { + return s3err.ErrNone + } + if entry == nil { + return missingCode + } + if ifMatch == "*" { + return s3err.ErrNone + } + if !s3a.etagMatches(ifMatch, s3a.getObjectETag(entry)) { + return s3err.ErrPreconditionFailed + } + return s3err.ErrNone +} + +func (s3a *S3ApiServer) checkDeleteIfMatch(bucket, object, versionId, versioningState, ifMatch string, missingCode s3err.ErrorCode) s3err.ErrorCode { + if ifMatch == "" { + return s3err.ErrNone + } + + entry, err := s3a.resolveDeleteConditionalEntry(bucket, object, versionId, versioningState) + if err != nil { + if isMissingDeleteConditionTarget(err) { + return missingCode + } + glog.Errorf("checkDeleteIfMatch: failed to resolve %s/%s (versionId=%s): %v", bucket, object, versionId, err) + return s3err.ErrInternalError + } + + return s3a.validateDeleteIfMatch(entry, ifMatch, missingCode) +} + +func (s3a 
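The conditional-delete helpers above reduce If-Match evaluation to three cases: wildcard existence, ETag equality, and a caller-chosen missing-target error. A standalone sketch of the comparison that `validateDeleteIfMatch` delegates to `etagMatches`; the quote normalization here is an assumption (S3 ETags usually travel quoted, and servers commonly strip the quotes before comparing):

```go
import "strings"

// ifMatchHolds: "*" asserts bare existence; otherwise the header value must
// equal the stored ETag, compared with surrounding quotes normalized away.
func ifMatchHolds(headerETag, storedETag string) bool {
	if headerETag == "*" {
		return true
	}
	trim := func(s string) string { return strings.Trim(s, `"`) }
	return trim(headerETag) == trim(storedETag)
}
```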
*S3ApiServer) deleteVersionedObject(r *http.Request, bucket, object, versionId, versioningState string) (deleteMutationResult, s3err.ErrorCode) { + var result deleteMutationResult + + switch { + case versionId != "": + versionEntry, versionLookupErr := s3a.getSpecificObjectVersion(bucket, object, versionId) + if versionLookupErr == nil && versionEntry != nil && versionEntry.Extended != nil { + if deleteMarker, ok := versionEntry.Extended[s3_constants.ExtDeleteMarkerKey]; ok && string(deleteMarker) == "true" { + result.deleteMarker = true + } + } + governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) + if err := s3a.enforceObjectLockProtections(r, bucket, object, versionId, governanceBypassAllowed); err != nil { + glog.V(2).Infof("deleteVersionedObject: object lock check failed for %s/%s version %s: %v", bucket, object, versionId, err) + return result, s3err.ErrAccessDenied + } + if err := s3a.deleteSpecificObjectVersion(bucket, object, versionId); err != nil { + glog.Errorf("deleteVersionedObject: failed to delete specific version %s for %s/%s: %v", versionId, bucket, object, err) + return result, s3err.ErrInternalError + } + result.versionId = versionId + return result, s3err.ErrNone + + case versioningState == s3_constants.VersioningEnabled: + deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object) + if err != nil { + glog.Errorf("deleteVersionedObject: failed to create delete marker for %s/%s: %v", bucket, object, err) + return result, s3err.ErrInternalError + } + result.versionId = deleteMarkerVersionId + result.deleteMarker = true + return result, s3err.ErrNone + + case versioningState == s3_constants.VersioningSuspended: + governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) + if err := s3a.enforceObjectLockProtections(r, bucket, object, "null", governanceBypassAllowed); err != nil { + glog.V(2).Infof("deleteVersionedObject: object lock check failed for %s/%s null version: %v", bucket, object, err) + return result, s3err.ErrAccessDenied + } + if err := s3a.deleteSpecificObjectVersion(bucket, object, "null"); err != nil { + glog.Errorf("deleteVersionedObject: failed to delete null version for %s/%s: %v", bucket, object, err) + return result, s3err.ErrInternalError + } + deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object) + if err != nil { + glog.Errorf("deleteVersionedObject: failed to create delete marker for suspended versioning %s/%s: %v", bucket, object, err) + return result, s3err.ErrInternalError + } + result.versionId = deleteMarkerVersionId + result.deleteMarker = true + return result, s3err.ErrNone + } + + glog.Errorf("deleteVersionedObject: unsupported versioning state %q for %s/%s", versioningState, bucket, object) + return result, s3err.ErrInternalError +} + +func (s3a *S3ApiServer) deleteUnversionedObjectWithClient(client filer_pb.SeaweedFilerClient, bucket, object string) error { + target := util.NewFullPath(s3a.bucketDir(bucket), object) + dir, name := target.DirAndName() + return deleteObjectEntry(client, dir, name, true, false) +} + func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { bucket, object := s3_constants.GetBucketAndObject(r) @@ -42,8 +196,6 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque return } - versioningEnabled := (versioningState == s3_constants.VersioningEnabled) - versioningSuspended := (versioningState == s3_constants.VersioningSuspended) versioningConfigured := (versioningState != "") var 
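`deleteVersionedObject` above folds three delete shapes into one helper: an explicit version ID deletes exactly that version, enabled versioning only writes a delete marker (a logical delete that leaves retained versions untouched), and suspended versioning removes the "null" version yet still leaves a marker behind. A standalone classification sketch of that dispatch:

```go
type deleteAction int

const (
	deleteExactVersion  deleteAction = iota // version ID supplied by the caller
	writeDeleteMarker                       // enabled versioning: logical delete only
	replaceNullThenMark                     // suspended: drop "null" version, then mark
)

func classifyVersionedDelete(versionId, versioningState string) deleteAction {
	switch {
	case versionId != "":
		return deleteExactVersion
	case versioningState == "Enabled":
		return writeDeleteMarker
	default: // "Suspended" here; the real helper errors on any other state
		return replaceNullThenMark
	}
}
```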
auditLog *s3err.AccessLog @@ -51,93 +203,49 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) } - if versioningConfigured { - // Handle versioned delete based on specific versioning state - if versionId != "" { - // Delete specific version (same for both enabled and suspended) - // Check object lock permissions before deleting specific version - governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) - if err := s3a.enforceObjectLockProtections(r, bucket, object, versionId, governanceBypassAllowed); err != nil { - glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) - return - } - - // Delete specific version - err := s3a.deleteSpecificObjectVersion(bucket, object, versionId) - if err != nil { - glog.Errorf("Failed to delete specific version %s: %v", versionId, err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - - // Set version ID in response header - w.Header().Set("x-amz-version-id", versionId) - } else { - // Delete without version ID - behavior depends on versioning state - if versioningEnabled { - // Enabled versioning: Create delete marker (logical delete) - // AWS S3 behavior: Delete marker creation is NOT blocked by object retention - // because it's a logical delete that doesn't actually remove the retained version - deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object) - if err != nil { - glog.Errorf("Failed to create delete marker: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } - - // Set delete marker version ID in response header - w.Header().Set("x-amz-version-id", deleteMarkerVersionId) - w.Header().Set("x-amz-delete-marker", "true") - } else if versioningSuspended { - // Suspended versioning: Actually delete the "null" version object - glog.V(2).Infof("DeleteObjectHandler: deleting null version for suspended versioning %s/%s", bucket, object) - - // Check object lock permissions before deleting "null" version - governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) - if err := s3a.enforceObjectLockProtections(r, bucket, object, "null", governanceBypassAllowed); err != nil { - glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) - return - } - - // Delete the "null" version (the regular file) - err := s3a.deleteSpecificObjectVersion(bucket, object, "null") - if err != nil { - glog.Errorf("Failed to delete null version: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } + if ifMatchResult := s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed); ifMatchResult != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, ifMatchResult) + return + } - // Note: According to AWS S3 spec, suspended versioning should NOT return version ID headers - // The object is deleted but no version information is returned + var deleteResult deleteMutationResult + deleteCode := s3a.withObjectWriteLock(bucket, object, func() s3err.ErrorCode { + return s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed) + }, func() s3err.ErrorCode { + if versioningConfigured { + result, errCode := 
s3a.deleteVersionedObject(r, bucket, object, versionId, versioningState) + if errCode != s3err.ErrNone { + return errCode } + deleteResult = result + return s3err.ErrNone } - } else { - // Handle regular delete (non-versioned) - // Check object lock permissions before deleting object + governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object) if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil { glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) - return + return s3err.ErrAccessDenied } - // Normalize trailing-slash object keys (e.g. "path/") to the - // underlying directory entry path so DeleteEntry gets a valid name. - target := util.NewFullPath(s3a.bucketDir(bucket), object) - dir, name := target.DirAndName() + if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return s3a.deleteUnversionedObjectWithClient(client, bucket, object) + }); err != nil { + glog.Errorf("DeleteObjectHandler: failed to delete %s/%s: %v", bucket, object, err) + return s3err.ErrInternalError + } - err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return deleteObjectEntry(client, dir, name, true, false) - // Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner - // which listens to metadata events and uses consistent hashing for coordination - }) + return s3err.ErrNone + }) + if deleteCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, deleteCode) + return + } - if err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } + if deleteResult.versionId != "" { + w.Header().Set("x-amz-version-id", deleteResult.versionId) + } + if deleteResult.deleteMarker { + w.Header().Set("x-amz-delete-marker", "true") } if auditLog != nil { @@ -154,6 +262,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque type ObjectIdentifier struct { Key string `xml:"Key"` VersionId string `xml:"VersionId,omitempty"` + ETag string `xml:"ETag,omitempty"` DeleteMarker bool `xml:"DeleteMarker,omitempty"` DeleteMarkerVersionId string `xml:"DeleteMarkerVersionId,omitempty"` } @@ -228,133 +337,63 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h return } - versioningEnabled := (versioningState == s3_constants.VersioningEnabled) - versioningSuspended := (versioningState == s3_constants.VersioningSuspended) versioningConfigured := (versioningState != "") + deletedCount := 0 - s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // delete file entries for _, object := range deleteObjects.Objects { if object.Key == "" { continue } if err := s3a.validateTableBucketObjectPath(bucket, object.Key); err != nil { - deleteErrors = append(deleteErrors, DeleteError{ - Code: s3err.GetAPIError(s3err.ErrAccessDenied).Code, - Message: s3err.GetAPIError(s3err.ErrAccessDenied).Description, - Key: object.Key, - VersionId: object.VersionId, - }) + deleteErrors = append(deleteErrors, deleteErrorFromCode(s3err.ErrAccessDenied, object.Key, object.VersionId)) continue } - // Check object lock permissions before deletion (only for versioned buckets) - if versioningConfigured { - // Validate governance bypass for this specific object + var deleteResult deleteMutationResult + deleteCode := 
s3a.withObjectWriteLock(bucket, object.Key, func() s3err.ErrorCode { + return s3a.checkDeleteIfMatch(bucket, object.Key, object.VersionId, versioningState, object.ETag, s3err.ErrNoSuchKey) + }, func() s3err.ErrorCode { + if versioningConfigured { + result, errCode := s3a.deleteVersionedObject(r, bucket, object.Key, object.VersionId, versioningState) + if errCode != s3err.ErrNone { + return errCode + } + deleteResult = result + return s3err.ErrNone + } + governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object.Key) - if err := s3a.enforceObjectLockProtections(r, bucket, object.Key, object.VersionId, governanceBypassAllowed); err != nil { - glog.V(2).Infof("DeleteMultipleObjectsHandler: object lock check failed for %s/%s (version: %s): %v", bucket, object.Key, object.VersionId, err) - deleteErrors = append(deleteErrors, DeleteError{ - Code: s3err.GetAPIError(s3err.ErrAccessDenied).Code, - Message: s3err.GetAPIError(s3err.ErrAccessDenied).Description, - Key: object.Key, - VersionId: object.VersionId, - }) - continue + if err := s3a.enforceObjectLockProtections(r, bucket, object.Key, "", governanceBypassAllowed); err != nil { + glog.V(2).Infof("DeleteMultipleObjectsHandler: object lock check failed for %s/%s: %v", bucket, object.Key, err) + return s3err.ErrAccessDenied } - } - var deleteVersionId string - var isDeleteMarker bool - - if versioningConfigured { - // Handle versioned delete based on specific versioning state - if object.VersionId != "" { - // Delete specific version (same for both enabled and suspended) - err := s3a.deleteSpecificObjectVersion(bucket, object.Key, object.VersionId) - if err != nil { - deleteErrors = append(deleteErrors, DeleteError{ - Code: "", - Message: err.Error(), - Key: object.Key, - VersionId: object.VersionId, - }) - continue - } - deleteVersionId = object.VersionId - } else { - // Delete without version ID - behavior depends on versioning state - if versioningEnabled { - // Enabled versioning: Create delete marker (logical delete) - deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object.Key) - if err != nil { - deleteErrors = append(deleteErrors, DeleteError{ - Code: "", - Message: err.Error(), - Key: object.Key, - VersionId: object.VersionId, - }) - continue - } - deleteVersionId = deleteMarkerVersionId - isDeleteMarker = true - } else if versioningSuspended { - // Suspended versioning: Actually delete the "null" version object - glog.V(2).Infof("DeleteMultipleObjectsHandler: deleting null version for suspended versioning %s/%s", bucket, object.Key) - - err := s3a.deleteSpecificObjectVersion(bucket, object.Key, "null") - if err != nil { - deleteErrors = append(deleteErrors, DeleteError{ - Code: "", - Message: err.Error(), - Key: object.Key, - VersionId: "null", - }) - continue - } - deleteVersionId = "null" - // Note: For suspended versioning, we don't set isDeleteMarker=true - // because we actually deleted the object, not created a delete marker - } + if err := s3a.deleteUnversionedObjectWithClient(client, bucket, object.Key); err != nil { + glog.Errorf("DeleteMultipleObjectsHandler: failed to delete %s/%s: %v", bucket, object.Key, err) + return s3err.ErrInternalError } - // Add to successful deletions with version info + return s3err.ErrNone + }) + if deleteCode != s3err.ErrNone { + deleteErrors = append(deleteErrors, deleteErrorFromCode(deleteCode, object.Key, object.VersionId)) + continue + } + + deletedCount++ + if !deleteObjects.Quiet { deletedObject := ObjectIdentifier{ - Key: object.Key, - VersionId: 
deleteVersionId, - DeleteMarker: isDeleteMarker, + Key: object.Key, + VersionId: deleteResult.versionId, } - - // For delete markers, also set DeleteMarkerVersionId field - if isDeleteMarker { - deletedObject.DeleteMarkerVersionId = deleteVersionId - // Don't set VersionId for delete markers, use DeleteMarkerVersionId instead + if deleteResult.deleteMarker { + deletedObject.DeleteMarker = true + deletedObject.DeleteMarkerVersionId = deleteResult.versionId deletedObject.VersionId = "" } - if !deleteObjects.Quiet { - deletedObjects = append(deletedObjects, deletedObject) - } - if isDeleteMarker { - // For delete markers, we don't need to track directories for cleanup - continue - } - } else { - // Handle non-versioned delete (original logic) - target := util.NewFullPath(s3a.bucketDir(bucket), object.Key) - parentDirectoryPath, entryName := target.DirAndName() - isDeleteData, isRecursive := true, false - - err := deleteObjectEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) - if err == nil { - deletedObjects = append(deletedObjects, object) - } else { - deleteErrors = append(deleteErrors, DeleteError{ - Code: "", - Message: err.Error(), - Key: object.Key, - VersionId: object.VersionId, - }) - } + deletedObjects = append(deletedObjects, deletedObject) } if auditLog != nil { @@ -368,6 +407,11 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h return nil }) + if err != nil { + glog.Errorf("DeleteMultipleObjectsHandler: failed to initialize filer client for bucket %s: %v", bucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } deleteResp := DeleteObjectsResponse{} if !deleteObjects.Quiet { @@ -375,7 +419,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } deleteResp.Errors = deleteErrors stats_collect.RecordBucketActiveTime(bucket) - stats_collect.S3DeletedObjectsCounter.WithLabelValues(bucket).Add(float64(len(deletedObjects))) + stats_collect.S3DeletedObjectsCounter.WithLabelValues(bucket).Add(float64(deletedCount)) writeSuccessResponseXML(w, r, deleteResp) diff --git a/weed/s3api/s3api_object_handlers_delete_test.go b/weed/s3api/s3api_object_handlers_delete_test.go new file mode 100644 index 000000000..5596d6130 --- /dev/null +++ b/weed/s3api/s3api_object_handlers_delete_test.go @@ -0,0 +1,119 @@ +package s3api + +import ( + "encoding/xml" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +) + +func TestValidateDeleteIfMatch(t *testing.T) { + s3a := NewS3ApiServerForTest() + existingEntry := &filer_pb.Entry{ + Extended: map[string][]byte{ + s3_constants.ExtETagKey: []byte("\"abc123\""), + }, + } + deleteMarkerEntry := &filer_pb.Entry{ + Extended: map[string][]byte{ + s3_constants.ExtDeleteMarkerKey: []byte("true"), + }, + } + + testCases := []struct { + name string + entry *filer_pb.Entry + ifMatch string + missingCode s3err.ErrorCode + expected s3err.ErrorCode + }{ + { + name: "matching etag succeeds", + entry: existingEntry, + ifMatch: "\"abc123\"", + missingCode: s3err.ErrPreconditionFailed, + expected: s3err.ErrNone, + }, + { + name: "wildcard succeeds for existing entry", + entry: existingEntry, + ifMatch: "*", + missingCode: s3err.ErrPreconditionFailed, + expected: s3err.ErrNone, + }, + { + name: "mismatched etag fails", + entry: existingEntry, + ifMatch: "\"other\"", + missingCode: s3err.ErrPreconditionFailed, + expected: 
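In the non-quiet response assembled above, a delete marker reports its ID through `DeleteMarkerVersionId` with `VersionId` cleared, never both, matching the S3 DeleteObjects contract. Restated as a small helper over the types defined in this file:

```go
// shapeDeletedEntry mirrors the branch above: markers report their ID via
// DeleteMarkerVersionId, real version deletes echo VersionId directly.
func shapeDeletedEntry(key string, res deleteMutationResult) ObjectIdentifier {
	entry := ObjectIdentifier{Key: key, VersionId: res.versionId}
	if res.deleteMarker {
		entry.DeleteMarker = true
		entry.DeleteMarkerVersionId = res.versionId
		entry.VersionId = "" // a marker never reports both fields
	}
	return entry
}
```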
diff --git a/weed/s3api/s3api_object_handlers_delete_test.go b/weed/s3api/s3api_object_handlers_delete_test.go
new file mode 100644
index 000000000..5596d6130
--- /dev/null
+++ b/weed/s3api/s3api_object_handlers_delete_test.go
@@ -0,0 +1,119 @@
+package s3api
+
+import (
+	"encoding/xml"
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+)
+
+func TestValidateDeleteIfMatch(t *testing.T) {
+	s3a := NewS3ApiServerForTest()
+	existingEntry := &filer_pb.Entry{
+		Extended: map[string][]byte{
+			s3_constants.ExtETagKey: []byte("\"abc123\""),
+		},
+	}
+	deleteMarkerEntry := &filer_pb.Entry{
+		Extended: map[string][]byte{
+			s3_constants.ExtDeleteMarkerKey: []byte("true"),
+		},
+	}
+
+	testCases := []struct {
+		name        string
+		entry       *filer_pb.Entry
+		ifMatch     string
+		missingCode s3err.ErrorCode
+		expected    s3err.ErrorCode
+	}{
+		{
+			name:        "matching etag succeeds",
+			entry:       existingEntry,
+			ifMatch:     "\"abc123\"",
+			missingCode: s3err.ErrPreconditionFailed,
+			expected:    s3err.ErrNone,
+		},
+		{
+			name:        "wildcard succeeds for existing entry",
+			entry:       existingEntry,
+			ifMatch:     "*",
+			missingCode: s3err.ErrPreconditionFailed,
+			expected:    s3err.ErrNone,
+		},
+		{
+			name:        "mismatched etag fails",
+			entry:       existingEntry,
+			ifMatch:     "\"other\"",
+			missingCode: s3err.ErrPreconditionFailed,
+			expected:    s3err.ErrPreconditionFailed,
+		},
+		{
+			name:        "missing current object fails single delete",
+			entry:       nil,
+			ifMatch:     "*",
+			missingCode: s3err.ErrPreconditionFailed,
+			expected:    s3err.ErrPreconditionFailed,
+		},
+		{
+			name:        "missing current object returns no such key for batch delete",
+			entry:       nil,
+			ifMatch:     "*",
+			missingCode: s3err.ErrNoSuchKey,
+			expected:    s3err.ErrNoSuchKey,
+		},
+		{
+			name:        "current delete marker behaves like missing object",
+			entry:       normalizeConditionalTargetEntry(deleteMarkerEntry),
+			ifMatch:     "*",
+			missingCode: s3err.ErrPreconditionFailed,
+			expected:    s3err.ErrPreconditionFailed,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			if errCode := s3a.validateDeleteIfMatch(tc.entry, tc.ifMatch, tc.missingCode); errCode != tc.expected {
+				t.Fatalf("validateDeleteIfMatch() = %v, want %v", errCode, tc.expected)
+			}
+		})
+	}
+}
+
+func TestDeleteObjectsRequestUnmarshalConditionalETags(t *testing.T) {
+	var req DeleteObjectsRequest
+	body := []byte(`<Delete>
+  <Quiet>true</Quiet>
+  <Object>
+    <Key>first.txt</Key>
+    <ETag>*</ETag>
+  </Object>
+  <Object>
+    <Key>second.txt</Key>
+    <VersionId>3HL4kqCxf3vjVBH40Nrjfkd</VersionId>
+    <ETag>"abc123"</ETag>
+  </Object>
+</Delete>`)
+
+	if err := xml.Unmarshal(body, &req); err != nil {
+		t.Fatalf("xml.Unmarshal() error = %v", err)
+	}
+	if !req.Quiet {
+		t.Fatalf("expected Quiet=true")
+	}
+	if len(req.Objects) != 2 {
+		t.Fatalf("expected 2 objects, got %d", len(req.Objects))
+	}
+	if req.Objects[0].ETag != "*" {
+		t.Fatalf("expected first object ETag to be '*', got %q", req.Objects[0].ETag)
+	}
+	if req.Objects[1].ETag != "\"abc123\"" {
+		t.Fatalf("expected second object ETag to preserve quotes, got %q", req.Objects[1].ETag)
+	}
+	if req.Objects[1].VersionId != "3HL4kqCxf3vjVBH40Nrjfkd" {
+		t.Fatalf("expected second object VersionId to unmarshal, got %q", req.Objects[1].VersionId)
+	}
+}
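// Illustrative sketch, not part of the patch: the unmarshal test above keeps the
// quotes on "abc123" and the bare `*` on purpose. A self-contained model of the
// comparison rules an If-Match check typically applies (the production logic
// lives in s3a.etagMatches / validateDeleteIfMatch, not in this snippet):
//
//	package main
//
//	import (
//		"fmt"
//		"strings"
//	)
//
//	// etagConditionHolds reports whether a client-supplied If-Match value accepts
//	// a stored ETag: "*" matches any existing object, and surrounding quotes are
//	// ignored so `"abc123"` and `abc123` compare equal.
//	func etagConditionHolds(ifMatch, storedETag string) bool {
//		if ifMatch == "*" {
//			return storedETag != "" // wildcard only requires that the object exists
//		}
//		trim := func(s string) string { return strings.Trim(s, `"`) }
//		return trim(ifMatch) == trim(storedETag)
//	}
//
//	func main() {
//		fmt.Println(etagConditionHolds(`"abc123"`, `"abc123"`)) // true
//		fmt.Println(etagConditionHolds("*", `"abc123"`))        // true
//		fmt.Println(etagConditionHolds(`"other"`, `"abc123"`))  // false
//	}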
diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go
index 46c249658..92745cdcc 100644
--- a/weed/s3api/s3api_object_handlers_multipart.go
+++ b/weed/s3api/s3api_object_handlers_multipart.go
@@ -447,7 +447,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
 	glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d",
 		bucket, object, uploadID, partID, r.ContentLength)

-	etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, partID)
+	etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, "", partID, nil)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d",
 			errCode, bucket, object, partID)
diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go
index 126af560e..d16810f4e 100644
--- a/weed/s3api/s3api_object_handlers_postpolicy.go
+++ b/weed/s3api/s3api_object_handlers_postpolicy.go
@@ -140,7 +140,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
 		}
 	}

-	etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, fileBody, bucket, 1)
+	etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, fileBody, bucket, object, 1, nil)
 	if errCode != s3err.ErrNone {
 		s3err.WriteErrorResponse(w, r, errCode)
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 2625df8be..5419a4852 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -292,7 +292,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 		dataReader = mimeDetect(r, dataReader)
 	}

-	etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, 1)
+	etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, object, 1, nil)
 	if errCode != s3err.ErrNone {
 		s3err.WriteErrorResponse(w, r, errCode)
@@ -312,7 +312,42 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
 	writeSuccessResponseEmpty(w, r)
 }

-func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
+func (s3a *S3ApiServer) withObjectWriteLock(bucket, object string, preconditionFn func() s3err.ErrorCode, fn func() s3err.ErrorCode) s3err.ErrorCode {
+	runPrecondition := func() s3err.ErrorCode {
+		if preconditionFn == nil {
+			return s3err.ErrNone
+		}
+		return preconditionFn()
+	}
+
+	if object == "" || s3a.newObjectWriteLock == nil {
+		if errCode := runPrecondition(); errCode != s3err.ErrNone {
+			return errCode
+		}
+		return fn()
+	}
+
+	lock := s3a.newObjectWriteLock(bucket, object)
+	if lock == nil {
+		if errCode := runPrecondition(); errCode != s3err.ErrNone {
+			return errCode
+		}
+		return fn()
+	}
+	defer func() {
+		if err := lock.StopShortLivedLock(); err != nil {
+			glog.Warningf("withObjectWriteLock: failed to release lock for %s/%s: %v", bucket, object, err)
+		}
+	}()
+
+	if errCode := runPrecondition(); errCode != s3err.ErrNone {
+		return errCode
+	}
+
+	return fn()
+}
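// Illustrative sketch, not part of the patch: how a caller is expected to use the
// helper above. The two-callback split matters because the precondition only runs
// once the lock is held, so a conditional PUT or DELETE cannot race another writer
// between check and act. doWrite is a hypothetical stand-in for the guarded mutation.
//
//	errCode := s3a.withObjectWriteLock(bucket, object,
//		func() s3err.ErrorCode {
//			// Precondition: evaluated under the lock, e.g. If-Match/If-None-Match
//			// against the current entry; a non-ErrNone code aborts before fn runs.
//			return s3a.checkConditionalHeaders(r, bucket, object)
//		},
//		func() s3err.ErrorCode {
//			// Action: runs only if the precondition passed, still under the lock.
//			return doWrite()
//		})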
+
+func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, object string, partNumber int, afterCreate func(entry *filer_pb.Entry) s3err.ErrorCode) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
 	// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
 	// This eliminates the filer proxy overhead for PUT operations
 	// Note: filePath is now passed directly instead of URL (no parsing needed)
@@ -598,12 +633,8 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
 	// Store ETag in Extended attribute for future retrieval (e.g. multipart parts)
 	entry.Extended[s3_constants.ExtETagKey] = []byte(etag)

-	// Set object owner
-	amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
-	if amzAccountId != "" {
-		entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
-		glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath)
-	}
+	// Set object owner according to bucket ownership settings.
+	s3a.setObjectOwnerFromRequest(r, bucket, entry)

 	// Set version ID if present
 	if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
@@ -611,6 +642,16 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
 		glog.V(3).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath)
 	}

+	for _, metadataHeader := range []string{
+		s3_constants.ExtObjectLockModeKey,
+		s3_constants.ExtRetentionUntilDateKey,
+		s3_constants.ExtLegalHoldKey,
+	} {
+		if value := r.Header.Get(metadataHeader); value != "" {
+			entry.Extended[metadataHeader] = []byte(value)
+		}
+	}
+
 	// Set TTL-based S3 expiry flag only if object has a TTL
 	if entry.Attributes.TtlSec > 0 {
 		entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
@@ -699,30 +740,57 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
 	// This matches the chunk upload behavior and prevents orphaned chunks
 	glog.V(3).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d",
 		path.Dir(filePath), path.Base(filePath), len(entry.Chunks), len(entry.Extended))
-	createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		req := &filer_pb.CreateEntryRequest{
-			Directory: path.Dir(filePath),
-			Entry:     entry,
-		}
-		glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath)
-		if err := filer_pb.CreateEntry(context.Background(), client, req); err != nil {
-			glog.Errorf("putToFiler: CreateEntry returned error: %v", err)
-			return err
+	var createErr error
+	var rollbackErr error
+	entryCreated := false
+	preconditionFn := func() s3err.ErrorCode {
+		if object == "" {
+			return s3err.ErrNone
+		}
+		return s3a.checkConditionalHeaders(r, bucket, object)
+	}
+	createCode := s3a.withObjectWriteLock(bucket, object, preconditionFn, func() s3err.ErrorCode {
+		createErr = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+			req := &filer_pb.CreateEntryRequest{
+				Directory: path.Dir(filePath),
+				Entry:     entry,
+			}
+			glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath)
+			if err := filer_pb.CreateEntry(context.Background(), client, req); err != nil {
+				glog.Errorf("putToFiler: CreateEntry returned error: %v", err)
+				return err
+			}
+			return nil
+		})
+		if createErr != nil {
+			return filerErrorToS3Error(createErr)
+		}
+		entryCreated = true
+		if afterCreate != nil {
+			if afterCreateCode := afterCreate(entry); afterCreateCode != s3err.ErrNone {
+				rollbackErr = s3a.rmObject(path.Dir(filePath), path.Base(filePath), true, false)
+				if rollbackErr != nil {
+					glog.Errorf("putToFiler: failed to rollback created entry for %s after post-create error: %v", filePath, rollbackErr)
+				} else {
+					entryCreated = false
+				}
+				return afterCreateCode
+			}
 		}
-		return nil
+		return s3err.ErrNone
 	})

-	if createErr != nil {
-		glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
+	if createCode != s3err.ErrNone {
+		if createErr != nil {
+			glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
+		}

-		// CRITICAL: Cleanup orphaned chunks before returning error
-		// If CreateEntry fails, the uploaded chunks are orphaned and must be deleted
-		// to prevent resource leaks and wasted storage
-		if len(chunkResult.FileChunks) > 0 {
-			glog.Warningf("putToFiler: CreateEntry failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks))
+		// If the entry was never created, the uploaded chunks are orphaned and must be deleted.
+		if !entryCreated && len(chunkResult.FileChunks) > 0 {
+			glog.Warningf("putToFiler: finalization failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks))
 			s3a.deleteOrphanedChunks(chunkResult.FileChunks)
 		}
-		return "", filerErrorToS3Error(createErr), SSEResponseMetadata{}
+		return "", createCode, SSEResponseMetadata{}
 	}

 	glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath)
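// Illustrative sketch, not part of the patch: a caller of the new afterCreate hook.
// If the post-create step reports an error, putToFiler rolls back the entry it just
// created (see the rollback path above), so the caller only has to surface the
// code. finalize is a hypothetical stand-in for follow-up work such as updating a
// .versions directory.
//
//	etag, errCode, _ := s3a.putToFiler(r, filePath, dataReader, bucket, object, 1,
//		func(entry *filer_pb.Entry) s3err.ErrorCode {
//			if err := finalize(entry); err != nil {
//				// A non-ErrNone return triggers the rollback: the freshly
//				// created entry is removed and this code is surfaced.
//				return s3err.ErrInternalError
//			}
//			return s3err.ErrNone
//		})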
@@ -982,7 +1050,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
 	}

 	// Upload the file using putToFiler - this will create the file with version metadata
-	etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, 1)
+	etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, normalizedObject, 1, nil)
 	if errCode != s3err.ErrNone {
 		glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
 		return "", errCode, SSEResponseMetadata{}
@@ -1088,8 +1156,6 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
 	// We need to construct the object path relative to the bucket
 	versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName
 	versionFilePath := s3a.toFilerPath(bucket, versionObjectPath)
-	bucketDir := s3a.bucketDir(bucket)
-
 	body := dataReader
 	if objectContentType == "" {
 		body = mimeDetect(r, body)
@@ -1097,71 +1163,55 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin

 	glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionFilePath)

-	etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, 1)
-	if errCode != s3err.ErrNone {
-		glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
-		return "", "", errCode, SSEResponseMetadata{}
-	}
+	r.Header.Set(s3_constants.ExtVersionIdKey, versionId)
+	defer r.Header.Del(s3_constants.ExtVersionIdKey)

-	// Get the uploaded entry to add versioning metadata
-	// Use retry logic to handle filer consistency delays
-	var versionEntry *filer_pb.Entry
-	var err error
-	maxRetries := 8
-	for attempt := 1; attempt <= maxRetries; attempt++ {
-		versionEntry, err = s3a.getEntry(bucketDir, versionObjectPath)
-		if err == nil {
-			break
-		}
+	explicitMode := r.Header.Get(s3_constants.AmzObjectLockMode)
+	explicitRetainUntilDate := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)

-		if attempt < maxRetries {
-			// Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms
-			delay := time.Millisecond * time.Duration(10*(1<<(attempt-1)))
-			time.Sleep(delay)
-		}
+	if explicitMode != "" {
+		r.Header.Set(s3_constants.ExtObjectLockModeKey, explicitMode)
+		defer r.Header.Del(s3_constants.ExtObjectLockModeKey)
 	}
-
-	if err != nil {
-		glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
-		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
+	if explicitRetainUntilDate != "" {
+		parsedTime, parseErr := time.Parse(time.RFC3339, explicitRetainUntilDate)
+		if parseErr != nil {
+			glog.Errorf("putVersionedObject: failed to parse retention until date: %v", parseErr)
+			return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
+		}
+		r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
+		defer r.Header.Del(s3_constants.ExtRetentionUntilDateKey)
 	}
-
-	// Add versioning metadata to this version
-	if versionEntry.Extended == nil {
-		versionEntry.Extended = make(map[string][]byte)
+	if legalHold := r.Header.Get(s3_constants.AmzObjectLockLegalHold); legalHold != "" {
+		r.Header.Set(s3_constants.ExtLegalHoldKey, legalHold)
+		defer r.Header.Del(s3_constants.ExtLegalHoldKey)
 	}
-	versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
-
-	// Store ETag (unquoted) in Extended attribute
-	versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
-
-	// Set object owner for versioned objects
-	s3a.setObjectOwnerFromRequest(r, bucket, versionEntry)
-
-	// Extract and store object lock metadata from request headers
-	if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
-		glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
-		return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
+	if explicitMode == "" && explicitRetainUntilDate == "" {
+		tempEntry := &filer_pb.Entry{Extended: make(map[string][]byte)}
+		if err := s3a.applyBucketDefaultRetention(bucket, tempEntry); err == nil {
+			if modeBytes, ok := tempEntry.Extended[s3_constants.ExtObjectLockModeKey]; ok {
+				r.Header.Set(s3_constants.ExtObjectLockModeKey, string(modeBytes))
+				defer r.Header.Del(s3_constants.ExtObjectLockModeKey)
+			}
+			if dateBytes, ok := tempEntry.Extended[s3_constants.ExtRetentionUntilDateKey]; ok {
+				r.Header.Set(s3_constants.ExtRetentionUntilDateKey, string(dateBytes))
+				defer r.Header.Del(s3_constants.ExtRetentionUntilDateKey)
+			}
+		}
 	}

-	// Update the version entry with metadata
-	err = s3a.mkFile(bucketDir, versionObjectPath, versionEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
-		updatedEntry.Extended = versionEntry.Extended
-		updatedEntry.Attributes = versionEntry.Attributes
-		updatedEntry.Chunks = versionEntry.Chunks
+	etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, func(versionEntry *filer_pb.Entry) s3err.ErrorCode {
+		if err := s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName, versionEntry); err != nil {
+			glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
+			return s3err.ErrInternalError
+		}
+		return s3err.ErrNone
 	})
-	if err != nil {
-		glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
-		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
+	if errCode != s3err.ErrNone {
+		glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
+		return "", "", errCode, SSEResponseMetadata{}
 	}

-	// Update the .versions directory metadata to indicate this is the latest version
-	// Pass versionEntry to cache its metadata for single-scan list efficiency
-	err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName, versionEntry)
-	if err != nil {
-		glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
-		return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
-	}
 	glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
 	return versionId, etag, s3err.ErrNone, sseMetadata
 }
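// Illustrative sketch, not part of the patch: putVersionedObject now hands metadata
// to putToFiler through internal request headers, which putToFiler copies into
// entry.Extended (the metadataHeader loop earlier in this file). A minimal,
// self-contained model of that round trip; the header names are stand-ins for the
// s3_constants keys:
//
//	package main
//
//	import (
//		"fmt"
//		"net/http"
//	)
//
//	func main() {
//		r, _ := http.NewRequest(http.MethodPut, "http://example.invalid/bucket/key", nil)
//
//		// Producer side (putVersionedObject): set the header, clean it up when done.
//		r.Header.Set("X-Ext-Version-Id", "v1")
//		defer r.Header.Del("X-Ext-Version-Id")
//
//		// Consumer side (putToFiler): copy any populated headers into the entry.
//		extended := map[string][]byte{}
//		for _, h := range []string{"X-Ext-Version-Id", "X-Ext-Object-Lock-Mode"} {
//			if v := r.Header.Get(h); v != "" {
//				extended[h] = []byte(v)
//			}
//		}
//		fmt.Printf("%q\n", extended) // only X-Ext-Version-Id is present
//	}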
@@ -1685,12 +1735,25 @@ func (s3a *S3ApiServer) etagMatches(headerValue, objectETag string) bool {
 	return false
 }

+func normalizeConditionalTargetEntry(entry *filer_pb.Entry) *filer_pb.Entry {
+	if entry == nil {
+		return nil
+	}
+	if entry.Extended != nil {
+		if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
+			return nil
+		}
+	}
+	return entry
+}
+
 // validateConditionalHeaders checks conditional headers against the provided entry
 func (s3a *S3ApiServer) validateConditionalHeaders(r *http.Request, headers conditionalHeaders, entry *filer_pb.Entry, bucket, object string) s3err.ErrorCode {
 	if !headers.isSet {
 		return s3err.ErrNone
 	}

+	entry = normalizeConditionalTargetEntry(entry)
 	objectExists := entry != nil

 	// For PUT requests, all specified conditions must be met.
@@ -1812,7 +1875,7 @@ func (s3a *S3ApiServer) checkConditionalHeaders(r *http.Request, bucket, object
 	// This ensures we check conditions against the LATEST version, not a null version.
 	entry, err := s3a.resolveObjectEntry(bucket, object)
 	if err != nil {
-		if errors.Is(err, filer_pb.ErrNotFound) {
+		if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrDeleteMarker) {
 			entry = nil
 		} else {
 			glog.Errorf("checkConditionalHeaders: error resolving object entry for %s/%s: %v", bucket, object, err)
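// Illustrative sketch, not part of the patch: why the normalization above exists.
// A delete marker is the "current version" in the filer, but for conditional
// semantics it must behave like a missing object, so the helper maps it to nil
// before objectExists is computed:
//
//	marker := &filer_pb.Entry{Extended: map[string][]byte{
//		s3_constants.ExtDeleteMarkerKey: []byte("true"),
//	}}
//	if normalizeConditionalTargetEntry(marker) == nil {
//		// If-Match "*" against a key whose latest version is a delete marker
//		// therefore fails with the caller-selected missing code
//		// (PreconditionFailed for single deletes, NoSuchKey for batch deletes).
//	}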
@@ -1828,6 +1891,7 @@ func (s3a *S3ApiServer) validateConditionalHeadersForReads(r *http.Request, head
 		return ConditionalHeaderResult{ErrorCode: s3err.ErrNone, Entry: entry}
 	}

+	entry = normalizeConditionalTargetEntry(entry)
 	objectExists := entry != nil

 	// If object doesn't exist, fail for If-Match and If-Unmodified-Since
@@ -1954,7 +2018,7 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReads(r *http.Request, bucket,
 	// This ensures we check conditions against the LATEST version, not a null version.
 	entry, err := s3a.resolveObjectEntry(bucket, object)
 	if err != nil {
-		if errors.Is(err, filer_pb.ErrNotFound) {
+		if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrDeleteMarker) {
 			entry = nil
 		} else {
 			glog.Errorf("checkConditionalHeadersForReads: error resolving object entry for %s/%s: %v", bucket, object, err)
diff --git a/weed/s3api/s3api_object_handlers_put_test.go b/weed/s3api/s3api_object_handlers_put_test.go
index dd5dee34e..eaad6ab6d 100644
--- a/weed/s3api/s3api_object_handlers_put_test.go
+++ b/weed/s3api/s3api_object_handlers_put_test.go
@@ -7,6 +7,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"strings"
+	"sync"
 	"testing"

 	"github.com/gorilla/mux"
@@ -199,3 +200,104 @@ func TestNewMultipartUploadHandler_KeyTooLong(t *testing.T) {
 		t.Errorf("expected error code KeyTooLongError, got %s", errResp.Code)
 	}
 }
+
+type testObjectWriteLockFactory struct {
+	mu    sync.Mutex
+	locks map[string]*sync.Mutex
+}
+
+func (f *testObjectWriteLockFactory) newLock(bucket, object string) objectWriteLock {
+	key := bucket + "|" + object
+
+	f.mu.Lock()
+	lock, ok := f.locks[key]
+	if !ok {
+		lock = &sync.Mutex{}
+		f.locks[key] = lock
+	}
+	f.mu.Unlock()
+
+	lock.Lock()
+	return &testObjectWriteLock{unlock: lock.Unlock}
+}
+
+type testObjectWriteLock struct {
+	once   sync.Once
+	unlock func()
+}
+
+func (l *testObjectWriteLock) StopShortLivedLock() error {
+	l.once.Do(l.unlock)
+	return nil
+}
+
+func TestWithObjectWriteLockSerializesConcurrentPreconditions(t *testing.T) {
+	s3a := NewS3ApiServerForTest()
+	lockFactory := &testObjectWriteLockFactory{
+		locks: make(map[string]*sync.Mutex),
+	}
+	s3a.newObjectWriteLock = lockFactory.newLock
+
+	const workers = 3
+	const bucket = "test-bucket"
+	const object = "/file.txt"
+
+	start := make(chan struct{})
+	results := make(chan s3err.ErrorCode, workers)
+	var wg sync.WaitGroup
+
+	var stateMu sync.Mutex
+	objectExists := false
+
+	for i := 0; i < workers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			<-start
+
+			errCode := s3a.withObjectWriteLock(bucket, object,
+				func() s3err.ErrorCode {
+					stateMu.Lock()
+					defer stateMu.Unlock()
+					if objectExists {
+						return s3err.ErrPreconditionFailed
+					}
+					return s3err.ErrNone
+				},
+				func() s3err.ErrorCode {
+					stateMu.Lock()
+					defer stateMu.Unlock()
+					objectExists = true
+					return s3err.ErrNone
+				},
+			)
+
+			results <- errCode
+		}()
+	}
+
+	close(start)
+	wg.Wait()
+	close(results)
+
+	var successCount int
+	var preconditionFailedCount int
+
+	for errCode := range results {
+		switch errCode {
+		case s3err.ErrNone:
+			successCount++
+		case s3err.ErrPreconditionFailed:
+			preconditionFailedCount++
+		default:
+			t.Fatalf("unexpected error code: %v", errCode)
+		}
+	}
+
+	if successCount != 1 {
+		t.Fatalf("expected exactly one successful writer, got %d", successCount)
+	}
+	if preconditionFailedCount != workers-1 {
+		t.Fatalf("expected %d precondition failures, got %d", workers-1, preconditionFailedCount)
+	}
+}
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index f969582bc..42d9e77c4 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -82,8 +82,17 @@ type S3ApiServer struct {
 	embeddedIam   *EmbeddedIamApi // Embedded IAM API server (when enabled)
 	stsHandlers   *STSHandlers    // STS HTTP handlers for AssumeRoleWithWebIdentity
 	cipher        bool            // encrypt data on volume servers
+	newObjectWriteLock func(bucket, object string) objectWriteLock
 }

+type objectWriteLock interface {
+	StopShortLivedLock() error
+}
+
+const (
+	objectWriteLockTTL = 15 * time.Second
+)
+
 func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
 	return NewS3ApiServerWithStore(router, option, "")
 }
@@ -182,6 +191,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
 		cipher:        option.Cipher,
 	}

+	if len(option.Filers) > 0 {
+		objectWriteLockClient := cluster.NewLockClient(option.GrpcDialOption, option.Filers[0])
+		s3ApiServer.newObjectWriteLock = func(bucket, object string) objectWriteLock {
+			lockKey := fmt.Sprintf("s3.object.write:%s", s3ApiServer.toFilerPath(bucket, object))
+			owner := fmt.Sprintf("s3api-%d", s3ApiServer.randomClientId)
+			lock := objectWriteLockClient.NewShortLivedLock(lockKey, owner)
+			if err := lock.AttemptToLock(objectWriteLockTTL); err != nil {
+				// The initial acquisition already succeeded with the default short TTL;
+				// renewal to a longer TTL is opportunistic, to cover slower metadata paths.
+				glog.Warningf("objectWriteLock: failed to extend lock TTL for %s: %v", lockKey, err)
+			}
+			return lock
+		}
+	}
+
 	// Set s3a reference in circuit breaker for upload limiting
 	s3ApiServer.cb.s3a = s3ApiServer

diff --git a/weed/s3api/s3err/s3api_errors.go b/weed/s3api/s3err/s3api_errors.go
index d46e7673b..1822c82c0 100644
--- a/weed/s3api/s3err/s3api_errors.go
+++ b/weed/s3api/s3err/s3api_errors.go
@@ -68,6 +68,7 @@ const (
 	ErrInvalidMaxDeleteObjects
 	ErrInvalidPartNumberMarker
 	ErrInvalidPart
+	ErrInvalidPartOrder
 	ErrInvalidRange
 	ErrInternalError
 	ErrInvalidCopyDest
@@ -291,6 +292,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
 		Description:    "One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
 		HTTPStatusCode: http.StatusBadRequest,
 	},
+	ErrInvalidPartOrder: {
+		Code:           "InvalidPartOrder",
+		Description:    "The list of parts was not in ascending order. The parts list must be specified in order by part number.",
+		HTTPStatusCode: http.StatusBadRequest,
+	},
 	ErrInvalidCopyDest: {
 		Code: "InvalidRequest",
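// Illustrative sketch, not part of the patch: ErrInvalidPartOrder is registered
// above without a visible call site in this diff. A hedged example of the kind of
// CompleteMultipartUpload validation it would back, assuming the part numbers have
// already been collected from the request:
//
//	func validatePartOrder(partNumbers []int) s3err.ErrorCode {
//		for i := 1; i < len(partNumbers); i++ {
//			if partNumbers[i] <= partNumbers[i-1] {
//				// AWS S3 returns InvalidPartOrder when the completed parts list
//				// is not strictly ascending by part number.
//				return s3err.ErrInvalidPartOrder
//			}
//		}
//		return s3err.ErrNone
//	}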