@ -22,7 +22,7 @@ import (
const BufferSizeLimit = 1024 * 1024 * 2
// VolumeCopy copy the .idx .dat .vif files, and mount the volume
func ( vs * VolumeServer ) VolumeCopy ( ctx context . Context , req * volume_server_pb . VolumeCopyRequest ) ( * volume_server_pb . VolumeCopyResponse , error ) {
func ( vs * VolumeServer ) VolumeCopy ( req * volume_server_pb . VolumeCopyRequest , stream volume_server_pb . VolumeServer_VolumeCopyServer ) error {
v := vs . store . GetVolume ( needle . VolumeId ( req . VolumeId ) )
if v != nil {
@ -31,7 +31,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
err := vs . store . DeleteVolume ( needle . VolumeId ( req . VolumeId ) )
if err != nil {
return nil , fmt . Errorf ( "failed to delete existing volume %d: %v" , req . VolumeId , err )
return fmt . Errorf ( "failed to delete existing volume %d: %v" , req . VolumeId , err )
}
glog . V ( 0 ) . Infof ( "deleted existing volume %d before copying." , req . VolumeId )
@ -79,22 +79,38 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
} ( )
// println("source:", volFileInfoResp.String())
copyResponse := & volume_server_pb . VolumeCopyResponse { }
reportInterval := int64 ( 1024 * 1024 * 128 )
nextReportTarget := reportInterval
var modifiedTsNs int64
if modifiedTsNs , err = vs . doCopyFile ( client , false , req . Collection , req . VolumeId , volFileInfoResp . CompactionRevision , volFileInfoResp . DatFileSize , dataBaseFileName , ".dat" , false , true ) ; err != nil {
var sendErr error
if modifiedTsNs , err = vs . doCopyFile ( client , false , req . Collection , req . VolumeId , volFileInfoResp . CompactionRevision , volFileInfoResp . DatFileSize , dataBaseFileName , ".dat" , false , true , func ( processed int64 ) bool {
if processed > nextReportTarget {
copyResponse . ProcessedBytes = processed
if sendErr = stream . Send ( copyResponse ) ; sendErr != nil {
return false
}
nextReportTarget = processed + reportInterval
}
return true
} ) ; err != nil {
return err
}
if sendErr != nil {
return sendErr
}
if modifiedTsNs > 0 {
os . Chtimes ( dataBaseFileName + ".dat" , time . Unix ( 0 , modifiedTsNs ) , time . Unix ( 0 , modifiedTsNs ) )
}
if modifiedTsNs , err = vs . doCopyFile ( client , false , req . Collection , req . VolumeId , volFileInfoResp . CompactionRevision , volFileInfoResp . IdxFileSize , indexBaseFileName , ".idx" , false , false ) ; err != nil {
if modifiedTsNs , err = vs . doCopyFile ( client , false , req . Collection , req . VolumeId , volFileInfoResp . CompactionRevision , volFileInfoResp . IdxFileSize , indexBaseFileName , ".idx" , false , false , nil ) ; err != nil {
return err
}
if modifiedTsNs > 0 {
os . Chtimes ( indexBaseFileName + ".idx" , time . Unix ( 0 , modifiedTsNs ) , time . Unix ( 0 , modifiedTsNs ) )
}
if modifiedTsNs , err = vs . doCopyFile ( client , false , req . Collection , req . VolumeId , volFileInfoResp . CompactionRevision , volFileInfoResp . DatFileSize , dataBaseFileName , ".vif" , false , true ) ; err != nil {
if modifiedTsNs , err = vs . doCopyFile ( client , false , req . Collection , req . VolumeId , volFileInfoResp . CompactionRevision , volFileInfoResp . DatFileSize , dataBaseFileName , ".vif" , false , true , nil ) ; err != nil {
return err
}
if modifiedTsNs > 0 {
@ -107,10 +123,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
} )
if err != nil {
return nil , err
return err
}
if dataBaseFileName == "" {
return nil , fmt . Errorf ( "not found volume %d file" , req . VolumeId )
return fmt . Errorf ( "not found volume %d file" , req . VolumeId )
}
idxFileName = indexBaseFileName + ".idx"
@ -125,21 +141,25 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
} ( )
if err = checkCopyFiles ( volFileInfoResp , idxFileName , datFileName ) ; err != nil { // added by panyc16
return nil , err
return err
}
// mount the volume
err = vs . store . MountVolume ( needle . VolumeId ( req . VolumeId ) )
if err != nil {
return nil , fmt . Errorf ( "failed to mount volume %d: %v" , req . VolumeId , err )
return fmt . Errorf ( "failed to mount volume %d: %v" , req . VolumeId , err )
}
return & volume_server_pb . VolumeCopyResponse {
if err = stream . Send ( & volume_server_pb . VolumeCopyResponse {
LastAppendAtNs : volFileInfoResp . DatFileTimestampSeconds * uint64 ( time . Second ) ,
} , err
} ) ; err != nil {
glog . Errorf ( "send response: %v" , err )
}
return err
}
func ( vs * VolumeServer ) doCopyFile ( client volume_server_pb . VolumeServerClient , isEcVolume bool , collection string , vid , compactRevision uint32 , stopOffset uint64 , baseFileName , ext string , isAppend , ignoreSourceFileNotFound bool ) ( modifiedTsNs int64 , err error ) {
func ( vs * VolumeServer ) doCopyFile ( client volume_server_pb . VolumeServerClient , isEcVolume bool , collection string , vid , compactRevision uint32 , stopOffset uint64 , baseFileName , ext string , isAppend , ignoreSourceFileNotFound bool , progressFn storage . ProgressFunc ) ( modifiedTsNs int64 , err error ) {
copyFileClient , err := client . CopyFile ( context . Background ( ) , & volume_server_pb . CopyFileRequest {
VolumeId : vid ,
@ -154,7 +174,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
return modifiedTsNs , fmt . Errorf ( "failed to start copying volume %d %s file: %v" , vid , ext , err )
}
modifiedTsNs , err = writeToFile ( copyFileClient , baseFileName + ext , util . NewWriteThrottler ( vs . compactionBytePerSecond ) , isAppend )
modifiedTsNs , err = writeToFile ( copyFileClient , baseFileName + ext , util . NewWriteThrottler ( vs . compactionBytePerSecond ) , isAppend , progressFn )
if err != nil {
return modifiedTsNs , fmt . Errorf ( "failed to copy %s file: %v" , baseFileName + ext , err )
}
@ -188,7 +208,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
return nil
}
func writeToFile ( client volume_server_pb . VolumeServer_CopyFileClient , fileName string , wt * util . WriteThrottler , isAppend bool ) ( modifiedTsNs int64 , err error ) {
func writeToFile ( client volume_server_pb . VolumeServer_CopyFileClient , fileName string , wt * util . WriteThrottler , isAppend bool , progressFn storage . ProgressFunc ) ( modifiedTsNs int64 , err error ) {
glog . V ( 4 ) . Infof ( "writing to %s" , fileName )
flags := os . O_WRONLY | os . O_CREATE | os . O_TRUNC
if isAppend {
@ -200,6 +220,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
}
defer dst . Close ( )
var progressedBytes int64
for {
resp , receiveErr := client . Recv ( )
if receiveErr == io . EOF {
@ -212,6 +233,12 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
return modifiedTsNs , fmt . Errorf ( "receiving %s: %v" , fileName , receiveErr )
}
dst . Write ( resp . FileContent )
progressedBytes += int64 ( len ( resp . FileContent ) )
if progressFn != nil {
if ! progressFn ( progressedBytes ) {
return modifiedTsNs , fmt . Errorf ( "interrupted copy operation" )
}
}
wt . MaybeSlowdown ( int64 ( len ( resp . FileContent ) ) )
}
return modifiedTsNs , nil