|
|
package weed_server
import ( "context" "fmt" "io"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" )
func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrementalCopyRequest, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return fmt.Errorf("not found volume id %d", req.VolumeId) }
stopOffset, _, _ := v.FileStat() foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.SinceNs) if err != nil { return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.SinceNs, err) }
if isLastOne { return nil }
startOffset := foundOffset.ToAcutalOffset()
buf := make([]byte, 1024*1024*2) return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream)
}
func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return nil, fmt.Errorf("not found volume id %d", req.VolumeId) }
resp := v.GetVolumeSyncStatus()
return resp, nil
}
func sendFileContent(datBackend backend.DataStorageBackend, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error { var blockSizeLimit = int64(len(buf)) for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit { n, readErr := datBackend.ReadAt(buf, startOffset+i) if readErr == nil || readErr == io.EOF { resp := &volume_server_pb.VolumeIncrementalCopyResponse{} resp.FileContent = buf[:int64(n)] sendErr := stream.Send(resp) if sendErr != nil { return sendErr } } else { return readErr } } return nil }
|