From 88ed187c276e5dd021b4edcb54d9384f93b7ed45 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 23 Dec 2025 11:46:34 -0800 Subject: [PATCH] fix(worker): add metrics HTTP server and health checks for Kubernetes (#7860) * feat(worker): add metrics HTTP server and debug profiling support - Add -metricsPort flag to enable Prometheus metrics endpoint - Add -metricsIp flag to configure metrics server bind address - Implement /metrics endpoint for Prometheus-compatible metrics - Implement /health endpoint for Kubernetes readiness/liveness probes - Add -debug flag to enable pprof debugging server - Add -debug.port flag to configure debug server port - Fix stats package import naming conflict by using alias - Update usage examples to show new flags Fixes #7843 * feat(helm): add worker metrics and health check support - Update worker readiness probe to use httpGet on /health endpoint - Update worker liveness probe to use httpGet on /health endpoint - Add metricsPort flag to worker command in deployment template - Support both httpGet and tcpSocket probe types for backward compatibility - Update values.yaml with health check configuration This enables Kubernetes pod lifecycle management for worker components through proper health checks on the new metrics HTTP endpoint. * feat(mini): align all services to share single debug and metrics servers - Disable S3's separate debug server in mini mode (port 6060 now shared by all) - Add metrics server startup to embedded worker for health monitoring - All services now share the single metrics port (9327) and single debug port (6060) - Consistent pattern with master, filer, volume, webdav services * fix(worker): fix variable shadowing in health check handler - Rename http.ResponseWriter parameter from 'w' to 'rw' to avoid shadowing the outer 'w *worker.Worker' parameter - Prevents potential bugs if future code tries to use worker state in handler - Improves code clarity and follows Go best practices * fix(worker): remove unused worker parameter in metrics server - Change 'w *worker.Worker' parameter to '_' as it's not used - Clarifies intent that parameter is intentionally unused - Follows Go best practices and improves code clarity * fix(helm): fix trailing backslash syntax errors in worker command - Fix conditional backslash placement to prevent shell syntax errors - Only add backslash when metricsPort OR extraArgs are present - Prevents worker pod startup failures due to malformed command arguments - Ensures proper shell command parsing regardless of configuration state * refactor(worker): use standard stats.StartMetricsServer for consistency - Replace custom metrics server implementation with stats.StartMetricsServer to match pattern used in master, volume, s3, filer_sync components - Simplifies code and improves maintainability - Uses glog.Fatal for errors (consistent with other SeaweedFS components) - Remove unused net/http and prometheus/promhttp imports - Automatically provides /metrics and /health endpoints via standard implementation --- .../templates/worker/worker-deployment.yaml | 21 ++++++++++++---- k8s/charts/seaweedfs/values.yaml | 8 +++--- weed/command/mini.go | 9 +++++-- weed/command/worker.go | 25 ++++++++++++++++++- 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/k8s/charts/seaweedfs/templates/worker/worker-deployment.yaml b/k8s/charts/seaweedfs/templates/worker/worker-deployment.yaml index 60f608702..d6b94564c 100644 --- a/k8s/charts/seaweedfs/templates/worker/worker-deployment.yaml +++ b/k8s/charts/seaweedfs/templates/worker/worker-deployment.yaml @@ -138,7 +138,10 @@ spec: {{- end }} -capabilities={{ .Values.worker.capabilities }} \ -maxConcurrent={{ .Values.worker.maxConcurrent }} \ - -workingDir={{ .Values.worker.workingDir }}{{- if .Values.worker.extraArgs }} \{{ end }} + -workingDir={{ .Values.worker.workingDir }}{{- if or .Values.worker.metricsPort .Values.worker.extraArgs }} \{{ end }} + {{- if .Values.worker.metricsPort }} + -metricsPort={{ .Values.worker.metricsPort }}{{- if .Values.worker.extraArgs }} \{{ end }} + {{- end }} {{- range $index, $arg := .Values.worker.extraArgs }} {{ $arg }}{{- if lt $index (sub (len $.Values.worker.extraArgs) 1) }} \{{ end }} {{- end }} @@ -187,9 +190,13 @@ spec: {{- end }} {{- if .Values.worker.livenessProbe.enabled }} livenessProbe: - {{- with .Values.worker.livenessProbe.tcpSocket }} + {{- if .Values.worker.livenessProbe.httpGet }} + httpGet: + path: {{ .Values.worker.livenessProbe.httpGet.path }} + port: {{ .Values.worker.livenessProbe.httpGet.port }} + {{- else if .Values.worker.livenessProbe.tcpSocket }} tcpSocket: - port: {{ .port }} + port: {{ .Values.worker.livenessProbe.tcpSocket.port }} {{- end }} initialDelaySeconds: {{ .Values.worker.livenessProbe.initialDelaySeconds }} periodSeconds: {{ .Values.worker.livenessProbe.periodSeconds }} @@ -199,9 +206,13 @@ spec: {{- end }} {{- if .Values.worker.readinessProbe.enabled }} readinessProbe: - {{- with .Values.worker.readinessProbe.tcpSocket }} + {{- if .Values.worker.readinessProbe.httpGet }} + httpGet: + path: {{ .Values.worker.readinessProbe.httpGet.path }} + port: {{ .Values.worker.readinessProbe.httpGet.port }} + {{- else if .Values.worker.readinessProbe.tcpSocket }} tcpSocket: - port: {{ .port }} + port: {{ .Values.worker.readinessProbe.tcpSocket.port }} {{- end }} initialDelaySeconds: {{ .Values.worker.readinessProbe.initialDelaySeconds }} periodSeconds: {{ .Values.worker.readinessProbe.periodSeconds }} diff --git a/k8s/charts/seaweedfs/values.yaml b/k8s/charts/seaweedfs/values.yaml index b03e66c40..dd14f1ca0 100644 --- a/k8s/charts/seaweedfs/values.yaml +++ b/k8s/charts/seaweedfs/values.yaml @@ -1302,10 +1302,11 @@ worker: extraEnvironmentVars: {} # Health checks for worker pods - # Since workers do not have an HTTP endpoint, a tcpSocket probe on the metrics port is recommended. + # Workers expose metrics on the metricsPort with a /health endpoint for readiness checks. livenessProbe: enabled: true - tcpSocket: + httpGet: + path: /health port: metrics initialDelaySeconds: 30 periodSeconds: 60 @@ -1315,7 +1316,8 @@ worker: readinessProbe: enabled: true - tcpSocket: + httpGet: + path: /health port: metrics initialDelaySeconds: 20 periodSeconds: 15 diff --git a/weed/command/mini.go b/weed/command/mini.go index d52dc1c21..e7bdd5625 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -233,8 +233,9 @@ func initMiniS3Flags() { miniS3Options.iamConfig = miniIamConfig miniS3Options.auditLogConfig = cmdMini.Flag.String("s3.auditLogConfig", "", "path to the audit log config file") miniS3Options.allowDeleteBucketNotEmpty = miniS3AllowDeleteBucketNotEmpty - miniS3Options.debug = cmdMini.Flag.Bool("s3.debug", false, "serves runtime profiling data via pprof") - miniS3Options.debugPort = cmdMini.Flag.Int("s3.debug.port", 6060, "http port for debugging") + // In mini mode, S3 uses the shared debug server started at line 681, not its own separate debug server + miniS3Options.debug = new(bool) // explicitly false + miniS3Options.debugPort = cmdMini.Flag.Int("s3.debug.port", 6060, "http port for debugging (unused in mini mode)") } // initMiniWebDAVFlags initializes WebDAV server flag options @@ -1060,6 +1061,10 @@ func startMiniWorker() { // Set admin client workerInstance.SetAdminClient(adminClient) + // Start metrics server for health checks and monitoring (uses shared metrics port like other services) + // This allows Kubernetes probes to check worker health via /health endpoint + go stats_collect.StartMetricsServer(*miniMetricsHttpIp, *miniMetricsHttpPort) + // Start the worker err = workerInstance.Start() if err != nil { diff --git a/weed/command/worker.go b/weed/command/worker.go index 7b14dab8d..84ea55a0d 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -10,7 +10,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/security" + statsCollect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/grace" "github.com/seaweedfs/seaweedfs/weed/worker" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/types" @@ -25,7 +27,7 @@ import ( ) var cmdWorker = &Command{ - UsageLine: "worker -admin= [-capabilities=] [-maxConcurrent=] [-workingDir=]", + UsageLine: "worker -admin= [-capabilities=] [-maxConcurrent=] [-workingDir=] [-metricsPort=] [-debug]", Short: "start a maintenance worker to process cluster maintenance tasks", Long: `Start a maintenance worker that connects to an admin server to process maintenance tasks like vacuum, erasure coding, remote upload, and replication fixes. @@ -39,6 +41,8 @@ Examples: weed worker -admin=localhost:23646 -capabilities=vacuum,replication weed worker -admin=localhost:23646 -maxConcurrent=4 weed worker -admin=localhost:23646 -workingDir=/tmp/worker + weed worker -admin=localhost:23646 -metricsPort=9327 + weed worker -admin=localhost:23646 -debug -debug.port=6060 `, } @@ -49,6 +53,10 @@ var ( workerHeartbeatInterval = cmdWorker.Flag.Duration("heartbeat", 30*time.Second, "heartbeat interval") workerTaskRequestInterval = cmdWorker.Flag.Duration("taskInterval", 5*time.Second, "task request interval") workerWorkingDir = cmdWorker.Flag.String("workingDir", "", "working directory for the worker") + workerMetricsPort = cmdWorker.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + workerMetricsIp = cmdWorker.Flag.String("metricsIp", "0.0.0.0", "Prometheus metrics listen IP") + workerDebug = cmdWorker.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port") + workerDebugPort = cmdWorker.Flag.Int("debug.port", 6060, "http port for debugging") ) func init() { @@ -60,6 +68,10 @@ func init() { } func runWorker(cmd *Command, args []string) bool { + if *workerDebug { + grace.StartDebugServer(*workerDebugPort) + } + util.LoadConfiguration("security", false) glog.Infof("Starting maintenance worker") @@ -153,6 +165,11 @@ func runWorker(cmd *Command, args []string) bool { glog.Infof("Current working directory: %s", wd) } + // Start metrics HTTP server if port is specified + if *workerMetricsPort > 0 { + go startWorkerMetricsServer(*workerMetricsIp, *workerMetricsPort, workerInstance) + } + // Start the worker err = workerInstance.Start() if err != nil { @@ -239,3 +256,9 @@ type WorkerStatus struct { TasksCompleted int `json:"tasks_completed"` TasksFailed int `json:"tasks_failed"` } + +// startWorkerMetricsServer starts the HTTP metrics server for the worker +func startWorkerMetricsServer(ip string, port int, _ *worker.Worker) { + // Use the standard SeaweedFS metrics server for consistency with other components + statsCollect.StartMetricsServer(ip, port) +}