From 2f51a94416775160775ed5a759ad892238a6876b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 13 Mar 2026 23:41:58 -0700 Subject: [PATCH] feat(vacuum): add volume state and location filters to vacuum handler (#8625) * feat(vacuum): add volume state, location, and enhanced collection filters Align the vacuum handler's admin config with the balance handler by adding: - volume_state filter (ALL/ACTIVE/FULL) to scope vacuum to writable or read-only volumes - data_center_filter, rack_filter, node_filter to scope vacuum to specific infrastructure locations - Enhanced collection_filter description matching the balance handler's ALL_COLLECTIONS/EACH_COLLECTION/regex modes The new filters reuse filterMetricsByVolumeState() and filterMetricsByLocation() already defined in the same package. * use wildcard matchers for DC/rack/node filters Replace exact-match and CSV set lookups with wildcard matching from util/wildcard package. Patterns like "dc*", "rack-1?", or "node-a*" are now supported in all location filter fields for both balance and vacuum handlers. * add nil guard in filterMetricsByLocation --- weed/plugin/worker/vacuum_handler.go | 63 ++++++++- weed/plugin/worker/vacuum_handler_test.go | 132 +++++++++++++++++++ weed/plugin/worker/volume_balance_handler.go | 27 ++-- weed/worker/tasks/balance/detection.go | 14 +- 4 files changed, 213 insertions(+), 23 deletions(-) diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index 7855e6b72..02c110aa5 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -87,8 +87,44 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor { { Name: "collection_filter", Label: "Collection Filter", - Description: "Only scan this collection when set.", - Placeholder: "all collections", + Description: "Filter collections for vacuum detection. Use ALL_COLLECTIONS (default) to treat all volumes as one pool, EACH_COLLECTION to run detection separately per collection, or a regex pattern to match specific collections.", + Placeholder: "ALL_COLLECTIONS", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "volume_state", + Label: "Volume State Filter", + Description: "Filter volumes by state: ALL (default), ACTIVE (writable volumes below size limit), or FULL (read-only volumes above size limit).", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_ENUM, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_SELECT, + Options: []*plugin_pb.ConfigOption{ + {Value: string(volumeStateAll), Label: "All Volumes"}, + {Value: string(volumeStateActive), Label: "Active (writable)"}, + {Value: string(volumeStateFull), Label: "Full (read-only)"}, + }, + }, + { + Name: "data_center_filter", + Label: "Data Center Filter", + Description: "Only vacuum volumes in matching data centers (comma-separated, wildcards supported). Leave empty for all.", + Placeholder: "all data centers", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "rack_filter", + Label: "Rack Filter", + Description: "Only vacuum volumes on matching racks (comma-separated, wildcards supported). Leave empty for all.", + Placeholder: "all racks", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, + }, + { + Name: "node_filter", + Label: "Node Filter", + Description: "Only vacuum volumes on matching nodes (comma-separated, wildcards supported). Leave empty for all.", + Placeholder: "all nodes", FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, }, @@ -99,6 +135,18 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor { "collection_filter": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, }, + "volume_state": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: string(volumeStateAll)}, + }, + "data_center_filter": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, + }, + "rack_filter": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, + }, + "node_filter": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, + }, }, }, WorkerConfigForm: &plugin_pb.ConfigForm{ @@ -226,6 +274,17 @@ func (h *VacuumHandler) Detect(ctx context.Context, request *plugin_pb.RunDetect return err } + volState := volumeState(strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", string(volumeStateAll))))) + metrics = filterMetricsByVolumeState(metrics, volState) + + dataCenterFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "data_center_filter", "")) + rackFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "rack_filter", "")) + nodeFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "node_filter", "")) + + if dataCenterFilter != "" || rackFilter != "" || nodeFilter != "" { + metrics = filterMetricsByLocation(metrics, dataCenterFilter, rackFilter, nodeFilter) + } + clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} results, err := vacuumtask.Detection(metrics, clusterInfo, workerConfig) if err != nil { diff --git a/weed/plugin/worker/vacuum_handler_test.go b/weed/plugin/worker/vacuum_handler_test.go index 346b592c0..6fbcfd60d 100644 --- a/weed/plugin/worker/vacuum_handler_test.go +++ b/weed/plugin/worker/vacuum_handler_test.go @@ -242,6 +242,138 @@ func TestEmitVacuumDetectionDecisionTraceNoTasks(t *testing.T) { } } +func TestVacuumDescriptorHasNewFields(t *testing.T) { + handler := NewVacuumHandler(nil, 2) + desc := handler.Descriptor() + if desc == nil { + t.Fatal("descriptor is nil") + } + adminForm := desc.AdminConfigForm + if adminForm == nil { + t.Fatal("admin config form is nil") + } + if len(adminForm.Sections) == 0 { + t.Fatal("admin config form has no sections") + } + + scopeSection := adminForm.Sections[0] + fieldNames := make(map[string]*plugin_pb.ConfigField) + for _, f := range scopeSection.Fields { + fieldNames[f.Name] = f + } + + requiredFields := []string{ + "collection_filter", + "volume_state", + "data_center_filter", + "rack_filter", + "node_filter", + } + for _, name := range requiredFields { + if _, ok := fieldNames[name]; !ok { + t.Errorf("missing field %q in admin config scope section", name) + } + } + + // Verify volume_state is an enum with correct options. + vsField := fieldNames["volume_state"] + if vsField.FieldType != plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_ENUM { + t.Errorf("volume_state field type = %v, want ENUM", vsField.FieldType) + } + if len(vsField.Options) != 3 { + t.Errorf("volume_state options count = %d, want 3", len(vsField.Options)) + } + + // Verify default values exist for new fields. + defaultKeys := []string{"volume_state", "data_center_filter", "rack_filter", "node_filter"} + for _, key := range defaultKeys { + if _, ok := adminForm.DefaultValues[key]; !ok { + t.Errorf("missing default value for %q", key) + } + } + + // Verify volume_state default is ALL. + vsDefault := adminForm.DefaultValues["volume_state"] + if sv, ok := vsDefault.Kind.(*plugin_pb.ConfigValue_StringValue); !ok || sv.StringValue != "ALL" { + t.Errorf("volume_state default = %v, want ALL", vsDefault) + } + + // Verify collection_filter description mentions ALL_COLLECTIONS. + cfField := fieldNames["collection_filter"] + if cfField.Placeholder != "ALL_COLLECTIONS" { + t.Errorf("collection_filter placeholder = %q, want ALL_COLLECTIONS", cfField.Placeholder) + } +} + +func TestVacuumFiltersVolumeState(t *testing.T) { + metrics := []*workertypes.VolumeHealthMetrics{ + {VolumeID: 1, FullnessRatio: 0.5}, // active + {VolumeID: 2, FullnessRatio: 1.5}, // full + {VolumeID: 3, FullnessRatio: 0.9}, // active + {VolumeID: 4, FullnessRatio: 1.01}, // full (at threshold) + } + + tests := []struct { + name string + state volumeState + wantIDs []uint32 + }{ + {"ALL returns all", volumeStateAll, []uint32{1, 2, 3, 4}}, + {"ACTIVE returns writable", volumeStateActive, []uint32{1, 3}}, + {"FULL returns read-only", volumeStateFull, []uint32{2, 4}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filtered := filterMetricsByVolumeState(metrics, tt.state) + if len(filtered) != len(tt.wantIDs) { + t.Fatalf("got %d metrics, want %d", len(filtered), len(tt.wantIDs)) + } + for i, m := range filtered { + if m.VolumeID != tt.wantIDs[i] { + t.Errorf("filtered[%d].VolumeID = %d, want %d", i, m.VolumeID, tt.wantIDs[i]) + } + } + }) + } +} + +func TestVacuumFiltersLocation(t *testing.T) { + metrics := []*workertypes.VolumeHealthMetrics{ + {VolumeID: 1, DataCenter: "dc1", Rack: "r1", Server: "s1"}, + {VolumeID: 2, DataCenter: "dc1", Rack: "r2", Server: "s2"}, + {VolumeID: 3, DataCenter: "dc2", Rack: "r1", Server: "s3"}, + {VolumeID: 4, DataCenter: "dc2", Rack: "r3", Server: "s4"}, + } + + tests := []struct { + name string + dc string + rack string + node string + wantIDs []uint32 + }{ + {"dc filter", "dc1", "", "", []uint32{1, 2}}, + {"rack filter", "", "r1", "", []uint32{1, 3}}, + {"node filter", "", "", "s2,s4", []uint32{2, 4}}, + {"dc + rack", "dc1", "r1", "", []uint32{1}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filtered := filterMetricsByLocation(metrics, tt.dc, tt.rack, tt.node) + if len(filtered) != len(tt.wantIDs) { + t.Fatalf("got %d metrics, want %d", len(filtered), len(tt.wantIDs)) + } + for i, m := range filtered { + if m.VolumeID != tt.wantIDs[i] { + t.Errorf("filtered[%d].VolumeID = %d, want %d", i, m.VolumeID, tt.wantIDs[i]) + } + } + }) + } +} + type noopDetectionSender struct{} func (noopDetectionSender) SendProposals(*plugin_pb.DetectionProposals) error { return nil } diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index b37a19f54..a84420824 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -15,7 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" balancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance" - taskutil "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" "google.golang.org/protobuf/proto" @@ -123,7 +123,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { { Name: "data_center_filter", Label: "Data Center Filter", - Description: "Only balance volumes in a single data center. Leave empty for all data centers.", + Description: "Only balance volumes in matching data centers (comma-separated, wildcards supported). Leave empty for all.", Placeholder: "all data centers", FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, @@ -131,7 +131,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { { Name: "rack_filter", Label: "Rack Filter", - Description: "Only balance volumes on these racks (comma-separated). Leave empty for all racks.", + Description: "Only balance volumes on matching racks (comma-separated, wildcards supported). Leave empty for all.", Placeholder: "all racks", FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, @@ -139,7 +139,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { { Name: "node_filter", Label: "Node Filter", - Description: "Only balance volumes on these nodes (comma-separated server IDs). Leave empty for all nodes.", + Description: "Only balance volumes on matching nodes (comma-separated, wildcards supported). Leave empty for all.", Placeholder: "all nodes", FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, @@ -1142,23 +1142,22 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume } func filterMetricsByLocation(metrics []*workertypes.VolumeHealthMetrics, dcFilter, rackFilter, nodeFilter string) []*workertypes.VolumeHealthMetrics { - var rackSet, nodeSet map[string]bool - if rackFilter != "" { - rackSet = taskutil.ParseCSVSet(rackFilter) - } - if nodeFilter != "" { - nodeSet = taskutil.ParseCSVSet(nodeFilter) - } + dcMatchers := wildcard.CompileWildcardMatchers(dcFilter) + rackMatchers := wildcard.CompileWildcardMatchers(rackFilter) + nodeMatchers := wildcard.CompileWildcardMatchers(nodeFilter) filtered := make([]*workertypes.VolumeHealthMetrics, 0, len(metrics)) for _, m := range metrics { - if dcFilter != "" && m.DataCenter != dcFilter { + if m == nil { + continue + } + if !wildcard.MatchesAnyWildcard(dcMatchers, m.DataCenter) { continue } - if rackSet != nil && !rackSet[m.Rack] { + if !wildcard.MatchesAnyWildcard(rackMatchers, m.Rack) { continue } - if nodeSet != nil && !nodeSet[m.Server] { + if !wildcard.MatchesAnyWildcard(nodeMatchers, m.Server) { continue } filtered = append(filtered, m) diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index c81444540..3e297dd13 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -4,13 +4,13 @@ import ( "fmt" "math" "sort" - "strings" "time" "github.com/seaweedfs/seaweedfs/weed/admin/topology" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/util/wildcard" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util" "github.com/seaweedfs/seaweedfs/weed/worker/types" @@ -88,19 +88,19 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics if clusterInfo.ActiveTopology != nil { topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo() if topologyInfo != nil { - rackFilterSet := util.ParseCSVSet(balanceConfig.RackFilter) - nodeFilterSet := util.ParseCSVSet(balanceConfig.NodeFilter) - dcFilter := strings.TrimSpace(balanceConfig.DataCenterFilter) + dcMatchers := wildcard.CompileWildcardMatchers(balanceConfig.DataCenterFilter) + rackMatchers := wildcard.CompileWildcardMatchers(balanceConfig.RackFilter) + nodeMatchers := wildcard.CompileWildcardMatchers(balanceConfig.NodeFilter) for _, dc := range topologyInfo.DataCenterInfos { - if dcFilter != "" && dc.Id != dcFilter { + if !wildcard.MatchesAnyWildcard(dcMatchers, dc.Id) { continue } for _, rack := range dc.RackInfos { - if rackFilterSet != nil && !rackFilterSet[rack.Id] { + if !wildcard.MatchesAnyWildcard(rackMatchers, rack.Id) { continue } for _, node := range rack.DataNodeInfos { - if nodeFilterSet != nil && !nodeFilterSet[node.Id] { + if !wildcard.MatchesAnyWildcard(nodeMatchers, node.Id) { continue } for diskTypeName, diskInfo := range node.DiskInfos {