package S3Sink

import (
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

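// S3Sink replicates filer entries into a bucket on S3 or an S3-compatible
// object store, streaming file content through the AWS SDK's multipart uploader.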
type S3Sink struct {
	conn                          s3iface.S3API
	filerSource                   *source.FilerSource
	isIncremental                 bool
	keepPartSize                  bool
	s3DisableContentMD5Validation bool
	s3ForcePathStyle              bool
	uploaderConcurrency           int
	uploaderMaxUploadParts        int
	uploaderPartSizeMb            int
	region                        string
	bucket                        string
	dir                           string
	endpoint                      string
	acl                           string
}

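// init registers this sink so the replication framework can select it by its name, "s3".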
func init() {
	sink.Sinks = append(sink.Sinks, &S3Sink{})
}

func (s3sink *S3Sink) GetName() string {
	return "s3"
}

func (s3sink *S3Sink) GetSinkToDirectory() string {
	return s3sink.dir
}

func (s3sink *S3Sink) IsIncremental() bool {
	return s3sink.isIncremental
}

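// Initialize reads this sink's settings from the given configuration under the
// supplied prefix. An illustrative snippet, assuming the usual "[sink.s3]"
// section of replication.toml (the keys mirror the ones read below; bucket has
// no default and must be set):
//
//	[sink.s3]
//	aws_access_key_id     = ""          # if empty, the default AWS credential chain is used
//	aws_secret_access_key = ""
//	region                = "us-east-2"
//	bucket                = ""          # destination bucket
//	directory             = "/"        # destination directory inside the bucket
//	endpoint              = ""
//	is_incremental        = false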
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
	configuration.SetDefault(prefix+"region", "us-east-2")
	configuration.SetDefault(prefix+"directory", "/")
	configuration.SetDefault(prefix+"keep_part_size", true)
	configuration.SetDefault(prefix+"uploader_max_upload_parts", 1000)
	configuration.SetDefault(prefix+"uploader_part_size_mb", 8)
	configuration.SetDefault(prefix+"uploader_concurrency", 8)
	configuration.SetDefault(prefix+"s3_disable_content_md5_validation", true)
	configuration.SetDefault(prefix+"s3_force_path_style", true)
	s3sink.region = configuration.GetString(prefix + "region")
	s3sink.bucket = configuration.GetString(prefix + "bucket")
	s3sink.dir = configuration.GetString(prefix + "directory")
	s3sink.endpoint = configuration.GetString(prefix + "endpoint")
	s3sink.acl = configuration.GetString(prefix + "acl")
	s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
	s3sink.keepPartSize = configuration.GetBool(prefix + "keep_part_size")
	s3sink.s3DisableContentMD5Validation = configuration.GetBool(prefix + "s3_disable_content_md5_validation")
	s3sink.s3ForcePathStyle = configuration.GetBool(prefix + "s3_force_path_style")
	s3sink.uploaderMaxUploadParts = configuration.GetInt(prefix + "uploader_max_upload_parts")
	// Read the same key that SetDefault seeds above ("uploader_part_size_mb").
	s3sink.uploaderPartSizeMb = configuration.GetInt(prefix + "uploader_part_size_mb")
	s3sink.uploaderConcurrency = configuration.GetInt(prefix + "uploader_concurrency")

	glog.V(0).Infof("sink.s3.region: %v", s3sink.region)
	glog.V(0).Infof("sink.s3.bucket: %v", s3sink.bucket)
	glog.V(0).Infof("sink.s3.directory: %v", s3sink.dir)
	glog.V(0).Infof("sink.s3.endpoint: %v", s3sink.endpoint)
	glog.V(0).Infof("sink.s3.acl: %v", s3sink.acl)
	glog.V(0).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental)
	glog.V(0).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation)
	glog.V(0).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle)
	glog.V(0).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize)

	// Clamp to the SDK's multipart limit, logging the configured value before it is overwritten.
	if s3sink.uploaderMaxUploadParts > s3manager.MaxUploadParts {
		glog.Warningf("uploader_max_upload_parts %d is greater than the maximum number of parts allowed when uploading multiple parts to Amazon S3", s3sink.uploaderMaxUploadParts)
		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts)
		s3sink.uploaderMaxUploadParts = s3manager.MaxUploadParts
	} else {
		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts)
	}
	glog.V(0).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb)
	glog.V(0).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency)

	return s3sink.initialize(
		configuration.GetString(prefix+"aws_access_key_id"),
		configuration.GetString(prefix+"aws_secret_access_key"),
	)
}

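// SetSourceFiler wires in the source filer that entry content is read from during replication.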
func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) {
	s3sink.filerSource = s
}

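// initialize builds the AWS session and the S3 client. When no static key pair
// is configured, the SDK's default credential chain (environment variables,
// shared credentials file, instance role) is used.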
func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey string) error {
	config := &aws.Config{
		Region:                        aws.String(s3sink.region),
		Endpoint:                      aws.String(s3sink.endpoint),
		S3DisableContentMD5Validation: aws.Bool(s3sink.s3DisableContentMD5Validation),
		S3ForcePathStyle:              aws.Bool(s3sink.s3ForcePathStyle),
	}
	if awsAccessKeyId != "" && awsSecretAccessKey != "" {
		config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
	}

	sess, err := session.NewSession(config)
	if err != nil {
		return fmt.Errorf("create aws session: %v", err)
	}
	s3sink.conn = s3.New(sess)

	return nil
}

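// DeleteEntry removes the object that mirrors the given filer path.
// Directories have no S3 counterpart, so they are skipped.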
func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
	key = cleanKey(key)

	if isDirectory {
		return nil
	}

	input := &s3.DeleteObjectInput{
		Bucket: aws.String(s3sink.bucket),
		Key:    aws.String(key),
	}

	result, err := s3sink.conn.DeleteObject(input)
	if err == nil {
		glog.V(2).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
	} else {
		glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
	}

	return err
}

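// CreateEntry streams a file entry from the source filer into the destination
// bucket, carrying the entry's mtime and extended attributes along as object tags.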
func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) (err error) {
	key = cleanKey(key)

	if entry.IsDirectory {
		return nil
	}

	reader := filer.NewFileReader(s3sink.filerSource, entry)

	// Create an uploader with the session and custom options
	uploader := s3manager.NewUploaderWithClient(s3sink.conn, func(u *s3manager.Uploader) {
		u.PartSize = int64(s3sink.uploaderPartSizeMb * 1024 * 1024)
		u.Concurrency = s3sink.uploaderConcurrency
		u.MaxUploadParts = s3sink.uploaderMaxUploadParts
	})

	if s3sink.keepPartSize {
		switch chunkCount := len(entry.Chunks); {
		case chunkCount > 1:
			// Mirror the source chunking; a part must still meet the S3 minimum part size.
			if firstChunkSize := int64(entry.Chunks[0].Size); firstChunkSize > s3manager.MinUploadPartSize {
				uploader.PartSize = firstChunkSize
			}
		default:
			// A zero PartSize lets the SDK fall back to its default part size.
			uploader.PartSize = 0
		}
	}

	// Record the source mtime as user metadata unless it is already present.
	doSaveMtime := true
	if entry.Extended == nil {
		entry.Extended = make(map[string][]byte)
	} else if _, ok := entry.Extended[s3_constants.AmzUserMetaMtime]; ok {
		doSaveMtime = false
	}
	if doSaveMtime {
		entry.Extended[s3_constants.AmzUserMetaMtime] = []byte(strconv.FormatInt(entry.Attributes.Mtime, 10))
	}

	// Flatten the extended attributes into the "k1=v1&k2=v2" form expected by the Tagging field.
	tags := ""
	for k, v := range entry.Extended {
		if len(tags) > 0 {
			tags = tags + "&"
		}
		tags = tags + k + "=" + string(v)
	}

	// Upload the file to S3.
	uploadInput := s3manager.UploadInput{
		Bucket:  aws.String(s3sink.bucket),
		Key:     aws.String(key),
		Body:    reader,
		Tagging: aws.String(tags),
	}
	if len(entry.Attributes.Md5) > 0 {
		// Content-MD5 must be the base64-encoded raw digest (RFC 1864), not a hex string.
		uploadInput.ContentMD5 = aws.String(base64.StdEncoding.EncodeToString(entry.Attributes.Md5))
	}
	_, err = uploader.Upload(&uploadInput)

	return err
}

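// UpdateEntry handles a content or metadata change by re-uploading the new
// entry; S3 objects are immutable, so there is no in-place update to perform.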
func (s3sink *S3Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
	key = cleanKey(key)
	return true, s3sink.CreateEntry(key, newEntry, signatures)
}

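// cleanKey strips the leading slash from a filer path, since S3 object keys
// do not start with "/".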
func cleanKey(key string) string {
	if strings.HasPrefix(key, "/") {
		key = key[1:]
	}
	return key
}