Browse Source

feat(vacuum): add volume state and location filters to vacuum handler (#8625)

* feat(vacuum): add volume state, location, and enhanced collection filters

Align the vacuum handler's admin config with the balance handler by adding:
- volume_state filter (ALL/ACTIVE/FULL) to scope vacuum to writable or
  read-only volumes
- data_center_filter, rack_filter, node_filter to scope vacuum to
  specific infrastructure locations
- Enhanced collection_filter description matching the balance handler's
  ALL_COLLECTIONS/EACH_COLLECTION/regex modes

The new filters reuse filterMetricsByVolumeState() and
filterMetricsByLocation() already defined in the same package.

* use wildcard matchers for DC/rack/node filters

Replace exact-match and CSV set lookups with wildcard matching
from util/wildcard package. Patterns like "dc*", "rack-1?", or
"node-a*" are now supported in all location filter fields for
both balance and vacuum handlers.

* add nil guard in filterMetricsByLocation
pull/8632/head
Chris Lu 6 days ago
committed by GitHub
parent
commit
2f51a94416
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 63
      weed/plugin/worker/vacuum_handler.go
  2. 132
      weed/plugin/worker/vacuum_handler_test.go
  3. 27
      weed/plugin/worker/volume_balance_handler.go
  4. 14
      weed/worker/tasks/balance/detection.go

63
weed/plugin/worker/vacuum_handler.go

@@ -87,8 +87,44 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
{
Name: "collection_filter",
Label: "Collection Filter",
Description: "Only scan this collection when set.",
Placeholder: "all collections",
Description: "Filter collections for vacuum detection. Use ALL_COLLECTIONS (default) to treat all volumes as one pool, EACH_COLLECTION to run detection separately per collection, or a regex pattern to match specific collections.",
Placeholder: "ALL_COLLECTIONS",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
{
Name: "volume_state",
Label: "Volume State Filter",
Description: "Filter volumes by state: ALL (default), ACTIVE (writable volumes below size limit), or FULL (read-only volumes above size limit).",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_ENUM,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_SELECT,
Options: []*plugin_pb.ConfigOption{
{Value: string(volumeStateAll), Label: "All Volumes"},
{Value: string(volumeStateActive), Label: "Active (writable)"},
{Value: string(volumeStateFull), Label: "Full (read-only)"},
},
},
{
Name: "data_center_filter",
Label: "Data Center Filter",
Description: "Only vacuum volumes in matching data centers (comma-separated, wildcards supported). Leave empty for all.",
Placeholder: "all data centers",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
{
Name: "rack_filter",
Label: "Rack Filter",
Description: "Only vacuum volumes on matching racks (comma-separated, wildcards supported). Leave empty for all.",
Placeholder: "all racks",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
{
Name: "node_filter",
Label: "Node Filter",
Description: "Only vacuum volumes on matching nodes (comma-separated, wildcards supported). Leave empty for all.",
Placeholder: "all nodes",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
@@ -99,6 +135,18 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"collection_filter": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
"volume_state": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: string(volumeStateAll)},
},
"data_center_filter": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
"rack_filter": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
"node_filter": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
},
},
WorkerConfigForm: &plugin_pb.ConfigForm{
@@ -226,6 +274,17 @@ func (h *VacuumHandler) Detect(ctx context.Context, request *plugin_pb.RunDetect
return err
}
volState := volumeState(strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", string(volumeStateAll)))))
metrics = filterMetricsByVolumeState(metrics, volState)
dataCenterFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "data_center_filter", ""))
rackFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "rack_filter", ""))
nodeFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "node_filter", ""))
if dataCenterFilter != "" || rackFilter != "" || nodeFilter != "" {
metrics = filterMetricsByLocation(metrics, dataCenterFilter, rackFilter, nodeFilter)
}
clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
results, err := vacuumtask.Detection(metrics, clusterInfo, workerConfig)
if err != nil {

132
weed/plugin/worker/vacuum_handler_test.go

@@ -242,6 +242,138 @@ func TestEmitVacuumDetectionDecisionTraceNoTasks(t *testing.T) {
}
}
// TestVacuumDescriptorHasNewFields verifies that the vacuum handler's admin
// config form exposes the new filter fields (volume_state, data_center_filter,
// rack_filter, node_filter) with the expected types, enum options, defaults,
// and the ALL_COLLECTIONS placeholder on collection_filter.
func TestVacuumDescriptorHasNewFields(t *testing.T) {
	handler := NewVacuumHandler(nil, 2)
	desc := handler.Descriptor()
	if desc == nil {
		t.Fatal("descriptor is nil")
	}
	adminForm := desc.AdminConfigForm
	if adminForm == nil {
		t.Fatal("admin config form is nil")
	}
	if len(adminForm.Sections) == 0 {
		t.Fatal("admin config form has no sections")
	}
	scopeSection := adminForm.Sections[0]
	fieldNames := make(map[string]*plugin_pb.ConfigField)
	for _, f := range scopeSection.Fields {
		fieldNames[f.Name] = f
	}
	requiredFields := []string{
		"collection_filter",
		"volume_state",
		"data_center_filter",
		"rack_filter",
		"node_filter",
	}
	for _, name := range requiredFields {
		if _, ok := fieldNames[name]; !ok {
			t.Errorf("missing field %q in admin config scope section", name)
		}
	}
	// Verify volume_state is an enum with correct options.
	// Guard the map lookup: without it a missing field would make the
	// dereferences below panic with a nil pointer instead of failing cleanly.
	vsField, ok := fieldNames["volume_state"]
	if !ok {
		t.Fatal("volume_state field not found; cannot verify enum options")
	}
	if vsField.FieldType != plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_ENUM {
		t.Errorf("volume_state field type = %v, want ENUM", vsField.FieldType)
	}
	if len(vsField.Options) != 3 {
		t.Errorf("volume_state options count = %d, want 3", len(vsField.Options))
	}
	// Verify default values exist for new fields.
	defaultKeys := []string{"volume_state", "data_center_filter", "rack_filter", "node_filter"}
	for _, key := range defaultKeys {
		if _, ok := adminForm.DefaultValues[key]; !ok {
			t.Errorf("missing default value for %q", key)
		}
	}
	// Verify volume_state default is ALL. Guard against a missing entry so the
	// Kind type assertion below cannot dereference a nil ConfigValue.
	vsDefault := adminForm.DefaultValues["volume_state"]
	if vsDefault == nil {
		t.Fatal("volume_state default value is nil")
	}
	if sv, ok := vsDefault.Kind.(*plugin_pb.ConfigValue_StringValue); !ok || sv.StringValue != "ALL" {
		t.Errorf("volume_state default = %v, want ALL", vsDefault)
	}
	// Verify collection_filter placeholder advertises ALL_COLLECTIONS.
	cfField, ok := fieldNames["collection_filter"]
	if !ok {
		t.Fatal("collection_filter field not found; cannot verify placeholder")
	}
	if cfField.Placeholder != "ALL_COLLECTIONS" {
		t.Errorf("collection_filter placeholder = %q, want ALL_COLLECTIONS", cfField.Placeholder)
	}
}
// TestVacuumFiltersVolumeState checks that filterMetricsByVolumeState keeps
// only the volumes matching the requested state and preserves input order.
func TestVacuumFiltersVolumeState(t *testing.T) {
	input := []*workertypes.VolumeHealthMetrics{
		{VolumeID: 1, FullnessRatio: 0.5},  // active
		{VolumeID: 2, FullnessRatio: 1.5},  // full
		{VolumeID: 3, FullnessRatio: 0.9},  // active
		{VolumeID: 4, FullnessRatio: 1.01}, // full (at threshold)
	}
	cases := []struct {
		name   string
		state  volumeState
		expect []uint32
	}{
		{"ALL returns all", volumeStateAll, []uint32{1, 2, 3, 4}},
		{"ACTIVE returns writable", volumeStateActive, []uint32{1, 3}},
		{"FULL returns read-only", volumeStateFull, []uint32{2, 4}},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := filterMetricsByVolumeState(input, tc.state)
			if len(got) != len(tc.expect) {
				t.Fatalf("got %d metrics, want %d", len(got), len(tc.expect))
			}
			for i, wantID := range tc.expect {
				if got[i].VolumeID != wantID {
					t.Errorf("filtered[%d].VolumeID = %d, want %d", i, got[i].VolumeID, wantID)
				}
			}
		})
	}
}
// TestVacuumFiltersLocation checks that filterMetricsByLocation narrows the
// metrics by data center, rack, and node filters (alone and combined) while
// preserving input order.
func TestVacuumFiltersLocation(t *testing.T) {
	input := []*workertypes.VolumeHealthMetrics{
		{VolumeID: 1, DataCenter: "dc1", Rack: "r1", Server: "s1"},
		{VolumeID: 2, DataCenter: "dc1", Rack: "r2", Server: "s2"},
		{VolumeID: 3, DataCenter: "dc2", Rack: "r1", Server: "s3"},
		{VolumeID: 4, DataCenter: "dc2", Rack: "r3", Server: "s4"},
	}
	cases := []struct {
		name   string
		dc     string
		rack   string
		node   string
		expect []uint32
	}{
		{"dc filter", "dc1", "", "", []uint32{1, 2}},
		{"rack filter", "", "r1", "", []uint32{1, 3}},
		{"node filter", "", "", "s2,s4", []uint32{2, 4}},
		{"dc + rack", "dc1", "r1", "", []uint32{1}},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := filterMetricsByLocation(input, tc.dc, tc.rack, tc.node)
			if len(got) != len(tc.expect) {
				t.Fatalf("got %d metrics, want %d", len(got), len(tc.expect))
			}
			for i, wantID := range tc.expect {
				if got[i].VolumeID != wantID {
					t.Errorf("filtered[%d].VolumeID = %d, want %d", i, got[i].VolumeID, wantID)
				}
			}
		})
	}
}
// noopDetectionSender is a test stub whose SendProposals discards all input.
type noopDetectionSender struct{}

// SendProposals implements the detection-sender interface as a no-op.
func (noopDetectionSender) SendProposals(*plugin_pb.DetectionProposals) error {
	return nil
}

27
weed/plugin/worker/volume_balance_handler.go

@@ -15,7 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
balancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
taskutil "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
@@ -123,7 +123,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
{
Name: "data_center_filter",
Label: "Data Center Filter",
Description: "Only balance volumes in a single data center. Leave empty for all data centers.",
Description: "Only balance volumes in matching data centers (comma-separated, wildcards supported). Leave empty for all.",
Placeholder: "all data centers",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
@@ -131,7 +131,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
{
Name: "rack_filter",
Label: "Rack Filter",
Description: "Only balance volumes on these racks (comma-separated). Leave empty for all racks.",
Description: "Only balance volumes on matching racks (comma-separated, wildcards supported). Leave empty for all.",
Placeholder: "all racks",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
@@ -139,7 +139,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
{
Name: "node_filter",
Label: "Node Filter",
Description: "Only balance volumes on these nodes (comma-separated server IDs). Leave empty for all nodes.",
Description: "Only balance volumes on matching nodes (comma-separated, wildcards supported). Leave empty for all.",
Placeholder: "all nodes",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
@@ -1142,23 +1142,22 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume
}
func filterMetricsByLocation(metrics []*workertypes.VolumeHealthMetrics, dcFilter, rackFilter, nodeFilter string) []*workertypes.VolumeHealthMetrics {
var rackSet, nodeSet map[string]bool
if rackFilter != "" {
rackSet = taskutil.ParseCSVSet(rackFilter)
}
if nodeFilter != "" {
nodeSet = taskutil.ParseCSVSet(nodeFilter)
}
dcMatchers := wildcard.CompileWildcardMatchers(dcFilter)
rackMatchers := wildcard.CompileWildcardMatchers(rackFilter)
nodeMatchers := wildcard.CompileWildcardMatchers(nodeFilter)
filtered := make([]*workertypes.VolumeHealthMetrics, 0, len(metrics))
for _, m := range metrics {
if dcFilter != "" && m.DataCenter != dcFilter {
if m == nil {
continue
}
if !wildcard.MatchesAnyWildcard(dcMatchers, m.DataCenter) {
continue
}
if rackSet != nil && !rackSet[m.Rack] {
if !wildcard.MatchesAnyWildcard(rackMatchers, m.Rack) {
continue
}
if nodeSet != nil && !nodeSet[m.Server] {
if !wildcard.MatchesAnyWildcard(nodeMatchers, m.Server) {
continue
}
filtered = append(filtered, m)

14
weed/worker/tasks/balance/detection.go

@@ -4,13 +4,13 @@ import (
"fmt"
"math"
"sort"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/util"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
@@ -88,19 +88,19 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
if clusterInfo.ActiveTopology != nil {
topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo()
if topologyInfo != nil {
rackFilterSet := util.ParseCSVSet(balanceConfig.RackFilter)
nodeFilterSet := util.ParseCSVSet(balanceConfig.NodeFilter)
dcFilter := strings.TrimSpace(balanceConfig.DataCenterFilter)
dcMatchers := wildcard.CompileWildcardMatchers(balanceConfig.DataCenterFilter)
rackMatchers := wildcard.CompileWildcardMatchers(balanceConfig.RackFilter)
nodeMatchers := wildcard.CompileWildcardMatchers(balanceConfig.NodeFilter)
for _, dc := range topologyInfo.DataCenterInfos {
if dcFilter != "" && dc.Id != dcFilter {
if !wildcard.MatchesAnyWildcard(dcMatchers, dc.Id) {
continue
}
for _, rack := range dc.RackInfos {
if rackFilterSet != nil && !rackFilterSet[rack.Id] {
if !wildcard.MatchesAnyWildcard(rackMatchers, rack.Id) {
continue
}
for _, node := range rack.DataNodeInfos {
if nodeFilterSet != nil && !nodeFilterSet[node.Id] {
if !wildcard.MatchesAnyWildcard(nodeMatchers, node.Id) {
continue
}
for diskTypeName, diskInfo := range node.DiskInfos {

Loading…
Cancel
Save