|
@ -3,6 +3,7 @@ package filersink |
|
|
import ( |
|
|
import ( |
|
|
"context" |
|
|
"context" |
|
|
"fmt" |
|
|
"fmt" |
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/util" |
|
|
"sync" |
|
|
"sync" |
|
|
|
|
|
|
|
|
"google.golang.org/grpc" |
|
|
"google.golang.org/grpc" |
|
@ -59,11 +60,11 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, dir stri |
|
|
|
|
|
|
|
|
func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) { |
|
|
func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) (fileId string, err error) { |
|
|
|
|
|
|
|
|
filename, header, readCloser, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) |
|
|
|
|
|
|
|
|
filename, header, resp, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString()) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err) |
|
|
return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err) |
|
|
} |
|
|
} |
|
|
defer readCloser.Close() |
|
|
|
|
|
|
|
|
defer util.CloseResponse(resp) |
|
|
|
|
|
|
|
|
var host string |
|
|
var host string |
|
|
var auth security.EncodedJwt |
|
|
var auth security.EncodedJwt |
|
@ -100,9 +101,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string) |
|
|
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) |
|
|
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) |
|
|
|
|
|
|
|
|
// fetch data as is, regardless whether it is encrypted or not
|
|
|
// fetch data as is, regardless whether it is encrypted or not
|
|
|
uploadResult, err, _ := operation.Upload(fileUrl, filename, false, readCloser, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) |
|
|
|
|
|
|
|
|
uploadResult, err, _ := operation.Upload(fileUrl, filename, false, resp.Body, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) |
|
|
|
|
|
|
|
|
glog.V(0).Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err) |
|
|
return "", fmt.Errorf("upload data: %v", err) |
|
|
return "", fmt.Errorf("upload data: %v", err) |
|
|
} |
|
|
} |
|
|
if uploadResult.Error != "" { |
|
|
if uploadResult.Error != "" { |
|
|