diff --git a/weed/command/mini.go b/weed/command/mini.go index bb7b6ebc3..051d319da 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -1186,7 +1186,7 @@ func startMiniPluginWorker(ctx context.Context) { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker") - handlers, err := buildPluginWorkerHandlers(defaultMiniPluginJobTypes, grpcDialOption) + handlers, err := buildPluginWorkerHandlers(defaultMiniPluginJobTypes, grpcDialOption, int(pluginworker.DefaultMaxExecutionConcurrency)) if err != nil { glog.Fatalf("Failed to build mini plugin worker handlers: %v", err) } @@ -1204,7 +1204,7 @@ func startMiniPluginWorker(ctx context.Context) { HeartbeatInterval: 15 * time.Second, ReconnectDelay: 5 * time.Second, MaxDetectionConcurrency: 1, - MaxExecutionConcurrency: 2, + MaxExecutionConcurrency: int(pluginworker.DefaultMaxExecutionConcurrency), GrpcDialOption: grpcDialOption, Handlers: handlers, }) diff --git a/weed/command/plugin_worker_test.go b/weed/command/plugin_worker_test.go index 7c357cbc0..56f4f38c4 100644 --- a/weed/command/plugin_worker_test.go +++ b/weed/command/plugin_worker_test.go @@ -9,6 +9,7 @@ import ( "strings" "testing" + pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -16,7 +17,9 @@ import ( func TestBuildPluginWorkerHandler(t *testing.T) { dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) - handler, err := buildPluginWorkerHandler("vacuum", dialOption) + testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency) + + handler, err := buildPluginWorkerHandler("vacuum", dialOption, testMaxConcurrency) if err != nil { t.Fatalf("buildPluginWorkerHandler(vacuum) err = %v", err) } @@ -24,7 +27,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil handler") } - handler, err = buildPluginWorkerHandler("", dialOption) + handler, err = buildPluginWorkerHandler("", dialOption, testMaxConcurrency) if err != nil { t.Fatalf("buildPluginWorkerHandler(default) err = %v", err) } @@ -32,7 +35,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil default handler") } - handler, err = buildPluginWorkerHandler("volume_balance", dialOption) + handler, err = buildPluginWorkerHandler("volume_balance", dialOption, testMaxConcurrency) if err != nil { t.Fatalf("buildPluginWorkerHandler(volume_balance) err = %v", err) } @@ -40,7 +43,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil volume_balance handler") } - handler, err = buildPluginWorkerHandler("balance", dialOption) + handler, err = buildPluginWorkerHandler("balance", dialOption, testMaxConcurrency) if err != nil { t.Fatalf("buildPluginWorkerHandler(balance alias) err = %v", err) } @@ -48,7 +51,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil balance alias handler") } - handler, err = buildPluginWorkerHandler("erasure_coding", dialOption) + handler, err = buildPluginWorkerHandler("erasure_coding", dialOption, testMaxConcurrency) if err != nil { t.Fatalf("buildPluginWorkerHandler(erasure_coding) err = %v", err) } @@ -56,7 +59,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil erasure_coding handler") } - handler, err = buildPluginWorkerHandler("ec", dialOption) + handler, err = buildPluginWorkerHandler("ec", dialOption, testMaxConcurrency) if err != nil { t.Fatalf("buildPluginWorkerHandler(ec alias) err = %v", err) } @@ -64,7 +67,7 @@ func TestBuildPluginWorkerHandler(t *testing.T) { t.Fatalf("expected non-nil ec alias handler") } - _, err = buildPluginWorkerHandler("unknown", dialOption) + _, err = buildPluginWorkerHandler("unknown", dialOption, testMaxConcurrency) if err == nil { t.Fatalf("expected unsupported job type error") } @@ -73,7 +76,9 @@ func TestBuildPluginWorkerHandler(t *testing.T) { func TestBuildPluginWorkerHandlers(t *testing.T) { dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) - handlers, err := buildPluginWorkerHandlers("vacuum,volume_balance,erasure_coding", dialOption) + testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency) + + handlers, err := buildPluginWorkerHandlers("vacuum,volume_balance,erasure_coding", dialOption, testMaxConcurrency) if err != nil { t.Fatalf("buildPluginWorkerHandlers(list) err = %v", err) } @@ -81,7 +86,7 @@ func TestBuildPluginWorkerHandlers(t *testing.T) { t.Fatalf("expected 3 handlers, got %d", len(handlers)) } - handlers, err = buildPluginWorkerHandlers("balance,ec,vacuum,balance", dialOption) + handlers, err = buildPluginWorkerHandlers("balance,ec,vacuum,balance", dialOption, testMaxConcurrency) if err != nil { t.Fatalf("buildPluginWorkerHandlers(aliases) err = %v", err) } @@ -89,7 +94,7 @@ func TestBuildPluginWorkerHandlers(t *testing.T) { t.Fatalf("expected deduped 3 handlers, got %d", len(handlers)) } - _, err = buildPluginWorkerHandlers("unknown,vacuum", dialOption) + _, err = buildPluginWorkerHandlers("unknown,vacuum", dialOption, testMaxConcurrency) if err == nil { t.Fatalf("expected unsupported job type error") } diff --git a/weed/command/worker_runtime.go b/weed/command/worker_runtime.go index 20069a14d..27c381d4a 100644 --- a/weed/command/worker_runtime.go +++ b/weed/command/worker_runtime.go @@ -81,7 +81,7 @@ func runPluginWorkerWithOptions(options pluginWorkerRunOptions) bool { return false } - handlers, err := buildPluginWorkerHandlers(options.JobTypes, dialOption) + handlers, err := buildPluginWorkerHandlers(options.JobTypes, dialOption, options.MaxExecute) if err != nil { glog.Errorf("Failed to build plugin worker handlers: %v", err) return false @@ -157,7 +157,13 @@ func resolvePluginWorkerID(explicitID string, workingDir string) (string, error) return generated, nil } -func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption) (pluginworker.JobHandler, error) { +// buildPluginWorkerHandler constructs the JobHandler for the given job type. +// maxExecute is forwarded to handlers that use it to report their own +// MaxExecutionConcurrency in Capability for consistency and future-proofing. +// The scheduler's effective per-worker MaxExecutionConcurrency is derived from +// the worker-level configuration (e.g. WorkerOptions.MaxExecutionConcurrency), +// not directly from the handler's Capability. +func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExecute int) (pluginworker.JobHandler, error) { canonicalJobType, err := canonicalPluginWorkerJobType(jobType) if err != nil { return nil, err @@ -165,7 +171,7 @@ func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption) (plugi switch canonicalJobType { case "vacuum": - return pluginworker.NewVacuumHandler(dialOption), nil + return pluginworker.NewVacuumHandler(dialOption, int32(maxExecute)), nil case "volume_balance": return pluginworker.NewVolumeBalanceHandler(dialOption), nil case "erasure_coding": @@ -175,7 +181,9 @@ func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption) (plugi } } -func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption) ([]pluginworker.JobHandler, error) { +// buildPluginWorkerHandlers constructs a deduplicated slice of JobHandlers for +// the comma-separated jobTypes string, forwarding maxExecute to each handler. +func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption, maxExecute int) ([]pluginworker.JobHandler, error) { parsedJobTypes, err := parsePluginWorkerJobTypes(jobTypes) if err != nil { return nil, err @@ -183,7 +191,7 @@ func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption) ([]p handlers := make([]pluginworker.JobHandler, 0, len(parsedJobTypes)) for _, jobType := range parsedJobTypes { - handler, buildErr := buildPluginWorkerHandler(jobType, dialOption) + handler, buildErr := buildPluginWorkerHandler(jobType, dialOption, maxExecute) if buildErr != nil { return nil, buildErr } diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index 6326a45b1..fdb51b5fb 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -22,25 +22,37 @@ import ( ) const ( - defaultVacuumTaskBatchSize = int32(1000) + defaultVacuumTaskBatchSize = int32(1000) + DefaultMaxExecutionConcurrency = int32(2) ) // VacuumHandler is the plugin job handler for vacuum job type. type VacuumHandler struct { - grpcDialOption grpc.DialOption + grpcDialOption grpc.DialOption + maxExecutionConcurrency int32 } -func NewVacuumHandler(grpcDialOption grpc.DialOption) *VacuumHandler { - return &VacuumHandler{grpcDialOption: grpcDialOption} +// NewVacuumHandler creates a VacuumHandler with the given gRPC dial option and +// maximum execution concurrency. When maxExecutionConcurrency is zero or +// negative, DefaultMaxExecutionConcurrency is used as the fallback. +func NewVacuumHandler(grpcDialOption grpc.DialOption, maxExecutionConcurrency int32) *VacuumHandler { + return &VacuumHandler{grpcDialOption: grpcDialOption, maxExecutionConcurrency: maxExecutionConcurrency} } +// Capability returns the job type capability for the vacuum handler. +// MaxExecutionConcurrency reflects the value passed at construction time, +// falling back to DefaultMaxExecutionConcurrency when unset. func (h *VacuumHandler) Capability() *plugin_pb.JobTypeCapability { + maxExec := h.maxExecutionConcurrency + if maxExec <= 0 { + maxExec = DefaultMaxExecutionConcurrency + } return &plugin_pb.JobTypeCapability{ JobType: "vacuum", CanDetect: true, CanExecute: true, MaxDetectionConcurrency: 1, - MaxExecutionConcurrency: 2, + MaxExecutionConcurrency: maxExec, DisplayName: "Volume Vacuum", Description: "Reclaims disk space by removing deleted files from volumes", } diff --git a/weed/plugin/worker/vacuum_handler_test.go b/weed/plugin/worker/vacuum_handler_test.go index 3f2528974..05454266d 100644 --- a/weed/plugin/worker/vacuum_handler_test.go +++ b/weed/plugin/worker/vacuum_handler_test.go @@ -141,7 +141,7 @@ func TestShouldSkipDetectionByInterval(t *testing.T) { } func TestVacuumHandlerRejectsUnsupportedJobType(t *testing.T) { - handler := NewVacuumHandler(nil) + handler := NewVacuumHandler(nil, 0) err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ JobType: "balance", }, noopDetectionSender{}) @@ -158,7 +158,7 @@ func TestVacuumHandlerRejectsUnsupportedJobType(t *testing.T) { } func TestVacuumHandlerDetectSkipsByMinInterval(t *testing.T) { - handler := NewVacuumHandler(nil) + handler := NewVacuumHandler(nil, 0) sender := &recordingDetectionSender{} err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{ JobType: "vacuum",