diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go index 6ac5103a9..7a72d2eef 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker.go @@ -88,8 +88,8 @@ type RingBuffer struct { buffer []*TimestampStatus head int size int - maxTimestamp int64 - minAckedTs int64 + maxTimestamp int64 + maxAllAckedTs int64 } // NewRingBuffer creates a new RingBuffer of the given capacity. @@ -139,17 +139,20 @@ func (rb *RingBuffer) AckTimestamp(timestamp int64) { rb.buffer[actualIndex].Acked = true - // Remove all the acknowledged timestamps from the buffer + // Remove all the continuously acknowledged timestamps from the buffer startPos := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer) for i := 0; i < len(rb.buffer) && rb.buffer[(startPos+i)%len(rb.buffer)].Acked; i++ { rb.size-- - rb.minAckedTs = rb.buffer[(startPos+i)%len(rb.buffer)].Timestamp + t := rb.buffer[(startPos+i)%len(rb.buffer)] + if rb.maxAllAckedTs < t.Timestamp { + rb.maxAllAckedTs = t.Timestamp + } } } // OldestAckedTimestamp returns the oldest that is already acked timestamp in the ring buffer. func (rb *RingBuffer) OldestAckedTimestamp() int64 { - return rb.minAckedTs + return rb.maxAllAckedTs } // Latest returns the most recently known timestamp in the ring buffer. diff --git a/weed/mq/sub_coordinator/inflight_message_tracker_test.go b/weed/mq/sub_coordinator/inflight_message_tracker_test.go index 83c33b5ba..9f62f80ad 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker_test.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker_test.go @@ -94,3 +94,26 @@ func TestInflightMessageTracker2(t *testing.T) { assert.Equal(t, int64(2), tracker.GetOldestAckedTimestamp()) } + +func TestInflightMessageTracker3(t *testing.T) { + // Initialize an InflightMessageTracker with initial capacity 1 + tracker := NewInflightMessageTracker(1) + + tracker.EnflightMessage([]byte("1"), int64(1)) + tracker.EnflightMessage([]byte("2"), int64(2)) + tracker.EnflightMessage([]byte("3"), int64(3)) + assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1))) + tracker.EnflightMessage([]byte("4"), int64(4)) + tracker.EnflightMessage([]byte("5"), int64(5)) + assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2))) + assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3))) + tracker.EnflightMessage([]byte("6"), int64(6)) + tracker.EnflightMessage([]byte("7"), int64(7)) + assert.True(t, tracker.AcknowledgeMessage([]byte("4"), int64(4))) + assert.True(t, tracker.AcknowledgeMessage([]byte("5"), int64(5))) + assert.True(t, tracker.AcknowledgeMessage([]byte("6"), int64(6))) + assert.Equal(t, int64(6), tracker.GetOldestAckedTimestamp()) + assert.True(t, tracker.AcknowledgeMessage([]byte("7"), int64(7))) + assert.Equal(t, int64(7), tracker.GetOldestAckedTimestamp()) + +}