You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							128 lines
						
					
					
						
							3.6 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							128 lines
						
					
					
						
							3.6 KiB
						
					
					
				
								package filersink
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"fmt"
							 | 
						|
									"sync"
							 | 
						|
								
							 | 
						|
									"google.golang.org/grpc"
							 | 
						|
								
							 | 
						|
									"github.com/chrislusf/seaweedfs/weed/glog"
							 | 
						|
									"github.com/chrislusf/seaweedfs/weed/operation"
							 | 
						|
									"github.com/chrislusf/seaweedfs/weed/pb"
							 | 
						|
									"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/chrislusf/seaweedfs/weed/security"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) {
							 | 
						|
									if len(sourceChunks) == 0 {
							 | 
						|
										return
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks))
							 | 
						|
								
							 | 
						|
									var wg sync.WaitGroup
							 | 
						|
									for chunkIndex, sourceChunk := range sourceChunks {
							 | 
						|
										wg.Add(1)
							 | 
						|
										go func(chunk *filer_pb.FileChunk, index int) {
							 | 
						|
											defer wg.Done()
							 | 
						|
											replicatedChunk, e := fs.replicateOneChunk(chunk, dir)
							 | 
						|
											if e != nil {
							 | 
						|
												err = e
							 | 
						|
											}
							 | 
						|
											replicatedChunks[index] = replicatedChunk
							 | 
						|
										}(sourceChunk, chunkIndex)
							 | 
						|
									}
							 | 
						|
									wg.Wait()
							 | 
						|
								
							 | 
						|
									return
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir string) (*filer_pb.FileChunk, error) {
							 | 
						|
								
							 | 
						|
									fileId, err := fs.fetchAndWrite(sourceChunk, dir)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return &filer_pb.FileChunk{
							 | 
						|
										FileId:       fileId,
							 | 
						|
										Offset:       sourceChunk.Offset,
							 | 
						|
										Size:         sourceChunk.Size,
							 | 
						|
										Mtime:        sourceChunk.Mtime,
							 | 
						|
										ETag:         sourceChunk.ETag,
							 | 
						|
										SourceFileId: sourceChunk.GetFileIdString(),
							 | 
						|
										CipherKey:    sourceChunk.CipherKey,
							 | 
						|
										IsCompressed: sourceChunk.IsCompressed,
							 | 
						|
									}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) {
							 | 
						|
								
							 | 
						|
									filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString())
							 | 
						|
									if err != nil {
							 | 
						|
										return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
							 | 
						|
									}
							 | 
						|
									defer readCloser.Close()
							 | 
						|
								
							 | 
						|
									var host string
							 | 
						|
									var auth security.EncodedJwt
							 | 
						|
								
							 | 
						|
									if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
							 | 
						|
								
							 | 
						|
										request := &filer_pb.AssignVolumeRequest{
							 | 
						|
											Count:       1,
							 | 
						|
											Replication: fs.replication,
							 | 
						|
											Collection:  fs.collection,
							 | 
						|
											TtlSec:      fs.ttlSec,
							 | 
						|
											DataCenter:  fs.dataCenter,
							 | 
						|
											ParentPath:  dir,
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										resp, err := client.AssignVolume(context.Background(), request)
							 | 
						|
										if err != nil {
							 | 
						|
											glog.V(0).Infof("assign volume failure %v: %v", request, err)
							 | 
						|
											return err
							 | 
						|
										}
							 | 
						|
										if resp.Error != "" {
							 | 
						|
											return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
							 | 
						|
								
							 | 
						|
										return nil
							 | 
						|
									}); err != nil {
							 | 
						|
										return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
							 | 
						|
								
							 | 
						|
									glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
							 | 
						|
								
							 | 
						|
									// fetch data as is, regardless whether it is encrypted or not
							 | 
						|
									uploadResult, err, _ := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth)
							 | 
						|
									if err != nil {
							 | 
						|
										glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
							 | 
						|
										return "", fmt.Errorf("upload data: %v", err)
							 | 
						|
									}
							 | 
						|
									if uploadResult.Error != "" {
							 | 
						|
										glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
							 | 
						|
										return "", fmt.Errorf("upload result: %v", uploadResult.Error)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								var _ = filer_pb.FilerClient(&FilerSink{})
							 | 
						|
								
							 | 
						|
								func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
							 | 
						|
								
							 | 
						|
									return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
							 | 
						|
										client := filer_pb.NewSeaweedFilerClient(grpcConnection)
							 | 
						|
										return fn(client)
							 | 
						|
									}, fs.grpcAddress, fs.grpcDialOption)
							 | 
						|
								
							 | 
						|
								}
							 | 
						|
								func (fs *FilerSink) AdjustedUrl(hostAndPort string) string {
							 | 
						|
									return hostAndPort
							 | 
						|
								}
							 |