diff --git a/weed/s3api/stats.go b/weed/s3api/stats.go index cd04b9970..415ec55dd 100644 --- a/weed/s3api/stats.go +++ b/weed/s3api/stats.go @@ -11,6 +11,10 @@ import ( func track(f http.HandlerFunc, action string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + inFlightGauge := stats_collect.S3InFlightRequestsGauge.WithLabelValues(action) + inFlightGauge.Inc() + defer inFlightGauge.Dec() + bucket, _ := s3_constants.GetBucketAndObject(r) w.Header().Set("Server", "SeaweedFS "+util.VERSION) recorder := stats_collect.NewStatusResponseWriter(w) diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 69774ce27..1c5c89dcf 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -21,6 +21,11 @@ import ( func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() + + inFlightGauge := stats.FilerInFlightRequestsGauge.WithLabelValues(r.Method) + inFlightGauge.Inc() + defer inFlightGauge.Dec() + statusRecorder := stats.NewStatusResponseWriter(w) w = statusRecorder origin := r.Header.Get("Origin") diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 85fb9ba81..22ef0e1c8 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -31,6 +31,10 @@ security settings: */ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { + inFlightGauge := stats.VolumeServerInFlightRequestsGauge.WithLabelValues(r.Method) + inFlightGauge.Inc() + defer inFlightGauge.Dec() + statusRecorder := stats.NewStatusResponseWriter(w) w = statusRecorder w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 956bf4009..93f80c1f4 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -127,6 +127,14 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24), }, []string{"type"}) + FilerInFlightRequestsGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "filer", + Name: "in_flight_requests", + Help: "Current number of in-flight requests being handled by filer.", + }, []string{"type"}) + FilerServerLastSendTsOfSubscribeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, @@ -210,6 +218,14 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24), }, []string{"type"}) + VolumeServerInFlightRequestsGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "volumeServer", + Name: "in_flight_requests", + Help: "Current number of in-flight requests being handled by volume server.", + }, []string{"type"}) + VolumeServerVolumeGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, @@ -280,6 +296,13 @@ var ( Help: "Bucketed histogram of s3 time to first byte request processing time.", Buckets: prometheus.ExponentialBuckets(0.001, 2, 27), }, []string{"type", "bucket"}) + S3InFlightRequestsGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "s3", + Name: "in_flight_requests", + Help: "Current number of in-flight requests being handled by s3.", + }, []string{"type"}) ) func init() { @@ -295,6 +318,7 @@ func init() { Gather.MustRegister(FilerRequestCounter) Gather.MustRegister(FilerHandlerCounter) Gather.MustRegister(FilerRequestHistogram) + Gather.MustRegister(FilerInFlightRequestsGauge) Gather.MustRegister(FilerStoreCounter) Gather.MustRegister(FilerStoreHistogram) Gather.MustRegister(FilerSyncOffsetGauge) @@ -305,6 +329,7 @@ func init() { Gather.MustRegister(VolumeServerRequestCounter) Gather.MustRegister(VolumeServerHandlerCounter) Gather.MustRegister(VolumeServerRequestHistogram) + Gather.MustRegister(VolumeServerInFlightRequestsGauge) Gather.MustRegister(VolumeServerVacuumingCompactCounter) Gather.MustRegister(VolumeServerVacuumingCommitCounter) Gather.MustRegister(VolumeServerVacuumingHistogram) @@ -317,6 +342,7 @@ func init() { Gather.MustRegister(S3RequestCounter) Gather.MustRegister(S3HandlerCounter) Gather.MustRegister(S3RequestHistogram) + Gather.MustRegister(S3InFlightRequestsGauge) Gather.MustRegister(S3TimeToFirstByteHistogram) } diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index b4a7d649c..a2be991fa 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -47,6 +47,11 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt if s.GetVolume(volumeId) != nil { start := time.Now() + + inFlightGauge := stats.VolumeServerInFlightRequestsGauge.WithLabelValues(stats.WriteToLocalDisk) + inFlightGauge.Inc() + defer inFlightGauge.Dec() + isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync) stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds()) if err != nil { @@ -59,6 +64,11 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt if len(remoteLocations) > 0 { //send to other replica locations start := time.Now() + + inFlightGauge := stats.VolumeServerInFlightRequestsGauge.WithLabelValues(stats.WriteToReplicas) + inFlightGauge.Inc() + defer inFlightGauge.Dec() + err = DistributedOperation(remoteLocations, func(location operation.Location) error { u := url.URL{ Scheme: "http",