Browse Source

fixes

pull/6997/head
chrislu 5 months ago
parent
commit
f63f40a3d4
  1. 54
      weed/s3api/filer_multipart.go
  2. 29
      weed/s3api/s3api_object_handlers_multipart.go

54
weed/s3api/filer_multipart.go

@ -21,6 +21,8 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"net/http"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -36,7 +38,7 @@ type InitiateMultipartUploadResult struct {
s3.CreateMultipartUploadOutput s3.CreateMultipartUploadOutput
} }
func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("createMultipartUpload input %v", input) glog.V(2).Infof("createMultipartUpload input %v", input)
@ -55,6 +57,13 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
if input.ContentType != nil { if input.ContentType != nil {
entry.Attributes.Mime = *input.ContentType entry.Attributes.Mime = *input.ContentType
} }
// Extract and store object lock metadata from request headers
// This ensures object lock settings from create_multipart_upload are preserved
if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
glog.Errorf("createMultipartUpload: failed to extract object lock metadata: %v", err)
// Don't fail the upload - this matches AWS behavior for invalid metadata
}
}); err != nil { }); err != nil {
glog.Errorf("NewMultipartUpload error: %v", err) glog.Errorf("NewMultipartUpload error: %v", err)
return nil, s3err.ErrInternalError return nil, s3err.ErrInternalError
@ -72,8 +81,15 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
} }
type CompleteMultipartUploadResult struct { type CompleteMultipartUploadResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"`
s3.CompleteMultipartUploadOutput
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"`
Location *string `xml:"Location,omitempty"`
Bucket *string `xml:"Bucket,omitempty"`
Key *string `xml:"Key,omitempty"`
ETag *string `xml:"ETag,omitempty"`
// VersionId is NOT included in XML body - it should only be in x-amz-version-id HTTP header
// Store the VersionId internally for setting HTTP header, but don't marshal to XML
VersionId *string `xml:"-"`
} }
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
@ -110,12 +126,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil { if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil {
if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) { if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) {
return &CompleteMultipartUploadResult{ return &CompleteMultipartUploadResult{
CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
Key: objectKey(input.Key),
},
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
Key: objectKey(input.Key),
}, s3err.ErrNone }, s3err.ErrNone
} }
} }
@ -295,23 +309,19 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
} }
output = &CompleteMultipartUploadResult{ output = &CompleteMultipartUploadResult{
CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
VersionId: aws.String(versionId),
},
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
VersionId: aws.String(versionId),
} }
} else { } else {
// For non-versioned buckets, return response without VersionId // For non-versioned buckets, return response without VersionId
output = &CompleteMultipartUploadResult{ output = &CompleteMultipartUploadResult{
CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
},
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
} }
} }

29
weed/s3api/s3api_object_handlers_multipart.go

@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
weed_server "github.com/seaweedfs/seaweedfs/weed/server" weed_server "github.com/seaweedfs/seaweedfs/weed/server"
@ -37,6 +38,25 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
return return
} }
// Check if versioning is enabled for the bucket (needed for object lock)
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Validate object lock headers before processing
if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
glog.V(2).Infof("NewMultipartUploadHandler: object lock header validation failed for bucket %s, object %s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, mapValidationErrorToS3Error(err))
return
}
createMultipartUploadInput := &s3.CreateMultipartUploadInput{ createMultipartUploadInput := &s3.CreateMultipartUploadInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
Key: objectKey(aws.String(object)), Key: objectKey(aws.String(object)),
@ -52,7 +72,7 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
if contentType != "" { if contentType != "" {
createMultipartUploadInput.ContentType = &contentType createMultipartUploadInput.ContentType = &contentType
} }
response, errCode := s3a.createMultipartUpload(createMultipartUploadInput)
response, errCode := s3a.createMultipartUpload(r, createMultipartUploadInput)
glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode) glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
@ -103,8 +123,15 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
s3err.WriteErrorResponse(w, r, errCode) s3err.WriteErrorResponse(w, r, errCode)
return return
} }
// Set version ID in HTTP header if present
if response.VersionId != nil {
w.Header().Set("x-amz-version-id", *response.VersionId)
}
stats_collect.RecordBucketActiveTime(bucket) stats_collect.RecordBucketActiveTime(bucket)
stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc() stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc()
writeSuccessResponseXML(w, r, response) writeSuccessResponseXML(w, r, response)
} }

Loading…
Cancel
Save