mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-01-09 11:21:11 +08:00
feat: Add probes to worker service (#7896)
* feat: Add probes to worker service * feat: Add probes to worker service * Merge branch 'master' into pr/7896 * refactor --------- Co-authored-by: Chris Lu <chris.lu@gmail.com>
This commit is contained in:
4
.gitignore
vendored
4
.gitignore
vendored
@@ -133,3 +133,7 @@ test/s3/remote_cache/primary-server.pid
|
||||
/test/erasure_coding/filerldb2
|
||||
/test/s3/cors/test-mini-data
|
||||
/test/s3/filer_group/test-volume-data
|
||||
|
||||
# ID and PID files
|
||||
*.id
|
||||
*.pid
|
||||
|
||||
@@ -1306,7 +1306,7 @@ worker:
|
||||
extraEnvironmentVars: {}
|
||||
|
||||
# Health checks for worker pods
|
||||
# Workers expose metrics on the metricsPort with a /health endpoint for readiness checks.
|
||||
# Workers expose /health (liveness) and /ready (readiness) endpoints on the metricsPort
|
||||
livenessProbe:
|
||||
enabled: true
|
||||
httpGet:
|
||||
@@ -1321,7 +1321,7 @@ worker:
|
||||
readinessProbe:
|
||||
enabled: true
|
||||
httpGet:
|
||||
path: /health
|
||||
path: /ready
|
||||
port: metrics
|
||||
initialDelaySeconds: 20
|
||||
periodSeconds: 15
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
@@ -13,6 +14,7 @@ import (
|
||||
statsCollect "github.com/seaweedfs/seaweedfs/weed/stats"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/grace"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/version"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
|
||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||
@@ -24,6 +26,7 @@ import (
|
||||
// TODO: Implement additional task packages (add to default capabilities when ready):
|
||||
// _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/remote" - for uploading volumes to remote/cloud storage
|
||||
// _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/replication" - for fixing replication issues and maintaining data consistency
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
var cmdWorker = &Command{
|
||||
@@ -57,6 +60,8 @@ var (
|
||||
workerMetricsIp = cmdWorker.Flag.String("metricsIp", "0.0.0.0", "Prometheus metrics listen IP")
|
||||
workerDebug = cmdWorker.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
|
||||
workerDebugPort = cmdWorker.Flag.Int("debug.port", 6060, "http port for debugging")
|
||||
|
||||
workerServerHeader = "SeaweedFS Worker " + version.VERSION
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -257,8 +262,33 @@ type WorkerStatus struct {
|
||||
TasksFailed int `json:"tasks_failed"`
|
||||
}
|
||||
|
||||
// startWorkerMetricsServer starts the HTTP metrics server for the worker
|
||||
func startWorkerMetricsServer(ip string, port int, _ *worker.Worker) {
|
||||
// Use the standard SeaweedFS metrics server for consistency with other components
|
||||
statsCollect.StartMetricsServer(ip, port)
|
||||
func workerHealthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Server", workerServerHeader)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func workerReadyHandler(workerInstance *worker.Worker) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Server", workerServerHeader)
|
||||
|
||||
admin := workerInstance.GetAdmin()
|
||||
if admin == nil || !admin.IsConnected() {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
func startWorkerMetricsServer(ip string, port int, w *worker.Worker) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/health", workerHealthHandler)
|
||||
mux.HandleFunc("/ready", workerReadyHandler(w))
|
||||
mux.Handle("/metrics", promhttp.HandlerFor(statsCollect.Gather, promhttp.HandlerOpts{}))
|
||||
|
||||
glog.V(0).Infof("Starting worker metrics server at %s", statsCollect.JoinHostPort(ip, port))
|
||||
if err := http.ListenAndServe(statsCollect.JoinHostPort(ip, port), mux); err != nil {
|
||||
glog.Errorf("Worker metrics server failed to start: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -896,6 +896,10 @@ func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) GetAdmin() AdminClient {
|
||||
return w.getAdmin()
|
||||
}
|
||||
|
||||
// messageProcessingLoop processes incoming admin messages
|
||||
func (w *Worker) messageProcessingLoop() {
|
||||
glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)
|
||||
|
||||
Reference in New Issue
Block a user