From 3b1a95ac26debb3080794bf8605ea2d5636818c7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 8 Aug 2020 12:02:06 -0700 Subject: [PATCH] filer refactoring: same auto chunking logic for POST and PUT, no size limit --- weed/server/filer_server_handlers_write.go | 202 +----------------- .../filer_server_handlers_write_autochunk.go | 92 ++++---- 2 files changed, 49 insertions(+), 245 deletions(-) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index c7833a85e..d22376a45 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,21 +2,11 @@ package weed_server import ( "context" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "mime" "net/http" - "net/url" "os" - filenamePath "path" - "strconv" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -97,198 +87,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { ttlSeconds = int32(ttl.Minutes()) * 60 } - if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked { - return - } - - if fs.option.Cipher { - reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - } else if reply != nil { - writeJsonQuiet(w, r, http.StatusCreated, reply) - } - - return - } - - fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) - - if err != nil || fileId == "" || urlLocation == "" { - glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) - writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)) - return - } - - glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) - - u, _ := url.Parse(urlLocation) - ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) - if err != nil { - return - } + fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync) - if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId, ttlSeconds); err != nil { - return - } - - // send back post result - reply := FilerPostResult{ - Name: ret.Name, - Size: int64(ret.Size), - Error: ret.Error, - Fid: fileId, - Url: urlLocation, - } - setEtag(w, ret.ETag) - writeJsonQuiet(w, r, http.StatusCreated, reply) -} - -// update metadata in filer store -func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string, - collection string, ret *operation.UploadResult, fileId string, ttlSeconds int32) (err error) { - - stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() - start := time.Now() - defer func() { - stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) - }() - - modeStr := r.URL.Query().Get("mode") - if modeStr == "" { - modeStr = "0660" - } - mode, err := strconv.ParseUint(modeStr, 8, 32) - if err != nil { - glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) - mode = 0660 - } - - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } - } - existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) - crTime := time.Now() - if err == nil && existingEntry != nil { - crTime = existingEntry.Crtime - } - entry := &filer2.Entry{ - FullPath: util.FullPath(path), - Attr: filer2.Attr{ - Mtime: time.Now(), - Crtime: crTime, - Mode: os.FileMode(mode), - Uid: OS_UID, - Gid: OS_GID, - Replication: replication, - Collection: collection, - TtlSec: ttlSeconds, - Mime: ret.Mime, - Md5: util.Base64Md5ToBytes(ret.ContentMd5), - }, - Chunks: []*filer_pb.FileChunk{{ - FileId: fileId, - Size: uint64(ret.Size), - Mtime: time.Now().UnixNano(), - ETag: ret.ETag, - }}, - } - if entry.Attr.Mime == "" { - if ext := filenamePath.Ext(path); ext != "" { - entry.Attr.Mime = mime.TypeByExtension(ext) - } - } - // glog.V(4).Infof("saving %s => %+v", path, entry) - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil { - fs.filer.DeleteChunks(entry.Chunks) - glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) - writeJsonError(w, r, http.StatusInternalServerError, dbErr) - err = dbErr - return - } - - return nil -} - -// send request to volume server -func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, err error) { - - stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() - start := time.Now() - defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() - - ret = &operation.UploadResult{} - - body := r.Body - - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: body, - Host: r.Host, - ContentLength: r.ContentLength, - } - - if auth != "" { - request.Header.Set("Authorization", "BEARER "+string(auth)) - } - resp, doErr := util.Do(request) - if doErr != nil { - glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method) - writeJsonError(w, r, http.StatusInternalServerError, doErr) - err = doErr - return - } - defer func() { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - }() - - respBody, raErr := ioutil.ReadAll(resp.Body) - if raErr != nil { - glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) - writeJsonError(w, r, http.StatusInternalServerError, raErr) - err = raErr - return - } - - glog.V(4).Infoln("post result", string(respBody)) - unmarshalErr := json.Unmarshal(respBody, &ret) - if unmarshalErr != nil { - glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody)) - writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr) - err = unmarshalErr - return - } - if ret.Error != "" { - err = errors.New(ret.Error) - glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - // find correct final path - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } else { - err = fmt.Errorf("can not to write to folder %s without a file name", path) - fs.filer.DeleteFileByFileId(fileId) - glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - } - ret.ETag = getEtag(resp) - ret.ContentMd5 = resp.Header.Get("Content-MD5") - return } // curl -X DELETE http://localhost:8888/path/to diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 77ae10f19..0365ea3ab 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "net/http" + "os" "path" "strconv" "strings" @@ -22,7 +23,7 @@ import ( ) func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool { + replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) { // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line query := r.URL.Query() @@ -32,28 +33,9 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * if maxMB <= 0 && fs.option.MaxMB > 0 { maxMB = int32(fs.option.MaxMB) } - if maxMB <= 0 { - glog.V(4).Infoln("AutoChunking not enabled") - return false - } - glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") chunkSize := 1024 * 1024 * maxMB - contentLength := int64(0) - if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { - contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) - if contentLength <= int64(chunkSize) { - glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") - return false - } - } - - if contentLength <= 0 { - glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") - return false - } - stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() start := time.Now() defer func() { @@ -62,30 +44,32 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * var reply *FilerPostResult var err error + var md5bytes []byte if r.Method == "POST" { - reply, err = fs.doPostAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) } else { - reply, err = fs.doPutAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) } if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { + if len(md5bytes) > 0 { + w.Header().Set("Content-MD5", util.Base64Encode(md5bytes)) + } writeJsonQuiet(w, r, http.StatusCreated, reply) } - return true } -func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { - return nil, multipartReaderErr + return nil, nil, multipartReaderErr } part1, part1Err := multipartReader.NextPart() if part1Err != nil { - return nil, part1Err + return nil, nil, part1Err } fileName := part1.FileName() @@ -97,9 +81,9 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite contentType = "" } - fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) if err != nil { - return nil, err + return nil, nil, err } fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) @@ -108,21 +92,20 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite return } - filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset) + md5bytes = md5Hash.Sum(nil) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) return } - -func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { fileName := "" contentType := "" - fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) if err != nil { - return nil, err + return nil, nil, err } fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) @@ -131,12 +114,26 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter return } - filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset) + md5bytes = md5Hash.Sum(nil) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) return } -func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5Hash hash.Hash, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { + + // detect file mode + modeStr := r.URL.Query().Get("mode") + if modeStr == "" { + modeStr = "0660" + } + mode, err := strconv.ParseUint(modeStr, 8, 32) + if err != nil { + glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) + mode = 0660 + } + + // fix the path path := r.URL.Path if strings.HasSuffix(path, "/") { if fileName != "" { @@ -144,20 +141,28 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } } + // fix the crTime + existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) + crTime := time.Now() + if err == nil && existingEntry != nil { + crTime = existingEntry.Crtime + } + + glog.V(4).Infoln("saving", path) entry := &filer2.Entry{ FullPath: util.FullPath(path), Attr: filer2.Attr{ Mtime: time.Now(), - Crtime: time.Now(), - Mode: 0660, + Crtime: crTime, + Mode: os.FileMode(mode), Uid: OS_UID, Gid: OS_GID, Replication: replication, Collection: collection, TtlSec: ttlSec, Mime: contentType, - Md5: md5Hash.Sum(nil), + Md5: md5bytes, }, Chunks: fileChunks, } @@ -176,7 +181,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return filerResult, replyerr } -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { var fileChunks []*filer_pb.FileChunk md5Hash := md5.New() @@ -184,7 +189,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque chunkOffset := int64(0) - for chunkOffset < contentLength { + for { limitedReader := io.LimitReader(partReader, int64(chunkSize)) // assign one file id for one chunk @@ -207,7 +212,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque // Save to chunk manifest structure fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength) + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) // reset variables for the next chunk chunkOffset = chunkOffset + int64(uploadResult.Size) @@ -250,4 +255,3 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil } } -