Browse Source

fix(plugin/worker): make VacuumHandler report MaxExecutionConcurrency from worker startup flag (#8435)

* fix(plugin/worker): make VacuumHandler report MaxExecutionConcurrency from worker startup flag

Previously, MaxExecutionConcurrency was hardcoded to 2 in VacuumHandler.Capability().
The scheduler's schedulerWorkerExecutionLimit() takes the minimum of the UI-configured
PerWorkerExecutionConcurrency and the worker-reported capability limit, so the hardcoded
value silently capped each worker to 2 concurrent vacuum executions regardless of the
--max-execute flag passed at worker startup.

Pass maxExecutionConcurrency into NewVacuumHandler() and wire it through
buildPluginWorkerHandler/buildPluginWorkerHandlers so the capability reflects the actual
worker configuration. The default falls back to 2 when the value is unset or zero.

* Update weed/command/worker_runtime.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Anton Ustyugov <anton@devops>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
pull/8372/merge
Anton 1 day ago
committed by GitHub
parent
commit
427c975ff3
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 4
      weed/command/mini.go
  2. 25
      weed/command/plugin_worker_test.go
  3. 18
      weed/command/worker_runtime.go
  4. 22
      weed/plugin/worker/vacuum_handler.go
  5. 4
      weed/plugin/worker/vacuum_handler_test.go

4
weed/command/mini.go

@ -1186,7 +1186,7 @@ func startMiniPluginWorker(ctx context.Context) {
util.LoadConfiguration("security", false) util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker") grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker")
handlers, err := buildPluginWorkerHandlers(defaultMiniPluginJobTypes, grpcDialOption)
handlers, err := buildPluginWorkerHandlers(defaultMiniPluginJobTypes, grpcDialOption, int(pluginworker.DefaultMaxExecutionConcurrency))
if err != nil { if err != nil {
glog.Fatalf("Failed to build mini plugin worker handlers: %v", err) glog.Fatalf("Failed to build mini plugin worker handlers: %v", err)
} }
@ -1204,7 +1204,7 @@ func startMiniPluginWorker(ctx context.Context) {
HeartbeatInterval: 15 * time.Second, HeartbeatInterval: 15 * time.Second,
ReconnectDelay: 5 * time.Second, ReconnectDelay: 5 * time.Second,
MaxDetectionConcurrency: 1, MaxDetectionConcurrency: 1,
MaxExecutionConcurrency: 2,
MaxExecutionConcurrency: int(pluginworker.DefaultMaxExecutionConcurrency),
GrpcDialOption: grpcDialOption, GrpcDialOption: grpcDialOption,
Handlers: handlers, Handlers: handlers,
}) })

25
weed/command/plugin_worker_test.go

@ -9,6 +9,7 @@ import (
"strings" "strings"
"testing" "testing"
pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
) )
@ -16,7 +17,9 @@ import (
func TestBuildPluginWorkerHandler(t *testing.T) { func TestBuildPluginWorkerHandler(t *testing.T) {
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
handler, err := buildPluginWorkerHandler("vacuum", dialOption)
testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency)
handler, err := buildPluginWorkerHandler("vacuum", dialOption, testMaxConcurrency)
if err != nil { if err != nil {
t.Fatalf("buildPluginWorkerHandler(vacuum) err = %v", err) t.Fatalf("buildPluginWorkerHandler(vacuum) err = %v", err)
} }
@ -24,7 +27,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) {
t.Fatalf("expected non-nil handler") t.Fatalf("expected non-nil handler")
} }
handler, err = buildPluginWorkerHandler("", dialOption)
handler, err = buildPluginWorkerHandler("", dialOption, testMaxConcurrency)
if err != nil { if err != nil {
t.Fatalf("buildPluginWorkerHandler(default) err = %v", err) t.Fatalf("buildPluginWorkerHandler(default) err = %v", err)
} }
@ -32,7 +35,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) {
t.Fatalf("expected non-nil default handler") t.Fatalf("expected non-nil default handler")
} }
handler, err = buildPluginWorkerHandler("volume_balance", dialOption)
handler, err = buildPluginWorkerHandler("volume_balance", dialOption, testMaxConcurrency)
if err != nil { if err != nil {
t.Fatalf("buildPluginWorkerHandler(volume_balance) err = %v", err) t.Fatalf("buildPluginWorkerHandler(volume_balance) err = %v", err)
} }
@ -40,7 +43,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) {
t.Fatalf("expected non-nil volume_balance handler") t.Fatalf("expected non-nil volume_balance handler")
} }
handler, err = buildPluginWorkerHandler("balance", dialOption)
handler, err = buildPluginWorkerHandler("balance", dialOption, testMaxConcurrency)
if err != nil { if err != nil {
t.Fatalf("buildPluginWorkerHandler(balance alias) err = %v", err) t.Fatalf("buildPluginWorkerHandler(balance alias) err = %v", err)
} }
@ -48,7 +51,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) {
t.Fatalf("expected non-nil balance alias handler") t.Fatalf("expected non-nil balance alias handler")
} }
handler, err = buildPluginWorkerHandler("erasure_coding", dialOption)
handler, err = buildPluginWorkerHandler("erasure_coding", dialOption, testMaxConcurrency)
if err != nil { if err != nil {
t.Fatalf("buildPluginWorkerHandler(erasure_coding) err = %v", err) t.Fatalf("buildPluginWorkerHandler(erasure_coding) err = %v", err)
} }
@ -56,7 +59,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) {
t.Fatalf("expected non-nil erasure_coding handler") t.Fatalf("expected non-nil erasure_coding handler")
} }
handler, err = buildPluginWorkerHandler("ec", dialOption)
handler, err = buildPluginWorkerHandler("ec", dialOption, testMaxConcurrency)
if err != nil { if err != nil {
t.Fatalf("buildPluginWorkerHandler(ec alias) err = %v", err) t.Fatalf("buildPluginWorkerHandler(ec alias) err = %v", err)
} }
@ -64,7 +67,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) {
t.Fatalf("expected non-nil ec alias handler") t.Fatalf("expected non-nil ec alias handler")
} }
_, err = buildPluginWorkerHandler("unknown", dialOption)
_, err = buildPluginWorkerHandler("unknown", dialOption, testMaxConcurrency)
if err == nil { if err == nil {
t.Fatalf("expected unsupported job type error") t.Fatalf("expected unsupported job type error")
} }
@ -73,7 +76,9 @@ func TestBuildPluginWorkerHandler(t *testing.T) {
func TestBuildPluginWorkerHandlers(t *testing.T) { func TestBuildPluginWorkerHandlers(t *testing.T) {
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
handlers, err := buildPluginWorkerHandlers("vacuum,volume_balance,erasure_coding", dialOption)
testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency)
handlers, err := buildPluginWorkerHandlers("vacuum,volume_balance,erasure_coding", dialOption, testMaxConcurrency)
if err != nil { if err != nil {
t.Fatalf("buildPluginWorkerHandlers(list) err = %v", err) t.Fatalf("buildPluginWorkerHandlers(list) err = %v", err)
} }
@ -81,7 +86,7 @@ func TestBuildPluginWorkerHandlers(t *testing.T) {
t.Fatalf("expected 3 handlers, got %d", len(handlers)) t.Fatalf("expected 3 handlers, got %d", len(handlers))
} }
handlers, err = buildPluginWorkerHandlers("balance,ec,vacuum,balance", dialOption)
handlers, err = buildPluginWorkerHandlers("balance,ec,vacuum,balance", dialOption, testMaxConcurrency)
if err != nil { if err != nil {
t.Fatalf("buildPluginWorkerHandlers(aliases) err = %v", err) t.Fatalf("buildPluginWorkerHandlers(aliases) err = %v", err)
} }
@ -89,7 +94,7 @@ func TestBuildPluginWorkerHandlers(t *testing.T) {
t.Fatalf("expected deduped 3 handlers, got %d", len(handlers)) t.Fatalf("expected deduped 3 handlers, got %d", len(handlers))
} }
_, err = buildPluginWorkerHandlers("unknown,vacuum", dialOption)
_, err = buildPluginWorkerHandlers("unknown,vacuum", dialOption, testMaxConcurrency)
if err == nil { if err == nil {
t.Fatalf("expected unsupported job type error") t.Fatalf("expected unsupported job type error")
} }

18
weed/command/worker_runtime.go

@ -81,7 +81,7 @@ func runPluginWorkerWithOptions(options pluginWorkerRunOptions) bool {
return false return false
} }
handlers, err := buildPluginWorkerHandlers(options.JobTypes, dialOption)
handlers, err := buildPluginWorkerHandlers(options.JobTypes, dialOption, options.MaxExecute)
if err != nil { if err != nil {
glog.Errorf("Failed to build plugin worker handlers: %v", err) glog.Errorf("Failed to build plugin worker handlers: %v", err)
return false return false
@ -157,7 +157,13 @@ func resolvePluginWorkerID(explicitID string, workingDir string) (string, error)
return generated, nil return generated, nil
} }
func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption) (pluginworker.JobHandler, error) {
// buildPluginWorkerHandler constructs the JobHandler for the given job type.
// maxExecute is forwarded to handlers that use it to report their own
// MaxExecutionConcurrency in Capability for consistency and future-proofing.
// The scheduler's effective per-worker MaxExecutionConcurrency is derived from
// the worker-level configuration (e.g. WorkerOptions.MaxExecutionConcurrency),
// not directly from the handler's Capability.
func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExecute int) (pluginworker.JobHandler, error) {
canonicalJobType, err := canonicalPluginWorkerJobType(jobType) canonicalJobType, err := canonicalPluginWorkerJobType(jobType)
if err != nil { if err != nil {
return nil, err return nil, err
@ -165,7 +171,7 @@ func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption) (plugi
switch canonicalJobType { switch canonicalJobType {
case "vacuum": case "vacuum":
return pluginworker.NewVacuumHandler(dialOption), nil
return pluginworker.NewVacuumHandler(dialOption, int32(maxExecute)), nil
case "volume_balance": case "volume_balance":
return pluginworker.NewVolumeBalanceHandler(dialOption), nil return pluginworker.NewVolumeBalanceHandler(dialOption), nil
case "erasure_coding": case "erasure_coding":
@ -175,7 +181,9 @@ func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption) (plugi
} }
} }
func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption) ([]pluginworker.JobHandler, error) {
// buildPluginWorkerHandlers constructs a deduplicated slice of JobHandlers for
// the comma-separated jobTypes string, forwarding maxExecute to each handler.
func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption, maxExecute int) ([]pluginworker.JobHandler, error) {
parsedJobTypes, err := parsePluginWorkerJobTypes(jobTypes) parsedJobTypes, err := parsePluginWorkerJobTypes(jobTypes)
if err != nil { if err != nil {
return nil, err return nil, err
@ -183,7 +191,7 @@ func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption) ([]p
handlers := make([]pluginworker.JobHandler, 0, len(parsedJobTypes)) handlers := make([]pluginworker.JobHandler, 0, len(parsedJobTypes))
for _, jobType := range parsedJobTypes { for _, jobType := range parsedJobTypes {
handler, buildErr := buildPluginWorkerHandler(jobType, dialOption)
handler, buildErr := buildPluginWorkerHandler(jobType, dialOption, maxExecute)
if buildErr != nil { if buildErr != nil {
return nil, buildErr return nil, buildErr
} }

22
weed/plugin/worker/vacuum_handler.go

@ -22,25 +22,37 @@ import (
) )
const ( const (
defaultVacuumTaskBatchSize = int32(1000)
defaultVacuumTaskBatchSize = int32(1000)
DefaultMaxExecutionConcurrency = int32(2)
) )
// VacuumHandler is the plugin job handler for vacuum job type. // VacuumHandler is the plugin job handler for vacuum job type.
type VacuumHandler struct { type VacuumHandler struct {
grpcDialOption grpc.DialOption
grpcDialOption grpc.DialOption
maxExecutionConcurrency int32
} }
func NewVacuumHandler(grpcDialOption grpc.DialOption) *VacuumHandler {
return &VacuumHandler{grpcDialOption: grpcDialOption}
// NewVacuumHandler creates a VacuumHandler with the given gRPC dial option and
// maximum execution concurrency. When maxExecutionConcurrency is zero or
// negative, DefaultMaxExecutionConcurrency is used as the fallback.
func NewVacuumHandler(grpcDialOption grpc.DialOption, maxExecutionConcurrency int32) *VacuumHandler {
return &VacuumHandler{grpcDialOption: grpcDialOption, maxExecutionConcurrency: maxExecutionConcurrency}
} }
// Capability returns the job type capability for the vacuum handler.
// MaxExecutionConcurrency reflects the value passed at construction time,
// falling back to DefaultMaxExecutionConcurrency when unset.
func (h *VacuumHandler) Capability() *plugin_pb.JobTypeCapability { func (h *VacuumHandler) Capability() *plugin_pb.JobTypeCapability {
maxExec := h.maxExecutionConcurrency
if maxExec <= 0 {
maxExec = DefaultMaxExecutionConcurrency
}
return &plugin_pb.JobTypeCapability{ return &plugin_pb.JobTypeCapability{
JobType: "vacuum", JobType: "vacuum",
CanDetect: true, CanDetect: true,
CanExecute: true, CanExecute: true,
MaxDetectionConcurrency: 1, MaxDetectionConcurrency: 1,
MaxExecutionConcurrency: 2,
MaxExecutionConcurrency: maxExec,
DisplayName: "Volume Vacuum", DisplayName: "Volume Vacuum",
Description: "Reclaims disk space by removing deleted files from volumes", Description: "Reclaims disk space by removing deleted files from volumes",
} }

4
weed/plugin/worker/vacuum_handler_test.go

@ -141,7 +141,7 @@ func TestShouldSkipDetectionByInterval(t *testing.T) {
} }
func TestVacuumHandlerRejectsUnsupportedJobType(t *testing.T) { func TestVacuumHandlerRejectsUnsupportedJobType(t *testing.T) {
handler := NewVacuumHandler(nil)
handler := NewVacuumHandler(nil, 0)
err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{
JobType: "balance", JobType: "balance",
}, noopDetectionSender{}) }, noopDetectionSender{})
@ -158,7 +158,7 @@ func TestVacuumHandlerRejectsUnsupportedJobType(t *testing.T) {
} }
func TestVacuumHandlerDetectSkipsByMinInterval(t *testing.T) { func TestVacuumHandlerDetectSkipsByMinInterval(t *testing.T) {
handler := NewVacuumHandler(nil)
handler := NewVacuumHandler(nil, 0)
sender := &recordingDetectionSender{} sender := &recordingDetectionSender{}
err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{
JobType: "vacuum", JobType: "vacuum",

Loading…
Cancel
Save