diff --git a/weed/operation/upload_processor.go b/weed/operation/upload_processor.go new file mode 100644 index 000000000..903ae72ce --- /dev/null +++ b/weed/operation/upload_processor.go @@ -0,0 +1,61 @@ +package operation + +import ( + "reflect" + "runtime" + "sync" + "sync/atomic" +) + +type OperationRequest func() + +var ( + requestSlots = uint32(32) + requests = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness + ConcurrentUploadLimit = int32(runtime.NumCPU()) // directly related to memory usage + concurrentLimitCond = sync.NewCond(new(sync.Mutex)) + concurrentUpload int32 +) + +func init() { + + for i := 0; i < int(requestSlots); i++ { + requests[i] = make(chan OperationRequest) + } + + cases := make([]reflect.SelectCase, requestSlots) + for i, ch := range requests { + cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} + } + + go func() { + for { + _, value, ok := reflect.Select(cases) + if !ok { + continue + } + + request := value.Interface().(OperationRequest) + + concurrentLimitCond.L.Lock() + for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit { + concurrentLimitCond.Wait() + } + atomic.AddInt32(&concurrentUpload, 1) + concurrentLimitCond.L.Unlock() + + go func() { + defer atomic.AddInt32(&concurrentUpload, -1) + defer concurrentLimitCond.Signal() + request() + }() + + } + }() + +} + +func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) { + index := slotKey % requestSlots + requests[index] <- request +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index a876b6d83..2734223ea 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -59,7 +59,7 @@ type FilerOption struct { Port uint32 recursiveDelete bool Cipher bool - SaveToFilerLimit int + SaveToFilerLimit int64 Filers []string ConcurrentUploadLimit int64 } diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index cf8cdf3d8..21af6a109 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -5,8 +5,10 @@ import ( "hash" "io" "io/ioutil" + "math/rand" "net/http" "strings" + "sync" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -18,75 +20,127 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) { - var fileChunks []*filer_pb.FileChunk +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) { - md5Hash := md5.New() + md5Hash = md5.New() var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) - chunkOffset := int64(0) - var smallContent []byte + // save small content directly + if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) { + smallContent, err = ioutil.ReadAll(partReader) + dataSize = int64(len(smallContent)) + return + } - for { - limitedReader := io.LimitReader(partReader, int64(chunkSize)) + resultsChan := make(chan *ChunkCreationResult, operation.ConcurrentUploadLimit) - data, err := ioutil.ReadAll(limitedReader) - if err != nil { - return nil, nil, 0, err, nil - } - if chunkOffset == 0 && !isAppend(r) { - if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 { - smallContent = data - chunkOffset += int64(len(data)) - break + var waitForAllData sync.WaitGroup + waitForAllData.Add(1) + go func() { + // process upload results + defer waitForAllData.Done() + for result := range resultsChan { + if result.err != nil { + err = result.err + continue } + + // Save to chunk manifest structure + fileChunks = append(fileChunks, result.chunk) } - dataReader := util.NewBytesReader(data) - - // retry to assign a different file id - var fileId, urlLocation string - var auth security.EncodedJwt - var assignErr, uploadErr error - var uploadResult *operation.UploadResult - for i := 0; i < 3; i++ { - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) - if assignErr != nil { - return nil, nil, 0, assignErr, nil + }() + + var lock sync.Mutex + readOffset := int64(0) + var wg sync.WaitGroup + var readErr error + + for readErr == nil { + + wg.Add(1) + operation.AsyncOutOfOrderProcess(rand.Uint32(), func() { + defer wg.Done() + + var localOffset int64 + var data []byte + // read from the input + lock.Lock() + localOffset = readOffset + limitedReader := io.LimitReader(partReader, int64(chunkSize)) + data, readErr = ioutil.ReadAll(limitedReader) + readOffset += int64(len(data)) + lock.Unlock() + // handle read errors + if readErr != nil { + if readErr != io.EOF { + resultsChan <- &ChunkCreationResult{ + err: readErr, + } + } + return + } + if len(data) == 0 { + readErr = io.EOF + return } - // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) + // upload + dataReader := util.NewBytesReader(data) + fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType) if uploadErr != nil { - time.Sleep(251 * time.Millisecond) - continue + resultsChan <- &ChunkCreationResult{ + err: uploadErr, + } + return } - break - } - if uploadErr != nil { - return nil, nil, 0, uploadErr, nil - } - // if last chunk exhausted the reader exactly at the border - if uploadResult.Size == 0 { - break - } + glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size)) - // Save to chunk manifest structure - fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) + // send back uploaded file chunk + resultsChan <- &ChunkCreationResult{ + chunk: uploadResult.ToPbFileChunk(fileId, localOffset), + } + + }) + } + + go func() { + wg.Wait() + close(resultsChan) + }() + + waitForAllData.Wait() - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) + return fileChunks, md5Hash, readOffset, err, nil +} - // reset variables for the next chunk - chunkOffset = chunkOffset + int64(uploadResult.Size) +type ChunkCreationResult struct { + chunk *filer_pb.FileChunk + err error +} - // if last chunk was not at full chunk size, but already exhausted the reader - if int64(uploadResult.Size) < int64(chunkSize) { - break +func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) { + // retry to assign a different file id + var fileId, urlLocation string + var auth security.EncodedJwt + var assignErr, uploadErr error + var uploadResult *operation.UploadResult + for i := 0; i < 3; i++ { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) + if assignErr != nil { + return "", nil, assignErr } - } - return fileChunks, md5Hash, chunkOffset, nil, smallContent + // upload the chunk to the volume server + uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) + if uploadErr != nil { + time.Sleep(251 * time.Millisecond) + continue + } + break + } + return fileId, uploadResult, uploadErr } func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {