You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

156 lines
3.4 KiB

12 months ago
12 months ago
10 months ago
12 months ago
12 months ago
10 months ago
12 months ago
  1. package buffered_queue
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. // ItemChunkNode represents a node in the linked list of job chunks
  7. type ItemChunkNode[T any] struct {
  8. items []T
  9. headIndex int
  10. tailIndex int
  11. next *ItemChunkNode[T]
  12. nodeId int
  13. }
  14. // BufferedQueue implements a buffered queue using a linked list of job chunks
  15. type BufferedQueue[T any] struct {
  16. chunkSize int // Maximum number of items per chunk
  17. head *ItemChunkNode[T]
  18. tail *ItemChunkNode[T]
  19. last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
  20. count int // Total number of items in the queue
  21. mutex sync.Mutex
  22. nodeCounter int
  23. waitCond *sync.Cond
  24. isClosed bool
  25. }
  26. // NewBufferedQueue creates a new buffered queue with the specified chunk size
  27. func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
  28. // Create an empty chunk to initialize head and tail
  29. chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
  30. bq := &BufferedQueue[T]{
  31. chunkSize: chunkSize,
  32. head: chunk,
  33. tail: chunk,
  34. last: chunk,
  35. count: 0,
  36. mutex: sync.Mutex{},
  37. }
  38. bq.waitCond = sync.NewCond(&bq.mutex)
  39. return bq
  40. }
  41. // Enqueue adds a job to the queue
  42. func (q *BufferedQueue[T]) Enqueue(job T) error {
  43. if q.isClosed {
  44. return fmt.Errorf("queue is closed")
  45. }
  46. q.mutex.Lock()
  47. defer q.mutex.Unlock()
  48. // If the tail chunk is full, create a new chunk (reusing empty chunks if available)
  49. if q.tail.tailIndex == q.chunkSize {
  50. if q.tail == q.last {
  51. // Create a new chunk
  52. q.nodeCounter++
  53. newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
  54. q.tail.next = newChunk
  55. q.tail = newChunk
  56. q.last = newChunk
  57. } else {
  58. // Reuse an empty chunk
  59. q.tail = q.tail.next
  60. q.tail.headIndex = 0
  61. q.tail.tailIndex = 0
  62. // println("tail moved to chunk", q.tail.nodeId)
  63. }
  64. }
  65. // Add the job to the tail chunk
  66. q.tail.items[q.tail.tailIndex] = job
  67. q.tail.tailIndex++
  68. q.count++
  69. if q.count == 1 {
  70. q.waitCond.Signal()
  71. }
  72. return nil
  73. }
  74. // Dequeue removes and returns a job from the queue
  75. func (q *BufferedQueue[T]) Dequeue() (T, bool) {
  76. q.mutex.Lock()
  77. defer q.mutex.Unlock()
  78. for q.count <= 0 && !q.isClosed {
  79. q.waitCond.Wait()
  80. }
  81. if q.count <= 0 && q.isClosed {
  82. var a T
  83. return a, false
  84. }
  85. q.maybeAdjustHeadIndex()
  86. job := q.head.items[q.head.headIndex]
  87. q.head.headIndex++
  88. q.count--
  89. return job, true
  90. }
  91. func (q *BufferedQueue[T]) maybeAdjustHeadIndex() {
  92. if q.head.headIndex == q.chunkSize {
  93. q.last.next = q.head
  94. q.head = q.head.next
  95. q.last = q.last.next
  96. q.last.next = nil
  97. //println("reusing chunk", q.last.nodeId)
  98. //fmt.Printf("head: %+v\n", q.head)
  99. //fmt.Printf("tail: %+v\n", q.tail)
  100. //fmt.Printf("last: %+v\n", q.last)
  101. //fmt.Printf("count: %d\n", q.count)
  102. //for p := q.head; p != nil ; p = p.next {
  103. // fmt.Printf("Node: %+v\n", p)
  104. //}
  105. }
  106. }
  107. func (q *BufferedQueue[T]) PeekHead() (T, bool) {
  108. q.mutex.Lock()
  109. defer q.mutex.Unlock()
  110. if q.count <= 0 {
  111. var a T
  112. return a, false
  113. }
  114. q.maybeAdjustHeadIndex()
  115. job := q.head.items[q.head.headIndex]
  116. return job, true
  117. }
  118. // Size returns the number of items in the queue
  119. func (q *BufferedQueue[T]) Size() int {
  120. q.mutex.Lock()
  121. defer q.mutex.Unlock()
  122. return q.count
  123. }
  124. // IsEmpty returns true if the queue is empty
  125. func (q *BufferedQueue[T]) IsEmpty() bool {
  126. return q.Size() == 0
  127. }
  128. func (q *BufferedQueue[T]) CloseInput() {
  129. q.mutex.Lock()
  130. defer q.mutex.Unlock()
  131. q.isClosed = true
  132. q.waitCond.Broadcast()
  133. }