Browse Source

ensure head index is within range

mq-subscribe
chrislu 8 months ago
parent
commit
a5645d50a7
  1. 10
      weed/util/buffered_queue/buffered_queue.go

10
weed/util/buffered_queue/buffered_queue.go

@ -95,10 +95,16 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
return a, false return a, false
} }
q.maybeAdjustHeadIndex()
job := q.head.items[q.head.headIndex] job := q.head.items[q.head.headIndex]
q.head.headIndex++ q.head.headIndex++
q.count-- q.count--
return job, true
}
func (q *BufferedQueue[T]) maybeAdjustHeadIndex() {
if q.head.headIndex == q.chunkSize { if q.head.headIndex == q.chunkSize {
q.last.next = q.head q.last.next = q.head
q.head = q.head.next q.head = q.head.next
@ -113,8 +119,6 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
// fmt.Printf("Node: %+v\n", p) // fmt.Printf("Node: %+v\n", p)
//} //}
} }
return job, true
} }
func (q *BufferedQueue[T]) PeekHead() (T, bool) { func (q *BufferedQueue[T]) PeekHead() (T, bool) {
@ -126,6 +130,8 @@ func (q *BufferedQueue[T]) PeekHead() (T, bool) {
return a, false return a, false
} }
q.maybeAdjustHeadIndex()
job := q.head.items[q.head.headIndex] job := q.head.items[q.head.headIndex]
return job, true return job, true
} }

Loading…
Cancel
Save