diff --git a/test/kafka/kafka-client-loadtest/Dockerfile.seektest b/test/kafka/kafka-client-loadtest/Dockerfile.seektest new file mode 100644 index 000000000..5ce9d9602 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/Dockerfile.seektest @@ -0,0 +1,20 @@ +FROM openjdk:11-jdk-slim + +# Install Maven +RUN apt-get update && apt-get install -y maven && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Create source directory +RUN mkdir -p src/main/java + +# Copy source and build files +COPY SeekToBeginningTest.java src/main/java/ +COPY pom.xml . + +# Compile and package +RUN mvn clean package -DskipTests + +# Run the test +ENTRYPOINT ["java", "-cp", "target/seek-test.jar", "SeekToBeginningTest"] +CMD ["kafka-gateway:9093"] diff --git a/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java b/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java new file mode 100644 index 000000000..65b0c11d8 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/SeekToBeginningTest.java @@ -0,0 +1,73 @@ +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import java.util.*; + +/** + * Test program to reproduce the seekToBeginning() hang issue + * + * This simulates what Schema Registry does: + * 1. Create KafkaConsumer + * 2. Assign to partition + * 3. Call seekToBeginning() + * 4. Poll for records + * + * Expected behavior: Consumer should send ListOffsets and then Fetch requests + * Observed behavior: Consumer sends InitProducerId but not ListOffsets, then + * hangs + */ +public class SeekToBeginningTest { + public static void main(String[] args) throws Exception { + String bootstrapServers = "localhost:9093"; + String topicName = "_schemas"; + + if (args.length > 0) { + bootstrapServers = args[0]; + } + + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-seek-group"); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test-seek-client"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000"); + props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); + + System.out.println("[TEST] Creating KafkaConsumer connecting to " + bootstrapServers); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + TopicPartition tp = new TopicPartition(topicName, 0); + List partitions = Arrays.asList(tp); + + System.out.println("[TEST] Assigning to partition: " + tp); + consumer.assign(partitions); + + System.out.println("[TEST] Calling seekToBeginning()..."); + long startTime = System.currentTimeMillis(); + consumer.seekToBeginning(partitions); + long seekTime = System.currentTimeMillis() - startTime; + System.out.println("[TEST] seekToBeginning() completed in " + seekTime + "ms"); + + System.out.println("[TEST] Starting poll loop (30 second timeout per poll)..."); + for (int i = 0; i < 3; i++) { + System.out.println("[POLL " + (i + 1) + "] Polling for records..."); + long pollStart = System.currentTimeMillis(); + ConsumerRecords records = consumer.poll(java.time.Duration.ofSeconds(30)); + long pollTime = System.currentTimeMillis() - pollStart; + System.out.println("[POLL " + (i + 1) + "] Got " + records.count() + " records in " + pollTime + "ms"); + + for (ConsumerRecord record : records) { + System.out.println(" [RECORD] offset=" + record.offset() + ", key.len=" + + (record.key() != null ? record.key().length : 0) + + ", value.len=" + (record.value() != null ? record.value().length : 0)); + } + } + + System.out.println("[TEST] Closing consumer..."); + consumer.close(); + System.out.println("[TEST] Done!"); + } +} diff --git a/test/kafka/kafka-client-loadtest/docker-compose.yml b/test/kafka/kafka-client-loadtest/docker-compose.yml index fa88117e9..3555855b0 100644 --- a/test/kafka/kafka-client-loadtest/docker-compose.yml +++ b/test/kafka/kafka-client-loadtest/docker-compose.yml @@ -307,6 +307,24 @@ services: profiles: - debug + # SeekToBeginning test - reproduces the hang issue + seek-test: + build: + context: . + dockerfile: Dockerfile.seektest + container_name: loadtest-seek-test + depends_on: + kafka-gateway: + condition: service_healthy + schema-registry: + condition: service_healthy + environment: + - KAFKA_BOOTSTRAP_SERVERS=kafka-gateway:9093 + networks: + - kafka-loadtest-net + entrypoint: ["java", "-cp", "target/seek-test.jar", "SeekToBeginningTest"] + command: ["kafka-gateway:9093"] + volumes: prometheus-data: grafana-data: diff --git a/test/kafka/kafka-client-loadtest/pom.xml b/test/kafka/kafka-client-loadtest/pom.xml new file mode 100644 index 000000000..76422199b --- /dev/null +++ b/test/kafka/kafka-client-loadtest/pom.xml @@ -0,0 +1,60 @@ + + + 4.0.0 + + io.confluent.test + seek-test + 1.0 + + + 11 + 11 + 3.6.0 + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.slf4j + slf4j-simple + 2.0.0 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + SeekToBeginningTest + + + seek-test + + + + + + +