You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

411 lines
12 KiB

3 years ago
6 years ago
4 years ago
4 years ago
4 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
4 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
4 years ago
4 years ago
3 years ago
5 months ago
3 years ago
3 years ago
4 years ago
3 years ago
3 years ago
2 years ago
  1. package s3api
  2. import (
  3. "crypto/sha1"
  4. "encoding/xml"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "github.com/google/uuid"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  14. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  15. weed_server "github.com/seaweedfs/seaweedfs/weed/server"
  16. "github.com/aws/aws-sdk-go/aws"
  17. "github.com/aws/aws-sdk-go/service/s3"
  18. )
  19. const (
  20. maxObjectListSizeLimit = 10000 // Limit number of objects in a listObjectsResponse.
  21. maxUploadsList = 10000 // Limit number of uploads in a listUploadsResponse.
  22. maxPartsList = 10000 // Limit number of parts in a listPartsResponse.
  23. globalMaxPartID = 100000
  24. )
  25. // NewMultipartUploadHandler - New multipart upload.
  26. func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  27. bucket, object := s3_constants.GetBucketAndObject(r)
  28. errCode, initiatorId := s3a.CheckAccessForNewMultipartUpload(r, bucket, object)
  29. if errCode != s3err.ErrNone {
  30. s3err.WriteErrorResponse(w, r, errCode)
  31. return
  32. }
  33. createMultipartUploadInput := &s3.CreateMultipartUploadInput{
  34. Bucket: aws.String(bucket),
  35. Key: objectKey(aws.String(object)),
  36. Metadata: make(map[string]*string),
  37. }
  38. metadata := weed_server.SaveAmzMetaData(r, nil, false)
  39. for k, v := range metadata {
  40. createMultipartUploadInput.Metadata[k] = aws.String(string(v))
  41. }
  42. contentType := r.Header.Get("Content-Type")
  43. if contentType != "" {
  44. createMultipartUploadInput.ContentType = &contentType
  45. }
  46. response, errCode := s3a.createMultipartUpload(initiatorId, createMultipartUploadInput)
  47. glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  48. if errCode != s3err.ErrNone {
  49. s3err.WriteErrorResponse(w, r, errCode)
  50. return
  51. }
  52. writeSuccessResponseXML(w, r, response)
  53. }
  54. // CompleteMultipartUploadHandler - Completes multipart upload.
  55. func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  56. // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  57. bucket, object := s3_constants.GetBucketAndObject(r)
  58. parts := &CompleteMultipartUpload{}
  59. if err := xmlDecoder(r.Body, parts, r.ContentLength); err != nil {
  60. s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
  61. return
  62. }
  63. // Get upload id.
  64. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  65. err := s3a.checkUploadId(object, uploadID)
  66. if err != nil {
  67. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  68. return
  69. }
  70. errCode := s3a.CheckAccessForCompleteMultipartUpload(r, bucket, uploadID)
  71. if errCode != s3err.ErrNone {
  72. s3err.WriteErrorResponse(w, r, errCode)
  73. return
  74. }
  75. response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
  76. Bucket: aws.String(bucket),
  77. Key: objectKey(aws.String(object)),
  78. UploadId: aws.String(uploadID),
  79. }, parts)
  80. glog.V(2).Info("CompleteMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  81. if errCode != s3err.ErrNone {
  82. s3err.WriteErrorResponse(w, r, errCode)
  83. return
  84. }
  85. writeSuccessResponseXML(w, r, response)
  86. }
  87. // AbortMultipartUploadHandler - Aborts multipart upload.
  88. func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  89. bucket, object := s3_constants.GetBucketAndObject(r)
  90. // Get upload id.
  91. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  92. err := s3a.checkUploadId(object, uploadID)
  93. if err != nil {
  94. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  95. return
  96. }
  97. errCode := s3a.CheckAccessForAbortMultipartUpload(r, bucket, uploadID)
  98. if errCode != s3err.ErrNone {
  99. s3err.WriteErrorResponse(w, r, errCode)
  100. return
  101. }
  102. exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
  103. if err != nil {
  104. glog.V(1).Infof("list parts error: %v, request url: %s", err, r.RequestURI)
  105. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  106. return
  107. }
  108. if !exists {
  109. glog.V(1).Infof("list parts not found, request url: %s", r.RequestURI)
  110. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  111. return
  112. }
  113. response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
  114. Bucket: aws.String(bucket),
  115. Key: objectKey(aws.String(object)),
  116. UploadId: aws.String(uploadID),
  117. })
  118. if errCode != s3err.ErrNone {
  119. s3err.WriteErrorResponse(w, r, errCode)
  120. return
  121. }
  122. glog.V(2).Info("AbortMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)))
  123. //https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
  124. s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
  125. s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
  126. }
  127. // ListMultipartUploadsHandler - Lists multipart uploads.
  128. func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
  129. bucket, _ := s3_constants.GetBucketAndObject(r)
  130. prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
  131. if maxUploads < 0 {
  132. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxUploads)
  133. return
  134. }
  135. if keyMarker != "" {
  136. // Marker not common with prefix is not implemented.
  137. if !strings.HasPrefix(keyMarker, prefix) {
  138. s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
  139. return
  140. }
  141. }
  142. bucketMetaData, errorCode := s3a.checkAccessForReadBucket(r, bucket, s3_constants.PermissionRead)
  143. if errorCode != s3err.ErrNone {
  144. s3err.WriteErrorResponse(w, r, errorCode)
  145. return
  146. }
  147. response, errCode := s3a.listMultipartUploads(bucketMetaData, &s3.ListMultipartUploadsInput{
  148. Bucket: aws.String(bucket),
  149. Delimiter: aws.String(delimiter),
  150. EncodingType: aws.String(encodingType),
  151. KeyMarker: aws.String(keyMarker),
  152. MaxUploads: aws.Int64(int64(maxUploads)),
  153. Prefix: aws.String(prefix),
  154. UploadIdMarker: aws.String(uploadIDMarker),
  155. })
  156. glog.V(2).Infof("ListMultipartUploadsHandler %s errCode=%d", string(s3err.EncodeXMLResponse(response)), errCode)
  157. if errCode != s3err.ErrNone {
  158. s3err.WriteErrorResponse(w, r, errCode)
  159. return
  160. }
  161. // TODO handle encodingType
  162. writeSuccessResponseXML(w, r, response)
  163. }
  164. // ListObjectPartsHandler - Lists object parts in a multipart upload.
  165. func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
  166. bucket, object := s3_constants.GetBucketAndObject(r)
  167. uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
  168. if partNumberMarker < 0 {
  169. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPartNumberMarker)
  170. return
  171. }
  172. if maxParts < 0 {
  173. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  174. return
  175. }
  176. err := s3a.checkUploadId(object, uploadID)
  177. if err != nil {
  178. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  179. return
  180. }
  181. errCode := s3a.CheckAccessForListMultipartUploadParts(r, bucket, uploadID)
  182. if errCode != s3err.ErrNone {
  183. s3err.WriteErrorResponse(w, r, errCode)
  184. return
  185. }
  186. exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
  187. if err != nil {
  188. glog.V(1).Infof("list parts error: %v, request url: %s", err, r.RequestURI)
  189. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  190. return
  191. }
  192. if !exists {
  193. glog.V(1).Infof("list parts not found, request url: %s", r.RequestURI)
  194. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  195. return
  196. }
  197. response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
  198. Bucket: aws.String(bucket),
  199. Key: objectKey(aws.String(object)),
  200. MaxParts: aws.Int64(int64(maxParts)),
  201. PartNumberMarker: aws.Int64(int64(partNumberMarker)),
  202. UploadId: aws.String(uploadID),
  203. })
  204. if errCode != s3err.ErrNone {
  205. s3err.WriteErrorResponse(w, r, errCode)
  206. return
  207. }
  208. glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part))
  209. writeSuccessResponseXML(w, r, response)
  210. }
  211. // PutObjectPartHandler - Put an object part in a multipart upload.
  212. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
  213. bucket, object := s3_constants.GetBucketAndObject(r)
  214. uploadID := r.URL.Query().Get("uploadId")
  215. err := s3a.checkUploadId(object, uploadID)
  216. if err != nil {
  217. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  218. return
  219. }
  220. partIDString := r.URL.Query().Get("partNumber")
  221. partID, err := strconv.Atoi(partIDString)
  222. if err != nil {
  223. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
  224. return
  225. }
  226. if partID > globalMaxPartID {
  227. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  228. return
  229. }
  230. dataReader := r.Body
  231. if s3a.iam.isEnabled() {
  232. rAuthType := getRequestAuthType(r)
  233. var s3ErrCode s3err.ErrorCode
  234. var identity *Identity
  235. switch rAuthType {
  236. case authTypeStreamingSigned:
  237. dataReader, identity, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
  238. case authTypeSignedV2, authTypePresignedV2:
  239. identity, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
  240. case authTypePresigned, authTypeSigned:
  241. identity, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
  242. }
  243. if s3ErrCode != s3err.ErrNone {
  244. s3err.WriteErrorResponse(w, r, s3ErrCode)
  245. return
  246. }
  247. if identity != nil && identity.Account.Id != AccountAnonymous.Id {
  248. r.Header.Set(s3_constants.AmzAccountId, identity.Account.Id)
  249. }
  250. }
  251. defer dataReader.Close()
  252. errorCode := s3a.CheckAccessForPutObjectPartHandler(r, bucket)
  253. if errorCode != s3err.ErrNone {
  254. s3err.WriteErrorResponse(w, r, errorCode)
  255. return
  256. }
  257. glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
  258. uploadUrl := s3a.genPartUploadUrl(bucket, uploadID, partID)
  259. if partID == 1 && r.Header.Get("Content-Type") == "" {
  260. dataReader = mimeDetect(r, dataReader)
  261. }
  262. destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  263. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination, bucket)
  264. if errCode != s3err.ErrNone {
  265. s3err.WriteErrorResponse(w, r, errCode)
  266. return
  267. }
  268. setEtag(w, etag)
  269. writeSuccessResponseEmpty(w, r)
  270. }
  271. func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
  272. return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
  273. }
  274. func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string {
  275. return fmt.Sprintf("http://%s%s/%s/%04d_%s.part",
  276. s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
  277. }
  278. // Generate uploadID hash string from object
  279. func (s3a *S3ApiServer) generateUploadID(object string) string {
  280. if strings.HasPrefix(object, "/") {
  281. object = object[1:]
  282. }
  283. h := sha1.New()
  284. h.Write([]byte(object))
  285. return fmt.Sprintf("%x", h.Sum(nil))
  286. }
  287. // Check object name and uploadID when processing multipart uploading
  288. func (s3a *S3ApiServer) checkUploadId(object string, id string) error {
  289. hash := s3a.generateUploadID(object)
  290. if !strings.HasPrefix(id, hash) {
  291. glog.Errorf("object %s and uploadID %s are not matched", object, id)
  292. return fmt.Errorf("object %s and uploadID %s are not matched", object, id)
  293. }
  294. return nil
  295. }
  296. // Parse bucket url queries for ?uploads
  297. func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
  298. prefix = values.Get("prefix")
  299. keyMarker = values.Get("key-marker")
  300. uploadIDMarker = values.Get("upload-id-marker")
  301. delimiter = values.Get("delimiter")
  302. if values.Get("max-uploads") != "" {
  303. maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
  304. } else {
  305. maxUploads = maxUploadsList
  306. }
  307. encodingType = values.Get("encoding-type")
  308. return
  309. }
  310. // Parse object url queries
  311. func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
  312. uploadID = values.Get("uploadId")
  313. partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
  314. if values.Get("max-parts") != "" {
  315. maxParts, _ = strconv.Atoi(values.Get("max-parts"))
  316. } else {
  317. maxParts = maxPartsList
  318. }
  319. encodingType = values.Get("encoding-type")
  320. return
  321. }
  322. func xmlDecoder(body io.Reader, v interface{}, size int64) error {
  323. var lbody io.Reader
  324. if size > 0 {
  325. lbody = io.LimitReader(body, size)
  326. } else {
  327. lbody = body
  328. }
  329. d := xml.NewDecoder(lbody)
  330. d.CharsetReader = func(label string, input io.Reader) (io.Reader, error) {
  331. return input, nil
  332. }
  333. return d.Decode(v)
  334. }
  335. type CompleteMultipartUpload struct {
  336. Parts []CompletedPart `xml:"Part"`
  337. }
  338. type CompletedPart struct {
  339. ETag string
  340. PartNumber int
  341. }