From d1cd8f8c5bedf2c5055579e6fb9bfaded32777c0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 22 Jun 2019 22:53:52 -0700 Subject: [PATCH] add metrics, refactoring --- weed/server/filer_server_handlers_read.go | 2 + weed/server/filer_server_handlers_read_dir.go | 4 + weed/server/filer_server_handlers_write.go | 174 +++++++++++------- .../filer_server_handlers_write_autochunk.go | 17 +- 4 files changed, 126 insertions(+), 71 deletions(-) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index c7cb4f0cc..576339326 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -61,6 +61,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, w.Header().Set("Accept-Ranges", "bytes") if r.Method == "HEAD" { + stats.FilerRequestCounter.WithLabelValues("head").Inc() w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10)) w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat)) setEtag(w, filer2.ETag(entry.Chunks)) @@ -88,6 +89,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, } if fs.option.RedirectOnRead { + stats.FilerRequestCounter.WithLabelValues("redirect").Inc() http.Redirect(w, r, urlString, http.StatusFound) return } diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 94c894baa..87e864559 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui" + "github.com/chrislusf/seaweedfs/weed/stats" ) // listDirectoryHandler lists directories and folers under a directory @@ -16,6 +17,9 @@ import ( // sub directories are listed on the first page, when "lastFileName" // is empty. func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { + + stats.FilerRequestCounter.WithLabelValues("list").Inc() + path := r.URL.Path if strings.HasSuffix(path, "/") && len(path) > 1 { path = path[:len(path)-1] diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 91996dd5e..2c571684d 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "io/ioutil" "mime" @@ -38,6 +39,11 @@ type FilerPostResult struct { } func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) { + + stats.FilerRequestCounter.WithLabelValues("assign").Inc() + start := time.Now() + defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }() + ar := &operation.VolumeAssignRequest{ Count: 1, Replication: replication, @@ -116,67 +122,36 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } glog.V(4).Infoln("post to", u) - // send request to volume server - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: r.Body, - Host: r.Host, - ContentLength: r.ContentLength, - } - if auth != "" { - request.Header.Set("Authorization", "BEARER "+string(auth)) - } - resp, do_err := util.Do(request) - if do_err != nil { - glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method) - writeJsonError(w, r, http.StatusInternalServerError, do_err) - return - } - defer func() { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - }() - etag := resp.Header.Get("ETag") - resp_body, ra_err := ioutil.ReadAll(resp.Body) - if ra_err != nil { - glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, ra_err) - return - } - glog.V(4).Infoln("post result", string(resp_body)) - var ret operation.UploadResult - unmarshal_err := json.Unmarshal(resp_body, &ret) - if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body)) - writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err) + ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) + if err != nil { return } - if ret.Error != "" { - glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) + + if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil { return } - // find correct final path - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } else { - fs.filer.DeleteFileByFileId(fileId) - glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, http.StatusInternalServerError, - errors.New("Can not to write to folder "+path+" without a file name")) - return - } + // send back post result + reply := FilerPostResult{ + Name: ret.Name, + Size: ret.Size, + Error: ret.Error, + Fid: fileId, + Url: urlLocation, } + setEtag(w, ret.ETag) + writeJsonQuiet(w, r, http.StatusCreated, reply) +} + +// update metadata in filer store +func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, + replication string, collection string, ret operation.UploadResult, fileId string) (err error) { + + stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() + start := time.Now() + defer func() { stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) }() - // update metadata in filer store + path := r.URL.Path existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path)) crTime := time.Now() if err == nil && existingEntry != nil { @@ -203,30 +178,95 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { FileId: fileId, Size: uint64(ret.Size), Mtime: time.Now().UnixNano(), - ETag: etag, + ETag: ret.ETag, }}, } if ext := filenamePath.Ext(path); ext != "" { entry.Attr.Mime = mime.TypeByExtension(ext) } // glog.V(4).Infof("saving %s => %+v", path, entry) - if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) - glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) - writeJsonError(w, r, http.StatusInternalServerError, db_err) + glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) + writeJsonError(w, r, http.StatusInternalServerError, dbErr) + err = dbErr return } - // send back post result - reply := FilerPostResult{ - Name: ret.Name, - Size: ret.Size, - Error: ret.Error, - Fid: fileId, - Url: urlLocation, + return nil +} + +// send request to volume server +func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) { + + stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() + start := time.Now() + defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() + + request := &http.Request{ + Method: r.Method, + URL: u, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Body: r.Body, + Host: r.Host, + ContentLength: r.ContentLength, } - setEtag(w, etag) - writeJsonQuiet(w, r, http.StatusCreated, reply) + if auth != "" { + request.Header.Set("Authorization", "BEARER "+string(auth)) + } + resp, doErr := util.Do(request) + if doErr != nil { + glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method) + writeJsonError(w, r, http.StatusInternalServerError, doErr) + err = doErr + return + } + defer func() { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() + etag := resp.Header.Get("ETag") + respBody, raErr := ioutil.ReadAll(resp.Body) + if raErr != nil { + glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) + writeJsonError(w, r, http.StatusInternalServerError, raErr) + err = raErr + return + } + glog.V(4).Infoln("post result", string(respBody)) + unmarshalErr := json.Unmarshal(respBody, &ret) + if unmarshalErr != nil { + glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody)) + writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr) + err = unmarshalErr + return + } + if ret.Error != "" { + err = errors.New(ret.Error) + glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + // find correct final path + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if ret.Name != "" { + path += ret.Name + } else { + err = fmt.Errorf("can not to write to folder %s without a file name", path) + fs.filer.DeleteFileByFileId(fileId) + glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + } + if etag != "" { + ret.ETag = etag + } + return } // curl -X DELETE http://localhost:8888/path/to diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 7da6aab04..0727d480e 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -16,6 +16,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -68,6 +69,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) { + stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() + start := time.Now() + defer func() { stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) }() + multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { return nil, multipartReaderErr @@ -169,11 +174,11 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r }, Chunks: fileChunks, } - if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) - replyerr = db_err - filerResult.Error = db_err.Error() - glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) + replyerr = dbErr + filerResult.Error = dbErr.Error() + glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) return } @@ -183,6 +188,10 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string, auth security.EncodedJwt) (err error) { + stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc() + start := time.Now() + defer func() { stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds()) }() + ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, auth) if uploadResult != nil {