diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index c6d9080a1..f0ac6d80d 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -77,7 +77,7 @@ type WFS struct { signature int32 // throttle writers - concurrentWriters *util.LimitedConcurrentExecutor + concurrentWriters *util.LimitedOutOfOrderProcessor Server *fs.Server } type statsCache struct { @@ -135,7 +135,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.fsNodeCache = newFsCache(wfs.root) if wfs.option.ConcurrentWriters > 0 { - wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters) + wfs.concurrentWriters = util.NewLimitedOutOfOrderProcessor(int32(wfs.option.ConcurrentWriters)) } return wfs diff --git a/weed/operation/upload_processor.go b/weed/operation/upload_processor.go deleted file mode 100644 index 903ae72ce..000000000 --- a/weed/operation/upload_processor.go +++ /dev/null @@ -1,61 +0,0 @@ -package operation - -import ( - "reflect" - "runtime" - "sync" - "sync/atomic" -) - -type OperationRequest func() - -var ( - requestSlots = uint32(32) - requests = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness - ConcurrentUploadLimit = int32(runtime.NumCPU()) // directly related to memory usage - concurrentLimitCond = sync.NewCond(new(sync.Mutex)) - concurrentUpload int32 -) - -func init() { - - for i := 0; i < int(requestSlots); i++ { - requests[i] = make(chan OperationRequest) - } - - cases := make([]reflect.SelectCase, requestSlots) - for i, ch := range requests { - cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} - } - - go func() { - for { - _, value, ok := reflect.Select(cases) - if !ok { - continue - } - - request := value.Interface().(OperationRequest) - - concurrentLimitCond.L.Lock() - for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit { - concurrentLimitCond.Wait() - } - atomic.AddInt32(&concurrentUpload, 1) - concurrentLimitCond.L.Unlock() - - go func() { - defer atomic.AddInt32(&concurrentUpload, -1) - defer concurrentLimitCond.Signal() - request() - }() - - } - }() - -} - -func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) { - index := slotKey % requestSlots - requests[index] <- request -} diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 21af6a109..25275cf05 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -5,8 +5,8 @@ import ( "hash" "io" "io/ioutil" - "math/rand" "net/http" + "runtime" "strings" "sync" "time" @@ -20,6 +20,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +var ( + limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU())) +) + func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) { md5Hash = md5.New() @@ -58,7 +62,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque for readErr == nil { wg.Add(1) - operation.AsyncOutOfOrderProcess(rand.Uint32(), func() { + limitedUploadProcessor.Execute(func() { defer wg.Done() var localOffset int64 diff --git a/weed/util/limiter.go b/weed/util/limiter.go index 91499632c..2e5168d3d 100644 --- a/weed/util/limiter.go +++ b/weed/util/limiter.go @@ -1,40 +1,70 @@ package util -// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go +import ( + "math/rand" + "reflect" + "sync" + "sync/atomic" +) -// LimitedConcurrentExecutor object -type LimitedConcurrentExecutor struct { - limit int - tokenChan chan int +type OperationRequest func() + +type LimitedOutOfOrderProcessor struct { + processorSlots uint32 + processors []chan OperationRequest + processorLimit int32 + processorLimitCond *sync.Cond + currentProcessor int32 } -func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor { +func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) { - // allocate a limiter instance - c := &LimitedConcurrentExecutor{ - limit: limit, - tokenChan: make(chan int, limit), + processorSlots := uint32(32) + c = &LimitedOutOfOrderProcessor{ + processorSlots: processorSlots, + processors: make([]chan OperationRequest, processorSlots), + processorLimit: limit, + processorLimitCond: sync.NewCond(new(sync.Mutex)), } - // allocate the tokenChan: - for i := 0; i < c.limit; i++ { - c.tokenChan <- i + for i := 0; i < int(processorSlots); i++ { + c.processors[i] = make(chan OperationRequest) } - return c -} + cases := make([]reflect.SelectCase, processorSlots) + for i, ch := range c.processors { + cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} + } -// Execute adds a function to the execution queue. -// if num of go routines allocated by this instance is < limit -// launch a new go routine to execute job -// else wait until a go routine becomes available -func (c *LimitedConcurrentExecutor) Execute(job func()) { - token := <-c.tokenChan go func() { - defer func() { - c.tokenChan <- token - }() - // run the job - job() + for { + _, value, ok := reflect.Select(cases) + if !ok { + continue + } + + request := value.Interface().(OperationRequest) + + c.processorLimitCond.L.Lock() + for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit { + c.processorLimitCond.Wait() + } + atomic.AddInt32(&c.currentProcessor, 1) + c.processorLimitCond.L.Unlock() + + go func() { + defer atomic.AddInt32(&c.currentProcessor, -1) + defer c.processorLimitCond.Signal() + request() + }() + + } }() + + return c +} + +func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) { + index := rand.Uint32() % c.processorSlots + c.processors[index] <- request }