You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							154 lines
						
					
					
						
							3.8 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							154 lines
						
					
					
						
							3.8 KiB
						
					
					
				| package resource_pool | |
| 
 | |
| import ( | |
| 	"fmt" | |
| 	"sync" | |
| 	"sync/atomic" | |
| 	"time" | |
| ) | |
| 
 | |
| type Semaphore interface { | |
| 	// Increment the semaphore counter by one. | |
| 	Release() | |
| 
 | |
| 	// Decrement the semaphore counter by one, and block if counter < 0 | |
| 	Acquire() | |
| 
 | |
| 	// Decrement the semaphore counter by one, and block if counter < 0 | |
| 	// Wait for up to the given duration.  Returns true if did not timeout | |
| 	TryAcquire(timeout time.Duration) bool | |
| } | |
| 
 | |
| // A simple counting Semaphore. | |
| type boundedSemaphore struct { | |
| 	slots chan struct{} | |
| } | |
| 
 | |
| // Create a bounded semaphore. The count parameter must be a positive number. | |
| // NOTE: The bounded semaphore will panic if the user tries to Release | |
| // beyond the specified count. | |
| func NewBoundedSemaphore(count uint) Semaphore { | |
| 	sem := &boundedSemaphore{ | |
| 		slots: make(chan struct{}, int(count)), | |
| 	} | |
| 	for i := 0; i < cap(sem.slots); i++ { | |
| 		sem.slots <- struct{}{} | |
| 	} | |
| 	return sem | |
| } | |
| 
 | |
| // Acquire returns on successful acquisition. | |
| func (sem *boundedSemaphore) Acquire() { | |
| 	<-sem.slots | |
| } | |
| 
 | |
| // TryAcquire returns true if it acquires a resource slot within the | |
| // timeout, false otherwise. | |
| func (sem *boundedSemaphore) TryAcquire(timeout time.Duration) bool { | |
| 	if timeout > 0 { | |
| 		// Wait until we get a slot or timeout expires. | |
| 		tm := time.NewTimer(timeout) | |
| 		defer tm.Stop() | |
| 		select { | |
| 		case <-sem.slots: | |
| 			return true | |
| 		case <-tm.C: | |
| 			// Timeout expired. In very rare cases this might happen even if | |
| 			// there is a slot available, e.g. GC pause after we create the timer | |
| 			// and select randomly picked this one out of the two available channels. | |
| 			// We should do one final immediate check below. | |
| 		} | |
| 	} | |
| 
 | |
| 	// Return true if we have a slot available immediately and false otherwise. | |
| 	select { | |
| 	case <-sem.slots: | |
| 		return true | |
| 	default: | |
| 		return false | |
| 	} | |
| } | |
| 
 | |
| // Release the acquired semaphore. You must not release more than you | |
| // have acquired. | |
| func (sem *boundedSemaphore) Release() { | |
| 	select { | |
| 	case sem.slots <- struct{}{}: | |
| 	default: | |
| 		// slots is buffered. If a send blocks, it indicates a programming | |
| 		// error. | |
| 		panic(fmt.Errorf("too many releases for boundedSemaphore")) | |
| 	} | |
| } | |
| 
 | |
| // This returns an unbound counting semaphore with the specified initial count. | |
| // The semaphore counter can be arbitrary large (i.e., Release can be called | |
| // unlimited amount of times). | |
| // | |
| // NOTE: In general, users should use bounded semaphore since it is more | |
| // efficient than unbounded semaphore. | |
| func NewUnboundedSemaphore(initialCount int) Semaphore { | |
| 	res := &unboundedSemaphore{ | |
| 		counter: int64(initialCount), | |
| 	} | |
| 	res.cond.L = &res.lock | |
| 	return res | |
| } | |
| 
 | |
| type unboundedSemaphore struct { | |
| 	lock    sync.Mutex | |
| 	cond    sync.Cond | |
| 	counter int64 | |
| } | |
| 
 | |
| func (s *unboundedSemaphore) Release() { | |
| 	s.lock.Lock() | |
| 	s.counter += 1 | |
| 	if s.counter > 0 { | |
| 		// Not broadcasting here since it's unlike we can satisfy all waiting | |
| 		// goroutines.  Instead, we will Signal again if there are left over | |
| 		// quota after Acquire, in case of lost wakeups. | |
| 		s.cond.Signal() | |
| 	} | |
| 	s.lock.Unlock() | |
| } | |
| 
 | |
| func (s *unboundedSemaphore) Acquire() { | |
| 	s.lock.Lock() | |
| 	for s.counter < 1 { | |
| 		s.cond.Wait() | |
| 	} | |
| 	s.counter -= 1 | |
| 	if s.counter > 0 { | |
| 		s.cond.Signal() | |
| 	} | |
| 	s.lock.Unlock() | |
| } | |
| 
 | |
| func (s *unboundedSemaphore) TryAcquire(timeout time.Duration) bool { | |
| 	done := make(chan bool, 1) | |
| 	// Gate used to communicate between the threads and decide what the result | |
| 	// is.  If the main thread decides, we have timed out, otherwise we succeed. | |
| 	decided := new(int32) | |
| 	atomic.StoreInt32(decided, 0) | |
| 	go func() { | |
| 		s.Acquire() | |
| 		if atomic.SwapInt32(decided, 1) == 0 { | |
| 			// Acquire won the race | |
| 			done <- true | |
| 		} else { | |
| 			// If we already decided the result, and this thread did not win | |
| 			s.Release() | |
| 		} | |
| 	}() | |
| 	select { | |
| 	case <-done: | |
| 		return true | |
| 	case <-time.After(timeout): | |
| 		if atomic.SwapInt32(decided, 1) == 1 { | |
| 			// The other thread already decided the result | |
| 			return true | |
| 		} | |
| 		return false | |
| 	} | |
| }
 |