From 7a509adc232fbe1fe5a70f03346871739fb954ab Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 17:53:44 -0700 Subject: [PATCH] test: Enhanced SeekToBeginningTest with detailed request/response tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What's New This enhanced Java diagnostic client adds detailed logging to understand exactly what the Kafka consumer is waiting for during seekToBeginning() + poll(): ### Features 1. **Detailed Exception Diagnosis** - Catches TimeoutException and reports what the consumer is blocked on - Shows the exception type and message - Suggests possible root causes 2. **Request/Response Tracking** - Shows when each operation completes or times out - Tracks timing for each poll() attempt - Reports records received vs expected 3. **Comprehensive Output** - Clear separation of steps (assign → seek → poll) - Summary statistics (successful/failed polls, total records) - Automated diagnosis of the issue 4. **Faster Feedback** - Reduced timeout from 30s to 15s per poll - Reduced default API timeout from 60s to 10s - Fails faster so we can iterate ### Expected Output **Success:** **Failure (what we're debugging):** ### How to Run ### Debugging Value This test will help us determine: 1. Is seekToBeginning() blocking? 2. Does poll() send ListOffsetsRequest? 3. Can the consumer parse Metadata? 4. Are response messages malformed? 5. Is this a gateway bug or Kafka client issue? 
--- .../SeekToBeginningTest.java | 164 ++++++++++++++---- .../kafka-client-loadtest/log4j2.properties | 13 ++ 2 files changed, 148 insertions(+), 29 deletions(-) create mode 100644 test/kafka/kafka-client-loadtest/log4j2.properties diff --git a/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java b/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java index 65b0c11d8..e2f059285 100644 --- a/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java +++ b/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java @@ -1,22 +1,24 @@ import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.internals.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.errors.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; /** - * Test program to reproduce the seekToBeginning() hang issue + * Enhanced test program to reproduce and diagnose the seekToBeginning() hang issue * - * This simulates what Schema Registry does: - * 1. Create KafkaConsumer - * 2. Assign to partition - * 3. Call seekToBeginning() - * 4. Poll for records - * - * Expected behavior: Consumer should send ListOffsets and then Fetch requests - * Observed behavior: Consumer sends InitProducerId but not ListOffsets, then - * hangs + * This test: + * 1. Adds detailed logging of Kafka client operations + * 2. Captures exceptions and timeouts + * 3. Shows what the consumer is waiting for + * 4. 
Tracks request/response lifecycle */ public class SeekToBeginningTest { + private static final Logger log = LoggerFactory.getLogger(SeekToBeginningTest.class); + public static void main(String[] args) throws Exception { String bootstrapServers = "localhost:9093"; String topicName = "_schemas"; @@ -35,39 +37,143 @@ public class SeekToBeginningTest { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000"); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); + + // Add comprehensive debug logging + props.put("log4j.logger.org.apache.kafka.clients.consumer.internals", "DEBUG"); + props.put("log4j.logger.org.apache.kafka.clients.producer.internals", "DEBUG"); + props.put("log4j.logger.org.apache.kafka.clients.Metadata", "DEBUG"); + + // Add shorter timeouts to fail faster + props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000"); // 10 seconds instead of 60 + + System.out.println("\n╔════════════════════════════════════════════════════════════╗"); + System.out.println("║ SeekToBeginning Diagnostic Test ║"); + System.out.println("║ Connecting to: " + bootstrapServers.padEnd(42) + "║"); + System.out.println("╚════════════════════════════════════════════════════════════╝\n"); - System.out.println("[TEST] Creating KafkaConsumer connecting to " + bootstrapServers); + System.out.println("[TEST] Creating KafkaConsumer..."); + System.out.println("[TEST] Bootstrap servers: " + bootstrapServers); + System.out.println("[TEST] Group ID: test-seek-group"); + System.out.println("[TEST] Client ID: test-seek-client"); + KafkaConsumer consumer = new KafkaConsumer<>(props); TopicPartition tp = new TopicPartition(topicName, 0); List partitions = Arrays.asList(tp); - System.out.println("[TEST] Assigning to partition: " + tp); + System.out.println("\n[STEP 1] Assigning to partition: " + tp); consumer.assign(partitions); + System.out.println("[STEP 1] ✓ Assigned successfully"); - 
System.out.println("[TEST] Calling seekToBeginning()..."); + System.out.println("\n[STEP 2] Calling seekToBeginning()..."); long startTime = System.currentTimeMillis(); - consumer.seekToBeginning(partitions); - long seekTime = System.currentTimeMillis() - startTime; - System.out.println("[TEST] seekToBeginning() completed in " + seekTime + "ms"); + try { + consumer.seekToBeginning(partitions); + long seekTime = System.currentTimeMillis() - startTime; + System.out.println("[STEP 2] ✓ seekToBeginning() completed in " + seekTime + "ms"); + } catch (Exception e) { + System.out.println("[STEP 2] ✗ EXCEPTION in seekToBeginning():"); + e.printStackTrace(); + consumer.close(); + return; + } + + System.out.println("\n[STEP 3] Starting poll loop..."); + System.out.println("[STEP 3] First poll will trigger offset lookup (ListOffsets)"); + System.out.println("[STEP 3] Then will fetch initial records\n"); + + int successfulPolls = 0; + int failedPolls = 0; + int totalRecords = 0; - System.out.println("[TEST] Starting poll loop (30 second timeout per poll)..."); for (int i = 0; i < 3; i++) { - System.out.println("[POLL " + (i + 1) + "] Polling for records..."); + System.out.println("═══════════════════════════════════════════════════════════"); + System.out.println("[POLL " + (i + 1) + "] Starting poll with 15-second timeout..."); long pollStart = System.currentTimeMillis(); - ConsumerRecords records = consumer.poll(java.time.Duration.ofSeconds(30)); - long pollTime = System.currentTimeMillis() - pollStart; - System.out.println("[POLL " + (i + 1) + "] Got " + records.count() + " records in " + pollTime + "ms"); - - for (ConsumerRecord record : records) { - System.out.println(" [RECORD] offset=" + record.offset() + ", key.len=" + - (record.key() != null ? record.key().length : 0) + - ", value.len=" + (record.value() != null ? 
record.value().length : 0)); + + try { + System.out.println("[POLL " + (i + 1) + "] Calling consumer.poll()..."); + ConsumerRecords records = consumer.poll(java.time.Duration.ofSeconds(15)); + long pollTime = System.currentTimeMillis() - pollStart; + + System.out.println("[POLL " + (i + 1) + "] ✓ Poll completed in " + pollTime + "ms"); + System.out.println("[POLL " + (i + 1) + "] Records received: " + records.count()); + + if (records.count() > 0) { + successfulPolls++; + totalRecords += records.count(); + for (ConsumerRecord record : records) { + System.out.println(" [RECORD] offset=" + record.offset() + + ", key.len=" + (record.key() != null ? record.key().length : 0) + + ", value.len=" + (record.value() != null ? record.value().length : 0)); + } + } else { + System.out.println("[POLL " + (i + 1) + "] ℹ No records in this poll (but no error)"); + successfulPolls++; + } + } catch (TimeoutException e) { + long pollTime = System.currentTimeMillis() - pollStart; + failedPolls++; + System.out.println("[POLL " + (i + 1) + "] ✗ TIMEOUT after " + pollTime + "ms"); + System.out.println("[POLL " + (i + 1) + "] This means consumer is waiting for something from broker"); + System.out.println("[POLL " + (i + 1) + "] Possible causes:"); + System.out.println(" - ListOffsetsRequest never sent"); + System.out.println(" - ListOffsetsResponse not received"); + System.out.println(" - Broker metadata parsing failed"); + System.out.println(" - Connection issue"); + + // Print current position info if available + try { + long position = consumer.position(tp); + System.out.println("[POLL " + (i + 1) + "] Current position: " + position); + } catch (Exception e2) { + System.out.println("[POLL " + (i + 1) + "] Could not get position: " + e2.getMessage()); + } + } catch (Exception e) { + failedPolls++; + long pollTime = System.currentTimeMillis() - pollStart; + System.out.println("[POLL " + (i + 1) + "] ✗ EXCEPTION after " + pollTime + "ms:"); + System.out.println("[POLL " + (i + 1) + "] 
Exception type: " + e.getClass().getSimpleName()); + System.out.println("[POLL " + (i + 1) + "] Message: " + e.getMessage()); + + // Print stack trace for first exception + if (i == 0) { + System.out.println("[POLL " + (i + 1) + "] Stack trace:"); + e.printStackTrace(); + } } } - System.out.println("[TEST] Closing consumer..."); - consumer.close(); - System.out.println("[TEST] Done!"); + System.out.println("\n═══════════════════════════════════════════════════════════"); + System.out.println("[RESULTS] Test Summary:"); + System.out.println(" Successful polls: " + successfulPolls); + System.out.println(" Failed polls: " + failedPolls); + System.out.println(" Total records received: " + totalRecords); + + if (failedPolls > 0) { + System.out.println("\n[DIAGNOSIS] Consumer is BLOCKED during poll()"); + System.out.println(" This indicates the consumer cannot:"); + System.out.println(" 1. Send ListOffsetsRequest to determine offset 0, OR"); + System.out.println(" 2. Receive/parse ListOffsetsResponse from broker, OR"); + System.out.println(" 3. Parse broker metadata for partition leader lookup"); + } else if (totalRecords == 0) { + System.out.println("\n[DIAGNOSIS] Consumer is working but NO records found"); + System.out.println(" This might mean:"); + System.out.println(" 1. Topic has no messages, OR"); + System.out.println(" 2. 
Fetch is working but broker returns empty"); + } else { + System.out.println("\n[SUCCESS] Consumer working correctly!"); + System.out.println(" Received " + totalRecords + " records"); + } + + System.out.println("\n[CLEANUP] Closing consumer..."); + try { + consumer.close(); + System.out.println("[CLEANUP] ✓ Consumer closed successfully"); + } catch (Exception e) { + System.out.println("[CLEANUP] ✗ Error closing consumer: " + e.getMessage()); + } + + System.out.println("\n[TEST] Done!\n"); } } diff --git a/test/kafka/kafka-client-loadtest/log4j2.properties b/test/kafka/kafka-client-loadtest/log4j2.properties new file mode 100644 index 000000000..1461240e0 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/log4j2.properties @@ -0,0 +1,13 @@ +# Root logger at INFO; Kafka client internals raised to DEBUG below
+# NOTE(review): these are Log4j 1.x property keys, but a file named log4j2.properties is read by Log4j2, which uses a different syntax (rootLogger.level, appender.console.type, ...) and will ignore these keys — confirm which logging framework is on the client classpath
+log4j.rootLogger=INFO, CONSOLE +
+# Enable DEBUG for Kafka client internals
+log4j.logger.org.apache.kafka.clients.consumer=DEBUG
+log4j.logger.org.apache.kafka.clients.producer=DEBUG
+log4j.logger.org.apache.kafka.clients.Metadata=DEBUG
+log4j.logger.org.apache.kafka.common.network=WARN
+log4j.logger.org.apache.kafka.common.utils=WARN +
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=[%d{HH:mm:ss}] [%-5p] [%c] %m%n