You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

110 lines
2.5 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package util
  2. import (
  3. "math/rand"
  4. "reflect"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. // initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
  9. // LimitedConcurrentExecutor object
  10. type LimitedConcurrentExecutor struct {
  11. limit int
  12. tokenChan chan int
  13. }
  14. func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
  15. // allocate a limiter instance
  16. c := &LimitedConcurrentExecutor{
  17. limit: limit,
  18. tokenChan: make(chan int, limit),
  19. }
  20. // allocate the tokenChan:
  21. for i := 0; i < c.limit; i++ {
  22. c.tokenChan <- i
  23. }
  24. return c
  25. }
  26. // Execute adds a function to the execution queue.
  27. // if num of go routines allocated by this instance is < limit
  28. // launch a new go routine to execute job
  29. // else wait until a go routine becomes available
  30. func (c *LimitedConcurrentExecutor) Execute(job func()) {
  31. token := <-c.tokenChan
  32. go func() {
  33. defer func() {
  34. c.tokenChan <- token
  35. }()
  36. // run the job
  37. job()
  38. }()
  39. }
  40. // a different implementation, but somehow more "conservative"
  41. type OperationRequest func()
  42. type LimitedOutOfOrderProcessor struct {
  43. processorSlots uint32
  44. processors []chan OperationRequest
  45. processorLimit int32
  46. processorLimitCond *sync.Cond
  47. currentProcessor int32
  48. }
  49. func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
  50. processorSlots := uint32(32)
  51. c = &LimitedOutOfOrderProcessor{
  52. processorSlots: processorSlots,
  53. processors: make([]chan OperationRequest, processorSlots),
  54. processorLimit: limit,
  55. processorLimitCond: sync.NewCond(new(sync.Mutex)),
  56. }
  57. for i := 0; i < int(processorSlots); i++ {
  58. c.processors[i] = make(chan OperationRequest)
  59. }
  60. cases := make([]reflect.SelectCase, processorSlots)
  61. for i, ch := range c.processors {
  62. cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
  63. }
  64. go func() {
  65. for {
  66. _, value, ok := reflect.Select(cases)
  67. if !ok {
  68. continue
  69. }
  70. request := value.Interface().(OperationRequest)
  71. c.processorLimitCond.L.Lock()
  72. for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
  73. c.processorLimitCond.Wait()
  74. }
  75. atomic.AddInt32(&c.currentProcessor, 1)
  76. c.processorLimitCond.L.Unlock()
  77. go func() {
  78. defer atomic.AddInt32(&c.currentProcessor, -1)
  79. defer c.processorLimitCond.Signal()
  80. request()
  81. }()
  82. }
  83. }()
  84. return c
  85. }
  86. func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
  87. index := rand.Uint32() % c.processorSlots
  88. c.processors[index] <- request
  89. }