|
@ -1,7 +1,6 @@ |
|
|
package sub_coordinator |
|
|
package sub_coordinator |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
"fmt" |
|
|
|
|
|
"sort" |
|
|
"sort" |
|
|
"sync" |
|
|
"sync" |
|
|
) |
|
|
) |
|
@ -22,7 +21,7 @@ func NewInflightMessageTracker(capacity int) *InflightMessageTracker { |
|
|
// EnflightMessage tracks the message with the key and timestamp.
|
|
|
// EnflightMessage tracks the message with the key and timestamp.
|
|
|
// These messages are sent to the consumer group instances and waiting for ack.
|
|
|
// These messages are sent to the consumer group instances and waiting for ack.
|
|
|
func (imt *InflightMessageTracker) EnflightMessage(key []byte, tsNs int64) { |
|
|
func (imt *InflightMessageTracker) EnflightMessage(key []byte, tsNs int64) { |
|
|
fmt.Printf("EnflightMessage(%s,%d)\n", string(key), tsNs) |
|
|
|
|
|
|
|
|
// fmt.Printf("EnflightMessage(%s,%d)\n", string(key), tsNs)
|
|
|
imt.mu.Lock() |
|
|
imt.mu.Lock() |
|
|
defer imt.mu.Unlock() |
|
|
defer imt.mu.Unlock() |
|
|
imt.messages[string(key)] = tsNs |
|
|
imt.messages[string(key)] = tsNs |
|
@ -53,7 +52,7 @@ func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64) |
|
|
|
|
|
|
|
|
// AcknowledgeMessage acknowledges the message with the key and timestamp.
|
|
|
// AcknowledgeMessage acknowledges the message with the key and timestamp.
|
|
|
func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool { |
|
|
func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool { |
|
|
fmt.Printf("AcknowledgeMessage(%s,%d)\n", string(key), tsNs) |
|
|
|
|
|
|
|
|
// fmt.Printf("AcknowledgeMessage(%s,%d)\n", string(key), tsNs)
|
|
|
imt.mu.Lock() |
|
|
imt.mu.Lock() |
|
|
defer imt.mu.Unlock() |
|
|
defer imt.mu.Unlock() |
|
|
timestamp, exists := imt.messages[string(key)] |
|
|
timestamp, exists := imt.messages[string(key)] |
|
|