|
|
|
@ -76,6 +76,11 @@ func TestKafkaGoClient_BasicProduceConsume(t *testing.T) { |
|
|
|
} |
|
|
|
|
|
|
|
t.Logf("Successfully produced and consumed %d messages", len(consumedMessages)) |
|
|
|
|
|
|
|
// Add a small delay to ensure all kafka-go goroutines are cleaned up
|
|
|
|
fmt.Printf("DEBUG: Waiting for cleanup...\n") |
|
|
|
time.Sleep(2 * time.Second) |
|
|
|
fmt.Printf("DEBUG: Cleanup wait completed\n") |
|
|
|
} |
|
|
|
|
|
|
|
// TestKafkaGoClient_ConsumerGroups tests consumer group functionality
|
|
|
|
@ -242,12 +247,20 @@ func produceMessages(brokerAddr, topicName string, messages []kafka.Message) err |
|
|
|
fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...) |
|
|
|
}), |
|
|
|
} |
|
|
|
defer writer.Close() |
|
|
|
defer func() { |
|
|
|
fmt.Printf("DEBUG: Closing kafka writer\n") |
|
|
|
if err := writer.Close(); err != nil { |
|
|
|
fmt.Printf("DEBUG: Error closing writer: %v\n", err) |
|
|
|
} |
|
|
|
fmt.Printf("DEBUG: Kafka writer closed\n") |
|
|
|
}() |
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
|
|
|
defer cancel() |
|
|
|
|
|
|
|
return writer.WriteMessages(ctx, messages...) |
|
|
|
err := writer.WriteMessages(ctx, messages...) |
|
|
|
fmt.Printf("DEBUG: WriteMessages completed with error: %v\n", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
func consumeMessages(brokerAddr, topicName string, expectedCount int) ([]kafka.Message, error) { |
|
|
|
@ -266,7 +279,13 @@ func consumeMessages(brokerAddr, topicName string, expectedCount int) ([]kafka.M |
|
|
|
fmt.Printf("CONSUMER ERROR: "+msg+"\n", args...) |
|
|
|
}), |
|
|
|
}) |
|
|
|
defer reader.Close() |
|
|
|
defer func() { |
|
|
|
fmt.Printf("DEBUG: Closing kafka reader\n") |
|
|
|
if err := reader.Close(); err != nil { |
|
|
|
fmt.Printf("DEBUG: Error closing reader: %v\n", err) |
|
|
|
} |
|
|
|
fmt.Printf("DEBUG: Kafka reader closed\n") |
|
|
|
}() |
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
|
|
|
defer cancel() |
|
|
|
@ -275,11 +294,13 @@ func consumeMessages(brokerAddr, topicName string, expectedCount int) ([]kafka.M |
|
|
|
for i := 0; i < expectedCount; i++ { |
|
|
|
msg, err := reader.ReadMessage(ctx) |
|
|
|
if err != nil { |
|
|
|
fmt.Printf("DEBUG: Error reading message %d: %v\n", i, err) |
|
|
|
return messages, fmt.Errorf("read message %d: %w", i, err) |
|
|
|
} |
|
|
|
messages = append(messages, msg) |
|
|
|
} |
|
|
|
|
|
|
|
fmt.Printf("DEBUG: Successfully read %d messages\n", len(messages)) |
|
|
|
return messages, nil |
|
|
|
} |
|
|
|
|
|
|
|
|