You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

146 lines
3.3 KiB

12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
12 months ago
  1. package buffered_queue
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. // ItemChunkNode represents a node in the linked list of job chunks
  7. type ItemChunkNode[T any] struct {
  8. items []T
  9. headIndex int
  10. tailIndex int
  11. next *ItemChunkNode[T]
  12. nodeId int
  13. }
  14. // BufferedQueue implements a buffered queue using a linked list of job chunks
  15. type BufferedQueue[T any] struct {
  16. chunkSize int // Maximum number of items per chunk
  17. head *ItemChunkNode[T]
  18. tail *ItemChunkNode[T]
  19. last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
  20. count int // Total number of items in the queue
  21. mutex sync.Mutex
  22. nodeCounter int
  23. waitOnRead bool
  24. waitCond *sync.Cond
  25. isClosed bool
  26. }
  27. // NewBufferedQueue creates a new buffered queue with the specified chunk size
  28. func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] {
  29. // Create an empty chunk to initialize head and tail
  30. chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
  31. bq := &BufferedQueue[T]{
  32. chunkSize: chunkSize,
  33. head: chunk,
  34. tail: chunk,
  35. last: chunk,
  36. count: 0,
  37. mutex: sync.Mutex{},
  38. waitOnRead: waitOnRead,
  39. }
  40. bq.waitCond = sync.NewCond(&bq.mutex)
  41. return bq
  42. }
  43. // Enqueue adds a job to the queue
  44. func (q *BufferedQueue[T]) Enqueue(job T) error {
  45. if q.isClosed {
  46. return fmt.Errorf("queue is closed")
  47. }
  48. q.mutex.Lock()
  49. defer q.mutex.Unlock()
  50. // If the tail chunk is full, create a new chunk (reusing empty chunks if available)
  51. if q.tail.tailIndex == q.chunkSize {
  52. if q.tail == q.last {
  53. // Create a new chunk
  54. q.nodeCounter++
  55. newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
  56. q.tail.next = newChunk
  57. q.tail = newChunk
  58. q.last = newChunk
  59. } else {
  60. // Reuse an empty chunk
  61. q.tail = q.tail.next
  62. q.tail.headIndex = 0
  63. q.tail.tailIndex = 0
  64. // println("tail moved to chunk", q.tail.nodeId)
  65. }
  66. }
  67. // Add the job to the tail chunk
  68. q.tail.items[q.tail.tailIndex] = job
  69. q.tail.tailIndex++
  70. q.count++
  71. if q.waitOnRead {
  72. q.waitCond.Signal()
  73. }
  74. return nil
  75. }
  76. // Dequeue removes and returns a job from the queue
  77. func (q *BufferedQueue[T]) Dequeue() (T, bool) {
  78. q.mutex.Lock()
  79. defer q.mutex.Unlock()
  80. if q.waitOnRead {
  81. for q.count <= 0 && !q.isClosed {
  82. q.waitCond.Wait()
  83. }
  84. if q.isClosed {
  85. var a T
  86. return a, false
  87. }
  88. } else {
  89. if q.count == 0 {
  90. var a T
  91. return a, false
  92. }
  93. }
  94. job := q.head.items[q.head.headIndex]
  95. q.head.headIndex++
  96. q.count--
  97. if q.head.headIndex == q.chunkSize {
  98. q.last.next = q.head
  99. q.head = q.head.next
  100. q.last = q.last.next
  101. q.last.next = nil
  102. //println("reusing chunk", q.last.nodeId)
  103. //fmt.Printf("head: %+v\n", q.head)
  104. //fmt.Printf("tail: %+v\n", q.tail)
  105. //fmt.Printf("last: %+v\n", q.last)
  106. //fmt.Printf("count: %d\n", q.count)
  107. //for p := q.head; p != nil ; p = p.next {
  108. // fmt.Printf("Node: %+v\n", p)
  109. //}
  110. }
  111. return job, true
  112. }
  113. // Size returns the number of items in the queue
  114. func (q *BufferedQueue[T]) Size() int {
  115. q.mutex.Lock()
  116. defer q.mutex.Unlock()
  117. return q.count
  118. }
  119. // IsEmpty returns true if the queue is empty
  120. func (q *BufferedQueue[T]) IsEmpty() bool {
  121. return q.Size() == 0
  122. }
  123. func (q *BufferedQueue[T]) CloseInput() {
  124. q.mutex.Lock()
  125. defer q.mutex.Unlock()
  126. q.isClosed = true
  127. q.waitCond.Broadcast()
  128. }