|
|
package buffered_queue
import ( "fmt" "sync" )
// ItemChunkNode represents a node in the linked list of job chunks
type ItemChunkNode[T any] struct { items []T headIndex int tailIndex int next *ItemChunkNode[T] nodeId int }
// BufferedQueue implements a buffered queue using a linked list of job chunks
type BufferedQueue[T any] struct { chunkSize int // Maximum number of items per chunk
head *ItemChunkNode[T] tail *ItemChunkNode[T] last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
count int // Total number of items in the queue
mutex sync.Mutex nodeCounter int waitCond *sync.Cond isClosed bool }
// NewBufferedQueue creates a new buffered queue with the specified chunk size
func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] { // Create an empty chunk to initialize head and tail
chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0} bq := &BufferedQueue[T]{ chunkSize: chunkSize, head: chunk, tail: chunk, last: chunk, count: 0, mutex: sync.Mutex{}, } bq.waitCond = sync.NewCond(&bq.mutex) return bq }
// Enqueue adds a job to the queue
func (q *BufferedQueue[T]) Enqueue(job T) error {
if q.isClosed { return fmt.Errorf("queue is closed") }
q.mutex.Lock() defer q.mutex.Unlock()
// If the tail chunk is full, create a new chunk (reusing empty chunks if available)
if q.tail.tailIndex == q.chunkSize { if q.tail == q.last { // Create a new chunk
q.nodeCounter++ newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter} q.tail.next = newChunk q.tail = newChunk q.last = newChunk } else { // Reuse an empty chunk
q.tail = q.tail.next q.tail.headIndex = 0 q.tail.tailIndex = 0 // println("tail moved to chunk", q.tail.nodeId)
} }
// Add the job to the tail chunk
q.tail.items[q.tail.tailIndex] = job q.tail.tailIndex++ q.count++ if q.count == 1 { q.waitCond.Signal() }
return nil }
// Dequeue removes and returns a job from the queue
func (q *BufferedQueue[T]) Dequeue() (T, bool) { q.mutex.Lock() defer q.mutex.Unlock()
for q.count <= 0 && !q.isClosed { q.waitCond.Wait() } if q.count <= 0 && q.isClosed { var a T return a, false }
job := q.head.items[q.head.headIndex] q.head.headIndex++ q.count--
if q.head.headIndex == q.chunkSize { q.last.next = q.head q.head = q.head.next q.last = q.last.next q.last.next = nil //println("reusing chunk", q.last.nodeId)
//fmt.Printf("head: %+v\n", q.head)
//fmt.Printf("tail: %+v\n", q.tail)
//fmt.Printf("last: %+v\n", q.last)
//fmt.Printf("count: %d\n", q.count)
//for p := q.head; p != nil ; p = p.next {
// fmt.Printf("Node: %+v\n", p)
//}
}
return job, true }
func (q *BufferedQueue[T]) PeekHead() (T, bool) { q.mutex.Lock() defer q.mutex.Unlock()
if q.count <= 0 { var a T return a, false }
job := q.head.items[q.head.headIndex] return job, true }
// Size returns the number of items in the queue
func (q *BufferedQueue[T]) Size() int { q.mutex.Lock() defer q.mutex.Unlock() return q.count }
// IsEmpty returns true if the queue is empty
func (q *BufferedQueue[T]) IsEmpty() bool { return q.Size() == 0 }
func (q *BufferedQueue[T]) CloseInput() { q.mutex.Lock() defer q.mutex.Unlock() q.isClosed = true q.waitCond.Broadcast() }
|