Browse Source

feat(balance): add volume state filter (ALL/ACTIVE/FULL) (#8619)

* feat(balance): add volume state filter (ALL/ACTIVE/FULL)

Add a volume_state admin config field to the plugin worker volume balance
handler, matching the shell's -volumeBy flag. This allows filtering volumes
by state before balance detection:
- ALL (default): consider all volumes
- ACTIVE: only writable volumes below the size limit (FullnessRatio < 1.01)
- FULL: only read-only volumes above the size limit (FullnessRatio >= 1.01)

The 1.01 threshold mirrors the shell's thresholdVolumeSize constant.

* address PR review: use enum/select widget, switch-based filter, nil safety

- Change volume_state field from string/text to enum/select with
  dropdown options (ALL, ACTIVE, FULL)
- Refactor filterMetricsByVolumeState to use switch with predicate
  function for clearer extensibility
- Add nil-check guard to prevent panic on nil metric elements
- Add TestFilterMetricsByVolumeState_NilElement regression test
pull/8626/head
Chris Lu 4 days ago
committed by GitHub
parent
commit
34fe289f32
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 51
      weed/plugin/worker/volume_balance_handler.go
  2. 112
      weed/plugin/worker/volume_balance_handler_test.go

51
weed/plugin/worker/volume_balance_handler.go

@ -90,6 +90,18 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
{
Name: "volume_state",
Label: "Volume State Filter",
Description: "Filter volumes by state: ALL (default), ACTIVE (writable volumes below size limit), or FULL (read-only volumes above size limit).",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_ENUM,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_SELECT,
Options: []*plugin_pb.ConfigOption{
{Value: "ALL", Label: "All Volumes"},
{Value: "ACTIVE", Label: "Active (writable)"},
{Value: "FULL", Label: "Full (read-only)"},
},
},
},
},
},
@ -97,6 +109,9 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
"collection_filter": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
"volume_state": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "ALL"},
},
},
},
WorkerConfigForm: &plugin_pb.ConfigForm{
@ -266,6 +281,9 @@ func (h *VolumeBalanceHandler) Detect(
return err
}
volumeState := strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", "ALL")))
metrics = filterMetricsByVolumeState(metrics, volumeState)
clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
maxResults := int(request.MaxResults)
results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
@ -544,6 +562,39 @@ func emitVolumeBalanceDetectionDecisionTrace(
return nil
}
// filterMetricsByVolumeState filters volume metrics by state.
// "ACTIVE" keeps volumes with FullnessRatio < 1.01 (writable, below size limit).
// "FULL" keeps volumes with FullnessRatio >= 1.01 (read-only, above size limit).
// "ALL" or any other value returns all metrics unfiltered.
func filterMetricsByVolumeState(metrics []*workertypes.VolumeHealthMetrics, volumeState string) []*workertypes.VolumeHealthMetrics {
const fullnessThreshold = 1.01
var predicate func(m *workertypes.VolumeHealthMetrics) bool
switch volumeState {
case "ACTIVE":
predicate = func(m *workertypes.VolumeHealthMetrics) bool {
return m.FullnessRatio < fullnessThreshold
}
case "FULL":
predicate = func(m *workertypes.VolumeHealthMetrics) bool {
return m.FullnessRatio >= fullnessThreshold
}
default:
return metrics
}
filtered := make([]*workertypes.VolumeHealthMetrics, 0, len(metrics))
for _, m := range metrics {
if m == nil {
continue
}
if predicate(m) {
filtered = append(filtered, m)
}
}
return filtered
}
func countBalanceDiskTypes(metrics []*workertypes.VolumeHealthMetrics) int {
diskTypes := make(map[string]struct{})
for _, metric := range metrics {

112
weed/plugin/worker/volume_balance_handler_test.go

@ -629,6 +629,118 @@ func TestExecuteSingleMovePathUnchanged(t *testing.T) {
}
}
func TestFilterMetricsByVolumeState(t *testing.T) {
metrics := []*workertypes.VolumeHealthMetrics{
{VolumeID: 1, FullnessRatio: 0.5}, // active
{VolumeID: 2, FullnessRatio: 1.0}, // active (below 1.01)
{VolumeID: 3, FullnessRatio: 1.009}, // active (below 1.01)
{VolumeID: 4, FullnessRatio: 1.01}, // full (exactly at threshold)
{VolumeID: 5, FullnessRatio: 1.5}, // full
{VolumeID: 6, FullnessRatio: 2.0}, // full
}
tests := []struct {
name string
state string
expectedIDs []uint32
}{
{
name: "ALL returns everything",
state: "ALL",
expectedIDs: []uint32{1, 2, 3, 4, 5, 6},
},
{
name: "empty string returns everything",
state: "",
expectedIDs: []uint32{1, 2, 3, 4, 5, 6},
},
{
name: "ACTIVE keeps FullnessRatio below 1.01",
state: "ACTIVE",
expectedIDs: []uint32{1, 2, 3},
},
{
name: "FULL keeps FullnessRatio at or above 1.01",
state: "FULL",
expectedIDs: []uint32{4, 5, 6},
},
{
name: "unknown value returns everything",
state: "INVALID",
expectedIDs: []uint32{1, 2, 3, 4, 5, 6},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := filterMetricsByVolumeState(metrics, tt.state)
if len(result) != len(tt.expectedIDs) {
t.Fatalf("expected %d metrics, got %d", len(tt.expectedIDs), len(result))
}
for i, m := range result {
if m.VolumeID != tt.expectedIDs[i] {
t.Errorf("result[%d].VolumeID = %d, want %d", i, m.VolumeID, tt.expectedIDs[i])
}
}
})
}
}
func TestFilterMetricsByVolumeState_NilElement(t *testing.T) {
metrics := []*workertypes.VolumeHealthMetrics{
nil,
{VolumeID: 1, FullnessRatio: 0.5},
nil,
{VolumeID: 2, FullnessRatio: 1.5},
}
result := filterMetricsByVolumeState(metrics, "ACTIVE")
if len(result) != 1 || result[0].VolumeID != 1 {
t.Fatalf("expected [vol 1] for ACTIVE with nil elements, got %d results", len(result))
}
result = filterMetricsByVolumeState(metrics, "FULL")
if len(result) != 1 || result[0].VolumeID != 2 {
t.Fatalf("expected [vol 2] for FULL with nil elements, got %d results", len(result))
}
}
func TestFilterMetricsByVolumeState_EmptyInput(t *testing.T) {
result := filterMetricsByVolumeState(nil, "ACTIVE")
if len(result) != 0 {
t.Fatalf("expected 0 metrics for nil input, got %d", len(result))
}
result = filterMetricsByVolumeState([]*workertypes.VolumeHealthMetrics{}, "FULL")
if len(result) != 0 {
t.Fatalf("expected 0 metrics for empty input, got %d", len(result))
}
}
func TestVolumeBalanceDescriptorHasVolumeStateField(t *testing.T) {
descriptor := NewVolumeBalanceHandler(nil).Descriptor()
if descriptor == nil || descriptor.AdminConfigForm == nil {
t.Fatalf("expected admin config form in descriptor")
}
found := false
for _, section := range descriptor.AdminConfigForm.Sections {
for _, field := range section.Fields {
if field.Name == "volume_state" {
found = true
break
}
}
}
if !found {
t.Fatalf("expected volume_state field in admin config form")
}
defaultVal, ok := descriptor.AdminConfigForm.DefaultValues["volume_state"]
if !ok {
t.Fatalf("expected volume_state default value")
}
if defaultVal.GetStringValue() != "ALL" {
t.Fatalf("expected volume_state default 'ALL', got %q", defaultVal.GetStringValue())
}
}
func workerConfigFormHasField(form *plugin_pb.ConfigForm, fieldName string) bool {
if form == nil {
return false

Loading…
Cancel
Save