You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

252 lines
7.0 KiB

6 years ago
  1. package s3api
  2. import (
  3. "fmt"
  4. "github.com/aws/aws-sdk-go/aws"
  5. "github.com/aws/aws-sdk-go/service/s3"
  6. "github.com/gorilla/mux"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. )
  // Upper bounds used by the S3 listing responses and by part uploads.
  const (
  	// maxObjectList limits the number of objects in a listObjectsResponse.
  	maxObjectList = 1000

  	// maxUploadsList limits the number of uploads in a listUploadsResponse.
  	maxUploadsList = 1000

  	// maxPartsList limits the number of parts in a listPartsResponse.
  	maxPartsList = 1000

  	// globalMaxPartID is the largest part number accepted for a part upload.
  	globalMaxPartID = 10000
  )
  18. // NewMultipartUploadHandler - New multipart upload.
  19. func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  20. var object, bucket string
  21. vars := mux.Vars(r)
  22. bucket = vars["bucket"]
  23. object = vars["object"]
  24. response, errCode := s3a.createMultipartUpload(&s3.CreateMultipartUploadInput{
  25. Bucket: aws.String(bucket),
  26. Key: aws.String(object),
  27. })
  28. if errCode != ErrNone {
  29. writeErrorResponse(w, errCode, r.URL)
  30. return
  31. }
  32. // println("NewMultipartUploadHandler", string(encodeResponse(response)))
  33. writeSuccessResponseXML(w, encodeResponse(response))
  34. }
  35. // CompleteMultipartUploadHandler - Completes multipart upload.
  36. func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  37. vars := mux.Vars(r)
  38. bucket := vars["bucket"]
  39. object := getObject(vars)
  40. // Get upload id.
  41. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  42. response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
  43. Bucket: aws.String(bucket),
  44. Key: aws.String(object),
  45. UploadId: aws.String(uploadID),
  46. })
  47. // println("CompleteMultipartUploadHandler", string(encodeResponse(response)), errCode)
  48. if errCode != ErrNone {
  49. writeErrorResponse(w, errCode, r.URL)
  50. return
  51. }
  52. writeSuccessResponseXML(w, encodeResponse(response))
  53. }
  54. // AbortMultipartUploadHandler - Aborts multipart upload.
  55. func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  56. vars := mux.Vars(r)
  57. bucket := vars["bucket"]
  58. object := getObject(vars)
  59. // Get upload id.
  60. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  61. response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
  62. Bucket: aws.String(bucket),
  63. Key: aws.String(object),
  64. UploadId: aws.String(uploadID),
  65. })
  66. if errCode != ErrNone {
  67. writeErrorResponse(w, errCode, r.URL)
  68. return
  69. }
  70. // println("AbortMultipartUploadHandler", string(encodeResponse(response)))
  71. writeSuccessResponseXML(w, encodeResponse(response))
  72. }
  73. // ListMultipartUploadsHandler - Lists multipart uploads.
  74. func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
  75. vars := mux.Vars(r)
  76. bucket := vars["bucket"]
  77. prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
  78. if maxUploads < 0 {
  79. writeErrorResponse(w, ErrInvalidMaxUploads, r.URL)
  80. return
  81. }
  82. if keyMarker != "" {
  83. // Marker not common with prefix is not implemented.
  84. if !strings.HasPrefix(keyMarker, prefix) {
  85. writeErrorResponse(w, ErrNotImplemented, r.URL)
  86. return
  87. }
  88. }
  89. response, errCode := s3a.listMultipartUploads(&s3.ListMultipartUploadsInput{
  90. Bucket: aws.String(bucket),
  91. Delimiter: aws.String(delimiter),
  92. EncodingType: aws.String(encodingType),
  93. KeyMarker: aws.String(keyMarker),
  94. MaxUploads: aws.Int64(int64(maxUploads)),
  95. Prefix: aws.String(prefix),
  96. UploadIdMarker: aws.String(uploadIDMarker),
  97. })
  98. if errCode != ErrNone {
  99. writeErrorResponse(w, errCode, r.URL)
  100. return
  101. }
  102. // TODO handle encodingType
  103. // println("ListMultipartUploadsHandler", string(encodeResponse(response)))
  104. writeSuccessResponseXML(w, encodeResponse(response))
  105. }
  106. // ListObjectPartsHandler - Lists object parts in a multipart upload.
  107. func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
  108. vars := mux.Vars(r)
  109. bucket := vars["bucket"]
  110. object := getObject(vars)
  111. uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
  112. if partNumberMarker < 0 {
  113. writeErrorResponse(w, ErrInvalidPartNumberMarker, r.URL)
  114. return
  115. }
  116. if maxParts < 0 {
  117. writeErrorResponse(w, ErrInvalidMaxParts, r.URL)
  118. return
  119. }
  120. response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
  121. Bucket: aws.String(bucket),
  122. Key: aws.String(object),
  123. MaxParts: aws.Int64(int64(maxParts)),
  124. PartNumberMarker: aws.Int64(int64(partNumberMarker)),
  125. UploadId: aws.String(uploadID),
  126. })
  127. if errCode != ErrNone {
  128. writeErrorResponse(w, errCode, r.URL)
  129. return
  130. }
  131. // println("ListObjectPartsHandler", string(encodeResponse(response)))
  132. writeSuccessResponseXML(w, encodeResponse(response))
  133. }
  134. // PutObjectPartHandler - Put an object part in a multipart upload.
  135. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
  136. vars := mux.Vars(r)
  137. bucket := vars["bucket"]
  138. rAuthType := getRequestAuthType(r)
  139. uploadID := r.URL.Query().Get("uploadId")
  140. exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
  141. if !exists {
  142. writeErrorResponse(w, ErrNoSuchUpload, r.URL)
  143. return
  144. }
  145. partIDString := r.URL.Query().Get("partNumber")
  146. partID, err := strconv.Atoi(partIDString)
  147. if err != nil {
  148. writeErrorResponse(w, ErrInvalidPart, r.URL)
  149. return
  150. }
  151. if partID > globalMaxPartID {
  152. writeErrorResponse(w, ErrInvalidMaxParts, r.URL)
  153. return
  154. }
  155. dataReader := r.Body
  156. if rAuthType == authTypeStreamingSigned {
  157. dataReader = newSignV4ChunkedReader(r)
  158. }
  159. uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
  160. s3a.option.Filer, s3a.genUploadsFolder(bucket), uploadID, partID-1)
  161. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
  162. if errCode != ErrNone {
  163. writeErrorResponse(w, errCode, r.URL)
  164. return
  165. }
  166. setEtag(w, etag)
  167. writeSuccessResponseEmpty(w)
  168. }
  169. func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
  170. return fmt.Sprintf("%s/%s/_uploads", s3a.option.BucketsPath, bucket)
  171. }
  172. // Parse bucket url queries for ?uploads
  173. func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
  174. prefix = values.Get("prefix")
  175. keyMarker = values.Get("key-marker")
  176. uploadIDMarker = values.Get("upload-id-marker")
  177. delimiter = values.Get("delimiter")
  178. if values.Get("max-uploads") != "" {
  179. maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
  180. } else {
  181. maxUploads = maxUploadsList
  182. }
  183. encodingType = values.Get("encoding-type")
  184. return
  185. }
  186. // Parse object url queries
  187. func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
  188. uploadID = values.Get("uploadId")
  189. partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
  190. if values.Get("max-parts") != "" {
  191. maxParts, _ = strconv.Atoi(values.Get("max-parts"))
  192. } else {
  193. maxParts = maxPartsList
  194. }
  195. encodingType = values.Get("encoding-type")
  196. return
  197. }
  198. type byCompletedPartNumber []*s3.CompletedPart
  199. func (a byCompletedPartNumber) Len() int { return len(a) }
  200. func (a byCompletedPartNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  201. func (a byCompletedPartNumber) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }