You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

400 lines
12 KiB

3 years ago
6 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
2 years ago
  1. package s3api
  2. import (
  3. "crypto/sha1"
  4. "encoding/xml"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  13. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  14. weed_server "github.com/seaweedfs/seaweedfs/weed/server"
  15. "github.com/aws/aws-sdk-go/aws"
  16. "github.com/aws/aws-sdk-go/service/s3"
  17. )
  // Upper bounds used by the S3 listing and multipart-upload handlers.
  const (
  	// maxObjectListSizeLimit limits the number of objects in a listObjectsResponse.
  	maxObjectListSizeLimit = 10000

  	// maxUploadsList limits the number of uploads in a listUploadsResponse; it is
  	// also the default applied when a request carries no "max-uploads" parameter.
  	maxUploadsList = 10000

  	// maxPartsList limits the number of parts in a listPartsResponse; it is also
  	// the default applied when a request carries no "max-parts" parameter.
  	maxPartsList = 10000

  	// globalMaxPartID is the largest part number PutObjectPartHandler accepts.
  	globalMaxPartID = 100000
  )
  24. // NewMultipartUploadHandler - New multipart upload.
  25. func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  26. bucket, object := s3_constants.GetBucketAndObject(r)
  27. errCode, initiatorId := s3a.CheckAccessForNewMultipartUpload(r, bucket, object)
  28. if errCode != s3err.ErrNone {
  29. s3err.WriteErrorResponse(w, r, errCode)
  30. return
  31. }
  32. createMultipartUploadInput := &s3.CreateMultipartUploadInput{
  33. Bucket: aws.String(bucket),
  34. Key: objectKey(aws.String(object)),
  35. Metadata: make(map[string]*string),
  36. }
  37. metadata := weed_server.SaveAmzMetaData(r, nil, false)
  38. for k, v := range metadata {
  39. createMultipartUploadInput.Metadata[k] = aws.String(string(v))
  40. }
  41. contentType := r.Header.Get("Content-Type")
  42. if contentType != "" {
  43. createMultipartUploadInput.ContentType = &contentType
  44. }
  45. response, errCode := s3a.createMultipartUpload(initiatorId, createMultipartUploadInput)
  46. glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  47. if errCode != s3err.ErrNone {
  48. s3err.WriteErrorResponse(w, r, errCode)
  49. return
  50. }
  51. writeSuccessResponseXML(w, r, response)
  52. }
  53. // CompleteMultipartUploadHandler - Completes multipart upload.
  54. func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  55. // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  56. bucket, object := s3_constants.GetBucketAndObject(r)
  57. parts := &CompleteMultipartUpload{}
  58. if err := xmlDecoder(r.Body, parts, r.ContentLength); err != nil {
  59. s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
  60. return
  61. }
  62. // Get upload id.
  63. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  64. err := s3a.checkUploadId(object, uploadID)
  65. if err != nil {
  66. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  67. return
  68. }
  69. errCode := s3a.CheckAccessForCompleteMultipartUpload(r, bucket, uploadID)
  70. if errCode != s3err.ErrNone {
  71. s3err.WriteErrorResponse(w, r, errCode)
  72. return
  73. }
  74. response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
  75. Bucket: aws.String(bucket),
  76. Key: objectKey(aws.String(object)),
  77. UploadId: aws.String(uploadID),
  78. }, parts)
  79. glog.V(2).Info("CompleteMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  80. if errCode != s3err.ErrNone {
  81. s3err.WriteErrorResponse(w, r, errCode)
  82. return
  83. }
  84. writeSuccessResponseXML(w, r, response)
  85. }
  86. // AbortMultipartUploadHandler - Aborts multipart upload.
  87. func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  88. bucket, object := s3_constants.GetBucketAndObject(r)
  89. // Get upload id.
  90. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  91. err := s3a.checkUploadId(object, uploadID)
  92. if err != nil {
  93. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  94. return
  95. }
  96. errCode := s3a.CheckAccessForAbortMultipartUpload(r, bucket, uploadID)
  97. if errCode != s3err.ErrNone {
  98. s3err.WriteErrorResponse(w, r, errCode)
  99. return
  100. }
  101. exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
  102. if err != nil {
  103. glog.V(1).Infof("list parts error: %v, request url: %s", err, r.RequestURI)
  104. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  105. return
  106. }
  107. if !exists {
  108. glog.V(1).Infof("list parts not found, request url: %s", r.RequestURI)
  109. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  110. return
  111. }
  112. response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
  113. Bucket: aws.String(bucket),
  114. Key: objectKey(aws.String(object)),
  115. UploadId: aws.String(uploadID),
  116. })
  117. if errCode != s3err.ErrNone {
  118. s3err.WriteErrorResponse(w, r, errCode)
  119. return
  120. }
  121. glog.V(2).Info("AbortMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)))
  122. //https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
  123. s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
  124. s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
  125. }
  126. // ListMultipartUploadsHandler - Lists multipart uploads.
  127. func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
  128. bucket, _ := s3_constants.GetBucketAndObject(r)
  129. prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
  130. if maxUploads < 0 {
  131. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxUploads)
  132. return
  133. }
  134. if keyMarker != "" {
  135. // Marker not common with prefix is not implemented.
  136. if !strings.HasPrefix(keyMarker, prefix) {
  137. s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
  138. return
  139. }
  140. }
  141. bucketMetaData, errorCode := s3a.checkAccessForReadBucket(r, bucket, s3_constants.PermissionRead)
  142. if errorCode != s3err.ErrNone {
  143. s3err.WriteErrorResponse(w, r, errorCode)
  144. return
  145. }
  146. response, errCode := s3a.listMultipartUploads(bucketMetaData, &s3.ListMultipartUploadsInput{
  147. Bucket: aws.String(bucket),
  148. Delimiter: aws.String(delimiter),
  149. EncodingType: aws.String(encodingType),
  150. KeyMarker: aws.String(keyMarker),
  151. MaxUploads: aws.Int64(int64(maxUploads)),
  152. Prefix: aws.String(prefix),
  153. UploadIdMarker: aws.String(uploadIDMarker),
  154. })
  155. glog.V(2).Infof("ListMultipartUploadsHandler %s errCode=%d", string(s3err.EncodeXMLResponse(response)), errCode)
  156. if errCode != s3err.ErrNone {
  157. s3err.WriteErrorResponse(w, r, errCode)
  158. return
  159. }
  160. // TODO handle encodingType
  161. writeSuccessResponseXML(w, r, response)
  162. }
  163. // ListObjectPartsHandler - Lists object parts in a multipart upload.
  164. func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
  165. bucket, object := s3_constants.GetBucketAndObject(r)
  166. uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
  167. if partNumberMarker < 0 {
  168. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPartNumberMarker)
  169. return
  170. }
  171. if maxParts < 0 {
  172. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  173. return
  174. }
  175. err := s3a.checkUploadId(object, uploadID)
  176. if err != nil {
  177. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  178. return
  179. }
  180. errCode := s3a.CheckAccessForListMultipartUploadParts(r, bucket, uploadID)
  181. if errCode != s3err.ErrNone {
  182. s3err.WriteErrorResponse(w, r, errCode)
  183. return
  184. }
  185. exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
  186. if err != nil {
  187. glog.V(1).Infof("list parts error: %v, request url: %s", err, r.RequestURI)
  188. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  189. return
  190. }
  191. if !exists {
  192. glog.V(1).Infof("list parts not found, request url: %s", r.RequestURI)
  193. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  194. return
  195. }
  196. response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
  197. Bucket: aws.String(bucket),
  198. Key: objectKey(aws.String(object)),
  199. MaxParts: aws.Int64(int64(maxParts)),
  200. PartNumberMarker: aws.Int64(int64(partNumberMarker)),
  201. UploadId: aws.String(uploadID),
  202. })
  203. if errCode != s3err.ErrNone {
  204. s3err.WriteErrorResponse(w, r, errCode)
  205. return
  206. }
  207. glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part))
  208. writeSuccessResponseXML(w, r, response)
  209. }
  210. // PutObjectPartHandler - Put an object part in a multipart upload.
  211. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
  212. bucket, object := s3_constants.GetBucketAndObject(r)
  213. uploadID := r.URL.Query().Get("uploadId")
  214. err := s3a.checkUploadId(object, uploadID)
  215. if err != nil {
  216. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  217. return
  218. }
  219. partIDString := r.URL.Query().Get("partNumber")
  220. partID, err := strconv.Atoi(partIDString)
  221. if err != nil {
  222. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
  223. return
  224. }
  225. if partID > globalMaxPartID {
  226. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  227. return
  228. }
  229. dataReader := r.Body
  230. if s3a.iam.isEnabled() {
  231. rAuthType := getRequestAuthType(r)
  232. var s3ErrCode s3err.ErrorCode
  233. switch rAuthType {
  234. case authTypeStreamingSigned:
  235. dataReader, _, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
  236. case authTypeSignedV2, authTypePresignedV2:
  237. _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
  238. case authTypePresigned, authTypeSigned:
  239. _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
  240. }
  241. if s3ErrCode != s3err.ErrNone {
  242. s3err.WriteErrorResponse(w, r, s3ErrCode)
  243. return
  244. }
  245. }
  246. defer dataReader.Close()
  247. errorCode := s3a.CheckAccessForPutObjectPartHandler(r, bucket)
  248. if errorCode != s3err.ErrNone {
  249. s3err.WriteErrorResponse(w, r, errorCode)
  250. return
  251. }
  252. glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
  253. uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
  254. s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
  255. if partID == 1 && r.Header.Get("Content-Type") == "" {
  256. dataReader = mimeDetect(r, dataReader)
  257. }
  258. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")
  259. if errCode != s3err.ErrNone {
  260. s3err.WriteErrorResponse(w, r, errCode)
  261. return
  262. }
  263. setEtag(w, etag)
  264. writeSuccessResponseEmpty(w, r)
  265. }
  266. func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
  267. return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
  268. }
  269. // Generate uploadID hash string from object
  270. func (s3a *S3ApiServer) generateUploadID(object string) string {
  271. if strings.HasPrefix(object, "/") {
  272. object = object[1:]
  273. }
  274. h := sha1.New()
  275. h.Write([]byte(object))
  276. return fmt.Sprintf("%x", h.Sum(nil))
  277. }
  278. // Check object name and uploadID when processing multipart uploading
  279. func (s3a *S3ApiServer) checkUploadId(object string, id string) error {
  280. hash := s3a.generateUploadID(object)
  281. if !strings.HasPrefix(id, hash) {
  282. glog.Errorf("object %s and uploadID %s are not matched", object, id)
  283. return fmt.Errorf("object %s and uploadID %s are not matched", object, id)
  284. }
  285. return nil
  286. }
  287. // Parse bucket url queries for ?uploads
  288. func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
  289. prefix = values.Get("prefix")
  290. keyMarker = values.Get("key-marker")
  291. uploadIDMarker = values.Get("upload-id-marker")
  292. delimiter = values.Get("delimiter")
  293. if values.Get("max-uploads") != "" {
  294. maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
  295. } else {
  296. maxUploads = maxUploadsList
  297. }
  298. encodingType = values.Get("encoding-type")
  299. return
  300. }
  301. // Parse object url queries
  302. func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
  303. uploadID = values.Get("uploadId")
  304. partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
  305. if values.Get("max-parts") != "" {
  306. maxParts, _ = strconv.Atoi(values.Get("max-parts"))
  307. } else {
  308. maxParts = maxPartsList
  309. }
  310. encodingType = values.Get("encoding-type")
  311. return
  312. }
  // xmlDecoder decodes a single XML document from body into v.
  //
  // When size is positive, at most size bytes are read from body; otherwise body
  // is read without a limit. Whatever charset the document declares, its bytes
  // are handed to the decoder unchanged (no transcoding is performed), so a
  // non-UTF-8 encoding declaration does not by itself cause a decode error.
  func xmlDecoder(body io.Reader, v interface{}, size int64) error {
  	src := body
  	if size > 0 {
  		src = io.LimitReader(body, size)
  	}

  	dec := xml.NewDecoder(src)
  	// Pass-through charset reader: accept any label and return the input as-is.
  	dec.CharsetReader = func(_ string, in io.Reader) (io.Reader, error) {
  		return in, nil
  	}

  	return dec.Decode(v)
  }
  326. type CompleteMultipartUpload struct {
  327. Parts []CompletedPart `xml:"Part"`
  328. }
  // CompletedPart is one <Part> entry of a CompleteMultipartUpload document.
  // The XML element names are spelled out explicitly; they are identical to the
  // field names encoding/xml would use by default.
  type CompletedPart struct {
  	// ETag is the text of the part's <ETag> element.
  	ETag string `xml:"ETag"`
  	// PartNumber is the integer value of the part's <PartNumber> element.
  	PartNumber int `xml:"PartNumber"`
  }