From 347ed7cbfaed9ab7af00eac7fc9ab0b237591ad9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 16 Dec 2025 23:16:07 -0800 Subject: [PATCH] fix: sync replica entries before ec.encode and volume.tier.move (#7798) * fix: sync replica entries before ec.encode and volume.tier.move (#7797) This addresses the data inconsistency risk in multi-replica volumes. When ec.encode or volume.tier.move operates on a multi-replica volume: 1. Find the replica with the highest file count (the 'best' one) 2. Copy missing entries from other replicas INTO this best replica 3. Use this union replica for the destructive operation This ensures no data is lost due to replica inconsistency before EC encoding or tier moving. Added: - command_volume_replica_check.go: Core sync and select logic - command_volume_replica_check_test.go: Test coverage Modified: - command_ec_encode.go: Call syncAndSelectBestReplica before encoding - command_volume_tier_move.go: Call syncAndSelectBestReplica before moving Fixes #7797 * test: add integration test for replicated volume sync during ec.encode * test: improve retry logic for replicated volume integration test * fix: resolve JWT issue in integration tests by using empty security.toml * address review comments: add readNeedleMeta, parallelize status fetch, fix collection param, fix test issues * test: use collection parameter consistently in replica sync test * fix: convert weed binary path to absolute to work with changed working directory * fix: remove skip behavior, keep tests failing on missing binary * fix: always check recency for each needle, add divergent replica test --- test/erasure_coding/ec_integration_test.go | 261 ++++++++++++- weed/shell/command_ec_common.go | 29 ++ weed/shell/command_ec_encode.go | 34 +- weed/shell/command_volume_replica_check.go | 353 ++++++++++++++++++ .../command_volume_replica_check_test.go | 273 ++++++++++++++ weed/shell/command_volume_tier_move.go | 24 +- 6 files changed, 959 insertions(+), 15 deletions(-) create mode 100644 weed/shell/command_volume_replica_check.go create mode 100644 weed/shell/command_volume_replica_check_test.go diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go index 9d7cde572..25693cb75 100644 --- a/test/erasure_coding/ec_integration_test.go +++ b/test/erasure_coding/ec_integration_test.go @@ -15,8 +15,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/shell" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -386,7 +388,7 @@ func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, e // Find weed binary weedBinary := findWeedBinary() if weedBinary == "" { - return nil, fmt.Errorf("weed binary not found") + return nil, fmt.Errorf("weed binary not found - build with 'go build' or 'make' first") } cluster := &TestCluster{} @@ -395,6 +397,11 @@ func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, e masterDir := filepath.Join(dataDir, "master") os.MkdirAll(masterDir, 0755) + // Create an empty security.toml to disable JWT authentication in tests + // This prevents the test from picking up ~/.seaweedfs/security.toml + securityToml := filepath.Join(dataDir, "security.toml") + os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644) + // Start master server masterCmd := exec.CommandContext(ctx, weedBinary, "master", "-port", "9333", @@ -403,6 +410,7 @@ func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, e "-ip", "127.0.0.1", "-peers", "none", // Faster startup when no multiple masters needed ) + masterCmd.Dir = dataDir // Run from test dir so it picks up our security.toml masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) if err != nil { @@ -435,6 +443,7 @@ func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, e "-dataCenter", "dc1", "-rack", rack, ) + volumeCmd.Dir = dataDir // Run from test dir so it picks up our security.toml volumeLogFile, err := os.Create(filepath.Join(volumeDir, "volume.log")) if err != nil { @@ -469,6 +478,10 @@ func findWeedBinary() string { for _, candidate := range candidates { if _, err := os.Stat(candidate); err == nil { + // Convert to absolute path so it works when command's Dir is changed + if absPath, err := filepath.Abs(candidate); err == nil { + return absPath + } return candidate } } @@ -2174,3 +2187,249 @@ func countShardsPerRack(testDir string, volumeId uint32) map[string]int { return rackDistribution } + +// TestECEncodeReplicatedVolumeSync tests that ec.encode properly syncs missing entries +// between replicas before encoding. This addresses issue #7797. +func TestECEncodeReplicatedVolumeSync(t *testing.T) { + if testing.Short() { + t.Skip("Skipping replicated volume sync integration test in short mode") + } + + // Create temporary directory for test data + testDir, err := os.MkdirTemp("", "seaweedfs_ec_replica_sync_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + // Start SeaweedFS cluster + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startSeaweedFSCluster(ctx, testDir) + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9333", 30*time.Second)) + for i := 0; i < 6; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:%d", 8080+i), 30*time.Second)) + } + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9333"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + // Connect to master with longer timeout + ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + // Wait for volume servers to register with master + time.Sleep(5 * time.Second) + + // Test: Create replicated volume and verify sync behavior with consistent replicas + t.Run("sync_replicated_volume_consistent", func(t *testing.T) { + const testCollection = "replicated_consistent" + const testReplication = "010" // 2 copies on different servers on the same rack + + // Retry a few times as volume servers may still be registering + var volumeId needle.VolumeId + var uploadErr error + for retry := 0; retry < 5; retry++ { + volumeId, uploadErr = uploadTestDataWithReplication(t, "127.0.0.1:9333", testCollection, testReplication) + if uploadErr == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, uploadErr) + time.Sleep(3 * time.Second) + } + if uploadErr != nil { + t.Skipf("Could not create replicated volume: %v", uploadErr) + return + } + t.Logf("Created replicated volume %d with collection %s, replication %s", volumeId, testCollection, testReplication) + + // Acquire lock + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout") + } + defer unlock() + + // Execute EC encoding with the same collection + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-volumeId", fmt.Sprintf("%d", volumeId), + "-collection", testCollection, + "-force", + } + + outputStr, encodeErr := captureCommandOutput(t, ecEncodeCmd, args, commandEnv) + t.Logf("EC encode output:\n%s", outputStr) + + // For consistent replicas, should see "all X replicas are consistent" + assert.Contains(t, outputStr, "replicas are consistent", "Should detect replicas are consistent") + + if encodeErr != nil { + t.Logf("EC encoding result: %v", encodeErr) + } else { + t.Log("EC encoding completed successfully") + } + }) + + // Test: Create divergent replicas and verify sync/union code is exercised + t.Run("sync_replicated_volume_divergent", func(t *testing.T) { + const testCollection = "replicated_divergent" + const testReplication = "010" // 2 copies on different servers on the same rack + + // Create replicated volume with initial data + var volumeId needle.VolumeId + var uploadErr error + for retry := 0; retry < 5; retry++ { + volumeId, uploadErr = uploadTestDataWithReplication(t, "127.0.0.1:9333", testCollection, testReplication) + if uploadErr == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, uploadErr) + time.Sleep(3 * time.Second) + } + if uploadErr != nil { + t.Skipf("Could not create replicated volume: %v", uploadErr) + return + } + t.Logf("Created replicated volume %d with collection %s, replication %s", volumeId, testCollection, testReplication) + + // Get volume locations to identify the two replicas + locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(volumeId)) + if !found || len(locations) < 2 { + t.Skipf("Could not get 2 replica locations for volume %d (got %d)", volumeId, len(locations)) + return + } + t.Logf("Volume %d has replicas at: %v and %v", volumeId, locations[0].Url, locations[1].Url) + + // Write an extra entry to ONLY one replica to create divergence + // We'll use the WriteNeedleBlob gRPC call to inject data directly + extraNeedleId := uint64(999999) // Use a high needle ID unlikely to conflict + extraData := []byte("extra data written to only one replica to create divergence") + + err := injectNeedleToOneReplica(grpc.WithInsecure(), volumeId, locations[0], extraNeedleId, extraData) + if err != nil { + t.Logf("Could not inject divergent needle (may not be supported): %v", err) + // Fall back to testing consistent path + } else { + t.Logf("Injected extra needle %d to replica %s to create divergence", extraNeedleId, locations[0].Url) + } + + // Acquire lock + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout") + } + defer unlock() + + // Execute EC encoding - should detect divergence and build union + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-volumeId", fmt.Sprintf("%d", volumeId), + "-collection", testCollection, + "-force", + } + + outputStr, encodeErr := captureCommandOutput(t, ecEncodeCmd, args, commandEnv) + t.Logf("EC encode output:\n%s", outputStr) + + // Check if divergence was detected + if strings.Contains(outputStr, "building union") { + t.Log("SUCCESS: Divergent replicas detected - sync/union code was exercised") + assert.Contains(t, outputStr, "selected", "Should show which replica was selected as best") + // The "copied" message only appears if the best replica is missing entries from other replicas + // In our case, we injected into what becomes the best replica (higher file count), + // so copying may not be needed. The key is that "building union" was triggered. + if strings.Contains(outputStr, "copied") { + t.Log("Entries were copied between replicas") + } else { + t.Log("Best replica already had all entries (injected entry was on best replica)") + } + } else if strings.Contains(outputStr, "replicas are consistent") { + t.Log("Replicas were consistent (injection may not have worked)") + } else { + t.Log("Single replica or sync not triggered") + } + + if encodeErr != nil { + t.Logf("EC encoding result: %v", encodeErr) + } else { + t.Log("EC encoding completed successfully") + } + }) +} + +// injectNeedleToOneReplica writes a needle directly to one replica to create divergence +func injectNeedleToOneReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location, needleId uint64, data []byte) error { + return operation.WithVolumeServerClient(false, location.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ + VolumeId: uint32(vid), + NeedleId: needleId, + Size: int32(len(data)), + NeedleBlob: data, + }) + return err + }) +} + +// uploadTestDataWithReplication uploads test data and returns the volume ID +// using the specified collection and replication level. All files are uploaded to the same volume. +func uploadTestDataWithReplication(t *testing.T, masterAddr string, collection string, replication string) (needle.VolumeId, error) { + const numFiles = 20 // Reduced count since we're uploading to same volume + + // Assign multiple file IDs from the same volume + assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress { + return pb.ServerAddress(masterAddr) + }, grpc.WithInsecure(), &operation.VolumeAssignRequest{ + Count: uint64(numFiles), + Collection: collection, + Replication: replication, + }) + if err != nil { + return 0, fmt.Errorf("failed to assign volume: %v", err) + } + + // Parse volume ID from file ID + fid, err := needle.ParseFileIdFromString(assignResult.Fid) + if err != nil { + return 0, fmt.Errorf("failed to parse file ID: %v", err) + } + volumeId := fid.VolumeId + + // Create uploader for upload + uploader, err := operation.NewUploader() + if err != nil { + return 0, fmt.Errorf("failed to create uploader: %v", err) + } + + // Upload test data to the same volume using the assigned file IDs + // The first FID is assignResult.Fid, subsequent ones increment the needle key + baseNeedleKey := uint64(fid.Key) + for i := 0; i < numFiles; i++ { + data := []byte(fmt.Sprintf("test data for replicated volume sync test - entry %d - padding to make it larger %s", + i, strings.Repeat("x", 1000))) + + // Construct FID for this file (same volume, incrementing needle key) + currentFid := fmt.Sprintf("%d,%x%08x", volumeId, baseNeedleKey+uint64(i), fid.Cookie) + uploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, currentFid) + + _, _, uploadErr := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{ + UploadUrl: uploadUrl, + Filename: fmt.Sprintf("file%d.txt", i), + }) + if uploadErr != nil { + t.Logf("Upload %d failed: %v", i, uploadErr) + } + } + + return volumeId, nil +} diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index bce0141f2..388303ebe 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -202,6 +202,35 @@ func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes [] return collectEcNodesForDC(commandEnv, "", diskType) } +// collectVolumeIdToCollection returns a map from volume ID to its collection name +func collectVolumeIdToCollection(t *master_pb.TopologyInfo, vids []needle.VolumeId) map[needle.VolumeId]string { + result := make(map[needle.VolumeId]string) + if len(vids) == 0 { + return result + } + + vidSet := make(map[needle.VolumeId]bool) + for _, vid := range vids { + vidSet[vid] = true + } + + for _, dc := range t.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for _, diskInfo := range dn.DiskInfos { + for _, vi := range diskInfo.VolumeInfos { + vid := needle.VolumeId(vi.Id) + if vidSet[vid] { + result[vid] = vi.Collection + } + } + } + } + } + } + return result +} + func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string { if len(vids) == 0 { return nil diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 2d62aff3f..33da76664 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -153,6 +153,9 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } + // Collect volume ID to collection name mapping for the sync operation + volumeIdToCollection := collectVolumeIdToCollection(topologyInfo, volumeIds) + // Collect volume locations BEFORE EC encoding starts to avoid race condition // where the master metadata is updated after EC encoding but before deletion fmt.Printf("Collecting volume locations for %d volumes before EC encoding...\n", len(volumeIds)) @@ -162,7 +165,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr } // encode all requested volumes... - if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil { + if err = doEcEncode(commandEnv, writer, volumeIdToCollection, volumeIds, *maxParallelization); err != nil { return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err) } // ...re-balance ec shards... @@ -192,7 +195,7 @@ func volumeLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId) (map[n return res, nil } -func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error { +func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection map[needle.VolumeId]string, volumeIds []needle.VolumeId, maxParallelization int) error { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") } @@ -217,10 +220,26 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo return err } - // generate ec shards + // Sync replicas and select the best one for each volume (with highest file count) + // This addresses data inconsistency risk in multi-replica volumes (issue #7797) + // by syncing missing entries between replicas before encoding + bestReplicas := make(map[needle.VolumeId]wdclient.Location) + for _, vid := range volumeIds { + locs := locations[vid] + collection := volumeIdToCollection[vid] + // Sync missing entries between replicas, then select the best one + bestLoc, selectErr := syncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, locs, "", writer) + if selectErr != nil { + return fmt.Errorf("failed to sync and select replica for volume %d: %v", vid, selectErr) + } + bestReplicas[vid] = bestLoc + } + + // generate ec shards using the best replica for each volume ewg.Reset() - for i, vid := range volumeIds { - target := locations[vid][i%len(locations[vid])] + for _, vid := range volumeIds { + target := bestReplicas[vid] + collection := volumeIdToCollection[vid] ewg.Add(func() error { if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil { return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err) @@ -239,8 +258,9 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo } ewg.Reset() - for i, vid := range volumeIds { - target := locations[vid][i%len(locations[vid])] + for _, vid := range volumeIds { + target := bestReplicas[vid] + collection := volumeIdToCollection[vid] ewg.Add(func() error { if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil { return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err) diff --git a/weed/shell/command_volume_replica_check.go b/weed/shell/command_volume_replica_check.go new file mode 100644 index 000000000..cbc18e7f7 --- /dev/null +++ b/weed/shell/command_volume_replica_check.go @@ -0,0 +1,353 @@ +package shell + +import ( + "bytes" + "context" + "fmt" + "io" + "math" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +// VolumeReplicaStatus represents the status of a volume replica +type VolumeReplicaStatus struct { + Location wdclient.Location + FileCount uint64 + FileDeletedCount uint64 + VolumeSize uint64 + IsReadOnly bool + Error error +} + +// getVolumeReplicaStatus retrieves the current status of a volume replica +func getVolumeReplicaStatus(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location) VolumeReplicaStatus { + status := VolumeReplicaStatus{ + Location: location, + } + + err := operation.WithVolumeServerClient(false, location.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ + VolumeId: uint32(vid), + }) + if reqErr != nil { + return reqErr + } + if resp != nil { + status.FileCount = resp.FileCount + status.FileDeletedCount = resp.FileDeletedCount + status.VolumeSize = resp.VolumeSize + status.IsReadOnly = resp.IsReadOnly + } + return nil + }) + status.Error = err + return status +} + +// getVolumeReplicaStatuses retrieves status for all replicas of a volume in parallel +func getVolumeReplicaStatuses(grpcDialOption grpc.DialOption, vid needle.VolumeId, locations []wdclient.Location) []VolumeReplicaStatus { + statuses := make([]VolumeReplicaStatus, len(locations)) + var wg sync.WaitGroup + for i, location := range locations { + wg.Add(1) + go func(i int, location wdclient.Location) { + defer wg.Done() + statuses[i] = getVolumeReplicaStatus(grpcDialOption, vid, location) + }(i, location) + } + wg.Wait() + return statuses +} + +// replicaUnionBuilder builds a union replica by copying missing entries from other replicas +type replicaUnionBuilder struct { + grpcDialOption grpc.DialOption + writer io.Writer + vid needle.VolumeId + collection string +} + +// buildUnionReplica finds the largest replica and copies missing entries from other replicas into it. +// If excludeFromSelection is non-empty, that server won't be selected as the target but will still +// be used as a source for missing entries. +// Returns the location of the union replica (the one that now has all entries). +func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, excludeFromSelection string) (wdclient.Location, int, error) { + if len(locations) == 0 { + return wdclient.Location{}, 0, fmt.Errorf("no replicas available") + } + if len(locations) == 1 { + if locations[0].Url == excludeFromSelection { + return wdclient.Location{}, 0, fmt.Errorf("only replica is excluded") + } + return locations[0], 0, nil + } + + // Step 1: Find the largest replica (highest file count) that's not excluded + statuses := getVolumeReplicaStatuses(rub.grpcDialOption, rub.vid, locations) + + bestIdx := -1 + var bestFileCount uint64 + for i, s := range statuses { + if s.Error == nil && locations[i].Url != excludeFromSelection { + if bestIdx == -1 || s.FileCount > bestFileCount { + bestIdx = i + bestFileCount = s.FileCount + } + } + } + + if bestIdx == -1 { + return wdclient.Location{}, 0, fmt.Errorf("could not find valid replica (all excluded or errored)") + } + + bestLocation := locations[bestIdx] + fmt.Fprintf(rub.writer, "volume %d: selected %s as best replica (file count: %d)\n", + rub.vid, bestLocation.Url, bestFileCount) + + // Step 2: Read index database from the best replica + bestDB := needle_map.NewMemDb() + if bestDB == nil { + return wdclient.Location{}, 0, fmt.Errorf("failed to allocate in-memory needle DB") + } + defer bestDB.Close() + + if err := rub.readIndexDatabase(bestDB, bestLocation.ServerAddress()); err != nil { + return wdclient.Location{}, 0, fmt.Errorf("read index from best replica %s: %w", bestLocation.Url, err) + } + + // Step 3: For each other replica (including excluded), find entries missing from best and copy them + totalSynced := 0 + cutoffFromAtNs := uint64(time.Now().UnixNano()) + + for i, loc := range locations { + if i == bestIdx { + continue + } + if statuses[i].Error != nil { + fmt.Fprintf(rub.writer, " skipping %s: %v\n", loc.Url, statuses[i].Error) + continue + } + + // Read this replica's index + otherDB := needle_map.NewMemDb() + if otherDB == nil { + fmt.Fprintf(rub.writer, " skipping %s: failed to allocate DB\n", loc.Url) + continue + } + + if err := rub.readIndexDatabase(otherDB, loc.ServerAddress()); err != nil { + otherDB.Close() + fmt.Fprintf(rub.writer, " skipping %s: %v\n", loc.Url, err) + continue + } + + // Find entries in other that are missing from best + var missingNeedles []needle_map.NeedleValue + + otherDB.AscendingVisit(func(nv needle_map.NeedleValue) error { + if nv.Size.IsDeleted() { + return nil + } + if _, found := bestDB.Get(nv.Key); !found { + // Check if this entry was written too recently (after sync started) + // Skip entries written after sync started to avoid copying in-flight writes + if needleMeta, err := readNeedleMeta(rub.grpcDialOption, loc.ServerAddress(), uint32(rub.vid), nv); err == nil { + if needleMeta.AppendAtNs > cutoffFromAtNs { + return nil // Skip entries written after sync started + } + } + missingNeedles = append(missingNeedles, nv) + } + return nil + }) + otherDB.Close() + + if len(missingNeedles) == 0 { + continue + } + + // Copy missing entries from this replica to best replica + syncedFromThis := 0 + for _, nv := range missingNeedles { + needleBlob, err := rub.readNeedleBlob(loc.ServerAddress(), nv) + if err != nil { + fmt.Fprintf(rub.writer, " warning: read needle %d from %s: %v\n", nv.Key, loc.Url, err) + continue + } + + if err := rub.writeNeedleBlob(bestLocation.ServerAddress(), nv, needleBlob); err != nil { + fmt.Fprintf(rub.writer, " warning: write needle %d to %s: %v\n", nv.Key, bestLocation.Url, err) + continue + } + + // Also add to bestDB so we don't copy duplicates from other replicas + bestDB.Set(nv.Key, nv.Offset, nv.Size) + syncedFromThis++ + } + + if syncedFromThis > 0 { + fmt.Fprintf(rub.writer, " copied %d entries from %s to %s\n", + syncedFromThis, loc.Url, bestLocation.Url) + totalSynced += syncedFromThis + } + } + + return bestLocation, totalSynced, nil +} + +func (rub *replicaUnionBuilder) readIndexDatabase(db *needle_map.MemDb, server pb.ServerAddress) error { + var buf bytes.Buffer + + err := operation.WithVolumeServerClient(true, server, rub.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + VolumeId: uint32(rub.vid), + Ext: ".idx", + CompactionRevision: math.MaxUint32, + StopOffset: math.MaxInt64, + Collection: rub.collection, + IsEcVolume: false, + IgnoreSourceFileNotFound: false, + }) + if err != nil { + return fmt.Errorf("start copy: %w", err) + } + + for { + resp, recvErr := copyFileClient.Recv() + if recvErr == io.EOF { + break + } + if recvErr != nil { + return fmt.Errorf("receive: %w", recvErr) + } + buf.Write(resp.FileContent) + } + return nil + }) + if err != nil { + return err + } + + return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) +} + +func (rub *replicaUnionBuilder) readNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue) ([]byte, error) { + var needleBlob []byte + err := operation.WithVolumeServerClient(false, server, rub.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ + VolumeId: uint32(rub.vid), + Offset: nv.Offset.ToActualOffset(), + Size: int32(nv.Size), + }) + if err != nil { + return err + } + needleBlob = resp.NeedleBlob + return nil + }) + return needleBlob, err +} + +func (rub *replicaUnionBuilder) writeNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue, needleBlob []byte) error { + return operation.WithVolumeServerClient(false, server, rub.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ + VolumeId: uint32(rub.vid), + NeedleId: uint64(nv.Key), + Size: int32(nv.Size), + NeedleBlob: needleBlob, + }) + return err + }) +} + +// syncAndSelectBestReplica finds the largest replica, copies missing entries from other replicas +// into it to create a union, then returns this union replica for the operation. +// If excludeFromSelection is non-empty, that server won't be selected but will still contribute entries. +// +// The process: +// 1. Find the replica with the highest file count (the "best" one), excluding excludeFromSelection +// 2. For each other replica, find entries missing from best and copy them to best +// 3. Return the best replica which now contains the union of all entries +func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, locations []wdclient.Location, excludeFromSelection string, writer io.Writer) (wdclient.Location, error) { + if len(locations) == 0 { + return wdclient.Location{}, fmt.Errorf("no replicas available for volume %d", vid) + } + + // Filter for checking consistency (exclude the excluded server) + var checkLocations []wdclient.Location + for _, loc := range locations { + if loc.Url != excludeFromSelection { + checkLocations = append(checkLocations, loc) + } + } + + if len(checkLocations) == 0 { + return wdclient.Location{}, fmt.Errorf("no replicas available for volume %d after exclusion", vid) + } + + if len(checkLocations) == 1 && len(locations) == 1 { + return checkLocations[0], nil + } + + // Check if replicas are already consistent (skip sync if so) + statuses := getVolumeReplicaStatuses(grpcDialOption, vid, locations) + var validStatuses []VolumeReplicaStatus + for i, s := range statuses { + if s.Error == nil { + // Include all for consistency check + validStatuses = append(validStatuses, s) + _ = i + } + } + + if len(validStatuses) > 1 { + allSame := true + for _, s := range validStatuses[1:] { + if s.FileCount != validStatuses[0].FileCount { + allSame = false + break + } + } + if allSame { + // All replicas are consistent, return the best non-excluded one + for _, s := range validStatuses { + if s.Location.Url != excludeFromSelection { + fmt.Fprintf(writer, "volume %d: all %d replicas are consistent (file count: %d)\n", + vid, len(validStatuses), s.FileCount) + return s.Location, nil + } + } + } + } + + // Replicas are inconsistent, build union on the best replica + fmt.Fprintf(writer, "volume %d: replicas are inconsistent, building union...\n", vid) + + builder := &replicaUnionBuilder{ + grpcDialOption: grpcDialOption, + writer: writer, + vid: vid, + collection: collection, + } + + unionLocation, totalSynced, err := builder.buildUnionReplica(locations, excludeFromSelection) + if err != nil { + return wdclient.Location{}, fmt.Errorf("failed to build union replica: %w", err) + } + + if totalSynced > 0 { + fmt.Fprintf(writer, "volume %d: added %d entries to union replica %s\n", vid, totalSynced, unionLocation.Url) + } + + return unionLocation, nil +} diff --git a/weed/shell/command_volume_replica_check_test.go b/weed/shell/command_volume_replica_check_test.go new file mode 100644 index 000000000..7629ea862 --- /dev/null +++ b/weed/shell/command_volume_replica_check_test.go @@ -0,0 +1,273 @@ +package shell + +import ( + "bytes" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +func TestBuildUnionFromMultipleIndexDatabases(t *testing.T) { + // Test that we can correctly identify missing entries between replicas + + // Create mock index databases representing different replicas + replicaA := needle_map.NewMemDb() + replicaB := needle_map.NewMemDb() + replicaC := needle_map.NewMemDb() + defer replicaA.Close() + defer replicaB.Close() + defer replicaC.Close() + + // Replica A has entries 1, 2, 3, 4, 5 + replicaA.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(3), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(4), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(5), types.Offset{}, types.Size(100)) + + // Replica B has entries 1, 2, 3, 6, 7 (missing 4, 5 from A, has unique 6, 7) + replicaB.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(3), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(6), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(7), types.Offset{}, types.Size(100)) + + // Replica C has entries 1, 2, 8 (minimal overlap, has unique 8) + replicaC.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + replicaC.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + replicaC.Set(types.NeedleId(8), types.Offset{}, types.Size(100)) + + // Test: Find entries in B that are missing from A + var missingFromA []types.NeedleId + replicaB.AscendingVisit(func(nv needle_map.NeedleValue) error { + if _, found := replicaA.Get(nv.Key); !found { + missingFromA = append(missingFromA, nv.Key) + } + return nil + }) + + if len(missingFromA) != 2 { + t.Errorf("Expected 2 entries missing from A (6, 7), got %d: %v", len(missingFromA), missingFromA) + } + + // Test: Find entries in C that are missing from A + var missingFromAinC []types.NeedleId + replicaC.AscendingVisit(func(nv needle_map.NeedleValue) error { + if _, found := replicaA.Get(nv.Key); !found { + missingFromAinC = append(missingFromAinC, nv.Key) + } + return nil + }) + + if len(missingFromAinC) != 1 { + t.Errorf("Expected 1 entry missing from A in C (8), got %d: %v", len(missingFromAinC), missingFromAinC) + } + + // Simulate building union: add missing entries to A + for _, id := range missingFromA { + replicaA.Set(id, types.Offset{}, types.Size(100)) + } + for _, id := range missingFromAinC { + replicaA.Set(id, types.Offset{}, types.Size(100)) + } + + // Verify A now has all 8 unique entries + count := 0 + replicaA.AscendingVisit(func(nv needle_map.NeedleValue) error { + count++ + return nil + }) + + if count != 8 { + t.Errorf("Expected union to have 8 entries, got %d", count) + } +} + +func TestFindLargestReplica(t *testing.T) { + // Test that we correctly identify the replica with the most entries + + type replicaInfo struct { + url string + fileCount uint64 + } + + testCases := []struct { + name string + replicas []replicaInfo + expected string + }{ + { + name: "single replica", + replicas: []replicaInfo{ + {"server1:8080", 100}, + }, + expected: "server1:8080", + }, + { + name: "first is largest", + replicas: []replicaInfo{ + {"server1:8080", 100}, + {"server2:8080", 50}, + {"server3:8080", 75}, + }, + expected: "server1:8080", + }, + { + name: "last is largest", + replicas: []replicaInfo{ + {"server1:8080", 50}, + {"server2:8080", 75}, + {"server3:8080", 100}, + }, + expected: "server3:8080", + }, + { + name: "middle is largest", + replicas: []replicaInfo{ + {"server1:8080", 50}, + {"server2:8080", 100}, + {"server3:8080", 75}, + }, + expected: "server2:8080", + }, + { + name: "all equal - pick first", + replicas: []replicaInfo{ + {"server1:8080", 100}, + {"server2:8080", 100}, + {"server3:8080", 100}, + }, + expected: "server1:8080", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Find the largest + bestIdx := 0 + var bestCount uint64 = 0 + for i, r := range tc.replicas { + if i == 0 || r.fileCount > bestCount { + bestIdx = i + bestCount = r.fileCount + } + } + + if tc.replicas[bestIdx].url != tc.expected { + t.Errorf("Expected %s, got %s", tc.expected, tc.replicas[bestIdx].url) + } + }) + } +} + +func TestDeletedEntriesAreSkipped(t *testing.T) { + // Test that deleted entries are not copied during sync + + replicaA := needle_map.NewMemDb() + replicaB := needle_map.NewMemDb() + defer replicaA.Close() + defer replicaB.Close() + + // Replica A has entries 1, 2, 3 (all valid) + replicaA.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(3), types.Offset{}, types.Size(100)) + + // Replica B has entry 4 valid, entry 5 deleted + replicaB.Set(types.NeedleId(4), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(5), types.Offset{}, types.Size(-1)) // Deleted (negative size) + + // Find non-deleted entries in B missing from A + var missingFromA []types.NeedleId + replicaB.AscendingVisit(func(nv needle_map.NeedleValue) error { + if nv.Size.IsDeleted() { + return nil // Skip deleted + } + if _, found := replicaA.Get(nv.Key); !found { + missingFromA = append(missingFromA, nv.Key) + } + return nil + }) + + if len(missingFromA) != 1 { + t.Errorf("Expected 1 non-deleted entry missing (4), got %d: %v", len(missingFromA), missingFromA) + } + + if len(missingFromA) > 0 && missingFromA[0] != types.NeedleId(4) { + t.Errorf("Expected missing entry to be 4, got %d", missingFromA[0]) + } +} + +func TestReplicaUnionBuilder_EmptyLocations(t *testing.T) { + // Test handling of empty locations slice + builder := &replicaUnionBuilder{ + writer: &bytes.Buffer{}, + vid: 1, + } + + _, count, err := builder.buildUnionReplica(nil, "") + if err == nil { + t.Error("Expected error for empty locations") + } + if count != 0 { + t.Errorf("Expected 0 synced, got %d", count) + } +} + +func TestAvoidDuplicateCopies(t *testing.T) { + // Test that when building union, we don't copy the same entry multiple times + // by updating the best replica's in-memory index after each copy + + bestDB := needle_map.NewMemDb() + defer bestDB.Close() + + // Best replica has entries 1, 2 + bestDB.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + bestDB.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + + // Simulate two other replicas both having entry 3 + otherReplicas := [][]types.NeedleId{ + {3, 4}, // Replica B has 3, 4 + {3, 5}, // Replica C has 3, 5 + } + + copiedEntries := make(map[types.NeedleId]int) // Track how many times each entry is "copied" + + for _, otherEntries := range otherReplicas { + for _, id := range otherEntries { + if _, found := bestDB.Get(id); !found { + // Would copy this entry + copiedEntries[id]++ + // Add to bestDB to prevent duplicate copies + bestDB.Set(id, types.Offset{}, types.Size(100)) + } + } + } + + // Entry 3 should only be copied once (from first replica that has it) + if copiedEntries[types.NeedleId(3)] != 1 { + t.Errorf("Entry 3 should be copied exactly once, got %d", copiedEntries[types.NeedleId(3)]) + } + + // Entry 4 should be copied once + if copiedEntries[types.NeedleId(4)] != 1 { + t.Errorf("Entry 4 should be copied exactly once, got %d", copiedEntries[types.NeedleId(4)]) + } + + // Entry 5 should be copied once + if copiedEntries[types.NeedleId(5)] != 1 { + t.Errorf("Entry 5 should be copied exactly once, got %d", copiedEntries[types.NeedleId(5)]) + } + + // Best should now have 5 entries total + count := 0 + bestDB.AscendingVisit(func(nv needle_map.NeedleValue) error { + count++ + return nil + }) + if count != 5 { + t.Errorf("Expected 5 entries in union, got %d", count) + } +} + diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index 3f8d2fc2e..cfa2eaef3 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -102,6 +102,9 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer } fmt.Printf("tier move volumes: %v\n", volumeIds) + // Collect volume ID to collection name mapping for the sync operation + volumeIdToCollection := collectVolumeIdToCollection(topologyInfo, volumeIds) + _, allLocations := collectVolumeReplicaLocations(topologyInfo) allLocations = filterLocationsByDiskType(allLocations, toDiskType) keepDataNodesSorted(allLocations, toDiskType) @@ -143,7 +146,8 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer } for _, vid := range volumeIds { - if err = c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations); err != nil { + collection := volumeIdToCollection[vid] + if err = c.doVolumeTierMove(commandEnv, writer, vid, collection, toDiskType, allLocations); err != nil { fmt.Printf("tier move volume %d: %v\n", vid, err) } allLocations = rotateDataNodes(allLocations) @@ -192,7 +196,7 @@ func isOneOf(server string, locations []wdclient.Location) bool { return false } -func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location) (err error) { +func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, collection string, toDiskType types.DiskType, allLocations []location) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid)) if !found { @@ -208,12 +212,18 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer if isOneOf(dst.dataNode.Id, locations) { continue } - var sourceVolumeServer pb.ServerAddress - for _, loc := range locations { - if loc.Url != dst.dataNode.Id { - sourceVolumeServer = loc.ServerAddress() - } + + // Sync replicas and select the best one (with highest file count) for multi-replica volumes + // This addresses data inconsistency risk in multi-replica volumes (issue #7797) + // by syncing missing entries between replicas before moving + sourceLoc, selectErr := syncAndSelectBestReplica( + commandEnv.option.GrpcDialOption, vid, collection, locations, dst.dataNode.Id, writer) + if selectErr != nil { + fmt.Fprintf(writer, "failed to sync and select source replica for volume %d: %v\n", vid, selectErr) + continue } + sourceVolumeServer := sourceLoc.ServerAddress() + if sourceVolumeServer == "" { continue }