Chris Lu
5 years ago
7 changed files with 195 additions and 9 deletions
-
8weed/command/scaffold.go
-
6weed/notification/aws_sqs/aws_sqs_pub.go
-
6weed/replication/sink/s3sink/s3_sink.go
-
6weed/replication/sub/notification_aws_sqs.go
-
4weed/storage/backend/backend.go
-
120weed/storage/backend/s3_backend/s3_backend.go
-
54weed/storage/backend/s3_backend/s3_sessions.go
@ -0,0 +1,120 @@ |
|||||
|
package s3_backend |
||||
|
|
||||
|
import (
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage/backend"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/util"
)
||||
|
|
||||
|
var ( |
||||
|
_ backend.DataStorageBackend = &S3Backend{} |
||||
|
) |
||||
|
|
||||
|
func init() { |
||||
|
backend.StorageBackends = append(backend.StorageBackends, &S3Backend{}) |
||||
|
} |
||||
|
|
||||
|
// S3Backend stores a volume's .dat file as a single object in an S3
// bucket. It implements backend.DataStorageBackend (see the assertion
// above).
type S3Backend struct {
	conn s3iface.S3API // S3 client, cached per region (see s3_sessions.go)
	region string // AWS region the bucket lives in
	bucket string // target bucket name
	dir string // key prefix ("directory") inside the bucket
	vid needle.VolumeId // volume id this backend serves
	key string // full object key: <dir>/<vid>.dat, leading slash stripped
}
||||
|
|
||||
|
func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) { |
||||
|
bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1) |
||||
|
getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{ |
||||
|
Bucket: &s3backend.bucket, |
||||
|
Key: &s3backend.key, |
||||
|
Range: &bytesRange, |
||||
|
}) |
||||
|
|
||||
|
if getObjectErr != nil { |
||||
|
return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr) |
||||
|
} |
||||
|
defer getObjectOutput.Body.Close() |
||||
|
|
||||
|
return getObjectOutput.Body.Read(p) |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) { |
||||
|
panic("implement me") |
||||
|
} |
||||
|
|
||||
|
func (s3backend S3Backend) Truncate(off int64) error { |
||||
|
panic("implement me") |
||||
|
} |
||||
|
|
||||
|
// Close releases backend resources. The S3 backend holds no per-volume
// handles (response bodies are closed inside ReadAt), so there is
// nothing to do.
func (s3backend S3Backend) Close() error {
	return nil
}
||||
|
|
||||
|
func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) { |
||||
|
|
||||
|
headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{ |
||||
|
Bucket: &s3backend.bucket, |
||||
|
Key: &s3backend.key, |
||||
|
}) |
||||
|
|
||||
|
if headObjectErr != nil { |
||||
|
return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr) |
||||
|
} |
||||
|
|
||||
|
datSize = int64(*headObjectOutput.ContentLength) |
||||
|
modTime = *headObjectOutput.LastModified |
||||
|
|
||||
|
return |
||||
|
} |
||||
|
|
||||
|
func (s3backend S3Backend) String() string { |
||||
|
return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key) |
||||
|
} |
||||
|
|
||||
|
// GetName returns the name this backend is registered under: "s3".
func (s3backend *S3Backend) GetName() string {
	return "s3"
}
||||
|
|
||||
|
// GetSinkToDirectory returns the configured key prefix ("directory")
// inside the bucket.
func (s3backend *S3Backend) GetSinkToDirectory() string {
	return s3backend.dir
}
||||
|
|
||||
|
func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error { |
||||
|
glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region")) |
||||
|
glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket")) |
||||
|
glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory")) |
||||
|
|
||||
|
return s3backend.initialize( |
||||
|
configuration.GetString("aws_access_key_id"), |
||||
|
configuration.GetString("aws_secret_access_key"), |
||||
|
configuration.GetString("region"), |
||||
|
configuration.GetString("bucket"), |
||||
|
configuration.GetString("directory"), |
||||
|
vid, |
||||
|
) |
||||
|
} |
||||
|
|
||||
|
func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string, |
||||
|
vid needle.VolumeId) (err error) { |
||||
|
s3backend.region = region |
||||
|
s3backend.bucket = bucket |
||||
|
s3backend.dir = dir |
||||
|
s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region) |
||||
|
|
||||
|
s3backend.vid = vid |
||||
|
s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid) |
||||
|
if strings.HasPrefix(s3backend.key, "/") { |
||||
|
s3backend.key = s3backend.key[1:] |
||||
|
} |
||||
|
|
||||
|
return err |
||||
|
} |
@ -0,0 +1,54 @@ |
|||||
|
package s3_backend |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"sync" |
||||
|
|
||||
|
"github.com/aws/aws-sdk-go/aws" |
||||
|
"github.com/aws/aws-sdk-go/aws/credentials" |
||||
|
"github.com/aws/aws-sdk-go/aws/session" |
||||
|
"github.com/aws/aws-sdk-go/service/s3" |
||||
|
"github.com/aws/aws-sdk-go/service/s3/s3iface" |
||||
|
) |
||||
|
|
||||
|
// Package-level cache of S3 clients, one per AWS region, so that every
// backend targeting the same region shares a single session.
var (
	s3Sessions = make(map[string]s3iface.S3API)
	sessionsLock sync.RWMutex // guards s3Sessions
)
||||
|
|
||||
|
func getSession(region string) (s3iface.S3API, bool) { |
||||
|
sessionsLock.RLock() |
||||
|
defer sessionsLock.RUnlock() |
||||
|
|
||||
|
sess, found := s3Sessions[region] |
||||
|
return sess, found |
||||
|
} |
||||
|
|
||||
|
func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S3API, error) { |
||||
|
|
||||
|
sessionsLock.Lock() |
||||
|
defer sessionsLock.Unlock() |
||||
|
|
||||
|
if t, found := s3Sessions[region]; found { |
||||
|
return t, nil |
||||
|
} |
||||
|
|
||||
|
config := &aws.Config{ |
||||
|
Region: aws.String(region), |
||||
|
} |
||||
|
if awsAccessKeyId != "" && awsSecretAccessKey != "" { |
||||
|
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") |
||||
|
} |
||||
|
|
||||
|
sess, err := session.NewSession(config) |
||||
|
if err != nil { |
||||
|
return nil, fmt.Errorf("create aws session in region %s: %v", region, err) |
||||
|
} |
||||
|
|
||||
|
t:= s3.New(sess) |
||||
|
|
||||
|
s3Sessions[region] = t |
||||
|
|
||||
|
return t, nil |
||||
|
|
||||
|
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue