Browse Source

Fix live volume move tail timestamp (#8440)

* Improve move tail timestamp

* Add move tail timestamp integration test

* Simulate traffic during move
master
Chris Lu 11 hours ago
committed by GitHub
parent
commit
da4edb5fe6
No known key found for this signature in database. GPG Key ID: B5690EEEBB952194
  1. 4
      test/volume_server/framework/cluster_dual.go
  2. 279
      test/volume_server/grpc/move_tail_timestamp_test.go
  3. 68
      weed/server/volume_grpc_copy.go
  4. 2
      weed/shell/command_volume_copy.go
  5. 18
      weed/shell/command_volume_move.go

4
test/volume_server/framework/cluster_dual.go

@ -291,3 +291,7 @@ func (c *DualVolumeCluster) VolumeAdminURL(index int) string {
// VolumePublicURL returns the HTTP base URL for the public endpoint of the
// volume server at the given index.
func (c *DualVolumeCluster) VolumePublicURL(index int) string {
	addr := c.VolumePublicAddress(index)
	return "http://" + addr
}
// BaseDir returns the cluster's root data directory; per-server volume
// directories (e.g. "volume0", "volume1") live directly under it.
func (c *DualVolumeCluster) BaseDir() string {
	return c.baseDir
}

279
test/volume_server/grpc/move_tail_timestamp_test.go

@ -0,0 +1,279 @@
package volume_server_grpc_test
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestVolumeCopyReturnsPreciseLastAppendTimestamp verifies that VolumeCopy
// reports the append timestamp of the last needle actually written, not a
// value derived from the .dat file's mtime — even when that mtime has been
// skewed into the future.
func TestVolumeCopyReturnsPreciseLastAppendTimestamp(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}

	cluster := framework.StartDualVolumeCluster(t, matrix.P1())

	srcConn, srcClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
	defer srcConn.Close()
	dstConn, dstClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
	defer dstConn.Close()

	const volumeID = uint32(999)
	framework.AllocateVolume(t, srcClient, volumeID, "")

	// Write one needle so the volume has a genuine last-append timestamp.
	httpClient := framework.NewHTTPClient()
	fid := framework.NewFileID(volumeID, 1, 0x42)
	payload := []byte("move-tail-timestamp-payload")
	uploadResp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, payload)
	_ = framework.ReadAllAndClose(t, uploadResp)
	if uploadResp.StatusCode != http.StatusCreated {
		t.Fatalf("source upload failed: status %d", uploadResp.StatusCode)
	}

	// Skew the .dat mtime two hours ahead so an mtime-based answer is wrong.
	srcDir := filepath.Join(cluster.BaseDir(), "volume0")
	datPath := storage.VolumeFileName(srcDir, "", int(volumeID)) + ".dat"
	futureTime := time.Now().Add(2 * time.Hour)
	if err := os.Chtimes(datPath, futureTime, futureTime); err != nil {
		t.Fatalf("set future dat timestamp: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Source address built as "host:adminPort.grpcPort".
	sourceDataNode := cluster.VolumeAdminAddress(0) + "." + strings.Split(cluster.VolumeGRPCAddress(0), ":")[1]
	copyStream, err := dstClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
		VolumeId:       volumeID,
		SourceDataNode: sourceDataNode,
	})
	if err != nil {
		t.Fatalf("VolumeCopy start failed: %v", err)
	}

	// Drain the stream, keeping the last reported append timestamp.
	var lastAppendAtNs uint64
	for {
		resp, recvErr := copyStream.Recv()
		if recvErr != nil {
			if recvErr == io.EOF {
				break
			}
			t.Fatalf("VolumeCopy recv failed: %v", recvErr)
		}
		if ts := resp.GetLastAppendAtNs(); ts > 0 {
			lastAppendAtNs = ts
		}
	}
	if lastAppendAtNs == 0 {
		t.Fatalf("volume copy did not return a last append timestamp")
	}

	// Recompute the timestamp from the copied destination files and compare.
	dstDir := filepath.Join(cluster.BaseDir(), "volume1")
	actualLastAppend := readLastAppendAtNs(t, dstDir, volumeID)
	if actualLastAppend == 0 {
		t.Fatalf("failed to compute last append timestamp from destination files")
	}
	if lastAppendAtNs != actualLastAppend {
		t.Fatalf("last append timestamp mismatch: got %d, actual %d", lastAppendAtNs, actualLastAppend)
	}
}
// readLastAppendAtNs derives the append timestamp (ns) of the most recently
// indexed needle in a volume: it decodes the final .idx entry and then reads
// the timestamp stored in that needle's tail inside the .dat file. It returns
// 0 when the index is empty or the last entry has a zero offset.
func readLastAppendAtNs(t testing.TB, volumeDir string, volumeID uint32) uint64 {
	t.Helper()

	baseName := storage.VolumeFileName(volumeDir, "", int(volumeID))

	idxPath := baseName + ".idx"
	idxFile, err := os.Open(idxPath)
	if err != nil {
		t.Fatalf("open idx file %s: %v", idxPath, err)
	}
	defer idxFile.Close()

	info, err := idxFile.Stat()
	if err != nil {
		t.Fatalf("stat idx file %s: %v", idxPath, err)
	}
	if info.Size() == 0 {
		return 0
	}
	if info.Size()%int64(types.NeedleMapEntrySize) != 0 {
		t.Fatalf("unexpected idx file size %d", info.Size())
	}

	// Only the final index entry is needed; it locates the newest needle.
	entry := make([]byte, types.NeedleMapEntrySize)
	if _, err := idxFile.ReadAt(entry, info.Size()-int64(types.NeedleMapEntrySize)); err != nil {
		t.Fatalf("read idx entry: %v", err)
	}
	_, offset, _ := idx.IdxFileEntry(entry)
	if offset.IsZero() {
		return 0
	}

	datPath := baseName + ".dat"
	datFile, err := os.Open(datPath)
	if err != nil {
		t.Fatalf("open dat file %s: %v", datPath, err)
	}
	defer datFile.Close()

	datBackend := backend.NewDiskFile(datFile)
	hdr, _, _, err := needle.ReadNeedleHeader(datBackend, needle.GetCurrentVersion(), offset.ToActualOffset())
	if err != nil {
		t.Fatalf("read needle header: %v", err)
	}

	// Read checksum + timestamp from the needle tail. An EOF that still
	// filled the buffer means the needle ends exactly at end-of-file, which
	// is fine per the io.ReaderAt contract.
	tailOffset := offset.ToActualOffset() + int64(types.NeedleHeaderSize) + int64(hdr.Size)
	tail := make([]byte, needle.NeedleChecksumSize+types.TimestampSize)
	if count, readErr := datBackend.ReadAt(tail, tailOffset); readErr != nil && !(readErr == io.EOF && count == len(tail)) {
		t.Fatalf("read needle tail: %v", readErr)
	}
	return util.BytesToUint64(tail[needle.NeedleChecksumSize : needle.NeedleChecksumSize+types.TimestampSize])
}
// TestVolumeMoveHandlesInFlightWrites simulates a live volume move: after the
// initial copy it keeps writing to the source volume while the destination
// tails the source from the reported last-append timestamp, then verifies the
// tailed writes are readable from the destination server.
//
// Fix: the original version called t.Fatalf from the background writer
// goroutine; Fatalf/FailNow must only run on the test goroutine (they call
// runtime.Goexit), so the writer now reports via t.Errorf and returns.
func TestVolumeMoveHandlesInFlightWrites(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	cluster := framework.StartDualVolumeCluster(t, matrix.P1())
	sourceConn, sourceClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
	defer sourceConn.Close()
	destConn, destClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
	defer destConn.Close()
	const volumeID = uint32(988)
	framework.AllocateVolume(t, sourceClient, volumeID, "")

	// Seed the volume with one needle before the copy.
	httpClient := framework.NewHTTPClient()
	fid := framework.NewFileID(volumeID, 7, 0xABCDEF01)
	payload := []byte("volume-move-live-payload")
	uploadResp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, payload)
	_ = framework.ReadAllAndClose(t, uploadResp)
	if uploadResp.StatusCode != http.StatusCreated {
		t.Fatalf("initial upload failed: %d", uploadResp.StatusCode)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// Source address built as "host:adminPort.grpcPort".
	sourceDataNode := cluster.VolumeAdminAddress(0) + "." + strings.Split(cluster.VolumeGRPCAddress(0), ":")[1]
	copyStream, err := destClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
		VolumeId:       volumeID,
		SourceDataNode: sourceDataNode,
	})
	if err != nil {
		t.Fatalf("VolumeCopy start failed: %v", err)
	}

	// Drain the copy stream, remembering the reported last-append timestamp.
	var lastAppendAtNs uint64
	for {
		resp, recvErr := copyStream.Recv()
		if recvErr == io.EOF {
			break
		}
		if recvErr != nil {
			t.Fatalf("VolumeCopy recv failed: %v", recvErr)
		}
		if ts := resp.GetLastAppendAtNs(); ts > 0 {
			lastAppendAtNs = ts
		}
	}
	if lastAppendAtNs == 0 {
		t.Fatalf("volume copy did not return a last append timestamp")
	}

	// Background writer: appends to the source volume to simulate traffic
	// that lands after the copy but before/while the tail runs.
	type written struct {
		fid  string
		data []byte
	}
	var writesMu sync.Mutex
	var writes []written
	writeCtx, writeCancel := context.WithCancel(context.Background())
	var writerWG sync.WaitGroup
	writerWG.Add(1)
	go func() {
		defer writerWG.Done()
		client := framework.NewHTTPClient()
		for i := 0; i < 12; i++ {
			select {
			case <-writeCtx.Done():
				return
			default:
			}
			livePayload := []byte("live-data-" + fmt.Sprintf("%02d", i))
			liveFid := framework.NewFileID(volumeID, uint64(2000+i), 0xAAAA1111+uint32(i))
			resp := framework.UploadBytes(t, client, cluster.VolumeAdminURL(0), liveFid, livePayload)
			_ = framework.ReadAllAndClose(t, resp)
			if resp.StatusCode != http.StatusCreated {
				// Not t.Fatalf: FailNow must not be called off the test
				// goroutine. Record the failure and stop writing.
				t.Errorf("live upload failed: %d", resp.StatusCode)
				return
			}
			writesMu.Lock()
			writes = append(writes, written{fid: liveFid, data: livePayload})
			writesMu.Unlock()
			time.Sleep(250 * time.Millisecond)
		}
	}()

	// Tail the source into the destination, starting at the copy timestamp.
	tailCtx, tailCancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer tailCancel()
	_, err = destClient.VolumeTailReceiver(tailCtx, &volume_server_pb.VolumeTailReceiverRequest{
		VolumeId:           volumeID,
		SourceVolumeServer: sourceDataNode,
		SinceNs:            lastAppendAtNs,
		IdleTimeoutSeconds: 3,
	})
	if err != nil {
		writeCancel()
		writerWG.Wait()
		t.Fatalf("VolumeTailReceiver failed: %v", err)
	}

	// Stop the writer and wait for it before inspecting its records.
	writeCancel()
	writerWG.Wait()
	if t.Failed() {
		// A live upload already failed inside the writer goroutine.
		return
	}
	writesMu.Lock()
	sampleCount := len(writes)
	if sampleCount == 0 {
		writesMu.Unlock()
		t.Fatal("no live writes captured")
	}
	// Verify the most recent few writes — the likeliest to have raced with
	// the tail.
	sample := writes
	if sampleCount > 3 {
		sample = writes[sampleCount-3:]
	}
	writesMu.Unlock()
	httpCheckClient := framework.NewHTTPClient()
	for _, w := range sample {
		resp := framework.ReadBytes(t, httpCheckClient, cluster.VolumeAdminURL(1), w.fid)
		body := framework.ReadAllAndClose(t, resp)
		if resp.StatusCode != http.StatusOK {
			t.Fatalf("dest read %s status %d", w.fid, resp.StatusCode)
		}
		if !bytes.Equal(body, w.data) {
			t.Fatalf("dest read body mismatch for %s: %q vs %q", w.fid, string(body), string(w.data))
		}
	}
}

68
weed/server/volume_grpc_copy.go

@ -17,6 +17,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
@ -187,6 +188,15 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
return err
}
var lastAppendAtNs = volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second)
if !hasRemoteDatFile {
if appendAtNs, appendErr := findLastAppendAtNsFromCopiedFiles(idxFileName, datFileName, needle.Version(volFileInfoResp.Version)); appendErr == nil && appendAtNs > 0 {
lastAppendAtNs = appendAtNs
} else if appendErr != nil {
glog.V(1).Infof("failed to find last append timestamp for volume %d: %v", req.VolumeId, appendErr)
}
}
// mount the volume
err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
@ -194,7 +204,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}
if err = stream.Send(&volume_server_pb.VolumeCopyResponse{
LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
LastAppendAtNs: lastAppendAtNs,
}); err != nil {
glog.Errorf("send response: %v", err)
}
@ -264,6 +274,62 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
return nil
}
// findLastAppendAtNsFromCopiedFiles returns the append timestamp (ns) of the
// newest needle in the just-copied volume files, read from the needle tail in
// the .dat file. It returns 0 with a nil error when no timestamp can be
// derived (pre-v3 needle format, empty index, or a zero-offset last entry).
func findLastAppendAtNsFromCopiedFiles(idxFileName, datFileName string, version needle.Version) (uint64, error) {
	// Needle tails carry an append timestamp only from version 3 on.
	if version < needle.Version3 {
		return 0, nil
	}

	idxFile, err := os.Open(idxFileName)
	if err != nil {
		return 0, fmt.Errorf("open idx file %s: %w", idxFileName, err)
	}
	defer idxFile.Close()

	info, err := idxFile.Stat()
	if err != nil {
		return 0, fmt.Errorf("stat idx file %s: %w", idxFileName, err)
	}
	if info.Size() == 0 {
		return 0, nil
	}
	if info.Size()%int64(types.NeedleMapEntrySize) != 0 {
		return 0, fmt.Errorf("unexpected idx file %s size: %d", idxFileName, info.Size())
	}

	// The final index entry locates the most recently appended needle.
	entry := make([]byte, types.NeedleMapEntrySize)
	if _, err := idxFile.ReadAt(entry, info.Size()-int64(types.NeedleMapEntrySize)); err != nil {
		return 0, fmt.Errorf("read idx file %s: %w", idxFileName, err)
	}
	_, offset, _ := idx.IdxFileEntry(entry)
	if offset.IsZero() {
		return 0, nil
	}

	datFile, err := os.Open(datFileName)
	if err != nil {
		return 0, fmt.Errorf("open dat file %s: %w", datFileName, err)
	}
	defer datFile.Close()

	datBackend := backend.NewDiskFile(datFile)
	hdr, _, _, err := needle.ReadNeedleHeader(datBackend, version, offset.ToActualOffset())
	if err != nil {
		return 0, fmt.Errorf("read needle header %s offset %d: %w", datFileName, offset.ToActualOffset(), err)
	}

	// Read checksum + timestamp from the needle tail. An EOF that still
	// filled the buffer means the needle ends exactly at end-of-file, which
	// the io.ReaderAt contract permits.
	tailOffset := offset.ToActualOffset() + int64(types.NeedleHeaderSize) + int64(hdr.Size)
	tail := make([]byte, needle.NeedleChecksumSize+types.TimestampSize)
	if count, readErr := datBackend.ReadAt(tail, tailOffset); readErr != nil && !(readErr == io.EOF && count == len(tail)) {
		return 0, fmt.Errorf("read needle tail %s offset %d: %w", datFileName, tailOffset, readErr)
	}
	return util.BytesToUint64(tail[needle.NeedleChecksumSize : needle.NeedleChecksumSize+types.TimestampSize]), nil
}
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
glog.V(4).Infof("writing to %s", fileName)
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC

2
weed/shell/command_volume_copy.go

@ -60,6 +60,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
_, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "", 0)
_, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "", 0, true)
return
}

18
weed/shell/command_volume_move.go

@ -38,13 +38,10 @@ func (c *commandVolumeMove) Help() string {
This command move a live volume from one volume server to another volume server. Here are the steps:
1. This command asks the target volume server to copy the source volume from source volume server, remember the last entry's timestamp.
2. This command asks the target volume server to mount the new volume
Now the master will mark this volume id as readonly.
3. This command asks the target volume server to tail the source volume for updates after the timestamp, for 1 minutes to drain the requests.
4. This command asks the source volume server to unmount the source volume
Now the master will mark this volume id as writable.
5. This command asks the source volume server to delete the source volume
1. This command marks the source volume as read-only, copies it to the target volume server, and records the last entry timestamp.
2. This command asks the target volume server to mount the new volume.
3. This command asks the target volume server to tail the source volume for updates after the timestamp, for 1 minute to drain any in-flight requests.
4. This command asks the source volume server to delete the source volume.
The option "-disk [hdd|ssd|<tag>]" can be used to change the volume disk type.
@ -92,7 +89,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, ioBytePerSecond int64, skipTailError bool) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType, ioBytePerSecond)
lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType, ioBytePerSecond, false)
if err != nil {
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
@ -115,7 +112,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n
return nil
}
func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string, ioBytePerSecond int64) (lastAppendAtNs uint64, err error) {
func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string, ioBytePerSecond int64, restoreWritable bool) (lastAppendAtNs uint64, err error) {
// check to see if the volume is already read-only and if its not then we need
// to mark it as read-only and then before we return we need to undo what we
@ -125,6 +122,9 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
if !shouldMarkWritable {
return
}
if !restoreWritable && err == nil {
return
}
clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{

Loading…
Cancel
Save