You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

61 lines
1.3 KiB

  1. package operation
  2. import (
  3. "reflect"
  4. "runtime"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. type OperationRequest func()
  9. var (
  10. requestSlots = uint32(32)
  11. requests = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness
  12. ConcurrentUploadLimit = int32(runtime.NumCPU()) // directly related to memory usage
  13. concurrentLimitCond = sync.NewCond(new(sync.Mutex))
  14. concurrentUpload int32
  15. )
  16. func init() {
  17. for i := 0; i < int(requestSlots); i++ {
  18. requests[i] = make(chan OperationRequest)
  19. }
  20. cases := make([]reflect.SelectCase, requestSlots)
  21. for i, ch := range requests {
  22. cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
  23. }
  24. go func() {
  25. for {
  26. _, value, ok := reflect.Select(cases)
  27. if !ok {
  28. continue
  29. }
  30. request := value.Interface().(OperationRequest)
  31. concurrentLimitCond.L.Lock()
  32. for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit {
  33. concurrentLimitCond.Wait()
  34. }
  35. atomic.AddInt32(&concurrentUpload, 1)
  36. concurrentLimitCond.L.Unlock()
  37. go func() {
  38. defer atomic.AddInt32(&concurrentUpload, -1)
  39. defer concurrentLimitCond.Signal()
  40. request()
  41. }()
  42. }
  43. }()
  44. }
  45. func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) {
  46. index := slotKey % requestSlots
  47. requests[index] <- request
  48. }