diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go index 9d7cde572..25693cb75 100644 --- a/test/erasure_coding/ec_integration_test.go +++ b/test/erasure_coding/ec_integration_test.go @@ -15,8 +15,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/shell" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -386,7 +388,7 @@ func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, e // Find weed binary weedBinary := findWeedBinary() if weedBinary == "" { - return nil, fmt.Errorf("weed binary not found") + return nil, fmt.Errorf("weed binary not found - build with 'go build' or 'make' first") } cluster := &TestCluster{} @@ -395,6 +397,11 @@ func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, e masterDir := filepath.Join(dataDir, "master") os.MkdirAll(masterDir, 0755) + // Create an empty security.toml to disable JWT authentication in tests + // This prevents the test from picking up ~/.seaweedfs/security.toml + securityToml := filepath.Join(dataDir, "security.toml") + os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644) + // Start master server masterCmd := exec.CommandContext(ctx, weedBinary, "master", "-port", "9333", @@ -403,6 +410,7 @@ func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, e "-ip", "127.0.0.1", "-peers", "none", // Faster startup when no multiple masters needed ) + masterCmd.Dir = dataDir // Run from test dir so it picks up our security.toml masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) if err != nil { @@ -435,6 +443,7 @@ func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, e "-dataCenter", "dc1", "-rack", rack, ) + volumeCmd.Dir = dataDir // Run from test dir so it picks up our security.toml volumeLogFile, err := os.Create(filepath.Join(volumeDir, "volume.log")) if err != nil { @@ -469,6 +478,10 @@ func findWeedBinary() string { for _, candidate := range candidates { if _, err := os.Stat(candidate); err == nil { + // Convert to absolute path so it works when command's Dir is changed + if absPath, err := filepath.Abs(candidate); err == nil { + return absPath + } return candidate } } @@ -2174,3 +2187,249 @@ func countShardsPerRack(testDir string, volumeId uint32) map[string]int { return rackDistribution } + +// TestECEncodeReplicatedVolumeSync tests that ec.encode properly syncs missing entries +// between replicas before encoding. This addresses issue #7797. 
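// --- Illustrative sketch, not part of this patch ------------------------------
// The new test below (like the existing cluster tests in this file) gates on
// waitForServer(...) before issuing shell commands. That helper is defined
// elsewhere in ec_integration_test.go and is not shown in this diff; what
// follows is only a hedged sketch of the shape such a TCP readiness probe
// typically takes. Assumed imports: "fmt", "net", "time".
func waitForServerSketch(addr string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		// A successful TCP dial is treated as "server is up"; an HTTP or gRPC
		// health check would be stricter but needs more plumbing.
		if conn, err := net.DialTimeout("tcp", addr, 500*time.Millisecond); err == nil {
			conn.Close()
			return nil
		}
		time.Sleep(250 * time.Millisecond)
	}
	return fmt.Errorf("server %s did not become reachable within %v", addr, timeout)
}
// -----------------------------------------------------------------------------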
+func TestECEncodeReplicatedVolumeSync(t *testing.T) { + if testing.Short() { + t.Skip("Skipping replicated volume sync integration test in short mode") + } + + // Create temporary directory for test data + testDir, err := os.MkdirTemp("", "seaweedfs_ec_replica_sync_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + // Start SeaweedFS cluster + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startSeaweedFSCluster(ctx, testDir) + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9333", 30*time.Second)) + for i := 0; i < 6; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:%d", 8080+i), 30*time.Second)) + } + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9333"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + // Connect to master with longer timeout + ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + // Wait for volume servers to register with master + time.Sleep(5 * time.Second) + + // Test: Create replicated volume and verify sync behavior with consistent replicas + t.Run("sync_replicated_volume_consistent", func(t *testing.T) { + const testCollection = "replicated_consistent" + const testReplication = "010" // 2 copies on different servers on the same rack + + // Retry a few times as volume servers may still be registering + var volumeId needle.VolumeId + var uploadErr error + for retry := 0; retry < 5; retry++ { + volumeId, uploadErr = uploadTestDataWithReplication(t, "127.0.0.1:9333", testCollection, testReplication) + if uploadErr == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, uploadErr) + time.Sleep(3 * time.Second) + } + if uploadErr != nil { + t.Skipf("Could not create replicated volume: %v", uploadErr) + return + } + t.Logf("Created replicated volume %d with collection %s, replication %s", volumeId, testCollection, testReplication) + + // Acquire lock + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout") + } + defer unlock() + + // Execute EC encoding with the same collection + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-volumeId", fmt.Sprintf("%d", volumeId), + "-collection", testCollection, + "-force", + } + + outputStr, encodeErr := captureCommandOutput(t, ecEncodeCmd, args, commandEnv) + t.Logf("EC encode output:\n%s", outputStr) + + // For consistent replicas, should see "all X replicas are consistent" + assert.Contains(t, outputStr, "replicas are consistent", "Should detect replicas are consistent") + + if encodeErr != nil { + t.Logf("EC encoding result: %v", encodeErr) + } else { + t.Log("EC encoding completed successfully") + } + }) + + // Test: Create divergent replicas and verify sync/union code is exercised + t.Run("sync_replicated_volume_divergent", func(t *testing.T) { + const testCollection = "replicated_divergent" + const testReplication = "010" // 2 copies on different servers on the same rack + + // Create replicated volume with initial data + var volumeId needle.VolumeId + var uploadErr error + for retry := 0; retry < 5; retry++ { + volumeId, uploadErr = 
uploadTestDataWithReplication(t, "127.0.0.1:9333", testCollection, testReplication) + if uploadErr == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, uploadErr) + time.Sleep(3 * time.Second) + } + if uploadErr != nil { + t.Skipf("Could not create replicated volume: %v", uploadErr) + return + } + t.Logf("Created replicated volume %d with collection %s, replication %s", volumeId, testCollection, testReplication) + + // Get volume locations to identify the two replicas + locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(volumeId)) + if !found || len(locations) < 2 { + t.Skipf("Could not get 2 replica locations for volume %d (got %d)", volumeId, len(locations)) + return + } + t.Logf("Volume %d has replicas at: %v and %v", volumeId, locations[0].Url, locations[1].Url) + + // Write an extra entry to ONLY one replica to create divergence + // We'll use the WriteNeedleBlob gRPC call to inject data directly + extraNeedleId := uint64(999999) // Use a high needle ID unlikely to conflict + extraData := []byte("extra data written to only one replica to create divergence") + + err := injectNeedleToOneReplica(grpc.WithInsecure(), volumeId, locations[0], extraNeedleId, extraData) + if err != nil { + t.Logf("Could not inject divergent needle (may not be supported): %v", err) + // Fall back to testing consistent path + } else { + t.Logf("Injected extra needle %d to replica %s to create divergence", extraNeedleId, locations[0].Url) + } + + // Acquire lock + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout") + } + defer unlock() + + // Execute EC encoding - should detect divergence and build union + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-volumeId", fmt.Sprintf("%d", volumeId), + "-collection", testCollection, + "-force", + } + + outputStr, encodeErr := captureCommandOutput(t, ecEncodeCmd, args, commandEnv) + t.Logf("EC encode output:\n%s", outputStr) + + // Check if divergence was detected + if strings.Contains(outputStr, "building union") { + t.Log("SUCCESS: Divergent replicas detected - sync/union code was exercised") + assert.Contains(t, outputStr, "selected", "Should show which replica was selected as best") + // The "copied" message only appears if the best replica is missing entries from other replicas + // In our case, we injected into what becomes the best replica (higher file count), + // so copying may not be needed. The key is that "building union" was triggered. 
+ if strings.Contains(outputStr, "copied") { + t.Log("Entries were copied between replicas") + } else { + t.Log("Best replica already had all entries (injected entry was on best replica)") + } + } else if strings.Contains(outputStr, "replicas are consistent") { + t.Log("Replicas were consistent (injection may not have worked)") + } else { + t.Log("Single replica or sync not triggered") + } + + if encodeErr != nil { + t.Logf("EC encoding result: %v", encodeErr) + } else { + t.Log("EC encoding completed successfully") + } + }) +} + +// injectNeedleToOneReplica writes a needle directly to one replica to create divergence +func injectNeedleToOneReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location, needleId uint64, data []byte) error { + return operation.WithVolumeServerClient(false, location.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ + VolumeId: uint32(vid), + NeedleId: needleId, + Size: int32(len(data)), + NeedleBlob: data, + }) + return err + }) +} + +// uploadTestDataWithReplication uploads test data and returns the volume ID +// using the specified collection and replication level. All files are uploaded to the same volume. +func uploadTestDataWithReplication(t *testing.T, masterAddr string, collection string, replication string) (needle.VolumeId, error) { + const numFiles = 20 // Reduced count since we're uploading to same volume + + // Assign multiple file IDs from the same volume + assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress { + return pb.ServerAddress(masterAddr) + }, grpc.WithInsecure(), &operation.VolumeAssignRequest{ + Count: uint64(numFiles), + Collection: collection, + Replication: replication, + }) + if err != nil { + return 0, fmt.Errorf("failed to assign volume: %v", err) + } + + // Parse volume ID from file ID + fid, err := needle.ParseFileIdFromString(assignResult.Fid) + if err != nil { + return 0, fmt.Errorf("failed to parse file ID: %v", err) + } + volumeId := fid.VolumeId + + // Create uploader for upload + uploader, err := operation.NewUploader() + if err != nil { + return 0, fmt.Errorf("failed to create uploader: %v", err) + } + + // Upload test data to the same volume using the assigned file IDs + // The first FID is assignResult.Fid, subsequent ones increment the needle key + baseNeedleKey := uint64(fid.Key) + for i := 0; i < numFiles; i++ { + data := []byte(fmt.Sprintf("test data for replicated volume sync test - entry %d - padding to make it larger %s", + i, strings.Repeat("x", 1000))) + + // Construct FID for this file (same volume, incrementing needle key) + currentFid := fmt.Sprintf("%d,%x%08x", volumeId, baseNeedleKey+uint64(i), fid.Cookie) + uploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, currentFid) + + _, _, uploadErr := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{ + UploadUrl: uploadUrl, + Filename: fmt.Sprintf("file%d.txt", i), + }) + if uploadErr != nil { + t.Logf("Upload %d failed: %v", i, uploadErr) + } + } + + return volumeId, nil +} diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index bce0141f2..388303ebe 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -202,6 +202,35 @@ func collectEcNodes(commandEnv *CommandEnv, diskType types.DiskType) (ecNodes [] return collectEcNodesForDC(commandEnv, "", 
diskType) } +// collectVolumeIdToCollection returns a map from volume ID to its collection name +func collectVolumeIdToCollection(t *master_pb.TopologyInfo, vids []needle.VolumeId) map[needle.VolumeId]string { + result := make(map[needle.VolumeId]string) + if len(vids) == 0 { + return result + } + + vidSet := make(map[needle.VolumeId]bool) + for _, vid := range vids { + vidSet[vid] = true + } + + for _, dc := range t.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for _, diskInfo := range dn.DiskInfos { + for _, vi := range diskInfo.VolumeInfos { + vid := needle.VolumeId(vi.Id) + if vidSet[vid] { + result[vid] = vi.Collection + } + } + } + } + } + } + return result +} + func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string { if len(vids) == 0 { return nil diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 2d62aff3f..33da76664 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -153,6 +153,9 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } + // Collect volume ID to collection name mapping for the sync operation + volumeIdToCollection := collectVolumeIdToCollection(topologyInfo, volumeIds) + // Collect volume locations BEFORE EC encoding starts to avoid race condition // where the master metadata is updated after EC encoding but before deletion fmt.Printf("Collecting volume locations for %d volumes before EC encoding...\n", len(volumeIds)) @@ -162,7 +165,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr } // encode all requested volumes... - if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil { + if err = doEcEncode(commandEnv, writer, volumeIdToCollection, volumeIds, *maxParallelization); err != nil { return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err) } // ...re-balance ec shards... 
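// --- Illustrative sketch, not part of this patch ------------------------------
// A unit test along these lines could accompany the new collectVolumeIdToCollection
// helper (it would naturally live in command_ec_common_test.go). The nesting
// mirrors how the helper itself walks the topology; the exact master_pb field
// shapes (in particular the "" disk-type key for DiskInfos, i.e. hdd) are
// assumptions and may need adjusting against the real proto definitions.
func TestCollectVolumeIdToCollectionSketch(t *testing.T) {
	topo := &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{{
			Id: "dc1",
			RackInfos: []*master_pb.RackInfo{{
				Id: "rack1",
				DataNodeInfos: []*master_pb.DataNodeInfo{{
					Id: "127.0.0.1:8080",
					DiskInfos: map[string]*master_pb.DiskInfo{
						"": {VolumeInfos: []*master_pb.VolumeInformationMessage{
							{Id: 1, Collection: "replicated_consistent"},
							{Id: 2, Collection: ""}, // default collection
							{Id: 3, Collection: "other"},
						}},
					},
				}},
			}},
		}},
	}

	got := collectVolumeIdToCollection(topo, []needle.VolumeId{1, 2})
	if got[needle.VolumeId(1)] != "replicated_consistent" {
		t.Errorf("unexpected collection for volume 1: %q", got[1])
	}
	if _, found := got[2]; !found {
		t.Errorf("volume 2 (default collection) should still be present in the map")
	}
	if _, found := got[3]; found {
		t.Errorf("volume 3 was not requested and should not appear in the result")
	}
}
// -----------------------------------------------------------------------------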
@@ -192,7 +195,7 @@ func volumeLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId) (map[n return res, nil } -func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error { +func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection map[needle.VolumeId]string, volumeIds []needle.VolumeId, maxParallelization int) error { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") } @@ -217,10 +220,26 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo return err } - // generate ec shards + // Sync replicas and select the best one for each volume (with highest file count) + // This addresses data inconsistency risk in multi-replica volumes (issue #7797) + // by syncing missing entries between replicas before encoding + bestReplicas := make(map[needle.VolumeId]wdclient.Location) + for _, vid := range volumeIds { + locs := locations[vid] + collection := volumeIdToCollection[vid] + // Sync missing entries between replicas, then select the best one + bestLoc, selectErr := syncAndSelectBestReplica(commandEnv.option.GrpcDialOption, vid, collection, locs, "", writer) + if selectErr != nil { + return fmt.Errorf("failed to sync and select replica for volume %d: %v", vid, selectErr) + } + bestReplicas[vid] = bestLoc + } + + // generate ec shards using the best replica for each volume ewg.Reset() - for i, vid := range volumeIds { - target := locations[vid][i%len(locations[vid])] + for _, vid := range volumeIds { + target := bestReplicas[vid] + collection := volumeIdToCollection[vid] ewg.Add(func() error { if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil { return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err) @@ -239,8 +258,9 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo } ewg.Reset() - for i, vid := range volumeIds { - target := locations[vid][i%len(locations[vid])] + for _, vid := range volumeIds { + target := bestReplicas[vid] + collection := volumeIdToCollection[vid] ewg.Add(func() error { if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil { return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err) diff --git a/weed/shell/command_volume_replica_check.go b/weed/shell/command_volume_replica_check.go new file mode 100644 index 000000000..cbc18e7f7 --- /dev/null +++ b/weed/shell/command_volume_replica_check.go @@ -0,0 +1,353 @@ +package shell + +import ( + "bytes" + "context" + "fmt" + "io" + "math" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +// VolumeReplicaStatus represents the status of a volume replica +type VolumeReplicaStatus struct { + Location wdclient.Location + FileCount uint64 + FileDeletedCount uint64 + VolumeSize uint64 + IsReadOnly bool + Error error +} + +// getVolumeReplicaStatus retrieves the current status of a volume replica +func getVolumeReplicaStatus(grpcDialOption grpc.DialOption, vid needle.VolumeId, location wdclient.Location) VolumeReplicaStatus { + status := VolumeReplicaStatus{ + Location: location, + } + + err 
:= operation.WithVolumeServerClient(false, location.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ + VolumeId: uint32(vid), + }) + if reqErr != nil { + return reqErr + } + if resp != nil { + status.FileCount = resp.FileCount + status.FileDeletedCount = resp.FileDeletedCount + status.VolumeSize = resp.VolumeSize + status.IsReadOnly = resp.IsReadOnly + } + return nil + }) + status.Error = err + return status +} + +// getVolumeReplicaStatuses retrieves status for all replicas of a volume in parallel +func getVolumeReplicaStatuses(grpcDialOption grpc.DialOption, vid needle.VolumeId, locations []wdclient.Location) []VolumeReplicaStatus { + statuses := make([]VolumeReplicaStatus, len(locations)) + var wg sync.WaitGroup + for i, location := range locations { + wg.Add(1) + go func(i int, location wdclient.Location) { + defer wg.Done() + statuses[i] = getVolumeReplicaStatus(grpcDialOption, vid, location) + }(i, location) + } + wg.Wait() + return statuses +} + +// replicaUnionBuilder builds a union replica by copying missing entries from other replicas +type replicaUnionBuilder struct { + grpcDialOption grpc.DialOption + writer io.Writer + vid needle.VolumeId + collection string +} + +// buildUnionReplica finds the largest replica and copies missing entries from other replicas into it. +// If excludeFromSelection is non-empty, that server won't be selected as the target but will still +// be used as a source for missing entries. +// Returns the location of the union replica (the one that now has all entries). +func (rub *replicaUnionBuilder) buildUnionReplica(locations []wdclient.Location, excludeFromSelection string) (wdclient.Location, int, error) { + if len(locations) == 0 { + return wdclient.Location{}, 0, fmt.Errorf("no replicas available") + } + if len(locations) == 1 { + if locations[0].Url == excludeFromSelection { + return wdclient.Location{}, 0, fmt.Errorf("only replica is excluded") + } + return locations[0], 0, nil + } + + // Step 1: Find the largest replica (highest file count) that's not excluded + statuses := getVolumeReplicaStatuses(rub.grpcDialOption, rub.vid, locations) + + bestIdx := -1 + var bestFileCount uint64 + for i, s := range statuses { + if s.Error == nil && locations[i].Url != excludeFromSelection { + if bestIdx == -1 || s.FileCount > bestFileCount { + bestIdx = i + bestFileCount = s.FileCount + } + } + } + + if bestIdx == -1 { + return wdclient.Location{}, 0, fmt.Errorf("could not find valid replica (all excluded or errored)") + } + + bestLocation := locations[bestIdx] + fmt.Fprintf(rub.writer, "volume %d: selected %s as best replica (file count: %d)\n", + rub.vid, bestLocation.Url, bestFileCount) + + // Step 2: Read index database from the best replica + bestDB := needle_map.NewMemDb() + if bestDB == nil { + return wdclient.Location{}, 0, fmt.Errorf("failed to allocate in-memory needle DB") + } + defer bestDB.Close() + + if err := rub.readIndexDatabase(bestDB, bestLocation.ServerAddress()); err != nil { + return wdclient.Location{}, 0, fmt.Errorf("read index from best replica %s: %w", bestLocation.Url, err) + } + + // Step 3: For each other replica (including excluded), find entries missing from best and copy them + totalSynced := 0 + cutoffFromAtNs := uint64(time.Now().UnixNano()) + + for i, loc := range locations { + if i == bestIdx { + continue + } + if statuses[i].Error != nil { + 
fmt.Fprintf(rub.writer, " skipping %s: %v\n", loc.Url, statuses[i].Error) + continue + } + + // Read this replica's index + otherDB := needle_map.NewMemDb() + if otherDB == nil { + fmt.Fprintf(rub.writer, " skipping %s: failed to allocate DB\n", loc.Url) + continue + } + + if err := rub.readIndexDatabase(otherDB, loc.ServerAddress()); err != nil { + otherDB.Close() + fmt.Fprintf(rub.writer, " skipping %s: %v\n", loc.Url, err) + continue + } + + // Find entries in other that are missing from best + var missingNeedles []needle_map.NeedleValue + + otherDB.AscendingVisit(func(nv needle_map.NeedleValue) error { + if nv.Size.IsDeleted() { + return nil + } + if _, found := bestDB.Get(nv.Key); !found { + // Check if this entry was written too recently (after sync started) + // Skip entries written after sync started to avoid copying in-flight writes + if needleMeta, err := readNeedleMeta(rub.grpcDialOption, loc.ServerAddress(), uint32(rub.vid), nv); err == nil { + if needleMeta.AppendAtNs > cutoffFromAtNs { + return nil // Skip entries written after sync started + } + } + missingNeedles = append(missingNeedles, nv) + } + return nil + }) + otherDB.Close() + + if len(missingNeedles) == 0 { + continue + } + + // Copy missing entries from this replica to best replica + syncedFromThis := 0 + for _, nv := range missingNeedles { + needleBlob, err := rub.readNeedleBlob(loc.ServerAddress(), nv) + if err != nil { + fmt.Fprintf(rub.writer, " warning: read needle %d from %s: %v\n", nv.Key, loc.Url, err) + continue + } + + if err := rub.writeNeedleBlob(bestLocation.ServerAddress(), nv, needleBlob); err != nil { + fmt.Fprintf(rub.writer, " warning: write needle %d to %s: %v\n", nv.Key, bestLocation.Url, err) + continue + } + + // Also add to bestDB so we don't copy duplicates from other replicas + bestDB.Set(nv.Key, nv.Offset, nv.Size) + syncedFromThis++ + } + + if syncedFromThis > 0 { + fmt.Fprintf(rub.writer, " copied %d entries from %s to %s\n", + syncedFromThis, loc.Url, bestLocation.Url) + totalSynced += syncedFromThis + } + } + + return bestLocation, totalSynced, nil +} + +func (rub *replicaUnionBuilder) readIndexDatabase(db *needle_map.MemDb, server pb.ServerAddress) error { + var buf bytes.Buffer + + err := operation.WithVolumeServerClient(true, server, rub.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + VolumeId: uint32(rub.vid), + Ext: ".idx", + CompactionRevision: math.MaxUint32, + StopOffset: math.MaxInt64, + Collection: rub.collection, + IsEcVolume: false, + IgnoreSourceFileNotFound: false, + }) + if err != nil { + return fmt.Errorf("start copy: %w", err) + } + + for { + resp, recvErr := copyFileClient.Recv() + if recvErr == io.EOF { + break + } + if recvErr != nil { + return fmt.Errorf("receive: %w", recvErr) + } + buf.Write(resp.FileContent) + } + return nil + }) + if err != nil { + return err + } + + return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) +} + +func (rub *replicaUnionBuilder) readNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue) ([]byte, error) { + var needleBlob []byte + err := operation.WithVolumeServerClient(false, server, rub.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ + VolumeId: uint32(rub.vid), + Offset: nv.Offset.ToActualOffset(), + Size: int32(nv.Size), + }) + if 
err != nil { + return err + } + needleBlob = resp.NeedleBlob + return nil + }) + return needleBlob, err +} + +func (rub *replicaUnionBuilder) writeNeedleBlob(server pb.ServerAddress, nv needle_map.NeedleValue, needleBlob []byte) error { + return operation.WithVolumeServerClient(false, server, rub.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ + VolumeId: uint32(rub.vid), + NeedleId: uint64(nv.Key), + Size: int32(nv.Size), + NeedleBlob: needleBlob, + }) + return err + }) +} + +// syncAndSelectBestReplica finds the largest replica, copies missing entries from other replicas +// into it to create a union, then returns this union replica for the operation. +// If excludeFromSelection is non-empty, that server won't be selected but will still contribute entries. +// +// The process: +// 1. Find the replica with the highest file count (the "best" one), excluding excludeFromSelection +// 2. For each other replica, find entries missing from best and copy them to best +// 3. Return the best replica which now contains the union of all entries +func syncAndSelectBestReplica(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, locations []wdclient.Location, excludeFromSelection string, writer io.Writer) (wdclient.Location, error) { + if len(locations) == 0 { + return wdclient.Location{}, fmt.Errorf("no replicas available for volume %d", vid) + } + + // Filter for checking consistency (exclude the excluded server) + var checkLocations []wdclient.Location + for _, loc := range locations { + if loc.Url != excludeFromSelection { + checkLocations = append(checkLocations, loc) + } + } + + if len(checkLocations) == 0 { + return wdclient.Location{}, fmt.Errorf("no replicas available for volume %d after exclusion", vid) + } + + if len(checkLocations) == 1 && len(locations) == 1 { + return checkLocations[0], nil + } + + // Check if replicas are already consistent (skip sync if so) + statuses := getVolumeReplicaStatuses(grpcDialOption, vid, locations) + var validStatuses []VolumeReplicaStatus + for i, s := range statuses { + if s.Error == nil { + // Include all for consistency check + validStatuses = append(validStatuses, s) + _ = i + } + } + + if len(validStatuses) > 1 { + allSame := true + for _, s := range validStatuses[1:] { + if s.FileCount != validStatuses[0].FileCount { + allSame = false + break + } + } + if allSame { + // All replicas are consistent, return the best non-excluded one + for _, s := range validStatuses { + if s.Location.Url != excludeFromSelection { + fmt.Fprintf(writer, "volume %d: all %d replicas are consistent (file count: %d)\n", + vid, len(validStatuses), s.FileCount) + return s.Location, nil + } + } + } + } + + // Replicas are inconsistent, build union on the best replica + fmt.Fprintf(writer, "volume %d: replicas are inconsistent, building union...\n", vid) + + builder := &replicaUnionBuilder{ + grpcDialOption: grpcDialOption, + writer: writer, + vid: vid, + collection: collection, + } + + unionLocation, totalSynced, err := builder.buildUnionReplica(locations, excludeFromSelection) + if err != nil { + return wdclient.Location{}, fmt.Errorf("failed to build union replica: %w", err) + } + + if totalSynced > 0 { + fmt.Fprintf(writer, "volume %d: added %d entries to union replica %s\n", vid, totalSynced, unionLocation.Url) + } + + return unionLocation, nil +} diff --git a/weed/shell/command_volume_replica_check_test.go 
b/weed/shell/command_volume_replica_check_test.go new file mode 100644 index 000000000..7629ea862 --- /dev/null +++ b/weed/shell/command_volume_replica_check_test.go @@ -0,0 +1,273 @@ +package shell + +import ( + "bytes" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" + "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +func TestBuildUnionFromMultipleIndexDatabases(t *testing.T) { + // Test that we can correctly identify missing entries between replicas + + // Create mock index databases representing different replicas + replicaA := needle_map.NewMemDb() + replicaB := needle_map.NewMemDb() + replicaC := needle_map.NewMemDb() + defer replicaA.Close() + defer replicaB.Close() + defer replicaC.Close() + + // Replica A has entries 1, 2, 3, 4, 5 + replicaA.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(3), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(4), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(5), types.Offset{}, types.Size(100)) + + // Replica B has entries 1, 2, 3, 6, 7 (missing 4, 5 from A, has unique 6, 7) + replicaB.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(3), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(6), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(7), types.Offset{}, types.Size(100)) + + // Replica C has entries 1, 2, 8 (minimal overlap, has unique 8) + replicaC.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + replicaC.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + replicaC.Set(types.NeedleId(8), types.Offset{}, types.Size(100)) + + // Test: Find entries in B that are missing from A + var missingFromA []types.NeedleId + replicaB.AscendingVisit(func(nv needle_map.NeedleValue) error { + if _, found := replicaA.Get(nv.Key); !found { + missingFromA = append(missingFromA, nv.Key) + } + return nil + }) + + if len(missingFromA) != 2 { + t.Errorf("Expected 2 entries missing from A (6, 7), got %d: %v", len(missingFromA), missingFromA) + } + + // Test: Find entries in C that are missing from A + var missingFromAinC []types.NeedleId + replicaC.AscendingVisit(func(nv needle_map.NeedleValue) error { + if _, found := replicaA.Get(nv.Key); !found { + missingFromAinC = append(missingFromAinC, nv.Key) + } + return nil + }) + + if len(missingFromAinC) != 1 { + t.Errorf("Expected 1 entry missing from A in C (8), got %d: %v", len(missingFromAinC), missingFromAinC) + } + + // Simulate building union: add missing entries to A + for _, id := range missingFromA { + replicaA.Set(id, types.Offset{}, types.Size(100)) + } + for _, id := range missingFromAinC { + replicaA.Set(id, types.Offset{}, types.Size(100)) + } + + // Verify A now has all 8 unique entries + count := 0 + replicaA.AscendingVisit(func(nv needle_map.NeedleValue) error { + count++ + return nil + }) + + if count != 8 { + t.Errorf("Expected union to have 8 entries, got %d", count) + } +} + +func TestFindLargestReplica(t *testing.T) { + // Test that we correctly identify the replica with the most entries + + type replicaInfo struct { + url string + fileCount uint64 + } + + testCases := []struct { + name string + replicas []replicaInfo + expected string + }{ + { + name: "single replica", + replicas: []replicaInfo{ + {"server1:8080", 100}, + }, + expected: "server1:8080", + }, + { + name: "first is largest", + 
replicas: []replicaInfo{ + {"server1:8080", 100}, + {"server2:8080", 50}, + {"server3:8080", 75}, + }, + expected: "server1:8080", + }, + { + name: "last is largest", + replicas: []replicaInfo{ + {"server1:8080", 50}, + {"server2:8080", 75}, + {"server3:8080", 100}, + }, + expected: "server3:8080", + }, + { + name: "middle is largest", + replicas: []replicaInfo{ + {"server1:8080", 50}, + {"server2:8080", 100}, + {"server3:8080", 75}, + }, + expected: "server2:8080", + }, + { + name: "all equal - pick first", + replicas: []replicaInfo{ + {"server1:8080", 100}, + {"server2:8080", 100}, + {"server3:8080", 100}, + }, + expected: "server1:8080", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Find the largest + bestIdx := 0 + var bestCount uint64 = 0 + for i, r := range tc.replicas { + if i == 0 || r.fileCount > bestCount { + bestIdx = i + bestCount = r.fileCount + } + } + + if tc.replicas[bestIdx].url != tc.expected { + t.Errorf("Expected %s, got %s", tc.expected, tc.replicas[bestIdx].url) + } + }) + } +} + +func TestDeletedEntriesAreSkipped(t *testing.T) { + // Test that deleted entries are not copied during sync + + replicaA := needle_map.NewMemDb() + replicaB := needle_map.NewMemDb() + defer replicaA.Close() + defer replicaB.Close() + + // Replica A has entries 1, 2, 3 (all valid) + replicaA.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + replicaA.Set(types.NeedleId(3), types.Offset{}, types.Size(100)) + + // Replica B has entry 4 valid, entry 5 deleted + replicaB.Set(types.NeedleId(4), types.Offset{}, types.Size(100)) + replicaB.Set(types.NeedleId(5), types.Offset{}, types.Size(-1)) // Deleted (negative size) + + // Find non-deleted entries in B missing from A + var missingFromA []types.NeedleId + replicaB.AscendingVisit(func(nv needle_map.NeedleValue) error { + if nv.Size.IsDeleted() { + return nil // Skip deleted + } + if _, found := replicaA.Get(nv.Key); !found { + missingFromA = append(missingFromA, nv.Key) + } + return nil + }) + + if len(missingFromA) != 1 { + t.Errorf("Expected 1 non-deleted entry missing (4), got %d: %v", len(missingFromA), missingFromA) + } + + if len(missingFromA) > 0 && missingFromA[0] != types.NeedleId(4) { + t.Errorf("Expected missing entry to be 4, got %d", missingFromA[0]) + } +} + +func TestReplicaUnionBuilder_EmptyLocations(t *testing.T) { + // Test handling of empty locations slice + builder := &replicaUnionBuilder{ + writer: &bytes.Buffer{}, + vid: 1, + } + + _, count, err := builder.buildUnionReplica(nil, "") + if err == nil { + t.Error("Expected error for empty locations") + } + if count != 0 { + t.Errorf("Expected 0 synced, got %d", count) + } +} + +func TestAvoidDuplicateCopies(t *testing.T) { + // Test that when building union, we don't copy the same entry multiple times + // by updating the best replica's in-memory index after each copy + + bestDB := needle_map.NewMemDb() + defer bestDB.Close() + + // Best replica has entries 1, 2 + bestDB.Set(types.NeedleId(1), types.Offset{}, types.Size(100)) + bestDB.Set(types.NeedleId(2), types.Offset{}, types.Size(100)) + + // Simulate two other replicas both having entry 3 + otherReplicas := [][]types.NeedleId{ + {3, 4}, // Replica B has 3, 4 + {3, 5}, // Replica C has 3, 5 + } + + copiedEntries := make(map[types.NeedleId]int) // Track how many times each entry is "copied" + + for _, otherEntries := range otherReplicas { + for _, id := range otherEntries { + if _, found := bestDB.Get(id); !found 
{ + // Would copy this entry + copiedEntries[id]++ + // Add to bestDB to prevent duplicate copies + bestDB.Set(id, types.Offset{}, types.Size(100)) + } + } + } + + // Entry 3 should only be copied once (from first replica that has it) + if copiedEntries[types.NeedleId(3)] != 1 { + t.Errorf("Entry 3 should be copied exactly once, got %d", copiedEntries[types.NeedleId(3)]) + } + + // Entry 4 should be copied once + if copiedEntries[types.NeedleId(4)] != 1 { + t.Errorf("Entry 4 should be copied exactly once, got %d", copiedEntries[types.NeedleId(4)]) + } + + // Entry 5 should be copied once + if copiedEntries[types.NeedleId(5)] != 1 { + t.Errorf("Entry 5 should be copied exactly once, got %d", copiedEntries[types.NeedleId(5)]) + } + + // Best should now have 5 entries total + count := 0 + bestDB.AscendingVisit(func(nv needle_map.NeedleValue) error { + count++ + return nil + }) + if count != 5 { + t.Errorf("Expected 5 entries in union, got %d", count) + } +} + diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index 3f8d2fc2e..cfa2eaef3 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -102,6 +102,9 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer } fmt.Printf("tier move volumes: %v\n", volumeIds) + // Collect volume ID to collection name mapping for the sync operation + volumeIdToCollection := collectVolumeIdToCollection(topologyInfo, volumeIds) + _, allLocations := collectVolumeReplicaLocations(topologyInfo) allLocations = filterLocationsByDiskType(allLocations, toDiskType) keepDataNodesSorted(allLocations, toDiskType) @@ -143,7 +146,8 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer } for _, vid := range volumeIds { - if err = c.doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations); err != nil { + collection := volumeIdToCollection[vid] + if err = c.doVolumeTierMove(commandEnv, writer, vid, collection, toDiskType, allLocations); err != nil { fmt.Printf("tier move volume %d: %v\n", vid, err) } allLocations = rotateDataNodes(allLocations) @@ -192,7 +196,7 @@ func isOneOf(server string, locations []wdclient.Location) bool { return false } -func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location) (err error) { +func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, collection string, toDiskType types.DiskType, allLocations []location) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid)) if !found { @@ -208,12 +212,18 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer if isOneOf(dst.dataNode.Id, locations) { continue } - var sourceVolumeServer pb.ServerAddress - for _, loc := range locations { - if loc.Url != dst.dataNode.Id { - sourceVolumeServer = loc.ServerAddress() - } + + // Sync replicas and select the best one (with highest file count) for multi-replica volumes + // This addresses data inconsistency risk in multi-replica volumes (issue #7797) + // by syncing missing entries between replicas before moving + sourceLoc, selectErr := syncAndSelectBestReplica( + commandEnv.option.GrpcDialOption, vid, collection, locations, dst.dataNode.Id, writer) + if selectErr != nil { + fmt.Fprintf(writer, "failed to sync and select source replica for volume %d: %v\n", vid, 
selectErr) + continue } + sourceVolumeServer := sourceLoc.ServerAddress() + if sourceVolumeServer == "" { continue }
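For reference, the two call sites touched by this patch (doEcEncode in command_ec_encode.go and doVolumeTierMove in command_volume_tier_move.go) now share the same shape: resolve each volume's collection from the topology, then sync replicas and operate only on the union replica. The sketch below is illustrative rather than code from the patch; the wrapper name is hypothetical, the surrounding plumbing is assumed to be in scope as it is in those commands, and excludeFromSelection is "" for ec.encode but the destination node id for tier move.

func syncAllBeforeOperatingSketch(commandEnv *CommandEnv, writer io.Writer, topologyInfo *master_pb.TopologyInfo, volumeIds []needle.VolumeId) error {
	volumeIdToCollection := collectVolumeIdToCollection(topologyInfo, volumeIds)
	for _, vid := range volumeIds {
		locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
		if !found {
			return fmt.Errorf("volume %d has no known locations", vid)
		}
		// Pass the destination server id as excludeFromSelection when the target must
		// not double as the source (tier move); ec.encode passes "" because any
		// replica may host the generated shards.
		best, err := syncAndSelectBestReplica(
			commandEnv.option.GrpcDialOption, vid, volumeIdToCollection[vid], locations, "", writer)
		if err != nil {
			return fmt.Errorf("sync and select replica for volume %d: %w", vid, err)
		}
		// best now holds the union of every replica's live needles, so encoding or
		// moving it cannot silently drop entries that existed on only one replica.
		fmt.Fprintf(writer, "volume %d: operating on %s\n", vid, best.Url)
	}
	return nil
}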