|
|
@ -1,7 +1,6 @@ |
|
|
|
package filersink |
|
|
|
|
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util" |
|
|
|
"sync" |
|
|
@ -12,7 +11,6 @@ import ( |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/operation" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
|
"github.com/seaweedfs/seaweedfs/weed/security" |
|
|
|
) |
|
|
|
|
|
|
|
func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) { |
|
|
@ -67,62 +65,41 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) |
|
|
|
} |
|
|
|
defer util.CloseResponse(resp) |
|
|
|
|
|
|
|
var host string |
|
|
|
var auth security.EncodedJwt |
|
|
|
|
|
|
|
if err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
return util.Retry("assignVolume", func() error { |
|
|
|
request := &filer_pb.AssignVolumeRequest{ |
|
|
|
Count: 1, |
|
|
|
Replication: fs.replication, |
|
|
|
Collection: fs.collection, |
|
|
|
TtlSec: fs.ttlSec, |
|
|
|
DataCenter: fs.dataCenter, |
|
|
|
DiskType: fs.diskType, |
|
|
|
Path: path, |
|
|
|
fileId, uploadResult, err, _ := operation.UploadWithRetry( |
|
|
|
fs, |
|
|
|
&filer_pb.AssignVolumeRequest{ |
|
|
|
Count: 1, |
|
|
|
Replication: fs.replication, |
|
|
|
Collection: fs.collection, |
|
|
|
TtlSec: fs.ttlSec, |
|
|
|
DataCenter: fs.dataCenter, |
|
|
|
DiskType: fs.diskType, |
|
|
|
Path: path, |
|
|
|
}, |
|
|
|
&operation.UploadOption{ |
|
|
|
Filename: filename, |
|
|
|
Cipher: false, |
|
|
|
IsInputCompressed: "gzip" == header.Get("Content-Encoding"), |
|
|
|
MimeType: header.Get("Content-Type"), |
|
|
|
PairMap: nil, |
|
|
|
}, |
|
|
|
func(host, fileId string) string { |
|
|
|
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) |
|
|
|
if fs.writeChunkByFiler { |
|
|
|
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) |
|
|
|
} |
|
|
|
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) |
|
|
|
return fileUrl |
|
|
|
}, |
|
|
|
resp.Body, |
|
|
|
) |
|
|
|
|
|
|
|
resp, err := client.AssignVolume(context.Background(), request) |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infof("assign volume failure %v: %v", request, err) |
|
|
|
return err |
|
|
|
} |
|
|
|
if resp.Error != "" { |
|
|
|
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) |
|
|
|
} |
|
|
|
|
|
|
|
fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth) |
|
|
|
|
|
|
|
return nil |
|
|
|
}) |
|
|
|
}); err != nil { |
|
|
|
return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) |
|
|
|
} |
|
|
|
|
|
|
|
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) |
|
|
|
if fs.writeChunkByFiler { |
|
|
|
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId) |
|
|
|
} |
|
|
|
|
|
|
|
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) |
|
|
|
|
|
|
|
// fetch data as is, regardless whether it is encrypted or not
|
|
|
|
uploadOption := &operation.UploadOption{ |
|
|
|
UploadUrl: fileUrl, |
|
|
|
Filename: filename, |
|
|
|
Cipher: false, |
|
|
|
IsInputCompressed: "gzip" == header.Get("Content-Encoding"), |
|
|
|
MimeType: header.Get("Content-Type"), |
|
|
|
PairMap: nil, |
|
|
|
Jwt: auth, |
|
|
|
} |
|
|
|
uploadResult, err, _ := operation.Upload(resp.Body, uploadOption) |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err) |
|
|
|
glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err) |
|
|
|
return "", fmt.Errorf("upload data: %v", err) |
|
|
|
} |
|
|
|
if uploadResult.Error != "" { |
|
|
|
glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) |
|
|
|
glog.V(0).Infof("upload failure %v: %v", filename, err) |
|
|
|
return "", fmt.Errorf("upload result: %v", uploadResult.Error) |
|
|
|
} |
|
|
|
|
|
|
|