From 44f1ba68944ed9ff79324317902e1a768d7e92bb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 6 Jun 2021 18:43:04 -0700 Subject: [PATCH] refactor --- .../filer_server_handlers_write_upload.go | 89 +++++++++++-------- 1 file changed, 54 insertions(+), 35 deletions(-) diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 7082ab0f8..d093676bc 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -56,54 +56,27 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque break } } - dataReader := util.NewBytesReader(bytesBuffer.Bytes()) - - // retry to assign a different file id - var fileId, urlLocation string - var auth security.EncodedJwt - var assignErr, uploadErr error - var uploadResult *operation.UploadResult - for i := 0; i < 3; i++ { - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) - if assignErr != nil { - return nil, nil, 0, assignErr, nil - } - // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) - if uploadErr != nil { - time.Sleep(251 * time.Millisecond) - continue - } - break - } + chunk, uploadErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), chunkOffset, so, md5Hash) + if uploadErr != nil { return nil, nil, 0, uploadErr, nil } // if last chunk exhausted the reader exactly at the border - if uploadResult.Size == 0 { + if chunk == nil { break } - if chunkOffset == 0 { - uploadedMd5 := util.Base64Md5ToBytes(uploadResult.ContentMd5) - readedMd5 := md5Hash.Sum(nil) - if !bytes.Equal(uploadedMd5, readedMd5) { - glog.Errorf("md5 %x does not match %x uploaded chunk %s to the volume server", readedMd5, uploadedMd5, uploadResult.Name) - } - } - // Save to chunk manifest structure - fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) + fileChunks = append(fileChunks, chunk) - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, chunkOffset, chunkOffset+int64(chunk.Size)) // reset variables for the next chunk - chunkOffset = chunkOffset + int64(uploadResult.Size) + chunkOffset = chunkOffset + int64(chunk.Size) // if last chunk was not at full chunk size, but already exhausted the reader - if int64(uploadResult.Size) < int64(chunkSize) { + if int64(chunk.Size) < int64(chunkSize) { break } } @@ -111,7 +84,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque return fileChunks, md5Hash, chunkOffset, nil, smallContent } -func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { +func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc() start := time.Now() @@ -125,3 +98,49 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht } return uploadResult, err, data } + +func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption, md5Hash hash.Hash) (*filer_pb.FileChunk, error) { + dataReader := util.NewBytesReader(data) + + // retry to assign a different file id + var fileId, urlLocation string + var auth security.EncodedJwt + var uploadErr error + var uploadResult *operation.UploadResult + for i := 0; i < 3; i++ { + // assign one file id for one chunk + fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so) + if uploadErr != nil { + glog.V(4).Infof("retry later due to assign error: %v", uploadErr) + time.Sleep(time.Duration(i+1) * 251 * time.Millisecond) + continue + } + + // upload the chunk to the volume server + uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth) + if uploadErr != nil { + glog.V(4).Infof("retry later due to upload error: %v", uploadErr) + time.Sleep(time.Duration(i+1) * 251 * time.Millisecond) + continue + } + break + } + if uploadErr != nil { + glog.Errorf("upload error: %v", uploadErr) + return nil, uploadErr + } + + // if last chunk exhausted the reader exactly at the border + if uploadResult.Size == 0 { + return nil, nil + } + if chunkOffset == 0 { + uploadedMd5 := util.Base64Md5ToBytes(uploadResult.ContentMd5) + readedMd5 := md5Hash.Sum(nil) + if !bytes.Equal(uploadedMd5, readedMd5) { + glog.Errorf("md5 %x does not match %x uploaded chunk %s to the volume server", readedMd5, uploadedMd5, uploadResult.Name) + } + } + + return uploadResult.ToPbFileChunk(fileId, chunkOffset), nil +}