diff --git a/.gitignore b/.gitignore index b895a8f08..10bc81f63 100644 --- a/.gitignore +++ b/.gitignore @@ -133,3 +133,7 @@ test/s3/remote_cache/primary-server.pid /test/erasure_coding/filerldb2 /test/s3/cors/test-mini-data /test/s3/filer_group/test-volume-data + +# ID and PID files +*.id +*.pid diff --git a/k8s/charts/seaweedfs/values.yaml b/k8s/charts/seaweedfs/values.yaml index c4fd3a841..a94d7f183 100644 --- a/k8s/charts/seaweedfs/values.yaml +++ b/k8s/charts/seaweedfs/values.yaml @@ -1306,7 +1306,7 @@ worker: extraEnvironmentVars: {} # Health checks for worker pods - # Workers expose metrics on the metricsPort with a /health endpoint for readiness checks. + # Workers expose /health (liveness) and /ready (readiness) endpoints on the metricsPort livenessProbe: enabled: true httpGet: @@ -1321,7 +1321,7 @@ worker: readinessProbe: enabled: true httpGet: - path: /health + path: /ready port: metrics initialDelaySeconds: 20 periodSeconds: 15 diff --git a/weed/command/worker.go b/weed/command/worker.go index 84ea55a0d..1ff6678a0 100644 --- a/weed/command/worker.go +++ b/weed/command/worker.go @@ -1,6 +1,7 @@ package command import ( + "net/http" "os" "os/signal" "path/filepath" @@ -13,6 +14,7 @@ import ( statsCollect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/grace" + "github.com/seaweedfs/seaweedfs/weed/util/version" "github.com/seaweedfs/seaweedfs/weed/worker" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/types" @@ -24,6 +26,7 @@ import ( // TODO: Implement additional task packages (add to default capabilities when ready): // _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/remote" - for uploading volumes to remote/cloud storage // _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/replication" - for fixing replication issues and maintaining data consistency + "github.com/prometheus/client_golang/prometheus/promhttp" ) var cmdWorker = &Command{ @@ -57,6 +60,8 @@ var ( workerMetricsIp = cmdWorker.Flag.String("metricsIp", "0.0.0.0", "Prometheus metrics listen IP") workerDebug = cmdWorker.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port") workerDebugPort = cmdWorker.Flag.Int("debug.port", 6060, "http port for debugging") + + workerServerHeader = "SeaweedFS Worker " + version.VERSION ) func init() { @@ -257,8 +262,33 @@ type WorkerStatus struct { TasksFailed int `json:"tasks_failed"` } -// startWorkerMetricsServer starts the HTTP metrics server for the worker -func startWorkerMetricsServer(ip string, port int, _ *worker.Worker) { - // Use the standard SeaweedFS metrics server for consistency with other components - statsCollect.StartMetricsServer(ip, port) +func workerHealthHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", workerServerHeader) + w.WriteHeader(http.StatusOK) +} + +func workerReadyHandler(workerInstance *worker.Worker) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", workerServerHeader) + + admin := workerInstance.GetAdmin() + if admin == nil || !admin.IsConnected() { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + w.WriteHeader(http.StatusOK) + } +} + +func startWorkerMetricsServer(ip string, port int, w *worker.Worker) { + mux := http.NewServeMux() + mux.HandleFunc("/health", workerHealthHandler) + mux.HandleFunc("/ready", workerReadyHandler(w)) + mux.Handle("/metrics", promhttp.HandlerFor(statsCollect.Gather, promhttp.HandlerOpts{})) + + glog.V(0).Infof("Starting worker metrics server at %s", statsCollect.JoinHostPort(ip, port)) + if err := http.ListenAndServe(statsCollect.JoinHostPort(ip, port), mux); err != nil { + glog.Errorf("Worker metrics server failed to start: %v", err) + } } diff --git a/weed/worker/worker.go b/weed/worker/worker.go index bbd1f4662..97e6e7a1e 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -896,6 +896,10 @@ func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance { } } +func (w *Worker) GetAdmin() AdminClient { + return w.getAdmin() +} + // messageProcessingLoop processes incoming admin messages func (w *Worker) messageProcessingLoop() { glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)