You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

350 lines
10 KiB

3 years ago
7 years ago
4 years ago
4 years ago
5 years ago
3 years ago
3 years ago
4 years ago
5 years ago
3 years ago
3 years ago
5 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
5 years ago
3 years ago
3 years ago
3 years ago
3 years ago
5 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
5 years ago
5 years ago
3 years ago
3 years ago
3 years ago
5 years ago
3 years ago
3 years ago
3 years ago
  1. package s3api
  2. import (
  3. "crypto/sha1"
  4. "encoding/xml"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  13. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  14. weed_server "github.com/seaweedfs/seaweedfs/weed/server"
  15. "github.com/aws/aws-sdk-go/aws"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. )
  18. const (
  19. maxObjectListSizeLimit = 10000 // Limit number of objects in a listObjectsResponse.
  20. maxUploadsList = 10000 // Limit number of uploads in a listUploadsResponse.
  21. maxPartsList = 10000 // Limit number of parts in a listPartsResponse.
  22. globalMaxPartID = 100000
  23. )
  24. // NewMultipartUploadHandler - New multipart upload.
  25. func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  26. bucket, object := s3_constants.GetBucketAndObject(r)
  27. //acl
  28. errCode := s3a.CheckAccessForNewMultipartUpload(r, bucket, object)
  29. if errCode != s3err.ErrNone {
  30. s3err.WriteErrorResponse(w, r, errCode)
  31. return
  32. }
  33. createMultipartUploadInput := &s3.CreateMultipartUploadInput{
  34. Bucket: aws.String(bucket),
  35. Key: objectKey(aws.String(object)),
  36. Metadata: make(map[string]*string),
  37. }
  38. metadata := weed_server.SaveAmzMetaData(r, nil, false)
  39. for k, v := range metadata {
  40. createMultipartUploadInput.Metadata[k] = aws.String(string(v))
  41. }
  42. contentType := r.Header.Get("Content-Type")
  43. if contentType != "" {
  44. createMultipartUploadInput.ContentType = &contentType
  45. }
  46. response, errCode := s3a.createMultipartUpload(createMultipartUploadInput)
  47. glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  48. if errCode != s3err.ErrNone {
  49. s3err.WriteErrorResponse(w, r, errCode)
  50. return
  51. }
  52. writeSuccessResponseXML(w, r, response)
  53. }
  54. // CompleteMultipartUploadHandler - Completes multipart upload.
  55. func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  56. // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  57. bucket, object := s3_constants.GetBucketAndObject(r)
  58. parts := &CompleteMultipartUpload{}
  59. if err := xmlDecoder(r.Body, parts, r.ContentLength); err != nil {
  60. s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
  61. return
  62. }
  63. // Get upload id.
  64. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  65. err := s3a.checkUploadId(object, uploadID)
  66. if err != nil {
  67. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  68. return
  69. }
  70. response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
  71. Bucket: aws.String(bucket),
  72. Key: objectKey(aws.String(object)),
  73. UploadId: aws.String(uploadID),
  74. }, parts)
  75. glog.V(2).Info("CompleteMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  76. if errCode != s3err.ErrNone {
  77. s3err.WriteErrorResponse(w, r, errCode)
  78. return
  79. }
  80. writeSuccessResponseXML(w, r, response)
  81. }
  82. // AbortMultipartUploadHandler - Aborts multipart upload.
  83. func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  84. bucket, object := s3_constants.GetBucketAndObject(r)
  85. // Get upload id.
  86. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  87. err := s3a.checkUploadId(object, uploadID)
  88. if err != nil {
  89. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  90. return
  91. }
  92. response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
  93. Bucket: aws.String(bucket),
  94. Key: objectKey(aws.String(object)),
  95. UploadId: aws.String(uploadID),
  96. })
  97. if errCode != s3err.ErrNone {
  98. s3err.WriteErrorResponse(w, r, errCode)
  99. return
  100. }
  101. glog.V(2).Info("AbortMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)))
  102. //https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
  103. s3err.WriteXMLResponse(w, r, http.StatusNoContent, response)
  104. s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
  105. }
  106. // ListMultipartUploadsHandler - Lists multipart uploads.
  107. func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
  108. bucket, _ := s3_constants.GetBucketAndObject(r)
  109. prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
  110. if maxUploads < 0 {
  111. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxUploads)
  112. return
  113. }
  114. if keyMarker != "" {
  115. // Marker not common with prefix is not implemented.
  116. if !strings.HasPrefix(keyMarker, prefix) {
  117. s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
  118. return
  119. }
  120. }
  121. response, errCode := s3a.listMultipartUploads(&s3.ListMultipartUploadsInput{
  122. Bucket: aws.String(bucket),
  123. Delimiter: aws.String(delimiter),
  124. EncodingType: aws.String(encodingType),
  125. KeyMarker: aws.String(keyMarker),
  126. MaxUploads: aws.Int64(int64(maxUploads)),
  127. Prefix: aws.String(prefix),
  128. UploadIdMarker: aws.String(uploadIDMarker),
  129. })
  130. glog.V(2).Infof("ListMultipartUploadsHandler %s errCode=%d", string(s3err.EncodeXMLResponse(response)), errCode)
  131. if errCode != s3err.ErrNone {
  132. s3err.WriteErrorResponse(w, r, errCode)
  133. return
  134. }
  135. // TODO handle encodingType
  136. writeSuccessResponseXML(w, r, response)
  137. }
  138. // ListObjectPartsHandler - Lists object parts in a multipart upload.
  139. func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
  140. bucket, object := s3_constants.GetBucketAndObject(r)
  141. uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
  142. if partNumberMarker < 0 {
  143. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPartNumberMarker)
  144. return
  145. }
  146. if maxParts < 0 {
  147. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  148. return
  149. }
  150. err := s3a.checkUploadId(object, uploadID)
  151. if err != nil {
  152. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  153. return
  154. }
  155. response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
  156. Bucket: aws.String(bucket),
  157. Key: objectKey(aws.String(object)),
  158. MaxParts: aws.Int64(int64(maxParts)),
  159. PartNumberMarker: aws.Int64(int64(partNumberMarker)),
  160. UploadId: aws.String(uploadID),
  161. })
  162. if errCode != s3err.ErrNone {
  163. s3err.WriteErrorResponse(w, r, errCode)
  164. return
  165. }
  166. glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part))
  167. writeSuccessResponseXML(w, r, response)
  168. }
  169. // PutObjectPartHandler - Put an object part in a multipart upload.
  170. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
  171. bucket, object := s3_constants.GetBucketAndObject(r)
  172. uploadID := r.URL.Query().Get("uploadId")
  173. err := s3a.checkUploadId(object, uploadID)
  174. if err != nil {
  175. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  176. return
  177. }
  178. partIDString := r.URL.Query().Get("partNumber")
  179. partID, err := strconv.Atoi(partIDString)
  180. if err != nil {
  181. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
  182. return
  183. }
  184. if partID > globalMaxPartID {
  185. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  186. return
  187. }
  188. dataReader := r.Body
  189. if s3a.iam.isEnabled() {
  190. rAuthType := getRequestAuthType(r)
  191. var s3ErrCode s3err.ErrorCode
  192. switch rAuthType {
  193. case authTypeStreamingSigned:
  194. dataReader, _, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
  195. case authTypeSignedV2, authTypePresignedV2:
  196. _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
  197. case authTypePresigned, authTypeSigned:
  198. _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
  199. }
  200. if s3ErrCode != s3err.ErrNone {
  201. s3err.WriteErrorResponse(w, r, s3ErrCode)
  202. return
  203. }
  204. }
  205. defer dataReader.Close()
  206. glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
  207. uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
  208. s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
  209. if partID == 1 && r.Header.Get("Content-Type") == "" {
  210. dataReader = mimeDetect(r, dataReader)
  211. }
  212. destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  213. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination)
  214. if errCode != s3err.ErrNone {
  215. s3err.WriteErrorResponse(w, r, errCode)
  216. return
  217. }
  218. setEtag(w, etag)
  219. writeSuccessResponseEmpty(w, r)
  220. }
  221. func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
  222. return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
  223. }
  224. // Generate uploadID hash string from object
  225. func (s3a *S3ApiServer) generateUploadID(object string) string {
  226. if strings.HasPrefix(object, "/") {
  227. object = object[1:]
  228. }
  229. h := sha1.New()
  230. h.Write([]byte(object))
  231. return fmt.Sprintf("%x", h.Sum(nil))
  232. }
  233. // Check object name and uploadID when processing multipart uploading
  234. func (s3a *S3ApiServer) checkUploadId(object string, id string) error {
  235. hash := s3a.generateUploadID(object)
  236. if !strings.HasPrefix(id, hash) {
  237. glog.Errorf("object %s and uploadID %s are not matched", object, id)
  238. return fmt.Errorf("object %s and uploadID %s are not matched", object, id)
  239. }
  240. return nil
  241. }
  242. // Parse bucket url queries for ?uploads
  243. func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
  244. prefix = values.Get("prefix")
  245. keyMarker = values.Get("key-marker")
  246. uploadIDMarker = values.Get("upload-id-marker")
  247. delimiter = values.Get("delimiter")
  248. if values.Get("max-uploads") != "" {
  249. maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
  250. } else {
  251. maxUploads = maxUploadsList
  252. }
  253. encodingType = values.Get("encoding-type")
  254. return
  255. }
  256. // Parse object url queries
  257. func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
  258. uploadID = values.Get("uploadId")
  259. partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
  260. if values.Get("max-parts") != "" {
  261. maxParts, _ = strconv.Atoi(values.Get("max-parts"))
  262. } else {
  263. maxParts = maxPartsList
  264. }
  265. encodingType = values.Get("encoding-type")
  266. return
  267. }
  268. func xmlDecoder(body io.Reader, v interface{}, size int64) error {
  269. var lbody io.Reader
  270. if size > 0 {
  271. lbody = io.LimitReader(body, size)
  272. } else {
  273. lbody = body
  274. }
  275. d := xml.NewDecoder(lbody)
  276. d.CharsetReader = func(label string, input io.Reader) (io.Reader, error) {
  277. return input, nil
  278. }
  279. return d.Decode(v)
  280. }
  281. type CompleteMultipartUpload struct {
  282. Parts []CompletedPart `xml:"Part"`
  283. }
  284. type CompletedPart struct {
  285. ETag string
  286. PartNumber int
  287. }