Browse Source

Merge branch 'master' of https://github.com/seaweedfs/seaweedfs

pull/2932/merge
Chris Lu 3 days ago
parent
commit
694218dc13
  1. 4
      .gitignore
  2. 4
      k8s/charts/seaweedfs/values.yaml
  3. 38
      weed/command/worker.go
  4. 4
      weed/worker/worker.go

4
.gitignore

@ -133,3 +133,7 @@ test/s3/remote_cache/primary-server.pid
/test/erasure_coding/filerldb2
/test/s3/cors/test-mini-data
/test/s3/filer_group/test-volume-data
# ID and PID files
*.id
*.pid

4
k8s/charts/seaweedfs/values.yaml

@ -1306,7 +1306,7 @@ worker:
extraEnvironmentVars: {}
# Health checks for worker pods
# Workers expose metrics on the metricsPort with a /health endpoint for readiness checks.
# Workers expose /health (liveness) and /ready (readiness) endpoints on the metricsPort
livenessProbe:
enabled: true
httpGet:
@ -1321,7 +1321,7 @@ worker:
readinessProbe:
enabled: true
httpGet:
path: /health
path: /ready
port: metrics
initialDelaySeconds: 20
periodSeconds: 15

38
weed/command/worker.go

@ -1,6 +1,7 @@
package command
import (
"net/http"
"os"
"os/signal"
"path/filepath"
@ -13,6 +14,7 @@ import (
statsCollect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/worker"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
@ -24,6 +26,7 @@ import (
// TODO: Implement additional task packages (add to default capabilities when ready):
// _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/remote" - for uploading volumes to remote/cloud storage
// _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/replication" - for fixing replication issues and maintaining data consistency
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var cmdWorker = &Command{
@ -57,6 +60,8 @@ var (
workerMetricsIp = cmdWorker.Flag.String("metricsIp", "0.0.0.0", "Prometheus metrics listen IP")
workerDebug = cmdWorker.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
workerDebugPort = cmdWorker.Flag.Int("debug.port", 6060, "http port for debugging")
workerServerHeader = "SeaweedFS Worker " + version.VERSION
)
func init() {
@ -257,8 +262,33 @@ type WorkerStatus struct {
TasksFailed int `json:"tasks_failed"`
}
// startWorkerMetricsServer starts the HTTP metrics server for the worker
func startWorkerMetricsServer(ip string, port int, _ *worker.Worker) {
// Use the standard SeaweedFS metrics server for consistency with other components
statsCollect.StartMetricsServer(ip, port)
func workerHealthHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", workerServerHeader)
w.WriteHeader(http.StatusOK)
}
func workerReadyHandler(workerInstance *worker.Worker) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", workerServerHeader)
admin := workerInstance.GetAdmin()
if admin == nil || !admin.IsConnected() {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
}
}
func startWorkerMetricsServer(ip string, port int, w *worker.Worker) {
mux := http.NewServeMux()
mux.HandleFunc("/health", workerHealthHandler)
mux.HandleFunc("/ready", workerReadyHandler(w))
mux.Handle("/metrics", promhttp.HandlerFor(statsCollect.Gather, promhttp.HandlerOpts{}))
glog.V(0).Infof("Starting worker metrics server at %s", statsCollect.JoinHostPort(ip, port))
if err := http.ListenAndServe(statsCollect.JoinHostPort(ip, port), mux); err != nil {
glog.Errorf("Worker metrics server failed to start: %v", err)
}
}

4
weed/worker/worker.go

@ -896,6 +896,10 @@ func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
}
}
func (w *Worker) GetAdmin() AdminClient {
return w.getAdmin()
}
// messageProcessingLoop processes incoming admin messages
func (w *Worker) messageProcessingLoop() {
glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)

Loading…
Cancel
Save