diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index d5f32c3f4..a8a028133 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -601,7 +601,8 @@ func (h *ErasureCodingHandler) collectVolumeMetrics( masterAddresses []string, collectionFilter string, ) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { - return collectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption) + metrics, activeTopology, _, err := collectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption) + return metrics, activeTopology, err } func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *erasureCodingWorkerConfig { diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index 1ad58b30d..7855e6b72 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -505,7 +505,8 @@ func (h *VacuumHandler) collectVolumeMetrics( masterAddresses []string, collectionFilter string, ) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { - return collectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption) + metrics, activeTopology, _, err := collectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption) + return metrics, activeTopology, err } func deriveVacuumConfig(values map[string]*plugin_pb.ConfigValue) *vacuumtask.Config { diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 1f880bd50..5d49cc784 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -314,7 +314,7 @@ func (h *VolumeBalanceHandler) Detect( masters = append(masters, request.ClusterContext.MasterGrpcAddresses...) } - metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter) + metrics, activeTopology, replicaMap, err := h.collectVolumeMetrics(ctx, masters, collectionFilter) if err != nil { return err } @@ -334,7 +334,10 @@ func (h *VolumeBalanceHandler) Detect( workerConfig.TaskConfig.RackFilter = rackFilter workerConfig.TaskConfig.NodeFilter = nodeFilter - clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} + clusterInfo := &workertypes.ClusterInfo{ + ActiveTopology: activeTopology, + VolumeReplicaMap: replicaMap, + } maxResults := int(request.MaxResults) var results []*workertypes.TaskDetectionResult @@ -1072,10 +1075,8 @@ func (h *VolumeBalanceHandler) collectVolumeMetrics( ctx context.Context, masterAddresses []string, collectionFilter string, -) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { - // Reuse the same master topology fetch/build flow used by the vacuum handler. - helper := &VacuumHandler{grpcDialOption: h.grpcDialOption} - return helper.collectVolumeMetrics(ctx, masterAddresses, collectionFilter) +) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, map[uint32][]workertypes.ReplicaLocation, error) { + return collectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption) } func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volumeBalanceWorkerConfig { diff --git a/weed/plugin/worker/volume_metrics.go b/weed/plugin/worker/volume_metrics.go index 454eff079..7cc184119 100644 --- a/weed/plugin/worker/volume_metrics.go +++ b/weed/plugin/worker/volume_metrics.go @@ -22,12 +22,12 @@ func collectVolumeMetricsFromMasters( masterAddresses []string, collectionFilter string, grpcDialOption grpc.DialOption, -) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { +) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, map[uint32][]workertypes.ReplicaLocation, error) { if grpcDialOption == nil { - return nil, nil, fmt.Errorf("grpc dial option is not configured") + return nil, nil, nil, fmt.Errorf("grpc dial option is not configured") } if len(masterAddresses) == 0 { - return nil, nil, fmt.Errorf("no master addresses provided in cluster context") + return nil, nil, nil, fmt.Errorf("no master addresses provided in cluster context") } for _, masterAddress := range masterAddresses { @@ -37,20 +37,20 @@ func collectVolumeMetricsFromMasters( continue } - metrics, activeTopology, buildErr := buildVolumeMetrics(response, collectionFilter) + metrics, activeTopology, replicaMap, buildErr := buildVolumeMetrics(response, collectionFilter) if buildErr != nil { // Configuration errors (e.g. invalid regex) will fail on every master, // so return immediately instead of masking them with retries. if isConfigError(buildErr) { - return nil, nil, buildErr + return nil, nil, nil, buildErr } glog.Warningf("Plugin worker failed to build metrics from master %s: %v", masterAddress, buildErr) continue } - return metrics, activeTopology, nil + return metrics, activeTopology, replicaMap, nil } - return nil, nil, fmt.Errorf("failed to load topology from all provided masters") + return nil, nil, nil, fmt.Errorf("failed to load topology from all provided masters") } func fetchVolumeList(ctx context.Context, address string, grpcDialOption grpc.DialOption) (*master_pb.VolumeListResponse, error) { @@ -89,14 +89,14 @@ func fetchVolumeList(ctx context.Context, address string, grpcDialOption grpc.Di func buildVolumeMetrics( response *master_pb.VolumeListResponse, collectionFilter string, -) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) { +) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, map[uint32][]workertypes.ReplicaLocation, error) { if response == nil || response.TopologyInfo == nil { - return nil, nil, fmt.Errorf("volume list response has no topology info") + return nil, nil, nil, fmt.Errorf("volume list response has no topology info") } activeTopology := topology.NewActiveTopology(10) if err := activeTopology.UpdateTopology(response.TopologyInfo); err != nil { - return nil, nil, err + return nil, nil, nil, err } var collectionRegex *regexp.Regexp @@ -105,19 +105,28 @@ func buildVolumeMetrics( var err error collectionRegex, err = regexp.Compile(trimmedFilter) if err != nil { - return nil, nil, &configError{err: fmt.Errorf("invalid collection_filter regex %q: %w", trimmedFilter, err)} + return nil, nil, nil, &configError{err: fmt.Errorf("invalid collection_filter regex %q: %w", trimmedFilter, err)} } } volumeSizeLimitBytes := uint64(response.VolumeSizeLimitMb) * 1024 * 1024 now := time.Now() metrics := make([]*workertypes.VolumeHealthMetrics, 0, 256) + replicaMap := make(map[uint32][]workertypes.ReplicaLocation) for _, dc := range response.TopologyInfo.DataCenterInfos { for _, rack := range dc.RackInfos { for _, node := range rack.DataNodeInfos { for diskType, diskInfo := range node.DiskInfos { for _, volume := range diskInfo.VolumeInfos { + // Build replica map from ALL volumes BEFORE collection filtering, + // since replicas may span filtered/unfiltered nodes. + replicaMap[volume.Id] = append(replicaMap[volume.Id], workertypes.ReplicaLocation{ + DataCenter: dc.Id, + Rack: rack.Id, + NodeID: node.Id, + }) + if collectionRegex != nil && !collectionRegex.MatchString(volume.Collection) { continue } @@ -160,7 +169,7 @@ func buildVolumeMetrics( metric.ReplicaCount = replicaCounts[metric.VolumeID] } - return metrics, activeTopology, nil + return metrics, activeTopology, replicaMap, nil } // configError wraps configuration errors that should not be retried across masters. diff --git a/weed/plugin/worker/volume_metrics_test.go b/weed/plugin/worker/volume_metrics_test.go index 6e5e6904f..17710724e 100644 --- a/weed/plugin/worker/volume_metrics_test.go +++ b/weed/plugin/worker/volume_metrics_test.go @@ -39,7 +39,7 @@ func TestBuildVolumeMetricsEmptyFilter(t *testing.T) { &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, ) - metrics, _, err := buildVolumeMetrics(resp, "") + metrics, _, _, err := buildVolumeMetrics(resp, "") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -53,7 +53,7 @@ func TestBuildVolumeMetricsAllCollections(t *testing.T) { &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, ) - metrics, _, err := buildVolumeMetrics(resp, collectionFilterAll) + metrics, _, _, err := buildVolumeMetrics(resp, collectionFilterAll) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -68,7 +68,7 @@ func TestBuildVolumeMetricsEachCollection(t *testing.T) { &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, ) // EACH_COLLECTION passes all volumes through; filtering happens in the handler - metrics, _, err := buildVolumeMetrics(resp, collectionFilterEach) + metrics, _, _, err := buildVolumeMetrics(resp, collectionFilterEach) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -83,7 +83,7 @@ func TestBuildVolumeMetricsRegexFilter(t *testing.T) { &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, &master_pb.VolumeInformationMessage{Id: 3, Collection: "photos-backup", Size: 300}, ) - metrics, _, err := buildVolumeMetrics(resp, "^photos$") + metrics, _, _, err := buildVolumeMetrics(resp, "^photos$") if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -99,7 +99,7 @@ func TestBuildVolumeMetricsInvalidRegex(t *testing.T) { resp := makeTestVolumeListResponse( &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, ) - _, _, err := buildVolumeMetrics(resp, "[invalid") + _, _, _, err := buildVolumeMetrics(resp, "[invalid") if err == nil { t.Fatal("expected error for invalid regex") } diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 6af1dfd77..c81444540 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -10,6 +10,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/topology" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util" "github.com/seaweedfs/seaweedfs/weed/worker/types" @@ -389,6 +390,46 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric } } + // Validate move against replica placement policy + if selectedVolume.ExpectedReplicas > 0 && selectedVolume.ExpectedReplicas <= 255 && clusterInfo.VolumeReplicaMap != nil { + rpBytes, rpErr := super_block.NewReplicaPlacementFromByte(byte(selectedVolume.ExpectedReplicas)) + if rpErr == nil && rpBytes.HasReplication() { + replicas := clusterInfo.VolumeReplicaMap[selectedVolume.VolumeID] + if len(replicas) == 0 { + glog.V(1).Infof("BALANCE [%s]: No replica locations found for volume %d, skipping placement validation", + diskType, selectedVolume.VolumeID) + } else { + validateMove := func(plan *topology.DestinationPlan) bool { + if plan == nil { + return false + } + target := types.ReplicaLocation{ + DataCenter: plan.TargetDC, + Rack: plan.TargetRack, + NodeID: plan.TargetNode, + } + return IsGoodMove(rpBytes, replicas, selectedVolume.Server, target) + } + + if !validateMove(destinationPlan) { + glog.V(1).Infof("BALANCE [%s]: Destination %s violates replica placement for volume %d (rp=%03d), falling back", + diskType, destinationPlan.TargetNode, selectedVolume.VolumeID, selectedVolume.ExpectedReplicas) + // Fall back to score-based planning + destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) + if err != nil { + glog.Warningf("BALANCE [%s]: Failed to plan fallback destination for volume %d: %v", diskType, selectedVolume.VolumeID, err) + return nil, "" + } + if !validateMove(destinationPlan) { + glog.V(1).Infof("BALANCE [%s]: Fallback destination %s also violates replica placement for volume %d", + diskType, destinationPlan.TargetNode, selectedVolume.VolumeID) + return nil, "" + } + } + } + } + } + // Find the actual disk containing the volume on the source server sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) if !found { diff --git a/weed/worker/tasks/balance/replica_placement.go b/weed/worker/tasks/balance/replica_placement.go new file mode 100644 index 000000000..b46745df2 --- /dev/null +++ b/weed/worker/tasks/balance/replica_placement.go @@ -0,0 +1,146 @@ +package balance + +import ( + "slices" + + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// rackKey uniquely identifies a rack within a data center. +type rackKey struct { + DataCenter string + Rack string +} + +// nodeKey uniquely identifies a node within a rack. +type nodeKey struct { + DataCenter string + Rack string + NodeID string +} + +// IsGoodMove checks whether moving a volume from sourceNodeID to target +// would satisfy the volume's replica placement policy, given the current +// set of replica locations. +func IsGoodMove(rp *super_block.ReplicaPlacement, existingReplicas []types.ReplicaLocation, sourceNodeID string, target types.ReplicaLocation) bool { + if rp == nil || !rp.HasReplication() { + return true // no replication constraint + } + + // Build the replica set after the move: remove source, add target + afterMove := make([]types.ReplicaLocation, 0, len(existingReplicas)) + sourceFound := false + for _, r := range existingReplicas { + if r.NodeID == sourceNodeID { + sourceFound = true + } else { + afterMove = append(afterMove, r) + } + } + if !sourceFound { + // Source not in replica list — cluster state may be inconsistent. + // Treat as unsafe to avoid incorrect placement decisions. + return false + } + + return satisfyReplicaPlacement(rp, afterMove, target) +} + +// satisfyReplicaPlacement checks whether placing a replica at target +// is consistent with the replication policy, given the existing replicas. +// Ported from weed/shell/command_volume_fix_replication.go +func satisfyReplicaPlacement(rp *super_block.ReplicaPlacement, replicas []types.ReplicaLocation, target types.ReplicaLocation) bool { + existingDCs, _, existingNodes := countReplicas(replicas) + + targetNK := nodeKey{DataCenter: target.DataCenter, Rack: target.Rack, NodeID: target.NodeID} + if _, found := existingNodes[targetNK]; found { + // avoid duplicated volume on the same data node + return false + } + + primaryDCs, _ := findTopDCKeys(existingDCs) + + // ensure data center count is within limit + if _, found := existingDCs[target.DataCenter]; !found { + // different from existing dcs + if len(existingDCs) < rp.DiffDataCenterCount+1 { + return true + } + return false + } + // now same as one of existing data centers + if !slices.Contains(primaryDCs, target.DataCenter) { + return false + } + + // now on a primary dc - check racks within this DC + primaryDcRacks := make(map[rackKey]int) + for _, r := range replicas { + if r.DataCenter != target.DataCenter { + continue + } + primaryDcRacks[rackKey{DataCenter: r.DataCenter, Rack: r.Rack}]++ + } + + targetRK := rackKey{DataCenter: target.DataCenter, Rack: target.Rack} + primaryRacks, _ := findTopRackKeys(primaryDcRacks) + sameRackCount := primaryDcRacks[targetRK] + + if _, found := primaryDcRacks[targetRK]; !found { + // different from existing racks + if len(primaryDcRacks) < rp.DiffRackCount+1 { + return true + } + return false + } + // same as one of existing racks + if !slices.Contains(primaryRacks, targetRK) { + return false + } + + // on primary rack - check same-rack count + if sameRackCount < rp.SameRackCount+1 { + return true + } + return false +} + +func countReplicas(replicas []types.ReplicaLocation) (dcCounts map[string]int, rackCounts map[rackKey]int, nodeCounts map[nodeKey]int) { + dcCounts = make(map[string]int) + rackCounts = make(map[rackKey]int) + nodeCounts = make(map[nodeKey]int) + for _, r := range replicas { + dcCounts[r.DataCenter]++ + rackCounts[rackKey{DataCenter: r.DataCenter, Rack: r.Rack}]++ + nodeCounts[nodeKey{DataCenter: r.DataCenter, Rack: r.Rack, NodeID: r.NodeID}]++ + } + return +} + +func findTopDCKeys(m map[string]int) (topKeys []string, max int) { + for k, c := range m { + if max < c { + topKeys = topKeys[:0] + topKeys = append(topKeys, k) + max = c + } else if max == c { + topKeys = append(topKeys, k) + } + } + return +} + +func findTopRackKeys(m map[rackKey]int) (topKeys []rackKey, max int) { + for k, c := range m { + if max < c { + topKeys = topKeys[:0] + topKeys = append(topKeys, k) + max = c + } else if max == c { + topKeys = append(topKeys, k) + } + } + return +} + diff --git a/weed/worker/tasks/balance/replica_placement_test.go b/weed/worker/tasks/balance/replica_placement_test.go new file mode 100644 index 000000000..438cb303b --- /dev/null +++ b/weed/worker/tasks/balance/replica_placement_test.go @@ -0,0 +1,127 @@ +package balance + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +func rp(t *testing.T, code string) *super_block.ReplicaPlacement { + t.Helper() + r, err := super_block.NewReplicaPlacementFromString(code) + if err != nil { + t.Fatalf("invalid replica placement code %q: %v", code, err) + } + return r +} + +func loc(dc, rack, node string) types.ReplicaLocation { + return types.ReplicaLocation{DataCenter: dc, Rack: rack, NodeID: node} +} + +func TestIsGoodMove_NoReplication(t *testing.T) { + // 000 = no replication. Any move is fine. + if !IsGoodMove(rp(t, "000"), []types.ReplicaLocation{loc("dc1", "r1", "n1")}, "n1", loc("dc1", "r1", "n2")) { + t.Error("000: any move should be allowed") + } +} + +func TestIsGoodMove_001_SameRack(t *testing.T) { + // 001 = 1 replica on same rack (2 total on same rack) + existing := []types.ReplicaLocation{ + loc("dc1", "r1", "n1"), + loc("dc1", "r1", "n2"), + } + // Move n1 -> n3 on same rack: good + if !IsGoodMove(rp(t, "001"), existing, "n1", loc("dc1", "r1", "n3")) { + t.Error("001: move to same rack should be allowed") + } + // Move n1 -> n3 on different rack: bad (would leave only 1 on r1, need 2) + if IsGoodMove(rp(t, "001"), existing, "n1", loc("dc1", "r2", "n3")) { + t.Error("001: move to different rack should not be allowed when it breaks same-rack count") + } +} + +func TestIsGoodMove_010_DiffRack(t *testing.T) { + // 010 = 1 replica on different rack (2 racks total) + existing := []types.ReplicaLocation{ + loc("dc1", "r1", "n1"), + loc("dc1", "r2", "n2"), + } + // Move n1 -> n3 on r2: bad (both replicas on same rack) + if IsGoodMove(rp(t, "010"), existing, "n1", loc("dc1", "r2", "n3")) { + t.Error("010: move to same rack as other replica should not be allowed") + } + // Move n1 -> n3 on r3: good (still 2 different racks) + if !IsGoodMove(rp(t, "010"), existing, "n1", loc("dc1", "r3", "n3")) { + t.Error("010: move to different rack should be allowed") + } +} + +func TestIsGoodMove_100_DiffDC(t *testing.T) { + // 100 = 1 replica in different DC + existing := []types.ReplicaLocation{ + loc("dc1", "r1", "n1"), + loc("dc2", "r1", "n2"), + } + // Move n1 -> n3 in dc2: bad (both in same DC) + if IsGoodMove(rp(t, "100"), existing, "n1", loc("dc2", "r1", "n3")) { + t.Error("100: move to same DC as other replica should not be allowed") + } + // Move n1 -> n3 in dc3: good (different DCs) + if !IsGoodMove(rp(t, "100"), existing, "n1", loc("dc3", "r1", "n3")) { + t.Error("100: move to different DC should be allowed") + } +} + +func TestIsGoodMove_SameNode(t *testing.T) { + // Moving to the same node as an existing replica should always be rejected + existing := []types.ReplicaLocation{ + loc("dc1", "r1", "n1"), + loc("dc1", "r2", "n2"), + } + if IsGoodMove(rp(t, "010"), existing, "n1", loc("dc1", "r2", "n2")) { + t.Error("should reject move to same node as existing replica") + } +} + +func TestIsGoodMove_011_Composite(t *testing.T) { + // 011 = 1 same-rack + 1 different-rack (3 replicas: 2 on same rack, 1 on different) + existing := []types.ReplicaLocation{ + loc("dc1", "r1", "n1"), + loc("dc1", "r1", "n2"), + loc("dc1", "r2", "n3"), + } + // Move n1 -> n4 on r1: good (maintains 2 on r1, 1 on r2) + if !IsGoodMove(rp(t, "011"), existing, "n1", loc("dc1", "r1", "n4")) { + t.Error("011: move within same rack should be allowed") + } + // Move n3 -> n4 on r1: bad (would have 3 on r1, 0 on different rack) + if IsGoodMove(rp(t, "011"), existing, "n3", loc("dc1", "r1", "n4")) { + t.Error("011: move that eliminates different-rack replica should not be allowed") + } +} + +func TestIsGoodMove_110_Composite(t *testing.T) { + // 110 = 1 different-rack + 1 different-DC (3 replicas across 2 DCs and 2 racks) + existing := []types.ReplicaLocation{ + loc("dc1", "r1", "n1"), + loc("dc1", "r2", "n2"), + loc("dc2", "r1", "n3"), + } + // Move n1 -> n4 in dc1/r3: good (dc1 still has r2+r3, dc2 has r1) + if !IsGoodMove(rp(t, "110"), existing, "n1", loc("dc1", "r3", "n4")) { + t.Error("110: move to new rack in same DC should be allowed") + } + // Move n3 -> n4 in dc1/r1: bad (would lose the different-DC replica) + if IsGoodMove(rp(t, "110"), existing, "n3", loc("dc1", "r1", "n4")) { + t.Error("110: move that eliminates different-DC replica should not be allowed") + } +} + +func TestIsGoodMove_NilReplicaPlacement(t *testing.T) { + if !IsGoodMove(nil, []types.ReplicaLocation{loc("dc1", "r1", "n1")}, "n1", loc("dc1", "r1", "n2")) { + t.Error("nil replica placement should allow any move") + } +} diff --git a/weed/worker/types/data_types.go b/weed/worker/types/data_types.go index 64ba5e11c..dcb69cf7e 100644 --- a/weed/worker/types/data_types.go +++ b/weed/worker/types/data_types.go @@ -6,13 +6,21 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/topology" ) +// ReplicaLocation identifies where a volume replica lives. +type ReplicaLocation struct { + DataCenter string + Rack string + NodeID string +} + // ClusterInfo contains cluster information for task detection type ClusterInfo struct { - Servers []*VolumeServerInfo - TotalVolumes int - TotalServers int - LastUpdated time.Time - ActiveTopology *topology.ActiveTopology // Added for destination planning in detection + Servers []*VolumeServerInfo + TotalVolumes int + TotalServers int + LastUpdated time.Time + ActiveTopology *topology.ActiveTopology // Added for destination planning in detection + VolumeReplicaMap map[uint32][]ReplicaLocation } // VolumeHealthMetrics contains health information about a volume (simplified)