Browse Source

feat(plugin): enhanced collection filtering for volume balance (#8620)

* feat(plugin): enhanced collection filtering for volume balance

Replace wildcard matching with three collection filter modes:
- ALL_COLLECTIONS (default): treat all volumes as one pool
- EACH_COLLECTION: run detection separately per collection
- Regex pattern: filter volumes by matching collection names

The EACH_COLLECTION mode extracts distinct collections from metrics
and calls Detection() per collection, sharing the maxResults budget
and clusterInfo (with ActiveTopology) across all calls.

* address PR review: fix wildcard→regexp replacement, optimize EACH_COLLECTION

* address nitpick: fail fast on config errors (invalid regex)

Add configError type so invalid collection_filter regex returns
immediately instead of retrying across all masters with the same
bad config. Transient errors still retry.

* address review: constants, unbounded maxResults, wildcard compat

- Define collectionFilterAll/collectionFilterEach constants to
  eliminate magic strings across handler and metrics code
- Fix EACH_COLLECTION budget loop to treat maxResults <= 0 as
  unbounded, matching Detection's existing semantics
- Treat "*" as ALL_COLLECTIONS for backward compat with wildcard

* address review: nil guard in EACH_COLLECTION grouping loop

* remove useless descriptor string test
pull/8626/head
Chris Lu 3 days ago
committed by GitHub
parent
commit
00ce1c6eba
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 59
      weed/plugin/worker/volume_balance_handler.go
  2. 34
      weed/plugin/worker/volume_metrics.go
  3. 109
      weed/plugin/worker/volume_metrics_test.go

59
weed/plugin/worker/volume_balance_handler.go

@ -23,6 +23,10 @@ import (
const (
defaultBalanceTimeoutSeconds = int32(10 * 60)
maxProposalStringLength = 200
// Collection filter mode constants.
collectionFilterAll = "ALL_COLLECTIONS"
collectionFilterEach = "EACH_COLLECTION"
)
func init() {
@ -85,8 +89,8 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
{
Name: "collection_filter",
Label: "Collection Filter",
Description: "Only detect balance opportunities in this collection when set.",
Placeholder: "all collections",
Description: "Filter collections for balance detection. Use ALL_COLLECTIONS (default) to treat all volumes as one pool, EACH_COLLECTION to run detection separately per collection, or a regex pattern to match specific collections.",
Placeholder: "ALL_COLLECTIONS",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
@ -286,9 +290,54 @@ func (h *VolumeBalanceHandler) Detect(
clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
maxResults := int(request.MaxResults)
results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
if err != nil {
return err
var results []*workertypes.TaskDetectionResult
var hasMore bool
if collectionFilter == collectionFilterEach {
// Group metrics by collection in a single pass (O(N) instead of O(C*N))
metricsByCollection := make(map[string][]*workertypes.VolumeHealthMetrics)
for _, m := range metrics {
if m == nil {
continue
}
metricsByCollection[m.Collection] = append(metricsByCollection[m.Collection], m)
}
collections := make([]string, 0, len(metricsByCollection))
for c := range metricsByCollection {
collections = append(collections, c)
}
sort.Strings(collections)
budget := maxResults
unlimitedBudget := budget <= 0
for _, collection := range collections {
if !unlimitedBudget && budget <= 0 {
hasMore = true
break
}
perCollectionLimit := budget
if unlimitedBudget {
perCollectionLimit = 0 // Detection treats <= 0 as unbounded
}
perResults, perHasMore, perErr := balancetask.Detection(metricsByCollection[collection], clusterInfo, workerConfig.TaskConfig, perCollectionLimit)
if perErr != nil {
return perErr
}
results = append(results, perResults...)
if !unlimitedBudget {
budget -= len(perResults)
}
if perHasMore {
hasMore = true
}
}
} else {
var err error
results, hasMore, err = balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
if err != nil {
return err
}
}
if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, activeTopology, workerConfig.TaskConfig, results); traceErr != nil {

34
weed/plugin/worker/volume_metrics.go

@ -2,7 +2,9 @@ package pluginworker
import (
"context"
"errors"
"fmt"
"regexp"
"sort"
"strings"
"time"
@ -11,7 +13,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
)
@ -38,6 +39,11 @@ func collectVolumeMetricsFromMasters(
metrics, activeTopology, buildErr := buildVolumeMetrics(response, collectionFilter)
if buildErr != nil {
// Configuration errors (e.g. invalid regex) will fail on every master,
// so return immediately instead of masking them with retries.
if isConfigError(buildErr) {
return nil, nil, buildErr
}
glog.Warningf("Plugin worker failed to build metrics from master %s: %v", masterAddress, buildErr)
continue
}
@ -93,7 +99,16 @@ func buildVolumeMetrics(
return nil, nil, err
}
patterns := wildcard.CompileWildcardMatchers(collectionFilter)
var collectionRegex *regexp.Regexp
trimmedFilter := strings.TrimSpace(collectionFilter)
if trimmedFilter != "" && trimmedFilter != collectionFilterAll && trimmedFilter != collectionFilterEach && trimmedFilter != "*" {
var err error
collectionRegex, err = regexp.Compile(trimmedFilter)
if err != nil {
return nil, nil, &configError{err: fmt.Errorf("invalid collection_filter regex %q: %w", trimmedFilter, err)}
}
}
volumeSizeLimitBytes := uint64(response.VolumeSizeLimitMb) * 1024 * 1024
now := time.Now()
metrics := make([]*workertypes.VolumeHealthMetrics, 0, 256)
@ -103,7 +118,7 @@ func buildVolumeMetrics(
for _, node := range rack.DataNodeInfos {
for diskType, diskInfo := range node.DiskInfos {
for _, volume := range diskInfo.VolumeInfos {
if !wildcard.MatchesAnyWildcard(patterns, volume.Collection) {
if collectionRegex != nil && !collectionRegex.MatchString(volume.Collection) {
continue
}
@ -148,6 +163,19 @@ func buildVolumeMetrics(
return metrics, activeTopology, nil
}
// configError wraps configuration errors that should not be retried across masters.
type configError struct {
err error
}
func (e *configError) Error() string { return e.err.Error() }
func (e *configError) Unwrap() error { return e.err }
func isConfigError(err error) bool {
var ce *configError
return errors.As(err, &ce)
}
func masterAddressCandidates(address string) []string {
trimmed := strings.TrimSpace(address)
if trimmed == "" {

109
weed/plugin/worker/volume_metrics_test.go

@ -0,0 +1,109 @@
package pluginworker
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
func makeTestVolumeListResponse(volumes ...*master_pb.VolumeInformationMessage) *master_pb.VolumeListResponse {
return &master_pb.VolumeListResponse{
VolumeSizeLimitMb: 30000,
TopologyInfo: &master_pb.TopologyInfo{
DataCenterInfos: []*master_pb.DataCenterInfo{
{
Id: "dc1",
RackInfos: []*master_pb.RackInfo{
{
Id: "rack1",
DataNodeInfos: []*master_pb.DataNodeInfo{
{
Id: "server1:8080",
DiskInfos: map[string]*master_pb.DiskInfo{
"hdd": {
VolumeInfos: volumes,
},
},
},
},
},
},
},
},
},
}
}
func TestBuildVolumeMetricsEmptyFilter(t *testing.T) {
resp := makeTestVolumeListResponse(
&master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100},
&master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200},
)
metrics, _, err := buildVolumeMetrics(resp, "")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(metrics) != 2 {
t.Fatalf("expected 2 metrics, got %d", len(metrics))
}
}
func TestBuildVolumeMetricsAllCollections(t *testing.T) {
resp := makeTestVolumeListResponse(
&master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100},
&master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200},
)
metrics, _, err := buildVolumeMetrics(resp, collectionFilterAll)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(metrics) != 2 {
t.Fatalf("expected 2 metrics, got %d", len(metrics))
}
}
func TestBuildVolumeMetricsEachCollection(t *testing.T) {
resp := makeTestVolumeListResponse(
&master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100},
&master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200},
)
// EACH_COLLECTION passes all volumes through; filtering happens in the handler
metrics, _, err := buildVolumeMetrics(resp, collectionFilterEach)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(metrics) != 2 {
t.Fatalf("expected 2 metrics, got %d", len(metrics))
}
}
func TestBuildVolumeMetricsRegexFilter(t *testing.T) {
resp := makeTestVolumeListResponse(
&master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100},
&master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200},
&master_pb.VolumeInformationMessage{Id: 3, Collection: "photos-backup", Size: 300},
)
metrics, _, err := buildVolumeMetrics(resp, "^photos$")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(metrics) != 1 {
t.Fatalf("expected 1 metric, got %d", len(metrics))
}
if metrics[0].Collection != "photos" {
t.Fatalf("expected collection 'photos', got %q", metrics[0].Collection)
}
}
func TestBuildVolumeMetricsInvalidRegex(t *testing.T) {
resp := makeTestVolumeListResponse(
&master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100},
)
_, _, err := buildVolumeMetrics(resp, "[invalid")
if err == nil {
t.Fatal("expected error for invalid regex")
}
if !isConfigError(err) {
t.Fatalf("expected config error for invalid regex, got: %v", err)
}
}
Loading…
Cancel
Save