You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							114 lines
						
					
					
						
							2.7 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							114 lines
						
					
					
						
							2.7 KiB
						
					
					
				| package s3_backend | |
| 
 | |
import (
	"fmt"
	"net/url"
	"os"
	"sync/atomic"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"

	"github.com/chrislusf/seaweedfs/weed/glog"
)
| 
 | |
| func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string, | |
| 	attributes map[string]string, | |
| 	fn func(progressed int64, percentage float32) error) (fileSize int64, err error) { | |
| 
 | |
| 	//open the file | |
| 	f, err := os.Open(filename) | |
| 	if err != nil { | |
| 		return 0, fmt.Errorf("failed to open file %q, %v", filename, err) | |
| 	} | |
| 	defer f.Close() | |
| 
 | |
| 	info, err := f.Stat() | |
| 	if err != nil { | |
| 		return 0, fmt.Errorf("failed to stat file %q, %v", filename, err) | |
| 	} | |
| 
 | |
| 	fileSize = info.Size() | |
| 
 | |
| 	partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB | |
| 	for partSize*1000 < fileSize { | |
| 		partSize *= 4 | |
| 	} | |
| 
 | |
| 	// Create an uploader with the session and custom options | |
| 	uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) { | |
| 		u.PartSize = partSize | |
| 		u.Concurrency = 5 | |
| 	}) | |
| 
 | |
| 	fileReader := &s3UploadProgressedReader{ | |
| 		fp:   f, | |
| 		size: fileSize, | |
| 		read: -fileSize, | |
| 		fn:   fn, | |
| 	} | |
| 
 | |
| 	// process tagging | |
| 	tags := "" | |
| 	for k, v := range attributes { | |
| 		if len(tags) > 0 { | |
| 			tags = tags + "&" | |
| 		} | |
| 		tags = tags + k + "=" + v | |
| 	} | |
| 
 | |
| 	// Upload the file to S3. | |
| 	var result *s3manager.UploadOutput | |
| 	result, err = uploader.Upload(&s3manager.UploadInput{ | |
| 		Bucket:               aws.String(destBucket), | |
| 		Key:                  aws.String(destKey), | |
| 		Body:                 fileReader, | |
| 		ACL:                  aws.String("private"), | |
| 		ServerSideEncryption: aws.String("AES256"), | |
| 		StorageClass:         aws.String("STANDARD_IA"), | |
| 		Tagging:              aws.String(tags), | |
| 	}) | |
| 
 | |
| 	//in case it fails to upload | |
| 	if err != nil { | |
| 		return 0, fmt.Errorf("failed to upload file %s: %v", filename, err) | |
| 	} | |
| 	glog.V(1).Infof("file %s uploaded to %s\n", filename, result.Location) | |
| 
 | |
| 	return | |
| } | |
| 
 | |
// s3UploadProgressedReader wraps an open file and reports read progress
// through a callback while the file is being uploaded.
//
// adapted from https://github.com/aws/aws-sdk-go/pull/1868
type s3UploadProgressedReader struct {
	// read is the running count of bytes read via ReadAt. It is updated with
	// sync/atomic and must stay the first field: only the first word of an
	// allocated struct is guaranteed to be 64-bit aligned on 32-bit platforms.
	read int64
	// size is the total file size in bytes, used to compute the percentage.
	size int64
	// fp is the file being read.
	fp *os.File
	// fn, when non-nil, receives the running byte count and the percentage.
	fn func(progressed int64, percentage float32) error
}
| 
 | |
| func (r *s3UploadProgressedReader) Read(p []byte) (int, error) { | |
| 	return r.fp.Read(p) | |
| } | |
| 
 | |
| func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) { | |
| 	n, err := r.fp.ReadAt(p, off) | |
| 	if err != nil { | |
| 		return n, err | |
| 	} | |
| 
 | |
| 	// Got the length have read( or means has uploaded), and you can construct your message | |
| 	atomic.AddInt64(&r.read, int64(n)) | |
| 
 | |
| 	if r.fn != nil { | |
| 		read := r.read | |
| 		if err := r.fn(read, float32(read*100)/float32(r.size)); err != nil { | |
| 			return n, err | |
| 		} | |
| 	} | |
| 
 | |
| 	return n, err | |
| } | |
| 
 | |
| func (r *s3UploadProgressedReader) Seek(offset int64, whence int) (int64, error) { | |
| 	return r.fp.Seek(offset, whence) | |
| }
 |