Browse Source

fix size calculation

pull/5890/head
chrislu 7 months ago
parent
commit
df9d889489
  1. 2
      weed/mq/sub_coordinator/inflight_message_tracker.go
  2. 14
      weed/mq/sub_coordinator/inflight_message_tracker_test.go

2
weed/mq/sub_coordinator/inflight_message_tracker.go

@ -142,9 +142,9 @@ func (rb *RingBuffer) AckTimestamp(timestamp int64) {
// Remove all the continuously acknowledged timestamps from the buffer // Remove all the continuously acknowledged timestamps from the buffer
startPos := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer) startPos := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer)
for i := 0; i < len(rb.buffer) && rb.buffer[(startPos+i)%len(rb.buffer)].Acked; i++ { for i := 0; i < len(rb.buffer) && rb.buffer[(startPos+i)%len(rb.buffer)].Acked; i++ {
rb.size--
t := rb.buffer[(startPos+i)%len(rb.buffer)] t := rb.buffer[(startPos+i)%len(rb.buffer)]
if rb.maxAllAckedTs < t.Timestamp { if rb.maxAllAckedTs < t.Timestamp {
rb.size--
rb.maxAllAckedTs = t.Timestamp rb.maxAllAckedTs = t.Timestamp
} }
} }

14
weed/mq/sub_coordinator/inflight_message_tracker_test.go

@ -117,3 +117,17 @@ func TestInflightMessageTracker3(t *testing.T) {
assert.Equal(t, int64(7), tracker.GetOldestAckedTimestamp()) assert.Equal(t, int64(7), tracker.GetOldestAckedTimestamp())
} }
func TestInflightMessageTracker4(t *testing.T) {
// Initialize an InflightMessageTracker with initial capacity 1
tracker := NewInflightMessageTracker(1)
tracker.EnflightMessage([]byte("1"), int64(1))
tracker.EnflightMessage([]byte("2"), int64(2))
assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
tracker.EnflightMessage([]byte("3"), int64(3))
assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3)))
assert.Equal(t, int64(3), tracker.GetOldestAckedTimestamp())
}
Loading…
Cancel
Save