Browse Source

Merge pull request #2901 from guol-fnst/fix_multiupload

check object name and uploadID when processing  multiupload
pull/2906/head
Chris Lu 3 years ago
committed by GitHub
parent
commit
72db181d68
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      weed/s3api/filer_multipart.go
  2. 44
      weed/s3api/s3api_object_multipart_handlers.go

4
weed/s3api/filer_multipart.go

@ -13,7 +13,6 @@ import (
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -29,8 +28,7 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
glog.V(2).Infof("createMultipartUpload input %v", input) glog.V(2).Infof("createMultipartUpload input %v", input)
uploadId, _ := uuid.NewRandom()
uploadIdString := uploadId.String()
uploadIdString := s3a.generateUploadID(*input.Key)
if err := s3a.mkdir(s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) { if err := s3a.mkdir(s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) {
if entry.Extended == nil { if entry.Extended == nil {

44
weed/s3api/s3api_object_multipart_handlers.go

@ -2,6 +2,7 @@ package s3api
import ( import (
"encoding/xml" "encoding/xml"
"crypto/sha1"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
@ -70,6 +71,11 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
// Get upload id. // Get upload id.
uploadID, _, _, _ := getObjectResources(r.URL.Query()) uploadID, _, _, _ := getObjectResources(r.URL.Query())
err := s3a.checkUploadId(object, uploadID)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{ response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
@ -94,6 +100,11 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
// Get upload id. // Get upload id.
uploadID, _, _, _ := getObjectResources(r.URL.Query()) uploadID, _, _, _ := getObjectResources(r.URL.Query())
err := s3a.checkUploadId(object, uploadID)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{ response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
@ -165,6 +176,12 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
return return
} }
err := s3a.checkUploadId(object, uploadID)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
response, errCode := s3a.listObjectParts(&s3.ListPartsInput{ response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
Key: objectKey(aws.String(object)), Key: objectKey(aws.String(object)),
@ -186,11 +203,11 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
// PutObjectPartHandler - Put an object part in a multipart upload. // PutObjectPartHandler - Put an object part in a multipart upload.
func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := xhttp.GetBucketAndObject(r)
bucket, object := xhttp.GetBucketAndObject(r)
uploadID := r.URL.Query().Get("uploadId") uploadID := r.URL.Query().Get("uploadId")
exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
if !exists {
err := s3a.checkUploadId(object, uploadID)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return return
} }
@ -250,6 +267,27 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
return fmt.Sprintf("%s/%s/.uploads", s3a.option.BucketsPath, bucket) return fmt.Sprintf("%s/%s/.uploads", s3a.option.BucketsPath, bucket)
} }
// Generate uploadID hash string from object
func (s3a *S3ApiServer) generateUploadID(object string) string {
if strings.HasPrefix(object, "/") {
object = object[1:]
}
h := sha1.New()
h.Write([]byte(object))
return fmt.Sprintf("%x", h.Sum(nil))
}
//Check object name and uploadID when processing multipart uploading
func (s3a *S3ApiServer) checkUploadId(object string, id string) error {
hash := s3a.generateUploadID(object)
if hash != id {
glog.Errorf("object %s and uploadID %s are not matched", object, id)
return fmt.Errorf("object %s and uploadID %s are not matched", object, id)
}
return nil
}
// Parse bucket url queries for ?uploads // Parse bucket url queries for ?uploads
func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) { func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
prefix = values.Get("prefix") prefix = values.Get("prefix")

Loading…
Cancel
Save