You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							343 lines
						
					
					
						
							8.4 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							343 lines
						
					
					
						
							8.4 KiB
						
					
					
				| package resource_pool | |
| 
 | |
| import ( | |
| 	"errors" | |
| 	"fmt" | |
| 	"sync" | |
| 	"sync/atomic" | |
| 	"time" | |
| ) | |
| 
 | |
// idleHandle is one entry in the pool's idle queue: a raw resource handle
// together with the moment after which it should no longer be reused.
type idleHandle struct {
	// handle is the underlying resource; it is what gets passed to the
	// configured Close function when the entry is evicted.
	handle interface{}

	// keepUntil is the expiry time of the entry.  A nil value means the
	// entry never expires.
	keepUntil *time.Time
}
| 
 | |
// TooManyHandles is the error reported when a pool refuses to hand out a
// handle because the active-handle limit has been exceeded.
type TooManyHandles struct {
	// location is the resource location the rejected request was for.
	location string
}

// Error implements the error interface.
func (e TooManyHandles) Error() string {
	const format = "Too many handles to %s"
	return fmt.Sprintf(format, e.location)
}
| 
 | |
// OpenHandleError is the error reported when a new resource handle could
// not be opened.  It carries the underlying cause, which is reachable via
// Unwrap (and therefore via errors.Is / errors.As).
type OpenHandleError struct {
	// location is the resource location the open was attempted against.
	location string
	// err is the underlying cause of the failure.
	err error
}

// Error implements the error interface.
func (o OpenHandleError) Error() string {
	return fmt.Sprintf("Failed to open resource handle: %s (%v)", o.location, o.err)
}

// Unwrap returns the underlying cause so that callers can inspect it with
// errors.Is and errors.As.  It returns nil when no cause was recorded.
func (o OpenHandleError) Unwrap() error {
	return o.err
}
| 
 | |
| // A resource pool implementation where all handles are associated to the | |
| // same resource location. | |
| type simpleResourcePool struct { | |
| 	options Options | |
| 
 | |
| 	numActive *int32 // atomic counter | |
|  | |
| 	activeHighWaterMark *int32 // atomic / monotonically increasing value | |
|  | |
| 	openTokens Semaphore | |
| 
 | |
| 	mutex       sync.Mutex | |
| 	location    string        // guard by mutex | |
| 	idleHandles []*idleHandle // guarded by mutex | |
| 	isLameDuck  bool          // guarded by mutex | |
| } | |
| 
 | |
| // This returns a SimpleResourcePool, where all handles are associated to a | |
| // single resource location. | |
| func NewSimpleResourcePool(options Options) ResourcePool { | |
| 	numActive := new(int32) | |
| 	atomic.StoreInt32(numActive, 0) | |
| 
 | |
| 	activeHighWaterMark := new(int32) | |
| 	atomic.StoreInt32(activeHighWaterMark, 0) | |
| 
 | |
| 	var tokens Semaphore | |
| 	if options.OpenMaxConcurrency > 0 { | |
| 		tokens = NewBoundedSemaphore(uint(options.OpenMaxConcurrency)) | |
| 	} | |
| 
 | |
| 	return &simpleResourcePool{ | |
| 		location:            "", | |
| 		options:             options, | |
| 		numActive:           numActive, | |
| 		activeHighWaterMark: activeHighWaterMark, | |
| 		openTokens:          tokens, | |
| 		mutex:               sync.Mutex{}, | |
| 		idleHandles:         make([]*idleHandle, 0, 0), | |
| 		isLameDuck:          false, | |
| 	} | |
| } | |
| 
 | |
| // See ResourcePool for documentation. | |
| func (p *simpleResourcePool) NumActive() int32 { | |
| 	return atomic.LoadInt32(p.numActive) | |
| } | |
| 
 | |
| // See ResourcePool for documentation. | |
| func (p *simpleResourcePool) ActiveHighWaterMark() int32 { | |
| 	return atomic.LoadInt32(p.activeHighWaterMark) | |
| } | |
| 
 | |
| // See ResourcePool for documentation. | |
| func (p *simpleResourcePool) NumIdle() int { | |
| 	p.mutex.Lock() | |
| 	defer p.mutex.Unlock() | |
| 	return len(p.idleHandles) | |
| } | |
| 
 | |
| // SimpleResourcePool can only register a single (network, address) entry. | |
| // Register should be call before any Get calls. | |
| func (p *simpleResourcePool) Register(resourceLocation string) error { | |
| 	if resourceLocation == "" { | |
| 		return errors.New("Invalid resource location") | |
| 	} | |
| 
 | |
| 	p.mutex.Lock() | |
| 	defer p.mutex.Unlock() | |
| 
 | |
| 	if p.isLameDuck { | |
| 		return fmt.Errorf( | |
| 			"cannot register %s to lame duck resource pool", | |
| 			resourceLocation) | |
| 	} | |
| 
 | |
| 	if p.location == "" { | |
| 		p.location = resourceLocation | |
| 		return nil | |
| 	} | |
| 	return errors.New("SimpleResourcePool can only register one location") | |
| } | |
| 
 | |
| // SimpleResourcePool will enter lame duck mode upon calling Unregister. | |
| func (p *simpleResourcePool) Unregister(resourceLocation string) error { | |
| 	p.EnterLameDuckMode() | |
| 	return nil | |
| } | |
| 
 | |
| func (p *simpleResourcePool) ListRegistered() []string { | |
| 	p.mutex.Lock() | |
| 	defer p.mutex.Unlock() | |
| 
 | |
| 	if p.location != "" { | |
| 		return []string{p.location} | |
| 	} | |
| 	return []string{} | |
| } | |
| 
 | |
| func (p *simpleResourcePool) getLocation() (string, error) { | |
| 	p.mutex.Lock() | |
| 	defer p.mutex.Unlock() | |
| 
 | |
| 	if p.location == "" { | |
| 		return "", fmt.Errorf( | |
| 			"resource location is not set for SimpleResourcePool") | |
| 	} | |
| 
 | |
| 	if p.isLameDuck { | |
| 		return "", fmt.Errorf( | |
| 			"lame duck resource pool cannot return handles to %s", | |
| 			p.location) | |
| 	} | |
| 
 | |
| 	return p.location, nil | |
| } | |
| 
 | |
| // This gets an active resource from the resource pool.  Note that the | |
| // resourceLocation argument is ignored (The handles are associated to the | |
| // resource location provided by the first Register call). | |
| func (p *simpleResourcePool) Get(unused string) (ManagedHandle, error) { | |
| 	activeCount := atomic.AddInt32(p.numActive, 1) | |
| 	if p.options.MaxActiveHandles > 0 && | |
| 		activeCount > p.options.MaxActiveHandles { | |
| 
 | |
| 		atomic.AddInt32(p.numActive, -1) | |
| 		return nil, TooManyHandles{p.location} | |
| 	} | |
| 
 | |
| 	highest := atomic.LoadInt32(p.activeHighWaterMark) | |
| 	for activeCount > highest && | |
| 		!atomic.CompareAndSwapInt32( | |
| 			p.activeHighWaterMark, | |
| 			highest, | |
| 			activeCount) { | |
| 
 | |
| 		highest = atomic.LoadInt32(p.activeHighWaterMark) | |
| 	} | |
| 
 | |
| 	if h := p.getIdleHandle(); h != nil { | |
| 		return h, nil | |
| 	} | |
| 
 | |
| 	location, err := p.getLocation() | |
| 	if err != nil { | |
| 		atomic.AddInt32(p.numActive, -1) | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	if p.openTokens != nil { | |
| 		// Current implementation does not wait for tokens to become available. | |
| 		// If that causes availability hits, we could increase the wait, | |
| 		// similar to simple_pool.go. | |
| 		if p.openTokens.TryAcquire(0) { | |
| 			defer p.openTokens.Release() | |
| 		} else { | |
| 			// We could not immediately acquire a token. | |
| 			// Instead of waiting | |
| 			atomic.AddInt32(p.numActive, -1) | |
| 			return nil, OpenHandleError{ | |
| 				p.location, errors.New("Open Error: reached OpenMaxConcurrency")} | |
| 		} | |
| 	} | |
| 
 | |
| 	handle, err := p.options.Open(location) | |
| 	if err != nil { | |
| 		atomic.AddInt32(p.numActive, -1) | |
| 		return nil, OpenHandleError{p.location, err} | |
| 	} | |
| 
 | |
| 	return NewManagedHandle(p.location, handle, p, p.options), nil | |
| } | |
| 
 | |
| // See ResourcePool for documentation. | |
| func (p *simpleResourcePool) Release(handle ManagedHandle) error { | |
| 	if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p { | |
| 		return errors.New( | |
| 			"Resource pool cannot take control of a handle owned " + | |
| 				"by another resource pool") | |
| 	} | |
| 
 | |
| 	h := handle.ReleaseUnderlyingHandle() | |
| 	if h != nil { | |
| 		// We can unref either before or after queuing the idle handle. | |
| 		// The advantage of unref-ing before queuing is that there is | |
| 		// a higher chance of successful Get when number of active handles | |
| 		// is close to the limit (but potentially more handle creation). | |
| 		// The advantage of queuing before unref-ing is that there's a | |
| 		// higher chance of reusing handle (but potentially more Get failures). | |
| 		atomic.AddInt32(p.numActive, -1) | |
| 		p.queueIdleHandles(h) | |
| 	} | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| // See ResourcePool for documentation. | |
| func (p *simpleResourcePool) Discard(handle ManagedHandle) error { | |
| 	if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p { | |
| 		return errors.New( | |
| 			"Resource pool cannot take control of a handle owned " + | |
| 				"by another resource pool") | |
| 	} | |
| 
 | |
| 	h := handle.ReleaseUnderlyingHandle() | |
| 	if h != nil { | |
| 		atomic.AddInt32(p.numActive, -1) | |
| 		if err := p.options.Close(h); err != nil { | |
| 			return fmt.Errorf("failed to close resource handle: %w", err) | |
| 		} | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
| // See ResourcePool for documentation. | |
| func (p *simpleResourcePool) EnterLameDuckMode() { | |
| 	p.mutex.Lock() | |
| 
 | |
| 	toClose := p.idleHandles | |
| 	p.isLameDuck = true | |
| 	p.idleHandles = []*idleHandle{} | |
| 
 | |
| 	p.mutex.Unlock() | |
| 
 | |
| 	p.closeHandles(toClose) | |
| } | |
| 
 | |
| // This returns an idle resource, if there is one. | |
| func (p *simpleResourcePool) getIdleHandle() ManagedHandle { | |
| 	var toClose []*idleHandle | |
| 	defer func() { | |
| 		// NOTE: Must keep the closure around to late bind the toClose slice. | |
| 		p.closeHandles(toClose) | |
| 	}() | |
| 
 | |
| 	now := p.options.getCurrentTime() | |
| 
 | |
| 	p.mutex.Lock() | |
| 	defer p.mutex.Unlock() | |
| 
 | |
| 	var i int | |
| 	for i = 0; i < len(p.idleHandles); i++ { | |
| 		idle := p.idleHandles[i] | |
| 		if idle.keepUntil == nil || now.Before(*idle.keepUntil) { | |
| 			break | |
| 		} | |
| 	} | |
| 	if i > 0 { | |
| 		toClose = p.idleHandles[0:i] | |
| 	} | |
| 
 | |
| 	if i < len(p.idleHandles) { | |
| 		idle := p.idleHandles[i] | |
| 		p.idleHandles = p.idleHandles[i+1:] | |
| 		return NewManagedHandle(p.location, idle.handle, p, p.options) | |
| 	} | |
| 
 | |
| 	if len(p.idleHandles) > 0 { | |
| 		p.idleHandles = []*idleHandle{} | |
| 	} | |
| 	return nil | |
| } | |
| 
 | |
| // This adds an idle resource to the pool. | |
| func (p *simpleResourcePool) queueIdleHandles(handle interface{}) { | |
| 	var toClose []*idleHandle | |
| 	defer func() { | |
| 		// NOTE: Must keep the closure around to late bind the toClose slice. | |
| 		p.closeHandles(toClose) | |
| 	}() | |
| 
 | |
| 	now := p.options.getCurrentTime() | |
| 	var keepUntil *time.Time | |
| 	if p.options.MaxIdleTime != nil { | |
| 		// NOTE: Assign to temp variable first to work around compiler bug | |
| 		x := now.Add(*p.options.MaxIdleTime) | |
| 		keepUntil = &x | |
| 	} | |
| 
 | |
| 	p.mutex.Lock() | |
| 	defer p.mutex.Unlock() | |
| 
 | |
| 	if p.isLameDuck { | |
| 		toClose = []*idleHandle{ | |
| 			{handle: handle}, | |
| 		} | |
| 		return | |
| 	} | |
| 
 | |
| 	p.idleHandles = append( | |
| 		p.idleHandles, | |
| 		&idleHandle{ | |
| 			handle:    handle, | |
| 			keepUntil: keepUntil, | |
| 		}) | |
| 
 | |
| 	nIdleHandles := uint32(len(p.idleHandles)) | |
| 	if nIdleHandles > p.options.MaxIdleHandles { | |
| 		handlesToClose := nIdleHandles - p.options.MaxIdleHandles | |
| 		toClose = p.idleHandles[0:handlesToClose] | |
| 		p.idleHandles = p.idleHandles[handlesToClose:nIdleHandles] | |
| 	} | |
| } | |
| 
 | |
| // Closes resources, at this point it is assumed that this resources | |
| // are no longer referenced from the main idleHandles slice. | |
| func (p *simpleResourcePool) closeHandles(handles []*idleHandle) { | |
| 	for _, handle := range handles { | |
| 		_ = p.options.Close(handle.handle) | |
| 	} | |
| }
 |