diff --git a/test/volume_server/framework/cluster_dual.go b/test/volume_server/framework/cluster_dual.go
index 144f268cf..f19931ad0 100644
--- a/test/volume_server/framework/cluster_dual.go
+++ b/test/volume_server/framework/cluster_dual.go
@@ -291,3 +291,8 @@ func (c *DualVolumeCluster) VolumeAdminURL(index int) string {
 func (c *DualVolumeCluster) VolumePublicURL(index int) string {
 	return "http://" + c.VolumePublicAddress(index)
 }
+
+// BaseDir returns the cluster's base directory (the baseDir field).
+func (c *DualVolumeCluster) BaseDir() string {
+	return c.baseDir
+}
diff --git a/test/volume_server/grpc/move_tail_timestamp_test.go b/test/volume_server/grpc/move_tail_timestamp_test.go
new file mode 100644
index 000000000..416235133
--- /dev/null
+++ b/test/volume_server/grpc/move_tail_timestamp_test.go
@@ -0,0 +1,292 @@
+package volume_server_grpc_test
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
+	"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
+	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/seaweedfs/seaweedfs/weed/storage"
+	"github.com/seaweedfs/seaweedfs/weed/storage/backend"
+	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
+	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	"github.com/seaweedfs/seaweedfs/weed/storage/types"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// TestVolumeCopyReturnsPreciseLastAppendTimestamp pushes the source .dat mtime
+// two hours into the future and checks that the LastAppendAtNs reported by
+// VolumeCopy equals the timestamp read back from the destination's files.
+func TestVolumeCopyReturnsPreciseLastAppendTimestamp(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test in short mode")
+	}
+
+	cluster := framework.StartDualVolumeCluster(t, matrix.P1())
+	sourceConn, sourceClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
+	defer sourceConn.Close()
+	destConn, destClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
+	defer destConn.Close()
+
+	const volumeID = uint32(999)
+	framework.AllocateVolume(t, sourceClient, volumeID, "")
+
+	httpClient := framework.NewHTTPClient()
+	fid := framework.NewFileID(volumeID, 1, 0x42)
+	payload := []byte("move-tail-timestamp-payload")
+	uploadResp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, payload)
+	_ = framework.ReadAllAndClose(t, uploadResp)
+	if uploadResp.StatusCode != http.StatusCreated {
+		t.Fatalf("source upload failed: status %d", uploadResp.StatusCode)
+	}
+
+	sourceDir := filepath.Join(cluster.BaseDir(), "volume0")
+	datPath := storage.VolumeFileName(sourceDir, "", int(volumeID)) + ".dat"
+	futureTime := time.Now().Add(2 * time.Hour)
+	if err := os.Chtimes(datPath, futureTime, futureTime); err != nil {
+		t.Fatalf("set future dat timestamp: %v", err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	sourceDataNode := cluster.VolumeAdminAddress(0) + "." + strings.Split(cluster.VolumeGRPCAddress(0), ":")[1]
+	copyStream, err := destClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
+		VolumeId:       volumeID,
+		SourceDataNode: sourceDataNode,
+	})
+	if err != nil {
+		t.Fatalf("VolumeCopy start failed: %v", err)
+	}
+
+	var lastAppendAtNs uint64
+	for {
+		resp, recvErr := copyStream.Recv()
+		if recvErr == io.EOF {
+			break
+		}
+		if recvErr != nil {
+			t.Fatalf("VolumeCopy recv failed: %v", recvErr)
+		}
+		if ts := resp.GetLastAppendAtNs(); ts > 0 {
+			lastAppendAtNs = ts
+		}
+	}
+	if lastAppendAtNs == 0 {
+		t.Fatalf("volume copy did not return a last append timestamp")
+	}
+
+	destDir := filepath.Join(cluster.BaseDir(), "volume1")
+	actualLastAppend := readLastAppendAtNs(t, destDir, volumeID)
+	if actualLastAppend == 0 {
+		t.Fatalf("failed to compute last append timestamp from destination files")
+	}
+
+	if lastAppendAtNs != actualLastAppend {
+		t.Fatalf("last append timestamp mismatch: got %d, actual %d", lastAppendAtNs, actualLastAppend)
+	}
+}
+
+// readLastAppendAtNs decodes the timestamp stored after the checksum of the
+// needle referenced by the last entry of the volume's .idx file. It returns 0
+// when the index is empty or that entry has a zero offset, and fails the test
+// on any I/O error.
+func readLastAppendAtNs(t testing.TB, volumeDir string, volumeID uint32) uint64 {
+	t.Helper()
+
+	baseName := storage.VolumeFileName(volumeDir, "", int(volumeID))
+	idxPath := baseName + ".idx"
+	idxFile, err := os.Open(idxPath)
+	if err != nil {
+		t.Fatalf("open idx file %s: %v", idxPath, err)
+	}
+	defer idxFile.Close()
+
+	stat, err := idxFile.Stat()
+	if err != nil {
+		t.Fatalf("stat idx file %s: %v", idxPath, err)
+	}
+	if stat.Size() == 0 {
+		return 0
+	}
+	if stat.Size()%int64(types.NeedleMapEntrySize) != 0 {
+		t.Fatalf("unexpected idx file size %d", stat.Size())
+	}
+
+	buf := make([]byte, types.NeedleMapEntrySize)
+	if _, err := idxFile.ReadAt(buf, stat.Size()-int64(types.NeedleMapEntrySize)); err != nil {
+		t.Fatalf("read idx entry: %v", err)
+	}
+	_, offset, _ := idx.IdxFileEntry(buf)
+	if offset.IsZero() {
+		return 0
+	}
+
+	datPath := baseName + ".dat"
+	datFile, err := os.Open(datPath)
+	if err != nil {
+		t.Fatalf("open dat file %s: %v", datPath, err)
+	}
+	defer datFile.Close()
+
+	datBackend := backend.NewDiskFile(datFile)
+	n, _, _, err := needle.ReadNeedleHeader(datBackend, needle.GetCurrentVersion(), offset.ToActualOffset())
+	if err != nil {
+		t.Fatalf("read needle header: %v", err)
+	}
+
+	tailOffset := offset.ToActualOffset() + int64(types.NeedleHeaderSize) + int64(n.Size)
+	tail := make([]byte, needle.NeedleChecksumSize+types.TimestampSize)
+	readCount, readErr := datBackend.ReadAt(tail, tailOffset)
+	if readErr == io.EOF && readCount == len(tail) {
+		readErr = nil
+	}
+	if readErr != nil {
+		t.Fatalf("read needle tail: %v", readErr)
+	}
+
+	return util.BytesToUint64(tail[needle.NeedleChecksumSize : needle.NeedleChecksumSize+types.TimestampSize])
+}
+
+// TestVolumeMoveHandlesInFlightWrites copies a volume, keeps uploading to the
+// source while the destination tails it from the copy's last append timestamp,
+// and then reads the most recent live writes back from the destination.
+func TestVolumeMoveHandlesInFlightWrites(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test in short mode")
+	}
+
+	cluster := framework.StartDualVolumeCluster(t, matrix.P1())
+	sourceConn, sourceClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
+	defer sourceConn.Close()
+	destConn, destClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
+	defer destConn.Close()
+
+	const volumeID = uint32(988)
+	framework.AllocateVolume(t, sourceClient, volumeID, "")
+
+	httpClient := framework.NewHTTPClient()
+	fid := framework.NewFileID(volumeID, 7, 0xABCDEF01)
+	payload := []byte("volume-move-live-payload")
+	uploadResp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, payload)
+	_ = framework.ReadAllAndClose(t, uploadResp)
+	if uploadResp.StatusCode != http.StatusCreated {
+		t.Fatalf("initial upload failed: %d", uploadResp.StatusCode)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	sourceDataNode := cluster.VolumeAdminAddress(0) + "." + strings.Split(cluster.VolumeGRPCAddress(0), ":")[1]
+	copyStream, err := destClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
+		VolumeId:       volumeID,
+		SourceDataNode: sourceDataNode,
+	})
+	if err != nil {
+		t.Fatalf("VolumeCopy start failed: %v", err)
+	}
+
+	var lastAppendAtNs uint64
+	for {
+		resp, recvErr := copyStream.Recv()
+		if recvErr == io.EOF {
+			break
+		}
+		if recvErr != nil {
+			t.Fatalf("VolumeCopy recv failed: %v", recvErr)
+		}
+		if ts := resp.GetLastAppendAtNs(); ts > 0 {
+			lastAppendAtNs = ts
+		}
+	}
+	if lastAppendAtNs == 0 {
+		t.Fatalf("volume copy did not return a last append timestamp")
+	}
+
+	type written struct {
+		fid  string
+		data []byte
+	}
+
+	var writesMu sync.Mutex
+	var writes []written
+	writeCtx, writeCancel := context.WithCancel(context.Background())
+	var writerWG sync.WaitGroup
+	writerWG.Add(1)
+	go func() {
+		defer writerWG.Done()
+		client := framework.NewHTTPClient()
+		for i := 0; i < 12; i++ {
+			select {
+			case <-writeCtx.Done():
+				return
+			default:
+			}
+			livePayload := []byte("live-data-" + fmt.Sprintf("%02d", i))
+			liveFid := framework.NewFileID(volumeID, uint64(2000+i), 0xAAAA1111+uint32(i))
+			resp := framework.UploadBytes(t, client, cluster.VolumeAdminURL(0), liveFid, livePayload)
+			_ = framework.ReadAllAndClose(t, resp)
+			if resp.StatusCode != http.StatusCreated {
+				// FailNow/Fatalf may only be called from the goroutine running the
+				// test, so report the failure with Errorf and stop writing.
+				t.Errorf("live upload failed: %d", resp.StatusCode)
+				return
+			}
+			writesMu.Lock()
+			writes = append(writes, written{fid: liveFid, data: livePayload})
+			writesMu.Unlock()
+			time.Sleep(250 * time.Millisecond)
+		}
+	}()
+
+	tailCtx, tailCancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer tailCancel()
+	_, err = destClient.VolumeTailReceiver(tailCtx, &volume_server_pb.VolumeTailReceiverRequest{
+		VolumeId:           volumeID,
+		SourceVolumeServer: sourceDataNode,
+		SinceNs:            lastAppendAtNs,
+		IdleTimeoutSeconds: 3,
+	})
+	if err != nil {
+		writeCancel()
+		writerWG.Wait()
+		t.Fatalf("VolumeTailReceiver failed: %v", err)
+	}
+
+	writeCancel()
+	writerWG.Wait()
+
+	writesMu.Lock()
+	sampleCount := len(writes)
+	if sampleCount == 0 {
+		writesMu.Unlock()
+		t.Fatal("no live writes captured")
+	}
+	sample := writes
+	if sampleCount > 3 {
+		sample = writes[sampleCount-3:]
+	}
+	writesMu.Unlock()
+
+	httpCheckClient := framework.NewHTTPClient()
+	for _, w := range sample {
+		resp := framework.ReadBytes(t, httpCheckClient, cluster.VolumeAdminURL(1), w.fid)
+		body := framework.ReadAllAndClose(t, resp)
+		if resp.StatusCode != http.StatusOK {
+			t.Fatalf("dest read %s status %d", w.fid, resp.StatusCode)
+		}
+		if !bytes.Equal(body, w.data) {
+			t.Fatalf("dest read body mismatch for %s: %q vs %q", w.fid, string(body), string(w.data))
+		}
+	}
+}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index a6c0e5a20..317ca5907 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -17,6 +17,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage"
 	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 	"github.com/seaweedfs/seaweedfs/weed/util"
@@ -187,6 +188,15 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
 		return err
 	}
 
+	var lastAppendAtNs = volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second)
+	if !hasRemoteDatFile {
+		if appendAtNs, appendErr := findLastAppendAtNsFromCopiedFiles(idxFileName, datFileName, needle.Version(volFileInfoResp.Version)); appendErr == nil && appendAtNs > 0 {
+			lastAppendAtNs = appendAtNs
+		} else if appendErr != nil {
+			glog.V(1).Infof("failed to find last append timestamp for volume %d: %v", req.VolumeId, appendErr)
+		}
+	}
+
 	// mount the volume
 	err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
 	if err != nil {
@@ -194,7 +204,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
 	}
 
 	if err = stream.Send(&volume_server_pb.VolumeCopyResponse{
-		LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
+		LastAppendAtNs: lastAppendAtNs,
 	}); err != nil {
 		glog.Errorf("send response: %v", err)
 	}
@@ -264,6 +274,68 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
 	return nil
 }
 
+// findLastAppendAtNsFromCopiedFiles returns the append timestamp, in
+// nanoseconds, stored with the needle referenced by the last entry of the
+// copied .idx file. The timestamp is decoded from the bytes that follow the
+// needle's checksum in the .dat file. It returns (0, nil) when no timestamp
+// is available: the version is older than needle.Version3, the index is
+// empty, or the last index entry has a zero offset.
+func findLastAppendAtNsFromCopiedFiles(idxFileName, datFileName string, version needle.Version) (uint64, error) {
+	if version < needle.Version3 {
+		return 0, nil
+	}
+
+	idxFile, err := os.Open(idxFileName)
+	if err != nil {
+		return 0, fmt.Errorf("open idx file %s: %w", idxFileName, err)
+	}
+	defer idxFile.Close()
+
+	fi, err := idxFile.Stat()
+	if err != nil {
+		return 0, fmt.Errorf("stat idx file %s: %w", idxFileName, err)
+	}
+	if fi.Size() == 0 {
+		return 0, nil
+	}
+	if fi.Size()%int64(types.NeedleMapEntrySize) != 0 {
+		return 0, fmt.Errorf("unexpected idx file %s size: %d", idxFileName, fi.Size())
+	}
+
+	buf := make([]byte, types.NeedleMapEntrySize)
+	if _, err := idxFile.ReadAt(buf, fi.Size()-int64(types.NeedleMapEntrySize)); err != nil {
+		return 0, fmt.Errorf("read idx file %s: %w", idxFileName, err)
+	}
+	_, offset, _ := idx.IdxFileEntry(buf)
+	if offset.IsZero() {
+		return 0, nil
+	}
+
+	datFile, err := os.Open(datFileName)
+	if err != nil {
+		return 0, fmt.Errorf("open dat file %s: %w", datFileName, err)
+	}
+	defer datFile.Close()
+
+	datBackend := backend.NewDiskFile(datFile)
+	n, _, _, err := needle.ReadNeedleHeader(datBackend, version, offset.ToActualOffset())
+	if err != nil {
+		return 0, fmt.Errorf("read needle header %s offset %d: %w", datFileName, offset.ToActualOffset(), err)
+	}
+
+	tailOffset := offset.ToActualOffset() + int64(types.NeedleHeaderSize) + int64(n.Size)
+	tail := make([]byte, needle.NeedleChecksumSize+types.TimestampSize)
+	readCount, readErr := datBackend.ReadAt(tail, tailOffset)
+	if readErr == io.EOF && readCount == len(tail) {
+		readErr = nil
+	}
+	if readErr != nil {
+		return 0, fmt.Errorf("read needle tail %s offset %d: %w", datFileName, tailOffset, readErr)
+	}
+
+	return util.BytesToUint64(tail[needle.NeedleChecksumSize : needle.NeedleChecksumSize+types.TimestampSize]), nil
+}
+
 func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
 	glog.V(4).Infof("writing to %s", fileName)
 	flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go
index 3156be8b5..9cbabdb23 100644
--- a/weed/shell/command_volume_copy.go
+++ b/weed/shell/command_volume_copy.go
@@ -60,6 +60,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!") } - _, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "", 0) + _, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "", 0, true) return } diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 26fd5fc58..05bc00d44 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -38,13 +38,10 @@ func (c *commandVolumeMove) Help() string { This command move a live volume from one volume server to another volume server. Here are the steps: - 1. This command asks the target volume server to copy the source volume from source volume server, remember the last entry's timestamp. - 2. This command asks the target volume server to mount the new volume - Now the master will mark this volume id as readonly. - 3. This command asks the target volume server to tail the source volume for updates after the timestamp, for 1 minutes to drain the requests. - 4. This command asks the source volume server to unmount the source volume - Now the master will mark this volume id as writable. - 5. This command asks the source volume server to delete the source volume + 1. This command marks the source volume as read-only, copies it to the target volume server, and records the last entry timestamp. + 2. This command asks the target volume server to mount the new volume. + 3. This command asks the target volume server to tail the source volume for updates after the timestamp, for 1 minutes to drain any in-flight requests. + 4. This command asks the source volume server to delete the source volume. The option "-disk [hdd|ssd|]" can be used to change the volume disk type. @@ -92,7 +89,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. 
func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, ioBytePerSecond int64, skipTailError bool) (err error) { log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) - lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType, ioBytePerSecond) + lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType, ioBytePerSecond, false) if err != nil { return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err) } @@ -115,7 +112,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n return nil } -func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string, ioBytePerSecond int64) (lastAppendAtNs uint64, err error) { +func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string, ioBytePerSecond int64, restoreWritable bool) (lastAppendAtNs uint64, err error) { // check to see if the volume is already read-only and if its not then we need // to mark it as read-only and then before we return we need to undo what we @@ -125,6 +122,9 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl if !shouldMarkWritable { return } + if !restoreWritable && err == nil { + return + } clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{