|
|
@ -79,17 +79,27 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo |
|
|
|
}() |
|
|
|
|
|
|
|
// println("source:", volFileInfoResp.String())
|
|
|
|
if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { |
|
|
|
var modifiedTsNs int64 |
|
|
|
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
if modifiedTsNs > 0 { |
|
|
|
os.Chtimes(dataBaseFileName + ".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) |
|
|
|
} |
|
|
|
|
|
|
|
if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { |
|
|
|
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
if modifiedTsNs > 0 { |
|
|
|
os.Chtimes(indexBaseFileName + ".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) |
|
|
|
} |
|
|
|
|
|
|
|
if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { |
|
|
|
if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
if modifiedTsNs > 0 { |
|
|
|
os.Chtimes(dataBaseFileName + ".vif", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) |
|
|
|
} |
|
|
|
|
|
|
|
os.Remove(dataBaseFileName + ".note") |
|
|
|
|
|
|
@ -129,7 +139,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo |
|
|
|
}, err |
|
|
|
} |
|
|
|
|
|
|
|
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error { |
|
|
|
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) (modifiedTsNs int64, err error) { |
|
|
|
|
|
|
|
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ |
|
|
|
VolumeId: vid, |
|
|
@ -141,15 +151,15 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i |
|
|
|
IgnoreSourceFileNotFound: ignoreSourceFileNotFound, |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) |
|
|
|
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) |
|
|
|
} |
|
|
|
|
|
|
|
err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) |
|
|
|
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) |
|
|
|
return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
return modifiedTsNs, nil |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
@ -157,7 +167,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i |
|
|
|
only check the the differ of the file size |
|
|
|
todo: maybe should check the received count and deleted count of the volume |
|
|
|
*/ |
|
|
|
func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error { |
|
|
|
func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) (error) { |
|
|
|
stat, err := os.Stat(idxFileName) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("stat idx file %s failed: %v", idxFileName, err) |
|
|
@ -178,7 +188,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error { |
|
|
|
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) (modifiedTsNs int64, err error) { |
|
|
|
glog.V(4).Infof("writing to %s", fileName) |
|
|
|
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC |
|
|
|
if isAppend { |
|
|
@ -186,7 +196,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s |
|
|
|
} |
|
|
|
dst, err := os.OpenFile(fileName, flags, 0644) |
|
|
|
if err != nil { |
|
|
|
return nil |
|
|
|
return modifiedTsNs, nil |
|
|
|
} |
|
|
|
defer dst.Close() |
|
|
|
|
|
|
@ -195,13 +205,16 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s |
|
|
|
if receiveErr == io.EOF { |
|
|
|
break |
|
|
|
} |
|
|
|
if resp.ModifiedTsNs != 0 { |
|
|
|
modifiedTsNs = resp.ModifiedTsNs |
|
|
|
} |
|
|
|
if receiveErr != nil { |
|
|
|
return fmt.Errorf("receiving %s: %v", fileName, receiveErr) |
|
|
|
return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr) |
|
|
|
} |
|
|
|
dst.Write(resp.FileContent) |
|
|
|
wt.MaybeSlowdown(int64(len(resp.FileContent))) |
|
|
|
} |
|
|
|
return nil |
|
|
|
return modifiedTsNs, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { |
|
|
@ -271,6 +284,12 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v |
|
|
|
} |
|
|
|
defer file.Close() |
|
|
|
|
|
|
|
fileInfo, err := file.Stat() |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
fileModTsNs := fileInfo.ModTime().UnixNano() |
|
|
|
|
|
|
|
buffer := make([]byte, BufferSizeLimit) |
|
|
|
|
|
|
|
for bytesToRead > 0 { |
|
|
@ -290,12 +309,14 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v |
|
|
|
bytesread = int(bytesToRead) |
|
|
|
} |
|
|
|
err = stream.Send(&volume_server_pb.CopyFileResponse{ |
|
|
|
FileContent: buffer[:bytesread], |
|
|
|
FileContent: buffer[:bytesread], |
|
|
|
ModifiedTsNs: fileModTsNs, |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
// println("sending", bytesread, "bytes err", err.Error())
|
|
|
|
return err |
|
|
|
} |
|
|
|
fileModTsNs = 0 // only send once
|
|
|
|
|
|
|
|
bytesToRead -= int64(bytesread) |
|
|
|
|
|
|
|