Browse Source

Merge pull request #3394 from kmlebedev/metricsReplicatedWrite

Detailed metrics VolumeServerRequestHistogram for writing to disk and replication
pull/3399/head
Chris Lu 2 years ago
committed by GitHub
parent
commit
fc8035d672
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      weed/server/volume_server_handlers.go
  2. 6
      weed/server/volume_server_handlers_read.go
  3. 15
      weed/server/volume_server_handlers_write.go
  4. 2
      weed/stats/metrics_names.go
  5. 10
      weed/topology/store_replicate.go

6
weed/server/volume_server_handlers.go

@ -36,6 +36,11 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Credentials", "true") w.Header().Set("Access-Control-Allow-Credentials", "true")
} }
stats.VolumeServerRequestCounter.WithLabelValues(r.Method).Inc()
start := time.Now()
defer func() {
stats.VolumeServerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
}()
switch r.Method { switch r.Method {
case "GET", "HEAD": case "GET", "HEAD":
stats.ReadRequest() stats.ReadRequest()
@ -56,7 +61,6 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
stats.DeleteRequest() stats.DeleteRequest()
vs.guard.WhiteList(vs.DeleteHandler)(w, r) vs.guard.WhiteList(vs.DeleteHandler)(w, r)
case "PUT", "POST": case "PUT", "POST":
contentLength := getContentLength(r) contentLength := getContentLength(r)
// exclude the replication from the concurrentUploadLimitMB // exclude the replication from the concurrentUploadLimitMB
if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 { if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 {

6
weed/server/volume_server_handlers_read.go

@ -20,7 +20,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/images" "github.com/seaweedfs/seaweedfs/weed/images"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
@ -29,11 +28,6 @@ import (
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
n := new(needle.Needle) n := new(needle.Needle)
vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)

15
weed/server/volume_server_handlers_write.go

@ -11,19 +11,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/topology"
) )
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
stats.VolumeServerRequestCounter.WithLabelValues("post").Inc()
start := time.Now()
defer func() {
stats.VolumeServerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
}()
if e := r.ParseForm(); e != nil { if e := r.ParseForm(); e != nil {
glog.V(0).Infoln("form parse error:", e) glog.V(0).Infoln("form parse error:", e)
writeJsonError(w, r, http.StatusBadRequest, e) writeJsonError(w, r, http.StatusBadRequest, e)
@ -79,13 +71,6 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
stats.VolumeServerRequestCounter.WithLabelValues("delete").Inc()
start := time.Now()
defer func() {
stats.VolumeServerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
}()
n := new(needle.Needle) n := new(needle.Needle)
vid, fid, _, _, _ := parseURLPath(r.URL.Path) vid, fid, _, _, _ := parseURLPath(r.URL.Path)
volumeId, _ := needle.NewVolumeId(vid) volumeId, _ := needle.NewVolumeId(vid)

2
weed/stats/metrics_names.go

@ -4,6 +4,8 @@ package stats
// The naming convention is ErrorSomeThing = "error.some.thing" // The naming convention is ErrorSomeThing = "error.some.thing"
const ( const (
// volume server // volume server
WriteToLocalDisk = "writeToLocalDisk"
WriteToReplicas = "writeToReplicas"
ErrorSizeMismatchOffsetSize = "errorSizeMismatchOffsetSize" ErrorSizeMismatchOffsetSize = "errorSizeMismatchOffsetSize"
ErrorSizeMismatch = "errorSizeMismatch" ErrorSizeMismatch = "errorSizeMismatch"
ErrorCRC = "errorCRC" ErrorCRC = "errorCRC"

10
weed/topology/store_replicate.go

@ -9,6 +9,7 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
@ -43,7 +44,9 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
} }
if s.GetVolume(volumeId) != nil { if s.GetVolume(volumeId) != nil {
start := time.Now()
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync) isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds())
if err != nil { if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc() stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc()
err = fmt.Errorf("failed to write to local disk: %v", err) err = fmt.Errorf("failed to write to local disk: %v", err)
@ -53,7 +56,8 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
} }
if len(remoteLocations) > 0 { //send to other replica locations if len(remoteLocations) > 0 { //send to other replica locations
if err = DistributedOperation(remoteLocations, func(location operation.Location) error {
start := time.Now()
err = DistributedOperation(remoteLocations, func(location operation.Location) error {
u := url.URL{ u := url.URL{
Scheme: "http", Scheme: "http",
Host: location.Url, Host: location.Url,
@ -97,7 +101,9 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
} }
_, err := operation.UploadData(n.Data, uploadOption) _, err := operation.UploadData(n.Data, uploadOption)
return err return err
}); err != nil {
})
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds())
if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc() stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
glog.V(0).Infoln(err) glog.V(0).Infoln(err)

Loading…
Cancel
Save