diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go
index c23c7fd2b..1f880bd50 100644
--- a/weed/plugin/worker/volume_balance_handler.go
+++ b/weed/plugin/worker/volume_balance_handler.go
@@ -15,6 +15,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 	balancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
+	taskutil "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util"
 	workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
 	"google.golang.org/grpc"
 	"google.golang.org/protobuf/proto"
@@ -106,6 +107,30 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
 					{Value: "FULL", Label: "Full (read-only)"},
 				},
 			},
+			{
+				Name:        "data_center_filter",
+				Label:       "Data Center Filter",
+				Description: "Only balance volumes in a single data center. Leave empty for all data centers.",
+				Placeholder: "all data centers",
+				FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
+				Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
+			},
+			{
+				Name:        "rack_filter",
+				Label:       "Rack Filter",
+				Description: "Only balance volumes on these racks (comma-separated). Leave empty for all racks.",
+				Placeholder: "all racks",
+				FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
+				Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
+			},
+			{
+				Name:        "node_filter",
+				Label:       "Node Filter",
+				Description: "Only balance volumes on these nodes (comma-separated server IDs). Leave empty for all nodes.",
+				Placeholder: "all nodes",
+				FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
+				Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
+			},
 		},
 	},
 },
@@ -116,6 +141,15 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
 			"volume_state": {
 				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "ALL"},
 			},
+			"data_center_filter": {
+				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
+			},
+			"rack_filter": {
+				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
+			},
+			"node_filter": {
+				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
+			},
 		},
 	},
 	WorkerConfigForm: &plugin_pb.ConfigForm{
@@ -288,6 +322,18 @@ func (h *VolumeBalanceHandler) Detect(
 	volumeState := strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", "ALL")))
 	metrics = filterMetricsByVolumeState(metrics, volumeState)
 
+	dataCenterFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "data_center_filter", ""))
+	rackFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "rack_filter", ""))
+	nodeFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "node_filter", ""))
+
+	if dataCenterFilter != "" || rackFilter != "" || nodeFilter != "" {
+		metrics = filterMetricsByLocation(metrics, dataCenterFilter, rackFilter, nodeFilter)
+	}
+
+	workerConfig.TaskConfig.DataCenterFilter = dataCenterFilter
+	workerConfig.TaskConfig.RackFilter = rackFilter
+	workerConfig.TaskConfig.NodeFilter = nodeFilter
+
 	clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
 
 	maxResults := int(request.MaxResults)
@@ -1081,6 +1127,31 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume
 	}
 }
 
+func filterMetricsByLocation(metrics []*workertypes.VolumeHealthMetrics, dcFilter, rackFilter, nodeFilter string) []*workertypes.VolumeHealthMetrics {
+	var rackSet, nodeSet map[string]bool
+	if rackFilter != "" {
+		rackSet = taskutil.ParseCSVSet(rackFilter)
+	}
+	if nodeFilter != "" {
+		nodeSet = taskutil.ParseCSVSet(nodeFilter)
+	}
+
+	filtered := make([]*workertypes.VolumeHealthMetrics, 0, len(metrics))
+	for _, m := range metrics {
+		if dcFilter != "" && m.DataCenter != dcFilter {
+			continue
+		}
+		if rackSet != nil && !rackSet[m.Rack] {
+			continue
+		}
+		if nodeSet != nil && !nodeSet[m.Server] {
+			continue
+		}
+		filtered = append(filtered, m)
+	}
+	return filtered
+}
+
 func buildVolumeBalanceProposal(
 	result *workertypes.TaskDetectionResult,
 ) (*plugin_pb.JobProposal, error) {
diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go
index 85196c809..f65fb164e 100644
--- a/weed/plugin/worker/volume_balance_handler_test.go
+++ b/weed/plugin/worker/volume_balance_handler_test.go
@@ -629,6 +629,45 @@ func TestExecuteSingleMovePathUnchanged(t *testing.T) {
 	}
 }
 
+func TestFilterMetricsByLocation(t *testing.T) {
+	metrics := []*workertypes.VolumeHealthMetrics{
+		{VolumeID: 1, Server: "node-a", DataCenter: "dc1", Rack: "rack1"},
+		{VolumeID: 2, Server: "node-b", DataCenter: "dc1", Rack: "rack2"},
+		{VolumeID: 3, Server: "node-c", DataCenter: "dc2", Rack: "rack1"},
+		{VolumeID: 4, Server: "node-d", DataCenter: "dc2", Rack: "rack3"},
+	}
+
+	// Filter by DC
+	filtered := filterMetricsByLocation(metrics, "dc1", "", "")
+	if len(filtered) != 2 {
+		t.Fatalf("DC filter: expected 2, got %d", len(filtered))
+	}
+
+	// Filter by rack
+	filtered = filterMetricsByLocation(metrics, "", "rack1,rack2", "")
+	if len(filtered) != 3 {
+		t.Fatalf("rack filter: expected 3, got %d", len(filtered))
+	}
+
+	// Filter by node
+	filtered = filterMetricsByLocation(metrics, "", "", "node-a,node-c")
+	if len(filtered) != 2 {
+		t.Fatalf("node filter: expected 2, got %d", len(filtered))
+	}
+
+	// Combined DC + rack
+	filtered = filterMetricsByLocation(metrics, "dc2", "rack3", "")
+	if len(filtered) != 1 {
+		t.Fatalf("DC+rack filter: expected 1, got %d", len(filtered))
+	}
+
+	// Empty filters pass all
+	filtered = filterMetricsByLocation(metrics, "", "", "")
+	if len(filtered) != 4 {
+		t.Fatalf("no filter: expected 4, got %d", len(filtered))
+	}
+}
+
 func TestFilterMetricsByVolumeState(t *testing.T) {
 	metrics := []*workertypes.VolumeHealthMetrics{
 		{VolumeID: 1, FullnessRatio: 0.5}, // active
diff --git a/weed/worker/tasks/balance/config.go b/weed/worker/tasks/balance/config.go
index 9303b4b2a..e9d2af637 100644
--- a/weed/worker/tasks/balance/config.go
+++ b/weed/worker/tasks/balance/config.go
@@ -14,6 +14,9 @@ type Config struct {
 	base.BaseConfig
 	ImbalanceThreshold float64 `json:"imbalance_threshold"`
 	MinServerCount     int     `json:"min_server_count"`
+	DataCenterFilter   string  `json:"-"` // per-detection-run, not persisted
+	RackFilter         string  `json:"-"` // per-detection-run, not persisted
+	NodeFilter         string  `json:"-"` // per-detection-run, not persisted
 }
 
 // NewDefaultConfig creates a new default balance configuration
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index d10033dd3..6af1dfd77 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"math"
 	"sort"
+	"strings"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
@@ -86,9 +87,21 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
 	if clusterInfo.ActiveTopology != nil {
 		topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo()
 		if topologyInfo != nil {
+			rackFilterSet := util.ParseCSVSet(balanceConfig.RackFilter)
+			nodeFilterSet := util.ParseCSVSet(balanceConfig.NodeFilter)
+			dcFilter := strings.TrimSpace(balanceConfig.DataCenterFilter)
 			for _, dc := range topologyInfo.DataCenterInfos {
+				if dcFilter != "" && dc.Id != dcFilter {
+					continue
+				}
 				for _, rack := range dc.RackInfos {
+					if rackFilterSet != nil && !rackFilterSet[rack.Id] {
+						continue
+					}
 					for _, node := range rack.DataNodeInfos {
+						if nodeFilterSet != nil && !nodeFilterSet[node.Id] {
+							continue
+						}
 						for diskTypeName, diskInfo := range node.DiskInfos {
 							if diskTypeName == diskType {
 								serverVolumeCounts[node.Id] = 0
@@ -100,7 +113,15 @@
 			}
 		}
 	}
 
+	hasLocationFilter := balanceConfig.DataCenterFilter != "" || balanceConfig.RackFilter != "" || balanceConfig.NodeFilter != ""
 	for _, metric := range diskMetrics {
+		if hasLocationFilter {
+			// Only count metrics for servers that passed filtering.
+			// Without this guard, out-of-scope servers would be re-introduced into the counts.
+			if _, allowed := serverVolumeCounts[metric.Server]; !allowed {
+				continue
+			}
+		}
 		serverVolumeCounts[metric.Server]++
 	}
@@ -285,7 +306,7 @@
 		// and the destination selection stay in sync. Without this, the topology's
 		// LoadCount-based scoring can diverge from the adjustment-based effective
 		// counts, causing moves to pile onto one server or oscillate (A→B, B→A).
-		task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo, minServer)
+		task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo, minServer, serverVolumeCounts)
 		if task == nil {
 			glog.V(1).Infof("BALANCE [%s]: Cannot plan task for volume %d on server %s, trying next volume", diskType, selectedVolume.VolumeID, maxServer)
 			continue
@@ -317,7 +338,10 @@
 // targetServer is the server ID chosen by the detection loop's greedy algorithm.
 // Returns (nil, "") if destination planning fails.
 // On success, returns the task result and the canonical destination server ID.
-func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, targetServer string) (*types.TaskDetectionResult, string) {
+// allowedServers is the set of servers that passed DC/rack/node filtering in
+// the detection loop. When non-empty, the destination chosen by the fallback
+// planner is checked against this set so that the filter scope cannot leak.
+func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, targetServer string, allowedServers map[string]int) (*types.TaskDetectionResult, string) {
 	taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().UnixNano())
 
 	task := &types.TaskDetectionResult{
@@ -353,6 +377,18 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric
 		}
 	}
 
+	// Verify the destination is within the filtered scope. When DC/rack/node
+	// filters are active, allowedServers contains only the servers that passed
+	// filtering. The fallback planner queries the full topology, so this check
+	// prevents out-of-scope targets from leaking through.
+	if len(allowedServers) > 0 {
+		if _, ok := allowedServers[destinationPlan.TargetNode]; !ok {
+			glog.V(1).Infof("BALANCE [%s]: Planned destination %s for volume %d is outside filtered scope, skipping",
+				diskType, destinationPlan.TargetNode, selectedVolume.VolumeID)
+			return nil, ""
+		}
+	}
+
 	// Find the actual disk containing the volume on the source server
 	sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
 	if !found {
diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go
index aee5b5420..d55fbd5d4 100644
--- a/weed/worker/tasks/balance/detection_test.go
+++ b/weed/worker/tasks/balance/detection_test.go
@@ -2,6 +2,7 @@ package balance
 
 import (
 	"fmt"
+	"strings"
 	"testing"
 
 	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
@@ -1000,3 +1001,112 @@ func TestDetection_ZeroVolumeServerIncludedInBalance(t *testing.T) {
 
 	t.Logf("Distribution 8/2/1/0 → %v after %d moves (imbalance=%.1f%%)", effective, len(tasks), imbalance*100)
 }
+
+func TestDetection_DataCenterFilter(t *testing.T) {
+	servers := []serverSpec{
+		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
+		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
+		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc2", rack: "rack1"},
+	}
+
+	var metrics []*types.VolumeHealthMetrics
+	metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 50)...)
+	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 10)...)
+	// node-c is in dc2, should be excluded by the filter
+	metrics = append(metrics, makeVolumes("node-c", "hdd", "dc2", "rack1", "c1", 200, 30)...)
+
+	// Only include metrics from dc1
+	dc1Metrics := make([]*types.VolumeHealthMetrics, 0)
+	for _, m := range metrics {
+		if m.DataCenter == "dc1" {
+			dc1Metrics = append(dc1Metrics, m)
+		}
+	}
+
+	at := buildTopology(servers, metrics)
+	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
+
+	conf := defaultConf()
+	conf.DataCenterFilter = "dc1"
+
+	tasks, _, err := Detection(dc1Metrics, clusterInfo, conf, 100)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+
+	// Ensure detection produced tasks so the following checks are not vacuous.
+	if len(tasks) == 0 {
+		t.Fatal("Expected balance tasks for 50/10 imbalance within dc1, got 0")
+	}
+
+	// With the DC filter, only node-a and node-b are considered in topology seeding.
+	// node-c should never appear as a source or destination.
+	for _, task := range tasks {
+		if task.Server == "node-c" {
+			t.Errorf("node-c (dc2) should not be a source with dc1 filter")
+		}
+		if task.TypedParams != nil {
+			for _, tgt := range task.TypedParams.Targets {
+				if strings.Contains(tgt.Node, "node-c") {
+					t.Errorf("node-c (dc2) should not be a target with dc1 filter")
+				}
+			}
+		}
+	}
+
+	if len(tasks) > 0 {
+		t.Logf("Created %d tasks within dc1 scope", len(tasks))
+	}
+}
+
+func TestDetection_NodeFilter(t *testing.T) {
+	servers := []serverSpec{
+		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
+		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"},
+		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"},
+	}
+
+	var metrics []*types.VolumeHealthMetrics
+	metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 50)...)
+	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 10)...)
+	metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 5)...)
+
+	// Only include metrics from node-a and node-b
+	filteredMetrics := make([]*types.VolumeHealthMetrics, 0)
+	for _, m := range metrics {
+		if m.Server == "node-a" || m.Server == "node-b" {
+			filteredMetrics = append(filteredMetrics, m)
+		}
+	}
+
+	at := buildTopology(servers, metrics)
+	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
+
+	conf := defaultConf()
+	conf.NodeFilter = "node-a,node-b"
+
+	tasks, _, err := Detection(filteredMetrics, clusterInfo, conf, 100)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+
+	// Ensure detection produced tasks so the following checks are not vacuous.
+	if len(tasks) == 0 {
+		t.Fatal("Expected balance tasks for 50/10 imbalance within node-a,node-b scope, got 0")
+	}
+
+	for _, task := range tasks {
+		if task.Server == "node-c" {
+			t.Errorf("node-c should not be a source with node filter")
+		}
+		if task.TypedParams != nil {
+			for _, tgt := range task.TypedParams.Targets {
+				if strings.Contains(tgt.Node, "node-c") {
+					t.Errorf("node-c should not be a target with node filter")
+				}
+			}
+		}
+	}
+
+	t.Logf("Created %d tasks within node-a,node-b scope", len(tasks))
+}
diff --git a/weed/worker/tasks/util/csv.go b/weed/worker/tasks/util/csv.go
new file mode 100644
index 000000000..50fb09bff
--- /dev/null
+++ b/weed/worker/tasks/util/csv.go
@@ -0,0 +1,20 @@
+package util
+
+import "strings"
+
+// ParseCSVSet splits a comma-separated string into a set of trimmed,
+// non-empty values. Returns nil if the input is empty.
+func ParseCSVSet(csv string) map[string]bool {
+	csv = strings.TrimSpace(csv)
+	if csv == "" {
+		return nil
+	}
+	set := make(map[string]bool)
+	for _, item := range strings.Split(csv, ",") {
+		trimmed := strings.TrimSpace(item)
+		if trimmed != "" {
+			set[trimmed] = true
+		}
+	}
+	return set
+}
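For illustration, a minimal sketch of the admin config values that would scope a balance job to one data center and two racks. The field names come from the descriptor above; the concrete "dc1"/"rack1, rack2" values are hypothetical, and the space after the comma is deliberate, since ParseCSVSet trims each entry:

	// Hypothetical admin config values for a location-scoped balance run.
	adminConfig := map[string]*plugin_pb.ConfigValue{
		"data_center_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "dc1"}},
		"rack_filter":        {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "rack1, rack2"}},
		"node_filter":        {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
	}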