diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 3e297dd13..ac4181a00 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -363,6 +363,23 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric return nil, "" } + // Parse replica placement once for use in both scoring and validation. + var volumeRP *super_block.ReplicaPlacement + if selectedVolume.ExpectedReplicas > 0 && selectedVolume.ExpectedReplicas <= 255 { + if parsed, rpErr := super_block.NewReplicaPlacementFromByte(byte(selectedVolume.ExpectedReplicas)); rpErr == nil && parsed.HasReplication() { + volumeRP = parsed + } + } + + var replicas []types.ReplicaLocation + if clusterInfo.VolumeReplicaMap != nil { + replicas = clusterInfo.VolumeReplicaMap[selectedVolume.VolumeID] + if volumeRP != nil && len(replicas) == 0 { + glog.V(1).Infof("BALANCE [%s]: No replica locations found for volume %d, skipping placement validation", + diskType, selectedVolume.VolumeID) + } + } + // Resolve the target server chosen by the detection loop's effective counts. // This keeps destination selection in sync with the greedy algorithm rather // than relying on topology LoadCount which can diverge across iterations. @@ -371,62 +388,43 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric // Fall back to score-based planning if the preferred target can't be resolved glog.V(1).Infof("BALANCE [%s]: Cannot resolve target %s for volume %d, falling back to score-based planning: %v", diskType, targetServer, selectedVolume.VolumeID, err) - destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) + destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume, volumeRP, replicas, allowedServers) if err != nil { glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) return nil, "" } } - // Verify the destination is within the filtered scope. When DC/rack/node - // filters are active, allowedServers contains only the servers that passed - // filtering. The fallback planner queries the full topology, so this check - // prevents out-of-scope targets from leaking through. - if len(allowedServers) > 0 { - if _, ok := allowedServers[destinationPlan.TargetNode]; !ok { - glog.V(1).Infof("BALANCE [%s]: Planned destination %s for volume %d is outside filtered scope, skipping", + // Verify the resolved destination. If it falls outside the filtered scope + // or violates replica placement, fall back to the score-based planner and + // pick the best candidate that is actually valid. + if !isValidBalanceDestination(destinationPlan, allowedServers, volumeRP, replicas, selectedVolume.Server) { + switch { + case destinationPlan == nil: + glog.V(1).Infof("BALANCE [%s]: Planned destination for volume %d is nil, falling back", + diskType, selectedVolume.VolumeID) + case !isAllowedBalanceTarget(destinationPlan.TargetNode, allowedServers): + glog.V(1).Infof("BALANCE [%s]: Planned destination %s for volume %d is outside filtered scope, falling back", diskType, destinationPlan.TargetNode, selectedVolume.VolumeID) - return nil, "" + case volumeRP != nil && len(replicas) > 0: + glog.V(1).Infof("BALANCE [%s]: Destination %s violates replica placement for volume %d (rp=%03d), falling back", + diskType, destinationPlan.TargetNode, selectedVolume.VolumeID, selectedVolume.ExpectedReplicas) } - } - // Validate move against replica placement policy - if selectedVolume.ExpectedReplicas > 0 && selectedVolume.ExpectedReplicas <= 255 && clusterInfo.VolumeReplicaMap != nil { - rpBytes, rpErr := super_block.NewReplicaPlacementFromByte(byte(selectedVolume.ExpectedReplicas)) - if rpErr == nil && rpBytes.HasReplication() { - replicas := clusterInfo.VolumeReplicaMap[selectedVolume.VolumeID] - if len(replicas) == 0 { - glog.V(1).Infof("BALANCE [%s]: No replica locations found for volume %d, skipping placement validation", + destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume, volumeRP, replicas, allowedServers) + if err != nil { + glog.Warningf("BALANCE [%s]: Failed to plan fallback destination for volume %d: %v", diskType, selectedVolume.VolumeID, err) + return nil, "" + } + if !isValidBalanceDestination(destinationPlan, allowedServers, volumeRP, replicas, selectedVolume.Server) { + if destinationPlan == nil { + glog.V(1).Infof("BALANCE [%s]: Fallback destination for volume %d is nil", diskType, selectedVolume.VolumeID) } else { - validateMove := func(plan *topology.DestinationPlan) bool { - if plan == nil { - return false - } - target := types.ReplicaLocation{ - DataCenter: plan.TargetDC, - Rack: plan.TargetRack, - NodeID: plan.TargetNode, - } - return IsGoodMove(rpBytes, replicas, selectedVolume.Server, target) - } - - if !validateMove(destinationPlan) { - glog.V(1).Infof("BALANCE [%s]: Destination %s violates replica placement for volume %d (rp=%03d), falling back", - diskType, destinationPlan.TargetNode, selectedVolume.VolumeID, selectedVolume.ExpectedReplicas) - // Fall back to score-based planning - destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) - if err != nil { - glog.Warningf("BALANCE [%s]: Failed to plan fallback destination for volume %d: %v", diskType, selectedVolume.VolumeID, err) - return nil, "" - } - if !validateMove(destinationPlan) { - glog.V(1).Infof("BALANCE [%s]: Fallback destination %s also violates replica placement for volume %d", - diskType, destinationPlan.TargetNode, selectedVolume.VolumeID) - return nil, "" - } - } + glog.V(1).Infof("BALANCE [%s]: Fallback destination %s is not valid for volume %d", + diskType, destinationPlan.TargetNode, selectedVolume.VolumeID) } + return nil, "" } } @@ -555,7 +553,10 @@ func resolveBalanceDestination(activeTopology *topology.ActiveTopology, selected // planBalanceDestination plans the destination for a balance operation using // score-based selection. Used as a fallback when the preferred target cannot // be resolved, and for single-move scenarios outside the detection loop. -func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics) (*topology.DestinationPlan, error) { +// rp may be nil when the volume has no replication constraint. When replica +// locations are known, candidates that would violate placement are filtered +// out before scoring. +func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics, rp *super_block.ReplicaPlacement, replicas []types.ReplicaLocation, allowedServers map[string]int) (*topology.DestinationPlan, error) { // Get source node information from topology var sourceRack, sourceDC string @@ -604,8 +605,21 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol if disk.DiskType != selectedVolume.DiskType { continue } + if !isAllowedBalanceTarget(disk.NodeID, allowedServers) { + continue + } + if rp != nil && len(replicas) > 0 { + target := types.ReplicaLocation{ + DataCenter: disk.DataCenter, + Rack: disk.Rack, + NodeID: disk.NodeID, + } + if !IsGoodMove(rp, replicas, selectedVolume.Server, target) { + continue + } + } - score := calculateBalanceScore(disk, sourceRack, sourceDC, selectedVolume.Size) + score := calculateBalanceScore(disk, sourceRack, sourceDC, selectedVolume.Size, rp) if score > bestScore { bestScore = score bestDisk = disk @@ -636,7 +650,9 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol // calculateBalanceScore calculates placement score for balance operations. // LoadCount reflects pending+assigned tasks on the disk, so we factor it into // the utilization estimate to avoid stacking multiple moves onto the same target. -func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64) float64 { +// rp may be nil when the volume has no replication constraint; in that case the +// scorer defaults to preferring cross-rack/DC distribution. +func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64, rp *super_block.ReplicaPlacement) float64 { if disk.DiskInfo == nil { return 0.0 } @@ -652,17 +668,62 @@ func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, score += (1.0 - utilization) * 50.0 // Up to 50 points for low utilization } - // Prefer different racks for better distribution - if disk.Rack != sourceRack { - score += 30.0 + // Rack scoring: respect the replication policy. + // If replicas must stay on the same rack (SameRackCount > 0 with no + // cross-rack requirement), prefer same-rack destinations. Otherwise + // prefer different racks for better distribution. + sameRack := disk.Rack == sourceRack + if rp != nil && rp.DiffRackCount == 0 && rp.SameRackCount > 0 { + if sameRack { + score += 30.0 + } + } else { + if !sameRack { + score += 30.0 + } } - // Prefer different data centers for better distribution - if disk.DataCenter != sourceDC { - score += 20.0 + // DC scoring: same idea. If the policy requires all copies in one DC, + // prefer same-DC destinations. Otherwise prefer different DCs. + sameDC := disk.DataCenter == sourceDC + if rp != nil && rp.DiffDataCenterCount == 0 && (rp.SameRackCount > 0 || rp.DiffRackCount > 0) { + if sameDC { + score += 20.0 + } + } else { + if !sameDC { + score += 20.0 + } } return score } +func isAllowedBalanceTarget(nodeID string, allowedServers map[string]int) bool { + if len(allowedServers) == 0 { + return true + } + _, ok := allowedServers[nodeID] + return ok +} + +func isValidBalanceDestination(plan *topology.DestinationPlan, allowedServers map[string]int, rp *super_block.ReplicaPlacement, replicas []types.ReplicaLocation, sourceNodeID string) bool { + if plan == nil { + return false + } + if !isAllowedBalanceTarget(plan.TargetNode, allowedServers) { + return false + } + if rp == nil || len(replicas) == 0 { + return true + } + + target := types.ReplicaLocation{ + DataCenter: plan.TargetDC, + Rack: plan.TargetRack, + NodeID: plan.TargetNode, + } + return IsGoodMove(rp, replicas, sourceNodeID, target) +} + // parseCSVSet splits a comma-separated string into a set of trimmed, non-empty values. diff --git a/weed/worker/tasks/balance/replica_placement_test.go b/weed/worker/tasks/balance/replica_placement_test.go index 438cb303b..303fb192b 100644 --- a/weed/worker/tasks/balance/replica_placement_test.go +++ b/weed/worker/tasks/balance/replica_placement_test.go @@ -3,6 +3,8 @@ package balance import ( "testing" + "github.com/seaweedfs/seaweedfs/weed/admin/topology" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -125,3 +127,112 @@ func TestIsGoodMove_NilReplicaPlacement(t *testing.T) { t.Error("nil replica placement should allow any move") } } + +func TestCalculateBalanceScore_ReplicationAware(t *testing.T) { + disk := func(dc, rack string) *topology.DiskInfo { + return &topology.DiskInfo{ + DataCenter: dc, + Rack: rack, + DiskInfo: &master_pb.DiskInfo{MaxVolumeCount: 100, VolumeCount: 50}, + } + } + + // 001: same-rack replication — should prefer same rack and same DC + rp001 := rp(t, "001") + sameRack := calculateBalanceScore(disk("dc1", "r1"), "r1", "dc1", 0, rp001) + diffRack := calculateBalanceScore(disk("dc1", "r2"), "r1", "dc1", 0, rp001) + diffDC := calculateBalanceScore(disk("dc2", "r2"), "r1", "dc1", 0, rp001) + if sameRack <= diffRack { + t.Errorf("001: same-rack score (%v) should exceed different-rack score (%v)", sameRack, diffRack) + } + if sameRack <= diffDC { + t.Errorf("001: same-rack score (%v) should exceed different-DC score (%v)", sameRack, diffDC) + } + + // 010: different-rack replication — should prefer different rack, same DC + rp010 := rp(t, "010") + sameRack = calculateBalanceScore(disk("dc1", "r1"), "r1", "dc1", 0, rp010) + diffRack = calculateBalanceScore(disk("dc1", "r2"), "r1", "dc1", 0, rp010) + if diffRack <= sameRack { + t.Errorf("010: different-rack score (%v) should exceed same-rack score (%v)", diffRack, sameRack) + } + + // 100: different-DC replication — should prefer different DC + rp100 := rp(t, "100") + sameDC := calculateBalanceScore(disk("dc1", "r2"), "r1", "dc1", 0, rp100) + diffDCScore := calculateBalanceScore(disk("dc2", "r2"), "r1", "dc1", 0, rp100) + if diffDCScore <= sameDC { + t.Errorf("100: different-DC score (%v) should exceed same-DC score (%v)", diffDCScore, sameDC) + } + + // nil rp: should prefer cross-rack/DC (default behavior) + sameRack = calculateBalanceScore(disk("dc1", "r1"), "r1", "dc1", 0, nil) + diffRack = calculateBalanceScore(disk("dc1", "r2"), "r1", "dc1", 0, nil) + if diffRack <= sameRack { + t.Errorf("nil rp: different-rack score (%v) should exceed same-rack score (%v)", diffRack, sameRack) + } +} + +func TestPlanBalanceDestination_ChoosesBestValidCompositeDestination(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack3"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + volumes := makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 1, withReplicas(11)) + replicas := []types.ReplicaLocation{ + loc("dc1", "rack1", "node-a"), + loc("dc1", "rack1", "node-d"), + loc("dc1", "rack2", "node-e"), + } + + plan, err := planBalanceDestination(buildTopology(servers, volumes), volumes[0], rp(t, "011"), replicas, map[string]int{ + "node-b": 0, + "node-c": 0, + }) + if err != nil { + t.Fatalf("planBalanceDestination failed: %v", err) + } + if plan.TargetNode != "node-c" { + t.Fatalf("expected valid same-rack destination node-c, got %s", plan.TargetNode) + } + if plan.TargetRack != "rack1" { + t.Fatalf("expected rack1 destination, got %s", plan.TargetRack) + } +} + +func TestCreateBalanceTask_FallbackSelectsValidCompositeDestination(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack3"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + volumes := makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 1, withReplicas(11)) + clusterInfo := &types.ClusterInfo{ + ActiveTopology: buildTopology(servers, volumes), + VolumeReplicaMap: map[uint32][]types.ReplicaLocation{ + 1: { + loc("dc1", "rack1", "node-a"), + loc("dc1", "rack1", "node-d"), + loc("dc1", "rack2", "node-e"), + }, + }, + } + + task, destination := createBalanceTask("hdd", volumes[0], clusterInfo, "node-b", map[string]int{ + "node-b": 0, + "node-c": 0, + }) + if task == nil { + t.Fatal("expected a balance task") + } + if destination != "node-c" { + t.Fatalf("expected fallback destination node-c, got %s", destination) + } + if len(task.TypedParams.Targets) != 1 { + t.Fatalf("expected 1 target, got %d", len(task.TypedParams.Targets)) + } + if got := task.TypedParams.Targets[0].Node; got != "node-c:8080" { + t.Fatalf("expected target node-c:8080, got %s", got) + } +}