diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index fd3a1bb00..c23c7fd2b 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -23,6 +23,10 @@ import ( const ( defaultBalanceTimeoutSeconds = int32(10 * 60) maxProposalStringLength = 200 + + // Collection filter mode constants. + collectionFilterAll = "ALL_COLLECTIONS" + collectionFilterEach = "EACH_COLLECTION" ) func init() { @@ -85,8 +89,8 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { { Name: "collection_filter", Label: "Collection Filter", - Description: "Only detect balance opportunities in this collection when set.", - Placeholder: "all collections", + Description: "Filter collections for balance detection. Use ALL_COLLECTIONS (default) to treat all volumes as one pool, EACH_COLLECTION to run detection separately per collection, or a regex pattern to match specific collections.", + Placeholder: "ALL_COLLECTIONS", FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, }, @@ -286,9 +290,54 @@ func (h *VolumeBalanceHandler) Detect( clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} maxResults := int(request.MaxResults) - results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) - if err != nil { - return err + + var results []*workertypes.TaskDetectionResult + var hasMore bool + + if collectionFilter == collectionFilterEach { + // Group metrics by collection in a single pass (O(N) instead of O(C*N)) + metricsByCollection := make(map[string][]*workertypes.VolumeHealthMetrics) + for _, m := range metrics { + if m == nil { + continue + } + metricsByCollection[m.Collection] = append(metricsByCollection[m.Collection], m) + } + collections := make([]string, 0, len(metricsByCollection)) + for c := range metricsByCollection { + collections = append(collections, c) + } + sort.Strings(collections) + + budget := maxResults + unlimitedBudget := budget <= 0 + for _, collection := range collections { + if !unlimitedBudget && budget <= 0 { + hasMore = true + break + } + perCollectionLimit := budget + if unlimitedBudget { + perCollectionLimit = 0 // Detection treats <= 0 as unbounded + } + perResults, perHasMore, perErr := balancetask.Detection(metricsByCollection[collection], clusterInfo, workerConfig.TaskConfig, perCollectionLimit) + if perErr != nil { + return perErr + } + results = append(results, perResults...) + if !unlimitedBudget { + budget -= len(perResults) + } + if perHasMore { + hasMore = true + } + } + } else { + var err error + results, hasMore, err = balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) + if err != nil { + return err + } } if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, activeTopology, workerConfig.TaskConfig, results); traceErr != nil { diff --git a/weed/plugin/worker/volume_metrics.go b/weed/plugin/worker/volume_metrics.go index 32a11f489..454eff079 100644 --- a/weed/plugin/worker/volume_metrics.go +++ b/weed/plugin/worker/volume_metrics.go @@ -2,7 +2,9 @@ package pluginworker import ( "context" + "errors" "fmt" + "regexp" "sort" "strings" "time" @@ -11,7 +13,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/util/wildcard" workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types" "google.golang.org/grpc" ) @@ -38,6 +39,11 @@ func collectVolumeMetricsFromMasters( metrics, activeTopology, buildErr := buildVolumeMetrics(response, collectionFilter) if buildErr != nil { + // Configuration errors (e.g. invalid regex) will fail on every master, + // so return immediately instead of masking them with retries. + if isConfigError(buildErr) { + return nil, nil, buildErr + } glog.Warningf("Plugin worker failed to build metrics from master %s: %v", masterAddress, buildErr) continue } @@ -93,7 +99,16 @@ func buildVolumeMetrics( return nil, nil, err } - patterns := wildcard.CompileWildcardMatchers(collectionFilter) + var collectionRegex *regexp.Regexp + trimmedFilter := strings.TrimSpace(collectionFilter) + if trimmedFilter != "" && trimmedFilter != collectionFilterAll && trimmedFilter != collectionFilterEach && trimmedFilter != "*" { + var err error + collectionRegex, err = regexp.Compile(trimmedFilter) + if err != nil { + return nil, nil, &configError{err: fmt.Errorf("invalid collection_filter regex %q: %w", trimmedFilter, err)} + } + } + volumeSizeLimitBytes := uint64(response.VolumeSizeLimitMb) * 1024 * 1024 now := time.Now() metrics := make([]*workertypes.VolumeHealthMetrics, 0, 256) @@ -103,7 +118,7 @@ func buildVolumeMetrics( for _, node := range rack.DataNodeInfos { for diskType, diskInfo := range node.DiskInfos { for _, volume := range diskInfo.VolumeInfos { - if !wildcard.MatchesAnyWildcard(patterns, volume.Collection) { + if collectionRegex != nil && !collectionRegex.MatchString(volume.Collection) { continue } @@ -148,6 +163,19 @@ func buildVolumeMetrics( return metrics, activeTopology, nil } +// configError wraps configuration errors that should not be retried across masters. +type configError struct { + err error +} + +func (e *configError) Error() string { return e.err.Error() } +func (e *configError) Unwrap() error { return e.err } + +func isConfigError(err error) bool { + var ce *configError + return errors.As(err, &ce) +} + func masterAddressCandidates(address string) []string { trimmed := strings.TrimSpace(address) if trimmed == "" { diff --git a/weed/plugin/worker/volume_metrics_test.go b/weed/plugin/worker/volume_metrics_test.go new file mode 100644 index 000000000..6e5e6904f --- /dev/null +++ b/weed/plugin/worker/volume_metrics_test.go @@ -0,0 +1,109 @@ +package pluginworker + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +func makeTestVolumeListResponse(volumes ...*master_pb.VolumeInformationMessage) *master_pb.VolumeListResponse { + return &master_pb.VolumeListResponse{ + VolumeSizeLimitMb: 30000, + TopologyInfo: &master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{ + { + Id: "dc1", + RackInfos: []*master_pb.RackInfo{ + { + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{ + { + Id: "server1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + VolumeInfos: volumes, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } +} + +func TestBuildVolumeMetricsEmptyFilter(t *testing.T) { + resp := makeTestVolumeListResponse( + &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, + &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, + ) + metrics, _, err := buildVolumeMetrics(resp, "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(metrics) != 2 { + t.Fatalf("expected 2 metrics, got %d", len(metrics)) + } +} + +func TestBuildVolumeMetricsAllCollections(t *testing.T) { + resp := makeTestVolumeListResponse( + &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, + &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, + ) + metrics, _, err := buildVolumeMetrics(resp, collectionFilterAll) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(metrics) != 2 { + t.Fatalf("expected 2 metrics, got %d", len(metrics)) + } +} + +func TestBuildVolumeMetricsEachCollection(t *testing.T) { + resp := makeTestVolumeListResponse( + &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, + &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, + ) + // EACH_COLLECTION passes all volumes through; filtering happens in the handler + metrics, _, err := buildVolumeMetrics(resp, collectionFilterEach) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(metrics) != 2 { + t.Fatalf("expected 2 metrics, got %d", len(metrics)) + } +} + +func TestBuildVolumeMetricsRegexFilter(t *testing.T) { + resp := makeTestVolumeListResponse( + &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, + &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, + &master_pb.VolumeInformationMessage{Id: 3, Collection: "photos-backup", Size: 300}, + ) + metrics, _, err := buildVolumeMetrics(resp, "^photos$") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(metrics) != 1 { + t.Fatalf("expected 1 metric, got %d", len(metrics)) + } + if metrics[0].Collection != "photos" { + t.Fatalf("expected collection 'photos', got %q", metrics[0].Collection) + } +} + +func TestBuildVolumeMetricsInvalidRegex(t *testing.T) { + resp := makeTestVolumeListResponse( + &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, + ) + _, _, err := buildVolumeMetrics(resp, "[invalid") + if err == nil { + t.Fatal("expected error for invalid regex") + } + if !isConfigError(err) { + t.Fatalf("expected config error for invalid regex, got: %v", err) + } +}