Browse Source

adds a buffered queue

pull/5637/head
chrislu 11 months ago
parent
commit
08c5fba825
  1. 112
      weed/util/buffered_queue.go
  2. 78
      weed/util/buffered_queue_test.go

112
weed/util/buffered_queue.go

@ -0,0 +1,112 @@
package util
import (
"sync"
)
// ItemChunkNode represents a node in the linked list of job chunks
type ItemChunkNode[T any] struct {
items []T
headIndex int
tailIndex int
next *ItemChunkNode[T]
nodeId int
}
// BufferedQueue implements a buffered queue using a linked list of job chunks
type BufferedQueue[T any] struct {
chunkSize int // Maximum number of items per chunk
head *ItemChunkNode[T]
tail *ItemChunkNode[T]
last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
count int // Total number of items in the queue
mutex sync.Mutex
nodeCounter int
}
// NewBufferedQueue creates a new buffered queue with the specified chunk size
func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
// Create an empty chunk to initialize head and tail
chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
return &BufferedQueue[T]{
chunkSize: chunkSize,
head: chunk,
tail: chunk,
last: chunk,
count: 0,
mutex: sync.Mutex{},
}
}
// Enqueue adds a job to the queue
func (q *BufferedQueue[T]) Enqueue(job T) {
q.mutex.Lock()
defer q.mutex.Unlock()
// If the tail chunk is full, create a new chunk (reusing empty chunks if available)
if q.tail.tailIndex == q.chunkSize {
if q.tail == q.last {
// Create a new chunk
q.nodeCounter++
newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
q.tail.next = newChunk
q.tail = newChunk
q.last = newChunk
} else {
// Reuse an empty chunk
q.tail = q.tail.next
q.tail.headIndex = 0
q.tail.tailIndex = 0
// println("tail moved to chunk", q.tail.nodeId)
}
}
// Add the job to the tail chunk
q.tail.items[q.tail.tailIndex] = job
q.tail.tailIndex++
q.count++
}
// Dequeue removes and returns a job from the queue
func (q *BufferedQueue[T]) Dequeue() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count == 0 {
var a T
return a, false
}
job := q.head.items[q.head.headIndex]
q.head.headIndex++
q.count--
if q.head.headIndex == q.chunkSize {
q.last.next = q.head
q.head = q.head.next
q.last = q.last.next
q.last.next = nil
//println("reusing chunk", q.last.nodeId)
//fmt.Printf("head: %+v\n", q.head)
//fmt.Printf("tail: %+v\n", q.tail)
//fmt.Printf("last: %+v\n", q.last)
//fmt.Printf("count: %d\n", q.count)
//for p := q.head; p != nil ; p = p.next {
// fmt.Printf("Node: %+v\n", p)
//}
}
return job, true
}
// Size returns the number of items in the queue
func (q *BufferedQueue[T]) Size() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return q.count
}
// IsEmpty returns true if the queue is empty
func (q *BufferedQueue[T]) IsEmpty() bool {
return q.Size() == 0
}

78
weed/util/buffered_queue_test.go

@ -0,0 +1,78 @@
package util
import "testing"
func TestJobQueue(t *testing.T) {
type Job[T any] struct {
ID int
Action string
Data T
}
queue := NewBufferedQueue[Job[string]](2) // Chunk size of 5
queue.Enqueue(Job[string]{ID: 1, Action: "task1", Data: "hello"})
queue.Enqueue(Job[string]{ID: 2, Action: "task2", Data: "world"})
if queue.Size() != 2 {
t.Errorf("Expected queue size of 2, got %d", queue.Size())
}
queue.Enqueue(Job[string]{ID: 3, Action: "task3", Data: "3!"})
queue.Enqueue(Job[string]{ID: 4, Action: "task4", Data: "4!"})
queue.Enqueue(Job[string]{ID: 5, Action: "task5", Data: "5!"})
if queue.Size() != 5 {
t.Errorf("Expected queue size of 5, got %d", queue.Size())
}
println("enqueued 5 items")
println("dequeue", 1)
job, ok := queue.Dequeue()
if !ok {
t.Errorf("Expected dequeue to return true")
}
if job.ID != 1 {
t.Errorf("Expected job ID of 1, got %d", job.ID)
}
println("dequeue", 2)
job, ok = queue.Dequeue()
if !ok {
t.Errorf("Expected dequeue to return true")
}
println("enqueue", 6)
queue.Enqueue(Job[string]{ID: 6, Action: "task6", Data: "6!"})
println("enqueue", 7)
queue.Enqueue(Job[string]{ID: 7, Action: "task7", Data: "7!"})
for i := 0; i < 5; i++ {
println("dequeue ...")
job, ok = queue.Dequeue()
if !ok {
t.Errorf("Expected dequeue to return true")
}
println("dequeued", job.ID)
}
if queue.Size() != 0 {
t.Errorf("Expected queue size of 0, got %d", queue.Size())
}
for i := 0; i < 5; i++ {
println("enqueue", i+8)
queue.Enqueue(Job[string]{ID: i+8, Action: "task", Data: "data"})
}
for i := 0; i < 5; i++ {
job, ok = queue.Dequeue()
if !ok {
t.Errorf("Expected dequeue to return true")
}
if job.ID != i+8 {
t.Errorf("Expected job ID of %d, got %d", i, job.ID)
}
println("dequeued", job.ID)
}
}
Loading…
Cancel
Save