You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

165 lines
4.9 KiB

  1. package S3Sink
  2. import (
  3. "github.com/aws/aws-sdk-go/service/s3"
  4. "github.com/aws/aws-sdk-go/aws"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/aws/aws-sdk-go/aws/awserr"
  7. "fmt"
  8. "io"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/filer2"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "bytes"
  13. )
  14. func (s3sink *S3Sink) deleteObject(key string) error {
  15. input := &s3.DeleteObjectInput{
  16. Bucket: aws.String(s3sink.bucket),
  17. Key: aws.String(key),
  18. }
  19. result, err := s3sink.conn.DeleteObject(input)
  20. if err == nil {
  21. glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
  22. } else {
  23. glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
  24. }
  25. return err
  26. }
  27. func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (uploadId string, err error) {
  28. input := &s3.CreateMultipartUploadInput{
  29. Bucket: aws.String(s3sink.bucket),
  30. Key: aws.String(key),
  31. ContentType: aws.String(entry.Attributes.Mime),
  32. }
  33. result, err := s3sink.conn.CreateMultipartUpload(input)
  34. if err == nil {
  35. glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
  36. } else {
  37. glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
  38. return "", err
  39. }
  40. return *result.UploadId, err
  41. }
  42. func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
  43. input := &s3.AbortMultipartUploadInput{
  44. Bucket: aws.String(s3sink.bucket),
  45. Key: aws.String(key),
  46. UploadId: aws.String(uploadId),
  47. }
  48. result, err := s3sink.conn.AbortMultipartUpload(input)
  49. if err != nil {
  50. if aerr, ok := err.(awserr.Error); ok {
  51. switch aerr.Code() {
  52. case s3.ErrCodeNoSuchUpload:
  53. glog.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
  54. default:
  55. glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
  56. }
  57. } else {
  58. // Print the error, cast err to awserr.Error to get the Code and
  59. // Message from an error.
  60. glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
  61. }
  62. return err
  63. }
  64. glog.V(0).Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
  65. return nil
  66. }
  67. // To complete multipart upload
  68. func (s3sink *S3Sink) completeMultipartUpload(key, uploadId string, parts []*s3.CompletedPart) error {
  69. input := &s3.CompleteMultipartUploadInput{
  70. Bucket: aws.String(s3sink.bucket),
  71. Key: aws.String(key),
  72. UploadId: aws.String(uploadId),
  73. MultipartUpload: &s3.CompletedMultipartUpload{
  74. Parts: parts,
  75. },
  76. }
  77. result, err := s3sink.conn.CompleteMultipartUpload(input)
  78. if err == nil {
  79. glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
  80. } else {
  81. glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
  82. }
  83. return err
  84. }
  85. // To upload a part
  86. func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer2.ChunkView) (*s3.CompletedPart, error) {
  87. var readSeeker io.ReadSeeker
  88. readSeeker, err := s3sink.buildReadSeeker(chunk)
  89. if err != nil {
  90. glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
  91. return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
  92. }
  93. input := &s3.UploadPartInput{
  94. Body: readSeeker,
  95. Bucket: aws.String(s3sink.bucket),
  96. Key: aws.String(key),
  97. PartNumber: aws.Int64(int64(partId)),
  98. UploadId: aws.String(uploadId),
  99. }
  100. result, err := s3sink.conn.UploadPart(input)
  101. if err == nil {
  102. glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
  103. } else {
  104. glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
  105. }
  106. part := &s3.CompletedPart{
  107. ETag: result.ETag,
  108. PartNumber: aws.Int64(int64(partId)),
  109. }
  110. return part, err
  111. }
  112. // To upload a part by copying byte range from an existing object as data source
  113. func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySource string, sourceStart, sourceStop int) error {
  114. input := &s3.UploadPartCopyInput{
  115. Bucket: aws.String(s3sink.bucket),
  116. CopySource: aws.String(fmt.Sprintf("/%s/%s", s3sink.bucket, copySource)),
  117. CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", sourceStart, sourceStop)),
  118. Key: aws.String(key),
  119. PartNumber: aws.Int64(partId),
  120. UploadId: aws.String(uploadId),
  121. }
  122. result, err := s3sink.conn.UploadPartCopy(input)
  123. if err == nil {
  124. glog.V(0).Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
  125. } else {
  126. glog.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
  127. }
  128. return err
  129. }
  130. func (s3sink *S3Sink) buildReadSeeker(chunk *filer2.ChunkView) (io.ReadSeeker, error) {
  131. fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId)
  132. if err != nil {
  133. return nil, err
  134. }
  135. buf := make([]byte, chunk.Size)
  136. util.ReadUrl(fileUrl, chunk.Offset, int(chunk.Size), buf)
  137. return bytes.NewReader(buf), nil
  138. }