diff --git a/weed/util/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go similarity index 70% rename from weed/util/buffered_queue.go rename to weed/util/buffered_queue/buffered_queue.go index b2fbcd3c7..fedfcee51 100644 --- a/weed/util/buffered_queue.go +++ b/weed/util/buffered_queue/buffered_queue.go @@ -1,4 +1,4 @@ -package util +package buffered_queue import ( "sync" @@ -9,33 +9,38 @@ type ItemChunkNode[T any] struct { items []T headIndex int tailIndex int - next *ItemChunkNode[T] - nodeId int + next *ItemChunkNode[T] + nodeId int } // BufferedQueue implements a buffered queue using a linked list of job chunks type BufferedQueue[T any] struct { - chunkSize int // Maximum number of items per chunk - head *ItemChunkNode[T] - tail *ItemChunkNode[T] - last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory - count int // Total number of items in the queue - mutex sync.Mutex + chunkSize int // Maximum number of items per chunk + head *ItemChunkNode[T] + tail *ItemChunkNode[T] + last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory + count int // Total number of items in the queue + mutex sync.Mutex nodeCounter int + waitOnRead bool + waitCond *sync.Cond } // NewBufferedQueue creates a new buffered queue with the specified chunk size -func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] { +func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] { // Create an empty chunk to initialize head and tail chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0} - return &BufferedQueue[T]{ - chunkSize: chunkSize, - head: chunk, - tail: chunk, - last: chunk, - count: 0, - mutex: sync.Mutex{}, + bq := &BufferedQueue[T]{ + chunkSize: chunkSize, + head: chunk, + tail: chunk, + last: chunk, + count: 0, + mutex: sync.Mutex{}, + waitOnRead: waitOnRead, } + bq.waitCond = sync.NewCond(&bq.mutex) + return bq } // Enqueue adds a job to the queue @@ -65,6 +70,9 @@ func (q *BufferedQueue[T]) Enqueue(job T) { q.tail.items[q.tail.tailIndex] = job q.tail.tailIndex++ q.count++ + if q.waitOnRead { + q.waitCond.Signal() + } } // Dequeue removes and returns a job from the queue @@ -72,9 +80,15 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) { q.mutex.Lock() defer q.mutex.Unlock() - if q.count == 0 { - var a T - return a, false + if q.waitOnRead { + for q.count <= 0 { + q.waitCond.Wait() + } + } else { + if q.count == 0 { + var a T + return a, false + } } job := q.head.items[q.head.headIndex] diff --git a/weed/util/buffered_queue_test.go b/weed/util/buffered_queue/buffered_queue_test.go similarity index 74% rename from weed/util/buffered_queue_test.go rename to weed/util/buffered_queue/buffered_queue_test.go index a4b08c036..c4236cd40 100644 --- a/weed/util/buffered_queue_test.go +++ b/weed/util/buffered_queue/buffered_queue_test.go @@ -1,4 +1,4 @@ -package util +package buffered_queue import "testing" @@ -9,7 +9,7 @@ func TestJobQueue(t *testing.T) { Data T } - queue := NewBufferedQueue[Job[string]](2) // Chunk size of 5 + queue := NewBufferedQueue[Job[string]](2, false) // Chunk size of 5 queue.Enqueue(Job[string]{ID: 1, Action: "task1", Data: "hello"}) queue.Enqueue(Job[string]{ID: 2, Action: "task2", Data: "world"}) @@ -62,7 +62,7 @@ func TestJobQueue(t *testing.T) { for i := 0; i < 5; i++ { println("enqueue", i+8) - queue.Enqueue(Job[string]{ID: i+8, Action: "task", Data: "data"}) + queue.Enqueue(Job[string]{ID: i + 8, Action: "task", Data: "data"}) } for i := 0; i < 5; i++ { job, ok = queue.Dequeue() @@ -76,3 +76,25 @@ func TestJobQueue(t *testing.T) { } } + +func BenchmarkBufferedQueue(b *testing.B) { + type Job[T any] struct { + ID int + Action string + Data T + } + + queue := NewBufferedQueue[Job[string]](1024, true) + + b.Run("Enqueue", func(b *testing.B) { + for i := 0; i < b.N; i++ { + queue.Enqueue(Job[string]{ID: i, Action: "task", Data: "data"}) + } + }) + + b.Run("Dequeue", func(b *testing.B) { + for i := 0; i < b.N; i++ { + _, _ = queue.Dequeue() + } + }) +}