From f7b818483a7b8c96720fd625f689691e0f97b867 Mon Sep 17 00:00:00 2001
From: chrislu <chris.lu@gmail.com>
Date: Thu, 30 May 2024 09:10:30 -0700
Subject: [PATCH] fix cases when buffer wraps around

---
 .../inflight_message_tracker.go               | 13 +++++++----
 .../inflight_message_tracker_test.go          | 23 +++++++++++++++++++
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go
index 6ac5103a9..7a72d2eef 100644
--- a/weed/mq/sub_coordinator/inflight_message_tracker.go
+++ b/weed/mq/sub_coordinator/inflight_message_tracker.go
@@ -88,8 +88,8 @@ type RingBuffer struct {
 	buffer       []*TimestampStatus
 	head         int
 	size         int
-	maxTimestamp int64
-	minAckedTs   int64
+	maxTimestamp  int64
+	maxAllAckedTs int64
 }
 
 // NewRingBuffer creates a new RingBuffer of the given capacity.
@@ -139,17 +139,20 @@ func (rb *RingBuffer) AckTimestamp(timestamp int64) {
 
 	rb.buffer[actualIndex].Acked = true
 
-	// Remove all the acknowledged timestamps from the buffer
+	// Remove all the continuously acknowledged timestamps from the buffer
 	startPos := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer)
 	for i := 0; i < len(rb.buffer) && rb.buffer[(startPos+i)%len(rb.buffer)].Acked; i++ {
 		rb.size--
-		rb.minAckedTs = rb.buffer[(startPos+i)%len(rb.buffer)].Timestamp
+		t := rb.buffer[(startPos+i)%len(rb.buffer)]
+		if rb.maxAllAckedTs < t.Timestamp {
+			rb.maxAllAckedTs = t.Timestamp
+		}
 	}
 }
 
 // OldestAckedTimestamp returns the oldest that is already acked timestamp in the ring buffer.
 func (rb *RingBuffer) OldestAckedTimestamp() int64 {
-	return rb.minAckedTs
+	return rb.maxAllAckedTs
 }
 
 // Latest returns the most recently known timestamp in the ring buffer.
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker_test.go b/weed/mq/sub_coordinator/inflight_message_tracker_test.go
index 83c33b5ba..9f62f80ad 100644
--- a/weed/mq/sub_coordinator/inflight_message_tracker_test.go
+++ b/weed/mq/sub_coordinator/inflight_message_tracker_test.go
@@ -94,3 +94,26 @@ func TestInflightMessageTracker2(t *testing.T) {
 	assert.Equal(t, int64(2), tracker.GetOldestAckedTimestamp())
 
 }
+
+func TestInflightMessageTracker3(t *testing.T) {
+	// Initialize an InflightMessageTracker with initial capacity 1
+	tracker := NewInflightMessageTracker(1)
+
+	tracker.EnflightMessage([]byte("1"), int64(1))
+	tracker.EnflightMessage([]byte("2"), int64(2))
+	tracker.EnflightMessage([]byte("3"), int64(3))
+	assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
+	tracker.EnflightMessage([]byte("4"), int64(4))
+	tracker.EnflightMessage([]byte("5"), int64(5))
+	assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
+	assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3)))
+	tracker.EnflightMessage([]byte("6"), int64(6))
+	tracker.EnflightMessage([]byte("7"), int64(7))
+	assert.True(t, tracker.AcknowledgeMessage([]byte("4"), int64(4)))
+	assert.True(t, tracker.AcknowledgeMessage([]byte("5"), int64(5)))
+	assert.True(t, tracker.AcknowledgeMessage([]byte("6"), int64(6)))
+	assert.Equal(t, int64(6), tracker.GetOldestAckedTimestamp())
+	assert.True(t, tracker.AcknowledgeMessage([]byte("7"), int64(7)))
+	assert.Equal(t, int64(7), tracker.GetOldestAckedTimestamp())
+
+}