|
|
@ -48,6 +48,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre |
|
|
|
// confirm size and timestamp
|
|
|
|
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse |
|
|
|
var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string |
|
|
|
var hasRemoteDatFile bool |
|
|
|
err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { |
|
|
|
var err error |
|
|
|
volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(), |
|
|
@ -69,6 +70,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre |
|
|
|
|
|
|
|
dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) |
|
|
|
indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId)) |
|
|
|
hasRemoteDatFile = volFileInfoResp.VolumeInfo != nil && len(volFileInfoResp.VolumeInfo.Files) > 0 |
|
|
|
|
|
|
|
util.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755) |
|
|
|
|
|
|
@ -95,7 +97,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre |
|
|
|
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr) |
|
|
|
} |
|
|
|
|
|
|
|
if preallocateSize > 0 { |
|
|
|
if preallocateSize > 0 && !hasRemoteDatFile { |
|
|
|
volumeFile := dataBaseFileName + ".dat" |
|
|
|
_, err := backend.CreateVolumeFile(volumeFile, preallocateSize, 0) |
|
|
|
if err != nil { |
|
|
@ -116,23 +118,26 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre |
|
|
|
ioBytePerSecond = req.IoBytePerSecond |
|
|
|
} |
|
|
|
throttler := util.NewWriteThrottler(ioBytePerSecond) |
|
|
|
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool { |
|
|
|
if processed > nextReportTarget { |
|
|
|
copyResponse.ProcessedBytes = processed |
|
|
|
if sendErr = stream.Send(copyResponse); sendErr != nil { |
|
|
|
return false |
|
|
|
|
|
|
|
if !hasRemoteDatFile { |
|
|
|
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool { |
|
|
|
if processed > nextReportTarget { |
|
|
|
copyResponse.ProcessedBytes = processed |
|
|
|
if sendErr = stream.Send(copyResponse); sendErr != nil { |
|
|
|
return false |
|
|
|
} |
|
|
|
nextReportTarget = processed + reportInterval |
|
|
|
} |
|
|
|
nextReportTarget = processed + reportInterval |
|
|
|
return true |
|
|
|
}, throttler); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
if sendErr != nil { |
|
|
|
return sendErr |
|
|
|
} |
|
|
|
if modifiedTsNs > 0 { |
|
|
|
os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) |
|
|
|
} |
|
|
|
return true |
|
|
|
}, throttler); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
if sendErr != nil { |
|
|
|
return sendErr |
|
|
|
} |
|
|
|
if modifiedTsNs > 0 { |
|
|
|
os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) |
|
|
|
} |
|
|
|
|
|
|
|
if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler); err != nil { |
|
|
@ -172,7 +177,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
|
|
|
|
if err = checkCopyFiles(volFileInfoResp, hasRemoteDatFile, idxFileName, datFileName); err != nil { // added by panyc16
|
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
@ -224,7 +229,7 @@ func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeSe |
|
|
|
only check the the differ of the file size |
|
|
|
todo: maybe should check the received count and deleted count of the volume |
|
|
|
*/ |
|
|
|
func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error { |
|
|
|
func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, hasRemoteDatFile bool, idxFileName, datFileName string) error { |
|
|
|
stat, err := os.Stat(idxFileName) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("stat idx file %s failed: %v", idxFileName, err) |
|
|
@ -234,6 +239,10 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse |
|
|
|
idxFileName, stat.Size(), originFileInf.IdxFileSize) |
|
|
|
} |
|
|
|
|
|
|
|
if hasRemoteDatFile { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
stat, err = os.Stat(datFileName) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("get dat file info failed, %v", err) |
|
|
@ -298,6 +307,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se |
|
|
|
resp.CompactionRevision = uint32(v.CompactionRevision) |
|
|
|
resp.Collection = v.Collection |
|
|
|
resp.DiskType = string(v.DiskType()) |
|
|
|
resp.VolumeInfo = v.GetVolumeInfo() |
|
|
|
return resp, nil |
|
|
|
} |
|
|
|
|
|
|
|