From 587c24ec89ea9c0c30d8275b83aa519a0c4cf1c3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2026 18:30:58 -0800 Subject: [PATCH] plugin worker: support job type categories (all, default, heavy) (#8547) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * plugin worker: add handler registry with job categories Introduce a self-registration pattern for plugin worker job handlers. Each handler can register itself via init() with a HandlerFactory that declares its job type, category (default/heavy), CLI aliases, and a builder function. ResolveHandlerFactories accepts a mix of category names ("all", "default", "heavy") and explicit job type names/aliases, returning the matching factories. This enables workers to be configured by resource profile rather than requiring explicit job type enumeration. * plugin worker: register all handlers via init() Each job handler now self-registers into the global handler registry with its canonical job type, category, CLI aliases, and build function: - vacuum: category=default - volume_balance: category=default - admin_script: category=default - erasure_coding: category=heavy - iceberg_maintenance: category=heavy Adding a new job type now only requires adding the init() call in the handler file itself — no other files need to be touched. * plugin worker: replace hardcoded job type switch with registry Remove buildPluginWorkerHandler, parsePluginWorkerJobTypes, and canonicalPluginWorkerJobType from worker_runtime.go. The simplified buildPluginWorkerHandlers now delegates to pluginworker.ResolveHandlerFactories, which resolves category names ("all", "default", "heavy") and explicit job type names/aliases. The default job type is changed from an explicit list to "all", so new handlers registered via init() are automatically picked up. Update all tests to use the new API. 
* plugin worker: update CLI help text for job categories Update the -jobType flag description and command examples to document category support (all, default, heavy) alongside explicit job type names. * plugin worker: address review feedback - Add CategoryAll constant; use typed constants in tokenAsCategory - Pre-allocate result slice in ResolveHandlerFactories - Add vacuum aliases (vol.vacuum, volume.vacuum) - List alias examples (ec, balance, iceberg) in -jobType flag help - Create handlers aggregator package for subpackage blank imports so new handler subpackages only need to be added in one place - Make category tests relationship-based (subset/union checks) instead of asserting exact handler counts - Add clarifying comments to worker_test.go and mini_plugin_test.go listing expected handler names next to count assertions --------- Co-authored-by: Copilot --- weed/command/mini_plugin_test.go | 18 +- weed/command/plugin_worker_test.go | 168 +++++++++++-------- weed/command/worker.go | 21 ++- weed/command/worker_runtime.go | 105 +++--------- weed/command/worker_test.go | 18 +- weed/plugin/worker/admin_script_handler.go | 11 ++ weed/plugin/worker/erasure_coding_handler.go | 11 ++ weed/plugin/worker/handler_registry.go | 130 ++++++++++++++ weed/plugin/worker/handlers/handlers.go | 9 + weed/plugin/worker/iceberg/handler.go | 11 ++ weed/plugin/worker/vacuum_handler.go | 11 ++ weed/plugin/worker/volume_balance_handler.go | 11 ++ 12 files changed, 349 insertions(+), 175 deletions(-) create mode 100644 weed/plugin/worker/handler_registry.go create mode 100644 weed/plugin/worker/handlers/handlers.go diff --git a/weed/command/mini_plugin_test.go b/weed/command/mini_plugin_test.go index 45f5e9482..096a4c668 100644 --- a/weed/command/mini_plugin_test.go +++ b/weed/command/mini_plugin_test.go @@ -1,13 +1,21 @@ package command -import "testing" +import ( + "testing" + + pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" + "google.golang.org/grpc" + 
"google.golang.org/grpc/credentials/insecure" +) func TestMiniDefaultPluginJobTypes(t *testing.T) { - jobTypes, err := parsePluginWorkerJobTypes(defaultMiniPluginJobTypes) + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + // defaultMiniPluginJobTypes is an explicit list: "vacuum,volume_balance,erasure_coding,admin_script" + handlers, err := buildPluginWorkerHandlers(defaultMiniPluginJobTypes, dialOption, int(pluginworker.DefaultMaxExecutionConcurrency), "") if err != nil { - t.Fatalf("parsePluginWorkerJobTypes(mini default) err = %v", err) + t.Fatalf("buildPluginWorkerHandlers(mini default) err = %v", err) } - if len(jobTypes) != 4 { - t.Fatalf("expected mini default job types to include 4 handlers, got %v", jobTypes) + if len(handlers) != 4 { + t.Fatalf("expected mini default job types to include 4 handlers, got %d", len(handlers)) } } diff --git a/weed/command/plugin_worker_test.go b/weed/command/plugin_worker_test.go index 26c250cb5..0a3722a4b 100644 --- a/weed/command/plugin_worker_test.go +++ b/weed/command/plugin_worker_test.go @@ -6,7 +6,6 @@ import ( "net/http/httptest" "os" "path/filepath" - "slices" "strings" "testing" @@ -15,68 +14,46 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func TestBuildPluginWorkerHandler(t *testing.T) { +func TestBuildPluginWorkerHandlerExplicitTypes(t *testing.T) { dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) - testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency) - handler, err := buildPluginWorkerHandler("vacuum", dialOption, testMaxConcurrency, "") - if err != nil { - t.Fatalf("buildPluginWorkerHandler(vacuum) err = %v", err) - } - if handler == nil { - t.Fatalf("expected non-nil handler") - } - - handler, err = buildPluginWorkerHandler("", dialOption, testMaxConcurrency, "") - if err != nil { - t.Fatalf("buildPluginWorkerHandler(default) err = %v", err) - } - if handler == nil { - t.Fatalf("expected non-nil default handler") - } - - handler, 
err = buildPluginWorkerHandler("volume_balance", dialOption, testMaxConcurrency, "") - if err != nil { - t.Fatalf("buildPluginWorkerHandler(volume_balance) err = %v", err) - } - if handler == nil { - t.Fatalf("expected non-nil volume_balance handler") - } - - handler, err = buildPluginWorkerHandler("balance", dialOption, testMaxConcurrency, "") - if err != nil { - t.Fatalf("buildPluginWorkerHandler(balance alias) err = %v", err) - } - if handler == nil { - t.Fatalf("expected non-nil balance alias handler") + for _, jobType := range []string{"vacuum", "volume_balance", "erasure_coding", "admin_script", "iceberg_maintenance"} { + handlers, err := buildPluginWorkerHandlers(jobType, dialOption, testMaxConcurrency, "") + if err != nil { + t.Fatalf("buildPluginWorkerHandlers(%s) err = %v", jobType, err) + } + if len(handlers) != 1 { + t.Fatalf("expected 1 handler for %s, got %d", jobType, len(handlers)) + } } +} - handler, err = buildPluginWorkerHandler("erasure_coding", dialOption, testMaxConcurrency, "") - if err != nil { - t.Fatalf("buildPluginWorkerHandler(erasure_coding) err = %v", err) - } - if handler == nil { - t.Fatalf("expected non-nil erasure_coding handler") - } +func TestBuildPluginWorkerHandlerAliases(t *testing.T) { + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency) - handler, err = buildPluginWorkerHandler("ec", dialOption, testMaxConcurrency, "") - if err != nil { - t.Fatalf("buildPluginWorkerHandler(ec alias) err = %v", err) - } - if handler == nil { - t.Fatalf("expected non-nil ec alias handler") + for _, alias := range []string{"balance", "ec", "iceberg", "admin", "script"} { + handlers, err := buildPluginWorkerHandlers(alias, dialOption, testMaxConcurrency, "") + if err != nil { + t.Fatalf("buildPluginWorkerHandlers(%s) err = %v", alias, err) + } + if len(handlers) != 1 { + t.Fatalf("expected 1 handler for alias %s, got %d", alias, len(handlers)) + } } +} - 
_, err = buildPluginWorkerHandler("unknown", dialOption, testMaxConcurrency, "") +func TestBuildPluginWorkerHandlerUnknown(t *testing.T) { + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + _, err := buildPluginWorkerHandlers("unknown", dialOption, 1, "") if err == nil { - t.Fatalf("expected unsupported job type error") + t.Fatalf("expected error for unknown job type") } } func TestBuildPluginWorkerHandlers(t *testing.T) { dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) - testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency) handlers, err := buildPluginWorkerHandlers("vacuum,volume_balance,erasure_coding", dialOption, testMaxConcurrency, "") @@ -101,50 +78,97 @@ func TestBuildPluginWorkerHandlers(t *testing.T) { } } -func TestParsePluginWorkerJobTypes(t *testing.T) { - jobTypes, err := parsePluginWorkerJobTypes("") +func TestBuildPluginWorkerHandlersCategories(t *testing.T) { + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency) + + allHandlers, err := buildPluginWorkerHandlers("all", dialOption, testMaxConcurrency, "") if err != nil { - t.Fatalf("parsePluginWorkerJobTypes(default) err = %v", err) + t.Fatalf("buildPluginWorkerHandlers(all) err = %v", err) + } + // "all" must include at least vacuum (default) plus erasure_coding and iceberg_maintenance (both heavy) + allNames := handlerJobTypes(allHandlers) + for _, required := range []string{"vacuum", "erasure_coding", "iceberg_maintenance"} { + if !allNames[required] { + t.Fatalf("'all' missing expected job type %q, got %v", required, allNames) + } } - if len(jobTypes) != 1 || jobTypes[0] != "vacuum" { - t.Fatalf("expected default [vacuum], got %v", jobTypes) + + defaultHandlers, err := buildPluginWorkerHandlers("default", dialOption, testMaxConcurrency, "") + if err != nil { + t.Fatalf("buildPluginWorkerHandlers(default) err = %v", err) } + defaultNames := 
handlerJobTypes(defaultHandlers) - jobTypes, err = parsePluginWorkerJobTypes(" volume_balance , ec , vacuum , volume_balance ") + heavyHandlers, err := buildPluginWorkerHandlers("heavy", dialOption, testMaxConcurrency, "") if err != nil { - t.Fatalf("parsePluginWorkerJobTypes(list) err = %v", err) + t.Fatalf("buildPluginWorkerHandlers(heavy) err = %v", err) + } + heavyNames := handlerJobTypes(heavyHandlers) + + // default and heavy must both be non-empty subsets of all + if len(defaultNames) == 0 { + t.Fatalf("'default' resolved no handlers") + } + if len(heavyNames) == 0 { + t.Fatalf("'heavy' resolved no handlers") } - if len(jobTypes) != 3 { - t.Fatalf("expected 3 deduped job types, got %d (%v)", len(jobTypes), jobTypes) + for name := range defaultNames { + if !allNames[name] { + t.Fatalf("default handler %q not in 'all'", name) + } } - if jobTypes[0] != "volume_balance" || jobTypes[1] != "erasure_coding" || jobTypes[2] != "vacuum" { - t.Fatalf("unexpected parsed order %v", jobTypes) + for name := range heavyNames { + if !allNames[name] { + t.Fatalf("heavy handler %q not in 'all'", name) + } } - if _, err = parsePluginWorkerJobTypes(" , "); err != nil { - t.Fatalf("expected empty list to resolve to default vacuum: %v", err) + // default and heavy must be disjoint and their union must equal all + for name := range defaultNames { + if heavyNames[name] { + t.Fatalf("handler %q appears in both default and heavy", name) + } + } + if len(defaultNames)+len(heavyNames) != len(allNames) { + t.Fatalf("union(default=%d, heavy=%d) != all(%d)", len(defaultNames), len(heavyNames), len(allNames)) } - jobTypes, err = parsePluginWorkerJobTypes("admin-script,script,admin_script") + // mix category + explicit: "default,iceberg" adds one heavy to default set + mixedHandlers, err := buildPluginWorkerHandlers("default,iceberg", dialOption, testMaxConcurrency, "") if err != nil { - t.Fatalf("parsePluginWorkerJobTypes(admin script aliases) err = %v", err) + 
t.Fatalf("buildPluginWorkerHandlers(default,iceberg) err = %v", err) } - if len(jobTypes) != 1 || jobTypes[0] != "admin_script" { - t.Fatalf("expected admin_script alias to resolve, got %v", jobTypes) + if len(mixedHandlers) != len(defaultHandlers)+1 { + t.Fatalf("expected default+1 handlers for 'default,iceberg', got %d (default=%d)", len(mixedHandlers), len(defaultHandlers)) } } func TestPluginWorkerDefaultJobTypes(t *testing.T) { - jobTypes, err := parsePluginWorkerJobTypes(defaultPluginWorkerJobTypes) + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + testMaxConcurrency := int(pluginworker.DefaultMaxExecutionConcurrency) + + // defaultPluginWorkerJobTypes is "all", so it should match the "all" category exactly + defaultHandlers, err := buildPluginWorkerHandlers(defaultPluginWorkerJobTypes, dialOption, testMaxConcurrency, "") + if err != nil { + t.Fatalf("buildPluginWorkerHandlers(default setting) err = %v", err) + } + allHandlers, err := buildPluginWorkerHandlers("all", dialOption, testMaxConcurrency, "") if err != nil { - t.Fatalf("parsePluginWorkerJobTypes(default setting) err = %v", err) + t.Fatalf("buildPluginWorkerHandlers(all) err = %v", err) } - if len(jobTypes) != 5 { - t.Fatalf("expected default job types to include 5 handlers, got %v", jobTypes) + if len(defaultHandlers) != len(allHandlers) { + t.Fatalf("default setting resolved %d handlers, 'all' resolved %d", len(defaultHandlers), len(allHandlers)) } - if !slices.Contains(jobTypes, "iceberg_maintenance") { - t.Fatalf("expected iceberg_maintenance in default job types, got %v", jobTypes) +} + +// handlerJobTypes returns the set of job type names from a slice of handlers. 
+func handlerJobTypes(handlers []pluginworker.JobHandler) map[string]bool { + m := make(map[string]bool, len(handlers)) + for _, h := range handlers { + m[h.Capability().JobType] = true } + return m } func TestResolvePluginWorkerID(t *testing.T) { diff --git a/weed/command/worker.go b/weed/command/worker.go index 962fc87f3..4db89b121 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -7,25 +7,30 @@ import ( ) var cmdWorker = &Command{ - UsageLine: "worker -admin= [-id=] [-jobType=vacuum,volume_balance,erasure_coding,admin_script] [-workingDir=] [-heartbeat=15s] [-reconnect=5s] [-maxDetect=1] [-maxExecute=4] [-metricsPort=] [-metricsIp=] [-debug]", + UsageLine: "worker -admin= [-id=] [-jobType=all] [-workingDir=] [-heartbeat=15s] [-reconnect=5s] [-maxDetect=1] [-maxExecute=4] [-metricsPort=] [-metricsIp=] [-debug]", Short: "start a plugin.proto worker process", Long: `Start an external plugin worker using weed/pb/plugin.proto over gRPC. -This command provides vacuum, volume_balance, erasure_coding, and admin_script job type -contracts with the plugin stream runtime, including descriptor delivery, -heartbeat/load reporting, detection, and execution. +This command provides plugin job type handlers for cluster maintenance, +including descriptor delivery, heartbeat/load reporting, detection, and execution. Behavior: - - Use -jobType to choose one or more plugin job handlers (comma-separated list) + - Use -jobType to choose handlers by category or explicit name (comma-separated) + - Categories: "all" (every registered handler), "default" (lightweight jobs), + "heavy" (resource-intensive jobs like erasure coding) + - Explicit job type names and aliases are still supported (e.g. "vacuum", "ec") + - Categories and explicit names can be mixed (e.g. 
"default,iceberg") - Use -workingDir to persist worker.id for stable worker identity across restarts - Use -metricsPort/-metricsIp to expose /health, /ready, and /metrics Examples: weed worker -admin=localhost:23646 - weed worker -admin=localhost:23646 -jobType=volume_balance + weed worker -admin=localhost:23646 -jobType=all + weed worker -admin=localhost:23646 -jobType=default + weed worker -admin=localhost:23646 -jobType=heavy + weed worker -admin=localhost:23646 -jobType=default,iceberg weed worker -admin=localhost:23646 -jobType=vacuum,volume_balance weed worker -admin=localhost:23646 -jobType=erasure_coding - weed worker -admin=localhost:23646 -jobType=admin_script weed worker -admin=admin.example.com:23646 -id=plugin-vacuum-a -heartbeat=10s weed worker -admin=localhost:23646 -workingDir=/var/lib/seaweedfs-plugin weed worker -admin=localhost:23646 -metricsPort=9327 -metricsIp=0.0.0.0 @@ -36,7 +41,7 @@ var ( workerAdminServer = cmdWorker.Flag.String("admin", "localhost:23646", "admin server address") workerID = cmdWorker.Flag.String("id", "", "worker ID (auto-generated when empty)") workerWorkingDir = cmdWorker.Flag.String("workingDir", "", "working directory for persistent worker state") - workerJobType = cmdWorker.Flag.String("jobType", defaultPluginWorkerJobTypes, "job types to serve (comma-separated list)") + workerJobType = cmdWorker.Flag.String("jobType", defaultPluginWorkerJobTypes, "job types or categories to serve: all, default, heavy, or explicit names/aliases such as ec, balance, iceberg (comma-separated)") workerHeartbeat = cmdWorker.Flag.Duration("heartbeat", 15*time.Second, "heartbeat interval") workerReconnect = cmdWorker.Flag.Duration("reconnect", 5*time.Second, "reconnect delay") workerMaxDetect = cmdWorker.Flag.Int("maxDetect", 1, "max concurrent detection requests") diff --git a/weed/command/worker_runtime.go b/weed/command/worker_runtime.go index c4387aea0..894edb552 100644 --- a/weed/command/worker_runtime.go +++ 
b/weed/command/worker_runtime.go @@ -15,7 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/seaweedfs/seaweedfs/weed/glog" pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" - icebergworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker/iceberg" + _ "github.com/seaweedfs/seaweedfs/weed/plugin/worker/handlers" // register all handler subpackages "github.com/seaweedfs/seaweedfs/weed/security" statsCollect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" @@ -24,7 +24,7 @@ import ( "google.golang.org/grpc" ) -const defaultPluginWorkerJobTypes = "vacuum,volume_balance,erasure_coding,admin_script,iceberg_maintenance" +const defaultPluginWorkerJobTypes = "all" type pluginWorkerRunOptions struct { AdminServer string @@ -138,102 +138,37 @@ func resolvePluginWorkerID(explicitID string, workingDir string) (string, error) return worker.GenerateOrLoadWorkerID(workingDir) } -// buildPluginWorkerHandler constructs the JobHandler for the given job type. -// maxExecute is forwarded to handlers that use it to report their own -// MaxExecutionConcurrency in Capability for consistency and future-proofing. -// The scheduler's effective per-worker MaxExecutionConcurrency is derived from -// the worker-level configuration (e.g. WorkerOptions.MaxExecutionConcurrency), -// not directly from the handler's Capability. 
-func buildPluginWorkerHandler(jobType string, dialOption grpc.DialOption, maxExecute int, workingDir string) (pluginworker.JobHandler, error) { - canonicalJobType, err := canonicalPluginWorkerJobType(jobType) - if err != nil { - return nil, err - } - - switch canonicalJobType { - case "vacuum": - return pluginworker.NewVacuumHandler(dialOption, int32(maxExecute)), nil - case "volume_balance": - return pluginworker.NewVolumeBalanceHandler(dialOption), nil - case "erasure_coding": - return pluginworker.NewErasureCodingHandler(dialOption, workingDir), nil - case "admin_script": - return pluginworker.NewAdminScriptHandler(dialOption), nil - case "iceberg_maintenance": - return icebergworker.NewHandler(dialOption), nil - default: - return nil, fmt.Errorf("unsupported plugin job type %q", canonicalJobType) +// buildPluginWorkerHandlers resolves the comma-separated jobTypes string +// (which may contain category names like "all", "default", "heavy" and/or +// explicit job type names/aliases) into a deduplicated slice of JobHandlers. +func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption, maxExecute int, workingDir string) ([]pluginworker.JobHandler, error) { + jobTypes = strings.TrimSpace(jobTypes) + if jobTypes == "" { + jobTypes = defaultPluginWorkerJobTypes } -} -// buildPluginWorkerHandlers constructs a deduplicated slice of JobHandlers for -// the comma-separated jobTypes string, forwarding maxExecute to each handler. 
-func buildPluginWorkerHandlers(jobTypes string, dialOption grpc.DialOption, maxExecute int, workingDir string) ([]pluginworker.JobHandler, error) { - parsedJobTypes, err := parsePluginWorkerJobTypes(jobTypes) + factories, err := pluginworker.ResolveHandlerFactories(jobTypes) if err != nil { return nil, err } - handlers := make([]pluginworker.JobHandler, 0, len(parsedJobTypes)) - for _, jobType := range parsedJobTypes { - handler, buildErr := buildPluginWorkerHandler(jobType, dialOption, maxExecute, workingDir) + opts := pluginworker.HandlerBuildOptions{ + GrpcDialOption: dialOption, + MaxExecute: maxExecute, + WorkingDir: workingDir, + } + + handlers := make([]pluginworker.JobHandler, 0, len(factories)) + for _, f := range factories { + handler, buildErr := f.Build(opts) if buildErr != nil { - return nil, buildErr + return nil, fmt.Errorf("building handler for %q: %w", f.JobType, buildErr) } handlers = append(handlers, handler) } return handlers, nil } -func parsePluginWorkerJobTypes(jobTypes string) ([]string, error) { - jobTypes = strings.TrimSpace(jobTypes) - if jobTypes == "" { - return []string{"vacuum"}, nil - } - - parts := strings.Split(jobTypes, ",") - parsed := make([]string, 0, len(parts)) - seen := make(map[string]struct{}, len(parts)) - - for _, part := range parts { - part = strings.TrimSpace(part) - if part == "" { - continue - } - canonical, err := canonicalPluginWorkerJobType(part) - if err != nil { - return nil, err - } - if _, found := seen[canonical]; found { - continue - } - seen[canonical] = struct{}{} - parsed = append(parsed, canonical) - } - - if len(parsed) == 0 { - return []string{"vacuum"}, nil - } - return parsed, nil -} - -func canonicalPluginWorkerJobType(jobType string) (string, error) { - switch strings.ToLower(strings.TrimSpace(jobType)) { - case "", "vacuum": - return "vacuum", nil - case "volume_balance", "balance", "volume.balance", "volume-balance": - return "volume_balance", nil - case "erasure_coding", "erasure-coding", 
"erasure.coding", "ec": - return "erasure_coding", nil - case "admin_script", "admin-script", "admin.script", "script", "admin": - return "admin_script", nil - case "iceberg_maintenance", "iceberg-maintenance", "iceberg.maintenance", "iceberg": - return "iceberg_maintenance", nil - default: - return "", fmt.Errorf("unsupported plugin job type %q", jobType) - } -} - func resolvePluginWorkerAdminServer(adminServer string) string { adminServer = strings.TrimSpace(adminServer) host, httpPort, hasExplicitGrpcPort, err := parsePluginWorkerAdminAddress(adminServer) diff --git a/weed/command/worker_test.go b/weed/command/worker_test.go index d800e20ab..8533b8978 100644 --- a/weed/command/worker_test.go +++ b/weed/command/worker_test.go @@ -1,13 +1,21 @@ package command -import "testing" +import ( + "testing" + + pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) func TestWorkerDefaultJobTypes(t *testing.T) { - jobTypes, err := parsePluginWorkerJobTypes(*workerJobType) + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + handlers, err := buildPluginWorkerHandlers(*workerJobType, dialOption, int(pluginworker.DefaultMaxExecutionConcurrency), "") if err != nil { - t.Fatalf("parsePluginWorkerJobTypes(default worker flag) err = %v", err) + t.Fatalf("buildPluginWorkerHandlers(default worker flag) err = %v", err) } - if len(jobTypes) != 5 { - t.Fatalf("expected default worker job types to include 5 handlers, got %v", jobTypes) + // Expected: vacuum, volume_balance, admin_script, erasure_coding, iceberg_maintenance + if len(handlers) != 5 { + t.Fatalf("expected default worker job types to include 5 handlers, got %d", len(handlers)) } } diff --git a/weed/plugin/worker/admin_script_handler.go b/weed/plugin/worker/admin_script_handler.go index 348e2c0b8..5338a1162 100644 --- a/weed/plugin/worker/admin_script_handler.go +++ b/weed/plugin/worker/admin_script_handler.go @@ 
-33,6 +33,17 @@ s3.clean.uploads -timeAgo=24h` var adminScriptTokenRegex = regexp.MustCompile(`'.*?'|".*?"|\S+`) +func init() { + RegisterHandler(HandlerFactory{ + JobType: "admin_script", + Category: CategoryDefault, + Aliases: []string{"admin-script", "admin.script", "script", "admin"}, + Build: func(opts HandlerBuildOptions) (JobHandler, error) { + return NewAdminScriptHandler(opts.GrpcDialOption), nil + }, + }) +} + type AdminScriptHandler struct { grpcDialOption grpc.DialOption } diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index df4959f17..881181ae6 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -20,6 +20,17 @@ import ( "google.golang.org/protobuf/proto" ) +func init() { + RegisterHandler(HandlerFactory{ + JobType: "erasure_coding", + Category: CategoryHeavy, + Aliases: []string{"erasure-coding", "erasure.coding", "ec"}, + Build: func(opts HandlerBuildOptions) (JobHandler, error) { + return NewErasureCodingHandler(opts.GrpcDialOption, opts.WorkingDir), nil + }, + }) +} + type erasureCodingWorkerConfig struct { TaskConfig *erasurecodingtask.Config MinIntervalSeconds int diff --git a/weed/plugin/worker/handler_registry.go b/weed/plugin/worker/handler_registry.go new file mode 100644 index 000000000..029382c06 --- /dev/null +++ b/weed/plugin/worker/handler_registry.go @@ -0,0 +1,130 @@ +package pluginworker + +import ( + "fmt" + "strings" + "sync" + + "google.golang.org/grpc" +) + +// JobCategory groups job types by resource profile so that workers can be +// configured with a category name instead of an explicit list of job types. 
+type JobCategory string + +const ( + CategoryAll JobCategory = "all" // pseudo-category matching every handler + CategoryDefault JobCategory = "default" // lightweight, safe for any worker + CategoryHeavy JobCategory = "heavy" // resource-intensive jobs +) + +// HandlerFactory describes how to build a JobHandler for a single job type. +type HandlerFactory struct { + // JobType is the canonical job type string (e.g. "vacuum"). + JobType string + // Category controls which category label selects this handler. + Category JobCategory + // Aliases are alternative CLI names that resolve to this job type + // (e.g. "ec" for "erasure_coding"). + Aliases []string + // Build constructs the JobHandler. + Build func(opts HandlerBuildOptions) (JobHandler, error) +} + +// HandlerBuildOptions carries parameters forwarded from the CLI to handler +// constructors. +type HandlerBuildOptions struct { + GrpcDialOption grpc.DialOption + MaxExecute int + WorkingDir string +} + +var ( + registryMu sync.Mutex + registry []HandlerFactory +) + +// RegisterHandler adds a handler factory to the global registry. +// It is intended to be called from handler init() functions. +func RegisterHandler(f HandlerFactory) { + registryMu.Lock() + defer registryMu.Unlock() + registry = append(registry, f) +} + +// ResolveHandlerFactories takes a comma-separated token list that can contain +// category names ("all", "default", "heavy") and/or explicit job type names +// (including aliases). It returns a deduplicated, ordered slice of factories. 
+func ResolveHandlerFactories(tokens string) ([]HandlerFactory, error) { + registryMu.Lock() + snapshot := make([]HandlerFactory, len(registry)) + copy(snapshot, registry) + registryMu.Unlock() + + parts := strings.Split(tokens, ",") + result := make([]HandlerFactory, 0, len(snapshot)) + seen := make(map[string]bool) + + for _, raw := range parts { + tok := strings.ToLower(strings.TrimSpace(raw)) + if tok == "" { + continue + } + + if cat, ok := tokenAsCategory(tok); ok { + for _, f := range snapshot { + if cat == CategoryAll || f.Category == cat { + if !seen[f.JobType] { + seen[f.JobType] = true + result = append(result, f) + } + } + } + continue + } + + f, err := findFactory(snapshot, tok) + if err != nil { + return nil, err + } + if !seen[f.JobType] { + seen[f.JobType] = true + result = append(result, f) + } + } + + if len(result) == 0 { + return nil, fmt.Errorf("no job types resolved from %q", tokens) + } + return result, nil +} + +// tokenAsCategory returns the category and true when tok is a known category +// keyword. "all" is treated as a special pseudo-category that matches every +// registered handler. 
+func tokenAsCategory(tok string) (JobCategory, bool) { + switch tok { + case string(CategoryAll): + return CategoryAll, true + case string(CategoryDefault): + return CategoryDefault, true + case string(CategoryHeavy): + return CategoryHeavy, true + default: + return "", false + } +} + +func findFactory(factories []HandlerFactory, tok string) (HandlerFactory, error) { + for _, f := range factories { + if strings.EqualFold(f.JobType, tok) { + return f, nil + } + for _, alias := range f.Aliases { + if strings.EqualFold(alias, tok) { + return f, nil + } + } + } + return HandlerFactory{}, fmt.Errorf("unknown job type %q", tok) +} diff --git a/weed/plugin/worker/handlers/handlers.go b/weed/plugin/worker/handlers/handlers.go new file mode 100644 index 000000000..fe7990e5b --- /dev/null +++ b/weed/plugin/worker/handlers/handlers.go @@ -0,0 +1,9 @@ +// Package handlers is an aggregator that blank-imports every plugin worker +// handler subpackage so their init() functions register with the handler +// registry. Import this package instead of individual subpackages when you +// need all handlers available. 
+package handlers + +import ( + _ "github.com/seaweedfs/seaweedfs/weed/plugin/worker/iceberg" // register iceberg_maintenance handler +) diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go index 071fe73f8..fcdbc1511 100644 --- a/weed/plugin/worker/iceberg/handler.go +++ b/weed/plugin/worker/iceberg/handler.go @@ -14,6 +14,17 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +func init() { + pluginworker.RegisterHandler(pluginworker.HandlerFactory{ + JobType: jobType, + Category: pluginworker.CategoryHeavy, + Aliases: []string{"iceberg-maintenance", "iceberg.maintenance", "iceberg"}, + Build: func(opts pluginworker.HandlerBuildOptions) (pluginworker.JobHandler, error) { + return NewHandler(opts.GrpcDialOption), nil + }, + }) +} + // Handler implements the JobHandler interface for Iceberg table maintenance: // snapshot expiration, orphan file removal, and manifest rewriting. type Handler struct { diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index 3012cc450..c520e7efc 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -23,6 +23,17 @@ const ( DefaultMaxExecutionConcurrency = int32(2) ) +func init() { + RegisterHandler(HandlerFactory{ + JobType: "vacuum", + Category: CategoryDefault, + Aliases: []string{"vol.vacuum", "volume.vacuum"}, + Build: func(opts HandlerBuildOptions) (JobHandler, error) { + return NewVacuumHandler(opts.GrpcDialOption, int32(opts.MaxExecute)), nil + }, + }) +} + // VacuumHandler is the plugin job handler for vacuum job type. 
type VacuumHandler struct { grpcDialOption grpc.DialOption diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index dba90e9d3..d15fc6de0 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -21,6 +21,17 @@ const ( defaultBalanceTimeoutSeconds = int32(10 * 60) ) +func init() { + RegisterHandler(HandlerFactory{ + JobType: "volume_balance", + Category: CategoryDefault, + Aliases: []string{"balance", "volume.balance", "volume-balance"}, + Build: func(opts HandlerBuildOptions) (JobHandler, error) { + return NewVolumeBalanceHandler(opts.GrpcDialOption), nil + }, + }) +} + type volumeBalanceWorkerConfig struct { TaskConfig *balancetask.Config MinIntervalSeconds int