Browse Source

filer: adds basic metrics pushing to Prometheus gateway

pull/991/head
Chris Lu 6 years ago
parent
commit
a11525fe4e
  1. 6
      weed/command/filer.go
  2. 7
      weed/command/server.go
  3. 29
      weed/server/filer_server.go
  4. 9
      weed/server/filer_server_handlers_read.go
  5. 8
      weed/server/filer_server_handlers_write.go
  6. 27
      weed/server/metrics.go

6
weed/command/filer.go

@ -34,6 +34,8 @@ type FilerOptions struct {
dataCenter *string dataCenter *string
enableNotification *bool enableNotification *bool
disableHttp *bool disableHttp *bool
metricsAddress *string
metricsIntervalSec *int
// default leveldb directory, used in "weed server" mode // default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string defaultLevelDbDirectory *string
@ -53,6 +55,8 @@ func init() {
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size") f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center") f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed") f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
f.metricsAddress = cmdFiler.Flag.String("metrics.address", "", "Prometheus gateway address")
f.metricsIntervalSec = cmdFiler.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
} }
var cmdFiler = &Command{ var cmdFiler = &Command{
@ -110,6 +114,8 @@ func (fo *FilerOptions) startFiler() {
DataCenter: *fo.dataCenter, DataCenter: *fo.dataCenter,
DefaultLevelDbDir: defaultLevelDbDirectory, DefaultLevelDbDir: defaultLevelDbDirectory,
DisableHttp: *fo.disableHttp, DisableHttp: *fo.disableHttp,
MetricsAddress: *fo.metricsAddress,
MetricsIntervalSec: *fo.metricsIntervalSec,
}) })
if nfs_err != nil { if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err) glog.Fatalf("Filer startup error: %v", nfs_err)

7
weed/command/server.go

@ -25,6 +25,8 @@ import (
type ServerOptions struct { type ServerOptions struct {
cpuprofile *string cpuprofile *string
metricsAddress *string
metricsIntervalSec *int
v VolumeServerOptions v VolumeServerOptions
} }
@ -81,6 +83,8 @@ var (
func init() { func init() {
serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file") serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
serverOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
serverOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port") filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
@ -143,6 +147,9 @@ func runServer(cmd *Command, args []string) bool {
filerOptions.dataCenter = serverDataCenter filerOptions.dataCenter = serverDataCenter
filerOptions.disableHttp = serverDisableHttp filerOptions.disableHttp = serverDisableHttp
filerOptions.metricsAddress = serverOptions.metricsAddress
filerOptions.metricsIntervalSec = serverOptions.metricsIntervalSec
filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port) filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port)
s3Options.filer = &filerAddress s3Options.filer = &filerAddress

29
weed/server/filer_server.go

@ -3,8 +3,11 @@ package weed_server
import ( import (
"net/http" "net/http"
"os" "os"
"time"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
@ -36,6 +39,8 @@ type FilerOption struct {
DataCenter string DataCenter string
DefaultLevelDbDir string DefaultLevelDbDir string
DisableHttp bool DisableHttp bool
MetricsAddress string
MetricsIntervalSec int
} }
type FilerServer struct { type FilerServer struct {
@ -83,5 +88,29 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
} }
startPushingMetric(option.MetricsAddress, option.MetricsIntervalSec)
return fs, nil return fs, nil
} }
func startPushingMetric(addr string, intervalSeconds int) {
if intervalSeconds == 0 || addr == "" {
glog.V(0).Info("disable metrics reporting")
return
}
glog.V(0).Infof("push metrics to %s every %d seconds", addr, intervalSeconds)
go loopPushMetrics(addr, intervalSeconds)
}
func loopPushMetrics(addr string, intervalSeconds int) {
pusher := push.New(addr, "filer").Gatherer(prometheus.DefaultGatherer)
for {
err := pusher.Push()
if err != nil {
glog.V(0).Infof("could not push metrics to prometheus push gateway %s: %v", addr, err)
}
time.Sleep(time.Duration(intervalSeconds) * time.Second)
}
}

9
weed/server/filer_server_handlers_read.go

@ -11,6 +11,7 @@ import (
"path" "path"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
@ -18,6 +19,11 @@ import (
) )
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
filerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { filerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
path := r.URL.Path path := r.URL.Path
if strings.HasSuffix(path, "/") && len(path) > 1 { if strings.HasSuffix(path, "/") && len(path) > 1 {
path = path[:len(path)-1] path = path[:len(path)-1]
@ -30,6 +36,8 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return return
} }
glog.V(1).Infof("Not found %s: %v", path, err) glog.V(1).Infof("Not found %s: %v", path, err)
filerRequestCounter.WithLabelValues("read.notfound").Inc()
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
@ -45,6 +53,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
if len(entry.Chunks) == 0 { if len(entry.Chunks) == 0 {
glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr) glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
filerRequestCounter.WithLabelValues("read.nocontent").Inc()
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
return return
} }

8
weed/server/filer_server_handlers_write.go

@ -70,6 +70,10 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
filerRequestCounter.WithLabelValues("post").Inc()
start := time.Now()
defer func() { filerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds()) }()
ctx := context.Background() ctx := context.Background()
query := r.URL.Query() query := r.URL.Query()
@ -228,6 +232,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
// curl -X DELETE http://localhost:8888/path/to?recursive=true // curl -X DELETE http://localhost:8888/path/to?recursive=true
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
filerRequestCounter.WithLabelValues("delete").Inc()
start := time.Now()
defer func() { filerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) }()
isRecursive := r.FormValue("recursive") == "true" isRecursive := r.FormValue("recursive") == "true"
err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, true) err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, true)

27
weed/server/metrics.go

@ -0,0 +1,27 @@
package weed_server
import "github.com/prometheus/client_golang/prometheus"
var (
filerRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "SeaweedFS",
Subsystem: "filer",
Name: "request_total",
Help: "Counter of filer requests.",
}, []string{"type"})
filerRequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "SeaweedFS",
Subsystem: "filer",
Name: "request_seconds",
Help: "Bucketed histogram of filer request processing time.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type"})
)
func init() {
prometheus.MustRegister(filerRequestCounter)
prometheus.MustRegister(filerRequestHistogram)
}
Loading…
Cancel
Save