diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go index 7a72d2eef..f8effef95 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker.go @@ -142,9 +142,9 @@ func (rb *RingBuffer) AckTimestamp(timestamp int64) { // Remove all the continuously acknowledged timestamps from the buffer startPos := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer) for i := 0; i < len(rb.buffer) && rb.buffer[(startPos+i)%len(rb.buffer)].Acked; i++ { - rb.size-- t := rb.buffer[(startPos+i)%len(rb.buffer)] if rb.maxAllAckedTs < t.Timestamp { + rb.size-- rb.maxAllAckedTs = t.Timestamp } } diff --git a/weed/mq/sub_coordinator/inflight_message_tracker_test.go b/weed/mq/sub_coordinator/inflight_message_tracker_test.go index 9f62f80ad..5b7a1bdd8 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker_test.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker_test.go @@ -117,3 +117,17 @@ func TestInflightMessageTracker3(t *testing.T) { assert.Equal(t, int64(7), tracker.GetOldestAckedTimestamp()) } + +func TestInflightMessageTracker4(t *testing.T) { + // Initialize an InflightMessageTracker with initial capacity 1 + tracker := NewInflightMessageTracker(1) + + tracker.EnflightMessage([]byte("1"), int64(1)) + tracker.EnflightMessage([]byte("2"), int64(2)) + assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1))) + assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2))) + tracker.EnflightMessage([]byte("3"), int64(3)) + assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3))) + assert.Equal(t, int64(3), tracker.GetOldestAckedTimestamp()) + +}