diff --git a/test/kafka/client_integration_test.go b/test/kafka/client_integration_test.go index 493bd3b9c..0c1f94ff1 100644 --- a/test/kafka/client_integration_test.go +++ b/test/kafka/client_integration_test.go @@ -76,6 +76,11 @@ func TestKafkaGoClient_BasicProduceConsume(t *testing.T) { } t.Logf("Successfully produced and consumed %d messages", len(consumedMessages)) + + // Add a small delay to ensure all kafka-go goroutines are cleaned up + fmt.Printf("DEBUG: Waiting for cleanup...\n") + time.Sleep(2 * time.Second) + fmt.Printf("DEBUG: Cleanup wait completed\n") } // TestKafkaGoClient_ConsumerGroups tests consumer group functionality @@ -242,12 +247,20 @@ func produceMessages(brokerAddr, topicName string, messages []kafka.Message) err fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...) }), } - defer writer.Close() + defer func() { + fmt.Printf("DEBUG: Closing kafka writer\n") + if err := writer.Close(); err != nil { + fmt.Printf("DEBUG: Error closing writer: %v\n", err) + } + fmt.Printf("DEBUG: Kafka writer closed\n") + }() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - return writer.WriteMessages(ctx, messages...) + err := writer.WriteMessages(ctx, messages...) + fmt.Printf("DEBUG: WriteMessages completed with error: %v\n", err) + return err } func consumeMessages(brokerAddr, topicName string, expectedCount int) ([]kafka.Message, error) { @@ -266,7 +279,13 @@ func consumeMessages(brokerAddr, topicName string, expectedCount int) ([]kafka.M fmt.Printf("CONSUMER ERROR: "+msg+"\n", args...) }), }) - defer reader.Close() + defer func() { + fmt.Printf("DEBUG: Closing kafka reader\n") + if err := reader.Close(); err != nil { + fmt.Printf("DEBUG: Error closing reader: %v\n", err) + } + fmt.Printf("DEBUG: Kafka reader closed\n") + }() ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -275,11 +294,13 @@ func consumeMessages(brokerAddr, topicName string, expectedCount int) ([]kafka.M for i := 0; i < expectedCount; i++ { msg, err := reader.ReadMessage(ctx) if err != nil { + fmt.Printf("DEBUG: Error reading message %d: %v\n", i, err) return messages, fmt.Errorf("read message %d: %w", i, err) } messages = append(messages, msg) } + fmt.Printf("DEBUG: Successfully read %d messages\n", len(messages)) return messages, nil }