You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

175 lines
5.2 KiB

6 years ago
6 years ago
6 years ago
  1. package S3Sink
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "github.com/aws/aws-sdk-go/aws"
  8. "github.com/aws/aws-sdk-go/aws/awserr"
  9. "github.com/aws/aws-sdk-go/service/s3"
  10. "github.com/chrislusf/seaweedfs/weed/filer"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  15. func (s3sink *S3Sink) deleteObject(key string) error {
  16. input := &s3.DeleteObjectInput{
  17. Bucket: aws.String(s3sink.bucket),
  18. Key: aws.String(key),
  19. }
  20. result, err := s3sink.conn.DeleteObject(input)
  21. if err == nil {
  22. glog.V(2).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
  23. } else {
  24. glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
  25. }
  26. return err
  27. }
  28. func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (uploadId string, err error) {
  29. input := &s3.CreateMultipartUploadInput{
  30. Bucket: aws.String(s3sink.bucket),
  31. Key: aws.String(key),
  32. ContentType: aws.String(entry.Attributes.Mime),
  33. }
  34. result, err := s3sink.conn.CreateMultipartUpload(input)
  35. if err == nil {
  36. glog.V(2).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
  37. } else {
  38. glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
  39. return "", err
  40. }
  41. return *result.UploadId, err
  42. }
  43. func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
  44. input := &s3.AbortMultipartUploadInput{
  45. Bucket: aws.String(s3sink.bucket),
  46. Key: aws.String(key),
  47. UploadId: aws.String(uploadId),
  48. }
  49. result, err := s3sink.conn.AbortMultipartUpload(input)
  50. if err != nil {
  51. if aerr, ok := err.(awserr.Error); ok {
  52. switch aerr.Code() {
  53. case s3.ErrCodeNoSuchUpload:
  54. glog.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
  55. default:
  56. glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
  57. }
  58. } else {
  59. // Print the error, cast err to awserr.Error to get the Code and
  60. // Message from an error.
  61. glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
  62. }
  63. return err
  64. }
  65. glog.V(0).Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
  66. return nil
  67. }
  68. // To complete multipart upload
  69. func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId string, parts []*s3.CompletedPart) error {
  70. input := &s3.CompleteMultipartUploadInput{
  71. Bucket: aws.String(s3sink.bucket),
  72. Key: aws.String(key),
  73. UploadId: aws.String(uploadId),
  74. MultipartUpload: &s3.CompletedMultipartUpload{
  75. Parts: parts,
  76. },
  77. }
  78. result, err := s3sink.conn.CompleteMultipartUpload(input)
  79. if err == nil {
  80. glog.V(2).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
  81. } else {
  82. glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
  83. return fmt.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
  84. }
  85. return nil
  86. }
  87. // To upload a part
  88. func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.ChunkView) (*s3.CompletedPart, error) {
  89. var readSeeker io.ReadSeeker
  90. readSeeker, err := s3sink.buildReadSeeker(chunk)
  91. if err != nil {
  92. glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
  93. return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
  94. }
  95. input := &s3.UploadPartInput{
  96. Body: readSeeker,
  97. Bucket: aws.String(s3sink.bucket),
  98. Key: aws.String(key),
  99. PartNumber: aws.Int64(int64(partId)),
  100. UploadId: aws.String(uploadId),
  101. }
  102. result, err := s3sink.conn.UploadPart(input)
  103. if err == nil {
  104. glog.V(2).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
  105. } else {
  106. glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
  107. }
  108. part := &s3.CompletedPart{
  109. ETag: result.ETag,
  110. PartNumber: aws.Int64(int64(partId)),
  111. }
  112. return part, err
  113. }
  114. // To upload a part by copying byte range from an existing object as data source
  115. func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySource string, sourceStart, sourceStop int) error {
  116. input := &s3.UploadPartCopyInput{
  117. Bucket: aws.String(s3sink.bucket),
  118. CopySource: aws.String(fmt.Sprintf("/%s/%s", s3sink.bucket, copySource)),
  119. CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", sourceStart, sourceStop)),
  120. Key: aws.String(key),
  121. PartNumber: aws.Int64(partId),
  122. UploadId: aws.String(uploadId),
  123. }
  124. result, err := s3sink.conn.UploadPartCopy(input)
  125. if err == nil {
  126. glog.V(0).Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
  127. } else {
  128. glog.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
  129. }
  130. return err
  131. }
  132. func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, error) {
  133. fileUrls, err := s3sink.filerSource.LookupFileId(chunk.FileId)
  134. if err != nil {
  135. return nil, err
  136. }
  137. buf := make([]byte, chunk.Size)
  138. for _, fileUrl := range fileUrls {
  139. _, err = util.ReadUrl(fileUrl, chunk.CipherKey, chunk.IsGzipped, false, chunk.Offset, int(chunk.Size), buf)
  140. if err != nil {
  141. glog.V(1).Infof("read from %s: %v", fileUrl, err)
  142. } else {
  143. break
  144. }
  145. }
  146. return bytes.NewReader(buf), nil
  147. }