Browse Source

chore: add status code for request_total metrics (#5188)

pull/5197/head
Konstantin Lebedev 1 year ago
committed by GitHub
parent
commit
a7fc723ae0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      Makefile
  2. 4
      weed/filer/stream.go
  3. 2
      weed/operation/upload_content.go
  4. 20
      weed/s3api/stats.go
  5. 20
      weed/server/filer_server_handlers.go
  6. 10
      weed/server/filer_server_handlers_read.go
  7. 2
      weed/server/filer_server_handlers_read_dir.go
  8. 2
      weed/server/filer_server_handlers_write.go
  9. 12
      weed/server/filer_server_handlers_write_upload.go
  10. 4
      weed/server/master_grpc_server.go
  11. 20
      weed/server/volume_server_handlers.go
  12. 4
      weed/server/volume_server_handlers_read.go
  13. 21
      weed/stats/http_status_recorder.go
  14. 18
      weed/stats/metrics.go
  15. 20
      weed/storage/needle/needle_read.go
  16. 6
      weed/topology/store_replicate.go

2
Makefile

@ -15,7 +15,7 @@ full_install:
cd weed; go install -tags "elastic gocdk sqlite ydb tikv rclone" cd weed; go install -tags "elastic gocdk sqlite ydb tikv rclone"
server: install server: install
weed -v 4 server -s3 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false -s3.config=./docker/compose/s3.json
weed -v 4 server -s3 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false -s3.config=./docker/compose/s3.json -metricsPort=9324
benchmark: install warp_install benchmark: install warp_install
pkill weed || true pkill weed || true

4
weed/filer/stream.go

@ -121,10 +121,10 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w
remaining -= int64(chunkView.ViewSize) remaining -= int64(chunkView.ViewSize)
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
if err != nil { if err != nil {
stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc()
return fmt.Errorf("read chunk: %v", err) return fmt.Errorf("read chunk: %v", err)
} }
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc()
downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize)) downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize))
} }
if remaining > 0 { if remaining > 0 {

2
weed/operation/upload_content.go

@ -329,7 +329,7 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
if strings.Contains(post_err.Error(), "connection reset by peer") || if strings.Contains(post_err.Error(), "connection reset by peer") ||
strings.Contains(post_err.Error(), "use of closed network connection") { strings.Contains(post_err.Error(), "use of closed network connection") {
glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr) glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr)
stats.FilerRequestCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
resp, post_err = HttpClient.Do(req) resp, post_err = HttpClient.Do(req)
defer util.CloseResponse(resp) defer util.CloseResponse(resp)
} }

20
weed/s3api/stats.go

@ -8,29 +8,11 @@ import (
"time" "time"
) )
type StatusRecorder struct {
http.ResponseWriter
Status int
}
func NewStatusResponseWriter(w http.ResponseWriter) *StatusRecorder {
return &StatusRecorder{w, http.StatusOK}
}
func (r *StatusRecorder) WriteHeader(status int) {
r.Status = status
r.ResponseWriter.WriteHeader(status)
}
func (r *StatusRecorder) Flush() {
r.ResponseWriter.(http.Flusher).Flush()
}
func track(f http.HandlerFunc, action string) http.HandlerFunc { func track(f http.HandlerFunc, action string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r) bucket, _ := s3_constants.GetBucketAndObject(r)
w.Header().Set("Server", "SeaweedFS S3") w.Header().Set("Server", "SeaweedFS S3")
recorder := NewStatusResponseWriter(w)
recorder := stats_collect.NewStatusResponseWriter(w)
start := time.Now() start := time.Now()
f(recorder, r) f(recorder, r)
if recorder.Status == http.StatusForbidden { if recorder.Status == http.StatusForbidden {

20
weed/server/filer_server_handlers.go

@ -4,6 +4,7 @@ import (
"errors" "errors"
"net/http" "net/http"
"os" "os"
"strconv"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -17,7 +18,8 @@ import (
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
start := time.Now() start := time.Now()
statusRecorder := stats.NewStatusResponseWriter(w)
w = statusRecorder
origin := r.Header.Get("Origin") origin := r.Header.Get("Origin")
if origin != "" { if origin != "" {
if fs.option.AllowedOrigins == nil || len(fs.option.AllowedOrigins) == 0 || fs.option.AllowedOrigins[0] == "*" { if fs.option.AllowedOrigins == nil || len(fs.option.AllowedOrigins) == 0 || fs.option.AllowedOrigins[0] == "*" {
@ -53,23 +55,23 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
fileId = r.RequestURI[len("/?proxyChunkId="):] fileId = r.RequestURI[len("/?proxyChunkId="):]
} }
if fileId != "" { if fileId != "" {
stats.FilerRequestCounter.WithLabelValues(stats.ChunkProxy).Inc()
fs.proxyToVolumeServer(w, r, fileId) fs.proxyToVolumeServer(w, r, fileId)
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkProxy).Inc()
stats.FilerRequestHistogram.WithLabelValues(stats.ChunkProxy).Observe(time.Since(start).Seconds()) stats.FilerRequestHistogram.WithLabelValues(stats.ChunkProxy).Observe(time.Since(start).Seconds())
return return
} }
defer func() {
stats.FilerRequestCounter.WithLabelValues(r.Method, strconv.Itoa(statusRecorder.Status)).Inc()
stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
}()
isReadHttpCall := r.Method == "GET" || r.Method == "HEAD" isReadHttpCall := r.Method == "GET" || r.Method == "HEAD"
if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) { if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) {
writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
return return
} }
stats.FilerRequestCounter.WithLabelValues(r.Method).Inc()
defer func() {
stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
}()
w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION) w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
switch r.Method { switch r.Method {
@ -115,6 +117,8 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
start := time.Now() start := time.Now()
statusRecorder := stats.NewStatusResponseWriter(w)
w = statusRecorder
os.Stdout.WriteString("Request: " + r.Method + " " + r.URL.String() + "\n") os.Stdout.WriteString("Request: " + r.Method + " " + r.URL.String() + "\n")
@ -140,8 +144,8 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque
w.Header().Set("Access-Control-Allow-Credentials", "true") w.Header().Set("Access-Control-Allow-Credentials", "true")
} }
stats.FilerRequestCounter.WithLabelValues(r.Method).Inc()
defer func() { defer func() {
stats.FilerRequestCounter.WithLabelValues(r.Method, strconv.Itoa(statusRecorder.Status)).Inc()
stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds()) stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
}() }()
// We handle OPTIONS first because it never should be authenticated // We handle OPTIONS first because it never should be authenticated

10
weed/server/filer_server_handlers_read.go

@ -98,11 +98,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
} }
if err == filer_pb.ErrNotFound { if err == filer_pb.ErrNotFound {
glog.V(2).Infof("Not found %s: %v", path, err) glog.V(2).Infof("Not found %s: %v", path, err)
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadNotFound).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadNotFound).Inc()
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
} else { } else {
glog.Errorf("Internal %s: %v", path, err) glog.Errorf("Internal %s: %v", path, err)
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadInternal).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadInternal).Inc()
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
} }
return return
@ -233,7 +233,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if offset+size <= int64(len(entry.Content)) { if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size]) _, err := writer.Write(entry.Content[offset : offset+size])
if err != nil { if err != nil {
stats.FilerRequestCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
glog.Errorf("failed to write entry content: %v", err) glog.Errorf("failed to write entry content: %v", err)
} }
return err return err
@ -245,7 +245,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
Directory: dir, Directory: dir,
Name: name, Name: name,
}); err != nil { }); err != nil {
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadCache).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadCache).Inc()
glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err) glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err)
return fmt.Errorf("cache %s: %v", entry.FullPath, err) return fmt.Errorf("cache %s: %v", entry.FullPath, err)
} else { } else {
@ -255,7 +255,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs) err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
if err != nil { if err != nil {
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
glog.Errorf("failed to stream content %s: %v", r.URL, err) glog.Errorf("failed to stream content %s: %v", r.URL, err)
} }
return err return err

2
weed/server/filer_server_handlers_read_dir.go

@ -18,7 +18,7 @@ import (
// is empty. // is empty.
func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
stats.FilerRequestCounter.WithLabelValues(stats.DirList).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.DirList).Inc()
path := r.URL.Path path := r.URL.Path
if strings.HasSuffix(path, "/") && len(path) > 1 { if strings.HasSuffix(path, "/") && len(path) > 1 {

2
weed/server/filer_server_handlers_write.go

@ -35,7 +35,7 @@ type FilerPostResult struct {
func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues(stats.ChunkAssign).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc()
start := time.Now() start := time.Now()
defer func() { defer func() {
stats.FilerRequestHistogram.WithLabelValues(stats.ChunkAssign).Observe(time.Since(start).Seconds()) stats.FilerRequestHistogram.WithLabelValues(stats.ChunkAssign).Observe(time.Since(start).Seconds())

12
weed/server/filer_server_handlers_write_upload.go

@ -86,11 +86,11 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
bytesBuffer.Read(smallContent) bytesBuffer.Read(smallContent)
bufPool.Put(bytesBuffer) bufPool.Put(bytesBuffer)
<-bytesBufferLimitChan <-bytesBufferLimitChan
stats.FilerRequestCounter.WithLabelValues(stats.ContentSaveToFiler).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ContentSaveToFiler).Inc()
break break
} }
} else { } else {
stats.FilerRequestCounter.WithLabelValues(stats.AutoChunk).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.AutoChunk).Inc()
} }
wg.Add(1) wg.Add(1)
@ -143,7 +143,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
stats.FilerRequestCounter.WithLabelValues(stats.ChunkUpload).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc()
start := time.Now() start := time.Now()
defer func() { defer func() {
stats.FilerRequestHistogram.WithLabelValues(stats.ChunkUpload).Observe(time.Since(start).Seconds()) stats.FilerRequestHistogram.WithLabelValues(stats.ChunkUpload).Observe(time.Since(start).Seconds())
@ -160,7 +160,7 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
} }
uploadResult, err, data := operation.Upload(limitedReader, uploadOption) uploadResult, err, data := operation.Upload(limitedReader, uploadOption)
if uploadResult != nil && uploadResult.RetryCount > 0 { if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerRequestCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
} }
return uploadResult, err, data return uploadResult, err, data
} }
@ -180,14 +180,14 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so) fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
if uploadErr != nil { if uploadErr != nil {
glog.V(4).Infof("retry later due to assign error: %v", uploadErr) glog.V(4).Infof("retry later due to assign error: %v", uploadErr)
stats.FilerRequestCounter.WithLabelValues(stats.ChunkAssignRetry).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc()
return uploadErr return uploadErr
} }
// upload the chunk to the volume server // upload the chunk to the volume server
uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth) uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil { if uploadErr != nil {
glog.V(4).Infof("retry later due to upload error: %v", uploadErr) glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
stats.FilerRequestCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
fid, _ := filer_pb.ToFileIdObject(fileId) fid, _ := filer_pb.ToFileIdObject(fileId)
fileChunk := filer_pb.FileChunk{ fileChunk := filer_pb.FileChunk{
FileId: fileId, FileId: fileId,

4
weed/server/master_grpc_server.go

@ -168,10 +168,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
DataCenter: dn.GetDataCenterId(), DataCenter: dn.GetDataCenterId(),
} }
if len(heartbeat.NewVolumes) > 0 { if len(heartbeat.NewVolumes) > 0 {
stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc()
stats.MasterReceivedHeartbeatCounter.WithLabelValues("newVolumes").Inc()
} }
if len(heartbeat.DeletedVolumes) > 0 { if len(heartbeat.DeletedVolumes) > 0 {
stats.FilerRequestCounter.WithLabelValues("deletedVolumes").Inc()
stats.MasterReceivedHeartbeatCounter.WithLabelValues("deletedVolumes").Inc()
} }
if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 { if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
// process delta volume ids if exists for fast volume id updates // process delta volume ids if exists for fast volume id updates

20
weed/server/volume_server_handlers.go

@ -31,6 +31,8 @@ security settings:
*/ */
func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
statusRecorder := stats.NewStatusResponseWriter(w)
w = statusRecorder
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
if r.Header.Get("Origin") != "" { if r.Header.Get("Origin") != "" {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
@ -38,10 +40,10 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
} }
start := time.Now() start := time.Now()
requestMethod := r.Method requestMethod := r.Method
defer func(start time.Time, method *string) {
stats.VolumeServerRequestCounter.WithLabelValues(*method).Inc()
defer func(start time.Time, method *string, statusRecorder *stats.StatusRecorder) {
stats.VolumeServerRequestCounter.WithLabelValues(*method, strconv.Itoa(statusRecorder.Status)).Inc()
stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds()) stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds())
}(start, &requestMethod)
}(start, &requestMethod, statusRecorder)
switch r.Method { switch r.Method {
case http.MethodGet, http.MethodHead: case http.MethodGet, http.MethodHead:
stats.ReadRequest() stats.ReadRequest()
@ -63,11 +65,9 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
vs.inFlightDownloadDataLimitCond.L.Unlock() vs.inFlightDownloadDataLimitCond.L.Unlock()
vs.GetOrHeadHandler(w, r) vs.GetOrHeadHandler(w, r)
case http.MethodDelete: case http.MethodDelete:
stats.VolumeServerRequestCounter.WithLabelValues(r.Method).Inc()
stats.DeleteRequest() stats.DeleteRequest()
vs.guard.WhiteList(vs.DeleteHandler)(w, r) vs.guard.WhiteList(vs.DeleteHandler)(w, r)
case http.MethodPut, http.MethodPost: case http.MethodPut, http.MethodPost:
stats.VolumeServerRequestCounter.WithLabelValues(r.Method).Inc()
contentLength := getContentLength(r) contentLength := getContentLength(r)
// exclude the replication from the concurrentUploadLimitMB // exclude the replication from the concurrentUploadLimitMB
if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 { if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 {
@ -124,11 +124,21 @@ func getContentLength(r *http.Request) int64 {
} }
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
statusRecorder := stats.NewStatusResponseWriter(w)
w = statusRecorder
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
if r.Header.Get("Origin") != "" { if r.Header.Get("Origin") != "" {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Credentials", "true") w.Header().Set("Access-Control-Allow-Credentials", "true")
} }
start := time.Now()
requestMethod := r.Method
defer func(start time.Time, method *string, statusRecorder *stats.StatusRecorder) {
stats.VolumeServerRequestCounter.WithLabelValues(*method, strconv.Itoa(statusRecorder.Status)).Inc()
stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds())
}(start, &requestMethod, statusRecorder)
switch r.Method { switch r.Method {
case http.MethodGet, http.MethodHead: case http.MethodGet, http.MethodHead:
stats.ReadRequest() stats.ReadRequest()

4
weed/server/volume_server_handlers_read.go

@ -30,12 +30,12 @@ import (
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func NotFound(w http.ResponseWriter) { func NotFound(w http.ResponseWriter) {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorGetNotFound).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorGetNotFound).Inc()
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
} }
func InternalError(w http.ResponseWriter) { func InternalError(w http.ResponseWriter) {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorGetInternal).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorGetInternal).Inc()
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
} }

21
weed/stats/http_status_recorder.go

@ -0,0 +1,21 @@
package stats
import "net/http"
type StatusRecorder struct {
http.ResponseWriter
Status int
}
func NewStatusResponseWriter(w http.ResponseWriter) *StatusRecorder {
return &StatusRecorder{w, http.StatusOK}
}
func (r *StatusRecorder) WriteHeader(status int) {
r.Status = status
r.ResponseWriter.WriteHeader(status)
}
func (r *StatusRecorder) Flush() {
r.ResponseWriter.(http.Flusher).Flush()
}

18
weed/stats/metrics.go

@ -84,6 +84,14 @@ var (
Subsystem: "filer", Subsystem: "filer",
Name: "request_total", Name: "request_total",
Help: "Counter of filer requests.", Help: "Counter of filer requests.",
}, []string{"type", "code"})
FilerHandlerCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "filer",
Name: "handler_total",
Help: "Counter of filer handlers.",
}, []string{"type"}) }, []string{"type"})
FilerRequestHistogram = prometheus.NewHistogramVec( FilerRequestHistogram = prometheus.NewHistogramVec(
@ -134,6 +142,14 @@ var (
Subsystem: "volumeServer", Subsystem: "volumeServer",
Name: "request_total", Name: "request_total",
Help: "Counter of volume server requests.", Help: "Counter of volume server requests.",
}, []string{"type", "code"})
VolumeServerHandlerCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "volumeServer",
Name: "handler_total",
Help: "Counter of volume server handlers.",
}, []string{"type"}) }, []string{"type"})
VolumeServerVacuumingCompactCounter = prometheus.NewCounterVec( VolumeServerVacuumingCompactCounter = prometheus.NewCounterVec(
@ -245,6 +261,7 @@ func init() {
Gather.MustRegister(MasterReplicaPlacementMismatch) Gather.MustRegister(MasterReplicaPlacementMismatch)
Gather.MustRegister(FilerRequestCounter) Gather.MustRegister(FilerRequestCounter)
Gather.MustRegister(FilerHandlerCounter)
Gather.MustRegister(FilerRequestHistogram) Gather.MustRegister(FilerRequestHistogram)
Gather.MustRegister(FilerStoreCounter) Gather.MustRegister(FilerStoreCounter)
Gather.MustRegister(FilerStoreHistogram) Gather.MustRegister(FilerStoreHistogram)
@ -254,6 +271,7 @@ func init() {
Gather.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) Gather.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
Gather.MustRegister(VolumeServerRequestCounter) Gather.MustRegister(VolumeServerRequestCounter)
Gather.MustRegister(VolumeServerHandlerCounter)
Gather.MustRegister(VolumeServerRequestHistogram) Gather.MustRegister(VolumeServerRequestHistogram)
Gather.MustRegister(VolumeServerVacuumingCompactCounter) Gather.MustRegister(VolumeServerVacuumingCompactCounter)
Gather.MustRegister(VolumeServerVacuumingCommitCounter) Gather.MustRegister(VolumeServerVacuumingCommitCounter)

20
weed/storage/needle/needle_read.go

@ -54,11 +54,11 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio
if n.Size != size { if n.Size != size {
// cookie is not always passed in for this API. Use size to do preliminary checking. // cookie is not always passed in for this API. Use size to do preliminary checking.
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorSizeMismatchOffsetSize).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatchOffsetSize).Inc()
glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
return ErrorSizeMismatch return ErrorSizeMismatch
} }
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc()
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
} }
switch version { switch version {
@ -75,7 +75,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio
newChecksum := NewCRC(n.Data) newChecksum := NewCRC(n.Data)
if checksum != newChecksum.Value() && checksum != uint32(newChecksum) { if checksum != newChecksum.Value() && checksum != uint32(newChecksum) {
// the crc.Value() function is to be deprecated. this double checking is for backward compatible. // the crc.Value() function is to be deprecated. this double checking is for backward compatible.
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorCRC).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc()
return errors.New("CRC error! Data On Disk Corrupted") return errors.New("CRC error! Data On Disk Corrupted")
} }
n.Checksum = newChecksum n.Checksum = newChecksum
@ -108,7 +108,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
n.DataSize = util.BytesToUint32(bytes[index : index+4]) n.DataSize = util.BytesToUint32(bytes[index : index+4])
index = index + 4 index = index + 4
if int(n.DataSize)+index > lenBytes { if int(n.DataSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
return fmt.Errorf("index out of range %d", 1) return fmt.Errorf("index out of range %d", 1)
} }
n.Data = bytes[index : index+int(n.DataSize)] n.Data = bytes[index : index+int(n.DataSize)]
@ -127,7 +127,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
n.NameSize = uint8(bytes[index]) n.NameSize = uint8(bytes[index])
index = index + 1 index = index + 1
if int(n.NameSize)+index > lenBytes { if int(n.NameSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
return index, fmt.Errorf("index out of range %d", 2) return index, fmt.Errorf("index out of range %d", 2)
} }
n.Name = bytes[index : index+int(n.NameSize)] n.Name = bytes[index : index+int(n.NameSize)]
@ -137,7 +137,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
n.MimeSize = uint8(bytes[index]) n.MimeSize = uint8(bytes[index])
index = index + 1 index = index + 1
if int(n.MimeSize)+index > lenBytes { if int(n.MimeSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
return index, fmt.Errorf("index out of range %d", 3) return index, fmt.Errorf("index out of range %d", 3)
} }
n.Mime = bytes[index : index+int(n.MimeSize)] n.Mime = bytes[index : index+int(n.MimeSize)]
@ -145,7 +145,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
} }
if index < lenBytes && n.HasLastModifiedDate() { if index < lenBytes && n.HasLastModifiedDate() {
if LastModifiedBytesLength+index > lenBytes { if LastModifiedBytesLength+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
return index, fmt.Errorf("index out of range %d", 4) return index, fmt.Errorf("index out of range %d", 4)
} }
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
@ -153,7 +153,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
} }
if index < lenBytes && n.HasTtl() { if index < lenBytes && n.HasTtl() {
if TtlBytesLength+index > lenBytes { if TtlBytesLength+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
return index, fmt.Errorf("index out of range %d", 5) return index, fmt.Errorf("index out of range %d", 5)
} }
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
@ -161,13 +161,13 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err
} }
if index < lenBytes && n.HasPairs() { if index < lenBytes && n.HasPairs() {
if 2+index > lenBytes { if 2+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
return index, fmt.Errorf("index out of range %d", 6) return index, fmt.Errorf("index out of range %d", 6)
} }
n.PairsSize = util.BytesToUint16(bytes[index : index+2]) n.PairsSize = util.BytesToUint16(bytes[index : index+2])
index += 2 index += 2
if int(n.PairsSize)+index > lenBytes { if int(n.PairsSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
return index, fmt.Errorf("index out of range %d", 7) return index, fmt.Errorf("index out of range %d", 7)
} }
end := index + int(n.PairsSize) end := index + int(n.PairsSize)

6
weed/topology/store_replicate.go

@ -48,7 +48,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync) isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds()) stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds())
if err != nil { if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc()
err = fmt.Errorf("failed to write to local disk: %v", err) err = fmt.Errorf("failed to write to local disk: %v", err)
glog.V(0).Infoln(err) glog.V(0).Infoln(err)
return return
@ -80,7 +80,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
tmpMap := make(map[string]string) tmpMap := make(map[string]string)
err := json.Unmarshal(n.Pairs, &tmpMap) err := json.Unmarshal(n.Pairs, &tmpMap)
if err != nil { if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc()
glog.V(0).Infoln("Unmarshal pairs error:", err) glog.V(0).Infoln("Unmarshal pairs error:", err)
} }
for k, v := range tmpMap { for k, v := range tmpMap {
@ -109,7 +109,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
}) })
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds()) stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds())
if err != nil { if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
glog.V(0).Infoln(err) glog.V(0).Infoln(err)
return false, err return false, err

Loading…
Cancel
Save