You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
454 lines
14 KiB
454 lines
14 KiB
package volume_server_merge_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
)
|
|
|
|
// runWeedShell starts "<weedBinary> shell -master=<masterAddr>" and feeds it
// shellCommand on stdin, bracketed by "lock" / "unlock" and followed by "exit".
// It returns the combined stdout and stderr of the process together with the
// error reported by the process run, if any.
//
// The process is bound to a 30-second context so an unresponsive shell cannot
// block the test forever. Failures are logged through t, with a dedicated
// message when the deadline was hit, and are still returned to the caller.
func runWeedShell(t *testing.T, weedBinary, masterAddr, shellCommand string) (output string, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Cluster-wide operations are wrapped in lock/unlock; the trailing empty
	// element makes the script end with a newline after "exit".
	script := strings.Join([]string{"lock", shellCommand, "unlock", "exit", ""}, "\n")

	cmd := exec.CommandContext(ctx, weedBinary, "shell", "-master="+masterAddr)
	cmd.Stdin = strings.NewReader(script)

	raw, runErr := cmd.CombinedOutput()
	output = string(raw)

	switch {
	case runErr == nil:
		// Nothing to report.
	case ctx.Err() == context.DeadlineExceeded:
		t.Logf("weed shell command '%s' timed out after 30s", shellCommand)
	default:
		t.Logf("weed shell command '%s' output: %s, error: %v", shellCommand, output, runErr)
	}

	return output, runErr
}
|
|
|
|
// TestVolumeMergeBasic verifies the basic volume.merge workflow using the weed shell command
|
|
func TestVolumeMergeBasic(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
// Start a triple cluster with 3 volume servers (needed for merge which allocates to a third location)
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
// Connect to volume servers to allocate volumes
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(100)
|
|
|
|
// Allocate volume on only 2 servers (replicas)
|
|
// The merge command will allocate on the 3rd server as a temporary location
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
t.Logf("Successfully allocated volume %d on servers 0 and 1 as replicas", volumeID)
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Execute volume.merge command via weed shell
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
|
|
t.Logf("volume.merge command output:\n%s", output)
|
|
|
|
if err != nil {
|
|
t.Fatalf("volume.merge command failed: %v\noutput: %s", err, output)
|
|
}
|
|
|
|
// Verify the success message in output
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
t.Logf("Successfully executed volume.merge command for volume %d", volumeID)
|
|
}
|
|
|
|
// TestVolumeMergeReadonly verifies that volume.merge requires readonly state
|
|
func TestVolumeMergeReadonly(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
// Connect to volume servers
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(101)
|
|
|
|
// Allocate volumes on only 2 servers (the merge will allocate on the 3rd)
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Test 1: Merge while writable (merge command will mark volumes readonly as needed)
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
if err != nil {
|
|
t.Logf("merge on writable volumes failed: %v\noutput: %s", err, output)
|
|
t.Fatalf("volume.merge should work on writable volumes (marks them readonly internally)")
|
|
}
|
|
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
// Verify volumes were marked readonly during merge and restored after
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Check that volumes are writable again after merge (were restored)
|
|
status0, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status after merge: %v", err)
|
|
}
|
|
|
|
status1, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status from server 1 after merge: %v", err)
|
|
}
|
|
|
|
if status0.GetIsReadOnly() {
|
|
t.Fatalf("expected volume to be writable again after merge on server 0")
|
|
}
|
|
|
|
if status1.GetIsReadOnly() {
|
|
t.Fatalf("expected volume to be writable again after merge on server 1")
|
|
}
|
|
|
|
t.Logf("Successfully tested merge on writable volumes and writable restoration")
|
|
}
|
|
|
|
// TestVolumeMergeRestore verifies that merge restores writable state for originally-writable replicas
|
|
func TestVolumeMergeRestore(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(102)
|
|
|
|
// Allocate volume on only 2 servers (the merge will allocate on the 3rd)
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Mark both as readonly
|
|
_, err := volumeClient0.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly: %v", err)
|
|
}
|
|
|
|
_, err = volumeClient1.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 1: %v", err)
|
|
}
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Execute volume.merge via shell
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
|
|
t.Logf("volume.merge output: %s, error: %v", output, err)
|
|
|
|
if err != nil {
|
|
t.Fatalf("volume.merge failed: %v\noutput: %s", err, output)
|
|
}
|
|
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
// After merge, verify that originally-writable replicas are writable again
|
|
// (The merge command should restore writable state for replicas that were writable before readonly)
|
|
// Actually both were writable initially, then marked readonly, so both should be restored
|
|
|
|
// Poll for writable state restoration instead of fixed sleep
|
|
maxRetries := 50 // ~5s total with 100ms sleeps
|
|
for retries := 0; retries < maxRetries; retries++ {
|
|
status0, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err == nil && !status0.GetIsReadOnly() {
|
|
// Server 0 is writable, check server 1
|
|
status1, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err == nil && !status1.GetIsReadOnly() {
|
|
// Both are writable, break out
|
|
break
|
|
}
|
|
}
|
|
if retries < maxRetries-1 {
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
status0Final, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get final status for server 0: %v", err)
|
|
}
|
|
|
|
status1Final, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get final status for server 1: %v", err)
|
|
}
|
|
|
|
if status0Final.GetIsReadOnly() {
|
|
t.Fatalf("expected volume %d to be writable on server 0 after merge, but it's still readonly", volumeID)
|
|
}
|
|
|
|
if status1Final.GetIsReadOnly() {
|
|
t.Fatalf("expected volume %d to be writable on server 1 after merge, but it's still readonly", volumeID)
|
|
}
|
|
|
|
t.Logf("After merge - volume %d on server 0: readonly=%v, server 1: readonly=%v", volumeID, status0Final.GetIsReadOnly(), status1Final.GetIsReadOnly())
|
|
|
|
t.Logf("Successfully tested merge and restore workflow for volume %d", volumeID)
|
|
}
|
|
|
|
// TestVolumeMergeTailNeedles verifies the volume.merge command with empty volumes
|
|
func TestVolumeMergeTailNeedles(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(200)
|
|
|
|
// Allocate empty volumes on only 2 servers (the merge will allocate on the 3rd)
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
// Mark as readonly
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := volumeClient0.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 0: %v", err)
|
|
}
|
|
|
|
_, err = volumeClient1.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 1: %v", err)
|
|
}
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Execute volume.merge command on empty volumes
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
|
|
t.Logf("merge empty volumes - output: %s, error: %v", output, err)
|
|
|
|
if err != nil {
|
|
t.Fatalf("volume.merge failed on empty volumes: %v\noutput: %s", err, output)
|
|
}
|
|
|
|
// Verify merge completed successfully
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
t.Logf("Successfully merged empty volumes %d", volumeID)
|
|
}
|
|
|
|
// TestVolumeMergeDivergentReplicas simulates a realistic merge scenario using shell command
|
|
func TestVolumeMergeDivergentReplicas(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
// Connect to both servers
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(201)
|
|
|
|
// Allocate the same volume on only 2 servers (the merge will allocate on the 3rd)
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Verify both volumes are initially writable
|
|
status0, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status for server 0: %v", err)
|
|
}
|
|
|
|
status1, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status for server 1: %v", err)
|
|
}
|
|
|
|
if status0.GetIsReadOnly() || status1.GetIsReadOnly() {
|
|
t.Fatalf("expected both volumes to be writable initially")
|
|
}
|
|
|
|
// Mark both as readonly to simulate merge precondition
|
|
_, err = volumeClient0.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 0: %v", err)
|
|
}
|
|
|
|
_, err = volumeClient1.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 1: %v", err)
|
|
}
|
|
|
|
// Verify both are readonly
|
|
status0Again, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status after readonly: %v", err)
|
|
}
|
|
|
|
if !status0Again.GetIsReadOnly() {
|
|
t.Fatalf("expected volume %d to be readonly", volumeID)
|
|
}
|
|
|
|
// Also verify server 1 is readonly
|
|
status1Again, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status on server 1 after readonly: %v", err)
|
|
}
|
|
|
|
if !status1Again.GetIsReadOnly() {
|
|
t.Fatalf("expected volume %d to be readonly on server 1", volumeID)
|
|
}
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Execute volume.merge command via shell
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
|
|
t.Logf("merge divergent replicas - output: %s, error: %v", output, err)
|
|
|
|
if err != nil {
|
|
t.Fatalf("volume.merge failed: %v\noutput: %s", err, output)
|
|
}
|
|
|
|
// Verify merge completed successfully
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
t.Logf("Successfully merged divergent replicas for volume %d using shell command", volumeID)
|
|
}
|