diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index cc9bb0dc0..32a722507 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -10,6 +10,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -38,11 +39,21 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque var uploadErr error var wg sync.WaitGroup + var bytesBufferCounter int64 + bytesBufferLimitCond := sync.NewCond(new(sync.Mutex)) for { - // need to throttle this for large files + // need to throttle used byte buffer + bytesBufferLimitCond.L.Lock() + for atomic.LoadInt64(&bytesBufferCounter) >= 4 { + glog.V(4).Infof("waiting for byte buffer %d", bytesBufferCounter) + bytesBufferLimitCond.Wait() + } + atomic.AddInt64(&bytesBufferCounter, 1) + bytesBufferLimitCond.L.Unlock() + bytesBuffer := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(bytesBuffer) + glog.V(4).Infof("received byte buffer %d", bytesBufferCounter) limitedReader := io.LimitReader(partReader, int64(chunkSize)) @@ -52,6 +63,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque // data, err := ioutil.ReadAll(limitedReader) if err != nil || dataSize == 0 { + bufPool.Put(bytesBuffer) + atomic.AddInt64(&bytesBufferCounter, -1) + bytesBufferLimitCond.Signal() return nil, md5Hash, 0, err, nil } if chunkOffset == 0 && !isAppend(r) { @@ -59,13 +73,21 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque chunkOffset += dataSize smallContent = make([]byte, dataSize) bytesBuffer.Read(smallContent) + bufPool.Put(bytesBuffer) + atomic.AddInt64(&bytesBufferCounter, -1) + bytesBufferLimitCond.Signal() break } } wg.Add(1) go func(offset int64) { - defer wg.Done() + defer func() { + bufPool.Put(bytesBuffer) + atomic.AddInt64(&bytesBufferCounter, -1) + bytesBufferLimitCond.Signal() + wg.Done() + }() chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so) if toChunkErr != nil {