@ -29,7 +29,7 @@ func TestConsumerGroup_BasicFunctionality(t *testing.T) {
// Test configuration
// Test configuration
topicName := "consumer-group-test"
topicName := "consumer-group-test"
// Add topic for testing
// Add topic for testing
gatewayServer . GetHandler ( ) . AddTopicForTesting ( topicName , 1 )
gatewayServer . GetHandler ( ) . AddTopicForTesting ( topicName , 1 )
groupID := "test-consumer-group"
groupID := "test-consumer-group"
@ -75,15 +75,15 @@ func TestConsumerGroup_BasicFunctionality(t *testing.T) {
// Start multiple consumers in the same group
// Start multiple consumers in the same group
t . Logf ( "=== Starting %d consumers in group '%s' ===" , numConsumers , groupID )
t . Logf ( "=== Starting %d consumers in group '%s' ===" , numConsumers , groupID )
var wg sync . WaitGroup
var wg sync . WaitGroup
consumerErrors := make ( chan error , numConsumers )
consumerErrors := make ( chan error , numConsumers )
for i := 0 ; i < numConsumers ; i ++ {
for i := 0 ; i < numConsumers ; i ++ {
wg . Add ( 1 )
wg . Add ( 1 )
go func ( consumerID int ) {
go func ( consumerID int ) {
defer wg . Done ( )
defer wg . Done ( )
consumerGroup , err := sarama . NewConsumerGroup ( [ ] string { brokerAddr } , groupID , config )
consumerGroup , err := sarama . NewConsumerGroup ( [ ] string { brokerAddr } , groupID , config )
if err != nil {
if err != nil {
consumerErrors <- fmt . Errorf ( "consumer %d: failed to create consumer group: %v" , consumerID , err )
consumerErrors <- fmt . Errorf ( "consumer %d: failed to create consumer group: %v" , consumerID , err )
@ -95,14 +95,14 @@ func TestConsumerGroup_BasicFunctionality(t *testing.T) {
defer cancel ( )
defer cancel ( )
t . Logf ( "Consumer %d: Starting consumption" , consumerID )
t . Logf ( "Consumer %d: Starting consumption" , consumerID )
// Start consuming
// Start consuming
err = consumerGroup . Consume ( ctx , [ ] string { topicName } , handler )
err = consumerGroup . Consume ( ctx , [ ] string { topicName } , handler )
if err != nil && err != context . DeadlineExceeded {
if err != nil && err != context . DeadlineExceeded {
consumerErrors <- fmt . Errorf ( "consumer %d: consumption error: %v" , consumerID , err )
consumerErrors <- fmt . Errorf ( "consumer %d: consumption error: %v" , consumerID , err )
return
return
}
}
t . Logf ( "Consumer %d: Finished consumption" , consumerID )
t . Logf ( "Consumer %d: Finished consumption" , consumerID )
} ( i )
} ( i )
}
}
@ -164,20 +164,23 @@ func TestConsumerGroup_BasicFunctionality(t *testing.T) {
}
}
}
}
t . Logf ( "🎉 SUCCESS: Consumer group test completed with %d messages consumed by %d consumers" ,
t . Logf ( "🎉 SUCCESS: Consumer group test completed with %d messages consumed by %d consumers" ,
len ( consumedMessages ) , numConsumers )
len ( consumedMessages ) , numConsumers )
}
}
// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
type ConsumerGroupHandler struct {
type ConsumerGroupHandler struct {
messages chan * sarama . ConsumerMessage
ready chan bool
t * testing . T
messages chan * sarama . ConsumerMessage
ready chan bool
readyOnce sync . Once
t * testing . T
}
}
func ( h * ConsumerGroupHandler ) Setup ( sarama . ConsumerGroupSession ) error {
func ( h * ConsumerGroupHandler ) Setup ( sarama . ConsumerGroupSession ) error {
h . t . Logf ( "Consumer group session setup" )
h . t . Logf ( "Consumer group session setup" )
close ( h . ready )
h . readyOnce . Do ( func ( ) {
close ( h . ready )
} )
return nil
return nil
}
}
@ -221,7 +224,7 @@ func TestConsumerGroup_OffsetCommitAndFetch(t *testing.T) {
topicName := "offset-commit-test"
topicName := "offset-commit-test"
groupID := "offset-test-group"
groupID := "offset-test-group"
numMessages := 5
numMessages := 5
// Add topic for testing
// Add topic for testing
gatewayServer . GetHandler ( ) . AddTopicForTesting ( topicName , 1 )
gatewayServer . GetHandler ( ) . AddTopicForTesting ( topicName , 1 )
@ -256,10 +259,10 @@ func TestConsumerGroup_OffsetCommitAndFetch(t *testing.T) {
// First consumer: consume first 3 messages and commit offsets
// First consumer: consume first 3 messages and commit offsets
t . Logf ( "=== First consumer: consuming first 3 messages ===" )
t . Logf ( "=== First consumer: consuming first 3 messages ===" )
handler1 := & OffsetTestHandler {
handler1 := & OffsetTestHandler {
messages : make ( chan * sarama . ConsumerMessage , numMessages ) ,
ready : make ( chan bool ) ,
stopAfter : 3 ,
t : t ,
messages : make ( chan * sarama . ConsumerMessage , numMessages ) ,
ready : make ( chan bool ) ,
stopAfter : 3 ,
t : t ,
}
}
consumerGroup1 , err := sarama . NewConsumerGroup ( [ ] string { brokerAddr } , groupID , config )
consumerGroup1 , err := sarama . NewConsumerGroup ( [ ] string { brokerAddr } , groupID , config )
@ -287,7 +290,7 @@ func TestConsumerGroup_OffsetCommitAndFetch(t *testing.T) {
select {
select {
case msg := <- handler1 . messages :
case msg := <- handler1 . messages :
consumedCount ++
consumedCount ++
t . Logf ( "✅ First consumer message %d: key=%s, offset=%d" ,
t . Logf ( "✅ First consumer message %d: key=%s, offset=%d" ,
consumedCount , string ( msg . Key ) , msg . Offset )
consumedCount , string ( msg . Key ) , msg . Offset )
case <- time . After ( 5 * time . Second ) :
case <- time . After ( 5 * time . Second ) :
t . Fatalf ( "Timeout waiting for first consumer messages" )
t . Fatalf ( "Timeout waiting for first consumer messages" )
@ -304,10 +307,10 @@ func TestConsumerGroup_OffsetCommitAndFetch(t *testing.T) {
// Second consumer: should start from offset 3 (after committed offset)
// Second consumer: should start from offset 3 (after committed offset)
t . Logf ( "=== Second consumer: should resume from offset 3 ===" )
t . Logf ( "=== Second consumer: should resume from offset 3 ===" )
handler2 := & OffsetTestHandler {
handler2 := & OffsetTestHandler {
messages : make ( chan * sarama . ConsumerMessage , numMessages ) ,
ready : make ( chan bool ) ,
stopAfter : 2 , // Should get remaining 2 messages
t : t ,
messages : make ( chan * sarama . ConsumerMessage , numMessages ) ,
ready : make ( chan bool ) ,
stopAfter : 2 , // Should get remaining 2 messages
t : t ,
}
}
consumerGroup2 , err := sarama . NewConsumerGroup ( [ ] string { brokerAddr } , groupID , config )
consumerGroup2 , err := sarama . NewConsumerGroup ( [ ] string { brokerAddr } , groupID , config )
@ -338,7 +341,7 @@ func TestConsumerGroup_OffsetCommitAndFetch(t *testing.T) {
case msg := <- handler2 . messages :
case msg := <- handler2 . messages :
consumedCount ++
consumedCount ++
secondConsumerMessages = append ( secondConsumerMessages , msg )
secondConsumerMessages = append ( secondConsumerMessages , msg )
t . Logf ( "✅ Second consumer message %d: key=%s, offset=%d" ,
t . Logf ( "✅ Second consumer message %d: key=%s, offset=%d" ,
consumedCount , string ( msg . Key ) , msg . Offset )
consumedCount , string ( msg . Key ) , msg . Offset )
case <- time . After ( 5 * time . Second ) :
case <- time . After ( 5 * time . Second ) :
t . Fatalf ( "Timeout waiting for second consumer messages. Got %d/2" , consumedCount )
t . Fatalf ( "Timeout waiting for second consumer messages. Got %d/2" , consumedCount )
@ -362,6 +365,7 @@ func TestConsumerGroup_OffsetCommitAndFetch(t *testing.T) {
type OffsetTestHandler struct {
type OffsetTestHandler struct {
messages chan * sarama . ConsumerMessage
messages chan * sarama . ConsumerMessage
ready chan bool
ready chan bool
readyOnce sync . Once
stopAfter int
stopAfter int
consumed int
consumed int
t * testing . T
t * testing . T
@ -369,7 +373,9 @@ type OffsetTestHandler struct {
func ( h * OffsetTestHandler ) Setup ( sarama . ConsumerGroupSession ) error {
func ( h * OffsetTestHandler ) Setup ( sarama . ConsumerGroupSession ) error {
h . t . Logf ( "Offset test consumer setup" )
h . t . Logf ( "Offset test consumer setup" )
close ( h . ready )
h . readyOnce . Do ( func ( ) {
close ( h . ready )
} )
return nil
return nil
}
}
@ -388,7 +394,7 @@ func (h *OffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, cl
h . consumed ++
h . consumed ++
h . messages <- message
h . messages <- message
session . MarkMessage ( message , "" )
session . MarkMessage ( message , "" )
// Stop after consuming the specified number of messages
// Stop after consuming the specified number of messages
if h . consumed >= h . stopAfter {
if h . consumed >= h . stopAfter {
h . t . Logf ( "Stopping consumer after %d messages" , h . consumed )
h . t . Logf ( "Stopping consumer after %d messages" , h . consumed )
@ -420,7 +426,7 @@ func TestConsumerGroup_Rebalancing(t *testing.T) {
topicName := "rebalance-test"
topicName := "rebalance-test"
groupID := "rebalance-test-group"
groupID := "rebalance-test-group"
numMessages := 12
numMessages := 12
// Add topic for testing
// Add topic for testing
gatewayServer . GetHandler ( ) . AddTopicForTesting ( topicName , 1 )
gatewayServer . GetHandler ( ) . AddTopicForTesting ( topicName , 1 )
@ -457,7 +463,7 @@ func TestConsumerGroup_Rebalancing(t *testing.T) {
// Start with 2 consumers
// Start with 2 consumers
t . Logf ( "=== Starting 2 initial consumers ===" )
t . Logf ( "=== Starting 2 initial consumers ===" )
handler1 := & RebalanceTestHandler {
handler1 := & RebalanceTestHandler {
messages : make ( chan * sarama . ConsumerMessage , numMessages ) ,
messages : make ( chan * sarama . ConsumerMessage , numMessages ) ,
ready : make ( chan bool ) ,
ready : make ( chan bool ) ,
@ -537,7 +543,7 @@ func TestConsumerGroup_Rebalancing(t *testing.T) {
t . Logf ( "=== Collecting messages from 2 consumers ===" )
t . Logf ( "=== Collecting messages from 2 consumers ===" )
allMessages := make ( [ ] * sarama . ConsumerMessage , 0 )
allMessages := make ( [ ] * sarama . ConsumerMessage , 0 )
messageTimeout := time . After ( 10 * time . Second )
messageTimeout := time . After ( 10 * time . Second )
// Collect at least half the messages
// Collect at least half the messages
for len ( allMessages ) < numMessages / 2 {
for len ( allMessages ) < numMessages / 2 {
select {
select {
@ -556,7 +562,7 @@ func TestConsumerGroup_Rebalancing(t *testing.T) {
// Add a third consumer to trigger rebalancing
// Add a third consumer to trigger rebalancing
t . Logf ( "=== Adding third consumer to trigger rebalancing ===" )
t . Logf ( "=== Adding third consumer to trigger rebalancing ===" )
handler3 := & RebalanceTestHandler {
handler3 := & RebalanceTestHandler {
messages : make ( chan * sarama . ConsumerMessage , numMessages ) ,
messages : make ( chan * sarama . ConsumerMessage , numMessages ) ,
ready : make ( chan bool ) ,
ready : make ( chan bool ) ,
@ -589,7 +595,7 @@ func TestConsumerGroup_Rebalancing(t *testing.T) {
t . Logf ( "Waiting for rebalancing after adding third consumer..." )
t . Logf ( "Waiting for rebalancing after adding third consumer..." )
rebalanceCount = 0
rebalanceCount = 0
rebalanceTimeout := time . After ( 15 * time . Second )
rebalanceTimeout := time . After ( 15 * time . Second )
for rebalanceCount < 3 {
for rebalanceCount < 3 {
select {
select {
case <- handler1 . rebalanced :
case <- handler1 . rebalanced :
@ -610,7 +616,7 @@ func TestConsumerGroup_Rebalancing(t *testing.T) {
// Collect remaining messages from all 3 consumers
// Collect remaining messages from all 3 consumers
t . Logf ( "=== Collecting remaining messages from 3 consumers ===" )
t . Logf ( "=== Collecting remaining messages from 3 consumers ===" )
finalTimeout := time . After ( 10 * time . Second )
finalTimeout := time . After ( 10 * time . Second )
for len ( allMessages ) < numMessages {
for len ( allMessages ) < numMessages {
select {
select {
case msg := <- handler1 . messages :
case msg := <- handler1 . messages :
@ -645,7 +651,7 @@ func TestConsumerGroup_Rebalancing(t *testing.T) {
t . Errorf ( "Found %d duplicate messages during rebalancing" , duplicates )
t . Errorf ( "Found %d duplicate messages during rebalancing" , duplicates )
}
}
t . Logf ( "🎉 SUCCESS: Rebalancing test completed. Consumed %d unique messages with %d consumers" ,
t . Logf ( "🎉 SUCCESS: Rebalancing test completed. Consumed %d unique messages with %d consumers" ,
len ( messageKeys ) , 3 )
len ( messageKeys ) , 3 )
}
}
@ -653,25 +659,26 @@ func TestConsumerGroup_Rebalancing(t *testing.T) {
type RebalanceTestHandler struct {
type RebalanceTestHandler struct {
messages chan * sarama . ConsumerMessage
messages chan * sarama . ConsumerMessage
ready chan bool
ready chan bool
readyOnce sync . Once
rebalanced chan bool
rebalanced chan bool
consumerID string
consumerID string
t * testing . T
t * testing . T
}
}
func ( h * RebalanceTestHandler ) Setup ( session sarama . ConsumerGroupSession ) error {
func ( h * RebalanceTestHandler ) Setup ( session sarama . ConsumerGroupSession ) error {
h . t . Logf ( "%s: Setup - Generation: %d, Claims: %v" ,
h . t . Logf ( "%s: Setup - Generation: %d, Claims: %v" ,
h . consumerID , session . GenerationID ( ) , session . Claims ( ) )
h . consumerID , session . GenerationID ( ) , session . Claims ( ) )
select {
select {
case h . rebalanced <- true :
case h . rebalanced <- true :
default :
default :
}
}
select {
select {
case h . ready <- true :
case h . ready <- true :
default :
default :
}
}
return nil
return nil
}
}
@ -682,7 +689,7 @@ func (h *RebalanceTestHandler) Cleanup(session sarama.ConsumerGroupSession) erro
func ( h * RebalanceTestHandler ) ConsumeClaim ( session sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error {
func ( h * RebalanceTestHandler ) ConsumeClaim ( session sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error {
h . t . Logf ( "%s: Starting to consume partition %d" , h . consumerID , claim . Partition ( ) )
h . t . Logf ( "%s: Starting to consume partition %d" , h . consumerID , claim . Partition ( ) )
for {
for {
select {
select {
case message := <- claim . Messages ( ) :
case message := <- claim . Messages ( ) :