You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

119 lines
3.1 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package s3_backend
  import (
  	"fmt"
  	"io"
  	"os"
  	"strings"
  	"time"

  	"github.com/aws/aws-sdk-go/service/s3"
  	"github.com/aws/aws-sdk-go/service/s3/s3iface"

  	"github.com/chrislusf/seaweedfs/weed/glog"
  	"github.com/chrislusf/seaweedfs/weed/storage/backend"
  	"github.com/chrislusf/seaweedfs/weed/storage/needle"
  	"github.com/chrislusf/seaweedfs/weed/util"
  )
  14. var (
  15. _ backend.DataStorageBackend = &S3Backend{}
  16. )
  17. func init() {
  18. backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
  19. }
  20. type S3Backend struct {
  21. conn s3iface.S3API
  22. region string
  23. bucket string
  24. vid needle.VolumeId
  25. key string
  26. }
  27. func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
  28. bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
  29. getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
  30. Bucket: &s3backend.bucket,
  31. Key: &s3backend.key,
  32. Range: &bytesRange,
  33. })
  34. if getObjectErr != nil {
  35. return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
  36. }
  37. defer getObjectOutput.Body.Close()
  38. return getObjectOutput.Body.Read(p)
  39. }
  40. func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
  41. panic("implement me")
  42. }
  43. func (s3backend S3Backend) Truncate(off int64) error {
  44. panic("implement me")
  45. }
  46. func (s3backend S3Backend) Close() error {
  47. return nil
  48. }
  49. func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
  50. headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
  51. Bucket: &s3backend.bucket,
  52. Key: &s3backend.key,
  53. })
  54. if headObjectErr != nil {
  55. return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
  56. }
  57. datSize = int64(*headObjectOutput.ContentLength)
  58. modTime = *headObjectOutput.LastModified
  59. return
  60. }
  61. func (s3backend S3Backend) String() string {
  62. return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
  63. }
  64. func (s3backend *S3Backend) GetName() string {
  65. return "s3"
  66. }
  67. func (s3backend S3Backend) Instantiate(src *os.File) error {
  68. panic("implement me")
  69. }
  70. func (s3backend *S3Backend) Initialize(configuration util.Configuration, prefix string, vid needle.VolumeId) error {
  71. glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
  72. glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
  73. glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
  74. return s3backend.initialize(
  75. configuration.GetString("aws_access_key_id"),
  76. configuration.GetString("aws_secret_access_key"),
  77. configuration.GetString("region"),
  78. configuration.GetString("bucket"),
  79. prefix,
  80. vid,
  81. )
  82. }
  83. func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket string,
  84. prefix string, vid needle.VolumeId) (err error) {
  85. s3backend.region = region
  86. s3backend.bucket = bucket
  87. s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
  88. s3backend.vid = vid
  89. s3backend.key = fmt.Sprintf("%s_%d.dat", prefix, vid)
  90. if strings.HasPrefix(s3backend.key, "/") {
  91. s3backend.key = s3backend.key[1:]
  92. }
  93. return err
  94. }