You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

166 lines
4.5 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package s3_backend
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "strings"
  7. "time"
  8. "github.com/aws/aws-sdk-go/service/s3"
  9. "github.com/aws/aws-sdk-go/service/s3/s3iface"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  13. "github.com/google/uuid"
  14. )
  15. func init() {
  16. backend.BackendStorageFactories["s3"] = &S3BackendFactory{}
  17. }
  // S3BackendFactory builds S3BackendStorage instances. It has no fields.
  type S3BackendFactory struct{}
  20. func (factory *S3BackendFactory) StorageType() backend.StorageType {
  21. return backend.StorageType("s3")
  22. }
  23. func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, id string) (backend.BackendStorage, error) {
  24. return newS3BackendStorage(configuration, id)
  25. }
  26. type S3BackendStorage struct {
  27. id string
  28. aws_access_key_id string
  29. aws_secret_access_key string
  30. region string
  31. bucket string
  32. conn s3iface.S3API
  33. }
  34. func newS3BackendStorage(configuration backend.StringProperties, id string) (s *S3BackendStorage, err error) {
  35. s = &S3BackendStorage{}
  36. s.id = id
  37. s.aws_access_key_id = configuration.GetString("aws_access_key_id")
  38. s.aws_secret_access_key = configuration.GetString("aws_secret_access_key")
  39. s.region = configuration.GetString("region")
  40. s.bucket = configuration.GetString("bucket")
  41. s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region)
  42. glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket)
  43. return
  44. }
  45. func (s *S3BackendStorage) ToProperties() map[string]string {
  46. m := make(map[string]string)
  47. m["aws_access_key_id"] = s.aws_access_key_id
  48. m["aws_secret_access_key"] = s.aws_secret_access_key
  49. m["region"] = s.region
  50. m["bucket"] = s.bucket
  51. return m
  52. }
  53. func (s *S3BackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeTierInfo) backend.BackendStorageFile {
  54. if strings.HasPrefix(key, "/") {
  55. key = key[1:]
  56. }
  57. f := &S3BackendStorageFile{
  58. backendStorage: s,
  59. key: key,
  60. tierInfo: tierInfo,
  61. }
  62. return f
  63. }
  64. func (s *S3BackendStorage) CopyFile(f *os.File, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
  65. randomUuid, _ := uuid.NewRandom()
  66. key = randomUuid.String()
  67. glog.V(1).Infof("copying dat file of %s to remote s3.%s as %s", f.Name(), s.id, key)
  68. size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, fn)
  69. return
  70. }
  71. type S3BackendStorageFile struct {
  72. backendStorage *S3BackendStorage
  73. key string
  74. tierInfo *volume_server_pb.VolumeTierInfo
  75. }
  76. func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
  77. bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
  78. // glog.V(0).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
  79. getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
  80. Bucket: &s3backendStorageFile.backendStorage.bucket,
  81. Key: &s3backendStorageFile.key,
  82. Range: &bytesRange,
  83. })
  84. if getObjectErr != nil {
  85. return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr)
  86. }
  87. defer getObjectOutput.Body.Close()
  88. glog.V(4).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
  89. glog.V(4).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
  90. for {
  91. if n, err = getObjectOutput.Body.Read(p); err == nil && n < len(p) {
  92. p = p[n:]
  93. } else {
  94. break
  95. }
  96. }
  97. if err == io.EOF {
  98. err = nil
  99. }
  100. return
  101. }
  102. func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
  103. panic("implement me")
  104. }
  105. func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error {
  106. panic("implement me")
  107. }
  108. func (s3backendStorageFile S3BackendStorageFile) Close() error {
  109. return nil
  110. }
  111. func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
  112. files := s3backendStorageFile.tierInfo.GetFiles()
  113. if len(files) == 0 {
  114. err = fmt.Errorf("remote file info not found")
  115. return
  116. }
  117. datSize = int64(files[0].FileSize)
  118. modTime = time.Unix(int64(files[0].ModifiedTime), 0)
  119. return
  120. }
  121. func (s3backendStorageFile S3BackendStorageFile) String() string {
  122. return s3backendStorageFile.key
  123. }
  124. func (s3backendStorageFile *S3BackendStorageFile) GetName() string {
  125. return "s3"
  126. }
  127. func (s3backendStorageFile S3BackendStorageFile) Instantiate(src *os.File) error {
  128. panic("implement me")
  129. }