diff --git a/weed/util/limited_pool.go b/weed/util/limited_pool.go new file mode 100644 index 000000000..91499632c --- /dev/null +++ b/weed/util/limited_pool.go @@ -0,0 +1,40 @@ +package util + +// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go + +// LimitedConcurrentExecutor object +type LimitedConcurrentExecutor struct { + limit int + tokenChan chan int +} + +func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor { + + // allocate a limiter instance + c := &LimitedConcurrentExecutor{ + limit: limit, + tokenChan: make(chan int, limit), + } + + // allocate the tokenChan: + for i := 0; i < c.limit; i++ { + c.tokenChan <- i + } + + return c +} + +// Execute adds a function to the execution queue. +// if num of go routines allocated by this instance is < limit +// launch a new go routine to execute job +// else wait until a go routine becomes available +func (c *LimitedConcurrentExecutor) Execute(job func()) { + token := <-c.tokenChan + go func() { + defer func() { + c.tokenChan <- token + }() + // run the job + job() + }() +} diff --git a/weed/util/limiter.go b/weed/util/limiter.go deleted file mode 100644 index 9d63c12a1..000000000 --- a/weed/util/limiter.go +++ /dev/null @@ -1,114 +0,0 @@ -package util - -import ( - "math/rand" - "reflect" - "sync" - "sync/atomic" -) - -// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go - -// LimitedConcurrentExecutor object -type LimitedConcurrentExecutor struct { - limit int - tokenChan chan int -} - -func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor { - - // allocate a limiter instance - c := &LimitedConcurrentExecutor{ - limit: limit, - tokenChan: make(chan int, limit), - } - - // allocate the tokenChan: - for i := 0; i < c.limit; i++ { - c.tokenChan <- i - } - - return c -} - -// Execute adds a function to the execution queue. -// if num of go routines allocated by this instance is < limit -// launch a new go routine to execute job -// else wait until a go routine becomes available -func (c *LimitedConcurrentExecutor) Execute(job func()) { - token := <-c.tokenChan - go func() { - defer func() { - c.tokenChan <- token - }() - // run the job - job() - }() -} - -// a different implementation, but somehow more "conservative" -type OperationRequest func() - -type LimitedOutOfOrderProcessor struct { - processorLimit int32 - processorLimitCond *sync.Cond - processorSlots uint32 - processors []chan OperationRequest - currentProcessor int32 -} - -func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) { - - processorSlots := uint32(32) - c = &LimitedOutOfOrderProcessor{ - processorSlots: processorSlots, - processors: make([]chan OperationRequest, processorSlots), - processorLimit: limit, - processorLimitCond: sync.NewCond(new(sync.Mutex)), - } - - for i := 0; i < int(processorSlots); i++ { - c.processors[i] = make(chan OperationRequest) - } - - cases := make([]reflect.SelectCase, processorSlots) - for i, ch := range c.processors { - cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} - } - - go func() { - for { - _, value, ok := reflect.Select(cases) - if !ok { - continue - } - - request := value.Interface().(OperationRequest) - - if c.processorLimit > 0 { - c.processorLimitCond.L.Lock() - for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit { - c.processorLimitCond.Wait() - } - atomic.AddInt32(&c.currentProcessor, 1) - c.processorLimitCond.L.Unlock() - } - - go func() { - if c.processorLimit > 0 { - defer atomic.AddInt32(&c.currentProcessor, -1) - defer c.processorLimitCond.Signal() - } - request() - }() - - } - }() - - return c -} - -func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) { - index := rand.Uint32() % c.processorSlots - c.processors[index] <- request -}