diff --git a/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java b/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java index 65b0c11d8..e2f059285 100644 --- a/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java +++ b/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java @@ -1,22 +1,24 @@ import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.internals.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.errors.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; /** - * Test program to reproduce the seekToBeginning() hang issue + * Enhanced test program to reproduce and diagnose the seekToBeginning() hang issue * - * This simulates what Schema Registry does: - * 1. Create KafkaConsumer - * 2. Assign to partition - * 3. Call seekToBeginning() - * 4. Poll for records - * - * Expected behavior: Consumer should send ListOffsets and then Fetch requests - * Observed behavior: Consumer sends InitProducerId but not ListOffsets, then - * hangs + * This test: + * 1. Adds detailed logging of Kafka client operations + * 2. Captures exceptions and timeouts + * 3. Shows what the consumer is waiting for + * 4. Tracks request/response lifecycle */ public class SeekToBeginningTest { + private static final Logger log = LoggerFactory.getLogger(SeekToBeginningTest.class); + public static void main(String[] args) throws Exception { String bootstrapServers = "localhost:9093"; String topicName = "_schemas"; @@ -35,39 +37,143 @@ public class SeekToBeginningTest { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000"); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); + + // Add comprehensive debug logging + props.put("log4j.logger.org.apache.kafka.clients.consumer.internals", "DEBUG"); + props.put("log4j.logger.org.apache.kafka.clients.producer.internals", "DEBUG"); + props.put("log4j.logger.org.apache.kafka.clients.Metadata", "DEBUG"); + + // Add shorter timeouts to fail faster + props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000"); // 10 seconds instead of 60 + + System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + System.out.println("║ SeekToBeginning Diagnostic Test ║"); + System.out.println("║ Connecting to: " + bootstrapServers.padEnd(42) + "║"); + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); - System.out.println("[TEST] Creating KafkaConsumer connecting to " + bootstrapServers); + System.out.println("[TEST] Creating KafkaConsumer..."); + System.out.println("[TEST] Bootstrap servers: " + bootstrapServers); + System.out.println("[TEST] Group ID: test-seek-group"); + System.out.println("[TEST] Client ID: test-seek-client"); + KafkaConsumer consumer = new KafkaConsumer<>(props); TopicPartition tp = new TopicPartition(topicName, 0); List partitions = Arrays.asList(tp); - System.out.println("[TEST] Assigning to partition: " + tp); + System.out.println("\n[STEP 1] Assigning to partition: " + tp); consumer.assign(partitions); + System.out.println("[STEP 1] ✓ Assigned successfully"); - System.out.println("[TEST] Calling seekToBeginning()..."); + System.out.println("\n[STEP 2] Calling seekToBeginning()..."); long startTime = System.currentTimeMillis(); - consumer.seekToBeginning(partitions); - long seekTime = System.currentTimeMillis() - startTime; - System.out.println("[TEST] seekToBeginning() completed in " + seekTime + "ms"); + try { + consumer.seekToBeginning(partitions); + long seekTime = System.currentTimeMillis() - startTime; + System.out.println("[STEP 2] ✓ seekToBeginning() completed in " + seekTime + "ms"); + } catch (Exception e) { + System.out.println("[STEP 2] ✗ EXCEPTION in seekToBeginning():"); + e.printStackTrace(); + consumer.close(); + return; + } + + System.out.println("\n[STEP 3] Starting poll loop..."); + System.out.println("[STEP 3] First poll will trigger offset lookup (ListOffsets)"); + System.out.println("[STEP 3] Then will fetch initial records\n"); + + int successfulPolls = 0; + int failedPolls = 0; + int totalRecords = 0; - System.out.println("[TEST] Starting poll loop (30 second timeout per poll)..."); for (int i = 0; i < 3; i++) { - System.out.println("[POLL " + (i + 1) + "] Polling for records..."); + System.out.println("═══════════════════════════════════════════════════════════"); + System.out.println("[POLL " + (i + 1) + "] Starting poll with 15-second timeout..."); long pollStart = System.currentTimeMillis(); - ConsumerRecords records = consumer.poll(java.time.Duration.ofSeconds(30)); - long pollTime = System.currentTimeMillis() - pollStart; - System.out.println("[POLL " + (i + 1) + "] Got " + records.count() + " records in " + pollTime + "ms"); - - for (ConsumerRecord record : records) { - System.out.println(" [RECORD] offset=" + record.offset() + ", key.len=" + - (record.key() != null ? record.key().length : 0) + - ", value.len=" + (record.value() != null ? record.value().length : 0)); + + try { + System.out.println("[POLL " + (i + 1) + "] Calling consumer.poll()..."); + ConsumerRecords records = consumer.poll(java.time.Duration.ofSeconds(15)); + long pollTime = System.currentTimeMillis() - pollStart; + + System.out.println("[POLL " + (i + 1) + "] ✓ Poll completed in " + pollTime + "ms"); + System.out.println("[POLL " + (i + 1) + "] Records received: " + records.count()); + + if (records.count() > 0) { + successfulPolls++; + totalRecords += records.count(); + for (ConsumerRecord record : records) { + System.out.println(" [RECORD] offset=" + record.offset() + + ", key.len=" + (record.key() != null ? record.key().length : 0) + + ", value.len=" + (record.value() != null ? record.value().length : 0)); + } + } else { + System.out.println("[POLL " + (i + 1) + "] ℹ No records in this poll (but no error)"); + successfulPolls++; + } + } catch (TimeoutException e) { + long pollTime = System.currentTimeMillis() - pollStart; + failedPolls++; + System.out.println("[POLL " + (i + 1) + "] ✗ TIMEOUT after " + pollTime + "ms"); + System.out.println("[POLL " + (i + 1) + "] This means consumer is waiting for something from broker"); + System.out.println("[POLL " + (i + 1) + "] Possible causes:"); + System.out.println(" - ListOffsetsRequest never sent"); + System.out.println(" - ListOffsetsResponse not received"); + System.out.println(" - Broker metadata parsing failed"); + System.out.println(" - Connection issue"); + + // Print current position info if available + try { + long position = consumer.position(tp); + System.out.println("[POLL " + (i + 1) + "] Current position: " + position); + } catch (Exception e2) { + System.out.println("[POLL " + (i + 1) + "] Could not get position: " + e2.getMessage()); + } + } catch (Exception e) { + failedPolls++; + long pollTime = System.currentTimeMillis() - pollStart; + System.out.println("[POLL " + (i + 1) + "] ✗ EXCEPTION after " + pollTime + "ms:"); + System.out.println("[POLL " + (i + 1) + "] Exception type: " + e.getClass().getSimpleName()); + System.out.println("[POLL " + (i + 1) + "] Message: " + e.getMessage()); + + // Print stack trace for first exception + if (i == 0) { + System.out.println("[POLL " + (i + 1) + "] Stack trace:"); + e.printStackTrace(); + } } } - System.out.println("[TEST] Closing consumer..."); - consumer.close(); - System.out.println("[TEST] Done!"); + System.out.println("\n═══════════════════════════════════════════════════════════"); + System.out.println("[RESULTS] Test Summary:"); + System.out.println(" Successful polls: " + successfulPolls); + System.out.println(" Failed polls: " + failedPolls); + System.out.println(" Total records received: " + totalRecords); + + if (failedPolls > 0) { + System.out.println("\n[DIAGNOSIS] Consumer is BLOCKED during poll()"); + System.out.println(" This indicates the consumer cannot:"); + System.out.println(" 1. Send ListOffsetsRequest to determine offset 0, OR"); + System.out.println(" 2. Receive/parse ListOffsetsResponse from broker, OR"); + System.out.println(" 3. Parse broker metadata for partition leader lookup"); + } else if (totalRecords == 0) { + System.out.println("\n[DIAGNOSIS] Consumer is working but NO records found"); + System.out.println(" This might mean:"); + System.out.println(" 1. Topic has no messages, OR"); + System.out.println(" 2. Fetch is working but broker returns empty"); + } else { + System.out.println("\n[SUCCESS] Consumer working correctly!"); + System.out.println(" Received " + totalRecords + " records"); + } + + System.out.println("\n[CLEANUP] Closing consumer..."); + try { + consumer.close(); + System.out.println("[CLEANUP] ✓ Consumer closed successfully"); + } catch (Exception e) { + System.out.println("[CLEANUP] ✗ Error closing consumer: " + e.getMessage()); + } + + System.out.println("\n[TEST] Done!\n"); } } diff --git a/test/kafka/kafka-client-loadtest/log4j2.properties b/test/kafka/kafka-client-loadtest/log4j2.properties new file mode 100644 index 000000000..1461240e0 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/log4j2.properties @@ -0,0 +1,13 @@ +# Set everything to debug +log4j.rootLogger=INFO, CONSOLE + +# Enable DEBUG for Kafka client internals +log4j.logger.org.apache.kafka.clients.consumer=DEBUG +log4j.logger.org.apache.kafka.clients.producer=DEBUG +log4j.logger.org.apache.kafka.clients.Metadata=DEBUG +log4j.logger.org.apache.kafka.common.network=WARN +log4j.logger.org.apache.kafka.common.utils=WARN + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=[%d{HH:mm:ss}] [%-5p] [%c] %m%n