
s3api: make conditional mutations atomic and AWS-compatible (#8802)

* s3api: serialize conditional write finalization

* s3api: add conditional delete mutation checks

* s3api: enforce destination conditions for copy

* s3api: revalidate multipart completion under lock

* s3api: rollback failed put finalization hooks

* s3api: report delete-marker version deletions

* s3api: fix copy destination versioning edge cases

* s3api: make versioned multipart completion idempotent

* test/s3: cover conditional mutation regressions

* s3api: rollback failed copy version finalization

* s3api: resolve suspended delete conditions via latest entry

* s3api: remove copy test null-version injection

* s3api: reject out-of-order multipart completions

* s3api: preserve multipart replay version metadata

* s3api: surface copy destination existence errors

* s3api: simplify delete condition target resolution

* test/s3: make conditional delete assertions order independent

* test/s3: add distributed lock gateway integration

* s3api: fail closed multipart versioned completion

* s3api: harden copy metadata and overwrite paths

* s3api: create delete markers for suspended deletes

* s3api: allow duplicate multipart completion parts
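
The conditional-mutation semantics these commits target follow AWS: a mutating request carrying If-Match or If-None-Match must be evaluated against the current object atomically, so under concurrency exactly one conditional create wins and every loser sees 412 PreconditionFailed. A minimal client-side sketch of that contract with aws-sdk-go-v2, the SDK the new tests use (endpoint, credentials, bucket, and key are illustrative placeholders):

package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/smithy-go"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background(),
		config.WithRegion("us-east-1"),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("access", "secret", "")),
	)
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.BaseEndpoint = aws.String("http://127.0.0.1:8333") // placeholder gateway endpoint
		o.UsePathStyle = true
	})
	put := func(body string) error {
		_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
			Bucket:      aws.String("demo-bucket"),
			Key:         aws.String("demo-key"),
			IfNoneMatch: aws.String("*"), // create-only: must fail if a current object exists
			Body:        bytes.NewReader([]byte(body)),
		})
		return err
	}
	if err := put("first"); err != nil {
		log.Fatal(err) // the first conditional create should win
	}
	err = put("second") // the replayed create must lose
	var apiErr smithy.APIError
	if errors.As(err, &apiErr) {
		fmt.Println("second write rejected with", apiErr.ErrorCode()) // PreconditionFailed
	}
}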
Chris Lu committed (via GitHub), commit 0adb78bc6b
 1. 135  test/s3/delete/s3_conditional_delete_test.go
 2. 523  test/s3/distributed_lock/distributed_lock_cluster_test.go
 3. 181  test/s3/distributed_lock/distributed_lock_test.go
 4. 147  test/s3/versioning/s3_copy_versioning_regression_test.go
 5.  82  test/s3/versioning/s3_suspended_delete_marker_regression_test.go
 6.  79  test/s3/versioning/s3_versioning_multipart_test.go
 7. 510  weed/s3api/filer_multipart.go
 8.  75  weed/s3api/filer_multipart_test.go
 9.  54  weed/s3api/s3api_conditional_headers_test.go
10. 403  weed/s3api/s3api_object_handlers_copy.go
11.  61  weed/s3api/s3api_object_handlers_copy_test.go
12. 418  weed/s3api/s3api_object_handlers_delete.go
13. 119  weed/s3api/s3api_object_handlers_delete_test.go
14.   2  weed/s3api/s3api_object_handlers_multipart.go
15.   2  weed/s3api/s3api_object_handlers_postpolicy.go
16. 236  weed/s3api/s3api_object_handlers_put.go
17. 102  weed/s3api/s3api_object_handlers_put_test.go
18.  24  weed/s3api/s3api_server.go
19.   6  weed/s3api/s3err/s3api_errors.go

135  test/s3/delete/s3_conditional_delete_test.go

@@ -0,0 +1,135 @@
package delete
import (
"bytes"
"context"
"errors"
"testing"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestConditionalDeleteIfMatchOnLatestVersion(t *testing.T) {
client := getTestClient(t)
bucket := createTestBucket(t, client)
defer cleanupBucket(t, client, bucket)
key := "conditional-delete.txt"
putResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: bytes.NewReader([]byte("versioned body")),
})
require.NoError(t, err)
require.NotNil(t, putResp.ETag)
_, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
IfMatch: aws.String(`"not-the-current-etag"`),
})
require.Error(t, err, "DeleteObject should reject a mismatched If-Match header")
var apiErr smithy.APIError
if assert.True(t, errors.As(err, &apiErr), "Expected smithy API error for conditional delete") {
assert.Equal(t, "PreconditionFailed", apiErr.ErrorCode())
}
_, err = client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
require.NoError(t, err, "Object should remain current after a failed conditional delete")
deleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
IfMatch: putResp.ETag,
})
require.NoError(t, err)
require.NotNil(t, deleteResp.DeleteMarker)
assert.True(t, *deleteResp.DeleteMarker, "Successful conditional delete on a versioned bucket should create a delete marker")
require.NotNil(t, deleteResp.VersionId)
_, err = client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
require.Error(t, err, "Delete marker should hide the current object after a successful conditional delete")
}
func TestConditionalMultiDeletePerObjectETag(t *testing.T) {
client := getTestClient(t)
bucket := createTestBucket(t, client)
defer cleanupBucket(t, client, bucket)
okKey := "delete-ok.txt"
failKey := "delete-fail.txt"
okPutResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(okKey),
Body: bytes.NewReader([]byte("delete me")),
})
require.NoError(t, err)
require.NotNil(t, okPutResp.ETag)
_, err = client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(failKey),
Body: bytes.NewReader([]byte("keep me")),
})
require.NoError(t, err)
deleteResp, err := client.DeleteObjects(context.TODO(), &s3.DeleteObjectsInput{
Bucket: aws.String(bucket),
Delete: &types.Delete{
Objects: []types.ObjectIdentifier{
{
Key: aws.String(okKey),
ETag: okPutResp.ETag,
},
{
Key: aws.String(failKey),
ETag: aws.String(`"mismatched-etag"`),
},
},
},
})
require.NoError(t, err)
require.Len(t, deleteResp.Deleted, 1, "One object should satisfy its ETag precondition")
require.Len(t, deleteResp.Errors, 1, "One object should report a precondition failure")
deletedKeys := make([]string, 0, len(deleteResp.Deleted))
for _, deleted := range deleteResp.Deleted {
deletedKeys = append(deletedKeys, aws.ToString(deleted.Key))
}
assert.Contains(t, deletedKeys, okKey)
var matchedError *types.Error
for i := range deleteResp.Errors {
if aws.ToString(deleteResp.Errors[i].Key) == failKey {
matchedError = &deleteResp.Errors[i]
break
}
}
if assert.NotNil(t, matchedError, "Expected error entry for failed key") {
assert.Equal(t, "PreconditionFailed", aws.ToString(matchedError.Code))
}
_, err = client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(okKey),
})
require.Error(t, err, "Successfully deleted key should no longer be current")
_, err = client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(failKey),
})
require.NoError(t, err, "Object with mismatched ETag should remain untouched")
}
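
These cases pin down the per-object precondition gate the delete path now enforces. A minimal sketch of the ETag comparison involved, assuming the handler has already resolved the latest entry's ETag under the object write lock (the helper below is hypothetical, not the actual SeaweedFS code):

package main

import (
	"fmt"
	"strings"
)

// checkIfMatch mirrors the AWS comparison for a conditional delete:
// quotes around ETags are ignored, "*" matches any existing object,
// and a mismatch surfaces as PreconditionFailed (HTTP 412).
func checkIfMatch(latestETag, ifMatch string) error {
	if ifMatch == "" {
		return nil // no precondition supplied
	}
	if ifMatch == "*" {
		return nil // wildcard: satisfied because a current object exists
	}
	trim := func(s string) string { return strings.Trim(s, `"`) }
	if trim(latestETag) != trim(ifMatch) {
		return fmt.Errorf("PreconditionFailed")
	}
	return nil
}

func main() {
	fmt.Println(checkIfMatch(`"abc"`, `"def"`)) // PreconditionFailed
	fmt.Println(checkIfMatch(`"abc"`, `"abc"`)) // <nil>
}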

523  test/s3/distributed_lock/distributed_lock_cluster_test.go

@@ -0,0 +1,523 @@
package distributed_lock
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
distributedLockTestRegion = "us-east-1"
distributedLockTestAccessKey = "some_access_key1"
distributedLockTestSecretKey = "some_secret_key1"
distributedLockTestGroup = "distributed-lock-it"
)
type distributedLockCluster struct {
t testing.TB
baseDir string
configDir string
logsDir string
keepLogs bool
weedBinary string
filerGroup string
s3Config string
masterPort int
masterGrpcPort int
volumePort int
volumeGrpcPort int
filerPorts []int
filerGrpcPorts []int
s3Ports []int
s3GrpcPorts []int
masterCmd *exec.Cmd
volumeCmd *exec.Cmd
filerCmds []*exec.Cmd
s3Cmds []*exec.Cmd
logFiles []*os.File
cleanupOnce sync.Once
}
type s3IdentityConfig struct {
Identities []s3Identity `json:"identities"`
}
type s3Identity struct {
Name string `json:"name"`
Credentials []s3Credential `json:"credentials,omitempty"`
Actions []string `json:"actions"`
}
type s3Credential struct {
AccessKey string `json:"accessKey"`
SecretKey string `json:"secretKey"`
}
func startDistributedLockCluster(t *testing.T) *distributedLockCluster {
t.Helper()
weedBinary, err := framework.FindOrBuildWeedBinary()
require.NoError(t, err, "resolve weed binary")
baseDir, err := os.MkdirTemp("", "seaweedfs_s3_distributed_lock_")
require.NoError(t, err, "create temp directory")
cluster := &distributedLockCluster{
t: t,
baseDir: baseDir,
configDir: filepath.Join(baseDir, "config"),
logsDir: filepath.Join(baseDir, "logs"),
keepLogs: os.Getenv("S3_DISTRIBUTED_LOCK_KEEP_LOGS") == "1",
weedBinary: weedBinary,
filerGroup: distributedLockTestGroup,
filerCmds: make([]*exec.Cmd, 0, 2),
s3Cmds: make([]*exec.Cmd, 0, 2),
}
t.Cleanup(cluster.Stop)
dirs := []string{
cluster.configDir,
cluster.logsDir,
filepath.Join(baseDir, "master"),
filepath.Join(baseDir, "volume"),
filepath.Join(baseDir, "filer0"),
filepath.Join(baseDir, "filer1"),
}
for _, dir := range dirs {
require.NoError(t, os.MkdirAll(dir, 0o755), "create %s", dir)
}
ports, err := allocatePorts(12)
require.NoError(t, err, "allocate ports")
cluster.masterPort = ports[0]
cluster.masterGrpcPort = ports[1]
cluster.volumePort = ports[2]
cluster.volumeGrpcPort = ports[3]
cluster.filerPorts = []int{ports[4], ports[6]}
cluster.filerGrpcPorts = []int{ports[5], ports[7]}
cluster.s3Ports = []int{ports[8], ports[10]}
cluster.s3GrpcPorts = []int{ports[9], ports[11]}
require.NoError(t, cluster.writeSecurityConfig(), "write security config")
require.NoError(t, cluster.writeS3Config(), "write s3 config")
require.NoError(t, cluster.startMaster(), "start master")
require.NoError(t, cluster.waitForHTTP("http://"+cluster.masterHTTPAddress()+"/dir/status", 30*time.Second), "wait for master\n%s", cluster.tailLog("master.log"))
require.NoError(t, cluster.startVolume(), "start volume")
require.NoError(t, cluster.waitForHTTP("http://"+cluster.volumeHTTPAddress()+"/status", 30*time.Second), "wait for volume\n%s", cluster.tailLog("volume.log"))
require.NoError(t, cluster.waitForTCP(cluster.volumeGRPCAddress(), 30*time.Second), "wait for volume grpc\n%s", cluster.tailLog("volume.log"))
for i := 0; i < 2; i++ {
require.NoError(t, cluster.startFiler(i), "start filer %d", i)
require.NoError(t, cluster.waitForTCP(cluster.filerGRPCAddress(i), 30*time.Second), "wait for filer %d grpc\n%s", i, cluster.tailLog(fmt.Sprintf("filer%d.log", i)))
}
require.NoError(t, cluster.waitForFilerCount(2, 30*time.Second), "wait for filer group registration")
for i := 0; i < 2; i++ {
require.NoError(t, cluster.startS3(i), "start s3 %d", i)
client := cluster.newS3Client(t, cluster.s3Endpoint(i))
require.NoError(t, cluster.waitForS3Ready(client, 30*time.Second), "wait for s3 %d\n%s", i, cluster.tailLog(fmt.Sprintf("s3-%d.log", i)))
}
return cluster
}
func (c *distributedLockCluster) Stop() {
if c == nil {
return
}
c.cleanupOnce.Do(func() {
for i := len(c.s3Cmds) - 1; i >= 0; i-- {
stopProcess(c.s3Cmds[i])
}
for i := len(c.filerCmds) - 1; i >= 0; i-- {
stopProcess(c.filerCmds[i])
}
stopProcess(c.volumeCmd)
stopProcess(c.masterCmd)
for _, f := range c.logFiles {
_ = f.Close()
}
if !c.keepLogs && !c.t.Failed() {
_ = os.RemoveAll(c.baseDir)
} else if c.baseDir != "" {
c.t.Logf("distributed lock integration logs kept at %s", c.baseDir)
}
})
}
func (c *distributedLockCluster) masterHTTPAddress() string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort))
}
func (c *distributedLockCluster) masterGRPCAddress() string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterGrpcPort))
}
func (c *distributedLockCluster) volumeHTTPAddress() string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePort))
}
func (c *distributedLockCluster) volumeGRPCAddress() string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPort))
}
func (c *distributedLockCluster) filerServerAddress(index int) pb.ServerAddress {
return pb.NewServerAddress("127.0.0.1", c.filerPorts[index], c.filerGrpcPorts[index])
}
func (c *distributedLockCluster) filerGRPCAddress(index int) string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.filerGrpcPorts[index]))
}
func (c *distributedLockCluster) s3Endpoint(index int) string {
return fmt.Sprintf("http://127.0.0.1:%d", c.s3Ports[index])
}
func (c *distributedLockCluster) startMaster() error {
logFile, err := c.openLog("master.log")
if err != nil {
return err
}
args := []string{
"-config_dir=" + c.configDir,
"master",
"-ip=127.0.0.1",
"-ip.bind=127.0.0.1",
"-port=" + strconv.Itoa(c.masterPort),
"-port.grpc=" + strconv.Itoa(c.masterGrpcPort),
"-mdir=" + filepath.Join(c.baseDir, "master"),
"-peers=none",
"-volumeSizeLimitMB=32",
"-defaultReplication=000",
}
c.masterCmd = exec.Command(c.weedBinary, args...)
c.masterCmd.Dir = c.baseDir
c.masterCmd.Stdout = logFile
c.masterCmd.Stderr = logFile
return c.masterCmd.Start()
}
func (c *distributedLockCluster) startVolume() error {
logFile, err := c.openLog("volume.log")
if err != nil {
return err
}
masterAddress := string(pb.NewServerAddress("127.0.0.1", c.masterPort, c.masterGrpcPort))
args := []string{
"-config_dir=" + c.configDir,
"volume",
"-ip=127.0.0.1",
"-ip.bind=127.0.0.1",
"-port=" + strconv.Itoa(c.volumePort),
"-port.grpc=" + strconv.Itoa(c.volumeGrpcPort),
"-dir=" + filepath.Join(c.baseDir, "volume"),
"-max=16",
"-master=" + masterAddress,
"-readMode=proxy",
}
c.volumeCmd = exec.Command(c.weedBinary, args...)
c.volumeCmd.Dir = c.baseDir
c.volumeCmd.Stdout = logFile
c.volumeCmd.Stderr = logFile
return c.volumeCmd.Start()
}
func (c *distributedLockCluster) startFiler(index int) error {
logFile, err := c.openLog(fmt.Sprintf("filer%d.log", index))
if err != nil {
return err
}
masterAddress := string(pb.NewServerAddress("127.0.0.1", c.masterPort, c.masterGrpcPort))
args := []string{
"-config_dir=" + c.configDir,
"filer",
"-master=" + masterAddress,
"-filerGroup=" + c.filerGroup,
"-ip=127.0.0.1",
"-ip.bind=127.0.0.1",
"-port=" + strconv.Itoa(c.filerPorts[index]),
"-port.grpc=" + strconv.Itoa(c.filerGrpcPorts[index]),
"-defaultStoreDir=" + filepath.Join(c.baseDir, fmt.Sprintf("filer%d", index)),
}
cmd := exec.Command(c.weedBinary, args...)
cmd.Dir = c.baseDir
cmd.Stdout = logFile
cmd.Stderr = logFile
if err := cmd.Start(); err != nil {
return err
}
c.filerCmds = append(c.filerCmds, cmd)
return nil
}
func (c *distributedLockCluster) startS3(index int) error {
logFile, err := c.openLog(fmt.Sprintf("s3-%d.log", index))
if err != nil {
return err
}
filers := []string{string(c.filerServerAddress(0)), string(c.filerServerAddress(1))}
if index%2 == 1 {
filers[0], filers[1] = filers[1], filers[0]
}
args := []string{
"-config_dir=" + c.configDir,
"s3",
"-ip.bind=127.0.0.1",
"-port=" + strconv.Itoa(c.s3Ports[index]),
"-port.grpc=" + strconv.Itoa(c.s3GrpcPorts[index]),
"-port.iceberg=0",
"-filer=" + strings.Join(filers, ","),
"-config=" + c.s3Config,
"-iam.readOnly=false",
}
cmd := exec.Command(c.weedBinary, args...)
cmd.Dir = c.baseDir
cmd.Stdout = logFile
cmd.Stderr = logFile
if err := cmd.Start(); err != nil {
return err
}
c.s3Cmds = append(c.s3Cmds, cmd)
return nil
}
func (c *distributedLockCluster) writeSecurityConfig() error {
return os.WriteFile(filepath.Join(c.configDir, "security.toml"), []byte("# generated for distributed lock integration tests\n"), 0o644)
}
func (c *distributedLockCluster) writeS3Config() error {
configPath := filepath.Join(c.configDir, "s3.json")
payload := s3IdentityConfig{
Identities: []s3Identity{
{
Name: "distributed-lock-admin",
Credentials: []s3Credential{
{
AccessKey: distributedLockTestAccessKey,
SecretKey: distributedLockTestSecretKey,
},
},
Actions: []string{"Admin", "Read", "List", "Tagging", "Write"},
},
},
}
data, err := json.MarshalIndent(payload, "", " ")
if err != nil {
return err
}
if err := os.WriteFile(configPath, data, 0o644); err != nil {
return err
}
c.s3Config = configPath
return nil
}
func (c *distributedLockCluster) newS3Client(t testing.TB, endpoint string) *s3.Client {
t.Helper()
cfg, err := config.LoadDefaultConfig(context.Background(),
config.WithRegion(distributedLockTestRegion),
config.WithRetryMaxAttempts(1),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
distributedLockTestAccessKey,
distributedLockTestSecretKey,
"",
)),
)
require.NoError(t, err, "load aws config")
return s3.NewFromConfig(cfg, func(o *s3.Options) {
o.BaseEndpoint = aws.String(endpoint)
o.UsePathStyle = true
})
}
func (c *distributedLockCluster) waitForS3Ready(client *s3.Client, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := client.ListBuckets(ctx, &s3.ListBucketsInput{})
cancel()
if err == nil {
return nil
}
time.Sleep(200 * time.Millisecond)
}
return fmt.Errorf("timed out waiting for s3 readiness")
}
func (c *distributedLockCluster) waitForFilerCount(expected int, timeout time.Duration) error {
conn, err := grpc.NewClient(c.masterGRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
defer conn.Close()
client := master_pb.NewSeaweedClient(conn)
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{
ClientType: "filer",
FilerGroup: c.filerGroup,
})
cancel()
if err == nil && len(resp.ClusterNodes) >= expected {
return nil
}
time.Sleep(200 * time.Millisecond)
}
return fmt.Errorf("timed out waiting for %d filers in group %q", expected, c.filerGroup)
}
func (c *distributedLockCluster) waitForHTTP(url string, timeout time.Duration) error {
client := &net.Dialer{Timeout: time.Second}
httpClient := &httpClientWithDialer{dialer: client}
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if err := httpClient.Get(url); err == nil {
return nil
}
time.Sleep(200 * time.Millisecond)
}
return fmt.Errorf("timed out waiting for http %s", url)
}
func (c *distributedLockCluster) waitForTCP(addr string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", addr, time.Second)
if err == nil {
_ = conn.Close()
return nil
}
time.Sleep(200 * time.Millisecond)
}
return fmt.Errorf("timed out waiting for tcp %s", addr)
}
func (c *distributedLockCluster) openLog(name string) (*os.File, error) {
f, err := os.Create(filepath.Join(c.logsDir, name))
if err != nil {
return nil, err
}
c.logFiles = append(c.logFiles, f)
return f, nil
}
func (c *distributedLockCluster) tailLog(name string) string {
f, err := os.Open(filepath.Join(c.logsDir, name))
if err != nil {
return ""
}
defer f.Close()
scanner := bufio.NewScanner(f)
lines := make([]string, 0, 40)
for scanner.Scan() {
lines = append(lines, scanner.Text())
if len(lines) > 40 {
lines = lines[1:]
}
}
return strings.Join(lines, "\n")
}
func allocatePorts(count int) ([]int, error) {
listeners := make([]net.Listener, 0, count)
ports := make([]int, 0, count)
for i := 0; i < count; i++ {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
for _, openListener := range listeners {
_ = openListener.Close()
}
return nil, err
}
listeners = append(listeners, l)
ports = append(ports, l.Addr().(*net.TCPAddr).Port)
}
for _, l := range listeners {
_ = l.Close()
}
return ports, nil
}
func stopProcess(cmd *exec.Cmd) {
if cmd == nil || cmd.Process == nil {
return
}
_ = cmd.Process.Signal(os.Interrupt)
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case <-done:
case <-time.After(10 * time.Second):
_ = cmd.Process.Kill()
<-done
}
}
type httpClientWithDialer struct {
dialer *net.Dialer
}
func (h *httpClientWithDialer) Get(url string) error {
client := &http.Client{
Timeout: time.Second,
Transport: &http.Transport{
DialContext: h.dialer.DialContext,
},
}
resp, err := client.Get(url)
if err != nil {
return err
}
_ = resp.Body.Close()
return nil
}

181  test/s3/distributed_lock/distributed_lock_test.go

@@ -0,0 +1,181 @@
package distributed_lock
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestConditionalPutIfNoneMatchDistributedLockAcrossS3Gateways(t *testing.T) {
if testing.Short() {
t.Skip("skipping distributed lock integration test in short mode")
}
cluster := startDistributedLockCluster(t)
clientA := cluster.newS3Client(t, cluster.s3Endpoint(0))
clientB := cluster.newS3Client(t, cluster.s3Endpoint(1))
bucket := fmt.Sprintf("distributed-lock-%d", time.Now().UnixNano())
_, err := clientA.CreateBucket(context.Background(), &s3.CreateBucketInput{
Bucket: aws.String(bucket),
})
require.NoError(t, err)
require.Eventually(t, func() bool {
_, err := clientB.HeadBucket(context.Background(), &s3.HeadBucketInput{
Bucket: aws.String(bucket),
})
return err == nil
}, 30*time.Second, 200*time.Millisecond, "bucket should replicate to the second filer-backed gateway")
keysByOwner := cluster.findLockOwnerKeys(bucket, "conditional-put")
require.Len(t, keysByOwner, len(cluster.filerPorts), "should exercise both filer lock owners")
for owner, key := range keysByOwner {
owner := owner
key := key
t.Run(lockOwnerLabel(owner), func(t *testing.T) {
runConditionalPutRace(t, []s3RaceClient{
{name: "s3-a", client: clientA},
{name: "s3-b", client: clientB},
}, bucket, key)
})
}
}
type s3RaceClient struct {
name string
client *s3.Client
}
type putAttemptResult struct {
clientName string
body string
err error
}
func runConditionalPutRace(t *testing.T, clients []s3RaceClient, bucket, key string) {
t.Helper()
start := make(chan struct{})
results := make(chan putAttemptResult, len(clients)*2)
var wg sync.WaitGroup
for _, client := range clients {
for attempt := 0; attempt < 2; attempt++ {
wg.Add(1)
body := fmt.Sprintf("%s-attempt-%d", client.name, attempt)
go func(client s3RaceClient, body string) {
defer wg.Done()
<-start
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
_, err := client.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
IfNoneMatch: aws.String("*"),
Body: bytes.NewReader([]byte(body)),
})
results <- putAttemptResult{
clientName: client.name,
body: body,
err: err,
}
}(client, body)
}
}
close(start)
wg.Wait()
close(results)
successes := 0
preconditionFailures := 0
winnerBody := ""
unexpectedErrors := make([]string, 0)
for result := range results {
if result.err == nil {
successes++
winnerBody = result.body
continue
}
if isPreconditionFailed(result.err) {
preconditionFailures++
continue
}
unexpectedErrors = append(unexpectedErrors, fmt.Sprintf("%s: %v", result.clientName, result.err))
}
require.Empty(t, unexpectedErrors, "unexpected race errors")
require.Equal(t, 1, successes, "exactly one write should win")
require.Equal(t, len(clients)*2-1, preconditionFailures, "all losing writes should fail with 412")
object, err := clients[0].client.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
require.NoError(t, err)
defer object.Body.Close()
data, err := io.ReadAll(object.Body)
require.NoError(t, err)
assert.Equal(t, winnerBody, string(data), "stored object body should match the successful request")
}
func isPreconditionFailed(err error) bool {
var apiErr smithy.APIError
return errors.As(err, &apiErr) && apiErr.ErrorCode() == "PreconditionFailed"
}
func (c *distributedLockCluster) findLockOwnerKeys(bucket, prefix string) map[pb.ServerAddress]string {
owners := make([]pb.ServerAddress, 0, len(c.filerPorts))
for i := range c.filerPorts {
owners = append(owners, c.filerServerAddress(i))
}
sort.Slice(owners, func(i, j int) bool {
return owners[i] < owners[j]
})
keysByOwner := make(map[pb.ServerAddress]string, len(owners))
for i := 0; i < 1024 && len(keysByOwner) < len(owners); i++ {
key := fmt.Sprintf("%s-%03d.txt", prefix, i)
lockOwner := ownerForObjectLock(bucket, key, owners)
if _, exists := keysByOwner[lockOwner]; !exists {
keysByOwner[lockOwner] = key
}
}
return keysByOwner
}
func ownerForObjectLock(bucket, object string, owners []pb.ServerAddress) pb.ServerAddress {
lockKey := fmt.Sprintf("s3.object.write:/buckets/%s/%s", bucket, s3_constants.NormalizeObjectKey(object))
hash := util.HashStringToLong(lockKey)
if hash < 0 {
hash = -hash
}
return owners[hash%int64(len(owners))]
}
func lockOwnerLabel(owner pb.ServerAddress) string {
replacer := strings.NewReplacer(":", "_", ".", "_")
return "owner_" + replacer.Replace(string(owner))
}

147  test/s3/versioning/s3_copy_versioning_regression_test.go

@@ -0,0 +1,147 @@
package s3api
import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"testing"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func versioningCopySource(bucketName, key string) string {
return fmt.Sprintf("%s/%s", bucketName, url.PathEscape(key))
}
func suspendVersioning(t *testing.T, client *s3.Client, bucketName string) {
_, err := client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{
Bucket: aws.String(bucketName),
VersioningConfiguration: &types.VersioningConfiguration{
Status: types.BucketVersioningStatusSuspended,
},
})
require.NoError(t, err)
}
func TestVersioningSelfCopyMetadataReplaceCreatesNewVersion(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
enableVersioning(t, client, bucketName)
checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
objectKey := "self-copy-versioned.txt"
initialContent := []byte("copy me without changing the body")
putResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
Body: bytes.NewReader(initialContent),
Metadata: map[string]string{"stage": "one"},
})
require.NoError(t, err)
require.NotNil(t, putResp.VersionId)
copyResp, err := client.CopyObject(context.TODO(), &s3.CopyObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
CopySource: aws.String(versioningCopySource(bucketName, objectKey)),
Metadata: map[string]string{"stage": "two"},
MetadataDirective: types.MetadataDirectiveReplace,
})
require.NoError(t, err, "Self-copy with metadata replacement should succeed")
require.NotNil(t, copyResp.VersionId, "Versioned self-copy should create a new version")
require.NotEqual(t, *putResp.VersionId, *copyResp.VersionId, "Self-copy should create a distinct version")
headLatestResp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
require.NoError(t, err)
assert.Equal(t, "two", headLatestResp.Metadata["stage"], "Latest version should expose replaced metadata")
headOriginalResp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
VersionId: putResp.VersionId,
})
require.NoError(t, err)
assert.Equal(t, "one", headOriginalResp.Metadata["stage"], "Previous version metadata should remain intact")
getResp, err := client.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
require.NoError(t, err)
defer getResp.Body.Close()
body, err := io.ReadAll(getResp.Body)
require.NoError(t, err)
assert.Equal(t, initialContent, body, "Self-copy should not alter the object body")
versionsResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Prefix: aws.String(objectKey),
})
require.NoError(t, err)
require.Len(t, versionsResp.Versions, 2, "Self-copy should append a new current version")
assert.Equal(t, *copyResp.VersionId, *versionsResp.Versions[0].VersionId, "New copy version should be latest")
}
func TestVersioningSelfCopyMetadataReplaceSuspendedKeepsNullVersion(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
enableVersioning(t, client, bucketName)
suspendVersioning(t, client, bucketName)
checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusSuspended)
objectKey := "self-copy-suspended.txt"
initialContent := []byte("null version content")
_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
Body: bytes.NewReader(initialContent),
Metadata: map[string]string{"stage": "one"},
})
require.NoError(t, err)
copyResp, err := client.CopyObject(context.TODO(), &s3.CopyObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
CopySource: aws.String(versioningCopySource(bucketName, objectKey)),
Metadata: map[string]string{"stage": "two"},
MetadataDirective: types.MetadataDirectiveReplace,
})
require.NoError(t, err, "Suspended self-copy with metadata replacement should succeed")
assert.Nil(t, copyResp.VersionId, "Suspended versioning should not return a version header for the current null version")
headResp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
require.NoError(t, err)
assert.Equal(t, "two", headResp.Metadata["stage"], "Null current version should be updated in place")
versionsResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Prefix: aws.String(objectKey),
})
require.NoError(t, err)
require.Len(t, versionsResp.Versions, 1, "Suspended self-copy should keep a single null current version")
require.NotNil(t, versionsResp.Versions[0].VersionId)
assert.Equal(t, "null", *versionsResp.Versions[0].VersionId, "Suspended self-copy should preserve null-version semantics")
assert.True(t, *versionsResp.Versions[0].IsLatest, "Null version should remain latest")
}

82  test/s3/versioning/s3_suspended_delete_marker_regression_test.go

@@ -0,0 +1,82 @@
package s3api
import (
"bytes"
"context"
"io"
"testing"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSuspendedDeleteCreatesDeleteMarker(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
enableVersioning(t, client, bucketName)
objectKey := "suspended-delete-marker.txt"
versionedResp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
Body: bytes.NewReader([]byte("versioned-content")),
})
require.NoError(t, err)
require.NotNil(t, versionedResp.VersionId)
suspendVersioning(t, client, bucketName)
_, err = client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
Body: bytes.NewReader([]byte("null-version-content")),
})
require.NoError(t, err)
deleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
require.NoError(t, err)
require.NotNil(t, deleteResp.DeleteMarker)
assert.True(t, *deleteResp.DeleteMarker)
require.NotNil(t, deleteResp.VersionId)
listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err)
require.Len(t, listResp.DeleteMarkers, 1)
deleteMarker := listResp.DeleteMarkers[0]
require.NotNil(t, deleteMarker.Key)
assert.Equal(t, objectKey, *deleteMarker.Key)
require.NotNil(t, deleteMarker.VersionId)
assert.Equal(t, *deleteResp.VersionId, *deleteMarker.VersionId)
require.NotNil(t, deleteMarker.IsLatest)
assert.True(t, *deleteMarker.IsLatest)
_, err = client.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
require.Error(t, err)
getVersionedResp, err := client.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
VersionId: versionedResp.VersionId,
})
require.NoError(t, err)
defer getVersionedResp.Body.Close()
body, err := io.ReadAll(getVersionedResp.Body)
require.NoError(t, err)
assert.Equal(t, "versioned-content", string(body))
}

79  test/s3/versioning/s3_versioning_multipart_test.go

@@ -499,12 +499,16 @@ func TestMultipartUploadDeleteMarkerListBehavior(t *testing.T) {
t.Logf("Successfully retrieved version %s after delete marker", multipartVersionId)
// Delete the delete marker to "undelete" the object
_, err = client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
undeleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
VersionId: aws.String(deleteMarkerVersionId),
})
require.NoError(t, err, "Failed to delete the delete marker")
require.NotNil(t, undeleteResp.DeleteMarker, "Deleting a delete marker version should report DeleteMarker=true")
assert.True(t, *undeleteResp.DeleteMarker, "Deleting a delete marker version should report DeleteMarker=true")
require.NotNil(t, undeleteResp.VersionId, "Deleting a delete marker version should echo the version ID")
assert.Equal(t, deleteMarkerVersionId, *undeleteResp.VersionId, "DeleteObject should report the deleted delete-marker version ID")
// ListObjectsV2 should show the object again
listAfterUndelete, err := client.ListObjectsV2(context.TODO(), &s3.ListObjectsV2Input{
@@ -518,3 +522,76 @@ func TestMultipartUploadDeleteMarkerListBehavior(t *testing.T) {
t.Logf("Object restored after delete marker removal, ETag=%s", multipartETag)
}
func TestVersioningCompleteMultipartUploadIsIdempotent(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
enableVersioning(t, client, bucketName)
checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
objectKey := "multipart-idempotent-object"
createResp, err := client.CreateMultipartUpload(context.TODO(), &s3.CreateMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
})
require.NoError(t, err, "Failed to create multipart upload")
partSize := 5 * 1024 * 1024
part1Data := bytes.Repeat([]byte("i"), partSize)
part2Data := bytes.Repeat([]byte("j"), partSize)
uploadPart1Resp, err := client.UploadPart(context.TODO(), &s3.UploadPartInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
UploadId: createResp.UploadId,
PartNumber: aws.Int32(1),
Body: bytes.NewReader(part1Data),
})
require.NoError(t, err, "Failed to upload first part")
uploadPart2Resp, err := client.UploadPart(context.TODO(), &s3.UploadPartInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
UploadId: createResp.UploadId,
PartNumber: aws.Int32(2),
Body: bytes.NewReader(part2Data),
})
require.NoError(t, err, "Failed to upload second part")
completeInput := &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
UploadId: createResp.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: []types.CompletedPart{
{ETag: uploadPart1Resp.ETag, PartNumber: aws.Int32(1)},
{ETag: uploadPart2Resp.ETag, PartNumber: aws.Int32(2)},
},
},
}
firstCompleteResp, err := client.CompleteMultipartUpload(context.TODO(), completeInput)
require.NoError(t, err, "First CompleteMultipartUpload should succeed")
require.NotNil(t, firstCompleteResp.ETag)
require.NotNil(t, firstCompleteResp.VersionId)
secondCompleteResp, err := client.CompleteMultipartUpload(context.TODO(), completeInput)
require.NoError(t, err, "Repeated CompleteMultipartUpload should return the existing object instead of creating a second version")
require.NotNil(t, secondCompleteResp.ETag)
require.NotNil(t, secondCompleteResp.VersionId, "Repeated complete should report the existing version ID")
assert.Equal(t, *firstCompleteResp.ETag, *secondCompleteResp.ETag, "Repeated complete should report the same ETag")
assert.Equal(t, *firstCompleteResp.VersionId, *secondCompleteResp.VersionId, "Repeated complete should report the same version ID")
versionsResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Prefix: aws.String(objectKey),
})
require.NoError(t, err, "Failed to list object versions")
require.Len(t, versionsResp.Versions, 1, "Repeated completion must not create a duplicate version")
assert.Equal(t, *firstCompleteResp.VersionId, *versionsResp.Versions[0].VersionId, "The original multipart version should remain current")
assert.Empty(t, versionsResp.DeleteMarkers, "Repeated completion should not create delete markers")
}
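
The idempotency matters because SDK retry policies can resend CompleteMultipartUpload after a timeout even though the first attempt already finalized the object. A hedged sketch of the client pattern this protects, assuming the same aws-sdk-go-v2 client as above (the helper name and retry count are illustrative):

package multipart

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// completeWithRetry resends CompleteMultipartUpload on transient errors.
// Against an idempotent server, a replayed completion returns the
// already-finalized object's ETag and VersionId instead of minting a
// duplicate version or a delete marker.
func completeWithRetry(ctx context.Context, client *s3.Client, in *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
	var lastErr error
	for attempt := 0; attempt < 3; attempt++ {
		out, err := client.CompleteMultipartUpload(ctx, in)
		if err == nil {
			return out, nil
		}
		lastErr = err
	}
	return nil, lastErr
}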

510  weed/s3api/filer_multipart.go

@@ -14,7 +14,6 @@ import (
"net/url"
"path"
"slices"
"sort"
"strconv"
"strings"
"time"
@@ -166,63 +165,73 @@ func copySSEHeadersFromFirstPart(dst *filer_pb.Entry, firstPart *filer_pb.Entry,
}
}
func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
type multipartPartBoundary struct {
PartNumber int `json:"part"`
StartChunk int `json:"start"`
EndChunk int `json:"end"`
ETag string `json:"etag"`
}
glog.V(2).Infof("completeMultipartUpload input %v", input)
if len(parts.Parts) == 0 {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
completedPartNumbers := []int{}
completedPartMap := make(map[int][]string)
type multipartCompletionState struct {
deleteEntries []*filer_pb.Entry
partEntries map[int][]*filer_pb.Entry
pentry *filer_pb.Entry
mime string
finalParts []*filer_pb.FileChunk
offset int64
partBoundaries []multipartPartBoundary
multipartETag string
entityWithTtl bool
}
maxPartNo := 1
func completeMultipartResult(r *http.Request, input *s3.CompleteMultipartUploadInput, etag string, entry *filer_pb.Entry) *CompleteMultipartUploadResult {
result := &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String(etag),
Key: objectKey(input.Key),
}
if entry != nil && entry.Extended != nil {
if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
versionId := string(versionIdBytes)
if versionId != "" && versionId != "null" {
result.VersionId = aws.String(versionId)
}
}
}
return result
}
for _, part := range parts.Parts {
if _, ok := completedPartMap[part.PartNumber]; !ok {
completedPartNumbers = append(completedPartNumbers, part.PartNumber)
func (s3a *S3ApiServer) prepareMultipartCompletionState(r *http.Request, input *s3.CompleteMultipartUploadInput, uploadDirectory, entryName, dirName string, completedPartNumbers []int, completedPartMap map[int][]string, maxPartNo int) (*multipartCompletionState, *CompleteMultipartUploadResult, s3err.ErrorCode) {
if entry, err := s3a.resolveObjectEntry(*input.Bucket, *input.Key); err == nil && entry != nil && entry.Extended != nil {
if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) {
cleanupEntries, _, cleanupErr := s3a.list(uploadDirectory, "", "", false, s3_constants.MaxS3MultipartParts+1)
if cleanupErr != nil && !errors.Is(cleanupErr, filer_pb.ErrNotFound) {
glog.Warningf("completeMultipartUpload: failed to list stale upload directory %s for cleanup: %v", uploadDirectory, cleanupErr)
}
return &multipartCompletionState{deleteEntries: cleanupEntries}, completeMultipartResult(r, input, getEtagFromEntry(entry), entry), s3err.ErrNone
}
completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag)
maxPartNo = maxInt(maxPartNo, part.PartNumber)
}
sort.Ints(completedPartNumbers)
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
// Use explicit limit to ensure all parts are listed (up to S3's max of 10,000 parts)
// Previously limit=0 relied on server's DirListingLimit default (1000 in weed server mode),
// which caused CompleteMultipartUpload to fail for uploads with more than 1000 parts.
entries, _, err := s3a.list(uploadDirectory, "", "", false, s3_constants.MaxS3MultipartParts+1)
if err != nil {
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
return nil, nil, s3err.ErrNoSuchUpload
}
if len(entries) == 0 {
entryName, dirName := s3a.getEntryNameAndDir(input)
if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil {
if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) {
// Location uses the S3 endpoint that the client connected to
// Format: scheme://s3-endpoint/bucket/object (following AWS S3 API)
return &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String(getEtagFromEntry(entry)),
Key: objectKey(input.Key),
}, s3err.ErrNone
}
}
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
return nil, nil, s3err.ErrNoSuchUpload
}
pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
if err != nil {
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
return nil, nil, s3err.ErrNoSuchUpload
}
deleteEntries := []*filer_pb.Entry{}
deleteEntries := make([]*filer_pb.Entry, 0)
partEntries := make(map[int][]*filer_pb.Entry, len(entries))
entityTooSmall := false
entityWithTtl := false
@@ -232,10 +241,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) {
continue
}
partNumber, err := parsePartNumber(entry.Name)
if err != nil {
partNumber, parseErr := parsePartNumber(entry.Name)
if parseErr != nil {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc()
glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", entry.Name, err)
glog.Errorf("completeMultipartUpload failed to parse partNumber %s:%s", entry.Name, parseErr)
continue
}
completedPartsByNumber, ok := completedPartMap[partNumber]
@@ -259,7 +268,6 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc()
continue
}
//there maybe multi same part, because of client retry
partEntries[partNumber] = append(partEntries[partNumber], entry)
foundEntry = true
}
@@ -278,27 +286,23 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
}
if entityTooSmall {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompleteEntityTooSmall).Inc()
return nil, s3err.ErrEntityTooSmall
return nil, nil, s3err.ErrEntityTooSmall
}
mime := pentry.Attributes.Mime
var finalParts []*filer_pb.FileChunk
var offset int64
// Track part boundaries for later retrieval with PartNumber parameter
type PartBoundary struct {
PartNumber int `json:"part"`
StartChunk int `json:"start"`
EndChunk int `json:"end"` // exclusive
ETag string `json:"etag"`
mime := ""
if pentry.Attributes != nil {
mime = pentry.Attributes.Mime
}
var partBoundaries []PartBoundary
finalParts := make([]*filer_pb.FileChunk, 0)
partBoundaries := make([]multipartPartBoundary, 0, len(completedPartNumbers))
var offset int64
for _, partNumber := range completedPartNumbers {
partEntriesByNumber, ok := partEntries[partNumber]
if !ok {
glog.Errorf("part %d has no entry", partNumber)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc()
return nil, s3err.ErrInvalidPart
return nil, nil, s3err.ErrInvalidPart
}
found := false
@@ -312,20 +316,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
continue
}
// Record the start chunk index for this part
partStartChunk := len(finalParts)
// Calculate the part's ETag (for GetObject with PartNumber)
partETag := filer.ETag(entry)
for _, chunk := range entry.GetChunks() {
// CRITICAL: Do NOT modify SSE metadata offsets during assembly!
// The encrypted data was created with the offset stored in chunk.SseMetadata.
// Changing the offset here would cause decryption to fail because CTR mode
// uses the offset to initialize the counter. We must decrypt with the same
// offset that was used during encryption.
p := &filer_pb.FileChunk{
finalParts = append(finalParts, &filer_pb.FileChunk{
FileId: chunk.GetFileIdString(),
Offset: offset,
Size: chunk.Size,
@@ -333,17 +328,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
CipherKey: chunk.CipherKey,
ETag: chunk.ETag,
IsCompressed: chunk.IsCompressed,
// Preserve SSE metadata UNCHANGED - do not modify the offset!
SseType: chunk.SseType,
SseMetadata: chunk.SseMetadata,
}
finalParts = append(finalParts, p)
SseType: chunk.SseType,
SseMetadata: chunk.SseMetadata,
})
offset += int64(chunk.Size)
}
// Record the part boundary
partEndChunk := len(finalParts)
partBoundaries = append(partBoundaries, PartBoundary{
partBoundaries = append(partBoundaries, multipartPartBoundary{
PartNumber: partNumber,
StartChunk: partStartChunk,
EndChunk: partEndChunk,
@@ -354,165 +346,218 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
}
}
return &multipartCompletionState{
deleteEntries: deleteEntries,
partEntries: partEntries,
pentry: pentry,
mime: mime,
finalParts: finalParts,
offset: offset,
partBoundaries: partBoundaries,
multipartETag: calculateMultipartETag(partEntries, completedPartNumbers),
entityWithTtl: entityWithTtl,
}, nil, s3err.ErrNone
}
func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("completeMultipartUpload input %v", input)
if len(parts.Parts) == 0 {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
completedPartNumbers := []int{}
completedPartMap := make(map[int][]string)
maxPartNo := 1
lastSeenPartNo := 0
for _, part := range parts.Parts {
if part.PartNumber < lastSeenPartNo {
return nil, s3err.ErrInvalidPartOrder
}
lastSeenPartNo = part.PartNumber
if _, ok := completedPartMap[part.PartNumber]; !ok {
completedPartNumbers = append(completedPartNumbers, part.PartNumber)
}
completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag)
maxPartNo = maxInt(maxPartNo, part.PartNumber)
}
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
entryName, dirName := s3a.getEntryNameAndDir(input)
var completionState *multipartCompletionState
finalizeCode := s3a.withObjectWriteLock(*input.Bucket, *input.Key, func() s3err.ErrorCode {
return s3a.checkConditionalHeaders(r, *input.Bucket, *input.Key)
}, func() s3err.ErrorCode {
var prepCode s3err.ErrorCode
completionState, output, prepCode = s3a.prepareMultipartCompletionState(r, input, uploadDirectory, entryName, dirName, completedPartNumbers, completedPartMap, maxPartNo)
if prepCode != s3err.ErrNone || output != nil {
return prepCode
}
// Precompute ETag once for consistency across all paths
multipartETag := calculateMultipartETag(partEntries, completedPartNumbers)
etagQuote := "\"" + multipartETag + "\""
// Check if versioning is configured for this bucket BEFORE creating any files
versioningState, vErr := s3a.getVersioningState(*input.Bucket)
if vErr == nil && versioningState == s3_constants.VersioningEnabled {
// Use full object key (not just entryName) to ensure correct .versions directory is checked
normalizedKey := strings.TrimPrefix(*input.Key, "/")
useInvertedFormat := s3a.getVersionIdFormat(*input.Bucket, normalizedKey)
versionId := generateVersionId(useInvertedFormat)
versionFileName := s3a.getVersionFileName(versionId)
versionDir := dirName + "/" + entryName + s3_constants.VersionsFolder
// Capture timestamp and owner once for consistency between version entry and cache entry
versionMtime := time.Now().Unix()
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
etagQuote := "\"" + completionState.multipartETag + "\""
// Check if versioning is configured for this bucket BEFORE creating any files.
versioningState, vErr := s3a.getVersioningState(*input.Bucket)
if vErr != nil {
glog.Errorf("completeMultipartUpload: failed to get versioning state for bucket %s: %v", *input.Bucket, vErr)
return s3err.ErrInternalError
}
if versioningState == s3_constants.VersioningEnabled {
// Use full object key (not just entryName) to ensure correct .versions directory is checked
normalizedKey := strings.TrimPrefix(*input.Key, "/")
useInvertedFormat := s3a.getVersionIdFormat(*input.Bucket, normalizedKey)
versionId := generateVersionId(useInvertedFormat)
versionFileName := s3a.getVersionFileName(versionId)
versionDir := dirName + "/" + entryName + s3_constants.VersionsFolder
// Capture timestamp and owner once for consistency between version entry and cache entry
versionMtime := time.Now().Unix()
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
// Create the version file in the .versions directory
err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) {
if versionEntry.Extended == nil {
versionEntry.Extended = make(map[string][]byte)
}
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Store parts count for x-amz-mp-parts-count header
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil {
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
// Create the version file in the .versions directory
if err := s3a.mkFile(versionDir, versionFileName, completionState.finalParts, func(versionEntry *filer_pb.Entry) {
if versionEntry.Extended == nil {
versionEntry.Extended = make(map[string][]byte)
}
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Store parts count for x-amz-mp-parts-count header
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, err := json.Marshal(completionState.partBoundaries); err == nil {
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for versioned multipart objects
if amzAccountId != "" {
versionEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range completionState.pentry.Extended {
if k != s3_constants.ExtMultipartObjectKey {
versionEntry.Extended[k] = v
}
}
// Persist ETag to ensure subsequent HEAD/GET uses the same value
versionEntry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag)
// Preserve ALL SSE metadata from the first part (if any)
// SSE metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0]
copySSEHeadersFromFirstPart(versionEntry, firstPartEntry, "versioned")
}
if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" {
versionEntry.Attributes.Mime = completionState.pentry.Attributes.Mime
} else if completionState.mime != "" {
versionEntry.Attributes.Mime = completionState.mime
}
versionEntry.Attributes.FileSize = uint64(completionState.offset)
versionEntry.Attributes.Mtime = versionMtime
}); err != nil {
glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err)
return s3err.ErrInternalError
}
// Set object owner for versioned multipart objects
// Construct entry with metadata for caching in .versions directory
// Reuse versionMtime to keep list vs. HEAD timestamps aligned
// multipartETag is precomputed
versionEntryForCache := &filer_pb.Entry{
Attributes: &filer_pb.FuseAttributes{
FileSize: uint64(completionState.offset),
			Mtime:    versionMtime,
		},
		Extended: map[string][]byte{
			s3_constants.ExtETagKey: []byte(completionState.multipartETag),
		},
	}
	if err := s3a.mkFile(versionDir, versionFileName, completionState.finalParts, func(versionEntry *filer_pb.Entry) {
		if versionEntry.Extended == nil {
			versionEntry.Extended = make(map[string][]byte)
		}
		versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
		versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
		// Store parts count for x-amz-mp-parts-count header
		versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
		// Store part boundaries for GetObject with PartNumber
		if partBoundariesJSON, jsonErr := json.Marshal(completionState.partBoundaries); jsonErr == nil {
			versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
		}
		if amzAccountId != "" {
			versionEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
			versionEntryForCache.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
		}
		for k, v := range completionState.pentry.Extended {
			if k != s3_constants.ExtMultipartObjectKey {
				versionEntry.Extended[k] = v
			}
		}
		// Preserve ALL SSE metadata from the first part (if any)
		// SSE metadata is stored in individual parts, not the upload directory
		if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 {
			firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0]
			copySSEHeadersFromFirstPart(versionEntry, firstPartEntry, "versioned")
		}
		// Persist ETag to ensure subsequent HEAD/GET uses the same value
		versionEntry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag)
		if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" {
			versionEntry.Attributes.Mime = completionState.pentry.Attributes.Mime
		} else if completionState.mime != "" {
			versionEntry.Attributes.Mime = completionState.mime
		}
		versionEntry.Attributes.FileSize = uint64(completionState.offset)
		// Reuse versionMtime to keep list vs. HEAD timestamps aligned
		versionEntry.Attributes.Mtime = versionMtime
	}); err != nil {
		glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err)
		return s3err.ErrInternalError
	}
	// Update the .versions directory metadata to indicate this is the latest version
	// Pass entry to cache its metadata for single-scan list efficiency
	if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil {
		if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil {
			glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after latest pointer update error: %v", versionId, *input.Bucket, *input.Key, rollbackErr)
		}
		glog.Errorf("completeMultipartUpload: failed to update latest version in directory: %v", err)
		return s3err.ErrInternalError
	}
	// For versioned buckets, all content is stored in .versions directory
	// The latest version information is tracked in the .versions directory metadata
	output = &CompleteMultipartUploadResult{
		Location:  aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
		Bucket:    input.Bucket,
		ETag:      aws.String(etagQuote),
		Key:       objectKey(input.Key),
		VersionId: aws.String(versionId),
	}
} else if versioningState == s3_constants.VersioningSuspended {
	// For suspended versioning, add "null" version ID metadata and return "null" version ID
	if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) {
		if entry.Extended == nil {
			entry.Extended = make(map[string][]byte)
		}
		entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
		entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
		// Store parts count for x-amz-mp-parts-count header
		entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
		// Store part boundaries for GetObject with PartNumber
		if partBoundariesJSON, jsonErr := json.Marshal(completionState.partBoundaries); jsonErr == nil {
			entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
		}
		// Set object owner for suspended versioning multipart objects
		amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
		if amzAccountId != "" {
			entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
		}
		for k, v := range completionState.pentry.Extended {
			if k != s3_constants.ExtMultipartObjectKey {
				entry.Extended[k] = v
			}
		}
		// Preserve ALL SSE metadata from the first part (if any)
		// SSE metadata is stored in individual parts, not the upload directory
		if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 {
			firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0]
			copySSEHeadersFromFirstPart(entry, firstPartEntry, "suspended versioning")
		}
		// Persist ETag to ensure subsequent HEAD/GET uses the same value
		entry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag)
		if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" {
			entry.Attributes.Mime = completionState.pentry.Attributes.Mime
		} else if completionState.mime != "" {
			entry.Attributes.Mime = completionState.mime
		}
		entry.Attributes.FileSize = uint64(completionState.offset)
	}); err != nil {
		glog.Errorf("completeMultipartUpload: failed to create suspended versioning object: %v", err)
		return s3err.ErrInternalError
	}
	// Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec
	output = &CompleteMultipartUploadResult{
		Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
		Bucket:   input.Bucket,
		ETag:     aws.String(etagQuote),
		Key:      objectKey(input.Key),
		// VersionId field intentionally omitted for suspended versioning
	}
} else {
	// For non-versioned buckets, create main object file
	if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) {
		if entry.Extended == nil {
			entry.Extended = make(map[string][]byte)
		}
		entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
		// Store parts count for x-amz-mp-parts-count header
		entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
		// Store part boundaries for GetObject with PartNumber
		if partBoundariesJSON, jsonErr := json.Marshal(completionState.partBoundaries); jsonErr == nil {
			entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
		}
		amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
		if amzAccountId != "" {
			entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
		}
		for k, v := range completionState.pentry.Extended {
			if k != s3_constants.ExtMultipartObjectKey {
				entry.Extended[k] = v
			}
		}
		// Preserve ALL SSE metadata from the first part (if any)
		// SSE metadata is stored in individual parts, not the upload directory
		if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 {
			firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0]
			copySSEHeadersFromFirstPart(entry, firstPartEntry, "non-versioned")
		}
		// Persist ETag to ensure subsequent HEAD/GET uses the same value
		entry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag)
		if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" {
			entry.Attributes.Mime = completionState.pentry.Attributes.Mime
		} else if completionState.mime != "" {
			entry.Attributes.Mime = completionState.mime
		}
		entry.Attributes.FileSize = uint64(completionState.offset)
		// Set TTL-based S3 expiry (modification time)
		if completionState.entityWithTtl {
			entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
		}
	}); err != nil {
		glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
		return s3err.ErrInternalError
	}
	// For non-versioned buckets, return response without VersionId
	output = &CompleteMultipartUploadResult{
		Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
		Bucket:   input.Bucket,
		ETag:     aws.String(etagQuote),
		Key:      objectKey(input.Key),
	}
}
return s3err.ErrNone
})
if finalizeCode != s3err.ErrNone {
	return nil, finalizeCode
}
if completionState != nil {
	for _, deleteEntry := range completionState.deleteEntries {
		//delete unused part data
		if err := s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil {
			glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err)
		}
	}
	if err := s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil {
		glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
	}
}
return
}
func (s3a *S3ApiServer) rollbackMultipartVersion(versionDir, versionFileName string) error {
	return s3a.rmObject(versionDir, versionFileName, true, false)
}
func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInput) (string, string) {
entryName := path.Base(*input.Key)
dirName := path.Dir(*input.Key)

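A client-visible consequence of the revalidated, locked finalization above: replaying CompleteMultipartUpload with an identical part list is idempotent instead of erroring or minting a second version. A minimal test-style sketch with the AWS SDK v2, assuming client, bucket, key, uploadId, and parts were set up by the surrounding test harness:

// Both completions should succeed and agree on the ETag (and on the
// VersionId when the bucket is versioned).
input := &s3.CompleteMultipartUploadInput{
	Bucket:          aws.String(bucket),
	Key:             aws.String(key),
	UploadId:        aws.String(uploadId),
	MultipartUpload: &types.CompletedMultipartUpload{Parts: parts},
}
first, err := client.CompleteMultipartUpload(context.TODO(), input)
require.NoError(t, err)
replay, err := client.CompleteMultipartUpload(context.TODO(), input)
require.NoError(t, err, "replaying an identical completion should be idempotent")
assert.Equal(t, aws.ToString(first.ETag), aws.ToString(replay.ETag))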
75
weed/s3api/filer_multipart_test.go

@ -2,6 +2,7 @@ package s3api
import (
"encoding/hex"
"net/http"
"testing"
"time"
@ -54,6 +55,42 @@ func TestListPartsResult(t *testing.T) {
}
func TestCompleteMultipartResultIncludesVersionId(t *testing.T) {
r := &http.Request{Host: "localhost", Header: make(http.Header)}
input := &s3.CompleteMultipartUploadInput{
Bucket: aws.String("example-bucket"),
Key: aws.String("example-object"),
}
entry := &filer_pb.Entry{
Extended: map[string][]byte{
s3_constants.ExtVersionIdKey: []byte("version-123"),
},
}
result := completeMultipartResult(r, input, "\"etag-value\"", entry)
if assert.NotNil(t, result.VersionId) {
assert.Equal(t, "version-123", *result.VersionId)
}
}
func TestCompleteMultipartResultOmitsNullVersionId(t *testing.T) {
r := &http.Request{Host: "localhost", Header: make(http.Header)}
input := &s3.CompleteMultipartUploadInput{
Bucket: aws.String("example-bucket"),
Key: aws.String("example-object"),
}
entry := &filer_pb.Entry{
Extended: map[string][]byte{
s3_constants.ExtVersionIdKey: []byte("null"),
},
}
result := completeMultipartResult(r, input, "\"etag-value\"", entry)
assert.Nil(t, result.VersionId)
}
func Test_parsePartNumber(t *testing.T) {
tests := []struct {
name string
@ -190,3 +227,41 @@ func TestValidateCompletePartETag(t *testing.T) {
assert.True(t, invalid)
})
}
func TestCompleteMultipartUploadRejectsOutOfOrderParts(t *testing.T) {
s3a := NewS3ApiServerForTest()
input := &s3.CompleteMultipartUploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("object"),
UploadId: aws.String("upload"),
}
parts := &CompleteMultipartUpload{
Parts: []CompletedPart{
{PartNumber: 2, ETag: "\"etag-2\""},
{PartNumber: 1, ETag: "\"etag-1\""},
},
}
result, errCode := s3a.completeMultipartUpload(&http.Request{Header: make(http.Header)}, input, parts)
assert.Nil(t, result)
assert.Equal(t, s3err.ErrInvalidPartOrder, errCode)
}
func TestCompleteMultipartUploadAllowsDuplicatePartNumbers(t *testing.T) {
s3a := NewS3ApiServerForTest()
input := &s3.CompleteMultipartUploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("object"),
UploadId: aws.String("upload"),
}
parts := &CompleteMultipartUpload{
Parts: []CompletedPart{
{PartNumber: 1, ETag: "\"etag-older\""},
{PartNumber: 1, ETag: "\"etag-newer\""},
},
}
result, errCode := s3a.completeMultipartUpload(&http.Request{Header: make(http.Header)}, input, parts)
assert.Nil(t, result)
assert.Equal(t, s3err.ErrNoSuchUpload, errCode)
}
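Because out-of-order completions are now rejected with ErrInvalidPartOrder, clients that upload parts concurrently and collect results as they finish should sort before completing. A sketch, assuming parts is a []types.CompletedPart from the AWS SDK v2:

// CompleteMultipartUpload requires ascending part numbers; duplicates of
// the same part number remain allowed, but the sequence must not decrease.
sort.Slice(parts, func(i, j int) bool {
	return aws.ToInt32(parts[i].PartNumber) < aws.ToInt32(parts[j].PartNumber)
})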

54
weed/s3api/s3api_conditional_headers_test.go

@ -778,6 +778,7 @@ func NewS3ApiServerForTest() *S3ApiServer {
option: &S3ApiServerOption{
BucketsPath: "/buckets",
},
bucketConfigCache: NewBucketConfigCache(60 * time.Minute),
}
}
@ -928,3 +929,56 @@ func TestConditionalHeadersMultipartUpload(t *testing.T) {
}
})
}
func TestConditionalHeadersTreatDeleteMarkerAsMissing(t *testing.T) {
bucket := "test-bucket"
object := "/deleted-object"
deleteMarkerEntry := &filer_pb.Entry{
Name: "deleted-object",
Extended: map[string][]byte{
s3_constants.ExtDeleteMarkerKey: []byte("true"),
},
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC).Unix(),
},
}
t.Run("WriteIfNoneMatchAsteriskSucceeds", func(t *testing.T) {
getter := createMockEntryGetter(deleteMarkerEntry)
req := createTestPutRequest(bucket, object, "new content")
req.Header.Set(s3_constants.IfNoneMatch, "*")
s3a := NewS3ApiServerForTest()
errCode := s3a.checkConditionalHeadersWithGetter(getter, req, bucket, object)
if errCode != s3err.ErrNone {
t.Fatalf("expected ErrNone for delete marker with If-None-Match=*, got %v", errCode)
}
})
t.Run("WriteIfMatchAsteriskFails", func(t *testing.T) {
getter := createMockEntryGetter(deleteMarkerEntry)
req := createTestPutRequest(bucket, object, "new content")
req.Header.Set(s3_constants.IfMatch, "*")
s3a := NewS3ApiServerForTest()
errCode := s3a.checkConditionalHeadersWithGetter(getter, req, bucket, object)
if errCode != s3err.ErrPreconditionFailed {
t.Fatalf("expected ErrPreconditionFailed for delete marker with If-Match=*, got %v", errCode)
}
})
t.Run("ReadIfMatchAsteriskFails", func(t *testing.T) {
getter := createMockEntryGetter(deleteMarkerEntry)
req := &http.Request{Method: http.MethodGet, Header: make(http.Header)}
req.Header.Set(s3_constants.IfMatch, "*")
s3a := NewS3ApiServerForTest()
result := s3a.checkConditionalHeadersForReadsWithGetter(getter, req, bucket, object)
if result.ErrorCode != s3err.ErrPreconditionFailed {
t.Fatalf("expected ErrPreconditionFailed for read against delete marker with If-Match=*, got %v", result.ErrorCode)
}
if result.Entry != nil {
t.Fatalf("expected no entry to be returned for delete marker, got %#v", result.Entry)
}
})
}
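In client terms: when the latest version of a key is a delete marker, the key reads as absent, so create-only writes pass and existence-asserting conditions fail. A hedged SDK v2 sketch; the IfNoneMatch field on PutObjectInput is assumed to be available in the SDK version in use:

// Succeeds: behind a delete marker the key is treated as missing,
// so the create-only condition holds.
_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
	Bucket:      aws.String(bucket),
	Key:         aws.String(key),
	Body:        strings.NewReader("new content"),
	IfNoneMatch: aws.String("*"),
})
require.NoError(t, err)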

403
weed/s3api/s3api_object_handlers_copy.go

@ -5,6 +5,7 @@ import (
"context"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
@ -24,6 +25,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"google.golang.org/protobuf/proto"
)
const (
@ -81,33 +83,6 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
replaceMeta, replaceTagging := replaceDirective(r.Header)
if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) {
fullPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), dstObject))
dir, name := fullPath.DirAndName()
entry, err := s3a.getEntry(dir, name)
if err != nil || entry.IsDirectory {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
entry.Extended, err = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
entry.Attributes.Mtime = t.Unix()
if err != nil {
glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
return
}
err = s3a.touch(dir, name, entry)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
writeSuccessResponseXML(w, r, CopyObjectResult{
ETag: filer.ETag(entry),
LastModified: t,
})
return
}
// If source object is empty or bucket is empty, reply back invalid copy source.
if srcObject == "" || srcBucket == "" {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
@ -122,38 +97,14 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
return
}
// Get the source entry with version awareness based on versioning state
var entry *filer_pb.Entry
if srcVersionId != "" {
// Specific version requested - always use version-aware retrieval
entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId)
} else if srcVersioningState == s3_constants.VersioningEnabled {
// Versioning enabled - get latest version from .versions directory
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
} else if srcVersioningState == s3_constants.VersioningSuspended {
// Versioning suspended - current object is stored as regular file ("null" version)
// Try regular file first, fall back to latest version if needed
srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject))
dir, name := srcPath.DirAndName()
entry, err = s3a.getEntry(dir, name)
if err != nil {
// If regular file doesn't exist, try latest version as fallback
glog.V(2).Infof("CopyObject: regular file not found for suspended versioning, trying latest version")
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
}
} else {
// No versioning configured - use regular retrieval
srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject))
dir, name := srcPath.DirAndName()
entry, err = s3a.getEntry(dir, name)
}
entry, err := s3a.resolveCopySourceEntry(srcBucket, srcObject, srcVersionId, srcVersioningState)
if err != nil || entry.IsDirectory {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
sameDestination := srcBucket == dstBucket && srcObject == dstObject
if sameDestination && !(replaceMeta || replaceTagging) {
	s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopyDest)
	return
}
@ -163,6 +114,10 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
s3err.WriteErrorResponse(w, r, err)
return
}
if errCode := s3a.checkConditionalHeaders(r, dstBucket, dstObject); errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
// Validate encryption parameters
if err := ValidateCopyEncryption(entry.Extended, r.Header); err != nil {
@ -172,6 +127,62 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
return
}
dstVersioningState, err := s3a.getVersioningState(dstBucket)
if err != nil {
glog.Errorf("Error checking versioning state for destination bucket %s: %v", dstBucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if sameDestination && (replaceMeta || replaceTagging) && s3a.canUseMetadataOnlySelfCopy(entry, r, dstBucket, dstObject) {
var dstVersionId string
var etag string
updateCode := s3a.withObjectWriteLock(dstBucket, dstObject, func() s3err.ErrorCode {
return s3a.checkConditionalHeaders(r, dstBucket, dstObject)
}, func() s3err.ErrorCode {
currentEntry, currentErr := s3a.resolveCopySourceEntry(srcBucket, srcObject, srcVersionId, srcVersioningState)
if currentErr != nil || currentEntry.IsDirectory {
return s3err.ErrInvalidCopySource
}
if errCode := s3a.validateConditionalCopyHeaders(r, currentEntry); errCode != s3err.ErrNone {
return errCode
}
updatedEntry := cloneProtoEntry(currentEntry)
updatedMetadata, metadataErr := processMetadataBytes(r.Header, updatedEntry.Extended, replaceMeta, replaceTagging)
currentErr = metadataErr
if currentErr != nil {
glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, currentErr)
return s3err.ErrInvalidTag
}
updatedEntry.Extended = mergeCopyMetadata(updatedEntry.Extended, updatedMetadata)
if updatedEntry.Attributes == nil {
updatedEntry.Attributes = &filer_pb.FuseAttributes{}
}
updatedEntry.Attributes.Mtime = t.Unix()
dstVersionId, etag, currentErr = s3a.finalizeCopyDestination(dstBucket, dstObject, dstVersioningState, updatedEntry)
if currentErr != nil {
return filerErrorToS3Error(currentErr)
}
return s3err.ErrNone
})
if updateCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, updateCode)
return
}
if dstVersionId != "" {
w.Header().Set("x-amz-version-id", dstVersionId)
}
setEtag(w, etag)
writeSuccessResponseXML(w, r, CopyObjectResult{
ETag: etag,
LastModified: t,
})
return
}
// Determine whether we can reuse the source MD5 (direct copy without encryption changes).
canReuseSourceMd5 := false
var sourceMd5 []byte
@ -302,118 +313,233 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
}
}
	var dstVersionId string
	var etag string
	finalizeCode := s3a.withObjectWriteLock(dstBucket, dstObject, func() s3err.ErrorCode {
		return s3a.checkConditionalHeaders(r, dstBucket, dstObject)
	}, func() s3err.ErrorCode {
		var finalizeErr error
		dstVersionId, etag, finalizeErr = s3a.finalizeCopyDestination(dstBucket, dstObject, dstVersioningState, dstEntry)
		if finalizeErr != nil {
			return filerErrorToS3Error(finalizeErr)
		}
		return s3err.ErrNone
	})
	if finalizeCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, finalizeCode)
		return
	}
	if dstVersionId != "" {
		w.Header().Set("x-amz-version-id", dstVersionId)
	}
	setEtag(w, etag)
	response := CopyObjectResult{
		ETag:         etag,
		LastModified: t,
	}
	writeSuccessResponseXML(w, r, response)
}
func cloneProtoEntry(entry *filer_pb.Entry) *filer_pb.Entry {
	if entry == nil {
		return nil
	}
	return proto.Clone(entry).(*filer_pb.Entry)
}
func copyEntryETag(fullPath util.FullPath, entry *filer_pb.Entry) string {
	if entry != nil && entry.Extended != nil {
		if etag, ok := entry.Extended[s3_constants.ExtETagKey]; ok && len(etag) > 0 {
			return string(etag)
		}
	}
	attr := filer.Attr{}
	if entry.Attributes != nil {
		attr = filer.Attr{
			FileSize: entry.Attributes.FileSize,
			Mtime:    time.Unix(entry.Attributes.Mtime, 0),
			Crtime:   time.Unix(entry.Attributes.Crtime, 0),
			Mime:     entry.Attributes.Mime,
			Md5:      entry.Attributes.Md5,
		}
	}
	return filer.ETagEntry(&filer.Entry{
		FullPath: fullPath,
		Attr:     attr,
		Chunks:   entry.Chunks,
		Content:  entry.Content,
		Remote:   entry.RemoteEntry,
	})
}
func copyEntryToTarget(dst, src *filer_pb.Entry) {
	dst.IsDirectory = src.IsDirectory
	dst.Attributes = src.Attributes
	dst.Extended = src.Extended
	dst.Chunks = src.Chunks
	dst.Content = src.Content
	dst.RemoteEntry = src.RemoteEntry
	dst.HardLinkId = src.HardLinkId
	dst.HardLinkCounter = src.HardLinkCounter
	dst.Quota = src.Quota
	dst.WormEnforcedAtTsNs = src.WormEnforcedAtTsNs
}
func (s3a *S3ApiServer) finalizeCopyDestination(dstBucket, dstObject, dstVersioningState string, dstEntry *filer_pb.Entry) (versionId string, etag string, err error) {
	normalizedObject := s3_constants.NormalizeObjectKey(dstObject)
	dstPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(dstBucket), normalizedObject))
	dstDir, dstName := dstPath.DirAndName()
	if dstEntry.Attributes == nil {
		dstEntry.Attributes = &filer_pb.FuseAttributes{}
	}
	if dstEntry.Extended == nil {
		dstEntry.Extended = make(map[string][]byte)
	}
	etag = copyEntryETag(dstPath, dstEntry)
	switch dstVersioningState {
	case s3_constants.VersioningEnabled:
		versionId = s3a.generateVersionIdForObject(dstBucket, normalizedObject)
		glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", versionId, dstBucket, normalizedObject)
		dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
		dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
		// Create version file
		versionFileName := s3a.getVersionFileName(versionId)
		versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName
		bucketDir := s3a.bucketDir(dstBucket)
		if err = s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) {
			copyEntryToTarget(entry, dstEntry)
		}); err != nil {
			return "", "", err
		}
		// Update the .versions directory metadata
		// Pass dstEntry to cache its metadata for single-scan list efficiency
		if err = s3a.updateLatestVersionInDirectory(dstBucket, normalizedObject, versionId, versionFileName, dstEntry); err != nil {
			if rollbackErr := s3a.rollbackCopyVersion(bucketDir, versionObjectPath); rollbackErr != nil {
				glog.Errorf("CopyObjectHandler: failed to rollback version %s for %s/%s after latest pointer update error: %v", versionId, dstBucket, normalizedObject, rollbackErr)
			}
			glog.Errorf("CopyObjectHandler: failed to update latest version in directory: %v", err)
			return "", "", fmt.Errorf("update latest version metadata: %w", err)
		}
		return versionId, etag, nil
	case s3_constants.VersioningSuspended:
		cleanupVersioningMetadata(dstEntry.Extended)
		dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
		dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
		if err = s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
			copyEntryToTarget(entry, dstEntry)
		}); err != nil {
			return "", "", err
		}
		if err = s3a.updateIsLatestFlagsForSuspendedVersioning(dstBucket, normalizedObject); err != nil {
			glog.Warningf("CopyObjectHandler: failed to update suspended version latest flags for %s/%s: %v", dstBucket, normalizedObject, err)
		}
		return "", etag, nil
	default:
		// For non-versioned destinations, remove any versioning-related
		// metadata from the source that shouldn't carry over.
		cleanupVersioningMetadata(dstEntry.Extended)
		dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
		if err = s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
			copyEntryToTarget(entry, dstEntry)
		}); err != nil {
			return "", "", err
		}
		return "", etag, nil
	}
}
func (s3a *S3ApiServer) rollbackCopyVersion(bucketDir, versionObjectPath string) error {
	versionPath := util.FullPath(fmt.Sprintf("%s/%s", bucketDir, versionObjectPath))
	versionDir, versionName := versionPath.DirAndName()
	return s3a.rmObject(versionDir, versionName, true, false)
}
func (s3a *S3ApiServer) resolveCopySourceEntry(bucket, object, versionId, versioningState string) (*filer_pb.Entry, error) {
	normalizedObject := s3_constants.NormalizeObjectKey(object)
	if versionId != "" {
		// Specific version requested - always use version-aware retrieval
		return s3a.getSpecificObjectVersion(bucket, normalizedObject, versionId)
	}
	switch versioningState {
	case s3_constants.VersioningEnabled:
		// Versioning enabled - get latest version from .versions directory
		return s3a.getLatestObjectVersion(bucket, normalizedObject)
	case s3_constants.VersioningSuspended:
		// Versioning suspended - current object is stored as regular file ("null" version)
		return s3a.resolveSuspendedCopySourceEntry(bucket, normalizedObject, "CopyObject")
	default:
		// No versioning configured - use regular retrieval
		srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), normalizedObject))
		dir, name := srcPath.DirAndName()
		return s3a.getEntry(dir, name)
	}
}
func mergeCopyMetadata(existing, updated map[string][]byte) map[string][]byte {
merged := make(map[string][]byte, len(existing)+len(updated))
for k, v := range existing {
merged[k] = v
}
for k := range merged {
if isManagedCopyMetadataKey(k) {
delete(merged, k)
}
}
for k, v := range updated {
merged[k] = v
}
return merged
}
func isManagedCopyMetadataKey(key string) bool {
switch key {
case s3_constants.AmzStorageClass,
s3_constants.AmzServerSideEncryption,
s3_constants.AmzServerSideEncryptionAwsKmsKeyId,
s3_constants.AmzServerSideEncryptionContext,
s3_constants.AmzServerSideEncryptionBucketKeyEnabled,
s3_constants.AmzServerSideEncryptionCustomerAlgorithm,
s3_constants.AmzServerSideEncryptionCustomerKeyMD5,
s3_constants.AmzTagCount:
return true
}
return strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) || strings.HasPrefix(key, s3_constants.AmzObjectTagging)
}
func (s3a *S3ApiServer) resolveSuspendedCopySourceEntry(bucket, normalizedObject, operation string) (*filer_pb.Entry, error) {
srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), normalizedObject))
dir, name := srcPath.DirAndName()
entry, err := s3a.getEntry(dir, name)
if err == nil {
return entry, nil
}
if !errors.Is(err, filer_pb.ErrNotFound) {
return nil, err
}
glog.V(2).Infof("%s: regular file not found for suspended versioning, trying latest version", operation)
return s3a.getLatestObjectVersion(bucket, normalizedObject)
}
func (s3a *S3ApiServer) canUseMetadataOnlySelfCopy(entry *filer_pb.Entry, r *http.Request, bucket, object string) bool {
srcPath := fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), s3_constants.NormalizeObjectKey(object))
state := DetectEncryptionStateWithEntry(entry, r, srcPath, srcPath)
s3a.applyCopyBucketDefaultEncryption(state, bucket)
strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r)
return err == nil && strategy == CopyStrategyDirect
}
func pathToBucketAndObject(path string) (bucket, object string) {
@ -553,14 +679,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
} else if srcVersioningState == s3_constants.VersioningSuspended {
// Versioning suspended - current object is stored as regular file ("null" version)
// Try regular file first, fall back to latest version if needed
srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject))
dir, name := srcPath.DirAndName()
entry, err = s3a.getEntry(dir, name)
if err != nil {
// If regular file doesn't exist, try latest version as fallback
glog.V(2).Infof("CopyObjectPart: regular file not found for suspended versioning, trying latest version")
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
}
entry, err = s3a.resolveSuspendedCopySourceEntry(srcBucket, s3_constants.NormalizeObjectKey(srcObject), "CopyObjectPart")
} else {
// No versioning configured - use regular retrieval
srcPath := util.FullPath(fmt.Sprintf("%s/%s", s3a.bucketDir(srcBucket), srcObject))

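The metadata-only self-copy fast path corresponds to the standard client idiom of copying an object onto itself with a REPLACE directive; when no encryption change is requested, only the entry metadata is rewritten. A sketch with the AWS SDK v2, assuming client, bucket, and key are configured:

_, err := client.CopyObject(context.TODO(), &s3.CopyObjectInput{
	Bucket:            aws.String(bucket),
	Key:               aws.String(key),
	CopySource:        aws.String(url.PathEscape(bucket + "/" + key)),
	MetadataDirective: types.MetadataDirectiveReplace,
	Metadata:          map[string]string{"reviewed": "true"},
})
require.NoError(t, err, "self-copy with REPLACE rewrites metadata in place")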
61
weed/s3api/s3api_object_handlers_copy_test.go

@ -8,7 +8,9 @@ import (
"strings"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util"
)
type H map[string]string
@ -396,6 +398,58 @@ func TestProcessMetadataBytes(t *testing.T) {
}
}
func TestMergeCopyMetadataPreservesInternalFields(t *testing.T) {
existing := map[string][]byte{
s3_constants.SeaweedFSSSEKMSKey: []byte("kms-secret"),
s3_constants.SeaweedFSSSEIV: []byte("iv"),
"X-Amz-Meta-Old": []byte("old"),
"X-Amz-Tagging-Old": []byte("old-tag"),
s3_constants.AmzStorageClass: []byte("STANDARD"),
}
updated := map[string][]byte{
"X-Amz-Meta-New": []byte("new"),
"X-Amz-Tagging-New": []byte("new-tag"),
s3_constants.AmzStorageClass: []byte("GLACIER"),
}
merged := mergeCopyMetadata(existing, updated)
if got := string(merged[s3_constants.SeaweedFSSSEKMSKey]); got != "kms-secret" {
t.Fatalf("expected internal KMS key to be preserved, got %q", got)
}
if got := string(merged[s3_constants.SeaweedFSSSEIV]); got != "iv" {
t.Fatalf("expected internal IV to be preserved, got %q", got)
}
if _, ok := merged["X-Amz-Meta-Old"]; ok {
t.Fatalf("expected stale user metadata to be removed, got %#v", merged)
}
if _, ok := merged["X-Amz-Tagging-Old"]; ok {
t.Fatalf("expected stale tagging metadata to be removed, got %#v", merged)
}
if got := string(merged["X-Amz-Meta-New"]); got != "new" {
t.Fatalf("expected replacement user metadata to be applied, got %q", got)
}
if got := string(merged["X-Amz-Tagging-New"]); got != "new-tag" {
t.Fatalf("expected replacement tagging metadata to be applied, got %q", got)
}
if got := string(merged[s3_constants.AmzStorageClass]); got != "GLACIER" {
t.Fatalf("expected storage class to be updated, got %q", got)
}
}
func TestCopyEntryETagPrefersStoredETag(t *testing.T) {
entry := &filer_pb.Entry{
Extended: map[string][]byte{
s3_constants.ExtETagKey: []byte("\"stored-etag\""),
},
Attributes: &filer_pb.FuseAttributes{},
}
if got := copyEntryETag(util.FullPath("/buckets/test-bucket/object.txt"), entry); got != "\"stored-etag\"" {
t.Fatalf("copyEntryETag() = %q, want %q", got, "\"stored-etag\"")
}
}
func fmtTagging(maps ...map[string]string) {
for _, m := range maps {
if tagging := m[s3_constants.AmzObjectTagging]; len(tagging) > 0 {
@ -556,9 +610,8 @@ func TestCleanupVersioningMetadata(t *testing.T) {
}
}
// TestCopyVersioningIntegration validates the metadata shaping that happens
// before copy finalization for each destination versioning mode.
func TestCopyVersioningIntegration(t *testing.T) {
testCases := []struct {
name string
@ -581,7 +634,7 @@ func TestCopyVersioningIntegration(t *testing.T) {
},
},
{
name: "SuspendedCleansVersionMetadataBeforeFinalize",
versioningState: s3_constants.VersioningSuspended,
sourceMetadata: map[string][]byte{
s3_constants.ExtVersionIdKey: []byte("v123"),

418
weed/s3api/s3api_object_handlers_delete.go

@ -2,6 +2,7 @@ package s3api
import (
"encoding/xml"
"errors"
"io"
"net/http"
"strings"
@ -18,6 +19,159 @@ const (
deleteMultipleObjectsLimit = 1000
)
type deleteMutationResult struct {
versionId string
deleteMarker bool
}
func deleteErrorFromCode(code s3err.ErrorCode, key, versionId string) DeleteError {
apiErr := s3err.GetAPIError(code)
return DeleteError{
Code: apiErr.Code,
Message: apiErr.Description,
Key: key,
VersionId: versionId,
}
}
// isMissingDeleteConditionTarget normalizes missing-target detection for conditional deletes.
// Prefer errors.Is(err, filer_pb.ErrNotFound) and errors.Is(err, ErrDeleteMarker); keep the
// string-based fallback only as a defensive bridge for filer paths that still return plain text.
func isMissingDeleteConditionTarget(err error) bool {
if err == nil {
return false
}
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrDeleteMarker) {
return true
}
lowerErr := strings.ToLower(err.Error())
return strings.Contains(lowerErr, "not found")
}
func (s3a *S3ApiServer) resolveDeleteConditionalEntry(bucket, object, versionId, versioningState string) (*filer_pb.Entry, error) {
normalizedObject := s3_constants.NormalizeObjectKey(object)
bucketDir := s3a.bucketDir(bucket)
if versionId != "" {
if versionId == "null" {
return s3a.getEntry(bucketDir, normalizedObject)
}
return s3a.getEntry(s3a.getVersionedObjectDir(bucket, normalizedObject), s3a.getVersionFileName(versionId))
}
switch versioningState {
case s3_constants.VersioningEnabled:
entry, err := s3a.getLatestObjectVersion(bucket, normalizedObject)
if err != nil {
return nil, err
}
return normalizeConditionalTargetEntry(entry), nil
default:
entry, err := s3a.resolveObjectEntry(bucket, normalizedObject)
if err != nil {
return nil, err
}
return normalizeConditionalTargetEntry(entry), nil
}
}
func (s3a *S3ApiServer) validateDeleteIfMatch(entry *filer_pb.Entry, ifMatch string, missingCode s3err.ErrorCode) s3err.ErrorCode {
if ifMatch == "" {
return s3err.ErrNone
}
if entry == nil {
return missingCode
}
if ifMatch == "*" {
return s3err.ErrNone
}
if !s3a.etagMatches(ifMatch, s3a.getObjectETag(entry)) {
return s3err.ErrPreconditionFailed
}
return s3err.ErrNone
}
func (s3a *S3ApiServer) checkDeleteIfMatch(bucket, object, versionId, versioningState, ifMatch string, missingCode s3err.ErrorCode) s3err.ErrorCode {
if ifMatch == "" {
return s3err.ErrNone
}
entry, err := s3a.resolveDeleteConditionalEntry(bucket, object, versionId, versioningState)
if err != nil {
if isMissingDeleteConditionTarget(err) {
return missingCode
}
glog.Errorf("checkDeleteIfMatch: failed to resolve %s/%s (versionId=%s): %v", bucket, object, versionId, err)
return s3err.ErrInternalError
}
return s3a.validateDeleteIfMatch(entry, ifMatch, missingCode)
}
func (s3a *S3ApiServer) deleteVersionedObject(r *http.Request, bucket, object, versionId, versioningState string) (deleteMutationResult, s3err.ErrorCode) {
var result deleteMutationResult
switch {
case versionId != "":
versionEntry, versionLookupErr := s3a.getSpecificObjectVersion(bucket, object, versionId)
if versionLookupErr == nil && versionEntry != nil && versionEntry.Extended != nil {
if deleteMarker, ok := versionEntry.Extended[s3_constants.ExtDeleteMarkerKey]; ok && string(deleteMarker) == "true" {
result.deleteMarker = true
}
}
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, versionId, governanceBypassAllowed); err != nil {
glog.V(2).Infof("deleteVersionedObject: object lock check failed for %s/%s version %s: %v", bucket, object, versionId, err)
return result, s3err.ErrAccessDenied
}
if err := s3a.deleteSpecificObjectVersion(bucket, object, versionId); err != nil {
glog.Errorf("deleteVersionedObject: failed to delete specific version %s for %s/%s: %v", versionId, bucket, object, err)
return result, s3err.ErrInternalError
}
result.versionId = versionId
return result, s3err.ErrNone
case versioningState == s3_constants.VersioningEnabled:
deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object)
if err != nil {
glog.Errorf("deleteVersionedObject: failed to create delete marker for %s/%s: %v", bucket, object, err)
return result, s3err.ErrInternalError
}
result.versionId = deleteMarkerVersionId
result.deleteMarker = true
return result, s3err.ErrNone
case versioningState == s3_constants.VersioningSuspended:
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, "null", governanceBypassAllowed); err != nil {
glog.V(2).Infof("deleteVersionedObject: object lock check failed for %s/%s null version: %v", bucket, object, err)
return result, s3err.ErrAccessDenied
}
if err := s3a.deleteSpecificObjectVersion(bucket, object, "null"); err != nil {
glog.Errorf("deleteVersionedObject: failed to delete null version for %s/%s: %v", bucket, object, err)
return result, s3err.ErrInternalError
}
deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object)
if err != nil {
glog.Errorf("deleteVersionedObject: failed to create delete marker for suspended versioning %s/%s: %v", bucket, object, err)
return result, s3err.ErrInternalError
}
result.versionId = deleteMarkerVersionId
result.deleteMarker = true
return result, s3err.ErrNone
}
glog.Errorf("deleteVersionedObject: unsupported versioning state %q for %s/%s", versioningState, bucket, object)
return result, s3err.ErrInternalError
}
func (s3a *S3ApiServer) deleteUnversionedObjectWithClient(client filer_pb.SeaweedFilerClient, bucket, object string) error {
target := util.NewFullPath(s3a.bucketDir(bucket), object)
dir, name := target.DirAndName()
return deleteObjectEntry(client, dir, name, true, false)
}
func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := s3_constants.GetBucketAndObject(r)
@ -42,8 +196,6 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
return
}
versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
versioningSuspended := (versioningState == s3_constants.VersioningSuspended)
versioningConfigured := (versioningState != "")
var auditLog *s3err.AccessLog
@ -51,93 +203,49 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
	if ifMatchResult := s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed); ifMatchResult != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, ifMatchResult)
		return
	}
	var deleteResult deleteMutationResult
	deleteCode := s3a.withObjectWriteLock(bucket, object, func() s3err.ErrorCode {
		// Revalidate the If-Match condition now that the lock is held
		return s3a.checkDeleteIfMatch(bucket, object, versionId, versioningState, r.Header.Get(s3_constants.IfMatch), s3err.ErrPreconditionFailed)
	}, func() s3err.ErrorCode {
		if versioningConfigured {
			result, errCode := s3a.deleteVersionedObject(r, bucket, object, versionId, versioningState)
			if errCode != s3err.ErrNone {
				return errCode
			}
			deleteResult = result
			return s3err.ErrNone
		}
		// Handle regular delete (non-versioned)
		// Check object lock permissions before deleting object
		governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
		if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
			glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
			return s3err.ErrAccessDenied
		}
		if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
			// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
			// which listens to metadata events and uses consistent hashing for coordination
			return s3a.deleteUnversionedObjectWithClient(client, bucket, object)
		}); err != nil {
			glog.Errorf("DeleteObjectHandler: failed to delete %s/%s: %v", bucket, object, err)
			return s3err.ErrInternalError
		}
		return s3err.ErrNone
	})
	if deleteCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, deleteCode)
		return
	}
	if deleteResult.versionId != "" {
		w.Header().Set("x-amz-version-id", deleteResult.versionId)
	}
	if deleteResult.deleteMarker {
		w.Header().Set("x-amz-delete-marker", "true")
	}
if auditLog != nil {
@ -154,6 +262,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
type ObjectIdentifier struct {
Key string `xml:"Key"`
VersionId string `xml:"VersionId,omitempty"`
ETag string `xml:"ETag,omitempty"`
DeleteMarker bool `xml:"DeleteMarker,omitempty"`
DeleteMarkerVersionId string `xml:"DeleteMarkerVersionId,omitempty"`
}
@ -228,133 +337,63 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
return
}
	versioningConfigured := (versioningState != "")
	deletedCount := 0
	err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		// delete file entries
		for _, object := range deleteObjects.Objects {
			if object.Key == "" {
				continue
			}
			if err := s3a.validateTableBucketObjectPath(bucket, object.Key); err != nil {
				deleteErrors = append(deleteErrors, deleteErrorFromCode(s3err.ErrAccessDenied, object.Key, object.VersionId))
				continue
			}
			var deleteResult deleteMutationResult
			deleteCode := s3a.withObjectWriteLock(bucket, object.Key, func() s3err.ErrorCode {
				// Missing targets surface as NoSuchKey in batch deletes
				return s3a.checkDeleteIfMatch(bucket, object.Key, object.VersionId, versioningState, object.ETag, s3err.ErrNoSuchKey)
			}, func() s3err.ErrorCode {
				if versioningConfigured {
					result, errCode := s3a.deleteVersionedObject(r, bucket, object.Key, object.VersionId, versioningState)
					if errCode != s3err.ErrNone {
						return errCode
					}
					deleteResult = result
					return s3err.ErrNone
				}
				// Check object lock permissions before deletion
				governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object.Key)
				if err := s3a.enforceObjectLockProtections(r, bucket, object.Key, "", governanceBypassAllowed); err != nil {
					glog.V(2).Infof("DeleteMultipleObjectsHandler: object lock check failed for %s/%s: %v", bucket, object.Key, err)
					return s3err.ErrAccessDenied
				}
				if err := s3a.deleteUnversionedObjectWithClient(client, bucket, object.Key); err != nil {
					glog.Errorf("DeleteMultipleObjectsHandler: failed to delete %s/%s: %v", bucket, object.Key, err)
					return s3err.ErrInternalError
				}
				return s3err.ErrNone
			})
			if deleteCode != s3err.ErrNone {
				deleteErrors = append(deleteErrors, deleteErrorFromCode(deleteCode, object.Key, object.VersionId))
				continue
			}
			deletedCount++
			if !deleteObjects.Quiet {
				deletedObject := ObjectIdentifier{
					Key:       object.Key,
					VersionId: deleteResult.versionId,
				}
				// For delete markers, report DeleteMarkerVersionId instead of VersionId
				if deleteResult.deleteMarker {
					deletedObject.DeleteMarker = true
					deletedObject.DeleteMarkerVersionId = deleteResult.versionId
					deletedObject.VersionId = ""
				}
				deletedObjects = append(deletedObjects, deletedObject)
			}
if auditLog != nil {
@ -368,6 +407,11 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
return nil
})
if err != nil {
glog.Errorf("DeleteMultipleObjectsHandler: failed to initialize filer client for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
deleteResp := DeleteObjectsResponse{}
if !deleteObjects.Quiet {
@ -375,7 +419,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
deleteResp.Errors = deleteErrors
stats_collect.RecordBucketActiveTime(bucket)
stats_collect.S3DeletedObjectsCounter.WithLabelValues(bucket).Add(float64(deletedCount))
writeSuccessResponseXML(w, r, deleteResp)

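The new per-object ETag member threads conditional checks through batch deletes: a stale ETag produces a per-key error entry while the other keys proceed. A hedged SDK v2 sketch; the ETag field on types.ObjectIdentifier is assumed to be available in the SDK version in use:

out, err := client.DeleteObjects(context.TODO(), &s3.DeleteObjectsInput{
	Bucket: aws.String(bucket),
	Delete: &types.Delete{
		Objects: []types.ObjectIdentifier{
			{Key: aws.String("first.txt"), ETag: aws.String(`"abc123"`)}, // deleted only if still current
			{Key: aws.String("second.txt"), ETag: aws.String("*")},       // deleted only if it exists
		},
	},
})
require.NoError(t, err)
assert.Empty(t, out.Errors, "condition mismatches surface here, not as a request-level failure")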
119
weed/s3api/s3api_object_handlers_delete_test.go

@ -0,0 +1,119 @@
package s3api
import (
"encoding/xml"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
func TestValidateDeleteIfMatch(t *testing.T) {
s3a := NewS3ApiServerForTest()
existingEntry := &filer_pb.Entry{
Extended: map[string][]byte{
s3_constants.ExtETagKey: []byte("\"abc123\""),
},
}
deleteMarkerEntry := &filer_pb.Entry{
Extended: map[string][]byte{
s3_constants.ExtDeleteMarkerKey: []byte("true"),
},
}
testCases := []struct {
name string
entry *filer_pb.Entry
ifMatch string
missingCode s3err.ErrorCode
expected s3err.ErrorCode
}{
{
name: "matching etag succeeds",
entry: existingEntry,
ifMatch: "\"abc123\"",
missingCode: s3err.ErrPreconditionFailed,
expected: s3err.ErrNone,
},
{
name: "wildcard succeeds for existing entry",
entry: existingEntry,
ifMatch: "*",
missingCode: s3err.ErrPreconditionFailed,
expected: s3err.ErrNone,
},
{
name: "mismatched etag fails",
entry: existingEntry,
ifMatch: "\"other\"",
missingCode: s3err.ErrPreconditionFailed,
expected: s3err.ErrPreconditionFailed,
},
{
name: "missing current object fails single delete",
entry: nil,
ifMatch: "*",
missingCode: s3err.ErrPreconditionFailed,
expected: s3err.ErrPreconditionFailed,
},
{
name: "missing current object returns no such key for batch delete",
entry: nil,
ifMatch: "*",
missingCode: s3err.ErrNoSuchKey,
expected: s3err.ErrNoSuchKey,
},
{
name: "current delete marker behaves like missing object",
entry: normalizeConditionalTargetEntry(deleteMarkerEntry),
ifMatch: "*",
missingCode: s3err.ErrPreconditionFailed,
expected: s3err.ErrPreconditionFailed,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if errCode := s3a.validateDeleteIfMatch(tc.entry, tc.ifMatch, tc.missingCode); errCode != tc.expected {
t.Fatalf("validateDeleteIfMatch() = %v, want %v", errCode, tc.expected)
}
})
}
}
func TestDeleteObjectsRequestUnmarshalConditionalETags(t *testing.T) {
var req DeleteObjectsRequest
body := []byte(`
<Delete>
<Quiet>true</Quiet>
<Object>
<Key>first.txt</Key>
<ETag>*</ETag>
</Object>
<Object>
<Key>second.txt</Key>
<VersionId>3HL4kqCxf3vjVBH40Nrjfkd</VersionId>
<ETag>"abc123"</ETag>
</Object>
</Delete>`)
if err := xml.Unmarshal(body, &req); err != nil {
t.Fatalf("xml.Unmarshal() error = %v", err)
}
if !req.Quiet {
t.Fatalf("expected Quiet=true")
}
if len(req.Objects) != 2 {
t.Fatalf("expected 2 objects, got %d", len(req.Objects))
}
if req.Objects[0].ETag != "*" {
t.Fatalf("expected first object ETag to be '*', got %q", req.Objects[0].ETag)
}
if req.Objects[1].ETag != "\"abc123\"" {
t.Fatalf("expected second object ETag to preserve quotes, got %q", req.Objects[1].ETag)
}
if req.Objects[1].VersionId != "3HL4kqCxf3vjVBH40Nrjfkd" {
t.Fatalf("expected second object VersionId to unmarshal, got %q", req.Objects[1].VersionId)
}
}

2
weed/s3api/s3api_object_handlers_multipart.go

@ -447,7 +447,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d",
bucket, object, uploadID, partID, r.ContentLength)
etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, partID)
etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, "", partID, nil)
if errCode != s3err.ErrNone {
glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d",
errCode, bucket, object, partID)

2
weed/s3api/s3api_object_handlers_postpolicy.go

@ -140,7 +140,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
}
}
etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, fileBody, bucket, 1)
etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, fileBody, bucket, object, 1, nil)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)

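Both one-line call-site changes (the part upload above and this post-policy upload) follow the same convention for the widened putToFiler signature: an empty object key opts out of conditional checks and per-object locking, a non-empty key opts in, and the trailing argument is an optional post-create hook. A sketch of the two shapes:

	// Part uploads: no object key, nil hook; parts are never conditional.
	etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, "", partID, nil)

	// Full objects: the key enables precondition checks under the write lock,
	// and a non-nil hook can veto (and roll back) the finalized entry.
	etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, dataReader, bucket, object, 1, nil)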
236
weed/s3api/s3api_object_handlers_put.go

@ -292,7 +292,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
dataReader = mimeDetect(r, dataReader)
}
etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, 1)
etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, object, 1, nil)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
@ -312,7 +312,42 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
writeSuccessResponseEmpty(w, r)
}
func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
func (s3a *S3ApiServer) withObjectWriteLock(bucket, object string, preconditionFn func() s3err.ErrorCode, fn func() s3err.ErrorCode) s3err.ErrorCode {
runPrecondition := func() s3err.ErrorCode {
if preconditionFn == nil {
return s3err.ErrNone
}
return preconditionFn()
}
if object == "" || s3a.newObjectWriteLock == nil {
if errCode := runPrecondition(); errCode != s3err.ErrNone {
return errCode
}
return fn()
}
lock := s3a.newObjectWriteLock(bucket, object)
if lock == nil {
if errCode := runPrecondition(); errCode != s3err.ErrNone {
return errCode
}
return fn()
}
defer func() {
if err := lock.StopShortLivedLock(); err != nil {
glog.Warningf("withObjectWriteLock: failed to release lock for %s/%s: %v", bucket, object, err)
}
}()
if errCode := runPrecondition(); errCode != s3err.ErrNone {
return errCode
}
return fn()
}
func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, object string, partNumber int, afterCreate func(entry *filer_pb.Entry) s3err.ErrorCode) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
// This eliminates the filer proxy overhead for PUT operations
// Note: filePath is now passed directly instead of URL (no parsing needed)
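The helper above gives every conditional mutation the same shape: acquire the per-object lock (or fall through when locking is unavailable), re-evaluate preconditions inside the critical section, then run the mutation. A usage sketch with hypothetical values, where doMutation stands in for the actual write:

	errCode := s3a.withObjectWriteLock(bucket, object,
		func() s3err.ErrorCode {
			// Re-checked under the lock, closing the check-then-write race.
			return s3a.checkConditionalHeaders(r, bucket, object)
		},
		func() s3err.ErrorCode {
			return doMutation() // hypothetical write
		})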
@ -598,12 +633,8 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
// Store ETag in Extended attribute for future retrieval (e.g. multipart parts)
entry.Extended[s3_constants.ExtETagKey] = []byte(etag)
// Set object owner
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath)
}
// Set object owner according to bucket ownership settings.
s3a.setObjectOwnerFromRequest(r, bucket, entry)
// Set version ID if present
if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
@ -611,6 +642,16 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
glog.V(3).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath)
}
for _, metadataHeader := range []string{
s3_constants.ExtObjectLockModeKey,
s3_constants.ExtRetentionUntilDateKey,
s3_constants.ExtLegalHoldKey,
} {
if value := r.Header.Get(metadataHeader); value != "" {
entry.Extended[metadataHeader] = []byte(value)
}
}
// Set TTL-based S3 expiry flag only if object has a TTL
if entry.Attributes.TtlSec > 0 {
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
@ -699,30 +740,57 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
// This matches the chunk upload behavior and prevents orphaned chunks
glog.V(3).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d",
path.Dir(filePath), path.Base(filePath), len(entry.Chunks), len(entry.Extended))
createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
req := &filer_pb.CreateEntryRequest{
Directory: path.Dir(filePath),
Entry: entry,
}
glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath)
if err := filer_pb.CreateEntry(context.Background(), client, req); err != nil {
glog.Errorf("putToFiler: CreateEntry returned error: %v", err)
return err
var createErr error
var rollbackErr error
entryCreated := false
preconditionFn := func() s3err.ErrorCode {
if object == "" {
return s3err.ErrNone
}
return s3a.checkConditionalHeaders(r, bucket, object)
}
createCode := s3a.withObjectWriteLock(bucket, object, preconditionFn, func() s3err.ErrorCode {
createErr = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
req := &filer_pb.CreateEntryRequest{
Directory: path.Dir(filePath),
Entry: entry,
}
glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath)
if err := filer_pb.CreateEntry(context.Background(), client, req); err != nil {
glog.Errorf("putToFiler: CreateEntry returned error: %v", err)
return err
}
return nil
})
if createErr != nil {
return filerErrorToS3Error(createErr)
}
entryCreated = true
if afterCreate != nil {
if afterCreateCode := afterCreate(entry); afterCreateCode != s3err.ErrNone {
rollbackErr = s3a.rmObject(path.Dir(filePath), path.Base(filePath), true, false)
if rollbackErr != nil {
glog.Errorf("putToFiler: failed to rollback created entry for %s after post-create error: %v", filePath, rollbackErr)
} else {
entryCreated = false
}
return afterCreateCode
}
}
return nil
return s3err.ErrNone
})
if createErr != nil {
glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
if createCode != s3err.ErrNone {
if createErr != nil {
glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
}
// CRITICAL: Cleanup orphaned chunks before returning error
// If CreateEntry fails, the uploaded chunks are orphaned and must be deleted
// to prevent resource leaks and wasted storage
if len(chunkResult.FileChunks) > 0 {
glog.Warningf("putToFiler: CreateEntry failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks))
// If the entry was never created, the uploaded chunks are orphaned and must be deleted.
if !entryCreated && len(chunkResult.FileChunks) > 0 {
glog.Warningf("putToFiler: finalization failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks))
s3a.deleteOrphanedChunks(chunkResult.FileChunks)
}
return "", filerErrorToS3Error(createErr), SSEResponseMetadata{}
return "", createCode, SSEResponseMetadata{}
}
glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath)
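Note the rollback bookkeeping in the hunk above: entryCreated flips back to false only when the compensating rmObject succeeds. If the rollback itself fails, the leftover entry still references the chunks, so the later orphan cleanup (gated on !entryCreated) correctly leaves them alone rather than deleting data a live entry points to.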
@ -982,7 +1050,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
}
// Upload the file using putToFiler - this will create the file with version metadata
etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, 1)
etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, normalizedObject, 1, nil)
if errCode != s3err.ErrNone {
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
return "", errCode, SSEResponseMetadata{}
@ -1088,8 +1156,6 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
// We need to construct the object path relative to the bucket
versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName
versionFilePath := s3a.toFilerPath(bucket, versionObjectPath)
bucketDir := s3a.bucketDir(bucket)
body := dataReader
if objectContentType == "" {
body = mimeDetect(r, body)
@ -1097,71 +1163,55 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionFilePath)
etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
return "", "", errCode, SSEResponseMetadata{}
}
r.Header.Set(s3_constants.ExtVersionIdKey, versionId)
defer r.Header.Del(s3_constants.ExtVersionIdKey)
// Get the uploaded entry to add versioning metadata
// Use retry logic to handle filer consistency delays
var versionEntry *filer_pb.Entry
var err error
maxRetries := 8
for attempt := 1; attempt <= maxRetries; attempt++ {
versionEntry, err = s3a.getEntry(bucketDir, versionObjectPath)
if err == nil {
break
}
explicitMode := r.Header.Get(s3_constants.AmzObjectLockMode)
explicitRetainUntilDate := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)
if attempt < maxRetries {
// Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms
delay := time.Millisecond * time.Duration(10*(1<<(attempt-1)))
time.Sleep(delay)
}
if explicitMode != "" {
r.Header.Set(s3_constants.ExtObjectLockModeKey, explicitMode)
defer r.Header.Del(s3_constants.ExtObjectLockModeKey)
}
if err != nil {
glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
if explicitRetainUntilDate != "" {
parsedTime, parseErr := time.Parse(time.RFC3339, explicitRetainUntilDate)
if parseErr != nil {
glog.Errorf("putVersionedObject: failed to parse retention until date: %v", parseErr)
return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
defer r.Header.Del(s3_constants.ExtRetentionUntilDateKey)
}
// Add versioning metadata to this version
if versionEntry.Extended == nil {
versionEntry.Extended = make(map[string][]byte)
if legalHold := r.Header.Get(s3_constants.AmzObjectLockLegalHold); legalHold != "" {
r.Header.Set(s3_constants.ExtLegalHoldKey, legalHold)
defer r.Header.Del(s3_constants.ExtLegalHoldKey)
}
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
// Store ETag (unquoted) in Extended attribute
versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
// Set object owner for versioned objects
s3a.setObjectOwnerFromRequest(r, bucket, versionEntry)
// Extract and store object lock metadata from request headers
if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
if explicitMode == "" && explicitRetainUntilDate == "" {
tempEntry := &filer_pb.Entry{Extended: make(map[string][]byte)}
if err := s3a.applyBucketDefaultRetention(bucket, tempEntry); err == nil {
if modeBytes, ok := tempEntry.Extended[s3_constants.ExtObjectLockModeKey]; ok {
r.Header.Set(s3_constants.ExtObjectLockModeKey, string(modeBytes))
defer r.Header.Del(s3_constants.ExtObjectLockModeKey)
}
if dateBytes, ok := tempEntry.Extended[s3_constants.ExtRetentionUntilDateKey]; ok {
r.Header.Set(s3_constants.ExtRetentionUntilDateKey, string(dateBytes))
defer r.Header.Del(s3_constants.ExtRetentionUntilDateKey)
}
}
}
// Update the version entry with metadata
err = s3a.mkFile(bucketDir, versionObjectPath, versionEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
updatedEntry.Extended = versionEntry.Extended
updatedEntry.Attributes = versionEntry.Attributes
updatedEntry.Chunks = versionEntry.Chunks
etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, normalizedObject, 1, func(versionEntry *filer_pb.Entry) s3err.ErrorCode {
if err := s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName, versionEntry); err != nil {
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
return s3err.ErrInternalError
}
return s3err.ErrNone
})
if err != nil {
glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
if errCode != s3err.ErrNone {
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
return "", "", errCode, SSEResponseMetadata{}
}
// Update the .versions directory metadata to indicate this is the latest version
// Pass versionEntry to cache its metadata for single-scan list efficiency
err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName, versionEntry)
if err != nil {
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
return versionId, etag, s3err.ErrNone, sseMetadata
}
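The header round-trip above exists because putToFiler now persists object-lock attributes from internal headers inside the same locked create: explicit values are forwarded, with the retention date converted from RFC3339 to unix seconds, and bucket defaults are materialized through a throwaway entry only when no explicit values were sent. The date conversion in isolation (needs the time and strconv imports already used by this file):

	// Sketch of the stored form for ExtRetentionUntilDateKey.
	func retentionHeaderValue(rfc3339 string) (string, error) {
		parsed, err := time.Parse(time.RFC3339, rfc3339)
		if err != nil {
			return "", err // the handler surfaces this as s3err.ErrInvalidRequest
		}
		return strconv.FormatInt(parsed.Unix(), 10), nil
	}

	// retentionHeaderValue("2030-01-02T15:04:05Z") == "1893596645"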
@ -1685,12 +1735,25 @@ func (s3a *S3ApiServer) etagMatches(headerValue, objectETag string) bool {
return false
}
func normalizeConditionalTargetEntry(entry *filer_pb.Entry) *filer_pb.Entry {
if entry == nil {
return nil
}
if entry.Extended != nil {
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
return nil
}
}
return entry
}
// validateConditionalHeaders checks conditional headers against the provided entry
func (s3a *S3ApiServer) validateConditionalHeaders(r *http.Request, headers conditionalHeaders, entry *filer_pb.Entry, bucket, object string) s3err.ErrorCode {
if !headers.isSet {
return s3err.ErrNone
}
entry = normalizeConditionalTargetEntry(entry)
objectExists := entry != nil
// For PUT requests, all specified conditions must be met.
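Collapsing a latest-version delete marker to nil in one place gives every conditional path the AWS semantics at once: against a delete marker, If-Match fails exactly as it would for a missing key, and If-None-Match: * (a create-only PUT) succeeds, because the key is logically absent. The table-driven TestValidateDeleteIfMatch earlier in this diff exercises the same normalization.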
@ -1812,7 +1875,7 @@ func (s3a *S3ApiServer) checkConditionalHeaders(r *http.Request, bucket, object
// This ensures we check conditions against the LATEST version, not a null version.
entry, err := s3a.resolveObjectEntry(bucket, object)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrDeleteMarker) {
entry = nil
} else {
glog.Errorf("checkConditionalHeaders: error resolving object entry for %s/%s: %v", bucket, object, err)
@ -1828,6 +1891,7 @@ func (s3a *S3ApiServer) validateConditionalHeadersForReads(r *http.Request, head
return ConditionalHeaderResult{ErrorCode: s3err.ErrNone, Entry: entry}
}
entry = normalizeConditionalTargetEntry(entry)
objectExists := entry != nil
// If object doesn't exist, fail for If-Match and If-Unmodified-Since
@ -1954,7 +2018,7 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReads(r *http.Request, bucket,
// This ensures we check conditions against the LATEST version, not a null version.
entry, err := s3a.resolveObjectEntry(bucket, object)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrDeleteMarker) {
entry = nil
} else {
glog.Errorf("checkConditionalHeadersForReads: error resolving object entry for %s/%s: %v", bucket, object, err)

102
weed/s3api/s3api_object_handlers_put_test.go

@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"github.com/gorilla/mux"
@ -199,3 +200,104 @@ func TestNewMultipartUploadHandler_KeyTooLong(t *testing.T) {
t.Errorf("expected error code KeyTooLongError, got %s", errResp.Code)
}
}
type testObjectWriteLockFactory struct {
mu sync.Mutex
locks map[string]*sync.Mutex
}
func (f *testObjectWriteLockFactory) newLock(bucket, object string) objectWriteLock {
key := bucket + "|" + object
f.mu.Lock()
lock, ok := f.locks[key]
if !ok {
lock = &sync.Mutex{}
f.locks[key] = lock
}
f.mu.Unlock()
lock.Lock()
return &testObjectWriteLock{unlock: lock.Unlock}
}
type testObjectWriteLock struct {
once sync.Once
unlock func()
}
func (l *testObjectWriteLock) StopShortLivedLock() error {
l.once.Do(l.unlock)
return nil
}
func TestWithObjectWriteLockSerializesConcurrentPreconditions(t *testing.T) {
s3a := NewS3ApiServerForTest()
lockFactory := &testObjectWriteLockFactory{
locks: make(map[string]*sync.Mutex),
}
s3a.newObjectWriteLock = lockFactory.newLock
const workers = 3
const bucket = "test-bucket"
const object = "/file.txt"
start := make(chan struct{})
results := make(chan s3err.ErrorCode, workers)
var wg sync.WaitGroup
var stateMu sync.Mutex
objectExists := false
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-start
errCode := s3a.withObjectWriteLock(bucket, object,
func() s3err.ErrorCode {
stateMu.Lock()
defer stateMu.Unlock()
if objectExists {
return s3err.ErrPreconditionFailed
}
return s3err.ErrNone
},
func() s3err.ErrorCode {
stateMu.Lock()
defer stateMu.Unlock()
objectExists = true
return s3err.ErrNone
},
)
results <- errCode
}()
}
close(start)
wg.Wait()
close(results)
var successCount int
var preconditionFailedCount int
for errCode := range results {
switch errCode {
case s3err.ErrNone:
successCount++
case s3err.ErrPreconditionFailed:
preconditionFailedCount++
default:
t.Fatalf("unexpected error code: %v", errCode)
}
}
if successCount != 1 {
t.Fatalf("expected exactly one successful writer, got %d", successCount)
}
if preconditionFailedCount != workers-1 {
t.Fatalf("expected %d precondition failures, got %d", workers-1, preconditionFailedCount)
}
}

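The injection seam is the point of this test: newObjectWriteLock is a nil-able function field, so production wires a filer-backed short-lived lock, tests substitute an in-process mutex as above, and a nil factory (or a factory returning nil) degrades to the unlocked fast path.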
24
weed/s3api/s3api_server.go

@ -82,8 +82,17 @@ type S3ApiServer struct {
embeddedIam *EmbeddedIamApi // Embedded IAM API server (when enabled)
stsHandlers *STSHandlers // STS HTTP handlers for AssumeRoleWithWebIdentity
cipher bool // encrypt data on volume servers
newObjectWriteLock func(bucket, object string) objectWriteLock
}
type objectWriteLock interface {
StopShortLivedLock() error
}
const (
objectWriteLockTTL = 15 * time.Second
)
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
return NewS3ApiServerWithStore(router, option, "")
}
@ -182,6 +191,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
cipher: option.Cipher,
}
if len(option.Filers) > 0 {
objectWriteLockClient := cluster.NewLockClient(option.GrpcDialOption, option.Filers[0])
s3ApiServer.newObjectWriteLock = func(bucket, object string) objectWriteLock {
lockKey := fmt.Sprintf("s3.object.write:%s", s3ApiServer.toFilerPath(bucket, object))
owner := fmt.Sprintf("s3api-%d", s3ApiServer.randomClientId)
lock := objectWriteLockClient.NewShortLivedLock(lockKey, owner)
if err := lock.AttemptToLock(objectWriteLockTTL); err != nil {
// The initial acquisition already succeeded with the default short TTL.
// Renewal to a longer TTL is opportunistic to cover slower metadata paths.
glog.Warningf("objectWriteLock: failed to extend lock TTL for %s: %v", lockKey, err)
}
return lock
}
}
// Set s3a reference in circuit breaker for upload limiting
s3ApiServer.cb.s3a = s3ApiServer

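From a caller's point of view the factory yields an already-held lock; the only obligation is to release it, and the short TTL bounds the damage if release fails. A lifecycle sketch assuming the wiring above (blocking-acquire behavior is a property of the cluster lock client and is an assumption here):

	lock := s3ApiServer.newObjectWriteLock("my-bucket", "/report.txt")
	defer func() {
		if err := lock.StopShortLivedLock(); err != nil {
			// Assumed safety net: the lock lapses on its own once the TTL expires.
			glog.Warningf("objectWriteLock: release failed: %v", err)
		}
	}()
	// ... mutate the object while holding the lock ...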
6
weed/s3api/s3err/s3api_errors.go

@ -68,6 +68,7 @@ const (
ErrInvalidMaxDeleteObjects
ErrInvalidPartNumberMarker
ErrInvalidPart
ErrInvalidPartOrder
ErrInvalidRange
ErrInternalError
ErrInvalidCopyDest
@ -291,6 +292,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidPartOrder: {
Code: "InvalidPartOrder",
Description: "The list of parts was not in ascending order. The parts list must be specified in order by part number.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidCopyDest: {
Code: "InvalidRequest",

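The new error code backs an AWS-compatible CompleteMultipartUpload failure for unsorted part lists. A minimal validation sketch; the enforcement itself lives in the multipart completion path, and treating an exact repeat of the previous part number as a tolerated retry is an assumption of this sketch:

	func validatePartOrder(partNumbers []int) s3err.ErrorCode {
		prev := 0
		for _, n := range partNumbers {
			if n < prev {
				return s3err.ErrInvalidPartOrder // descending order is rejected
			}
			prev = n // equal to prev: treated as a client retry
		}
		return s3err.ErrNone
	}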