You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

178 lines
5.2 KiB

6 years ago
6 years ago
6 years ago
  1. package S3Sink
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "github.com/aws/aws-sdk-go/aws"
  8. "github.com/aws/aws-sdk-go/aws/awserr"
  9. "github.com/aws/aws-sdk-go/service/s3"
  10. "github.com/chrislusf/seaweedfs/weed/filer"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  15. func (s3sink *S3Sink) deleteObject(key string) error {
  16. input := &s3.DeleteObjectInput{
  17. Bucket: aws.String(s3sink.bucket),
  18. Key: aws.String(key),
  19. }
  20. result, err := s3sink.conn.DeleteObject(input)
  21. if err == nil {
  22. glog.V(2).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
  23. } else {
  24. glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
  25. }
  26. return err
  27. }
  28. func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (uploadId string, err error) {
  29. input := &s3.CreateMultipartUploadInput{
  30. Bucket: aws.String(s3sink.bucket),
  31. Key: aws.String(key),
  32. ContentType: aws.String(entry.Attributes.Mime),
  33. }
  34. if s3sink.acl != "" {
  35. input.ACL = aws.String(s3sink.acl)
  36. }
  37. result, err := s3sink.conn.CreateMultipartUpload(input)
  38. if err == nil {
  39. glog.V(2).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
  40. } else {
  41. glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
  42. return "", err
  43. }
  44. return *result.UploadId, err
  45. }
  46. func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
  47. input := &s3.AbortMultipartUploadInput{
  48. Bucket: aws.String(s3sink.bucket),
  49. Key: aws.String(key),
  50. UploadId: aws.String(uploadId),
  51. }
  52. result, err := s3sink.conn.AbortMultipartUpload(input)
  53. if err != nil {
  54. if aerr, ok := err.(awserr.Error); ok {
  55. switch aerr.Code() {
  56. case s3.ErrCodeNoSuchUpload:
  57. glog.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
  58. default:
  59. glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
  60. }
  61. } else {
  62. // Print the error, cast err to awserr.Error to get the Code and
  63. // Message from an error.
  64. glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
  65. }
  66. return err
  67. }
  68. glog.V(0).Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
  69. return nil
  70. }
  71. // To complete multipart upload
  72. func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId string, parts []*s3.CompletedPart) error {
  73. input := &s3.CompleteMultipartUploadInput{
  74. Bucket: aws.String(s3sink.bucket),
  75. Key: aws.String(key),
  76. UploadId: aws.String(uploadId),
  77. MultipartUpload: &s3.CompletedMultipartUpload{
  78. Parts: parts,
  79. },
  80. }
  81. result, err := s3sink.conn.CompleteMultipartUpload(input)
  82. if err == nil {
  83. glog.V(2).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
  84. } else {
  85. glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
  86. return fmt.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
  87. }
  88. return nil
  89. }
  90. // To upload a part
  91. func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.ChunkView) (*s3.CompletedPart, error) {
  92. var readSeeker io.ReadSeeker
  93. readSeeker, err := s3sink.buildReadSeeker(chunk)
  94. if err != nil {
  95. glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
  96. return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
  97. }
  98. input := &s3.UploadPartInput{
  99. Body: readSeeker,
  100. Bucket: aws.String(s3sink.bucket),
  101. Key: aws.String(key),
  102. PartNumber: aws.Int64(int64(partId)),
  103. UploadId: aws.String(uploadId),
  104. }
  105. result, err := s3sink.conn.UploadPart(input)
  106. if err == nil {
  107. glog.V(2).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
  108. } else {
  109. glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
  110. }
  111. part := &s3.CompletedPart{
  112. ETag: result.ETag,
  113. PartNumber: aws.Int64(int64(partId)),
  114. }
  115. return part, err
  116. }
  117. // To upload a part by copying byte range from an existing object as data source
  118. func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySource string, sourceStart, sourceStop int) error {
  119. input := &s3.UploadPartCopyInput{
  120. Bucket: aws.String(s3sink.bucket),
  121. CopySource: aws.String(fmt.Sprintf("/%s/%s", s3sink.bucket, copySource)),
  122. CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", sourceStart, sourceStop)),
  123. Key: aws.String(key),
  124. PartNumber: aws.Int64(partId),
  125. UploadId: aws.String(uploadId),
  126. }
  127. result, err := s3sink.conn.UploadPartCopy(input)
  128. if err == nil {
  129. glog.V(0).Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
  130. } else {
  131. glog.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
  132. }
  133. return err
  134. }
  135. func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, error) {
  136. fileUrls, err := s3sink.filerSource.LookupFileId(chunk.FileId)
  137. if err != nil {
  138. return nil, err
  139. }
  140. buf := make([]byte, chunk.Size)
  141. for _, fileUrl := range fileUrls {
  142. _, err = util.ReadUrl(fileUrl, chunk.CipherKey, chunk.IsGzipped, false, chunk.Offset, int(chunk.Size), buf)
  143. if err != nil {
  144. glog.V(1).Infof("read from %s: %v", fileUrl, err)
  145. } else {
  146. break
  147. }
  148. }
  149. return bytes.NewReader(buf), nil
  150. }