|
|
@ -85,16 +85,20 @@ func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) |
|
|
|
|
|
|
|
request := value.Interface().(OperationRequest) |
|
|
|
|
|
|
|
c.processorLimitCond.L.Lock() |
|
|
|
for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit { |
|
|
|
c.processorLimitCond.Wait() |
|
|
|
if c.processorLimit > 0 { |
|
|
|
c.processorLimitCond.L.Lock() |
|
|
|
for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit { |
|
|
|
c.processorLimitCond.Wait() |
|
|
|
} |
|
|
|
atomic.AddInt32(&c.currentProcessor, 1) |
|
|
|
c.processorLimitCond.L.Unlock() |
|
|
|
} |
|
|
|
atomic.AddInt32(&c.currentProcessor, 1) |
|
|
|
c.processorLimitCond.L.Unlock() |
|
|
|
|
|
|
|
go func() { |
|
|
|
defer atomic.AddInt32(&c.currentProcessor, -1) |
|
|
|
defer c.processorLimitCond.Signal() |
|
|
|
if c.processorLimit > 0 { |
|
|
|
defer atomic.AddInt32(&c.currentProcessor, -1) |
|
|
|
defer c.processorLimitCond.Signal() |
|
|
|
} |
|
|
|
request() |
|
|
|
}() |
|
|
|
|
|
|
|