You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

344 lines
10 KiB

3 years ago
6 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
2 years ago
  1. package s3api
  2. import (
  3. "crypto/sha1"
  4. "encoding/xml"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  13. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  14. weed_server "github.com/seaweedfs/seaweedfs/weed/server"
  15. "github.com/aws/aws-sdk-go/aws"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. )
  18. const (
  19. maxObjectListSizeLimit = 10000 // Limit number of objects in a listObjectsResponse.
  20. maxUploadsList = 10000 // Limit number of uploads in a listUploadsResponse.
  21. maxPartsList = 10000 // Limit number of parts in a listPartsResponse.
  22. globalMaxPartID = 100000
  23. )
  24. // NewMultipartUploadHandler - New multipart upload.
  25. func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  26. bucket, object := s3_constants.GetBucketAndObject(r)
  27. createMultipartUploadInput := &s3.CreateMultipartUploadInput{
  28. Bucket: aws.String(bucket),
  29. Key: objectKey(aws.String(object)),
  30. Metadata: make(map[string]*string),
  31. }
  32. metadata := weed_server.SaveAmzMetaData(r, nil, false)
  33. for k, v := range metadata {
  34. createMultipartUploadInput.Metadata[k] = aws.String(string(v))
  35. }
  36. contentType := r.Header.Get("Content-Type")
  37. if contentType != "" {
  38. createMultipartUploadInput.ContentType = &contentType
  39. }
  40. response, errCode := s3a.createMultipartUpload(createMultipartUploadInput)
  41. glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  42. if errCode != s3err.ErrNone {
  43. s3err.WriteErrorResponse(w, r, errCode)
  44. return
  45. }
  46. writeSuccessResponseXML(w, r, response)
  47. }
  48. // CompleteMultipartUploadHandler - Completes multipart upload.
  49. func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  50. // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  51. bucket, object := s3_constants.GetBucketAndObject(r)
  52. parts := &CompleteMultipartUpload{}
  53. if err := xmlDecoder(r.Body, parts, r.ContentLength); err != nil {
  54. s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
  55. return
  56. }
  57. // Get upload id.
  58. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  59. err := s3a.checkUploadId(object, uploadID)
  60. if err != nil {
  61. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  62. return
  63. }
  64. response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
  65. Bucket: aws.String(bucket),
  66. Key: objectKey(aws.String(object)),
  67. UploadId: aws.String(uploadID),
  68. }, parts)
  69. glog.V(2).Info("CompleteMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  70. if errCode != s3err.ErrNone {
  71. s3err.WriteErrorResponse(w, r, errCode)
  72. return
  73. }
  74. writeSuccessResponseXML(w, r, response)
  75. }
  76. // AbortMultipartUploadHandler - Aborts multipart upload.
  77. func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  78. bucket, object := s3_constants.GetBucketAndObject(r)
  79. // Get upload id.
  80. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  81. err := s3a.checkUploadId(object, uploadID)
  82. if err != nil {
  83. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  84. return
  85. }
  86. response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
  87. Bucket: aws.String(bucket),
  88. Key: objectKey(aws.String(object)),
  89. UploadId: aws.String(uploadID),
  90. })
  91. if errCode != s3err.ErrNone {
  92. s3err.WriteErrorResponse(w, r, errCode)
  93. return
  94. }
  95. glog.V(2).Info("AbortMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)))
  96. //https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
  97. s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
  98. s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
  99. }
  100. // ListMultipartUploadsHandler - Lists multipart uploads.
  101. func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
  102. bucket, _ := s3_constants.GetBucketAndObject(r)
  103. prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
  104. if maxUploads < 0 {
  105. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxUploads)
  106. return
  107. }
  108. if keyMarker != "" {
  109. // Marker not common with prefix is not implemented.
  110. if !strings.HasPrefix(keyMarker, prefix) {
  111. s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
  112. return
  113. }
  114. }
  115. response, errCode := s3a.listMultipartUploads(&s3.ListMultipartUploadsInput{
  116. Bucket: aws.String(bucket),
  117. Delimiter: aws.String(delimiter),
  118. EncodingType: aws.String(encodingType),
  119. KeyMarker: aws.String(keyMarker),
  120. MaxUploads: aws.Int64(int64(maxUploads)),
  121. Prefix: aws.String(prefix),
  122. UploadIdMarker: aws.String(uploadIDMarker),
  123. })
  124. glog.V(2).Infof("ListMultipartUploadsHandler %s errCode=%d", string(s3err.EncodeXMLResponse(response)), errCode)
  125. if errCode != s3err.ErrNone {
  126. s3err.WriteErrorResponse(w, r, errCode)
  127. return
  128. }
  129. // TODO handle encodingType
  130. writeSuccessResponseXML(w, r, response)
  131. }
  132. // ListObjectPartsHandler - Lists object parts in a multipart upload.
  133. func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
  134. bucket, object := s3_constants.GetBucketAndObject(r)
  135. uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
  136. if partNumberMarker < 0 {
  137. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPartNumberMarker)
  138. return
  139. }
  140. if maxParts < 0 {
  141. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  142. return
  143. }
  144. err := s3a.checkUploadId(object, uploadID)
  145. if err != nil {
  146. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  147. return
  148. }
  149. response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
  150. Bucket: aws.String(bucket),
  151. Key: objectKey(aws.String(object)),
  152. MaxParts: aws.Int64(int64(maxParts)),
  153. PartNumberMarker: aws.Int64(int64(partNumberMarker)),
  154. UploadId: aws.String(uploadID),
  155. })
  156. if errCode != s3err.ErrNone {
  157. s3err.WriteErrorResponse(w, r, errCode)
  158. return
  159. }
  160. glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part))
  161. writeSuccessResponseXML(w, r, response)
  162. }
  163. // PutObjectPartHandler - Put an object part in a multipart upload.
  164. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
  165. bucket, object := s3_constants.GetBucketAndObject(r)
  166. uploadID := r.URL.Query().Get("uploadId")
  167. err := s3a.checkUploadId(object, uploadID)
  168. if err != nil {
  169. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  170. return
  171. }
  172. partIDString := r.URL.Query().Get("partNumber")
  173. partID, err := strconv.Atoi(partIDString)
  174. if err != nil {
  175. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
  176. return
  177. }
  178. if partID > globalMaxPartID {
  179. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  180. return
  181. }
  182. dataReader := r.Body
  183. if s3a.iam.isEnabled() {
  184. rAuthType := getRequestAuthType(r)
  185. var s3ErrCode s3err.ErrorCode
  186. switch rAuthType {
  187. case authTypeStreamingSigned:
  188. dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
  189. case authTypeSignedV2, authTypePresignedV2:
  190. _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
  191. case authTypePresigned, authTypeSigned:
  192. _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
  193. }
  194. if s3ErrCode != s3err.ErrNone {
  195. s3err.WriteErrorResponse(w, r, s3ErrCode)
  196. return
  197. }
  198. }
  199. defer dataReader.Close()
  200. glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
  201. uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
  202. s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
  203. if partID == 1 && r.Header.Get("Content-Type") == "" {
  204. dataReader = mimeDetect(r, dataReader)
  205. }
  206. destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  207. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination)
  208. if errCode != s3err.ErrNone {
  209. s3err.WriteErrorResponse(w, r, errCode)
  210. return
  211. }
  212. setEtag(w, etag)
  213. writeSuccessResponseEmpty(w, r)
  214. }
  215. func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
  216. return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
  217. }
  218. // Generate uploadID hash string from object
  219. func (s3a *S3ApiServer) generateUploadID(object string) string {
  220. if strings.HasPrefix(object, "/") {
  221. object = object[1:]
  222. }
  223. h := sha1.New()
  224. h.Write([]byte(object))
  225. return fmt.Sprintf("%x", h.Sum(nil))
  226. }
  227. // Check object name and uploadID when processing multipart uploading
  228. func (s3a *S3ApiServer) checkUploadId(object string, id string) error {
  229. hash := s3a.generateUploadID(object)
  230. if !strings.HasPrefix(id, hash) {
  231. glog.Errorf("object %s and uploadID %s are not matched", object, id)
  232. return fmt.Errorf("object %s and uploadID %s are not matched", object, id)
  233. }
  234. return nil
  235. }
  236. // Parse bucket url queries for ?uploads
  237. func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
  238. prefix = values.Get("prefix")
  239. keyMarker = values.Get("key-marker")
  240. uploadIDMarker = values.Get("upload-id-marker")
  241. delimiter = values.Get("delimiter")
  242. if values.Get("max-uploads") != "" {
  243. maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
  244. } else {
  245. maxUploads = maxUploadsList
  246. }
  247. encodingType = values.Get("encoding-type")
  248. return
  249. }
  250. // Parse object url queries
  251. func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
  252. uploadID = values.Get("uploadId")
  253. partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
  254. if values.Get("max-parts") != "" {
  255. maxParts, _ = strconv.Atoi(values.Get("max-parts"))
  256. } else {
  257. maxParts = maxPartsList
  258. }
  259. encodingType = values.Get("encoding-type")
  260. return
  261. }
  262. func xmlDecoder(body io.Reader, v interface{}, size int64) error {
  263. var lbody io.Reader
  264. if size > 0 {
  265. lbody = io.LimitReader(body, size)
  266. } else {
  267. lbody = body
  268. }
  269. d := xml.NewDecoder(lbody)
  270. d.CharsetReader = func(label string, input io.Reader) (io.Reader, error) {
  271. return input, nil
  272. }
  273. return d.Decode(v)
  274. }
  275. type CompleteMultipartUpload struct {
  276. Parts []CompletedPart `xml:"Part"`
  277. }
  278. type CompletedPart struct {
  279. ETag string
  280. PartNumber int
  281. }