diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 81b2ce1b0..3ab45453e 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -6,9 +6,7 @@ import ( "io" "io/ioutil" "net/http" - "runtime" "strings" - "sync" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -20,143 +18,75 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -var ( - limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU())) -) - -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) { +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) { + var fileChunks []*filer_pb.FileChunk - md5Hash = md5.New() + md5Hash := md5.New() var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) - // save small content directly - if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) { - smallContent, err = ioutil.ReadAll(partReader) - dataSize = int64(len(smallContent)) - return - } + chunkOffset := int64(0) + var smallContent []byte - resultsChan := make(chan *ChunkCreationResult, 2) + for { + limitedReader := io.LimitReader(partReader, int64(chunkSize)) - var waitForAllData sync.WaitGroup - waitForAllData.Add(1) - go func() { - // process upload results - defer waitForAllData.Done() - for result := range resultsChan { - if result.err != nil { - err = result.err - continue - } - - // Save to chunk manifest structure - fileChunks = append(fileChunks, result.chunk) + data, err := ioutil.ReadAll(limitedReader) + if err != nil { + return nil, nil, 0, err, nil } - }() - - var lock sync.Mutex - readOffset := int64(0) - var wg sync.WaitGroup - - for err == nil { - - wg.Add(1) - request := func() { - defer wg.Done() - - var localOffset int64 - // read from the input - lock.Lock() - localOffset = readOffset - limitedReader := io.LimitReader(partReader, int64(chunkSize)) - data, readErr := ioutil.ReadAll(limitedReader) - readOffset += int64(len(data)) - lock.Unlock() - // handle read errors - if readErr != nil { - if err == nil { - err = readErr - } - if readErr != io.EOF { - resultsChan <- &ChunkCreationResult{ - err: readErr, - } - } - return + if chunkOffset == 0 && !isAppend(r) { + if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 { + smallContent = data + chunkOffset += int64(len(data)) + break } - if len(data) == 0 { - readErr = io.EOF - if err == nil { - err = readErr - } - return + } + dataReader := util.NewBytesReader(data) + + // retry to assign a different file id + var fileId, urlLocation string + var auth security.EncodedJwt + var assignErr, uploadErr error + var uploadResult *operation.UploadResult + for i := 0; i < 3; i++ { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) + if assignErr != nil { + return nil, nil, 0, assignErr, nil } - // upload - dataReader := util.NewBytesReader(data) - fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType) + // upload the chunk to the volume server + uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) if uploadErr != nil { - if err == nil { - err = uploadErr - } - resultsChan <- &ChunkCreationResult{ - err: uploadErr, - } - return - } - - glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size)) - - // send back uploaded file chunk - resultsChan <- &ChunkCreationResult{ - chunk: uploadResult.ToPbFileChunk(fileId, localOffset), + time.Sleep(251 * time.Millisecond) + continue } - + break + } + if uploadErr != nil { + return nil, nil, 0, uploadErr, nil } - limitedUploadProcessor.Execute(request) - } - - go func() { - wg.Wait() - close(resultsChan) - }() - - waitForAllData.Wait() - if err == io.EOF { - err = nil - } + // if last chunk exhausted the reader exactly at the border + if uploadResult.Size == 0 { + break + } - return fileChunks, md5Hash, readOffset, err, nil -} + // Save to chunk manifest structure + fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) -type ChunkCreationResult struct { - chunk *filer_pb.FileChunk - err error -} + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) -func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) { - // retry to assign a different file id - var fileId, urlLocation string - var auth security.EncodedJwt - var assignErr, uploadErr error - var uploadResult *operation.UploadResult - for i := 0; i < 3; i++ { - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) - if assignErr != nil { - return "", nil, assignErr - } + // reset variables for the next chunk + chunkOffset = chunkOffset + int64(uploadResult.Size) - // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) - if uploadErr != nil { - time.Sleep(251 * time.Millisecond) - continue + // if last chunk was not at full chunk size, but already exhausted the reader + if int64(uploadResult.Size) < int64(chunkSize) { + break } - break } - return fileId, uploadResult, uploadErr + + return fileChunks, md5Hash, chunkOffset, nil, smallContent } func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {