From 47ddf05d95d86117aaab5efbe89e91765b5d788e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 13 Mar 2026 17:03:37 -0700 Subject: [PATCH] feat(plugin): DC/rack/node filtering for volume balance (#8621) * feat(plugin): add DC/rack/node filtering for volume balance detection Add scoping filters so balance detection can be limited to specific data centers, racks, or nodes. Filters are applied both at the metrics level (in the handler) and at the topology seeding level (in detection) to ensure only the targeted infrastructure participates in balancing. * address PR review: use set lookups, deduplicate test helpers, add target checks * address review: assert non-empty tasks in filter tests Prevent vacuous test passes by requiring len(tasks) > 0 before checking source/target exclusions. * address review: enforce filter scope in fallback, clarify DC filter - Thread allowedServers into createBalanceTask so the fallback planner cannot produce out-of-scope targets when DC/rack/node filters are active - Update data_center_filter description to clarify single-DC usage * address review: centralize parseCSVSet, fix filter scope leak, iterate all targets - Extract ParseCSVSet to shared weed/worker/tasks/util package, remove duplicates from detection.go and volume_balance_handler.go - Fix metric accumulation re-introducing filtered-out servers by only counting metrics for servers that passed DC/rack/node filters - Trim DataCenterFilter before matching to handle trailing spaces - Iterate all task.TypedParams.Targets in filter tests, not just [0] * remove useless descriptor string test --- weed/plugin/worker/volume_balance_handler.go | 71 +++++++++++ .../worker/volume_balance_handler_test.go | 39 +++++++ weed/worker/tasks/balance/config.go | 3 + weed/worker/tasks/balance/detection.go | 42 ++++++- weed/worker/tasks/balance/detection_test.go | 110 ++++++++++++++++++ weed/worker/tasks/util/csv.go | 20 ++++ 6 files changed, 283 insertions(+), 2 deletions(-) create mode 100644 
weed/worker/tasks/util/csv.go diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index c23c7fd2b..1f880bd50 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" balancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" + taskutil "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util" workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" "google.golang.org/protobuf/proto" @@ -106,6 +107,30 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { {Value: "FULL", Label: "Full (read-only)"}, }, }, + { + Name: "data_center_filter", + Label: "Data Center Filter", + Description: "Only balance volumes in a single data center. Leave empty for all data centers.", + Placeholder: "all data centers", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "rack_filter", + Label: "Rack Filter", + Description: "Only balance volumes on these racks (comma-separated). Leave empty for all racks.", + Placeholder: "all racks", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "node_filter", + Label: "Node Filter", + Description: "Only balance volumes on these nodes (comma-separated server IDs). 
Leave empty for all nodes.", + Placeholder: "all nodes", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, }, }, }, @@ -116,6 +141,15 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { "volume_state": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "ALL"}, }, + "data_center_filter": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, + }, + "rack_filter": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, + }, + "node_filter": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, + }, }, }, WorkerConfigForm: &plugin_pb.ConfigForm{ @@ -288,6 +322,18 @@ func (h *VolumeBalanceHandler) Detect( volumeState := strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", "ALL"))) metrics = filterMetricsByVolumeState(metrics, volumeState) + dataCenterFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "data_center_filter", "")) + rackFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "rack_filter", "")) + nodeFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "node_filter", "")) + + if dataCenterFilter != "" || rackFilter != "" || nodeFilter != "" { + metrics = filterMetricsByLocation(metrics, dataCenterFilter, rackFilter, nodeFilter) + } + + workerConfig.TaskConfig.DataCenterFilter = dataCenterFilter + workerConfig.TaskConfig.RackFilter = rackFilter + workerConfig.TaskConfig.NodeFilter = nodeFilter + clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} maxResults := int(request.MaxResults) @@ -1081,6 +1127,31 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume } } +func filterMetricsByLocation(metrics []*workertypes.VolumeHealthMetrics, dcFilter, rackFilter, nodeFilter string) []*workertypes.VolumeHealthMetrics { + var rackSet, nodeSet map[string]bool + if rackFilter != 
"" { + rackSet = taskutil.ParseCSVSet(rackFilter) + } + if nodeFilter != "" { + nodeSet = taskutil.ParseCSVSet(nodeFilter) + } + + filtered := make([]*workertypes.VolumeHealthMetrics, 0, len(metrics)) + for _, m := range metrics { + if dcFilter != "" && m.DataCenter != dcFilter { + continue + } + if rackSet != nil && !rackSet[m.Rack] { + continue + } + if nodeSet != nil && !nodeSet[m.Server] { + continue + } + filtered = append(filtered, m) + } + return filtered +} + func buildVolumeBalanceProposal( result *workertypes.TaskDetectionResult, ) (*plugin_pb.JobProposal, error) { diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go index 85196c809..f65fb164e 100644 --- a/weed/plugin/worker/volume_balance_handler_test.go +++ b/weed/plugin/worker/volume_balance_handler_test.go @@ -629,6 +629,45 @@ func TestExecuteSingleMovePathUnchanged(t *testing.T) { } } +func TestFilterMetricsByLocation(t *testing.T) { + metrics := []*workertypes.VolumeHealthMetrics{ + {VolumeID: 1, Server: "node-a", DataCenter: "dc1", Rack: "rack1"}, + {VolumeID: 2, Server: "node-b", DataCenter: "dc1", Rack: "rack2"}, + {VolumeID: 3, Server: "node-c", DataCenter: "dc2", Rack: "rack1"}, + {VolumeID: 4, Server: "node-d", DataCenter: "dc2", Rack: "rack3"}, + } + + // Filter by DC + filtered := filterMetricsByLocation(metrics, "dc1", "", "") + if len(filtered) != 2 { + t.Fatalf("DC filter: expected 2, got %d", len(filtered)) + } + + // Filter by rack + filtered = filterMetricsByLocation(metrics, "", "rack1,rack2", "") + if len(filtered) != 3 { + t.Fatalf("rack filter: expected 3, got %d", len(filtered)) + } + + // Filter by node + filtered = filterMetricsByLocation(metrics, "", "", "node-a,node-c") + if len(filtered) != 2 { + t.Fatalf("node filter: expected 2, got %d", len(filtered)) + } + + // Combined DC + rack + filtered = filterMetricsByLocation(metrics, "dc2", "rack3", "") + if len(filtered) != 1 { + t.Fatalf("DC+rack filter: expected 1, 
got %d", len(filtered)) + } + + // Empty filters pass all + filtered = filterMetricsByLocation(metrics, "", "", "") + if len(filtered) != 4 { + t.Fatalf("no filter: expected 4, got %d", len(filtered)) + } +} + func TestFilterMetricsByVolumeState(t *testing.T) { metrics := []*workertypes.VolumeHealthMetrics{ {VolumeID: 1, FullnessRatio: 0.5}, // active diff --git a/weed/worker/tasks/balance/config.go b/weed/worker/tasks/balance/config.go index 9303b4b2a..e9d2af637 100644 --- a/weed/worker/tasks/balance/config.go +++ b/weed/worker/tasks/balance/config.go @@ -14,6 +14,9 @@ type Config struct { base.BaseConfig ImbalanceThreshold float64 `json:"imbalance_threshold"` MinServerCount int `json:"min_server_count"` + DataCenterFilter string `json:"-"` // per-detection-run, not persisted + RackFilter string `json:"-"` // per-detection-run, not persisted + NodeFilter string `json:"-"` // per-detection-run, not persisted } // NewDefaultConfig creates a new default balance configuration diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index d10033dd3..6af1dfd77 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "sort" + "strings" "time" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -86,9 +87,21 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics if clusterInfo.ActiveTopology != nil { topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo() if topologyInfo != nil { + rackFilterSet := util.ParseCSVSet(balanceConfig.RackFilter) + nodeFilterSet := util.ParseCSVSet(balanceConfig.NodeFilter) + dcFilter := strings.TrimSpace(balanceConfig.DataCenterFilter) for _, dc := range topologyInfo.DataCenterInfos { + if dcFilter != "" && dc.Id != dcFilter { + continue + } for _, rack := range dc.RackInfos { + if rackFilterSet != nil && !rackFilterSet[rack.Id] { + continue + } for _, node := range rack.DataNodeInfos { + if 
nodeFilterSet != nil && !nodeFilterSet[node.Id] { + continue + } for diskTypeName, diskInfo := range node.DiskInfos { if diskTypeName == diskType { serverVolumeCounts[node.Id] = 0 @@ -100,7 +113,15 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics } } } + hasLocationFilter := balanceConfig.DataCenterFilter != "" || balanceConfig.RackFilter != "" || balanceConfig.NodeFilter != "" for _, metric := range diskMetrics { + if hasLocationFilter { + // Only count metrics for servers that passed filtering. + // Without this guard, out-of-scope servers are re-introduced. + if _, allowed := serverVolumeCounts[metric.Server]; !allowed { + continue + } + } serverVolumeCounts[metric.Server]++ } @@ -285,7 +306,7 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics // and the destination selection stay in sync. Without this, the topology's // LoadCount-based scoring can diverge from the adjustment-based effective // counts, causing moves to pile onto one server or oscillate (A→B, B→A). - task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo, minServer) + task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo, minServer, serverVolumeCounts) if task == nil { glog.V(1).Infof("BALANCE [%s]: Cannot plan task for volume %d on server %s, trying next volume", diskType, selectedVolume.VolumeID, maxServer) continue @@ -317,7 +338,10 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics // targetServer is the server ID chosen by the detection loop's greedy algorithm. // Returns (nil, "") if destination planning fails. // On success, returns the task result and the canonical destination server ID. 
-func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, targetServer string) (*types.TaskDetectionResult, string) { +// allowedServers is the set of servers that passed DC/rack/node filtering in +// the detection loop. When non-empty, the fallback destination planner is +// checked against this set so that filter scope cannot leak. +func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, targetServer string, allowedServers map[string]int) (*types.TaskDetectionResult, string) { taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().UnixNano()) task := &types.TaskDetectionResult{ @@ -353,6 +377,18 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric } } + // Verify the destination is within the filtered scope. When DC/rack/node + // filters are active, allowedServers contains only the servers that passed + // filtering. The fallback planner queries the full topology, so this check + // prevents out-of-scope targets from leaking through. + if len(allowedServers) > 0 { + if _, ok := allowedServers[destinationPlan.TargetNode]; !ok { + glog.V(1).Infof("BALANCE [%s]: Planned destination %s for volume %d is outside filtered scope, skipping", + diskType, destinationPlan.TargetNode, selectedVolume.VolumeID) + return nil, "" + } + } + // Find the actual disk containing the volume on the source server sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) if !found { @@ -587,3 +623,5 @@ func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, return score } + +// CSV filter parsing is provided by util.ParseCSVSet (weed/worker/tasks/util/csv.go).
diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index aee5b5420..d55fbd5d4 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -2,6 +2,7 @@ package balance import ( "fmt" + "strings" "testing" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -1000,3 +1001,112 @@ func TestDetection_ZeroVolumeServerIncludedInBalance(t *testing.T) { t.Logf("Distribution 8/2/1/0 → %v after %d moves (imbalance=%.1f%%)", effective, len(tasks), imbalance*100) } + +func TestDetection_DataCenterFilter(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc2", rack: "rack1"}, + } + + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 50)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 10)...) + // node-c is in dc2, should be excluded by filter + metrics = append(metrics, makeVolumes("node-c", "hdd", "dc2", "rack1", "c1", 200, 30)...) + + // Only include metrics from dc1 + dc1Metrics := make([]*types.VolumeHealthMetrics, 0) + for _, m := range metrics { + if m.DataCenter == "dc1" { + dc1Metrics = append(dc1Metrics, m) + } + } + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + conf := defaultConf() + conf.DataCenterFilter = "dc1" + + tasks, _, err := Detection(dc1Metrics, clusterInfo, conf, 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + // Ensure detection produced tasks so the following checks are not vacuous. + if len(tasks) == 0 { + t.Fatal("Expected balance tasks for 50/10 imbalance within dc1, got 0") + } + + // With DC filter, only node-a and node-b are considered in topology seeding. 
+ // node-c should never appear as source or destination. + for _, task := range tasks { + if task.Server == "node-c" { + t.Errorf("node-c (dc2) should not be a source with dc1 filter") + } + if task.TypedParams != nil { + for _, tgt := range task.TypedParams.Targets { + if strings.Contains(tgt.Node, "node-c") { + t.Errorf("node-c (dc2) should not be a target with dc1 filter") + } + } + } + } + + if len(tasks) > 0 { + t.Logf("Created %d tasks within dc1 scope", len(tasks)) + } +} + +func TestDetection_NodeFilter(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 50)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 10)...) + metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 5)...) + + // Only include metrics from node-a and node-b + filteredMetrics := make([]*types.VolumeHealthMetrics, 0) + for _, m := range metrics { + if m.Server == "node-a" || m.Server == "node-b" { + filteredMetrics = append(filteredMetrics, m) + } + } + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + conf := defaultConf() + conf.NodeFilter = "node-a,node-b" + + tasks, _, err := Detection(filteredMetrics, clusterInfo, conf, 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + // Ensure detection produced tasks so the following checks are not vacuous. 
+ if len(tasks) == 0 { + t.Fatal("Expected balance tasks for 50/10 imbalance within node-a,node-b scope, got 0") + } + + for _, task := range tasks { + if task.Server == "node-c" { + t.Errorf("node-c should not be a source with node filter") + } + if task.TypedParams != nil { + for _, tgt := range task.TypedParams.Targets { + if strings.Contains(tgt.Node, "node-c") { + t.Errorf("node-c should not be a target with node filter") + } + } + } + } + + t.Logf("Created %d tasks within node-a,node-b scope", len(tasks)) +} diff --git a/weed/worker/tasks/util/csv.go b/weed/worker/tasks/util/csv.go new file mode 100644 index 000000000..50fb09bff --- /dev/null +++ b/weed/worker/tasks/util/csv.go @@ -0,0 +1,20 @@ +package util + +import "strings" + +// ParseCSVSet splits a comma-separated string into a set of trimmed, +// non-empty values. Returns nil if the input is empty. +func ParseCSVSet(csv string) map[string]bool { + csv = strings.TrimSpace(csv) + if csv == "" { + return nil + } + set := make(map[string]bool) + for _, item := range strings.Split(csv, ",") { + trimmed := strings.TrimSpace(item) + if trimmed != "" { + set[trimmed] = true + } + } + return set +}