From ad471d25ab0796a1d8ae3a5700fcbb7043a39a41 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 16:53:58 -0700 Subject: [PATCH] investigation: Schema Registry producer sends InitProducerId with idempotence enabled MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Discovery KafkaStore.java line 136: When idempotence is enabled: - Producer sends InitProducerId on creation - This is NORMAL Kafka behavior ## Timeline 1. KafkaStore.init() creates producer with idempotence=true (line 138) 2. Producer sends InitProducerId request ✅ (We handle this correctly) 3. Producer.initProducerId request completes successfully 4. Then KafkaStoreReaderThread created (line 142-145) 5. Reader thread constructor calls seekToBeginning() (line 183) 6. seekToBeginning() should send ListOffsets request 7. BUT nothing happens! Consumer blocks indefinitely ## Root Cause Analysis The PRODUCER successfully sends/receives InitProducerId. The CONSUMER fails at seekToBeginning() - never sends ListOffsets. The consumer is stuck somewhere in the Java Kafka client seek logic, possibly waiting for something related to the producer/idempotence setup. OR: The ListOffsets request IS being sent by the consumer, but we're not seeing it because it's being handled differently (data plane vs control plane routing). ## Next: Check if ListOffsets is being routed to data plane and never processed --- weed/mq/kafka/protocol/handler.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index dc75e0829..93b14f6a2 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -3921,6 +3921,14 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16, // v2+: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + producer_id(INT64) + producer_epoch(INT16) // v4+: Uses flexible format with tagged fields + glog.Warningf("🔴🔴🔴 InitProducerId HANDLER CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody)) + + maxBytes := len(requestBody) + if maxBytes > 64 { + maxBytes = 64 + } + glog.Warningf("🔴🔴🔴 InitProducerId request first %d bytes (hex): %x", maxBytes, requestBody[:maxBytes]) + offset := 0 // Parse transactional_id (NULLABLE_STRING or COMPACT_NULLABLE_STRING for flexible versions) @@ -4026,6 +4034,12 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16, response = append(response, 0x00) // Empty response body tagged fields } + glog.Warningf("🔴🔴🔴 InitProducerId HANDLER RETURNING: correlationID=%d, responseLen=%d bytes", correlationID, len(response)) + respPreview := len(response) + if respPreview > 32 { + respPreview = 32 + } + glog.Warningf("🔴🔴🔴 InitProducerId response (hex): %x", response[:respPreview]) return response, nil }