You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							156 lines
						
					
					
						
							3.4 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							156 lines
						
					
					
						
							3.4 KiB
						
					
					
				
								package buffered_queue
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"fmt"
							 | 
						|
									"sync"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// ItemChunkNode represents a node in the linked list of job chunks
							 | 
						|
								type ItemChunkNode[T any] struct {
							 | 
						|
									items     []T
							 | 
						|
									headIndex int
							 | 
						|
									tailIndex int
							 | 
						|
									next      *ItemChunkNode[T]
							 | 
						|
									nodeId    int
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// BufferedQueue implements a buffered queue using a linked list of job chunks
							 | 
						|
								type BufferedQueue[T any] struct {
							 | 
						|
									chunkSize   int // Maximum number of items per chunk
							 | 
						|
									head        *ItemChunkNode[T]
							 | 
						|
									tail        *ItemChunkNode[T]
							 | 
						|
									last        *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
							 | 
						|
									count       int               // Total number of items in the queue
							 | 
						|
									mutex       sync.Mutex
							 | 
						|
									nodeCounter int
							 | 
						|
									waitCond    *sync.Cond
							 | 
						|
									isClosed    bool
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// NewBufferedQueue creates a new buffered queue with the specified chunk size
							 | 
						|
								func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
							 | 
						|
									// Create an empty chunk to initialize head and tail
							 | 
						|
									chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
							 | 
						|
									bq := &BufferedQueue[T]{
							 | 
						|
										chunkSize: chunkSize,
							 | 
						|
										head:      chunk,
							 | 
						|
										tail:      chunk,
							 | 
						|
										last:      chunk,
							 | 
						|
										count:     0,
							 | 
						|
										mutex:     sync.Mutex{},
							 | 
						|
									}
							 | 
						|
									bq.waitCond = sync.NewCond(&bq.mutex)
							 | 
						|
									return bq
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Enqueue adds a job to the queue
							 | 
						|
								func (q *BufferedQueue[T]) Enqueue(job T) error {
							 | 
						|
								
							 | 
						|
									if q.isClosed {
							 | 
						|
										return fmt.Errorf("queue is closed")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									q.mutex.Lock()
							 | 
						|
									defer q.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									// If the tail chunk is full, create a new chunk (reusing empty chunks if available)
							 | 
						|
									if q.tail.tailIndex == q.chunkSize {
							 | 
						|
										if q.tail == q.last {
							 | 
						|
											// Create a new chunk
							 | 
						|
											q.nodeCounter++
							 | 
						|
											newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
							 | 
						|
											q.tail.next = newChunk
							 | 
						|
											q.tail = newChunk
							 | 
						|
											q.last = newChunk
							 | 
						|
										} else {
							 | 
						|
											// Reuse an empty chunk
							 | 
						|
											q.tail = q.tail.next
							 | 
						|
											q.tail.headIndex = 0
							 | 
						|
											q.tail.tailIndex = 0
							 | 
						|
											// println("tail moved to chunk", q.tail.nodeId)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Add the job to the tail chunk
							 | 
						|
									q.tail.items[q.tail.tailIndex] = job
							 | 
						|
									q.tail.tailIndex++
							 | 
						|
									q.count++
							 | 
						|
									if q.count == 1 {
							 | 
						|
										q.waitCond.Signal()
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Dequeue removes and returns a job from the queue
							 | 
						|
								func (q *BufferedQueue[T]) Dequeue() (T, bool) {
							 | 
						|
									q.mutex.Lock()
							 | 
						|
									defer q.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									for q.count <= 0 && !q.isClosed {
							 | 
						|
										q.waitCond.Wait()
							 | 
						|
									}
							 | 
						|
									if q.count <= 0 && q.isClosed {
							 | 
						|
										var a T
							 | 
						|
										return a, false
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									q.maybeAdjustHeadIndex()
							 | 
						|
								
							 | 
						|
									job := q.head.items[q.head.headIndex]
							 | 
						|
									q.head.headIndex++
							 | 
						|
									q.count--
							 | 
						|
								
							 | 
						|
									return job, true
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (q *BufferedQueue[T]) maybeAdjustHeadIndex() {
							 | 
						|
									if q.head.headIndex == q.chunkSize {
							 | 
						|
										q.last.next = q.head
							 | 
						|
										q.head = q.head.next
							 | 
						|
										q.last = q.last.next
							 | 
						|
										q.last.next = nil
							 | 
						|
										//println("reusing chunk", q.last.nodeId)
							 | 
						|
										//fmt.Printf("head: %+v\n", q.head)
							 | 
						|
										//fmt.Printf("tail: %+v\n", q.tail)
							 | 
						|
										//fmt.Printf("last: %+v\n", q.last)
							 | 
						|
										//fmt.Printf("count: %d\n", q.count)
							 | 
						|
										//for p := q.head; p != nil ; p = p.next {
							 | 
						|
										//	fmt.Printf("Node: %+v\n", p)
							 | 
						|
										//}
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (q *BufferedQueue[T]) PeekHead() (T, bool) {
							 | 
						|
									q.mutex.Lock()
							 | 
						|
									defer q.mutex.Unlock()
							 | 
						|
								
							 | 
						|
									if q.count <= 0 {
							 | 
						|
										var a T
							 | 
						|
										return a, false
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									q.maybeAdjustHeadIndex()
							 | 
						|
								
							 | 
						|
									job := q.head.items[q.head.headIndex]
							 | 
						|
									return job, true
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Size returns the number of items in the queue
							 | 
						|
								func (q *BufferedQueue[T]) Size() int {
							 | 
						|
									q.mutex.Lock()
							 | 
						|
									defer q.mutex.Unlock()
							 | 
						|
									return q.count
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// IsEmpty returns true if the queue is empty
							 | 
						|
								func (q *BufferedQueue[T]) IsEmpty() bool {
							 | 
						|
									return q.Size() == 0
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (q *BufferedQueue[T]) CloseInput() {
							 | 
						|
									q.mutex.Lock()
							 | 
						|
									defer q.mutex.Unlock()
							 | 
						|
									q.isClosed = true
							 | 
						|
									q.waitCond.Broadcast()
							 | 
						|
								}
							 |