You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

137 lines
3.1 KiB

12 months ago
12 months ago
11 months ago
12 months ago
12 months ago
11 months ago
12 months ago
  1. package buffered_queue
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. // ItemChunkNode represents a node in the linked list of job chunks
  7. type ItemChunkNode[T any] struct {
  8. items []T
  9. headIndex int
  10. tailIndex int
  11. next *ItemChunkNode[T]
  12. nodeId int
  13. }
  14. // BufferedQueue implements a buffered queue using a linked list of job chunks
  15. type BufferedQueue[T any] struct {
  16. chunkSize int // Maximum number of items per chunk
  17. head *ItemChunkNode[T]
  18. tail *ItemChunkNode[T]
  19. last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
  20. count int // Total number of items in the queue
  21. mutex sync.Mutex
  22. nodeCounter int
  23. waitCond *sync.Cond
  24. isClosed bool
  25. }
  26. // NewBufferedQueue creates a new buffered queue with the specified chunk size
  27. func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
  28. // Create an empty chunk to initialize head and tail
  29. chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
  30. bq := &BufferedQueue[T]{
  31. chunkSize: chunkSize,
  32. head: chunk,
  33. tail: chunk,
  34. last: chunk,
  35. count: 0,
  36. mutex: sync.Mutex{},
  37. }
  38. bq.waitCond = sync.NewCond(&bq.mutex)
  39. return bq
  40. }
  41. // Enqueue adds a job to the queue
  42. func (q *BufferedQueue[T]) Enqueue(job T) error {
  43. if q.isClosed {
  44. return fmt.Errorf("queue is closed")
  45. }
  46. q.mutex.Lock()
  47. defer q.mutex.Unlock()
  48. // If the tail chunk is full, create a new chunk (reusing empty chunks if available)
  49. if q.tail.tailIndex == q.chunkSize {
  50. if q.tail == q.last {
  51. // Create a new chunk
  52. q.nodeCounter++
  53. newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
  54. q.tail.next = newChunk
  55. q.tail = newChunk
  56. q.last = newChunk
  57. } else {
  58. // Reuse an empty chunk
  59. q.tail = q.tail.next
  60. q.tail.headIndex = 0
  61. q.tail.tailIndex = 0
  62. // println("tail moved to chunk", q.tail.nodeId)
  63. }
  64. }
  65. // Add the job to the tail chunk
  66. q.tail.items[q.tail.tailIndex] = job
  67. q.tail.tailIndex++
  68. q.count++
  69. if q.count == 1 {
  70. q.waitCond.Signal()
  71. }
  72. return nil
  73. }
  74. // Dequeue removes and returns a job from the queue
  75. func (q *BufferedQueue[T]) Dequeue() (T, bool) {
  76. q.mutex.Lock()
  77. defer q.mutex.Unlock()
  78. for q.count <= 0 && !q.isClosed {
  79. q.waitCond.Wait()
  80. }
  81. if q.count <= 0 && q.isClosed {
  82. var a T
  83. return a, false
  84. }
  85. job := q.head.items[q.head.headIndex]
  86. q.head.headIndex++
  87. q.count--
  88. if q.head.headIndex == q.chunkSize {
  89. q.last.next = q.head
  90. q.head = q.head.next
  91. q.last = q.last.next
  92. q.last.next = nil
  93. //println("reusing chunk", q.last.nodeId)
  94. //fmt.Printf("head: %+v\n", q.head)
  95. //fmt.Printf("tail: %+v\n", q.tail)
  96. //fmt.Printf("last: %+v\n", q.last)
  97. //fmt.Printf("count: %d\n", q.count)
  98. //for p := q.head; p != nil ; p = p.next {
  99. // fmt.Printf("Node: %+v\n", p)
  100. //}
  101. }
  102. return job, true
  103. }
  104. // Size returns the number of items in the queue
  105. func (q *BufferedQueue[T]) Size() int {
  106. q.mutex.Lock()
  107. defer q.mutex.Unlock()
  108. return q.count
  109. }
  110. // IsEmpty returns true if the queue is empty
  111. func (q *BufferedQueue[T]) IsEmpty() bool {
  112. return q.Size() == 0
  113. }
  114. func (q *BufferedQueue[T]) CloseInput() {
  115. q.mutex.Lock()
  116. defer q.mutex.Unlock()
  117. q.isClosed = true
  118. q.waitCond.Broadcast()
  119. }