You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
73 lines
3.3 KiB
73 lines
3.3 KiB
import org.apache.kafka.clients.consumer.*;
|
|
import org.apache.kafka.common.TopicPartition;
|
|
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
|
import java.util.*;
|
|
|
|
/**
|
|
* Test program to reproduce the seekToBeginning() hang issue
|
|
*
|
|
* This simulates what Schema Registry does:
|
|
* 1. Create KafkaConsumer
|
|
* 2. Assign to partition
|
|
* 3. Call seekToBeginning()
|
|
* 4. Poll for records
|
|
*
|
|
* Expected behavior: Consumer should send ListOffsets and then Fetch requests
|
|
* Observed behavior: Consumer sends InitProducerId but not ListOffsets, then
|
|
* hangs
|
|
*/
|
|
public class SeekToBeginningTest {
|
|
public static void main(String[] args) throws Exception {
|
|
String bootstrapServers = "localhost:9093";
|
|
String topicName = "_schemas";
|
|
|
|
if (args.length > 0) {
|
|
bootstrapServers = args[0];
|
|
}
|
|
|
|
Properties props = new Properties();
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-seek-group");
|
|
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test-seek-client");
|
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
|
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
|
|
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
|
|
|
|
System.out.println("[TEST] Creating KafkaConsumer connecting to " + bootstrapServers);
|
|
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
|
|
|
|
TopicPartition tp = new TopicPartition(topicName, 0);
|
|
List<TopicPartition> partitions = Arrays.asList(tp);
|
|
|
|
System.out.println("[TEST] Assigning to partition: " + tp);
|
|
consumer.assign(partitions);
|
|
|
|
System.out.println("[TEST] Calling seekToBeginning()...");
|
|
long startTime = System.currentTimeMillis();
|
|
consumer.seekToBeginning(partitions);
|
|
long seekTime = System.currentTimeMillis() - startTime;
|
|
System.out.println("[TEST] seekToBeginning() completed in " + seekTime + "ms");
|
|
|
|
System.out.println("[TEST] Starting poll loop (30 second timeout per poll)...");
|
|
for (int i = 0; i < 3; i++) {
|
|
System.out.println("[POLL " + (i + 1) + "] Polling for records...");
|
|
long pollStart = System.currentTimeMillis();
|
|
ConsumerRecords<byte[], byte[]> records = consumer.poll(java.time.Duration.ofSeconds(30));
|
|
long pollTime = System.currentTimeMillis() - pollStart;
|
|
System.out.println("[POLL " + (i + 1) + "] Got " + records.count() + " records in " + pollTime + "ms");
|
|
|
|
for (ConsumerRecord<byte[], byte[]> record : records) {
|
|
System.out.println(" [RECORD] offset=" + record.offset() + ", key.len=" +
|
|
(record.key() != null ? record.key().length : 0) +
|
|
", value.len=" + (record.value() != null ? record.value().length : 0));
|
|
}
|
|
}
|
|
|
|
System.out.println("[TEST] Closing consumer...");
|
|
consumer.close();
|
|
System.out.println("[TEST] Done!");
|
|
}
|
|
}
|