Browse Source

fix cases when buffer wraps around

pull/5890/head
chrislu 7 months ago
parent
commit
f7b818483a
  1. 11
      weed/mq/sub_coordinator/inflight_message_tracker.go
  2. 23
      weed/mq/sub_coordinator/inflight_message_tracker_test.go

11
weed/mq/sub_coordinator/inflight_message_tracker.go

@ -89,7 +89,7 @@ type RingBuffer struct {
head int
size int
maxTimestamp int64
minAckedTs int64
maxAllAckedTs int64
}
// NewRingBuffer creates a new RingBuffer of the given capacity.
@ -139,17 +139,20 @@ func (rb *RingBuffer) AckTimestamp(timestamp int64) {
rb.buffer[actualIndex].Acked = true
// Remove all the acknowledged timestamps from the buffer
// Remove all the continuously acknowledged timestamps from the buffer
startPos := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer)
for i := 0; i < len(rb.buffer) && rb.buffer[(startPos+i)%len(rb.buffer)].Acked; i++ {
rb.size--
rb.minAckedTs = rb.buffer[(startPos+i)%len(rb.buffer)].Timestamp
t := rb.buffer[(startPos+i)%len(rb.buffer)]
if rb.maxAllAckedTs < t.Timestamp {
rb.maxAllAckedTs = t.Timestamp
}
}
}
// OldestAckedTimestamp returns the oldest that is already acked timestamp in the ring buffer.
func (rb *RingBuffer) OldestAckedTimestamp() int64 {
return rb.minAckedTs
return rb.maxAllAckedTs
}
// Latest returns the most recently known timestamp in the ring buffer.

23
weed/mq/sub_coordinator/inflight_message_tracker_test.go

@ -94,3 +94,26 @@ func TestInflightMessageTracker2(t *testing.T) {
assert.Equal(t, int64(2), tracker.GetOldestAckedTimestamp())
}
func TestInflightMessageTracker3(t *testing.T) {
// Initialize an InflightMessageTracker with initial capacity 1
tracker := NewInflightMessageTracker(1)
tracker.EnflightMessage([]byte("1"), int64(1))
tracker.EnflightMessage([]byte("2"), int64(2))
tracker.EnflightMessage([]byte("3"), int64(3))
assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
tracker.EnflightMessage([]byte("4"), int64(4))
tracker.EnflightMessage([]byte("5"), int64(5))
assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3)))
tracker.EnflightMessage([]byte("6"), int64(6))
tracker.EnflightMessage([]byte("7"), int64(7))
assert.True(t, tracker.AcknowledgeMessage([]byte("4"), int64(4)))
assert.True(t, tracker.AcknowledgeMessage([]byte("5"), int64(5)))
assert.True(t, tracker.AcknowledgeMessage([]byte("6"), int64(6)))
assert.Equal(t, int64(6), tracker.GetOldestAckedTimestamp())
assert.True(t, tracker.AcknowledgeMessage([]byte("7"), int64(7)))
assert.Equal(t, int64(7), tracker.GetOldestAckedTimestamp())
}
Loading…
Cancel
Save