From 34fe289f326b36a79052eb91c377a230d785af4c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 13 Mar 2026 15:56:03 -0700 Subject: [PATCH] feat(balance): add volume state filter (ALL/ACTIVE/FULL) (#8619) * feat(balance): add volume state filter (ALL/ACTIVE/FULL) Add a volume_state admin config field to the plugin worker volume balance handler, matching the shell's -volumeBy flag. This allows filtering volumes by state before balance detection: - ALL (default): consider all volumes - ACTIVE: only writable volumes below the size limit (FullnessRatio < 1.01) - FULL: only read-only volumes above the size limit (FullnessRatio >= 1.01) The 1.01 threshold mirrors the shell's thresholdVolumeSize constant. * address PR review: use enum/select widget, switch-based filter, nil safety - Change volume_state field from string/text to enum/select with dropdown options (ALL, ACTIVE, FULL) - Refactor filterMetricsByVolumeState to use switch with predicate function for clearer extensibility - Add nil-check guard to prevent panic on nil metric elements - Add TestFilterMetricsByVolumeState_NilElement regression test --- weed/plugin/worker/volume_balance_handler.go | 51 ++++++++ .../worker/volume_balance_handler_test.go | 112 ++++++++++++++++++ 2 files changed, 163 insertions(+) diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 01b8bc1d3..fd3a1bb00 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -90,6 +90,18 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, }, + { + Name: "volume_state", + Label: "Volume State Filter", + Description: "Filter volumes by state: ALL (default), ACTIVE (writable volumes below size limit), or FULL (read-only volumes above size limit).", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_ENUM, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_SELECT, + Options: []*plugin_pb.ConfigOption{ + {Value: "ALL", Label: "All Volumes"}, + {Value: "ACTIVE", Label: "Active (writable)"}, + {Value: "FULL", Label: "Full (read-only)"}, + }, + }, }, }, }, @@ -97,6 +109,9 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { "collection_filter": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, }, + "volume_state": { + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "ALL"}, + }, }, }, WorkerConfigForm: &plugin_pb.ConfigForm{ @@ -266,6 +281,9 @@ func (h *VolumeBalanceHandler) Detect( return err } + volumeState := strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", "ALL"))) + metrics = filterMetricsByVolumeState(metrics, volumeState) + clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} maxResults := int(request.MaxResults) results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) @@ -544,6 +562,39 @@ func emitVolumeBalanceDetectionDecisionTrace( return nil } +// filterMetricsByVolumeState filters volume metrics by state. +// "ACTIVE" keeps volumes with FullnessRatio < 1.01 (writable, below size limit). +// "FULL" keeps volumes with FullnessRatio >= 1.01 (read-only, above size limit). +// "ALL" or any other value returns all metrics unfiltered. +func filterMetricsByVolumeState(metrics []*workertypes.VolumeHealthMetrics, volumeState string) []*workertypes.VolumeHealthMetrics { + const fullnessThreshold = 1.01 + + var predicate func(m *workertypes.VolumeHealthMetrics) bool + switch volumeState { + case "ACTIVE": + predicate = func(m *workertypes.VolumeHealthMetrics) bool { + return m.FullnessRatio < fullnessThreshold + } + case "FULL": + predicate = func(m *workertypes.VolumeHealthMetrics) bool { + return m.FullnessRatio >= fullnessThreshold + } + default: + return metrics + } + + filtered := make([]*workertypes.VolumeHealthMetrics, 0, len(metrics)) + for _, m := range metrics { + if m == nil { + continue + } + if predicate(m) { + filtered = append(filtered, m) + } + } + return filtered +} + func countBalanceDiskTypes(metrics []*workertypes.VolumeHealthMetrics) int { diskTypes := make(map[string]struct{}) for _, metric := range metrics { diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go index ccfdd5a7c..85196c809 100644 --- a/weed/plugin/worker/volume_balance_handler_test.go +++ b/weed/plugin/worker/volume_balance_handler_test.go @@ -629,6 +629,118 @@ func TestExecuteSingleMovePathUnchanged(t *testing.T) { } } +func TestFilterMetricsByVolumeState(t *testing.T) { + metrics := []*workertypes.VolumeHealthMetrics{ + {VolumeID: 1, FullnessRatio: 0.5}, // active + {VolumeID: 2, FullnessRatio: 1.0}, // active (below 1.01) + {VolumeID: 3, FullnessRatio: 1.009}, // active (below 1.01) + {VolumeID: 4, FullnessRatio: 1.01}, // full (exactly at threshold) + {VolumeID: 5, FullnessRatio: 1.5}, // full + {VolumeID: 6, FullnessRatio: 2.0}, // full + } + + tests := []struct { + name string + state string + expectedIDs []uint32 + }{ + { + name: "ALL returns everything", + state: "ALL", + expectedIDs: []uint32{1, 2, 3, 4, 5, 6}, + }, + { + name: "empty string returns everything", + state: "", + expectedIDs: []uint32{1, 2, 3, 4, 5, 6}, + }, + { + name: "ACTIVE keeps FullnessRatio below 1.01", + state: "ACTIVE", + expectedIDs: []uint32{1, 2, 3}, + }, + { + name: "FULL keeps FullnessRatio at or above 1.01", + state: "FULL", + expectedIDs: []uint32{4, 5, 6}, + }, + { + name: "unknown value returns everything", + state: "INVALID", + expectedIDs: []uint32{1, 2, 3, 4, 5, 6}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filterMetricsByVolumeState(metrics, tt.state) + if len(result) != len(tt.expectedIDs) { + t.Fatalf("expected %d metrics, got %d", len(tt.expectedIDs), len(result)) + } + for i, m := range result { + if m.VolumeID != tt.expectedIDs[i] { + t.Errorf("result[%d].VolumeID = %d, want %d", i, m.VolumeID, tt.expectedIDs[i]) + } + } + }) + } +} + +func TestFilterMetricsByVolumeState_NilElement(t *testing.T) { + metrics := []*workertypes.VolumeHealthMetrics{ + nil, + {VolumeID: 1, FullnessRatio: 0.5}, + nil, + {VolumeID: 2, FullnessRatio: 1.5}, + } + result := filterMetricsByVolumeState(metrics, "ACTIVE") + if len(result) != 1 || result[0].VolumeID != 1 { + t.Fatalf("expected [vol 1] for ACTIVE with nil elements, got %d results", len(result)) + } + result = filterMetricsByVolumeState(metrics, "FULL") + if len(result) != 1 || result[0].VolumeID != 2 { + t.Fatalf("expected [vol 2] for FULL with nil elements, got %d results", len(result)) + } +} + +func TestFilterMetricsByVolumeState_EmptyInput(t *testing.T) { + result := filterMetricsByVolumeState(nil, "ACTIVE") + if len(result) != 0 { + t.Fatalf("expected 0 metrics for nil input, got %d", len(result)) + } + + result = filterMetricsByVolumeState([]*workertypes.VolumeHealthMetrics{}, "FULL") + if len(result) != 0 { + t.Fatalf("expected 0 metrics for empty input, got %d", len(result)) + } +} + +func TestVolumeBalanceDescriptorHasVolumeStateField(t *testing.T) { + descriptor := NewVolumeBalanceHandler(nil).Descriptor() + if descriptor == nil || descriptor.AdminConfigForm == nil { + t.Fatalf("expected admin config form in descriptor") + } + found := false + for _, section := range descriptor.AdminConfigForm.Sections { + for _, field := range section.Fields { + if field.Name == "volume_state" { + found = true + break + } + } + } + if !found { + t.Fatalf("expected volume_state field in admin config form") + } + defaultVal, ok := descriptor.AdminConfigForm.DefaultValues["volume_state"] + if !ok { + t.Fatalf("expected volume_state default value") + } + if defaultVal.GetStringValue() != "ALL" { + t.Fatalf("expected volume_state default 'ALL', got %q", defaultVal.GetStringValue()) + } +} + func workerConfigFormHasField(form *plugin_pb.ConfigForm, fieldName string) bool { if form == nil { return false