You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

102 lines
2.5 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package s3_backend
  2. import (
  3. "fmt"
  4. "os"
  5. "sync/atomic"
  6. "github.com/aws/aws-sdk-go/aws"
  7. "github.com/aws/aws-sdk-go/service/s3/s3iface"
  8. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. )
  11. func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string,
  12. fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  13. //open the file
  14. f, err := os.Open(filename)
  15. if err != nil {
  16. return 0, fmt.Errorf("failed to open file %q, %v", filename, err)
  17. }
  18. defer f.Close()
  19. info, err := f.Stat()
  20. if err != nil {
  21. return 0, fmt.Errorf("failed to stat file %q, %v", filename, err)
  22. }
  23. fileSize = info.Size()
  24. partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB
  25. for partSize*1000 < fileSize {
  26. partSize *= 4
  27. }
  28. // Create an uploader with the session and custom options
  29. uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
  30. u.PartSize = partSize
  31. u.Concurrency = 5
  32. })
  33. fileReader := &s3UploadProgressedReader{
  34. fp: f,
  35. size: fileSize,
  36. read: -fileSize,
  37. fn: fn,
  38. }
  39. // Upload the file to S3.
  40. var result *s3manager.UploadOutput
  41. result, err = uploader.Upload(&s3manager.UploadInput{
  42. Bucket: aws.String(destBucket),
  43. Key: aws.String(destKey),
  44. Body: fileReader,
  45. ACL: aws.String("private"),
  46. ServerSideEncryption: aws.String("AES256"),
  47. StorageClass: aws.String("STANDARD_IA"),
  48. })
  49. //in case it fails to upload
  50. if err != nil {
  51. return 0, fmt.Errorf("failed to upload file %s: %v", filename, err)
  52. }
  53. glog.V(1).Infof("file %s uploaded to %s\n", filename, result.Location)
  54. return
  55. }
  // s3UploadProgressedReader wraps an open file and tracks how many bytes have
  // been read from it through ReadAt, reporting the running total to an
  // optional callback.
  //
  // adapted from https://github.com/aws/aws-sdk-go/pull/1868
  type s3UploadProgressedReader struct {
  	// read is the running byte counter. It is modified with atomic.AddInt64
  	// in ReadAt, so it must remain the FIRST field: on 32-bit platforms only
  	// the first word of an allocated struct is guaranteed to be 64-bit
  	// aligned, and 64-bit atomic operations on a misaligned word panic.
  	read int64
  	// fp is the file all reads and seeks are forwarded to.
  	fp *os.File
  	// size is the denominator used when converting read into a percentage.
  	size int64
  	// fn, when non-nil, receives the counter value and percentage after each
  	// successful ReadAt; a non-nil error from it is returned by ReadAt.
  	fn func(progressed int64, percentage float32) error
  }
  63. func (r *s3UploadProgressedReader) Read(p []byte) (int, error) {
  64. return r.fp.Read(p)
  65. }
  66. func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) {
  67. n, err := r.fp.ReadAt(p, off)
  68. if err != nil {
  69. return n, err
  70. }
  71. // Got the length have read( or means has uploaded), and you can construct your message
  72. atomic.AddInt64(&r.read, int64(n))
  73. if r.fn != nil {
  74. read := r.read
  75. if err := r.fn(read, float32(read*100)/float32(r.size)); err != nil {
  76. return n, err
  77. }
  78. }
  79. return n, err
  80. }
  81. func (r *s3UploadProgressedReader) Seek(offset int64, whence int) (int64, error) {
  82. return r.fp.Seek(offset, whence)
  83. }